Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ multer = "3.1.0"
nix = "0.31.3"
num_cpus = "1.17.0"
papaya = "0.2.4"
pin-project-lite = "0.2.17"
percent-encoding = "2.3.2"
quick-xml = "0.40.1"
rand = "0.10.1"
Expand Down
1 change: 1 addition & 0 deletions objectstore-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ objectstore-metrics = { workspace = true }
objectstore-service = { workspace = true }
objectstore-types = { workspace = true }
papaya = { workspace = true }
pin-project-lite = { workspace = true }
percent-encoding = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true, features = ["charset", "http2", "system-proxy", "native-tls-no-alpn"] }
Expand Down
5 changes: 4 additions & 1 deletion objectstore-server/src/endpoints/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::extractors::Xt;
use crate::extractors::batch::{BatchError, BatchOperationStream};
use crate::multipart::{IntoMultipartResponse, Part};
use crate::state::ServiceState;
use crate::web::SentryStreamExt;

const MAX_BODY_SIZE: usize = 1024 * 1024 * 1024; // 1 GB
const HEADER_BATCH_OPERATION_STATUS: &str = "x-sn-batch-operation-status";
Expand Down Expand Up @@ -131,7 +132,9 @@ async fn batch(
async move { convert_to_part(idx, result, &state, &context).await }
});

responses.into_multipart_response(rand::random())
responses
.bind_hub(sentry::Hub::current())
.into_multipart_response(rand::random())
Comment on lines +135 to +137

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the two locations where we now bind the hub.

To expand on the description for why this has to be done here: We could technically import http_body and then pipe the size hint and other methods through from the underlying stream. Still, it means that we now add another box around every response, and we lose any form of optimization that axum might have on eager responses.

As long as we only have two places where we stream responses back, I think this is fine. Once we have more places, we'll have to reassess this.

Comment thread
jan-auer marked this conversation as resolved.
}

/// Converts a single operation result to a multipart [`Part`].
Expand Down
5 changes: 4 additions & 1 deletion objectstore-server/src/endpoints/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::endpoints::common::{ApiError, ApiResult};
use crate::extractors::Xt;
use crate::extractors::body::MeteredBody;
use crate::state::ServiceState;
use crate::web::SentryStreamExt;
use axum::body::Body;
use axum::extract::{Query, State};
use axum::http::{HeaderMap, StatusCode};
Expand Down Expand Up @@ -257,7 +258,9 @@ async fn complete(
}
}
};
let stream = stream.map(Ok::<_, Infallible>);
let stream = stream
.map(Ok::<_, Infallible>)
.bind_hub(sentry::Hub::current());

let mut headers = HeaderMap::new();
headers.insert(
Expand Down
2 changes: 2 additions & 0 deletions objectstore-server/src/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
mod app;
mod middleware;
mod request_counter;
mod sentry_stream;
mod server;

pub use app::App;
pub use request_counter::RequestCounter;
pub use sentry_stream::{SentryStream, SentryStreamExt};
pub use server::server;
60 changes: 60 additions & 0 deletions objectstore-server/src/web/sentry_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! Stream wrapper that binds a Sentry hub around every poll.
//!
//! Axum handlers return response bodies that are polled by hyper after the
//! middleware stack unwinds, so `Hub::current()` inside a stream producer
//! resolves to whatever hub the tokio worker thread happens to have — not the
//! request's hub. [`SentryStream`] fixes this by re-activating the captured
//! hub on every `poll_next`.

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures_util::Stream;
use pin_project_lite::pin_project;
use sentry::{Hub, HubSwitchGuard};

pin_project! {
/// Wraps a stream and re-activates a Sentry hub on every poll.
///
/// The hub is activated for the duration of each [`Stream::poll_next`] call
/// and released immediately after, so downstream producers always see the
/// correct request hub regardless of which tokio thread they run on.
#[derive(Debug)]
pub struct SentryStream<S> {
hub: Arc<Hub>,
#[pin]
inner: S,
}
}

impl<S> SentryStream<S> {
/// Creates a new [`SentryStream`] that binds `hub` around polls of `inner`.
pub fn new(hub: Arc<Hub>, inner: S) -> Self {
Self { hub, inner }
}
}

impl<S: Stream> Stream for SentryStream<S> {
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let _guard = HubSwitchGuard::new(Arc::clone(this.hub));
this.inner.poll_next(cx)
}
}

/// Extension trait that binds a Sentry hub to any [`Stream`], mirroring
/// [`sentry::SentryFutureExt`].
pub trait SentryStreamExt: Sized {
/// Wraps the stream so the given hub is active on every [`Stream::poll_next`] call.
fn bind_hub<H>(self, hub: H) -> SentryStream<Self>
where
H: Into<Arc<Hub>>,
{
SentryStream::new(hub.into(), self)
}
}

impl<S: Stream> SentryStreamExt for S {}
Loading