Skip to content

Commit 6c69a68

Browse files
committed
libsql: WAL sync baton handling
1 parent 81fa360 commit 6c69a68

2 files changed

Lines changed: 49 additions & 27 deletions

File tree

libsql/src/sync.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ pub enum SyncError {
4444
JsonEncode(serde_json::Error),
4545
#[error("failed to push frame: status={0}, error={1}")]
4646
PushFrame(StatusCode, String),
47+
#[error("no baton from WAL push operation")]
48+
NoBatonFromPush,
4749
#[error("failed to verify metadata file version: expected={0}, got={1}")]
4850
VerifyVersion(u32, u32),
4951
#[error("failed to verify metadata file hash: expected={0}, got={1}")]
@@ -79,7 +81,7 @@ pub struct PushResult {
7981
}
8082

8183
pub enum PushStatus {
82-
Ok,
84+
Ok { baton: String },
8385
Conflict,
8486
}
8587

@@ -174,28 +176,33 @@ impl SyncContext {
174176
#[tracing::instrument(skip(self, frames))]
175177
pub(crate) async fn push_frames(
176178
&mut self,
179+
baton: Option<String>,
177180
frames: Bytes,
178181
generation: u32,
179182
frame_no: u32,
180183
frames_count: u32,
181-
) -> Result<u32> {
182-
let uri = format!(
184+
) -> Result<(Option<String>, u32)> {
185+
let mut uri = format!(
183186
"{}/sync/{}/{}/{}",
184187
self.sync_url,
185188
generation,
186189
frame_no,
187190
frame_no + frames_count
188191
);
189192
tracing::debug!("pushing frame(frame_no={}, count={}, generation={})", frame_no, frames_count, generation);
193+
if let Some(baton) = baton {
194+
uri += &format!("/{}", baton);
195+
}
190196

191-
let result = self.push_with_retry(uri, frames, self.max_retries).await?;
192197

193-
match result.status {
198+
let result = self.push_with_retry(uri, frames, self.max_retries).await?;
199+
200+
let baton = match result.status {
194201
PushStatus::Conflict => {
195202
return Err(SyncError::InvalidPushFrameConflict(frame_no, result.max_frame_no).into());
196203
}
197-
_ => {}
198-
}
204+
PushStatus::Ok { baton } => baton,
205+
};
199206
let generation = result.generation;
200207
let durable_frame_num = result.max_frame_no;
201208

@@ -230,7 +237,7 @@ impl SyncContext {
230237
self.durable_generation = generation;
231238
self.durable_frame_num = durable_frame_num;
232239

233-
Ok(durable_frame_num)
240+
Ok((Some(baton), durable_frame_num))
234241
}
235242

236243
async fn push_with_retry(&self, mut uri: String, body: Bytes, max_retries: usize) -> Result<PushResult> {
@@ -263,6 +270,11 @@ impl SyncContext {
263270
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
264271
.map_err(SyncError::JsonDecode)?;
265272

273+
let baton: Option<String> = resp
274+
.get("baton")
275+
.map(|v| v.as_str().map(String::from))
276+
.flatten();
277+
266278
let status = resp
267279
.get("status")
268280
.ok_or_else(|| SyncError::JsonValue(resp.clone()))?;
@@ -288,7 +300,13 @@ impl SyncContext {
288300
.ok_or_else(|| SyncError::JsonValue(max_frame_no.clone()))?;
289301

290302
let status = match status {
291-
"ok" => PushStatus::Ok,
303+
"ok" => {
304+
if let Some(baton) = baton {
305+
PushStatus::Ok { baton }
306+
} else {
307+
return Err(SyncError::NoBatonFromPush.into());
308+
}
309+
},
292310
"conflict" => PushStatus::Conflict,
293311
_ => return Err(SyncError::JsonValue(resp.clone()).into()),
294312
};
@@ -729,6 +747,7 @@ async fn try_push(
729747
});
730748
}
731749

750+
let mut baton: Option<String> = None;
732751
let generation = sync_ctx.durable_generation();
733752
let start_frame_no = sync_ctx.durable_frame_num() + 1;
734753
let end_frame_no = max_frame_no;
@@ -748,10 +767,12 @@ async fn try_push(
748767
// The server returns its maximum frame number. To avoid resending
749768
// frames the server already knows about, we need to update the
750769
// frame number to the one returned by the server.
751-
let max_frame_no = sync_ctx
752-
.push_frames(frames.freeze(), generation, frame_no, batch_size)
770+
let (new_baton, max_frame_no) = sync_ctx
771+
.push_frames(baton.clone(), frames.freeze(), generation, frame_no, batch_size)
753772
.await?;
754773

774+
baton = new_baton;
775+
755776
if max_frame_no > frame_no {
756777
frame_no = max_frame_no + 1;
757778
} else {

libsql/src/sync/test.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ async fn test_sync_context_push_frame() {
2828
let mut sync_ctx = sync_ctx;
2929

3030
// Push a frame and verify the response
31-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
31+
let durable_frame = sync_ctx.push_frames(None, frame, 1, 0, 1).await.unwrap();
3232
sync_ctx.write_metadata().await.unwrap();
33-
assert_eq!(durable_frame, 0); // First frame should return max_frame_no = 0
33+
assert_eq!(durable_frame.1, 0); // First frame should return max_frame_no = 0
3434

3535
// Verify internal state was updated
3636
assert_eq!(sync_ctx.durable_frame_num(), 0);
@@ -56,9 +56,9 @@ async fn test_sync_context_with_auth() {
5656
let frame = Bytes::from("test frame with auth");
5757
let mut sync_ctx = sync_ctx;
5858

59-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
59+
let durable_frame = sync_ctx.push_frames(None, frame, 1, 0, 1).await.unwrap();
6060
sync_ctx.write_metadata().await.unwrap();
61-
assert_eq!(durable_frame, 0);
61+
assert_eq!(durable_frame.1, 0);
6262
assert_eq!(server.frame_count(), 1);
6363
}
6464

@@ -82,9 +82,9 @@ async fn test_sync_context_multiple_frames() {
8282
// Push multiple frames and verify incrementing frame numbers
8383
for i in 0..3 {
8484
let frame = Bytes::from(format!("frame data {}", i));
85-
let durable_frame = sync_ctx.push_frames(frame, 1, i, 1).await.unwrap();
85+
let durable_frame = sync_ctx.push_frames(None, frame, 1, i, 1).await.unwrap();
8686
sync_ctx.write_metadata().await.unwrap();
87-
assert_eq!(durable_frame, i);
87+
assert_eq!(durable_frame.1, i);
8888
assert_eq!(sync_ctx.durable_frame_num(), i);
8989
assert_eq!(server.frame_count(), i + 1);
9090
}
@@ -108,9 +108,9 @@ async fn test_sync_context_corrupted_metadata() {
108108

109109
let mut sync_ctx = sync_ctx;
110110
let frame = Bytes::from("test frame data");
111-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
111+
let durable_frame = sync_ctx.push_frames(None, frame, 1, 0, 1).await.unwrap();
112112
sync_ctx.write_metadata().await.unwrap();
113-
assert_eq!(durable_frame, 0);
113+
assert_eq!(durable_frame.1, 0);
114114
assert_eq!(server.frame_count(), 1);
115115

116116
// Update metadata path to use -info instead of .meta
@@ -152,9 +152,9 @@ async fn test_sync_restarts_with_lower_max_frame_no() {
152152

153153
let mut sync_ctx = sync_ctx;
154154
let frame = Bytes::from("test frame data");
155-
let durable_frame = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await.unwrap();
155+
let durable_frame = sync_ctx.push_frames(None, frame.clone(), 1, 0, 1).await.unwrap();
156156
sync_ctx.write_metadata().await.unwrap();
157-
assert_eq!(durable_frame, 0);
157+
assert_eq!(durable_frame.1, 0);
158158
assert_eq!(server.frame_count(), 1);
159159

160160
// Bump the durable frame num so that the next time we call the
@@ -180,14 +180,14 @@ async fn test_sync_restarts_with_lower_max_frame_no() {
180180
// This push should fail because we are ahead of the server and thus should get an invalid
181181
// frame no error.
182182
sync_ctx
183-
.push_frames(frame.clone(), 1, frame_no, 1)
183+
.push_frames(None, frame.clone(), 1, frame_no, 1)
184184
.await
185185
.unwrap_err();
186186

187187
let frame_no = sync_ctx.durable_frame_num() + 1;
188188
// This then should work because when the last one failed it updated our state of the server
189189
// durable_frame_num and we should then start writing from there.
190-
sync_ctx.push_frames(frame, 1, frame_no, 1).await.unwrap();
190+
sync_ctx.push_frames(None, frame, 1, frame_no, 1).await.unwrap();
191191
}
192192

193193
#[tokio::test]
@@ -215,7 +215,7 @@ async fn test_sync_context_retry_on_error() {
215215
server.return_error.store(true, Ordering::SeqCst);
216216

217217
// First attempt should fail but retry
218-
let result = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await;
218+
let result = sync_ctx.push_frames(None, frame.clone(), 1, 0, 1).await;
219219
assert!(result.is_err());
220220

221221
// Advance time to trigger retries faster
@@ -228,9 +228,9 @@ async fn test_sync_context_retry_on_error() {
228228
server.return_error.store(false, Ordering::SeqCst);
229229

230230
// Next attempt should succeed
231-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
231+
let durable_frame = sync_ctx.push_frames(None, frame, 1, 0, 1).await.unwrap();
232232
sync_ctx.write_metadata().await.unwrap();
233-
assert_eq!(durable_frame, 0);
233+
assert_eq!(durable_frame.1, 0);
234234
assert_eq!(server.frame_count(), 1);
235235
}
236236

@@ -378,7 +378,8 @@ impl MockServer {
378378
let response = serde_json::json!({
379379
"status": "ok",
380380
"generation": 1,
381-
"max_frame_no": current_count
381+
"max_frame_no": current_count,
382+
"baton": "test_baton"
382383
});
383384

384385
Ok::<_, hyper::Error>(

0 commit comments

Comments
 (0)