Introduction
This article is a continuation of the previous article in this series.
Endless data flow with co_yield
The code below implements an endless stream of data. The coroutine getNext uses co_yield to create a stream of data that starts at start and, on request, produces each subsequent value by adding step to the previous one.
//infiniteDataStream.cpp
#include <coroutine>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
// Lazy, move-only generator that owns the frame of a coroutine producing values of
// type T via co_yield. The numbered labels (1)-(6) mark the steps of the first
// iteration and are referred to by the surrounding text.
//
// Usage: call next() to advance the coroutine to its next co_yield, then getValue()
// to read the value that was yielded.
template <typename T>
struct Generator {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    // Takes ownership of the coroutine frame referred to by h.
    Generator(handle_type h) : coro(h) {} // (3)

    handle_type coro;
    // Not used by the generator itself; kept so the public members stay unchanged.
    std::shared_ptr<T> value;

    // Destroys the owned coroutine frame, if any.
    ~Generator() {
        if (coro) {
            coro.destroy();
        }
    }

    // The frame has exactly one owner, so copying is disabled.
    Generator(const Generator &) = delete;
    Generator& operator=(const Generator &) = delete;

    // Moving transfers ownership and leaves the source without a frame.
    Generator(Generator &&other) noexcept
        : coro(std::exchange(other.coro, nullptr)), value(std::move(other.value)) {}

    Generator& operator=(Generator &&other) noexcept {
        // Guard against self-assignment, which would otherwise drop the only handle.
        if (this != &other) {
            // Release the frame we currently own before adopting the new one;
            // overwriting the handle without destroying it leaks the frame.
            if (coro) {
                coro.destroy();
            }
            coro = std::exchange(other.coro, nullptr);
            value = std::move(other.value);
        }
        return *this;
    }

    // Returns a copy of the value most recently passed to co_yield.
    // Requires that this generator owns a coroutine (coro is non-null).
    T getValue() const {
        return coro.promise().current_value;
    }

    // Resumes the coroutine until its next suspension point.
    // Returns true if the coroutine has not finished afterwards, false otherwise.
    bool next() { // (5)
        // Resuming a null or already finished coroutine is undefined behaviour.
        if (!coro || coro.done()) {
            return false;
        }
        coro.resume();
        return not coro.done();
    }

    struct promise_type {
        promise_type() = default; // (1)
        ~promise_type() = default;

        // The generator is lazy: nothing runs until the first next().
        auto initial_suspend() { // (4)
            return std::suspend_always{};
        }

        // Stay suspended at the end so the frame is destroyed by ~Generator and
        // done() can still be queried. final_suspend is required to be noexcept.
        auto final_suspend() noexcept {
            return std::suspend_always{};
        }

        auto get_return_object() { // (2)
            return Generator{handle_type::from_promise(*this)};
        }

        // Invoked on co_return / falling off the end; there is nothing to store.
        void return_void() {}

        // Stores the yielded value and suspends so the caller can read it.
        auto yield_value(T value) { // (6)
            current_value = std::move(value);
            return std::suspend_always{};
        }

        // An exception escaping the coroutine body ends the program with status 1.
        void unhandled_exception() {
            std::exit(1);
        }

        // Value-initialised so that reading it before the first co_yield is defined.
        T current_value{};
    };
};
// Coroutine producing an endless arithmetic sequence: start, start + step,
// start + 2 * step, ... One value is yielded per resumption.
//
// The sequence never ends on its own; the caller decides how many values to pull.
// NOTE: value is a plain int, so pulling values long enough will eventually
// overflow it.
Generator<int> getNext(int start = 0, int step = 1) {
    auto value = start;
    // The previous loop carried an unused counter that would itself overflow in an
    // endless loop; a plain infinite loop expresses the intent directly.
    while (true) {
        co_yield value;
        value += step;
    }
}
// Prints the first 11 values of the default stream (start 0, step 1), followed by
// the first 21 values of a stream starting at 100 with step -10.
int main() {
    std::cout << "getNext():";
    auto gen = getNext();
    int remaining = 11;
    while (remaining-- > 0) {
        gen.next();                          // advance to the next co_yield
        std::cout << " " << gen.getValue();  // (7)
    }

    std::cout << "\ngetNext(100, -10):";
    auto gen2 = getNext(100, -10);
    remaining = 21;
    while (remaining-- > 0) {
        gen2.next();
        std::cout << " " << gen2.getValue();
    }

    std::cout << std::endl;
}
Translator's note: the program was built with the command g++ -fcoroutines infiniteDataStream.cpp
In the function main, two coroutines are created. The first, gen, returns the values from 0 to 10. The second, gen2, returns the values from 100 down to -100 in steps of -10. Program output:
$ ./infDS
getNext(): 0 1 2 3 4 5 6 7 8 9 10
getNext(100, -10): 100 90 80 70 60 50 40 30 20 10 0 -10 -20 -30 -40 -50 -60 -70 -80 -90 -100
Labels with numbers in comments in the program infiniteDataStream.cpp
describe the first iteration in the following sequence:
1. Create the promise object.
2. Call promise.get_return_object() and store the result in a local variable.
3. Create the generator.
4. Call promise.initial_suspend(); because the generator is "lazy", it returns suspend_always.
5. Request the next value and return a flag that tells whether the generator is not yet exhausted.
6. React to co_yield, after which the next value is available.
7. Get the next value.
In subsequent iterations, only steps 5 and 6 are performed.
Synchronizing threads using co_await
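Threads can be synchronized with co_await. A typical requirement is that one thread prepares a piece of work and another thread waits for it. Such a sender-receiver workflow can be built with condition variables, with promises and futures, or with an atomic flag. Coroutines make this simpler and are meant to avoid the risks inherent in condition variables: spurious wakeups and lost wakeups.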
// senderReceiver.cpp
#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>
// One-shot notification that a coroutine can wait for with `co_await event`.
// notify() raises the notification; operator co_await hands out the Awaiter that
// decides whether the waiting coroutine has to suspend.
class Event {
public:
    class Awaiter;

    Event() = default;

    // An Event is neither copyable nor movable.
    Event(const Event &) = delete;
    Event& operator=(const Event &) = delete;
    Event(Event &&) = delete;
    Event& operator=(Event &&) = delete;

    // Creates the awaiter used by `co_await` on this event.
    Awaiter operator co_await() const;

    // Marks the event as notified and resumes a suspended waiter, if there is one.
    void notify();

private:
    friend class Awaiter;

    // Both members are mutable because waiting goes through a const Event.
    // Type-erased pointer to the Awaiter currently suspended on this event.
    mutable std::atomic<void *> suspendedWaiter{nullptr};
    // Set once notify() has been called.
    mutable std::atomic<bool> notified{false};
};
// Awaiter returned by Event::operator co_await. It implements the three functions
// the compiler calls for a co_await expression and remembers the handle of the
// waiting coroutine so that Event::notify() can resume it.
class Event::Awaiter {
    friend class Event;

    // The event being waited on.
    const Event &event;
    // Handle of the waiting coroutine; assigned in await_suspend().
    std::coroutine_handle<> coroutineHandle;

public:
    Awaiter(const Event &source) : event(source) {}

    // Decides whether suspension can be skipped.
    bool await_ready() const;

    // Registers the coroutine as the waiter; the result tells whether it stays suspended.
    bool await_suspend(std::coroutine_handle<> ch);

    // The co_await expression produces no value.
    void await_resume() {}
};
// First step of `co_await event`.
// Throws std::runtime_error if another waiter is already registered on the event.
// Returns true if the event has already been notified (the coroutine continues
// without suspending) and false otherwise (the coroutine is about to suspend).
bool Event::Awaiter::await_ready() const {
    const bool waiterAlreadyRegistered = event.suspendedWaiter.load() != nullptr;
    if (waiterAlreadyRegistered) {
        throw std::runtime_error("More than one waiter is not valid");
    }
    return event.notified.load();
}
// Called after await_ready() returned false. Registers this awaiter as the waiter
// of the event.
// Returns true if the coroutine stays suspended (notify() will resume it) and
// false if the notification has already arrived and the coroutine continues at once.
//
// The waiter is published BEFORE the notified flag is re-checked. Checking first
// and publishing afterwards leaves a window in which notify() sees no waiter and
// the coroutine is then suspended forever (lost wakeup).
bool Event::Awaiter::await_suspend(std::coroutine_handle<> ch) {
    coroutineHandle = ch;

    // Once the waiter is published, notify() may resume the coroutine on another
    // thread and the coroutine frame holding *this may go away, so no member of
    // *this is read past that point.
    const Event &target = event;
    void *self = this;

    target.suspendedWaiter.store(self);
    if (!target.notified.load()) {
        // Not notified yet: notify() is guaranteed to find the published waiter.
        return true;
    }

    // The notification raced with the registration. Exactly one side removes the
    // waiter: if we take it back, we continue without suspending; if notify() has
    // taken it, notify() resumes the coroutine and we report "suspended".
    void *expected = self;
    const bool reclaimed = target.suspendedWaiter.compare_exchange_strong(expected, nullptr);
    return !reclaimed;
}

// Marks the event as notified and resumes the waiting coroutine, if one is
// registered. The resumed coroutine runs on the thread that calls notify().
void Event::notify() {
    notified = true;
    // Take the waiter out atomically so it is resumed at most once and no stale
    // pointer to a finished awaiter is left behind for a later notify().
    auto *waiter = static_cast<Awaiter *>(suspendedWaiter.exchange(nullptr));
    if (waiter != nullptr) {
        waiter->coroutineHandle.resume();
    }
}
// Makes `co_await event` work: returns a fresh Awaiter bound to this event.
Event::Awaiter Event::operator co_await() const {
    Awaiter awaiter(*this);
    return awaiter;
}
// Minimal fire-and-forget coroutine return type: the coroutine starts running
// immediately, does not suspend at its end, and the returned Task object carries
// no state.
struct Task {
    struct promise_type {
        Task get_return_object() { return {}; }

        // Eager start: the body runs as soon as the coroutine is called.
        std::suspend_never initial_suspend() { return {}; }

        // No suspension at the end. final_suspend is required to be noexcept;
        // conforming compilers reject a promise type without it.
        std::suspend_never final_suspend() noexcept { return {}; }

        void return_void() {}

        // Exceptions escaping the coroutine body are deliberately ignored here.
        void unhandled_exception() {}
    };
};
// Coroutine that waits for the given event and then reports how long it waited.
// The time is taken from just before the co_await until the coroutine continues
// after the notification.
Task receiver(Event &event) {
    // steady_clock is monotonic, which is what an elapsed-time measurement needs;
    // high_resolution_clock may be an alias of the adjustable system clock.
    const auto start = std::chrono::steady_clock::now();
    co_await event;
    std::cout << "Got the notification!" << std::endl;
    const auto end = std::chrono::steady_clock::now();
    const std::chrono::duration<double> elapsed = end - start;
    std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
}
int main() {
std::cout << "Notification before waiting" << std::endl;
Event event1{};
auto senderThread1 = std::thread([&event1] { event1.notify(); });
auto receiverThread1 = std::thread(receiver, std::ref(event1));
receiverThread1.join();
senderThread1.join();
std::cout << "\nNotification after 2 seconds waiting" << std::endl;
Event event2{};
auto receiverThread2 = std::thread(receiver, std::ref(event2));
auto senderThread2 = std::thread([&event2] {
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
event2.notify();
});
receiverThread2.join();
senderThread2.join();
}
Translator's note: the program was built with the command g++ -pthread -fcoroutines senderReceiver.cpp
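From the user's point of view, synchronizing threads with coroutines is straightforward. In the program senderReceiver.cpp, the threads senderThread1 and senderThread2 each use an event to send their notification (eventN.notify()). The function receiver is a coroutine that is executed in the threads receiverThread1 and receiverThread2. The coroutine measures the time between the start of waiting and the arrival of the notification and prints it. This number shows how long the coroutine waited.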
Output of the program senderReceiver:
$ ./senderReceiver
Notification before waiting
Got the notification!
Waited 3.7006e-05 seconds.
Notification after 2 seconds waiting
Got the notification!
Waited 2.00056 seconds.
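If you compare the class Generator from the data-stream example with the class Event from this example, there is a subtle difference. In the first case, Generator is both the awaitable and the awaiter; in the second case, Event uses operator co_await to return the awaiter. Separating the awaitable from the awaiter improves the structure of the code.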
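The output shows that the second coroutine waited for about 2 seconds. The reason is that event1 sends its notification before the coroutine is suspended, whereas event2 sends its notification only after 2 seconds.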
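Now let us look at how senderReceiver.cpp is implemented. The class Event has two interesting members: suspendedWaiter and notified. The first holds the waiter that is suspended on the event; the second records whether the event has been notified.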
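In the first scenario, event1 sends its notification before receiverThread1 starts waiting. The call event1.notify() first sets the flag notified and then looks up a possible waiter. In this case the waiter is nullptr, because none has been registered yet, so the subsequent call waiter->coroutineHandle.resume() is not executed. The function await_ready, which runs afterwards, first checks whether a waiter is already registered; if so, it throws std::runtime_error for simplicity. The crucial part of this function is its return value: notified was already set to true in notify, and true means that the coroutine is not suspended.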
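In the second scenario, the call co_await event happens before event2 sends its notification. co_await event triggers the call of await_ready. The big difference from the first scenario is that event.notified is false. This false value causes the coroutine to be suspended: await_suspend is executed, receives the coroutine handle ch, and stores it in the member coroutineHandle so that the coroutine can be resumed later. In addition, the waiter is stored in the member suspendedWaiter. When event2.notify is called later, the function notify is executed. The difference from the first scenario is that this time the waiter is not nullptr. As a result, the waiter uses coroutineHandle to resume the coroutine.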