Coroutines in C++20. Part 2

Introduction



This article is a continuation of the previous article in this series.



Endless data flow with co_yield



The code below implements an endless stream of data. The coroutine getNext uses co_yield to create a stream of data that starts at start and, upon request, produces each new value by adding step to the previous one.



Endless data stream
//infiniteDataStream.cpp
#include <coroutine>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>

// Lazy, move-only generator that owns a coroutine and hands out the values the
// coroutine produces with co_yield.
//
// The coroutine is created suspended (initial_suspend returns suspend_always),
// so no value exists until next() has been called at least once. The Generator
// destroys the coroutine frame in its destructor.
template <typename T>
struct Generator {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    Generator(handle_type h) : coro(h) {}                       // (3)

    handle_type coro;
    // Not read or written by Generator itself; kept so that existing code
    // naming this member keeps compiling.
    std::shared_ptr<T> value;

    ~Generator() {
        if (coro) {
            coro.destroy();
        }
    }

    // Copying would make two owners destroy the same coroutine frame.
    Generator(const Generator &) = delete;
    Generator& operator=(const Generator &) = delete;

    // Moving transfers ownership and leaves `other` empty.
    Generator(Generator &&other) noexcept
        : coro(std::exchange(other.coro, nullptr)) {}

    Generator& operator=(Generator &&other) noexcept {
        if (this != &other) {
            // Release the coroutine we currently own before taking over the
            // new one; otherwise its frame would never be destroyed.
            if (coro) {
                coro.destroy();
            }
            coro = std::exchange(other.coro, nullptr);
        }
        return *this;
    }

    // Returns a copy of the value most recently passed to co_yield
    // (a value-initialized T if next() has not been called yet).
    T getValue() {
        return coro.promise().current_value;
    }

    // Resumes the coroutine until its next suspension point.
    // Returns true if the coroutine is still alive afterwards, false if it has
    // finished or if this Generator is empty (moved-from).
    bool next() {                                               // (5)
        // Resuming a finished or null handle is undefined behaviour.
        if (!coro || coro.done()) {
            return false;
        }
        coro.resume();
        return not coro.done();
    }

    struct promise_type {
        promise_type() = default;                               // (1)
        ~promise_type() = default;

        // Start suspended: the generator is lazy.
        auto initial_suspend() noexcept {                       // (4)
            return std::suspend_always{};
        }
        // Stay suspended at the end so done() can be queried and the Generator
        // destructor remains the only place that destroys the frame.
        // final_suspend is required to be non-throwing.
        auto final_suspend() noexcept {
            return std::suspend_always{};
        }
        auto get_return_object() {                              // (2)
            return Generator{handle_type::from_promise(*this)};
        }
        // Invoked for `co_return;` or when control flows off the end of the
        // coroutine body; the call must have type void.
        void return_void() {}
        // Stores the yielded value and suspends until the next call to next().
        auto yield_value(T value) {                             // (6)
            current_value = std::move(value);
            return std::suspend_always{};
        }
        // An exception escaping the coroutine body ends the program with
        // exit status 1.
        void unhandled_exception() {
            std::exit(1);
        }

        T current_value{};
    };
};
// Coroutine producing the endless sequence start, start + step,
// start + 2 * step, ... One value is yielded per resumption; the coroutine
// never finishes on its own, so the caller decides how many values to pull.
Generator<int> getNext(int start = 0, int step = 1) {
    auto value = start;
    while (true) {
        co_yield value;
        value += step;
    }
}
int main() {
    std::cout << "getNext():";
    auto gen = getNext();
    for (int i = 0; i <= 10; ++i) {
        gen.next();
        std::cout << " " << gen.getValue();                     // (7)
    }
    std::cout << "\ngetNext(100, -10):";
    auto gen2 = getNext(100, -10);
    for (int i = 0; i <= 20; ++i) {
        gen2.next();
        std::cout << " " << gen2.getValue();
    }
    std::cout << std::endl;
}


Translator's note: the program was built with the command g++ -fcoroutines infiniteDataStream.cpp

In the function main, two coroutines are created. The first, gen, returns the values from 0 to 10. The second, gen2, returns the values from 100 to -100 in steps of 10. Program output:



$ ./infDS
getNext(): 0 1 2 3 4 5 6 7 8 9 10
getNext(100, -10): 100 90 80 70 60 50 40 30 20 10 0 -10 -20 -30 -40 -50 -60 -70 -80 -90 -100


The numbered labels in the comments of infiniteDataStream.cpp describe the first iteration, in the following sequence:



  1. Create the promise object
  2. Call promise.get_return_object() and store the result in a local variable
  3. Create the generator
  4. Call promise.initial_suspend(); the generator is "lazy", therefore it returns suspend_always
  5. Request the next value and return a flag indicating whether the generator can still produce values
  6. Action on co_yield, after which the next value is available
  7. Get the next value


In subsequent iterations, only steps 5 and 6 are performed.



Synchronizing threads using co_await



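Threads can be synchronized using co_await. A typical requirement is that one thread prepares a package of work while another thread waits for it. Such a sender/receiver workflow can be built with condition variables, promises and futures, or an atomic boolean flag. With coroutines, thread synchronization becomes simpler and is meant to avoid the risks inherent in condition variables, such as spurious wakeups and lost wakeups.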



// senderReceiver.cpp
#include <coroutine>
#include <chrono>
#include <iostream>
#include <functional>
#include <string>
#include <stdexcept>
#include <atomic>
#include <thread>

// Notification object that a coroutine can co_await and another thread can
// signal with notify(). The `notified` flag is never reset by the code shown
// here, so once set it stays set.
class Event {
public:
    class Awaiter;

    Event() = default;

    // An Event is neither copyable nor movable.
    Event(const Event &) = delete;
    Event& operator=(const Event &) = delete;
    Event(Event &&) = delete;
    Event& operator=(Event &&) = delete;

    // Marks the event as notified and resumes a registered waiter, if any.
    void notify();

    // Produces the awaiter used by `co_await event`.
    Awaiter operator co_await() const;

private:
    friend class Awaiter;

    // Both members are mutable because the Awaiter only holds a const Event&.
    mutable std::atomic<void *> suspendedWaiter{nullptr};   // registered Awaiter, or nullptr
    mutable std::atomic<bool> notified{false};              // set by notify()
};

// Awaiter returned by Event::operator co_await. It refers to the Event being
// awaited and remembers the handle of the coroutine that awaits it.
class Event::Awaiter {
public:
    Awaiter(const Event &e) : event{e} {}

    // The three functions the co_await machinery calls.
    bool await_ready() const;
    bool await_suspend(std::coroutine_handle<> ch);
    void await_resume() {}      // co_await on an Event yields no value

private:
    friend class Event;

    const Event &event;                         // the event being awaited
    std::coroutine_handle<> coroutineHandle;    // assigned in await_suspend
};

// Reports whether the awaiting coroutine may continue without suspending.
// Throws std::runtime_error if a waiter is already registered on the event.
bool Event::Awaiter::await_ready() const {
    const bool waiterAlreadyRegistered = event.suspendedWaiter.load() != nullptr;
    if (waiterAlreadyRegistered) {
        throw std::runtime_error("More than one waiter is not valid");
    }
    // true: the coroutine continues without suspending;
    // false: the coroutine is suspended and await_suspend is called.
    return event.notified.load();
}

// Registers this awaiter as the event's waiter.
// Returns true if the coroutine stays suspended (notify() will resume it) and
// false if the event has already been notified, in which case the coroutine
// continues immediately on the current thread.
bool Event::Awaiter::await_suspend(std::coroutine_handle<> ch) {
    coroutineHandle = ch;

    // As soon as `this` is published below, notify() may resume the coroutine
    // on another thread and this awaiter may be destroyed. From here on the
    // event is reached through a local reference, never through `this`.
    const Event &ev = event;

    if (ev.notified) {
        return false;
    }
    // Store the waiter for a later notification.
    ev.suspendedWaiter.store(this);

    // notify() may have run between the check above and the store, in which
    // case it saw no waiter and will not resume us. Re-check the flag;
    // whichever side exchanges the pointer out owns the resumption, so the
    // coroutine is continued exactly once.
    if (ev.notified && ev.suspendedWaiter.exchange(nullptr) != nullptr) {
        return false;   // reclaimed our own registration: do not suspend
    }
    return true;
}

// Marks the event as notified and resumes the registered waiter, if there is
// one. The coroutine is resumed on the calling thread.
void Event::notify() {
    notified = true;
    // Take the waiter and clear the slot in one step, so the waiter is resumed
    // at most once and no pointer to a finished awaiter is left behind.
    auto *waiter = static_cast<Awaiter *>(suspendedWaiter.exchange(nullptr));
    // Resume the coroutine only if a waiter was registered.
    if (waiter != nullptr) {
        waiter->coroutineHandle.resume();
    }
}

// Builds the awaiter for `co_await event`, bound to this event.
Event::Awaiter Event::operator co_await() const {
    Awaiter awaiter{*this};
    return awaiter;
}

// Minimal fire-and-forget coroutine return type: the coroutine starts running
// immediately and its frame is destroyed when it finishes, because neither
// initial_suspend nor final_suspend suspends. Task holds no handle.
struct Task {
    struct promise_type {
        Task get_return_object() { return {}; }
        std::suspend_never initial_suspend() noexcept { return {}; }
        // final_suspend is required to be non-throwing.
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        // NOTE: exceptions escaping the coroutine body (for example the
        // std::runtime_error thrown by Event::Awaiter::await_ready) are
        // silently discarded here.
        void unhandled_exception() {}
    };
};

// Coroutine that waits for `event` and prints how long the wait took.
// If the event is not yet notified, the coroutine suspends at co_await and
// the rest of the body runs on whichever thread calls event.notify().
Task receiver(Event &event) {
    // steady_clock is monotonic, which is what an elapsed-time measurement
    // needs; high_resolution_clock gives no such guarantee.
    const auto start = std::chrono::steady_clock::now();
    co_await event;
    std::cout << "Got the notification!" << std::endl;
    const auto end = std::chrono::steady_clock::now();
    const std::chrono::duration<double> elapsed = end - start;
    std::cout << "Waited " << elapsed.count() << " seconds." << std::endl;
}

int main() {
    std::cout << "Notification before waiting" << std::endl;
    Event event1{};
    auto senderThread1 = std::thread([&event1] { event1.notify(); });
    auto receiverThread1 = std::thread(receiver, std::ref(event1));
    receiverThread1.join();
    senderThread1.join();

    std::cout << "\nNotification after 2 seconds waiting" << std::endl;
    Event event2{};
    auto receiverThread2 = std::thread(receiver, std::ref(event2));
    auto senderThread2 = std::thread([&event2] {
                                         using namespace std::chrono_literals;
                                         std::this_thread::sleep_for(2s);
                                         event2.notify();
                                     });
    receiverThread2.join();
    senderThread2.join();
}


Translator's note: the program was built with the command g++ -pthread -fcoroutines senderReceiver.cpp



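From the user's point of view, synchronizing threads with coroutines is straightforward. In the program senderReceiver.cpp, the threads senderThread1 and senderThread2 each use an event to send their notification (eventN.notify()). The function receiver is the coroutine, and it is executed in the threads receiverThread1 and receiverThread2. The time between the beginning and the end of the coroutine is measured and printed; this number shows how long the coroutine waited. The output of the program is shown below.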



senderReceiver



$ ./senderReceiver
Notification before waiting
Got the notification!
Waited 3.7006e-05 seconds.

Notification after 2 seconds waiting
Got the notification!
Waited 2.00056 seconds.


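Comparing the class Generator from the first example with the class Event from this one, there is a subtle difference. In the first case, Generator is both the awaitable and the awaiter; in the second case, Event uses operator co_await to return the awaiter. Separating the awaitable from the awaiter improves the structure of the code.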



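The output shows that the second coroutine takes about 2 seconds longer. The reason is that event1 sends its notification before the coroutine is suspended, whereas event2 sends its notification only after 2 seconds have passed.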

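Now let us look at senderReceiver.cpp from the implementer's point of view. The class Event has two interesting members: suspendedWaiter and notified. The first holds the waiter for the notification, and the second indicates whether the event has already been notified.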



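In the first workflow, event1 sends its notification before receiverThread1 is started. The call event1.notify() first sets the notified flag and then loads the potential waiter. In this case the waiter is nullptr, because none has been registered yet, so the call waiter->coroutineHandle.resume() is not executed. The function await_ready, which runs afterwards, first checks whether a waiter is already registered; if so, it throws std::runtime_error for the sake of simplicity. The crucial part of this function is its return value. Because notified was already set to true in notify, the function returns true, which means that the coroutine is not suspended.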



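In the second workflow, co_await event is executed before event2 sends its notification. co_await event triggers the call to await_ready. The big difference from the first workflow is that event.notified is false, and this value causes the coroutine to be suspended. Technically, the function await_suspend is executed: it receives the coroutine handle ch and stores it in the variable coroutineHandle so that the coroutine can be resumed later. The waiter is then stored in the variable suspendedWaiter. When event2.notify is called later, the function notify is executed. This time the waiter is not nullptr, so the waiter uses its coroutineHandle to resume the coroutine.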




All Articles