Skip to content

Commit 7c89772

Browse files
authored
feat(client): adaptive batch coalescing with configurable UI (#448)
Adaptive batch coalescing from @yyoyoian-pixel, based on field testing in Iran. Replaces the fixed 8ms batch coalesce window with an adaptive 40ms-step / 1000ms-max scheme. Apps Script adds ~1.5s of overhead per HTTP call, so packing more ops into each batch means fewer total calls. Field testing showed P75 RTT improving from 6.2s to 3.0s, and the share of fast (<3s) batches rising from 61% to 74–85%. Both values are configurable via config.json (coalesce_step_ms, coalesce_max_ms) and via the Android UI's Advanced sliders (10–500ms / 100–2000ms). Note: the desktop UI's to_config() needs a follow-up to round-trip the new fields. Filing immediately as a separate commit so v1.8.4 can ship both PRs together.
2 parents 87ec804 + 0ca6f77 commit 7c89772

5 files changed

Lines changed: 76 additions & 21 deletions

File tree

android/app/src/main/java/com/therealaleph/mhrv/ConfigStore.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ data class MhrvConfig(
9292
val verifySsl: Boolean = true,
9393
val logLevel: String = "info",
9494
val parallelRelay: Int = 1,
95+
val coalesceStepMs: Int = 40,
96+
val coalesceMaxMs: Int = 1000,
9597
val upstreamSocks5: String = "",
9698

9799
/**
@@ -197,6 +199,8 @@ data class MhrvConfig(
197199
put("verify_ssl", verifySsl)
198200
put("log_level", logLevel)
199201
put("parallel_relay", parallelRelay)
202+
if (coalesceStepMs != 40) put("coalesce_step_ms", coalesceStepMs)
203+
if (coalesceMaxMs != 1000) put("coalesce_max_ms", coalesceMaxMs)
200204
if (upstreamSocks5.isNotBlank()) {
201205
put("upstream_socks5", upstreamSocks5.trim())
202206
}
@@ -304,6 +308,8 @@ object ConfigStore {
304308
if (cfg.verifySsl != defaults.verifySsl) obj.put("verify_ssl", cfg.verifySsl)
305309
if (cfg.logLevel != defaults.logLevel) obj.put("log_level", cfg.logLevel)
306310
if (cfg.parallelRelay != defaults.parallelRelay) obj.put("parallel_relay", cfg.parallelRelay)
311+
if (cfg.coalesceStepMs != defaults.coalesceStepMs) obj.put("coalesce_step_ms", cfg.coalesceStepMs)
312+
if (cfg.coalesceMaxMs != defaults.coalesceMaxMs) obj.put("coalesce_max_ms", cfg.coalesceMaxMs)
307313
if (cfg.upstreamSocks5.isNotBlank()) obj.put("upstream_socks5", cfg.upstreamSocks5)
308314
if (cfg.passthroughHosts.isNotEmpty()) obj.put("passthrough_hosts", JSONArray().apply { cfg.passthroughHosts.forEach { put(it) } })
309315
if (cfg.tunnelDoh != defaults.tunnelDoh) obj.put("tunnel_doh", cfg.tunnelDoh)
@@ -400,6 +406,8 @@ object ConfigStore {
400406
verifySsl = obj.optBoolean("verify_ssl", true),
401407
logLevel = obj.optString("log_level", "info"),
402408
parallelRelay = obj.optInt("parallel_relay", 1),
409+
coalesceStepMs = obj.optInt("coalesce_step_ms", 40),
410+
coalesceMaxMs = obj.optInt("coalesce_max_ms", 1000),
403411
upstreamSocks5 = obj.optString("upstream_socks5", ""),
404412
passthroughHosts = obj.optJSONArray("passthrough_hosts")?.let { arr ->
405413
buildList { for (i in 0 until arr.length()) add(arr.optString(i)) }

android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,6 +1246,32 @@ private fun AdvancedSettings(
12461246
)
12471247
}
12481248

1249+
// Batch coalesce step slider
1250+
Column {
1251+
Text(
1252+
"Coalesce step: ${cfg.coalesceStepMs}ms",
1253+
style = MaterialTheme.typography.bodyMedium,
1254+
)
1255+
Slider(
1256+
value = cfg.coalesceStepMs.toFloat(),
1257+
onValueChange = { onChange(cfg.copy(coalesceStepMs = it.toInt().coerceIn(10, 500))) },
1258+
valueRange = 10f..500f,
1259+
)
1260+
}
1261+
1262+
// Batch coalesce max slider
1263+
Column {
1264+
Text(
1265+
"Coalesce max: ${cfg.coalesceMaxMs}ms",
1266+
style = MaterialTheme.typography.bodyMedium,
1267+
)
1268+
Slider(
1269+
value = cfg.coalesceMaxMs.toFloat(),
1270+
onValueChange = { onChange(cfg.copy(coalesceMaxMs = it.toInt().coerceIn(100, 2000))) },
1271+
valueRange = 100f..2000f,
1272+
)
1273+
}
1274+
12491275
OutlinedTextField(
12501276
value = cfg.upstreamSocks5,
12511277
onValueChange = { onChange(cfg.copy(upstreamSocks5 = it)) },

src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,14 @@ pub struct Config {
9696
/// script IDs.
9797
#[serde(default)]
9898
pub parallel_relay: u8,
99+
/// Adaptive batch coalesce: after each op arrives, wait this many ms
100+
/// for more ops before firing the batch. Resets on every arrival.
101+
/// 0 = use compiled default (40ms).
102+
#[serde(default)]
103+
pub coalesce_step_ms: u16,
104+
/// Hard cap on total coalesce wait (ms). 0 = use compiled default (1000ms).
105+
#[serde(default)]
106+
pub coalesce_max_ms: u16,
99107
/// Optional explicit SNI rotation pool for outbound TLS to `google_ip`.
100108
/// Empty / missing = auto-expand from `front_domain` (current default of
101109
/// {www, mail, drive, docs, calendar}.google.com). Set to an explicit list

src/proxy_server.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ pub struct ProxyServer {
216216
mitm: Arc<Mutex<MitmCertManager>>,
217217
rewrite_ctx: Arc<RewriteCtx>,
218218
tunnel_mux: Option<Arc<TunnelMux>>,
219+
coalesce_step_ms: u64,
220+
coalesce_max_ms: u64,
219221
}
220222

221223
pub struct RewriteCtx {
@@ -375,6 +377,8 @@ impl ProxyServer {
375377
mitm,
376378
rewrite_ctx,
377379
tunnel_mux: None, // initialized in run() inside the tokio runtime
380+
coalesce_step_ms: if config.coalesce_step_ms > 0 { config.coalesce_step_ms as u64 } else { 40 },
381+
coalesce_max_ms: if config.coalesce_max_ms > 0 { config.coalesce_max_ms as u64 } else { 1000 },
378382
})
379383
}
380384

@@ -388,7 +392,7 @@ impl ProxyServer {
388392
// Initialize TunnelMux inside the runtime (tokio::spawn requires it).
389393
if self.rewrite_ctx.mode == Mode::Full {
390394
if let Some(f) = self.fronter.as_ref() {
391-
self.tunnel_mux = Some(TunnelMux::start(f.clone()));
395+
self.tunnel_mux = Some(TunnelMux::start(f.clone(), self.coalesce_step_ms, self.coalesce_max_ms));
392396
}
393397
}
394398

src/tunnel_client.rs

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,11 @@ const REPLY_TIMEOUT: Duration = Duration::from_secs(35);
5555
/// connect saves one Apps Script round-trip per new flow.
5656
const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50);
5757

58-
/// How long the muxer holds open the batch buffer after the first op
59-
/// arrives, waiting for more ops to coalesce. Issue #231 — the previous
60-
/// implementation drained `try_recv()` *immediately* after the first
61-
/// message landed, so under any non-bursty workload every batch held
62-
/// exactly one op (defeating the entire batching premise). 8 ms is small
63-
/// vs the ~2-7 s Apps Script round-trip the batch is amortizing, but
64-
/// long enough that concurrent HTTP/2 stream openings, parallel fetches,
65-
/// or any other burst lands in the same batch.
66-
const BATCH_COALESCE_WINDOW: Duration = Duration::from_millis(8);
58+
/// Adaptive coalesce defaults: after each new op arrives, wait another
59+
/// step for more ops. Resets on every arrival, up to max from the first
60+
/// op. Overridable via config `coalesce_step_ms` / `coalesce_max_ms`.
61+
const DEFAULT_COALESCE_STEP_MS: u64 = 40;
62+
const DEFAULT_COALESCE_MAX_MS: u64 = 1000;
6763

6864
/// Structured error code the tunnel-node returns when it doesn't know the
6965
/// op (version mismatch). Must match `tunnel-node/src/main.rs`.
@@ -255,7 +251,7 @@ pub struct TunnelMux {
255251
}
256252

257253
impl TunnelMux {
258-
pub fn start(fronter: Arc<DomainFronter>) -> Arc<Self> {
254+
pub fn start(fronter: Arc<DomainFronter>, coalesce_step_ms: u64, coalesce_max_ms: u64) -> Arc<Self> {
259255
// Dedupe before snapshotting: the aggregate `all_legacy` gate
260256
// compares `legacy_deployments.len()` (a HashMap, so unique
261257
// keys) against this count, so using the raw `num_scripts()`
@@ -280,8 +276,11 @@ impl TunnelMux {
280276
unique_n,
281277
CONCURRENCY_PER_DEPLOYMENT
282278
);
279+
let step = if coalesce_step_ms > 0 { coalesce_step_ms } else { DEFAULT_COALESCE_STEP_MS };
280+
let max = if coalesce_max_ms > 0 { coalesce_max_ms } else { DEFAULT_COALESCE_MAX_MS };
281+
tracing::info!("batch coalesce: step={}ms max={}ms", step, max);
283282
let (tx, rx) = mpsc::channel(512);
284-
tokio::spawn(mux_loop(rx, fronter));
283+
tokio::spawn(mux_loop(rx, fronter, step, max));
285284
Arc::new(Self {
286285
tx,
287286
connect_data_unsupported: Arc::new(AtomicBool::new(false)),
@@ -556,7 +555,9 @@ impl TunnelMux {
556555
}
557556
}
558557

559-
async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
558+
async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>, coalesce_step_ms: u64, coalesce_max_ms: u64) {
559+
let coalesce_step = Duration::from_millis(coalesce_step_ms);
560+
let coalesce_max = Duration::from_millis(coalesce_max_ms);
560561
// One semaphore per deployment ID, each allowing 30 concurrent requests.
561562
let sems: Arc<HashMap<String, Arc<Semaphore>>> = Arc::new(
562563
fronter
@@ -574,27 +575,35 @@ async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
574575
loop {
575576
let mut msgs = Vec::new();
576577
// Block on the first message — no point waking up to find an empty
577-
// queue. Once the first op lands, we hold open BATCH_COALESCE_WINDOW
578-
// so concurrent ops (parallel fetches, HTTP/2 stream openings, etc.)
579-
// land in the same batch instead of getting a fresh round-trip each.
578+
// queue. Once the first op lands, the adaptive coalesce loop waits
579+
// in `coalesce_step` increments (resetting on each new arrival, up
580+
// to `coalesce_max`) so concurrent ops land in the same batch.
580581
match rx.recv().await {
581582
Some(msg) => msgs.push(msg),
582583
None => break,
583584
}
584-
let deadline = tokio::time::Instant::now() + BATCH_COALESCE_WINDOW;
585+
let hard_deadline = tokio::time::Instant::now() + coalesce_max;
586+
let mut soft_deadline = tokio::time::Instant::now() + coalesce_step;
585587
loop {
586588
// Drain anything that's already queued without waiting.
587589
while let Ok(msg) = rx.try_recv() {
588590
msgs.push(msg);
591+
// Reset the soft deadline — more ops are arriving.
592+
soft_deadline = tokio::time::Instant::now() + coalesce_step;
589593
}
590594
let now = tokio::time::Instant::now();
591-
if now >= deadline {
595+
let wait_until = soft_deadline.min(hard_deadline);
596+
if now >= wait_until {
592597
break;
593598
}
594-
match tokio::time::timeout(deadline - now, rx.recv()).await {
595-
Ok(Some(msg)) => msgs.push(msg),
599+
match tokio::time::timeout(wait_until - now, rx.recv()).await {
600+
Ok(Some(msg)) => {
601+
msgs.push(msg);
602+
// New op arrived — extend the soft deadline.
603+
soft_deadline = tokio::time::Instant::now() + coalesce_step;
604+
}
596605
Ok(None) => return,
597-
Err(_) => break,
606+
Err(_) => break, // soft or hard deadline hit, no more ops
598607
}
599608
}
600609

0 commit comments

Comments
 (0)