@@ -8,11 +8,14 @@ use crate::{
88 sync:: SyncContext ,
99 BatchRows , Error , Result , Statement , Transaction , TransactionBehavior ,
1010} ;
11- use std:: sync:: Arc ;
11+ use std:: sync:: {
12+ atomic:: { AtomicBool , Ordering } ,
13+ Arc ,
14+ } ;
1215use std:: time:: Duration ;
1316use tokio:: sync:: Mutex ;
1417
15- use super :: { statement :: SyncedStatement , transaction:: SyncedTx } ;
18+ use super :: transaction:: SyncedTx ;
1619
1720#[ derive( Clone ) ]
1821pub struct SyncedConnection {
@@ -21,6 +24,7 @@ pub struct SyncedConnection {
2124 pub read_your_writes : bool ,
2225 pub context : Arc < Mutex < SyncContext > > ,
2326 pub state : Arc < Mutex < State > > ,
27+ pub needs_pull : Arc < AtomicBool > ,
2428}
2529
2630impl SyncedConnection {
@@ -89,7 +93,7 @@ impl SyncedConnection {
8993 _ => {
9094 * state = predicted_end_state;
9195 false
92- } ,
96+ }
9397 } ;
9498
9599 Ok ( should_execute_local)
@@ -106,6 +110,10 @@ impl Conn for SyncedConnection {
106110
107111 async fn execute_batch ( & self , sql : & str ) -> Result < BatchRows > {
108112 if self . should_execute_local ( sql) . await ? {
113+ if self . needs_pull . swap ( false , Ordering :: Relaxed ) {
114+ let mut context = self . context . lock ( ) . await ;
115+ crate :: sync:: try_pull ( & mut context, & self . local ) . await ?;
116+ }
109117 self . local . execute_batch ( sql)
110118 } else {
111119 self . remote . execute_batch ( sql) . await
@@ -114,6 +122,10 @@ impl Conn for SyncedConnection {
114122
115123 async fn execute_transactional_batch ( & self , sql : & str ) -> Result < BatchRows > {
116124 if self . should_execute_local ( sql) . await ? {
125+ if self . needs_pull . swap ( false , Ordering :: Relaxed ) {
126+ let mut context = self . context . lock ( ) . await ;
127+ crate :: sync:: try_pull ( & mut context, & self . local ) . await ?;
128+ }
117129 self . local . execute_transactional_batch ( sql) ?;
118130 Ok ( BatchRows :: empty ( ) )
119131 } else {
@@ -123,6 +135,10 @@ impl Conn for SyncedConnection {
123135
124136 async fn prepare ( & self , sql : & str ) -> Result < Statement > {
125137 if self . should_execute_local ( sql) . await ? {
138+ if self . needs_pull . swap ( false , Ordering :: Relaxed ) {
139+ let mut context = self . context . lock ( ) . await ;
140+ crate :: sync:: try_pull ( & mut context, & self . local ) . await ?;
141+ }
126142 Ok ( Statement {
127143 inner : Box :: new ( LibsqlStmt ( self . local . prepare ( sql) ?) ) ,
128144 } )
@@ -132,16 +148,10 @@ impl Conn for SyncedConnection {
132148 } ;
133149
134150 if self . read_your_writes {
135- Ok ( Statement {
136- inner : Box :: new ( SyncedStatement {
137- conn : self . local . clone ( ) ,
138- context : self . context . clone ( ) ,
139- inner : stmt,
140- } ) ,
141- } )
142- } else {
143- Ok ( stmt)
151+ self . needs_pull . store ( true , Ordering :: Relaxed ) ;
144152 }
153+
154+ Ok ( stmt)
145155 }
146156 }
147157
0 commit comments