]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/libs/asio/example/cpp17/coroutines_ts/chat_server.cpp
import quincy beta 17.1.0
[ceph.git] / ceph / src / boost / libs / asio / example / cpp17 / coroutines_ts / chat_server.cpp
CommitLineData
11fdf7f2
TL
1//
2// chat_server.cpp
3// ~~~~~~~~~~~~~~~
4//
f67539c2 5// Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
11fdf7f2
TL
6//
7// Distributed under the Boost Software License, Version 1.0. (See accompanying
8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9//
10
11#include <cstdlib>
12#include <deque>
13#include <iostream>
14#include <list>
15#include <memory>
16#include <set>
17#include <string>
18#include <utility>
92f5a8d4
TL
19#include <boost/asio/awaitable.hpp>
20#include <boost/asio/detached.hpp>
21#include <boost/asio/co_spawn.hpp>
11fdf7f2
TL
22#include <boost/asio/io_context.hpp>
23#include <boost/asio/ip/tcp.hpp>
24#include <boost/asio/read_until.hpp>
92f5a8d4 25#include <boost/asio/redirect_error.hpp>
11fdf7f2
TL
26#include <boost/asio/signal_set.hpp>
27#include <boost/asio/steady_timer.hpp>
92f5a8d4 28#include <boost/asio/use_awaitable.hpp>
11fdf7f2
TL
29#include <boost/asio/write.hpp>
30
31using boost::asio::ip::tcp;
92f5a8d4
TL
32using boost::asio::awaitable;
33using boost::asio::co_spawn;
34using boost::asio::detached;
35using boost::asio::redirect_error;
36using boost::asio::use_awaitable;
11fdf7f2
TL
37
38//----------------------------------------------------------------------
39
40class chat_participant
41{
42public:
43 virtual ~chat_participant() {}
44 virtual void deliver(const std::string& msg) = 0;
45};
46
47typedef std::shared_ptr<chat_participant> chat_participant_ptr;
48
49//----------------------------------------------------------------------
50
51class chat_room
52{
53public:
54 void join(chat_participant_ptr participant)
55 {
56 participants_.insert(participant);
57 for (auto msg: recent_msgs_)
58 participant->deliver(msg);
59 }
60
61 void leave(chat_participant_ptr participant)
62 {
63 participants_.erase(participant);
64 }
65
66 void deliver(const std::string& msg)
67 {
68 recent_msgs_.push_back(msg);
69 while (recent_msgs_.size() > max_recent_msgs)
70 recent_msgs_.pop_front();
71
72 for (auto participant: participants_)
73 participant->deliver(msg);
74 }
75
76private:
77 std::set<chat_participant_ptr> participants_;
78 enum { max_recent_msgs = 100 };
79 std::deque<std::string> recent_msgs_;
80};
81
82//----------------------------------------------------------------------
83
84class chat_session
85 : public chat_participant,
86 public std::enable_shared_from_this<chat_session>
87{
88public:
89 chat_session(tcp::socket socket, chat_room& room)
90 : socket_(std::move(socket)),
92f5a8d4 91 timer_(socket_.get_executor()),
11fdf7f2
TL
92 room_(room)
93 {
94 timer_.expires_at(std::chrono::steady_clock::time_point::max());
95 }
96
97 void start()
98 {
99 room_.join(shared_from_this());
100
101 co_spawn(socket_.get_executor(),
102 [self = shared_from_this()]{ return self->reader(); },
103 detached);
104
105 co_spawn(socket_.get_executor(),
106 [self = shared_from_this()]{ return self->writer(); },
107 detached);
108 }
109
110 void deliver(const std::string& msg)
111 {
112 write_msgs_.push_back(msg);
113 timer_.cancel_one();
114 }
115
116private:
117 awaitable<void> reader()
118 {
11fdf7f2
TL
119 try
120 {
121 for (std::string read_msg;;)
122 {
123 std::size_t n = co_await boost::asio::async_read_until(socket_,
92f5a8d4 124 boost::asio::dynamic_buffer(read_msg, 1024), "\n", use_awaitable);
11fdf7f2
TL
125
126 room_.deliver(read_msg.substr(0, n));
127 read_msg.erase(0, n);
128 }
129 }
130 catch (std::exception&)
131 {
132 stop();
133 }
134 }
135
136 awaitable<void> writer()
137 {
11fdf7f2
TL
138 try
139 {
140 while (socket_.is_open())
141 {
142 if (write_msgs_.empty())
143 {
144 boost::system::error_code ec;
92f5a8d4 145 co_await timer_.async_wait(redirect_error(use_awaitable, ec));
11fdf7f2
TL
146 }
147 else
148 {
149 co_await boost::asio::async_write(socket_,
92f5a8d4 150 boost::asio::buffer(write_msgs_.front()), use_awaitable);
11fdf7f2
TL
151 write_msgs_.pop_front();
152 }
153 }
154 }
155 catch (std::exception&)
156 {
157 stop();
158 }
159 }
160
161 void stop()
162 {
163 room_.leave(shared_from_this());
164 socket_.close();
165 timer_.cancel();
166 }
167
168 tcp::socket socket_;
169 boost::asio::steady_timer timer_;
170 chat_room& room_;
171 std::deque<std::string> write_msgs_;
172};
173
174//----------------------------------------------------------------------
175
176awaitable<void> listener(tcp::acceptor acceptor)
177{
11fdf7f2
TL
178 chat_room room;
179
180 for (;;)
181 {
182 std::make_shared<chat_session>(
92f5a8d4 183 co_await acceptor.async_accept(use_awaitable),
11fdf7f2
TL
184 room
185 )->start();
186 }
187}
188
189//----------------------------------------------------------------------
190
191int main(int argc, char* argv[])
192{
193 try
194 {
195 if (argc < 2)
196 {
197 std::cerr << "Usage: chat_server <port> [<port> ...]\n";
198 return 1;
199 }
200
201 boost::asio::io_context io_context(1);
202
203 for (int i = 1; i < argc; ++i)
204 {
205 unsigned short port = std::atoi(argv[i]);
206 co_spawn(io_context,
20effc67 207 listener(tcp::acceptor(io_context, {tcp::v4(), port})),
11fdf7f2
TL
208 detached);
209 }
210
211 boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
212 signals.async_wait([&](auto, auto){ io_context.stop(); });
213
214 io_context.run();
215 }
216 catch (std::exception& e)
217 {
218 std::cerr << "Exception: " << e.what() << "\n";
219 }
220
221 return 0;
222}