]> git.proxmox.com Git - ceph.git/blame - ceph/src/osdc/Objecter.cc
update sources to v12.1.3
[ceph.git] / ceph / src / osdc / Objecter.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "Objecter.h"
16#include "osd/OSDMap.h"
17#include "Filer.h"
18
19#include "mon/MonClient.h"
20
21#include "msg/Messenger.h"
22#include "msg/Message.h"
23
24#include "messages/MPing.h"
25#include "messages/MOSDOp.h"
26#include "messages/MOSDOpReply.h"
27#include "messages/MOSDBackoff.h"
28#include "messages/MOSDMap.h"
29
30#include "messages/MPoolOp.h"
31#include "messages/MPoolOpReply.h"
32
33#include "messages/MGetPoolStats.h"
34#include "messages/MGetPoolStatsReply.h"
35#include "messages/MStatfs.h"
36#include "messages/MStatfsReply.h"
37
38#include "messages/MMonCommand.h"
39
40#include "messages/MCommand.h"
41#include "messages/MCommandReply.h"
42
43#include "messages/MWatchNotify.h"
44
45#include <errno.h>
46
47#include "common/config.h"
48#include "common/perf_counters.h"
49#include "common/scrub_types.h"
50#include "include/str_list.h"
51#include "common/errno.h"
52#include "common/EventTrace.h"
53
54using ceph::real_time;
55using ceph::real_clock;
56
57using ceph::mono_clock;
58using ceph::mono_time;
59
60using ceph::timespan;
61
62
63#define dout_subsys ceph_subsys_objecter
64#undef dout_prefix
65#define dout_prefix *_dout << messenger->get_myname() << ".objecter "
66
67
// Perfcounter indices for the "objecter" logger.  l_osdc_first and
// l_osdc_last bound the range handed to PerfCountersBuilder in
// Objecter::init(); new counters must be added between them.  Do not
// reorder existing entries: the numeric values are implied by position.
enum {
  l_osdc_first = 123200,
  // op-level activity
  l_osdc_op_active,
  l_osdc_op_laggy,
  l_osdc_op_send,
  l_osdc_op_send_bytes,
  l_osdc_op_resend,
  l_osdc_op_reply,

  // op counts by access category
  l_osdc_op,
  l_osdc_op_r,
  l_osdc_op_w,
  l_osdc_op_rmw,
  l_osdc_op_pg,

  // per-opcode OSD op counters
  l_osdc_osdop_stat,
  l_osdc_osdop_create,
  l_osdc_osdop_read,
  l_osdc_osdop_write,
  l_osdc_osdop_writefull,
  l_osdc_osdop_writesame,
  l_osdc_osdop_append,
  l_osdc_osdop_zero,
  l_osdc_osdop_truncate,
  l_osdc_osdop_delete,
  l_osdc_osdop_mapext,
  l_osdc_osdop_sparse_read,
  l_osdc_osdop_clonerange,
  l_osdc_osdop_getxattr,
  l_osdc_osdop_setxattr,
  l_osdc_osdop_cmpxattr,
  l_osdc_osdop_rmxattr,
  l_osdc_osdop_resetxattrs,
  l_osdc_osdop_tmap_up,
  l_osdc_osdop_tmap_put,
  l_osdc_osdop_tmap_get,
  l_osdc_osdop_call,
  l_osdc_osdop_watch,
  l_osdc_osdop_notify,
  l_osdc_osdop_src_cmpxattr,
  l_osdc_osdop_pgls,
  l_osdc_osdop_pgls_filter,
  l_osdc_osdop_other,

  // linger (watch/notify) ops
  l_osdc_linger_active,
  l_osdc_linger_send,
  l_osdc_linger_resend,
  l_osdc_linger_ping,

  // pool ops
  l_osdc_poolop_active,
  l_osdc_poolop_send,
  l_osdc_poolop_resend,

  // pool stat ops
  l_osdc_poolstat_active,
  l_osdc_poolstat_send,
  l_osdc_poolstat_resend,

  // statfs ops
  l_osdc_statfs_active,
  l_osdc_statfs_send,
  l_osdc_statfs_resend,

  // command ops
  l_osdc_command_active,
  l_osdc_command_send,
  l_osdc_command_resend,

  // osdmap tracking
  l_osdc_map_epoch,
  l_osdc_map_full,
  l_osdc_map_inc,

  // OSD session tracking
  l_osdc_osd_sessions,
  l_osdc_osd_session_open,
  l_osdc_osd_session_close,
  l_osdc_osd_laggy,

  // omap activity
  l_osdc_osdop_omap_wr,
  l_osdc_osdop_omap_rd,
  l_osdc_osdop_omap_del,

  l_osdc_last,
};
148
149
150// config obs ----------------------------
151
// NULL-terminated list of config keys this Objecter observes; changes
// to any of these are delivered to handle_conf_change().
static const char *config_keys[] = {
  "crush_location",
  NULL
};
156
// Admin-socket hook that dumps in-progress requests; registered by
// Objecter::init() under the "objecter_requests" command and torn down
// in Objecter::shutdown().
class Objecter::RequestStateHook : public AdminSocketHook {
  Objecter *m_objecter;  // back-pointer; not owned
public:
  explicit RequestStateHook(Objecter *objecter);
  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
            bufferlist& out) override;
};
164
165/**
166 * This is a more limited form of C_Contexts, but that requires
167 * a ceph_context which we don't have here.
168 */
169class ObjectOperation::C_TwoContexts : public Context {
170 Context *first;
171 Context *second;
172public:
173 C_TwoContexts(Context *first, Context *second) :
174 first(first), second(second) {}
175 void finish(int r) override {
176 first->complete(r);
177 second->complete(r);
178 first = NULL;
179 second = NULL;
180 }
181
182 ~C_TwoContexts() override {
183 delete first;
184 delete second;
185 }
186};
187
188void ObjectOperation::add_handler(Context *extra) {
189 size_t last = out_handler.size() - 1;
190 Context *orig = out_handler[last];
191 if (orig) {
192 Context *wrapper = new C_TwoContexts(orig, extra);
193 out_handler[last] = wrapper;
194 } else {
195 out_handler[last] = extra;
196 }
197}
198
// Map an object name onto one of this session's completion locks.
// Returns a deferred (not yet acquired) unique lock; for an empty name
// the returned lock manages no mutex at all.
Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
  object_t& oid)
{
  if (oid.name.empty())
    return unique_completion_lock();

  static constexpr uint32_t HASH_PRIME = 1021;
  // Hash the name into [0, HASH_PRIME); the second modulo below folds
  // that onto however many locks the session actually has.
  uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
    % HASH_PRIME;

  return unique_completion_lock(completion_locks[h % num_locks],
                                std::defer_lock);
}
212
// md_config_obs_t interface: NULL-terminated list of config keys we
// want change notifications for.
const char** Objecter::get_tracked_conf_keys() const
{
  return config_keys;
}
217
218
219void Objecter::handle_conf_change(const struct md_config_t *conf,
220 const std::set <std::string> &changed)
221{
222 if (changed.count("crush_location")) {
223 update_crush_location();
224 }
225}
226
// Refresh the cached crush location from the config, under the
// objecter write lock.
void Objecter::update_crush_location()
{
  unique_lock wl(rwlock);
  crush_location = cct->crush_location.get_location();
}
232
233// messages ------------------------------
234
/*
 * initialize only internal data structures, don't initiate cluster interaction
 */
void Objecter::init()
{
  assert(!initialized);

  if (!logger) {
    // Register all objecter perfcounters (indices l_osdc_first..l_osdc_last).
    PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);

    pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
                PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
    pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
    pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data");
    pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");

    pcb.add_u64_counter(l_osdc_op, "op", "Operations");
    pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
                        PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
                        PerfCountersBuilder::PRIO_CRITICAL);
    pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
                        "rdwr", PerfCountersBuilder::PRIO_INTERESTING);
    pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");

    // per-opcode counters
    pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
    pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
                        "Create object operations");
    pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
    pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
    pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
                        "Write full object operations");
    pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
                        "Write same operations");
    pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
                        "Append operation");
    pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
                        "Set object to zero operations");
    pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
                        "Truncate object operations");
    pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
                        "Delete object operations");
    pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
                        "Map extent operations");
    pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
                        "Sparse read operations");
    pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
                        "Clone range operations");
    pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
                        "Get xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
                        "Set xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
                        "Xattr comparison operations");
    pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
                        "Remove xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
                        "Reset xattr operations");
    pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up",
                        "TMAP update operations");
    pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put",
                        "TMAP put operations");
    pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get",
                        "TMAP get operations");
    pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
                        "Call (execute) operations");
    pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
                        "Watch by object operations");
    pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
                        "Notify about object operations");
    pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
                        "Extended attribute comparison in multi operations");
    pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
    pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
    pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");

    // linger (watch/notify)
    pcb.add_u64(l_osdc_linger_active, "linger_active",
                "Active lingering operations");
    pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
                        "Sent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
                        "Resent lingering operations");
    pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
                        "Sent pings to lingering operations");

    // pool ops
    pcb.add_u64(l_osdc_poolop_active, "poolop_active",
                "Active pool operations");
    pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
                        "Sent pool operations");
    pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
                        "Resent pool operations");

    // pool stats
    pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
                "Active get pool stat operations");
    pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
                        "Pool stat operations sent");
    pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
                        "Resent pool stats");

    // statfs
    pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
    pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
    pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
                        "Resent FS stats");

    // commands
    pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
    pcb.add_u64_counter(l_osdc_command_send, "command_send",
                        "Sent commands");
    pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
                        "Resent commands");

    // osdmap tracking
    pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
    pcb.add_u64_counter(l_osdc_map_full, "map_full",
                        "Full OSD maps received");
    pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
                        "Incremental OSD maps received");

    // sessions
    pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
                "Open sessions"); // open sessions
    pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
                        "Sessions opened");
    pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
                        "Sessions closed");
    pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");

    // omap
    pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
                        "OSD OMAP write operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
                        "OSD OMAP read operations");
    pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
                        "OSD OMAP delete operations");

    logger = pcb.create_perf_counters();
    cct->get_perfcounters_collection()->add(logger);
  }

  // Expose in-flight requests on the admin socket.
  m_request_state_hook = new RequestStateHook(this);
  AdminSocket* admin_socket = cct->get_admin_socket();
  int ret = admin_socket->register_command("objecter_requests",
                                           "objecter_requests",
                                           m_request_state_hook,
                                           "show in-progress osd requests");

  /* Don't warn on EEXIST, happens if multiple ceph clients
   * are instantiated from one process */
  if (ret < 0 && ret != -EEXIST) {
    lderr(cct) << "error registering admin socket command: "
               << cpp_strerror(ret) << dendl;
  }

  update_crush_location();

  // Start observing the config keys in config_keys.
  cct->_conf->add_observer(this);

  initialized = true;
}
392
/*
 * ok, cluster interaction can happen
 */
