From acd434cc8ea82426c14fe57c0e9d47200f76c2d8 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 26 Jun 2026 13:45:40 +0200 Subject: [PATCH] fix(sentry): Bind hub to lazy response streams Axum stream bodies are polled by hyper after the middleware stack unwinds, so Hub::current() inside a stream producer resolves to the tokio worker thread's ambient hub rather than the request's hub. Introduce SentryStream, a pin-projected stream wrapper that re-activates a captured Arc via HubSwitchGuard on every poll_next call, mirroring how SentryFuture works for futures. Bind the hub at stream construction time in the two affected endpoints: complete (multipart) and batch. Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 1 + Cargo.toml | 1 + objectstore-server/Cargo.toml | 1 + objectstore-server/src/endpoints/batch.rs | 5 +- objectstore-server/src/endpoints/multipart.rs | 5 +- objectstore-server/src/web/mod.rs | 2 + objectstore-server/src/web/sentry_stream.rs | 60 +++++++++++++++++++ 7 files changed, 73 insertions(+), 2 deletions(-) create mode 100644 objectstore-server/src/web/sentry_stream.rs diff --git a/Cargo.lock b/Cargo.lock index e4be1837..d7bb9f41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2709,6 +2709,7 @@ dependencies = [ "objectstore-types", "papaya", "percent-encoding", + "pin-project-lite", "rand 0.10.1", "reqwest", "rustls", diff --git a/Cargo.toml b/Cargo.toml index 38130f7d..b3f345e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ multer = "3.1.0" nix = "0.31.3" num_cpus = "1.17.0" papaya = "0.2.4" +pin-project-lite = "0.2.17" percent-encoding = "2.3.2" quick-xml = "0.40.1" rand = "0.10.1" diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index 67b61394..c809aa57 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -31,6 +31,7 @@ objectstore-metrics = { workspace = true } objectstore-service = { workspace = true } objectstore-types = { workspace = true } papaya = { workspace = true } +pin-project-lite = { workspace = true } percent-encoding = { workspace = true } rand = { workspace = true } reqwest = { workspace = true, features = ["charset", "http2", "system-proxy", "native-tls-no-alpn"] } diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 7cccbe15..1ccb043f 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -23,6 +23,7 @@ use crate::extractors::Xt; use crate::extractors::batch::{BatchError, BatchOperationStream}; use crate::multipart::{IntoMultipartResponse, Part}; use crate::state::ServiceState; +use crate::web::SentryStreamExt; const MAX_BODY_SIZE: usize = 1024 * 1024 * 1024; // 1 GB const HEADER_BATCH_OPERATION_STATUS: &str = "x-sn-batch-operation-status"; @@ -131,7 +132,9 @@ async fn batch( async move { convert_to_part(idx, result, &state, &context).await } }); - responses.into_multipart_response(rand::random()) + responses + .bind_hub(sentry::Hub::current()) + .into_multipart_response(rand::random()) } /// Converts a single operation result to a multipart [`Part`]. diff --git a/objectstore-server/src/endpoints/multipart.rs b/objectstore-server/src/endpoints/multipart.rs index a161c6f4..10c23d12 100644 --- a/objectstore-server/src/endpoints/multipart.rs +++ b/objectstore-server/src/endpoints/multipart.rs @@ -6,6 +6,7 @@ use crate::endpoints::common::{ApiError, ApiResult}; use crate::extractors::Xt; use crate::extractors::body::MeteredBody; use crate::state::ServiceState; +use crate::web::SentryStreamExt; use axum::body::Body; use axum::extract::{Query, State}; use axum::http::{HeaderMap, StatusCode}; @@ -257,7 +258,9 @@ async fn complete( } } }; - let stream = stream.map(Ok::<_, Infallible>); + let stream = stream + .map(Ok::<_, Infallible>) + .bind_hub(sentry::Hub::current()); let mut headers = HeaderMap::new(); headers.insert( diff --git a/objectstore-server/src/web/mod.rs b/objectstore-server/src/web/mod.rs index 1aa35336..0eef14f9 100644 --- a/objectstore-server/src/web/mod.rs +++ b/objectstore-server/src/web/mod.rs @@ -15,8 +15,10 @@ mod app; mod middleware; mod request_counter; +mod sentry_stream; mod server; pub use app::App; pub use request_counter::RequestCounter; +pub use sentry_stream::{SentryStream, SentryStreamExt}; pub use server::server; diff --git a/objectstore-server/src/web/sentry_stream.rs b/objectstore-server/src/web/sentry_stream.rs new file mode 100644 index 00000000..6a48b5fa --- /dev/null +++ b/objectstore-server/src/web/sentry_stream.rs @@ -0,0 +1,60 @@ +//! Stream wrapper that binds a Sentry hub around every poll. +//! +//! Axum handlers return response bodies that are polled by hyper after the +//! middleware stack unwinds, so `Hub::current()` inside a stream producer +//! resolves to whatever hub the tokio worker thread happens to have — not the +//! request's hub. [`SentryStream`] fixes this by re-activating the captured +//! hub on every `poll_next`. + +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use futures_util::Stream; +use pin_project_lite::pin_project; +use sentry::{Hub, HubSwitchGuard}; + +pin_project! { + /// Wraps a stream and re-activates a Sentry hub on every poll. + /// + /// The hub is activated for the duration of each [`Stream::poll_next`] call + /// and released immediately after, so downstream producers always see the + /// correct request hub regardless of which tokio thread they run on. + #[derive(Debug)] + pub struct SentryStream { + hub: Arc, + #[pin] + inner: S, + } +} + +impl SentryStream { + /// Creates a new [`SentryStream`] that binds `hub` around polls of `inner`. + pub fn new(hub: Arc, inner: S) -> Self { + Self { hub, inner } + } +} + +impl Stream for SentryStream { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let _guard = HubSwitchGuard::new(Arc::clone(this.hub)); + this.inner.poll_next(cx) + } +} + +/// Extension trait that binds a Sentry hub to any [`Stream`], mirroring +/// [`sentry::SentryFutureExt`]. +pub trait SentryStreamExt: Sized { + /// Wraps the stream so the given hub is active on every [`Stream::poll_next`] call. + fn bind_hub(self, hub: H) -> SentryStream + where + H: Into>, + { + SentryStream::new(hub.into(), self) + } +} + +impl SentryStreamExt for S {}