1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "ops_executer.h"
6 #include <boost/range/adaptor/filtered.hpp>
7 #include <boost/range/adaptor/map.hpp>
8 #include <boost/range/adaptor/transformed.hpp>
9 #include <boost/range/algorithm_ext/push_back.hpp>
10 #include <boost/range/algorithm/max_element.hpp>
11 #include <boost/range/numeric.hpp>
13 #include <fmt/format.h>
14 #include <fmt/ostream.h>
16 #include <seastar/core/thread.hh>
18 #include "crimson/osd/exceptions.h"
19 #include "crimson/osd/watch.h"
20 #include "osd/ClassHandler.h"
23 seastar::logger
& logger() {
24 return crimson::get_logger(ceph_subsys_osd
);
28 namespace crimson::osd
{
// Handle CEPH_OSD_OP_CALL: decode the "<class>.<method>(indata)" triple from
// the op's input buffer, resolve the method via ClassHandler, execute it in a
// seastar thread (its ->exec may block), then verify the method honoured its
// declared RD/WR flags and marshal its output back into osd_op.
// NOTE(review): this chunk appears to have lost lines in extraction (braces
// and some conditions are missing); tokens below are kept verbatim.
30 OpsExecuter::call_errorator::future
<> OpsExecuter::do_op_call(OSDOp
& osd_op
)
// Decode class name, method name and the method's input payload from the
// op's indata; a decode failure is a client error (EINVAL).
32 std::string cname
, mname
;
33 ceph::bufferlist indata
;
35 auto bp
= std::begin(osd_op
.indata
);
36 bp
.copy(osd_op
.op
.cls
.class_len
, cname
);
37 bp
.copy(osd_op
.op
.cls
.method_len
, mname
);
38 bp
.copy(osd_op
.op
.cls
.indata_len
, indata
);
39 } catch (buffer::error
&) {
40 logger().warn("call unable to decode class + method + indata");
41 return crimson::ct_error::invarg::make();
44 // NOTE: opening a class can actually result in dlopen(), and thus
45 // blocking the entire reactor. Thankfully to ClassHandler's cache
46 // this is supposed to be extremely infrequent.
47 ClassHandler::ClassData
* cls
;
48 int r
= ClassHandler::get_instance().open_class(cname
, &cls
);
// open_class failure mapping: unknown class -> EOPNOTSUPP, EPERM is
// propagated, anything else -> EIO.
50 logger().warn("class {} open got {}", cname
, cpp_strerror(r
));
52 return crimson::ct_error::operation_not_supported::make();
53 } else if (r
== -EPERM
) {
54 // propagate permission errors
55 return crimson::ct_error::permission_denied::make();
57 return crimson::ct_error::input_output_error::make();
// Look the method up inside the loaded class; a missing method is
// EOPNOTSUPP, matching classic ceph-osd behaviour.
60 ClassHandler::ClassMethod
* method
= cls
->get_method(mname
);
62 logger().warn("call method {}.{} does not exist", cname
, mname
);
63 return crimson::ct_error::operation_not_supported::make();
// A read-only method on a non-existent object is ENOENT; a WR method is
// allowed to proceed (it may create the object).
66 const auto flags
= method
->get_flags();
67 if (!obc
->obs
.exists
&& (flags
& CLS_METHOD_WR
) == 0) {
68 return crimson::ct_error::enoent::make();
72 if (flags
& CLS_METHOD_WR
) {
73 ctx
->user_modify
= true;
77 logger().debug("calling method {}.{}", cname
, mname
);
// Run the class method on a seastar thread so its (potentially blocking)
// synchronous ->exec doesn't stall the reactor; `this` doubles as the
// cls_method_context_t handle passed back into cls_cxx_* calls.
78 return seastar::async(
79 [this, method
, indata
=std::move(indata
)]() mutable {
80 ceph::bufferlist outdata
;
81 auto cls_context
= reinterpret_cast<cls_method_context_t
>(this);
82 const auto ret
= method
->exec(cls_context
, indata
, outdata
);
83 return std::make_pair(ret
, std::move(outdata
));
// Continuation: compare the read/write counters snapshotted before the
// call to detect a method that performed I/O without the matching
// CLS_METHOD_RD/WR flag, then copy the method's output into osd_op.
86 [prev_rd
= num_read
, prev_wr
= num_write
, this, &osd_op
, flags
]
87 (auto outcome
) -> call_errorator::future
<> {
88 auto& [ret
, outdata
] = outcome
;
90 if (num_read
> prev_rd
&& !(flags
& CLS_METHOD_RD
)) {
91 logger().error("method tried to read object but is not marked RD");
92 return crimson::ct_error::input_output_error::make();
94 if (num_write
> prev_wr
&& !(flags
& CLS_METHOD_WR
)) {
95 logger().error("method tried to update object but is not marked WR");
96 return crimson::ct_error::input_output_error::make();
99 // for write calls we never return data except errors. For details refer
100 // to cls/cls_hello.cc.
101 if (ret
< 0 || (flags
& CLS_METHOD_WR
) == 0) {
102 logger().debug("method called response length={}", outdata
.length());
103 osd_op
.op
.extent
.length
= outdata
.length();
104 osd_op
.outdata
.claim_append(outdata
);
// A negative class-method return is surfaced as a stateful_ec so the
// errno (and any outdata already attached) reaches the client.
107 return crimson::stateful_ec
{ std::error_code(-ret
, std::generic_category()) };
109 return seastar::now();
// Build a watch_info_t from the WATCH op and its originating message:
// cookie and timeout come from the op (falling back to the configured
// osd_client_watch_timeout when the client passed 0), the address is the
// peer address of the message's connection.
// NOTE(review): lines are missing from this chunk (the aggregate return
// around lines 121-125 is truncated); tokens kept verbatim.
114 static watch_info_t
create_watch_info(const OSDOp
& osd_op
,
117 using crimson::common::local_conf
;
118 const uint32_t timeout
=
119 osd_op
.op
.watch
.timeout
== 0 ? local_conf()->osd_client_watch_timeout
120 : osd_op
.op
.watch
.timeout
;
122 osd_op
.op
.watch
.cookie
,
124 msg
.get_connection()->get_peer_addr()
// CEPH_OSD_WATCH_OP_WATCH: record the watch both in the object's persisted
// object_info (applied immediately to ObjectState) and — as a deferred
// op-effect — in the in-memory ObjectContext watcher table, where the actual
// Watch instance is created and connected.
// NOTE(review): braces/else lines were lost in extraction; tokens verbatim.
128 OpsExecuter::watch_errorator::future
<> OpsExecuter::do_op_watch_subop_watch(
131 ceph::os::Transaction
& txn
)
// Context captured for the deferred effect: the (cookie, entity) key, the
// client connection, and the watch_info derived from the op/message.
133 struct connect_ctx_t
{
134 ObjectContext::watch_key_t key
;
135 crimson::net::ConnectionRef conn
;
138 connect_ctx_t(const OSDOp
& osd_op
, const MOSDOp
& msg
)
139 : key(osd_op
.op
.watch
.cookie
, msg
.get_reqid().name
),
140 conn(msg
.get_connection()),
141 info(create_watch_info(osd_op
, msg
)) {
// First lambda runs now against ObjectState (oi.watchers); second runs as
// an op effect against the ObjectContext — presumably at commit time
// (with_effect_on_obc semantics not visible here; TODO confirm).
144 return with_effect_on_obc(connect_ctx_t
{ osd_op
, get_message() },
146 const auto& entity
= ctx
.key
.second
;
147 auto [it
, emplaced
] =
148 os
.oi
.watchers
.try_emplace(ctx
.key
, std::move(ctx
.info
));
150 logger().info("registered new watch {} by {}", it
->second
, entity
);
153 logger().info("found existing watch {} by {}", it
->second
, entity
);
155 return seastar::now();
// Op effect: materialize the Watch object (if not already present) and
// connect it to the client's connection with pinging enabled.
157 [] (auto&& ctx
, ObjectContextRef obc
) {
158 auto [it
, emplaced
] = obc
->watchers
.try_emplace(ctx
.key
, nullptr);
160 const auto& [cookie
, entity
] = ctx
.key
;
161 it
->second
= crimson::osd::Watch::create(obc
, ctx
.info
, entity
);
162 logger().info("op_effect: added new watcher: {}", ctx
.key
);
164 logger().info("op_effect: found existing watcher: {}", ctx
.key
);
166 return it
->second
->connect(std::move(ctx
.conn
), true /* will_ping */);
// CEPH_OSD_WATCH_OP_RECONNECT: only valid if the (cookie, entity) pair is
// already known in the object's watcher table — otherwise ENOTCONN; a known
// watch is simply re-registered via the WATCH subop path.
// NOTE(review): the osd_op/os parameter lines of the signature were lost in
// extraction; tokens kept verbatim.
170 OpsExecuter::watch_errorator::future
<> OpsExecuter::do_op_watch_subop_reconnect(
173 ceph::os::Transaction
& txn
)
175 const entity_name_t
& entity
= get_message().get_reqid().name
;
176 const auto& cookie
= osd_op
.op
.watch
.cookie
;
// Reconnecting a watch the OSD has no record of is a client error.
177 if (!os
.oi
.watchers
.count(std::make_pair(cookie
, entity
))) {
178 return crimson::ct_error::not_connected::make();
180 logger().info("found existing watch by {}", entity
);
181 return do_op_watch_subop_watch(osd_op
, os
, txn
);
// CEPH_OSD_WATCH_OP_UNWATCH: drop the watch from the object's persisted
// object_info immediately, and — as a deferred op-effect — remove the live
// Watch instance from the ObjectContext (without sending a disconnect
// notification: send_disconnect defaults to false).
// NOTE(review): lines lost in extraction; tokens kept verbatim.
185 OpsExecuter::watch_errorator::future
<> OpsExecuter::do_op_watch_subop_unwatch(
188 ceph::os::Transaction
& txn
)
190 logger().info("{}", __func__
);
192 struct disconnect_ctx_t
{
193 ObjectContext::watch_key_t key
;
194 bool send_disconnect
{ false };
196 disconnect_ctx_t(const OSDOp
& osd_op
, const MOSDOp
& msg
)
197 : key(osd_op
.op
.watch
.cookie
, msg
.get_reqid().name
) {
// Immediate part: extract() the entry from oi.watchers (node-handle based
// erase); absence is only logged, not an error.
200 return with_effect_on_obc(disconnect_ctx_t
{ osd_op
, get_message() },
202 const auto& entity
= ctx
.key
.second
;
203 if (auto nh
= os
.oi
.watchers
.extract(ctx
.key
); !nh
.empty()) {
204 logger().info("removed watch {} by {}", nh
.mapped(), entity
);
207 logger().info("can't remove: no watch by {}", entity
);
209 return seastar::now();
// Op effect: pull the Watch out of the obc table and asynchronously tear
// it down; do_with keeps the extracted shared_ptr alive for remove().
211 [] (auto&& ctx
, ObjectContextRef obc
) {
212 if (auto nh
= obc
->watchers
.extract(ctx
.key
); !nh
.empty()) {
213 return seastar::do_with(std::move(nh
.mapped()),
214 [ctx
](auto&& watcher
) {
215 logger().info("op_effect: disconnect watcher {}", ctx
.key
);
216 return watcher
->remove(ctx
.send_disconnect
);
219 logger().info("op_effect: disconnect failed to find watcher {}", ctx
.key
);
220 return seastar::now();
// CEPH_OSD_WATCH_OP_PING: refresh the liveness timestamp of an existing,
// connected watch. Unknown (cookie, entity) -> ENOTCONN; known in oi but not
// connected in the obc table -> ETIMEDOUT.
// NOTE(review): lines lost in extraction; tokens kept verbatim.
225 OpsExecuter::watch_errorator::future
<> OpsExecuter::do_op_watch_subop_ping(
228 ceph::os::Transaction
& txn
)
230 const entity_name_t
& entity
= get_message().get_reqid().name
;
231 const auto& cookie
= osd_op
.op
.watch
.cookie
;
232 const auto key
= std::make_pair(cookie
, entity
);
234 // Note: WATCH with PING doesn't cause may_write() to return true,
235 // so if there is nothing else in the transaction, this is going
236 // to run do_osd_op_effects, but not write out a log entry */
237 if (!os
.oi
.watchers
.count(key
)) {
238 return crimson::ct_error::not_connected::make();
// The live Watch must also exist and be connected for the ping to count.
240 auto it
= obc
->watchers
.find(key
);
241 if (it
== std::end(obc
->watchers
) || !it
->second
->is_connected()) {
242 return crimson::ct_error::timed_out::make();
244 logger().info("found existing watch by {}", entity
);
245 it
->second
->got_ping(ceph_clock_now());
246 return seastar::now();
// CEPH_OSD_OP_WATCH entry point: dispatch on the watch subop. Legacy watch
// and unknown subops are rejected with EINVAL; a missing object (checked in
// a line dropped by extraction) returns ENOENT.
249 OpsExecuter::watch_errorator::future
<> OpsExecuter::do_op_watch(
252 ceph::os::Transaction
& txn
)
254 logger().debug("{}", __func__
);
256 return crimson::ct_error::enoent::make();
258 switch (osd_op
.op
.watch
.op
) {
259 case CEPH_OSD_WATCH_OP_WATCH
:
260 return do_op_watch_subop_watch(osd_op
, os
, txn
);
261 case CEPH_OSD_WATCH_OP_RECONNECT
:
262 return do_op_watch_subop_reconnect(osd_op
, os
, txn
);
263 case CEPH_OSD_WATCH_OP_PING
:
264 return do_op_watch_subop_ping(osd_op
, os
, txn
);
265 case CEPH_OSD_WATCH_OP_UNWATCH
:
266 return do_op_watch_subop_unwatch(osd_op
, os
, txn
);
267 case CEPH_OSD_WATCH_OP_LEGACY_WATCH
:
268 logger().warn("ignoring CEPH_OSD_WATCH_OP_LEGACY_WATCH");
269 return crimson::ct_error::invarg::make();
271 logger().warn("unrecognized WATCH subop: {}", osd_op
.op
.watch
.op
);
272 return crimson::ct_error::invarg::make();
275 static uint64_t get_next_notify_id(epoch_t e
)
278 static std::uint64_t next_notify_id
= 0;
279 return (((uint64_t)e
) << 32) | ((uint64_t)(next_notify_id
++));
// CEPH_OSD_OP_NOTIFY: decode the notify parameters (timeout, payload) from
// the op, assign a unique notify_id (echoed back to the client through
// osd_op.outdata), then — as a deferred op-effect — fan the notification out
// to all alive watchers of the object.
// NOTE(review): lines lost in extraction (braces, ninfo member decls);
// tokens kept verbatim.
282 OpsExecuter::watch_errorator::future
<> OpsExecuter::do_op_notify(
284 const ObjectState
& os
)
286 logger().debug("{}, msg epoch: {}", __func__
, get_message().get_map_epoch());
289 return crimson::ct_error::enoent::make();
// Context for the deferred effect: the client connection/gid, the message
// epoch and the notify_info being built up.
291 struct notify_ctx_t
{
292 crimson::net::ConnectionRef conn
;
294 const uint64_t client_gid
;
297 notify_ctx_t(const MOSDOp
& msg
)
298 : conn(msg
.get_connection()),
299 client_gid(msg
.get_reqid().name
.num()),
300 epoch(msg
.get_map_epoch()) {
303 return with_effect_on_obc(notify_ctx_t
{ get_message() },
// Immediate part: decode (ver, timeout, payload); on decode failure the
// timeout is zeroed, which then falls back to the configured default.
306 auto bp
= osd_op
.indata
.cbegin();
307 uint32_t ver
; // obsolete
308 ceph::decode(ver
, bp
);
309 ceph::decode(ctx
.ninfo
.timeout
, bp
);
310 ceph::decode(ctx
.ninfo
.bl
, bp
);
311 } catch (const buffer::error
&) {
312 ctx
.ninfo
.timeout
= 0;
314 if (!ctx
.ninfo
.timeout
) {
315 using crimson::common::local_conf
;
316 ctx
.ninfo
.timeout
= local_conf()->osd_default_notify_timeout
;
318 ctx
.ninfo
.notify_id
= get_next_notify_id(ctx
.epoch
);
319 ctx
.ninfo
.cookie
= osd_op
.op
.notify
.cookie
;
320 // return our unique notify id to the client
321 ceph::encode(ctx
.ninfo
.notify_id
, osd_op
.outdata
);
322 return seastar::now();
// Op effect: propagate a Notify object to every watcher that is still
// alive (lazy boost range over the obc watcher table).
324 [] (auto&& ctx
, ObjectContextRef obc
) {
325 auto alive_watchers
= obc
->watchers
| boost::adaptors::map_values
326 | boost::adaptors::filtered(
328 // FIXME: filter as for the `is_ping` in `Watch::start_notify`
329 return w
->is_alive();
331 return crimson::osd::Notify::create_n_propagate(
332 std::begin(alive_watchers
),
333 std::end(alive_watchers
),
337 obc
->obs
.oi
.user_version
);
// CEPH_OSD_OP_NOTIFY_ACK: decode (notify_id, watch_cookie, reply payload)
// and, as a deferred op-effect, deliver the ack to the matching watch —
// matching on both the requesting entity and the cookie.
// NOTE(review): lines lost in extraction; tokens kept verbatim.
341 OpsExecuter::watch_errorator::future
<> OpsExecuter::do_op_notify_ack(
343 const ObjectState
& os
)
345 logger().debug("{}", __func__
);
347 struct notifyack_ctx_t
{
348 const entity_name_t entity
;
349 uint64_t watch_cookie
;
351 ceph::bufferlist reply_bl
;
353 notifyack_ctx_t(const MOSDOp
& msg
) : entity(msg
.get_reqid().name
) {
356 return with_effect_on_obc(notifyack_ctx_t
{ get_message() },
// Immediate part: decode the ack fields; reply_bl is decoded behind a
// truncated condition (likely an "is more data present" check — the
// guarding line was dropped by extraction).
357 [&] (auto& ctx
) -> watch_errorator::future
<> {
359 auto bp
= osd_op
.indata
.cbegin();
360 ceph::decode(ctx
.notify_id
, bp
);
361 ceph::decode(ctx
.watch_cookie
, bp
);
363 ceph::decode(ctx
.reply_bl
, bp
);
365 } catch (const buffer::error
&) {
366 // here we behave differently than ceph-osd. For historical reasons,
367 // it falls back to using `osd_op.op.watch.cookie` as `ctx.notify_id`.
368 // crimson just returns EINVAL if the data cannot be decoded.
369 return crimson::ct_error::invarg::make();
371 return watch_errorator::now();
// Op effect: walk every watcher on the object and ack the one whose
// (entity, cookie) matches; mismatches are skipped with a debug log.
373 [] (auto&& ctx
, ObjectContextRef obc
) {
374 logger().info("notify_ack watch_cookie={}, notify_id={}",
375 ctx
.watch_cookie
, ctx
.notify_id
);
376 return seastar::do_for_each(obc
->watchers
,
377 [ctx
=std::move(ctx
)] (auto& kv
) {
378 const auto& [key
, watchp
] = kv
;
// Compile-time check (static_assert head dropped by extraction) that the
// mapped type really is a shared_ptr<Watch>.
380 std::is_same_v
<std::decay_t
<decltype(watchp
)>,
381 seastar::shared_ptr
<crimson::osd::Watch
>>);
382 auto& [cookie
, entity
] = key
;
383 if (ctx
.entity
!= entity
) {
384 logger().debug("skipping watch {}; entity name {} != {}",
385 key
, entity
, ctx
.entity
);
386 return seastar::now();
388 if (ctx
.watch_cookie
!= cookie
) {
389 logger().debug("skipping watch {}; cookie {} != {}",
390 key
, ctx
.watch_cookie
, cookie
);
391 return seastar::now();
393 logger().info("acking notify on watch {}", key
);
394 return watchp
->notify_ack(ctx
.notify_id
, ctx
.reply_bl
);
// Construct a PGLSFilter from its textual type: "plain" maps to the built-in
// PGLSPlainFilter; "<class>.<filter>" loads the class via ClassHandler and
// asks it to build the named filter. The filter is then initialized from the
// client-supplied iterator. All failures throw crimson::osd exceptions
// (invalid_argument / permission_denied).
// NOTE(review): several condition/brace lines were lost in extraction;
// tokens kept verbatim.
399 static inline std::unique_ptr
<const PGLSFilter
> get_pgls_filter(
400 const std::string
& type
,
401 bufferlist::const_iterator
& iter
)
403 // storing non-const PGLSFilter for the sake of ::init()
404 std::unique_ptr
<PGLSFilter
> filter
;
405 if (type
.compare("plain") == 0) {
406 filter
= std::make_unique
<PGLSPlainFilter
>();
// Non-plain types must look like "<class>.<filter>" with both halves
// non-empty; anything else is a client error.
408 std::size_t dot
= type
.find(".");
409 if (dot
== type
.npos
|| dot
== 0 || dot
== type
.size() - 1) {
410 throw crimson::osd::invalid_argument
{};
413 const std::string class_name
= type
.substr(0, dot
);
414 const std::string filter_name
= type
.substr(dot
+ 1);
415 ClassHandler::ClassData
*cls
= nullptr;
416 int r
= ClassHandler::get_instance().open_class(class_name
, &cls
);
418 logger().warn("can't open class {}: {}", class_name
, cpp_strerror(r
));
420 // propagate permission error
421 throw crimson::osd::permission_denied
{};
423 throw crimson::osd::invalid_argument
{};
429 ClassHandler::ClassFilter
* const class_filter
= cls
->get_filter(filter_name
);
430 if (class_filter
== nullptr) {
431 logger().warn("can't find filter {} in class {}", filter_name
, class_name
);
432 throw crimson::osd::invalid_argument
{};
435 filter
.reset(class_filter
->fn());
437 // Object classes are obliged to return us something, but let's
438 // give an error rather than asserting out.
439 logger().warn("buggy class {} failed to construct filter {}",
440 class_name
, filter_name
);
441 throw crimson::osd::invalid_argument
{};
// Let the filter parse its own arguments from the client's input stream.
446 int r
= filter
->init(iter
);
448 logger().warn("error initializing filter {}: {}", type
, cpp_strerror(r
));
449 throw crimson::osd::invalid_argument
{};
452 // successfully constructed and initialized, return it.
// Apply a PGLSFilter to a single object. If the filter depends on an xattr,
// fetch it from the backend first; a filtered-out object is returned as an
// empty hobject_t sentinel (consumed later by the PGLS machinery), a kept
// object is returned unchanged.
// NOTE(review): lines lost in extraction (else-branch heads); tokens kept
// verbatim.
456 static seastar::future
<hobject_t
> pgls_filter(
457 const PGLSFilter
& filter
,
458 const PGBackend
& backend
,
459 const hobject_t
& sobj
)
461 if (const auto xattr
= filter
.get_xattr(); !xattr
.empty()) {
462 logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
// Fetch the xattr and evaluate the filter against its value.
464 return backend
.getxattr(sobj
, xattr
).safe_then(
465 [&filter
, sobj
] (ceph::bufferptr bp
) {
466 logger().debug("pgls_filter: got xvalue for obj={}", sobj
);
468 ceph::bufferlist val
;
469 val
.push_back(std::move(bp
));
470 const bool filtered
= filter
.filter(sobj
, val
);
471 return seastar::make_ready_future
<hobject_t
>(filtered
? sobj
: hobject_t
{});
// Error path (xattr missing): reject the object outright if the filter
// requires the xattr, otherwise evaluate it against an empty value.
472 }, PGBackend::get_attr_errorator::all_same_way([&filter
, sobj
] {
473 logger().debug("pgls_filter: got error for obj={}", sobj
);
475 if (filter
.reject_empty_xattr()) {
476 return seastar::make_ready_future
<hobject_t
>(hobject_t
{});
478 ceph::bufferlist val
;
479 const bool filtered
= filter
.filter(sobj
, val
);
480 return seastar::make_ready_future
<hobject_t
>(filtered
? sobj
: hobject_t
{});
// Filter doesn't use xattrs: evaluate with an empty lvalue bufferlist
// (filter() takes a non-const reference, hence the named variable).
483 ceph::bufferlist empty_lvalue_bl
;
484 const bool filtered
= filter
.filter(sobj
, empty_lvalue_bl
);
485 return seastar::make_ready_future
<hobject_t
>(filtered
? sobj
: hobject_t
{});
// Core of PGNLS / PGNLS_FILTER: list up to `limit` objects from
// `lower_bound`, restrict to the requested namespace, run the optional
// filter over each object concurrently, and encode the survivors into a
// pg_nls_response_t bufferlist (handle = next cursor, or pg_end at the end).
// NOTE(review): lines lost in extraction; tokens kept verbatim.
489 static seastar::future
<ceph::bufferlist
> do_pgnls_common(
490 const hobject_t
& pg_start
,
491 const hobject_t
& pg_end
,
492 const PGBackend
& backend
,
493 const hobject_t
& lower_bound
,
494 const std::string
& nspace
,
495 const uint64_t limit
,
496 const PGLSFilter
* const filter
)
// A cursor outside [pg_start, pg_end) that is neither min nor max can only
// come from a buggy client.
498 if (!(lower_bound
.is_min() ||
499 lower_bound
.is_max() ||
500 (lower_bound
>= pg_start
&& lower_bound
< pg_end
))) {
501 // this should only happen with a buggy client.
502 throw std::invalid_argument("outside of PG bounds");
505 return backend
.list_objects(lower_bound
, limit
).then(
506 [&backend
, filter
, nspace
](auto objects
, auto next
) {
// Namespace predicate: hit-set objects are always hidden; all_nspaces
// matches everything; otherwise exact namespace match.
507 auto in_my_namespace
= [&nspace
](const hobject_t
& obj
) {
508 using crimson::common::local_conf
;
509 if (obj
.get_namespace() == local_conf()->osd_hit_set_namespace
) {
511 } else if (nspace
== librados::all_nspaces
) {
514 return obj
.get_namespace() == nspace
;
// Per-object transform: run the filter (if any) asynchronously; objects
// filtered out come back as empty hobject_t sentinels.
517 auto to_pglsed
= [&backend
, filter
] (const hobject_t
& obj
) {
518 // this transformation looks costly. However, I don't have any
519 // reason to think PGLS* operations are critical for, let's say,
520 // general performance.
522 // from tchaikov: "another way is to use seastar::map_reduce(),
523 // to 1) save the effort to filter the already filtered objects
524 // 2) avoid the space to keep the tuple<bool, object> even if
525 // the object is filtered out".
527 return pgls_filter(*filter
, backend
, obj
);
529 return seastar::make_ready_future
<hobject_t
>(obj
);
533 auto range
= objects
| boost::adaptors::filtered(in_my_namespace
)
534 | boost::adaptors::transformed(to_pglsed
);
535 logger().debug("do_pgnls_common: finishing the 1st stage of pgls");
// Stage 1: await all the per-object filter futures.
536 return seastar::when_all_succeed(std::begin(range
),
537 std::end(range
)).then(
538 [next
=std::move(next
)] (auto items
) mutable {
539 // the sole purpose of this chaining is to pass `next` to 2nd
540 // stage altogether with items
541 logger().debug("do_pgnls_common: 1st done");
542 return seastar::make_ready_future
<decltype(items
), decltype(next
)>(
543 std::move(items
), std::move(next
));
// Stage 2: drop the is_min() sentinels, convert survivors to
// ListObjectImpl entries and encode the response.
546 [pg_end
, filter
] (const std::vector
<hobject_t
>& items
, auto next
) {
547 auto is_matched
= [] (const auto& obj
) {
548 return !obj
.is_min();
550 auto to_entry
= [] (const auto& obj
) {
551 return librados::ListObjectImpl
{
552 obj
.get_namespace(), obj
.oid
.name
, obj
.get_key()
556 pg_nls_response_t response
;
557 boost::push_back(response
.entries
, items
| boost::adaptors::filtered(is_matched
)
558 | boost::adaptors::transformed(to_entry
));
559 response
.handle
= next
.is_max() ? pg_end
: next
;
560 ceph::bufferlist out
;
561 encode(response
, out
);
// NOTE(review): format string lacks a "{}" for entries.size(), so the
// size argument is never printed — candidate fix: "size()={}".
562 logger().debug("{}: response.entries.size()=",
563 __func__
, response
.entries
.size());
564 return seastar::make_ready_future
<ceph::bufferlist
>(std::move(out
));
// CEPH_OSD_OP_PGNLS: decode the listing cursor from the op's indata (EINVAL
// semantics via std::invalid_argument on decode error), compute the PG's
// hobject range, and delegate to do_pgnls_common without a filter; the
// encoded response becomes osd_op.outdata.
// NOTE(review): lines lost in extraction; tokens kept verbatim.
568 static seastar::future
<> do_pgnls(
570 const std::string
& nspace
,
573 hobject_t lower_bound
;
575 ceph::decode(lower_bound
, osd_op
.indata
);
576 } catch (const buffer::error
&) {
577 throw std::invalid_argument("unable to decode PGNLS handle");
579 const auto pg_start
= pg
.get_pgid().pgid
.get_hobj_start();
580 const auto pg_end
= \
581 pg
.get_pgid().pgid
.get_hobj_end(pg
.get_pool().info
.get_pg_num());
582 return do_pgnls_common(pg_start
,
587 osd_op
.op
.pgls
.count
,
588 nullptr /* no filter */)
589 .then([&osd_op
](bufferlist bl
) {
590 osd_op
.outdata
= std::move(bl
);
591 return seastar::now();
// CEPH_OSD_OP_PGNLS_FILTER: decode (cname, mname, type) and the cursor from
// the op, build the requested PGLSFilter, then run the common PGNLS path
// with the filter kept alive by seastar::do_with for the operation's
// duration.
// NOTE(review): lines lost in extraction; tokens kept verbatim.
595 static seastar::future
<> do_pgnls_filtered(
597 const std::string
& nspace
,
600 std::string cname
, mname
, type
;
601 auto bp
= osd_op
.indata
.cbegin();
603 ceph::decode(cname
, bp
);
604 ceph::decode(mname
, bp
);
605 ceph::decode(type
, bp
);
606 } catch (const buffer::error
&) {
607 throw crimson::osd::invalid_argument
{};
// get_pgls_filter throws on unknown/broken filter specs.
610 auto filter
= get_pgls_filter(type
, bp
);
612 hobject_t lower_bound
;
614 lower_bound
.decode(bp
);
615 } catch (const buffer::error
&) {
616 throw std::invalid_argument("unable to decode PGNLS_FILTER description");
619 logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
620 __func__
, cname
, mname
, type
, lower_bound
,
621 static_cast<const void*>(filter
.get()));
622 return seastar::do_with(std::move(filter
),
623 [&, lower_bound
=std::move(lower_bound
)](auto&& filter
) {
624 const auto pg_start
= pg
.get_pgid().pgid
.get_hobj_start();
625 const auto pg_end
= pg
.get_pgid().pgid
.get_hobj_end(pg
.get_pool().info
.get_pg_num());
626 return do_pgnls_common(pg_start
,
631 osd_op
.op
.pgls
.count
,
633 .then([&osd_op
](bufferlist bl
) {
634 osd_op
.outdata
= std::move(bl
);
635 return seastar::now();
// Dispatch a single object-scoped OSDOp to its handler. Read-like ops go
// through do_read_op, mutations through do_write_op (which supplies the
// transaction), stat through do_const_op; unknown opcodes throw.
// NOTE(review): lines lost in extraction (logger call head, closing
// braces); tokens kept verbatim.
640 OpsExecuter::osd_op_errorator::future
<>
641 OpsExecuter::execute_osd_op(OSDOp
& osd_op
)
643 // TODO: dispatch via call table?
644 // TODO: we might want to find a way to unify both input and output
647 "handling op {} on object {}",
648 ceph_osd_op_name(osd_op
.op
.op
),
650 switch (const ceph_osd_op
& op
= osd_op
.op
; op
.op
) {
// SYNC_READ falls through to READ (the fallthrough lines were dropped by
// extraction).
651 case CEPH_OSD_OP_SYNC_READ
:
653 case CEPH_OSD_OP_READ
:
654 return do_read_op([&osd_op
] (auto& backend
, const auto& os
) {
655 return backend
.read(os
.oi
,
656 osd_op
.op
.extent
.offset
,
657 osd_op
.op
.extent
.length
,
658 osd_op
.op
.extent
.truncate_size
,
659 osd_op
.op
.extent
.truncate_seq
,
660 osd_op
.op
.flags
).safe_then(
661 [&osd_op
](ceph::bufferlist
&& bl
) {
662 osd_op
.rval
= bl
.length();
663 osd_op
.outdata
= std::move(bl
);
664 return osd_op_errorator::now();
667 case CEPH_OSD_OP_GETXATTR
:
668 return do_read_op([&osd_op
] (auto& backend
, const auto& os
) {
669 return backend
.getxattr(os
, osd_op
);
671 case CEPH_OSD_OP_CREATE
:
672 return do_write_op([&osd_op
] (auto& backend
, auto& os
, auto& txn
) {
673 return backend
.create(os
, osd_op
, txn
);
675 case CEPH_OSD_OP_WRITE
:
676 return do_write_op([&osd_op
] (auto& backend
, auto& os
, auto& txn
) {
677 return backend
.write(os
, osd_op
, txn
);
679 case CEPH_OSD_OP_WRITEFULL
:
680 return do_write_op([&osd_op
] (auto& backend
, auto& os
, auto& txn
) {
681 return backend
.writefull(os
, osd_op
, txn
);
// Allocation hints are accepted but ignored by this backend.
683 case CEPH_OSD_OP_SETALLOCHINT
:
684 return osd_op_errorator::now();
685 case CEPH_OSD_OP_SETXATTR
:
686 return do_write_op([&osd_op
] (auto& backend
, auto& os
, auto& txn
) {
687 return backend
.setxattr(os
, osd_op
, txn
);
689 case CEPH_OSD_OP_DELETE
:
690 return do_write_op([&osd_op
] (auto& backend
, auto& os
, auto& txn
) {
691 return backend
.remove(os
, txn
);
693 case CEPH_OSD_OP_CALL
:
694 return this->do_op_call(osd_op
);
695 case CEPH_OSD_OP_STAT
:
696 // note: stat does not require RD
697 return do_const_op([&osd_op
] (/* const */auto& backend
, const auto& os
) {
698 return backend
.stat(os
, osd_op
);
700 case CEPH_OSD_OP_TMAPUP
:
701 // TODO: there was an effort to kill TMAP in ceph-osd. According to
702 // @dzafman this isn't possible yet. Maybe it could be accomplished
703 // before crimson's readiness and we'd luckily don't need to carry.
704 return dont_do_legacy_op();
707 case CEPH_OSD_OP_OMAPGETKEYS
:
708 return do_read_op([&osd_op
] (auto& backend
, const auto& os
) {
709 return backend
.omap_get_keys(os
, osd_op
);
711 case CEPH_OSD_OP_OMAPGETVALS
:
712 return do_read_op([&osd_op
] (auto& backend
, const auto& os
) {
713 return backend
.omap_get_vals(os
, osd_op
);
715 case CEPH_OSD_OP_OMAPGETVALSBYKEYS
:
716 return do_read_op([&osd_op
] (auto& backend
, const auto& os
) {
717 return backend
.omap_get_vals_by_keys(os
, osd_op
);
// OMAP writes require pool support; rejected with EOPNOTSUPP otherwise.
719 case CEPH_OSD_OP_OMAPSETVALS
:
721 if (!pg
.get_pool().info
.supports_omap()) {
722 return crimson::ct_error::operation_not_supported::make();
725 return do_write_op([&osd_op
] (auto& backend
, auto& os
, auto& txn
) {
726 return backend
.omap_set_vals(os
, osd_op
, txn
);
// Watch/notify: WATCH mutates object state (write path); NOTIFY and
// NOTIFY_ACK only read it.
730 case CEPH_OSD_OP_WATCH
:
731 return do_write_op([this, &osd_op
] (auto& backend
, auto& os
, auto& txn
) {
732 return do_op_watch(osd_op
, os
, txn
);
734 case CEPH_OSD_OP_NOTIFY
:
735 return do_read_op([this, &osd_op
] (auto&, const auto& os
) {
736 return do_op_notify(osd_op
, os
);
738 case CEPH_OSD_OP_NOTIFY_ACK
:
739 return do_read_op([this, &osd_op
] (auto&, const auto& os
) {
740 return do_op_notify_ack(osd_op
, os
);
744 logger().warn("unknown op {}", ceph_osd_op_name(op
.op
));
745 throw std::runtime_error(
746 fmt::format("op '{}' not supported", ceph_osd_op_name(op
.op
)));
// Core of legacy PGLS / PGLS_FILTER: like do_pgnls_common but uses
// seastar::map_reduce to filter/collect directly into pg_ls_response_t
// entries (oid + locator key only — no namespace in the reply format).
// NOTE(review): lines lost in extraction; tokens kept verbatim.
750 static seastar::future
<ceph::bufferlist
> do_pgls_common(
751 const hobject_t
& pg_start
,
752 const hobject_t
& pg_end
,
753 const PGBackend
& backend
,
754 const hobject_t
& lower_bound
,
755 const std::string
& nspace
,
756 const uint64_t limit
,
757 const PGLSFilter
* const filter
)
// Same cursor sanity check as the PGNLS variant.
759 if (!(lower_bound
.is_min() ||
760 lower_bound
.is_max() ||
761 (lower_bound
>= pg_start
&& lower_bound
< pg_end
))) {
762 // this should only happen with a buggy client.
763 throw std::invalid_argument("outside of PG bounds");
766 using entries_t
= decltype(pg_ls_response_t::entries
);
767 return backend
.list_objects(lower_bound
, limit
).then(
768 [&backend
, filter
, nspace
](auto objects
, auto next
) {
// Map: per-object, apply namespace check and optional filter; objects
// outside the namespace become empty-hobject sentinels.
769 return seastar::when_all_succeed(
770 seastar::map_reduce(std::move(objects
),
771 [&backend
, filter
, nspace
](const hobject_t
& obj
) {
772 if (obj
.get_namespace() == nspace
) {
774 return pgls_filter(*filter
, backend
, obj
);
776 return seastar::make_ready_future
<hobject_t
>(obj
);
779 return seastar::make_ready_future
<hobject_t
>(hobject_t
{});
// Reduce: append surviving objects to the entries vector (the sentinel
// skip condition was dropped by extraction).
783 [](entries_t
&& entries
, hobject_t obj
) {
785 entries
.emplace_back(obj
.oid
, obj
.get_key());
789 seastar::make_ready_future
<hobject_t
>(next
));
790 }).then([pg_end
](entries_t entries
, hobject_t next
) {
791 pg_ls_response_t response
;
792 response
.handle
= next
.is_max() ? pg_end
: next
;
793 response
.entries
= std::move(entries
);
794 ceph::bufferlist out
;
795 encode(response
, out
);
// NOTE(review): format string lacks a "{}" for entries.size() — the size
// argument is never printed; same defect as in do_pgnls_common.
796 logger().debug("{}: response.entries.size()=",
797 __func__
, response
.entries
.size());
798 return seastar::make_ready_future
<ceph::bufferlist
>(std::move(out
));
// CEPH_OSD_OP_PGLS: decode the cursor from the op, compute the PG's hobject
// range, and delegate to do_pgls_common without a filter; the encoded
// response becomes osd_op.outdata.
// NOTE(review): lines lost in extraction; tokens kept verbatim.
802 static seastar::future
<> do_pgls(
804 const std::string
& nspace
,
807 hobject_t lower_bound
;
808 auto bp
= osd_op
.indata
.cbegin();
810 lower_bound
.decode(bp
);
811 } catch (const buffer::error
&) {
812 throw std::invalid_argument
{"unable to decode PGLS handle"};
814 const auto pg_start
= pg
.get_pgid().pgid
.get_hobj_start();
816 pg
.get_pgid().pgid
.get_hobj_end(pg
.get_pool().info
.get_pg_num());
817 return do_pgls_common(pg_start
,
822 osd_op
.op
.pgls
.count
,
823 nullptr /* no filter */)
824 .then([&osd_op
](bufferlist bl
) {
825 osd_op
.outdata
= std::move(bl
);
826 return seastar::now();
// CEPH_OSD_OP_PGLS_FILTER: decode (cname, mname, type) plus the cursor,
// build the requested PGLSFilter, and run the common PGLS path with the
// filter kept alive via seastar::do_with. Mirrors do_pgnls_filtered for the
// legacy response format.
// NOTE(review): lines lost in extraction; tokens kept verbatim.
830 static seastar::future
<> do_pgls_filtered(
832 const std::string
& nspace
,
835 std::string cname
, mname
, type
;
836 auto bp
= osd_op
.indata
.cbegin();
838 ceph::decode(cname
, bp
);
839 ceph::decode(mname
, bp
);
840 ceph::decode(type
, bp
);
841 } catch (const buffer::error
&) {
842 throw crimson::osd::invalid_argument
{};
845 auto filter
= get_pgls_filter(type
, bp
);
847 hobject_t lower_bound
;
849 lower_bound
.decode(bp
);
850 } catch (const buffer::error
&) {
851 throw std::invalid_argument("unable to decode PGLS_FILTER description");
854 logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
855 __func__
, cname
, mname
, type
, lower_bound
,
856 static_cast<const void*>(filter
.get()));
857 return seastar::do_with(std::move(filter
),
858 [&, lower_bound
=std::move(lower_bound
)](auto&& filter
) {
859 const auto pg_start
= pg
.get_pgid().pgid
.get_hobj_start();
860 const auto pg_end
= pg
.get_pgid().pgid
.get_hobj_end(pg
.get_pool().info
.get_pg_num());
861 return do_pgls_common(pg_start
,
866 osd_op
.op
.pgls
.count
,
868 .then([&osd_op
](bufferlist bl
) {
869 osd_op
.outdata
= std::move(bl
);
870 return seastar::now();
// Dispatch a PG-scoped OSDOp (the PGLS/PGNLS family) via do_pg_op, which
// supplies the pg and namespace to each handler; unknown opcodes throw.
// NOTE(review): the return-type line of the signature was lost in
// extraction; tokens kept verbatim.
876 OpsExecuter::execute_pg_op(OSDOp
& osd_op
)
878 logger().warn("handling op {}", ceph_osd_op_name(osd_op
.op
.op
));
879 switch (const ceph_osd_op
& op
= osd_op
.op
; op
.op
) {
880 case CEPH_OSD_OP_PGLS
:
881 return do_pg_op([&osd_op
] (const auto& pg
, const auto& nspace
) {
882 return do_pgls(pg
, nspace
, osd_op
);
884 case CEPH_OSD_OP_PGLS_FILTER
:
885 return do_pg_op([&osd_op
] (const auto& pg
, const auto& nspace
) {
886 return do_pgls_filtered(pg
, nspace
, osd_op
);
888 case CEPH_OSD_OP_PGNLS
:
889 return do_pg_op([&osd_op
] (const auto& pg
, const auto& nspace
) {
890 return do_pgnls(pg
, nspace
, osd_op
);
892 case CEPH_OSD_OP_PGNLS_FILTER
:
893 return do_pg_op([&osd_op
] (const auto& pg
, const auto& nspace
) {
894 return do_pgnls_filtered(pg
, nspace
, osd_op
);
897 logger().warn("unknown op {}", ceph_osd_op_name(op
.op
));
898 throw std::runtime_error(
899 fmt::format("op '{}' not supported", ceph_osd_op_name(op
.op
)));