Commit 5555cdb

feat(client): adaptive batch coalescing, configurable via Android UI

Replace the fixed 8ms batch coalesce window with an adaptive scheme: after each new op arrives, wait another 40ms (step) for more ops. The timer resets on every arrival, up to 1000ms (max) from the first op. Both values are configurable via config JSON (coalesce_step_ms, coalesce_max_ms) and two new sliders in the Android Advanced section.

Why this helps: Apps Script adds ~1.5s overhead per HTTP call. The previous 8ms window barely caught any concurrent ops — most batches carried just 1 op. With 40ms/1000ms, batches average 2-3 ops, reducing total Apps Script calls for the same workload.

Also disables legacy tunnel-node detection (threshold set to 0ms). The heuristic triggered false positives with longer tunnel-node drain times, causing the client to back off polling to 30s intervals and stalling sessions.

Tested on device in Iran:
Before: P75=6.2s, 61% fast (<3s), ~1 op/batch
After:  P75=3.0s, 74-85% fast, ~2-3 ops/batch

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7e8e467 commit 5555cdb
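
To make the scheme concrete, here is a minimal, self-contained sketch of the adaptive coalesce loop (illustrative only: `collect_batch` and the `u32` op type are made up, and tokio's full feature set is assumed; the real implementation is in src/tunnel_client.rs below). The arithmetic behind the win: at ~1.5s of Apps Script overhead per call, 100 ops at 1 op/batch pay ~150s of overhead, while the same 100 ops at ~2.5 ops/batch need only 40 calls, roughly 60s.

```rust
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration, Instant};

/// Collect ops into one batch: every arrival grants another `step` of
/// waiting, bounded by `max` measured from the first op. Returns None
/// once the channel is closed and empty.
async fn collect_batch(
    rx: &mut mpsc::Receiver<u32>,
    step: Duration,
    max: Duration,
) -> Option<Vec<u32>> {
    let first = rx.recv().await?; // block until the first op arrives
    let mut batch = vec![first];
    let hard = Instant::now() + max;
    let mut soft = Instant::now() + step;
    loop {
        let wait_until = soft.min(hard);
        let now = Instant::now();
        if now >= wait_until {
            break; // soft or hard deadline hit
        }
        match timeout(wait_until - now, rx.recv()).await {
            Ok(Some(op)) => {
                batch.push(op);
                soft = Instant::now() + step; // reset on every arrival
            }
            Ok(None) => break, // sender dropped; fire what we have
            Err(_) => break,   // timed out waiting for more ops
        }
    }
    Some(batch)
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(64);
    tokio::spawn(async move {
        for op in 0..3u32 {
            tx.send(op).await.unwrap(); // a burst: lands in one batch
        }
    });
    let batch =
        collect_batch(&mut rx, Duration::from_millis(40), Duration::from_millis(1000)).await;
    println!("batch = {:?}", batch); // e.g. Some([0, 1, 2])
}
```

The soft/hard deadline pair is what distinguishes this from a fixed window: a steady trickle of ops keeps extending the batch, but never past `max` from the first op.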

5 files changed: 80 additions & 31 deletions

android/app/src/main/java/com/therealaleph/mhrv/ConfigStore.kt

Lines changed: 8 additions & 0 deletions
@@ -92,6 +92,8 @@ data class MhrvConfig(
     val verifySsl: Boolean = true,
     val logLevel: String = "info",
     val parallelRelay: Int = 1,
+    val coalesceStepMs: Int = 40,
+    val coalesceMaxMs: Int = 1000,
     val upstreamSocks5: String = "",
 
     /**
@@ -197,6 +199,8 @@ data class MhrvConfig(
         put("verify_ssl", verifySsl)
         put("log_level", logLevel)
         put("parallel_relay", parallelRelay)
+        if (coalesceStepMs != 40) put("coalesce_step_ms", coalesceStepMs)
+        if (coalesceMaxMs != 1000) put("coalesce_max_ms", coalesceMaxMs)
         if (upstreamSocks5.isNotBlank()) {
             put("upstream_socks5", upstreamSocks5.trim())
         }
@@ -304,6 +308,8 @@ object ConfigStore {
         if (cfg.verifySsl != defaults.verifySsl) obj.put("verify_ssl", cfg.verifySsl)
         if (cfg.logLevel != defaults.logLevel) obj.put("log_level", cfg.logLevel)
         if (cfg.parallelRelay != defaults.parallelRelay) obj.put("parallel_relay", cfg.parallelRelay)
+        if (cfg.coalesceStepMs != defaults.coalesceStepMs) obj.put("coalesce_step_ms", cfg.coalesceStepMs)
+        if (cfg.coalesceMaxMs != defaults.coalesceMaxMs) obj.put("coalesce_max_ms", cfg.coalesceMaxMs)
         if (cfg.upstreamSocks5.isNotBlank()) obj.put("upstream_socks5", cfg.upstreamSocks5)
         if (cfg.passthroughHosts.isNotEmpty()) obj.put("passthrough_hosts", JSONArray().apply { cfg.passthroughHosts.forEach { put(it) } })
         if (cfg.tunnelDoh != defaults.tunnelDoh) obj.put("tunnel_doh", cfg.tunnelDoh)
@@ -400,6 +406,8 @@ object ConfigStore {
         verifySsl = obj.optBoolean("verify_ssl", true),
         logLevel = obj.optString("log_level", "info"),
         parallelRelay = obj.optInt("parallel_relay", 1),
+        coalesceStepMs = obj.optInt("coalesce_step_ms", 40),
+        coalesceMaxMs = obj.optInt("coalesce_max_ms", 1000),
         upstreamSocks5 = obj.optString("upstream_socks5", ""),
         passthroughHosts = obj.optJSONArray("passthrough_hosts")?.let { arr ->
             buildList { for (i in 0 until arr.length()) add(arr.optString(i)) }
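
For illustration, a config JSON overriding both knobs (hypothetical values). Per the serializers above, keys still at their defaults (40/1000) are omitted on write, and `optInt` falls back to those defaults on read, so existing configs parse unchanged:

```json
{
  "coalesce_step_ms": 80,
  "coalesce_max_ms": 500
}
```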

android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt

Lines changed: 26 additions & 0 deletions
@@ -1246,6 +1246,32 @@ private fun AdvancedSettings(
         )
     }
 
+    // Batch coalesce step slider
+    Column {
+        Text(
+            "Coalesce step: ${cfg.coalesceStepMs}ms",
+            style = MaterialTheme.typography.bodyMedium,
+        )
+        Slider(
+            value = cfg.coalesceStepMs.toFloat(),
+            onValueChange = { onChange(cfg.copy(coalesceStepMs = it.toInt().coerceIn(10, 500))) },
+            valueRange = 10f..500f,
+        )
+    }
+
+    // Batch coalesce max slider
+    Column {
+        Text(
+            "Coalesce max: ${cfg.coalesceMaxMs}ms",
+            style = MaterialTheme.typography.bodyMedium,
+        )
+        Slider(
+            value = cfg.coalesceMaxMs.toFloat(),
+            onValueChange = { onChange(cfg.copy(coalesceMaxMs = it.toInt().coerceIn(100, 2000))) },
+            valueRange = 100f..2000f,
+        )
+    }
+
     OutlinedTextField(
         value = cfg.upstreamSocks5,
         onValueChange = { onChange(cfg.copy(upstreamSocks5 = it)) },

src/config.rs

Lines changed: 8 additions & 0 deletions
@@ -96,6 +96,14 @@ pub struct Config {
     /// script IDs.
     #[serde(default)]
     pub parallel_relay: u8,
+    /// Adaptive batch coalesce: after each op arrives, wait this many ms
+    /// for more ops before firing the batch. Resets on every arrival.
+    /// 0 = use compiled default (40ms).
+    #[serde(default)]
+    pub coalesce_step_ms: u16,
+    /// Hard cap on total coalesce wait (ms). 0 = use compiled default (1000ms).
+    #[serde(default)]
+    pub coalesce_max_ms: u16,
     /// Optional explicit SNI rotation pool for outbound TLS to `google_ip`.
     /// Empty / missing = auto-expand from `front_domain` (current default of
     /// {www, mail, drive, docs, calendar}.google.com). Set to an explicit list
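
A sketch of how the zero-sentinel convention behaves under serde (simplified two-field struct, not the real `Config`; `serde` with the derive feature and `serde_json` assumed as dependencies):

```rust
use serde::Deserialize;

#[derive(Deserialize)]
struct Coalesce {
    // 0 = "use compiled default", mirroring the convention above.
    #[serde(default)]
    coalesce_step_ms: u16,
    #[serde(default)]
    coalesce_max_ms: u16,
}

fn main() {
    // Key absent -> serde fills in 0 -> resolved to the compiled default.
    let c: Coalesce = serde_json::from_str(r#"{"coalesce_max_ms": 500}"#).unwrap();
    let step = if c.coalesce_step_ms > 0 { c.coalesce_step_ms as u64 } else { 40 };
    let max = if c.coalesce_max_ms > 0 { c.coalesce_max_ms as u64 } else { 1000 };
    assert_eq!((step, max), (40, 500));
}
```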

src/proxy_server.rs

Lines changed: 5 additions & 1 deletion
@@ -216,6 +216,8 @@ pub struct ProxyServer {
     mitm: Arc<Mutex<MitmCertManager>>,
     rewrite_ctx: Arc<RewriteCtx>,
     tunnel_mux: Option<Arc<TunnelMux>>,
+    coalesce_step_ms: u64,
+    coalesce_max_ms: u64,
 }
 
 pub struct RewriteCtx {
@@ -375,6 +377,8 @@ impl ProxyServer {
             mitm,
             rewrite_ctx,
             tunnel_mux: None, // initialized in run() inside the tokio runtime
+            coalesce_step_ms: if config.coalesce_step_ms > 0 { config.coalesce_step_ms as u64 } else { 40 },
+            coalesce_max_ms: if config.coalesce_max_ms > 0 { config.coalesce_max_ms as u64 } else { 1000 },
         })
     }
 
@@ -388,7 +392,7 @@
         // Initialize TunnelMux inside the runtime (tokio::spawn requires it).
         if self.rewrite_ctx.mode == Mode::Full {
             if let Some(f) = self.fronter.as_ref() {
-                self.tunnel_mux = Some(TunnelMux::start(f.clone()));
+                self.tunnel_mux = Some(TunnelMux::start(f.clone(), self.coalesce_step_ms, self.coalesce_max_ms));
             }
         }
 
src/tunnel_client.rs

Lines changed: 33 additions & 30 deletions
@@ -55,28 +55,20 @@ const REPLY_TIMEOUT: Duration = Duration::from_secs(35);
 /// connect saves one Apps Script round-trip per new flow.
 const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50);
 
-/// How long the muxer holds open the batch buffer after the first op
-/// arrives, waiting for more ops to coalesce. Issue #231 — the previous
-/// implementation drained `try_recv()` *immediately* after the first
-/// message landed, so under any non-bursty workload every batch held
-/// exactly one op (defeating the entire batching premise). 8 ms is small
-/// vs the ~2-7 s Apps Script round-trip the batch is amortizing, but
-/// long enough that concurrent HTTP/2 stream openings, parallel fetches,
-/// or any other burst lands in the same batch.
-const BATCH_COALESCE_WINDOW: Duration = Duration::from_millis(8);
+/// Adaptive coalesce defaults: after each new op arrives, wait another
+/// step for more ops. Resets on every arrival, up to max from the first
+/// op. Overridable via config `coalesce_step_ms` / `coalesce_max_ms`.
+const DEFAULT_COALESCE_STEP_MS: u64 = 40;
+const DEFAULT_COALESCE_MAX_MS: u64 = 1000;
 
 /// Structured error code the tunnel-node returns when it doesn't know the
 /// op (version mismatch). Must match `tunnel-node/src/main.rs`.
 const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP";
 
-/// Empty poll round-trip latency below which we conclude the tunnel-node
-/// is *not* long-polling (legacy fixed-sleep drain instead). On a
-/// long-poll-capable server an empty poll with no upstream push either
-/// returns near `LONGPOLL_DEADLINE` (~5 s) or comes back early *with*
-/// pushed bytes — neither matches a fast empty reply. Threshold sits
-/// well above the legacy `~350 ms` drain and well below the long-poll
-/// floor, so network jitter on either side won't false-trigger.
-const LEGACY_DETECT_THRESHOLD: Duration = Duration::from_millis(1500);
+/// Legacy tunnel-node detection threshold. Set to 0 to disable — users
+/// who deploy their own tunnel-node don't need auto-detection, and the
+/// heuristic triggered false positives with longer drain/settle times.
+const LEGACY_DETECT_THRESHOLD: Duration = Duration::from_millis(0);
 
 /// How long a deployment stays in "legacy / no long-poll" mode after the
 /// last detection. Must be much longer than `LEGACY_DETECT_THRESHOLD` so a
@@ -255,7 +247,7 @@ pub struct TunnelMux {
 }
 
 impl TunnelMux {
-    pub fn start(fronter: Arc<DomainFronter>) -> Arc<Self> {
+    pub fn start(fronter: Arc<DomainFronter>, coalesce_step_ms: u64, coalesce_max_ms: u64) -> Arc<Self> {
         // Dedupe before snapshotting: the aggregate `all_legacy` gate
         // compares `legacy_deployments.len()` (a HashMap, so unique
         // keys) against this count, so using the raw `num_scripts()`
@@ -280,8 +272,11 @@
             unique_n,
             CONCURRENCY_PER_DEPLOYMENT
         );
+        let step = if coalesce_step_ms > 0 { coalesce_step_ms } else { DEFAULT_COALESCE_STEP_MS };
+        let max = if coalesce_max_ms > 0 { coalesce_max_ms } else { DEFAULT_COALESCE_MAX_MS };
+        tracing::info!("batch coalesce: step={}ms max={}ms", step, max);
         let (tx, rx) = mpsc::channel(512);
-        tokio::spawn(mux_loop(rx, fronter));
+        tokio::spawn(mux_loop(rx, fronter, step, max));
         Arc::new(Self {
             tx,
             connect_data_unsupported: Arc::new(AtomicBool::new(false)),
@@ -556,7 +551,9 @@ impl TunnelMux {
     }
 }
 
-async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
+async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>, coalesce_step_ms: u64, coalesce_max_ms: u64) {
+    let coalesce_step = Duration::from_millis(coalesce_step_ms);
+    let coalesce_max = Duration::from_millis(coalesce_max_ms);
     // One semaphore per deployment ID, each allowing 30 concurrent requests.
     let sems: Arc<HashMap<String, Arc<Semaphore>>> = Arc::new(
         fronter
@@ -581,20 +578,28 @@ async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
             Some(msg) => msgs.push(msg),
             None => break,
         }
-        let deadline = tokio::time::Instant::now() + BATCH_COALESCE_WINDOW;
+        let hard_deadline = tokio::time::Instant::now() + coalesce_max;
+        let mut soft_deadline = tokio::time::Instant::now() + coalesce_step;
         loop {
             // Drain anything that's already queued without waiting.
             while let Ok(msg) = rx.try_recv() {
                 msgs.push(msg);
+                // Reset the soft deadline — more ops are arriving.
+                soft_deadline = tokio::time::Instant::now() + coalesce_step;
             }
             let now = tokio::time::Instant::now();
-            if now >= deadline {
+            let wait_until = soft_deadline.min(hard_deadline);
+            if now >= wait_until {
                 break;
             }
-            match tokio::time::timeout(deadline - now, rx.recv()).await {
-                Ok(Some(msg)) => msgs.push(msg),
+            match tokio::time::timeout(wait_until - now, rx.recv()).await {
+                Ok(Some(msg)) => {
+                    msgs.push(msg);
+                    // New op arrived — extend the soft deadline.
+                    soft_deadline = tokio::time::Instant::now() + coalesce_step;
+                }
                 Ok(None) => return,
-                Err(_) => break,
+                Err(_) => break, // soft or hard deadline hit, no more ops
             }
         }
 
@@ -787,11 +792,9 @@ async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
 }
 
 /// Pick a deployment, acquire its per-account concurrency slot, and spawn
-/// a batch request task.
-///
-/// The batch HTTP round-trip is bounded by `BATCH_TIMEOUT` so a slow or
-/// dead tunnel-node target cannot hold a pipeline slot (and block waiting
-/// sessions) forever.
+/// a batch request task. The semaphore acquire blocks the mux_loop until
+/// a slot opens — this is intentional: under heavy load, ops pile up in
+/// the channel and get collected into bigger batches on the next iteration.
 async fn fire_batch(
     sems: &Arc<HashMap<String, Arc<Semaphore>>>,
     fronter: &Arc<DomainFronter>,
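
The backpressure described in the new `fire_batch` doc comment can be seen in isolation: once all permits are held, `acquire_owned` suspends the caller, and ops queueing in the channel during that pause enlarge the next batch. A toy sketch (illustrative names, not project code; tokio assumed):

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Stand-in for the per-deployment concurrency limit: 2 slots.
    let slots = Arc::new(Semaphore::new(2));
    for batch_id in 0..5 {
        // Suspends once both slots are busy; while the mux loop waits
        // here, new ops keep queueing and fatten the next batch.
        let permit = slots.clone().acquire_owned().await.unwrap();
        tokio::spawn(async move {
            // Stand-in for the batch HTTP round-trip.
            tokio::time::sleep(Duration::from_millis(100)).await;
            println!("batch {batch_id} done");
            drop(permit); // frees the slot for the waiting loop
        });
    }
    // Let in-flight batches finish before the runtime shuts down.
    tokio::time::sleep(Duration::from_millis(500)).await;
}
```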
