#include <boost/intrusive/list.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/lexical_cast.hpp>
-#include <boost/optional.hpp>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <seastar/core/app-template.hh>
-#include <seastar/core/future-util.hh>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/seastar.hh>
+#include <seastar/core/loop.hh>
#include <seastar/core/timer-set.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/stream.hh>
#include <seastar/net/api.hh>
#include <seastar/net/packet-data-source.hh>
#include <seastar/util/std-compat.hh>
+#include <seastar/util/log.hh>
#include "ascii.hh"
#include "memcached.hh"
#include <unistd.h>
static thread_local std::unique_ptr<slab_allocator<item>> slab_holder;
template<typename T>
-using optional = boost::optional<T>;
+using optional = std::optional<T>;
using clock_type = lowres_clock;
return _version;
}
- const compat::string_view key() const {
- return compat::string_view(_data, _key_size);
+ const std::string_view key() const {
+ return std::string_view(_data, _key_size);
}
- const compat::string_view ascii_prefix() const {
+ const std::string_view ascii_prefix() const {
const char *p = _data + align_up(_key_size, field_alignment);
- return compat::string_view(p, _ascii_prefix_size);
+ return std::string_view(p, _ascii_prefix_size);
}
- const compat::string_view value() const {
+ const std::string_view value() const {
const char *p = _data + align_up(_key_size, field_alignment) +
align_up(_ascii_prefix_size, field_alignment);
- return compat::string_view(p, _value_size);
+ return std::string_view(p, _value_size);
}
size_t key_size() const {
}
ss << histo[i] << "\n";
}
- return {engine().cpu_id(), make_foreign(make_lw_shared<std::string>(ss.str()))};
+ return {this_shard_id(), make_foreign(make_lw_shared<std::string>(ss.str()))};
}
future<> stop() { return make_ready_future<>(); }
// The caller must keep @insertion live until the resulting future resolves.
future<bool> set(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
- if (engine().cpu_id() == cpu) {
+ if (this_shard_id() == cpu) {
return make_ready_future<bool>(_peers.local().set(insertion));
}
return _peers.invoke_on(cpu, &cache::set<remote_origin_tag>, std::ref(insertion));
// The caller must keep @insertion live until the resulting future resolves.
future<bool> add(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
- if (engine().cpu_id() == cpu) {
+ if (this_shard_id() == cpu) {
return make_ready_future<bool>(_peers.local().add(insertion));
}
return _peers.invoke_on(cpu, &cache::add<remote_origin_tag>, std::ref(insertion));
// The caller must keep @insertion live until the resulting future resolves.
future<bool> replace(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
- if (engine().cpu_id() == cpu) {
+ if (this_shard_id() == cpu) {
return make_ready_future<bool>(_peers.local().replace(insertion));
}
return _peers.invoke_on(cpu, &cache::replace<remote_origin_tag>, std::ref(insertion));
// The caller must keep @insertion live until the resulting future resolves.
future<cas_result> cas(item_insertion_data& insertion, item::version_type version) {
auto cpu = get_cpu(insertion.key);
- if (engine().cpu_id() == cpu) {
+ if (this_shard_id() == cpu) {
return make_ready_future<cas_result>(_peers.local().cas(insertion, version));
}
return _peers.invoke_on(cpu, &cache::cas<remote_origin_tag>, std::ref(insertion), std::move(version));
// The caller must keep @key live until the resulting future resolves.
future<std::pair<item_ptr, bool>> incr(item_key& key, uint64_t delta) {
auto cpu = get_cpu(key);
- if (engine().cpu_id() == cpu) {
+ if (this_shard_id() == cpu) {
return make_ready_future<std::pair<item_ptr, bool>>(
_peers.local().incr<local_origin_tag>(key, delta));
}
// The caller must keep @key live until the resulting future resolves.
future<std::pair<item_ptr, bool>> decr(item_key& key, uint64_t delta) {
auto cpu = get_cpu(key);
- if (engine().cpu_id() == cpu) {
+ if (this_shard_id() == cpu) {
return make_ready_future<std::pair<item_ptr, bool>>(
_peers.local().decr(key, delta));
}
public:
static const size_t default_max_datagram_size = 1400;
private:
- compat::optional<future<>> _task;
+ std::optional<future<>> _task;
sharded_cache& _cache;
distributed<system_stats>& _system_stats;
udp_channel _chan;
}
void start() {
- _chan = engine().net().make_udp_channel({_port});
+ _chan = make_udp_channel({_port});
// Run in the background.
_task = keep_doing([this] {
return _chan.receive().then([this](udp_datagram dgram) {
class tcp_server {
private:
- compat::optional<future<>> _task;
- lw_shared_ptr<seastar::api_v2::server_socket> _listener;
+ std::optional<future<>> _task;
+ lw_shared_ptr<seastar::server_socket> _listener;
sharded_cache& _cache;
distributed<system_stats>& _system_stats;
uint16_t _port;
void start() {
listen_options lo;
lo.reuse_address = true;
- _listener = seastar::api_v2::server_socket(engine().listen(make_ipv4_address({_port}), lo));
+ _listener = seastar::server_socket(seastar::listen(make_ipv4_address({_port}), lo));
// Run in the background until eof has reached on the input connection.
_task = keep_doing([this] {
return _listener->accept().then([this] (accept_result ar) mutable {