Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 60 additions & 12 deletions libsql/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub struct PushResult {
status: PushStatus,
generation: u32,
max_frame_no: u32,
baton: Option<String>,
}

pub enum PushStatus {
Expand All @@ -97,6 +98,12 @@ struct InfoResult {
current_generation: u32,
}

/// Result of pushing a batch of WAL frames to the sync server.
#[derive(Debug)]
struct PushFramesResult {
    // Highest frame number the server reports as durably stored after this push.
    max_frame_no: u32,
    // Opaque continuation token returned by the server, if any. Per the
    // surrounding diff it is appended to the next push URI so the server can
    // correlate consecutive batches — NOTE(review): exact baton semantics are
    // server-defined; confirm against the sync protocol documentation.
    baton: Option<String>,
}

pub struct SyncContext {
db_path: String,
client: hyper::Client<ConnectorService, Body>,
Expand Down Expand Up @@ -180,15 +187,30 @@ impl SyncContext {
generation: u32,
frame_no: u32,
frames_count: u32,
) -> Result<u32> {
let uri = format!(
"{}/sync/{}/{}/{}",
self.sync_url,
generation,
baton: Option<String>,
) -> Result<PushFramesResult> {
let uri = {
let mut uri = format!(
"{}/sync/{}/{}/{}",
self.sync_url,
generation,
frame_no,
frame_no + frames_count
);
if let Some(ref baton) = baton {
uri.push_str(&format!("/{}", baton));
}
uri
};

tracing::debug!(
"pushing frame(frame_no={} (to={}), count={}, generation={}, baton={:?})",
frame_no,
frame_no + frames_count
frame_no + frames_count,
frames_count,
generation,
baton
);
tracing::debug!("pushing frame(frame_no={}, count={}, generation={})", frame_no, frames_count, generation);

let result = self.push_with_retry(uri, frames, self.max_retries).await?;

Expand All @@ -200,6 +222,7 @@ impl SyncContext {
}
let generation = result.generation;
let durable_frame_num = result.max_frame_no;
let baton = result.baton;

if durable_frame_num > frame_no + frames_count - 1 {
tracing::error!(
Expand Down Expand Up @@ -232,7 +255,10 @@ impl SyncContext {
self.durable_generation = generation;
self.durable_frame_num = durable_frame_num;

Ok(durable_frame_num)
Ok(PushFramesResult {
max_frame_no: durable_frame_num,
baton,
})
}

async fn push_with_retry(&self, mut uri: String, body: Bytes, max_retries: usize) -> Result<PushResult> {
Expand Down Expand Up @@ -289,14 +315,32 @@ impl SyncContext {
.as_u64()
.ok_or_else(|| SyncError::JsonValue(max_frame_no.clone()))?;

let baton = resp
.get("baton")
.and_then(|v| v.as_str())
.map(|s| s.to_string());

tracing::trace!(
?baton,
?generation,
?max_frame_no,
?status,
"pushed frame to server"
);

let status = match status {
"ok" => PushStatus::Ok,
"conflict" => PushStatus::Conflict,
_ => return Err(SyncError::JsonValue(resp.clone()).into()),
};
let generation = generation as u32;
let generation = generation as u32;
let max_frame_no = max_frame_no as u32;
return Ok(PushResult { status, generation, max_frame_no });
return Ok(PushResult {
status,
generation,
max_frame_no,
baton,
});
}

if res.status().is_redirection() {
Expand Down Expand Up @@ -778,6 +822,7 @@ async fn try_push(
let generation = sync_ctx.durable_generation();
let start_frame_no = sync_ctx.durable_frame_num() + 1;
let end_frame_no = max_frame_no;
let mut baton = None;

let mut frame_no = start_frame_no;
while frame_no <= end_frame_no {
Expand All @@ -794,9 +839,12 @@ async fn try_push(
// The server returns its maximum frame number. To avoid resending
// frames the server already knows about, we need to update the
// frame number to the one returned by the server.
let max_frame_no = sync_ctx
.push_frames(frames.freeze(), generation, frame_no, batch_size)
let result = sync_ctx
.push_frames(frames.freeze(), generation, frame_no, batch_size, baton)
.await?;
// if the server sent us a baton, then we will reuse it for the next request
baton = result.baton;
let max_frame_no = result.max_frame_no;

if max_frame_no > frame_no {
frame_no = max_frame_no + 1;
Expand Down
36 changes: 21 additions & 15 deletions libsql/src/sync/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ async fn test_sync_context_push_frame() {
let mut sync_ctx = sync_ctx;

// Push a frame and verify the response
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap();
sync_ctx.write_metadata().await.unwrap();
assert_eq!(durable_frame, 0); // First frame should return max_frame_no = 0
assert_eq!(durable_frame.max_frame_no, 0); // First frame should return max_frame_no = 0

// Verify internal state was updated
assert_eq!(sync_ctx.durable_frame_num(), 0);
Expand All @@ -56,9 +56,9 @@ async fn test_sync_context_with_auth() {
let frame = Bytes::from("test frame with auth");
let mut sync_ctx = sync_ctx;

let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap();
sync_ctx.write_metadata().await.unwrap();
assert_eq!(durable_frame, 0);
assert_eq!(durable_frame.max_frame_no, 0);
assert_eq!(server.frame_count(), 1);
}

Expand All @@ -82,9 +82,9 @@ async fn test_sync_context_multiple_frames() {
// Push multiple frames and verify incrementing frame numbers
for i in 0..3 {
let frame = Bytes::from(format!("frame data {}", i));
let durable_frame = sync_ctx.push_frames(frame, 1, i, 1).await.unwrap();
let durable_frame = sync_ctx.push_frames(frame, 1, i, 1, None).await.unwrap();
sync_ctx.write_metadata().await.unwrap();
assert_eq!(durable_frame, i);
assert_eq!(durable_frame.max_frame_no, i);
assert_eq!(sync_ctx.durable_frame_num(), i);
assert_eq!(server.frame_count(), i + 1);
}
Expand All @@ -108,9 +108,9 @@ async fn test_sync_context_corrupted_metadata() {

let mut sync_ctx = sync_ctx;
let frame = Bytes::from("test frame data");
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap();
sync_ctx.write_metadata().await.unwrap();
assert_eq!(durable_frame, 0);
assert_eq!(durable_frame.max_frame_no, 0);
assert_eq!(server.frame_count(), 1);

// Update metadata path to use -info instead of .meta
Expand Down Expand Up @@ -152,9 +152,12 @@ async fn test_sync_restarts_with_lower_max_frame_no() {

let mut sync_ctx = sync_ctx;
let frame = Bytes::from("test frame data");
let durable_frame = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await.unwrap();
let durable_frame = sync_ctx
.push_frames(frame.clone(), 1, 0, 1, None)
.await
.unwrap();
sync_ctx.write_metadata().await.unwrap();
assert_eq!(durable_frame, 0);
assert_eq!(durable_frame.max_frame_no, 0);
assert_eq!(server.frame_count(), 1);

// Bump the durable frame num so that the next time we call the
Expand All @@ -180,14 +183,17 @@ async fn test_sync_restarts_with_lower_max_frame_no() {
// This push should fail because we are ahead of the server and thus should get an invalid
// frame no error.
sync_ctx
.push_frames(frame.clone(), 1, frame_no, 1)
.push_frames(frame.clone(), 1, frame_no, 1, None)
.await
.unwrap_err();

let frame_no = sync_ctx.durable_frame_num() + 1;
// This then should work because when the last one failed it updated our state of the server
// durable_frame_num and we should then start writing from there.
sync_ctx.push_frames(frame, 1, frame_no, 1).await.unwrap();
sync_ctx
.push_frames(frame, 1, frame_no, 1, None)
.await
.unwrap();
}

#[tokio::test]
Expand Down Expand Up @@ -215,7 +221,7 @@ async fn test_sync_context_retry_on_error() {
server.return_error.store(true, Ordering::SeqCst);

// First attempt should fail but retry
let result = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await;
let result = sync_ctx.push_frames(frame.clone(), 1, 0, 1, None).await;
assert!(result.is_err());

// Advance time to trigger retries faster
Expand All @@ -228,9 +234,9 @@ async fn test_sync_context_retry_on_error() {
server.return_error.store(false, Ordering::SeqCst);

// Next attempt should succeed
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap();
sync_ctx.write_metadata().await.unwrap();
assert_eq!(durable_frame, 0);
assert_eq!(durable_frame.max_frame_no, 0);
assert_eq!(server.frame_count(), 1);
}

Expand Down
Loading