Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/bin/ui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ fn load_form() -> (FormState, Option<String>) {
youtube_via_relay: false,
passthrough_hosts: Vec::new(),
block_quic: true,
block_stun: true,
block_stun: false,
disable_padding: false,
force_http1: false,
tunnel_doh: true,
Expand Down Expand Up @@ -695,8 +695,8 @@ struct ConfigWire<'a> {
/// emit only when the user has explicitly disabled the block.
#[serde(skip_serializing_if = "is_true")]
block_doh: bool,
/// Default true. Emit only when the user disables STUN/TURN blocking.
#[serde(skip_serializing_if = "is_true")]
/// Default false. Emit only when the user enables STUN/TURN blocking.
#[serde(skip_serializing_if = "is_false")]
block_stun: bool,
#[serde(skip_serializing_if = "Vec::is_empty")]
fronting_groups: &'a Vec<FrontingGroup>,
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ fn default_tunnel_doh() -> bool { true }
/// Default for `block_quic`: `true`. QUIC over the TCP-based tunnel
/// causes TCP-over-TCP meltdown (<1 Mbps). Browsers fall back to
/// HTTPS/TCP within seconds of the silent UDP drop. Issue #793.
fn default_block_stun() -> bool { true }
fn default_block_stun() -> bool { false }
fn default_block_quic() -> bool { true }

/// Default for `block_doh`: `true` (browser DoH is rejected so the
Expand Down
81 changes: 66 additions & 15 deletions src/tunnel_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ const INFLIGHT_ACTIVE: usize = 4;
/// How many consecutive empty replies before dropping from active to idle depth.
const INFLIGHT_COOLDOWN: u32 = 3;

/// Max sessions that can run at elevated pipeline depth per deployment.
const MAX_ELEVATED_PER_DEPLOYMENT: u64 = 30;
/// Max sessions that can run at elevated pipeline depth (total, not per deployment).
const MAX_ELEVATED_TOTAL: u64 = 10;

/// Adaptive coalesce defaults: after each new op arrives, wait another
/// step for more ops. Resets on every arrival, up to max from the first
Expand Down Expand Up @@ -442,7 +442,7 @@ impl TunnelMux {
.batch_timeout()
.saturating_add(REPLY_TIMEOUT_SLACK);
pipeline_debug::set_limits(
MAX_ELEVATED_PER_DEPLOYMENT * unique_n as u64,
MAX_ELEVATED_TOTAL,
(CONCURRENCY_PER_DEPLOYMENT * unique_n) as u64,
);
let (tx, rx) = mpsc::unbounded_channel();
Expand All @@ -462,7 +462,7 @@ impl TunnelMux {
unreachable_cache: Mutex::new(HashMap::new()),
reply_timeout,
elevated_sessions: AtomicU64::new(0),
max_elevated: MAX_ELEVATED_PER_DEPLOYMENT * unique_n as u64,
max_elevated: MAX_ELEVATED_TOTAL,
})
}

Expand Down Expand Up @@ -1439,7 +1439,7 @@ async fn tunnel_loop(
let mut next_data_write_seq: u64 = 0;
let mut eof_seen = false;
let mut client_closed = false;
let mut pending_writes: BTreeMap<u64, (TunnelResponse, String)> = BTreeMap::new();
let mut pending_writes: BTreeMap<u64, (TunnelResponse, String, bool)> = BTreeMap::new();

// Buffered upload data waiting to be sent (when pipeline is full).
let mut buffered_upload: Option<Bytes> = None;
Expand Down Expand Up @@ -1597,12 +1597,12 @@ async fn tunnel_loop(
next_write_seq += 1;
while let Some(entry) = pending_writes.first_entry() {
if *entry.key() != next_write_seq { break; }
let (_, (buffered_resp, _)) = entry.remove_entry();
let (_, (buffered_resp, _, _)) = entry.remove_entry();
let _ = write_tunnel_response(&mut writer, &buffered_resp).await;
next_write_seq += 1;
}
} else {
pending_writes.insert(meta.seq, (resp, script_id));
pending_writes.insert(meta.seq, (resp, script_id, meta.was_empty_poll));
}
continue;
}
Expand Down Expand Up @@ -1632,6 +1632,41 @@ async fn tunnel_loop(
}
}

// Escalating backoff: avoid flooding empty polls on idle
// sessions. Mirrors the pre-pipelining cadence.
let keepalive_delay = match consecutive_empty {
0 => Duration::from_millis(20),
1 => Duration::from_millis(80),
2 => Duration::from_millis(200),
3 => Duration::from_millis(500),
_ => Duration::from_secs(2),
};
if consecutive_empty > 0 {
// Wait for either the backoff timer or client data.
if !client_closed {
read_buf.reserve(65536);
tokio::select! {
biased;
result = reader.read_buf(&mut read_buf) => {
match result {
Ok(0) => break,
Ok(n) => {
consecutive_empty = 0;
let data = extract_bytes(&mut read_buf, n);
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
inflight.push(wrap_reply(meta, reply_rx));
continue;
}
Err(_) => break,
}
}
_ = tokio::time::sleep(keepalive_delay) => {}
}
} else {
tokio::time::sleep(keepalive_delay).await;
}
}

