]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/apps/memcached/memcache.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / apps / memcached / memcache.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2014-2015 Cloudius Systems
20 */
21
22 #include <boost/intrusive/unordered_set.hpp>
23 #include <boost/intrusive/list.hpp>
24 #include <boost/intrusive_ptr.hpp>
25 #include <boost/lexical_cast.hpp>
26 #include <iostream>
27 #include <iomanip>
28 #include <sstream>
29 #include <seastar/core/app-template.hh>
30 #include <seastar/core/reactor.hh>
31 #include <seastar/core/seastar.hh>
32 #include <seastar/core/loop.hh>
33 #include <seastar/core/timer-set.hh>
34 #include <seastar/core/shared_ptr.hh>
35 #include <seastar/core/stream.hh>
36 #include <seastar/core/memory.hh>
37 #include <seastar/core/units.hh>
38 #include <seastar/core/distributed.hh>
39 #include <seastar/core/vector-data-sink.hh>
40 #include <seastar/core/bitops.hh>
41 #include <seastar/core/slab.hh>
42 #include <seastar/core/align.hh>
43 #include <seastar/core/print.hh>
44 #include <seastar/net/api.hh>
45 #include <seastar/net/packet-data-source.hh>
46 #include <seastar/util/std-compat.hh>
47 #include <seastar/util/log.hh>
48 #include "ascii.hh"
49 #include "memcached.hh"
50 #include <unistd.h>
51
// Server identification, reported by the "version" command and the "version" stat.
// These stay macros because VERSION_STRING is spliced into other string literals.
#define PLATFORM "seastar"
#define VERSION "v1.0"
#define VERSION_STRING PLATFORM " " VERSION
55
56 using namespace seastar;
57 using namespace net;
58
59 namespace memcache {
60
61 namespace bi = boost::intrusive;
62
63 static constexpr double default_slab_growth_factor = 1.25;
64 static constexpr uint64_t default_slab_page_size = 1UL*MB;
65 static constexpr uint64_t default_per_cpu_slab_size = 0UL; // zero means reclaimer is enabled.
66 static __thread slab_allocator<item>* slab;
67 static thread_local std::unique_ptr<slab_allocator<item>> slab_holder;
68
69 template<typename T>
70 using optional = std::optional<T>;
71
72 using clock_type = lowres_clock;
73
74 //
75 // "Expiration" is a uint32_t value.
76 // The minimal value of _time is when "expiration" is set to (seconds_in_a_month
77 // + 1).
78 // In this case _time will have a value of
79 //
80 // (seconds_in_a_month + 1 - Wall_Clock_Time_Since_Epoch)
81 //
82 // because lowres_clock now() initialized to zero when the application starts.
83 //
84 // We will use a timepoint at LLONG_MIN to represent a "never expire" value
85 // since it will not collide with the minimum _time value mentioned above for
86 // about 290 thousand years to come.
87 //
88 static constexpr clock_type::time_point never_expire_timepoint = clock_type::time_point(clock_type::duration::min());
89
90 struct expiration {
91 using time_point = clock_type::time_point;
92 using duration = time_point::duration;
93
94 static constexpr uint32_t seconds_in_a_month = 60U * 60 * 24 * 30;
95 time_point _time = never_expire_timepoint;
96
97 expiration() {}
98
99 expiration(clock_type::duration wc_to_clock_type_delta, uint32_t s) {
100 using namespace std::chrono;
101
102 static_assert(sizeof(clock_type::duration::rep) >= 8, "clock_type::duration::rep must be at least 8 bytes wide");
103
104 if (s == 0U) {
105 return; // means never expire.
106 } else if (s <= seconds_in_a_month) {
107 _time = clock_type::now() + seconds(s); // from delta
108 } else {
109 //
110 // seastar::reactor supports only a monotonic clock at the moment
111 // therefore this may make the elements with the absolute expiration
112 // time expire at the wrong time if the wall clock has been updated
113 // during the expiration period. However the original memcached has
114 // the same weakness.
115 //
116 // TODO: Fix this when a support for system_clock-based timers is
117 // added to the seastar::reactor.
118 //
119 _time = time_point(seconds(s) + wc_to_clock_type_delta); // from real time
120 }
121 }
122
123 bool ever_expires() {
124 return _time != never_expire_timepoint;
125 }
126
127 time_point to_time_point() {
128 return _time;
129 }
130 };
131
132 class item : public slab_item_base {
133 public:
134 using version_type = uint64_t;
135 using time_point = expiration::time_point;
136 using duration = expiration::duration;
137 static constexpr uint8_t field_alignment = alignof(void*);
138 private:
139 using hook_type = bi::unordered_set_member_hook<>;
140 // TODO: align shared data to cache line boundary
141 version_type _version;
142 hook_type _cache_link;
143 bi::list_member_hook<> _timer_link;
144 size_t _key_hash;
145 expiration _expiry;
146 uint32_t _value_size;
147 uint32_t _slab_page_index;
148 uint16_t _ref_count;
149 uint8_t _key_size;
150 uint8_t _ascii_prefix_size;
151 char _data[]; // layout: data=key, (data+key_size)=ascii_prefix, (data+key_size+ascii_prefix_size)=value.
152 friend class cache;
153 public:
154 item(uint32_t slab_page_index, item_key&& key, sstring&& ascii_prefix,
155 sstring&& value, expiration expiry, version_type version = 1)
156 : _version(version)
157 , _key_hash(key.hash())
158 , _expiry(expiry)
159 , _value_size(value.size())
160 , _slab_page_index(slab_page_index)
161 , _ref_count(0U)
162 , _key_size(key.key().size())
163 , _ascii_prefix_size(ascii_prefix.size())
164 {
165 assert(_key_size <= std::numeric_limits<uint8_t>::max());
166 assert(_ascii_prefix_size <= std::numeric_limits<uint8_t>::max());
167 // storing key
168 memcpy(_data, key.key().c_str(), _key_size);
169 // storing ascii_prefix
170 memcpy(_data + align_up(_key_size, field_alignment), ascii_prefix.c_str(), _ascii_prefix_size);
171 // storing value
172 memcpy(_data + align_up(_key_size, field_alignment) + align_up(_ascii_prefix_size, field_alignment),
173 value.c_str(), _value_size);
174 }
175
176 item(const item&) = delete;
177 item(item&&) = delete;
178
179 clock_type::time_point get_timeout() {
180 return _expiry.to_time_point();
181 }
182
183 version_type version() {
184 return _version;
185 }
186
187 const std::string_view key() const {
188 return std::string_view(_data, _key_size);
189 }
190
191 const std::string_view ascii_prefix() const {
192 const char *p = _data + align_up(_key_size, field_alignment);
193 return std::string_view(p, _ascii_prefix_size);
194 }
195
196 const std::string_view value() const {
197 const char *p = _data + align_up(_key_size, field_alignment) +
198 align_up(_ascii_prefix_size, field_alignment);
199 return std::string_view(p, _value_size);
200 }
201
202 size_t key_size() const {
203 return _key_size;
204 }
205
206 size_t ascii_prefix_size() const {
207 return _ascii_prefix_size;
208 }
209
210 size_t value_size() const {
211 return _value_size;
212 }
213
214 optional<uint64_t> data_as_integral() {
215 auto str = value().data();
216 if (str[0] == '-') {
217 return {};
218 }
219
220 auto len = _value_size;
221
222 // Strip trailing space
223 while (len && str[len - 1] == ' ') {
224 len--;
225 }
226
227 try {
228 return {boost::lexical_cast<uint64_t>(str, len)};
229 } catch (const boost::bad_lexical_cast& e) {
230 return {};
231 }
232 }
233
234 // needed by timer_set
235 bool cancel() {
236 return false;
237 }
238
239 // Methods required by slab allocator.
240 uint32_t get_slab_page_index() const {
241 return _slab_page_index;
242 }
243 bool is_unlocked() const {
244 return _ref_count == 1;
245 }
246
247 friend bool operator==(const item &a, const item &b) {
248 return (a._key_hash == b._key_hash) &&
249 (a._key_size == b._key_size) &&
250 (memcmp(a._data, b._data, a._key_size) == 0);
251 }
252
253 friend std::size_t hash_value(const item &i) {
254 return i._key_hash;
255 }
256
257 friend inline void intrusive_ptr_add_ref(item* it) {
258 assert(it->_ref_count >= 0);
259 ++it->_ref_count;
260 if (it->_ref_count == 2) {
261 slab->lock_item(it);
262 }
263 }
264
265 friend inline void intrusive_ptr_release(item* it) {
266 --it->_ref_count;
267 if (it->_ref_count == 1) {
268 slab->unlock_item(it);
269 } else if (it->_ref_count == 0) {
270 slab->free(it);
271 }
272 assert(it->_ref_count >= 0);
273 }
274
275 friend struct item_key_cmp;
276 };
277
278 struct item_key_cmp
279 {
280 private:
281 bool compare(const item_key& key, const item& it) const {
282 return (it._key_hash == key.hash()) &&
283 (it._key_size == key.key().size()) &&
284 (memcmp(it._data, key.key().c_str(), it._key_size) == 0);
285 }
286 public:
287 bool operator()(const item_key& key, const item& it) const {
288 return compare(key, it);
289 }
290
291 bool operator()(const item& it, const item_key& key) const {
292 return compare(key, it);
293 }
294 };
295
296 using item_ptr = foreign_ptr<boost::intrusive_ptr<item>>;
297
// Per-shard cache counters; shards are summed with operator+= for the "stats" command.
struct cache_stats {
    size_t _get_hits {};
    size_t _get_misses {};
    size_t _set_adds {};
    size_t _set_replaces {};
    size_t _cas_hits {};
    size_t _cas_misses {};
    size_t _cas_badval {};
    size_t _delete_misses {};
    size_t _delete_hits {};
    size_t _incr_misses {};
    size_t _incr_hits {};
    size_t _decr_misses {};
    size_t _decr_hits {};
    size_t _expired {};
    size_t _evicted {};
    size_t _bytes {};
    size_t _resize_failure {};
    size_t _size {};
    size_t _reclaims{};

    // Adds every counter of `o` into the matching counter of this object.
    void operator+=(const cache_stats& o) {
        static constexpr size_t cache_stats::*counters[] = {
            &cache_stats::_get_hits,
            &cache_stats::_get_misses,
            &cache_stats::_set_adds,
            &cache_stats::_set_replaces,
            &cache_stats::_cas_hits,
            &cache_stats::_cas_misses,
            &cache_stats::_cas_badval,
            &cache_stats::_delete_misses,
            &cache_stats::_delete_hits,
            &cache_stats::_incr_misses,
            &cache_stats::_incr_hits,
            &cache_stats::_decr_misses,
            &cache_stats::_decr_hits,
            &cache_stats::_expired,
            &cache_stats::_evicted,
            &cache_stats::_bytes,
            &cache_stats::_resize_failure,
            &cache_stats::_size,
            &cache_stats::_reclaims,
        };
        for (auto counter : counters) {
            this->*counter += o.*counter;
        }
    }
};
341
// Outcome of a compare-and-swap.
enum class cas_result {
    not_found,
    stored,
    bad_version,
};

// Origin tag for insertion data owned by another shard: arguments are copied and the
// caller's objects are left intact.
struct remote_origin_tag {
    template <typename T>
    static inline
    T move_if_local(T& ref) {
        T copy(ref);
        return copy;
    }
};

// Origin tag for insertion data owned by the current shard: arguments are moved from.
struct local_origin_tag {
    template <typename T>
    static inline
    T move_if_local(T& ref) {
        return T(std::move(ref));
    }
};
361
362 struct item_insertion_data {
363 item_key key;
364 sstring ascii_prefix;
365 sstring data;
366 expiration expiry;
367 };
368
369 class cache {
370 private:
371 using cache_type = bi::unordered_set<item,
372 bi::member_hook<item, item::hook_type, &item::_cache_link>,
373 bi::power_2_buckets<true>,
374 bi::constant_time_size<true>>;
375 using cache_iterator = typename cache_type::iterator;
376 static constexpr size_t initial_bucket_count = 1 << 10;
377 static constexpr float load_factor = 0.75f;
378 size_t _resize_up_threshold = load_factor * initial_bucket_count;
379 std::vector<cache_type::bucket_type> _buckets;
380 cache_type _cache;
381 seastar::timer_set<item, &item::_timer_link> _alive;
382 timer<clock_type> _timer;
383 // delta in seconds between the current values of a wall clock and a clock_type clock
384 clock_type::duration _wc_to_clock_type_delta;
385 cache_stats _stats;
386 timer<clock_type> _flush_timer;
387 private:
388 size_t item_size(item& item_ref) {
389 constexpr size_t field_alignment = alignof(void*);
390 return sizeof(item) +
391 align_up(item_ref.key_size(), field_alignment) +
392 align_up(item_ref.ascii_prefix_size(), field_alignment) +
393 item_ref.value_size();
394 }
395
396 size_t item_size(item_insertion_data& insertion) {
397 constexpr size_t field_alignment = alignof(void*);
398 auto size = sizeof(item) +
399 align_up(insertion.key.key().size(), field_alignment) +
400 align_up(insertion.ascii_prefix.size(), field_alignment) +
401 insertion.data.size();
402 #ifdef __DEBUG__
403 static bool print_item_footprint = true;
404 if (print_item_footprint) {
405 print_item_footprint = false;
406 std::cout << __FUNCTION__ << ": " << size << "\n";
407 std::cout << "sizeof(item) " << sizeof(item) << "\n";
408 std::cout << "key.size " << insertion.key.key().size() << "\n";
409 std::cout << "value.size " << insertion.data.size() << "\n";
410 std::cout << "ascii_prefix.size " << insertion.ascii_prefix.size() << "\n";
411 }
412 #endif
413 return size;
414 }
415
416 template <bool IsInCache = true, bool IsInTimerList = true, bool Release = true>
417 void erase(item& item_ref) {
418 if (IsInCache) {
419 _cache.erase(_cache.iterator_to(item_ref));
420 }
421 if (IsInTimerList) {
422 if (item_ref._expiry.ever_expires()) {
423 _alive.remove(item_ref);
424 }
425 }
426 _stats._bytes -= item_size(item_ref);
427 if (Release) {
428 // memory used by item shouldn't be freed when slab is replacing it with another item.
429 intrusive_ptr_release(&item_ref);
430 }
431 }
432
433 void expire() {
434 using namespace std::chrono;
435
436 //
437 // Adjust the delta on every timer event to minimize an error caused
438 // by a wall clock adjustment.
439 //
440 _wc_to_clock_type_delta =
441 duration_cast<clock_type::duration>(clock_type::now().time_since_epoch() - system_clock::now().time_since_epoch());
442
443 auto exp = _alive.expire(clock_type::now());
444 while (!exp.empty()) {
445 auto item = &*exp.begin();
446 exp.pop_front();
447 erase<true, false>(*item);
448 _stats._expired++;
449 }
450 _timer.arm(_alive.get_next_timeout());
451 }
452
453 inline
454 cache_iterator find(const item_key& key) {
455 return _cache.find(key, std::hash<item_key>(), item_key_cmp());
456 }
457
458 template <typename Origin>
459 inline
460 cache_iterator add_overriding(cache_iterator i, item_insertion_data& insertion) {
461 auto& old_item = *i;
462 uint64_t old_item_version = old_item._version;
463
464 erase(old_item);
465
466 size_t size = item_size(insertion);
467 auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix),
468 Origin::move_if_local(insertion.data), insertion.expiry, old_item_version + 1);
469 intrusive_ptr_add_ref(new_item);
470
471 auto insert_result = _cache.insert(*new_item);
472 assert(insert_result.second);
473 if (insertion.expiry.ever_expires() && _alive.insert(*new_item)) {
474 _timer.rearm(new_item->get_timeout());
475 }
476 _stats._bytes += size;
477 return insert_result.first;
478 }
479
480 template <typename Origin>
481 inline
482 void add_new(item_insertion_data& insertion) {
483 size_t size = item_size(insertion);
484 auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix),
485 Origin::move_if_local(insertion.data), insertion.expiry);
486 intrusive_ptr_add_ref(new_item);
487 auto& item_ref = *new_item;
488 _cache.insert(item_ref);
489 if (insertion.expiry.ever_expires() && _alive.insert(item_ref)) {
490 _timer.rearm(item_ref.get_timeout());
491 }
492 _stats._bytes += size;
493 maybe_rehash();
494 }
495
496 void maybe_rehash() {
497 if (_cache.size() >= _resize_up_threshold) {
498 auto new_size = _cache.bucket_count() * 2;
499 std::vector<cache_type::bucket_type> old_buckets;
500 try {
501 old_buckets = std::exchange(_buckets, std::vector<cache_type::bucket_type>(new_size));
502 } catch (const std::bad_alloc& e) {
503 _stats._resize_failure++;
504 return;
505 }
506 _cache.rehash(typename cache_type::bucket_traits(_buckets.data(), new_size));
507 _resize_up_threshold = _cache.bucket_count() * load_factor;
508 }
509 }
510 public:
511 cache(uint64_t per_cpu_slab_size, uint64_t slab_page_size)
512 : _buckets(initial_bucket_count)
513 , _cache(cache_type::bucket_traits(_buckets.data(), initial_bucket_count))
514 {
515 using namespace std::chrono;
516
517 _wc_to_clock_type_delta =
518 duration_cast<clock_type::duration>(clock_type::now().time_since_epoch() - system_clock::now().time_since_epoch());
519
520 _timer.set_callback([this] { expire(); });
521 _flush_timer.set_callback([this] { flush_all(); });
522
523 // initialize per-thread slab allocator.
524 slab_holder = std::make_unique<slab_allocator<item>>(default_slab_growth_factor, per_cpu_slab_size, slab_page_size,
525 [this](item& item_ref) { erase<true, true, false>(item_ref); _stats._evicted++; });
526 slab = slab_holder.get();
527 #ifdef __DEBUG__
528 static bool print_slab_classes = true;
529 if (print_slab_classes) {
530 print_slab_classes = false;
531 slab->print_slab_classes();
532 }
533 #endif
534 }
535
536 ~cache() {
537 flush_all();
538 }
539
540 void flush_all() {
541 _flush_timer.cancel();
542 _cache.erase_and_dispose(_cache.begin(), _cache.end(), [this] (item* it) {
543 erase<false, true>(*it);
544 });
545 }
546
547 void flush_at(uint32_t time) {
548 auto expiry = expiration(get_wc_to_clock_type_delta(), time);
549 _flush_timer.rearm(expiry.to_time_point());
550 }
551
552 template <typename Origin = local_origin_tag>
553 bool set(item_insertion_data& insertion) {
554 auto i = find(insertion.key);
555 if (i != _cache.end()) {
556 add_overriding<Origin>(i, insertion);
557 _stats._set_replaces++;
558 return true;
559 } else {
560 add_new<Origin>(insertion);
561 _stats._set_adds++;
562 return false;
563 }
564 }
565
566 template <typename Origin = local_origin_tag>
567 bool add(item_insertion_data& insertion) {
568 auto i = find(insertion.key);
569 if (i != _cache.end()) {
570 return false;
571 }
572
573 _stats._set_adds++;
574 add_new<Origin>(insertion);
575 return true;
576 }
577
578 template <typename Origin = local_origin_tag>
579 bool replace(item_insertion_data& insertion) {
580 auto i = find(insertion.key);
581 if (i == _cache.end()) {
582 return false;
583 }
584
585 _stats._set_replaces++;
586 add_overriding<Origin>(i, insertion);
587 return true;
588 }
589
590 bool remove(const item_key& key) {
591 auto i = find(key);
592 if (i == _cache.end()) {
593 _stats._delete_misses++;
594 return false;
595 }
596 _stats._delete_hits++;
597 auto& item_ref = *i;
598 erase(item_ref);
599 return true;
600 }
601
602 item_ptr get(const item_key& key) {
603 auto i = find(key);
604 if (i == _cache.end()) {
605 _stats._get_misses++;
606 return nullptr;
607 }
608 _stats._get_hits++;
609 auto& item_ref = *i;
610 return item_ptr(&item_ref);
611 }
612
613 template <typename Origin = local_origin_tag>
614 cas_result cas(item_insertion_data& insertion, item::version_type version) {
615 auto i = find(insertion.key);
616 if (i == _cache.end()) {
617 _stats._cas_misses++;
618 return cas_result::not_found;
619 }
620 auto& item_ref = *i;
621 if (item_ref._version != version) {
622 _stats._cas_badval++;
623 return cas_result::bad_version;
624 }
625 _stats._cas_hits++;
626 add_overriding<Origin>(i, insertion);
627 return cas_result::stored;
628 }
629
630 size_t size() {
631 return _cache.size();
632 }
633
634 size_t bucket_count() {
635 return _cache.bucket_count();
636 }
637
638 cache_stats stats() {
639 _stats._size = size();
640 return _stats;
641 }
642
643 template <typename Origin = local_origin_tag>
644 std::pair<item_ptr, bool> incr(item_key& key, uint64_t delta) {
645 auto i = find(key);
646 if (i == _cache.end()) {
647 _stats._incr_misses++;
648 return {item_ptr{}, false};
649 }
650 auto& item_ref = *i;
651 _stats._incr_hits++;
652 auto value = item_ref.data_as_integral();
653 if (!value) {
654 return {boost::intrusive_ptr<item>(&item_ref), false};
655 }
656 item_insertion_data insertion {
657 .key = Origin::move_if_local(key),
658 .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()),
659 .data = to_sstring(*value + delta),
660 .expiry = item_ref._expiry
661 };
662 i = add_overriding<local_origin_tag>(i, insertion);
663 return {boost::intrusive_ptr<item>(&*i), true};
664 }
665
666 template <typename Origin = local_origin_tag>
667 std::pair<item_ptr, bool> decr(item_key& key, uint64_t delta) {
668 auto i = find(key);
669 if (i == _cache.end()) {
670 _stats._decr_misses++;
671 return {item_ptr{}, false};
672 }
673 auto& item_ref = *i;
674 _stats._decr_hits++;
675 auto value = item_ref.data_as_integral();
676 if (!value) {
677 return {boost::intrusive_ptr<item>(&item_ref), false};
678 }
679 item_insertion_data insertion {
680 .key = Origin::move_if_local(key),
681 .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()),
682 .data = to_sstring(*value - std::min(*value, delta)),
683 .expiry = item_ref._expiry
684 };
685 i = add_overriding<local_origin_tag>(i, insertion);
686 return {boost::intrusive_ptr<item>(&*i), true};
687 }
688
689 std::pair<unsigned, foreign_ptr<lw_shared_ptr<std::string>>> print_hash_stats() {
690 static constexpr unsigned bits = sizeof(size_t) * 8;
691 size_t histo[bits + 1] {};
692 size_t max_size = 0;
693 unsigned max_bucket = 0;
694
695 for (size_t i = 0; i < _cache.bucket_count(); i++) {
696 size_t size = _cache.bucket_size(i);
697 unsigned bucket;
698 if (size == 0) {
699 bucket = 0;
700 } else {
701 bucket = bits - count_leading_zeros(size);
702 }
703 max_bucket = std::max(max_bucket, bucket);
704 max_size = std::max(max_size, size);
705 histo[bucket]++;
706 }
707
708 std::stringstream ss;
709
710 ss << "size: " << _cache.size() << "\n";
711 ss << "buckets: " << _cache.bucket_count() << "\n";
712 ss << "load: " << format("{:.2f}", (double)_cache.size() / _cache.bucket_count()) << "\n";
713 ss << "max bucket occupancy: " << max_size << "\n";
714 ss << "bucket occupancy histogram:\n";
715
716 for (unsigned i = 0; i < (max_bucket + 2); i++) {
717 ss << " ";
718 if (i == 0) {
719 ss << "0: ";
720 } else if (i == 1) {
721 ss << "1: ";
722 } else {
723 ss << (1 << (i - 1)) << "+: ";
724 }
725 ss << histo[i] << "\n";
726 }
727 return {this_shard_id(), make_foreign(make_lw_shared<std::string>(ss.str()))};
728 }
729
730 future<> stop() { return make_ready_future<>(); }
731 clock_type::duration get_wc_to_clock_type_delta() { return _wc_to_clock_type_delta; }
732 };
733
734 class sharded_cache {
735 private:
736 distributed<cache>& _peers;
737
738 inline
739 unsigned get_cpu(const item_key& key) {
740 return std::hash<item_key>()(key) % smp::count;
741 }
742 public:
743 sharded_cache(distributed<cache>& peers) : _peers(peers) {}
744
745 future<> flush_all() {
746 return _peers.invoke_on_all(&cache::flush_all);
747 }
748
749 future<> flush_at(uint32_t time) {
750 return _peers.invoke_on_all(&cache::flush_at, time);
751 }
752
753 auto get_wc_to_clock_type_delta() { return _peers.local().get_wc_to_clock_type_delta(); }
754
755 // The caller must keep @insertion live until the resulting future resolves.
756 future<bool> set(item_insertion_data& insertion) {
757 auto cpu = get_cpu(insertion.key);
758 if (this_shard_id() == cpu) {
759 return make_ready_future<bool>(_peers.local().set(insertion));
760 }
761 return _peers.invoke_on(cpu, &cache::set<remote_origin_tag>, std::ref(insertion));
762 }
763
764 // The caller must keep @insertion live until the resulting future resolves.
765 future<bool> add(item_insertion_data& insertion) {
766 auto cpu = get_cpu(insertion.key);
767 if (this_shard_id() == cpu) {
768 return make_ready_future<bool>(_peers.local().add(insertion));
769 }
770 return _peers.invoke_on(cpu, &cache::add<remote_origin_tag>, std::ref(insertion));
771 }
772
773 // The caller must keep @insertion live until the resulting future resolves.
774 future<bool> replace(item_insertion_data& insertion) {
775 auto cpu = get_cpu(insertion.key);
776 if (this_shard_id() == cpu) {
777 return make_ready_future<bool>(_peers.local().replace(insertion));
778 }
779 return _peers.invoke_on(cpu, &cache::replace<remote_origin_tag>, std::ref(insertion));
780 }
781
782 // The caller must keep @key live until the resulting future resolves.
783 future<bool> remove(const item_key& key) {
784 auto cpu = get_cpu(key);
785 return _peers.invoke_on(cpu, &cache::remove, std::ref(key));
786 }
787
788 // The caller must keep @key live until the resulting future resolves.
789 future<item_ptr> get(const item_key& key) {
790 auto cpu = get_cpu(key);
791 return _peers.invoke_on(cpu, &cache::get, std::ref(key));
792 }
793
794 // The caller must keep @insertion live until the resulting future resolves.
795 future<cas_result> cas(item_insertion_data& insertion, item::version_type version) {
796 auto cpu = get_cpu(insertion.key);
797 if (this_shard_id() == cpu) {
798 return make_ready_future<cas_result>(_peers.local().cas(insertion, version));
799 }
800 return _peers.invoke_on(cpu, &cache::cas<remote_origin_tag>, std::ref(insertion), std::move(version));
801 }
802
803 future<cache_stats> stats() {
804 return _peers.map_reduce(adder<cache_stats>(), &cache::stats);
805 }
806
807 // The caller must keep @key live until the resulting future resolves.
808 future<std::pair<item_ptr, bool>> incr(item_key& key, uint64_t delta) {
809 auto cpu = get_cpu(key);
810 if (this_shard_id() == cpu) {
811 return make_ready_future<std::pair<item_ptr, bool>>(
812 _peers.local().incr<local_origin_tag>(key, delta));
813 }
814 return _peers.invoke_on(cpu, &cache::incr<remote_origin_tag>, std::ref(key), std::move(delta));
815 }
816
817 // The caller must keep @key live until the resulting future resolves.
818 future<std::pair<item_ptr, bool>> decr(item_key& key, uint64_t delta) {
819 auto cpu = get_cpu(key);
820 if (this_shard_id() == cpu) {
821 return make_ready_future<std::pair<item_ptr, bool>>(
822 _peers.local().decr(key, delta));
823 }
824 return _peers.invoke_on(cpu, &cache::decr<remote_origin_tag>, std::ref(key), std::move(delta));
825 }
826
827 future<> print_hash_stats(output_stream<char>& out) {
828 return _peers.map_reduce([&out] (std::pair<unsigned, foreign_ptr<lw_shared_ptr<std::string>>> data) mutable {
829 return out.write("=== CPU " + std::to_string(data.first) + " ===\r\n")
830 .then([&out, str = std::move(data.second)] {
831 return out.write(*str);
832 });
833 }, &cache::print_hash_stats);
834 }
835 };
836
837 struct system_stats {
838 uint32_t _curr_connections {};
839 uint32_t _total_connections {};
840 uint64_t _cmd_get {};
841 uint64_t _cmd_set {};
842 uint64_t _cmd_flush {};
843 clock_type::time_point _start_time;
844 public:
845 system_stats() {
846 _start_time = clock_type::time_point::max();
847 }
848 system_stats(clock_type::time_point start_time)
849 : _start_time(start_time) {
850 }
851 system_stats self() {
852 return *this;
853 }
854 void operator+=(const system_stats& other) {
855 _curr_connections += other._curr_connections;
856 _total_connections += other._total_connections;
857 _cmd_get += other._cmd_get;
858 _cmd_set += other._cmd_set;
859 _cmd_flush += other._cmd_flush;
860 _start_time = std::min(_start_time, other._start_time);
861 }
862 future<> stop() { return make_ready_future<>(); }
863 };
864
865 class ascii_protocol {
866 private:
867 using this_type = ascii_protocol;
868 sharded_cache& _cache;
869 distributed<system_stats>& _system_stats;
870 memcache_ascii_parser _parser;
871 item_key _item_key;
872 item_insertion_data _insertion;
873 std::vector<item_ptr> _items;
874 private:
875 static constexpr const char *msg_crlf = "\r\n";
876 static constexpr const char *msg_error = "ERROR\r\n";
877 static constexpr const char *msg_stored = "STORED\r\n";
878 static constexpr const char *msg_not_stored = "NOT_STORED\r\n";
879 static constexpr const char *msg_end = "END\r\n";
880 static constexpr const char *msg_value = "VALUE ";
881 static constexpr const char *msg_deleted = "DELETED\r\n";
882 static constexpr const char *msg_not_found = "NOT_FOUND\r\n";
883 static constexpr const char *msg_ok = "OK\r\n";
884 static constexpr const char *msg_version = "VERSION " VERSION_STRING "\r\n";
885 static constexpr const char *msg_exists = "EXISTS\r\n";
886 static constexpr const char *msg_stat = "STAT ";
887 static constexpr const char *msg_out_of_memory = "SERVER_ERROR Out of memory allocating new item\r\n";
888 static constexpr const char *msg_error_non_numeric_value = "CLIENT_ERROR cannot increment or decrement non-numeric value\r\n";
889 private:
890 template <bool WithVersion>
891 static void append_item(scattered_message<char>& msg, item_ptr item) {
892 if (!item) {
893 return;
894 }
895
896 msg.append_static("VALUE ");
897 msg.append_static(item->key());
898 msg.append_static(item->ascii_prefix());
899
900 if (WithVersion) {
901 msg.append_static(" ");
902 msg.append(to_sstring(item->version()));
903 }
904
905 msg.append_static(msg_crlf);
906 msg.append_static(item->value());
907 msg.append_static(msg_crlf);
908 msg.on_delete([item = std::move(item)] {});
909 }
910
911 template <bool WithVersion>
912 future<> handle_get(output_stream<char>& out) {
913 _system_stats.local()._cmd_get++;
914 if (_parser._keys.size() == 1) {
915 return _cache.get(_parser._keys[0]).then([&out] (auto item) -> future<> {
916 scattered_message<char> msg;
917 this_type::append_item<WithVersion>(msg, std::move(item));
918 msg.append_static(msg_end);
919 return out.write(std::move(msg));
920 });
921 } else {
922 _items.clear();
923 return parallel_for_each(_parser._keys.begin(), _parser._keys.end(), [this] (const auto& key) {
924 return _cache.get(key).then([this] (auto item) {
925 _items.emplace_back(std::move(item));
926 });
927 }).then([this, &out] () {
928 scattered_message<char> msg;
929 for (auto& item : _items) {
930 append_item<WithVersion>(msg, std::move(item));
931 }
932 msg.append_static(msg_end);
933 return out.write(std::move(msg));
934 });
935 }
936 }
937
938 template <typename Value>
939 static future<> print_stat(output_stream<char>& out, const char* key, Value value) {
940 return out.write(msg_stat)
941 .then([&out, key] { return out.write(key); })
942 .then([&out] { return out.write(" "); })
943 .then([&out, value] { return out.write(to_sstring(value)); })
944 .then([&out] { return out.write(msg_crlf); });
945 }
946
947 future<> print_stats(output_stream<char>& out) {
948 return _cache.stats().then([this, &out] (auto stats) {
949 return _system_stats.map_reduce(adder<system_stats>(), &system_stats::self)
950 .then([&out, all_cache_stats = std::move(stats)] (auto all_system_stats) -> future<> {
951 auto now = clock_type::now();
952 auto total_items = all_cache_stats._set_replaces + all_cache_stats._set_adds
953 + all_cache_stats._cas_hits;
954 return print_stat(out, "pid", getpid())
955 .then([&out, uptime = now - all_system_stats._start_time] {
956 return print_stat(out, "uptime",
957 std::chrono::duration_cast<std::chrono::seconds>(uptime).count());
958 }).then([now, &out] {
959 return print_stat(out, "time",
960 std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count());
961 }).then([&out] {
962 return print_stat(out, "version", VERSION_STRING);
963 }).then([&out] {
964 return print_stat(out, "pointer_size", sizeof(void*)*8);
965 }).then([&out, v = all_system_stats._curr_connections] {
966 return print_stat(out, "curr_connections", v);
967 }).then([&out, v = all_system_stats._total_connections] {
968 return print_stat(out, "total_connections", v);
969 }).then([&out, v = all_system_stats._curr_connections] {
970 return print_stat(out, "connection_structures", v);
971 }).then([&out, v = all_system_stats._cmd_get] {
972 return print_stat(out, "cmd_get", v);
973 }).then([&out, v = all_system_stats._cmd_set] {
974 return print_stat(out, "cmd_set", v);
975 }).then([&out, v = all_system_stats._cmd_flush] {
976 return print_stat(out, "cmd_flush", v);
977 }).then([&out] {
978 return print_stat(out, "cmd_touch", 0);
979 }).then([&out, v = all_cache_stats._get_hits] {
980 return print_stat(out, "get_hits", v);
981 }).then([&out, v = all_cache_stats._get_misses] {
982 return print_stat(out, "get_misses", v);
983 }).then([&out, v = all_cache_stats._delete_misses] {
984 return print_stat(out, "delete_misses", v);
985 }).then([&out, v = all_cache_stats._delete_hits] {
986 return print_stat(out, "delete_hits", v);
987 }).then([&out, v = all_cache_stats._incr_misses] {
988 return print_stat(out, "incr_misses", v);
989 }).then([&out, v = all_cache_stats._incr_hits] {
990 return print_stat(out, "incr_hits", v);
991 }).then([&out, v = all_cache_stats._decr_misses] {
992 return print_stat(out, "decr_misses", v);
993 }).then([&out, v = all_cache_stats._decr_hits] {
994 return print_stat(out, "decr_hits", v);
995 }).then([&out, v = all_cache_stats._cas_misses] {
996 return print_stat(out, "cas_misses", v);
997 }).then([&out, v = all_cache_stats._cas_hits] {
998 return print_stat(out, "cas_hits", v);
999 }).then([&out, v = all_cache_stats._cas_badval] {
1000 return print_stat(out, "cas_badval", v);
1001 }).then([&out] {
1002 return print_stat(out, "touch_hits", 0);
1003 }).then([&out] {
1004 return print_stat(out, "touch_misses", 0);
1005 }).then([&out] {
1006 return print_stat(out, "auth_cmds", 0);
1007 }).then([&out] {
1008 return print_stat(out, "auth_errors", 0);
1009 }).then([&out] {
1010 return print_stat(out, "threads", smp::count);
1011 }).then([&out, v = all_cache_stats._size] {
1012 return print_stat(out, "curr_items", v);
1013 }).then([&out, v = total_items] {
1014 return print_stat(out, "total_items", v);
1015 }).then([&out, v = all_cache_stats._expired] {
1016 return print_stat(out, "seastar.expired", v);
1017 }).then([&out, v = all_cache_stats._resize_failure] {
1018 return print_stat(out, "seastar.resize_failure", v);
1019 }).then([&out, v = all_cache_stats._evicted] {
1020 return print_stat(out, "evictions", v);
1021 }).then([&out, v = all_cache_stats._bytes] {
1022 return print_stat(out, "bytes", v);
1023 }).then([&out] {
1024 return out.write(msg_end);
1025 });
1026 });
1027 });
1028 }
1029 public:
1030 ascii_protocol(sharded_cache& cache, distributed<system_stats>& system_stats)
1031 : _cache(cache)
1032 , _system_stats(system_stats)
1033 {}
1034
1035 void prepare_insertion() {
1036 _insertion = item_insertion_data{
1037 .key = std::move(_parser._key),
1038 .ascii_prefix = make_sstring(" ", _parser._flags_str, " ", _parser._size_str),
1039 .data = std::move(_parser._blob),
1040 .expiry = expiration(_cache.get_wc_to_clock_type_delta(), _parser._expiration)
1041 };
1042 }
1043
1044 future<> handle(input_stream<char>& in, output_stream<char>& out) {
1045 _parser.init();
1046 return in.consume(_parser).then([this, &out] () -> future<> {
1047 switch (_parser._state) {
1048 case memcache_ascii_parser::state::eof:
1049 return make_ready_future<>();
1050
1051 case memcache_ascii_parser::state::error:
1052 return out.write(msg_error);
1053
1054 case memcache_ascii_parser::state::cmd_set:
1055 {
1056 _system_stats.local()._cmd_set++;
1057 prepare_insertion();
1058 auto f = _cache.set(_insertion);
1059 if (_parser._noreply) {
1060 return std::move(f).discard_result();
1061 }
1062 return std::move(f).then([&out] (...) {
1063 return out.write(msg_stored);
1064 });
1065 }
1066
1067 case memcache_ascii_parser::state::cmd_cas:
1068 {
1069 _system_stats.local()._cmd_set++;
1070 prepare_insertion();
1071 auto f = _cache.cas(_insertion, _parser._version);
1072 if (_parser._noreply) {
1073 return std::move(f).discard_result();
1074 }
1075 return std::move(f).then([&out] (auto result) {
1076 switch (result) {
1077 case cas_result::stored:
1078 return out.write(msg_stored);
1079 case cas_result::not_found:
1080 return out.write(msg_not_found);
1081 case cas_result::bad_version:
1082 return out.write(msg_exists);
1083 default:
1084 std::abort();
1085 }
1086 });
1087 }
1088
1089 case memcache_ascii_parser::state::cmd_add:
1090 {
1091 _system_stats.local()._cmd_set++;
1092 prepare_insertion();
1093 auto f = _cache.add(_insertion);
1094 if (_parser._noreply) {
1095 return std::move(f).discard_result();
1096 }
1097 return std::move(f).then([&out] (bool added) {
1098 return out.write(added ? msg_stored : msg_not_stored);
1099 });
1100 }
1101
1102 case memcache_ascii_parser::state::cmd_replace:
1103 {
1104 _system_stats.local()._cmd_set++;
1105 prepare_insertion();
1106 auto f = _cache.replace(_insertion);
1107 if (_parser._noreply) {
1108 return std::move(f).discard_result();
1109 }
1110 return std::move(f).then([&out] (auto replaced) {
1111 return out.write(replaced ? msg_stored : msg_not_stored);
1112 });
1113 }
1114
1115 case memcache_ascii_parser::state::cmd_get:
1116 return handle_get<false>(out);
1117
1118 case memcache_ascii_parser::state::cmd_gets:
1119 return handle_get<true>(out);
1120
1121 case memcache_ascii_parser::state::cmd_delete:
1122 {
1123 auto f = _cache.remove(_parser._key);
1124 if (_parser._noreply) {
1125 return std::move(f).discard_result();
1126 }
1127 return std::move(f).then([&out] (bool removed) {
1128 return out.write(removed ? msg_deleted : msg_not_found);
1129 });
1130 }
1131
1132 case memcache_ascii_parser::state::cmd_flush_all:
1133 {
1134 _system_stats.local()._cmd_flush++;
1135 if (_parser._expiration) {
1136 auto f = _cache.flush_at(_parser._expiration);
1137 if (_parser._noreply) {
1138 return f;
1139 }
1140 return std::move(f).then([&out] {
1141 return out.write(msg_ok);
1142 });
1143 } else {
1144 auto f = _cache.flush_all();
1145 if (_parser._noreply) {
1146 return f;
1147 }
1148 return std::move(f).then([&out] {
1149 return out.write(msg_ok);
1150 });
1151 }
1152 }
1153
1154 case memcache_ascii_parser::state::cmd_version:
1155 return out.write(msg_version);
1156
1157 case memcache_ascii_parser::state::cmd_stats:
1158 return print_stats(out);
1159
1160 case memcache_ascii_parser::state::cmd_stats_hash:
1161 return _cache.print_hash_stats(out);
1162
1163 case memcache_ascii_parser::state::cmd_incr:
1164 {
1165 auto f = _cache.incr(_parser._key, _parser._u64);
1166 if (_parser._noreply) {
1167 return std::move(f).discard_result();
1168 }
1169 return std::move(f).then([&out] (auto result) {
1170 auto item = std::move(result.first);
1171 if (!item) {
1172 return out.write(msg_not_found);
1173 }
1174 auto incremented = result.second;
1175 if (!incremented) {
1176 return out.write(msg_error_non_numeric_value);
1177 }
1178 return out.write(item->value().data(), item->value_size()).then([&out] {
1179 return out.write(msg_crlf);
1180 });
1181 });
1182 }
1183
1184 case memcache_ascii_parser::state::cmd_decr:
1185 {
1186 auto f = _cache.decr(_parser._key, _parser._u64);
1187 if (_parser._noreply) {
1188 return std::move(f).discard_result();
1189 }
1190 return std::move(f).then([&out] (auto result) {
1191 auto item = std::move(result.first);
1192 if (!item) {
1193 return out.write(msg_not_found);
1194 }
1195 auto decremented = result.second;
1196 if (!decremented) {
1197 return out.write(msg_error_non_numeric_value);
1198 }
1199 return out.write(item->value().data(), item->value_size()).then([&out] {
1200 return out.write(msg_crlf);
1201 });
1202 });
1203 }
1204 };
1205 std::abort();
1206 }).then_wrapped([this, &out] (auto&& f) -> future<> {
1207 // FIXME: then_wrapped() being scheduled even though no exception was triggered has a
1208 // performance cost of about 2.6%. Not using it means maintainability penalty.
1209 try {
1210 f.get();
1211 } catch (std::bad_alloc& e) {
1212 if (_parser._noreply) {
1213 return make_ready_future<>();
1214 }
1215 return out.write(msg_out_of_memory);
1216 }
1217 return make_ready_future<>();
1218 });
1219 };
1220 };
1221
1222 class udp_server {
1223 public:
1224 static const size_t default_max_datagram_size = 1400;
1225 private:
1226 std::optional<future<>> _task;
1227 sharded_cache& _cache;
1228 distributed<system_stats>& _system_stats;
1229 udp_channel _chan;
1230 uint16_t _port;
1231 size_t _max_datagram_size = default_max_datagram_size;
1232
1233 struct header {
1234 packed<uint16_t> _request_id;
1235 packed<uint16_t> _sequence_number;
1236 packed<uint16_t> _n;
1237 packed<uint16_t> _reserved;
1238
1239 template<typename Adjuster>
1240 auto adjust_endianness(Adjuster a) {
1241 return a(_request_id, _sequence_number, _n);
1242 }
1243 } __attribute__((packed));
1244
1245 struct connection {
1246 ipv4_addr _src;
1247 uint16_t _request_id;
1248 input_stream<char> _in;
1249 output_stream<char> _out;
1250 std::vector<packet> _out_bufs;
1251 ascii_protocol _proto;
1252
1253 static output_stream_options make_opts() noexcept {
1254 output_stream_options opts;
1255 opts.trim_to_size = true;
1256 return opts;
1257 }
1258
1259 connection(ipv4_addr src, uint16_t request_id, input_stream<char>&& in, size_t out_size,
1260 sharded_cache& c, distributed<system_stats>& system_stats)
1261 : _src(src)
1262 , _request_id(request_id)
1263 , _in(std::move(in))
1264 , _out(output_stream<char>(data_sink(std::make_unique<vector_data_sink>(_out_bufs)), out_size, make_opts()))
1265 , _proto(c, system_stats)
1266 {}
1267
1268 future<> respond(udp_channel& chan) {
1269 int i = 0;
1270 return do_for_each(_out_bufs.begin(), _out_bufs.end(), [this, i, &chan] (packet& p) mutable {
1271 header* out_hdr = p.prepend_header<header>(0);
1272 out_hdr->_request_id = _request_id;
1273 out_hdr->_sequence_number = i++;
1274 out_hdr->_n = _out_bufs.size();
1275 *out_hdr = hton(*out_hdr);
1276 return chan.send(_src, std::move(p));
1277 });
1278 }
1279 };
1280
1281 public:
1282 udp_server(sharded_cache& c, distributed<system_stats>& system_stats, uint16_t port = 11211)
1283 : _cache(c)
1284 , _system_stats(system_stats)
1285 , _port(port)
1286 {}
1287
1288 void set_max_datagram_size(size_t max_datagram_size) {
1289 _max_datagram_size = max_datagram_size;
1290 }
1291
1292 void start() {
1293 _chan = make_udp_channel({_port});
1294 // Run in the background.
1295 _task = keep_doing([this] {
1296 return _chan.receive().then([this](udp_datagram dgram) {
1297 packet& p = dgram.get_data();
1298 if (p.len() < sizeof(header)) {
1299 // dropping invalid packet
1300 return make_ready_future<>();
1301 }
1302
1303 header hdr = ntoh(*p.get_header<header>());
1304 p.trim_front(sizeof(hdr));
1305
1306 auto request_id = hdr._request_id;
1307 auto in = as_input_stream(std::move(p));
1308 auto conn = make_lw_shared<connection>(dgram.get_src(), request_id, std::move(in),
1309 _max_datagram_size - sizeof(header), _cache, _system_stats);
1310
1311 if (hdr._n != 1 || hdr._sequence_number != 0) {
1312 return conn->_out.write("CLIENT_ERROR only single-datagram requests supported\r\n").then([this, conn] {
1313 return conn->_out.flush().then([this, conn] {
1314 return conn->respond(_chan).then([conn] {});
1315 });
1316 });
1317 }
1318
1319 return conn->_proto.handle(conn->_in, conn->_out).then([this, conn]() mutable {
1320 return conn->_out.flush().then([this, conn] {
1321 return conn->respond(_chan).then([conn] {});
1322 });
1323 });
1324 });
1325 });
1326 };
1327
1328 future<> stop() {
1329 _chan.shutdown_input();
1330 _chan.shutdown_output();
1331 return _task->handle_exception([](std::exception_ptr e) {
1332 std::cerr << "exception in udp_server " << e << '\n';
1333 });
1334 }
1335 };
1336
1337 class tcp_server {
1338 private:
1339 std::optional<future<>> _task;
1340 lw_shared_ptr<seastar::server_socket> _listener;
1341 sharded_cache& _cache;
1342 distributed<system_stats>& _system_stats;
1343 uint16_t _port;
1344 struct connection {
1345 connected_socket _socket;
1346 socket_address _addr;
1347 input_stream<char> _in;
1348 output_stream<char> _out;
1349 ascii_protocol _proto;
1350 distributed<system_stats>& _system_stats;
1351 connection(connected_socket&& socket, socket_address addr, sharded_cache& c, distributed<system_stats>& system_stats)
1352 : _socket(std::move(socket))
1353 , _addr(addr)
1354 , _in(_socket.input())
1355 , _out(_socket.output())
1356 , _proto(c, system_stats)
1357 , _system_stats(system_stats)
1358 {
1359 _system_stats.local()._curr_connections++;
1360 _system_stats.local()._total_connections++;
1361 }
1362 ~connection() {
1363 _system_stats.local()._curr_connections--;
1364 }
1365 };
1366 public:
1367 tcp_server(sharded_cache& cache, distributed<system_stats>& system_stats, uint16_t port = 11211)
1368 : _cache(cache)
1369 , _system_stats(system_stats)
1370 , _port(port)
1371 {}
1372
1373 void start() {
1374 listen_options lo;
1375 lo.reuse_address = true;
1376 _listener = seastar::server_socket(seastar::listen(make_ipv4_address({_port}), lo));
1377 // Run in the background until eof has reached on the input connection.
1378 _task = keep_doing([this] {
1379 return _listener->accept().then([this] (accept_result ar) mutable {
1380 connected_socket fd = std::move(ar.connection);
1381 socket_address addr = std::move(ar.remote_address);
1382 auto conn = make_lw_shared<connection>(std::move(fd), addr, _cache, _system_stats);
1383 (void)do_until([conn] { return conn->_in.eof(); }, [conn] {
1384 return conn->_proto.handle(conn->_in, conn->_out).then([conn] {
1385 return conn->_out.flush();
1386 });
1387 }).finally([conn] {
1388 return conn->_out.close().finally([conn]{});
1389 });
1390 });
1391 });
1392 }
1393
1394 future<> stop() {
1395 _listener->abort_accept();
1396 return _task->handle_exception([](std::exception_ptr e) {
1397 std::cerr << "exception in tcp_server " << e << '\n';
1398 });
1399 }
1400 };
1401
1402 class stats_printer {
1403 private:
1404 timer<> _timer;
1405 sharded_cache& _cache;
1406 public:
1407 stats_printer(sharded_cache& cache)
1408 : _cache(cache) {}
1409
1410 void start() {
1411 _timer.set_callback([this] {
1412 (void)_cache.stats().then([] (auto stats) {
1413 auto gets_total = stats._get_hits + stats._get_misses;
1414 auto get_hit_rate = gets_total ? ((double)stats._get_hits * 100 / gets_total) : 0;
1415 auto sets_total = stats._set_adds + stats._set_replaces;
1416 auto set_replace_rate = sets_total ? ((double)stats._set_replaces * 100/ sets_total) : 0;
1417 std::cout << "items: " << stats._size << " "
1418 << std::setprecision(2) << std::fixed
1419 << "get: " << stats._get_hits << "/" << gets_total << " (" << get_hit_rate << "%) "
1420 << "set: " << stats._set_replaces << "/" << sets_total << " (" << set_replace_rate << "%)";
1421 std::cout << std::endl;
1422 });
1423 });
1424 _timer.arm_periodic(std::chrono::seconds(1));
1425 }
1426
1427 future<> stop() { return make_ready_future<>(); }
1428 };
1429
1430 } /* namespace memcache */
1431
1432 int main(int ac, char** av) {
1433 distributed<memcache::cache> cache_peers;
1434 memcache::sharded_cache cache(cache_peers);
1435 distributed<memcache::system_stats> system_stats;
1436 distributed<memcache::udp_server> udp_server;
1437 distributed<memcache::tcp_server> tcp_server;
1438 memcache::stats_printer stats(cache);
1439
1440 namespace bpo = boost::program_options;
1441 app_template app;
1442 app.add_options()
1443 ("max-datagram-size", bpo::value<int>()->default_value(memcache::udp_server::default_max_datagram_size),
1444 "Maximum size of UDP datagram")
1445 ("max-slab-size", bpo::value<uint64_t>()->default_value(memcache::default_per_cpu_slab_size/MB),
1446 "Maximum memory to be used for items (value in megabytes) (reclaimer is disabled if set)")
1447 ("slab-page-size", bpo::value<uint64_t>()->default_value(memcache::default_slab_page_size/MB),
1448 "Size of slab page (value in megabytes)")
1449 ("stats",
1450 "Print basic statistics periodically (every second)")
1451 ("port", bpo::value<uint16_t>()->default_value(11211),
1452 "Specify UDP and TCP ports for memcached server to listen on")
1453 ;
1454
1455 return app.run_deprecated(ac, av, [&] {
1456 engine().at_exit([&] { return tcp_server.stop(); });
1457 engine().at_exit([&] { return udp_server.stop(); });
1458 engine().at_exit([&] { return cache_peers.stop(); });
1459 engine().at_exit([&] { return system_stats.stop(); });
1460
1461 auto&& config = app.configuration();
1462 uint16_t port = config["port"].as<uint16_t>();
1463 uint64_t per_cpu_slab_size = config["max-slab-size"].as<uint64_t>() * MB;
1464 uint64_t slab_page_size = config["slab-page-size"].as<uint64_t>() * MB;
1465 return cache_peers.start(std::move(per_cpu_slab_size), std::move(slab_page_size)).then([&system_stats] {
1466 return system_stats.start(memcache::clock_type::now());
1467 }).then([&] {
1468 std::cout << PLATFORM << " memcached " << VERSION << "\n";
1469 return make_ready_future<>();
1470 }).then([&, port] {
1471 return tcp_server.start(std::ref(cache), std::ref(system_stats), port);
1472 }).then([&tcp_server] {
1473 return tcp_server.invoke_on_all(&memcache::tcp_server::start);
1474 }).then([&, port] {
1475 if (engine().net().has_per_core_namespace()) {
1476 return udp_server.start(std::ref(cache), std::ref(system_stats), port);
1477 } else {
1478 return udp_server.start_single(std::ref(cache), std::ref(system_stats), port);
1479 }
1480 }).then([&] {
1481 return udp_server.invoke_on_all(&memcache::udp_server::set_max_datagram_size,
1482 (size_t)config["max-datagram-size"].as<int>());
1483 }).then([&] {
1484 return udp_server.invoke_on_all(&memcache::udp_server::start);
1485 }).then([&stats, start_stats = config.count("stats")] {
1486 if (start_stats) {
1487 stats.start();
1488 }
1489 });
1490 });
1491 }