]>
git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/core/alien.cc
1 // -*- mode:C++; tab-width:4; c-basic-offset:4; indent-tabs-mode:nil -*-
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 * Copyright (C) 2018 Red Hat
23 #include <seastar/core/alien.hh>
24 #include <seastar/core/reactor.hh>
25 #include <seastar/core/metrics.hh>
26 #include <seastar/core/prefetch.hh>
31 message_queue::message_queue(reactor
*to
)
35 void message_queue::stop() {
40 message_queue::lf_queue::maybe_wakeup() {
41 // see also smp_message_queue::lf_queue::maybe_wakeup()
42 std::atomic_signal_fence(std::memory_order_seq_cst
);
43 if (remote
->_sleeping
.load(std::memory_order_relaxed
)) {
44 remote
->_sleeping
.store(false, std::memory_order_relaxed
);
49 void message_queue::submit_item(std::unique_ptr
<message_queue::work_item
> item
) {
50 if (!_pending
.push(item
.get())) {
51 throw std::bad_alloc();
54 _pending
.maybe_wakeup();
58 bool message_queue::pure_poll_rx() const {
59 return !_pending
.empty();
62 template<typename Func
>
63 size_t message_queue::process_queue(lf_queue
& q
, Func process
) {
64 // copy batch to local memory in order to minimize
65 // time in which cross-cpu data is accessed
70 work_item
* items
[batch_size
+ prefetch_cnt
];
71 // start prefetching first item before popping the rest to overlap memory
72 // access with potential cache miss the second pop may cause
75 while (nr
< batch_size
&& q
.pop(items
[nr
])) {
78 std::fill(std::begin(items
) + nr
, std::begin(items
) + nr
+ prefetch_cnt
, nr
? items
[nr
- 1] : wi
);
81 prefetch_n
<2>(std::begin(items
) + i
, std::begin(items
) + i
+ prefetch_cnt
);
89 size_t message_queue::process_incoming() {
90 if (_pending
.empty()) {
93 auto nr
= process_queue(_pending
, [] (work_item
* wi
) {
102 void message_queue::start() {
103 namespace sm
= seastar::metrics
;
105 std::snprintf(instance
, sizeof(instance
), "%u", this_shard_id());
106 _metrics
.add_group("alien", {
107 // Absolute value of num packets in last tx batch.
108 sm::make_queue_length("receive_batch_queue_length", _last_rcv_batch
, sm::description("Current receive batch queue length")),
109 // total_operations value:DERIVE:0:U
110 sm::make_derive("total_received_messages", _received
, sm::description("Total number of received messages")),
111 // total_operations value:DERIVE:0:U
112 sm::make_derive("total_sent_messages", [this] { return _sent
.value
.load(); }, sm::description("Total number of sent messages")),
117 void smp::qs_deleter::operator()(alien::message_queue
* qs
) const {
118 for (unsigned i
= 0; i
< count
; i
++) {
119 qs
[i
].~message_queue();
121 ::operator delete[](qs
);
126 smp::qs
smp::create_qs(const std::vector
<reactor
*>& reactors
) {
127 auto queues
= reinterpret_cast<alien::message_queue
*>(operator new[] (sizeof(alien::message_queue
) * reactors
.size()));
128 for (unsigned i
= 0; i
< reactors
.size(); i
++) {
129 new (&queues
[i
]) alien::message_queue(reactors
[i
]);
131 return qs
{queues
, smp::qs_deleter
{static_cast<unsigned>(reactors
.size())}};
134 bool smp::poll_queues() {
135 auto& queue
= _qs
[this_shard_id()];
136 return queue
.process_incoming() != 0;
139 bool smp::pure_poll_queues() {
140 auto& queue
= _qs
[this_shard_id()];
141 return queue
.pure_poll_rx();