Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ data class MhrvConfig(
val verifySsl: Boolean = true,
val logLevel: String = "info",
val parallelRelay: Int = 1,
val coalesceStepMs: Int = 40,
val coalesceMaxMs: Int = 1000,
val upstreamSocks5: String = "",

/**
Expand Down Expand Up @@ -197,6 +199,8 @@ data class MhrvConfig(
put("verify_ssl", verifySsl)
put("log_level", logLevel)
put("parallel_relay", parallelRelay)
if (coalesceStepMs != 40) put("coalesce_step_ms", coalesceStepMs)
if (coalesceMaxMs != 1000) put("coalesce_max_ms", coalesceMaxMs)
if (upstreamSocks5.isNotBlank()) {
put("upstream_socks5", upstreamSocks5.trim())
}
Expand Down Expand Up @@ -304,6 +308,8 @@ object ConfigStore {
if (cfg.verifySsl != defaults.verifySsl) obj.put("verify_ssl", cfg.verifySsl)
if (cfg.logLevel != defaults.logLevel) obj.put("log_level", cfg.logLevel)
if (cfg.parallelRelay != defaults.parallelRelay) obj.put("parallel_relay", cfg.parallelRelay)
if (cfg.coalesceStepMs != defaults.coalesceStepMs) obj.put("coalesce_step_ms", cfg.coalesceStepMs)
if (cfg.coalesceMaxMs != defaults.coalesceMaxMs) obj.put("coalesce_max_ms", cfg.coalesceMaxMs)
if (cfg.upstreamSocks5.isNotBlank()) obj.put("upstream_socks5", cfg.upstreamSocks5)
if (cfg.passthroughHosts.isNotEmpty()) obj.put("passthrough_hosts", JSONArray().apply { cfg.passthroughHosts.forEach { put(it) } })
if (cfg.tunnelDoh != defaults.tunnelDoh) obj.put("tunnel_doh", cfg.tunnelDoh)
Expand Down Expand Up @@ -400,6 +406,8 @@ object ConfigStore {
verifySsl = obj.optBoolean("verify_ssl", true),
logLevel = obj.optString("log_level", "info"),
parallelRelay = obj.optInt("parallel_relay", 1),
coalesceStepMs = obj.optInt("coalesce_step_ms", 40),
coalesceMaxMs = obj.optInt("coalesce_max_ms", 1000),
upstreamSocks5 = obj.optString("upstream_socks5", ""),
passthroughHosts = obj.optJSONArray("passthrough_hosts")?.let { arr ->
buildList { for (i in 0 until arr.length()) add(arr.optString(i)) }
Expand Down
26 changes: 26 additions & 0 deletions android/app/src/main/java/com/therealaleph/mhrv/ui/HomeScreen.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,32 @@ private fun AdvancedSettings(
)
}

// Batch coalesce step slider
Column {
Text(
"Coalesce step: ${cfg.coalesceStepMs}ms",
style = MaterialTheme.typography.bodyMedium,
)
Slider(
value = cfg.coalesceStepMs.toFloat(),
onValueChange = { onChange(cfg.copy(coalesceStepMs = it.toInt().coerceIn(10, 500))) },
valueRange = 10f..500f,
)
}

// Batch coalesce max slider
Column {
Text(
"Coalesce max: ${cfg.coalesceMaxMs}ms",
style = MaterialTheme.typography.bodyMedium,
)
Slider(
value = cfg.coalesceMaxMs.toFloat(),
onValueChange = { onChange(cfg.copy(coalesceMaxMs = it.toInt().coerceIn(100, 2000))) },
valueRange = 100f..2000f,
)
}

OutlinedTextField(
value = cfg.upstreamSocks5,
onValueChange = { onChange(cfg.copy(upstreamSocks5 = it)) },
Expand Down
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ pub struct Config {
/// script IDs.
#[serde(default)]
pub parallel_relay: u8,
/// Adaptive batch coalesce: after each op arrives, wait this many ms
/// for more ops before firing the batch. Resets on every arrival.
/// 0 = use compiled default (40ms).
#[serde(default)]
pub coalesce_step_ms: u16,
/// Hard cap on total coalesce wait (ms). 0 = use compiled default (1000ms).
#[serde(default)]
pub coalesce_max_ms: u16,
/// Optional explicit SNI rotation pool for outbound TLS to `google_ip`.
/// Empty / missing = auto-expand from `front_domain` (current default of
/// {www, mail, drive, docs, calendar}.google.com). Set to an explicit list
Expand Down
6 changes: 5 additions & 1 deletion src/proxy_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ pub struct ProxyServer {
mitm: Arc<Mutex<MitmCertManager>>,
rewrite_ctx: Arc<RewriteCtx>,
tunnel_mux: Option<Arc<TunnelMux>>,
coalesce_step_ms: u64,
coalesce_max_ms: u64,
}

pub struct RewriteCtx {
Expand Down Expand Up @@ -375,6 +377,8 @@ impl ProxyServer {
mitm,
rewrite_ctx,
tunnel_mux: None, // initialized in run() inside the tokio runtime
coalesce_step_ms: if config.coalesce_step_ms > 0 { config.coalesce_step_ms as u64 } else { 40 },
coalesce_max_ms: if config.coalesce_max_ms > 0 { config.coalesce_max_ms as u64 } else { 1000 },
})
}

Expand All @@ -388,7 +392,7 @@ impl ProxyServer {
// Initialize TunnelMux inside the runtime (tokio::spawn requires it).
if self.rewrite_ctx.mode == Mode::Full {
if let Some(f) = self.fronter.as_ref() {
self.tunnel_mux = Some(TunnelMux::start(f.clone()));
self.tunnel_mux = Some(TunnelMux::start(f.clone(), self.coalesce_step_ms, self.coalesce_max_ms));
}
}

Expand Down
49 changes: 29 additions & 20 deletions src/tunnel_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ const REPLY_TIMEOUT: Duration = Duration::from_secs(35);
/// connect saves one Apps Script round-trip per new flow.
const CLIENT_FIRST_DATA_WAIT: Duration = Duration::from_millis(50);

/// How long the muxer holds open the batch buffer after the first op
/// arrives, waiting for more ops to coalesce. Issue #231 — the previous
/// implementation drained `try_recv()` *immediately* after the first
/// message landed, so under any non-bursty workload every batch held
/// exactly one op (defeating the entire batching premise). 8 ms is small
/// vs the ~2-7 s Apps Script round-trip the batch is amortizing, but
/// long enough that concurrent HTTP/2 stream openings, parallel fetches,
/// or any other burst lands in the same batch.
const BATCH_COALESCE_WINDOW: Duration = Duration::from_millis(8);
/// Adaptive coalesce defaults: after each new op arrives, wait another
/// step for more ops. Resets on every arrival, up to max from the first
/// op. Overridable via config `coalesce_step_ms` / `coalesce_max_ms`.
const DEFAULT_COALESCE_STEP_MS: u64 = 40;
const DEFAULT_COALESCE_MAX_MS: u64 = 1000;

/// Structured error code the tunnel-node returns when it doesn't know the
/// op (version mismatch). Must match `tunnel-node/src/main.rs`.
Expand Down Expand Up @@ -255,7 +251,7 @@ pub struct TunnelMux {
}

impl TunnelMux {
pub fn start(fronter: Arc<DomainFronter>) -> Arc<Self> {
pub fn start(fronter: Arc<DomainFronter>, coalesce_step_ms: u64, coalesce_max_ms: u64) -> Arc<Self> {
// Dedupe before snapshotting: the aggregate `all_legacy` gate
// compares `legacy_deployments.len()` (a HashMap, so unique
// keys) against this count, so using the raw `num_scripts()`
Expand All @@ -280,8 +276,11 @@ impl TunnelMux {
unique_n,
CONCURRENCY_PER_DEPLOYMENT
);
let step = if coalesce_step_ms > 0 { coalesce_step_ms } else { DEFAULT_COALESCE_STEP_MS };
let max = if coalesce_max_ms > 0 { coalesce_max_ms } else { DEFAULT_COALESCE_MAX_MS };
tracing::info!("batch coalesce: step={}ms max={}ms", step, max);
let (tx, rx) = mpsc::channel(512);
tokio::spawn(mux_loop(rx, fronter));
tokio::spawn(mux_loop(rx, fronter, step, max));
Arc::new(Self {
tx,
connect_data_unsupported: Arc::new(AtomicBool::new(false)),
Expand Down Expand Up @@ -556,7 +555,9 @@ impl TunnelMux {
}
}

async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>, coalesce_step_ms: u64, coalesce_max_ms: u64) {
let coalesce_step = Duration::from_millis(coalesce_step_ms);
let coalesce_max = Duration::from_millis(coalesce_max_ms);
// One semaphore per deployment ID, each allowing 30 concurrent requests.
let sems: Arc<HashMap<String, Arc<Semaphore>>> = Arc::new(
fronter
Expand All @@ -574,27 +575,35 @@ async fn mux_loop(mut rx: mpsc::Receiver<MuxMsg>, fronter: Arc<DomainFronter>) {
loop {
let mut msgs = Vec::new();
// Block on the first message — no point waking up to find an empty
// queue. Once the first op lands, we hold open BATCH_COALESCE_WINDOW
// so concurrent ops (parallel fetches, HTTP/2 stream openings, etc.)
// land in the same batch instead of getting a fresh round-trip each.
// queue. Once the first op lands, the adaptive coalesce loop waits
// in `coalesce_step` increments (resetting on each new arrival, up
// to `coalesce_max`) so concurrent ops land in the same batch.
match rx.recv().await {
Some(msg) => msgs.push(msg),
None => break,
}
let deadline = tokio::time::Instant::now() + BATCH_COALESCE_WINDOW;
let hard_deadline = tokio::time::Instant::now() + coalesce_max;
let mut soft_deadline = tokio::time::Instant::now() + coalesce_step;
loop {
// Drain anything that's already queued without waiting.
while let Ok(msg) = rx.try_recv() {
msgs.push(msg);
// Reset the soft deadline — more ops are arriving.
soft_deadline = tokio::time::Instant::now() + coalesce_step;
}
let now = tokio::time::Instant::now();
if now >= deadline {
let wait_until = soft_deadline.min(hard_deadline);
if now >= wait_until {
break;
}
match tokio::time::timeout(deadline - now, rx.recv()).await {
Ok(Some(msg)) => msgs.push(msg),
match tokio::time::timeout(wait_until - now, rx.recv()).await {
Ok(Some(msg)) => {
msgs.push(msg);
// New op arrived — extend the soft deadline.
soft_deadline = tokio::time::Instant::now() + coalesce_step;
}
Ok(None) => return,
Err(_) => break,
Err(_) => break, // soft or hard deadline hit, no more ops
}
}

Expand Down
Loading