1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "Objecter.h"
16#include "osd/OSDMap.h"
17#include "Filer.h"
18
19#include "mon/MonClient.h"
20
21#include "msg/Messenger.h"
22#include "msg/Message.h"
23
24#include "messages/MPing.h"
25#include "messages/MOSDOp.h"
26#include "messages/MOSDOpReply.h"
27#include "messages/MOSDBackoff.h"
28#include "messages/MOSDMap.h"
29
30#include "messages/MPoolOp.h"
31#include "messages/MPoolOpReply.h"
32
33#include "messages/MGetPoolStats.h"
34#include "messages/MGetPoolStatsReply.h"
35#include "messages/MStatfs.h"
36#include "messages/MStatfsReply.h"
37
38#include "messages/MMonCommand.h"
39
40#include "messages/MCommand.h"
41#include "messages/MCommandReply.h"
42
43#include "messages/MWatchNotify.h"
44
45#include <errno.h>
46
47#include "common/config.h"
48#include "common/perf_counters.h"
49#include "common/scrub_types.h"
50#include "include/str_list.h"
51#include "common/errno.h"
52#include "common/EventTrace.h"
53
54using ceph::real_time;
55using ceph::real_clock;
56
57using ceph::mono_clock;
58using ceph::mono_time;
59
60using ceph::timespan;
61
62
63#define dout_subsys ceph_subsys_objecter
64#undef dout_prefix
65#define dout_prefix *_dout << messenger->get_myname() << ".objecter "
66
67
68enum {
69 l_osdc_first = 123200,
70 l_osdc_op_active,
71 l_osdc_op_laggy,
72 l_osdc_op_send,
73 l_osdc_op_send_bytes,
74 l_osdc_op_resend,
75 l_osdc_op_reply,
76
77 l_osdc_op,
78 l_osdc_op_r,
79 l_osdc_op_w,
80 l_osdc_op_rmw,
81 l_osdc_op_pg,
82
83 l_osdc_osdop_stat,
84 l_osdc_osdop_create,
85 l_osdc_osdop_read,
86 l_osdc_osdop_write,
87 l_osdc_osdop_writefull,
88 l_osdc_osdop_writesame,
89 l_osdc_osdop_append,
90 l_osdc_osdop_zero,
91 l_osdc_osdop_truncate,
92 l_osdc_osdop_delete,
93 l_osdc_osdop_mapext,
94 l_osdc_osdop_sparse_read,
95 l_osdc_osdop_clonerange,
96 l_osdc_osdop_getxattr,
97 l_osdc_osdop_setxattr,
98 l_osdc_osdop_cmpxattr,
99 l_osdc_osdop_rmxattr,
100 l_osdc_osdop_resetxattrs,
101 l_osdc_osdop_tmap_up,
102 l_osdc_osdop_tmap_put,
103 l_osdc_osdop_tmap_get,
104 l_osdc_osdop_call,
105 l_osdc_osdop_watch,
106 l_osdc_osdop_notify,
107 l_osdc_osdop_src_cmpxattr,
108 l_osdc_osdop_pgls,
109 l_osdc_osdop_pgls_filter,
110 l_osdc_osdop_other,
111
112 l_osdc_linger_active,
113 l_osdc_linger_send,
114 l_osdc_linger_resend,
115 l_osdc_linger_ping,
116
117 l_osdc_poolop_active,
118 l_osdc_poolop_send,
119 l_osdc_poolop_resend,
120
121 l_osdc_poolstat_active,
122 l_osdc_poolstat_send,
123 l_osdc_poolstat_resend,
124
125 l_osdc_statfs_active,
126 l_osdc_statfs_send,
127 l_osdc_statfs_resend,
128
129 l_osdc_command_active,
130 l_osdc_command_send,
131 l_osdc_command_resend,
132
133 l_osdc_map_epoch,
134 l_osdc_map_full,
135 l_osdc_map_inc,
136
137 l_osdc_osd_sessions,
138 l_osdc_osd_session_open,
139 l_osdc_osd_session_close,
140 l_osdc_osd_laggy,
141
142 l_osdc_osdop_omap_wr,
143 l_osdc_osdop_omap_rd,
144 l_osdc_osdop_omap_del,
145
146 l_osdc_last,
147};
148
149
150// config obs ----------------------------
151
152static const char *config_keys[] = {
153 "crush_location",
154 NULL
155};
156
157class Objecter::RequestStateHook : public AdminSocketHook {
158 Objecter *m_objecter;
159public:
160 explicit RequestStateHook(Objecter *objecter);
161 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
162 bufferlist& out) override;
163};
164
165/**
166 * This is a more limited form of C_Contexts, but that requires
167 * a ceph_context which we don't have here.
168 */
169class ObjectOperation::C_TwoContexts : public Context {
170 Context *first;
171 Context *second;
172public:
173 C_TwoContexts(Context *first, Context *second) :
174 first(first), second(second) {}
175 void finish(int r) override {
176 first->complete(r);
177 second->complete(r);
178 first = NULL;
179 second = NULL;
180 }
181
182 ~C_TwoContexts() override {
183 delete first;
184 delete second;
185 }
186};
187
188void ObjectOperation::add_handler(Context *extra) {
189 size_t last = out_handler.size() - 1;
190 Context *orig = out_handler[last];
191 if (orig) {
192 Context *wrapper = new C_TwoContexts(orig, extra);
193 out_handler[last] = wrapper;
194 } else {
195 out_handler[last] = extra;
196 }
197}
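// Illustrative usage sketch (added, not part of the original file): add_handler()
// chains an extra Context onto the out_handler slot of the most recently
// appended op, so both callbacks fire when the reply for that op is decoded.
// The helper names below (stat(), my_extra_ctx) are assumptions for the example:
//
//   ObjectOperation op;
//   op.stat(&psize, &pmtime, &prval);   // installs the stat out_handler
//   op.add_handler(my_extra_ctx);       // my_extra_ctx also runs on reply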
198
199Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
200 object_t& oid)
201{
202 if (oid.name.empty())
203 return unique_completion_lock();
204
205 static constexpr uint32_t HASH_PRIME = 1021;
206 uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
207 % HASH_PRIME;
208
209 return unique_completion_lock(completion_locks[h % num_locks],
210 std::defer_lock);
211}
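// Note (added for clarity): completion callbacks for different objects are
// spread across num_locks mutexes by hashing the object name, so for example
//
//   get_lock(oid_a)   // might map to completion_locks[3]
//   get_lock(oid_b)   // might map to completion_locks[7]
//
// can complete concurrently, while an empty oid deliberately gets a
// default-constructed (no-op) unique_completion_lock.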
212
213const char** Objecter::get_tracked_conf_keys() const
214{
215 return config_keys;
216}
217
218
219void Objecter::handle_conf_change(const struct md_config_t *conf,
220 const std::set <std::string> &changed)
221{
222 if (changed.count("crush_location")) {
223 update_crush_location();
224 }
225}
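// Illustrative sketch (assumed call sequence, not from this file): the observer
// is driven by the config machinery, so a runtime change such as
//
//   cct->_conf->set_val("crush_location", "rack=r1 host=h1");  // example values
//   cct->_conf->apply_changes(nullptr);
//
// ends up in handle_conf_change(), which re-reads crush_location under the
// Objecter write lock.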
226
227void Objecter::update_crush_location()
228{
229 unique_lock wl(rwlock);
230 crush_location = cct->crush_location.get_location();
231}
232
233// messages ------------------------------
234
235/*
236 * initialize only internal data structures, don't initiate cluster interaction
237 */
238void Objecter::init()
239{
 240 assert(!initialized);
241
242 if (!logger) {
243 PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
244
245 pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
246 PerfCountersBuilder::PRIO_CRITICAL);
247 pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
248 pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
249 pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data");
250 pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
251 pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
252
253 pcb.add_u64_counter(l_osdc_op, "op", "Operations");
254 pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
255 PerfCountersBuilder::PRIO_CRITICAL);
256 pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
257 PerfCountersBuilder::PRIO_CRITICAL);
258 pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
259 "rdwr", PerfCountersBuilder::PRIO_INTERESTING);
260 pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");
261
262 pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
263 pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
264 "Create object operations");
265 pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
266 pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
267 pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
268 "Write full object operations");
269 pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
270 "Write same operations");
271 pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
272 "Append operation");
273 pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
274 "Set object to zero operations");
275 pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
276 "Truncate object operations");
277 pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
278 "Delete object operations");
279 pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
280 "Map extent operations");
281 pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
282 "Sparse read operations");
283 pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
284 "Clone range operations");
285 pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
286 "Get xattr operations");
287 pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
288 "Set xattr operations");
289 pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
290 "Xattr comparison operations");
291 pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
292 "Remove xattr operations");
293 pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
294 "Reset xattr operations");
295 pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up",
296 "TMAP update operations");
297 pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put",
298 "TMAP put operations");
299 pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get",
300 "TMAP get operations");
301 pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
302 "Call (execute) operations");
303 pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
304 "Watch by object operations");
305 pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
306 "Notify about object operations");
307 pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
308 "Extended attribute comparison in multi operations");
309 pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
310 pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
311 pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");
312
313 pcb.add_u64(l_osdc_linger_active, "linger_active",
314 "Active lingering operations");
315 pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
316 "Sent lingering operations");
317 pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
318 "Resent lingering operations");
319 pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
320 "Sent pings to lingering operations");
321
322 pcb.add_u64(l_osdc_poolop_active, "poolop_active",
323 "Active pool operations");
324 pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
325 "Sent pool operations");
326 pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
327 "Resent pool operations");
328
329 pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
330 "Active get pool stat operations");
331 pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
332 "Pool stat operations sent");
333 pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
334 "Resent pool stats");
335
336 pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
337 pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
338 pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
339 "Resent FS stats");
340
341 pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
342 pcb.add_u64_counter(l_osdc_command_send, "command_send",
343 "Sent commands");
344 pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
345 "Resent commands");
346
347 pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
348 pcb.add_u64_counter(l_osdc_map_full, "map_full",
349 "Full OSD maps received");
350 pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
351 "Incremental OSD maps received");
352
353 pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
354 "Open sessions"); // open sessions
355 pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
356 "Sessions opened");
357 pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
358 "Sessions closed");
359 pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");
360
361 pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
362 "OSD OMAP write operations");
363 pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
364 "OSD OMAP read operations");
365 pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
366 "OSD OMAP delete operations");
367
368 logger = pcb.create_perf_counters();
369 cct->get_perfcounters_collection()->add(logger);
370 }
371
372 m_request_state_hook = new RequestStateHook(this);
373 AdminSocket* admin_socket = cct->get_admin_socket();
374 int ret = admin_socket->register_command("objecter_requests",
375 "objecter_requests",
376 m_request_state_hook,
377 "show in-progress osd requests");
378
379 /* Don't warn on EEXIST, happens if multiple ceph clients
380 * are instantiated from one process */
381 if (ret < 0 && ret != -EEXIST) {
382 lderr(cct) << "error registering admin socket command: "
383 << cpp_strerror(ret) << dendl;
384 }
385
386 update_crush_location();
387
388 cct->_conf->add_observer(this);
389
 390 initialized = true;
391}
392
393/*
394 * ok, cluster interaction can happen
395 */
396void Objecter::start(const OSDMap* o)
397{
398 shared_lock rl(rwlock);
399
400 start_tick();
401 if (o) {
402 osdmap->deepish_copy_from(*o);
403 } else if (osdmap->get_epoch() == 0) {
404 _maybe_request_map();
405 }
406}
407
408void Objecter::shutdown()
409{
 410 assert(initialized);
411
412 unique_lock wl(rwlock);
413
 414 initialized = false;
415
416 cct->_conf->remove_observer(this);
417
418 map<int,OSDSession*>::iterator p;
419 while (!osd_sessions.empty()) {
420 p = osd_sessions.begin();
421 close_session(p->second);
422 }
423
424 while(!check_latest_map_lingers.empty()) {
425 map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
426 i->second->put();
427 check_latest_map_lingers.erase(i->first);
428 }
429
430 while(!check_latest_map_ops.empty()) {
431 map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
432 i->second->put();
433 check_latest_map_ops.erase(i->first);
434 }
435
436 while(!check_latest_map_commands.empty()) {
437 map<ceph_tid_t, CommandOp*>::iterator i
438 = check_latest_map_commands.begin();
439 i->second->put();
440 check_latest_map_commands.erase(i->first);
441 }
442
443 while(!poolstat_ops.empty()) {
444 map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
445 delete i->second;
446 poolstat_ops.erase(i->first);
447 }
448
449 while(!statfs_ops.empty()) {
450 map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
451 delete i->second;
452 statfs_ops.erase(i->first);
453 }
454
455 while(!pool_ops.empty()) {
456 map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
457 delete i->second;
458 pool_ops.erase(i->first);
459 }
460
461 ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
462 while(!homeless_session->linger_ops.empty()) {
463 std::map<uint64_t, LingerOp*>::iterator i
464 = homeless_session->linger_ops.begin();
465 ldout(cct, 10) << " linger_op " << i->first << dendl;
466 LingerOp *lop = i->second;
467 {
468 OSDSession::unique_lock swl(homeless_session->lock);
469 _session_linger_op_remove(homeless_session, lop);
470 }
471 linger_ops.erase(lop->linger_id);
472 linger_ops_set.erase(lop);
473 lop->put();
474 }
475
476 while(!homeless_session->ops.empty()) {
477 std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
478 ldout(cct, 10) << " op " << i->first << dendl;
479 Op *op = i->second;
480 {
481 OSDSession::unique_lock swl(homeless_session->lock);
482 _session_op_remove(homeless_session, op);
483 }
484 op->put();
485 }
486
487 while(!homeless_session->command_ops.empty()) {
488 std::map<ceph_tid_t, CommandOp*>::iterator i
489 = homeless_session->command_ops.begin();
490 ldout(cct, 10) << " command_op " << i->first << dendl;
491 CommandOp *cop = i->second;
492 {
493 OSDSession::unique_lock swl(homeless_session->lock);
494 _session_command_op_remove(homeless_session, cop);
495 }
496 cop->put();
497 }
498
499 if (tick_event) {
500 if (timer.cancel_event(tick_event)) {
501 ldout(cct, 10) << " successfully canceled tick" << dendl;
502 }
503 tick_event = 0;
504 }
505
506 if (logger) {
507 cct->get_perfcounters_collection()->remove(logger);
508 delete logger;
509 logger = NULL;
510 }
511
512 // Let go of Objecter write lock so timer thread can shutdown
513 wl.unlock();
514
515 // Outside of lock to avoid cycle WRT calls to RequestStateHook
516 // This is safe because we guarantee no concurrent calls to
517 // shutdown() with the ::initialized check at start.
518 if (m_request_state_hook) {
519 AdminSocket* admin_socket = cct->get_admin_socket();
520 admin_socket->unregister_command("objecter_requests");
521 delete m_request_state_hook;
522 m_request_state_hook = NULL;
523 }
524}
525
526void Objecter::_send_linger(LingerOp *info,
527 shunique_lock& sul)
528{
529 assert(sul.owns_lock() && sul.mutex() == &rwlock);
530
531 vector<OSDOp> opv;
532 Context *oncommit = NULL;
533 LingerOp::shared_lock watchl(info->watch_lock);
534 bufferlist *poutbl = NULL;
535 if (info->registered && info->is_watch) {
536 ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
537 << dendl;
538 opv.push_back(OSDOp());
539 opv.back().op.op = CEPH_OSD_OP_WATCH;
540 opv.back().op.watch.cookie = info->get_cookie();
541 opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
542 opv.back().op.watch.gen = ++info->register_gen;
543 oncommit = new C_Linger_Reconnect(this, info);
544 } else {
545 ldout(cct, 15) << "send_linger " << info->linger_id << " register"
546 << dendl;
547 opv = info->ops;
548 C_Linger_Commit *c = new C_Linger_Commit(this, info);
549 if (!info->is_watch) {
550 info->notify_id = 0;
551 poutbl = &c->outbl;
552 }
553 oncommit = c;
554 }
555 watchl.unlock();
556 Op *o = new Op(info->target.base_oid, info->target.base_oloc,
557 opv, info->target.flags | CEPH_OSD_FLAG_READ,
558 oncommit, info->pobjver);
559 o->outbl = poutbl;
560 o->snapid = info->snap;
561 o->snapc = info->snapc;
562 o->mtime = info->mtime;
563
564 o->target = info->target;
 565 o->tid = ++last_tid;
566
567 // do not resend this; we will send a new op to reregister
568 o->should_resend = false;
569
570 if (info->register_tid) {
 571 // repeat send. cancel old registration op, if any.
572 OSDSession::unique_lock sl(info->session->lock);
573 if (info->session->ops.count(info->register_tid)) {
574 Op *o = info->session->ops[info->register_tid];
575 _op_cancel_map_check(o);
576 _cancel_linger_op(o);
577 }
578 sl.unlock();
579
580 _op_submit(o, sul, &info->register_tid);
581 } else {
582 // first send
583 _op_submit_with_budget(o, sul, &info->register_tid);
584 }
585
586 logger->inc(l_osdc_linger_send);
587}
588
589void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
590{
591 LingerOp::unique_lock wl(info->watch_lock);
592 ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
593 if (info->on_reg_commit) {
594 info->on_reg_commit->complete(r);
595 info->on_reg_commit = NULL;
596 }
597 if (r < 0 && info->on_notify_finish) {
598 info->on_notify_finish->complete(r);
599 info->on_notify_finish = nullptr;
600 }
601
602 // only tell the user the first time we do this
603 info->registered = true;
604 info->pobjver = NULL;
605
606 if (!info->is_watch) {
607 // make note of the notify_id
608 bufferlist::iterator p = outbl.begin();
609 try {
610 ::decode(info->notify_id, p);
611 ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
612 << dendl;
613 }
614 catch (buffer::error& e) {
615 }
616 }
617}
618
619struct C_DoWatchError : public Context {
620 Objecter *objecter;
621 Objecter::LingerOp *info;
622 int err;
623 C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
624 : objecter(o), info(i), err(r) {
625 info->get();
626 info->_queued_async();
627 }
628 void finish(int r) override {
629 Objecter::unique_lock wl(objecter->rwlock);
630 bool canceled = info->canceled;
631 wl.unlock();
632
633 if (!canceled) {
634 info->watch_context->handle_error(info->get_cookie(), err);
635 }
636
637 info->finished_async();
638 info->put();
639 }
640};
641
642int Objecter::_normalize_watch_error(int r)
643{
644 // translate ENOENT -> ENOTCONN so that a delete->disconnection
 645 // notification and a failure to reconnect because we raced with
646 // the delete appear the same to the user.
647 if (r == -ENOENT)
648 r = -ENOTCONN;
649 return r;
650}
651
652void Objecter::_linger_reconnect(LingerOp *info, int r)
653{
654 ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
655 << " (last_error " << info->last_error << ")" << dendl;
656 if (r < 0) {
657 LingerOp::unique_lock wl(info->watch_lock);
658 if (!info->last_error) {
659 r = _normalize_watch_error(r);
660 info->last_error = r;
661 if (info->watch_context) {
662 finisher->queue(new C_DoWatchError(this, info, r));
663 }
664 }
665 wl.unlock();
666 }
667}
668
669void Objecter::_send_linger_ping(LingerOp *info)
670{
671 // rwlock is locked unique
672 // info->session->lock is locked
673
674 if (cct->_conf->objecter_inject_no_watch_ping) {
675 ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
676 << dendl;
677 return;
678 }
679 if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
680 ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
681 return;
682 }
683
684 ceph::mono_time now = ceph::mono_clock::now();
685 ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
686 << dendl;
687
688 vector<OSDOp> opv(1);
689 opv[0].op.op = CEPH_OSD_OP_WATCH;
690 opv[0].op.watch.cookie = info->get_cookie();
691 opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
692 opv[0].op.watch.gen = info->register_gen;
693 C_Linger_Ping *onack = new C_Linger_Ping(this, info);
694 Op *o = new Op(info->target.base_oid, info->target.base_oloc,
695 opv, info->target.flags | CEPH_OSD_FLAG_READ,
696 onack, NULL, NULL);
697 o->target = info->target;
698 o->should_resend = false;
699 _send_op_account(o);
700 MOSDOp *m = _prepare_osd_op(o);
 701 o->tid = ++last_tid;
702 _session_op_assign(info->session, o);
703 _send_op(o, m);
704 info->ping_tid = o->tid;
705
706 onack->sent = now;
707 logger->inc(l_osdc_linger_ping);
708}
709
710void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
711 uint32_t register_gen)
712{
713 LingerOp::unique_lock l(info->watch_lock);
714 ldout(cct, 10) << __func__ << " " << info->linger_id
715 << " sent " << sent << " gen " << register_gen << " = " << r
716 << " (last_error " << info->last_error
717 << " register_gen " << info->register_gen << ")" << dendl;
718 if (info->register_gen == register_gen) {
719 if (r == 0) {
720 info->watch_valid_thru = sent;
721 } else if (r < 0 && !info->last_error) {
722 r = _normalize_watch_error(r);
723 info->last_error = r;
724 if (info->watch_context) {
725 finisher->queue(new C_DoWatchError(this, info, r));
726 }
727 }
728 } else {
729 ldout(cct, 20) << " ignoring old gen" << dendl;
730 }
731}
732
733int Objecter::linger_check(LingerOp *info)
734{
735 LingerOp::shared_lock l(info->watch_lock);
736
737 mono_time stamp = info->watch_valid_thru;
738 if (!info->watch_pending_async.empty())
739 stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
740 auto age = mono_clock::now() - stamp;
741
742 ldout(cct, 10) << __func__ << " " << info->linger_id
743 << " err " << info->last_error
744 << " age " << age << dendl;
745 if (info->last_error)
746 return info->last_error;
747 // return a safe upper bound (we are truncating to ms)
748 return
749 1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
750}
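// Note (added for clarity): a negative return is the sticky last_error from a
// failed ping/reconnect; a non-negative return is an upper bound, in
// milliseconds, on how long ago the watch was last known valid. Callers (e.g.
// librados-level watch health checks) are expected to treat small values as
// "healthy".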
751
752void Objecter::linger_cancel(LingerOp *info)
753{
754 unique_lock wl(rwlock);
755 _linger_cancel(info);
756 info->put();
757}
758
759void Objecter::_linger_cancel(LingerOp *info)
760{
761 // rwlock is locked unique
762 ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
763 if (!info->canceled) {
764 OSDSession *s = info->session;
765 OSDSession::unique_lock sl(s->lock);
766 _session_linger_op_remove(s, info);
767 sl.unlock();
768
769 linger_ops.erase(info->linger_id);
770 linger_ops_set.erase(info);
771 assert(linger_ops.size() == linger_ops_set.size());
772
773 info->canceled = true;
774 info->put();
775
776 logger->dec(l_osdc_linger_active);
777 }
778}
779
780
781
782Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
783 const object_locator_t& oloc,
784 int flags)
785{
786 LingerOp *info = new LingerOp;
787 info->target.base_oid = oid;
788 info->target.base_oloc = oloc;
789 if (info->target.base_oloc.key == oid)
790 info->target.base_oloc.key.clear();
791 info->target.flags = flags;
792 info->watch_valid_thru = mono_clock::now();
793
794 unique_lock l(rwlock);
795
796 // Acquire linger ID
797 info->linger_id = ++max_linger_id;
798 ldout(cct, 10) << __func__ << " info " << info
799 << " linger_id " << info->linger_id
800 << " cookie " << info->get_cookie()
801 << dendl;
802 linger_ops[info->linger_id] = info;
803 linger_ops_set.insert(info);
804 assert(linger_ops.size() == linger_ops_set.size());
805
806 info->get(); // for the caller
807 return info;
808}
809
810ceph_tid_t Objecter::linger_watch(LingerOp *info,
811 ObjectOperation& op,
812 const SnapContext& snapc,
813 real_time mtime,
814 bufferlist& inbl,
815 Context *oncommit,
816 version_t *objver)
817{
818 info->is_watch = true;
819 info->snapc = snapc;
820 info->mtime = mtime;
821 info->target.flags |= CEPH_OSD_FLAG_WRITE;
822 info->ops = op.ops;
823 info->inbl = inbl;
824 info->poutbl = NULL;
825 info->pobjver = objver;
826 info->on_reg_commit = oncommit;
827
828 shunique_lock sul(rwlock, ceph::acquire_unique);
829 _linger_submit(info, sul);
830 logger->inc(l_osdc_linger_active);
831
832 return info->linger_id;
833}
834
835ceph_tid_t Objecter::linger_notify(LingerOp *info,
836 ObjectOperation& op,
837 snapid_t snap, bufferlist& inbl,
838 bufferlist *poutbl,
839 Context *onfinish,
840 version_t *objver)
841{
842 info->snap = snap;
843 info->target.flags |= CEPH_OSD_FLAG_READ;
844 info->ops = op.ops;
845 info->inbl = inbl;
846 info->poutbl = poutbl;
847 info->pobjver = objver;
848 info->on_reg_commit = onfinish;
849
850 shunique_lock sul(rwlock, ceph::acquire_unique);
851 _linger_submit(info, sul);
852 logger->inc(l_osdc_linger_active);
853
854 return info->linger_id;
855}
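// Illustrative usage sketch (simplified, assumed helper names): a watch is
// registered by pairing linger_register() with linger_watch(); the returned
// LingerOp is later passed to linger_cancel() to tear it down:
//
//   Objecter::LingerOp *info = objecter->linger_register(oid, oloc, 0);
//   ObjectOperation wr;
//   wr.watch(cookie, CEPH_OSD_WATCH_OP_WATCH);   // assumed ObjectOperation helper
//   objecter->linger_watch(info, wr, snapc, ceph::real_clock::now(),
//                          inbl, oncommit, nullptr);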
856
857void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
858{
859 assert(sul.owns_lock() && sul.mutex() == &rwlock);
860 assert(info->linger_id);
861
862 // Populate Op::target
863 OSDSession *s = NULL;
864 _calc_target(&info->target, nullptr);
865
866 // Create LingerOp<->OSDSession relation
867 int r = _get_session(info->target.osd, &s, sul);
868 assert(r == 0);
869 OSDSession::unique_lock sl(s->lock);
870 _session_linger_op_assign(s, info);
871 sl.unlock();
872 put_session(s);
873
874 _send_linger(info, sul);
875}
876
877struct C_DoWatchNotify : public Context {
878 Objecter *objecter;
879 Objecter::LingerOp *info;
880 MWatchNotify *msg;
881 C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
882 : objecter(o), info(i), msg(m) {
883 info->get();
884 info->_queued_async();
885 msg->get();
886 }
887 void finish(int r) override {
888 objecter->_do_watch_notify(info, msg);
889 }
890};
891
892void Objecter::handle_watch_notify(MWatchNotify *m)
893{
894 shared_lock l(rwlock);
 895 if (!initialized) {
896 return;
897 }
898
899 LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
900 if (linger_ops_set.count(info) == 0) {
901 ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
902 return;
903 }
904 LingerOp::unique_lock wl(info->watch_lock);
905 if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
906 if (!info->last_error) {
907 info->last_error = -ENOTCONN;
908 if (info->watch_context) {
909 finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
910 }
911 }
912 } else if (!info->is_watch) {
913 // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
914 // since we know the only user (librados) is safe to call in
915 // fast-dispatch context
916 if (info->notify_id &&
917 info->notify_id != m->notify_id) {
918 ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
919 << " != " << info->notify_id << ", ignoring" << dendl;
920 } else if (info->on_notify_finish) {
921 info->notify_result_bl->claim(m->get_data());
922 info->on_notify_finish->complete(m->return_code);
923
924 // if we race with reconnect we might get a second notify; only
925 // notify the caller once!
926 info->on_notify_finish = NULL;
927 }
928 } else {
929 finisher->queue(new C_DoWatchNotify(this, info, m));
930 }
931}
932
933void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
934{
935 ldout(cct, 10) << __func__ << " " << *m << dendl;
936
937 shared_lock l(rwlock);
 938 assert(initialized);
939
940 if (info->canceled) {
941 l.unlock();
942 goto out;
943 }
944
945 // notify completion?
946 assert(info->is_watch);
947 assert(info->watch_context);
948 assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
949
950 l.unlock();
951
952 switch (m->opcode) {
953 case CEPH_WATCH_EVENT_NOTIFY:
954 info->watch_context->handle_notify(m->notify_id, m->cookie,
955 m->notifier_gid, m->bl);
956 break;
957 }
958
959 out:
960 info->finished_async();
961 info->put();
962 m->put();
963}
964
965bool Objecter::ms_dispatch(Message *m)
966{
967 ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
 968 if (!initialized)
969 return false;
970
971 switch (m->get_type()) {
 972 // these we exclusively handle
973 case CEPH_MSG_OSD_OPREPLY:
974 handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
975 return true;
976
977 case CEPH_MSG_OSD_BACKOFF:
978 handle_osd_backoff(static_cast<MOSDBackoff*>(m));
979 return true;
980
981 case CEPH_MSG_WATCH_NOTIFY:
982 handle_watch_notify(static_cast<MWatchNotify*>(m));
983 m->put();
984 return true;
985
986 case MSG_COMMAND_REPLY:
987 if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
988 handle_command_reply(static_cast<MCommandReply*>(m));
989 return true;
990 } else {
991 return false;
992 }
993
994 case MSG_GETPOOLSTATSREPLY:
995 handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
996 return true;
997
998 case CEPH_MSG_POOLOP_REPLY:
999 handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
1000 return true;
1001
1002 case CEPH_MSG_STATFS_REPLY:
1003 handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
1004 return true;
1005
1006 // these we give others a chance to inspect
1007
1008 // MDS, OSD
1009 case CEPH_MSG_OSD_MAP:
1010 handle_osd_map(static_cast<MOSDMap*>(m));
1011 return false;
1012 }
1013 return false;
1014}
1015
1016void Objecter::_scan_requests(OSDSession *s,
1017 bool force_resend,
1018 bool cluster_full,
1019 map<int64_t, bool> *pool_full_map,
1020 map<ceph_tid_t, Op*>& need_resend,
1021 list<LingerOp*>& need_resend_linger,
1022 map<ceph_tid_t, CommandOp*>& need_resend_command,
1023 shunique_lock& sul)
1024{
1025 assert(sul.owns_lock() && sul.mutex() == &rwlock);
1026
1027 list<LingerOp*> unregister_lingers;
1028
1029 OSDSession::unique_lock sl(s->lock);
1030
1031 // check for changed linger mappings (_before_ regular ops)
1032 map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
1033 while (lp != s->linger_ops.end()) {
1034 LingerOp *op = lp->second;
1035 assert(op->session == s);
1036 // check_linger_pool_dne() may touch linger_ops; prevent iterator
1037 // invalidation
1038 ++lp;
1039 ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
1040 bool unregister, force_resend_writes = cluster_full;
1041 int r = _recalc_linger_op_target(op, sul);
1042 if (pool_full_map)
1043 force_resend_writes = force_resend_writes ||
1044 (*pool_full_map)[op->target.base_oloc.pool];
1045 switch (r) {
1046 case RECALC_OP_TARGET_NO_ACTION:
1047 if (!force_resend && !force_resend_writes)
1048 break;
1049 // -- fall-thru --
1050 case RECALC_OP_TARGET_NEED_RESEND:
1051 need_resend_linger.push_back(op);
1052 _linger_cancel_map_check(op);
1053 break;
1054 case RECALC_OP_TARGET_POOL_DNE:
1055 _check_linger_pool_dne(op, &unregister);
1056 if (unregister) {
1057 ldout(cct, 10) << " need to unregister linger op "
1058 << op->linger_id << dendl;
1059 op->get();
1060 unregister_lingers.push_back(op);
1061 }
1062 break;
1063 }
1064 }
1065
1066 // check for changed request mappings
1067 map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
1068 while (p != s->ops.end()) {
1069 Op *op = p->second;
1070 ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
1071 ldout(cct, 10) << " checking op " << op->tid << dendl;
1072 bool force_resend_writes = cluster_full;
1073 if (pool_full_map)
1074 force_resend_writes = force_resend_writes ||
1075 (*pool_full_map)[op->target.base_oloc.pool];
1076 int r = _calc_target(&op->target,
1077 op->session ? op->session->con.get() : nullptr);
1078 switch (r) {
1079 case RECALC_OP_TARGET_NO_ACTION:
1080 if (!force_resend && !(force_resend_writes && op->respects_full()))
1081 break;
1082 // -- fall-thru --
1083 case RECALC_OP_TARGET_NEED_RESEND:
1084 if (op->session) {
1085 _session_op_remove(op->session, op);
1086 }
1087 need_resend[op->tid] = op;
1088 _op_cancel_map_check(op);
1089 break;
1090 case RECALC_OP_TARGET_POOL_DNE:
1091 _check_op_pool_dne(op, &sl);
1092 break;
1093 }
1094 }
1095
1096 // commands
1097 map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
1098 while (cp != s->command_ops.end()) {
1099 CommandOp *c = cp->second;
1100 ++cp;
1101 ldout(cct, 10) << " checking command " << c->tid << dendl;
1102 bool force_resend_writes = cluster_full;
1103 if (pool_full_map)
1104 force_resend_writes = force_resend_writes ||
1105 (*pool_full_map)[c->target_pg.pool()];
1106 int r = _calc_command_target(c, sul);
1107 switch (r) {
1108 case RECALC_OP_TARGET_NO_ACTION:
1109 // resend if skipped map; otherwise do nothing.
1110 if (!force_resend && !force_resend_writes)
1111 break;
1112 // -- fall-thru --
1113 case RECALC_OP_TARGET_NEED_RESEND:
1114 need_resend_command[c->tid] = c;
1115 if (c->session) {
1116 _session_command_op_remove(c->session, c);
1117 }
1118 _command_cancel_map_check(c);
1119 break;
1120 case RECALC_OP_TARGET_POOL_DNE:
1121 case RECALC_OP_TARGET_OSD_DNE:
1122 case RECALC_OP_TARGET_OSD_DOWN:
1123 _check_command_map_dne(c);
1124 break;
1125 }
1126 }
1127
1128 sl.unlock();
1129
1130 for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
1131 iter != unregister_lingers.end();
1132 ++iter) {
1133 _linger_cancel(*iter);
1134 (*iter)->put();
1135 }
1136}
1137
1138void Objecter::handle_osd_map(MOSDMap *m)
1139{
1140 shunique_lock sul(rwlock, acquire_unique);
 1141 if (!initialized)
1142 return;
1143
1144 assert(osdmap);
1145
1146 if (m->fsid != monc->get_fsid()) {
1147 ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
1148 << " != " << monc->get_fsid() << dendl;
1149 return;
1150 }
1151
1152 bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1153 bool cluster_full = _osdmap_full_flag();
1154 bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
1155 _osdmap_has_pool_full();
1156 map<int64_t, bool> pool_full_map;
1157 for (map<int64_t, pg_pool_t>::const_iterator it
1158 = osdmap->get_pools().begin();
1159 it != osdmap->get_pools().end(); ++it)
1160 pool_full_map[it->first] = _osdmap_pool_full(it->second);
1161
1162
1163 list<LingerOp*> need_resend_linger;
1164 map<ceph_tid_t, Op*> need_resend;
1165 map<ceph_tid_t, CommandOp*> need_resend_command;
1166
1167 if (m->get_last() <= osdmap->get_epoch()) {
1168 ldout(cct, 3) << "handle_osd_map ignoring epochs ["
1169 << m->get_first() << "," << m->get_last()
1170 << "] <= " << osdmap->get_epoch() << dendl;
1171 } else {
1172 ldout(cct, 3) << "handle_osd_map got epochs ["
1173 << m->get_first() << "," << m->get_last()
1174 << "] > " << osdmap->get_epoch() << dendl;
1175
1176 if (osdmap->get_epoch()) {
1177 bool skipped_map = false;
1178 // we want incrementals
1179 for (epoch_t e = osdmap->get_epoch() + 1;
1180 e <= m->get_last();
1181 e++) {
1182
1183 if (osdmap->get_epoch() == e-1 &&
1184 m->incremental_maps.count(e)) {
1185 ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
1186 << dendl;
1187 OSDMap::Incremental inc(m->incremental_maps[e]);
1188 osdmap->apply_incremental(inc);
1189
1190 emit_blacklist_events(inc);
1191
1192 logger->inc(l_osdc_map_inc);
1193 }
1194 else if (m->maps.count(e)) {
1195 ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
1196 OSDMap *new_osdmap = new OSDMap();
1197 new_osdmap->decode(m->maps[e]);
1198
1199 emit_blacklist_events(*osdmap, *new_osdmap);
1200
1201 osdmap = new_osdmap;
1202
1203 logger->inc(l_osdc_map_full);
1204 }
1205 else {
1206 if (e >= m->get_oldest()) {
1207 ldout(cct, 3) << "handle_osd_map requesting missing epoch "
1208 << osdmap->get_epoch()+1 << dendl;
1209 _maybe_request_map();
1210 break;
1211 }
1212 ldout(cct, 3) << "handle_osd_map missing epoch "
1213 << osdmap->get_epoch()+1
1214 << ", jumping to " << m->get_oldest() << dendl;
1215 e = m->get_oldest() - 1;
1216 skipped_map = true;
1217 continue;
1218 }
1219 logger->set(l_osdc_map_epoch, osdmap->get_epoch());
1220
1221 cluster_full = cluster_full || _osdmap_full_flag();
1222 update_pool_full_map(pool_full_map);
1223
1224 // check all outstanding requests on every epoch
1225 _scan_requests(homeless_session, skipped_map, cluster_full,
1226 &pool_full_map, need_resend,
1227 need_resend_linger, need_resend_command, sul);
1228 for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1229 p != osd_sessions.end(); ) {
1230 OSDSession *s = p->second;
1231 _scan_requests(s, skipped_map, cluster_full,
1232 &pool_full_map, need_resend,
1233 need_resend_linger, need_resend_command, sul);
1234 ++p;
1235 // osd down or addr change?
1236 if (!osdmap->is_up(s->osd) ||
1237 (s->con &&
1238 s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
1239 close_session(s);
1240 }
1241 }
1242
1243 assert(e == osdmap->get_epoch());
1244 }
1245
1246 } else {
1247 // first map. we want the full thing.
1248 if (m->maps.count(m->get_last())) {
1249 for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1250 p != osd_sessions.end(); ++p) {
1251 OSDSession *s = p->second;
1252 _scan_requests(s, false, false, NULL, need_resend,
1253 need_resend_linger, need_resend_command, sul);
1254 }
1255 ldout(cct, 3) << "handle_osd_map decoding full epoch "
1256 << m->get_last() << dendl;
1257 osdmap->decode(m->maps[m->get_last()]);
1258
1259 _scan_requests(homeless_session, false, false, NULL,
1260 need_resend, need_resend_linger,
1261 need_resend_command, sul);
1262 } else {
1263 ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
1264 << dendl;
1265 monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
1266 monc->renew_subs();
1267 }
1268 }
1269 }
1270
1271 // make sure need_resend targets reflect latest map
1272 for (auto p = need_resend.begin(); p != need_resend.end(); ) {
1273 Op *op = p->second;
1274 if (op->target.epoch < osdmap->get_epoch()) {
1275 ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
1276 int r = _calc_target(&op->target, nullptr);
1277 if (r == RECALC_OP_TARGET_POOL_DNE) {
1278 p = need_resend.erase(p);
1279 _check_op_pool_dne(op, nullptr);
1280 } else {
1281 ++p;
1282 }
1283 } else {
1284 ++p;
1285 }
1286 }
1287
1288 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1289 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
1290 || _osdmap_has_pool_full();
1291
1292 // was/is paused?
1293 if (was_pauserd || was_pausewr || pauserd || pausewr ||
1294 osdmap->get_epoch() < epoch_barrier) {
1295 _maybe_request_map();
1296 }
1297
1298 // resend requests
1299 for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
1300 p != need_resend.end(); ++p) {
1301 Op *op = p->second;
1302 OSDSession *s = op->session;
1303 bool mapped_session = false;
1304 if (!s) {
1305 int r = _map_session(&op->target, &s, sul);
1306 assert(r == 0);
1307 mapped_session = true;
1308 } else {
1309 get_session(s);
1310 }
1311 OSDSession::unique_lock sl(s->lock);
1312 if (mapped_session) {
1313 _session_op_assign(s, op);
1314 }
1315 if (op->should_resend) {
1316 if (!op->session->is_homeless() && !op->target.paused) {
1317 logger->inc(l_osdc_op_resend);
1318 _send_op(op);
1319 }
1320 } else {
1321 _op_cancel_map_check(op);
1322 _cancel_linger_op(op);
1323 }
1324 sl.unlock();
1325 put_session(s);
1326 }
1327 for (list<LingerOp*>::iterator p = need_resend_linger.begin();
1328 p != need_resend_linger.end(); ++p) {
1329 LingerOp *op = *p;
1330 if (!op->session) {
1331 _calc_target(&op->target, nullptr);
1332 OSDSession *s = NULL;
 1333 const int r = _get_session(op->target.osd, &s, sul);
1334 assert(r == 0);
1335 assert(s != NULL);
1336 op->session = s;
1337 put_session(s);
1338 }
1339 if (!op->session->is_homeless()) {
1340 logger->inc(l_osdc_linger_resend);
1341 _send_linger(op, sul);
1342 }
1343 }
1344 for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
1345 p != need_resend_command.end(); ++p) {
1346 CommandOp *c = p->second;
1347 if (c->target.osd >= 0) {
1348 _assign_command_session(c, sul);
1349 if (c->session && !c->session->is_homeless()) {
1350 _send_command(c);
1351 }
1352 }
1353 }
1354
1355 _dump_active();
1356
1357 // finish any Contexts that were waiting on a map update
1358 map<epoch_t,list< pair< Context*, int > > >::iterator p =
1359 waiting_for_map.begin();
1360 while (p != waiting_for_map.end() &&
1361 p->first <= osdmap->get_epoch()) {
1362 //go through the list and call the onfinish methods
1363 for (list<pair<Context*, int> >::iterator i = p->second.begin();
1364 i != p->second.end(); ++i) {
1365 i->first->complete(i->second);
1366 }
1367 waiting_for_map.erase(p++);
1368 }
1369
1370 monc->sub_got("osdmap", osdmap->get_epoch());
1371
1372 if (!waiting_for_map.empty()) {
1373 _maybe_request_map();
1374 }
1375}
1376
1377void Objecter::enable_blacklist_events()
1378{
1379 unique_lock wl(rwlock);
1380
1381 blacklist_events_enabled = true;
1382}
1383
1384void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
1385{
1386 unique_lock wl(rwlock);
1387
1388 if (events->empty()) {
1389 events->swap(blacklist_events);
1390 } else {
1391 for (const auto &i : blacklist_events) {
1392 events->insert(i);
1393 }
1394 blacklist_events.clear();
1395 }
1396}
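// Illustrative usage sketch (added): a consumer drains pending blacklist
// additions into its own set; swap() avoids a copy when the caller's set is
// empty:
//
//   std::set<entity_addr_t> events;
//   objecter->consume_blacklist_events(&events);
//   for (const auto& addr : events) {
//     // react to the newly blacklisted address
//   }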
1397
1398void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
1399{
1400 if (!blacklist_events_enabled) {
1401 return;
1402 }
1403
1404 for (const auto &i : inc.new_blacklist) {
1405 blacklist_events.insert(i.first);
1406 }
1407}
1408
1409void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
1410 const OSDMap &new_osd_map)
1411{
1412 if (!blacklist_events_enabled) {
1413 return;
1414 }
1415
1416 std::set<entity_addr_t> old_set;
1417 std::set<entity_addr_t> new_set;
1418
1419 old_osd_map.get_blacklist(&old_set);
1420 new_osd_map.get_blacklist(&new_set);
1421
1422 std::set<entity_addr_t> delta_set;
1423 std::set_difference(
1424 new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
1425 std::inserter(delta_set, delta_set.begin()));
1426 blacklist_events.insert(delta_set.begin(), delta_set.end());
1427}
1428
1429// op pool check
1430
1431void Objecter::C_Op_Map_Latest::finish(int r)
1432{
1433 if (r == -EAGAIN || r == -ECANCELED)
1434 return;
1435
1436 lgeneric_subdout(objecter->cct, objecter, 10)
1437 << "op_map_latest r=" << r << " tid=" << tid
1438 << " latest " << latest << dendl;
1439
1440 Objecter::unique_lock wl(objecter->rwlock);
1441
1442 map<ceph_tid_t, Op*>::iterator iter =
1443 objecter->check_latest_map_ops.find(tid);
1444 if (iter == objecter->check_latest_map_ops.end()) {
1445 lgeneric_subdout(objecter->cct, objecter, 10)
1446 << "op_map_latest op "<< tid << " not found" << dendl;
1447 return;
1448 }
1449
1450 Op *op = iter->second;
1451 objecter->check_latest_map_ops.erase(iter);
1452
1453 lgeneric_subdout(objecter->cct, objecter, 20)
1454 << "op_map_latest op "<< op << dendl;
1455
1456 if (op->map_dne_bound == 0)
1457 op->map_dne_bound = latest;
1458
1459 OSDSession::unique_lock sl(op->session->lock, defer_lock);
1460 objecter->_check_op_pool_dne(op, &sl);
1461
1462 op->put();
1463}
1464
1465int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
1466 snapid_t *snap) const
1467{
1468 shared_lock rl(rwlock);
1469
1470 auto& pools = osdmap->get_pools();
1471 auto iter = pools.find(poolid);
1472 if (iter == pools.end()) {
1473 return -ENOENT;
1474 }
1475 const pg_pool_t& pg_pool = iter->second;
1476 for (auto p = pg_pool.snaps.begin();
1477 p != pg_pool.snaps.end();
1478 ++p) {
1479 if (p->second.name == snap_name) {
1480 *snap = p->first;
1481 return 0;
1482 }
1483 }
1484 return -ENOENT;
1485}
1486
1487int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
1488 pool_snap_info_t *info) const
1489{
1490 shared_lock rl(rwlock);
1491
1492 auto& pools = osdmap->get_pools();
1493 auto iter = pools.find(poolid);
1494 if (iter == pools.end()) {
1495 return -ENOENT;
1496 }
1497 const pg_pool_t& pg_pool = iter->second;
1498 auto p = pg_pool.snaps.find(snap);
1499 if (p == pg_pool.snaps.end())
1500 return -ENOENT;
1501 *info = p->second;
1502
1503 return 0;
1504}
1505
1506int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
1507{
1508 shared_lock rl(rwlock);
1509
1510 const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
1511 if (!pi)
1512 return -ENOENT;
1513 for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
1514 p != pi->snaps.end();
1515 ++p) {
1516 snaps->push_back(p->first);
1517 }
1518 return 0;
1519}
1520
1521// sl may be unlocked.
1522void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
1523{
1524 // rwlock is locked unique
1525
1526 if (op->target.pool_ever_existed) {
1527 // the pool previously existed and now it does not, which means it
1528 // was deleted.
1529 op->map_dne_bound = osdmap->get_epoch();
1530 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1531 << " pool previously exists but now does not"
1532 << dendl;
1533 } else {
1534 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1535 << " current " << osdmap->get_epoch()
1536 << " map_dne_bound " << op->map_dne_bound
1537 << dendl;
1538 }
1539 if (op->map_dne_bound > 0) {
1540 if (osdmap->get_epoch() >= op->map_dne_bound) {
1541 // we had a new enough map
1542 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1543 << " concluding pool " << op->target.base_pgid.pool()
1544 << " dne" << dendl;
1545 if (op->onfinish) {
1546 op->onfinish->complete(-ENOENT);
1547 }
1548
1549 OSDSession *s = op->session;
1550 if (s) {
1551 assert(s != NULL);
1552 assert(sl->mutex() == &s->lock);
1553 bool session_locked = sl->owns_lock();
1554 if (!session_locked) {
1555 sl->lock();
1556 }
1557 _finish_op(op, 0);
1558 if (!session_locked) {
1559 sl->unlock();
1560 }
1561 } else {
1562 _finish_op(op, 0); // no session
1563 }
1564 }
1565 } else {
1566 _send_op_map_check(op);
1567 }
1568}
1569
1570void Objecter::_send_op_map_check(Op *op)
1571{
1572 // rwlock is locked unique
1573 // ask the monitor
1574 if (check_latest_map_ops.count(op->tid) == 0) {
1575 op->get();
1576 check_latest_map_ops[op->tid] = op;
1577 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
1578 monc->get_version("osdmap", &c->latest, NULL, c);
1579 }
1580}
1581
1582void Objecter::_op_cancel_map_check(Op *op)
1583{
1584 // rwlock is locked unique
1585 map<ceph_tid_t, Op*>::iterator iter =
1586 check_latest_map_ops.find(op->tid);
1587 if (iter != check_latest_map_ops.end()) {
1588 Op *op = iter->second;
1589 op->put();
1590 check_latest_map_ops.erase(iter);
1591 }
1592}
1593
1594// linger pool check
1595
1596void Objecter::C_Linger_Map_Latest::finish(int r)
1597{
1598 if (r == -EAGAIN || r == -ECANCELED) {
1599 // ignore callback; we will retry in resend_mon_ops()
1600 return;
1601 }
1602
1603 unique_lock wl(objecter->rwlock);
1604
1605 map<uint64_t, LingerOp*>::iterator iter =
1606 objecter->check_latest_map_lingers.find(linger_id);
1607 if (iter == objecter->check_latest_map_lingers.end()) {
1608 return;
1609 }
1610
1611 LingerOp *op = iter->second;
1612 objecter->check_latest_map_lingers.erase(iter);
1613
1614 if (op->map_dne_bound == 0)
1615 op->map_dne_bound = latest;
1616
1617 bool unregister;
1618 objecter->_check_linger_pool_dne(op, &unregister);
1619
1620 if (unregister) {
1621 objecter->_linger_cancel(op);
1622 }
1623
1624 op->put();
1625}
1626
1627void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
1628{
1629 // rwlock is locked unique
1630
1631 *need_unregister = false;
1632
1633 if (op->register_gen > 0) {
1634 ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1635 << " pool previously existed but now does not"
1636 << dendl;
1637 op->map_dne_bound = osdmap->get_epoch();
1638 } else {
1639 ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1640 << " current " << osdmap->get_epoch()
1641 << " map_dne_bound " << op->map_dne_bound
1642 << dendl;
1643 }
1644 if (op->map_dne_bound > 0) {
1645 if (osdmap->get_epoch() >= op->map_dne_bound) {
 1646 LingerOp::unique_lock wl{op->watch_lock};
1647 if (op->on_reg_commit) {
1648 op->on_reg_commit->complete(-ENOENT);
1649 op->on_reg_commit = nullptr;
1650 }
1651 if (op->on_notify_finish) {
1652 op->on_notify_finish->complete(-ENOENT);
1653 op->on_notify_finish = nullptr;
1654 }
1655 *need_unregister = true;
1656 }
1657 } else {
1658 _send_linger_map_check(op);
1659 }
1660}
1661
1662void Objecter::_send_linger_map_check(LingerOp *op)
1663{
1664 // ask the monitor
1665 if (check_latest_map_lingers.count(op->linger_id) == 0) {
1666 op->get();
1667 check_latest_map_lingers[op->linger_id] = op;
1668 C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
1669 monc->get_version("osdmap", &c->latest, NULL, c);
1670 }
1671}
1672
1673void Objecter::_linger_cancel_map_check(LingerOp *op)
1674{
1675 // rwlock is locked unique
1676
1677 map<uint64_t, LingerOp*>::iterator iter =
1678 check_latest_map_lingers.find(op->linger_id);
1679 if (iter != check_latest_map_lingers.end()) {
1680 LingerOp *op = iter->second;
1681 op->put();
1682 check_latest_map_lingers.erase(iter);
1683 }
1684}
1685
1686// command pool check
1687
1688void Objecter::C_Command_Map_Latest::finish(int r)
1689{
1690 if (r == -EAGAIN || r == -ECANCELED) {
1691 // ignore callback; we will retry in resend_mon_ops()
1692 return;
1693 }
1694
1695 unique_lock wl(objecter->rwlock);
1696
1697 map<uint64_t, CommandOp*>::iterator iter =
1698 objecter->check_latest_map_commands.find(tid);
1699 if (iter == objecter->check_latest_map_commands.end()) {
1700 return;
1701 }
1702
1703 CommandOp *c = iter->second;
1704 objecter->check_latest_map_commands.erase(iter);
1705
1706 if (c->map_dne_bound == 0)
1707 c->map_dne_bound = latest;
1708
 1709 OSDSession::unique_lock sul(c->session->lock);
 1710 objecter->_check_command_map_dne(c);
 1711 sul.unlock();
1712
1713 c->put();
1714}
1715
1716void Objecter::_check_command_map_dne(CommandOp *c)
1717{
1718 // rwlock is locked unique
 1719 // session is locked unique
1720
1721 ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
1722 << " current " << osdmap->get_epoch()
1723 << " map_dne_bound " << c->map_dne_bound
1724 << dendl;
1725 if (c->map_dne_bound > 0) {
1726 if (osdmap->get_epoch() >= c->map_dne_bound) {
1727 _finish_command(c, c->map_check_error, c->map_check_error_str);
1728 }
1729 } else {
1730 _send_command_map_check(c);
1731 }
1732}
1733
1734void Objecter::_send_command_map_check(CommandOp *c)
1735{
1736 // rwlock is locked unique
 1737 // session is locked unique
1738
1739 // ask the monitor
1740 if (check_latest_map_commands.count(c->tid) == 0) {
1741 c->get();
1742 check_latest_map_commands[c->tid] = c;
1743 C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
1744 monc->get_version("osdmap", &f->latest, NULL, f);
1745 }
1746}
1747
1748void Objecter::_command_cancel_map_check(CommandOp *c)
1749{
 1750 // rwlock is locked unique
1751
1752 map<uint64_t, CommandOp*>::iterator iter =
1753 check_latest_map_commands.find(c->tid);
1754 if (iter != check_latest_map_commands.end()) {
1755 CommandOp *c = iter->second;
1756 c->put();
1757 check_latest_map_commands.erase(iter);
1758 }
1759}
1760
1761
1762/**
1763 * Look up OSDSession by OSD id.
1764 *
1765 * @returns 0 on success, or -EAGAIN if the lock context requires
1766 * promotion to write.
1767 */
1768int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
1769{
1770 assert(sul && sul.mutex() == &rwlock);
1771
1772 if (osd < 0) {
1773 *session = homeless_session;
1774 ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
1775 << dendl;
1776 return 0;
1777 }
1778
1779 map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
1780 if (p != osd_sessions.end()) {
1781 OSDSession *s = p->second;
1782 s->get();
1783 *session = s;
1784 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1785 << s->get_nref() << dendl;
1786 return 0;
1787 }
1788 if (!sul.owns_lock()) {
1789 return -EAGAIN;
1790 }
1791 OSDSession *s = new OSDSession(cct, osd);
1792 osd_sessions[osd] = s;
1793 s->con = messenger->get_connection(osdmap->get_inst(osd));
1794 s->con->set_priv(s->get());
1795 logger->inc(l_osdc_osd_session_open);
1796 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1797 s->get();
1798 *session = s;
1799 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1800 << s->get_nref() << dendl;
1801 return 0;
1802}
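// Illustrative sketch (added): callers holding only the shared side of rwlock
// must handle -EAGAIN by promoting to the unique (write) lock and retrying,
// since creating a new OSDSession mutates osd_sessions:
//
//   int r = _get_session(osd, &s, sul);
//   if (r == -EAGAIN) {
//     // re-acquire rwlock exclusively, then call _get_session() again
//   }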
1803
1804void Objecter::put_session(Objecter::OSDSession *s)
1805{
1806 if (s && !s->is_homeless()) {
1807 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1808 << s->get_nref() << dendl;
1809 s->put();
1810 }
1811}
1812
1813void Objecter::get_session(Objecter::OSDSession *s)
1814{
1815 assert(s != NULL);
1816
1817 if (!s->is_homeless()) {
1818 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1819 << s->get_nref() << dendl;
1820 s->get();
1821 }
1822}
1823
1824void Objecter::_reopen_session(OSDSession *s)
1825{
1826 // s->lock is locked
1827
1828 entity_inst_t inst = osdmap->get_inst(s->osd);
1829 ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
1830 << inst << dendl;
1831 if (s->con) {
1832 s->con->set_priv(NULL);
1833 s->con->mark_down();
1834 logger->inc(l_osdc_osd_session_close);
1835 }
1836 s->con = messenger->get_connection(inst);
1837 s->con->set_priv(s->get());
1838 s->incarnation++;
1839 logger->inc(l_osdc_osd_session_open);
1840}
1841
1842void Objecter::close_session(OSDSession *s)
1843{
1844 // rwlock is locked unique
1845
1846 ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
1847 if (s->con) {
1848 s->con->set_priv(NULL);
1849 s->con->mark_down();
1850 logger->inc(l_osdc_osd_session_close);
1851 }
1852 OSDSession::unique_lock sl(s->lock);
1853
1854 std::list<LingerOp*> homeless_lingers;
1855 std::list<CommandOp*> homeless_commands;
1856 std::list<Op*> homeless_ops;
1857
1858 while (!s->linger_ops.empty()) {
1859 std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
1860 ldout(cct, 10) << " linger_op " << i->first << dendl;
1861 homeless_lingers.push_back(i->second);
1862 _session_linger_op_remove(s, i->second);
1863 }
1864
1865 while (!s->ops.empty()) {
1866 std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
1867 ldout(cct, 10) << " op " << i->first << dendl;
1868 homeless_ops.push_back(i->second);
1869 _session_op_remove(s, i->second);
1870 }
1871
1872 while (!s->command_ops.empty()) {
1873 std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
1874 ldout(cct, 10) << " command_op " << i->first << dendl;
1875 homeless_commands.push_back(i->second);
1876 _session_command_op_remove(s, i->second);
1877 }
1878
1879 osd_sessions.erase(s->osd);
1880 sl.unlock();
1881 put_session(s);
1882
1883 // Assign any leftover ops to the homeless session
1884 {
1885 OSDSession::unique_lock hsl(homeless_session->lock);
1886 for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
1887 i != homeless_lingers.end(); ++i) {
1888 _session_linger_op_assign(homeless_session, *i);
1889 }
1890 for (std::list<Op*>::iterator i = homeless_ops.begin();
1891 i != homeless_ops.end(); ++i) {
1892 _session_op_assign(homeless_session, *i);
1893 }
1894 for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
1895 i != homeless_commands.end(); ++i) {
1896 _session_command_op_assign(homeless_session, *i);
1897 }
1898 }
1899
1900 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1901}
1902
1903void Objecter::wait_for_osd_map()
1904{
1905 unique_lock l(rwlock);
1906 if (osdmap->get_epoch()) {
1907 l.unlock();
1908 return;
1909 }
1910
1911 // Leave this since it goes with C_SafeCond
1912 Mutex lock("");
1913 Cond cond;
1914 bool done;
1915 lock.Lock();
1916 C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL);
1917 waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
1918 l.unlock();
1919 while (!done)
1920 cond.Wait(lock);
1921 lock.Unlock();
1922}
1923
1924struct C_Objecter_GetVersion : public Context {
1925 Objecter *objecter;
1926 uint64_t oldest, newest;
1927 Context *fin;
1928 C_Objecter_GetVersion(Objecter *o, Context *c)
1929 : objecter(o), oldest(0), newest(0), fin(c) {}
1930 void finish(int r) override {
1931 if (r >= 0) {
1932 objecter->get_latest_version(oldest, newest, fin);
1933 } else if (r == -EAGAIN) { // try again as instructed
1934 objecter->wait_for_latest_osdmap(fin);
1935 } else {
1936 // it doesn't return any other error codes!
1937 ceph_abort();
1938 }
1939 }
1940};
1941
1942void Objecter::wait_for_latest_osdmap(Context *fin)
1943{
1944 ldout(cct, 10) << __func__ << dendl;
1945 C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
1946 monc->get_version("osdmap", &c->newest, &c->oldest, c);
1947}
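// Illustrative call flow for the helper above (a sketch, not part of the
// build): the monitor replies with the [oldest, newest] span of maps it has,
// C_Objecter_GetVersion::finish() then calls get_latest_version(oldest,
// newest, fin), and 'fin' completes once our osdmap epoch reaches 'newest'.
// Assuming an 'objecter' pointer and a caller-owned Context 'fin':
//
//   objecter->wait_for_latest_osdmap(fin);   // fin->complete(0) fires later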
1948
1949void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
1950{
1951 unique_lock wl(rwlock);
1952 _get_latest_version(oldest, newest, fin);
1953}
1954
1955void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
1956 Context *fin)
1957{
1958 // rwlock is locked unique
1959 if (osdmap->get_epoch() >= newest) {
1960 ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
1961 if (fin)
1962 fin->complete(0);
1963 return;
1964 }
1965
1966 ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
1967 _wait_for_new_map(fin, newest, 0);
1968}
1969
1970void Objecter::maybe_request_map()
1971{
1972 shared_lock rl(rwlock);
1973 _maybe_request_map();
1974}
1975
1976void Objecter::_maybe_request_map()
1977{
1978 // rwlock is locked
1979 int flag = 0;
1980 if (_osdmap_full_flag()
1981 || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
1982 || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
1983 ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
1984 "osd map (FULL flag is set)" << dendl;
1985 } else {
1986 ldout(cct, 10)
1987 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
1988 flag = CEPH_SUBSCRIBE_ONETIME;
1989 }
1990 epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
1991 if (monc->sub_want("osdmap", epoch, flag)) {
1992 monc->renew_subs();
1993 }
1994}
1995
1996void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
1997{
1998 // rwlock is locked unique
1999 waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
2000 _maybe_request_map();
2001}
2002
2003
2004/**
2005 * Use this together with wait_for_map: this is a pre-check to avoid
2006 * allocating a Context for wait_for_map if we can see that we
2007 * definitely already have the epoch.
2008 *
2009 * This does *not* replace the need to handle the return value of
2010 * wait_for_map: just because we don't have it in this pre-check
2011 * doesn't mean we won't have it when calling back into wait_for_map,
2012 * since the objecter lock is dropped in between.
2013 */
2014bool Objecter::have_map(const epoch_t epoch)
2015{
2016 shared_lock rl(rwlock);
2017 if (osdmap->get_epoch() >= epoch) {
2018 return true;
2019 } else {
2020 return false;
2021 }
2022}
2023
2024bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
2025{
2026 unique_lock wl(rwlock);
2027 if (osdmap->get_epoch() >= epoch) {
2028 return true;
2029 }
2030 _wait_for_new_map(c, epoch, err);
2031 return false;
2032}
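// A minimal sketch of the pre-check + wait pattern described in the comment
// above have_map() (illustrative only; 'objecter', 'epoch', do_work() and the
// C_DoWork Context are assumptions of the example):
//
//   if (objecter->have_map(epoch)) {
//     do_work();                          // fast path, no Context allocated
//   } else {
//     Context *fin = new C_DoWork();
//     if (objecter->wait_for_map(epoch, fin)) {
//       delete fin;                       // raced: map arrived, fin was not queued
//       do_work();
//     }                                   // else fin completes when the map arrives
//   }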
2033
2034void Objecter::kick_requests(OSDSession *session)
2035{
2036 ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
2037
2038 map<uint64_t, LingerOp *> lresend;
2039 unique_lock wl(rwlock);
2040
2041 OSDSession::unique_lock sl(session->lock);
2042 _kick_requests(session, lresend);
2043 sl.unlock();
2044
2045 _linger_ops_resend(lresend, wl);
2046}
2047
2048void Objecter::_kick_requests(OSDSession *session,
2049 map<uint64_t, LingerOp *>& lresend)
2050{
2051 // rwlock is locked unique
2052
2053 // clear backoffs
2054 session->backoffs.clear();
2055 session->backoffs_by_id.clear();
2056
2057 // resend ops
2058 map<ceph_tid_t,Op*> resend; // resend in tid order
2059 for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
2060 p != session->ops.end();) {
2061 Op *op = p->second;
2062 ++p;
2063 logger->inc(l_osdc_op_resend);
2064 if (op->should_resend) {
2065 if (!op->target.paused)
2066 resend[op->tid] = op;
2067 } else {
2068 _op_cancel_map_check(op);
2069 _cancel_linger_op(op);
2070 }
2071 }
2072
2073 while (!resend.empty()) {
2074 _send_op(resend.begin()->second);
2075 resend.erase(resend.begin());
2076 }
2077
2078 // resend lingers
2079 for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
2080 j != session->linger_ops.end(); ++j) {
2081 LingerOp *op = j->second;
2082 op->get();
2083 logger->inc(l_osdc_linger_resend);
2084 assert(lresend.count(j->first) == 0);
2085 lresend[j->first] = op;
2086 }
2087
2088 // resend commands
2089 map<uint64_t,CommandOp*> cresend; // resend in order
2090 for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
2091 k != session->command_ops.end(); ++k) {
2092 logger->inc(l_osdc_command_resend);
2093 cresend[k->first] = k->second;
2094 }
2095 while (!cresend.empty()) {
2096 _send_command(cresend.begin()->second);
2097 cresend.erase(cresend.begin());
2098 }
2099}
2100
2101void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
2102 unique_lock& ul)
2103{
2104 assert(ul.owns_lock());
2105 shunique_lock sul(std::move(ul));
2106 while (!lresend.empty()) {
2107 LingerOp *op = lresend.begin()->second;
2108 if (!op->canceled) {
2109 _send_linger(op, sul);
2110 }
2111 op->put();
2112 lresend.erase(lresend.begin());
2113 }
2114 ul = unique_lock(sul.release_to_unique());
2115}
2116
2117void Objecter::start_tick()
2118{
2119 assert(tick_event == 0);
2120 tick_event =
2121 timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2122 &Objecter::tick, this);
2123}
2124
2125void Objecter::tick()
2126{
2127 shared_lock rl(rwlock);
2128
2129 ldout(cct, 10) << "tick" << dendl;
2130
2131 // we are only called by C_Tick
2132 tick_event = 0;
2133
2134 if (!initialized) {
2135 // we raced with shutdown
2136 ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
2137 return;
2138 }
2139
2140 set<OSDSession*> toping;
2141
2142
2143 // look for laggy requests
2144 auto cutoff = ceph::mono_clock::now();
2145 cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout); // timeout
2146
2147 unsigned laggy_ops = 0;
2148
2149 for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
2150 siter != osd_sessions.end(); ++siter) {
2151 OSDSession *s = siter->second;
2152 OSDSession::lock_guard l(s->lock);
2153 bool found = false;
2154 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
2155 p != s->ops.end();
2156 ++p) {
2157 Op *op = p->second;
2158 assert(op->session);
2159 if (op->stamp < cutoff) {
2160 ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
2161 << " is laggy" << dendl;
2162 found = true;
2163 ++laggy_ops;
2164 }
2165 }
2166 for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
2167 p != s->linger_ops.end();
2168 ++p) {
2169 LingerOp *op = p->second;
2170 LingerOp::unique_lock wl(op->watch_lock);
2171 assert(op->session);
2172 ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
2173 << " (osd." << op->session->osd << ")" << dendl;
2174 found = true;
2175 if (op->is_watch && op->registered && !op->last_error)
2176 _send_linger_ping(op);
2177 }
2178 for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
2179 p != s->command_ops.end();
2180 ++p) {
2181 CommandOp *op = p->second;
2182 assert(op->session);
2183 ldout(cct, 10) << " pinging osd that serves command tid " << p->first
2184 << " (osd." << op->session->osd << ")" << dendl;
2185 found = true;
2186 }
2187 if (found)
2188 toping.insert(s);
2189 }
2190 if (num_homeless_ops || !toping.empty()) {
2191 _maybe_request_map();
2192 }
2193
2194 logger->set(l_osdc_op_laggy, laggy_ops);
2195 logger->set(l_osdc_osd_laggy, toping.size());
2196
2197 if (!toping.empty()) {
2198 // send a ping to these osds, to ensure we detect any session resets
2199 // (osd reply message policy is lossy)
2200 for (set<OSDSession*>::const_iterator i = toping.begin();
2201 i != toping.end();
2202 ++i) {
2203 (*i)->con->send_message(new MPing);
2204 }
2205 }
2206
2207 // Make sure we don't reschedule if we wake up after shutdown
2208 if (initialized) {
2209 tick_event = timer.reschedule_me(ceph::make_timespan(
2210 cct->_conf->objecter_tick_interval));
2211 }
2212}
2213
2214void Objecter::resend_mon_ops()
2215{
2216 unique_lock wl(rwlock);
2217
2218 ldout(cct, 10) << "resend_mon_ops" << dendl;
2219
2220 for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
2221 p != poolstat_ops.end();
2222 ++p) {
2223 _poolstat_submit(p->second);
2224 logger->inc(l_osdc_poolstat_resend);
2225 }
2226
2227 for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
2228 p != statfs_ops.end();
2229 ++p) {
2230 _fs_stats_submit(p->second);
2231 logger->inc(l_osdc_statfs_resend);
2232 }
2233
2234 for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
2235 p != pool_ops.end();
2236 ++p) {
2237 _pool_op_submit(p->second);
2238 logger->inc(l_osdc_poolop_resend);
2239 }
2240
2241 for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
2242 p != check_latest_map_ops.end();
2243 ++p) {
2244 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
2245 monc->get_version("osdmap", &c->latest, NULL, c);
2246 }
2247
2248 for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
2249 p != check_latest_map_lingers.end();
2250 ++p) {
2251 C_Linger_Map_Latest *c
2252 = new C_Linger_Map_Latest(this, p->second->linger_id);
2253 monc->get_version("osdmap", &c->latest, NULL, c);
2254 }
2255
2256 for (map<uint64_t, CommandOp*>::iterator p
2257 = check_latest_map_commands.begin();
2258 p != check_latest_map_commands.end();
2259 ++p) {
2260 C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
2261 monc->get_version("osdmap", &c->latest, NULL, c);
2262 }
2263}
2264
2265// read | write ---------------------------
2266
2267void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2268{
2269 shunique_lock rl(rwlock, ceph::acquire_shared);
2270 ceph_tid_t tid = 0;
2271 if (!ptid)
2272 ptid = &tid;
2273 op->trace.event("op submit");
2274 _op_submit_with_budget(op, rl, ptid, ctx_budget);
2275}
2276
2277void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
2278 ceph_tid_t *ptid,
2279 int *ctx_budget)
2280{
2281 assert(initialized);
2282
2283 assert(op->ops.size() == op->out_bl.size());
2284 assert(op->ops.size() == op->out_rval.size());
2285 assert(op->ops.size() == op->out_handler.size());
2286
2287 // throttle. before we look at any state, because
2288 // _take_op_budget() may drop our lock while it blocks.
2289 if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
2290 int op_budget = _take_op_budget(op, sul);
2291 // take and pass out the budget for the first OP
2292 // in the context session
2293 if (ctx_budget && (*ctx_budget == -1)) {
2294 *ctx_budget = op_budget;
2295 }
2296 }
2297
2298 if (osd_timeout > timespan(0)) {
2299 if (op->tid == 0)
2300 op->tid = ++last_tid;
2301 auto tid = op->tid;
2302 op->ontimeout = timer.add_event(osd_timeout,
2303 [this, tid]() {
2304 op_cancel(tid, -ETIMEDOUT); });
2305 }
2306
2307 _op_submit(op, sul, ptid);
2308}
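// The ctx_budget convention above, sketched (illustrative; assumes both ops
// are marked ctx_budgeted so only the first submission charges the throttle):
//
//   int ctx_budget = -1;
//   objecter->op_submit(op1, &tid1, &ctx_budget);  // takes budget, writes it back
//   objecter->op_submit(op2, &tid2, &ctx_budget);  // reuses the shared budget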
2309
2310void Objecter::_send_op_account(Op *op)
2311{
2312 inflight_ops++;
2313
2314 // add to gather set(s)
2315 if (op->onfinish) {
2316 num_in_flight++;
2317 } else {
2318 ldout(cct, 20) << " note: not requesting reply" << dendl;
2319 }
2320
2321 logger->inc(l_osdc_op_active);
2322 logger->inc(l_osdc_op);
2323
2324 if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
2325 (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
2326 logger->inc(l_osdc_op_rmw);
2327 else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
2328 logger->inc(l_osdc_op_w);
2329 else if (op->target.flags & CEPH_OSD_FLAG_READ)
2330 logger->inc(l_osdc_op_r);
2331
2332 if (op->target.flags & CEPH_OSD_FLAG_PGOP)
2333 logger->inc(l_osdc_op_pg);
2334
2335 for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
2336 int code = l_osdc_osdop_other;
2337 switch (p->op.op) {
2338 case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
2339 case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
2340 case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
2341 case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
2342 case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
2343 case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
2344 case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
2345 case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
2346 case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
2347 case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
2348 case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
2349 case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
2350 case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
2351 case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
2352 case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
2353 case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
2354 case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
2355 case CEPH_OSD_OP_TMAPUP: code = l_osdc_osdop_tmap_up; break;
2356 case CEPH_OSD_OP_TMAPPUT: code = l_osdc_osdop_tmap_put; break;
2357 case CEPH_OSD_OP_TMAPGET: code = l_osdc_osdop_tmap_get; break;
2358
2359 // OMAP read operations
2360 case CEPH_OSD_OP_OMAPGETVALS:
2361 case CEPH_OSD_OP_OMAPGETKEYS:
2362 case CEPH_OSD_OP_OMAPGETHEADER:
2363 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
2364 case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;
2365
2366 // OMAP write operations
2367 case CEPH_OSD_OP_OMAPSETVALS:
2368 case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;
2369
2370 // OMAP del operations
2371 case CEPH_OSD_OP_OMAPCLEAR:
2372 case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;
2373
2374 case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
2375 case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
2376 case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
2377 }
2378 if (code)
2379 logger->inc(code);
2380 }
2381}
2382
2383void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
2384{
2385 // rwlock is locked
2386
2387 ldout(cct, 10) << __func__ << " op " << op << dendl;
2388
2389 // pick target
2390 assert(op->session == NULL);
2391 OSDSession *s = NULL;
2392
2393 bool check_for_latest_map = _calc_target(&op->target, nullptr)
2394 == RECALC_OP_TARGET_POOL_DNE;
2395
2396 // Try to get a session, including a retry if we need to take write lock
2397 int r = _get_session(op->target.osd, &s, sul);
2398 if (r == -EAGAIN ||
2399 (check_for_latest_map && sul.owns_lock_shared())) {
2400 epoch_t orig_epoch = osdmap->get_epoch();
2401 sul.unlock();
2402 if (cct->_conf->objecter_debug_inject_relock_delay) {
2403 sleep(1);
2404 }
2405 sul.lock();
2406 if (orig_epoch != osdmap->get_epoch()) {
2407 // map changed; recalculate mapping
2408 ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
2409 << dendl;
2410 check_for_latest_map = _calc_target(&op->target, nullptr)
2411 == RECALC_OP_TARGET_POOL_DNE;
2412 if (s) {
2413 put_session(s);
2414 s = NULL;
2415 r = -EAGAIN;
2416 }
2417 }
2418 }
2419 if (r == -EAGAIN) {
2420 assert(s == NULL);
2421 r = _get_session(op->target.osd, &s, sul);
2422 }
2423 assert(r == 0);
2424 assert(s); // may be homeless
2425
2426 _send_op_account(op);
2427
2428 // send?
2429
2430 assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
2431
2432 if (osdmap_full_try) {
2433 op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
2434 }
2435
2436 bool need_send = false;
2437
2438 if (osdmap->get_epoch() < epoch_barrier) {
2439 ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
2440 << dendl;
2441 op->target.paused = true;
2442 _maybe_request_map();
2443 } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
2444 osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
2445 ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
2446 << dendl;
2447 op->target.paused = true;
2448 _maybe_request_map();
2449 } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
2450 osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
2451 ldout(cct, 10) << " paused read " << op << " tid " << op->tid
2452 << dendl;
2453 op->target.paused = true;
2454 _maybe_request_map();
2455 } else if (op->respects_full() &&
2456 (_osdmap_full_flag() ||
2457 _osdmap_pool_full(op->target.base_oloc.pool))) {
2458 ldout(cct, 0) << " FULL, paused modify " << op << " tid "
2459 << op->tid << dendl;
2460 op->target.paused = true;
2461 _maybe_request_map();
2462 } else if (!s->is_homeless()) {
2463 need_send = true;
2464 } else {
2465 _maybe_request_map();
2466 }
2467
2468 MOSDOp *m = NULL;
2469 if (need_send) {
2470 m = _prepare_osd_op(op);
2471 }
2472
2473 OSDSession::unique_lock sl(s->lock);
2474 if (op->tid == 0)
2475 op->tid = ++last_tid;
2476
2477 ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
2478 << " '" << op->target.base_oloc << "' '"
2479 << op->target.target_oloc << "' " << op->ops << " tid "
2480 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
2481 << dendl;
2482
2483 _session_op_assign(s, op);
2484
2485 if (need_send) {
2486 _send_op(op, m);
2487 }
2488
2489 // Last chance to touch Op here; after giving up the session lock it can
2490 // be freed at any time by the response handler.
2491 ceph_tid_t tid = op->tid;
2492 if (check_for_latest_map) {
2493 _send_op_map_check(op);
2494 }
2495 if (ptid)
2496 *ptid = tid;
2497 op = NULL;
2498
2499 sl.unlock();
2500 put_session(s);
2501
2502 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
2503}
2504
2505int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2506{
2507 assert(initialized);
2508
2509 OSDSession::unique_lock sl(s->lock);
2510
2511 map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
2512 if (p == s->ops.end()) {
2513 ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
2514 << s->osd << dendl;
2515 return -ENOENT;
2516 }
2517
2518 if (s->con) {
2519 ldout(cct, 20) << " revoking rx buffer for " << tid
2520 << " on " << s->con << dendl;
2521 s->con->revoke_rx_buffer(tid);
2522 }
2523
2524 ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
2525 << dendl;
2526 Op *op = p->second;
2527 if (op->onfinish) {
2528 num_in_flight--;
2529 op->onfinish->complete(r);
2530 op->onfinish = NULL;
2531 }
2532 _op_cancel_map_check(op);
2533 _finish_op(op, r);
2534 sl.unlock();
2535
2536 return 0;
2537}
2538
2539int Objecter::op_cancel(ceph_tid_t tid, int r)
2540{
2541 int ret = 0;
2542
2543 unique_lock wl(rwlock);
2544 ret = _op_cancel(tid, r);
2545
2546 return ret;
2547}
2548
2549int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
2550{
2551 unique_lock wl(rwlock);
2552 ldout(cct,10) << __func__ << " " << tids << dendl;
2553 for (auto tid : tids) {
2554 _op_cancel(tid, r);
2555 }
2556 return 0;
2557}
2558
2559int Objecter::_op_cancel(ceph_tid_t tid, int r)
2560{
2561 int ret = 0;
2562
2563 ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
2564 << dendl;
2565
2566start:
2567
2568 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2569 siter != osd_sessions.end(); ++siter) {
2570 OSDSession *s = siter->second;
2571 OSDSession::shared_lock sl(s->lock);
2572 if (s->ops.find(tid) != s->ops.end()) {
2573 sl.unlock();
2574 ret = op_cancel(s, tid, r);
2575 if (ret == -ENOENT) {
2576 /* oh no! raced, maybe tid moved to another session, restarting */
2577 goto start;
2578 }
2579 return ret;
2580 }
2581 }
2582
2583 ldout(cct, 5) << __func__ << ": tid " << tid
2584 << " not found in live sessions" << dendl;
2585
2586 // Handle case where the op is in homeless session
2587 OSDSession::shared_lock sl(homeless_session->lock);
2588 if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
2589 sl.unlock();
2590 ret = op_cancel(homeless_session, tid, r);
2591 if (ret == -ENOENT) {
2592 /* oh no! raced, maybe tid moved to another session, restarting */
2593 goto start;
2594 } else {
2595 return ret;
2596 }
2597 } else {
2598 sl.unlock();
2599 }
2600
2601 ldout(cct, 5) << __func__ << ": tid " << tid
2602 << " not found in homeless session" << dendl;
2603
2604 return ret;
2605}
2606
2607
2608epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
2609{
2610 unique_lock wl(rwlock);
2611
2612 std::vector<ceph_tid_t> to_cancel;
2613 bool found = false;
2614
2615 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2616 siter != osd_sessions.end(); ++siter) {
2617 OSDSession *s = siter->second;
2618 OSDSession::shared_lock sl(s->lock);
2619 for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
2620 op_i != s->ops.end(); ++op_i) {
2621 if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
2622 && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
2623 to_cancel.push_back(op_i->first);
2624 }
2625 }
2626 sl.unlock();
2627
2628 for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
2629 titer != to_cancel.end();
2630 ++titer) {
2631 int cancel_result = op_cancel(s, *titer, r);
2632 // We hold rwlock across search and cancellation, so cancels
2633 // should always succeed
2634 assert(cancel_result == 0);
2635 }
2636 if (!found && to_cancel.size())
2637 found = true;
2638 to_cancel.clear();
2639 }
2640
2641 const epoch_t epoch = osdmap->get_epoch();
2642
2643 wl.unlock();
2644
2645 if (found) {
2646 return epoch;
2647 } else {
2648 return -1;
2649 }
2650}
2651
2652bool Objecter::is_pg_changed(
2653 int oldprimary,
2654 const vector<int>& oldacting,
2655 int newprimary,
2656 const vector<int>& newacting,
2657 bool any_change)
2658{
2659 if (OSDMap::primary_changed(
2660 oldprimary,
2661 oldacting,
2662 newprimary,
2663 newacting))
2664 return true;
2665 if (any_change && oldacting != newacting)
2666 return true;
2667 return false; // same primary (tho replicas may have changed)
2668}
2669
2670bool Objecter::target_should_be_paused(op_target_t *t)
2671{
2672 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2673 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
2674 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
2675 _osdmap_full_flag() || _osdmap_pool_full(*pi);
2676
2677 return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
2678 (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
2679 (osdmap->get_epoch() < epoch_barrier);
2680}
2681
2682/**
2683 * Locking public accessor for _osdmap_full_flag
2684 */
2685bool Objecter::osdmap_full_flag() const
2686{
2687 shared_lock rl(rwlock);
2688
2689 return _osdmap_full_flag();
2690}
2691
2692bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2693{
2694 shared_lock rl(rwlock);
2695
2696 if (_osdmap_full_flag()) {
2697 return true;
2698 }
2699
2700 return _osdmap_pool_full(pool_id);
2701}
2702
2703bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2704{
2705 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2706 if (pool == NULL) {
2707 ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2708 return false;
2709 }
2710
2711 return _osdmap_pool_full(*pool);
2712}
2713
2714bool Objecter::_osdmap_has_pool_full() const
2715{
2716 for (map<int64_t, pg_pool_t>::const_iterator it
2717 = osdmap->get_pools().begin();
2718 it != osdmap->get_pools().end(); ++it) {
2719 if (_osdmap_pool_full(it->second))
2720 return true;
2721 }
2722 return false;
2723}
2724
2725bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
2726{
2727 return p.has_flag(pg_pool_t::FLAG_FULL) && honor_osdmap_full;
2728}
2729
2730/**
2731 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2732 */
2733bool Objecter::_osdmap_full_flag() const
2734{
2735 // Ignore the FULL flag if the caller has disabled honor_osdmap_full
2736 return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
2737}
2738
2739void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2740{
2741 for (map<int64_t, pg_pool_t>::const_iterator it
2742 = osdmap->get_pools().begin();
2743 it != osdmap->get_pools().end(); ++it) {
2744 if (pool_full_map.find(it->first) == pool_full_map.end()) {
2745 pool_full_map[it->first] = _osdmap_pool_full(it->second);
2746 } else {
2747 pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2748 pool_full_map[it->first];
2749 }
2750 }
2751}
2752
2753int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2754 const string& ns)
2755{
2756 shared_lock rl(rwlock);
2757 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2758 if (!p)
2759 return -ENOENT;
2760 return p->hash_key(key, ns);
2761}
2762
2763int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2764 const string& ns)
2765{
2766 shared_lock rl(rwlock);
2767 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2768 if (!p)
2769 return -ENOENT;
2770 return p->raw_hash_to_pg(p->hash_key(key, ns));
2771}
2772
2773int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
2774{
2775 // rwlock is locked
2776 bool is_read = t->flags & CEPH_OSD_FLAG_READ;
2777 bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
2778 t->epoch = osdmap->get_epoch();
2779 ldout(cct,20) << __func__ << " epoch " << t->epoch
2780 << " base " << t->base_oid << " " << t->base_oloc
2781 << " precalc_pgid " << (int)t->precalc_pgid
2782 << " pgid " << t->base_pgid
2783 << (is_read ? " is_read" : "")
2784 << (is_write ? " is_write" : "")
2785 << dendl;
2786
2787 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2788 if (!pi) {
2789 t->osd = -1;
2790 return RECALC_OP_TARGET_POOL_DNE;
2791 }
2792 ldout(cct,30) << __func__ << " base pi " << pi
2793 << " pg_num " << pi->get_pg_num() << dendl;
2794
2795 bool force_resend = false;
2796 if (osdmap->get_epoch() == pi->last_force_op_resend) {
2797 if (t->last_force_resend < pi->last_force_op_resend) {
2798 t->last_force_resend = pi->last_force_op_resend;
2799 force_resend = true;
2800 } else if (t->last_force_resend == 0) {
2801 force_resend = true;
2802 }
2803 }
2804
2805 // apply tiering
2806 t->target_oid = t->base_oid;
2807 t->target_oloc = t->base_oloc;
2808 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
2809 if (is_read && pi->has_read_tier())
2810 t->target_oloc.pool = pi->read_tier;
2811 if (is_write && pi->has_write_tier())
2812 t->target_oloc.pool = pi->write_tier;
2813 pi = osdmap->get_pg_pool(t->target_oloc.pool);
2814 if (!pi) {
2815 t->osd = -1;
2816 return RECALC_OP_TARGET_POOL_DNE;
2817 }
2818 }
2819
2820 pg_t pgid;
2821 if (t->precalc_pgid) {
2822 assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
2823 assert(t->base_oid.name.empty()); // make sure this is a pg op
2824 assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
2825 pgid = t->base_pgid;
2826 } else {
2827 int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
2828 pgid);
2829 if (ret == -ENOENT) {
2830 t->osd = -1;
2831 return RECALC_OP_TARGET_POOL_DNE;
2832 }
2833 }
2834 ldout(cct,20) << __func__ << " target " << t->target_oid << " "
2835 << t->target_oloc << " -> pgid " << pgid << dendl;
2836 ldout(cct,30) << __func__ << " target pi " << pi
2837 << " pg_num " << pi->get_pg_num() << dendl;
2838 t->pool_ever_existed = true;
2839
2840 int size = pi->size;
2841 int min_size = pi->min_size;
2842 unsigned pg_num = pi->get_pg_num();
2843 int up_primary, acting_primary;
2844 vector<int> up, acting;
2845 osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
2846 &acting, &acting_primary);
2847 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
2848 bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
2849 unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
2850 pg_t prev_pgid(prev_seed, pgid.pool());
2851 if (any_change && PastIntervals::is_new_interval(
2852 t->acting_primary,
2853 acting_primary,
2854 t->acting,
2855 acting,
2856 t->up_primary,
2857 up_primary,
2858 t->up,
2859 up,
2860 t->size,
2861 size,
2862 t->min_size,
2863 min_size,
2864 t->pg_num,
2865 pg_num,
2866 t->sort_bitwise,
2867 sort_bitwise,
2868 t->recovery_deletes,
2869 recovery_deletes,
2870 prev_pgid)) {
2871 force_resend = true;
2872 }
2873
2874 bool unpaused = false;
2875 if (t->paused && !target_should_be_paused(t)) {
2876 t->paused = false;
2877 unpaused = true;
2878 }
2879
2880 bool legacy_change =
2881 t->pgid != pgid ||
2882 is_pg_changed(
2883 t->acting_primary, t->acting, acting_primary, acting,
2884 t->used_replica || any_change);
2885 bool split = false;
2886 if (t->pg_num) {
2887 split = prev_pgid.is_split(t->pg_num, pg_num, nullptr);
2888 }
2889
2890 if (legacy_change || split || force_resend) {
2891 t->pgid = pgid;
2892 t->acting = acting;
2893 t->acting_primary = acting_primary;
2894 t->up_primary = up_primary;
2895 t->up = up;
2896 t->size = size;
2897 t->min_size = min_size;
2898 t->pg_num = pg_num;
2899 t->pg_num_mask = pi->get_pg_num_mask();
2900 osdmap->get_primary_shard(
2901 pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
2902 &t->actual_pgid);
2903 t->sort_bitwise = sort_bitwise;
2904 t->recovery_deletes = recovery_deletes;
2905 ldout(cct, 10) << __func__ << " "
2906 << " raw pgid " << pgid << " -> actual " << t->actual_pgid
2907 << " acting " << acting
2908 << " primary " << acting_primary << dendl;
2909 t->used_replica = false;
2910 if (acting_primary == -1) {
2911 t->osd = -1;
2912 } else {
2913 int osd;
2914 bool read = is_read && !is_write;
2915 if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
2916 int p = rand() % acting.size();
2917 if (p)
2918 t->used_replica = true;
2919 osd = acting[p];
2920 ldout(cct, 10) << " chose random osd." << osd << " of " << acting
2921 << dendl;
2922 } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
2923 acting.size() > 1) {
2924 // look for a local replica. prefer the primary if the
2925 // distance is the same.
2926 int best = -1;
2927 int best_locality = 0;
2928 for (unsigned i = 0; i < acting.size(); ++i) {
2929 int locality = osdmap->crush->get_common_ancestor_distance(
2930 cct, acting[i], crush_location);
2931 ldout(cct, 20) << __func__ << " localize: rank " << i
2932 << " osd." << acting[i]
2933 << " locality " << locality << dendl;
2934 if (i == 0 ||
2935 (locality >= 0 && best_locality >= 0 &&
2936 locality < best_locality) ||
2937 (best_locality < 0 && locality >= 0)) {
2938 best = i;
2939 best_locality = locality;
2940 if (i)
2941 t->used_replica = true;
2942 }
2943 }
2944 assert(best >= 0);
2945 osd = acting[best];
2946 } else {
2947 osd = acting_primary;
2948 }
2949 t->osd = osd;
2950 }
2951 }
2952 if (legacy_change || unpaused || force_resend) {
2953 return RECALC_OP_TARGET_NEED_RESEND;
2954 }
2955 if (split && con && con->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT)) {
2956 return RECALC_OP_TARGET_NEED_RESEND;
2957 }
2958 return RECALC_OP_TARGET_NO_ACTION;
2959}
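// How callers typically interpret the result above (an illustrative sketch
// only, with the actual handling elided):
//
//   switch (_calc_target(&op->target, nullptr, any_change)) {
//   case RECALC_OP_TARGET_NO_ACTION:   break; // mapping unchanged
//   case RECALC_OP_TARGET_NEED_RESEND: break; // re-home the op and resend
//   case RECALC_OP_TARGET_POOL_DNE:    break; // pool gone; wait for a newer map
//   }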
2960
2961int Objecter::_map_session(op_target_t *target, OSDSession **s,
2962 shunique_lock& sul)
2963{
2964 _calc_target(target, nullptr);
2965 return _get_session(target->osd, s, sul);
2966}
2967
2968void Objecter::_session_op_assign(OSDSession *to, Op *op)
2969{
2970 // to->lock is locked
2971 assert(op->session == NULL);
2972 assert(op->tid);
2973
2974 get_session(to);
2975 op->session = to;
2976 to->ops[op->tid] = op;
2977
2978 if (to->is_homeless()) {
2979 num_homeless_ops++;
2980 }
2981
2982 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
2983}
2984
2985void Objecter::_session_op_remove(OSDSession *from, Op *op)
2986{
2987 assert(op->session == from);
2988 // from->lock is locked
2989
2990 if (from->is_homeless()) {
2991 num_homeless_ops--;
2992 }
2993
2994 from->ops.erase(op->tid);
2995 put_session(from);
2996 op->session = NULL;
2997
2998 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
2999}
3000
3001void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
3002{
3003 // to lock is locked unique
3004 assert(op->session == NULL);
3005
3006 if (to->is_homeless()) {
3007 num_homeless_ops++;
3008 }
3009
3010 get_session(to);
3011 op->session = to;
3012 to->linger_ops[op->linger_id] = op;
3013
3014 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
3015 << dendl;
3016}
3017
3018void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
3019{
3020 assert(from == op->session);
3021 // from->lock is locked unique
3022
3023 if (from->is_homeless()) {
3024 num_homeless_ops--;
3025 }
3026
3027 from->linger_ops.erase(op->linger_id);
3028 put_session(from);
3029 op->session = NULL;
3030
3031 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
3032 << dendl;
3033}
3034
3035void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
3036{
3037 assert(from == op->session);
3038 // from->lock is locked
3039
3040 if (from->is_homeless()) {
3041 num_homeless_ops--;
3042 }
3043
3044 from->command_ops.erase(op->tid);
3045 put_session(from);
3046 op->session = NULL;
3047
3048 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3049}
3050
3051void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
3052{
3053 // to->lock is locked
3054 assert(op->session == NULL);
3055 assert(op->tid);
3056
3057 if (to->is_homeless()) {
3058 num_homeless_ops++;
3059 }
3060
3061 get_session(to);
3062 op->session = to;
3063 to->command_ops[op->tid] = op;
3064
3065 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3066}
3067
3068int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
3069 shunique_lock& sul)
3070{
3071 // rwlock is locked unique
3072
3073 int r = _calc_target(&linger_op->target, nullptr, true);
3074 if (r == RECALC_OP_TARGET_NEED_RESEND) {
3075 ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
3076 << " pgid " << linger_op->target.pgid
3077 << " acting " << linger_op->target.acting << dendl;
3078
3079 OSDSession *s = NULL;
3080 r = _get_session(linger_op->target.osd, &s, sul);
3081 assert(r == 0);
3082
3083 if (linger_op->session != s) {
3084 // NB locking two sessions (s and linger_op->session) at the
3085 // same time here is only safe because we are the only one that
3086 // takes two, and we are holding rwlock for write. Disable
3087 // lockdep because it doesn't know that.
3088 OSDSession::unique_lock sl(s->lock);
3089 _session_linger_op_remove(linger_op->session, linger_op);
3090 _session_linger_op_assign(s, linger_op);
3091 }
3092
3093 put_session(s);
3094 return RECALC_OP_TARGET_NEED_RESEND;
3095 }
3096 return r;
3097}
3098
3099void Objecter::_cancel_linger_op(Op *op)
3100{
3101 ldout(cct, 15) << "cancel_op " << op->tid << dendl;
3102
3103 assert(!op->should_resend);
3104 if (op->onfinish) {
3105 delete op->onfinish;
3106 num_in_flight--;
3107 }
3108
3109 _finish_op(op, 0);
3110}
3111
3112void Objecter::_finish_op(Op *op, int r)
3113{
3114 ldout(cct, 15) << "finish_op " << op->tid << dendl;
3115
3116 // op->session->lock is locked unique or op->session is null
3117
3118 if (!op->ctx_budgeted && op->budgeted)
3119 put_op_budget(op);
3120
3121 if (op->ontimeout && r != -ETIMEDOUT)
3122 timer.cancel_event(op->ontimeout);
3123
3124 if (op->session) {
3125 _session_op_remove(op->session, op);
3126 }
3127
3128 logger->dec(l_osdc_op_active);
3129
3130 assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
3131
3132 inflight_ops--;
3133
3134 op->put();
3135}
3136
3137void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
3138{
3139 ldout(cct, 15) << "finish_op " << tid << dendl;
3140 shared_lock rl(rwlock);
3141
3142 OSDSession::unique_lock wl(session->lock);
3143
3144 map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
3145 if (iter == session->ops.end())
3146 return;
3147
3148 Op *op = iter->second;
3149
3150 _finish_op(op, 0);
3151}
3152
3153MOSDOp *Objecter::_prepare_osd_op(Op *op)
3154{
3155 // rwlock is locked
3156
3157 int flags = op->target.flags;
3158 flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
3159
3160 // Nothing checks this any longer, but needed for compatibility with
3161 // pre-luminous osds
3162 flags |= CEPH_OSD_FLAG_ONDISK;
3163
3164 if (!honor_osdmap_full)
3165 flags |= CEPH_OSD_FLAG_FULL_FORCE;
3166
3167 op->target.paused = false;
3168 op->stamp = ceph::mono_clock::now();
3169
3170 hobject_t hobj = op->target.get_hobj();
3171 MOSDOp *m = new MOSDOp(client_inc, op->tid,
3172 hobj, op->target.actual_pgid,
3173 osdmap->get_epoch(),
3174 flags, op->features);
3175
3176 m->set_snapid(op->snapid);
3177 m->set_snap_seq(op->snapc.seq);
3178 m->set_snaps(op->snapc.snaps);
3179
3180 m->ops = op->ops;
3181 m->set_mtime(op->mtime);
3182 m->set_retry_attempt(op->attempts++);
3183
3184 if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
3185 op->trace.init("op", &trace_endpoint);
3186 }
3187
3188 if (op->priority)
3189 m->set_priority(op->priority);
3190 else
3191 m->set_priority(cct->_conf->osd_client_op_priority);
3192
3193 if (op->reqid != osd_reqid_t()) {
3194 m->set_reqid(op->reqid);
3195 }
3196
3197 logger->inc(l_osdc_op_send);
3198 ssize_t sum = 0;
3199 for (unsigned i = 0; i < m->ops.size(); i++) {
3200 sum += m->ops[i].indata.length();
3201 }
3202 logger->inc(l_osdc_op_send_bytes, sum);
3203
3204 return m;
3205}
3206
3207void Objecter::_send_op(Op *op, MOSDOp *m)
3208{
3209 // rwlock is locked
3210 // op->session->lock is locked
3211
3212 // backoff?
3213 auto p = op->session->backoffs.find(op->target.actual_pgid);
3214 if (p != op->session->backoffs.end()) {
3215 hobject_t hoid = op->target.get_hobj();
3216 auto q = p->second.lower_bound(hoid);
3217 if (q != p->second.begin()) {
3218 --q;
3219 if (hoid >= q->second.end) {
3220 ++q;
3221 }
3222 }
3223 if (q != p->second.end()) {
3224 ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
3225 << "," << q->second.end << ")" << dendl;
3226 int r = cmp(hoid, q->second.begin);
3227 if (r == 0 || (r > 0 && hoid < q->second.end)) {
3228 ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
3229 << " id " << q->second.id << " on " << hoid
3230 << ", queuing " << op << " tid " << op->tid << dendl;
3231 return;
3232 }
3233 }
3234 }
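  // Sketch of the interval check above (illustrative): per-pg backoffs are
  // keyed by their 'begin' hobject, so lower_bound(hoid) lands on the first
  // interval starting at or after hoid and we step back once to also catch
  //
  //     [q->begin ............ q->end)
  //                    ^ hoid
  //
  // An op whose target hobject falls inside such an interval stays queued on
  // the session until the matching UNBLOCK arrives (see handle_osd_backoff).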
3235
3236 if (!m) {
3237 assert(op->tid > 0);
3238 m = _prepare_osd_op(op);
3239 }
3240
3241 if (op->target.actual_pgid != m->get_spg()) {
3242 ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
3243 << m->get_spg() << " to " << op->target.actual_pgid
3244 << ", updating and reencoding" << dendl;
3245 m->set_spg(op->target.actual_pgid);
3246 m->clear_payload(); // reencode
3247 }
3248
3249 ldout(cct, 15) << "_send_op " << op->tid << " to "
3250 << op->target.actual_pgid << " on osd." << op->session->osd
3251 << dendl;
3252
3253 ConnectionRef con = op->session->con;
3254 assert(con);
3255
3256 // preallocated rx buffer?
3257 if (op->con) {
3258 ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
3259 << op->con << dendl;
3260 op->con->revoke_rx_buffer(op->tid);
3261 }
3262 if (op->outbl &&
3263 op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582
3264 op->outbl->length()) {
3265 ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
3266 << dendl;
3267 op->con = con;
3268 op->con->post_rx_buffer(op->tid, *op->outbl);
3269 }
3270
3271 op->incarnation = op->session->incarnation;
3272
3273 m->set_tid(op->tid);
3274
3275 if (op->trace.valid()) {
3276 m->trace.init("op msg", nullptr, &op->trace);
3277 }
3278 op->session->con->send_message(m);
3279}
3280
3281int Objecter::calc_op_budget(Op *op)
3282{
3283 int op_budget = 0;
3284 for (vector<OSDOp>::iterator i = op->ops.begin();
3285 i != op->ops.end();
3286 ++i) {
3287 if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3288 op_budget += i->indata.length();
3289 } else if (ceph_osd_op_mode_read(i->op.op)) {
3290 if (ceph_osd_op_type_data(i->op.op)) {
3291 if ((int64_t)i->op.extent.length > 0)
3292 op_budget += (int64_t)i->op.extent.length;
3293 } else if (ceph_osd_op_type_attr(i->op.op)) {
3294 op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
3295 }
3296 }
3297 }
3298 return op_budget;
3299}
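// Worked example of the accounting above (illustrative numbers): an op with a
// 4096-byte write and a getxattr whose name/value lengths are 16 and 200
// would be budgeted as 4096 + 16 + 200 = 4312 bytes; _throttle_op() then
// charges that against op_throttle_bytes plus one unit of op_throttle_ops.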
3300
3301void Objecter::_throttle_op(Op *op,
3302 shunique_lock& sul,
3303 int op_budget)
3304{
3305 assert(sul && sul.mutex() == &rwlock);
3306 bool locked_for_write = sul.owns_lock();
3307
3308 if (!op_budget)
3309 op_budget = calc_op_budget(op);
3310 if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3311 sul.unlock();
3312 op_throttle_bytes.get(op_budget);
3313 if (locked_for_write)
3314 sul.lock();
3315 else
3316 sul.lock_shared();
3317 }
3318 if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3319 sul.unlock();
3320 op_throttle_ops.get(1);
3321 if (locked_for_write)
3322 sul.lock();
3323 else
3324 sul.lock_shared();
3325 }
3326}
3327
3328void Objecter::unregister_op(Op *op)
3329{
3330 OSDSession::unique_lock sl(op->session->lock);
3331 op->session->ops.erase(op->tid);
3332 sl.unlock();
3333 put_session(op->session);
3334 op->session = NULL;
3335
3336 inflight_ops--;
3337}
3338
3339/* This function DOES put the passed message before returning */
3340void Objecter::handle_osd_op_reply(MOSDOpReply *m)
3341{
3342 ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
3343
3344 // get pio
3345 ceph_tid_t tid = m->get_tid();
3346
3347 shunique_lock sul(rwlock, ceph::acquire_shared);
3348 if (!initialized) {
3349 m->put();
3350 return;
3351 }
3352
3353 ConnectionRef con = m->get_connection();
3354 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
3355 if (!s || s->con != con) {
3356 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3357 if (s) {
3358 s->put();
3359 }
3360 m->put();
3361 return;
3362 }
3363
3364 OSDSession::unique_lock sl(s->lock);
3365
3366 map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
3367 if (iter == s->ops.end()) {
3368 ldout(cct, 7) << "handle_osd_op_reply " << tid
3369 << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
3370 " onnvram" : " ack"))
3371 << " ... stray" << dendl;
3372 sl.unlock();
3373 put_session(s);
3374 m->put();
3375 return;
3376 }
3377
3378 ldout(cct, 7) << "handle_osd_op_reply " << tid
3379 << (m->is_ondisk() ? " ondisk" :
3380 (m->is_onnvram() ? " onnvram" : " ack"))
3381 << " uv " << m->get_user_version()
3382 << " in " << m->get_pg()
3383 << " attempt " << m->get_retry_attempt()
3384 << dendl;
3385 Op *op = iter->second;
3386 op->trace.event("osd op reply");
3387
3388 if (retry_writes_after_first_reply && op->attempts == 1 &&
3389 (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
3390 ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
3391 if (op->onfinish) {
3392 num_in_flight--;
3393 }
3394 _session_op_remove(s, op);
3395 sl.unlock();
3396 put_session(s);
3397
3398 _op_submit(op, sul, NULL);
3399 m->put();
3400 return;
3401 }
3402
3403 if (m->get_retry_attempt() >= 0) {
3404 if (m->get_retry_attempt() != (op->attempts - 1)) {
3405 ldout(cct, 7) << " ignoring reply from attempt "
3406 << m->get_retry_attempt()
3407 << " from " << m->get_source_inst()
3408 << "; last attempt " << (op->attempts - 1) << " sent to "
3409 << op->session->con->get_peer_addr() << dendl;
3410 m->put();
3411 sl.unlock();
3412 put_session(s);
3413 return;
3414 }
3415 } else {
3416 // we don't know the request attempt because the server is old, so
3417 // just accept this one. we may do ACK callbacks we shouldn't
3418 // have, but that is better than doing callbacks out of order.
3419 }
3420
3421 Context *onfinish = 0;
3422
3423 int rc = m->get_result();
3424
3425 if (m->is_redirect_reply()) {
3426 ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
3427 if (op->onfinish)
3428 num_in_flight--;
3429 _session_op_remove(s, op);
3430 sl.unlock();
3431 put_session(s);
3432
3433 // FIXME: two redirects could race and reorder
3434
3435 op->tid = 0;
3436 m->get_redirect().combine_with_locator(op->target.target_oloc,
3437 op->target.target_oid.name);
3438 op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED | CEPH_OSD_FLAG_IGNORE_OVERLAY);
3439 _op_submit(op, sul, NULL);
3440 m->put();
3441 return;
3442 }
3443
3444 if (rc == -EAGAIN) {
3445 ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
3446 if (op->onfinish)
3447 num_in_flight--;
3448 _session_op_remove(s, op);
3449 sl.unlock();
3450 put_session(s);
3451
3452 op->tid = 0;
3453 op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3454 CEPH_OSD_FLAG_LOCALIZE_READS);
3455 op->target.pgid = pg_t();
3456 _op_submit(op, sul, NULL);
3457 m->put();
3458 return;
3459 }
3460
3461 sul.unlock();
3462
3463 if (op->objver)
3464 *op->objver = m->get_user_version();
3465 if (op->reply_epoch)
3466 *op->reply_epoch = m->get_map_epoch();
3467 if (op->data_offset)
3468 *op->data_offset = m->get_header().data_off;
3469
3470 // got data?
3471 if (op->outbl) {
3472 if (op->con)
3473 op->con->revoke_rx_buffer(op->tid);
3474 m->claim_data(*op->outbl);
3475 op->outbl = 0;
3476 }
3477
3478 // per-op result demuxing
3479 vector<OSDOp> out_ops;
3480 m->claim_ops(out_ops);
3481
3482 if (out_ops.size() != op->ops.size())
3483 ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
3484 << " != request ops " << op->ops
3485 << " from " << m->get_source_inst() << dendl;
3486
3487 vector<bufferlist*>::iterator pb = op->out_bl.begin();
3488 vector<int*>::iterator pr = op->out_rval.begin();
3489 vector<Context*>::iterator ph = op->out_handler.begin();
3490 assert(op->out_bl.size() == op->out_rval.size());
3491 assert(op->out_bl.size() == op->out_handler.size());
3492 vector<OSDOp>::iterator p = out_ops.begin();
3493 for (unsigned i = 0;
3494 p != out_ops.end() && pb != op->out_bl.end();
3495 ++i, ++p, ++pb, ++pr, ++ph) {
3496 ldout(cct, 10) << " op " << i << " rval " << p->rval
3497 << " len " << p->outdata.length() << dendl;
3498 if (*pb)
3499 **pb = p->outdata;
3500 // set rval before running handlers so that handlers
3501 // can change it if e.g. decoding fails
3502 if (*pr)
3503 **pr = ceph_to_hostos_errno(p->rval);
3504 if (*ph) {
3505 ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
3506 (*ph)->complete(ceph_to_hostos_errno(p->rval));
3507 *ph = NULL;
3508 }
3509 }
3510
3511 // NOTE: we assume that since we only request ONDISK ever we will
3512 // only ever get back one (type of) ack ever.
3513
3514 if (op->onfinish) {
3515 num_in_flight--;
3516 onfinish = op->onfinish;
3517 op->onfinish = NULL;
3518 }
3519 logger->inc(l_osdc_op_reply);
3520
3521 /* get it before we call _finish_op() */
3522 auto completion_lock = s->get_lock(op->target.base_oid);
3523
3524 ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
3525 _finish_op(op, 0);
3526
3527 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
3528
3529 // serialize completions
3530 if (completion_lock.mutex()) {
3531 completion_lock.lock();
3532 }
3533 sl.unlock();
3534
3535 // do callbacks
3536 if (onfinish) {
3537 onfinish->complete(rc);
3538 }
3539 if (completion_lock.mutex()) {
3540 completion_lock.unlock();
3541 }
3542
3543 m->put();
3544 put_session(s);
3545}
3546
3547void Objecter::handle_osd_backoff(MOSDBackoff *m)
3548{
3549 ldout(cct, 10) << __func__ << " " << *m << dendl;
3550 shunique_lock sul(rwlock, ceph::acquire_shared);
3551 if (!initialized) {
3552 m->put();
3553 return;
3554 }
3555
3556 ConnectionRef con = m->get_connection();
3557 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
3558 if (!s || s->con != con) {
3559 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3560 if (s)
3561 s->put();
3562 m->put();
3563 return;
3564 }
3565
3566 get_session(s);
3567 s->put(); // from get_priv() above
3568
3569 OSDSession::unique_lock sl(s->lock);
3570
3571 switch (m->op) {
3572 case CEPH_OSD_BACKOFF_OP_BLOCK:
3573 {
3574 // register
3575 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3576 s->backoffs_by_id.insert(make_pair(m->id, &b));
3577 b.pgid = m->pgid;
3578 b.id = m->id;
3579 b.begin = m->begin;
3580 b.end = m->end;
3581
3582 // ack with original backoff's epoch so that the osd can discard this if
3583 // there was a pg split.
3584 Message *r = new MOSDBackoff(m->pgid,
3585 m->map_epoch,
3586 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3587 m->id, m->begin, m->end);
3588 // this priority must match the MOSDOps from _prepare_osd_op
3589 r->set_priority(cct->_conf->osd_client_op_priority);
3590 con->send_message(r);
3591 }
3592 break;
3593
3594 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3595 {
3596 auto p = s->backoffs_by_id.find(m->id);
3597 if (p != s->backoffs_by_id.end()) {
3598 OSDBackoff *b = p->second;
3599 if (b->begin != m->begin &&
3600 b->end != m->end) {
3601 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3602 << " unblock on ["
3603 << m->begin << "," << m->end << ") but backoff is ["
3604 << b->begin << "," << b->end << ")" << dendl;
3605 // hrmpf, unblock it anyway.
3606 }
3607 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3608 << " id " << b->id
3609 << " [" << b->begin << "," << b->end
3610 << ")" << dendl;
3611 auto spgp = s->backoffs.find(b->pgid);
3612 assert(spgp != s->backoffs.end());
3613 spgp->second.erase(b->begin);
3614 if (spgp->second.empty()) {
3615 s->backoffs.erase(spgp);
3616 }
3617 s->backoffs_by_id.erase(p);
3618
3619 // check for any ops to resend
3620 for (auto& q : s->ops) {
3621 if (q.second->target.actual_pgid == m->pgid) {
3622 int r = q.second->target.contained_by(m->begin, m->end);
3623 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3624 << q.second->target.get_hobj() << dendl;
3625 if (r) {
3626 _send_op(q.second);
3627 }
3628 }
3629 }
3630 } else {
3631 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3632 << " unblock on ["
3633 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3634 }
3635 }
3636 break;
3637
3638 default:
3639 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3640 }
3641
3642 sul.unlock();
3643 sl.unlock();
3644
3645 m->put();
3646 put_session(s);
3647}
3648
3649uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3650 uint32_t pos)
3651{
3652 shared_lock rl(rwlock);
3653 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3654 pos, list_context->pool_id, string());
3655 ldout(cct, 10) << __func__ << list_context
3656 << " pos " << pos << " -> " << list_context->pos << dendl;
3657 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3658 list_context->current_pg = actual.ps();
3659 list_context->at_end_of_pool = false;
3660 return pos;
3661}
3662
3663uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3664 const hobject_t& cursor)
3665{
3666 shared_lock rl(rwlock);
3667 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3668 list_context->pos = cursor;
3669 list_context->at_end_of_pool = false;
3670 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3671 list_context->current_pg = actual.ps();
3672 list_context->sort_bitwise = true;
3673 return list_context->current_pg;
3674}
3675
3676void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3677 hobject_t *cursor)
3678{
3679 shared_lock rl(rwlock);
3680 if (list_context->list.empty()) {
3681 *cursor = list_context->pos;
3682 } else {
3683 const librados::ListObjectImpl& entry = list_context->list.front();
3684 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3685 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3686 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3687 }
3688}
3689
3690void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3691{
3692 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3693 << " pool_snap_seq " << list_context->pool_snap_seq
3694 << " max_entries " << list_context->max_entries
3695 << " list_context " << list_context
3696 << " onfinish " << onfinish
3697 << " current_pg " << list_context->current_pg
3698 << " pos " << list_context->pos << dendl;
3699
3700 shared_lock rl(rwlock);
3701 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3702 if (!pool) { // pool is gone
3703 rl.unlock();
3704 put_nlist_context_budget(list_context);
3705 onfinish->complete(-ENOENT);
3706 return;
3707 }
3708 int pg_num = pool->get_pg_num();
3709 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3710
3711 if (list_context->pos.is_min()) {
3712 list_context->starting_pg_num = 0;
3713 list_context->sort_bitwise = sort_bitwise;
3714 list_context->starting_pg_num = pg_num;
3715 }
3716 if (list_context->sort_bitwise != sort_bitwise) {
3717 list_context->pos = hobject_t(
3718 object_t(), string(), CEPH_NOSNAP,
3719 list_context->current_pg, list_context->pool_id, string());
3720 list_context->sort_bitwise = sort_bitwise;
3721 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3722 << list_context->pos << dendl;
3723 }
3724 if (list_context->starting_pg_num != pg_num) {
3725 if (!sort_bitwise) {
3726 // start reading from the beginning; the pgs have changed
3727 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3728 list_context->pos = collection_list_handle_t();
3729 }
3730 list_context->starting_pg_num = pg_num;
3731 }
3732
3733 if (list_context->pos.is_max()) {
3734 ldout(cct, 20) << __func__ << " end of pool, list "
3735 << list_context->list << dendl;
3736 if (list_context->list.empty()) {
3737 list_context->at_end_of_pool = true;
3738 }
3739 // release the listing context's budget once all
3740 // OPs (in the session) are finished
3741 put_nlist_context_budget(list_context);
3742 onfinish->complete(0);
3743 return;
3744 }
3745
3746 ObjectOperation op;
3747 op.pg_nls(list_context->max_entries, list_context->filter,
3748 list_context->pos, osdmap->get_epoch());
3749 list_context->bl.clear();
3750 C_NList *onack = new C_NList(list_context, onfinish, this);
3751 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3752
3753 // note current_pg in case we don't have (or lose) SORTBITWISE
3754 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3755 rl.unlock();
3756
3757 pg_read(list_context->current_pg, oloc, op,
3758 &list_context->bl, 0, onack, &onack->epoch,
3759 &list_context->ctx_budget);
3760}
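// Minimal listing loop over the API above (a sketch; everything except
// list_nobjects() and the NListContext fields shown is an assumption of the
// example, and error handling is elided):
//
//   NListContext lc;
//   lc.pool_id = pool_id;
//   lc.max_entries = 1024;
//   while (!lc.at_end_of_pool) {
//     C_SaferCond cond;
//     objecter->list_nobjects(&lc, &cond);
//     cond.wait();                  // entries accumulate in lc.list
//     consume(lc.list);
//     lc.list.clear();
//   }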
3761
3762void Objecter::_nlist_reply(NListContext *list_context, int r,
3763 Context *final_finish, epoch_t reply_epoch)
3764{
3765 ldout(cct, 10) << __func__ << " " << list_context << dendl;
3766
3767 bufferlist::iterator iter = list_context->bl.begin();
3768 pg_nls_response_t response;
3769 bufferlist extra_info;
3770 ::decode(response, iter);
3771 if (!iter.end()) {
3772 ::decode(extra_info, iter);
3773 }
3774
3775 // if the osd returns 1 (newer code), or handle MAX, it means we
3776 // hit the end of the pg.
3777 if ((response.handle.is_max() || r == 1) &&
3778 !list_context->sort_bitwise) {
3779 // legacy OSD and !sortbitwise, figure out the next PG on our own
3780 ++list_context->current_pg;
3781 if (list_context->current_pg == list_context->starting_pg_num) {
3782 // end of pool
3783 list_context->pos = hobject_t::get_max();
3784 } else {
3785 // next pg
3786 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3787 list_context->current_pg,
3788 list_context->pool_id, string());
3789 }
3790 } else {
3791 list_context->pos = response.handle;
3792 }
3793
3794 int response_size = response.entries.size();
3795 ldout(cct, 20) << " response.entries.size " << response_size
3796 << ", response.entries " << response.entries
3797 << ", handle " << response.handle
3798 << ", tentative new pos " << list_context->pos << dendl;
3799 list_context->extra_info.append(extra_info);
3800 if (response_size) {
3801 list_context->list.splice(list_context->list.end(), response.entries);
3802 }
3803
3804 if (list_context->list.size() >= list_context->max_entries) {
3805 ldout(cct, 20) << " hit max, returning results so far, "
3806 << list_context->list << dendl;
3807 // release the listing context's budget once all
3808 // OPs (in the session) are finished
3809 put_nlist_context_budget(list_context);
3810 final_finish->complete(0);
3811 return;
3812 }
3813
3814 // continue!
3815 list_nobjects(list_context, final_finish);
3816}
3817
3818void Objecter::put_nlist_context_budget(NListContext *list_context)
3819{
3820 if (list_context->ctx_budget >= 0) {
3821 ldout(cct, 10) << " release listing context's budget " <<
3822 list_context->ctx_budget << dendl;
3823 put_op_budget_bytes(list_context->ctx_budget);
3824 list_context->ctx_budget = -1;
3825 }
3826}
3827
3828// snapshots
3829
3830int Objecter::create_pool_snap(int64_t pool, string& snap_name,
3831 Context *onfinish)
3832{
3833 unique_lock wl(rwlock);
3834 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3835 << snap_name << dendl;
3836
3837 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3838 if (!p)
3839 return -EINVAL;
3840 if (p->snap_exists(snap_name.c_str()))
3841 return -EEXIST;
3842
3843 PoolOp *op = new PoolOp;
3844 if (!op)
3845 return -ENOMEM;
3846 op->tid = ++last_tid;
3847 op->pool = pool;
3848 op->name = snap_name;
3849 op->onfinish = onfinish;
3850 op->pool_op = POOL_OP_CREATE_SNAP;
3851 pool_ops[op->tid] = op;
3852
3853 pool_op_submit(op);
3854
3855 return 0;
3856}
3857
3858struct C_SelfmanagedSnap : public Context {
3859 bufferlist bl;
3860 snapid_t *psnapid;
3861 Context *fin;
3862 C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
3863 void finish(int r) override {
3864 if (r == 0) {
3865 bufferlist::iterator p = bl.begin();
3866 ::decode(*psnapid, p);
3867 }
3868 fin->complete(r);
3869 }
3870};
3871
3872int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
3873 Context *onfinish)
3874{
3875 unique_lock wl(rwlock);
3876 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
3877 PoolOp *op = new PoolOp;
3878 if (!op) return -ENOMEM;
3879 op->tid = ++last_tid;
3880 op->pool = pool;
3881 C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
3882 op->onfinish = fin;
3883 op->blp = &fin->bl;
3884 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3885 pool_ops[op->tid] = op;
3886
3887 pool_op_submit(op);
3888 return 0;
3889}
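// Illustrative synchronous wrapper for the call above; C_SaferCond and the
// wrapper name are assumptions of this sketch. The monitor reply is decoded
// into *out by C_SelfmanagedSnap before the waiter is released.
static int example_alloc_selfmanaged_snap(Objecter *objecter, int64_t pool,
                                          snapid_t *out)
{
  C_SaferCond done;
  int r = objecter->allocate_selfmanaged_snap(pool, out, &done);
  if (r < 0)
    return r;          // submission failed before anything was sent
  return done.wait();  // monitor reply code; 0 on success
}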
3890
3891int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
3892 Context *onfinish)
3893{
3894 unique_lock wl(rwlock);
3895 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3896 << snap_name << dendl;
3897
3898 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3899 if (!p)
3900 return -EINVAL;
3901 if (!p->snap_exists(snap_name.c_str()))
3902 return -ENOENT;
3903
3904 PoolOp *op = new PoolOp;
3905 if (!op)
3906 return -ENOMEM;
3907 op->tid = ++last_tid;
3908 op->pool = pool;
3909 op->name = snap_name;
3910 op->onfinish = onfinish;
3911 op->pool_op = POOL_OP_DELETE_SNAP;
3912 pool_ops[op->tid] = op;
3913
3914 pool_op_submit(op);
3915
3916 return 0;
3917}
3918
3919int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3920 Context *onfinish)
3921{
3922 unique_lock wl(rwlock);
3923 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3924 << snap << dendl;
3925 PoolOp *op = new PoolOp;
3926 if (!op) return -ENOMEM;
3927 op->tid = ++last_tid;
3928 op->pool = pool;
3929 op->onfinish = onfinish;
3930 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3931 op->snapid = snap;
3932 pool_ops[op->tid] = op;
3933
3934 pool_op_submit(op);
3935
3936 return 0;
3937}
3938
3939int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
3940 int crush_rule)
3941{
3942 unique_lock wl(rwlock);
3943 ldout(cct, 10) << "create_pool name=" << name << dendl;
3944
3945 if (osdmap->lookup_pg_pool_name(name) >= 0)
3946 return -EEXIST;
3947
3948 PoolOp *op = new PoolOp;
3949 if (!op)
3950 return -ENOMEM;
3951 op->tid = ++last_tid;
3952 op->pool = 0;
3953 op->name = name;
3954 op->onfinish = onfinish;
3955 op->pool_op = POOL_OP_CREATE;
3956 pool_ops[op->tid] = op;
3957 op->auid = auid;
3958 op->crush_rule = crush_rule;
3959
3960 pool_op_submit(op);
3961
3962 return 0;
3963}
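// A matching usage sketch, illustrative only: it assumes auid and crush_rule
// have default arguments in the declaration (otherwise pass them explicitly)
// and uses C_SaferCond to wait for the monitor's acknowledgement.
static int example_create_pool(Objecter *objecter, string name)
{
  C_SaferCond done;
  int r = objecter->create_pool(name, &done);
  if (r == -EEXIST)
    return 0;          // our osdmap already shows the pool
  if (r < 0)
    return r;
  return done.wait();  // reply code from handle_pool_op_reply()
}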
3964
3965int Objecter::delete_pool(int64_t pool, Context *onfinish)
3966{
3967 unique_lock wl(rwlock);
3968 ldout(cct, 10) << "delete_pool " << pool << dendl;
3969
3970 if (!osdmap->have_pg_pool(pool))
3971 return -ENOENT;
3972
3973 _do_delete_pool(pool, onfinish);
3974 return 0;
3975}
3976
3977int Objecter::delete_pool(const string &pool_name, Context *onfinish)
3978{
3979 unique_lock wl(rwlock);
3980 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
3981
3982 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
3983 if (pool < 0)
3984 return pool;
3985
3986 _do_delete_pool(pool, onfinish);
3987 return 0;
3988}
3989
3990void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
3991{
3992 PoolOp *op = new PoolOp;
3993 op->tid = ++last_tid;
3994 op->pool = pool;
3995 op->name = "delete";
3996 op->onfinish = onfinish;
3997 op->pool_op = POOL_OP_DELETE;
3998 pool_ops[op->tid] = op;
3999 pool_op_submit(op);
4000}
4001
4002/**
4003 * change the auid owner of a pool by contacting the monitor.
4004 * This requires the current connection to have write permissions
4005 * on both the pool's current auid and the new (parameter) auid.
4006 * Uses the standard Context callback when done.
4007 */
4008int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
4009{
4010 unique_lock wl(rwlock);
4011 ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
4012 PoolOp *op = new PoolOp;
4013 if (!op) return -ENOMEM;
4014 op->tid = ++last_tid;
4015 op->pool = pool;
4016 op->name = "change_pool_auid";
4017 op->onfinish = onfinish;
4018 op->pool_op = POOL_OP_AUID_CHANGE;
4019 op->auid = auid;
4020 pool_ops[op->tid] = op;
4021
4022 logger->set(l_osdc_poolop_active, pool_ops.size());
4023
4024 pool_op_submit(op);
4025 return 0;
4026}
4027
4028void Objecter::pool_op_submit(PoolOp *op)
4029{
4030 // rwlock is locked
4031 if (mon_timeout > timespan(0)) {
4032 op->ontimeout = timer.add_event(mon_timeout,
4033 [this, op]() {
4034 pool_op_cancel(op->tid, -ETIMEDOUT); });
4035 }
4036 _pool_op_submit(op);
4037}
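// Lifecycle note: the timeout event and the PoolOp reference each other. If
// the monitor never answers, the timer fires pool_op_cancel(tid, -ETIMEDOUT),
// which completes onfinish and tears the op down; if a reply (or an explicit
// cancel) wins the race instead, _finish_pool_op() cancels the still-pending
// timer, so each PoolOp is completed exactly once.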
4038
4039void Objecter::_pool_op_submit(PoolOp *op)
4040{
4041 // rwlock is locked unique
4042
4043 ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
4044 MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4045 op->name, op->pool_op,
4046 op->auid, last_seen_osdmap_version);
4047 if (op->snapid) m->snapid = op->snapid;
4048 if (op->crush_rule) m->crush_rule = op->crush_rule;
4049 monc->send_mon_message(m);
4050 op->last_submit = ceph::mono_clock::now();
4051
4052 logger->inc(l_osdc_poolop_send);
4053}
4054
4055/**
4056 * Handle a reply to a PoolOp message. Check that we sent the message
4057 * and give the caller responsibility for the returned bufferlist.
4058 * Then either call the finisher or stash the PoolOp, depending on whether we
4059 * have a new enough map.
4060 * Lastly, clean up the message and PoolOp.
4061 */
4062void Objecter::handle_pool_op_reply(MPoolOpReply *m)
4063{
4064 FUNCTRACE();
4065 shunique_lock sul(rwlock, acquire_shared);
4066 if (!initialized) {
4067 sul.unlock();
4068 m->put();
4069 return;
4070 }
4071
4072 ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
4073 ceph_tid_t tid = m->get_tid();
4074 map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
4075 if (iter != pool_ops.end()) {
4076 PoolOp *op = iter->second;
4077 ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
4078 << ceph_pool_op_name(op->pool_op) << dendl;
4079 if (op->blp)
4080 op->blp->claim(m->response_data);
4081 if (m->version > last_seen_osdmap_version)
4082 last_seen_osdmap_version = m->version;
4083 if (osdmap->get_epoch() < m->epoch) {
4084 sul.unlock();
4085 sul.lock();
4086 // recheck op existence since we have let go of rwlock
4087 // (for promotion) above.
4088 iter = pool_ops.find(tid);
4089 if (iter == pool_ops.end())
4090 goto done; // op is gone.
4091 if (osdmap->get_epoch() < m->epoch) {
4092 ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
4093 << " before calling back" << dendl;
4094 _wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
4095 } else {
4096 // map epoch changed, probably because a MOSDMap message
4097 // sneaked in. Do caller-specified callback now or else
4098 // we lose it forever.
4099 assert(op->onfinish);
4100 op->onfinish->complete(m->replyCode);
4101 }
4102 } else {
4103 assert(op->onfinish);
4104 op->onfinish->complete(m->replyCode);
4105 }
4106 op->onfinish = NULL;
4107 if (!sul.owns_lock()) {
4108 sul.unlock();
4109 sul.lock();
4110 }
4111 iter = pool_ops.find(tid);
4112 if (iter != pool_ops.end()) {
4113 _finish_pool_op(op, 0);
4114 }
4115 } else {
4116 ldout(cct, 10) << "unknown request " << tid << dendl;
4117 }
4118
4119done:
4120 // Not strictly necessary, since we'll release it on return.
4121 sul.unlock();
4122
4123 ldout(cct, 10) << "done" << dendl;
4124 m->put();
4125}
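// The unlock()/lock() pair above is the usual "promote by re-acquiring"
// idiom: a shared lock cannot be upgraded in place, so it is dropped, the
// lock is re-taken exclusively, and everything looked up under the shared
// lock has to be revalidated (here, that the tid is still in pool_ops). A
// generic sketch of the same pattern using C++17's standard library; the
// names and types below are illustrative, not taken from this file.
#include <map>
#include <mutex>
#include <shared_mutex>

template <typename K, typename V>
static bool promote_and_revalidate(std::shared_mutex& mtx,
                                   std::map<K, V>& table, const K& key)
{
  std::shared_lock<std::shared_mutex> rl(mtx);
  if (table.find(key) == table.end())
    return false;                               // nothing to revalidate
  rl.unlock();                                  // give up the shared lock...
  std::unique_lock<std::shared_mutex> wl(mtx);  // ...re-acquire exclusively
  return table.find(key) != table.end();        // the entry may have vanished
}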
4126
4127int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4128{
4129 assert(initialized);
4130
4131 unique_lock wl(rwlock);
4132
4133 map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
4134 if (it == pool_ops.end()) {
4135 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4136 return -ENOENT;
4137 }
4138
4139 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4140
4141 PoolOp *op = it->second;
4142 if (op->onfinish)
4143 op->onfinish->complete(r);
4144
4145 _finish_pool_op(op, r);
4146 return 0;
4147}
4148
4149void Objecter::_finish_pool_op(PoolOp *op, int r)
4150{
4151 // rwlock is locked unique
4152 pool_ops.erase(op->tid);
4153 logger->set(l_osdc_poolop_active, pool_ops.size());
4154
4155 if (op->ontimeout && r != -ETIMEDOUT) {
4156 timer.cancel_event(op->ontimeout);
4157 }
4158
4159 delete op;
4160}
4161
4162// pool stats
4163
4164void Objecter::get_pool_stats(list<string>& pools,
4165 map<string,pool_stat_t> *result,
4166 Context *onfinish)
4167{
4168 ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4169
4170 PoolStatOp *op = new PoolStatOp;
4171 op->tid = ++last_tid;
4172 op->pools = pools;
4173 op->pool_stats = result;
4174 op->onfinish = onfinish;
4175 if (mon_timeout > timespan(0)) {
4176 op->ontimeout = timer.add_event(mon_timeout,
4177 [this, op]() {
4178 pool_stat_op_cancel(op->tid,
4179 -ETIMEDOUT); });
4180 } else {
4181 op->ontimeout = 0;
4182 }
4183
4184 unique_lock wl(rwlock);
4185
4186 poolstat_ops[op->tid] = op;
4187
4188 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4189
4190 _poolstat_submit(op);
4191}
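// Illustrative synchronous caller (C_SaferCond and the wrapper name are
// assumptions of this sketch): request stats for a set of pools and wait for
// handle_get_pool_stats_reply() to fill them in.
static int example_pool_stats(Objecter *objecter, list<string> pools,
                              map<string, pool_stat_t> *out)
{
  C_SaferCond done;
  objecter->get_pool_stats(pools, out, &done);  // takes a non-const reference
  return done.wait();
}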
4192
4193void Objecter::_poolstat_submit(PoolStatOp *op)
4194{
4195 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4196 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4197 op->pools,
4198 last_seen_pgmap_version));
4199 op->last_submit = ceph::mono_clock::now();
4200
4201 logger->inc(l_osdc_poolstat_send);
4202}
4203
4204void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4205{
4206 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4207 ceph_tid_t tid = m->get_tid();
4208
4209 unique_lock wl(rwlock);
4210 if (!initialized) {
4211 m->put();
4212 return;
4213 }
4214
4215 map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
4216 if (iter != poolstat_ops.end()) {
4217 PoolStatOp *op = poolstat_ops[tid];
4218 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4219 *op->pool_stats = m->pool_stats;
4220 if (m->version > last_seen_pgmap_version) {
4221 last_seen_pgmap_version = m->version;
4222 }
4223 op->onfinish->complete(0);
4224 _finish_pool_stat_op(op, 0);
4225 } else {
4226 ldout(cct, 10) << "unknown request " << tid << dendl;
4227 }
4228 ldout(cct, 10) << "done" << dendl;
4229 m->put();
4230}
4231
4232int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4233{
4234 assert(initialized);
4235
4236 unique_lock wl(rwlock);
4237
4238 map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
4239 if (it == poolstat_ops.end()) {
4240 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4241 return -ENOENT;
4242 }
4243
4244 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4245
4246 PoolStatOp *op = it->second;
4247 if (op->onfinish)
4248 op->onfinish->complete(r);
4249 _finish_pool_stat_op(op, r);
4250 return 0;
4251}
4252
4253void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4254{
4255 // rwlock is locked unique
4256
4257 poolstat_ops.erase(op->tid);
4258 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4259
4260 if (op->ontimeout && r != -ETIMEDOUT)
4261 timer.cancel_event(op->ontimeout);
4262
4263 delete op;
4264}
4265
4266void Objecter::get_fs_stats(ceph_statfs& result,
4267 boost::optional<int64_t> data_pool,
4268 Context *onfinish)
4269{
4270 ldout(cct, 10) << "get_fs_stats" << dendl;
4271 unique_lock l(rwlock);
4272
4273 StatfsOp *op = new StatfsOp;
4274 op->tid = ++last_tid;
4275 op->stats = &result;
4276 op->data_pool = data_pool;
4277 op->onfinish = onfinish;
4278 if (mon_timeout > timespan(0)) {
4279 op->ontimeout = timer.add_event(mon_timeout,
4280 [this, op]() {
4281 statfs_op_cancel(op->tid,
4282 -ETIMEDOUT); });
4283 } else {
4284 op->ontimeout = 0;
4285 }
4286 statfs_ops[op->tid] = op;
4287
4288 logger->set(l_osdc_statfs_active, statfs_ops.size());
4289
4290 _fs_stats_submit(op);
4291}
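// Illustrative synchronous caller (C_SaferCond assumed): fetch cluster-wide
// statfs; passing boost::none instead of a pool id asks for the whole
// cluster rather than a single data pool.
static int example_fs_stats(Objecter *objecter, ceph_statfs *out)
{
  C_SaferCond done;
  objecter->get_fs_stats(*out, boost::none, &done);
  return done.wait();
}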
4292
4293void Objecter::_fs_stats_submit(StatfsOp *op)
4294{
4295 // rwlock is locked unique
4296
4297 ldout(cct, 10) << "fs_stats_submit " << op->tid << dendl;
4298 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
4299 op->data_pool,
4300 last_seen_pgmap_version));
4301 op->last_submit = ceph::mono_clock::now();
4302
4303 logger->inc(l_osdc_statfs_send);
4304}
4305
4306void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4307{
4308 unique_lock wl(rwlock);
4309 if (!initialized) {
4310 m->put();
4311 return;
4312 }
4313
4314 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4315 ceph_tid_t tid = m->get_tid();
4316
4317 if (statfs_ops.count(tid)) {
4318 StatfsOp *op = statfs_ops[tid];
4319 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4320 *(op->stats) = m->h.st;
4321 if (m->h.version > last_seen_pgmap_version)
4322 last_seen_pgmap_version = m->h.version;
4323 op->onfinish->complete(0);
4324 _finish_statfs_op(op, 0);
4325 } else {
4326 ldout(cct, 10) << "unknown request " << tid << dendl;
4327 }
4328 m->put();
4329 ldout(cct, 10) << "done" << dendl;
4330}
4331
4332int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4333{
4334 assert(initialized);
4335
4336 unique_lock wl(rwlock);
4337
4338 map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
4339 if (it == statfs_ops.end()) {
4340 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4341 return -ENOENT;
4342 }
4343
4344 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4345
4346 StatfsOp *op = it->second;
4347 if (op->onfinish)
4348 op->onfinish->complete(r);
4349 _finish_statfs_op(op, r);
4350 return 0;
4351}
4352
4353void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4354{
4355 // rwlock is locked unique
4356
4357 statfs_ops.erase(op->tid);
4358 logger->set(l_osdc_statfs_active, statfs_ops.size());
4359
4360 if (op->ontimeout && r != -ETIMEDOUT)
4361 timer.cancel_event(op->ontimeout);
4362
4363 delete op;
4364}
4365
4366// scatter/gather
4367
4368void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
4369 vector<bufferlist>& resultbl,
4370 bufferlist *bl, Context *onfinish)
4371{
4372 // all done
4373 ldout(cct, 15) << "_sg_read_finish" << dendl;
4374
4375 if (extents.size() > 1) {
4376 Striper::StripedReadResult r;
4377 vector<bufferlist>::iterator bit = resultbl.begin();
4378 for (vector<ObjectExtent>::iterator eit = extents.begin();
4379 eit != extents.end();
4380 ++eit, ++bit) {
4381 r.add_partial_result(cct, *bit, eit->buffer_extents);
4382 }
4383 bl->clear();
4384 r.assemble_result(cct, *bl, false);
4385 } else {
4386 ldout(cct, 15) << " only one frag" << dendl;
4387 bl->claim(resultbl[0]);
4388 }
4389
4390 // done
4391 uint64_t bytes_read = bl->length();
4392 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4393
4394 if (onfinish) {
4395 onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result());
4396 }
4397}
4398
4399
4400void Objecter::ms_handle_connect(Connection *con)
4401{
4402 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
4403 if (!initialized)
4404 return;
4405
4406 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4407 resend_mon_ops();
4408}
4409
4410bool Objecter::ms_handle_reset(Connection *con)
4411{
4412 if (!initialized)
4413 return false;
4414 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
4415 OSDSession *session = static_cast<OSDSession*>(con->get_priv());
4416 if (session) {
4417 ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
4418 << " osd." << session->osd << dendl;
4419 unique_lock wl(rwlock);
4420 if (!initialized) {
4421 wl.unlock();
4422 return false;
4423 }
4424 map<uint64_t, LingerOp *> lresend;
4425 OSDSession::unique_lock sl(session->lock);
4426 _reopen_session(session);
4427 _kick_requests(session, lresend);
4428 sl.unlock();
4429 _linger_ops_resend(lresend, wl);
4430 wl.unlock();
4431 maybe_request_map();
4432 session->put();
4433 }
4434 return true;
4435 }
4436 return false;
4437}
4438
4439void Objecter::ms_handle_remote_reset(Connection *con)
4440{
4441 /*
4442 * treat these the same.
4443 */
4444 ms_handle_reset(con);
4445}
4446
4447bool Objecter::ms_handle_refused(Connection *con)
4448{
4449 // just log for now
4450 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4451 int osd = osdmap->identify_osd(con->get_peer_addr());
4452 if (osd >= 0) {
4453 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4454 }
4455 }
4456 return false;
4457}
4458
4459bool Objecter::ms_get_authorizer(int dest_type,
4460 AuthAuthorizer **authorizer,
4461 bool force_new)
4462{
4463 if (!initialized)
4464 return false;
4465 if (dest_type == CEPH_ENTITY_TYPE_MON)
4466 return true;
4467 *authorizer = monc->build_authorizer(dest_type);
4468 return *authorizer != NULL;
4469}
4470
4471void Objecter::op_target_t::dump(Formatter *f) const
4472{
4473 f->dump_stream("pg") << pgid;
4474 f->dump_int("osd", osd);
4475 f->dump_stream("object_id") << base_oid;
4476 f->dump_stream("object_locator") << base_oloc;
4477 f->dump_stream("target_object_id") << target_oid;
4478 f->dump_stream("target_object_locator") << target_oloc;
4479 f->dump_int("paused", (int)paused);
4480 f->dump_int("used_replica", (int)used_replica);
4481 f->dump_int("precalc_pgid", (int)precalc_pgid);
4482}
4483
4484void Objecter::_dump_active(OSDSession *s)
4485{
4486 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
4487 p != s->ops.end();
4488 ++p) {
4489 Op *op = p->second;
4490 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4491 << "\tosd." << (op->session ? op->session->osd : -1)
4492 << "\t" << op->target.base_oid
4493 << "\t" << op->ops << dendl;
4494 }
4495}
4496
4497void Objecter::_dump_active()
4498{
4499 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
4500 << dendl;
4501 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
4502 siter != osd_sessions.end(); ++siter) {
4503 OSDSession *s = siter->second;
4504 OSDSession::shared_lock sl(s->lock);
4505 _dump_active(s);
4506 sl.unlock();
4507 }
4508 _dump_active(homeless_session);
4509}
4510
4511void Objecter::dump_active()
4512{
4513 shared_lock rl(rwlock);
4514 _dump_active();
4515 rl.unlock();
4516}
4517
4518void Objecter::dump_requests(Formatter *fmt)
4519{
4520 // Read-lock on Objecter held here
4521 fmt->open_object_section("requests");
4522 dump_ops(fmt);
4523 dump_linger_ops(fmt);
4524 dump_pool_ops(fmt);
4525 dump_pool_stat_ops(fmt);
4526 dump_statfs_ops(fmt);
4527 dump_command_ops(fmt);
4528 fmt->close_section(); // requests object
4529}
4530
4531void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4532{
4533 for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
4534 p != s->ops.end();
4535 ++p) {
4536 Op *op = p->second;
4537 fmt->open_object_section("op");
4538 fmt->dump_unsigned("tid", op->tid);
4539 op->target.dump(fmt);
4540 fmt->dump_stream("last_sent") << op->stamp;
4541 fmt->dump_int("attempts", op->attempts);
4542 fmt->dump_stream("snapid") << op->snapid;
4543 fmt->dump_stream("snap_context") << op->snapc;
4544 fmt->dump_stream("mtime") << op->mtime;
4545
4546 fmt->open_array_section("osd_ops");
4547 for (vector<OSDOp>::const_iterator it = op->ops.begin();
4548 it != op->ops.end();
4549 ++it) {
4550 fmt->dump_stream("osd_op") << *it;
4551 }
4552 fmt->close_section(); // osd_ops array
4553
4554 fmt->close_section(); // op object
4555 }
4556}
4557
4558void Objecter::dump_ops(Formatter *fmt)
4559{
4560 // Read-lock on Objecter held
4561 fmt->open_array_section("ops");
4562 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4563 siter != osd_sessions.end(); ++siter) {
4564 OSDSession *s = siter->second;
4565 OSDSession::shared_lock sl(s->lock);
4566 _dump_ops(s, fmt);
4567 sl.unlock();
4568 }
4569 _dump_ops(homeless_session, fmt);
4570 fmt->close_section(); // ops array
4571}
4572
4573void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4574{
4575 for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
4576 p != s->linger_ops.end();
4577 ++p) {
4578 LingerOp *op = p->second;
4579 fmt->open_object_section("linger_op");
4580 fmt->dump_unsigned("linger_id", op->linger_id);
4581 op->target.dump(fmt);
4582 fmt->dump_stream("snapid") << op->snap;
4583 fmt->dump_stream("registered") << op->registered;
4584 fmt->close_section(); // linger_op object
4585 }
4586}
4587
4588void Objecter::dump_linger_ops(Formatter *fmt)
4589{
4590 // We have a read-lock on the objecter
4591 fmt->open_array_section("linger_ops");
4592 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4593 siter != osd_sessions.end(); ++siter) {
4594 OSDSession *s = siter->second;
4595 OSDSession::shared_lock sl(s->lock);
4596 _dump_linger_ops(s, fmt);
4597 sl.unlock();
4598 }
4599 _dump_linger_ops(homeless_session, fmt);
4600 fmt->close_section(); // linger_ops array
4601}
4602
4603void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4604{
4605 for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
4606 p != s->command_ops.end();
4607 ++p) {
4608 CommandOp *op = p->second;
4609 fmt->open_object_section("command_op");
4610 fmt->dump_unsigned("command_id", op->tid);
4611 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4612 fmt->open_array_section("command");
4613 for (vector<string>::const_iterator q = op->cmd.begin();
4614 q != op->cmd.end(); ++q)
4615 fmt->dump_string("word", *q);
4616 fmt->close_section();
4617 if (op->target_osd >= 0)
4618 fmt->dump_int("target_osd", op->target_osd);
4619 else
4620 fmt->dump_stream("target_pg") << op->target_pg;
4621 fmt->close_section(); // command_op object
4622 }
4623}
4624
4625void Objecter::dump_command_ops(Formatter *fmt)
4626{
4627 // We have a read-lock on the Objecter here
4628 fmt->open_array_section("command_ops");
4629 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4630 siter != osd_sessions.end(); ++siter) {
4631 OSDSession *s = siter->second;
4632 OSDSession::shared_lock sl(s->lock);
4633 _dump_command_ops(s, fmt);
4634 sl.unlock();
4635 }
4636 _dump_command_ops(homeless_session, fmt);
4637 fmt->close_section(); // command_ops array
4638}
4639
4640void Objecter::dump_pool_ops(Formatter *fmt) const
4641{
4642 fmt->open_array_section("pool_ops");
4643 for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
4644 p != pool_ops.end();
4645 ++p) {
4646 PoolOp *op = p->second;
4647 fmt->open_object_section("pool_op");
4648 fmt->dump_unsigned("tid", op->tid);
4649 fmt->dump_int("pool", op->pool);
4650 fmt->dump_string("name", op->name);
4651 fmt->dump_int("operation_type", op->pool_op);
4652 fmt->dump_unsigned("auid", op->auid);
4653 fmt->dump_unsigned("crush_rule", op->crush_rule);
4654 fmt->dump_stream("snapid") << op->snapid;
4655 fmt->dump_stream("last_sent") << op->last_submit;
4656 fmt->close_section(); // pool_op object
4657 }
4658 fmt->close_section(); // pool_ops array
4659}
4660
4661void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4662{
4663 fmt->open_array_section("pool_stat_ops");
4664 for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
4665 p != poolstat_ops.end();
4666 ++p) {
4667 PoolStatOp *op = p->second;
4668 fmt->open_object_section("pool_stat_op");
4669 fmt->dump_unsigned("tid", op->tid);
4670 fmt->dump_stream("last_sent") << op->last_submit;
4671
4672 fmt->open_array_section("pools");
4673 for (list<string>::const_iterator it = op->pools.begin();
4674 it != op->pools.end();
4675 ++it) {
4676 fmt->dump_string("pool", *it);
4677 }
4678 fmt->close_section(); // pools array
4679
4680 fmt->close_section(); // pool_stat_op object
4681 }
4682 fmt->close_section(); // pool_stat_ops array
4683}
4684
4685void Objecter::dump_statfs_ops(Formatter *fmt) const
4686{
4687 fmt->open_array_section("statfs_ops");
4688 for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
4689 p != statfs_ops.end();
4690 ++p) {
4691 StatfsOp *op = p->second;
4692 fmt->open_object_section("statfs_op");
4693 fmt->dump_unsigned("tid", op->tid);
4694 fmt->dump_stream("last_sent") << op->last_submit;
4695 fmt->close_section(); // statfs_op object
4696 }
4697 fmt->close_section(); // statfs_ops array
4698}
4699
4700Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4701 m_objecter(objecter)
4702{
4703}
4704
4705bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap,
4706 std::string format, bufferlist& out)
4707{
4708 Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
4709 shared_lock rl(m_objecter->rwlock);
4710 m_objecter->dump_requests(f);
4711 f->flush(out);
4712 delete f;
4713 return true;
4714}
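// This hook typically backs the "objecter_requests" admin socket command
// (registered when the Objecter is initialized), e.g.
//
//   ceph daemon <daemon-or-client-name> objecter_requests
//
// which dumps the op/linger/pool_op/pool_stat/statfs/command tables above as
// JSON; "json-pretty" is the fallback format used here when none is given.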
4715
4716void Objecter::blacklist_self(bool set)
4717{
4718 ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
4719
4720 vector<string> cmd;
4721 cmd.push_back("{\"prefix\":\"osd blacklist\", ");
4722 if (set)
4723 cmd.push_back("\"blacklistop\":\"add\",");
4724 else
4725 cmd.push_back("\"blacklistop\":\"rm\",");
4726 stringstream ss;
4727 ss << messenger->get_myaddr();
4728 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4729
4730 MMonCommand *m = new MMonCommand(monc->get_fsid());
4731 m->cmd = cmd;
4732
4733 monc->send_mon_message(m);
4734}
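// The fragments pushed above concatenate into a single mon command; for the
// "add" case the result is equivalent to something like (address
// illustrative):
//
//   {"prefix":"osd blacklist", "blacklistop":"add", "addr":"10.0.0.1:0/1234567"}
//
// i.e. the same request that "ceph osd blacklist add <addr>" would produce.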
4735
4736// commands
4737
4738void Objecter::handle_command_reply(MCommandReply *m)
4739{
4740 unique_lock wl(rwlock);
4741 if (!initialized) {
4742 m->put();
4743 return;
4744 }
4745
4746 ConnectionRef con = m->get_connection();
4747 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
4748 if (!s || s->con != con) {
4749 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
4750 m->put();
4751 if (s)
4752 s->put();
4753 return;
4754 }
4755
4756 OSDSession::shared_lock sl(s->lock);
4757 map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
4758 if (p == s->command_ops.end()) {
4759 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4760 << " not found" << dendl;
4761 m->put();
4762 sl.unlock();
4763 if (s)
4764 s->put();
4765 return;
4766 }
4767
4768 CommandOp *c = p->second;
4769 if (!c->session ||
4770 m->get_connection() != c->session->con) {
4771 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4772 << " got reply from wrong connection "
4773 << m->get_connection() << " " << m->get_source_inst()
4774 << dendl;
4775 m->put();
4776 sl.unlock();
4777 if (s)
4778 s->put();
4779 return;
4780 }
4781 if (c->poutbl) {
4782 c->poutbl->claim(m->get_data());
4783 }
4784
4785 sl.unlock();
4786
4787 OSDSession::unique_lock sul(s->lock);
4788 _finish_command(c, m->r, m->rs);
4789 sul.unlock();
4790
4791 m->put();
4792 if (s)
4793 s->put();
4794}
4795
4796void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
4797{
4798 shunique_lock sul(rwlock, ceph::acquire_unique);
4799
4800 ceph_tid_t tid = ++last_tid;
4801 ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
4802 c->tid = tid;
4803
4804 {
4805 OSDSession::unique_lock hs_wl(homeless_session->lock);
4806 _session_command_op_assign(homeless_session, c);
4807 }
4808
4809 _calc_command_target(c, sul);
4810 _assign_command_session(c, sul);
4811 if (osd_timeout > timespan(0)) {
4812 c->ontimeout = timer.add_event(osd_timeout,
4813 [this, c, tid]() {
4814 command_op_cancel(c->session, tid,
4815 -ETIMEDOUT); });
4816 }
4817
4818 if (!c->session->is_homeless()) {
4819 _send_command(c);
4820 } else {
4821 _maybe_request_map();
4822 }
4823 if (c->map_check_error)
4824 _send_command_map_check(c);
4825 *ptid = tid;
4826
4827 logger->inc(l_osdc_command_active);
4828}
4829
4830int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
4831{
4832 assert(sul.owns_lock() && sul.mutex() == &rwlock);
4833
4834 c->map_check_error = 0;
4835
4836 // ignore overlays, just like we do with pg ops
4837 c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4838
4839 if (c->target_osd >= 0) {
4840 if (!osdmap->exists(c->target_osd)) {
4841 c->map_check_error = -ENOENT;
4842 c->map_check_error_str = "osd dne";
4843 c->target.osd = -1;
4844 return RECALC_OP_TARGET_OSD_DNE;
4845 }
4846 if (osdmap->is_down(c->target_osd)) {
4847 c->map_check_error = -ENXIO;
4848 c->map_check_error_str = "osd down";
4849 c->target.osd = -1;
4850 return RECALC_OP_TARGET_OSD_DOWN;
4851 }
4852 c->target.osd = c->target_osd;
4853 } else {
4854 int ret = _calc_target(&(c->target), nullptr, true);
4855 if (ret == RECALC_OP_TARGET_POOL_DNE) {
4856 c->map_check_error = -ENOENT;
4857 c->map_check_error_str = "pool dne";
4858 c->target.osd = -1;
4859 return ret;
4860 } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4861 c->map_check_error = -ENXIO;
4862 c->map_check_error_str = "osd down";
4863 c->target.osd = -1;
4864 return ret;
4865 }
4866 }
4867
4868 OSDSession *s;
4869 int r = _get_session(c->target.osd, &s, sul);
4870 assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4871
4872 if (c->session != s) {
4873 put_session(s);
4874 return RECALC_OP_TARGET_NEED_RESEND;
4875 }
4876
4877 put_session(s);
4878
4879 ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, "
4880 << c->session << dendl;
4881
4882 return RECALC_OP_TARGET_NO_ACTION;
4883}
4884
4885void Objecter::_assign_command_session(CommandOp *c,
4886 shunique_lock& sul)
4887{
4888 assert(sul.owns_lock() && sul.mutex() == &rwlock);
4889
4890 OSDSession *s;
4891 int r = _get_session(c->target.osd, &s, sul);
4892 assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4893
4894 if (c->session != s) {
4895 if (c->session) {
4896 OSDSession *cs = c->session;
4897 OSDSession::unique_lock csl(cs->lock);
4898 _session_command_op_remove(c->session, c);
4899 csl.unlock();
4900 }
4901 OSDSession::unique_lock sl(s->lock);
4902 _session_command_op_assign(s, c);
4903 }
4904
4905 put_session(s);
4906}
4907
4908void Objecter::_send_command(CommandOp *c)
4909{
4910 ldout(cct, 10) << "_send_command " << c->tid << dendl;
4911 assert(c->session);
4912 assert(c->session->con);
4913 MCommand *m = new MCommand(monc->monmap.fsid);
4914 m->cmd = c->cmd;
4915 m->set_data(c->inbl);
4916 m->set_tid(c->tid);
4917 c->session->con->send_message(m);
4918 logger->inc(l_osdc_command_send);
4919}
4920
4921int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
4922{
4923 assert(initialized);
4924
4925 unique_lock wl(rwlock);
4926
4927 map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
4928 if (it == s->command_ops.end()) {
4929 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4930 return -ENOENT;
4931 }
4932
4933 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4934
4935 CommandOp *op = it->second;
4936 _command_cancel_map_check(op);
4937 OSDSession::unique_lock sl(op->session->lock);
4938 _finish_command(op, r, "");
4939 sl.unlock();
4940 return 0;
4941}
4942
4943void Objecter::_finish_command(CommandOp *c, int r, string rs)
4944{
4945 // rwlock is locked unique
4946 // session lock is locked
4947
4948 ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
4949 << rs << dendl;
4950 if (c->prs)
4951 *c->prs = rs;
4952 if (c->onfinish)
4953 c->onfinish->complete(r);
4954
4955 if (c->ontimeout && r != -ETIMEDOUT)
4956 timer.cancel_event(c->ontimeout);
4957
4958 OSDSession *s = c->session;
4959 _session_command_op_remove(c->session, c);
4960
4961 c->put();
4962
4963 logger->dec(l_osdc_command_active);
4964}
4965
4966Objecter::OSDSession::~OSDSession()
4967{
4968 // Caller is responsible for re-assigning or
4969 // destroying any ops that were assigned to us
4970 assert(ops.empty());
4971 assert(linger_ops.empty());
4972 assert(command_ops.empty());
4973}
4974
4975Objecter::~Objecter()
4976{
4977 delete osdmap;
4978
4979 assert(homeless_session->get_nref() == 1);
4980 assert(num_homeless_ops == 0);
4981 homeless_session->put();
4982
4983 assert(osd_sessions.empty());
4984 assert(poolstat_ops.empty());
4985 assert(statfs_ops.empty());
4986 assert(pool_ops.empty());
4987 assert(waiting_for_map.empty());
4988 assert(linger_ops.empty());
4989 assert(check_latest_map_lingers.empty());
4990 assert(check_latest_map_ops.empty());
4991 assert(check_latest_map_commands.empty());
4992
4993 assert(!m_request_state_hook);
4994 assert(!logger);
4995}
4996
4997/**
4998 * Wait until this OSD map epoch is received before
4999 * sending any more operations to OSDs. Use this
5000 * when it is known that the client can't trust
5001 * anything from before this epoch (e.g. due to
5002 * client blacklist at this epoch).
5003 */
5004void Objecter::set_epoch_barrier(epoch_t epoch)
5005{
5006 unique_lock wl(rwlock);
5007
5008 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
5009 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
5010 << dendl;
5011 if (epoch > epoch_barrier) {
5012 epoch_barrier = epoch;
5013 _maybe_request_map();
5014 }
5015}
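// Typical (illustrative) use: a client that has just learned it was
// blacklisted at osdmap epoch E calls
//
//   objecter->set_epoch_barrier(E);
//
// so no further ops are sent until a map at least that new has been seen;
// the call itself requests a newer map if one is needed.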
5016
5017
5018
5019hobject_t Objecter::enumerate_objects_begin()
5020{
5021 return hobject_t();
5022}
5023
5024hobject_t Objecter::enumerate_objects_end()
5025{
5026 return hobject_t::get_max();
5027}
5028
5029struct C_EnumerateReply : public Context {
5030 bufferlist bl;
5031
5032 Objecter *objecter;
5033 hobject_t *next;
5034 std::list<librados::ListObjectImpl> *result;
5035 const hobject_t end;
5036 const int64_t pool_id;
5037 Context *on_finish;
5038
5039 epoch_t epoch;
5040 int budget;
5041
5042 C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
5043 std::list<librados::ListObjectImpl> *result_,
5044 const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
5045 objecter(objecter_), next(next_), result(result_),
5046 end(end_), pool_id(pool_id_), on_finish(on_finish_),
5047 epoch(0), budget(0)
5048 {}
5049
5050 void finish(int r) override {
5051 objecter->_enumerate_reply(
5052 bl, r, end, pool_id, budget, epoch, result, next, on_finish);
5053 }
5054};
5055
5056void Objecter::enumerate_objects(
5057 int64_t pool_id,
5058 const std::string &ns,
5059 const hobject_t &start,
5060 const hobject_t &end,
5061 const uint32_t max,
5062 const bufferlist &filter_bl,
5063 std::list<librados::ListObjectImpl> *result,
5064 hobject_t *next,
5065 Context *on_finish)
5066{
5067 assert(result);
5068
5069 if (!end.is_max() && start > end) {
5070 lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
5071 on_finish->complete(-EINVAL);
5072 return;
5073 }
5074
5075 if (max < 1) {
5076 lderr(cct) << __func__ << ": result size may not be zero" << dendl;
5077 on_finish->complete(-EINVAL);
5078 return;
5079 }
5080
5081 if (start.is_max()) {
5082 on_finish->complete(0);
5083 return;
5084 }
5085
5086 shared_lock rl(rwlock);
5087 assert(osdmap->get_epoch());
5088 if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
5089 rl.unlock();
5090 lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
5091 on_finish->complete(-EOPNOTSUPP);
5092 return;
5093 }
5094 const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
5095 if (!p) {
5096 lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
5097 << osdmap->get_epoch() << dendl;
5098 rl.unlock();
5099 on_finish->complete(-ENOENT);
5100 return;
5101 } else {
5102 rl.unlock();
5103 }
5104
5105 ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;
5106
5107 // Stash completion state
5108 C_EnumerateReply *on_ack = new C_EnumerateReply(
5109 this, next, result, end, pool_id, on_finish);
5110
5111 ObjectOperation op;
5112 op.pg_nls(max, filter_bl, start, 0);
5113
5114 // Issue. See you later in _enumerate_reply
5115 object_locator_t oloc(pool_id, ns);
5116 pg_read(start.get_hash(), oloc, op,
5117 &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
5118}
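// Minimal illustrative pager over one pool using the call above; C_SaferCond,
// the batch size, and the empty (default) namespace are assumptions of this
// sketch.
static int example_enumerate_pool(Objecter *objecter, int64_t pool_id)
{
  hobject_t cursor = objecter->enumerate_objects_begin();
  const hobject_t end = objecter->enumerate_objects_end();
  while (!cursor.is_max()) {
    std::list<librados::ListObjectImpl> batch;
    hobject_t next;
    C_SaferCond done;
    objecter->enumerate_objects(pool_id, "", cursor, end, 1024, bufferlist(),
                                &batch, &next, &done);
    int r = done.wait();
    if (r < 0)
      return r;        // e.g. -EOPNOTSUPP without SORTBITWISE, -ENOENT, ...
    // consume `batch` here
    cursor = next;
  }
  return 0;
}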
5119
5120void Objecter::_enumerate_reply(
5121 bufferlist &bl,
5122 int r,
5123 const hobject_t &end,
5124 const int64_t pool_id,
5125 int budget,
5126 epoch_t reply_epoch,
5127 std::list<librados::ListObjectImpl> *result,
5128 hobject_t *next,
5129 Context *on_finish)
5130{
5131 if (budget > 0) {
5132 put_op_budget_bytes(budget);
5133 }
5134
5135 if (r < 0) {
5136 ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
5137 on_finish->complete(r);
5138 return;
5139 }
5140
5141 assert(next != NULL);
5142
5143 // Decode the results
5144 bufferlist::iterator iter = bl.begin();
5145 pg_nls_response_t response;
5146
5147 // XXX extra_info doesn't seem used anywhere?
5148 bufferlist extra_info;
5149 ::decode(response, iter);
5150 if (!iter.end()) {
5151 ::decode(extra_info, iter);
5152 }
5153
5154 ldout(cct, 10) << __func__ << ": got " << response.entries.size()
5155 << " handle " << response.handle
5156 << " reply_epoch " << reply_epoch << dendl;
5157 ldout(cct, 20) << __func__ << ": response.entries.size "
5158 << response.entries.size() << ", response.entries "
5159 << response.entries << dendl;
5160 if (response.handle <= end) {
5161 *next = response.handle;
5162 } else {
5163 ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
5164 << dendl;
5165 *next = end;
5166
5167 // drop anything after 'end'
5168 shared_lock rl(rwlock);
5169 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
5170 if (!pool) {
5171 // pool is gone, drop any results which are now meaningless.
5172 rl.unlock();
5173 on_finish->complete(-ENOENT);
5174 return;
5175 }
5176 while (!response.entries.empty()) {
5177 uint32_t hash = response.entries.back().locator.empty() ?
5178 pool->hash_key(response.entries.back().oid,
5179 response.entries.back().nspace) :
5180 pool->hash_key(response.entries.back().locator,
5181 response.entries.back().nspace);
5182 hobject_t last(response.entries.back().oid,
5183 response.entries.back().locator,
5184 CEPH_NOSNAP,
5185 hash,
5186 pool_id,
5187 response.entries.back().nspace);
5188 if (last < end)
5189 break;
5190 ldout(cct, 20) << __func__ << " dropping item " << last
5191 << " >= end " << end << dendl;
5192 response.entries.pop_back();
5193 }
5194 rl.unlock();
5195 }
5196 if (!response.entries.empty()) {
5197 result->merge(response.entries);
5198 }
5199
5200 // release the listing context's budget once all
5201 // OPs (in the session) are finished
5202#if 0
5203 put_nlist_context_budget(list_context);
5204#endif
5205 on_finish->complete(r);
5206 return;
5207}
5208
5209namespace {
5210 using namespace librados;
5211
5212 template <typename T>
5213 void do_decode(std::vector<T>& items, std::vector<bufferlist>& bls)
5214 {
5215 for (auto bl : bls) {
5216 auto p = bl.begin();
5217 T t;
5218 decode(t, p);
5219 items.push_back(t);
5220 }
5221 }
5222
5223 struct C_ObjectOperation_scrub_ls : public Context {
5224 bufferlist bl;
5225 uint32_t *interval;
5226 std::vector<inconsistent_obj_t> *objects = nullptr;
5227 std::vector<inconsistent_snapset_t> *snapsets = nullptr;
5228 int *rval;
5229
5230 C_ObjectOperation_scrub_ls(uint32_t *interval,
5231 std::vector<inconsistent_obj_t> *objects,
5232 int *rval)
5233 : interval(interval), objects(objects), rval(rval) {}
5234 C_ObjectOperation_scrub_ls(uint32_t *interval,
5235 std::vector<inconsistent_snapset_t> *snapsets,
5236 int *rval)
5237 : interval(interval), snapsets(snapsets), rval(rval) {}
5238 void finish(int r) override {
5239 if (r < 0 && r != -EAGAIN) {
5240 if (rval)
5241 *rval = r;
5242 return;
5243 }
5244
5245 if (rval)
5246 *rval = 0;
5247
5248 try {
5249 decode();
5250 } catch (buffer::error&) {
5251 if (rval)
5252 *rval = -EIO;
5253 }
5254 }
5255 private:
5256 void decode() {
5257 scrub_ls_result_t result;
5258 auto p = bl.begin();
5259 result.decode(p);
5260 *interval = result.interval;
5261 if (objects) {
5262 do_decode(*objects, result.vals);
5263 } else {
5264 do_decode(*snapsets, result.vals);
5265 }
5266 }
5267 };
5268
5269 template <typename T>
5270 void do_scrub_ls(::ObjectOperation *op,
5271 const scrub_ls_arg_t& arg,
5272 std::vector<T> *items,
5273 uint32_t *interval,
5274 int *rval)
5275 {
5276 OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
5277 op->flags |= CEPH_OSD_FLAG_PGOP;
5278 assert(interval);
5279 arg.encode(osd_op.indata);
5280 unsigned p = op->ops.size() - 1;
5281 auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
5282 op->out_handler[p] = h;
5283 op->out_bl[p] = &h->bl;
5284 op->out_rval[p] = rval;
5285 }
5286}
5287
5288void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5289 uint64_t max_to_get,
5290 std::vector<librados::inconsistent_obj_t> *objects,
5291 uint32_t *interval,
5292 int *rval)
5293{
5294 scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
5295 do_scrub_ls(this, arg, objects, interval, rval);
5296}
5297
5298void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5299 uint64_t max_to_get,
5300 std::vector<librados::inconsistent_snapset_t> *snapsets,
5301 uint32_t *interval,
5302 int *rval)
5303{
5304 scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
5305 do_scrub_ls(this, arg, snapsets, interval, rval);
5306}
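// Paging notes for the two scrub_ls() wrappers above (a hedged summary based
// on the scrub_ls_arg_t initializers): start_after names the last object id
// already seen (a default-constructed object_id_t starts from the beginning
// of the PG), max_to_get bounds the batch size, and *interval is both input
// and output -- the value passed in is encoded into the request and the
// handler overwrites it with the scrub interval the results belong to, so a
// caller paging through results can notice when the interval has changed and
// the listing is no longer consistent. The second overload differs only in
// passing 1 instead of 0 as the second initializer, selecting inconsistent
// snapsets rather than inconsistent objects.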