// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <cerrno>

#include "Objecter.h"
#include "osd/OSDMap.h"
#include "Filer.h"

#include "mon/MonClient.h"

#include "msg/Messenger.h"
#include "msg/Message.h"

#include "messages/MPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDMap.h"

#include "messages/MPoolOp.h"
#include "messages/MPoolOpReply.h"

#include "messages/MGetPoolStats.h"
#include "messages/MGetPoolStatsReply.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"

#include "messages/MMonCommand.h"

#include "messages/MCommand.h"
#include "messages/MCommandReply.h"

#include "messages/MWatchNotify.h"


#include "common/config.h"
#include "common/perf_counters.h"
#include "common/scrub_types.h"
#include "include/str_list.h"
#include "common/errno.h"
#include "common/EventTrace.h"

using std::list;
using std::make_pair;
using std::map;
using std::ostream;
using std::ostringstream;
using std::pair;
using std::set;
using std::string;
using std::stringstream;
using std::vector;

using ceph::decode;
using ceph::encode;
using ceph::Formatter;

using std::defer_lock;

using ceph::real_time;
using ceph::real_clock;

using ceph::mono_clock;
using ceph::mono_time;

using ceph::timespan;

using ceph::shunique_lock;
using ceph::acquire_shared;
using ceph::acquire_unique;

#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
#define dout_prefix *_dout << messenger->get_myname() << ".objecter "


enum {
  l_osdc_first = 123200,
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,

  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};


// config obs ----------------------------

static const char *config_keys[] = {
  "crush_location",
  NULL
};

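// Admin socket hook behind the "objecter_requests" command registered in
// Objecter::init().  It dumps the requests this Objecter currently has in
// flight; on a typical client it can be queried with, e.g.,
//   ceph daemon <client admin socket> objecter_requests
// (the exact socket path depends on how the client is configured).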
class Objecter::RequestStateHook : public AdminSocketHook {
  Objecter *m_objecter;
public:
  explicit RequestStateHook(Objecter *objecter);
  int call(std::string_view command, const cmdmap_t& cmdmap,
           Formatter *f,
           std::ostream& ss,
           ceph::buffer::list& out) override;
};

/**
 * This is a more limited form of C_Contexts, but that requires
 * a ceph_context which we don't have here.
 */
class ObjectOperation::C_TwoContexts : public Context {
  Context *first;
  Context *second;
public:
  C_TwoContexts(Context *first, Context *second) :
    first(first), second(second) {}
  void finish(int r) override {
    first->complete(r);
    second->complete(r);
    first = NULL;
    second = NULL;
  }

  ~C_TwoContexts() override {
    delete first;
    delete second;
  }
};

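// Wrap the most recently added op's output handler so that both the original
// handler (if any) and 'extra' are completed when the reply is processed;
// C_TwoContexts (above) simply chains the two contexts together.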
void ObjectOperation::add_handler(Context *extra) {
  size_t last = out_handler.size() - 1;
  Context *orig = out_handler[last];
  if (orig) {
    Context *wrapper = new C_TwoContexts(orig, extra);
    out_handler[last] = wrapper;
  } else {
    out_handler[last] = extra;
  }
}

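// Pick the completion lock to use for a given object: the object name is
// hashed onto one of the session's 'num_locks' completion locks, which lets
// completions for different objects avoid serializing on a single mutex.
// The lock is returned deferred (not yet acquired).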
Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
  object_t& oid)
{
  if (oid.name.empty())
    return unique_completion_lock();

  static constexpr uint32_t HASH_PRIME = 1021;
  uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
    % HASH_PRIME;

  return unique_completion_lock(completion_locks[h % num_locks],
                                std::defer_lock);
}

const char** Objecter::get_tracked_conf_keys() const
{
  return config_keys;
}


void Objecter::handle_conf_change(const ConfigProxy& conf,
                                  const std::set <std::string> &changed)
{
  if (changed.count("crush_location")) {
    update_crush_location();
  }
}

void Objecter::update_crush_location()
{
  unique_lock wl(rwlock);
  crush_location = cct->crush_location.get_location();
}

// messages ------------------------------

/*
 * initialize only internal data structures, don't initiate cluster interaction
 */
void Objecter::init()
{
  ceph_assert(!initialized);

  if (!logger) {
    PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);

    pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
                PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
    pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");

    pcb.add_u64_counter(l_osdc_op, "op", "Operations");
    pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
                        PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
                        PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
                        "rdwr", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");

    pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
    pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
                        "Create object operations");
    pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
    pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
    pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
                        "Write full object operations");
    pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
                        "Write same operations");
    pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
                        "Append operation");
    pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
                        "Set object to zero operations");
    pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
                        "Truncate object operations");
    pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
                        "Delete object operations");
    pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
                        "Map extent operations");
    pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
                        "Sparse read operations");
    pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
                        "Clone range operations");
    pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
                        "Get xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
                        "Set xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
                        "Xattr comparison operations");
    pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
                        "Remove xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
                        "Reset xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
                        "Call (execute) operations");
    pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
                        "Watch by object operations");
    pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
                        "Notify about object operations");
    pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
                        "Extended attribute comparison in multi operations");
    pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
    pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
    pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");

    pcb.add_u64(l_osdc_linger_active, "linger_active",
                "Active lingering operations");
    pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
                        "Sent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
                        "Resent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
                        "Sent pings to lingering operations");

    pcb.add_u64(l_osdc_poolop_active, "poolop_active",
                "Active pool operations");
    pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
                        "Sent pool operations");
    pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
                        "Resent pool operations");

    pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
                "Active get pool stat operations");
    pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
                        "Pool stat operations sent");
    pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
                        "Resent pool stats");

    pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
    pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
    pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
                        "Resent FS stats");

    pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
    pcb.add_u64_counter(l_osdc_command_send, "command_send",
                        "Sent commands");
    pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
                        "Resent commands");

    pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
    pcb.add_u64_counter(l_osdc_map_full, "map_full",
                        "Full OSD maps received");
    pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
                        "Incremental OSD maps received");

    pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
                "Open sessions"); // open sessions
    pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
                        "Sessions opened");
    pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
                        "Sessions closed");
    pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");

    pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
                        "OSD OMAP write operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
                        "OSD OMAP read operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
                        "OSD OMAP delete operations");

    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  m_request_state_hook = new RequestStateHook(this);
  AdminSocket* admin_socket = cct->get_admin_socket();
  int ret = admin_socket->register_command("objecter_requests",
                                           m_request_state_hook,
                                           "show in-progress osd requests");

  /* Don't warn on EEXIST, happens if multiple ceph clients
   * are instantiated from one process */
  if (ret < 0 && ret != -EEXIST) {
    lderr(cct) << "error registering admin socket command: "
               << cpp_strerror(ret) << dendl;
  }

  update_crush_location();

  cct->_conf.add_observer(this);

  initialized = true;
}

/*
 * ok, cluster interaction can happen
 */
void Objecter::start(const OSDMap* o)
{
  shared_lock rl(rwlock);

  start_tick();
  if (o) {
    osdmap->deepish_copy_from(*o);
    prune_pg_mapping(osdmap->get_pools());
  } else if (osdmap->get_epoch() == 0) {
    _maybe_request_map();
  }
}

void Objecter::shutdown()
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  initialized = false;

  wl.unlock();
  cct->_conf.remove_observer(this);
  wl.lock();

  map<int,OSDSession*>::iterator p;
  while (!osd_sessions.empty()) {
    p = osd_sessions.begin();
    close_session(p->second);
  }

  while(!check_latest_map_lingers.empty()) {
    map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while(!check_latest_map_ops.empty()) {
    map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while(!check_latest_map_commands.empty()) {
    map<ceph_tid_t, CommandOp*>::iterator i
      = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  while(!poolstat_ops.empty()) {
    map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while(!statfs_ops.empty()) {
    map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while(!pool_ops.empty()) {
    map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
  while(!homeless_session->linger_ops.empty()) {
    std::map<uint64_t, LingerOp*>::iterator i
      = homeless_session->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    LingerOp *lop = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_linger_op_remove(homeless_session, lop);
    }
    linger_ops.erase(lop->linger_id);
    linger_ops_set.erase(lop);
    lop->put();
  }

  while(!homeless_session->ops.empty()) {
    std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    Op *op = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_op_remove(homeless_session, op);
    }
    op->put();
  }

  while(!homeless_session->command_ops.empty()) {
    std::map<ceph_tid_t, CommandOp*>::iterator i
      = homeless_session->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    CommandOp *cop = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_command_op_remove(homeless_session, cop);
    }
    cop->put();
  }

  if (tick_event) {
    if (timer.cancel_event(tick_event)) {
      ldout(cct, 10) << " successfully canceled tick" << dendl;
    }
    tick_event = 0;
  }

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }

  // Let go of Objecter write lock so timer thread can shutdown
  wl.unlock();

  // Outside of lock to avoid cycle WRT calls to RequestStateHook
  // This is safe because we guarantee no concurrent calls to
  // shutdown() with the ::initialized check at start.
  if (m_request_state_hook) {
    AdminSocket* admin_socket = cct->get_admin_socket();
    admin_socket->unregister_commands(m_request_state_hook);
    delete m_request_state_hook;
    m_request_state_hook = NULL;
  }
}

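// (Re)send the registration op for a linger (watch/notify) operation.  For a
// watch that has already registered we send CEPH_OSD_WATCH_OP_RECONNECT with
// a bumped register_gen; otherwise we replay the original registration ops.
// The generated Op is marked should_resend=false because, on a map change, a
// fresh op is built here instead of resending the old one.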
void Objecter::_send_linger(LingerOp *info,
                            shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  vector<OSDOp> opv;
  Context *oncommit = NULL;
  LingerOp::shared_lock watchl(info->watch_lock);
  ceph::buffer::list *poutbl = NULL;
  if (info->registered && info->is_watch) {
    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
                   << dendl;
    opv.push_back(OSDOp());
    opv.back().op.op = CEPH_OSD_OP_WATCH;
    opv.back().op.watch.cookie = info->get_cookie();
    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
    opv.back().op.watch.gen = ++info->register_gen;
    oncommit = new C_Linger_Reconnect(this, info);
  } else {
    ldout(cct, 15) << "send_linger " << info->linger_id << " register"
                   << dendl;
    opv = info->ops;
    C_Linger_Commit *c = new C_Linger_Commit(this, info);
    if (!info->is_watch) {
      info->notify_id = 0;
      poutbl = &c->outbl;
    }
    oncommit = c;
  }
  watchl.unlock();
  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
                 opv, info->target.flags | CEPH_OSD_FLAG_READ,
                 oncommit, info->pobjver);
  o->outbl = poutbl;
  o->snapid = info->snap;
  o->snapc = info->snapc;
  o->mtime = info->mtime;

  o->target = info->target;
  o->tid = ++last_tid;

  // do not resend this; we will send a new op to reregister
  o->should_resend = false;
  o->ctx_budgeted = true;

  if (info->register_tid) {
    // repeat send.  cancel old registration op, if any.
    OSDSession::unique_lock sl(info->session->lock);
    if (info->session->ops.count(info->register_tid)) {
      Op *o = info->session->ops[info->register_tid];
      _op_cancel_map_check(o);
      _cancel_linger_op(o);
    }
    sl.unlock();
  }

  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);

  logger->inc(l_osdc_linger_send);
}

void Objecter::_linger_commit(LingerOp *info, int r, ceph::buffer::list& outbl)
{
  LingerOp::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
  if (info->on_reg_commit) {
    info->on_reg_commit->complete(r);
    info->on_reg_commit = NULL;
  }
  if (r < 0 && info->on_notify_finish) {
    info->on_notify_finish->complete(r);
    info->on_notify_finish = nullptr;
  }

  // only tell the user the first time we do this
  info->registered = true;
  info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    auto p = outbl.cbegin();
    try {
      decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit  notify_id=" << info->notify_id
                     << dendl;
    }
    catch (ceph::buffer::error& e) {
    }
  }
}

struct C_DoWatchError : public Context {
  Objecter *objecter;
  Objecter::LingerOp *info;
  int err;
  C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
    : objecter(o), info(i), err(r) {
    info->get();
    info->_queued_async();
  }
  void finish(int r) override {
    Objecter::unique_lock wl(objecter->rwlock);
    bool canceled = info->canceled;
    wl.unlock();

    if (!canceled) {
      info->watch_context->handle_error(info->get_cookie(), err);
    }

    info->finished_async();
    info->put();
  }
};

int Objecter::_normalize_watch_error(int r)
{
  // translate ENOENT -> ENOTCONN so that a delete->disconnection
  // notification and a failure to reconnect because we raced with
  // the delete appear the same to the user.
  if (r == -ENOENT)
    r = -ENOTCONN;
  return r;
}

void Objecter::_linger_reconnect(LingerOp *info, int r)
{
  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
                 << " (last_error " << info->last_error << ")" << dendl;
  if (r < 0) {
    LingerOp::unique_lock wl(info->watch_lock);
    if (!info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
        finisher->queue(new C_DoWatchError(this, info, r));
      }
    }
    wl.unlock();
  }
}

void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  if (cct->_conf->objecter_inject_no_watch_ping) {
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
                   << dendl;
    return;
  }
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
                 << dendl;

  vector<OSDOp> opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen;
  C_Linger_Ping *onack = new C_Linger_Ping(this, info);
  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
                 opv, info->target.flags | CEPH_OSD_FLAG_READ,
                 onack, NULL, NULL);
  o->target = info->target;
  o->should_resend = false;
  _send_op_account(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o);
  info->ping_tid = o->tid;

  onack->sent = now;
  logger->inc(l_osdc_linger_ping);
}

void Objecter::_linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
                            uint32_t register_gen)
{
  LingerOp::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id
                 << " sent " << sent << " gen " << register_gen << " = " << r
                 << " (last_error " << info->last_error
                 << " register_gen " << info->register_gen << ")" << dendl;
  if (info->register_gen == register_gen) {
    if (r == 0) {
      info->watch_valid_thru = sent;
    } else if (r < 0 && !info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
        finisher->queue(new C_DoWatchError(this, info, r));
      }
    }
  } else {
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}

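// Report watch health: returns the (negative) last error observed for this
// watch, or else a millisecond upper bound on how long ago the watch was
// last confirmed valid by a successful ping.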
int Objecter::linger_check(LingerOp *info)
{
  LingerOp::shared_lock l(info->watch_lock);

  ceph::coarse_mono_time stamp = info->watch_valid_thru;
  if (!info->watch_pending_async.empty())
    stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
  auto age = ceph::coarse_mono_clock::now() - stamp;

  ldout(cct, 10) << __func__ << " " << info->linger_id
                 << " err " << info->last_error
                 << " age " << age << dendl;
  if (info->last_error)
    return info->last_error;
  // return a safe upper bound (we are truncating to ms)
  return
    1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
}

void Objecter::linger_cancel(LingerOp *info)
{
  unique_lock wl(rwlock);
  _linger_cancel(info);
  info->put();
}

void Objecter::_linger_cancel(LingerOp *info)
{
  // rwlock is locked unique
  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
  if (!info->canceled) {
    OSDSession *s = info->session;
    OSDSession::unique_lock sl(s->lock);
    _session_linger_op_remove(s, info);
    sl.unlock();

    linger_ops.erase(info->linger_id);
    linger_ops_set.erase(info);
    ceph_assert(linger_ops.size() == linger_ops_set.size());

    info->canceled = true;
    info->put();

    logger->dec(l_osdc_linger_active);
  }
}



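// Typical caller sequence for the linger API (used, e.g., by the librados
// watch/notify implementation), roughly:
//
//   LingerOp *op = objecter->linger_register(oid, oloc, 0);
//   objecter->linger_watch(op, wr_op, snapc, mtime, inbl, onfinish, &ver);
//   ...
//   int ms = objecter->linger_check(op);  // last error, or age of last ping
//   objecter->linger_cancel(op);          // releases the caller's reference
//
// This is only an illustrative sketch of the flow (wr_op, onfinish, ver are
// placeholder names), not a verbatim librados call site.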
Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
                                              const object_locator_t& oloc,
                                              int flags)
{
  LingerOp *info = new LingerOp(this);
  info->target.base_oid = oid;
  info->target.base_oloc = oloc;
  if (info->target.base_oloc.key == oid)
    info->target.base_oloc.key.clear();
  info->target.flags = flags;
  info->watch_valid_thru = ceph::coarse_mono_clock::now();

  unique_lock l(rwlock);

  // Acquire linger ID
  info->linger_id = ++max_linger_id;
  ldout(cct, 10) << __func__ << " info " << info
                 << " linger_id " << info->linger_id
                 << " cookie " << info->get_cookie()
                 << dendl;
  linger_ops[info->linger_id] = info;
  linger_ops_set.insert(info);
  ceph_assert(linger_ops.size() == linger_ops_set.size());

  info->get(); // for the caller
  return info;
}

ceph_tid_t Objecter::linger_watch(LingerOp *info,
                                  ObjectOperation& op,
                                  const SnapContext& snapc,
                                  real_time mtime,
                                  ceph::buffer::list& inbl,
                                  Context *oncommit,
                                  version_t *objver)
{
  info->is_watch = true;
  info->snapc = snapc;
  info->mtime = mtime;
  info->target.flags |= CEPH_OSD_FLAG_WRITE;
  info->ops = op.ops;
  info->inbl = inbl;
  info->poutbl = NULL;
  info->pobjver = objver;
  info->on_reg_commit = oncommit;

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  return info->linger_id;
}

ceph_tid_t Objecter::linger_notify(LingerOp *info,
                                   ObjectOperation& op,
                                   snapid_t snap, ceph::buffer::list& inbl,
                                   ceph::buffer::list *poutbl,
                                   Context *onfinish,
                                   version_t *objver)
{
  info->snap = snap;
  info->target.flags |= CEPH_OSD_FLAG_READ;
  info->ops = op.ops;
  info->inbl = inbl;
  info->poutbl = poutbl;
  info->pobjver = objver;
  info->on_reg_commit = onfinish;

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  return info->linger_id;
}

void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
  ceph_assert(info->linger_id);
  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!

  // Populate Op::target
  OSDSession *s = NULL;
  _calc_target(&info->target, nullptr);

  // Create LingerOp<->OSDSession relation
  int r = _get_session(info->target.osd, &s, sul);
  ceph_assert(r == 0);
  OSDSession::unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  put_session(s);

  _send_linger(info, sul);
}

struct C_DoWatchNotify : public Context {
  Objecter *objecter;
  Objecter::LingerOp *info;
  MWatchNotify *msg;
  C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->get();
    info->_queued_async();
    msg->get();
  }
  void finish(int r) override {
    objecter->_do_watch_notify(info, msg);
  }
};

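// Fast-dispatch entry point for watch/notify events.  Disconnects and notify
// completions are handled inline (updating last_error or completing
// on_notify_finish); incoming notifies for a registered watch are bounced to
// the finisher via C_DoWatchNotify so the watch callback does not run in
// fast-dispatch context.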
void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  LingerOp::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    if (!info->last_error) {
      info->last_error = -ENOTCONN;
      if (info->watch_context) {
        finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
        info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
                     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->notify_result_bl->claim(m->get_data());
      info->on_notify_finish->complete(m->return_code);

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
      info->on_notify_finish = NULL;
    }
  } else {
    finisher->queue(new C_DoWatchNotify(this, info, m));
  }
}

void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;

  shared_lock l(rwlock);
  ceph_assert(initialized);

  if (info->canceled) {
    l.unlock();
    goto out;
  }

  // notify completion?
  ceph_assert(info->is_watch);
  ceph_assert(info->watch_context);
  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);

  l.unlock();

  switch (m->opcode) {
  case CEPH_WATCH_EVENT_NOTIFY:
    info->watch_context->handle_notify(m->notify_id, m->cookie,
                                       m->notifier_gid, m->bl);
    break;
  }

 out:
  info->finished_async();
  info->put();
  m->put();
}

bool Objecter::ms_dispatch(Message *m)
{
  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
  switch (m->get_type()) {
    // these we exclusively handle
  case CEPH_MSG_OSD_OPREPLY:
    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
    return true;

  case CEPH_MSG_OSD_BACKOFF:
    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
    return true;

  case CEPH_MSG_WATCH_NOTIFY:
    handle_watch_notify(static_cast<MWatchNotify*>(m));
    m->put();
    return true;

  case MSG_COMMAND_REPLY:
    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    } else {
      return false;
    }

  case MSG_GETPOOLSTATSREPLY:
    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
    return true;

  case CEPH_MSG_POOLOP_REPLY:
    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
    return true;

  case CEPH_MSG_STATFS_REPLY:
    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
    return true;

    // these we give others a chance to inspect

    // MDS, OSD
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    return false;
  }
  return false;
}

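// Walk one session after an osdmap change and sort its linger ops, regular
// ops and command ops into the need_resend* lists, based on whether their
// calculated target changed, the map was skipped, or writes must be forced
// out due to full flags.  Ops whose pool no longer exists are routed to the
// pool-dne checks instead.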
void Objecter::_scan_requests(
  OSDSession *s,
  bool skipped_map,
  bool cluster_full,
  map<int64_t, bool> *pool_full_map,
  map<ceph_tid_t, Op*>& need_resend,
  list<LingerOp*>& need_resend_linger,
  map<ceph_tid_t, CommandOp*>& need_resend_command,
  shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  list<LingerOp*> unregister_lingers;

  OSDSession::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    LingerOp *op = lp->second;
    ceph_assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !force_resend_writes)
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
        ldout(cct, 10) << " need to unregister linger op "
                       << op->linger_id << dendl;
        op->get();
        unregister_lingers.push_back(op);
      }
      break;
    }
  }

  // check for changed request mappings
  map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p;   // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    _prune_snapc(osdmap->get_new_removed_snaps(), op);
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
                         op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !(force_resend_writes && op->target.respects_full()))
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      _session_op_remove(op->session, op);
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    }
  }

  // commands
  map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    CommandOp *c = cp->second;
    ++cp;
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!skipped_map && !force_resend_writes)
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      _session_command_op_remove(c->session, c);
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  sl.unlock();

  for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put();
  }
}

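// Apply an incoming MOSDMap: step through the incremental (or full) maps in
// epoch order, note when epochs had to be skipped, rescan every session's
// outstanding ops at each step, and then resend whatever ended up on the
// need_resend lists once the final epoch is in place.  Contexts waiting on a
// map epoch are completed at the end.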
void Objecter::handle_osd_map(MOSDMap *m)
{
  shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  ceph_assert(osdmap);

  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
                  << " != " << monc->get_fsid() << dendl;
    return;
  }

  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (map<int64_t, pg_pool_t>::const_iterator it
         = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);


  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
                  << m->get_first() << "," << m->get_last()
                  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
                  << m->get_first() << "," << m->get_last()
                  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
           e <= m->get_last();
           e++) {

        if (osdmap->get_epoch() == e-1 &&
            m->incremental_maps.count(e)) {
          ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
                        << dendl;
          OSDMap::Incremental inc(m->incremental_maps[e]);
          osdmap->apply_incremental(inc);

          emit_blacklist_events(inc);

          logger->inc(l_osdc_map_inc);
        }
        else if (m->maps.count(e)) {
          ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
          auto new_osdmap = std::make_unique<OSDMap>();
          new_osdmap->decode(m->maps[e]);

          emit_blacklist_events(*osdmap, *new_osdmap);
          osdmap = std::move(new_osdmap);

          logger->inc(l_osdc_map_full);
        }
        else {
          if (e >= m->get_oldest()) {
            ldout(cct, 3) << "handle_osd_map requesting missing epoch "
                          << osdmap->get_epoch()+1 << dendl;
            _maybe_request_map();
            break;
          }
          ldout(cct, 3) << "handle_osd_map missing epoch "
                        << osdmap->get_epoch()+1
                        << ", jumping to " << m->get_oldest() << dendl;
          e = m->get_oldest() - 1;
          skipped_map = true;
          continue;
        }
        logger->set(l_osdc_map_epoch, osdmap->get_epoch());

        prune_pg_mapping(osdmap->get_pools());
        cluster_full = cluster_full || _osdmap_full_flag();
        update_pool_full_map(pool_full_map);

        // check all outstanding requests on every epoch
        for (auto& i : need_resend) {
          _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
        }
        _scan_requests(homeless_session, skipped_map, cluster_full,
                       &pool_full_map, need_resend,
                       need_resend_linger, need_resend_command, sul);
        for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
             p != osd_sessions.end(); ) {
          OSDSession *s = p->second;
          _scan_requests(s, skipped_map, cluster_full,
                         &pool_full_map, need_resend,
                         need_resend_linger, need_resend_command, sul);
          ++p;
          // osd down or addr change?
          if (!osdmap->is_up(s->osd) ||
              (s->con &&
               s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
            close_session(s);
          }
        }

        ceph_assert(e == osdmap->get_epoch());
      }

    } else {
      // first map.  we want the full thing.
      if (m->maps.count(m->get_last())) {
        for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
             p != osd_sessions.end(); ++p) {
          OSDSession *s = p->second;
          _scan_requests(s, false, false, NULL, need_resend,
                         need_resend_linger, need_resend_command, sul);
        }
        ldout(cct, 3) << "handle_osd_map decoding full epoch "
                      << m->get_last() << dendl;
        osdmap->decode(m->maps[m->get_last()]);
        prune_pg_mapping(osdmap->get_pools());

        _scan_requests(homeless_session, false, false, NULL,
                       need_resend, need_resend_linger,
                       need_resend_command, sul);
      } else {
        ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
                      << dendl;
        monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
        monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << "  checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
        p = need_resend.erase(p);
        _check_op_pool_dne(op, nullptr);
      } else {
        ++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused?
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    _maybe_request_map();
  }

  // resend requests
  for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
       p != need_resend.end(); ++p) {
    Op *op = p->second;
    OSDSession *s = op->session;
    bool mapped_session = false;
    if (!s) {
      int r = _map_session(&op->target, &s, sul);
      ceph_assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    OSDSession::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      if (!op->session->is_homeless() && !op->target.paused) {
        logger->inc(l_osdc_op_resend);
        _send_op(op);
      }
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s);
  }
  for (list<LingerOp*>::iterator p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    ceph_assert(op->session);
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    CommandOp *c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
        _send_command(c);
      }
    }
  }

  _dump_active();

  // finish any Contexts that were waiting on a map update
  map<epoch_t,list< pair< Context*, int > > >::iterator p =
    waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
         p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (list<pair<Context*, int> >::iterator i = p->second.begin();
         i != p->second.end(); ++i) {
      i->first->complete(i->second);
    }
    waiting_for_map.erase(p++);
  }

  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}

void Objecter::enable_blacklist_events()
{
  unique_lock wl(rwlock);

  blacklist_events_enabled = true;
}

void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
{
  unique_lock wl(rwlock);

  if (events->empty()) {
    events->swap(blacklist_events);
  } else {
    for (const auto &i : blacklist_events) {
      events->insert(i);
    }
    blacklist_events.clear();
  }
}

void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
{
  if (!blacklist_events_enabled) {
    return;
  }

  for (const auto &i : inc.new_blacklist) {
    blacklist_events.insert(i.first);
  }
}

void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
                                     const OSDMap &new_osd_map)
{
  if (!blacklist_events_enabled) {
    return;
  }

  std::set<entity_addr_t> old_set;
  std::set<entity_addr_t> new_set;

  old_osd_map.get_blacklist(&old_set);
  new_osd_map.get_blacklist(&new_set);

  std::set<entity_addr_t> delta_set;
  std::set_difference(
    new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  blacklist_events.insert(delta_set.begin(), delta_set.end());
}

// op pool check

void Objecter::C_Op_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED)
    return;

  lgeneric_subdout(objecter->cct, objecter, 10)
    << "op_map_latest r=" << r << " tid=" << tid
    << " latest " << latest << dendl;

  Objecter::unique_lock wl(objecter->rwlock);

  map<ceph_tid_t, Op*>::iterator iter =
    objecter->check_latest_map_ops.find(tid);
  if (iter == objecter->check_latest_map_ops.end()) {
    lgeneric_subdout(objecter->cct, objecter, 10)
      << "op_map_latest op " << tid << " not found" << dendl;
    return;
  }

  Op *op = iter->second;
  objecter->check_latest_map_ops.erase(iter);

  lgeneric_subdout(objecter->cct, objecter, 20)
    << "op_map_latest op " << op << dendl;

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  OSDSession::unique_lock sl(op->session->lock, defer_lock);
  objecter->_check_op_pool_dne(op, &sl);

  op->put();
}

int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
                                snapid_t *snap) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  for (auto p = pg_pool.snaps.begin();
       p != pg_pool.snaps.end();
       ++p) {
    if (p->second.name == snap_name) {
      *snap = p->first;
      return 0;
    }
  }
  return -ENOENT;
}

int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
                                 pool_snap_info_t *info) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  auto p = pg_pool.snaps.find(snap);
  if (p == pg_pool.snaps.end())
    return -ENOENT;
  *info = p->second;

  return 0;
}

int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
{
  shared_lock rl(rwlock);

  const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
  if (!pi)
    return -ENOENT;
  for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
       p != pi->snaps.end();
       ++p) {
    snaps->push_back(p->first);
  }
  return 0;
}

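// Pool-existence check for an op that could not be mapped.  map_dne_bound is
// an epoch by which the pool is known not to exist: once our map is at least
// that new the op is failed with -ENOENT, otherwise we ask the monitor (via
// _send_op_map_check) for the latest map before deciding.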
// sl may be unlocked.
void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
{
  // rwlock is locked unique

  if (op->target.pool_ever_existed) {
    // the pool previously existed and now it does not, which means it
    // was deleted.
    op->map_dne_bound = osdmap->get_epoch();
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                   << " pool previously exists but now does not"
                   << dendl;
  } else {
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                   << " current " << osdmap->get_epoch()
                   << " map_dne_bound " << op->map_dne_bound
                   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // we had a new enough map
      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                     << " concluding pool " << op->target.base_pgid.pool()
                     << " dne" << dendl;
      if (op->onfinish) {
        num_in_flight--;
        op->onfinish->complete(-ENOENT);
      }

      OSDSession *s = op->session;
      if (s) {
        ceph_assert(s != NULL);
        ceph_assert(sl->mutex() == &s->lock);
        bool session_locked = sl->owns_lock();
        if (!session_locked) {
          sl->lock();
        }
        _finish_op(op, 0);
        if (!session_locked) {
          sl->unlock();
        }
      } else {
        _finish_op(op, 0); // no session
      }
    }
  } else {
    _send_op_map_check(op);
  }
}

void Objecter::_send_op_map_check(Op *op)
{
  // rwlock is locked unique
  // ask the monitor
  if (check_latest_map_ops.count(op->tid) == 0) {
    op->get();
    check_latest_map_ops[op->tid] = op;
    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }
}

void Objecter::_op_cancel_map_check(Op *op)
{
  // rwlock is locked unique
  map<ceph_tid_t, Op*>::iterator iter =
    check_latest_map_ops.find(op->tid);
  if (iter != check_latest_map_ops.end()) {
    Op *op = iter->second;
    op->put();
    check_latest_map_ops.erase(iter);
  }
}

// linger pool check

void Objecter::C_Linger_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  map<uint64_t, LingerOp*>::iterator iter =
    objecter->check_latest_map_lingers.find(linger_id);
  if (iter == objecter->check_latest_map_lingers.end()) {
    return;
  }

  LingerOp *op = iter->second;
  objecter->check_latest_map_lingers.erase(iter);

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  bool unregister;
  objecter->_check_linger_pool_dne(op, &unregister);

  if (unregister) {
    objecter->_linger_cancel(op);
  }

  op->put();
}

void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
  // rwlock is locked unique

  *need_unregister = false;

  if (op->register_gen > 0) {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                   << " pool previously existed but now does not"
                   << dendl;
    op->map_dne_bound = osdmap->get_epoch();
  } else {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                   << " current " << osdmap->get_epoch()
                   << " map_dne_bound " << op->map_dne_bound
                   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      LingerOp::unique_lock wl{op->watch_lock};
      if (op->on_reg_commit) {
        op->on_reg_commit->complete(-ENOENT);
        op->on_reg_commit = nullptr;
      }
      if (op->on_notify_finish) {
        op->on_notify_finish->complete(-ENOENT);
        op->on_notify_finish = nullptr;
      }
      *need_unregister = true;
    }
  } else {
    _send_linger_map_check(op);
  }
}

void Objecter::_send_linger_map_check(LingerOp *op)
{
  // ask the monitor
  if (check_latest_map_lingers.count(op->linger_id) == 0) {
    op->get();
    check_latest_map_lingers[op->linger_id] = op;
    C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }
}

void Objecter::_linger_cancel_map_check(LingerOp *op)
{
  // rwlock is locked unique

  map<uint64_t, LingerOp*>::iterator iter =
    check_latest_map_lingers.find(op->linger_id);
  if (iter != check_latest_map_lingers.end()) {
    LingerOp *op = iter->second;
    op->put();
    check_latest_map_lingers.erase(iter);
  }
}

// command pool check

void Objecter::C_Command_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  map<uint64_t, CommandOp*>::iterator iter =
    objecter->check_latest_map_commands.find(tid);
  if (iter == objecter->check_latest_map_commands.end()) {
    return;
  }

  CommandOp *c = iter->second;
  objecter->check_latest_map_commands.erase(iter);

  if (c->map_dne_bound == 0)
    c->map_dne_bound = latest;

  OSDSession::unique_lock sul(c->session->lock);
  objecter->_check_command_map_dne(c);
  sul.unlock();

  c->put();
}

void Objecter::_check_command_map_dne(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
                 << " current " << osdmap->get_epoch()
                 << " map_dne_bound " << c->map_dne_bound
                 << dendl;
  if (c->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= c->map_dne_bound) {
      _finish_command(c, c->map_check_error, c->map_check_error_str);
    }
  } else {
    _send_command_map_check(c);
  }
}

void Objecter::_send_command_map_check(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  // ask the monitor
  if (check_latest_map_commands.count(c->tid) == 0) {
    c->get();
    check_latest_map_commands[c->tid] = c;
    C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
    monc->get_version("osdmap", &f->latest, NULL, f);
  }
}

void Objecter::_command_cancel_map_check(CommandOp *c)
{
  // rwlock is locked unique

  map<uint64_t, CommandOp*>::iterator iter =
    check_latest_map_commands.find(c->tid);
  if (iter != check_latest_map_commands.end()) {
    CommandOp *c = iter->second;
    c->put();
    check_latest_map_commands.erase(iter);
  }
}


/**
 * Look up OSDSession by OSD id.
 *
 * @returns 0 on success, or -EAGAIN if the lock context requires
 * promotion to write.
 */
int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
{
  ceph_assert(sul && sul.mutex() == &rwlock);

  if (osd < 0) {
    *session = homeless_session;
    ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
                   << dendl;
    return 0;
  }

  map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
  if (p != osd_sessions.end()) {
    OSDSession *s = p->second;
    s->get();
    *session = s;
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
                   << s->get_nref() << dendl;
    return 0;
  }
  if (!sul.owns_lock()) {
    return -EAGAIN;
  }
  OSDSession *s = new OSDSession(cct, osd);
  osd_sessions[osd] = s;
  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
  s->con->set_priv(RefCountedPtr{s});
  logger->inc(l_osdc_osd_session_open);
  logger->set(l_osdc_osd_sessions, osd_sessions.size());
  s->get();
  *session = s;
  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
                 << s->get_nref() << dendl;
  return 0;
}

void Objecter::put_session(Objecter::OSDSession *s)
{
  if (s && !s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
                   << s->get_nref() << dendl;
    s->put();
  }
}

void Objecter::get_session(Objecter::OSDSession *s)
{
  ceph_assert(s != NULL);

  if (!s->is_homeless()) {
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
                   << s->get_nref() << dendl;
    s->get();
  }
}

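// Reconnect a session to its OSD's current address: mark the old connection
// down, connect to the address from the latest osdmap, and bump the
// session's incarnation counter.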
void Objecter::_reopen_session(OSDSession *s)
{
  // rwlock is locked unique
  // s->lock is locked

  auto addrs = osdmap->get_addrs(s->osd);
  ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
                 << addrs << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  s->con = messenger->connect_to_osd(addrs);
  s->con->set_priv(RefCountedPtr{s});
  s->incarnation++;
  logger->inc(l_osdc_osd_session_open);
}

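// Tear down a session (e.g. because the OSD went down or changed address).
// Any linger ops, regular ops and commands still attached to it are moved to
// the homeless session, so they can be re-targeted and resent when a later
// osdmap makes that possible.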
1853void Objecter::close_session(OSDSession *s)
1854{
1855 // rwlock is locked unique
1856
1857 ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
1858 if (s->con) {
1859 s->con->set_priv(NULL);
1860 s->con->mark_down();
1861 logger->inc(l_osdc_osd_session_close);
1862 }
1863 OSDSession::unique_lock sl(s->lock);
1864
1865 std::list<LingerOp*> homeless_lingers;
1866 std::list<CommandOp*> homeless_commands;
1867 std::list<Op*> homeless_ops;
1868
1869 while (!s->linger_ops.empty()) {
1870 std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
1871 ldout(cct, 10) << " linger_op " << i->first << dendl;
1872 homeless_lingers.push_back(i->second);
1873 _session_linger_op_remove(s, i->second);
1874 }
1875
1876 while (!s->ops.empty()) {
1877 std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
1878 ldout(cct, 10) << " op " << i->first << dendl;
1879 homeless_ops.push_back(i->second);
1880 _session_op_remove(s, i->second);
1881 }
1882
1883 while (!s->command_ops.empty()) {
1884 std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
1885 ldout(cct, 10) << " command_op " << i->first << dendl;
1886 homeless_commands.push_back(i->second);
1887 _session_command_op_remove(s, i->second);
1888 }
1889
1890 osd_sessions.erase(s->osd);
1891 sl.unlock();
1892 put_session(s);
1893
1894 // Assign any leftover ops to the homeless session
1895 {
1896 OSDSession::unique_lock hsl(homeless_session->lock);
1897 for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
1898 i != homeless_lingers.end(); ++i) {
1899 _session_linger_op_assign(homeless_session, *i);
1900 }
1901 for (std::list<Op*>::iterator i = homeless_ops.begin();
1902 i != homeless_ops.end(); ++i) {
1903 _session_op_assign(homeless_session, *i);
1904 }
1905 for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
1906 i != homeless_commands.end(); ++i) {
1907 _session_command_op_assign(homeless_session, *i);
1908 }
1909 }
1910
1911 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1912}
1913
9f95a23c 1914void Objecter::wait_for_osd_map(epoch_t e)
7c673cae
FG
1915{
1916 unique_lock l(rwlock);
9f95a23c 1917 if (osdmap->get_epoch() >= e) {
7c673cae
FG
1918 l.unlock();
1919 return;
1920 }
1921
1922 // Leave this since it goes with C_SafeCond
9f95a23c
TL
1923 ceph::mutex lock = ceph::make_mutex("");
1924 ceph::condition_variable cond;
7c673cae 1925  bool done = false;
9f95a23c
TL
1926 std::unique_lock mlock{lock};
1927 C_SafeCond *context = new C_SafeCond(lock, cond, &done, NULL);
1928 waiting_for_map[e].push_back(pair<Context*, int>(context, 0));
7c673cae 1929 l.unlock();
9f95a23c 1930 cond.wait(mlock, [&done] { return done; });
7c673cae
FG
1931}
1932
1933struct C_Objecter_GetVersion : public Context {
1934 Objecter *objecter;
1935 uint64_t oldest, newest;
1936 Context *fin;
1937 C_Objecter_GetVersion(Objecter *o, Context *c)
1938 : objecter(o), oldest(0), newest(0), fin(c) {}
1939 void finish(int r) override {
1940 if (r >= 0) {
1941 objecter->get_latest_version(oldest, newest, fin);
1942 } else if (r == -EAGAIN) { // try again as instructed
1943 objecter->wait_for_latest_osdmap(fin);
1944 } else {
1945 // it doesn't return any other error codes!
1946 ceph_abort();
1947 }
1948 }
1949};
1950
1951void Objecter::wait_for_latest_osdmap(Context *fin)
1952{
1953 ldout(cct, 10) << __func__ << dendl;
1954 C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
1955 monc->get_version("osdmap", &c->newest, &c->oldest, c);
1956}
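// Flow sketch for the helper above: monc->get_version() fills in
// newest/oldest and completes the C_Objecter_GetVersion, whose finish()
// calls get_latest_version(); that either completes `fin` right away
// (the map we hold is already new enough) or parks it in waiting_for_map
// via _wait_for_new_map() until the requested epoch arrives.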
1957
1958void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
1959{
1960 unique_lock wl(rwlock);
7c673cae 1961 if (osdmap->get_epoch() >= newest) {
11fdf7f2
TL
1962 ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
1963 wl.unlock();
7c673cae
FG
1964 if (fin)
1965 fin->complete(0);
1966 return;
1967 }
1968
1969 ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
1970 _wait_for_new_map(fin, newest, 0);
1971}
1972
1973void Objecter::maybe_request_map()
1974{
1975 shared_lock rl(rwlock);
1976 _maybe_request_map();
1977}
1978
1979void Objecter::_maybe_request_map()
1980{
1981 // rwlock is locked
1982 int flag = 0;
1983 if (_osdmap_full_flag()
1984 || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
1985 || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
1986 ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
1987 "osd map (FULL flag is set)" << dendl;
1988 } else {
1989 ldout(cct, 10)
1990 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
1991 flag = CEPH_SUBSCRIBE_ONETIME;
1992 }
1993 epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
1994 if (monc->sub_want("osdmap", epoch, flag)) {
1995 monc->renew_subs();
1996 }
1997}
1998
1999void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
2000{
2001 // rwlock is locked unique
2002 waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
2003 _maybe_request_map();
2004}
2005
2006
2007/**
2008 * Use this together with wait_for_map: this is a pre-check to avoid
2009 * allocating a Context for wait_for_map if we can see that we
2010 * definitely already have the epoch.
2011 *
2012 * This does *not* replace the need to handle the return value of
2013 * wait_for_map: just because we don't have it in this pre-check
2014 * doesn't mean we won't have it when calling back into wait_for_map,
2015 * since the objecter lock is dropped in between.
2016 */
2017bool Objecter::have_map(const epoch_t epoch)
2018{
2019 shared_lock rl(rwlock);
2020 if (osdmap->get_epoch() >= epoch) {
2021 return true;
2022 } else {
2023 return false;
2024 }
2025}
2026
2027bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
2028{
2029 unique_lock wl(rwlock);
2030 if (osdmap->get_epoch() >= epoch) {
2031 return true;
2032 }
2033 _wait_for_new_map(c, epoch, err);
2034 return false;
2035}
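// A minimal sketch of the pre-check pattern described in the have_map()
// comment above (hypothetical caller; C_DoWork is illustrative only):
//
//   if (!objecter->have_map(epoch)) {
//     Context *c = new C_DoWork(...);
//     if (objecter->wait_for_map(epoch, c)) {
//       // The map arrived between the two checks: the Context was not
//       // queued, so the caller still owns it and must run or delete it.
//       c->complete(0);
//     }
//   }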
2036
7c673cae
FG
2037void Objecter::_kick_requests(OSDSession *session,
2038 map<uint64_t, LingerOp *>& lresend)
2039{
2040 // rwlock is locked unique
2041
2042 // clear backoffs
2043 session->backoffs.clear();
2044 session->backoffs_by_id.clear();
2045
2046 // resend ops
2047 map<ceph_tid_t,Op*> resend; // resend in tid order
2048 for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
2049 p != session->ops.end();) {
2050 Op *op = p->second;
2051 ++p;
7c673cae
FG
2052 if (op->should_resend) {
2053 if (!op->target.paused)
2054 resend[op->tid] = op;
2055 } else {
2056 _op_cancel_map_check(op);
2057 _cancel_linger_op(op);
2058 }
2059 }
2060
11fdf7f2 2061 logger->inc(l_osdc_op_resend, resend.size());
7c673cae
FG
2062 while (!resend.empty()) {
2063 _send_op(resend.begin()->second);
2064 resend.erase(resend.begin());
2065 }
2066
2067 // resend lingers
11fdf7f2 2068 logger->inc(l_osdc_linger_resend, session->linger_ops.size());
7c673cae
FG
2069 for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
2070 j != session->linger_ops.end(); ++j) {
2071 LingerOp *op = j->second;
2072 op->get();
11fdf7f2 2073 ceph_assert(lresend.count(j->first) == 0);
7c673cae
FG
2074 lresend[j->first] = op;
2075 }
2076
2077 // resend commands
11fdf7f2 2078 logger->inc(l_osdc_command_resend, session->command_ops.size());
7c673cae
FG
2079 map<uint64_t,CommandOp*> cresend; // resend in order
2080 for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
2081 k != session->command_ops.end(); ++k) {
7c673cae
FG
2082 cresend[k->first] = k->second;
2083 }
2084 while (!cresend.empty()) {
2085 _send_command(cresend.begin()->second);
2086 cresend.erase(cresend.begin());
2087 }
2088}
2089
2090void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
2091 unique_lock& ul)
2092{
11fdf7f2 2093 ceph_assert(ul.owns_lock());
7c673cae
FG
2094 shunique_lock sul(std::move(ul));
2095 while (!lresend.empty()) {
2096 LingerOp *op = lresend.begin()->second;
2097 if (!op->canceled) {
2098 _send_linger(op, sul);
2099 }
2100 op->put();
2101 lresend.erase(lresend.begin());
2102 }
11fdf7f2 2103 ul = sul.release_to_unique();
7c673cae
FG
2104}
2105
2106void Objecter::start_tick()
2107{
11fdf7f2 2108 ceph_assert(tick_event == 0);
7c673cae
FG
2109 tick_event =
2110 timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2111 &Objecter::tick, this);
2112}
2113
2114void Objecter::tick()
2115{
2116 shared_lock rl(rwlock);
2117
2118 ldout(cct, 10) << "tick" << dendl;
2119
2120 // we are only called by C_Tick
2121 tick_event = 0;
2122
31f18b77 2123 if (!initialized) {
7c673cae
FG
2124 // we raced with shutdown
2125 ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
2126 return;
2127 }
2128
2129 set<OSDSession*> toping;
2130
2131
2132 // look for laggy requests
11fdf7f2 2133 auto cutoff = ceph::coarse_mono_clock::now();
7c673cae
FG
2134 cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout); // timeout
2135
2136 unsigned laggy_ops = 0;
2137
2138 for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
2139 siter != osd_sessions.end(); ++siter) {
2140 OSDSession *s = siter->second;
2141 OSDSession::lock_guard l(s->lock);
2142 bool found = false;
2143 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
2144 p != s->ops.end();
2145 ++p) {
2146 Op *op = p->second;
11fdf7f2 2147 ceph_assert(op->session);
7c673cae
FG
2148 if (op->stamp < cutoff) {
2149 ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
2150 << " is laggy" << dendl;
2151 found = true;
2152 ++laggy_ops;
2153 }
2154 }
2155 for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
2156 p != s->linger_ops.end();
2157 ++p) {
2158 LingerOp *op = p->second;
2159 LingerOp::unique_lock wl(op->watch_lock);
11fdf7f2 2160 ceph_assert(op->session);
7c673cae
FG
2161 ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
2162 << " (osd." << op->session->osd << ")" << dendl;
2163 found = true;
2164 if (op->is_watch && op->registered && !op->last_error)
2165 _send_linger_ping(op);
2166 }
2167 for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
2168 p != s->command_ops.end();
2169 ++p) {
2170 CommandOp *op = p->second;
11fdf7f2 2171 ceph_assert(op->session);
7c673cae
FG
2172 ldout(cct, 10) << " pinging osd that serves command tid " << p->first
2173 << " (osd." << op->session->osd << ")" << dendl;
2174 found = true;
2175 }
2176 if (found)
2177 toping.insert(s);
2178 }
31f18b77 2179 if (num_homeless_ops || !toping.empty()) {
7c673cae
FG
2180 _maybe_request_map();
2181 }
2182
2183 logger->set(l_osdc_op_laggy, laggy_ops);
2184 logger->set(l_osdc_osd_laggy, toping.size());
2185
2186 if (!toping.empty()) {
2187 // send a ping to these osds, to ensure we detect any session resets
2188 // (osd reply message policy is lossy)
2189 for (set<OSDSession*>::const_iterator i = toping.begin();
2190 i != toping.end();
2191 ++i) {
2192 (*i)->con->send_message(new MPing);
2193 }
2194 }
2195
11fdf7f2 2196 // Make sure we don't reschedule if we wake up after shutdown
31f18b77 2197 if (initialized) {
7c673cae
FG
2198 tick_event = timer.reschedule_me(ceph::make_timespan(
2199 cct->_conf->objecter_tick_interval));
2200 }
2201}
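// Illustration of the laggy test above (10s is an assumed value for
// objecter_timeout): an op whose stamp is older than now - 10s at tick
// time is counted in l_osdc_op_laggy, and its session is added to
// `toping` so an MPing is sent to detect a reset of the lossy OSD
// connection.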
2202
2203void Objecter::resend_mon_ops()
2204{
2205 unique_lock wl(rwlock);
2206
2207 ldout(cct, 10) << "resend_mon_ops" << dendl;
2208
2209 for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
2210 p != poolstat_ops.end();
2211 ++p) {
2212 _poolstat_submit(p->second);
2213 logger->inc(l_osdc_poolstat_resend);
2214 }
2215
2216 for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
2217 p != statfs_ops.end();
2218 ++p) {
2219 _fs_stats_submit(p->second);
2220 logger->inc(l_osdc_statfs_resend);
2221 }
2222
2223 for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
2224 p != pool_ops.end();
2225 ++p) {
2226 _pool_op_submit(p->second);
2227 logger->inc(l_osdc_poolop_resend);
2228 }
2229
2230 for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
2231 p != check_latest_map_ops.end();
2232 ++p) {
2233 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
2234 monc->get_version("osdmap", &c->latest, NULL, c);
2235 }
2236
2237 for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
2238 p != check_latest_map_lingers.end();
2239 ++p) {
2240 C_Linger_Map_Latest *c
2241 = new C_Linger_Map_Latest(this, p->second->linger_id);
2242 monc->get_version("osdmap", &c->latest, NULL, c);
2243 }
2244
2245 for (map<uint64_t, CommandOp*>::iterator p
2246 = check_latest_map_commands.begin();
2247 p != check_latest_map_commands.end();
2248 ++p) {
2249 C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
2250 monc->get_version("osdmap", &c->latest, NULL, c);
2251 }
2252}
2253
2254// read | write ---------------------------
2255
2256void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2257{
2258 shunique_lock rl(rwlock, ceph::acquire_shared);
2259 ceph_tid_t tid = 0;
2260 if (!ptid)
2261 ptid = &tid;
31f18b77 2262 op->trace.event("op submit");
7c673cae
FG
2263 _op_submit_with_budget(op, rl, ptid, ctx_budget);
2264}
2265
2266void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
2267 ceph_tid_t *ptid,
2268 int *ctx_budget)
2269{
11fdf7f2 2270 ceph_assert(initialized);
7c673cae 2271
11fdf7f2
TL
2272 ceph_assert(op->ops.size() == op->out_bl.size());
2273 ceph_assert(op->ops.size() == op->out_rval.size());
2274 ceph_assert(op->ops.size() == op->out_handler.size());
7c673cae
FG
2275
2276 // throttle. before we look at any state, because
2277 // _take_op_budget() may drop our lock while it blocks.
2278 if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
2279 int op_budget = _take_op_budget(op, sul);
2280 // take and pass out the budget for the first OP
2281 // in the context session
2282 if (ctx_budget && (*ctx_budget == -1)) {
2283 *ctx_budget = op_budget;
2284 }
2285 }
2286
2287 if (osd_timeout > timespan(0)) {
2288 if (op->tid == 0)
31f18b77 2289 op->tid = ++last_tid;
7c673cae
FG
2290 auto tid = op->tid;
2291 op->ontimeout = timer.add_event(osd_timeout,
2292 [this, tid]() {
2293 op_cancel(tid, -ETIMEDOUT); });
2294 }
2295
2296 _op_submit(op, sul, ptid);
2297}
2298
2299void Objecter::_send_op_account(Op *op)
2300{
31f18b77 2301 inflight_ops++;
7c673cae
FG
2302
2303 // add to gather set(s)
2304 if (op->onfinish) {
31f18b77 2305 num_in_flight++;
7c673cae
FG
2306 } else {
2307 ldout(cct, 20) << " note: not requesting reply" << dendl;
2308 }
2309
2310 logger->inc(l_osdc_op_active);
2311 logger->inc(l_osdc_op);
2312
2313 if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
2314 (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
2315 logger->inc(l_osdc_op_rmw);
2316 else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
2317 logger->inc(l_osdc_op_w);
2318 else if (op->target.flags & CEPH_OSD_FLAG_READ)
2319 logger->inc(l_osdc_op_r);
2320
2321 if (op->target.flags & CEPH_OSD_FLAG_PGOP)
2322 logger->inc(l_osdc_op_pg);
2323
2324 for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
2325 int code = l_osdc_osdop_other;
2326 switch (p->op.op) {
2327 case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
2328 case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
2329 case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
2330 case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
2331 case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
2332 case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
2333 case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
2334 case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
2335 case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
2336 case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
2337 case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
2338 case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
2339 case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
2340 case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
2341 case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
2342 case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
2343 case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
7c673cae
FG
2344
2345 // OMAP read operations
2346 case CEPH_OSD_OP_OMAPGETVALS:
2347 case CEPH_OSD_OP_OMAPGETKEYS:
2348 case CEPH_OSD_OP_OMAPGETHEADER:
2349 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
2350 case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;
2351
2352 // OMAP write operations
2353 case CEPH_OSD_OP_OMAPSETVALS:
2354 case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;
2355
2356 // OMAP del operations
2357 case CEPH_OSD_OP_OMAPCLEAR:
2358 case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;
2359
2360 case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
2361 case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
2362 case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
2363 }
2364 if (code)
2365 logger->inc(code);
2366 }
2367}
2368
2369void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
2370{
2371 // rwlock is locked
2372
2373 ldout(cct, 10) << __func__ << " op " << op << dendl;
2374
2375 // pick target
11fdf7f2 2376 ceph_assert(op->session == NULL);
7c673cae
FG
2377 OSDSession *s = NULL;
2378
2379 bool check_for_latest_map = _calc_target(&op->target, nullptr)
2380 == RECALC_OP_TARGET_POOL_DNE;
2381
2382 // Try to get a session, including a retry if we need to take write lock
2383 int r = _get_session(op->target.osd, &s, sul);
2384 if (r == -EAGAIN ||
11fdf7f2
TL
2385 (check_for_latest_map && sul.owns_lock_shared()) ||
2386 cct->_conf->objecter_debug_inject_relock_delay) {
7c673cae
FG
2387 epoch_t orig_epoch = osdmap->get_epoch();
2388 sul.unlock();
2389 if (cct->_conf->objecter_debug_inject_relock_delay) {
2390 sleep(1);
2391 }
2392 sul.lock();
2393 if (orig_epoch != osdmap->get_epoch()) {
2394 // map changed; recalculate mapping
2395 ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
2396 << dendl;
2397 check_for_latest_map = _calc_target(&op->target, nullptr)
2398 == RECALC_OP_TARGET_POOL_DNE;
2399 if (s) {
2400 put_session(s);
2401 s = NULL;
2402 r = -EAGAIN;
2403 }
2404 }
2405 }
2406 if (r == -EAGAIN) {
11fdf7f2 2407 ceph_assert(s == NULL);
7c673cae
FG
2408 r = _get_session(op->target.osd, &s, sul);
2409 }
11fdf7f2
TL
2410 ceph_assert(r == 0);
2411 ceph_assert(s); // may be homeless
7c673cae
FG
2412
2413 _send_op_account(op);
2414
2415 // send?
2416
11fdf7f2 2417 ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
7c673cae 2418
9f95a23c 2419 if (pool_full_try) {
7c673cae
FG
2420 op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
2421 }
2422
2423 bool need_send = false;
9f95a23c
TL
2424 if (op->target.paused) {
2425 ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
7c673cae 2426 << dendl;
7c673cae
FG
2427 _maybe_request_map();
2428 } else if (!s->is_homeless()) {
2429 need_send = true;
2430 } else {
2431 _maybe_request_map();
2432 }
2433
7c673cae
FG
2434 OSDSession::unique_lock sl(s->lock);
2435 if (op->tid == 0)
31f18b77 2436 op->tid = ++last_tid;
7c673cae
FG
2437
2438 ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
2439 << " '" << op->target.base_oloc << "' '"
2440 << op->target.target_oloc << "' " << op->ops << " tid "
2441 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
2442 << dendl;
2443
2444 _session_op_assign(s, op);
2445
2446 if (need_send) {
11fdf7f2 2447 _send_op(op);
7c673cae
FG
2448 }
2449
2450  // Last chance to touch the Op here; after giving up the session lock
2451  // it can be freed at any time by the response handler.
2452 ceph_tid_t tid = op->tid;
2453 if (check_for_latest_map) {
2454 _send_op_map_check(op);
2455 }
2456 if (ptid)
2457 *ptid = tid;
2458 op = NULL;
2459
2460 sl.unlock();
2461 put_session(s);
2462
31f18b77 2463 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
7c673cae
FG
2464}
2465
2466int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2467{
11fdf7f2 2468 ceph_assert(initialized);
7c673cae
FG
2469
2470 OSDSession::unique_lock sl(s->lock);
2471
2472 map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
2473 if (p == s->ops.end()) {
2474 ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
2475 << s->osd << dendl;
2476 return -ENOENT;
2477 }
2478
11fdf7f2 2479#if 0
7c673cae 2480 if (s->con) {
9f95a23c 2481 ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
7c673cae
FG
2482 << " on " << s->con << dendl;
2483 s->con->revoke_rx_buffer(tid);
2484 }
11fdf7f2 2485#endif
7c673cae
FG
2486
2487 ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
2488 << dendl;
2489 Op *op = p->second;
2490 if (op->onfinish) {
31f18b77 2491 num_in_flight--;
7c673cae
FG
2492 op->onfinish->complete(r);
2493 op->onfinish = NULL;
2494 }
2495 _op_cancel_map_check(op);
2496 _finish_op(op, r);
2497 sl.unlock();
2498
2499 return 0;
2500}
2501
2502int Objecter::op_cancel(ceph_tid_t tid, int r)
2503{
2504 int ret = 0;
2505
2506 unique_lock wl(rwlock);
2507 ret = _op_cancel(tid, r);
2508
2509 return ret;
2510}
2511
94b18763
FG
2512int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
2513{
2514 unique_lock wl(rwlock);
2515 ldout(cct,10) << __func__ << " " << tids << dendl;
2516 for (auto tid : tids) {
2517 _op_cancel(tid, r);
2518 }
2519 return 0;
2520}
2521
7c673cae
FG
2522int Objecter::_op_cancel(ceph_tid_t tid, int r)
2523{
2524 int ret = 0;
2525
2526 ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
2527 << dendl;
2528
2529start:
2530
2531 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2532 siter != osd_sessions.end(); ++siter) {
2533 OSDSession *s = siter->second;
2534 OSDSession::shared_lock sl(s->lock);
2535 if (s->ops.find(tid) != s->ops.end()) {
2536 sl.unlock();
2537 ret = op_cancel(s, tid, r);
2538 if (ret == -ENOENT) {
2539 /* oh no! raced, maybe tid moved to another session, restarting */
2540 goto start;
2541 }
2542 return ret;
2543 }
2544 }
2545
2546 ldout(cct, 5) << __func__ << ": tid " << tid
2547 << " not found in live sessions" << dendl;
2548
2549 // Handle case where the op is in homeless session
2550 OSDSession::shared_lock sl(homeless_session->lock);
2551 if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
2552 sl.unlock();
2553 ret = op_cancel(homeless_session, tid, r);
2554 if (ret == -ENOENT) {
2555 /* oh no! raced, maybe tid moved to another session, restarting */
2556 goto start;
2557 } else {
2558 return ret;
2559 }
2560 } else {
2561 sl.unlock();
2562 }
2563
2564 ldout(cct, 5) << __func__ << ": tid " << tid
2565 << " not found in homeless session" << dendl;
2566
2567 return ret;
2568}
2569
2570
2571epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
2572{
2573 unique_lock wl(rwlock);
2574
2575 std::vector<ceph_tid_t> to_cancel;
2576 bool found = false;
2577
2578 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2579 siter != osd_sessions.end(); ++siter) {
2580 OSDSession *s = siter->second;
2581 OSDSession::shared_lock sl(s->lock);
2582 for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
2583 op_i != s->ops.end(); ++op_i) {
2584 if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
2585 && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
2586 to_cancel.push_back(op_i->first);
2587 }
2588 }
2589 sl.unlock();
2590
2591 for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
2592 titer != to_cancel.end();
2593 ++titer) {
2594 int cancel_result = op_cancel(s, *titer, r);
2595 // We hold rwlock across search and cancellation, so cancels
2596 // should always succeed
11fdf7f2 2597 ceph_assert(cancel_result == 0);
7c673cae
FG
2598 }
2599 if (!found && to_cancel.size())
2600 found = true;
2601 to_cancel.clear();
2602 }
2603
2604 const epoch_t epoch = osdmap->get_epoch();
2605
2606 wl.unlock();
2607
2608 if (found) {
2609 return epoch;
2610 } else {
2611 return -1;
2612 }
2613}
2614
2615bool Objecter::is_pg_changed(
2616 int oldprimary,
2617 const vector<int>& oldacting,
2618 int newprimary,
2619 const vector<int>& newacting,
2620 bool any_change)
2621{
9f95a23c 2622 if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
7c673cae
FG
2623 oldprimary,
2624 oldacting,
2625 newprimary,
2626 newacting))
2627 return true;
2628 if (any_change && oldacting != newacting)
2629 return true;
2630 return false; // same primary (tho replicas may have changed)
2631}
2632
2633bool Objecter::target_should_be_paused(op_target_t *t)
2634{
2635 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2636 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
2637 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
9f95a23c 2638 (t->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi)));
7c673cae
FG
2639
2640 return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
2641 (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
2642 (osdmap->get_epoch() < epoch_barrier);
2643}
2644
2645/**
2646 * Locking public accessor for _osdmap_full_flag
2647 */
2648bool Objecter::osdmap_full_flag() const
2649{
2650 shared_lock rl(rwlock);
2651
2652 return _osdmap_full_flag();
2653}
2654
2655bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2656{
2657 shared_lock rl(rwlock);
2658
2659 if (_osdmap_full_flag()) {
2660 return true;
2661 }
2662
2663 return _osdmap_pool_full(pool_id);
2664}
2665
2666bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2667{
2668 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2669 if (pool == NULL) {
2670 ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2671 return false;
2672 }
2673
2674 return _osdmap_pool_full(*pool);
2675}
2676
2677bool Objecter::_osdmap_has_pool_full() const
2678{
2679 for (map<int64_t, pg_pool_t>::const_iterator it
2680 = osdmap->get_pools().begin();
2681 it != osdmap->get_pools().end(); ++it) {
2682 if (_osdmap_pool_full(it->second))
2683 return true;
2684 }
2685 return false;
2686}
2687
7c673cae
FG
2688/**
2689 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2690 */
2691bool Objecter::_osdmap_full_flag() const
2692{
11fdf7f2 2693  // Ignore the FULL flag if the caller has not set honor_pool_full
9f95a23c 2694 return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
7c673cae
FG
2695}
2696
2697void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2698{
2699 for (map<int64_t, pg_pool_t>::const_iterator it
2700 = osdmap->get_pools().begin();
2701 it != osdmap->get_pools().end(); ++it) {
2702 if (pool_full_map.find(it->first) == pool_full_map.end()) {
2703 pool_full_map[it->first] = _osdmap_pool_full(it->second);
2704 } else {
2705 pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2706 pool_full_map[it->first];
2707 }
2708 }
2709}
2710
2711int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2712 const string& ns)
2713{
2714 shared_lock rl(rwlock);
2715 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2716 if (!p)
2717 return -ENOENT;
2718 return p->hash_key(key, ns);
2719}
2720
2721int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2722 const string& ns)
2723{
2724 shared_lock rl(rwlock);
2725 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2726 if (!p)
2727 return -ENOENT;
2728 return p->raw_hash_to_pg(p->hash_key(key, ns));
2729}
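// Hypothetical usage sketch relating the two accessors above: the first
// returns the raw hash of (key, namespace) within the pool, the second
// folds that hash onto the pool's pg_num to yield the placement seed.
//
//   int64_t raw = objecter->get_object_hash_position(pool_id, "obj", "ns");
//   int64_t ps  = objecter->get_object_pg_hash_position(pool_id, "obj", "ns");
//   // both return -ENOENT if the pool no longer exists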
2730
11fdf7f2
TL
2731void Objecter::_prune_snapc(
2732 const mempool::osdmap::map<int64_t,
9f95a23c 2733 snap_interval_set_t>& new_removed_snaps,
11fdf7f2
TL
2734 Op *op)
2735{
2736 bool match = false;
2737 auto i = new_removed_snaps.find(op->target.base_pgid.pool());
2738 if (i != new_removed_snaps.end()) {
2739 for (auto s : op->snapc.snaps) {
2740 if (i->second.contains(s)) {
2741 match = true;
2742 break;
2743 }
2744 }
2745 if (match) {
2746 vector<snapid_t> new_snaps;
2747 for (auto s : op->snapc.snaps) {
2748 if (!i->second.contains(s)) {
2749 new_snaps.push_back(s);
2750 }
2751 }
2752 op->snapc.snaps.swap(new_snaps);
2753 ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
2754 << " (was " << new_snaps << ")" << dendl;
2755 }
2756 }
2757}
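// Worked example for _prune_snapc() (values are illustrative): if
// op->snapc.snaps is [8, 5, 3] and the pool's new_removed_snaps interval
// set contains 5, the loop above rewrites the vector to [8, 3]; snapc.seq
// is left untouched, and ops whose snaps are unaffected are not modified.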
2758
7c673cae
FG
2759int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
2760{
2761 // rwlock is locked
2762 bool is_read = t->flags & CEPH_OSD_FLAG_READ;
2763 bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
2764 t->epoch = osdmap->get_epoch();
2765 ldout(cct,20) << __func__ << " epoch " << t->epoch
2766 << " base " << t->base_oid << " " << t->base_oloc
2767 << " precalc_pgid " << (int)t->precalc_pgid
2768 << " pgid " << t->base_pgid
2769 << (is_read ? " is_read" : "")
2770 << (is_write ? " is_write" : "")
2771 << dendl;
2772
2773 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2774 if (!pi) {
2775 t->osd = -1;
2776 return RECALC_OP_TARGET_POOL_DNE;
2777 }
2778 ldout(cct,30) << __func__ << " base pi " << pi
2779 << " pg_num " << pi->get_pg_num() << dendl;
2780
2781 bool force_resend = false;
2782 if (osdmap->get_epoch() == pi->last_force_op_resend) {
2783 if (t->last_force_resend < pi->last_force_op_resend) {
2784 t->last_force_resend = pi->last_force_op_resend;
2785 force_resend = true;
2786 } else if (t->last_force_resend == 0) {
2787 force_resend = true;
2788 }
2789 }
2790
2791 // apply tiering
2792 t->target_oid = t->base_oid;
2793 t->target_oloc = t->base_oloc;
2794 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
2795 if (is_read && pi->has_read_tier())
2796 t->target_oloc.pool = pi->read_tier;
2797 if (is_write && pi->has_write_tier())
2798 t->target_oloc.pool = pi->write_tier;
2799 pi = osdmap->get_pg_pool(t->target_oloc.pool);
2800 if (!pi) {
2801 t->osd = -1;
2802 return RECALC_OP_TARGET_POOL_DNE;
2803 }
2804 }
2805
2806 pg_t pgid;
2807 if (t->precalc_pgid) {
11fdf7f2
TL
2808 ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
2809 ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
2810 ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
7c673cae
FG
2811 pgid = t->base_pgid;
2812 } else {
2813 int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
2814 pgid);
2815 if (ret == -ENOENT) {
2816 t->osd = -1;
2817 return RECALC_OP_TARGET_POOL_DNE;
2818 }
2819 }
2820 ldout(cct,20) << __func__ << " target " << t->target_oid << " "
2821 << t->target_oloc << " -> pgid " << pgid << dendl;
2822 ldout(cct,30) << __func__ << " target pi " << pi
2823 << " pg_num " << pi->get_pg_num() << dendl;
2824 t->pool_ever_existed = true;
2825
2826 int size = pi->size;
2827 int min_size = pi->min_size;
2828 unsigned pg_num = pi->get_pg_num();
9f95a23c 2829 unsigned pg_num_mask = pi->get_pg_num_mask();
11fdf7f2 2830 unsigned pg_num_pending = pi->get_pg_num_pending();
7c673cae
FG
2831 int up_primary, acting_primary;
2832 vector<int> up, acting;
9f95a23c
TL
2833 ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
2834 pg_t actual_pgid(actual_ps, pgid.pool());
2835 pg_mapping_t pg_mapping;
2836 pg_mapping.epoch = osdmap->get_epoch();
2837 if (lookup_pg_mapping(actual_pgid, &pg_mapping)) {
2838 up = pg_mapping.up;
2839 up_primary = pg_mapping.up_primary;
2840 acting = pg_mapping.acting;
2841 acting_primary = pg_mapping.acting_primary;
2842 } else {
2843 osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
2844 &acting, &acting_primary);
2845 pg_mapping_t pg_mapping(osdmap->get_epoch(),
2846 up, up_primary, acting, acting_primary);
2847 update_pg_mapping(actual_pgid, std::move(pg_mapping));
2848 }
7c673cae 2849 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
c07f9fc5 2850 bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
7c673cae
FG
2851 unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
2852 pg_t prev_pgid(prev_seed, pgid.pool());
2853 if (any_change && PastIntervals::is_new_interval(
2854 t->acting_primary,
2855 acting_primary,
2856 t->acting,
2857 acting,
2858 t->up_primary,
2859 up_primary,
2860 t->up,
2861 up,
2862 t->size,
2863 size,
2864 t->min_size,
2865 min_size,
2866 t->pg_num,
2867 pg_num,
11fdf7f2
TL
2868 t->pg_num_pending,
2869 pg_num_pending,
7c673cae
FG
2870 t->sort_bitwise,
2871 sort_bitwise,
c07f9fc5
FG
2872 t->recovery_deletes,
2873 recovery_deletes,
7c673cae
FG
2874 prev_pgid)) {
2875 force_resend = true;
2876 }
2877
2878 bool unpaused = false;
f64942e4
AA
2879 bool should_be_paused = target_should_be_paused(t);
2880 if (t->paused && !should_be_paused) {
7c673cae
FG
2881 unpaused = true;
2882 }
9f95a23c
TL
2883 if (t->paused != should_be_paused) {
2884 ldout(cct, 10) << __func__ << " paused " << t->paused
2885 << " -> " << should_be_paused << dendl;
2886 t->paused = should_be_paused;
2887 }
7c673cae
FG
2888
2889 bool legacy_change =
2890 t->pgid != pgid ||
2891 is_pg_changed(
2892 t->acting_primary, t->acting, acting_primary, acting,
2893 t->used_replica || any_change);
11fdf7f2 2894 bool split_or_merge = false;
7c673cae 2895 if (t->pg_num) {
11fdf7f2
TL
2896 split_or_merge =
2897 prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
2898 prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
2899 prev_pgid.is_merge_target(t->pg_num, pg_num);
7c673cae
FG
2900 }
2901
11fdf7f2 2902 if (legacy_change || split_or_merge || force_resend) {
7c673cae
FG
2903 t->pgid = pgid;
2904 t->acting = acting;
2905 t->acting_primary = acting_primary;
2906 t->up_primary = up_primary;
2907 t->up = up;
2908 t->size = size;
2909 t->min_size = min_size;
2910 t->pg_num = pg_num;
9f95a23c 2911 t->pg_num_mask = pg_num_mask;
11fdf7f2 2912 t->pg_num_pending = pg_num_pending;
9f95a23c
TL
2913 spg_t spgid(actual_pgid);
2914 if (pi->is_erasure()) {
2915 for (uint8_t i = 0; i < acting.size(); ++i) {
2916 if (acting[i] == acting_primary) {
2917 spgid.reset_shard(shard_id_t(i));
2918 break;
2919 }
2920 }
2921 }
2922 t->actual_pgid = spgid;
7c673cae 2923 t->sort_bitwise = sort_bitwise;
c07f9fc5 2924 t->recovery_deletes = recovery_deletes;
7c673cae
FG
2925 ldout(cct, 10) << __func__ << " "
2926 << " raw pgid " << pgid << " -> actual " << t->actual_pgid
2927 << " acting " << acting
2928 << " primary " << acting_primary << dendl;
2929 t->used_replica = false;
e306af50
TL
2930 if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
2931 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
2932 !is_write && pi->is_replicated() && acting.size() > 1) {
7c673cae 2933 int osd;
e306af50
TL
2934 ceph_assert(is_read && acting[0] == acting_primary);
2935 if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
7c673cae
FG
2936 int p = rand() % acting.size();
2937 if (p)
2938 t->used_replica = true;
2939 osd = acting[p];
2940 ldout(cct, 10) << " chose random osd." << osd << " of " << acting
2941 << dendl;
e306af50 2942 } else {
7c673cae
FG
2943 // look for a local replica. prefer the primary if the
2944 // distance is the same.
2945 int best = -1;
2946 int best_locality = 0;
2947 for (unsigned i = 0; i < acting.size(); ++i) {
2948 int locality = osdmap->crush->get_common_ancestor_distance(
2949 cct, acting[i], crush_location);
2950 ldout(cct, 20) << __func__ << " localize: rank " << i
2951 << " osd." << acting[i]
2952 << " locality " << locality << dendl;
2953 if (i == 0 ||
2954 (locality >= 0 && best_locality >= 0 &&
2955 locality < best_locality) ||
2956 (best_locality < 0 && locality >= 0)) {
2957 best = i;
2958 best_locality = locality;
2959 if (i)
2960 t->used_replica = true;
2961 }
2962 }
11fdf7f2 2963 ceph_assert(best >= 0);
7c673cae 2964 osd = acting[best];
7c673cae
FG
2965 }
2966 t->osd = osd;
e306af50
TL
2967 } else {
2968 t->osd = acting_primary;
7c673cae
FG
2969 }
2970 }
2971 if (legacy_change || unpaused || force_resend) {
2972 return RECALC_OP_TARGET_NEED_RESEND;
2973 }
11fdf7f2 2974 if (split_or_merge &&
9f95a23c 2975 (osdmap->require_osd_release >= ceph_release_t::luminous ||
91327a77
AA
2976 HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
2977 RESEND_ON_SPLIT))) {
7c673cae
FG
2978 return RECALC_OP_TARGET_NEED_RESEND;
2979 }
2980 return RECALC_OP_TARGET_NO_ACTION;
2981}
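// Summary of the _calc_target() results as consumed by callers in this
// file:
//   RECALC_OP_TARGET_POOL_DNE    - pool is gone; callers check for a
//                                  newer osdmap (see _op_submit)
//   RECALC_OP_TARGET_NEED_RESEND - mapping or interval changed, or the op
//                                  was unpaused; it must be (re)sent
//   RECALC_OP_TARGET_NO_ACTION   - target unchanged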
2982
2983int Objecter::_map_session(op_target_t *target, OSDSession **s,
2984 shunique_lock& sul)
2985{
2986 _calc_target(target, nullptr);
2987 return _get_session(target->osd, s, sul);
2988}
2989
2990void Objecter::_session_op_assign(OSDSession *to, Op *op)
2991{
2992 // to->lock is locked
11fdf7f2
TL
2993 ceph_assert(op->session == NULL);
2994 ceph_assert(op->tid);
7c673cae
FG
2995
2996 get_session(to);
2997 op->session = to;
2998 to->ops[op->tid] = op;
2999
3000 if (to->is_homeless()) {
31f18b77 3001 num_homeless_ops++;
7c673cae
FG
3002 }
3003
3004 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3005}
3006
3007void Objecter::_session_op_remove(OSDSession *from, Op *op)
3008{
11fdf7f2 3009 ceph_assert(op->session == from);
7c673cae
FG
3010 // from->lock is locked
3011
3012 if (from->is_homeless()) {
31f18b77 3013 num_homeless_ops--;
7c673cae
FG
3014 }
3015
3016 from->ops.erase(op->tid);
3017 put_session(from);
3018 op->session = NULL;
3019
3020 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3021}
3022
3023void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
3024{
3025 // to lock is locked unique
11fdf7f2 3026 ceph_assert(op->session == NULL);
7c673cae
FG
3027
3028 if (to->is_homeless()) {
31f18b77 3029 num_homeless_ops++;
7c673cae
FG
3030 }
3031
3032 get_session(to);
3033 op->session = to;
3034 to->linger_ops[op->linger_id] = op;
3035
3036 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
3037 << dendl;
3038}
3039
3040void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
3041{
11fdf7f2 3042 ceph_assert(from == op->session);
7c673cae
FG
3043 // from->lock is locked unique
3044
3045 if (from->is_homeless()) {
31f18b77 3046 num_homeless_ops--;
7c673cae
FG
3047 }
3048
3049 from->linger_ops.erase(op->linger_id);
3050 put_session(from);
3051 op->session = NULL;
3052
3053 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
3054 << dendl;
3055}
3056
3057void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
3058{
11fdf7f2 3059 ceph_assert(from == op->session);
7c673cae
FG
3060 // from->lock is locked
3061
3062 if (from->is_homeless()) {
31f18b77 3063 num_homeless_ops--;
7c673cae
FG
3064 }
3065
3066 from->command_ops.erase(op->tid);
3067 put_session(from);
3068 op->session = NULL;
3069
3070 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3071}
3072
3073void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
3074{
3075 // to->lock is locked
11fdf7f2
TL
3076 ceph_assert(op->session == NULL);
3077 ceph_assert(op->tid);
7c673cae
FG
3078
3079 if (to->is_homeless()) {
31f18b77 3080 num_homeless_ops++;
7c673cae
FG
3081 }
3082
3083 get_session(to);
3084 op->session = to;
3085 to->command_ops[op->tid] = op;
3086
3087 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3088}
3089
3090int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
3091 shunique_lock& sul)
3092{
3093 // rwlock is locked unique
3094
3095 int r = _calc_target(&linger_op->target, nullptr, true);
3096 if (r == RECALC_OP_TARGET_NEED_RESEND) {
3097 ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
3098 << " pgid " << linger_op->target.pgid
3099 << " acting " << linger_op->target.acting << dendl;
3100
3101 OSDSession *s = NULL;
3102 r = _get_session(linger_op->target.osd, &s, sul);
11fdf7f2 3103 ceph_assert(r == 0);
7c673cae
FG
3104
3105 if (linger_op->session != s) {
3106 // NB locking two sessions (s and linger_op->session) at the
3107 // same time here is only safe because we are the only one that
3108 // takes two, and we are holding rwlock for write. Disable
3109 // lockdep because it doesn't know that.
3110 OSDSession::unique_lock sl(s->lock);
3111 _session_linger_op_remove(linger_op->session, linger_op);
3112 _session_linger_op_assign(s, linger_op);
3113 }
3114
3115 put_session(s);
3116 return RECALC_OP_TARGET_NEED_RESEND;
3117 }
3118 return r;
3119}
3120
3121void Objecter::_cancel_linger_op(Op *op)
3122{
3123 ldout(cct, 15) << "cancel_op " << op->tid << dendl;
3124
11fdf7f2 3125 ceph_assert(!op->should_resend);
7c673cae
FG
3126 if (op->onfinish) {
3127 delete op->onfinish;
31f18b77 3128 num_in_flight--;
7c673cae
FG
3129 }
3130
3131 _finish_op(op, 0);
3132}
3133
3134void Objecter::_finish_op(Op *op, int r)
3135{
11fdf7f2 3136 ldout(cct, 15) << __func__ << " " << op->tid << dendl;
7c673cae
FG
3137
3138 // op->session->lock is locked unique or op->session is null
3139
11fdf7f2
TL
3140 if (!op->ctx_budgeted && op->budget >= 0) {
3141 put_op_budget_bytes(op->budget);
3142 op->budget = -1;
3143 }
7c673cae
FG
3144
3145 if (op->ontimeout && r != -ETIMEDOUT)
3146 timer.cancel_event(op->ontimeout);
3147
3148 if (op->session) {
3149 _session_op_remove(op->session, op);
3150 }
3151
3152 logger->dec(l_osdc_op_active);
3153
11fdf7f2 3154 ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
7c673cae 3155
31f18b77 3156 inflight_ops--;
7c673cae
FG
3157
3158 op->put();
3159}
3160
7c673cae
FG
3161MOSDOp *Objecter::_prepare_osd_op(Op *op)
3162{
3163 // rwlock is locked
3164
3165 int flags = op->target.flags;
3166 flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
3167
3168 // Nothing checks this any longer, but needed for compatibility with
3169 // pre-luminous osds
3170 flags |= CEPH_OSD_FLAG_ONDISK;
3171
9f95a23c 3172 if (!honor_pool_full)
7c673cae
FG
3173 flags |= CEPH_OSD_FLAG_FULL_FORCE;
3174
3175 op->target.paused = false;
11fdf7f2 3176 op->stamp = ceph::coarse_mono_clock::now();
7c673cae
FG
3177
3178 hobject_t hobj = op->target.get_hobj();
31f18b77 3179 MOSDOp *m = new MOSDOp(client_inc, op->tid,
7c673cae
FG
3180 hobj, op->target.actual_pgid,
3181 osdmap->get_epoch(),
3182 flags, op->features);
3183
3184 m->set_snapid(op->snapid);
3185 m->set_snap_seq(op->snapc.seq);
3186 m->set_snaps(op->snapc.snaps);
3187
3188 m->ops = op->ops;
3189 m->set_mtime(op->mtime);
3190 m->set_retry_attempt(op->attempts++);
31f18b77
FG
3191
3192 if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
3193 op->trace.init("op", &trace_endpoint);
3194 }
7c673cae
FG
3195
3196 if (op->priority)
3197 m->set_priority(op->priority);
3198 else
3199 m->set_priority(cct->_conf->osd_client_op_priority);
3200
3201 if (op->reqid != osd_reqid_t()) {
3202 m->set_reqid(op->reqid);
3203 }
3204
3205 logger->inc(l_osdc_op_send);
b32b8144
FG
3206 ssize_t sum = 0;
3207 for (unsigned i = 0; i < m->ops.size(); i++) {
3208 sum += m->ops[i].indata.length();
3209 }
3210 logger->inc(l_osdc_op_send_bytes, sum);
7c673cae
FG
3211
3212 return m;
3213}
3214
11fdf7f2 3215void Objecter::_send_op(Op *op)
7c673cae
FG
3216{
3217 // rwlock is locked
3218 // op->session->lock is locked
3219
3220 // backoff?
7c673cae
FG
3221 auto p = op->session->backoffs.find(op->target.actual_pgid);
3222 if (p != op->session->backoffs.end()) {
b32b8144 3223 hobject_t hoid = op->target.get_hobj();
7c673cae
FG
3224 auto q = p->second.lower_bound(hoid);
3225 if (q != p->second.begin()) {
3226 --q;
3227 if (hoid >= q->second.end) {
3228 ++q;
3229 }
3230 }
3231 if (q != p->second.end()) {
3232 ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
3233 << "," << q->second.end << ")" << dendl;
3234 int r = cmp(hoid, q->second.begin);
3235 if (r == 0 || (r > 0 && hoid < q->second.end)) {
3236 ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
3237 << " id " << q->second.id << " on " << hoid
3238 << ", queuing " << op << " tid " << op->tid << dendl;
3239 return;
3240 }
3241 }
3242 }
3243
11fdf7f2
TL
3244 ceph_assert(op->tid > 0);
3245 MOSDOp *m = _prepare_osd_op(op);
7c673cae
FG
3246
3247 if (op->target.actual_pgid != m->get_spg()) {
3248 ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
3249 << m->get_spg() << " to " << op->target.actual_pgid
3250 << ", updating and reencoding" << dendl;
3251 m->set_spg(op->target.actual_pgid);
3252 m->clear_payload(); // reencode
3253 }
3254
3255 ldout(cct, 15) << "_send_op " << op->tid << " to "
3256 << op->target.actual_pgid << " on osd." << op->session->osd
3257 << dendl;
3258
3259 ConnectionRef con = op->session->con;
11fdf7f2 3260 ceph_assert(con);
7c673cae 3261
11fdf7f2 3262#if 0
9f95a23c 3263 // preallocated rx ceph::buffer?
7c673cae 3264 if (op->con) {
9f95a23c 3265 ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
7c673cae
FG
3266 << op->con << dendl;
3267 op->con->revoke_rx_buffer(op->tid);
3268 }
3269 if (op->outbl &&
3270 op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582
3271 op->outbl->length()) {
11fdf7f2 3272 op->outbl->invalidate_crc(); // messenger writes through c_str()
9f95a23c 3273 ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
7c673cae
FG
3274 << dendl;
3275 op->con = con;
3276 op->con->post_rx_buffer(op->tid, *op->outbl);
3277 }
11fdf7f2 3278#endif
7c673cae
FG
3279
3280 op->incarnation = op->session->incarnation;
3281
31f18b77
FG
3282 if (op->trace.valid()) {
3283 m->trace.init("op msg", nullptr, &op->trace);
3284 }
7c673cae
FG
3285 op->session->con->send_message(m);
3286}
3287
11fdf7f2 3288int Objecter::calc_op_budget(const vector<OSDOp>& ops)
7c673cae
FG
3289{
3290 int op_budget = 0;
11fdf7f2
TL
3291 for (vector<OSDOp>::const_iterator i = ops.begin();
3292 i != ops.end();
7c673cae
FG
3293 ++i) {
3294 if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3295 op_budget += i->indata.length();
3296 } else if (ceph_osd_op_mode_read(i->op.op)) {
a8e16298
TL
3297 if (ceph_osd_op_uses_extent(i->op.op)) {
3298 if ((int64_t)i->op.extent.length > 0)
3299 op_budget += (int64_t)i->op.extent.length;
7c673cae 3300 } else if (ceph_osd_op_type_attr(i->op.op)) {
a8e16298 3301 op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
7c673cae
FG
3302 }
3303 }
3304 }
3305 return op_budget;
3306}
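// Worked example for calc_op_budget() (sizes are illustrative): a compound
// op with one 4096-byte write and one 8192-byte extent read is budgeted at
// 4096 (indata of the write) + 8192 (extent.length of the read) = 12288;
// attr-style reads are charged xattr.name_len + xattr.value_len instead.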
3307
3308void Objecter::_throttle_op(Op *op,
3309 shunique_lock& sul,
3310 int op_budget)
3311{
11fdf7f2 3312 ceph_assert(sul && sul.mutex() == &rwlock);
7c673cae
FG
3313 bool locked_for_write = sul.owns_lock();
3314
3315 if (!op_budget)
11fdf7f2 3316 op_budget = calc_op_budget(op->ops);
7c673cae
FG
3317 if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3318 sul.unlock();
3319 op_throttle_bytes.get(op_budget);
3320 if (locked_for_write)
3321 sul.lock();
3322 else
3323 sul.lock_shared();
3324 }
3325 if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3326 sul.unlock();
3327 op_throttle_ops.get(1);
3328 if (locked_for_write)
3329 sul.lock();
3330 else
3331 sul.lock_shared();
3332 }
3333}
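// Note on the pattern above: Throttle::get() can block until budget is
// released elsewhere, so the rwlock is dropped before the blocking
// acquire and retaken afterwards in whichever mode the caller held.
// Condensed sketch of one of the two throttles:
//
//   if (!op_throttle_bytes.get_or_fail(op_budget)) {  // would block now
//     sul.unlock();
//     op_throttle_bytes.get(op_budget);               // blocking acquire
//     if (locked_for_write)
//       sul.lock();
//     else
//       sul.lock_shared();
//   }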
3334
11fdf7f2 3335int Objecter::take_linger_budget(LingerOp *info)
7c673cae 3336{
11fdf7f2 3337 return 1;
7c673cae
FG
3338}
3339
3340/* This function DOES put the passed message before returning */
3341void Objecter::handle_osd_op_reply(MOSDOpReply *m)
3342{
3343 ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
3344
3345 // get pio
3346 ceph_tid_t tid = m->get_tid();
3347
3348 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3349 if (!initialized) {
7c673cae
FG
3350 m->put();
3351 return;
3352 }
3353
3354 ConnectionRef con = m->get_connection();
11fdf7f2
TL
3355 auto priv = con->get_priv();
3356 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
3357 if (!s || s->con != con) {
3358 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
7c673cae
FG
3359 m->put();
3360 return;
3361 }
3362
3363 OSDSession::unique_lock sl(s->lock);
3364
3365 map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
3366 if (iter == s->ops.end()) {
3367 ldout(cct, 7) << "handle_osd_op_reply " << tid
3368 << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
3369 " onnvram" : " ack"))
3370 << " ... stray" << dendl;
3371 sl.unlock();
7c673cae
FG
3372 m->put();
3373 return;
3374 }
3375
3376 ldout(cct, 7) << "handle_osd_op_reply " << tid
3377 << (m->is_ondisk() ? " ondisk" :
3378 (m->is_onnvram() ? " onnvram" : " ack"))
3379 << " uv " << m->get_user_version()
3380 << " in " << m->get_pg()
3381 << " attempt " << m->get_retry_attempt()
3382 << dendl;
3383 Op *op = iter->second;
31f18b77 3384 op->trace.event("osd op reply");
7c673cae
FG
3385
3386 if (retry_writes_after_first_reply && op->attempts == 1 &&
3387 (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
3388 ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
3389 if (op->onfinish) {
31f18b77 3390 num_in_flight--;
7c673cae
FG
3391 }
3392 _session_op_remove(s, op);
3393 sl.unlock();
7c673cae
FG
3394
3395 _op_submit(op, sul, NULL);
3396 m->put();
3397 return;
3398 }
3399
3400 if (m->get_retry_attempt() >= 0) {
3401 if (m->get_retry_attempt() != (op->attempts - 1)) {
3402 ldout(cct, 7) << " ignoring reply from attempt "
3403 << m->get_retry_attempt()
3404 << " from " << m->get_source_inst()
3405 << "; last attempt " << (op->attempts - 1) << " sent to "
3406 << op->session->con->get_peer_addr() << dendl;
3407 m->put();
3408 sl.unlock();
7c673cae
FG
3409 return;
3410 }
3411 } else {
3412 // we don't know the request attempt because the server is old, so
3413 // just accept this one. we may do ACK callbacks we shouldn't
3414 // have, but that is better than doing callbacks out of order.
3415 }
3416
3417 Context *onfinish = 0;
3418
3419 int rc = m->get_result();
3420
3421 if (m->is_redirect_reply()) {
3422 ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
3423 if (op->onfinish)
31f18b77 3424 num_in_flight--;
7c673cae
FG
3425 _session_op_remove(s, op);
3426 sl.unlock();
7c673cae
FG
3427
3428 // FIXME: two redirects could race and reorder
3429
3430 op->tid = 0;
3431 m->get_redirect().combine_with_locator(op->target.target_oloc,
3432 op->target.target_oid.name);
f64942e4
AA
3433 op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
3434 CEPH_OSD_FLAG_IGNORE_CACHE |
3435 CEPH_OSD_FLAG_IGNORE_OVERLAY);
7c673cae
FG
3436 _op_submit(op, sul, NULL);
3437 m->put();
3438 return;
3439 }
3440
3441 if (rc == -EAGAIN) {
3442 ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
c07f9fc5
FG
3443 if (op->onfinish)
3444 num_in_flight--;
3445 _session_op_remove(s, op);
7c673cae 3446 sl.unlock();
c07f9fc5
FG
3447
3448 op->tid = 0;
3449 op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3450 CEPH_OSD_FLAG_LOCALIZE_READS);
3451 op->target.pgid = pg_t();
3452 _op_submit(op, sul, NULL);
7c673cae
FG
3453 m->put();
3454 return;
3455 }
3456
3457 sul.unlock();
3458
3459 if (op->objver)
3460 *op->objver = m->get_user_version();
3461 if (op->reply_epoch)
3462 *op->reply_epoch = m->get_map_epoch();
3463 if (op->data_offset)
3464 *op->data_offset = m->get_header().data_off;
3465
3466 // got data?
3467 if (op->outbl) {
11fdf7f2 3468#if 0
7c673cae
FG
3469 if (op->con)
3470 op->con->revoke_rx_buffer(op->tid);
11fdf7f2
TL
3471#endif
3472 auto& bl = m->get_data();
3473 if (op->outbl->length() == bl.length() &&
3474 bl.get_num_buffers() <= 1) {
3475      // this is here to keep previous users who *relied* on getting data
3476 // read into existing buffers happy. Notably,
3477 // libradosstriper::RadosStriperImpl::aio_read().
3478 ldout(cct,10) << __func__ << " copying resulting " << bl.length()
9f95a23c 3479 << " into existing ceph::buffer of length " << op->outbl->length()
11fdf7f2 3480 << dendl;
9f95a23c 3481 ceph::buffer::list t;
11fdf7f2
TL
3482 t.claim(*op->outbl);
3483 t.invalidate_crc(); // we're overwriting the raw buffers via c_str()
9f95a23c 3484 bl.begin().copy(bl.length(), t.c_str());
11fdf7f2
TL
3485 op->outbl->substr_of(t, 0, bl.length());
3486 } else {
3487 m->claim_data(*op->outbl);
3488 }
7c673cae
FG
3489 op->outbl = 0;
3490 }
3491
3492 // per-op result demuxing
3493 vector<OSDOp> out_ops;
3494 m->claim_ops(out_ops);
3495
3496 if (out_ops.size() != op->ops.size())
3497 ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
3498 << " != request ops " << op->ops
3499 << " from " << m->get_source_inst() << dendl;
3500
9f95a23c 3501 vector<ceph::buffer::list*>::iterator pb = op->out_bl.begin();
7c673cae
FG
3502 vector<int*>::iterator pr = op->out_rval.begin();
3503 vector<Context*>::iterator ph = op->out_handler.begin();
11fdf7f2
TL
3504 ceph_assert(op->out_bl.size() == op->out_rval.size());
3505 ceph_assert(op->out_bl.size() == op->out_handler.size());
7c673cae
FG
3506 vector<OSDOp>::iterator p = out_ops.begin();
3507 for (unsigned i = 0;
3508 p != out_ops.end() && pb != op->out_bl.end();
3509 ++i, ++p, ++pb, ++pr, ++ph) {
3510 ldout(cct, 10) << " op " << i << " rval " << p->rval
3511 << " len " << p->outdata.length() << dendl;
3512 if (*pb)
3513 **pb = p->outdata;
3514 // set rval before running handlers so that handlers
3515 // can change it if e.g. decoding fails
3516 if (*pr)
31f18b77 3517 **pr = ceph_to_hostos_errno(p->rval);
7c673cae
FG
3518 if (*ph) {
3519 ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
31f18b77 3520 (*ph)->complete(ceph_to_hostos_errno(p->rval));
7c673cae
FG
3521 *ph = NULL;
3522 }
3523 }
3524
3525  // NOTE: we assume that since we only ever request ONDISK, we will
3526  // only ever get back one (type of) ack.
3527
3528 if (op->onfinish) {
31f18b77 3529 num_in_flight--;
7c673cae
FG
3530 onfinish = op->onfinish;
3531 op->onfinish = NULL;
3532 }
3533 logger->inc(l_osdc_op_reply);
3534
3535 /* get it before we call _finish_op() */
3536 auto completion_lock = s->get_lock(op->target.base_oid);
3537
3538 ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
3539 _finish_op(op, 0);
3540
31f18b77 3541 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
7c673cae
FG
3542
3543 // serialize completions
3544 if (completion_lock.mutex()) {
3545 completion_lock.lock();
3546 }
3547 sl.unlock();
3548
3549 // do callbacks
3550 if (onfinish) {
3551 onfinish->complete(rc);
3552 }
3553 if (completion_lock.mutex()) {
3554 completion_lock.unlock();
3555 }
3556
3557 m->put();
7c673cae
FG
3558}
3559
3560void Objecter::handle_osd_backoff(MOSDBackoff *m)
3561{
3562 ldout(cct, 10) << __func__ << " " << *m << dendl;
3563 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3564 if (!initialized) {
7c673cae
FG
3565 m->put();
3566 return;
3567 }
3568
3569 ConnectionRef con = m->get_connection();
11fdf7f2
TL
3570 auto priv = con->get_priv();
3571 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
3572 if (!s || s->con != con) {
3573 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
7c673cae
FG
3574 m->put();
3575 return;
3576 }
3577
3578 get_session(s);
7c673cae
FG
3579
3580 OSDSession::unique_lock sl(s->lock);
3581
3582 switch (m->op) {
3583 case CEPH_OSD_BACKOFF_OP_BLOCK:
3584 {
3585 // register
3586 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3587 s->backoffs_by_id.insert(make_pair(m->id, &b));
3588 b.pgid = m->pgid;
3589 b.id = m->id;
3590 b.begin = m->begin;
3591 b.end = m->end;
3592
3593 // ack with original backoff's epoch so that the osd can discard this if
3594 // there was a pg split.
3595 Message *r = new MOSDBackoff(m->pgid,
3596 m->map_epoch,
3597 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3598 m->id, m->begin, m->end);
3599 // this priority must match the MOSDOps from _prepare_osd_op
3600 r->set_priority(cct->_conf->osd_client_op_priority);
3601 con->send_message(r);
3602 }
3603 break;
3604
3605 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3606 {
3607 auto p = s->backoffs_by_id.find(m->id);
3608 if (p != s->backoffs_by_id.end()) {
3609 OSDBackoff *b = p->second;
3610 if (b->begin != m->begin &&
3611 b->end != m->end) {
3612 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3613 << " unblock on ["
3614 << m->begin << "," << m->end << ") but backoff is ["
3615 << b->begin << "," << b->end << ")" << dendl;
3616 // hrmpf, unblock it anyway.
3617 }
3618 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3619 << " id " << b->id
3620 << " [" << b->begin << "," << b->end
3621 << ")" << dendl;
3622 auto spgp = s->backoffs.find(b->pgid);
11fdf7f2 3623 ceph_assert(spgp != s->backoffs.end());
7c673cae
FG
3624 spgp->second.erase(b->begin);
3625 if (spgp->second.empty()) {
3626 s->backoffs.erase(spgp);
3627 }
3628 s->backoffs_by_id.erase(p);
3629
3630 // check for any ops to resend
3631 for (auto& q : s->ops) {
3632 if (q.second->target.actual_pgid == m->pgid) {
3633 int r = q.second->target.contained_by(m->begin, m->end);
3634 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3635 << q.second->target.get_hobj() << dendl;
3636 if (r) {
3637 _send_op(q.second);
3638 }
3639 }
3640 }
3641 } else {
3642 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3643 << " unblock on ["
3644 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3645 }
3646 }
3647 break;
3648
3649 default:
3650 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3651 }
3652
3653 sul.unlock();
3654 sl.unlock();
3655
3656 m->put();
3657 put_session(s);
3658}
3659
3660uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3661 uint32_t pos)
3662{
3663 shared_lock rl(rwlock);
3664 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3665 pos, list_context->pool_id, string());
11fdf7f2 3666 ldout(cct, 10) << __func__ << " " << list_context
7c673cae
FG
3667 << " pos " << pos << " -> " << list_context->pos << dendl;
3668 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3669 list_context->current_pg = actual.ps();
3670 list_context->at_end_of_pool = false;
3671 return pos;
3672}
3673
3674uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3675 const hobject_t& cursor)
3676{
3677 shared_lock rl(rwlock);
3678 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3679 list_context->pos = cursor;
3680 list_context->at_end_of_pool = false;
3681 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3682 list_context->current_pg = actual.ps();
3683 list_context->sort_bitwise = true;
3684 return list_context->current_pg;
3685}
3686
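// Report a resumable cursor for the caller's current position: the first
// still-unconsumed buffered entry (re-hashed with the pool's hash_key), or the
// raw listing position if the buffer is empty.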
3687void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3688 hobject_t *cursor)
3689{
3690 shared_lock rl(rwlock);
3691 if (list_context->list.empty()) {
3692 *cursor = list_context->pos;
3693 } else {
3694 const librados::ListObjectImpl& entry = list_context->list.front();
3695 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3696 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3697 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3698 }
3699}
3700
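// Issue (or continue) a pg_nls listing for this context's pool.  Handles pool
// deletion, pg_num changes and sort-order changes, completes onfinish with
// -ENOENT or 0 at end of pool, and otherwise delivers results through
// C_NList -> _nlist_reply() below.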
3701void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3702{
3703 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3704 << " pool_snap_seq " << list_context->pool_snap_seq
3705 << " max_entries " << list_context->max_entries
3706 << " list_context " << list_context
3707 << " onfinish " << onfinish
3708 << " current_pg " << list_context->current_pg
3709 << " pos " << list_context->pos << dendl;
3710
3711 shared_lock rl(rwlock);
3712 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3713 if (!pool) { // pool is gone
3714 rl.unlock();
3715 put_nlist_context_budget(list_context);
3716 onfinish->complete(-ENOENT);
3717 return;
3718 }
3719 int pg_num = pool->get_pg_num();
3720 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3721
3722 if (list_context->pos.is_min()) {
3723 list_context->starting_pg_num = 0;
3724 list_context->sort_bitwise = sort_bitwise;
3725 list_context->starting_pg_num = pg_num;
3726 }
3727 if (list_context->sort_bitwise != sort_bitwise) {
3728 list_context->pos = hobject_t(
3729 object_t(), string(), CEPH_NOSNAP,
3730 list_context->current_pg, list_context->pool_id, string());
3731 list_context->sort_bitwise = sort_bitwise;
3732 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3733 << list_context->pos << dendl;
3734 }
3735 if (list_context->starting_pg_num != pg_num) {
3736 if (!sort_bitwise) {
3737 // start reading from the beginning; the pgs have changed
3738 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3739 list_context->pos = collection_list_handle_t();
3740 }
3741 list_context->starting_pg_num = pg_num;
3742 }
3743
3744 if (list_context->pos.is_max()) {
3745 ldout(cct, 20) << __func__ << " end of pool, list "
3746 << list_context->list << dendl;
3747 if (list_context->list.empty()) {
3748 list_context->at_end_of_pool = true;
3749 }
3750 // release the listing context's budget once all
3751 // OPs (in the session) are finished
3752 put_nlist_context_budget(list_context);
3753 onfinish->complete(0);
3754 return;
3755 }
3756
3757 ObjectOperation op;
3758 op.pg_nls(list_context->max_entries, list_context->filter,
3759 list_context->pos, osdmap->get_epoch());
3760 list_context->bl.clear();
3761 C_NList *onack = new C_NList(list_context, onfinish, this);
3762 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3763
3764 // note current_pg in case we don't have (or lose) SORTBITWISE
3765 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3766 rl.unlock();
3767
3768 pg_read(list_context->current_pg, oloc, op,
3769 &list_context->bl, 0, onack, &onack->epoch,
3770 &list_context->ctx_budget);
3771}
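// Hypothetical caller sketch (illustration only; ListMore is an assumed helper
// type, not part of Objecter): drive a listing to completion by re-calling
// list_nobjects() from the completion Context until at_end_of_pool is set.
//
//   struct ListMore : Context {
//     Objecter *o;
//     NListContext *lc;
//     ListMore(Objecter *o, NListContext *lc) : o(o), lc(lc) {}
//     void finish(int r) override {
//       if (r == 0 && !lc->at_end_of_pool) {
//         // ... consume lc->list here, then continue ...
//         o->list_nobjects(lc, new ListMore(o, lc));
//       }
//     }
//   };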
3772
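// Completion path for list_nobjects(): decode the pg_nls response, advance the
// cursor (stepping to the next PG by hand for legacy !sortbitwise OSDs),
// buffer the returned entries, and either finish or continue the listing.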
3773void Objecter::_nlist_reply(NListContext *list_context, int r,
3774 Context *final_finish, epoch_t reply_epoch)
3775{
3776 ldout(cct, 10) << __func__ << " " << list_context << dendl;
3777
11fdf7f2 3778 auto iter = list_context->bl.cbegin();
7c673cae 3779 pg_nls_response_t response;
11fdf7f2 3780 decode(response, iter);
7c673cae 3781 if (!iter.end()) {
9f95a23c
TL
3782 // decode and discard the legacy extra_info field for backward compatibility.
3783 ceph::buffer::list legacy_extra_info;
3784 decode(legacy_extra_info, iter);
7c673cae
FG
3785 }
3786
3787 // if the osd returns 1 (newer code), or the handle is MAX, it means we
3788 // hit the end of the pg.
3789 if ((response.handle.is_max() || r == 1) &&
3790 !list_context->sort_bitwise) {
3791 // legacy OSD and !sortbitwise, figure out the next PG on our own
3792 ++list_context->current_pg;
3793 if (list_context->current_pg == list_context->starting_pg_num) {
3794 // end of pool
3795 list_context->pos = hobject_t::get_max();
3796 } else {
3797 // next pg
3798 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3799 list_context->current_pg,
3800 list_context->pool_id, string());
3801 }
3802 } else {
3803 list_context->pos = response.handle;
3804 }
3805
3806 int response_size = response.entries.size();
3807 ldout(cct, 20) << " response.entries.size " << response_size
3808 << ", response.entries " << response.entries
3809 << ", handle " << response.handle
3810 << ", tentative new pos " << list_context->pos << dendl;
7c673cae
FG
3811 if (response_size) {
3812 list_context->list.splice(list_context->list.end(), response.entries);
3813 }
3814
3815 if (list_context->list.size() >= list_context->max_entries) {
3816 ldout(cct, 20) << " hit max, returning results so far, "
3817 << list_context->list << dendl;
3818 // release the listing context's budget once all
3819 // OPs (in the session) are finished
3820 put_nlist_context_budget(list_context);
3821 final_finish->complete(0);
3822 return;
3823 }
3824
3825 // continue!
3826 list_nobjects(list_context, final_finish);
3827}
3828
3829void Objecter::put_nlist_context_budget(NListContext *list_context)
3830{
3831 if (list_context->ctx_budget >= 0) {
3832 ldout(cct, 10) << " release listing context's budget " <<
3833 list_context->ctx_budget << dendl;
3834 put_op_budget_bytes(list_context->ctx_budget);
3835 list_context->ctx_budget = -1;
3836 }
3837}
3838
3839// snapshots
3840
3841int Objecter::create_pool_snap(int64_t pool, string& snap_name,
3842 Context *onfinish)
3843{
3844 unique_lock wl(rwlock);
3845 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3846 << snap_name << dendl;
3847
3848 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3849 if (!p)
3850 return -EINVAL;
3851 if (p->snap_exists(snap_name.c_str()))
3852 return -EEXIST;
3853
3854 PoolOp *op = new PoolOp;
3855 if (!op)
3856 return -ENOMEM;
31f18b77 3857 op->tid = ++last_tid;
7c673cae
FG
3858 op->pool = pool;
3859 op->name = snap_name;
3860 op->onfinish = onfinish;
3861 op->pool_op = POOL_OP_CREATE_SNAP;
3862 pool_ops[op->tid] = op;
3863
3864 pool_op_submit(op);
3865
3866 return 0;
3867}
3868
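// Completion shim for allocate_selfmanaged_snap(): decodes the monitor's reply
// payload into *psnapid (-EIO on decode failure) before completing the
// caller's Context.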
3869struct C_SelfmanagedSnap : public Context {
9f95a23c 3870 ceph::buffer::list bl;
7c673cae
FG
3871 snapid_t *psnapid;
3872 Context *fin;
3873 C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
3874 void finish(int r) override {
3875 if (r == 0) {
11fdf7f2
TL
3876 try {
3877 auto p = bl.cbegin();
3878 decode(*psnapid, p);
9f95a23c 3879 } catch (ceph::buffer::error&) {
11fdf7f2
TL
3880 r = -EIO;
3881 }
7c673cae
FG
3882 }
3883 fin->complete(r);
3884 }
3885};
3886
3887int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
3888 Context *onfinish)
3889{
3890 unique_lock wl(rwlock);
3891 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
3892 PoolOp *op = new PoolOp;
3893 if (!op) return -ENOMEM;
31f18b77 3894 op->tid = ++last_tid;
7c673cae
FG
3895 op->pool = pool;
3896 C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
3897 op->onfinish = fin;
3898 op->blp = &fin->bl;
3899 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3900 pool_ops[op->tid] = op;
3901
3902 pool_op_submit(op);
3903 return 0;
3904}
3905
3906int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
3907 Context *onfinish)
3908{
3909 unique_lock wl(rwlock);
3910 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3911 << snap_name << dendl;
3912
3913 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3914 if (!p)
3915 return -EINVAL;
3916 if (!p->snap_exists(snap_name.c_str()))
3917 return -ENOENT;
3918
3919 PoolOp *op = new PoolOp;
3920 if (!op)
3921 return -ENOMEM;
31f18b77 3922 op->tid = ++last_tid;
7c673cae
FG
3923 op->pool = pool;
3924 op->name = snap_name;
3925 op->onfinish = onfinish;
3926 op->pool_op = POOL_OP_DELETE_SNAP;
3927 pool_ops[op->tid] = op;
3928
3929 pool_op_submit(op);
3930
3931 return 0;
3932}
3933
3934int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3935 Context *onfinish)
3936{
3937 unique_lock wl(rwlock);
3938 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3939 << snap << dendl;
3940 PoolOp *op = new PoolOp;
3941 if (!op) return -ENOMEM;
31f18b77 3942 op->tid = ++last_tid;
7c673cae
FG
3943 op->pool = pool;
3944 op->onfinish = onfinish;
3945 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3946 op->snapid = snap;
3947 pool_ops[op->tid] = op;
3948
3949 pool_op_submit(op);
3950
3951 return 0;
3952}
3953
11fdf7f2 3954int Objecter::create_pool(string& name, Context *onfinish,
7c673cae
FG
3955 int crush_rule)
3956{
3957 unique_lock wl(rwlock);
3958 ldout(cct, 10) << "create_pool name=" << name << dendl;
3959
3960 if (osdmap->lookup_pg_pool_name(name) >= 0)
3961 return -EEXIST;
3962
3963 PoolOp *op = new PoolOp;
3964 if (!op)
3965 return -ENOMEM;
31f18b77 3966 op->tid = ++last_tid;
7c673cae
FG
3967 op->pool = 0;
3968 op->name = name;
3969 op->onfinish = onfinish;
3970 op->pool_op = POOL_OP_CREATE;
3971 pool_ops[op->tid] = op;
7c673cae
FG
3972 op->crush_rule = crush_rule;
3973
3974 pool_op_submit(op);
3975
3976 return 0;
3977}
3978
3979int Objecter::delete_pool(int64_t pool, Context *onfinish)
3980{
3981 unique_lock wl(rwlock);
3982 ldout(cct, 10) << "delete_pool " << pool << dendl;
3983
3984 if (!osdmap->have_pg_pool(pool))
3985 return -ENOENT;
3986
3987 _do_delete_pool(pool, onfinish);
3988 return 0;
3989}
3990
3991int Objecter::delete_pool(const string &pool_name, Context *onfinish)
3992{
3993 unique_lock wl(rwlock);
3994 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
3995
3996 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
3997 if (pool < 0)
3998 return pool;
3999
4000 _do_delete_pool(pool, onfinish);
4001 return 0;
4002}
4003
4004void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
4005{
4006 PoolOp *op = new PoolOp;
31f18b77 4007 op->tid = ++last_tid;
7c673cae
FG
4008 op->pool = pool;
4009 op->name = "delete";
4010 op->onfinish = onfinish;
4011 op->pool_op = POOL_OP_DELETE;
4012 pool_ops[op->tid] = op;
4013 pool_op_submit(op);
4014}
4015
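// Queue a PoolOp to the monitor: arm the optional mon_timeout cancellation
// timer, then build and send the MPoolOp in _pool_op_submit().  Caller holds
// rwlock.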
7c673cae
FG
4016void Objecter::pool_op_submit(PoolOp *op)
4017{
4018 // rwlock is locked
4019 if (mon_timeout > timespan(0)) {
4020 op->ontimeout = timer.add_event(mon_timeout,
4021 [this, op]() {
4022 pool_op_cancel(op->tid, -ETIMEDOUT); });
4023 }
4024 _pool_op_submit(op);
4025}
4026
4027void Objecter::_pool_op_submit(PoolOp *op)
4028{
4029 // rwlock is locked unique
4030
4031 ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
4032 MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4033 op->name, op->pool_op,
11fdf7f2 4034 last_seen_osdmap_version);
7c673cae
FG
4035 if (op->snapid) m->snapid = op->snapid;
4036 if (op->crush_rule) m->crush_rule = op->crush_rule;
4037 monc->send_mon_message(m);
11fdf7f2 4038 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4039
4040 logger->inc(l_osdc_poolop_send);
4041}
4042
4043/**
4044 * Handle a reply to a PoolOp message. Check that we sent the message
9f95a23c 4045 * and give the caller responsibility for the returned ceph::buffer::list.
7c673cae
FG
4046 * Then either call the finisher or stash the PoolOp, depending on whether we
4047 * have a new enough map.
4048 * Lastly, clean up the message and PoolOp.
4049 */
4050void Objecter::handle_pool_op_reply(MPoolOpReply *m)
4051{
11fdf7f2 4052 FUNCTRACE(cct);
7c673cae 4053 shunique_lock sul(rwlock, acquire_shared);
31f18b77 4054 if (!initialized) {
7c673cae
FG
4055 sul.unlock();
4056 m->put();
4057 return;
4058 }
4059
4060 ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
4061 ceph_tid_t tid = m->get_tid();
4062 map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
4063 if (iter != pool_ops.end()) {
4064 PoolOp *op = iter->second;
4065 ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
4066 << ceph_pool_op_name(op->pool_op) << dendl;
4067 if (op->blp)
4068 op->blp->claim(m->response_data);
4069 if (m->version > last_seen_osdmap_version)
4070 last_seen_osdmap_version = m->version;
4071 if (osdmap->get_epoch() < m->epoch) {
4072 sul.unlock();
4073 sul.lock();
4074 // recheck op existence since we have let go of rwlock
4075 // (for promotion) above.
4076 iter = pool_ops.find(tid);
4077 if (iter == pool_ops.end())
4078 goto done; // op is gone.
4079 if (osdmap->get_epoch() < m->epoch) {
4080 ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
4081 << " before calling back" << dendl;
4082 _wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
4083 } else {
4084 // map epoch changed, probably because a MOSDMap message
4085 // sneaked in. Do caller-specified callback now or else
4086 // we lose it forever.
11fdf7f2 4087 ceph_assert(op->onfinish);
7c673cae
FG
4088 op->onfinish->complete(m->replyCode);
4089 }
4090 } else {
11fdf7f2 4091 ceph_assert(op->onfinish);
7c673cae
FG
4092 op->onfinish->complete(m->replyCode);
4093 }
4094 op->onfinish = NULL;
4095 if (!sul.owns_lock()) {
4096 sul.unlock();
4097 sul.lock();
4098 }
4099 iter = pool_ops.find(tid);
4100 if (iter != pool_ops.end()) {
4101 _finish_pool_op(op, 0);
4102 }
4103 } else {
4104 ldout(cct, 10) << "unknown request " << tid << dendl;
4105 }
4106
4107done:
4108 // Not strictly necessary, since we'll release it on return.
4109 sul.unlock();
4110
4111 ldout(cct, 10) << "done" << dendl;
4112 m->put();
4113}
4114
4115int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4116{
11fdf7f2 4117 ceph_assert(initialized);
7c673cae
FG
4118
4119 unique_lock wl(rwlock);
4120
4121 map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
4122 if (it == pool_ops.end()) {
4123 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4124 return -ENOENT;
4125 }
4126
4127 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4128
4129 PoolOp *op = it->second;
4130 if (op->onfinish)
4131 op->onfinish->complete(r);
4132
4133 _finish_pool_op(op, r);
4134 return 0;
4135}
4136
4137void Objecter::_finish_pool_op(PoolOp *op, int r)
4138{
4139 // rwlock is locked unique
4140 pool_ops.erase(op->tid);
4141 logger->set(l_osdc_poolop_active, pool_ops.size());
4142
4143 if (op->ontimeout && r != -ETIMEDOUT) {
4144 timer.cancel_event(op->ontimeout);
4145 }
4146
4147 delete op;
4148}
4149
4150// pool stats
4151
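// Request pool_stat_t for the named pools from the monitor (MGetPoolStats).
// The reply handler fills *result and *per_pool and completes onfinish; the
// optional mon_timeout cancels the op with -ETIMEDOUT.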
4152void Objecter::get_pool_stats(list<string>& pools,
4153 map<string,pool_stat_t> *result,
81eedcae 4154 bool *per_pool,
7c673cae
FG
4155 Context *onfinish)
4156{
4157 ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4158
4159 PoolStatOp *op = new PoolStatOp;
31f18b77 4160 op->tid = ++last_tid;
7c673cae
FG
4161 op->pools = pools;
4162 op->pool_stats = result;
81eedcae 4163 op->per_pool = per_pool;
7c673cae
FG
4164 op->onfinish = onfinish;
4165 if (mon_timeout > timespan(0)) {
4166 op->ontimeout = timer.add_event(mon_timeout,
4167 [this, op]() {
4168 pool_stat_op_cancel(op->tid,
4169 -ETIMEDOUT); });
4170 } else {
4171 op->ontimeout = 0;
4172 }
4173
4174 unique_lock wl(rwlock);
4175
4176 poolstat_ops[op->tid] = op;
4177
4178 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4179
4180 _poolstat_submit(op);
4181}
4182
4183void Objecter::_poolstat_submit(PoolStatOp *op)
4184{
4185 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4186 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4187 op->pools,
4188 last_seen_pgmap_version));
11fdf7f2 4189 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4190
4191 logger->inc(l_osdc_poolstat_send);
4192}
4193
4194void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4195{
4196 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4197 ceph_tid_t tid = m->get_tid();
4198
4199 unique_lock wl(rwlock);
31f18b77 4200 if (!initialized) {
7c673cae
FG
4201 m->put();
4202 return;
4203 }
4204
4205 map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
4206 if (iter != poolstat_ops.end()) {
4207 PoolStatOp *op = poolstat_ops[tid];
4208 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4209 *op->pool_stats = m->pool_stats;
81eedcae 4210 *op->per_pool = m->per_pool;
7c673cae
FG
4211 if (m->version > last_seen_pgmap_version) {
4212 last_seen_pgmap_version = m->version;
4213 }
4214 op->onfinish->complete(0);
4215 _finish_pool_stat_op(op, 0);
4216 } else {
4217 ldout(cct, 10) << "unknown request " << tid << dendl;
4218 }
4219 ldout(cct, 10) << "done" << dendl;
4220 m->put();
4221}
4222
4223int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4224{
11fdf7f2 4225 ceph_assert(initialized);
7c673cae
FG
4226
4227 unique_lock wl(rwlock);
4228
4229 map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
4230 if (it == poolstat_ops.end()) {
4231 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4232 return -ENOENT;
4233 }
4234
4235 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4236
4237 PoolStatOp *op = it->second;
4238 if (op->onfinish)
4239 op->onfinish->complete(r);
4240 _finish_pool_stat_op(op, r);
4241 return 0;
4242}
4243
4244void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4245{
4246 // rwlock is locked unique
4247
4248 poolstat_ops.erase(op->tid);
4249 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4250
4251 if (op->ontimeout && r != -ETIMEDOUT)
4252 timer.cancel_event(op->ontimeout);
4253
4254 delete op;
4255}
4256
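// Request cluster statfs from the monitor (MStatfs), optionally scoped to a
// single data pool; *result is filled and onfinish completed by
// handle_fs_stats_reply().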
d2e6a577
FG
4257void Objecter::get_fs_stats(ceph_statfs& result,
4258 boost::optional<int64_t> data_pool,
4259 Context *onfinish)
7c673cae
FG
4260{
4261 ldout(cct, 10) << "get_fs_stats" << dendl;
4262 unique_lock l(rwlock);
4263
4264 StatfsOp *op = new StatfsOp;
31f18b77 4265 op->tid = ++last_tid;
7c673cae 4266 op->stats = &result;
d2e6a577 4267 op->data_pool = data_pool;
7c673cae
FG
4268 op->onfinish = onfinish;
4269 if (mon_timeout > timespan(0)) {
4270 op->ontimeout = timer.add_event(mon_timeout,
4271 [this, op]() {
4272 statfs_op_cancel(op->tid,
4273 -ETIMEDOUT); });
4274 } else {
4275 op->ontimeout = 0;
4276 }
4277 statfs_ops[op->tid] = op;
4278
4279 logger->set(l_osdc_statfs_active, statfs_ops.size());
4280
4281 _fs_stats_submit(op);
4282}
4283
4284void Objecter::_fs_stats_submit(StatfsOp *op)
4285{
4286 // rwlock is locked unique
4287
4288 ldout(cct, 10) << "fs_stats_submit " << op->tid << dendl;
4289 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
d2e6a577 4290 op->data_pool,
7c673cae 4291 last_seen_pgmap_version));
11fdf7f2 4292 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4293
4294 logger->inc(l_osdc_statfs_send);
4295}
4296
4297void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4298{
4299 unique_lock wl(rwlock);
31f18b77 4300 if (!initialized) {
7c673cae
FG
4301 m->put();
4302 return;
4303 }
4304
4305 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4306 ceph_tid_t tid = m->get_tid();
4307
4308 if (statfs_ops.count(tid)) {
4309 StatfsOp *op = statfs_ops[tid];
4310 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4311 *(op->stats) = m->h.st;
4312 if (m->h.version > last_seen_pgmap_version)
4313 last_seen_pgmap_version = m->h.version;
4314 op->onfinish->complete(0);
4315 _finish_statfs_op(op, 0);
4316 } else {
4317 ldout(cct, 10) << "unknown request " << tid << dendl;
4318 }
4319 m->put();
4320 ldout(cct, 10) << "done" << dendl;
4321}
4322
4323int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4324{
11fdf7f2 4325 ceph_assert(initialized);
7c673cae
FG
4326
4327 unique_lock wl(rwlock);
4328
4329 map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
4330 if (it == statfs_ops.end()) {
4331 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4332 return -ENOENT;
4333 }
4334
4335 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4336
4337 StatfsOp *op = it->second;
4338 if (op->onfinish)
4339 op->onfinish->complete(r);
4340 _finish_statfs_op(op, r);
4341 return 0;
4342}
4343
4344void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4345{
4346 // rwlock is locked unique
4347
4348 statfs_ops.erase(op->tid);
4349 logger->set(l_osdc_statfs_active, statfs_ops.size());
4350
4351 if (op->ontimeout && r != -ETIMEDOUT)
4352 timer.cancel_event(op->ontimeout);
4353
4354 delete op;
4355}
4356
4357// scatter/gather
4358
4359void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
9f95a23c
TL
4360 vector<ceph::buffer::list>& resultbl,
4361 ceph::buffer::list *bl, Context *onfinish)
7c673cae
FG
4362{
4363 // all done
4364 ldout(cct, 15) << "_sg_read_finish" << dendl;
4365
4366 if (extents.size() > 1) {
4367 Striper::StripedReadResult r;
9f95a23c 4368 vector<ceph::buffer::list>::iterator bit = resultbl.begin();
7c673cae
FG
4369 for (vector<ObjectExtent>::iterator eit = extents.begin();
4370 eit != extents.end();
4371 ++eit, ++bit) {
4372 r.add_partial_result(cct, *bit, eit->buffer_extents);
4373 }
4374 bl->clear();
4375 r.assemble_result(cct, *bl, false);
4376 } else {
4377 ldout(cct, 15) << " only one frag" << dendl;
4378 bl->claim(resultbl[0]);
4379 }
4380
4381 // done
4382 uint64_t bytes_read = bl->length();
4383 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4384
4385 if (onfinish) {
4386 onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result());
4387 }
4388}
4389
4390
4391void Objecter::ms_handle_connect(Connection *con)
4392{
4393 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
31f18b77 4394 if (!initialized)
7c673cae
FG
4395 return;
4396
4397 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4398 resend_mon_ops();
4399}
4400
4401bool Objecter::ms_handle_reset(Connection *con)
4402{
31f18b77 4403 if (!initialized)
7c673cae
FG
4404 return false;
4405 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
91327a77 4406 unique_lock wl(rwlock);
11fdf7f2
TL
4407
4408 auto priv = con->get_priv();
4409 auto session = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4410 if (session) {
4411 ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
4412 << " osd." << session->osd << dendl;
a8e16298
TL
4413 // the session may already have been closed if a just-handled osdmap
4414 // marked the osd down
4415 if (!(initialized && osdmap->is_up(session->osd))) {
4416 ldout(cct, 1) << "ms_handle_reset aborted, initialized=" << initialized << dendl;
7c673cae
FG
4417 wl.unlock();
4418 return false;
4419 }
4420 map<uint64_t, LingerOp *> lresend;
4421 OSDSession::unique_lock sl(session->lock);
4422 _reopen_session(session);
4423 _kick_requests(session, lresend);
4424 sl.unlock();
4425 _linger_ops_resend(lresend, wl);
4426 wl.unlock();
4427 maybe_request_map();
7c673cae
FG
4428 }
4429 return true;
4430 }
4431 return false;
4432}
4433
4434void Objecter::ms_handle_remote_reset(Connection *con)
4435{
4436 /*
4437 * treat these the same.
4438 */
4439 ms_handle_reset(con);
4440}
4441
4442bool Objecter::ms_handle_refused(Connection *con)
4443{
4444 // just log for now
4445 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4446 int osd = osdmap->identify_osd(con->get_peer_addr());
4447 if (osd >= 0) {
4448 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4449 }
4450 }
4451 return false;
4452}
4453
7c673cae
FG
4454void Objecter::op_target_t::dump(Formatter *f) const
4455{
4456 f->dump_stream("pg") << pgid;
4457 f->dump_int("osd", osd);
4458 f->dump_stream("object_id") << base_oid;
4459 f->dump_stream("object_locator") << base_oloc;
4460 f->dump_stream("target_object_id") << target_oid;
4461 f->dump_stream("target_object_locator") << target_oloc;
4462 f->dump_int("paused", (int)paused);
4463 f->dump_int("used_replica", (int)used_replica);
4464 f->dump_int("precalc_pgid", (int)precalc_pgid);
4465}
4466
4467void Objecter::_dump_active(OSDSession *s)
4468{
4469 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
4470 p != s->ops.end();
4471 ++p) {
4472 Op *op = p->second;
4473 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4474 << "\tosd." << (op->session ? op->session->osd : -1)
4475 << "\t" << op->target.base_oid
4476 << "\t" << op->ops << dendl;
4477 }
4478}
4479
4480void Objecter::_dump_active()
4481{
31f18b77 4482 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
7c673cae
FG
4483 << dendl;
4484 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
4485 siter != osd_sessions.end(); ++siter) {
4486 OSDSession *s = siter->second;
4487 OSDSession::shared_lock sl(s->lock);
4488 _dump_active(s);
4489 sl.unlock();
4490 }
4491 _dump_active(homeless_session);
4492}
4493
4494void Objecter::dump_active()
4495{
4496 shared_lock rl(rwlock);
4497 _dump_active();
4498 rl.unlock();
4499}
4500
4501void Objecter::dump_requests(Formatter *fmt)
4502{
4503 // Read-lock on Objecter held here
4504 fmt->open_object_section("requests");
4505 dump_ops(fmt);
4506 dump_linger_ops(fmt);
4507 dump_pool_ops(fmt);
4508 dump_pool_stat_ops(fmt);
4509 dump_statfs_ops(fmt);
4510 dump_command_ops(fmt);
4511 fmt->close_section(); // requests object
4512}
4513
4514void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4515{
4516 for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
4517 p != s->ops.end();
4518 ++p) {
4519 Op *op = p->second;
92f5a8d4 4520 auto age = std::chrono::duration<double>(coarse_mono_clock::now() - op->stamp);
7c673cae
FG
4521 fmt->open_object_section("op");
4522 fmt->dump_unsigned("tid", op->tid);
4523 op->target.dump(fmt);
4524 fmt->dump_stream("last_sent") << op->stamp;
92f5a8d4 4525 fmt->dump_float("age", age.count());
7c673cae
FG
4526 fmt->dump_int("attempts", op->attempts);
4527 fmt->dump_stream("snapid") << op->snapid;
4528 fmt->dump_stream("snap_context") << op->snapc;
4529 fmt->dump_stream("mtime") << op->mtime;
4530
4531 fmt->open_array_section("osd_ops");
4532 for (vector<OSDOp>::const_iterator it = op->ops.begin();
4533 it != op->ops.end();
4534 ++it) {
4535 fmt->dump_stream("osd_op") << *it;
4536 }
4537 fmt->close_section(); // osd_ops array
4538
4539 fmt->close_section(); // op object
4540 }
4541}
4542
4543void Objecter::dump_ops(Formatter *fmt)
4544{
4545 // Read-lock on Objecter held
4546 fmt->open_array_section("ops");
4547 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4548 siter != osd_sessions.end(); ++siter) {
4549 OSDSession *s = siter->second;
4550 OSDSession::shared_lock sl(s->lock);
4551 _dump_ops(s, fmt);
4552 sl.unlock();
4553 }
4554 _dump_ops(homeless_session, fmt);
4555 fmt->close_section(); // ops array
4556}
4557
4558void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4559{
4560 for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
4561 p != s->linger_ops.end();
4562 ++p) {
4563 LingerOp *op = p->second;
4564 fmt->open_object_section("linger_op");
4565 fmt->dump_unsigned("linger_id", op->linger_id);
4566 op->target.dump(fmt);
4567 fmt->dump_stream("snapid") << op->snap;
4568 fmt->dump_stream("registered") << op->registered;
4569 fmt->close_section(); // linger_op object
4570 }
4571}
4572
4573void Objecter::dump_linger_ops(Formatter *fmt)
4574{
4575 // We have a read-lock on the objecter
4576 fmt->open_array_section("linger_ops");
4577 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4578 siter != osd_sessions.end(); ++siter) {
4579 OSDSession *s = siter->second;
4580 OSDSession::shared_lock sl(s->lock);
4581 _dump_linger_ops(s, fmt);
4582 sl.unlock();
4583 }
4584 _dump_linger_ops(homeless_session, fmt);
4585 fmt->close_section(); // linger_ops array
4586}
4587
4588void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4589{
4590 for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
4591 p != s->command_ops.end();
4592 ++p) {
4593 CommandOp *op = p->second;
4594 fmt->open_object_section("command_op");
4595 fmt->dump_unsigned("command_id", op->tid);
4596 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4597 fmt->open_array_section("command");
4598 for (vector<string>::const_iterator q = op->cmd.begin();
4599 q != op->cmd.end(); ++q)
4600 fmt->dump_string("word", *q);
4601 fmt->close_section();
4602 if (op->target_osd >= 0)
4603 fmt->dump_int("target_osd", op->target_osd);
4604 else
4605 fmt->dump_stream("target_pg") << op->target_pg;
4606 fmt->close_section(); // command_op object
4607 }
4608}
4609
4610void Objecter::dump_command_ops(Formatter *fmt)
4611{
4612 // We have a read-lock on the Objecter here
4613 fmt->open_array_section("command_ops");
4614 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4615 siter != osd_sessions.end(); ++siter) {
4616 OSDSession *s = siter->second;
4617 OSDSession::shared_lock sl(s->lock);
4618 _dump_command_ops(s, fmt);
4619 sl.unlock();
4620 }
4621 _dump_command_ops(homeless_session, fmt);
4622 fmt->close_section(); // command_ops array
4623}
4624
4625void Objecter::dump_pool_ops(Formatter *fmt) const
4626{
4627 fmt->open_array_section("pool_ops");
4628 for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
4629 p != pool_ops.end();
4630 ++p) {
4631 PoolOp *op = p->second;
4632 fmt->open_object_section("pool_op");
4633 fmt->dump_unsigned("tid", op->tid);
4634 fmt->dump_int("pool", op->pool);
4635 fmt->dump_string("name", op->name);
4636 fmt->dump_int("operation_type", op->pool_op);
7c673cae
FG
4637 fmt->dump_unsigned("crush_rule", op->crush_rule);
4638 fmt->dump_stream("snapid") << op->snapid;
4639 fmt->dump_stream("last_sent") << op->last_submit;
4640 fmt->close_section(); // pool_op object
4641 }
4642 fmt->close_section(); // pool_ops array
4643}
4644
4645void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4646{
4647 fmt->open_array_section("pool_stat_ops");
4648 for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
4649 p != poolstat_ops.end();
4650 ++p) {
4651 PoolStatOp *op = p->second;
4652 fmt->open_object_section("pool_stat_op");
4653 fmt->dump_unsigned("tid", op->tid);
4654 fmt->dump_stream("last_sent") << op->last_submit;
4655
4656 fmt->open_array_section("pools");
4657 for (list<string>::const_iterator it = op->pools.begin();
4658 it != op->pools.end();
4659 ++it) {
4660 fmt->dump_string("pool", *it);
4661 }
4662 fmt->close_section(); // pools array
4663
4664 fmt->close_section(); // pool_stat_op object
4665 }
4666 fmt->close_section(); // pool_stat_ops array
4667}
4668
4669void Objecter::dump_statfs_ops(Formatter *fmt) const
4670{
4671 fmt->open_array_section("statfs_ops");
4672 for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
4673 p != statfs_ops.end();
4674 ++p) {
4675 StatfsOp *op = p->second;
4676 fmt->open_object_section("statfs_op");
4677 fmt->dump_unsigned("tid", op->tid);
4678 fmt->dump_stream("last_sent") << op->last_submit;
4679 fmt->close_section(); // statfs_op object
4680 }
4681 fmt->close_section(); // statfs_ops array
4682}
4683
4684Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4685 m_objecter(objecter)
4686{
4687}
4688
9f95a23c
TL
4689int Objecter::RequestStateHook::call(std::string_view command,
4690 const cmdmap_t& cmdmap,
4691 Formatter *f,
4692 std::ostream& ss,
4693 ceph::buffer::list& out)
7c673cae 4694{
7c673cae
FG
4695 shared_lock rl(m_objecter->rwlock);
4696 m_objecter->dump_requests(f);
9f95a23c 4697 return 0;
7c673cae
FG
4698}
4699
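// Ask the monitors to add or remove this client's (first) address on the OSD
// blacklist by sending an "osd blacklist" MMonCommand.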
4700void Objecter::blacklist_self(bool set)
4701{
4702 ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
4703
4704 vector<string> cmd;
4705 cmd.push_back("{\"prefix\":\"osd blacklist\", ");
4706 if (set)
4707 cmd.push_back("\"blacklistop\":\"add\",");
4708 else
4709 cmd.push_back("\"blacklistop\":\"rm\",");
4710 stringstream ss;
11fdf7f2
TL
4711 // this is somewhat imprecise in that we are blacklisting our first addr only
4712 ss << messenger->get_myaddrs().front().get_legacy_str();
7c673cae
FG
4713 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4714
4715 MMonCommand *m = new MMonCommand(monc->get_fsid());
4716 m->cmd = cmd;
4717
4718 monc->send_mon_message(m);
4719}
4720
4721// commands
4722
4723void Objecter::handle_command_reply(MCommandReply *m)
4724{
4725 unique_lock wl(rwlock);
31f18b77 4726 if (!initialized) {
7c673cae
FG
4727 m->put();
4728 return;
4729 }
4730
4731 ConnectionRef con = m->get_connection();
11fdf7f2
TL
4732 auto priv = con->get_priv();
4733 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4734 if (!s || s->con != con) {
4735 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
4736 m->put();
7c673cae
FG
4737 return;
4738 }
4739
4740 OSDSession::shared_lock sl(s->lock);
4741 map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
4742 if (p == s->command_ops.end()) {
4743 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4744 << " not found" << dendl;
4745 m->put();
4746 sl.unlock();
7c673cae
FG
4747 return;
4748 }
4749
4750 CommandOp *c = p->second;
4751 if (!c->session ||
4752 m->get_connection() != c->session->con) {
4753 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4754 << " got reply from wrong connection "
4755 << m->get_connection() << " " << m->get_source_inst()
4756 << dendl;
4757 m->put();
4758 sl.unlock();
7c673cae
FG
4759 return;
4760 }
9f95a23c
TL
4761 if (m->r == -EAGAIN) {
4762 ldout(cct,10) << __func__ << " tid " << m->get_tid()
4763 << " got EAGAIN, requesting map and resending" << dendl;
4764 // NOTE: This might resend twice... once now, and once again when
4765 // we get an updated osdmap and the PG is found to have moved.
4766 _maybe_request_map();
4767 _send_command(c);
4768 m->put();
4769 sl.unlock();
4770 return;
4771 }
4772
7c673cae
FG
4773 if (c->poutbl) {
4774 c->poutbl->claim(m->get_data());
4775 }
4776
4777 sl.unlock();
4778
28e407b8 4779 OSDSession::unique_lock sul(s->lock);
7c673cae 4780 _finish_command(c, m->r, m->rs);
28e407b8
AA
4781 sul.unlock();
4782
7c673cae 4783 m->put();
7c673cae
FG
4784}
4785
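// Register an OSD CommandOp: park it on the homeless session, compute its
// target, arm the optional osd_timeout, then either send it or request a newer
// osdmap if no target session is available yet.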
4786void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
4787{
4788 shunique_lock sul(rwlock, ceph::acquire_unique);
4789
31f18b77 4790 ceph_tid_t tid = ++last_tid;
7c673cae
FG
4791 ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
4792 c->tid = tid;
4793
4794 {
4795 OSDSession::unique_lock hs_wl(homeless_session->lock);
4796 _session_command_op_assign(homeless_session, c);
4797 }
4798
4799 _calc_command_target(c, sul);
4800 _assign_command_session(c, sul);
4801 if (osd_timeout > timespan(0)) {
4802 c->ontimeout = timer.add_event(osd_timeout,
4803 [this, c, tid]() {
4804 command_op_cancel(c->session, tid,
4805 -ETIMEDOUT); });
4806 }
4807
4808 if (!c->session->is_homeless()) {
4809 _send_command(c);
4810 } else {
4811 _maybe_request_map();
4812 }
4813 if (c->map_check_error)
4814 _send_command_map_check(c);
4815 *ptid = tid;
4816
4817 logger->inc(l_osdc_command_active);
4818}
4819
4820int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
4821{
11fdf7f2 4822 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4823
4824 c->map_check_error = 0;
4825
4826 // ignore overlays, just like we do with pg ops
4827 c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4828
4829 if (c->target_osd >= 0) {
4830 if (!osdmap->exists(c->target_osd)) {
4831 c->map_check_error = -ENOENT;
4832 c->map_check_error_str = "osd dne";
4833 c->target.osd = -1;
4834 return RECALC_OP_TARGET_OSD_DNE;
4835 }
4836 if (osdmap->is_down(c->target_osd)) {
4837 c->map_check_error = -ENXIO;
4838 c->map_check_error_str = "osd down";
4839 c->target.osd = -1;
4840 return RECALC_OP_TARGET_OSD_DOWN;
4841 }
4842 c->target.osd = c->target_osd;
4843 } else {
4844 int ret = _calc_target(&(c->target), nullptr, true);
4845 if (ret == RECALC_OP_TARGET_POOL_DNE) {
4846 c->map_check_error = -ENOENT;
4847 c->map_check_error_str = "pool dne";
4848 c->target.osd = -1;
4849 return ret;
4850 } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4851 c->map_check_error = -ENXIO;
4852 c->map_check_error_str = "osd down";
4853 c->target.osd = -1;
4854 return ret;
4855 }
4856 }
4857
4858 OSDSession *s;
4859 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4860 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4861
4862 if (c->session != s) {
4863 put_session(s);
4864 return RECALC_OP_TARGET_NEED_RESEND;
4865 }
4866
4867 put_session(s);
4868
4869 ldout(cct, 20) << "_calc_command_target " << c->tid << " no change, "
4870 << c->session << dendl;
4871
4872 return RECALC_OP_TARGET_NO_ACTION;
4873}
4874
4875void Objecter::_assign_command_session(CommandOp *c,
4876 shunique_lock& sul)
4877{
11fdf7f2 4878 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4879
4880 OSDSession *s;
4881 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4882 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4883
4884 if (c->session != s) {
4885 if (c->session) {
4886 OSDSession *cs = c->session;
4887 OSDSession::unique_lock csl(cs->lock);
4888 _session_command_op_remove(c->session, c);
4889 csl.unlock();
4890 }
4891 OSDSession::unique_lock sl(s->lock);
4892 _session_command_op_assign(s, c);
4893 }
4894
4895 put_session(s);
4896}
4897
4898void Objecter::_send_command(CommandOp *c)
4899{
4900 ldout(cct, 10) << "_send_command " << c->tid << dendl;
11fdf7f2
TL
4901 ceph_assert(c->session);
4902 ceph_assert(c->session->con);
7c673cae
FG
4903 MCommand *m = new MCommand(monc->monmap.fsid);
4904 m->cmd = c->cmd;
4905 m->set_data(c->inbl);
4906 m->set_tid(c->tid);
4907 c->session->con->send_message(m);
4908 logger->inc(l_osdc_command_send);
4909}
4910
4911int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
4912{
11fdf7f2 4913 ceph_assert(initialized);
7c673cae
FG
4914
4915 unique_lock wl(rwlock);
4916
4917 map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
4918 if (it == s->command_ops.end()) {
4919 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4920 return -ENOENT;
4921 }
4922
4923 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4924
4925 CommandOp *op = it->second;
4926 _command_cancel_map_check(op);
28e407b8 4927 OSDSession::unique_lock sl(op->session->lock);
7c673cae 4928 _finish_command(op, r, "");
28e407b8 4929 sl.unlock();
7c673cae
FG
4930 return 0;
4931}
4932
4933void Objecter::_finish_command(CommandOp *c, int r, string rs)
4934{
4935 // rwlock is locked unique
28e407b8 4936 // session lock is locked
7c673cae
FG
4937
4938 ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
4939 << rs << dendl;
4940 if (c->prs)
4941 *c->prs = rs;
4942 if (c->onfinish)
4943 c->onfinish->complete(r);
4944
4945 if (c->ontimeout && r != -ETIMEDOUT)
4946 timer.cancel_event(c->ontimeout);
4947
7c673cae 4948 _session_command_op_remove(c->session, c);
7c673cae
FG
4949
4950 c->put();
4951
4952 logger->dec(l_osdc_command_active);
4953}
4954
4955Objecter::OSDSession::~OSDSession()
4956{
4957 // Caller is responsible for re-assigning or
4958 // destroying any ops that were assigned to us
11fdf7f2
TL
4959 ceph_assert(ops.empty());
4960 ceph_assert(linger_ops.empty());
4961 ceph_assert(command_ops.empty());
7c673cae
FG
4962}
4963
9f95a23c
TL
4964Objecter::Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
4965 Finisher *fin,
4966 double mon_timeout,
4967 double osd_timeout) :
4968 Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
4969 trace_endpoint("0.0.0.0", 0, "Objecter"),
4970 osdmap{std::make_unique<OSDMap>()},
4971 homeless_session(new OSDSession(cct, -1)),
4972 mon_timeout(ceph::make_timespan(mon_timeout)),
4973 osd_timeout(ceph::make_timespan(osd_timeout)),
4974 op_throttle_bytes(cct, "objecter_bytes",
4975 cct->_conf->objecter_inflight_op_bytes),
4976 op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
4977 retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
4978{}
4979
7c673cae
FG
4980Objecter::~Objecter()
4981{
11fdf7f2
TL
4982 ceph_assert(homeless_session->get_nref() == 1);
4983 ceph_assert(num_homeless_ops == 0);
7c673cae
FG
4984 homeless_session->put();
4985
11fdf7f2
TL
4986 ceph_assert(osd_sessions.empty());
4987 ceph_assert(poolstat_ops.empty());
4988 ceph_assert(statfs_ops.empty());
4989 ceph_assert(pool_ops.empty());
4990 ceph_assert(waiting_for_map.empty());
4991 ceph_assert(linger_ops.empty());
4992 ceph_assert(check_latest_map_lingers.empty());
4993 ceph_assert(check_latest_map_ops.empty());
4994 ceph_assert(check_latest_map_commands.empty());
7c673cae 4995
11fdf7f2
TL
4996 ceph_assert(!m_request_state_hook);
4997 ceph_assert(!logger);
7c673cae
FG
4998}
4999
5000/**
5001 * Wait until this OSD map epoch is received before
5002 * sending any more operations to OSDs. Use this
5003 * when it is known that the client can't trust
5004 * anything from before this epoch (e.g. due to
5005 * client blacklist at this epoch).
5006 */
5007void Objecter::set_epoch_barrier(epoch_t epoch)
5008{
5009 unique_lock wl(rwlock);
5010
5011 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
5012 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
5013 << dendl;
5014 if (epoch > epoch_barrier) {
5015 epoch_barrier = epoch;
5016 _maybe_request_map();
5017 }
5018}
5019
5020
5021
5022hobject_t Objecter::enumerate_objects_begin()
5023{
5024 return hobject_t();
5025}
5026
5027hobject_t Objecter::enumerate_objects_end()
5028{
5029 return hobject_t::get_max();
5030}
5031
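// Per-call state for enumerate_objects(); finish() hands the raw pg_nls reply
// to _enumerate_reply() for decoding, trimming at 'end', and delivery.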
5032struct C_EnumerateReply : public Context {
9f95a23c 5033 ceph::buffer::list bl;
7c673cae
FG
5034
5035 Objecter *objecter;
5036 hobject_t *next;
5037 std::list<librados::ListObjectImpl> *result;
5038 const hobject_t end;
5039 const int64_t pool_id;
5040 Context *on_finish;
5041
5042 epoch_t epoch;
5043 int budget;
5044
5045 C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
5046 std::list<librados::ListObjectImpl> *result_,
5047 const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
5048 objecter(objecter_), next(next_), result(result_),
5049 end(end_), pool_id(pool_id_), on_finish(on_finish_),
11fdf7f2 5050 epoch(0), budget(-1)
7c673cae
FG
5051 {}
5052
5053 void finish(int r) override {
5054 objecter->_enumerate_reply(
5055 bl, r, end, pool_id, budget, epoch, result, next, on_finish);
5056 }
5057};
5058
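// enumerate_objects() lists up to 'max' objects of a pool within [start, end).
// Hypothetical call sketch (illustration only; variable names are assumed):
//
//   hobject_t next;
//   std::list<librados::ListObjectImpl> items;
//   objecter->enumerate_objects(pool_id, "" /* namespace */,
//                               objecter->enumerate_objects_begin(),
//                               objecter->enumerate_objects_end(),
//                               100, {} /* no filter */, &items, &next,
//                               on_finish);
//   // re-issue with start = next until next == enumerate_objects_end()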
5059void Objecter::enumerate_objects(
5060 int64_t pool_id,
5061 const std::string &ns,
5062 const hobject_t &start,
5063 const hobject_t &end,
5064 const uint32_t max,
9f95a23c 5065 const ceph::buffer::list &filter_bl,
7c673cae
FG
5066 std::list<librados::ListObjectImpl> *result,
5067 hobject_t *next,
5068 Context *on_finish)
5069{
11fdf7f2 5070 ceph_assert(result);
7c673cae
FG
5071
5072 if (!end.is_max() && start > end) {
5073 lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
5074 on_finish->complete(-EINVAL);
5075 return;
5076 }
5077
5078 if (max < 1) {
5079 lderr(cct) << __func__ << ": result size may not be zero" << dendl;
5080 on_finish->complete(-EINVAL);
5081 return;
5082 }
5083
5084 if (start.is_max()) {
5085 on_finish->complete(0);
5086 return;
5087 }
5088
5089 shared_lock rl(rwlock);
11fdf7f2 5090 ceph_assert(osdmap->get_epoch());
7c673cae
FG
5091 if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
5092 rl.unlock();
5093 lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
5094 on_finish->complete(-EOPNOTSUPP);
5095 return;
5096 }
5097 const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
5098 if (!p) {
5099 lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
5100 << osdmap->get_epoch() << dendl;
5101 rl.unlock();
5102 on_finish->complete(-ENOENT);
5103 return;
5104 } else {
5105 rl.unlock();
5106 }
5107
5108 ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;
5109
5110 // Stash completion state
5111 C_EnumerateReply *on_ack = new C_EnumerateReply(
5112 this, next, result, end, pool_id, on_finish);
5113
5114 ObjectOperation op;
5115 op.pg_nls(max, filter_bl, start, 0);
5116
5117 // Issue. See you later in _enumerate_reply
5118 object_locator_t oloc(pool_id, ns);
5119 pg_read(start.get_hash(), oloc, op,
5120 &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
5121}
5122
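// Decode one pg_nls reply for enumerate_objects(): return the op budget,
// advance *next (clamped to 'end'), drop entries that hash beyond 'end', merge
// the rest into *result, and complete on_finish.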
5123void Objecter::_enumerate_reply(
9f95a23c 5124 ceph::buffer::list &bl,
7c673cae
FG
5125 int r,
5126 const hobject_t &end,
5127 const int64_t pool_id,
5128 int budget,
5129 epoch_t reply_epoch,
5130 std::list<librados::ListObjectImpl> *result,
5131 hobject_t *next,
5132 Context *on_finish)
5133{
11fdf7f2 5134 if (budget >= 0) {
7c673cae
FG
5135 put_op_budget_bytes(budget);
5136 }
5137
5138 if (r < 0) {
5139 ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
5140 on_finish->complete(r);
5141 return;
5142 }
5143
11fdf7f2 5144 ceph_assert(next != NULL);
7c673cae
FG
5145
5146 // Decode the results
11fdf7f2 5147 auto iter = bl.cbegin();
7c673cae
FG
5148 pg_nls_response_t response;
5149
11fdf7f2 5150 decode(response, iter);
7c673cae 5151 if (!iter.end()) {
9f95a23c
TL
5152 // extra_info isn't used anywhere. We do this solely to preserve
5153 // backward compatibility
5154 ceph::buffer::list legacy_extra_info;
5155 decode(legacy_extra_info, iter);
7c673cae
FG
5156 }
5157
5158 ldout(cct, 10) << __func__ << ": got " << response.entries.size()
5159 << " handle " << response.handle
5160 << " reply_epoch " << reply_epoch << dendl;
5161 ldout(cct, 20) << __func__ << ": response.entries.size "
5162 << response.entries.size() << ", response.entries "
5163 << response.entries << dendl;
5164 if (response.handle <= end) {
5165 *next = response.handle;
5166 } else {
5167 ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
5168 << dendl;
5169 *next = end;
5170
5171 // drop anything after 'end'
5172 shared_lock rl(rwlock);
5173 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
5174 if (!pool) {
5175 // pool is gone, drop any results which are now meaningless.
5176 rl.unlock();
5177 on_finish->complete(-ENOENT);
5178 return;
5179 }
5180 while (!response.entries.empty()) {
5181 uint32_t hash = response.entries.back().locator.empty() ?
5182 pool->hash_key(response.entries.back().oid,
5183 response.entries.back().nspace) :
5184 pool->hash_key(response.entries.back().locator,
5185 response.entries.back().nspace);
5186 hobject_t last(response.entries.back().oid,
5187 response.entries.back().locator,
5188 CEPH_NOSNAP,
5189 hash,
5190 pool_id,
5191 response.entries.back().nspace);
5192 if (last < end)
5193 break;
5194 ldout(cct, 20) << __func__ << " dropping item " << last
5195 << " >= end " << end << dendl;
5196 response.entries.pop_back();
5197 }
5198 rl.unlock();
5199 }
5200 if (!response.entries.empty()) {
5201 result->merge(response.entries);
5202 }
5203
5204 // release the listing context's budget once all
5205 // OPs (in the session) are finished
5206#if 0
5207 put_nlist_context_budget(list_context);
5208#endif
5209 on_finish->complete(r);
5210 return;
5211}
5212
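// scrub_ls() helpers: encode a CEPH_OSD_OP_SCRUBLS pg op and decode its
// scrub_ls_result_t payload into inconsistent_obj_t or inconsistent_snapset_t
// vectors for the caller.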
5213namespace {
5214 using namespace librados;
5215
5216 template <typename T>
9f95a23c 5217 void do_decode(std::vector<T>& items, std::vector<ceph::buffer::list>& bls)
7c673cae
FG
5218 {
5219 for (auto bl : bls) {
11fdf7f2 5220 auto p = bl.cbegin();
7c673cae
FG
5221 T t;
5222 decode(t, p);
5223 items.push_back(t);
5224 }
5225 }
5226
5227 struct C_ObjectOperation_scrub_ls : public Context {
9f95a23c 5228 ceph::buffer::list bl;
7c673cae
FG
5229 uint32_t *interval;
5230 std::vector<inconsistent_obj_t> *objects = nullptr;
5231 std::vector<inconsistent_snapset_t> *snapsets = nullptr;
5232 int *rval;
5233
5234 C_ObjectOperation_scrub_ls(uint32_t *interval,
5235 std::vector<inconsistent_obj_t> *objects,
5236 int *rval)
5237 : interval(interval), objects(objects), rval(rval) {}
5238 C_ObjectOperation_scrub_ls(uint32_t *interval,
5239 std::vector<inconsistent_snapset_t> *snapsets,
5240 int *rval)
5241 : interval(interval), snapsets(snapsets), rval(rval) {}
5242 void finish(int r) override {
5243 if (r < 0 && r != -EAGAIN) {
5244 if (rval)
5245 *rval = r;
5246 return;
5247 }
5248
5249 if (rval)
5250 *rval = 0;
5251
5252 try {
5253 decode();
9f95a23c 5254 } catch (ceph::buffer::error&) {
7c673cae
FG
5255 if (rval)
5256 *rval = -EIO;
5257 }
5258 }
5259 private:
5260 void decode() {
5261 scrub_ls_result_t result;
11fdf7f2 5262 auto p = bl.cbegin();
7c673cae
FG
5263 result.decode(p);
5264 *interval = result.interval;
5265 if (objects) {
5266 do_decode(*objects, result.vals);
5267 } else {
5268 do_decode(*snapsets, result.vals);
5269 }
5270 }
5271 };
5272
5273 template <typename T>
5274 void do_scrub_ls(::ObjectOperation *op,
5275 const scrub_ls_arg_t& arg,
5276 std::vector<T> *items,
5277 uint32_t *interval,
5278 int *rval)
5279 {
5280 OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
5281 op->flags |= CEPH_OSD_FLAG_PGOP;
11fdf7f2 5282 ceph_assert(interval);
7c673cae
FG
5283 arg.encode(osd_op.indata);
5284 unsigned p = op->ops.size() - 1;
5285 auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
5286 op->out_handler[p] = h;
5287 op->out_bl[p] = &h->bl;
5288 op->out_rval[p] = rval;
5289 }
5290}
5291
5292void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5293 uint64_t max_to_get,
5294 std::vector<librados::inconsistent_obj_t> *objects,
5295 uint32_t *interval,
5296 int *rval)
5297{
5298 scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
5299 do_scrub_ls(this, arg, objects, interval, rval);
5300}
5301
5302void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5303 uint64_t max_to_get,
5304 std::vector<librados::inconsistent_snapset_t> *snapsets,
5305 uint32_t *interval,
5306 int *rval)
5307{
5308 scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
5309 do_scrub_ls(this, arg, snapsets, interval, rval);
5310}