Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,8 @@ impl Database {
/// Sync WAL frames to remote.
///
/// Takes the sync context lock, runs `crate::sync::bootstrap_db` on it, opens a connection via
/// `self.connect()` and then hands both to `crate::sync::sync_offline`.
///
/// # Panics
///
/// Panics if `self.sync_ctx` is `None`.
pub async fn sync_offline(&self) -> Result<crate::database::Replicated> {
    let ctx_mutex = self.sync_ctx.as_ref().unwrap();
    let mut ctx_guard = ctx_mutex.lock().await;

    // Bootstrap runs before the connection is opened.
    crate::sync::bootstrap_db(&mut ctx_guard).await?;

    let connection = self.connect()?;
    crate::sync::sync_offline(&mut ctx_guard, &connection).await
}

Expand Down
102 changes: 76 additions & 26 deletions libsql/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub enum SyncError {
PullDb(StatusCode, String),
#[error("server returned a lower generation than local: local={0}, remote={1}")]
InvalidLocalGeneration(u32, u32),
#[error("invalid local state: {0}")]
InvalidLocalState(String),
}

impl SyncError {
Expand Down Expand Up @@ -509,27 +511,63 @@ impl SyncContext {
}

/// Decides whether the local db file must be fetched from the remote and fetches it if so.
///
/// `generation` is compared against `self.durable_generation`: equal generations return
/// `Ok(())` immediately, and a local generation greater than `generation` is logged and
/// returned as `SyncError::InvalidLocalGeneration`. Otherwise the presence of the db file and
/// of its `-info` metadata file selects the action (see the heuristic comment below).
///
/// NOTE(review): this listing appears to contain both sides of a diff hunk. The first of the
/// two format strings in the `tracing::error!` call, and the unconditional `tracing::debug!`
/// plus `self.sync_db(generation).await` right before the heuristic comment, look like the
/// removed lines. Confirm against the actual file before relying on this text.
async fn sync_db_if_needed(&mut self, generation: u32) -> Result<()> {
// we will get the export file only if the remote generation is different from the one we have
if generation == self.durable_generation {
return Ok(());
}
// somehow we are ahead of the remote in generations. This should not happen, because
// we checkpoint only if the remote server tells us to do so.
if self.durable_generation > generation {
tracing::error!(
"server returned a lower generation than what we have: sent={}, got={}",
"server returned a lower generation than what we have: local={}, remote={}",
self.durable_generation,
generation
);
return Err(
SyncError::InvalidLocalGeneration(self.durable_generation, generation).into(),
);
}
tracing::debug!(
"syncing db file from remote server, generation={}",
generation
);
self.sync_db(generation).await
// we use the following heuristic to determine if we need to sync the db file
// 1. if neither the db file nor the metadata file exists, then the user is starting from
//    scratch and we will do the sync
// 2. if the db file exists but the metadata file does not, we skip the sync, set
//    `durable_generation` to 1 and let the user proceed
// 3. if the metadata file exists but the db file does not, then the local state is
//    incorrect. we stop and return with an error
// 4. if the db file exists and the metadata file exists, then we don't need to do the
//    sync
let metadata_exists = check_if_file_exists(&format!("{}-info", self.db_path))?;
let db_file_exists = check_if_file_exists(&self.db_path)?;
match (metadata_exists, db_file_exists) {
(false, false) => {
// neither the db file nor the metadata file exists, let's bootstrap from remote
tracing::debug!(
"syncing db file from remote server, generation={}",
generation
);
self.sync_db(generation).await
}
(false, true) => {
// somewhat inconsistent state: DB exists but metadata is missing.
// however, this is generally not an issue. For a fresh db, a user might do writes
// locally and then try to sync later. So in this case, we will not
// bootstrap the db file and let the user proceed. If it is not a fresh db, the
// push will fail anyway later.
// if the metadata file does not exist, then the generation should be zero
// (this assert panics otherwise)
assert_eq!(self.durable_generation, 0);
// let's initialise it to the first generation
self.durable_generation = 1;
Ok(())
}
(true, false) => {
// inconsistent state: metadata exists but DB is missing
tracing::error!(
"local state is incorrect, metadata file exists but db file does not"
);
Err(SyncError::InvalidLocalState(
"metadata file exists but db file does not".to_string(),
)
.into())
}
(true, true) => {
// both files exist, no need to sync
Ok(())
}
}
}

/// sync_db will download the db file from the remote server and replace the local file.
Expand Down Expand Up @@ -646,6 +684,28 @@ async fn atomic_write<P: AsRef<Path>>(path: P, data: &[u8]) -> Result<()> {
Ok(())
}

/// Performs the one-time initial check against the remote server and, if
/// `SyncContext::sync_db_if_needed` decides so, pulls the `.db` file from it.
///
/// Once `initial_server_sync` has been set on `sync_ctx`, later calls return `Ok(())` without
/// contacting the remote, so calling this function repeatedly is safe.
///
/// # Panics
///
/// Panics if `durable_generation` is still `0` after the initial check completed.
pub async fn bootstrap_db(sync_ctx: &mut SyncContext) -> Result<()> {
    // todo: the remote is consulted only during initialisation. ideally this would also happen
    // whenever the generation gap is large enough that bootstrapping is cheaper than pulling
    // frame by frame
    if sync_ctx.initial_server_sync {
        return Ok(());
    }

    // First sync: ask the remote for its generation and let `sync_db_if_needed` decide
    // whether the db file has to be fetched.
    let remote = sync_ctx.get_remote_info().await?;
    sync_ctx.sync_db_if_needed(remote.current_generation).await?;

    // `durable_generation` starts at 0 when the context is created; after the step above it
    // is expected to be positive.
    assert!(sync_ctx.durable_generation > 0, "generation should be > 0");

    sync_ctx.initial_server_sync = true;
    Ok(())
}

/// Sync WAL frames to remote.
pub async fn sync_offline(
sync_ctx: &mut SyncContext,
Expand All @@ -667,22 +727,6 @@ pub async fn sync_offline(
Err(e) => Err(e),
}
} else {
// todo: we are checking with the remote server only during initialisation. ideally,
// we should check everytime we try to sync with the remote server. However, we need to close
// all the ongoing connections since we replace `.db` file and remove the `.db-wal` file
if !sync_ctx.initial_server_sync {
// sync is being called first time. so we will call remote, get the generation information
// if we are lagging behind, then we will call the export API and get to the latest
// generation directly.
let info = sync_ctx.get_remote_info().await?;
sync_ctx
.sync_db_if_needed(info.current_generation)
.await?;
// when sync_ctx is initialised, we set durable_generation to 0. however, once
// sync_db is called, it should be > 0.
assert!(sync_ctx.durable_generation > 0, "generation should be > 0");
sync_ctx.initial_server_sync = true;
}
try_pull(sync_ctx, conn).await
}
.or_else(|err| {
Expand Down Expand Up @@ -831,3 +875,9 @@ async fn try_pull(
})
}
}

/// Reports whether `path` exists on disk.
///
/// Uses [`Path::try_exists`], so a failure to determine existence is returned as an error
/// instead of being reported as "missing".
///
/// # Errors
///
/// Returns the underlying I/O error wrapped through `SyncError::io`.
fn check_if_file_exists(path: &str) -> core::result::Result<bool, SyncError> {
    // This helper is called for both the db file and its `-info` metadata file, so the error
    // context must not claim that the metadata file was the one being checked.
    Path::new(path)
        .try_exists()
        .map_err(SyncError::io("check if file exists"))
}
Loading