void Objecter::start(const OSDMap* o)
{
  shared_lock rl(rwlock);

  start_tick();
  if (o) {
    // NOTE(review): osdmap is written here while holding only the
    // shared (read) lock; presumably start() runs before any concurrent
    // use of this Objecter -- confirm with callers.
    osdmap->deepish_copy_from(*o);
  } else if (osdmap->get_epoch() == 0) {
    // no map yet; ask the monitors for one
    _maybe_request_map();
  }
}
407
// Tear everything down: close all OSD sessions, drop refs held by
// pending map checks, delete outstanding pool/statfs ops, drain the
// homeless session, cancel the tick timer, and unregister the
// perfcounters and admin socket hook.  Must not run concurrently with
// itself (guarded by the initialized flag).
void Objecter::shutdown()
{
  assert(initialized);

  unique_lock wl(rwlock);

  // reject any new work from this point on
  initialized = false;

  cct->_conf->remove_observer(this);

  // close_session() erases from osd_sessions, hence the while-loop
  map<int,OSDSession*>::iterator p;
  while (!osd_sessions.empty()) {
    p = osd_sessions.begin();
    close_session(p->second);
  }

  // drop refs held by pending "check latest map" requests
  while(!check_latest_map_lingers.empty()) {
    map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
    i->second->put();
    check_latest_map_lingers.erase(i->first);
  }

  while(!check_latest_map_ops.empty()) {
    map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
    i->second->put();
    check_latest_map_ops.erase(i->first);
  }

  while(!check_latest_map_commands.empty()) {
    map<ceph_tid_t, CommandOp*>::iterator i
      = check_latest_map_commands.begin();
    i->second->put();
    check_latest_map_commands.erase(i->first);
  }

  // delete still-pending pool stat / statfs / pool ops outright
  while(!poolstat_ops.empty()) {
    map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
    delete i->second;
    poolstat_ops.erase(i->first);
  }

  while(!statfs_ops.empty()) {
    map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
    delete i->second;
    statfs_ops.erase(i->first);
  }

  while(!pool_ops.empty()) {
    map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
    delete i->second;
    pool_ops.erase(i->first);
  }

  // drain ops parked on the homeless session (no current OSD target)
  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
  while(!homeless_session->linger_ops.empty()) {
    std::map<uint64_t, LingerOp*>::iterator i
      = homeless_session->linger_ops.begin();
    ldout(cct, 10) << " linger_op " << i->first << dendl;
    LingerOp *lop = i->second;
    {
      // session lock scope: detach the op from the session
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_linger_op_remove(homeless_session, lop);
    }
    linger_ops.erase(lop->linger_id);
    linger_ops_set.erase(lop);
    lop->put();
  }

  while(!homeless_session->ops.empty()) {
    std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
    ldout(cct, 10) << " op " << i->first << dendl;
    Op *op = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_op_remove(homeless_session, op);
    }
    op->put();
  }

  while(!homeless_session->command_ops.empty()) {
    std::map<ceph_tid_t, CommandOp*>::iterator i
      = homeless_session->command_ops.begin();
    ldout(cct, 10) << " command_op " << i->first << dendl;
    CommandOp *cop = i->second;
    {
      OSDSession::unique_lock swl(homeless_session->lock);
      _session_command_op_remove(homeless_session, cop);
    }
    cop->put();
  }

  // stop the periodic tick
  if (tick_event) {
    if (timer.cancel_event(tick_event)) {
      ldout(cct, 10) << " successfully canceled tick" << dendl;
    }
    tick_event = 0;
  }

  if (logger) {
    cct->get_perfcounters_collection()->remove(logger);
    delete logger;
    logger = NULL;
  }

  // Let go of Objecter write lock so timer thread can shutdown
  wl.unlock();

  // Outside of lock to avoid cycle WRT calls to RequestStateHook
  // This is safe because we guarantee no concurrent calls to
  // shutdown() with the ::initialized check at start.
  if (m_request_state_hook) {
    AdminSocket* admin_socket = cct->get_admin_socket();
    admin_socket->unregister_command("objecter_requests");
    delete m_request_state_hook;
    m_request_state_hook = NULL;
  }
}
525
526void Objecter::_send_linger(LingerOp *info,
527 shunique_lock& sul)
528{
529 assert(sul.owns_lock() && sul.mutex() == &rwlock);
530
531 vector<OSDOp> opv;
532 Context *oncommit = NULL;
533 LingerOp::shared_lock watchl(info->watch_lock);
534 bufferlist *poutbl = NULL;
535 if (info->registered && info->is_watch) {
536 ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
537 << dendl;
538 opv.push_back(OSDOp());
539 opv.back().op.op = CEPH_OSD_OP_WATCH;
540 opv.back().op.watch.cookie = info->get_cookie();
541 opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
542 opv.back().op.watch.gen = ++info->register_gen;
543 oncommit = new C_Linger_Reconnect(this, info);
544 } else {
545 ldout(cct, 15) << "send_linger " << info->linger_id << " register"
546 << dendl;
547 opv = info->ops;
548 C_Linger_Commit *c = new C_Linger_Commit(this, info);
549 if (!info->is_watch) {
550 info->notify_id = 0;
551 poutbl = &c->outbl;
552 }
553 oncommit = c;
554 }
555 watchl.unlock();
556 Op *o = new Op(info->target.base_oid, info->target.base_oloc,
557 opv, info->target.flags | CEPH_OSD_FLAG_READ,
558 oncommit, info->pobjver);
559 o->outbl = poutbl;
560 o->snapid = info->snap;
561 o->snapc = info->snapc;
562 o->mtime = info->mtime;
563
564 o->target = info->target;
31f18b77 565 o->tid = ++last_tid;
7c673cae
FG
566
567 // do not resend this; we will send a new op to reregister
568 o->should_resend = false;
569
570 if (info->register_tid) {
571 // repeat send. cancel old registeration op, if any.
572 OSDSession::unique_lock sl(info->session->lock);
573 if (info->session->ops.count(info->register_tid)) {
574 Op *o = info->session->ops[info->register_tid];
575 _op_cancel_map_check(o);
576 _cancel_linger_op(o);
577 }
578 sl.unlock();
579
580 _op_submit(o, sul, &info->register_tid);
581 } else {
582 // first send
583 _op_submit_with_budget(o, sul, &info->register_tid);
584 }
585
586 logger->inc(l_osdc_linger_send);
587}
588
// Completion for the registration op sent by _send_linger().  Delivers
// the result to the user's on_reg_commit callback (at most once) and,
// for a notify, decodes the notify_id from the reply payload.
void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
{
  LingerOp::unique_lock wl(info->watch_lock);
  ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
  if (info->on_reg_commit) {
    info->on_reg_commit->complete(r);
    info->on_reg_commit = NULL;  // completed exactly once
  }

  // only tell the user the first time we do this
  info->registered = true;
  info->pobjver = NULL;

  if (!info->is_watch) {
    // make note of the notify_id
    bufferlist::iterator p = outbl.begin();
    try {
      ::decode(info->notify_id, p);
      ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
                     << dendl;
    }
    catch (buffer::error& e) {
      // short/undecodable payload: leave notify_id as-is
    }
  }
}
614
// Finisher context that delivers a watch error to the user's
// watch_context outside of dispatch.  The constructor takes a ref on
// the LingerOp and marks an async event queued (_queued_async);
// finish() releases both.
struct C_DoWatchError : public Context {
  Objecter *objecter;
  Objecter::LingerOp *info;
  int err;
  C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
    : objecter(o), info(i), err(r) {
    info->get();
    info->_queued_async();
  }
  void finish(int r) override {
    // Snapshot 'canceled' under the objecter lock, but invoke the user
    // callback without holding it.
    Objecter::unique_lock wl(objecter->rwlock);
    bool canceled = info->canceled;
    wl.unlock();

    if (!canceled) {
      info->watch_context->handle_error(info->get_cookie(), err);
    }

    info->finished_async();
    info->put();
  }
};
637
638int Objecter::_normalize_watch_error(int r)
639{
640 // translate ENOENT -> ENOTCONN so that a delete->disconnection
641 // notification and a failure to reconnect becuase we raced with
642 // the delete appear the same to the user.
643 if (r == -ENOENT)
644 r = -ENOTCONN;
645 return r;
646}
647
648void Objecter::_linger_reconnect(LingerOp *info, int r)
649{
650 ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
651 << " (last_error " << info->last_error << ")" << dendl;
652 if (r < 0) {
653 LingerOp::unique_lock wl(info->watch_lock);
654 if (!info->last_error) {
655 r = _normalize_watch_error(r);
656 info->last_error = r;
657 if (info->watch_context) {
658 finisher->queue(new C_DoWatchError(this, info, r));
659 }
660 }
661 wl.unlock();
662 }
663}
664
// Send a CEPH_OSD_WATCH_OP_PING for a registered watch.  Skipped when
// pings are disabled via objecter_inject_no_watch_ping or reads are
// paused (PAUSERD).  The completion (_linger_ping) matches the reply
// against register_gen so stale pings are ignored.
void Objecter::_send_linger_ping(LingerOp *info)
{
  // rwlock is locked unique
  // info->session->lock is locked

  if (cct->_conf->objecter_inject_no_watch_ping) {
    ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
                   << dendl;
    return;
  }
  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
    return;
  }

  ceph::mono_time now = ceph::mono_clock::now();
  ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
                 << dendl;

  vector<OSDOp> opv(1);
  opv[0].op.op = CEPH_OSD_OP_WATCH;
  opv[0].op.watch.cookie = info->get_cookie();
  opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
  opv[0].op.watch.gen = info->register_gen;  // ties the reply to this registration
  C_Linger_Ping *onack = new C_Linger_Ping(this, info);
  Op *o = new Op(info->target.base_oid, info->target.base_oloc,
                 opv, info->target.flags | CEPH_OSD_FLAG_READ,
                 onack, NULL, NULL);
  o->target = info->target;
  o->should_resend = false;  // a later ping supersedes this one
  _send_op_account(o);
  MOSDOp *m = _prepare_osd_op(o);
  o->tid = ++last_tid;
  _session_op_assign(info->session, o);
  _send_op(o, m);
  info->ping_tid = o->tid;

  onack->sent = now;
  logger->inc(l_osdc_linger_ping);
}
705
// Completion for a watch ping.  Replies from a previous registration
// generation are ignored.  Success advances watch_valid_thru; the
// first failure is normalized, recorded in last_error, and reported to
// the user's watch_context via the finisher.
void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
                            uint32_t register_gen)
{
  LingerOp::unique_lock l(info->watch_lock);
  ldout(cct, 10) << __func__ << " " << info->linger_id
                 << " sent " << sent << " gen " << register_gen << " = " << r
                 << " (last_error " << info->last_error
                 << " register_gen " << info->register_gen << ")" << dendl;
  if (info->register_gen == register_gen) {
    if (r == 0) {
      info->watch_valid_thru = sent;
    } else if (r < 0 && !info->last_error) {
      r = _normalize_watch_error(r);
      info->last_error = r;
      if (info->watch_context) {
        finisher->queue(new C_DoWatchError(this, info, r));
      }
    }
  } else {
    ldout(cct, 20) << " ignoring old gen" << dendl;
  }
}
728
// Report watch health: a negative error if the watch has failed,
// otherwise an upper bound in milliseconds on how long ago it was last
// known valid.
int Objecter::linger_check(LingerOp *info)
{
  LingerOp::shared_lock l(info->watch_lock);

  mono_time stamp = info->watch_valid_thru;
  if (!info->watch_pending_async.empty())
    // a queued-but-undelivered async event may predate watch_valid_thru
    stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
  auto age = mono_clock::now() - stamp;

  ldout(cct, 10) << __func__ << " " << info->linger_id
                 << " err " << info->last_error
                 << " age " << age << dendl;
  if (info->last_error)
    return info->last_error;
  // return a safe upper bound (we are truncating to ms)
  return
    1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
}
747
// Public entry point: cancel a linger op under the write lock and drop
// the caller's reference.
void Objecter::linger_cancel(LingerOp *info)
{
  unique_lock wl(rwlock);
  _linger_cancel(info);
  info->put();
}
754
// Unregister a linger op: detach it from its session, remove it from
// the linger tables, and drop the registration ref.  Idempotent via
// the 'canceled' flag.
void Objecter::_linger_cancel(LingerOp *info)
{
  // rwlock is locked unique
  ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
  if (!info->canceled) {
    OSDSession *s = info->session;
    OSDSession::unique_lock sl(s->lock);
    _session_linger_op_remove(s, info);
    sl.unlock();

    linger_ops.erase(info->linger_id);
    linger_ops_set.erase(info);
    assert(linger_ops.size() == linger_ops_set.size());  // tables in sync

    info->canceled = true;
    info->put();  // drop the registration ref

    logger->dec(l_osdc_linger_active);
  }
}
775
776
777
// Allocate and register a new LingerOp for oid/oloc.  The op is
// assigned a fresh linger_id and entered into linger_ops and
// linger_ops_set under the write lock; the returned pointer carries a
// ref for the caller.
Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
                                              const object_locator_t& oloc,
                                              int flags)
{
  LingerOp *info = new LingerOp;
  info->target.base_oid = oid;
  info->target.base_oloc = oloc;
  if (info->target.base_oloc.key == oid)
    info->target.base_oloc.key.clear();  // key equal to the oid is redundant
  info->target.flags = flags;
  info->watch_valid_thru = mono_clock::now();

  unique_lock l(rwlock);

  // Acquire linger ID
  info->linger_id = ++max_linger_id;
  ldout(cct, 10) << __func__ << " info " << info
                 << " linger_id " << info->linger_id
                 << " cookie " << info->get_cookie()
                 << dendl;
  linger_ops[info->linger_id] = info;
  linger_ops_set.insert(info);
  assert(linger_ops.size() == linger_ops_set.size());

  info->get(); // for the caller
  return info;
}
805
// Arm a registered LingerOp as a watch (write op) and submit it.
// oncommit fires when registration commits; returns the linger_id.
ceph_tid_t Objecter::linger_watch(LingerOp *info,
                                  ObjectOperation& op,
                                  const SnapContext& snapc,
                                  real_time mtime,
                                  bufferlist& inbl,
                                  Context *oncommit,
                                  version_t *objver)
{
  info->is_watch = true;
  info->snapc = snapc;
  info->mtime = mtime;
  info->target.flags |= CEPH_OSD_FLAG_WRITE;
  info->ops = op.ops;
  info->inbl = inbl;
  info->poutbl = NULL;  // watches produce no output payload
  info->pobjver = objver;
  info->on_reg_commit = oncommit;

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  return info->linger_id;
}
830
// Arm a registered LingerOp as a notify (read op) and submit it.
// onfinish fires when the notify completes; poutbl receives the notify
// result.  Returns the linger_id.
ceph_tid_t Objecter::linger_notify(LingerOp *info,
                                   ObjectOperation& op,
                                   snapid_t snap, bufferlist& inbl,
                                   bufferlist *poutbl,
                                   Context *onfinish,
                                   version_t *objver)
{
  info->snap = snap;
  info->target.flags |= CEPH_OSD_FLAG_READ;
  info->ops = op.ops;
  info->inbl = inbl;
  info->poutbl = poutbl;
  info->pobjver = objver;
  info->on_reg_commit = onfinish;

  shunique_lock sul(rwlock, ceph::acquire_unique);
  _linger_submit(info, sul);
  logger->inc(l_osdc_linger_active);

  return info->linger_id;
}
852
// Compute the OSD target for a registered LingerOp, bind it to the
// owning OSDSession, and send the registration op.
void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
{
  assert(sul.owns_lock() && sul.mutex() == &rwlock);
  assert(info->linger_id);

  // Populate Op::target
  OSDSession *s = NULL;
  _calc_target(&info->target, nullptr);

  // Create LingerOp<->OSDSession relation
  int r = _get_session(info->target.osd, &s, sul);
  assert(r == 0);
  OSDSession::unique_lock sl(s->lock);
  _session_linger_op_assign(s, info);
  sl.unlock();
  put_session(s);  // drop the local session ref from _get_session()

  _send_linger(info, sul);
}
872
// Finisher context that delivers a watch NOTIFY to the user outside of
// fast-dispatch.  The constructor takes refs on both the LingerOp
// (plus an async-queued marker) and the message; _do_watch_notify()
// releases all of them.
struct C_DoWatchNotify : public Context {
  Objecter *objecter;
  Objecter::LingerOp *info;
  MWatchNotify *msg;
  C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
    : objecter(o), info(i), msg(m) {
    info->get();
    info->_queued_async();
    msg->get();
  }
  void finish(int r) override {
    // r is unused; everything needed travels in info/msg
    objecter->_do_watch_notify(info, msg);
  }
};
887
// Fast-dispatch entry for MWatchNotify.  The message cookie is the
// LingerOp pointer value; it is validated against linger_ops_set
// before being dereferenced.  The caller (ms_dispatch) retains
// ownership of m and puts it afterwards.
void Objecter::handle_watch_notify(MWatchNotify *m)
{
  shared_lock l(rwlock);
  if (!initialized) {
    return;
  }

  LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
  if (linger_ops_set.count(info) == 0) {
    ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
    return;
  }
  LingerOp::unique_lock wl(info->watch_lock);
  if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
    // record only the first error; deliver it asynchronously
    if (!info->last_error) {
      info->last_error = -ENOTCONN;
      if (info->watch_context) {
        finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
      }
    }
  } else if (!info->is_watch) {
    // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
    // since we know the only user (librados) is safe to call in
    // fast-dispatch context
    if (info->notify_id &&
        info->notify_id != m->notify_id) {
      ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
                     << " != " << info->notify_id << ", ignoring" << dendl;
    } else if (info->on_notify_finish) {
      info->notify_result_bl->claim(m->get_data());
      info->on_notify_finish->complete(m->return_code);

      // if we race with reconnect we might get a second notify; only
      // notify the caller once!
      info->on_notify_finish = NULL;
    }
  } else {
    // a watch NOTIFY event: deliver to the user via the finisher
    finisher->queue(new C_DoWatchNotify(this, info, m));
  }
}
928
// Deliver a NOTIFY event to the user's watch_context; runs from the
// finisher via C_DoWatchNotify.  Always releases the refs taken by
// C_DoWatchNotify (info ref, async-queued marker, message ref), even
// when the linger op has been canceled in the meantime.
void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
{
  ldout(cct, 10) << __func__ << " " << *m << dendl;

  shared_lock l(rwlock);
  assert(initialized);

  if (info->canceled) {
    l.unlock();
    goto out;  // skip delivery but still drop refs below
  }

  // notify completion?
  assert(info->is_watch);
  assert(info->watch_context);
  assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);

  // invoke the user callback without holding the objecter lock
  l.unlock();

  switch (m->opcode) {
  case CEPH_WATCH_EVENT_NOTIFY:
    info->watch_context->handle_notify(m->notify_id, m->cookie,
                                       m->notifier_gid, m->bl);
    break;
  }

 out:
  info->finished_async();
  info->put();
  m->put();
}
960
// Messenger dispatch hook.  Returns true when the message was consumed
// here; returning false lets other dispatchers see it (used for
// CEPH_MSG_OSD_MAP, which MDS/OSD clients also want).
bool Objecter::ms_dispatch(Message *m)
{
  ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
  if (!initialized)
    return false;

  switch (m->get_type()) {
    // these we exclusively handle
  case CEPH_MSG_OSD_OPREPLY:
    handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
    return true;

  case CEPH_MSG_OSD_BACKOFF:
    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
    return true;

  case CEPH_MSG_WATCH_NOTIFY:
    handle_watch_notify(static_cast<MWatchNotify*>(m));
    m->put();  // handle_watch_notify does not consume the message ref
    return true;

  case MSG_COMMAND_REPLY:
    // only OSD command replies are ours; mon command replies pass through
    if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
      handle_command_reply(static_cast<MCommandReply*>(m));
      return true;
    } else {
      return false;
    }

  case MSG_GETPOOLSTATSREPLY:
    handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
    return true;

  case CEPH_MSG_POOLOP_REPLY:
    handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
    return true;

  case CEPH_MSG_STATFS_REPLY:
    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
    return true;

    // these we give others a chance to inspect

    // MDS, OSD
  case CEPH_MSG_OSD_MAP:
    handle_osd_map(static_cast<MOSDMap*>(m));
    return false;
  }
  return false;
}
1011
// After an osdmap change, walk one session's linger ops, regular ops,
// and commands, recomputing each target and collecting everything that
// must be resent into the need_resend* outputs.  Called with the
// objecter lock held via sul; takes the session lock for the scan and
// defers linger unregistration until after it is released.
void Objecter::_scan_requests(OSDSession *s,
                              bool force_resend,
                              bool cluster_full,
                              map<int64_t, bool> *pool_full_map,
                              map<ceph_tid_t, Op*>& need_resend,
                              list<LingerOp*>& need_resend_linger,
                              map<ceph_tid_t, CommandOp*>& need_resend_command,
                              shunique_lock& sul)
{
  assert(sul.owns_lock() && sul.mutex() == &rwlock);

  list<LingerOp*> unregister_lingers;

  OSDSession::unique_lock sl(s->lock);

  // check for changed linger mappings (_before_ regular ops)
  map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
  while (lp != s->linger_ops.end()) {
    LingerOp *op = lp->second;
    assert(op->session == s);
    // check_linger_pool_dne() may touch linger_ops; prevent iterator
    // invalidation
    ++lp;
    ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
    // NOTE(review): 'unregister' is not initialized here; this relies
    // on _check_linger_pool_dne() always setting it before the read
    // below -- confirm.
    bool unregister, force_resend_writes = cluster_full;
    int r = _recalc_linger_op_target(op, sul);
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[op->target.base_oloc.pool];
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!force_resend && !force_resend_writes)
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_linger.push_back(op);
      _linger_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_linger_pool_dne(op, &unregister);
      if (unregister) {
        ldout(cct, 10) << " need to unregister linger op "
                       << op->linger_id << dendl;
        op->get();
        unregister_lingers.push_back(op);
      }
      break;
    }
  }

  // check for changed request mappings
  map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
  while (p != s->ops.end()) {
    Op *op = p->second;
    ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
    ldout(cct, 10) << " checking op " << op->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[op->target.base_oloc.pool];
    int r = _calc_target(&op->target,
                         op->session ? op->session->con.get() : nullptr);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      if (!force_resend && !(force_resend_writes && op->respects_full()))
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      if (op->session) {
        _session_op_remove(op->session, op);
      }
      need_resend[op->tid] = op;
      _op_cancel_map_check(op);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
      _check_op_pool_dne(op, &sl);
      break;
    }
  }

  // commands
  map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
  while (cp != s->command_ops.end()) {
    CommandOp *c = cp->second;
    ++cp;
    ldout(cct, 10) << " checking command " << c->tid << dendl;
    bool force_resend_writes = cluster_full;
    if (pool_full_map)
      force_resend_writes = force_resend_writes ||
        (*pool_full_map)[c->target_pg.pool()];
    int r = _calc_command_target(c, sul);
    switch (r) {
    case RECALC_OP_TARGET_NO_ACTION:
      // resend if skipped map; otherwise do nothing.
      if (!force_resend && !force_resend_writes)
        break;
      // -- fall-thru --
    case RECALC_OP_TARGET_NEED_RESEND:
      need_resend_command[c->tid] = c;
      if (c->session) {
        _session_command_op_remove(c->session, c);
      }
      _command_cancel_map_check(c);
      break;
    case RECALC_OP_TARGET_POOL_DNE:
    case RECALC_OP_TARGET_OSD_DNE:
    case RECALC_OP_TARGET_OSD_DOWN:
      _check_command_map_dne(c);
      break;
    }
  }

  sl.unlock();

  // unregister canceled lingers without the session lock held
  for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
       iter != unregister_lingers.end();
       ++iter) {
    _linger_cancel(*iter);
    (*iter)->put();
  }
}
1133
// Handle an incoming MOSDMap from the monitor: advance our osdmap
// epoch by epoch (applying incrementals or decoding full maps),
// rescan all outstanding ops/lingers/commands for changed targets,
// resend whatever needs it, and complete any Contexts waiting for a
// map epoch we now have.
1134void Objecter::handle_osd_map(MOSDMap *m)
1135{
1136 shunique_lock sul(rwlock, acquire_unique);
31f18b77 1137 if (!initialized)
7c673cae
FG
1138 return;
1139
1140 assert(osdmap);
1141
// ignore maps from a different cluster
1142 if (m->fsid != monc->get_fsid()) {
1143 ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
1144 << " != " << monc->get_fsid() << dendl;
1145 return;
1146 }
1147
// snapshot pause/full state *before* applying the maps so we can detect
// transitions afterwards
1148 bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1149 bool cluster_full = _osdmap_full_flag();
1150 bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
1151 _osdmap_has_pool_full();
1152 map<int64_t, bool> pool_full_map;
1153 for (map<int64_t, pg_pool_t>::const_iterator it
1154 = osdmap->get_pools().begin();
1155 it != osdmap->get_pools().end(); ++it)
1156 pool_full_map[it->first] = _osdmap_pool_full(it->second);
1157
1158
1159 list<LingerOp*> need_resend_linger;
1160 map<ceph_tid_t, Op*> need_resend;
1161 map<ceph_tid_t, CommandOp*> need_resend_command;
1162
1163 if (m->get_last() <= osdmap->get_epoch()) {
1164 ldout(cct, 3) << "handle_osd_map ignoring epochs ["
1165 << m->get_first() << "," << m->get_last()
1166 << "] <= " << osdmap->get_epoch() << dendl;
1167 } else {
1168 ldout(cct, 3) << "handle_osd_map got epochs ["
1169 << m->get_first() << "," << m->get_last()
1170 << "] > " << osdmap->get_epoch() << dendl;
1171
1172 if (osdmap->get_epoch()) {
1173 bool skipped_map = false;
1174 // we want incrementals
1175 for (epoch_t e = osdmap->get_epoch() + 1;
1176 e <= m->get_last();
1177 e++) {
1178
1179 if (osdmap->get_epoch() == e-1 &&
1180 m->incremental_maps.count(e)) {
1181 ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
1182 << dendl;
1183 OSDMap::Incremental inc(m->incremental_maps[e]);
1184 osdmap->apply_incremental(inc);
31f18b77
FG
1185
1186 emit_blacklist_events(inc);
1187
7c673cae
FG
1188 logger->inc(l_osdc_map_inc);
1189 }
1190 else if (m->maps.count(e)) {
1191 ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
31f18b77
FG
1192 OSDMap *new_osdmap = new OSDMap();
1193 new_osdmap->decode(m->maps[e]);
1194
1195 emit_blacklist_events(*osdmap, *new_osdmap);
1196
// NOTE(review): the previous *osdmap is overwritten with no visible
// delete — looks like the old map object is leaked here; confirm
// ownership semantics before relying on this path.
1197 osdmap = new_osdmap;
7c673cae
FG
1198
1199 logger->inc(l_osdc_map_full);
1200 }
1201 else {
// neither an incremental nor a full map for epoch e
1202 if (e >= m->get_oldest()) {
1203 ldout(cct, 3) << "handle_osd_map requesting missing epoch "
1204 << osdmap->get_epoch()+1 << dendl;
1205 _maybe_request_map();
1206 break;
1207 }
// epochs before m->get_oldest() are gone for good; jump forward and
// remember that we skipped, which forces resends below
1208 ldout(cct, 3) << "handle_osd_map missing epoch "
1209 << osdmap->get_epoch()+1
1210 << ", jumping to " << m->get_oldest() << dendl;
1211 e = m->get_oldest() - 1;
1212 skipped_map = true;
1213 continue;
1214 }
1215 logger->set(l_osdc_map_epoch, osdmap->get_epoch());
1216
1217 cluster_full = cluster_full || _osdmap_full_flag();
1218 update_pool_full_map(pool_full_map);
1219
1220 // check all outstanding requests on every epoch
1221 _scan_requests(homeless_session, skipped_map, cluster_full,
1222 &pool_full_map, need_resend,
1223 need_resend_linger, need_resend_command, sul);
1224 for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1225 p != osd_sessions.end(); ) {
1226 OSDSession *s = p->second;
1227 _scan_requests(s, skipped_map, cluster_full,
1228 &pool_full_map, need_resend,
1229 need_resend_linger, need_resend_command, sul);
// advance before close_session(s) may erase s from osd_sessions
1230 ++p;
1231 // osd down or addr change?
1232 if (!osdmap->is_up(s->osd) ||
1233 (s->con &&
1234 s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
1235 close_session(s);
1236 }
1237 }
1238
1239 assert(e == osdmap->get_epoch());
1240 }
1241
1242 } else {
1243 // first map. we want the full thing.
1244 if (m->maps.count(m->get_last())) {
1245 for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1246 p != osd_sessions.end(); ++p) {
1247 OSDSession *s = p->second;
1248 _scan_requests(s, false, false, NULL, need_resend,
1249 need_resend_linger, need_resend_command, sul);
1250 }
1251 ldout(cct, 3) << "handle_osd_map decoding full epoch "
1252 << m->get_last() << dendl;
1253 osdmap->decode(m->maps[m->get_last()]);
1254
1255 _scan_requests(homeless_session, false, false, NULL,
1256 need_resend, need_resend_linger,
1257 need_resend_command, sul);
1258 } else {
1259 ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
1260 << dendl;
1261 monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
1262 monc->renew_subs();
1263 }
1264 }
1265 }
1266
1267 // make sure need_resend targets reflect latest map
1268 for (auto p = need_resend.begin(); p != need_resend.end(); ) {
1269 Op *op = p->second;
1270 if (op->target.epoch < osdmap->get_epoch()) {
1271 ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
1272 int r = _calc_target(&op->target, nullptr);
1273 if (r == RECALC_OP_TARGET_POOL_DNE) {
1274 p = need_resend.erase(p);
1275 _check_op_pool_dne(op, nullptr);
1276 } else {
1277 ++p;
1278 }
1279 } else {
1280 ++p;
1281 }
1282 }
1283
1284 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1285 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
1286 || _osdmap_has_pool_full();
1287
1288 // was/is paused?
1289 if (was_pauserd || was_pausewr || pauserd || pausewr ||
1290 osdmap->get_epoch() < epoch_barrier) {
// keep subscribing while paused/full so we notice the condition clearing
1291 _maybe_request_map();
1292 }
1293
1294 // resend requests
1295 for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
1296 p != need_resend.end(); ++p) {
1297 Op *op = p->second;
1298 OSDSession *s = op->session;
1299 bool mapped_session = false;
1300 if (!s) {
1301 int r = _map_session(&op->target, &s, sul);
1302 assert(r == 0);
1303 mapped_session = true;
1304 } else {
1305 get_session(s);
1306 }
1307 OSDSession::unique_lock sl(s->lock);
1308 if (mapped_session) {
1309 _session_op_assign(s, op);
1310 }
1311 if (op->should_resend) {
1312 if (!op->session->is_homeless() && !op->target.paused) {
1313 logger->inc(l_osdc_op_resend);
1314 _send_op(op);
1315 }
1316 } else {
1317 _op_cancel_map_check(op);
1318 _cancel_linger_op(op);
1319 }
1320 sl.unlock();
1321 put_session(s);
1322 }
1323 for (list<LingerOp*>::iterator p = need_resend_linger.begin();
1324 p != need_resend_linger.end(); ++p) {
1325 LingerOp *op = *p;
1326 if (!op->session) {
1327 _calc_target(&op->target, nullptr);
1328 OSDSession *s = NULL;
d2e6a577 1329 const int r = _get_session(op->target.osd, &s, sul);
7c673cae
FG
1330 assert(r == 0);
1331 assert(s != NULL);
1332 op->session = s;
1333 put_session(s);
1334 }
1335 if (!op->session->is_homeless()) {
1336 logger->inc(l_osdc_linger_resend);
1337 _send_linger(op, sul);
1338 }
1339 }
1340 for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
1341 p != need_resend_command.end(); ++p) {
1342 CommandOp *c = p->second;
1343 if (c->target.osd >= 0) {
1344 _assign_command_session(c, sul);
1345 if (c->session && !c->session->is_homeless()) {
1346 _send_command(c);
1347 }
1348 }
1349 }
1350
1351 _dump_active();
1352
1353 // finish any Contexts that were waiting on a map update
1354 map<epoch_t,list< pair< Context*, int > > >::iterator p =
1355 waiting_for_map.begin();
1356 while (p != waiting_for_map.end() &&
1357 p->first <= osdmap->get_epoch()) {
1358 //go through the list and call the onfinish methods
1359 for (list<pair<Context*, int> >::iterator i = p->second.begin();
1360 i != p->second.end(); ++i) {
1361 i->first->complete(i->second);
1362 }
1363 waiting_for_map.erase(p++);
1364 }
1365
1366 monc->sub_got("osdmap", osdmap->get_epoch());
1367
1368 if (!waiting_for_map.empty()) {
1369 _maybe_request_map();
1370 }
1371}
1372
31f18b77
FG
1373void Objecter::enable_blacklist_events()
1374{
1375 unique_lock wl(rwlock);
1376
1377 blacklist_events_enabled = true;
1378}
1379
1380void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
1381{
1382 unique_lock wl(rwlock);
1383
1384 if (events->empty()) {
1385 events->swap(blacklist_events);
1386 } else {
1387 for (const auto &i : blacklist_events) {
1388 events->insert(i);
1389 }
1390 blacklist_events.clear();
1391 }
1392}
1393
1394void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
1395{
1396 if (!blacklist_events_enabled) {
1397 return;
1398 }
1399
1400 for (const auto &i : inc.new_blacklist) {
1401 blacklist_events.insert(i.first);
1402 }
1403}
1404
1405void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
1406 const OSDMap &new_osd_map)
1407{
1408 if (!blacklist_events_enabled) {
1409 return;
1410 }
1411
1412 std::set<entity_addr_t> old_set;
1413 std::set<entity_addr_t> new_set;
1414
1415 old_osd_map.get_blacklist(&old_set);
1416 new_osd_map.get_blacklist(&new_set);
1417
1418 std::set<entity_addr_t> delta_set;
1419 std::set_difference(
1420 new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
1421 std::inserter(delta_set, delta_set.begin()));
1422 blacklist_events.insert(delta_set.begin(), delta_set.end());
1423}
1424
7c673cae
FG
1425// op pool check
1426
// Completion for the monitor "newest osdmap epoch?" query issued by
// _send_op_map_check(): record the answer as the op's map_dne_bound
// and re-run the pool-existence check.
1427void Objecter::C_Op_Map_Latest::finish(int r)
1428{
// -EAGAIN / -ECANCELED: the query will be reissued by resend_mon_ops()
1429 if (r == -EAGAIN || r == -ECANCELED)
1430 return;
1431
1432 lgeneric_subdout(objecter->cct, objecter, 10)
1433 << "op_map_latest r=" << r << " tid=" << tid
1434 << " latest " << latest << dendl;
1435
1436 Objecter::unique_lock wl(objecter->rwlock);
1437
1438 map<ceph_tid_t, Op*>::iterator iter =
1439 objecter->check_latest_map_ops.find(tid);
1440 if (iter == objecter->check_latest_map_ops.end()) {
// op completed or was cancelled while the query was in flight
1441 lgeneric_subdout(objecter->cct, objecter, 10)
1442 << "op_map_latest op "<< tid << " not found" << dendl;
1443 return;
1444 }
1445
1446 Op *op = iter->second;
1447 objecter->check_latest_map_ops.erase(iter);
1448
1449 lgeneric_subdout(objecter->cct, objecter, 20)
1450 << "op_map_latest op "<< op << dendl;
1451
// if the pool is ever going to exist, it must by epoch 'latest'
1452 if (op->map_dne_bound == 0)
1453 op->map_dne_bound = latest;
1454
// NOTE(review): op->session is dereferenced unconditionally here —
// presumably ops tracked in check_latest_map_ops always carry a session
// (possibly the homeless one); confirm.
1455 OSDSession::unique_lock sl(op->session->lock, defer_lock);
1456 objecter->_check_op_pool_dne(op, &sl);
1457
// drop the ref taken by _send_op_map_check()
1458 op->put();
1459}
1460
1461int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
1462 snapid_t *snap) const
1463{
1464 shared_lock rl(rwlock);
1465
1466 auto& pools = osdmap->get_pools();
1467 auto iter = pools.find(poolid);
1468 if (iter == pools.end()) {
1469 return -ENOENT;
1470 }
1471 const pg_pool_t& pg_pool = iter->second;
1472 for (auto p = pg_pool.snaps.begin();
1473 p != pg_pool.snaps.end();
1474 ++p) {
1475 if (p->second.name == snap_name) {
1476 *snap = p->first;
1477 return 0;
1478 }
1479 }
1480 return -ENOENT;
1481}
1482
1483int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
1484 pool_snap_info_t *info) const
1485{
1486 shared_lock rl(rwlock);
1487
1488 auto& pools = osdmap->get_pools();
1489 auto iter = pools.find(poolid);
1490 if (iter == pools.end()) {
1491 return -ENOENT;
1492 }
1493 const pg_pool_t& pg_pool = iter->second;
1494 auto p = pg_pool.snaps.find(snap);
1495 if (p == pg_pool.snaps.end())
1496 return -ENOENT;
1497 *info = p->second;
1498
1499 return 0;
1500}
1501
1502int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
1503{
1504 shared_lock rl(rwlock);
1505
1506 const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
1507 if (!pi)
1508 return -ENOENT;
1509 for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
1510 p != pi->snaps.end();
1511 ++p) {
1512 snaps->push_back(p->first);
1513 }
1514 return 0;
1515}
1516
1517// sl may be unlocked.
// Decide whether the pool an op targets does not exist.  Once our map
// reaches map_dne_bound without the pool appearing, the op is failed
// with -ENOENT and finished; until the bound is known, ask the monitor
// for the newest epoch.  'sl' is the op's session lock (possibly
// deferred); it is taken only around _finish_op if not already held.
1518void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
1519{
1520 // rwlock is locked unique
1521
1522 if (op->target.pool_ever_existed) {
1523 // the pool previously existed and now it does not, which means it
1524 // was deleted.
1525 op->map_dne_bound = osdmap->get_epoch();
1526 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1527 << " pool previously exists but now does not"
1528 << dendl;
1529 } else {
1530 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1531 << " current " << osdmap->get_epoch()
1532 << " map_dne_bound " << op->map_dne_bound
1533 << dendl;
1534 }
1535 if (op->map_dne_bound > 0) {
1536 if (osdmap->get_epoch() >= op->map_dne_bound) {
1537 // we had a new enough map
1538 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1539 << " concluding pool " << op->target.base_pgid.pool()
1540 << " dne" << dendl;
1541 if (op->onfinish) {
1542 op->onfinish->complete(-ENOENT);
1543 }
1544
1545 OSDSession *s = op->session;
1546 if (s) {
1547 assert(s != NULL);
1548 assert(sl->mutex() == &s->lock);
// _finish_op requires the session lock; take it only if the caller
// passed it in unlocked, and restore the caller's state after
1549 bool session_locked = sl->owns_lock();
1550 if (!session_locked) {
1551 sl->lock();
1552 }
1553 _finish_op(op, 0);
1554 if (!session_locked) {
1555 sl->unlock();
1556 }
1557 } else {
1558 _finish_op(op, 0); // no session
1559 }
1560 }
1561 } else {
// bound unknown yet; query the monitor for the latest osdmap epoch
1562 _send_op_map_check(op);
1563 }
1564}
1565
1566void Objecter::_send_op_map_check(Op *op)
1567{
1568 // rwlock is locked unique
1569 // ask the monitor
1570 if (check_latest_map_ops.count(op->tid) == 0) {
1571 op->get();
1572 check_latest_map_ops[op->tid] = op;
1573 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
1574 monc->get_version("osdmap", &c->latest, NULL, c);
1575 }
1576}
1577
1578void Objecter::_op_cancel_map_check(Op *op)
1579{
1580 // rwlock is locked unique
1581 map<ceph_tid_t, Op*>::iterator iter =
1582 check_latest_map_ops.find(op->tid);
1583 if (iter != check_latest_map_ops.end()) {
1584 Op *op = iter->second;
1585 op->put();
1586 check_latest_map_ops.erase(iter);
1587 }
1588}
1589
1590// linger pool check
1591
// Completion for the monitor map-version query issued on behalf of a
// linger op; mirrors C_Op_Map_Latest::finish, additionally cancelling
// the linger if the pool-dne check says to unregister it.
1592void Objecter::C_Linger_Map_Latest::finish(int r)
1593{
1594 if (r == -EAGAIN || r == -ECANCELED) {
1595 // ignore callback; we will retry in resend_mon_ops()
1596 return;
1597 }
1598
1599 unique_lock wl(objecter->rwlock);
1600
1601 map<uint64_t, LingerOp*>::iterator iter =
1602 objecter->check_latest_map_lingers.find(linger_id);
1603 if (iter == objecter->check_latest_map_lingers.end()) {
// linger op was cancelled while the query was in flight
1604 return;
1605 }
1606
1607 LingerOp *op = iter->second;
1608 objecter->check_latest_map_lingers.erase(iter);
1609
// if the pool is ever going to exist, it must by epoch 'latest'
1610 if (op->map_dne_bound == 0)
1611 op->map_dne_bound = latest;
1612
1613 bool unregister;
1614 objecter->_check_linger_pool_dne(op, &unregister);
1615
1616 if (unregister) {
1617 objecter->_linger_cancel(op);
1618 }
1619
// drop the ref taken by _send_linger_map_check()
1620 op->put();
1621}
1622
// Linger-op analogue of _check_op_pool_dne(): once our map reaches
// map_dne_bound without the pool existing, fail the registration with
// -ENOENT and tell the caller (via *need_unregister) to unregister the
// linger; until the bound is known, query the monitor.
1623void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
1624{
1625 // rwlock is locked unique
1626
1627 *need_unregister = false;
1628
// register_gen > 0 means the linger was registered against a map where
// the pool existed, so its disappearance implies deletion
1629 if (op->register_gen > 0) {
1630 ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1631 << " pool previously existed but now does not"
1632 << dendl;
1633 op->map_dne_bound = osdmap->get_epoch();
1634 } else {
1635 ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1636 << " current " << osdmap->get_epoch()
1637 << " map_dne_bound " << op->map_dne_bound
1638 << dendl;
1639 }
1640 if (op->map_dne_bound > 0) {
1641 if (osdmap->get_epoch() >= op->map_dne_bound) {
1642 if (op->on_reg_commit) {
1643 op->on_reg_commit->complete(-ENOENT);
1644 }
1645 *need_unregister = true;
1646 }
1647 } else {
1648 _send_linger_map_check(op);
1649 }
1650}
1651
1652void Objecter::_send_linger_map_check(LingerOp *op)
1653{
1654 // ask the monitor
1655 if (check_latest_map_lingers.count(op->linger_id) == 0) {
1656 op->get();
1657 check_latest_map_lingers[op->linger_id] = op;
1658 C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
1659 monc->get_version("osdmap", &c->latest, NULL, c);
1660 }
1661}
1662
1663void Objecter::_linger_cancel_map_check(LingerOp *op)
1664{
1665 // rwlock is locked unique
1666
1667 map<uint64_t, LingerOp*>::iterator iter =
1668 check_latest_map_lingers.find(op->linger_id);
1669 if (iter != check_latest_map_lingers.end()) {
1670 LingerOp *op = iter->second;
1671 op->put();
1672 check_latest_map_lingers.erase(iter);
1673 }
1674}
1675
1676// command pool check
1677
// Completion for the monitor map-version query issued on behalf of a
// command op; mirrors C_Op_Map_Latest::finish for commands.
1678void Objecter::C_Command_Map_Latest::finish(int r)
1679{
1680 if (r == -EAGAIN || r == -ECANCELED) {
1681 // ignore callback; we will retry in resend_mon_ops()
1682 return;
1683 }
1684
1685 unique_lock wl(objecter->rwlock);
1686
1687 map<uint64_t, CommandOp*>::iterator iter =
1688 objecter->check_latest_map_commands.find(tid);
1689 if (iter == objecter->check_latest_map_commands.end()) {
// command completed or was cancelled while the query was in flight
1690 return;
1691 }
1692
1693 CommandOp *c = iter->second;
1694 objecter->check_latest_map_commands.erase(iter);
1695
// if the target is ever going to appear, it must by epoch 'latest'
1696 if (c->map_dne_bound == 0)
1697 c->map_dne_bound = latest;
1698
1699 objecter->_check_command_map_dne(c);
1700
// drop the ref taken by _send_command_map_check()
1701 c->put();
1702}
1703
// Command analogue of _check_op_pool_dne(): once our map reaches
// map_dne_bound the command is finished with the error recorded by the
// target recalculation; until the bound is known, query the monitor.
1704void Objecter::_check_command_map_dne(CommandOp *c)
1705{
1706 // rwlock is locked unique
1707
1708 ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
1709 << " current " << osdmap->get_epoch()
1710 << " map_dne_bound " << c->map_dne_bound
1711 << dendl;
1712 if (c->map_dne_bound > 0) {
1713 if (osdmap->get_epoch() >= c->map_dne_bound) {
1714 _finish_command(c, c->map_check_error, c->map_check_error_str);
1715 }
1716 } else {
1717 _send_command_map_check(c);
1718 }
1719}
1720
1721void Objecter::_send_command_map_check(CommandOp *c)
1722{
1723 // rwlock is locked unique
1724
1725 // ask the monitor
1726 if (check_latest_map_commands.count(c->tid) == 0) {
1727 c->get();
1728 check_latest_map_commands[c->tid] = c;
1729 C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
1730 monc->get_version("osdmap", &f->latest, NULL, f);
1731 }
1732}
1733
1734void Objecter::_command_cancel_map_check(CommandOp *c)
1735{
1736 // rwlock is locked uniqe
1737
1738 map<uint64_t, CommandOp*>::iterator iter =
1739 check_latest_map_commands.find(c->tid);
1740 if (iter != check_latest_map_commands.end()) {
1741 CommandOp *c = iter->second;
1742 c->put();
1743 check_latest_map_commands.erase(iter);
1744 }
1745}
1746
1747
1748/**
1749 * Look up OSDSession by OSD id.
1750 *
1751 * @returns 0 on success, or -EAGAIN if the lock context requires
1752 * promotion to write.
1753 *
 * On success *session holds a referenced session (negative osd ids map
 * to the shared, unrefcounted homeless session).  Creating a session
 * mutates osd_sessions and therefore needs rwlock held unique; with
 * only the shared lock held we return -EAGAIN instead.
1754 */
1755int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
1756{
1757 assert(sul && sul.mutex() == &rwlock);
1758
1759 if (osd < 0) {
1760 *session = homeless_session;
1761 ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
1762 << dendl;
1763 return 0;
1764 }
1765
1766 map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
1767 if (p != osd_sessions.end()) {
1768 OSDSession *s = p->second;
// ref for the caller
1769 s->get();
1770 *session = s;
1771 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1772 << s->get_nref() << dendl;
1773 return 0;
1774 }
// session creation needs the write lock
1775 if (!sul.owns_lock()) {
1776 return -EAGAIN;
1777 }
1778 OSDSession *s = new OSDSession(cct, osd);
1779 osd_sessions[osd] = s;
1780 s->con = messenger->get_connection(osdmap->get_inst(osd));
// the connection holds its own ref via set_priv()
1781 s->con->set_priv(s->get());
1782 logger->inc(l_osdc_osd_session_open);
1783 logger->set(l_osdc_osd_sessions, osd_sessions.size());
// ref for the caller
1784 s->get();
1785 *session = s;
1786 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1787 << s->get_nref() << dendl;
1788 return 0;
1789}
1789
1790void Objecter::put_session(Objecter::OSDSession *s)
1791{
1792 if (s && !s->is_homeless()) {
1793 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1794 << s->get_nref() << dendl;
1795 s->put();
1796 }
1797}
1798
1799void Objecter::get_session(Objecter::OSDSession *s)
1800{
1801 assert(s != NULL);
1802
1803 if (!s->is_homeless()) {
1804 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1805 << s->get_nref() << dendl;
1806 s->get();
1807 }
1808}
1809
1810void Objecter::_reopen_session(OSDSession *s)
1811{
1812 // s->lock is locked
1813
1814 entity_inst_t inst = osdmap->get_inst(s->osd);
1815 ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
1816 << inst << dendl;
1817 if (s->con) {
1818 s->con->set_priv(NULL);
1819 s->con->mark_down();
1820 logger->inc(l_osdc_osd_session_close);
1821 }
1822 s->con = messenger->get_connection(inst);
1823 s->con->set_priv(s->get());
1824 s->incarnation++;
1825 logger->inc(l_osdc_osd_session_open);
1826}
1827
// Tear down an OSD session: drop its connection, detach every op,
// linger op, and command queued on it, remove it from osd_sessions, and
// re-home the detached work onto the homeless session so a later map
// change can reassign it.
1828void Objecter::close_session(OSDSession *s)
1829{
1830 // rwlock is locked unique
1831
1832 ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
1833 if (s->con) {
1834 s->con->set_priv(NULL);
1835 s->con->mark_down();
1836 logger->inc(l_osdc_osd_session_close);
1837 }
1838 OSDSession::unique_lock sl(s->lock);
1839
// collect detached work here; reassigned to homeless_session below,
// after s->lock is released (avoids holding two session locks at once)
1840 std::list<LingerOp*> homeless_lingers;
1841 std::list<CommandOp*> homeless_commands;
1842 std::list<Op*> homeless_ops;
1843
1844 while (!s->linger_ops.empty()) {
1845 std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
1846 ldout(cct, 10) << " linger_op " << i->first << dendl;
1847 homeless_lingers.push_back(i->second);
1848 _session_linger_op_remove(s, i->second);
1849 }
1850
1851 while (!s->ops.empty()) {
1852 std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
1853 ldout(cct, 10) << " op " << i->first << dendl;
1854 homeless_ops.push_back(i->second);
1855 _session_op_remove(s, i->second);
1856 }
1857
1858 while (!s->command_ops.empty()) {
1859 std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
1860 ldout(cct, 10) << " command_op " << i->first << dendl;
1861 homeless_commands.push_back(i->second);
1862 _session_command_op_remove(s, i->second);
1863 }
1864
1865 osd_sessions.erase(s->osd);
1866 sl.unlock();
// drop the osd_sessions map's reference
1867 put_session(s);
1868
1869 // Assign any leftover ops to the homeless session
1870 {
1871 OSDSession::unique_lock hsl(homeless_session->lock);
1872 for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
1873 i != homeless_lingers.end(); ++i) {
1874 _session_linger_op_assign(homeless_session, *i);
1875 }
1876 for (std::list<Op*>::iterator i = homeless_ops.begin();
1877 i != homeless_ops.end(); ++i) {
1878 _session_op_assign(homeless_session, *i);
1879 }
1880 for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
1881 i != homeless_commands.end(); ++i) {
1882 _session_command_op_assign(homeless_session, *i);
1883 }
1884 }
1885
1886 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1887}
1888
1889void Objecter::wait_for_osd_map()
1890{
1891 unique_lock l(rwlock);
1892 if (osdmap->get_epoch()) {
1893 l.unlock();
1894 return;
1895 }
1896
1897 // Leave this since it goes with C_SafeCond
1898 Mutex lock("");
1899 Cond cond;
1900 bool done;
1901 lock.Lock();
1902 C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL);
1903 waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
1904 l.unlock();
1905 while (!done)
1906 cond.Wait(lock);
1907 lock.Unlock();
1908}
1909
// Completion for monc->get_version("osdmap", ...) used by
// wait_for_latest_osdmap(): on success, hand the reported epoch bounds
// to get_latest_version(); on -EAGAIN, restart the whole round trip.
// No other error code is ever delivered.
1910struct C_Objecter_GetVersion : public Context {
1911 Objecter *objecter;
1912 uint64_t oldest, newest;
1913 Context *fin;
1914 C_Objecter_GetVersion(Objecter *o, Context *c)
1915 : objecter(o), oldest(0), newest(0), fin(c) {}
1916 void finish(int r) override {
1917 if (r >= 0) {
1918 objecter->get_latest_version(oldest, newest, fin);
1919 } else if (r == -EAGAIN) { // try again as instructed
1920 objecter->wait_for_latest_osdmap(fin);
1921 } else {
1922 // it doesn't return any other error codes!
1923 ceph_abort();
1924 }
1925 }
1926};
1927
1928void Objecter::wait_for_latest_osdmap(Context *fin)
1929{
1930 ldout(cct, 10) << __func__ << dendl;
1931 C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
1932 monc->get_version("osdmap", &c->newest, &c->oldest, c);
1933}
1934
1935void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
1936{
1937 unique_lock wl(rwlock);
1938 _get_latest_version(oldest, newest, fin);
1939}
1940
1941void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
1942 Context *fin)
1943{
1944 // rwlock is locked unique
1945 if (osdmap->get_epoch() >= newest) {
1946 ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
1947 if (fin)
1948 fin->complete(0);
1949 return;
1950 }
1951
1952 ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
1953 _wait_for_new_map(fin, newest, 0);
1954}
1955
1956void Objecter::maybe_request_map()
1957{
1958 shared_lock rl(rwlock);
1959 _maybe_request_map();
1960}
1961
1962void Objecter::_maybe_request_map()
1963{
1964 // rwlock is locked
1965 int flag = 0;
1966 if (_osdmap_full_flag()
1967 || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
1968 || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
1969 ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
1970 "osd map (FULL flag is set)" << dendl;
1971 } else {
1972 ldout(cct, 10)
1973 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
1974 flag = CEPH_SUBSCRIBE_ONETIME;
1975 }
1976 epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
1977 if (monc->sub_want("osdmap", epoch, flag)) {
1978 monc->renew_subs();
1979 }
1980}
1981
1982void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
1983{
1984 // rwlock is locked unique
1985 waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
1986 _maybe_request_map();
1987}
1988
1989
1990/**
1991 * Use this together with wait_for_map: this is a pre-check to avoid
1992 * allocating a Context for wait_for_map if we can see that we
1993 * definitely already have the epoch.
1994 *
1995 * This does *not* replace the need to handle the return value of
1996 * wait_for_map: just because we don't have it in this pre-check
1997 * doesn't mean we won't have it when calling back into wait_for_map,
1998 * since the objecter lock is dropped in between.
1999 */
2000bool Objecter::have_map(const epoch_t epoch)
2001{
2002 shared_lock rl(rwlock);
2003 if (osdmap->get_epoch() >= epoch) {
2004 return true;
2005 } else {
2006 return false;
2007 }
2008}
2009
2010bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
2011{
2012 unique_lock wl(rwlock);
2013 if (osdmap->get_epoch() >= epoch) {
2014 return true;
2015 }
2016 _wait_for_new_map(c, epoch, err);
2017 return false;
2018}
2019
2020void Objecter::kick_requests(OSDSession *session)
2021{
2022 ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
2023
2024 map<uint64_t, LingerOp *> lresend;
2025 unique_lock wl(rwlock);
2026
2027 OSDSession::unique_lock sl(session->lock);
2028 _kick_requests(session, lresend);
2029 sl.unlock();
2030
2031 _linger_ops_resend(lresend, wl);
2032}
2033
// rwlock is held unique and session->lock is held by the caller.
// Clear the session's backoffs and queue everything it holds for
// resend: ops and commands immediately (in tid order), linger ops
// accumulated into 'lresend' for the caller to resend after dropping
// the session lock.
2034void Objecter::_kick_requests(OSDSession *session,
2035 map<uint64_t, LingerOp *>& lresend)
2036{
2037 // rwlock is locked unique
2038
2039 // clear backoffs
2040 session->backoffs.clear();
2041 session->backoffs_by_id.clear();
2042
2043 // resend ops
2044 map<ceph_tid_t,Op*> resend; // resend in tid order
2045 for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
2046 p != session->ops.end();) {
2047 Op *op = p->second;
2048 ++p;
// NOTE(review): this counter is bumped before the should_resend /
// paused checks, so cancelled or paused ops are also counted as
// resends — confirm intended.
2049 logger->inc(l_osdc_op_resend);
2050 if (op->should_resend) {
2051 if (!op->target.paused)
2052 resend[op->tid] = op;
2053 } else {
2054 _op_cancel_map_check(op);
2055 _cancel_linger_op(op);
2056 }
2057 }
2058
2059 while (!resend.empty()) {
2060 _send_op(resend.begin()->second);
2061 resend.erase(resend.begin());
2062 }
2063
2064 // resend lingers
// take a ref per linger; released by _linger_ops_resend()
2065 for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
2066 j != session->linger_ops.end(); ++j) {
2067 LingerOp *op = j->second;
2068 op->get();
2069 logger->inc(l_osdc_linger_resend);
2070 assert(lresend.count(j->first) == 0);
2071 lresend[j->first] = op;
2072 }
2073
2074 // resend commands
2075 map<uint64_t,CommandOp*> cresend; // resend in order
2076 for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
2077 k != session->command_ops.end(); ++k) {
2078 logger->inc(l_osdc_command_resend);
2079 cresend[k->first] = k->second;
2080 }
2081 while (!cresend.empty()) {
2082 _send_command(cresend.begin()->second);
2083 cresend.erase(cresend.begin());
2084 }
2085}
2086
2087void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
2088 unique_lock& ul)
2089{
2090 assert(ul.owns_lock());
2091 shunique_lock sul(std::move(ul));
2092 while (!lresend.empty()) {
2093 LingerOp *op = lresend.begin()->second;
2094 if (!op->canceled) {
2095 _send_linger(op, sul);
2096 }
2097 op->put();
2098 lresend.erase(lresend.begin());
2099 }
2100 ul = unique_lock(sul.release_to_unique());
2101}
2102
2103void Objecter::start_tick()
2104{
2105 assert(tick_event == 0);
2106 tick_event =
2107 timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2108 &Objecter::tick, this);
2109}
2110
// Periodic housekeeping (every objecter_tick_interval): count laggy
// ops, ping osds that hold outstanding op/linger/command state so
// session resets are detected, and nudge the monitor for a new map
// when homeless ops exist.  Reschedules itself unless shutdown raced.
2111void Objecter::tick()
2112{
2113 shared_lock rl(rwlock);
2114
2115 ldout(cct, 10) << "tick" << dendl;
2116
2117 // we are only called by C_Tick
2118 tick_event = 0;
2119
31f18b77 2120 if (!initialized) {
7c673cae
FG
2121 // we raced with shutdown
2122 ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
2123 return;
2124 }
2125
2126 set<OSDSession*> toping;
2127
2128
2129 // look for laggy requests
2130 auto cutoff = ceph::mono_clock::now();
2131 cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout); // timeout
2132
2133 unsigned laggy_ops = 0;
2134
2135 for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
2136 siter != osd_sessions.end(); ++siter) {
2137 OSDSession *s = siter->second;
2138 OSDSession::lock_guard l(s->lock);
2139 bool found = false;
// ops older than the cutoff are only counted/logged here; the actual
// resend happens via map handling or session reset
2140 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
2141 p != s->ops.end();
2142 ++p) {
2143 Op *op = p->second;
2144 assert(op->session);
2145 if (op->stamp < cutoff) {
2146 ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
2147 << " is laggy" << dendl;
2148 found = true;
2149 ++laggy_ops;
2150 }
2151 }
// registered watches are kept alive with explicit pings
2152 for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
2153 p != s->linger_ops.end();
2154 ++p) {
2155 LingerOp *op = p->second;
2156 LingerOp::unique_lock wl(op->watch_lock);
2157 assert(op->session);
2158 ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
2159 << " (osd." << op->session->osd << ")" << dendl;
2160 found = true;
2161 if (op->is_watch && op->registered && !op->last_error)
2162 _send_linger_ping(op);
2163 }
2164 for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
2165 p != s->command_ops.end();
2166 ++p) {
2167 CommandOp *op = p->second;
2168 assert(op->session);
2169 ldout(cct, 10) << " pinging osd that serves command tid " << p->first
2170 << " (osd." << op->session->osd << ")" << dendl;
2171 found = true;
2172 }
2173 if (found)
2174 toping.insert(s);
2175 }
// homeless ops can only make progress with a newer map
31f18b77 2176 if (num_homeless_ops || !toping.empty()) {
7c673cae
FG
2177 _maybe_request_map();
2178 }
2179
2180 logger->set(l_osdc_op_laggy, laggy_ops);
2181 logger->set(l_osdc_osd_laggy, toping.size());
2182
2183 if (!toping.empty()) {
2184 // send a ping to these osds, to ensure we detect any session resets
2185 // (osd reply message policy is lossy)
2186 for (set<OSDSession*>::const_iterator i = toping.begin();
2187 i != toping.end();
2188 ++i) {
2189 (*i)->con->send_message(new MPing);
2190 }
2191 }
2192
2193 // Make sure we don't reschedule if we wake up after shutdown
31f18b77 2194 if (initialized) {
7c673cae
FG
2195 tick_event = timer.reschedule_me(ceph::make_timespan(
2196 cct->_conf->objecter_tick_interval));
2197 }
2198}
2199
2200void Objecter::resend_mon_ops()
2201{
2202 unique_lock wl(rwlock);
2203
2204 ldout(cct, 10) << "resend_mon_ops" << dendl;
2205
2206 for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
2207 p != poolstat_ops.end();
2208 ++p) {
2209 _poolstat_submit(p->second);
2210 logger->inc(l_osdc_poolstat_resend);
2211 }
2212
2213 for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
2214 p != statfs_ops.end();
2215 ++p) {
2216 _fs_stats_submit(p->second);
2217 logger->inc(l_osdc_statfs_resend);
2218 }
2219
2220 for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
2221 p != pool_ops.end();
2222 ++p) {
2223 _pool_op_submit(p->second);
2224 logger->inc(l_osdc_poolop_resend);
2225 }
2226
2227 for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
2228 p != check_latest_map_ops.end();
2229 ++p) {
2230 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
2231 monc->get_version("osdmap", &c->latest, NULL, c);
2232 }
2233
2234 for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
2235 p != check_latest_map_lingers.end();
2236 ++p) {
2237 C_Linger_Map_Latest *c
2238 = new C_Linger_Map_Latest(this, p->second->linger_id);
2239 monc->get_version("osdmap", &c->latest, NULL, c);
2240 }
2241
2242 for (map<uint64_t, CommandOp*>::iterator p
2243 = check_latest_map_commands.begin();
2244 p != check_latest_map_commands.end();
2245 ++p) {
2246 C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
2247 monc->get_version("osdmap", &c->latest, NULL, c);
2248 }
2249}
2250
2251// read | write ---------------------------
2252
2253void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2254{
2255 shunique_lock rl(rwlock, ceph::acquire_shared);
2256 ceph_tid_t tid = 0;
2257 if (!ptid)
2258 ptid = &tid;
31f18b77 2259 op->trace.event("op submit");
7c673cae
FG
2260 _op_submit_with_budget(op, rl, ptid, ctx_budget);
2261}
2262
void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
				      ceph_tid_t *ptid,
				      int *ctx_budget)
{
  // Take throttle budget for op (may block and drop rwlock), arm the
  // optional client-side timeout, then submit.  Caller holds rwlock
  // via sul.
  assert(initialized);

  assert(op->ops.size() == op->out_bl.size());
  assert(op->ops.size() == op->out_rval.size());
  assert(op->ops.size() == op->out_handler.size());

  // throttle. before we look at any state, because
  // _take_op_budget() may drop our lock while it blocks.
  if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
    int op_budget = _take_op_budget(op, sul);
    // take and pass out the budget for the first OP
    // in the context session
    if (ctx_budget && (*ctx_budget == -1)) {
      *ctx_budget = op_budget;
    }
  }

  if (osd_timeout > timespan(0)) {
    if (op->tid == 0)
      op->tid = ++last_tid;
    auto tid = op->tid;
    // capture the tid by value: the Op may already be freed by the
    // time the timer fires, so the callback must not touch op itself
    op->ontimeout = timer.add_event(osd_timeout,
				    [this, tid]() {
				      op_cancel(tid, -ETIMEDOUT); });
  }

  _op_submit(op, sul, ptid);
}
2295
void Objecter::_send_op_account(Op *op)
{
  // Account for an op that is about to be registered/sent: bump the
  // in-flight counters and the per-opcode perf counters.
  inflight_ops++;

  // add to gather set(s)
  if (op->onfinish) {
    num_in_flight++;
  } else {
    ldout(cct, 20) << " note: not requesting reply" << dendl;
  }

  logger->inc(l_osdc_op_active);
  logger->inc(l_osdc_op);

  if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
      (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
    logger->inc(l_osdc_op_rmw);
  else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
    logger->inc(l_osdc_op_w);
  else if (op->target.flags & CEPH_OSD_FLAG_READ)
    logger->inc(l_osdc_op_r);

  if (op->target.flags & CEPH_OSD_FLAG_PGOP)
    logger->inc(l_osdc_op_pg);

  // per-opcode counters; anything unrecognized is lumped into "other"
  for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
    int code = l_osdc_osdop_other;
    switch (p->op.op) {
    case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
    case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
    case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
    case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
    case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
    case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
    case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
    case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
    case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
    case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
    case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
    case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
    case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
    case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
    case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
    case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
    case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
    case CEPH_OSD_OP_TMAPUP: code = l_osdc_osdop_tmap_up; break;
    case CEPH_OSD_OP_TMAPPUT: code = l_osdc_osdop_tmap_put; break;
    case CEPH_OSD_OP_TMAPGET: code = l_osdc_osdop_tmap_get; break;

    // OMAP read operations
    case CEPH_OSD_OP_OMAPGETVALS:
    case CEPH_OSD_OP_OMAPGETKEYS:
    case CEPH_OSD_OP_OMAPGETHEADER:
    case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
    case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;

    // OMAP write operations
    case CEPH_OSD_OP_OMAPSETVALS:
    case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;

    // OMAP del operations
    case CEPH_OSD_OP_OMAPCLEAR:
    case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;

    case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
    case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
    case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
    }
    // NOTE(review): code is always a valid (nonzero) counter id here
    // since the enum values start well above zero, so this check is
    // effectively always true
    if (code)
      logger->inc(code);
  }
}
2368
void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
{
  // rwlock is locked
  //
  // Core submission path: compute the op's PG/OSD target, obtain a
  // session (upgrading to the unique lock if needed), decide whether
  // the op can be sent now or must be paused, then register it with
  // the session and (maybe) send it.

  ldout(cct, 10) << __func__ << " op " << op << dendl;

  // pick target
  assert(op->session == NULL);
  OSDSession *s = NULL;

  bool check_for_latest_map = _calc_target(&op->target, nullptr)
    == RECALC_OP_TARGET_POOL_DNE;

  // Try to get a session, including a retry if we need to take write lock
  int r = _get_session(op->target.osd, &s, sul);
  if (r == -EAGAIN ||
      (check_for_latest_map && sul.owns_lock_shared())) {
    // we must drop the shared lock and take it unique; the osdmap can
    // change while the lock is released, so recheck the epoch after
    epoch_t orig_epoch = osdmap->get_epoch();
    sul.unlock();
    if (cct->_conf->objecter_debug_inject_relock_delay) {
      sleep(1);
    }
    sul.lock();
    if (orig_epoch != osdmap->get_epoch()) {
      // map changed; recalculate mapping
      ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
		     << dendl;
      check_for_latest_map = _calc_target(&op->target, nullptr)
	== RECALC_OP_TARGET_POOL_DNE;
      if (s) {
	put_session(s);
	s = NULL;
	r = -EAGAIN;
      }
    }
  }
  if (r == -EAGAIN) {
    assert(s == NULL);
    r = _get_session(op->target.osd, &s, sul);
  }
  assert(r == 0);
  assert(s);  // may be homeless

  _send_op_account(op);

  // send?

  assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));

  if (osdmap_full_try) {
    op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
  }

  // an op is only sent immediately if nothing pauses it: the epoch
  // barrier, the PAUSEWR/PAUSERD map flags, fullness, or a homeless
  // (unmapped) target
  bool need_send = false;

  if (osdmap->get_epoch() < epoch_barrier) {
    ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
		   << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
	     osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
    ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
		   << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
	     osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
    ldout(cct, 10) << " paused read " << op << " tid " << op->tid
		   << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if (op->respects_full() &&
	     (_osdmap_full_flag() ||
	      _osdmap_pool_full(op->target.base_oloc.pool))) {
    ldout(cct, 0) << " FULL, paused modify " << op << " tid "
		  << op->tid << dendl;
    op->target.paused = true;
    _maybe_request_map();
  } else if (!s->is_homeless()) {
    need_send = true;
  } else {
    _maybe_request_map();
  }

  // prepare the message before taking the session lock
  MOSDOp *m = NULL;
  if (need_send) {
    m = _prepare_osd_op(op);
  }

  OSDSession::unique_lock sl(s->lock);
  if (op->tid == 0)
    op->tid = ++last_tid;

  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
		 << " '" << op->target.base_oloc << "' '"
		 << op->target.target_oloc << "' " << op->ops << " tid "
		 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
		 << dendl;

  _session_op_assign(s, op);

  if (need_send) {
    _send_op(op, m);
  }

  // Last chance to touch Op here, after giving up session lock it can
  // be freed at any time by response handler.
  ceph_tid_t tid = op->tid;
  if (check_for_latest_map) {
    _send_op_map_check(op);
  }
  if (ptid)
    *ptid = tid;
  op = NULL;

  sl.unlock();
  put_session(s);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}
