]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/asio/example/cpp17/coroutines_ts/chat_server.cpp
1477c24317d3efa1a27eaf1286efb31c52d3a070
[ceph.git] / ceph / src / boost / libs / asio / example / cpp17 / coroutines_ts / chat_server.cpp
1 //
2 // chat_server.cpp
3 // ~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2022 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #include <cstdlib>
12 #include <deque>
13 #include <iostream>
14 #include <list>
15 #include <memory>
16 #include <set>
17 #include <string>
18 #include <utility>
19 #include <boost/asio/awaitable.hpp>
20 #include <boost/asio/detached.hpp>
21 #include <boost/asio/co_spawn.hpp>
22 #include <boost/asio/io_context.hpp>
23 #include <boost/asio/ip/tcp.hpp>
24 #include <boost/asio/read_until.hpp>
25 #include <boost/asio/redirect_error.hpp>
26 #include <boost/asio/signal_set.hpp>
27 #include <boost/asio/steady_timer.hpp>
28 #include <boost/asio/use_awaitable.hpp>
29 #include <boost/asio/write.hpp>
30
31 using boost::asio::ip::tcp;
32 using boost::asio::awaitable;
33 using boost::asio::co_spawn;
34 using boost::asio::detached;
35 using boost::asio::redirect_error;
36 using boost::asio::use_awaitable;
37
38 //----------------------------------------------------------------------
39
40 class chat_participant
41 {
42 public:
43 virtual ~chat_participant() {}
44 virtual void deliver(const std::string& msg) = 0;
45 };
46
47 typedef std::shared_ptr<chat_participant> chat_participant_ptr;
48
49 //----------------------------------------------------------------------
50
51 class chat_room
52 {
53 public:
54 void join(chat_participant_ptr participant)
55 {
56 participants_.insert(participant);
57 for (auto msg: recent_msgs_)
58 participant->deliver(msg);
59 }
60
61 void leave(chat_participant_ptr participant)
62 {
63 participants_.erase(participant);
64 }
65
66 void deliver(const std::string& msg)
67 {
68 recent_msgs_.push_back(msg);
69 while (recent_msgs_.size() > max_recent_msgs)
70 recent_msgs_.pop_front();
71
72 for (auto participant: participants_)
73 participant->deliver(msg);
74 }
75
76 private:
77 std::set<chat_participant_ptr> participants_;
78 enum { max_recent_msgs = 100 };
79 std::deque<std::string> recent_msgs_;
80 };
81
82 //----------------------------------------------------------------------
83
84 class chat_session
85 : public chat_participant,
86 public std::enable_shared_from_this<chat_session>
87 {
88 public:
89 chat_session(tcp::socket socket, chat_room& room)
90 : socket_(std::move(socket)),
91 timer_(socket_.get_executor()),
92 room_(room)
93 {
94 timer_.expires_at(std::chrono::steady_clock::time_point::max());
95 }
96
97 void start()
98 {
99 room_.join(shared_from_this());
100
101 co_spawn(socket_.get_executor(),
102 [self = shared_from_this()]{ return self->reader(); },
103 detached);
104
105 co_spawn(socket_.get_executor(),
106 [self = shared_from_this()]{ return self->writer(); },
107 detached);
108 }
109
110 void deliver(const std::string& msg)
111 {
112 write_msgs_.push_back(msg);
113 timer_.cancel_one();
114 }
115
116 private:
117 awaitable<void> reader()
118 {
119 try
120 {
121 for (std::string read_msg;;)
122 {
123 std::size_t n = co_await boost::asio::async_read_until(socket_,
124 boost::asio::dynamic_buffer(read_msg, 1024), "\n", use_awaitable);
125
126 room_.deliver(read_msg.substr(0, n));
127 read_msg.erase(0, n);
128 }
129 }
130 catch (std::exception&)
131 {
132 stop();
133 }
134 }
135
136 awaitable<void> writer()
137 {
138 try
139 {
140 while (socket_.is_open())
141 {
142 if (write_msgs_.empty())
143 {
144 boost::system::error_code ec;
145 co_await timer_.async_wait(redirect_error(use_awaitable, ec));
146 }
147 else
148 {
149 co_await boost::asio::async_write(socket_,
150 boost::asio::buffer(write_msgs_.front()), use_awaitable);
151 write_msgs_.pop_front();
152 }
153 }
154 }
155 catch (std::exception&)
156 {
157 stop();
158 }
159 }
160
161 void stop()
162 {
163 room_.leave(shared_from_this());
164 socket_.close();
165 timer_.cancel();
166 }
167
168 tcp::socket socket_;
169 boost::asio::steady_timer timer_;
170 chat_room& room_;
171 std::deque<std::string> write_msgs_;
172 };
173
174 //----------------------------------------------------------------------
175
176 awaitable<void> listener(tcp::acceptor acceptor)
177 {
178 chat_room room;
179
180 for (;;)
181 {
182 std::make_shared<chat_session>(
183 co_await acceptor.async_accept(use_awaitable),
184 room
185 )->start();
186 }
187 }
188
189 //----------------------------------------------------------------------
190
191 int main(int argc, char* argv[])
192 {
193 try
194 {
195 if (argc < 2)
196 {
197 std::cerr << "Usage: chat_server <port> [<port> ...]\n";
198 return 1;
199 }
200
201 boost::asio::io_context io_context(1);
202
203 for (int i = 1; i < argc; ++i)
204 {
205 unsigned short port = std::atoi(argv[i]);
206 co_spawn(io_context,
207 listener(tcp::acceptor(io_context, {tcp::v4(), port})),
208 detached);
209 }
210
211 boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
212 signals.async_wait([&](auto, auto){ io_context.stop(); });
213
214 io_context.run();
215 }
216 catch (std::exception& e)
217 {
218 std::cerr << "Exception: " << e.what() << "\n";
219 }
220
221 return 0;
222 }