]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/ops_executer.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / crimson / osd / ops_executer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "ops_executer.h"
5
6 #include <boost/range/adaptor/filtered.hpp>
7 #include <boost/range/adaptor/map.hpp>
8 #include <boost/range/adaptor/transformed.hpp>
9 #include <boost/range/algorithm_ext/push_back.hpp>
10 #include <boost/range/algorithm/max_element.hpp>
11 #include <boost/range/numeric.hpp>
12
13 #include <fmt/format.h>
14 #include <fmt/ostream.h>
15
16 #include <seastar/core/thread.hh>
17
18 #include "crimson/osd/exceptions.h"
19 #include "crimson/osd/watch.h"
20 #include "osd/ClassHandler.h"
21
namespace {
  // Shortcut to the OSD-subsystem logger used throughout this file.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
27
28 namespace crimson::osd {
29
// Execute an object-class (cls) method. The op's indata encodes the class
// name, the method name and the method's own input payload; the method is
// run in a seastar::thread because cls code is synchronous and may block.
OpsExecuter::call_errorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
{
  std::string cname, mname;
  ceph::bufferlist indata;
  try {
    auto bp = std::begin(osd_op.indata);
    bp.copy(osd_op.op.cls.class_len, cname);
    bp.copy(osd_op.op.cls.method_len, mname);
    bp.copy(osd_op.op.cls.indata_len, indata);
  } catch (buffer::error&) {
    logger().warn("call unable to decode class + method + indata");
    return crimson::ct_error::invarg::make();
  }

  // NOTE: opening a class can actually result in dlopen(), and thus
  // blocking the entire reactor. Thankfully to ClassHandler's cache
  // this is supposed to be extremely infrequent.
  ClassHandler::ClassData* cls;
  int r = ClassHandler::get_instance().open_class(cname, &cls);
  if (r) {
    logger().warn("class {} open got {}", cname, cpp_strerror(r));
    if (r == -ENOENT) {
      return crimson::ct_error::operation_not_supported::make();
    } else if (r == -EPERM) {
      // propagate permission errors
      return crimson::ct_error::permission_denied::make();
    }
    return crimson::ct_error::input_output_error::make();
  }

  ClassHandler::ClassMethod* method = cls->get_method(mname);
  if (!method) {
    logger().warn("call method {}.{} does not exist", cname, mname);
    return crimson::ct_error::operation_not_supported::make();
  }

  const auto flags = method->get_flags();
  // a method without WR cannot create the object, so it must exist already
  if (!obc->obs.exists && (flags & CLS_METHOD_WR) == 0) {
    return crimson::ct_error::enoent::make();
  }

#if 0
  if (flags & CLS_METHOD_WR) {
    ctx->user_modify = true;
  }
#endif

  logger().debug("calling method {}.{}", cname, mname);
  return seastar::async(
    [this, method, indata=std::move(indata)]() mutable {
      ceph::bufferlist outdata;
      auto cls_context = reinterpret_cast<cls_method_context_t>(this);
      const auto ret = method->exec(cls_context, indata, outdata);
      return std::make_pair(ret, std::move(outdata));
    }
  ).then(
    // snapshot the read/write op counters so we can detect whether the
    // method performed accesses its declared flags don't allow
    [prev_rd = num_read, prev_wr = num_write, this, &osd_op, flags]
    (auto outcome) -> call_errorator::future<> {
      auto& [ret, outdata] = outcome;

      if (num_read > prev_rd && !(flags & CLS_METHOD_RD)) {
        logger().error("method tried to read object but is not marked RD");
        return crimson::ct_error::input_output_error::make();
      }
      if (num_write > prev_wr && !(flags & CLS_METHOD_WR)) {
        logger().error("method tried to update object but is not marked WR");
        return crimson::ct_error::input_output_error::make();
      }

      // for write calls we never return data except errors. For details refer
      // to cls/cls_hello.cc.
      if (ret < 0 || (flags & CLS_METHOD_WR) == 0) {
        logger().debug("method called response length={}", outdata.length());
        osd_op.op.extent.length = outdata.length();
        osd_op.outdata.claim_append(outdata);
      }
      if (ret < 0) {
        return crimson::stateful_ec{ std::error_code(-ret, std::generic_category()) };
      }
      return seastar::now();
    }
  );
}
113
114 static watch_info_t create_watch_info(const OSDOp& osd_op,
115 const MOSDOp& msg)
116 {
117 using crimson::common::local_conf;
118 const uint32_t timeout =
119 osd_op.op.watch.timeout == 0 ? local_conf()->osd_client_watch_timeout
120 : osd_op.op.watch.timeout;
121 return {
122 osd_op.op.watch.cookie,
123 timeout,
124 msg.get_connection()->get_peer_addr()
125 };
126 }
127
// Register (or refresh) a watch: the 1st lambda updates the persisted
// object_info within the transaction; the 2nd runs after commit and wires
// up the in-memory Watch object to the client's connection.
OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  // state carried from the in-txn stage to the post-commit effect stage
  struct connect_ctx_t {
    ObjectContext::watch_key_t key;   // (cookie, entity name) of the watch
    crimson::net::ConnectionRef conn; // connection to serve the watcher on
    watch_info_t info;                // persisted watch descriptor

    connect_ctx_t(const OSDOp& osd_op, const MOSDOp& msg)
      : key(osd_op.op.watch.cookie, msg.get_reqid().name),
        conn(msg.get_connection()),
        info(create_watch_info(osd_op, msg)) {
    }
  };
  return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() },
    [&] (auto& ctx) {
      const auto& entity = ctx.key.second;
      auto [it, emplaced] =
        os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
      if (emplaced) {
        logger().info("registered new watch {} by {}", it->second, entity);
        // make the transaction non-empty so the oi update gets written out
        txn.nop();
      } else {
        logger().info("found existing watch {} by {}", it->second, entity);
      }
      return seastar::now();
    },
    [] (auto&& ctx, ObjectContextRef obc) {
      // post-commit: create the in-memory Watch if needed, then (re)connect
      auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
      if (emplaced) {
        const auto& [cookie, entity] = ctx.key;
        it->second = crimson::osd::Watch::create(obc, ctx.info, entity);
        logger().info("op_effect: added new watcher: {}", ctx.key);
      } else {
        logger().info("op_effect: found existing watcher: {}", ctx.key);
      }
      return it->second->connect(std::move(ctx.conn), true /* will_ping */);
    });
}
169
170 OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
171 OSDOp& osd_op,
172 ObjectState& os,
173 ceph::os::Transaction& txn)
174 {
175 const entity_name_t& entity = get_message().get_reqid().name;
176 const auto& cookie = osd_op.op.watch.cookie;
177 if (!os.oi.watchers.count(std::make_pair(cookie, entity))) {
178 return crimson::ct_error::not_connected::make();
179 } else {
180 logger().info("found existing watch by {}", entity);
181 return do_op_watch_subop_watch(osd_op, os, txn);
182 }
183 }
184
// Remove a watch: the 1st lambda erases it from the persisted object_info
// within the transaction; the 2nd runs after commit and tears down the
// in-memory Watch object, if one exists.
OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_unwatch(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  logger().info("{}", __func__);

  // state carried from the in-txn stage to the post-commit effect stage
  struct disconnect_ctx_t {
    ObjectContext::watch_key_t key; // (cookie, entity name) of the watch
    bool send_disconnect{ false };  // whether removal notifies the client

    disconnect_ctx_t(const OSDOp& osd_op, const MOSDOp& msg)
      : key(osd_op.op.watch.cookie, msg.get_reqid().name) {
    }
  };
  return with_effect_on_obc(disconnect_ctx_t{ osd_op, get_message() },
    [&] (auto& ctx) {
      const auto& entity = ctx.key.second;
      if (auto nh = os.oi.watchers.extract(ctx.key); !nh.empty()) {
        logger().info("removed watch {} by {}", nh.mapped(), entity);
        // make the transaction non-empty so the oi update gets written out
        txn.nop();
      } else {
        logger().info("can't remove: no watch by {}", entity);
      }
      return seastar::now();
    },
    [] (auto&& ctx, ObjectContextRef obc) {
      // post-commit: detach and destroy the in-memory watcher, if any
      if (auto nh = obc->watchers.extract(ctx.key); !nh.empty()) {
        return seastar::do_with(std::move(nh.mapped()),
                                [ctx](auto&& watcher) {
          logger().info("op_effect: disconnect watcher {}", ctx.key);
          return watcher->remove(ctx.send_disconnect);
        });
      } else {
        logger().info("op_effect: disconnect failed to find watcher {}", ctx.key);
        return seastar::now();
      }
    });
}
224
// Refresh the liveness timestamp of an established watch. Fails with
// ENOTCONN if the watch was never registered, or ETIMEDOUT if it is
// registered but not currently connected.
OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_ping(
  OSDOp& osd_op,
  ObjectState& os,
  ceph::os::Transaction& txn)
{
  const entity_name_t& entity = get_message().get_reqid().name;
  const auto& cookie = osd_op.op.watch.cookie;
  const auto key = std::make_pair(cookie, entity);

  // Note: WATCH with PING doesn't cause may_write() to return true,
  // so if there is nothing else in the transaction, this is going
  // to run do_osd_op_effects, but not write out a log entry */
  if (!os.oi.watchers.count(key)) {
    // no such watch registered in the persisted object_info
    return crimson::ct_error::not_connected::make();
  }
  auto it = obc->watchers.find(key);
  if (it == std::end(obc->watchers) || !it->second->is_connected()) {
    // registered but currently disconnected -- the client must reconnect
    return crimson::ct_error::timed_out::make();
  }
  logger().info("found existing watch by {}", entity);
  it->second->got_ping(ceph_clock_now());
  return seastar::now();
}
248
249 OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch(
250 OSDOp& osd_op,
251 ObjectState& os,
252 ceph::os::Transaction& txn)
253 {
254 logger().debug("{}", __func__);
255 if (!os.exists) {
256 return crimson::ct_error::enoent::make();
257 }
258 switch (osd_op.op.watch.op) {
259 case CEPH_OSD_WATCH_OP_WATCH:
260 return do_op_watch_subop_watch(osd_op, os, txn);
261 case CEPH_OSD_WATCH_OP_RECONNECT:
262 return do_op_watch_subop_reconnect(osd_op, os, txn);
263 case CEPH_OSD_WATCH_OP_PING:
264 return do_op_watch_subop_ping(osd_op, os, txn);
265 case CEPH_OSD_WATCH_OP_UNWATCH:
266 return do_op_watch_subop_unwatch(osd_op, os, txn);
267 case CEPH_OSD_WATCH_OP_LEGACY_WATCH:
268 logger().warn("ignoring CEPH_OSD_WATCH_OP_LEGACY_WATCH");
269 return crimson::ct_error::invarg::make();
270 }
271 logger().warn("unrecognized WATCH subop: {}", osd_op.op.watch.op);
272 return crimson::ct_error::invarg::make();
273 }
274
275 static uint64_t get_next_notify_id(epoch_t e)
276 {
277 // FIXME
278 static std::uint64_t next_notify_id = 0;
279 return (((uint64_t)e) << 32) | ((uint64_t)(next_notify_id++));
280 }
281
// Handle CEPH_OSD_OP_NOTIFY: the 1st lambda decodes the notify parameters
// and returns the assigned notify id to the client; the 2nd runs after
// commit and propagates the notify to all watchers that are still alive.
OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify(
  OSDOp& osd_op,
  const ObjectState& os)
{
  logger().debug("{}, msg epoch: {}", __func__, get_message().get_map_epoch());

  if (!os.exists) {
    // notify requires an existing object
    return crimson::ct_error::enoent::make();
  }
  // state carried from the in-txn stage to the post-commit effect stage
  struct notify_ctx_t {
    crimson::net::ConnectionRef conn; // connection awaiting the completion
    notify_info_t ninfo;              // decoded notify parameters
    const uint64_t client_gid;        // id of the notifying client
    const epoch_t epoch;              // epoch used to build the notify id

    notify_ctx_t(const MOSDOp& msg)
      : conn(msg.get_connection()),
        client_gid(msg.get_reqid().name.num()),
        epoch(msg.get_map_epoch()) {
    }
  };
  return with_effect_on_obc(notify_ctx_t{ get_message() },
    [&] (auto& ctx) {
      try {
        auto bp = osd_op.indata.cbegin();
        uint32_t ver; // obsolete
        ceph::decode(ver, bp);
        ceph::decode(ctx.ninfo.timeout, bp);
        ceph::decode(ctx.ninfo.bl, bp);
      } catch (const buffer::error&) {
        // tolerate a short/garbled payload; the default timeout kicks in below
        ctx.ninfo.timeout = 0;
      }
      if (!ctx.ninfo.timeout) {
        using crimson::common::local_conf;
        ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
      }
      ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
      ctx.ninfo.cookie = osd_op.op.notify.cookie;
      // return our unique notify id to the client
      ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
      return seastar::now();
    },
    [] (auto&& ctx, ObjectContextRef obc) {
      // post-commit: fan the notify out to every still-alive watcher
      auto alive_watchers = obc->watchers | boost::adaptors::map_values
                                          | boost::adaptors::filtered(
        [] (const auto& w) {
          // FIXME: filter as for the `is_ping` in `Watch::start_notify`
          return w->is_alive();
        });
      return crimson::osd::Notify::create_n_propagate(
        std::begin(alive_watchers),
        std::end(alive_watchers),
        std::move(ctx.conn),
        ctx.ninfo,
        ctx.client_gid,
        obc->obs.oi.user_version);
    });
}
340
// Handle CEPH_OSD_OP_NOTIFY_ACK: the 1st lambda decodes the ack payload;
// the 2nd runs after commit and forwards the ack to the watch matching
// both the acking entity and the watch cookie.
OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify_ack(
  OSDOp& osd_op,
  const ObjectState& os)
{
  logger().debug("{}", __func__);

  // state carried from the in-txn stage to the post-commit effect stage
  struct notifyack_ctx_t {
    const entity_name_t entity; // acking client's entity name
    uint64_t watch_cookie;      // cookie of the watch being acked on
    uint64_t notify_id;         // id of the notify being acked
    ceph::bufferlist reply_bl;  // optional client-supplied reply payload

    notifyack_ctx_t(const MOSDOp& msg) : entity(msg.get_reqid().name) {
    }
  };
  return with_effect_on_obc(notifyack_ctx_t{ get_message() },
    [&] (auto& ctx) -> watch_errorator::future<> {
      try {
        auto bp = osd_op.indata.cbegin();
        ceph::decode(ctx.notify_id, bp);
        ceph::decode(ctx.watch_cookie, bp);
        if (!bp.end()) {
          ceph::decode(ctx.reply_bl, bp);
        }
      } catch (const buffer::error&) {
        // here we behave differently than ceph-osd. For historical reasons,
        // it falls back to using `osd_op.op.watch.cookie` as `ctx.notify_id`.
        // crimson just returns EINVAL if the data cannot be decoded.
        return crimson::ct_error::invarg::make();
      }
      return watch_errorator::now();
    },
    [] (auto&& ctx, ObjectContextRef obc) {
      // post-commit: scan the watchers and ack on the matching one
      logger().info("notify_ack watch_cookie={}, notify_id={}",
                    ctx.watch_cookie, ctx.notify_id);
      return seastar::do_for_each(obc->watchers,
        [ctx=std::move(ctx)] (auto& kv) {
          const auto& [key, watchp] = kv;
          static_assert(
            std::is_same_v<std::decay_t<decltype(watchp)>,
                           seastar::shared_ptr<crimson::osd::Watch>>);
          auto& [cookie, entity] = key;
          if (ctx.entity != entity) {
            logger().debug("skipping watch {}; entity name {} != {}",
                           key, entity, ctx.entity);
            return seastar::now();
          }
          if (ctx.watch_cookie != cookie) {
            logger().debug("skipping watch {}; cookie {} != {}",
                           key, ctx.watch_cookie, cookie);
            return seastar::now();
          }
          logger().info("acking notify on watch {}", key);
          return watchp->notify_ack(ctx.notify_id, ctx.reply_bl);
        });
    });
}
398
399 static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
400 const std::string& type,
401 bufferlist::const_iterator& iter)
402 {
403 // storing non-const PGLSFilter for the sake of ::init()
404 std::unique_ptr<PGLSFilter> filter;
405 if (type.compare("plain") == 0) {
406 filter = std::make_unique<PGLSPlainFilter>();
407 } else {
408 std::size_t dot = type.find(".");
409 if (dot == type.npos || dot == 0 || dot == type.size() - 1) {
410 throw crimson::osd::invalid_argument{};
411 }
412
413 const std::string class_name = type.substr(0, dot);
414 const std::string filter_name = type.substr(dot + 1);
415 ClassHandler::ClassData *cls = nullptr;
416 int r = ClassHandler::get_instance().open_class(class_name, &cls);
417 if (r != 0) {
418 logger().warn("can't open class {}: {}", class_name, cpp_strerror(r));
419 if (r == -EPERM) {
420 // propogate permission error
421 throw crimson::osd::permission_denied{};
422 } else {
423 throw crimson::osd::invalid_argument{};
424 }
425 } else {
426 ceph_assert(cls);
427 }
428
429 ClassHandler::ClassFilter * const class_filter = cls->get_filter(filter_name);
430 if (class_filter == nullptr) {
431 logger().warn("can't find filter {} in class {}", filter_name, class_name);
432 throw crimson::osd::invalid_argument{};
433 }
434
435 filter.reset(class_filter->fn());
436 if (!filter) {
437 // Object classes are obliged to return us something, but let's
438 // give an error rather than asserting out.
439 logger().warn("buggy class {} failed to construct filter {}",
440 class_name, filter_name);
441 throw crimson::osd::invalid_argument{};
442 }
443 }
444
445 ceph_assert(filter);
446 int r = filter->init(iter);
447 if (r < 0) {
448 logger().warn("error initializing filter {}: {}", type, cpp_strerror(r));
449 throw crimson::osd::invalid_argument{};
450 }
451
452 // successfully constructed and initialized, return it.
453 return filter;
454 }
455
// Evaluate `filter` against `sobj`, first fetching the xattr the filter is
// interested in (if any) from `backend`. Resolves to `sobj` when the object
// passes the filter, or to an empty (min) hobject_t when it is rejected.
static seastar::future<hobject_t> pgls_filter(
  const PGLSFilter& filter,
  const PGBackend& backend,
  const hobject_t& sobj)
{
  if (const auto xattr = filter.get_xattr(); !xattr.empty()) {
    logger().debug("pgls_filter: filter is interested in xattr={} for obj={}",
                   xattr, sobj);
    return backend.getxattr(sobj, xattr).safe_then(
      [&filter, sobj] (ceph::bufferptr bp) {
        logger().debug("pgls_filter: got xvalue for obj={}", sobj);

        ceph::bufferlist val;
        val.push_back(std::move(bp));
        const bool filtered = filter.filter(sobj, val);
        return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
      }, PGBackend::get_attr_errorator::all_same_way([&filter, sobj] {
        // the xattr could not be fetched (e.g. the object lacks it)
        logger().debug("pgls_filter: got error for obj={}", sobj);

        if (filter.reject_empty_xattr()) {
          return seastar::make_ready_future<hobject_t>(hobject_t{});
        }
        // filter accepts a missing xattr: evaluate it on an empty value
        ceph::bufferlist val;
        const bool filtered = filter.filter(sobj, val);
        return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
      }));
  } else {
    // the filter needs no xattr at all; evaluate it on an empty value
    ceph::bufferlist empty_lvalue_bl;
    const bool filtered = filter.filter(sobj, empty_lvalue_bl);
    return seastar::make_ready_future<hobject_t>(filtered ? sobj : hobject_t{});
  }
}
488
489 static seastar::future<ceph::bufferlist> do_pgnls_common(
490 const hobject_t& pg_start,
491 const hobject_t& pg_end,
492 const PGBackend& backend,
493 const hobject_t& lower_bound,
494 const std::string& nspace,
495 const uint64_t limit,
496 const PGLSFilter* const filter)
497 {
498 if (!(lower_bound.is_min() ||
499 lower_bound.is_max() ||
500 (lower_bound >= pg_start && lower_bound < pg_end))) {
501 // this should only happen with a buggy client.
502 throw std::invalid_argument("outside of PG bounds");
503 }
504
505 return backend.list_objects(lower_bound, limit).then(
506 [&backend, filter, nspace](auto objects, auto next) {
507 auto in_my_namespace = [&nspace](const hobject_t& obj) {
508 using crimson::common::local_conf;
509 if (obj.get_namespace() == local_conf()->osd_hit_set_namespace) {
510 return false;
511 } else if (nspace == librados::all_nspaces) {
512 return true;
513 } else {
514 return obj.get_namespace() == nspace;
515 }
516 };
517 auto to_pglsed = [&backend, filter] (const hobject_t& obj) {
518 // this transformation looks costly. However, I don't have any
519 // reason to think PGLS* operations are critical for, let's say,
520 // general performance.
521 //
522 // from tchaikov: "another way is to use seastar::map_reduce(),
523 // to 1) save the effort to filter the already filtered objects
524 // 2) avoid the space to keep the tuple<bool, object> even if
525 // the object is filtered out".
526 if (filter) {
527 return pgls_filter(*filter, backend, obj);
528 } else {
529 return seastar::make_ready_future<hobject_t>(obj);
530 }
531 };
532
533 auto range = objects | boost::adaptors::filtered(in_my_namespace)
534 | boost::adaptors::transformed(to_pglsed);
535 logger().debug("do_pgnls_common: finishing the 1st stage of pgls");
536 return seastar::when_all_succeed(std::begin(range),
537 std::end(range)).then(
538 [next=std::move(next)] (auto items) mutable {
539 // the sole purpose of this chaining is to pass `next` to 2nd
540 // stage altogether with items
541 logger().debug("do_pgnls_common: 1st done");
542 return seastar::make_ready_future<decltype(items), decltype(next)>(
543 std::move(items), std::move(next));
544 });
545 }).then(
546 [pg_end, filter] (const std::vector<hobject_t>& items, auto next) {
547 auto is_matched = [] (const auto& obj) {
548 return !obj.is_min();
549 };
550 auto to_entry = [] (const auto& obj) {
551 return librados::ListObjectImpl{
552 obj.get_namespace(), obj.oid.name, obj.get_key()
553 };
554 };
555
556 pg_nls_response_t response;
557 boost::push_back(response.entries, items | boost::adaptors::filtered(is_matched)
558 | boost::adaptors::transformed(to_entry));
559 response.handle = next.is_max() ? pg_end : next;
560 ceph::bufferlist out;
561 encode(response, out);
562 logger().debug("{}: response.entries.size()=",
563 __func__, response.entries.size());
564 return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
565 });
566 }
567
568 static seastar::future<> do_pgnls(
569 const PG& pg,
570 const std::string& nspace,
571 OSDOp& osd_op)
572 {
573 hobject_t lower_bound;
574 try {
575 ceph::decode(lower_bound, osd_op.indata);
576 } catch (const buffer::error&) {
577 throw std::invalid_argument("unable to decode PGNLS handle");
578 }
579 const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
580 const auto pg_end = \
581 pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
582 return do_pgnls_common(pg_start,
583 pg_end,
584 pg.get_backend(),
585 lower_bound,
586 nspace,
587 osd_op.op.pgls.count,
588 nullptr /* no filter */)
589 .then([&osd_op](bufferlist bl) {
590 osd_op.outdata = std::move(bl);
591 return seastar::now();
592 });
593 }
594
// PGNLS_FILTER: like do_pgnls() but the client additionally names a PGLS
// filter ("plain" or "<class>.<filter>") plus its parameters, restricting
// the returned listing.
static seastar::future<> do_pgnls_filtered(
  const PG& pg,
  const std::string& nspace,
  OSDOp& osd_op)
{
  std::string cname, mname, type;
  auto bp = osd_op.indata.cbegin();
  try {
    ceph::decode(cname, bp);
    ceph::decode(mname, bp);
    ceph::decode(type, bp);
  } catch (const buffer::error&) {
    throw crimson::osd::invalid_argument{};
  }

  // consumes the filter's own parameters from `bp`; throws on bad input
  auto filter = get_pgls_filter(type, bp);

  hobject_t lower_bound;
  try {
    lower_bound.decode(bp);
  } catch (const buffer::error&) {
    throw std::invalid_argument("unable to decode PGNLS_FILTER description");
  }

  logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
                 __func__, cname, mname, type, lower_bound,
                 static_cast<const void*>(filter.get()));
  // do_with keeps the filter alive for the whole asynchronous listing
  return seastar::do_with(std::move(filter),
    [&, lower_bound=std::move(lower_bound)](auto&& filter) {
      const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
      const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
      return do_pgnls_common(pg_start,
                             pg_end,
                             pg.get_backend(),
                             lower_bound,
                             nspace,
                             osd_op.op.pgls.count,
                             filter.get())
      .then([&osd_op](bufferlist bl) {
        osd_op.outdata = std::move(bl);
        return seastar::now();
      });
    });
}
639
// Dispatch a single per-object OSD op to the matching backend/handler.
// Reads go through do_read_op, mutations through do_write_op; unknown
// opcodes raise std::runtime_error.
OpsExecuter::osd_op_errorator::future<>
OpsExecuter::execute_osd_op(OSDOp& osd_op)
{
  // TODO: dispatch via call table?
  // TODO: we might want to find a way to unify both input and output
  // of each op.
  logger().debug(
    "handling op {} on object {}",
    ceph_osd_op_name(osd_op.op.op),
    get_target());
  switch (const ceph_osd_op& op = osd_op.op; op.op) {
  case CEPH_OSD_OP_SYNC_READ:
    [[fallthrough]];
  case CEPH_OSD_OP_READ:
    return do_read_op([&osd_op] (auto& backend, const auto& os) {
      return backend.read(os.oi,
                          osd_op.op.extent.offset,
                          osd_op.op.extent.length,
                          osd_op.op.extent.truncate_size,
                          osd_op.op.extent.truncate_seq,
                          osd_op.op.flags).safe_then(
        [&osd_op](ceph::bufferlist&& bl) {
          // report the number of bytes actually read back to the client
          osd_op.rval = bl.length();
          osd_op.outdata = std::move(bl);
          return osd_op_errorator::now();
        });
    });
  case CEPH_OSD_OP_GETXATTR:
    return do_read_op([&osd_op] (auto& backend, const auto& os) {
      return backend.getxattr(os, osd_op);
    });
  case CEPH_OSD_OP_CREATE:
    return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.create(os, osd_op, txn);
    });
  case CEPH_OSD_OP_WRITE:
    return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.write(os, osd_op, txn);
    });
  case CEPH_OSD_OP_WRITEFULL:
    return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.writefull(os, osd_op, txn);
    });
  case CEPH_OSD_OP_SETALLOCHINT:
    // allocation hints are advisory; accepting them as a no-op is valid
    return osd_op_errorator::now();
  case CEPH_OSD_OP_SETXATTR:
    return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.setxattr(os, osd_op, txn);
    });
  case CEPH_OSD_OP_DELETE:
    return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.remove(os, txn);
    });
  case CEPH_OSD_OP_CALL:
    return this->do_op_call(osd_op);
  case CEPH_OSD_OP_STAT:
    // note: stat does not require RD
    return do_const_op([&osd_op] (/* const */auto& backend, const auto& os) {
      return backend.stat(os, osd_op);
    });
  case CEPH_OSD_OP_TMAPUP:
    // TODO: there was an effort to kill TMAP in ceph-osd. According to
    // @dzafman this isn't possible yet. Maybe it could be accomplished
    // before crimson's readiness and we'd luckily don't need to carry.
    return dont_do_legacy_op();

  // OMAP
  case CEPH_OSD_OP_OMAPGETKEYS:
    return do_read_op([&osd_op] (auto& backend, const auto& os) {
      return backend.omap_get_keys(os, osd_op);
    });
  case CEPH_OSD_OP_OMAPGETVALS:
    return do_read_op([&osd_op] (auto& backend, const auto& os) {
      return backend.omap_get_vals(os, osd_op);
    });
  case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    return do_read_op([&osd_op] (auto& backend, const auto& os) {
      return backend.omap_get_vals_by_keys(os, osd_op);
    });
  case CEPH_OSD_OP_OMAPSETVALS:
