@@ -42,16 +42,13 @@ const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP";
4242/// milliseconds — once any session in the batch fires its notify.
4343const ACTIVE_DRAIN_DEADLINE : Duration = Duration :: from_millis ( 350 ) ;
4444
45- /// After the first session in an active batch wakes the wait, we sleep
46- /// briefly so neighboring sessions whose responses land just after the
47- /// first one don't get reported empty and pay an extra round-trip. Only
48- /// applies to active batches — for long-poll batches the wake event IS
49- /// the data we want, so we deliver it immediately.
50- ///
51- /// 30 ms is much shorter than the legacy two-pass retry (150 + 200 ms)
52- /// but covers the typical case of co-located upstreams whose RTTs
53- /// cluster within a few tens of ms of each other.
54- const STRAGGLER_SETTLE : Duration = Duration :: from_millis ( 30 ) ;
45+ /// Adaptive straggler settle: after the first session in an active batch
46+ /// wakes the drain, keep checking in STEP increments whether new data is
47+ /// still arriving. Stops when no new data arrived in the last STEP (the
48+ /// burst is over) or MAX is reached. Packing more session responses into
49+ /// one batch saves quota on high-latency relays (~1.5s Apps Script overhead).
50+ const STRAGGLER_SETTLE_STEP : Duration = Duration :: from_millis ( 40 ) ;
51+ const STRAGGLER_SETTLE_MAX : Duration = Duration :: from_millis ( 500 ) ;
5552
5653/// Drain-phase deadline when the batch is a pure poll (no writes, no new
5754/// connections — clients just asking "any push data?"). Holding the
@@ -65,18 +62,16 @@ const STRAGGLER_SETTLE: Duration = Duration::from_millis(30);
6562/// op per session), so any local bytes that arrive while the poll is
6663/// being held are stuck in the kernel until the poll returns.
6764///
68- /// * Lower (e.g. 2 s) — interactive shells / typing-burst flows feel
69- /// snappier, but push-only sessions pay more empty round-trips.
70- /// * Higher (e.g. 20 s) — push delivery is near-RTT and round-trip
71- /// count is minimal, but a thinking pause between keystrokes can
72- /// tax the next keystroke by up to the chosen value.
73- ///
74- /// 5 s is a middle ground: a typing user pausing mid-thought pays at
75- /// most a 5 s nudge before their next keystroke flows, while idle
76- /// sessions still get the bulk of the long-poll benefit. Must also
77- /// stay safely below the client's `BATCH_TIMEOUT` (30 s) and Apps
78- /// Script's UrlFetch ceiling (~60 s).
79- const LONGPOLL_DEADLINE : Duration = Duration :: from_secs ( 5 ) ;
65+ /// 15 s keeps persistent connections (Telegram XMPP on :5222, Google
66+ /// Push on :5228) alive without forcing frequent reconnects. At 5 s,
67+ /// apps like Telegram interpreted the frequent empty returns as
68+ /// connection instability and rotated sessions — each reconnect costs
69+ /// a full TLS handshake (~4 s through Apps Script), causing visible
70+ /// video/voice interruptions. 15 s is well below the client's
71+ /// `BATCH_TIMEOUT` (30 s) and Apps Script's UrlFetch ceiling (~60 s).
72+ /// Tested on censored networks in Iran where users reported smoother
73+ /// Telegram video playback and fewer session resets at this value.
74+ const LONGPOLL_DEADLINE : Duration = Duration :: from_secs ( 15 ) ;
8075
8176/// Bound on each UDP session's inbound queue. Beyond this we drop oldest
8277/// to keep recent voice/media packets moving — a stale RTP frame is
@@ -914,7 +909,6 @@ async fn handle_batch(
914909 . collect ( )
915910 } ;
916911
917- let wait_start = Instant :: now ( ) ;
918912 // Wait for either side to wake. Running both concurrently means
919913 // a TCP-only batch isn't slowed by a stale UDP watch list, and
920914 // vice versa.
@@ -924,9 +918,45 @@ async fn handle_batch(
924918 ) ;
925919
926920 if had_writes_or_connects {
927- let remaining = deadline. saturating_sub ( wait_start. elapsed ( ) ) ;
928- if !remaining. is_zero ( ) {
929- tokio:: time:: sleep ( STRAGGLER_SETTLE . min ( remaining) ) . await ;
921+ // Adaptive settle: keep waiting in steps while new data
922+ // keeps arriving. Break when:
923+ // 1. No new data arrived in the last step (burst is over)
924+ // 2. 500ms max reached
925+ let settle_end = Instant :: now ( ) + STRAGGLER_SETTLE_MAX ;
926+ let mut prev_tcp_bytes: usize = 0 ;
927+ let mut prev_udp_pkts: usize = 0 ;
928+ // Snapshot current buffer sizes.
929+ for inner in & tcp_inners {
930+ prev_tcp_bytes += inner. read_buf . lock ( ) . await . len ( ) ;
931+ }
932+ for inner in & udp_inners {
933+ prev_udp_pkts += inner. packets . lock ( ) . await . len ( ) ;
934+ }
935+ loop {
936+ let now = Instant :: now ( ) ;
937+ if now >= settle_end {
938+ break ;
939+ }
940+ let remaining = settle_end. duration_since ( now) ;
941+ tokio:: time:: sleep ( STRAGGLER_SETTLE_STEP . min ( remaining) ) . await ;
942+
943+ // Measure current buffer sizes.
944+ let mut tcp_bytes: usize = 0 ;
945+ let mut udp_pkts: usize = 0 ;
946+ for inner in & tcp_inners {
947+ tcp_bytes += inner. read_buf . lock ( ) . await . len ( ) ;
948+ }
949+ for inner in & udp_inners {
950+ udp_pkts += inner. packets . lock ( ) . await . len ( ) ;
951+ }
952+
953+ // No new data since last step — burst is over.
954+ if tcp_bytes == prev_tcp_bytes && udp_pkts == prev_udp_pkts {
955+ break ;
956+ }
957+
958+ prev_tcp_bytes = tcp_bytes;
959+ prev_udp_pkts = udp_pkts;
930960 }
931961 }
932962
0 commit comments