diff --git a/libsql-server/tests/standalone/mod.rs b/libsql-server/tests/standalone/mod.rs index 0b3c631e75..046f147d4d 100644 --- a/libsql-server/tests/standalone/mod.rs +++ b/libsql-server/tests/standalone/mod.rs @@ -99,7 +99,7 @@ fn basic_metrics() { let snapshot = snapshot_metrics(); snapshot.assert_counter("libsql_server_libsql_execute_program", 3); - snapshot.assert_counter("libsql_server_user_http_response", 3); + snapshot.assert_counter("libsql_server_user_http_response", 4); for (key, (_, _, val)) in snapshot.snapshot() { if key.kind() == metrics_util::MetricKind::Counter @@ -107,7 +107,7 @@ fn basic_metrics() { { let label = key.key().labels().next().unwrap(); assert!(label.value().starts_with("libsql-remote-")); - assert_eq!(val, &metrics_util::debugging::DebugValue::Counter(3)); + assert_eq!(val, &metrics_util::debugging::DebugValue::Counter(4)); } } diff --git a/libsql/src/hrana/connection.rs b/libsql/src/hrana/connection.rs index ba8201230f..e6030ff7a5 100644 --- a/libsql/src/hrana/connection.rs +++ b/libsql/src/hrana/connection.rs @@ -80,9 +80,9 @@ where ) } - pub fn prepare(&self, sql: &str) -> crate::Result> { + pub async fn prepare(&self, sql: &str) -> crate::Result> { let stream = self.current_stream().clone(); - Statement::new(stream, sql.to_string(), true) + Statement::new(stream, sql.to_string(), true).await } } diff --git a/libsql/src/hrana/hyper.rs b/libsql/src/hrana/hyper.rs index 9ed0880af8..57840f56f4 100644 --- a/libsql/src/hrana/hyper.rs +++ b/libsql/src/hrana/hyper.rs @@ -131,7 +131,7 @@ impl Conn for HttpConnection { async fn prepare(&self, sql: &str) -> crate::Result { let stream = self.current_stream().clone(); - let stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?; + let stmt = crate::hrana::Statement::new(stream, sql.to_string(), true).await?; Ok(Statement { inner: Box::new(stmt), }) @@ -241,7 +241,16 @@ impl crate::statement::Stmt for crate::hrana::Statement { // 2. Even if we do execute query, Hrana doesn't return all info that Column exposes. // 3. Even if we would like to return some of the column info ie. column [ValueType], this information is not // present in Hrana [Col] but rather inferred from the row cell type. - vec![] + self.cols + .iter() + .map(|name| crate::Column { + name, + origin_name: None, + table_name: None, + database_name: None, + decl_type: None, + }) + .collect() } } @@ -350,7 +359,7 @@ impl Conn for HranaStream { } async fn prepare(&self, sql: &str) -> crate::Result { - let stmt = crate::hrana::Statement::new(self.clone(), sql.to_string(), true)?; + let stmt = crate::hrana::Statement::new(self.clone(), sql.to_string(), true).await?; Ok(Statement { inner: Box::new(stmt), }) diff --git a/libsql/src/hrana/mod.rs b/libsql/src/hrana/mod.rs index 2e0d6cf4ae..b29d5e4d34 100644 --- a/libsql/src/hrana/mod.rs +++ b/libsql/src/hrana/mod.rs @@ -119,13 +119,22 @@ where stream: HranaStream, close_stream: bool, inner: Stmt, + cols: Vec, } impl Statement where T: HttpSend + Send + Sync + 'static, { - pub(crate) fn new(stream: HranaStream, sql: String, want_rows: bool) -> crate::Result { + pub(crate) async fn new( + stream: HranaStream, + sql: String, + want_rows: bool, + ) -> crate::Result { + let desc = stream.describe(&sql).await?; + + let cols: Vec<_> = desc.cols.into_iter().map(|col| col.name).collect(); + // in SQLite when a multiple statements are glued together into one string, only the first one is // executed and then a handle to continue execution is returned. However Hrana API doesn't allow // passing multi-statement strings, so we just pick first one. @@ -147,6 +156,7 @@ where stream, close_stream, inner, + cols, }) } } diff --git a/libsql/src/replication/connection.rs b/libsql/src/replication/connection.rs index e92954e29d..576e6179de 100644 --- a/libsql/src/replication/connection.rs +++ b/libsql/src/replication/connection.rs @@ -1,14 +1,14 @@ // TODO(lucio): Move this to `remote/mod.rs` -use std::time::Duration; -use std::str::FromStr; -use std::sync::Arc; -use std::sync::atomic::AtomicU64; use libsql_replication::rpc::proxy::{ describe_result, query_result::RowResult, Cond, DescribeResult, ExecuteResults, NotCond, OkCond, Positional, Query, ResultRows, State as RemoteState, Step, }; use parking_lot::Mutex; +use std::str::FromStr; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; +use std::time::Duration; use crate::parser; use crate::parser::StmtKind; @@ -168,7 +168,11 @@ impl From for State { } impl RemoteConnection { - pub(crate) fn new(local: LibsqlConnection, writer: Option, max_write_replication_index: Arc) -> Self { + pub(crate) fn new( + local: LibsqlConnection, + writer: Option, + max_write_replication_index: Arc, + ) -> Self { let state = Arc::new(Mutex::new(Inner::default())); Self { local, @@ -180,9 +184,16 @@ impl RemoteConnection { fn update_max_write_replication_index(&self, index: Option) { if let Some(index) = index { - let mut current = self.max_write_replication_index.load(std::sync::atomic::Ordering::SeqCst); + let mut current = self + .max_write_replication_index + .load(std::sync::atomic::Ordering::SeqCst); while index > current { - match self.max_write_replication_index.compare_exchange(current, index, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst) { + match self.max_write_replication_index.compare_exchange( + current, + index, + std::sync::atomic::Ordering::SeqCst, + std::sync::atomic::Ordering::SeqCst, + ) { Ok(_) => break, Err(new_current) => current = new_current, } diff --git a/libsql/src/sync/connection.rs b/libsql/src/sync/connection.rs index dbd85d0cdd..807e49a2f4 100644 --- a/libsql/src/sync/connection.rs +++ b/libsql/src/sync/connection.rs @@ -128,7 +128,7 @@ impl Conn for SyncedConnection { }) } else { let stmt = Statement { - inner: Box::new(self.remote.prepare(sql)?), + inner: Box::new(self.remote.prepare(sql).await?), }; if self.read_your_writes { diff --git a/libsql/src/wasm/mod.rs b/libsql/src/wasm/mod.rs index 6345086bd5..6bcd133303 100644 --- a/libsql/src/wasm/mod.rs +++ b/libsql/src/wasm/mod.rs @@ -74,7 +74,7 @@ where self.conn.current_stream().clone(), sql.to_string(), true, - )?; + ).await?; let rows = stmt.execute(¶ms.into_params()?).await?; Ok(rows as u64) } @@ -104,7 +104,7 @@ where self.conn.current_stream().clone(), sql.to_string(), true, - )?; + ).await?; let rows = stmt.query_raw(¶ms.into_params()?).await?; Ok(Rows { inner: Box::new(rows), @@ -139,7 +139,7 @@ where pub async fn query(&self, sql: &str, params: impl IntoParams) -> crate::Result { tracing::trace!("querying `{}`", sql); let stream = self.inner.stream().clone(); - let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?; + let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true).await?; let rows = stmt.query_raw(¶ms.into_params()?).await?; Ok(Rows { inner: Box::new(rows), @@ -149,7 +149,7 @@ where pub async fn execute(&self, sql: &str, params: impl IntoParams) -> crate::Result { tracing::trace!("executing `{}`", sql); let stream = self.inner.stream().clone(); - let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?; + let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true).await?; let rows = stmt.execute(¶ms.into_params()?).await?; Ok(rows as u64) }