#if 0
    if (!pg.get_pool().info.supports_omap()) {
      return crimson::ct_error::operation_not_supported::make();
    }
#endif
    return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
      return backend.omap_set_vals(os, osd_op, txn);
    });

  // watch/notify
  case CEPH_OSD_OP_WATCH:
    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
      return do_op_watch(osd_op, os, txn);
    });
  case CEPH_OSD_OP_NOTIFY:
    return do_read_op([this, &osd_op] (auto&, const auto& os) {
      return do_op_notify(osd_op, os);
    });
  case CEPH_OSD_OP_NOTIFY_ACK:
    return do_read_op([this, &osd_op] (auto&, const auto& os) {
      return do_op_notify_ack(osd_op, os);
    });

  default:
    logger().warn("unknown op {}", ceph_osd_op_name(op.op));
    throw std::runtime_error(
      fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
  }
}
749
750 static seastar::future<ceph::bufferlist> do_pgls_common(
751 const hobject_t& pg_start,
752 const hobject_t& pg_end,
753 const PGBackend& backend,
754 const hobject_t& lower_bound,
755 const std::string& nspace,
756 const uint64_t limit,
757 const PGLSFilter* const filter)
758 {
759 if (!(lower_bound.is_min() ||
760 lower_bound.is_max() ||
761 (lower_bound >= pg_start && lower_bound < pg_end))) {
762 // this should only happen with a buggy client.
763 throw std::invalid_argument("outside of PG bounds");
764 }
765
766 using entries_t = decltype(pg_ls_response_t::entries);
767 return backend.list_objects(lower_bound, limit).then(
768 [&backend, filter, nspace](auto objects, auto next) {
769 return seastar::when_all_succeed(
770 seastar::map_reduce(std::move(objects),
771 [&backend, filter, nspace](const hobject_t& obj) {
772 if (obj.get_namespace() == nspace) {
773 if (filter) {
774 return pgls_filter(*filter, backend, obj);
775 } else {
776 return seastar::make_ready_future<hobject_t>(obj);
777 }
778 } else {
779 return seastar::make_ready_future<hobject_t>(hobject_t{});
780 }
781 },
782 entries_t{},
783 [](entries_t&& entries, hobject_t obj) {
784 if (!obj.is_min()) {
785 entries.emplace_back(obj.oid, obj.get_key());
786 }
787 return entries;
788 }),
789 seastar::make_ready_future<hobject_t>(next));
790 }).then([pg_end](entries_t entries, hobject_t next) {
791 pg_ls_response_t response;
792 response.handle = next.is_max() ? pg_end : next;
793 response.entries = std::move(entries);
794 ceph::bufferlist out;
795 encode(response, out);
796 logger().debug("{}: response.entries.size()=",
797 __func__, response.entries.size());
798 return seastar::make_ready_future<ceph::bufferlist>(std::move(out));
799 });
800 }
801
802 static seastar::future<> do_pgls(
803 const PG& pg,
804 const std::string& nspace,
805 OSDOp& osd_op)
806 {
807 hobject_t lower_bound;
808 auto bp = osd_op.indata.cbegin();
809 try {
810 lower_bound.decode(bp);
811 } catch (const buffer::error&) {
812 throw std::invalid_argument{"unable to decode PGLS handle"};
813 }
814 const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
815 const auto pg_end =
816 pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
817 return do_pgls_common(pg_start,
818 pg_end,
819 pg.get_backend(),
820 lower_bound,
821 nspace,
822 osd_op.op.pgls.count,
823 nullptr /* no filter */)
824 .then([&osd_op](bufferlist bl) {
825 osd_op.outdata = std::move(bl);
826 return seastar::now();
827 });
828 }
829
// PGLS_FILTER: like do_pgls() but the client additionally names a PGLS
// filter ("plain" or "<class>.<filter>") plus its parameters, restricting
// the returned listing.
static seastar::future<> do_pgls_filtered(
  const PG& pg,
  const std::string& nspace,
  OSDOp& osd_op)
{
  std::string cname, mname, type;
  auto bp = osd_op.indata.cbegin();
  try {
    ceph::decode(cname, bp);
    ceph::decode(mname, bp);
    ceph::decode(type, bp);
  } catch (const buffer::error&) {
    throw crimson::osd::invalid_argument{};
  }

  // consumes the filter's own parameters from `bp`; throws on bad input
  auto filter = get_pgls_filter(type, bp);

  hobject_t lower_bound;
  try {
    lower_bound.decode(bp);
  } catch (const buffer::error&) {
    throw std::invalid_argument("unable to decode PGLS_FILTER description");
  }

  logger().debug("{}: cname={}, mname={}, type={}, lower_bound={}, filter={}",
                 __func__, cname, mname, type, lower_bound,
                 static_cast<const void*>(filter.get()));
  // do_with keeps the filter alive for the whole asynchronous listing
  return seastar::do_with(std::move(filter),
    [&, lower_bound=std::move(lower_bound)](auto&& filter) {
      const auto pg_start = pg.get_pgid().pgid.get_hobj_start();
      const auto pg_end = pg.get_pgid().pgid.get_hobj_end(pg.get_pool().info.get_pg_num());
      return do_pgls_common(pg_start,
                            pg_end,
                            pg.get_backend(),
                            lower_bound,
                            nspace,
                            osd_op.op.pgls.count,
                            filter.get())
      .then([&osd_op](bufferlist bl) {
        osd_op.outdata = std::move(bl);
        return seastar::now();
      });
    });
}
874
// Dispatch a PG-wide (as opposed to per-object) operation: the four
// PGLS/PGNLS listing variants. Unknown opcodes raise std::runtime_error.
seastar::future<>
OpsExecuter::execute_pg_op(OSDOp& osd_op)
{
  logger().warn("handling op {}", ceph_osd_op_name(osd_op.op.op));
  switch (const ceph_osd_op& op = osd_op.op; op.op) {
  case CEPH_OSD_OP_PGLS:
    return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
      return do_pgls(pg, nspace, osd_op);
    });
  case CEPH_OSD_OP_PGLS_FILTER:
    return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
      return do_pgls_filtered(pg, nspace, osd_op);
    });
  case CEPH_OSD_OP_PGNLS:
    return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
      return do_pgnls(pg, nspace, osd_op);
    });
  case CEPH_OSD_OP_PGNLS_FILTER:
    return do_pg_op([&osd_op] (const auto& pg, const auto& nspace) {
      return do_pgnls_filtered(pg, nspace, osd_op);
    });
  default:
    logger().warn("unknown op {}", ceph_osd_op_name(op.op));
    throw std::runtime_error(
      fmt::format("op '{}' not supported", ceph_osd_op_name(op.op)));
  }
}
902
903 } // namespace crimson::osd