// Source listing: asyncexper/cpepetest1/main.cpp (89 lines, 3.3 KiB, C++)

#include <beman/net29/net.hpp>
#include <beman/execution26/execution.hpp>
#include "demo_algorithm.hpp"
#include "demo_scope.hpp"
#include "demo_task.hpp"
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdio> // For std::fflush and stdout
#include <fstream>
#include <functional>
#include <iostream>
#include <numeric>
#include <print>
#include <ranges>
#include <sstream>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>
namespace ex = beman::execution26;
namespace net = beman::net29;
using namespace std::chrono_literals;
// ----------------------------------------------------------------------------
/// One entry of the demo "database": an identifier plus a sleep duration.
///
/// Both members carry a default initializer so that a default-constructed
/// `Operation` holds zeros instead of indeterminate values. The type is still
/// an aggregate, so positional and designated initialization keep working.
struct Operation {
    /// Identifier printed in the log lines of the task running this operation.
    uint32_t id{};
    /// How long the task sleeps, in whole seconds.
    uint32_t sleep_s{};
};
// TODO: how should `db` be protected from "concurrent access"?
// In other languages (e.g. Rust) an object like this is usually wrapped in an
// "async mutex": the lock function returns a future to await, and awaiting it
// yields a guard once the lock has been acquired. While the lock is held the
// task cannot (or at least should not) await anything else, otherwise it
// risks a deadlock.
// How can this be handled in this framework? Is it necessary to create a
// structure that returns tasks for acquire/release, and to trust the owner
// NOT to await between the acquire and the release?
std::vector<Operation> db;
/// Echo loop for one connection: receives into a fixed 512-byte buffer, sends
/// the received bytes back, and logs both byte counts. The loop ends when a
/// receive reports zero bytes, after which "stream terminated" is logged.
///
/// @param stream  Stream handed to net::async_receive / net::async_send;
///                taken by value.
auto connection_handler(auto stream) -> demo::task<> {
    char buffer[512] = {0};
    while (true) {
        const size_t read = co_await net::async_receive(stream, net::buffer(buffer));
        // `read` is unsigned, so the only "nothing received" value is zero.
        if (read == 0) break;
        // NOTE(review): a single send is issued for the `read` bytes; if
        // async_send may complete with fewer bytes, the remainder is not
        // echoed -- confirm against the library's contract.
        const size_t write = co_await net::async_send(stream, net::buffer(buffer, read));
        // std::endl writes the newline and flushes; no extra std::flush needed.
        std::cout << "read: " << read << " write: " << write << std::endl;
    }
    std::cout << "stream terminated" << std::endl;
}
/// Runs a single operation: prints a start line, waits `op.sleep_s` seconds
/// through net::resume_after on `scheduler`, then prints a completion line.
/// stdout is flushed after each line.
///
/// @param scheduler  Scheduler forwarded to net::resume_after.
/// @param op         Operation to run; taken by value.
auto op_execution(auto scheduler, Operation op) -> demo::task<> {
    // Announce the start before suspending.
    std::println("Joe {} TASK_ID - Started sleeping for {}sec...", op.id, op.sleep_s);
    std::fflush(stdout);

    // Suspend this task for the requested number of seconds.
    co_await net::resume_after(scheduler, std::chrono::seconds{op.sleep_s});

    // Report completion once resumed.
    std::println("Joe {} TASK_ID - Slept well!", op.id);
    std::fflush(stdout);
}
/// Periodically launches every operation currently stored in `db`.
///
/// Each round spawns one op_execution task per element of `db` into `scope`,
/// then waits 10 seconds via net::resume_after before the next round. The
/// loop never terminates on its own.
///
/// @param scheduler  Scheduler passed to each spawned task and used for the
///                   10-second wait.
/// @param scope      Scope the tasks are spawned into; held by reference.
auto op_spawner(auto scheduler, auto& scope) -> demo::task<> {
    while (true) {
        // Iterate by const reference: op_execution takes its Operation by
        // value, so exactly one copy is made per task. The previous extra
        // copy plus std::move on a trivially copyable struct bought nothing.
        for (const auto& op : db) {
            scope.spawn(op_execution(scheduler, op));
        }
        co_await net::resume_after(scheduler, 10s);
    }
}
/// Entry point: fills `db` with five operations, opens a TCP acceptor on
/// port 8082, spawns the periodic operation spawner and the accept loop into
/// one scope, and then runs the io_context.
auto main() -> int {
    // Seed ids 1..5; each operation sleeps id + 1 seconds. The range is
    // unsigned from the start, so no signed-to-unsigned conversion happens
    // per element.
    for (const uint32_t i : std::views::iota(uint32_t{1}, uint32_t{6}))
        db.push_back(Operation{.id = i, .sleep_s = i + 1});

    net::io_context context;
    demo::scope scope;
    net::ip::tcp::endpoint ep(net::ip::address_v4::any(), 8082);
    net::ip::tcp::acceptor server(context, ep);
    // std::endl writes the newline and flushes; no extra std::flush needed.
    std::cout << "listening on " << ep << std::endl;

    // TODO: is it OK to call context.get_scheduler() multiple times?
    // It seems fine, because the scheduler looks like a lightweight type that
    // only enables "time"-related interaction inside a task.
    // Isn't the scheduler the first sender of a chain? What happens if two
    // separate sender chains are created from the same context?
    scope.spawn(op_spawner(context.get_scheduler(), scope));

    // Accept loop. The lambda captures nothing and receives everything it
    // uses as arguments, so the coroutine does not rely on the temporary
    // closure object. `scheduler` is accepted but currently unused.
    scope.spawn(std::invoke([]([[maybe_unused]] auto scheduler, auto& scope, auto& server) -> demo::task<> {
        while (true) {
            auto [stream, address] = co_await net::async_accept(server);
            std::cout << "received connection from " << address << std::endl;
            // Each connection gets its own echo task in the same scope.
            scope.spawn(connection_handler(std::move(stream)));
        }
    }, context.get_scheduler(), scope, server));

    context.run();
}