]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/asio/example/cpp20/channels/throttling_proxy.cpp
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / boost / libs / asio / example / cpp20 / channels / throttling_proxy.cpp
CommitLineData
1e59de90
TL
1//
2// throttling_proxy.cpp
3// ~~~~~~~~~~~~~~~~~~~~
4//
5// Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
11#include <boost/asio.hpp>
12#include <boost/asio/experimental/as_tuple.hpp>
13#include <boost/asio/experimental/awaitable_operators.hpp>
14#include <boost/asio/experimental/channel.hpp>
15#include <iostream>
16
17using boost::asio::awaitable;
18using boost::asio::buffer;
19using boost::asio::co_spawn;
20using boost::asio::detached;
21using boost::asio::experimental::as_tuple;
22using boost::asio::experimental::channel;
23using boost::asio::io_context;
24using boost::asio::ip::tcp;
25using boost::asio::steady_timer;
26using boost::asio::use_awaitable;
27namespace this_coro = boost::asio::this_coro;
28using namespace boost::asio::experimental::awaitable_operators;
29using namespace std::literals::chrono_literals;
30
31using token_channel = channel<void(boost::system::error_code, std::size_t)>;
32
33awaitable<void> produce_tokens(std::size_t bytes_per_token,
34 steady_timer::duration token_interval, token_channel& tokens)
35{
36 steady_timer timer(co_await this_coro::executor);
37 for (;;)
38 {
39 co_await tokens.async_send(
40 boost::system::error_code{}, bytes_per_token,
41 use_awaitable);
42
43 timer.expires_after(token_interval);
44 co_await timer.async_wait(use_awaitable);
45 }
46}
47
48awaitable<void> transfer(tcp::socket& from,
49 tcp::socket& to, token_channel& tokens)
50{
51 std::array<unsigned char, 4096> data;
52 for (;;)
53 {
54 std::size_t bytes_available = co_await tokens.async_receive(use_awaitable);
55 while (bytes_available > 0)
56 {
57 std::size_t n = co_await from.async_read_some(
58 buffer(data, bytes_available), use_awaitable);
59
60 co_await async_write(to, buffer(data, n), use_awaitable);
61
62 bytes_available -= n;
63 }
64 }
65}
66
67awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
68{
69 constexpr std::size_t number_of_tokens = 100;
70 constexpr size_t bytes_per_token = 20 * 1024;
71 constexpr steady_timer::duration token_interval = 100ms;
72
73 auto ex = client.get_executor();
74 tcp::socket server(ex);
75 token_channel client_tokens(ex, number_of_tokens);
76 token_channel server_tokens(ex, number_of_tokens);
77
78 co_await server.async_connect(target, use_awaitable);
79 co_await (
80 produce_tokens(bytes_per_token, token_interval, client_tokens) &&
81 transfer(client, server, client_tokens) &&
82 produce_tokens(bytes_per_token, token_interval, server_tokens) &&
83 transfer(server, client, server_tokens)
84 );
85}
86
87awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
88{
89 for (;;)
90 {
91 auto [e, client] = co_await acceptor.async_accept(as_tuple(use_awaitable));
92 if (!e)
93 {
94 auto ex = client.get_executor();
95 co_spawn(ex, proxy(std::move(client), target), detached);
96 }
97 else
98 {
99 std::cerr << "Accept failed: " << e.message() << "\n";
100 steady_timer timer(co_await this_coro::executor);
101 timer.expires_after(100ms);
102 co_await timer.async_wait(use_awaitable);
103 }
104 }
105}
106
107int main(int argc, char* argv[])
108{
109 try
110 {
111 if (argc != 5)
112 {
113 std::cerr << "Usage: throttling_proxy";
114 std::cerr << " <listen_address> <listen_port>";
115 std::cerr << " <target_address> <target_port>\n";
116 return 1;
117 }
118
119 io_context ctx;
120
121 auto listen_endpoint =
122 *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);
123
124 auto target_endpoint =
125 *tcp::resolver(ctx).resolve(argv[3], argv[4]);
126
127 tcp::acceptor acceptor(ctx, listen_endpoint);
128 co_spawn(ctx, listen(acceptor, target_endpoint), detached);
129 ctx.run();
130 }
131 catch (std::exception& e)
132 {
133 std::cerr << "Exception: " << e.what() << "\n";
134 }
135}