Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 34 additions & 36 deletions objectstore-service/src/backend/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ use gcp_auth::TokenProvider;
use objectstore_types::metadata::{ExpirationPolicy, Metadata};
use objectstore_types::range::{ByteRange, ContentRange};
use reqwest::header::HeaderName;
use reqwest::{
Body, IntoUrl, Method, RequestBuilder, Response, StatusCode, Url, header, multipart,
};
use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode, Url, header, multipart};
use serde::{Deserialize, Serialize};

use super::response::ResponseExt;
use crate::backend::common::{
self, Backend, DeleteResponse, GetResponse, MetadataResponse, MultipartUploadBackend,
PutResponse,
Expand Down Expand Up @@ -378,16 +377,20 @@ fn insert_gcs_meta_header(
}

/// Returns `true` if the error is a transient reqwest failure worth retrying.
fn is_retryable(error: &Error) -> bool {
let Error::Reqwest { cause, .. } = error else {
return false;
};
if cause.is_timeout() || cause.is_connect() || cause.is_request() {
return true;
fn error_is_retryable(error: &Error) -> bool {
match error {
Error::Reqwest { cause, .. } => {
cause.is_timeout()
|| cause.is_connect()
|| cause.is_request()
|| cause.status().is_some_and(status_is_retryable)
}
Error::BackendResponse { status, .. } => status_is_retryable(*status),
_ => false,
}
let Some(status) = cause.status() else {
return false;
};
}

fn status_is_retryable(status: StatusCode) -> bool {
// https://docs.cloud.google.com/storage/docs/json_api/v1/status-codes
matches!(
status,
Expand Down Expand Up @@ -511,7 +514,7 @@ impl GcsBackend {
loop {
match f().await {
Ok(res) => return Ok(res),
Err(ref e) if retry_count < REQUEST_RETRY_COUNT && is_retryable(e) => {
Err(ref e) if retry_count < REQUEST_RETRY_COUNT && error_is_retryable(e) => {
retry_count += 1;
objectstore_metrics::count!("gcs.retries", action = action);
objectstore_log::warn!(!!e, retry_count, action, "Retrying request");
Expand Down Expand Up @@ -541,8 +544,8 @@ impl GcsBackend {
}

let metadata: GcsObject = resp
.error_for_status()
.map_err(|e| Error::reqwest("GCS: get metadata status", e))?
.check_error("GCS: get metadata status")
.await?
.json()
.await
.map_err(|e| Error::reqwest("GCS: get metadata parse", e))?;
Expand Down Expand Up @@ -594,8 +597,8 @@ impl GcsBackend {
.json(&CustomTimeRequest { custom_time })
.send()
.await
.and_then(Response::error_for_status)
.map_err(|e| Error::reqwest("GCS: update custom time", e))?;
.check_error("GCS: update custom time")
.await?;
Ok(())
})
.await
Expand Down Expand Up @@ -660,18 +663,19 @@ impl Backend for GcsBackend {
// set the header *after* writing the multipart form into the request.
let content_type = format!("multipart/related; boundary={}", multipart.boundary());

self.request(Method::POST, self.upload_url(id, "multipart")?)
let resp = self
.request(Method::POST, self.upload_url(id, "multipart")?)
.await?
.multipart(multipart)
.header(header::CONTENT_TYPE, content_type)
.send()
.await
.and_then(Response::error_for_status)
.map_err(|e| match stream::unpack_client_error(&e) {
Some(ce) => Error::Client(ce),
_ => Error::reqwest("GCS: upload object", e),
})?;

resp.check_error("GCS: upload object").await?;
Ok(())
}

Expand Down Expand Up @@ -717,8 +721,7 @@ impl Backend for GcsBackend {
}
}

resp.error_for_status()
.map_err(|e| Error::reqwest("GCS: get payload", e))
resp.check_error("GCS: get payload").await
})
.await?;

Expand Down Expand Up @@ -771,8 +774,7 @@ impl Backend for GcsBackend {
return Ok(());
}

resp.error_for_status()
.map_err(|e| Error::reqwest("GCS: delete object", e))?;
resp.check_error("GCS: delete object").await?;

Ok(())
})
Expand Down Expand Up @@ -913,8 +915,8 @@ impl MultipartUploadBackend for GcsBackend {
let resp = builder
.send()
.await
.and_then(Response::error_for_status)
.map_err(|e| Error::reqwest("GCS: initiate multipart upload", e))?;
.check_error("GCS: initiate multipart upload")
.await?;

let body = resp
.bytes()
Expand Down Expand Up @@ -956,11 +958,7 @@ impl MultipartUploadBackend for GcsBackend {
builder = builder.header("content-md5", md5);
}

let resp = builder
.send()
.await
.and_then(Response::error_for_status)
.map_err(|e| Error::reqwest("GCS: upload part", e))?;
let resp = builder.send().await.check_error("GCS: upload part").await?;

let etag = resp
.headers()
Expand Down Expand Up @@ -998,8 +996,8 @@ impl MultipartUploadBackend for GcsBackend {
.await?
.send()
.await
.and_then(Response::error_for_status)
.map_err(|e| Error::reqwest("GCS: list parts", e))?;
.check_error("GCS: list parts")
.await?;

let body = resp
.bytes()
Expand Down Expand Up @@ -1029,8 +1027,8 @@ impl MultipartUploadBackend for GcsBackend {
.await?
.send()
.await
.and_then(Response::error_for_status)
.map_err(|e| Error::reqwest("GCS: abort multipart upload", e))?;
.check_error("GCS: abort multipart upload")
.await?;

Ok(())
}
Expand Down Expand Up @@ -1059,8 +1057,8 @@ impl MultipartUploadBackend for GcsBackend {
.body(xml)
.send()
.await
.and_then(Response::error_for_status)
.map_err(|e| Error::reqwest("GCS: complete multipart upload", e))?;
.check_error("GCS: complete multipart upload")
.await?;

let body = resp
.bytes()
Expand Down
1 change: 1 addition & 0 deletions objectstore-service/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod counting;
pub mod gcs;
pub mod in_memory;
pub mod local_fs;
mod response;
pub mod s3_compatible;
pub mod tiered;

Expand Down
156 changes: 156 additions & 0 deletions objectstore-service/src/backend/response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
//! HTTP response status checking with error body parsing.
//!
//! Provides [`ResponseExt`], an extension trait that replaces
//! [`reqwest::Response::error_for_status`] with a version that reads the
//! response body on 4xx/5xx errors and parses the structured error code and
//! message from it (JSON for GCS JSON API, XML for GCS XML API and S3).

use reqwest::{Response, header};
use serde::Deserialize;

use crate::error::{Error, Result};

/// Maximum number of bytes of an unstructured error body that `parse_fallback`
/// keeps in the error message. Longer bodies are cut at the nearest `char`
/// boundary at or below this length and suffixed with `...(truncated)`.
const MAX_ERROR_BODY_LEN: usize = 1024;

/// GCS JSON API error envelope (`{"error": {"message": "...", ...}}`).
///
/// Deserialization target used by `parse_json_error`. The `error` field has no
/// `#[serde(default)]`, so a JSON body without a top-level `error` key fails to
/// deserialize.
#[derive(Deserialize)]
struct JsonApiError {
// Payload found under the top-level `error` key.
error: JsonApiErrorDetail,
}

/// Inner detail of a GCS JSON API error response.
///
/// Both fields are `#[serde(default)]`: when absent from the body they
/// deserialize to an empty string / empty vector instead of failing.
#[derive(Deserialize)]
struct JsonApiErrorDetail {
// Human-readable error description; empty when the body omits it.
#[serde(default)]
message: String,
// Individual error entries; `parse_json_error` reads the first entry's `reason`.
#[serde(default)]
errors: Vec<JsonApiErrorEntry>,
}

/// Individual error entry in the GCS JSON API `errors` array.
///
/// Only `reason` is captured; other keys of the entry are not declared here.
#[derive(Deserialize)]
struct JsonApiErrorEntry {
// Reason string used as the error code; empty when the entry omits it.
#[serde(default)]
reason: String,
}

/// GCS XML API / S3 error body (`<Error><Code>...</Code><Message>...</Message></Error>`).
///
/// `rename_all = "PascalCase"` maps the fields to the `Code` and `Message`
/// elements. Both are `#[serde(default)]`, so a missing element deserializes to
/// an empty string instead of failing.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
struct XmlApiError {
// Contents of the `Code` element.
#[serde(default)]
code: String,
// Contents of the `Message` element.
#[serde(default)]
message: String,
}

/// Extension trait for [`reqwest::Response`] that preserves error response bodies.
///
/// Use [`ResponseExt::check_error`] instead of
/// [`reqwest::Response::error_for_status`] to avoid losing the response body on
/// 4xx/5xx errors. The method parses the structured error body (JSON or XML) and
/// returns an [`Error::BackendResponse`] with the extracted error code and
/// message.
///
/// Implemented for both [`reqwest::Response`] and `Result<Response, reqwest::Error>`
/// so it can be chained directly after `.send().await`.
pub trait ResponseExt {
/// Checks the HTTP status and returns the response on success.
///
/// On 4xx/5xx status codes, reads the response body and parses the error code
/// and message from it (JSON for GCS JSON API, XML for GCS XML API and S3),
/// returning them as [`Error::BackendResponse`] together with `context` and the
/// status.
///
/// Any other status (informational, success, redirect) is handed to
/// [`reqwest::Response::error_for_status`]. Per reqwest's documentation that
/// method only fails for 4xx/5xx, so such responses are expected to be returned
/// unchanged.
///
/// When called on `Result<Response, reqwest::Error>`, transport errors are
/// wrapped as [`Error::Reqwest`] with the same context string.
///
/// # Errors
///
/// Returns [`Error::BackendResponse`] for 4xx/5xx responses and
/// [`Error::Reqwest`] for errors reported by reqwest itself.
async fn check_error(self, context: &'static str) -> Result<Response>;
}

/// Status check for a response that has already been received.
///
/// 4xx/5xx responses are consumed: their body is parsed according to the
/// `Content-Type` header and reported as [`Error::BackendResponse`].
impl ResponseExt for Response {
async fn check_error(self, context: &'static str) -> Result<Response> {
let status = self.status();

// Not a client/server error: there is no error body to parse, so let reqwest
// decide; any error it reports is wrapped with the caller's context.
if !(status.is_client_error() || status.is_server_error()) {
return self
.error_for_status()
.map_err(|e| Error::reqwest(context, e));
}

// A missing header, or a value `to_str` rejects, is treated as "" and
// therefore ends up in the fallback branch below.
let content_type = self
.headers()
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or("");

// Case-sensitive prefix match: trailing parameters such as `; charset=UTF-8`
// are tolerated, while any other media type (or different casing) is handled
// by `parse_fallback`. Each parser consumes the response to read its body.
let (code, message) = if content_type.starts_with("application/json") {
parse_json_error(self).await
} else if content_type.starts_with("application/xml")
|| content_type.starts_with("text/xml")
{
parse_xml_error(self).await
} else {
parse_fallback(self).await
};
Comment on lines +85 to +93

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a first attempt, though I think we can further improve this.


// A 4xx/5xx status is always an error, whatever the parsers extracted.
Err(Error::BackendResponse {
context,
status,
code,
message,
})
}
}

impl ResponseExt for Result<Response, reqwest::Error> {
async fn check_error(self, context: &'static str) -> Result<Response> {
match self {
Ok(resp) => resp.check_error(context).await,
Err(e) => Err(Error::reqwest(context, e)),
}
}
}

/// Extracts `(code, message)` from a GCS JSON API error response.
///
/// The code is the `reason` of the first entry in the `errors` array and the
/// message is the envelope's `message`. Each value independently falls back to
/// the HTTP status when it cannot be obtained: the code becomes the numeric
/// status (e.g. `"404"`) and the message the status line (e.g. `"404 Not Found"`).
///
/// The fallback also applies when a field is present in the type but empty.
/// `reason` and `message` are `#[serde(default)]`, so a body that omits them
/// deserializes to empty strings, which would otherwise surface as a blank
/// error code or message.
///
/// Consumes the response, since reading the body requires ownership.
async fn parse_json_error(resp: Response) -> (String, String) {
    let status = resp.status();

    // Unreadable body or unexpected JSON shape: report the status alone.
    let Ok(body) = resp.json::<JsonApiError>().await else {
        return (status.as_str().to_owned(), status.to_string());
    };
    let detail = body.error;

    // Move the first reason out instead of cloning it; treat "" as absent.
    let code = detail
        .errors
        .into_iter()
        .next()
        .map(|entry| entry.reason)
        .filter(|reason| !reason.is_empty())
        .unwrap_or_else(|| status.as_str().to_owned());

    let message = if detail.message.is_empty() {
        status.to_string()
    } else {
        detail.message
    };

    (code, message)
}

/// Extracts `(code, message)` from a GCS XML API / S3 error response.
///
/// The code and message come from the `Code` and `Message` elements of the
/// body. Each value independently falls back to the HTTP status when it cannot
/// be obtained: the code becomes the numeric status (e.g. `"404"`) and the
/// message the status line (e.g. `"404 Not Found"`).
///
/// The fallback covers an unreadable body, XML that fails to deserialize, and
/// elements that are missing or empty. Both fields are `#[serde(default)]`, so
/// a document without them deserializes to empty strings, which would otherwise
/// surface as a blank error code or message.
///
/// Consumes the response, since reading the body requires ownership.
async fn parse_xml_error(resp: Response) -> (String, String) {
    let status = resp.status();

    // `None` when the body could not be read or is not the expected XML.
    let parsed = match resp.bytes().await {
        Ok(bytes) => quick_xml::de::from_reader::<_, XmlApiError>(bytes.as_ref()).ok(),
        Err(_) => None,
    };

    // A failed parse is represented as two empty strings so that it shares the
    // per-field status fallback below.
    let (code, message) = parsed
        .map(|body| (body.code, body.message))
        .unwrap_or_default();

    let code = if code.is_empty() {
        status.as_str().to_owned()
    } else {
        code
    };
    let message = if message.is_empty() {
        status.to_string()
    } else {
        message
    };

    (code, message)
}

/// Builds `(code, message)` for an error response with no recognised
/// structured body.
///
/// The code is always the numeric HTTP status. The message is the response
/// text, cut to at most `MAX_ERROR_BODY_LEN` bytes on a `char` boundary and
/// suffixed with `...(truncated)` when it was longer. If the text is empty or
/// cannot be read, the message is the status line instead.
///
/// Consumes the response, since reading the body requires ownership.
async fn parse_fallback(resp: Response) -> (String, String) {
    let status = resp.status();
    let code = status.as_str().to_owned();

    // No usable text: describe the failure with the status alone.
    let body = match resp.text().await {
        Ok(body) if !body.is_empty() => body,
        _ => return (code, status.to_string()),
    };

    // Short enough to report verbatim.
    if body.len() <= MAX_ERROR_BODY_LEN {
        return (code, body);
    }

    // Slice on a char boundary so the cut never splits a UTF-8 sequence.
    let cut = body.floor_char_boundary(MAX_ERROR_BODY_LEN);
    (code, format!("{}...(truncated)", &body[..cut]))
}
Loading
Loading