// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <cerrno>

#include "Objecter.h"
#include "osd/OSDMap.h"
#include "Filer.h"

#include "mon/MonClient.h"

#include "msg/Messenger.h"
#include "msg/Message.h"

#include "messages/MPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDMap.h"

#include "messages/MPoolOp.h"
#include "messages/MPoolOpReply.h"

#include "messages/MGetPoolStats.h"
#include "messages/MGetPoolStatsReply.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"

#include "messages/MMonCommand.h"

#include "messages/MCommand.h"
#include "messages/MCommandReply.h"

#include "messages/MWatchNotify.h"


#include "common/config.h"
#include "common/perf_counters.h"
#include "common/scrub_types.h"
#include "include/str_list.h"
#include "common/errno.h"
#include "common/EventTrace.h"

using std::list;
using std::make_pair;
using std::map;
using std::ostream;
using std::ostringstream;
using std::pair;
using std::set;
using std::string;
using std::stringstream;
using std::vector;

using ceph::decode;
using ceph::encode;
using ceph::Formatter;

using std::defer_lock;

using ceph::real_time;
using ceph::real_clock;

using ceph::mono_clock;
using ceph::mono_time;

using ceph::timespan;

using ceph::shunique_lock;
using ceph::acquire_shared;
using ceph::acquire_unique;

#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
#define dout_prefix *_dout << messenger->get_myname() << ".objecter "

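// Performance counter indices for the "objecter" logger; l_osdc_first
// and l_osdc_last bracket the range handed to PerfCountersBuilder in
// Objecter::init() below.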
enum {
  l_osdc_first = 123200,
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,

  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};


// config obs ----------------------------

static const char *config_keys[] = {
  "crush_location",
  NULL
};

class Objecter::RequestStateHook : public AdminSocketHook {
  Objecter *m_objecter;
public:
  explicit RequestStateHook(Objecter *objecter);
  int call(std::string_view command, const cmdmap_t& cmdmap,
	   Formatter *f,
	   std::ostream& ss,
	   ceph::buffer::list& out) override;
};

/**
 * This is a more limited form of C_Contexts, but C_Contexts requires
 * a ceph_context, which we don't have here.
 */
class ObjectOperation::C_TwoContexts : public Context {
  Context *first;
  Context *second;
public:
  C_TwoContexts(Context *first, Context *second) :
    first(first), second(second) {}
  void finish(int r) override {
    first->complete(r);
    second->complete(r);
    first = NULL;
    second = NULL;
  }

  ~C_TwoContexts() override {
    delete first;
    delete second;
  }
};

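// Chain an extra Context onto the output handler of the most recently
// added op: if a handler is already present, both are wrapped in a
// C_TwoContexts so each completes with the op's result.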
void ObjectOperation::add_handler(Context *extra) {
  size_t last = out_handler.size() - 1;
  Context *orig = out_handler[last];
  if (orig) {
    Context *wrapper = new C_TwoContexts(orig, extra);
    out_handler[last] = wrapper;
  } else {
    out_handler[last] = extra;
  }
}

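// Completion locks are sharded: the object name is hashed onto one of
// the session's num_locks completion locks so unrelated objects rarely
// contend. An empty name yields a lock-less (default-constructed) guard.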
Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
  object_t& oid)
{
  if (oid.name.empty())
    return unique_completion_lock();

  static constexpr uint32_t HASH_PRIME = 1021;
  uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
    % HASH_PRIME;

  return unique_completion_lock(completion_locks[h % num_locks],
				std::defer_lock);
}

const char** Objecter::get_tracked_conf_keys() const
{
  return config_keys;
}


void Objecter::handle_conf_change(const ConfigProxy& conf,
				  const std::set <std::string> &changed)
{
  if (changed.count("crush_location")) {
    update_crush_location();
  }
}

void Objecter::update_crush_location()
{
  unique_lock wl(rwlock);
  crush_location = cct->crush_location.get_location();
}

// messages ------------------------------

/*
 * initialize only internal data structures, don't initiate cluster interaction
 */
void Objecter::init()
{
  ceph_assert(!initialized);

  if (!logger) {
    PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);

    pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
		PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
    pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");

    pcb.add_u64_counter(l_osdc_op, "op", "Operations");
    pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
			"rdwr", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");

    pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
    pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
			"Create object operations");
    pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
    pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
    pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
			"Write full object operations");
    pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
			"Write same operations");
    pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
			"Append operation");
    pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
			"Set object to zero operations");
    pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
			"Truncate object operations");
    pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
			"Delete object operations");
    pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
			"Map extent operations");
    pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
			"Sparse read operations");
    pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
			"Clone range operations");
    pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
			"Get xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
			"Set xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
			"Xattr comparison operations");
    pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
			"Remove xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
			"Reset xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
			"Call (execute) operations");
    pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
			"Watch by object operations");
    pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
			"Notify about object operations");
    pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
			"Extended attribute comparison in multi operations");
    pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
    pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
    pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");

    pcb.add_u64(l_osdc_linger_active, "linger_active",
		"Active lingering operations");
    pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
			"Sent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
			"Resent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
			"Sent pings to lingering operations");

    pcb.add_u64(l_osdc_poolop_active, "poolop_active",
		"Active pool operations");
    pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
			"Sent pool operations");
    pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
			"Resent pool operations");

    pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
		"Active get pool stat operations");
    pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
			"Pool stat operations sent");
    pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
			"Resent pool stats");

    pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
    pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
    pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
			"Resent FS stats");

    pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
    pcb.add_u64_counter(l_osdc_command_send, "command_send",
			"Sent commands");
    pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
			"Resent commands");

    pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
    pcb.add_u64_counter(l_osdc_map_full, "map_full",
			"Full OSD maps received");
    pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
			"Incremental OSD maps received");

    pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
		"Open sessions"); // open sessions
    pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
			"Sessions opened");
    pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
			"Sessions closed");
    pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");

    pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
			"OSD OMAP write operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
			"OSD OMAP read operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
			"OSD OMAP delete operations");

    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  m_request_state_hook = new RequestStateHook(this);
  AdminSocket* admin_socket = cct->get_admin_socket();
  int ret = admin_socket->register_command("objecter_requests",
					   m_request_state_hook,
					   "show in-progress osd requests");

  /* Don't warn on EEXIST, happens if multiple ceph clients
   * are instantiated from one process */
  if (ret < 0 && ret != -EEXIST) {
    lderr(cct) << "error registering admin socket command: "
	       << cpp_strerror(ret) << dendl;
  }

  update_crush_location();

  cct->_conf.add_observer(this);

  initialized = true;
}

/*
 * ok, cluster interaction can happen
 */
void Objecter::start(const OSDMap* o)
{
  shared_lock rl(rwlock);

  start_tick();
  if (o) {
    osdmap->deepish_copy_from(*o);
    prune_pg_mapping(osdmap->get_pools());
  } else if (osdmap->get_epoch() == 0) {
    _maybe_request_map();
  }
}

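// Tear down all outstanding state: sessions, map-check registrations,
// pool/statfs/command ops, the homeless session's queues, the tick
// timer, the perf counters and the admin socket hook.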
void Objecter::shutdown()
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  initialized = false;

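  // Drop the Objecter lock while deregistering the config observer,
  // presumably to avoid deadlocking against an in-flight
  // handle_conf_change() callback that wants rwlock.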
  wl.unlock();
  cct->_conf.remove_observer(this);
  wl.lock();

  map<int,OSDSession*>::iterator p;
  while (!osd_sessions.empty()) {
    p = osd_sessions.begin();
    close_session(p->second);
  }

  while(!check_latest_map_lingers.empty()) {
    map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while(!check_latest_map_ops.empty()) {
    map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while(!check_latest_map_commands.empty()) {
    map<ceph_tid_t, CommandOp*>::iterator i
      = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  while(!poolstat_ops.empty()) {
    map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while(!statfs_ops.empty()) {
    map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while(!pool_ops.empty()) {
    map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
  while(!homeless_session->linger_ops.empty()) {
    std::map<uint64_t, LingerOp*>::iterator i
      = homeless_session->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    LingerOp *lop = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_linger_op_remove(homeless_session, lop);
    }
    linger_ops.erase(lop->linger_id);
    linger_ops_set.erase(lop);
    lop->put();
  }

  while(!homeless_session->ops.empty()) {
    std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    Op *op = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_op_remove(homeless_session, op);
    }
    op->put();
  }

  while(!homeless_session->command_ops.empty()) {
    std::map<ceph_tid_t, CommandOp*>::iterator i
      = homeless_session->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    CommandOp *cop = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_command_op_remove(homeless_session, cop);
    }
    cop->put();
  }

  if (tick_event) {
    if (timer.cancel_event(tick_event)) {
      ldout(cct, 10) << " successfully canceled tick" << dendl;
    }
    tick_event = 0;
  }

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }

  // Let go of Objecter write lock so timer thread can shutdown
  wl.unlock();

  // Outside of lock to avoid cycle WRT calls to RequestStateHook
  // This is safe because we guarantee no concurrent calls to
  // shutdown() with the ::initialized check at start.
  if (m_request_state_hook) {
    AdminSocket* admin_socket = cct->get_admin_socket();
    admin_socket->unregister_commands(m_request_state_hook);
    delete m_request_state_hook;
    m_request_state_hook = NULL;
  }
}

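// Send (or resend) a linger op. A watch that has already registered is
// reconnected with CEPH_OSD_WATCH_OP_RECONNECT; otherwise the original
// register/notify ops are resubmitted. Any older in-flight registration
// op for this linger_id is cancelled first.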
void Objecter::_send_linger(LingerOp *info,
			    shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  vector<OSDOp> opv;
  Context *oncommit = NULL;
  LingerOp::shared_lock watchl(info->watch_lock);
  ceph::buffer::list *poutbl = NULL;
  if (info->registered && info->is_watch) {
    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
		   << dendl;
    opv.push_back(OSDOp());
    opv.back().op.op = CEPH_OSD_OP_WATCH;
    opv.back().op.watch.cookie = info->get_cookie();
    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
    opv.back().op.watch.gen = ++info->register_gen;
    oncommit = new C_Linger_Reconnect(this, info);
  } else {
    ldout(cct, 15) << "send_linger " << info->linger_id << " register"
		   << dendl;
    opv = info->ops;
    C_Linger_Commit *c = new C_Linger_Commit(this, info);
    if (!info->is_watch) {
      info->notify_id = 0;
      poutbl = &c->outbl;
    }
    oncommit = c;
  }
  watchl.unlock();
  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
		 opv, info->target.flags | CEPH_OSD_FLAG_READ,
		 oncommit, info->pobjver);
  o->outbl = poutbl;
  o->snapid = info->snap;
  o->snapc = info->snapc;
  o->mtime = info->mtime;

  o->target = info->target;
  o->tid = ++last_tid;

  // do not resend this; we will send a new op to reregister
  o->should_resend = false;
  o->ctx_budgeted = true;

  if (info->register_tid) {
    // repeat send. cancel old registration op, if any.
    OSDSession::unique_lock sl(info->session->lock);
    if (info->session->ops.count(info->register_tid)) {
      Op *o = info->session->ops[info->register_tid];
      _op_cancel_map_check(o);
      _cancel_linger_op(o);
    }
    sl.unlock();
  }

  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);

  logger->inc(l_osdc_linger_send);
}

void Objecter::_linger_commit(LingerOp *info, int r, ceph::buffer::list& outbl)
{
  LingerOp::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
  if (info->on_reg_commit) {
    info->on_reg_commit->complete(r);
    info->on_reg_commit = NULL;
  }
  if (r < 0 && info->on_notify_finish) {
    info->on_notify_finish->complete(r);
    info->on_notify_finish = nullptr;
  }

  // only tell the user the first time we do this
  info->registered = true;
  info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    auto p = outbl.cbegin();
    try {
      decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
		     << dendl;
    }
    catch (ceph::buffer::error& e) {
    }
  }
}

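// Watch errors are delivered to the user's WatchContext on the finisher
// thread, outside the Objecter lock; _queued_async()/finished_async()
// bracket the pending callback so linger_check() can account for it.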
struct C_DoWatchError : public Context {
  Objecter *objecter;
  Objecter::LingerOp *info;
  int err;
  C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
    : objecter(o), info(i), err(r) {
    info->get();
    info->_queued_async();
  }
  void finish(int r) override {
    Objecter::unique_lock wl(objecter->rwlock);
    bool canceled = info->canceled;
    wl.unlock();

    if (!canceled) {
      info->watch_context->handle_error(info->get_cookie(), err);
    }

    info->finished_async();
    info->put();
  }
};

int Objecter::_normalize_watch_error(int r)
{
  // translate ENOENT -> ENOTCONN so that a delete->disconnection
  // notification and a failure to reconnect because we raced with
  // the delete appear the same to the user.
  if (r == -ENOENT)
    r = -ENOTCONN;
  return r;
}

void Objecter::_linger_reconnect(LingerOp *info, int r)
{
  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
		 << " (last_error " << info->last_error << ")" << dendl;
  if (r < 0) {
    LingerOp::unique_lock wl(info->watch_lock);
    if (!info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
	finisher->queue(new C_DoWatchError(this, info, r));
      }
    }
    wl.unlock();
  }
}

void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  if (cct->_conf->objecter_inject_no_watch_ping) {
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
		   << dendl;
    return;
  }
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
		 << dendl;

  vector<OSDOp> opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen;
  C_Linger_Ping *onack = new C_Linger_Ping(this, info);
  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
		 opv, info->target.flags | CEPH_OSD_FLAG_READ,
		 onack, NULL, NULL);
  o->target = info->target;
  o->should_resend = false;
  _send_op_account(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o);
  info->ping_tid = o->tid;

  onack->sent = now;
  logger->inc(l_osdc_linger_ping);
}

void Objecter::_linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
			    uint32_t register_gen)
{
  LingerOp::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " sent " << sent << " gen " << register_gen << " = " << r
		 << " (last_error " << info->last_error
		 << " register_gen " << info->register_gen << ")" << dendl;
  if (info->register_gen == register_gen) {
    if (r == 0) {
      info->watch_valid_thru = sent;
    } else if (r < 0 && !info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
	finisher->queue(new C_DoWatchError(this, info, r));
      }
    }
  } else {
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}

int Objecter::linger_check(LingerOp *info)
{
  LingerOp::shared_lock l(info->watch_lock);

  ceph::coarse_mono_time stamp = info->watch_valid_thru;
  if (!info->watch_pending_async.empty())
    stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
  auto age = ceph::coarse_mono_clock::now() - stamp;

  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " err " << info->last_error
		 << " age " << age << dendl;
  if (info->last_error)
    return info->last_error;
  // return a safe upper bound (we are truncating to ms)
  return
    1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
}

void Objecter::linger_cancel(LingerOp *info)
{
  unique_lock wl(rwlock);
  _linger_cancel(info);
  info->put();
}

void Objecter::_linger_cancel(LingerOp *info)
{
  // rwlock is locked unique
  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
  if (!info->canceled) {
    OSDSession *s = info->session;
    OSDSession::unique_lock sl(s->lock);
    _session_linger_op_remove(s, info);
    sl.unlock();

    linger_ops.erase(info->linger_id);
    linger_ops_set.erase(info);
    ceph_assert(linger_ops.size() == linger_ops_set.size());

    info->canceled = true;
    info->put();

    logger->dec(l_osdc_linger_active);
  }
}


Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
					      const object_locator_t& oloc,
					      int flags)
{
  LingerOp *info = new LingerOp(this);
  info->target.base_oid = oid;
  info->target.base_oloc = oloc;
  if (info->target.base_oloc.key == oid)
    info->target.base_oloc.key.clear();
  info->target.flags = flags;
  info->watch_valid_thru = ceph::coarse_mono_clock::now();

  unique_lock l(rwlock);

  // Acquire linger ID
  info->linger_id = ++max_linger_id;
  ldout(cct, 10) << __func__ << " info " << info
		 << " linger_id " << info->linger_id
		 << " cookie " << info->get_cookie()
		 << dendl;
  linger_ops[info->linger_id] = info;
  linger_ops_set.insert(info);
  ceph_assert(linger_ops.size() == linger_ops_set.size());

  info->get(); // for the caller
  return info;
}

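// linger_watch and linger_notify flesh out a registered LingerOp and
// submit it: a watch goes out as a write (CEPH_OSD_FLAG_WRITE) with a
// snap context and mtime, while a notify goes out as a read
// (CEPH_OSD_FLAG_READ) whose reply lands in *poutbl. Both take a
// linger budget before submission.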
ceph_tid_t Objecter::linger_watch(LingerOp *info,
				  ObjectOperation& op,
				  const SnapContext& snapc,
				  real_time mtime,
				  ceph::buffer::list& inbl,
				  Context *oncommit,
				  version_t *objver)
{
  info->is_watch = true;
  info->snapc = snapc;
  info->mtime = mtime;
  info->target.flags |= CEPH_OSD_FLAG_WRITE;
  info->ops = op.ops;
  info->inbl = inbl;
  info->poutbl = NULL;
  info->pobjver = objver;
  info->on_reg_commit = oncommit;

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  return info->linger_id;
}

ceph_tid_t Objecter::linger_notify(LingerOp *info,
				   ObjectOperation& op,
				   snapid_t snap, ceph::buffer::list& inbl,
				   ceph::buffer::list *poutbl,
				   Context *onfinish,
				   version_t *objver)
{
  info->snap = snap;
  info->target.flags |= CEPH_OSD_FLAG_READ;
  info->ops = op.ops;
  info->inbl = inbl;
  info->poutbl = poutbl;
  info->pobjver = objver;
  info->on_reg_commit = onfinish;

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  return info->linger_id;
}

void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
  ceph_assert(info->linger_id);
  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!

  // Populate Op::target
  OSDSession *s = NULL;
  _calc_target(&info->target, nullptr);

  // Create LingerOp<->OSDSession relation
  int r = _get_session(info->target.osd, &s, sul);
  ceph_assert(r == 0);
  OSDSession::unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  put_session(s);

  _send_linger(info, sul);
}

struct C_DoWatchNotify : public Context {
  Objecter *objecter;
  Objecter::LingerOp *info;
  MWatchNotify *msg;
  C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->get();
    info->_queued_async();
    msg->get();
  }
  void finish(int r) override {
    objecter->_do_watch_notify(info, msg);
  }
};

void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  LingerOp::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    if (!info->last_error) {
      info->last_error = -ENOTCONN;
      if (info->watch_context) {
	finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
	info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
		     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->notify_result_bl->claim(m->get_data());
      info->on_notify_finish->complete(m->return_code);

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
      info->on_notify_finish = NULL;
    }
  } else {
    finisher->queue(new C_DoWatchNotify(this, info, m));
  }
}

void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;

  shared_lock l(rwlock);
  ceph_assert(initialized);

  if (info->canceled) {
    l.unlock();
    goto out;
  }

  // notify completion?
  ceph_assert(info->is_watch);
  ceph_assert(info->watch_context);
  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);

  l.unlock();

  switch (m->opcode) {
  case CEPH_WATCH_EVENT_NOTIFY:
    info->watch_context->handle_notify(m->notify_id, m->cookie,
				       m->notifier_gid, m->bl);
    break;
  }

 out:
  info->finished_async();
  info->put();
  m->put();
}

bool Objecter::ms_dispatch(Message *m)
{
  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
  switch (m->get_type()) {
    // these we exclusively handle
  case CEPH_MSG_OSD_OPREPLY:
    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
    return true;

  case CEPH_MSG_OSD_BACKOFF:
    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
    return true;

  case CEPH_MSG_WATCH_NOTIFY:
    handle_watch_notify(static_cast<MWatchNotify*>(m));
    m->put();
    return true;

  case MSG_COMMAND_REPLY:
    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    } else {
      return false;
    }

  case MSG_GETPOOLSTATSREPLY:
    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
    return true;

  case CEPH_MSG_POOLOP_REPLY:
    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
    return true;

  case CEPH_MSG_STATFS_REPLY:
    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
    return true;

    // these we give others a chance to inspect

    // MDS, OSD
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    return false;
  }
  return false;
}

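// Walk one session's linger ops, regular ops and commands after an
// OSDMap change, recomputing each target and collecting whatever needs
// to be resent (or unregistered, for lingers whose pool vanished).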
void Objecter::_scan_requests(
  OSDSession *s,
  bool skipped_map,
  bool cluster_full,
  map<int64_t, bool> *pool_full_map,
  map<ceph_tid_t, Op*>& need_resend,
  list<LingerOp*>& need_resend_linger,
  map<ceph_tid_t, CommandOp*>& need_resend_command,
  shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  list<LingerOp*> unregister_lingers;

  OSDSession::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    LingerOp *op = lp->second;
    ceph_assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
	ldout(cct, 10) << " need to unregister linger op "
		       << op->linger_id << dendl;
	op->get();
	unregister_lingers.push_back(op);
      }
      break;
    }
  }

  // check for changed request mappings
  map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    _prune_snapc(osdmap->get_new_removed_snaps(), op);
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
			 op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !(force_resend_writes && op->target.respects_full()))
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      _session_op_remove(op->session, op);
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    }
  }

  // commands
  map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    CommandOp *c = cp->second;
    ++cp;
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      _session_command_op_remove(c->session, c);
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  sl.unlock();

  for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put();
  }
}

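// Apply a batch of OSDMaps from the monitor: decode incrementals (or
// full maps when an epoch was skipped), rescan every session for ops
// whose mapping changed, then resend or cancel as appropriate and wake
// any waiters on the new epoch.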
void Objecter::handle_osd_map(MOSDMap *m)
{
  shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  ceph_assert(osdmap);

  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
		  << " != " << monc->get_fsid() << dendl;
    return;
  }

  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (map<int64_t, pg_pool_t>::const_iterator it
	 = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);


  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
	   e <= m->get_last();
	   e++) {

	if (osdmap->get_epoch() == e-1 &&
	    m->incremental_maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
			<< dendl;
	  OSDMap::Incremental inc(m->incremental_maps[e]);
	  osdmap->apply_incremental(inc);

	  emit_blacklist_events(inc);

	  logger->inc(l_osdc_map_inc);
	}
	else if (m->maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
	  auto new_osdmap = std::make_unique<OSDMap>();
	  new_osdmap->decode(m->maps[e]);

	  emit_blacklist_events(*osdmap, *new_osdmap);
	  osdmap = std::move(new_osdmap);

	  logger->inc(l_osdc_map_full);
	}
	else {
	  if (e >= m->get_oldest()) {
	    ldout(cct, 3) << "handle_osd_map requesting missing epoch "
			  << osdmap->get_epoch()+1 << dendl;
	    _maybe_request_map();
	    break;
	  }
	  ldout(cct, 3) << "handle_osd_map missing epoch "
			<< osdmap->get_epoch()+1
			<< ", jumping to " << m->get_oldest() << dendl;
	  e = m->get_oldest() - 1;
	  skipped_map = true;
	  continue;
	}
	logger->set(l_osdc_map_epoch, osdmap->get_epoch());

	prune_pg_mapping(osdmap->get_pools());
	cluster_full = cluster_full || _osdmap_full_flag();
	update_pool_full_map(pool_full_map);

	// check all outstanding requests on every epoch
	for (auto& i : need_resend) {
	  _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
	}
	_scan_requests(homeless_session, skipped_map, cluster_full,
		       &pool_full_map, need_resend,
		       need_resend_linger, need_resend_command, sul);
	for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
	     p != osd_sessions.end(); ) {
	  OSDSession *s = p->second;
	  _scan_requests(s, skipped_map, cluster_full,
			 &pool_full_map, need_resend,
			 need_resend_linger, need_resend_command, sul);
	  ++p;
	  // osd down or addr change?
	  if (!osdmap->is_up(s->osd) ||
	      (s->con &&
	       s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
	    close_session(s);
	  }
	}

	ceph_assert(e == osdmap->get_epoch());
      }

    } else {
      // first map. we want the full thing.
      if (m->maps.count(m->get_last())) {
	for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
	     p != osd_sessions.end(); ++p) {
	  OSDSession *s = p->second;
	  _scan_requests(s, false, false, NULL, need_resend,
			 need_resend_linger, need_resend_command, sul);
	}
	ldout(cct, 3) << "handle_osd_map decoding full epoch "
		      << m->get_last() << dendl;
	osdmap->decode(m->maps[m->get_last()]);
	prune_pg_mapping(osdmap->get_pools());

	_scan_requests(homeless_session, false, false, NULL,
		       need_resend, need_resend_linger,
		       need_resend_command, sul);
      } else {
	ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
		      << dendl;
	monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
	monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
	p = need_resend.erase(p);
	_check_op_pool_dne(op, nullptr);
      } else {
	++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused?
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    _maybe_request_map();
  }

  // resend requests
  for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
       p != need_resend.end(); ++p) {
    Op *op = p->second;
    OSDSession *s = op->session;
    bool mapped_session = false;
    if (!s) {
      int r = _map_session(&op->target, &s, sul);
      ceph_assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    OSDSession::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      if (!op->session->is_homeless() && !op->target.paused) {
	logger->inc(l_osdc_op_resend);
	_send_op(op);
      }
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s);
  }
  for (list<LingerOp*>::iterator p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    ceph_assert(op->session);
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    CommandOp *c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
	_send_command(c);
      }
    }
  }

  _dump_active();

  // finish any Contexts that were waiting on a map update
  map<epoch_t,list< pair< Context*, int > > >::iterator p =
    waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
	 p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (list<pair<Context*, int> >::iterator i = p->second.begin();
	 i != p->second.end(); ++i) {
      i->first->complete(i->second);
    }
    waiting_for_map.erase(p++);
  }

  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}
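
// Blacklist event tracking: once enabled, addresses newly blacklisted in
// OSDMap updates are accumulated in blacklist_events and drained by
// consume_blacklist_events().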
void Objecter::enable_blacklist_events()
{
  unique_lock wl(rwlock);

  blacklist_events_enabled = true;
}

void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
{
  unique_lock wl(rwlock);

  if (events->empty()) {
    events->swap(blacklist_events);
  } else {
    for (const auto &i : blacklist_events) {
      events->insert(i);
    }
    blacklist_events.clear();
  }
}

void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
{
  if (!blacklist_events_enabled) {
    return;
  }

  for (const auto &i : inc.new_blacklist) {
    blacklist_events.insert(i.first);
  }
}

void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
				     const OSDMap &new_osd_map)
{
  if (!blacklist_events_enabled) {
    return;
  }

  std::set<entity_addr_t> old_set;
  std::set<entity_addr_t> new_set;

  old_osd_map.get_blacklist(&old_set);
  new_osd_map.get_blacklist(&new_set);

  std::set<entity_addr_t> delta_set;
  std::set_difference(
    new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  blacklist_events.insert(delta_set.begin(), delta_set.end());
}

// op pool check

void Objecter::C_Op_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED)
    return;

  lgeneric_subdout(objecter->cct, objecter, 10)
    << "op_map_latest r=" << r << " tid=" << tid
    << " latest " << latest << dendl;

  Objecter::unique_lock wl(objecter->rwlock);

  map<ceph_tid_t, Op*>::iterator iter =
    objecter->check_latest_map_ops.find(tid);
  if (iter == objecter->check_latest_map_ops.end()) {
    lgeneric_subdout(objecter->cct, objecter, 10)
      << "op_map_latest op "<< tid << " not found" << dendl;
    return;
  }

  Op *op = iter->second;
  objecter->check_latest_map_ops.erase(iter);

  lgeneric_subdout(objecter->cct, objecter, 20)
    << "op_map_latest op "<< op << dendl;

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  OSDSession::unique_lock sl(op->session->lock, defer_lock);
  objecter->_check_op_pool_dne(op, &sl);

  op->put();
}

int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
				snapid_t *snap) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  for (auto p = pg_pool.snaps.begin();
       p != pg_pool.snaps.end();
       ++p) {
    if (p->second.name == snap_name) {
      *snap = p->first;
      return 0;
    }
  }
  return -ENOENT;
}

int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
				 pool_snap_info_t *info) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  auto p = pg_pool.snaps.find(snap);
  if (p == pg_pool.snaps.end())
    return -ENOENT;
  *info = p->second;

  return 0;
}

int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
{
  shared_lock rl(rwlock);

  const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
  if (!pi)
    return -ENOENT;
  for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
       p != pi->snaps.end();
       ++p) {
    snaps->push_back(p->first);
  }
  return 0;
}

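// Decide whether an op's pool really does not exist: if the pool ever
// existed we conclude deletion immediately; otherwise we wait until the
// map epoch reaches map_dne_bound (asking the monitor for the latest
// epoch if we have no bound yet) before failing the op with ENOENT.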
// sl may be unlocked.
void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
{
  // rwlock is locked unique

  if (op->target.pool_ever_existed) {
    // the pool previously existed and now it does not, which means it
    // was deleted.
    op->map_dne_bound = osdmap->get_epoch();
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " pool previously exists but now does not"
		   << dendl;
  } else {
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // we had a new enough map
      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		     << " concluding pool " << op->target.base_pgid.pool()
		     << " dne" << dendl;
      if (op->onfinish) {
	num_in_flight--;
	op->onfinish->complete(-ENOENT);
      }

      OSDSession *s = op->session;
      if (s) {
	ceph_assert(s != NULL);
	ceph_assert(sl->mutex() == &s->lock);
	bool session_locked = sl->owns_lock();
	if (!session_locked) {
	  sl->lock();
	}
	_finish_op(op, 0);
	if (!session_locked) {
	  sl->unlock();
	}
      } else {
	_finish_op(op, 0); // no session
      }
    }
  } else {
    _send_op_map_check(op);
  }
}

void Objecter::_send_op_map_check(Op *op)
{
  // rwlock is locked unique
  // ask the monitor
  if (check_latest_map_ops.count(op->tid) == 0) {
    op->get();
    check_latest_map_ops[op->tid] = op;
    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }
}

void Objecter::_op_cancel_map_check(Op *op)
{
  // rwlock is locked unique
  map<ceph_tid_t, Op*>::iterator iter =
    check_latest_map_ops.find(op->tid);
  if (iter != check_latest_map_ops.end()) {
    Op *op = iter->second;
    op->put();
    check_latest_map_ops.erase(iter);
  }
}

// linger pool check

void Objecter::C_Linger_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  map<uint64_t, LingerOp*>::iterator iter =
    objecter->check_latest_map_lingers.find(linger_id);
  if (iter == objecter->check_latest_map_lingers.end()) {
    return;
  }

  LingerOp *op = iter->second;
  objecter->check_latest_map_lingers.erase(iter);

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  bool unregister;
  objecter->_check_linger_pool_dne(op, &unregister);

  if (unregister) {
    objecter->_linger_cancel(op);
  }

  op->put();
}

void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
  // rwlock is locked unique

  *need_unregister = false;

  if (op->register_gen > 0) {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " pool previously existed but now does not"
		   << dendl;
    op->map_dne_bound = osdmap->get_epoch();
  } else {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      LingerOp::unique_lock wl{op->watch_lock};
      if (op->on_reg_commit) {
	op->on_reg_commit->complete(-ENOENT);
	op->on_reg_commit = nullptr;
      }
      if (op->on_notify_finish) {
	op->on_notify_finish->complete(-ENOENT);
	op->on_notify_finish = nullptr;
      }
      *need_unregister = true;
    }
  } else {
    _send_linger_map_check(op);
  }
}

void Objecter::_send_linger_map_check(LingerOp *op)
{
  // ask the monitor
  if (check_latest_map_lingers.count(op->linger_id) == 0) {
    op->get();
    check_latest_map_lingers[op->linger_id] = op;
    C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }
}

void Objecter::_linger_cancel_map_check(LingerOp *op)
{
  // rwlock is locked unique

  map<uint64_t, LingerOp*>::iterator iter =
    check_latest_map_lingers.find(op->linger_id);
  if (iter != check_latest_map_lingers.end()) {
    LingerOp *op = iter->second;
    op->put();
    check_latest_map_lingers.erase(iter);
  }
}

// command pool check

void Objecter::C_Command_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  map<uint64_t, CommandOp*>::iterator iter =
    objecter->check_latest_map_commands.find(tid);
  if (iter == objecter->check_latest_map_commands.end()) {
    return;
  }

  CommandOp *c = iter->second;
  objecter->check_latest_map_commands.erase(iter);

  if (c->map_dne_bound == 0)
    c->map_dne_bound = latest;

  OSDSession::unique_lock sul(c->session->lock);
  objecter->_check_command_map_dne(c);
  sul.unlock();

  c->put();
}

void Objecter::_check_command_map_dne(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
		 << " current " << osdmap->get_epoch()
		 << " map_dne_bound " << c->map_dne_bound
		 << dendl;
  if (c->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= c->map_dne_bound) {
      _finish_command(c, c->map_check_error, c->map_check_error_str);
    }
  } else {
    _send_command_map_check(c);
  }
}

void Objecter::_send_command_map_check(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  // ask the monitor
  if (check_latest_map_commands.count(c->tid) == 0) {
    c->get();
    check_latest_map_commands[c->tid] = c;
    C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
    monc->get_version("osdmap", &f->latest, NULL, f);
  }
}

void Objecter::_command_cancel_map_check(CommandOp *c)
{
  // rwlock is locked unique

  map<uint64_t, CommandOp*>::iterator iter =
    check_latest_map_commands.find(c->tid);
  if (iter != check_latest_map_commands.end()) {
    CommandOp *c = iter->second;
    c->put();
    check_latest_map_commands.erase(iter);
  }
}


/**
 * Look up OSDSession by OSD id.
 *
 * @returns 0 on success, or -EAGAIN if the lock context requires
 * promotion to write.
 */
int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
{
  ceph_assert(sul && sul.mutex() == &rwlock);

  if (osd < 0) {
    *session = homeless_session;
    ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
		   << dendl;
    return 0;
  }

  map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
  if (p != osd_sessions.end()) {
    OSDSession *s = p->second;
    s->get();
    *session = s;
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
		   << s->get_nref() << dendl;
    return 0;
  }
  if (!sul.owns_lock()) {
    return -EAGAIN;
  }
  OSDSession *s = new OSDSession(cct, osd);
  osd_sessions[osd] = s;
  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
  s->con->set_priv(RefCountedPtr{s});
  logger->inc(l_osdc_osd_session_open);
  logger->set(l_osdc_osd_sessions, osd_sessions.size());
  s->get();
  *session = s;
  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
		 << s->get_nref() << dendl;
  return 0;
}

void Objecter::put_session(Objecter::OSDSession *s)
{
  if (s && !s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
		   << s->get_nref() << dendl;
    s->put();
  }
}

void Objecter::get_session(Objecter::OSDSession *s)
{
  ceph_assert(s != NULL);

  if (!s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
		   << s->get_nref() << dendl;
    s->get();
  }
}

void Objecter::_reopen_session(OSDSession *s)
{
  // rwlock is locked unique
  // s->lock is locked

  auto addrs = osdmap->get_addrs(s->osd);
  ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
		 << addrs << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  s->con = messenger->connect_to_osd(addrs);
  s->con->set_priv(RefCountedPtr{s});
  s->incarnation++;
  logger->inc(l_osdc_osd_session_open);
}
1852
void Objecter::close_session(OSDSession *s)
{
  // rwlock is locked unique

  ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  OSDSession::unique_lock sl(s->lock);

  std::list<LingerOp*> homeless_lingers;
  std::list<CommandOp*> homeless_commands;
  std::list<Op*> homeless_ops;

  while (!s->linger_ops.empty()) {
    std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    homeless_lingers.push_back(i->second);
    _session_linger_op_remove(s, i->second);
  }

  while (!s->ops.empty()) {
    std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    homeless_ops.push_back(i->second);
    _session_op_remove(s, i->second);
  }

  while (!s->command_ops.empty()) {
    std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    homeless_commands.push_back(i->second);
    _session_command_op_remove(s, i->second);
  }

  osd_sessions.erase(s->osd);
  sl.unlock();
  put_session(s);

  // Assign any leftover ops to the homeless session
  {
    OSDSession::unique_lock hsl(homeless_session->lock);
    for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
         i != homeless_lingers.end(); ++i) {
      _session_linger_op_assign(homeless_session, *i);
    }
    for (std::list<Op*>::iterator i = homeless_ops.begin();
         i != homeless_ops.end(); ++i) {
      _session_op_assign(homeless_session, *i);
    }
    for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
         i != homeless_commands.end(); ++i) {
      _session_command_op_assign(homeless_session, *i);
    }
  }

  logger->set(l_osdc_osd_sessions, osd_sessions.size());
}

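// Block the calling thread until our osdmap epoch is at least e.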
void Objecter::wait_for_osd_map(epoch_t e)
{
  unique_lock l(rwlock);
  if (osdmap->get_epoch() >= e) {
    l.unlock();
    return;
  }

  // Leave this since it goes with C_SafeCond
  ceph::mutex lock = ceph::make_mutex("");
  ceph::condition_variable cond;
  bool done = false;  // initialized so the wait predicate never reads an
                      // indeterminate value before C_SafeCond fires
  std::unique_lock mlock{lock};
  C_SafeCond *context = new C_SafeCond(lock, cond, &done, NULL);
  waiting_for_map[e].push_back(pair<Context*, int>(context, 0));
  l.unlock();
  cond.wait(mlock, [&done] { return done; });
}

struct C_Objecter_GetVersion : public Context {
  Objecter *objecter;
  uint64_t oldest, newest;
  Context *fin;
  C_Objecter_GetVersion(Objecter *o, Context *c)
    : objecter(o), oldest(0), newest(0), fin(c) {}
  void finish(int r) override {
    if (r >= 0) {
      objecter->get_latest_version(oldest, newest, fin);
    } else if (r == -EAGAIN) { // try again as instructed
      objecter->wait_for_latest_osdmap(fin);
    } else {
      // it doesn't return any other error codes!
      ceph_abort();
    }
  }
};

void Objecter::wait_for_latest_osdmap(Context *fin)
{
  ldout(cct, 10) << __func__ << dendl;
  C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
  monc->get_version("osdmap", &c->newest, &c->oldest, c);
}

void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
{
  unique_lock wl(rwlock);
  if (osdmap->get_epoch() >= newest) {
    ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
    wl.unlock();
    if (fin)
      fin->complete(0);
    return;
  }

  ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
  _wait_for_new_map(fin, newest, 0);
}

void Objecter::maybe_request_map()
{
  shared_lock rl(rwlock);
  _maybe_request_map();
}

void Objecter::_maybe_request_map()
{
  // rwlock is locked
  int flag = 0;
  if (_osdmap_full_flag()
      || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
      || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
    ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
      "osd map (FULL flag is set)" << dendl;
  } else {
    ldout(cct, 10)
      << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
    flag = CEPH_SUBSCRIBE_ONETIME;
  }
  epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
  if (monc->sub_want("osdmap", epoch, flag)) {
    monc->renew_subs();
  }
}

void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
{
  // rwlock is locked unique
  waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
  _maybe_request_map();
}

/**
 * Use this together with wait_for_map: this is a pre-check to avoid
 * allocating a Context for wait_for_map if we can see that we
 * definitely already have the epoch.
 *
 * This does *not* replace the need to handle the return value of
 * wait_for_map: just because we don't have it in this pre-check
 * doesn't mean we won't have it when calling back into wait_for_map,
 * since the objecter lock is dropped in between.
 */
bool Objecter::have_map(const epoch_t epoch)
{
  shared_lock rl(rwlock);
  if (osdmap->get_epoch() >= epoch) {
    return true;
  } else {
    return false;
  }
}

bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
{
  unique_lock wl(rwlock);
  if (osdmap->get_epoch() >= epoch) {
    return true;
  }
  _wait_for_new_map(c, epoch, err);
  return false;
}
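
// A caller pattern implied by the comment above (a sketch only, not code
// from this file; C_DoWork is a hypothetical Context subclass):
//
//   if (!objecter->have_map(e)) {
//     Context *fin = new C_DoWork(...);
//     if (!objecter->wait_for_map(e, fin))
//       return;       // fin fires once epoch e arrives
//     delete fin;     // epoch arrived in between; fin was never queued
//   }
//   do_work();        // epoch e is present here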
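// Requeue a session's outstanding work after a connection reset or map
// change: clear backoffs, resend regular ops in tid order, hand linger
// ops back to the caller via lresend, and resend command ops in order.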
void Objecter::_kick_requests(OSDSession *session,
                              map<uint64_t, LingerOp *>& lresend)
{
  // rwlock is locked unique

  // clear backoffs
  session->backoffs.clear();
  session->backoffs_by_id.clear();

  // resend ops
  map<ceph_tid_t,Op*> resend;  // resend in tid order
  for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
       p != session->ops.end();) {
    Op *op = p->second;
    ++p;
    if (op->should_resend) {
      if (!op->target.paused)
        resend[op->tid] = op;
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
  }

  logger->inc(l_osdc_op_resend, resend.size());
  while (!resend.empty()) {
    _send_op(resend.begin()->second);
    resend.erase(resend.begin());
  }

  // resend lingers
  logger->inc(l_osdc_linger_resend, session->linger_ops.size());
  for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
       j != session->linger_ops.end(); ++j) {
    LingerOp *op = j->second;
    op->get();
    ceph_assert(lresend.count(j->first) == 0);
    lresend[j->first] = op;
  }

  // resend commands
  logger->inc(l_osdc_command_resend, session->command_ops.size());
  map<uint64_t,CommandOp*> cresend;  // resend in order
  for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
       k != session->command_ops.end(); ++k) {
    cresend[k->first] = k->second;
  }
  while (!cresend.empty()) {
    _send_command(cresend.begin()->second);
    cresend.erase(cresend.begin());
  }
}

void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
                                  unique_lock& ul)
{
  ceph_assert(ul.owns_lock());
  shunique_lock sul(std::move(ul));
  while (!lresend.empty()) {
    LingerOp *op = lresend.begin()->second;
    if (!op->canceled) {
      _send_linger(op, sul);
    }
    op->put();
    lresend.erase(lresend.begin());
  }
  ul = sul.release_to_unique();
}

void Objecter::start_tick()
{
  ceph_assert(tick_event == 0);
  tick_event =
    timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
                    &Objecter::tick, this);
}

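// Periodic housekeeping: flag ops older than objecter_timeout as laggy,
// ping the OSDs serving watches and commands (the reply policy is lossy,
// so pings also surface dead sessions), request a fresher map if anything
// is laggy or homeless, and reschedule ourselves.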
void Objecter::tick()
{
  shared_lock rl(rwlock);

  ldout(cct, 10) << "tick" << dendl;

  // we are only called by C_Tick
  tick_event = 0;

  if (!initialized) {
    // we raced with shutdown
    ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
    return;
  }

  set<OSDSession*> toping;

  // look for laggy requests
  auto cutoff = ceph::coarse_mono_clock::now();
  cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout

  unsigned laggy_ops = 0;

  for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::lock_guard l(s->lock);
    bool found = false;
    for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
         p != s->ops.end();
         ++p) {
      Op *op = p->second;
      ceph_assert(op->session);
      if (op->stamp < cutoff) {
        ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
                      << " is laggy" << dendl;
        found = true;
        ++laggy_ops;
      }
    }
    for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
         p != s->linger_ops.end();
         ++p) {
      LingerOp *op = p->second;
      LingerOp::unique_lock wl(op->watch_lock);
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
      if (op->is_watch && op->registered && !op->last_error)
        _send_linger_ping(op);
    }
    for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
         p != s->command_ops.end();
         ++p) {
      CommandOp *op = p->second;
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves command tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
    }
    if (found)
      toping.insert(s);
  }
  if (num_homeless_ops || !toping.empty()) {
    _maybe_request_map();
  }

  logger->set(l_osdc_op_laggy, laggy_ops);
  logger->set(l_osdc_osd_laggy, toping.size());

  if (!toping.empty()) {
    // send a ping to these osds, to ensure we detect any session resets
    // (osd reply message policy is lossy)
    for (set<OSDSession*>::const_iterator i = toping.begin();
         i != toping.end();
         ++i) {
      (*i)->con->send_message(new MPing);
    }
  }

  // Make sure we don't reschedule if we wake up after shutdown
  if (initialized) {
    tick_event = timer.reschedule_me(ceph::make_timespan(
                                       cct->_conf->objecter_tick_interval));
  }
}

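// Resubmit every monitor-directed request (pool stats, statfs, pool ops,
// and pending "latest osdmap" version checks), typically after the
// monitor session is re-established.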
void Objecter::resend_mon_ops()
{
  unique_lock wl(rwlock);

  ldout(cct, 10) << "resend_mon_ops" << dendl;

  for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
       p != poolstat_ops.end();
       ++p) {
    _poolstat_submit(p->second);
    logger->inc(l_osdc_poolstat_resend);
  }

  for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
       p != statfs_ops.end();
       ++p) {
    _fs_stats_submit(p->second);
    logger->inc(l_osdc_statfs_resend);
  }

  for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
       p != pool_ops.end();
       ++p) {
    _pool_op_submit(p->second);
    logger->inc(l_osdc_poolop_resend);
  }

  for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
       p != check_latest_map_ops.end();
       ++p) {
    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }

  for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
       p != check_latest_map_lingers.end();
       ++p) {
    C_Linger_Map_Latest *c
      = new C_Linger_Map_Latest(this, p->second->linger_id);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }

  for (map<uint64_t, CommandOp*>::iterator p
         = check_latest_map_commands.begin();
       p != check_latest_map_commands.end();
       ++p) {
    C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }
}

// read | write ---------------------------

void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
{
  shunique_lock rl(rwlock, ceph::acquire_shared);
  ceph_tid_t tid = 0;
  if (!ptid)
    ptid = &tid;
  op->trace.event("op submit");
  _op_submit_with_budget(op, rl, ptid, ctx_budget);
}

void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
                                      ceph_tid_t *ptid,
                                      int *ctx_budget)
{
  ceph_assert(initialized);

  ceph_assert(op->ops.size() == op->out_bl.size());
  ceph_assert(op->ops.size() == op->out_rval.size());
  ceph_assert(op->ops.size() == op->out_handler.size());

  // throttle. before we look at any state, because
  // _take_op_budget() may drop our lock while it blocks.
  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
    int op_budget = _take_op_budget(op, sul);
    // take and pass out the budget for the first OP
    // in the context session
    if (ctx_budget && (*ctx_budget == -1)) {
      *ctx_budget = op_budget;
    }
  }

  if (osd_timeout > timespan(0)) {
    if (op->tid == 0)
      op->tid = ++last_tid;
    auto tid = op->tid;
    op->ontimeout = timer.add_event(osd_timeout,
                                    [this, tid]() {
                                      op_cancel(tid, -ETIMEDOUT); });
  }

  _op_submit(op, sul, ptid);
}

void Objecter::_send_op_account(Op *op)
{
  inflight_ops++;

  // add to gather set(s)
  if (op->onfinish) {
    num_in_flight++;
  } else {
    ldout(cct, 20) << " note: not requesting reply" << dendl;
  }

  logger->inc(l_osdc_op_active);
  logger->inc(l_osdc_op);

  if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
      (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
    logger->inc(l_osdc_op_rmw);
  else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
    logger->inc(l_osdc_op_w);
  else if (op->target.flags & CEPH_OSD_FLAG_READ)
    logger->inc(l_osdc_op_r);

  if (op->target.flags & CEPH_OSD_FLAG_PGOP)
    logger->inc(l_osdc_op_pg);

  for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
    int code = l_osdc_osdop_other;
    switch (p->op.op) {
    case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
    case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
    case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
    case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
    case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
    case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
    case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
    case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
    case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
    case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
    case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
    case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
    case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
    case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
    case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
    case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
    case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;

    // OMAP read operations
    case CEPH_OSD_OP_OMAPGETVALS:
    case CEPH_OSD_OP_OMAPGETKEYS:
    case CEPH_OSD_OP_OMAPGETHEADER:
    case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;

    // OMAP write operations
    case CEPH_OSD_OP_OMAPSETVALS:
    case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;

    // OMAP del operations
    case CEPH_OSD_OP_OMAPCLEAR:
    case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;

    case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
    case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
    case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
    }
    if (code)
      logger->inc(code);
  }
}

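// Core submission path, entered with rwlock held shared or unique. The
// lock may be dropped and retaken to create a session (or by the debug
// relock-delay injection), in which case the target is recalculated if
// the osdmap epoch moved underneath us; an op with no usable target is
// parked on the homeless session until a better map arrives.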
void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
{
  // rwlock is locked

  ldout(cct, 10) << __func__ << " op " << op << dendl;

  // pick target
  ceph_assert(op->session == NULL);
  OSDSession *s = NULL;

  bool check_for_latest_map = _calc_target(&op->target, nullptr)
    == RECALC_OP_TARGET_POOL_DNE;

  // Try to get a session, including a retry if we need to take write lock
  int r = _get_session(op->target.osd, &s, sul);
  if (r == -EAGAIN ||
      (check_for_latest_map && sul.owns_lock_shared()) ||
      cct->_conf->objecter_debug_inject_relock_delay) {
    epoch_t orig_epoch = osdmap->get_epoch();
    sul.unlock();
    if (cct->_conf->objecter_debug_inject_relock_delay) {
      sleep(1);
    }
    sul.lock();
    if (orig_epoch != osdmap->get_epoch()) {
      // map changed; recalculate mapping
      ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
                     << dendl;
      check_for_latest_map = _calc_target(&op->target, nullptr)
        == RECALC_OP_TARGET_POOL_DNE;
      if (s) {
        put_session(s);
        s = NULL;
        r = -EAGAIN;
      }
    }
  }
  if (r == -EAGAIN) {
    ceph_assert(s == NULL);
    r = _get_session(op->target.osd, &s, sul);
  }
  ceph_assert(r == 0);
  ceph_assert(s);  // may be homeless

  _send_op_account(op);

  // send?

  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

  if (pool_full_try) {
    op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
  }

  bool need_send = false;
  if (op->target.paused) {
    ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
                   << dendl;
    _maybe_request_map();
  } else if (!s->is_homeless()) {
    need_send = true;
  } else {
    _maybe_request_map();
  }

  OSDSession::unique_lock sl(s->lock);
  if (op->tid == 0)
    op->tid = ++last_tid;

  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
                 << " '" << op->target.base_oloc << "' '"
                 << op->target.target_oloc << "' " << op->ops << " tid "
                 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
                 << dendl;

  _session_op_assign(s, op);

  if (need_send) {
    _send_op(op);
  }

  // Last chance to touch Op here, after giving up session lock it can
  // be freed at any time by response handler.
  ceph_tid_t tid = op->tid;
  if (check_for_latest_map) {
    _send_op_map_check(op);
  }
  if (ptid)
    *ptid = tid;
  op = NULL;

  sl.unlock();
  put_session(s);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}

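// Cancel a specific tid on a known session: complete its callback with r,
// drop any pending map check, and tear the op down. Returns -ENOENT if
// the tid is not (or is no longer) on this session.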
int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  OSDSession::unique_lock sl(s->lock);

  map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
  if (p == s->ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
                   << s->osd << dendl;
    return -ENOENT;
  }

#if 0
  if (s->con) {
    ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
                   << " on " << s->con << dendl;
    s->con->revoke_rx_buffer(tid);
  }
#endif

  ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
                 << dendl;
  Op *op = p->second;
  if (op->onfinish) {
    num_in_flight--;
    op->onfinish->complete(r);
    op->onfinish = NULL;
  }
  _op_cancel_map_check(op);
  _finish_op(op, r);
  sl.unlock();

  return 0;
}

int Objecter::op_cancel(ceph_tid_t tid, int r)
{
  int ret = 0;

  unique_lock wl(rwlock);
  ret = _op_cancel(tid, r);

  return ret;
}

int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
{
  unique_lock wl(rwlock);
  ldout(cct,10) << __func__ << " " << tids << dendl;
  for (auto tid : tids) {
    _op_cancel(tid, r);
  }
  return 0;
}

int Objecter::_op_cancel(ceph_tid_t tid, int r)
{
  int ret = 0;

  ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
                << dendl;

start:

  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    if (s->ops.find(tid) != s->ops.end()) {
      sl.unlock();
      ret = op_cancel(s, tid, r);
      if (ret == -ENOENT) {
        /* oh no! raced, maybe tid moved to another session, restarting */
        goto start;
      }
      return ret;
    }
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
                << " not found in live sessions" << dendl;

  // Handle case where the op is in homeless session
  OSDSession::shared_lock sl(homeless_session->lock);
  if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
    sl.unlock();
    ret = op_cancel(homeless_session, tid, r);
    if (ret == -ENOENT) {
      /* oh no! raced, maybe tid moved to another session, restarting */
      goto start;
    } else {
      return ret;
    }
  } else {
    sl.unlock();
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
                << " not found in homeless session" << dendl;

  return ret;
}

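// Cancel every in-flight write (optionally limited to a single pool)
// with error r. Returns the osdmap epoch at which the cancellation took
// effect, or (epoch_t)-1 as a sentinel if nothing was cancelled.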
epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
{
  unique_lock wl(rwlock);

  std::vector<ceph_tid_t> to_cancel;
  bool found = false;

  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
         op_i != s->ops.end(); ++op_i) {
      if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
          && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
        to_cancel.push_back(op_i->first);
      }
    }
    sl.unlock();

    for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
         titer != to_cancel.end();
         ++titer) {
      int cancel_result = op_cancel(s, *titer, r);
      // We hold rwlock across search and cancellation, so cancels
      // should always succeed
      ceph_assert(cancel_result == 0);
    }
    if (!found && to_cancel.size())
      found = true;
    to_cancel.clear();
  }

  const epoch_t epoch = osdmap->get_epoch();

  wl.unlock();

  if (found) {
    return epoch;
  } else {
    return -1;
  }
}

bool Objecter::is_pg_changed(
  int oldprimary,
  const vector<int>& oldacting,
  int newprimary,
  const vector<int>& newacting,
  bool any_change)
{
  if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
        oldprimary,
        oldacting,
        newprimary,
        newacting))
    return true;
  if (any_change && oldacting != newacting)
    return true;
  return false;  // same primary (tho replicas may have changed)
}

bool Objecter::target_should_be_paused(op_target_t *t)
{
  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
    (t->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi)));

  return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
    (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
    (osdmap->get_epoch() < epoch_barrier);
}

/**
 * Locking public accessor for _osdmap_full_flag
 */
bool Objecter::osdmap_full_flag() const
{
  shared_lock rl(rwlock);

  return _osdmap_full_flag();
}

bool Objecter::osdmap_pool_full(const int64_t pool_id) const
{
  shared_lock rl(rwlock);

  if (_osdmap_full_flag()) {
    return true;
  }

  return _osdmap_pool_full(pool_id);
}

bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
{
  const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
  if (pool == NULL) {
    ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
    return false;
  }

  return _osdmap_pool_full(*pool);
}

bool Objecter::_osdmap_has_pool_full() const
{
  for (map<int64_t, pg_pool_t>::const_iterator it
         = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it) {
    if (_osdmap_pool_full(it->second))
      return true;
  }
  return false;
}

/**
 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
 */
bool Objecter::_osdmap_full_flag() const
{
  // Ignore the FULL flag if the caller has opted out via honor_pool_full
  return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
}

void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
{
  for (map<int64_t, pg_pool_t>::const_iterator it
         = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it) {
    if (pool_full_map.find(it->first) == pool_full_map.end()) {
      pool_full_map[it->first] = _osdmap_pool_full(it->second);
    } else {
      pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
        pool_full_map[it->first];
    }
  }
}

int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
                                           const string& ns)
{
  shared_lock rl(rwlock);
  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -ENOENT;
  return p->hash_key(key, ns);
}

int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
                                              const string& ns)
{
  shared_lock rl(rwlock);
  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -ENOENT;
  return p->raw_hash_to_pg(p->hash_key(key, ns));
}

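// Strip snaps that the incoming map reports as newly removed from an
// op's snap context, so a resent write does not reference snapshots
// that no longer exist.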
void Objecter::_prune_snapc(
  const mempool::osdmap::map<int64_t,
                             snap_interval_set_t>& new_removed_snaps,
  Op *op)
{
  bool match = false;
  auto i = new_removed_snaps.find(op->target.base_pgid.pool());
  if (i != new_removed_snaps.end()) {
    for (auto s : op->snapc.snaps) {
      if (i->second.contains(s)) {
        match = true;
        break;
      }
    }
    if (match) {
      vector<snapid_t> new_snaps;
      for (auto s : op->snapc.snaps) {
        if (!i->second.contains(s)) {
          new_snaps.push_back(s);
        }
      }
      op->snapc.snaps.swap(new_snaps);
      ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
                    << " (was " << new_snaps << ")" << dendl;
    }
  }
}

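// Map an op target to its current PG, acting set, and destination OSD.
// Roughly: check the pool exists, honor forced-resend epochs, apply
// cache-tier read/write overlays, hash the object to a PG, look up
// up/acting (via a per-epoch pg mapping cache), detect new intervals
// and splits/merges, update pause state, and pick a replica for
// balanced or localized reads. The return value tells the caller
// whether a resend is needed.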
int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
  // rwlock is locked
  bool is_read = t->flags & CEPH_OSD_FLAG_READ;
  bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
  t->epoch = osdmap->get_epoch();
  ldout(cct,20) << __func__ << " epoch " << t->epoch
                << " base " << t->base_oid << " " << t->base_oloc
                << " precalc_pgid " << (int)t->precalc_pgid
                << " pgid " << t->base_pgid
                << (is_read ? " is_read" : "")
                << (is_write ? " is_write" : "")
                << dendl;

  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  if (!pi) {
    t->osd = -1;
    return RECALC_OP_TARGET_POOL_DNE;
  }
  ldout(cct,30) << __func__ << " base pi " << pi
                << " pg_num " << pi->get_pg_num() << dendl;

  bool force_resend = false;
  if (osdmap->get_epoch() == pi->last_force_op_resend) {
    if (t->last_force_resend < pi->last_force_op_resend) {
      t->last_force_resend = pi->last_force_op_resend;
      force_resend = true;
    } else if (t->last_force_resend == 0) {
      force_resend = true;
    }
  }

  // apply tiering
  t->target_oid = t->base_oid;
  t->target_oloc = t->base_oloc;
  if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
    if (is_read && pi->has_read_tier())
      t->target_oloc.pool = pi->read_tier;
    if (is_write && pi->has_write_tier())
      t->target_oloc.pool = pi->write_tier;
    pi = osdmap->get_pg_pool(t->target_oloc.pool);
    if (!pi) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }

  pg_t pgid;
  if (t->precalc_pgid) {
    ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
    ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
    ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
    pgid = t->base_pgid;
  } else {
    int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
                                           pgid);
    if (ret == -ENOENT) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }
  ldout(cct,20) << __func__ << " target " << t->target_oid << " "
                << t->target_oloc << " -> pgid " << pgid << dendl;
  ldout(cct,30) << __func__ << " target pi " << pi
                << " pg_num " << pi->get_pg_num() << dendl;
  t->pool_ever_existed = true;

  int size = pi->size;
  int min_size = pi->min_size;
  unsigned pg_num = pi->get_pg_num();
  unsigned pg_num_mask = pi->get_pg_num_mask();
  unsigned pg_num_pending = pi->get_pg_num_pending();
  int up_primary, acting_primary;
  vector<int> up, acting;
  ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
  pg_t actual_pgid(actual_ps, pgid.pool());
  pg_mapping_t pg_mapping;
  pg_mapping.epoch = osdmap->get_epoch();
  if (lookup_pg_mapping(actual_pgid, &pg_mapping)) {
    up = pg_mapping.up;
    up_primary = pg_mapping.up_primary;
    acting = pg_mapping.acting;
    acting_primary = pg_mapping.acting_primary;
  } else {
    osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
                                 &acting, &acting_primary);
    pg_mapping_t pg_mapping(osdmap->get_epoch(),
                            up, up_primary, acting, acting_primary);
    update_pg_mapping(actual_pgid, std::move(pg_mapping));
  }
  bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
  bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
  unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
  pg_t prev_pgid(prev_seed, pgid.pool());
  if (any_change && PastIntervals::is_new_interval(
        t->acting_primary,
        acting_primary,
        t->acting,
        acting,
        t->up_primary,
        up_primary,
        t->up,
        up,
        t->size,
        size,
        t->min_size,
        min_size,
        t->pg_num,
        pg_num,
        t->pg_num_pending,
        pg_num_pending,
        t->sort_bitwise,
        sort_bitwise,
        t->recovery_deletes,
        recovery_deletes,
        prev_pgid)) {
    force_resend = true;
  }

  bool unpaused = false;
  bool should_be_paused = target_should_be_paused(t);
  if (t->paused && !should_be_paused) {
    unpaused = true;
  }
  if (t->paused != should_be_paused) {
    ldout(cct, 10) << __func__ << " paused " << t->paused
                   << " -> " << should_be_paused << dendl;
    t->paused = should_be_paused;
  }

  bool legacy_change =
    t->pgid != pgid ||
    is_pg_changed(
      t->acting_primary, t->acting, acting_primary, acting,
      t->used_replica || any_change);
  bool split_or_merge = false;
  if (t->pg_num) {
    split_or_merge =
      prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_target(t->pg_num, pg_num);
  }

  if (legacy_change || split_or_merge || force_resend) {
    t->pgid = pgid;
    t->acting = acting;
    t->acting_primary = acting_primary;
    t->up_primary = up_primary;
    t->up = up;
    t->size = size;
    t->min_size = min_size;
    t->pg_num = pg_num;
    t->pg_num_mask = pg_num_mask;
    t->pg_num_pending = pg_num_pending;
    spg_t spgid(actual_pgid);
    if (pi->is_erasure()) {
      for (uint8_t i = 0; i < acting.size(); ++i) {
        if (acting[i] == acting_primary) {
          spgid.reset_shard(shard_id_t(i));
          break;
        }
      }
    }
    t->actual_pgid = spgid;
    t->sort_bitwise = sort_bitwise;
    t->recovery_deletes = recovery_deletes;
    ldout(cct, 10) << __func__ << " "
                   << " raw pgid " << pgid << " -> actual " << t->actual_pgid
                   << " acting " << acting
                   << " primary " << acting_primary << dendl;
    t->used_replica = false;
    if (acting_primary == -1) {
      t->osd = -1;
    } else {
      int osd;
      bool read = is_read && !is_write;
      if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
        int p = rand() % acting.size();
        if (p)
          t->used_replica = true;
        osd = acting[p];
        ldout(cct, 10) << " chose random osd." << osd << " of " << acting
                       << dendl;
      } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
                 acting.size() > 1) {
        // look for a local replica. prefer the primary if the
        // distance is the same.
        int best = -1;
        int best_locality = 0;
        for (unsigned i = 0; i < acting.size(); ++i) {
          int locality = osdmap->crush->get_common_ancestor_distance(
            cct, acting[i], crush_location);
          ldout(cct, 20) << __func__ << " localize: rank " << i
                         << " osd." << acting[i]
                         << " locality " << locality << dendl;
          if (i == 0 ||
              (locality >= 0 && best_locality >= 0 &&
               locality < best_locality) ||
              (best_locality < 0 && locality >= 0)) {
            best = i;
            best_locality = locality;
            if (i)
              t->used_replica = true;
          }
        }
        ceph_assert(best >= 0);
        osd = acting[best];
      } else {
        osd = acting_primary;
      }
      t->osd = osd;
    }
  }
  if (legacy_change || unpaused || force_resend) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  if (split_or_merge &&
      (osdmap->require_osd_release >= ceph_release_t::luminous ||
       HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
                    RESEND_ON_SPLIT))) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return RECALC_OP_TARGET_NO_ACTION;
}

int Objecter::_map_session(op_target_t *target, OSDSession **s,
                           shunique_lock& sul)
{
  _calc_target(target, nullptr);
  return _get_session(target->osd, s, sul);
}

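// Session <-> op bookkeeping. Each assign takes a session ref and files
// the op in the session's map (ops, linger_ops, or command_ops); each
// remove undoes that. num_homeless_ops tracks work parked on the
// homeless session so tick() knows to keep requesting maps.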
void Objecter::_session_op_assign(OSDSession *to, Op *op)
{
  // to->lock is locked
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  get_session(to);
  op->session = to;
  to->ops[op->tid] = op;

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}

void Objecter::_session_op_remove(OSDSession *from, Op *op)
{
  ceph_assert(op->session == from);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}

void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
{
  // to lock is locked unique
  ceph_assert(op->session == NULL);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->linger_ops[op->linger_id] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
                 << dendl;
}

void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
{
  ceph_assert(from == op->session);
  // from->lock is locked unique

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->linger_ops.erase(op->linger_id);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
                 << dendl;
}

void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
{
  ceph_assert(from == op->session);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->command_ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}

void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
{
  // to->lock is locked
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->command_ops[op->tid] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}

int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
                                       shunique_lock& sul)
{
  // rwlock is locked unique

  int r = _calc_target(&linger_op->target, nullptr, true);
  if (r == RECALC_OP_TARGET_NEED_RESEND) {
    ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
                   << " pgid " << linger_op->target.pgid
                   << " acting " << linger_op->target.acting << dendl;

    OSDSession *s = NULL;
    r = _get_session(linger_op->target.osd, &s, sul);
    ceph_assert(r == 0);

    if (linger_op->session != s) {
      // NB locking two sessions (s and linger_op->session) at the
      // same time here is only safe because we are the only one that
      // takes two, and we are holding rwlock for write. Disable
      // lockdep because it doesn't know that.
      OSDSession::unique_lock sl(s->lock);
      _session_linger_op_remove(linger_op->session, linger_op);
      _session_linger_op_assign(s, linger_op);
    }

    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return r;
}

void Objecter::_cancel_linger_op(Op *op)
{
  ldout(cct, 15) << "cancel_op " << op->tid << dendl;

  ceph_assert(!op->should_resend);
  if (op->onfinish) {
    delete op->onfinish;
    num_in_flight--;
  }

  _finish_op(op, 0);
}

void Objecter::_finish_op(Op *op, int r)
{
  ldout(cct, 15) << __func__ << " " << op->tid << dendl;

  // op->session->lock is locked unique or op->session is null

  if (!op->ctx_budgeted && op->budget >= 0) {
    put_op_budget_bytes(op->budget);
    op->budget = -1;
  }

  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  if (op->session) {
    _session_op_remove(op->session, op);
  }

  logger->dec(l_osdc_op_active);

  ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());

  inflight_ops--;

  op->put();
}

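// Build the MOSDOp message for an op: resolve flags (including the
// legacy ONDISK bit and FULL_FORCE when we do not honor pool fullness),
// stamp it for laggy detection, and copy in the snap context, ops
// vector, and retry attempt.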
MOSDOp *Objecter::_prepare_osd_op(Op *op)
{
  // rwlock is locked

  int flags = op->target.flags;
  flags |= CEPH_OSD_FLAG_KNOWN_REDIR;

  // Nothing checks this any longer, but needed for compatibility with
  // pre-luminous osds
  flags |= CEPH_OSD_FLAG_ONDISK;

  if (!honor_pool_full)
    flags |= CEPH_OSD_FLAG_FULL_FORCE;

  op->target.paused = false;
  op->stamp = ceph::coarse_mono_clock::now();

  hobject_t hobj = op->target.get_hobj();
  MOSDOp *m = new MOSDOp(client_inc, op->tid,
                         hobj, op->target.actual_pgid,
                         osdmap->get_epoch(),
                         flags, op->features);

  m->set_snapid(op->snapid);
  m->set_snap_seq(op->snapc.seq);
  m->set_snaps(op->snapc.snaps);

  m->ops = op->ops;
  m->set_mtime(op->mtime);
  m->set_retry_attempt(op->attempts++);

  if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
    op->trace.init("op", &trace_endpoint);
  }

  if (op->priority)
    m->set_priority(op->priority);
  else
    m->set_priority(cct->_conf->osd_client_op_priority);

  if (op->reqid != osd_reqid_t()) {
    m->set_reqid(op->reqid);
  }

  logger->inc(l_osdc_op_send);
  ssize_t sum = 0;
  for (unsigned i = 0; i < m->ops.size(); i++) {
    sum += m->ops[i].indata.length();
  }
  logger->inc(l_osdc_op_send_bytes, sum);

  return m;
}

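// Send (or hold back) a prepared op on its session's connection. If the
// target object falls inside a backoff interval registered for its PG,
// the op stays queued on the session and is resent when the OSD lifts
// the backoff in handle_osd_backoff().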
void Objecter::_send_op(Op *op)
{
  // rwlock is locked
  // op->session->lock is locked

  // backoff?
  auto p = op->session->backoffs.find(op->target.actual_pgid);
  if (p != op->session->backoffs.end()) {
    hobject_t hoid = op->target.get_hobj();
    auto q = p->second.lower_bound(hoid);
    if (q != p->second.begin()) {
      --q;
      if (hoid >= q->second.end) {
        ++q;
      }
    }
    if (q != p->second.end()) {
      ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
                     << "," << q->second.end << ")" << dendl;
      int r = cmp(hoid, q->second.begin);
      if (r == 0 || (r > 0 && hoid < q->second.end)) {
        ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
                       << " id " << q->second.id << " on " << hoid
                       << ", queuing " << op << " tid " << op->tid << dendl;
        return;
      }
    }
  }

  ceph_assert(op->tid > 0);
  MOSDOp *m = _prepare_osd_op(op);

  if (op->target.actual_pgid != m->get_spg()) {
    ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
                   << m->get_spg() << " to " << op->target.actual_pgid
                   << ", updating and reencoding" << dendl;
    m->set_spg(op->target.actual_pgid);
    m->clear_payload();  // reencode
  }

  ldout(cct, 15) << "_send_op " << op->tid << " to "
                 << op->target.actual_pgid << " on osd." << op->session->osd
                 << dendl;

  ConnectionRef con = op->session->con;
  ceph_assert(con);

#if 0
  // preallocated rx ceph::buffer?
  if (op->con) {
    ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
                   << op->con << dendl;
    op->con->revoke_rx_buffer(op->tid);
  }
  if (op->outbl &&
      op->ontimeout == 0 &&  // only post rx_buffer if no timeout; see #9582
      op->outbl->length()) {
    op->outbl->invalidate_crc();  // messenger writes through c_str()
    ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
                   << dendl;
    op->con = con;
    op->con->post_rx_buffer(op->tid, *op->outbl);
  }
#endif

  op->incarnation = op->session->incarnation;

  if (op->trace.valid()) {
    m->trace.init("op msg", nullptr, &op->trace);
  }
  op->session->con->send_message(m);
}

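// Budget accounting: a write costs its payload size; a read costs the
// data we expect back (extent length, or xattr name+value sizes). So,
// for example, a 4 MiB CEPH_OSD_OP_WRITE charges 4 MiB against
// op_throttle_bytes.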
int Objecter::calc_op_budget(const vector<OSDOp>& ops)
{
  int op_budget = 0;
  for (vector<OSDOp>::const_iterator i = ops.begin();
       i != ops.end();
       ++i) {
    if (i->op.op & CEPH_OSD_OP_MODE_WR) {
      op_budget += i->indata.length();
    } else if (ceph_osd_op_mode_read(i->op.op)) {
      if (ceph_osd_op_uses_extent(i->op.op)) {
        if ((int64_t)i->op.extent.length > 0)
          op_budget += (int64_t)i->op.extent.length;
      } else if (ceph_osd_op_type_attr(i->op.op)) {
        op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
      }
    }
  }
  return op_budget;
}

void Objecter::_throttle_op(Op *op,
                            shunique_lock& sul,
                            int op_budget)
{
  ceph_assert(sul && sul.mutex() == &rwlock);
  bool locked_for_write = sul.owns_lock();

  if (!op_budget)
    op_budget = calc_op_budget(op->ops);
  if (!op_throttle_bytes.get_or_fail(op_budget)) { // couldn't take right now
    sul.unlock();
    op_throttle_bytes.get(op_budget);
    if (locked_for_write)
      sul.lock();
    else
      sul.lock_shared();
  }
  if (!op_throttle_ops.get_or_fail(1)) { // couldn't take right now
    sul.unlock();
    op_throttle_ops.get(1);
    if (locked_for_write)
      sul.lock();
    else
      sul.lock_shared();
  }
}

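// Linger (watch/notify) ops are budgeted at a flat cost of one unit.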
int Objecter::take_linger_budget(LingerOp *info)
{
  return 1;
}

/* This function DOES put the passed message before returning */
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
{
  ldout(cct, 10) << "in handle_osd_op_reply" << dendl;

  // get pio
  ceph_tid_t tid = m->get_tid();

  shunique_lock sul(rwlock, ceph::acquire_shared);
  if (!initialized) {
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  OSDSession::unique_lock sl(s->lock);

  map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
  if (iter == s->ops.end()) {
    ldout(cct, 7) << "handle_osd_op_reply " << tid
                  << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
                                                    " onnvram" : " ack"))
                  << " ... stray" << dendl;
    sl.unlock();
    m->put();
    return;
  }

  ldout(cct, 7) << "handle_osd_op_reply " << tid
                << (m->is_ondisk() ? " ondisk" :
                    (m->is_onnvram() ? " onnvram" : " ack"))
                << " uv " << m->get_user_version()
                << " in " << m->get_pg()
                << " attempt " << m->get_retry_attempt()
                << dendl;
  Op *op = iter->second;
  op->trace.event("osd op reply");

  if (retry_writes_after_first_reply && op->attempts == 1 &&
      (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
    ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
    if (op->onfinish) {
      num_in_flight--;
    }
    _session_op_remove(s, op);
    sl.unlock();

    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  if (m->get_retry_attempt() >= 0) {
    if (m->get_retry_attempt() != (op->attempts - 1)) {
      ldout(cct, 7) << " ignoring reply from attempt "
                    << m->get_retry_attempt()
                    << " from " << m->get_source_inst()
                    << "; last attempt " << (op->attempts - 1) << " sent to "
                    << op->session->con->get_peer_addr() << dendl;
      m->put();
      sl.unlock();
      return;
    }
  } else {
    // we don't know the request attempt because the server is old, so
    // just accept this one. we may do ACK callbacks we shouldn't
    // have, but that is better than doing callbacks out of order.
  }

  Context *onfinish = 0;

  int rc = m->get_result();

  if (m->is_redirect_reply()) {
    ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
    if (op->onfinish)
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();

    // FIXME: two redirects could race and reorder

    op->tid = 0;
    m->get_redirect().combine_with_locator(op->target.target_oloc,
                                           op->target.target_oid.name);
    op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
                         CEPH_OSD_FLAG_IGNORE_CACHE |
                         CEPH_OSD_FLAG_IGNORE_OVERLAY);
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  if (rc == -EAGAIN) {
    ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
    if (op->onfinish)
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();

    op->tid = 0;
    op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
                          CEPH_OSD_FLAG_LOCALIZE_READS);
    op->target.pgid = pg_t();
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  sul.unlock();

  if (op->objver)
    *op->objver = m->get_user_version();
  if (op->reply_epoch)
    *op->reply_epoch = m->get_map_epoch();
  if (op->data_offset)
    *op->data_offset = m->get_header().data_off;

  // got data?
  if (op->outbl) {
#if 0
    if (op->con)
      op->con->revoke_rx_buffer(op->tid);
#endif
    auto& bl = m->get_data();
    if (op->outbl->length() == bl.length() &&
        bl.get_num_buffers() <= 1) {
      // this is here to keep previous users that *relied* on getting data
      // read into existing buffers happy. Notably,
      // libradosstriper::RadosStriperImpl::aio_read().
      ldout(cct,10) << __func__ << " copying resulting " << bl.length()
                    << " into existing ceph::buffer of length "
                    << op->outbl->length() << dendl;
      ceph::buffer::list t;
      t.claim(*op->outbl);
      t.invalidate_crc();  // we're overwriting the raw buffers via c_str()
      bl.begin().copy(bl.length(), t.c_str());
      op->outbl->substr_of(t, 0, bl.length());
    } else {
      m->claim_data(*op->outbl);
    }
    op->outbl = 0;
  }

  // per-op result demuxing
  vector<OSDOp> out_ops;
  m->claim_ops(out_ops);

  if (out_ops.size() != op->ops.size())
    ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
                  << " != request ops " << op->ops
                  << " from " << m->get_source_inst() << dendl;

  vector<ceph::buffer::list*>::iterator pb = op->out_bl.begin();
  vector<int*>::iterator pr = op->out_rval.begin();
  vector<Context*>::iterator ph = op->out_handler.begin();
  ceph_assert(op->out_bl.size() == op->out_rval.size());
  ceph_assert(op->out_bl.size() == op->out_handler.size());
  vector<OSDOp>::iterator p = out_ops.begin();
  for (unsigned i = 0;
       p != out_ops.end() && pb != op->out_bl.end();
       ++i, ++p, ++pb, ++pr, ++ph) {
    ldout(cct, 10) << " op " << i << " rval " << p->rval
                   << " len " << p->outdata.length() << dendl;
    if (*pb)
      **pb = p->outdata;
    // set rval before running handlers so that handlers
    // can change it if e.g. decoding fails
    if (*pr)
      **pr = ceph_to_hostos_errno(p->rval);
    if (*ph) {
      ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
      (*ph)->complete(ceph_to_hostos_errno(p->rval));
      *ph = NULL;
    }
  }

  // NOTE: we assume that since we only request ONDISK ever we will
  // only ever get back one (type of) ack ever.

  if (op->onfinish) {
    num_in_flight--;
    onfinish = op->onfinish;
    op->onfinish = NULL;
  }
  logger->inc(l_osdc_op_reply);

  /* get it before we call _finish_op() */
  auto completion_lock = s->get_lock(op->target.base_oid);

  ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
  _finish_op(op, 0);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;

  // serialize completions
  if (completion_lock.mutex()) {
    completion_lock.lock();
  }
  sl.unlock();

  // do callbacks
  if (onfinish) {
    onfinish->complete(rc);
  }
  if (completion_lock.mutex()) {
    completion_lock.unlock();
  }

  m->put();
}

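// Handle MOSDBackoff from an OSD: on BLOCK, record the [begin,end)
// interval for the PG and ack with the sender's epoch (so the OSD can
// discard the ack across a PG split); on UNBLOCK, drop the interval and
// resend any queued ops that fall inside it.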
void Objecter::handle_osd_backoff(MOSDBackoff *m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;
  shunique_lock sul(rwlock, ceph::acquire_shared);
  if (!initialized) {
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  get_session(s);

  OSDSession::unique_lock sl(s->lock);

  switch (m->op) {
  case CEPH_OSD_BACKOFF_OP_BLOCK:
    {
      // register
      OSDBackoff& b = s->backoffs[m->pgid][m->begin];
      s->backoffs_by_id.insert(make_pair(m->id, &b));
      b.pgid = m->pgid;
      b.id = m->id;
      b.begin = m->begin;
      b.end = m->end;

      // ack with original backoff's epoch so that the osd can discard this if
      // there was a pg split.
      Message *r = new MOSDBackoff(m->pgid,
                                   m->map_epoch,
                                   CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
                                   m->id, m->begin, m->end);
      // this priority must match the MOSDOps from _prepare_osd_op
      r->set_priority(cct->_conf->osd_client_op_priority);
      con->send_message(r);
    }
    break;

  case CEPH_OSD_BACKOFF_OP_UNBLOCK:
    {
      auto p = s->backoffs_by_id.find(m->id);
      if (p != s->backoffs_by_id.end()) {
        OSDBackoff *b = p->second;
        if (b->begin != m->begin &&
            b->end != m->end) {
          lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
                     << " unblock on ["
                     << m->begin << "," << m->end << ") but backoff is ["
                     << b->begin << "," << b->end << ")" << dendl;
          // hrmpf, unblock it anyway.
        }
        ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
                       << " id " << b->id
                       << " [" << b->begin << "," << b->end
                       << ")" << dendl;
        auto spgp = s->backoffs.find(b->pgid);
        ceph_assert(spgp != s->backoffs.end());
        spgp->second.erase(b->begin);
        if (spgp->second.empty()) {
          s->backoffs.erase(spgp);
        }
        s->backoffs_by_id.erase(p);

        // check for any ops to resend
        for (auto& q : s->ops) {
          if (q.second->target.actual_pgid == m->pgid) {
            int r = q.second->target.contained_by(m->begin, m->end);
            ldout(cct, 20) << __func__ << " contained_by " << r << " on "
                           << q.second->target.get_hobj() << dendl;
            if (r) {
              _send_op(q.second);
            }
          }
        }
      } else {
        lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
                   << " unblock on ["
                   << m->begin << "," << m->end << ") but backoff dne" << dendl;
      }
    }
    break;

  default:
    ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
  }

  sul.unlock();
  sl.unlock();

  m->put();
  put_session(s);
}

uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
                                      uint32_t pos)
{
  shared_lock rl(rwlock);
  list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
                                pos, list_context->pool_id, string());
  ldout(cct, 10) << __func__ << " " << list_context
                 << " pos " << pos << " -> " << list_context->pos << dendl;
  pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
  list_context->current_pg = actual.ps();
  list_context->at_end_of_pool = false;
  return pos;
}

uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
                                      const hobject_t& cursor)
{
  shared_lock rl(rwlock);
  ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
  list_context->pos = cursor;
  list_context->at_end_of_pool = false;
  pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
  list_context->current_pg = actual.ps();
  list_context->sort_bitwise = true;
  return list_context->current_pg;
}

void Objecter::list_nobjects_get_cursor(NListContext *list_context,
                                        hobject_t *cursor)
{
  shared_lock rl(rwlock);
  if (list_context->list.empty()) {
    *cursor = list_context->pos;
  } else {
    const librados::ListObjectImpl& entry = list_context->list.front();
    const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
    uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
    *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
  }
}

3702void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3703{
3704 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3705 << " pool_snap_seq " << list_context->pool_snap_seq
3706 << " max_entries " << list_context->max_entries
3707 << " list_context " << list_context
3708 << " onfinish " << onfinish
3709 << " current_pg " << list_context->current_pg
3710 << " pos " << list_context->pos << dendl;
3711
3712 shared_lock rl(rwlock);
3713 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3714 if (!pool) { // pool is gone
3715 rl.unlock();
3716 put_nlist_context_budget(list_context);
3717 onfinish->complete(-ENOENT);
3718 return;
3719 }
3720 int pg_num = pool->get_pg_num();
3721 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3722
3723 if (list_context->pos.is_min()) {
3724 list_context->starting_pg_num = 0;
3725 list_context->sort_bitwise = sort_bitwise;
3726 list_context->starting_pg_num = pg_num;
3727 }
3728 if (list_context->sort_bitwise != sort_bitwise) {
3729 list_context->pos = hobject_t(
3730 object_t(), string(), CEPH_NOSNAP,
3731 list_context->current_pg, list_context->pool_id, string());
3732 list_context->sort_bitwise = sort_bitwise;
3733 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3734 << list_context->pos << dendl;
3735 }
3736 if (list_context->starting_pg_num != pg_num) {
3737 if (!sort_bitwise) {
3738 // start reading from the beginning; the pgs have changed
3739 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3740 list_context->pos = collection_list_handle_t();
3741 }
3742 list_context->starting_pg_num = pg_num;
3743 }
3744
3745 if (list_context->pos.is_max()) {
3746 ldout(cct, 20) << __func__ << " end of pool, list "
3747 << list_context->list << dendl;
3748 if (list_context->list.empty()) {
3749 list_context->at_end_of_pool = true;
3750 }
3751 // release the listing context's budget once all
3752 // OPs (in the session) are finished
3753 put_nlist_context_budget(list_context);
3754 onfinish->complete(0);
3755 return;
3756 }
3757
3758 ObjectOperation op;
3759 op.pg_nls(list_context->max_entries, list_context->filter,
3760 list_context->pos, osdmap->get_epoch());
3761 list_context->bl.clear();
3762 C_NList *onack = new C_NList(list_context, onfinish, this);
3763 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3764
3765 // note current_pg in case we don't have (or lose) SORTBITWISE
3766 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3767 rl.unlock();
3768
3769 pg_read(list_context->current_pg, oloc, op,
3770 &list_context->bl, 0, onack, &onack->epoch,
3771 &list_context->ctx_budget);
3772}
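
// Usage sketch (hypothetical caller, not part of this file): drive the
// listing by re-invoking list_nobjects() until at_end_of_pool is set,
// draining list_context.list between calls. The C_SaferCond wait shown
// here is just one possible completion pattern.
//
//   NListContext lc;
//   lc.pool_id = pool_id;
//   lc.max_entries = 1024;
//   do {
//     C_SaferCond cond;
//     objecter->list_nobjects(&lc, &cond);
//     int r = cond.wait();
//     if (r < 0)
//       break;            // e.g. -ENOENT if the pool went away
//     consume(lc.list);   // hypothetical helper
//     lc.list.clear();
//   } while (!lc.at_end_of_pool);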

void Objecter::_nlist_reply(NListContext *list_context, int r,
                            Context *final_finish, epoch_t reply_epoch)
{
  ldout(cct, 10) << __func__ << " " << list_context << dendl;

  auto iter = list_context->bl.cbegin();
  pg_nls_response_t response;
  decode(response, iter);
  if (!iter.end()) {
    // legacy extra_info; decode and discard it to stay compatible.
    ceph::buffer::list legacy_extra_info;
    decode(legacy_extra_info, iter);
  }

  // if the osd returns 1 (newer code), or handle MAX, it means we
  // hit the end of the pg.
  if ((response.handle.is_max() || r == 1) &&
      !list_context->sort_bitwise) {
    // legacy OSD and !sortbitwise, figure out the next PG on our own
    ++list_context->current_pg;
    if (list_context->current_pg == list_context->starting_pg_num) {
      // end of pool
      list_context->pos = hobject_t::get_max();
    } else {
      // next pg
      list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
                                    list_context->current_pg,
                                    list_context->pool_id, string());
    }
  } else {
    list_context->pos = response.handle;
  }

  int response_size = response.entries.size();
  ldout(cct, 20) << " response.entries.size " << response_size
                 << ", response.entries " << response.entries
                 << ", handle " << response.handle
                 << ", tentative new pos " << list_context->pos << dendl;
  if (response_size) {
    list_context->list.splice(list_context->list.end(), response.entries);
  }

  if (list_context->list.size() >= list_context->max_entries) {
    ldout(cct, 20) << " hit max, returning results so far, "
                   << list_context->list << dendl;
    // release the listing context's budget once all
    // OPs (in the session) are finished
    put_nlist_context_budget(list_context);
    final_finish->complete(0);
    return;
  }

  // continue!
  list_nobjects(list_context, final_finish);
}

void Objecter::put_nlist_context_budget(NListContext *list_context)
{
  if (list_context->ctx_budget >= 0) {
    ldout(cct, 10) << " release listing context's budget "
                   << list_context->ctx_budget << dendl;
    put_op_budget_bytes(list_context->ctx_budget);
    list_context->ctx_budget = -1;
  }
}

// snapshots

int Objecter::create_pool_snap(int64_t pool, string& snap_name,
                               Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
                 << snap_name << dendl;

  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -EINVAL;
  if (p->snap_exists(snap_name.c_str()))
    return -EEXIST;

  PoolOp *op = new PoolOp;
  if (!op)
    return -ENOMEM;
  op->tid = ++last_tid;
  op->pool = pool;
  op->name = snap_name;
  op->onfinish = onfinish;
  op->pool_op = POOL_OP_CREATE_SNAP;
  pool_ops[op->tid] = op;

  pool_op_submit(op);

  return 0;
}
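
// Usage sketch (hypothetical caller, not part of this file): pool snapshot
// ops complete asynchronously through the Context, so a synchronous wrapper
// looks roughly like this (C_SaferCond assumed from common/Cond.h):
//
//   C_SaferCond cond;
//   string name("before-upgrade");
//   int r = objecter->create_pool_snap(pool_id, name, &cond);
//   if (r == 0)
//     r = cond.wait();   // mon reply code; note -EEXIST returns synchronously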

struct C_SelfmanagedSnap : public Context {
  ceph::buffer::list bl;
  snapid_t *psnapid;
  Context *fin;
  C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
  void finish(int r) override {
    if (r == 0) {
      try {
        auto p = bl.cbegin();
        decode(*psnapid, p);
      } catch (ceph::buffer::error&) {
        r = -EIO;
      }
    }
    fin->complete(r);
  }
};

int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
                                        Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
  PoolOp *op = new PoolOp;
  if (!op) return -ENOMEM;
  op->tid = ++last_tid;
  op->pool = pool;
  C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
  op->onfinish = fin;
  op->blp = &fin->bl;
  op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
  pool_ops[op->tid] = op;

  pool_op_submit(op);
  return 0;
}
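
// Usage sketch (hypothetical caller, not part of this file): the snapid the
// mon allocates is only valid once the callback has fired, because
// C_SelfmanagedSnap decodes it out of the reply buffer first:
//
//   snapid_t snapid = 0;
//   C_SaferCond cond;
//   objecter->allocate_selfmanaged_snap(pool_id, &snapid, &cond);
//   if (cond.wait() == 0)
//     use_snap(snapid);   // hypothetical helper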

int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
                               Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
                 << snap_name << dendl;

  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -EINVAL;
  if (!p->snap_exists(snap_name.c_str()))
    return -ENOENT;

  PoolOp *op = new PoolOp;
  if (!op)
    return -ENOMEM;
  op->tid = ++last_tid;
  op->pool = pool;
  op->name = snap_name;
  op->onfinish = onfinish;
  op->pool_op = POOL_OP_DELETE_SNAP;
  pool_ops[op->tid] = op;

  pool_op_submit(op);

  return 0;
}

int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
                                      Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
                 << snap << dendl;
  PoolOp *op = new PoolOp;
  if (!op) return -ENOMEM;
  op->tid = ++last_tid;
  op->pool = pool;
  op->onfinish = onfinish;
  op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
  op->snapid = snap;
  pool_ops[op->tid] = op;

  pool_op_submit(op);

  return 0;
}

int Objecter::create_pool(string& name, Context *onfinish,
                          int crush_rule)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "create_pool name=" << name << dendl;

  if (osdmap->lookup_pg_pool_name(name) >= 0)
    return -EEXIST;

  PoolOp *op = new PoolOp;
  if (!op)
    return -ENOMEM;
  op->tid = ++last_tid;
  op->pool = 0;
  op->name = name;
  op->onfinish = onfinish;
  op->pool_op = POOL_OP_CREATE;
  pool_ops[op->tid] = op;
  op->crush_rule = crush_rule;

  pool_op_submit(op);

  return 0;
}

int Objecter::delete_pool(int64_t pool, Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "delete_pool " << pool << dendl;

  if (!osdmap->have_pg_pool(pool))
    return -ENOENT;

  _do_delete_pool(pool, onfinish);
  return 0;
}

int Objecter::delete_pool(const string &pool_name, Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "delete_pool " << pool_name << dendl;

  int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
  if (pool < 0)
    return pool;

  _do_delete_pool(pool, onfinish);
  return 0;
}

void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
{
  PoolOp *op = new PoolOp;
  op->tid = ++last_tid;
  op->pool = pool;
  op->name = "delete";
  op->onfinish = onfinish;
  op->pool_op = POOL_OP_DELETE;
  pool_ops[op->tid] = op;
  pool_op_submit(op);
}
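
// Usage sketch (hypothetical caller, not part of this file): create_pool()
// and delete_pool() only validate against the local osdmap and queue a
// PoolOp; the Context carries the mon's eventual reply code:
//
//   C_SaferCond cond;
//   string name("rbd-test");
//   // 0 = no explicit crush rule, per the if (op->crush_rule) guard
//   // in _pool_op_submit below (assumption for this sketch).
//   int r = objecter->create_pool(name, &cond, 0);
//   if (r == 0)
//     r = cond.wait();   // -EEXIST here means a racing creator won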

void Objecter::pool_op_submit(PoolOp *op)
{
  // rwlock is locked
  if (mon_timeout > timespan(0)) {
    op->ontimeout = timer.add_event(mon_timeout,
                                    [this, op]() {
                                      pool_op_cancel(op->tid, -ETIMEDOUT); });
  }
  _pool_op_submit(op);
}

void Objecter::_pool_op_submit(PoolOp *op)
{
  // rwlock is locked unique

  ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
  MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
                           op->name, op->pool_op,
                           last_seen_osdmap_version);
  if (op->snapid) m->snapid = op->snapid;
  if (op->crush_rule) m->crush_rule = op->crush_rule;
  monc->send_mon_message(m);
  op->last_submit = ceph::coarse_mono_clock::now();

  logger->inc(l_osdc_poolop_send);
}

/**
 * Handle a reply to a PoolOp message. Check that we sent the message
 * and give the caller responsibility for the returned ceph::buffer::list.
 * Then either call the finisher or stash the PoolOp, depending on whether
 * we have a new enough map.
 * Lastly, clean up the message and PoolOp.
 */
void Objecter::handle_pool_op_reply(MPoolOpReply *m)
{
  FUNCTRACE(cct);
  shunique_lock sul(rwlock, acquire_shared);
  if (!initialized) {
    sul.unlock();
    m->put();
    return;
  }

  ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
  ceph_tid_t tid = m->get_tid();
  map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
  if (iter != pool_ops.end()) {
    PoolOp *op = iter->second;
    ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
                   << ceph_pool_op_name(op->pool_op) << dendl;
    if (op->blp)
      op->blp->claim(m->response_data);
    if (m->version > last_seen_osdmap_version)
      last_seen_osdmap_version = m->version;
    if (osdmap->get_epoch() < m->epoch) {
      sul.unlock();
      sul.lock();
      // recheck op existence since we have let go of rwlock
      // (for promotion) above.
      iter = pool_ops.find(tid);
      if (iter == pool_ops.end())
        goto done; // op is gone.
      if (osdmap->get_epoch() < m->epoch) {
        ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
                       << " before calling back" << dendl;
        _wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
      } else {
        // map epoch changed, probably because a MOSDMap message
        // sneaked in. Do caller-specified callback now or else
        // we lose it forever.
        ceph_assert(op->onfinish);
        op->onfinish->complete(m->replyCode);
      }
    } else {
      ceph_assert(op->onfinish);
      op->onfinish->complete(m->replyCode);
    }
    op->onfinish = NULL;
    if (!sul.owns_lock()) {
      sul.unlock();
      sul.lock();
    }
    iter = pool_ops.find(tid);
    if (iter != pool_ops.end()) {
      _finish_pool_op(op, 0);
    }
  } else {
    ldout(cct, 10) << "unknown request " << tid << dendl;
  }

done:
  // Not strictly necessary, since we'll release it on return.
  sul.unlock();

  ldout(cct, 10) << "done" << dendl;
  m->put();
}

int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
  if (it == pool_ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
    return -ENOENT;
  }

  ldout(cct, 10) << __func__ << " tid " << tid << dendl;

  PoolOp *op = it->second;
  if (op->onfinish)
    op->onfinish->complete(r);

  _finish_pool_op(op, r);
  return 0;
}

void Objecter::_finish_pool_op(PoolOp *op, int r)
{
  // rwlock is locked unique
  pool_ops.erase(op->tid);
  logger->set(l_osdc_poolop_active, pool_ops.size());

  if (op->ontimeout && r != -ETIMEDOUT) {
    timer.cancel_event(op->ontimeout);
  }

  delete op;
}

// pool stats

void Objecter::get_pool_stats(list<string>& pools,
                              map<string,pool_stat_t> *result,
                              bool *per_pool,
                              Context *onfinish)
{
  ldout(cct, 10) << "get_pool_stats " << pools << dendl;

  PoolStatOp *op = new PoolStatOp;
  op->tid = ++last_tid;
  op->pools = pools;
  op->pool_stats = result;
  op->per_pool = per_pool;
  op->onfinish = onfinish;
  if (mon_timeout > timespan(0)) {
    op->ontimeout = timer.add_event(mon_timeout,
                                    [this, op]() {
                                      pool_stat_op_cancel(op->tid,
                                                          -ETIMEDOUT); });
  } else {
    op->ontimeout = 0;
  }

  unique_lock wl(rwlock);

  poolstat_ops[op->tid] = op;

  logger->set(l_osdc_poolstat_active, poolstat_ops.size());

  _poolstat_submit(op);
}
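
// Usage sketch (hypothetical caller, not part of this file): results land in
// caller-owned out-parameters, which must therefore outlive the op:
//
//   list<string> pools = {"rbd", "cephfs_data"};
//   map<string, pool_stat_t> stats;
//   bool per_pool = false;
//   C_SaferCond cond;
//   objecter->get_pool_stats(pools, &stats, &per_pool, &cond);
//   if (cond.wait() == 0)
//     for (auto& [name, st] : stats)
//       report(name, st);   // hypothetical helper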

void Objecter::_poolstat_submit(PoolStatOp *op)
{
  ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
  monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
                                           op->pools,
                                           last_seen_pgmap_version));
  op->last_submit = ceph::coarse_mono_clock::now();

  logger->inc(l_osdc_poolstat_send);
}

void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
{
  ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
  ceph_tid_t tid = m->get_tid();

  unique_lock wl(rwlock);
  if (!initialized) {
    m->put();
    return;
  }

  map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
  if (iter != poolstat_ops.end()) {
    PoolStatOp *op = poolstat_ops[tid];
    ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
    *op->pool_stats = m->pool_stats;
    *op->per_pool = m->per_pool;
    if (m->version > last_seen_pgmap_version) {
      last_seen_pgmap_version = m->version;
    }
    op->onfinish->complete(0);
    _finish_pool_stat_op(op, 0);
  } else {
    ldout(cct, 10) << "unknown request " << tid << dendl;
  }
  ldout(cct, 10) << "done" << dendl;
  m->put();
}

int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
  if (it == poolstat_ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
    return -ENOENT;
  }

  ldout(cct, 10) << __func__ << " tid " << tid << dendl;

  PoolStatOp *op = it->second;
  if (op->onfinish)
    op->onfinish->complete(r);
  _finish_pool_stat_op(op, r);
  return 0;
}

void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
{
  // rwlock is locked unique

  poolstat_ops.erase(op->tid);
  logger->set(l_osdc_poolstat_active, poolstat_ops.size());

  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  delete op;
}

void Objecter::get_fs_stats(ceph_statfs& result,
                            boost::optional<int64_t> data_pool,
                            Context *onfinish)
{
  ldout(cct, 10) << "get_fs_stats" << dendl;
  unique_lock l(rwlock);

  StatfsOp *op = new StatfsOp;
  op->tid = ++last_tid;
  op->stats = &result;
  op->data_pool = data_pool;
  op->onfinish = onfinish;
  if (mon_timeout > timespan(0)) {
    op->ontimeout = timer.add_event(mon_timeout,
                                    [this, op]() {
                                      statfs_op_cancel(op->tid,
                                                       -ETIMEDOUT); });
  } else {
    op->ontimeout = 0;
  }
  statfs_ops[op->tid] = op;

  logger->set(l_osdc_statfs_active, statfs_ops.size());

  _fs_stats_submit(op);
}
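
// Usage sketch (hypothetical caller, not part of this file): pass a pool id
// to scope the statfs to one data pool, or boost::none for the whole
// cluster; field names assume the classic ceph_statfs layout:
//
//   ceph_statfs stats;
//   C_SaferCond cond;
//   objecter->get_fs_stats(stats, boost::none, &cond);
//   if (cond.wait() == 0)
//     ldout(cct, 1) << "kb=" << stats.kb
//                   << " kb_used=" << stats.kb_used << dendl;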

void Objecter::_fs_stats_submit(StatfsOp *op)
{
  // rwlock is locked unique

  ldout(cct, 10) << "fs_stats_submit " << op->tid << dendl;
  monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
                                     op->data_pool,
                                     last_seen_pgmap_version));
  op->last_submit = ceph::coarse_mono_clock::now();

  logger->inc(l_osdc_statfs_send);
}

void Objecter::handle_fs_stats_reply(MStatfsReply *m)
{
  unique_lock wl(rwlock);
  if (!initialized) {
    m->put();
    return;
  }

  ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
  ceph_tid_t tid = m->get_tid();

  if (statfs_ops.count(tid)) {
    StatfsOp *op = statfs_ops[tid];
    ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
    *(op->stats) = m->h.st;
    if (m->h.version > last_seen_pgmap_version)
      last_seen_pgmap_version = m->h.version;
    op->onfinish->complete(0);
    _finish_statfs_op(op, 0);
  } else {
    ldout(cct, 10) << "unknown request " << tid << dendl;
  }
  m->put();
  ldout(cct, 10) << "done" << dendl;
}

int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
  if (it == statfs_ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
    return -ENOENT;
  }

  ldout(cct, 10) << __func__ << " tid " << tid << dendl;

  StatfsOp *op = it->second;
  if (op->onfinish)
    op->onfinish->complete(r);
  _finish_statfs_op(op, r);
  return 0;
}

void Objecter::_finish_statfs_op(StatfsOp *op, int r)
{
  // rwlock is locked unique

  statfs_ops.erase(op->tid);
  logger->set(l_osdc_statfs_active, statfs_ops.size());

  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  delete op;
}

// scatter/gather

void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
                               vector<ceph::buffer::list>& resultbl,
                               ceph::buffer::list *bl, Context *onfinish)
{
  // all done
  ldout(cct, 15) << "_sg_read_finish" << dendl;

  if (extents.size() > 1) {
    Striper::StripedReadResult r;
    vector<ceph::buffer::list>::iterator bit = resultbl.begin();
    for (vector<ObjectExtent>::iterator eit = extents.begin();
         eit != extents.end();
         ++eit, ++bit) {
      r.add_partial_result(cct, *bit, eit->buffer_extents);
    }
    bl->clear();
    r.assemble_result(cct, *bl, false);
  } else {
    ldout(cct, 15) << " only one frag" << dendl;
    bl->claim(resultbl[0]);
  }

  // done
  uint64_t bytes_read = bl->length();
  ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;

  if (onfinish) {
    onfinish->complete(bytes_read); // > 0 ? bytes_read : m->get_result());
  }
}


void Objecter::ms_handle_connect(Connection *con)
{
  ldout(cct, 10) << "ms_handle_connect " << con << dendl;
  if (!initialized)
    return;

  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
    resend_mon_ops();
}

bool Objecter::ms_handle_reset(Connection *con)
{
  if (!initialized)
    return false;
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    unique_lock wl(rwlock);

    auto priv = con->get_priv();
    auto session = static_cast<OSDSession*>(priv.get());
    if (session) {
      ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
                    << " osd." << session->osd << dendl;
      // the session may already have been closed if a just-handled osdmap
      // marked the osd down
      if (!(initialized && osdmap->is_up(session->osd))) {
        ldout(cct, 1) << "ms_handle_reset aborted, initialized="
                      << initialized << dendl;
        wl.unlock();
        return false;
      }
      map<uint64_t, LingerOp *> lresend;
      OSDSession::unique_lock sl(session->lock);
      _reopen_session(session);
      _kick_requests(session, lresend);
      sl.unlock();
      _linger_ops_resend(lresend, wl);
      wl.unlock();
      maybe_request_map();
    }
    return true;
  }
  return false;
}

void Objecter::ms_handle_remote_reset(Connection *con)
{
  /*
   * treat these the same.
   */
  ms_handle_reset(con);
}

bool Objecter::ms_handle_refused(Connection *con)
{
  // just log for now
  if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
    int osd = osdmap->identify_osd(con->get_peer_addr());
    if (osd >= 0) {
      ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
    }
  }
  return false;
}

void Objecter::op_target_t::dump(Formatter *f) const
{
  f->dump_stream("pg") << pgid;
  f->dump_int("osd", osd);
  f->dump_stream("object_id") << base_oid;
  f->dump_stream("object_locator") << base_oloc;
  f->dump_stream("target_object_id") << target_oid;
  f->dump_stream("target_object_locator") << target_oloc;
  f->dump_int("paused", (int)paused);
  f->dump_int("used_replica", (int)used_replica);
  f->dump_int("precalc_pgid", (int)precalc_pgid);
}

void Objecter::_dump_active(OSDSession *s)
{
  for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
       p != s->ops.end();
       ++p) {
    Op *op = p->second;
    ldout(cct, 20) << op->tid << "\t" << op->target.pgid
                   << "\tosd." << (op->session ? op->session->osd : -1)
                   << "\t" << op->target.base_oid
                   << "\t" << op->ops << dendl;
  }
}

void Objecter::_dump_active()
{
  ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
                 << dendl;
  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    _dump_active(s);
    sl.unlock();
  }
  _dump_active(homeless_session);
}

void Objecter::dump_active()
{
  shared_lock rl(rwlock);
  _dump_active();
  rl.unlock();
}

void Objecter::dump_requests(Formatter *fmt)
{
  // Read-lock on Objecter held here
  fmt->open_object_section("requests");
  dump_ops(fmt);
  dump_linger_ops(fmt);
  dump_pool_ops(fmt);
  dump_pool_stat_ops(fmt);
  dump_statfs_ops(fmt);
  dump_command_ops(fmt);
  fmt->close_section(); // requests object
}

void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
{
  for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
       p != s->ops.end();
       ++p) {
    Op *op = p->second;
    auto age = std::chrono::duration<double>(coarse_mono_clock::now() -
                                             op->stamp);
    fmt->open_object_section("op");
    fmt->dump_unsigned("tid", op->tid);
    op->target.dump(fmt);
    fmt->dump_stream("last_sent") << op->stamp;
    fmt->dump_float("age", age.count());
    fmt->dump_int("attempts", op->attempts);
    fmt->dump_stream("snapid") << op->snapid;
    fmt->dump_stream("snap_context") << op->snapc;
    fmt->dump_stream("mtime") << op->mtime;

    fmt->open_array_section("osd_ops");
    for (vector<OSDOp>::const_iterator it = op->ops.begin();
         it != op->ops.end();
         ++it) {
      fmt->dump_stream("osd_op") << *it;
    }
    fmt->close_section(); // osd_ops array

    fmt->close_section(); // op object
  }
}

void Objecter::dump_ops(Formatter *fmt)
{
  // Read-lock on Objecter held
  fmt->open_array_section("ops");
  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    _dump_ops(s, fmt);
    sl.unlock();
  }
  _dump_ops(homeless_session, fmt);
  fmt->close_section(); // ops array
}
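
// Illustrative output shape only (field names are taken from the dump calls
// above; the values are made up): with a JSON formatter, each in-flight op
// renders roughly as
//
//   {"tid": 42, "pg": "1.2a", "osd": 3, "object_id": "rbd_header.1001",
//    ..., "last_sent": "...", "age": 0.8, "attempts": 1,
//    "osd_ops": ["watch"]}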

void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
{
  for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
       p != s->linger_ops.end();
       ++p) {
    LingerOp *op = p->second;
    fmt->open_object_section("linger_op");
    fmt->dump_unsigned("linger_id", op->linger_id);
    op->target.dump(fmt);
    fmt->dump_stream("snapid") << op->snap;
    fmt->dump_stream("registered") << op->registered;
    fmt->close_section(); // linger_op object
  }
}

void Objecter::dump_linger_ops(Formatter *fmt)
{
  // We have a read-lock on the objecter
  fmt->open_array_section("linger_ops");
  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    _dump_linger_ops(s, fmt);
    sl.unlock();
  }
  _dump_linger_ops(homeless_session, fmt);
  fmt->close_section(); // linger_ops array
}

void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
{
  for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
       p != s->command_ops.end();
       ++p) {
    CommandOp *op = p->second;
    fmt->open_object_section("command_op");
    fmt->dump_unsigned("command_id", op->tid);
    fmt->dump_int("osd", op->session ? op->session->osd : -1);
    fmt->open_array_section("command");
    for (vector<string>::const_iterator q = op->cmd.begin();
         q != op->cmd.end(); ++q)
      fmt->dump_string("word", *q);
    fmt->close_section();
    if (op->target_osd >= 0)
      fmt->dump_int("target_osd", op->target_osd);
    else
      fmt->dump_stream("target_pg") << op->target_pg;
    fmt->close_section(); // command_op object
  }
}

void Objecter::dump_command_ops(Formatter *fmt)
{
  // We have a read-lock on the Objecter here
  fmt->open_array_section("command_ops");
  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    _dump_command_ops(s, fmt);
    sl.unlock();
  }
  _dump_command_ops(homeless_session, fmt);
  fmt->close_section(); // command_ops array
}

void Objecter::dump_pool_ops(Formatter *fmt) const
{
  fmt->open_array_section("pool_ops");
  for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
       p != pool_ops.end();
       ++p) {
    PoolOp *op = p->second;
    fmt->open_object_section("pool_op");
    fmt->dump_unsigned("tid", op->tid);
    fmt->dump_int("pool", op->pool);
    fmt->dump_string("name", op->name);
    fmt->dump_int("operation_type", op->pool_op);
    fmt->dump_unsigned("crush_rule", op->crush_rule);
    fmt->dump_stream("snapid") << op->snapid;
    fmt->dump_stream("last_sent") << op->last_submit;
    fmt->close_section(); // pool_op object
  }
  fmt->close_section(); // pool_ops array
}

void Objecter::dump_pool_stat_ops(Formatter *fmt) const
{
  fmt->open_array_section("pool_stat_ops");
  for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
       p != poolstat_ops.end();
       ++p) {
    PoolStatOp *op = p->second;
    fmt->open_object_section("pool_stat_op");
    fmt->dump_unsigned("tid", op->tid);
    fmt->dump_stream("last_sent") << op->last_submit;

    fmt->open_array_section("pools");
    for (list<string>::const_iterator it = op->pools.begin();
         it != op->pools.end();
         ++it) {
      fmt->dump_string("pool", *it);
    }
    fmt->close_section(); // pools array

    fmt->close_section(); // pool_stat_op object
  }
  fmt->close_section(); // pool_stat_ops array
}

void Objecter::dump_statfs_ops(Formatter *fmt) const
{
  fmt->open_array_section("statfs_ops");
  for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
       p != statfs_ops.end();
       ++p) {
    StatfsOp *op = p->second;
    fmt->open_object_section("statfs_op");
    fmt->dump_unsigned("tid", op->tid);
    fmt->dump_stream("last_sent") << op->last_submit;
    fmt->close_section(); // statfs_op object
  }
  fmt->close_section(); // statfs_ops array
}

Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
  m_objecter(objecter)
{
}

int Objecter::RequestStateHook::call(std::string_view command,
                                     const cmdmap_t& cmdmap,
                                     Formatter *f,
                                     std::ostream& ss,
                                     ceph::buffer::list& out)
{
  shared_lock rl(m_objecter->rwlock);
  m_objecter->dump_requests(f);
  return 0;
}

void Objecter::blacklist_self(bool set)
{
  ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;

  vector<string> cmd;
  cmd.push_back("{\"prefix\":\"osd blacklist\", ");
  if (set)
    cmd.push_back("\"blacklistop\":\"add\",");
  else
    cmd.push_back("\"blacklistop\":\"rm\",");
  stringstream ss;
  // this is somewhat imprecise in that we are blacklisting our first addr only
  ss << messenger->get_myaddrs().front().get_legacy_str();
  cmd.push_back("\"addr\":\"" + ss.str() + "\"");

  MMonCommand *m = new MMonCommand(monc->get_fsid());
  m->cmd = cmd;

  monc->send_mon_message(m);
}
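
// For reference, the fragments above are assumed to assemble on the mon side
// into a single JSON command roughly like the following (the address value
// is illustrative only):
//
//   {"prefix":"osd blacklist", "blacklistop":"add", "addr":"10.0.0.5:0/1234"}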

// commands

void Objecter::handle_command_reply(MCommandReply *m)
{
  unique_lock wl(rwlock);
  if (!initialized) {
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  OSDSession::shared_lock sl(s->lock);
  map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
  if (p == s->command_ops.end()) {
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
                   << " not found" << dendl;
    m->put();
    sl.unlock();
    return;
  }

  CommandOp *c = p->second;
  if (!c->session ||
      m->get_connection() != c->session->con) {
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
                   << " got reply from wrong connection "
                   << m->get_connection() << " " << m->get_source_inst()
                   << dendl;
    m->put();
    sl.unlock();
    return;
  }
  if (m->r == -EAGAIN) {
    ldout(cct,10) << __func__ << " tid " << m->get_tid()
                  << " got EAGAIN, requesting map and resending" << dendl;
    // NOTE: This might resend twice... once now, and once again when
    // we get an updated osdmap and the PG is found to have moved.
    _maybe_request_map();
    _send_command(c);
    m->put();
    sl.unlock();
    return;
  }

  if (c->poutbl) {
    c->poutbl->claim(m->get_data());
  }

  sl.unlock();

  OSDSession::unique_lock sul(s->lock);
  _finish_command(c, m->r, m->rs);
  sul.unlock();

  m->put();
}

void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
{
  shunique_lock sul(rwlock, ceph::acquire_unique);

  ceph_tid_t tid = ++last_tid;
  ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
  c->tid = tid;

  {
    OSDSession::unique_lock hs_wl(homeless_session->lock);
    _session_command_op_assign(homeless_session, c);
  }

  _calc_command_target(c, sul);
  _assign_command_session(c, sul);
  if (osd_timeout > timespan(0)) {
    c->ontimeout = timer.add_event(osd_timeout,
                                   [this, c, tid]() {
                                     command_op_cancel(c->session, tid,
                                                       -ETIMEDOUT); });
  }

  if (!c->session->is_homeless()) {
    _send_command(c);
  } else {
    _maybe_request_map();
  }
  if (c->map_check_error)
    _send_command_map_check(c);
  *ptid = tid;

  logger->inc(l_osdc_command_active);
}

int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  c->map_check_error = 0;

  // ignore overlays, just like we do with pg ops
  c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;

  if (c->target_osd >= 0) {
    if (!osdmap->exists(c->target_osd)) {
      c->map_check_error = -ENOENT;
      c->map_check_error_str = "osd dne";
      c->target.osd = -1;
      return RECALC_OP_TARGET_OSD_DNE;
    }
    if (osdmap->is_down(c->target_osd)) {
      c->map_check_error = -ENXIO;
      c->map_check_error_str = "osd down";
      c->target.osd = -1;
      return RECALC_OP_TARGET_OSD_DOWN;
    }
    c->target.osd = c->target_osd;
  } else {
    int ret = _calc_target(&(c->target), nullptr, true);
    if (ret == RECALC_OP_TARGET_POOL_DNE) {
      c->map_check_error = -ENOENT;
      c->map_check_error_str = "pool dne";
      c->target.osd = -1;
      return ret;
    } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
      c->map_check_error = -ENXIO;
      c->map_check_error_str = "osd down";
      c->target.osd = -1;
      return ret;
    }
  }

  OSDSession *s;
  int r = _get_session(c->target.osd, &s, sul);
  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */

  if (c->session != s) {
    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }

  put_session(s);

  ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, "
                 << c->session << dendl;

  return RECALC_OP_TARGET_NO_ACTION;
}

void Objecter::_assign_command_session(CommandOp *c,
                                       shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  OSDSession *s;
  int r = _get_session(c->target.osd, &s, sul);
  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */

  if (c->session != s) {
    if (c->session) {
      OSDSession *cs = c->session;
      OSDSession::unique_lock csl(cs->lock);
      _session_command_op_remove(c->session, c);
      csl.unlock();
    }
    OSDSession::unique_lock sl(s->lock);
    _session_command_op_assign(s, c);
  }

  put_session(s);
}

void Objecter::_send_command(CommandOp *c)
{
  ldout(cct, 10) << "_send_command " << c->tid << dendl;
  ceph_assert(c->session);
  ceph_assert(c->session->con);
  MCommand *m = new MCommand(monc->monmap.fsid);
  m->cmd = c->cmd;
  m->set_data(c->inbl);
  m->set_tid(c->tid);
  c->session->con->send_message(m);
  logger->inc(l_osdc_command_send);
}

int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
  if (it == s->command_ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
    return -ENOENT;
  }

  ldout(cct, 10) << __func__ << " tid " << tid << dendl;

  CommandOp *op = it->second;
  _command_cancel_map_check(op);
  OSDSession::unique_lock sl(op->session->lock);
  _finish_command(op, r, "");
  sl.unlock();
  return 0;
}

void Objecter::_finish_command(CommandOp *c, int r, string rs)
{
  // rwlock is locked unique
  // session lock is locked

  ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
                 << rs << dendl;
  if (c->prs)
    *c->prs = rs;
  if (c->onfinish)
    c->onfinish->complete(r);

  if (c->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(c->ontimeout);

  _session_command_op_remove(c->session, c);

  c->put();

  logger->dec(l_osdc_command_active);
}

Objecter::OSDSession::~OSDSession()
{
  // Caller is responsible for re-assigning or
  // destroying any ops that were assigned to us
  ceph_assert(ops.empty());
  ceph_assert(linger_ops.empty());
  ceph_assert(command_ops.empty());
}

Objecter::Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
                   Finisher *fin,
                   double mon_timeout,
                   double osd_timeout) :
  Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
  trace_endpoint("0.0.0.0", 0, "Objecter"),
  osdmap{std::make_unique<OSDMap>()},
  homeless_session(new OSDSession(cct, -1)),
  mon_timeout(ceph::make_timespan(mon_timeout)),
  osd_timeout(ceph::make_timespan(osd_timeout)),
  op_throttle_bytes(cct, "objecter_bytes",
                    cct->_conf->objecter_inflight_op_bytes),
  op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
  retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
{}

Objecter::~Objecter()
{
  ceph_assert(homeless_session->get_nref() == 1);
  ceph_assert(num_homeless_ops == 0);
  homeless_session->put();

  ceph_assert(osd_sessions.empty());
  ceph_assert(poolstat_ops.empty());
  ceph_assert(statfs_ops.empty());
  ceph_assert(pool_ops.empty());
  ceph_assert(waiting_for_map.empty());
  ceph_assert(linger_ops.empty());
  ceph_assert(check_latest_map_lingers.empty());
  ceph_assert(check_latest_map_ops.empty());
  ceph_assert(check_latest_map_commands.empty());

  ceph_assert(!m_request_state_hook);
  ceph_assert(!logger);
}

/**
 * Wait until this OSD map epoch is received before
 * sending any more operations to OSDs. Use this
 * when it is known that the client can't trust
 * anything from before this epoch (e.g. due to
 * client blacklist at this epoch).
 */
void Objecter::set_epoch_barrier(epoch_t epoch)
{
  unique_lock wl(rwlock);

  ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
                << epoch_barrier << ") current epoch " << osdmap->get_epoch()
                << dendl;
  if (epoch > epoch_barrier) {
    epoch_barrier = epoch;
    _maybe_request_map();
  }
}
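
// Usage sketch (hypothetical caller, not part of this file): a client that
// learns it was blacklisted as of epoch E fences itself like this, so no op
// goes out until a map at or past E has been applied:
//
//   epoch_t blacklist_epoch = get_blacklist_epoch();  // hypothetical source
//   objecter->set_epoch_barrier(blacklist_epoch);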


hobject_t Objecter::enumerate_objects_begin()
{
  return hobject_t();
}

hobject_t Objecter::enumerate_objects_end()
{
  return hobject_t::get_max();
}

struct C_EnumerateReply : public Context {
  ceph::buffer::list bl;

  Objecter *objecter;
  hobject_t *next;
  std::list<librados::ListObjectImpl> *result;
  const hobject_t end;
  const int64_t pool_id;
  Context *on_finish;

  epoch_t epoch;
  int budget;

  C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
                   std::list<librados::ListObjectImpl> *result_,
                   const hobject_t end_, const int64_t pool_id_,
                   Context *on_finish_) :
    objecter(objecter_), next(next_), result(result_),
    end(end_), pool_id(pool_id_), on_finish(on_finish_),
    epoch(0), budget(-1)
  {}

  void finish(int r) override {
    objecter->_enumerate_reply(
      bl, r, end, pool_id, budget, epoch, result, next, on_finish);
  }
};

void Objecter::enumerate_objects(
  int64_t pool_id,
  const std::string &ns,
  const hobject_t &start,
  const hobject_t &end,
  const uint32_t max,
  const ceph::buffer::list &filter_bl,
  std::list<librados::ListObjectImpl> *result,
  hobject_t *next,
  Context *on_finish)
{
  ceph_assert(result);

  if (!end.is_max() && start > end) {
    lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  if (max < 1) {
    lderr(cct) << __func__ << ": result size may not be zero" << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  if (start.is_max()) {
    on_finish->complete(0);
    return;
  }

  shared_lock rl(rwlock);
  ceph_assert(osdmap->get_epoch());
  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
    rl.unlock();
    lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
    on_finish->complete(-EOPNOTSUPP);
    return;
  }
  const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
  if (!p) {
    lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
               << osdmap->get_epoch() << dendl;
    rl.unlock();
    on_finish->complete(-ENOENT);
    return;
  } else {
    rl.unlock();
  }

  ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;

  // Stash completion state
  C_EnumerateReply *on_ack = new C_EnumerateReply(
    this, next, result, end, pool_id, on_finish);

  ObjectOperation op;
  op.pg_nls(max, filter_bl, start, 0);

  // Issue. See you later in _enumerate_reply
  object_locator_t oloc(pool_id, ns);
  pg_read(start.get_hash(), oloc, op,
          &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
}
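
// Usage sketch (hypothetical caller, not part of this file): page through a
// pool by feeding each returned 'next' cursor back in until it reaches the
// maximal cursor:
//
//   hobject_t cur = objecter->enumerate_objects_begin();
//   const hobject_t end = objecter->enumerate_objects_end();
//   std::list<librados::ListObjectImpl> page;
//   while (!cur.is_max()) {
//     hobject_t next;
//     C_SaferCond cond;
//     objecter->enumerate_objects(pool_id, "", cur, end, 128,
//                                 ceph::buffer::list(), &page, &next, &cond);
//     if (cond.wait() < 0)
//       break;
//     consume(page);   // hypothetical helper
//     page.clear();
//     cur = next;
//   }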

void Objecter::_enumerate_reply(
  ceph::buffer::list &bl,
  int r,
  const hobject_t &end,
  const int64_t pool_id,
  int budget,
  epoch_t reply_epoch,
  std::list<librados::ListObjectImpl> *result,
  hobject_t *next,
  Context *on_finish)
{
  if (budget >= 0) {
    put_op_budget_bytes(budget);
  }

  if (r < 0) {
    ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
    on_finish->complete(r);
    return;
  }

  ceph_assert(next != NULL);

  // Decode the results
  auto iter = bl.cbegin();
  pg_nls_response_t response;

  decode(response, iter);
  if (!iter.end()) {
    // extra_info isn't used anywhere. We do this solely to preserve
    // backward compatibility
    ceph::buffer::list legacy_extra_info;
    decode(legacy_extra_info, iter);
  }

  ldout(cct, 10) << __func__ << ": got " << response.entries.size()
                 << " handle " << response.handle
                 << " reply_epoch " << reply_epoch << dendl;
  ldout(cct, 20) << __func__ << ": response.entries.size "
                 << response.entries.size() << ", response.entries "
                 << response.entries << dendl;
  if (response.handle <= end) {
    *next = response.handle;
  } else {
    ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
                   << dendl;
    *next = end;

    // drop anything after 'end'
    shared_lock rl(rwlock);
    const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
    if (!pool) {
      // pool is gone, drop any results which are now meaningless.
      rl.unlock();
      on_finish->complete(-ENOENT);
      return;
    }
    while (!response.entries.empty()) {
      uint32_t hash = response.entries.back().locator.empty() ?
        pool->hash_key(response.entries.back().oid,
                       response.entries.back().nspace) :
        pool->hash_key(response.entries.back().locator,
                       response.entries.back().nspace);
      hobject_t last(response.entries.back().oid,
                     response.entries.back().locator,
                     CEPH_NOSNAP,
                     hash,
                     pool_id,
                     response.entries.back().nspace);
      if (last < end)
        break;
      ldout(cct, 20) << __func__ << " dropping item " << last
                     << " >= end " << end << dendl;
      response.entries.pop_back();
    }
    rl.unlock();
  }
  if (!response.entries.empty()) {
    result->merge(response.entries);
  }

  // release the listing context's budget once all
  // OPs (in the session) are finished
#if 0
  put_nlist_context_budget(list_context);
#endif
  on_finish->complete(r);
  return;
}

namespace {
  using namespace librados;

  template <typename T>
  void do_decode(std::vector<T>& items, std::vector<ceph::buffer::list>& bls)
  {
    for (auto bl : bls) {
      auto p = bl.cbegin();
      T t;
      decode(t, p);
      items.push_back(t);
    }
  }

  struct C_ObjectOperation_scrub_ls : public Context {
    ceph::buffer::list bl;
    uint32_t *interval;
    std::vector<inconsistent_obj_t> *objects = nullptr;
    std::vector<inconsistent_snapset_t> *snapsets = nullptr;
    int *rval;

    C_ObjectOperation_scrub_ls(uint32_t *interval,
                               std::vector<inconsistent_obj_t> *objects,
                               int *rval)
      : interval(interval), objects(objects), rval(rval) {}
    C_ObjectOperation_scrub_ls(uint32_t *interval,
                               std::vector<inconsistent_snapset_t> *snapsets,
                               int *rval)
      : interval(interval), snapsets(snapsets), rval(rval) {}
    void finish(int r) override {
      if (r < 0 && r != -EAGAIN) {
        if (rval)
          *rval = r;
        return;
      }

      if (rval)
        *rval = 0;

      try {
        decode();
      } catch (ceph::buffer::error&) {
        if (rval)
          *rval = -EIO;
      }
    }
  private:
    void decode() {
      scrub_ls_result_t result;
      auto p = bl.cbegin();
      result.decode(p);
      *interval = result.interval;
      if (objects) {
        do_decode(*objects, result.vals);
      } else {
        do_decode(*snapsets, result.vals);
      }
    }
  };

  template <typename T>
  void do_scrub_ls(::ObjectOperation *op,
                   const scrub_ls_arg_t& arg,
                   std::vector<T> *items,
                   uint32_t *interval,
                   int *rval)
  {
    OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
    op->flags |= CEPH_OSD_FLAG_PGOP;
    ceph_assert(interval);
    arg.encode(osd_op.indata);
    unsigned p = op->ops.size() - 1;
    auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
    op->out_handler[p] = h;
    op->out_bl[p] = &h->bl;
    op->out_rval[p] = rval;
  }
}

void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
                                 uint64_t max_to_get,
                                 std::vector<librados::inconsistent_obj_t> *objects,
                                 uint32_t *interval,
                                 int *rval)
{
  scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
  do_scrub_ls(this, arg, objects, interval, rval);
}

void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
                                 uint64_t max_to_get,
                                 std::vector<librados::inconsistent_snapset_t> *snapsets,
                                 uint32_t *interval,
                                 int *rval)
{
  scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
  do_scrub_ls(this, arg, snapsets, interval, rval);
}
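
// Usage sketch (hypothetical caller, not part of this file): scrub_ls is a
// PG op, so the interval should be primed (0 on the first call) and the
// assembled op submitted against the PG being inspected:
//
//   std::vector<librados::inconsistent_obj_t> objs;
//   uint32_t interval = 0;
//   int rval = 0;
//   ObjectOperation op;
//   op.scrub_ls(librados::object_id_t(), 64, &objs, &interval, &rval);
//   // then issue 'op' as a PG read against the scrubbed PG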