#include <seastar/core/bitops.hh>
#include <seastar/core/slab.hh>
#include <seastar/core/align.hh>
+#include <seastar/core/print.hh>
#include <seastar/net/api.hh>
#include <seastar/net/packet-data-source.hh>
+#include <seastar/util/std-compat.hh>
#include "ascii.hh"
#include "memcached.hh"
#include <unistd.h>
assert(it->_ref_count >= 0);
}
- friend class item_key_cmp;
+ friend struct item_key_cmp;
};
struct item_key_cmp
public:
static const size_t default_max_datagram_size = 1400;
private:
+ compat::optional<future<>> _task;
sharded_cache& _cache;
distributed<system_stats>& _system_stats;
udp_channel _chan;
void start() {
_chan = engine().net().make_udp_channel({_port});
- keep_doing([this] {
+ // Run in the background.
+ _task = keep_doing([this] {
return _chan.receive().then([this](udp_datagram dgram) {
packet& p = dgram.get_data();
if (p.len() < sizeof(header)) {
});
});
});
- }).or_terminate();
+ });
};
- future<> stop() { return make_ready_future<>(); }
+ future<> stop() {
+ _chan.shutdown_input();
+ _chan.shutdown_output();
+ return _task->handle_exception([](std::exception_ptr e) {
+ std::cerr << "exception in udp_server " << e << '\n';
+ });
+ }
};
class tcp_server {
private:
- lw_shared_ptr<server_socket> _listener;
+ compat::optional<future<>> _task;
+ lw_shared_ptr<seastar::api_v2::server_socket> _listener;
sharded_cache& _cache;
distributed<system_stats>& _system_stats;
uint16_t _port;
void start() {
listen_options lo;
lo.reuse_address = true;
- _listener = engine().listen(make_ipv4_address({_port}), lo);
- keep_doing([this] {
- return _listener->accept().then([this] (connected_socket fd, socket_address addr) mutable {
+ _listener = seastar::api_v2::server_socket(engine().listen(make_ipv4_address({_port}), lo));
+ // Run in the background until eof has reached on the input connection.
+ _task = keep_doing([this] {
+ return _listener->accept().then([this] (accept_result ar) mutable {
+ connected_socket fd = std::move(ar.connection);
+ socket_address addr = std::move(ar.remote_address);
auto conn = make_lw_shared<connection>(std::move(fd), addr, _cache, _system_stats);
- do_until([conn] { return conn->_in.eof(); }, [conn] {
+ (void)do_until([conn] { return conn->_in.eof(); }, [conn] {
return conn->_proto.handle(conn->_in, conn->_out).then([conn] {
return conn->_out.flush();
});
return conn->_out.close().finally([conn]{});
});
});
- }).or_terminate();
+ });
}
- future<> stop() { return make_ready_future<>(); }
+ future<> stop() {
+ _listener->abort_accept();
+ return _task->handle_exception([](std::exception_ptr e) {
+ std::cerr << "exception in tcp_server " << e << '\n';
+ });
+ }
};
class stats_printer {
void start() {
_timer.set_callback([this] {
- _cache.stats().then([] (auto stats) {
+ (void)_cache.stats().then([] (auto stats) {
auto gets_total = stats._get_hits + stats._get_misses;
auto get_hit_rate = gets_total ? ((double)stats._get_hits * 100 / gets_total) : 0;
auto sets_total = stats._set_adds + stats._set_replaces;