/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership.  You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2019 ScyllaDB
 */
#include <boost/intrusive/parent_from_member.hpp>
#include <seastar/core/file.hh>
#include <seastar/core/fair_queue.hh>
#include <seastar/core/io_queue.hh>
#include <seastar/core/io_intent.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/linux-aio.hh>
#include <seastar/core/internal/io_desc.hh>
#include <seastar/core/internal/io_sink.hh>
#include <seastar/core/io_priority_class.hh>
#include <seastar/util/log.hh>
#include <chrono>
#include <mutex>
#include <array>
#include <fmt/format.h>
#include <fmt/ostream.h>

namespace seastar {

logger io_log("io");

using namespace std::chrono_literals;
using namespace internal::linux_abi;
struct default_io_exception_factory {
    static auto cancelled() {
        return cancelled_error();
    }
};
class io_queue::priority_class_data {
    const io_priority_class _pc;
    uint32_t _shares;
    struct {
        size_t bytes = 0;
        uint64_t ops = 0;

        void add(size_t len) noexcept {
            ops++;
            bytes += len;
        }
    } _rwstat[2] = {};
    uint32_t _nr_queued;
    uint32_t _nr_executing;
    std::chrono::duration<double> _queue_time;
    std::chrono::duration<double> _total_queue_time;
    std::chrono::duration<double> _total_execution_time;
    std::chrono::duration<double> _starvation_time;
    io_queue::clock_type::time_point _activated;
    metrics::metric_groups _metric_groups;
    void register_stats(sstring name, sstring mountpoint);

public:
    void rename(sstring new_name, sstring mountpoint);

    void update_shares(uint32_t shares) noexcept {
        _shares = std::max(shares, 1u);
    }

    priority_class_data(io_priority_class pc, uint32_t shares, sstring name, sstring mountpoint)
        : _pc(pc)
        , _shares(shares)
        , _nr_queued(0)
        , _nr_executing(0)
        , _queue_time(0)
        , _total_queue_time(0)
        , _total_execution_time(0)
        , _starvation_time(0)
    {
        register_stats(name, mountpoint);
    }
    void on_queue() noexcept {
        _nr_queued++;
        if (_nr_executing == 0 && _nr_queued == 1) {
            _activated = io_queue::clock_type::now();
        }
    }

    void on_dispatch(internal::io_direction_and_length dnl, std::chrono::duration<double> lat) noexcept {
        _rwstat[dnl.rw_idx()].add(dnl.length());
        _queue_time = lat;
        _total_queue_time += lat;
        _nr_queued--;
        _nr_executing++;
        if (_nr_executing == 1) {
            _starvation_time += io_queue::clock_type::now() - _activated;
        }
    }

    void on_cancel() noexcept {
        _nr_queued--;
    }

    void on_complete(std::chrono::duration<double> lat) noexcept {
        _total_execution_time += lat;
        _nr_executing--;
        if (_nr_executing == 0 && _nr_queued != 0) {
            _activated = io_queue::clock_type::now();
        }
    }

    void on_error() noexcept {
        _nr_executing--;
        if (_nr_executing == 0 && _nr_queued != 0) {
            _activated = io_queue::clock_type::now();
        }
    }

    fair_queue::class_id fq_class() const noexcept { return _pc.id(); }
};
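
// A quick sketch of the accounting above, for readers of the metrics below:
//
//   on_queue()     -> _nr_queued++; starts the "starvation" clock when the
//                     class goes from idle to having queued work
//   on_dispatch()  -> _nr_queued--, _nr_executing++; accumulates queue delay
//   on_complete()  -> _nr_executing--; accumulates execution time
//   on_cancel() /
//   on_error()     -> drop the request from the respective counter
//
// _queue_time holds the queueing latency of the most recently dispatched
// request, while _total_queue_time and _total_execution_time are monotonic sums.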
class io_desc_read_write final : public io_completion {
    io_queue& _ioq;
    io_queue::priority_class_data& _pclass;
    io_queue::clock_type::time_point _dispatched;
    const stream_id _stream;
    fair_queue_ticket _fq_ticket;
    promise<size_t> _pr;

public:
    io_desc_read_write(io_queue& ioq, io_queue::priority_class_data& pc, stream_id stream, fair_queue_ticket ticket)
        : _ioq(ioq)
        , _pclass(pc)
        , _stream(stream)
        , _fq_ticket(ticket)
    {}

    virtual void set_exception(std::exception_ptr eptr) noexcept override {
        io_log.trace("dev {} : req {} error", _ioq.dev_id(), fmt::ptr(this));
        _pclass.on_error();
        _ioq.complete_request(*this);
        _pr.set_exception(eptr);
        delete this;
    }

    virtual void complete(size_t res) noexcept override {
        io_log.trace("dev {} : req {} complete", _ioq.dev_id(), fmt::ptr(this));
        auto now = io_queue::clock_type::now();
        _pclass.on_complete(std::chrono::duration_cast<std::chrono::duration<double>>(now - _dispatched));
        _ioq.complete_request(*this);
        _pr.set_value(res);
        delete this;
    }

    void cancel() noexcept {
        _pclass.on_cancel();
        _pr.set_exception(std::make_exception_ptr(default_io_exception_factory::cancelled()));
        delete this;
    }

    void dispatch(internal::io_direction_and_length dnl, io_queue::clock_type::time_point queued) noexcept {
        auto now = io_queue::clock_type::now();
        _pclass.on_dispatch(dnl, std::chrono::duration_cast<std::chrono::duration<double>>(now - queued));
        _dispatched = now;
    }

    future<size_t> get_future() {
        return _pr.get_future();
    }

    fair_queue_ticket ticket() const noexcept { return _fq_ticket; }
    stream_id stream() const noexcept { return _stream; }
};
class queued_io_request : private internal::io_request {
    io_queue& _ioq;
    internal::io_direction_and_length _dnl;
    io_queue::clock_type::time_point _started;
    const stream_id _stream;
    fair_queue_entry _fq_entry;
    internal::cancellable_queue::link _intent;
    std::unique_ptr<io_desc_read_write> _desc;

    bool is_cancelled() const noexcept { return !_desc; }

public:
    queued_io_request(internal::io_request req, io_queue& q, io_queue::priority_class_data& pc, internal::io_direction_and_length dnl)
        : io_request(std::move(req))
        , _ioq(q)
        , _dnl(std::move(dnl))
        , _started(io_queue::clock_type::now())
        , _stream(_ioq.request_stream(_dnl))
        , _fq_entry(_ioq.request_fq_ticket(dnl))
        , _desc(std::make_unique<io_desc_read_write>(_ioq, pc, _stream, _fq_entry.ticket()))
    {
        io_log.trace("dev {} : req {} queue len {} ticket {}", _ioq.dev_id(), fmt::ptr(&*_desc), _dnl.length(), _fq_entry.ticket());
    }

    queued_io_request(queued_io_request&&) = delete;

    void dispatch() noexcept {
        if (is_cancelled()) {
            _ioq.complete_cancelled_request(*this);
            delete this;
            return;
        }

        io_log.trace("dev {} : req {} submit", _ioq.dev_id(), fmt::ptr(&*_desc));
        _intent.maybe_dequeue();
        _desc->dispatch(_dnl, _started);
        _ioq.submit_request(_desc.release(), std::move(*this));
        delete this;
    }

    void cancel() noexcept {
        _ioq.cancel_request(*this);
        _desc.release()->cancel();
        delete this;
    }

    void set_intent(internal::cancellable_queue* cq) noexcept {
        _intent.enqueue(cq);
    }

    future<size_t> get_future() noexcept { return _desc->get_future(); }
    fair_queue_entry& queue_entry() noexcept { return _fq_entry; }
    stream_id stream() const noexcept { return _stream; }

    static queued_io_request& from_fq_entry(fair_queue_entry& ent) noexcept {
        return *boost::intrusive::get_parent_from_member(&ent, &queued_io_request::_fq_entry);
    }

    static queued_io_request& from_cq_link(internal::cancellable_queue::link& link) noexcept {
        return *boost::intrusive::get_parent_from_member(&link, &queued_io_request::_intent);
    }
};
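
// The two from_*() helpers above recover the owning queued_io_request from an
// intrusive member held by a container: boost::intrusive::get_parent_from_member
// subtracts the member's offset within the class from the member's address.
// Rough illustration (hypothetical caller):
//
//     fair_queue_entry& ent = ...;  // handed back by the fair_queue
//     queued_io_request& req = queued_io_request::from_fq_entry(ent);
//     // now &req.queue_entry() == &ent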
internal::cancellable_queue::cancellable_queue(cancellable_queue&& o) noexcept
        : _first(std::exchange(o._first, nullptr))
        , _rest(std::move(o._rest)) {
    if (_first != nullptr) {
        _first->_ref = this;
    }
}

internal::cancellable_queue& internal::cancellable_queue::operator=(cancellable_queue&& o) noexcept {
    if (this != &o) {
        _first = std::exchange(o._first, nullptr);
        _rest = std::move(o._rest);
        if (_first != nullptr) {
            _first->_ref = this;
        }
    }
    return *this;
}

internal::cancellable_queue::~cancellable_queue() {
    while (_first != nullptr) {
        queued_io_request::from_cq_link(*_first).cancel();
        pop_front();
    }
}

void internal::cancellable_queue::push_back(link& il) noexcept {
    if (_first == nullptr) {
        _first = &il;
        il._ref = this;
    } else {
        new (&il._hook) bi::slist_member_hook<>();
        _rest.push_back(il);
    }
}

void internal::cancellable_queue::pop_front() noexcept {
    _first->_ref = nullptr;
    if (_rest.empty()) {
        _first = nullptr;
    } else {
        _first = &_rest.front();
        _rest.pop_front();
        _first->_hook.~slist_member_hook<>();
        _first->_ref = this;
    }
}
internal::intent_reference::intent_reference(io_intent* intent) noexcept : _intent(intent) {
    if (_intent != nullptr) {
        intent->_refs.bind(*this);
    }
}

io_intent* internal::intent_reference::retrieve() const {
    if (is_cancelled()) {
        throw default_io_exception_factory::cancelled();
    }

    return _intent;
}
void
io_queue::complete_request(io_desc_read_write& desc) noexcept {
    _requests_executing--;
    _streams[desc.stream()].notify_request_finished(desc.ticket());
}
fair_queue::config io_queue::make_fair_queue_config(const config& iocfg) {
    fair_queue::config cfg;
    cfg.ticket_weight_pace = iocfg.disk_us_per_request / read_request_base_count;
    cfg.ticket_size_pace = (iocfg.disk_us_per_byte * (1 << request_ticket_size_shift)) / read_request_base_count;
    return cfg;
}
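
// Note: ticket sizes in this file are measured in blocks of
// (1 << request_ticket_size_shift) bytes rather than raw bytes - see the
// ">> request_ticket_size_shift" in request_fq_ticket() below - so the pace
// values computed above are expressed per read-base-count unit and per block,
// respectively.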
io_queue::io_queue(io_group_ptr group, internal::io_sink& sink)
    : _priority_classes()
    , _group(std::move(group))
    , _sink(sink)
{
    auto fq_cfg = make_fair_queue_config(get_config());
    _streams.emplace_back(*_group->_fgs[0], fq_cfg);
    if (get_config().duplex) {
        _streams.emplace_back(*_group->_fgs[1], fq_cfg);
    }
    seastar_logger.debug("Created io queue, multipliers {}:{}",
            get_config().disk_req_write_to_read_multiplier,
            get_config().disk_bytes_write_to_read_multiplier);
}
fair_group::config io_group::make_fair_group_config(const io_queue::config& qcfg) noexcept {
    /*
     * It doesn't make sense to configure the requests limit higher than
     * it can be if the queue is full of minimal requests. At the same
     * time, setting too large a value increases the chances to overflow
     * the group rovers and lock up the queue.
     *
     * The same is technically true for the bytes limit, but the group
     * rovers are configured in blocks (ticket size shift), and this
     * already makes a good protection.
     */
    auto max_req_count = std::min(qcfg.max_req_count,
            qcfg.max_bytes_count / io_queue::minimal_request_size);
    auto max_req_count_min = std::max(io_queue::read_request_base_count, qcfg.disk_req_write_to_read_multiplier);
    /*
     * Read requests weigh read_request_base_count, writes weigh
     * disk_req_write_to_read_multiplier. The fair queue limit must
     * be enough to pass the largest one through. The same is true
     * for request sizes, but that check is done at run time, see the
     * request_fq_ticket() method.
     */
    if (max_req_count < max_req_count_min) {
        seastar_logger.warn("The disk request rate is too low, configuring it to {}, but you may experience latency problems", max_req_count_min);
        max_req_count = max_req_count_min;
    }
    return fair_group::config(max_req_count,
            qcfg.max_bytes_count >> io_queue::request_ticket_size_shift);
}
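
// Illustrative arithmetic for the clamp above (made-up numbers): with
// max_bytes_count = 1 MB and a minimal request size of 512 bytes, the queue can
// hold at most 1048576 / 512 = 2048 minimal requests, so a configured
// max_req_count of, say, 10000 would be trimmed down to 2048. Conversely, if
// the result drops below max(read_request_base_count,
// disk_req_write_to_read_multiplier), a single largest request could no longer
// fit, hence the warning and the bump to max_req_count_min.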
io_group::io_group(io_queue::config io_cfg) noexcept
    : _config(std::move(io_cfg))
{
    auto fg_cfg = make_fair_group_config(_config);
    _fgs.push_back(std::make_unique<fair_group>(fg_cfg));
    if (_config.duplex) {
        _fgs.push_back(std::make_unique<fair_group>(fg_cfg));
    }
    seastar_logger.debug("Created io group, limits {}:{}", _config.max_req_count, _config.max_bytes_count);
}
io_queue::~io_queue() {
    // It is illegal to stop the I/O queue with pending requests.
    // Technically we would use a gate to guarantee that. But here, it is not
    // needed since this is expected to be destroyed only after the reactor is destroyed.
    //
    // And that will happen only when there are no more fibers to run. If we ever change
    // that, then this has to change.
    for (auto&& pc_data : _priority_classes) {
        if (pc_data) {
            for (auto&& s : _streams) {
                s.unregister_priority_class(pc_data->fq_class());
            }
        }
    }
}
std::mutex io_priority_class::_register_lock;
std::array<io_priority_class::class_info, io_priority_class::_max_classes> io_priority_class::_infos;
unsigned io_priority_class::get_shares() const {
    return _infos.at(_id).shares;
}

sstring io_priority_class::get_name() const {
    std::lock_guard<std::mutex> lock(_register_lock);
    return _infos.at(_id).name;
}
io_priority_class io_priority_class::register_one(sstring name, uint32_t shares) {
    std::lock_guard<std::mutex> lock(_register_lock);
    for (unsigned i = 0; i < _max_classes; ++i) {
        if (!_infos[i].registered()) {
            _infos[i].shares = shares;
            _infos[i].name = std::move(name);
        } else if (_infos[i].name != name) {
            continue;
        }

        // Found an entry matching the name to be registered;
        // make sure it was registered with the same number of shares.
        // Note: those may change dynamically later on in the
        // fair queue.
        assert(_infos[i].shares == shares);

        return io_priority_class(i);
    }
    throw std::runtime_error("No more room for new I/O priority classes");
}
future<> io_priority_class::update_shares(uint32_t shares) const {
    // Keep registered shares intact, just update the ones
    // deployed on the reactor queues
    return engine().update_shares_for_queues(*this, shares);
}
bool io_priority_class::rename_registered(sstring new_name) {
    std::lock_guard<std::mutex> guard(_register_lock);
    for (unsigned i = 0; i < _max_classes; ++i) {
        if (!_infos[i].registered()) {
            break;
        }
        if (_infos[i].name == new_name) {
            if (i == id()) {
                return false;
            } else {
                io_log.error("trying to rename priority class with id {} to \"{}\" but that name already exists", id(), new_name);
                throw std::runtime_error(format("rename priority class: an attempt was made to rename a priority class to an"
                        " already existing name ({})", new_name));
            }
        }
    }
    _infos[id()].name = new_name;
    return true;
}
future<> io_priority_class::rename(sstring new_name) noexcept {
    return futurize_invoke([this, new_name = std::move(new_name)] () mutable {
        // Taking the lock here will prevent newly registered classes
        // from registering under the old name (and will prevent undefined
        // behavior, since this array is shared across shards). However, it
        // doesn't prevent the case where a newly registered class (that
        // got registered right after the lock release) will be unnecessarily
        // renamed. This is not a real problem and it is a lot better than
        // holding the lock until all cross-shard activity is over.

        if (!rename_registered(new_name)) {
            return make_ready_future<>();
        }

        return smp::invoke_on_all([this, new_name = std::move(new_name)] {
            return engine().rename_queues(*this, new_name);
        });
    });
}
seastar::metrics::label io_queue_shard("ioshard");
void
io_queue::priority_class_data::rename(sstring new_name, sstring mountpoint) {
    try {
        register_stats(new_name, mountpoint);
    } catch (metrics::double_registration& e) {
        // We need to ignore this exception, since it can happen that
        // a class that was already created with the new name will be
        // renamed again (and this will cause a double-registration
        // exception to be thrown).
    }
}
void
io_queue::priority_class_data::register_stats(sstring name, sstring mountpoint) {
    shard_id owner = this_shard_id();
    seastar::metrics::metric_groups new_metrics;
    namespace sm = seastar::metrics;
    auto shard = sm::impl::shard();

    auto ioq_group = sm::label("mountpoint");
    auto mountlabel = ioq_group(mountpoint);

    auto class_label_type = sm::label("class");
    auto class_label = class_label_type(name);
    new_metrics.add_group("io_queue", {
            sm::make_derive("total_bytes", [this] {
                    return _rwstat[internal::io_direction_and_length::read_idx].bytes + _rwstat[internal::io_direction_and_length::write_idx].bytes;
                }, sm::description("Total bytes passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_derive("total_operations", [this] {
                    return _rwstat[internal::io_direction_and_length::read_idx].ops + _rwstat[internal::io_direction_and_length::write_idx].ops;
                }, sm::description("Total operations passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_derive("total_read_bytes", _rwstat[internal::io_direction_and_length::read_idx].bytes,
                    sm::description("Total read bytes passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_derive("total_read_ops", _rwstat[internal::io_direction_and_length::read_idx].ops,
                    sm::description("Total read operations passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_derive("total_write_bytes", _rwstat[internal::io_direction_and_length::write_idx].bytes,
                    sm::description("Total write bytes passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_derive("total_write_ops", _rwstat[internal::io_direction_and_length::write_idx].ops,
                    sm::description("Total write operations passed in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_derive("total_delay_sec", [this] {
                    return _total_queue_time.count();
                }, sm::description("Total time spent in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_derive("total_exec_sec", [this] {
                    return _total_execution_time.count();
                }, sm::description("Total time spent in disk"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_derive("starvation_time_sec", [this] {
                    auto st = _starvation_time;
                    if (_nr_queued != 0 && _nr_executing == 0) {
                        st += io_queue::clock_type::now() - _activated;
                    }
                    return st.count();
                }, sm::description("Total time spent starving for disk"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),

            // Note: The counter below is not the same as reactor's queued-io-requests;
            // queued-io-requests shows us how many requests in total exist in this I/O Queue.
            //
            // This counter lives in the priority class, so it will count only queued requests
            // that belong to that class.
            //
            // In other words: the new counter tells you how busy a class is, and the
            // old counter tells you how busy the system is.

            sm::make_queue_length("queue_length", _nr_queued, sm::description("Number of requests in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_queue_length("disk_queue_length", _nr_executing, sm::description("Number of requests in the disk"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_gauge("delay", [this] {
                    return _queue_time.count();
                }, sm::description("random delay time in the queue"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label}),
            sm::make_gauge("shares", _shares, sm::description("current amount of shares"), {io_queue_shard(shard), sm::shard_label(owner), mountlabel, class_label})
    });
    _metric_groups = std::exchange(new_metrics, {});
}
io_queue::priority_class_data& io_queue::find_or_create_class(const io_priority_class& pc) {
    auto id = pc.id();
    if (id >= _priority_classes.size()) {
        _priority_classes.resize(id + 1);
    }
    if (!_priority_classes[id]) {
        auto shares = pc.get_shares();
        auto name = pc.get_name();

        // A note on naming:
        //
        // We could just add the owner as the instance id and have something like:
        //  io_queue-<class_owner>-<counter>-<class_name>
        //
        // However, when there is more than one shard per I/O queue, it is very useful
        // to know which shards are being served by the same queue. Therefore, a better name
        // scheme is:
        //
        //  io_queue-<queue_owner>-<counter>-<class_name>, shard=<class_owner>
        //  using the shard label to hold the owner number
        //
        // This conveys all the information we need and allows one to easily group all classes from
        // the same I/O queue (by filtering by shard).
        for (auto&& s : _streams) {
            s.register_priority_class(id, shares);
        }
        auto pc_data = std::make_unique<priority_class_data>(pc, shares, name, mountpoint());

        _priority_classes[id] = std::move(pc_data);
    }
    return *_priority_classes[id];
}
stream_id io_queue::request_stream(internal::io_direction_and_length dnl) const noexcept {
    return get_config().duplex ? dnl.rw_idx() : 0;
}
fair_queue_ticket io_queue::request_fq_ticket(internal::io_direction_and_length dnl) const noexcept {
    unsigned weight;
    size_t size;

    if (dnl.is_write()) {
        weight = get_config().disk_req_write_to_read_multiplier;
        size = get_config().disk_bytes_write_to_read_multiplier * dnl.length();
    } else {
        weight = io_queue::read_request_base_count;
        size = io_queue::read_request_base_count * dnl.length();
    }

    static thread_local size_t oversize_warning_threshold = 0;

    if (size >= get_config().max_bytes_count) {
        if (size > oversize_warning_threshold) {
            oversize_warning_threshold = size;
            io_log.warn("oversized request (length {}) submitted. "
                    "dazed and confuzed, trimming its weight from {} down to {}", dnl.length(),
                    size >> request_ticket_size_shift,
                    get_config().max_bytes_count >> request_ticket_size_shift);
        }
        size = get_config().max_bytes_count;
    }

    return fair_queue_ticket(weight, size >> request_ticket_size_shift);
}
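
// Illustrative example (numbers made up): with read_request_base_count = 128,
// disk_req_write_to_read_multiplier = 512 and disk_bytes_write_to_read_multiplier = 64,
// a 4 KiB read gets a ticket of (weight = 128, size = 128 * 4096 >> shift), while
// a 4 KiB write gets (weight = 512, size = 64 * 4096 >> shift) - writes are charged
// a larger weight and a different byte multiplier so that read and write costs
// stay comparable in the shared fair queue. Requests whose scaled size reaches
// max_bytes_count are trimmed (and logged) instead of being allowed to exceed
// the group rover capacity.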
io_queue::request_limits io_queue::get_request_limits() const noexcept {
    request_limits l;
    l.max_read = align_down<size_t>(std::min<size_t>(get_config().disk_read_saturation_length, get_config().max_bytes_count / read_request_base_count), minimal_request_size);
    l.max_write = align_down<size_t>(std::min<size_t>(get_config().disk_write_saturation_length, get_config().max_bytes_count / get_config().disk_bytes_write_to_read_multiplier), minimal_request_size);
    return l;
}
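
// For instance (hypothetical configuration): with max_bytes_count = 4 MB,
// read_request_base_count = 128 and disk_read_saturation_length = 256 KB,
//   max_read = align_down(min(256 KB, 4 MB / 128), minimal_request_size)
//            = align_down(min(256 KB, 32 KB), 512) = 32 KB,
// i.e. a single read is capped so its ticket still fits within the fair-queue
// byte budget even before the disk's saturation length is reached.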
future<size_t>
io_queue::queue_request(const io_priority_class& pc, size_t len, internal::io_request req, io_intent* intent) noexcept {
    return futurize_invoke([&pc, len, req = std::move(req), this, intent] () mutable {
        // First time will hit here, and then we create the class. It is important
        // that we create the shared pointer in the same shard it will be used at later.
        auto& pclass = find_or_create_class(pc);
        internal::io_direction_and_length dnl(req, len);
        auto queued_req = std::make_unique<queued_io_request>(std::move(req), *this, pclass, std::move(dnl));
        auto fut = queued_req->get_future();
        internal::cancellable_queue* cq = nullptr;
        if (intent != nullptr) {
            cq = &intent->find_or_create_cancellable_queue(dev_id(), pc.id());
        }

        _streams[queued_req->stream()].queue(pclass.fq_class(), queued_req->queue_entry());
        queued_req->set_intent(cq);
        queued_req.release();
        pclass.on_queue();
        _queued_requests++;
        return fut;
    });
}
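
// Typical call path (a sketch, simplified from the actual file API): a file
// implementation builds an internal::io_request for the syscall it wants, then
// hands it to its shard's io_queue together with a priority class and an
// optional io_intent, e.g.
//
//     io_intent intent;
//     future<size_t> f = q.queue_request(default_priority_class(), 4096,
//                                        std::move(req), &intent);
//     // cancelling the intent before dispatch makes f fail with cancelled_error()
//
// The request then sits in the fair queue until poll_io_queue() dispatches it.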
void io_queue::poll_io_queue() {
    for (auto&& st : _streams) {
        st.dispatch_requests([] (fair_queue_entry& fqe) {
            queued_io_request::from_fq_entry(fqe).dispatch();
        });
    }
}
void io_queue::submit_request(io_desc_read_write* desc, internal::io_request req) noexcept {
    _queued_requests--;
    _requests_executing++;
    _sink.submit(desc, std::move(req));
}
void io_queue::cancel_request(queued_io_request& req) noexcept {
    _queued_requests--;
    _streams[req.stream()].notify_request_cancelled(req.queue_entry());
}

void io_queue::complete_cancelled_request(queued_io_request& req) noexcept {
    _streams[req.stream()].notify_request_finished(req.queue_entry().ticket());
}
io_queue::clock_type::time_point io_queue::next_pending_aio() const noexcept {
    clock_type::time_point next = clock_type::time_point::max();

    for (const auto& s : _streams) {
        clock_type::time_point n = s.next_pending_aio();
        if (n < next) {
            next = n;
        }
    }

    return next;
}
future<>
io_queue::update_shares_for_class(const io_priority_class pc, size_t new_shares) {
    return futurize_invoke([this, pc, new_shares] {
        auto& pclass = find_or_create_class(pc);
        pclass.update_shares(new_shares);
        for (auto&& s : _streams) {
            s.update_shares_for_class(pclass.fq_class(), new_shares);
        }
    });
}
void
io_queue::rename_priority_class(io_priority_class pc, sstring new_name) {
    if (_priority_classes.size() > pc.id() &&
            _priority_classes[pc.id()]) {
        _priority_classes[pc.id()]->rename(new_name, get_config().mountpoint);
    }
}
void internal::io_sink::submit(io_completion* desc, internal::io_request req) noexcept {
    try {
        _pending_io.emplace_back(std::move(req), desc);
    } catch (...) {
        desc->set_exception(std::current_exception());
    }
}