]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/osd/ops_executer.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / osd / ops_executer.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "ops_executer.h"
5
6#include <boost/range/adaptor/filtered.hpp>
7#include <boost/range/adaptor/map.hpp>
8#include <boost/range/adaptor/transformed.hpp>
9#include <boost/range/algorithm_ext/push_back.hpp>
10#include <boost/range/algorithm/max_element.hpp>
11#include <boost/range/numeric.hpp>
12
13#include <fmt/format.h>
14#include <fmt/ostream.h>
15
16#include <seastar/core/thread.hh>
17
18#include "crimson/osd/exceptions.h"
f67539c2 19#include "crimson/osd/pg.h"
9f95a23c
TL
20#include "crimson/osd/watch.h"
21#include "osd/ClassHandler.h"
1e59de90 22#include "osd/SnapMapper.h"
9f95a23c
TL
23
namespace {
  // File-local convenience accessor for the OSD-subsystem logger.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
29
30namespace crimson::osd {
31
// Execute a CEPH_OSD_OP_CALL: decode the class/method/input triple from
// osd_op.indata, load the rados object class, run the method in a seastar
// thread (class methods are synchronous code), and post-validate that the
// method only performed operations its RD/WR flags permit.
OpsExecuter::call_ierrorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
{
  std::string cname, mname;
  ceph::bufferlist indata;
  try {
    auto bp = std::begin(osd_op.indata);
    bp.copy(osd_op.op.cls.class_len, cname);
    bp.copy(osd_op.op.cls.method_len, mname);
    bp.copy(osd_op.op.cls.indata_len, indata);
  } catch (buffer::error&) {
    logger().warn("call unable to decode class + method + indata");
    return crimson::ct_error::invarg::make();
  }

  // NOTE: opening a class can actually result in dlopen(), and thus
  // blocking the entire reactor. Thankfully to ClassHandler's cache
  // this is supposed to be extremely infrequent.
  ClassHandler::ClassData* cls;
  int r = ClassHandler::get_instance().open_class(cname, &cls);
  if (r) {
    logger().warn("class {} open got {}", cname, cpp_strerror(r));
    if (r == -ENOENT) {
      return crimson::ct_error::operation_not_supported::make();
    } else if (r == -EPERM) {
      // propagate permission errors
      return crimson::ct_error::permission_denied::make();
    }
    return crimson::ct_error::input_output_error::make();
  }

  ClassHandler::ClassMethod* method = cls->get_method(mname);
  if (!method) {
    logger().warn("call method {}.{} does not exist", cname, mname);
    return crimson::ct_error::operation_not_supported::make();
  }

  const auto flags = method->get_flags();
  // reads on a non-existent object fail early; WR-capable methods may
  // legitimately be the ones creating the object.
  if (!obc->obs.exists && (flags & CLS_METHOD_WR) == 0) {
    return crimson::ct_error::enoent::make();
  }

#if 0
  if (flags & CLS_METHOD_WR) {
    ctx->user_modify = true;
  }
#endif

  logger().debug("calling method {}.{}, num_read={}, num_write={}",
                 cname, mname, num_read, num_write);
  // snapshot the op counters so we can detect what the method did.
  const auto prev_rd = num_read;
  const auto prev_wr = num_write;
  // run the (blocking, synchronous) class method in a seastar thread.
  return interruptor::async(
    [this, method, indata=std::move(indata)]() mutable {
      ceph::bufferlist outdata;
      auto cls_context = reinterpret_cast<cls_method_context_t>(this);
      const auto ret = method->exec(cls_context, indata, outdata);
      return std::make_pair(ret, std::move(outdata));
    }
  ).then_interruptible(
    [this, prev_rd, prev_wr, &osd_op, flags]
    (auto outcome) -> call_errorator::future<> {
      auto& [ret, outdata] = outcome;
      osd_op.rval = ret;

      logger().debug("do_op_call: method returned ret={}, outdata.length()={}"
                     " while num_read={}, num_write={}",
                     ret, outdata.length(), num_read, num_write);
      // enforce the method's declared RD/WR flags against what it did.
      if (num_read > prev_rd && !(flags & CLS_METHOD_RD)) {
        logger().error("method tried to read object but is not marked RD");
        osd_op.rval = -EIO;
        return crimson::ct_error::input_output_error::make();
      }
      if (num_write > prev_wr && !(flags & CLS_METHOD_WR)) {
        logger().error("method tried to update object but is not marked WR");
        osd_op.rval = -EIO;
        return crimson::ct_error::input_output_error::make();
      }
      // ceph-osd has this implemented in `PrimaryLogPG::execute_ctx`,
      // grep for `ignore_out_data`.
      using crimson::common::local_conf;
      if (op_info.allows_returnvec() &&
          op_info.may_write() &&
          ret >= 0 &&
          outdata.length() > local_conf()->osd_max_write_op_reply_len) {
        // the justification of this limit is to not inflate the pg log.
        // that's the reason why we don't worry about pure reads.
        logger().error("outdata overflow due to .length()={}, limit={}",
                       outdata.length(),
                       local_conf()->osd_max_write_op_reply_len);
        osd_op.rval = -EOVERFLOW;
        return crimson::ct_error::value_too_large::make();
      }
      // for write calls we never return data except on errors or RETURNVEC.
      // please refer to cls/cls_hello.cc for details.
      if (!op_info.may_write() || op_info.allows_returnvec() || ret < 0) {
        osd_op.op.extent.length = outdata.length();
        osd_op.outdata.claim_append(outdata);
      }
      if (ret < 0) {
        return crimson::stateful_ec{
          std::error_code(-ret, std::generic_category()) };
      } else {
        return seastar::now();
      }
    }
  );
}
139
140static watch_info_t create_watch_info(const OSDOp& osd_op,
1e59de90
TL
141 const OpsExecuter::ExecutableMessage& msg,
142 entity_addr_t peer_addr)
9f95a23c
TL
143{
144 using crimson::common::local_conf;
145 const uint32_t timeout =
146 osd_op.op.watch.timeout == 0 ? local_conf()->osd_client_watch_timeout
147 : osd_op.op.watch.timeout;
148 return {
149 osd_op.op.watch.cookie,
150 timeout,
1e59de90 151 peer_addr
9f95a23c
TL
152 };
153}
154
// Register (or refresh) a watch: update object_info_t::watchers inside the
// transaction now, and defer the in-memory Watch object creation/connection
// to the op-effect stage, which runs only after the txn commits.
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  logger().debug("{}", __func__);
  // state captured now and replayed in the effect lambda after commit.
  struct connect_ctx_t {
    ObjectContext::watch_key_t key;
    crimson::net::ConnectionRef conn;
    watch_info_t info;

    connect_ctx_t(
      const OSDOp& osd_op,
      const ExecutableMessage& msg,
      crimson::net::ConnectionRef conn)
      : key(osd_op.op.watch.cookie, msg.get_reqid().name),
        conn(conn),
        info(create_watch_info(osd_op, msg, conn->get_peer_addr())) {
    }
  };

  return with_effect_on_obc(
    connect_ctx_t{ osd_op, get_message(), conn },
    [&](auto& ctx) {
      const auto& entity = ctx.key.second;
      auto [it, emplaced] =
        os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
      if (emplaced) {
        logger().info("registered new watch {} by {}", it->second, entity);
        // ensure the txn is non-empty so the oi update gets persisted.
        txn.nop();
      } else {
        logger().info("found existing watch {} by {}", it->second, entity);
      }
      return seastar::now();
    },
    [](auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
      assert(pg);
      auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
      if (emplaced) {
        const auto& [cookie, entity] = ctx.key;
        it->second = crimson::osd::Watch::create(
          obc, ctx.info, entity, std::move(pg));
        logger().info("op_effect: added new watcher: {}", ctx.key);
      } else {
        logger().info("op_effect: found existing watcher: {}", ctx.key);
      }
      return it->second->connect(std::move(ctx.conn), true /* will_ping */);
    }
  );
}
205
20effc67 206OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
9f95a23c
TL
207 OSDOp& osd_op,
208 ObjectState& os,
209 ceph::os::Transaction& txn)
210{
211 const entity_name_t& entity = get_message().get_reqid().name;
212 const auto& cookie = osd_op.op.watch.cookie;
213 if (!os.oi.watchers.count(std::make_pair(cookie, entity))) {
214 return crimson::ct_error::not_connected::make();
215 } else {
216 logger().info("found existing watch by {}", entity);
217 return do_op_watch_subop_watch(osd_op, os, txn);
218 }
219}
220
// Handle CEPH_OSD_WATCH_OP_UNWATCH: drop the watch from object_info_t now
// (persisted via txn) and disconnect the in-memory Watch as a post-commit
// op effect.
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_unwatch(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  logger().info("{}", __func__);

  struct disconnect_ctx_t {
    ObjectContext::watch_key_t key;
    disconnect_ctx_t(const OSDOp& osd_op, const ExecutableMessage& msg)
      : key(osd_op.op.watch.cookie, msg.get_reqid().name) {
    }
  };
  return with_effect_on_obc(disconnect_ctx_t{ osd_op, get_message() },
    [&] (auto& ctx) {
      const auto& entity = ctx.key.second;
      if (auto nh = os.oi.watchers.extract(ctx.key); !nh.empty()) {
        logger().info("removed watch {} by {}", nh.mapped(), entity);
        // ensure the txn is non-empty so the oi update gets persisted.
        txn.nop();
      } else {
        // not an error: unwatch of an unknown watch is a no-op here.
        logger().info("can't remove: no watch by {}", entity);
      }
      return seastar::now();
    },
    [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
      if (auto nh = obc->watchers.extract(ctx.key); !nh.empty()) {
        // keep the Watch alive for the duration of its own remove().
        return seastar::do_with(std::move(nh.mapped()),
                                [ctx](auto&& watcher) {
          logger().info("op_effect: disconnect watcher {}", ctx.key);
          return watcher->remove();
        });
      } else {
        logger().info("op_effect: disconnect failed to find watcher {}", ctx.key);
        return seastar::now();
      }
    });
}
258
20effc67 259OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_ping(
9f95a23c
TL
260 OSDOp& osd_op,
261 ObjectState& os,
262 ceph::os::Transaction& txn)
263{
264 const entity_name_t& entity = get_message().get_reqid().name;
265 const auto& cookie = osd_op.op.watch.cookie;
266 const auto key = std::make_pair(cookie, entity);
267
268 // Note: WATCH with PING doesn't cause may_write() to return true,
269 // so if there is nothing else in the transaction, this is going
270 // to run do_osd_op_effects, but not write out a log entry */
271 if (!os.oi.watchers.count(key)) {
272 return crimson::ct_error::not_connected::make();
273 }
274 auto it = obc->watchers.find(key);
275 if (it == std::end(obc->watchers) || !it->second->is_connected()) {
276 return crimson::ct_error::timed_out::make();
277 }
278 logger().info("found existing watch by {}", entity);
279 it->second->got_ping(ceph_clock_now());
280 return seastar::now();
281}
282
// Dispatch a CEPH_OSD_OP_WATCH to the appropriate subop handler.
// The object must exist; unknown or legacy subops yield -EINVAL.
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  logger().debug("{}", __func__);
  if (!os.exists) {
    return crimson::ct_error::enoent::make();
  }
  switch (osd_op.op.watch.op) {
  case CEPH_OSD_WATCH_OP_WATCH:
    return do_op_watch_subop_watch(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_RECONNECT:
    return do_op_watch_subop_reconnect(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_PING:
    return do_op_watch_subop_ping(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_UNWATCH:
    return do_op_watch_subop_unwatch(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_LEGACY_WATCH:
    // pre-luminous watch protocol is intentionally unsupported.
    logger().warn("ignoring CEPH_OSD_WATCH_OP_LEGACY_WATCH");
    return crimson::ct_error::invarg::make();
  }
  logger().warn("unrecognized WATCH subop: {}", osd_op.op.watch.op);
  return crimson::ct_error::invarg::make();
}
308
309static uint64_t get_next_notify_id(epoch_t e)
310{
311 // FIXME
312 static std::uint64_t next_notify_id = 0;
313 return (((uint64_t)e) << 32) | ((uint64_t)(next_notify_id++));
314}
315
// Handle CEPH_OSD_OP_NOTIFY: decode the notify payload, assign a unique
// notify id (returned to the client via osd_op.outdata), and — as a
// post-commit op effect — fan the notify out to all alive watchers.
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify(
  OSDOp& osd_op,
  const ObjectState& os)
{
  logger().debug("{}, msg epoch: {}", __func__, get_message().get_map_epoch());

  if (!os.exists) {
    return crimson::ct_error::enoent::make();
  }
  // state captured now and replayed in the effect lambda after commit.
  struct notify_ctx_t {
    crimson::net::ConnectionRef conn;
    notify_info_t ninfo;
    const uint64_t client_gid;
    const epoch_t epoch;

    notify_ctx_t(const ExecutableMessage& msg,
                 crimson::net::ConnectionRef conn)
      : conn(conn),
        client_gid(msg.get_reqid().name.num()),
        epoch(msg.get_map_epoch()) {
    }
  };
  return with_effect_on_obc(
    notify_ctx_t{ get_message(), conn },
    [&](auto& ctx) {
      try {
        auto bp = osd_op.indata.cbegin();
        uint32_t ver; // obsolete
        ceph::decode(ver, bp);
        ceph::decode(ctx.ninfo.timeout, bp);
        ceph::decode(ctx.ninfo.bl, bp);
      } catch (const buffer::error&) {
        // a malformed payload degrades to the default timeout below.
        ctx.ninfo.timeout = 0;
      }
      if (!ctx.ninfo.timeout) {
        using crimson::common::local_conf;
        ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
      }
      ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
      ctx.ninfo.cookie = osd_op.op.notify.cookie;
      // return our unique notify id to the client
      ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
      return seastar::now();
    },
    [](auto&& ctx, ObjectContextRef obc, Ref<PG>) {
      auto alive_watchers = obc->watchers | boost::adaptors::map_values
                          | boost::adaptors::filtered(
        [] (const auto& w) {
          // FIXME: filter as for the `is_ping` in `Watch::start_notify`
          return w->is_alive();
        });
      return crimson::osd::Notify::create_n_propagate(
        std::begin(alive_watchers),
        std::end(alive_watchers),
        std::move(ctx.conn),
        ctx.ninfo,
        ctx.client_gid,
        obc->obs.oi.user_version);
    }
  );
}
377
20effc67
TL
378OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_list_watchers(
379 OSDOp& osd_op,
380 const ObjectState& os)
381{
382 logger().debug("{}", __func__);
383
384 obj_list_watch_response_t response;
385 for (const auto& [key, info] : os.oi.watchers) {
386 logger().debug("{}: key cookie={}, entity={}",
387 __func__, key.first, key.second);
388 assert(key.first == info.cookie);
389 assert(key.second.is_client());
390 response.entries.emplace_back(watch_item_t{
391 key.second, info.cookie, info.timeout_seconds, info.addr});
20effc67 392 }
1e59de90 393 response.encode(osd_op.outdata, get_message().get_features());
20effc67
TL
394 return watch_ierrorator::now();
395}
396
// Handle CEPH_OSD_OP_NOTIFY_ACK: decode (notify_id, watch_cookie, optional
// reply payload) and, as a post-commit op effect, deliver the ack to the
// matching watch on this object.
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify_ack(
  OSDOp& osd_op,
  const ObjectState& os)
{
  logger().debug("{}", __func__);

  struct notifyack_ctx_t {
    const entity_name_t entity;
    uint64_t watch_cookie;
    uint64_t notify_id;
    ceph::bufferlist reply_bl;

    notifyack_ctx_t(const ExecutableMessage& msg)
      : entity(msg.get_reqid().name) {
    }
  };
  return with_effect_on_obc(notifyack_ctx_t{ get_message() },
    [&] (auto& ctx) -> watch_errorator::future<> {
      try {
        auto bp = osd_op.indata.cbegin();
        ceph::decode(ctx.notify_id, bp);
        ceph::decode(ctx.watch_cookie, bp);
        if (!bp.end()) {
          // the reply payload is optional on the wire.
          ceph::decode(ctx.reply_bl, bp);
        }
      } catch (const buffer::error&) {
        // here we behave differently than ceph-osd. For historical reasons,
        // it falls back to using `osd_op.op.watch.cookie` as `ctx.notify_id`.
        // crimson just returns EINVAL if the data cannot be decoded.
        return crimson::ct_error::invarg::make();
      }
      return watch_errorator::now();
    },
    [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
      logger().info("notify_ack watch_cookie={}, notify_id={}",
                    ctx.watch_cookie, ctx.notify_id);
      // scan all watchers; only the (entity, cookie) match gets the ack.
      return seastar::do_for_each(obc->watchers,
        [ctx=std::move(ctx)] (auto& kv) {
          const auto& [key, watchp] = kv;
          static_assert(
            std::is_same_v<std::decay_t<decltype(watchp)>,
                           seastar::shared_ptr<crimson::osd::Watch>>);
          auto& [cookie, entity] = key;
          if (ctx.entity != entity) {
            logger().debug("skipping watch {}; entity name {} != {}",
                           key, entity, ctx.entity);
            return seastar::now();
          }
          if (ctx.watch_cookie != cookie) {
            logger().debug("skipping watch {}; cookie {} != {}",
                           key, ctx.watch_cookie, cookie);
            return seastar::now();
          }
          logger().info("acking notify on watch {}", key);
          return watchp->notify_ack(ctx.notify_id, ctx.reply_bl);
        });
    });
}
455
20effc67
TL
// Defined here because there is a circular dependency between OpsExecuter and PG
// Runs a read-only/const operation `f` against the PG backend and the
// (const) object state; does not touch the write-op bookkeeping.
template <class Func>
auto OpsExecuter::do_const_op(Func&& f) {
  // TODO: pass backend as read-only
  return std::forward<Func>(f)(pg->get_backend(), std::as_const(obc->obs));
}
462
// Defined here because there is a circular dependency between OpsExecuter and PG
// Runs a mutating operation `f` against the PG backend, object state and
// the pending transaction. Lazily initializes osd_op_params (bumping the
// PG version) on the first write of this op sequence, and records whether
// the modification is user-visible (modified_by::user) or internal.
template <class Func>
auto OpsExecuter::do_write_op(Func&& f, OpsExecuter::modified_by m) {
  ++num_write;
  if (!osd_op_params) {
    osd_op_params.emplace();
    fill_op_params_bump_pg_version();
  }
  // NOTE(review): this overwrites the flag on every write op, so the last
  // op in the sequence wins — confirm that is intended.
  user_modify = (m == modified_by::user);
  return std::forward<Func>(f)(pg->get_backend(), obc->obs, txn);
}
1e59de90
TL
474OpsExecuter::call_errorator::future<> OpsExecuter::do_assert_ver(
475 OSDOp& osd_op,
476 const ObjectState& os)
477{
478 if (!osd_op.op.assert_ver.ver) {
479 return crimson::ct_error::invarg::make();
480 } else if (osd_op.op.assert_ver.ver < os.oi.user_version) {
481 return crimson::ct_error::erange::make();
482 } else if (osd_op.op.assert_ver.ver > os.oi.user_version) {
483 return crimson::ct_error::value_too_large::make();
484 }
485 return seastar::now();
486}
487
488OpsExecuter::list_snaps_iertr::future<> OpsExecuter::do_list_snaps(
489 OSDOp& osd_op,
490 const ObjectState& os,
491 const SnapSet& ss)
492{
493 obj_list_snap_response_t resp;
494 resp.clones.reserve(ss.clones.size() + 1);
495 for (auto &clone: ss.clones) {
496 clone_info ci;
497 ci.cloneid = clone;
498
499 {
500 auto p = ss.clone_snaps.find(clone);
501 if (p == ss.clone_snaps.end()) {
502 logger().error(
503 "OpsExecutor::do_list_snaps: {} has inconsistent "
504 "clone_snaps, missing clone {}",
505 os.oi.soid,
506 clone);
507 return crimson::ct_error::invarg::make();
508 }
509 ci.snaps.reserve(p->second.size());
510 ci.snaps.insert(ci.snaps.end(), p->second.rbegin(), p->second.rend());
511 }
512
513 {
514 auto p = ss.clone_overlap.find(clone);
515 if (p == ss.clone_overlap.end()) {
516 logger().error(
517 "OpsExecutor::do_list_snaps: {} has inconsistent "
518 "clone_overlap, missing clone {}",
519 os.oi.soid,
520 clone);
521 return crimson::ct_error::invarg::make();
522 }
523 ci.overlap.reserve(p->second.num_intervals());
524 ci.overlap.insert(ci.overlap.end(), p->second.begin(), p->second.end());
525 }
526
527 {
528 auto p = ss.clone_size.find(clone);
529 if (p == ss.clone_size.end()) {
530 logger().error(
531 "OpsExecutor::do_list_snaps: {} has inconsistent "
532 "clone_size, missing clone {}",
533 os.oi.soid,
534 clone);
535 return crimson::ct_error::invarg::make();
536 }
537 ci.size = p->second;
538 }
539 resp.clones.push_back(std::move(ci));
540 }
541
542 if (!os.oi.is_whiteout()) {
543 clone_info ci;
544 ci.cloneid = CEPH_NOSNAP;
545 ci.size = os.oi.size;
546 resp.clones.push_back(std::move(ci));
547 }
548 resp.seq = ss.seq;
549 logger().error(
550 "OpsExecutor::do_list_snaps: {}, resp.clones.size(): {}",
551 os.oi.soid,
552 resp.clones.size());
553 resp.encode(osd_op.outdata);
554 return read_ierrorator::now();
555}
20effc67
TL
556
// Entry point for a single OSDOp: run it via do_execute_op() and apply the
// common error policy — record -errno in osd_op.rval (except for CMPEXT,
// which sets its own rval) and swallow the error when the client set
// FAILOK, unless it is a retryable EAGAIN/EINPROGRESS.
OpsExecuter::interruptible_errorated_future<OpsExecuter::osd_op_errorator>
OpsExecuter::execute_op(OSDOp& osd_op)
{
  return do_execute_op(osd_op).handle_error_interruptible(
    osd_op_errorator::all_same_way([&osd_op](auto e, auto&& e_raw)
      -> OpsExecuter::osd_op_errorator::future<> {
      // All ops except for CMPEXT should have rval set to -e.value(),
      // CMPEXT sets rval itself and shouldn't be overridden.
      if (e.value() != ct_error::cmp_fail_error_value) {
        osd_op.rval = -e.value();
      }
      if ((osd_op.op.flags & CEPH_OSD_OP_FLAG_FAILOK) &&
          e.value() != EAGAIN && e.value() != EINPROGRESS) {
        // client asked to tolerate failures of this op.
        return osd_op_errorator::now();
      } else {
        return std::move(e_raw);
      }
    }));
}
576
// Central dispatch: map an OSD op code onto the PG backend (or the local
// do_op_* helpers), routed through do_read_op / do_write_op / do_const_op /
// do_snapset_op so the read/write bookkeeping stays consistent.
// Unknown op codes throw std::runtime_error.
OpsExecuter::interruptible_errorated_future<OpsExecuter::osd_op_errorator>
OpsExecuter::do_execute_op(OSDOp& osd_op)
{
  // TODO: dispatch via call table?
  // TODO: we might want to find a way to unify both input and output
  // of each op.
  logger().debug(
    "handling op {} on object {}",
    ceph_osd_op_name(osd_op.op.op),
    get_target());
  switch (const ceph_osd_op& op = osd_op.op; op.op) {
  case CEPH_OSD_OP_SYNC_READ:
    [[fallthrough]];
  case CEPH_OSD_OP_READ:
    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
      return backend.read(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_SPARSE_READ:
    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
      return backend.sparse_read(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_CHECKSUM:
    return do_read_op([&osd_op](auto& backend, const auto& os) {
      return backend.checksum(os, osd_op);
    });
  case CEPH_OSD_OP_CMPEXT:
    return do_read_op([&osd_op](auto& backend, const auto& os) {
      return backend.cmp_ext(os, osd_op);
    });
  case CEPH_OSD_OP_GETXATTR:
    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
      return backend.getxattr(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_GETXATTRS:
    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
      return backend.get_xattrs(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_CMPXATTR:
    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
      return backend.cmp_xattr(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_RMXATTR:
    return do_write_op([&osd_op](auto& backend, auto& os, auto& txn) {
      return backend.rm_xattr(os, osd_op, txn);
    });
  case CEPH_OSD_OP_CREATE:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.create(os, osd_op, txn, delta_stats);
    });
  case CEPH_OSD_OP_WRITE:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.write(os, osd_op, txn, *osd_op_params, delta_stats);
    });
  case CEPH_OSD_OP_WRITESAME:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.write_same(os, osd_op, txn, *osd_op_params, delta_stats);
    });
  case CEPH_OSD_OP_WRITEFULL:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.writefull(os, osd_op, txn, *osd_op_params, delta_stats);
    });
  case CEPH_OSD_OP_ROLLBACK:
    // rollback needs the head obc in addition to the target object state.
    return do_write_op([this, &head=obc,
                        &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.rollback(os, osd_op, txn, *osd_op_params, delta_stats,
                              head, pg->obc_loader);
    });
  case CEPH_OSD_OP_APPEND:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.append(os, osd_op, txn, *osd_op_params, delta_stats);
    });
  case CEPH_OSD_OP_TRUNCATE:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      // FIXME: rework needed. Move this out to do_write_op(), introduce
      // do_write_op_no_user_modify()...
      return backend.truncate(os, osd_op, txn, *osd_op_params, delta_stats);
    });
  case CEPH_OSD_OP_ZERO:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.zero(os, osd_op, txn, *osd_op_params, delta_stats);
    });
  case CEPH_OSD_OP_SETALLOCHINT:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.set_allochint(os, osd_op, txn, delta_stats);
    });
  case CEPH_OSD_OP_SETXATTR:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.setxattr(os, osd_op, txn, delta_stats);
    });
  case CEPH_OSD_OP_DELETE:
  {
    // if clones exist (or the next snap would create one) the head is
    // whited out instead of physically removed.
    bool whiteout = false;
    if (!obc->ssc->snapset.clones.empty() ||
        (snapc.snaps.size() && // there are snaps
         snapc.snaps[0] > obc->ssc->snapset.seq)) { // existing obj is old
      logger().debug("{} has or will have clones, will whiteout {}",
                     __func__, obc->obs.oi.soid);
      whiteout = true;
    }
    return do_write_op([this, whiteout](auto& backend, auto& os, auto& txn) {
      return backend.remove(os, txn, delta_stats, whiteout);
    });
  }
  case CEPH_OSD_OP_CALL:
    return this->do_op_call(osd_op);
  case CEPH_OSD_OP_STAT:
    // note: stat does not require RD
    return do_const_op([this, &osd_op] (/* const */auto& backend, const auto& os) {
      return backend.stat(os, osd_op, delta_stats);
    });

  case CEPH_OSD_OP_TMAPPUT:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.tmapput(os, osd_op, txn, delta_stats, *osd_op_params);
    });
  case CEPH_OSD_OP_TMAPUP:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto &txn) {
      return backend.tmapup(os, osd_op, txn, delta_stats, *osd_op_params);
    });
  case CEPH_OSD_OP_TMAPGET:
    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
      return backend.tmapget(os, osd_op, delta_stats);
    });

  // OMAP
  case CEPH_OSD_OP_OMAPGETKEYS:
    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
      return backend.omap_get_keys(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPGETVALS:
    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
      return backend.omap_get_vals(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAP_CMP:
    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
      return backend.omap_cmp(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPGETHEADER:
    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
      return backend.omap_get_header(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    return do_read_op([this, &osd_op](auto& backend, const auto& os) {
      return backend.omap_get_vals_by_keys(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPSETVALS:
#if 0
    if (!pg.get_pgpool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }
#endif
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.omap_set_vals(os, osd_op, txn, *osd_op_params, delta_stats);
    });
  case CEPH_OSD_OP_OMAPSETHEADER:
#if 0
    if (!pg.get_pgpool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }
#endif
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.omap_set_header(os, osd_op, txn, *osd_op_params,
                                     delta_stats);
    });
  case CEPH_OSD_OP_OMAPRMKEYRANGE:
#if 0
    if (!pg.get_pgpool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }
#endif
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.omap_remove_range(os, osd_op, txn, delta_stats);
    });
  case CEPH_OSD_OP_OMAPRMKEYS:
    /** TODO: Implement supports_omap()
    if (!pg.get_pgpool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }*/
    return do_write_op([&osd_op](auto& backend, auto& os, auto& txn) {
      return backend.omap_remove_key(os, osd_op, txn);
    });
  case CEPH_OSD_OP_OMAPCLEAR:
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return backend.omap_clear(os, osd_op, txn, *osd_op_params, delta_stats);
    });

  // watch/notify
  case CEPH_OSD_OP_WATCH:
    // a WATCH mutates oi but is a system-side modification, not a user one.
    return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
      return do_op_watch(osd_op, os, txn);
    }, modified_by::sys);
  case CEPH_OSD_OP_LIST_WATCHERS:
    return do_read_op([this, &osd_op](auto&, const auto& os) {
      return do_op_list_watchers(osd_op, os);
    });
  case CEPH_OSD_OP_NOTIFY:
    return do_read_op([this, &osd_op](auto&, const auto& os) {
      return do_op_notify(osd_op, os);
    });
  case CEPH_OSD_OP_NOTIFY_ACK:
    return do_read_op([this, &osd_op](auto&, const auto& os) {
      return do_op_notify_ack(osd_op, os);
    });
  case CEPH_OSD_OP_ASSERT_VER:
    return do_read_op([this, &osd_op](auto&, const auto& os) {
      return do_assert_ver(osd_op, os);
    });
  case CEPH_OSD_OP_LIST_SNAPS:
    return do_snapset_op([this, &osd_op](const auto &os, const auto &ss) {
      return do_list_snaps(osd_op, os, ss);
    });

  default:
    logger().warn("unknown op {}", ceph_osd_op_name(op.op));
    throw std::runtime_error(
      fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
  }
}
795
1e59de90
TL
796void OpsExecuter::fill_op_params_bump_pg_version()
797{
798 osd_op_params->req_id = msg->get_reqid();
799 osd_op_params->mtime = msg->get_mtime();
800 osd_op_params->at_version = pg->next_version();
801 osd_op_params->pg_trim_to = pg->get_pg_trim_to();
802 osd_op_params->min_last_complete_ondisk = pg->get_min_last_complete_ondisk();
803 osd_op_params->last_complete = pg->get_info().last_complete;
804}
805
// Build the pg_log entry describing this (already executed) op sequence:
// MODIFY when the object still exists, DELETE otherwise, carrying the
// versions, reqid, mtime and — when RETURNVEC is allowed — the per-op
// return values.
std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction(
  const std::vector<OSDOp>& ops)
{
  // let's ensure we don't need to inform SnapMapper about this particular
  // entry.
  assert(obc->obs.oi.soid.snap >= CEPH_MAXSNAP);
  std::vector<pg_log_entry_t> log_entries;
  log_entries.emplace_back(
    obc->obs.exists ?
      pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
    obc->obs.oi.soid,
    osd_op_params->at_version,
    obc->obs.oi.version,
    // NOTE(review): reads osd_op_params->user_modify while do_write_op()
    // sets OpsExecuter::user_modify — confirm these refer to the same flag.
    osd_op_params->user_modify ? osd_op_params->at_version.version : 0,
    osd_op_params->req_id,
    osd_op_params->mtime,
    op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
  if (op_info.allows_returnvec()) {
    // also the per-op values are recorded in the pg log
    log_entries.back().set_op_returns(ops);
    logger().debug("{} op_returns: {}",
                   __func__, log_entries.back().op_returns);
  }
  log_entries.back().clean_regions = std::move(osd_op_params->clean_regions);
  return log_entries;
}
832
// Remove soid's snapshot mapping from the SnapMapper. The mapper is
// synchronous code, so the update runs in a seastar thread.
OpsExecuter::interruptible_future<> OpsExecuter::snap_map_remove(
  const hobject_t& soid,
  SnapMapper& snap_mapper,
  OSDriver& osdriver,
  ceph::os::Transaction& txn)
{
  logger().debug("{}: soid {}", __func__, soid);
  return interruptor::async([soid, &snap_mapper,
                             _t=osdriver.get_transaction(&txn)]() mutable {
    const auto r = snap_mapper.remove_oid(soid, &_t);
    if (r) {
      logger().error("{}: remove_oid {} failed with {}",
                     __func__, soid, r);
    }
    // On removal tolerate missing key corruption
    assert(r == 0 || r == -ENOENT);
  });
}
851
// Replace soid's snapshot set in the SnapMapper (used when an object's
// snaps change). The mapper is synchronous code, so the update runs in a
// seastar thread. `snaps` must be non-empty.
OpsExecuter::interruptible_future<> OpsExecuter::snap_map_modify(
  const hobject_t& soid,
  const std::set<snapid_t>& snaps,
  SnapMapper& snap_mapper,
  OSDriver& osdriver,
  ceph::os::Transaction& txn)
{
  logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
  return interruptor::async([soid, snaps, &snap_mapper,
                             _t=osdriver.get_transaction(&txn)]() mutable {
    assert(std::size(snaps) > 0);
    [[maybe_unused]] const auto r = snap_mapper.update_snaps(
      soid, snaps, 0, &_t);
    assert(r == 0);
  });
}
868
869OpsExecuter::interruptible_future<> OpsExecuter::snap_map_clone(
870 const hobject_t& soid,
871 const std::set<snapid_t>& snaps,
872 SnapMapper& snap_mapper,
873 OSDriver& osdriver,
874 ceph::os::Transaction& txn)
875{
876 logger().debug("{}: soid {}, snaps {}", __func__, soid, snaps);
877 return interruptor::async([soid, snaps, &snap_mapper,
878 _t=osdriver.get_transaction(&txn)]() mutable {
879 assert(std::size(snaps) > 0);
880 snap_mapper.add_oid(soid, snaps, &_t);
881 });
882}
883
20effc67
TL
884// Defined here because there is a circular dependency between OpsExecuter and PG
885uint32_t OpsExecuter::get_pool_stripe_width() const {
1e59de90 886 return pg->get_pgpool().info.get_stripe_width();
20effc67
TL
887}
888
// Defined here because there is a circular dependency between OpsExecuter and PG
// Forwards to the PG's last user-visible object version.
version_t OpsExecuter::get_last_user_version() const
{
  return pg->get_last_user_version();
}
894
// Performs the copy-on-write step for snapshotted writes: creates a clone
// of the head object at snap id `snapc.seq`, stages the actual data clone
// into `txn`, updates delta_stats, and returns a CloningContext carrying
// the updated SnapSet plus the CLONE pg log entry (finalized later by
// CloningContext::apply_to()).
std::unique_ptr<OpsExecuter::CloningContext> OpsExecuter::execute_clone(
  const SnapContext& snapc,
  const ObjectState& initial_obs,
  const SnapSet& initial_snapset,
  PGBackend& backend,
  ceph::os::Transaction& txn)
{
  const hobject_t& soid = initial_obs.oi.soid;
  logger().debug("{} {} snapset={} snapc={}",
                 __func__, soid,
                 initial_snapset, snapc);

  auto cloning_ctx = std::make_unique<CloningContext>();
  cloning_ctx->new_snapset = initial_snapset;

  // clone object, the snap field is set to the seq of the SnapContext
  // at its creation.
  hobject_t coid = soid;
  coid.snap = snapc.seq;

  // existing snaps are stored in descending order in snapc,
  // cloned_snaps vector will hold all the snaps stored until snapset.seq
  const std::vector<snapid_t> cloned_snaps = [&] {
    auto last = std::find_if(
      std::begin(snapc.snaps), std::end(snapc.snaps),
      [&](snapid_t snap_id) { return snap_id <= initial_snapset.seq; });
    return std::vector<snapid_t>{std::begin(snapc.snaps), last};
  }();

  auto [snap_oi, clone_obc] = prepare_clone(coid);
  // make clone
  // NOTE(review): prepare_clone() returns a null clone_obc when
  // !pg->is_primary(); this dereference presumably relies on
  // execute_clone() only running on the primary — confirm.
  backend.clone(snap_oi, initial_obs, clone_obc->obs, txn);

  delta_stats.num_objects++;
  if (snap_oi.is_omap()) {
    delta_stats.num_objects_omap++;
  }
  delta_stats.num_object_clones++;
  // newsnapset is obc's ssc
  cloning_ctx->new_snapset.clones.push_back(coid.snap);
  cloning_ctx->new_snapset.clone_size[coid.snap] = initial_obs.oi.size;
  cloning_ctx->new_snapset.clone_snaps[coid.snap] = cloned_snaps;

  // clone_overlap should contain an entry for each clone
  // (an empty interval_set if there is no overlap)
  auto &overlap = cloning_ctx->new_snapset.clone_overlap[coid.snap];
  if (initial_obs.oi.size) {
    overlap.insert(0, initial_obs.oi.size);
  }

  // log clone
  logger().debug("cloning v {} to {} v {} snaps={} snapset={}",
                 initial_obs.oi.version, coid,
                 osd_op_params->at_version, cloned_snaps, cloning_ctx->new_snapset);

  cloning_ctx->log_entry = {
    pg_log_entry_t::CLONE,
    coid,
    snap_oi.version,
    initial_obs.oi.version,
    initial_obs.oi.user_version,
    osd_reqid_t(),
    initial_obs.oi.mtime, // will be replaced in `apply_to()`
    0
  };
  encode(cloned_snaps, cloning_ctx->log_entry.snaps);

  // TODO: update most recent clone_overlap and usage stats
  return cloning_ctx;
}
965
// Consumes this CloningContext (hence the && qualifier): stamps the CLONE
// log entry with the head object's final mtime, appends it to
// `log_entries`, and installs the updated snapset on `processed_obc`.
void OpsExecuter::CloningContext::apply_to(
  std::vector<pg_log_entry_t>& log_entries,
  ObjectContext& processed_obc) &&
{
  // the head's mtime is only known once all ops have executed; it was a
  // placeholder when the entry was built in execute_clone()
  log_entry.mtime = processed_obc.obs.oi.mtime;
  log_entries.emplace_back(std::move(log_entry));
  processed_obc.ssc->snapset = std::move(new_snapset);
}
974
// Finalizes snapshot metadata after op execution: if a clone was staged,
// applies the pending CloningContext (CLONE log entry + new snapset) and
// registers the clone with the SnapMapper; then advances the snapset's
// seq to the latest snap context. Returns the (possibly extended) log
// entries once any SnapMapper work completes.
OpsExecuter::interruptible_future<std::vector<pg_log_entry_t>>
OpsExecuter::flush_clone_metadata(
  std::vector<pg_log_entry_t>&& log_entries,
  SnapMapper& snap_mapper,
  OSDriver& osdriver,
  ceph::os::Transaction& txn)
{
  assert(!txn.empty());
  auto maybe_snap_mapped = interruptor::now();
  if (cloning_ctx) {
    // moves the CLONE log entry into log_entries and installs the new
    // snapset onto the head object's context
    std::move(*cloning_ctx).apply_to(log_entries, *obc);
    const auto& coid = log_entries.back().soid;
    const auto& cloned_snaps = obc->ssc->snapset.clone_snaps[coid.snap];
    maybe_snap_mapped = snap_map_clone(
      coid,
      std::set<snapid_t>{std::begin(cloned_snaps), std::end(cloned_snaps)},
      snap_mapper,
      osdriver,
      txn);
  }
  if (snapc.seq > obc->ssc->snapset.seq) {
    // update snapset with latest snap context
    obc->ssc->snapset.seq = snapc.seq;
    obc->ssc->snapset.snaps.clear();
  }
  // NOTE(review): the first logged value is the object id, not the
  // "initial snapset" the message suggests
  logger().debug("{} done, initial snapset={}, new snapset={}",
                 __func__, obc->obs.oi.soid, obc->ssc->snapset);
  return std::move(
    maybe_snap_mapped
  ).then_interruptible([log_entries=std::move(log_entries)]() mutable {
    return interruptor::make_ready_future<std::vector<pg_log_entry_t>>(
      std::move(log_entries));
  });
}
1009
// TODO: make this static
// Builds the object_info_t for a clone at `coid` (version taken from the
// PG's next version, prior_version and user bits from the current head)
// and, on the primary only, registers a fresh ObjectContext for the clone
// in the obc registry. The returned ObjectContextRef is null on
// non-primaries.
std::pair<object_info_t, ObjectContextRef> OpsExecuter::prepare_clone(
  const hobject_t& coid)
{
  object_info_t static_snap_oi(coid);
  static_snap_oi.version = pg->next_version();
  static_snap_oi.prior_version = obc->obs.oi.version;
  static_snap_oi.copy_user_bits(obc->obs.oi);
  if (static_snap_oi.is_whiteout()) {
    // clone shouldn't be marked as whiteout
    static_snap_oi.clear_flag(object_info_t::FLAG_WHITEOUT);
  }

  ObjectContextRef clone_obc;
  if (pg->is_primary()) {
    // lookup_or_create
    // NOTE(review): coid is a const reference, so std::move here
    // degenerates to a copy
    auto [c_obc, existed] =
      pg->obc_registry.get_cached_obc(std::move(coid));
    assert(!existed);
    c_obc->obs.oi = static_snap_oi;
    c_obc->obs.exists = true;
    // the clone shares the head's snapset context
    c_obc->ssc = obc->ssc;
    logger().debug("clone_obc: {}", c_obc->obs.oi);
    clone_obc = std::move(c_obc);
  }
  return std::make_pair(std::move(static_snap_oi), std::move(clone_obc));
}
1037
1038void OpsExecuter::apply_stats()
1039{
1040 pg->get_peering_state().apply_op_stats(get_target(), delta_stats);
1041 pg->publish_stats_to_osd();
1042}
1043
// Constructs the executor for a client op. If the op may write and
// should_clone() decides the head needs a copy-on-write clone for the
// given snap context, the clone is staged up front via do_write_op so it
// precedes the actual mutation in the transaction.
OpsExecuter::OpsExecuter(Ref<PG> pg,
                         ObjectContextRef _obc,
                         const OpInfo& op_info,
                         abstracted_msg_t&& msg,
                         crimson::net::ConnectionRef conn,
                         const SnapContext& _snapc)
  : pg(std::move(pg)),
    obc(std::move(_obc)),
    op_info(op_info),
    msg(std::move(msg)),
    conn(conn),
    snapc(_snapc)
{
  if (op_info.may_write() && should_clone(*obc, snapc)) {
    do_write_op([this](auto& backend, auto& os, auto& txn) {
      cloning_ctx = execute_clone(std::as_const(snapc),
                                  std::as_const(obc->obs),
                                  std::as_const(obc->ssc->snapset),
                                  backend,
                                  txn);
    });
  }
}
1067
9f95a23c
TL
1068static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
1069 const std::string& type,
1070 bufferlist::const_iterator& iter)
1071{
1072 // storing non-const PGLSFilter for the sake of ::init()
1073 std::unique_ptr<PGLSFilter> filter;
1074 if (type.compare("plain") == 0) {
1075 filter = std::make_unique<PGLSPlainFilter>();
1076 } else {
1077 std::size_t dot = type.find(".");
1078 if (dot == type.npos || dot == 0 || dot == type.size() - 1) {
1079 throw crimson::osd::invalid_argument{};
1080 }
1081
1082 const std::string class_name = type.substr(0, dot);
1083 const std::string filter_name = type.substr(dot + 1);
1084 ClassHandler::ClassData *cls = nullptr;
1085 int r = ClassHandler::get_instance().open_class(class_name, &cls);
1086 if (r != 0) {
1087 logger().warn("can't open class {}: {}", class_name, cpp_strerror(r));
1088 if (r == -EPERM) {
1089 // propogate permission error
1090 throw crimson::osd::permission_denied{};
1091 } else {
1092 throw crimson::osd::invalid_argument{};
1093 }
1094 } else {
1095 ceph_assert(cls);
1096 }
1097
1098 ClassHandler::ClassFilter * const class_filter = cls->get_filter(filter_name);
1099 if (class_filter == nullptr) {
1100 logger().warn("can't find filter {} in class {}", filter_name, class_name);
1101 throw crimson::osd::invalid_argument{};
1102 }
1103
1104 filter.reset(class_filter->fn());
1105 if (!filter) {
1106 // Object classes are obliged to return us something, but let's
1107 // give an error rather than asserting out.
1108 logger().warn("buggy class {} failed to construct filter {}",
1109 class_name, filter_name);
1110 throw crimson::osd::invalid_argument{};
1111 }
1112 }
1113
1114 ceph_assert(filter);
1115 int r = filter->init(iter);
1116 if (r < 0) {
1117 logger().warn("error initializing filter {}: {}", type, cpp_strerror(r));
1118 throw crimson::osd::invalid_argument{};
1119 }
1120
1121 // successfully constructed and initialized, return it.
1122 return filter;
1123}
1124
// Applies a PGLS filter to a single object. If the filter declares
// interest in an xattr, that xattr is fetched from the backend first; a
// fetch error either rejects the object outright (reject_empty_xattr) or
// falls back to filtering against an empty value. Returns `sobj` when it
// passes the filter, or a default-constructed (min) hobject_t when it is
// filtered out.
static PG::interruptible_future<hobject_t> pgls_filter(
  const PGLSFilter& filter,
  const PGBackend& backend,
  const hobject_t& sobj)
{
  if (const auto xattr = filter.get_xattr(); !xattr.empty()) {
    logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
                   xattr, sobj);
    // NOTE(review): xattr is const, so std::move here degenerates to a copy
    return backend.getxattr(sobj, std::move(xattr)).safe_then_interruptible(
      [&filter, sobj] (ceph::bufferlist val) {
        logger().debug("pgls_filter: got xvalue for obj={}", sobj);

        const bool filtered = filter.filter(sobj, val);
        return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
      }, PGBackend::get_attr_errorator::all_same_way([&filter, sobj] {
        // xattr fetch failed (e.g. attribute absent)
        logger().debug("pgls_filter: got error for obj={}", sobj);

        if (filter.reject_empty_xattr()) {
          return seastar::make_ready_future<hobject_t>();
        }
        ceph::bufferlist val;
        const bool filtered = filter.filter(sobj, val);
        return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
      }));
  } else {
    // filter does not need an xattr; evaluate against an empty buffer
    ceph::bufferlist empty_lvalue_bl;
    const bool filtered = filter.filter(sobj, empty_lvalue_bl);
    return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
  }
}
1155
// Shared implementation for PGNLS / PGNLS_FILTER: lists up to `limit`
// objects starting at `lower_bound`, in two stages. Stage 1 drops
// objects outside the requested namespace (and the hit-set namespace)
// and runs the optional filter on each survivor; stage 2 packs the
// results into an encoded pg_nls_response_t. The response handle is
// pg_end when the listing is exhausted, otherwise the next cursor.
static PG::interruptible_future<ceph::bufferlist> do_pgnls_common(
  const hobject_t& pg_start,
  const hobject_t& pg_end,
  const PGBackend& backend,
  const hobject_t& lower_bound,
  const std::string& nspace,
  const uint64_t limit,
  const PGLSFilter* const filter)
{
  if (!(lower_bound.is_min() ||
        lower_bound.is_max() ||
        (lower_bound >= pg_start && lower_bound < pg_end))) {
    // this should only happen with a buggy client.
    throw std::invalid_argument("outside of PG bounds");
  }

  return backend.list_objects(lower_bound, limit).then_interruptible(
    [&backend, filter, nspace](auto&& ret)
    -> PG::interruptible_future<std::tuple<std::vector<hobject_t>, hobject_t>> {
    auto& [objects, next] = ret;
    // namespace predicate: hit-set objects are always hidden; the
    // all_nspaces sentinel matches everything else
    auto in_my_namespace = [&nspace](const hobject_t& obj) {
      using crimson::common::local_conf;
      if (obj.get_namespace() == local_conf()->osd_hit_set_namespace) {
        return false;
      } else if (nspace == librados::all_nspaces) {
        return true;
      } else {
        return obj.get_namespace() == nspace;
      }
    };
    auto to_pglsed = [&backend, filter] (const hobject_t& obj)
      -> PG::interruptible_future<hobject_t> {
      // this transformation looks costly. However, I don't have any
      // reason to think PGLS* operations are critical for, let's say,
      // general performance.
      //
      // from tchaikov: "another way is to use seastar::map_reduce(),
      // to 1) save the effort to filter the already filtered objects
      // 2) avoid the space to keep the tuple<bool, object> even if
      // the object is filtered out".
      if (filter) {
        return pgls_filter(*filter, backend, obj);
      } else {
        return seastar::make_ready_future<hobject_t>(obj);
      }
    };

    auto range = objects | boost::adaptors::filtered(in_my_namespace)
      | boost::adaptors::transformed(to_pglsed);
    logger().debug("do_pgnls_common: finishing the 1st stage of pgls");
    return seastar::when_all_succeed(std::begin(range),
                                     std::end(range)).then(
      [next=std::move(next)] (auto items) mutable {
        // the sole purpose of this chaining is to pass `next` to 2nd
        // stage altogether with items
        logger().debug("do_pgnls_common: 1st done");
        return seastar::make_ready_future<
          std::tuple<std::vector<hobject_t>, hobject_t>>(
            std::move(items), std::move(next));
      });
  }).then_interruptible(
    [pg_end] (auto&& ret) {
      auto& [items, next] = ret;
      // filtered-out objects came back as min hobject_t from stage 1
      auto is_matched = [] (const auto& obj) {
        return !obj.is_min();
      };
      auto to_entry = [] (const auto& obj) {
        return librados::ListObjectImpl{
          obj.get_namespace(), obj.oid.name, obj.get_key()
        };
      };

      pg_nls_response_t response;
      boost::push_back(response.entries, items | boost::adaptors::filtered(is_matched)
        | boost::adaptors::transformed(to_entry));
      response.handle = next.is_max() ? pg_end : next;
      ceph::bufferlist out;
      encode(response, out);
      logger().debug("do_pgnls_common: response.entries.size()= {}",
                     response.entries.size());
      return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
    });
}
1239
20effc67 1240static PG::interruptible_future<> do_pgnls(
9f95a23c
TL
1241 const PG& pg,
1242 const std::string& nspace,
1243 OSDOp& osd_op)
1244{
1245 hobject_t lower_bound;
1246 try {
1247 ceph::decode(lower_bound, osd_op.indata);
1248 } catch (const buffer::error&) {
1249 throw std::invalid_argument("unable to decode PGNLS handle");
1250 }
1251 const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
1252 const auto pg_end = \
1e59de90 1253 pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
9f95a23c
TL
1254 return do_pgnls_common(pg_start,
1255 pg_end,
1256 pg.get_backend(),
1257 lower_bound,
1258 nspace,
1259 osd_op.op.pgls.count,
1260 nullptr /* no filter */)
20effc67 1261 .then_interruptible([&osd_op](bufferlist bl) {
9f95a23c
TL
1262 osd_op.outdata = std::move(bl);
1263 return seastar::now();
1264 });
1265}
1266
// Handles CEPH_OSD_OP_PGNLS_FILTER: decodes the filter description
// (cname/mname/type) and the listing cursor from the op payload,
// instantiates the filter and runs the common PGNLS listing with it,
// storing the encoded response in osd_op.outdata.
static PG::interruptible_future<> do_pgnls_filtered(
  const PG& pg,
  const std::string& nspace,
  OSDOp& osd_op)
{
  std::string cname, mname, type;
  auto bp = osd_op.indata.cbegin();
  try {
    ceph::decode(cname, bp);
    ceph::decode(mname, bp);
    ceph::decode(type, bp);
  } catch (const buffer::error&) {
    throw crimson::osd::invalid_argument{};
  }

  auto filter = get_pgls_filter(type, bp);

  hobject_t lower_bound;
  try {
    lower_bound.decode(bp);
  } catch (const buffer::error&) {
    throw std::invalid_argument("unable to decode PGNLS_FILTER description");
  }

  logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
                 __func__, cname, mname, type, lower_bound,
                 static_cast<const void*>(filter.get()));
  // do_with keeps the filter alive across the asynchronous listing
  return seastar::do_with(std::move(filter),
    [&, lower_bound=std::move(lower_bound)](auto&& filter) {
    const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
    const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
    return do_pgnls_common(pg_start,
                           pg_end,
                           pg.get_backend(),
                           lower_bound,
                           nspace,
                           osd_op.op.pgls.count,
                           filter.get())
    .then_interruptible([&osd_op](bufferlist bl) {
      osd_op.outdata = std::move(bl);
      return seastar::now();
    });
  });
}
1311
20effc67 1312static PG::interruptible_future<ceph::bufferlist> do_pgls_common(
9f95a23c
TL
1313 const hobject_t& pg_start,
1314 const hobject_t& pg_end,
1315 const PGBackend& backend,
1316 const hobject_t& lower_bound,
1317 const std::string& nspace,
1318 const uint64_t limit,
1319 const PGLSFilter* const filter)
1320{
1321 if (!(lower_bound.is_min() ||
1322 lower_bound.is_max() ||
1323 (lower_bound >= pg_start && lower_bound < pg_end))) {
1324 // this should only happen with a buggy client.
1325 throw std::invalid_argument("outside of PG bounds");
1326 }
1327
1328 using entries_t = decltype(pg_ls_response_t::entries);
20effc67 1329 return backend.list_objects(lower_bound, limit).then_interruptible(
f67539c2
TL
1330 [&backend, filter, nspace](auto&& ret) {
1331 auto& [objects, next] = ret;
20effc67
TL
1332 return PG::interruptor::when_all(
1333 PG::interruptor::map_reduce(std::move(objects),
1334 [&backend, filter, nspace](const hobject_t& obj)
1335 -> PG::interruptible_future<hobject_t>{
9f95a23c
TL
1336 if (obj.get_namespace() == nspace) {
1337 if (filter) {
1338 return pgls_filter(*filter, backend, obj);
1339 } else {
1340 return seastar::make_ready_future<hobject_t>(obj);
1341 }
1342 } else {
20effc67 1343 return seastar::make_ready_future<hobject_t>();
9f95a23c
TL
1344 }
1345 },
1346 entries_t{},
f67539c2 1347 [](entries_t entries, hobject_t obj) {
9f95a23c
TL
1348 if (!obj.is_min()) {
1349 entries.emplace_back(obj.oid, obj.get_key());
1350 }
1351 return entries;
1352 }),
1353 seastar::make_ready_future<hobject_t>(next));
20effc67 1354 }).then_interruptible([pg_end](auto&& ret) {
f67539c2
TL
1355 auto entries = std::move(std::get<0>(ret).get0());
1356 auto next = std::move(std::get<1>(ret).get0());
9f95a23c
TL
1357 pg_ls_response_t response;
1358 response.handle = next.is_max() ? pg_end : next;
1359 response.entries = std::move(entries);
1360 ceph::bufferlist out;
1361 encode(response, out);
1362 logger().debug("{}: response.entries.size()=",
1363 __func__, response.entries.size());
1364 return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
1365 });
1366}
1367
20effc67 1368static PG::interruptible_future<> do_pgls(
9f95a23c
TL
1369 const PG& pg,
1370 const std::string& nspace,
1371 OSDOp& osd_op)
1372{
1373 hobject_t lower_bound;
1374 auto bp = osd_op.indata.cbegin();
1375 try {
1376 lower_bound.decode(bp);
1377 } catch (const buffer::error&) {
1378 throw std::invalid_argument{"unable to decode PGLS handle"};
1379 }
1380 const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
1381 const auto pg_end =
1e59de90 1382 pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
9f95a23c
TL
1383 return do_pgls_common(pg_start,
1384 pg_end,
1385 pg.get_backend(),
1386 lower_bound,
1387 nspace,
1388 osd_op.op.pgls.count,
1389 nullptr /* no filter */)
20effc67 1390 .then_interruptible([&osd_op](bufferlist bl) {
9f95a23c
TL
1391 osd_op.outdata = std::move(bl);
1392 return seastar::now();
1393 });
1394}
1395
// Handles CEPH_OSD_OP_PGLS_FILTER: decodes the filter description
// (cname/mname/type) and the listing cursor from the op payload,
// instantiates the filter and runs the common PGLS listing with it,
// storing the encoded response in osd_op.outdata.
static PG::interruptible_future<> do_pgls_filtered(
  const PG& pg,
  const std::string& nspace,
  OSDOp& osd_op)
{
  std::string cname, mname, type;
  auto bp = osd_op.indata.cbegin();
  try {
    ceph::decode(cname, bp);
    ceph::decode(mname, bp);
    ceph::decode(type, bp);
  } catch (const buffer::error&) {
    throw crimson::osd::invalid_argument{};
  }

  auto filter = get_pgls_filter(type, bp);

  hobject_t lower_bound;
  try {
    lower_bound.decode(bp);
  } catch (const buffer::error&) {
    throw std::invalid_argument("unable to decode PGLS_FILTER description");
  }

  logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
                 __func__, cname, mname, type, lower_bound,
                 static_cast<const void*>(filter.get()));
  // do_with keeps the filter alive across the asynchronous listing
  return seastar::do_with(std::move(filter),
    [&, lower_bound=std::move(lower_bound)](auto&& filter) {
    const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
    const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pgpool().info.get_pg_num());
    return do_pgls_common(pg_start,
                          pg_end,
                          pg.get_backend(),
                          lower_bound,
                          nspace,
                          osd_op.op.pgls.count,
                          filter.get())
    .then_interruptible([&osd_op](bufferlist bl) {
      osd_op.outdata = std::move(bl);
      return seastar::now();
    });
  });
}
1440
20effc67 1441PgOpsExecuter::interruptible_future<>
f67539c2 1442PgOpsExecuter::execute_op(OSDOp& osd_op)
9f95a23c
TL
1443{
1444 logger().warn("handling op {}", ceph_osd_op_name(osd_op.op.op));
1445 switch (const ceph_osd_op& op = osd_op.op; op.op) {
1446 case CEPH_OSD_OP_PGLS:
f67539c2 1447 return do_pgls(pg, nspace, osd_op);
9f95a23c 1448 case CEPH_OSD_OP_PGLS_FILTER:
f67539c2 1449 return do_pgls_filtered(pg, nspace, osd_op);
9f95a23c 1450 case CEPH_OSD_OP_PGNLS:
f67539c2 1451 return do_pgnls(pg, nspace, osd_op);
9f95a23c 1452 case CEPH_OSD_OP_PGNLS_FILTER:
f67539c2 1453 return do_pgnls_filtered(pg, nspace, osd_op);
9f95a23c
TL
1454 default:
1455 logger().warn("unknown op {}", ceph_osd_op_name(op.op));
1456 throw std::runtime_error(
1457 fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
1458 }
1459}
1460
1461} // namespace crimson::osd