]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/beast/example/http/client/crawl/http_crawl.cpp
update sources to v12.2.3
[ceph.git] / ceph / src / boost / libs / beast / example / http / client / crawl / http_crawl.cpp
1 //
2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 //------------------------------------------------------------------------------
11 //
12 // Example: HTTP crawl (asynchronous)
13 //
14 //------------------------------------------------------------------------------
15
#include "urls_large_data.hpp"

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <vector>

using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>
namespace http = boost::beast::http;    // from <boost/beast/http.hpp>
namespace chrono = std::chrono;         // from <chrono>
41
42 //------------------------------------------------------------------------------
43
44 // This structure aggregates statistics on all the sites
45 class crawl_report
46 {
47 boost::asio::io_context& ioc_;
48 boost::asio::strand<
49 boost::asio::io_context::executor_type> strand_;
50 std::atomic<std::size_t> index_;
51 std::vector<char const*> const& hosts_;
52 std::size_t count_ = 0;
53
54 public:
55 crawl_report(boost::asio::io_context& ioc)
56 : ioc_(ioc)
57 , strand_(ioc_.get_executor())
58 , index_(0)
59 , hosts_(urls_large_data())
60 {
61 }
62
63 // Run an aggregation function on the strand.
64 // This allows synchronization without a mutex.
65 template<class F>
66 void
67 aggregate(F const& f)
68 {
69 boost::asio::post(
70 strand_,
71 [&, f]
72 {
73 f(*this);
74 if(count_ % 100 == 0)
75 {
76 std::cerr <<
77 "Progress: " << count_ << " of " << hosts_.size() << "\n";
78 //std::cerr << *this;
79 }
80 ++count_;
81 });
82 }
83
84 // Returns the next host to check
85 char const*
86 get_host()
87 {
88 auto const n = index_++;
89 if(n >= hosts_.size())
90 return nullptr;
91 return hosts_[n];
92 }
93
94 // Counts the number of timer failures
95 std::size_t timer_failures = 0;
96
97 // Counts the number of name resolution failures
98 std::size_t resolve_failures = 0;
99
100 // Counts the number of connection failures
101 std::size_t connect_failures = 0;
102
103 // Counts the number of write failures
104 std::size_t write_failures = 0;
105
106 // Counts the number of read failures
107 std::size_t read_failures = 0;
108
109 // Counts the number of success reads
110 std::size_t success = 0;
111
112 // Counts the number received of each status code
113 std::map<unsigned, std::size_t> status_codes;
114 };
115
116 std::ostream&
117 operator<<(std::ostream& os, crawl_report const& report)
118 {
119 // Print the report
120 os <<
121 "Crawl report\n" <<
122 " Failure counts\n" <<
123 " Timer : " << report.timer_failures << "\n" <<
124 " Resolve : " << report.resolve_failures << "\n" <<
125 " Connect : " << report.connect_failures << "\n" <<
126 " Write : " << report.write_failures << "\n" <<
127 " Read : " << report.read_failures << "\n" <<
128 " Success : " << report.success << "\n" <<
129 " Status codes\n"
130 ;
131 for(auto const& result : report.status_codes)
132 os <<
133 " " << std::setw(3) << result.first << ": " << result.second <<
134 " (" << http::obsolete_reason(static_cast<http::status>(result.first)) << ")\n";
135 os.flush();
136 return os;
137 }
138
139 //------------------------------------------------------------------------------
140
141 // Performs HTTP GET requests and aggregates the results into a report
142 class worker : public std::enable_shared_from_this<worker>
143 {
144 enum
145 {
146 // Use a small timeout to keep things lively
147 timeout = 5
148 };
149
150 crawl_report& report_;
151 tcp::resolver resolver_;
152 tcp::socket socket_;
153 boost::asio::steady_timer timer_;
154 boost::asio::strand<
155 boost::asio::io_context::executor_type> strand_;
156 boost::beast::flat_buffer buffer_; // (Must persist between reads)
157 http::request<http::empty_body> req_;
158 http::response<http::string_body> res_;
159
160 public:
161 worker(worker&&) = default;
162
163 // Resolver and socket require an io_context
164 worker(
165 crawl_report& report,
166 boost::asio::io_context& ioc)
167 : report_(report)
168 , resolver_(ioc)
169 , socket_(ioc)
170 , timer_(ioc,
171 (chrono::steady_clock::time_point::max)())
172 , strand_(ioc.get_executor())
173 {
174 // Set up the common fields of the request
175 req_.version(11);
176 req_.method(http::verb::get);
177 req_.target("/");
178 req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
179 }
180
181 // Start the asynchronous operation
182 void
183 run()
184 {
185 // Run the timer. The timer is operated
186 // continuously, this simplifies the code.
187 on_timer({});
188
189 do_get_host();
190 }
191
192 void
193 on_timer(boost::system::error_code ec)
194 {
195 if(ec && ec != boost::asio::error::operation_aborted)
196 {
197 // Should never happen
198 report_.aggregate(
199 [](crawl_report& rep)
200 {
201 ++rep.timer_failures;
202 });
203 return;
204 }
205
206 // Verify that the timer really expired since the deadline may have moved.
207 if(timer_.expiry() <= chrono::steady_clock::now())
208 {
209 socket_.shutdown(tcp::socket::shutdown_both, ec);
210 socket_.close(ec);
211 return;
212 }
213
214 // Wait on the timer
215 timer_.async_wait(
216 boost::asio::bind_executor(
217 strand_,
218 std::bind(
219 &worker::on_timer,
220 shared_from_this(),
221 std::placeholders::_1)));
222 }
223
224 void
225 do_get_host()
226 {
227 // Grab another host
228 auto const host = report_.get_host();
229
230 // nullptr means no more work
231 if(! host)
232 {
233 timer_.cancel_one();
234 return;
235 }
236
237 // The Host HTTP field is required
238 req_.set(http::field::host, host);
239
240 // Set the timer
241 timer_.expires_after(chrono::seconds(timeout));
242
243 // Set up an HTTP GET request message
244 // Look up the domain name
245 resolver_.async_resolve(
246 host,
247 "http",
248 boost::asio::bind_executor(
249 strand_,
250 std::bind(
251 &worker::on_resolve,
252 shared_from_this(),
253 std::placeholders::_1,
254 std::placeholders::_2)));
255 }
256
257 void
258 on_resolve(
259 boost::system::error_code ec,
260 tcp::resolver::results_type results)
261 {
262 if(ec)
263 {
264 report_.aggregate(
265 [](crawl_report& rep)
266 {
267 ++rep.resolve_failures;
268 });
269 return do_get_host();
270 }
271
272 // Set the timer
273 timer_.expires_after(chrono::seconds(timeout));
274
275 // Make the connection on the IP address we get from a lookup
276 boost::asio::async_connect(
277 socket_,
278 results.begin(),
279 results.end(),
280 boost::asio::bind_executor(
281 strand_,
282 std::bind(
283 &worker::on_connect,
284 shared_from_this(),
285 std::placeholders::_1)));
286 }
287
288 void
289 on_connect(boost::system::error_code ec)
290 {
291 if(ec)
292 {
293 report_.aggregate(
294 [](crawl_report& rep)
295 {
296 ++rep.connect_failures;
297 });
298 return do_get_host();
299 }
300
301 // Set the timer
302 timer_.expires_after(chrono::seconds(timeout));
303
304 // Send the HTTP request to the remote host
305 http::async_write(
306 socket_,
307 req_,
308 boost::asio::bind_executor(
309 strand_,
310 std::bind(
311 &worker::on_write,
312 shared_from_this(),
313 std::placeholders::_1,
314 std::placeholders::_2)));
315 }
316
317 void
318 on_write(
319 boost::system::error_code ec,
320 std::size_t bytes_transferred)
321 {
322 boost::ignore_unused(bytes_transferred);
323
324 if(ec)
325 {
326 report_.aggregate(
327 [](crawl_report& rep)
328 {
329 ++rep.write_failures;
330 });
331 return do_get_host();
332 }
333
334 // Set the timer
335 timer_.expires_after(chrono::seconds(timeout));
336
337 // Receive the HTTP response
338 http::async_read(
339 socket_,
340 buffer_,
341 res_,
342 boost::asio::bind_executor(
343 strand_,
344 std::bind(
345 &worker::on_read,
346 shared_from_this(),
347 std::placeholders::_1,
348 std::placeholders::_2)));
349 }
350
351 void
352 on_read(
353 boost::system::error_code ec,
354 std::size_t bytes_transferred)
355 {
356 boost::ignore_unused(bytes_transferred);
357
358 if(ec)
359 {
360 report_.aggregate(
361 [](crawl_report& rep)
362 {
363 ++rep.read_failures;
364 });
365 return do_get_host();
366 }
367
368 auto const code = res_.result_int();
369 report_.aggregate(
370 [code](crawl_report& rep)
371 {
372 ++rep.success;
373 ++rep.status_codes[code];
374 });
375
376 // Gracefully close the socket
377 socket_.shutdown(tcp::socket::shutdown_both, ec);
378 socket_.close(ec);
379
380 // If we get here then the connection is closed gracefully
381
382 do_get_host();
383 }
384 };
385
// Measures the time elapsed since construction.
// Uses steady_clock, which is monotonic, so the measurement cannot go
// backwards or jump when the system clock is adjusted during a long crawl
// (system_clock gives no such guarantee).
class timer
{
    using clock_type = chrono::steady_clock;

