]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/demos/udp_zero_copy_demo.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / demos / udp_zero_copy_demo.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22 #include <seastar/core/app-template.hh>
23 #include <seastar/core/future-util.hh>
24 #include <seastar/core/scattered_message.hh>
25 #include <seastar/core/vector-data-sink.hh>
26 #include <seastar/core/shared_ptr.hh>
27 #include <seastar/core/units.hh>
28 #include <random>
29 #include <iomanip>
30 #include <iostream>
31
32 using namespace seastar;
33 using namespace net;
34 using namespace std::chrono_literals;
35 namespace bpo = boost::program_options;
36
/// Converts an arbitrary std::chrono duration to a count of whole seconds.
///
/// The conversion uses std::chrono::duration_cast, so any fractional part is
/// truncated toward zero. The result is returned as the representation type
/// of the input duration.
template <typename Duration>
typename Duration::rep to_seconds(Duration d) {
    using whole_seconds = std::chrono::seconds;
    const whole_seconds truncated = std::chrono::duration_cast<whole_seconds>(d);
    return truncated.count();
}
41
42 class server {
43 private:
44 udp_channel _chan;
45 timer<> _stats_timer;
46 uint64_t _n_sent {};
47 size_t _chunk_size;
48 bool _copy;
49 std::vector<packet> _packets;
50 std::unique_ptr<output_stream<char>> _out;
51 steady_clock_type::time_point _last;
52 sstring _key;
53 size_t _packet_size = 8*KB;
54 char* _mem;
55 size_t _mem_size;
56 std::mt19937 _rnd;
57 std::random_device _randem_dev;
58 std::uniform_int_distribution<size_t> _chunk_distribution;
59 private:
60 char* next_chunk() {
61 return _mem + _chunk_distribution(_rnd);
62 }
63 public:
64 server()
65 : _rnd(std::random_device()()) {
66 }
67 future<> send(ipv4_addr dst, packet p) {
68 return _chan.send(dst, std::move(p)).then([this] {
69 _n_sent++;
70 });
71 }
72 void start(int chunk_size, bool copy, size_t mem_size) {
73 ipv4_addr listen_addr{10000};
74 _chan = engine().net().make_udp_channel(listen_addr);
75
76 std::cout << "Listening on " << listen_addr << std::endl;
77
78 _last = steady_clock_type::now();
79 _stats_timer.set_callback([this] {
80 auto now = steady_clock_type::now();
81 std::cout << "Out: "
82 << std::setprecision(2) << std::fixed
83 << (double)_n_sent / to_seconds(now - _last)
84 << " pps" << std::endl;
85 _last = now;
86 _n_sent = 0;
87 });
88 _stats_timer.arm_periodic(1s);
89
90 _chunk_size = chunk_size;
91 _copy = copy;
92 _key = sstring(new char[64], 64);
93
94 _out = std::make_unique<output_stream<char>>(
95 data_sink(std::make_unique<vector_data_sink>(_packets)), _packet_size);
96
97 _mem = new char[mem_size];
98 _mem_size = mem_size;
99
100 _chunk_distribution = std::uniform_int_distribution<size_t>(0, _mem_size - _chunk_size * 3);
101
102 assert(3 * _chunk_size <= _packet_size);
103
104 // Run sender in background.
105 (void)keep_doing([this] {
106 return _chan.receive().then([this] (udp_datagram dgram) {
107 auto chunk = next_chunk();
108 lw_shared_ptr<sstring> item;
109 if (_copy) {
110 _packets.clear();
111 // FIXME: future is discarded
112 (void)_out->write(chunk, _chunk_size);
113 chunk += _chunk_size;
114 (void)_out->write(chunk, _chunk_size);
115 chunk += _chunk_size;
116 (void)_out->write(chunk, _chunk_size);
117 (void)_out->flush();
118 assert(_packets.size() == 1);
119 return send(dgram.get_src(), std::move(_packets[0]));
120 } else {
121 auto chunk = next_chunk();
122 scattered_message<char> msg;
123 msg.reserve(3);
124 msg.append_static(chunk, _chunk_size);
125 msg.append_static(chunk, _chunk_size);
126 msg.append_static(chunk, _chunk_size);
127 return send(dgram.get_src(), std::move(msg).release());
128 }
129 });
130 });
131 }
132 };
133
134 int main(int ac, char ** av) {
135 server s;
136 app_template app;
137 app.add_options()
138 ("chunk-size", bpo::value<int>()->default_value(1024),
139 "Chunk size")
140 ("mem-size", bpo::value<int>()->default_value(512),
141 "Memory pool size in MiB")
142 ("copy", "Copy data rather than send via zero-copy")
143 ;
144 return app.run_deprecated(ac, av, [&app, &s] {
145 auto&& config = app.configuration();
146 auto chunk_size = config["chunk-size"].as<int>();
147 auto mem_size = (size_t)config["mem-size"].as<int>() * MB;
148 auto copy = config.count("copy");
149 s.start(chunk_size, copy, mem_size);
150 });
151 }