Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions libsql/src/local/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,10 @@ impl WalInsertHandle<'_> {
self.conn.wal_insert_frame(frame_no, frame)
}

/// Reports whether a WAL insert session is currently marked as open on this handle.
///
/// Reads the handle's `in_session` flag, the same flag `begin` asserts to be
/// unset before it starts a new session.
pub fn in_session(&self) -> bool {
    let active = self.in_session.borrow();
    *active
}

pub fn begin(&self) -> Result<()> {
assert!(!*self.in_session.borrow());
self.conn.wal_insert_begin()?;
Expand Down
99 changes: 57 additions & 42 deletions libsql/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use hyper::Body;
use std::path::Path;
use tokio::io::AsyncWriteExt as _;
use uuid::Uuid;
use zerocopy::big_endian;

#[cfg(test)]
mod test;
Expand Down Expand Up @@ -953,18 +954,24 @@ pub async fn try_pull(
sync_ctx: &mut SyncContext,
conn: &Connection,
) -> Result<crate::database::Replicated> {
let insert_handle = conn.wal_insert_handle()?;
// note that updates of durable_frame_num are valid only after SQLite has committed the WAL
// (because if the WAL has an uncommitted suffix, it will be omitted by any other SQLite connection - for example, after a restart)
// so try_pull maintains a local next_frame_no during the pull operation and updates durable_frame_num when appropriate
let mut next_frame_no = sync_ctx.durable_frame_num + 1;

let mut err = None;
// libsql maintains consistent state about the WAL sync session locally in the insert_handle
// note that insert_handle will always close the session on drop - so we never keep an active WAL session after we exit from the method
let insert_handle = conn.wal_insert_handle()?;

loop {
// get current generation (it may be updated multiple times during execution)
let generation = sync_ctx.durable_generation();
let frame_no = sync_ctx.durable_frame_num() + 1;
match sync_ctx.pull_frames(generation, frame_no).await {

match sync_ctx.pull_frames(generation, next_frame_no).await {
Ok(PullResult::Frames(frames)) => {
tracing::debug!(
"pull_frames: generation={}, start_frame={} (end_frame={}, batch_size={}), frames_size={}",
generation, frame_no, frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(),
generation, next_frame_no, next_frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(),
);
if frames.len() % FRAME_SIZE != 0 {
tracing::error!(
Expand All @@ -974,63 +981,71 @@ pub async fn try_pull(
);
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
}
for (i, chunk) in frames.chunks(FRAME_SIZE).enumerate() {
let r = insert_handle.insert_at(frame_no + i as u32, &chunk);
if let Err(e) = r {
tracing::error!(
"insert error (frame= {}) : {:?}",
sync_ctx.durable_frame_num + 1,
e
for chunk in frames.chunks(FRAME_SIZE) {
let mut size_after_buf = [0u8; 4];
size_after_buf.copy_from_slice(&chunk[4..8]);
let size_after = big_endian::U32::from_bytes(size_after_buf);
// start a WAL sync session if it was closed
// (this can happen if the client received a commit frame on the previous iteration)
if !insert_handle.in_session() {
tracing::debug!(
"pull_frames: generation={}, frame={}, start wal transaction session",
generation,
next_frame_no
);
insert_handle.begin()?;
}
let result = insert_handle.insert_at(next_frame_no, &chunk);
if let Err(e) = result {
tracing::error!("insert error (frame={}) : {:?}", next_frame_no, e);
return Err(e);
}
sync_ctx.durable_frame_num += 1;
// if this is a commit frame, we can close the WAL sync session and update durable_frame_num
if size_after.get() > 0 {
tracing::debug!(
"pull_frames: generation={}, frame={}, finish wal transaction session, size_after={}",
generation,
next_frame_no,
size_after.get()
);
insert_handle.end()?;
sync_ctx.durable_frame_num = next_frame_no;
sync_ctx.write_metadata().await?;
}

next_frame_no += 1;
}
}
Ok(PullResult::EndOfGeneration { max_generation }) => {
// If there are no more generations to pull, we're done.
if generation >= max_generation {
break;
}
insert_handle.end()?;
sync_ctx.write_metadata().await?;
assert!(
!insert_handle.in_session(),
"WAL transaction must be finished"
);

tracing::debug!(
"pull_frames: generation={}, frame={}, checkpoint in order to move to next generation",
generation,
next_frame_no
);
// TODO: Make this crash-proof.
conn.wal_checkpoint(true)?;

sync_ctx.next_generation();
sync_ctx.write_metadata().await?;

insert_handle.begin()?;
}
Err(e) => {
tracing::debug!("pull_frames error: {:?}", e);
err.replace(e);
break;
next_frame_no = 1;
}
Err(e) => return Err(e),
}
}
// This is crash-proof because we:
//
// 1. Write WAL frame first
// 2. Write new max frame to temporary metadata
// 3. Atomically rename the temporary metadata to the real metadata
//
// If we crash before metadata rename completes, the old metadata still
// points to last successful frame, allowing safe retry from that point.
// If we happen to have the frame already in the WAL, it's fine to re-pull
// because append locally is idempotent.
insert_handle.end()?;
sync_ctx.write_metadata().await?;

if let Some(err) = err {
Err(err)
} else {
Ok(crate::database::Replicated {
frame_no: None,
frames_synced: 1,
})
}
Ok(crate::database::Replicated {
frame_no: None,
frames_synced: 1,
})
}

fn check_if_file_exists(path: &str) -> core::result::Result<bool, SyncError> {
Expand Down
Loading