diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs index 937ee03fa7..871a117899 100644 --- a/libsql/src/sync.rs +++ b/libsql/src/sync.rs @@ -78,6 +78,7 @@ pub struct PushResult { status: PushStatus, generation: u32, max_frame_no: u32, + baton: Option, } pub enum PushStatus { @@ -97,6 +98,12 @@ struct InfoResult { current_generation: u32, } +#[derive(Debug)] +struct PushFramesResult { + max_frame_no: u32, + baton: Option, +} + pub struct SyncContext { db_path: String, client: hyper::Client, @@ -180,15 +187,30 @@ impl SyncContext { generation: u32, frame_no: u32, frames_count: u32, - ) -> Result { - let uri = format!( - "{}/sync/{}/{}/{}", - self.sync_url, - generation, + baton: Option, + ) -> Result { + let uri = { + let mut uri = format!( + "{}/sync/{}/{}/{}", + self.sync_url, + generation, + frame_no, + frame_no + frames_count + ); + if let Some(ref baton) = baton { + uri.push_str(&format!("/{}", baton)); + } + uri + }; + + tracing::debug!( + "pushing frame(frame_no={} (to={}), count={}, generation={}, baton={:?})", frame_no, - frame_no + frames_count + frame_no + frames_count, + frames_count, + generation, + baton ); - tracing::debug!("pushing frame(frame_no={}, count={}, generation={})", frame_no, frames_count, generation); let result = self.push_with_retry(uri, frames, self.max_retries).await?; @@ -200,6 +222,7 @@ impl SyncContext { } let generation = result.generation; let durable_frame_num = result.max_frame_no; + let baton = result.baton; if durable_frame_num > frame_no + frames_count - 1 { tracing::error!( @@ -232,7 +255,10 @@ impl SyncContext { self.durable_generation = generation; self.durable_frame_num = durable_frame_num; - Ok(durable_frame_num) + Ok(PushFramesResult { + max_frame_no: durable_frame_num, + baton, + }) } async fn push_with_retry(&self, mut uri: String, body: Bytes, max_retries: usize) -> Result { @@ -289,14 +315,32 @@ impl SyncContext { .as_u64() .ok_or_else(|| SyncError::JsonValue(max_frame_no.clone()))?; + let baton = resp + .get("baton") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + tracing::trace!( + ?baton, + ?generation, + ?max_frame_no, + ?status, + "pushed frame to server" + ); + let status = match status { "ok" => PushStatus::Ok, "conflict" => PushStatus::Conflict, _ => return Err(SyncError::JsonValue(resp.clone()).into()), }; - let generation = generation as u32; + let generation = generation as u32; let max_frame_no = max_frame_no as u32; - return Ok(PushResult { status, generation, max_frame_no }); + return Ok(PushResult { + status, + generation, + max_frame_no, + baton, + }); } if res.status().is_redirection() { @@ -778,6 +822,7 @@ async fn try_push( let generation = sync_ctx.durable_generation(); let start_frame_no = sync_ctx.durable_frame_num() + 1; let end_frame_no = max_frame_no; + let mut baton = None; let mut frame_no = start_frame_no; while frame_no <= end_frame_no { @@ -794,9 +839,12 @@ async fn try_push( // The server returns its maximum frame number. To avoid resending // frames the server already knows about, we need to update the // frame number to the one returned by the server. - let max_frame_no = sync_ctx - .push_frames(frames.freeze(), generation, frame_no, batch_size) + let result = sync_ctx + .push_frames(frames.freeze(), generation, frame_no, batch_size, baton) .await?; + // if the server sent us a baton, then we will reuse it for the next request + baton = result.baton; + let max_frame_no = result.max_frame_no; if max_frame_no > frame_no { frame_no = max_frame_no + 1; diff --git a/libsql/src/sync/test.rs b/libsql/src/sync/test.rs index 2417fa8158..edd6d33eee 100644 --- a/libsql/src/sync/test.rs +++ b/libsql/src/sync/test.rs @@ -28,9 +28,9 @@ async fn test_sync_context_push_frame() { let mut sync_ctx = sync_ctx; // Push a frame and verify the response - let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap(); + let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap(); sync_ctx.write_metadata().await.unwrap(); - assert_eq!(durable_frame, 0); // First frame should return max_frame_no = 0 + assert_eq!(durable_frame.max_frame_no, 0); // First frame should return max_frame_no = 0 // Verify internal state was updated assert_eq!(sync_ctx.durable_frame_num(), 0); @@ -56,9 +56,9 @@ async fn test_sync_context_with_auth() { let frame = Bytes::from("test frame with auth"); let mut sync_ctx = sync_ctx; - let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap(); + let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap(); sync_ctx.write_metadata().await.unwrap(); - assert_eq!(durable_frame, 0); + assert_eq!(durable_frame.max_frame_no, 0); assert_eq!(server.frame_count(), 1); } @@ -82,9 +82,9 @@ async fn test_sync_context_multiple_frames() { // Push multiple frames and verify incrementing frame numbers for i in 0..3 { let frame = Bytes::from(format!("frame data {}", i)); - let durable_frame = sync_ctx.push_frames(frame, 1, i, 1).await.unwrap(); + let durable_frame = sync_ctx.push_frames(frame, 1, i, 1, None).await.unwrap(); sync_ctx.write_metadata().await.unwrap(); - assert_eq!(durable_frame, i); + assert_eq!(durable_frame.max_frame_no, i); assert_eq!(sync_ctx.durable_frame_num(), i); assert_eq!(server.frame_count(), i + 1); } @@ -108,9 +108,9 @@ async fn test_sync_context_corrupted_metadata() { let mut sync_ctx = sync_ctx; let frame = Bytes::from("test frame data"); - let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap(); + let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap(); sync_ctx.write_metadata().await.unwrap(); - assert_eq!(durable_frame, 0); + assert_eq!(durable_frame.max_frame_no, 0); assert_eq!(server.frame_count(), 1); // Update metadata path to use -info instead of .meta @@ -152,9 +152,12 @@ async fn test_sync_restarts_with_lower_max_frame_no() { let mut sync_ctx = sync_ctx; let frame = Bytes::from("test frame data"); - let durable_frame = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await.unwrap(); + let durable_frame = sync_ctx + .push_frames(frame.clone(), 1, 0, 1, None) + .await + .unwrap(); sync_ctx.write_metadata().await.unwrap(); - assert_eq!(durable_frame, 0); + assert_eq!(durable_frame.max_frame_no, 0); assert_eq!(server.frame_count(), 1); // Bump the durable frame num so that the next time we call the @@ -180,14 +183,17 @@ async fn test_sync_restarts_with_lower_max_frame_no() { // This push should fail because we are ahead of the server and thus should get an invalid // frame no error. sync_ctx - .push_frames(frame.clone(), 1, frame_no, 1) + .push_frames(frame.clone(), 1, frame_no, 1, None) .await .unwrap_err(); let frame_no = sync_ctx.durable_frame_num() + 1; // This then should work because when the last one failed it updated our state of the server // durable_frame_num and we should then start writing from there. - sync_ctx.push_frames(frame, 1, frame_no, 1).await.unwrap(); + sync_ctx + .push_frames(frame, 1, frame_no, 1, None) + .await + .unwrap(); } #[tokio::test] @@ -215,7 +221,7 @@ async fn test_sync_context_retry_on_error() { server.return_error.store(true, Ordering::SeqCst); // First attempt should fail but retry - let result = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await; + let result = sync_ctx.push_frames(frame.clone(), 1, 0, 1, None).await; assert!(result.is_err()); // Advance time to trigger retries faster @@ -228,9 +234,9 @@ async fn test_sync_context_retry_on_error() { server.return_error.store(false, Ordering::SeqCst); // Next attempt should succeed - let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap(); + let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap(); sync_ctx.write_metadata().await.unwrap(); - assert_eq!(durable_frame, 0); + assert_eq!(durable_frame.max_frame_no, 0); assert_eq!(server.frame_count(), 1); }