]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_aio_throttle.cc
import ceph 14.2.5
[ceph.git] / ceph / src / rgw / rgw_aio_throttle.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4/*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2018 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16#include "include/rados/librados.hpp"
17
18#include "rgw_aio_throttle.h"
19#include "rgw_rados.h"
20
21namespace rgw {
22
23void AioThrottle::aio_cb(void *cb, void *arg)
24{
25 Pending& p = *static_cast<Pending*>(arg);
26 p.result = p.completion->get_return_value();
27 p.parent->put(p);
28}
29
30bool AioThrottle::waiter_ready() const
31{
32 switch (waiter) {
33 case Wait::Available: return is_available();
34 case Wait::Completion: return has_completion();
35 case Wait::Drained: return is_drained();
36 default: return false;
37 }
38}
39
40AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj,
41 librados::ObjectWriteOperation *op,
42 uint64_t cost, uint64_t id)
43{
44 auto p = std::make_unique<Pending>();
45 p->obj = obj;
46 p->id = id;
47 p->cost = cost;
48
49 if (cost > window) {
50 p->result = -EDEADLK; // would never succeed
eafe8130 51 std::unique_lock lock{mutex};
11fdf7f2
TL
52 completed.push_back(*p);
53 } else {
54 get(*p);
55 p->result = obj.aio_operate(p->completion, op);
56 if (p->result < 0) {
57 put(*p);
58 }
59 }
60 p.release();
eafe8130 61 std::unique_lock lock{mutex};
11fdf7f2
TL
62 return std::move(completed);
63}
64
65AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj,
66 librados::ObjectReadOperation *op,
67 uint64_t cost, uint64_t id)
68{
69 auto p = std::make_unique<Pending>();
70 p->obj = obj;
71 p->id = id;
72 p->cost = cost;
73
74 if (cost > window) {
75 p->result = -EDEADLK; // would never succeed
eafe8130 76 std::unique_lock lock{mutex};
11fdf7f2
TL
77 completed.push_back(*p);
78 } else {
79 get(*p);
80 p->result = obj.aio_operate(p->completion, op, &p->data);
81 if (p->result < 0) {
82 put(*p);
83 }
84 }
85 p.release();
eafe8130 86 std::unique_lock lock{mutex};
11fdf7f2
TL
87 return std::move(completed);
88}
89
90void AioThrottle::get(Pending& p)
91{
92 std::unique_lock lock{mutex};
93
94 // wait for the write size to become available
95 pending_size += p.cost;
96 if (!is_available()) {
97 ceph_assert(waiter == Wait::None);
98 waiter = Wait::Available;
99 cond.wait(lock, [this] { return is_available(); });
100 waiter = Wait::None;
101 }
102
103 // register the pending write and attach a completion
104 p.parent = this;
105 p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb);
106 pending.push_back(p);
107}
108
109void AioThrottle::put(Pending& p)
110{
111 p.completion->release();
112 p.completion = nullptr;
113
114 std::scoped_lock lock{mutex};
115
116 // move from pending to completed
117 pending.erase(pending.iterator_to(p));
118 completed.push_back(p);
119
120 pending_size -= p.cost;
121
122 if (waiter_ready()) {
123 cond.notify_one();
124 }
125}
126
127AioResultList AioThrottle::poll()
128{
129 std::unique_lock lock{mutex};
130 return std::move(completed);
131}
132
133AioResultList AioThrottle::wait()
134{
135 std::unique_lock lock{mutex};
136 if (completed.empty() && !pending.empty()) {
137 ceph_assert(waiter == Wait::None);
138 waiter = Wait::Completion;
139 cond.wait(lock, [this] { return has_completion(); });
140 waiter = Wait::None;
141 }
142 return std::move(completed);
143}
144
145AioResultList AioThrottle::drain()
146{
147 std::unique_lock lock{mutex};
148 if (!pending.empty()) {
149 ceph_assert(waiter == Wait::None);
150 waiter = Wait::Drained;
151 cond.wait(lock, [this] { return is_drained(); });
152 waiter = Wait::None;
153 }
154 return std::move(completed);
155}
156
157} // namespace rgw