2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
22 #include <seastar/core/app-template.hh>
23 #include <seastar/core/future-util.hh>
24 #include <seastar/core/scattered_message.hh>
25 #include <seastar/core/vector-data-sink.hh>
26 #include <seastar/core/shared_ptr.hh>
27 #include <seastar/core/units.hh>
32 using namespace seastar
;
34 using namespace std::chrono_literals
;
35 namespace bpo
= boost::program_options
;
/// Converts a std::chrono duration into a number of seconds expressed in the
/// duration's own representation type.
///
/// The cast targets `std::chrono::duration<Duration::rep>` (period of one
/// second, caller's `rep`) instead of `std::chrono::seconds`. Going through
/// the integral `std::chrono::seconds` would discard the fractional part of
/// floating-point durations even though the result is returned as that
/// floating-point type. Integral representations truncate toward zero.
///
/// @tparam Duration a `std::chrono::duration` specialization.
/// @param d the duration to convert.
/// @return the length of `d` in seconds, as `Duration::rep`.
template <typename Duration>
typename Duration::rep to_seconds(Duration d) {
    using rep = typename Duration::rep;
    using seconds_in_rep = std::chrono::duration<rep>;
    return std::chrono::duration_cast<seconds_in_rep>(d).count();
}
// Packet storage: start() constructs a vector_data_sink over this vector, and
// the write() path in start() moves _packets[0] out to send().
49 std::vector
<packet
> _packets
;
// Output stream created in start() on top of the vector_data_sink; start()
// writes chunks into it.
50 std::unique_ptr
<output_stream
<char>> _out
;
// Reference time point for the statistics timer: set to now() in start() and
// subtracted from the current time inside the timer callback.
51 steady_clock_type::time_point _last
;
// Buffer size passed to the output_stream constructor; start() asserts that
// three chunks fit within it.
53 size_t _packet_size
= 8*KB
;
// NOTE(review): not referenced anywhere in the visible code; "randem" looks
// like a typo for "random" — confirm before renaming or removing.
57 std::random_device _randem_dev
;
// Distribution used to draw an offset into _mem; reassigned in start() once
// the chunk and pool sizes are known.
58 std::uniform_int_distribution
<size_t> _chunk_distribution
;
// Body of a helper whose signature is outside this excerpt (start() calls
// next_chunk(), presumably this one): returns _mem advanced by an offset
// drawn from _chunk_distribution using _rnd.
61 return _mem
+ _chunk_distribution(_rnd
);
// Constructor initializer list (declarator not visible in this excerpt):
// seeds _rnd with one value obtained from a temporary std::random_device.
65 : _rnd(std::random_device()()) {
// Sends packet `p` to `dst` over _chan, taking `p` by value and moving it into
// the channel's send(). A continuation capturing `this` is chained onto the
// resulting future; the continuation's body is not visible in this excerpt.
67 future
<> send(ipv4_addr dst
, packet p
) {
68 return _chan
.send(dst
, std::move(p
)).then([this] {
// Sets up the server: creates the UDP channel, arms the statistics timer,
// allocates the output stream and the memory pool, and launches the
// background receive/reply loop.
//
// chunk_size - stored in _chunk_size; each reply carries three chunks.
// copy       - not referenced in the lines visible in this excerpt.
// mem_size   - number of bytes allocated for the _mem pool.
72 void start(int chunk_size
, bool copy
, size_t mem_size
) {
// presumably port 10000 on any local address — confirm against ipv4_addr.
73 ipv4_addr listen_addr
{10000};
74 _chan
= engine().net().make_udp_channel(listen_addr
);
76 std::cout
<< "Listening on " << listen_addr
<< std::endl
;
78 _last
= steady_clock_type::now();
// Statistics callback: prints _n_sent divided by to_seconds(now - _last),
// fixed notation with two decimals, followed by " pps". The stream it prints
// to and any reset of the counters are not visible in this excerpt.
79 _stats_timer
.set_callback([this] {
80 auto now
= steady_clock_type::now();
82 << std::setprecision(2) << std::fixed
83 << (double)_n_sent
/ to_seconds(now
- _last
)
84 << " pps" << std::endl
;
// Fire the callback once per second.
88 _stats_timer
.arm_periodic(1s
);
90 _chunk_size
= chunk_size
;
// NOTE(review): the array returned by new char[64] is not stored anywhere
// that could free it and is read without being initialized — looks like a
// leak of indeterminate bytes; confirm sstring's (pointer, size) semantics.
92 _key
= sstring(new char[64], 64);
// Output stream with a _packet_size buffer whose sink is built over _packets.
94 _out
= std::make_unique
<output_stream
<char>>(
95 data_sink(std::make_unique
<vector_data_sink
>(_packets
)), _packet_size
);
// Raw owning allocation; no matching delete[] appears in this excerpt.
97 _mem
= new char[mem_size
];
// Offsets are drawn from [0, _mem_size - 3 * _chunk_size], presumably so that
// three consecutive chunks starting at the offset stay inside the pool.
// NOTE(review): the assignment of _mem_size is not visible here; if the pool
// is smaller than three chunks and these are unsigned, the subtraction would
// wrap — confirm.
100 _chunk_distribution
= std::uniform_int_distribution
<size_t>(0, _mem_size
- _chunk_size
* 3);
// Three chunks must fit within _packet_size; the write() path below asserts
// that exactly one packet was produced.
102 assert(3 * _chunk_size
<= _packet_size
);
104 // Run sender in background.
// The future returned by keep_doing is deliberately discarded via (void).
105 (void)keep_doing([this] {
106 return _chan
.receive().then([this] (udp_datagram dgram
) {
107 auto chunk
= next_chunk();
// NOTE(review): `item` is not used in the visible code.
108 lw_shared_ptr
<sstring
> item
;
// First path (presumably the copy branch; its condition is not visible in
// this excerpt): writes three consecutive chunks through _out, advancing
// `chunk` by _chunk_size between writes, then replies with _packets[0].
111 // FIXME: future is discarded
112 (void)_out
->write(chunk
, _chunk_size
);
113 chunk
+= _chunk_size
;
114 (void)_out
->write(chunk
, _chunk_size
);
115 chunk
+= _chunk_size
;
116 (void)_out
->write(chunk
, _chunk_size
);
118 assert(_packets
.size() == 1);
// Reply to the datagram's source address.
119 return send(dgram
.get_src(), std::move(_packets
[0]));
// Second path (presumably the zero-copy branch): draws a fresh chunk and
// builds a scattered_message from three append_static calls.
// NOTE(review): unlike the write() path above, `chunk` is not advanced
// between the appends, so the same _chunk_size bytes are appended three
// times — confirm this is intended.
121 auto chunk
= next_chunk();
122 scattered_message
<char> msg
;
124 msg
.append_static(chunk
, _chunk_size
);
125 msg
.append_static(chunk
, _chunk_size
);
126 msg
.append_static(chunk
, _chunk_size
);
127 return send(dgram
.get_src(), std::move(msg
).release());
// Program entry point. Declares the "chunk-size", "mem-size" and "copy"
// command-line options (the call these option tuples are chained onto is not
// visible in this excerpt), then runs the application with a callback that
// reads the parsed configuration and starts the server `s`.
134 int main(int ac
, char ** av
) {
// Chunk size option, default 1024 (its help string is not visible here).
138 ("chunk-size", bpo::value
<int>()->default_value(1024),
// Pool size option, default 512; the help text states the unit is MiB.
140 ("mem-size", bpo::value
<int>()->default_value(512),
141 "Memory pool size in MiB")
142 ("copy", "Copy data rather than send via zero-copy")
// The callback captures `app` and `s` by reference.
144 return app
.run_deprecated(ac
, av
, [&app
, &s
] {
145 auto&& config
= app
.configuration();
146 auto chunk_size
= config
["chunk-size"].as
<int>();
// Scales the option value by MB — presumably converting MiB to bytes; MB is
// defined outside this file (confirm against units.hh).
147 auto mem_size
= (size_t)config
["mem-size"].as
<int>() * MB
;
// count("copy") yields an integer occurrence count; it is converted to the
// `bool copy` parameter when passed to start() below.
148 auto copy
= config
.count("copy");
149 s
.start(chunk_size
, copy
, mem_size
);