2490
2491int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2492{
31f18b77 2493 assert(initialized);
7c673cae
FG
2494
2495 OSDSession::unique_lock sl(s->lock);
2496
2497 map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
2498 if (p == s->ops.end()) {
2499 ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
2500 << s->osd << dendl;
2501 return -ENOENT;
2502 }
2503
2504 if (s->con) {
2505 ldout(cct, 20) << " revoking rx buffer for " << tid
2506 << " on " << s->con << dendl;
2507 s->con->revoke_rx_buffer(tid);
2508 }
2509
2510 ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
2511 << dendl;
2512 Op *op = p->second;
2513 if (op->onfinish) {
31f18b77 2514 num_in_flight--;
7c673cae
FG
2515 op->onfinish->complete(r);
2516 op->onfinish = NULL;
2517 }
2518 _op_cancel_map_check(op);
2519 _finish_op(op, r);
2520 sl.unlock();
2521
2522 return 0;
2523}
2524
2525int Objecter::op_cancel(ceph_tid_t tid, int r)
2526{
2527 int ret = 0;
2528
2529 unique_lock wl(rwlock);
2530 ret = _op_cancel(tid, r);
2531
2532 return ret;
2533}
2534
2535int Objecter::_op_cancel(ceph_tid_t tid, int r)
2536{
2537 int ret = 0;
2538
2539 ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
2540 << dendl;
2541
2542start:
2543
2544 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2545 siter != osd_sessions.end(); ++siter) {
2546 OSDSession *s = siter->second;
2547 OSDSession::shared_lock sl(s->lock);
2548 if (s->ops.find(tid) != s->ops.end()) {
2549 sl.unlock();
2550 ret = op_cancel(s, tid, r);
2551 if (ret == -ENOENT) {
2552 /* oh no! raced, maybe tid moved to another session, restarting */
2553 goto start;
2554 }
2555 return ret;
2556 }
2557 }
2558
2559 ldout(cct, 5) << __func__ << ": tid " << tid
2560 << " not found in live sessions" << dendl;
2561
2562 // Handle case where the op is in homeless session
2563 OSDSession::shared_lock sl(homeless_session->lock);
2564 if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
2565 sl.unlock();
2566 ret = op_cancel(homeless_session, tid, r);
2567 if (ret == -ENOENT) {
2568 /* oh no! raced, maybe tid moved to another session, restarting */
2569 goto start;
2570 } else {
2571 return ret;
2572 }
2573 } else {
2574 sl.unlock();
2575 }
2576
2577 ldout(cct, 5) << __func__ << ": tid " << tid
2578 << " not found in homeless session" << dendl;
2579
2580 return ret;
2581}
2582
2583
2584epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
2585{
2586 unique_lock wl(rwlock);
2587
2588 std::vector<ceph_tid_t> to_cancel;
2589 bool found = false;
2590
2591 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2592 siter != osd_sessions.end(); ++siter) {
2593 OSDSession *s = siter->second;
2594 OSDSession::shared_lock sl(s->lock);
2595 for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
2596 op_i != s->ops.end(); ++op_i) {
2597 if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
2598 && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
2599 to_cancel.push_back(op_i->first);
2600 }
2601 }
2602 sl.unlock();
2603
2604 for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
2605 titer != to_cancel.end();
2606 ++titer) {
2607 int cancel_result = op_cancel(s, *titer, r);
2608 // We hold rwlock across search and cancellation, so cancels
2609 // should always succeed
2610 assert(cancel_result == 0);
2611 }
2612 if (!found && to_cancel.size())
2613 found = true;
2614 to_cancel.clear();
2615 }
2616
2617 const epoch_t epoch = osdmap->get_epoch();
2618
2619 wl.unlock();
2620
2621 if (found) {
2622 return epoch;
2623 } else {
2624 return -1;
2625 }
2626}
2627
2628bool Objecter::is_pg_changed(
2629 int oldprimary,
2630 const vector<int>& oldacting,
2631 int newprimary,
2632 const vector<int>& newacting,
2633 bool any_change)
2634{
2635 if (OSDMap::primary_changed(
2636 oldprimary,
2637 oldacting,
2638 newprimary,
2639 newacting))
2640 return true;
2641 if (any_change && oldacting != newacting)
2642 return true;
2643 return false; // same primary (tho replicas may have changed)
2644}
2645
2646bool Objecter::target_should_be_paused(op_target_t *t)
2647{
2648 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2649 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
2650 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
2651 _osdmap_full_flag() || _osdmap_pool_full(*pi);
2652
2653 return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
2654 (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
2655 (osdmap->get_epoch() < epoch_barrier);
2656}
2657
2658/**
2659 * Locking public accessor for _osdmap_full_flag
2660 */
2661bool Objecter::osdmap_full_flag() const
2662{
2663 shared_lock rl(rwlock);
2664
2665 return _osdmap_full_flag();
2666}
2667
2668bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2669{
2670 shared_lock rl(rwlock);
2671
2672 if (_osdmap_full_flag()) {
2673 return true;
2674 }
2675
2676 return _osdmap_pool_full(pool_id);
2677}
2678
2679bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2680{
2681 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2682 if (pool == NULL) {
2683 ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2684 return false;
2685 }
2686
2687 return _osdmap_pool_full(*pool);
2688}
2689
2690bool Objecter::_osdmap_has_pool_full() const
2691{
2692 for (map<int64_t, pg_pool_t>::const_iterator it
2693 = osdmap->get_pools().begin();
2694 it != osdmap->get_pools().end(); ++it) {
2695 if (_osdmap_pool_full(it->second))
2696 return true;
2697 }
2698 return false;
2699}
2700
2701bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
2702{
2703 return p.has_flag(pg_pool_t::FLAG_FULL) && honor_osdmap_full;
2704}
2705
/**
 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
 */
bool Objecter::_osdmap_full_flag() const
{
  // The FULL flag is only respected when honor_osdmap_full is set;
  // when it is clear the flag is deliberately ignored.
  return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
}
2714
2715void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2716{
2717 for (map<int64_t, pg_pool_t>::const_iterator it
2718 = osdmap->get_pools().begin();
2719 it != osdmap->get_pools().end(); ++it) {
2720 if (pool_full_map.find(it->first) == pool_full_map.end()) {
2721 pool_full_map[it->first] = _osdmap_pool_full(it->second);
2722 } else {
2723 pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2724 pool_full_map[it->first];
2725 }
2726 }
2727}
2728
2729int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2730 const string& ns)
2731{
2732 shared_lock rl(rwlock);
2733 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2734 if (!p)
2735 return -ENOENT;
2736 return p->hash_key(key, ns);
2737}
2738
2739int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2740 const string& ns)
2741{
2742 shared_lock rl(rwlock);
2743 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2744 if (!p)
2745 return -ENOENT;
2746 return p->raw_hash_to_pg(p->hash_key(key, ns));
2747}
2748
int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
  // rwlock is locked
  //
  // (Re)compute where target t maps in the current osdmap: apply cache
  // tiering, compute the PG, and decide whether the op needs to be
  // resent (interval change, unpause, forced resend, or PG split).
  bool is_read = t->flags & CEPH_OSD_FLAG_READ;
  bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
  t->epoch = osdmap->get_epoch();
  ldout(cct,20) << __func__ << " epoch " << t->epoch
		<< " base " << t->base_oid << " " << t->base_oloc
		<< " precalc_pgid " << (int)t->precalc_pgid
		<< " pgid " << t->base_pgid
		<< (is_read ? " is_read" : "")
		<< (is_write ? " is_write" : "")
		<< dendl;

  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
  if (!pi) {
    t->osd = -1;
    return RECALC_OP_TARGET_POOL_DNE;
  }
  ldout(cct,30) << __func__ << " base pi " << pi
		<< " pg_num " << pi->get_pg_num() << dendl;

  // honor a pool-wide "force op resend" request, at most once per epoch
  bool force_resend = false;
  if (osdmap->get_epoch() == pi->last_force_op_resend) {
    if (t->last_force_resend < pi->last_force_op_resend) {
      t->last_force_resend = pi->last_force_op_resend;
      force_resend = true;
    } else if (t->last_force_resend == 0) {
      force_resend = true;
    }
  }

  // apply tiering: redirect reads/writes to the pool's cache tier
  t->target_oid = t->base_oid;
  t->target_oloc = t->base_oloc;
  if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
    if (is_read && pi->has_read_tier())
      t->target_oloc.pool = pi->read_tier;
    if (is_write && pi->has_write_tier())
      t->target_oloc.pool = pi->write_tier;
    pi = osdmap->get_pg_pool(t->target_oloc.pool);
    if (!pi) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }

  pg_t pgid;
  if (t->precalc_pgid) {
    // PG ops carry their pgid directly instead of hashing an object name
    assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
    assert(t->base_oid.name.empty()); // make sure this is a pg op
    assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
    pgid = t->base_pgid;
  } else {
    int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
					   pgid);
    if (ret == -ENOENT) {
      t->osd = -1;
      return RECALC_OP_TARGET_POOL_DNE;
    }
  }
  ldout(cct,20) << __func__ << " target " << t->target_oid << " "
		<< t->target_oloc << " -> pgid " << pgid << dendl;
  ldout(cct,30) << __func__ << " target pi " << pi
		<< " pg_num " << pi->get_pg_num() << dendl;
  t->pool_ever_existed = true;

  int size = pi->size;
  int min_size = pi->min_size;
  unsigned pg_num = pi->get_pg_num();
  int up_primary, acting_primary;
  vector<int> up, acting;
  osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
			       &acting, &acting_primary);
  bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
  bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
  // note: prev_seed uses the *previous* pg_num/mask still stored in t,
  // so the interval check compares against where the op used to map
  unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
  pg_t prev_pgid(prev_seed, pgid.pool());
  if (any_change && PastIntervals::is_new_interval(
	t->acting_primary,
	acting_primary,
	t->acting,
	acting,
	t->up_primary,
	up_primary,
	t->up,
	up,
	t->size,
	size,
	t->min_size,
	min_size,
	t->pg_num,
	pg_num,
	t->sort_bitwise,
	sort_bitwise,
	t->recovery_deletes,
	recovery_deletes,
	prev_pgid)) {
    force_resend = true;
  }

  bool unpaused = false;
  if (t->paused && !target_should_be_paused(t)) {
    t->paused = false;
    unpaused = true;
  }

  bool legacy_change =
    t->pgid != pgid ||
    is_pg_changed(
      t->acting_primary, t->acting, acting_primary, acting,
      t->used_replica || any_change);
  bool split = false;
  if (t->pg_num) {
    split = prev_pgid.is_split(t->pg_num, pg_num, nullptr);
  }

  if (legacy_change || split || force_resend) {
    // adopt the new mapping
    t->pgid = pgid;
    t->acting = acting;
    t->acting_primary = acting_primary;
    t->up_primary = up_primary;
    t->up = up;
    t->size = size;
    t->min_size = min_size;
    t->pg_num = pg_num;
    t->pg_num_mask = pi->get_pg_num_mask();
    osdmap->get_primary_shard(
      pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
      &t->actual_pgid);
    t->sort_bitwise = sort_bitwise;
    t->recovery_deletes = recovery_deletes;
    ldout(cct, 10) << __func__ << " "
		   << " raw pgid " << pgid << " -> actual " << t->actual_pgid
		   << " acting " << acting
		   << " primary " << acting_primary << dendl;
    t->used_replica = false;
    if (acting_primary == -1) {
      t->osd = -1;
    } else {
      int osd;
      bool read = is_read && !is_write;
      if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
	// spread reads across replicas at random
	int p = rand() % acting.size();
	if (p)
	  t->used_replica = true;
	osd = acting[p];
	ldout(cct, 10) << " chose random osd." << osd << " of " << acting
		       << dendl;
      } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
		 acting.size() > 1) {
	// look for a local replica. prefer the primary if the
	// distance is the same.
	int best = -1;
	int best_locality = 0;
	for (unsigned i = 0; i < acting.size(); ++i) {
	  int locality = osdmap->crush->get_common_ancestor_distance(
	    cct, acting[i], crush_location);
	  ldout(cct, 20) << __func__ << " localize: rank " << i
			 << " osd." << acting[i]
			 << " locality " << locality << dendl;
	  if (i == 0 ||
	      (locality >= 0 && best_locality >= 0 &&
	       locality < best_locality) ||
	      (best_locality < 0 && locality >= 0)) {
	    best = i;
	    best_locality = locality;
	    if (i)
	      t->used_replica = true;
	  }
	}
	assert(best >= 0);
	osd = acting[best];
      } else {
	osd = acting_primary;
      }
      t->osd = osd;
    }
  }
  if (legacy_change || unpaused || force_resend) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  // resend on split only if the peer understands the new addressing
  if (split && con && con->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT)) {
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return RECALC_OP_TARGET_NO_ACTION;
}
2936
2937int Objecter::_map_session(op_target_t *target, OSDSession **s,
2938 shunique_lock& sul)
2939{
2940 _calc_target(target, nullptr);
2941 return _get_session(target->osd, s, sul);
2942}
2943
2944void Objecter::_session_op_assign(OSDSession *to, Op *op)
2945{
2946 // to->lock is locked
2947 assert(op->session == NULL);
2948 assert(op->tid);
2949
2950 get_session(to);
2951 op->session = to;
2952 to->ops[op->tid] = op;
2953
2954 if (to->is_homeless()) {
31f18b77 2955 num_homeless_ops++;
7c673cae
FG
2956 }
2957
2958 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
2959}
2960
2961void Objecter::_session_op_remove(OSDSession *from, Op *op)
2962{
2963 assert(op->session == from);
2964 // from->lock is locked
2965
2966 if (from->is_homeless()) {
31f18b77 2967 num_homeless_ops--;
7c673cae
FG
2968 }
2969
2970 from->ops.erase(op->tid);
2971 put_session(from);
2972 op->session = NULL;
2973
2974 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
2975}
2976
2977void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
2978{
2979 // to lock is locked unique
2980 assert(op->session == NULL);
2981
2982 if (to->is_homeless()) {
31f18b77 2983 num_homeless_ops++;
7c673cae
FG
2984 }
2985
2986 get_session(to);
2987 op->session = to;
2988 to->linger_ops[op->linger_id] = op;
2989
2990 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
2991 << dendl;
2992}
2993
2994void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
2995{
2996 assert(from == op->session);
2997 // from->lock is locked unique
2998
2999 if (from->is_homeless()) {
31f18b77 3000 num_homeless_ops--;
7c673cae
FG
3001 }
3002
3003 from->linger_ops.erase(op->linger_id);
3004 put_session(from);
3005 op->session = NULL;
3006
3007 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
3008 << dendl;
3009}
3010
3011void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
3012{
3013 assert(from == op->session);
3014 // from->lock is locked
3015
3016 if (from->is_homeless()) {
31f18b77 3017 num_homeless_ops--;
7c673cae
FG
3018 }
3019
3020 from->command_ops.erase(op->tid);
3021 put_session(from);
3022 op->session = NULL;
3023
3024 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3025}
3026
3027void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
3028{
3029 // to->lock is locked
3030 assert(op->session == NULL);
3031 assert(op->tid);
3032
3033 if (to->is_homeless()) {
31f18b77 3034 num_homeless_ops++;
7c673cae
FG
3035 }
3036
3037 get_session(to);
3038 op->session = to;
3039 to->command_ops[op->tid] = op;
3040
3041 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3042}
3043
int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
				       shunique_lock& sul)
{
  // rwlock is locked unique
  //
  // Recompute a linger op's target and, if the mapping moved, migrate
  // it to the session of its new OSD.  Returns NEED_RESEND when the
  // caller must re-register the watch/notify.

  int r = _calc_target(&linger_op->target, nullptr, true);
  if (r == RECALC_OP_TARGET_NEED_RESEND) {
    ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
		   << " pgid " << linger_op->target.pgid
		   << " acting " << linger_op->target.acting << dendl;

    OSDSession *s = NULL;
    r = _get_session(linger_op->target.osd, &s, sul);
    assert(r == 0);

    if (linger_op->session != s) {
      // NB locking two sessions (s and linger_op->session) at the
      // same time here is only safe because we are the only one that
      // takes two, and we are holding rwlock for write. Disable
      // lockdep because it doesn't know that.
      OSDSession::unique_lock sl(s->lock);
      _session_linger_op_remove(linger_op->session, linger_op);
      _session_linger_op_assign(s, linger_op);
    }

    // drop the ref _get_session() took; the linger op now holds its own
    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }
  return r;
}
3074
3075void Objecter::_cancel_linger_op(Op *op)
3076{
3077 ldout(cct, 15) << "cancel_op " << op->tid << dendl;
3078
3079 assert(!op->should_resend);
3080 if (op->onfinish) {
3081 delete op->onfinish;
31f18b77 3082 num_in_flight--;
7c673cae
FG
3083 }
3084
3085 _finish_op(op, 0);
3086}
3087
void Objecter::_finish_op(Op *op, int r)
{
  // Final teardown of an op: return its throttle budget, cancel its
  // timeout (unless we are finishing *because* of the timeout),
  // unlink it from its session, and drop our ref.
  ldout(cct, 15) << "finish_op " << op->tid << dendl;

  // op->session->lock is locked unique or op->session is null

  if (!op->ctx_budgeted && op->budgeted)
    put_op_budget(op);

  // r == -ETIMEDOUT means we are being called from the timeout event
  // itself; do not cancel it from within its own callback
  if (op->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(op->ontimeout);

  if (op->session) {
    _session_op_remove(op->session, op);
  }

  logger->dec(l_osdc_op_active);

  // any pending map check must have been cancelled before finishing
  assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());

  inflight_ops--;

  op->put();
}
3112
3113void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
3114{
3115 ldout(cct, 15) << "finish_op " << tid << dendl;
3116 shared_lock rl(rwlock);
3117
3118 OSDSession::unique_lock wl(session->lock);
3119
3120 map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
3121 if (iter == session->ops.end())
3122 return;
3123
3124 Op *op = iter->second;
3125
3126 _finish_op(op, 0);
3127}
3128
MOSDOp *Objecter::_prepare_osd_op(Op *op)
{
  // rwlock is locked
  //
  // Build the MOSDOp message for op against the current osdmap epoch.
  // Caller owns the returned message until it is handed to the
  // messenger.

  int flags = op->target.flags;
  flags |= CEPH_OSD_FLAG_KNOWN_REDIR;

  // Nothing checks this any longer, but needed for compatibility with
  // pre-luminous osds
  flags |= CEPH_OSD_FLAG_ONDISK;

  if (!honor_osdmap_full)
    flags |= CEPH_OSD_FLAG_FULL_FORCE;

  op->target.paused = false;
  // stamp is what tick() uses to detect laggy ops
  op->stamp = ceph::mono_clock::now();

  hobject_t hobj = op->target.get_hobj();
  MOSDOp *m = new MOSDOp(client_inc, op->tid,
			 hobj, op->target.actual_pgid,
			 osdmap->get_epoch(),
			 flags, op->features);

  m->set_snapid(op->snapid);
  m->set_snap_seq(op->snapc.seq);
  m->set_snaps(op->snapc.snaps);

  m->ops = op->ops;
  m->set_mtime(op->mtime);
  // attempts counts how many times this op has been (re)sent
  m->set_retry_attempt(op->attempts++);

  if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
    op->trace.init("op", &trace_endpoint);
  }

  if (op->priority)
    m->set_priority(op->priority);
  else
    m->set_priority(cct->_conf->osd_client_op_priority);

  if (op->reqid != osd_reqid_t()) {
    m->set_reqid(op->reqid);
  }

  logger->inc(l_osdc_op_send);
  logger->inc(l_osdc_op_send_bytes, m->get_data().length());

  return m;
}
3178
3179void Objecter::_send_op(Op *op, MOSDOp *m)
3180{
3181 // rwlock is locked
3182 // op->session->lock is locked
3183
3184 // backoff?
3185 hobject_t hoid = op->target.get_hobj();
3186 auto p = op->session->backoffs.find(op->target.actual_pgid);
3187 if (p != op->session->backoffs.end()) {
3188 auto q = p->second.lower_bound(hoid);
3189 if (q != p->second.begin()) {
3190 --q;
3191 if (hoid >= q->second.end) {
3192 ++q;
3193 }
3194 }
3195 if (q != p->second.end()) {
3196 ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
3197 << "," << q->second.end << ")" << dendl;
3198 int r = cmp(hoid, q->second.begin);
3199 if (r == 0 || (r > 0 && hoid < q->second.end)) {
3200 ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
3201 << " id " << q->second.id << " on " << hoid
3202 << ", queuing " << op << " tid " << op->tid << dendl;
3203 return;
3204 }
3205 }
3206 }
3207
3208 if (!m) {
3209 assert(op->tid > 0);
3210 m = _prepare_osd_op(op);
3211 }
3212
3213 if (op->target.actual_pgid != m->get_spg()) {
3214 ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
3215 << m->get_spg() << " to " << op->target.actual_pgid
3216 << ", updating and reencoding" << dendl;
3217 m->set_spg(op->target.actual_pgid);
3218 m->clear_payload(); // reencode
3219 }
3220
3221 ldout(cct, 15) << "_send_op " << op->tid << " to "
3222 << op->target.actual_pgid << " on osd." << op->session->osd
3223 << dendl;
3224
3225 ConnectionRef con = op->session->con;
3226 assert(con);
3227
3228 // preallocated rx buffer?
3229 if (op->con) {
3230 ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
3231 << op->con << dendl;
3232 op->con->revoke_rx_buffer(op->tid);
3233 }
3234 if (op->outbl &&
3235 op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582
3236 op->outbl->length()) {
3237 ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
3238 << dendl;
3239 op->con = con;
3240 op->con->post_rx_buffer(op->tid, *op->outbl);
3241 }
3242
3243 op->incarnation = op->session->incarnation;
3244
3245 m->set_tid(op->tid);
3246
31f18b77
FG
3247 if (op->trace.valid()) {
3248 m->trace.init("op msg", nullptr, &op->trace);
3249 }
7c673cae
FG
3250 op->session->con->send_message(m);
3251}
3252
3253int Objecter::calc_op_budget(Op *op)
3254{
3255 int op_budget = 0;
3256 for (vector<OSDOp>::iterator i = op->ops.begin();
3257 i != op->ops.end();
3258 ++i) {
3259 if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3260 op_budget += i->indata.length();
3261 } else if (ceph_osd_op_mode_read(i->op.op)) {
3262 if (ceph_osd_op_type_data(i->op.op)) {
3263 if ((int64_t)i->op.extent.length > 0)
3264 op_budget += (int64_t)i->op.extent.length;
3265 } else if (ceph_osd_op_type_attr(i->op.op)) {
3266 op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
3267 }
3268 }
3269 }
3270 return op_budget;
3271}
3272
3273void Objecter::_throttle_op(Op *op,
3274 shunique_lock& sul,
3275 int op_budget)
3276{
3277 assert(sul && sul.mutex() == &rwlock);
3278 bool locked_for_write = sul.owns_lock();
3279
3280 if (!op_budget)
3281 op_budget = calc_op_budget(op);
3282 if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3283 sul.unlock();
3284 op_throttle_bytes.get(op_budget);
3285 if (locked_for_write)
3286 sul.lock();
3287 else
3288 sul.lock_shared();
3289 }
3290 if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3291 sul.unlock();
3292 op_throttle_ops.get(1);
3293 if (locked_for_write)
3294 sul.lock();
3295 else
3296 sul.lock_shared();
3297 }
3298}
3299
3300void Objecter::unregister_op(Op *op)
3301{
3302 OSDSession::unique_lock sl(op->session->lock);
3303 op->session->ops.erase(op->tid);
3304 sl.unlock();
3305 put_session(op->session);
3306 op->session = NULL;
3307
31f18b77 3308 inflight_ops--;
7c673cae
FG
3309}
3310
/* This function DOES put the passed message before returning */
// Dispatch an MOSDOpReply: match it to the pending Op on the sending
// session, handle retry/redirect/-EAGAIN resubmission, demux per-sub-op
// results into the caller's out buffers/rvals/handlers, and finally run
// the op's completion callback.  Locking: takes rwlock shared, then the
// session lock; both are dropped before completion callbacks run.
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
{
  ldout(cct, 10) << "in handle_osd_op_reply" << dendl;

  // get pio
  ceph_tid_t tid = m->get_tid();

  shunique_lock sul(rwlock, ceph::acquire_shared);
  if (!initialized) {
    // objecter already shut down; drop the message
    m->put();
    return;
  }

  ConnectionRef con = m->get_connection();
  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
  if (!s || s->con != con) {
    // no (or stale) session attached to this connection; nothing to match
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    if (s) {
      s->put();
    }
    m->put();
    return;
  }

  OSDSession::unique_lock sl(s->lock);

  map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
  if (iter == s->ops.end()) {
    // reply for an op we no longer track (completed/cancelled) — ignore
    ldout(cct, 7) << "handle_osd_op_reply " << tid
		  << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
						    " onnvram" : " ack"))
		  << " ... stray" << dendl;
    sl.unlock();
    put_session(s);
    m->put();
    return;
  }

  ldout(cct, 7) << "handle_osd_op_reply " << tid
		<< (m->is_ondisk() ? " ondisk" :
		    (m->is_onnvram() ? " onnvram" : " ack"))
		<< " uv " << m->get_user_version()
		<< " in " << m->get_pg()
		<< " attempt " << m->get_retry_attempt()
		<< dendl;
  Op *op = iter->second;
  op->trace.event("osd op reply");

  // debug/testing knob: force a resend of writes after the first reply
  if (retry_writes_after_first_reply && op->attempts == 1 &&
      (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
    ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
    if (op->onfinish) {
      num_in_flight--;
    }
    _session_op_remove(s, op);
    sl.unlock();
    put_session(s);

    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  if (m->get_retry_attempt() >= 0) {
    if (m->get_retry_attempt() != (op->attempts - 1)) {
      // reply from an earlier (superseded) attempt — drop it
      ldout(cct, 7) << " ignoring reply from attempt "
		    << m->get_retry_attempt()
		    << " from " << m->get_source_inst()
		    << "; last attempt " << (op->attempts - 1) << " sent to "
		    << op->session->con->get_peer_addr() << dendl;
      m->put();
      sl.unlock();
      put_session(s);
      return;
    }
  } else {
    // we don't know the request attempt because the server is old, so
    // just accept this one. we may do ACK callbacks we shouldn't
    // have, but that is better than doing callbacks out of order.
  }

  Context *onfinish = 0;

  int rc = m->get_result();

  if (m->is_redirect_reply()) {
    // OSD told us to retarget this op at a different locator; resubmit
    ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
    if (op->onfinish)
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();
    put_session(s);

    // FIXME: two redirects could race and reorder

    op->tid = 0;
    m->get_redirect().combine_with_locator(op->target.target_oloc,
					   op->target.target_oid.name);
    op->target.flags |= CEPH_OSD_FLAG_REDIRECTED;
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  if (rc == -EAGAIN) {
    // transient failure: recompute the target (dropping read-placement
    // hints) and resubmit from scratch
    ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
    if (op->onfinish)
      num_in_flight--;
    _session_op_remove(s, op);
    sl.unlock();
    put_session(s);

    op->tid = 0;
    op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
			  CEPH_OSD_FLAG_LOCALIZE_READS);
    op->target.pgid = pg_t();
    _op_submit(op, sul, NULL);
    m->put();
    return;
  }

  sul.unlock();

  // publish version/epoch/offset results the caller asked for
  if (op->objver)
    *op->objver = m->get_user_version();
  if (op->reply_epoch)
    *op->reply_epoch = m->get_map_epoch();
  if (op->data_offset)
    *op->data_offset = m->get_header().data_off;

  // got data?
  if (op->outbl) {
    if (op->con)
      op->con->revoke_rx_buffer(op->tid);
    m->claim_data(*op->outbl);
    op->outbl = 0;
  }

  // per-op result demuxing
  vector<OSDOp> out_ops;
  m->claim_ops(out_ops);

  if (out_ops.size() != op->ops.size())
    ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
		  << " != request ops " << op->ops
		  << " from " << m->get_source_inst() << dendl;

  // out_bl/out_rval/out_handler are parallel per-sub-op output slots
  vector<bufferlist*>::iterator pb = op->out_bl.begin();
  vector<int*>::iterator pr = op->out_rval.begin();
  vector<Context*>::iterator ph = op->out_handler.begin();
  assert(op->out_bl.size() == op->out_rval.size());
  assert(op->out_bl.size() == op->out_handler.size());
  vector<OSDOp>::iterator p = out_ops.begin();
  for (unsigned i = 0;
       p != out_ops.end() && pb != op->out_bl.end();
       ++i, ++p, ++pb, ++pr, ++ph) {
    ldout(cct, 10) << " op " << i << " rval " << p->rval
		   << " len " << p->outdata.length() << dendl;
    if (*pb)
      **pb = p->outdata;
    // set rval before running handlers so that handlers
    // can change it if e.g. decoding fails
    if (*pr)
      **pr = ceph_to_hostos_errno(p->rval);
    if (*ph) {
      ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
      (*ph)->complete(ceph_to_hostos_errno(p->rval));
      *ph = NULL;
    }
  }

  // NOTE: we assume that since we only request ONDISK ever we will
  // only ever get back one (type of) ack ever.

  if (op->onfinish) {
    num_in_flight--;
    onfinish = op->onfinish;
    op->onfinish = NULL;
  }
  logger->inc(l_osdc_op_reply);

  /* get it before we call _finish_op() */
  auto completion_lock = s->get_lock(op->target.base_oid);

  ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
  _finish_op(op, 0);

  ldout(cct, 5) << num_in_flight << " in flight" << dendl;

  // serialize completions
  if (completion_lock.mutex()) {
    completion_lock.lock();
  }
  sl.unlock();

  // do callbacks
  if (onfinish) {
    onfinish->complete(rc);
  }
  if (completion_lock.mutex()) {
    completion_lock.unlock();
  }

  m->put();
  put_session(s);
}
3518
3519void Objecter::handle_osd_backoff(MOSDBackoff *m)
3520{
3521 ldout(cct, 10) << __func__ << " " << *m << dendl;
3522 shunique_lock sul(rwlock, ceph::acquire_shared);
31f18b77 3523 if (!initialized) {
7c673cae
FG
3524 m->put();
3525 return;
3526 }
3527
3528 ConnectionRef con = m->get_connection();
3529 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
3530 if (!s || s->con != con) {
3531 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3532 if (s)
3533 s->put();
3534 m->put();
3535 return;
3536 }
3537
3538 get_session(s);
3539 s->put(); // from get_priv() above
3540
3541 OSDSession::unique_lock sl(s->lock);
3542
3543 switch (m->op) {
3544 case CEPH_OSD_BACKOFF_OP_BLOCK:
3545 {
3546 // register
3547 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3548 s->backoffs_by_id.insert(make_pair(m->id, &b));
3549 b.pgid = m->pgid;
3550 b.id = m->id;
3551 b.begin = m->begin;
3552 b.end = m->end;
3553
3554 // ack with original backoff's epoch so that the osd can discard this if
3555 // there was a pg split.
3556 Message *r = new MOSDBackoff(m->pgid,
3557 m->map_epoch,
3558 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3559 m->id, m->begin, m->end);
3560 // this priority must match the MOSDOps from _prepare_osd_op
3561 r->set_priority(cct->_conf->osd_client_op_priority);
3562 con->send_message(r);
3563 }
3564 break;
3565
3566 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3567 {
3568 auto p = s->backoffs_by_id.find(m->id);
3569 if (p != s->backoffs_by_id.end()) {
3570 OSDBackoff *b = p->second;
3571 if (b->begin != m->begin &&
3572 b->end != m->end) {
3573 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3574 << " unblock on ["
3575 << m->begin << "," << m->end << ") but backoff is ["
3576 << b->begin << "," << b->end << ")" << dendl;
3577 // hrmpf, unblock it anyway.
3578 }
3579 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3580 << " id " << b->id
3581 << " [" << b->begin << "," << b->end
3582 << ")" << dendl;
3583 auto spgp = s->backoffs.find(b->pgid);
3584 assert(spgp != s->backoffs.end());
3585 spgp->second.erase(b->begin);
3586 if (spgp->second.empty()) {
3587 s->backoffs.erase(spgp);
3588 }
3589 s->backoffs_by_id.erase(p);
3590
3591 // check for any ops to resend
3592 for (auto& q : s->ops) {
3593 if (q.second->target.actual_pgid == m->pgid) {
3594 int r = q.second->target.contained_by(m->begin, m->end);
3595 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3596 << q.second->target.get_hobj() << dendl;
3597 if (r) {
3598 _send_op(q.second);
3599 }
3600 }
3601 }
3602 } else {
3603 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3604 << " unblock on ["
3605 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3606 }
3607 }
3608 break;
3609
3610 default:
3611 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3612 }
3613
3614 sul.unlock();
3615 sl.unlock();
3616
3617 m->put();
3618 put_session(s);
3619}
3620
3621uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3622 uint32_t pos)
3623{
3624 shared_lock rl(rwlock);
3625 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3626 pos, list_context->pool_id, string());
3627 ldout(cct, 10) << __func__ << list_context
3628 << " pos " << pos << " -> " << list_context->pos << dendl;
3629 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3630 list_context->current_pg = actual.ps();
3631 list_context->at_end_of_pool = false;
3632 return pos;
3633}
3634
3635uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3636 const hobject_t& cursor)
3637{
3638 shared_lock rl(rwlock);
3639 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3640 list_context->pos = cursor;
3641 list_context->at_end_of_pool = false;
3642 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3643 list_context->current_pg = actual.ps();
3644 list_context->sort_bitwise = true;
3645 return list_context->current_pg;
3646}
3647
3648void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3649 hobject_t *cursor)
3650{
3651 shared_lock rl(rwlock);
3652 if (list_context->list.empty()) {
3653 *cursor = list_context->pos;
3654 } else {
3655 const librados::ListObjectImpl& entry = list_context->list.front();
3656 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3657 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3658 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3659 }
3660}
3661
3662void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3663{
3664 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3665 << " pool_snap_seq " << list_context->pool_snap_seq
3666 << " max_entries " << list_context->max_entries
3667 << " list_context " << list_context
3668 << " onfinish " << onfinish
3669 << " current_pg " << list_context->current_pg
3670 << " pos " << list_context->pos << dendl;
3671
3672 shared_lock rl(rwlock);
3673 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3674 if (!pool) { // pool is gone
3675 rl.unlock();
3676 put_nlist_context_budget(list_context);
3677 onfinish->complete(-ENOENT);
3678 return;
3679 }
3680 int pg_num = pool->get_pg_num();
3681 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3682
3683 if (list_context->pos.is_min()) {
3684 list_context->starting_pg_num = 0;
3685 list_context->sort_bitwise = sort_bitwise;
3686 list_context->starting_pg_num = pg_num;
3687 }
3688 if (list_context->sort_bitwise != sort_bitwise) {
3689 list_context->pos = hobject_t(
3690 object_t(), string(), CEPH_NOSNAP,
3691 list_context->current_pg, list_context->pool_id, string());
3692 list_context->sort_bitwise = sort_bitwise;
3693 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3694 << list_context->pos << dendl;
3695 }
3696 if (list_context->starting_pg_num != pg_num) {
3697 if (!sort_bitwise) {
3698 // start reading from the beginning; the pgs have changed
3699 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3700 list_context->pos = collection_list_handle_t();
3701 }
3702 list_context->starting_pg_num = pg_num;
3703 }
3704
3705 if (list_context->pos.is_max()) {
3706 ldout(cct, 20) << __func__ << " end of pool, list "
3707 << list_context->list << dendl;
3708 if (list_context->list.empty()) {
3709 list_context->at_end_of_pool = true;
3710 }
3711 // release the listing context's budget once all
3712 // OPs (in the session) are finished
3713 put_nlist_context_budget(list_context);
3714 onfinish->complete(0);
3715 return;
3716 }
3717
3718 ObjectOperation op;
3719 op.pg_nls(list_context->max_entries, list_context->filter,
3720 list_context->pos, osdmap->get_epoch());
3721 list_context->bl.clear();
3722 C_NList *onack = new C_NList(list_context, onfinish, this);
3723 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3724
3725 // note current_pg in case we don't have (or lose) SORTBITWISE
3726 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3727 rl.unlock();
3728
3729 pg_read(list_context->current_pg, oloc, op,
3730 &list_context->bl, 0, onack, &onack->epoch,
3731 &list_context->ctx_budget);
3732}
3733
// Process one pg_nls reply for a pool listing: advance the cursor,
// splice the returned entries into the context, and either hand results
// back to the caller (releasing the listing budget) or continue with the
// next request.  `r` is the osd's return value (1 means end-of-pg on
// newer OSDs); `reply_epoch` is unused here.
void Objecter::_nlist_reply(NListContext *list_context, int r,
			    Context *final_finish, epoch_t reply_epoch)
{
  ldout(cct, 10) << __func__ << " " << list_context << dendl;

  bufferlist::iterator iter = list_context->bl.begin();
  pg_nls_response_t response;
  bufferlist extra_info;
  ::decode(response, iter);
  if (!iter.end()) {
    // trailing payload beyond the response struct, preserved verbatim
    ::decode(extra_info, iter);
  }

  // if the osd returns 1 (newer code), or handle MAX, it means we
  // hit the end of the pg.
  if ((response.handle.is_max() || r == 1) &&
      !list_context->sort_bitwise) {
    // legacy OSD and !sortbitwise, figure out the next PG on our own
    ++list_context->current_pg;
    if (list_context->current_pg == list_context->starting_pg_num) {
      // end of pool
      list_context->pos = hobject_t::get_max();
    } else {
      // next pg
      list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
				    list_context->current_pg,
				    list_context->pool_id, string());
    }
  } else {
    // resume exactly where the OSD told us to
    list_context->pos = response.handle;
  }

  int response_size = response.entries.size();
  ldout(cct, 20) << " response.entries.size " << response_size
		 << ", response.entries " << response.entries
		 << ", handle " << response.handle
		 << ", tentative new pos " << list_context->pos << dendl;
  list_context->extra_info.append(extra_info);
  if (response_size) {
    // splice is O(1) and transfers ownership of the entries
    list_context->list.splice(list_context->list.end(), response.entries);
  }

  if (list_context->list.size() >= list_context->max_entries) {
    ldout(cct, 20) << " hit max, returning results so far, "
		   << list_context->list << dendl;
    // release the listing context's budget once all
    // OPs (in the session) are finished
    put_nlist_context_budget(list_context);
    final_finish->complete(0);
    return;
  }

  // continue!
  list_nobjects(list_context, final_finish);
}
3789
3790void Objecter::put_nlist_context_budget(NListContext *list_context)
3791{
3792 if (list_context->ctx_budget >= 0) {
3793 ldout(cct, 10) << " release listing context's budget " <<
3794 list_context->ctx_budget << dendl;
3795 put_op_budget_bytes(list_context->ctx_budget);
3796 list_context->ctx_budget = -1;
3797 }
3798}
3799
3800// snapshots
3801
3802int Objecter::create_pool_snap(int64_t pool, string& snap_name,
3803 Context *onfinish)
3804{
3805 unique_lock wl(rwlock);
3806 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3807 << snap_name << dendl;
3808
3809 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3810 if (!p)
3811 return -EINVAL;
3812 if (p->snap_exists(snap_name.c_str()))
3813 return -EEXIST;
3814
3815 PoolOp *op = new PoolOp;
3816 if (!op)
3817 return -ENOMEM;
31f18b77 3818 op->tid = ++last_tid;
7c673cae
FG
3819 op->pool = pool;
3820 op->name = snap_name;
3821 op->onfinish = onfinish;
3822 op->pool_op = POOL_OP_CREATE_SNAP;
3823 pool_ops[op->tid] = op;
3824
3825 pool_op_submit(op);
3826
3827 return 0;
3828}
3829
// Adapter Context for allocate_selfmanaged_snap(): on success, decodes
// the monitor-assigned snap id (delivered in bl) into *psnapid, then
// chains to the caller's completion in every case.
struct C_SelfmanagedSnap : public Context {
  bufferlist bl;      // reply payload, filled in by the pool op
  snapid_t *psnapid;  // output: where the allocated snap id is stored
  Context *fin;       // caller's completion; always invoked with r
  C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
  void finish(int r) override {
    if (r == 0) {
      // only decode on success; bl is unspecified on error
      bufferlist::iterator p = bl.begin();
      ::decode(*psnapid, p);
    }
    fin->complete(r);
  }
};
3843
3844int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
3845 Context *onfinish)
3846{
3847 unique_lock wl(rwlock);
3848 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
3849 PoolOp *op = new PoolOp;
3850 if (!op) return -ENOMEM;
31f18b77 3851 op->tid = ++last_tid;
7c673cae
FG
3852 op->pool = pool;
3853 C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
3854 op->onfinish = fin;
3855 op->blp = &fin->bl;
3856 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3857 pool_ops[op->tid] = op;
3858
3859 pool_op_submit(op);
3860 return 0;
3861}
3862
3863int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
3864 Context *onfinish)
3865{
3866 unique_lock wl(rwlock);
3867 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3868 << snap_name << dendl;
3869
3870 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3871 if (!p)
3872 return -EINVAL;
3873 if (!p->snap_exists(snap_name.c_str()))
3874 return -ENOENT;
3875
3876 PoolOp *op = new PoolOp;
3877 if (!op)
3878 return -ENOMEM;
31f18b77 3879 op->tid = ++last_tid;
7c673cae
FG
3880 op->pool = pool;
3881 op->name = snap_name;
3882 op->onfinish = onfinish;
3883 op->pool_op = POOL_OP_DELETE_SNAP;
3884 pool_ops[op->tid] = op;
3885
3886 pool_op_submit(op);
3887
3888 return 0;
3889}
3890
3891int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3892 Context *onfinish)
3893{
3894 unique_lock wl(rwlock);
3895 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3896 << snap << dendl;
3897 PoolOp *op = new PoolOp;
3898 if (!op) return -ENOMEM;
31f18b77 3899 op->tid = ++last_tid;
7c673cae
FG
3900 op->pool = pool;
3901 op->onfinish = onfinish;
3902 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3903 op->snapid = snap;
3904 pool_ops[op->tid] = op;
3905
3906 pool_op_submit(op);
3907
3908 return 0;
3909}
3910
3911int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
3912 int crush_rule)
3913{
3914 unique_lock wl(rwlock);
3915 ldout(cct, 10) << "create_pool name=" << name << dendl;
3916
3917 if (osdmap->lookup_pg_pool_name(name) >= 0)
3918 return -EEXIST;
3919
3920 PoolOp *op = new PoolOp;
3921 if (!op)
3922 return -ENOMEM;
31f18b77 3923 op->tid = ++last_tid;
7c673cae
FG
3924 op->pool = 0;
3925 op->name = name;
3926 op->onfinish = onfinish;
3927 op->pool_op = POOL_OP_CREATE;
3928 pool_ops[op->tid] = op;
3929 op->auid = auid;
3930 op->crush_rule = crush_rule;
3931
3932 pool_op_submit(op);
3933
3934 return 0;
3935}
3936
3937int Objecter::delete_pool(int64_t pool, Context *onfinish)
3938{
3939 unique_lock wl(rwlock);
3940 ldout(cct, 10) << "delete_pool " << pool << dendl;
3941
3942 if (!osdmap->have_pg_pool(pool))
3943 return -ENOENT;
3944
3945 _do_delete_pool(pool, onfinish);
3946 return 0;
3947}
3948
3949int Objecter::delete_pool(const string &pool_name, Context *onfinish)
3950{
3951 unique_lock wl(rwlock);
3952 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
3953
3954 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
3955 if (pool < 0)
3956 return pool;
3957
3958 _do_delete_pool(pool, onfinish);
3959 return 0;
3960}
3961
3962void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
3963{
3964 PoolOp *op = new PoolOp;
31f18b77 3965 op->tid = ++last_tid;
7c673cae
FG
3966 op->pool = pool;
3967 op->name = "delete";
3968 op->onfinish = onfinish;
3969 op->pool_op = POOL_OP_DELETE;
3970 pool_ops[op->tid] = op;
3971 pool_op_submit(op);
3972}
3973
3974/**
3975 * change the auid owner of a pool by contacting the monitor.
3976 * This requires the current connection to have write permissions
3977 * on both the pool's current auid and the new (parameter) auid.
3978 * Uses the standard Context callback when done.
3979 */
3980int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
3981{
3982 unique_lock wl(rwlock);
3983 ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
3984 PoolOp *op = new PoolOp;
3985 if (!op) return -ENOMEM;
31f18b77 3986 op->tid = ++last_tid;
7c673cae
FG
3987 op->pool = pool;
3988 op->name = "change_pool_auid";
3989 op->onfinish = onfinish;
3990 op->pool_op = POOL_OP_AUID_CHANGE;
3991 op->auid = auid;
3992 pool_ops[op->tid] = op;
3993
3994 logger->set(l_osdc_poolop_active, pool_ops.size());
3995
3996 pool_op_submit(op);
3997 return 0;
3998}
3999
4000void Objecter::pool_op_submit(PoolOp *op)
4001{
4002 // rwlock is locked
4003 if (mon_timeout > timespan(0)) {
4004 op->ontimeout = timer.add_event(mon_timeout,
4005 [this, op]() {
4006 pool_op_cancel(op->tid, -ETIMEDOUT); });
4007 }
4008 _pool_op_submit(op);
4009}
4010
4011void Objecter::_pool_op_submit(PoolOp *op)
4012{
4013 // rwlock is locked unique
4014
4015 ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
4016 MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4017 op->name, op->pool_op,
4018 op->auid, last_seen_osdmap_version);
4019 if (op->snapid) m->snapid = op->snapid;
4020 if (op->crush_rule) m->crush_rule = op->crush_rule;
4021 monc->send_mon_message(m);
4022 op->last_submit = ceph::mono_clock::now();
4023
4024 logger->inc(l_osdc_poolop_send);
4025}
4026
/**
 * Handle a reply to a PoolOp message. Check that we sent the message
 * and give the caller responsibility for the returned bufferlist.
 * Then either call the finisher or stash the PoolOp, depending on if we
 * have a new enough map.
 * Lastly, clean up the message and PoolOp.
 */
