Skip to content

Commit e0a486f

Browse files
committed
libsql: WAL sync baton handling
1 parent a05ad5d commit e0a486f

2 files changed

Lines changed: 50 additions & 28 deletions

File tree

libsql/src/sync.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ pub enum SyncError {
4444
JsonEncode(serde_json::Error),
4545
#[error("failed to push frame: status={0}, error={1}")]
4646
PushFrame(StatusCode, String),
47+
#[error("no baton from WAL push operation")]
48+
NoBatonFromPush,
4749
#[error("failed to verify metadata file version: expected={0}, got={1}")]
4850
VerifyVersion(u32, u32),
4951
#[error("failed to verify metadata file hash: expected={0}, got={1}")]
@@ -75,7 +77,7 @@ pub struct PushResult {
7577
}
7678

7779
pub enum PushStatus {
78-
Ok,
80+
Ok { baton: String },
7981
Conflict,
8082
}
8183

@@ -161,28 +163,33 @@ impl SyncContext {
161163
#[tracing::instrument(skip(self, frames))]
162164
pub(crate) async fn push_frames(
163165
&mut self,
166+
baton: Option<String>,
164167
frames: Bytes,
165168
generation: u32,
166169
frame_no: u32,
167170
frames_count: u32,
168-
) -> Result<u32> {
169-
let uri = format!(
171+
) -> Result<(Option<String>, u32)> {
172+
let mut uri = format!(
170173
"{}/sync/{}/{}/{}",
171174
self.sync_url,
172175
generation,
173176
frame_no,
174177
frame_no + frames_count
175178
);
176-
tracing::debug!("pushing frame");
179+
if let Some(baton) = baton {
180+
uri += &format!("/{}", baton);
181+
}
177182

178-
let result = self.push_with_retry(uri, frames, self.max_retries).await?;
183+
tracing::debug!("pushing frame with uri: {}", uri);
179184

180-
match result.status {
185+
let result = self.push_with_retry(uri, frames, self.max_retries).await?;
186+
187+
let baton = match result.status {
181188
PushStatus::Conflict => {
182189
return Err(SyncError::InvalidPushFrameConflict(frame_no, result.max_frame_no).into());
183190
}
184-
_ => {}
185-
}
191+
PushStatus::Ok { baton } => baton,
192+
};
186193
let generation = result.generation;
187194
let durable_frame_num = result.max_frame_no;
188195

@@ -217,7 +224,7 @@ impl SyncContext {
217224
self.durable_generation = generation;
218225
self.durable_frame_num = durable_frame_num;
219226

220-
Ok(durable_frame_num)
227+
Ok((Some(baton), durable_frame_num))
221228
}
222229

223230
async fn push_with_retry(&self, mut uri: String, body: Bytes, max_retries: usize) -> Result<PushResult> {
@@ -250,6 +257,11 @@ impl SyncContext {
250257
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
251258
.map_err(SyncError::JsonDecode)?;
252259

260+
let baton: Option<String> = resp
261+
.get("baton")
262+
.map(|v| v.as_str().map(String::from))
263+
.flatten();
264+
253265
let status = resp
254266
.get("status")
255267
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
@@ -275,7 +287,13 @@ impl SyncContext {
275287
.ok_or_else(|| SyncError::JsonValue(max_frame_no.clone()))?;
276288

277289
let status = match status {
278-
"ok" => PushStatus::Ok,
290+
"ok" => {
291+
if let Some(baton) = baton {
292+
PushStatus::Ok { baton }
293+
} else {
294+
return Err(SyncError::NoBatonFromPush.into());
295+
}
296+
},
279297
"conflict" => PushStatus::Conflict,
280298
_ => return Err(SyncError::JsonValue(resp.clone()).into()),
281299
};
@@ -601,6 +619,7 @@ async fn try_push(
601619
});
602620
}
603621

622+
let mut baton: Option<String> = None;
604623
let generation = sync_ctx.durable_generation();
605624
let start_frame_no = sync_ctx.durable_frame_num() + 1;
606625
let end_frame_no = max_frame_no;
@@ -620,10 +639,12 @@ async fn try_push(
620639
// The server returns its maximum frame number. To avoid resending
621640
// frames the server already knows about, we need to update the
622641
// frame number to the one returned by the server.
623-
let max_frame_no = sync_ctx
624-
.push_frames(frames.freeze(), generation, frame_no, batch_size)
642+
let (new_baton, max_frame_no) = sync_ctx
643+
.push_frames(baton.clone(), frames.freeze(), generation, frame_no, batch_size)
625644
.await?;
626645

646+
baton = new_baton;
647+
627648
if max_frame_no > frame_no {
628649
frame_no = max_frame_no + 1;
629650
} else {

libsql/src/sync/test.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ async fn test_sync_context_push_frame() {
2828
let mut sync_ctx = sync_ctx;
2929

3030
// Push a frame and verify the response
31-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
31+
let durable_frame = sync_ctx.push_frames(None, frame, 1, 0, 1).await.unwrap();
3232
sync_ctx.write_metadata().await.unwrap();
33-
assert_eq!(durable_frame, 0); // First frame should return max_frame_no = 0
33+
assert_eq!(durable_frame.1, 0); // First frame should return max_frame_no = 0
3434

3535
// Verify internal state was updated
3636
assert_eq!(sync_ctx.durable_frame_num(), 0);
@@ -56,9 +56,9 @@ async fn test_sync_context_with_auth() {
5656
let frame = Bytes::from("test frame with auth");
5757
let mut sync_ctx = sync_ctx;
5858

59-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
59+
let durable_frame = sync_ctx.push_frames(None, frame, 1, 0, 1).await.unwrap();
6060
sync_ctx.write_metadata().await.unwrap();
61-
assert_eq!(durable_frame, 0);
61+
assert_eq!(durable_frame.1, 0);
6262
assert_eq!(server.frame_count(), 1);
6363
}
6464

@@ -82,9 +82,9 @@ async fn test_sync_context_multiple_frames() {
8282
// Push multiple frames and verify incrementing frame numbers
8383
for i in 0..3 {
8484
let frame = Bytes::from(format!("frame data {}", i));
85-
let durable_frame = sync_ctx.push_frames(frame, 1, i, 1).await.unwrap();
85+
let durable_frame = sync_ctx.push_frames(None, frame, 1, i, 1).await.unwrap();
8686
sync_ctx.write_metadata().await.unwrap();
87-
assert_eq!(durable_frame, i);
87+
assert_eq!(durable_frame.1, i);
8888
assert_eq!(sync_ctx.durable_frame_num(), i);
8989
assert_eq!(server.frame_count(), i + 1);
9090
}
@@ -108,9 +108,9 @@ async fn test_sync_context_corrupted_metadata() {
108108

109109
let mut sync_ctx = sync_ctx;
110110
let frame = Bytes::from("test frame data");
111-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
111+
let durable_frame = sync_ctx.push_frames(None, frame, 1, 0, 1).await.unwrap();
112112
sync_ctx.write_metadata().await.unwrap();
113-
assert_eq!(durable_frame, 0);
113+
assert_eq!(durable_frame.1, 0);
114114
assert_eq!(server.frame_count(), 1);
115115

116116
// Update metadata path to use -info instead of .meta
@@ -152,9 +152,9 @@ async fn test_sync_restarts_with_lower_max_frame_no() {
152152

153153
let mut sync_ctx = sync_ctx;
154154
let frame = Bytes::from("test frame data");
155-
let durable_frame = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await.unwrap();
155+
let durable_frame = sync_ctx.push_frames(None, frame.clone(), 1, 0, 1).await.unwrap();
156156
sync_ctx.write_metadata().await.unwrap();
157-
assert_eq!(durable_frame, 0);
157+
assert_eq!(durable_frame.1, 0);
158158
assert_eq!(server.frame_count(), 1);
159159

160160
// Bump the durable frame num so that the next time we call the
@@ -180,14 +180,14 @@ async fn test_sync_restarts_with_lower_max_frame_no() {
180180
// This push should fail because we are ahead of the server and thus should get an invalid
181181
// frame no error.
182182
sync_ctx
183-
.push_frames(frame.clone(), 1, frame_no, 1)
183+
.push_frames(None, frame.clone(), 1, frame_no, 1)
184184
.await
185185
.unwrap_err();
186186

187187
let frame_no = sync_ctx.durable_frame_num() + 1;
188188
// This then should work because when the last one failed it updated our state of the server
189189
// durable_frame_num and we should then start writing from there.
190-
sync_ctx.push_frames(frame, 1, frame_no, 1).await.unwrap();
190+
sync_ctx.push_frames(None, frame, 1, frame_no, 1).await.unwrap();
191191
}
192192

193193
#[tokio::test]
@@ -215,7 +215,7 @@ async fn test_sync_context_retry_on_error() {
215215
server.return_error.store(true, Ordering::SeqCst);
216216

217217
// First attempt should fail but retry
218-
let result = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await;
218+
let result = sync_ctx.push_frames(None, frame.clone(), 1, 0, 1).await;
219219
assert!(result.is_err());
220220

221221
// Advance time to trigger retries faster
@@ -228,9 +228,9 @@ async fn test_sync_context_retry_on_error() {
228228
server.return_error.store(false, Ordering::SeqCst);
229229

230230
// Next attempt should succeed
231-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
231+
let durable_frame = sync_ctx.push_frames(None, frame, 1, 0, 1).await.unwrap();
232232
sync_ctx.write_metadata().await.unwrap();
233-
assert_eq!(durable_frame, 0);
233+
assert_eq!(durable_frame.1, 0);
234234
assert_eq!(server.frame_count(), 1);
235235
}
236236

@@ -378,7 +378,8 @@ impl MockServer {
378378
let response = serde_json::json!({
379379
"status": "ok",
380380
"generation": 1,
381-
"max_frame_no": current_count
381+
"max_frame_no": current_count,
382+
"baton": "test_baton"
382383
});
383384

384385
Ok::<_, hyper::Error>(

0 commit comments

Comments
 (0)