From f84f441f156da48f7ca861c226586edaa8d56082 Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Sat, 20 Jun 2026 21:54:53 +0200 Subject: [PATCH] docs: reconcile manual with code and fix non-compiling examples Audited the Antora book and header Javadoc against the implementation and corrected every factual and conceptual discrepancy, treating the code as canonical. Key corrections: - read_some/write_some error contract: n < buffer_size on error, and a full buffer always means success (conditions deferred to the next read); rewrote the chapter 9 rationale and fixed the 9n header misquote - BufferSource/BufferSink: span-based pull/prepare plus consume, matching the real concepts (was pointer+count with integer returns) - cancellation: this_coro::stop_token and error::canceled (removed the nonexistent get_stop_token and error::operation_aborted) - dynamic buffers: fixed-capacity adapters over external storage built via pointer or the dynamic_buffer factory (was grow-on-demand) - any_* wrappers: owning by value vs reference by pointer (no S& ctor) - IoAwaitable/IoRunnable concepts, Executor concept, composed-op return types (io_task), and assorted symbol/path fixes Verified all 489 code examples compile against the real API. Fixed real breakage: mutable_buffer(array) to make_buffer, co_await std::move on a named task, the mmap_source io_result return, the fabricated test::stream(fuse) constructor, and buffer_to_string on bufgrind slices. Build instructions link capy with no external dependencies, and the thread_pool examples call join() to drain outstanding work. --- .../ROOT/pages/4.coroutines/4a.tasks.adoc | 10 +- .../ROOT/pages/4.coroutines/4b.launching.adoc | 3 +- .../ROOT/pages/4.coroutines/4c.executors.adoc | 30 +++- .../pages/4.coroutines/4d.io-awaitable.adoc | 31 ++-- .../pages/4.coroutines/4e.cancellation.adoc | 24 +-- .../pages/4.coroutines/4f.composition.adoc | 3 - .../pages/4.coroutines/4g.allocators.adoc | 2 +- .../4.coroutines/4h.lambda-captures.adoc | 6 +- .../ROOT/pages/5.buffers/5b.types.adoc | 6 +- .../ROOT/pages/5.buffers/5e.algorithms.adoc | 10 +- .../ROOT/pages/5.buffers/5f.dynamic.adoc | 67 +++++--- .../ROOT/pages/6.streams/6a.overview.adoc | 28 +-- .../ROOT/pages/6.streams/6b.streams.adoc | 50 ++++-- .../pages/6.streams/6c.sources-sinks.adoc | 16 +- .../pages/6.streams/6d.buffer-concepts.adoc | 162 +++++++++++------- .../ROOT/pages/6.streams/6e.algorithms.adoc | 65 +++---- .../ROOT/pages/6.streams/6f.isolation.adoc | 21 +-- .../pages/7.testing/7e.buffer-inspection.adoc | 25 +-- .../ROOT/pages/8.examples/8a.hello-task.adoc | 3 +- .../8.examples/8d.mock-stream-testing.adoc | 20 +-- .../ROOT/pages/9.design/9a.CapyLayering.adoc | 5 +- .../ROOT/pages/9.design/9c.ReadStream.adoc | 40 ++--- .../ROOT/pages/9.design/9d.ReadSource.adoc | 22 +-- .../ROOT/pages/9.design/9f.WriteStream.adoc | 4 +- .../ROOT/pages/9.design/9g.WriteSink.adoc | 6 +- .../ROOT/pages/9.design/9h.BufferSink.adoc | 2 +- .../9.design/9n.WhyNotCobaltConcepts.adoc | 20 ++- .../ROOT/pages/9.design/9o.WhyNotTMC.adoc | 2 +- doc/modules/ROOT/pages/index.adoc | 17 +- doc/modules/ROOT/pages/quick-start.adoc | 8 +- include/boost/capy/ex/strand.hpp | 4 +- include/boost/capy/ex/thread_pool.hpp | 3 +- include/boost/capy/io_result.hpp | 10 +- include/boost/capy/read.hpp | 14 +- include/boost/capy/test/buffer_sink.hpp | 10 +- include/boost/capy/test/buffer_to_string.hpp | 2 +- 36 files changed, 433 insertions(+), 318 deletions(-) diff --git a/doc/modules/ROOT/pages/4.coroutines/4a.tasks.adoc b/doc/modules/ROOT/pages/4.coroutines/4a.tasks.adoc index 8a14c71ed..42d3d5eb9 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4a.tasks.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4a.tasks.adoc @@ -115,7 +115,7 @@ task<> example() auto t = compute(); // Task created, but "Computing..." NOT printed yet std::cout << "Task created\n"; - int result = co_await t; // NOW "Computing..." is printed + int result = co_await std::move(t); // NOW "Computing..." is printed std::cout << "Result: " << result << "\n"; } ---- @@ -167,11 +167,11 @@ task compute(); task<> example() { auto t1 = compute(); - auto t2 = std::move(t1); // OK: ownership transferred - + auto t2 = std::move(t1); // OK: ownership transferred, t1 is now empty + // auto t3 = t2; // Error: task is not copyable - - int result = co_await t2; // t1 is now empty + + int result = co_await std::move(t2); } ---- diff --git a/doc/modules/ROOT/pages/4.coroutines/4b.launching.adoc b/doc/modules/ROOT/pages/4.coroutines/4b.launching.adoc index 0e088cac3..c58e4f579 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4b.launching.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4b.launching.adoc @@ -36,8 +36,7 @@ int main() run_async(pool.get_executor())(compute()); // Task is now running on the thread pool - // pool destructor waits for work to complete - return 0; + pool.join(); // wait for outstanding work to complete } ---- diff --git a/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc b/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc index 4839686f8..76e239747 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4c.executors.adoc @@ -9,14 +9,25 @@ This section explains executors and execution contexts—the mechanisms that con == The Executor Concept -An *executor* is an object that can schedule work for execution. In Capy, executors must provide three methods: +An *executor* is an object that can schedule work for execution. An executor must be nothrow copy- and move-constructible and provide the following interface: [source,cpp] ---- -concept Executor = requires(E ex, continuation& c) { - { ex.dispatch(c) } -> std::same_as>; - { ex.post(c) } -> std::same_as; - { ex.context() } -> std::convertible_to; +concept Executor = requires(E const& ce, E const& ce2, continuation& c) { + // Equality comparable + { ce == ce2 } noexcept -> std::convertible_to; + + // Owning context, returned as an lvalue reference to a type + // derived from execution_context + { ce.context() } noexcept; + + // Work tracking + { ce.on_work_started() } noexcept; + { ce.on_work_finished() } noexcept; + + // Scheduling + { ce.dispatch(c) } -> std::same_as>; + { ce.post(c) }; }; ---- @@ -49,8 +60,9 @@ int main() { thread_pool pool; executor_ref ex = pool.get_executor(); // Type erasure - - schedule_work(ex, some_coroutine); + + continuation c = /* ... */; + schedule_work(ex, c); } ---- @@ -74,8 +86,8 @@ int main() // Launch work on the pool run_async(ex)(my_task()); - - // pool destructor waits for all work to complete + + pool.join(); // wait for outstanding work to complete } ---- diff --git a/doc/modules/ROOT/pages/4.coroutines/4d.io-awaitable.adoc b/doc/modules/ROOT/pages/4.coroutines/4d.io-awaitable.adoc index 48672d51d..f05e033b2 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4d.io-awaitable.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4d.io-awaitable.adoc @@ -53,36 +53,37 @@ This signature receives: ** `env->stop_token` — A stop token for cooperative cancellation ** `env->frame_allocator` — An optional frame allocator -The return type enables symmetric transfer. +Many IoAwaitables return `std::coroutine_handle<>` to enable symmetric transfer, but the concept does not require any particular return type. == IoAwaitable Concept -An awaitable satisfies `IoAwaitable` if: +An awaitable satisfies `IoAwaitable` if `a.await_suspend(h, env)` is a valid expression: [source,cpp] ---- -template -concept IoAwaitable = requires(T& t, std::coroutine_handle<> h, io_env const* env) { - { t.await_ready() } -> std::convertible_to; - { t.await_suspend(h, env) } -> std::same_as>; - t.await_resume(); -}; +template +concept IoAwaitable = + requires(A a, std::coroutine_handle<> h, io_env const* env) { + a.await_suspend(h, env); + }; ---- -The key difference from standard awaitables is the two-argument `await_suspend` that receives the `io_env`. +The concept constrains only the two-argument `await_suspend` that receives the `io_env`. It does not require `await_ready` or `await_resume`, nor does it constrain the return type of `await_suspend`. A complete awaitable still provides `await_ready` and `await_resume` so it can be `co_await`-ed; the concept simply does not test for them. == IoRunnable Concept -For tasks that can be launched from non-coroutine contexts, the `IoRunnable` concept refines `IoAwaitable` with: +For tasks that can be launched from non-coroutine contexts, the `IoRunnable` concept refines `IoAwaitable` and requires a `promise_type` plus the following: -* `handle()` — Access the typed coroutine handle -* `release()` — Transfer ownership of the frame -* `exception()` — Check for captured exceptions -* `result()` — Access the result value (non-void tasks) +* `handle()`: Access the typed coroutine handle +* `release()`: Transfer ownership of the frame +* `exception()`: Check for captured exceptions (on the promise) +* `result()`: Access the result value (on the promise, for non-void tasks) +* `set_continuation()`: Set the continuation handle (on the promise) +* `set_environment()`: Inject the `io_env` (on the promise) These methods exist because launch functions like `run_async` cannot `co_await` the task directly. The trampoline must be allocated before the task type is known, so it type-erases the task through function pointers and needs a common API to manage lifetime and extract results. -Context injection methods (`set_environment`, `set_continuation`) are internal to the promise and not part of any concept. Launch functions access them through the typed handle provided by `handle()`. +The context injection methods `set_continuation` and `set_environment` are part of the `IoRunnable` concept: it requires them on the `promise_type`. Launch functions access them through the typed handle provided by `handle()`. Capy's `task` satisfies this concept. diff --git a/doc/modules/ROOT/pages/4.coroutines/4e.cancellation.adoc b/doc/modules/ROOT/pages/4.coroutines/4e.cancellation.adoc index 9e8ee15eb..1043fea9e 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4e.cancellation.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4e.cancellation.adoc @@ -201,7 +201,7 @@ task<> parent() task<> child() { // Receives parent's stop token via IoAwaitable protocol - auto token = co_await get_stop_token(); // Access current token + auto token = co_await this_coro::stop_token; // Access current token } ---- @@ -209,13 +209,13 @@ No manual threading—the protocol handles it. === Accessing the Stop Token -Inside a task, use `get_stop_token()` to access the current stop token: +Inside a task, use `co_await this_coro::stop_token` to access the current stop token: [source,cpp] ---- task<> cancellable_work() { - auto token = co_await get_stop_token(); + auto token = co_await this_coro::stop_token; while (!token.stop_requested()) { @@ -243,8 +243,8 @@ The rule: ---- task<> process_items(std::vector const& items) { - auto token = co_await get_stop_token(); - + auto token = co_await this_coro::stop_token; + for (auto const& item : items) { if (token.stop_requested()) @@ -264,7 +264,7 @@ RAII ensures resources are released on early exit: task<> with_resource() { auto resource = acquire_resource(); // RAII wrapper - auto token = co_await get_stop_token(); + auto token = co_await this_coro::stop_token; while (!token.stop_requested()) { @@ -274,22 +274,22 @@ task<> with_resource() } ---- -=== The operation_aborted Convention +=== The canceled Convention -When cancellation causes an operation to fail, the conventional error code is `error::operation_aborted`: +When cancellation causes an operation to fail, the conventional error code is `error::canceled`, which compares equal to the portable condition `cond::canceled`: [source,cpp] ---- task fetch_with_cancel() { - auto token = co_await get_stop_token(); - + auto token = co_await this_coro::stop_token; + if (token.stop_requested()) { throw std::system_error( - make_error_code(std::errc::operation_canceled)); + make_error_code(error::canceled)); } - + co_return co_await do_fetch(); } ---- diff --git a/doc/modules/ROOT/pages/4.coroutines/4f.composition.adoc b/doc/modules/ROOT/pages/4.coroutines/4f.composition.adoc index dbf181285..27a7341b9 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4f.composition.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4f.composition.adoc @@ -344,9 +344,6 @@ This design ensures proper context propagation to all children. | `` | Asynchronous sleep that suspends instead of blocking the thread -| `` -| Race an awaitable against a deadline - | `` | Race an awaitable against a deadline |=== diff --git a/doc/modules/ROOT/pages/4.coroutines/4g.allocators.adoc b/doc/modules/ROOT/pages/4.coroutines/4g.allocators.adoc index b0cdeadac..b4fa11aff 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4g.allocators.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4g.allocators.adoc @@ -267,7 +267,7 @@ Coroutine frame allocation is rarely the bottleneck. Profile your application be | `` | Frame allocator concept and utilities -| `` +| `` | Mixin base for promise types that use the TLS frame allocator | `` diff --git a/doc/modules/ROOT/pages/4.coroutines/4h.lambda-captures.adoc b/doc/modules/ROOT/pages/4.coroutines/4h.lambda-captures.adoc index bbe82a95e..754d62bcf 100644 --- a/doc/modules/ROOT/pages/4.coroutines/4h.lambda-captures.adoc +++ b/doc/modules/ROOT/pages/4.coroutines/4h.lambda-captures.adoc @@ -20,7 +20,7 @@ void process(socket& sock) auto task = [&sock]() -> capy::task<> { char buf[1024]; - auto [ec, n] = co_await sock.read_some(buffer(buf, sizeof(buf))); + auto [ec, n] = co_await sock.read_some(make_buffer(buf)); }(); run_async(executor)(std::move(task)); @@ -58,7 +58,7 @@ void process(socket& sock) auto task = [](socket* s) -> capy::task<> { char buf[1024]; - auto [ec, n] = co_await s->read_some(buffer(buf, sizeof(buf))); + auto [ec, n] = co_await s->read_some(make_buffer(buf)); }(&sock); run_async(executor)(std::move(task)); @@ -126,7 +126,7 @@ auto handler = [&sock]() -> capy::task<> }; // Lambda 'handler' still exists here -run_and_wait(handler()); // Blocks until coroutine completes +capy::test::run_blocking()(handler()); // Blocks until coroutine completes // Lambda destroyed after coroutine finishes ---- diff --git a/doc/modules/ROOT/pages/5.buffers/5b.types.adoc b/doc/modules/ROOT/pages/5.buffers/5b.types.adoc index b4574f231..c3ccd3025 100644 --- a/doc/modules/ROOT/pages/5.buffers/5b.types.adoc +++ b/doc/modules/ROOT/pages/5.buffers/5b.types.adoc @@ -150,10 +150,10 @@ auto buf = make_buffer(sp); elements—including `std::span` and `boost::span`—in addition to the sources shown above. -The returned buffer type depends on constness: +The returned buffer type depends on the element constness of the range: -* Non-const containers → `mutable_buffer` -* Const containers, `string_view` → `const_buffer` +* Ranges of mutable elements → `mutable_buffer` +* Ranges of const elements, `string_view`, string literals → `const_buffer` == Layout Compatibility diff --git a/doc/modules/ROOT/pages/5.buffers/5e.algorithms.adoc b/doc/modules/ROOT/pages/5.buffers/5e.algorithms.adoc index ca169e594..96d0a1f25 100644 --- a/doc/modules/ROOT/pages/5.buffers/5e.algorithms.adoc +++ b/doc/modules/ROOT/pages/5.buffers/5e.algorithms.adoc @@ -88,14 +88,14 @@ Note the distinction: Copies data from one buffer sequence to another: +`buffer_copy` is a function object with a single call operator whose +`at_most` parameter defaults to copying everything available: + [source,cpp] ---- template -std::size_t buffer_copy(Target const& target, Source const& source); - -template -std::size_t buffer_copy(Target const& target, Source const& source, - std::size_t at_most); +std::size_t buffer_copy(Target const& target, Source const& source, + std::size_t at_most = std::size_t(-1)); ---- Returns the number of bytes copied. diff --git a/doc/modules/ROOT/pages/5.buffers/5f.dynamic.adoc b/doc/modules/ROOT/pages/5.buffers/5f.dynamic.adoc index 8ac07a9ac..29d8a39c4 100644 --- a/doc/modules/ROOT/pages/5.buffers/5f.dynamic.adoc +++ b/doc/modules/ROOT/pages/5.buffers/5f.dynamic.adoc @@ -14,7 +14,7 @@ Dynamic buffers serve as intermediate storage between a *producer* (typically ne The flow: 1. **Producer** writes data into the buffer -2. **Buffer** grows as needed to accommodate data +2. **Buffer** accommodates data within its capacity 3. **Consumer** reads and processes data 4. **Buffer** releases consumed data @@ -25,22 +25,22 @@ This model decouples production rate from consumption rate—the buffer absorbs [source,cpp] ---- template -concept DynamicBuffer = requires(T& t, std::size_t n) { +concept DynamicBuffer = requires(T& t, T const& ct, std::size_t n) { typename T::const_buffers_type; typename T::mutable_buffers_type; // Producer side { t.prepare(n) } -> std::same_as; { t.commit(n) }; - + // Consumer side - { t.data() } -> std::same_as; + { ct.data() } -> std::same_as; { t.consume(n) }; - + // Capacity - { t.size() } -> std::same_as; - { t.max_size() } -> std::same_as; - { t.capacity() } -> std::same_as; + { ct.size() } -> std::convertible_to; + { ct.max_size() } -> std::convertible_to; + { ct.capacity() } -> std::convertible_to; }; ---- @@ -61,11 +61,14 @@ Returns a mutable buffer sequence `mb` for writing up to `n` bytes: capy::MutableBufferSequence auto buffers = dynamic_buf.prepare(1024); // Space for up to 1024 bytes ---- -_Effects:_ If `n > max_size() - size()` throws `std::length_error()`. -Otherwise, ensures that a wirteable region of size at leats `n` exists -and returns an object of type `mutable_buffers_type` representing this region. +_Effects:_ If `n` exceeds the available capacity, throws. Otherwise, +ensures that a writeable region of size at least `n` exists and returns +an object of type `mutable_buffers_type` representing this region. -_Throws:_ `std::bad_alloc` in case an allocation is performed and fails. +_Throws:_ The concrete adapters throw `std::invalid_argument` when `n` +exceeds the available capacity; `circular_dynamic_buffer` throws +`std::length_error`. `std::bad_alloc` may also be thrown if an allocation +is performed and fails. _Postcondition:_ `buffer_size(buffers) >= n`. @@ -114,7 +117,7 @@ Returns a const buffer sequence representing the readable region. [source,cpp] ---- -capy::ReadableBufferSequence auto readable = dynamic_buf.data(); +capy::ConstBufferSequence auto readable = dynamic_buf.data(); // Process readable bytes ---- @@ -203,14 +206,18 @@ This concept ensures proper handling of lvalues and rvalues, preventing dangling === flat_dynamic_buffer -Linear storage with single-buffer sequences: +A fixed-capacity adapter over caller-owned contiguous storage, with +single-buffer sequences. It never reallocates: the capacity is fixed at +construction, and a default-constructed buffer has zero capacity (so +`prepare` would throw). [source,cpp] ---- #include -flat_dynamic_buffer buffer; -buffer.prepare(1024); +char storage[1024]; +flat_dynamic_buffer buffer(storage, sizeof(storage)); +auto space = buffer.prepare(256); // ... write data ... buffer.commit(n); @@ -224,7 +231,7 @@ Advantages: Disadvantages: -* May require copying when buffer wraps or grows +* Fixed capacity; `prepare` throws when more space is requested than remains === circular_dynamic_buffer @@ -234,7 +241,8 @@ Ring buffer implementation: ---- #include -circular_dynamic_buffer<1024> buffer; // Fixed capacity +char storage[1024]; +circular_dynamic_buffer buffer(storage, sizeof(storage)); // Fixed capacity ---- Advantages: @@ -249,17 +257,20 @@ Disadvantages: === vector_dynamic_buffer -Backed by `std::vector`: +Backed by a caller-owned `std::vector` of byte-sized elements +(`vector_dynamic_buffer` itself uses `unsigned char`). Use the +`dynamic_buffer` factory, which deduces the element type: [source,cpp] ---- #include -std::vector storage; -vector_dynamic_buffer buffer(storage); +std::vector storage; +auto buffer = dynamic_buffer(storage); ---- -Adapts an existing vector for use as a dynamic buffer. +Adapts an existing vector for use as a dynamic buffer. The vector must +outlive the adapter. === string_dynamic_buffer @@ -270,10 +281,13 @@ Backed by `std::string`: #include std::string storage; -string_dynamic_buffer buffer(storage); +auto buffer = dynamic_buffer(storage); ---- -Useful when you want the final data as a string. +Useful when you want the final data as a string. The `dynamic_buffer` +factory wraps the string (you may also construct +`string_dynamic_buffer(&storage)` directly). The string must outlive the +adapter. == Example: Line-Based Protocol @@ -281,8 +295,9 @@ Useful when you want the final data as a string. ---- task read_line(Stream& stream) { - flat_dynamic_buffer buffer; - + char storage[4096]; + flat_dynamic_buffer buffer(storage, sizeof(storage)); + while (true) { // Prepare space and read diff --git a/doc/modules/ROOT/pages/6.streams/6a.overview.adoc b/doc/modules/ROOT/pages/6.streams/6a.overview.adoc index 479409463..1070bb6cc 100644 --- a/doc/modules/ROOT/pages/6.streams/6a.overview.adoc +++ b/doc/modules/ROOT/pages/6.streams/6a.overview.adoc @@ -85,18 +85,20 @@ The third pair inverts buffer ownership: [source,cpp] ---- // BufferSource: callee provides read-only buffers -const_buffer bufs[8]; -auto [ec, count] = co_await source.pull(bufs, 8); -// bufs[0..count-1] now point to source's internal data +const_buffer arr[8]; +auto [ec, bufs] = co_await source.pull(arr); +// bufs is a span pointing into source's internal data; +// ec == cond::eof with an empty span signals exhaustion +source.consume(n); // mark n bytes consumed before the next pull // BufferSink: callee provides writable buffers -mutable_buffer bufs[8]; -std::size_t count = sink.prepare(bufs, 8); -// Write into bufs[0..count-1], then commit +mutable_buffer arr[8]; +auto bufs = sink.prepare(arr); // returns a span of writable buffers +// Write into bufs, then commit co_await sink.commit(bytes_written); ---- -This pattern enables zero-copy I/O—data never moves through intermediate buffers. +This pattern enables zero-copy I/O: data never moves through intermediate buffers. == Type-Erasing Wrappers @@ -166,7 +168,7 @@ task<> echo(any_stream& stream) char buf[1024]; for (;;) { - auto [ec, n] = co_await stream.read_some(mutable_buffer(buf)); + auto [ec, n] = co_await stream.read_some(make_buffer(buf)); auto [wec, wn] = co_await write(stream, const_buffer(buf, n)); @@ -183,19 +185,19 @@ The caller decides the concrete implementation: [source,cpp] ---- -// Works with Corosio TCP sockets +// Owns a moved-in TCP socket (the lvalue form takes ownership by value) any_stream s1{tcp_socket}; echo(s1); -// Works with TLS streams -any_stream s2{tls_stream}; +// Wraps a TLS stream by pointer (reference semantics, must outlive s2) +any_stream s2{&tls_stream}; echo(s2); -// Works with test mocks +// Owns a temporary test mock any_stream s3{test::stream{}}; echo(s3); ---- -Same code, different transports—compile once, link anywhere. +Same code, different transports: compile once, link anywhere. Continue to xref:6.streams/6b.streams.adoc[Streams (Partial I/O)] to learn the `ReadStream` and `WriteStream` concepts in detail. diff --git a/doc/modules/ROOT/pages/6.streams/6b.streams.adoc b/doc/modules/ROOT/pages/6.streams/6b.streams.adoc index 44f42a0eb..5ebc018de 100644 --- a/doc/modules/ROOT/pages/6.streams/6b.streams.adoc +++ b/doc/modules/ROOT/pages/6.streams/6b.streams.adoc @@ -33,7 +33,9 @@ Attempts to read up to `buffer_size(buffers)` bytes from the stream into the buf If `buffer_size(buffers) > 0`: * If `!ec`, then `n >= 1 && n \<= buffer_size(buffers)`. `n` bytes were read into the buffer sequence. -* If `ec`, then `n >= 0 && n \<= buffer_size(buffers)`. `n` is the number of bytes read before the I/O condition arose. +* If `ec`, then `n >= 0 && n < buffer_size(buffers)`. `n` is the number of bytes read before the I/O condition arose. + +Equivalently, `n == buffer_size(buffers)` implies `!ec`: a read that fills the buffer sequence is a success even when the underlying operation also signals a condition such as end-of-stream. That condition is reported on a subsequent read. If `buffer_empty(buffers)` is true, `n` is 0. The empty buffer is not itself a cause for error, but `ec` may reflect the state of the stream. @@ -48,12 +50,12 @@ I/O conditions from the underlying system are reported via `ec`. Failures in the [source,cpp] ---- char buf[1024]; -auto [ec, n] = co_await stream.read_some(mutable_buffer(buf)); +auto [ec, n] = co_await stream.read_some(make_buffer(buf)); // n might be 1, might be 500, might be 1024 // if !ec, then n >= 1 ---- -This matches underlying OS behavior—reads return when *some* data is available. +This matches underlying OS behavior: reads return when *some* data is available. === Example @@ -66,7 +68,7 @@ task<> dump_stream(Stream& stream) for (;;) { - auto [ec, n] = co_await stream.read_some(mutable_buffer(buf)); + auto [ec, n] = co_await stream.read_some(make_buffer(buf)); std::cout.write(buf, n); @@ -102,7 +104,9 @@ Attempts to write up to `buffer_size(buffers)` bytes from the buffer sequence to If `buffer_size(buffers) > 0`: * If `!ec`, then `n >= 1 && n \<= buffer_size(buffers)`. `n` bytes were written from the buffer sequence. -* If `ec`, then `n >= 0 && n \<= buffer_size(buffers)`. `n` is the number of bytes written before the I/O condition arose. +* If `ec`, then `n >= 0 && n < buffer_size(buffers)`. `n` is the number of bytes written before the I/O condition arose. + +Equivalently, `n == buffer_size(buffers)` implies `!ec`: a write that drains the entire buffer sequence is a success even when the underlying operation also signals a condition. That condition is reported on a subsequent write. If `buffer_empty(buffers)` is true, `n` is 0. The empty buffer is not itself a cause for error, but `ec` may reflect the state of the stream. @@ -132,11 +136,16 @@ Wraps any `ReadStream` in a type-erased container: ---- #include +// Owning: takes ownership of a moved-in stream +template +any_read_stream(S stream); + +// Reference: wraps by pointer without ownership template -any_read_stream(S& stream); +any_read_stream(S* stream); ---- -The wrapped stream is referenced—the original must outlive the wrapper. +Each wrapper has two construction modes. Passing an object by value takes ownership: the wrapper moves the stream into internally allocated storage. Passing a pointer wraps the pointed-to stream by reference, and that stream must outlive the wrapper. === any_write_stream @@ -147,7 +156,10 @@ Wraps any `WriteStream`: #include template -any_write_stream(S& stream); +any_write_stream(S stream); // owning + +template +any_write_stream(S* stream); // reference ---- === any_stream @@ -158,19 +170,23 @@ Wraps bidirectional streams (both `ReadStream` and `WriteStream`): ---- #include -template - requires WriteStream -any_stream(S& stream); +template + requires ReadStream && WriteStream +any_stream(S stream); // owning + +template + requires ReadStream && WriteStream +any_stream(S* stream); // reference ---- === Wrapper Characteristics All wrappers share these properties: -* *Reference semantics* — Wrap existing objects without ownership -* *Preallocated coroutine frame* — Zero steady-state allocation -* *Move-only* — Non-copyable; moving transfers the cached frame -* *Lifetime requirement* — Wrapped object must outlive wrapper +* *Owning or reference*: By-value construction owns a moved-in object; pointer construction wraps by reference +* *Preallocated coroutine frame*: Zero steady-state allocation +* *Move-only*: Non-copyable; moving transfers the cached frame +* *Lifetime requirement*: A pointer-wrapped object must outlive the wrapper Example usage: @@ -181,7 +197,7 @@ void process_stream(any_stream& stream); tcp::socket socket; // ... connect socket ... -any_stream wrapped{socket}; // Type erasure here +any_stream wrapped{&socket}; // Type erasure, references the existing socket process_stream(wrapped); // process_stream doesn't know about tcp::socket ---- @@ -199,7 +215,7 @@ task<> handle_connection(any_stream& stream) for (;;) { - auto [ec, n] = co_await stream.read_some(mutable_buffer(buf)); + auto [ec, n] = co_await stream.read_some(make_buffer(buf)); auto [wec, wn] = co_await write(stream, const_buffer(buf, n)); diff --git a/doc/modules/ROOT/pages/6.streams/6c.sources-sinks.adoc b/doc/modules/ROOT/pages/6.streams/6c.sources-sinks.adoc index 4ea54bfa5..874059770 100644 --- a/doc/modules/ROOT/pages/6.streams/6c.sources-sinks.adoc +++ b/doc/modules/ROOT/pages/6.streams/6c.sources-sinks.adoc @@ -147,7 +147,10 @@ task<> send_response(Sink& sink, response const& resp) #include template -any_read_source(S& source); +any_read_source(S source); // owning: takes ownership of a moved-in source + +template +any_read_source(S* source); // reference: wraps by pointer, source must outlive the wrapper ---- === any_write_sink @@ -157,7 +160,10 @@ any_read_source(S& source); #include template -any_write_sink(S& sink); +any_write_sink(S sink); // owning: takes ownership of a moved-in sink + +template +any_write_sink(S* sink); // reference: wraps by pointer, sink must outlive the wrapper ---- == Example: HTTP Body Handler @@ -185,16 +191,16 @@ The caller decides the concrete implementation: ---- // Content-length mode content_length_sink cl_sink(socket, data.size()); -any_write_sink body{cl_sink}; +any_write_sink body{&cl_sink}; // references cl_sink send_body(body, data); // Chunked mode chunked_sink ch_sink(socket); -any_write_sink body{ch_sink}; +any_write_sink body{&ch_sink}; // references ch_sink send_body(body, data); ---- -Same `send_body` function, different transfer encodings—the library handles the difference. +Same `send_body` function, different transfer encodings: the library handles the difference. == Streams vs Sources/Sinks diff --git a/doc/modules/ROOT/pages/6.streams/6d.buffer-concepts.adoc b/doc/modules/ROOT/pages/6.streams/6d.buffer-concepts.adoc index 3f4af412c..f18bfc78c 100644 --- a/doc/modules/ROOT/pages/6.streams/6d.buffer-concepts.adoc +++ b/doc/modules/ROOT/pages/6.streams/6d.buffer-concepts.adoc @@ -15,7 +15,7 @@ With streams and sources/sinks, the *caller* provides buffers: ---- // Caller owns the buffer char my_buffer[1024]; -co_await stream.read_some(mutable_buffer(my_buffer)); +co_await stream.read_some(make_buffer(my_buffer)); ---- Data flows: source → caller's buffer → processing @@ -25,9 +25,9 @@ With buffer sources/sinks, the *callee* provides buffers: [source,cpp] ---- // Callee owns the buffers -const_buffer bufs[8]; -auto [ec, count] = co_await source.pull(bufs, 8); -// bufs now point into source's internal storage +const_buffer arr[8]; +auto [ec, bufs] = co_await source.pull(arr); +// bufs is a span pointing into source's internal storage ---- Data flows: source's internal buffer → processing (no copy!) @@ -40,8 +40,10 @@ A `BufferSource` provides read-only buffers from its internal storage: ---- template concept BufferSource = - requires(T& source, const_buffer* arr, std::size_t max_count) { - { source.pull(arr, max_count) } -> IoAwaitable; + requires(T& src, std::span dest, std::size_t n) { + { src.pull(dest) } -> IoAwaitable; + // pull await-returns (error_code, std::span) + src.consume(n); }; ---- @@ -49,16 +51,17 @@ concept BufferSource = [source,cpp] ---- -IoAwaitable auto pull(const_buffer* arr, std::size_t max_count); +IoAwaitable auto pull(std::span dest); +void consume(std::size_t n); ---- -Await-returns `(error_code, std::size_t)`: +`pull` await-returns `(error_code, std::span)`. The source fills `dest` with descriptors pointing into its internal storage and returns a span over the filled prefix: -* On success: `!ec`, fills `arr[0..count-1]` with buffer descriptors -* On exhausted: `count == 0` indicates no more data -* On error: `ec` +* On success: `!ec`, the returned span describes the available data +* On exhaustion: `ec == cond::eof` with an empty span +* On error: `ec` set to the error condition -The buffers point into the source's internal storage. You must consume all returned data before calling `pull()` again—the previous buffers become invalid. +The returned buffers point into the source's internal storage and remain valid until you call `consume`. Calling `pull` again *without* an intervening `consume` returns the same unconsumed data. Call `consume(n)` to mark `n` bytes as consumed and advance the source. === Example @@ -67,21 +70,27 @@ The buffers point into the source's internal storage. You must consume all retur template task<> process_source(Source& source) { - const_buffer bufs[8]; - + const_buffer arr[8]; + for (;;) { - auto [ec, count] = co_await source.pull(bufs, 8); - + auto [ec, bufs] = co_await source.pull(arr); + + if (ec == cond::eof) + break; // Source exhausted + if (ec) throw std::system_error(ec); - - if (count == 0) - break; // Source exhausted - + // Process buffers (zero-copy!) - for (std::size_t i = 0; i < count; ++i) - process_data(bufs[i].data(), bufs[i].size()); + std::size_t total = 0; + for (auto const& b : bufs) + { + process_data(b.data(), b.size()); + total += b.size(); + } + + source.consume(total); } } ---- @@ -94,11 +103,10 @@ A `BufferSink` provides writable buffers for direct write access: ---- template concept BufferSink = - requires(T& sink, mutable_buffer* arr, std::size_t max_count, std::size_t n) { - { sink.prepare(arr, max_count) } -> std::same_as; - { sink.commit(n) } -> IoAwaitable; - { sink.commit(n, bool{}) } -> IoAwaitable; - { sink.commit_eof() } -> IoAwaitable; + requires(T& sink, std::span dest, std::size_t n) { + { sink.prepare(dest) } -> std::same_as>; + { sink.commit(n) } -> IoAwaitable; // await-returns (error_code) + { sink.commit_eof(n) } -> IoAwaitable; // await-returns (error_code) }; ---- @@ -106,21 +114,20 @@ concept BufferSink = [source,cpp] ---- -std::size_t prepare(mutable_buffer* arr, std::size_t max_count); +std::span prepare(std::span dest); ---- -Synchronous operation. Returns the number of buffers prepared (may be less than `max_count`). Fills `arr[0..count-1]` with writable buffer descriptors. +Synchronous operation. The sink fills `dest` with writable buffer descriptors from its internal storage and returns a span over the filled prefix (which may be shorter than `dest`). === commit Semantics [source,cpp] ---- IoAwaitable auto commit(std::size_t n); -IoAwaitable auto commit(std::size_t n, bool eof); -IoAwaitable auto commit_eof(); +IoAwaitable auto commit_eof(std::size_t n); ---- -Finalizes `n` bytes of prepared data. The `eof` flag or `commit_eof()` signals end-of-stream. +`commit(n)` finalizes `n` bytes of prepared data and await-returns `(error_code)`. `commit_eof(n)` finalizes `n` final bytes and signals end-of-stream; pass `0` to signal end-of-stream with no further data. === Example @@ -130,29 +137,33 @@ template task<> write_to_sink(Sink& sink, std::span data) { std::size_t written = 0; - + while (written < data.size()) { - mutable_buffer bufs[8]; - std::size_t count = sink.prepare(bufs, 8); - - if (count == 0) + mutable_buffer arr[8]; + auto bufs = sink.prepare(arr); + + if (bufs.empty()) throw std::runtime_error("sink full"); - + // Copy into sink's buffers std::size_t copied = 0; - for (std::size_t i = 0; i < count && written < data.size(); ++i) + for (auto& b : bufs) { + if (written >= data.size()) + break; std::size_t chunk = (std::min)( - bufs[i].size(), + b.size(), data.size() - written); - std::memcpy(bufs[i].data(), data.data() + written, chunk); + std::memcpy(b.data(), data.data() + written, chunk); written += chunk; copied += chunk; } - - bool eof = (written == data.size()); - co_await sink.commit(copied, eof); + + if (written == data.size()) + co_await sink.commit_eof(copied); + else + co_await sink.commit(copied); } } ---- @@ -163,27 +174,34 @@ Buffer sources/sinks enable true zero-copy I/O: === Memory-Mapped Files +A `BufferSource` is a concept, not a base class, so a type models it +simply by providing the required members: + [source,cpp] ---- -class mmap_source : public BufferSource +class mmap_source // models BufferSource { void* mapped_region_; std::size_t size_; std::size_t offset_ = 0; - + public: - io_result pull(const_buffer* arr, std::size_t max_count) + io_task> pull(std::span dest) { if (offset_ >= size_) - co_return {error_code{}, 0}; // Exhausted - - // Return pointer into mapped memory—no copy! - arr[0] = const_buffer( + co_return {error::eof, {}}; // Exhausted + + // Return pointer into mapped memory: no copy! + dest[0] = const_buffer( static_cast(mapped_region_) + offset_, size_ - offset_); - offset_ = size_; - - co_return {error_code{}, 1}; + + co_return {{}, dest.subspan(0, 1)}; + } + + void consume(std::size_t n) + { + offset_ += n; } }; ---- @@ -200,8 +218,13 @@ DMA buffers, GPU memory, network card ring buffers—all can be exposed through ---- #include +// Owning: takes ownership of the source +template +any_buffer_source(S source); + +// Reference: wraps without ownership, source must outlive the wrapper template -any_buffer_source(S& source); +any_buffer_source(S* source); ---- === any_buffer_sink @@ -210,8 +233,13 @@ any_buffer_source(S& source); ---- #include +// Owning: takes ownership of the sink template -any_buffer_sink(S& sink); +any_buffer_sink(S sink); + +// Reference: wraps without ownership, sink must outlive the wrapper +template +any_buffer_sink(S* sink); ---- == Example: Compression Pipeline @@ -223,23 +251,27 @@ any_buffer_sink(S& sink); task<> decompress_stream(any_buffer_source& compressed, any_write_sink& output) { - const_buffer bufs[8]; - + const_buffer arr[8]; + for (;;) { - auto [ec, count] = co_await compressed.pull(bufs, 8); + auto [ec, bufs] = co_await compressed.pull(arr); + if (ec == cond::eof) + break; if (ec) throw std::system_error(ec); - if (count == 0) - break; - - for (std::size_t i = 0; i < count; ++i) + + std::size_t total = 0; + for (auto const& b : bufs) { - auto decompressed = decompress_block(bufs[i]); + auto decompressed = decompress_block(b); co_await output.write(make_buffer(decompressed)); + total += b.size(); } + + compressed.consume(total); } - + co_await output.write_eof(); } ---- diff --git a/doc/modules/ROOT/pages/6.streams/6e.algorithms.adoc b/doc/modules/ROOT/pages/6.streams/6e.algorithms.adoc index 7692211fa..5bce28b23 100644 --- a/doc/modules/ROOT/pages/6.streams/6e.algorithms.adoc +++ b/doc/modules/ROOT/pages/6.streams/6e.algorithms.adoc @@ -20,15 +20,14 @@ Fills a buffer completely by looping `read_some`: #include template -task> -read(Stream& stream, Buffers const& buffers); +io_task +read(Stream& stream, Buffers buffers); ---- -Keeps reading until: +Await-returns `io_result`, destructuring as `[ec, n]`. Keeps reading until: * Buffer is full (`n == buffer_size(buffers)`) -* EOF is reached (returns `cond::eof` with partial count) -* Error occurs (returns error with partial count) +* The underlying `read_some` reports a condition before the buffer is full (the condition is propagated with the partial count) Example: @@ -45,9 +44,9 @@ Reads until EOF into a growable buffer: [source,cpp] ---- -template -task> -read(Stream& stream, Buffer&& buffer); +template +io_task +read(Stream& stream, Buffer&& buffer, std::size_t initial_amount = 2048); ---- Example: @@ -68,8 +67,8 @@ Writes all data by looping `write_some`: #include template -task> -write(Stream& stream, Buffers const& buffers); +io_task +write(Stream& stream, Buffers buffers); ---- Keeps writing until: @@ -98,16 +97,16 @@ Transfers data from a `BufferSource` to a destination: // To WriteSink (with EOF propagation) template -task> +io_task push_to(Source& source, Sink& sink); // To WriteStream (streaming, no EOF) template -task> +io_task push_to(Source& source, Stream& stream); ---- -The source provides buffers via `pull()`. Data is pushed to the destination. Buffer ownership stays with the source—no intermediate copying when possible. +The source provides buffers via `pull()`. Data is pushed to the destination. Buffer ownership stays with the source: no intermediate copying when possible. Example: @@ -128,12 +127,12 @@ Transfers data from a source to a `BufferSink`: // From ReadSource template -task> +io_task pull_from(Source& source, Sink& sink); // From ReadStream (streaming) template -task> +io_task pull_from(Stream& stream, Sink& sink); ---- @@ -156,34 +155,38 @@ Instead, compose with an intermediate stage: [source,cpp] ---- -// Transform: BufferSource → processing → BufferSink +// Transform: BufferSource -> processing -> BufferSink task<> process_pipeline(any_buffer_source& source, any_buffer_sink& sink) { - const_buffer src_bufs[8]; - + const_buffer src_arr[8]; + while (true) { - auto [ec, count] = co_await source.pull(src_bufs, 8); - if (count == 0) + auto [ec, src_bufs] = co_await source.pull(src_arr); + if (ec == cond::eof) break; - - for (std::size_t i = 0; i < count; ++i) + + std::size_t consumed = 0; + for (auto const& b : src_bufs) { - auto processed = transform(src_bufs[i]); - + auto processed = transform(b); + // Write processed data to sink - mutable_buffer dst_bufs[8]; - std::size_t dst_count = sink.prepare(dst_bufs, 8); - + mutable_buffer dst_arr[8]; + auto dst_bufs = sink.prepare(dst_arr); + std::size_t copied = buffer_copy( - std::span(dst_bufs, dst_count), + dst_bufs, make_buffer(processed)); - + co_await sink.commit(copied); + consumed += b.size(); } + + source.consume(consumed); } - - co_await sink.commit_eof(); + + co_await sink.commit_eof(0); } ---- diff --git a/doc/modules/ROOT/pages/6.streams/6f.isolation.adoc b/doc/modules/ROOT/pages/6.streams/6f.isolation.adoc index ff2218db2..52c8355a5 100644 --- a/doc/modules/ROOT/pages/6.streams/6f.isolation.adoc +++ b/doc/modules/ROOT/pages/6.streams/6f.isolation.adoc @@ -93,7 +93,7 @@ Type erasure decouples your code from specific transport implementations: task<> send_message(any_write_sink& sink, message const& msg) { co_await sink.write(make_buffer(msg.header)); - co_await sink.write(make_buffer(msg.body), true); + co_await sink.write_eof(make_buffer(msg.body)); } ---- @@ -103,26 +103,26 @@ Callers provide any conforming implementation: ---- // TCP socket tcp::socket socket; -any_write_sink sink{socket}; +any_write_sink sink{&socket}; // references socket send_message(sink, msg); // TLS stream tls::stream stream; -any_write_sink sink{stream}; +any_write_sink sink{&stream}; // references stream send_message(sink, msg); // HTTP chunked encoding chunked_sink chunked{underlying}; -any_write_sink sink{chunked}; +any_write_sink sink{&chunked}; // references chunked send_message(sink, msg); // Test mock test::write_sink mock; -any_write_sink sink{mock}; +any_write_sink sink{&mock}; // references mock send_message(sink, msg); ---- -Same `send_message` function, different transports—compile once, use everywhere. +Same `send_message` function, different transports: compile once, use everywhere. == API Design Guidelines @@ -143,8 +143,8 @@ task<> process(tcp::socket& socket); ---- void caller(tcp::socket& socket) { - any_stream stream{socket}; // Wrap here - process(stream); // Call with erased type + any_stream stream{&socket}; // Wrap by reference here + process(stream); // Call with erased type } ---- @@ -159,7 +159,8 @@ tcp::socket create_socket(); // Then caller wraps if needed auto socket = create_socket(); -any_stream stream{socket}; +any_stream stream{&socket}; // reference; socket must outlive stream +// or: any_stream stream{std::move(socket)}; // wrapper takes ownership ---- Returning type-erased values forces heap allocation. Return concrete types when the caller knows what they need. @@ -200,7 +201,7 @@ Users don't need to know how HTTP is implemented: tcp::socket socket; // ... connect ... -any_stream conn{socket}; +any_stream conn{&socket}; // references socket auto response = co_await send_request(conn, { .method = "GET", .url = "/api/data" diff --git a/doc/modules/ROOT/pages/7.testing/7e.buffer-inspection.adoc b/doc/modules/ROOT/pages/7.testing/7e.buffer-inspection.adoc index 5b8aa3481..c1b21ced4 100644 --- a/doc/modules/ROOT/pages/7.testing/7e.buffer-inspection.adoc +++ b/doc/modules/ROOT/pages/7.testing/7e.buffer-inspection.adoc @@ -49,7 +49,7 @@ void test_all_splits() while(bg) { auto [b1, b2] = co_await bg.next(); - BOOST_TEST_EQ(buffer_to_string(b1, b2), data); + BOOST_TEST_EQ(buffer_to_string(b1.data(), b2.data()), data); } }); BOOST_TEST(r.success); @@ -97,11 +97,13 @@ two positions: 0 and size. === Mutability Preservation -`bufgrind` is templated on a `ConstBufferSequence` but the split type it -produces follows the mutability of the input. Passing a `mutable_buffer` -yields `mutable_buffer` slices; passing a `const_buffer` yields -`const_buffer` slices. This matters for tests that need to write into the -produced buffers rather than only read from them. +`bufgrind` is templated on a `ConstBufferSequence` but the slices it +produces follow the mutability of the input. Each half is a buffer-slice +view (`slice_type`); its `data()` member exposes the underlying buffer +sequence. Passing a `mutable_buffer` yields halves whose `data()` models +`MutableBufferSequence`; passing a `const_buffer` yields halves whose +`data()` models `ConstBufferSequence`. This matters for tests that need +to write into the produced buffers rather than only read from them. [source,cpp] ---- @@ -112,8 +114,9 @@ bufgrind bg(mb); while(bg) { auto [b1, b2] = co_await bg.next(); - // b1 and b2 are mutable_buffer; callers may write into them - static_assert(std::is_same_v); + // b1 and b2 are buffer-slice views; data() exposes the underlying + // buffer sequence, here a MutableBufferSequence callers may write into + static_assert(MutableBufferSequence); } ---- @@ -180,7 +183,7 @@ auto r = f.inert([&](fuse&) -> task<> { while(bg) { auto [b1, b2] = co_await bg.next(); - BOOST_TEST_EQ(buffer_to_string(b1, b2), original); + BOOST_TEST_EQ(buffer_to_string(b1.data(), b2.data()), original); } }); BOOST_TEST(r.success); @@ -242,8 +245,8 @@ void test_parser_all_splits() // Feed the split as two discrete reads read_stream rs(f); - rs.provide(buffer_to_string(b1)); - rs.provide(buffer_to_string(b2)); + rs.provide(buffer_to_string(b1.data())); + rs.provide(buffer_to_string(b2.data())); std::string got = co_await read_all(rs); BOOST_TEST_EQ(got, input); diff --git a/doc/modules/ROOT/pages/8.examples/8a.hello-task.adoc b/doc/modules/ROOT/pages/8.examples/8a.hello-task.adoc index 4be42e699..24b547646 100644 --- a/doc/modules/ROOT/pages/8.examples/8a.hello-task.adoc +++ b/doc/modules/ROOT/pages/8.examples/8a.hello-task.adoc @@ -32,6 +32,7 @@ int main() { capy::thread_pool pool; capy::run_async(pool.get_executor())(say_hello()); + pool.join(); return 0; } ---- @@ -70,7 +71,7 @@ capy::thread_pool pool; `thread_pool` provides an execution context with worker threads. By default, it creates one thread per CPU core. -The pool's destructor waits for all work to complete before returning. This ensures the program doesn't exit while tasks are running. +Call `pool.join()` before the pool is destroyed to wait for all outstanding work to finish. The destructor does *not* wait for queued work: `~thread_pool` calls `stop()`, which abandons any work that has not yet started, then joins the worker threads. Without the explicit `pool.join()`, `main` could destroy the pool and exit before `say_hello` ever runs, so `"Hello from Capy!"` might never print. === Launching diff --git a/doc/modules/ROOT/pages/8.examples/8d.mock-stream-testing.adoc b/doc/modules/ROOT/pages/8.examples/8d.mock-stream-testing.adoc index 4d7fc39c0..534de6757 100644 --- a/doc/modules/ROOT/pages/8.examples/8d.mock-stream-testing.adoc +++ b/doc/modules/ROOT/pages/8.examples/8d.mock-stream-testing.adoc @@ -181,26 +181,26 @@ target_link_libraries(mock_stream_testing PRIVATE capy) [source,cpp] ---- capy::test::fuse f; // test::fuse -capy::test::stream mock(f); // test::stream -mock.provide("hello\n"); +auto [a, b] = capy::test::make_stream_pair(f); // connected test::stream pair +b.provide("hello\n"); // supply read input on one end ---- `test::stream` is a bidirectional mock that satisfies both `ReadStream` and `WriteStream`: -* Constructor takes a `fuse&` for error injection -* `provide(data)` — Supplies data for reads -* `data()` — Returns data written to the mock -* Second constructor parameter controls max bytes per operation +* Streams are obtained in connected pairs from `make_stream_pair(f)`; the shared `fuse` enables error injection +* `provide(data)` — Supplies data for the peer's reads +* `data()` — Returns data written to this end +* `set_max_read_size(n)` — Limits bytes returned per read, simulating chunked delivery === Type-Erased Streams [source,cpp] ---- -// Wrap mock in any_stream using pointer construction for reference semantics -capy::any_stream stream{&mock}; // any_stream +// Wrap one end in any_stream using pointer construction for reference semantics +capy::any_stream stream{&a}; // any_stream ---- -Use pointer construction (`&mock`) so the `any_stream` wrapper references the mock without taking ownership. This allows inspecting `mock.data()` after operations. +Use pointer construction (`&a`) so the `any_stream` wrapper references the stream end without taking ownership. This allows inspecting `b.data()` after operations. === Synchronous Testing @@ -218,7 +218,7 @@ capy::test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream ---- capy::test::fuse f; // test::fuse auto r = f.armed([&](capy::test::fuse&) -> capy::task<> { - capy::test::stream mock(f); // test::stream + auto [a, b] = capy::test::make_stream_pair(f); // connected test::stream pair // ... run test ... }); ---- diff --git a/doc/modules/ROOT/pages/9.design/9a.CapyLayering.adoc b/doc/modules/ROOT/pages/9.design/9a.CapyLayering.adoc index 4b3b2abe7..1d8700334 100644 --- a/doc/modules/ROOT/pages/9.design/9a.CapyLayering.adoc +++ b/doc/modules/ROOT/pages/9.design/9a.CapyLayering.adoc @@ -41,7 +41,7 @@ task<> echo(any_stream& stream) for(;;) { auto [ec, n] = co_await stream.read_some( - mutable_buffer(buf)); + make_buffer(buf)); if(ec) co_return; co_await write(stream, const_buffer(buf, n)); @@ -84,7 +84,7 @@ The user chooses where the boundary falls. Not the library. Buffer sink and buffer source invert the traditional ownership model. Instead of the caller allocating a buffer, filling it with data, and handing it to the library, the library exposes its own internal storage and the caller fills it in place. Zero copies. The data goes directly where it needs to be. -The `BufferSink` concept formalizes this with three operations. `prepare()` returns writable buffers from the sink's internal storage. The caller writes data into those buffers. `commit()` tells the sink how many bytes were written: +The `BufferSink` concept formalizes this with three operations. `prepare()` returns writable buffers from the sink's internal storage. The caller writes data into those buffers. `commit()` tells the sink how many bytes were written, and `commit_eof()` commits the final bytes and signals end-of-stream: [source,cpp] ---- @@ -95,6 +95,7 @@ concept BufferSink = { sink.prepare(dest) } -> std::same_as>; { sink.commit(n) } -> IoAwaitable; + { sink.commit_eof(n) } -> IoAwaitable; }; ---- diff --git a/doc/modules/ROOT/pages/9.design/9c.ReadStream.adoc b/doc/modules/ROOT/pages/9.design/9c.ReadStream.adoc index 4edfce423..77fa2b052 100644 --- a/doc/modules/ROOT/pages/9.design/9c.ReadStream.adoc +++ b/doc/modules/ROOT/pages/9.design/9c.ReadStream.adoc @@ -30,7 +30,9 @@ Attempts to read up to `buffer_size(buffers)` bytes from the stream into the buf If `buffer_size(buffers) > 0`: - If `!ec`, then `n >= 1 && n \<= buffer_size(buffers)`. `n` bytes were read into the buffer sequence. -- If `ec`, then `n >= 0 && n \<= buffer_size(buffers)`. `n` is the number of bytes read before the I/O condition arose. +- If `ec`, then `n >= 0 && n \< buffer_size(buffers)`. `n` is the number of bytes read before the I/O condition arose. + +Equivalently, `n == buffer_size(buffers)` implies `!ec`: a completion that fills the buffer sequence is a success, even when the underlying operation also signals a condition such as end-of-stream. That condition is reported on a subsequent read. If `buffer_empty(buffers)` is true, `n` is 0. The empty buffer is not itself a cause for error, but `ec` may reflect the state of the stream. @@ -104,7 +106,7 @@ task<> read_header(Stream& stream) { char header[16]; auto [ec, n] = co_await read( - stream, mutable_buffer(header)); + stream, make_buffer(header)); if(ec == cond::eof) co_return; // clean shutdown if(ec) @@ -172,7 +174,7 @@ task<> echo(Stream& stream, WriteStream auto& dest) for(;;) { auto [ec, n] = co_await stream.read_some( - mutable_buffer(buf)); + make_buffer(buf)); auto [wec, nw] = co_await dest.write_some( const_buffer(buf, n)); @@ -245,21 +247,23 @@ Because `ReadSource` refines `ReadStream`, this relay function also accepts `Rea | `WriteSink::write` |=== -== Design Foundations: Why Errors May Accompany Data +== Design Foundations: Why a Full Buffer Is Always Success -The `read_some` contract permits `n > 0` when `ec` is set. Data and errors are not mutually exclusive: the implementation reports exactly what happened. This is the most consequential design decision in the `ReadStream` concept, with implications for every consumer of `read_some` in the library. This section explains the design and its consequences. +The `read_some` contract treats a completion that fills the buffer sequence as a success: `n == buffer_size(buffers)` implies `!ec`. An error is reported only when the transfer was incomplete, in which case `n \< buffer_size(buffers)`. A pending condition, such as end-of-stream, is never delivered alongside a full buffer; it is deferred to the next read. This is the most consequential design decision in the `ReadStream` concept, with implications for every consumer of `read_some` in the library. This section explains the design and its consequences. === The Return Type's Purpose -POSIX `read(2)` returns a single `ssize_t` -- either a byte count or -1 with `errno`. It cannot report both a byte count and an error simultaneously. When a partial transfer occurs before an error, POSIX returns the byte count on the current call and defers the error to the next. The `(error_code, size_t)` return type was designed to transcend this limitation. It can carry both values at once, allowing implementations to report partial transfers alongside the condition that stopped the transfer, as a single result. +The `(error_code, size_t)` return type carries both a byte count and a condition, but the contract assigns them disjoint roles. A nonzero `ec` describes only a transfer that fell short of the requested size; it never qualifies a complete transfer. This gives every consumer a single, unambiguous test: if `n == buffer_size(buffers)` the operation succeeded and the bytes are valid, full stop. There is no need to inspect `ec` to decide whether the data can be used. + +=== Deferring Conditions to the Next Read -=== Departing from Asio +A condition such as end-of-stream is a property of the stream, not of the bytes that were just delivered. If a read happens to fill the buffer exactly as the stream reaches its end, the bytes are still good and the read still succeeded. Reporting EOF on that same completion would force every caller to reconcile "I got all my bytes" with "but there was also an error," which is precisely the ambiguity the contract removes. Instead the condition surfaces on the next read, when `n` is necessarily less than the buffer size, and the caller observes it cleanly. -Asio's `AsyncReadStream` concept requires `bytes_transferred == 0` on error. This was a reasonable design for an API built around POSIX-like streams, where the underlying system calls enforce binary outcomes per call. However, it imposes a burden on layered streams that do not share this limitation. +This is what lets generic composition algorithms such as `when_all` and `when_any` distinguish a completed transfer from a failure by inspecting `n` alone. A short read signals a condition; a full read does not. -A TLS stream might decrypt 100 bytes into user space, then receive a fatal alert on the next record. Under the strict rule it must either report `(!ec, 100)` now and `(ec, 0)` on the next call (requiring deferred-error bookkeeping), or report `(ec, 0)` and discard 100 valid bytes. Neither is clean. Under the relaxed rule, the TLS stream reports `(ec, 100)` honestly: here are the bytes that arrived, and here is the condition that stopped the transfer. +=== The Implementation Burden Is Internal -The `ReadStream` concept permits both behaviors. Streams that naturally produce `(ec, 0)` on error (such as POSIX socket wrappers) conform. Streams that report `(ec, n)` with `n > 0` (such as TLS or compression layers) also conform. The concept imposes the weakest postcondition that all conforming types can satisfy. +Deferral is forced only at the boundary, when the final bytes exactly fill the buffer. Because `(ec, buffer_size(buffers))` is not a permitted result, a stream that fills the buffer and simultaneously reaches a stopping condition reports `(!ec, buffer_size(buffers))` now and surfaces the condition on the next call. When the buffer is not filled, no deferral is required: the stream may report the condition directly alongside the partial count. The bookkeeping for the rare exact-fill case lives inside the stream, where it already has the context, rather than being pushed onto every caller. The concept imposes the postcondition that makes consumers simplest, and conforming streams arrange their internal state to honor it. === The Empty-Buffer Rule @@ -292,23 +296,17 @@ if(ec) The advance-then-check ordering is the only correct pattern. It is required for any operation that can report partial progress alongside an error -- `read` returning `(eof, 47)` being the canonical example. If the check precedes the advance, the 47 bytes are silently dropped. -Under the strict rule (`n == 0` on error), the advance is a harmless no-op. Under the relaxed rule (`n >= 0` on error), the advance captures partial progress. The caller writes identical code either way. The perceived simplification of the strict rule exists only if the caller writes the check-then-advance anti-pattern, which is already incorrect for other reasons. +Because an error can accompany partial data, the advance must run before the check so the bytes that did arrive are counted; on a clean completion the same code advances by the full amount. Writing the check first would silently drop those bytes, so advance-then-check is the only correct order. === Implementer Freedom -Under the strict rule, every stream that might encounter an error after a partial transfer must choose between: +When a stream produces some bytes and then observes a stopping condition before the buffer is full, it reports both at once: `(ec, k)` with `k \< buffer_size(buffers)`. There is no deferred state, no discarded data, and no internal replay buffer. A stream that decrypts or decompresses into the caller's buffer and then hits a terminal marker simply returns the bytes and the condition together. -- **Deferred errors.** Report `(!ec, k)` now, remember the error, and report `(ec, 0)` on the next call. This requires per-stream state and makes the stream's behavior depend on call history. -- **Data loss.** Report `(ec, 0)` and discard the `k` bytes that were transferred. -- **Internal buffering.** Copy the `k` bytes into an internal buffer and replay them on the next call. This adds allocation and copying overhead. - -Under the relaxed rule, the implementation reports what happened: `(ec, k)`. No deferred state, no data loss, no internal buffering. +The one case that requires deferral is the exact-fill boundary, where the final bytes leave no free space. Since `(ec, buffer_size(buffers))` is not permitted, the stream reports `(!ec, buffer_size(buffers))` and carries the condition to the next call. This case is rare, and its bookkeeping is local to the stream. === Consistency from Primitives Through Composed Operations -The strict postcondition on `read_some` does not propagate to composed operations. The composed `read` returns `(ec, m)` where `m > 0` on failure, because it accumulates data across multiple internal `read_some` calls. The `(ec, n > 0)` case that the strict rule eliminates from `read_some` is immediately reintroduced one layer up. - -The relaxed postcondition avoids this inconsistency. Partial progress alongside an error code is the same pattern at every level -- from `read_some` through composed `read` -- rather than being forbidden at the primitive level and required at the composed level. +`read_some` and the composed `read` report progress with the same shape: `(ec, n)`, where `n` counts the bytes transferred before the condition. The composed `read` returns `(eof, m)` with `m` short of the requested total when the stream ends early; the primitive `read_some` likewise returns `(ec, n)` with `n` short of the buffer size. Partial progress alongside an error code is the same pattern at every level. The single refinement at the primitive level, that an exactly full buffer is reported as success, keeps `n` a reliable proxy for completion at every layer. === Conforming Sources @@ -335,4 +333,4 @@ No source is forced into an unnatural pattern. Sources that naturally separate d - Algorithms that need to process data as it arrives use `read_some` directly. - `ReadSource` refines `ReadStream` by adding `read` for complete-read semantics. -The contract permits errors to accompany partial data. This uses the richer `(error_code, size_t)` return type to its full potential, avoids forcing non-POSIX streams into a deferred-error model, and produces a postcondition that is consistent from `read_some` through composed operations. The canonical advance-then-check loop handles both cases correctly with no additional call-site cost. +The contract permits errors to accompany partial data, with one rule: a completely filled buffer is always reported as success, and any condition that coincides with it is deferred to the next call. This uses the `(error_code, size_t)` return type to its full potential, keeps a stream from deferring in the common partial case, and keeps `n` a reliable proxy for completion from `read_some` through composed operations. The canonical advance-then-check loop handles every case correctly with no additional call-site cost. diff --git a/doc/modules/ROOT/pages/9.design/9d.ReadSource.adoc b/doc/modules/ROOT/pages/9.design/9d.ReadSource.adoc index 0e0a61b73..25eaacaf3 100644 --- a/doc/modules/ROOT/pages/9.design/9d.ReadSource.adoc +++ b/doc/modules/ROOT/pages/9.design/9d.ReadSource.adoc @@ -31,7 +31,9 @@ Attempts to read up to `buffer_size(buffers)` bytes from the source into the buf If `buffer_size(buffers) > 0`: - If `!ec`, then `n >= 1 && n \<= buffer_size(buffers)`. `n` bytes were read into the buffer sequence. -- If `ec`, then `n >= 0 && n \<= buffer_size(buffers)`. `n` is the number of bytes read before the I/O condition arose. +- If `ec`, then `n >= 0 && n \< buffer_size(buffers)`. `n` is the number of bytes read before the I/O condition arose. + +Equivalently, `n == buffer_size(buffers)` implies `!ec`: a completion that fills the buffer sequence is a success, even when the underlying operation also signals a condition such as end-of-stream. That condition is reported on a subsequent read. If `buffer_empty(buffers)` is true, `n` is 0. The empty buffer is not itself a cause for error, but `ec` may reflect the state of the source. @@ -269,7 +271,7 @@ task<> relay_decompressed(Source& inflater, Sink& dest) { // read_some: decompress whatever is available auto [ec, n] = co_await inflater.read_some( - mutable_buffer(buf)); + make_buffer(buf)); if(ec == cond::eof) { auto [wec] = co_await dest.write_eof(); @@ -299,7 +301,7 @@ task<> relay(Src& src, Sink& dest) for(;;) { auto [ec, n] = co_await src.read_some( - mutable_buffer(buf)); + make_buffer(buf)); auto [wec, nw] = co_await dest.write( const_buffer(buf, n)); @@ -350,16 +352,16 @@ Examples of types that satisfy `ReadSource`: - **File source**: `read_some` is a single `read()` syscall. `read` loops until the buffer is filled or EOF. - **Memory source**: `read_some` returns available bytes. `read` fills the buffer from the memory region. -== Errors May Accompany Data +== A Full Buffer Is Always Success -The `read_some` contract (inherited from `ReadStream`) permits `n > 0` when `ec` is set, including on EOF. The implementation reports exactly what happened: the bytes that arrived and the condition that stopped the transfer. See xref:9.design/9c.ReadStream.adoc#_design_foundations_why_errors_may_accompany_data[ReadStream: Why Errors May Accompany Data] for the full rationale. The key points: +The `read_some` contract (inherited from `ReadStream`) treats a completion that fills the buffer sequence as a success: `n == buffer_size(buffers)` implies `!ec`. An error is reported only when the transfer was incomplete, in which case `n \< buffer_size(buffers)`. A pending condition such as end-of-stream is deferred to the next read. See xref:9.design/9c.ReadStream.adoc#_design_foundations_why_a_full_buffer_is_always_success[ReadStream: Why a Full Buffer Is Always Success] for the full rationale. The key points: -- The `(error_code, size_t)` return type can carry both values simultaneously, transcending the POSIX limitation of reporting only one per call. -- Layered streams (TLS, compression) may encounter an error after a partial transfer. Allowing `(ec, n)` with `n > 0` avoids forcing deferred-error bookkeeping or data loss. -- The canonical advance-then-check loop handles both cases correctly with no additional call-site cost. -- Concrete types that naturally produce `(ec, 0)` on error (POSIX socket wrappers) continue to do so. +- A nonzero `ec` describes only a transfer that fell short of the requested size; it never qualifies a complete transfer. +- A condition observed exactly as the buffer fills is held for the next call, where `n` is necessarily less than the buffer size, rather than delivered alongside good bytes. +- Consumers test `n == buffer_size(buffers)` alone to know the read succeeded, and generic algorithms such as `when_all` and `when_any` distinguish a completed transfer from a failure on that basis. +- A layered stream (TLS, compression) keeps the deferred-condition bookkeeping internal, where it has the context to manage it. -This contract carries over to `ReadSource` unchanged. Both `read_some` and `read` allow `n > 0` on error or EOF, reporting the bytes that were transferred before the condition arose. +The `read` operation is a different primitive and uses the complete-read semantics described above: it either fills the entire buffer with `!ec`, or returns `ec` (including `cond::eof`) with `n` indicating the bytes transferred before the condition arose. == Summary diff --git a/doc/modules/ROOT/pages/9.design/9f.WriteStream.adoc b/doc/modules/ROOT/pages/9.design/9f.WriteStream.adoc index 42cebd6c4..4db67494e 100644 --- a/doc/modules/ROOT/pages/9.design/9f.WriteStream.adoc +++ b/doc/modules/ROOT/pages/9.design/9f.WriteStream.adoc @@ -30,7 +30,9 @@ Attempts to write up to `buffer_size(buffers)` bytes from the buffer sequence to If `buffer_size(buffers) > 0`: - If `!ec`, then `n >= 1 && n \<= buffer_size(buffers)`. `n` bytes were written from the buffer sequence. -- If `ec`, then `n >= 0 && n \<= buffer_size(buffers)`. `n` is the number of bytes written before the I/O condition arose. +- If `ec`, then `n >= 0 && n \< buffer_size(buffers)`. `n` is the number of bytes written before the I/O condition arose. + +Equivalently, `n == buffer_size(buffers)` implies `!ec`: a completion that writes the entire buffer sequence is a success, even when the underlying operation also signals a condition. That condition is reported on a subsequent write. If `buffer_empty(buffers)` is true, `n` is 0. The empty buffer is not itself a cause for error, but `ec` may reflect the state of the stream. diff --git a/doc/modules/ROOT/pages/9.design/9g.WriteSink.adoc b/doc/modules/ROOT/pages/9.design/9g.WriteSink.adoc index 3e9aae202..db8fefd1b 100644 --- a/doc/modules/ROOT/pages/9.design/9g.WriteSink.adoc +++ b/doc/modules/ROOT/pages/9.design/9g.WriteSink.adoc @@ -57,7 +57,9 @@ This is the low-level primitive inherited from `WriteStream`. It is appropriate If `buffer_size(buffers) > 0`: - If `!ec`, then `n >= 1 && n \<= buffer_size(buffers)`. `n` bytes were written from the buffer sequence. -- If `ec`, then `n >= 0 && n \<= buffer_size(buffers)`. `n` is the number of bytes written before the I/O condition arose. +- If `ec`, then `n >= 0 && n \< buffer_size(buffers)`. `n` is the number of bytes written before the I/O condition arose. + +Equivalently, `n == buffer_size(buffers)` implies `!ec`: a completion that writes the entire buffer sequence is a success, even when the underlying operation also signals a condition. That condition is reported on a subsequent write. If `buffer_empty(buffers)` is true, `n` is 0. The empty buffer is not itself a cause for error, but `ec` may reflect the state of the stream. @@ -179,7 +181,7 @@ task<> relay(Source& src, Sink& dest) for(;;) { auto [ec, n] = co_await src.read_some( - mutable_buffer(buf)); + make_buffer(buf)); // Forward whatever arrived before checking the error std::size_t written = 0; diff --git a/doc/modules/ROOT/pages/9.design/9h.BufferSink.adoc b/doc/modules/ROOT/pages/9.design/9h.BufferSink.adoc index f2389603d..5eb26abc4 100644 --- a/doc/modules/ROOT/pages/9.design/9h.BufferSink.adoc +++ b/doc/modules/ROOT/pages/9.design/9h.BufferSink.adoc @@ -281,7 +281,7 @@ task<> transfer(Source& source, Sink& sink) for(;;) { auto [ec, n] = co_await source.read_some( - mutable_buffer(buf)); + make_buffer(buf)); if(ec == cond::eof) { auto [wec] = co_await sink.write_eof(); diff --git a/doc/modules/ROOT/pages/9.design/9n.WhyNotCobaltConcepts.adoc b/doc/modules/ROOT/pages/9.design/9n.WhyNotCobaltConcepts.adoc index 7a0c8b8ff..0c9f37ac7 100644 --- a/doc/modules/ROOT/pages/9.design/9n.WhyNotCobaltConcepts.adoc +++ b/doc/modules/ROOT/pages/9.design/9n.WhyNotCobaltConcepts.adoc @@ -38,7 +38,7 @@ concept IoAwaitable = }; ---- -`IoRunnable` refines `IoAwaitable` with operations needed to start a task from non-coroutine contexts: `handle()`, `release()`, `exception()`, and `result()`. +`IoRunnable` refines `IoAwaitable` with operations needed to start a task from non-coroutine contexts: `handle()`, `release()`, `exception()`, `result()`, and the promise-level `set_continuation()` and `set_environment()`. [source,cpp] ---- @@ -46,18 +46,21 @@ template concept IoRunnable = IoAwaitable && requires { typename T::promise_type; } && - requires(T& t, T const& ct, typename T::promise_type const& cp) + requires(T& t, T const& ct, typename T::promise_type const& cp, + typename T::promise_type& p) { { ct.handle() } noexcept -> std::same_as>; { cp.exception() } noexcept -> std::same_as; { t.release() } noexcept; + { p.set_continuation(std::coroutine_handle<>{}) } noexcept; + { p.set_environment(static_cast(nullptr)) } noexcept; } && (std::is_void_v().await_resume())> || requires(typename T::promise_type& p) { p.result(); }); ---- -Context injection (`set_environment`, `set_continuation`) is handled internally by the promise through `await_suspend` and is not part of any concept. +Context injection (`set_environment`, `set_continuation`) consists of `noexcept` requirements that `IoRunnable` places on `T::promise_type`; launch functions invoke them through the typed handle returned by `handle()` before resuming the frame. Each concept has documented syntactic requirements, semantic requirements, conforming signatures, and examples. A user who wants to create a custom task type can read the concept definition and know exactly what to provide. The compiler enforces the syntactic requirements at constraint-check time. @@ -305,10 +308,17 @@ Capy's `WriteStream` concept includes semantic requirements in the concept's doc // // If !ec, then n >= 1 && n <= buffer_size( buffers ). // n bytes were written from the buffer sequence. -// If ec, then n >= 0 && n <= buffer_size( buffers ). +// If ec, then n >= 0 && n < buffer_size( buffers ). // n is the number of bytes written before the I/O // condition arose. // +// Equivalently, n == buffer_size( buffers ) implies !ec: a +// completion that writes the entire buffer sequence is a success, +// even when the underlying operation also signals a condition. That +// condition is reported on a subsequent write. This lets generic +// composition algorithms such as when_all and when_any distinguish +// a completed transfer from a failure. +// // If buffer_empty( buffers ) is true, n is 0. The empty // buffer is not itself a cause for error, but ec may reflect // the state of the stream. @@ -341,7 +351,7 @@ The concept also includes a coroutine-specific warning about buffer lifetime: | | Error reporting semantics -| Documented (`ec` + `n >= 0 && n \<= buffer_size`) +| Documented (`ec` + `n >= 0 && n \< buffer_size`) | | Partial write guarantees diff --git a/doc/modules/ROOT/pages/9.design/9o.WhyNotTMC.adoc b/doc/modules/ROOT/pages/9.design/9o.WhyNotTMC.adoc index 4b006ac39..8bb472d2f 100644 --- a/doc/modules/ROOT/pages/9.design/9o.WhyNotTMC.adoc +++ b/doc/modules/ROOT/pages/9.design/9o.WhyNotTMC.adoc @@ -187,7 +187,7 @@ The awaitable receives: * `env` — The execution environment containing: ** `env->executor` — The executor (where to resume) ** `env->stop_token` — A stop token (for cancellation) -** `env->allocator` — An optional allocator for frame allocation +** `env->frame_allocator` — An optional `std::pmr::memory_resource*` for coroutine frame allocation (null selects the default allocator) *TMC's approach:* diff --git a/doc/modules/ROOT/pages/index.adoc b/doc/modules/ROOT/pages/index.adoc index a0153eef3..a2559acb5 100644 --- a/doc/modules/ROOT/pages/index.adoc +++ b/doc/modules/ROOT/pages/index.adoc @@ -6,7 +6,7 @@ Capy abstracts away sockets, files, and asynchrony with type-erased streams and * *Lazy coroutine tasks* — `task` with forward-propagating stop tokens and automatic cancellation * *Buffer sequences* — taken straight from Asio and improved -* *Stream concepts* — `ReadStream`, `WriteStream`, `ReadSource`, `WriteSink`, `BufferSource`, `BufferSink` +* *Stream concepts* — seven coroutine stream concepts: `ReadStream`, `WriteStream`, `Stream`, `ReadSource`, `WriteSink`, `BufferSource`, `BufferSink` * *Type-erased streams* — `any_stream`, `any_read_stream`, `any_write_stream` for fast compilation * *Concurrency facilities* — executors, strands, thread pools, `when_all`, `when_any` * *Test utilities* — mock streams, mock sources/sinks, error injection @@ -86,7 +86,7 @@ task<> echo(any_stream& stream) char buf[1024]; for(;;) { - auto [ec, n] = co_await stream.read_some(mutable_buffer(buf)); + auto [ec, n] = co_await stream.read_some(make_buffer(buf)); auto [wec, wn] = co_await write(stream, const_buffer(buf, n)); @@ -100,10 +100,13 @@ task<> echo(any_stream& stream) int main() { - thread_pool pool; - // In a real application, you would obtain a stream from Corosio - // and call: run_async(pool.get_executor())(echo(stream)); - return 0; + // In a real application, you would obtain a stream from Corosio, + // then launch the coroutine on its io_context and run it: + // + // corosio::io_context ioc; + // corosio::tcp_socket stream = /* from an acceptor or connect */; + // run_async(ioc.get_executor())(echo(stream)); + // ioc.run(); } ---- @@ -118,4 +121,4 @@ The `task<>` return type (equivalent to `task`) creates a lazy coroutine t * xref:3.concurrency/3a.foundations.adoc[Concurrency Tutorial] — Understand threads, mutexes, and synchronization * xref:4.coroutines/4a.tasks.adoc[Coroutines in Capy] — Deep dive into `task` and the IoAwaitable protocol * xref:5.buffers/5a.overview.adoc[Buffer Sequences] — Master the concept-driven buffer model -* xref:6.streams/6a.overview.adoc[Stream Concepts] — Understand the six stream concepts +* xref:6.streams/6a.overview.adoc[Stream Concepts] — Understand the seven stream concepts diff --git a/doc/modules/ROOT/pages/quick-start.adoc b/doc/modules/ROOT/pages/quick-start.adoc index d04ef68f5..370f46698 100644 --- a/doc/modules/ROOT/pages/quick-start.adoc +++ b/doc/modules/ROOT/pages/quick-start.adoc @@ -46,7 +46,9 @@ int main() // Launch the coroutine on the pool's executor capy::run_async(pool.get_executor())(greet()); - // Pool destructor waits for all work to complete + // join() waits for outstanding work to complete; the pool + // destructor only stops the pool and discards pending work + pool.join(); } ---- @@ -55,7 +57,7 @@ int main() [source,bash] ---- # With GCC -g++ -std=c++20 -o hello_coro hello_coro.cpp -lboost_system -pthread +g++ -std=c++20 -o hello_coro hello_coro.cpp -lcapy -pthread # Run ./hello_coro @@ -75,7 +77,7 @@ The answer is 42 4. `greet()` runs until it hits `co_await answer()` 5. `answer()` runs and returns 42 6. `greet()` resumes with the result and prints it -7. `greet()` completes, the pool destructor waits and returns +7. `greet()` completes, `pool.join()` returns, and `main()` exits The key insight: both coroutines ran on the same executor because affinity propagated automatically through the `co_await`. diff --git a/include/boost/capy/ex/strand.hpp b/include/boost/capy/ex/strand.hpp index f607610d3..b761622c9 100644 --- a/include/boost/capy/ex/strand.hpp +++ b/include/boost/capy/ex/strand.hpp @@ -57,7 +57,7 @@ namespace capy { @par Example @code thread_pool pool(4); - auto strand = make_strand(pool.get_executor()); + strand strand(pool.get_executor()); // CTAD deduces the executor type // Continuations are linked intrusively into the strand's queue, // so each one must outlive its time there. Storage is typically @@ -71,7 +71,7 @@ namespace capy { @tparam E The type of the underlying executor. Must satisfy the `Executor` concept. - @see make_strand, Executor + @see Executor */ template class strand diff --git a/include/boost/capy/ex/thread_pool.hpp b/include/boost/capy/ex/thread_pool.hpp index 29097ec60..f6e1b9625 100644 --- a/include/boost/capy/ex/thread_pool.hpp +++ b/include/boost/capy/ex/thread_pool.hpp @@ -37,7 +37,8 @@ namespace capy { thread_pool pool(4); // 4 worker threads auto ex = pool.get_executor(); ex.post(some_coroutine); - // pool destructor waits for all work to complete + pool.join(); // wait for outstanding work to complete + // pool destructor stops the pool, discarding any pending work @endcode */ class BOOST_CAPY_DECL diff --git a/include/boost/capy/io_result.hpp b/include/boost/capy/io_result.hpp index ac2f13434..8a058ae22 100644 --- a/include/boost/capy/io_result.hpp +++ b/include/boost/capy/io_result.hpp @@ -33,8 +33,11 @@ namespace capy { if (ec) { ... } @endcode - @note Payload members are only meaningful when - `ec` does not indicate an error. + @note Whether the payload is meaningful when `ec` is set is + defined by the operation that produced the result. Many I/O + operations report a meaningful partial result alongside `ec` + (for example, the number of bytes transferred before the + condition, as with EOF); others leave it unspecified. @tparam Ts Ordered payload types following the leading `std::error_code`. @@ -45,7 +48,8 @@ struct [[nodiscard]] io_result /// The error code from the operation. std::error_code ec; - /// The payload values. Unspecified when `ec` is set. + /// The payload values. Their meaning when `ec` is set is defined + /// by the producing operation (see the class note). std::tuple values; /// Construct a default io_result. diff --git a/include/boost/capy/read.hpp b/include/boost/capy/read.hpp index 8817fa1e8..054771216 100644 --- a/include/boost/capy/read.hpp +++ b/include/boost/capy/read.hpp @@ -126,9 +126,10 @@ read(S& stream, MB buffers) -> The last, potenitally partial, read is also appended. - The value passed in the first call to `dynbuf.prepare` is the smallest of - `initial_amount` and `dynbuf.max_size() - dynbuf.size()`. Value passed - to each subsequent call is 1.5 the value passed in the preceding call. + The value passed in the first call to `dynbuf.prepare` is `initial_amount`. + The value is grown to 1.5 times the preceding value only after a read that + completely filled the prepared buffer; otherwise it is left unchanged for + the next call. @par Await-returns @@ -220,9 +221,10 @@ read( The last, potenitally partial, read is also appended. - The value passed in the first call to `dynbuf.prepare` is the smallest of - `initial_amount` and `dynbuf.max_size() - dynbuf.size()`. Value passed - to each subsequent call is 1.5 the value passed in the preceding call. + The value passed in the first call to `dynbuf.prepare` is `initial_amount`. + The value is grown to 1.5 times the preceding value only after a read that + completely filled the prepared buffer; otherwise it is left unchanged for + the next call. @par Await-returns diff --git a/include/boost/capy/test/buffer_sink.hpp b/include/boost/capy/test/buffer_sink.hpp index 516c95977..5c6991f02 100644 --- a/include/boost/capy/test/buffer_sink.hpp +++ b/include/boost/capy/test/buffer_sink.hpp @@ -47,18 +47,18 @@ namespace test { auto r = f.armed( [&]( fuse& ) -> task { mutable_buffer arr[16]; - std::size_t count = bs.prepare( arr, 16 ); - if( count == 0 ) + auto bufs = bs.prepare( arr ); + if( bufs.empty() ) co_return; - // Write data into arr[0] - std::memcpy( arr[0].data(), "Hello", 5 ); + // Write data into the first prepared buffer + std::memcpy( bufs[0].data(), "Hello", 5 ); auto [ec] = co_await bs.commit( 5 ); if( ec ) co_return; - auto [ec2] = co_await bs.commit_eof(); + auto [ec2] = co_await bs.commit_eof( 0 ); // bs.data() returns "Hello" } ); @endcode diff --git a/include/boost/capy/test/buffer_to_string.hpp b/include/boost/capy/test/buffer_to_string.hpp index 7dd350973..9d66d7327 100644 --- a/include/boost/capy/test/buffer_to_string.hpp +++ b/include/boost/capy/test/buffer_to_string.hpp @@ -41,7 +41,7 @@ namespace test { bufgrind bg( cb ); while( bg ) { auto [b1, b2] = co_await bg.next(); - BOOST_TEST_EQ( buffer_to_string( b1, b2 ), "hello" ); + BOOST_TEST_EQ( buffer_to_string( b1.data(), b2.data() ), "hello" ); } @endcode