// Locking: starts with rwlock shared; promotes to unique (by drop +
// retake) when it must wait for a newer map or finish the op, rechecking
// the op's existence after every promotion.  Always consumes the message.
void Objecter::handle_pool_op_reply(MPoolOpReply *m)
{
  FUNCTRACE();
  shunique_lock sul(rwlock, acquire_shared);
  if (!initialized) {
    sul.unlock();
    m->put();
    return;
  }

  ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
  ceph_tid_t tid = m->get_tid();
  map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
  if (iter != pool_ops.end()) {
    PoolOp *op = iter->second;
    ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
		   << ceph_pool_op_name(op->pool_op) << dendl;
    // hand the reply payload to the caller's bufferlist, if one was given
    if (op->blp)
      op->blp->claim(m->response_data);
    if (m->version > last_seen_osdmap_version)
      last_seen_osdmap_version = m->version;
    if (osdmap->get_epoch() < m->epoch) {
      // need the lock unique: drop shared and retake for write
      sul.unlock();
      sul.lock();
      // recheck op existence since we have let go of rwlock
      // (for promotion) above.
      iter = pool_ops.find(tid);
      if (iter == pool_ops.end())
	goto done; // op is gone.
      if (osdmap->get_epoch() < m->epoch) {
	ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
		       << " before calling back" << dendl;
	_wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
      } else {
	// map epoch changed, probably because a MOSDMap message
	// sneaked in. Do caller-specified callback now or else
	// we lose it forever.
	assert(op->onfinish);
	op->onfinish->complete(m->replyCode);
      }
    } else {
      assert(op->onfinish);
      op->onfinish->complete(m->replyCode);
    }
    op->onfinish = NULL;
    if (!sul.owns_lock()) {
      // promote to unique for _finish_pool_op below
      sul.unlock();
      sul.lock();
    }
    // recheck once more: the op may have been finished while unlocked
    iter = pool_ops.find(tid);
    if (iter != pool_ops.end()) {
      _finish_pool_op(op, 0);
    }
  } else {
    ldout(cct, 10) << "unknown request " << tid << dendl;
  }

done:
  // Not strictly necessary, since we'll release it on return.
  sul.unlock();

  ldout(cct, 10) << "done" << dendl;
  m->put();
}
4098
4099int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4100{
31f18b77 4101 assert(initialized);
7c673cae
FG
4102
4103 unique_lock wl(rwlock);
4104
4105 map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
4106 if (it == pool_ops.end()) {
4107 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4108 return -ENOENT;
4109 }
4110
4111 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4112
4113 PoolOp *op = it->second;
4114 if (op->onfinish)
4115 op->onfinish->complete(r);
4116
4117 _finish_pool_op(op, r);
4118 return 0;
4119}
4120
4121void Objecter::_finish_pool_op(PoolOp *op, int r)
4122{
4123 // rwlock is locked unique
4124 pool_ops.erase(op->tid);
4125 logger->set(l_osdc_poolop_active, pool_ops.size());
4126
4127 if (op->ontimeout && r != -ETIMEDOUT) {
4128 timer.cancel_event(op->ontimeout);
4129 }
4130
4131 delete op;
4132}
4133
4134// pool stats
4135
// Fetch per-pool statistics from the monitor for the named pools.
// Results are written into *result and onfinish completes when the
// reply arrives (or with -ETIMEDOUT if mon_timeout expires first).
void Objecter::get_pool_stats(list<string>& pools,
			      map<string,pool_stat_t> *result,
			      Context *onfinish)
{
  ldout(cct, 10) << "get_pool_stats " << pools << dendl;

  PoolStatOp *op = new PoolStatOp;
  // NOTE(review): tid allocation and timer registration happen before
  // rwlock is taken below (unlike get_fs_stats, which locks first).
  // This presumably relies on last_tid being safe to increment
  // concurrently — confirm.
  op->tid = ++last_tid;
  op->pools = pools;
  op->pool_stats = result;
  op->onfinish = onfinish;
  if (mon_timeout > timespan(0)) {
    op->ontimeout = timer.add_event(mon_timeout,
				    [this, op]() {
				      pool_stat_op_cancel(op->tid,
							  -ETIMEDOUT); });
  } else {
    op->ontimeout = 0;
  }

  unique_lock wl(rwlock);

  poolstat_ops[op->tid] = op;

  logger->set(l_osdc_poolstat_active, poolstat_ops.size());

  _poolstat_submit(op);
}
4164
4165void Objecter::_poolstat_submit(PoolStatOp *op)
4166{
4167 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4168 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4169 op->pools,
4170 last_seen_pgmap_version));
4171 op->last_submit = ceph::mono_clock::now();
4172
4173 logger->inc(l_osdc_poolstat_send);
4174}
4175
4176void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4177{
4178 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4179 ceph_tid_t tid = m->get_tid();
4180
4181 unique_lock wl(rwlock);
31f18b77 4182 if (!initialized) {
7c673cae
FG
4183 m->put();
4184 return;
4185 }
4186
4187 map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
4188 if (iter != poolstat_ops.end()) {
4189 PoolStatOp *op = poolstat_ops[tid];
4190 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4191 *op->pool_stats = m->pool_stats;
4192 if (m->version > last_seen_pgmap_version) {
4193 last_seen_pgmap_version = m->version;
4194 }
4195 op->onfinish->complete(0);
4196 _finish_pool_stat_op(op, 0);
4197 } else {
4198 ldout(cct, 10) << "unknown request " << tid << dendl;
4199 }
4200 ldout(cct, 10) << "done" << dendl;
4201 m->put();
4202}
4203
4204int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4205{
31f18b77 4206 assert(initialized);
7c673cae
FG
4207
4208 unique_lock wl(rwlock);
4209
4210 map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
4211 if (it == poolstat_ops.end()) {
4212 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4213 return -ENOENT;
4214 }
4215
4216 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4217
4218 PoolStatOp *op = it->second;
4219 if (op->onfinish)
4220 op->onfinish->complete(r);
4221 _finish_pool_stat_op(op, r);
4222 return 0;
4223}
4224
4225void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4226{
4227 // rwlock is locked unique
4228
4229 poolstat_ops.erase(op->tid);
4230 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4231
4232 if (op->ontimeout && r != -ETIMEDOUT)
4233 timer.cancel_event(op->ontimeout);
4234
4235 delete op;
4236}
4237
d2e6a577
FG
4238void Objecter::get_fs_stats(ceph_statfs& result,
4239 boost::optional<int64_t> data_pool,
4240 Context *onfinish)
7c673cae
FG
4241{
4242 ldout(cct, 10) << "get_fs_stats" << dendl;
4243 unique_lock l(rwlock);
4244
4245 StatfsOp *op = new StatfsOp;
31f18b77 4246 op->tid = ++last_tid;
7c673cae 4247 op->stats = &result;
d2e6a577 4248 op->data_pool = data_pool;
7c673cae
FG
4249 op->onfinish = onfinish;
4250 if (mon_timeout > timespan(0)) {
4251 op->ontimeout = timer.add_event(mon_timeout,
4252 [this, op]() {
4253 statfs_op_cancel(op->tid,
4254 -ETIMEDOUT); });
4255 } else {
4256 op->ontimeout = 0;
4257 }
4258 statfs_ops[op->tid] = op;
4259
4260 logger->set(l_osdc_statfs_active, statfs_ops.size());
4261
4262 _fs_stats_submit(op);
4263}
4264
4265void Objecter::_fs_stats_submit(StatfsOp *op)
4266{
4267 // rwlock is locked unique
4268
4269 ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
4270 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
d2e6a577 4271 op->data_pool,
7c673cae
FG
4272 last_seen_pgmap_version));
4273 op->last_submit = ceph::mono_clock::now();
4274
4275 logger->inc(l_osdc_statfs_send);
4276}
4277
4278void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4279{
4280 unique_lock wl(rwlock);
31f18b77 4281 if (!initialized) {
7c673cae
FG
4282 m->put();
4283 return;
4284 }
4285
4286 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4287 ceph_tid_t tid = m->get_tid();
4288
4289 if (statfs_ops.count(tid)) {
4290 StatfsOp *op = statfs_ops[tid];
4291 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4292 *(op->stats) = m->h.st;
4293 if (m->h.version > last_seen_pgmap_version)
4294 last_seen_pgmap_version = m->h.version;
4295 op->onfinish->complete(0);
4296 _finish_statfs_op(op, 0);
4297 } else {
4298 ldout(cct, 10) << "unknown request " << tid << dendl;
4299 }
4300 m->put();
4301 ldout(cct, 10) << "done" << dendl;
4302}
4303
4304int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4305{
31f18b77 4306 assert(initialized);
7c673cae
FG
4307
4308 unique_lock wl(rwlock);
4309
4310 map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
4311 if (it == statfs_ops.end()) {
4312 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4313 return -ENOENT;
4314 }
4315
4316 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4317
4318 StatfsOp *op = it->second;
4319 if (op->onfinish)
4320 op->onfinish->complete(r);
4321 _finish_statfs_op(op, r);
4322 return 0;
4323}
4324
4325void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4326{
4327 // rwlock is locked unique
4328
4329 statfs_ops.erase(op->tid);
4330 logger->set(l_osdc_statfs_active, statfs_ops.size());
4331
4332 if (op->ontimeout && r != -ETIMEDOUT)
4333 timer.cancel_event(op->ontimeout);
4334
4335 delete op;
4336}
4337
4338// scatter/gather
4339
4340void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
4341 vector<bufferlist>& resultbl,
4342 bufferlist *bl, Context *onfinish)
4343{
4344 // all done
4345 ldout(cct, 15) << "_sg_read_finish" << dendl;
4346
4347 if (extents.size() > 1) {
4348 Striper::StripedReadResult r;
4349 vector<bufferlist>::iterator bit = resultbl.begin();
4350 for (vector<ObjectExtent>::iterator eit = extents.begin();
4351 eit != extents.end();
4352 ++eit, ++bit) {
4353 r.add_partial_result(cct, *bit, eit->buffer_extents);
4354 }
4355 bl->clear();
4356 r.assemble_result(cct, *bl, false);
4357 } else {
4358 ldout(cct, 15) << " only one frag" << dendl;
4359 bl->claim(resultbl[0]);
4360 }
4361
4362 // done
4363 uint64_t bytes_read = bl->length();
4364 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4365
4366 if (onfinish) {
4367 onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result());
4368 }
4369}
4370
4371
4372void Objecter::ms_handle_connect(Connection *con)
4373{
4374 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
31f18b77 4375 if (!initialized)
7c673cae
FG
4376 return;
4377
4378 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4379 resend_mon_ops();
4380}
4381
bool Objecter::ms_handle_reset(Connection *con)
{
  // Messenger callback: the peer reset the connection.  For an OSD
  // session we reopen it and queue all its requests for resend.
  // Returns true if we consumed the event (i.e. it was an OSD con).
  if (!initialized)
    return false;
  if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
    // get_priv() takes a ref on the session; released via put() below.
    OSDSession *session = static_cast<OSDSession*>(con->get_priv());
    if (session) {
      ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
		    << " osd." << session->osd << dendl;
      unique_lock wl(rwlock);
      // re-check under the lock: we may have raced with shutdown
      if (!initialized) {
	wl.unlock();
	return false;
      }
      map<uint64_t, LingerOp *> lresend;
      OSDSession::unique_lock sl(session->lock);
      _reopen_session(session);
      // collect normal ops for resend and linger ops into lresend
      _kick_requests(session, lresend);
      sl.unlock();
      // linger resend needs rwlock but not the session lock
      _linger_ops_resend(lresend, wl);
      wl.unlock();
      maybe_request_map();
      session->put();  // drop the get_priv() ref
    }
    return true;
  }
  return false;
}
4410
4411void Objecter::ms_handle_remote_reset(Connection *con)
4412{
4413 /*
4414 * treat these the same.
4415 */
4416 ms_handle_reset(con);
4417}
4418
4419bool Objecter::ms_handle_refused(Connection *con)
4420{
4421 // just log for now
4422 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4423 int osd = osdmap->identify_osd(con->get_peer_addr());
4424 if (osd >= 0) {
4425 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4426 }
4427 }
4428 return false;
4429}
4430
4431bool Objecter::ms_get_authorizer(int dest_type,
4432 AuthAuthorizer **authorizer,
4433 bool force_new)
4434{
31f18b77 4435 if (!initialized)
7c673cae
FG
4436 return false;
4437 if (dest_type == CEPH_ENTITY_TYPE_MON)
4438 return true;
4439 *authorizer = monc->build_authorizer(dest_type);
4440 return *authorizer != NULL;
4441}
4442
4443void Objecter::op_target_t::dump(Formatter *f) const
4444{
4445 f->dump_stream("pg") << pgid;
4446 f->dump_int("osd", osd);
4447 f->dump_stream("object_id") << base_oid;
4448 f->dump_stream("object_locator") << base_oloc;
4449 f->dump_stream("target_object_id") << target_oid;
4450 f->dump_stream("target_object_locator") << target_oloc;
4451 f->dump_int("paused", (int)paused);
4452 f->dump_int("used_replica", (int)used_replica);
4453 f->dump_int("precalc_pgid", (int)precalc_pgid);
4454}
4455
4456void Objecter::_dump_active(OSDSession *s)
4457{
4458 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
4459 p != s->ops.end();
4460 ++p) {
4461 Op *op = p->second;
4462 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4463 << "\tosd." << (op->session ? op->session->osd : -1)
4464 << "\t" << op->target.base_oid
4465 << "\t" << op->ops << dendl;
4466 }
4467}
4468
4469void Objecter::_dump_active()
4470{
31f18b77 4471 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
7c673cae
FG
4472 << dendl;
4473 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
4474 siter != osd_sessions.end(); ++siter) {
4475 OSDSession *s = siter->second;
4476 OSDSession::shared_lock sl(s->lock);
4477 _dump_active(s);
4478 sl.unlock();
4479 }
4480 _dump_active(homeless_session);
4481}
4482
4483void Objecter::dump_active()
4484{
4485 shared_lock rl(rwlock);
4486 _dump_active();
4487 rl.unlock();
4488}
4489
4490void Objecter::dump_requests(Formatter *fmt)
4491{
4492 // Read-lock on Objecter held here
4493 fmt->open_object_section("requests");
4494 dump_ops(fmt);
4495 dump_linger_ops(fmt);
4496 dump_pool_ops(fmt);
4497 dump_pool_stat_ops(fmt);
4498 dump_statfs_ops(fmt);
4499 dump_command_ops(fmt);
4500 fmt->close_section(); // requests object
4501}
4502
4503void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4504{
4505 for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
4506 p != s->ops.end();
4507 ++p) {
4508 Op *op = p->second;
4509 fmt->open_object_section("op");
4510 fmt->dump_unsigned("tid", op->tid);
4511 op->target.dump(fmt);
4512 fmt->dump_stream("last_sent") << op->stamp;
4513 fmt->dump_int("attempts", op->attempts);
4514 fmt->dump_stream("snapid") << op->snapid;
4515 fmt->dump_stream("snap_context") << op->snapc;
4516 fmt->dump_stream("mtime") << op->mtime;
4517
4518 fmt->open_array_section("osd_ops");
4519 for (vector<OSDOp>::const_iterator it = op->ops.begin();
4520 it != op->ops.end();
4521 ++it) {
4522 fmt->dump_stream("osd_op") << *it;
4523 }
4524 fmt->close_section(); // osd_ops array
4525
4526 fmt->close_section(); // op object
4527 }
4528}
4529
4530void Objecter::dump_ops(Formatter *fmt)
4531{
4532 // Read-lock on Objecter held
4533 fmt->open_array_section("ops");
4534 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4535 siter != osd_sessions.end(); ++siter) {
4536 OSDSession *s = siter->second;
4537 OSDSession::shared_lock sl(s->lock);
4538 _dump_ops(s, fmt);
4539 sl.unlock();
4540 }
4541 _dump_ops(homeless_session, fmt);
4542 fmt->close_section(); // ops array
4543}
4544
4545void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4546{
4547 for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
4548 p != s->linger_ops.end();
4549 ++p) {
4550 LingerOp *op = p->second;
4551 fmt->open_object_section("linger_op");
4552 fmt->dump_unsigned("linger_id", op->linger_id);
4553 op->target.dump(fmt);
4554 fmt->dump_stream("snapid") << op->snap;
4555 fmt->dump_stream("registered") << op->registered;
4556 fmt->close_section(); // linger_op object
4557 }
4558}
4559
4560void Objecter::dump_linger_ops(Formatter *fmt)
4561{
4562 // We have a read-lock on the objecter
4563 fmt->open_array_section("linger_ops");
4564 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4565 siter != osd_sessions.end(); ++siter) {
4566 OSDSession *s = siter->second;
4567 OSDSession::shared_lock sl(s->lock);
4568 _dump_linger_ops(s, fmt);
4569 sl.unlock();
4570 }
4571 _dump_linger_ops(homeless_session, fmt);
4572 fmt->close_section(); // linger_ops array
4573}
4574
4575void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4576{
4577 for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
4578 p != s->command_ops.end();
4579 ++p) {
4580 CommandOp *op = p->second;
4581 fmt->open_object_section("command_op");
4582 fmt->dump_unsigned("command_id", op->tid);
4583 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4584 fmt->open_array_section("command");
4585 for (vector<string>::const_iterator q = op->cmd.begin();
4586 q != op->cmd.end(); ++q)
4587 fmt->dump_string("word", *q);
4588 fmt->close_section();
4589 if (op->target_osd >= 0)
4590 fmt->dump_int("target_osd", op->target_osd);
4591 else
4592 fmt->dump_stream("target_pg") << op->target_pg;
4593 fmt->close_section(); // command_op object
4594 }
4595}
4596
4597void Objecter::dump_command_ops(Formatter *fmt)
4598{
4599 // We have a read-lock on the Objecter here
4600 fmt->open_array_section("command_ops");
4601 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4602 siter != osd_sessions.end(); ++siter) {
4603 OSDSession *s = siter->second;
4604 OSDSession::shared_lock sl(s->lock);
4605 _dump_command_ops(s, fmt);
4606 sl.unlock();
4607 }
4608 _dump_command_ops(homeless_session, fmt);
4609 fmt->close_section(); // command_ops array
4610}
4611
4612void Objecter::dump_pool_ops(Formatter *fmt) const
4613{
4614 fmt->open_array_section("pool_ops");
4615 for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
4616 p != pool_ops.end();
4617 ++p) {
4618 PoolOp *op = p->second;
4619 fmt->open_object_section("pool_op");
4620 fmt->dump_unsigned("tid", op->tid);
4621 fmt->dump_int("pool", op->pool);
4622 fmt->dump_string("name", op->name);
4623 fmt->dump_int("operation_type", op->pool_op);
4624 fmt->dump_unsigned("auid", op->auid);
4625 fmt->dump_unsigned("crush_rule", op->crush_rule);
4626 fmt->dump_stream("snapid") << op->snapid;
4627 fmt->dump_stream("last_sent") << op->last_submit;
4628 fmt->close_section(); // pool_op object
4629 }
4630 fmt->close_section(); // pool_ops array
4631}
4632
4633void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4634{
4635 fmt->open_array_section("pool_stat_ops");
4636 for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
4637 p != poolstat_ops.end();
4638 ++p) {
4639 PoolStatOp *op = p->second;
4640 fmt->open_object_section("pool_stat_op");
4641 fmt->dump_unsigned("tid", op->tid);
4642 fmt->dump_stream("last_sent") << op->last_submit;
4643
4644 fmt->open_array_section("pools");
4645 for (list<string>::const_iterator it = op->pools.begin();
4646 it != op->pools.end();
4647 ++it) {
4648 fmt->dump_string("pool", *it);
4649 }
4650 fmt->close_section(); // pools array
4651
4652 fmt->close_section(); // pool_stat_op object
4653 }
4654 fmt->close_section(); // pool_stat_ops array
4655}
4656
4657void Objecter::dump_statfs_ops(Formatter *fmt) const
4658{
4659 fmt->open_array_section("statfs_ops");
4660 for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
4661 p != statfs_ops.end();
4662 ++p) {
4663 StatfsOp *op = p->second;
4664 fmt->open_object_section("statfs_op");
4665 fmt->dump_unsigned("tid", op->tid);
4666 fmt->dump_stream("last_sent") << op->last_submit;
4667 fmt->close_section(); // statfs_op object
4668 }
4669 fmt->close_section(); // statfs_ops array
4670}
4671
4672Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4673 m_objecter(objecter)
4674{
4675}
4676
4677bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap,
4678 std::string format, bufferlist& out)
4679{
4680 Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
4681 shared_lock rl(m_objecter->rwlock);
4682 m_objecter->dump_requests(f);
4683 f->flush(out);
4684 delete f;
4685 return true;
4686}
4687
4688void Objecter::blacklist_self(bool set)
4689{
4690 ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
4691
4692 vector<string> cmd;
4693 cmd.push_back("{\"prefix\":\"osd blacklist\", ");
4694 if (set)
4695 cmd.push_back("\"blacklistop\":\"add\",");
4696 else
4697 cmd.push_back("\"blacklistop\":\"rm\",");
4698 stringstream ss;
4699 ss << messenger->get_myaddr();
4700 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4701
4702 MMonCommand *m = new MMonCommand(monc->get_fsid());
4703 m->cmd = cmd;
4704
4705 monc->send_mon_message(m);
4706}
4707
4708// commands
4709
void Objecter::handle_command_reply(MCommandReply *m)
{
  // Route an OSD command reply back to its CommandOp.  Always consumes
  // (puts) the message; also drops the session ref taken via get_priv().
  unique_lock wl(rwlock);
  if (!initialized) {
    m->put();
    return;
  }

  // get_priv() takes a ref on the session; must be put() on every path.
  ConnectionRef con = m->get_connection();
  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
  if (!s || s->con != con) {
    // stale or unknown connection; ignore
    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
    m->put();
    if (s)
      s->put();
    return;
  }

  OSDSession::shared_lock sl(s->lock);
  map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
  if (p == s->command_ops.end()) {
    // op already completed/cancelled
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
		   << " not found" << dendl;
    m->put();
    sl.unlock();
    if (s)
      s->put();
    return;
  }

  CommandOp *c = p->second;
  if (!c->session ||
      m->get_connection() != c->session->con) {
    // reply arrived on a different connection than the op was sent on
    // (e.g. after a session reopen); drop it and wait for the resend
    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
		   << " got reply from wrong connection "
		   << m->get_connection() << " " << m->get_source_inst()
		   << dendl;
    m->put();
    sl.unlock();
    if (s)
      s->put();
    return;
  }
  if (c->poutbl) {
    // hand the reply payload to the caller's output bufferlist
    c->poutbl->claim(m->get_data());
  }

  // _finish_command needs rwlock unique (held) but not the session lock
  sl.unlock();


  _finish_command(c, m->r, m->rs);
  m->put();
  if (s)
    s->put();
}
4765
void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
{
  // Assign a tid to this OSD command, attach it to a session, and send
  // it (or request a map if no target is currently reachable).
  // The assigned tid is returned through *ptid.
  shunique_lock sul(rwlock, ceph::acquire_unique);

  ceph_tid_t tid = ++last_tid;
  ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
  c->tid = tid;

  // park the op on the homeless session until a target is chosen
  {
    OSDSession::unique_lock hs_wl(homeless_session->lock);
    _session_command_op_assign(homeless_session, c);
  }

  _calc_command_target(c, sul);
  _assign_command_session(c, sul);
  if (osd_timeout > timespan(0)) {
    // arm a cancellation timer; captures c and tid by value.
    // NOTE(review): assumes the timer is cancelled in _finish_command
    // before c is freed, so the captured pointer cannot dangle -- confirm.
    c->ontimeout = timer.add_event(osd_timeout,
				   [this, c, tid]() {
				     command_op_cancel(c->session, tid,
						       -ETIMEDOUT); });
  }

  if (!c->session->is_homeless()) {
    _send_command(c);
  } else {
    // no usable target yet; wait for a newer osdmap
    _maybe_request_map();
  }
  if (c->map_check_error)
    _send_command_map_check(c);
  *ptid = tid;

  logger->inc(l_osdc_command_active);
}
4799
int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
{
  // Work out which OSD this command should go to, either the explicit
  // target_osd or via normal pg-based targeting.  Records any mapping
  // problem in c->map_check_error{,_str} and returns a RECALC_OP_TARGET_*
  // code describing what (if anything) changed.
  assert(sul.owns_lock() && sul.mutex() == &rwlock);

  c->map_check_error = 0;

  // ignore overlays, just like we do with pg ops
  c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;

  if (c->target_osd >= 0) {
    // explicit OSD target: it must exist and be up
    if (!osdmap->exists(c->target_osd)) {
      c->map_check_error = -ENOENT;
      c->map_check_error_str = "osd dne";
      c->target.osd = -1;
      return RECALC_OP_TARGET_OSD_DNE;
    }
    if (osdmap->is_down(c->target_osd)) {
      c->map_check_error = -ENXIO;
      c->map_check_error_str = "osd down";
      c->target.osd = -1;
      return RECALC_OP_TARGET_OSD_DOWN;
    }
    c->target.osd = c->target_osd;
  } else {
    // pg-targeted command: use the generic target calculation
    int ret = _calc_target(&(c->target), nullptr, true);
    if (ret == RECALC_OP_TARGET_POOL_DNE) {
      c->map_check_error = -ENOENT;
      c->map_check_error_str = "pool dne";
      c->target.osd = -1;
      return ret;
    } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
      c->map_check_error = -ENXIO;
      c->map_check_error_str = "osd down";
      c->target.osd = -1;
      return ret;
    }
  }

  // check whether the computed target maps to a different session
  OSDSession *s;
  int r = _get_session(c->target.osd, &s, sul);
  assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */

  if (c->session != s) {
    put_session(s);
    return RECALC_OP_TARGET_NEED_RESEND;
  }

  put_session(s);

  ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, "
		 << c->session << dendl;

  return RECALC_OP_TARGET_NO_ACTION;
}
4854
void Objecter::_assign_command_session(CommandOp *c,
				       shunique_lock& sul)
{
  // Move the command onto the session for its computed target OSD,
  // detaching it from its previous session (e.g. the homeless one) first.
  assert(sul.owns_lock() && sul.mutex() == &rwlock);

  // _get_session takes a ref; dropped via put_session below
  OSDSession *s;
  int r = _get_session(c->target.osd, &s, sul);
  assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */

  if (c->session != s) {
    if (c->session) {
      // detach from the old session under its own lock
      OSDSession *cs = c->session;
      OSDSession::unique_lock csl(cs->lock);
      _session_command_op_remove(c->session, c);
      csl.unlock();
    }
    // attach to the new session (takes its own ref on c)
    OSDSession::unique_lock sl(s->lock);
    _session_command_op_assign(s, c);
  }

  put_session(s);
}
4877
4878void Objecter::_send_command(CommandOp *c)
4879{
4880 ldout(cct, 10) << "_send_command " << c->tid << dendl;
4881 assert(c->session);
4882 assert(c->session->con);
4883 MCommand *m = new MCommand(monc->monmap.fsid);
4884 m->cmd = c->cmd;
4885 m->set_data(c->inbl);
4886 m->set_tid(c->tid);
4887 c->session->con->send_message(m);
4888 logger->inc(l_osdc_command_send);
4889}
4890
int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
{
  // Cancel an in-flight OSD command on session s, completing it with r.
  // Returns -ENOENT if the tid is not (or no longer) outstanding.
  assert(initialized);

  unique_lock wl(rwlock);

  // NOTE(review): s->command_ops is read here without holding s->lock,
  // unlike handle_command_reply/_dump_command_ops.  Presumably safe
  // because all mutators also hold rwlock unique -- confirm.
  map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
  if (it == s->command_ops.end()) {
    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
    return -ENOENT;
  }

  ldout(cct, 10) << __func__ << " tid " << tid << dendl;

  CommandOp *op = it->second;
  // drop any pending map-epoch check for this op, then complete it
  _command_cancel_map_check(op);
  _finish_command(op, r, "");
  return 0;
}
4910
void Objecter::_finish_command(CommandOp *c, int r, string rs)
{
  // rwlock is locked unique
  //
  // Complete a command op: deliver status string and result code,
  // disarm its timer, detach it from its session, and drop our ref.

  ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
		 << rs << dendl;
  if (c->prs)
    *c->prs = rs;
  if (c->onfinish)
    c->onfinish->complete(r);

  // cancel the timeout unless it is what brought us here
  if (c->ontimeout && r != -ETIMEDOUT)
    timer.cancel_event(c->ontimeout);

  OSDSession *s = c->session;
  OSDSession::unique_lock sl(s->lock);
  _session_command_op_remove(c->session, c);
  sl.unlock();

  c->put();  // drop the ref the session table held

  logger->dec(l_osdc_command_active);
}
4934
Objecter::OSDSession::~OSDSession()
{
  // A session must be empty by the time it is destroyed.
  // Caller is responsible for re-assigning or
  // destroying any ops that were assigned to us
  assert(ops.empty());
  assert(linger_ops.empty());
  assert(command_ops.empty());
}
4943
Objecter::~Objecter()
{
  // By destruction time shutdown() must already have drained everything;
  // these asserts document that contract.
  delete osdmap;

  // only our own ref on the homeless session may remain
  assert(homeless_session->get_nref() == 1);
  assert(num_homeless_ops == 0);
  homeless_session->put();

  assert(osd_sessions.empty());
  assert(poolstat_ops.empty());
  assert(statfs_ops.empty());
  assert(pool_ops.empty());
  assert(waiting_for_map.empty());
  assert(linger_ops.empty());
  assert(check_latest_map_lingers.empty());
  assert(check_latest_map_ops.empty());
  assert(check_latest_map_commands.empty());

  // admin-socket hook and perf counters must be unregistered first
  assert(!m_request_state_hook);
  assert(!logger);
}
4965
4966/**
4967 * Wait until this OSD map epoch is received before
4968 * sending any more operations to OSDs. Use this
4969 * when it is known that the client can't trust
4970 * anything from before this epoch (e.g. due to
4971 * client blacklist at this epoch).
4972 */
4973void Objecter::set_epoch_barrier(epoch_t epoch)
4974{
4975 unique_lock wl(rwlock);
4976
4977 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
4978 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
4979 << dendl;
4980 if (epoch > epoch_barrier) {
4981 epoch_barrier = epoch;
4982 _maybe_request_map();
4983 }
4984}
4985
4986
4987
4988hobject_t Objecter::enumerate_objects_begin()
4989{
4990 return hobject_t();
4991}
4992
4993hobject_t Objecter::enumerate_objects_end()
4994{
4995 return hobject_t::get_max();
4996}
4997
// Completion context for enumerate_objects(): holds the raw reply
// buffer and the caller's output locations, and forwards everything
// to Objecter::_enumerate_reply() when the pg_read completes.
struct C_EnumerateReply : public Context {
  bufferlist bl;  // raw pg_nls reply payload (filled in by pg_read)

