#include "Socket.h"
+#include <seastar/core/when_all.hh>
+
#include "crimson/common/log.h"
#include "Errors.h"
return seastar::when_all_succeed(
in.close(),
close_and_handle_errors(out)
- ).handle_exception([] (auto eptr) {
+ ).then_unpack([] {
+ return seastar::make_ready_future<>();
+ }).handle_exception([] (auto eptr) {
logger().error("Socket::close(): unexpected exception {}", eptr);
ceph_abort();
});
default:
ceph_abort("unexpected action from trap");
}
- return seastar::now();
+ return seastar::make_ready_future<>();
}
seastar::future<> Socket::try_trap_post(bp_action_t& trap) {
default:
ceph_abort("unexpected action from trap");
}
- return seastar::now();
+ return seastar::make_ready_future<>();
}
void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
}
#endif
+FixedCPUServerSocket::listen_ertr::future<>
+FixedCPUServerSocket::listen(entity_addr_t addr)
+{
+ assert(seastar::this_shard_id() == cpu);
+ logger().trace("FixedCPUServerSocket::listen({})...", addr);
+ return container().invoke_on_all([addr] (auto& ss) {
+ ss.addr = addr;
+ seastar::socket_address s_addr(addr.in4_addr());
+ seastar::listen_options lo;
+ lo.reuse_address = true;
+ lo.set_fixed_cpu(ss.cpu);
+ ss.listener = seastar::listen(s_addr, lo);
+ }).then([] {
+ return true;
+ }).handle_exception_type([addr] (const std::system_error& e) {
+ if (e.code() == std::errc::address_in_use) {
+ logger().trace("FixedCPUServerSocket::listen({}): address in use", addr);
+ } else {
+ logger().error("FixedCPUServerSocket::listen({}): "
+ "got unexpeted error {}", addr, e);
+ ceph_abort();
+ }
+ return false;
+ }).then([] (bool success) -> listen_ertr::future<> {
+ if (success) {
+ return listen_ertr::now();
+ } else {
+ return crimson::ct_error::address_in_use::make();
+ }
+ });
+}
+
+seastar::future<> FixedCPUServerSocket::shutdown()
+{
+ assert(seastar::this_shard_id() == cpu);
+ logger().trace("FixedCPUServerSocket({})::shutdown()...", addr);
+ return container().invoke_on_all([] (auto& ss) {
+ if (ss.listener) {
+ ss.listener->abort_accept();
+ }
+ return ss.shutdown_gate.close();
+ }).then([this] {
+ return reset();
+ });
+}
+
+seastar::future<> FixedCPUServerSocket::destroy()
+{
+ assert(seastar::this_shard_id() == cpu);
+ return shutdown().then([this] {
+ // we should only construct/stop shards on #0
+ return container().invoke_on(0, [] (auto& ss) {
+ assert(ss.service);
+ return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
+ });
+ });
+}
+
+seastar::future<FixedCPUServerSocket*> FixedCPUServerSocket::create()
+{
+ auto cpu = seastar::this_shard_id();
+ // we should only construct/stop shards on #0
+ return seastar::smp::submit_to(0, [cpu] {
+ auto service = std::make_unique<sharded_service_t>();
+ return service->start(cpu, construct_tag{}
+ ).then([service = std::move(service)] () mutable {
+ auto p_shard = service.get();
+ p_shard->local().service = std::move(service);
+ return p_shard;
+ });
+ }).then([] (auto p_shard) {
+ return &p_shard->local();
+ });
+}
+
} // namespace crimson::net