let (meta, reply_rx) = send_empty_poll(sid, &mut next_send_seq, mux);
tracing::debug!(
"sess {}: keepalive poll seq={}", &sid[..sid.len().min(8)], meta.seq
Expand All @@ -1640,8 +1675,9 @@ async fn tunnel_loop(
}

// Can we read from the client? Yes if not closed, not eof, and
// we have room for more inflight ops (fast-path allows +4 extra).
let can_read = !client_closed && !eof_seen && inflight.len() < max_inflight + 4;
// we have room for more inflight ops (+2 extra for upload data
// so it doesn't wait for a full pipeline drain).
let can_read = !client_closed && !eof_seen && inflight.len() < max_inflight + 2;

tokio::select! {
biased;
Expand Down Expand Up @@ -1712,8 +1748,14 @@ async fn tunnel_loop(
consecutive_data = consecutive_data.saturating_add(1);
let bytes = resp.d.as_ref().map(|d| d.len() as u64 * 3 / 4).unwrap_or(0);
total_download_bytes += bytes;
} else if meta.was_empty_poll && consecutive_data > 0 {
// Stale empty-poll reply during an active data
// streak — don't penalise the streak. The poll
// was queued before data started flowing; the
// empty result is expected.
} else {
consecutive_empty = consecutive_empty.saturating_add(1);
consecutive_data = 0;
}
if is_eof {
eof_seen = true;
Expand All @@ -1722,7 +1764,7 @@ async fn tunnel_loop(
// Flush buffered out-of-order writes.
while let Some(entry) = pending_writes.first_entry() {
if *entry.key() != next_write_seq { break; }
let (_, (buffered_resp, _)) = entry.remove_entry();
let (_, (buffered_resp, _, buf_was_empty_poll)) = entry.remove_entry();
let buf_eof = buffered_resp.eof.unwrap_or(false);
match write_tunnel_response(&mut writer, &buffered_resp).await? {
WriteOutcome::Wrote => {
Expand All @@ -1732,7 +1774,12 @@ async fn tunnel_loop(
total_download_bytes += bytes;
}
WriteOutcome::NoData => {
consecutive_empty = consecutive_empty.saturating_add(1);
if buf_was_empty_poll && consecutive_data > 0 {
// Stale empty poll — don't break data streak.
} else {
consecutive_empty = consecutive_empty.saturating_add(1);
consecutive_data = 0;
}
}
WriteOutcome::BadBase64 => break,
}
Expand All @@ -1742,7 +1789,7 @@ async fn tunnel_loop(
}
}
} else {
pending_writes.insert(meta.seq, (resp, script_id));
pending_writes.insert(meta.seq, (resp, script_id, meta.was_empty_poll));
}

// Send buffered upload data now that a slot freed up.
Expand Down Expand Up @@ -1810,9 +1857,12 @@ async fn tunnel_loop(
}

// Schedule refill if pipeline needs more polls.
// Skip refill at IDLE depth with consecutive empties —
// the keepalive path handles that with proper backoff.
if !eof_seen
&& inflight.len() < max_inflight
&& refill_at.is_none()
&& !(max_inflight <= INFLIGHT_IDLE && consecutive_empty >= 2)
{
refill_at = Some(Box::pin(tokio::time::sleep(
if max_inflight > INFLIGHT_IDLE { Duration::from_millis(100) } else { Duration::ZERO }
Expand Down Expand Up @@ -1854,8 +1904,9 @@ async fn tunnel_loop(
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
consecutive_empty = 0;
inflight.push(wrap_reply(meta, reply_rx));
} else if inflight.len() < max_inflight + 4 {
// Fast-path: pipeline full but under +4 extra.
} else if inflight.len() < max_inflight + 2 {
// Two extra slots for upload data so it doesn't
// wait for a full pipeline drain.
let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux);
consecutive_empty = 0;
inflight.push(wrap_reply(meta, reply_rx));
Expand Down Expand Up @@ -2200,7 +2251,7 @@ mod tests {
// `fronter.batch_timeout()` (see `TunnelMux::start`).
reply_timeout: Duration::from_secs(35),
elevated_sessions: AtomicU64::new(0),
max_elevated: MAX_ELEVATED_PER_DEPLOYMENT * num_scripts as u64,
max_elevated: MAX_ELEVATED_TOTAL,
});
(mux, rx)
}
Expand Down
Loading