File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -176,7 +176,7 @@ where
176176
177177 pub async fn cursor ( & self , batch : Batch ) -> Result < Cursor < T :: Stream > > {
178178 let mut client = self . inner . stream . lock ( ) . await ;
179- let cursor = client. open_cursor ( batch) . await ?;
179+ let cursor = client. open_cursor ( batch, ! self . is_autocommit ( ) ) . await ?;
180180 Ok ( cursor)
181181 }
182182
@@ -314,7 +314,7 @@ where
314314 Ok ( resp)
315315 }
316316
317- pub async fn open_cursor ( & mut self , batch : Batch ) -> Result < Cursor < T :: Stream > > {
317+ pub async fn open_cursor ( & mut self , batch : Batch , use_baton : bool ) -> Result < Cursor < T :: Stream > > {
318318 let msg = CursorReq {
319319 baton : self . baton . clone ( ) ,
320320 batch,
@@ -329,7 +329,17 @@ where
329329 self . pipeline_url = Arc :: from ( format ! ( "{base_url}/v3/pipeline" ) ) ;
330330 self . cursor_url = Arc :: from ( format ! ( "{base_url}/v3/cursor" ) ) ;
331331 }
332- self . reset ( ) ;
332+ dbg ! ( use_baton) ;
333+ match response. baton . take ( ) {
334+ Some ( baton) if use_baton => {
335+ tracing:: trace!( "client stream has been assigned with baton: `{}`" , baton) ;
336+ self . baton = Some ( baton)
337+ }
338+ _ => {
339+ tracing:: trace!( "client stream has been closed by the server" ) ;
340+ self . reset ( ) ;
341+ }
342+ }
333343 Ok ( cursor)
334344 }
335345
You can’t perform that action at this time.
0 commit comments