]> git.proxmox.com Git - ceph.git/blame - ceph/src/osdc/Objecter.cc
bump version to 12.2.11-pve1
[ceph.git] / ceph / src / osdc / Objecter.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "Objecter.h"
16#include "osd/OSDMap.h"
17#include "Filer.h"
18
19#include "mon/MonClient.h"
20
21#include "msg/Messenger.h"
22#include "msg/Message.h"
23
24#include "messages/MPing.h"
25#include "messages/MOSDOp.h"
26#include "messages/MOSDOpReply.h"
27#include "messages/MOSDBackoff.h"
28#include "messages/MOSDMap.h"
29
30#include "messages/MPoolOp.h"
31#include "messages/MPoolOpReply.h"
32
33#include "messages/MGetPoolStats.h"
34#include "messages/MGetPoolStatsReply.h"
35#include "messages/MStatfs.h"
36#include "messages/MStatfsReply.h"
37
38#include "messages/MMonCommand.h"
39
40#include "messages/MCommand.h"
41#include "messages/MCommandReply.h"
42
43#include "messages/MWatchNotify.h"
44
45#include <errno.h>
46
47#include "common/config.h"
48#include "common/perf_counters.h"
49#include "common/scrub_types.h"
50#include "include/str_list.h"
51#include "common/errno.h"
52#include "common/EventTrace.h"
53
54using ceph::real_time;
55using ceph::real_clock;
56
57using ceph::mono_clock;
58using ceph::mono_time;
59
60using ceph::timespan;
61
62
63#define dout_subsys ceph_subsys_objecter
64#undef dout_prefix
65#define dout_prefix *_dout << messenger->get_myname() << ".objecter "
66
67
// Perf-counter indices for the objecter subsystem.  The enumerators must
// remain dense between l_osdc_first and l_osdc_last (and in this exact
// order) because they size and index the PerfCounters instance that
// Objecter::init() builds.
enum {
  l_osdc_first = 123200,
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,

  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_tmap_up,
  l_osdc_osdop_tmap_put,
  l_osdc_osdop_tmap_get,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};
