Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ jobs:
runs-on: ubuntu-latest
name: Run Checks
env:
RUST_BACKTRACE: 1
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use full?

RUSTFLAGS: -D warnings --cfg tokio_unstable
steps:
- uses: hecrj/setup-rust-action@v2
Expand Down Expand Up @@ -179,6 +180,7 @@ jobs:
runs-on: ubuntu-latest
name: Run Tests
env:
RUST_BACKTRACE: 1
RUSTFLAGS: -D warnings --cfg tokio_unstable
steps:
- uses: hecrj/setup-rust-action@v2
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/tests/standalone/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ async fn insert_rows(conn: &Connection, start: u32, count: u32) -> libsql::Resul

async fn insert_rows_with_args(conn: &Connection, start: u32, count: u32) -> libsql::Result<()> {
for i in start..(start + count) {
let mut stmt = conn.prepare("INSERT INTO test(a, b) VALUES(?,?)").await?;
let stmt = conn.prepare("INSERT INTO test(a, b) VALUES(?,?)").await?;
stmt.execute(params![i, i]).await?;
}
Ok(())
Expand Down
12 changes: 6 additions & 6 deletions libsql/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn bench(c: &mut Criterion) {
group.bench_function("in-memory-select-1-prepared", |b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT 1")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand All @@ -113,7 +113,7 @@ fn bench(c: &mut Criterion) {
group.bench_function("in-memory-select-star-from-users-limit-1-unprepared", |b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT * FROM users LIMIT 1")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand All @@ -128,7 +128,7 @@ fn bench(c: &mut Criterion) {
|b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT * FROM users LIMIT 100")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand Down Expand Up @@ -156,7 +156,7 @@ fn bench(c: &mut Criterion) {
group.bench_function("local-replica-select-1-prepared", |b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT 1")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand Down Expand Up @@ -188,7 +188,7 @@ fn bench(c: &mut Criterion) {
|b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT * FROM users LIMIT 1")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand All @@ -204,7 +204,7 @@ fn bench(c: &mut Criterion) {
|b| {
b.to_async(&rt).iter_batched(
|| block_on(conn.prepare("SELECT * FROM users LIMIT 100")).unwrap(),
|mut stmt| async move {
|stmt| async move {
let mut rows = stmt.query(()).await.unwrap();
let row = rows.next().await.unwrap().unwrap();
assert_eq!(row.get::<i32>(0).unwrap(), 1);
Expand Down
4 changes: 2 additions & 2 deletions libsql/examples/deserialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ async fn main() {
.await
.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("INSERT INTO users (name, age, vision, avatar) VALUES (?1, ?2, ?3, ?4)")
.await
.unwrap();
stmt.execute(("Ferris the Crab", 8, -6.5, vec![1, 2, 3]))
.await
.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("SELECT * FROM users WHERE name = ?1")
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions libsql/examples/example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ async fn main() {
.await
.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("INSERT INTO users (email) VALUES (?1)")
.await
.unwrap();

stmt.execute(["foo@example.com"]).await.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("SELECT * FROM users WHERE email = ?1")
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions libsql/examples/example_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ async fn main() {
.await
.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("INSERT INTO users (email) VALUES (?1)")
.await
.unwrap();

stmt.execute(["foo@example.com"]).await.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("SELECT * FROM users WHERE email = ?1")
.await
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions libsql/examples/flutter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ async fn main() {
.await
.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("INSERT INTO users (email) VALUES (?1)")
.await
.unwrap();

stmt.execute(["foo@example.com"]).await.unwrap();

let mut stmt = conn
let stmt = conn
.prepare("SELECT * FROM users WHERE email = ?1")
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion libsql/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl Connection {
/// For more info on how to pass params check [`IntoParams`]'s docs and on how to
/// extract values out of the rows check the [`Rows`] docs.
pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
let mut stmt = self.prepare(sql).await?;
let stmt = self.prepare(sql).await?;

stmt.query(params).await
}
Expand Down
10 changes: 5 additions & 5 deletions libsql/src/hrana/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,25 +237,25 @@ impl Conn for HttpConnection<HttpSender> {
impl crate::statement::Stmt for crate::hrana::Statement<HttpSender> {
fn finalize(&mut self) {}

async fn execute(&mut self, params: &Params) -> crate::Result<usize> {
async fn execute(&self, params: &Params) -> crate::Result<usize> {
self.execute(params).await
}

async fn query(&mut self, params: &Params) -> crate::Result<Rows> {
async fn query(&self, params: &Params) -> crate::Result<Rows> {
self.query(params).await
}

async fn run(&mut self, params: &Params) -> crate::Result<()> {
async fn run(&self, params: &Params) -> crate::Result<()> {
self.run(params).await
}

fn interrupt(&mut self) -> crate::Result<()> {
fn interrupt(&self) -> crate::Result<()> {
Err(crate::Error::Misuse(
"interrupt is not supported for remote connections".to_string(),
))
}

fn reset(&mut self) {}
fn reset(&self) {}

fn parameter_count(&self) -> usize {
let stmt = &self.inner;
Expand Down
8 changes: 4 additions & 4 deletions libsql/src/hrana/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ where
}
}

pub async fn execute(&mut self, params: &Params) -> crate::Result<usize> {
pub async fn execute(&self, params: &Params) -> crate::Result<usize> {
let mut stmt = self.inner.clone();
bind_params(params.clone(), &mut stmt);

let result = self.stream.execute_inner(stmt, self.close_stream).await?;
Ok(result.affected_row_count as usize)
}

pub async fn run(&mut self, params: &Params) -> crate::Result<()> {
pub async fn run(&self, params: &Params) -> crate::Result<()> {
let mut stmt = self.inner.clone();
bind_params(params.clone(), &mut stmt);

Expand All @@ -179,7 +179,7 @@ where
}

pub(crate) async fn query_raw(
&mut self,
&self,
params: &Params,
) -> crate::Result<HranaRows<T::Stream, T>> {
let mut stmt = self.inner.clone();
Expand All @@ -197,7 +197,7 @@ where
T: HttpSend + Send + Sync + 'static,
<T as HttpSend>::Stream: Send + Sync + 'static,
{
pub async fn query(&mut self, params: &Params) -> crate::Result<super::Rows> {
pub async fn query(&self, params: &Params) -> crate::Result<super::Rows> {
let rows = self.query_raw(params).await?;
Ok(super::Rows::new(rows))
}
Expand Down
34 changes: 18 additions & 16 deletions libsql/src/hrana/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use libsql_hrana::proto::{
GetAutocommitStreamReq, PipelineReqBody, PipelineRespBody, SequenceStreamReq,
StoreSqlStreamReq, StreamRequest, StreamResponse, StreamResult,
};
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
Expand Down Expand Up @@ -66,8 +67,8 @@ where
pipeline_url,
cursor_url,
auth_token,
sql_id_generator: 0,
baton: None,
sql_id_generator: RefCell::new(0),
baton: RefCell::new(None),
}),
}),
}
Expand All @@ -77,7 +78,7 @@ where
/// Returns true if request was finalized correctly, false if stream was already closed.
pub(super) async fn finalize(&mut self, req: StreamRequest) -> Result<bool> {
let mut client = self.inner.stream.lock().await;
if client.baton.is_none() {
if client.baton.borrow().is_none() {
tracing::trace!("baton not found - skipping finalize for {:?}", req);
return Ok(false);
}
Expand Down Expand Up @@ -298,11 +299,11 @@ where
T: HttpSend,
{
client: T,
baton: Option<String>,
baton: RefCell<Option<String>>,
pipeline_url: Arc<str>,
cursor_url: Arc<str>,
auth_token: Arc<str>,
sql_id_generator: SqlId,
sql_id_generator: RefCell<SqlId>,
}

impl<T> RawStream<T>
Expand All @@ -316,7 +317,7 @@ where

pub async fn open_cursor(&mut self, batch: Batch) -> Result<Cursor<T::Stream>> {
let msg = CursorReq {
baton: self.baton.clone(),
baton: self.baton.borrow().clone(),
batch,
};
let body = serde_json::to_string(&msg).map_err(HranaError::Json)?;
Expand All @@ -336,7 +337,7 @@ where
} // stream has been closed by the server
Some(baton) => {
tracing::trace!("client stream has been assigned with baton: `{}`", baton);
self.baton = Some(baton)
*self.baton.borrow_mut() = Some(baton)
}
}
Ok(cursor)
Expand All @@ -349,11 +350,11 @@ where
tracing::trace!(
"client stream sending {} requests with baton `{}`: {:?}",
N,
self.baton.as_deref().unwrap_or_default(),
self.baton.borrow().as_deref().unwrap_or_default(),
requests
);
let msg = PipelineReqBody {
baton: self.baton.clone(),
baton: self.baton.borrow().clone(),
requests: Vec::from(requests),
};
let body = serde_json::to_string(&msg).map_err(HranaError::Json)?;
Expand All @@ -375,7 +376,7 @@ where
} // stream has been closed by the server
Some(baton) => {
tracing::trace!("client stream has been assigned with baton: `{}`", baton);
self.baton = Some(baton)
*self.baton.borrow_mut() = Some(baton)
}
}

Expand Down Expand Up @@ -424,16 +425,17 @@ where
Ok((resp, is_autocommit))
}

fn reset(&mut self) {
if let Some(baton) = self.baton.take() {
fn reset(&self) {
if let Some(baton) = self.baton.borrow_mut().take() {
tracing::trace!("closing client stream (baton: `{}`)", baton);
}
self.sql_id_generator = 0;
*self.sql_id_generator.borrow_mut() = 0;
}

fn next_sql_id(&mut self) -> SqlId {
self.sql_id_generator = self.sql_id_generator.wrapping_add(1);
self.sql_id_generator
let mut gen = self.sql_id_generator.borrow_mut();
*gen = gen.wrapping_add(1);
*gen
}
}

Expand All @@ -443,7 +445,7 @@ where
T: HttpSend,
{
fn drop(&mut self) {
if let Some(baton) = self.baton.take() {
if let Some(baton) = self.baton.borrow_mut().take() {
// only send a close request if stream was ever used to send the data
tracing::trace!("closing client stream (baton: `{}`)", baton);
let req = serde_json::to_string(&PipelineReqBody {
Expand Down
10 changes: 5 additions & 5 deletions libsql/src/local/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,32 @@ impl Stmt for LibsqlStmt {
self.0.finalize();
}

async fn execute(&mut self, params: &Params) -> Result<usize> {
async fn execute(&self, params: &Params) -> Result<usize> {
let params = params.clone();
let stmt = self.0.clone();

stmt.execute(&params).map(|i| i as usize)
}

async fn query(&mut self, params: &Params) -> Result<Rows> {
async fn query(&self, params: &Params) -> Result<Rows> {
let params = params.clone();
let stmt = self.0.clone();

stmt.query(&params).map(LibsqlRows).map(Rows::new)
}

async fn run(&mut self, params: &Params) -> Result<()> {
async fn run(&self, params: &Params) -> Result<()> {
let params = params.clone();
let stmt = self.0.clone();

stmt.run(&params)
}

fn interrupt(&mut self) -> Result<()> {
fn interrupt(&self) -> Result<()> {
self.0.interrupt()
}

fn reset(&mut self) {
fn reset(&self) {
self.0.reset();
}

Expand Down
Loading
Loading