11use anyhow:: Context as _;
2+ use axum:: body:: Body ;
23use axum:: extract:: { FromRef , Path , State } ;
34use axum:: middleware:: Next ;
45use axum:: response:: Response ;
@@ -7,7 +8,6 @@ use axum::Json;
78use bytes:: Bytes ;
89use chrono:: NaiveDateTime ;
910use futures:: { SinkExt , StreamExt } ;
10- use axum:: body:: Body ;
1111use http:: { Request , StatusCode } ;
1212use http_body_util:: BodyExt ;
1313use hyper_util:: client:: legacy:: Client as HyperClient ;
@@ -216,7 +216,9 @@ pub fn router_to_service(
216216 hyper:: Request < hyper:: body:: Incoming > ,
217217 Response = hyper:: Response < axum:: body:: Body > ,
218218 Error = std:: io:: Error ,
219- Future = impl std:: future:: Future < Output = Result < hyper:: Response < axum:: body:: Body > , std:: io:: Error > > ,
219+ Future = impl std:: future:: Future <
220+ Output = Result < hyper:: Response < axum:: body:: Body > , std:: io:: Error > ,
221+ > ,
220222> + Clone {
221223 // Create a service from the router that handles Request<Incoming>
222224 // Using hyper::service::service_fn which implements hyper::service::Service
@@ -226,13 +228,14 @@ pub fn router_to_service(
226228 // Convert Incoming body to axum Body
227229 // by collecting the body into bytes first
228230 let ( parts, body) = req. into_parts ( ) ;
229- let collected = body. collect ( ) . await . map_err ( |e| {
230- std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , e)
231- } ) ?;
231+ let collected = body
232+ . collect ( )
233+ . await
234+ . map_err ( |e| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , e) ) ?;
232235 let bytes = collected. to_bytes ( ) ;
233236 let body = axum:: body:: Body :: from ( bytes) ;
234237 let req = hyper:: Request :: from_parts ( parts, body) ;
235-
238+
236239 // Call the router and convert Infallible error to io::Error
237240 router. call ( req) . await . map_err ( |e| match e { } )
238241 }
@@ -249,7 +252,6 @@ where
249252 A : crate :: net:: Accept ,
250253{
251254 use std:: future:: poll_fn;
252-
253255
254256 let shutdown = shutdown. notified ( ) ;
255257 tokio:: pin!( shutdown) ;
@@ -272,9 +274,8 @@ where
272274
273275 let svc = router_to_service ( router. clone ( ) ) ;
274276 tokio:: spawn ( async move {
275- let builder = hyper_util:: server:: conn:: auto:: Builder :: new (
276- hyper_util:: rt:: TokioExecutor :: new ( ) ,
277- ) ;
277+ let builder =
278+ hyper_util:: server:: conn:: auto:: Builder :: new ( hyper_util:: rt:: TokioExecutor :: new ( ) ) ;
278279 let _ = builder
279280 . serve_connection ( hyper_util:: rt:: tokio:: TokioIo :: new ( conn) , svc)
280281 . await ;
@@ -500,7 +501,9 @@ async fn handle_create_namespace(
500501 Some ( ref _url) => {
501502 // TODO: Re-enable dump from URL after fixing connector for hyper 1.0
502503 // RestoreOption::Dump(dump_stream_from_url(_url, app_state.connector.clone()).await?)
503- return Err ( Error :: Internal ( "Dump from URL temporarily disabled" . to_string ( ) ) ) ;
504+ return Err ( Error :: Internal (
505+ "Dump from URL temporarily disabled" . to_string ( ) ,
506+ ) ) ;
504507 }
505508 None => RestoreOption :: Latest ,
506509 } ;
@@ -557,16 +560,16 @@ where
557560{
558561 match url. scheme ( ) {
559562 "http" | "https" => {
560- let client: HyperClient < C , http_body_util:: Empty < Bytes > > = HyperClient :: builder ( TokioExecutor :: new ( ) ) . build ( connector) ;
563+ let client: HyperClient < C , http_body_util:: Empty < Bytes > > =
564+ HyperClient :: builder ( TokioExecutor :: new ( ) ) . build ( connector) ;
561565 let uri = url
562566 . as_str ( )
563567 . parse ( )
564568 . map_err ( |_| LoadDumpError :: InvalidDumpUrl ) ?;
565569 let resp = client. get ( uri) . await ?;
566570 // Convert hyper body to a stream of io::Result<Bytes>
567571 let body_stream = resp. into_body ( ) . into_data_stream ( ) ;
568- let body = body_stream
569- . map ( |r| r. map_err ( |e| std:: io:: Error :: new ( ErrorKind :: Other , e) ) ) ;
572+ let body = body_stream. map ( |r| r. map_err ( |e| std:: io:: Error :: new ( ErrorKind :: Other , e) ) ) ;
570573 Ok ( Box :: new ( body) )
571574 }
572575 "file" => {
@@ -682,9 +685,7 @@ async fn disable_profile_heap(Path(profile): Path<String>) -> Response<Body> {
682685 let stream = tokio_stream:: wrappers:: ReceiverStream :: new ( rx) ;
683686 // Wrap items in Result for TryStream compatibility
684687 let stream = stream. map ( |b| Ok :: < _ , std:: io:: Error > ( b) ) ;
685- Response :: builder ( )
686- . body ( Body :: from_stream ( stream) )
687- . unwrap ( )
688+ Response :: builder ( ) . body ( Body :: from_stream ( stream) ) . unwrap ( )
688689}
689690
690691async fn delete_profile_heap ( Path ( profile) : Path < String > ) -> crate :: Result < ( ) > {
0 commit comments