]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "ops_executer.h" | |
5 | ||
6 | #include <boost/range/adaptor/filtered.hpp> | |
7 | #include <boost/range/adaptor/map.hpp> | |
8 | #include <boost/range/adaptor/transformed.hpp> | |
9 | #include <boost/range/algorithm_ext/push_back.hpp> | |
10 | #include <boost/range/algorithm/max_element.hpp> | |
11 | #include <boost/range/numeric.hpp> | |
12 | ||
13 | #include <fmt/format.h> | |
14 | #include <fmt/ostream.h> | |
15 | ||
16 | #include <seastar/core/thread.hh> | |
17 | ||
18 | #include "crimson/osd/exceptions.h" | |
f67539c2 | 19 | #include "crimson/osd/pg.h" |
9f95a23c TL |
20 | #include "crimson/osd/watch.h" |
21 | #include "osd/ClassHandler.h" | |
22 | ||
namespace {
  // File-local accessor for the OSD-subsystem seastar logger used by
  // every routine in this translation unit.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
28 | ||
29 | namespace crimson::osd { | |
30 | ||
// Execute a CEPH_OSD_OP_CALL: decode "<class>.<method>(indata)" from the op
// payload, load the object class through ClassHandler, run the method, then
// verify the method's declared RD/WR flags against the reads/writes it
// actually performed before propagating rval and outdata to the client.
OpsExecuter::call_ierrorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
{
  std::string cname, mname;
  ceph::bufferlist indata;
  try {
    // layout of indata: class name, method name, then the method's payload,
    // with lengths taken from the op header.
    auto bp = std::begin(osd_op.indata);
    bp.copy(osd_op.op.cls.class_len, cname);
    bp.copy(osd_op.op.cls.method_len, mname);
    bp.copy(osd_op.op.cls.indata_len, indata);
  } catch (buffer::error&) {
    logger().warn("call unable to decode class + method + indata");
    return crimson::ct_error::invarg::make();
  }

  // NOTE: opening a class can actually result in dlopen(), and thus
  // blocking the entire reactor. Thankfully to ClassHandler's cache
  // this is supposed to be extremely infrequent.
  ClassHandler::ClassData* cls;
  int r = ClassHandler::get_instance().open_class(cname, &cls);
  if (r) {
    logger().warn("class {} open got {}", cname, cpp_strerror(r));
    if (r == -ENOENT) {
      return crimson::ct_error::operation_not_supported::make();
    } else if (r == -EPERM) {
      // propagate permission errors
      return crimson::ct_error::permission_denied::make();
    }
    return crimson::ct_error::input_output_error::make();
  }

  ClassHandler::ClassMethod* method = cls->get_method(mname);
  if (!method) {
    logger().warn("call method {}.{} does not exist", cname, mname);
    return crimson::ct_error::operation_not_supported::make();
  }

  const auto flags = method->get_flags();
  // read-only methods require an existing object; WR methods may create it.
  if (!obc->obs.exists && (flags & CLS_METHOD_WR) == 0) {
    return crimson::ct_error::enoent::make();
  }

#if 0
  if (flags & CLS_METHOD_WR) {
    ctx->user_modify = true;
  }
#endif

  logger().debug("calling method {}.{}, num_read={}, num_write={}",
                 cname, mname, num_read, num_write);
  // snapshot the op counters so we can detect what the method did.
  const auto prev_rd = num_read;
  const auto prev_wr = num_write;
  // method->exec() is arbitrary plugin code that may block; interruptor::async
  // runs it off the reactor's continuation chain (presumably in a seastar
  // thread -- see the <seastar/core/thread.hh> include; TODO confirm).
  return interruptor::async(
    [this, method, indata=std::move(indata)]() mutable {
      ceph::bufferlist outdata;
      auto cls_context = reinterpret_cast<cls_method_context_t>(this);
      const auto ret = method->exec(cls_context, indata, outdata);
      return std::make_pair(ret, std::move(outdata));
    }
  ).then_interruptible(
    [this, prev_rd, prev_wr, &osd_op, flags]
    (auto outcome) -> call_errorator::future<> {
      auto& [ret, outdata] = outcome;
      osd_op.rval = ret;

      logger().debug("do_op_call: method returned ret={}, outdata.length()={}"
                     " while num_read={}, num_write={}",
                     ret, outdata.length(), num_read, num_write);
      // a method that read but is not marked RD (or wrote but not WR)
      // violates its declaration; fail the call with EIO.
      if (num_read > prev_rd && !(flags & CLS_METHOD_RD)) {
        logger().error("method tried to read object but is not marked RD");
        osd_op.rval = -EIO;
        return crimson::ct_error::input_output_error::make();
      }
      if (num_write > prev_wr && !(flags & CLS_METHOD_WR)) {
        logger().error("method tried to update object but is not marked WR");
        osd_op.rval = -EIO;
        return crimson::ct_error::input_output_error::make();
      }
      // ceph-osd has this implemented in `PrimaryLogPG::execute_ctx`,
      // grep for `ignore_out_data`.
      using crimson::common::local_conf;
      if (op_info.allows_returnvec() &&
          op_info.may_write() &&
          ret >= 0 &&
          outdata.length() > local_conf()->osd_max_write_op_reply_len) {
        // the justification of this limit is to not inflate the pg log.
        // that's the reason why we don't worry about pure reads.
        logger().error("outdata overflow due to .length()={}, limit={}",
                       outdata.length(),
                       local_conf()->osd_max_write_op_reply_len);
        osd_op.rval = -EOVERFLOW;
        return crimson::ct_error::value_too_large::make();
      }
      // for write calls we never return data except for errors or RETURNVEC.
      // please refer to cls/cls_hello.cc for details.
      if (!op_info.may_write() || op_info.allows_returnvec() || ret < 0) {
        osd_op.op.extent.length = outdata.length();
        osd_op.outdata.claim_append(outdata);
      }
      if (ret < 0) {
        // surface the method's negative errno to the caller as an error code
        return crimson::stateful_ec{
          std::error_code(-ret, std::generic_category()) };
      } else {
        return seastar::now();
      }
    }
  );
}
138 | ||
139 | static watch_info_t create_watch_info(const OSDOp& osd_op, | |
20effc67 | 140 | const OpsExecuter::ExecutableMessage& msg) |
9f95a23c TL |
141 | { |
142 | using crimson::common::local_conf; | |
143 | const uint32_t timeout = | |
144 | osd_op.op.watch.timeout == 0 ? local_conf()->osd_client_watch_timeout | |
145 | : osd_op.op.watch.timeout; | |
146 | return { | |
147 | osd_op.op.watch.cookie, | |
148 | timeout, | |
149 | msg.get_connection()->get_peer_addr() | |
150 | }; | |
151 | } | |
152 | ||
// Register (or refresh) a watch on the object. Runs in two phases:
//  1. the main lambda mutates durable state -- it records the watch in
//     object_info's `watchers` map and touches the transaction so the
//     change gets persisted;
//  2. the effect lambda creates/locates the in-memory Watch on the OBC and
//     connects it to the client connection (applied via with_effect_on_obc,
//     presumably after the transaction is submitted -- TODO confirm).
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  logger().debug("{}", __func__);
  // everything the deferred effect needs, captured up-front from the op/msg.
  struct connect_ctx_t {
    ObjectContext::watch_key_t key;   // (cookie, client entity name)
    crimson::net::ConnectionRef conn;
    watch_info_t info;

    connect_ctx_t(const OSDOp& osd_op, const ExecutableMessage& msg)
      : key(osd_op.op.watch.cookie, msg.get_reqid().name),
        conn(msg.get_connection()),
        info(create_watch_info(osd_op, msg)) {
    }
  };
  return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() },
    [&] (auto& ctx) {
      const auto& entity = ctx.key.second;
      auto [it, emplaced] =
        os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
      if (emplaced) {
        logger().info("registered new watch {} by {}", it->second, entity);
        // force a transaction entry so the new watcher set is persisted
        txn.nop();
      } else {
        logger().info("found existing watch {} by {}", it->second, entity);
      }
      return seastar::now();
    },
    [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
      assert(pg);
      auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
      if (emplaced) {
        const auto& [cookie, entity] = ctx.key;
        it->second = crimson::osd::Watch::create(
          obc, ctx.info, entity, std::move(pg));
        logger().info("op_effect: added new watcher: {}", ctx.key);
      } else {
        logger().info("op_effect: found existing watcher: {}", ctx.key);
      }
      // (re)attach the client connection; will_ping=true enables timeout
      // tracking via client pings.
      return it->second->connect(std::move(ctx.conn), true /* will_ping */);
    });
}
197 | ||
20effc67 | 198 | OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect( |
9f95a23c TL |
199 | OSDOp& osd_op, |
200 | ObjectState& os, | |
201 | ceph::os::Transaction& txn) | |
202 | { | |
203 | const entity_name_t& entity = get_message().get_reqid().name; | |
204 | const auto& cookie = osd_op.op.watch.cookie; | |
205 | if (!os.oi.watchers.count(std::make_pair(cookie, entity))) { | |
206 | return crimson::ct_error::not_connected::make(); | |
207 | } else { | |
208 | logger().info("found existing watch by {}", entity); | |
209 | return do_op_watch_subop_watch(osd_op, os, txn); | |
210 | } | |
211 | } | |
212 | ||
// Remove a watch from the object. Mirrors do_op_watch_subop_watch: the main
// lambda erases the durable record from object_info (touching the txn so the
// removal persists), while the effect lambda detaches and removes the
// in-memory Watch from the OBC.
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_unwatch(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  logger().info("{}", __func__);

  // only the (cookie, entity) key is needed by the deferred effect.
  struct disconnect_ctx_t {
    ObjectContext::watch_key_t key;
    disconnect_ctx_t(const OSDOp& osd_op, const ExecutableMessage& msg)
      : key(osd_op.op.watch.cookie, msg.get_reqid().name) {
    }
  };
  return with_effect_on_obc(disconnect_ctx_t{ osd_op, get_message() },
    [&] (auto& ctx) {
      const auto& entity = ctx.key.second;
      // extract() removes the map node and hands us its contents for logging.
      if (auto nh = os.oi.watchers.extract(ctx.key); !nh.empty()) {
        logger().info("removed watch {} by {}", nh.mapped(), entity);
        // force a transaction entry so the shrunk watcher set is persisted
        txn.nop();
      } else {
        logger().info("can't remove: no watch by {}", entity);
      }
      return seastar::now();
    },
    [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
      if (auto nh = obc->watchers.extract(ctx.key); !nh.empty()) {
        // keep the extracted Watch alive for the duration of remove()
        return seastar::do_with(std::move(nh.mapped()),
                                [ctx](auto&& watcher) {
          logger().info("op_effect: disconnect watcher {}", ctx.key);
          return watcher->remove();
        });
      } else {
        logger().info("op_effect: disconnect failed to find watcher {}", ctx.key);
        return seastar::now();
      }
    });
}
250 | ||
// Handle CEPH_OSD_WATCH_OP_PING: refresh the liveness timestamp of an
// established watch. Requires the watch to exist both in the persisted
// object_info and as a connected in-memory Watch on the OBC.
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_ping(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  const entity_name_t& entity = get_message().get_reqid().name;
  const auto& cookie = osd_op.op.watch.cookie;
  const auto key = std::make_pair(cookie, entity);

  // Note: WATCH with PING doesn't cause may_write() to return true,
  // so if there is nothing else in the transaction, this is going
  // to run do_osd_op_effects, but not write out a log entry */
  if (!os.oi.watchers.count(key)) {
    // never registered (or already unwatched) -> not connected
    return crimson::ct_error::not_connected::make();
  }
  auto it = obc->watchers.find(key);
  if (it == std::end(obc->watchers) || !it->second->is_connected()) {
    // persisted but no live in-memory session -> treat as timed out
    return crimson::ct_error::timed_out::make();
  }
  logger().info("found existing watch by {}", entity);
  it->second->got_ping(ceph_clock_now());
  return seastar::now();
}
274 | ||
// Dispatch a CEPH_OSD_OP_WATCH to the matching subop handler based on
// osd_op.op.watch.op. Every watch subop requires the object to exist;
// unknown and legacy subops are rejected with EINVAL.
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  logger().debug("{}", __func__);
  if (!os.exists) {
    return crimson::ct_error::enoent::make();
  }
  switch (osd_op.op.watch.op) {
  case CEPH_OSD_WATCH_OP_WATCH:
    return do_op_watch_subop_watch(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_RECONNECT:
    return do_op_watch_subop_reconnect(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_PING:
    return do_op_watch_subop_ping(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_UNWATCH:
    return do_op_watch_subop_unwatch(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_LEGACY_WATCH:
    // the legacy subop is deliberately rejected rather than emulated
    logger().warn("ignoring CEPH_OSD_WATCH_OP_LEGACY_WATCH");
    return crimson::ct_error::invarg::make();
  }
  logger().warn("unrecognized WATCH subop: {}", osd_op.op.watch.op);
  return crimson::ct_error::invarg::make();
}
300 | ||
301 | static uint64_t get_next_notify_id(epoch_t e) | |
302 | { | |
303 | // FIXME | |
304 | static std::uint64_t next_notify_id = 0; | |
305 | return (((uint64_t)e) << 32) | ((uint64_t)(next_notify_id++)); | |
306 | } | |
307 | ||
// Handle CEPH_OSD_OP_NOTIFY: decode the notify parameters, assign a unique
// notify id (returned to the client in outdata), and -- as a deferred obc
// effect -- fan the notify out to all currently alive watchers of the object.
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify(
  OSDOp& osd_op,
  const ObjectState& os)
{
  logger().debug("{}, msg epoch: {}", __func__, get_message().get_map_epoch());

  if (!os.exists) {
    return crimson::ct_error::enoent::make();
  }
  // state carried from the decode phase into the deferred effect.
  struct notify_ctx_t {
    crimson::net::ConnectionRef conn;
    notify_info_t ninfo;
    const uint64_t client_gid;
    const epoch_t epoch;

    notify_ctx_t(const ExecutableMessage& msg)
      : conn(msg.get_connection()),
        client_gid(msg.get_reqid().name.num()),
        epoch(msg.get_map_epoch()) {
    }
  };
  return with_effect_on_obc(notify_ctx_t{ get_message() },
    [&] (auto& ctx) {
      try {
        auto bp = osd_op.indata.cbegin();
        uint32_t ver; // obsolete
        ceph::decode(ver, bp);
        ceph::decode(ctx.ninfo.timeout, bp);
        ceph::decode(ctx.ninfo.bl, bp);
      } catch (const buffer::error&) {
        // undecodable payload: fall back to the default timeout below
        ctx.ninfo.timeout = 0;
      }
      if (!ctx.ninfo.timeout) {
        using crimson::common::local_conf;
        ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
      }
      ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
      ctx.ninfo.cookie = osd_op.op.notify.cookie;
      // return our unique notify id to the client
      ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
      return seastar::now();
    },
    [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
      // lazily view only the watchers that are still alive
      auto alive_watchers = obc->watchers | boost::adaptors::map_values
                          | boost::adaptors::filtered(
        [] (const auto& w) {
          // FIXME: filter as for the `is_ping` in `Watch::start_notify`
          return w->is_alive();
        });
      return crimson::osd::Notify::create_n_propagate(
        std::begin(alive_watchers),
        std::end(alive_watchers),
        std::move(ctx.conn),
        ctx.ninfo,
        ctx.client_gid,
        obc->obs.oi.user_version);
    });
}
366 | ||
20effc67 TL |
367 | OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_list_watchers( |
368 | OSDOp& osd_op, | |
369 | const ObjectState& os) | |
370 | { | |
371 | logger().debug("{}", __func__); | |
372 | ||
373 | obj_list_watch_response_t response; | |
374 | for (const auto& [key, info] : os.oi.watchers) { | |
375 | logger().debug("{}: key cookie={}, entity={}", | |
376 | __func__, key.first, key.second); | |
377 | assert(key.first == info.cookie); | |
378 | assert(key.second.is_client()); | |
379 | response.entries.emplace_back(watch_item_t{ | |
380 | key.second, info.cookie, info.timeout_seconds, info.addr}); | |
381 | response.encode(osd_op.outdata, get_message().get_features()); | |
382 | } | |
383 | return watch_ierrorator::now(); | |
384 | } | |
385 | ||
// Handle CEPH_OSD_OP_NOTIFY_ACK: decode the (notify_id, watch_cookie,
// optional reply payload) triple and -- as a deferred obc effect -- forward
// the ack to the matching watch owned by the requesting entity.
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify_ack(
  OSDOp& osd_op,
  const ObjectState& os)
{
  logger().debug("{}", __func__);

  // decoded ack parameters carried into the deferred effect.
  struct notifyack_ctx_t {
    const entity_name_t entity;
    uint64_t watch_cookie;
    uint64_t notify_id;
    ceph::bufferlist reply_bl;

    notifyack_ctx_t(const ExecutableMessage& msg)
      : entity(msg.get_reqid().name) {
    }
  };
  return with_effect_on_obc(notifyack_ctx_t{ get_message() },
    [&] (auto& ctx) -> watch_errorator::future<> {
      try {
        auto bp = osd_op.indata.cbegin();
        ceph::decode(ctx.notify_id, bp);
        ceph::decode(ctx.watch_cookie, bp);
        if (!bp.end()) {
          // the reply payload is optional
          ceph::decode(ctx.reply_bl, bp);
        }
      } catch (const buffer::error&) {
        // here we behave differently than ceph-osd. For historical reasons,
        // it falls back to using `osd_op.op.watch.cookie` as `ctx.notify_id`.
        // crimson just returns EINVAL if the data cannot be decoded.
        return crimson::ct_error::invarg::make();
      }
      return watch_errorator::now();
    },
    [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
      logger().info("notify_ack watch_cookie={}, notify_id={}",
                    ctx.watch_cookie, ctx.notify_id);
      // scan all in-memory watches; ack only the one matching both the
      // requesting entity and the cookie.
      return seastar::do_for_each(obc->watchers,
        [ctx=std::move(ctx)] (auto& kv) {
          const auto& [key, watchp] = kv;
          static_assert(
            std::is_same_v<std::decay_t<decltype(watchp)>,
                           seastar::shared_ptr<crimson::osd::Watch>>);
          auto& [cookie, entity] = key;
          if (ctx.entity != entity) {
            logger().debug("skipping watch {}; entity name {} != {}",
                           key, entity, ctx.entity);
            return seastar::now();
          }
          if (ctx.watch_cookie != cookie) {
            logger().debug("skipping watch {}; cookie {} != {}",
                           key, ctx.watch_cookie, cookie);
            return seastar::now();
          }
          logger().info("acking notify on watch {}", key);
          return watchp->notify_ack(ctx.notify_id, ctx.reply_bl);
        });
    });
}
444 | ||
// Defined here because there is a circular dependency between OpsExecuter and PG
// Run a non-mutating op handler against the PG's backend and a read-only
// (const) view of the object state.
template <class Func>
auto OpsExecuter::do_const_op(Func&& f) {
  // TODO: pass backend as read-only
  return std::forward<Func>(f)(pg->get_backend(), std::as_const(obc->obs));
}
451 | ||
452 | // Defined here because there is a circular dependency between OpsExecuter and PG | |
453 | template <class Func> | |
454 | auto OpsExecuter::do_write_op(Func&& f, bool um) { | |
455 | ++num_write; | |
456 | if (!osd_op_params) { | |
457 | osd_op_params.emplace(); | |
458 | } | |
459 | user_modify = um; | |
460 | return std::forward<Func>(f)(pg->get_backend(), obc->obs, txn); | |
461 | } | |
462 | ||
// Central dispatcher: route a single OSDOp to its backend handler through
// do_read_op / do_write_op / do_const_op (which maintain the read/write
// accounting), or to the dedicated cls/watch/notify handlers.
OpsExecuter::interruptible_errorated_future<OpsExecuter::osd_op_errorator>
OpsExecuter::execute_op(OSDOp& osd_op)
{
  // TODO: dispatch via call table?
  // TODO: we might want to find a way to unify both input and output
  // of each op.
  logger().debug(
    "handling op {} on object {}",
    ceph_osd_op_name(osd_op.op.op),
    get_target());
  switch (const ceph_osd_op& op = osd_op.op; op.op) {
  // ---- plain reads ----
  case CEPH_OSD_OP_SYNC_READ:
    [[fallthrough]];
  case CEPH_OSD_OP_READ:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.read(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_SPARSE_READ:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.sparse_read(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_CHECKSUM:
    return do_read_op([&osd_op] (auto& backend, const auto& os) {
      return backend.checksum(os, osd_op);
    });
  case CEPH_OSD_OP_CMPEXT:
    return do_read_op([&osd_op] (auto& backend, const auto& os) {
      return backend.cmp_ext(os, osd_op);
    });
  // ---- xattrs ----
  case CEPH_OSD_OP_GETXATTR:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.getxattr(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_GETXATTRS:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.get_xattrs(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_CMPXATTR:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.cmp_xattr(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_RMXATTR:
    return do_write_op(
      [&osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.rm_xattr(os, osd_op, txn);
    }, true);
  // ---- mutations (all flagged as user-visible modifications) ----
  case CEPH_OSD_OP_CREATE:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.create(os, osd_op, txn, delta_stats);
    }, true);
  case CEPH_OSD_OP_WRITE:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.write(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_WRITESAME:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.write_same(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_WRITEFULL:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.writefull(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_APPEND:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.append(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_TRUNCATE:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      // FIXME: rework needed. Move this out to do_write_op(), introduce
      // do_write_op_no_user_modify()...
      return backend.truncate(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_ZERO:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.zero(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_SETALLOCHINT:
    // alloc hints are accepted but currently ignored (no backend call)
    return osd_op_errorator::now();
  case CEPH_OSD_OP_SETXATTR:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.setxattr(os, osd_op, txn, delta_stats);
    }, true);
  case CEPH_OSD_OP_DELETE:
    return do_write_op([this] (auto& backend, auto& os, auto& txn) {
      return backend.remove(os, txn, delta_stats);
    }, true);
  // ---- object class calls / stat / legacy ----
  case CEPH_OSD_OP_CALL:
    return this->do_op_call(osd_op);
  case CEPH_OSD_OP_STAT:
    // note: stat does not require RD
    return do_const_op([this, &osd_op] (/* const */auto& backend, const auto& os) {
      return backend.stat(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_TMAPUP:
    // TODO: there was an effort to kill TMAP in ceph-osd. According to
    // @dzafman this isn't possible yet. Maybe it could be accomplished
    // before crimson's readiness and we'd luckily don't need to carry.
    return dont_do_legacy_op();

  // OMAP
  case CEPH_OSD_OP_OMAPGETKEYS:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.omap_get_keys(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPGETVALS:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.omap_get_vals(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPGETHEADER:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.omap_get_header(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.omap_get_vals_by_keys(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPSETVALS:
#if 0
    if (!pg.get_pool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }
#endif
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.omap_set_vals(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_OMAPSETHEADER:
#if 0
    if (!pg.get_pool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }
#endif
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.omap_set_header(os, osd_op, txn, *osd_op_params,
                                     delta_stats);
    }, true);
  case CEPH_OSD_OP_OMAPRMKEYRANGE:
#if 0
    if (!pg.get_pool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }
#endif
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.omap_remove_range(os, osd_op, txn, delta_stats);
    }, true);
  case CEPH_OSD_OP_OMAPRMKEYS:
    /** TODO: Implement supports_omap()
    if (!pg.get_pool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }*/
    return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.omap_remove_key(os, osd_op, txn);
    }, true);
  case CEPH_OSD_OP_OMAPCLEAR:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.omap_clear(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);

  // watch/notify
  case CEPH_OSD_OP_WATCH:
    // note: user_modify=false -- watch registration is not a user write
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return do_op_watch(osd_op, os, txn);
    }, false);
  case CEPH_OSD_OP_LIST_WATCHERS:
    return do_read_op([this, &osd_op] (auto&, const auto& os) {
      return do_op_list_watchers(osd_op, os);
    });
  case CEPH_OSD_OP_NOTIFY:
    return do_read_op([this, &osd_op] (auto&, const auto& os) {
      return do_op_notify(osd_op, os);
    });
  case CEPH_OSD_OP_NOTIFY_ACK:
    return do_read_op([this, &osd_op] (auto&, const auto& os) {
      return do_op_notify_ack(osd_op, os);
    });

  default:
    logger().warn("unknown op {}", ceph_osd_op_name(op.op));
    throw std::runtime_error(
      fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
  }
}
644 | ||
// Defined here because there is a circular dependency between OpsExecuter and PG
// Return the stripe width configured on this PG's pool.
uint32_t OpsExecuter::get_pool_stripe_width() const {
  return pg->get_pool().info.get_stripe_width();
}
649 | ||
// Defined here because there is a circular dependency between OpsExecuter and PG
// Return the PG's last user-visible object version.
version_t OpsExecuter::get_last_user_version() const
{
  return pg->get_last_user_version();
}
655 | ||
9f95a23c TL |
656 | static inline std::unique_ptr<const PGLSFilter> get_pgls_filter( |
657 | const std::string& type, | |
658 | bufferlist::const_iterator& iter) | |
659 | { | |
660 | // storing non-const PGLSFilter for the sake of ::init() | |
661 | std::unique_ptr<PGLSFilter> filter; | |
662 | if (type.compare("plain") == 0) { | |
663 | filter = std::make_unique<PGLSPlainFilter>(); | |
664 | } else { | |
665 | std::size_t dot = type.find("."); | |
666 | if (dot == type.npos || dot == 0 || dot == type.size() - 1) { | |
667 | throw crimson::osd::invalid_argument{}; | |
668 | } | |
669 | ||
670 | const std::string class_name = type.substr(0, dot); | |
671 | const std::string filter_name = type.substr(dot + 1); | |
672 | ClassHandler::ClassData *cls = nullptr; | |
673 | int r = ClassHandler::get_instance().open_class(class_name, &cls); | |
674 | if (r != 0) { | |
675 | logger().warn("can't open class {}: {}", class_name, cpp_strerror(r)); | |
676 | if (r == -EPERM) { | |
677 | // propogate permission error | |
678 | throw crimson::osd::permission_denied{}; | |
679 | } else { | |
680 | throw crimson::osd::invalid_argument{}; | |
681 | } | |
682 | } else { | |
683 | ceph_assert(cls); | |
684 | } | |
685 | ||
686 | ClassHandler::ClassFilter * const class_filter = cls->get_filter(filter_name); | |
687 | if (class_filter == nullptr) { | |
688 | logger().warn("can't find filter {} in class {}", filter_name, class_name); | |
689 | throw crimson::osd::invalid_argument{}; | |
690 | } | |
691 | ||
692 | filter.reset(class_filter->fn()); | |
693 | if (!filter) { | |
694 | // Object classes are obliged to return us something, but let's | |
695 | // give an error rather than asserting out. | |
696 | logger().warn("buggy class {} failed to construct filter {}", | |
697 | class_name, filter_name); | |
698 | throw crimson::osd::invalid_argument{}; | |
699 | } | |
700 | } | |
701 | ||
702 | ceph_assert(filter); | |
703 | int r = filter->init(iter); | |
704 | if (r < 0) { | |
705 | logger().warn("error initializing filter {}: {}", type, cpp_strerror(r)); | |
706 | throw crimson::osd::invalid_argument{}; | |
707 | } | |
708 | ||
709 | // successfully constructed and initialized, return it. | |
710 | return filter; | |
711 | } | |
712 | ||
// Apply a PGLS filter to a single object: resolves to `sobj` when the object
// passes the filter, or to a default-constructed hobject_t sentinel when it
// is filtered out. If the filter declares interest in an xattr, that xattr is
// fetched from the backend first and fed to the filter (a missing xattr is
// treated as empty unless the filter rejects empty xattrs).
//
// NOTE(review): the continuations capture `filter` by reference -- the caller
// must keep the filter alive until the returned future resolves; confirm at
// the call sites.
static PG::interruptible_future<hobject_t> pgls_filter(
  const PGLSFilter& filter,
  const PGBackend& backend,
  const hobject_t& sobj)
{
  if (const auto xattr = filter.get_xattr(); !xattr.empty()) {
    logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
                   xattr, sobj);
    return backend.getxattr(sobj, xattr).safe_then_interruptible(
      [&filter, sobj] (ceph::bufferlist val) {
        logger().debug("pgls_filter: got xvalue for obj={}", sobj);

        const bool filtered = filter.filter(sobj, val);
        return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
      }, PGBackend::get_attr_errorator::all_same_way([&filter, sobj] {
        // xattr read failed (e.g. not present)
        logger().debug("pgls_filter: got error for obj={}", sobj);

        if (filter.reject_empty_xattr()) {
          // no xattr and the filter demands one -> filtered out
          return seastar::make_ready_future<hobject_t>();
        }
        ceph::bufferlist val;
        const bool filtered = filter.filter(sobj, val);
        return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
      }));
  } else {
    // filter does not need an xattr; evaluate with an empty value
    ceph::bufferlist empty_lvalue_bl;
    const bool filtered = filter.filter(sobj, empty_lvalue_bl);
    return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
  }
}
743 | ||
20effc67 | 744 | static PG::interruptible_future<ceph::bufferlist> do_pgnls_common( |
9f95a23c TL |
745 | const hobject_t& pg_start, |
746 | const hobject_t& pg_end, | |
747 | const PGBackend& backend, | |
748 | const hobject_t& lower_bound, | |
749 | const std::string& nspace, | |
750 | const uint64_t limit, | |
751 | const PGLSFilter* const filter) | |
752 | { | |
753 | if (!(lower_bound.is_min() || | |
754 | lower_bound.is_max() || | |
755 | (lower_bound >= pg_start && lower_bound < pg_end))) { | |
756 | // this should only happen with a buggy client. | |
757 | throw std::invalid_argument("outside of PG bounds"); | |
758 | } | |
759 | ||
20effc67 TL |
760 | return backend.list_objects(lower_bound, limit).then_interruptible( |
761 | [&backend, filter, nspace](auto&& ret) | |
762 | -> PG::interruptible_future<std::tuple<std::vector<hobject_t>, hobject_t>> { | |
f67539c2 | 763 | auto& [objects, next] = ret; |
9f95a23c TL |
764 | auto in_my_namespace = [&nspace](const hobject_t& obj) { |
765 | using crimson::common::local_conf; | |
766 | if (obj.get_namespace() == local_conf()->osd_hit_set_namespace) { | |
767 | return false; | |
768 | } else if (nspace == librados::all_nspaces) { | |
769 | return true; | |
770 | } else { | |
771 | return obj.get_namespace() == nspace; | |
772 | } | |
773 | }; | |
20effc67 TL |
774 | auto to_pglsed = [&backend, filter] (const hobject_t& obj) |
775 | -> PG::interruptible_future<hobject_t> { | |
9f95a23c TL |
776 | // this transformation looks costly. However, I don't have any |
777 | // reason to think PGLS* operations are critical for, let's say, | |
778 | // general performance. | |
779 | // | |
780 | // from tchaikov: "another way is to use seastar::map_reduce(), | |
781 | // to 1) save the effort to filter the already filtered objects | |
782 | // 2) avoid the space to keep the tuple<bool, object> even if | |
783 | // the object is filtered out". | |
784 | if (filter) { | |
785 | return pgls_filter(*filter, backend, obj); | |
786 | } else { | |
787 | return seastar::make_ready_future<hobject_t>(obj); | |
788 | } | |
789 | }; | |
790 | ||
791 | auto range = objects | boost::adaptors::filtered(in_my_namespace) | |
792 | | boost::adaptors::transformed(to_pglsed); | |
793 | logger().debug("do_pgnls_common: finishing the 1st stage of pgls"); | |
794 | return seastar::when_all_succeed(std::begin(range), | |
795 | std::end(range)).then( | |
796 | [next=std::move(next)] (auto items) mutable { | |
797 | // the sole purpose of this chaining is to pass `next` to 2nd | |
798 | // stage altogether with items | |
799 | logger().debug("do_pgnls_common: 1st done"); | |
f67539c2 TL |
800 | return seastar::make_ready_future< |
801 | std::tuple<std::vector<hobject_t>, hobject_t>>( | |
20effc67 | 802 | std::move(items), std::move(next)); |
9f95a23c | 803 | }); |
20effc67 | 804 | }).then_interruptible( |
f67539c2 TL |
805 | [pg_end] (auto&& ret) { |
806 | auto& [items, next] = ret; | |
9f95a23c TL |
807 | auto is_matched = [] (const auto& obj) { |
808 | return !obj.is_min(); | |
809 | }; | |
810 | auto to_entry = [] (const auto& obj) { | |
811 | return librados::ListObjectImpl{ | |
812 | obj.get_namespace(), obj.oid.name, obj.get_key() | |
813 | }; | |
814 | }; | |
815 | ||
816 | pg_nls_response_t response; | |
817 | boost::push_back(response.entries, items | boost::adaptors::filtered(is_matched) | |
818 | | boost::adaptors::transformed(to_entry)); | |
819 | response.handle = next.is_max() ? pg_end : next; | |
820 | ceph::bufferlist out; | |
821 | encode(response, out); | |
822 | logger().debug("{}: response.entries.size()=", | |
823 | __func__, response.entries.size()); | |
824 | return seastar::make_ready_future<ceph::bufferlist>(std::move(out)); | |
825 | }); | |
826 | } | |
827 | ||
20effc67 | 828 | static PG::interruptible_future<> do_pgnls( |
9f95a23c TL |
829 | const PG& pg, |
830 | const std::string& nspace, | |
831 | OSDOp& osd_op) | |
832 | { | |
833 | hobject_t lower_bound; | |
834 | try { | |
835 | ceph::decode(lower_bound, osd_op.indata); | |
836 | } catch (const buffer::error&) { | |
837 | throw std::invalid_argument("unable to decode PGNLS handle"); | |
838 | } | |
839 | const auto pg_start = pg.get_pgid().pgid.get_hobj_start(); | |
840 | const auto pg_end = \ | |
841 | pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num()); | |
842 | return do_pgnls_common(pg_start, | |
843 | pg_end, | |
844 | pg.get_backend(), | |
845 | lower_bound, | |
846 | nspace, | |
847 | osd_op.op.pgls.count, | |
848 | nullptr /* no filter */) | |
20effc67 | 849 | .then_interruptible([&osd_op](bufferlist bl) { |
9f95a23c TL |
850 | osd_op.outdata = std::move(bl); |
851 | return seastar::now(); | |
852 | }); | |
853 | } | |
854 | ||
20effc67 | 855 | static PG::interruptible_future<> do_pgnls_filtered( |
9f95a23c TL |
856 | const PG& pg, |
857 | const std::string& nspace, | |
858 | OSDOp& osd_op) | |
859 | { | |
860 | std::string cname, mname, type; | |
861 | auto bp = osd_op.indata.cbegin(); | |
862 | try { | |
863 | ceph::decode(cname, bp); | |
864 | ceph::decode(mname, bp); | |
865 | ceph::decode(type, bp); | |
866 | } catch (const buffer::error&) { | |
867 | throw crimson::osd::invalid_argument{}; | |
868 | } | |
869 | ||
870 | auto filter = get_pgls_filter(type, bp); | |
871 | ||
872 | hobject_t lower_bound; | |
873 | try { | |
874 | lower_bound.decode(bp); | |
875 | } catch (const buffer::error&) { | |
876 | throw std::invalid_argument("unable to decode PGNLS_FILTER description"); | |
877 | } | |
878 | ||
879 | logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}", | |
880 | __func__, cname, mname, type, lower_bound, | |
881 | static_cast<const void*>(filter.get())); | |
882 | return seastar::do_with(std::move(filter), | |
883 | [&, lower_bound=std::move(lower_bound)](auto&& filter) { | |
884 | const auto pg_start = pg.get_pgid().pgid.get_hobj_start(); | |
885 | const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num()); | |
886 | return do_pgnls_common(pg_start, | |
887 | pg_end, | |
888 | pg.get_backend(), | |
889 | lower_bound, | |
890 | nspace, | |
891 | osd_op.op.pgls.count, | |
892 | filter.get()) | |
20effc67 | 893 | .then_interruptible([&osd_op](bufferlist bl) { |
9f95a23c TL |
894 | osd_op.outdata = std::move(bl); |
895 | return seastar::now(); | |
896 | }); | |
897 | }); | |
898 | } | |
899 | ||
20effc67 | 900 | static PG::interruptible_future<ceph::bufferlist> do_pgls_common( |
9f95a23c TL |
901 | const hobject_t& pg_start, |
902 | const hobject_t& pg_end, | |
903 | const PGBackend& backend, | |
904 | const hobject_t& lower_bound, | |
905 | const std::string& nspace, | |
906 | const uint64_t limit, | |
907 | const PGLSFilter* const filter) | |
908 | { | |
909 | if (!(lower_bound.is_min() || | |
910 | lower_bound.is_max() || | |
911 | (lower_bound >= pg_start && lower_bound < pg_end))) { | |
912 | // this should only happen with a buggy client. | |
913 | throw std::invalid_argument("outside of PG bounds"); | |
914 | } | |
915 | ||
916 | using entries_t = decltype(pg_ls_response_t::entries); | |
20effc67 | 917 | return backend.list_objects(lower_bound, limit).then_interruptible( |
f67539c2 TL |
918 | [&backend, filter, nspace](auto&& ret) { |
919 | auto& [objects, next] = ret; | |
20effc67 TL |
920 | return PG::interruptor::when_all( |
921 | PG::interruptor::map_reduce(std::move(objects), | |
922 | [&backend, filter, nspace](const hobject_t& obj) | |
923 | -> PG::interruptible_future<hobject_t>{ | |
9f95a23c TL |
924 | if (obj.get_namespace() == nspace) { |
925 | if (filter) { | |
926 | return pgls_filter(*filter, backend, obj); | |
927 | } else { | |
928 | return seastar::make_ready_future<hobject_t>(obj); | |
929 | } | |
930 | } else { | |
20effc67 | 931 | return seastar::make_ready_future<hobject_t>(); |
9f95a23c TL |
932 | } |
933 | }, | |
934 | entries_t{}, | |
f67539c2 | 935 | [](entries_t entries, hobject_t obj) { |
9f95a23c TL |
936 | if (!obj.is_min()) { |
937 | entries.emplace_back(obj.oid, obj.get_key()); | |
938 | } | |
939 | return entries; | |
940 | }), | |
941 | seastar::make_ready_future<hobject_t>(next)); | |
20effc67 | 942 | }).then_interruptible([pg_end](auto&& ret) { |
f67539c2 TL |
943 | auto entries = std::move(std::get<0>(ret).get0()); |
944 | auto next = std::move(std::get<1>(ret).get0()); | |
9f95a23c TL |
945 | pg_ls_response_t response; |
946 | response.handle = next.is_max() ? pg_end : next; | |
947 | response.entries = std::move(entries); | |
948 | ceph::bufferlist out; | |
949 | encode(response, out); | |
950 | logger().debug("{}: response.entries.size()=", | |
951 | __func__, response.entries.size()); | |
952 | return seastar::make_ready_future<ceph::bufferlist>(std::move(out)); | |
953 | }); | |
954 | } | |
955 | ||
20effc67 | 956 | static PG::interruptible_future<> do_pgls( |
9f95a23c TL |
957 | const PG& pg, |
958 | const std::string& nspace, | |
959 | OSDOp& osd_op) | |
960 | { | |
961 | hobject_t lower_bound; | |
962 | auto bp = osd_op.indata.cbegin(); | |
963 | try { | |
964 | lower_bound.decode(bp); | |
965 | } catch (const buffer::error&) { | |
966 | throw std::invalid_argument{"unable to decode PGLS handle"}; | |
967 | } | |
968 | const auto pg_start = pg.get_pgid().pgid.get_hobj_start(); | |
969 | const auto pg_end = | |
970 | pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num()); | |
971 | return do_pgls_common(pg_start, | |
972 | pg_end, | |
973 | pg.get_backend(), | |
974 | lower_bound, | |
975 | nspace, | |
976 | osd_op.op.pgls.count, | |
977 | nullptr /* no filter */) | |
20effc67 | 978 | .then_interruptible([&osd_op](bufferlist bl) { |
9f95a23c TL |
979 | osd_op.outdata = std::move(bl); |
980 | return seastar::now(); | |
981 | }); | |
982 | } | |
983 | ||
20effc67 | 984 | static PG::interruptible_future<> do_pgls_filtered( |
9f95a23c TL |
985 | const PG& pg, |
986 | const std::string& nspace, | |
987 | OSDOp& osd_op) | |
988 | { | |
989 | std::string cname, mname, type; | |
990 | auto bp = osd_op.indata.cbegin(); | |
991 | try { | |
992 | ceph::decode(cname, bp); | |
993 | ceph::decode(mname, bp); | |
994 | ceph::decode(type, bp); | |
995 | } catch (const buffer::error&) { | |
996 | throw crimson::osd::invalid_argument{}; | |
997 | } | |
998 | ||
999 | auto filter = get_pgls_filter(type, bp); | |
1000 | ||
1001 | hobject_t lower_bound; | |
1002 | try { | |
1003 | lower_bound.decode(bp); | |
1004 | } catch (const buffer::error&) { | |
1005 | throw std::invalid_argument("unable to decode PGLS_FILTER description"); | |
1006 | } | |
1007 | ||
1008 | logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}", | |
1009 | __func__, cname, mname, type, lower_bound, | |
1010 | static_cast<const void*>(filter.get())); | |
1011 | return seastar::do_with(std::move(filter), | |
1012 | [&, lower_bound=std::move(lower_bound)](auto&& filter) { | |
1013 | const auto pg_start = pg.get_pgid().pgid.get_hobj_start(); | |
1014 | const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num()); | |
1015 | return do_pgls_common(pg_start, | |
1016 | pg_end, | |
1017 | pg.get_backend(), | |
1018 | lower_bound, | |
1019 | nspace, | |
1020 | osd_op.op.pgls.count, | |
1021 | filter.get()) | |
20effc67 | 1022 | .then_interruptible([&osd_op](bufferlist bl) { |
9f95a23c TL |
1023 | osd_op.outdata = std::move(bl); |
1024 | return seastar::now(); | |
1025 | }); | |
1026 | }); | |
1027 | } | |
1028 | ||
20effc67 | 1029 | PgOpsExecuter::interruptible_future<> |
f67539c2 | 1030 | PgOpsExecuter::execute_op(OSDOp& osd_op) |
9f95a23c TL |
1031 | { |
1032 | logger().warn("handling op {}", ceph_osd_op_name(osd_op.op.op)); | |
1033 | switch (const ceph_osd_op& op = osd_op.op; op.op) { | |
1034 | case CEPH_OSD_OP_PGLS: | |
f67539c2 | 1035 | return do_pgls(pg, nspace, osd_op); |
9f95a23c | 1036 | case CEPH_OSD_OP_PGLS_FILTER: |
f67539c2 | 1037 | return do_pgls_filtered(pg, nspace, osd_op); |
9f95a23c | 1038 | case CEPH_OSD_OP_PGNLS: |
f67539c2 | 1039 | return do_pgnls(pg, nspace, osd_op); |
9f95a23c | 1040 | case CEPH_OSD_OP_PGNLS_FILTER: |
f67539c2 | 1041 | return do_pgnls_filtered(pg, nspace, osd_op); |
9f95a23c TL |
1042 | default: |
1043 | logger().warn("unknown op {}", ceph_osd_op_name(op.op)); | |
1044 | throw std::runtime_error( | |
1045 | fmt::format("op '{}' not supported", ceph_osd_op_name(op.op))); | |
1046 | } | |
1047 | } | |
1048 | ||
1049 | } // namespace crimson::osd |