148
149
150// config obs ----------------------------
151
152static const char *config_keys[] = {
153 "crush_location",
154 NULL
155};
156
157class Objecter::RequestStateHook : public AdminSocketHook {
158 Objecter *m_objecter;
159public:
160 explicit RequestStateHook(Objecter *objecter);
161 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
162 bufferlist& out) override;
163};
164
165/**
166 * This is a more limited form of C_Contexts, but that requires
167 * a ceph_context which we don't have here.
168 */
169class ObjectOperation::C_TwoContexts : public Context {
170 Context *first;
171 Context *second;
172public:
173 C_TwoContexts(Context *first, Context *second) :
174 first(first), second(second) {}
175 void finish(int r) override {
176 first->complete(r);
177 second->complete(r);
178 first = NULL;
179 second = NULL;
180 }
181
182 ~C_TwoContexts() override {
183 delete first;
184 delete second;
185 }
186};
187
188void ObjectOperation::add_handler(Context *extra) {
189 size_t last = out_handler.size() - 1;
190 Context *orig = out_handler[last];
191 if (orig) {
192 Context *wrapper = new C_TwoContexts(orig, extra);
193 out_handler[last] = wrapper;
194 } else {
195 out_handler[last] = extra;
196 }
197}
198
199Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
200 object_t& oid)
201{
202 if (oid.name.empty())
203 return unique_completion_lock();
204
205 static constexpr uint32_t HASH_PRIME = 1021;
206 uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
207 % HASH_PRIME;
208
209 return unique_completion_lock(completion_locks[h % num_locks],
210 std::defer_lock);
211}
212
213const char** Objecter::get_tracked_conf_keys() const
214{
215 return config_keys;
216}
217
218
219void Objecter::handle_conf_change(const struct md_config_t *conf,
220 const std::set <std::string> &changed)
221{
222 if (changed.count("crush_location")) {
223 update_crush_location();
224 }
225}
226
227void Objecter::update_crush_location()
228{
229 unique_lock wl(rwlock);
230 crush_location = cct->crush_location.get_location();
231}
232
233// messages ------------------------------
234
235/*
236 * initialize only internal data structures, don't initiate cluster interaction
237 */
238void Objecter::init()
239{
31f18b77 240 assert(!initialized);
7c673cae
FG
241
242 if (!logger) {
243 PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
244
245 pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
246 PerfCountersBuilder::PRIO_CRITICAL);
247 pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
248 pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
1adf2230 249 pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(BYTES));
7c673cae
FG
250 pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
251 pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
252
253 pcb.add_u64_counter(l_osdc_op, "op", "Operations");
254 pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
255 PerfCountersBuilder::PRIO_CRITICAL);
256 pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
257 PerfCountersBuilder::PRIO_CRITICAL);
258 pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
259 "rdwr", PerfCountersBuilder::PRIO_INTERESTING);
260 pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");
261
262 pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
263 pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
264 "Create object operations");
265 pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
266 pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
267 pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
268 "Write full object operations");
269 pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
270 "Write same operations");
271 pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
272 "Append operation");
273 pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
274 "Set object to zero operations");
275 pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
276 "Truncate object operations");
277 pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
278 "Delete object operations");
279 pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
280 "Map extent operations");
281 pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
282 "Sparse read operations");
283 pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
284 "Clone range operations");
285 pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
286 "Get xattr operations");
287 pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
288 "Set xattr operations");
289 pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
290 "Xattr comparison operations");
291 pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
292 "Remove xattr operations");
293 pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
294 "Reset xattr operations");
295 pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up",
296 "TMAP update operations");
297 pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put",
298 "TMAP put operations");
299 pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get",
300 "TMAP get operations");
301 pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
302 "Call (execute) operations");
303 pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
304 "Watch by object operations");
305 pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
306 "Notify about object operations");
307 pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
308 "Extended attribute comparison in multi operations");
309 pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
310 pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
311 pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");
312
313 pcb.add_u64(l_osdc_linger_active, "linger_active",
314 "Active lingering operations");
315 pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
316 "Sent lingering operations");
317 pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
318 "Resent lingering operations");
319 pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
320 "Sent pings to lingering operations");
321
322 pcb.add_u64(l_osdc_poolop_active, "poolop_active",
323 "Active pool operations");
324 pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
325 "Sent pool operations");
326 pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
327 "Resent pool operations");
328
329 pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
330 "Active get pool stat operations");
331 pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
332 "Pool stat operations sent");
333 pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
334 "Resent pool stats");
335
336 pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
337 pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
338 pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
339 "Resent FS stats");
340
341 pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
342 pcb.add_u64_counter(l_osdc_command_send, "command_send",
343 "Sent commands");
344 pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
345 "Resent commands");
346
347 pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
348 pcb.add_u64_counter(l_osdc_map_full, "map_full",
349 "Full OSD maps received");
350 pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
351 "Incremental OSD maps received");
352
353 pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
354 "Open sessions"); // open sessions
355 pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
356 "Sessions opened");
357 pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
358 "Sessions closed");
359 pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");
360
361 pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
362 "OSD OMAP write operations");
363 pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
364 "OSD OMAP read operations");
365 pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
366 "OSD OMAP delete operations");
367
368 logger = pcb.create_perf_counters();
369 cct->get_perfcounters_collection()->add(logger);
370 }
371
372 m_request_state_hook = new RequestStateHook(this);
373 AdminSocket* admin_socket = cct->get_admin_socket();
374 int ret = admin_socket->register_command("objecter_requests",
375 "objecter_requests",
376 m_request_state_hook,
377 "show in-progress osd requests");
378
379 /* Don't warn on EEXIST, happens if multiple ceph clients
380 * are instantiated from one process */
381 if (ret < 0 && ret != -EEXIST) {
382 lderr(cct) << "error registering admin socket command: "
383 << cpp_strerror(ret) << dendl;
384 }
385
386 update_crush_location();
387
388 cct->_conf->add_observer(this);
389
31f18b77 390 initialized = true;
7c673cae
FG
391}
392
393/*
394 * ok, cluster interaction can happen
395 */
396void Objecter::start(const OSDMap* o)
397{
398 shared_lock rl(rwlock);
399
400 start_tick();
401 if (o) {
402 osdmap->deepish_copy_from(*o);
403 } else if (osdmap->get_epoch() == 0) {
404 _maybe_request_map();
405 }
406}
407
408void Objecter::shutdown()
409{
31f18b77 410 assert(initialized);
7c673cae
FG
411
412 unique_lock wl(rwlock);
413
31f18b77 414 initialized = false;
7c673cae 415
f64942e4 416 wl.unlock();
7c673cae 417 cct->_conf->remove_observer(this);
f64942e4 418 wl.lock();
7c673cae
FG
419
420 map<int,OSDSession*>::iterator p;
421 while (!osd_sessions.empty()) {
422 p = osd_sessions.begin();
423 close_session(p->second);
424 }
425
426 while(!check_latest_map_lingers.empty()) {
427 map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
428 i->second->put();
429 check_latest_map_lingers.erase(i->first);
430 }
431
432 while(!check_latest_map_ops.empty()) {
433 map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
434 i->second->put();
435 check_latest_map_ops.erase(i->first);
436 }
437
438 while(!check_latest_map_commands.empty()) {
439 map<ceph_tid_t, CommandOp*>::iterator i
440 = check_latest_map_commands.begin();
441 i->second->put();
442 check_latest_map_commands.erase(i->first);
443 }
444
445 while(!poolstat_ops.empty()) {
446 map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
447 delete i->second;
448 poolstat_ops.erase(i->first);
449 }
450
451 while(!statfs_ops.empty()) {
452 map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
453 delete i->second;
454 statfs_ops.erase(i->first);
455 }
456
457 while(!pool_ops.empty()) {
458 map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
459 delete i->second;
460 pool_ops.erase(i->first);
461 }
462
463 ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
464 while(!homeless_session->linger_ops.empty()) {
465 std::map<uint64_t, LingerOp*>::iterator i
466 = homeless_session->linger_ops.begin();
467 ldout(cct, 10) << " linger_op " << i->first << dendl;
468 LingerOp *lop = i->second;
469 {
470 OSDSession::unique_lock swl(homeless_session->lock);
471 _session_linger_op_remove(homeless_session, lop);
472 }
473 linger_ops.erase(lop->linger_id);
474 linger_ops_set.erase(lop);
475 lop->put();
476 }
477
478 while(!homeless_session->ops.empty()) {
479 std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
480 ldout(cct, 10) << " op " << i->first << dendl;
481 Op *op = i->second;
482 {
483 OSDSession::unique_lock swl(homeless_session->lock);
484 _session_op_remove(homeless_session, op);
485 }
486 op->put();
487 }
488
489 while(!homeless_session->command_ops.empty()) {
490 std::map<ceph_tid_t, CommandOp*>::iterator i
491 = homeless_session->command_ops.begin();
492 ldout(cct, 10) << " command_op " << i->first << dendl;
493 CommandOp *cop = i->second;
494 {
495 OSDSession::unique_lock swl(homeless_session->lock);
496 _session_command_op_remove(homeless_session, cop);
497 }
498 cop->put();
499 }
500
501 if (tick_event) {
502 if (timer.cancel_event(tick_event)) {
503 ldout(cct, 10) << " successfully canceled tick" << dendl;
504 }
505 tick_event = 0;
506 }
507
508 if (logger) {
509 cct->get_perfcounters_collection()->remove(logger);
510 delete logger;
511 logger = NULL;
512 }
513
514 // Let go of Objecter write lock so timer thread can shutdown
515 wl.unlock();
516
517 // Outside of lock to avoid cycle WRT calls to RequestStateHook
518 // This is safe because we guarantee no concurrent calls to
519 // shutdown() with the ::initialized check at start.
520 if (m_request_state_hook) {
521 AdminSocket* admin_socket = cct->get_admin_socket();
522 admin_socket->unregister_command("objecter_requests");
523 delete m_request_state_hook;
524 m_request_state_hook = NULL;
525 }
526}
527
528void Objecter::_send_linger(LingerOp *info,
529 shunique_lock& sul)
530{
531 assert(sul.owns_lock() && sul.mutex() == &rwlock);
532
533 vector<OSDOp> opv;
534 Context *oncommit = NULL;
535 LingerOp::shared_lock watchl(info->watch_lock);
536 bufferlist *poutbl = NULL;
537 if (info->registered && info->is_watch) {
538 ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
539 << dendl;
540 opv.push_back(OSDOp());
541 opv.back().op.op = CEPH_OSD_OP_WATCH;
542 opv.back().op.watch.cookie = info->get_cookie();
543 opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
544 opv.back().op.watch.gen = ++info->register_gen;
545 oncommit = new C_Linger_Reconnect(this, info);
546 } else {
547 ldout(cct, 15) << "send_linger " << info->linger_id << " register"
548 << dendl;
549 opv = info->ops;
550 C_Linger_Commit *c = new C_Linger_Commit(this, info);
551 if (!info->is_watch) {
552 info->notify_id = 0;
553 poutbl = &c->outbl;
554 }
555 oncommit = c;
556 }
557 watchl.unlock();
558 Op *o = new Op(info->target.base_oid, info->target.base_oloc,
559 opv, info->target.flags | CEPH_OSD_FLAG_READ,
560 oncommit, info->pobjver);
561 o->outbl = poutbl;
562 o->snapid = info->snap;
563 o->snapc = info->snapc;
564 o->mtime = info->mtime;
565
566 o->target = info->target;
31f18b77 567 o->tid = ++last_tid;
7c673cae
FG
568
569 // do not resend this; we will send a new op to reregister
570 o->should_resend = false;
571
572 if (info->register_tid) {
573 // repeat send. cancel old registeration op, if any.
574 OSDSession::unique_lock sl(info->session->lock);
575 if (info->session->ops.count(info->register_tid)) {
576 Op *o = info->session->ops[info->register_tid];
577 _op_cancel_map_check(o);
578 _cancel_linger_op(o);
579 }
580 sl.unlock();
581
582 _op_submit(o, sul, &info->register_tid);
583 } else {
584 // first send
585 _op_submit_with_budget(o, sul, &info->register_tid);
586 }
587
588 logger->inc(l_osdc_linger_send);
589}
590
591void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
592{
593 LingerOp::unique_lock wl(info->watch_lock);
594 ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
595 if (info->on_reg_commit) {
596 info->on_reg_commit->complete(r);
597 info->on_reg_commit = NULL;
598 }
28e407b8
AA
599 if (r < 0 && info->on_notify_finish) {
600 info->on_notify_finish->complete(r);
601 info->on_notify_finish = nullptr;
602 }
7c673cae
FG
603
604 // only tell the user the first time we do this
605 info->registered = true;
606 info->pobjver = NULL;
607
608 if (!info->is_watch) {
609 // make note of the notify_id
610 bufferlist::iterator p = outbl.begin();
611 try {
612 ::decode(info->notify_id, p);
613 ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
614 << dendl;
615 }
616 catch (buffer::error& e) {
617 }
618 }
619}
620
621struct C_DoWatchError : public Context {
622 Objecter *objecter;
623 Objecter::LingerOp *info;
624 int err;
625 C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
626 : objecter(o), info(i), err(r) {
627 info->get();
628 info->_queued_async();
629 }
630 void finish(int r) override {
631 Objecter::unique_lock wl(objecter->rwlock);
632 bool canceled = info->canceled;
633 wl.unlock();
634
635 if (!canceled) {
636 info->watch_context->handle_error(info->get_cookie(), err);
637 }
638
639 info->finished_async();
640 info->put();
641 }
642};
643
644int Objecter::_normalize_watch_error(int r)
645{
646 // translate ENOENT -> ENOTCONN so that a delete->disconnection
647 // notification and a failure to reconnect becuase we raced with
648 // the delete appear the same to the user.
649 if (r == -ENOENT)
650 r = -ENOTCONN;
651 return r;
652}
653
654void Objecter::_linger_reconnect(LingerOp *info, int r)
655{
656 ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
657 << " (last_error " << info->last_error << ")" << dendl;
658 if (r < 0) {
659 LingerOp::unique_lock wl(info->watch_lock);
660 if (!info->last_error) {
661 r = _normalize_watch_error(r);
662 info->last_error = r;
663 if (info->watch_context) {
664 finisher->queue(new C_DoWatchError(this, info, r));
665 }
666 }
667 wl.unlock();
668 }
669}
670
671void Objecter::_send_linger_ping(LingerOp *info)
672{
673 // rwlock is locked unique
674 // info->session->lock is locked
675
676 if (cct->_conf->objecter_inject_no_watch_ping) {
677 ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
678 << dendl;
679 return;
680 }
681 if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
682 ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
683 return;
684 }
685
686 ceph::mono_time now = ceph::mono_clock::now();
687 ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
688 << dendl;
689
690 vector<OSDOp> opv(1);
691 opv[0].op.op = CEPH_OSD_OP_WATCH;
692 opv[0].op.watch.cookie = info->get_cookie();
693 opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
694 opv[0].op.watch.gen = info->register_gen;
695 C_Linger_Ping *onack = new C_Linger_Ping(this, info);
696 Op *o = new Op(info->target.base_oid, info->target.base_oloc,
697 opv, info->target.flags | CEPH_OSD_FLAG_READ,
698 onack, NULL, NULL);
699 o->target = info->target;
700 o->should_resend = false;
701 _send_op_account(o);
702 MOSDOp *m = _prepare_osd_op(o);
31f18b77 703 o->tid = ++last_tid;
7c673cae
FG
704 _session_op_assign(info->session, o);
705 _send_op(o, m);
706 info->ping_tid = o->tid;
707
708 onack->sent = now;
709 logger->inc(l_osdc_linger_ping);
710}
711
712void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
713 uint32_t register_gen)
714{
715 LingerOp::unique_lock l(info->watch_lock);
716 ldout(cct, 10) << __func__ << " " << info->linger_id
717 << " sent " << sent << " gen " << register_gen << " = " << r
718 << " (last_error " << info->last_error
719 << " register_gen " << info->register_gen << ")" << dendl;
720 if (info->register_gen == register_gen) {
721 if (r == 0) {
722 info->watch_valid_thru = sent;
723 } else if (r < 0 && !info->last_error) {
724 r = _normalize_watch_error(r);
725 info->last_error = r;
726 if (info->watch_context) {
727 finisher->queue(new C_DoWatchError(this, info, r));
728 }
729 }
730 } else {
731 ldout(cct, 20) << " ignoring old gen" << dendl;
732 }
733}
734
735int Objecter::linger_check(LingerOp *info)
736{
737 LingerOp::shared_lock l(info->watch_lock);
738
739 mono_time stamp = info->watch_valid_thru;
740 if (!info->watch_pending_async.empty())
741 stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
742 auto age = mono_clock::now() - stamp;
743
744 ldout(cct, 10) << __func__ << " " << info->linger_id
745 << " err " << info->last_error
746 << " age " << age << dendl;
747 if (info->last_error)
748 return info->last_error;
749 // return a safe upper bound (we are truncating to ms)
750 return
751 1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
752}
753
754void Objecter::linger_cancel(LingerOp *info)
755{
756 unique_lock wl(rwlock);
757 _linger_cancel(info);
758 info->put();
759}
760
761void Objecter::_linger_cancel(LingerOp *info)
762{
763 // rwlock is locked unique
764 ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
765 if (!info->canceled) {
766 OSDSession *s = info->session;
767 OSDSession::unique_lock sl(s->lock);
768 _session_linger_op_remove(s, info);
769 sl.unlock();
770
771 linger_ops.erase(info->linger_id);
772 linger_ops_set.erase(info);
773 assert(linger_ops.size() == linger_ops_set.size());
774
775 info->canceled = true;
776 info->put();
777
778 logger->dec(l_osdc_linger_active);
779 }
780}
781
782
783
784Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
785 const object_locator_t& oloc,
786 int flags)
787{
788 LingerOp *info = new LingerOp;
789 info->target.base_oid = oid;
790 info->target.base_oloc = oloc;
791 if (info->target.base_oloc.key == oid)
792 info->target.base_oloc.key.clear();
793 info->target.flags = flags;
794 info->watch_valid_thru = mono_clock::now();
795
796 unique_lock l(rwlock);
797
798 // Acquire linger ID
799 info->linger_id = ++max_linger_id;
800 ldout(cct, 10) << __func__ << " info " << info
801 << " linger_id " << info->linger_id
802 << " cookie " << info->get_cookie()
803 << dendl;
804 linger_ops[info->linger_id] = info;
805 linger_ops_set.insert(info);
806 assert(linger_ops.size() == linger_ops_set.size());
807
808 info->get(); // for the caller
809 return info;
810}
811
812ceph_tid_t Objecter::linger_watch(LingerOp *info,
813 ObjectOperation& op,
814 const SnapContext& snapc,
815 real_time mtime,
816 bufferlist& inbl,
817 Context *oncommit,
818 version_t *objver)
819{
820 info->is_watch = true;
821 info->snapc = snapc;
822 info->mtime = mtime;
823 info->target.flags |= CEPH_OSD_FLAG_WRITE;
824 info->ops = op.ops;
825 info->inbl = inbl;
826 info->poutbl = NULL;
827 info->pobjver = objver;
828 info->on_reg_commit = oncommit;
829
830 shunique_lock sul(rwlock, ceph::acquire_unique);
831 _linger_submit(info, sul);
832 logger->inc(l_osdc_linger_active);
833
834 return info->linger_id;
835}
836
837ceph_tid_t Objecter::linger_notify(LingerOp *info,
838 ObjectOperation& op,
839 snapid_t snap, bufferlist& inbl,
840 bufferlist *poutbl,
841 Context *onfinish,
842 version_t *objver)
843{
844 info->snap = snap;
845 info->target.flags |= CEPH_OSD_FLAG_READ;
846 info->ops = op.ops;
847 info->inbl = inbl;
848 info->poutbl = poutbl;
849 info->pobjver = objver;
850 info->on_reg_commit = onfinish;
851
852 shunique_lock sul(rwlock, ceph::acquire_unique);
853 _linger_submit(info, sul);
854 logger->inc(l_osdc_linger_active);
855
856 return info->linger_id;
857}
858
859void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
860{
861 assert(sul.owns_lock() && sul.mutex() == &rwlock);
862 assert(info->linger_id);
863
864 // Populate Op::target
865 OSDSession *s = NULL;
866 _calc_target(&info->target, nullptr);
867
868 // Create LingerOp<->OSDSession relation
869 int r = _get_session(info->target.osd, &s, sul);
870 assert(r == 0);
871 OSDSession::unique_lock sl(s->lock);
872 _session_linger_op_assign(s, info);
873 sl.unlock();
874 put_session(s);
875
876 _send_linger(info, sul);
877}
878
879struct C_DoWatchNotify : public Context {
880 Objecter *objecter;
881 Objecter::LingerOp *info;
882 MWatchNotify *msg;
883 C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
884 : objecter(o), info(i), msg(m) {
885 info->get();
886 info->_queued_async();
887 msg->get();
888 }
889 void finish(int r) override {
890 objecter->_do_watch_notify(info, msg);
891 }
892};
893
894void Objecter::handle_watch_notify(MWatchNotify *m)
895{
896 shared_lock l(rwlock);
31f18b77 897 if (!initialized) {
7c673cae
FG
898 return;
899 }
900
901 LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
902 if (linger_ops_set.count(info) == 0) {
903 ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
904 return;
905 }
906 LingerOp::unique_lock wl(info->watch_lock);
907 if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
908 if (!info->last_error) {
909 info->last_error = -ENOTCONN;
910 if (info->watch_context) {
911 finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
912 }
913 }
914 } else if (!info->is_watch) {
915 // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
916 // since we know the only user (librados) is safe to call in
917 // fast-dispatch context
918 if (info->notify_id &&
919 info->notify_id != m->notify_id) {
920 ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
921 << " != " << info->notify_id << ", ignoring" << dendl;
922 } else if (info->on_notify_finish) {
923 info->notify_result_bl->claim(m->get_data());
924 info->on_notify_finish->complete(m->return_code);
925
926 // if we race with reconnect we might get a second notify; only
927 // notify the caller once!
928 info->on_notify_finish = NULL;
929 }
930 } else {
931 finisher->queue(new C_DoWatchNotify(this, info, m));
932 }
933}
934
935void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
936{
937 ldout(cct, 10) << __func__ << " " << *m << dendl;
938
939 shared_lock l(rwlock);
31f18b77 940 assert(initialized);
7c673cae
FG
941
942 if (info->canceled) {
943 l.unlock();
944 goto out;
945 }
946
947 // notify completion?
948 assert(info->is_watch);
949 assert(info->watch_context);
950 assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
951
952 l.unlock();
953
954 switch (m->opcode) {
955 case CEPH_WATCH_EVENT_NOTIFY:
956 info->watch_context->handle_notify(m->notify_id, m->cookie,
957 m->notifier_gid, m->bl);
958 break;
959 }
960
961 out:
962 info->finished_async();
963 info->put();
964 m->put();
965}
966
967bool Objecter::ms_dispatch(Message *m)
968{
969 ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
31f18b77 970 if (!initialized)
7c673cae
FG
971 return false;
972
973 switch (m->get_type()) {
974 // these we exlusively handle
975 case CEPH_MSG_OSD_OPREPLY:
976 handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
977 return true;
978
979 case CEPH_MSG_OSD_BACKOFF:
980 handle_osd_backoff(static_cast<MOSDBackoff*>(m));
981 return true;
982
983 case CEPH_MSG_WATCH_NOTIFY:
984 handle_watch_notify(static_cast<MWatchNotify*>(m));
985 m->put();
986 return true;
987
988 case MSG_COMMAND_REPLY:
989 if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
990 handle_command_reply(static_cast<MCommandReply*>(m));
991 return true;
992 } else {
993 return false;
994 }
995
996 case MSG_GETPOOLSTATSREPLY:
997 handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
998 return true;
999
1000 case CEPH_MSG_POOLOP_REPLY:
1001 handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
1002 return true;
1003
1004 case CEPH_MSG_STATFS_REPLY:
1005 handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
1006 return true;
1007
1008 // these we give others a chance to inspect
1009
1010 // MDS, OSD
1011 case CEPH_MSG_OSD_MAP:
1012 handle_osd_map(static_cast<MOSDMap*>(m));
1013 return false;
1014 }
1015 return false;
1016}
1017
// Re-examine every op, linger op, and command queued on one OSD session
// after an osdmap change, and classify each as: leave in place, needs
// resend, or targets a pool that no longer exists.
//
// s                    session to scan (may be the homeless session)
// force_resend         resend everything regardless of mapping changes
// cluster_full         cluster-wide FULL flag is set
// pool_full_map        per-pool full flags (may be NULL, e.g. on first map)
// need_resend          out: ops to resend, keyed by tid
// need_resend_linger   out: linger ops to re-register
// need_resend_command  out: commands to resend
// sul                  caller's unique hold on rwlock
void Objecter::_scan_requests(OSDSession *s,
                              bool force_resend,
                              bool cluster_full,
                              map<int64_t, bool> *pool_full_map,
                              map<ceph_tid_t, Op*>& need_resend,
                              list<LingerOp*>& need_resend_linger,
                              map<ceph_tid_t, CommandOp*>& need_resend_command,
                              shunique_lock& sul)
{
  assert(sul.owns_lock() && sul.mutex() == &rwlock);

  // Lingers to fully unregister; deferred until after the session lock
  // is dropped because _linger_cancel() takes other locks.
  list<LingerOp*> unregister_lingers;

  OSDSession::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    LingerOp *op = lp->second;
    assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!force_resend && !force_resend_writes)
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
        ldout(cct, 10) << " need to unregister linger op "
                       << op->linger_id << dendl;
        op->get();   // keep op alive until _linger_cancel() below
        unregister_lingers.push_back(op);
      }
      break;
    }
  }

  // check for changed request mappings
  map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p;   // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
                         op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // full-state changes only force resends of ops that honor FULL
      if (!force_resend && !(force_resend_writes && op->respects_full()))
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      if (op->session) {
        _session_op_remove(op->session, op);
      }
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    }
  }

  // commands
  map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    CommandOp *c = cp->second;
    ++cp;
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!force_resend && !force_resend_writes)
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      if (c->session) {
        _session_command_op_remove(c->session, c);
      }
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  sl.unlock();

  // now that the session lock is dropped, finish unregistering lingers
  for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put();
  }
}
1139
// Process an incoming MOSDMap: advance our osdmap via incrementals or
// full maps, rescan every session for ops affected by the change, close
// sessions for OSDs that went down or moved, and resend or cancel ops
// as appropriate.
void Objecter::handle_osd_map(MOSDMap *m)
{
  shunique_lock sul(rwlock, acquire_unique);
  if (!initialized)
    return;

  assert(osdmap);

  // ignore maps from a different cluster
  if (m->fsid != monc->get_fsid()) {
    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
                  << " != " << monc->get_fsid() << dendl;
    return;
  }

  // remember pause/full state before the update so we can detect
  // transitions afterwards
  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool cluster_full = _osdmap_full_flag();
  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
    _osdmap_has_pool_full();
  map<int64_t, bool> pool_full_map;
  for (map<int64_t, pg_pool_t>::const_iterator it
         = osdmap->get_pools().begin();
       it != osdmap->get_pools().end(); ++it)
    pool_full_map[it->first] = _osdmap_pool_full(it->second);


  list<LingerOp*> need_resend_linger;
  map<ceph_tid_t, Op*> need_resend;
  map<ceph_tid_t, CommandOp*> need_resend_command;

  if (m->get_last() <= osdmap->get_epoch()) {
    ldout(cct, 3) << "handle_osd_map ignoring epochs ["
                  << m->get_first() << "," << m->get_last()
                  << "] <= " << osdmap->get_epoch() << dendl;
  } else {
    ldout(cct, 3) << "handle_osd_map got epochs ["
                  << m->get_first() << "," << m->get_last()
                  << "] > " << osdmap->get_epoch() << dendl;

    if (osdmap->get_epoch()) {
      bool skipped_map = false;
      // we want incrementals
      for (epoch_t e = osdmap->get_epoch() + 1;
           e <= m->get_last();
           e++) {

        if (osdmap->get_epoch() == e-1 &&
            m->incremental_maps.count(e)) {
          ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
                        << dendl;
          OSDMap::Incremental inc(m->incremental_maps[e]);
          osdmap->apply_incremental(inc);

          emit_blacklist_events(inc);

          logger->inc(l_osdc_map_inc);
        }
        else if (m->maps.count(e)) {
          ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
          OSDMap *new_osdmap = new OSDMap();
          new_osdmap->decode(m->maps[e]);

          emit_blacklist_events(*osdmap, *new_osdmap);

          // NOTE(review): the previous osdmap object does not appear to
          // be freed before being replaced here -- confirm ownership /
          // lifetime of the old map.
          osdmap = new_osdmap;

          logger->inc(l_osdc_map_full);
        }
        else {
          // neither an incremental nor a full map for e is in the message
          if (e >= m->get_oldest()) {
            ldout(cct, 3) << "handle_osd_map requesting missing epoch "
                          << osdmap->get_epoch()+1 << dendl;
            _maybe_request_map();
            break;
          }
          ldout(cct, 3) << "handle_osd_map missing epoch "
                        << osdmap->get_epoch()+1
                        << ", jumping to " << m->get_oldest() << dendl;
          e = m->get_oldest() - 1;
          skipped_map = true;   // force resend of everything below
          continue;
        }
        logger->set(l_osdc_map_epoch, osdmap->get_epoch());

        cluster_full = cluster_full || _osdmap_full_flag();
        update_pool_full_map(pool_full_map);

        // check all outstanding requests on every epoch
        _scan_requests(homeless_session, skipped_map, cluster_full,
                       &pool_full_map, need_resend,
                       need_resend_linger, need_resend_command, sul);
        for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
             p != osd_sessions.end(); ) {
          OSDSession *s = p->second;
          _scan_requests(s, skipped_map, cluster_full,
                         &pool_full_map, need_resend,
                         need_resend_linger, need_resend_command, sul);
          ++p;   // advance before close_session() can erase s
          // osd down or addr change?
          if (!osdmap->is_up(s->osd) ||
              (s->con &&
               s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
            close_session(s);
          }
        }

        assert(e == osdmap->get_epoch());
      }

    } else {
      // first map. we want the full thing.
      if (m->maps.count(m->get_last())) {
        for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
             p != osd_sessions.end(); ++p) {
          OSDSession *s = p->second;
          _scan_requests(s, false, false, NULL, need_resend,
                         need_resend_linger, need_resend_command, sul);
        }
        ldout(cct, 3) << "handle_osd_map decoding full epoch "
                      << m->get_last() << dendl;
        osdmap->decode(m->maps[m->get_last()]);

        _scan_requests(homeless_session, false, false, NULL,
                       need_resend, need_resend_linger,
                       need_resend_command, sul);
      } else {
        ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
                      << dendl;
        monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
        monc->renew_subs();
      }
    }
  }

  // make sure need_resend targets reflect latest map
  for (auto p = need_resend.begin(); p != need_resend.end(); ) {
    Op *op = p->second;
    if (op->target.epoch < osdmap->get_epoch()) {
      ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
      int r = _calc_target(&op->target, nullptr);
      if (r == RECALC_OP_TARGET_POOL_DNE) {
        // pool vanished since the op was queued; fail it instead
        p = need_resend.erase(p);
        _check_op_pool_dne(op, nullptr);
      } else {
        ++p;
      }
    } else {
      ++p;
    }
  }

  bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
    || _osdmap_has_pool_full();

  // was/is paused?
  if (was_pauserd || was_pausewr || pauserd || pausewr ||
      osdmap->get_epoch() < epoch_barrier) {
    // keep subscribed so we notice when the pause lifts
    _maybe_request_map();
  }

  // resend requests
  for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
       p != need_resend.end(); ++p) {
    Op *op = p->second;
    OSDSession *s = op->session;
    bool mapped_session = false;
    if (!s) {
      int r = _map_session(&op->target, &s, sul);
      assert(r == 0);
      mapped_session = true;
    } else {
      get_session(s);
    }
    OSDSession::unique_lock sl(s->lock);
    if (mapped_session) {
      _session_op_assign(s, op);
    }
    if (op->should_resend) {
      // only send when the op has a real target and is not paused
      if (!op->session->is_homeless() && !op->target.paused) {
        logger->inc(l_osdc_op_resend);
        _send_op(op);
      }
    } else {
      _op_cancel_map_check(op);
      _cancel_linger_op(op);
    }
    sl.unlock();
    put_session(s);
  }
  for (list<LingerOp*>::iterator p = need_resend_linger.begin();
       p != need_resend_linger.end(); ++p) {
    LingerOp *op = *p;
    if (!op->session) {
      _calc_target(&op->target, nullptr);
      OSDSession *s = NULL;
      const int r = _get_session(op->target.osd, &s, sul);
      assert(r == 0);
      assert(s != NULL);
      op->session = s;
      put_session(s);
    }
    if (!op->session->is_homeless()) {
      logger->inc(l_osdc_linger_resend);
      _send_linger(op, sul);
    }
  }
  for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
       p != need_resend_command.end(); ++p) {
    CommandOp *c = p->second;
    if (c->target.osd >= 0) {
      _assign_command_session(c, sul);
      if (c->session && !c->session->is_homeless()) {
        _send_command(c);
      }
    }
  }

  _dump_active();

  // finish any Contexts that were waiting on a map update
  map<epoch_t,list< pair< Context*, int > > >::iterator p =
    waiting_for_map.begin();
  while (p != waiting_for_map.end() &&
         p->first <= osdmap->get_epoch()) {
    //go through the list and call the onfinish methods
    for (list<pair<Context*, int> >::iterator i = p->second.begin();
         i != p->second.end(); ++i) {
      i->first->complete(i->second);
    }
    waiting_for_map.erase(p++);
  }

  monc->sub_got("osdmap", osdmap->get_epoch());

  if (!waiting_for_map.empty()) {
    _maybe_request_map();
  }
}
1378
31f18b77
FG
1379void Objecter::enable_blacklist_events()
1380{
1381 unique_lock wl(rwlock);
1382
1383 blacklist_events_enabled = true;
1384}
1385
1386void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
1387{
1388 unique_lock wl(rwlock);
1389
1390 if (events->empty()) {
1391 events->swap(blacklist_events);
1392 } else {
1393 for (const auto &i : blacklist_events) {
1394 events->insert(i);
1395 }
1396 blacklist_events.clear();
1397 }
1398}
1399
1400void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
1401{
1402 if (!blacklist_events_enabled) {
1403 return;
1404 }
1405
1406 for (const auto &i : inc.new_blacklist) {
1407 blacklist_events.insert(i.first);
1408 }
1409}
1410
1411void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
1412 const OSDMap &new_osd_map)
1413{
1414 if (!blacklist_events_enabled) {
1415 return;
1416 }
1417
1418 std::set<entity_addr_t> old_set;
1419 std::set<entity_addr_t> new_set;
1420
1421 old_osd_map.get_blacklist(&old_set);
1422 new_osd_map.get_blacklist(&new_set);
1423
1424 std::set<entity_addr_t> delta_set;
1425 std::set_difference(
1426 new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
1427 std::inserter(delta_set, delta_set.begin()));
1428 blacklist_events.insert(delta_set.begin(), delta_set.end());
1429}
1430
7c673cae
FG
1431// op pool check
1432
// Completion for the "latest osdmap epoch" query issued by
// _send_op_map_check(): record the answer as the op's map_dne_bound and
// re-run the pool-existence check.
void Objecter::C_Op_Map_Latest::finish(int r)
{
  // -EAGAIN/-ECANCELED: mon session reset; resend_mon_ops() will retry
  if (r == -EAGAIN || r == -ECANCELED)
    return;

  lgeneric_subdout(objecter->cct, objecter, 10)
    << "op_map_latest r=" << r << " tid=" << tid
    << " latest " << latest << dendl;

  Objecter::unique_lock wl(objecter->rwlock);

  map<ceph_tid_t, Op*>::iterator iter =
    objecter->check_latest_map_ops.find(tid);
  if (iter == objecter->check_latest_map_ops.end()) {
    // op was canceled or completed in the meantime
    lgeneric_subdout(objecter->cct, objecter, 10)
      << "op_map_latest op "<< tid << " not found" << dendl;
    return;
  }

  Op *op = iter->second;
  objecter->check_latest_map_ops.erase(iter);

  lgeneric_subdout(objecter->cct, objecter, 20)
    << "op_map_latest op "<< op << dendl;

  if (op->map_dne_bound == 0)
    op->map_dne_bound = latest;   // first answer wins

  // deferred: _check_op_pool_dne() takes the session lock itself if needed
  OSDSession::unique_lock sl(op->session->lock, defer_lock);
  objecter->_check_op_pool_dne(op, &sl);

  op->put();   // drop the ref taken in _send_op_map_check()
}
1466
1467int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
1468 snapid_t *snap) const
1469{
1470 shared_lock rl(rwlock);
1471
1472 auto& pools = osdmap->get_pools();
1473 auto iter = pools.find(poolid);
1474 if (iter == pools.end()) {
1475 return -ENOENT;
1476 }
1477 const pg_pool_t& pg_pool = iter->second;
1478 for (auto p = pg_pool.snaps.begin();
1479 p != pg_pool.snaps.end();
1480 ++p) {
1481 if (p->second.name == snap_name) {
1482 *snap = p->first;
1483 return 0;
1484 }
1485 }
1486 return -ENOENT;
1487}
1488
1489int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
1490 pool_snap_info_t *info) const
1491{
1492 shared_lock rl(rwlock);
1493
1494 auto& pools = osdmap->get_pools();
1495 auto iter = pools.find(poolid);
1496 if (iter == pools.end()) {
1497 return -ENOENT;
1498 }
1499 const pg_pool_t& pg_pool = iter->second;
1500 auto p = pg_pool.snaps.find(snap);
1501 if (p == pg_pool.snaps.end())
1502 return -ENOENT;
1503 *info = p->second;
1504
1505 return 0;
1506}
1507
1508int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
1509{
1510 shared_lock rl(rwlock);
1511
1512 const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
1513 if (!pi)
1514 return -ENOENT;
1515 for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
1516 p != pi->snaps.end();
1517 ++p) {
1518 snaps->push_back(p->first);
1519 }
1520 return 0;
1521}
1522
// Handle an op whose target pool is absent from the current osdmap.
// Once we can prove the pool is gone (current epoch >= map_dne_bound)
// the op is failed with -ENOENT; otherwise we query the monitor for the
// latest epoch to establish the bound.
// rwlock is held unique.  sl wraps op->session->lock and may be unlocked.
void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
{
  // rwlock is locked unique

  if (op->target.pool_ever_existed) {
    // the pool previously existed and now it does not, which means it
    // was deleted.
    op->map_dne_bound = osdmap->get_epoch();
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                   << " pool previously exists but now does not"
                   << dendl;
  } else {
    ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                   << " current " << osdmap->get_epoch()
                   << " map_dne_bound " << op->map_dne_bound
                   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // we had a new enough map
      ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                     << " concluding pool " << op->target.base_pgid.pool()
                     << " dne" << dendl;
      if (op->onfinish) {
        op->onfinish->complete(-ENOENT);
      }

      OSDSession *s = op->session;
      if (s) {
        assert(s != NULL);
        assert(sl->mutex() == &s->lock);
        // _finish_op() needs the session lock: take it only if the
        // caller did not already hold it, and restore the caller's state
        bool session_locked = sl->owns_lock();
        if (!session_locked) {
          sl->lock();
        }
        _finish_op(op, 0);
        if (!session_locked) {
          sl->unlock();
        }
      } else {
        _finish_op(op, 0);      // no session
      }
    }
  } else {
    // bound not yet known; ask the monitor for the latest epoch
    _send_op_map_check(op);
  }
}
1571
1572void Objecter::_send_op_map_check(Op *op)
1573{
1574 // rwlock is locked unique
1575 // ask the monitor
1576 if (check_latest_map_ops.count(op->tid) == 0) {
1577 op->get();
1578 check_latest_map_ops[op->tid] = op;
1579 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
1580 monc->get_version("osdmap", &c->latest, NULL, c);
1581 }
1582}
1583
1584void Objecter::_op_cancel_map_check(Op *op)
1585{
1586 // rwlock is locked unique
1587 map<ceph_tid_t, Op*>::iterator iter =
1588 check_latest_map_ops.find(op->tid);
1589 if (iter != check_latest_map_ops.end()) {
1590 Op *op = iter->second;
1591 op->put();
1592 check_latest_map_ops.erase(iter);
1593 }
1594}
1595
1596// linger pool check
1597
1598void Objecter::C_Linger_Map_Latest::finish(int r)
1599{
1600 if (r == -EAGAIN || r == -ECANCELED) {
1601 // ignore callback; we will retry in resend_mon_ops()
1602 return;
1603 }
1604
1605 unique_lock wl(objecter->rwlock);
1606
1607 map<uint64_t, LingerOp*>::iterator iter =
1608 objecter->check_latest_map_lingers.find(linger_id);
1609 if (iter == objecter->check_latest_map_lingers.end()) {
1610 return;
1611 }
1612
1613 LingerOp *op = iter->second;
1614 objecter->check_latest_map_lingers.erase(iter);
1615
1616 if (op->map_dne_bound == 0)
1617 op->map_dne_bound = latest;
1618
1619 bool unregister;
1620 objecter->_check_linger_pool_dne(op, &unregister);
1621
1622 if (unregister) {
1623 objecter->_linger_cancel(op);
1624 }
1625
1626 op->put();
1627}
1628
// Decide what to do with a linger op whose pool appears to have vanished.
// Sets *need_unregister when the caller must unregister/cancel the op.
// rwlock is locked unique.
void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
  // rwlock is locked unique

  *need_unregister = false;

  if (op->register_gen > 0) {
    // op registered successfully at least once, so the pool existed and
    // has since been deleted: the current epoch bounds its existence
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                   << " pool previously existed but now does not"
                   << dendl;
    op->map_dne_bound = osdmap->get_epoch();
  } else {
    ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                   << " current " << osdmap->get_epoch()
                   << " map_dne_bound " << op->map_dne_bound
                   << dendl;
  }
  if (op->map_dne_bound > 0) {
    if (osdmap->get_epoch() >= op->map_dne_bound) {
      // watch_lock serializes the completion callbacks
      LingerOp::unique_lock wl{op->watch_lock};
      if (op->on_reg_commit) {
        op->on_reg_commit->complete(-ENOENT);
        op->on_reg_commit = nullptr;
      }
      if (op->on_notify_finish) {
        op->on_notify_finish->complete(-ENOENT);
        op->on_notify_finish = nullptr;
      }
      *need_unregister = true;
    }
  } else {
    // bound not yet known; ask the monitor for the latest epoch
    _send_linger_map_check(op);
  }
}
1663
1664void Objecter::_send_linger_map_check(LingerOp *op)
1665{
1666 // ask the monitor
1667 if (check_latest_map_lingers.count(op->linger_id) == 0) {
1668 op->get();
1669 check_latest_map_lingers[op->linger_id] = op;
1670 C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
1671 monc->get_version("osdmap", &c->latest, NULL, c);
1672 }
1673}
1674
1675void Objecter::_linger_cancel_map_check(LingerOp *op)
1676{
1677 // rwlock is locked unique
1678
1679 map<uint64_t, LingerOp*>::iterator iter =
1680 check_latest_map_lingers.find(op->linger_id);
1681 if (iter != check_latest_map_lingers.end()) {
1682 LingerOp *op = iter->second;
1683 op->put();
1684 check_latest_map_lingers.erase(iter);
1685 }
1686}
1687
1688// command pool check
1689
1690void Objecter::C_Command_Map_Latest::finish(int r)
1691{
1692 if (r == -EAGAIN || r == -ECANCELED) {
1693 // ignore callback; we will retry in resend_mon_ops()
1694 return;
1695 }
1696
1697 unique_lock wl(objecter->rwlock);
1698
1699 map<uint64_t, CommandOp*>::iterator iter =
1700 objecter->check_latest_map_commands.find(tid);
1701 if (iter == objecter->check_latest_map_commands.end()) {
1702 return;
1703 }
1704
1705 CommandOp *c = iter->second;
1706 objecter->check_latest_map_commands.erase(iter);
1707
1708 if (c->map_dne_bound == 0)
1709 c->map_dne_bound = latest;
1710
28e407b8 1711 OSDSession::unique_lock sul(c->session->lock);
7c673cae 1712 objecter->_check_command_map_dne(c);
28e407b8 1713 sul.unlock();
7c673cae
FG
1714
1715 c->put();
1716}
1717
1718void Objecter::_check_command_map_dne(CommandOp *c)
1719{
1720 // rwlock is locked unique
28e407b8 1721 // session is locked unique
7c673cae
FG
1722
1723 ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
1724 << " current " << osdmap->get_epoch()
1725 << " map_dne_bound " << c->map_dne_bound
1726 << dendl;
1727 if (c->map_dne_bound > 0) {
1728 if (osdmap->get_epoch() >= c->map_dne_bound) {
1729 _finish_command(c, c->map_check_error, c->map_check_error_str);
1730 }
1731 } else {
1732 _send_command_map_check(c);
1733 }
1734}
1735
1736void Objecter::_send_command_map_check(CommandOp *c)
1737{
1738 // rwlock is locked unique
28e407b8 1739 // session is locked unique
7c673cae
FG
1740
1741 // ask the monitor
1742 if (check_latest_map_commands.count(c->tid) == 0) {
1743 c->get();
1744 check_latest_map_commands[c->tid] = c;
1745 C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
1746 monc->get_version("osdmap", &f->latest, NULL, f);
1747 }
1748}
1749
1750void Objecter::_command_cancel_map_check(CommandOp *c)
1751{
1752 // rwlock is locked uniqe
1753
1754 map<uint64_t, CommandOp*>::iterator iter =
1755 check_latest_map_commands.find(c->tid);
1756 if (iter != check_latest_map_commands.end()) {
1757 CommandOp *c = iter->second;
1758 c->put();
1759 check_latest_map_commands.erase(iter);
1760 }
1761}
1762
1763
/**
 * Look up (or create) the OSDSession for an OSD id, taking a ref for
 * the caller.
 *
 * A negative osd id yields the homeless session (no ref management).
 * Creating a new session requires rwlock held unique; with only a
 * shared hold, a cache miss returns -EAGAIN so the caller can promote.
 *
 * @returns 0 on success, or -EAGAIN if the lock context requires
 * promotion to write.
 */
