Skip to content

Commit d2fb9a8

Browse files
committed
fix: pull on read
1 parent c85ec3d commit d2fb9a8

4 files changed

Lines changed: 19 additions & 80 deletions

File tree

libsql/src/database.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,7 @@ impl Database {
712712
read_your_writes: *read_your_writes,
713713
context: db.sync_ctx.clone().unwrap(),
714714
state: std::sync::Arc::new(Mutex::new(State::Init)),
715+
needs_pull: std::sync::atomic::AtomicBool::new(false).into(),
715716
};
716717

717718
let conn = std::sync::Arc::new(synced);

libsql/src/sync.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use uuid::Uuid;
1313
mod test;
1414

1515
pub mod connection;
16-
pub mod statement;
1716
pub mod transaction;
1817

1918
const METADATA_VERSION: u32 = 0;

libsql/src/sync/connection.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ use crate::{
88
sync::SyncContext,
99
BatchRows, Error, Result, Statement, Transaction, TransactionBehavior,
1010
};
11-
use std::sync::Arc;
11+
use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
1212
use std::time::Duration;
1313
use tokio::sync::Mutex;
1414

15-
use super::{statement::SyncedStatement, transaction::SyncedTx};
15+
use super::transaction::SyncedTx;
1616

1717
#[derive(Clone)]
1818
pub struct SyncedConnection {
@@ -21,6 +21,7 @@ pub struct SyncedConnection {
2121
pub read_your_writes: bool,
2222
pub context: Arc<Mutex<SyncContext>>,
2323
pub state: Arc<Mutex<State>>,
24+
pub needs_pull: Arc<AtomicBool>,
2425
}
2526

2627
impl SyncedConnection {
@@ -106,6 +107,10 @@ impl Conn for SyncedConnection {
106107

107108
async fn execute_batch(&self, sql: &str) -> Result<BatchRows> {
108109
if self.should_execute_local(sql).await? {
110+
if self.needs_pull.swap(false, Ordering::Relaxed) {
111+
let mut context = self.context.lock().await;
112+
crate::sync::try_pull(&mut context, &self.local).await?;
113+
}
109114
self.local.execute_batch(sql)
110115
} else {
111116
self.remote.execute_batch(sql).await
@@ -114,6 +119,10 @@ impl Conn for SyncedConnection {
114119

115120
async fn execute_transactional_batch(&self, sql: &str) -> Result<BatchRows> {
116121
if self.should_execute_local(sql).await? {
122+
if self.needs_pull.swap(false, Ordering::Relaxed) {
123+
let mut context = self.context.lock().await;
124+
crate::sync::try_pull(&mut context, &self.local).await?;
125+
}
117126
self.local.execute_transactional_batch(sql)?;
118127
Ok(BatchRows::empty())
119128
} else {
@@ -123,6 +132,10 @@ impl Conn for SyncedConnection {
123132

124133
async fn prepare(&self, sql: &str) -> Result<Statement> {
125134
if self.should_execute_local(sql).await? {
135+
if self.needs_pull.swap(false, Ordering::Relaxed) {
136+
let mut context = self.context.lock().await;
137+
crate::sync::try_pull(&mut context, &self.local).await?;
138+
}
126139
Ok(Statement {
127140
inner: Box::new(LibsqlStmt(self.local.prepare(sql)?)),
128141
})
@@ -131,17 +144,9 @@ impl Conn for SyncedConnection {
131144
inner: Box::new(self.remote.prepare(sql).await?),
132145
};
133146

134-
if self.read_your_writes {
135-
Ok(Statement {
136-
inner: Box::new(SyncedStatement {
137-
conn: self.local.clone(),
138-
context: self.context.clone(),
139-
inner: stmt,
140-
}),
141-
})
142-
} else {
143-
Ok(stmt)
144-
}
147+
self.needs_pull.store(true, Ordering::Relaxed);
148+
149+
Ok(stmt)
145150
}
146151
}
147152

libsql/src/sync/statement.rs

Lines changed: 0 additions & 66 deletions
This file was deleted.

0 commit comments

Comments
 (0)