the logical flow.**
# Promise and Future: Bridging Callbacks and Coroutines
This is exactly where [asyncio](https://github.com/Hackerl/asyncio) shines.
`asyncio` is an async framework built on C++20 coroutines and the `libuv` event loop. Its core design idea is simple: use `Promise` and `Future` as a bridge to connect the callback world and the coroutine world.
A `Promise` is an object that can be "resolved" or "rejected". A `Future` is its other face: you can `co_await` a `Future`, and the coroutine resumes automatically once the `Promise` is resolved or rejected.
Take the `sleep` function as a simplified example of how `asyncio` does it:
asyncio::task::Task<void> asyncio::sleep(std::chrono::milliseconds ms) {
Promise<void> promise;
uv_timer_start(timer, [](uv_timer_t *handle) {
// Timer fires, resolve the promise, coroutine resumes
static_cast<Promise<void> *>(handle->data)->resolve();
}, ms.count(), 0);
co_return co_await promise.getFuture();
}
Apply the same treatment to gRPC callbacks: `resolve` or `reject` a `Promise` inside the callback, and `co_await` the corresponding `Future` on the coroutine side. This lets you write what would otherwise require nested callbacks as straight-line code.
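To make the bridge concrete without pulling in `asyncio` or gRPC, here is a minimal sketch of the callback half of the pattern. `MiniPromise`, `legacyDivide`, and `divide` are hypothetical names invented for this illustration and are not part of `asyncio`; the stored continuation stands in for "resume the awaiting coroutine".

```cpp
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <variant>

// Hypothetical stand-in for a Promise<T, E>; not the asyncio API.
// The continuation plays the role of "resume the awaiting coroutine".
template<typename T, typename E>
class MiniPromise {
public:
    using Result = std::variant<T, E>; // index 0: value, index 1: error
    using Continuation = std::function<void(const Result &)>;

    // Awaiting side: register what should run once a result exists.
    void then(Continuation continuation) {
        if (mResult) {
            continuation(*mResult); // already settled, run immediately
            return;
        }
        mContinuation = std::move(continuation);
    }

    // Callback side: settle with a value or an error; only the first call wins.
    void resolve(T value) { settle(Result{std::in_place_index<0>, std::move(value)}); }
    void reject(E error) { settle(Result{std::in_place_index<1>, std::move(error)}); }

private:
    void settle(Result result) {
        if (mResult)
            return;
        mResult.emplace(std::move(result));
        if (auto continuation = std::exchange(mContinuation, nullptr))
            continuation(*mResult);
    }

    std::optional<Result> mResult;
    Continuation mContinuation;
};

// A callback-style API of the kind a C library or gRPC exposes (hypothetical).
inline void legacyDivide(int a, int b, const std::function<void(bool ok, int value)> &done) {
    if (b == 0)
        done(false, 0);
    else
        done(true, a / b);
}

// The bridge: settle the promise inside the callback.
inline void divide(int a, int b, MiniPromise<int, std::string> &promise) {
    legacyDivide(a, b, [&promise](bool ok, int value) {
        if (!ok) {
            promise.reject("division by zero");
            return;
        }
        promise.resolve(value);
    });
}
```

A real coroutine framework replaces `then` with an awaiter whose `await_suspend` stores the coroutine handle, but the settle-once logic on the callback side has the same shape.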
# Cancellation Support
Coroutine cancellation is also implemented through the `Promise` mechanism. `asyncio` provides a `Cancellable` wrapper that takes a `Future` and a cancellation function:
co_return co_await asyncio::task::Cancellable{
promise.getFuture(),
[&]() -> std::expected<void, std::error_code> {
// Perform the actual cancellation here
context->TryCancel();
return {};
}
};
When external code calls `task.cancel()`, `asyncio` walks the task chain to find the `Cancellable` currently being awaited and executes its cancellation function. For gRPC, the cancellation function simply calls `context->TryCancel()`. gRPC then handles the cleanup and triggers the completion callback (`OnDone` for reactors), the `Promise` is rejected, and the coroutine ends with a cancellation error.
With this mechanism, we can cleanly add cancellation support to any gRPC async operation without introducing any special state variables into business code.
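The control flow can be sketched with a toy model. `FakeCall`, `Pending`, and `bridge` are hypothetical names for this illustration only. Unlike real gRPC, the fake call reports completion synchronously inside `tryCancel()`, which keeps the example deterministic.

```cpp
#include <functional>
#include <optional>
#include <string>
#include <utility>

// Toy model of an in-flight RPC (hypothetical; stands in for the client
// context plus whatever eventually reports completion).
class FakeCall {
public:
    using Done = std::function<void(bool ok, const std::string &message)>;

    void start(Done done) { mDone = std::move(done); }

    // Like TryCancel(): cancellation is still reported through the normal
    // completion callback, just with a failure status.
    void tryCancel() { finish(false, "cancelled"); }

    // Normal completion path.
    void complete() { finish(true, ""); }

private:
    void finish(bool ok, const std::string &message) {
        if (auto done = std::exchange(mDone, nullptr)) // report at most once
            done(ok, message);
    }

    Done mDone;
};

// Hypothetical counterpart of a cancellable awaitable: the pending result
// plus the function that knows how to cancel the underlying operation.
struct Pending {
    bool finished{false};
    std::optional<std::string> error;
    std::function<void()> cancel; // what task.cancel() would end up invoking
};

inline void bridge(FakeCall &call, Pending &pending) {
    call.start([&pending](bool ok, const std::string &message) {
        pending.finished = true;
        if (!ok)
            pending.error = message;
    });
    pending.cancel = [&call] { call.tryCancel(); };
}
```

The cancellation function never touches the pending result directly. It only asks the operation to stop, and the ordinary completion path delivers the error, which is why no extra state flags are needed.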
# Defining the Sample Service
Before writing code, let's define a sample service covering all four RPC types:
syntax = "proto3";
package sample;
service SampleService {
// Unary RPC: simplest request-response pattern
rpc Echo(EchoRequest) returns (EchoResponse);
// Server streaming: server continuously pushes data to the client
rpc GetNumbers(GetNumbersRequest) returns (stream Number);
// Client streaming: client continuously sends data, server returns one result
rpc Sum(stream Number) returns (SumResponse);
// Bidirectional streaming: both sides can send and receive continuously
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
message EchoRequest { string message = 1; }
message EchoResponse { string message = 1; int64 timestamp = 2; }
message GetNumbersRequest { int32 value = 1; int32 count = 2; }
message Number { int32 value = 1; }
message SumResponse { int32 total = 1; int32 count = 2; }
message ChatMessage { string user = 1; string content = 2; int64 timestamp = 3; }
Each RPC pattern has its use: `Echo` is the canonical RPC, `GetNumbers` lets the server stream a batch of data, `Sum` lets the client stream data and get an aggregated result, and `Chat` is the most complex — a bidirectional stream where either side can send at any time.
Let's implement them one by one, starting with the simplest: client Unary RPC.
# Client Unary RPC
Unary RPC is the most straightforward: send one request, receive one response. The corresponding gRPC Reactor method signature is:
void Stub::async::Echo(
grpc::ClientContext *,
const EchoRequest *,
EchoResponse *,
std::function<void(grpc::Status)>
);
gRPC calls the callback we pass in when the RPC completes. Bridging it with `Promise` is natural:
asyncio::task::Task<sample::EchoResponse> echo(
const sample::EchoRequest &request,
std::shared_ptr<grpc::ClientContext> context
) {
sample::EchoResponse response;
asyncio::Promise<void, std::string> promise;
stub->async()->Echo(
context.get(), &request, &response,
[&](const grpc::Status &status) {
if (!status.ok()) {
promise.reject(status.error_message());
return;
}
promise.resolve();
}
);
if (const auto result = co_await asyncio::task::Cancellable{
promise.getFuture(),
[&]() -> std::expected<void, std::error_code> {
context->TryCancel();
return {};
}
}; !result)
throw co_await asyncio::error::StacktraceError<std::runtime_error>::make(result.error());
co_return response;
}
The core logic is just a few lines: construct a `Promise`, decide `resolve` or `reject` based on `Status` inside the callback, then `co_await` the `Promise`'s `Future`.
Cancellation support is woven in naturally: wrap the `Future` with `Cancellable` and call `context->TryCancel()` on cancellation. From the caller's perspective, using this function reads like an ordinary function call with a `co_await` in front, yet it never blocks the event loop.
# Client Streaming RPCs
Streaming RPCs are more complex than Unary, because data is transmitted one piece at a time and each read or write is an independent async operation.
# Server Streaming (Reader)
For a server-streaming RPC, the client needs to derive from `grpc::ClientReadReactor<T>`. After each call to `StartRead`, gRPC invokes `OnReadDone` once the read has completed; `ok == false` means no further message will arrive.
template<typename T>
class Reader final : public grpc::ClientReadReactor<T> {
public:
explicit Reader(std::shared_ptr<grpc::ClientContext> context) : mContext{std::move(context)} {
}
void OnDone(const grpc::Status &status) override {
if (!status.ok()) {
mDonePromise.reject(status.error_message());
return;
}
mDonePromise.resolve();
}
void OnReadDone(const bool ok) override {
// Resolve the per-read Promise when each read completes
std::exchange(mReadPromise, std::nullopt)->resolve(ok);
}
asyncio::task::Task<std::optional<T>> read() {
T element;
asyncio::Promise<bool> promise;
auto future = promise.getFuture();
mReadPromise.emplace(std::move(promise));
grpc::ClientReadReactor<T>::StartRead(&element);
// ok == false means the stream has ended
if (!co_await asyncio::task::Cancellable{
std::move(future),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
})
co_return std::nullopt;
co_return element;
}
asyncio::task::Task<void> done() {
if (const auto result = co_await mDonePromise.getFuture(); !result)
throw co_await asyncio::error::StacktraceError<std::runtime_error>::make(result.error());
}
private:
std::shared_ptr<grpc::ClientContext> mContext;
asyncio::Promise<void, std::string> mDonePromise;
std::optional<asyncio::Promise<bool>> mReadPromise;
};
A few details worth noting:
* `mReadPromise` is wrapped in `std::optional` because each call to `read()` constructs a new `Promise`.
* `std::exchange` is used in `OnReadDone` to take ownership of the `Promise`, signaling that a single read has completed.
* `done()` waits for the entire stream to finish; it holds a separate `mDonePromise` distinct from the per-read promise.
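The `std::optional` plus `std::exchange` combination is a general "one-shot slot" idiom. The sketch below isolates it with plain `std::function` handlers; `OneShotSlot` is a hypothetical name for this illustration. Unlike `OnReadDone` above, it also checks that something was pending before invoking it.

```cpp
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for the reactor's per-operation state: at most one
// operation is pending, and its completion handler lives in an optional slot.
class OneShotSlot {
public:
    using Handler = std::function<void(bool ok)>;

    // read() side: arm the slot with a fresh handler for this operation.
    void arm(Handler handler) { mPending.emplace(std::move(handler)); }

    // OnReadDone() side: move the handler out and leave the slot empty
    // before invoking it, so the handler may safely arm the slot again.
    bool fire(bool ok) {
        auto handler = std::exchange(mPending, std::nullopt);
        if (!handler)
            return false; // nothing was pending
        (*handler)(ok);
        return true;
    }

    bool armed() const { return mPending.has_value(); }

private:
    std::optional<Handler> mPending;
};
```

Emptying the slot before running the handler matters because resuming the waiter may immediately start the next read, which arms the slot again.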
Using `Reader` to consume a server-streaming RPC:
asyncio::task::Task<void> getNumbers(
const sample::GetNumbersRequest &request,
std::shared_ptr<grpc::ClientContext> context
) {
Reader<sample::Number> reader{context};
stub->async()->GetNumbers(context.get(), &request, &reader);
reader.AddHold();
reader.StartCall();
while (true) {
auto number = co_await reader.read();
if (!number)
break; // stream ended
fmt::print("Received: {}\n", number->value());
}
reader.RemoveHold();
co_await reader.done(); // wait for the stream to fully finish and check for errors
}
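> `AddHold` and `RemoveHold` are part of the gRPC reactor lifecycle. A hold tells gRPC that operations may still be started from outside the reactor's own callbacks, so the RPC is not allowed to finish (and `OnDone` is not invoked) until the hold is removed.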
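One way to picture a hold is as a counter that delays the completion notification. The following toy model only mirrors the observable contract (no `OnDone` while a hold is outstanding). `HoldTracker` and its methods are hypothetical and say nothing about how gRPC implements holds internally.

```cpp
#include <functional>
#include <utility>

// Toy model only: completion is reported once the call has finished
// and no holds remain.
class HoldTracker {
public:
    explicit HoldTracker(std::function<void()> onDone) : mOnDone{std::move(onDone)} {}

    void addHold() { ++mHolds; }

    void removeHold() {
        --mHolds;
        maybeFinish();
    }

    // Called when the transport has nothing left to do for this call.
    void finishCall() {
        mFinished = true;
        maybeFinish();
    }

private:
    void maybeFinish() {
        if (mFinished && mHolds == 0 && mOnDone)
            std::exchange(mOnDone, nullptr)(); // notify exactly once
    }

    int mHolds{0};
    bool mFinished{false};
    std::function<void()> mOnDone;
};
```

In the coroutine wrappers, reads and writes are started from the coroutine rather than from reactor callbacks, which is the situation holds are meant for.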
# Client Streaming (Writer)
Client-streaming RPC is similar to server-streaming but in the opposite direction. Inherit `grpc::ClientWriteReactor<T>`; after each `StartWrite` completes, `OnWriteDone` is called back:
template<typename T>
class Writer final : public grpc::ClientWriteReactor<T> {
public:
explicit Writer(std::shared_ptr<grpc::ClientContext> context) : mContext{std::move(context)} {
}
void OnWriteDone(const bool ok) override {
std::exchange(mWritePromise, std::nullopt)->resolve(ok);
}
void OnWritesDoneDone(const bool ok) override {
std::exchange(mWriteDonePromise, std::nullopt)->resolve(ok);
}
asyncio::task::Task<bool> write(const T element) {
asyncio::Promise<bool> promise;
auto future = promise.getFuture();
mWritePromise.emplace(std::move(promise));
grpc::ClientWriteReactor<T>::StartWrite(&element);
co_return co_await asyncio::task::Cancellable{
std::move(future),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
};
}
asyncio::task::Task<bool> writeDone() {
asyncio::Promise<bool> promise;
auto future = promise.getFuture();
mWriteDonePromise.emplace(std::move(promise));
grpc::ClientWriteReactor<T>::StartWritesDone();
co_return co_await asyncio::task::Cancellable{
std::move(future),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
};
}
// OnDone and done() are similar to Reader, omitted for brevity
private:
std::shared_ptr<grpc::ClientContext> mContext;
asyncio::Promise<void, std::string> mDonePromise;
std::optional<asyncio::Promise<bool>> mWritePromise;
std::optional<asyncio::Promise<bool>> mWriteDonePromise;
};
`writeDone()` corresponds to gRPC's `StartWritesDone`, which signals to the server that the client has finished writing — equivalent to sending EOF on the stream.
# Client Bidirectional Streaming
Bidirectional streaming is the most complex of the four patterns: the client simultaneously has both read and write capabilities. Fortunately, all that is needed is to merge the `Reader` and `Writer` logic into a single `Stream` class:
template<typename RequestElement, typename ResponseElement>
class Stream final : public grpc::ClientBidiReactor<RequestElement, ResponseElement> {
public:
// The constructor, OnReadDone, OnWriteDone, OnWritesDoneDone, and OnDone are the same as before
asyncio::task::Task<std::optional<ResponseElement>> read() {
ResponseElement element;
asyncio::Promise<bool> promise;
auto future = promise.getFuture();
mReadPromise.emplace(std::move(promise));
grpc::ClientBidiReactor<RequestElement, ResponseElement>::StartRead(&element);
if (!co_await asyncio::task::Cancellable{
std::move(future),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
})
co_return std::nullopt;
co_return element;
}
asyncio::task::Task<bool> write(const RequestElement element) { /* same as Writer */ }
asyncio::task::Task<bool> writeDone() { /* same as Writer */ }
asyncio::task::Task<void> done() { /* same as Reader */ }
private:
std::shared_ptr<grpc::ClientContext> mContext;
asyncio::Promise<void, std::string> mDonePromise;
std::optional<asyncio::Promise<bool>> mReadPromise;
std::optional<asyncio::Promise<bool>> mWritePromise;
std::optional<asyncio::Promise<bool>> mWriteDonePromise;
};
When using a bidirectional stream, reading and writing are two independent tasks that run concurrently:
asyncio::task::Task<void> chat(std::shared_ptr<grpc::ClientContext> context) {
Stream<sample::ChatMessage, sample::ChatMessage> stream{context};
stub->async()->Chat(context.get(), &stream);
stream.AddHold();
stream.StartCall();
co_await all(
// Read task
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
while (const auto msg = co_await stream.read()) {
fmt::print("Received: {}\n", msg->content());
}
}),
// Write task
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
sample::ChatMessage msg;
msg.set_content("Hello!");
co_await stream.write(msg);
co_await stream.writeDone();
})
);
stream.RemoveHold();
co_await stream.done();
}
`all()` waits for both the read and write subtasks simultaneously. If either fails, it cancels the other and returns the error. This is the task-tree mechanism of asyncio in action — structured concurrency.
# Wrapping GenericClient
The three streaming wrappers above all follow the same pattern, so it is time to unify them with templates. `GenericClient` provides one `call` overload for each of the four RPC types:
template<typename T>
class GenericClient {
using Stub = T::Stub;
using AsyncStub = class Stub::async;
public:
explicit GenericClient(std::unique_ptr<Stub> stub) : mStub{std::move(stub)} {
}
protected:
// 1. Unary RPC
template<typename Request, typename Response>
asyncio::task::Task<Response>
call(
void (AsyncStub::*method)(grpc::ClientContext *, const Request *, Response *,
std::function<void(grpc::Status)>),
std::shared_ptr<grpc::ClientContext> context,
Request request
) {
Response response;
asyncio::Promise<void, std::string> promise;
std::invoke(
method,
mStub->async(),
context.get(),
&request,
&response,
[&](const grpc::Status &status) {
if (!status.ok()) {
promise.reject(status.error_message());
return;
}
promise.resolve();
}
);
if (const auto result = co_await asyncio::task::Cancellable{
promise.getFuture(),
[&]() -> std::expected<void, std::error_code> {
context->TryCancel();
return {};
}
}; !result)
throw co_await asyncio::error::StacktraceError<std::runtime_error>::make(result.error());
co_return response;
}
// 2. Server streaming: push data into a channel via Sender
template<typename Request, typename Element>
asyncio::task::Task<void>
call(
void (AsyncStub::*method)(grpc::ClientContext *, const Request *,
grpc::ClientReadReactor<Element> *),
std::shared_ptr<grpc::ClientContext> context,
Request request,
asyncio::Sender<Element> sender
) {
Reader<Element> reader{context};
std::invoke(method, mStub->async(), context.get(), &request, &reader);
reader.AddHold();
reader.StartCall();
const auto result = co_await asyncio::error::capture(
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
while (true) {
auto element = co_await reader.read();
if (!element)
break;
co_await asyncio::error::guard(sender.send(*std::move(element)));
}
})
);
reader.RemoveHold();
co_await reader.done();
if (!result)
std::rethrow_exception(result.error());
}
// 3. Client streaming: read data from Receiver and write into the stream
template<typename Response, typename Element>
asyncio::task::Task<Response>
call(
void (AsyncStub::*method)(grpc::ClientContext *, Response *,
grpc::ClientWriteReactor<Element> *),
std::shared_ptr<grpc::ClientContext> context,
asyncio::Receiver<Element> receiver
) {
Response response;
Writer<Element> writer{context};
std::invoke(method, mStub->async(), context.get(), &response, &writer);
writer.AddHold();
writer.StartCall();
const auto result = co_await asyncio::error::capture(
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
while (true) {
auto element = co_await receiver.receive();
if (!element) {
if (!co_await writer.writeDone())
fmt::print(stderr, "Write done failed\n");
if (element.error() != asyncio::ReceiveError::Disconnected)
throw co_await asyncio::error::StacktraceError<std::system_error>::make(element.error());
break;
}
co_await writer.write(*std::move(element));
}
})
);
writer.RemoveHold();
co_await writer.done();
if (!result)
std::rethrow_exception(result.error());
co_return response;
}
// 4. Bidirectional streaming: hold both a Receiver (input) and a Sender (output)
template<typename RequestElement, typename ResponseElement>
asyncio::task::Task<void>
call(
void (AsyncStub::*method)(grpc::ClientContext *,
grpc::ClientBidiReactor<RequestElement, ResponseElement> *),
std::shared_ptr<grpc::ClientContext> context,
asyncio::Receiver<RequestElement> receiver,
asyncio::Sender<ResponseElement> sender
) {
Stream<RequestElement, ResponseElement> stream{context};
std::invoke(method, mStub->async(), context.get(), &stream);
stream.AddHold();
stream.StartCall();
const auto result = co_await asyncio::error::capture(
all(
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
while (true) {
auto element = co_await stream.read();
if (!element)
break;
if (const auto res = co_await sender.send(*std::move(element)); !res) {
context->TryCancel();
throw co_await asyncio::error::StacktraceError<std::system_error>::make(res.error());
}
}
}),
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
while (true) {
auto element = co_await receiver.receive();
if (!element) {
if (!co_await stream.writeDone())
fmt::print(stderr, "Write done failed\n");
if (element.error() != asyncio::ReceiveError::Disconnected)
throw co_await asyncio::error::StacktraceError<std::system_error>::make(element.error());
break;
}
co_await stream.write(*std::move(element));
}
})
)
);
stream.RemoveHold();
co_await stream.done();
if (!result)
std::rethrow_exception(result.error());
}
private:
std::unique_ptr<Stub> mStub;
};
The four overloads are distinguished by their parameter types: the compiler selects the right one from the type of the method pointer passed in. This is ordinary overload resolution combined with template argument deduction, so each call pattern maps to its own function signature and the choice is made entirely at compile time, with no runtime dispatch.
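To make the overload selection concrete, here is a minimal sketch that leaves out gRPC and coroutines entirely. `ToyStub`, `Request`, `Response`, and the three reactor templates are hypothetical stand-ins for the generated types (the real methods also take a `grpc::ClientContext *`), and each `call` overload just returns the name of the pattern it matched:

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins for generated gRPC types; only their shapes matter here.
struct Request {};
struct Response {};
template<typename T> struct ReadReactor {};
template<typename T> struct WriteReactor {};
template<typename In, typename Out> struct BidiReactor {};

// A toy stub whose four methods loosely mirror the four RPC signatures.
struct ToyStub {
    void Unary(const Request *, Response *) {}
    void ServerStream(const Request *, ReadReactor<Response> *) {}
    void ClientStream(Response *, WriteReactor<Request> *) {}
    void Bidi(BidiReactor<Request, Response> *) {}
};

// One overload per call pattern. Each reports which pattern was selected.
inline std::string call(void (ToyStub::*)(const Request *, Response *)) {
    return "unary";
}

template<typename Element>
std::string call(void (ToyStub::*)(const Request *, ReadReactor<Element> *)) {
    return "server streaming";
}

template<typename Element>
std::string call(void (ToyStub::*)(Response *, WriteReactor<Element> *)) {
    return "client streaming";
}

template<typename In, typename Out>
std::string call(void (ToyStub::*)(BidiReactor<In, Out> *)) {
    return "bidirectional streaming";
}
```

Passing `&ToyStub::ServerStream` selects the second overload because template argument deduction fails for the other templates and the non-template overload has a different pointer type. No tag argument or runtime branch is involved.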
For streaming RPCs, `GenericClient` uses `asyncio::channel` as the data conduit: `Sender` writes data into the channel, `Receiver` reads from it. The channel's close signal (`Receiver` receiving a `Disconnected` error) naturally maps to stream EOF.
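The mapping from channel close to stream EOF can be sketched with a toy, synchronous channel. Everything here (`ToyChannelState`, `ToySender`, `ToyReceiver`, `ToyResult`, `makeToyChannel`, `drain`) is a hypothetical model written for illustration and is not the `asyncio::channel` API. It only shows the control flow of the write loop: forward elements until the receiver reports `Disconnected`, then treat that as end of stream.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Toy model only: a synchronous, single-threaded channel. NOT asyncio's API.
enum class ToyReceiveError { Empty, Disconnected };

template<typename T>
struct ToyResult {
    std::optional<T> value;
    ToyReceiveError error{};  // only meaningful when value is empty
    explicit operator bool() const { return value.has_value(); }
};

template<typename T>
struct ToyChannelState {
    std::deque<T> queue;
    bool closed = false;
};

template<typename T>
class ToySender {
public:
    explicit ToySender(std::shared_ptr<ToyChannelState<T>> state) : mState{std::move(state)} {}
    void send(T value) { mState->queue.push_back(std::move(value)); }
    void close() { mState->closed = true; }
private:
    std::shared_ptr<ToyChannelState<T>> mState;
};

template<typename T>
class ToyReceiver {
public:
    explicit ToyReceiver(std::shared_ptr<ToyChannelState<T>> state) : mState{std::move(state)} {}

    // Buffered elements are delivered first; Disconnected is reported only
    // once the channel is both closed and drained.
    ToyResult<T> receive() {
        if (!mState->queue.empty()) {
            T value = std::move(mState->queue.front());
            mState->queue.pop_front();
            return {std::move(value), ToyReceiveError::Empty};
        }
        return {std::nullopt,
                mState->closed ? ToyReceiveError::Disconnected : ToyReceiveError::Empty};
    }
private:
    std::shared_ptr<ToyChannelState<T>> mState;
};

template<typename T>
std::pair<ToySender<T>, ToyReceiver<T>> makeToyChannel() {
    auto state = std::make_shared<ToyChannelState<T>>();
    return {ToySender<T>{state}, ToyReceiver<T>{state}};
}

// Collects everything that would be written to the stream and reports whether
// the loop ended because the channel was closed (the stream EOF case).
template<typename T>
std::pair<std::vector<T>, bool> drain(ToyReceiver<T> &receiver) {
    std::vector<T> written;
    while (true) {
        auto element = receiver.receive();
        if (!element)
            return {std::move(written), element.error == ToyReceiveError::Disconnected};
        written.push_back(*std::move(element.value));
    }
}
```

In this model, closing the sender after two `send` calls makes `drain` return both elements and flag the end of the stream, which is the point at which the real loop calls `writeDone()`.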
# Implementing the Concrete Client
With `GenericClient` in place, implementing a concrete service client is straightforward:
class Client final : public GenericClient<sample::SampleService> {
public:
using GenericClient::GenericClient;
static Client make(const std::string &address) {
return Client{sample::SampleService::NewStub(grpc::CreateChannel(address, grpc::InsecureChannelCredentials()))};
}
asyncio::task::Task<sample::EchoResponse>
echo(
sample::EchoRequest request,
std::unique_ptr<grpc::ClientContext> context = std::make_unique<grpc::ClientContext>()
) {
co_return co_await call(&sample::SampleService::Stub::async::Echo, std::move(context), std::move(request));
}
asyncio::task::Task<void>
getNumbers(
sample::GetNumbersRequest request,
asyncio::Sender<sample::Number> sender,
std::unique_ptr<grpc::ClientContext> context = std::make_unique<grpc::ClientContext>()
) {
co_await call(
&sample::SampleService::Stub::async::GetNumbers,
std::move(context),
std::move(request),
std::move(sender)
);
}
asyncio::task::Task<sample::SumResponse> sum(
asyncio::Receiver<sample::Number> receiver,
std::unique_ptr<grpc::ClientContext> context = std::make_unique<grpc::ClientContext>()
) {
co_return co_await call(&sample::SampleService::Stub::async::Sum, std::move(context), std::move(receiver));
}
asyncio::task::Task<void>
chat(
asyncio::Receiver<sample::ChatMessage> receiver,
asyncio::Sender<sample::ChatMessage> sender,
std::unique_ptr<grpc::ClientContext> context = std::make_unique<grpc::ClientContext>()
) {
co_return co_await call(
&sample::SampleService::Stub::async::Chat,
std::move(context),
std::move(receiver),
std::move(sender)
);
}
};
Here is how to call all four RPC types concurrently,
showcasing the elegance of asyncio's concurrent programming model:
asyncio::task::Task<void> asyncMain(const int argc, char *argv[]) {
auto client = Client::make("localhost:50051");
co_await all(
// Unary RPC
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
sample::EchoRequest req;
req.set_message("Hello gRPC!");
const auto resp = co_await client.echo(req);
fmt::print("Echo: {}\n", resp.message());
}),
// Server streaming + client streaming, connected via a channel
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
sample::GetNumbersRequest req;
req.set_value(1);
req.set_count(5);
auto [sender, receiver] = asyncio::channel<sample::Number>();
const auto result = co_await all(
client.getNumbers(req, std::move(sender)),
client.sum(std::move(receiver))
);
const auto &resp = std::get<sample::SumResponse>(result);
fmt::print("Sum: {}, count: {}\n", resp.total(), resp.count());
}),
// Bidirectional streaming
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
auto [inSender, inReceiver] = asyncio::channel<sample::ChatMessage>();
auto [outSender, outReceiver] = asyncio::channel<sample::ChatMessage>();
co_await all(
client.chat(std::move(outReceiver), std::move(inSender)),
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
sample::ChatMessage msg;
msg.set_content("Hello server!");
co_await asyncio::error::guard(outSender.send(std::move(msg)));
outSender.close();
}),
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
const auto msg = co_await asyncio::error::guard(inReceiver.receive());
fmt::print("Chat reply: {}\n", msg.content());
})
);
})
);
}
The channel-based pipeline connecting `getNumbers` and `sum` is especially worth noting: numbers produced by the server-streaming RPC flow directly through the channel into the client-streaming RPC. The whole pipeline looks like synchronous code, but is fully asynchronous underneath.
> Full source code: [GitHub](https://github.com/Hackerl/asyncio/tree/master/sample/grpc)
> Because of post length limits, the server side is covered in Part 2.
https://redd.it/1ssjrsx
@r_cpp
Boost 1.91 Released: New Decimal Library, SIMD UUID, Redis Sentinel, C++26 Reflection in PFR
https://boost.org/releases/1.91.0/
https://redd.it/1ssrnzf
@r_cpp
Binary debug for nested big ass structures.
Heyaa,
So recently I had to compare binaries laid out as multi-level nested big fat structures. I was surprised to find that there are no good tools to do that. The best I could find was the Watch window in Visual Studio. I also tried WinDbg, but it doesn't work well with macros and arrays. To make matters worse, this big-ass structure has offsets that point beyond the end of the structure. There are no good tools that automatically show the value of each field.
TL;DR: I have a custom buffer layout with multiple levels of nested structures.
https://redd.it/1sssd6w
@r_cpp
Example for Implementing Functions of Partitions
https://abuehl.github.io/2026/04/22/implementing-functions-of-partitions.html
https://redd.it/1ssvtts
@r_cpp
C++ is unsafe. Rust is safe. Should we all move to Rust? 44 CVEs found in Rust CoreUtils audit.
https://www.phoronix.com/news/Ubuntu-Rust-Coreutils-Audit
https://redd.it/1st3rbf
@r_cpp
Boost.Decimal: IEEE 754 Decimal Floating Point for C++ — Header-Only, Constexpr, C++14
https://www.boost.org/library/latest/decimal/
https://redd.it/1stha9t
@r_cpp
Libraries for general purpose 2D/3D geometry vocabulary types?
I work in the geospatial industry and have worked on plenty of large projects that have their own internal geometry libraries. Some good, some bad, most with interesting historical choices. I recently joined a new project that hasn't really defined its vocabulary types yet, and I'm finding that extremely inconvenient, so I'm looking around at what is common.
The kinds of things I'm looking for are:
`Vector<typename T, size_t Dimension>`: Basically a `std::array<T,Dimension>` with a vector-like API
`Point`: A wrapper around a `Vector` with point semantics
`Size`: A wrapper around a `Vector` with size semantics
`Range`: A basic min/max interval
`AxisAlignedBox`: A set of `Range`s in N dimensions
`RotatedBox`: An `AxisAlignedBox` with a basis `Vector`
`Polyline`: A `std::vector<Point>` assumed to be open
`Polygon`: A `std::vector<Point>` assumed to be closed
`Matrix`: An NxM matrix
...
I know there are plenty of vector/matrix/linear algebra libraries out there, often focused on flexibility and raw computational performance. I'm more interested in nice vocabulary types that implement proper semantics via convenient methods and operators.
It seems these things are often provided by game engines, but pulling in an entire game engine for a non-gaming project feels silly.
So if you were starting a new, greenfield C++ application dealing with 3D geometric data, which existing library, if any, would you reach for?
https://redd.it/1stif6d
@r_cpp
Good Resource for Topics
Hi,
Please suggest good resources for multithreading, smart pointers, and copy constructors.
Thanks
https://redd.it/1stj0ka
@r_cpp
Can AI write truly optimized C++?
https://pvs-studio.com/en/blog/posts/cpp/1366/
https://redd.it/1stiofv
@r_cpp
Devirtualization and Static Polymorphism
https://david.alvarezrosa.com/posts/devirtualization-and-static-polymorphism/
https://redd.it/1sto3hp
@r_cpp