diff --git a/libsql/src/database/builder.rs b/libsql/src/database/builder.rs index b8b128cb5b..3150648fa8 100644 --- a/libsql/src/database/builder.rs +++ b/libsql/src/database/builder.rs @@ -644,38 +644,54 @@ cfg_sync! { let mut bg_abort: Option> = None; + if let Some(sync_interval) = sync_interval { + let (cancel_tx, mut cancel_rx) = tokio::sync::oneshot::channel::<()>(); + + let sync_span = tracing::debug_span!("sync_interval"); + let _enter = sync_span.enter(); + let sync_ctx = db.sync_ctx.as_ref().unwrap().clone(); { let mut ctx = sync_ctx.lock().await; crate::sync::bootstrap_db(&mut ctx).await?; + tracing::debug!("finished bootstrap with sync interval"); } // db.connect creates a local db file, so it is important that we always call // `bootstrap_db` (for synced dbs) before calling connect. Otherwise, the sync // protocol skips calling `export` endpoint causing slowdown in initial bootstrap. let conn = db.connect()?; - let jh = tokio::spawn( + + tokio::spawn( async move { + let mut interval = tokio::time::interval(sync_interval); + loop { - tracing::trace!("trying to sync"); - let mut ctx = sync_ctx.lock().await; - if remote_writes { - if let Err(e) = crate::sync::try_pull(&mut ctx, &conn).await { - tracing::error!("sync error: {}", e); - } - } else { - if let Err(e) = crate::sync::sync_offline(&mut ctx, &conn).await { - tracing::error!("sync error: {}", e); + tokio::select! { + _ = &mut cancel_rx => break, + _ = interval.tick() => { + tracing::debug!("trying to sync"); + + let mut ctx = sync_ctx.lock().await; + + let result = if remote_writes { + crate::sync::try_pull(&mut ctx, &conn).await + } else { + crate::sync::sync_offline(&mut ctx, &conn).await + }; + + if let Err(e) = result { + tracing::error!("Error syncing database: {}", e); + } } } - tokio::time::sleep(sync_interval).await; } } - .instrument(tracing::info_span!("sync_interval")), + .instrument(tracing::debug_span!("sync interval thread")), ); - bg_abort.replace(std::sync::Arc::new(crate::sync::DropAbort(jh.abort_handle()))); + bg_abort.replace(std::sync::Arc::new(crate::sync::DropAbort(Some(cancel_tx)))); } Ok(Database { diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs index 933ed199f7..de3ab6a4b4 100644 --- a/libsql/src/sync.rs +++ b/libsql/src/sync.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use chrono::Utc; use http::{HeaderValue, StatusCode}; use hyper::Body; -use tokio::{io::AsyncWriteExt as _, task::AbortHandle}; +use tokio::io::AsyncWriteExt as _; use uuid::Uuid; #[cfg(test)] @@ -81,11 +81,14 @@ pub struct PushResult { baton: Option, } -pub struct DropAbort(pub AbortHandle); +pub struct DropAbort(pub Option>); impl Drop for DropAbort { fn drop(&mut self) { - self.0.abort(); + tracing::debug!("aborting"); + if let Some(sender) = self.0.take() { + let _ = sender.send(()); + } } }