// ceph/src/seastar/src/core/io_queue.cc
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2019 ScyllaDB
 */

#include <boost/intrusive/parent_from_member.hpp>
#include <seastar/core/file.hh>
#include <seastar/core/fair_queue.hh>
#include <seastar/core/io_queue.hh>
#include <seastar/core/io_intent.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/linux-aio.hh>
#include <seastar/core/internal/io_desc.hh>
#include <seastar/core/internal/io_sink.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/util/log.hh>
#include <chrono>
#include <mutex>
#include <array>
#include <fmt/format.h>
#include <fmt/ostream.h>

namespace seastar {

logger io_log("io");

using namespace std::chrono_literals;
using namespace internal::linux_abi;

struct default_io_exception_factory {
    static auto cancelled() {
        return cancelled_error();
    }
};
53
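// Per-class accounting attached to an io_queue: tracks shares, per-direction
// byte/op counters, the queued and executing request counts, and the queue,
// execution and starvation latency accumulators that back the metrics
// registered in register_stats().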
class io_queue::priority_class_data {
    const io_priority_class _pc;
    uint32_t _shares;
    struct {
        size_t bytes = 0;
        uint64_t ops = 0;

        void add(size_t len) noexcept {
            ops++;
            bytes += len;
        }
    } _rwstat[2] = {};
    uint32_t _nr_queued;
    uint32_t _nr_executing;
    std::chrono::duration<double> _queue_time;
    std::chrono::duration<double> _total_queue_time;
    std::chrono::duration<double> _total_execution_time;
    std::chrono::duration<double> _starvation_time;
    io_queue::clock_type::time_point _activated;
    metrics::metric_groups _metric_groups;

    void register_stats(sstring name, sstring mountpoint);

public:
    void rename(sstring new_name, sstring mountpoint);
    void update_shares(uint32_t shares) noexcept {
        _shares = std::max(shares, 1u);
    }

    priority_class_data(io_priority_class pc, uint32_t shares, sstring name, sstring mountpoint)
        : _pc(pc)
        , _shares(shares)
        , _nr_queued(0)
        , _nr_executing(0)
        , _queue_time(0)
        , _total_queue_time(0)
        , _total_execution_time(0)
        , _starvation_time(0)
    {
        register_stats(name, mountpoint);
    }

    void on_queue() noexcept {
        _nr_queued++;
        if (_nr_executing == 0 && _nr_queued == 1) {
            _activated = io_queue::clock_type::now();
        }
    }

    void on_dispatch(internal::io_direction_and_length dnl, std::chrono::duration<double> lat) noexcept {
        _rwstat[dnl.rw_idx()].add(dnl.length());
        _queue_time = lat;
        _total_queue_time += lat;
        _nr_queued--;
        _nr_executing++;
        if (_nr_executing == 1) {
            _starvation_time += io_queue::clock_type::now() - _activated;
        }
    }

    void on_cancel() noexcept {
        _nr_queued--;
    }

    void on_complete(std::chrono::duration<double> lat) noexcept {
        _total_execution_time += lat;
        _nr_executing--;
        if (_nr_executing == 0 && _nr_queued != 0) {
            _activated = io_queue::clock_type::now();
        }
    }

    void on_error() noexcept {
        _nr_executing--;
        if (_nr_executing == 0 && _nr_queued != 0) {
            _activated = io_queue::clock_type::now();
        }
    }

    fair_queue::class_id fq_class() const noexcept { return _pc.id(); }
};
135
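// Completion descriptor for a read/write request: it carries the promise
// behind the caller's future, the fair-queue ticket, and the timing hooks
// into priority_class_data. Once dispatched it is handed to the io_sink and
// deletes itself when the operation completes, fails, or is cancelled.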
class io_desc_read_write final : public io_completion {
    io_queue& _ioq;
    io_queue::priority_class_data& _pclass;
    io_queue::clock_type::time_point _dispatched;
    const stream_id _stream;
    fair_queue_ticket _fq_ticket;
    promise<size_t> _pr;

public:
    io_desc_read_write(io_queue& ioq, io_queue::priority_class_data& pc, stream_id stream, fair_queue_ticket ticket)
        : _ioq(ioq)
        , _pclass(pc)
        , _stream(stream)
        , _fq_ticket(ticket)
    {}

    virtual void set_exception(std::exception_ptr eptr) noexcept override {
        io_log.trace("dev {} : req {} error", _ioq.dev_id(), fmt::ptr(this));
        _pclass.on_error();
        _ioq.complete_request(*this);
        _pr.set_exception(eptr);
        delete this;
    }

    virtual void complete(size_t res) noexcept override {
        io_log.trace("dev {} : req {} complete", _ioq.dev_id(), fmt::ptr(this));
        auto now = io_queue::clock_type::now();
        _pclass.on_complete(std::chrono::duration_cast<std::chrono::duration<double>>(now - _dispatched));
        _ioq.complete_request(*this);
        _pr.set_value(res);
        delete this;
    }

    void cancel() noexcept {
        _pclass.on_cancel();
        _pr.set_exception(std::make_exception_ptr(default_io_exception_factory::cancelled()));
        delete this;
    }

    void dispatch(internal::io_direction_and_length dnl, io_queue::clock_type::time_point queued) noexcept {
        auto now = io_queue::clock_type::now();
        _pclass.on_dispatch(dnl, std::chrono::duration_cast<std::chrono::duration<double>>(now - queued));
        _dispatched = now;
    }

    future<size_t> get_future() {
        return _pr.get_future();
    }

    fair_queue_ticket ticket() const noexcept { return _fq_ticket; }
    stream_id stream() const noexcept { return _stream; }
};
188
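// A request while it waits in the fair queue. It owns the completion
// descriptor until dispatch: dispatch() releases the descriptor to the
// io_sink and deletes the queued request; cancel() detaches the descriptor,
// so a later dispatch() sees is_cancelled() and merely drops the entry.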
class queued_io_request : private internal::io_request {
    io_queue& _ioq;
    internal::io_direction_and_length _dnl;
    io_queue::clock_type::time_point _started;
    const stream_id _stream;
    fair_queue_entry _fq_entry;
    internal::cancellable_queue::link _intent;
    std::unique_ptr<io_desc_read_write> _desc;

    bool is_cancelled() const noexcept { return !_desc; }

public:
    queued_io_request(internal::io_request req, io_queue& q, io_queue::priority_class_data& pc, internal::io_direction_and_length dnl)
        : io_request(std::move(req))
        , _ioq(q)
        , _dnl(std::move(dnl))
        , _started(io_queue::clock_type::now())
        , _stream(_ioq.request_stream(_dnl))
        , _fq_entry(_ioq.request_fq_ticket(dnl))
        , _desc(std::make_unique<io_desc_read_write>(_ioq, pc, _stream, _fq_entry.ticket()))
    {
        io_log.trace("dev {} : req {} queue len {} ticket {}", _ioq.dev_id(), fmt::ptr(&*_desc), _dnl.length(), _fq_entry.ticket());
    }

    queued_io_request(queued_io_request&&) = delete;

    void dispatch() noexcept {
        if (is_cancelled()) {
            _ioq.complete_cancelled_request(*this);
            delete this;
            return;
        }

        io_log.trace("dev {} : req {} submit", _ioq.dev_id(), fmt::ptr(&*_desc));
        _intent.maybe_dequeue();
        _desc->dispatch(_dnl, _started);
        _ioq.submit_request(_desc.release(), std::move(*this));
        delete this;
    }

    void cancel() noexcept {
        _ioq.cancel_request(*this);
        _desc.release()->cancel();
    }

    void set_intent(internal::cancellable_queue* cq) noexcept {
        _intent.enqueue(cq);
    }

    future<size_t> get_future() noexcept { return _desc->get_future(); }
    fair_queue_entry& queue_entry() noexcept { return _fq_entry; }
    stream_id stream() const noexcept { return _stream; }

    static queued_io_request& from_fq_entry(fair_queue_entry& ent) noexcept {
        return *boost::intrusive::get_parent_from_member(&ent, &queued_io_request::_fq_entry);
    }

    static queued_io_request& from_cq_link(internal::cancellable_queue::link& link) noexcept {
        return *boost::intrusive::get_parent_from_member(&link, &queued_io_request::_intent);
    }
};
250
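// cancellable_queue keeps the requests attached to one io_intent. The first
// link is held as a raw pointer with a back-reference (so it can be re-pointed
// when the queue moves); the remaining links live in an intrusive slist whose
// member hooks are constructed lazily in push_back() and destroyed when a link
// is promoted to the front in pop_front().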
internal::cancellable_queue::cancellable_queue(cancellable_queue&& o) noexcept
        : _first(std::exchange(o._first, nullptr))
        , _rest(std::move(o._rest)) {
    if (_first != nullptr) {
        _first->_ref = this;
    }
}

internal::cancellable_queue& internal::cancellable_queue::operator=(cancellable_queue&& o) noexcept {
    if (this != &o) {
        _first = std::exchange(o._first, nullptr);
        _rest = std::move(o._rest);
        if (_first != nullptr) {
            _first->_ref = this;
        }
    }
    return *this;
}

internal::cancellable_queue::~cancellable_queue() {
    while (_first != nullptr) {
        queued_io_request::from_cq_link(*_first).cancel();
        pop_front();
    }
}

void internal::cancellable_queue::push_back(link& il) noexcept {
    if (_first == nullptr) {
        _first = &il;
        il._ref = this;
    } else {
        new (&il._hook) bi::slist_member_hook<>();
        _rest.push_back(il);
    }
}

void internal::cancellable_queue::pop_front() noexcept {
    _first->_ref = nullptr;
    if (_rest.empty()) {
        _first = nullptr;
    } else {
        _first = &_rest.front();
        _rest.pop_front();
        _first->_hook.~slist_member_hook<>();
        _first->_ref = this;
    }
}

internal::intent_reference::intent_reference(io_intent* intent) noexcept : _intent(intent) {
    if (_intent != nullptr) {
        intent->_refs.bind(*this);
    }
}

io_intent* internal::intent_reference::retrieve() const {
    if (is_cancelled()) {
        throw default_io_exception_factory::cancelled();
    }

    return _intent;
}

void
io_queue::complete_request(io_desc_read_write& desc) noexcept {
    _requests_executing--;
    _streams[desc.stream()].notify_request_finished(desc.ticket());
}
318
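// Translate the calibrated disk latencies (microseconds per request and per
// byte) into fair_queue pacing parameters: weights are normalized to
// read_request_base_count, and sizes are counted in ticket-size blocks of
// (1 << request_ticket_size_shift) bytes.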
fair_queue::config io_queue::make_fair_queue_config(const config& iocfg) {
    fair_queue::config cfg;
    cfg.ticket_weight_pace = iocfg.disk_us_per_request / read_request_base_count;
    cfg.ticket_size_pace = (iocfg.disk_us_per_byte * (1 << request_ticket_size_shift)) / read_request_base_count;
    return cfg;
}

io_queue::io_queue(io_group_ptr group, internal::io_sink& sink)
    : _priority_classes()
    , _group(std::move(group))
    , _sink(sink)
{
    auto fq_cfg = make_fair_queue_config(get_config());
    _streams.emplace_back(*_group->_fgs[0], fq_cfg);
    if (get_config().duplex) {
        _streams.emplace_back(*_group->_fgs[1], fq_cfg);
    }
    seastar_logger.debug("Created io queue, multipliers {}:{}",
            get_config().disk_req_write_to_read_multiplier,
            get_config().disk_bytes_write_to_read_multiplier);
}
fair_group::config io_group::make_fair_group_config(const io_queue::config& qcfg) noexcept {
    /*
     * It doesn't make sense to configure the requests limit higher than
     * what it can reach when the queue is full of minimal requests. At
     * the same time, setting too large a value increases the chances of
     * overflowing the group rovers and locking up the queue.
     *
     * The same is technically true for the bytes limit, but the group
     * rovers are configured in blocks (ticket size shift), which already
     * provides good protection.
     */
    auto max_req_count = std::min(qcfg.max_req_count,
                                  qcfg.max_bytes_count / io_queue::minimal_request_size);
    auto max_req_count_min = std::max(io_queue::read_request_base_count, qcfg.disk_req_write_to_read_multiplier);
    /*
     * Read requests weigh read_request_base_count, writes weigh
     * disk_req_write_to_read_multiplier. The fair queue limit must be
     * large enough to pass the larger of the two through. The same is
     * true for request sizes, but that check is done at run time, see
     * the request_fq_ticket() method.
     */
    if (max_req_count < max_req_count_min) {
        seastar_logger.warn("The disk request rate is too low, configuring it to {}, but you may experience latency problems", max_req_count_min);
        max_req_count = max_req_count_min;
    }
    return fair_group::config(max_req_count,
                              qcfg.max_bytes_count >> io_queue::request_ticket_size_shift);
}
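
// A worked example of the clamp above, with purely hypothetical numbers: if
// max_req_count = 1000, max_bytes_count = 128 KiB and minimal_request_size = 512,
// the limit becomes min(1000, 131072 / 512) = 256 requests, so the rovers
// cannot be oversubscribed even if every in-flight request is minimal.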

io_group::io_group(io_queue::config io_cfg) noexcept
    : _config(std::move(io_cfg))
{
    auto fg_cfg = make_fair_group_config(_config);
    _fgs.push_back(std::make_unique<fair_group>(fg_cfg));
    if (_config.duplex) {
        _fgs.push_back(std::make_unique<fair_group>(fg_cfg));
    }
    seastar_logger.debug("Created io group, limits {}:{}", _config.max_req_count, _config.max_bytes_count);
}

io_queue::~io_queue() {
    // It is illegal to stop the I/O queue with pending requests.
    // Technically we would use a gate to guarantee that. But here, it is not
    // needed since this is expected to be destroyed only after the reactor is destroyed.
    //
    // And that will happen only when there are no more fibers to run. If we ever change
    // that, then this has to change.
    for (auto&& pc_data : _priority_classes) {
        if (pc_data) {
            for (auto&& s : _streams) {
                s.unregister_priority_class(pc_data->fq_class());
            }
        }
    }
}

std::mutex io_priority_class::_register_lock;
std::array<io_priority_class::class_info, io_priority_class::_max_classes> io_priority_class::_infos;

unsigned io_priority_class::get_shares() const {
    return _infos.at(_id).shares;
}

sstring io_priority_class::get_name() const {
    std::lock_guard<std::mutex> lock(_register_lock);
    return _infos.at(_id).name;
}
io_priority_class io_priority_class::register_one(sstring name, uint32_t shares) {
    std::lock_guard<std::mutex> lock(_register_lock);
    for (unsigned i = 0; i < _max_classes; ++i) {
        if (!_infos[i].registered()) {
            _infos[i].shares = shares;
            _infos[i].name = std::move(name);
        } else if (_infos[i].name != name) {
            continue;
        } else {
            // Found an entry matching the name to be registered;
            // make sure it was registered with the same number of shares.
            // Note: those may change dynamically later on in the
            // fair queue.
            assert(_infos[i].shares == shares);
        }
        return io_priority_class(i);
    }
    throw std::runtime_error("No more room for new I/O priority classes");
}
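
// Usage sketch (hypothetical class name and share count): priority classes
// are process-wide, so registration is typically done once, e.g.
//
//   static const auto compaction_pc = io_priority_class::register_one("compaction", 100);
//
// Registering the same name again returns the existing class, asserting that
// the share count matches.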

future<> io_priority_class::update_shares(uint32_t shares) const {
    // Keep registered shares intact, just update the ones
    // on reactor queues
    return engine().update_shares_for_queues(*this, shares);
}

bool io_priority_class::rename_registered(sstring new_name) {
    std::lock_guard<std::mutex> guard(_register_lock);
    for (unsigned i = 0; i < _max_classes; ++i) {
        if (!_infos[i].registered()) {
            break;
        }
        if (_infos[i].name == new_name) {
            if (i == id()) {
                return false;
            } else {
                io_log.error("trying to rename priority class with id {} to \"{}\" but that name already exists", id(), new_name);
                throw std::runtime_error(format("rename priority class: an attempt was made to rename a priority class to an"
                                                " already existing name ({})", new_name));
            }
        }
    }
    _infos[id()].name = new_name;
    return true;
}

future<> io_priority_class::rename(sstring new_name) noexcept {
    return futurize_invoke([this, new_name = std::move(new_name)] () mutable {
        // Taking the lock here prevents newly registered classes from
        // registering under the old name (and prevents undefined behavior,
        // since this array is shared across shards). However, it doesn't
        // prevent the case where a class that got registered right after
        // the lock release is unnecessarily renamed. This is not a real
        // problem, and it is a lot better than holding the lock until all
        // cross-shard activity is over.

        if (!rename_registered(new_name)) {
            return make_ready_future<>();
        }

        return smp::invoke_on_all([this, new_name = std::move(new_name)] {
            return engine().rename_queues(*this, new_name);
        });
    });
}

seastar::metrics::label io_queue_shard("ioshard");

void
io_queue::priority_class_data::rename(sstring new_name, sstring mountpoint) {
    try {
        register_stats(new_name, mountpoint);
    } catch (metrics::double_registration &e) {
        // Ignore this exception: it can happen when a class that was already
        // created under the new name is renamed again (which would otherwise
        // cause a double registration exception to propagate).
    }
}

void
io_queue::priority_class_data::register_stats(sstring name, sstring mountpoint) {
    shard_id owner = this_shard_id();
    seastar::metrics::metric_groups new_metrics;
    namespace sm = seastar::metrics;
    auto shard = sm::impl::shard();

    auto ioq_group = sm::label("mountpoint");
    auto mountlabel = ioq_group(mountpoint);

    auto class_label_type = sm::label("class");
    auto class_label = class_label_type(name);
    new_metrics.add_group("io_queue", {
        sm::make_derive("total_bytes", [this] {
            return _rwstat[internal::io_direction_and_length::read_idx].bytes + _rwstat[internal::io_direction_and_length::write_idx].bytes;
        }, sm::description("Total bytes passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
        sm::make_derive("total_operations", [this] {
            return _rwstat[internal::io_direction_and_length::read_idx].ops + _rwstat[internal::io_direction_and_length::write_idx].ops;
        }, sm::description("Total operations passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
        sm::make_derive("total_read_bytes", _rwstat[internal::io_direction_and_length::read_idx].bytes,
                sm::description("Total read bytes passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
        sm::make_derive("total_read_ops", _rwstat[internal::io_direction_and_length::read_idx].ops,
                sm::description("Total read operations passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
        sm::make_derive("total_write_bytes", _rwstat[internal::io_direction_and_length::write_idx].bytes,
                sm::description("Total write bytes passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
        sm::make_derive("total_write_ops", _rwstat[internal::io_direction_and_length::write_idx].ops,
                sm::description("Total write operations passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
        sm::make_derive("total_delay_sec", [this] {
            return _total_queue_time.count();
        }, sm::description("Total time spent in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
        sm::make_derive("total_exec_sec", [this] {
            return _total_execution_time.count();
        }, sm::description("Total time spent in disk"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
        sm::make_derive("starvation_time_sec", [this] {
            auto st = _starvation_time;
            if (_nr_queued != 0 && _nr_executing == 0) {
                st += io_queue::clock_type::now() - _activated;
            }
            return st.count();
        }, sm::description("Total time spent starving for disk"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),

        // Note: The counter below is not the same as reactor's queued-io-requests
        // queued-io-requests shows us how many requests in total exist in this I/O Queue.
        //
        // This counter lives in the priority class, so it will count only queued requests
        // that belong to that class.
        //
        // In other words: the new counter tells you how busy a class is, and the
        // old counter tells you how busy the system is.

        sm::make_queue_length("queue_length", _nr_queued, sm::description("Number of requests in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
        sm::make_queue_length("disk_queue_length", _nr_executing, sm::description("Number of requests in the disk"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
        sm::make_gauge("delay", [this] {
            return _queue_time.count();
        }, sm::description("queue delay of the most recently dispatched request"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
        sm::make_gauge("shares", _shares, sm::description("current amount of shares"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label})
    });
    _metric_groups = std::exchange(new_metrics, {});
}

io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_class& pc) {
    auto id = pc.id();
    if (id >= _priority_classes.size()) {
        _priority_classes.resize(id + 1);
    }
    if (!_priority_classes[id]) {
        auto shares = pc.get_shares();
        auto name = pc.get_name();

        // A note on naming:
        //
        // We could just add the owner as the instance id and have something like:
        //  io_queue-<class_owner>-<counter>-<class_name>
        //
        // However, when there is more than one shard per I/O queue, it is very useful
        // to know which shards are being served by the same queue. Therefore, a better
        // naming scheme is:
        //
        //  io_queue-<queue_owner>-<counter>-<class_name>, shard=<class_owner>
        // using the shard label to hold the owner number.
        //
        // This conveys all the information we need and allows one to easily group all
        // classes from the same I/O queue (by filtering by shard).
        for (auto&& s : _streams) {
            s.register_priority_class(id, shares);
        }
        auto pc_data = std::make_unique<priority_class_data>(pc, shares, name, mountpoint());

        _priority_classes[id] = std::move(pc_data);
    }
    return *_priority_classes[id];
}

stream_id io_queue::request_stream(internal::io_direction_and_length dnl) const noexcept {
    return get_config().duplex ? dnl.rw_idx() : 0;
}

fair_queue_ticket io_queue::request_fq_ticket(internal::io_direction_and_length dnl) const noexcept {
    unsigned weight;
    size_t size;

    if (dnl.is_write()) {
        weight = get_config().disk_req_write_to_read_multiplier;
        size = get_config().disk_bytes_write_to_read_multiplier * dnl.length();
    } else {
        weight = io_queue::read_request_base_count;
        size = io_queue::read_request_base_count * dnl.length();
    }

    static thread_local size_t oversize_warning_threshold = 0;

    if (size >= get_config().max_bytes_count) {
        if (size > oversize_warning_threshold) {
            oversize_warning_threshold = size;
            io_log.warn("oversized request (length {}) submitted. "
                        "dazed and confuzed, trimming its weight from {} down to {}", dnl.length(),
                        size >> request_ticket_size_shift,
                        get_config().max_bytes_count >> request_ticket_size_shift);
        }
        size = get_config().max_bytes_count;
    }

    return fair_queue_ticket(weight, size >> request_ticket_size_shift);
}
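
// A worked ticket example with hypothetical config values: for a 4096-byte
// write, with disk_req_write_to_read_multiplier = 4,
// disk_bytes_write_to_read_multiplier = 4 and request_ticket_size_shift = 9,
// the ticket is (weight = 4, size = (4 * 4096) >> 9 = 32 blocks). Oversized
// requests are trimmed down to max_bytes_count rather than rejected, so they
// can still pass through the fair group's rovers.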

io_queue::request_limits io_queue::get_request_limits() const noexcept {
    request_limits l;
    l.max_read = align_down<size_t>(std::min<size_t>(get_config().disk_read_saturation_length, get_config().max_bytes_count / read_request_base_count), minimal_request_size);
    l.max_write = align_down<size_t>(std::min<size_t>(get_config().disk_write_saturation_length, get_config().max_bytes_count / get_config().disk_bytes_write_to_read_multiplier), minimal_request_size);
    return l;
}
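
// For illustration (hypothetical numbers): with max_bytes_count = 4 MiB,
// read_request_base_count = 1 and disk_bytes_write_to_read_multiplier = 4,
// reads are capped at min(disk_read_saturation_length, 4 MiB) and writes at
// min(disk_write_saturation_length, 1 MiB), both aligned down to
// minimal_request_size.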

future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req, io_intent* intent) noexcept {
    return futurize_invoke([&pc, len, req = std::move(req), this, intent] () mutable {
        // The first request of a class lands here and creates the class data.
        // It is important that the data is created on the same shard where it
        // will later be used.
        auto& pclass = find_or_create_class(pc);
        internal::io_direction_and_length dnl(req, len);
        auto queued_req = std::make_unique<queued_io_request>(std::move(req), *this, pclass, std::move(dnl));
        auto fut = queued_req->get_future();
        internal::cancellable_queue* cq = nullptr;
        if (intent != nullptr) {
            cq = &intent->find_or_create_cancellable_queue(dev_id(), pc.id());
        }

        _streams[queued_req->stream()].queue(pclass.fq_class(), queued_req->queue_entry());
        queued_req->set_intent(cq);
        queued_req.release();
        pclass.on_queue();
        _queued_requests++;
        return fut;
    });
}
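
// Callers normally don't invoke queue_request() directly; it is reached via
// the file API on the reactor. A minimal usage sketch, assuming an open
// seastar::file `f`, an aligned buffer `buf`, and a registered priority
// class `pc`:
//
//   io_intent intent;
//   future<size_t> n = f.dma_read(0, buf, 4096, pc, &intent);
//   // intent.cancel() completes the future with cancelled_error()
//   // if the request is still sitting in the queue.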

void io_queue::poll_io_queue() {
    for (auto&& st : _streams) {
        st.dispatch_requests([] (fair_queue_entry& fqe) {
            queued_io_request::from_fq_entry(fqe).dispatch();
        });
    }
}

void io_queue::submit_request(io_desc_read_write* desc, internal::io_request req) noexcept {
    _queued_requests--;
    _requests_executing++;
    _sink.submit(desc, std::move(req));
}

void io_queue::cancel_request(queued_io_request& req) noexcept {
    _queued_requests--;
    _streams[req.stream()].notify_request_cancelled(req.queue_entry());
}

void io_queue::complete_cancelled_request(queued_io_request& req) noexcept {
    _streams[req.stream()].notify_request_finished(req.queue_entry().ticket());
}

io_queue::clock_type::time_point io_queue::next_pending_aio() const noexcept {
    clock_type::time_point next = clock_type::time_point::max();

    for (const auto& s : _streams) {
        clock_type::time_point n = s.next_pending_aio();
        if (n < next) {
            next = std::move(n);
        }
    }

    return next;
}

future<>
io_queue::update_shares_for_class(const io_priority_class pc, size_t new_shares) {
    return futurize_invoke([this, pc, new_shares] {
        auto& pclass = find_or_create_class(pc);
        pclass.update_shares(new_shares);
        for (auto&& s : _streams) {
            s.update_shares_for_class(pclass.fq_class(), new_shares);
        }
    });
}

void
io_queue::rename_priority_class(io_priority_class pc, sstring new_name) {
    if (_priority_classes.size() > pc.id() &&
            _priority_classes[pc.id()]) {
        _priority_classes[pc.id()]->rename(new_name, get_config().mountpoint);
    }
}

void internal::io_sink::submit(io_completion* desc, internal::io_request req) noexcept {
    try {
        _pending_io.emplace_back(std::move(req), desc);
    } catch (...) {
        desc->set_exception(std::current_exception());
    }
}

}