// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <cerrno>

#include "Objecter.h"
#include "osd/OSDMap.h"
#include "Filer.h"

#include "mon/MonClient.h"

#include "msg/Messenger.h"
#include "msg/Message.h"

#include "messages/MPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDMap.h"

#include "messages/MPoolOp.h"
#include "messages/MPoolOpReply.h"

#include "messages/MGetPoolStats.h"
#include "messages/MGetPoolStatsReply.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"

#include "messages/MMonCommand.h"

#include "messages/MCommand.h"
#include "messages/MCommandReply.h"

#include "messages/MWatchNotify.h"


#include "common/config.h"
#include "common/perf_counters.h"
#include "common/scrub_types.h"
#include "include/str_list.h"
#include "common/errno.h"
#include "common/EventTrace.h"

using std::list;
using std::make_pair;
using std::map;
using std::ostream;
using std::ostringstream;
using std::pair;
using std::set;
using std::string;
using std::stringstream;
using std::vector;

using ceph::decode;
using ceph::encode;
using ceph::Formatter;

using std::defer_lock;

using ceph::real_time;
using ceph::real_clock;

using ceph::mono_clock;
using ceph::mono_time;

using ceph::timespan;

using ceph::shunique_lock;
using ceph::acquire_shared;
using ceph::acquire_unique;

#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
#define dout_prefix *_dout << messenger->get_myname() << ".objecter "

enum {
  l_osdc_first = 123200,
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,

  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};


// config obs ----------------------------

class Objecter::RequestStateHook : public AdminSocketHook {
  Objecter *m_objecter;
public:
  explicit RequestStateHook(Objecter *objecter);
  int call(std::string_view command, const cmdmap_t& cmdmap,
	   Formatter *f,
	   std::ostream& ss,
	   ceph::buffer::list& out) override;
};

/**
 * This is a more limited form of C_Contexts, but that requires
 * a ceph_context which we don't have here.
 */
class ObjectOperation::C_TwoContexts : public Context {
  Context *first;
  Context *second;
public:
  C_TwoContexts(Context *first, Context *second) :
    first(first), second(second) {}
  void finish(int r) override {
    first->complete(r);
    second->complete(r);
    first = NULL;
    second = NULL;
  }

  ~C_TwoContexts() override {
    delete first;
    delete second;
  }
};

void ObjectOperation::add_handler(Context *extra) {
  size_t last = out_handler.size() - 1;
  Context *orig = out_handler[last];
  if (orig) {
    Context *wrapper = new C_TwoContexts(orig, extra);
    out_handler[last] = wrapper;
  } else {
    out_handler[last] = extra;
  }
}
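
// Illustrative note (not part of the original source): add_handler()
// composes completion callbacks for the most recently appended op. A
// hypothetical sketch of the effect (MyExtraCtx is an assumed Context
// subclass, not something defined in this file):
//
//   ObjectOperation op;
//   op.stat(&size, &mtime, &rval);    // fills an out_handler slot
//   op.add_handler(new MyExtraCtx);   // wraps that slot in C_TwoContexts,
//                                     // so both contexts complete with
//                                     // the same return code, in order.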

Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
  object_t& oid)
{
  if (oid.name.empty())
    return unique_completion_lock();

  static constexpr uint32_t HASH_PRIME = 1021;
  uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
    % HASH_PRIME;

  return unique_completion_lock(completion_locks[h % num_locks],
				std::defer_lock);
}
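
// Illustrative note (not part of the original source): completions are
// serialized per object by hashing the object name onto a small fixed
// pool of mutexes ("lock striping"). A minimal standalone sketch of the
// same idea:
//
//   std::array<std::mutex, N> locks;
//   std::mutex& pick(const std::string& name) {
//     return locks[std::hash<std::string>{}(name) % N];
//   }
//
// Two ops on the same object always contend on the same stripe; ops on
// different objects usually do not.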

const char** Objecter::get_tracked_conf_keys() const
{
  static const char *config_keys[] = {
    "crush_location",
    "rados_mon_op_timeout",
    "rados_osd_op_timeout",
    NULL
  };
  return config_keys;
}


void Objecter::handle_conf_change(const ConfigProxy& conf,
				  const std::set <std::string> &changed)
{
  if (changed.count("crush_location")) {
    update_crush_location();
  }
  if (changed.count("rados_mon_op_timeout")) {
    mon_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
  }
  if (changed.count("rados_osd_op_timeout")) {
    osd_timeout = conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
  }
}

void Objecter::update_crush_location()
{
  unique_lock wl(rwlock);
  crush_location = cct->crush_location.get_location();
}
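
// Illustrative note (not part of the original source): because the
// Objecter registers itself as a config observer (see init() below),
// the three keys above take effect without a restart, e.g.:
//
//   ceph config set client rados_osd_op_timeout 30
//
// which flows through handle_conf_change() into osd_timeout.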

// messages ------------------------------

/*
 * initialize only internal data structures, don't initiate cluster interaction
 */
void Objecter::init()
{
  ceph_assert(!initialized);

  if (!logger) {
    PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);

    pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
		PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
    pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");

    pcb.add_u64_counter(l_osdc_op, "op", "Operations");
    pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
			"rdwr", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");

    pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
    pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
			"Create object operations");
    pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
    pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
    pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
			"Write full object operations");
    pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
			"Write same operations");
    pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
			"Append operation");
    pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
			"Set object to zero operations");
    pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
			"Truncate object operations");
    pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
			"Delete object operations");
    pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
			"Map extent operations");
    pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
			"Sparse read operations");
    pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
			"Clone range operations");
    pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
			"Get xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
			"Set xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
			"Xattr comparison operations");
    pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
			"Remove xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
			"Reset xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
			"Call (execute) operations");
    pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
			"Watch by object operations");
    pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
			"Notify about object operations");
    pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
			"Extended attribute comparison in multi operations");
    pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
    pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
    pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");

    pcb.add_u64(l_osdc_linger_active, "linger_active",
		"Active lingering operations");
    pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
			"Sent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
			"Resent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
			"Sent pings to lingering operations");

    pcb.add_u64(l_osdc_poolop_active, "poolop_active",
		"Active pool operations");
    pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
			"Sent pool operations");
    pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
			"Resent pool operations");

    pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
		"Active get pool stat operations");
    pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
			"Pool stat operations sent");
    pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
			"Resent pool stats");

    pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
    pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
    pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
			"Resent FS stats");

    pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
    pcb.add_u64_counter(l_osdc_command_send, "command_send",
			"Sent commands");
    pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
			"Resent commands");

    pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
    pcb.add_u64_counter(l_osdc_map_full, "map_full",
			"Full OSD maps received");
    pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
			"Incremental OSD maps received");

    pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
		"Open sessions");
    pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
			"Sessions opened");
    pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
			"Sessions closed");
    pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");

    pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
			"OSD OMAP write operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
			"OSD OMAP read operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
			"OSD OMAP delete operations");

    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  m_request_state_hook = new RequestStateHook(this);
  AdminSocket* admin_socket = cct->get_admin_socket();
  int ret = admin_socket->register_command("objecter_requests",
					   m_request_state_hook,
					   "show in-progress osd requests");

  /* Don't warn on EEXIST, happens if multiple ceph clients
   * are instantiated from one process */
  if (ret < 0 && ret != -EEXIST) {
    lderr(cct) << "error registering admin socket command: "
	       << cpp_strerror(ret) << dendl;
  }

  update_crush_location();

  cct->_conf.add_observer(this);

  initialized = true;
}
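
// Illustrative note (not part of the original source): the hook
// registered above can be queried at runtime through the client's
// admin socket, e.g.:
//
//   ceph daemon /var/run/ceph/ceph-client.admin.asok objecter_requests
//
// which dumps in-progress ops, linger ops, pool ops, etc. as JSON.
// (The exact socket path depends on the admin_socket config option.)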

/*
 * ok, cluster interaction can happen
 */
void Objecter::start(const OSDMap* o)
{
  shared_lock rl(rwlock);

  start_tick();
  if (o) {
    osdmap->deepish_copy_from(*o);
    prune_pg_mapping(osdmap->get_pools());
  } else if (osdmap->get_epoch() == 0) {
    _maybe_request_map();
  }
}

void Objecter::shutdown()
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  initialized = false;

  wl.unlock();
  cct->_conf.remove_observer(this);
  wl.lock();

  map<int,OSDSession*>::iterator p;
  while (!osd_sessions.empty()) {
    p = osd_sessions.begin();
    close_session(p->second);
  }

  while(!check_latest_map_lingers.empty()) {
    map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while(!check_latest_map_ops.empty()) {
    map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while(!check_latest_map_commands.empty()) {
    map<ceph_tid_t, CommandOp*>::iterator i
      = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  while(!poolstat_ops.empty()) {
    map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while(!statfs_ops.empty()) {
    map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while(!pool_ops.empty()) {
    map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
  while(!homeless_session->linger_ops.empty()) {
    std::map<uint64_t, LingerOp*>::iterator i
      = homeless_session->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    LingerOp *lop = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_linger_op_remove(homeless_session, lop);
    }
    linger_ops.erase(lop->linger_id);
    linger_ops_set.erase(lop);
    lop->put();
  }

  while(!homeless_session->ops.empty()) {
    std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    Op *op = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_op_remove(homeless_session, op);
    }
    op->put();
  }

  while(!homeless_session->command_ops.empty()) {
    std::map<ceph_tid_t, CommandOp*>::iterator i
      = homeless_session->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    CommandOp *cop = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_command_op_remove(homeless_session, cop);
    }
    cop->put();
  }

  if (tick_event) {
    if (timer.cancel_event(tick_event)) {
      ldout(cct, 10) << " successfully canceled tick" << dendl;
    }
    tick_event = 0;
  }

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }

  // Let go of Objecter write lock so timer thread can shutdown
  wl.unlock();

  // Outside of lock to avoid cycle WRT calls to RequestStateHook
  // This is safe because we guarantee no concurrent calls to
  // shutdown() with the ::initialized check at start.
  if (m_request_state_hook) {
    AdminSocket* admin_socket = cct->get_admin_socket();
    admin_socket->unregister_commands(m_request_state_hook);
    delete m_request_state_hook;
    m_request_state_hook = NULL;
  }
}

void Objecter::_send_linger(LingerOp *info,
			    shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  vector<OSDOp> opv;
  Context *oncommit = NULL;
  LingerOp::shared_lock watchl(info->watch_lock);
  ceph::buffer::list *poutbl = NULL;
  if (info->registered && info->is_watch) {
    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
		   << dendl;
    opv.push_back(OSDOp());
    opv.back().op.op = CEPH_OSD_OP_WATCH;
    opv.back().op.watch.cookie = info->get_cookie();
    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
    opv.back().op.watch.gen = ++info->register_gen;
    oncommit = new C_Linger_Reconnect(this, info);
  } else {
    ldout(cct, 15) << "send_linger " << info->linger_id << " register"
		   << dendl;
    opv = info->ops;
    C_Linger_Commit *c = new C_Linger_Commit(this, info);
    if (!info->is_watch) {
      info->notify_id = 0;
      poutbl = &c->outbl;
    }
    oncommit = c;
  }
  watchl.unlock();
  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
		 opv, info->target.flags | CEPH_OSD_FLAG_READ,
		 oncommit, info->pobjver);
  o->outbl = poutbl;
  o->snapid = info->snap;
  o->snapc = info->snapc;
  o->mtime = info->mtime;

  o->target = info->target;
  o->tid = ++last_tid;

  // do not resend this; we will send a new op to reregister
  o->should_resend = false;
  o->ctx_budgeted = true;

  if (info->register_tid) {
    // repeat send. cancel old registration op, if any.
    OSDSession::unique_lock sl(info->session->lock);
    if (info->session->ops.count(info->register_tid)) {
      Op *o = info->session->ops[info->register_tid];
      _op_cancel_map_check(o);
      _cancel_linger_op(o);
    }
    sl.unlock();
  }

  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);

  logger->inc(l_osdc_linger_send);
}

void Objecter::_linger_commit(LingerOp *info, int r, ceph::buffer::list& outbl)
{
  LingerOp::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
  if (info->on_reg_commit) {
    info->on_reg_commit->complete(r);
    info->on_reg_commit = NULL;
  }
  if (r < 0 && info->on_notify_finish) {
    info->on_notify_finish->complete(r);
    info->on_notify_finish = nullptr;
  }

  // only tell the user the first time we do this
  info->registered = true;
  info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    auto p = outbl.cbegin();
    try {
      decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
		     << dendl;
    }
    catch (ceph::buffer::error& e) {
    }
  }
}

struct C_DoWatchError : public Context {
  Objecter *objecter;
  Objecter::LingerOp *info;
  int err;
  C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
    : objecter(o), info(i), err(r) {
    info->get();
    info->_queued_async();
  }
  void finish(int r) override {
    Objecter::unique_lock wl(objecter->rwlock);
    bool canceled = info->canceled;
    wl.unlock();

    if (!canceled) {
      info->watch_context->handle_error(info->get_cookie(), err);
    }

    info->finished_async();
    info->put();
  }
};

int Objecter::_normalize_watch_error(int r)
{
  // translate ENOENT -> ENOTCONN so that a delete->disconnection
  // notification and a failure to reconnect because we raced with
  // the delete appear the same to the user.
  if (r == -ENOENT)
    r = -ENOTCONN;
  return r;
}

void Objecter::_linger_reconnect(LingerOp *info, int r)
{
  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
		 << " (last_error " << info->last_error << ")" << dendl;
  if (r < 0) {
    LingerOp::unique_lock wl(info->watch_lock);
    if (!info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
	finisher->queue(new C_DoWatchError(this, info, r));
      }
    }
    wl.unlock();
  }
}

void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  if (cct->_conf->objecter_inject_no_watch_ping) {
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
		   << dendl;
    return;
  }
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
		 << dendl;

  vector<OSDOp> opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen;
  C_Linger_Ping *onack = new C_Linger_Ping(this, info);
  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
		 opv, info->target.flags | CEPH_OSD_FLAG_READ,
		 onack, NULL, NULL);
  o->target = info->target;
  o->should_resend = false;
  _send_op_account(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o);
  info->ping_tid = o->tid;

  onack->sent = now;
  logger->inc(l_osdc_linger_ping);
}

void Objecter::_linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
			    uint32_t register_gen)
{
  LingerOp::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " sent " << sent << " gen " << register_gen << " = " << r
		 << " (last_error " << info->last_error
		 << " register_gen " << info->register_gen << ")" << dendl;
  if (info->register_gen == register_gen) {
    if (r == 0) {
      info->watch_valid_thru = sent;
    } else if (r < 0 && !info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
	finisher->queue(new C_DoWatchError(this, info, r));
      }
    }
  } else {
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}

int Objecter::linger_check(LingerOp *info)
{
  LingerOp::shared_lock l(info->watch_lock);

  ceph::coarse_mono_time stamp = info->watch_valid_thru;
  if (!info->watch_pending_async.empty())
    stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
  auto age = ceph::coarse_mono_clock::now() - stamp;

  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " err " << info->last_error
		 << " age " << age << dendl;
  if (info->last_error)
    return info->last_error;
  // return a safe upper bound (we are truncating to ms)
  return
    1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
}
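
// Illustrative note (not part of the original source): linger_check()
// returns a negative errno once the watch has failed, otherwise an
// upper bound (in ms) on how long ago the watch was last confirmed
// valid. A hypothetical caller pattern, in the spirit of librados'
// watch_check():
//
//   int r = objecter->linger_check(info);
//   if (r < 0)
//     reestablish_watch();          // e.g. r == -ENOTCONN
//   else
//     assume_valid_within_ms(r);    // r >= 1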

void Objecter::linger_cancel(LingerOp *info)
{
  unique_lock wl(rwlock);
  _linger_cancel(info);
  info->put();
}

void Objecter::_linger_cancel(LingerOp *info)
{
  // rwlock is locked unique
  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
  if (!info->canceled) {
    OSDSession *s = info->session;
    OSDSession::unique_lock sl(s->lock);
    _session_linger_op_remove(s, info);
    sl.unlock();

    linger_ops.erase(info->linger_id);
    linger_ops_set.erase(info);
    ceph_assert(linger_ops.size() == linger_ops_set.size());

    info->canceled = true;
    info->put();

    logger->dec(l_osdc_linger_active);
  }
}



Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
					      const object_locator_t& oloc,
					      int flags)
{
  LingerOp *info = new LingerOp(this);
  info->target.base_oid = oid;
  info->target.base_oloc = oloc;
  if (info->target.base_oloc.key == oid)
    info->target.base_oloc.key.clear();
  info->target.flags = flags;
  info->watch_valid_thru = ceph::coarse_mono_clock::now();

  unique_lock l(rwlock);

  // Acquire linger ID
  info->linger_id = ++max_linger_id;
  ldout(cct, 10) << __func__ << " info " << info
		 << " linger_id " << info->linger_id
		 << " cookie " << info->get_cookie()
		 << dendl;
  linger_ops[info->linger_id] = info;
  linger_ops_set.insert(info);
  ceph_assert(linger_ops.size() == linger_ops_set.size());

  info->get(); // for the caller
  return info;
}

ceph_tid_t Objecter::linger_watch(LingerOp *info,
				  ObjectOperation& op,
				  const SnapContext& snapc,
				  real_time mtime,
				  ceph::buffer::list& inbl,
				  Context *oncommit,
				  version_t *objver)
{
  info->is_watch = true;
  info->snapc = snapc;
  info->mtime = mtime;
  info->target.flags |= CEPH_OSD_FLAG_WRITE;
  info->ops = op.ops;
  info->inbl = inbl;
  info->poutbl = NULL;
  info->pobjver = objver;
  info->on_reg_commit = oncommit;

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  return info->linger_id;
}

ceph_tid_t Objecter::linger_notify(LingerOp *info,
				   ObjectOperation& op,
				   snapid_t snap, ceph::buffer::list& inbl,
				   ceph::buffer::list *poutbl,
				   Context *onfinish,
				   version_t *objver)
{
  info->snap = snap;
  info->target.flags |= CEPH_OSD_FLAG_READ;
  info->ops = op.ops;
  info->inbl = inbl;
  info->poutbl = poutbl;
  info->pobjver = objver;
  info->on_reg_commit = onfinish;

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  return info->linger_id;
}

void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
  ceph_assert(info->linger_id);
  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!

  // Populate Op::target
  OSDSession *s = NULL;
  _calc_target(&info->target, nullptr);

  // Create LingerOp<->OSDSession relation
  int r = _get_session(info->target.osd, &s, sul);
  ceph_assert(r == 0);
  OSDSession::unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  put_session(s);

  _send_linger(info, sul);
}
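
// Illustrative note (not part of the original source): a typical watch
// registration, roughly as driven by librados (a sketch; exact argument
// lists vary and error handling is omitted):
//
//   ObjectOperation wr;
//   Objecter::LingerOp *info = objecter->linger_register(oid, oloc, 0);
//   wr.watch(info->get_cookie(), CEPH_OSD_WATCH_OP_WATCH);
//   objecter->linger_watch(info, wr, snapc, ceph::real_clock::now(),
//                          inbl, oncommit, nullptr);
//
// _linger_submit() then maps the target, binds the LingerOp to an
// OSDSession, and _send_linger() issues the actual CEPH_OSD_OP_WATCH.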

struct C_DoWatchNotify : public Context {
  Objecter *objecter;
  Objecter::LingerOp *info;
  MWatchNotify *msg;
  C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->get();
    info->_queued_async();
    msg->get();
  }
  void finish(int r) override {
    objecter->_do_watch_notify(info, msg);
  }
};

void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  LingerOp::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    if (!info->last_error) {
      info->last_error = -ENOTCONN;
      if (info->watch_context) {
	finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
	info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
		     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->notify_result_bl->claim(m->get_data());
      info->on_notify_finish->complete(m->return_code);

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
      info->on_notify_finish = NULL;
    }
  } else {
    finisher->queue(new C_DoWatchNotify(this, info, m));
  }
}

void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;

  shared_lock l(rwlock);
  ceph_assert(initialized);

  if (info->canceled) {
    l.unlock();
    goto out;
  }

  // notify completion?
  ceph_assert(info->is_watch);
  ceph_assert(info->watch_context);
  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);

  l.unlock();

  switch (m->opcode) {
  case CEPH_WATCH_EVENT_NOTIFY:
    info->watch_context->handle_notify(m->notify_id, m->cookie,
				       m->notifier_gid, m->bl);
    break;
  }

 out:
  info->finished_async();
  info->put();
  m->put();
}

bool Objecter::ms_dispatch(Message *m)
{
  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
  switch (m->get_type()) {
    // these we exclusively handle
  case CEPH_MSG_OSD_OPREPLY:
    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
    return true;

  case CEPH_MSG_OSD_BACKOFF:
    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
    return true;

  case CEPH_MSG_WATCH_NOTIFY:
    handle_watch_notify(static_cast<MWatchNotify*>(m));
    m->put();
    return true;

  case MSG_COMMAND_REPLY:
    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    } else {
      return false;
    }

  case MSG_GETPOOLSTATSREPLY:
    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
    return true;

  case CEPH_MSG_POOLOP_REPLY:
    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
    return true;

  case CEPH_MSG_STATFS_REPLY:
    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
    return true;

    // these we give others a chance to inspect

    // MDS, OSD
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    return false;
  }
  return false;
}
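
// Illustrative note (not part of the original source): the return value
// follows the Dispatcher contract -- true means "consumed", false lets
// later dispatchers sharing the Messenger (e.g. an MDS or OSD) see the
// message too, which is why CEPH_MSG_OSD_MAP falls through. The Objecter
// is typically wired up by its owner roughly as:
//
//   messenger->add_dispatcher_tail(objecter);  // sketch; placement varies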

void Objecter::_scan_requests(
  OSDSession *s,
  bool skipped_map,
  bool cluster_full,
  map<int64_t, bool> *pool_full_map,
  map<ceph_tid_t, Op*>& need_resend,
  list<LingerOp*>& need_resend_linger,
  map<ceph_tid_t, CommandOp*>& need_resend_command,
  shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  list<LingerOp*> unregister_lingers;

  OSDSession::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    LingerOp *op = lp->second;
    ceph_assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
	ldout(cct, 10) << " need to unregister linger op "
		       << op->linger_id << dendl;
	op->get();
	unregister_lingers.push_back(op);
      }
      break;
    }
  }

  // check for changed request mappings
  map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    _prune_snapc(osdmap->get_new_removed_snaps(), op);
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
			 op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !(force_resend_writes && op->target.respects_full()))
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      _session_op_remove(op->session, op);
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    }
  }

  // commands
  map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    CommandOp *c = cp->second;
    ++cp;
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      _session_command_op_remove(c->session, c);
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  sl.unlock();

  for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put();
  }
}

void Objecter::handle_osd_map(MOSDMap *m)
{
  shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  ceph_assert(osdmap);

  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
		  << " != " << monc->get_fsid() << dendl;
    return;
  }

  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (map<int64_t, pg_pool_t>::const_iterator it
	 = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);


  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
	   e <= m->get_last();
	   e++) {

	if (osdmap->get_epoch() == e-1 &&
	    m->incremental_maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
			<< dendl;
	  OSDMap::Incremental inc(m->incremental_maps[e]);
	  osdmap->apply_incremental(inc);

	  emit_blacklist_events(inc);

	  logger->inc(l_osdc_map_inc);
	}
	else if (m->maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
	  auto new_osdmap = std::make_unique<OSDMap>();
	  new_osdmap->decode(m->maps[e]);

	  emit_blacklist_events(*osdmap, *new_osdmap);
	  osdmap = std::move(new_osdmap);

	  logger->inc(l_osdc_map_full);
	}
	else {
	  if (e >= m->get_oldest()) {
	    ldout(cct, 3) << "handle_osd_map requesting missing epoch "
			  << osdmap->get_epoch()+1 << dendl;
	    _maybe_request_map();
	    break;
	  }
	  ldout(cct, 3) << "handle_osd_map missing epoch "
			<< osdmap->get_epoch()+1
			<< ", jumping to " << m->get_oldest() << dendl;
	  e = m->get_oldest() - 1;
	  skipped_map = true;
	  continue;
	}
	logger->set(l_osdc_map_epoch, osdmap->get_epoch());

	prune_pg_mapping(osdmap->get_pools());
	cluster_full = cluster_full || _osdmap_full_flag();
	update_pool_full_map(pool_full_map);

	// check all outstanding requests on every epoch
	for (auto& i : need_resend) {
	  _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
	}
	_scan_requests(homeless_session, skipped_map, cluster_full,
		       &pool_full_map, need_resend,
		       need_resend_linger, need_resend_command, sul);
	for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
	     p != osd_sessions.end(); ) {
	  OSDSession *s = p->second;
	  _scan_requests(s, skipped_map, cluster_full,
			 &pool_full_map, need_resend,
			 need_resend_linger, need_resend_command, sul);
	  ++p;
	  // osd down or addr change?
	  if (!osdmap->is_up(s->osd) ||
	      (s->con &&
	       s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
	    close_session(s);
	  }
	}

	ceph_assert(e == osdmap->get_epoch());
      }

    } else {
      // first map. we want the full thing.
      if (m->maps.count(m->get_last())) {
	for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
	     p != osd_sessions.end(); ++p) {
	  OSDSession *s = p->second;
	  _scan_requests(s, false, false, NULL, need_resend,
			 need_resend_linger, need_resend_command, sul);
	}
	ldout(cct, 3) << "handle_osd_map decoding full epoch "
		      << m->get_last() << dendl;
	osdmap->decode(m->maps[m->get_last()]);
	prune_pg_mapping(osdmap->get_pools());

	_scan_requests(homeless_session, false, false, NULL,
		       need_resend, need_resend_linger,
		       need_resend_command, sul);
      } else {
	ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
		      << dendl;
	monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
	monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
	p = need_resend.erase(p);
	_check_op_pool_dne(op, nullptr);
      } else {
	++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused?
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    _maybe_request_map();
  }

  // resend requests
  for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
       p != need_resend.end(); ++p) {
    Op *op = p->second;
    OSDSession *s = op->session;
    bool mapped_session = false;
    if (!s) {
      int r = _map_session(&op->target, &s, sul);
      ceph_assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    OSDSession::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      if (!op->session->is_homeless() && !op->target.paused) {
	logger->inc(l_osdc_op_resend);
	_send_op(op);
      }
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s);
  }
  for (list<LingerOp*>::iterator p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    ceph_assert(op->session);
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    CommandOp *c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
	_send_command(c);
      }
    }
  }

  _dump_active();

  // finish any Contexts that were waiting on a map update
  map<epoch_t,list< pair< Context*, int > > >::iterator p =
    waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
	 p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (list<pair<Context*, int> >::iterator i = p->second.begin();
	 i != p->second.end(); ++i) {
      i->first->complete(i->second);
    }
    waiting_for_map.erase(p++);
  }

  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}
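
// Illustrative note (not part of the original source): the epoch walk
// above maintains a simple invariant -- apply an incremental while the
// next epoch is available, fall back to a bundled full map otherwise,
// and if neither is present either re-request (the gap is recent) or
// jump forward to m->get_oldest() with skipped_map = true. skipped_map
// matters because a skipped epoch may have carried an interval change
// we never saw, so every outstanding op must be treated as potentially
// needing a resend.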

void Objecter::enable_blacklist_events()
{
  unique_lock wl(rwlock);

  blacklist_events_enabled = true;
}

void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
{
  unique_lock wl(rwlock);

  if (events->empty()) {
    events->swap(blacklist_events);
  } else {
    for (const auto &i : blacklist_events) {
      events->insert(i);
    }
    blacklist_events.clear();
  }
}

void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
{
  if (!blacklist_events_enabled) {
    return;
  }

  for (const auto &i : inc.new_blacklist) {
    blacklist_events.insert(i.first);
  }
}

void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
				     const OSDMap &new_osd_map)
{
  if (!blacklist_events_enabled) {
    return;
  }

  std::set<entity_addr_t> old_set;
  std::set<entity_addr_t> new_set;

  old_osd_map.get_blacklist(&old_set);
  new_osd_map.get_blacklist(&new_set);

  std::set<entity_addr_t> delta_set;
  std::set_difference(
    new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  blacklist_events.insert(delta_set.begin(), delta_set.end());
}
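
// Illustrative note (not part of the original source): std::set_difference
// requires sorted ranges (std::set iterates in sorted order) and emits
// the elements present in the first range but not the second. A minimal
// standalone equivalent of the delta computation above:
//
//   std::set<int> older{1, 2}, newer{1, 2, 3};
//   std::set<int> delta;
//   std::set_difference(newer.begin(), newer.end(),
//                       older.begin(), older.end(),
//                       std::inserter(delta, delta.begin()));
//   // delta == {3}: entries blacklisted in the new map only.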

// op pool check

void Objecter::C_Op_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED)
    return;

  lgeneric_subdout(objecter->cct, objecter, 10)
    << "op_map_latest r=" << r << " tid=" << tid
    << " latest " << latest << dendl;

  Objecter::unique_lock wl(objecter->rwlock);

  map<ceph_tid_t, Op*>::iterator iter =
    objecter->check_latest_map_ops.find(tid);
  if (iter == objecter->check_latest_map_ops.end()) {
    lgeneric_subdout(objecter->cct, objecter, 10)
      << "op_map_latest op "<< tid << " not found" << dendl;
    return;
  }

  Op *op = iter->second;
  objecter->check_latest_map_ops.erase(iter);

  lgeneric_subdout(objecter->cct, objecter, 20)
    << "op_map_latest op "<< op << dendl;

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  OSDSession::unique_lock sl(op->session->lock, defer_lock);
  objecter->_check_op_pool_dne(op, &sl);

  op->put();
}

int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
				snapid_t *snap) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  for (auto p = pg_pool.snaps.begin();
       p != pg_pool.snaps.end();
       ++p) {
    if (p->second.name == snap_name) {
      *snap = p->first;
      return 0;
    }
  }
  return -ENOENT;
}

int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
				 pool_snap_info_t *info) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  auto p = pg_pool.snaps.find(snap);
  if (p == pg_pool.snaps.end())
    return -ENOENT;
  *info = p->second;

  return 0;
}

int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
{
  shared_lock rl(rwlock);

  const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
  if (!pi)
    return -ENOENT;
  for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
       p != pi->snaps.end();
       ++p) {
    snaps->push_back(p->first);
  }
  return 0;
}
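
// Illustrative note (not part of the original source): a typical caller
// pattern for the snap lookups above (sketch; "nightly" is a made-up
// snapshot name):
//
//   snapid_t snap;
//   if (objecter->pool_snap_by_name(poolid, "nightly", &snap) == 0) {
//     pool_snap_info_t info;
//     objecter->pool_snap_get_info(poolid, snap, &info);
//   }
//
// All three only read the cached OSDMap under a shared lock; they never
// talk to the cluster, so a stale map can yield -ENOENT for a snapshot
// that was only just created.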

// sl may be unlocked.
void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
{
  // rwlock is locked unique

  if (op->target.pool_ever_existed) {
    // the pool previously existed and now it does not, which means it
    // was deleted.
    op->map_dne_bound = osdmap->get_epoch();
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " pool previously exists but now does not"
		   << dendl;
  } else {
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // we had a new enough map
      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		     << " concluding pool " << op->target.base_pgid.pool()
		     << " dne" << dendl;
      if (op->onfinish) {
	num_in_flight--;
	op->onfinish->complete(-ENOENT);
      }

      OSDSession *s = op->session;
      if (s) {
	ceph_assert(s != NULL);
	ceph_assert(sl->mutex() == &s->lock);
	bool session_locked = sl->owns_lock();
	if (!session_locked) {
	  sl->lock();
	}
	_finish_op(op, 0);
	if (!session_locked) {
	  sl->unlock();
	}
      } else {
	_finish_op(op, 0); // no session
      }
    }
  } else {
    _send_op_map_check(op);
  }
}

void Objecter::_send_op_map_check(Op *op)
{
  // rwlock is locked unique
  // ask the monitor
  if (check_latest_map_ops.count(op->tid) == 0) {
    op->get();
    check_latest_map_ops[op->tid] = op;
    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }
}

void Objecter::_op_cancel_map_check(Op *op)
{
  // rwlock is locked unique
  map<ceph_tid_t, Op*>::iterator iter =
    check_latest_map_ops.find(op->tid);
  if (iter != check_latest_map_ops.end()) {
    Op *op = iter->second;
    op->put();
    check_latest_map_ops.erase(iter);
  }
}

// linger pool check

void Objecter::C_Linger_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  map<uint64_t, LingerOp*>::iterator iter =
    objecter->check_latest_map_lingers.find(linger_id);
  if (iter == objecter->check_latest_map_lingers.end()) {
    return;
  }

  LingerOp *op = iter->second;
  objecter->check_latest_map_lingers.erase(iter);

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  bool unregister;
  objecter->_check_linger_pool_dne(op, &unregister);

  if (unregister) {
    objecter->_linger_cancel(op);
  }

  op->put();
}

void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
  // rwlock is locked unique

  *need_unregister = false;

  if (op->register_gen > 0) {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " pool previously existed but now does not"
		   << dendl;
    op->map_dne_bound = osdmap->get_epoch();
  } else {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      LingerOp::unique_lock wl{op->watch_lock};
      if (op->on_reg_commit) {
	op->on_reg_commit->complete(-ENOENT);
	op->on_reg_commit = nullptr;
      }
      if (op->on_notify_finish) {
	op->on_notify_finish->complete(-ENOENT);
	op->on_notify_finish = nullptr;
      }
      *need_unregister = true;
    }
  } else {
    _send_linger_map_check(op);
  }
}

void Objecter::_send_linger_map_check(LingerOp *op)
{
  // ask the monitor
  if (check_latest_map_lingers.count(op->linger_id) == 0) {
    op->get();
    check_latest_map_lingers[op->linger_id] = op;
    C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }
}

void Objecter::_linger_cancel_map_check(LingerOp *op)
{
  // rwlock is locked unique

  map<uint64_t, LingerOp*>::iterator iter =
    check_latest_map_lingers.find(op->linger_id);
  if (iter != check_latest_map_lingers.end()) {
    LingerOp *op = iter->second;
    op->put();
    check_latest_map_lingers.erase(iter);
  }
}

// command pool check

void Objecter::C_Command_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  map<uint64_t, CommandOp*>::iterator iter =
    objecter->check_latest_map_commands.find(tid);
  if (iter == objecter->check_latest_map_commands.end()) {
    return;
  }

  CommandOp *c = iter->second;
  objecter->check_latest_map_commands.erase(iter);

  if (c->map_dne_bound == 0)
    c->map_dne_bound = latest;

  OSDSession::unique_lock sul(c->session->lock);
  objecter->_check_command_map_dne(c);
  sul.unlock();

  c->put();
}

void Objecter::_check_command_map_dne(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
		 << " current " << osdmap->get_epoch()
		 << " map_dne_bound " << c->map_dne_bound
		 << dendl;
  if (c->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= c->map_dne_bound) {
      _finish_command(c, c->map_check_error, c->map_check_error_str);
    }
  } else {
    _send_command_map_check(c);
  }
}

void Objecter::_send_command_map_check(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  // ask the monitor
  if (check_latest_map_commands.count(c->tid) == 0) {
    c->get();
    check_latest_map_commands[c->tid] = c;
    C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
    monc->get_version("osdmap", &f->latest, NULL, f);
  }
}

void Objecter::_command_cancel_map_check(CommandOp *c)
{
  // rwlock is locked unique

  map<uint64_t, CommandOp*>::iterator iter =
    check_latest_map_commands.find(c->tid);
  if (iter != check_latest_map_commands.end()) {
    CommandOp *c = iter->second;
    c->put();
    check_latest_map_commands.erase(iter);
  }
}


/**
 * Look up OSDSession by OSD id.
 *
 * @returns 0 on success, or -EAGAIN if the lock context requires
 * promotion to write.
 */
int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
{
  ceph_assert(sul && sul.mutex() == &rwlock);

  if (osd < 0) {
    *session = homeless_session;
    ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
		   << dendl;
    return 0;
  }

  map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
  if (p != osd_sessions.end()) {
    OSDSession *s = p->second;
    s->get();
    *session = s;
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
		   << s->get_nref() << dendl;
    return 0;
  }
  if (!sul.owns_lock()) {
    return -EAGAIN;
  }
  OSDSession *s = new OSDSession(cct, osd);
  osd_sessions[osd] = s;
  s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
  s->con->set_priv(RefCountedPtr{s});
  logger->inc(l_osdc_osd_session_open);
  logger->set(l_osdc_osd_sessions, osd_sessions.size());
  s->get();
  *session = s;
  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
		 << s->get_nref() << dendl;
  return 0;
}
1820
1821void Objecter::put_session(Objecter::OSDSession *s)
1822{
1823 if (s && !s->is_homeless()) {
1824 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1825 << s->get_nref() << dendl;
1826 s->put();
1827 }
1828}
1829
1830void Objecter::get_session(Objecter::OSDSession *s)
1831{
11fdf7f2 1832 ceph_assert(s != NULL);
7c673cae
FG
1833
1834 if (!s->is_homeless()) {
1835 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1836 << s->get_nref() << dendl;
1837 s->get();
1838 }
1839}
1840
1841void Objecter::_reopen_session(OSDSession *s)
1842{
91327a77 1843 // rwlock is locked unique
7c673cae
FG
1844 // s->lock is locked
1845
11fdf7f2 1846 auto addrs = osdmap->get_addrs(s->osd);
7c673cae 1847 ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
11fdf7f2 1848 << addrs << dendl;
7c673cae
FG
1849 if (s->con) {
1850 s->con->set_priv(NULL);
1851 s->con->mark_down();
1852 logger->inc(l_osdc_osd_session_close);
1853 }
11fdf7f2
TL
1854 s->con = messenger->connect_to_osd(addrs);
1855 s->con->set_priv(RefCountedPtr{s});
7c673cae
FG
1856 s->incarnation++;
1857 logger->inc(l_osdc_osd_session_open);
1858}
1859
1860void Objecter::close_session(OSDSession *s)
1861{
1862 // rwlock is locked unique
1863
1864 ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
1865 if (s->con) {
1866 s->con->set_priv(NULL);
1867 s->con->mark_down();
1868 logger->inc(l_osdc_osd_session_close);
1869 }
1870 OSDSession::unique_lock sl(s->lock);
1871
1872 std::list<LingerOp*> homeless_lingers;
1873 std::list<CommandOp*> homeless_commands;
1874 std::list<Op*> homeless_ops;
1875
1876 while (!s->linger_ops.empty()) {
1877 std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
1878 ldout(cct, 10) << " linger_op " << i->first << dendl;
1879 homeless_lingers.push_back(i->second);
1880 _session_linger_op_remove(s, i->second);
1881 }
1882
1883 while (!s->ops.empty()) {
1884 std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
1885 ldout(cct, 10) << " op " << i->first << dendl;
1886 homeless_ops.push_back(i->second);
1887 _session_op_remove(s, i->second);
1888 }
1889
1890 while (!s->command_ops.empty()) {
1891 std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
1892 ldout(cct, 10) << " command_op " << i->first << dendl;
1893 homeless_commands.push_back(i->second);
1894 _session_command_op_remove(s, i->second);
1895 }
1896
1897 osd_sessions.erase(s->osd);
1898 sl.unlock();
1899 put_session(s);
1900
1901 // Assign any leftover ops to the homeless session
1902 {
1903 OSDSession::unique_lock hsl(homeless_session->lock);
1904 for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
1905 i != homeless_lingers.end(); ++i) {
1906 _session_linger_op_assign(homeless_session, *i);
1907 }
1908 for (std::list<Op*>::iterator i = homeless_ops.begin();
1909 i != homeless_ops.end(); ++i) {
1910 _session_op_assign(homeless_session, *i);
1911 }
1912 for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
1913 i != homeless_commands.end(); ++i) {
1914 _session_command_op_assign(homeless_session, *i);
1915 }
1916 }
1917
1918 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1919}
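// Editor's note (hedged): ops parked on the homeless session above are not
// dropped; they stay budgeted and counted in num_homeless_ops, and tick()
// keeps requesting fresh osdmaps while any exist, so they are re-targeted
// onto a real session once a later map lets _calc_target() resolve an OSD.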
1920
9f95a23c 1921void Objecter::wait_for_osd_map(epoch_t e)
7c673cae
FG
1922{
1923 unique_lock l(rwlock);
9f95a23c 1924 if (osdmap->get_epoch() >= e) {
7c673cae
FG
1925 l.unlock();
1926 return;
1927 }
1928
1929 // Leave this since it goes with C_SafeCond
9f95a23c
TL
1930 ceph::mutex lock = ceph::make_mutex("");
1931 ceph::condition_variable cond;
7c673cae 1932 bool done = false;
9f95a23c
TL
1933 std::unique_lock mlock{lock};
1934 C_SafeCond *context = new C_SafeCond(lock, cond, &done, NULL);
1935 waiting_for_map[e].push_back(pair<Context*, int>(context, 0));
7c673cae 1936 l.unlock();
9f95a23c 1937 cond.wait(mlock, [&done] { return done; });
7c673cae
FG
1938}
1939
1940struct C_Objecter_GetVersion : public Context {
1941 Objecter *objecter;
1942 uint64_t oldest, newest;
1943 Context *fin;
1944 C_Objecter_GetVersion(Objecter *o, Context *c)
1945 : objecter(o), oldest(0), newest(0), fin(c) {}
1946 void finish(int r) override {
1947 if (r >= 0) {
1948 objecter->get_latest_version(oldest, newest, fin);
1949 } else if (r == -EAGAIN) { // try again as instructed
1950 objecter->wait_for_latest_osdmap(fin);
1951 } else {
1952 // it doesn't return any other error codes!
1953 ceph_abort();
1954 }
1955 }
1956};
1957
1958void Objecter::wait_for_latest_osdmap(Context *fin)
1959{
1960 ldout(cct, 10) << __func__ << dendl;
1961 C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
1962 monc->get_version("osdmap", &c->newest, &c->oldest, c);
1963}
1964
1965void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
1966{
1967 unique_lock wl(rwlock);
7c673cae 1968 if (osdmap->get_epoch() >= newest) {
11fdf7f2
TL
1969 ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
1970 wl.unlock();
7c673cae
FG
1971 if (fin)
1972 fin->complete(0);
1973 return;
1974 }
1975
1976 ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
1977 _wait_for_new_map(fin, newest, 0);
1978}
1979
1980void Objecter::maybe_request_map()
1981{
1982 shared_lock rl(rwlock);
1983 _maybe_request_map();
1984}
1985
1986void Objecter::_maybe_request_map()
1987{
1988 // rwlock is locked
1989 int flag = 0;
1990 if (_osdmap_full_flag()
1991 || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
1992 || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
1993 ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
1994 "osd map (FULL flag is set)" << dendl;
1995 } else {
1996 ldout(cct, 10)
1997 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
1998 flag = CEPH_SUBSCRIBE_ONETIME;
1999 }
2000 epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
2001 if (monc->sub_want("osdmap", epoch, flag)) {
2002 monc->renew_subs();
2003 }
2004}
2005
2006void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
2007{
2008 // rwlock is locked unique
2009 waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
2010 _maybe_request_map();
2011}
2012
2013
2014/**
2015 * Use this together with wait_for_map: this is a pre-check to avoid
2016 * allocating a Context for wait_for_map if we can see that we
2017 * definitely already have the epoch.
2018 *
2019 * This does *not* replace the need to handle the return value of
2020 * wait_for_map: just because we don't have it in this pre-check
2021 * doesn't mean we won't have it when calling back into wait_for_map,
2022 * since the objecter lock is dropped in between.
2023 */
2024bool Objecter::have_map(const epoch_t epoch)
2025{
2026 shared_lock rl(rwlock);
2027 if (osdmap->get_epoch() >= epoch) {
2028 return true;
2029 } else {
2030 return false;
2031 }
2032}
2033
2034bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
2035{
2036 unique_lock wl(rwlock);
2037 if (osdmap->get_epoch() >= epoch) {
2038 return true;
2039 }
2040 _wait_for_new_map(c, epoch, err);
2041 return false;
2042}
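// Editor's note: a hedged usage sketch (hypothetical caller, not in this
// file) of the have_map()/wait_for_map() pair documented above. The cheap
// pre-check avoids allocating a Context in the common case, but the return
// value of wait_for_map() must still be honored because rwlock is dropped
// between the two calls:
//
//   if (!objecter->have_map(epoch)) {
//     Context *fin = new C_Resume(this);      // hypothetical continuation
//     if (objecter->wait_for_map(epoch, fin)) {
//       fin->complete(0);  // map arrived in the gap; fin was not queued
//     } else {
//       return;            // fin fires when the target epoch arrives
//     }
//   }
//   // ... safe to assume osdmap->get_epoch() >= epoch from here on ...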
2043
7c673cae
FG
2044void Objecter::_kick_requests(OSDSession *session,
2045 map<uint64_t, LingerOp *>& lresend)
2046{
2047 // rwlock is locked unique
2048
2049 // clear backoffs
2050 session->backoffs.clear();
2051 session->backoffs_by_id.clear();
2052
2053 // resend ops
2054 map<ceph_tid_t,Op*> resend; // resend in tid order
2055 for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
2056 p != session->ops.end();) {
2057 Op *op = p->second;
2058 ++p;
7c673cae
FG
2059 if (op->should_resend) {
2060 if (!op->target.paused)
2061 resend[op->tid] = op;
2062 } else {
2063 _op_cancel_map_check(op);
2064 _cancel_linger_op(op);
2065 }
2066 }
2067
11fdf7f2 2068 logger->inc(l_osdc_op_resend, resend.size());
7c673cae
FG
2069 while (!resend.empty()) {
2070 _send_op(resend.begin()->second);
2071 resend.erase(resend.begin());
2072 }
2073
2074 // resend lingers
11fdf7f2 2075 logger->inc(l_osdc_linger_resend, session->linger_ops.size());
7c673cae
FG
2076 for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
2077 j != session->linger_ops.end(); ++j) {
2078 LingerOp *op = j->second;
2079 op->get();
11fdf7f2 2080 ceph_assert(lresend.count(j->first) == 0);
7c673cae
FG
2081 lresend[j->first] = op;
2082 }
2083
2084 // resend commands
11fdf7f2 2085 logger->inc(l_osdc_command_resend, session->command_ops.size());
7c673cae
FG
2086 map<uint64_t,CommandOp*> cresend; // resend in order
2087 for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
2088 k != session->command_ops.end(); ++k) {
7c673cae
FG
2089 cresend[k->first] = k->second;
2090 }
2091 while (!cresend.empty()) {
2092 _send_command(cresend.begin()->second);
2093 cresend.erase(cresend.begin());
2094 }
2095}
2096
2097void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
2098 unique_lock& ul)
2099{
11fdf7f2 2100 ceph_assert(ul.owns_lock());
7c673cae
FG
2101 shunique_lock sul(std::move(ul));
2102 while (!lresend.empty()) {
2103 LingerOp *op = lresend.begin()->second;
2104 if (!op->canceled) {
2105 _send_linger(op, sul);
2106 }
2107 op->put();
2108 lresend.erase(lresend.begin());
2109 }
11fdf7f2 2110 ul = sul.release_to_unique();
7c673cae
FG
2111}
2112
2113void Objecter::start_tick()
2114{
11fdf7f2 2115 ceph_assert(tick_event == 0);
7c673cae
FG
2116 tick_event =
2117 timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2118 &Objecter::tick, this);
2119}
2120
2121void Objecter::tick()
2122{
2123 shared_lock rl(rwlock);
2124
2125 ldout(cct, 10) << "tick" << dendl;
2126
2127 // we are only called from the timer event scheduled in start_tick()
2128 tick_event = 0;
2129
31f18b77 2130 if (!initialized) {
7c673cae
FG
2131 // we raced with shutdown
2132 ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
2133 return;
2134 }
2135
2136 set<OSDSession*> toping;
2137
2138
2139 // look for laggy requests
11fdf7f2 2140 auto cutoff = ceph::coarse_mono_clock::now();
7c673cae
FG
2141 cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout); // timeout
2142
2143 unsigned laggy_ops = 0;
2144
2145 for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
2146 siter != osd_sessions.end(); ++siter) {
2147 OSDSession *s = siter->second;
2148 OSDSession::lock_guard l(s->lock);
2149 bool found = false;
2150 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
2151 p != s->ops.end();
2152 ++p) {
2153 Op *op = p->second;
11fdf7f2 2154 ceph_assert(op->session);
7c673cae
FG
2155 if (op->stamp < cutoff) {
2156 ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
2157 << " is laggy" << dendl;
2158 found = true;
2159 ++laggy_ops;
2160 }
2161 }
2162 for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
2163 p != s->linger_ops.end();
2164 ++p) {
2165 LingerOp *op = p->second;
2166 LingerOp::unique_lock wl(op->watch_lock);
11fdf7f2 2167 ceph_assert(op->session);
7c673cae
FG
2168 ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
2169 << " (osd." << op->session->osd << ")" << dendl;
2170 found = true;
2171 if (op->is_watch && op->registered && !op->last_error)
2172 _send_linger_ping(op);
2173 }
2174 for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
2175 p != s->command_ops.end();
2176 ++p) {
2177 CommandOp *op = p->second;
11fdf7f2 2178 ceph_assert(op->session);
7c673cae
FG
2179 ldout(cct, 10) << " pinging osd that serves command tid " << p->first
2180 << " (osd." << op->session->osd << ")" << dendl;
2181 found = true;
2182 }
2183 if (found)
2184 toping.insert(s);
2185 }
31f18b77 2186 if (num_homeless_ops || !toping.empty()) {
7c673cae
FG
2187 _maybe_request_map();
2188 }
2189
2190 logger->set(l_osdc_op_laggy, laggy_ops);
2191 logger->set(l_osdc_osd_laggy, toping.size());
2192
2193 if (!toping.empty()) {
2194 // send a ping to these osds, to ensure we detect any session resets
2195 // (osd reply message policy is lossy)
2196 for (set<OSDSession*>::const_iterator i = toping.begin();
2197 i != toping.end();
2198 ++i) {
2199 (*i)->con->send_message(new MPing);
2200 }
2201 }
2202
11fdf7f2 2203 // Make sure we don't reschedule if we wake up after shutdown
31f18b77 2204 if (initialized) {
7c673cae
FG
2205 tick_event = timer.reschedule_me(ceph::make_timespan(
2206 cct->_conf->objecter_tick_interval));
2207 }
2208}
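// Editor's note (hedged): the laggy test in tick() is a plain age check.
// With example values objecter_tick_interval = 5s and objecter_timeout =
// 10s, an op stamped at t = 100s is reported laggy by the first tick at or
// after t = 110s, since then cutoff = now - 10s >= 100s = op->stamp; the
// MPing sent to its session exists only to flush out a silent reset of the
// lossy connection.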
2209
2210void Objecter::resend_mon_ops()
2211{
2212 unique_lock wl(rwlock);
2213
2214 ldout(cct, 10) << "resend_mon_ops" << dendl;
2215
2216 for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
2217 p != poolstat_ops.end();
2218 ++p) {
2219 _poolstat_submit(p->second);
2220 logger->inc(l_osdc_poolstat_resend);
2221 }
2222
2223 for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
2224 p != statfs_ops.end();
2225 ++p) {
2226 _fs_stats_submit(p->second);
2227 logger->inc(l_osdc_statfs_resend);
2228 }
2229
2230 for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
2231 p != pool_ops.end();
2232 ++p) {
2233 _pool_op_submit(p->second);
2234 logger->inc(l_osdc_poolop_resend);
2235 }
2236
2237 for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
2238 p != check_latest_map_ops.end();
2239 ++p) {
2240 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
2241 monc->get_version("osdmap", &c->latest, NULL, c);
2242 }
2243
2244 for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
2245 p != check_latest_map_lingers.end();
2246 ++p) {
2247 C_Linger_Map_Latest *c
2248 = new C_Linger_Map_Latest(this, p->second->linger_id);
2249 monc->get_version("osdmap", &c->latest, NULL, c);
2250 }
2251
2252 for (map<uint64_t, CommandOp*>::iterator p
2253 = check_latest_map_commands.begin();
2254 p != check_latest_map_commands.end();
2255 ++p) {
2256 C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
2257 monc->get_version("osdmap", &c->latest, NULL, c);
2258 }
2259}
2260
2261// read | write ---------------------------
2262
2263void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2264{
2265 shunique_lock rl(rwlock, ceph::acquire_shared);
2266 ceph_tid_t tid = 0;
2267 if (!ptid)
2268 ptid = &tid;
31f18b77 2269 op->trace.event("op submit");
7c673cae
FG
2270 _op_submit_with_budget(op, rl, ptid, ctx_budget);
2271}
2272
2273void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
2274 ceph_tid_t *ptid,
2275 int *ctx_budget)
2276{
11fdf7f2 2277 ceph_assert(initialized);
7c673cae 2278
11fdf7f2
TL
2279 ceph_assert(op->ops.size() == op->out_bl.size());
2280 ceph_assert(op->ops.size() == op->out_rval.size());
2281 ceph_assert(op->ops.size() == op->out_handler.size());
7c673cae
FG
2282
2283 // throttle. before we look at any state, because
2284 // _take_op_budget() may drop our lock while it blocks.
2285 if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
2286 int op_budget = _take_op_budget(op, sul);
2287 // take and pass out the budget for the first OP
2288 // in the context session
2289 if (ctx_budget && (*ctx_budget == -1)) {
2290 *ctx_budget = op_budget;
2291 }
2292 }
2293
2294 if (osd_timeout > timespan(0)) {
2295 if (op->tid == 0)
31f18b77 2296 op->tid = ++last_tid;
7c673cae
FG
2297 auto tid = op->tid;
2298 op->ontimeout = timer.add_event(osd_timeout,
2299 [this, tid]() {
2300 op_cancel(tid, -ETIMEDOUT); });
2301 }
2302
2303 _op_submit(op, sul, ptid);
2304}
2305
2306void Objecter::_send_op_account(Op *op)
2307{
31f18b77 2308 inflight_ops++;
7c673cae
FG
2309
2310 // add to gather set(s)
2311 if (op->onfinish) {
31f18b77 2312 num_in_flight++;
7c673cae
FG
2313 } else {
2314 ldout(cct, 20) << " note: not requesting reply" << dendl;
2315 }
2316
2317 logger->inc(l_osdc_op_active);
2318 logger->inc(l_osdc_op);
2319
2320 if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
2321 (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
2322 logger->inc(l_osdc_op_rmw);
2323 else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
2324 logger->inc(l_osdc_op_w);
2325 else if (op->target.flags & CEPH_OSD_FLAG_READ)
2326 logger->inc(l_osdc_op_r);
2327
2328 if (op->target.flags & CEPH_OSD_FLAG_PGOP)
2329 logger->inc(l_osdc_op_pg);
2330
2331 for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
2332 int code = l_osdc_osdop_other;
2333 switch (p->op.op) {
2334 case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
2335 case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
2336 case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
2337 case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
2338 case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
2339 case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
2340 case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
2341 case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
2342 case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
2343 case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
2344 case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
2345 case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
2346 case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
2347 case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
2348 case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
2349 case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
2350 case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
7c673cae
FG
2351
2352 // OMAP read operations
2353 case CEPH_OSD_OP_OMAPGETVALS:
2354 case CEPH_OSD_OP_OMAPGETKEYS:
2355 case CEPH_OSD_OP_OMAPGETHEADER:
2356 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
2357 case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;
2358
2359 // OMAP write operations
2360 case CEPH_OSD_OP_OMAPSETVALS:
2361 case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;
2362
2363 // OMAP del operations
2364 case CEPH_OSD_OP_OMAPCLEAR:
2365 case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;
2366
2367 case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
2368 case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
2369 case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
2370 }
2371 if (code)
2372 logger->inc(code);
2373 }
2374}
2375
2376void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
2377{
2378 // rwlock is locked
2379
2380 ldout(cct, 10) << __func__ << " op " << op << dendl;
2381
2382 // pick target
11fdf7f2 2383 ceph_assert(op->session == NULL);
7c673cae
FG
2384 OSDSession *s = NULL;
2385
2386 bool check_for_latest_map = _calc_target(&op->target, nullptr)
2387 == RECALC_OP_TARGET_POOL_DNE;
2388
2389 // Try to get a session, including a retry if we need to take write lock
2390 int r = _get_session(op->target.osd, &s, sul);
2391 if (r == -EAGAIN ||
11fdf7f2
TL
2392 (check_for_latest_map && sul.owns_lock_shared()) ||
2393 cct->_conf->objecter_debug_inject_relock_delay) {
7c673cae
FG
2394 epoch_t orig_epoch = osdmap->get_epoch();
2395 sul.unlock();
2396 if (cct->_conf->objecter_debug_inject_relock_delay) {
2397 sleep(1);
2398 }
2399 sul.lock();
2400 if (orig_epoch != osdmap->get_epoch()) {
2401 // map changed; recalculate mapping
2402 ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
2403 << dendl;
2404 check_for_latest_map = _calc_target(&op->target, nullptr)
2405 == RECALC_OP_TARGET_POOL_DNE;
2406 if (s) {
2407 put_session(s);
2408 s = NULL;
2409 r = -EAGAIN;
2410 }
2411 }
2412 }
2413 if (r == -EAGAIN) {
11fdf7f2 2414 ceph_assert(s == NULL);
7c673cae
FG
2415 r = _get_session(op->target.osd, &s, sul);
2416 }
11fdf7f2
TL
2417 ceph_assert(r == 0);
2418 ceph_assert(s); // may be homeless
7c673cae
FG
2419
2420 _send_op_account(op);
2421
2422 // send?
2423
11fdf7f2 2424 ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
7c673cae 2425
9f95a23c 2426 if (pool_full_try) {
7c673cae
FG
2427 op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
2428 }
2429
2430 bool need_send = false;
9f95a23c
TL
2431 if (op->target.paused) {
2432 ldout(cct, 10) << " tid " << op->tid << " op " << op << " is paused"
7c673cae 2433 << dendl;
7c673cae
FG
2434 _maybe_request_map();
2435 } else if (!s->is_homeless()) {
2436 need_send = true;
2437 } else {
2438 _maybe_request_map();
2439 }
2440
7c673cae
FG
2441 OSDSession::unique_lock sl(s->lock);
2442 if (op->tid == 0)
31f18b77 2443 op->tid = ++last_tid;
7c673cae
FG
2444
2445 ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
2446 << " '" << op->target.base_oloc << "' '"
2447 << op->target.target_oloc << "' " << op->ops << " tid "
2448 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
2449 << dendl;
2450
2451 _session_op_assign(s, op);
2452
2453 if (need_send) {
11fdf7f2 2454 _send_op(op);
7c673cae
FG
2455 }
2456
2457 // Last chance to touch Op here, after giving up session lock it can
2458 // be freed at any time by response handler.
2459 ceph_tid_t tid = op->tid;
2460 if (check_for_latest_map) {
2461 _send_op_map_check(op);
2462 }
2463 if (ptid)
2464 *ptid = tid;
2465 op = NULL;
2466
2467 sl.unlock();
2468 put_session(s);
2469
31f18b77 2470 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
7c673cae
FG
2471}
2472
2473int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2474{
11fdf7f2 2475 ceph_assert(initialized);
7c673cae
FG
2476
2477 OSDSession::unique_lock sl(s->lock);
2478
2479 map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
2480 if (p == s->ops.end()) {
2481 ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
2482 << s->osd << dendl;
2483 return -ENOENT;
2484 }
2485
11fdf7f2 2486#if 0
7c673cae 2487 if (s->con) {
9f95a23c 2488 ldout(cct, 20) << " revoking rx ceph::buffer for " << tid
7c673cae
FG
2489 << " on " << s->con << dendl;
2490 s->con->revoke_rx_buffer(tid);
2491 }
11fdf7f2 2492#endif
7c673cae
FG
2493
2494 ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
2495 << dendl;
2496 Op *op = p->second;
2497 if (op->onfinish) {
31f18b77 2498 num_in_flight--;
7c673cae
FG
2499 op->onfinish->complete(r);
2500 op->onfinish = NULL;
2501 }
2502 _op_cancel_map_check(op);
2503 _finish_op(op, r);
2504 sl.unlock();
2505
2506 return 0;
2507}
2508
2509int Objecter::op_cancel(ceph_tid_t tid, int r)
2510{
2511 int ret = 0;
2512
2513 unique_lock wl(rwlock);
2514 ret = _op_cancel(tid, r);
2515
2516 return ret;
2517}
2518
94b18763
FG
2519int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
2520{
2521 unique_lock wl(rwlock);
2522 ldout(cct,10) << __func__ << " " << tids << dendl;
2523 for (auto tid : tids) {
2524 _op_cancel(tid, r);
2525 }
2526 return 0;
2527}
2528
7c673cae
FG
2529int Objecter::_op_cancel(ceph_tid_t tid, int r)
2530{
2531 int ret = 0;
2532
2533 ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
2534 << dendl;
2535
2536start:
2537
2538 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2539 siter != osd_sessions.end(); ++siter) {
2540 OSDSession *s = siter->second;
2541 OSDSession::shared_lock sl(s->lock);
2542 if (s->ops.find(tid) != s->ops.end()) {
2543 sl.unlock();
2544 ret = op_cancel(s, tid, r);
2545 if (ret == -ENOENT) {
2546 /* oh no! raced, maybe tid moved to another session, restarting */
2547 goto start;
2548 }
2549 return ret;
2550 }
2551 }
2552
2553 ldout(cct, 5) << __func__ << ": tid " << tid
2554 << " not found in live sessions" << dendl;
2555
2556 // Handle case where the op is in homeless session
2557 OSDSession::shared_lock sl(homeless_session->lock);
2558 if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
2559 sl.unlock();
2560 ret = op_cancel(homeless_session, tid, r);
2561 if (ret == -ENOENT) {
2562 /* oh no! raced, maybe tid moved to another session, restarting */
2563 goto start;
2564 } else {
2565 return ret;
2566 }
2567 } else {
2568 sl.unlock();
2569 }
2570
2571 ldout(cct, 5) << __func__ << ": tid " << tid
2572 << " not found in homeless session" << dendl;
2573
2574 return ret;
2575}
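// Editor's note (hedged): the goto-start retry above covers a narrow race.
// Between releasing a session's shared lock and re-locking inside
// op_cancel(), a concurrent resend can migrate the tid to another session,
// so the targeted cancel may return -ENOENT; rescanning every session from
// scratch is safe because rwlock (held unique here) pins the session set.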
2576
2577
2578epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
2579{
2580 unique_lock wl(rwlock);
2581
2582 std::vector<ceph_tid_t> to_cancel;
2583 bool found = false;
2584
2585 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2586 siter != osd_sessions.end(); ++siter) {
2587 OSDSession *s = siter->second;
2588 OSDSession::shared_lock sl(s->lock);
2589 for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
2590 op_i != s->ops.end(); ++op_i) {
2591 if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
2592 && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
2593 to_cancel.push_back(op_i->first);
2594 }
2595 }
2596 sl.unlock();
2597
2598 for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
2599 titer != to_cancel.end();
2600 ++titer) {
2601 int cancel_result = op_cancel(s, *titer, r);
2602 // We hold rwlock across search and cancellation, so cancels
2603 // should always succeed
11fdf7f2 2604 ceph_assert(cancel_result == 0);
7c673cae
FG
2605 }
2606 if (!found && to_cancel.size())
2607 found = true;
2608 to_cancel.clear();
2609 }
2610
2611 const epoch_t epoch = osdmap->get_epoch();
2612
2613 wl.unlock();
2614
2615 if (found) {
2616 return epoch;
2617 } else {
2618 return -1;
2619 }
2620}
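// Editor's note (hedged): op_cancel_writes() returns the epoch at which
// the cancellations are known to have taken effect so a caller can use it
// as a barrier (e.g. via wait_for_map()) before reissuing writes. Since
// epoch_t is unsigned, the "return -1" above wraps; callers must compare
// against (epoch_t)-1 rather than testing for a negative value.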
2621
2622bool Objecter::is_pg_changed(
2623 int oldprimary,
2624 const vector<int>& oldacting,
2625 int newprimary,
2626 const vector<int>& newacting,
2627 bool any_change)
2628{
9f95a23c 2629 if (OSDMap::primary_changed_broken( // https://tracker.ceph.com/issues/43213
7c673cae
FG
2630 oldprimary,
2631 oldacting,
2632 newprimary,
2633 newacting))
2634 return true;
2635 if (any_change && oldacting != newacting)
2636 return true;
2637 return false; // same primary (though replicas may have changed)
2638}
2639
2640bool Objecter::target_should_be_paused(op_target_t *t)
2641{
2642 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2643 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
2644 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
9f95a23c 2645 (t->respects_full() && (_osdmap_full_flag() || _osdmap_pool_full(*pi)));
7c673cae
FG
2646
2647 return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
2648 (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
2649 (osdmap->get_epoch() < epoch_barrier);
2650}
2651
2652/**
2653 * Locking public accessor for _osdmap_full_flag
2654 */
2655bool Objecter::osdmap_full_flag() const
2656{
2657 shared_lock rl(rwlock);
2658
2659 return _osdmap_full_flag();
2660}
2661
2662bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2663{
2664 shared_lock rl(rwlock);
2665
2666 if (_osdmap_full_flag()) {
2667 return true;
2668 }
2669
2670 return _osdmap_pool_full(pool_id);
2671}
2672
2673bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2674{
2675 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2676 if (pool == NULL) {
2677 ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2678 return false;
2679 }
2680
2681 return _osdmap_pool_full(*pool);
2682}
2683
2684bool Objecter::_osdmap_has_pool_full() const
2685{
2686 for (map<int64_t, pg_pool_t>::const_iterator it
2687 = osdmap->get_pools().begin();
2688 it != osdmap->get_pools().end(); ++it) {
2689 if (_osdmap_pool_full(it->second))
2690 return true;
2691 }
2692 return false;
2693}
2694
7c673cae
FG
2695/**
2696 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2697 */
2698bool Objecter::_osdmap_full_flag() const
2699{
11fdf7f2 2700 // Ignore the FULL flag if the caller has not set honor_pool_full
9f95a23c 2701 return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_pool_full;
7c673cae
FG
2702}
2703
2704void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2705{
2706 for (map<int64_t, pg_pool_t>::const_iterator it
2707 = osdmap->get_pools().begin();
2708 it != osdmap->get_pools().end(); ++it) {
2709 if (pool_full_map.find(it->first) == pool_full_map.end()) {
2710 pool_full_map[it->first] = _osdmap_pool_full(it->second);
2711 } else {
2712 pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2713 pool_full_map[it->first];
2714 }
2715 }
2716}
2717
2718int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2719 const string& ns)
2720{
2721 shared_lock rl(rwlock);
2722 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2723 if (!p)
2724 return -ENOENT;
2725 return p->hash_key(key, ns);
2726}
2727
2728int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2729 const string& ns)
2730{
2731 shared_lock rl(rwlock);
2732 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2733 if (!p)
2734 return -ENOENT;
2735 return p->raw_hash_to_pg(p->hash_key(key, ns));
2736}
2737
11fdf7f2
TL
2738void Objecter::_prune_snapc(
2739 const mempool::osdmap::map<int64_t,
9f95a23c 2740 snap_interval_set_t>& new_removed_snaps,
11fdf7f2
TL
2741 Op *op)
2742{
2743 bool match = false;
2744 auto i = new_removed_snaps.find(op->target.base_pgid.pool());
2745 if (i != new_removed_snaps.end()) {
2746 for (auto s : op->snapc.snaps) {
2747 if (i->second.contains(s)) {
2748 match = true;
2749 break;
2750 }
2751 }
2752 if (match) {
2753 vector<snapid_t> new_snaps;
2754 for (auto s : op->snapc.snaps) {
2755 if (!i->second.contains(s)) {
2756 new_snaps.push_back(s);
2757 }
2758 }
2759 op->snapc.snaps.swap(new_snaps);
2760 ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
2761 << " (was " << new_snaps << ")" << dendl;
2762 }
2763 }
2764}
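// Editor's note: a hedged worked example of _prune_snapc(). Suppose an
// in-flight write carries snapc = {seq=40, snaps=[30,20,10]} and the new
// osdmap's removed-snaps set for the op's pool contains 20. The op is
// rewritten to snaps=[30,10] (seq untouched) so the OSD never sees a clone
// request for a snapshot that is already gone; ops whose snaps miss the
// removed interval set entirely are left alone (match stays false).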
2765
7c673cae
FG
2766int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
2767{
2768 // rwlock is locked
2769 bool is_read = t->flags & CEPH_OSD_FLAG_READ;
2770 bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
2771 t->epoch = osdmap->get_epoch();
2772 ldout(cct,20) << __func__ << " epoch " << t->epoch
2773 << " base " << t->base_oid << " " << t->base_oloc
2774 << " precalc_pgid " << (int)t->precalc_pgid
2775 << " pgid " << t->base_pgid
2776 << (is_read ? " is_read" : "")
2777 << (is_write ? " is_write" : "")
2778 << dendl;
2779
2780 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2781 if (!pi) {
2782 t->osd = -1;
2783 return RECALC_OP_TARGET_POOL_DNE;
2784 }
2785 ldout(cct,30) << __func__ << " base pi " << pi
2786 << " pg_num " << pi->get_pg_num() << dendl;
2787
2788 bool force_resend = false;
2789 if (osdmap->get_epoch() == pi->last_force_op_resend) {
2790 if (t->last_force_resend < pi->last_force_op_resend) {
2791 t->last_force_resend = pi->last_force_op_resend;
2792 force_resend = true;
2793 } else if (t->last_force_resend == 0) {
2794 force_resend = true;
2795 }
2796 }
2797
2798 // apply tiering
2799 t->target_oid = t->base_oid;
2800 t->target_oloc = t->base_oloc;
2801 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
2802 if (is_read && pi->has_read_tier())
2803 t->target_oloc.pool = pi->read_tier;
2804 if (is_write && pi->has_write_tier())
2805 t->target_oloc.pool = pi->write_tier;
2806 pi = osdmap->get_pg_pool(t->target_oloc.pool);
2807 if (!pi) {
2808 t->osd = -1;
2809 return RECALC_OP_TARGET_POOL_DNE;
2810 }
2811 }
2812
2813 pg_t pgid;
2814 if (t->precalc_pgid) {
11fdf7f2
TL
2815 ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
2816 ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
2817 ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
7c673cae
FG
2818 pgid = t->base_pgid;
2819 } else {
2820 int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
2821 pgid);
2822 if (ret == -ENOENT) {
2823 t->osd = -1;
2824 return RECALC_OP_TARGET_POOL_DNE;
2825 }
2826 }
2827 ldout(cct,20) << __func__ << " target " << t->target_oid << " "
2828 << t->target_oloc << " -> pgid " << pgid << dendl;
2829 ldout(cct,30) << __func__ << " target pi " << pi
2830 << " pg_num " << pi->get_pg_num() << dendl;
2831 t->pool_ever_existed = true;
2832
2833 int size = pi->size;
2834 int min_size = pi->min_size;
2835 unsigned pg_num = pi->get_pg_num();
9f95a23c 2836 unsigned pg_num_mask = pi->get_pg_num_mask();
11fdf7f2 2837 unsigned pg_num_pending = pi->get_pg_num_pending();
7c673cae
FG
2838 int up_primary, acting_primary;
2839 vector<int> up, acting;
9f95a23c
TL
2840 ps_t actual_ps = ceph_stable_mod(pgid.ps(), pg_num, pg_num_mask);
2841 pg_t actual_pgid(actual_ps, pgid.pool());
2842 pg_mapping_t pg_mapping;
2843 pg_mapping.epoch = osdmap->get_epoch();
2844 if (lookup_pg_mapping(actual_pgid, &pg_mapping)) {
2845 up = pg_mapping.up;
2846 up_primary = pg_mapping.up_primary;
2847 acting = pg_mapping.acting;
2848 acting_primary = pg_mapping.acting_primary;
2849 } else {
2850 osdmap->pg_to_up_acting_osds(actual_pgid, &up, &up_primary,
2851 &acting, &acting_primary);
2852 pg_mapping_t pg_mapping(osdmap->get_epoch(),
2853 up, up_primary, acting, acting_primary);
2854 update_pg_mapping(actual_pgid, std::move(pg_mapping));
2855 }
7c673cae 2856 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
c07f9fc5 2857 bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
7c673cae
FG
2858 unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
2859 pg_t prev_pgid(prev_seed, pgid.pool());
2860 if (any_change && PastIntervals::is_new_interval(
2861 t->acting_primary,
2862 acting_primary,
2863 t->acting,
2864 acting,
2865 t->up_primary,
2866 up_primary,
2867 t->up,
2868 up,
2869 t->size,
2870 size,
2871 t->min_size,
2872 min_size,
2873 t->pg_num,
2874 pg_num,
11fdf7f2
TL
2875 t->pg_num_pending,
2876 pg_num_pending,
7c673cae
FG
2877 t->sort_bitwise,
2878 sort_bitwise,
c07f9fc5
FG
2879 t->recovery_deletes,
2880 recovery_deletes,
7c673cae
FG
2881 prev_pgid)) {
2882 force_resend = true;
2883 }
2884
2885 bool unpaused = false;
f64942e4
AA
2886 bool should_be_paused = target_should_be_paused(t);
2887 if (t->paused && !should_be_paused) {
7c673cae
FG
2888 unpaused = true;
2889 }
9f95a23c
TL
2890 if (t->paused != should_be_paused) {
2891 ldout(cct, 10) << __func__ << " paused " << t->paused
2892 << " -> " << should_be_paused << dendl;
2893 t->paused = should_be_paused;
2894 }
7c673cae
FG
2895
2896 bool legacy_change =
2897 t->pgid != pgid ||
2898 is_pg_changed(
2899 t->acting_primary, t->acting, acting_primary, acting,
2900 t->used_replica || any_change);
11fdf7f2 2901 bool split_or_merge = false;
7c673cae 2902 if (t->pg_num) {
11fdf7f2
TL
2903 split_or_merge =
2904 prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
2905 prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
2906 prev_pgid.is_merge_target(t->pg_num, pg_num);
7c673cae
FG
2907 }
2908
11fdf7f2 2909 if (legacy_change || split_or_merge || force_resend) {
7c673cae
FG
2910 t->pgid = pgid;
2911 t->acting = acting;
2912 t->acting_primary = acting_primary;
2913 t->up_primary = up_primary;
2914 t->up = up;
2915 t->size = size;
2916 t->min_size = min_size;
2917 t->pg_num = pg_num;
9f95a23c 2918 t->pg_num_mask = pg_num_mask;
11fdf7f2 2919 t->pg_num_pending = pg_num_pending;
9f95a23c
TL
2920 spg_t spgid(actual_pgid);
2921 if (pi->is_erasure()) {
2922 for (uint8_t i = 0; i < acting.size(); ++i) {
2923 if (acting[i] == acting_primary) {
2924 spgid.reset_shard(shard_id_t(i));
2925 break;
2926 }
2927 }
2928 }
2929 t->actual_pgid = spgid;
7c673cae 2930 t->sort_bitwise = sort_bitwise;
c07f9fc5 2931 t->recovery_deletes = recovery_deletes;
7c673cae
FG
2932 ldout(cct, 10) << __func__ << " "
2933 << " raw pgid " << pgid << " -> actual " << t->actual_pgid
2934 << " acting " << acting
2935 << " primary " << acting_primary << dendl;
2936 t->used_replica = false;
e306af50
TL
2937 if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
2938 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
2939 !is_write && pi->is_replicated() && acting.size() > 1) {
7c673cae 2940 int osd;
e306af50
TL
2941 ceph_assert(is_read && acting[0] == acting_primary);
2942 if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
7c673cae
FG
2943 int p = rand() % acting.size();
2944 if (p)
2945 t->used_replica = true;
2946 osd = acting[p];
2947 ldout(cct, 10) << " chose random osd." << osd << " of " << acting
2948 << dendl;
e306af50 2949 } else {
7c673cae
FG
2950 // look for a local replica. prefer the primary if the
2951 // distance is the same.
2952 int best = -1;
2953 int best_locality = 0;
2954 for (unsigned i = 0; i < acting.size(); ++i) {
2955 int locality = osdmap->crush->get_common_ancestor_distance(
2956 cct, acting[i], crush_location);
2957 ldout(cct, 20) << __func__ << " localize: rank " << i
2958 << " osd." << acting[i]
2959 << " locality " << locality << dendl;
2960 if (i == 0 ||
2961 (locality >= 0 && best_locality >= 0 &&
2962 locality < best_locality) ||
2963 (best_locality < 0 && locality >= 0)) {
2964 best = i;
2965 best_locality = locality;
2966 if (i)
2967 t->used_replica = true;
2968 }
2969 }
11fdf7f2 2970 ceph_assert(best >= 0);
7c673cae 2971 osd = acting[best];
7c673cae
FG
2972 }
2973 t->osd = osd;
e306af50
TL
2974 } else {
2975 t->osd = acting_primary;
7c673cae
FG
2976 }
2977 }
2978 if (legacy_change || unpaused || force_resend) {
2979 return RECALC_OP_TARGET_NEED_RESEND;
2980 }
11fdf7f2 2981 if (split_or_merge &&
9f95a23c 2982 (osdmap->require_osd_release >= ceph_release_t::luminous ||
91327a77
AA
2983 HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
2984 RESEND_ON_SPLIT))) {
7c673cae
FG
2985 return RECALC_OP_TARGET_NEED_RESEND;
2986 }
2987 return RECALC_OP_TARGET_NO_ACTION;
2988}
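// Editor's note: a hedged aside on the ceph_stable_mod() call used above
// (definition assumed to match include/rados.h):
//
//   static inline int ceph_stable_mod(int x, int b, int bmask)
//   {
//     return ((x & bmask) < b) ? (x & bmask) : (x & (bmask >> 1));
//   }
//
// Worked example: with pg_num = 12 (so pg_num_mask = 15), a placement seed
// of 13 gives (13 & 15) = 13 >= 12, so it folds to 13 & 7 = 5. Seeds at or
// beyond pg_num collapse onto the lower half of the range, which is what
// keeps most object->PG mappings stable while pg_num grows toward the next
// power of two.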
2989
2990int Objecter::_map_session(op_target_t *target, OSDSession **s,
2991 shunique_lock& sul)
2992{
2993 _calc_target(target, nullptr);
2994 return _get_session(target->osd, s, sul);
2995}
2996
2997void Objecter::_session_op_assign(OSDSession *to, Op *op)
2998{
2999 // to->lock is locked
11fdf7f2
TL
3000 ceph_assert(op->session == NULL);
3001 ceph_assert(op->tid);
7c673cae
FG
3002
3003 get_session(to);
3004 op->session = to;
3005 to->ops[op->tid] = op;
3006
3007 if (to->is_homeless()) {
31f18b77 3008 num_homeless_ops++;
7c673cae
FG
3009 }
3010
3011 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3012}
3013
3014void Objecter::_session_op_remove(OSDSession *from, Op *op)
3015{
11fdf7f2 3016 ceph_assert(op->session == from);
7c673cae
FG
3017 // from->lock is locked
3018
3019 if (from->is_homeless()) {
31f18b77 3020 num_homeless_ops--;
7c673cae
FG
3021 }
3022
3023 from->ops.erase(op->tid);
3024 put_session(from);
3025 op->session = NULL;
3026
3027 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3028}
3029
3030void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
3031{
3032 // to->lock is locked unique
11fdf7f2 3033 ceph_assert(op->session == NULL);
7c673cae
FG
3034
3035 if (to->is_homeless()) {
31f18b77 3036 num_homeless_ops++;
7c673cae
FG
3037 }
3038
3039 get_session(to);
3040 op->session = to;
3041 to->linger_ops[op->linger_id] = op;
3042
3043 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
3044 << dendl;
3045}
3046
3047void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
3048{
11fdf7f2 3049 ceph_assert(from == op->session);
7c673cae
FG
3050 // from->lock is locked unique
3051
3052 if (from->is_homeless()) {
31f18b77 3053 num_homeless_ops--;
7c673cae
FG
3054 }
3055
3056 from->linger_ops.erase(op->linger_id);
3057 put_session(from);
3058 op->session = NULL;
3059
3060 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
3061 << dendl;
3062}
3063
3064void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
3065{
11fdf7f2 3066 ceph_assert(from == op->session);
7c673cae
FG
3067 // from->lock is locked
3068
3069 if (from->is_homeless()) {
31f18b77 3070 num_homeless_ops--;
7c673cae
FG
3071 }
3072
3073 from->command_ops.erase(op->tid);
3074 put_session(from);
3075 op->session = NULL;
3076
3077 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3078}
3079
3080void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
3081{
3082 // to->lock is locked
11fdf7f2
TL
3083 ceph_assert(op->session == NULL);
3084 ceph_assert(op->tid);
7c673cae
FG
3085
3086 if (to->is_homeless()) {
31f18b77 3087 num_homeless_ops++;
7c673cae
FG
3088 }
3089
3090 get_session(to);
3091 op->session = to;
3092 to->command_ops[op->tid] = op;
3093
3094 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3095}
3096
3097int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
3098 shunique_lock& sul)
3099{
3100 // rwlock is locked unique
3101
3102 int r = _calc_target(&linger_op->target, nullptr, true);
3103 if (r == RECALC_OP_TARGET_NEED_RESEND) {
3104 ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
3105 << " pgid " << linger_op->target.pgid
3106 << " acting " << linger_op->target.acting << dendl;
3107
3108 OSDSession *s = NULL;
3109 r = _get_session(linger_op->target.osd, &s, sul);
11fdf7f2 3110 ceph_assert(r == 0);
7c673cae
FG
3111
3112 if (linger_op->session != s) {
3113 // NB locking two sessions (s and linger_op->session) at the
3114 // same time here is only safe because we are the only one that
3115 // takes two, and we are holding rwlock for write. Disable
3116 // lockdep because it doesn't know that.
3117 OSDSession::unique_lock sl(s->lock);
3118 _session_linger_op_remove(linger_op->session, linger_op);
3119 _session_linger_op_assign(s, linger_op);
3120 }
3121
3122 put_session(s);
3123 return RECALC_OP_TARGET_NEED_RESEND;
3124 }
3125 return r;
3126}
3127
3128void Objecter::_cancel_linger_op(Op *op)
3129{
3130 ldout(cct, 15) << "cancel_op " << op->tid << dendl;
3131
11fdf7f2 3132 ceph_assert(!op->should_resend);
7c673cae
FG
3133 if (op->onfinish) {
3134 delete op->onfinish;
31f18b77 3135 num_in_flight--;
7c673cae
FG
3136 }
3137
3138 _finish_op(op, 0);
3139}
3140
3141void Objecter::_finish_op(Op *op, int r)
3142{
11fdf7f2 3143 ldout(cct, 15) << __func__ << " " << op->tid << dendl;
7c673cae
FG
3144
3145 // op->session->lock is locked unique or op->session is null
3146
11fdf7f2
TL
3147 if (!op->ctx_budgeted && op->budget >= 0) {
3148 put_op_budget_bytes(op->budget);
3149 op->budget = -1;
3150 }
7c673cae
FG
3151
3152 if (op->ontimeout && r != -ETIMEDOUT)
3153 timer.cancel_event(op->ontimeout);
3154
3155 if (op->session) {
3156 _session_op_remove(op->session, op);
3157 }
3158
3159 logger->dec(l_osdc_op_active);
3160
11fdf7f2 3161 ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
7c673cae 3162
31f18b77 3163 inflight_ops--;
7c673cae
FG
3164
3165 op->put();
3166}
3167
7c673cae
FG
3168MOSDOp *Objecter::_prepare_osd_op(Op *op)
3169{
3170 // rwlock is locked
3171
3172 int flags = op->target.flags;
3173 flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
3174
3175 // Nothing checks this any longer, but needed for compatibility with
3176 // pre-luminous osds
3177 flags |= CEPH_OSD_FLAG_ONDISK;
3178
9f95a23c 3179 if (!honor_pool_full)
7c673cae
FG
3180 flags |= CEPH_OSD_FLAG_FULL_FORCE;
3181
3182 op->target.paused = false;
11fdf7f2 3183 op->stamp = ceph::coarse_mono_clock::now();
7c673cae
FG
3184
3185 hobject_t hobj = op->target.get_hobj();
31f18b77 3186 MOSDOp *m = new MOSDOp(client_inc, op->tid,
7c673cae
FG
3187 hobj, op->target.actual_pgid,
3188 osdmap->get_epoch(),
3189 flags, op->features);
3190
3191 m->set_snapid(op->snapid);
3192 m->set_snap_seq(op->snapc.seq);
3193 m->set_snaps(op->snapc.snaps);
3194
3195 m->ops = op->ops;
3196 m->set_mtime(op->mtime);
3197 m->set_retry_attempt(op->attempts++);
31f18b77
FG
3198
3199 if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
3200 op->trace.init("op", &trace_endpoint);
3201 }
7c673cae
FG
3202
3203 if (op->priority)
3204 m->set_priority(op->priority);
3205 else
3206 m->set_priority(cct->_conf->osd_client_op_priority);
3207
3208 if (op->reqid != osd_reqid_t()) {
3209 m->set_reqid(op->reqid);
3210 }
3211
3212 logger->inc(l_osdc_op_send);
b32b8144
FG
3213 ssize_t sum = 0;
3214 for (unsigned i = 0; i < m->ops.size(); i++) {
3215 sum += m->ops[i].indata.length();
3216 }
3217 logger->inc(l_osdc_op_send_bytes, sum);
7c673cae
FG
3218
3219 return m;
3220}
3221
11fdf7f2 3222void Objecter::_send_op(Op *op)
7c673cae
FG
3223{
3224 // rwlock is locked
3225 // op->session->lock is locked
3226
3227 // backoff?
7c673cae
FG
3228 auto p = op->session->backoffs.find(op->target.actual_pgid);
3229 if (p != op->session->backoffs.end()) {
b32b8144 3230 hobject_t hoid = op->target.get_hobj();
7c673cae
FG
3231 auto q = p->second.lower_bound(hoid);
3232 if (q != p->second.begin()) {
3233 --q;
3234 if (hoid >= q->second.end) {
3235 ++q;
3236 }
3237 }
3238 if (q != p->second.end()) {
3239 ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
3240 << "," << q->second.end << ")" << dendl;
3241 int r = cmp(hoid, q->second.begin);
3242 if (r == 0 || (r > 0 && hoid < q->second.end)) {
3243 ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
3244 << " id " << q->second.id << " on " << hoid
3245 << ", queuing " << op << " tid " << op->tid << dendl;
3246 return;
3247 }
3248 }
3249 }
3250
11fdf7f2
TL
3251 ceph_assert(op->tid > 0);
3252 MOSDOp *m = _prepare_osd_op(op);
7c673cae
FG
3253
3254 if (op->target.actual_pgid != m->get_spg()) {
3255 ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
3256 << m->get_spg() << " to " << op->target.actual_pgid
3257 << ", updating and reencoding" << dendl;
3258 m->set_spg(op->target.actual_pgid);
3259 m->clear_payload(); // reencode
3260 }
3261
3262 ldout(cct, 15) << "_send_op " << op->tid << " to "
3263 << op->target.actual_pgid << " on osd." << op->session->osd
3264 << dendl;
3265
3266 ConnectionRef con = op->session->con;
11fdf7f2 3267 ceph_assert(con);
7c673cae 3268
11fdf7f2 3269#if 0
9f95a23c 3270 // preallocated rx ceph::buffer?
7c673cae 3271 if (op->con) {
9f95a23c 3272 ldout(cct, 20) << " revoking rx ceph::buffer for " << op->tid << " on "
7c673cae
FG
3273 << op->con << dendl;
3274 op->con->revoke_rx_buffer(op->tid);
3275 }
3276 if (op->outbl &&
3277 op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582
3278 op->outbl->length()) {
11fdf7f2 3279 op->outbl->invalidate_crc(); // messenger writes through c_str()
9f95a23c 3280 ldout(cct, 20) << " posting rx ceph::buffer for " << op->tid << " on " << con
7c673cae
FG
3281 << dendl;
3282 op->con = con;
3283 op->con->post_rx_buffer(op->tid, *op->outbl);
3284 }
11fdf7f2 3285#endif
7c673cae
FG
3286
3287 op->incarnation = op->session->incarnation;
3288
31f18b77
FG
3289 if (op->trace.valid()) {
3290 m->trace.init("op msg", nullptr, &op->trace);
3291 }
7c673cae
FG
3292 op->session->con->send_message(m);
3293}
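// Editor's note (hedged): the backoff check at the top of _send_op() is an
// interval lookup over a map keyed by interval start. lower_bound(hoid)
// lands on the first backoff starting at or after hoid; stepping back one
// entry catches an interval that begins before hoid, and the final
// begin <= hoid < end comparison decides containment. For example, with
// backoffs [a,c) and [f,h) registered, an op on "b" is held back (resent
// on CEPH_OSD_BACKOFF_OP_UNBLOCK) while an op on "d" is sent immediately.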
3294
11fdf7f2 3295int Objecter::calc_op_budget(const vector<OSDOp>& ops)
7c673cae
FG
3296{
3297 int op_budget = 0;
11fdf7f2
TL
3298 for (vector<OSDOp>::const_iterator i = ops.begin();
3299 i != ops.end();
7c673cae
FG
3300 ++i) {
3301 if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3302 op_budget += i->indata.length();
3303 } else if (ceph_osd_op_mode_read(i->op.op)) {
a8e16298
TL
3304 if (ceph_osd_op_uses_extent(i->op.op)) {
3305 if ((int64_t)i->op.extent.length > 0)
3306 op_budget += (int64_t)i->op.extent.length;
7c673cae 3307 } else if (ceph_osd_op_type_attr(i->op.op)) {
a8e16298 3308 op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
7c673cae
FG
3309 }
3310 }
3311 }
3312 return op_budget;
3313}
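// Editor's note: a hedged worked example of calc_op_budget(). A compound
// op [WRITE with 4096 bytes of indata, READ with extent.length = 8192]
// costs 4096 + 8192 = 12288: writes are charged for the data they carry,
// extent reads for the data they expect back, and attr reads for
// name_len + value_len; other op types cost nothing toward the throttle.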
3314
3315void Objecter::_throttle_op(Op *op,
3316 shunique_lock& sul,
3317 int op_budget)
3318{
11fdf7f2 3319 ceph_assert(sul && sul.mutex() == &rwlock);
7c673cae
FG
3320 bool locked_for_write = sul.owns_lock();
3321
3322 if (!op_budget)
11fdf7f2 3323 op_budget = calc_op_budget(op->ops);
7c673cae
FG
3324 if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3325 sul.unlock();
3326 op_throttle_bytes.get(op_budget);
3327 if (locked_for_write)
3328 sul.lock();
3329 else
3330 sul.lock_shared();
3331 }
3332 if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3333 sul.unlock();
3334 op_throttle_ops.get(1);
3335 if (locked_for_write)
3336 sul.lock();
3337 else
3338 sul.lock_shared();
3339 }
3340}
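// Editor's note (hedged): _throttle_op() tries the non-blocking
// get_or_fail() first so the common, under-budget case never drops rwlock;
// only when the throttle is exhausted does it unlock, block in get(), and
// reacquire in the same mode it held before (unique vs shared), which is
// why callers must tolerate state changing across this call.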
3341
11fdf7f2 3342int Objecter::take_linger_budget(LingerOp *info)
7c673cae 3343{
11fdf7f2 3344 return 1;
7c673cae
FG
3345}
3346
3347/* This function DOES put the passed message before returning */
3348void Objecter::handle_osd_op_reply(MOSDOpReply *m)
3349{
3350 ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
3351
3352 // get pio
3353 ceph_tid_t tid = m->get_tid();
3354
3355 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3356 if (!initialized) {
7c673cae
FG
3357 m->put();
3358 return;
3359 }
3360
3361 ConnectionRef con = m->get_connection();
11fdf7f2
TL
3362 auto priv = con->get_priv();
3363 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
3364 if (!s || s->con != con) {
3365 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
7c673cae
FG
3366 m->put();
3367 return;
3368 }
3369
3370 OSDSession::unique_lock sl(s->lock);
3371
3372 map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
3373 if (iter == s->ops.end()) {
3374 ldout(cct, 7) << "handle_osd_op_reply " << tid
3375 << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
3376 " onnvram" : " ack"))
3377 << " ... stray" << dendl;
3378 sl.unlock();
7c673cae
FG
3379 m->put();
3380 return;
3381 }
3382
3383 ldout(cct, 7) << "handle_osd_op_reply " << tid
3384 << (m->is_ondisk() ? " ondisk" :
3385 (m->is_onnvram() ? " onnvram" : " ack"))
3386 << " uv " << m->get_user_version()
3387 << " in " << m->get_pg()
3388 << " attempt " << m->get_retry_attempt()
3389 << dendl;
3390 Op *op = iter->second;
31f18b77 3391 op->trace.event("osd op reply");
7c673cae
FG
3392
3393 if (retry_writes_after_first_reply && op->attempts == 1 &&
3394 (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
3395 ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
3396 if (op->onfinish) {
31f18b77 3397 num_in_flight--;
7c673cae
FG
3398 }
3399 _session_op_remove(s, op);
3400 sl.unlock();
7c673cae
FG
3401
3402 _op_submit(op, sul, NULL);
3403 m->put();
3404 return;
3405 }
3406
3407 if (m->get_retry_attempt() >= 0) {
3408 if (m->get_retry_attempt() != (op->attempts - 1)) {
3409 ldout(cct, 7) << " ignoring reply from attempt "
3410 << m->get_retry_attempt()
3411 << " from " << m->get_source_inst()
3412 << "; last attempt " << (op->attempts - 1) << " sent to "
3413 << op->session->con->get_peer_addr() << dendl;
3414 m->put();
3415 sl.unlock();
7c673cae
FG
3416 return;
3417 }
3418 } else {
3419 // we don't know the request attempt because the server is old, so
3420 // just accept this one. we may do ACK callbacks we shouldn't
3421 // have, but that is better than doing callbacks out of order.
3422 }
3423
3424 Context *onfinish = 0;
3425
3426 int rc = m->get_result();
3427
3428 if (m->is_redirect_reply()) {
3429 ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
3430 if (op->onfinish)
31f18b77 3431 num_in_flight--;
7c673cae
FG
3432 _session_op_remove(s, op);
3433 sl.unlock();
7c673cae
FG
3434
3435 // FIXME: two redirects could race and reorder
3436
3437 op->tid = 0;
3438 m->get_redirect().combine_with_locator(op->target.target_oloc,
3439 op->target.target_oid.name);
f64942e4
AA
3440 op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
3441 CEPH_OSD_FLAG_IGNORE_CACHE |
3442 CEPH_OSD_FLAG_IGNORE_OVERLAY);
7c673cae
FG
3443 _op_submit(op, sul, NULL);
3444 m->put();
3445 return;
3446 }
3447
3448 if (rc == -EAGAIN) {
3449 ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
c07f9fc5
FG
3450 if (op->onfinish)
3451 num_in_flight--;
3452 _session_op_remove(s, op);
7c673cae 3453 sl.unlock();
c07f9fc5
FG
3454
3455 op->tid = 0;
3456 op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3457 CEPH_OSD_FLAG_LOCALIZE_READS);
3458 op->target.pgid = pg_t();
3459 _op_submit(op, sul, NULL);
7c673cae
FG
3460 m->put();
3461 return;
3462 }
3463
3464 sul.unlock();
3465
3466 if (op->objver)
3467 *op->objver = m->get_user_version();
3468 if (op->reply_epoch)
3469 *op->reply_epoch = m->get_map_epoch();
3470 if (op->data_offset)
3471 *op->data_offset = m->get_header().data_off;
3472
3473 // got data?
3474 if (op->outbl) {
11fdf7f2 3475#if 0
7c673cae
FG
3476 if (op->con)
3477 op->con->revoke_rx_buffer(op->tid);
11fdf7f2
TL
3478#endif
3479 auto& bl = m->get_data();
3480 if (op->outbl->length() == bl.length() &&
3481 bl.get_num_buffers() <= 1) {
3482 // this is here to keep previous users who *relied* on getting data
3483 // read into existing buffers happy. Notably,
3484 // libradosstriper::RadosStriperImpl::aio_read().
3485 ldout(cct,10) << __func__ << " copying resulting " << bl.length()
9f95a23c 3486 << " into existing ceph::buffer of length " << op->outbl->length()
11fdf7f2 3487 << dendl;
9f95a23c 3488 ceph::buffer::list t;
11fdf7f2
TL
3489 t.claim(*op->outbl);
3490 t.invalidate_crc(); // we're overwriting the raw buffers via c_str()
9f95a23c 3491 bl.begin().copy(bl.length(), t.c_str());
11fdf7f2
TL
3492 op->outbl->substr_of(t, 0, bl.length());
3493 } else {
3494 m->claim_data(*op->outbl);
3495 }
7c673cae
FG
3496 op->outbl = 0;
3497 }
3498
3499 // per-op result demuxing
3500 vector<OSDOp> out_ops;
3501 m->claim_ops(out_ops);
3502
3503 if (out_ops.size() != op->ops.size())
3504 ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
3505 << " != request ops " << op->ops
3506 << " from " << m->get_source_inst() << dendl;
3507
9f95a23c 3508 vector<ceph::buffer::list*>::iterator pb = op->out_bl.begin();
7c673cae
FG
3509 vector<int*>::iterator pr = op->out_rval.begin();
3510 vector<Context*>::iterator ph = op->out_handler.begin();
11fdf7f2
TL
3511 ceph_assert(op->out_bl.size() == op->out_rval.size());
3512 ceph_assert(op->out_bl.size() == op->out_handler.size());
7c673cae
FG
3513 vector<OSDOp>::iterator p = out_ops.begin();
3514 for (unsigned i = 0;
3515 p != out_ops.end() && pb != op->out_bl.end();
3516 ++i, ++p, ++pb, ++pr, ++ph) {
3517 ldout(cct, 10) << " op " << i << " rval " << p->rval
3518 << " len " << p->outdata.length() << dendl;
3519 if (*pb)
3520 **pb = p->outdata;
3521 // set rval before running handlers so that handlers
3522 // can change it if e.g. decoding fails
3523 if (*pr)
31f18b77 3524 **pr = ceph_to_hostos_errno(p->rval);
7c673cae
FG
3525 if (*ph) {
3526 ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
31f18b77 3527 (*ph)->complete(ceph_to_hostos_errno(p->rval));
7c673cae
FG
3528 *ph = NULL;
3529 }
3530 }
3531
3532 // NOTE: we assume that since we only ever request ONDISK we will
3533 // only ever get back one (type of) ack.
3534
3535 if (op->onfinish) {
31f18b77 3536 num_in_flight--;
7c673cae
FG
3537 onfinish = op->onfinish;
3538 op->onfinish = NULL;
3539 }
3540 logger->inc(l_osdc_op_reply);
3541
3542 /* get it before we call _finish_op() */
3543 auto completion_lock = s->get_lock(op->target.base_oid);
3544
3545 ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
3546 _finish_op(op, 0);
3547
31f18b77 3548 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
7c673cae
FG
3549
3550 // serialize completions
3551 if (completion_lock.mutex()) {
3552 completion_lock.lock();
3553 }
3554 sl.unlock();
3555
3556 // do callbacks
3557 if (onfinish) {
3558 onfinish->complete(rc);
3559 }
3560 if (completion_lock.mutex()) {
3561 completion_lock.unlock();
3562 }
3563
3564 m->put();
7c673cae
FG
3565}
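// Editor's note: a hedged sketch (hypothetical caller, not in this file)
// of what the per-op demux loop above serves. A compound read registers
// one out-slot per sub-op via ObjectOperation, and each slot is filled
// from the matching OSDOp returned by claim_ops():
//
//   ObjectOperation op;
//   uint64_t size;
//   ceph::buffer::list xattrbl;
//   int stat_rval = 0, xattr_rval = 0;
//   op.stat(&size, nullptr, &stat_rval);            // slot 0
//   op.getxattr("user.key", &xattrbl, &xattr_rval); // slot 1
//   // after the reply: out_rval[i] and out_bl[i] come from the i-th
//   // OSDOp of the MOSDOpReply, and out_handler[i] runs with that rval.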
3566
3567void Objecter::handle_osd_backoff(MOSDBackoff *m)
3568{
3569 ldout(cct, 10) << __func__ << " " << *m << dendl;
3570 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3571 if (!initialized) {
7c673cae
FG
3572 m->put();
3573 return;
3574 }
3575
3576 ConnectionRef con = m->get_connection();
11fdf7f2
TL
3577 auto priv = con->get_priv();
3578 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
3579 if (!s || s->con != con) {
3580 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
7c673cae
FG
3581 m->put();
3582 return;
3583 }
3584
3585 get_session(s);
7c673cae
FG
3586
3587 OSDSession::unique_lock sl(s->lock);
3588
3589 switch (m->op) {
3590 case CEPH_OSD_BACKOFF_OP_BLOCK:
3591 {
3592 // register
3593 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3594 s->backoffs_by_id.insert(make_pair(m->id, &b));
3595 b.pgid = m->pgid;
3596 b.id = m->id;
3597 b.begin = m->begin;
3598 b.end = m->end;
3599
3600 // ack with original backoff's epoch so that the osd can discard this if
3601 // there was a pg split.
3602 Message *r = new MOSDBackoff(m->pgid,
3603 m->map_epoch,
3604 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3605 m->id, m->begin, m->end);
3606 // this priority must match the MOSDOps from _prepare_osd_op
3607 r->set_priority(cct->_conf->osd_client_op_priority);
3608 con->send_message(r);
3609 }
3610 break;
3611
3612 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3613 {
3614 auto p = s->backoffs_by_id.find(m->id);
3615 if (p != s->backoffs_by_id.end()) {
3616 OSDBackoff *b = p->second;
3617 if (b->begin != m->begin &&
3618 b->end != m->end) {
3619 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3620 << " unblock on ["
3621 << m->begin << "," << m->end << ") but backoff is ["
3622 << b->begin << "," << b->end << ")" << dendl;
3623 // hrmpf, unblock it anyway.
3624 }
3625 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3626 << " id " << b->id
3627 << " [" << b->begin << "," << b->end
3628 << ")" << dendl;
3629 auto spgp = s->backoffs.find(b->pgid);
11fdf7f2 3630 ceph_assert(spgp != s->backoffs.end());
7c673cae
FG
3631 spgp->second.erase(b->begin);
3632 if (spgp->second.empty()) {
3633 s->backoffs.erase(spgp);
3634 }
3635 s->backoffs_by_id.erase(p);
3636
3637 // check for any ops to resend
3638 for (auto& q : s->ops) {
3639 if (q.second->target.actual_pgid == m->pgid) {
3640 int r = q.second->target.contained_by(m->begin, m->end);
3641 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3642 << q.second->target.get_hobj() << dendl;
3643 if (r) {
3644 _send_op(q.second);
3645 }
3646 }
3647 }
3648 } else {
3649 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3650 << " unblock on ["
3651 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3652 }
3653 }
3654 break;
3655
3656 default:
3657 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3658 }
3659
3660 sul.unlock();
3661 sl.unlock();
3662
3663 m->put();
3664 put_session(s);
3665}

uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
				      uint32_t pos)
{
  shared_lock rl(rwlock);
  list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
				pos, list_context->pool_id, string());
  ldout(cct, 10) << __func__ << " " << list_context
		 << " pos " << pos << " -> " << list_context->pos << dendl;
  pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
  list_context->current_pg = actual.ps();
  list_context->at_end_of_pool = false;
  return pos;
}

uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
				      const hobject_t& cursor)
{
  shared_lock rl(rwlock);
  ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
  list_context->pos = cursor;
  list_context->at_end_of_pool = false;
  pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(),
					  list_context->pool_id));
  list_context->current_pg = actual.ps();
  list_context->sort_bitwise = true;
  return list_context->current_pg;
}

void Objecter::list_nobjects_get_cursor(NListContext *list_context,
					hobject_t *cursor)
{
  shared_lock rl(rwlock);
  if (list_context->list.empty()) {
    *cursor = list_context->pos;
  } else {
    const librados::ListObjectImpl& entry = list_context->list.front();
    const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
    uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
    *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq,
			h, list_context->pool_id, entry.nspace);
  }
}

void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
{
  ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
		 << " pool_snap_seq " << list_context->pool_snap_seq
		 << " max_entries " << list_context->max_entries
		 << " list_context " << list_context
		 << " onfinish " << onfinish
		 << " current_pg " << list_context->current_pg
		 << " pos " << list_context->pos << dendl;

  shared_lock rl(rwlock);
  const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
  if (!pool) { // pool is gone
    rl.unlock();
    put_nlist_context_budget(list_context);
    onfinish->complete(-ENOENT);
    return;
  }
  int pg_num = pool->get_pg_num();
  bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);

  if (list_context->pos.is_min()) {
    list_context->sort_bitwise = sort_bitwise;
    list_context->starting_pg_num = pg_num;
  }
  if (list_context->sort_bitwise != sort_bitwise) {
    list_context->pos = hobject_t(
      object_t(), string(), CEPH_NOSNAP,
      list_context->current_pg, list_context->pool_id, string());
    list_context->sort_bitwise = sort_bitwise;
    ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
		   << list_context->pos << dendl;
  }
  if (list_context->starting_pg_num != pg_num) {
    if (!sort_bitwise) {
      // start reading from the beginning; the pgs have changed
      ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
      list_context->pos = collection_list_handle_t();
    }
    list_context->starting_pg_num = pg_num;
  }

  if (list_context->pos.is_max()) {
    ldout(cct, 20) << __func__ << " end of pool, list "
		   << list_context->list << dendl;
    if (list_context->list.empty()) {
      list_context->at_end_of_pool = true;
    }
    // release the listing context's budget once all
    // OPs (in the session) are finished
    put_nlist_context_budget(list_context);
    onfinish->complete(0);
    return;
  }

  ObjectOperation op;
  op.pg_nls(list_context->max_entries, list_context->filter,
	    list_context->pos, osdmap->get_epoch());
  list_context->bl.clear();
  C_NList *onack = new C_NList(list_context, onfinish, this);
  object_locator_t oloc(list_context->pool_id, list_context->nspace);

  // note current_pg in case we don't have (or lose) SORTBITWISE
  list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
  rl.unlock();

  pg_read(list_context->current_pg, oloc, op,
	  &list_context->bl, 0, onack, &onack->epoch,
	  &list_context->ctx_budget);
}
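
/*
 * Usage sketch (illustrative, not part of the original file): driving the
 * pool listing a page at a time.  "MyPageCb" and the surrounding setup are
 * hypothetical; the Objecter/NListContext calls are the ones defined above.
 *
 *   NListContext lc;
 *   lc.pool_id = pool_id;
 *   lc.max_entries = 1024;
 *
 *   struct MyPageCb : public Context {
 *     NListContext *lc;
 *     explicit MyPageCb(NListContext *lc) : lc(lc) {}
 *     void finish(int r) override {
 *       // consume lc->list here; if (!lc->at_end_of_pool), issue the next
 *       // page via list_nobjects(lc, new MyPageCb(lc)).
 *     }
 *   };
 *
 *   objecter->list_nobjects(&lc, new MyPageCb(&lc));
 */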

void Objecter::_nlist_reply(NListContext *list_context, int r,
			    Context *final_finish, epoch_t reply_epoch)
{
  ldout(cct, 10) << __func__ << " " << list_context << dendl;

  auto iter = list_context->bl.cbegin();
  pg_nls_response_t response;
  decode(response, iter);
  if (!iter.end()) {
    // legacy extra_info; decode and discard it, purely for backward
    // compatibility with older OSDs.
    ceph::buffer::list legacy_extra_info;
    decode(legacy_extra_info, iter);
  }

  // If the osd returns 1 (newer code) or a MAX handle, we hit the end
  // of the pg.
  if ((response.handle.is_max() || r == 1) &&
      !list_context->sort_bitwise) {
    // legacy OSD and !sortbitwise: figure out the next PG on our own
    ++list_context->current_pg;
    if (list_context->current_pg == list_context->starting_pg_num) {
      // end of pool
      list_context->pos = hobject_t::get_max();
    } else {
      // next pg
      list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
				    list_context->current_pg,
				    list_context->pool_id, string());
    }
  } else {
    list_context->pos = response.handle;
  }

  int response_size = response.entries.size();
  ldout(cct, 20) << " response.entries.size " << response_size
		 << ", response.entries " << response.entries
		 << ", handle " << response.handle
		 << ", tentative new pos " << list_context->pos << dendl;
  if (response_size) {
    list_context->list.splice(list_context->list.end(), response.entries);
  }

  if (list_context->list.size() >= list_context->max_entries) {
    ldout(cct, 20) << " hit max, returning results so far, "
		   << list_context->list << dendl;
    // release the listing context's budget once all
    // OPs (in the session) are finished
    put_nlist_context_budget(list_context);
    final_finish->complete(0);
    return;
  }

  // continue!
  list_nobjects(list_context, final_finish);
}

void Objecter::put_nlist_context_budget(NListContext *list_context)
{
  if (list_context->ctx_budget >= 0) {
    ldout(cct, 10) << " release listing context's budget "
		   << list_context->ctx_budget << dendl;
    put_op_budget_bytes(list_context->ctx_budget);
    list_context->ctx_budget = -1;
  }
}

// snapshots

int Objecter::create_pool_snap(int64_t pool, string& snap_name,
			       Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
		 << snap_name << dendl;

  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -EINVAL;
  if (p->snap_exists(snap_name.c_str()))
    return -EEXIST;

  PoolOp *op = new PoolOp;
  op->tid = ++last_tid;
  op->pool = pool;
  op->name = snap_name;
  op->onfinish = onfinish;
  op->pool_op = POOL_OP_CREATE_SNAP;
  pool_ops[op->tid] = op;

  pool_op_submit(op);

  return 0;
}

struct C_SelfmanagedSnap : public Context {
  ceph::buffer::list bl;
  snapid_t *psnapid;
  Context *fin;
  C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
  void finish(int r) override {
    if (r == 0) {
      try {
	auto p = bl.cbegin();
	decode(*psnapid, p);
      } catch (ceph::buffer::error&) {
	r = -EIO;
      }
    }
    fin->complete(r);
  }
};

int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
					Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
  PoolOp *op = new PoolOp;
  op->tid = ++last_tid;
  op->pool = pool;
  C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
  op->onfinish = fin;
  op->blp = &fin->bl;
  op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
  pool_ops[op->tid] = op;

  pool_op_submit(op);
  return 0;
}

int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
			       Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
		 << snap_name << dendl;

  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -EINVAL;
  if (!p->snap_exists(snap_name.c_str()))
    return -ENOENT;

  PoolOp *op = new PoolOp;
  op->tid = ++last_tid;
  op->pool = pool;
  op->name = snap_name;
  op->onfinish = onfinish;
  op->pool_op = POOL_OP_DELETE_SNAP;
  pool_ops[op->tid] = op;

  pool_op_submit(op);

  return 0;
}

int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
				      Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
		 << snap << dendl;
  PoolOp *op = new PoolOp;
  op->tid = ++last_tid;
  op->pool = pool;
  op->onfinish = onfinish;
  op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
  op->snapid = snap;
  pool_ops[op->tid] = op;

  pool_op_submit(op);

  return 0;
}
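
/*
 * Usage sketch (illustrative; "C_SnapDone" is a hypothetical Context
 * subclass): allocating a self-managed snap id.  The caller supplies
 * storage for the snapid; it is only valid once the completion fires
 * with r == 0, after C_SelfmanagedSnap has decoded it.
 *
 *   snapid_t snapid;
 *   struct C_SnapDone : public Context {
 *     snapid_t *snapid;
 *     explicit C_SnapDone(snapid_t *s) : snapid(s) {}
 *     void finish(int r) override {
 *       if (r == 0) {
 *         // *snapid now holds the newly allocated id
 *       }
 *     }
 *   };
 *   objecter->allocate_selfmanaged_snap(pool_id, &snapid,
 *                                       new C_SnapDone(&snapid));
 */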

int Objecter::create_pool(string& name, Context *onfinish,
			  int crush_rule)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "create_pool name=" << name << dendl;

  if (osdmap->lookup_pg_pool_name(name) >= 0)
    return -EEXIST;

  PoolOp *op = new PoolOp;
  op->tid = ++last_tid;
  op->pool = 0;
  op->name = name;
  op->onfinish = onfinish;
  op->pool_op = POOL_OP_CREATE;
  pool_ops[op->tid] = op;
  op->crush_rule = crush_rule;

  pool_op_submit(op);

  return 0;
}

int Objecter::delete_pool(int64_t pool, Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "delete_pool " << pool << dendl;

  if (!osdmap->have_pg_pool(pool))
    return -ENOENT;

  _do_delete_pool(pool, onfinish);
  return 0;
}

int Objecter::delete_pool(const string &pool_name, Context *onfinish)
{
  unique_lock wl(rwlock);
  ldout(cct, 10) << "delete_pool " << pool_name << dendl;

  int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
  if (pool < 0)
    return pool;

  _do_delete_pool(pool, onfinish);
  return 0;
}

void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
{
  PoolOp *op = new PoolOp;
  op->tid = ++last_tid;
  op->pool = pool;
  op->name = "delete";
  op->onfinish = onfinish;
  op->pool_op = POOL_OP_DELETE;
  pool_ops[op->tid] = op;
  pool_op_submit(op);
}

void Objecter::pool_op_submit(PoolOp *op)
{
  // rwlock is locked
  if (mon_timeout > timespan(0)) {
    op->ontimeout = timer.add_event(mon_timeout,
				    [this, op]() {
				      pool_op_cancel(op->tid, -ETIMEDOUT); });
  }
  _pool_op_submit(op);
}

void Objecter::_pool_op_submit(PoolOp *op)
{
  // rwlock is locked unique

  ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
  MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
			   op->name, op->pool_op,
			   last_seen_osdmap_version);
  if (op->snapid) m->snapid = op->snapid;
  if (op->crush_rule) m->crush_rule = op->crush_rule;
  monc->send_mon_message(m);
  op->last_submit = ceph::coarse_mono_clock::now();

  logger->inc(l_osdc_poolop_send);
}
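
/*
 * Timeout pattern sketch (illustrative summary): every mon-bound op arms a
 * timer event that cancels the op with -ETIMEDOUT; on any other completion
 * the event itself is cancelled in _finish_pool_op().  The pool-stat and
 * statfs paths below use the same shape.
 *
 *   op->ontimeout = timer.add_event(mon_timeout,
 *                                   [this, op]() {
 *                                     pool_op_cancel(op->tid, -ETIMEDOUT); });
 *   ...
 *   if (op->ontimeout && r != -ETIMEDOUT)
 *     timer.cancel_event(op->ontimeout);
 */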

/**
 * Handle a reply to a PoolOp message. Check that we sent the message
 * and give the caller responsibility for the returned ceph::buffer::list.
 * Then either call the finisher or stash the PoolOp, depending on whether
 * we have a new enough map.
 * Lastly, clean up the message and PoolOp.
 */
void Objecter::handle_pool_op_reply(MPoolOpReply *m)
{
  FUNCTRACE(cct);
  shunique_lock sul(rwlock, acquire_shared);
  if (!initialized) {
    sul.unlock();
    m->put();
    return;
  }

  ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
  ceph_tid_t tid = m->get_tid();
  map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
  if (iter != pool_ops.end()) {
    PoolOp *op = iter->second;
    ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
		   << ceph_pool_op_name(op->pool_op) << dendl;
    if (op->blp)
      op->blp->claim(m->response_data);
    if (m->version > last_seen_osdmap_version)
      last_seen_osdmap_version = m->version;
    if (osdmap->get_epoch() < m->epoch) {
      sul.unlock();
      sul.lock();
      // recheck op existence since we have let go of rwlock
      // (for promotion) above.
      iter = pool_ops.find(tid);
      if (iter == pool_ops.end())
	goto done; // op is gone.
      if (osdmap->get_epoch() < m->epoch) {
	ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
		       << " before calling back" << dendl;
	_wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
      } else {
	// map epoch changed, probably because an MOSDMap message
	// sneaked in. Do the caller-specified callback now or else
	// we lose it forever.
	ceph_assert(op->onfinish);
	op->onfinish->complete(m->replyCode);
      }
    } else {
      ceph_assert(op->onfinish);
      op->onfinish->complete(m->replyCode);
    }
    op->onfinish = NULL;
    if (!sul.owns_lock()) {
      sul.unlock();
      sul.lock();
    }
    iter = pool_ops.find(tid);
    if (iter != pool_ops.end()) {
      _finish_pool_op(op, 0);
    }
  } else {
    ldout(cct, 10) << "unknown request " << tid << dendl;
  }

done:
  // Not strictly necessary, since we'll release it on return.
  sul.unlock();

  ldout(cct, 10) << "done" << dendl;
  m->put();
}

int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
  if (it == pool_ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
    return -ENOENT;
  }

  ldout(cct, 10) << __func__ << " tid " << tid << dendl;

  PoolOp *op = it->second;
  if (op->onfinish)
    op->onfinish->complete(r);

  _finish_pool_op(op, r);
  return 0;
}

void Objecter::_finish_pool_op(PoolOp *op, int r)
{
  // rwlock is locked unique
  pool_ops.erase(op->tid);
  logger->set(l_osdc_poolop_active, pool_ops.size());

  if (op->ontimeout && r != -ETIMEDOUT) {
    timer.cancel_event(op->ontimeout);
  }

  delete op;
}

// pool stats

void Objecter::get_pool_stats(list<string>& pools,
			      map<string,pool_stat_t> *result,
			      bool *per_pool,
			      Context *onfinish)
{
  ldout(cct, 10) << "get_pool_stats " << pools << dendl;

  PoolStatOp *op = new PoolStatOp;
  op->tid = ++last_tid;
  op->pools = pools;
  op->pool_stats = result;
  op->per_pool = per_pool;
  op->onfinish = onfinish;
  if (mon_timeout > timespan(0)) {
    op->ontimeout = timer.add_event(mon_timeout,
				    [this, op]() {
				      pool_stat_op_cancel(op->tid,
							  -ETIMEDOUT); });
  } else {
    op->ontimeout = 0;
  }

  unique_lock wl(rwlock);

  poolstat_ops[op->tid] = op;

  logger->set(l_osdc_poolstat_active, poolstat_ops.size());

  _poolstat_submit(op);
}
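
/*
 * Usage sketch (illustrative; "C_StatsDone" is a hypothetical Context
 * subclass): the result map and per_pool flag must stay alive until the
 * completion fires.
 *
 *   list<string> pools = {"rbd", "cephfs_data"};
 *   map<string, pool_stat_t> stats;
 *   bool per_pool = false;
 *   objecter->get_pool_stats(pools, &stats, &per_pool, new C_StatsDone);
 *   // on completion with r == 0: stats is filled in, and per_pool says
 *   // whether the mon reported exact per-pool values
 */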

void Objecter::_poolstat_submit(PoolStatOp *op)
{
  ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
  monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
					   op->pools,
					   last_seen_pgmap_version));
  op->last_submit = ceph::coarse_mono_clock::now();

  logger->inc(l_osdc_poolstat_send);
}

void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
{
  ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
  ceph_tid_t tid = m->get_tid();

  unique_lock wl(rwlock);
  if (!initialized) {
    m->put();
    return;
  }

  map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
  if (iter != poolstat_ops.end()) {
    PoolStatOp *op = iter->second;
    ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
    *op->pool_stats = m->pool_stats;
    *op->per_pool = m->per_pool;
    if (m->version > last_seen_pgmap_version) {
      last_seen_pgmap_version = m->version;
    }
    op->onfinish->complete(0);
    _finish_pool_stat_op(op, 0);
  } else {
    ldout(cct, 10) << "unknown request " << tid << dendl;
  }
  ldout(cct, 10) << "done" << dendl;
  m->put();
}

int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
  if (it == poolstat_ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
    return -ENOENT;
  }

  ldout(cct, 10) << __func__ << " tid " << tid << dendl;

  PoolStatOp *op = it->second;
  if (op->onfinish)
    op->onfinish->complete(r);
  _finish_pool_stat_op(op, r);
  return 0;
}

void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
{
  // rwlock is locked unique

  poolstat_ops.erase(op->tid);
  logger->set(l_osdc_poolstat_active, poolstat_ops.size());

  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  delete op;
}

void Objecter::get_fs_stats(ceph_statfs& result,
			    boost::optional<int64_t> data_pool,
			    Context *onfinish)
{
  ldout(cct, 10) << "get_fs_stats" << dendl;
  unique_lock l(rwlock);

  StatfsOp *op = new StatfsOp;
  op->tid = ++last_tid;
  op->stats = &result;
  op->data_pool = data_pool;
  op->onfinish = onfinish;
  if (mon_timeout > timespan(0)) {
    op->ontimeout = timer.add_event(mon_timeout,
				    [this, op]() {
				      statfs_op_cancel(op->tid,
						       -ETIMEDOUT); });
  } else {
    op->ontimeout = 0;
  }
  statfs_ops[op->tid] = op;

  logger->set(l_osdc_statfs_active, statfs_ops.size());

  _fs_stats_submit(op);
}
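
/*
 * Usage sketch (illustrative; "fin" is a hypothetical Context*): statfs
 * for the whole cluster, or scoped to a single data pool.
 *
 *   ceph_statfs stats;
 *   objecter->get_fs_stats(stats, boost::none, fin);
 *   // or, per-pool:
 *   objecter->get_fs_stats(stats, boost::optional<int64_t>(pool_id), fin);
 */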

void Objecter::_fs_stats_submit(StatfsOp *op)
{
  // rwlock is locked unique

  ldout(cct, 10) << "fs_stats_submit " << op->tid << dendl;
  monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
				     op->data_pool,
				     last_seen_pgmap_version));
  op->last_submit = ceph::coarse_mono_clock::now();

  logger->inc(l_osdc_statfs_send);
}

void Objecter::handle_fs_stats_reply(MStatfsReply *m)
{
  unique_lock wl(rwlock);
  if (!initialized) {
    m->put();
    return;
  }

  ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
  ceph_tid_t tid = m->get_tid();

  if (statfs_ops.count(tid)) {
    StatfsOp *op = statfs_ops[tid];
    ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
    *(op->stats) = m->h.st;
    if (m->h.version > last_seen_pgmap_version)
      last_seen_pgmap_version = m->h.version;
    op->onfinish->complete(0);
    _finish_statfs_op(op, 0);
  } else {
    ldout(cct, 10) << "unknown request " << tid << dendl;
  }
  m->put();
  ldout(cct, 10) << "done" << dendl;
}

int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
  if (it == statfs_ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
    return -ENOENT;
  }

  ldout(cct, 10) << __func__ << " tid " << tid << dendl;

  StatfsOp *op = it->second;
  if (op->onfinish)
    op->onfinish->complete(r);
  _finish_statfs_op(op, r);
  return 0;
}

void Objecter::_finish_statfs_op(StatfsOp *op, int r)
{
  // rwlock is locked unique

  statfs_ops.erase(op->tid);
  logger->set(l_osdc_statfs_active, statfs_ops.size());

  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  delete op;
}

// scatter/gather

void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
			       vector<ceph::buffer::list>& resultbl,
			       ceph::buffer::list *bl, Context *onfinish)
{
  // all done
  ldout(cct, 15) << "_sg_read_finish" << dendl;

  if (extents.size() > 1) {
    Striper::StripedReadResult r;
    vector<ceph::buffer::list>::iterator bit = resultbl.begin();
    for (vector<ObjectExtent>::iterator eit = extents.begin();
	 eit != extents.end();
	 ++eit, ++bit) {
      r.add_partial_result(cct, *bit, eit->buffer_extents);
    }
    bl->clear();
    r.assemble_result(cct, *bl, false);
  } else {
    ldout(cct, 15) << " only one frag" << dendl;
    bl->claim(resultbl[0]);
  }

  // done
  uint64_t bytes_read = bl->length();
  ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;

  if (onfinish) {
    onfinish->complete(bytes_read); // > 0 ? bytes_read : m->get_result()
  }
}
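
/*
 * Assembly sketch (illustrative): each ObjectExtent describes where an
 * object fragment lands in the logical buffer, and resultbl[i] holds the
 * data read for extents[i].  StripedReadResult stitches them back together:
 *
 *   extents:  [obj A, buffer_extents {0..4K}], [obj B, buffer_extents {4K..8K}]
 *   resultbl: [bl for A],                      [bl for B]
 *   -> assemble_result() yields one contiguous 8K buffer in *bl
 */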

void Objecter::ms_handle_connect(Connection *con)
{
  ldout(cct, 10) << "ms_handle_connect " << con << dendl;
  if (!initialized)
    return;

  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
    resend_mon_ops();
}

bool Objecter::ms_handle_reset(Connection *con)
{
  if (!initialized)
    return false;
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    unique_lock wl(rwlock);

    auto priv = con->get_priv();
    auto session = static_cast<OSDSession*>(priv.get());
    if (session) {
      ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
		    << " osd." << session->osd << dendl;
      // the session may already have been closed if a freshly handled
      // osdmap marked the osd down
      if (!(initialized && osdmap->is_up(session->osd))) {
	ldout(cct, 1) << "ms_handle_reset aborted, initialized="
		      << initialized << dendl;
	wl.unlock();
	return false;
      }
      map<uint64_t, LingerOp *> lresend;
      OSDSession::unique_lock sl(session->lock);
      _reopen_session(session);
      _kick_requests(session, lresend);
      sl.unlock();
      _linger_ops_resend(lresend, wl);
      wl.unlock();
      maybe_request_map();
    }
    return true;
  }
  return false;
}

void Objecter::ms_handle_remote_reset(Connection *con)
{
  /*
   * treat these the same.
   */
  ms_handle_reset(con);
}

bool Objecter::ms_handle_refused(Connection *con)
{
  // just log for now
  if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
    int osd = osdmap->identify_osd(con->get_peer_addr());
    if (osd >= 0) {
      ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
    }
  }
  return false;
}

void Objecter::op_target_t::dump(Formatter *f) const
{
  f->dump_stream("pg") << pgid;
  f->dump_int("osd", osd);
  f->dump_stream("object_id") << base_oid;
  f->dump_stream("object_locator") << base_oloc;
  f->dump_stream("target_object_id") << target_oid;
  f->dump_stream("target_object_locator") << target_oloc;
  f->dump_int("paused", (int)paused);
  f->dump_int("used_replica", (int)used_replica);
  f->dump_int("precalc_pgid", (int)precalc_pgid);
}

void Objecter::_dump_active(OSDSession *s)
{
  for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
       p != s->ops.end();
       ++p) {
    Op *op = p->second;
    ldout(cct, 20) << op->tid << "\t" << op->target.pgid
		   << "\tosd." << (op->session ? op->session->osd : -1)
		   << "\t" << op->target.base_oid
		   << "\t" << op->ops << dendl;
  }
}

void Objecter::_dump_active()
{
  ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
		 << dendl;
  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    _dump_active(s);
    sl.unlock();
  }
  _dump_active(homeless_session);
}

void Objecter::dump_active()
{
  shared_lock rl(rwlock);
  _dump_active();
  rl.unlock();
}

void Objecter::dump_requests(Formatter *fmt)
{
  // Read-lock on Objecter held here
  fmt->open_object_section("requests");
  dump_ops(fmt);
  dump_linger_ops(fmt);
  dump_pool_ops(fmt);
  dump_pool_stat_ops(fmt);
  dump_statfs_ops(fmt);
  dump_command_ops(fmt);
  fmt->close_section(); // requests object
}

void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
{
  for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
       p != s->ops.end();
       ++p) {
    Op *op = p->second;
    auto age = std::chrono::duration<double>(coarse_mono_clock::now() -
					     op->stamp);
    fmt->open_object_section("op");
    fmt->dump_unsigned("tid", op->tid);
    op->target.dump(fmt);
    fmt->dump_stream("last_sent") << op->stamp;
    fmt->dump_float("age", age.count());
    fmt->dump_int("attempts", op->attempts);
    fmt->dump_stream("snapid") << op->snapid;
    fmt->dump_stream("snap_context") << op->snapc;
    fmt->dump_stream("mtime") << op->mtime;

    fmt->open_array_section("osd_ops");
    for (vector<OSDOp>::const_iterator it = op->ops.begin();
	 it != op->ops.end();
	 ++it) {
      fmt->dump_stream("osd_op") << *it;
    }
    fmt->close_section(); // osd_ops array

    fmt->close_section(); // op object
  }
}

void Objecter::dump_ops(Formatter *fmt)
{
  // Read-lock on Objecter held
  fmt->open_array_section("ops");
  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    _dump_ops(s, fmt);
    sl.unlock();
  }
  _dump_ops(homeless_session, fmt);
  fmt->close_section(); // ops array
}

void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
{
  for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
       p != s->linger_ops.end();
       ++p) {
    LingerOp *op = p->second;
    fmt->open_object_section("linger_op");
    fmt->dump_unsigned("linger_id", op->linger_id);
    op->target.dump(fmt);
    fmt->dump_stream("snapid") << op->snap;
    fmt->dump_stream("registered") << op->registered;
    fmt->close_section(); // linger_op object
  }
}

void Objecter::dump_linger_ops(Formatter *fmt)
{
  // We have a read-lock on the objecter
  fmt->open_array_section("linger_ops");
  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    _dump_linger_ops(s, fmt);
    sl.unlock();
  }
  _dump_linger_ops(homeless_session, fmt);
  fmt->close_section(); // linger_ops array
}

void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
{
  for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
       p != s->command_ops.end();
       ++p) {
    CommandOp *op = p->second;
    fmt->open_object_section("command_op");
    fmt->dump_unsigned("command_id", op->tid);
    fmt->dump_int("osd", op->session ? op->session->osd : -1);
    fmt->open_array_section("command");
    for (vector<string>::const_iterator q = op->cmd.begin();
	 q != op->cmd.end(); ++q)
      fmt->dump_string("word", *q);
    fmt->close_section();
    if (op->target_osd >= 0)
      fmt->dump_int("target_osd", op->target_osd);
    else
      fmt->dump_stream("target_pg") << op->target_pg;
    fmt->close_section(); // command_op object
  }
}

void Objecter::dump_command_ops(Formatter *fmt)
{
  // We have a read-lock on the Objecter here
  fmt->open_array_section("command_ops");
  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    _dump_command_ops(s, fmt);
    sl.unlock();
  }
  _dump_command_ops(homeless_session, fmt);
  fmt->close_section(); // command_ops array
}

void Objecter::dump_pool_ops(Formatter *fmt) const
{
  fmt->open_array_section("pool_ops");
  for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
       p != pool_ops.end();
       ++p) {
    PoolOp *op = p->second;
    fmt->open_object_section("pool_op");
    fmt->dump_unsigned("tid", op->tid);
    fmt->dump_int("pool", op->pool);
    fmt->dump_string("name", op->name);
    fmt->dump_int("operation_type", op->pool_op);
    fmt->dump_unsigned("crush_rule", op->crush_rule);
    fmt->dump_stream("snapid") << op->snapid;
    fmt->dump_stream("last_sent") << op->last_submit;
    fmt->close_section(); // pool_op object
  }
  fmt->close_section(); // pool_ops array
}

void Objecter::dump_pool_stat_ops(Formatter *fmt) const
{
  fmt->open_array_section("pool_stat_ops");
  for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
       p != poolstat_ops.end();
       ++p) {
    PoolStatOp *op = p->second;
    fmt->open_object_section("pool_stat_op");
    fmt->dump_unsigned("tid", op->tid);
    fmt->dump_stream("last_sent") << op->last_submit;

    fmt->open_array_section("pools");
    for (list<string>::const_iterator it = op->pools.begin();
	 it != op->pools.end();
	 ++it) {
      fmt->dump_string("pool", *it);
    }
    fmt->close_section(); // pools array

    fmt->close_section(); // pool_stat_op object
  }
  fmt->close_section(); // pool_stat_ops array
}

void Objecter::dump_statfs_ops(Formatter *fmt) const
{
  fmt->open_array_section("statfs_ops");
  for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
       p != statfs_ops.end();
       ++p) {
    StatfsOp *op = p->second;
    fmt->open_object_section("statfs_op");
    fmt->dump_unsigned("tid", op->tid);
    fmt->dump_stream("last_sent") << op->last_submit;
    fmt->close_section(); // statfs_op object
  }
  fmt->close_section(); // statfs_ops array
}

Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
  m_objecter(objecter)
{
}

int Objecter::RequestStateHook::call(std::string_view command,
				     const cmdmap_t& cmdmap,
				     Formatter *f,
				     std::ostream& ss,
				     ceph::buffer::list& out)
{
  shared_lock rl(m_objecter->rwlock);
  m_objecter->dump_requests(f);
  return 0;
}
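
/*
 * Introspection sketch (illustrative): this hook backs an admin-socket
 * command (the registration happens in Objecter::init(), outside this
 * excerpt, typically under the name "objecter_requests"), so in-flight ops
 * can be inspected from a shell; the socket path below is illustrative:
 *
 *   ceph daemon /var/run/ceph/ceph-client.admin.asok objecter_requests
 *
 * The output is the JSON produced by dump_requests() above.
 */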

void Objecter::blacklist_self(bool set)
{
  ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;

  vector<string> cmd;
  cmd.push_back("{\"prefix\":\"osd blacklist\", ");
  if (set)
    cmd.push_back("\"blacklistop\":\"add\",");
  else
    cmd.push_back("\"blacklistop\":\"rm\",");
  stringstream ss;
  // this is somewhat imprecise in that we are blacklisting our first addr only
  ss << messenger->get_myaddrs().front().get_legacy_str();
  cmd.push_back("\"addr\":\"" + ss.str() + "\"");

  MMonCommand *m = new MMonCommand(monc->get_fsid());
  m->cmd = cmd;

  monc->send_mon_message(m);
}
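
/*
 * For "set", the pieces above concatenate into a mon command like the
 * following (address illustrative):
 *
 *   {"prefix":"osd blacklist", "blacklistop":"add", "addr":"10.0.0.1:0/1234"}
 */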

// commands

void Objecter::handle_command_reply(MCommandReply *m)
{
  unique_lock wl(rwlock);
  if (!initialized) {
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  OSDSession::shared_lock sl(s->lock);
  map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
  if (p == s->command_ops.end()) {
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
		   << " not found" << dendl;
    m->put();
    sl.unlock();
    return;
  }

  CommandOp *c = p->second;
  if (!c->session ||
      m->get_connection() != c->session->con) {
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
		   << " got reply from wrong connection "
		   << m->get_connection() << " " << m->get_source_inst()
		   << dendl;
    m->put();
    sl.unlock();
    return;
  }
  if (m->r == -EAGAIN) {
    ldout(cct, 10) << __func__ << " tid " << m->get_tid()
		   << " got EAGAIN, requesting map and resending" << dendl;
    // NOTE: This might resend twice... once now, and once again when
    // we get an updated osdmap and the PG is found to have moved.
    _maybe_request_map();
    _send_command(c);
    m->put();
    sl.unlock();
    return;
  }

  if (c->poutbl) {
    c->poutbl->claim(m->get_data());
  }

  sl.unlock();

  OSDSession::unique_lock sul(s->lock);
  _finish_command(c, m->r, m->rs);
  sul.unlock();

  m->put();
}

void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
{
  shunique_lock sul(rwlock, ceph::acquire_unique);

  ceph_tid_t tid = ++last_tid;
  ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
  c->tid = tid;

  {
    OSDSession::unique_lock hs_wl(homeless_session->lock);
    _session_command_op_assign(homeless_session, c);
  }

  _calc_command_target(c, sul);
  _assign_command_session(c, sul);
  if (osd_timeout > timespan(0)) {
    c->ontimeout = timer.add_event(osd_timeout,
				   [this, c, tid]() {
				     command_op_cancel(c->session, tid,
						       -ETIMEDOUT); });
  }

  if (!c->session->is_homeless()) {
    _send_command(c);
  } else {
    _maybe_request_map();
  }
  if (c->map_check_error)
    _send_command_map_check(c);
  *ptid = tid;

  logger->inc(l_osdc_command_active);
}
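
/*
 * Call-path sketch (illustrative; the wrapper names and CommandOp setup are
 * assumptions, defined outside this excerpt): callers normally reach
 * submit_command() through thin wrappers (e.g. osd_command()/pg_command())
 * that fill in a CommandOp with either target_osd or target_pg set, the
 * command vector, and the output sinks:
 *
 *   ceph_tid_t tid;
 *   CommandOp *c = ...;   // target, cmd, inbl, poutbl, prs, onfinish set
 *   submit_command(c, &tid);
 *   // on timeout, the event armed above fires:
 *   //   command_op_cancel(c->session, tid, -ETIMEDOUT)
 */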

int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  c->map_check_error = 0;

  // ignore overlays, just like we do with pg ops
  c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;

  if (c->target_osd >= 0) {
    if (!osdmap->exists(c->target_osd)) {
      c->map_check_error = -ENOENT;
      c->map_check_error_str = "osd dne";
      c->target.osd = -1;
      return RECALC_OP_TARGET_OSD_DNE;
    }
    if (osdmap->is_down(c->target_osd)) {
      c->map_check_error = -ENXIO;
      c->map_check_error_str = "osd down";
      c->target.osd = -1;
      return RECALC_OP_TARGET_OSD_DOWN;
    }
    c->target.osd = c->target_osd;
  } else {
    int ret = _calc_target(&(c->target), nullptr, true);
    if (ret == RECALC_OP_TARGET_POOL_DNE) {
      c->map_check_error = -ENOENT;
      c->map_check_error_str = "pool dne";
      c->target.osd = -1;
      return ret;
    } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
      c->map_check_error = -ENXIO;
      c->map_check_error_str = "osd down";
      c->target.osd = -1;
      return ret;
    }
  }

  OSDSession *s;
  int r = _get_session(c->target.osd, &s, sul);
  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */

  if (c->session != s) {
    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }

  put_session(s);

  ldout(cct, 20) << __func__ << " " << c->tid << " no change, "
		 << c->session << dendl;

  return RECALC_OP_TARGET_NO_ACTION;
}

void Objecter::_assign_command_session(CommandOp *c,
				       shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  OSDSession *s;
  int r = _get_session(c->target.osd, &s, sul);
  ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */

  if (c->session != s) {
    if (c->session) {
      OSDSession *cs = c->session;
      OSDSession::unique_lock csl(cs->lock);
      _session_command_op_remove(c->session, c);
      csl.unlock();
    }
    OSDSession::unique_lock sl(s->lock);
    _session_command_op_assign(s, c);
  }

  put_session(s);
}

void Objecter::_send_command(CommandOp *c)
{
  ldout(cct, 10) << "_send_command " << c->tid << dendl;
  ceph_assert(c->session);
  ceph_assert(c->session->con);
  MCommand *m = new MCommand(monc->monmap.fsid);
  m->cmd = c->cmd;
  m->set_data(c->inbl);
  m->set_tid(c->tid);
  c->session->con->send_message(m);
  logger->inc(l_osdc_command_send);
}

int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
  if (it == s->command_ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
    return -ENOENT;
  }

  ldout(cct, 10) << __func__ << " tid " << tid << dendl;

  CommandOp *op = it->second;
  _command_cancel_map_check(op);
  OSDSession::unique_lock sl(op->session->lock);
  _finish_command(op, r, "");
  sl.unlock();
  return 0;
}

void Objecter::_finish_command(CommandOp *c, int r, string rs)
{
  // rwlock is locked unique
  // session lock is locked

  ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
		 << rs << dendl;
  if (c->prs)
    *c->prs = rs;
  if (c->onfinish)
    c->onfinish->complete(r);

  if (c->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(c->ontimeout);

  _session_command_op_remove(c->session, c);

  c->put();

  logger->dec(l_osdc_command_active);
}

Objecter::OSDSession::~OSDSession()
{
  // Caller is responsible for re-assigning or
  // destroying any ops that were assigned to us
  ceph_assert(ops.empty());
  ceph_assert(linger_ops.empty());
  ceph_assert(command_ops.empty());
}

Objecter::Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
		   Finisher *fin) :
  Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
  trace_endpoint("0.0.0.0", 0, "Objecter"),
  osdmap{std::make_unique<OSDMap>()},
  homeless_session(new OSDSession(cct, -1)),
  op_throttle_bytes(cct, "objecter_bytes",
		    cct->_conf->objecter_inflight_op_bytes),
  op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
  retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
{
  mon_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
  osd_timeout = cct->_conf.get_val<std::chrono::seconds>("rados_osd_op_timeout");
}

Objecter::~Objecter()
{
  ceph_assert(homeless_session->get_nref() == 1);
  ceph_assert(num_homeless_ops == 0);
  homeless_session->put();

  ceph_assert(osd_sessions.empty());
  ceph_assert(poolstat_ops.empty());
  ceph_assert(statfs_ops.empty());
  ceph_assert(pool_ops.empty());
  ceph_assert(waiting_for_map.empty());
  ceph_assert(linger_ops.empty());
  ceph_assert(check_latest_map_lingers.empty());
  ceph_assert(check_latest_map_ops.empty());
  ceph_assert(check_latest_map_commands.empty());

  ceph_assert(!m_request_state_hook);
  ceph_assert(!logger);
}

/**
 * Wait until this OSD map epoch is received before
 * sending any more operations to OSDs. Use this
 * when it is known that the client can't trust
 * anything from before this epoch (e.g. due to
 * a client blacklist at this epoch).
 */
void Objecter::set_epoch_barrier(epoch_t epoch)
{
  unique_lock wl(rwlock);

  ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
		<< epoch_barrier << ") current epoch " << osdmap->get_epoch()
		<< dendl;
  if (epoch > epoch_barrier) {
    epoch_barrier = epoch;
    _maybe_request_map();
  }
}
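
/*
 * Usage sketch (illustrative): after learning it was blacklisted at
 * epoch E, a client fences itself so that no op is sent under a stale map:
 *
 *   objecter->set_epoch_barrier(E);
 *   // ops submitted from now on wait until the osdmap reaches epoch E
 */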


hobject_t Objecter::enumerate_objects_begin()
{
  return hobject_t();
}

hobject_t Objecter::enumerate_objects_end()
{
  return hobject_t::get_max();
}

struct C_EnumerateReply : public Context {
  ceph::buffer::list bl;

  Objecter *objecter;
  hobject_t *next;
  std::list<librados::ListObjectImpl> *result;
  const hobject_t end;
  const int64_t pool_id;
  Context *on_finish;

  epoch_t epoch;
  int budget;

  C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
		   std::list<librados::ListObjectImpl> *result_,
		   const hobject_t end_, const int64_t pool_id_,
		   Context *on_finish_) :
    objecter(objecter_), next(next_), result(result_),
    end(end_), pool_id(pool_id_), on_finish(on_finish_),
    epoch(0), budget(-1)
  {}

  void finish(int r) override {
    objecter->_enumerate_reply(
      bl, r, end, pool_id, budget, epoch, result, next, on_finish);
  }
};

void Objecter::enumerate_objects(
  int64_t pool_id,
  const std::string &ns,
  const hobject_t &start,
  const hobject_t &end,
  const uint32_t max,
  const ceph::buffer::list &filter_bl,
  std::list<librados::ListObjectImpl> *result,
  hobject_t *next,
  Context *on_finish)
{
  ceph_assert(result);

  if (!end.is_max() && start > end) {
    lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  if (max < 1) {
    lderr(cct) << __func__ << ": result size may not be zero" << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  if (start.is_max()) {
    on_finish->complete(0);
    return;
  }

  shared_lock rl(rwlock);
  ceph_assert(osdmap->get_epoch());
  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
    rl.unlock();
    lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
    on_finish->complete(-EOPNOTSUPP);
    return;
  }
  const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
  if (!p) {
    lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
	       << osdmap->get_epoch() << dendl;
    rl.unlock();
    on_finish->complete(-ENOENT);
    return;
  } else {
    rl.unlock();
  }

  ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;

  // Stash completion state
  C_EnumerateReply *on_ack = new C_EnumerateReply(
    this, next, result, end, pool_id, on_finish);

  ObjectOperation op;
  op.pg_nls(max, filter_bl, start, 0);

  // Issue. See you later in _enumerate_reply
  object_locator_t oloc(pool_id, ns);
  pg_read(start.get_hash(), oloc, op,
	  &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
}
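
/*
 * Usage sketch (illustrative; "on_done" is a hypothetical Context*): one
 * page of enumeration.  result and next must outlive the completion.
 *
 *   std::list<librados::ListObjectImpl> result;
 *   hobject_t next;
 *   ceph::buffer::list filter;     // empty: no filter
 *   objecter->enumerate_objects(pool_id, "",   // "" is the default namespace
 *                               objecter->enumerate_objects_begin(),
 *                               objecter->enumerate_objects_end(),
 *                               1000, filter, &result, &next, on_done);
 *   // when on_done fires with r == 0: consume result; if next is not
 *   // enumerate_objects_end(), repeat with start = next.
 */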

void Objecter::_enumerate_reply(
  ceph::buffer::list &bl,
  int r,
  const hobject_t &end,
  const int64_t pool_id,
  int budget,
  epoch_t reply_epoch,
  std::list<librados::ListObjectImpl> *result,
  hobject_t *next,
  Context *on_finish)
{
  if (budget >= 0) {
    put_op_budget_bytes(budget);
  }

  if (r < 0) {
    ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
    on_finish->complete(r);
    return;
  }

  ceph_assert(next != NULL);

  // Decode the results
  auto iter = bl.cbegin();
  pg_nls_response_t response;

  decode(response, iter);
  if (!iter.end()) {
    // extra_info isn't used anywhere. We do this solely to preserve
    // backward compatibility
    ceph::buffer::list legacy_extra_info;
    decode(legacy_extra_info, iter);
  }

  ldout(cct, 10) << __func__ << ": got " << response.entries.size()
		 << " handle " << response.handle
		 << " reply_epoch " << reply_epoch << dendl;
  ldout(cct, 20) << __func__ << ": response.entries.size "
		 << response.entries.size() << ", response.entries "
		 << response.entries << dendl;
  if (response.handle <= end) {
    *next = response.handle;
  } else {
    ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
		   << dendl;
    *next = end;

    // drop anything after 'end'
    shared_lock rl(rwlock);
    const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
    if (!pool) {
      // pool is gone, drop any results which are now meaningless.
      rl.unlock();
      on_finish->complete(-ENOENT);
      return;
    }
    while (!response.entries.empty()) {
      uint32_t hash = response.entries.back().locator.empty() ?
	pool->hash_key(response.entries.back().oid,
		       response.entries.back().nspace) :
	pool->hash_key(response.entries.back().locator,
		       response.entries.back().nspace);
      hobject_t last(response.entries.back().oid,
		     response.entries.back().locator,
		     CEPH_NOSNAP,
		     hash,
		     pool_id,
		     response.entries.back().nspace);
      if (last < end)
	break;
      ldout(cct, 20) << __func__ << " dropping item " << last
		     << " >= end " << end << dendl;
      response.entries.pop_back();
    }
    rl.unlock();
  }
  if (!response.entries.empty()) {
    result->merge(response.entries);
  }

  // release the listing context's budget once all
  // OPs (in the session) are finished
#if 0
  put_nlist_context_budget(list_context);
#endif
  on_finish->complete(r);
  return;
}

namespace {
  using namespace librados;

  template <typename T>
  void do_decode(std::vector<T>& items, std::vector<ceph::buffer::list>& bls)
  {
    for (auto bl : bls) {
      auto p = bl.cbegin();
      T t;
      decode(t, p);
      items.push_back(t);
    }
  }

  struct C_ObjectOperation_scrub_ls : public Context {
    ceph::buffer::list bl;
    uint32_t *interval;
    std::vector<inconsistent_obj_t> *objects = nullptr;
    std::vector<inconsistent_snapset_t> *snapsets = nullptr;
    int *rval;

    C_ObjectOperation_scrub_ls(uint32_t *interval,
			       std::vector<inconsistent_obj_t> *objects,
			       int *rval)
      : interval(interval), objects(objects), rval(rval) {}
    C_ObjectOperation_scrub_ls(uint32_t *interval,
			       std::vector<inconsistent_snapset_t> *snapsets,
			       int *rval)
      : interval(interval), snapsets(snapsets), rval(rval) {}
    void finish(int r) override {
      if (r < 0 && r != -EAGAIN) {
	if (rval)
	  *rval = r;
	return;
      }

      if (rval)
	*rval = 0;

      try {
	decode();
      } catch (ceph::buffer::error&) {
	if (rval)
	  *rval = -EIO;
      }
    }
  private:
    void decode() {
      scrub_ls_result_t result;
      auto p = bl.cbegin();
      result.decode(p);
      *interval = result.interval;
      if (objects) {
	do_decode(*objects, result.vals);
      } else {
	do_decode(*snapsets, result.vals);
      }
    }
  };

  template <typename T>
  void do_scrub_ls(::ObjectOperation *op,
		   const scrub_ls_arg_t& arg,
		   std::vector<T> *items,
		   uint32_t *interval,
		   int *rval)
  {
    OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
    op->flags |= CEPH_OSD_FLAG_PGOP;
    ceph_assert(interval);
    arg.encode(osd_op.indata);
    unsigned p = op->ops.size() - 1;
    auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
    op->out_handler[p] = h;
    op->out_bl[p] = &h->bl;
    op->out_rval[p] = rval;
  }
}

void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
				 uint64_t max_to_get,
				 std::vector<librados::inconsistent_obj_t> *objects,
				 uint32_t *interval,
				 int *rval)
{
  scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
  do_scrub_ls(this, arg, objects, interval, rval);
}

void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
				 uint64_t max_to_get,
				 std::vector<librados::inconsistent_snapset_t> *snapsets,
				 uint32_t *interval,
				 int *rval)
{
  scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
  do_scrub_ls(this, arg, snapsets, interval, rval);
}
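
/*
 * Usage sketch (illustrative; submission details are simplified): listing
 * scrub-detected inconsistencies one page at a time.  The interval returned
 * by the OSD identifies the scrub interval; passing it back in ties
 * subsequent pages to the same interval.
 *
 *   ::ObjectOperation op;
 *   std::vector<librados::inconsistent_obj_t> objects;
 *   uint32_t interval = 0;   // encoded into the request via scrub_ls_arg_t
 *   int rval = 0;
 *   op.scrub_ls(librados::object_id_t(), 100, &objects, &interval, &rval);
 *   // submit 'op' as a PG op against the PG being queried; on completion
 *   // rval == 0 means interval and objects were decoded successfully.
 */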