int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
{
  assert(sul && sul.mutex() == &rwlock);

  if (osd < 0) {
    *session = homeless_session;
    ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
                   << dendl;
    return 0;
  }

  map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
  if (p != osd_sessions.end()) {
    OSDSession *s = p->second;
    s->get();   // ref for the caller
    *session = s;
    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
                   << s->get_nref() << dendl;
    return 0;
  }
  if (!sul.owns_lock()) {
    // creating a session mutates osd_sessions: need the unique lock
    return -EAGAIN;
  }
  OSDSession *s = new OSDSession(cct, osd);
  osd_sessions[osd] = s;
  s->con = messenger->get_connection(osdmap->get_inst(osd));
  s->con->set_priv(s->get());   // the connection holds its own ref
  logger->inc(l_osdc_osd_session_open);
  logger->set(l_osdc_osd_sessions, osd_sessions.size());
  s->get();   // ref for the caller
  *session = s;
  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
                 << s->get_nref() << dendl;
  return 0;
}
1805
1806void Objecter::put_session(Objecter::OSDSession *s)
1807{
1808 if (s && !s->is_homeless()) {
1809 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1810 << s->get_nref() << dendl;
1811 s->put();
1812 }
1813}
1814
1815void Objecter::get_session(Objecter::OSDSession *s)
1816{
1817 assert(s != NULL);
1818
1819 if (!s->is_homeless()) {
1820 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1821 << s->get_nref() << dendl;
1822 s->get();
1823 }
1824}
1825
// Replace s->con with a fresh connection to the OSD's current address
// and bump the session incarnation so stale state can be recognized.
// rwlock is locked unique; s->lock is locked.
void Objecter::_reopen_session(OSDSession *s)
{
  // rwlock is locked unique
  // s->lock is locked

  entity_inst_t inst = osdmap->get_inst(s->osd);
  ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
                 << inst << dendl;
  if (s->con) {
    s->con->set_priv(NULL);   // drop the session ref held by the old con
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  s->con = messenger->get_connection(inst);
  s->con->set_priv(s->get());   // the new connection takes its own ref
  s->incarnation++;
  logger->inc(l_osdc_osd_session_open);
}
1844
// Tear down the session for one OSD: sever the connection, detach every
// op/linger/command still queued on it, and hand those over to the
// homeless session so a later map change can re-home them.
// rwlock is locked unique.
void Objecter::close_session(OSDSession *s)
{
  // rwlock is locked unique

  ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
  if (s->con) {
    s->con->set_priv(NULL);   // drop the session ref held by the con
    s->con->mark_down();
    logger->inc(l_osdc_osd_session_close);
  }
  OSDSession::unique_lock sl(s->lock);

  std::list<LingerOp*> homeless_lingers;
  std::list<CommandOp*> homeless_commands;
  std::list<Op*> homeless_ops;

  // detach everything while holding the session lock
  while (!s->linger_ops.empty()) {
    std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    homeless_lingers.push_back(i->second);
    _session_linger_op_remove(s, i->second);
  }

  while (!s->ops.empty()) {
    std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    homeless_ops.push_back(i->second);
    _session_op_remove(s, i->second);
  }

  while (!s->command_ops.empty()) {
    std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    homeless_commands.push_back(i->second);
    _session_command_op_remove(s, i->second);
  }

  osd_sessions.erase(s->osd);
  sl.unlock();
  put_session(s);   // drop the osd_sessions map's ref

  // Assign any leftover ops to the homeless session
  {
    OSDSession::unique_lock hsl(homeless_session->lock);
    for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
         i != homeless_lingers.end(); ++i) {
      _session_linger_op_assign(homeless_session, *i);
    }
    for (std::list<Op*>::iterator i = homeless_ops.begin();
         i != homeless_ops.end(); ++i) {
      _session_op_assign(homeless_session, *i);
    }
    for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
         i != homeless_commands.end(); ++i) {
      _session_command_op_assign(homeless_session, *i);
    }
  }

  logger->set(l_osdc_osd_sessions, osd_sessions.size());
}
1905
1906void Objecter::wait_for_osd_map()
1907{
1908 unique_lock l(rwlock);
1909 if (osdmap->get_epoch()) {
1910 l.unlock();
1911 return;
1912 }
1913
1914 // Leave this since it goes with C_SafeCond
1915 Mutex lock("");
1916 Cond cond;
1917 bool done;
1918 lock.Lock();
1919 C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL);
1920 waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
1921 l.unlock();
1922 while (!done)
1923 cond.Wait(lock);
1924 lock.Unlock();
1925}
1926
// Continuation for wait_for_latest_osdmap(): receives the (oldest,
// newest) epochs from the monitor, then waits until we actually have
// `newest` before firing the user's Context.
struct C_Objecter_GetVersion : public Context {
  Objecter *objecter;
  uint64_t oldest, newest;   // filled in by monc->get_version()
  Context *fin;              // user callback, fired once newest is reached
  C_Objecter_GetVersion(Objecter *o, Context *c)
    : objecter(o), oldest(0), newest(0), fin(c) {}
  void finish(int r) override {
    if (r >= 0) {
      objecter->get_latest_version(oldest, newest, fin);
    } else if (r == -EAGAIN) { // try again as instructed
      objecter->wait_for_latest_osdmap(fin);
    } else {
      // it doesn't return any other error codes!
      ceph_abort();
    }
  }
};
1944
1945void Objecter::wait_for_latest_osdmap(Context *fin)
1946{
1947 ldout(cct, 10) << __func__ << dendl;
1948 C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
1949 monc->get_version("osdmap", &c->newest, &c->oldest, c);
1950}
1951
1952void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
1953{
1954 unique_lock wl(rwlock);
1955 _get_latest_version(oldest, newest, fin);
1956}
1957
1958void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
1959 Context *fin)
1960{
1961 // rwlock is locked unique
1962 if (osdmap->get_epoch() >= newest) {
1963 ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
1964 if (fin)
1965 fin->complete(0);
1966 return;
1967 }
1968
1969 ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
1970 _wait_for_new_map(fin, newest, 0);
1971}
1972
1973void Objecter::maybe_request_map()
1974{
1975 shared_lock rl(rwlock);
1976 _maybe_request_map();
1977}
1978
1979void Objecter::_maybe_request_map()
1980{
1981 // rwlock is locked
1982 int flag = 0;
1983 if (_osdmap_full_flag()
1984 || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
1985 || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
1986 ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
1987 "osd map (FULL flag is set)" << dendl;
1988 } else {
1989 ldout(cct, 10)
1990 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
1991 flag = CEPH_SUBSCRIBE_ONETIME;
1992 }
1993 epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
1994 if (monc->sub_want("osdmap", epoch, flag)) {
1995 monc->renew_subs();
1996 }
1997}
1998
1999void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
2000{
2001 // rwlock is locked unique
2002 waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
2003 _maybe_request_map();
2004}
2005
2006
2007/**
2008 * Use this together with wait_for_map: this is a pre-check to avoid
2009 * allocating a Context for wait_for_map if we can see that we
2010 * definitely already have the epoch.
2011 *
2012 * This does *not* replace the need to handle the return value of
2013 * wait_for_map: just because we don't have it in this pre-check
2014 * doesn't mean we won't have it when calling back into wait_for_map,
2015 * since the objecter lock is dropped in between.
2016 */
2017bool Objecter::have_map(const epoch_t epoch)
2018{
2019 shared_lock rl(rwlock);
2020 if (osdmap->get_epoch() >= epoch) {
2021 return true;
2022 } else {
2023 return false;
2024 }
2025}
2026
2027bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
2028{
2029 unique_lock wl(rwlock);
2030 if (osdmap->get_epoch() >= epoch) {
2031 return true;
2032 }
2033 _wait_for_new_map(c, epoch, err);
2034 return false;
2035}
2036
2037void Objecter::kick_requests(OSDSession *session)
2038{
2039 ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
2040
2041 map<uint64_t, LingerOp *> lresend;
2042 unique_lock wl(rwlock);
2043
2044 OSDSession::unique_lock sl(session->lock);
2045 _kick_requests(session, lresend);
2046 sl.unlock();
2047
2048 _linger_ops_resend(lresend, wl);
2049}
2050
2051void Objecter::_kick_requests(OSDSession *session,
2052 map<uint64_t, LingerOp *>& lresend)
2053{
2054 // rwlock is locked unique
2055
2056 // clear backoffs
2057 session->backoffs.clear();
2058 session->backoffs_by_id.clear();
2059
2060 // resend ops
2061 map<ceph_tid_t,Op*> resend; // resend in tid order
2062 for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
2063 p != session->ops.end();) {
2064 Op *op = p->second;
2065 ++p;
2066 logger->inc(l_osdc_op_resend);
2067 if (op->should_resend) {
2068 if (!op->target.paused)
2069 resend[op->tid] = op;
2070 } else {
2071 _op_cancel_map_check(op);
2072 _cancel_linger_op(op);
2073 }
2074 }
2075
2076 while (!resend.empty()) {
2077 _send_op(resend.begin()->second);
2078 resend.erase(resend.begin());
2079 }
2080
2081 // resend lingers
2082 for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
2083 j != session->linger_ops.end(); ++j) {
2084 LingerOp *op = j->second;
2085 op->get();
2086 logger->inc(l_osdc_linger_resend);
2087 assert(lresend.count(j->first) == 0);
2088 lresend[j->first] = op;
2089 }
2090
2091 // resend commands
2092 map<uint64_t,CommandOp*> cresend; // resend in order
2093 for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
2094 k != session->command_ops.end(); ++k) {
2095 logger->inc(l_osdc_command_resend);
2096 cresend[k->first] = k->second;
2097 }
2098 while (!cresend.empty()) {
2099 _send_command(cresend.begin()->second);
2100 cresend.erase(cresend.begin());
2101 }
2102}
2103
// Resend a batch of linger ops collected by _kick_requests().  Takes
// ownership of the caller's unique lock, converts it into the
// shunique_lock form that _send_linger() expects, and hands it back on
// return.
void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
                                  unique_lock& ul)
{
  assert(ul.owns_lock());
  shunique_lock sul(std::move(ul));
  while (!lresend.empty()) {
    LingerOp *op = lresend.begin()->second;
    if (!op->canceled) {
      _send_linger(op, sul);
    }
    op->put();   // drop the ref taken when op was added to lresend
    lresend.erase(lresend.begin());
  }
  // return the (still held) lock to the caller in unique form
  ul = unique_lock(sul.release_to_unique());
}
2119
2120void Objecter::start_tick()
2121{
2122 assert(tick_event == 0);
2123 tick_event =
2124 timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2125 &Objecter::tick, this);
2126}
2127
// Periodic timer callback: flag laggy ops, send watch keepalive pings,
// ping OSDs with outstanding requests (reply policy is lossy, so pings
// surface session resets), update lagginess counters, and reschedule.
void Objecter::tick()
{
  shared_lock rl(rwlock);

  ldout(cct, 10) << "tick" << dendl;

  // we are only called by C_Tick
  tick_event = 0;

  if (!initialized) {
    // we raced with shutdown
    ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
    return;
  }

  set<OSDSession*> toping;


  // look for laggy requests
  auto cutoff = ceph::mono_clock::now();
  cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout);  // timeout

  unsigned laggy_ops = 0;

  for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
       siter != osd_sessions.end(); ++siter) {
    OSDSession *s = siter->second;
    OSDSession::lock_guard l(s->lock);
    bool found = false;
    // ops older than the cutoff are laggy
    for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
         p != s->ops.end();
         ++p) {
      Op *op = p->second;
      assert(op->session);
      if (op->stamp < cutoff) {
        ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
                      << " is laggy" << dendl;
        found = true;
        ++laggy_ops;
      }
    }
    for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
         p != s->linger_ops.end();
         ++p) {
      LingerOp *op = p->second;
      LingerOp::unique_lock wl(op->watch_lock);
      assert(op->session);
      ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
      // only ping established, healthy watches
      if (op->is_watch && op->registered && !op->last_error)
        _send_linger_ping(op);
    }
    for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
         p != s->command_ops.end();
         ++p) {
      CommandOp *op = p->second;
      assert(op->session);
      ldout(cct, 10) << " pinging osd that serves command tid " << p->first
                     << " (osd." << op->session->osd << ")" << dendl;
      found = true;
    }
    if (found)
      toping.insert(s);
  }
  if (num_homeless_ops || !toping.empty()) {
    // outstanding work: make sure we keep receiving maps
    _maybe_request_map();
  }

  logger->set(l_osdc_op_laggy, laggy_ops);
  logger->set(l_osdc_osd_laggy, toping.size());

  if (!toping.empty()) {
    // send a ping to these osds, to ensure we detect any session resets
    // (osd reply message policy is lossy)
    for (set<OSDSession*>::const_iterator i = toping.begin();
         i != toping.end();
         ++i) {
      (*i)->con->send_message(new MPing);
    }
  }

  // Make sure we don't resechedule if we wake up after shutdown
  if (initialized) {
    tick_event = timer.reschedule_me(ceph::make_timespan(
                                       cct->_conf->objecter_tick_interval));
  }
}
2216
2217void Objecter::resend_mon_ops()
2218{
2219 unique_lock wl(rwlock);
2220
2221 ldout(cct, 10) << "resend_mon_ops" << dendl;
2222
2223 for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
2224 p != poolstat_ops.end();
2225 ++p) {
2226 _poolstat_submit(p->second);
2227 logger->inc(l_osdc_poolstat_resend);
2228 }
2229
2230 for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
2231 p != statfs_ops.end();
2232 ++p) {
2233 _fs_stats_submit(p->second);
2234 logger->inc(l_osdc_statfs_resend);
2235 }
2236
2237 for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
2238 p != pool_ops.end();
2239 ++p) {
2240 _pool_op_submit(p->second);
2241 logger->inc(l_osdc_poolop_resend);
2242 }
2243
2244 for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
2245 p != check_latest_map_ops.end();
2246 ++p) {
2247 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
2248 monc->get_version("osdmap", &c->latest, NULL, c);
2249 }
2250
2251 for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
2252 p != check_latest_map_lingers.end();
2253 ++p) {
2254 C_Linger_Map_Latest *c
2255 = new C_Linger_Map_Latest(this, p->second->linger_id);
2256 monc->get_version("osdmap", &c->latest, NULL, c);
2257 }
2258
2259 for (map<uint64_t, CommandOp*>::iterator p
2260 = check_latest_map_commands.begin();
2261 p != check_latest_map_commands.end();
2262 ++p) {
2263 C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
2264 monc->get_version("osdmap", &c->latest, NULL, c);
2265 }
2266}
2267
2268// read | write ---------------------------
2269
2270void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2271{
2272 shunique_lock rl(rwlock, ceph::acquire_shared);
2273 ceph_tid_t tid = 0;
2274 if (!ptid)
2275 ptid = &tid;
31f18b77 2276 op->trace.event("op submit");
7c673cae
FG
2277 _op_submit_with_budget(op, rl, ptid, ctx_budget);
2278}
2279
2280void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
2281 ceph_tid_t *ptid,
2282 int *ctx_budget)
2283{
31f18b77 2284 assert(initialized);
7c673cae
FG
2285
2286 assert(op->ops.size() == op->out_bl.size());
2287 assert(op->ops.size() == op->out_rval.size());
2288 assert(op->ops.size() == op->out_handler.size());
2289
2290 // throttle. before we look at any state, because
2291 // _take_op_budget() may drop our lock while it blocks.
2292 if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
2293 int op_budget = _take_op_budget(op, sul);
2294 // take and pass out the budget for the first OP
2295 // in the context session
2296 if (ctx_budget && (*ctx_budget == -1)) {
2297 *ctx_budget = op_budget;
2298 }
2299 }
2300
2301 if (osd_timeout > timespan(0)) {
2302 if (op->tid == 0)
31f18b77 2303 op->tid = ++last_tid;
7c673cae
FG
2304 auto tid = op->tid;
2305 op->ontimeout = timer.add_event(osd_timeout,
2306 [this, tid]() {
2307 op_cancel(tid, -ETIMEDOUT); });
2308 }
2309
2310 _op_submit(op, sul, ptid);
2311}
2312
2313void Objecter::_send_op_account(Op *op)
2314{
31f18b77 2315 inflight_ops++;
7c673cae
FG
2316
2317 // add to gather set(s)
2318 if (op->onfinish) {
31f18b77 2319 num_in_flight++;
7c673cae
FG
2320 } else {
2321 ldout(cct, 20) << " note: not requesting reply" << dendl;
2322 }
2323
2324 logger->inc(l_osdc_op_active);
2325 logger->inc(l_osdc_op);
2326
2327 if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
2328 (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
2329 logger->inc(l_osdc_op_rmw);
2330 else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
2331 logger->inc(l_osdc_op_w);
2332 else if (op->target.flags & CEPH_OSD_FLAG_READ)
2333 logger->inc(l_osdc_op_r);
2334
2335 if (op->target.flags & CEPH_OSD_FLAG_PGOP)
2336 logger->inc(l_osdc_op_pg);
2337
2338 for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
2339 int code = l_osdc_osdop_other;
2340 switch (p->op.op) {
2341 case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
2342 case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
2343 case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
2344 case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
2345 case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
2346 case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
2347 case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
2348 case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
2349 case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
2350 case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
2351 case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
2352 case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
2353 case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
2354 case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
2355 case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
2356 case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
2357 case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
2358 case CEPH_OSD_OP_TMAPUP: code = l_osdc_osdop_tmap_up; break;
2359 case CEPH_OSD_OP_TMAPPUT: code = l_osdc_osdop_tmap_put; break;
2360 case CEPH_OSD_OP_TMAPGET: code = l_osdc_osdop_tmap_get; break;
2361
2362 // OMAP read operations
2363 case CEPH_OSD_OP_OMAPGETVALS:
2364 case CEPH_OSD_OP_OMAPGETKEYS:
2365 case CEPH_OSD_OP_OMAPGETHEADER:
2366 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
2367 case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;
2368
2369 // OMAP write operations
2370 case CEPH_OSD_OP_OMAPSETVALS:
2371 case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;
2372
2373 // OMAP del operations
2374 case CEPH_OSD_OP_OMAPCLEAR:
2375 case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;
2376
2377 case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
2378 case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
2379 case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
2380 }
2381 if (code)
2382 logger->inc(code);
2383 }
2384}
2385
2386void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
2387{
2388 // rwlock is locked
2389
2390 ldout(cct, 10) << __func__ << " op " << op << dendl;
2391
2392 // pick target
2393 assert(op->session == NULL);
2394 OSDSession *s = NULL;
2395
2396 bool check_for_latest_map = _calc_target(&op->target, nullptr)
2397 == RECALC_OP_TARGET_POOL_DNE;
2398
2399 // Try to get a session, including a retry if we need to take write lock
2400 int r = _get_session(op->target.osd, &s, sul);
2401 if (r == -EAGAIN ||
2402 (check_for_latest_map && sul.owns_lock_shared())) {
2403 epoch_t orig_epoch = osdmap->get_epoch();
2404 sul.unlock();
2405 if (cct->_conf->objecter_debug_inject_relock_delay) {
2406 sleep(1);
2407 }
2408 sul.lock();
2409 if (orig_epoch != osdmap->get_epoch()) {
2410 // map changed; recalculate mapping
2411 ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
2412 << dendl;
2413 check_for_latest_map = _calc_target(&op->target, nullptr)
2414 == RECALC_OP_TARGET_POOL_DNE;
2415 if (s) {
2416 put_session(s);
2417 s = NULL;
2418 r = -EAGAIN;
2419 }
2420 }
2421 }
2422 if (r == -EAGAIN) {
2423 assert(s == NULL);
2424 r = _get_session(op->target.osd, &s, sul);
2425 }
2426 assert(r == 0);
2427 assert(s); // may be homeless
2428
2429 _send_op_account(op);
2430
2431 // send?
2432
2433 assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
2434
2435 if (osdmap_full_try) {
2436 op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
2437 }
2438
2439 bool need_send = false;
2440
2441 if (osdmap->get_epoch() < epoch_barrier) {
2442 ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
2443 << dendl;
2444 op->target.paused = true;
2445 _maybe_request_map();
2446 } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
2447 osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
2448 ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
2449 << dendl;
2450 op->target.paused = true;
2451 _maybe_request_map();
2452 } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
2453 osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
2454 ldout(cct, 10) << " paused read " << op << " tid " << op->tid
2455 << dendl;
2456 op->target.paused = true;
2457 _maybe_request_map();
2458 } else if (op->respects_full() &&
2459 (_osdmap_full_flag() ||
2460 _osdmap_pool_full(op->target.base_oloc.pool))) {
2461 ldout(cct, 0) << " FULL, paused modify " << op << " tid "
2462 << op->tid << dendl;
2463 op->target.paused = true;
2464 _maybe_request_map();
2465 } else if (!s->is_homeless()) {
2466 need_send = true;
2467 } else {
2468 _maybe_request_map();
2469 }
2470
2471 MOSDOp *m = NULL;
2472 if (need_send) {
2473 m = _prepare_osd_op(op);
2474 }
2475
2476 OSDSession::unique_lock sl(s->lock);
2477 if (op->tid == 0)
31f18b77 2478 op->tid = ++last_tid;
7c673cae
FG
2479
2480 ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
2481 << " '" << op->target.base_oloc << "' '"
2482 << op->target.target_oloc << "' " << op->ops << " tid "
2483 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
2484 << dendl;
2485
2486 _session_op_assign(s, op);
2487
2488 if (need_send) {
2489 _send_op(op, m);
2490 }
2491
2492 // Last chance to touch Op here, after giving up session lock it can
2493 // be freed at any time by response handler.
2494 ceph_tid_t tid = op->tid;
2495 if (check_for_latest_map) {
2496 _send_op_map_check(op);
2497 }
2498 if (ptid)
2499 *ptid = tid;
2500 op = NULL;
2501
2502 sl.unlock();
2503 put_session(s);
2504
31f18b77 2505 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
7c673cae
FG
2506}
2507
2508int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2509{
31f18b77 2510 assert(initialized);
7c673cae
FG
2511
2512 OSDSession::unique_lock sl(s->lock);
2513
2514 map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
2515 if (p == s->ops.end()) {
2516 ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
2517 << s->osd << dendl;
2518 return -ENOENT;
2519 }
2520
2521 if (s->con) {
2522 ldout(cct, 20) << " revoking rx buffer for " << tid
2523 << " on " << s->con << dendl;
2524 s->con->revoke_rx_buffer(tid);
2525 }
2526
2527 ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
2528 << dendl;
2529 Op *op = p->second;
2530 if (op->onfinish) {
31f18b77 2531 num_in_flight--;
7c673cae
FG
2532 op->onfinish->complete(r);
2533 op->onfinish = NULL;
2534 }
2535 _op_cancel_map_check(op);
2536 _finish_op(op, r);
2537 sl.unlock();
2538
2539 return 0;
2540}
2541
2542int Objecter::op_cancel(ceph_tid_t tid, int r)
2543{
2544 int ret = 0;
2545
2546 unique_lock wl(rwlock);
2547 ret = _op_cancel(tid, r);
2548
2549 return ret;
2550}
2551
94b18763
FG
2552int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
2553{
2554 unique_lock wl(rwlock);
2555 ldout(cct,10) << __func__ << " " << tids << dendl;
2556 for (auto tid : tids) {
2557 _op_cancel(tid, r);
2558 }
2559 return 0;
2560}
2561
7c673cae
FG
2562int Objecter::_op_cancel(ceph_tid_t tid, int r)
2563{
2564 int ret = 0;
2565
2566 ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
2567 << dendl;
2568
2569start:
2570
2571 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2572 siter != osd_sessions.end(); ++siter) {
2573 OSDSession *s = siter->second;
2574 OSDSession::shared_lock sl(s->lock);
2575 if (s->ops.find(tid) != s->ops.end()) {
2576 sl.unlock();
2577 ret = op_cancel(s, tid, r);
2578 if (ret == -ENOENT) {
2579 /* oh no! raced, maybe tid moved to another session, restarting */
2580 goto start;
2581 }
2582 return ret;
2583 }
2584 }
2585
2586 ldout(cct, 5) << __func__ << ": tid " << tid
2587 << " not found in live sessions" << dendl;
2588
2589 // Handle case where the op is in homeless session
2590 OSDSession::shared_lock sl(homeless_session->lock);
2591 if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
2592 sl.unlock();
2593 ret = op_cancel(homeless_session, tid, r);
2594 if (ret == -ENOENT) {
2595 /* oh no! raced, maybe tid moved to another session, restarting */
2596 goto start;
2597 } else {
2598 return ret;
2599 }
2600 } else {
2601 sl.unlock();
2602 }
2603
2604 ldout(cct, 5) << __func__ << ": tid " << tid
2605 << " not found in homeless session" << dendl;
2606
2607 return ret;
2608}
2609
2610
2611epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
2612{
2613 unique_lock wl(rwlock);
2614
2615 std::vector<ceph_tid_t> to_cancel;
2616 bool found = false;
2617
2618 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2619 siter != osd_sessions.end(); ++siter) {
2620 OSDSession *s = siter->second;
2621 OSDSession::shared_lock sl(s->lock);
2622 for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
2623 op_i != s->ops.end(); ++op_i) {
2624 if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
2625 && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
2626 to_cancel.push_back(op_i->first);
2627 }
2628 }
2629 sl.unlock();
2630
2631 for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
2632 titer != to_cancel.end();
2633 ++titer) {
2634 int cancel_result = op_cancel(s, *titer, r);
2635 // We hold rwlock across search and cancellation, so cancels
2636 // should always succeed
2637 assert(cancel_result == 0);
2638 }
2639 if (!found && to_cancel.size())
2640 found = true;
2641 to_cancel.clear();
2642 }
2643
2644 const epoch_t epoch = osdmap->get_epoch();
2645
2646 wl.unlock();
2647
2648 if (found) {
2649 return epoch;
2650 } else {
2651 return -1;
2652 }
2653}
2654
2655bool Objecter::is_pg_changed(
2656 int oldprimary,
2657 const vector<int>& oldacting,
2658 int newprimary,
2659 const vector<int>& newacting,
2660 bool any_change)
2661{
2662 if (OSDMap::primary_changed(
2663 oldprimary,
2664 oldacting,
2665 newprimary,
2666 newacting))
2667 return true;
2668 if (any_change && oldacting != newacting)
2669 return true;
2670 return false; // same primary (tho replicas may have changed)
2671}
2672
2673bool Objecter::target_should_be_paused(op_target_t *t)
2674{
2675 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2676 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
2677 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
2678 _osdmap_full_flag() || _osdmap_pool_full(*pi);
2679
2680 return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
2681 (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
2682 (osdmap->get_epoch() < epoch_barrier);
2683}
2684
2685/**
2686 * Locking public accessor for _osdmap_full_flag
2687 */
2688bool Objecter::osdmap_full_flag() const
2689{
2690 shared_lock rl(rwlock);
2691
2692 return _osdmap_full_flag();
2693}
2694
2695bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2696{
2697 shared_lock rl(rwlock);
2698
2699 if (_osdmap_full_flag()) {
2700 return true;
2701 }
2702
2703 return _osdmap_pool_full(pool_id);
2704}
2705
2706bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2707{
2708 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2709 if (pool == NULL) {
2710 ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2711 return false;
2712 }
2713
2714 return _osdmap_pool_full(*pool);
2715}
2716
2717bool Objecter::_osdmap_has_pool_full() const
2718{
2719 for (map<int64_t, pg_pool_t>::const_iterator it
2720 = osdmap->get_pools().begin();
2721 it != osdmap->get_pools().end(); ++it) {
2722 if (_osdmap_pool_full(it->second))
2723 return true;
2724 }
2725 return false;
2726}
2727
2728bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
2729{
2730 return p.has_flag(pg_pool_t::FLAG_FULL) && honor_osdmap_full;
2731}
2732
2733/**
2734 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2735 */
2736bool Objecter::_osdmap_full_flag() const
2737{
2738 // Ignore the FULL flag if the caller has honor_osdmap_full
2739 return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
2740}
2741
2742void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2743{
2744 for (map<int64_t, pg_pool_t>::const_iterator it
2745 = osdmap->get_pools().begin();
2746 it != osdmap->get_pools().end(); ++it) {
2747 if (pool_full_map.find(it->first) == pool_full_map.end()) {
2748 pool_full_map[it->first] = _osdmap_pool_full(it->second);
2749 } else {
2750 pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2751 pool_full_map[it->first];
2752 }
2753 }
2754}
2755
2756int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2757 const string& ns)
2758{
2759 shared_lock rl(rwlock);
2760 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2761 if (!p)
2762 return -ENOENT;
2763 return p->hash_key(key, ns);
2764}
2765
2766int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2767 const string& ns)
2768{
2769 shared_lock rl(rwlock);
2770 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2771 if (!p)
2772 return -ENOENT;
2773 return p->raw_hash_to_pg(p->hash_key(key, ns));
2774}
2775
2776int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
2777{
2778 // rwlock is locked
2779 bool is_read = t->flags & CEPH_OSD_FLAG_READ;
2780 bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
2781 t->epoch = osdmap->get_epoch();
2782 ldout(cct,20) << __func__ << " epoch " << t->epoch
2783 << " base " << t->base_oid << " " << t->base_oloc
2784 << " precalc_pgid " << (int)t->precalc_pgid
2785 << " pgid " << t->base_pgid
2786 << (is_read ? " is_read" : "")
2787 << (is_write ? " is_write" : "")
2788 << dendl;
2789
2790 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2791 if (!pi) {
2792 t->osd = -1;
2793 return RECALC_OP_TARGET_POOL_DNE;
2794 }
2795 ldout(cct,30) << __func__ << " base pi " << pi
2796 << " pg_num " << pi->get_pg_num() << dendl;
2797
2798 bool force_resend = false;
2799 if (osdmap->get_epoch() == pi->last_force_op_resend) {
2800 if (t->last_force_resend < pi->last_force_op_resend) {
2801 t->last_force_resend = pi->last_force_op_resend;
2802 force_resend = true;
2803 } else if (t->last_force_resend == 0) {
2804 force_resend = true;
2805 }
2806 }
2807
2808 // apply tiering
2809 t->target_oid = t->base_oid;
2810 t->target_oloc = t->base_oloc;
2811 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
2812 if (is_read && pi->has_read_tier())
2813 t->target_oloc.pool = pi->read_tier;
2814 if (is_write && pi->has_write_tier())
2815 t->target_oloc.pool = pi->write_tier;
2816 pi = osdmap->get_pg_pool(t->target_oloc.pool);
2817 if (!pi) {
2818 t->osd = -1;
2819 return RECALC_OP_TARGET_POOL_DNE;
2820 }
2821 }
2822
2823 pg_t pgid;
2824 if (t->precalc_pgid) {
2825 assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
2826 assert(t->base_oid.name.empty()); // make sure this is a pg op
2827 assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
2828 pgid = t->base_pgid;
2829 } else {
2830 int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
2831 pgid);
2832 if (ret == -ENOENT) {
2833 t->osd = -1;
2834 return RECALC_OP_TARGET_POOL_DNE;
2835 }
2836 }
2837 ldout(cct,20) << __func__ << " target " << t->target_oid << " "
2838 << t->target_oloc << " -> pgid " << pgid << dendl;
2839 ldout(cct,30) << __func__ << " target pi " << pi
2840 << " pg_num " << pi->get_pg_num() << dendl;
2841 t->pool_ever_existed = true;
2842
2843 int size = pi->size;
2844 int min_size = pi->min_size;
2845 unsigned pg_num = pi->get_pg_num();
2846 int up_primary, acting_primary;
2847 vector<int> up, acting;
2848 osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
2849 &acting, &acting_primary);
2850 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
c07f9fc5 2851 bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
7c673cae
FG
2852 unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
2853 pg_t prev_pgid(prev_seed, pgid.pool());
2854 if (any_change && PastIntervals::is_new_interval(
2855 t->acting_primary,
2856 acting_primary,
2857 t->acting,
2858 acting,
2859 t->up_primary,
2860 up_primary,
2861 t->up,
2862 up,
2863 t->size,
2864 size,
2865 t->min_size,
2866 min_size,
2867 t->pg_num,
2868 pg_num,
2869 t->sort_bitwise,
2870 sort_bitwise,
c07f9fc5
FG
2871 t->recovery_deletes,
2872 recovery_deletes,
7c673cae
FG
2873 prev_pgid)) {
2874 force_resend = true;
2875 }
2876
2877 bool unpaused = false;
f64942e4
AA
2878 bool should_be_paused = target_should_be_paused(t);
2879 if (t->paused && !should_be_paused) {
7c673cae
FG
2880 unpaused = true;
2881 }
f64942e4 2882 t->paused = should_be_paused;
7c673cae
FG
2883
2884 bool legacy_change =
2885 t->pgid != pgid ||
2886 is_pg_changed(
2887 t->acting_primary, t->acting, acting_primary, acting,
2888 t->used_replica || any_change);
2889 bool split = false;
2890 if (t->pg_num) {
2891 split = prev_pgid.is_split(t->pg_num, pg_num, nullptr);
2892 }
2893
2894 if (legacy_change || split || force_resend) {
2895 t->pgid = pgid;
2896 t->acting = acting;
2897 t->acting_primary = acting_primary;
2898 t->up_primary = up_primary;
2899 t->up = up;
2900 t->size = size;
2901 t->min_size = min_size;
2902 t->pg_num = pg_num;
2903 t->pg_num_mask = pi->get_pg_num_mask();
2904 osdmap->get_primary_shard(
2905 pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
2906 &t->actual_pgid);
2907 t->sort_bitwise = sort_bitwise;
c07f9fc5 2908 t->recovery_deletes = recovery_deletes;
7c673cae
FG
2909 ldout(cct, 10) << __func__ << " "
2910 << " raw pgid " << pgid << " -> actual " << t->actual_pgid
2911 << " acting " << acting
2912 << " primary " << acting_primary << dendl;
2913 t->used_replica = false;
2914 if (acting_primary == -1) {
2915 t->osd = -1;
2916 } else {
2917 int osd;
2918 bool read = is_read && !is_write;
2919 if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
2920 int p = rand() % acting.size();
2921 if (p)
2922 t->used_replica = true;
2923 osd = acting[p];
2924 ldout(cct, 10) << " chose random osd." << osd << " of " << acting
2925 << dendl;
2926 } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
2927 acting.size() > 1) {
2928 // look for a local replica. prefer the primary if the
2929 // distance is the same.
2930 int best = -1;
2931 int best_locality = 0;
2932 for (unsigned i = 0; i < acting.size(); ++i) {
2933 int locality = osdmap->crush->get_common_ancestor_distance(
2934 cct, acting[i], crush_location);
2935 ldout(cct, 20) << __func__ << " localize: rank " << i
2936 << " osd." << acting[i]
2937 << " locality " << locality << dendl;
2938 if (i == 0 ||
2939 (locality >= 0 && best_locality >= 0 &&
2940 locality < best_locality) ||
2941 (best_locality < 0 && locality >= 0)) {
2942 best = i;
2943 best_locality = locality;
2944 if (i)
2945 t->used_replica = true;
2946 }
2947 }
2948 assert(best >= 0);
2949 osd = acting[best];
2950 } else {
2951 osd = acting_primary;
2952 }
2953 t->osd = osd;
2954 }
2955 }
2956 if (legacy_change || unpaused || force_resend) {
2957 return RECALC_OP_TARGET_NEED_RESEND;
2958 }
91327a77
AA
2959 if (split &&
2960 (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS ||
2961 HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
2962 RESEND_ON_SPLIT))) {
7c673cae
FG
2963 return RECALC_OP_TARGET_NEED_RESEND;
2964 }
2965 return RECALC_OP_TARGET_NO_ACTION;
2966}
2967
2968int Objecter::_map_session(op_target_t *target, OSDSession **s,
2969 shunique_lock& sul)
2970{
2971 _calc_target(target, nullptr);
2972 return _get_session(target->osd, s, sul);
2973}
2974
2975void Objecter::_session_op_assign(OSDSession *to, Op *op)
2976{
2977 // to->lock is locked
2978 assert(op->session == NULL);
2979 assert(op->tid);
2980
2981 get_session(to);
2982 op->session = to;
2983 to->ops[op->tid] = op;
2984
2985 if (to->is_homeless()) {
31f18b77 2986 num_homeless_ops++;
7c673cae
FG
2987 }
2988
2989 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
2990}
2991
2992void Objecter::_session_op_remove(OSDSession *from, Op *op)
2993{
2994 assert(op->session == from);
2995 // from->lock is locked
2996
2997 if (from->is_homeless()) {
31f18b77 2998 num_homeless_ops--;
7c673cae
FG
2999 }
3000
3001 from->ops.erase(op->tid);
3002 put_session(from);
3003 op->session = NULL;
3004
3005 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3006}
3007
3008void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
3009{
3010 // to lock is locked unique
3011 assert(op->session == NULL);
3012
3013 if (to->is_homeless()) {
31f18b77 3014 num_homeless_ops++;
7c673cae
FG
3015 }
3016
3017 get_session(to);
3018 op->session = to;
3019 to->linger_ops[op->linger_id] = op;
3020
3021 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
3022 << dendl;
3023}
3024
3025void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
3026{
3027 assert(from == op->session);
3028 // from->lock is locked unique
3029
3030 if (from->is_homeless()) {
31f18b77 3031 num_homeless_ops--;
7c673cae
FG
3032 }
3033
3034 from->linger_ops.erase(op->linger_id);
3035 put_session(from);
3036 op->session = NULL;
3037
3038 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
3039 << dendl;
3040}
3041
3042void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
3043{
3044 assert(from == op->session);
3045 // from->lock is locked
3046
3047 if (from->is_homeless()) {
31f18b77 3048 num_homeless_ops--;
7c673cae
FG
3049 }
3050
3051 from->command_ops.erase(op->tid);
3052 put_session(from);
3053 op->session = NULL;
3054
3055 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3056}
3057
3058void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
3059{
3060 // to->lock is locked
3061 assert(op->session == NULL);
3062 assert(op->tid);
3063
3064 if (to->is_homeless()) {
31f18b77 3065 num_homeless_ops++;
7c673cae
FG
3066 }
3067
3068 get_session(to);
3069 op->session = to;
3070 to->command_ops[op->tid] = op;
3071
3072 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3073}
3074
3075int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
3076 shunique_lock& sul)
3077{
3078 // rwlock is locked unique
3079
3080 int r = _calc_target(&linger_op->target, nullptr, true);
3081 if (r == RECALC_OP_TARGET_NEED_RESEND) {
3082 ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
3083 << " pgid " << linger_op->target.pgid
3084 << " acting " << linger_op->target.acting << dendl;
3085
3086 OSDSession *s = NULL;
3087 r = _get_session(linger_op->target.osd, &s, sul);
3088 assert(r == 0);
3089
3090 if (linger_op->session != s) {
3091 // NB locking two sessions (s and linger_op->session) at the
3092 // same time here is only safe because we are the only one that
3093 // takes two, and we are holding rwlock for write. Disable
3094 // lockdep because it doesn't know that.
3095 OSDSession::unique_lock sl(s->lock);
3096 _session_linger_op_remove(linger_op->session, linger_op);
3097 _session_linger_op_assign(s, linger_op);
3098 }
3099
3100 put_session(s);
3101 return RECALC_OP_TARGET_NEED_RESEND;
3102 }
3103 return r;
3104}
3105
3106void Objecter::_cancel_linger_op(Op *op)
3107{
3108 ldout(cct, 15) << "cancel_op " << op->tid << dendl;
3109
3110 assert(!op->should_resend);
3111 if (op->onfinish) {
3112 delete op->onfinish;
31f18b77 3113 num_in_flight--;
7c673cae
FG
3114 }
3115
3116 _finish_op(op, 0);
3117}
3118
3119void Objecter::_finish_op(Op *op, int r)
3120{
3121 ldout(cct, 15) << "finish_op " << op->tid << dendl;
3122
3123 // op->session->lock is locked unique or op->session is null
3124
3125 if (!op->ctx_budgeted && op->budgeted)
3126 put_op_budget(op);
3127
3128 if (op->ontimeout && r != -ETIMEDOUT)
3129 timer.cancel_event(op->ontimeout);
3130
3131 if (op->session) {
3132 _session_op_remove(op->session, op);
3133 }
3134
3135 logger->dec(l_osdc_op_active);
3136
3137 assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
3138
31f18b77 3139 inflight_ops--;
7c673cae
FG
3140
3141 op->put();
3142}
3143
3144void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
3145{
3146 ldout(cct, 15) << "finish_op " << tid << dendl;
3147 shared_lock rl(rwlock);
3148
3149 OSDSession::unique_lock wl(session->lock);
3150
3151 map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
3152 if (iter == session->ops.end())
3153 return;
3154
3155 Op *op = iter->second;
3156
3157 _finish_op(op, 0);
3158}
3159
3160MOSDOp *Objecter::_prepare_osd_op(Op *op)
3161{
3162 // rwlock is locked
3163
3164 int flags = op->target.flags;
3165 flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
3166
3167 // Nothing checks this any longer, but needed for compatibility with
3168 // pre-luminous osds
3169 flags |= CEPH_OSD_FLAG_ONDISK;
3170
3171 if (!honor_osdmap_full)
3172 flags |= CEPH_OSD_FLAG_FULL_FORCE;
3173
3174 op->target.paused = false;
3175 op->stamp = ceph::mono_clock::now();
3176
3177 hobject_t hobj = op->target.get_hobj();
31f18b77 3178 MOSDOp *m = new MOSDOp(client_inc, op->tid,
7c673cae
FG
3179 hobj, op->target.actual_pgid,
3180 osdmap->get_epoch(),
3181 flags, op->features);
3182
3183 m->set_snapid(op->snapid);
3184 m->set_snap_seq(op->snapc.seq);
3185 m->set_snaps(op->snapc.snaps);
3186
3187 m->ops = op->ops;
3188 m->set_mtime(op->mtime);
3189 m->set_retry_attempt(op->attempts++);
31f18b77
FG
3190
3191 if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
3192 op->trace.init("op", &trace_endpoint);
3193 }
7c673cae
FG
3194
3195 if (op->priority)
3196 m->set_priority(op->priority);
3197 else
3198 m->set_priority(cct->_conf->osd_client_op_priority);
3199
3200 if (op->reqid != osd_reqid_t()) {
3201 m->set_reqid(op->reqid);
3202 }
3203
3204 logger->inc(l_osdc_op_send);
b32b8144
FG
3205 ssize_t sum = 0;
3206 for (unsigned i = 0; i < m->ops.size(); i++) {
3207 sum += m->ops[i].indata.length();
3208 }
3209 logger->inc(l_osdc_op_send_bytes, sum);
7c673cae
FG
3210
3211 return m;
3212}
3213
3214void Objecter::_send_op(Op *op, MOSDOp *m)
3215{
3216 // rwlock is locked
3217 // op->session->lock is locked
3218
3219 // backoff?
7c673cae
FG
3220 auto p = op->session->backoffs.find(op->target.actual_pgid);
3221 if (p != op->session->backoffs.end()) {
b32b8144 3222 hobject_t hoid = op->target.get_hobj();
7c673cae
FG
3223 auto q = p->second.lower_bound(hoid);
3224 if (q != p->second.begin()) {
3225 --q;
3226 if (hoid >= q->second.end) {
3227 ++q;
3228 }
3229 }
3230 if (q != p->second.end()) {
3231 ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
3232 << "," << q->second.end << ")" << dendl;
3233 int r = cmp(hoid, q->second.begin);
3234 if (r == 0 || (r > 0 && hoid < q->second.end)) {
3235 ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
3236 << " id " << q->second.id << " on " << hoid
3237 << ", queuing " << op << " tid " << op->tid << dendl;
3238 return;
3239 }
3240 }
3241 }
3242
3243 if (!m) {
3244 assert(op->tid > 0);
3245 m = _prepare_osd_op(op);
3246 }
3247
3248 if (op->target.actual_pgid != m->get_spg()) {
3249 ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
3250 << m->get_spg() << " to " << op->target.actual_pgid
3251 << ", updating and reencoding" << dendl;
3252 m->set_spg(op->target.actual_pgid);
3253 m->clear_payload(); // reencode
3254 }
3255
3256 ldout(cct, 15) << "_send_op " << op->tid << " to "
3257 << op->target.actual_pgid << " on osd." << op->session->osd
3258 << dendl;
3259
3260 ConnectionRef con = op->session->con;
3261 assert(con);
3262
3263 // preallocated rx buffer?
3264 if (op->con) {
3265 ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
3266 << op->con << dendl;
3267 op->con->revoke_rx_buffer(op->tid);
3268 }
3269 if (op->outbl &&
3270 op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582
3271 op->outbl->length()) {
3272 ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
3273 << dendl;
3274 op->con = con;
3275 op->con->post_rx_buffer(op->tid, *op->outbl);
3276 }
3277
3278 op->incarnation = op->session->incarnation;
3279
3280 m->set_tid(op->tid);
3281
31f18b77
FG
3282 if (op->trace.valid()) {
3283 m->trace.init("op msg", nullptr, &op->trace);
3284 }
7c673cae
FG
3285 op->session->con->send_message(m);
3286}
3287
3288int Objecter::calc_op_budget(Op *op)
3289{
3290 int op_budget = 0;
3291 for (vector<OSDOp>::iterator i = op->ops.begin();
3292 i != op->ops.end();
3293 ++i) {
3294 if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3295 op_budget += i->indata.length();
3296 } else if (ceph_osd_op_mode_read(i->op.op)) {
3297 if (ceph_osd_op_type_data(i->op.op)) {
3298 if ((int64_t)i->op.extent.length > 0)
3299 op_budget += (int64_t)i->op.extent.length;
3300 } else if (ceph_osd_op_type_attr(i->op.op)) {
3301 op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
3302 }
3303 }
3304 }
3305 return op_budget;
3306}
3307
3308void Objecter::_throttle_op(Op *op,
3309 shunique_lock& sul,
3310 int op_budget)
3311{
3312 assert(sul && sul.mutex() == &rwlock);
3313 bool locked_for_write = sul.owns_lock();
3314
3315 if (!op_budget)
3316 op_budget = calc_op_budget(op);
3317 if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3318 sul.unlock();
3319 op_throttle_bytes.get(op_budget);
3320 if (locked_for_write)
3321 sul.lock();
3322 else
3323 sul.lock_shared();
3324 }
3325 if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3326 sul.unlock();
3327 op_throttle_ops.get(1);
3328 if (locked_for_write)
3329 sul.lock();
3330 else
3331 sul.lock_shared();
3332 }
3333}
3334
3335void Objecter::unregister_op(Op *op)
3336{
3337 OSDSession::unique_lock sl(op->session->lock);
3338 op->session->ops.erase(op->tid);
3339 sl.unlock();
3340 put_session(op->session);
3341 op->session = NULL;
3342
31f18b77 3343 inflight_ops--;
7c673cae
FG
3344}
3345
3346/* This function DOES put the passed message before returning */
3347void Objecter::handle_osd_op_reply(MOSDOpReply *m)
3348{
3349 ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
3350
3351 // get pio
3352 ceph_tid_t tid = m->get_tid();
3353
3354 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3355 if (!initialized) {
7c673cae
FG
3356 m->put();
3357 return;
3358 }
3359
3360 ConnectionRef con = m->get_connection();
3361 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
3362 if (!s || s->con != con) {
3363 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3364 if (s) {
3365 s->put();
3366 }
3367 m->put();
3368 return;
3369 }
3370
3371 OSDSession::unique_lock sl(s->lock);
3372
3373 map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
3374 if (iter == s->ops.end()) {
3375 ldout(cct, 7) << "handle_osd_op_reply " << tid
3376 << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
3377 " onnvram" : " ack"))
3378 << " ... stray" << dendl;
3379 sl.unlock();
3380 put_session(s);
3381 m->put();
3382 return;
3383 }
3384
3385 ldout(cct, 7) << "handle_osd_op_reply " << tid
3386 << (m->is_ondisk() ? " ondisk" :
3387 (m->is_onnvram() ? " onnvram" : " ack"))
3388 << " uv " << m->get_user_version()
3389 << " in " << m->get_pg()
3390 << " attempt " << m->get_retry_attempt()
3391 << dendl;
3392 Op *op = iter->second;
31f18b77 3393 op->trace.event("osd op reply");
7c673cae
FG
3394
3395 if (retry_writes_after_first_reply && op->attempts == 1 &&
3396 (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
3397 ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
3398 if (op->onfinish) {
31f18b77 3399 num_in_flight--;
7c673cae
FG
3400 }
3401 _session_op_remove(s, op);
3402 sl.unlock();
3403 put_session(s);
3404
3405 _op_submit(op, sul, NULL);
3406 m->put();
3407 return;
3408 }
3409
3410 if (m->get_retry_attempt() >= 0) {
3411 if (m->get_retry_attempt() != (op->attempts - 1)) {
3412 ldout(cct, 7) << " ignoring reply from attempt "
3413 << m->get_retry_attempt()
3414 << " from " << m->get_source_inst()
3415 << "; last attempt " << (op->attempts - 1) << " sent to "
3416 << op->session->con->get_peer_addr() << dendl;
3417 m->put();
3418 sl.unlock();
3419 put_session(s);
3420 return;
3421 }
3422 } else {
3423 // we don't know the request attempt because the server is old, so
3424 // just accept this one. we may do ACK callbacks we shouldn't
3425 // have, but that is better than doing callbacks out of order.
3426 }
3427
3428 Context *onfinish = 0;
3429
3430 int rc = m->get_result();
3431
3432 if (m->is_redirect_reply()) {
3433 ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
3434 if (op->onfinish)
31f18b77 3435 num_in_flight--;
7c673cae
FG
3436 _session_op_remove(s, op);
3437 sl.unlock();
3438 put_session(s);
3439
3440 // FIXME: two redirects could race and reorder
3441
3442 op->tid = 0;
3443 m->get_redirect().combine_with_locator(op->target.target_oloc,
3444 op->target.target_oid.name);
f64942e4
AA
3445 op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED |
3446 CEPH_OSD_FLAG_IGNORE_CACHE |
3447 CEPH_OSD_FLAG_IGNORE_OVERLAY);
7c673cae
FG
3448 _op_submit(op, sul, NULL);
3449 m->put();
3450 return;
3451 }
3452
3453 if (rc == -EAGAIN) {
3454 ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
c07f9fc5
FG
3455 if (op->onfinish)
3456 num_in_flight--;
3457 _session_op_remove(s, op);
7c673cae
FG
3458 sl.unlock();
3459 put_session(s);
c07f9fc5
FG
3460
3461 op->tid = 0;
3462 op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3463 CEPH_OSD_FLAG_LOCALIZE_READS);
3464 op->target.pgid = pg_t();
3465 _op_submit(op, sul, NULL);
7c673cae
FG
3466 m->put();
3467 return;
3468 }
3469
3470 sul.unlock();
3471
3472 if (op->objver)
3473 *op->objver = m->get_user_version();
3474 if (op->reply_epoch)
3475 *op->reply_epoch = m->get_map_epoch();
3476 if (op->data_offset)
3477 *op->data_offset = m->get_header().data_off;
3478
3479 // got data?
3480 if (op->outbl) {
3481 if (op->con)
3482 op->con->revoke_rx_buffer(op->tid);
3483 m->claim_data(*op->outbl);
3484 op->outbl = 0;
3485 }
3486
3487 // per-op result demuxing
3488 vector<OSDOp> out_ops;
3489 m->claim_ops(out_ops);
3490
3491 if (out_ops.size() != op->ops.size())
3492 ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
3493 << " != request ops " << op->ops
3494 << " from " << m->get_source_inst() << dendl;
3495
3496 vector<bufferlist*>::iterator pb = op->out_bl.begin();
3497 vector<int*>::iterator pr = op->out_rval.begin();
3498 vector<Context*>::iterator ph = op->out_handler.begin();
3499 assert(op->out_bl.size() == op->out_rval.size());
3500 assert(op->out_bl.size() == op->out_handler.size());
3501 vector<OSDOp>::iterator p = out_ops.begin();
3502 for (unsigned i = 0;
3503 p != out_ops.end() && pb != op->out_bl.end();
3504 ++i, ++p, ++pb, ++pr, ++ph) {
3505 ldout(cct, 10) << " op " << i << " rval " << p->rval
3506 << " len " << p->outdata.length() << dendl;
3507 if (*pb)
3508 **pb = p->outdata;
3509 // set rval before running handlers so that handlers
3510 // can change it if e.g. decoding fails
3511 if (*pr)
31f18b77 3512 **pr = ceph_to_hostos_errno(p->rval);
7c673cae
FG
3513 if (*ph) {
3514 ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
31f18b77 3515 (*ph)->complete(ceph_to_hostos_errno(p->rval));
7c673cae
FG
3516 *ph = NULL;
3517 }
3518 }
3519
3520 // NOTE: we assume that since we only request ONDISK ever we will
3521 // only ever get back one (type of) ack ever.
3522
3523 if (op->onfinish) {
31f18b77 3524 num_in_flight--;
7c673cae
FG
3525 onfinish = op->onfinish;
3526 op->onfinish = NULL;
3527 }
3528 logger->inc(l_osdc_op_reply);
3529
3530 /* get it before we call _finish_op() */
3531 auto completion_lock = s->get_lock(op->target.base_oid);
3532
3533 ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
3534 _finish_op(op, 0);
3535
31f18b77 3536 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
7c673cae
FG
3537
3538 // serialize completions
3539 if (completion_lock.mutex()) {
3540 completion_lock.lock();
3541 }
3542 sl.unlock();
3543
3544 // do callbacks
3545 if (onfinish) {
3546 onfinish->complete(rc);
3547 }
3548 if (completion_lock.mutex()) {
3549 completion_lock.unlock();
3550 }
3551
3552 m->put();
3553 put_session(s);
3554}
3555
3556void Objecter::handle_osd_backoff(MOSDBackoff *m)
3557{
3558 ldout(cct, 10) << __func__ << " " << *m << dendl;
3559 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3560 if (!initialized) {
7c673cae
FG
3561 m->put();
3562 return;
3563 }
3564
3565 ConnectionRef con = m->get_connection();
3566 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
3567 if (!s || s->con != con) {
3568 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3569 if (s)
3570 s->put();
3571 m->put();
3572 return;
3573 }
3574
3575 get_session(s);
3576 s->put(); // from get_priv() above
3577
3578 OSDSession::unique_lock sl(s->lock);
3579
3580 switch (m->op) {
3581 case CEPH_OSD_BACKOFF_OP_BLOCK:
3582 {
3583 // register
3584 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3585 s->backoffs_by_id.insert(make_pair(m->id, &b));
3586 b.pgid = m->pgid;
3587 b.id = m->id;
3588 b.begin = m->begin;
3589 b.end = m->end;
3590
3591 // ack with original backoff's epoch so that the osd can discard this if
3592 // there was a pg split.
3593 Message *r = new MOSDBackoff(m->pgid,
3594 m->map_epoch,
3595 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3596 m->id, m->begin, m->end);
3597 // this priority must match the MOSDOps from _prepare_osd_op
3598 r->set_priority(cct->_conf->osd_client_op_priority);
3599 con->send_message(r);
3600 }
3601 break;
3602
3603 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3604 {
3605 auto p = s->backoffs_by_id.find(m->id);
3606 if (p != s->backoffs_by_id.end()) {
3607 OSDBackoff *b = p->second;
3608 if (b->begin != m->begin &&
3609 b->end != m->end) {
3610 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3611 << " unblock on ["
3612 << m->begin << "," << m->end << ") but backoff is ["
3613 << b->begin << "," << b->end << ")" << dendl;
3614 // hrmpf, unblock it anyway.
3615 }
3616 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3617 << " id " << b->id
3618 << " [" << b->begin << "," << b->end
3619 << ")" << dendl;
3620 auto spgp = s->backoffs.find(b->pgid);
3621 assert(spgp != s->backoffs.end());
3622 spgp->second.erase(b->begin);
3623 if (spgp->second.empty()) {
3624 s->backoffs.erase(spgp);
3625 }
3626 s->backoffs_by_id.erase(p);
3627
3628 // check for any ops to resend
3629 for (auto& q : s->ops) {
3630 if (q.second->target.actual_pgid == m->pgid) {
3631 int r = q.second->target.contained_by(m->begin, m->end);
3632 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3633 << q.second->target.get_hobj() << dendl;
3634 if (r) {
3635 _send_op(q.second);
3636 }
3637 }
3638 }
3639 } else {
3640 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3641 << " unblock on ["
3642 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3643 }
3644 }
3645 break;
3646
3647 default:
3648 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3649 }
3650
3651 sul.unlock();
3652 sl.unlock();
3653
3654 m->put();
3655 put_session(s);
3656}
3657
3658uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3659 uint32_t pos)
3660{
3661 shared_lock rl(rwlock);
3662 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3663 pos, list_context->pool_id, string());
3664 ldout(cct, 10) << __func__ << list_context
3665 << " pos " << pos << " -> " << list_context->pos << dendl;
3666 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3667 list_context->current_pg = actual.ps();
3668 list_context->at_end_of_pool = false;
3669 return pos;
3670}
3671
3672uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3673 const hobject_t& cursor)
3674{
3675 shared_lock rl(rwlock);
3676 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3677 list_context->pos = cursor;
3678 list_context->at_end_of_pool = false;
3679 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3680 list_context->current_pg = actual.ps();
3681 list_context->sort_bitwise = true;
3682 return list_context->current_pg;
3683}
3684
3685void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3686 hobject_t *cursor)
3687{
3688 shared_lock rl(rwlock);
3689 if (list_context->list.empty()) {
3690 *cursor = list_context->pos;
3691 } else {
3692 const librados::ListObjectImpl& entry = list_context->list.front();
3693 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3694 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3695 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3696 }
3697}
3698
3699void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3700{
3701 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3702 << " pool_snap_seq " << list_context->pool_snap_seq
3703 << " max_entries " << list_context->max_entries
3704 << " list_context " << list_context
3705 << " onfinish " << onfinish
3706 << " current_pg " << list_context->current_pg
3707 << " pos " << list_context->pos << dendl;
3708
3709 shared_lock rl(rwlock);
3710 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3711 if (!pool) { // pool is gone
3712 rl.unlock();
3713 put_nlist_context_budget(list_context);
3714 onfinish->complete(-ENOENT);
3715 return;
3716 }
3717 int pg_num = pool->get_pg_num();
3718 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3719
3720 if (list_context->pos.is_min()) {
3721 list_context->starting_pg_num = 0;
3722 list_context->sort_bitwise = sort_bitwise;
3723 list_context->starting_pg_num = pg_num;
3724 }
3725 if (list_context->sort_bitwise != sort_bitwise) {
3726 list_context->pos = hobject_t(
3727 object_t(), string(), CEPH_NOSNAP,
3728 list_context->current_pg, list_context->pool_id, string());
3729 list_context->sort_bitwise = sort_bitwise;
3730 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3731 << list_context->pos << dendl;
3732 }
3733 if (list_context->starting_pg_num != pg_num) {
3734 if (!sort_bitwise) {
3735 // start reading from the beginning; the pgs have changed
3736 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3737 list_context->pos = collection_list_handle_t();
3738 }
3739 list_context->starting_pg_num = pg_num;
3740 }
3741
3742 if (list_context->pos.is_max()) {
3743 ldout(cct, 20) << __func__ << " end of pool, list "
3744 << list_context->list << dendl;
3745 if (list_context->list.empty()) {
3746 list_context->at_end_of_pool = true;
3747 }
3748 // release the listing context's budget once all
3749 // OPs (in the session) are finished
3750 put_nlist_context_budget(list_context);
3751 onfinish->complete(0);
3752 return;
3753 }
3754
3755 ObjectOperation op;
3756 op.pg_nls(list_context->max_entries, list_context->filter,
3757 list_context->pos, osdmap->get_epoch());
3758 list_context->bl.clear();
3759 C_NList *onack = new C_NList(list_context, onfinish, this);
3760 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3761
3762 // note current_pg in case we don't have (or lose) SORTBITWISE
3763 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3764 rl.unlock();
3765
3766 pg_read(list_context->current_pg, oloc, op,
3767 &list_context->bl, 0, onack, &onack->epoch,
3768 &list_context->ctx_budget);
3769}
3770
3771void Objecter::_nlist_reply(NListContext *list_context, int r,
3772 Context *final_finish, epoch_t reply_epoch)
3773{
3774 ldout(cct, 10) << __func__ << " " << list_context << dendl;
3775
3776 bufferlist::iterator iter = list_context->bl.begin();
3777 pg_nls_response_t response;
3778 bufferlist extra_info;
3779 ::decode(response, iter);
3780 if (!iter.end()) {
3781 ::decode(extra_info, iter);
3782 }
3783
3784 // if the osd returns 1 (newer code), or handle MAX, it means we
3785 // hit the end of the pg.
3786 if ((response.handle.is_max() || r == 1) &&
3787 !list_context->sort_bitwise) {
3788 // legacy OSD and !sortbitwise, figure out the next PG on our own
3789 ++list_context->current_pg;
3790 if (list_context->current_pg == list_context->starting_pg_num) {
3791 // end of pool
3792 list_context->pos = hobject_t::get_max();
3793 } else {
3794 // next pg
3795 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3796 list_context->current_pg,
3797 list_context->pool_id, string());
3798 }
3799 } else {
3800 list_context->pos = response.handle;
3801 }
3802
3803 int response_size = response.entries.size();
3804 ldout(cct, 20) << " response.entries.size " << response_size
3805 << ", response.entries " << response.entries
3806 << ", handle " << response.handle
3807 << ", tentative new pos " << list_context->pos << dendl;
3808 list_context->extra_info.append(extra_info);
3809 if (response_size) {
3810 list_context->list.splice(list_context->list.end(), response.entries);
3811 }
3812
3813 if (list_context->list.size() >= list_context->max_entries) {
3814 ldout(cct, 20) << " hit max, returning results so far, "
3815 << list_context->list << dendl;
3816 // release the listing context's budget once all
3817 // OPs (in the session) are finished
3818 put_nlist_context_budget(list_context);
3819 final_finish->complete(0);
3820 return;
3821 }
3822
3823 // continue!
3824 list_nobjects(list_context, final_finish);
3825}
3826
3827void Objecter::put_nlist_context_budget(NListContext *list_context)
3828{
3829 if (list_context->ctx_budget >= 0) {
3830 ldout(cct, 10) << " release listing context's budget " <<
3831 list_context->ctx_budget << dendl;
3832 put_op_budget_bytes(list_context->ctx_budget);
3833 list_context->ctx_budget = -1;
3834 }
3835}
3836
3837// snapshots
3838
3839int Objecter::create_pool_snap(int64_t pool, string& snap_name,
3840 Context *onfinish)
3841{
3842 unique_lock wl(rwlock);
3843 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3844 << snap_name << dendl;
3845
3846 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3847 if (!p)
3848 return -EINVAL;
3849 if (p->snap_exists(snap_name.c_str()))
3850 return -EEXIST;
3851
3852 PoolOp *op = new PoolOp;
3853 if (!op)
3854 return -ENOMEM;
31f18b77 3855 op->tid = ++last_tid;
7c673cae
FG
3856 op->pool = pool;
3857 op->name = snap_name;
3858 op->onfinish = onfinish;
3859 op->pool_op = POOL_OP_CREATE_SNAP;
3860 pool_ops[op->tid] = op;
3861
3862 pool_op_submit(op);
3863
3864 return 0;
3865}
3866
3867struct C_SelfmanagedSnap : public Context {
3868 bufferlist bl;
3869 snapid_t *psnapid;
3870 Context *fin;
3871 C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
3872 void finish(int r) override {
3873 if (r == 0) {
3874 bufferlist::iterator p = bl.begin();
3875 ::decode(*psnapid, p);
3876 }
3877 fin->complete(r);
3878 }
3879};
3880
3881int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
3882 Context *onfinish)
3883{
3884 unique_lock wl(rwlock);
3885 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
3886 PoolOp *op = new PoolOp;
3887 if (!op) return -ENOMEM;
31f18b77 3888 op->tid = ++last_tid;
7c673cae
FG
3889 op->pool = pool;
3890 C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
3891 op->onfinish = fin;
3892 op->blp = &fin->bl;
3893 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3894 pool_ops[op->tid] = op;
3895
3896 pool_op_submit(op);
3897 return 0;
3898}
3899
3900int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
3901 Context *onfinish)
3902{
3903 unique_lock wl(rwlock);
3904 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3905 << snap_name << dendl;
3906
3907 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3908 if (!p)
3909 return -EINVAL;
3910 if (!p->snap_exists(snap_name.c_str()))
3911 return -ENOENT;
3912
3913 PoolOp *op = new PoolOp;
3914 if (!op)
3915 return -ENOMEM;
31f18b77 3916 op->tid = ++last_tid;
7c673cae
FG
3917 op->pool = pool;
3918 op->name = snap_name;
3919 op->onfinish = onfinish;
3920 op->pool_op = POOL_OP_DELETE_SNAP;
3921 pool_ops[op->tid] = op;
3922
3923 pool_op_submit(op);
3924
3925 return 0;
3926}
3927
3928int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3929 Context *onfinish)
3930{
3931 unique_lock wl(rwlock);
3932 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3933 << snap << dendl;
3934 PoolOp *op = new PoolOp;
3935 if (!op) return -ENOMEM;
31f18b77 3936 op->tid = ++last_tid;
7c673cae
FG
3937 op->pool = pool;
3938 op->onfinish = onfinish;
3939 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3940 op->snapid = snap;
3941 pool_ops[op->tid] = op;
3942
3943 pool_op_submit(op);
3944
3945 return 0;
3946}
3947
3948int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
3949 int crush_rule)
3950{
3951 unique_lock wl(rwlock);
3952 ldout(cct, 10) << "create_pool name=" << name << dendl;
3953
3954 if (osdmap->lookup_pg_pool_name(name) >= 0)
3955 return -EEXIST;
3956
3957 PoolOp *op = new PoolOp;
3958 if (!op)
3959 return -ENOMEM;
31f18b77 3960 op->tid = ++last_tid;
7c673cae
FG
3961 op->pool = 0;
3962 op->name = name;
3963 op->onfinish = onfinish;
3964 op->pool_op = POOL_OP_CREATE;
3965 pool_ops[op->tid] = op;
3966 op->auid = auid;
3967 op->crush_rule = crush_rule;
3968
3969 pool_op_submit(op);
3970
3971 return 0;
3972}
3973
3974int Objecter::delete_pool(int64_t pool, Context *onfinish)
3975{
3976 unique_lock wl(rwlock);
3977 ldout(cct, 10) << "delete_pool " << pool << dendl;
3978
3979 if (!osdmap->have_pg_pool(pool))
3980 return -ENOENT;
3981
3982 _do_delete_pool(pool, onfinish);
3983 return 0;
3984}
3985
3986int Objecter::delete_pool(const string &pool_name, Context *onfinish)
3987{
3988 unique_lock wl(rwlock);
3989 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
3990
3991 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
3992 if (pool < 0)
3993 return pool;
3994
3995 _do_delete_pool(pool, onfinish);
3996 return 0;
3997}
3998
3999void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
4000{
4001 PoolOp *op = new PoolOp;
31f18b77 4002 op->tid = ++last_tid;
7c673cae
FG
4003 op->pool = pool;
4004 op->name = "delete";
4005 op->onfinish = onfinish;
4006 op->pool_op = POOL_OP_DELETE;
4007 pool_ops[op->tid] = op;
4008 pool_op_submit(op);
4009}
4010
4011/**
4012 * change the auid owner of a pool by contacting the monitor.
4013 * This requires the current connection to have write permissions
4014 * on both the pool's current auid and the new (parameter) auid.
4015 * Uses the standard Context callback when done.
4016 */
4017int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
4018{
4019 unique_lock wl(rwlock);
4020 ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
4021 PoolOp *op = new PoolOp;
4022 if (!op) return -ENOMEM;
31f18b77 4023 op->tid = ++last_tid;
7c673cae
FG
4024 op->pool = pool;
4025 op->name = "change_pool_auid";
4026 op->onfinish = onfinish;
4027 op->pool_op = POOL_OP_AUID_CHANGE;
4028 op->auid = auid;
4029 pool_ops[op->tid] = op;
4030
4031 logger->set(l_osdc_poolop_active, pool_ops.size());
4032
4033 pool_op_submit(op);
4034 return 0;
4035}
4036
4037void Objecter::pool_op_submit(PoolOp *op)
4038{
4039 // rwlock is locked
4040 if (mon_timeout > timespan(0)) {
4041 op->ontimeout = timer.add_event(mon_timeout,
4042 [this, op]() {
4043 pool_op_cancel(op->tid, -ETIMEDOUT); });
4044 }
4045 _pool_op_submit(op);
4046}
4047
4048void Objecter::_pool_op_submit(PoolOp *op)
4049{
4050 // rwlock is locked unique
4051
4052 ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
4053 MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4054 op->name, op->pool_op,
4055 op->auid, last_seen_osdmap_version);
4056 if (op->snapid) m->snapid = op->snapid;
4057 if (op->crush_rule) m->crush_rule = op->crush_rule;
4058 monc->send_mon_message(m);
4059 op->last_submit = ceph::mono_clock::now();
4060
4061 logger->inc(l_osdc_poolop_send);
4062}
4063
4064/**
4065 * Handle a reply to a PoolOp message. Check that we sent the message
4066 * and give the caller responsibility for the returned bufferlist.
4067 * Then either call the finisher or stash the PoolOp, depending on if we
4068 * have a new enough map.
4069 * Lastly, clean up the message and PoolOp.
4070 */
4071void Objecter::handle_pool_op_reply(MPoolOpReply *m)
4072{
4073 FUNCTRACE();
4074 shunique_lock sul(rwlock, acquire_shared);
31f18b77 4075 if (!initialized) {
7c673cae
FG
4076 sul.unlock();
4077 m->put();
4078 return;
4079 }
4080
4081 ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
4082 ceph_tid_t tid = m->get_tid();
4083 map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
4084 if (iter != pool_ops.end()) {
4085 PoolOp *op = iter->second;
4086 ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
4087 << ceph_pool_op_name(op->pool_op) << dendl;
4088 if (op->blp)
4089 op->blp->claim(m->response_data);
4090 if (m->version > last_seen_osdmap_version)
4091 last_seen_osdmap_version = m->version;
4092 if (osdmap->get_epoch() < m->epoch) {
4093 sul.unlock();
4094 sul.lock();
4095 // recheck op existence since we have let go of rwlock
4096 // (for promotion) above.
4097 iter = pool_ops.find(tid);
4098 if (iter == pool_ops.end())
4099 goto done; // op is gone.
4100 if (osdmap->get_epoch() < m->epoch) {
4101 ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
4102 << " before calling back" << dendl;
4103 _wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
4104 } else {
4105 // map epoch changed, probably because a MOSDMap message
4106 // sneaked in. Do caller-specified callback now or else
4107 // we lose it forever.
4108 assert(op->onfinish);
4109 op->onfinish->complete(m->replyCode);
4110 }
4111 } else {
4112 assert(op->onfinish);
4113 op->onfinish->complete(m->replyCode);
4114 }
4115 op->onfinish = NULL;
4116 if (!sul.owns_lock()) {
4117 sul.unlock();
4118 sul.lock();
4119 }
4120 iter = pool_ops.find(tid);
4121 if (iter != pool_ops.end()) {
4122 _finish_pool_op(op, 0);
4123 }
4124 } else {
4125 ldout(cct, 10) << "unknown request " << tid << dendl;
4126 }
4127
4128done:
4129 // Not strictly necessary, since we'll release it on return.
4130 sul.unlock();
4131
4132 ldout(cct, 10) << "done" << dendl;
4133 m->put();
4134}
4135
4136int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4137{
31f18b77 4138 assert(initialized);
7c673cae
FG
4139
4140 unique_lock wl(rwlock);
4141
4142 map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
4143 if (it == pool_ops.end()) {
4144 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4145 return -ENOENT;
4146 }
4147
4148 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4149
4150 PoolOp *op = it->second;
4151 if (op->onfinish)
4152 op->onfinish->complete(r);
4153
4154 _finish_pool_op(op, r);
4155 return 0;
4156}
4157
4158void Objecter::_finish_pool_op(PoolOp *op, int r)
4159{
4160 // rwlock is locked unique
4161 pool_ops.erase(op->tid);
4162 logger->set(l_osdc_poolop_active, pool_ops.size());
4163
4164 if (op->ontimeout && r != -ETIMEDOUT) {
4165 timer.cancel_event(op->ontimeout);
4166 }
4167
4168 delete op;
4169}
4170
4171// pool stats
4172
4173void Objecter::get_pool_stats(list<string>& pools,
4174 map<string,pool_stat_t> *result,
4175 Context *onfinish)
4176{
4177 ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4178
4179 PoolStatOp *op = new PoolStatOp;
31f18b77 4180 op->tid = ++last_tid;
7c673cae
FG
4181 op->pools = pools;
4182 op->pool_stats = result;
4183 op->onfinish = onfinish;
4184 if (mon_timeout > timespan(0)) {
4185 op->ontimeout = timer.add_event(mon_timeout,
4186 [this, op]() {
4187 pool_stat_op_cancel(op->tid,
4188 -ETIMEDOUT); });
4189 } else {
4190 op->ontimeout = 0;
4191 }
4192
4193 unique_lock wl(rwlock);
4194
4195 poolstat_ops[op->tid] = op;
4196
4197 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4198
4199 _poolstat_submit(op);
4200}
4201
4202void Objecter::_poolstat_submit(PoolStatOp *op)
4203{
4204 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4205 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4206 op->pools,
4207 last_seen_pgmap_version));
4208 op->last_submit = ceph::mono_clock::now();
4209
4210 logger->inc(l_osdc_poolstat_send);
4211}
4212
4213void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4214{
4215 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4216 ceph_tid_t tid = m->get_tid();
4217
4218 unique_lock wl(rwlock);
31f18b77 4219 if (!initialized) {
7c673cae
FG
4220 m->put();
4221 return;
4222 }
4223
4224 map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
4225 if (iter != poolstat_ops.end()) {
4226 PoolStatOp *op = poolstat_ops[tid];
4227 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4228 *op->pool_stats = m->pool_stats;
4229 if (m->version > last_seen_pgmap_version) {
4230 last_seen_pgmap_version = m->version;
4231 }
4232 op->onfinish->complete(0);
4233 _finish_pool_stat_op(op, 0);
4234 } else {
4235 ldout(cct, 10) << "unknown request " << tid << dendl;
4236 }
4237 ldout(cct, 10) << "done" << dendl;
4238 m->put();
4239}
4240
4241int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4242{
31f18b77 4243 assert(initialized);
7c673cae
FG
4244
4245 unique_lock wl(rwlock);
4246
4247 map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
4248 if (it == poolstat_ops.end()) {
4249 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4250 return -ENOENT;
4251 }
4252
4253 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4254
4255 PoolStatOp *op = it->second;
4256 if (op->onfinish)
4257 op->onfinish->complete(r);
4258 _finish_pool_stat_op(op, r);
4259 return 0;
4260}
4261
4262void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4263{
4264 // rwlock is locked unique
4265
4266 poolstat_ops.erase(op->tid);
4267 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4268
4269 if (op->ontimeout && r != -ETIMEDOUT)
4270 timer.cancel_event(op->ontimeout);
4271
4272 delete op;
4273}
4274
d2e6a577
FG
4275void Objecter::get_fs_stats(ceph_statfs& result,
4276 boost::optional<int64_t> data_pool,
4277 Context *onfinish)
7c673cae
FG
4278{
4279 ldout(cct, 10) << "get_fs_stats" << dendl;
4280 unique_lock l(rwlock);
4281
4282 StatfsOp *op = new StatfsOp;
31f18b77 4283 op->tid = ++last_tid;
7c673cae 4284 op->stats = &result;
d2e6a577 4285 op->data_pool = data_pool;
7c673cae
FG
4286 op->onfinish = onfinish;
4287 if (mon_timeout > timespan(0)) {
4288 op->ontimeout = timer.add_event(mon_timeout,
4289 [this, op]() {
4290 statfs_op_cancel(op->tid,
4291 -ETIMEDOUT); });
4292 } else {
4293 op->ontimeout = 0;
4294 }
4295 statfs_ops[op->tid] = op;
4296
4297 logger->set(l_osdc_statfs_active, statfs_ops.size());
4298
4299 _fs_stats_submit(op);
4300}
4301
4302void Objecter::_fs_stats_submit(StatfsOp *op)
4303{
4304 // rwlock is locked unique
4305
4306 ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
4307 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
d2e6a577 4308 op->data_pool,
7c673cae
FG
4309 last_seen_pgmap_version));
4310 op->last_submit = ceph::mono_clock::now();
4311
4312 logger->inc(l_osdc_statfs_send);
4313}
4314
4315void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4316{
4317 unique_lock wl(rwlock);
31f18b77 4318 if (!initialized) {
7c673cae
FG
4319 m->put();
4320 return;
4321 }
4322
4323 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4324 ceph_tid_t tid = m->get_tid();
4325
4326 if (statfs_ops.count(tid)) {
4327 StatfsOp *op = statfs_ops[tid];
4328 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4329 *(op->stats) = m->h.st;
4330 if (m->h.version > last_seen_pgmap_version)
4331 last_seen_pgmap_version = m->h.version;
4332 op->onfinish->complete(0);
4333 _finish_statfs_op(op, 0);
4334 } else {
4335 ldout(cct, 10) << "unknown request " << tid << dendl;
4336 }
4337 m->put();
4338 ldout(cct, 10) << "done" << dendl;
4339}
4340
4341int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4342{
31f18b77 4343 assert(initialized);
7c673cae
FG
4344
4345 unique_lock wl(rwlock);
4346
4347 map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
4348 if (it == statfs_ops.end()) {
4349 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4350 return -ENOENT;
4351 }
4352
4353 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4354
4355 StatfsOp *op = it->second;
4356 if (op->onfinish)
4357 op->onfinish->complete(r);
4358 _finish_statfs_op(op, r);
4359 return 0;
4360}
4361
4362void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4363{
4364 // rwlock is locked unique
4365
4366 statfs_ops.erase(op->tid);
4367 logger->set(l_osdc_statfs_active, statfs_ops.size());
4368
4369 if (op->ontimeout && r != -ETIMEDOUT)
4370 timer.cancel_event(op->ontimeout);
4371
4372 delete op;
4373}
4374
4375// scatter/gather
4376
4377void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
4378 vector<bufferlist>& resultbl,
4379 bufferlist *bl, Context *onfinish)
4380{
4381 // all done
4382 ldout(cct, 15) << "_sg_read_finish" << dendl;
4383
4384 if (extents.size() > 1) {
4385 Striper::StripedReadResult r;
4386 vector<bufferlist>::iterator bit = resultbl.begin();
4387 for (vector<ObjectExtent>::iterator eit = extents.begin();
4388 eit != extents.end();
4389 ++eit, ++bit) {
4390 r.add_partial_result(cct, *bit, eit->buffer_extents);
4391 }
4392 bl->clear();
4393 r.assemble_result(cct, *bl, false);
4394 } else {
4395 ldout(cct, 15) << " only one frag" << dendl;
4396 bl->claim(resultbl[0]);
4397 }
4398
4399 // done
4400 uint64_t bytes_read = bl->length();
4401 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4402
4403 if (onfinish) {
4404 onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result());
4405 }
4406}
4407
4408
4409void Objecter::ms_handle_connect(Connection *con)
4410{
4411 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
31f18b77 4412 if (!initialized)
7c673cae
FG
4413 return;
4414
4415 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4416 resend_mon_ops();
4417}
4418
// Messenger callback: the transport to a peer was reset.
// For OSD peers we reopen the session and queue every affected request
// for resend; other peer types are ignored (return false).
bool Objecter::ms_handle_reset(Connection *con)
{
  if (!initialized)
    return false;
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    unique_lock wl(rwlock);

    // get_priv() returns a referenced session (released via put() below).
    OSDSession *session = static_cast<OSDSession*>(con->get_priv());
    if (session) {
      ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
		    << " osd." << session->osd << dendl;
      // Re-check under the write lock: we may have shut down while
      // waiting to acquire rwlock above.
      if (!initialized) {
	wl.unlock();
	return false;
      }
      map<uint64_t, LingerOp *> lresend;
      OSDSession::unique_lock sl(session->lock);
      _reopen_session(session);
      // Collect linger ops to resend; they are resent below after the
      // session lock is dropped (linger resend needs only rwlock).
      _kick_requests(session, lresend);
      sl.unlock();
      _linger_ops_resend(lresend, wl);
      wl.unlock();
      maybe_request_map();
      session->put();  // drop the ref taken by get_priv()
    }
    return true;
  }
  return false;
}
4448
4449void Objecter::ms_handle_remote_reset(Connection *con)
4450{
4451 /*
4452 * treat these the same.
4453 */
4454 ms_handle_reset(con);
4455}
4456
4457bool Objecter::ms_handle_refused(Connection *con)
4458{
4459 // just log for now
4460 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4461 int osd = osdmap->identify_osd(con->get_peer_addr());
4462 if (osd >= 0) {
4463 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4464 }
4465 }
4466 return false;
4467}
4468
4469bool Objecter::ms_get_authorizer(int dest_type,
4470 AuthAuthorizer **authorizer,
4471 bool force_new)
4472{
31f18b77 4473 if (!initialized)
7c673cae
FG
4474 return false;
4475 if (dest_type == CEPH_ENTITY_TYPE_MON)
4476 return true;
4477 *authorizer = monc->build_authorizer(dest_type);
4478 return *authorizer != NULL;
4479}
4480
4481void Objecter::op_target_t::dump(Formatter *f) const
4482{
4483 f->dump_stream("pg") << pgid;
4484 f->dump_int("osd", osd);
4485 f->dump_stream("object_id") << base_oid;
4486 f->dump_stream("object_locator") << base_oloc;
4487 f->dump_stream("target_object_id") << target_oid;
4488 f->dump_stream("target_object_locator") << target_oloc;
4489 f->dump_int("paused", (int)paused);
4490 f->dump_int("used_replica", (int)used_replica);
4491 f->dump_int("precalc_pgid", (int)precalc_pgid);
4492}
4493
4494void Objecter::_dump_active(OSDSession *s)
4495{
4496 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
4497 p != s->ops.end();
4498 ++p) {
4499 Op *op = p->second;
4500 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4501 << "\tosd." << (op->session ? op->session->osd : -1)
4502 << "\t" << op->target.base_oid
4503 << "\t" << op->ops << dendl;
4504 }
4505}
4506
4507void Objecter::_dump_active()
4508{
31f18b77 4509 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
7c673cae
FG
4510 << dendl;
4511 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
4512 siter != osd_sessions.end(); ++siter) {
4513 OSDSession *s = siter->second;
4514 OSDSession::shared_lock sl(s->lock);
4515 _dump_active(s);
4516 sl.unlock();
4517 }
4518 _dump_active(homeless_session);
4519}
4520
4521void Objecter::dump_active()
4522{
4523 shared_lock rl(rwlock);
4524 _dump_active();
4525 rl.unlock();
4526}
4527
4528void Objecter::dump_requests(Formatter *fmt)
4529{
4530 // Read-lock on Objecter held here
4531 fmt->open_object_section("requests");
4532 dump_ops(fmt);
4533 dump_linger_ops(fmt);
4534 dump_pool_ops(fmt);
4535 dump_pool_stat_ops(fmt);
4536 dump_statfs_ops(fmt);
4537 dump_command_ops(fmt);
4538 fmt->close_section(); // requests object
4539}
4540
4541void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4542{
4543 for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
4544 p != s->ops.end();
4545 ++p) {
4546 Op *op = p->second;
4547 fmt->open_object_section("op");
4548 fmt->dump_unsigned("tid", op->tid);
4549 op->target.dump(fmt);
4550 fmt->dump_stream("last_sent") << op->stamp;
4551 fmt->dump_int("attempts", op->attempts);
4552 fmt->dump_stream("snapid") << op->snapid;
4553 fmt->dump_stream("snap_context") << op->snapc;
4554 fmt->dump_stream("mtime") << op->mtime;
4555
4556 fmt->open_array_section("osd_ops");
4557 for (vector<OSDOp>::const_iterator it = op->ops.begin();
4558 it != op->ops.end();
4559 ++it) {
4560 fmt->dump_stream("osd_op") << *it;
4561 }
4562 fmt->close_section(); // osd_ops array
4563
4564 fmt->close_section(); // op object
4565 }
4566}
4567
4568void Objecter::dump_ops(Formatter *fmt)
4569{
4570 // Read-lock on Objecter held
4571 fmt->open_array_section("ops");
4572 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4573 siter != osd_sessions.end(); ++siter) {
4574 OSDSession *s = siter->second;
4575 OSDSession::shared_lock sl(s->lock);
4576 _dump_ops(s, fmt);
4577 sl.unlock();
4578 }
4579 _dump_ops(homeless_session, fmt);
4580 fmt->close_section(); // ops array
4581}
4582
4583void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4584{
4585 for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
4586 p != s->linger_ops.end();
4587 ++p) {
4588 LingerOp *op = p->second;
4589 fmt->open_object_section("linger_op");
4590 fmt->dump_unsigned("linger_id", op->linger_id);
4591 op->target.dump(fmt);
4592 fmt->dump_stream("snapid") << op->snap;
4593 fmt->dump_stream("registered") << op->registered;
4594 fmt->close_section(); // linger_op object
4595 }
4596}
4597
4598void Objecter::dump_linger_ops(Formatter *fmt)
4599{
4600 // We have a read-lock on the objecter
4601 fmt->open_array_section("linger_ops");
4602 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4603 siter != osd_sessions.end(); ++siter) {
4604 OSDSession *s = siter->second;
4605 OSDSession::shared_lock sl(s->lock);
4606 _dump_linger_ops(s, fmt);
4607 sl.unlock();
4608 }
4609 _dump_linger_ops(homeless_session, fmt);
4610 fmt->close_section(); // linger_ops array
4611}
4612
4613void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4614{
4615 for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
4616 p != s->command_ops.end();
4617 ++p) {
4618 CommandOp *op = p->second;
4619 fmt->open_object_section("command_op");
4620 fmt->dump_unsigned("command_id", op->tid);
4621 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4622 fmt->open_array_section("command");
4623 for (vector<string>::const_iterator q = op->cmd.begin();
4624 q != op->cmd.end(); ++q)
4625 fmt->dump_string("word", *q);
4626 fmt->close_section();
4627 if (op->target_osd >= 0)
4628 fmt->dump_int("target_osd", op->target_osd);
4629 else
4630 fmt->dump_stream("target_pg") << op->target_pg;
4631 fmt->close_section(); // command_op object
4632 }
4633}
4634
4635void Objecter::dump_command_ops(Formatter *fmt)
4636{
4637 // We have a read-lock on the Objecter here
4638 fmt->open_array_section("command_ops");
4639 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4640 siter != osd_sessions.end(); ++siter) {
4641 OSDSession *s = siter->second;
4642 OSDSession::shared_lock sl(s->lock);
4643 _dump_command_ops(s, fmt);
4644 sl.unlock();
4645 }
4646 _dump_command_ops(homeless_session, fmt);
4647 fmt->close_section(); // command_ops array
4648}
4649
4650void Objecter::dump_pool_ops(Formatter *fmt) const
4651{
4652 fmt->open_array_section("pool_ops");
4653 for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
4654 p != pool_ops.end();
4655 ++p) {
4656 PoolOp *op = p->second;
4657 fmt->open_object_section("pool_op");
4658 fmt->dump_unsigned("tid", op->tid);
4659 fmt->dump_int("pool", op->pool);
4660 fmt->dump_string("name", op->name);
4661 fmt->dump_int("operation_type", op->pool_op);
4662 fmt->dump_unsigned("auid", op->auid);
4663 fmt->dump_unsigned("crush_rule", op->crush_rule);
4664 fmt->dump_stream("snapid") << op->snapid;
4665 fmt->dump_stream("last_sent") << op->last_submit;
4666 fmt->close_section(); // pool_op object
4667 }
4668 fmt->close_section(); // pool_ops array
4669}
4670
4671void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4672{
4673 fmt->open_array_section("pool_stat_ops");
4674 for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
4675 p != poolstat_ops.end();
4676 ++p) {
4677 PoolStatOp *op = p->second;
4678 fmt->open_object_section("pool_stat_op");
4679 fmt->dump_unsigned("tid", op->tid);
4680 fmt->dump_stream("last_sent") << op->last_submit;
4681
4682 fmt->open_array_section("pools");
4683 for (list<string>::const_iterator it = op->pools.begin();
4684 it != op->pools.end();
4685 ++it) {
4686 fmt->dump_string("pool", *it);
4687 }
4688 fmt->close_section(); // pools array
4689
4690 fmt->close_section(); // pool_stat_op object
4691 }
4692 fmt->close_section(); // pool_stat_ops array
4693}
4694
4695void Objecter::dump_statfs_ops(Formatter *fmt) const
4696{
4697 fmt->open_array_section("statfs_ops");
4698 for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
4699 p != statfs_ops.end();
4700 ++p) {
4701 StatfsOp *op = p->second;
4702 fmt->open_object_section("statfs_op");
4703 fmt->dump_unsigned("tid", op->tid);
4704 fmt->dump_stream("last_sent") << op->last_submit;
4705 fmt->close_section(); // statfs_op object
4706 }
4707 fmt->close_section(); // statfs_ops array
4708}
4709
// Admin-socket hook that dumps the Objecter's outstanding requests;
// keeps a back-pointer to the owning Objecter.
Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
  m_objecter(objecter)
{
}
4714
4715bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap,
4716 std::string format, bufferlist& out)
4717{
4718 Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
4719 shared_lock rl(m_objecter->rwlock);
4720 m_objecter->dump_requests(f);
4721 f->flush(out);
4722 delete f;
4723 return true;
4724}
4725
4726void Objecter::blacklist_self(bool set)
4727{
4728 ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
4729
4730 vector<string> cmd;
4731 cmd.push_back("{\"prefix\":\"osd blacklist\", ");
4732 if (set)
4733 cmd.push_back("\"blacklistop\":\"add\",");
4734 else
4735 cmd.push_back("\"blacklistop\":\"rm\",");
4736 stringstream ss;
4737 ss << messenger->get_myaddr();
4738 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4739
4740 MMonCommand *m = new MMonCommand(monc->get_fsid());
4741 m->cmd = cmd;
4742
4743 monc->send_mon_message(m);
4744}
4745
4746// commands
4747
// Handle an MCommandReply from an OSD: match it to the pending
// CommandOp on the connection's session, stash the output payload, and
// finish the op.  Consumes the message reference on every path, and
// balances the session reference taken via get_priv().
void Objecter::handle_command_reply(MCommandReply *m)
{
  unique_lock wl(rwlock);
  if (!initialized) {
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  // get_priv() returns a referenced session; released via s->put() below.
  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
  if (!s || s->con != con) {
    // Stale or missing session (e.g. connection was reset); drop reply.
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    if (s)
      s->put();
    return;
  }

  OSDSession::shared_lock sl(s->lock);
  map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
  if (p == s->command_ops.end()) {
    // Already completed or cancelled; nothing to do.
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
		   << " not found" << dendl;
    m->put();
    sl.unlock();
    if (s)
      s->put();
    return;
  }

  CommandOp *c = p->second;
  if (!c->session ||
      m->get_connection() != c->session->con) {
    // Reply arrived on a connection other than the one the op was sent
    // on (op was re-targeted); ignore this stale reply.
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
		   << " got reply from wrong connection "
		   << m->get_connection() << " " << m->get_source_inst()
		   << dendl;
    m->put();
    sl.unlock();
    if (s)
      s->put();
    return;
  }
  if (c->poutbl) {
    c->poutbl->claim(m->get_data());
  }

  sl.unlock();

  // _finish_command requires the session lock held exclusively, so
  // re-take it as a unique lock after dropping the shared one.
  OSDSession::unique_lock sul(s->lock);
  _finish_command(c, m->r, m->rs);
  sul.unlock();

  m->put();
  if (s)
    s->put();
}
4805
// Submit an OSD command op: assign it a tid, park it on the homeless
// session, compute and attach its target session, arm an optional
// timeout, and either send it immediately or request a newer osdmap
// if no target is currently reachable.  *ptid receives the new tid.
void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
{
  shunique_lock sul(rwlock, ceph::acquire_unique);

  ceph_tid_t tid = ++last_tid;
  ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
  c->tid = tid;

  {
    // Every op starts on the homeless session until a target is chosen.
    OSDSession::unique_lock hs_wl(homeless_session->lock);
    _session_command_op_assign(homeless_session, c);
  }

  _calc_command_target(c, sul);
  _assign_command_session(c, sul);
  if (osd_timeout > timespan(0)) {
    c->ontimeout = timer.add_event(osd_timeout,
				   [this, c, tid]() {
				     command_op_cancel(c->session, tid,
						       -ETIMEDOUT); });
  }

  if (!c->session->is_homeless()) {
    _send_command(c);
  } else {
    // No reachable target yet; wait for an osdmap update to retarget.
    _maybe_request_map();
  }
  if (c->map_check_error)
    _send_command_map_check(c);
  *ptid = tid;

  logger->inc(l_osdc_command_active);
}
4839
4840int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
4841{
4842 assert(sul.owns_lock() && sul.mutex() == &rwlock);
4843
4844 c->map_check_error = 0;
4845
4846 // ignore overlays, just like we do with pg ops
4847 c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4848
4849 if (c->target_osd >= 0) {
4850 if (!osdmap->exists(c->target_osd)) {
4851 c->map_check_error = -ENOENT;
4852 c->map_check_error_str = "osd dne";
4853 c->target.osd = -1;
4854 return RECALC_OP_TARGET_OSD_DNE;
4855 }
4856 if (osdmap->is_down(c->target_osd)) {
4857 c->map_check_error = -ENXIO;
4858 c->map_check_error_str = "osd down";
4859 c->target.osd = -1;
4860 return RECALC_OP_TARGET_OSD_DOWN;
4861 }
4862 c->target.osd = c->target_osd;
4863 } else {
4864 int ret = _calc_target(&(c->target), nullptr, true);
4865 if (ret == RECALC_OP_TARGET_POOL_DNE) {
4866 c->map_check_error = -ENOENT;
4867 c->map_check_error_str = "pool dne";
4868 c->target.osd = -1;
4869 return ret;
4870 } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4871 c->map_check_error = -ENXIO;
4872 c->map_check_error_str = "osd down";
4873 c->target.osd = -1;
4874 return ret;
4875 }
4876 }
4877
4878 OSDSession *s;
4879 int r = _get_session(c->target.osd, &s, sul);
4880 assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4881
4882 if (c->session != s) {
4883 put_session(s);
4884 return RECALC_OP_TARGET_NEED_RESEND;
4885 }
4886
4887 put_session(s);
4888
4889 ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, "
4890 << c->session << dendl;
4891
4892 return RECALC_OP_TARGET_NO_ACTION;
4893}
4894
4895void Objecter::_assign_command_session(CommandOp *c,
4896 shunique_lock& sul)
4897{
4898 assert(sul.owns_lock() && sul.mutex() == &rwlock);
4899
4900 OSDSession *s;
4901 int r = _get_session(c->target.osd, &s, sul);
4902 assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4903
4904 if (c->session != s) {
4905 if (c->session) {
4906 OSDSession *cs = c->session;
4907 OSDSession::unique_lock csl(cs->lock);
4908 _session_command_op_remove(c->session, c);
4909 csl.unlock();
4910 }
4911 OSDSession::unique_lock sl(s->lock);
4912 _session_command_op_assign(s, c);
4913 }
4914
4915 put_session(s);
4916}
4917
4918void Objecter::_send_command(CommandOp *c)
4919{
4920 ldout(cct, 10) << "_send_command " << c->tid << dendl;
4921 assert(c->session);
4922 assert(c->session->con);
4923 MCommand *m = new MCommand(monc->monmap.fsid);
4924 m->cmd = c->cmd;
4925 m->set_data(c->inbl);
4926 m->set_tid(c->tid);
4927 c->session->con->send_message(m);
4928 logger->inc(l_osdc_command_send);
4929}
4930
// Cancel a pending command op (by tid) on the given session, completing
// it with r.  Returns -ENOENT when the tid is not pending on s.
int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
  assert(initialized);

  unique_lock wl(rwlock);

  map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
  if (it == s->command_ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
    return -ENOENT;
  }

  ldout(cct, 10) << __func__ << " tid " << tid << dendl;

  CommandOp *op = it->second;
  // Drop any pending map-check registration before finishing the op.
  _command_cancel_map_check(op);
  // _finish_command requires the op's session lock held exclusively.
  OSDSession::unique_lock sl(op->session->lock);
  _finish_command(op, r, "");
  sl.unlock();
  return 0;
}
4952
4953void Objecter::_finish_command(CommandOp *c, int r, string rs)
4954{
4955 // rwlock is locked unique
28e407b8 4956 // session lock is locked
7c673cae
FG
4957
4958 ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
4959 << rs << dendl;
4960 if (c->prs)
4961 *c->prs = rs;
4962 if (c->onfinish)
4963 c->onfinish->complete(r);
4964
4965 if (c->ontimeout && r != -ETIMEDOUT)
4966 timer.cancel_event(c->ontimeout);
4967
4968 OSDSession *s = c->session;
7c673cae 4969 _session_command_op_remove(c->session, c);
7c673cae
FG
4970
4971 c->put();
4972
4973 logger->dec(l_osdc_command_active);
4974}
4975
Objecter::OSDSession::~OSDSession()
{
  // Caller is responsible for re-assigning or
  // destroying any ops that were assigned to us
  assert(ops.empty());
  assert(linger_ops.empty());
  assert(command_ops.empty());
}
4984
// Destructor: by this point shutdown() must have drained every pending
// op and unregistered the admin-socket hook; the asserts document (and
// enforce in debug builds) that invariant.
Objecter::~Objecter()
{
  delete osdmap;

  // Only our own reference to the homeless session may remain.
  assert(homeless_session->get_nref() == 1);
  assert(num_homeless_ops == 0);
  homeless_session->put();

  assert(osd_sessions.empty());
  assert(poolstat_ops.empty());
  assert(statfs_ops.empty());
  assert(pool_ops.empty());
  assert(waiting_for_map.empty());
  assert(linger_ops.empty());
  assert(check_latest_map_lingers.empty());
  assert(check_latest_map_ops.empty());
  assert(check_latest_map_commands.empty());

  assert(!m_request_state_hook);
  assert(!logger);
}
5006
5007/**
5008 * Wait until this OSD map epoch is received before
5009 * sending any more operations to OSDs. Use this
5010 * when it is known that the client can't trust
5011 * anything from before this epoch (e.g. due to
5012 * client blacklist at this epoch).
5013 */
5014void Objecter::set_epoch_barrier(epoch_t epoch)
5015{
5016 unique_lock wl(rwlock);
5017
5018 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
5019 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
5020 << dendl;
5021 if (epoch > epoch_barrier) {
5022 epoch_barrier = epoch;
5023 _maybe_request_map();
5024 }
5025}
5026
5027
5028
// Starting cursor for object enumeration: the minimum hobject_t.
hobject_t Objecter::enumerate_objects_begin()
{
  return hobject_t();
}
5033
// End-of-enumeration sentinel: the maximum hobject_t.
hobject_t Objecter::enumerate_objects_end()
{
  return hobject_t::get_max();
}
5038
// Completion context for one enumerate_objects() round trip.  Captures
// the caller's output pointers plus the pg_nls reply buffer, and hands
// everything to Objecter::_enumerate_reply when the read completes.
struct C_EnumerateReply : public Context {
  bufferlist bl;        // raw pg_nls reply payload, filled by pg_read