  Objecter *objecter;
  hobject_t *next;  // out: cursor for the next enumerate call
  std::list<librados::ListObjectImpl> *result;  // out: entries found
  const hobject_t end;    // stop boundary; entries >= end are dropped
  const int64_t pool_id;
  Context *on_finish;     // user completion, fired by _enumerate_reply

  epoch_t epoch;  // reply epoch (filled in by pg_read)
  int budget;     // throttle budget to return (filled in by pg_read)

  C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
      std::list<librados::ListObjectImpl> *result_,
      const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
    objecter(objecter_), next(next_), result(result_),
    end(end_), pool_id(pool_id_), on_finish(on_finish_),
    epoch(0), budget(0)
  {}

  void finish(int r) override {
    objecter->_enumerate_reply(
      bl, r, end, pool_id, budget, epoch, result, next, on_finish);
  }
};
5024
void Objecter::enumerate_objects(
  int64_t pool_id,
  const std::string &ns,
  const hobject_t &start,
  const hobject_t &end,
  const uint32_t max,
  const bufferlist &filter_bl,
  std::list<librados::ListObjectImpl> *result,
  hobject_t *next,
  Context *on_finish)
{
  // List up to max objects in [start, end) of the given pool/namespace.
  // Results land in *result, the continuation cursor in *next, and
  // on_finish fires (possibly with an error) when the listing returns.
  assert(result);

  // validate the requested range
  if (!end.is_max() && start > end) {
    lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  if (max < 1) {
    lderr(cct) << __func__ << ": result size may not be zero" << dendl;
    on_finish->complete(-EINVAL);
    return;
  }

  // starting at max means we are already done
  if (start.is_max()) {
    on_finish->complete(0);
    return;
  }

  shared_lock rl(rwlock);
  assert(osdmap->get_epoch());
  // hobject-ordered enumeration requires bitwise sort order cluster-wide
  if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
    rl.unlock();
    lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
    on_finish->complete(-EOPNOTSUPP);
    return;
  }
  const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
  if (!p) {
    lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
	       << osdmap->get_epoch() << dendl;
    rl.unlock();
    on_finish->complete(-ENOENT);
    return;
  } else {
    rl.unlock();
  }

  ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;

  // Stash completion state; freed when its finish() runs
  C_EnumerateReply *on_ack = new C_EnumerateReply(
    this, next, result, end, pool_id, on_finish);

  ObjectOperation op;
  op.pg_nls(max, filter_bl, start, 0);

  // Issue. See you later in _enumerate_reply
  object_locator_t oloc(pool_id, ns);
  pg_read(start.get_hash(), oloc, op,
	  &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
}
5088
void Objecter::_enumerate_reply(
  bufferlist &bl,
  int r,
  const hobject_t &end,
  const int64_t pool_id,
  int budget,
  epoch_t reply_epoch,
  std::list<librados::ListObjectImpl> *result,
  hobject_t *next,
  Context *on_finish)
{
  // Decode a pg_nls reply, trim entries past the caller's end bound,
  // advance the cursor, and fire on_finish.  Called from
  // C_EnumerateReply::finish().

  // return the throttle budget taken when the op was submitted
  if (budget > 0) {
    put_op_budget_bytes(budget);
  }

  if (r < 0) {
    ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
    on_finish->complete(r);
    return;
  }

  assert(next != NULL);

  // Decode the results
  bufferlist::iterator iter = bl.begin();
  pg_nls_response_t response;

  // XXX extra_info doesn't seem used anywhere?
  bufferlist extra_info;
  ::decode(response, iter);
  if (!iter.end()) {
    ::decode(extra_info, iter);
  }

  ldout(cct, 10) << __func__ << ": got " << response.entries.size()
		 << " handle " << response.handle
		 << " reply_epoch " << reply_epoch << dendl;
  ldout(cct, 20) << __func__ << ": response.entries.size "
		 << response.entries.size() << ", response.entries "
		 << response.entries << dendl;
  if (response.handle <= end) {
    // cursor still inside the requested range
    *next = response.handle;
  } else {
    ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
		   << dendl;
    *next = end;

    // drop anything after 'end'
    shared_lock rl(rwlock);
    const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
    if (!pool) {
      // pool is gone, drop any results which are now meaningless.
      rl.unlock();
      on_finish->complete(-ENOENT);
      return;
    }
    // entries come back sorted, so pop from the back until the last
    // remaining entry is below the end bound
    while (!response.entries.empty()) {
      uint32_t hash = response.entries.back().locator.empty() ?
	pool->hash_key(response.entries.back().oid,
		       response.entries.back().nspace) :
	pool->hash_key(response.entries.back().locator,
		       response.entries.back().nspace);
      hobject_t last(response.entries.back().oid,
		     response.entries.back().locator,
		     CEPH_NOSNAP,
		     hash,
		     pool_id,
		     response.entries.back().nspace);
      if (last < end)
	break;
      ldout(cct, 20) << __func__ << " dropping item " << last
		     << " >= end " << end << dendl;
      response.entries.pop_back();
    }
    rl.unlock();
  }
  if (!response.entries.empty()) {
    result->merge(response.entries);
  }

  // release the listing context's budget once all
  // OPs (in the session) are finished
#if 0
  put_nlist_context_budget(list_context);
#endif
  on_finish->complete(r);
  return;
}
5177
5178namespace {
5179 using namespace librados;
5180
5181 template <typename T>
5182 void do_decode(std::vector<T>& items, std::vector<bufferlist>& bls)
5183 {
5184 for (auto bl : bls) {
5185 auto p = bl.begin();
5186 T t;
5187 decode(t, p);
5188 items.push_back(t);
5189 }
5190 }
5191
5192 struct C_ObjectOperation_scrub_ls : public Context {
5193 bufferlist bl;
5194 uint32_t *interval;
5195 std::vector<inconsistent_obj_t> *objects = nullptr;
5196 std::vector<inconsistent_snapset_t> *snapsets = nullptr;
5197 int *rval;
5198
5199 C_ObjectOperation_scrub_ls(uint32_t *interval,
5200 std::vector<inconsistent_obj_t> *objects,
5201 int *rval)
5202 : interval(interval), objects(objects), rval(rval) {}
5203 C_ObjectOperation_scrub_ls(uint32_t *interval,
5204 std::vector<inconsistent_snapset_t> *snapsets,
5205 int *rval)
5206 : interval(interval), snapsets(snapsets), rval(rval) {}
5207 void finish(int r) override {
5208 if (r < 0 && r != -EAGAIN) {
5209 if (rval)
5210 *rval = r;
5211 return;
5212 }
5213
5214 if (rval)
5215 *rval = 0;
5216
5217 try {
5218 decode();
5219 } catch (buffer::error&) {
5220 if (rval)
5221 *rval = -EIO;
5222 }
5223 }
5224 private:
5225 void decode() {
5226 scrub_ls_result_t result;
5227 auto p = bl.begin();
5228 result.decode(p);
5229 *interval = result.interval;
5230 if (objects) {
5231 do_decode(*objects, result.vals);
5232 } else {
5233 do_decode(*snapsets, result.vals);
5234 }
5235 }
5236 };
5237
5238 template <typename T>
5239 void do_scrub_ls(::ObjectOperation *op,
5240 const scrub_ls_arg_t& arg,
5241 std::vector<T> *items,
5242 uint32_t *interval,
5243 int *rval)
5244 {
5245 OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
5246 op->flags |= CEPH_OSD_FLAG_PGOP;
5247 assert(interval);
5248 arg.encode(osd_op.indata);
5249 unsigned p = op->ops.size() - 1;
5250 auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
5251 op->out_handler[p] = h;
5252 op->out_bl[p] = &h->bl;
5253 op->out_rval[p] = rval;
5254 }
5255}
5256
5257void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5258 uint64_t max_to_get,
5259 std::vector<librados::inconsistent_obj_t> *objects,
5260 uint32_t *interval,
5261 int *rval)
5262{
5263 scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
5264 do_scrub_ls(this, arg, objects, interval, rval);
5265}
5266
5267void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5268 uint64_t max_to_get,
5269 std::vector<librados::inconsistent_snapset_t> *snapsets,
5270 uint32_t *interval,
5271 int *rval)
5272{
5273 scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
5274 do_scrub_ls(this, arg, snapsets, interval, rval);
5275}