diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs index b68354b8..9f8f4212 100644 --- a/objectstore-service/src/backend/gcs.rs +++ b/objectstore-service/src/backend/gcs.rs @@ -12,11 +12,10 @@ use gcp_auth::TokenProvider; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use objectstore_types::range::{ByteRange, ContentRange}; use reqwest::header::HeaderName; -use reqwest::{ - Body, IntoUrl, Method, RequestBuilder, Response, StatusCode, Url, header, multipart, -}; +use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode, Url, header, multipart}; use serde::{Deserialize, Serialize}; +use super::response::ResponseExt; use crate::backend::common::{ self, Backend, DeleteResponse, GetResponse, MetadataResponse, MultipartUploadBackend, PutResponse, @@ -378,16 +377,20 @@ fn insert_gcs_meta_header( } /// Returns `true` if the error is a transient reqwest failure worth retrying. -fn is_retryable(error: &Error) -> bool { - let Error::Reqwest { cause, .. } = error else { - return false; - }; - if cause.is_timeout() || cause.is_connect() || cause.is_request() { - return true; +fn error_is_retryable(error: &Error) -> bool { + match error { + Error::Reqwest { cause, .. } => { + cause.is_timeout() + || cause.is_connect() + || cause.is_request() + || cause.status().is_some_and(status_is_retryable) + } + Error::BackendResponse { status, .. } => status_is_retryable(*status), + _ => false, } - let Some(status) = cause.status() else { - return false; - }; +} + +fn status_is_retryable(status: StatusCode) -> bool { // https://docs.cloud.google.com/storage/docs/json_api/v1/status-codes matches!( status, @@ -511,7 +514,7 @@ impl GcsBackend { loop { match f().await { Ok(res) => return Ok(res), - Err(ref e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(e) => { + Err(ref e) if retry_count < REQUEST_RETRY_COUNT && error_is_retryable(e) => { retry_count += 1; objectstore_metrics::count!("gcs.retries", action = action); objectstore_log::warn!(!!e, retry_count, action, "Retrying request"); @@ -541,8 +544,8 @@ impl GcsBackend { } let metadata: GcsObject = resp - .error_for_status() - .map_err(|e| Error::reqwest("GCS: get metadata status", e))? + .check_error("GCS: get metadata status") + .await? .json() .await .map_err(|e| Error::reqwest("GCS: get metadata parse", e))?; @@ -594,8 +597,8 @@ impl GcsBackend { .json(&CustomTimeRequest { custom_time }) .send() .await - .and_then(Response::error_for_status) - .map_err(|e| Error::reqwest("GCS: update custom time", e))?; + .check_error("GCS: update custom time") + .await?; Ok(()) }) .await @@ -660,18 +663,19 @@ impl Backend for GcsBackend { // set the header *after* writing the multipart form into the request. let content_type = format!("multipart/related; boundary={}", multipart.boundary()); - self.request(Method::POST, self.upload_url(id, "multipart")?) + let resp = self + .request(Method::POST, self.upload_url(id, "multipart")?) .await? .multipart(multipart) .header(header::CONTENT_TYPE, content_type) .send() .await - .and_then(Response::error_for_status) .map_err(|e| match stream::unpack_client_error(&e) { Some(ce) => Error::Client(ce), _ => Error::reqwest("GCS: upload object", e), })?; + resp.check_error("GCS: upload object").await?; Ok(()) } @@ -717,8 +721,7 @@ impl Backend for GcsBackend { } } - resp.error_for_status() - .map_err(|e| Error::reqwest("GCS: get payload", e)) + resp.check_error("GCS: get payload").await }) .await?; @@ -771,8 +774,7 @@ impl Backend for GcsBackend { return Ok(()); } - resp.error_for_status() - .map_err(|e| Error::reqwest("GCS: delete object", e))?; + resp.check_error("GCS: delete object").await?; Ok(()) }) @@ -913,8 +915,8 @@ impl MultipartUploadBackend for GcsBackend { let resp = builder .send() .await - .and_then(Response::error_for_status) - .map_err(|e| Error::reqwest("GCS: initiate multipart upload", e))?; + .check_error("GCS: initiate multipart upload") + .await?; let body = resp .bytes() @@ -956,11 +958,7 @@ impl MultipartUploadBackend for GcsBackend { builder = builder.header("content-md5", md5); } - let resp = builder - .send() - .await - .and_then(Response::error_for_status) - .map_err(|e| Error::reqwest("GCS: upload part", e))?; + let resp = builder.send().await.check_error("GCS: upload part").await?; let etag = resp .headers() @@ -998,8 +996,8 @@ impl MultipartUploadBackend for GcsBackend { .await? .send() .await - .and_then(Response::error_for_status) - .map_err(|e| Error::reqwest("GCS: list parts", e))?; + .check_error("GCS: list parts") + .await?; let body = resp .bytes() @@ -1029,8 +1027,8 @@ impl MultipartUploadBackend for GcsBackend { .await? .send() .await - .and_then(Response::error_for_status) - .map_err(|e| Error::reqwest("GCS: abort multipart upload", e))?; + .check_error("GCS: abort multipart upload") + .await?; Ok(()) } @@ -1059,8 +1057,8 @@ impl MultipartUploadBackend for GcsBackend { .body(xml) .send() .await - .and_then(Response::error_for_status) - .map_err(|e| Error::reqwest("GCS: complete multipart upload", e))?; + .check_error("GCS: complete multipart upload") + .await?; let body = resp .bytes() diff --git a/objectstore-service/src/backend/mod.rs b/objectstore-service/src/backend/mod.rs index 3d6b92ae..f591bd54 100644 --- a/objectstore-service/src/backend/mod.rs +++ b/objectstore-service/src/backend/mod.rs @@ -18,6 +18,7 @@ pub mod counting; pub mod gcs; pub mod in_memory; pub mod local_fs; +mod response; pub mod s3_compatible; pub mod tiered; diff --git a/objectstore-service/src/backend/response.rs b/objectstore-service/src/backend/response.rs new file mode 100644 index 00000000..b25381df --- /dev/null +++ b/objectstore-service/src/backend/response.rs @@ -0,0 +1,156 @@ +//! HTTP response status checking with error body parsing. +//! +//! Provides [`ResponseExt`], an extension trait that replaces +//! [`reqwest::Response::error_for_status`] with a version that reads the +//! response body on 4xx/5xx errors and parses the structured error code and +//! message from it (JSON for GCS JSON API, XML for GCS XML API and S3). + +use reqwest::{Response, header}; +use serde::Deserialize; + +use crate::error::{Error, Result}; + +const MAX_ERROR_BODY_LEN: usize = 1024; + +/// GCS JSON API error envelope (`{"error": {"message": "...", ...}}`). +#[derive(Deserialize)] +struct JsonApiError { + error: JsonApiErrorDetail, +} + +/// Inner detail of a GCS JSON API error response. +#[derive(Deserialize)] +struct JsonApiErrorDetail { + #[serde(default)] + message: String, + #[serde(default)] + errors: Vec, +} + +/// Individual error entry in the GCS JSON API `errors` array. +#[derive(Deserialize)] +struct JsonApiErrorEntry { + #[serde(default)] + reason: String, +} + +/// GCS XML API / S3 error body (`......`). +#[derive(Deserialize)] +#[serde(rename_all = "PascalCase")] +struct XmlApiError { + #[serde(default)] + code: String, + #[serde(default)] + message: String, +} + +/// Extension trait for [`reqwest::Response`] that preserves error response bodies. +/// +/// Use `check_status` instead of [`reqwest::Response::error_for_status`] to avoid +/// losing the response body on 4xx/5xx errors. The method parses the structured +/// error body (JSON or XML) and returns an [`Error::BackendResponse`] with the +/// extracted error code and message. +/// +/// Implemented for both [`reqwest::Response`] and `Result` +/// so it can be chained directly after `.send().await`. +pub trait ResponseExt { + /// Checks the HTTP status and returns the response on success. + /// + /// On 4xx/5xx status codes, reads the response body and parses the error code + /// and message from it (JSON for GCS JSON API, XML for GCS XML API and S3). + /// For other error statuses (e.g., redirects), falls back to + /// [`reqwest::Response::error_for_status`]. + /// + /// When called on `Result`, transport errors are + /// wrapped as [`Error::Reqwest`] with the same context string. + async fn check_error(self, context: &'static str) -> Result; +} + +impl ResponseExt for Response { + async fn check_error(self, context: &'static str) -> Result { + let status = self.status(); + + if !(status.is_client_error() || status.is_server_error()) { + return self + .error_for_status() + .map_err(|e| Error::reqwest(context, e)); + } + + let content_type = self + .headers() + .get(header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + + let (code, message) = if content_type.starts_with("application/json") { + parse_json_error(self).await + } else if content_type.starts_with("application/xml") + || content_type.starts_with("text/xml") + { + parse_xml_error(self).await + } else { + parse_fallback(self).await + }; + + Err(Error::BackendResponse { + context, + status, + code, + message, + }) + } +} + +impl ResponseExt for Result { + async fn check_error(self, context: &'static str) -> Result { + match self { + Ok(resp) => resp.check_error(context).await, + Err(e) => Err(Error::reqwest(context, e)), + } + } +} + +async fn parse_json_error(resp: Response) -> (String, String) { + let status = resp.status(); + match resp.json::().await { + Ok(body) => { + let code = body + .error + .errors + .first() + .map(|e| e.reason.clone()) + .unwrap_or_else(|| status.as_str().to_owned()); + (code, body.error.message) + } + Err(_) => (status.as_str().to_owned(), status.to_string()), + } +} + +async fn parse_xml_error(resp: Response) -> (String, String) { + let status = resp.status(); + let bytes = match resp.bytes().await { + Ok(b) => b, + Err(_) => return (status.as_str().to_owned(), status.to_string()), + }; + + match quick_xml::de::from_reader::<_, XmlApiError>(bytes.as_ref()) { + Ok(body) => (body.code, body.message), + Err(_) => (status.as_str().to_owned(), status.to_string()), + } +} + +async fn parse_fallback(resp: Response) -> (String, String) { + let status = resp.status(); + match resp.text().await { + Ok(text) if !text.is_empty() => { + let truncated = if text.len() > MAX_ERROR_BODY_LEN { + let end = text.floor_char_boundary(MAX_ERROR_BODY_LEN); + format!("{}...(truncated)", &text[..end]) + } else { + text + }; + (status.as_str().to_owned(), truncated) + } + _ => (status.as_str().to_owned(), status.to_string()), + } +} diff --git a/objectstore-service/src/backend/s3_compatible.rs b/objectstore-service/src/backend/s3_compatible.rs index d8517c92..4d8e546b 100644 --- a/objectstore-service/src/backend/s3_compatible.rs +++ b/objectstore-service/src/backend/s3_compatible.rs @@ -9,6 +9,7 @@ use objectstore_types::range::{ByteRange, ContentRange}; use reqwest::header::HeaderMap; use reqwest::{Body, IntoUrl, Method, RequestBuilder, Response, StatusCode}; +use super::response::ResponseExt; use crate::backend::common::{ self, Backend, DeleteResponse, GetResponse, MetadataResponse, PutResponse, }; @@ -198,12 +199,7 @@ where } } - let response = response - .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to get object".to_string(), - cause, - })?; + let response = response.check_error("S3: failed to get object").await?; let headers = response.headers(); let mut metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX)?; @@ -261,15 +257,8 @@ where .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?) .send() .await - .map_err(|cause| Error::Reqwest { - context: "S3: failed to send TTI update request".to_string(), - cause, - })? - .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to update expiration time for object with TTI".to_string(), - cause, - })?; + .check_error("S3: update expiration time") + .await?; Ok(()) } @@ -311,13 +300,13 @@ impl Backend for S3CompatibleBackend { stream: ClientStream, ) -> Result { objectstore_log::debug!("Writing to s3_compatible backend"); - self.request(Method::PUT, self.object_url(id)) + let resp = self + .request(Method::PUT, self.object_url(id)) .await? .headers(metadata_to_gcs_headers(metadata, GCS_CUSTOM_PREFIX)?) .body(Body::wrap_stream(stream)) .send() .await - .and_then(Response::error_for_status) .map_err(|cause| match stream::unpack_client_error(&cause) { Some(ce) => Error::Client(ce), _ => Error::Reqwest { @@ -325,6 +314,7 @@ impl Backend for S3CompatibleBackend { cause, }, })?; + resp.check_error("S3: failed to put object").await?; Ok(()) } @@ -365,13 +355,7 @@ impl Backend for S3CompatibleBackend { // Do not error for objects that do not exist. if response.status() != StatusCode::NOT_FOUND { - objectstore_log::debug!("Object not found"); - response - .error_for_status() - .map_err(|cause| Error::Reqwest { - context: "S3: failed to delete object".to_string(), - cause, - })?; + response.check_error("S3: failed to delete object").await?; } Ok(()) diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs index 93c18818..1baa1e9a 100644 --- a/objectstore-service/src/error.rs +++ b/objectstore-service/src/error.rs @@ -6,6 +6,7 @@ use std::any::Any; use objectstore_log::Level; +use reqwest::StatusCode; use thiserror::Error as ThisError; use crate::stream::ClientError; @@ -47,6 +48,23 @@ pub enum Error { cause: reqwest::Error, }, + /// An HTTP error response from a storage backend (e.g., GCS, S3). + /// + /// Unlike [`Reqwest`](Self::Reqwest), which covers transport-level failures, this variant + /// captures application-level error responses where the server returned a 4xx/5xx status code + /// along with a structured error body. + #[error("{context} ({status}). {message} (code {code})")] + BackendResponse { + /// Context describing the request that failed. + context: &'static str, + /// The HTTP status code returned by the backend. + status: StatusCode, + /// Machine-readable error code from the response body (e.g., "InvalidArgument"). + code: String, + /// Human-readable error message from the response body. + message: String, + }, + /// Errors related to de/serialization and parsing of object metadata. #[error("metadata error: {0}")] Metadata(#[from] objectstore_types::metadata::Error), @@ -154,6 +172,7 @@ impl Error { Self::Io(_) => Level::ERROR, Self::Serde { .. } => Level::ERROR, Self::Reqwest { .. } => Level::ERROR, + Self::BackendResponse { .. } => Level::ERROR, Self::GcpAuth(_) => Level::ERROR, Self::Panic(_) => Level::ERROR, Self::Dropped => Level::ERROR,