Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libsql/src/hrana/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ where

impl<T> HttpConnection<T>
where
T: HttpSend,
T: HttpSend + Send + Sync + 'static,
{
pub fn new(url: String, token: String, inner: T) -> Self {
// The `libsql://` protocol is an alias for `https://`.
Expand Down
32 changes: 22 additions & 10 deletions libsql/src/hrana/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use libsql_hrana::proto::{Batch, BatchResult, Col, Stmt, StmtResult};
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -122,7 +123,7 @@ where

impl<T> Statement<T>
where
T: HttpSend,
T: HttpSend + Send + Sync + 'static,
{
pub(crate) fn new(stream: HranaStream<T>, sql: String, want_rows: bool) -> crate::Result<Self> {
// in SQLite when a multiple statements are glued together into one string, only the first one is
Expand Down Expand Up @@ -170,20 +171,20 @@ where
pub(crate) async fn query_raw(
&mut self,
params: &Params,
) -> crate::Result<HranaRows<T::Stream>> {
) -> crate::Result<HranaRows<T::Stream, T>> {
let mut stmt = self.inner.clone();
bind_params(params.clone(), &mut stmt);

let cursor = self.stream.cursor(Batch::single(stmt)).await?;
let rows = HranaRows::from_cursor(cursor).await?;
let rows = HranaRows::from_cursor(cursor, self.stream.clone()).await?;

Ok(rows)
}
}

impl<T> Statement<T>
where
T: HttpSend,
T: HttpSend + Send + Sync + 'static,
<T as HttpSend>::Stream: Send + Sync + 'static,
{
pub async fn query(&mut self, params: &Params) -> crate::Result<super::Rows> {
Expand All @@ -192,28 +193,37 @@ where
}
}

pub struct HranaRows<S> {
pub struct HranaRows<S, T: HttpSend> {
cursor_step: OwnedCursorStep<S>,
column_types: Option<Vec<ValueType>>,
stream: HranaStream<T>,
}

impl<S> HranaRows<S>
impl<S, T> HranaRows<S, T>
where
T: HttpSend + Send + Sync + 'static,
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
{
async fn from_cursor(cursor: Cursor<S>) -> Result<Self> {
async fn from_cursor(cursor: Cursor<S>, stream: HranaStream<T>) -> Result<Self> {
let cursor_step = cursor.next_step_owned().await?;
Ok(HranaRows {
cursor_step,
column_types: None,
stream,
})
}

pub async fn next(&mut self) -> crate::Result<Option<super::Row>> {
let row = match self.cursor_step.next().await {
Some(Ok(row)) => row,
Some(Err(e)) => return Err(crate::Error::Hrana(Box::new(e))),
None => return Ok(None),
None => {
self.stream
.inner
.affected_row_count
.store(self.cursor_step.affected_rows().into(), Ordering::SeqCst);
return Ok(None);
}
};

if self.column_types.is_none() {
Expand Down Expand Up @@ -254,17 +264,19 @@ where
}

#[async_trait::async_trait]
impl<S> RowsInner for HranaRows<S>
impl<S, T> RowsInner for HranaRows<S, T>
where
T: HttpSend + Send + Sync + 'static,
S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + Unpin,
{
async fn next(&mut self) -> crate::Result<Option<super::Row>> {
self.next().await
}
}

impl<S> ColumnsInner for HranaRows<S>
impl<S, T> ColumnsInner for HranaRows<S, T>
where
T: HttpSend + Send + Sync + 'static,
S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + Unpin,
{
fn column_count(&self) -> i32 {
Expand Down
10 changes: 6 additions & 4 deletions libsql/src/hrana/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct HranaStream<T>
where
T: HttpSend,
{
inner: Arc<Inner<T>>,
pub inner: Arc<Inner<T>>,
}

impl<T> Clone for HranaStream<T>
Expand Down Expand Up @@ -94,7 +94,9 @@ where
(0, 0)
};

self.inner.total_changes.fetch_add(affected_row_count, Ordering::SeqCst);
self.inner
.total_changes
.fetch_add(affected_row_count, Ordering::SeqCst);
self.inner
.affected_row_count
.store(affected_row_count, Ordering::SeqCst);
Expand Down Expand Up @@ -279,11 +281,11 @@ where
}

#[derive(Debug)]
struct Inner<T>
pub struct Inner<T>
where
T: HttpSend,
{
affected_row_count: AtomicU64,
pub affected_row_count: AtomicU64,
total_changes: AtomicU64,
last_insert_rowid: AtomicI64,
is_autocommit: AtomicBool,
Expand Down
8 changes: 4 additions & 4 deletions libsql/src/wasm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ cfg_cloudflare! {
#[derive(Debug, Clone)]
pub struct Connection<T>
where
T: HttpSend,
T: HttpSend + Sync + Send + 'static,
{
conn: HttpConnection<T>,
}
Expand All @@ -65,7 +65,7 @@ cfg_cloudflare! {

impl<T> Connection<T>
where
T: HttpSend,
T: HttpSend + Sync + Send + 'static,
<T as HttpSend>::Stream: 'static,
{
pub async fn execute(&self, sql: &str, params: impl IntoParams) -> crate::Result<u64> {
Expand Down Expand Up @@ -126,14 +126,14 @@ where
#[derive(Debug, Clone)]
pub struct Transaction<T>
where
T: HttpSend,
T: HttpSend + Sync + Send + 'static,
{
inner: HttpTransaction<T>,
}

impl<T> Transaction<T>
where
T: HttpSend,
T: HttpSend + Sync + Send + 'static,
<T as HttpSend>::Stream: 'static,
{
pub async fn query(&self, sql: &str, params: impl IntoParams) -> crate::Result<Rows> {
Expand Down
4 changes: 3 additions & 1 deletion libsql/src/wasm/rows.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::hrana::HranaRows;
use crate::hrana::HttpSend;
use crate::Row;
use bytes::Bytes;
use futures::Stream;
Expand Down Expand Up @@ -42,8 +43,9 @@ pub(super) trait RowsInner {
}

#[async_trait::async_trait(?Send)]
impl<S> RowsInner for HranaRows<S>
impl<S, T> RowsInner for HranaRows<S, T>
where
T: HttpSend + Sync + Send + 'static,
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
{
async fn next(&mut self) -> crate::Result<Option<Row>> {
Expand Down
Loading