2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright 2014-2015 Cloudius Systems
22 #include <boost/intrusive/unordered_set.hpp>
23 #include <boost/intrusive/list.hpp>
24 #include <boost/intrusive_ptr.hpp>
25 #include <boost/lexical_cast.hpp>
29 #include <seastar/core/app-template.hh>
30 #include <seastar/core/reactor.hh>
31 #include <seastar/core/seastar.hh>
32 #include <seastar/core/loop.hh>
33 #include <seastar/core/timer-set.hh>
34 #include <seastar/core/shared_ptr.hh>
35 #include <seastar/core/stream.hh>
36 #include <seastar/core/memory.hh>
37 #include <seastar/core/units.hh>
38 #include <seastar/core/distributed.hh>
39 #include <seastar/core/vector-data-sink.hh>
40 #include <seastar/core/bitops.hh>
41 #include <seastar/core/slab.hh>
42 #include <seastar/core/align.hh>
43 #include <seastar/core/print.hh>
44 #include <seastar/net/api.hh>
45 #include <seastar/net/packet-data-source.hh>
46 #include <seastar/util/std-compat.hh>
47 #include <seastar/util/log.hh>
49 #include "memcached.hh"
52 #define PLATFORM "seastar"
53 #define VERSION "v1.0"
54 #define VERSION_STRING PLATFORM " " VERSION
56 using namespace seastar
;
61 namespace bi
= boost::intrusive
;
63 static constexpr double default_slab_growth_factor
= 1.25;
64 static constexpr uint64_t default_slab_page_size
= 1UL*MB
;
65 static constexpr uint64_t default_per_cpu_slab_size
= 0UL; // zero means reclaimer is enabled.
66 static __thread slab_allocator
<item
>* slab
;
67 static thread_local
std::unique_ptr
<slab_allocator
<item
>> slab_holder
;
70 using optional
= std::optional
<T
>;
72 using clock_type
= lowres_clock
;
//
// "Expiration" is a uint32_t value.
// The minimal value of _time occurs when "expiration" is set to
// (seconds_in_a_month + 1).
// In that case _time will have a value of
//
// (seconds_in_a_month + 1 - Wall_Clock_Time_Since_Epoch)
//
// because lowres_clock::now() is initialized to zero when the application starts.
//
// We use a time point at LLONG_MIN to represent a "never expire" value,
// since it will not collide with the minimum _time value mentioned above for
// about 290 thousand years to come.
//
88 static constexpr clock_type::time_point never_expire_timepoint
= clock_type::time_point(clock_type::duration::min());
91 using time_point
= clock_type::time_point
;
92 using duration
= time_point::duration
;
94 static constexpr uint32_t seconds_in_a_month
= 60U * 60 * 24 * 30;
95 time_point _time
= never_expire_timepoint
;
99 expiration(clock_type::duration wc_to_clock_type_delta
, uint32_t s
) {
100 using namespace std::chrono
;
102 static_assert(sizeof(clock_type::duration::rep
) >= 8, "clock_type::duration::rep must be at least 8 bytes wide");
105 return; // means never expire.
106 } else if (s
<= seconds_in_a_month
) {
107 _time
= clock_type::now() + seconds(s
); // from delta
110 // seastar::reactor supports only a monotonic clock at the moment
111 // therefore this may make the elements with the absolute expiration
112 // time expire at the wrong time if the wall clock has been updated
113 // during the expiration period. However the original memcached has
114 // the same weakness.
116 // TODO: Fix this when a support for system_clock-based timers is
117 // added to the seastar::reactor.
119 _time
= time_point(seconds(s
) + wc_to_clock_type_delta
); // from real time
123 bool ever_expires() {
124 return _time
!= never_expire_timepoint
;
127 time_point
to_time_point() {
132 class item
: public slab_item_base
{
134 using version_type
= uint64_t;
135 using time_point
= expiration::time_point
;
136 using duration
= expiration::duration
;
137 static constexpr uint8_t field_alignment
= alignof(void*);
139 using hook_type
= bi::unordered_set_member_hook
<>;
140 // TODO: align shared data to cache line boundary
141 version_type _version
;
142 hook_type _cache_link
;
143 bi::list_member_hook
<> _timer_link
;
146 uint32_t _value_size
;
147 uint32_t _slab_page_index
;
150 uint8_t _ascii_prefix_size
;
151 char _data
[]; // layout: data=key, (data+key_size)=ascii_prefix, (data+key_size+ascii_prefix_size)=value.
154 item(uint32_t slab_page_index
, item_key
&& key
, sstring
&& ascii_prefix
,
155 sstring
&& value
, expiration expiry
, version_type version
= 1)
157 , _key_hash(key
.hash())
159 , _value_size(value
.size())
160 , _slab_page_index(slab_page_index
)
162 , _key_size(key
.key().size())
163 , _ascii_prefix_size(ascii_prefix
.size())
165 assert(_key_size
<= std::numeric_limits
<uint8_t>::max());
166 assert(_ascii_prefix_size
<= std::numeric_limits
<uint8_t>::max());
168 memcpy(_data
, key
.key().c_str(), _key_size
);
169 // storing ascii_prefix
170 memcpy(_data
+ align_up(_key_size
, field_alignment
), ascii_prefix
.c_str(), _ascii_prefix_size
);
172 memcpy(_data
+ align_up(_key_size
, field_alignment
) + align_up(_ascii_prefix_size
, field_alignment
),
173 value
.c_str(), _value_size
);
176 item(const item
&) = delete;
177 item(item
&&) = delete;
179 clock_type::time_point
get_timeout() {
180 return _expiry
.to_time_point();
183 version_type
version() {
187 const std::string_view
key() const {
188 return std::string_view(_data
, _key_size
);
191 const std::string_view
ascii_prefix() const {
192 const char *p
= _data
+ align_up(_key_size
, field_alignment
);
193 return std::string_view(p
, _ascii_prefix_size
);
196 const std::string_view
value() const {
197 const char *p
= _data
+ align_up(_key_size
, field_alignment
) +
198 align_up(_ascii_prefix_size
, field_alignment
);
199 return std::string_view(p
, _value_size
);
202 size_t key_size() const {
206 size_t ascii_prefix_size() const {
207 return _ascii_prefix_size
;
210 size_t value_size() const {
214 optional
<uint64_t> data_as_integral() {
215 auto str
= value().data();
220 auto len
= _value_size
;
222 // Strip trailing space
223 while (len
&& str
[len
- 1] == ' ') {
228 return {boost::lexical_cast
<uint64_t>(str
, len
)};
229 } catch (const boost::bad_lexical_cast
& e
) {
234 // needed by timer_set
239 // Methods required by slab allocator.
240 uint32_t get_slab_page_index() const {
241 return _slab_page_index
;
243 bool is_unlocked() const {
244 return _ref_count
== 1;
247 friend bool operator==(const item
&a
, const item
&b
) {
248 return (a
._key_hash
== b
._key_hash
) &&
249 (a
._key_size
== b
._key_size
) &&
250 (memcmp(a
._data
, b
._data
, a
._key_size
) == 0);
253 friend std::size_t hash_value(const item
&i
) {
257 friend inline void intrusive_ptr_add_ref(item
* it
) {
258 assert(it
->_ref_count
>= 0);
260 if (it
->_ref_count
== 2) {
265 friend inline void intrusive_ptr_release(item
* it
) {
267 if (it
->_ref_count
== 1) {
268 slab
->unlock_item(it
);
269 } else if (it
->_ref_count
== 0) {
272 assert(it
->_ref_count
>= 0);
275 friend struct item_key_cmp
;
281 bool compare(const item_key
& key
, const item
& it
) const {
282 return (it
._key_hash
== key
.hash()) &&
283 (it
._key_size
== key
.key().size()) &&
284 (memcmp(it
._data
, key
.key().c_str(), it
._key_size
) == 0);
287 bool operator()(const item_key
& key
, const item
& it
) const {
288 return compare(key
, it
);
291 bool operator()(const item
& it
, const item_key
& key
) const {
292 return compare(key
, it
);
296 using item_ptr
= foreign_ptr
<boost::intrusive_ptr
<item
>>;
300 size_t _get_misses
{};
302 size_t _set_replaces
{};
304 size_t _cas_misses
{};
305 size_t _cas_badval
{};
306 size_t _delete_misses
{};
307 size_t _delete_hits
{};
308 size_t _incr_misses
{};
309 size_t _incr_hits
{};
310 size_t _decr_misses
{};
311 size_t _decr_hits
{};
315 size_t _resize_failure
{};
319 void operator+=(const cache_stats
& o
) {
320 _get_hits
+= o
._get_hits
;
321 _get_misses
+= o
._get_misses
;
322 _set_adds
+= o
._set_adds
;
323 _set_replaces
+= o
._set_replaces
;
324 _cas_hits
+= o
._cas_hits
;
325 _cas_misses
+= o
._cas_misses
;
326 _cas_badval
+= o
._cas_badval
;
327 _delete_misses
+= o
._delete_misses
;
328 _delete_hits
+= o
._delete_hits
;
329 _incr_misses
+= o
._incr_misses
;
330 _incr_hits
+= o
._incr_hits
;
331 _decr_misses
+= o
._decr_misses
;
332 _decr_hits
+= o
._decr_hits
;
333 _expired
+= o
._expired
;
334 _evicted
+= o
._evicted
;
336 _resize_failure
+= o
._resize_failure
;
338 _reclaims
+= o
._reclaims
;
// Outcome of a compare-and-swap request.
enum class cas_result {
    not_found,   // no item exists for the key
    stored,      // version matched and the item was replaced
    bad_version  // an item exists but its version differs from the requested one
};
346 struct remote_origin_tag
{
347 template <typename T
>
349 T
move_if_local(T
& ref
) {
354 struct local_origin_tag
{
355 template <typename T
>
357 T
move_if_local(T
& ref
) {
358 return std::move(ref
);
362 struct item_insertion_data
{
364 sstring ascii_prefix
;
371 using cache_type
= bi::unordered_set
<item
,
372 bi::member_hook
<item
, item::hook_type
, &item::_cache_link
>,
373 bi::power_2_buckets
<true>,
374 bi::constant_time_size
<true>>;
375 using cache_iterator
= typename
cache_type::iterator
;
376 static constexpr size_t initial_bucket_count
= 1 << 10;
377 static constexpr float load_factor
= 0.75f
;
378 size_t _resize_up_threshold
= load_factor
* initial_bucket_count
;
379 std::vector
<cache_type::bucket_type
> _buckets
;
381 seastar::timer_set
<item
, &item::_timer_link
> _alive
;
382 timer
<clock_type
> _timer
;
383 // delta in seconds between the current values of a wall clock and a clock_type clock
384 clock_type::duration _wc_to_clock_type_delta
;
386 timer
<clock_type
> _flush_timer
;
388 size_t item_size(item
& item_ref
) {
389 constexpr size_t field_alignment
= alignof(void*);
390 return sizeof(item
) +
391 align_up(item_ref
.key_size(), field_alignment
) +
392 align_up(item_ref
.ascii_prefix_size(), field_alignment
) +
393 item_ref
.value_size();
396 size_t item_size(item_insertion_data
& insertion
) {
397 constexpr size_t field_alignment
= alignof(void*);
398 auto size
= sizeof(item
) +
399 align_up(insertion
.key
.key().size(), field_alignment
) +
400 align_up(insertion
.ascii_prefix
.size(), field_alignment
) +
401 insertion
.data
.size();
403 static bool print_item_footprint
= true;
404 if (print_item_footprint
) {
405 print_item_footprint
= false;
406 std::cout
<< __FUNCTION__
<< ": " << size
<< "\n";
407 std::cout
<< "sizeof(item) " << sizeof(item
) << "\n";
408 std::cout
<< "key.size " << insertion
.key
.key().size() << "\n";
409 std::cout
<< "value.size " << insertion
.data
.size() << "\n";
410 std::cout
<< "ascii_prefix.size " << insertion
.ascii_prefix
.size() << "\n";
416 template <bool IsInCache
= true, bool IsInTimerList
= true, bool Release
= true>
417 void erase(item
& item_ref
) {
419 _cache
.erase(_cache
.iterator_to(item_ref
));
422 if (item_ref
._expiry
.ever_expires()) {
423 _alive
.remove(item_ref
);
426 _stats
._bytes
-= item_size(item_ref
);
428 // memory used by item shouldn't be freed when slab is replacing it with another item.
429 intrusive_ptr_release(&item_ref
);
434 using namespace std::chrono
;
437 // Adjust the delta on every timer event to minimize an error caused
438 // by a wall clock adjustment.
440 _wc_to_clock_type_delta
=
441 duration_cast
<clock_type::duration
>(clock_type::now().time_since_epoch() - system_clock::now().time_since_epoch());
443 auto exp
= _alive
.expire(clock_type::now());
444 while (!exp
.empty()) {
445 auto item
= &*exp
.begin();
447 erase
<true, false>(*item
);
450 _timer
.arm(_alive
.get_next_timeout());
454 cache_iterator
find(const item_key
& key
) {
455 return _cache
.find(key
, std::hash
<item_key
>(), item_key_cmp());
458 template <typename Origin
>
460 cache_iterator
add_overriding(cache_iterator i
, item_insertion_data
& insertion
) {
462 uint64_t old_item_version
= old_item
._version
;
466 size_t size
= item_size(insertion
);
467 auto new_item
= slab
->create(size
, Origin::move_if_local(insertion
.key
), Origin::move_if_local(insertion
.ascii_prefix
),
468 Origin::move_if_local(insertion
.data
), insertion
.expiry
, old_item_version
+ 1);
469 intrusive_ptr_add_ref(new_item
);
471 auto insert_result
= _cache
.insert(*new_item
);
472 assert(insert_result
.second
);
473 if (insertion
.expiry
.ever_expires() && _alive
.insert(*new_item
)) {
474 _timer
.rearm(new_item
->get_timeout());
476 _stats
._bytes
+= size
;
477 return insert_result
.first
;
480 template <typename Origin
>
482 void add_new(item_insertion_data
& insertion
) {
483 size_t size
= item_size(insertion
);
484 auto new_item
= slab
->create(size
, Origin::move_if_local(insertion
.key
), Origin::move_if_local(insertion
.ascii_prefix
),
485 Origin::move_if_local(insertion
.data
), insertion
.expiry
);
486 intrusive_ptr_add_ref(new_item
);
487 auto& item_ref
= *new_item
;
488 _cache
.insert(item_ref
);
489 if (insertion
.expiry
.ever_expires() && _alive
.insert(item_ref
)) {
490 _timer
.rearm(item_ref
.get_timeout());
492 _stats
._bytes
+= size
;
496 void maybe_rehash() {
497 if (_cache
.size() >= _resize_up_threshold
) {
498 auto new_size
= _cache
.bucket_count() * 2;
499 std::vector
<cache_type::bucket_type
> old_buckets
;
501 old_buckets
= std::exchange(_buckets
, std::vector
<cache_type::bucket_type
>(new_size
));
502 } catch (const std::bad_alloc
& e
) {
503 _stats
._resize_failure
++;
506 _cache
.rehash(typename
cache_type::bucket_traits(_buckets
.data(), new_size
));
507 _resize_up_threshold
= _cache
.bucket_count() * load_factor
;
511 cache(uint64_t per_cpu_slab_size
, uint64_t slab_page_size
)
512 : _buckets(initial_bucket_count
)
513 , _cache(cache_type::bucket_traits(_buckets
.data(), initial_bucket_count
))
515 using namespace std::chrono
;
517 _wc_to_clock_type_delta
=
518 duration_cast
<clock_type::duration
>(clock_type::now().time_since_epoch() - system_clock::now().time_since_epoch());
520 _timer
.set_callback([this] { expire(); });
521 _flush_timer
.set_callback([this] { flush_all(); });
523 // initialize per-thread slab allocator.
524 slab_holder
= std::make_unique
<slab_allocator
<item
>>(default_slab_growth_factor
, per_cpu_slab_size
, slab_page_size
,
525 [this](item
& item_ref
) { erase
<true, true, false>(item_ref
); _stats
._evicted
++; });
526 slab
= slab_holder
.get();
528 static bool print_slab_classes
= true;
529 if (print_slab_classes
) {
530 print_slab_classes
= false;
531 slab
->print_slab_classes();
541 _flush_timer
.cancel();
542 _cache
.erase_and_dispose(_cache
.begin(), _cache
.end(), [this] (item
* it
) {
543 erase
<false, true>(*it
);
547 void flush_at(uint32_t time
) {
548 auto expiry
= expiration(get_wc_to_clock_type_delta(), time
);
549 _flush_timer
.rearm(expiry
.to_time_point());
552 template <typename Origin
= local_origin_tag
>
553 bool set(item_insertion_data
& insertion
) {
554 auto i
= find(insertion
.key
);
555 if (i
!= _cache
.end()) {
556 add_overriding
<Origin
>(i
, insertion
);
557 _stats
._set_replaces
++;
560 add_new
<Origin
>(insertion
);
566 template <typename Origin
= local_origin_tag
>
567 bool add(item_insertion_data
& insertion
) {
568 auto i
= find(insertion
.key
);
569 if (i
!= _cache
.end()) {
574 add_new
<Origin
>(insertion
);
578 template <typename Origin
= local_origin_tag
>
579 bool replace(item_insertion_data
& insertion
) {
580 auto i
= find(insertion
.key
);
581 if (i
== _cache
.end()) {
585 _stats
._set_replaces
++;
586 add_overriding
<Origin
>(i
, insertion
);
590 bool remove(const item_key
& key
) {
592 if (i
== _cache
.end()) {
593 _stats
._delete_misses
++;
596 _stats
._delete_hits
++;
602 item_ptr
get(const item_key
& key
) {
604 if (i
== _cache
.end()) {
605 _stats
._get_misses
++;
610 return item_ptr(&item_ref
);
613 template <typename Origin
= local_origin_tag
>
614 cas_result
cas(item_insertion_data
& insertion
, item::version_type version
) {
615 auto i
= find(insertion
.key
);
616 if (i
== _cache
.end()) {
617 _stats
._cas_misses
++;
618 return cas_result::not_found
;
621 if (item_ref
._version
!= version
) {
622 _stats
._cas_badval
++;
623 return cas_result::bad_version
;
626 add_overriding
<Origin
>(i
, insertion
);
627 return cas_result::stored
;
631 return _cache
.size();
634 size_t bucket_count() {
635 return _cache
.bucket_count();
638 cache_stats
stats() {
639 _stats
._size
= size();
643 template <typename Origin
= local_origin_tag
>
644 std::pair
<item_ptr
, bool> incr(item_key
& key
, uint64_t delta
) {
646 if (i
== _cache
.end()) {
647 _stats
._incr_misses
++;
648 return {item_ptr
{}, false};
652 auto value
= item_ref
.data_as_integral();
654 return {boost::intrusive_ptr
<item
>(&item_ref
), false};
656 item_insertion_data insertion
{
657 .key
= Origin::move_if_local(key
),
658 .ascii_prefix
= sstring(item_ref
.ascii_prefix().data(), item_ref
.ascii_prefix_size()),
659 .data
= to_sstring(*value
+ delta
),
660 .expiry
= item_ref
._expiry
662 i
= add_overriding
<local_origin_tag
>(i
, insertion
);
663 return {boost::intrusive_ptr
<item
>(&*i
), true};
666 template <typename Origin
= local_origin_tag
>
667 std::pair
<item_ptr
, bool> decr(item_key
& key
, uint64_t delta
) {
669 if (i
== _cache
.end()) {
670 _stats
._decr_misses
++;
671 return {item_ptr
{}, false};
675 auto value
= item_ref
.data_as_integral();
677 return {boost::intrusive_ptr
<item
>(&item_ref
), false};
679 item_insertion_data insertion
{
680 .key
= Origin::move_if_local(key
),
681 .ascii_prefix
= sstring(item_ref
.ascii_prefix().data(), item_ref
.ascii_prefix_size()),
682 .data
= to_sstring(*value
- std::min(*value
, delta
)),
683 .expiry
= item_ref
._expiry
685 i
= add_overriding
<local_origin_tag
>(i
, insertion
);
686 return {boost::intrusive_ptr
<item
>(&*i
), true};
689 std::pair
<unsigned, foreign_ptr
<lw_shared_ptr
<std::string
>>> print_hash_stats() {
690 static constexpr unsigned bits
= sizeof(size_t) * 8;
691 size_t histo
[bits
+ 1] {};
693 unsigned max_bucket
= 0;
695 for (size_t i
= 0; i
< _cache
.bucket_count(); i
++) {
696 size_t size
= _cache
.bucket_size(i
);
701 bucket
= bits
- count_leading_zeros(size
);
703 max_bucket
= std::max(max_bucket
, bucket
);
704 max_size
= std::max(max_size
, size
);
708 std::stringstream ss
;
710 ss
<< "size: " << _cache
.size() << "\n";
711 ss
<< "buckets: " << _cache
.bucket_count() << "\n";
712 ss
<< "load: " << format("{:.2f}", (double)_cache
.size() / _cache
.bucket_count()) << "\n";
713 ss
<< "max bucket occupancy: " << max_size
<< "\n";
714 ss
<< "bucket occupancy histogram:\n";
716 for (unsigned i
= 0; i
< (max_bucket
+ 2); i
++) {
723 ss
<< (1 << (i
- 1)) << "+: ";
725 ss
<< histo
[i
] << "\n";
727 return {this_shard_id(), make_foreign(make_lw_shared
<std::string
>(ss
.str()))};
730 future
<> stop() { return make_ready_future
<>(); }
731 clock_type::duration
get_wc_to_clock_type_delta() { return _wc_to_clock_type_delta
; }
734 class sharded_cache
{
736 distributed
<cache
>& _peers
;
739 unsigned get_cpu(const item_key
& key
) {
740 return std::hash
<item_key
>()(key
) % smp::count
;
743 sharded_cache(distributed
<cache
>& peers
) : _peers(peers
) {}
745 future
<> flush_all() {
746 return _peers
.invoke_on_all(&cache::flush_all
);
749 future
<> flush_at(uint32_t time
) {
750 return _peers
.invoke_on_all(&cache::flush_at
, time
);
753 auto get_wc_to_clock_type_delta() { return _peers
.local().get_wc_to_clock_type_delta(); }
755 // The caller must keep @insertion live until the resulting future resolves.
756 future
<bool> set(item_insertion_data
& insertion
) {
757 auto cpu
= get_cpu(insertion
.key
);
758 if (this_shard_id() == cpu
) {
759 return make_ready_future
<bool>(_peers
.local().set(insertion
));
761 return _peers
.invoke_on(cpu
, &cache::set
<remote_origin_tag
>, std::ref(insertion
));
764 // The caller must keep @insertion live until the resulting future resolves.
765 future
<bool> add(item_insertion_data
& insertion
) {
766 auto cpu
= get_cpu(insertion
.key
);
767 if (this_shard_id() == cpu
) {
768 return make_ready_future
<bool>(_peers
.local().add(insertion
));
770 return _peers
.invoke_on(cpu
, &cache::add
<remote_origin_tag
>, std::ref(insertion
));
773 // The caller must keep @insertion live until the resulting future resolves.
774 future
<bool> replace(item_insertion_data
& insertion
) {
775 auto cpu
= get_cpu(insertion
.key
);
776 if (this_shard_id() == cpu
) {
777 return make_ready_future
<bool>(_peers
.local().replace(insertion
));
779 return _peers
.invoke_on(cpu
, &cache::replace
<remote_origin_tag
>, std::ref(insertion
));
782 // The caller must keep @key live until the resulting future resolves.
783 future
<bool> remove(const item_key
& key
) {
784 auto cpu
= get_cpu(key
);
785 return _peers
.invoke_on(cpu
, &cache::remove
, std::ref(key
));
788 // The caller must keep @key live until the resulting future resolves.
789 future
<item_ptr
> get(const item_key
& key
) {
790 auto cpu
= get_cpu(key
);
791 return _peers
.invoke_on(cpu
, &cache::get
, std::ref(key
));
794 // The caller must keep @insertion live until the resulting future resolves.
795 future
<cas_result
> cas(item_insertion_data
& insertion
, item::version_type version
) {
796 auto cpu
= get_cpu(insertion
.key
);
797 if (this_shard_id() == cpu
) {
798 return make_ready_future
<cas_result
>(_peers
.local().cas(insertion
, version
));
800 return _peers
.invoke_on(cpu
, &cache::cas
<remote_origin_tag
>, std::ref(insertion
), std::move(version
));
803 future
<cache_stats
> stats() {
804 return _peers
.map_reduce(adder
<cache_stats
>(), &cache::stats
);
807 // The caller must keep @key live until the resulting future resolves.
808 future
<std::pair
<item_ptr
, bool>> incr(item_key
& key
, uint64_t delta
) {
809 auto cpu
= get_cpu(key
);
810 if (this_shard_id() == cpu
) {
811 return make_ready_future
<std::pair
<item_ptr
, bool>>(
812 _peers
.local().incr
<local_origin_tag
>(key
, delta
));
814 return _peers
.invoke_on(cpu
, &cache::incr
<remote_origin_tag
>, std::ref(key
), std::move(delta
));
817 // The caller must keep @key live until the resulting future resolves.
818 future
<std::pair
<item_ptr
, bool>> decr(item_key
& key
, uint64_t delta
) {
819 auto cpu
= get_cpu(key
);
820 if (this_shard_id() == cpu
) {
821 return make_ready_future
<std::pair
<item_ptr
, bool>>(
822 _peers
.local().decr(key
, delta
));
824 return _peers
.invoke_on(cpu
, &cache::decr
<remote_origin_tag
>, std::ref(key
), std::move(delta
));
827 future
<> print_hash_stats(output_stream
<char>& out
) {
828 return _peers
.map_reduce([&out
] (std::pair
<unsigned, foreign_ptr
<lw_shared_ptr
<std::string
>>> data
) mutable {
829 return out
.write("=== CPU " + std::to_string(data
.first
) + " ===\r\n")
830 .then([&out
, str
= std::move(data
.second
)] {
831 return out
.write(*str
);
833 }, &cache::print_hash_stats
);
837 struct system_stats
{
838 uint32_t _curr_connections
{};
839 uint32_t _total_connections
{};
840 uint64_t _cmd_get
{};
841 uint64_t _cmd_set
{};
842 uint64_t _cmd_flush
{};
843 clock_type::time_point _start_time
;
846 _start_time
= clock_type::time_point::max();
848 system_stats(clock_type::time_point start_time
)
849 : _start_time(start_time
) {
851 system_stats
self() {
854 void operator+=(const system_stats
& other
) {
855 _curr_connections
+= other
._curr_connections
;
856 _total_connections
+= other
._total_connections
;
857 _cmd_get
+= other
._cmd_get
;
858 _cmd_set
+= other
._cmd_set
;
859 _cmd_flush
+= other
._cmd_flush
;
860 _start_time
= std::min(_start_time
, other
._start_time
);
862 future
<> stop() { return make_ready_future
<>(); }
865 class ascii_protocol
{
867 using this_type
= ascii_protocol
;
868 sharded_cache
& _cache
;
869 distributed
<system_stats
>& _system_stats
;
870 memcache_ascii_parser _parser
;
872 item_insertion_data _insertion
;
873 std::vector
<item_ptr
> _items
;
875 static constexpr const char *msg_crlf
= "\r\n";
876 static constexpr const char *msg_error
= "ERROR\r\n";
877 static constexpr const char *msg_stored
= "STORED\r\n";
878 static constexpr const char *msg_not_stored
= "NOT_STORED\r\n";
879 static constexpr const char *msg_end
= "END\r\n";
880 static constexpr const char *msg_value
= "VALUE ";
881 static constexpr const char *msg_deleted
= "DELETED\r\n";
882 static constexpr const char *msg_not_found
= "NOT_FOUND\r\n";
883 static constexpr const char *msg_ok
= "OK\r\n";
884 static constexpr const char *msg_version
= "VERSION " VERSION_STRING
"\r\n";
885 static constexpr const char *msg_exists
= "EXISTS\r\n";
886 static constexpr const char *msg_stat
= "STAT ";
887 static constexpr const char *msg_out_of_memory
= "SERVER_ERROR Out of memory allocating new item\r\n";
888 static constexpr const char *msg_error_non_numeric_value
= "CLIENT_ERROR cannot increment or decrement non-numeric value\r\n";
890 template <bool WithVersion
>
891 static void append_item(scattered_message
<char>& msg
, item_ptr item
) {
896 msg
.append_static("VALUE ");
897 msg
.append_static(item
->key());
898 msg
.append_static(item
->ascii_prefix());
901 msg
.append_static(" ");
902 msg
.append(to_sstring(item
->version()));
905 msg
.append_static(msg_crlf
);
906 msg
.append_static(item
->value());
907 msg
.append_static(msg_crlf
);
908 msg
.on_delete([item
= std::move(item
)] {});
911 template <bool WithVersion
>
912 future
<> handle_get(output_stream
<char>& out
) {
913 _system_stats
.local()._cmd_get
++;
914 if (_parser
._keys
.size() == 1) {
915 return _cache
.get(_parser
._keys
[0]).then([&out
] (auto item
) -> future
<> {
916 scattered_message
<char> msg
;
917 this_type::append_item
<WithVersion
>(msg
, std::move(item
));
918 msg
.append_static(msg_end
);
919 return out
.write(std::move(msg
));
923 return parallel_for_each(_parser
._keys
.begin(), _parser
._keys
.end(), [this] (const auto& key
) {
924 return _cache
.get(key
).then([this] (auto item
) {
925 _items
.emplace_back(std::move(item
));
927 }).then([this, &out
] () {
928 scattered_message
<char> msg
;
929 for (auto& item
: _items
) {
930 append_item
<WithVersion
>(msg
, std::move(item
));
932 msg
.append_static(msg_end
);
933 return out
.write(std::move(msg
));
938 template <typename Value
>
939 static future
<> print_stat(output_stream
<char>& out
, const char* key
, Value value
) {
940 return out
.write(msg_stat
)
941 .then([&out
, key
] { return out
.write(key
); })
942 .then([&out
] { return out
.write(" "); })
943 .then([&out
, value
] { return out
.write(to_sstring(value
)); })
944 .then([&out
] { return out
.write(msg_crlf
); });
947 future
<> print_stats(output_stream
<char>& out
) {
948 return _cache
.stats().then([this, &out
] (auto stats
) {
949 return _system_stats
.map_reduce(adder
<system_stats
>(), &system_stats::self
)
950 .then([&out
, all_cache_stats
= std::move(stats
)] (auto all_system_stats
) -> future
<> {
951 auto now
= clock_type::now();
952 auto total_items
= all_cache_stats
._set_replaces
+ all_cache_stats
._set_adds
953 + all_cache_stats
._cas_hits
;
954 return print_stat(out
, "pid", getpid())
955 .then([&out
, uptime
= now
- all_system_stats
._start_time
] {
956 return print_stat(out
, "uptime",
957 std::chrono::duration_cast
<std::chrono::seconds
>(uptime
).count());
958 }).then([now
, &out
] {
959 return print_stat(out
, "time",
960 std::chrono::duration_cast
<std::chrono::seconds
>(now
.time_since_epoch()).count());
962 return print_stat(out
, "version", VERSION_STRING
);
964 return print_stat(out
, "pointer_size", sizeof(void*)*8);
965 }).then([&out
, v
= all_system_stats
._curr_connections
] {
966 return print_stat(out
, "curr_connections", v
);
967 }).then([&out
, v
= all_system_stats
._total_connections
] {
968 return print_stat(out
, "total_connections", v
);
969 }).then([&out
, v
= all_system_stats
._curr_connections
] {
970 return print_stat(out
, "connection_structures", v
);
971 }).then([&out
, v
= all_system_stats
._cmd_get
] {
972 return print_stat(out
, "cmd_get", v
);
973 }).then([&out
, v
= all_system_stats
._cmd_set
] {
974 return print_stat(out
, "cmd_set", v
);
975 }).then([&out
, v
= all_system_stats
._cmd_flush
] {
976 return print_stat(out
, "cmd_flush", v
);
978 return print_stat(out
, "cmd_touch", 0);
979 }).then([&out
, v
= all_cache_stats
._get_hits
] {
980 return print_stat(out
, "get_hits", v
);
981 }).then([&out
, v
= all_cache_stats
._get_misses
] {
982 return print_stat(out
, "get_misses", v
);
983 }).then([&out
, v
= all_cache_stats
._delete_misses
] {
984 return print_stat(out
, "delete_misses", v
);
985 }).then([&out
, v
= all_cache_stats
._delete_hits
] {
986 return print_stat(out
, "delete_hits", v
);
987 }).then([&out
, v
= all_cache_stats
._incr_misses
] {
988 return print_stat(out
, "incr_misses", v
);
989 }).then([&out
, v
= all_cache_stats
._incr_hits
] {
990 return print_stat(out
, "incr_hits", v
);
991 }).then([&out
, v
= all_cache_stats
._decr_misses
] {
992 return print_stat(out
, "decr_misses", v
);
993 }).then([&out
, v
= all_cache_stats
._decr_hits
] {
994 return print_stat(out
, "decr_hits", v
);
995 }).then([&out
, v
= all_cache_stats
._cas_misses
] {
996 return print_stat(out
, "cas_misses", v
);
997 }).then([&out
, v
= all_cache_stats
._cas_hits
] {
998 return print_stat(out
, "cas_hits", v
);
999 }).then([&out
, v
= all_cache_stats
._cas_badval
] {
1000 return print_stat(out
, "cas_badval", v
);
1002 return print_stat(out
, "touch_hits", 0);
1004 return print_stat(out
, "touch_misses", 0);
1006 return print_stat(out
, "auth_cmds", 0);
1008 return print_stat(out
, "auth_errors", 0);
1010 return print_stat(out
, "threads", smp::count
);
1011 }).then([&out
, v
= all_cache_stats
._size
] {
1012 return print_stat(out
, "curr_items", v
);
1013 }).then([&out
, v
= total_items
] {
1014 return print_stat(out
, "total_items", v
);
1015 }).then([&out
, v
= all_cache_stats
._expired
] {
1016 return print_stat(out
, "seastar.expired", v
);
1017 }).then([&out
, v
= all_cache_stats
._resize_failure
] {
1018 return print_stat(out
, "seastar.resize_failure", v
);
1019 }).then([&out
, v
= all_cache_stats
._evicted
] {
1020 return print_stat(out
, "evictions", v
);
1021 }).then([&out
, v
= all_cache_stats
._bytes
] {
1022 return print_stat(out
, "bytes", v
);
1024 return out
.write(msg_end
);
1030 ascii_protocol(sharded_cache
& cache
, distributed
<system_stats
>& system_stats
)
1032 , _system_stats(system_stats
)
1035 void prepare_insertion() {
1036 _insertion
= item_insertion_data
{
1037 .key
= std::move(_parser
._key
),
1038 .ascii_prefix
= make_sstring(" ", _parser
._flags_str
, " ", _parser
._size_str
),
1039 .data
= std::move(_parser
._blob
),
1040 .expiry
= expiration(_cache
.get_wc_to_clock_type_delta(), _parser
._expiration
)
// Processes one request: feeds `in` to `_parser`, then dispatches on the
// resulting `_parser._state` and writes the ASCII reply (if any) to `out`.
//
// Params:
//   in  - input stream consumed by the parser.
//   out - output stream for the reply; it is captured by reference in the
//         continuations below, so it must stay alive until the returned
//         future resolves.
// Returns: future<> that completes when the reply chain has finished.
//
// Most commands check `_parser._noreply`; when it is set the command's result
// is discarded instead of being reported to the client.
1044 future
<> handle(input_stream
<char>& in
, output_stream
<char>& out
) {
1046 return in
.consume(_parser
).then([this, &out
] () -> future
<> {
1047 switch (_parser
._state
) {
// End of input: nothing to write.
1048 case memcache_ascii_parser::state::eof
:
1049 return make_ready_future
<>();
// Parse failure: reply with the generic error message.
1051 case memcache_ascii_parser::state::error
:
1052 return out
.write(msg_error
);
// set: counts the command on the local shard's stats, then stores the item.
// The store result is ignored (`(...)`); the reply is always msg_stored.
1054 case memcache_ascii_parser::state::cmd_set
:
1056 _system_stats
.local()._cmd_set
++;
1057 prepare_insertion();
1058 auto f
= _cache
.set(_insertion
);
1059 if (_parser
._noreply
) {
1060 return std::move(f
).discard_result();
1062 return std::move(f
).then([&out
] (...) {
1063 return out
.write(msg_stored
);
// cas: also counted under `_cmd_set`. The reply depends on the cas_result:
// stored -> msg_stored, not_found -> msg_not_found, bad_version -> msg_exists.
// NOTE(review): the switch statement over `result` is not visible in this extract.
1067 case memcache_ascii_parser::state::cmd_cas
:
1069 _system_stats
.local()._cmd_set
++;
1070 prepare_insertion();
1071 auto f
= _cache
.cas(_insertion
, _parser
._version
);
1072 if (_parser
._noreply
) {
1073 return std::move(f
).discard_result();
1075 return std::move(f
).then([&out
] (auto result
) {
1077 case cas_result::stored
:
1078 return out
.write(msg_stored
);
1079 case cas_result::not_found
:
1080 return out
.write(msg_not_found
);
1081 case cas_result::bad_version
:
1082 return out
.write(msg_exists
);
// add: counted under `_cmd_set`; replies msg_stored when the cache reports
// the item was added, msg_not_stored otherwise.
1089 case memcache_ascii_parser::state::cmd_add
:
1091 _system_stats
.local()._cmd_set
++;
1092 prepare_insertion();
1093 auto f
= _cache
.add(_insertion
);
1094 if (_parser
._noreply
) {
1095 return std::move(f
).discard_result();
1097 return std::move(f
).then([&out
] (bool added
) {
1098 return out
.write(added
? msg_stored
: msg_not_stored
);
// replace: counted under `_cmd_set`; replies msg_stored when the cache
// reports a replacement, msg_not_stored otherwise.
1102 case memcache_ascii_parser::state::cmd_replace
:
1104 _system_stats
.local()._cmd_set
++;
1105 prepare_insertion();
1106 auto f
= _cache
.replace(_insertion
);
1107 if (_parser
._noreply
) {
1108 return std::move(f
).discard_result();
1110 return std::move(f
).then([&out
] (auto replaced
) {
1111 return out
.write(replaced
? msg_stored
: msg_not_stored
);
// get / gets share handle_get<>; the template argument is false for get and
// true for gets.
1115 case memcache_ascii_parser::state::cmd_get
:
1116 return handle_get
<false>(out
);
1118 case memcache_ascii_parser::state::cmd_gets
:
1119 return handle_get
<true>(out
);
// delete: replies msg_deleted when the cache removed the key, msg_not_found
// otherwise.
1121 case memcache_ascii_parser::state::cmd_delete
:
1123 auto f
= _cache
.remove(_parser
._key
);
1124 if (_parser
._noreply
) {
1125 return std::move(f
).discard_result();
1127 return std::move(f
).then([&out
] (bool removed
) {
1128 return out
.write(removed
? msg_deleted
: msg_not_found
);
// flush_all: counted under `_cmd_flush`. With a non-zero parsed expiration
// the flush goes through flush_at(); the second visible path calls
// flush_all(). Both reply msg_ok.
// NOTE(review): what the noreply branches return is not visible in this extract.
1132 case memcache_ascii_parser::state::cmd_flush_all
:
1134 _system_stats
.local()._cmd_flush
++;
1135 if (_parser
._expiration
) {
1136 auto f
= _cache
.flush_at(_parser
._expiration
);
1137 if (_parser
._noreply
) {
1140 return std::move(f
).then([&out
] {
1141 return out
.write(msg_ok
);
1144 auto f
= _cache
.flush_all();
1145 if (_parser
._noreply
) {
1148 return std::move(f
).then([&out
] {
1149 return out
.write(msg_ok
);
1154 case memcache_ascii_parser::state::cmd_version
:
1155 return out
.write(msg_version
);
1157 case memcache_ascii_parser::state::cmd_stats
:
1158 return print_stats(out
);
1160 case memcache_ascii_parser::state::cmd_stats_hash
:
1161 return _cache
.print_hash_stats(out
);
// incr: the cache returns a pair; `first` is the item, `second` is read into
// `incremented`. Possible replies are msg_not_found,
// msg_error_non_numeric_value, or the item's value followed by msg_crlf.
// NOTE(review): the conditions guarding the two error replies are not visible
// in this extract; presumably they test `item` and `incremented` - confirm.
1163 case memcache_ascii_parser::state::cmd_incr
:
1165 auto f
= _cache
.incr(_parser
._key
, _parser
._u64
);
1166 if (_parser
._noreply
) {
1167 return std::move(f
).discard_result();
1169 return std::move(f
).then([&out
] (auto result
) {
1170 auto item
= std::move(result
.first
);
1172 return out
.write(msg_not_found
);
1174 auto incremented
= result
.second
;
1176 return out
.write(msg_error_non_numeric_value
);
1178 return out
.write(item
->value().data(), item
->value_size()).then([&out
] {
1179 return out
.write(msg_crlf
);
// decr: same reply structure as incr, using _cache.decr().
// NOTE(review): guard conditions are likewise not visible here - confirm.
1184 case memcache_ascii_parser::state::cmd_decr
:
1186 auto f
= _cache
.decr(_parser
._key
, _parser
._u64
);
1187 if (_parser
._noreply
) {
1188 return std::move(f
).discard_result();
1190 return std::move(f
).then([&out
] (auto result
) {
1191 auto item
= std::move(result
.first
);
1193 return out
.write(msg_not_found
);
1195 auto decremented
= result
.second
;
1197 return out
.write(msg_error_non_numeric_value
);
1199 return out
.write(item
->value().data(), item
->value_size()).then([&out
] {
1200 return out
.write(msg_crlf
);
// Error handling stage: a std::bad_alloc from the dispatch above is answered
// with msg_out_of_memory, or silently dropped when noreply was requested.
// NOTE(review): the `try` block that inspects `f` is not visible in this
// extract; the exception object `e` is not used in the visible handler.
1206 }).then_wrapped([this, &out
] (auto&& f
) -> future
<> {
1207 // FIXME: then_wrapped() being scheduled even though no exception was triggered has a
1208 // performance cost of about 2.6%. Not using it means maintainability penalty.
1211 } catch (std::bad_alloc
& e
) {
1212 if (_parser
._noreply
) {
1213 return make_ready_future
<>();
1215 return out
.write(msg_out_of_memory
);
1217 return make_ready_future
<>();
// Data members of the UDP server visible in this extract.
// Default upper bound for a UDP datagram, in bytes.
1224 static const size_t default_max_datagram_size
= 1400;
// Future of the background receive loop; empty until the loop is started.
1226 std::optional
<future
<>> _task
;
// Cache and statistics containers, held by reference (not owned).
1227 sharded_cache
& _cache
;
1228 distributed
<system_stats
>& _system_stats
;
// Current datagram size limit; changed through set_max_datagram_size().
1231 size_t _max_datagram_size
= default_max_datagram_size
;
// Body of the packed UDP frame header: four 16-bit fields stored as
// packed<uint16_t>. When replying, `_n` is set to the number of outgoing
// packets and `_sequence_number` to the index of each packet.
1234 packed
<uint16_t> _request_id
;
1235 packed
<uint16_t> _sequence_number
;
1236 packed
<uint16_t> _n
;
1237 packed
<uint16_t> _reserved
;
// Applies adjuster `a` to the request id, sequence number and `_n` fields and
// returns its result. `_reserved` is not passed to the adjuster.
// NOTE(review): presumably the hook used by hton()/ntoh() - confirm.
1239 template<typename Adjuster
>
1240 auto adjust_endianness(Adjuster a
) {
1241 return a(_request_id
, _sequence_number
, _n
);
1243 } __attribute__((packed
));
// Per-datagram connection state of the UDP server (visible members).
// Request id taken from the incoming frame header; echoed in every reply frame.
1247 uint16_t _request_id
;
// Stream over the request payload.
1248 input_stream
<char> _in
;
// Stream the protocol handler writes the reply to.
1249 output_stream
<char> _out
;
// Reply packets awaiting transmission by respond().
1250 std::vector
<packet
> _out_bufs
;
// ASCII protocol handler bound to the cache and stats containers.
1251 ascii_protocol _proto
;
// Builds the output_stream_options used for the reply stream, with
// `trim_to_size` enabled. Declared noexcept.
// NOTE(review): the return statement is not visible in this extract.
1253 static output_stream_options
make_opts() noexcept
{
1254 output_stream_options opts
;
1255 opts
.trim_to_size
= true;
// Constructs the per-datagram connection.
//
// Params:
//   src          - sender address of the request (its initializer is not
//                  visible in this extract).
//   request_id   - id copied from the request header.
//   in           - input stream over the request payload; moved into `_in`.
//   out_size     - buffer size passed to the output stream.
//   c            - cache handed to the protocol handler.
//   system_stats - statistics container handed to the protocol handler.
//
// `_out` is an output_stream over a vector_data_sink that appends to
// `_out_bufs`, configured with make_opts().
1259 connection(ipv4_addr src
, uint16_t request_id
, input_stream
<char>&& in
, size_t out_size
,
1260 sharded_cache
& c
, distributed
<system_stats
>& system_stats
)
1262 , _request_id(request_id
)
1263 , _in(std::move(in
))
1264 , _out(output_stream
<char>(data_sink(std::make_unique
<vector_data_sink
>(_out_bufs
)), out_size
, make_opts()))
1265 , _proto(c
, system_stats
)
// Sends every packet buffered in `_out_bufs` to `_src` over `chan`, one after
// another (do_for_each).
//
// Each packet gets a frame header prepended: the stored request id, a
// sequence number taken from the captured counter `i` (post-incremented per
// packet) and `_n` set to the total number of buffered packets. The header is
// converted with hton() before the packet is moved into chan.send().
//
// NOTE(review): the declaration of `i` is not visible in this extract.
// `chan` is captured by reference and must outlive the returned future.
1268 future
<> respond(udp_channel
& chan
) {
1270 return do_for_each(_out_bufs
.begin(), _out_bufs
.end(), [this, i
, &chan
] (packet
& p
) mutable {
1271 header
* out_hdr
= p
.prepend_header
<header
>(0);
1272 out_hdr
->_request_id
= _request_id
;
1273 out_hdr
->_sequence_number
= i
++;
1274 out_hdr
->_n
= _out_bufs
.size();
1275 *out_hdr
= hton(*out_hdr
);
1276 return chan
.send(_src
, std::move(p
));
// Constructs the UDP server.
//
// Params:
//   c            - cache to serve requests from.
//   system_stats - statistics container, stored in `_system_stats`.
//   port         - port to listen on; defaults to 11211.
// NOTE(review): the initializers for `c` and `port` are not visible in this extract.
1282 udp_server(sharded_cache
& c
, distributed
<system_stats
>& system_stats
, uint16_t port
= 11211)
1284 , _system_stats(system_stats
)
// Overrides the datagram size limit (initially default_max_datagram_size).
// NOTE(review): the value is stored without validation. The receive path
// computes `_max_datagram_size - sizeof(header)` on size_t, so a limit smaller
// than the header would wrap around - confirm callers never pass one.
1288 void set_max_datagram_size(size_t max_datagram_size
) {
1289 _max_datagram_size
= max_datagram_size
;
// Body of the UDP server start routine (its signature is not visible in this
// extract). Opens the UDP channel on `_port` and stores the future of an
// endless receive loop (keep_doing) in `_task`.
//
// For each received datagram:
//   1. datagrams shorter than the frame header are dropped silently;
//   2. the header is decoded with ntoh() and trimmed off the packet;
//   3. a connection is created whose output buffer size is the datagram
//      limit minus the header size;
//   4. requests that are not a single datagram (`_n != 1` or a non-zero
//      sequence number) get a CLIENT_ERROR reply;
//   5. otherwise the ASCII protocol handler runs, the output is flushed and
//      the buffered packets are sent back with respond().
// `conn` is an lw_shared pointer captured by value in each continuation, which
// keeps the connection object alive for the whole asynchronous chain.
1293 _chan
= make_udp_channel({_port
});
1294 // Run in the background.
1295 _task
= keep_doing([this] {
1296 return _chan
.receive().then([this](udp_datagram dgram
) {
1297 packet
& p
= dgram
.get_data();
1298 if (p
.len() < sizeof(header
)) {
1299 // dropping invalid packet
1300 return make_ready_future
<>();
1303 header hdr
= ntoh(*p
.get_header
<header
>());
1304 p
.trim_front(sizeof(hdr
));
1306 auto request_id
= hdr
._request_id
;
1307 auto in
= as_input_stream(std::move(p
));
1308 auto conn
= make_lw_shared
<connection
>(dgram
.get_src(), request_id
, std::move(in
),
1309 _max_datagram_size
- sizeof(header
), _cache
, _system_stats
);
// Multi-datagram requests are rejected with an explanatory error reply.
1311 if (hdr
._n
!= 1 || hdr
._sequence_number
!= 0) {
1312 return conn
->_out
.write("CLIENT_ERROR only single-datagram requests supported\r\n").then([this, conn
] {
1313 return conn
->_out
.flush().then([this, conn
] {
1314 return conn
->respond(_chan
).then([conn
] {});
// Normal path: handle the request, flush the reply, then transmit it.
1319 return conn
->_proto
.handle(conn
->_in
, conn
->_out
).then([this, conn
]() mutable {
1320 return conn
->_out
.flush().then([this, conn
] {
1321 return conn
->respond(_chan
).then([conn
] {});
// Body of the UDP server stop routine (its signature is not visible in this
// extract). Shuts down both directions of the channel, then returns the
// receive-loop future with a handler that prints any exception to std::cerr.
// NOTE(review): `_task` is dereferenced unconditionally; presumably stop() is
// only called after start() - confirm.
1329 _chan
.shutdown_input();
1330 _chan
.shutdown_output();
1331 return _task
->handle_exception([](std::exception_ptr e
) {
1332 std::cerr
<< "exception in udp_server " << e
<< '\n';
// Data members of the TCP server visible in this extract.
// Future of the background accept loop; empty until the loop is started.
1339 std::optional
<future
<>> _task
;
// Listening socket, assigned when the server starts.
1340 lw_shared_ptr
<seastar::server_socket
> _listener
;
// Cache and statistics containers, held by reference (not owned).
1341 sharded_cache
& _cache
;
1342 distributed
<system_stats
>& _system_stats
;
// Per-client state of the TCP server: the connected socket, the peer address,
// the socket's input/output streams, an ASCII protocol handler and a
// reference to the statistics container.
1345 connected_socket _socket
;
1346 socket_address _addr
;
1347 input_stream
<char> _in
;
1348 output_stream
<char> _out
;
1349 ascii_protocol _proto
;
1350 distributed
<system_stats
>& _system_stats
;
// Takes ownership of `socket`, derives `_in`/`_out` from it and binds the
// protocol handler to cache `c` and `system_stats`. The constructor body
// increments the local shard's `_curr_connections` and `_total_connections`.
// NOTE(review): the initializer for `addr` is not visible in this extract.
1351 connection(connected_socket
&& socket
, socket_address addr
, sharded_cache
& c
, distributed
<system_stats
>& system_stats
)
1352 : _socket(std::move(socket
))
1354 , _in(_socket
.input())
1355 , _out(_socket
.output())
1356 , _proto(c
, system_stats
)
1357 , _system_stats(system_stats
)
1359 _system_stats
.local()._curr_connections
++;
1360 _system_stats
.local()._total_connections
++;
// Decrements `_curr_connections` again.
// NOTE(review): presumably the destructor body; its header is not visible here.
1363 _system_stats
.local()._curr_connections
--;
// Constructs the TCP server.
//
// Params:
//   cache        - cache to serve requests from.
//   system_stats - statistics container, stored in `_system_stats`.
//   port         - port to listen on; defaults to 11211.
// NOTE(review): the initializers for `cache` and `port` are not visible in this extract.
1367 tcp_server(sharded_cache
& cache
, distributed
<system_stats
>& system_stats
, uint16_t port
= 11211)
1369 , _system_stats(system_stats
)
// Body of the TCP server start routine (its signature and the declaration of
// `lo` are not visible in this extract). Listens on `_port` with
// reuse_address enabled and stores the future of an endless accept loop
// (keep_doing) in `_task`.
//
// For each accepted connection a shared `connection` object is created and a
// detached loop (its future is explicitly cast to void) repeatedly handles a
// request and flushes the output until the input stream reports eof. The
// output stream is then closed; the trailing `finally([conn]{})` holds a copy
// of `conn` so the object lives until close() has completed.
1375 lo
.reuse_address
= true;
1376 _listener
= seastar::server_socket(seastar::listen(make_ipv4_address({_port
}), lo
));
1377 // Run in the background until eof has reached on the input connection.
1378 _task
= keep_doing([this] {
1379 return _listener
->accept().then([this] (accept_result ar
) mutable {
1380 connected_socket fd
= std::move(ar
.connection
);
1381 socket_address addr
= std::move(ar
.remote_address
);
1382 auto conn
= make_lw_shared
<connection
>(std::move(fd
), addr
, _cache
, _system_stats
);
1383 (void)do_until([conn
] { return conn
->_in
.eof(); }, [conn
] {
1384 return conn
->_proto
.handle(conn
->_in
, conn
->_out
).then([conn
] {
1385 return conn
->_out
.flush();
1388 return conn
->_out
.close().finally([conn
]{});
// Body of the TCP server stop routine (its signature is not visible in this
// extract). Aborts the pending accept on the listener, then returns the
// accept-loop future with a handler that prints any exception to std::cerr.
// NOTE(review): `_listener` and `_task` are dereferenced unconditionally;
// presumably stop() is only called after start() - confirm.
1395 _listener
->abort_accept();
1396 return _task
->handle_exception([](std::exception_ptr e
) {
1397 std::cerr
<< "exception in tcp_server " << e
<< '\n';
// Periodically prints a one-line cache summary to std::cout.
//
// A timer callback fetches the cache statistics and prints:
//   - the item count (`_size`);
//   - get hits out of total gets (hits + misses) with the hit percentage;
//   - set replaces out of total sets (adds + replaces) with the replace
//     percentage.
// Percentages are reported as 0 when the corresponding total is zero and are
// formatted in fixed notation with two decimals. The timer is armed with a
// one-second period.
// NOTE(review): the declaration of `_timer` and the header of the method that
// installs the callback are not visible in this extract.
1402 class stats_printer
{
// Cache whose statistics are reported; held by reference (not owned).
1405 sharded_cache
& _cache
;
1407 stats_printer(sharded_cache
& cache
)
1411 _timer
.set_callback([this] {
// The stats future is deliberately detached: the callback does not wait for it.
1412 (void)_cache
.stats().then([] (auto stats
) {
1413 auto gets_total
= stats
._get_hits
+ stats
._get_misses
;
1414 auto get_hit_rate
= gets_total
? ((double)stats
._get_hits
* 100 / gets_total
) : 0;
1415 auto sets_total
= stats
._set_adds
+ stats
._set_replaces
;
1416 auto set_replace_rate
= sets_total
? ((double)stats
._set_replaces
* 100/ sets_total
) : 0;
1417 std::cout
<< "items: " << stats
._size
<< " "
1418 << std::setprecision(2) << std::fixed
1419 << "get: " << stats
._get_hits
<< "/" << gets_total
<< " (" << get_hit_rate
<< "%) "
1420 << "set: " << stats
._set_replaces
<< "/" << sets_total
<< " (" << set_replace_rate
<< "%)";
1421 std::cout
<< std::endl
;
1424 _timer
.arm_periodic(std::chrono::seconds(1));
// Returns an already-resolved future.
// NOTE(review): the visible stop() does not cancel `_timer` - confirm that is intended.
1427 future
<> stop() { return make_ready_future
<>(); }
1430 } /* namespace memcache */
1432 int main(int ac
, char** av
) {
1433 distributed
<memcache::cache
> cache_peers
;
1434 memcache::sharded_cache
cache(cache_peers
);
1435 distributed
<memcache::system_stats
> system_stats
;
1436 distributed
<memcache::udp_server
> udp_server
;
1437 distributed
<memcache::tcp_server
> tcp_server
;
1438 memcache::stats_printer
stats(cache
);
1440 namespace bpo
= boost::program_options
;
1443 ("max-datagram-size", bpo::value
<int>()->default_value(memcache::udp_server::default_max_datagram_size
),
1444 "Maximum size of UDP datagram")
1445 ("max-slab-size", bpo::value
<uint64_t>()->default_value(memcache::default_per_cpu_slab_size
/MB
),
1446 "Maximum memory to be used for items (value in megabytes) (reclaimer is disabled if set)")
1447 ("slab-page-size", bpo::value
<uint64_t>()->default_value(memcache::default_slab_page_size
/MB
),
1448 "Size of slab page (value in megabytes)")
1450 "Print basic statistics periodically (every second)")
1451 ("port", bpo::value
<uint16_t>()->default_value(11211),
1452 "Specify UDP and TCP ports for memcached server to listen on")
1455 return app
.run_deprecated(ac
, av
, [&] {
1456 engine().at_exit([&] { return tcp_server
.stop(); });
1457 engine().at_exit([&] { return udp_server
.stop(); });
1458 engine().at_exit([&] { return cache_peers
.stop(); });
1459 engine().at_exit([&] { return system_stats
.stop(); });
1461 auto&& config
= app
.configuration();
1462 uint16_t port
= config
["port"].as
<uint16_t>();
1463 uint64_t per_cpu_slab_size
= config
["max-slab-size"].as
<uint64_t>() * MB
;
1464 uint64_t slab_page_size
= config
["slab-page-size"].as
<uint64_t>() * MB
;
1465 return cache_peers
.start(std::move(per_cpu_slab_size
), std::move(slab_page_size
)).then([&system_stats
] {
1466 return system_stats
.start(memcache::clock_type::now());
1468 std::cout
<< PLATFORM
<< " memcached " << VERSION
<< "\n";
1469 return make_ready_future
<>();
1471 return tcp_server
.start(std::ref(cache
), std::ref(system_stats
), port
);
1472 }).then([&tcp_server
] {
1473 return tcp_server
.invoke_on_all(&memcache::tcp_server::start
);
1475 if (engine().net().has_per_core_namespace()) {
1476 return udp_server
.start(std::ref(cache
), std::ref(system_stats
), port
);
1478 return udp_server
.start_single(std::ref(cache
), std::ref(system_stats
), port
);
1481 return udp_server
.invoke_on_all(&memcache::udp_server::set_max_datagram_size
,
1482 (size_t)config
["max-datagram-size"].as
<int>());
1484 return udp_server
.invoke_on_all(&memcache::udp_server::start
);
1485 }).then([&stats
, start_stats
= config
.count("stats")] {