]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/core/alien.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / core / alien.cc
1 // -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
2 /*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 /*
20 * Copyright (C) 2018 Red Hat
21 */
22
23 #include <seastar/core/alien.hh>
24 #include <seastar/core/reactor.hh>
25 #include <seastar/core/metrics.hh>
26 #include <seastar/core/prefetch.hh>
27
28 namespace seastar {
29 namespace alien {
30
31 message_queue::message_queue(reactor *to)
32 : _pending(to)
33 {}
34
35 void message_queue::stop() {
36 _metrics.clear();
37 }
38
39 void
40 message_queue::lf_queue::maybe_wakeup() {
41 // see also smp_message_queue::lf_queue::maybe_wakeup()
42 std::atomic_signal_fence(std::memory_order_seq_cst);
43 if (remote->_sleeping.load(std::memory_order_relaxed)) {
44 remote->_sleeping.store(false, std::memory_order_relaxed);
45 remote->wakeup();
46 }
47 }
48
49 void message_queue::submit_item(std::unique_ptr<message_queue::work_item> item) {
50 if (!_pending.push(item.get())) {
51 throw std::bad_alloc();
52 }
53 item.release();
54 _pending.maybe_wakeup();
55 ++_sent.value;
56 }
57
58 bool message_queue::pure_poll_rx() const {
59 return !_pending.empty();
60 }
61
62 template<typename Func>
63 size_t message_queue::process_queue(lf_queue& q, Func process) {
64 // copy batch to local memory in order to minimize
65 // time in which cross-cpu data is accessed
66 work_item* wi;
67 if (!q.pop(wi)) {
68 return 0;
69 }
70 work_item* items[batch_size + prefetch_cnt];
71 // start prefetching first item before popping the rest to overlap memory
72 // access with potential cache miss the second pop may cause
73 prefetch<2>(wi);
74 size_t nr = 0;
75 while (nr < batch_size && q.pop(items[nr])) {
76 ++nr;
77 }
78 std::fill(std::begin(items) + nr, std::begin(items) + nr + prefetch_cnt, nr ? items[nr - 1] : wi);
79 unsigned i = 0;
80 do {
81 prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + prefetch_cnt);
82 process(wi);
83 wi = items[i++];
84 } while (i <= nr);
85
86 return nr + 1;
87 }
88
89 size_t message_queue::process_incoming() {
90 if (_pending.empty()) {
91 return 0;
92 }
93 auto nr = process_queue(_pending, [] (work_item* wi) {
94 wi->process();
95 delete wi;
96 });
97 _received += nr;
98 _last_rcv_batch = nr;
99 return nr;
100 }
101
102 void message_queue::start() {
103 namespace sm = seastar::metrics;
104 char instance[10];
105 std::snprintf(instance, sizeof(instance), "%u", this_shard_id());
106 _metrics.add_group("alien", {
107 // Absolute value of num packets in last tx batch.
108 sm::make_queue_length("receive_batch_queue_length", _last_rcv_batch, sm::description("Current receive batch queue length")),
109 // total_operations value:DERIVE:0:U
110 sm::make_derive("total_received_messages", _received, sm::description("Total number of received messages")),
111 // total_operations value:DERIVE:0:U
112 sm::make_derive("total_sent_messages", [this] { return _sent.value.load(); }, sm::description("Total number of sent messages")),
113 });
114 }
115
116
117 void smp::qs_deleter::operator()(alien::message_queue* qs) const {
118 for (unsigned i = 0; i < count; i++) {
119 qs[i].~message_queue();
120 }
121 ::operator delete[](qs);
122 }
123
124 smp::qs smp::_qs;
125
126 smp::qs smp::create_qs(const std::vector<reactor*>& reactors) {
127 auto queues = reinterpret_cast<alien::message_queue*>(operator new[] (sizeof(alien::message_queue) * reactors.size()));
128 for (unsigned i = 0; i < reactors.size(); i++) {
129 new (&queues[i]) alien::message_queue(reactors[i]);
130 }
131 return qs{queues, smp::qs_deleter{static_cast<unsigned>(reactors.size())}};
132 }
133
134 bool smp::poll_queues() {
135 auto& queue = _qs[this_shard_id()];
136 return queue.process_incoming() != 0;
137 }
138
139 bool smp::pure_poll_queues() {
140 auto& queue = _qs[this_shard_id()];
141 return queue.pure_poll_rx();
142 }
143
144 }
145 }