Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libsql-server/tests/standalone/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ fn basic_metrics() {

let snapshot = snapshot_metrics();
snapshot.assert_counter("libsql_server_libsql_execute_program", 3);
snapshot.assert_counter("libsql_server_user_http_response", 3);
snapshot.assert_counter("libsql_server_user_http_response", 4);

for (key, (_, _, val)) in snapshot.snapshot() {
if key.kind() == metrics_util::MetricKind::Counter
&& key.key().name() == "libsql_client_version"
{
let label = key.key().labels().next().unwrap();
assert!(label.value().starts_with("libsql-remote-"));
assert_eq!(val, &metrics_util::debugging::DebugValue::Counter(3));
assert_eq!(val, &metrics_util::debugging::DebugValue::Counter(4));
}
}

Expand Down
4 changes: 2 additions & 2 deletions libsql/src/hrana/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ where
)
}

pub fn prepare(&self, sql: &str) -> crate::Result<Statement<T>> {
pub async fn prepare(&self, sql: &str) -> crate::Result<Statement<T>> {
let stream = self.current_stream().clone();
Statement::new(stream, sql.to_string(), true)
Statement::new(stream, sql.to_string(), true).await
}
}

Expand Down
15 changes: 12 additions & 3 deletions libsql/src/hrana/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl Conn for HttpConnection<HttpSender> {

async fn prepare(&self, sql: &str) -> crate::Result<Statement> {
let stream = self.current_stream().clone();
let stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?;
let stmt = crate::hrana::Statement::new(stream, sql.to_string(), true).await?;
Ok(Statement {
inner: Box::new(stmt),
})
Expand Down Expand Up @@ -241,7 +241,16 @@ impl crate::statement::Stmt for crate::hrana::Statement<HttpSender> {
// 2. Even if we do execute query, Hrana doesn't return all info that Column exposes.
// 3. Even if we would like to return some of the column info ie. column [ValueType], this information is not
// present in Hrana [Col] but rather inferred from the row cell type.
vec![]
self.cols
.iter()
.map(|name| crate::Column {
name,
origin_name: None,
table_name: None,
database_name: None,
decl_type: None,
})
.collect()
}
}

Expand Down Expand Up @@ -350,7 +359,7 @@ impl Conn for HranaStream<HttpSender> {
}

async fn prepare(&self, sql: &str) -> crate::Result<Statement> {
let stmt = crate::hrana::Statement::new(self.clone(), sql.to_string(), true)?;
let stmt = crate::hrana::Statement::new(self.clone(), sql.to_string(), true).await?;
Ok(Statement {
inner: Box::new(stmt),
})
Expand Down
12 changes: 11 additions & 1 deletion libsql/src/hrana/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,22 @@ where
stream: HranaStream<T>,
close_stream: bool,
inner: Stmt,
cols: Vec<String>,
}

impl<T> Statement<T>
where
T: HttpSend + Send + Sync + 'static,
{
pub(crate) fn new(stream: HranaStream<T>, sql: String, want_rows: bool) -> crate::Result<Self> {
pub(crate) async fn new(
stream: HranaStream<T>,
sql: String,
want_rows: bool,
) -> crate::Result<Self> {
let desc = stream.describe(&sql).await?;

let cols: Vec<_> = desc.cols.into_iter().map(|col| col.name).collect();

// in SQLite when a multiple statements are glued together into one string, only the first one is
// executed and then a handle to continue execution is returned. However Hrana API doesn't allow
// passing multi-statement strings, so we just pick first one.
Expand All @@ -147,6 +156,7 @@ where
stream,
close_stream,
inner,
cols,
})
}
}
Expand Down
25 changes: 18 additions & 7 deletions libsql/src/replication/connection.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// TODO(lucio): Move this to `remote/mod.rs`

use std::time::Duration;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use libsql_replication::rpc::proxy::{
describe_result, query_result::RowResult, Cond, DescribeResult, ExecuteResults, NotCond,
OkCond, Positional, Query, ResultRows, State as RemoteState, Step,
};
use parking_lot::Mutex;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;

use crate::parser;
use crate::parser::StmtKind;
Expand Down Expand Up @@ -168,7 +168,11 @@ impl From<RemoteState> for State {
}

impl RemoteConnection {
pub(crate) fn new(local: LibsqlConnection, writer: Option<Writer>, max_write_replication_index: Arc<AtomicU64>) -> Self {
pub(crate) fn new(
local: LibsqlConnection,
writer: Option<Writer>,
max_write_replication_index: Arc<AtomicU64>,
) -> Self {
let state = Arc::new(Mutex::new(Inner::default()));
Self {
local,
Expand All @@ -180,9 +184,16 @@ impl RemoteConnection {

fn update_max_write_replication_index(&self, index: Option<u64>) {
if let Some(index) = index {
let mut current = self.max_write_replication_index.load(std::sync::atomic::Ordering::SeqCst);
let mut current = self
.max_write_replication_index
.load(std::sync::atomic::Ordering::SeqCst);
while index > current {
match self.max_write_replication_index.compare_exchange(current, index, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst) {
match self.max_write_replication_index.compare_exchange(
current,
index,
std::sync::atomic::Ordering::SeqCst,
std::sync::atomic::Ordering::SeqCst,
) {
Ok(_) => break,
Err(new_current) => current = new_current,
}
Expand Down
2 changes: 1 addition & 1 deletion libsql/src/sync/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Conn for SyncedConnection {
})
} else {
let stmt = Statement {
inner: Box::new(self.remote.prepare(sql)?),
inner: Box::new(self.remote.prepare(sql).await?),
};

if self.read_your_writes {
Expand Down
8 changes: 4 additions & 4 deletions libsql/src/wasm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ where
self.conn.current_stream().clone(),
sql.to_string(),
true,
)?;
).await?;
let rows = stmt.execute(&params.into_params()?).await?;
Ok(rows as u64)
}
Expand Down Expand Up @@ -104,7 +104,7 @@ where
self.conn.current_stream().clone(),
sql.to_string(),
true,
)?;
).await?;
let rows = stmt.query_raw(&params.into_params()?).await?;
Ok(Rows {
inner: Box::new(rows),
Expand Down Expand Up @@ -139,7 +139,7 @@ where
pub async fn query(&self, sql: &str, params: impl IntoParams) -> crate::Result<Rows> {
tracing::trace!("querying `{}`", sql);
let stream = self.inner.stream().clone();
let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?;
let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true).await?;
let rows = stmt.query_raw(&params.into_params()?).await?;
Ok(Rows {
inner: Box::new(rows),
Expand All @@ -149,7 +149,7 @@ where
pub async fn execute(&self, sql: &str, params: impl IntoParams) -> crate::Result<u64> {
tracing::trace!("executing `{}`", sql);
let stream = self.inner.stream().clone();
let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?;
let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true).await?;
let rows = stmt.execute(&params.into_params()?).await?;
Ok(rows as u64)
}
Expand Down
Loading