]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/apps/memcached/memcache.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / apps / memcached / memcache.cc
index 4cce0e9272097d7bdd230ed4443c846952df4264..cf0f3eaaa6edd8a6c704c99893ce3d6d2d6267ce 100644 (file)
 #include <seastar/core/bitops.hh>
 #include <seastar/core/slab.hh>
 #include <seastar/core/align.hh>
+#include <seastar/core/print.hh>
 #include <seastar/net/api.hh>
 #include <seastar/net/packet-data-source.hh>
+#include <seastar/util/std-compat.hh>
 #include "ascii.hh"
 #include "memcached.hh"
 #include <unistd.h>
@@ -268,7 +270,7 @@ public:
         assert(it->_ref_count >= 0);
     }
 
-    friend class item_key_cmp;
+    friend struct item_key_cmp;
 };
 
 struct item_key_cmp
@@ -1219,6 +1221,7 @@ class udp_server {
 public:
     static const size_t default_max_datagram_size = 1400;
 private:
+    compat::optional<future<>> _task;
     sharded_cache& _cache;
     distributed<system_stats>& _system_stats;
     udp_channel _chan;
@@ -1280,7 +1283,8 @@ public:
 
     void start() {
         _chan = engine().net().make_udp_channel({_port});
-        keep_doing([this] {
+        // Run in the background.
+        _task = keep_doing([this] {
             return _chan.receive().then([this](udp_datagram dgram) {
                 packet& p = dgram.get_data();
                 if (p.len() < sizeof(header)) {
@@ -1310,15 +1314,22 @@ public:
                     });
                 });
             });
-        }).or_terminate();
+        });
     };
 
-    future<> stop() { return make_ready_future<>(); }
+    future<> stop() {
+        _chan.shutdown_input();
+        _chan.shutdown_output();
+        return _task->handle_exception([](std::exception_ptr e) {
+            std::cerr << "exception in udp_server " << e << '\n';
+        });
+    }
 };
 
 class tcp_server {
 private:
-    lw_shared_ptr<server_socket> _listener;
+    compat::optional<future<>> _task;
+    lw_shared_ptr<seastar::api_v2::server_socket> _listener;
     sharded_cache& _cache;
     distributed<system_stats>& _system_stats;
     uint16_t _port;
@@ -1354,11 +1365,14 @@ public:
     void start() {
         listen_options lo;
         lo.reuse_address = true;
-        _listener = engine().listen(make_ipv4_address({_port}), lo);
-        keep_doing([this] {
-            return _listener->accept().then([this] (connected_socket fd, socket_address addr) mutable {
+        _listener = seastar::api_v2::server_socket(engine().listen(make_ipv4_address({_port}), lo));
+        // Run in the background until eof has reached on the input connection.
+        _task = keep_doing([this] {
+            return _listener->accept().then([this] (accept_result ar) mutable {
+                connected_socket fd = std::move(ar.connection);
+                socket_address addr = std::move(ar.remote_address);
                 auto conn = make_lw_shared<connection>(std::move(fd), addr, _cache, _system_stats);
-                do_until([conn] { return conn->_in.eof(); }, [conn] {
+                (void)do_until([conn] { return conn->_in.eof(); }, [conn] {
                     return conn->_proto.handle(conn->_in, conn->_out).then([conn] {
                         return conn->_out.flush();
                     });
@@ -1366,10 +1380,15 @@ public:
                     return conn->_out.close().finally([conn]{});
                 });
             });
-        }).or_terminate();
+        });
     }
 
-    future<> stop() { return make_ready_future<>(); }
+    future<> stop() {
+        _listener->abort_accept();
+        return _task->handle_exception([](std::exception_ptr e) {
+            std::cerr << "exception in tcp_server " << e << '\n';
+        });
+    }
 };
 
 class stats_printer {
@@ -1382,7 +1401,7 @@ public:
 
     void start() {
         _timer.set_callback([this] {
-            _cache.stats().then([] (auto stats) {
+            (void)_cache.stats().then([] (auto stats) {
                 auto gets_total = stats._get_hits + stats._get_misses;
                 auto get_hit_rate = gets_total ? ((double)stats._get_hits * 100 / gets_total) : 0;
                 auto sets_total = stats._set_adds + stats._set_replaces;