// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "Objecter.h"
#include "osd/OSDMap.h"
#include "Filer.h"

#include "mon/MonClient.h"

#include "msg/Messenger.h"
#include "msg/Message.h"

#include "messages/MPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDBackoff.h"
#include "messages/MOSDMap.h"

#include "messages/MPoolOp.h"
#include "messages/MPoolOpReply.h"

#include "messages/MGetPoolStats.h"
#include "messages/MGetPoolStatsReply.h"
#include "messages/MStatfs.h"
#include "messages/MStatfsReply.h"

#include "messages/MMonCommand.h"

#include "messages/MCommand.h"
#include "messages/MCommandReply.h"

#include "messages/MWatchNotify.h"

#include <errno.h>

#include "common/config.h"
#include "common/perf_counters.h"
#include "common/scrub_types.h"
#include "include/str_list.h"
#include "common/errno.h"
#include "common/EventTrace.h"

using ceph::real_time;
using ceph::real_clock;

using ceph::mono_clock;
using ceph::mono_time;

using ceph::timespan;


#define dout_subsys ceph_subsys_objecter
#undef dout_prefix
#define dout_prefix *_dout << messenger->get_myname() << ".objecter "


enum {
  l_osdc_first = 123200,
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,

  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};


// config obs ----------------------------

static const char *config_keys[] = {
  "crush_location",
  NULL
};

class Objecter::RequestStateHook : public AdminSocketHook {
  Objecter *m_objecter;
public:
  explicit RequestStateHook(Objecter *objecter);
  bool call(std::string_view command, const cmdmap_t& cmdmap,
	    std::string_view format, bufferlist& out) override;
};

/**
 * This is a more limited form of C_Contexts, but that requires
 * a ceph_context which we don't have here.
 */
class ObjectOperation::C_TwoContexts : public Context {
  Context *first;
  Context *second;
public:
  C_TwoContexts(Context *first, Context *second) :
    first(first), second(second) {}
  void finish(int r) override {
    first->complete(r);
    second->complete(r);
    first = NULL;
    second = NULL;
  }

  ~C_TwoContexts() override {
    delete first;
    delete second;
  }
};

void ObjectOperation::add_handler(Context *extra) {
  size_t last = out_handler.size() - 1;
  Context *orig = out_handler[last];
  if (orig) {
    Context *wrapper = new C_TwoContexts(orig, extra);
    out_handler[last] = wrapper;
  } else {
    out_handler[last] = extra;
  }
}
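
// Illustrative sketch (not part of the original source): add_handler()
// piggy-backs an extra completion on the most recently added op.  If that
// op already installed an out-handler, both are chained via C_TwoContexts
// and fire in order.  MyExtraDone below is a hypothetical caller-defined
// Context subclass:
//
//   ObjectOperation op;
//   op.stat(&size, &mtime, nullptr);       // op may install an out-handler
//   op.add_handler(new MyExtraDone(...));  // chained after the original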

Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
  object_t& oid)
{
  if (oid.name.empty())
    return unique_completion_lock();

  static constexpr uint32_t HASH_PRIME = 1021;
  uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
    % HASH_PRIME;

  return unique_completion_lock(completion_locks[h % num_locks],
				std::defer_lock);
}
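
// Note (assumption, not original source): reducing the name hash modulo the
// prime 1021 before taking "% num_locks" presumably spreads objects more
// evenly across the completion_locks array when num_locks is a power of
// two, since the final reduction would otherwise depend only on the hash's
// low-order bits.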

const char** Objecter::get_tracked_conf_keys() const
{
  return config_keys;
}


void Objecter::handle_conf_change(const ConfigProxy& conf,
				  const std::set <std::string> &changed)
{
  if (changed.count("crush_location")) {
    update_crush_location();
  }
}

void Objecter::update_crush_location()
{
  unique_lock wl(rwlock);
  crush_location = cct->crush_location.get_location();
}

// messages ------------------------------

/*
 * initialize only internal data structures, don't initiate cluster interaction
 */
void Objecter::init()
{
  ceph_assert(!initialized);

  if (!logger) {
    PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);

    pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
		PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
    pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(UNIT_BYTES));
    pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");

    pcb.add_u64_counter(l_osdc_op, "op", "Operations");
    pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
			PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
			"rdwr", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");

    pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
    pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
			"Create object operations");
    pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
    pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
    pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
			"Write full object operations");
    pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
			"Write same operations");
    pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
			"Append operation");
    pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
			"Set object to zero operations");
    pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
			"Truncate object operations");
    pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
			"Delete object operations");
    pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
			"Map extent operations");
    pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
			"Sparse read operations");
    pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
			"Clone range operations");
    pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
			"Get xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
			"Set xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
			"Xattr comparison operations");
    pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
			"Remove xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
			"Reset xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
			"Call (execute) operations");
    pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
			"Watch by object operations");
    pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
			"Notify about object operations");
    pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
			"Extended attribute comparison in multi operations");
    pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
    pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
    pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");

    pcb.add_u64(l_osdc_linger_active, "linger_active",
		"Active lingering operations");
    pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
			"Sent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
			"Resent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
			"Sent pings to lingering operations");

    pcb.add_u64(l_osdc_poolop_active, "poolop_active",
		"Active pool operations");
    pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
			"Sent pool operations");
    pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
			"Resent pool operations");

    pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
		"Active get pool stat operations");
    pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
			"Pool stat operations sent");
    pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
			"Resent pool stats");

    pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
    pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
    pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
			"Resent FS stats");

    pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
    pcb.add_u64_counter(l_osdc_command_send, "command_send",
			"Sent commands");
    pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
			"Resent commands");

    pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
    pcb.add_u64_counter(l_osdc_map_full, "map_full",
			"Full OSD maps received");
    pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
			"Incremental OSD maps received");

    pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
		"Open sessions");  // open sessions
    pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
			"Sessions opened");
    pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
			"Sessions closed");
    pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");

    pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
			"OSD OMAP write operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
			"OSD OMAP read operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
			"OSD OMAP delete operations");

    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  m_request_state_hook = new RequestStateHook(this);
  AdminSocket* admin_socket = cct->get_admin_socket();
  int ret = admin_socket->register_command("objecter_requests",
					   "objecter_requests",
					   m_request_state_hook,
					   "show in-progress osd requests");

  /* Don't warn on EEXIST, happens if multiple ceph clients
   * are instantiated from one process */
  if (ret < 0 && ret != -EEXIST) {
    lderr(cct) << "error registering admin socket command: "
	       << cpp_strerror(ret) << dendl;
  }

  update_crush_location();

  cct->_conf.add_observer(this);

  initialized = true;
}
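
// Illustrative usage (assumed invocation, not part of the original source):
// once init() has registered the hook, in-flight requests can be dumped
// through the admin socket of the process embedding this Objecter, e.g.
//
//   $ ceph daemon /var/run/ceph/ceph-client.<name>.asok objecter_requests
//
// which ends up in RequestStateHook::call().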

/*
 * ok, cluster interaction can happen
 */
void Objecter::start(const OSDMap* o)
{
  shared_lock rl(rwlock);

  start_tick();
  if (o) {
    osdmap->deepish_copy_from(*o);
  } else if (osdmap->get_epoch() == 0) {
    _maybe_request_map();
  }
}

void Objecter::shutdown()
{
  ceph_assert(initialized);

  unique_lock wl(rwlock);

  initialized = false;

  wl.unlock();
  cct->_conf.remove_observer(this);
  wl.lock();

  map<int,OSDSession*>::iterator p;
  while (!osd_sessions.empty()) {
    p = osd_sessions.begin();
    close_session(p->second);
  }

  while(!check_latest_map_lingers.empty()) {
    map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while(!check_latest_map_ops.empty()) {
    map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while(!check_latest_map_commands.empty()) {
    map<ceph_tid_t, CommandOp*>::iterator i
      = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  while(!poolstat_ops.empty()) {
    map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while(!statfs_ops.empty()) {
    map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while(!pool_ops.empty()) {
    map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
  while(!homeless_session->linger_ops.empty()) {
    std::map<uint64_t, LingerOp*>::iterator i
      = homeless_session->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    LingerOp *lop = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_linger_op_remove(homeless_session, lop);
    }
    linger_ops.erase(lop->linger_id);
    linger_ops_set.erase(lop);
    lop->put();
  }

  while(!homeless_session->ops.empty()) {
    std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    Op *op = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_op_remove(homeless_session, op);
    }
    op->put();
  }

  while(!homeless_session->command_ops.empty()) {
    std::map<ceph_tid_t, CommandOp*>::iterator i
      = homeless_session->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    CommandOp *cop = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_command_op_remove(homeless_session, cop);
    }
    cop->put();
  }

  if (tick_event) {
    if (timer.cancel_event(tick_event)) {
      ldout(cct, 10) << " successfully canceled tick" << dendl;
    }
    tick_event = 0;
  }

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }

  // Let go of Objecter write lock so timer thread can shutdown
  wl.unlock();

  // Outside of lock to avoid cycle WRT calls to RequestStateHook
  // This is safe because we guarantee no concurrent calls to
  // shutdown() with the ::initialized check at start.
  if (m_request_state_hook) {
    AdminSocket* admin_socket = cct->get_admin_socket();
    admin_socket->unregister_command("objecter_requests");
    delete m_request_state_hook;
    m_request_state_hook = NULL;
  }
}

void Objecter::_send_linger(LingerOp *info,
			    shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  vector<OSDOp> opv;
  Context *oncommit = NULL;
  LingerOp::shared_lock watchl(info->watch_lock);
  bufferlist *poutbl = NULL;
  if (info->registered && info->is_watch) {
    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
		   << dendl;
    opv.push_back(OSDOp());
    opv.back().op.op = CEPH_OSD_OP_WATCH;
    opv.back().op.watch.cookie = info->get_cookie();
    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
    opv.back().op.watch.gen = ++info->register_gen;
    oncommit = new C_Linger_Reconnect(this, info);
  } else {
    ldout(cct, 15) << "send_linger " << info->linger_id << " register"
		   << dendl;
    opv = info->ops;
    C_Linger_Commit *c = new C_Linger_Commit(this, info);
    if (!info->is_watch) {
      info->notify_id = 0;
      poutbl = &c->outbl;
    }
    oncommit = c;
  }
  watchl.unlock();
  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
		 opv, info->target.flags | CEPH_OSD_FLAG_READ,
		 oncommit, info->pobjver);
  o->outbl = poutbl;
  o->snapid = info->snap;
  o->snapc = info->snapc;
  o->mtime = info->mtime;

  o->target = info->target;
  o->tid = ++last_tid;

  // do not resend this; we will send a new op to reregister
  o->should_resend = false;
  o->ctx_budgeted = true;

  if (info->register_tid) {
    // repeat send.  cancel old registration op, if any.
    OSDSession::unique_lock sl(info->session->lock);
    if (info->session->ops.count(info->register_tid)) {
      Op *o = info->session->ops[info->register_tid];
      _op_cancel_map_check(o);
      _cancel_linger_op(o);
    }
    sl.unlock();
  }

  _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);

  logger->inc(l_osdc_linger_send);
}

void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
{
  LingerOp::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
  if (info->on_reg_commit) {
    info->on_reg_commit->complete(r);
    info->on_reg_commit = NULL;
  }
  if (r < 0 && info->on_notify_finish) {
    info->on_notify_finish->complete(r);
    info->on_notify_finish = nullptr;
  }

  // only tell the user the first time we do this
  info->registered = true;
  info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    auto p = outbl.cbegin();
    try {
      decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
		     << dendl;
    }
    catch (buffer::error& e) {
    }
  }
}

struct C_DoWatchError : public Context {
  Objecter *objecter;
  Objecter::LingerOp *info;
  int err;
  C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
    : objecter(o), info(i), err(r) {
    info->get();
    info->_queued_async();
  }
  void finish(int r) override {
    Objecter::unique_lock wl(objecter->rwlock);
    bool canceled = info->canceled;
    wl.unlock();

    if (!canceled) {
      info->watch_context->handle_error(info->get_cookie(), err);
    }

    info->finished_async();
    info->put();
  }
};

int Objecter::_normalize_watch_error(int r)
{
  // translate ENOENT -> ENOTCONN so that a delete->disconnection
  // notification and a failure to reconnect because we raced with
  // the delete appear the same to the user.
  if (r == -ENOENT)
    r = -ENOTCONN;
  return r;
}

void Objecter::_linger_reconnect(LingerOp *info, int r)
{
  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
		 << " (last_error " << info->last_error << ")" << dendl;
  if (r < 0) {
    LingerOp::unique_lock wl(info->watch_lock);
    if (!info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
	finisher->queue(new C_DoWatchError(this, info, r));
      }
    }
    wl.unlock();
  }
}

void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  if (cct->_conf->objecter_inject_no_watch_ping) {
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
		   << dendl;
    return;
  }
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  ceph::coarse_mono_time now = ceph::coarse_mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
		 << dendl;

  vector<OSDOp> opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen;
  C_Linger_Ping *onack = new C_Linger_Ping(this, info);
  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
		 opv, info->target.flags | CEPH_OSD_FLAG_READ,
		 onack, NULL, NULL);
  o->target = info->target;
  o->should_resend = false;
  _send_op_account(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o);
  info->ping_tid = o->tid;

  onack->sent = now;
  logger->inc(l_osdc_linger_ping);
}

void Objecter::_linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
			    uint32_t register_gen)
{
  LingerOp::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " sent " << sent << " gen " << register_gen << " = " << r
		 << " (last_error " << info->last_error
		 << " register_gen " << info->register_gen << ")" << dendl;
  if (info->register_gen == register_gen) {
    if (r == 0) {
      info->watch_valid_thru = sent;
    } else if (r < 0 && !info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
	finisher->queue(new C_DoWatchError(this, info, r));
      }
    }
  } else {
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}

int Objecter::linger_check(LingerOp *info)
{
  LingerOp::shared_lock l(info->watch_lock);

  ceph::coarse_mono_time stamp = info->watch_valid_thru;
  if (!info->watch_pending_async.empty())
    stamp = std::min(info->watch_valid_thru, info->watch_pending_async.front());
  auto age = ceph::coarse_mono_clock::now() - stamp;

  ldout(cct, 10) << __func__ << " " << info->linger_id
		 << " err " << info->last_error
		 << " age " << age << dendl;
  if (info->last_error)
    return info->last_error;
  // return a safe upper bound (we are truncating to ms)
  return
    1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
}
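
// Illustrative note (assumption, not original source): a caller can treat
// the return value as "milliseconds since the watch was last confirmed
// valid", or as the sticky error if negative, e.g.:
//
//   int r = objecter->linger_check(info);
//   if (r < 0) {
//     // watch is broken (e.g. -ENOTCONN after a delete or disconnect)
//   } else {
//     // watch was confirmed valid within the last r milliseconds
//   }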

void Objecter::linger_cancel(LingerOp *info)
{
  unique_lock wl(rwlock);
  _linger_cancel(info);
  info->put();
}

void Objecter::_linger_cancel(LingerOp *info)
{
  // rwlock is locked unique
  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
  if (!info->canceled) {
    OSDSession *s = info->session;
    OSDSession::unique_lock sl(s->lock);
    _session_linger_op_remove(s, info);
    sl.unlock();

    linger_ops.erase(info->linger_id);
    linger_ops_set.erase(info);
    ceph_assert(linger_ops.size() == linger_ops_set.size());

    info->canceled = true;
    info->put();

    logger->dec(l_osdc_linger_active);
  }
}



Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
					      const object_locator_t& oloc,
					      int flags)
{
  LingerOp *info = new LingerOp(this);
  info->target.base_oid = oid;
  info->target.base_oloc = oloc;
  if (info->target.base_oloc.key == oid)
    info->target.base_oloc.key.clear();
  info->target.flags = flags;
  info->watch_valid_thru = ceph::coarse_mono_clock::now();

  unique_lock l(rwlock);

  // Acquire linger ID
  info->linger_id = ++max_linger_id;
  ldout(cct, 10) << __func__ << " info " << info
		 << " linger_id " << info->linger_id
		 << " cookie " << info->get_cookie()
		 << dendl;
  linger_ops[info->linger_id] = info;
  linger_ops_set.insert(info);
  ceph_assert(linger_ops.size() == linger_ops_set.size());

  info->get(); // for the caller
  return info;
}

ceph_tid_t Objecter::linger_watch(LingerOp *info,
				  ObjectOperation& op,
				  const SnapContext& snapc,
				  real_time mtime,
				  bufferlist& inbl,
				  Context *oncommit,
				  version_t *objver)
{
  info->is_watch = true;
  info->snapc = snapc;
  info->mtime = mtime;
  info->target.flags |= CEPH_OSD_FLAG_WRITE;
  info->ops = op.ops;
  info->inbl = inbl;
  info->poutbl = NULL;
  info->pobjver = objver;
  info->on_reg_commit = oncommit;

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  return info->linger_id;
}

ceph_tid_t Objecter::linger_notify(LingerOp *info,
				   ObjectOperation& op,
				   snapid_t snap, bufferlist& inbl,
				   bufferlist *poutbl,
				   Context *onfinish,
				   version_t *objver)
{
  info->snap = snap;
  info->target.flags |= CEPH_OSD_FLAG_READ;
  info->ops = op.ops;
  info->inbl = inbl;
  info->poutbl = poutbl;
  info->pobjver = objver;
  info->on_reg_commit = onfinish;

  info->ctx_budget = take_linger_budget(info);

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  return info->linger_id;
}

void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
  ceph_assert(info->linger_id);
  ceph_assert(info->ctx_budget != -1); // caller needs to have taken budget already!

  // Populate Op::target
  OSDSession *s = NULL;
  _calc_target(&info->target, nullptr);

  // Create LingerOp<->OSDSession relation
  int r = _get_session(info->target.osd, &s, sul);
  ceph_assert(r == 0);
  OSDSession::unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  put_session(s);

  _send_linger(info, sul);
}

struct C_DoWatchNotify : public Context {
  Objecter *objecter;
  Objecter::LingerOp *info;
  MWatchNotify *msg;
  C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->get();
    info->_queued_async();
    msg->get();
  }
  void finish(int r) override {
    objecter->_do_watch_notify(info, msg);
  }
};

void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  LingerOp::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    if (!info->last_error) {
      info->last_error = -ENOTCONN;
      if (info->watch_context) {
	finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
	info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
		     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->notify_result_bl->claim(m->get_data());
      info->on_notify_finish->complete(m->return_code);

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
      info->on_notify_finish = NULL;
    }
  } else {
    finisher->queue(new C_DoWatchNotify(this, info, m));
  }
}

void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;

  shared_lock l(rwlock);
  ceph_assert(initialized);

  if (info->canceled) {
    l.unlock();
    goto out;
  }

  // notify completion?
  ceph_assert(info->is_watch);
  ceph_assert(info->watch_context);
  ceph_assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);

  l.unlock();

  switch (m->opcode) {
  case CEPH_WATCH_EVENT_NOTIFY:
    info->watch_context->handle_notify(m->notify_id, m->cookie,
				       m->notifier_gid, m->bl);
    break;
  }

 out:
  info->finished_async();
  info->put();
  m->put();
}

bool Objecter::ms_dispatch(Message *m)
{
  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
  switch (m->get_type()) {
    // these we exclusively handle
  case CEPH_MSG_OSD_OPREPLY:
    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
    return true;

  case CEPH_MSG_OSD_BACKOFF:
    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
    return true;

  case CEPH_MSG_WATCH_NOTIFY:
    handle_watch_notify(static_cast<MWatchNotify*>(m));
    m->put();
    return true;

  case MSG_COMMAND_REPLY:
    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    } else {
      return false;
    }

  case MSG_GETPOOLSTATSREPLY:
    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
    return true;

  case CEPH_MSG_POOLOP_REPLY:
    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
    return true;

  case CEPH_MSG_STATFS_REPLY:
    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
    return true;

    // these we give others a chance to inspect

    // MDS, OSD
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    return false;
  }
  return false;
}
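
// Note (illustrative, not original source): ms_dispatch() follows the
// Messenger convention that returning true marks the message as consumed,
// while returning false lets other dispatchers (e.g. the MDS or OSD that
// embeds this Objecter) inspect it too -- which is why CEPH_MSG_OSD_MAP is
// handled here but still returns false.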

void Objecter::_scan_requests(
  OSDSession *s,
  bool skipped_map,
  bool cluster_full,
  map<int64_t, bool> *pool_full_map,
  map<ceph_tid_t, Op*>& need_resend,
  list<LingerOp*>& need_resend_linger,
  map<ceph_tid_t, CommandOp*>& need_resend_command,
  shunique_lock& sul,
  const mempool::osdmap::map<int64_t,OSDMap::snap_interval_set_t> *gap_removed_snaps)
{
  ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);

  list<LingerOp*> unregister_lingers;

  OSDSession::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    LingerOp *op = lp->second;
    ceph_assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
	ldout(cct, 10) << " need to unregister linger op "
		       << op->linger_id << dendl;
	op->get();
	unregister_lingers.push_back(op);
      }
      break;
    }
  }

  // check for changed request mappings
  map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    _prune_snapc(osdmap->get_new_removed_snaps(), op);
    if (skipped_map) {
      _prune_snapc(*gap_removed_snaps, op);
    }
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
			 op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!skipped_map && !(force_resend_writes && op->respects_full()))
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      _session_op_remove(op->session, op);
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    }
  }

  // commands
  map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    CommandOp *c = cp->second;
    ++cp;
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
	(*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!skipped_map && !force_resend_writes)
	break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      _session_command_op_remove(c->session, c);
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  sl.unlock();

  for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put();
  }
}

void Objecter::handle_osd_map(MOSDMap *m)
{
  shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  ceph_assert(osdmap);

  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
		  << " != " << monc->get_fsid() << dendl;
    return;
  }

  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (map<int64_t, pg_pool_t>::const_iterator it
	 = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);


  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
		  << m->get_first() << "," << m->get_last()
		  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
	   e <= m->get_last();
	   e++) {

	if (osdmap->get_epoch() == e-1 &&
	    m->incremental_maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
			<< dendl;
	  OSDMap::Incremental inc(m->incremental_maps[e]);
	  osdmap->apply_incremental(inc);

	  emit_blacklist_events(inc);

	  logger->inc(l_osdc_map_inc);
	}
	else if (m->maps.count(e)) {
	  ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
	  OSDMap *new_osdmap = new OSDMap();
	  new_osdmap->decode(m->maps[e]);

	  emit_blacklist_events(*osdmap, *new_osdmap);

	  osdmap = new_osdmap;

	  logger->inc(l_osdc_map_full);
	}
	else {
	  if (e >= m->get_oldest()) {
	    ldout(cct, 3) << "handle_osd_map requesting missing epoch "
			  << osdmap->get_epoch()+1 << dendl;
	    _maybe_request_map();
	    break;
	  }
	  ldout(cct, 3) << "handle_osd_map missing epoch "
			<< osdmap->get_epoch()+1
			<< ", jumping to " << m->get_oldest() << dendl;
	  e = m->get_oldest() - 1;
	  skipped_map = true;
	  continue;
	}
	logger->set(l_osdc_map_epoch, osdmap->get_epoch());

	cluster_full = cluster_full || _osdmap_full_flag();
	update_pool_full_map(pool_full_map);

	// check all outstanding requests on every epoch
	for (auto& i : need_resend) {
	  _prune_snapc(osdmap->get_new_removed_snaps(), i.second);
	  if (skipped_map) {
	    _prune_snapc(m->gap_removed_snaps, i.second);
	  }
	}
	_scan_requests(homeless_session, skipped_map, cluster_full,
		       &pool_full_map, need_resend,
		       need_resend_linger, need_resend_command, sul,
		       &m->gap_removed_snaps);
	for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
	     p != osd_sessions.end(); ) {
	  OSDSession *s = p->second;
	  _scan_requests(s, skipped_map, cluster_full,
			 &pool_full_map, need_resend,
			 need_resend_linger, need_resend_command, sul,
			 &m->gap_removed_snaps);
	  ++p;
	  // osd down or addr change?
	  if (!osdmap->is_up(s->osd) ||
	      (s->con &&
	       s->con->get_peer_addrs() != osdmap->get_addrs(s->osd))) {
	    close_session(s);
	  }
	}

	ceph_assert(e == osdmap->get_epoch());
      }

    } else {
      // first map.  we want the full thing.
      if (m->maps.count(m->get_last())) {
	for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
	     p != osd_sessions.end(); ++p) {
	  OSDSession *s = p->second;
	  _scan_requests(s, false, false, NULL, need_resend,
			 need_resend_linger, need_resend_command, sul,
			 nullptr);
	}
	ldout(cct, 3) << "handle_osd_map decoding full epoch "
		      << m->get_last() << dendl;
	osdmap->decode(m->maps[m->get_last()]);

	_scan_requests(homeless_session, false, false, NULL,
		       need_resend, need_resend_linger,
		       need_resend_command, sul, nullptr);
      } else {
	ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
		      << dendl;
	monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
	monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
	p = need_resend.erase(p);
	_check_op_pool_dne(op, nullptr);
      } else {
	++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused?
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    _maybe_request_map();
  }

  // resend requests
  for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
       p != need_resend.end(); ++p) {
    Op *op = p->second;
    OSDSession *s = op->session;
    bool mapped_session = false;
    if (!s) {
      int r = _map_session(&op->target, &s, sul);
      ceph_assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    OSDSession::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      if (!op->session->is_homeless() && !op->target.paused) {
	logger->inc(l_osdc_op_resend);
	_send_op(op);
      }
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s);
  }
  for (list<LingerOp*>::iterator p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    ceph_assert(op->session);
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    CommandOp *c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
	_send_command(c);
      }
    }
  }

  _dump_active();

  // finish any Contexts that were waiting on a map update
  map<epoch_t,list< pair< Context*, int > > >::iterator p =
    waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
	 p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (list<pair<Context*, int> >::iterator i = p->second.begin();
	 i != p->second.end(); ++i) {
      i->first->complete(i->second);
    }
    waiting_for_map.erase(p++);
  }

  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}
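
// Illustrative summary (assumption, not original source): handle_osd_map()
// walks forward one epoch at a time, preferring incrementals
// (apply_incremental) over full maps, and sets skipped_map when it has to
// jump across a trimmed gap.  _scan_requests() then queues for resend any
// op whose target may have moved, and gap_removed_snaps patches snap
// contexts for the epochs that were never decoded locally.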

void Objecter::enable_blacklist_events()
{
  unique_lock wl(rwlock);

  blacklist_events_enabled = true;
}

void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
{
  unique_lock wl(rwlock);

  if (events->empty()) {
    events->swap(blacklist_events);
  } else {
    for (const auto &i : blacklist_events) {
      events->insert(i);
    }
    blacklist_events.clear();
  }
}

void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
{
  if (!blacklist_events_enabled) {
    return;
  }

  for (const auto &i : inc.new_blacklist) {
    blacklist_events.insert(i.first);
  }
}

void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
				     const OSDMap &new_osd_map)
{
  if (!blacklist_events_enabled) {
    return;
  }

  std::set<entity_addr_t> old_set;
  std::set<entity_addr_t> new_set;

  old_osd_map.get_blacklist(&old_set);
  new_osd_map.get_blacklist(&new_set);

  std::set<entity_addr_t> delta_set;
  std::set_difference(
    new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
    std::inserter(delta_set, delta_set.begin()));
  blacklist_events.insert(delta_set.begin(), delta_set.end());
}
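
// Note (illustrative, not original source): consume_blacklist_events()
// swap()s when the caller's set is empty, handing over the accumulated
// events without copying; only a non-empty caller set forces element-wise
// insertion.  Callers presumably poll with a fresh, empty set.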

// op pool check

void Objecter::C_Op_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED)
    return;

  lgeneric_subdout(objecter->cct, objecter, 10)
    << "op_map_latest r=" << r << " tid=" << tid
    << " latest " << latest << dendl;

  Objecter::unique_lock wl(objecter->rwlock);

  map<ceph_tid_t, Op*>::iterator iter =
    objecter->check_latest_map_ops.find(tid);
  if (iter == objecter->check_latest_map_ops.end()) {
    lgeneric_subdout(objecter->cct, objecter, 10)
      << "op_map_latest op "<< tid << " not found" << dendl;
    return;
  }

  Op *op = iter->second;
  objecter->check_latest_map_ops.erase(iter);

  lgeneric_subdout(objecter->cct, objecter, 20)
    << "op_map_latest op "<< op << dendl;

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  OSDSession::unique_lock sl(op->session->lock, defer_lock);
  objecter->_check_op_pool_dne(op, &sl);

  op->put();
}

int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
				snapid_t *snap) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  for (auto p = pg_pool.snaps.begin();
       p != pg_pool.snaps.end();
       ++p) {
    if (p->second.name == snap_name) {
      *snap = p->first;
      return 0;
    }
  }
  return -ENOENT;
}

int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
				 pool_snap_info_t *info) const
{
  shared_lock rl(rwlock);

  auto& pools = osdmap->get_pools();
  auto iter = pools.find(poolid);
  if (iter == pools.end()) {
    return -ENOENT;
  }
  const pg_pool_t& pg_pool = iter->second;
  auto p = pg_pool.snaps.find(snap);
  if (p == pg_pool.snaps.end())
    return -ENOENT;
  *info = p->second;

  return 0;
}

int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
{
  shared_lock rl(rwlock);

  const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
  if (!pi)
    return -ENOENT;
  for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
       p != pi->snaps.end();
       ++p) {
    snaps->push_back(p->first);
  }
  return 0;
}

// sl may be unlocked.
void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
{
  // rwlock is locked unique

  if (op->target.pool_ever_existed) {
    // the pool previously existed and now it does not, which means it
    // was deleted.
    op->map_dne_bound = osdmap->get_epoch();
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " pool previously exists but now does not"
		   << dendl;
  } else {
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // we had a new enough map
      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
		     << " concluding pool " << op->target.base_pgid.pool()
		     << " dne" << dendl;
      if (op->onfinish) {
	num_in_flight--;
	op->onfinish->complete(-ENOENT);
      }

      OSDSession *s = op->session;
      if (s) {
	ceph_assert(s != NULL);
	ceph_assert(sl->mutex() == &s->lock);
	bool session_locked = sl->owns_lock();
	if (!session_locked) {
	  sl->lock();
	}
	_finish_op(op, 0);
	if (!session_locked) {
	  sl->unlock();
	}
      } else {
	_finish_op(op, 0); // no session
      }
    }
  } else {
    _send_op_map_check(op);
  }
}
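
// Illustrative note (assumption, not original source): map_dne_bound is the
// first OSDMap epoch in which the pool is known not to exist.  Once the
// locally decoded map reaches that epoch the op can safely be failed with
// -ENOENT; until then the monitor is asked (via _send_op_map_check) for the
// latest epoch so the bound can be established.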

void Objecter::_send_op_map_check(Op *op)
{
  // rwlock is locked unique
  // ask the monitor
  if (check_latest_map_ops.count(op->tid) == 0) {
    op->get();
    check_latest_map_ops[op->tid] = op;
    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }
}

void Objecter::_op_cancel_map_check(Op *op)
{
  // rwlock is locked unique
  map<ceph_tid_t, Op*>::iterator iter =
    check_latest_map_ops.find(op->tid);
  if (iter != check_latest_map_ops.end()) {
    Op *op = iter->second;
    op->put();
    check_latest_map_ops.erase(iter);
  }
}

// linger pool check

void Objecter::C_Linger_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  map<uint64_t, LingerOp*>::iterator iter =
    objecter->check_latest_map_lingers.find(linger_id);
  if (iter == objecter->check_latest_map_lingers.end()) {
    return;
  }

  LingerOp *op = iter->second;
  objecter->check_latest_map_lingers.erase(iter);

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;

  bool unregister;
  objecter->_check_linger_pool_dne(op, &unregister);

  if (unregister) {
    objecter->_linger_cancel(op);
  }

  op->put();
}

void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
  // rwlock is locked unique

  *need_unregister = false;

  if (op->register_gen > 0) {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " pool previously existed but now does not"
		   << dendl;
    op->map_dne_bound = osdmap->get_epoch();
  } else {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
		   << " current " << osdmap->get_epoch()
		   << " map_dne_bound " << op->map_dne_bound
		   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      LingerOp::unique_lock wl{op->watch_lock};
      if (op->on_reg_commit) {
	op->on_reg_commit->complete(-ENOENT);
	op->on_reg_commit = nullptr;
      }
      if (op->on_notify_finish) {
	op->on_notify_finish->complete(-ENOENT);
	op->on_notify_finish = nullptr;
      }
      *need_unregister = true;
    }
  } else {
    _send_linger_map_check(op);
  }
}

void Objecter::_send_linger_map_check(LingerOp *op)
{
  // ask the monitor
  if (check_latest_map_lingers.count(op->linger_id) == 0) {
    op->get();
    check_latest_map_lingers[op->linger_id] = op;
    C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }
}

void Objecter::_linger_cancel_map_check(LingerOp *op)
{
  // rwlock is locked unique

  map<uint64_t, LingerOp*>::iterator iter =
    check_latest_map_lingers.find(op->linger_id);
  if (iter != check_latest_map_lingers.end()) {
    LingerOp *op = iter->second;
    op->put();
    check_latest_map_lingers.erase(iter);
  }
}

// command pool check

void Objecter::C_Command_Map_Latest::finish(int r)
{
  if (r == -EAGAIN || r == -ECANCELED) {
    // ignore callback; we will retry in resend_mon_ops()
    return;
  }

  unique_lock wl(objecter->rwlock);

  map<uint64_t, CommandOp*>::iterator iter =
    objecter->check_latest_map_commands.find(tid);
  if (iter == objecter->check_latest_map_commands.end()) {
    return;
  }

  CommandOp *c = iter->second;
  objecter->check_latest_map_commands.erase(iter);

  if (c->map_dne_bound == 0)
    c->map_dne_bound = latest;

  OSDSession::unique_lock sul(c->session->lock);
  objecter->_check_command_map_dne(c);
  sul.unlock();

  c->put();
}

void Objecter::_check_command_map_dne(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
		 << " current " << osdmap->get_epoch()
		 << " map_dne_bound " << c->map_dne_bound
		 << dendl;
  if (c->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= c->map_dne_bound) {
      _finish_command(c, c->map_check_error, c->map_check_error_str);
    }
  } else {
    _send_command_map_check(c);
  }
}

void Objecter::_send_command_map_check(CommandOp *c)
{
  // rwlock is locked unique
  // session is locked unique

  // ask the monitor
  if (check_latest_map_commands.count(c->tid) == 0) {
    c->get();
    check_latest_map_commands[c->tid] = c;
    C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
    monc->get_version("osdmap", &f->latest, NULL, f);
  }
}

void Objecter::_command_cancel_map_check(CommandOp *c)
{
  // rwlock is locked unique

  map<uint64_t, CommandOp*>::iterator iter =
    check_latest_map_commands.find(c->tid);
  if (iter != check_latest_map_commands.end()) {
    CommandOp *c = iter->second;
    c->put();
    check_latest_map_commands.erase(iter);
  }
}
1756
1757
1758/**
1759 * Look up OSDSession by OSD id.
1760 *
1761 * @returns 0 on success, or -EAGAIN if the lock context requires
1762 * promotion to write.
1763 */
1764int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
1765{
11fdf7f2 1766 ceph_assert(sul && sul.mutex() == &rwlock);
7c673cae
FG
1767
1768 if (osd < 0) {
1769 *session = homeless_session;
1770 ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
1771 << dendl;
1772 return 0;
1773 }
1774
1775 map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
1776 if (p != osd_sessions.end()) {
1777 OSDSession *s = p->second;
1778 s->get();
1779 *session = s;
1780 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1781 << s->get_nref() << dendl;
1782 return 0;
1783 }
1784 if (!sul.owns_lock()) {
1785 return -EAGAIN;
1786 }
1787 OSDSession *s = new OSDSession(cct, osd);
1788 osd_sessions[osd] = s;
11fdf7f2
TL
1789 s->con = messenger->connect_to_osd(osdmap->get_addrs(osd));
1790 s->con->set_priv(RefCountedPtr{s});
7c673cae
FG
1791 logger->inc(l_osdc_osd_session_open);
1792 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1793 s->get();
1794 *session = s;
1795 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1796 << s->get_nref() << dendl;
1797 return 0;
1798}
1799
1800void Objecter::put_session(Objecter::OSDSession *s)
1801{
1802 if (s && !s->is_homeless()) {
1803 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1804 << s->get_nref() << dendl;
1805 s->put();
1806 }
1807}
1808
1809void Objecter::get_session(Objecter::OSDSession *s)
1810{
11fdf7f2 1811 ceph_assert(s != NULL);
7c673cae
FG
1812
1813 if (!s->is_homeless()) {
1814 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1815 << s->get_nref() << dendl;
1816 s->get();
1817 }
1818}
1819
1820void Objecter::_reopen_session(OSDSession *s)
1821{
91327a77 1822 // rwlock is locked unique
7c673cae
FG
1823 // s->lock is locked
1824
11fdf7f2 1825 auto addrs = osdmap->get_addrs(s->osd);
7c673cae 1826 ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
11fdf7f2 1827 << addrs << dendl;
7c673cae
FG
1828 if (s->con) {
1829 s->con->set_priv(NULL);
1830 s->con->mark_down();
1831 logger->inc(l_osdc_osd_session_close);
1832 }
11fdf7f2
TL
1833 s->con = messenger->connect_to_osd(addrs);
1834 s->con->set_priv(RefCountedPtr{s});
7c673cae
FG
1835 s->incarnation++;
1836 logger->inc(l_osdc_osd_session_open);
1837}

void Objecter::close_session(OSDSession *s)
{
  // rwlock is locked unique

  ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
  if (s->con) {
    s->con->set_priv(NULL);
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  OSDSession::unique_lock sl(s->lock);

  std::list<LingerOp*> homeless_lingers;
  std::list<CommandOp*> homeless_commands;
  std::list<Op*> homeless_ops;

  while (!s->linger_ops.empty()) {
    std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    homeless_lingers.push_back(i->second);
    _session_linger_op_remove(s, i->second);
  }

  while (!s->ops.empty()) {
    std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    homeless_ops.push_back(i->second);
    _session_op_remove(s, i->second);
  }

  while (!s->command_ops.empty()) {
    std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    homeless_commands.push_back(i->second);
    _session_command_op_remove(s, i->second);
  }

  osd_sessions.erase(s->osd);
  sl.unlock();
  put_session(s);

  // Assign any leftover ops to the homeless session
  {
    OSDSession::unique_lock hsl(homeless_session->lock);
    for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
         i != homeless_lingers.end(); ++i) {
      _session_linger_op_assign(homeless_session, *i);
    }
    for (std::list<Op*>::iterator i = homeless_ops.begin();
         i != homeless_ops.end(); ++i) {
      _session_op_assign(homeless_session, *i);
    }
    for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
         i != homeless_commands.end(); ++i) {
      _session_command_op_assign(homeless_session, *i);
    }
  }

  logger->set(l_osdc_osd_sessions, osd_sessions.size());
}

void Objecter::wait_for_osd_map()
{
  unique_lock l(rwlock);
  if (osdmap->get_epoch()) {
    l.unlock();
    return;
  }

  // Leave this since it goes with C_SafeCond
  Mutex lock("");
  Cond cond;
  bool done;
  lock.Lock();
  C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL);
  waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
  l.unlock();
  while (!done)
    cond.Wait(lock);
  lock.Unlock();
}

struct C_Objecter_GetVersion : public Context {
  Objecter *objecter;
  uint64_t oldest, newest;
  Context *fin;
  C_Objecter_GetVersion(Objecter *o, Context *c)
    : objecter(o), oldest(0), newest(0), fin(c) {}
  void finish(int r) override {
    if (r >= 0) {
      objecter->get_latest_version(oldest, newest, fin);
    } else if (r == -EAGAIN) { // try again as instructed
      objecter->wait_for_latest_osdmap(fin);
    } else {
      // it doesn't return any other error codes!
      ceph_abort();
    }
  }
};

void Objecter::wait_for_latest_osdmap(Context *fin)
{
  ldout(cct, 10) << __func__ << dendl;
  C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
  monc->get_version("osdmap", &c->newest, &c->oldest, c);
}

void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
{
  unique_lock wl(rwlock);
  if (osdmap->get_epoch() >= newest) {
    ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
    wl.unlock();
    if (fin)
      fin->complete(0);
    return;
  }

  ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
  _wait_for_new_map(fin, newest, 0);
}

void Objecter::maybe_request_map()
{
  shared_lock rl(rwlock);
  _maybe_request_map();
}

void Objecter::_maybe_request_map()
{
  // rwlock is locked
  int flag = 0;
  if (_osdmap_full_flag()
      || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
      || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
    ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
      "osd map (FULL or pause flag is set)" << dendl;
  } else {
    ldout(cct, 10)
      << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
    flag = CEPH_SUBSCRIBE_ONETIME;
  }
  epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
  if (monc->sub_want("osdmap", epoch, flag)) {
    monc->renew_subs();
  }
}

void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
{
  // rwlock is locked unique
  waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
  _maybe_request_map();
}


/**
 * Use this together with wait_for_map: this is a pre-check to avoid
 * allocating a Context for wait_for_map if we can see that we
 * definitely already have the epoch.
 *
 * This does *not* replace the need to handle the return value of
 * wait_for_map: just because we don't have it in this pre-check
 * doesn't mean we won't have it when calling back into wait_for_map,
 * since the objecter lock is dropped in between.
 */
bool Objecter::have_map(const epoch_t epoch)
{
  shared_lock rl(rwlock);
  if (osdmap->get_epoch() >= epoch) {
    return true;
  } else {
    return false;
  }
}

bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
{
  unique_lock wl(rwlock);
  if (osdmap->get_epoch() >= epoch) {
    return true;
  }
  _wait_for_new_map(c, epoch, err);
  return false;
}
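
// Illustrative sketch (not part of the build): the precheck-then-wait pattern
// the doc comment above describes. `on_map` and the free-function framing are
// hypothetical; note the re-check, since the lock is dropped between calls.
#if 0
void example_wait_for_epoch(Objecter *objecter, epoch_t epoch, Context *on_map)
{
  if (objecter->have_map(epoch)) {
    on_map->complete(0);   // cheap path: no Context registration needed
    return;
  }
  // wait_for_map() returns true if the map arrived in the meantime, in
  // which case on_map was *not* queued and the caller must complete it
  if (objecter->wait_for_map(epoch, on_map, 0)) {
    on_map->complete(0);
  }
}
#endif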

void Objecter::_kick_requests(OSDSession *session,
                              map<uint64_t, LingerOp *>& lresend)
{
  // rwlock is locked unique

  // clear backoffs
  session->backoffs.clear();
  session->backoffs_by_id.clear();

  // resend ops
  map<ceph_tid_t,Op*> resend;  // resend in tid order
  for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
       p != session->ops.end();) {
    Op *op = p->second;
    ++p;
    if (op->should_resend) {
      if (!op->target.paused)
        resend[op->tid] = op;
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
  }

  logger->inc(l_osdc_op_resend, resend.size());
  while (!resend.empty()) {
    _send_op(resend.begin()->second);
    resend.erase(resend.begin());
  }

  // resend lingers
  logger->inc(l_osdc_linger_resend, session->linger_ops.size());
  for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
       j != session->linger_ops.end(); ++j) {
    LingerOp *op = j->second;
    op->get();
    ceph_assert(lresend.count(j->first) == 0);
    lresend[j->first] = op;
  }

  // resend commands
  logger->inc(l_osdc_command_resend, session->command_ops.size());
  map<uint64_t,CommandOp*> cresend;  // resend in order
  for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
       k != session->command_ops.end(); ++k) {
    cresend[k->first] = k->second;
  }
  while (!cresend.empty()) {
    _send_command(cresend.begin()->second);
    cresend.erase(cresend.begin());
  }
}
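
// Illustrative sketch (not part of the build): why the loop above stages ops
// into a std::map before resending. Resends must go out in tid order, and an
// ordered map drained from begin() yields exactly that while letting the loop
// skip non-resendable ops first. `send_op_somehow` is a hypothetical sender.
#if 0
void example_tid_ordered_resend(std::map<ceph_tid_t, Objecter::Op*>& session_ops)
{
  std::map<ceph_tid_t, Objecter::Op*> resend;  // sorted by tid
  for (auto& [tid, op] : session_ops)
    if (op->should_resend && !op->target.paused)
      resend[tid] = op;
  while (!resend.empty()) {                    // drain lowest tid first
    send_op_somehow(resend.begin()->second);
    resend.erase(resend.begin());
  }
}
#endif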

void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
                                  unique_lock& ul)
{
  ceph_assert(ul.owns_lock());
  shunique_lock sul(std::move(ul));
  while (!lresend.empty()) {
    LingerOp *op = lresend.begin()->second;
    if (!op->canceled) {
      _send_linger(op, sul);
    }
    op->put();
    lresend.erase(lresend.begin());
  }
  ul = sul.release_to_unique();
}

void Objecter::start_tick()
{
  ceph_assert(tick_event == 0);
  tick_event =
    timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
                    &Objecter::tick, this);
}

void Objecter::tick()
{
  shared_lock rl(rwlock);

  ldout(cct, 10) << "tick" << dendl;

  // we are only called by C_Tick
  tick_event = 0;

  if (!initialized) {
    // we raced with shutdown
    ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
    return;
  }

  set<OSDSession*> toping;


  // look for laggy requests
  auto cutoff = ceph::coarse_mono_clock::now();
  cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout

  unsigned laggy_ops = 0;

  for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::lock_guard l(s->lock);
    bool found = false;
    for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
         p != s->ops.end();
         ++p) {
      Op *op = p->second;
      ceph_assert(op->session);
      if (op->stamp < cutoff) {
        ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
                      << " is laggy" << dendl;
        found = true;
        ++laggy_ops;
      }
    }
    for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
         p != s->linger_ops.end();
         ++p) {
      LingerOp *op = p->second;
      LingerOp::unique_lock wl(op->watch_lock);
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
      if (op->is_watch && op->registered && !op->last_error)
        _send_linger_ping(op);
    }
    for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
         p != s->command_ops.end();
         ++p) {
      CommandOp *op = p->second;
      ceph_assert(op->session);
      ldout(cct, 10) << " pinging osd that serves command tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
    }
    if (found)
      toping.insert(s);
  }
  if (num_homeless_ops || !toping.empty()) {
    _maybe_request_map();
  }

  logger->set(l_osdc_op_laggy, laggy_ops);
  logger->set(l_osdc_osd_laggy, toping.size());

  if (!toping.empty()) {
    // send a ping to these osds, to ensure we detect any session resets
    // (osd reply message policy is lossy)
    for (set<OSDSession*>::const_iterator i = toping.begin();
         i != toping.end();
         ++i) {
      (*i)->con->send_message(new MPing);
    }
  }

  // Make sure we don't reschedule if we wake up after shutdown
  if (initialized) {
    tick_event = timer.reschedule_me(ceph::make_timespan(
                                       cct->_conf->objecter_tick_interval));
  }
}

void Objecter::resend_mon_ops()
{
  unique_lock wl(rwlock);

  ldout(cct, 10) << "resend_mon_ops" << dendl;

  for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
       p != poolstat_ops.end();
       ++p) {
    _poolstat_submit(p->second);
    logger->inc(l_osdc_poolstat_resend);
  }

  for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
       p != statfs_ops.end();
       ++p) {
    _fs_stats_submit(p->second);
    logger->inc(l_osdc_statfs_resend);
  }

  for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
       p != pool_ops.end();
       ++p) {
    _pool_op_submit(p->second);
    logger->inc(l_osdc_poolop_resend);
  }

  for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
       p != check_latest_map_ops.end();
       ++p) {
    C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }

  for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
       p != check_latest_map_lingers.end();
       ++p) {
    C_Linger_Map_Latest *c
      = new C_Linger_Map_Latest(this, p->second->linger_id);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }

  for (map<uint64_t, CommandOp*>::iterator p
         = check_latest_map_commands.begin();
       p != check_latest_map_commands.end();
       ++p) {
    C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
    monc->get_version("osdmap", &c->latest, NULL, c);
  }
}

// read | write ---------------------------

void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
{
  shunique_lock rl(rwlock, ceph::acquire_shared);
  ceph_tid_t tid = 0;
  if (!ptid)
    ptid = &tid;
  op->trace.event("op submit");
  _op_submit_with_budget(op, rl, ptid, ctx_budget);
}

void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
                                      ceph_tid_t *ptid,
                                      int *ctx_budget)
{
  ceph_assert(initialized);

  ceph_assert(op->ops.size() == op->out_bl.size());
  ceph_assert(op->ops.size() == op->out_rval.size());
  ceph_assert(op->ops.size() == op->out_handler.size());

  // throttle. before we look at any state, because
  // _take_op_budget() may drop our lock while it blocks.
  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
    int op_budget = _take_op_budget(op, sul);
    // take and pass out the budget for the first OP
    // in the context session
    if (ctx_budget && (*ctx_budget == -1)) {
      *ctx_budget = op_budget;
    }
  }

  if (osd_timeout > timespan(0)) {
    if (op->tid == 0)
      op->tid = ++last_tid;
    auto tid = op->tid;
    op->ontimeout = timer.add_event(osd_timeout,
                                    [this, tid]() {
                                      op_cancel(tid, -ETIMEDOUT); });
  }

  _op_submit(op, sul, ptid);
}
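
// Illustrative sketch (not part of the build): the arm/disarm pairing used
// above for per-op timeouts, in isolation. `example_timer` is a hypothetical
// timer instance; _finish_op() performs the cancel_event() side when the
// reply wins the race against the timeout.
#if 0
void example_op_timeout(Objecter *objecter,
                        ceph::timer<ceph::coarse_mono_clock>& example_timer,
                        ceph::timespan timeout, ceph_tid_t tid)
{
  // arm: fire op_cancel(tid, -ETIMEDOUT) if no reply arrives in time
  auto event = example_timer.add_event(timeout, [objecter, tid]() {
      objecter->op_cancel(tid, -ETIMEDOUT);
    });
  // disarm: what _finish_op() does for any completion with r != -ETIMEDOUT
  example_timer.cancel_event(event);
}
#endif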

void Objecter::_send_op_account(Op *op)
{
  inflight_ops++;

  // add to gather set(s)
  if (op->onfinish) {
    num_in_flight++;
  } else {
    ldout(cct, 20) << " note: not requesting reply" << dendl;
  }

  logger->inc(l_osdc_op_active);
  logger->inc(l_osdc_op);

  if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
      (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
    logger->inc(l_osdc_op_rmw);
  else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
    logger->inc(l_osdc_op_w);
  else if (op->target.flags & CEPH_OSD_FLAG_READ)
    logger->inc(l_osdc_op_r);

  if (op->target.flags & CEPH_OSD_FLAG_PGOP)
    logger->inc(l_osdc_op_pg);

  for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
    int code = l_osdc_osdop_other;
    switch (p->op.op) {
    case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
    case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
    case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
    case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
    case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
    case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
    case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
    case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
    case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
    case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
    case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
    case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
    case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
    case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
    case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
    case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
    case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;

    // OMAP read operations
    case CEPH_OSD_OP_OMAPGETVALS:
    case CEPH_OSD_OP_OMAPGETKEYS:
    case CEPH_OSD_OP_OMAPGETHEADER:
    case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;

    // OMAP write operations
    case CEPH_OSD_OP_OMAPSETVALS:
    case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;

    // OMAP del operations
    case CEPH_OSD_OP_OMAPCLEAR:
    case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;

    case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
    case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
    case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
    }
    if (code)
      logger->inc(code);
  }
}

void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
{
  // rwlock is locked

  ldout(cct, 10) << __func__ << " op " << op << dendl;

  // pick target
  ceph_assert(op->session == NULL);
  OSDSession *s = NULL;

  bool check_for_latest_map = _calc_target(&op->target, nullptr)
    == RECALC_OP_TARGET_POOL_DNE;

  // Try to get a session, including a retry if we need to take write lock
  int r = _get_session(op->target.osd, &s, sul);
  if (r == -EAGAIN ||
      (check_for_latest_map && sul.owns_lock_shared()) ||
      cct->_conf->objecter_debug_inject_relock_delay) {
    epoch_t orig_epoch = osdmap->get_epoch();
    sul.unlock();
    if (cct->_conf->objecter_debug_inject_relock_delay) {
      sleep(1);
    }
    sul.lock();
    if (orig_epoch != osdmap->get_epoch()) {
      // map changed; recalculate mapping
      ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
                     << dendl;
      check_for_latest_map = _calc_target(&op->target, nullptr)
        == RECALC_OP_TARGET_POOL_DNE;
      if (s) {
        put_session(s);
        s = NULL;
        r = -EAGAIN;
      }
    }
  }
  if (r == -EAGAIN) {
    ceph_assert(s == NULL);
    r = _get_session(op->target.osd, &s, sul);
  }
  ceph_assert(r == 0);
  ceph_assert(s);  // may be homeless

  _send_op_account(op);

  // send?

  ceph_assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

  if (osdmap_full_try) {
    op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
  }

  bool need_send = false;

  if (osdmap->get_epoch() < epoch_barrier) {
    ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
                   << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
             osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
    ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
                   << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
             osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << " paused read " << op << " tid " << op->tid
                   << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if (op->respects_full() &&
             (_osdmap_full_flag() ||
              _osdmap_pool_full(op->target.base_oloc.pool))) {
    ldout(cct, 0) << " FULL, paused modify " << op << " tid "
                  << op->tid << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if (!s->is_homeless()) {
    need_send = true;
  } else {
    _maybe_request_map();
  }

  OSDSession::unique_lock sl(s->lock);
  if (op->tid == 0)
    op->tid = ++last_tid;

  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
                 << " '" << op->target.base_oloc << "' '"
                 << op->target.target_oloc << "' " << op->ops << " tid "
                 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
                 << dendl;

  _session_op_assign(s, op);

  if (need_send) {
    _send_op(op);
  }

  // Last chance to touch the Op here: once we give up the session lock it
  // can be freed at any time by the response handler.
  ceph_tid_t tid = op->tid;
  if (check_for_latest_map) {
    _send_op_map_check(op);
  }
  if (ptid)
    *ptid = tid;
  op = NULL;

  sl.unlock();
  put_session(s);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}

int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
  ceph_assert(initialized);

  OSDSession::unique_lock sl(s->lock);

  map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
  if (p == s->ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
                   << s->osd << dendl;
    return -ENOENT;
  }

#if 0
  if (s->con) {
    ldout(cct, 20) << " revoking rx buffer for " << tid
                   << " on " << s->con << dendl;
    s->con->revoke_rx_buffer(tid);
  }
#endif

  ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
                 << dendl;
  Op *op = p->second;
  if (op->onfinish) {
    num_in_flight--;
    op->onfinish->complete(r);
    op->onfinish = NULL;
  }
  _op_cancel_map_check(op);
  _finish_op(op, r);
  sl.unlock();

  return 0;
}

int Objecter::op_cancel(ceph_tid_t tid, int r)
{
  int ret = 0;

  unique_lock wl(rwlock);
  ret = _op_cancel(tid, r);

  return ret;
}

int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
{
  unique_lock wl(rwlock);
  ldout(cct,10) << __func__ << " " << tids << dendl;
  for (auto tid : tids) {
    _op_cancel(tid, r);
  }
  return 0;
}
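
// Illustrative sketch (not part of the build): cancelling a batch of
// in-flight ops. Each cancelled op's onfinish context completes with the
// supplied error code; `pending` is hypothetical. Per-tid lookup failures
// are ignored by this overload, as the loop above shows.
#if 0
void example_batch_cancel(Objecter *objecter,
                          const vector<ceph_tid_t>& pending)
{
  objecter->op_cancel(pending, -ETIMEDOUT);
}
#endif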

int Objecter::_op_cancel(ceph_tid_t tid, int r)
{
  int ret = 0;

  ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
                << dendl;

start:

  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    if (s->ops.find(tid) != s->ops.end()) {
      sl.unlock();
      ret = op_cancel(s, tid, r);
      if (ret == -ENOENT) {
        /* oh no! raced, maybe tid moved to another session, restarting */
        goto start;
      }
      return ret;
    }
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
                << " not found in live sessions" << dendl;

  // Handle case where the op is in homeless session
  OSDSession::shared_lock sl(homeless_session->lock);
  if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
    sl.unlock();
    ret = op_cancel(homeless_session, tid, r);
    if (ret == -ENOENT) {
      /* oh no! raced, maybe tid moved to another session, restarting */
      goto start;
    } else {
      return ret;
    }
  } else {
    sl.unlock();
  }

  ldout(cct, 5) << __func__ << ": tid " << tid
                << " not found in homeless session" << dendl;

  return ret;
}


epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
{
  unique_lock wl(rwlock);

  std::vector<ceph_tid_t> to_cancel;
  bool found = false;

  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::shared_lock sl(s->lock);
    for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
         op_i != s->ops.end(); ++op_i) {
      if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
          && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
        to_cancel.push_back(op_i->first);
      }
    }
    sl.unlock();

    for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
         titer != to_cancel.end();
         ++titer) {
      int cancel_result = op_cancel(s, *titer, r);
      // We hold rwlock across search and cancellation, so cancels
      // should always succeed
      ceph_assert(cancel_result == 0);
    }
    if (!found && to_cancel.size())
      found = true;
    to_cancel.clear();
  }

  const epoch_t epoch = osdmap->get_epoch();

  wl.unlock();

  if (found) {
    return epoch;
  } else {
    return -1;
  }
}

bool Objecter::is_pg_changed(
  int oldprimary,
  const vector<int>& oldacting,
  int newprimary,
  const vector<int>& newacting,
  bool any_change)
{
  if (OSDMap::primary_changed(
        oldprimary,
        oldacting,
        newprimary,
        newacting))
    return true;
  if (any_change && oldacting != newacting)
    return true;
  return false;  // same primary (though replicas may have changed)
}
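
// Illustrative sketch (not part of the build): what the any_change flag adds
// to the test above, on made-up acting sets. With acting [3,1,2] -> [3,2,1]
// the primary (osd.3) is unchanged, so only an any_change caller sees a
// change. The free-function framing is hypothetical.
#if 0
void example_pg_change()
{
  vector<int> oldacting{3, 1, 2};
  vector<int> newacting{3, 2, 1};
  bool narrow = Objecter::is_pg_changed(3, oldacting, 3, newacting, false);
  bool wide   = Objecter::is_pg_changed(3, oldacting, 3, newacting, true);
  ceph_assert(!narrow && wide);  // same primary, but the set was reordered
}
#endif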

bool Objecter::target_should_be_paused(op_target_t *t)
{
  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
    _osdmap_full_flag() || _osdmap_pool_full(*pi);

  return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
    (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
    (osdmap->get_epoch() < epoch_barrier);
}

/**
 * Locking public accessor for _osdmap_full_flag
 */
bool Objecter::osdmap_full_flag() const
{
  shared_lock rl(rwlock);

  return _osdmap_full_flag();
}

bool Objecter::osdmap_pool_full(const int64_t pool_id) const
{
  shared_lock rl(rwlock);

  if (_osdmap_full_flag()) {
    return true;
  }

  return _osdmap_pool_full(pool_id);
}

bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
{
  const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
  if (pool == NULL) {
    ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
    return false;
  }

  return _osdmap_pool_full(*pool);
}

bool Objecter::_osdmap_has_pool_full() const
{
  for (map<int64_t, pg_pool_t>::const_iterator it
         = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it) {
    if (_osdmap_pool_full(it->second))
      return true;
  }
  return false;
}

bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
{
  return p.has_flag(pg_pool_t::FLAG_FULL) && honor_osdmap_full;
}

/**
 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
 */
bool Objecter::_osdmap_full_flag() const
{
  // Ignore the FULL flag if honor_osdmap_full is not set
  return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
}

void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
{
  for (map<int64_t, pg_pool_t>::const_iterator it
         = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it) {
    if (pool_full_map.find(it->first) == pool_full_map.end()) {
      pool_full_map[it->first] = _osdmap_pool_full(it->second);
    } else {
      pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
        pool_full_map[it->first];
    }
  }
}

int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
                                           const string& ns)
{
  shared_lock rl(rwlock);
  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -ENOENT;
  return p->hash_key(key, ns);
}

int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
                                              const string& ns)
{
  shared_lock rl(rwlock);
  const pg_pool_t *p = osdmap->get_pg_pool(pool);
  if (!p)
    return -ENOENT;
  return p->raw_hash_to_pg(p->hash_key(key, ns));
}
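
// Illustrative sketch (not part of the build): the difference between the two
// accessors above, on a hypothetical pool id and object key. The first returns
// the raw hash of (key, namespace); the second folds that hash onto a PG
// ordinal via the pool's pg_num mask.
#if 0
void example_hash_position(Objecter *objecter)
{
  int64_t raw = objecter->get_object_hash_position(1, "myobject", "");
  int64_t pg  = objecter->get_object_pg_hash_position(1, "myobject", "");
  // raw is a full 32-bit hash value; pg lies in [0, pg_num)
}
#endif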

void Objecter::_prune_snapc(
  const mempool::osdmap::map<int64_t,
                             OSDMap::snap_interval_set_t>& new_removed_snaps,
  Op *op)
{
  bool match = false;
  auto i = new_removed_snaps.find(op->target.base_pgid.pool());
  if (i != new_removed_snaps.end()) {
    for (auto s : op->snapc.snaps) {
      if (i->second.contains(s)) {
        match = true;
        break;
      }
    }
    if (match) {
      vector<snapid_t> new_snaps;
      for (auto s : op->snapc.snaps) {
        if (!i->second.contains(s)) {
          new_snaps.push_back(s);
        }
      }
      op->snapc.snaps.swap(new_snaps);
      ldout(cct,10) << __func__ << " op " << op->tid << " snapc " << op->snapc
                    << " (was " << new_snaps << ")" << dendl;
    }
  }
}
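
// Illustrative sketch (not part of the build): the pruning above on made-up
// numbers. If an op's snap context lists snaps {12, 15, 17} and the new map
// reports the interval [14, 16) removed for that pool, the op continues with
// snap context {12, 17}; the swap leaves the old vector behind for the log
// line. The free-function framing is hypothetical.
#if 0
void example_prune(std::vector<snapid_t>& snaps,
                   const OSDMap::snap_interval_set_t& removed)
{
  std::vector<snapid_t> kept;
  for (auto s : snaps)
    if (!removed.contains(s))   // drop snaps the OSDs have deleted
      kept.push_back(s);
  snaps.swap(kept);             // snaps := {12, 17}, kept := {12, 15, 17}
}
#endif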

int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
  // rwlock is locked
  bool is_read = t->flags & CEPH_OSD_FLAG_READ;
  bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
  t->epoch = osdmap->get_epoch();
  ldout(cct,20) << __func__ << " epoch " << t->epoch
                << " base " << t->base_oid << " " << t->base_oloc
                << " precalc_pgid " << (int)t->precalc_pgid
                << " pgid " << t->base_pgid
                << (is_read ? " is_read" : "")
                << (is_write ? " is_write" : "")
                << dendl;

  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  if (!pi) {
    t->osd = -1;
    return RECALC_OP_TARGET_POOL_DNE;
  }
  ldout(cct,30) << __func__ << " base pi " << pi
                << " pg_num " << pi->get_pg_num() << dendl;

  bool force_resend = false;
  if (osdmap->get_epoch() == pi->last_force_op_resend) {
    if (t->last_force_resend < pi->last_force_op_resend) {
      t->last_force_resend = pi->last_force_op_resend;
      force_resend = true;
    } else if (t->last_force_resend == 0) {
      force_resend = true;
    }
  }

  // apply tiering
  t->target_oid = t->base_oid;
  t->target_oloc = t->base_oloc;
  if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
    if (is_read && pi->has_read_tier())
      t->target_oloc.pool = pi->read_tier;
    if (is_write && pi->has_write_tier())
      t->target_oloc.pool = pi->write_tier;
    pi = osdmap->get_pg_pool(t->target_oloc.pool);
    if (!pi) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }

  pg_t pgid;
  if (t->precalc_pgid) {
    ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
    ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
    ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
    pgid = t->base_pgid;
  } else {
    int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
                                           pgid);
    if (ret == -ENOENT) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }
  ldout(cct,20) << __func__ << " target " << t->target_oid << " "
                << t->target_oloc << " -> pgid " << pgid << dendl;
  ldout(cct,30) << __func__ << " target pi " << pi
                << " pg_num " << pi->get_pg_num() << dendl;
  t->pool_ever_existed = true;

  int size = pi->size;
  int min_size = pi->min_size;
  unsigned pg_num = pi->get_pg_num();
  unsigned pg_num_pending = pi->get_pg_num_pending();
  int up_primary, acting_primary;
  vector<int> up, acting;
  osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
                               &acting, &acting_primary);
  bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
  bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
  unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
  pg_t prev_pgid(prev_seed, pgid.pool());
  if (any_change && PastIntervals::is_new_interval(
        t->acting_primary,
        acting_primary,
        t->acting,
        acting,
        t->up_primary,
        up_primary,
        t->up,
        up,
        t->size,
        size,
        t->min_size,
        min_size,
        t->pg_num,
        pg_num,
        t->pg_num_pending,
        pg_num_pending,
        t->sort_bitwise,
        sort_bitwise,
        t->recovery_deletes,
        recovery_deletes,
        prev_pgid)) {
    force_resend = true;
  }

  bool unpaused = false;
  bool should_be_paused = target_should_be_paused(t);
  if (t->paused && !should_be_paused) {
    unpaused = true;
  }
  t->paused = should_be_paused;

  bool legacy_change =
    t->pgid != pgid ||
    is_pg_changed(
      t->acting_primary, t->acting, acting_primary, acting,
      t->used_replica || any_change);
  bool split_or_merge = false;
  if (t->pg_num) {
    split_or_merge =
      prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
      prev_pgid.is_merge_target(t->pg_num, pg_num);
  }

  if (legacy_change || split_or_merge || force_resend) {
    t->pgid = pgid;
    t->acting = acting;
    t->acting_primary = acting_primary;
    t->up_primary = up_primary;
    t->up = up;
    t->size = size;
    t->min_size = min_size;
    t->pg_num = pg_num;
    t->pg_num_mask = pi->get_pg_num_mask();
    t->pg_num_pending = pg_num_pending;
    osdmap->get_primary_shard(
      pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
      &t->actual_pgid);
    t->sort_bitwise = sort_bitwise;
    t->recovery_deletes = recovery_deletes;
    ldout(cct, 10) << __func__ << " "
                   << " raw pgid " << pgid << " -> actual " << t->actual_pgid
                   << " acting " << acting
                   << " primary " << acting_primary << dendl;
    t->used_replica = false;
    if (acting_primary == -1) {
      t->osd = -1;
    } else {
      int osd;
      bool read = is_read && !is_write;
      if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
        int p = rand() % acting.size();
        if (p)
          t->used_replica = true;
        osd = acting[p];
        ldout(cct, 10) << " chose random osd." << osd << " of " << acting
                       << dendl;
      } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
                 acting.size() > 1) {
        // look for a local replica. prefer the primary if the
        // distance is the same.
        int best = -1;
        int best_locality = 0;
        for (unsigned i = 0; i < acting.size(); ++i) {
          int locality = osdmap->crush->get_common_ancestor_distance(
            cct, acting[i], crush_location);
          ldout(cct, 20) << __func__ << " localize: rank " << i
                         << " osd." << acting[i]
                         << " locality " << locality << dendl;
          if (i == 0 ||
              (locality >= 0 && best_locality >= 0 &&
               locality < best_locality) ||
              (best_locality < 0 && locality >= 0)) {
            best = i;
            best_locality = locality;
            if (i)
              t->used_replica = true;
          }
        }
        ceph_assert(best >= 0);
        osd = acting[best];
      } else {
        osd = acting_primary;
      }
      t->osd = osd;
    }
  }
  if (legacy_change || unpaused || force_resend) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  if (split_or_merge &&
      (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS ||
       HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
                    RESEND_ON_SPLIT))) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return RECALC_OP_TARGET_NO_ACTION;
}
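
// Illustrative sketch (not part of the build): ceph_stable_mod, used above to
// fold a placement seed onto pg_num, on made-up numbers. With pg_num = 12 and
// pg_num_mask = 15 (next power of two minus one), a seed whose masked value
// falls past pg_num gets the halved mask applied instead, so the mapping
// stays stable while pg_num grows toward the next power of two.
#if 0
static unsigned example_stable_mod(unsigned x, unsigned b, unsigned bmask)
{
  // same shape as ceph_stable_mod(): b = pg_num, bmask = pg_num_mask
  if ((x & bmask) < b)
    return x & bmask;         // e.g. x=9,  b=12, bmask=15 -> 9
  else
    return x & (bmask >> 1);  // e.g. x=14, b=12, bmask=15 -> 6
}
#endif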

int Objecter::_map_session(op_target_t *target, OSDSession **s,
                           shunique_lock& sul)
{
  _calc_target(target, nullptr);
  return _get_session(target->osd, s, sul);
}

void Objecter::_session_op_assign(OSDSession *to, Op *op)
{
  // to->lock is locked
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  get_session(to);
  op->session = to;
  to->ops[op->tid] = op;

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}

void Objecter::_session_op_remove(OSDSession *from, Op *op)
{
  ceph_assert(op->session == from);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}

void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
{
  // to lock is locked unique
  ceph_assert(op->session == NULL);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->linger_ops[op->linger_id] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
                 << dendl;
}

void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
{
  ceph_assert(from == op->session);
  // from->lock is locked unique

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->linger_ops.erase(op->linger_id);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
                 << dendl;
}

void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
{
  ceph_assert(from == op->session);
  // from->lock is locked

  if (from->is_homeless()) {
    num_homeless_ops--;
  }

  from->command_ops.erase(op->tid);
  put_session(from);
  op->session = NULL;

  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}

void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
{
  // to->lock is locked
  ceph_assert(op->session == NULL);
  ceph_assert(op->tid);

  if (to->is_homeless()) {
    num_homeless_ops++;
  }

  get_session(to);
  op->session = to;
  to->command_ops[op->tid] = op;

  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}

int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
                                       shunique_lock& sul)
{
  // rwlock is locked unique

  int r = _calc_target(&linger_op->target, nullptr, true);
  if (r == RECALC_OP_TARGET_NEED_RESEND) {
    ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
                   << " pgid " << linger_op->target.pgid
                   << " acting " << linger_op->target.acting << dendl;

    OSDSession *s = NULL;
    r = _get_session(linger_op->target.osd, &s, sul);
    ceph_assert(r == 0);

    if (linger_op->session != s) {
      // NB locking two sessions (s and linger_op->session) at the
      // same time here is only safe because we are the only one that
      // takes two, and we are holding rwlock for write. Disable
      // lockdep because it doesn't know that.
      OSDSession::unique_lock sl(s->lock);
      _session_linger_op_remove(linger_op->session, linger_op);
      _session_linger_op_assign(s, linger_op);
    }

    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return r;
}

void Objecter::_cancel_linger_op(Op *op)
{
  ldout(cct, 15) << "cancel_op " << op->tid << dendl;

  ceph_assert(!op->should_resend);
  if (op->onfinish) {
    delete op->onfinish;
    num_in_flight--;
  }

  _finish_op(op, 0);
}

void Objecter::_finish_op(Op *op, int r)
{
  ldout(cct, 15) << __func__ << " " << op->tid << dendl;

  // op->session->lock is locked unique or op->session is null

  if (!op->ctx_budgeted && op->budget >= 0) {
    put_op_budget_bytes(op->budget);
    op->budget = -1;
  }

  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  if (op->session) {
    _session_op_remove(op->session, op);
  }

  logger->dec(l_osdc_op_active);

  ceph_assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());

  inflight_ops--;

  op->put();
}

MOSDOp *Objecter::_prepare_osd_op(Op *op)
{
  // rwlock is locked

  int flags = op->target.flags;
  flags |= CEPH_OSD_FLAG_KNOWN_REDIR;

  // Nothing checks this any longer, but needed for compatibility with
  // pre-luminous osds
  flags |= CEPH_OSD_FLAG_ONDISK;

  if (!honor_osdmap_full)
    flags |= CEPH_OSD_FLAG_FULL_FORCE;

  op->target.paused = false;
  op->stamp = ceph::coarse_mono_clock::now();

  hobject_t hobj = op->target.get_hobj();
  MOSDOp *m = new MOSDOp(client_inc, op->tid,
                         hobj, op->target.actual_pgid,
                         osdmap->get_epoch(),
                         flags, op->features);

  m->set_snapid(op->snapid);
  m->set_snap_seq(op->snapc.seq);
  m->set_snaps(op->snapc.snaps);

  m->ops = op->ops;
  m->set_mtime(op->mtime);
  m->set_retry_attempt(op->attempts++);

  if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
    op->trace.init("op", &trace_endpoint);
  }

  if (op->priority)
    m->set_priority(op->priority);
  else
    m->set_priority(cct->_conf->osd_client_op_priority);

  if (op->reqid != osd_reqid_t()) {
    m->set_reqid(op->reqid);
  }

  logger->inc(l_osdc_op_send);
  ssize_t sum = 0;
  for (unsigned i = 0; i < m->ops.size(); i++) {
    sum += m->ops[i].indata.length();
  }
  logger->inc(l_osdc_op_send_bytes, sum);

  return m;
}

void Objecter::_send_op(Op *op)
{
  // rwlock is locked
  // op->session->lock is locked

  // backoff?
  auto p = op->session->backoffs.find(op->target.actual_pgid);
  if (p != op->session->backoffs.end()) {
    hobject_t hoid = op->target.get_hobj();
    auto q = p->second.lower_bound(hoid);
    if (q != p->second.begin()) {
      --q;
      if (hoid >= q->second.end) {
        ++q;
      }
    }
    if (q != p->second.end()) {
      ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
                     << "," << q->second.end << ")" << dendl;
      int r = cmp(hoid, q->second.begin);
      if (r == 0 || (r > 0 && hoid < q->second.end)) {
        ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
                       << " id " << q->second.id << " on " << hoid
                       << ", queuing " << op << " tid " << op->tid << dendl;
        return;
      }
    }
  }

  ceph_assert(op->tid > 0);
  MOSDOp *m = _prepare_osd_op(op);

  if (op->target.actual_pgid != m->get_spg()) {
    ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
                   << m->get_spg() << " to " << op->target.actual_pgid
                   << ", updating and reencoding" << dendl;
    m->set_spg(op->target.actual_pgid);
    m->clear_payload();  // reencode
  }

  ldout(cct, 15) << "_send_op " << op->tid << " to "
                 << op->target.actual_pgid << " on osd." << op->session->osd
                 << dendl;

  ConnectionRef con = op->session->con;
  ceph_assert(con);

#if 0
  // preallocated rx buffer?
  if (op->con) {
    ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
                   << op->con << dendl;
    op->con->revoke_rx_buffer(op->tid);
  }
  if (op->outbl &&
      op->ontimeout == 0 &&  // only post rx_buffer if no timeout; see #9582
      op->outbl->length()) {
    op->outbl->invalidate_crc();  // messenger writes through c_str()
    ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
                   << dendl;
    op->con = con;
    op->con->post_rx_buffer(op->tid, *op->outbl);
  }
#endif

  op->incarnation = op->session->incarnation;

  if (op->trace.valid()) {
    m->trace.init("op msg", nullptr, &op->trace);
  }
  op->session->con->send_message(m);
}

int Objecter::calc_op_budget(const vector<OSDOp>& ops)
{
  int op_budget = 0;
  for (vector<OSDOp>::const_iterator i = ops.begin();
       i != ops.end();
       ++i) {
    if (i->op.op & CEPH_OSD_OP_MODE_WR) {
      op_budget += i->indata.length();
    } else if (ceph_osd_op_mode_read(i->op.op)) {
      if (ceph_osd_op_uses_extent(i->op.op)) {
        if ((int64_t)i->op.extent.length > 0)
          op_budget += (int64_t)i->op.extent.length;
      } else if (ceph_osd_op_type_attr(i->op.op)) {
        op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
      }
    }
  }
  return op_budget;
}
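
// Illustrative sketch (not part of the build): the budget arithmetic above on
// a made-up compound op. A 4 KiB write contributes its indata length, a
// 16 KiB extent read contributes its extent length, and a getxattr
// contributes name plus value lengths: 4096 + 16384 + (4 + 10) = 20494 bytes.
#if 0
int example_budget()
{
  int budget = 0;
  budget += 4096;    // CEPH_OSD_OP_WRITE: indata.length() == 4096
  budget += 16384;   // CEPH_OSD_OP_READ: extent.length == 16384
  budget += 4 + 10;  // CEPH_OSD_OP_GETXATTR: name_len=4, value_len=10
  return budget;     // 20494
}
#endif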

void Objecter::_throttle_op(Op *op,
                            shunique_lock& sul,
                            int op_budget)
{
  ceph_assert(sul && sul.mutex() == &rwlock);
  bool locked_for_write = sul.owns_lock();

  if (!op_budget)
    op_budget = calc_op_budget(op->ops);
  if (!op_throttle_bytes.get_or_fail(op_budget)) { // couldn't take right now
    sul.unlock();
    op_throttle_bytes.get(op_budget);
    if (locked_for_write)
      sul.lock();
    else
      sul.lock_shared();
  }
  if (!op_throttle_ops.get_or_fail(1)) { // couldn't take right now
    sul.unlock();
    op_throttle_ops.get(1);
    if (locked_for_write)
      sul.lock();
    else
      sul.lock_shared();
  }
}

int Objecter::take_linger_budget(LingerOp *info)
{
  return 1;
}

/* This function DOES put the passed message before returning */
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
{
  ldout(cct, 10) << "in handle_osd_op_reply" << dendl;

  // get pio
  ceph_tid_t tid = m->get_tid();

  shunique_lock sul(rwlock, ceph::acquire_shared);
  if (!initialized) {
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  OSDSession::unique_lock sl(s->lock);

  map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
  if (iter == s->ops.end()) {
    ldout(cct, 7) << "handle_osd_op_reply " << tid
                  << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
                                                    " onnvram" : " ack"))
                  << " ... stray" << dendl;
    sl.unlock();
    m->put();
    return;
  }

  ldout(cct, 7) << "handle_osd_op_reply " << tid
                << (m->is_ondisk() ? " ondisk" :
                    (m->is_onnvram() ? " onnvram" : " ack"))
                << " uv " << m->get_user_version()
                << " in " << m->get_pg()
                << " attempt " << m->get_retry_attempt()
                << dendl;
  Op *op = iter->second;
  op->trace.event("osd op reply");

  if (retry_writes_after_first_reply && op->attempts == 1 &&
      (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
    ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
    if (op->onfinish) {
      num_in_flight--;
    }
    _session_op_remove(s, op);
    sl.unlock();

    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  if (m->get_retry_attempt() >= 0) {
    if (m->get_retry_attempt() != (op->attempts - 1)) {
      ldout(cct, 7) << " ignoring reply from attempt "
                    << m->get_retry_attempt()
                    << " from " << m->get_source_inst()
                    << "; last attempt " << (op->attempts - 1) << " sent to "
                    << op->session->con->get_peer_addr() << dendl;
      m->put();
      sl.unlock();
      return;
    }
  } else {
    // we don't know the request attempt because the server is old, so
    // just accept this one. we may do ACK callbacks we shouldn't
    // have, but that is better than doing callbacks out of order.
  }

  Context *onfinish = 0;

  int rc = m->get_result();

  if (m->is_redirect_reply()) {
    ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
    if (op->onfinish)
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();

    // FIXME: two redirects could race and reorder

    op->tid = 0;
    m->get_redirect().combine_with_locator(op->target.target_oloc,
                                           op->target.target_oid.name);
    op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
                         CEPH_OSD_FLAG_IGNORE_CACHE |
                         CEPH_OSD_FLAG_IGNORE_OVERLAY);
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  if (rc == -EAGAIN) {
    ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
    if (op->onfinish)
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();

    op->tid = 0;
    op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
                          CEPH_OSD_FLAG_LOCALIZE_READS);
    op->target.pgid = pg_t();
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  sul.unlock();

  if (op->objver)
    *op->objver = m->get_user_version();
  if (op->reply_epoch)
    *op->reply_epoch = m->get_map_epoch();
  if (op->data_offset)
    *op->data_offset = m->get_header().data_off;

  // got data?
  if (op->outbl) {
#if 0
    if (op->con)
      op->con->revoke_rx_buffer(op->tid);
#endif
    auto& bl = m->get_data();
    if (op->outbl->length() == bl.length() &&
        bl.get_num_buffers() <= 1) {
      // this is here to keep previous users who *relied* on getting data
      // read into existing buffers happy. Notably,
      // libradosstriper::RadosStriperImpl::aio_read().
      ldout(cct,10) << __func__ << " copying resulting " << bl.length()
                    << " into existing buffer of length " << op->outbl->length()
                    << dendl;
      bufferlist t;
      t.claim(*op->outbl);
      t.invalidate_crc();  // we're overwriting the raw buffers via c_str()
      bl.copy(0, bl.length(), t.c_str());
      op->outbl->substr_of(t, 0, bl.length());
    } else {
      m->claim_data(*op->outbl);
    }
    op->outbl = 0;
  }

  // per-op result demuxing
  vector<OSDOp> out_ops;
  m->claim_ops(out_ops);

  if (out_ops.size() != op->ops.size())
    ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
                  << " != request ops " << op->ops
                  << " from " << m->get_source_inst() << dendl;

  vector<bufferlist*>::iterator pb = op->out_bl.begin();
  vector<int*>::iterator pr = op->out_rval.begin();
  vector<Context*>::iterator ph = op->out_handler.begin();
  ceph_assert(op->out_bl.size() == op->out_rval.size());
  ceph_assert(op->out_bl.size() == op->out_handler.size());
  vector<OSDOp>::iterator p = out_ops.begin();
  for (unsigned i = 0;
       p != out_ops.end() && pb != op->out_bl.end();
       ++i, ++p, ++pb, ++pr, ++ph) {
    ldout(cct, 10) << " op " << i << " rval " << p->rval
                   << " len " << p->outdata.length() << dendl;
    if (*pb)
      **pb = p->outdata;
    // set rval before running handlers so that handlers
    // can change it if e.g. decoding fails
    if (*pr)
      **pr = ceph_to_hostos_errno(p->rval);
    if (*ph) {
      ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
      (*ph)->complete(ceph_to_hostos_errno(p->rval));
      *ph = NULL;
    }
  }

  // NOTE: we assume that since we only request ONDISK ever we will
  // only ever get back one (type of) ack ever.

  if (op->onfinish) {
    num_in_flight--;
    onfinish = op->onfinish;
    op->onfinish = NULL;
  }
  logger->inc(l_osdc_op_reply);

  /* get it before we call _finish_op() */
  auto completion_lock = s->get_lock(op->target.base_oid);

  ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
  _finish_op(op, 0);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;

  // serialize completions
  if (completion_lock.mutex()) {
    completion_lock.lock();
  }
  sl.unlock();

  // do callbacks
  if (onfinish) {
    onfinish->complete(rc);
  }
  if (completion_lock.mutex()) {
    completion_lock.unlock();
  }

  m->put();
}
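
// Illustrative sketch (not part of the build): how a caller wires the
// parallel out_bl/out_rval/out_handler vectors that the demux loop above
// walks. One slot per OSDOp; any slot may be null. Names are hypothetical.
#if 0
void example_demux_setup(Objecter::Op *op, bufferlist *stat_bl, int *stat_rval)
{
  // op->ops holds two OSDOps, say a stat and a read. Slot 0 captures the
  // stat's outdata and return value; slot 1 is ignored by leaving nulls.
  op->out_bl      = { stat_bl, nullptr };
  op->out_rval    = { stat_rval, nullptr };
  op->out_handler = { nullptr, nullptr };  // no per-op completion contexts
}
#endif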

void Objecter::handle_osd_backoff(MOSDBackoff *m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;
  shunique_lock sul(rwlock, ceph::acquire_shared);
  if (!initialized) {
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  auto priv = con->get_priv();
  auto s = static_cast<OSDSession*>(priv.get());
  if (!s || s->con != con) {
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    return;
  }

  get_session(s);

  OSDSession::unique_lock sl(s->lock);

  switch (m->op) {
  case CEPH_OSD_BACKOFF_OP_BLOCK:
    {
      // register
      OSDBackoff& b = s->backoffs[m->pgid][m->begin];
      s->backoffs_by_id.insert(make_pair(m->id, &b));
      b.pgid = m->pgid;
      b.id = m->id;
      b.begin = m->begin;
      b.end = m->end;

      // ack with original backoff's epoch so that the osd can discard this if
      // there was a pg split.
      Message *r = new MOSDBackoff(m->pgid,
                                   m->map_epoch,
                                   CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
                                   m->id, m->begin, m->end);
      // this priority must match the MOSDOps from _prepare_osd_op
      r->set_priority(cct->_conf->osd_client_op_priority);
      con->send_message(r);
    }
    break;

  case CEPH_OSD_BACKOFF_OP_UNBLOCK:
    {
      auto p = s->backoffs_by_id.find(m->id);
      if (p != s->backoffs_by_id.end()) {
        OSDBackoff *b = p->second;
        if (b->begin != m->begin &&
            b->end != m->end) {
          lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
                     << " unblock on ["
                     << m->begin << "," << m->end << ") but backoff is ["
                     << b->begin << "," << b->end << ")" << dendl;
          // hrmpf, unblock it anyway.
        }
        ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
                       << " id " << b->id
                       << " [" << b->begin << "," << b->end
                       << ")" << dendl;
        auto spgp = s->backoffs.find(b->pgid);
        ceph_assert(spgp != s->backoffs.end());
        spgp->second.erase(b->begin);
        if (spgp->second.empty()) {
          s->backoffs.erase(spgp);
        }
        s->backoffs_by_id.erase(p);

        // check for any ops to resend
        for (auto& q : s->ops) {
          if (q.second->target.actual_pgid == m->pgid) {
            int r = q.second->target.contained_by(m->begin, m->end);
            ldout(cct, 20) << __func__ << " contained_by " << r << " on "
                           << q.second->target.get_hobj() << dendl;
            if (r) {
              _send_op(q.second);
            }
          }
        }
      } else {
        lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
                   << " unblock on ["
                   << m->begin << "," << m->end << ") but backoff dne" << dendl;
      }
    }
    break;

  default:
    ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
  }

  sul.unlock();
  sl.unlock();

  m->put();
  put_session(s);
}
3648
3649uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3650 uint32_t pos)
3651{
3652 shared_lock rl(rwlock);
3653 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3654 pos, list_context->pool_id, string());
11fdf7f2 3655 ldout(cct, 10) << __func__ << " " << list_context
7c673cae
FG
3656 << " pos " << pos << " -> " << list_context->pos << dendl;
3657 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3658 list_context->current_pg = actual.ps();
3659 list_context->at_end_of_pool = false;
3660 return pos;
3661}
3662
3663uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3664 const hobject_t& cursor)
3665{
3666 shared_lock rl(rwlock);
3667 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3668 list_context->pos = cursor;
3669 list_context->at_end_of_pool = false;
3670 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3671 list_context->current_pg = actual.ps();
3672 list_context->sort_bitwise = true;
3673 return list_context->current_pg;
3674}
3675
3676void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3677 hobject_t *cursor)
3678{
3679 shared_lock rl(rwlock);
3680 if (list_context->list.empty()) {
3681 *cursor = list_context->pos;
3682 } else {
3683 const librados::ListObjectImpl& entry = list_context->list.front();
3684 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3685 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3686 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3687 }
3688}
3689
3690void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3691{
3692 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3693 << " pool_snap_seq " << list_context->pool_snap_seq
3694 << " max_entries " << list_context->max_entries
3695 << " list_context " << list_context
3696 << " onfinish " << onfinish
3697 << " current_pg " << list_context->current_pg
3698 << " pos " << list_context->pos << dendl;
3699
3700 shared_lock rl(rwlock);
3701 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3702 if (!pool) { // pool is gone
3703 rl.unlock();
3704 put_nlist_context_budget(list_context);
3705 onfinish->complete(-ENOENT);
3706 return;
3707 }
3708 int pg_num = pool->get_pg_num();
3709 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3710
3711 if (list_context->pos.is_min()) {
3712 list_context->starting_pg_num = 0;
3713 list_context->sort_bitwise = sort_bitwise;
3714 list_context->starting_pg_num = pg_num;
3715 }
3716 if (list_context->sort_bitwise != sort_bitwise) {
3717 list_context->pos = hobject_t(
3718 object_t(), string(), CEPH_NOSNAP,
3719 list_context->current_pg, list_context->pool_id, string());
3720 list_context->sort_bitwise = sort_bitwise;
3721 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3722 << list_context->pos << dendl;
3723 }
3724 if (list_context->starting_pg_num != pg_num) {
3725 if (!sort_bitwise) {
3726 // start reading from the beginning; the pgs have changed
3727 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3728 list_context->pos = collection_list_handle_t();
3729 }
3730 list_context->starting_pg_num = pg_num;
3731 }
3732
3733 if (list_context->pos.is_max()) {
3734 ldout(cct, 20) << __func__ << " end of pool, list "
3735 << list_context->list << dendl;
3736 if (list_context->list.empty()) {
3737 list_context->at_end_of_pool = true;
3738 }
3739 // release the listing context's budget once all
3740 // OPs (in the session) are finished
3741 put_nlist_context_budget(list_context);
3742 onfinish->complete(0);
3743 return;
3744 }
3745
3746 ObjectOperation op;
3747 op.pg_nls(list_context->max_entries, list_context->filter,
3748 list_context->pos, osdmap->get_epoch());
3749 list_context->bl.clear();
3750 C_NList *onack = new C_NList(list_context, onfinish, this);
3751 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3752
3753 // note current_pg in case we don't have (or lose) SORTBITWISE
3754 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3755 rl.unlock();
3756
3757 pg_read(list_context->current_pg, oloc, op,
3758 &list_context->bl, 0, onack, &onack->epoch,
3759 &list_context->ctx_budget);
3760}
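
// A minimal sketch of driving a full pool listing with list_nobjects(),
// assuming an initialized Objecter *objecter, an NListContext whose pool_id
// and max_entries were filled in by the caller, and a C_SaferCond-style
// synchronous completion (common/Cond.h).
#if 0
int example_list_pool(Objecter *objecter, NListContext& ctx)
{
  while (!ctx.at_end_of_pool) {
    C_SaferCond cond;
    objecter->list_nobjects(&ctx, &cond);
    int r = cond.wait();
    if (r < 0)
      return r;              // e.g. -ENOENT if the pool went away
    // ... consume ctx.list, then clear it before requesting the next batch
    ctx.list.clear();
  }
  return 0;
}
#endif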
3761
3762void Objecter::_nlist_reply(NListContext *list_context, int r,
3763 Context *final_finish, epoch_t reply_epoch)
3764{
3765 ldout(cct, 10) << __func__ << " " << list_context << dendl;
3766
11fdf7f2 3767 auto iter = list_context->bl.cbegin();
7c673cae
FG
3768 pg_nls_response_t response;
3769 bufferlist extra_info;
11fdf7f2 3770 decode(response, iter);
7c673cae 3771 if (!iter.end()) {
11fdf7f2 3772 decode(extra_info, iter);
7c673cae
FG
3773 }
3774
3775 // if the osd returns 1 (newer code) or a MAX handle, it means we
3776 // hit the end of the pg.
3777 if ((response.handle.is_max() || r == 1) &&
3778 !list_context->sort_bitwise) {
3779 // legacy OSD and !sortbitwise, figure out the next PG on our own
3780 ++list_context->current_pg;
3781 if (list_context->current_pg == list_context->starting_pg_num) {
3782 // end of pool
3783 list_context->pos = hobject_t::get_max();
3784 } else {
3785 // next pg
3786 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3787 list_context->current_pg,
3788 list_context->pool_id, string());
3789 }
3790 } else {
3791 list_context->pos = response.handle;
3792 }
3793
3794 int response_size = response.entries.size();
3795 ldout(cct, 20) << " response.entries.size " << response_size
3796 << ", response.entries " << response.entries
3797 << ", handle " << response.handle
3798 << ", tentative new pos " << list_context->pos << dendl;
3799 list_context->extra_info.append(extra_info);
3800 if (response_size) {
3801 list_context->list.splice(list_context->list.end(), response.entries);
3802 }
3803
3804 if (list_context->list.size() >= list_context->max_entries) {
3805 ldout(cct, 20) << " hit max, returning results so far, "
3806 << list_context->list << dendl;
3807 // release the listing context's budget once all
3808 // OPs (in the session) are finished
3809 put_nlist_context_budget(list_context);
3810 final_finish->complete(0);
3811 return;
3812 }
3813
3814 // continue!
3815 list_nobjects(list_context, final_finish);
3816}
3817
3818void Objecter::put_nlist_context_budget(NListContext *list_context)
3819{
3820 if (list_context->ctx_budget >= 0) {
3821 ldout(cct, 10) << " release listing context's budget " <<
3822 list_context->ctx_budget << dendl;
3823 put_op_budget_bytes(list_context->ctx_budget);
3824 list_context->ctx_budget = -1;
3825 }
3826}
3827
3828// snapshots
3829
3830int Objecter::create_pool_snap(int64_t pool, string& snap_name,
3831 Context *onfinish)
3832{
3833 unique_lock wl(rwlock);
3834 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3835 << snap_name << dendl;
3836
3837 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3838 if (!p)
3839 return -EINVAL;
3840 if (p->snap_exists(snap_name.c_str()))
3841 return -EEXIST;
3842
3843 PoolOp *op = new PoolOp;
3844 if (!op)
3845 return -ENOMEM;
31f18b77 3846 op->tid = ++last_tid;
7c673cae
FG
3847 op->pool = pool;
3848 op->name = snap_name;
3849 op->onfinish = onfinish;
3850 op->pool_op = POOL_OP_CREATE_SNAP;
3851 pool_ops[op->tid] = op;
3852
3853 pool_op_submit(op);
3854
3855 return 0;
3856}
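
// A minimal usage sketch for create_pool_snap(), assuming an initialized
// Objecter *objecter and a C_SaferCond-style waiter (common/Cond.h). The
// early returns (-EINVAL/-EEXIST) above fire before onfinish is registered,
// so only wait when submission succeeded.
#if 0
int example_create_pool_snap(Objecter *objecter, int64_t pool)
{
  string snap_name("example-snap");
  C_SaferCond cond;
  int r = objecter->create_pool_snap(pool, snap_name, &cond);
  if (r < 0)
    return r;          // onfinish was never queued; do not wait
  return cond.wait();  // monitor's reply code
}
#endif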
3857
3858struct C_SelfmanagedSnap : public Context {
3859 bufferlist bl;
3860 snapid_t *psnapid;
3861 Context *fin;
3862 C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
3863 void finish(int r) override {
3864 if (r == 0) {
11fdf7f2
TL
3865 try {
3866 auto p = bl.cbegin();
3867 decode(*psnapid, p);
3868 } catch (buffer::error&) {
3869 r = -EIO;
3870 }
7c673cae
FG
3871 }
3872 fin->complete(r);
3873 }
3874};
3875
3876int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
3877 Context *onfinish)
3878{
3879 unique_lock wl(rwlock);
3880 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
3881 PoolOp *op = new PoolOp;
3882 if (!op) return -ENOMEM;
31f18b77 3883 op->tid = ++last_tid;
7c673cae
FG
3884 op->pool = pool;
3885 C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
3886 op->onfinish = fin;
3887 op->blp = &fin->bl;
3888 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3889 pool_ops[op->tid] = op;
3890
3891 pool_op_submit(op);
3892 return 0;
3893}
3894
3895int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
3896 Context *onfinish)
3897{
3898 unique_lock wl(rwlock);
3899 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3900 << snap_name << dendl;
3901
3902 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3903 if (!p)
3904 return -EINVAL;
3905 if (!p->snap_exists(snap_name.c_str()))
3906 return -ENOENT;
3907
3908 PoolOp *op = new PoolOp;
3909 if (!op)
3910 return -ENOMEM;
31f18b77 3911 op->tid = ++last_tid;
7c673cae
FG
3912 op->pool = pool;
3913 op->name = snap_name;
3914 op->onfinish = onfinish;
3915 op->pool_op = POOL_OP_DELETE_SNAP;
3916 pool_ops[op->tid] = op;
3917
3918 pool_op_submit(op);
3919
3920 return 0;
3921}
3922
3923int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3924 Context *onfinish)
3925{
3926 unique_lock wl(rwlock);
3927 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3928 << snap << dendl;
3929 PoolOp *op = new PoolOp;
3930 if (!op) return -ENOMEM;
31f18b77 3931 op->tid = ++last_tid;
7c673cae
FG
3932 op->pool = pool;
3933 op->onfinish = onfinish;
3934 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3935 op->snapid = snap;
3936 pool_ops[op->tid] = op;
3937
3938 pool_op_submit(op);
3939
3940 return 0;
3941}
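
// A minimal sketch pairing allocate/delete of self-managed snapshots; the
// allocated id is decoded into *psnapid by C_SelfmanagedSnap above. Assumes
// an initialized Objecter *objecter and C_SaferCond-style waiters.
#if 0
int example_selfmanaged_snap_cycle(Objecter *objecter, int64_t pool)
{
  snapid_t snap;
  C_SaferCond created;
  int r = objecter->allocate_selfmanaged_snap(pool, &snap, &created);
  if (r < 0 || (r = created.wait()) < 0)
    return r;
  C_SaferCond removed;              // snap holds a valid id here
  r = objecter->delete_selfmanaged_snap(pool, snap, &removed);
  return r < 0 ? r : removed.wait();
}
#endif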
3942
11fdf7f2 3943int Objecter::create_pool(string& name, Context *onfinish,
7c673cae
FG
3944 int crush_rule)
3945{
3946 unique_lock wl(rwlock);
3947 ldout(cct, 10) << "create_pool name=" << name << dendl;
3948
3949 if (osdmap->lookup_pg_pool_name(name) >= 0)
3950 return -EEXIST;
3951
3952 PoolOp *op = new PoolOp;
3953 if (!op)
3954 return -ENOMEM;
31f18b77 3955 op->tid = ++last_tid;
7c673cae
FG
3956 op->pool = 0;
3957 op->name = name;
3958 op->onfinish = onfinish;
3959 op->pool_op = POOL_OP_CREATE;
3960 pool_ops[op->tid] = op;
7c673cae
FG
3961 op->crush_rule = crush_rule;
3962
3963 pool_op_submit(op);
3964
3965 return 0;
3966}
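
// A minimal sketch of the pool lifecycle via create_pool()/delete_pool(),
// assuming an initialized Objecter *objecter; passing crush_rule -1 to leave
// rule selection to the monitor is an illustrative assumption.
#if 0
int example_pool_lifecycle(Objecter *objecter)
{
  string name("example-pool");
  C_SaferCond created;
  int r = objecter->create_pool(name, &created, -1);
  if (r < 0 || (r = created.wait()) < 0)
    return r;                       // e.g. -EEXIST if the name is taken
  C_SaferCond deleted;
  r = objecter->delete_pool(name, &deleted);
  return r < 0 ? r : deleted.wait();
}
#endif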
3967
3968int Objecter::delete_pool(int64_t pool, Context *onfinish)
3969{
3970 unique_lock wl(rwlock);
3971 ldout(cct, 10) << "delete_pool " << pool << dendl;
3972
3973 if (!osdmap->have_pg_pool(pool))
3974 return -ENOENT;
3975
3976 _do_delete_pool(pool, onfinish);
3977 return 0;
3978}
3979
3980int Objecter::delete_pool(const string &pool_name, Context *onfinish)
3981{
3982 unique_lock wl(rwlock);
3983 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
3984
3985 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
3986 if (pool < 0)
3987 return pool;
3988
3989 _do_delete_pool(pool, onfinish);
3990 return 0;
3991}
3992
3993void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
3994{
3995 PoolOp *op = new PoolOp;
31f18b77 3996 op->tid = ++last_tid;
7c673cae
FG
3997 op->pool = pool;
3998 op->name = "delete";
3999 op->onfinish = onfinish;
4000 op->pool_op = POOL_OP_DELETE;
4001 pool_ops[op->tid] = op;
4002 pool_op_submit(op);
4003}
4004
7c673cae
FG
4005void Objecter::pool_op_submit(PoolOp *op)
4006{
4007 // rwlock is locked
4008 if (mon_timeout > timespan(0)) {
4009 op->ontimeout = timer.add_event(mon_timeout,
4010 [this, op]() {
4011 pool_op_cancel(op->tid, -ETIMEDOUT); });
4012 }
4013 _pool_op_submit(op);
4014}
4015
4016void Objecter::_pool_op_submit(PoolOp *op)
4017{
4018 // rwlock is locked unique
4019
4020 ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
4021 MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4022 op->name, op->pool_op,
11fdf7f2 4023 last_seen_osdmap_version);
7c673cae
FG
4024 if (op->snapid) m->snapid = op->snapid;
4025 if (op->crush_rule) m->crush_rule = op->crush_rule;
4026 monc->send_mon_message(m);
11fdf7f2 4027 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4028
4029 logger->inc(l_osdc_poolop_send);
4030}
4031
4032/**
4033 * Handle a reply to a PoolOp message. Check that we sent the message
4034 * and give the caller responsibility for the returned bufferlist.
4035 * Then either call the finisher or stash the PoolOp, depending on whether we
4036 * have a new enough map.
4037 * Lastly, clean up the message and PoolOp.
4038 */
4039void Objecter::handle_pool_op_reply(MPoolOpReply *m)
4040{
11fdf7f2 4041 FUNCTRACE(cct);
7c673cae 4042 shunique_lock sul(rwlock, acquire_shared);
31f18b77 4043 if (!initialized) {
7c673cae
FG
4044 sul.unlock();
4045 m->put();
4046 return;
4047 }
4048
4049 ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
4050 ceph_tid_t tid = m->get_tid();
4051 map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
4052 if (iter != pool_ops.end()) {
4053 PoolOp *op = iter->second;
4054 ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
4055 << ceph_pool_op_name(op->pool_op) << dendl;
4056 if (op->blp)
4057 op->blp->claim(m->response_data);
4058 if (m->version > last_seen_osdmap_version)
4059 last_seen_osdmap_version = m->version;
4060 if (osdmap->get_epoch() < m->epoch) {
4061 sul.unlock();
4062 sul.lock();
4063 // recheck op existence since we have let go of rwlock
4064 // (for promotion) above.
4065 iter = pool_ops.find(tid);
4066 if (iter == pool_ops.end())
4067 goto done; // op is gone.
4068 if (osdmap->get_epoch() < m->epoch) {
4069 ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
4070 << " before calling back" << dendl;
4071 _wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
4072 } else {
4073 // map epoch changed, probably because a MOSDMap message
4074 // sneaked in. Do caller-specified callback now or else
4075 // we lose it forever.
11fdf7f2 4076 ceph_assert(op->onfinish);
7c673cae
FG
4077 op->onfinish->complete(m->replyCode);
4078 }
4079 } else {
11fdf7f2 4080 ceph_assert(op->onfinish);
7c673cae
FG
4081 op->onfinish->complete(m->replyCode);
4082 }
4083 op->onfinish = NULL;
4084 if (!sul.owns_lock()) {
4085 sul.unlock();
4086 sul.lock();
4087 }
4088 iter = pool_ops.find(tid);
4089 if (iter != pool_ops.end()) {
4090 _finish_pool_op(op, 0);
4091 }
4092 } else {
4093 ldout(cct, 10) << "unknown request " << tid << dendl;
4094 }
4095
4096done:
4097 // Not strictly necessary, since we'll release it on return.
4098 sul.unlock();
4099
4100 ldout(cct, 10) << "done" << dendl;
4101 m->put();
4102}
4103
4104int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4105{
11fdf7f2 4106 ceph_assert(initialized);
7c673cae
FG
4107
4108 unique_lock wl(rwlock);
4109
4110 map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
4111 if (it == pool_ops.end()) {
4112 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4113 return -ENOENT;
4114 }
4115
4116 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4117
4118 PoolOp *op = it->second;
4119 if (op->onfinish)
4120 op->onfinish->complete(r);
4121
4122 _finish_pool_op(op, r);
4123 return 0;
4124}
4125
4126void Objecter::_finish_pool_op(PoolOp *op, int r)
4127{
4128 // rwlock is locked unique
4129 pool_ops.erase(op->tid);
4130 logger->set(l_osdc_poolop_active, pool_ops.size());
4131
4132 if (op->ontimeout && r != -ETIMEDOUT) {
4133 timer.cancel_event(op->ontimeout);
4134 }
4135
4136 delete op;
4137}
4138
4139// pool stats
4140
4141void Objecter::get_pool_stats(list<string>& pools,
4142 map<string,pool_stat_t> *result,
81eedcae 4143 bool *per_pool,
7c673cae
FG
4144 Context *onfinish)
4145{
4146 ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4147
4148 PoolStatOp *op = new PoolStatOp;
31f18b77 4149 op->tid = ++last_tid;
7c673cae
FG
4150 op->pools = pools;
4151 op->pool_stats = result;
81eedcae 4152 op->per_pool = per_pool;
7c673cae
FG
4153 op->onfinish = onfinish;
4154 if (mon_timeout > timespan(0)) {
4155 op->ontimeout = timer.add_event(mon_timeout,
4156 [this, op]() {
4157 pool_stat_op_cancel(op->tid,
4158 -ETIMEDOUT); });
4159 } else {
4160 op->ontimeout = 0;
4161 }
4162
4163 unique_lock wl(rwlock);
4164
4165 poolstat_ops[op->tid] = op;
4166
4167 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4168
4169 _poolstat_submit(op);
4170}
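
// A minimal sketch for get_pool_stats(), assuming an initialized Objecter
// *objecter; the result map and per_pool flag are caller-owned and are
// filled in before onfinish fires.
#if 0
int example_pool_stats(Objecter *objecter, list<string>& pools)
{
  map<string, pool_stat_t> stats;
  bool per_pool = false;
  C_SaferCond cond;
  objecter->get_pool_stats(pools, &stats, &per_pool, &cond);
  int r = cond.wait();
  // on success, stats[name] holds each requested pool's pool_stat_t
  return r;
}
#endif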
4171
4172void Objecter::_poolstat_submit(PoolStatOp *op)
4173{
4174 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4175 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4176 op->pools,
4177 last_seen_pgmap_version));
11fdf7f2 4178 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4179
4180 logger->inc(l_osdc_poolstat_send);
4181}
4182
4183void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4184{
4185 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4186 ceph_tid_t tid = m->get_tid();
4187
4188 unique_lock wl(rwlock);
31f18b77 4189 if (!initialized) {
7c673cae
FG
4190 m->put();
4191 return;
4192 }
4193
4194 map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
4195 if (iter != poolstat_ops.end()) {
4196 PoolStatOp *op = poolstat_ops[tid];
4197 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4198 *op->pool_stats = m->pool_stats;
81eedcae 4199 *op->per_pool = m->per_pool;
7c673cae
FG
4200 if (m->version > last_seen_pgmap_version) {
4201 last_seen_pgmap_version = m->version;
4202 }
4203 op->onfinish->complete(0);
4204 _finish_pool_stat_op(op, 0);
4205 } else {
4206 ldout(cct, 10) << "unknown request " << tid << dendl;
4207 }
4208 ldout(cct, 10) << "done" << dendl;
4209 m->put();
4210}
4211
4212int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4213{
11fdf7f2 4214 ceph_assert(initialized);
7c673cae
FG
4215
4216 unique_lock wl(rwlock);
4217
4218 map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
4219 if (it == poolstat_ops.end()) {
4220 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4221 return -ENOENT;
4222 }
4223
4224 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4225
4226 PoolStatOp *op = it->second;
4227 if (op->onfinish)
4228 op->onfinish->complete(r);
4229 _finish_pool_stat_op(op, r);
4230 return 0;
4231}
4232
4233void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4234{
4235 // rwlock is locked unique
4236
4237 poolstat_ops.erase(op->tid);
4238 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4239
4240 if (op->ontimeout && r != -ETIMEDOUT)
4241 timer.cancel_event(op->ontimeout);
4242
4243 delete op;
4244}
4245
d2e6a577
FG
4246void Objecter::get_fs_stats(ceph_statfs& result,
4247 boost::optional<int64_t> data_pool,
4248 Context *onfinish)
7c673cae
FG
4249{
4250 ldout(cct, 10) << "get_fs_stats" << dendl;
4251 unique_lock l(rwlock);
4252
4253 StatfsOp *op = new StatfsOp;
31f18b77 4254 op->tid = ++last_tid;
7c673cae 4255 op->stats = &result;
d2e6a577 4256 op->data_pool = data_pool;
7c673cae
FG
4257 op->onfinish = onfinish;
4258 if (mon_timeout > timespan(0)) {
4259 op->ontimeout = timer.add_event(mon_timeout,
4260 [this, op]() {
4261 statfs_op_cancel(op->tid,
4262 -ETIMEDOUT); });
4263 } else {
4264 op->ontimeout = 0;
4265 }
4266 statfs_ops[op->tid] = op;
4267
4268 logger->set(l_osdc_statfs_active, statfs_ops.size());
4269
4270 _fs_stats_submit(op);
4271}
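
// A minimal sketch for get_fs_stats(), assuming an initialized Objecter
// *objecter; pass boost::none for cluster-wide totals, or a pool id to
// restrict the statfs to that pool.
#if 0
int example_fs_stats(Objecter *objecter)
{
  ceph_statfs stats;
  C_SaferCond cond;
  objecter->get_fs_stats(stats, boost::none, &cond);
  int r = cond.wait();
  // on success, stats.kb / stats.kb_used / stats.kb_avail are populated
  return r;
}
#endif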
4272
4273void Objecter::_fs_stats_submit(StatfsOp *op)
4274{
4275 // rwlock is locked unique
4276
4277 ldout(cct, 10) << "fs_stats_submit " << op->tid << dendl;
4278 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
d2e6a577 4279 op->data_pool,
7c673cae 4280 last_seen_pgmap_version));
11fdf7f2 4281 op->last_submit = ceph::coarse_mono_clock::now();
7c673cae
FG
4282
4283 logger->inc(l_osdc_statfs_send);
4284}
4285
4286void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4287{
4288 unique_lock wl(rwlock);
31f18b77 4289 if (!initialized) {
7c673cae
FG
4290 m->put();
4291 return;
4292 }
4293
4294 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4295 ceph_tid_t tid = m->get_tid();
4296
4297 if (statfs_ops.count(tid)) {
4298 StatfsOp *op = statfs_ops[tid];
4299 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4300 *(op->stats) = m->h.st;
4301 if (m->h.version > last_seen_pgmap_version)
4302 last_seen_pgmap_version = m->h.version;
4303 op->onfinish->complete(0);
4304 _finish_statfs_op(op, 0);
4305 } else {
4306 ldout(cct, 10) << "unknown request " << tid << dendl;
4307 }
4308 m->put();
4309 ldout(cct, 10) << "done" << dendl;
4310}
4311
4312int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4313{
11fdf7f2 4314 ceph_assert(initialized);
7c673cae
FG
4315
4316 unique_lock wl(rwlock);
4317
4318 map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
4319 if (it == statfs_ops.end()) {
4320 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4321 return -ENOENT;
4322 }
4323
4324 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4325
4326 StatfsOp *op = it->second;
4327 if (op->onfinish)
4328 op->onfinish->complete(r);
4329 _finish_statfs_op(op, r);
4330 return 0;
4331}
4332
4333void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4334{
4335 // rwlock is locked unique
4336
4337 statfs_ops.erase(op->tid);
4338 logger->set(l_osdc_statfs_active, statfs_ops.size());
4339
4340 if (op->ontimeout && r != -ETIMEDOUT)
4341 timer.cancel_event(op->ontimeout);
4342
4343 delete op;
4344}
4345
4346// scatter/gather
4347
4348void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
4349 vector<bufferlist>& resultbl,
4350 bufferlist *bl, Context *onfinish)
4351{
4352 // all done
4353 ldout(cct, 15) << "_sg_read_finish" << dendl;
4354
4355 if (extents.size() > 1) {
4356 Striper::StripedReadResult r;
4357 vector<bufferlist>::iterator bit = resultbl.begin();
4358 for (vector<ObjectExtent>::iterator eit = extents.begin();
4359 eit != extents.end();
4360 ++eit, ++bit) {
4361 r.add_partial_result(cct, *bit, eit->buffer_extents);
4362 }
4363 bl->clear();
4364 r.assemble_result(cct, *bl, false);
4365 } else {
4366 ldout(cct, 15) << " only one frag" << dendl;
4367 bl->claim(resultbl[0]);
4368 }
4369
4370 // done
4371 uint64_t bytes_read = bl->length();
4372 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4373
4374 if (onfinish) {
4375 onfinish->complete(bytes_read);
4376 }
4377}
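
// The assembly step above, distilled: feed each object extent's bufferlist
// into a Striper::StripedReadResult along with its buffer_extents mapping,
// then flatten everything into one contiguous buffer. A sketch using the
// same types and calls as _sg_read_finish():
#if 0
void example_assemble(CephContext *cct,
                      vector<ObjectExtent>& extents,
                      vector<bufferlist>& resultbl,
                      bufferlist *out)
{
  Striper::StripedReadResult r;
  auto bit = resultbl.begin();
  for (auto eit = extents.begin(); eit != extents.end(); ++eit, ++bit)
    r.add_partial_result(cct, *bit, eit->buffer_extents);
  out->clear();
  r.assemble_result(cct, *out, false);  // false: same flag as above
}
#endif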
4378
4379
4380void Objecter::ms_handle_connect(Connection *con)
4381{
4382 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
31f18b77 4383 if (!initialized)
7c673cae
FG
4384 return;
4385
4386 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4387 resend_mon_ops();
4388}
4389
4390bool Objecter::ms_handle_reset(Connection *con)
4391{
31f18b77 4392 if (!initialized)
7c673cae
FG
4393 return false;
4394 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
91327a77 4395 unique_lock wl(rwlock);
11fdf7f2
TL
4396
4397 auto priv = con->get_priv();
4398 auto session = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4399 if (session) {
4400 ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
4401 << " osd." << session->osd << dendl;
a8e16298
TL
4402 // the session may already have been closed if a just-handled osdmap
4403 // marked the osd down
4404 if (!(initialized && osdmap->is_up(session->osd))) {
4405 ldout(cct, 1) << "ms_handle_reset aborted, initialized=" << initialized << dendl;
7c673cae
FG
4406 wl.unlock();
4407 return false;
4408 }
4409 map<uint64_t, LingerOp *> lresend;
4410 OSDSession::unique_lock sl(session->lock);
4411 _reopen_session(session);
4412 _kick_requests(session, lresend);
4413 sl.unlock();
4414 _linger_ops_resend(lresend, wl);
4415 wl.unlock();
4416 maybe_request_map();
7c673cae
FG
4417 }
4418 return true;
4419 }
4420 return false;
4421}
4422
4423void Objecter::ms_handle_remote_reset(Connection *con)
4424{
4425 /*
4426 * treat these the same.
4427 */
4428 ms_handle_reset(con);
4429}
4430
4431bool Objecter::ms_handle_refused(Connection *con)
4432{
4433 // just log for now
4434 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4435 int osd = osdmap->identify_osd(con->get_peer_addr());
4436 if (osd >= 0) {
4437 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4438 }
4439 }
4440 return false;
4441}
4442
4443bool Objecter::ms_get_authorizer(int dest_type,
11fdf7f2 4444 AuthAuthorizer **authorizer)
7c673cae 4445{
31f18b77 4446 if (!initialized)
7c673cae
FG
4447 return false;
4448 if (dest_type == CEPH_ENTITY_TYPE_MON)
4449 return true;
4450 *authorizer = monc->build_authorizer(dest_type);
4451 return *authorizer != NULL;
4452}
4453
4454void Objecter::op_target_t::dump(Formatter *f) const
4455{
4456 f->dump_stream("pg") << pgid;
4457 f->dump_int("osd", osd);
4458 f->dump_stream("object_id") << base_oid;
4459 f->dump_stream("object_locator") << base_oloc;
4460 f->dump_stream("target_object_id") << target_oid;
4461 f->dump_stream("target_object_locator") << target_oloc;
4462 f->dump_int("paused", (int)paused);
4463 f->dump_int("used_replica", (int)used_replica);
4464 f->dump_int("precalc_pgid", (int)precalc_pgid);
4465}
4466
4467void Objecter::_dump_active(OSDSession *s)
4468{
4469 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
4470 p != s->ops.end();
4471 ++p) {
4472 Op *op = p->second;
4473 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4474 << "\tosd." << (op->session ? op->session->osd : -1)
4475 << "\t" << op->target.base_oid
4476 << "\t" << op->ops << dendl;
4477 }
4478}
4479
4480void Objecter::_dump_active()
4481{
31f18b77 4482 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
7c673cae
FG
4483 << dendl;
4484 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
4485 siter != osd_sessions.end(); ++siter) {
4486 OSDSession *s = siter->second;
4487 OSDSession::shared_lock sl(s->lock);
4488 _dump_active(s);
4489 sl.unlock();
4490 }
4491 _dump_active(homeless_session);
4492}
4493
4494void Objecter::dump_active()
4495{
4496 shared_lock rl(rwlock);
4497 _dump_active();
4498 rl.unlock();
4499}
4500
4501void Objecter::dump_requests(Formatter *fmt)
4502{
4503 // Read-lock on Objecter held here
4504 fmt->open_object_section("requests");
4505 dump_ops(fmt);
4506 dump_linger_ops(fmt);
4507 dump_pool_ops(fmt);
4508 dump_pool_stat_ops(fmt);
4509 dump_statfs_ops(fmt);
4510 dump_command_ops(fmt);
4511 fmt->close_section(); // requests object
4512}
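
// A minimal sketch of consuming dump_requests(), assuming the caller holds
// the Objecter read lock (as RequestStateHook::call() below does) and uses
// a JSON Formatter:
#if 0
void example_dump_requests(Objecter *objecter, bufferlist& out)
{
  Formatter *f = Formatter::create("json-pretty");
  objecter->dump_requests(f);  // emits requests/ops/linger_ops/... sections
  f->flush(out);
  delete f;
}
#endif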
4513
4514void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4515{
4516 for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
4517 p != s->ops.end();
4518 ++p) {
4519 Op *op = p->second;
92f5a8d4 4520 auto age = std::chrono::duration<double>(coarse_mono_clock::now() - op->stamp);
7c673cae
FG
4521 fmt->open_object_section("op");
4522 fmt->dump_unsigned("tid", op->tid);
4523 op->target.dump(fmt);
4524 fmt->dump_stream("last_sent") << op->stamp;
92f5a8d4 4525 fmt->dump_float("age", age.count());
7c673cae
FG
4526 fmt->dump_int("attempts", op->attempts);
4527 fmt->dump_stream("snapid") << op->snapid;
4528 fmt->dump_stream("snap_context") << op->snapc;
4529 fmt->dump_stream("mtime") << op->mtime;
4530
4531 fmt->open_array_section("osd_ops");
4532 for (vector<OSDOp>::const_iterator it = op->ops.begin();
4533 it != op->ops.end();
4534 ++it) {
4535 fmt->dump_stream("osd_op") << *it;
4536 }
4537 fmt->close_section(); // osd_ops array
4538
4539 fmt->close_section(); // op object
4540 }
4541}
4542
4543void Objecter::dump_ops(Formatter *fmt)
4544{
4545 // Read-lock on Objecter held
4546 fmt->open_array_section("ops");
4547 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4548 siter != osd_sessions.end(); ++siter) {
4549 OSDSession *s = siter->second;
4550 OSDSession::shared_lock sl(s->lock);
4551 _dump_ops(s, fmt);
4552 sl.unlock();
4553 }
4554 _dump_ops(homeless_session, fmt);
4555 fmt->close_section(); // ops array
4556}
4557
4558void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4559{
4560 for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
4561 p != s->linger_ops.end();
4562 ++p) {
4563 LingerOp *op = p->second;
4564 fmt->open_object_section("linger_op");
4565 fmt->dump_unsigned("linger_id", op->linger_id);
4566 op->target.dump(fmt);
4567 fmt->dump_stream("snapid") << op->snap;
4568 fmt->dump_stream("registered") << op->registered;
4569 fmt->close_section(); // linger_op object
4570 }
4571}
4572
4573void Objecter::dump_linger_ops(Formatter *fmt)
4574{
4575 // We have a read-lock on the objecter
4576 fmt->open_array_section("linger_ops");
4577 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4578 siter != osd_sessions.end(); ++siter) {
4579 OSDSession *s = siter->second;
4580 OSDSession::shared_lock sl(s->lock);
4581 _dump_linger_ops(s, fmt);
4582 sl.unlock();
4583 }
4584 _dump_linger_ops(homeless_session, fmt);
4585 fmt->close_section(); // linger_ops array
4586}
4587
4588void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4589{
4590 for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
4591 p != s->command_ops.end();
4592 ++p) {
4593 CommandOp *op = p->second;
4594 fmt->open_object_section("command_op");
4595 fmt->dump_unsigned("command_id", op->tid);
4596 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4597 fmt->open_array_section("command");
4598 for (vector<string>::const_iterator q = op->cmd.begin();
4599 q != op->cmd.end(); ++q)
4600 fmt->dump_string("word", *q);
4601 fmt->close_section();
4602 if (op->target_osd >= 0)
4603 fmt->dump_int("target_osd", op->target_osd);
4604 else
4605 fmt->dump_stream("target_pg") << op->target_pg;
4606 fmt->close_section(); // command_op object
4607 }
4608}
4609
4610void Objecter::dump_command_ops(Formatter *fmt)
4611{
4612 // We have a read-lock on the Objecter here
4613 fmt->open_array_section("command_ops");
4614 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4615 siter != osd_sessions.end(); ++siter) {
4616 OSDSession *s = siter->second;
4617 OSDSession::shared_lock sl(s->lock);
4618 _dump_command_ops(s, fmt);
4619 sl.unlock();
4620 }
4621 _dump_command_ops(homeless_session, fmt);
4622 fmt->close_section(); // command_ops array
4623}
4624
4625void Objecter::dump_pool_ops(Formatter *fmt) const
4626{
4627 fmt->open_array_section("pool_ops");
4628 for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
4629 p != pool_ops.end();
4630 ++p) {
4631 PoolOp *op = p->second;
4632 fmt->open_object_section("pool_op");
4633 fmt->dump_unsigned("tid", op->tid);
4634 fmt->dump_int("pool", op->pool);
4635 fmt->dump_string("name", op->name);
4636 fmt->dump_int("operation_type", op->pool_op);
7c673cae
FG
4637 fmt->dump_unsigned("crush_rule", op->crush_rule);
4638 fmt->dump_stream("snapid") << op->snapid;
4639 fmt->dump_stream("last_sent") << op->last_submit;
4640 fmt->close_section(); // pool_op object
4641 }
4642 fmt->close_section(); // pool_ops array
4643}
4644
4645void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4646{
4647 fmt->open_array_section("pool_stat_ops");
4648 for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
4649 p != poolstat_ops.end();
4650 ++p) {
4651 PoolStatOp *op = p->second;
4652 fmt->open_object_section("pool_stat_op");
4653 fmt->dump_unsigned("tid", op->tid);
4654 fmt->dump_stream("last_sent") << op->last_submit;
4655
4656 fmt->open_array_section("pools");
4657 for (list<string>::const_iterator it = op->pools.begin();
4658 it != op->pools.end();
4659 ++it) {
4660 fmt->dump_string("pool", *it);
4661 }
4662 fmt->close_section(); // pools array
4663
4664 fmt->close_section(); // pool_stat_op object
4665 }
4666 fmt->close_section(); // pool_stat_ops array
4667}
4668
4669void Objecter::dump_statfs_ops(Formatter *fmt) const
4670{
4671 fmt->open_array_section("statfs_ops");
4672 for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
4673 p != statfs_ops.end();
4674 ++p) {
4675 StatfsOp *op = p->second;
4676 fmt->open_object_section("statfs_op");
4677 fmt->dump_unsigned("tid", op->tid);
4678 fmt->dump_stream("last_sent") << op->last_submit;
4679 fmt->close_section(); // statfs_op object
4680 }
4681 fmt->close_section(); // statfs_ops array
4682}
4683
4684Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4685 m_objecter(objecter)
4686{
4687}
4688
11fdf7f2
TL
4689bool Objecter::RequestStateHook::call(std::string_view command,
4690 const cmdmap_t& cmdmap,
4691 std::string_view format,
4692 bufferlist& out)
7c673cae
FG
4693{
4694 Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
4695 shared_lock rl(m_objecter->rwlock);
4696 m_objecter->dump_requests(f);
4697 f->flush(out);
4698 delete f;
4699 return true;
4700}
4701
4702void Objecter::blacklist_self(bool set)
4703{
4704 ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
4705
4706 vector<string> cmd;
4707 cmd.push_back("{\"prefix\":\"osd blacklist\", ");
4708 if (set)
4709 cmd.push_back("\"blacklistop\":\"add\",");
4710 else
4711 cmd.push_back("\"blacklistop\":\"rm\",");
4712 stringstream ss;
11fdf7f2
TL
4713 // this is somewhat imprecise in that we are blacklisting our first addr only
4714 ss << messenger->get_myaddrs().front().get_legacy_str();
7c673cae
FG
4715 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4716
4717 MMonCommand *m = new MMonCommand(monc->get_fsid());
4718 m->cmd = cmd;
4719
4720 monc->send_mon_message(m);
4721}
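
// The fragments above concatenate into one mon command payload, e.g. (addr
// illustrative):
//
//   {"prefix":"osd blacklist", "blacklistop":"add", "addr":"10.0.0.1:0/1234"}
//
// which is the same request the CLI issues for "ceph osd blacklist add".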
4722
4723// commands
4724
4725void Objecter::handle_command_reply(MCommandReply *m)
4726{
4727 unique_lock wl(rwlock);
31f18b77 4728 if (!initialized) {
7c673cae
FG
4729 m->put();
4730 return;
4731 }
4732
4733 ConnectionRef con = m->get_connection();
11fdf7f2
TL
4734 auto priv = con->get_priv();
4735 auto s = static_cast<OSDSession*>(priv.get());
7c673cae
FG
4736 if (!s || s->con != con) {
4737 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
4738 m->put();
7c673cae
FG
4739 return;
4740 }
4741
4742 OSDSession::shared_lock sl(s->lock);
4743 map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
4744 if (p == s->command_ops.end()) {
4745 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4746 << " not found" << dendl;
4747 m->put();
4748 sl.unlock();
7c673cae
FG
4749 return;
4750 }
4751
4752 CommandOp *c = p->second;
4753 if (!c->session ||
4754 m->get_connection() != c->session->con) {
4755 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4756 << " got reply from wrong connection "
4757 << m->get_connection() << " " << m->get_source_inst()
4758 << dendl;
4759 m->put();
4760 sl.unlock();
7c673cae
FG
4761 return;
4762 }
4763 if (c->poutbl) {
4764 c->poutbl->claim(m->get_data());
4765 }
4766
4767 sl.unlock();
4768
28e407b8 4769 OSDSession::unique_lock sul(s->lock);
7c673cae 4770 _finish_command(c, m->r, m->rs);
28e407b8
AA
4771 sul.unlock();
4772
7c673cae 4773 m->put();
7c673cae
FG
4774}
4775
4776void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
4777{
4778 shunique_lock sul(rwlock, ceph::acquire_unique);
4779
31f18b77 4780 ceph_tid_t tid = ++last_tid;
7c673cae
FG
4781 ldout(cct, 10) << __func__ << " " << tid << " " << c->cmd << dendl;
4782 c->tid = tid;
4783
4784 {
4785 OSDSession::unique_lock hs_wl(homeless_session->lock);
4786 _session_command_op_assign(homeless_session, c);
4787 }
4788
4789 _calc_command_target(c, sul);
4790 _assign_command_session(c, sul);
4791 if (osd_timeout > timespan(0)) {
4792 c->ontimeout = timer.add_event(osd_timeout,
4793 [this, c, tid]() {
4794 command_op_cancel(c->session, tid,
4795 -ETIMEDOUT); });
4796 }
4797
4798 if (!c->session->is_homeless()) {
4799 _send_command(c);
4800 } else {
4801 _maybe_request_map();
4802 }
4803 if (c->map_check_error)
4804 _send_command_map_check(c);
4805 *ptid = tid;
4806
4807 logger->inc(l_osdc_command_active);
4808}
4809
4810int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
4811{
11fdf7f2 4812 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4813
4814 c->map_check_error = 0;
4815
4816 // ignore overlays, just like we do with pg ops
4817 c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4818
4819 if (c->target_osd >= 0) {
4820 if (!osdmap->exists(c->target_osd)) {
4821 c->map_check_error = -ENOENT;
4822 c->map_check_error_str = "osd dne";
4823 c->target.osd = -1;
4824 return RECALC_OP_TARGET_OSD_DNE;
4825 }
4826 if (osdmap->is_down(c->target_osd)) {
4827 c->map_check_error = -ENXIO;
4828 c->map_check_error_str = "osd down";
4829 c->target.osd = -1;
4830 return RECALC_OP_TARGET_OSD_DOWN;
4831 }
4832 c->target.osd = c->target_osd;
4833 } else {
4834 int ret = _calc_target(&(c->target), nullptr, true);
4835 if (ret == RECALC_OP_TARGET_POOL_DNE) {
4836 c->map_check_error = -ENOENT;
4837 c->map_check_error_str = "pool dne";
4838 c->target.osd = -1;
4839 return ret;
4840 } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4841 c->map_check_error = -ENXIO;
4842 c->map_check_error_str = "osd down";
4843 c->target.osd = -1;
4844 return ret;
4845 }
4846 }
4847
4848 OSDSession *s;
4849 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4850 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4851
4852 if (c->session != s) {
4853 put_session(s);
4854 return RECALC_OP_TARGET_NEED_RESEND;
4855 }
4856
4857 put_session(s);
4858
4859 ldout(cct, 20) << "_calc_command_target " << c->tid << " no change, "
4860 << c->session << dendl;
4861
4862 return RECALC_OP_TARGET_NO_ACTION;
4863}
4864
4865void Objecter::_assign_command_session(CommandOp *c,
4866 shunique_lock& sul)
4867{
11fdf7f2 4868 ceph_assert(sul.owns_lock() && sul.mutex() == &rwlock);
7c673cae
FG
4869
4870 OSDSession *s;
4871 int r = _get_session(c->target.osd, &s, sul);
11fdf7f2 4872 ceph_assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
7c673cae
FG
4873
4874 if (c->session != s) {
4875 if (c->session) {
4876 OSDSession *cs = c->session;
4877 OSDSession::unique_lock csl(cs->lock);
4878 _session_command_op_remove(c->session, c);
4879 csl.unlock();
4880 }
4881 OSDSession::unique_lock sl(s->lock);
4882 _session_command_op_assign(s, c);
4883 }
4884
4885 put_session(s);
4886}
4887
4888void Objecter::_send_command(CommandOp *c)
4889{
4890 ldout(cct, 10) << "_send_command " << c->tid << dendl;
11fdf7f2
TL
4891 ceph_assert(c->session);
4892 ceph_assert(c->session->con);
7c673cae
FG
4893 MCommand *m = new MCommand(monc->monmap.fsid);
4894 m->cmd = c->cmd;
4895 m->set_data(c->inbl);
4896 m->set_tid(c->tid);
4897 c->session->con->send_message(m);
4898 logger->inc(l_osdc_command_send);
4899}
4900
4901int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
4902{
11fdf7f2 4903 ceph_assert(initialized);
7c673cae
FG
4904
4905 unique_lock wl(rwlock);
4906
4907 map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
4908 if (it == s->command_ops.end()) {
4909 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4910 return -ENOENT;
4911 }
4912
4913 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4914
4915 CommandOp *op = it->second;
4916 _command_cancel_map_check(op);
28e407b8 4917 OSDSession::unique_lock sl(op->session->lock);
7c673cae 4918 _finish_command(op, r, "");
28e407b8 4919 sl.unlock();
7c673cae
FG
4920 return 0;
4921}
4922
4923void Objecter::_finish_command(CommandOp *c, int r, string rs)
4924{
4925 // rwlock is locked unique
28e407b8 4926 // session lock is locked
7c673cae
FG
4927
4928 ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
4929 << rs << dendl;
4930 if (c->prs)
4931 *c->prs = rs;
4932 if (c->onfinish)
4933 c->onfinish->complete(r);
4934
4935 if (c->ontimeout && r != -ETIMEDOUT)
4936 timer.cancel_event(c->ontimeout);
4937
7c673cae 4938 _session_command_op_remove(c->session, c);
7c673cae
FG
4939
4940 c->put();
4941
4942 logger->dec(l_osdc_command_active);
4943}
4944
4945Objecter::OSDSession::~OSDSession()
4946{
4947 // Caller is responsible for re-assigning or
4948 // destroying any ops that were assigned to us
11fdf7f2
TL
4949 ceph_assert(ops.empty());
4950 ceph_assert(linger_ops.empty());
4951 ceph_assert(command_ops.empty());
7c673cae
FG
4952}
4953
4954Objecter::~Objecter()
4955{
4956 delete osdmap;
4957
11fdf7f2
TL
4958 ceph_assert(homeless_session->get_nref() == 1);
4959 ceph_assert(num_homeless_ops == 0);
7c673cae
FG
4960 homeless_session->put();
4961
11fdf7f2
TL
4962 ceph_assert(osd_sessions.empty());
4963 ceph_assert(poolstat_ops.empty());
4964 ceph_assert(statfs_ops.empty());
4965 ceph_assert(pool_ops.empty());
4966 ceph_assert(waiting_for_map.empty());
4967 ceph_assert(linger_ops.empty());
4968 ceph_assert(check_latest_map_lingers.empty());
4969 ceph_assert(check_latest_map_ops.empty());
4970 ceph_assert(check_latest_map_commands.empty());
7c673cae 4971
11fdf7f2
TL
4972 ceph_assert(!m_request_state_hook);
4973 ceph_assert(!logger);
7c673cae
FG
4974}
4975
4976/**
4977 * Wait until this OSD map epoch is received before
4978 * sending any more operations to OSDs. Use this
4979 * when it is known that the client can't trust
4980 * anything from before this epoch (e.g. due to
4981 * client blacklist at this epoch).
4982 */
4983void Objecter::set_epoch_barrier(epoch_t epoch)
4984{
4985 unique_lock wl(rwlock);
4986
4987 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
4988 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
4989 << dendl;
4990 if (epoch > epoch_barrier) {
4991 epoch_barrier = epoch;
4992 _maybe_request_map();
4993 }
4994}
4995
4996
4997
4998hobject_t Objecter::enumerate_objects_begin()
4999{
5000 return hobject_t();
5001}
5002
5003hobject_t Objecter::enumerate_objects_end()
5004{
5005 return hobject_t::get_max();
5006}
5007
5008struct C_EnumerateReply : public Context {
5009 bufferlist bl;
5010
5011 Objecter *objecter;
5012 hobject_t *next;
5013 std::list<librados::ListObjectImpl> *result;
5014 const hobject_t end;
5015 const int64_t pool_id;
5016 Context *on_finish;
5017
5018 epoch_t epoch;
5019 int budget;
5020
5021 C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
5022 std::list<librados::ListObjectImpl> *result_,
5023 const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
5024 objecter(objecter_), next(next_), result(result_),
5025 end(end_), pool_id(pool_id_), on_finish(on_finish_),
11fdf7f2 5026 epoch(0), budget(-1)
7c673cae
FG
5027 {}
5028
5029 void finish(int r) override {
5030 objecter->_enumerate_reply(
5031 bl, r, end, pool_id, budget, epoch, result, next, on_finish);
5032 }
5033};
5034
5035void Objecter::enumerate_objects(
5036 int64_t pool_id,
5037 const std::string &ns,
5038 const hobject_t &start,
5039 const hobject_t &end,
5040 const uint32_t max,
5041 const bufferlist &filter_bl,
5042 std::list<librados::ListObjectImpl> *result,
5043 hobject_t *next,
5044 Context *on_finish)
5045{
11fdf7f2 5046 ceph_assert(result);
7c673cae
FG
5047
5048 if (!end.is_max() && start > end) {
5049 lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
5050 on_finish->complete(-EINVAL);
5051 return;
5052 }
5053
5054 if (max < 1) {
5055 lderr(cct) << __func__ << ": result size may not be zero" << dendl;
5056 on_finish->complete(-EINVAL);
5057 return;
5058 }
5059
5060 if (start.is_max()) {
5061 on_finish->complete(0);
5062 return;
5063 }
5064
5065 shared_lock rl(rwlock);
11fdf7f2 5066 ceph_assert(osdmap->get_epoch());
7c673cae
FG
5067 if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
5068 rl.unlock();
5069 lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
5070 on_finish->complete(-EOPNOTSUPP);
5071 return;
5072 }
5073 const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
5074 if (!p) {
5075 lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
5076 << osdmap->get_epoch() << dendl;
5077 rl.unlock();
5078 on_finish->complete(-ENOENT);
5079 return;
5080 } else {
5081 rl.unlock();
5082 }
5083
5084 ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;
5085
5086 // Stash completion state
5087 C_EnumerateReply *on_ack = new C_EnumerateReply(
5088 this, next, result, end, pool_id, on_finish);
5089
5090 ObjectOperation op;
5091 op.pg_nls(max, filter_bl, start, 0);
5092
5093 // Issue. See you later in _enumerate_reply
5094 object_locator_t oloc(pool_id, ns);
5095 pg_read(start.get_hash(), oloc, op,
5096 &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
5097}
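
// A minimal sketch for enumerate_objects(), assuming an initialized Objecter
// *objecter: walk a pool in bounded batches between the begin/end sentinels
// above (empty filter, default namespace).
#if 0
int example_enumerate(Objecter *objecter, int64_t pool)
{
  hobject_t pos = objecter->enumerate_objects_begin();
  const hobject_t end = objecter->enumerate_objects_end();
  while (!pos.is_max()) {
    std::list<librados::ListObjectImpl> batch;
    hobject_t next;
    bufferlist filter;                 // no filter
    C_SaferCond cond;
    objecter->enumerate_objects(pool, "", pos, end, 100, filter,
                                &batch, &next, &cond);
    int r = cond.wait();
    if (r < 0)
      return r;
    // ... consume batch
    pos = next;
  }
  return 0;
}
#endif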
5098
5099void Objecter::_enumerate_reply(
5100 bufferlist &bl,
5101 int r,
5102 const hobject_t &end,
5103 const int64_t pool_id,
5104 int budget,
5105 epoch_t reply_epoch,
5106 std::list<librados::ListObjectImpl> *result,
5107 hobject_t *next,
5108 Context *on_finish)
5109{
11fdf7f2 5110 if (budget >= 0) {
7c673cae
FG
5111 put_op_budget_bytes(budget);
5112 }
5113
5114 if (r < 0) {
5115 ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
5116 on_finish->complete(r);
5117 return;
5118 }
5119
11fdf7f2 5120 ceph_assert(next != NULL);
7c673cae
FG
5121
5122 // Decode the results
11fdf7f2 5123 auto iter = bl.cbegin();
7c673cae
FG
5124 pg_nls_response_t response;
5125
5126 // XXX extra_info doesn't seem used anywhere?
5127 bufferlist extra_info;
11fdf7f2 5128 decode(response, iter);
7c673cae 5129 if (!iter.end()) {
11fdf7f2 5130 decode(extra_info, iter);
7c673cae
FG
5131 }
5132
5133 ldout(cct, 10) << __func__ << ": got " << response.entries.size()
5134 << " handle " << response.handle
5135 << " reply_epoch " << reply_epoch << dendl;
5136 ldout(cct, 20) << __func__ << ": response.entries.size "
5137 << response.entries.size() << ", response.entries "
5138 << response.entries << dendl;
5139 if (response.handle <= end) {
5140 *next = response.handle;
5141 } else {
5142 ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
5143 << dendl;
5144 *next = end;
5145
5146 // drop anything after 'end'
5147 shared_lock rl(rwlock);
5148 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
5149 if (!pool) {
5150 // pool is gone, drop any results which are now meaningless.
5151 rl.unlock();
5152 on_finish->complete(-ENOENT);
5153 return;
5154 }
5155 while (!response.entries.empty()) {
5156 uint32_t hash = response.entries.back().locator.empty() ?
5157 pool->hash_key(response.entries.back().oid,
5158 response.entries.back().nspace) :
5159 pool->hash_key(response.entries.back().locator,
5160 response.entries.back().nspace);
5161 hobject_t last(response.entries.back().oid,
5162 response.entries.back().locator,
5163 CEPH_NOSNAP,
5164 hash,
5165 pool_id,
5166 response.entries.back().nspace);
5167 if (last < end)
5168 break;
5169 ldout(cct, 20) << __func__ << " dropping item " << last
5170 << " >= end " << end << dendl;
5171 response.entries.pop_back();
5172 }
5173 rl.unlock();
5174 }
5175 if (!response.entries.empty()) {
5176 result->merge(response.entries);
5177 }
5178
5179 // release the listing context's budget once all
5180 // OPs (in the session) are finished
5181#if 0
5182 put_nlist_context_budget(list_context);
5183#endif
5184 on_finish->complete(r);
5185 return;
5186}
5187
5188namespace {
5189 using namespace librados;
5190
5191 template <typename T>
5192 void do_decode(std::vector<T>& items, std::vector<bufferlist>& bls)
5193 {
5194 for (auto bl : bls) {
11fdf7f2 5195 auto p = bl.cbegin();
7c673cae
FG
5196 T t;
5197 decode(t, p);
5198 items.push_back(t);
5199 }
5200 }
5201
5202 struct C_ObjectOperation_scrub_ls : public Context {
5203 bufferlist bl;
5204 uint32_t *interval;
5205 std::vector<inconsistent_obj_t> *objects = nullptr;
5206 std::vector<inconsistent_snapset_t> *snapsets = nullptr;
5207 int *rval;
5208
5209 C_ObjectOperation_scrub_ls(uint32_t *interval,
5210 std::vector<inconsistent_obj_t> *objects,
5211 int *rval)
5212 : interval(interval), objects(objects), rval(rval) {}
5213 C_ObjectOperation_scrub_ls(uint32_t *interval,
5214 std::vector<inconsistent_snapset_t> *snapsets,
5215 int *rval)
5216 : interval(interval), snapsets(snapsets), rval(rval) {}
5217 void finish(int r) override {
5218 if (r < 0 && r != -EAGAIN) {
5219 if (rval)
5220 *rval = r;
5221 return;
5222 }
5223
5224 if (rval)
5225 *rval = 0;
5226
5227 try {
5228 decode();
5229 } catch (buffer::error&) {
5230 if (rval)
5231 *rval = -EIO;
5232 }
5233 }
5234 private:
5235 void decode() {
5236 scrub_ls_result_t result;
11fdf7f2 5237 auto p = bl.cbegin();
7c673cae
FG
5238 result.decode(p);
5239 *interval = result.interval;
5240 if (objects) {
5241 do_decode(*objects, result.vals);
5242 } else {
5243 do_decode(*snapsets, result.vals);
5244 }
5245 }
5246 };
5247
5248 template <typename T>
5249 void do_scrub_ls(::ObjectOperation *op,
5250 const scrub_ls_arg_t& arg,
5251 std::vector<T> *items,
5252 uint32_t *interval,
5253 int *rval)
5254 {
5255 OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
5256 op->flags |= CEPH_OSD_FLAG_PGOP;
11fdf7f2 5257 ceph_assert(interval);
7c673cae
FG
5258 arg.encode(osd_op.indata);
5259 unsigned p = op->ops.size() - 1;
5260 auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
5261 op->out_handler[p] = h;
5262 op->out_bl[p] = &h->bl;
5263 op->out_rval[p] = rval;
5264 }
5265}
5266
5267void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5268 uint64_t max_to_get,
5269 std::vector<librados::inconsistent_obj_t> *objects,
5270 uint32_t *interval,
5271 int *rval)
5272{
5273 scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
5274 do_scrub_ls(this, arg, objects, interval, rval);
5275}
5276
5277void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5278 uint64_t max_to_get,
5279 std::vector<librados::inconsistent_snapset_t> *snapsets,
5280 uint32_t *interval,
5281 int *rval)
5282{
5283 scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
5284 do_scrub_ls(this, arg, snapsets, interval, rval);
5285}
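
// A minimal sketch of building a scrub_ls() op for inconsistent objects;
// do_scrub_ls() above marks it CEPH_OSD_FLAG_PGOP, so it must be submitted
// with a pg-targeted read. Names and the retry policy are illustrative.
#if 0
void example_scrub_ls(::ObjectOperation& op,
                      std::vector<librados::inconsistent_obj_t> *objects,
                      uint32_t *interval, int *rval)
{
  librados::object_id_t start_after;  // default-constructed: start at the top
  op.scrub_ls(start_after, 100, objects, interval, rval);
  // submit via pg_read(); on -EAGAIN the scrub interval changed and the
  // caller should retry with the refreshed *interval.
}
#endif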