]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | /* | |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright (C) 2018 Red Hat, Inc. | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
16 | #include "include/rados/librados.hpp" | |
17 | ||
18 | #include "rgw_aio_throttle.h" | |
19 | #include "rgw_rados.h" | |
20 | ||
21 | namespace rgw { | |
22 | ||
23 | void AioThrottle::aio_cb(void *cb, void *arg) | |
24 | { | |
25 | Pending& p = *static_cast<Pending*>(arg); | |
26 | p.result = p.completion->get_return_value(); | |
27 | p.parent->put(p); | |
28 | } | |
29 | ||
30 | bool AioThrottle::waiter_ready() const | |
31 | { | |
32 | switch (waiter) { | |
33 | case Wait::Available: return is_available(); | |
34 | case Wait::Completion: return has_completion(); | |
35 | case Wait::Drained: return is_drained(); | |
36 | default: return false; | |
37 | } | |
38 | } | |
39 | ||
40 | AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj, | |
41 | librados::ObjectWriteOperation *op, | |
42 | uint64_t cost, uint64_t id) | |
43 | { | |
44 | auto p = std::make_unique<Pending>(); | |
45 | p->obj = obj; | |
46 | p->id = id; | |
47 | p->cost = cost; | |
48 | ||
49 | if (cost > window) { | |
50 | p->result = -EDEADLK; // would never succeed | |
eafe8130 | 51 | std::unique_lock lock{mutex}; |
11fdf7f2 TL |
52 | completed.push_back(*p); |
53 | } else { | |
54 | get(*p); | |
55 | p->result = obj.aio_operate(p->completion, op); | |
56 | if (p->result < 0) { | |
57 | put(*p); | |
58 | } | |
59 | } | |
60 | p.release(); | |
eafe8130 | 61 | std::unique_lock lock{mutex}; |
11fdf7f2 TL |
62 | return std::move(completed); |
63 | } | |
64 | ||
65 | AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj, | |
66 | librados::ObjectReadOperation *op, | |
67 | uint64_t cost, uint64_t id) | |
68 | { | |
69 | auto p = std::make_unique<Pending>(); | |
70 | p->obj = obj; | |
71 | p->id = id; | |
72 | p->cost = cost; | |
73 | ||
74 | if (cost > window) { | |
75 | p->result = -EDEADLK; // would never succeed | |
eafe8130 | 76 | std::unique_lock lock{mutex}; |
11fdf7f2 TL |
77 | completed.push_back(*p); |
78 | } else { | |
79 | get(*p); | |
80 | p->result = obj.aio_operate(p->completion, op, &p->data); | |
81 | if (p->result < 0) { | |
82 | put(*p); | |
83 | } | |
84 | } | |
85 | p.release(); | |
eafe8130 | 86 | std::unique_lock lock{mutex}; |
11fdf7f2 TL |
87 | return std::move(completed); |
88 | } | |
89 | ||
90 | void AioThrottle::get(Pending& p) | |
91 | { | |
92 | std::unique_lock lock{mutex}; | |
93 | ||
94 | // wait for the write size to become available | |
95 | pending_size += p.cost; | |
96 | if (!is_available()) { | |
97 | ceph_assert(waiter == Wait::None); | |
98 | waiter = Wait::Available; | |
99 | cond.wait(lock, [this] { return is_available(); }); | |
100 | waiter = Wait::None; | |
101 | } | |
102 | ||
103 | // register the pending write and attach a completion | |
104 | p.parent = this; | |
105 | p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb); | |
106 | pending.push_back(p); | |
107 | } | |
108 | ||
109 | void AioThrottle::put(Pending& p) | |
110 | { | |
111 | p.completion->release(); | |
112 | p.completion = nullptr; | |
113 | ||
114 | std::scoped_lock lock{mutex}; | |
115 | ||
116 | // move from pending to completed | |
117 | pending.erase(pending.iterator_to(p)); | |
118 | completed.push_back(p); | |
119 | ||
120 | pending_size -= p.cost; | |
121 | ||
122 | if (waiter_ready()) { | |
123 | cond.notify_one(); | |
124 | } | |
125 | } | |
126 | ||
127 | AioResultList AioThrottle::poll() | |
128 | { | |
129 | std::unique_lock lock{mutex}; | |
130 | return std::move(completed); | |
131 | } | |
132 | ||
133 | AioResultList AioThrottle::wait() | |
134 | { | |
135 | std::unique_lock lock{mutex}; | |
136 | if (completed.empty() && !pending.empty()) { | |
137 | ceph_assert(waiter == Wait::None); | |
138 | waiter = Wait::Completion; | |
139 | cond.wait(lock, [this] { return has_completion(); }); | |
140 | waiter = Wait::None; | |
141 | } | |
142 | return std::move(completed); | |
143 | } | |
144 | ||
145 | AioResultList AioThrottle::drain() | |
146 | { | |
147 | std::unique_lock lock{mutex}; | |
148 | if (!pending.empty()) { | |
149 | ceph_assert(waiter == Wait::None); | |
150 | waiter = Wait::Drained; | |
151 | cond.wait(lock, [this] { return is_drained(); }); | |
152 | waiter = Wait::None; | |
153 | } | |
154 | return std::move(completed); | |
155 | } | |
156 | ||
157 | } // namespace rgw |