1+ use std:: future:: poll_fn;
12use std:: pin:: Pin ;
23use std:: sync:: Arc ;
34use std:: task:: { Context , Poll } ;
45
6+ use futures:: stream:: FuturesUnordered ;
57use futures:: Stream ;
68use libsql_replication:: rpc:: replication:: replication_log_server:: ReplicationLogServer ;
79use libsql_replication:: rpc:: replication:: { BoxReplicationService , NAMESPACE_METADATA_KEY } ;
@@ -38,6 +40,9 @@ pub async fn run_rpc_server<A: Accept>(
3840 // Build the tonic server with services
3941 let idle_layer = option_layer ( idle_shutdown_layer) ;
4042 let mut server = tonic:: transport:: Server :: builder ( )
43+ . max_decoding_message_size ( 64 * 1024 * 1024 ) // 64MB max request
44+ . max_encoding_message_size ( 64 * 1024 * 1024 ) // 64MB max response
45+ . timeout ( std:: time:: Duration :: from_secs ( 60 ) ) // Request timeout
4146 . layer ( & idle_layer)
4247 . layer (
4348 tower_http:: trace:: TraceLayer :: new_for_grpc ( )
@@ -109,16 +114,22 @@ pub async fn run_rpc_server<A: Accept>(
109114}
110115
111116/// Custom stream for accepting TLS connections
117+ /// Properly manages pending TLS handshakes and yields them when complete
112118struct TlsIncomingStream < A : Accept > {
113119 acceptor : A ,
114120 tls_acceptor : TlsAcceptor ,
121+ pending_handshakes :
122+ FuturesUnordered < tokio:: task:: JoinHandle < Result < TlsStream < A :: Connection > , anyhow:: Error > > > ,
123+ acceptor_closed : bool ,
115124}
116125
117126impl < A : Accept > TlsIncomingStream < A > {
118127 fn new ( acceptor : A , tls_acceptor : TlsAcceptor ) -> Self {
119128 Self {
120129 acceptor,
121130 tls_acceptor,
131+ pending_handshakes : FuturesUnordered :: new ( ) ,
132+ acceptor_closed : false ,
122133 }
123134 }
124135}
@@ -128,32 +139,61 @@ impl<A: Accept> Stream for TlsIncomingStream<A> {
128139
129140 fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
130141 let this = self . get_mut ( ) ;
131- match Pin :: new ( & mut this. acceptor ) . poll_accept ( cx) {
132- Poll :: Ready ( Some ( Ok ( conn) ) ) => {
133- let tls_acceptor = this. tls_acceptor . clone ( ) ;
134- // Spawn a task to handle TLS handshake
135- tokio:: spawn ( async move {
136- match tls_acceptor. accept ( conn) . await {
137- Ok ( tls_stream) => Ok ( TlsStream ( tls_stream) ) ,
138- Err ( err) => {
139- tracing:: error!( "failed to perform tls handshake: {:#}" , err) ;
140- Err ( anyhow:: anyhow!( "TLS handshake failed: {}" , err) )
142+
143+ // Try to accept a new connection if acceptor is not closed
144+ if !this. acceptor_closed {
145+ match Pin :: new ( & mut this. acceptor ) . poll_accept ( cx) {
146+ Poll :: Ready ( Some ( Ok ( conn) ) ) => {
147+ let tls_acceptor = this. tls_acceptor . clone ( ) ;
148+ // Spawn TLS handshake and track it
149+ let handle = tokio:: spawn ( async move {
150+ match tls_acceptor. accept ( conn) . await {
151+ Ok ( tls_stream) => Ok ( TlsStream ( tls_stream) ) ,
152+ Err ( err) => {
153+ tracing:: error!( "failed to perform tls handshake: {:#}" , err) ;
154+ Err ( anyhow:: anyhow!( "TLS handshake failed: {}" , err) )
155+ }
141156 }
142- }
143- } ) ;
144- // For now, just pend and let the next poll handle it
145- // This is a simplified version - in production, we'd need proper handling
146- cx. waker ( ) . wake_by_ref ( ) ;
147- Poll :: Pending
157+ } ) ;
158+ this. pending_handshakes . push ( handle) ;
159+ }
160+ Poll :: Ready ( Some ( Err ( e) ) ) => {
161+ tracing:: error!( "Accept error: {}" , e) ;
162+ }
163+ Poll :: Ready ( None ) => {
164+ this. acceptor_closed = true ;
165+ }
166+ Poll :: Pending => { }
148167 }
149- Poll :: Ready ( Some ( Err ( e) ) ) => {
150- tracing:: error!( "Accept error: {}" , e) ;
151- cx. waker ( ) . wake_by_ref ( ) ;
152- Poll :: Pending
168+ }
169+
170+ // Poll pending handshakes for any completed ones
171+ if !this. pending_handshakes . is_empty ( ) {
172+ match Pin :: new ( & mut this. pending_handshakes ) . poll_next ( cx) {
173+ Poll :: Ready ( Some ( Ok ( result) ) ) => return Poll :: Ready ( Some ( result) ) ,
174+ Poll :: Ready ( Some ( Err ( e) ) ) => {
175+ tracing:: error!( "TLS handshake task panicked: {}" , e) ;
176+ return Poll :: Ready ( Some ( Err ( anyhow:: anyhow!(
177+ "TLS handshake panicked: {}" ,
178+ e
179+ ) ) ) ) ;
180+ }
181+ Poll :: Ready ( None ) => {
182+ // No more pending handshakes
183+ if this. acceptor_closed {
184+ return Poll :: Ready ( None ) ;
185+ }
186+ }
187+ Poll :: Pending => { }
153188 }
154- Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
155- Poll :: Pending => Poll :: Pending ,
156189 }
190+
191+ // If acceptor is closed and no pending handshakes, we're done
192+ if this. acceptor_closed && this. pending_handshakes . is_empty ( ) {
193+ return Poll :: Ready ( None ) ;
194+ }
195+
196+ Poll :: Pending
157197 }
158198}
159199
@@ -164,47 +204,33 @@ fn tls_incoming_stream<A: Accept>(
164204 TlsIncomingStream :: new ( acceptor, tls_acceptor)
165205}
166206
167- /// Custom stream for accepting plain (non-TLS) connections
168- struct PlainIncomingStream < A : Accept > {
207+ fn plain_incoming_stream < A : Accept > (
169208 acceptor : A ,
170- }
171-
172- impl < A : Accept > PlainIncomingStream < A > {
173- fn new ( acceptor : A ) -> Self {
174- Self { acceptor }
175- }
176- }
177-
178- impl < A : Accept > Stream for PlainIncomingStream < A > {
179- type Item = Result < A :: Connection , anyhow:: Error > ;
209+ ) -> impl Stream < Item = Result < A :: Connection , anyhow:: Error > >
210+ where
211+ A : Accept ,
212+ {
213+ tracing:: info!( "Starting plain incoming stream" ) ;
180214
181- fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
182- let this = self . get_mut ( ) ;
183- match Pin :: new ( & mut this . acceptor ) . poll_accept ( cx) {
184- Poll :: Ready ( Some ( Ok ( conn) ) ) => {
185- tracing:: debug!( "Accepted new connection" ) ;
186- Poll :: Ready ( Some ( Ok ( conn) ) )
187- }
188- Poll :: Ready ( Some ( Err ( e) ) ) => {
189- tracing:: error!( "Accept error: {}" , e) ;
190- // Continue to next connection on error
191- cx . waker ( ) . wake_by_ref ( ) ;
192- Poll :: Pending
193- }
194- Poll :: Ready ( None ) => {
195- tracing :: info! ( "Acceptor closed, stopping stream" ) ;
196- Poll :: Ready ( None )
215+ futures :: stream :: unfold ( acceptor , | mut acceptor| async move {
216+ loop {
217+ match poll_fn ( |cx| Pin :: new ( & mut acceptor) . poll_accept ( cx) ) . await {
218+ Some ( Ok ( conn) ) => {
219+ tracing:: debug!( "Accepted new connection" ) ;
220+ return Some ( ( Ok ( conn) , acceptor ) ) ;
221+ }
222+ Some ( Err ( e) ) => {
223+ tracing:: error!( "Accept error: {}" , e) ;
224+ // Continue to next iteration
225+ continue ;
226+ }
227+ None => {
228+ tracing :: info! ( "Acceptor closed, stopping stream" ) ;
229+ return None ;
230+ }
197231 }
198- Poll :: Pending => Poll :: Pending ,
199232 }
200- }
201- }
202-
203- fn plain_incoming_stream < A : Accept > (
204- acceptor : A ,
205- ) -> impl Stream < Item = Result < A :: Connection , anyhow:: Error > > {
206- tracing:: info!( "Starting plain incoming stream" ) ;
207- PlainIncomingStream :: new ( acceptor)
233+ } )
208234}
209235
210236// Wrapper for TLS stream to implement Connected
0 commit comments