1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ops_executer.h"
6 #include <boost/range/adaptor/filtered.hpp>
7 #include <boost/range/adaptor/map.hpp>
8 #include <boost/range/adaptor/transformed.hpp>
9 #include <boost/range/algorithm_ext/push_back.hpp>
10 #include <boost/range/algorithm/max_element.hpp>
11 #include <boost/range/numeric.hpp>
13 #include <fmt/format.h>
14 #include <fmt/ostream.h>
16 #include <seastar/core/thread.hh>
18 #include "crimson/osd/exceptions.h"
19 #include "crimson/osd/pg.h"
20 #include "crimson/osd/watch.h"
21 #include "osd/ClassHandler.h"
24 seastar::logger
& logger() {
25 return crimson::get_logger(ceph_subsys_osd
);
29 namespace crimson::osd
{
31 OpsExecuter::call_ierrorator::future
<> OpsExecuter::do_op_call(OSDOp
& osd_op
)
33 std::string cname
, mname
;
34 ceph::bufferlist indata
;
36 auto bp
= std::begin(osd_op
.indata
);
37 bp
.copy(osd_op
.op
.cls
.class_len
, cname
);
38 bp
.copy(osd_op
.op
.cls
.method_len
, mname
);
39 bp
.copy(osd_op
.op
.cls
.indata_len
, indata
);
40 } catch (buffer::error
&) {
41 logger().warn("call unable to decode class + method + indata");
42 return crimson::ct_error::invarg::make();
45 // NOTE: opening a class can actually result in dlopen(), and thus
46 // blocking the entire reactor. Thankfully to ClassHandler's cache
47 // this is supposed to be extremely infrequent.
48 ClassHandler::ClassData
* cls
;
49 int r
= ClassHandler::get_instance().open_class(cname
, &cls
);
51 logger().warn("class {} open got {}", cname
, cpp_strerror(r
));
53 return crimson::ct_error::operation_not_supported::make();
54 } else if (r
== -EPERM
) {
55 // propagate permission errors
56 return crimson::ct_error::permission_denied::make();
58 return crimson::ct_error::input_output_error::make();
61 ClassHandler::ClassMethod
* method
= cls
->get_method(mname
);
63 logger().warn("call method {}.{} does not exist", cname
, mname
);
64 return crimson::ct_error::operation_not_supported::make();
67 const auto flags
= method
->get_flags();
68 if (!obc
->obs
.exists
&& (flags
& CLS_METHOD_WR
) == 0) {
69 return crimson::ct_error::enoent::make();
73 if (flags
& CLS_METHOD_WR
) {
74 ctx
->user_modify
= true;
78 logger().debug("calling method {}.{}, num_read={}, num_write={}",
79 cname
, mname
, num_read
, num_write
);
80 const auto prev_rd
= num_read
;
81 const auto prev_wr
= num_write
;
82 return interruptor::async(
83 [this, method
, indata
=std::move(indata
)]() mutable {
84 ceph::bufferlist outdata
;
85 auto cls_context
= reinterpret_cast<cls_method_context_t
>(this);
86 const auto ret
= method
->exec(cls_context
, indata
, outdata
);
87 return std::make_pair(ret
, std::move(outdata
));
90 [this, prev_rd
, prev_wr
, &osd_op
, flags
]
91 (auto outcome
) -> call_errorator::future
<> {
92 auto& [ret
, outdata
] = outcome
;
95 logger().debug("do_op_call: method returned ret={}, outdata.length()={}"
96 " while num_read={}, num_write={}",
97 ret
, outdata
.length(), num_read
, num_write
);
98 if (num_read
> prev_rd
&& !(flags
& CLS_METHOD_RD
)) {
99 logger().error("method tried to read object but is not marked RD");
101 return crimson::ct_error::input_output_error::make();
103 if (num_write
> prev_wr
&& !(flags
& CLS_METHOD_WR
)) {
104 logger().error("method tried to update object but is not marked WR");
106 return crimson::ct_error::input_output_error::make();
108 // ceph-osd has this implemented in `PrimaryLogPG::execute_ctx`,
109 // grep for `ignore_out_data`.
110 using crimson::common::local_conf
;
111 if (op_info
.allows_returnvec() &&
112 op_info
.may_write() &&
114 outdata
.length() > local_conf()->osd_max_write_op_reply_len
) {
115 // the justification of this limit it to not inflate the pg log.
116 // that's the reason why we don't worry about pure reads.
117 logger().error("outdata overflow due to .length()={}, limit={}",
119 local_conf()->osd_max_write_op_reply_len
);
120 osd_op
.rval
= -EOVERFLOW
;
121 return crimson::ct_error::value_too_large::make();
123 // for write calls we never return data expect errors or RETURNVEC.
124 // please refer cls/cls_hello.cc to details.
125 if (!op_info
.may_write() || op_info
.allows_returnvec() || ret
< 0) {
126 osd_op
.op
.extent
.length
= outdata
.length();
127 osd_op
.outdata
.claim_append(outdata
);
130 return crimson::stateful_ec
{
131 std::error_code(-ret
, std::generic_category()) };
133 return seastar::now();
139 static watch_info_t
create_watch_info(const OSDOp
& osd_op
,
140 const OpsExecuter::ExecutableMessage
& msg
)
142 using crimson::common::local_conf
;
143 const uint32_t timeout
=
144 osd_op
.op
.watch
.timeout
== 0 ? local_conf()->osd_client_watch_timeout
145 : osd_op
.op
.watch
.timeout
;
147 osd_op
.op
.watch
.cookie
,
149 msg
.get_connection()->get_peer_addr()
153 OpsExecuter::watch_ierrorator::future
<> OpsExecuter::do_op_watch_subop_watch(
156 ceph::os::Transaction
& txn
)
158 logger().debug("{}", __func__
);
159 struct connect_ctx_t
{
160 ObjectContext::watch_key_t key
;
161 crimson::net::ConnectionRef conn
;
164 connect_ctx_t(const OSDOp
& osd_op
, const ExecutableMessage
& msg
)
165 : key(osd_op
.op
.watch
.cookie
, msg
.get_reqid().name
),
166 conn(msg
.get_connection()),
167 info(create_watch_info(osd_op
, msg
)) {
170 return with_effect_on_obc(connect_ctx_t
{ osd_op
, get_message() },
172 const auto& entity
= ctx
.key
.second
;
173 auto [it
, emplaced
] =
174 os
.oi
.watchers
.try_emplace(ctx
.key
, std::move(ctx
.info
));
176 logger().info("registered new watch {} by {}", it
->second
, entity
);
179 logger().info("found existing watch {} by {}", it
->second
, entity
);
181 return seastar::now();
183 [] (auto&& ctx
, ObjectContextRef obc
, Ref
<PG
> pg
) {
185 auto [it
, emplaced
] = obc
->watchers
.try_emplace(ctx
.key
, nullptr);
187 const auto& [cookie
, entity
] = ctx
.key
;
188 it
->second
= crimson::osd::Watch::create(
189 obc
, ctx
.info
, entity
, std::move(pg
));
190 logger().info("op_effect: added new watcher: {}", ctx
.key
);
192 logger().info("op_effect: found existing watcher: {}", ctx
.key
);
194 return it
->second
->connect(std::move(ctx
.conn
), true /* will_ping */);
198 OpsExecuter::watch_ierrorator::future
<> OpsExecuter::do_op_watch_subop_reconnect(
201 ceph::os::Transaction
& txn
)
203 const entity_name_t
& entity
= get_message().get_reqid().name
;
204 const auto& cookie
= osd_op
.op
.watch
.cookie
;
205 if (!os
.oi
.watchers
.count(std::make_pair(cookie
, entity
))) {
206 return crimson::ct_error::not_connected::make();
208 logger().info("found existing watch by {}", entity
);
209 return do_op_watch_subop_watch(osd_op
, os
, txn
);
213 OpsExecuter::watch_ierrorator::future
<> OpsExecuter::do_op_watch_subop_unwatch(
216 ceph::os::Transaction
& txn
)
218 logger().info("{}", __func__
);
220 struct disconnect_ctx_t
{
221 ObjectContext::watch_key_t key
;
222 disconnect_ctx_t(const OSDOp
& osd_op
, const ExecutableMessage
& msg
)
223 : key(osd_op
.op
.watch
.cookie
, msg
.get_reqid().name
) {
226 return with_effect_on_obc(disconnect_ctx_t
{ osd_op
, get_message() },
228 const auto& entity
= ctx
.key
.second
;
229 if (auto nh
= os
.oi
.watchers
.extract(ctx
.key
); !nh
.empty()) {
230 logger().info("removed watch {} by {}", nh
.mapped(), entity
);
233 logger().info("can't remove: no watch by {}", entity
);
235 return seastar::now();
237 [] (auto&& ctx
, ObjectContextRef obc
, Ref
<PG
>) {
238 if (auto nh
= obc
->watchers
.extract(ctx
.key
); !nh
.empty()) {
239 return seastar::do_with(std::move(nh
.mapped()),
240 [ctx
](auto&& watcher
) {
241 logger().info("op_effect: disconnect watcher {}", ctx
.key
);
242 return watcher
->remove();
245 logger().info("op_effect: disconnect failed to find watcher {}", ctx
.key
);
246 return seastar::now();
251 OpsExecuter::watch_ierrorator::future
<> OpsExecuter::do_op_watch_subop_ping(
254 ceph::os::Transaction
& txn
)
256 const entity_name_t
& entity
= get_message().get_reqid().name
;
257 const auto& cookie
= osd_op
.op
.watch
.cookie
;
258 const auto key
= std::make_pair(cookie
, entity
);
260 // Note: WATCH with PING doesn't cause may_write() to return true,
261 // so if there is nothing else in the transaction, this is going
262 // to run do_osd_op_effects, but not write out a log entry */
263 if (!os
.oi
.watchers
.count(key
)) {
264 return crimson::ct_error::not_connected::make();
266 auto it
= obc
->watchers
.find(key
);
267 if (it
== std::end(obc
->watchers
) || !it
->second
->is_connected()) {
268 return crimson::ct_error::timed_out::make();
270 logger().info("found existing watch by {}", entity
);
271 it
->second
->got_ping(ceph_clock_now());
272 return seastar::now();
275 OpsExecuter::watch_ierrorator::future
<> OpsExecuter::do_op_watch(
278 ceph::os::Transaction
& txn
)
280 logger().debug("{}", __func__
);
282 return crimson::ct_error::enoent::make();
284 switch (osd_op
.op
.watch
.op
) {
285 case CEPH_OSD_WATCH_OP_WATCH
:
286 return do_op_watch_subop_watch(osd_op
, os
, txn
);
287 case CEPH_OSD_WATCH_OP_RECONNECT
:
288 return do_op_watch_subop_reconnect(osd_op
, os
, txn
);
289 case CEPH_OSD_WATCH_OP_PING
:
290 return do_op_watch_subop_ping(osd_op
, os
, txn
);
291 case CEPH_OSD_WATCH_OP_UNWATCH
:
292 return do_op_watch_subop_unwatch(osd_op
, os
, txn
);
293 case CEPH_OSD_WATCH_OP_LEGACY_WATCH
:
294 logger().warn("ignoring CEPH_OSD_WATCH_OP_LEGACY_WATCH");
295 return crimson::ct_error::invarg::make();
297 logger().warn("unrecognized WATCH subop: {}", osd_op
.op
.watch
.op
);
298 return crimson::ct_error::invarg::make();
301 static uint64_t get_next_notify_id(epoch_t e
)
304 static std::uint64_t next_notify_id
= 0;
305 return (((uint64_t)e
) << 32) | ((uint64_t)(next_notify_id
++));
308 OpsExecuter::watch_ierrorator::future
<> OpsExecuter::do_op_notify(
310 const ObjectState
& os
)
312 logger().debug("{}, msg epoch: {}", __func__
, get_message().get_map_epoch());
315 return crimson::ct_error::enoent::make();
317 struct notify_ctx_t
{
318 crimson::net::ConnectionRef conn
;
320 const uint64_t client_gid
;
323 notify_ctx_t(const ExecutableMessage
& msg
)
324 : conn(msg
.get_connection()),
325 client_gid(msg
.get_reqid().name
.num()),
326 epoch(msg
.get_map_epoch()) {
329 return with_effect_on_obc(notify_ctx_t
{ get_message() },
332 auto bp
= osd_op
.indata
.cbegin();
333 uint32_t ver
; // obsolete
334 ceph::decode(ver
, bp
);
335 ceph::decode(ctx
.ninfo
.timeout
, bp
);
336 ceph::decode(ctx
.ninfo
.bl
, bp
);
337 } catch (const buffer::error
&) {
338 ctx
.ninfo
.timeout
= 0;
340 if (!ctx
.ninfo
.timeout
) {
341 using crimson::common::local_conf
;
342 ctx
.ninfo
.timeout
= local_conf()->osd_default_notify_timeout
;
344 ctx
.ninfo
.notify_id
= get_next_notify_id(ctx
.epoch
);
345 ctx
.ninfo
.cookie
= osd_op
.op
.notify
.cookie
;
346 // return our unique notify id to the client
347 ceph::encode(ctx
.ninfo
.notify_id
, osd_op
.outdata
);
348 return seastar::now();
350 [] (auto&& ctx
, ObjectContextRef obc
, Ref
<PG
>) {
351 auto alive_watchers
= obc
->watchers
| boost::adaptors::map_values
352 | boost::adaptors::filtered(
354 // FIXME: filter as for the `is_ping` in `Watch::start_notify`
355 return w
->is_alive();
357 return crimson::osd::Notify::create_n_propagate(
358 std::begin(alive_watchers
),
359 std::end(alive_watchers
),
363 obc
->obs
.oi
.user_version
);
367 OpsExecuter::watch_ierrorator::future
<> OpsExecuter::do_op_list_watchers(
369 const ObjectState
& os
)
371 logger().debug("{}", __func__
);
373 obj_list_watch_response_t response
;
374 for (const auto& [key
, info
] : os
.oi
.watchers
) {
375 logger().debug("{}: key cookie={}, entity={}",
376 __func__
, key
.first
, key
.second
);
377 assert(key
.first
== info
.cookie
);
378 assert(key
.second
.is_client());
379 response
.entries
.emplace_back(watch_item_t
{
380 key
.second
, info
.cookie
, info
.timeout_seconds
, info
.addr
});
381 response
.encode(osd_op
.outdata
, get_message().get_features());
383 return watch_ierrorator::now();
386 OpsExecuter::watch_ierrorator::future
<> OpsExecuter::do_op_notify_ack(
388 const ObjectState
& os
)
390 logger().debug("{}", __func__
);
392 struct notifyack_ctx_t
{
393 const entity_name_t entity
;
394 uint64_t watch_cookie
;
396 ceph::bufferlist reply_bl
;
398 notifyack_ctx_t(const ExecutableMessage
& msg
)
399 : entity(msg
.get_reqid().name
) {
402 return with_effect_on_obc(notifyack_ctx_t
{ get_message() },
403 [&] (auto& ctx
) -> watch_errorator::future
<> {
405 auto bp
= osd_op
.indata
.cbegin();
406 ceph::decode(ctx
.notify_id
, bp
);
407 ceph::decode(ctx
.watch_cookie
, bp
);
409 ceph::decode(ctx
.reply_bl
, bp
);
411 } catch (const buffer::error
&) {
412 // here we behave differently than ceph-osd. For historical reasons,
413 // it falls back to using `osd_op.op.watch.cookie` as `ctx.notify_id`.
414 // crimson just returns EINVAL if the data cannot be decoded.
415 return crimson::ct_error::invarg::make();
417 return watch_errorator::now();
419 [] (auto&& ctx
, ObjectContextRef obc
, Ref
<PG
>) {
420 logger().info("notify_ack watch_cookie={}, notify_id={}",
421 ctx
.watch_cookie
, ctx
.notify_id
);
422 return seastar::do_for_each(obc
->watchers
,
423 [ctx
=std::move(ctx
)] (auto& kv
) {
424 const auto& [key
, watchp
] = kv
;
426 std::is_same_v
<std::decay_t
<decltype(watchp
)>,
427 seastar::shared_ptr
<crimson::osd::Watch
>>);
428 auto& [cookie
, entity
] = key
;
429 if (ctx
.entity
!= entity
) {
430 logger().debug("skipping watch {}; entity name {} != {}",
431 key
, entity
, ctx
.entity
);
432 return seastar::now();
434 if (ctx
.watch_cookie
!= cookie
) {
435 logger().debug("skipping watch {}; cookie {} != {}",
436 key
, ctx
.watch_cookie
, cookie
);
437 return seastar::now();
439 logger().info("acking notify on watch {}", key
);
440 return watchp
->notify_ack(ctx
.notify_id
, ctx
.reply_bl
);
445 // Defined here because there is a circular dependency between OpsExecuter and PG
446 template <class Func
>
447 auto OpsExecuter::do_const_op(Func
&& f
) {
448 // TODO: pass backend as read-only
449 return std::forward
<Func
>(f
)(pg
->get_backend(), std::as_const(obc
->obs
));
452 // Defined here because there is a circular dependency between OpsExecuter and PG
453 template <class Func
>
454 auto OpsExecuter::do_write_op(Func
&& f
, bool um
) {
456 if (!osd_op_params
) {
457 osd_op_params
.emplace();
460 return std::forward
<Func
>(f
)(pg
->get_backend(), obc
->obs
, txn
);
463 OpsExecuter::interruptible_errorated_future
<OpsExecuter::osd_op_errorator
>
464 OpsExecuter::execute_op(OSDOp
& osd_op
)
466 // TODO: dispatch via call table?
467 // TODO: we might want to find a way to unify both input and output
470 "handling op {} on object {}",
471 ceph_osd_op_name(osd_op
.op
.op
),
473 switch (const ceph_osd_op
& op
= osd_op
.op
; op
.op
) {
474 case CEPH_OSD_OP_SYNC_READ
:
476 case CEPH_OSD_OP_READ
:
477 return do_read_op([this, &osd_op
] (auto& backend
, const auto& os
) {
478 return backend
.read(os
, osd_op
, delta_stats
);
480 case CEPH_OSD_OP_SPARSE_READ
:
481 return do_read_op([this, &osd_op
] (auto& backend
, const auto& os
) {
482 return backend
.sparse_read(os
, osd_op
, delta_stats
);
484 case CEPH_OSD_OP_CHECKSUM
:
485 return do_read_op([&osd_op
] (auto& backend
, const auto& os
) {
486 return backend
.checksum(os
, osd_op
);
488 case CEPH_OSD_OP_CMPEXT
:
489 return do_read_op([&osd_op
] (auto& backend
, const auto& os
) {
490 return backend
.cmp_ext(os
, osd_op
);
492 case CEPH_OSD_OP_GETXATTR
:
493 return do_read_op([this, &osd_op
] (auto& backend
, const auto& os
) {
494 return backend
.getxattr(os
, osd_op
, delta_stats
);
496 case CEPH_OSD_OP_GETXATTRS
:
497 return do_read_op([this, &osd_op
] (auto& backend
, const auto& os
) {
498 return backend
.get_xattrs(os
, osd_op
, delta_stats
);
500 case CEPH_OSD_OP_CMPXATTR
:
501 return do_read_op([this, &osd_op
] (auto& backend
, const auto& os
) {
502 return backend
.cmp_xattr(os
, osd_op
, delta_stats
);
504 case CEPH_OSD_OP_RMXATTR
:
506 [&osd_op
] (auto& backend
, auto& os
, auto& txn
) {
507 return backend
.rm_xattr(os
, osd_op
, txn
);
509 case CEPH_OSD_OP_CREATE
:
510 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
511 return backend
.create(os
, osd_op
, txn
, delta_stats
);
513 case CEPH_OSD_OP_WRITE
:
514 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
515 return backend
.write(os
, osd_op
, txn
, *osd_op_params
, delta_stats
);
517 case CEPH_OSD_OP_WRITESAME
:
518 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
519 return backend
.write_same(os
, osd_op
, txn
, *osd_op_params
, delta_stats
);
521 case CEPH_OSD_OP_WRITEFULL
:
522 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
523 return backend
.writefull(os
, osd_op
, txn
, *osd_op_params
, delta_stats
);
525 case CEPH_OSD_OP_APPEND
:
526 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
527 return backend
.append(os
, osd_op
, txn
, *osd_op_params
, delta_stats
);
529 case CEPH_OSD_OP_TRUNCATE
:
530 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
531 // FIXME: rework needed. Move this out to do_write_op(), introduce
532 // do_write_op_no_user_modify()...
533 return backend
.truncate(os
, osd_op
, txn
, *osd_op_params
, delta_stats
);
535 case CEPH_OSD_OP_ZERO
:
536 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
537 return backend
.zero(os
, osd_op
, txn
, *osd_op_params
, delta_stats
);
539 case CEPH_OSD_OP_SETALLOCHINT
:
540 return osd_op_errorator::now();
541 case CEPH_OSD_OP_SETXATTR
:
542 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
543 return backend
.setxattr(os
, osd_op
, txn
, delta_stats
);
545 case CEPH_OSD_OP_DELETE
:
546 return do_write_op([this] (auto& backend
, auto& os
, auto& txn
) {
547 return backend
.remove(os
, txn
, delta_stats
);
549 case CEPH_OSD_OP_CALL
:
550 return this->do_op_call(osd_op
);
551 case CEPH_OSD_OP_STAT
:
552 // note: stat does not require RD
553 return do_const_op([this, &osd_op
] (/* const */auto& backend
, const auto& os
) {
554 return backend
.stat(os
, osd_op
, delta_stats
);
556 case CEPH_OSD_OP_TMAPUP
:
557 // TODO: there was an effort to kill TMAP in ceph-osd. According to
558 // @dzafman this isn't possible yet. Maybe it could be accomplished
559 // before crimson's readiness and we'd luckily don't need to carry.
560 return dont_do_legacy_op();
563 case CEPH_OSD_OP_OMAPGETKEYS
:
564 return do_read_op([this, &osd_op
] (auto& backend
, const auto& os
) {
565 return backend
.omap_get_keys(os
, osd_op
, delta_stats
);
567 case CEPH_OSD_OP_OMAPGETVALS
:
568 return do_read_op([this, &osd_op
] (auto& backend
, const auto& os
) {
569 return backend
.omap_get_vals(os
, osd_op
, delta_stats
);
571 case CEPH_OSD_OP_OMAPGETHEADER
:
572 return do_read_op([this, &osd_op
] (auto& backend
, const auto& os
) {
573 return backend
.omap_get_header(os
, osd_op
, delta_stats
);
575 case CEPH_OSD_OP_OMAPGETVALSBYKEYS
:
576 return do_read_op([this, &osd_op
] (auto& backend
, const auto& os
) {
577 return backend
.omap_get_vals_by_keys(os
, osd_op
, delta_stats
);
579 case CEPH_OSD_OP_OMAPSETVALS
:
581 if (!pg
.get_pool().info
.supports_omap()) {
582 return crimson::ct_error::operation_not_supported::make();
585 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
586 return backend
.omap_set_vals(os
, osd_op
, txn
, *osd_op_params
, delta_stats
);
588 case CEPH_OSD_OP_OMAPSETHEADER
:
590 if (!pg
.get_pool().info
.supports_omap()) {
591 return crimson::ct_error::operation_not_supported::make();
594 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
595 return backend
.omap_set_header(os
, osd_op
, txn
, *osd_op_params
,
598 case CEPH_OSD_OP_OMAPRMKEYRANGE
:
600 if (!pg
.get_pool().info
.supports_omap()) {
601 return crimson::ct_error::operation_not_supported::make();
604 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
605 return backend
.omap_remove_range(os
, osd_op
, txn
, delta_stats
);
607 case CEPH_OSD_OP_OMAPRMKEYS
:
608 /** TODO: Implement supports_omap()
609 if (!pg.get_pool().info.supports_omap()) {
610 return crimson::ct_error::operation_not_supported::make();
612 return do_write_op([&osd_op
] (auto& backend
, auto& os
, auto& txn
) {
613 return backend
.omap_remove_key(os
, osd_op
, txn
);
615 case CEPH_OSD_OP_OMAPCLEAR
:
616 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
617 return backend
.omap_clear(os
, osd_op
, txn
, *osd_op_params
, delta_stats
);
621 case CEPH_OSD_OP_WATCH
:
622 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
623 return do_op_watch(osd_op
, os
, txn
);
625 case CEPH_OSD_OP_LIST_WATCHERS
:
626 return do_read_op([this, &osd_op
] (auto&, const auto& os
) {
627 return do_op_list_watchers(osd_op
, os
);
629 case CEPH_OSD_OP_NOTIFY
:
630 return do_read_op([this, &osd_op
] (auto&, const auto& os
) {
631 return do_op_notify(osd_op
, os
);
633 case CEPH_OSD_OP_NOTIFY_ACK
:
634 return do_read_op([this, &osd_op
] (auto&, const auto& os
) {
635 return do_op_notify_ack(osd_op
, os
);
639 logger().warn("unknown op {}", ceph_osd_op_name(op
.op
));
640 throw std::runtime_error(
641 fmt::format("op '{}' not supported", ceph_osd_op_name(op
.op
)));
645 // Defined here because there is a circular dependency between OpsExecuter and PG
646 uint32_t OpsExecuter::get_pool_stripe_width() const {
647 return pg
->get_pool().info
.get_stripe_width();
650 // Defined here because there is a circular dependency between OpsExecuter and PG
651 version_t
OpsExecuter::get_last_user_version() const
653 return pg
->get_last_user_version();
656 static inline std::unique_ptr
<const PGLSFilter
> get_pgls_filter(
657 const std::string
& type
,
658 bufferlist::const_iterator
& iter
)
660 // storing non-const PGLSFilter for the sake of ::init()
661 std::unique_ptr
<PGLSFilter
> filter
;
662 if (type
.compare("plain") == 0) {
663 filter
= std::make_unique
<PGLSPlainFilter
>();
665 std::size_t dot
= type
.find(".");
666 if (dot
== type
.npos
|| dot
== 0 || dot
== type
.size() - 1) {
667 throw crimson::osd::invalid_argument
{};
670 const std::string class_name
= type
.substr(0, dot
);
671 const std::string filter_name
= type
.substr(dot
+ 1);
672 ClassHandler::ClassData
*cls
= nullptr;
673 int r
= ClassHandler::get_instance().open_class(class_name
, &cls
);
675 logger().warn("can't open class {}: {}", class_name
, cpp_strerror(r
));
677 // propogate permission error
678 throw crimson::osd::permission_denied
{};
680 throw crimson::osd::invalid_argument
{};
686 ClassHandler::ClassFilter
* const class_filter
= cls
->get_filter(filter_name
);
687 if (class_filter
== nullptr) {
688 logger().warn("can't find filter {} in class {}", filter_name
, class_name
);
689 throw crimson::osd::invalid_argument
{};
692 filter
.reset(class_filter
->fn());
694 // Object classes are obliged to return us something, but let's
695 // give an error rather than asserting out.
696 logger().warn("buggy class {} failed to construct filter {}",
697 class_name
, filter_name
);
698 throw crimson::osd::invalid_argument
{};
703 int r
= filter
->init(iter
);
705 logger().warn("error initializing filter {}: {}", type
, cpp_strerror(r
));
706 throw crimson::osd::invalid_argument
{};
709 // successfully constructed and initialized, return it.
713 static PG::interruptible_future
<hobject_t
> pgls_filter(
714 const PGLSFilter
& filter
,
715 const PGBackend
& backend
,
716 const hobject_t
& sobj
)
718 if (const auto xattr
= filter
.get_xattr(); !xattr
.empty()) {
719 logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
721 return backend
.getxattr(sobj
, xattr
).safe_then_interruptible(
722 [&filter
, sobj
] (ceph::bufferlist val
) {
723 logger().debug("pgls_filter: got xvalue for obj={}", sobj
);
725 const bool filtered
= filter
.filter(sobj
, val
);
726 return seastar::make_ready_future
<hobject_t
>(filtered
? sobj
: hobject_t
{});
727 }, PGBackend::get_attr_errorator::all_same_way([&filter
, sobj
] {
728 logger().debug("pgls_filter: got error for obj={}", sobj
);
730 if (filter
.reject_empty_xattr()) {
731 return seastar::make_ready_future
<hobject_t
>();
733 ceph::bufferlist val
;
734 const bool filtered
= filter
.filter(sobj
, val
);
735 return seastar::make_ready_future
<hobject_t
>(filtered
? sobj
: hobject_t
{});
738 ceph::bufferlist empty_lvalue_bl
;
739 const bool filtered
= filter
.filter(sobj
, empty_lvalue_bl
);
740 return seastar::make_ready_future
<hobject_t
>(filtered
? sobj
: hobject_t
{});
744 static PG::interruptible_future
<ceph::bufferlist
> do_pgnls_common(
745 const hobject_t
& pg_start
,
746 const hobject_t
& pg_end
,
747 const PGBackend
& backend
,
748 const hobject_t
& lower_bound
,
749 const std::string
& nspace
,
750 const uint64_t limit
,
751 const PGLSFilter
* const filter
)
753 if (!(lower_bound
.is_min() ||
754 lower_bound
.is_max() ||
755 (lower_bound
>= pg_start
&& lower_bound
< pg_end
))) {
756 // this should only happen with a buggy client.
757 throw std::invalid_argument("outside of PG bounds");
760 return backend
.list_objects(lower_bound
, limit
).then_interruptible(
761 [&backend
, filter
, nspace
](auto&& ret
)
762 -> PG::interruptible_future
<std::tuple
<std::vector
<hobject_t
>, hobject_t
>> {
763 auto& [objects
, next
] = ret
;
764 auto in_my_namespace
= [&nspace
](const hobject_t
& obj
) {
765 using crimson::common::local_conf
;
766 if (obj
.get_namespace() == local_conf()->osd_hit_set_namespace
) {
768 } else if (nspace
== librados::all_nspaces
) {
771 return obj
.get_namespace() == nspace
;
774 auto to_pglsed
= [&backend
, filter
] (const hobject_t
& obj
)
775 -> PG::interruptible_future
<hobject_t
> {
776 // this transformation looks costly. However, I don't have any
777 // reason to think PGLS* operations are critical for, let's say,
778 // general performance.
780 // from tchaikov: "another way is to use seastar::map_reduce(),
781 // to 1) save the effort to filter the already filtered objects
782 // 2) avoid the space to keep the tuple<bool, object> even if
783 // the object is filtered out".
785 return pgls_filter(*filter
, backend
, obj
);
787 return seastar::make_ready_future
<hobject_t
>(obj
);
791 auto range
= objects
| boost::adaptors::filtered(in_my_namespace
)
792 | boost::adaptors::transformed(to_pglsed
);
793 logger().debug("do_pgnls_common: finishing the 1st stage of pgls");
794 return seastar::when_all_succeed(std::begin(range
),
795 std::end(range
)).then(
796 [next
=std::move(next
)] (auto items
) mutable {
797 // the sole purpose of this chaining is to pass `next` to 2nd
798 // stage altogether with items
799 logger().debug("do_pgnls_common: 1st done");
800 return seastar::make_ready_future
<
801 std::tuple
<std::vector
<hobject_t
>, hobject_t
>>(
802 std::move(items
), std::move(next
));
804 }).then_interruptible(
805 [pg_end
] (auto&& ret
) {
806 auto& [items
, next
] = ret
;
807 auto is_matched
= [] (const auto& obj
) {
808 return !obj
.is_min();
810 auto to_entry
= [] (const auto& obj
) {
811 return librados::ListObjectImpl
{
812 obj
.get_namespace(), obj
.oid
.name
, obj
.get_key()
816 pg_nls_response_t response
;
817 boost::push_back(response
.entries
, items
| boost::adaptors::filtered(is_matched
)
818 | boost::adaptors::transformed(to_entry
));
819 response
.handle
= next
.is_max() ? pg_end
: next
;
820 ceph::bufferlist out
;
821 encode(response
, out
);
822 logger().debug("{}: response.entries.size()=",
823 __func__
, response
.entries
.size());
824 return seastar::make_ready_future
<ceph::bufferlist
>(std::move(out
));
828 static PG::interruptible_future
<> do_pgnls(
830 const std::string
& nspace
,
833 hobject_t lower_bound
;
835 ceph::decode(lower_bound
, osd_op
.indata
);
836 } catch (const buffer::error
&) {
837 throw std::invalid_argument("unable to decode PGNLS handle");
839 const auto pg_start
= pg
.get_pgid().pgid
.get_hobj_start();
840 const auto pg_end
= \
841 pg
.get_pgid().pgid
.get_hobj_end(pg
.get_pool().info
.get_pg_num());
842 return do_pgnls_common(pg_start
,
847 osd_op
.op
.pgls
.count
,
848 nullptr /* no filter */)
849 .then_interruptible([&osd_op
](bufferlist bl
) {
850 osd_op
.outdata
= std::move(bl
);
851 return seastar::now();
855 static PG::interruptible_future
<> do_pgnls_filtered(
857 const std::string
& nspace
,
860 std::string cname
, mname
, type
;
861 auto bp
= osd_op
.indata
.cbegin();
863 ceph::decode(cname
, bp
);
864 ceph::decode(mname
, bp
);
865 ceph::decode(type
, bp
);
866 } catch (const buffer::error
&) {
867 throw crimson::osd::invalid_argument
{};
870 auto filter
= get_pgls_filter(type
, bp
);
872 hobject_t lower_bound
;
874 lower_bound
.decode(bp
);
875 } catch (const buffer::error
&) {
876 throw std::invalid_argument("unable to decode PGNLS_FILTER description");
879 logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
880 __func__
, cname
, mname
, type
, lower_bound
,
881 static_cast<const void*>(filter
.get()));
882 return seastar::do_with(std::move(filter
),
883 [&, lower_bound
=std::move(lower_bound
)](auto&& filter
) {
884 const auto pg_start
= pg
.get_pgid().pgid
.get_hobj_start();
885 const auto pg_end
= pg
.get_pgid().pgid
.get_hobj_end(pg
.get_pool().info
.get_pg_num());
886 return do_pgnls_common(pg_start
,
891 osd_op
.op
.pgls
.count
,
893 .then_interruptible([&osd_op
](bufferlist bl
) {
894 osd_op
.outdata
= std::move(bl
);
895 return seastar::now();
900 static PG::interruptible_future
<ceph::bufferlist
> do_pgls_common(
901 const hobject_t
& pg_start
,
902 const hobject_t
& pg_end
,
903 const PGBackend
& backend
,
904 const hobject_t
& lower_bound
,
905 const std::string
& nspace
,
906 const uint64_t limit
,
907 const PGLSFilter
* const filter
)
909 if (!(lower_bound
.is_min() ||
910 lower_bound
.is_max() ||
911 (lower_bound
>= pg_start
&& lower_bound
< pg_end
))) {
912 // this should only happen with a buggy client.
913 throw std::invalid_argument("outside of PG bounds");
916 using entries_t
= decltype(pg_ls_response_t::entries
);
917 return backend
.list_objects(lower_bound
, limit
).then_interruptible(
918 [&backend
, filter
, nspace
](auto&& ret
) {
919 auto& [objects
, next
] = ret
;
920 return PG::interruptor::when_all(
921 PG::interruptor::map_reduce(std::move(objects
),
922 [&backend
, filter
, nspace
](const hobject_t
& obj
)
923 -> PG::interruptible_future
<hobject_t
>{
924 if (obj
.get_namespace() == nspace
) {
926 return pgls_filter(*filter
, backend
, obj
);
928 return seastar::make_ready_future
<hobject_t
>(obj
);
931 return seastar::make_ready_future
<hobject_t
>();
935 [](entries_t entries
, hobject_t obj
) {
937 entries
.emplace_back(obj
.oid
, obj
.get_key());
941 seastar::make_ready_future
<hobject_t
>(next
));
942 }).then_interruptible([pg_end
](auto&& ret
) {
943 auto entries
= std::move(std::get
<0>(ret
).get0());
944 auto next
= std::move(std::get
<1>(ret
).get0());
945 pg_ls_response_t response
;
946 response
.handle
= next
.is_max() ? pg_end
: next
;
947 response
.entries
= std::move(entries
);
948 ceph::bufferlist out
;
949 encode(response
, out
);
950 logger().debug("{}: response.entries.size()=",
951 __func__
, response
.entries
.size());
952 return seastar::make_ready_future
<ceph::bufferlist
>(std::move(out
));
956 static PG::interruptible_future
<> do_pgls(
958 const std::string
& nspace
,
961 hobject_t lower_bound
;
962 auto bp
= osd_op
.indata
.cbegin();
964 lower_bound
.decode(bp
);
965 } catch (const buffer::error
&) {
966 throw std::invalid_argument
{"unable to decode PGLS handle"};
968 const auto pg_start
= pg
.get_pgid().pgid
.get_hobj_start();
970 pg
.get_pgid().pgid
.get_hobj_end(pg
.get_pool().info
.get_pg_num());
971 return do_pgls_common(pg_start
,
976 osd_op
.op
.pgls
.count
,
977 nullptr /* no filter */)
978 .then_interruptible([&osd_op
](bufferlist bl
) {
979 osd_op
.outdata
= std::move(bl
);
980 return seastar::now();
984 static PG::interruptible_future
<> do_pgls_filtered(
986 const std::string
& nspace
,
989 std::string cname
, mname
, type
;
990 auto bp
= osd_op
.indata
.cbegin();
992 ceph::decode(cname
, bp
);
993 ceph::decode(mname
, bp
);
994 ceph::decode(type
, bp
);
995 } catch (const buffer::error
&) {
996 throw crimson::osd::invalid_argument
{};
999 auto filter
= get_pgls_filter(type
, bp
);
1001 hobject_t lower_bound
;
1003 lower_bound
.decode(bp
);
1004 } catch (const buffer::error
&) {
1005 throw std::invalid_argument("unable to decode PGLS_FILTER description");
1008 logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
1009 __func__
, cname
, mname
, type
, lower_bound
,
1010 static_cast<const void*>(filter
.get()));
1011 return seastar::do_with(std::move(filter
),
1012 [&, lower_bound
=std::move(lower_bound
)](auto&& filter
) {
1013 const auto pg_start
= pg
.get_pgid().pgid
.get_hobj_start();
1014 const auto pg_end
= pg
.get_pgid().pgid
.get_hobj_end(pg
.get_pool().info
.get_pg_num());
1015 return do_pgls_common(pg_start
,
1020 osd_op
.op
.pgls
.count
,
1022 .then_interruptible([&osd_op
](bufferlist bl
) {
1023 osd_op
.outdata
= std::move(bl
);
1024 return seastar::now();
1029 PgOpsExecuter::interruptible_future
<>
1030 PgOpsExecuter::execute_op(OSDOp
& osd_op
)
1032 logger().warn("handling op {}", ceph_osd_op_name(osd_op
.op
.op
));
1033 switch (const ceph_osd_op
& op
= osd_op
.op
; op
.op
) {
1034 case CEPH_OSD_OP_PGLS
:
1035 return do_pgls(pg
, nspace
, osd_op
);
1036 case CEPH_OSD_OP_PGLS_FILTER
:
1037 return do_pgls_filtered(pg
, nspace
, osd_op
);
1038 case CEPH_OSD_OP_PGNLS
:
1039 return do_pgnls(pg
, nspace
, osd_op
);
1040 case CEPH_OSD_OP_PGNLS_FILTER
:
1041 return do_pgnls_filtered(pg
, nspace
, osd_op
);
1043 logger().warn("unknown op {}", ceph_osd_op_name(op
.op
));
1044 throw std::runtime_error(
1045 fmt::format("op '{}' not supported", ceph_osd_op_name(op
.op
)));
1049 } // namespace crimson::osd