    clock_type::time_point when_;

public:
    using duration = clock_type::duration;

    timer()
        : when_(clock_type::now())
    {
    }

    // Time elapsed since this object was constructed; never negative.
    duration
    elapsed() const
    {
        return clock_type::now() - when_;
    }
};
406
407 int main(int argc, char* argv[])
408 {
409 // Check command line arguments.
410 if (argc != 2)
411 {
412 std::cerr <<
413 "Usage: http-crawl <threads>\n" <<
414 "Example:\n" <<
415 " http-crawl 100 1\n";
416 return EXIT_FAILURE;
417 }
418 auto const threads = std::max<int>(1, std::atoi(argv[1]));
419
420 // The io_context is required for all I/O
421 boost::asio::io_context ioc{1};
422
423 // The work keeps io_context::run from returning
424 auto work = boost::asio::make_work_guard(ioc);
425
426 // The report holds the aggregated statistics
427 crawl_report report{ioc};
428
429 timer t;
430
431 // Create and launch the worker threads.
432 std::vector<std::thread> workers;
433 workers.reserve(threads + 1);
434 for(int i = 0; i < threads; ++i)
435 workers.emplace_back(
436 [&report]
437 {
438 // We use a separate io_context for each worker because
439 // the asio resolver simulates asynchronous operation using
440 // a dedicated worker thread per io_context, and we want to
441 // do a lot of name resolutions in parallel.
442 boost::asio::io_context ioc{1};
443 std::make_shared<worker>(report, ioc)->run();
444 ioc.run();
445 });
446
447 // Add another thread to run the main io_context which
448 // is used to aggregate the statistics
449 workers.emplace_back(
450 [&ioc]
451 {
452 ioc.run();
453 });
454
455 // Now block until all threads exit
456 for(std::size_t i = 0; i < workers.size(); ++i)
457 {
458 auto& thread = workers[i];
459
460 // If this is the last thread, reset the
461 // work object so that it can return from run.
462 if(i == workers.size() - 1)
463 work.reset();
464
465 // Wait for the thread to exit
466 thread.join();
467 }
468
469 std::cout <<
470 "Elapsed time: " << chrono::duration_cast<chrono::seconds>(t.elapsed()).count() << " seconds\n";
471 std::cout << report;
472
473 return EXIT_SUCCESS;
474 }