From b551a60578a6d0f4fd9f6040b30e2a2a4187ef38 Mon Sep 17 00:00:00 2001 From: "Levy A." Date: Mon, 26 May 2025 11:09:39 -0300 Subject: [PATCH 1/2] fix: yield back to executor for abortion --- libsql/src/database/builder.rs | 14 +++++++++++--- libsql/src/sync.rs | 4 ++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/libsql/src/database/builder.rs b/libsql/src/database/builder.rs index b8b128cb5b..3b18abc2f0 100644 --- a/libsql/src/database/builder.rs +++ b/libsql/src/database/builder.rs @@ -645,10 +645,14 @@ cfg_sync! { let mut bg_abort: Option> = None; if let Some(sync_interval) = sync_interval { + let sync_span = tracing::debug_span!("sync_interval"); + let _enter = sync_span.enter(); + let sync_ctx = db.sync_ctx.as_ref().unwrap().clone(); { let mut ctx = sync_ctx.lock().await; crate::sync::bootstrap_db(&mut ctx).await?; + tracing::debug!("finished bootstrap with sync interval"); } // db.connect creates a local db file, so it is important that we always call @@ -657,8 +661,13 @@ cfg_sync! { let conn = db.connect()?; let jh = tokio::spawn( async move { + let mut interval = tokio::time::interval(sync_interval); + loop { - tracing::trace!("trying to sync"); + tracing::info!("trying to sync"); + + interval.tick().await; + let mut ctx = sync_ctx.lock().await; if remote_writes { if let Err(e) = crate::sync::try_pull(&mut ctx, &conn).await { @@ -669,10 +678,9 @@ cfg_sync! { tracing::error!("sync error: {}", e); } } - tokio::time::sleep(sync_interval).await; } } - .instrument(tracing::info_span!("sync_interval")), + .instrument(tracing::debug_span!("sync interval thread")), ); bg_abort.replace(std::sync::Arc::new(crate::sync::DropAbort(jh.abort_handle()))); diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs index 933ed199f7..b0156d4480 100644 --- a/libsql/src/sync.rs +++ b/libsql/src/sync.rs @@ -846,6 +846,8 @@ async fn try_push( let mut frame_no = start_frame_no; while frame_no <= end_frame_no { + tokio::task::yield_now().await; + let batch_size = sync_ctx.push_batch_size.min(end_frame_no - frame_no + 1); let mut frames = conn.wal_get_frame(frame_no, page_size)?; if batch_size > 1 { @@ -893,6 +895,8 @@ pub async fn try_pull( let mut err = None; loop { + tokio::task::yield_now().await; + let generation = sync_ctx.durable_generation(); let frame_no = sync_ctx.durable_frame_num() + 1; match sync_ctx.pull_one_frame(generation, frame_no).await { From db0878457afd921c8af14f26420ee957189d72e5 Mon Sep 17 00:00:00 2001 From: "Levy A." Date: Tue, 27 May 2025 14:29:39 -0300 Subject: [PATCH 2/2] fix: use oneshot channel for cancellation --- libsql/src/database/builder.rs | 36 +++++++++++++++++++++------------- libsql/src/sync.rs | 13 ++++++------ 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/libsql/src/database/builder.rs b/libsql/src/database/builder.rs index 3b18abc2f0..3150648fa8 100644 --- a/libsql/src/database/builder.rs +++ b/libsql/src/database/builder.rs @@ -644,7 +644,10 @@ cfg_sync! { let mut bg_abort: Option> = None; + if let Some(sync_interval) = sync_interval { + let (cancel_tx, mut cancel_rx) = tokio::sync::oneshot::channel::<()>(); + let sync_span = tracing::debug_span!("sync_interval"); let _enter = sync_span.enter(); @@ -659,23 +662,28 @@ cfg_sync! { // `bootstrap_db` (for synced dbs) before calling connect. Otherwise, the sync // protocol skips calling `export` endpoint causing slowdown in initial bootstrap. let conn = db.connect()?; - let jh = tokio::spawn( + + tokio::spawn( async move { let mut interval = tokio::time::interval(sync_interval); loop { - tracing::info!("trying to sync"); - - interval.tick().await; - - let mut ctx = sync_ctx.lock().await; - if remote_writes { - if let Err(e) = crate::sync::try_pull(&mut ctx, &conn).await { - tracing::error!("sync error: {}", e); - } - } else { - if let Err(e) = crate::sync::sync_offline(&mut ctx, &conn).await { - tracing::error!("sync error: {}", e); + tokio::select! { + _ = &mut cancel_rx => break, + _ = interval.tick() => { + tracing::debug!("trying to sync"); + + let mut ctx = sync_ctx.lock().await; + + let result = if remote_writes { + crate::sync::try_pull(&mut ctx, &conn).await + } else { + crate::sync::sync_offline(&mut ctx, &conn).await + }; + + if let Err(e) = result { + tracing::error!("Error syncing database: {}", e); + } } } } @@ -683,7 +691,7 @@ cfg_sync! { .instrument(tracing::debug_span!("sync interval thread")), ); - bg_abort.replace(std::sync::Arc::new(crate::sync::DropAbort(jh.abort_handle()))); + bg_abort.replace(std::sync::Arc::new(crate::sync::DropAbort(Some(cancel_tx)))); } Ok(Database { diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs index b0156d4480..de3ab6a4b4 100644 --- a/libsql/src/sync.rs +++ b/libsql/src/sync.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use chrono::Utc; use http::{HeaderValue, StatusCode}; use hyper::Body; -use tokio::{io::AsyncWriteExt as _, task::AbortHandle}; +use tokio::io::AsyncWriteExt as _; use uuid::Uuid; #[cfg(test)] @@ -81,11 +81,14 @@ pub struct PushResult { baton: Option, } -pub struct DropAbort(pub AbortHandle); +pub struct DropAbort(pub Option>); impl Drop for DropAbort { fn drop(&mut self) { - self.0.abort(); + tracing::debug!("aborting"); + if let Some(sender) = self.0.take() { + let _ = sender.send(()); + } } } @@ -846,8 +849,6 @@ async fn try_push( let mut frame_no = start_frame_no; while frame_no <= end_frame_no { - tokio::task::yield_now().await; - let batch_size = sync_ctx.push_batch_size.min(end_frame_no - frame_no + 1); let mut frames = conn.wal_get_frame(frame_no, page_size)?; if batch_size > 1 { @@ -895,8 +896,6 @@ pub async fn try_pull( let mut err = None; loop { - tokio::task::yield_now().await; - let generation = sync_ctx.durable_generation(); let frame_no = sync_ctx.durable_frame_num() + 1; match sync_ctx.pull_one_frame(generation, frame_no).await {