Skip to content

Commit 2c3c88c

Browse files
committed
Optimise sync_pull to pull frames in batches
Previously, we pulled frames one by one. This patch changes it pull frames in batches. Currently, the batch size is set to 128 (the maximum supported by the server)
1 parent 5d57c82 commit 2c3c88c

1 file changed

Lines changed: 66 additions & 12 deletions

File tree

libsql/src/sync.rs

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const METADATA_VERSION: u32 = 0;
2020

2121
const DEFAULT_MAX_RETRIES: usize = 5;
2222
const DEFAULT_PUSH_BATCH_SIZE: u32 = 128;
23+
const DEFAULT_PULL_BATCH_SIZE: u32 = 128;
2324

2425
#[derive(thiserror::Error, Debug)]
2526
#[non_exhaustive]
@@ -66,6 +67,8 @@ pub enum SyncError {
6667
InvalidLocalGeneration(u32, u32),
6768
#[error("invalid local state: {0}")]
6869
InvalidLocalState(String),
70+
#[error("server returned invalid length of frames: {0}")]
71+
InvalidPullFrameBytes(usize),
6972
}
7073

7174
impl SyncError {
@@ -98,8 +101,8 @@ pub enum PushStatus {
98101
}
99102

100103
pub enum PullResult {
101-
/// A frame was successfully pulled.
102-
Frame(Bytes),
104+
/// Frames were successfully pulled.
105+
Frames(Bytes),
103106
/// We've reached the end of the generation.
104107
EndOfGeneration { max_generation: u32 },
105108
}
@@ -122,6 +125,7 @@ pub struct SyncContext {
122125
auth_token: Option<HeaderValue>,
123126
max_retries: usize,
124127
push_batch_size: u32,
128+
pull_batch_size: u32,
125129
/// The current durable generation.
126130
durable_generation: u32,
127131
/// Represents the max_frame_no from the server.
@@ -154,6 +158,7 @@ impl SyncContext {
154158
auth_token,
155159
max_retries: DEFAULT_MAX_RETRIES,
156160
push_batch_size: DEFAULT_PUSH_BATCH_SIZE,
161+
pull_batch_size: DEFAULT_PULL_BATCH_SIZE,
157162
client,
158163
durable_generation: 0,
159164
durable_frame_num: 0,
@@ -175,7 +180,7 @@ impl SyncContext {
175180
}
176181

177182
#[tracing::instrument(skip(self))]
178-
pub(crate) async fn pull_one_frame(
183+
pub(crate) async fn pull_frames(
179184
&mut self,
180185
generation: u32,
181186
frame_no: u32,
@@ -185,9 +190,10 @@ impl SyncContext {
185190
self.sync_url,
186191
generation,
187192
frame_no,
188-
frame_no + 1
193+
// the server expects the range of [start, end) frames, i.e. end is exclusive
194+
frame_no + self.pull_batch_size
189195
);
190-
tracing::debug!("pulling frame");
196+
tracing::debug!("pulling frame (uri={})", uri);
191197
self.pull_with_retry(uri, self.max_retries).await
192198
}
193199

@@ -417,10 +423,24 @@ impl SyncContext {
417423
.map_err(SyncError::HttpDispatch)?;
418424

419425
if res.status().is_success() {
420-
let frame = hyper::body::to_bytes(res.into_body())
426+
let frames = hyper::body::to_bytes(res.into_body())
421427
.await
422428
.map_err(SyncError::HttpBody)?;
423-
return Ok(PullResult::Frame(frame));
429+
// a success result should always return some frames
430+
if frames.is_empty() {
431+
tracing::error!("server returned empty frames in pull response");
432+
return Err(SyncError::InvalidPullFrameBytes(0).into());
433+
}
434+
// the minimum payload size cannot be less than a single frame
435+
if frames.len() < FRAME_SIZE {
436+
tracing::error!(
437+
"server returned frames with invalid length: {} < {}",
438+
frames.len(),
439+
FRAME_SIZE
440+
);
441+
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
442+
}
443+
return Ok(PullResult::Frames(frames));
424444
}
425445
// BUG ALERT: The server returns a 500 error if the remote database is empty.
426446
// This is a bug and should be fixed.
@@ -887,6 +907,11 @@ async fn try_push(
887907
})
888908
}
889909

910+
/// PAGE_SIZE used by the sync / diskless server
911+
const PAGE_SIZE: usize = 4096;
912+
const FRAME_HEADER_SIZE: usize = 24;
913+
const FRAME_SIZE: usize = PAGE_SIZE + FRAME_HEADER_SIZE;
914+
890915
pub async fn try_pull(
891916
sync_ctx: &mut SyncContext,
892917
conn: &Connection,
@@ -898,10 +923,39 @@ pub async fn try_pull(
898923
loop {
899924
let generation = sync_ctx.durable_generation();
900925
let frame_no = sync_ctx.durable_frame_num() + 1;
901-
match sync_ctx.pull_one_frame(generation, frame_no).await {
902-
Ok(PullResult::Frame(frame)) => {
903-
insert_handle.insert(&frame)?;
904-
sync_ctx.durable_frame_num = frame_no;
926+
match sync_ctx.pull_frames(generation, frame_no).await {
927+
Ok(PullResult::Frames(frames)) => {
928+
tracing::debug!(
929+
"pull_frames: generation={}, start_frame_no={} (batch_size={}), frame_size={}",
930+
generation,
931+
frame_no,
932+
sync_ctx.pull_batch_size,
933+
frames.len(),
934+
);
935+
if frames.len() % FRAME_SIZE != 0 {
936+
tracing::error!(
937+
"frame size {} is not a multiple of the expected size {}",
938+
frames.len(),
939+
FRAME_SIZE,
940+
);
941+
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
942+
}
943+
for chunk in frames.chunks(FRAME_SIZE) {
944+
tracing::debug!(
945+
"inserting frame (frame_no={})",
946+
sync_ctx.durable_frame_num + 1
947+
);
948+
let r = insert_handle.insert(&chunk);
949+
if let Err(e) = r {
950+
tracing::debug!(
951+
"insert error (frame= {}) : {:?}",
952+
sync_ctx.durable_frame_num + 1,
953+
e
954+
);
955+
return Err(e);
956+
}
957+
sync_ctx.durable_frame_num += 1;
958+
}
905959
}
906960
Ok(PullResult::EndOfGeneration { max_generation }) => {
907961
// If there are no more generations to pull, we're done.
@@ -920,7 +974,7 @@ pub async fn try_pull(
920974
insert_handle.begin()?;
921975
}
922976
Err(e) => {
923-
tracing::debug!("pull_one_frame error: {:?}", e);
977+
tracing::debug!("pull_frames error: {:?}", e);
924978
err.replace(e);
925979
break;
926980
}

0 commit comments

Comments
 (0)