diff --git a/libsql/src/hrana/connection.rs b/libsql/src/hrana/connection.rs index 2e54e9e45e..ba8201230f 100644 --- a/libsql/src/hrana/connection.rs +++ b/libsql/src/hrana/connection.rs @@ -28,7 +28,7 @@ where impl HttpConnection where - T: HttpSend, + T: HttpSend + Send + Sync + 'static, { pub fn new(url: String, token: String, inner: T) -> Self { // The `libsql://` protocol is an alias for `https://`. diff --git a/libsql/src/hrana/mod.rs b/libsql/src/hrana/mod.rs index 432ad6eea4..2e0d6cf4ae 100644 --- a/libsql/src/hrana/mod.rs +++ b/libsql/src/hrana/mod.rs @@ -21,6 +21,7 @@ use libsql_hrana::proto::{Batch, BatchResult, Col, Stmt, StmtResult}; use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::{Context, Poll}; @@ -122,7 +123,7 @@ where impl Statement where - T: HttpSend, + T: HttpSend + Send + Sync + 'static, { pub(crate) fn new(stream: HranaStream, sql: String, want_rows: bool) -> crate::Result { // in SQLite when a multiple statements are glued together into one string, only the first one is @@ -170,12 +171,12 @@ where pub(crate) async fn query_raw( &mut self, params: &Params, - ) -> crate::Result> { + ) -> crate::Result> { let mut stmt = self.inner.clone(); bind_params(params.clone(), &mut stmt); let cursor = self.stream.cursor(Batch::single(stmt)).await?; - let rows = HranaRows::from_cursor(cursor).await?; + let rows = HranaRows::from_cursor(cursor, self.stream.clone()).await?; Ok(rows) } @@ -183,7 +184,7 @@ where impl Statement where - T: HttpSend, + T: HttpSend + Send + Sync + 'static, ::Stream: Send + Sync + 'static, { pub async fn query(&mut self, params: &Params) -> crate::Result { @@ -192,20 +193,23 @@ where } } -pub struct HranaRows { +pub struct HranaRows { cursor_step: OwnedCursorStep, column_types: Option>, + stream: HranaStream, } -impl HranaRows +impl HranaRows where + T: HttpSend + Send + Sync + 'static, S: Stream> + Unpin, { - async fn from_cursor(cursor: Cursor) -> Result { + async fn from_cursor(cursor: Cursor, stream: HranaStream) -> Result { let cursor_step = cursor.next_step_owned().await?; Ok(HranaRows { cursor_step, column_types: None, + stream, }) } @@ -213,7 +217,13 @@ where let row = match self.cursor_step.next().await { Some(Ok(row)) => row, Some(Err(e)) => return Err(crate::Error::Hrana(Box::new(e))), - None => return Ok(None), + None => { + self.stream + .inner + .affected_row_count + .store(self.cursor_step.affected_rows().into(), Ordering::SeqCst); + return Ok(None); + } }; if self.column_types.is_none() { @@ -254,8 +264,9 @@ where } #[async_trait::async_trait] -impl RowsInner for HranaRows +impl RowsInner for HranaRows where + T: HttpSend + Send + Sync + 'static, S: Stream> + Send + Sync + Unpin, { async fn next(&mut self) -> crate::Result> { @@ -263,8 +274,9 @@ where } } -impl ColumnsInner for HranaRows +impl ColumnsInner for HranaRows where + T: HttpSend + Send + Sync + 'static, S: Stream> + Send + Sync + Unpin, { fn column_count(&self) -> i32 { diff --git a/libsql/src/hrana/stream.rs b/libsql/src/hrana/stream.rs index b27e48145e..c63f600c75 100644 --- a/libsql/src/hrana/stream.rs +++ b/libsql/src/hrana/stream.rs @@ -30,7 +30,7 @@ pub struct HranaStream where T: HttpSend, { - inner: Arc>, + pub inner: Arc>, } impl Clone for HranaStream @@ -94,7 +94,9 @@ where (0, 0) }; - self.inner.total_changes.fetch_add(affected_row_count, Ordering::SeqCst); + self.inner + .total_changes + .fetch_add(affected_row_count, Ordering::SeqCst); self.inner .affected_row_count .store(affected_row_count, Ordering::SeqCst); @@ -279,11 +281,11 @@ where } #[derive(Debug)] -struct Inner +pub struct Inner where T: HttpSend, { - affected_row_count: AtomicU64, + pub affected_row_count: AtomicU64, total_changes: AtomicU64, last_insert_rowid: AtomicI64, is_autocommit: AtomicBool, diff --git a/libsql/src/wasm/mod.rs b/libsql/src/wasm/mod.rs index 8dfe9188c6..6345086bd5 100644 --- a/libsql/src/wasm/mod.rs +++ b/libsql/src/wasm/mod.rs @@ -48,7 +48,7 @@ cfg_cloudflare! { #[derive(Debug, Clone)] pub struct Connection where - T: HttpSend, + T: HttpSend + Sync + Send + 'static, { conn: HttpConnection, } @@ -65,7 +65,7 @@ cfg_cloudflare! { impl Connection where - T: HttpSend, + T: HttpSend + Sync + Send + 'static, ::Stream: 'static, { pub async fn execute(&self, sql: &str, params: impl IntoParams) -> crate::Result { @@ -126,14 +126,14 @@ where #[derive(Debug, Clone)] pub struct Transaction where - T: HttpSend, + T: HttpSend + Sync + Send + 'static, { inner: HttpTransaction, } impl Transaction where - T: HttpSend, + T: HttpSend + Sync + Send + 'static, ::Stream: 'static, { pub async fn query(&self, sql: &str, params: impl IntoParams) -> crate::Result { diff --git a/libsql/src/wasm/rows.rs b/libsql/src/wasm/rows.rs index c19a94a62b..954dd02d38 100644 --- a/libsql/src/wasm/rows.rs +++ b/libsql/src/wasm/rows.rs @@ -1,4 +1,5 @@ use crate::hrana::HranaRows; +use crate::hrana::HttpSend; use crate::Row; use bytes::Bytes; use futures::Stream; @@ -42,8 +43,9 @@ pub(super) trait RowsInner { } #[async_trait::async_trait(?Send)] -impl RowsInner for HranaRows +impl RowsInner for HranaRows where + T: HttpSend + Sync + Send + 'static, S: Stream> + Unpin, { async fn next(&mut self) -> crate::Result> {