// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "ops_executer.h"

#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm_ext/push_back.hpp>
#include <boost/range/algorithm/max_element.hpp>
#include <boost/range/numeric.hpp>

#include <fmt/format.h>
#include <fmt/ostream.h>

#include <seastar/core/thread.hh>

#include "crimson/osd/exceptions.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/watch.h"
#include "osd/ClassHandler.h"

namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}

namespace crimson::osd {

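// CEPH_OSD_OP_CALL: decode the class/method names and input payload from the
// op, load the object class through ClassHandler, execute the method, and
// enforce its RD/WR flags against the read/write counters bumped below.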
OpsExecuter::call_ierrorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
{
  std::string cname, mname;
  ceph::bufferlist indata;
  try {
    auto bp = std::begin(osd_op.indata);
    bp.copy(osd_op.op.cls.class_len, cname);
    bp.copy(osd_op.op.cls.method_len, mname);
    bp.copy(osd_op.op.cls.indata_len, indata);
  } catch (buffer::error&) {
    logger().warn("call unable to decode class + method + indata");
    return crimson::ct_error::invarg::make();
  }

  // NOTE: opening a class can actually result in dlopen(), and thus
  // blocking the entire reactor. Thanks to ClassHandler's cache
  // this is supposed to be extremely infrequent.
  ClassHandler::ClassData* cls;
  int r = ClassHandler::get_instance().open_class(cname, &cls);
  if (r) {
    logger().warn("class {} open got {}", cname, cpp_strerror(r));
    if (r == -ENOENT) {
      return crimson::ct_error::operation_not_supported::make();
    } else if (r == -EPERM) {
      // propagate permission errors
      return crimson::ct_error::permission_denied::make();
    }
    return crimson::ct_error::input_output_error::make();
  }

  ClassHandler::ClassMethod* method = cls->get_method(mname);
  if (!method) {
    logger().warn("call method {}.{} does not exist", cname, mname);
    return crimson::ct_error::operation_not_supported::make();
  }

  const auto flags = method->get_flags();
  if (!obc->obs.exists && (flags & CLS_METHOD_WR) == 0) {
    return crimson::ct_error::enoent::make();
  }

#if 0
  if (flags & CLS_METHOD_WR) {
    ctx->user_modify = true;
  }
#endif

  logger().debug("calling method {}.{}, num_read={}, num_write={}",
                 cname, mname, num_read, num_write);
  const auto prev_rd = num_read;
  const auto prev_wr = num_write;
  return interruptor::async(
    [this, method, indata=std::move(indata)]() mutable {
      ceph::bufferlist outdata;
      auto cls_context = reinterpret_cast<cls_method_context_t>(this);
      const auto ret = method->exec(cls_context, indata, outdata);
      return std::make_pair(ret, std::move(outdata));
    }
  ).then_interruptible(
    [this, prev_rd, prev_wr, &osd_op, flags]
    (auto outcome) -> call_errorator::future<> {
      auto& [ret, outdata] = outcome;
      osd_op.rval = ret;

      logger().debug("do_op_call: method returned ret={}, outdata.length()={}"
                     " while num_read={}, num_write={}",
                     ret, outdata.length(), num_read, num_write);
      if (num_read > prev_rd && !(flags & CLS_METHOD_RD)) {
        logger().error("method tried to read object but is not marked RD");
        osd_op.rval = -EIO;
        return crimson::ct_error::input_output_error::make();
      }
      if (num_write > prev_wr && !(flags & CLS_METHOD_WR)) {
        logger().error("method tried to update object but is not marked WR");
        osd_op.rval = -EIO;
        return crimson::ct_error::input_output_error::make();
      }
      // ceph-osd has this implemented in `PrimaryLogPG::execute_ctx`,
      // grep for `ignore_out_data`.
      using crimson::common::local_conf;
      if (op_info.allows_returnvec() &&
          op_info.may_write() &&
          ret >= 0 &&
          outdata.length() > local_conf()->osd_max_write_op_reply_len) {
        // the justification of this limit is to not inflate the pg log.
        // that's the reason why we don't worry about pure reads.
        logger().error("outdata overflow due to .length()={}, limit={}",
                       outdata.length(),
                       local_conf()->osd_max_write_op_reply_len);
        osd_op.rval = -EOVERFLOW;
        return crimson::ct_error::value_too_large::make();
      }
      // for write calls we never return data except on errors or RETURNVEC.
      // please refer to cls/cls_hello.cc for details.
      if (!op_info.may_write() || op_info.allows_returnvec() || ret < 0) {
        osd_op.op.extent.length = outdata.length();
        osd_op.outdata.claim_append(outdata);
      }
      if (ret < 0) {
        return crimson::stateful_ec{
          std::error_code(-ret, std::generic_category()) };
      } else {
        return seastar::now();
      }
    }
  );
}

static watch_info_t create_watch_info(const OSDOp& osd_op,
                                      const OpsExecuter::ExecutableMessage& msg)
{
  using crimson::common::local_conf;
  const uint32_t timeout =
    osd_op.op.watch.timeout == 0 ? local_conf()->osd_client_watch_timeout
                                 : osd_op.op.watch.timeout;
  return {
    osd_op.op.watch.cookie,
    timeout,
    msg.get_connection()->get_peer_addr()
  };
}

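// Watch registration is split in two via with_effect_on_obc: the first lambda
// runs immediately and records the watch in object_info_t::watchers (touching
// the transaction), while the second is registered as an op effect on the OBC
// (note the "op_effect" logging) that creates the in-memory Watch and connects
// it to the client.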
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  logger().debug("{}", __func__);
  struct connect_ctx_t {
    ObjectContext::watch_key_t key;
    crimson::net::ConnectionRef conn;
    watch_info_t info;

    connect_ctx_t(const OSDOp& osd_op, const ExecutableMessage& msg)
      : key(osd_op.op.watch.cookie, msg.get_reqid().name),
        conn(msg.get_connection()),
        info(create_watch_info(osd_op, msg)) {
    }
  };
  return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() },
    [&] (auto& ctx) {
      const auto& entity = ctx.key.second;
      auto [it, emplaced] =
        os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
      if (emplaced) {
        logger().info("registered new watch {} by {}", it->second, entity);
        txn.nop();
      } else {
        logger().info("found existing watch {} by {}", it->second, entity);
      }
      return seastar::now();
    },
    [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
      assert(pg);
      auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
      if (emplaced) {
        const auto& [cookie, entity] = ctx.key;
        it->second = crimson::osd::Watch::create(
          obc, ctx.info, entity, std::move(pg));
        logger().info("op_effect: added new watcher: {}", ctx.key);
      } else {
        logger().info("op_effect: found existing watcher: {}", ctx.key);
      }
      return it->second->connect(std::move(ctx.conn), true /* will_ping */);
    });
}

OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  const entity_name_t& entity = get_message().get_reqid().name;
  const auto& cookie = osd_op.op.watch.cookie;
  if (!os.oi.watchers.count(std::make_pair(cookie, entity))) {
    return crimson::ct_error::not_connected::make();
  } else {
    logger().info("found existing watch by {}", entity);
    return do_op_watch_subop_watch(osd_op, os, txn);
  }
}

OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_unwatch(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  logger().info("{}", __func__);

  struct disconnect_ctx_t {
    ObjectContext::watch_key_t key;
    disconnect_ctx_t(const OSDOp& osd_op, const ExecutableMessage& msg)
      : key(osd_op.op.watch.cookie, msg.get_reqid().name) {
    }
  };
  return with_effect_on_obc(disconnect_ctx_t{ osd_op, get_message() },
    [&] (auto& ctx) {
      const auto& entity = ctx.key.second;
      if (auto nh = os.oi.watchers.extract(ctx.key); !nh.empty()) {
        logger().info("removed watch {} by {}", nh.mapped(), entity);
        txn.nop();
      } else {
        logger().info("can't remove: no watch by {}", entity);
      }
      return seastar::now();
    },
    [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
      if (auto nh = obc->watchers.extract(ctx.key); !nh.empty()) {
        return seastar::do_with(std::move(nh.mapped()),
          [ctx](auto&& watcher) {
            logger().info("op_effect: disconnect watcher {}", ctx.key);
            return watcher->remove();
          });
      } else {
        logger().info("op_effect: disconnect failed to find watcher {}", ctx.key);
        return seastar::now();
      }
    });
}

OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_ping(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  const entity_name_t& entity = get_message().get_reqid().name;
  const auto& cookie = osd_op.op.watch.cookie;
  const auto key = std::make_pair(cookie, entity);

  // Note: WATCH with PING doesn't cause may_write() to return true,
  // so if there is nothing else in the transaction, this is going
  // to run do_osd_op_effects, but not write out a log entry.
  if (!os.oi.watchers.count(key)) {
    return crimson::ct_error::not_connected::make();
  }
  auto it = obc->watchers.find(key);
  if (it == std::end(obc->watchers) || !it->second->is_connected()) {
    return crimson::ct_error::timed_out::make();
  }
  logger().info("found existing watch by {}", entity);
  it->second->got_ping(ceph_clock_now());
  return seastar::now();
}

OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  logger().debug("{}", __func__);
  if (!os.exists) {
    return crimson::ct_error::enoent::make();
  }
  switch (osd_op.op.watch.op) {
  case CEPH_OSD_WATCH_OP_WATCH:
    return do_op_watch_subop_watch(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_RECONNECT:
    return do_op_watch_subop_reconnect(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_PING:
    return do_op_watch_subop_ping(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_UNWATCH:
    return do_op_watch_subop_unwatch(osd_op, os, txn);
  case CEPH_OSD_WATCH_OP_LEGACY_WATCH:
    logger().warn("ignoring CEPH_OSD_WATCH_OP_LEGACY_WATCH");
    return crimson::ct_error::invarg::make();
  }
  logger().warn("unrecognized WATCH subop: {}", osd_op.op.watch.op);
  return crimson::ct_error::invarg::make();
}

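// Notify ids embed the current map epoch in the upper 32 bits and a static,
// process-local counter in the lower 32 bits (hence the FIXME below).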
static uint64_t get_next_notify_id(epoch_t e)
{
  // FIXME
  static std::uint64_t next_notify_id = 0;
  return (((uint64_t)e) << 32) | ((uint64_t)(next_notify_id++));
}

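// CEPH_OSD_OP_NOTIFY: decode the notify request, return the freshly assigned
// notify id to the client right away, and, as an op effect, fan the
// notification out to every watcher of the object that is still alive.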
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify(
  OSDOp& osd_op,
  const ObjectState& os)
{
  logger().debug("{}, msg epoch: {}", __func__, get_message().get_map_epoch());

  if (!os.exists) {
    return crimson::ct_error::enoent::make();
  }
  struct notify_ctx_t {
    crimson::net::ConnectionRef conn;
    notify_info_t ninfo;
    const uint64_t client_gid;
    const epoch_t epoch;

    notify_ctx_t(const ExecutableMessage& msg)
      : conn(msg.get_connection()),
        client_gid(msg.get_reqid().name.num()),
        epoch(msg.get_map_epoch()) {
    }
  };
  return with_effect_on_obc(notify_ctx_t{ get_message() },
    [&] (auto& ctx) {
      try {
        auto bp = osd_op.indata.cbegin();
        uint32_t ver; // obsolete
        ceph::decode(ver, bp);
        ceph::decode(ctx.ninfo.timeout, bp);
        ceph::decode(ctx.ninfo.bl, bp);
      } catch (const buffer::error&) {
        ctx.ninfo.timeout = 0;
      }
      if (!ctx.ninfo.timeout) {
        using crimson::common::local_conf;
        ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
      }
      ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
      ctx.ninfo.cookie = osd_op.op.notify.cookie;
      // return our unique notify id to the client
      ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
      return seastar::now();
    },
    [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
      auto alive_watchers = obc->watchers | boost::adaptors::map_values
                                          | boost::adaptors::filtered(
        [] (const auto& w) {
          // FIXME: filter as for the `is_ping` in `Watch::start_notify`
          return w->is_alive();
        });
      return crimson::osd::Notify::create_n_propagate(
        std::begin(alive_watchers),
        std::end(alive_watchers),
        std::move(ctx.conn),
        ctx.ninfo,
        ctx.client_gid,
        obc->obs.oi.user_version);
    });
}

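// CEPH_OSD_OP_LIST_WATCHERS: report the watchers recorded in object_info_t
// back to the client as an obj_list_watch_response_t.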
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_list_watchers(
  OSDOp& osd_op,
  const ObjectState& os)
{
  logger().debug("{}", __func__);

  obj_list_watch_response_t response;
  for (const auto& [key, info] : os.oi.watchers) {
    logger().debug("{}: key cookie={}, entity={}",
                   __func__, key.first, key.second);
    assert(key.first == info.cookie);
    assert(key.second.is_client());
    response.entries.emplace_back(watch_item_t{
      key.second, info.cookie, info.timeout_seconds, info.addr});
  }
  // encode once, after all watchers have been collected
  response.encode(osd_op.outdata, get_message().get_features());
  return watch_ierrorator::now();
}

OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify_ack(
  OSDOp& osd_op,
  const ObjectState& os)
{
  logger().debug("{}", __func__);

  struct notifyack_ctx_t {
    const entity_name_t entity;
    uint64_t watch_cookie;
    uint64_t notify_id;
    ceph::bufferlist reply_bl;

    notifyack_ctx_t(const ExecutableMessage& msg)
      : entity(msg.get_reqid().name) {
    }
  };
  return with_effect_on_obc(notifyack_ctx_t{ get_message() },
    [&] (auto& ctx) -> watch_errorator::future<> {
      try {
        auto bp = osd_op.indata.cbegin();
        ceph::decode(ctx.notify_id, bp);
        ceph::decode(ctx.watch_cookie, bp);
        if (!bp.end()) {
          ceph::decode(ctx.reply_bl, bp);
        }
      } catch (const buffer::error&) {
        // here we behave differently than ceph-osd. For historical reasons,
        // it falls back to using `osd_op.op.watch.cookie` as `ctx.notify_id`.
        // crimson just returns EINVAL if the data cannot be decoded.
        return crimson::ct_error::invarg::make();
      }
      return watch_errorator::now();
    },
    [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
      logger().info("notify_ack watch_cookie={}, notify_id={}",
                    ctx.watch_cookie, ctx.notify_id);
      return seastar::do_for_each(obc->watchers,
        [ctx=std::move(ctx)] (auto& kv) {
          const auto& [key, watchp] = kv;
          static_assert(
            std::is_same_v<std::decay_t<decltype(watchp)>,
                           seastar::shared_ptr<crimson::osd::Watch>>);
          auto& [cookie, entity] = key;
          if (ctx.entity != entity) {
            logger().debug("skipping watch {}; entity name {} != {}",
                           key, entity, ctx.entity);
            return seastar::now();
          }
          if (ctx.watch_cookie != cookie) {
            logger().debug("skipping watch {}; cookie {} != {}",
                           key, ctx.watch_cookie, cookie);
            return seastar::now();
          }
          logger().info("acking notify on watch {}", key);
          return watchp->notify_ack(ctx.notify_id, ctx.reply_bl);
        });
    });
}

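// Helpers used by execute_op() below: do_const_op runs a read-only functor
// against the object state, while do_write_op additionally bumps num_write,
// lazily creates osd_op_params and records whether the op counts as a user
// modification.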
// Defined here because there is a circular dependency between OpsExecuter and PG
template <class Func>
auto OpsExecuter::do_const_op(Func&& f) {
  // TODO: pass backend as read-only
  return std::forward<Func>(f)(pg->get_backend(), std::as_const(obc->obs));
}

// Defined here because there is a circular dependency between OpsExecuter and PG
template <class Func>
auto OpsExecuter::do_write_op(Func&& f, bool um) {
  ++num_write;
  if (!osd_op_params) {
    osd_op_params.emplace();
  }
  user_modify = um;
  return std::forward<Func>(f)(pg->get_backend(), obc->obs, txn);
}

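// Main dispatch for per-object operations: every case forwards to PGBackend
// through do_read_op/do_write_op/do_const_op so that read/write accounting
// and transaction handling stay in one place.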
OpsExecuter::interruptible_errorated_future<OpsExecuter::osd_op_errorator>
OpsExecuter::execute_op(OSDOp& osd_op)
{
  // TODO: dispatch via call table?
  // TODO: we might want to find a way to unify both input and output
  // of each op.
  logger().debug(
    "handling op {} on object {}",
    ceph_osd_op_name(osd_op.op.op),
    get_target());
  switch (const ceph_osd_op& op = osd_op.op; op.op) {
  case CEPH_OSD_OP_SYNC_READ:
    [[fallthrough]];
  case CEPH_OSD_OP_READ:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.read(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_SPARSE_READ:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.sparse_read(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_CHECKSUM:
    return do_read_op([&osd_op] (auto& backend, const auto& os) {
      return backend.checksum(os, osd_op);
    });
  case CEPH_OSD_OP_CMPEXT:
    return do_read_op([&osd_op] (auto& backend, const auto& os) {
      return backend.cmp_ext(os, osd_op);
    });
  case CEPH_OSD_OP_GETXATTR:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.getxattr(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_GETXATTRS:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.get_xattrs(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_CMPXATTR:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.cmp_xattr(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_RMXATTR:
    return do_write_op(
      [&osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.rm_xattr(os, osd_op, txn);
    }, true);
  case CEPH_OSD_OP_CREATE:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.create(os, osd_op, txn, delta_stats);
    }, true);
  case CEPH_OSD_OP_WRITE:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.write(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_WRITESAME:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.write_same(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_WRITEFULL:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.writefull(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_APPEND:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.append(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_TRUNCATE:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      // FIXME: rework needed. Move this out to do_write_op(), introduce
      // do_write_op_no_user_modify()...
      return backend.truncate(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_ZERO:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.zero(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_SETALLOCHINT:
    return osd_op_errorator::now();
  case CEPH_OSD_OP_SETXATTR:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.setxattr(os, osd_op, txn, delta_stats);
    }, true);
  case CEPH_OSD_OP_DELETE:
    return do_write_op([this] (auto& backend, auto& os, auto& txn) {
      return backend.remove(os, txn, delta_stats);
    }, true);
  case CEPH_OSD_OP_CALL:
    return this->do_op_call(osd_op);
  case CEPH_OSD_OP_STAT:
    // note: stat does not require RD
    return do_const_op([this, &osd_op] (/* const */auto& backend, const auto& os) {
      return backend.stat(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_TMAPUP:
    // TODO: there was an effort to kill TMAP in ceph-osd. According to
    // @dzafman this isn't possible yet. Maybe it could be accomplished
    // before crimson's readiness, so we luckily wouldn't need to carry it.
    return dont_do_legacy_op();

  // OMAP
  case CEPH_OSD_OP_OMAPGETKEYS:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.omap_get_keys(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPGETVALS:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.omap_get_vals(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPGETHEADER:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.omap_get_header(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
      return backend.omap_get_vals_by_keys(os, osd_op, delta_stats);
    });
  case CEPH_OSD_OP_OMAPSETVALS:
#if 0
    if (!pg.get_pool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }
#endif
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.omap_set_vals(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);
  case CEPH_OSD_OP_OMAPSETHEADER:
#if 0
    if (!pg.get_pool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }
#endif
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.omap_set_header(os, osd_op, txn, *osd_op_params,
                                     delta_stats);
    }, true);
  case CEPH_OSD_OP_OMAPRMKEYRANGE:
#if 0
    if (!pg.get_pool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }
#endif
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.omap_remove_range(os, osd_op, txn, delta_stats);
    }, true);
  case CEPH_OSD_OP_OMAPRMKEYS:
    /** TODO: Implement supports_omap()
    if (!pg.get_pool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }*/
    return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.omap_remove_key(os, osd_op, txn);
    }, true);
  case CEPH_OSD_OP_OMAPCLEAR:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.omap_clear(os, osd_op, txn, *osd_op_params, delta_stats);
    }, true);

  // watch/notify
  case CEPH_OSD_OP_WATCH:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return do_op_watch(osd_op, os, txn);
    }, false);
  case CEPH_OSD_OP_LIST_WATCHERS:
    return do_read_op([this, &osd_op] (auto&, const auto& os) {
      return do_op_list_watchers(osd_op, os);
    });
  case CEPH_OSD_OP_NOTIFY:
    return do_read_op([this, &osd_op] (auto&, const auto& os) {
      return do_op_notify(osd_op, os);
    });
  case CEPH_OSD_OP_NOTIFY_ACK:
    return do_read_op([this, &osd_op] (auto&, const auto& os) {
      return do_op_notify_ack(osd_op, os);
    });

  default:
    logger().warn("unknown op {}", ceph_osd_op_name(op.op));
    throw std::runtime_error(
      fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
  }
}

// Defined here because there is a circular dependency between OpsExecuter and PG
uint32_t OpsExecuter::get_pool_stripe_width() const {
  return pg->get_pool().info.get_stripe_width();
}

// Defined here because there is a circular dependency between OpsExecuter and PG
version_t OpsExecuter::get_last_user_version() const
{
  return pg->get_last_user_version();
}

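// Build a PGLS filter from its wire description: "plain" selects the built-in
// PGLSPlainFilter, anything else is interpreted as "<class>.<filter>" and
// instantiated through the object-class machinery; the remaining bytes of the
// iterator are handed to the filter's init().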
static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
  const std::string& type,
  bufferlist::const_iterator& iter)
{
  // storing non-const PGLSFilter for the sake of ::init()
  std::unique_ptr<PGLSFilter> filter;
  if (type.compare("plain") == 0) {
    filter = std::make_unique<PGLSPlainFilter>();
  } else {
    std::size_t dot = type.find(".");
    if (dot == type.npos || dot == 0 || dot == type.size() - 1) {
      throw crimson::osd::invalid_argument{};
    }

    const std::string class_name = type.substr(0, dot);
    const std::string filter_name = type.substr(dot + 1);
    ClassHandler::ClassData *cls = nullptr;
    int r = ClassHandler::get_instance().open_class(class_name, &cls);
    if (r != 0) {
      logger().warn("can't open class {}: {}", class_name, cpp_strerror(r));
      if (r == -EPERM) {
        // propagate permission error
        throw crimson::osd::permission_denied{};
      } else {
        throw crimson::osd::invalid_argument{};
      }
    } else {
      ceph_assert(cls);
    }

    ClassHandler::ClassFilter * const class_filter = cls->get_filter(filter_name);
    if (class_filter == nullptr) {
      logger().warn("can't find filter {} in class {}", filter_name, class_name);
      throw crimson::osd::invalid_argument{};
    }

    filter.reset(class_filter->fn());
    if (!filter) {
      // Object classes are obliged to return us something, but let's
      // give an error rather than asserting out.
      logger().warn("buggy class {} failed to construct filter {}",
                    class_name, filter_name);
      throw crimson::osd::invalid_argument{};
    }
  }

  ceph_assert(filter);
  int r = filter->init(iter);
  if (r < 0) {
    logger().warn("error initializing filter {}: {}", type, cpp_strerror(r));
    throw crimson::osd::invalid_argument{};
  }

  // successfully constructed and initialized, return it.
  return filter;
}

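// Apply a PGLS filter to a single object. If the filter keys off an xattr,
// fetch it first and either reject the object or fall back to an empty value
// when the attribute is missing; returning a min hobject_t means the object
// was filtered out.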
static PG::interruptible_future<hobject_t> pgls_filter(
  const PGLSFilter& filter,
  const PGBackend& backend,
  const hobject_t& sobj)
{
  if (const auto xattr = filter.get_xattr(); !xattr.empty()) {
    logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
                   xattr, sobj);
    return backend.getxattr(sobj, xattr).safe_then_interruptible(
      [&filter, sobj] (ceph::bufferlist val) {
        logger().debug("pgls_filter: got xvalue for obj={}", sobj);

        const bool filtered = filter.filter(sobj, val);
        return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
      }, PGBackend::get_attr_errorator::all_same_way([&filter, sobj] {
        logger().debug("pgls_filter: got error for obj={}", sobj);

        if (filter.reject_empty_xattr()) {
          return seastar::make_ready_future<hobject_t>();
        }
        ceph::bufferlist val;
        const bool filtered = filter.filter(sobj, val);
        return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
      }));
  } else {
    ceph::bufferlist empty_lvalue_bl;
    const bool filtered = filter.filter(sobj, empty_lvalue_bl);
    return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
  }
}

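// Shared implementation of PGNLS/PGNLS_FILTER. Stage one lists objects from
// the backend, drops foreign namespaces and runs the optional filter; stage
// two turns the surviving objects into a pg_nls_response_t and encodes it,
// using pg_end as the handle once the listing is exhausted.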
static PG::interruptible_future<ceph::bufferlist> do_pgnls_common(
  const hobject_t& pg_start,
  const hobject_t& pg_end,
  const PGBackend& backend,
  const hobject_t& lower_bound,
  const std::string& nspace,
  const uint64_t limit,
  const PGLSFilter* const filter)
{
  if (!(lower_bound.is_min() ||
        lower_bound.is_max() ||
        (lower_bound >= pg_start && lower_bound < pg_end))) {
    // this should only happen with a buggy client.
    throw std::invalid_argument("outside of PG bounds");
  }

  return backend.list_objects(lower_bound, limit).then_interruptible(
    [&backend, filter, nspace](auto&& ret)
      -> PG::interruptible_future<std::tuple<std::vector<hobject_t>, hobject_t>> {
      auto& [objects, next] = ret;
      auto in_my_namespace = [&nspace](const hobject_t& obj) {
        using crimson::common::local_conf;
        if (obj.get_namespace() == local_conf()->osd_hit_set_namespace) {
          return false;
        } else if (nspace == librados::all_nspaces) {
          return true;
        } else {
          return obj.get_namespace() == nspace;
        }
      };
      auto to_pglsed = [&backend, filter] (const hobject_t& obj)
        -> PG::interruptible_future<hobject_t> {
        // this transformation looks costly. However, I don't have any
        // reason to think PGLS* operations are critical for, let's say,
        // general performance.
        //
        // from tchaikov: "another way is to use seastar::map_reduce(),
        // to 1) save the effort to filter the already filtered objects
        // 2) avoid the space to keep the tuple<bool, object> even if
        // the object is filtered out".
        if (filter) {
          return pgls_filter(*filter, backend, obj);
        } else {
          return seastar::make_ready_future<hobject_t>(obj);
        }
      };

      auto range = objects | boost::adaptors::filtered(in_my_namespace)
                           | boost::adaptors::transformed(to_pglsed);
      logger().debug("do_pgnls_common: finishing the 1st stage of pgls");
      return seastar::when_all_succeed(std::begin(range),
                                       std::end(range)).then(
        [next=std::move(next)] (auto items) mutable {
          // the sole purpose of this chaining is to pass `next` to 2nd
          // stage altogether with items
          logger().debug("do_pgnls_common: 1st done");
          return seastar::make_ready_future<
            std::tuple<std::vector<hobject_t>, hobject_t>>(
              std::move(items), std::move(next));
        });
    }).then_interruptible(
    [pg_end] (auto&& ret) {
      auto& [items, next] = ret;
      auto is_matched = [] (const auto& obj) {
        return !obj.is_min();
      };
      auto to_entry = [] (const auto& obj) {
        return librados::ListObjectImpl{
          obj.get_namespace(), obj.oid.name, obj.get_key()
        };
      };

      pg_nls_response_t response;
      boost::push_back(response.entries, items | boost::adaptors::filtered(is_matched)
                                               | boost::adaptors::transformed(to_entry));
      response.handle = next.is_max() ? pg_end : next;
      ceph::bufferlist out;
      encode(response, out);
      logger().debug("{}: response.entries.size()={}",
                     __func__, response.entries.size());
      return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
    });
}

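// do_pgnls/do_pgnls_filtered only decode the client-supplied cursor (and,
// for the filtered variant, the filter description) before delegating to
// do_pgnls_common with the PG's hash-range bounds.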
static PG::interruptible_future<> do_pgnls(
  const PG& pg,
  const std::string& nspace,
  OSDOp& osd_op)
{
  hobject_t lower_bound;
  try {
    ceph::decode(lower_bound, osd_op.indata);
  } catch (const buffer::error&) {
    throw std::invalid_argument("unable to decode PGNLS handle");
  }
  const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
  const auto pg_end =
    pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
  return do_pgnls_common(pg_start,
                         pg_end,
                         pg.get_backend(),
                         lower_bound,
                         nspace,
                         osd_op.op.pgls.count,
                         nullptr /* no filter */)
    .then_interruptible([&osd_op](bufferlist bl) {
      osd_op.outdata = std::move(bl);
      return seastar::now();
    });
}

static PG::interruptible_future<> do_pgnls_filtered(
  const PG& pg,
  const std::string& nspace,
  OSDOp& osd_op)
{
  std::string cname, mname, type;
  auto bp = osd_op.indata.cbegin();
  try {
    ceph::decode(cname, bp);
    ceph::decode(mname, bp);
    ceph::decode(type, bp);
  } catch (const buffer::error&) {
    throw crimson::osd::invalid_argument{};
  }

  auto filter = get_pgls_filter(type, bp);

  hobject_t lower_bound;
  try {
    lower_bound.decode(bp);
  } catch (const buffer::error&) {
    throw std::invalid_argument("unable to decode PGNLS_FILTER description");
  }

  logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
                 __func__, cname, mname, type, lower_bound,
                 static_cast<const void*>(filter.get()));
  return seastar::do_with(std::move(filter),
    [&, lower_bound=std::move(lower_bound)](auto&& filter) {
      const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
      const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
      return do_pgnls_common(pg_start,
                             pg_end,
                             pg.get_backend(),
                             lower_bound,
                             nspace,
                             osd_op.op.pgls.count,
                             filter.get())
        .then_interruptible([&osd_op](bufferlist bl) {
          osd_op.outdata = std::move(bl);
          return seastar::now();
        });
    });
}

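// Shared implementation of PGLS/PGLS_FILTER, the older listing op. It uses
// map_reduce to filter the listed objects and accumulate pg_ls_response_t
// entries in a single pass.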
static PG::interruptible_future<ceph::bufferlist> do_pgls_common(
  const hobject_t& pg_start,
  const hobject_t& pg_end,
  const PGBackend& backend,
  const hobject_t& lower_bound,
  const std::string& nspace,
  const uint64_t limit,
  const PGLSFilter* const filter)
{
  if (!(lower_bound.is_min() ||
        lower_bound.is_max() ||
        (lower_bound >= pg_start && lower_bound < pg_end))) {
    // this should only happen with a buggy client.
    throw std::invalid_argument("outside of PG bounds");
  }

  using entries_t = decltype(pg_ls_response_t::entries);
  return backend.list_objects(lower_bound, limit).then_interruptible(
    [&backend, filter, nspace](auto&& ret) {
      auto& [objects, next] = ret;
      return PG::interruptor::when_all(
        PG::interruptor::map_reduce(std::move(objects),
          [&backend, filter, nspace](const hobject_t& obj)
            -> PG::interruptible_future<hobject_t> {
            if (obj.get_namespace() == nspace) {
              if (filter) {
                return pgls_filter(*filter, backend, obj);
              } else {
                return seastar::make_ready_future<hobject_t>(obj);
              }
            } else {
              return seastar::make_ready_future<hobject_t>();
            }
          },
          entries_t{},
          [](entries_t entries, hobject_t obj) {
            if (!obj.is_min()) {
              entries.emplace_back(obj.oid, obj.get_key());
            }
            return entries;
          }),
        seastar::make_ready_future<hobject_t>(next));
    }).then_interruptible([pg_end](auto&& ret) {
      auto entries = std::move(std::get<0>(ret).get0());
      auto next = std::move(std::get<1>(ret).get0());
      pg_ls_response_t response;
      response.handle = next.is_max() ? pg_end : next;
      response.entries = std::move(entries);
      ceph::bufferlist out;
      encode(response, out);
      logger().debug("{}: response.entries.size()={}",
                     __func__, response.entries.size());
      return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
    });
}

static PG::interruptible_future<> do_pgls(
  const PG& pg,
  const std::string& nspace,
  OSDOp& osd_op)
{
  hobject_t lower_bound;
  auto bp = osd_op.indata.cbegin();
  try {
    lower_bound.decode(bp);
  } catch (const buffer::error&) {
    throw std::invalid_argument{"unable to decode PGLS handle"};
  }
  const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
  const auto pg_end =
    pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
  return do_pgls_common(pg_start,
                        pg_end,
                        pg.get_backend(),
                        lower_bound,
                        nspace,
                        osd_op.op.pgls.count,
                        nullptr /* no filter */)
    .then_interruptible([&osd_op](bufferlist bl) {
      osd_op.outdata = std::move(bl);
      return seastar::now();
    });
}

static PG::interruptible_future<> do_pgls_filtered(
  const PG& pg,
  const std::string& nspace,
  OSDOp& osd_op)
{
  std::string cname, mname, type;
  auto bp = osd_op.indata.cbegin();
  try {
    ceph::decode(cname, bp);
    ceph::decode(mname, bp);
    ceph::decode(type, bp);
  } catch (const buffer::error&) {
    throw crimson::osd::invalid_argument{};
  }

  auto filter = get_pgls_filter(type, bp);

  hobject_t lower_bound;
  try {
    lower_bound.decode(bp);
  } catch (const buffer::error&) {
    throw std::invalid_argument("unable to decode PGLS_FILTER description");
  }

  logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
                 __func__, cname, mname, type, lower_bound,
                 static_cast<const void*>(filter.get()));
  return seastar::do_with(std::move(filter),
    [&, lower_bound=std::move(lower_bound)](auto&& filter) {
      const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
      const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
      return do_pgls_common(pg_start,
                            pg_end,
                            pg.get_backend(),
                            lower_bound,
                            nspace,
                            osd_op.op.pgls.count,
                            filter.get())
        .then_interruptible([&osd_op](bufferlist bl) {
          osd_op.outdata = std::move(bl);
          return seastar::now();
        });
    });
}

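// Dispatch for PG-wide (rather than per-object) operations: the PGLS/PGNLS
// listing variants implemented above.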
PgOpsExecuter::interruptible_future<>
PgOpsExecuter::execute_op(OSDOp& osd_op)
{
  logger().warn("handling op {}", ceph_osd_op_name(osd_op.op.op));
  switch (const ceph_osd_op& op = osd_op.op; op.op) {
  case CEPH_OSD_OP_PGLS:
    return do_pgls(pg, nspace, osd_op);
  case CEPH_OSD_OP_PGLS_FILTER:
    return do_pgls_filtered(pg, nspace, osd_op);
  case CEPH_OSD_OP_PGNLS:
    return do_pgnls(pg, nspace, osd_op);
  case CEPH_OSD_OP_PGNLS_FILTER:
    return do_pgnls_filtered(pg, nspace, osd_op);
  default:
    logger().warn("unknown op {}", ceph_osd_op_name(op.op));
    throw std::runtime_error(
      fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
  }
}

} // namespace crimson::osd