diff --git a/libsql/src/local/connection.rs b/libsql/src/local/connection.rs index 466d158622..6df44c3a1b 100644 --- a/libsql/src/local/connection.rs +++ b/libsql/src/local/connection.rs @@ -675,6 +675,10 @@ impl WalInsertHandle<'_> { self.conn.wal_insert_frame(frame_no, frame) } + pub fn in_session(&self) -> bool { + *self.in_session.borrow() + } + pub fn begin(&self) -> Result<()> { assert!(!*self.in_session.borrow()); self.conn.wal_insert_begin()?; diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs index b713b88bbb..9ddf7cd50f 100644 --- a/libsql/src/sync.rs +++ b/libsql/src/sync.rs @@ -8,6 +8,7 @@ use hyper::Body; use std::path::Path; use tokio::io::AsyncWriteExt as _; use uuid::Uuid; +use zerocopy::big_endian; #[cfg(test)] mod test; @@ -953,18 +954,24 @@ pub async fn try_pull( sync_ctx: &mut SyncContext, conn: &Connection, ) -> Result { - let insert_handle = conn.wal_insert_handle()?; + // note that updates of durable_frame_num are valid only after SQLite has committed the WAL + // (because if the WAL has an uncommitted suffix - it will be omitted by any other SQLite connection - for example after restart) + // so, try_pull maintains a local next_frame_no during the pull operation and updates durable_frame_num when it's appropriate + let mut next_frame_no = sync_ctx.durable_frame_num + 1; - let mut err = None; + // libsql maintains consistent state about the WAL sync session locally in the insert_handle + // note that insert_handle will always close the session on drop - so we never keep an active WAL session after we exit from the method + let insert_handle = conn.wal_insert_handle()?; loop { + // get current generation (it may be updated multiple times during execution) let generation = sync_ctx.durable_generation(); - let frame_no = sync_ctx.durable_frame_num() + 1; - match sync_ctx.pull_frames(generation, frame_no).await { + + match sync_ctx.pull_frames(generation, next_frame_no).await { Ok(PullResult::Frames(frames)) => { tracing::debug!( "pull_frames: generation={}, 
start_frame={} (end_frame={}, batch_size={}), frames_size={}", - generation, frame_no, frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(), + generation, next_frame_no, next_frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(), ); if frames.len() % FRAME_SIZE != 0 { tracing::error!( @@ -974,17 +981,39 @@ pub async fn try_pull( ); return Err(SyncError::InvalidPullFrameBytes(frames.len()).into()); } - for (i, chunk) in frames.chunks(FRAME_SIZE).enumerate() { - let r = insert_handle.insert_at(frame_no + i as u32, &chunk); - if let Err(e) = r { - tracing::error!( - "insert error (frame= {}) : {:?}", - sync_ctx.durable_frame_num + 1, - e + for chunk in frames.chunks(FRAME_SIZE) { + let mut size_after_buf = [0u8; 4]; + size_after_buf.copy_from_slice(&chunk[4..8]); + let size_after = big_endian::U32::from_bytes(size_after_buf); + // start WAL sync session if it was closed + // (this can happen if on previous iteration client received commit frame) + if !insert_handle.in_session() { + tracing::debug!( + "pull_frames: generation={}, frame={}, start wal transaction session", + generation, + next_frame_no ); + insert_handle.begin()?; + } + let result = insert_handle.insert_at(next_frame_no, &chunk); + if let Err(e) = result { + tracing::error!("insert error (frame={}) : {:?}", next_frame_no, e); return Err(e); } - sync_ctx.durable_frame_num += 1; + // if this is commit frame - we can close WAL sync session and update durable_frame_num + if size_after.get() > 0 { + tracing::debug!( + "pull_frames: generation={}, frame={}, finish wal transaction session, size_after={}", + generation, + next_frame_no, + size_after.get() + ); + insert_handle.end()?; + sync_ctx.durable_frame_num = next_frame_no; + sync_ctx.write_metadata().await?; + } + + next_frame_no += 1; } } Ok(PullResult::EndOfGeneration { max_generation }) => { @@ -992,45 +1021,31 @@ pub async fn try_pull( if generation >= max_generation { break; } - insert_handle.end()?; - 
sync_ctx.write_metadata().await?; + assert!( + !insert_handle.in_session(), + "WAL transaction must be finished" + ); + tracing::debug!( + "pull_frames: generation={}, frame={}, checkpoint in order to move to next generation", + generation, + next_frame_no + ); // TODO: Make this crash-proof. conn.wal_checkpoint(true)?; sync_ctx.next_generation(); sync_ctx.write_metadata().await?; - - insert_handle.begin()?; - } - Err(e) => { - tracing::debug!("pull_frames error: {:?}", e); - err.replace(e); - break; + next_frame_no = 1; } + Err(e) => return Err(e), } } - // This is crash-proof because we: - // - // 1. Write WAL frame first - // 2. Write new max frame to temporary metadata - // 3. Atomically rename the temporary metadata to the real metadata - // - // If we crash before metadata rename completes, the old metadata still - // points to last successful frame, allowing safe retry from that point. - // If we happen to have the frame already in the WAL, it's fine to re-pull - // because append locally is idempotent. - insert_handle.end()?; - sync_ctx.write_metadata().await?; - if let Some(err) = err { - Err(err) - } else { - Ok(crate::database::Replicated { - frame_no: None, - frames_synced: 1, - }) - } + Ok(crate::database::Replicated { + frame_no: None, + frames_synced: 1, + }) } fn check_if_file_exists(path: &str) -> core::result::Result {