  Objecter *objecter;
  hobject_t *next;      // out: cursor for the next listing call
  std::list<librados::ListObjectImpl> *result;  // out: listed objects
  const hobject_t end;  // caller-requested listing upper bound
  const int64_t pool_id;
  Context *on_finish;   // caller's completion, fired by _enumerate_reply

  epoch_t epoch;        // reply epoch, filled in by pg_read
  int budget;           // throttle budget to return, filled in by pg_read

  C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
      std::list<librados::ListObjectImpl> *result_,
      const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
    objecter(objecter_), next(next_), result(result_),
    end(end_), pool_id(pool_id_), on_finish(on_finish_),
    epoch(0), budget(0)
  {}

  void finish(int r) override {
    objecter->_enumerate_reply(
      bl, r, end, pool_id, budget, epoch, result, next, on_finish);
  }
};
5065
// List up to 'max' objects in [start, end) of the given pool/namespace,
// optionally filtered by 'filter_bl'.  Results land in *result and the
// continuation cursor in *next when on_finish completes.  Fails fast
// with -EINVAL on bad ranges, -EOPNOTSUPP without SORTBITWISE, and
// -ENOENT when the pool does not exist in the current osdmap.
void Objecter::enumerate_objects(
  int64_t pool_id,
  const std::string &ns,
  const hobject_t &start,
  const hobject_t &end,
  const uint32_t max,
  const bufferlist &filter_bl,
  std::list<librados::ListObjectImpl> *result,
  hobject_t *next,
  Context *on_finish)
{
  assert(result);

  if (!end.is_max() && start > end) {
    lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  if (max < 1) {
    lderr(cct) << __func__ << ": result size may not be zero" << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  // A max-valued start cursor means enumeration already finished.
  if (start.is_max()) {
    on_finish->complete(0);
    return;
  }

  shared_lock rl(rwlock);
  assert(osdmap->get_epoch());
  // Object listing relies on bitwise hobject sort order cluster-wide.
  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
    rl.unlock();
    lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
    on_finish->complete(-EOPNOTSUPP);
    return;
  }
  const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
  if (!p) {
    lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
	       << osdmap->get_epoch() << dendl;
    rl.unlock();
    on_finish->complete(-ENOENT);
    return;
  } else {
    rl.unlock();
  }

  ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;

  // Stash completion state; C_EnumerateReply calls _enumerate_reply.
  C_EnumerateReply *on_ack = new C_EnumerateReply(
    this, next, result, end, pool_id, on_finish);

  ObjectOperation op;
  op.pg_nls(max, filter_bl, start, 0);

  // Issue. See you later in _enumerate_reply
  object_locator_t oloc(pool_id, ns);
  pg_read(start.get_hash(), oloc, op,
	  &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
}
5129
// Decode a pg_nls reply for enumerate_objects(): return the throttle
// budget, advance the caller's cursor (clamped to 'end'), drop any
// entries at or past 'end', merge the rest into *result, and complete
// on_finish.
void Objecter::_enumerate_reply(
  bufferlist &bl,
  int r,
  const hobject_t &end,
  const int64_t pool_id,
  int budget,
  epoch_t reply_epoch,
  std::list<librados::ListObjectImpl> *result,
  hobject_t *next,
  Context *on_finish)
{
  // Return the op budget regardless of outcome.
  if (budget > 0) {
    put_op_budget_bytes(budget);
  }

  if (r < 0) {
    ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
    on_finish->complete(r);
    return;
  }

  assert(next != NULL);

  // Decode the results
  bufferlist::iterator iter = bl.begin();
  pg_nls_response_t response;

  // XXX extra_info doesn't seem used anywhere?
  bufferlist extra_info;
  ::decode(response, iter);
  if (!iter.end()) {
    ::decode(extra_info, iter);
  }

  ldout(cct, 10) << __func__ << ": got " << response.entries.size()
		 << " handle " << response.handle
		 << " reply_epoch " << reply_epoch << dendl;
  ldout(cct, 20) << __func__ << ": response.entries.size "
		 << response.entries.size() << ", response.entries "
		 << response.entries << dendl;
  if (response.handle <= end) {
    *next = response.handle;
  } else {
    // The OSD listed past the caller's requested end; clamp the cursor
    // and discard trailing entries that fall at or beyond 'end'.
    ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
		   << dendl;
    *next = end;

    // drop anything after 'end'
    shared_lock rl(rwlock);
    const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
    if (!pool) {
      // pool is gone, drop any results which are now meaningless.
      rl.unlock();
      on_finish->complete(-ENOENT);
      return;
    }
    while (!response.entries.empty()) {
      // Recompute each entry's hash the same way the OSD placed it:
      // by locator when present, otherwise by oid.
      uint32_t hash = response.entries.back().locator.empty() ?
	pool->hash_key(response.entries.back().oid,
		       response.entries.back().nspace) :
	pool->hash_key(response.entries.back().locator,
		       response.entries.back().nspace);
      hobject_t last(response.entries.back().oid,
		     response.entries.back().locator,
		     CEPH_NOSNAP,
		     hash,
		     pool_id,
		     response.entries.back().nspace);
      if (last < end)
	break;
      ldout(cct, 20) << __func__ << " dropping item " << last
		     << " >= end " << end << dendl;
      response.entries.pop_back();
    }
    rl.unlock();
  }
  if (!response.entries.empty()) {
    result->merge(response.entries);
  }

  // release the listing context's budget once all
  // OPs (in the session) are finished
#if 0
  put_nlist_context_budget(list_context);
#endif
  on_finish->complete(r);
  return;
}
5218
namespace {
  using namespace librados;

  // Decode one T from each bufferlist in 'bls' and append it to 'items'.
  template <typename T>
  void do_decode(std::vector<T>& items, std::vector<bufferlist>& bls)
  {
    // NOTE(review): iterates by value, copying each bufferlist —
    // presumably because bufferlist::begin() is non-const here; confirm
    // before changing to a reference.
    for (auto bl : bls) {
      auto p = bl.begin();
      T t;
      decode(t, p);
      items.push_back(t);
    }
  }

  // Out-handler for a CEPH_OSD_OP_SCRUBLS op: decodes the scrub listing
  // reply into either an inconsistent-object or inconsistent-snapset
  // vector (exactly one of the two pointers is set per instance).
  struct C_ObjectOperation_scrub_ls : public Context {
    bufferlist bl;        // raw reply payload, wired up by do_scrub_ls
    uint32_t *interval;   // out: scrub interval from the reply
    std::vector<inconsistent_obj_t> *objects = nullptr;
    std::vector<inconsistent_snapset_t> *snapsets = nullptr;
    int *rval;            // out: final decode status for the caller

    C_ObjectOperation_scrub_ls(uint32_t *interval,
			       std::vector<inconsistent_obj_t> *objects,
			       int *rval)
      : interval(interval), objects(objects), rval(rval) {}
    C_ObjectOperation_scrub_ls(uint32_t *interval,
			       std::vector<inconsistent_snapset_t> *snapsets,
			       int *rval)
      : interval(interval), snapsets(snapsets), rval(rval) {}
    void finish(int r) override {
      // -EAGAIN is treated as retriable by the caller, not a failure.
      if (r < 0 && r != -EAGAIN) {
	if (rval)
	  *rval = r;
	return;
      }

      if (rval)
	*rval = 0;

      try {
	decode();
      } catch (buffer::error&) {
	if (rval)
	  *rval = -EIO;
      }
    }
  private:
    // Decode the scrub_ls_result_t payload into whichever output vector
    // this instance was constructed with.
    void decode() {
      scrub_ls_result_t result;
      auto p = bl.begin();
      result.decode(p);
      *interval = result.interval;
      if (objects) {
	do_decode(*objects, result.vals);
      } else {
	do_decode(*snapsets, result.vals);
      }
    }
  };

  // Append a SCRUBLS pg op to 'op' and register the decoding handler
  // above for its output slot.  'items' selects object vs snapset mode.
  template <typename T>
  void do_scrub_ls(::ObjectOperation *op,
		   const scrub_ls_arg_t& arg,
		   std::vector<T> *items,
		   uint32_t *interval,
		   int *rval)
  {
    OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
    op->flags |= CEPH_OSD_FLAG_PGOP;
    assert(interval);
    arg.encode(osd_op.indata);
    unsigned p = op->ops.size() - 1;
    // Handler ownership passes to the op's out_handler slot; its 'bl'
    // member doubles as the out_bl destination.
    auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
    op->out_handler[p] = h;
    op->out_bl[p] = &h->bl;
    op->out_rval[p] = rval;
  }
}
5297
// List inconsistent objects found by scrub, starting after
// 'start_after', up to 'max_to_get' entries.  *interval carries the
// scrub interval in (as a consistency token) and out.
void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
				 uint64_t max_to_get,
				 std::vector<librados::inconsistent_obj_t> *objects,
				 uint32_t *interval,
				 int *rval)
{
  // arg type 0 selects object-level inconsistencies.
  scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
  do_scrub_ls(this, arg, objects, interval, rval);
}
5307
// List inconsistent snapsets found by scrub, starting after
// 'start_after', up to 'max_to_get' entries.  *interval carries the
// scrub interval in (as a consistency token) and out.
void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
				 uint64_t max_to_get,
				 std::vector<librados::inconsistent_snapset_t> *snapsets,
				 uint32_t *interval,
				 int *rval)
{
  // arg type 1 selects snapset-level inconsistencies.
  scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
  do_scrub_ls(this, arg, snapsets, interval, rval);
}