Skip to content

Commit 0ca6f77

Browse files
feat(client): adaptive batch coalescing, configurable via Android UI
Replace the fixed 8ms batch coalesce window with an adaptive scheme: after each new op arrives, wait another 40ms (step) for more ops. The timer resets on every arrival, up to 1000ms (max) from the first op. Both values are configurable via config JSON (coalesce_step_ms, coalesce_max_ms) and two new sliders in the Android Advanced section. Why this helps: Apps Script adds ~1.5s overhead per HTTP call. The previous 8ms window barely caught any concurrent ops — most batches carried just 1 op. With 40ms/1000ms, batches average 2-3 ops, reducing total Apps Script calls for the same workload. Tested on device in Iran: Before: P75=6.2s, 61% fast (<3s), ~1 op/batch After: P75=3.0s, 74-85% fast, ~2-3 ops/batch Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7e8e467 commit 0ca6f77

5 files changed

Lines changed: 76 additions & 21 deletions

File tree

android/app/src/main/java/com/therealaleph/mhrv/ConfigStore.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ data class MhrvConfig(
9292
val verifySsl: Boolean = true,
9393
val logLevel: String = "info",
9494
val parallelRelay: Int = 1,
95+
val coalesceStepMs: Int = 40,
96+
val coalesceMaxMs: Int = 1000,
9597
val upstreamSocks5: String = "",
9698

9799
/**
@@ -197,6 +199,8 @@ data class MhrvConfig(
197199
put("verify_ssl", verifySsl)
198200
put("log_level", logLevel)
199201
put("parallel_relay", parallelRelay)
202+
if (coalesceStepMs != 40) put("coalesce_step_ms", coalesceStepMs)
203+
if (coalesceMaxMs != 1000) put("coalesce_max_ms", coalesceMaxMs)
200204
if (upstreamSocks5.isNotBlank()) {
201205
put("upstream_socks5", upstreamSocks5.trim())
202206
}
@@ -304,6 +308,8 @@ object ConfigStore {
304308
if (cfg.verifySsl != defaults.verifySsl) obj.put("verify_ssl", cfg.verifySsl)
305309
if (cfg.logLevel != defaults.logLevel) obj.put("log_level", cfg.logLevel)
306310
if (cfg.parallelRelay != defaults.parallelRelay) obj.put("parallel_relay", cfg.parallelRelay)
311+
if (cfg.coalesceStepMs != defaults.coalesceStepMs) obj.put("coalesce_step_ms", cfg.coalesceStepMs)
312+
if (cfg.coalesceMaxMs != defaults.coalesceMaxMs) obj.put("coalesce_max_ms", cfg.coalesceMaxMs)
307313
if (cfg.upstreamSocks5.isNotBlank()) obj.put("upstream_socks5", cfg.upstreamSocks5)
308314
if (cfg.passthroughHosts.isNotEmpty()) obj.put("passthrough_hosts", JSONArray().apply { cfg.passthroughHosts.forEach { put(it) } })
309315
if (cfg.tunnelDoh != defaults.tunnelDoh) obj.put("tunnel_doh", cfg.tunnelDoh)
@@ -400,6 +406,8 @@ object ConfigStore {
400406
verifySsl = obj.optBoolean("verify_ssl", true),
401407
logLevel = obj.optString("log_level", "info"),
402408
parallelRelay = obj.optInt("parallel_relay", 1),
409+
coalesceStepMs = obj.optInt("coalesce_step_ms", 40),
410+
coalesceMaxMs = obj.optInt("coalesce_max_ms", 1000),
403411
upstreamSocks5 = obj.optString("upstream_socks5", ""),
404412
passthroughHosts = obj.optJSONArray("passthrough_hosts")?.let { arr ->
405413
buildList { for (i in 0 until arr.length()) add(arr.optString(i)) }

android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,6 +1246,32 @@ private fun AdvancedSettings(
12461246
)
12471247
}
12481248

1249+
// Batch coalesce step slider
1250+
Column {
1251+
Text(
1252+
"Coalesce step: ${cfg.coalesceStepMs}ms",
1253+
style = MaterialTheme.typography.bodyMedium,
1254+
)
1255+
Slider(
1256+
value = cfg.coalesceStepMs.toFloat(),
1257+
onValueChange = { onChange(cfg.copy(coalesceStepMs = it.toInt().coerceIn(10, 500))) },
1258+
valueRange = 10f..500f,
1259+
)
1260+
}
1261+
1262+
// Batch coalesce max slider
1263+
Column {
1264+
Text(
1265+
"Coalesce max: ${cfg.coalesceMaxMs}ms",
1266+
style = MaterialTheme.typography.bodyMedium,
1267+
)
1268+
Slider(
1269+
value = cfg.coalesceMaxMs.toFloat(),
1270+
onValueChange = { onChange(cfg.copy(coalesceMaxMs = it.toInt().coerceIn(100, 2000))) },
1271+
valueRange = 100f..2000f,
1272+
)
1273+
}
1274+
12491275
OutlinedTextField(
12501276
value = cfg.upstreamSocks5,
12511277
onValueChange = { onChange(cfg.copy(upstreamSocks5 = it)) },

src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,14 @@ pub struct Config {
9696
/// script IDs.
9797
#[serde(default)]
9898
pub parallel_relay: u8,
99+
/// Adaptive batch coalesce: after each op arrives, wait this many ms
100+
/// for more ops before firing the batch. Resets on every arrival.
101+
/// 0 = use compiled default (40ms).
102+
#[serde(default)]
103+
pub coalesce_step_ms: u16,
104+
/// Hard cap on total coalesce wait (ms). 0 = use compiled default (1000ms).
105+
#[serde(default)]
106+
pub coalesce_max_ms: u16,
99107
/// Optional explicit SNI rotation pool for outbound TLS to `google_ip`.
100108
/// Empty / missing = auto-expand from `front_domain` (current default of
101109
/// {www, mail, drive, docs, calendar}.google.com). Set to an explicit list

src/proxy_server.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ pub struct ProxyServer {
216216
mitm: Arc<Mutex<MitmCertManager>>,
217217
rewrite_ctx: Arc<RewriteCtx>,
218218
tunnel_mux: Option<Arc<TunnelMux>>,
219+
coalesce_step_ms: u64,
220+
coalesce_max_ms: u64,
219221
}
220222

221223
pub struct RewriteCtx {
@@ -375,6 +377,8 @@ impl ProxyServer {
375377
mitm,
376378
rewrite_ctx,
377379
tunnel_mux: None, // initialized in run() inside the tokio runtime
380+
coalesce_step_ms: if config.coalesce_step_ms > 0 { config.coalesce_step_ms as u64 } else { 40 },
381+
coalesce_max_ms: if config.coalesce_max_ms > 0 { config.coalesce_max_ms as u64 } else { 1000 },
378382
})
379383
}
380384

@@ -388,7 +392,7 @@ impl ProxyServer {
388392
// Initialize TunnelMux inside the runtime (tokio::spawn requires it).
389393
if self.rewrite_ctx.mode == Mode::Full {
390394
if let Some(f) = self.fronter.as_ref() {
391-
self.tunnel_mux = Some(TunnelMux::start(f.clone()));
395+
self.tunnel_mux = Some(TunnelMux::start(f.clone(), self.coalesce_step_ms, self.coalesce_max_ms));
392396
}
393397
}
394398

src/tunnel_client.rs

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,11 @@ const REPLY_TIMEOUT: Duration = Duration::from_secs(35);
5555
/// connect saves one Apps Script round-trip per new flow.
5656
const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50);
5757

58-
/// How long the muxer holds open the batch buffer after the first op
59-
/// arrives, waiting for more ops to coalesce. Issue #231 — the previous
60-
/// implementation drained `try_recv()` *immediately* after the first
61-
/// message landed, so under any non-bursty workload every batch held
62-
/// exactly one op (defeating the entire batching premise). 8 ms is small
63-
/// vs the ~2-7 s Apps Script round-trip the batch is amortizing, but
64-
/// long enough that concurrent HTTP/2 stream openings, parallel fetches,
65-
/// or any other burst lands in the same batch.
66-
const BATCH_COALESCE_WINDOW: Duration = Duration::from_millis(8);
58+
/// Adaptive coalesce defaults: after each new op arrives, wait another
59+
/// step for more ops. Resets on every arrival, up to max from the first
60+
/// op. Overridable via config `coalesce_step_ms` / `coalesce_max_ms`.
61+
const DEFAULT_COALESCE_STEP_MS: u64 = 40;
62+
const DEFAULT_COALESCE_MAX_MS: u64 = 1000;
6763

6864
/// Structured error code the tunnel-node returns when it doesn't know the
6965
/// op (version mismatch). Must match `tunnel-node/src/main.rs`.
@@ -255,7 +251,7 @@ pub struct TunnelMux {
255251
}
256252

257253
impl TunnelMux {
258-
pub fn start(fronter: Arc<DomainFronter>) -> Arc<Self> {
254+
pub fn start(fronter: Arc<DomainFronter>, coalesce_step_ms: u64, coalesce_max_ms: u64) -> Arc<Self> {
259255
// Dedupe before snapshotting: the aggregate `all_legacy` gate
260256
// compares `legacy_deployments.len()` (a HashMap, so unique
261257
// keys) against this count, so using the raw `num_scripts()`
@@ -280,8 +276,11 @@ impl TunnelMux {
280276
unique_n,
281277
CONCURRENCY_PER_DEPLOYMENT
282278
);
279+
let step = if coalesce_step_ms > 0 { coalesce_step_ms } else { DEFAULT_COALESCE_STEP_MS };
280+
let max = if coalesce_max_ms > 0 { coalesce_max_ms } else { DEFAULT_COALESCE_MAX_MS };
281+
tracing::info!("batch coalesce: step={}ms max={}ms", step, max);
283282
let (tx, rx) = mpsc::channel(512);
284-
tokio::spawn(mux_loop(rx, fronter));
283+
tokio::spawn(mux_loop(rx, fronter, step, max));
285284
Arc::new(Self {
286285
tx,
287286
connect_data_unsupported: Arc::new(AtomicBool::new(false)),
@@ -556,7 +555,9 @@ impl TunnelMux {
556555
}
557556
}
558557

559-
async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
558+
async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>, coalesce_step_ms: u64, coalesce_max_ms: u64) {
559+
let coalesce_step = Duration::from_millis(coalesce_step_ms);
560+
let coalesce_max = Duration::from_millis(coalesce_max_ms);
560561
// One semaphore per deployment ID, each allowing 30 concurrent requests.
561562
let sems: Arc<HashMap<String, Arc<Semaphore>>> = Arc::new(
562563
fronter
@@ -574,27 +575,35 @@ async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
574575
loop {
575576
let mut msgs = Vec::new();
576577
// Block on the first message — no point waking up to find an empty
577-
// queue. Once the first op lands, we hold open BATCH_COALESCE_WINDOW
578-
// so concurrent ops (parallel fetches, HTTP/2 stream openings, etc.)
579-
// land in the same batch instead of getting a fresh round-trip each.
578+
// queue. Once the first op lands, the adaptive coalesce loop waits
579+
// in `coalesce_step` increments (resetting on each new arrival, up
580+
// to `coalesce_max`) so concurrent ops land in the same batch.
580581
match rx.recv().await {
581582
Some(msg) => msgs.push(msg),
582583
None => break,
583584
}
584-
let deadline = tokio::time::Instant::now() + BATCH_COALESCE_WINDOW;
585+
let hard_deadline = tokio::time::Instant::now() + coalesce_max;
586+
let mut soft_deadline = tokio::time::Instant::now() + coalesce_step;
585587
loop {
586588
// Drain anything that's already queued without waiting.
587589
while let Ok(msg) = rx.try_recv() {
588590
msgs.push(msg);
591+
// Reset the soft deadline — more ops are arriving.
592+
soft_deadline = tokio::time::Instant::now() + coalesce_step;
589593
}
590594
let now = tokio::time::Instant::now();
591-
if now >= deadline {
595+
let wait_until = soft_deadline.min(hard_deadline);
596+
if now >= wait_until {
592597
break;
593598
}
594-
match tokio::time::timeout(deadline - now, rx.recv()).await {
595-
Ok(Some(msg)) => msgs.push(msg),
599+
match tokio::time::timeout(wait_until - now, rx.recv()).await {
600+
Ok(Some(msg)) => {
601+
msgs.push(msg);
602+
// New op arrived — extend the soft deadline.
603+
soft_deadline = tokio::time::Instant::now() + coalesce_step;
604+
}
596605
Ok(None) => return,
597-
Err(_) => break,
606+
Err(_) => break, // soft or hard deadline hit, no more ops
598607
}
599608
}
600609

0 commit comments

Comments (0)