1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "Objecter.h"
16#include "osd/OSDMap.h"
17#include "Filer.h"
18
19#include "mon/MonClient.h"
20
21#include "msg/Messenger.h"
22#include "msg/Message.h"
23
24#include "messages/MPing.h"
25#include "messages/MOSDOp.h"
26#include "messages/MOSDOpReply.h"
27#include "messages/MOSDBackoff.h"
28#include "messages/MOSDMap.h"
29
30#include "messages/MPoolOp.h"
31#include "messages/MPoolOpReply.h"
32
33#include "messages/MGetPoolStats.h"
34#include "messages/MGetPoolStatsReply.h"
35#include "messages/MStatfs.h"
36#include "messages/MStatfsReply.h"
37
38#include "messages/MMonCommand.h"
39
40#include "messages/MCommand.h"
41#include "messages/MCommandReply.h"
42
43#include "messages/MWatchNotify.h"
44
45#include <errno.h>
46
47#include "common/config.h"
48#include "common/perf_counters.h"
49#include "common/scrub_types.h"
50#include "include/str_list.h"
51#include "common/errno.h"
52#include "common/EventTrace.h"
53
54using ceph::real_time;
55using ceph::real_clock;
56
57using ceph::mono_clock;
58using ceph::mono_time;
59
60using ceph::timespan;
61
62
63#define dout_subsys ceph_subsys_objecter
64#undef dout_prefix
65#define dout_prefix *_dout << messenger->get_myname() << ".objecter "
66
67
68enum {
69 l_osdc_first = 123200,
70 l_osdc_op_active,
71 l_osdc_op_laggy,
72 l_osdc_op_send,
73 l_osdc_op_send_bytes,
74 l_osdc_op_resend,
75 l_osdc_op_reply,
76
77 l_osdc_op,
78 l_osdc_op_r,
79 l_osdc_op_w,
80 l_osdc_op_rmw,
81 l_osdc_op_pg,
82
83 l_osdc_osdop_stat,
84 l_osdc_osdop_create,
85 l_osdc_osdop_read,
86 l_osdc_osdop_write,
87 l_osdc_osdop_writefull,
88 l_osdc_osdop_writesame,
89 l_osdc_osdop_append,
90 l_osdc_osdop_zero,
91 l_osdc_osdop_truncate,
92 l_osdc_osdop_delete,
93 l_osdc_osdop_mapext,
94 l_osdc_osdop_sparse_read,
95 l_osdc_osdop_clonerange,
96 l_osdc_osdop_getxattr,
97 l_osdc_osdop_setxattr,
98 l_osdc_osdop_cmpxattr,
99 l_osdc_osdop_rmxattr,
100 l_osdc_osdop_resetxattrs,
101 l_osdc_osdop_tmap_up,
102 l_osdc_osdop_tmap_put,
103 l_osdc_osdop_tmap_get,
104 l_osdc_osdop_call,
105 l_osdc_osdop_watch,
106 l_osdc_osdop_notify,
107 l_osdc_osdop_src_cmpxattr,
108 l_osdc_osdop_pgls,
109 l_osdc_osdop_pgls_filter,
110 l_osdc_osdop_other,
111
112 l_osdc_linger_active,
113 l_osdc_linger_send,
114 l_osdc_linger_resend,
115 l_osdc_linger_ping,
116
117 l_osdc_poolop_active,
118 l_osdc_poolop_send,
119 l_osdc_poolop_resend,
120
121 l_osdc_poolstat_active,
122 l_osdc_poolstat_send,
123 l_osdc_poolstat_resend,
124
125 l_osdc_statfs_active,
126 l_osdc_statfs_send,
127 l_osdc_statfs_resend,
128
129 l_osdc_command_active,
130 l_osdc_command_send,
131 l_osdc_command_resend,
132
133 l_osdc_map_epoch,
134 l_osdc_map_full,
135 l_osdc_map_inc,
136
137 l_osdc_osd_sessions,
138 l_osdc_osd_session_open,
139 l_osdc_osd_session_close,
140 l_osdc_osd_laggy,
141
142 l_osdc_osdop_omap_wr,
143 l_osdc_osdop_omap_rd,
144 l_osdc_osdop_omap_del,
145
146 l_osdc_last,
147};
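// Note: these l_osdc_* indices are only meaningful to the perf counters
// built in Objecter::init(); they must stay within the [l_osdc_first,
// l_osdc_last) range handed to PerfCountersBuilder there.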
148
149
150// config obs ----------------------------
151
152static const char *config_keys[] = {
153 "crush_location",
154 NULL
155};
156
157class Objecter::RequestStateHook : public AdminSocketHook {
158 Objecter *m_objecter;
159public:
160 explicit RequestStateHook(Objecter *objecter);
161 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
162 bufferlist& out) override;
163};
164
165/**
166 * This is a more limited form of C_Contexts, but C_Contexts requires
167 * a ceph_context, which we don't have here.
168 */
169class ObjectOperation::C_TwoContexts : public Context {
170 Context *first;
171 Context *second;
172public:
173 C_TwoContexts(Context *first, Context *second) :
174 first(first), second(second) {}
175 void finish(int r) override {
176 first->complete(r);
177 second->complete(r);
178 first = NULL;
179 second = NULL;
180 }
181
182 ~C_TwoContexts() override {
183 delete first;
184 delete second;
185 }
186};
187
188void ObjectOperation::add_handler(Context *extra) {
189 size_t last = out_handler.size() - 1;
190 Context *orig = out_handler[last];
191 if (orig) {
192 Context *wrapper = new C_TwoContexts(orig, extra);
193 out_handler[last] = wrapper;
194 } else {
195 out_handler[last] = extra;
196 }
197}
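// Note: add_handler() attaches an extra completion to the output handler of
// the most recently added op.  If a handler is already registered, both are
// kept and run in order through C_TwoContexts when the reply is processed.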
198
199Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
200 object_t& oid)
201{
202 if (oid.name.empty())
203 return unique_completion_lock();
204
205 static constexpr uint32_t HASH_PRIME = 1021;
206 uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
207 % HASH_PRIME;
208
209 return unique_completion_lock(completion_locks[h % num_locks],
210 std::defer_lock);
211}
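// Note: completion locks are a small fixed pool shared by all objects on the
// session; the object name is hashed and reduced modulo num_locks, so two
// different objects may map to the same lock.  The lock is returned with
// std::defer_lock, leaving it to the caller to decide when to acquire it;
// an empty object name yields a lock object that owns no mutex at all.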
212
213const char** Objecter::get_tracked_conf_keys() const
214{
215 return config_keys;
216}
217
218
219void Objecter::handle_conf_change(const struct md_config_t *conf,
220 const std::set <std::string> &changed)
221{
222 if (changed.count("crush_location")) {
223 update_crush_location();
224 }
225}
226
227void Objecter::update_crush_location()
228{
229 unique_lock wl(rwlock);
230 crush_location = cct->crush_location.get_location();
231}
232
233// messages ------------------------------
234
235/*
236 * initialize only internal data structures, don't initiate cluster interaction
237 */
238void Objecter::init()
239{
240 assert(!initialized.read());
241
242 if (!logger) {
243 PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
244
245 pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
246 PerfCountersBuilder::PRIO_CRITICAL);
247 pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
248 pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
249 pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data");
250 pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
251 pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
252
253 pcb.add_u64_counter(l_osdc_op, "op", "Operations");
254 pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
255 PerfCountersBuilder::PRIO_CRITICAL);
256 pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
257 PerfCountersBuilder::PRIO_CRITICAL);
258 pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
259 "rdwr", PerfCountersBuilder::PRIO_INTERESTING);
260 pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");
261
262 pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
263 pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
264 "Create object operations");
265 pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
266 pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
267 pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
268 "Write full object operations");
269 pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
270 "Write same operations");
271 pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
272 "Append operation");
273 pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
274 "Set object to zero operations");
275 pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
276 "Truncate object operations");
277 pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
278 "Delete object operations");
279 pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
280 "Map extent operations");
281 pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
282 "Sparse read operations");
283 pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
284 "Clone range operations");
285 pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
286 "Get xattr operations");
287 pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
288 "Set xattr operations");
289 pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
290 "Xattr comparison operations");
291 pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
292 "Remove xattr operations");
293 pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
294 "Reset xattr operations");
295 pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up",
296 "TMAP update operations");
297 pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put",
298 "TMAP put operations");
299 pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get",
300 "TMAP get operations");
301 pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
302 "Call (execute) operations");
303 pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
304 "Watch by object operations");
305 pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
306 "Notify about object operations");
307 pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
308 "Extended attribute comparison in multi operations");
309 pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
310 pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
311 pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");
312
313 pcb.add_u64(l_osdc_linger_active, "linger_active",
314 "Active lingering operations");
315 pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
316 "Sent lingering operations");
317 pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
318 "Resent lingering operations");
319 pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
320 "Sent pings to lingering operations");
321
322 pcb.add_u64(l_osdc_poolop_active, "poolop_active",
323 "Active pool operations");
324 pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
325 "Sent pool operations");
326 pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
327 "Resent pool operations");
328
329 pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
330 "Active get pool stat operations");
331 pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
332 "Pool stat operations sent");
333 pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
334 "Resent pool stats");
335
336 pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
337 pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
338 pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
339 "Resent FS stats");
340
341 pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
342 pcb.add_u64_counter(l_osdc_command_send, "command_send",
343 "Sent commands");
344 pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
345 "Resent commands");
346
347 pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
348 pcb.add_u64_counter(l_osdc_map_full, "map_full",
349 "Full OSD maps received");
350 pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
351 "Incremental OSD maps received");
352
353 pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
354 "Open sessions"); // open sessions
355 pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
356 "Sessions opened");
357 pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
358 "Sessions closed");
359 pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");
360
361 pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
362 "OSD OMAP write operations");
363 pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
364 "OSD OMAP read operations");
365 pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
366 "OSD OMAP delete operations");
367
368 logger = pcb.create_perf_counters();
369 cct->get_perfcounters_collection()->add(logger);
370 }
371
372 m_request_state_hook = new RequestStateHook(this);
373 AdminSocket* admin_socket = cct->get_admin_socket();
374 int ret = admin_socket->register_command("objecter_requests",
375 "objecter_requests",
376 m_request_state_hook,
377 "show in-progress osd requests");
378
379 /* Don't warn on EEXIST, happens if multiple ceph clients
380 * are instantiated from one process */
381 if (ret < 0 && ret != -EEXIST) {
382 lderr(cct) << "error registering admin socket command: "
383 << cpp_strerror(ret) << dendl;
384 }
385
386 update_crush_location();
387
388 cct->_conf->add_observer(this);
389
390 initialized.set(1);
391}
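// Note: init() deliberately stays local: it registers perf counters, the
// "objecter_requests" admin socket command and the config observer, and
// computes the CRUSH location, but sends nothing to the cluster; that only
// starts happening in start().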
392
393/*
394 * ok, cluster interaction can happen
395 */
396void Objecter::start(const OSDMap* o)
397{
398 shared_lock rl(rwlock);
399
400 start_tick();
401 if (o) {
402 osdmap->deepish_copy_from(*o);
403 } else if (osdmap->get_epoch() == 0) {
404 _maybe_request_map();
405 }
406}
407
408void Objecter::shutdown()
409{
410 assert(initialized.read());
411
412 unique_lock wl(rwlock);
413
414 initialized.set(0);
415
416 cct->_conf->remove_observer(this);
417
418 map<int,OSDSession*>::iterator p;
419 while (!osd_sessions.empty()) {
420 p = osd_sessions.begin();
421 close_session(p->second);
422 }
423
424 while(!check_latest_map_lingers.empty()) {
425 map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
426 i->second->put();
427 check_latest_map_lingers.erase(i->first);
428 }
429
430 while(!check_latest_map_ops.empty()) {
431 map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
432 i->second->put();
433 check_latest_map_ops.erase(i->first);
434 }
435
436 while(!check_latest_map_commands.empty()) {
437 map<ceph_tid_t, CommandOp*>::iterator i
438 = check_latest_map_commands.begin();
439 i->second->put();
440 check_latest_map_commands.erase(i->first);
441 }
442
443 while(!poolstat_ops.empty()) {
444 map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
445 delete i->second;
446 poolstat_ops.erase(i->first);
447 }
448
449 while(!statfs_ops.empty()) {
450 map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
451 delete i->second;
452 statfs_ops.erase(i->first);
453 }
454
455 while(!pool_ops.empty()) {
456 map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
457 delete i->second;
458 pool_ops.erase(i->first);
459 }
460
461 ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
462 while(!homeless_session->linger_ops.empty()) {
463 std::map<uint64_t, LingerOp*>::iterator i
464 = homeless_session->linger_ops.begin();
465 ldout(cct, 10) << " linger_op " << i->first << dendl;
466 LingerOp *lop = i->second;
467 {
468 OSDSession::unique_lock swl(homeless_session->lock);
469 _session_linger_op_remove(homeless_session, lop);
470 }
471 linger_ops.erase(lop->linger_id);
472 linger_ops_set.erase(lop);
473 lop->put();
474 }
475
476 while(!homeless_session->ops.empty()) {
477 std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
478 ldout(cct, 10) << " op " << i->first << dendl;
479 Op *op = i->second;
480 {
481 OSDSession::unique_lock swl(homeless_session->lock);
482 _session_op_remove(homeless_session, op);
483 }
484 op->put();
485 }
486
487 while(!homeless_session->command_ops.empty()) {
488 std::map<ceph_tid_t, CommandOp*>::iterator i
489 = homeless_session->command_ops.begin();
490 ldout(cct, 10) << " command_op " << i->first << dendl;
491 CommandOp *cop = i->second;
492 {
493 OSDSession::unique_lock swl(homeless_session->lock);
494 _session_command_op_remove(homeless_session, cop);
495 }
496 cop->put();
497 }
498
499 if (tick_event) {
500 if (timer.cancel_event(tick_event)) {
501 ldout(cct, 10) << " successfully canceled tick" << dendl;
502 }
503 tick_event = 0;
504 }
505
506 if (logger) {
507 cct->get_perfcounters_collection()->remove(logger);
508 delete logger;
509 logger = NULL;
510 }
511
512 // Let go of Objecter write lock so timer thread can shutdown
513 wl.unlock();
514
515 // Outside of lock to avoid cycle WRT calls to RequestStateHook
516 // This is safe because we guarantee no concurrent calls to
517 // shutdown() with the ::initialized check at start.
518 if (m_request_state_hook) {
519 AdminSocket* admin_socket = cct->get_admin_socket();
520 admin_socket->unregister_command("objecter_requests");
521 delete m_request_state_hook;
522 m_request_state_hook = NULL;
523 }
524}
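// Note: shutdown() drops state in a fixed order: OSD sessions, pending
// map-check references, homeless lingers/ops/commands, the tick event and
// the perf counters.  The admin socket hook is unregistered only after the
// write lock is released, to avoid deadlocking against an in-flight
// RequestStateHook call.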
525
526void Objecter::_send_linger(LingerOp *info,
527 shunique_lock& sul)
528{
529 assert(sul.owns_lock() && sul.mutex() == &rwlock);
530
531 vector<OSDOp> opv;
532 Context *oncommit = NULL;
533 LingerOp::shared_lock watchl(info->watch_lock);
534 bufferlist *poutbl = NULL;
535 if (info->registered && info->is_watch) {
536 ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
537 << dendl;
538 opv.push_back(OSDOp());
539 opv.back().op.op = CEPH_OSD_OP_WATCH;
540 opv.back().op.watch.cookie = info->get_cookie();
541 opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
542 opv.back().op.watch.gen = ++info->register_gen;
543 oncommit = new C_Linger_Reconnect(this, info);
544 } else {
545 ldout(cct, 15) << "send_linger " << info->linger_id << " register"
546 << dendl;
547 opv = info->ops;
548 C_Linger_Commit *c = new C_Linger_Commit(this, info);
549 if (!info->is_watch) {
550 info->notify_id = 0;
551 poutbl = &c->outbl;
552 }
553 oncommit = c;
554 }
555 watchl.unlock();
556 Op *o = new Op(info->target.base_oid, info->target.base_oloc,
557 opv, info->target.flags | CEPH_OSD_FLAG_READ,
558 oncommit, info->pobjver);
559 o->outbl = poutbl;
560 o->snapid = info->snap;
561 o->snapc = info->snapc;
562 o->mtime = info->mtime;
563
564 o->target = info->target;
565 o->tid = last_tid.inc();
566
567 // do not resend this; we will send a new op to reregister
568 o->should_resend = false;
569
570 if (info->register_tid) {
571 // repeat send. cancel old registration op, if any.
572 OSDSession::unique_lock sl(info->session->lock);
573 if (info->session->ops.count(info->register_tid)) {
574 Op *o = info->session->ops[info->register_tid];
575 _op_cancel_map_check(o);
576 _cancel_linger_op(o);
577 }
578 sl.unlock();
579
580 _op_submit(o, sul, &info->register_tid);
581 } else {
582 // first send
583 _op_submit_with_budget(o, sul, &info->register_tid);
584 }
585
586 logger->inc(l_osdc_linger_send);
587}
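// Note: a watch that is already registered is refreshed with
// CEPH_OSD_WATCH_OP_RECONNECT; otherwise the original ops are replayed to
// (re)register it.  The generated op has should_resend = false because a
// fresh registration op is built on every resend instead.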
588
589void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
590{
591 LingerOp::unique_lock wl(info->watch_lock);
592 ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
593 if (info->on_reg_commit) {
594 info->on_reg_commit->complete(r);
595 info->on_reg_commit = NULL;
596 }
597
598 // only tell the user the first time we do this
599 info->registered = true;
600 info->pobjver = NULL;
601
602 if (!info->is_watch) {
603 // make note of the notify_id
604 bufferlist::iterator p = outbl.begin();
605 try {
606 ::decode(info->notify_id, p);
607 ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
608 << dendl;
609 }
610 catch (buffer::error& e) {
611 }
612 }
613}
614
615struct C_DoWatchError : public Context {
616 Objecter *objecter;
617 Objecter::LingerOp *info;
618 int err;
619 C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
620 : objecter(o), info(i), err(r) {
621 info->get();
622 info->_queued_async();
623 }
624 void finish(int r) override {
625 Objecter::unique_lock wl(objecter->rwlock);
626 bool canceled = info->canceled;
627 wl.unlock();
628
629 if (!canceled) {
630 info->watch_context->handle_error(info->get_cookie(), err);
631 }
632
633 info->finished_async();
634 info->put();
635 }
636};
637
638int Objecter::_normalize_watch_error(int r)
639{
640 // translate ENOENT -> ENOTCONN so that a delete->disconnection
641 // notification and a failure to reconnect because we raced with
642 // the delete appear the same to the user.
643 if (r == -ENOENT)
644 r = -ENOTCONN;
645 return r;
646}
647
648void Objecter::_linger_reconnect(LingerOp *info, int r)
649{
650 ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
651 << " (last_error " << info->last_error << ")" << dendl;
652 if (r < 0) {
653 LingerOp::unique_lock wl(info->watch_lock);
654 if (!info->last_error) {
655 r = _normalize_watch_error(r);
656 info->last_error = r;
657 if (info->watch_context) {
658 finisher->queue(new C_DoWatchError(this, info, r));
659 }
660 }
661 wl.unlock();
662 }
663}
664
665void Objecter::_send_linger_ping(LingerOp *info)
666{
667 // rwlock is locked unique
668 // info->session->lock is locked
669
670 if (cct->_conf->objecter_inject_no_watch_ping) {
671 ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
672 << dendl;
673 return;
674 }
675 if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
676 ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
677 return;
678 }
679
680 ceph::mono_time now = ceph::mono_clock::now();
681 ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
682 << dendl;
683
684 vector<OSDOp> opv(1);
685 opv[0].op.op = CEPH_OSD_OP_WATCH;
686 opv[0].op.watch.cookie = info->get_cookie();
687 opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
688 opv[0].op.watch.gen = info->register_gen;
689 C_Linger_Ping *onack = new C_Linger_Ping(this, info);
690 Op *o = new Op(info->target.base_oid, info->target.base_oloc,
691 opv, info->target.flags | CEPH_OSD_FLAG_READ,
692 onack, NULL, NULL);
693 o->target = info->target;
694 o->should_resend = false;
695 _send_op_account(o);
696 MOSDOp *m = _prepare_osd_op(o);
697 o->tid = last_tid.inc();
698 _session_op_assign(info->session, o);
699 _send_op(o, m);
700 info->ping_tid = o->tid;
701
702 onack->sent = now;
703 logger->inc(l_osdc_linger_ping);
704}
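// Note: the ping carries the current register_gen, so _linger_ping() below
// can discard acks that belong to an earlier registration of the same watch.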
705
706void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
707 uint32_t register_gen)
708{
709 LingerOp::unique_lock l(info->watch_lock);
710 ldout(cct, 10) << __func__ << " " << info->linger_id
711 << " sent " << sent << " gen " << register_gen << " = " << r
712 << " (last_error " << info->last_error
713 << " register_gen " << info->register_gen << ")" << dendl;
714 if (info->register_gen == register_gen) {
715 if (r == 0) {
716 info->watch_valid_thru = sent;
717 } else if (r < 0 && !info->last_error) {
718 r = _normalize_watch_error(r);
719 info->last_error = r;
720 if (info->watch_context) {
721 finisher->queue(new C_DoWatchError(this, info, r));
722 }
723 }
724 } else {
725 ldout(cct, 20) << " ignoring old gen" << dendl;
726 }
727}
728
729int Objecter::linger_check(LingerOp *info)
730{
731 LingerOp::shared_lock l(info->watch_lock);
732
733 mono_time stamp = info->watch_valid_thru;
734 if (!info->watch_pending_async.empty())
735 stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
736 auto age = mono_clock::now() - stamp;
737
738 ldout(cct, 10) << __func__ << " " << info->linger_id
739 << " err " << info->last_error
740 << " age " << age << dendl;
741 if (info->last_error)
742 return info->last_error;
743 // return a safe upper bound (we are truncating to ms)
744 return
745 1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
746}
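// Note: linger_check() returns the (negative) last error if the watch is
// broken, otherwise an upper bound in milliseconds on how long ago the
// watch was last known to be valid.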
747
748void Objecter::linger_cancel(LingerOp *info)
749{
750 unique_lock wl(rwlock);
751 _linger_cancel(info);
752 info->put();
753}
754
755void Objecter::_linger_cancel(LingerOp *info)
756{
757 // rwlock is locked unique
758 ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
759 if (!info->canceled) {
760 OSDSession *s = info->session;
761 OSDSession::unique_lock sl(s->lock);
762 _session_linger_op_remove(s, info);
763 sl.unlock();
764
765 linger_ops.erase(info->linger_id);
766 linger_ops_set.erase(info);
767 assert(linger_ops.size() == linger_ops_set.size());
768
769 info->canceled = true;
770 info->put();
771
772 logger->dec(l_osdc_linger_active);
773 }
774}
775
776
777
778Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
779 const object_locator_t& oloc,
780 int flags)
781{
782 LingerOp *info = new LingerOp;
783 info->target.base_oid = oid;
784 info->target.base_oloc = oloc;
785 if (info->target.base_oloc.key == oid)
786 info->target.base_oloc.key.clear();
787 info->target.flags = flags;
788 info->watch_valid_thru = mono_clock::now();
789
790 unique_lock l(rwlock);
791
792 // Acquire linger ID
793 info->linger_id = ++max_linger_id;
794 ldout(cct, 10) << __func__ << " info " << info
795 << " linger_id " << info->linger_id
796 << " cookie " << info->get_cookie()
797 << dendl;
798 linger_ops[info->linger_id] = info;
799 linger_ops_set.insert(info);
800 assert(linger_ops.size() == linger_ops_set.size());
801
802 info->get(); // for the caller
803 return info;
804}
805
806ceph_tid_t Objecter::linger_watch(LingerOp *info,
807 ObjectOperation& op,
808 const SnapContext& snapc,
809 real_time mtime,
810 bufferlist& inbl,
811 Context *oncommit,
812 version_t *objver)
813{
814 info->is_watch = true;
815 info->snapc = snapc;
816 info->mtime = mtime;
817 info->target.flags |= CEPH_OSD_FLAG_WRITE;
818 info->ops = op.ops;
819 info->inbl = inbl;
820 info->poutbl = NULL;
821 info->pobjver = objver;
822 info->on_reg_commit = oncommit;
823
824 shunique_lock sul(rwlock, ceph::acquire_unique);
825 _linger_submit(info, sul);
826 logger->inc(l_osdc_linger_active);
827
828 return info->linger_id;
829}
830
831ceph_tid_t Objecter::linger_notify(LingerOp *info,
832 ObjectOperation& op,
833 snapid_t snap, bufferlist& inbl,
834 bufferlist *poutbl,
835 Context *onfinish,
836 version_t *objver)
837{
838 info->snap = snap;
839 info->target.flags |= CEPH_OSD_FLAG_READ;
840 info->ops = op.ops;
841 info->inbl = inbl;
842 info->poutbl = poutbl;
843 info->pobjver = objver;
844 info->on_reg_commit = onfinish;
845
846 shunique_lock sul(rwlock, ceph::acquire_unique);
847 _linger_submit(info, sul);
848 logger->inc(l_osdc_linger_active);
849
850 return info->linger_id;
851}
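// Rough usage sketch, inferred only from this file's API: a caller obtains a
// LingerOp with linger_register(), submits it with linger_watch() or
// linger_notify(), and eventually releases it with linger_cancel();
// linger_register() already returns one reference held on behalf of the
// caller.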
852
853void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
854{
855 assert(sul.owns_lock() && sul.mutex() == &rwlock);
856 assert(info->linger_id);
857
858 // Populate Op::target
859 OSDSession *s = NULL;
860 _calc_target(&info->target, nullptr);
861
862 // Create LingerOp<->OSDSession relation
863 int r = _get_session(info->target.osd, &s, sul);
864 assert(r == 0);
865 OSDSession::unique_lock sl(s->lock);
866 _session_linger_op_assign(s, info);
867 sl.unlock();
868 put_session(s);
869
870 _send_linger(info, sul);
871}
872
873struct C_DoWatchNotify : public Context {
874 Objecter *objecter;
875 Objecter::LingerOp *info;
876 MWatchNotify *msg;
877 C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
878 : objecter(o), info(i), msg(m) {
879 info->get();
880 info->_queued_async();
881 msg->get();
882 }
883 void finish(int r) override {
884 objecter->_do_watch_notify(info, msg);
885 }
886};
887
888void Objecter::handle_watch_notify(MWatchNotify *m)
889{
890 shared_lock l(rwlock);
891 if (!initialized.read()) {
892 return;
893 }
894
895 LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
896 if (linger_ops_set.count(info) == 0) {
897 ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
898 return;
899 }
900 LingerOp::unique_lock wl(info->watch_lock);
901 if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
902 if (!info->last_error) {
903 info->last_error = -ENOTCONN;
904 if (info->watch_context) {
905 finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
906 }
907 }
908 } else if (!info->is_watch) {
909 // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
910 // since we know the only user (librados) is safe to call in
911 // fast-dispatch context
912 if (info->notify_id &&
913 info->notify_id != m->notify_id) {
914 ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
915 << " != " << info->notify_id << ", ignoring" << dendl;
916 } else if (info->on_notify_finish) {
917 info->notify_result_bl->claim(m->get_data());
918 info->on_notify_finish->complete(m->return_code);
919
920 // if we race with reconnect we might get a second notify; only
921 // notify the caller once!
922 info->on_notify_finish = NULL;
923 }
924 } else {
925 finisher->queue(new C_DoWatchNotify(this, info, m));
926 }
927}
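// Note: DISCONNECT is recorded as -ENOTCONN and the error callback is
// queued; NOTIFY_COMPLETE for a notify-style linger is handled inline (see
// the comment above about fast-dispatch safety); everything else is deferred
// to the finisher via C_DoWatchNotify.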
928
929void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
930{
931 ldout(cct, 10) << __func__ << " " << *m << dendl;
932
933 shared_lock l(rwlock);
934 assert(initialized.read());
935
936 if (info->canceled) {
937 l.unlock();
938 goto out;
939 }
940
941 // notify completion?
942 assert(info->is_watch);
943 assert(info->watch_context);
944 assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
945
946 l.unlock();
947
948 switch (m->opcode) {
949 case CEPH_WATCH_EVENT_NOTIFY:
950 info->watch_context->handle_notify(m->notify_id, m->cookie,
951 m->notifier_gid, m->bl);
952 break;
953 }
954
955 out:
956 info->finished_async();
957 info->put();
958 m->put();
959}
960
961bool Objecter::ms_dispatch(Message *m)
962{
963 ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
964 if (!initialized.read())
965 return false;
966
967 switch (m->get_type()) {
968 // these we exclusively handle
969 case CEPH_MSG_OSD_OPREPLY:
970 handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
971 return true;
972
973 case CEPH_MSG_OSD_BACKOFF:
974 handle_osd_backoff(static_cast<MOSDBackoff*>(m));
975 return true;
976
977 case CEPH_MSG_WATCH_NOTIFY:
978 handle_watch_notify(static_cast<MWatchNotify*>(m));
979 m->put();
980 return true;
981
982 case MSG_COMMAND_REPLY:
983 if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
984 handle_command_reply(static_cast<MCommandReply*>(m));
985 return true;
986 } else {
987 return false;
988 }
989
990 case MSG_GETPOOLSTATSREPLY:
991 handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
992 return true;
993
994 case CEPH_MSG_POOLOP_REPLY:
995 handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
996 return true;
997
998 case CEPH_MSG_STATFS_REPLY:
999 handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
1000 return true;
1001
1002 // these we give others a chance to inspect
1003
1004 // MDS, OSD
1005 case CEPH_MSG_OSD_MAP:
1006 handle_osd_map(static_cast<MOSDMap*>(m));
1007 return false;
1008 }
1009 return false;
1010}
1011
1012void Objecter::_scan_requests(OSDSession *s,
1013 bool force_resend,
1014 bool cluster_full,
1015 map<int64_t, bool> *pool_full_map,
1016 map<ceph_tid_t, Op*>& need_resend,
1017 list<LingerOp*>& need_resend_linger,
1018 map<ceph_tid_t, CommandOp*>& need_resend_command,
1019 shunique_lock& sul)
1020{
1021 assert(sul.owns_lock() && sul.mutex() == &rwlock);
1022
1023 list<LingerOp*> unregister_lingers;
1024
1025 OSDSession::unique_lock sl(s->lock);
1026
1027 // check for changed linger mappings (_before_ regular ops)
1028 map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
1029 while (lp != s->linger_ops.end()) {
1030 LingerOp *op = lp->second;
1031 assert(op->session == s);
1032 // check_linger_pool_dne() may touch linger_ops; prevent iterator
1033 // invalidation
1034 ++lp;
1035 ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
1036 bool unregister, force_resend_writes = cluster_full;
1037 int r = _recalc_linger_op_target(op, sul);
1038 if (pool_full_map)
1039 force_resend_writes = force_resend_writes ||
1040 (*pool_full_map)[op->target.base_oloc.pool];
1041 switch (r) {
1042 case RECALC_OP_TARGET_NO_ACTION:
1043 if (!force_resend && !force_resend_writes)
1044 break;
1045 // -- fall-thru --
1046 case RECALC_OP_TARGET_NEED_RESEND:
1047 need_resend_linger.push_back(op);
1048 _linger_cancel_map_check(op);
1049 break;
1050 case RECALC_OP_TARGET_POOL_DNE:
1051 _check_linger_pool_dne(op, &unregister);
1052 if (unregister) {
1053 ldout(cct, 10) << " need to unregister linger op "
1054 << op->linger_id << dendl;
1055 op->get();
1056 unregister_lingers.push_back(op);
1057 }
1058 break;
1059 }
1060 }
1061
1062 // check for changed request mappings
1063 map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
1064 while (p != s->ops.end()) {
1065 Op *op = p->second;
1066 ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
1067 ldout(cct, 10) << " checking op " << op->tid << dendl;
1068 bool force_resend_writes = cluster_full;
1069 if (pool_full_map)
1070 force_resend_writes = force_resend_writes ||
1071 (*pool_full_map)[op->target.base_oloc.pool];
1072 int r = _calc_target(&op->target,
1073 op->session ? op->session->con.get() : nullptr);
1074 switch (r) {
1075 case RECALC_OP_TARGET_NO_ACTION:
1076 if (!force_resend && !(force_resend_writes && op->respects_full()))
1077 break;
1078 // -- fall-thru --
1079 case RECALC_OP_TARGET_NEED_RESEND:
1080 if (op->session) {
1081 _session_op_remove(op->session, op);
1082 }
1083 need_resend[op->tid] = op;
1084 _op_cancel_map_check(op);
1085 break;
1086 case RECALC_OP_TARGET_POOL_DNE:
1087 _check_op_pool_dne(op, &sl);
1088 break;
1089 }
1090 }
1091
1092 // commands
1093 map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
1094 while (cp != s->command_ops.end()) {
1095 CommandOp *c = cp->second;
1096 ++cp;
1097 ldout(cct, 10) << " checking command " << c->tid << dendl;
1098 bool force_resend_writes = cluster_full;
1099 if (pool_full_map)
1100 force_resend_writes = force_resend_writes ||
1101 (*pool_full_map)[c->target_pg.pool()];
1102 int r = _calc_command_target(c, sul);
1103 switch (r) {
1104 case RECALC_OP_TARGET_NO_ACTION:
1105 // resend if skipped map; otherwise do nothing.
1106 if (!force_resend && !force_resend_writes)
1107 break;
1108 // -- fall-thru --
1109 case RECALC_OP_TARGET_NEED_RESEND:
1110 need_resend_command[c->tid] = c;
1111 if (c->session) {
1112 _session_command_op_remove(c->session, c);
1113 }
1114 _command_cancel_map_check(c);
1115 break;
1116 case RECALC_OP_TARGET_POOL_DNE:
1117 case RECALC_OP_TARGET_OSD_DNE:
1118 case RECALC_OP_TARGET_OSD_DOWN:
1119 _check_command_map_dne(c);
1120 break;
1121 }
1122 }
1123
1124 sl.unlock();
1125
1126 for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
1127 iter != unregister_lingers.end();
1128 ++iter) {
1129 _linger_cancel(*iter);
1130 (*iter)->put();
1131 }
1132}
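// Note: _scan_requests() recomputes the target of every linger, op and
// command on one session after a map change.  Whatever moved, or must be
// resent because of full/pause flags, lands in the need_resend_* containers;
// operations whose pool no longer exists go through the *_pool_dne /
// map_dne checks instead.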
1133
1134void Objecter::handle_osd_map(MOSDMap *m)
1135{
1136 shunique_lock sul(rwlock, acquire_unique);
1137 if (!initialized.read())
1138 return;
1139
1140 assert(osdmap);
1141
1142 if (m->fsid != monc->get_fsid()) {
1143 ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
1144 << " != " << monc->get_fsid() << dendl;
1145 return;
1146 }
1147
1148 bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1149 bool cluster_full = _osdmap_full_flag();
1150 bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
1151 _osdmap_has_pool_full();
1152 map<int64_t, bool> pool_full_map;
1153 for (map<int64_t, pg_pool_t>::const_iterator it
1154 = osdmap->get_pools().begin();
1155 it != osdmap->get_pools().end(); ++it)
1156 pool_full_map[it->first] = _osdmap_pool_full(it->second);
1157
1158
1159 list<LingerOp*> need_resend_linger;
1160 map<ceph_tid_t, Op*> need_resend;
1161 map<ceph_tid_t, CommandOp*> need_resend_command;
1162
1163 if (m->get_last() <= osdmap->get_epoch()) {
1164 ldout(cct, 3) << "handle_osd_map ignoring epochs ["
1165 << m->get_first() << "," << m->get_last()
1166 << "] <= " << osdmap->get_epoch() << dendl;
1167 } else {
1168 ldout(cct, 3) << "handle_osd_map got epochs ["
1169 << m->get_first() << "," << m->get_last()
1170 << "] > " << osdmap->get_epoch() << dendl;
1171
1172 if (osdmap->get_epoch()) {
1173 bool skipped_map = false;
1174 // we want incrementals
1175 for (epoch_t e = osdmap->get_epoch() + 1;
1176 e <= m->get_last();
1177 e++) {
1178
1179 if (osdmap->get_epoch() == e-1 &&
1180 m->incremental_maps.count(e)) {
1181 ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
1182 << dendl;
1183 OSDMap::Incremental inc(m->incremental_maps[e]);
1184 osdmap->apply_incremental(inc);
1185 logger->inc(l_osdc_map_inc);
1186 }
1187 else if (m->maps.count(e)) {
1188 ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
1189 osdmap->decode(m->maps[e]);
1190 logger->inc(l_osdc_map_full);
1191 }
1192 else {
1193 if (e >= m->get_oldest()) {
1194 ldout(cct, 3) << "handle_osd_map requesting missing epoch "
1195 << osdmap->get_epoch()+1 << dendl;
1196 _maybe_request_map();
1197 break;
1198 }
1199 ldout(cct, 3) << "handle_osd_map missing epoch "
1200 << osdmap->get_epoch()+1
1201 << ", jumping to " << m->get_oldest() << dendl;
1202 e = m->get_oldest() - 1;
1203 skipped_map = true;
1204 continue;
1205 }
1206 logger->set(l_osdc_map_epoch, osdmap->get_epoch());
1207
1208 cluster_full = cluster_full || _osdmap_full_flag();
1209 update_pool_full_map(pool_full_map);
1210
1211 // check all outstanding requests on every epoch
1212 _scan_requests(homeless_session, skipped_map, cluster_full,
1213 &pool_full_map, need_resend,
1214 need_resend_linger, need_resend_command, sul);
1215 for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1216 p != osd_sessions.end(); ) {
1217 OSDSession *s = p->second;
1218 _scan_requests(s, skipped_map, cluster_full,
1219 &pool_full_map, need_resend,
1220 need_resend_linger, need_resend_command, sul);
1221 ++p;
1222 // osd down or addr change?
1223 if (!osdmap->is_up(s->osd) ||
1224 (s->con &&
1225 s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
1226 close_session(s);
1227 }
1228 }
1229
1230 assert(e == osdmap->get_epoch());
1231 }
1232
1233 } else {
1234 // first map. we want the full thing.
1235 if (m->maps.count(m->get_last())) {
1236 for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1237 p != osd_sessions.end(); ++p) {
1238 OSDSession *s = p->second;
1239 _scan_requests(s, false, false, NULL, need_resend,
1240 need_resend_linger, need_resend_command, sul);
1241 }
1242 ldout(cct, 3) << "handle_osd_map decoding full epoch "
1243 << m->get_last() << dendl;
1244 osdmap->decode(m->maps[m->get_last()]);
1245
1246 _scan_requests(homeless_session, false, false, NULL,
1247 need_resend, need_resend_linger,
1248 need_resend_command, sul);
1249 } else {
1250 ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
1251 << dendl;
1252 monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
1253 monc->renew_subs();
1254 }
1255 }
1256 }
1257
1258 // make sure need_resend targets reflect latest map
1259 for (auto p = need_resend.begin(); p != need_resend.end(); ) {
1260 Op *op = p->second;
1261 if (op->target.epoch < osdmap->get_epoch()) {
1262 ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
1263 int r = _calc_target(&op->target, nullptr);
1264 if (r == RECALC_OP_TARGET_POOL_DNE) {
1265 p = need_resend.erase(p);
1266 _check_op_pool_dne(op, nullptr);
1267 } else {
1268 ++p;
1269 }
1270 } else {
1271 ++p;
1272 }
1273 }
1274
1275 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1276 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
1277 || _osdmap_has_pool_full();
1278
1279 // was/is paused?
1280 if (was_pauserd || was_pausewr || pauserd || pausewr ||
1281 osdmap->get_epoch() < epoch_barrier) {
1282 _maybe_request_map();
1283 }
1284
1285 // resend requests
1286 for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
1287 p != need_resend.end(); ++p) {
1288 Op *op = p->second;
1289 OSDSession *s = op->session;
1290 bool mapped_session = false;
1291 if (!s) {
1292 int r = _map_session(&op->target, &s, sul);
1293 assert(r == 0);
1294 mapped_session = true;
1295 } else {
1296 get_session(s);
1297 }
1298 OSDSession::unique_lock sl(s->lock);
1299 if (mapped_session) {
1300 _session_op_assign(s, op);
1301 }
1302 if (op->should_resend) {
1303 if (!op->session->is_homeless() && !op->target.paused) {
1304 logger->inc(l_osdc_op_resend);
1305 _send_op(op);
1306 }
1307 } else {
1308 _op_cancel_map_check(op);
1309 _cancel_linger_op(op);
1310 }
1311 sl.unlock();
1312 put_session(s);
1313 }
1314 for (list<LingerOp*>::iterator p = need_resend_linger.begin();
1315 p != need_resend_linger.end(); ++p) {
1316 LingerOp *op = *p;
1317 if (!op->session) {
1318 _calc_target(&op->target, nullptr);
1319 OSDSession *s = NULL;
1320 int const r = _get_session(op->target.osd, &s, sul);
1321 assert(r == 0);
1322 assert(s != NULL);
1323 op->session = s;
1324 put_session(s);
1325 }
1326 if (!op->session->is_homeless()) {
1327 logger->inc(l_osdc_linger_resend);
1328 _send_linger(op, sul);
1329 }
1330 }
1331 for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
1332 p != need_resend_command.end(); ++p) {
1333 CommandOp *c = p->second;
1334 if (c->target.osd >= 0) {
1335 _assign_command_session(c, sul);
1336 if (c->session && !c->session->is_homeless()) {
1337 _send_command(c);
1338 }
1339 }
1340 }
1341
1342 _dump_active();
1343
1344 // finish any Contexts that were waiting on a map update
1345 map<epoch_t,list< pair< Context*, int > > >::iterator p =
1346 waiting_for_map.begin();
1347 while (p != waiting_for_map.end() &&
1348 p->first <= osdmap->get_epoch()) {
1349 //go through the list and call the onfinish methods
1350 for (list<pair<Context*, int> >::iterator i = p->second.begin();
1351 i != p->second.end(); ++i) {
1352 i->first->complete(i->second);
1353 }
1354 waiting_for_map.erase(p++);
1355 }
1356
1357 monc->sub_got("osdmap", osdmap->get_epoch());
1358
1359 if (!waiting_for_map.empty()) {
1360 _maybe_request_map();
1361 }
1362}
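// Note: handle_osd_map() applies incremental maps when they continue the
// current epoch, decodes full maps otherwise, and re-requests anything that
// is missing.  After each applied epoch every session is rescanned, then the
// collected ops, lingers and commands are resent and any contexts waiting
// for an epoch we now have are completed.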
1363
1364// op pool check
1365
1366void Objecter::C_Op_Map_Latest::finish(int r)
1367{
1368 if (r == -EAGAIN || r == -ECANCELED)
1369 return;
1370
1371 lgeneric_subdout(objecter->cct, objecter, 10)
1372 << "op_map_latest r=" << r << " tid=" << tid
1373 << " latest " << latest << dendl;
1374
1375 Objecter::unique_lock wl(objecter->rwlock);
1376
1377 map<ceph_tid_t, Op*>::iterator iter =
1378 objecter->check_latest_map_ops.find(tid);
1379 if (iter == objecter->check_latest_map_ops.end()) {
1380 lgeneric_subdout(objecter->cct, objecter, 10)
1381 << "op_map_latest op "<< tid << " not found" << dendl;
1382 return;
1383 }
1384
1385 Op *op = iter->second;
1386 objecter->check_latest_map_ops.erase(iter);
1387
1388 lgeneric_subdout(objecter->cct, objecter, 20)
1389 << "op_map_latest op "<< op << dendl;
1390
1391 if (op->map_dne_bound == 0)
1392 op->map_dne_bound = latest;
1393
1394 OSDSession::unique_lock sl(op->session->lock, defer_lock);
1395 objecter->_check_op_pool_dne(op, &sl);
1396
1397 op->put();
1398}
1399
1400int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
1401 snapid_t *snap) const
1402{
1403 shared_lock rl(rwlock);
1404
1405 auto& pools = osdmap->get_pools();
1406 auto iter = pools.find(poolid);
1407 if (iter == pools.end()) {
1408 return -ENOENT;
1409 }
1410 const pg_pool_t& pg_pool = iter->second;
1411 for (auto p = pg_pool.snaps.begin();
1412 p != pg_pool.snaps.end();
1413 ++p) {
1414 if (p->second.name == snap_name) {
1415 *snap = p->first;
1416 return 0;
1417 }
1418 }
1419 return -ENOENT;
1420}
1421
1422int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
1423 pool_snap_info_t *info) const
1424{
1425 shared_lock rl(rwlock);
1426
1427 auto& pools = osdmap->get_pools();
1428 auto iter = pools.find(poolid);
1429 if (iter == pools.end()) {
1430 return -ENOENT;
1431 }
1432 const pg_pool_t& pg_pool = iter->second;
1433 auto p = pg_pool.snaps.find(snap);
1434 if (p == pg_pool.snaps.end())
1435 return -ENOENT;
1436 *info = p->second;
1437
1438 return 0;
1439}
1440
1441int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
1442{
1443 shared_lock rl(rwlock);
1444
1445 const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
1446 if (!pi)
1447 return -ENOENT;
1448 for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
1449 p != pi->snaps.end();
1450 ++p) {
1451 snaps->push_back(p->first);
1452 }
1453 return 0;
1454}
1455
1456// sl may be unlocked.
1457void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
1458{
1459 // rwlock is locked unique
1460
1461 if (op->target.pool_ever_existed) {
1462 // the pool previously existed and now it does not, which means it
1463 // was deleted.
1464 op->map_dne_bound = osdmap->get_epoch();
1465 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1466 << " pool previously exists but now does not"
1467 << dendl;
1468 } else {
1469 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1470 << " current " << osdmap->get_epoch()
1471 << " map_dne_bound " << op->map_dne_bound
1472 << dendl;
1473 }
1474 if (op->map_dne_bound > 0) {
1475 if (osdmap->get_epoch() >= op->map_dne_bound) {
1476 // we had a new enough map
1477 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1478 << " concluding pool " << op->target.base_pgid.pool()
1479 << " dne" << dendl;
1480 if (op->onfinish) {
1481 op->onfinish->complete(-ENOENT);
1482 }
1483
1484 OSDSession *s = op->session;
1485 if (s) {
1486 assert(s != NULL);
1487 assert(sl->mutex() == &s->lock);
1488 bool session_locked = sl->owns_lock();
1489 if (!session_locked) {
1490 sl->lock();
1491 }
1492 _finish_op(op, 0);
1493 if (!session_locked) {
1494 sl->unlock();
1495 }
1496 } else {
1497 _finish_op(op, 0); // no session
1498 }
1499 }
1500 } else {
1501 _send_op_map_check(op);
1502 }
1503}
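// Note: map_dne_bound is the earliest epoch at which the pool is known not
// to exist.  Once our map is at least that recent the op is completed with
// -ENOENT; until then _send_op_map_check() asks the monitor for the latest
// osdmap epoch.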
1504
1505void Objecter::_send_op_map_check(Op *op)
1506{
1507 // rwlock is locked unique
1508 // ask the monitor
1509 if (check_latest_map_ops.count(op->tid) == 0) {
1510 op->get();
1511 check_latest_map_ops[op->tid] = op;
1512 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
1513 monc->get_version("osdmap", &c->latest, NULL, c);
1514 }
1515}
1516
1517void Objecter::_op_cancel_map_check(Op *op)
1518{
1519 // rwlock is locked unique
1520 map<ceph_tid_t, Op*>::iterator iter =
1521 check_latest_map_ops.find(op->tid);
1522 if (iter != check_latest_map_ops.end()) {
1523 Op *op = iter->second;
1524 op->put();
1525 check_latest_map_ops.erase(iter);
1526 }
1527}
1528
1529// linger pool check
1530
1531void Objecter::C_Linger_Map_Latest::finish(int r)
1532{
1533 if (r == -EAGAIN || r == -ECANCELED) {
1534 // ignore callback; we will retry in resend_mon_ops()
1535 return;
1536 }
1537
1538 unique_lock wl(objecter->rwlock);
1539
1540 map<uint64_t, LingerOp*>::iterator iter =
1541 objecter->check_latest_map_lingers.find(linger_id);
1542 if (iter == objecter->check_latest_map_lingers.end()) {
1543 return;
1544 }
1545
1546 LingerOp *op = iter->second;
1547 objecter->check_latest_map_lingers.erase(iter);
1548
1549 if (op->map_dne_bound == 0)
1550 op->map_dne_bound = latest;
1551
1552 bool unregister;
1553 objecter->_check_linger_pool_dne(op, &unregister);
1554
1555 if (unregister) {
1556 objecter->_linger_cancel(op);
1557 }
1558
1559 op->put();
1560}
1561
1562void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
1563{
1564 // rwlock is locked unique
1565
1566 *need_unregister = false;
1567
1568 if (op->register_gen > 0) {
1569 ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1570 << " pool previously existed but now does not"
1571 << dendl;
1572 op->map_dne_bound = osdmap->get_epoch();
1573 } else {
1574 ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1575 << " current " << osdmap->get_epoch()
1576 << " map_dne_bound " << op->map_dne_bound
1577 << dendl;
1578 }
1579 if (op->map_dne_bound > 0) {
1580 if (osdmap->get_epoch() >= op->map_dne_bound) {
1581 if (op->on_reg_commit) {
1582 op->on_reg_commit->complete(-ENOENT);
1583 }
1584 *need_unregister = true;
1585 }
1586 } else {
1587 _send_linger_map_check(op);
1588 }
1589}
1590
1591void Objecter::_send_linger_map_check(LingerOp *op)
1592{
1593 // ask the monitor
1594 if (check_latest_map_lingers.count(op->linger_id) == 0) {
1595 op->get();
1596 check_latest_map_lingers[op->linger_id] = op;
1597 C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
1598 monc->get_version("osdmap", &c->latest, NULL, c);
1599 }
1600}
1601
1602void Objecter::_linger_cancel_map_check(LingerOp *op)
1603{
1604 // rwlock is locked unique
1605
1606 map<uint64_t, LingerOp*>::iterator iter =
1607 check_latest_map_lingers.find(op->linger_id);
1608 if (iter != check_latest_map_lingers.end()) {
1609 LingerOp *op = iter->second;
1610 op->put();
1611 check_latest_map_lingers.erase(iter);
1612 }
1613}
1614
1615// command pool check
1616
1617void Objecter::C_Command_Map_Latest::finish(int r)
1618{
1619 if (r == -EAGAIN || r == -ECANCELED) {
1620 // ignore callback; we will retry in resend_mon_ops()
1621 return;
1622 }
1623
1624 unique_lock wl(objecter->rwlock);
1625
1626 map<uint64_t, CommandOp*>::iterator iter =
1627 objecter->check_latest_map_commands.find(tid);
1628 if (iter == objecter->check_latest_map_commands.end()) {
1629 return;
1630 }
1631
1632 CommandOp *c = iter->second;
1633 objecter->check_latest_map_commands.erase(iter);
1634
1635 if (c->map_dne_bound == 0)
1636 c->map_dne_bound = latest;
1637
1638 objecter->_check_command_map_dne(c);
1639
1640 c->put();
1641}
1642
1643void Objecter::_check_command_map_dne(CommandOp *c)
1644{
1645 // rwlock is locked unique
1646
1647 ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
1648 << " current " << osdmap->get_epoch()
1649 << " map_dne_bound " << c->map_dne_bound
1650 << dendl;
1651 if (c->map_dne_bound > 0) {
1652 if (osdmap->get_epoch() >= c->map_dne_bound) {
1653 _finish_command(c, c->map_check_error, c->map_check_error_str);
1654 }
1655 } else {
1656 _send_command_map_check(c);
1657 }
1658}
1659
1660void Objecter::_send_command_map_check(CommandOp *c)
1661{
1662 // rwlock is locked unique
1663
1664 // ask the monitor
1665 if (check_latest_map_commands.count(c->tid) == 0) {
1666 c->get();
1667 check_latest_map_commands[c->tid] = c;
1668 C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
1669 monc->get_version("osdmap", &f->latest, NULL, f);
1670 }
1671}
1672
1673void Objecter::_command_cancel_map_check(CommandOp *c)
1674{
1675 // rwlock is locked unique
1676
1677 map<uint64_t, CommandOp*>::iterator iter =
1678 check_latest_map_commands.find(c->tid);
1679 if (iter != check_latest_map_commands.end()) {
1680 CommandOp *c = iter->second;
1681 c->put();
1682 check_latest_map_commands.erase(iter);
1683 }
1684}
1685
1686
1687/**
1688 * Look up OSDSession by OSD id.
1689 *
1690 * @returns 0 on success, or -EAGAIN if the lock context requires
1691 * promotion to write.
1692 */
1693int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
1694{
1695 assert(sul && sul.mutex() == &rwlock);
1696
1697 if (osd < 0) {
1698 *session = homeless_session;
1699 ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
1700 << dendl;
1701 return 0;
1702 }
1703
1704 map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
1705 if (p != osd_sessions.end()) {
1706 OSDSession *s = p->second;
1707 s->get();
1708 *session = s;
1709 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1710 << s->get_nref() << dendl;
1711 return 0;
1712 }
1713 if (!sul.owns_lock()) {
1714 return -EAGAIN;
1715 }
1716 OSDSession *s = new OSDSession(cct, osd);
1717 osd_sessions[osd] = s;
1718 s->con = messenger->get_connection(osdmap->get_inst(osd));
1719 s->con->set_priv(s->get());
1720 logger->inc(l_osdc_osd_session_open);
1721 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1722 s->get();
1723 *session = s;
1724 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1725 << s->get_nref() << dendl;
1726 return 0;
1727}
1728
1729void Objecter::put_session(Objecter::OSDSession *s)
1730{
1731 if (s && !s->is_homeless()) {
1732 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1733 << s->get_nref() << dendl;
1734 s->put();
1735 }
1736}
1737
1738void Objecter::get_session(Objecter::OSDSession *s)
1739{
1740 assert(s != NULL);
1741
1742 if (!s->is_homeless()) {
1743 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1744 << s->get_nref() << dendl;
1745 s->get();
1746 }
1747}
1748
1749void Objecter::_reopen_session(OSDSession *s)
1750{
1751 // s->lock is locked
1752
1753 entity_inst_t inst = osdmap->get_inst(s->osd);
1754 ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
1755 << inst << dendl;
1756 if (s->con) {
1757 s->con->set_priv(NULL);
1758 s->con->mark_down();
1759 logger->inc(l_osdc_osd_session_close);
1760 }
1761 s->con = messenger->get_connection(inst);
1762 s->con->set_priv(s->get());
1763 s->incarnation++;
1764 logger->inc(l_osdc_osd_session_open);
1765}
1766
1767void Objecter::close_session(OSDSession *s)
1768{
1769 // rwlock is locked unique
1770
1771 ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
1772 if (s->con) {
1773 s->con->set_priv(NULL);
1774 s->con->mark_down();
1775 logger->inc(l_osdc_osd_session_close);
1776 }
1777 OSDSession::unique_lock sl(s->lock);
1778
1779 std::list<LingerOp*> homeless_lingers;
1780 std::list<CommandOp*> homeless_commands;
1781 std::list<Op*> homeless_ops;
1782
1783 while (!s->linger_ops.empty()) {
1784 std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
1785 ldout(cct, 10) << " linger_op " << i->first << dendl;
1786 homeless_lingers.push_back(i->second);
1787 _session_linger_op_remove(s, i->second);
1788 }
1789
1790 while (!s->ops.empty()) {
1791 std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
1792 ldout(cct, 10) << " op " << i->first << dendl;
1793 homeless_ops.push_back(i->second);
1794 _session_op_remove(s, i->second);
1795 }
1796
1797 while (!s->command_ops.empty()) {
1798 std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
1799 ldout(cct, 10) << " command_op " << i->first << dendl;
1800 homeless_commands.push_back(i->second);
1801 _session_command_op_remove(s, i->second);
1802 }
1803
1804 osd_sessions.erase(s->osd);
1805 sl.unlock();
1806 put_session(s);
1807
1808 // Assign any leftover ops to the homeless session
1809 {
1810 OSDSession::unique_lock hsl(homeless_session->lock);
1811 for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
1812 i != homeless_lingers.end(); ++i) {
1813 _session_linger_op_assign(homeless_session, *i);
1814 }
1815 for (std::list<Op*>::iterator i = homeless_ops.begin();
1816 i != homeless_ops.end(); ++i) {
1817 _session_op_assign(homeless_session, *i);
1818 }
1819 for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
1820 i != homeless_commands.end(); ++i) {
1821 _session_command_op_assign(homeless_session, *i);
1822 }
1823 }
1824
1825 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1826}
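// Note: closing a session does not fail its requests; they are re-homed onto
// the homeless session and will be resent once a later map gives them a
// valid target again.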
1827
1828void Objecter::wait_for_osd_map()
1829{
1830 unique_lock l(rwlock);
1831 if (osdmap->get_epoch()) {
1832 l.unlock();
1833 return;
1834 }
1835
1836 // Leave this since it goes with C_SafeCond
1837 Mutex lock("");
1838 Cond cond;
1839 bool done;
1840 lock.Lock();
1841 C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL);
1842 waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
1843 l.unlock();
1844 while (!done)
1845 cond.Wait(lock);
1846 lock.Unlock();
1847}
1848
1849struct C_Objecter_GetVersion : public Context {
1850 Objecter *objecter;
1851 uint64_t oldest, newest;
1852 Context *fin;
1853 C_Objecter_GetVersion(Objecter *o, Context *c)
1854 : objecter(o), oldest(0), newest(0), fin(c) {}
1855 void finish(int r) override {
1856 if (r >= 0) {
1857 objecter->get_latest_version(oldest, newest, fin);
1858 } else if (r == -EAGAIN) { // try again as instructed
1859 objecter->wait_for_latest_osdmap(fin);
1860 } else {
1861 // it doesn't return any other error codes!
1862 ceph_abort();
1863 }
1864 }
1865};
1866
1867void Objecter::wait_for_latest_osdmap(Context *fin)
1868{
1869 ldout(cct, 10) << __func__ << dendl;
1870 C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
1871 monc->get_version("osdmap", &c->newest, &c->oldest, c);
1872}
1873
1874void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
1875{
1876 unique_lock wl(rwlock);
1877 _get_latest_version(oldest, newest, fin);
1878}
1879
1880void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
1881 Context *fin)
1882{
1883 // rwlock is locked unique
1884 if (osdmap->get_epoch() >= newest) {
1885 ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
1886 if (fin)
1887 fin->complete(0);
1888 return;
1889 }
1890
1891 ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
1892 _wait_for_new_map(fin, newest, 0);
1893}
1894
1895void Objecter::maybe_request_map()
1896{
1897 shared_lock rl(rwlock);
1898 _maybe_request_map();
1899}
1900
1901void Objecter::_maybe_request_map()
1902{
1903 // rwlock is locked
1904 int flag = 0;
1905 if (_osdmap_full_flag()
1906 || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
1907 || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
1908 ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
1909 "osd map (FULL flag is set)" << dendl;
1910 } else {
1911 ldout(cct, 10)
1912 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
1913 flag = CEPH_SUBSCRIBE_ONETIME;
1914 }
1915 epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
1916 if (monc->sub_want("osdmap", epoch, flag)) {
1917 monc->renew_subs();
1918 }
1919}
1920
1921void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
1922{
1923 // rwlock is locked unique
1924 waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
1925 _maybe_request_map();
1926}
1927
1928
1929/**
1930 * Use this together with wait_for_map: this is a pre-check to avoid
1931 * allocating a Context for wait_for_map if we can see that we
1932 * definitely already have the epoch.
1933 *
1934 * This does *not* replace the need to handle the return value of
1935 * wait_for_map: just because we don't have it in this pre-check
1936 * doesn't mean we won't have it when calling back into wait_for_map,
1937 * since the objecter lock is dropped in between.
1938 */
1939bool Objecter::have_map(const epoch_t epoch)
1940{
1941 shared_lock rl(rwlock);
1942 if (osdmap->get_epoch() >= epoch) {
1943 return true;
1944 } else {
1945 return false;
1946 }
1947}
1948
1949bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
1950{
1951 unique_lock wl(rwlock);
1952 if (osdmap->get_epoch() >= epoch) {
1953 return true;
1954 }
1955 _wait_for_new_map(c, epoch, err);
1956 return false;
1957}
1958
1959void Objecter::kick_requests(OSDSession *session)
1960{
1961 ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
1962
1963 map<uint64_t, LingerOp *> lresend;
1964 unique_lock wl(rwlock);
1965
1966 OSDSession::unique_lock sl(session->lock);
1967 _kick_requests(session, lresend);
1968 sl.unlock();
1969
1970 _linger_ops_resend(lresend, wl);
1971}
1972
1973void Objecter::_kick_requests(OSDSession *session,
1974 map<uint64_t, LingerOp *>& lresend)
1975{
1976 // rwlock is locked unique
1977
1978 // clear backoffs
1979 session->backoffs.clear();
1980 session->backoffs_by_id.clear();
1981
1982 // resend ops
1983 map<ceph_tid_t,Op*> resend; // resend in tid order
1984 for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
1985 p != session->ops.end();) {
1986 Op *op = p->second;
1987 ++p;
1988 logger->inc(l_osdc_op_resend);
1989 if (op->should_resend) {
1990 if (!op->target.paused)
1991 resend[op->tid] = op;
1992 } else {
1993 _op_cancel_map_check(op);
1994 _cancel_linger_op(op);
1995 }
1996 }
1997
1998 while (!resend.empty()) {
1999 _send_op(resend.begin()->second);
2000 resend.erase(resend.begin());
2001 }
2002
2003 // resend lingers
2004 for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
2005 j != session->linger_ops.end(); ++j) {
2006 LingerOp *op = j->second;
2007 op->get();
2008 logger->inc(l_osdc_linger_resend);
2009 assert(lresend.count(j->first) == 0);
2010 lresend[j->first] = op;
2011 }
2012
2013 // resend commands
2014 map<uint64_t,CommandOp*> cresend; // resend in order
2015 for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
2016 k != session->command_ops.end(); ++k) {
2017 logger->inc(l_osdc_command_resend);
2018 cresend[k->first] = k->second;
2019 }
2020 while (!cresend.empty()) {
2021 _send_command(cresend.begin()->second);
2022 cresend.erase(cresend.begin());
2023 }
2024}
2025
2026void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
2027 unique_lock& ul)
2028{
2029 assert(ul.owns_lock());
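  // temporarily convert the caller's unique_lock into a shunique_lock so we
  // can call _send_linger(), which expects one; ownership is handed back to
  // the caller's unique_lock before returning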
2030 shunique_lock sul(std::move(ul));
2031 while (!lresend.empty()) {
2032 LingerOp *op = lresend.begin()->second;
2033 if (!op->canceled) {
2034 _send_linger(op, sul);
2035 }
2036 op->put();
2037 lresend.erase(lresend.begin());
2038 }
2039 ul = unique_lock(sul.release_to_unique());
2040}
2041
2042void Objecter::start_tick()
2043{
2044 assert(tick_event == 0);
2045 tick_event =
2046 timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2047 &Objecter::tick, this);
2048}
2049
2050void Objecter::tick()
2051{
2052 shared_lock rl(rwlock);
2053
2054 ldout(cct, 10) << "tick" << dendl;
2055
2056 // we are only called from the tick_event timer callback
2057 tick_event = 0;
2058
2059 if (!initialized.read()) {
2060 // we raced with shutdown
2061 ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
2062 return;
2063 }
2064
2065 set<OSDSession*> toping;
2066
2067
2068 // look for laggy requests
2069 auto cutoff = ceph::mono_clock::now();
2070 cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout); // timeout
2071
2072 unsigned laggy_ops = 0;
2073
2074 for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
2075 siter != osd_sessions.end(); ++siter) {
2076 OSDSession *s = siter->second;
2077 OSDSession::lock_guard l(s->lock);
2078 bool found = false;
2079 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
2080 p != s->ops.end();
2081 ++p) {
2082 Op *op = p->second;
2083 assert(op->session);
2084 if (op->stamp < cutoff) {
2085 ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
2086 << " is laggy" << dendl;
2087 found = true;
2088 ++laggy_ops;
2089 }
2090 }
2091 for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
2092 p != s->linger_ops.end();
2093 ++p) {
2094 LingerOp *op = p->second;
2095 LingerOp::unique_lock wl(op->watch_lock);
2096 assert(op->session);
2097 ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
2098 << " (osd." << op->session->osd << ")" << dendl;
2099 found = true;
2100 if (op->is_watch && op->registered && !op->last_error)
2101 _send_linger_ping(op);
2102 }
2103 for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
2104 p != s->command_ops.end();
2105 ++p) {
2106 CommandOp *op = p->second;
2107 assert(op->session);
2108 ldout(cct, 10) << " pinging osd that serves command tid " << p->first
2109 << " (osd." << op->session->osd << ")" << dendl;
2110 found = true;
2111 }
2112 if (found)
2113 toping.insert(s);
2114 }
2115 if (num_homeless_ops.read() || !toping.empty()) {
2116 _maybe_request_map();
2117 }
2118
2119 logger->set(l_osdc_op_laggy, laggy_ops);
2120 logger->set(l_osdc_osd_laggy, toping.size());
2121
2122 if (!toping.empty()) {
2123 // send a ping to these osds, to ensure we detect any session resets
2124 // (osd reply message policy is lossy)
2125 for (set<OSDSession*>::const_iterator i = toping.begin();
2126 i != toping.end();
2127 ++i) {
2128 (*i)->con->send_message(new MPing);
2129 }
2130 }
2131
2132 // Make sure we don't reschedule if we wake up after shutdown
2133 if (initialized.read()) {
2134 tick_event = timer.reschedule_me(ceph::make_timespan(
2135 cct->_conf->objecter_tick_interval));
2136 }
2137}
2138
2139void Objecter::resend_mon_ops()
2140{
2141 unique_lock wl(rwlock);
2142
2143 ldout(cct, 10) << "resend_mon_ops" << dendl;
2144
2145 for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
2146 p != poolstat_ops.end();
2147 ++p) {
2148 _poolstat_submit(p->second);
2149 logger->inc(l_osdc_poolstat_resend);
2150 }
2151
2152 for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
2153 p != statfs_ops.end();
2154 ++p) {
2155 _fs_stats_submit(p->second);
2156 logger->inc(l_osdc_statfs_resend);
2157 }
2158
2159 for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
2160 p != pool_ops.end();
2161 ++p) {
2162 _pool_op_submit(p->second);
2163 logger->inc(l_osdc_poolop_resend);
2164 }
2165
2166 for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
2167 p != check_latest_map_ops.end();
2168 ++p) {
2169 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
2170 monc->get_version("osdmap", &c->latest, NULL, c);
2171 }
2172
2173 for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
2174 p != check_latest_map_lingers.end();
2175 ++p) {
2176 C_Linger_Map_Latest *c
2177 = new C_Linger_Map_Latest(this, p->second->linger_id);
2178 monc->get_version("osdmap", &c->latest, NULL, c);
2179 }
2180
2181 for (map<uint64_t, CommandOp*>::iterator p
2182 = check_latest_map_commands.begin();
2183 p != check_latest_map_commands.end();
2184 ++p) {
2185 C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
2186 monc->get_version("osdmap", &c->latest, NULL, c);
2187 }
2188}
2189
2190// read | write ---------------------------
2191
2192void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2193{
2194 shunique_lock rl(rwlock, ceph::acquire_shared);
2195 ceph_tid_t tid = 0;
2196 if (!ptid)
2197 ptid = &tid;
2198 _op_submit_with_budget(op, rl, ptid, ctx_budget);
2199}
2200
2201void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
2202 ceph_tid_t *ptid,
2203 int *ctx_budget)
2204{
2205 assert(initialized.read());
2206
2207 assert(op->ops.size() == op->out_bl.size());
2208 assert(op->ops.size() == op->out_rval.size());
2209 assert(op->ops.size() == op->out_handler.size());
2210
2211 // throttle. before we look at any state, because
2212 // _take_op_budget() may drop our lock while it blocks.
2213 if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
2214 int op_budget = _take_op_budget(op, sul);
2215 // take and pass out the budget for the first OP
2216 // in the context session
2217 if (ctx_budget && (*ctx_budget == -1)) {
2218 *ctx_budget = op_budget;
2219 }
2220 }
2221
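  // if an OSD op timeout is configured, arm a timer that cancels this op
  // with -ETIMEDOUT; assign the tid now so the timer callback can find it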
2222 if (osd_timeout > timespan(0)) {
2223 if (op->tid == 0)
2224 op->tid = last_tid.inc();
2225 auto tid = op->tid;
2226 op->ontimeout = timer.add_event(osd_timeout,
2227 [this, tid]() {
2228 op_cancel(tid, -ETIMEDOUT); });
2229 }
2230
2231 _op_submit(op, sul, ptid);
2232}
2233
2234void Objecter::_send_op_account(Op *op)
2235{
2236 inflight_ops.inc();
2237
2238 // add to gather set(s)
2239 if (op->onfinish) {
2240 num_in_flight.inc();
2241 } else {
2242 ldout(cct, 20) << " note: not requesting reply" << dendl;
2243 }
2244
2245 logger->inc(l_osdc_op_active);
2246 logger->inc(l_osdc_op);
2247
2248 if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
2249 (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
2250 logger->inc(l_osdc_op_rmw);
2251 else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
2252 logger->inc(l_osdc_op_w);
2253 else if (op->target.flags & CEPH_OSD_FLAG_READ)
2254 logger->inc(l_osdc_op_r);
2255
2256 if (op->target.flags & CEPH_OSD_FLAG_PGOP)
2257 logger->inc(l_osdc_op_pg);
2258
2259 for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
2260 int code = l_osdc_osdop_other;
2261 switch (p->op.op) {
2262 case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
2263 case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
2264 case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
2265 case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
2266 case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
2267 case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
2268 case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
2269 case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
2270 case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
2271 case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
2272 case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
2273 case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
2274 case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
2275 case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
2276 case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
2277 case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
2278 case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
2279 case CEPH_OSD_OP_TMAPUP: code = l_osdc_osdop_tmap_up; break;
2280 case CEPH_OSD_OP_TMAPPUT: code = l_osdc_osdop_tmap_put; break;
2281 case CEPH_OSD_OP_TMAPGET: code = l_osdc_osdop_tmap_get; break;
2282
2283 // OMAP read operations
2284 case CEPH_OSD_OP_OMAPGETVALS:
2285 case CEPH_OSD_OP_OMAPGETKEYS:
2286 case CEPH_OSD_OP_OMAPGETHEADER:
2287 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
2288 case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;
2289
2290 // OMAP write operations
2291 case CEPH_OSD_OP_OMAPSETVALS:
2292 case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;
2293
2294 // OMAP del operations
2295 case CEPH_OSD_OP_OMAPCLEAR:
2296 case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;
2297
2298 case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
2299 case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
2300 case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
2301 }
2302 if (code)
2303 logger->inc(code);
2304 }
2305}
2306
2307void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
2308{
2309 // rwlock is locked
2310
2311 ldout(cct, 10) << __func__ << " op " << op << dendl;
2312
2313 // pick target
2314 assert(op->session == NULL);
2315 OSDSession *s = NULL;
2316
2317 bool check_for_latest_map = _calc_target(&op->target, nullptr)
2318 == RECALC_OP_TARGET_POOL_DNE;
2319
2320 // Try to get a session, including a retry if we need to take write lock
2321 int r = _get_session(op->target.osd, &s, sul);
2322 if (r == -EAGAIN ||
2323 (check_for_latest_map && sul.owns_lock_shared())) {
2324 epoch_t orig_epoch = osdmap->get_epoch();
2325 sul.unlock();
2326 if (cct->_conf->objecter_debug_inject_relock_delay) {
2327 sleep(1);
2328 }
2329 sul.lock();
2330 if (orig_epoch != osdmap->get_epoch()) {
2331 // map changed; recalculate mapping
2332 ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
2333 << dendl;
2334 check_for_latest_map = _calc_target(&op->target, nullptr)
2335 == RECALC_OP_TARGET_POOL_DNE;
2336 if (s) {
2337 put_session(s);
2338 s = NULL;
2339 r = -EAGAIN;
2340 }
2341 }
2342 }
2343 if (r == -EAGAIN) {
2344 assert(s == NULL);
2345 r = _get_session(op->target.osd, &s, sul);
2346 }
2347 assert(r == 0);
2348 assert(s); // may be homeless
2349
2350 _send_op_account(op);
2351
2352 // send?
2353
2354 assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
2355
2356 if (osdmap_full_try) {
2357 op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
2358 }
2359
2360 bool need_send = false;
2361
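  // decide whether we can send now: an epoch barrier, a pause flag, or a
  // full cluster/pool leaves the op registered but paused (and subscribes
  // to a newer map); otherwise send if we mapped to a real (non-homeless) osd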
2362 if (osdmap->get_epoch() < epoch_barrier) {
2363 ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
2364 << dendl;
2365 op->target.paused = true;
2366 _maybe_request_map();
2367 } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
2368 osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
2369 ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
2370 << dendl;
2371 op->target.paused = true;
2372 _maybe_request_map();
2373 } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
2374 osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
2375 ldout(cct, 10) << " paused read " << op << " tid " << op->tid
2376 << dendl;
2377 op->target.paused = true;
2378 _maybe_request_map();
2379 } else if (op->respects_full() &&
2380 (_osdmap_full_flag() ||
2381 _osdmap_pool_full(op->target.base_oloc.pool))) {
2382 ldout(cct, 0) << " FULL, paused modify " << op << " tid "
2383 << op->tid << dendl;
2384 op->target.paused = true;
2385 _maybe_request_map();
2386 } else if (!s->is_homeless()) {
2387 need_send = true;
2388 } else {
2389 _maybe_request_map();
2390 }
2391
2392 MOSDOp *m = NULL;
2393 if (need_send) {
2394 m = _prepare_osd_op(op);
2395 }
2396
2397 OSDSession::unique_lock sl(s->lock);
2398 if (op->tid == 0)
2399 op->tid = last_tid.inc();
2400
2401 ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
2402 << " '" << op->target.base_oloc << "' '"
2403 << op->target.target_oloc << "' " << op->ops << " tid "
2404 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
2405 << dendl;
2406
2407 _session_op_assign(s, op);
2408
2409 if (need_send) {
2410 _send_op(op, m);
2411 }
2412
2413 // Last chance to touch the Op here: after we give up the session lock it
2414 // can be freed at any time by the response handler.
2415 ceph_tid_t tid = op->tid;
2416 if (check_for_latest_map) {
2417 _send_op_map_check(op);
2418 }
2419 if (ptid)
2420 *ptid = tid;
2421 op = NULL;
2422
2423 sl.unlock();
2424 put_session(s);
2425
2426 ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl;
2427}
2428
2429int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2430{
2431 assert(initialized.read());
2432
2433 OSDSession::unique_lock sl(s->lock);
2434
2435 map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
2436 if (p == s->ops.end()) {
2437 ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
2438 << s->osd << dendl;
2439 return -ENOENT;
2440 }
2441
2442 if (s->con) {
2443 ldout(cct, 20) << " revoking rx buffer for " << tid
2444 << " on " << s->con << dendl;
2445 s->con->revoke_rx_buffer(tid);
2446 }
2447
2448 ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
2449 << dendl;
2450 Op *op = p->second;
2451 if (op->onfinish) {
2452 num_in_flight.dec();
2453 op->onfinish->complete(r);
2454 op->onfinish = NULL;
2455 }
2456 _op_cancel_map_check(op);
2457 _finish_op(op, r);
2458 sl.unlock();
2459
2460 return 0;
2461}
2462
2463int Objecter::op_cancel(ceph_tid_t tid, int r)
2464{
2465 int ret = 0;
2466
2467 unique_lock wl(rwlock);
2468 ret = _op_cancel(tid, r);
2469
2470 return ret;
2471}
2472
2473int Objecter::_op_cancel(ceph_tid_t tid, int r)
2474{
2475 int ret = 0;
2476
2477 ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
2478 << dendl;
2479
2480start:
2481
2482 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2483 siter != osd_sessions.end(); ++siter) {
2484 OSDSession *s = siter->second;
2485 OSDSession::shared_lock sl(s->lock);
2486 if (s->ops.find(tid) != s->ops.end()) {
2487 sl.unlock();
2488 ret = op_cancel(s, tid, r);
2489 if (ret == -ENOENT) {
2490 /* oh no! raced, maybe tid moved to another session, restarting */
2491 goto start;
2492 }
2493 return ret;
2494 }
2495 }
2496
2497 ldout(cct, 5) << __func__ << ": tid " << tid
2498 << " not found in live sessions" << dendl;
2499
2500 // Handle case where the op is in homeless session
2501 OSDSession::shared_lock sl(homeless_session->lock);
2502 if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
2503 sl.unlock();
2504 ret = op_cancel(homeless_session, tid, r);
2505 if (ret == -ENOENT) {
2506 /* oh no! raced, maybe tid moved to another session, restarting */
2507 goto start;
2508 } else {
2509 return ret;
2510 }
2511 } else {
2512 sl.unlock();
2513 }
2514
2515 ldout(cct, 5) << __func__ << ": tid " << tid
2516 << " not found in homeless session" << dendl;
2517
2518 return ret;
2519}
2520
2521
2522epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
2523{
2524 unique_lock wl(rwlock);
2525
2526 std::vector<ceph_tid_t> to_cancel;
2527 bool found = false;
2528
2529 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2530 siter != osd_sessions.end(); ++siter) {
2531 OSDSession *s = siter->second;
2532 OSDSession::shared_lock sl(s->lock);
2533 for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
2534 op_i != s->ops.end(); ++op_i) {
2535 if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
2536 && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
2537 to_cancel.push_back(op_i->first);
2538 }
2539 }
2540 sl.unlock();
2541
2542 for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
2543 titer != to_cancel.end();
2544 ++titer) {
2545 int cancel_result = op_cancel(s, *titer, r);
2546 // We hold rwlock across search and cancellation, so cancels
2547 // should always succeed
2548 assert(cancel_result == 0);
2549 }
2550 if (!found && to_cancel.size())
2551 found = true;
2552 to_cancel.clear();
2553 }
2554
2555 const epoch_t epoch = osdmap->get_epoch();
2556
2557 wl.unlock();
2558
2559 if (found) {
2560 return epoch;
2561 } else {
2562 return -1;
2563 }
2564}
2565
2566bool Objecter::is_pg_changed(
2567 int oldprimary,
2568 const vector<int>& oldacting,
2569 int newprimary,
2570 const vector<int>& newacting,
2571 bool any_change)
2572{
2573 if (OSDMap::primary_changed(
2574 oldprimary,
2575 oldacting,
2576 newprimary,
2577 newacting))
2578 return true;
2579 if (any_change && oldacting != newacting)
2580 return true;
2581 return false; // same primary (though replicas may have changed)
2582}
2583
2584bool Objecter::target_should_be_paused(op_target_t *t)
2585{
2586 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2587 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
2588 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
2589 _osdmap_full_flag() || _osdmap_pool_full(*pi);
2590
2591 return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
2592 (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
2593 (osdmap->get_epoch() < epoch_barrier);
2594}
2595
2596/**
2597 * Locking public accessor for _osdmap_full_flag
2598 */
2599bool Objecter::osdmap_full_flag() const
2600{
2601 shared_lock rl(rwlock);
2602
2603 return _osdmap_full_flag();
2604}
2605
2606bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2607{
2608 shared_lock rl(rwlock);
2609
2610 if (_osdmap_full_flag()) {
2611 return true;
2612 }
2613
2614 return _osdmap_pool_full(pool_id);
2615}
2616
2617bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2618{
2619 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2620 if (pool == NULL) {
2621 ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2622 return false;
2623 }
2624
2625 return _osdmap_pool_full(*pool);
2626}
2627
2628bool Objecter::_osdmap_has_pool_full() const
2629{
2630 for (map<int64_t, pg_pool_t>::const_iterator it
2631 = osdmap->get_pools().begin();
2632 it != osdmap->get_pools().end(); ++it) {
2633 if (_osdmap_pool_full(it->second))
2634 return true;
2635 }
2636 return false;
2637}
2638
2639bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
2640{
2641 return p.has_flag(pg_pool_t::FLAG_FULL) && honor_osdmap_full;
2642}
2643
2644/**
2645 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2646 */
2647bool Objecter::_osdmap_full_flag() const
2648{
2649 // Ignore the FULL flag if the caller has cleared honor_osdmap_full
2650 return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
2651}
2652
2653void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2654{
2655 for (map<int64_t, pg_pool_t>::const_iterator it
2656 = osdmap->get_pools().begin();
2657 it != osdmap->get_pools().end(); ++it) {
2658 if (pool_full_map.find(it->first) == pool_full_map.end()) {
2659 pool_full_map[it->first] = _osdmap_pool_full(it->second);
2660 } else {
2661 pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2662 pool_full_map[it->first];
2663 }
2664 }
2665}
2666
2667int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2668 const string& ns)
2669{
2670 shared_lock rl(rwlock);
2671 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2672 if (!p)
2673 return -ENOENT;
2674 return p->hash_key(key, ns);
2675}
2676
2677int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2678 const string& ns)
2679{
2680 shared_lock rl(rwlock);
2681 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2682 if (!p)
2683 return -ENOENT;
2684 return p->raw_hash_to_pg(p->hash_key(key, ns));
2685}
2686
2687int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
2688{
2689 // rwlock is locked
2690 bool is_read = t->flags & CEPH_OSD_FLAG_READ;
2691 bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
2692 t->epoch = osdmap->get_epoch();
2693 ldout(cct,20) << __func__ << " epoch " << t->epoch
2694 << " base " << t->base_oid << " " << t->base_oloc
2695 << " precalc_pgid " << (int)t->precalc_pgid
2696 << " pgid " << t->base_pgid
2697 << (is_read ? " is_read" : "")
2698 << (is_write ? " is_write" : "")
2699 << dendl;
2700
2701 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2702 if (!pi) {
2703 t->osd = -1;
2704 return RECALC_OP_TARGET_POOL_DNE;
2705 }
2706 ldout(cct,30) << __func__ << " base pi " << pi
2707 << " pg_num " << pi->get_pg_num() << dendl;
2708
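  // honor the pool's last_force_op_resend: if the pool requested that ops be
  // resent in this epoch and this target has not yet observed it, force a
  // resend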
2709 bool force_resend = false;
2710 if (osdmap->get_epoch() == pi->last_force_op_resend) {
2711 if (t->last_force_resend < pi->last_force_op_resend) {
2712 t->last_force_resend = pi->last_force_op_resend;
2713 force_resend = true;
2714 } else if (t->last_force_resend == 0) {
2715 force_resend = true;
2716 }
2717 }
2718
2719 // apply tiering
2720 t->target_oid = t->base_oid;
2721 t->target_oloc = t->base_oloc;
2722 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
2723 if (is_read && pi->has_read_tier())
2724 t->target_oloc.pool = pi->read_tier;
2725 if (is_write && pi->has_write_tier())
2726 t->target_oloc.pool = pi->write_tier;
2727 pi = osdmap->get_pg_pool(t->target_oloc.pool);
2728 if (!pi) {
2729 t->osd = -1;
2730 return RECALC_OP_TARGET_POOL_DNE;
2731 }
2732 }
2733
2734 pg_t pgid;
2735 if (t->precalc_pgid) {
2736 assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
2737 assert(t->base_oid.name.empty()); // make sure this is a pg op
2738 assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
2739 pgid = t->base_pgid;
2740 } else {
2741 int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
2742 pgid);
2743 if (ret == -ENOENT) {
2744 t->osd = -1;
2745 return RECALC_OP_TARGET_POOL_DNE;
2746 }
2747 }
2748 ldout(cct,20) << __func__ << " target " << t->target_oid << " "
2749 << t->target_oloc << " -> pgid " << pgid << dendl;
2750 ldout(cct,30) << __func__ << " target pi " << pi
2751 << " pg_num " << pi->get_pg_num() << dendl;
2752 t->pool_ever_existed = true;
2753
2754 int size = pi->size;
2755 int min_size = pi->min_size;
2756 unsigned pg_num = pi->get_pg_num();
2757 int up_primary, acting_primary;
2758 vector<int> up, acting;
2759 osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
2760 &acting, &acting_primary);
2761 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
2762 unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
2763 pg_t prev_pgid(prev_seed, pgid.pool());
2764 if (any_change && PastIntervals::is_new_interval(
2765 t->acting_primary,
2766 acting_primary,
2767 t->acting,
2768 acting,
2769 t->up_primary,
2770 up_primary,
2771 t->up,
2772 up,
2773 t->size,
2774 size,
2775 t->min_size,
2776 min_size,
2777 t->pg_num,
2778 pg_num,
2779 t->sort_bitwise,
2780 sort_bitwise,
2781 prev_pgid)) {
2782 force_resend = true;
2783 }
2784
2785 bool unpaused = false;
2786 if (t->paused && !target_should_be_paused(t)) {
2787 t->paused = false;
2788 unpaused = true;
2789 }
2790
2791 bool legacy_change =
2792 t->pgid != pgid ||
2793 is_pg_changed(
2794 t->acting_primary, t->acting, acting_primary, acting,
2795 t->used_replica || any_change);
2796 bool split = false;
2797 if (t->pg_num) {
2798 split = prev_pgid.is_split(t->pg_num, pg_num, nullptr);
2799 }
2800
2801 if (legacy_change || split || force_resend) {
2802 t->pgid = pgid;
2803 t->acting = acting;
2804 t->acting_primary = acting_primary;
2805 t->up_primary = up_primary;
2806 t->up = up;
2807 t->size = size;
2808 t->min_size = min_size;
2809 t->pg_num = pg_num;
2810 t->pg_num_mask = pi->get_pg_num_mask();
2811 osdmap->get_primary_shard(
2812 pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
2813 &t->actual_pgid);
2814 t->sort_bitwise = sort_bitwise;
2815 ldout(cct, 10) << __func__ << " "
2816 << " raw pgid " << pgid << " -> actual " << t->actual_pgid
2817 << " acting " << acting
2818 << " primary " << acting_primary << dendl;
2819 t->used_replica = false;
2820 if (acting_primary == -1) {
2821 t->osd = -1;
2822 } else {
2823 int osd;
2824 bool read = is_read && !is_write;
2825 if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
2826 int p = rand() % acting.size();
2827 if (p)
2828 t->used_replica = true;
2829 osd = acting[p];
2830 ldout(cct, 10) << " chose random osd." << osd << " of " << acting
2831 << dendl;
2832 } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
2833 acting.size() > 1) {
2834 // look for a local replica. prefer the primary if the
2835 // distance is the same.
2836 int best = -1;
2837 int best_locality = 0;
2838 for (unsigned i = 0; i < acting.size(); ++i) {
2839 int locality = osdmap->crush->get_common_ancestor_distance(
2840 cct, acting[i], crush_location);
2841 ldout(cct, 20) << __func__ << " localize: rank " << i
2842 << " osd." << acting[i]
2843 << " locality " << locality << dendl;
2844 if (i == 0 ||
2845 (locality >= 0 && best_locality >= 0 &&
2846 locality < best_locality) ||
2847 (best_locality < 0 && locality >= 0)) {
2848 best = i;
2849 best_locality = locality;
2850 if (i)
2851 t->used_replica = true;
2852 }
2853 }
2854 assert(best >= 0);
2855 osd = acting[best];
2856 } else {
2857 osd = acting_primary;
2858 }
2859 t->osd = osd;
2860 }
2861 }
2862 if (legacy_change || unpaused || force_resend) {
2863 return RECALC_OP_TARGET_NEED_RESEND;
2864 }
2865 if (split && con && con->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT)) {
2866 return RECALC_OP_TARGET_NEED_RESEND;
2867 }
2868 return RECALC_OP_TARGET_NO_ACTION;
2869}
2870
2871int Objecter::_map_session(op_target_t *target, OSDSession **s,
2872 shunique_lock& sul)
2873{
2874 _calc_target(target, nullptr);
2875 return _get_session(target->osd, s, sul);
2876}
2877
2878void Objecter::_session_op_assign(OSDSession *to, Op *op)
2879{
2880 // to->lock is locked
2881 assert(op->session == NULL);
2882 assert(op->tid);
2883
2884 get_session(to);
2885 op->session = to;
2886 to->ops[op->tid] = op;
2887
2888 if (to->is_homeless()) {
2889 num_homeless_ops.inc();
2890 }
2891
2892 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
2893}
2894
2895void Objecter::_session_op_remove(OSDSession *from, Op *op)
2896{
2897 assert(op->session == from);
2898 // from->lock is locked
2899
2900 if (from->is_homeless()) {
2901 num_homeless_ops.dec();
2902 }
2903
2904 from->ops.erase(op->tid);
2905 put_session(from);
2906 op->session = NULL;
2907
2908 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
2909}
2910
2911void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
2912{
2913 // to lock is locked unique
2914 assert(op->session == NULL);
2915
2916 if (to->is_homeless()) {
2917 num_homeless_ops.inc();
2918 }
2919
2920 get_session(to);
2921 op->session = to;
2922 to->linger_ops[op->linger_id] = op;
2923
2924 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
2925 << dendl;
2926}
2927
2928void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
2929{
2930 assert(from == op->session);
2931 // from->lock is locked unique
2932
2933 if (from->is_homeless()) {
2934 num_homeless_ops.dec();
2935 }
2936
2937 from->linger_ops.erase(op->linger_id);
2938 put_session(from);
2939 op->session = NULL;
2940
2941 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
2942 << dendl;
2943}
2944
2945void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
2946{
2947 assert(from == op->session);
2948 // from->lock is locked
2949
2950 if (from->is_homeless()) {
2951 num_homeless_ops.dec();
2952 }
2953
2954 from->command_ops.erase(op->tid);
2955 put_session(from);
2956 op->session = NULL;
2957
2958 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
2959}
2960
2961void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
2962{
2963 // to->lock is locked
2964 assert(op->session == NULL);
2965 assert(op->tid);
2966
2967 if (to->is_homeless()) {
2968 num_homeless_ops.inc();
2969 }
2970
2971 get_session(to);
2972 op->session = to;
2973 to->command_ops[op->tid] = op;
2974
2975 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
2976}
2977
2978int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
2979 shunique_lock& sul)
2980{
2981 // rwlock is locked unique
2982
2983 int r = _calc_target(&linger_op->target, nullptr, true);
2984 if (r == RECALC_OP_TARGET_NEED_RESEND) {
2985 ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
2986 << " pgid " << linger_op->target.pgid
2987 << " acting " << linger_op->target.acting << dendl;
2988
2989 OSDSession *s = NULL;
2990 r = _get_session(linger_op->target.osd, &s, sul);
2991 assert(r == 0);
2992
2993 if (linger_op->session != s) {
2994 // NB locking two sessions (s and linger_op->session) at the
2995 // same time here is only safe because we are the only one that
2996 // takes two, and we are holding rwlock for write. Disable
2997 // lockdep because it doesn't know that.
2998 OSDSession::unique_lock sl(s->lock);
2999 _session_linger_op_remove(linger_op->session, linger_op);
3000 _session_linger_op_assign(s, linger_op);
3001 }
3002
3003 put_session(s);
3004 return RECALC_OP_TARGET_NEED_RESEND;
3005 }
3006 return r;
3007}
3008
3009void Objecter::_cancel_linger_op(Op *op)
3010{
3011 ldout(cct, 15) << "cancel_op " << op->tid << dendl;
3012
3013 assert(!op->should_resend);
3014 if (op->onfinish) {
3015 delete op->onfinish;
3016 num_in_flight.dec();
3017 }
3018
3019 _finish_op(op, 0);
3020}
3021
3022void Objecter::_finish_op(Op *op, int r)
3023{
3024 ldout(cct, 15) << "finish_op " << op->tid << dendl;
3025
3026 // op->session->lock is locked unique or op->session is null
3027
3028 if (!op->ctx_budgeted && op->budgeted)
3029 put_op_budget(op);
3030
3031 if (op->ontimeout && r != -ETIMEDOUT)
3032 timer.cancel_event(op->ontimeout);
3033
3034 if (op->session) {
3035 _session_op_remove(op->session, op);
3036 }
3037
3038 logger->dec(l_osdc_op_active);
3039
3040 assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
3041
3042 inflight_ops.dec();
3043
3044 op->put();
3045}
3046
3047void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
3048{
3049 ldout(cct, 15) << "finish_op " << tid << dendl;
3050 shared_lock rl(rwlock);
3051
3052 OSDSession::unique_lock wl(session->lock);
3053
3054 map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
3055 if (iter == session->ops.end())
3056 return;
3057
3058 Op *op = iter->second;
3059
3060 _finish_op(op, 0);
3061}
3062
3063MOSDOp *Objecter::_prepare_osd_op(Op *op)
3064{
3065 // rwlock is locked
3066
3067 int flags = op->target.flags;
3068 flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
3069
3070 // Nothing checks this any longer, but needed for compatibility with
3071 // pre-luminous osds
3072 flags |= CEPH_OSD_FLAG_ONDISK;
3073
3074 if (!honor_osdmap_full)
3075 flags |= CEPH_OSD_FLAG_FULL_FORCE;
3076
3077 op->target.paused = false;
3078 op->stamp = ceph::mono_clock::now();
3079
3080 hobject_t hobj = op->target.get_hobj();
3081 MOSDOp *m = new MOSDOp(client_inc.read(), op->tid,
3082 hobj, op->target.actual_pgid,
3083 osdmap->get_epoch(),
3084 flags, op->features);
3085
3086 m->set_snapid(op->snapid);
3087 m->set_snap_seq(op->snapc.seq);
3088 m->set_snaps(op->snapc.snaps);
3089
3090 m->ops = op->ops;
3091 m->set_mtime(op->mtime);
3092 m->set_retry_attempt(op->attempts++);
3093 m->trace = op->trace;
3094 if (!m->trace && cct->_conf->osdc_blkin_trace_all)
3095 m->trace.init("objecter op", &trace_endpoint);
3096
3097 if (op->priority)
3098 m->set_priority(op->priority);
3099 else
3100 m->set_priority(cct->_conf->osd_client_op_priority);
3101
3102 if (op->reqid != osd_reqid_t()) {
3103 m->set_reqid(op->reqid);
3104 }
3105
3106 logger->inc(l_osdc_op_send);
3107 logger->inc(l_osdc_op_send_bytes, m->get_data().length());
3108
3109 return m;
3110}
3111
3112void Objecter::_send_op(Op *op, MOSDOp *m)
3113{
3114 // rwlock is locked
3115 // op->session->lock is locked
3116
3117 // backoff?
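  // if the OSD gave us a backoff whose [begin,end) interval contains this
  // object, leave the op registered in the session but do not send it now;
  // it will be resent when the matching UNBLOCK arrives (handle_osd_backoff)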
3118 hobject_t hoid = op->target.get_hobj();
3119 auto p = op->session->backoffs.find(op->target.actual_pgid);
3120 if (p != op->session->backoffs.end()) {
3121 auto q = p->second.lower_bound(hoid);
3122 if (q != p->second.begin()) {
3123 --q;
3124 if (hoid >= q->second.end) {
3125 ++q;
3126 }
3127 }
3128 if (q != p->second.end()) {
3129 ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
3130 << "," << q->second.end << ")" << dendl;
3131 int r = cmp(hoid, q->second.begin);
3132 if (r == 0 || (r > 0 && hoid < q->second.end)) {
3133 ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
3134 << " id " << q->second.id << " on " << hoid
3135 << ", queuing " << op << " tid " << op->tid << dendl;
3136 return;
3137 }
3138 }
3139 }
3140
3141 if (!m) {
3142 assert(op->tid > 0);
3143 m = _prepare_osd_op(op);
3144 }
3145
3146 if (op->target.actual_pgid != m->get_spg()) {
3147 ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
3148 << m->get_spg() << " to " << op->target.actual_pgid
3149 << ", updating and reencoding" << dendl;
3150 m->set_spg(op->target.actual_pgid);
3151 m->clear_payload(); // reencode
3152 }
3153
3154 ldout(cct, 15) << "_send_op " << op->tid << " to "
3155 << op->target.actual_pgid << " on osd." << op->session->osd
3156 << dendl;
3157
3158 ConnectionRef con = op->session->con;
3159 assert(con);
3160
3161 // preallocated rx buffer?
3162 if (op->con) {
3163 ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
3164 << op->con << dendl;
3165 op->con->revoke_rx_buffer(op->tid);
3166 }
3167 if (op->outbl &&
3168 op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582
3169 op->outbl->length()) {
3170 ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
3171 << dendl;
3172 op->con = con;
3173 op->con->post_rx_buffer(op->tid, *op->outbl);
3174 }
3175
3176 op->incarnation = op->session->incarnation;
3177
3178 m->set_tid(op->tid);
3179
3180 op->session->con->send_message(m);
3181}
3182
3183int Objecter::calc_op_budget(Op *op)
3184{
3185 int op_budget = 0;
3186 for (vector<OSDOp>::iterator i = op->ops.begin();
3187 i != op->ops.end();
3188 ++i) {
3189 if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3190 op_budget += i->indata.length();
3191 } else if (ceph_osd_op_mode_read(i->op.op)) {
3192 if (ceph_osd_op_type_data(i->op.op)) {
3193 if ((int64_t)i->op.extent.length > 0)
3194 op_budget += (int64_t)i->op.extent.length;
3195 } else if (ceph_osd_op_type_attr(i->op.op)) {
3196 op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
3197 }
3198 }
3199 }
3200 return op_budget;
3201}
3202
3203void Objecter::_throttle_op(Op *op,
3204 shunique_lock& sul,
3205 int op_budget)
3206{
3207 assert(sul && sul.mutex() == &rwlock);
3208 bool locked_for_write = sul.owns_lock();
3209
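  // take the budget without blocking if we can; otherwise drop rwlock while
  // we block on the throttle (so we don't stall other threads) and then
  // reacquire it in the same mode we held it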
3210 if (!op_budget)
3211 op_budget = calc_op_budget(op);
3212 if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3213 sul.unlock();
3214 op_throttle_bytes.get(op_budget);
3215 if (locked_for_write)
3216 sul.lock();
3217 else
3218 sul.lock_shared();
3219 }
3220 if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3221 sul.unlock();
3222 op_throttle_ops.get(1);
3223 if (locked_for_write)
3224 sul.lock();
3225 else
3226 sul.lock_shared();
3227 }
3228}
3229
3230void Objecter::unregister_op(Op *op)
3231{
3232 OSDSession::unique_lock sl(op->session->lock);
3233 op->session->ops.erase(op->tid);
3234 sl.unlock();
3235 put_session(op->session);
3236 op->session = NULL;
3237
3238 inflight_ops.dec();
3239}
3240
3241/* This function DOES put the passed message before returning */
3242void Objecter::handle_osd_op_reply(MOSDOpReply *m)
3243{
3244 ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
3245
3246 // get the tid this reply is for
3247 ceph_tid_t tid = m->get_tid();
3248
3249 shunique_lock sul(rwlock, ceph::acquire_shared);
3250 if (!initialized.read()) {
3251 m->put();
3252 return;
3253 }
3254
3255 ConnectionRef con = m->get_connection();
3256 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
3257 if (!s || s->con != con) {
3258 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3259 if (s) {
3260 s->put();
3261 }
3262 m->put();
3263 return;
3264 }
3265
3266 OSDSession::unique_lock sl(s->lock);
3267
3268 map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
3269 if (iter == s->ops.end()) {
3270 ldout(cct, 7) << "handle_osd_op_reply " << tid
3271 << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
3272 " onnvram" : " ack"))
3273 << " ... stray" << dendl;
3274 sl.unlock();
3275 put_session(s);
3276 m->put();
3277 return;
3278 }
3279
3280 ldout(cct, 7) << "handle_osd_op_reply " << tid
3281 << (m->is_ondisk() ? " ondisk" :
3282 (m->is_onnvram() ? " onnvram" : " ack"))
3283 << " uv " << m->get_user_version()
3284 << " in " << m->get_pg()
3285 << " attempt " << m->get_retry_attempt()
3286 << dendl;
3287 Op *op = iter->second;
3288
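  // if retry_writes_after_first_reply is set and this is the first reply to
  // a write, discard the reply and resubmit the op instead of completing it
  // (typically used to exercise the resend handling paths)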
3289 if (retry_writes_after_first_reply && op->attempts == 1 &&
3290 (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
3291 ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
3292 if (op->onfinish) {
3293 num_in_flight.dec();
3294 }
3295 _session_op_remove(s, op);
3296 sl.unlock();
3297 put_session(s);
3298
3299 _op_submit(op, sul, NULL);
3300 m->put();
3301 return;
3302 }
3303
3304 if (m->get_retry_attempt() >= 0) {
3305 if (m->get_retry_attempt() != (op->attempts - 1)) {
3306 ldout(cct, 7) << " ignoring reply from attempt "
3307 << m->get_retry_attempt()
3308 << " from " << m->get_source_inst()
3309 << "; last attempt " << (op->attempts - 1) << " sent to "
3310 << op->session->con->get_peer_addr() << dendl;
3311 m->put();
3312 sl.unlock();
3313 put_session(s);
3314 return;
3315 }
3316 } else {
3317 // we don't know the request attempt because the server is old, so
3318 // just accept this one. we may do ACK callbacks we shouldn't
3319 // have, but that is better than doing callbacks out of order.
3320 }
3321
3322 Context *onfinish = 0;
3323
3324 int rc = m->get_result();
3325
3326 if (m->is_redirect_reply()) {
3327 ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
3328 if (op->onfinish)
3329 num_in_flight.dec();
3330 _session_op_remove(s, op);
3331 sl.unlock();
3332 put_session(s);
3333
3334 // FIXME: two redirects could race and reorder
3335
3336 op->tid = 0;
3337 m->get_redirect().combine_with_locator(op->target.target_oloc,
3338 op->target.target_oid.name);
3339 op->target.flags |= CEPH_OSD_FLAG_REDIRECTED;
3340 _op_submit(op, sul, NULL);
3341 m->put();
3342 return;
3343 }
3344
3345 if (rc == -EAGAIN) {
3346 ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
3347
3348 // new tid
3349 s->ops.erase(op->tid);
3350 op->tid = last_tid.inc();
3351
3352 _send_op(op);
3353 sl.unlock();
3354 put_session(s);
3355 m->put();
3356 return;
3357 }
3358
3359 sul.unlock();
3360
3361 if (op->objver)
3362 *op->objver = m->get_user_version();
3363 if (op->reply_epoch)
3364 *op->reply_epoch = m->get_map_epoch();
3365 if (op->data_offset)
3366 *op->data_offset = m->get_header().data_off;
3367
3368 // got data?
3369 if (op->outbl) {
3370 if (op->con)
3371 op->con->revoke_rx_buffer(op->tid);
3372 m->claim_data(*op->outbl);
3373 op->outbl = 0;
3374 }
3375
3376 // per-op result demuxing
3377 vector<OSDOp> out_ops;
3378 m->claim_ops(out_ops);
3379
3380 if (out_ops.size() != op->ops.size())
3381 ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
3382 << " != request ops " << op->ops
3383 << " from " << m->get_source_inst() << dendl;
3384
3385 vector<bufferlist*>::iterator pb = op->out_bl.begin();
3386 vector<int*>::iterator pr = op->out_rval.begin();
3387 vector<Context*>::iterator ph = op->out_handler.begin();
3388 assert(op->out_bl.size() == op->out_rval.size());
3389 assert(op->out_bl.size() == op->out_handler.size());
3390 vector<OSDOp>::iterator p = out_ops.begin();
3391 for (unsigned i = 0;
3392 p != out_ops.end() && pb != op->out_bl.end();
3393 ++i, ++p, ++pb, ++pr, ++ph) {
3394 ldout(cct, 10) << " op " << i << " rval " << p->rval
3395 << " len " << p->outdata.length() << dendl;
3396 if (*pb)
3397 **pb = p->outdata;
3398 // set rval before running handlers so that handlers
3399 // can change it if e.g. decoding fails
3400 if (*pr)
3401 **pr = ceph_to_host_errno(p->rval);
3402 if (*ph) {
3403 ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
3404 (*ph)->complete(ceph_to_host_errno(p->rval));
3405 *ph = NULL;
3406 }
3407 }
3408
3409 // NOTE: since we only ever request ONDISK, we assume we will only
3410 // ever get back one (type of) ack.
3411
3412 if (op->onfinish) {
3413 num_in_flight.dec();
3414 onfinish = op->onfinish;
3415 op->onfinish = NULL;
3416 }
3417 logger->inc(l_osdc_op_reply);
3418
3419 /* get it before we call _finish_op() */
3420 auto completion_lock = s->get_lock(op->target.base_oid);
3421
3422 ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
3423 _finish_op(op, 0);
3424
3425 ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl;
3426
3427 // serialize completions
3428 if (completion_lock.mutex()) {
3429 completion_lock.lock();
3430 }
3431 sl.unlock();
3432
3433 // do callbacks
3434 if (onfinish) {
3435 onfinish->complete(rc);
3436 }
3437 if (completion_lock.mutex()) {
3438 completion_lock.unlock();
3439 }
3440
3441 m->put();
3442 put_session(s);
3443}
3444
3445void Objecter::handle_osd_backoff(MOSDBackoff *m)
3446{
3447 ldout(cct, 10) << __func__ << " " << *m << dendl;
3448 shunique_lock sul(rwlock, ceph::acquire_shared);
3449 if (!initialized.read()) {
3450 m->put();
3451 return;
3452 }
3453
3454 ConnectionRef con = m->get_connection();
3455 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
3456 if (!s || s->con != con) {
3457 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3458 if (s)
3459 s->put();
3460 m->put();
3461 return;
3462 }
3463
3464 get_session(s);
3465 s->put(); // from get_priv() above
3466
3467 OSDSession::unique_lock sl(s->lock);
3468
3469 switch (m->op) {
3470 case CEPH_OSD_BACKOFF_OP_BLOCK:
3471 {
3472 // register
3473 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3474 s->backoffs_by_id.insert(make_pair(m->id, &b));
3475 b.pgid = m->pgid;
3476 b.id = m->id;
3477 b.begin = m->begin;
3478 b.end = m->end;
3479
3480 // ack with original backoff's epoch so that the osd can discard this if
3481 // there was a pg split.
3482 Message *r = new MOSDBackoff(m->pgid,
3483 m->map_epoch,
3484 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3485 m->id, m->begin, m->end);
3486 // this priority must match the MOSDOps from _prepare_osd_op
3487 r->set_priority(cct->_conf->osd_client_op_priority);
3488 con->send_message(r);
3489 }
3490 break;
3491
3492 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3493 {
3494 auto p = s->backoffs_by_id.find(m->id);
3495 if (p != s->backoffs_by_id.end()) {
3496 OSDBackoff *b = p->second;
3497 if (b->begin != m->begin &&
3498 b->end != m->end) {
3499 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3500 << " unblock on ["
3501 << m->begin << "," << m->end << ") but backoff is ["
3502 << b->begin << "," << b->end << ")" << dendl;
3503 // hrmpf, unblock it anyway.
3504 }
3505 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3506 << " id " << b->id
3507 << " [" << b->begin << "," << b->end
3508 << ")" << dendl;
3509 auto spgp = s->backoffs.find(b->pgid);
3510 assert(spgp != s->backoffs.end());
3511 spgp->second.erase(b->begin);
3512 if (spgp->second.empty()) {
3513 s->backoffs.erase(spgp);
3514 }
3515 s->backoffs_by_id.erase(p);
3516
3517 // check for any ops to resend
3518 for (auto& q : s->ops) {
3519 if (q.second->target.actual_pgid == m->pgid) {
3520 int r = q.second->target.contained_by(m->begin, m->end);
3521 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3522 << q.second->target.get_hobj() << dendl;
3523 if (r) {
3524 _send_op(q.second);
3525 }
3526 }
3527 }
3528 } else {
3529 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3530 << " unblock on ["
3531 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3532 }
3533 }
3534 break;
3535
3536 default:
3537 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3538 }
3539
3540 sul.unlock();
3541 sl.unlock();
3542
3543 m->put();
3544 put_session(s);
3545}
3546
3547uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3548 uint32_t pos)
3549{
3550 shared_lock rl(rwlock);
3551 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3552 pos, list_context->pool_id, string());
3553 ldout(cct, 10) << __func__ << " " << list_context
3554 << " pos " << pos << " -> " << list_context->pos << dendl;
3555 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3556 list_context->current_pg = actual.ps();
3557 list_context->at_end_of_pool = false;
3558 return pos;
3559}
3560
3561uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3562 const hobject_t& cursor)
3563{
3564 shared_lock rl(rwlock);
3565 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3566 list_context->pos = cursor;
3567 list_context->at_end_of_pool = false;
3568 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3569 list_context->current_pg = actual.ps();
3570 list_context->sort_bitwise = true;
3571 return list_context->current_pg;
3572}
3573
3574void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3575 hobject_t *cursor)
3576{
3577 shared_lock rl(rwlock);
3578 if (list_context->list.empty()) {
3579 *cursor = list_context->pos;
3580 } else {
3581 const librados::ListObjectImpl& entry = list_context->list.front();
3582 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3583 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3584 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3585 }
3586}
3587
3588void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3589{
3590 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3591 << " pool_snap_seq " << list_context->pool_snap_seq
3592 << " max_entries " << list_context->max_entries
3593 << " list_context " << list_context
3594 << " onfinish " << onfinish
3595 << " current_pg " << list_context->current_pg
3596 << " pos " << list_context->pos << dendl;
3597
3598 shared_lock rl(rwlock);
3599 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3600 if (!pool) { // pool is gone
3601 rl.unlock();
3602 put_nlist_context_budget(list_context);
3603 onfinish->complete(-ENOENT);
3604 return;
3605 }
3606 int pg_num = pool->get_pg_num();
3607 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3608
3609 if (list_context->pos.is_min()) {
3610 list_context->starting_pg_num = 0;
3611 list_context->sort_bitwise = sort_bitwise;
3612 list_context->starting_pg_num = pg_num;
3613 }
3614 if (list_context->sort_bitwise != sort_bitwise) {
3615 list_context->pos = hobject_t(
3616 object_t(), string(), CEPH_NOSNAP,
3617 list_context->current_pg, list_context->pool_id, string());
3618 list_context->sort_bitwise = sort_bitwise;
3619 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3620 << list_context->pos << dendl;
3621 }
3622 if (list_context->starting_pg_num != pg_num) {
3623 if (!sort_bitwise) {
3624 // start reading from the beginning; the pgs have changed
3625 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3626 list_context->pos = collection_list_handle_t();
3627 }
3628 list_context->starting_pg_num = pg_num;
3629 }
3630
3631 if (list_context->pos.is_max()) {
3632 ldout(cct, 20) << __func__ << " end of pool, list "
3633 << list_context->list << dendl;
3634 if (list_context->list.empty()) {
3635 list_context->at_end_of_pool = true;
3636 }
3637 // release the listing context's budget once all
3638 // OPs (in the session) are finished
3639 put_nlist_context_budget(list_context);
3640 onfinish->complete(0);
3641 return;
3642 }
3643
3644 ObjectOperation op;
3645 op.pg_nls(list_context->max_entries, list_context->filter,
3646 list_context->pos, osdmap->get_epoch());
3647 list_context->bl.clear();
3648 C_NList *onack = new C_NList(list_context, onfinish, this);
3649 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3650
3651 // note current_pg in case we don't have (or lose) SORTBITWISE
3652 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3653 rl.unlock();
3654
3655 pg_read(list_context->current_pg, oloc, op,
3656 &list_context->bl, 0, onack, &onack->epoch,
3657 &list_context->ctx_budget);
3658}
3659
3660void Objecter::_nlist_reply(NListContext *list_context, int r,
3661 Context *final_finish, epoch_t reply_epoch)
3662{
3663 ldout(cct, 10) << __func__ << " " << list_context << dendl;
3664
3665 bufferlist::iterator iter = list_context->bl.begin();
3666 pg_nls_response_t response;
3667 bufferlist extra_info;
3668 ::decode(response, iter);
3669 if (!iter.end()) {
3670 ::decode(extra_info, iter);
3671 }
3672
3673 // if the osd returns 1 (newer code), or the handle is MAX, it means
3674 // we hit the end of the pg.
3675 if ((response.handle.is_max() || r == 1) &&
3676 !list_context->sort_bitwise) {
3677 // legacy OSD and !sortbitwise, figure out the next PG on our own
3678 ++list_context->current_pg;
3679 if (list_context->current_pg == list_context->starting_pg_num) {
3680 // end of pool
3681 list_context->pos = hobject_t::get_max();
3682 } else {
3683 // next pg
3684 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3685 list_context->current_pg,
3686 list_context->pool_id, string());
3687 }
3688 } else {
3689 list_context->pos = response.handle;
3690 }
3691
3692 int response_size = response.entries.size();
3693 ldout(cct, 20) << " response.entries.size " << response_size
3694 << ", response.entries " << response.entries
3695 << ", handle " << response.handle
3696 << ", tentative new pos " << list_context->pos << dendl;
3697 list_context->extra_info.append(extra_info);
3698 if (response_size) {
3699 list_context->list.splice(list_context->list.end(), response.entries);
3700 }
3701
3702 if (list_context->list.size() >= list_context->max_entries) {
3703 ldout(cct, 20) << " hit max, returning results so far, "
3704 << list_context->list << dendl;
3705 // release the listing context's budget once all
3706 // OPs (in the session) are finished
3707 put_nlist_context_budget(list_context);
3708 final_finish->complete(0);
3709 return;
3710 }
3711
3712 // continue!
3713 list_nobjects(list_context, final_finish);
3714}
3715
3716void Objecter::put_nlist_context_budget(NListContext *list_context)
3717{
3718 if (list_context->ctx_budget >= 0) {
3719 ldout(cct, 10) << " release listing context's budget " <<
3720 list_context->ctx_budget << dendl;
3721 put_op_budget_bytes(list_context->ctx_budget);
3722 list_context->ctx_budget = -1;
3723 }
3724}
3725
3726// snapshots
3727
3728int Objecter::create_pool_snap(int64_t pool, string& snap_name,
3729 Context *onfinish)
3730{
3731 unique_lock wl(rwlock);
3732 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3733 << snap_name << dendl;
3734
3735 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3736 if (!p)
3737 return -EINVAL;
3738 if (p->snap_exists(snap_name.c_str()))
3739 return -EEXIST;
3740
3741 PoolOp *op = new PoolOp;
3742 if (!op)
3743 return -ENOMEM;
3744 op->tid = last_tid.inc();
3745 op->pool = pool;
3746 op->name = snap_name;
3747 op->onfinish = onfinish;
3748 op->pool_op = POOL_OP_CREATE_SNAP;
3749 pool_ops[op->tid] = op;
3750
3751 pool_op_submit(op);
3752
3753 return 0;
3754}
3755
3756struct C_SelfmanagedSnap : public Context {
3757 bufferlist bl;
3758 snapid_t *psnapid;
3759 Context *fin;
3760 C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
3761 void finish(int r) override {
3762 if (r == 0) {
3763 bufferlist::iterator p = bl.begin();
3764 ::decode(*psnapid, p);
3765 }
3766 fin->complete(r);
3767 }
3768};
3769
3770int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
3771 Context *onfinish)
3772{
3773 unique_lock wl(rwlock);
3774 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
3775 PoolOp *op = new PoolOp;
3776 if (!op) return -ENOMEM;
3777 op->tid = last_tid.inc();
3778 op->pool = pool;
3779 C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
3780 op->onfinish = fin;
3781 op->blp = &fin->bl;
3782 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3783 pool_ops[op->tid] = op;
3784
3785 pool_op_submit(op);
3786 return 0;
3787}
3788
3789int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
3790 Context *onfinish)
3791{
3792 unique_lock wl(rwlock);
3793 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3794 << snap_name << dendl;
3795
3796 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3797 if (!p)
3798 return -EINVAL;
3799 if (!p->snap_exists(snap_name.c_str()))
3800 return -ENOENT;
3801
3802 PoolOp *op = new PoolOp;
3803 if (!op)
3804 return -ENOMEM;
3805 op->tid = last_tid.inc();
3806 op->pool = pool;
3807 op->name = snap_name;
3808 op->onfinish = onfinish;
3809 op->pool_op = POOL_OP_DELETE_SNAP;
3810 pool_ops[op->tid] = op;
3811
3812 pool_op_submit(op);
3813
3814 return 0;
3815}
3816
3817int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3818 Context *onfinish)
3819{
3820 unique_lock wl(rwlock);
3821 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3822 << snap << dendl;
3823 PoolOp *op = new PoolOp;
3824 if (!op) return -ENOMEM;
3825 op->tid = last_tid.inc();
3826 op->pool = pool;
3827 op->onfinish = onfinish;
3828 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3829 op->snapid = snap;
3830 pool_ops[op->tid] = op;
3831
3832 pool_op_submit(op);
3833
3834 return 0;
3835}
3836
3837int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
3838 int crush_rule)
3839{
3840 unique_lock wl(rwlock);
3841 ldout(cct, 10) << "create_pool name=" << name << dendl;
3842
3843 if (osdmap->lookup_pg_pool_name(name) >= 0)
3844 return -EEXIST;
3845
3846 PoolOp *op = new PoolOp;
3847 if (!op)
3848 return -ENOMEM;
3849 op->tid = last_tid.inc();
3850 op->pool = 0;
3851 op->name = name;
3852 op->onfinish = onfinish;
3853 op->pool_op = POOL_OP_CREATE;
3854 pool_ops[op->tid] = op;
3855 op->auid = auid;
3856 op->crush_rule = crush_rule;
3857
3858 pool_op_submit(op);
3859
3860 return 0;
3861}
3862
3863int Objecter::delete_pool(int64_t pool, Context *onfinish)
3864{
3865 unique_lock wl(rwlock);
3866 ldout(cct, 10) << "delete_pool " << pool << dendl;
3867
3868 if (!osdmap->have_pg_pool(pool))
3869 return -ENOENT;
3870
3871 _do_delete_pool(pool, onfinish);
3872 return 0;
3873}
3874
3875int Objecter::delete_pool(const string &pool_name, Context *onfinish)
3876{
3877 unique_lock wl(rwlock);
3878 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
3879
3880 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
3881 if (pool < 0)
3882 return pool;
3883
3884 _do_delete_pool(pool, onfinish);
3885 return 0;
3886}
3887
3888void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
3889{
3890 PoolOp *op = new PoolOp;
3891 op->tid = last_tid.inc();
3892 op->pool = pool;
3893 op->name = "delete";
3894 op->onfinish = onfinish;
3895 op->pool_op = POOL_OP_DELETE;
3896 pool_ops[op->tid] = op;
3897 pool_op_submit(op);
3898}
3899
3900/**
3901 * change the auid owner of a pool by contacting the monitor.
3902 * This requires the current connection to have write permissions
3903 * on both the pool's current auid and the new (parameter) auid.
3904 * Uses the standard Context callback when done.
3905 */
3906int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
3907{
3908 unique_lock wl(rwlock);
3909 ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
3910 PoolOp *op = new PoolOp;
3911 if (!op) return -ENOMEM;
3912 op->tid = last_tid.inc();
3913 op->pool = pool;
3914 op->name = "change_pool_auid";
3915 op->onfinish = onfinish;
3916 op->pool_op = POOL_OP_AUID_CHANGE;
3917 op->auid = auid;
3918 pool_ops[op->tid] = op;
3919
3920 logger->set(l_osdc_poolop_active, pool_ops.size());
3921
3922 pool_op_submit(op);
3923 return 0;
3924}
3925
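// Submit a PoolOp to the monitor. If a mon_timeout is configured, a timer
// event is armed first so the op is cancelled with -ETIMEDOUT when no reply
// arrives in time; _finish_pool_op() cancels that event on any other outcome.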
3926void Objecter::pool_op_submit(PoolOp *op)
3927{
3928 // rwlock is locked
3929 if (mon_timeout > timespan(0)) {
3930 op->ontimeout = timer.add_event(mon_timeout,
3931 [this, op]() {
3932 pool_op_cancel(op->tid, -ETIMEDOUT); });
3933 }
3934 _pool_op_submit(op);
3935}
3936
3937void Objecter::_pool_op_submit(PoolOp *op)
3938{
3939 // rwlock is locked unique
3940
3941 ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
3942 MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
3943 op->name, op->pool_op,
3944 op->auid, last_seen_osdmap_version);
3945 if (op->snapid) m->snapid = op->snapid;
3946 if (op->crush_rule) m->crush_rule = op->crush_rule;
3947 monc->send_mon_message(m);
3948 op->last_submit = ceph::mono_clock::now();
3949
3950 logger->inc(l_osdc_poolop_send);
3951}
3952
3953/**
3954 * Handle a reply to a PoolOp message. Check that we sent the message
3955 * and give the caller responsibility for the returned bufferlist.
3956 * Then either call the finisher or stash the PoolOp, depending on whether
3957 * we have a new enough map.
3958 * Lastly, clean up the message and PoolOp.
3959 */
3960void Objecter::handle_pool_op_reply(MPoolOpReply *m)
3961{
3962 FUNCTRACE();
3963 shunique_lock sul(rwlock, acquire_shared);
3964 if (!initialized.read()) {
3965 sul.unlock();
3966 m->put();
3967 return;
3968 }
3969
3970 ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
3971 ceph_tid_t tid = m->get_tid();
3972 map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
3973 if (iter != pool_ops.end()) {
3974 PoolOp *op = iter->second;
3975 ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
3976 << ceph_pool_op_name(op->pool_op) << dendl;
3977 if (op->blp)
3978 op->blp->claim(m->response_data);
3979 if (m->version > last_seen_osdmap_version)
3980 last_seen_osdmap_version = m->version;
3981 if (osdmap->get_epoch() < m->epoch) {
3982 sul.unlock();
3983 sul.lock();
3984 // recheck op existence since we have let go of rwlock
3985 // (for promotion) above.
3986 iter = pool_ops.find(tid);
3987 if (iter == pool_ops.end())
3988 goto done; // op is gone.
3989 if (osdmap->get_epoch() < m->epoch) {
3990 ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
3991 << " before calling back" << dendl;
3992 _wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
3993 } else {
3994 // map epoch changed, probably because a MOSDMap message
3995 // sneaked in. Do caller-specified callback now or else
3996 // we lose it forever.
3997 assert(op->onfinish);
3998 op->onfinish->complete(m->replyCode);
3999 }
4000 } else {
4001 assert(op->onfinish);
4002 op->onfinish->complete(m->replyCode);
4003 }
4004 op->onfinish = NULL;
4005 if (!sul.owns_lock()) {
4006 sul.unlock();
4007 sul.lock();
4008 }
4009 iter = pool_ops.find(tid);
4010 if (iter != pool_ops.end()) {
4011 _finish_pool_op(op, 0);
4012 }
4013 } else {
4014 ldout(cct, 10) << "unknown request " << tid << dendl;
4015 }
4016
4017done:
4018 // Not strictly necessary, since we'll release it on return.
4019 sul.unlock();
4020
4021 ldout(cct, 10) << "done" << dendl;
4022 m->put();
4023}
4024
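// Cancel an outstanding PoolOp (used by the mon_timeout timer above, which
// passes -ETIMEDOUT). The caller's onfinish is completed with r and the op is
// torn down; -ENOENT is returned if the tid is no longer pending.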
4025int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4026{
4027 assert(initialized.read());
4028
4029 unique_lock wl(rwlock);
4030
4031 map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
4032 if (it == pool_ops.end()) {
4033 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4034 return -ENOENT;
4035 }
4036
4037 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4038
4039 PoolOp *op = it->second;
4040 if (op->onfinish)
4041 op->onfinish->complete(r);
4042
4043 _finish_pool_op(op, r);
4044 return 0;
4045}
4046
4047void Objecter::_finish_pool_op(PoolOp *op, int r)
4048{
4049 // rwlock is locked unique
4050 pool_ops.erase(op->tid);
4051 logger->set(l_osdc_poolop_active, pool_ops.size());
4052
4053 if (op->ontimeout && r != -ETIMEDOUT) {
4054 timer.cancel_event(op->ontimeout);
4055 }
4056
4057 delete op;
4058}
4059
4060// pool stats
4061
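// Ask the monitor for per-pool statistics. The request is tracked as a
// PoolStatOp and sent as an MGetPoolStats; the caller's result map is filled
// in and onfinish completed when handle_get_pool_stats_reply() sees the
// matching tid.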
4062void Objecter::get_pool_stats(list<string>& pools,
4063 map<string,pool_stat_t> *result,
4064 Context *onfinish)
4065{
4066 ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4067
4068 PoolStatOp *op = new PoolStatOp;
4069 op->tid = last_tid.inc();
4070 op->pools = pools;
4071 op->pool_stats = result;
4072 op->onfinish = onfinish;
4073 if (mon_timeout > timespan(0)) {
4074 op->ontimeout = timer.add_event(mon_timeout,
4075 [this, op]() {
4076 pool_stat_op_cancel(op->tid,
4077 -ETIMEDOUT); });
4078 } else {
4079 op->ontimeout = 0;
4080 }
4081
4082 unique_lock wl(rwlock);
4083
4084 poolstat_ops[op->tid] = op;
4085
4086 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4087
4088 _poolstat_submit(op);
4089}
4090
4091void Objecter::_poolstat_submit(PoolStatOp *op)
4092{
4093 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4094 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4095 op->pools,
4096 last_seen_pgmap_version));
4097 op->last_submit = ceph::mono_clock::now();
4098
4099 logger->inc(l_osdc_poolstat_send);
4100}
4101
4102void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4103{
4104 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4105 ceph_tid_t tid = m->get_tid();
4106
4107 unique_lock wl(rwlock);
4108 if (!initialized.read()) {
4109 m->put();
4110 return;
4111 }
4112
4113 map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
4114 if (iter != poolstat_ops.end()) {
4115 PoolStatOp *op = poolstat_ops[tid];
4116 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4117 *op->pool_stats = m->pool_stats;
4118 if (m->version > last_seen_pgmap_version) {
4119 last_seen_pgmap_version = m->version;
4120 }
4121 op->onfinish->complete(0);
4122 _finish_pool_stat_op(op, 0);
4123 } else {
4124 ldout(cct, 10) << "unknown request " << tid << dendl;
4125 }
4126 ldout(cct, 10) << "done" << dendl;
4127 m->put();
4128}
4129
4130int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4131{
4132 assert(initialized.read());
4133
4134 unique_lock wl(rwlock);
4135
4136 map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
4137 if (it == poolstat_ops.end()) {
4138 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4139 return -ENOENT;
4140 }
4141
4142 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4143
4144 PoolStatOp *op = it->second;
4145 if (op->onfinish)
4146 op->onfinish->complete(r);
4147 _finish_pool_stat_op(op, r);
4148 return 0;
4149}
4150
4151void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4152{
4153 // rwlock is locked unique
4154
4155 poolstat_ops.erase(op->tid);
4156 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4157
4158 if (op->ontimeout && r != -ETIMEDOUT)
4159 timer.cancel_event(op->ontimeout);
4160
4161 delete op;
4162}
4163
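// Ask the monitor for cluster-wide usage (MStatfs). The ceph_statfs result is
// written through op->stats when the MStatfsReply with the matching tid
// arrives, then onfinish is completed.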
4164void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
4165{
4166 ldout(cct, 10) << "get_fs_stats" << dendl;
4167 unique_lock l(rwlock);
4168
4169 StatfsOp *op = new StatfsOp;
4170 op->tid = last_tid.inc();
4171 op->stats = &result;
4172 op->onfinish = onfinish;
4173 if (mon_timeout > timespan(0)) {
4174 op->ontimeout = timer.add_event(mon_timeout,
4175 [this, op]() {
4176 statfs_op_cancel(op->tid,
4177 -ETIMEDOUT); });
4178 } else {
4179 op->ontimeout = 0;
4180 }
4181 statfs_ops[op->tid] = op;
4182
4183 logger->set(l_osdc_statfs_active, statfs_ops.size());
4184
4185 _fs_stats_submit(op);
4186}
4187
4188void Objecter::_fs_stats_submit(StatfsOp *op)
4189{
4190 // rwlock is locked unique
4191
4192  ldout(cct, 10) << "fs_stats_submit " << op->tid << dendl;
4193 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
4194 last_seen_pgmap_version));
4195 op->last_submit = ceph::mono_clock::now();
4196
4197 logger->inc(l_osdc_statfs_send);
4198}
4199
4200void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4201{
4202 unique_lock wl(rwlock);
4203 if (!initialized.read()) {
4204 m->put();
4205 return;
4206 }
4207
4208 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4209 ceph_tid_t tid = m->get_tid();
4210
4211 if (statfs_ops.count(tid)) {
4212 StatfsOp *op = statfs_ops[tid];
4213 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4214 *(op->stats) = m->h.st;
4215 if (m->h.version > last_seen_pgmap_version)
4216 last_seen_pgmap_version = m->h.version;
4217 op->onfinish->complete(0);
4218 _finish_statfs_op(op, 0);
4219 } else {
4220 ldout(cct, 10) << "unknown request " << tid << dendl;
4221 }
4222 m->put();
4223 ldout(cct, 10) << "done" << dendl;
4224}
4225
4226int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4227{
4228 assert(initialized.read());
4229
4230 unique_lock wl(rwlock);
4231
4232 map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
4233 if (it == statfs_ops.end()) {
4234 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4235 return -ENOENT;
4236 }
4237
4238 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4239
4240 StatfsOp *op = it->second;
4241 if (op->onfinish)
4242 op->onfinish->complete(r);
4243 _finish_statfs_op(op, r);
4244 return 0;
4245}
4246
4247void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4248{
4249 // rwlock is locked unique
4250
4251 statfs_ops.erase(op->tid);
4252 logger->set(l_osdc_statfs_active, statfs_ops.size());
4253
4254 if (op->ontimeout && r != -ETIMEDOUT)
4255 timer.cancel_event(op->ontimeout);
4256
4257 delete op;
4258}
4259
4260// scatter/gather
4261
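// Reassemble a striped (scatter/gather) read: when the logical extent mapped
// to more than one object, the per-object buffers are stitched back into the
// caller's bufferlist via Striper::StripedReadResult; a single-extent read
// just claims the lone buffer. onfinish is completed with the byte count.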
4262void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
4263 vector<bufferlist>& resultbl,
4264 bufferlist *bl, Context *onfinish)
4265{
4266 // all done
4267 ldout(cct, 15) << "_sg_read_finish" << dendl;
4268
4269 if (extents.size() > 1) {
4270 Striper::StripedReadResult r;
4271 vector<bufferlist>::iterator bit = resultbl.begin();
4272 for (vector<ObjectExtent>::iterator eit = extents.begin();
4273 eit != extents.end();
4274 ++eit, ++bit) {
4275 r.add_partial_result(cct, *bit, eit->buffer_extents);
4276 }
4277 bl->clear();
4278 r.assemble_result(cct, *bl, false);
4279 } else {
4280 ldout(cct, 15) << " only one frag" << dendl;
4281 bl->claim(resultbl[0]);
4282 }
4283
4284 // done
4285 uint64_t bytes_read = bl->length();
4286 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4287
4288 if (onfinish) {
4289 onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result());
4290 }
4291}
4292
4293
4294void Objecter::ms_handle_connect(Connection *con)
4295{
4296 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
4297 if (!initialized.read())
4298 return;
4299
4300 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4301 resend_mon_ops();
4302}
4303
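// Messenger reset callback for an OSD connection: reopen the session, move
// its ops and linger ops onto the resend queues, and ask for a newer OSDMap
// so the requests can be retargeted.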
4304bool Objecter::ms_handle_reset(Connection *con)
4305{
4306 if (!initialized.read())
4307 return false;
4308 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
4309 OSDSession *session = static_cast<OSDSession*>(con->get_priv());
4310 if (session) {
4311 ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
4312 << " osd." << session->osd << dendl;
4313 unique_lock wl(rwlock);
4314 if (!initialized.read()) {
4315 wl.unlock();
4316 return false;
4317 }
4318 map<uint64_t, LingerOp *> lresend;
4319 OSDSession::unique_lock sl(session->lock);
4320 _reopen_session(session);
4321 _kick_requests(session, lresend);
4322 sl.unlock();
4323 _linger_ops_resend(lresend, wl);
4324 wl.unlock();
4325 maybe_request_map();
4326 session->put();
4327 }
4328 return true;
4329 }
4330 return false;
4331}
4332
4333void Objecter::ms_handle_remote_reset(Connection *con)
4334{
4335 /*
4336 * treat these the same.
4337 */
4338 ms_handle_reset(con);
4339}
4340
4341bool Objecter::ms_handle_refused(Connection *con)
4342{
4343 // just log for now
4344 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4345 int osd = osdmap->identify_osd(con->get_peer_addr());
4346 if (osd >= 0) {
4347 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4348 }
4349 }
4350 return false;
4351}
4352
4353bool Objecter::ms_get_authorizer(int dest_type,
4354 AuthAuthorizer **authorizer,
4355 bool force_new)
4356{
4357 if (!initialized.read())
4358 return false;
4359 if (dest_type == CEPH_ENTITY_TYPE_MON)
4360 return true;
4361 *authorizer = monc->build_authorizer(dest_type);
4362 return *authorizer != NULL;
4363}
4364
4365void Objecter::op_target_t::dump(Formatter *f) const
4366{
4367 f->dump_stream("pg") << pgid;
4368 f->dump_int("osd", osd);
4369 f->dump_stream("object_id") << base_oid;
4370 f->dump_stream("object_locator") << base_oloc;
4371 f->dump_stream("target_object_id") << target_oid;
4372 f->dump_stream("target_object_locator") << target_oloc;
4373 f->dump_int("paused", (int)paused);
4374 f->dump_int("used_replica", (int)used_replica);
4375 f->dump_int("precalc_pgid", (int)precalc_pgid);
4376}
4377
4378void Objecter::_dump_active(OSDSession *s)
4379{
4380 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
4381 p != s->ops.end();
4382 ++p) {
4383 Op *op = p->second;
4384 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4385 << "\tosd." << (op->session ? op->session->osd : -1)
4386 << "\t" << op->target.base_oid
4387 << "\t" << op->ops << dendl;
4388 }
4389}
4390
4391void Objecter::_dump_active()
4392{
4393 ldout(cct, 20) << "dump_active .. " << num_homeless_ops.read() << " homeless"
4394 << dendl;
4395 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
4396 siter != osd_sessions.end(); ++siter) {
4397 OSDSession *s = siter->second;
4398 OSDSession::shared_lock sl(s->lock);
4399 _dump_active(s);
4400 sl.unlock();
4401 }
4402 _dump_active(homeless_session);
4403}
4404
4405void Objecter::dump_active()
4406{
4407 shared_lock rl(rwlock);
4408 _dump_active();
4409 rl.unlock();
4410}
4411
4412void Objecter::dump_requests(Formatter *fmt)
4413{
4414 // Read-lock on Objecter held here
4415 fmt->open_object_section("requests");
4416 dump_ops(fmt);
4417 dump_linger_ops(fmt);
4418 dump_pool_ops(fmt);
4419 dump_pool_stat_ops(fmt);
4420 dump_statfs_ops(fmt);
4421 dump_command_ops(fmt);
4422 fmt->close_section(); // requests object
4423}
4424
4425void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4426{
4427 for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
4428 p != s->ops.end();
4429 ++p) {
4430 Op *op = p->second;
4431 fmt->open_object_section("op");
4432 fmt->dump_unsigned("tid", op->tid);
4433 op->target.dump(fmt);
4434 fmt->dump_stream("last_sent") << op->stamp;
4435 fmt->dump_int("attempts", op->attempts);
4436 fmt->dump_stream("snapid") << op->snapid;
4437 fmt->dump_stream("snap_context") << op->snapc;
4438 fmt->dump_stream("mtime") << op->mtime;
4439
4440 fmt->open_array_section("osd_ops");
4441 for (vector<OSDOp>::const_iterator it = op->ops.begin();
4442 it != op->ops.end();
4443 ++it) {
4444 fmt->dump_stream("osd_op") << *it;
4445 }
4446 fmt->close_section(); // osd_ops array
4447
4448 fmt->close_section(); // op object
4449 }
4450}
4451
4452void Objecter::dump_ops(Formatter *fmt)
4453{
4454 // Read-lock on Objecter held
4455 fmt->open_array_section("ops");
4456 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4457 siter != osd_sessions.end(); ++siter) {
4458 OSDSession *s = siter->second;
4459 OSDSession::shared_lock sl(s->lock);
4460 _dump_ops(s, fmt);
4461 sl.unlock();
4462 }
4463 _dump_ops(homeless_session, fmt);
4464 fmt->close_section(); // ops array
4465}
4466
4467void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4468{
4469 for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
4470 p != s->linger_ops.end();
4471 ++p) {
4472 LingerOp *op = p->second;
4473 fmt->open_object_section("linger_op");
4474 fmt->dump_unsigned("linger_id", op->linger_id);
4475 op->target.dump(fmt);
4476 fmt->dump_stream("snapid") << op->snap;
4477 fmt->dump_stream("registered") << op->registered;
4478 fmt->close_section(); // linger_op object
4479 }
4480}
4481
4482void Objecter::dump_linger_ops(Formatter *fmt)
4483{
4484 // We have a read-lock on the objecter
4485 fmt->open_array_section("linger_ops");
4486 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4487 siter != osd_sessions.end(); ++siter) {
4488 OSDSession *s = siter->second;
4489 OSDSession::shared_lock sl(s->lock);
4490 _dump_linger_ops(s, fmt);
4491 sl.unlock();
4492 }
4493 _dump_linger_ops(homeless_session, fmt);
4494 fmt->close_section(); // linger_ops array
4495}
4496
4497void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4498{
4499 for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
4500 p != s->command_ops.end();
4501 ++p) {
4502 CommandOp *op = p->second;
4503 fmt->open_object_section("command_op");
4504 fmt->dump_unsigned("command_id", op->tid);
4505 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4506 fmt->open_array_section("command");
4507 for (vector<string>::const_iterator q = op->cmd.begin();
4508 q != op->cmd.end(); ++q)
4509 fmt->dump_string("word", *q);
4510 fmt->close_section();
4511 if (op->target_osd >= 0)
4512 fmt->dump_int("target_osd", op->target_osd);
4513 else
4514 fmt->dump_stream("target_pg") << op->target_pg;
4515 fmt->close_section(); // command_op object
4516 }
4517}
4518
4519void Objecter::dump_command_ops(Formatter *fmt)
4520{
4521 // We have a read-lock on the Objecter here
4522 fmt->open_array_section("command_ops");
4523 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4524 siter != osd_sessions.end(); ++siter) {
4525 OSDSession *s = siter->second;
4526 OSDSession::shared_lock sl(s->lock);
4527 _dump_command_ops(s, fmt);
4528 sl.unlock();
4529 }
4530 _dump_command_ops(homeless_session, fmt);
4531 fmt->close_section(); // command_ops array
4532}
4533
4534void Objecter::dump_pool_ops(Formatter *fmt) const
4535{
4536 fmt->open_array_section("pool_ops");
4537 for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
4538 p != pool_ops.end();
4539 ++p) {
4540 PoolOp *op = p->second;
4541 fmt->open_object_section("pool_op");
4542 fmt->dump_unsigned("tid", op->tid);
4543 fmt->dump_int("pool", op->pool);
4544 fmt->dump_string("name", op->name);
4545 fmt->dump_int("operation_type", op->pool_op);
4546 fmt->dump_unsigned("auid", op->auid);
4547 fmt->dump_unsigned("crush_rule", op->crush_rule);
4548 fmt->dump_stream("snapid") << op->snapid;
4549 fmt->dump_stream("last_sent") << op->last_submit;
4550 fmt->close_section(); // pool_op object
4551 }
4552 fmt->close_section(); // pool_ops array
4553}
4554
4555void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4556{
4557 fmt->open_array_section("pool_stat_ops");
4558 for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
4559 p != poolstat_ops.end();
4560 ++p) {
4561 PoolStatOp *op = p->second;
4562 fmt->open_object_section("pool_stat_op");
4563 fmt->dump_unsigned("tid", op->tid);
4564 fmt->dump_stream("last_sent") << op->last_submit;
4565
4566 fmt->open_array_section("pools");
4567 for (list<string>::const_iterator it = op->pools.begin();
4568 it != op->pools.end();
4569 ++it) {
4570 fmt->dump_string("pool", *it);
4571 }
4572 fmt->close_section(); // pools array
4573
4574 fmt->close_section(); // pool_stat_op object
4575 }
4576 fmt->close_section(); // pool_stat_ops array
4577}
4578
4579void Objecter::dump_statfs_ops(Formatter *fmt) const
4580{
4581 fmt->open_array_section("statfs_ops");
4582 for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
4583 p != statfs_ops.end();
4584 ++p) {
4585 StatfsOp *op = p->second;
4586 fmt->open_object_section("statfs_op");
4587 fmt->dump_unsigned("tid", op->tid);
4588 fmt->dump_stream("last_sent") << op->last_submit;
4589 fmt->close_section(); // statfs_op object
4590 }
4591 fmt->close_section(); // statfs_ops array
4592}
4593
4594Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4595 m_objecter(objecter)
4596{
4597}
4598
4599bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap,
4600 std::string format, bufferlist& out)
4601{
4602 Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
4603 shared_lock rl(m_objecter->rwlock);
4604 m_objecter->dump_requests(f);
4605 f->flush(out);
4606 delete f;
4607 return true;
4608}
4609
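// Add or remove this client's address from the OSD blacklist by sending a
// mon command. The JSON fragments below assemble to the equivalent of:
//   {"prefix":"osd blacklist", "blacklistop":"add"|"rm", "addr":"<myaddr>"}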
4610void Objecter::blacklist_self(bool set)
4611{
4612 ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
4613
4614 vector<string> cmd;
4615 cmd.push_back("{\"prefix\":\"osd blacklist\", ");
4616 if (set)
4617 cmd.push_back("\"blacklistop\":\"add\",");
4618 else
4619 cmd.push_back("\"blacklistop\":\"rm\",");
4620 stringstream ss;
4621 ss << messenger->get_myaddr();
4622 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4623
4624 MMonCommand *m = new MMonCommand(monc->get_fsid());
4625 m->cmd = cmd;
4626
4627 monc->send_mon_message(m);
4628}
4629
4630// commands
4631
4632void Objecter::handle_command_reply(MCommandReply *m)
4633{
4634 unique_lock wl(rwlock);
4635 if (!initialized.read()) {
4636 m->put();
4637 return;
4638 }
4639
4640 ConnectionRef con = m->get_connection();
4641 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
4642 if (!s || s->con != con) {
4643 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
4644 m->put();
4645 if (s)
4646 s->put();
4647 return;
4648 }
4649
4650 OSDSession::shared_lock sl(s->lock);
4651 map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
4652 if (p == s->command_ops.end()) {
4653 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4654 << " not found" << dendl;
4655 m->put();
4656 sl.unlock();
4657 if (s)
4658 s->put();
4659 return;
4660 }
4661
4662 CommandOp *c = p->second;
4663 if (!c->session ||
4664 m->get_connection() != c->session->con) {
4665 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4666 << " got reply from wrong connection "
4667 << m->get_connection() << " " << m->get_source_inst()
4668 << dendl;
4669 m->put();
4670 sl.unlock();
4671 if (s)
4672 s->put();
4673 return;
4674 }
4675 if (c->poutbl) {
4676 c->poutbl->claim(m->get_data());
4677 }
4678
4679 sl.unlock();
4680
4681
4682 _finish_command(c, m->r, m->rs);
4683 m->put();
4684 if (s)
4685 s->put();
4686}
4687
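// Submit an OSD command (MCommand): the op is parked on the homeless session,
// a target OSD/PG is computed and a real session assigned, an optional
// osd_timeout timer is armed, and the message is sent once the target is
// known (otherwise we just request a newer map and resend later).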
4688void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
4689{
4690 shunique_lock sul(rwlock, ceph::acquire_unique);
4691
4692 ceph_tid_t tid = last_tid.inc();
4693  ldout(cct, 10) << "submit_command " << tid << " " << c->cmd << dendl;
4694 c->tid = tid;
4695
4696 {
4697 OSDSession::unique_lock hs_wl(homeless_session->lock);
4698 _session_command_op_assign(homeless_session, c);
4699 }
4700
4701 _calc_command_target(c, sul);
4702 _assign_command_session(c, sul);
4703 if (osd_timeout > timespan(0)) {
4704 c->ontimeout = timer.add_event(osd_timeout,
4705 [this, c, tid]() {
4706 command_op_cancel(c->session, tid,
4707 -ETIMEDOUT); });
4708 }
4709
4710 if (!c->session->is_homeless()) {
4711 _send_command(c);
4712 } else {
4713 _maybe_request_map();
4714 }
4715 if (c->map_check_error)
4716 _send_command_map_check(c);
4717 *ptid = tid;
4718
4719 logger->inc(l_osdc_command_active);
4720}
4721
4722int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
4723{
4724 assert(sul.owns_lock() && sul.mutex() == &rwlock);
4725
4726 c->map_check_error = 0;
4727
4728 // ignore overlays, just like we do with pg ops
4729 c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4730
4731 if (c->target_osd >= 0) {
4732 if (!osdmap->exists(c->target_osd)) {
4733 c->map_check_error = -ENOENT;
4734 c->map_check_error_str = "osd dne";
4735 c->target.osd = -1;
4736 return RECALC_OP_TARGET_OSD_DNE;
4737 }
4738 if (osdmap->is_down(c->target_osd)) {
4739 c->map_check_error = -ENXIO;
4740 c->map_check_error_str = "osd down";
4741 c->target.osd = -1;
4742 return RECALC_OP_TARGET_OSD_DOWN;
4743 }
4744 c->target.osd = c->target_osd;
4745 } else {
4746 int ret = _calc_target(&(c->target), nullptr, true);
4747 if (ret == RECALC_OP_TARGET_POOL_DNE) {
4748 c->map_check_error = -ENOENT;
4749 c->map_check_error_str = "pool dne";
4750 c->target.osd = -1;
4751 return ret;
4752 } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4753 c->map_check_error = -ENXIO;
4754 c->map_check_error_str = "osd down";
4755 c->target.osd = -1;
4756 return ret;
4757 }
4758 }
4759
4760 OSDSession *s;
4761 int r = _get_session(c->target.osd, &s, sul);
4762 assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4763
4764 if (c->session != s) {
4765 put_session(s);
4766 return RECALC_OP_TARGET_NEED_RESEND;
4767 }
4768
4769 put_session(s);
4770
4771  ldout(cct, 20) << "_calc_command_target " << c->tid << " no change, "
4772 << c->session << dendl;
4773
4774 return RECALC_OP_TARGET_NO_ACTION;
4775}
4776
4777void Objecter::_assign_command_session(CommandOp *c,
4778 shunique_lock& sul)
4779{
4780 assert(sul.owns_lock() && sul.mutex() == &rwlock);
4781
4782 OSDSession *s;
4783 int r = _get_session(c->target.osd, &s, sul);
4784 assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4785
4786 if (c->session != s) {
4787 if (c->session) {
4788 OSDSession *cs = c->session;
4789 OSDSession::unique_lock csl(cs->lock);
4790 _session_command_op_remove(c->session, c);
4791 csl.unlock();
4792 }
4793 OSDSession::unique_lock sl(s->lock);
4794 _session_command_op_assign(s, c);
4795 }
4796
4797 put_session(s);
4798}
4799
4800void Objecter::_send_command(CommandOp *c)
4801{
4802 ldout(cct, 10) << "_send_command " << c->tid << dendl;
4803 assert(c->session);
4804 assert(c->session->con);
4805 MCommand *m = new MCommand(monc->monmap.fsid);
4806 m->cmd = c->cmd;
4807 m->set_data(c->inbl);
4808 m->set_tid(c->tid);
4809 c->session->con->send_message(m);
4810 logger->inc(l_osdc_command_send);
4811}
4812
4813int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
4814{
4815 assert(initialized.read());
4816
4817 unique_lock wl(rwlock);
4818
4819 map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
4820 if (it == s->command_ops.end()) {
4821 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4822 return -ENOENT;
4823 }
4824
4825 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4826
4827 CommandOp *op = it->second;
4828 _command_cancel_map_check(op);
4829 _finish_command(op, r, "");
4830 return 0;
4831}
4832
4833void Objecter::_finish_command(CommandOp *c, int r, string rs)
4834{
4835 // rwlock is locked unique
4836
4837 ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
4838 << rs << dendl;
4839 if (c->prs)
4840 *c->prs = rs;
4841 if (c->onfinish)
4842 c->onfinish->complete(r);
4843
4844 if (c->ontimeout && r != -ETIMEDOUT)
4845 timer.cancel_event(c->ontimeout);
4846
4847 OSDSession *s = c->session;
4848 OSDSession::unique_lock sl(s->lock);
4849 _session_command_op_remove(c->session, c);
4850 sl.unlock();
4851
4852 c->put();
4853
4854 logger->dec(l_osdc_command_active);
4855}
4856
4857Objecter::OSDSession::~OSDSession()
4858{
4859 // Caller is responsible for re-assigning or
4860 // destroying any ops that were assigned to us
4861 assert(ops.empty());
4862 assert(linger_ops.empty());
4863 assert(command_ops.empty());
4864}
4865
4866Objecter::~Objecter()
4867{
4868 delete osdmap;
4869
4870 assert(homeless_session->get_nref() == 1);
4871 assert(num_homeless_ops.read() == 0);
4872 homeless_session->put();
4873
4874 assert(osd_sessions.empty());
4875 assert(poolstat_ops.empty());
4876 assert(statfs_ops.empty());
4877 assert(pool_ops.empty());
4878 assert(waiting_for_map.empty());
4879 assert(linger_ops.empty());
4880 assert(check_latest_map_lingers.empty());
4881 assert(check_latest_map_ops.empty());
4882 assert(check_latest_map_commands.empty());
4883
4884 assert(!m_request_state_hook);
4885 assert(!logger);
4886}
4887
4888/**
4889 * Wait until this OSD map epoch is received before
4890 * sending any more operations to OSDs. Use this
4891 * when it is known that the client can't trust
4892 * anything from before this epoch (e.g. due to
4893 * client blacklist at this epoch).
4894 */
4895void Objecter::set_epoch_barrier(epoch_t epoch)
4896{
4897 unique_lock wl(rwlock);
4898
4899 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
4900 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
4901 << dendl;
4902 if (epoch > epoch_barrier) {
4903 epoch_barrier = epoch;
4904 _maybe_request_map();
4905 }
4906}
4907
4908
4909
4910hobject_t Objecter::enumerate_objects_begin()
4911{
4912 return hobject_t();
4913}
4914
4915hobject_t Objecter::enumerate_objects_end()
4916{
4917 return hobject_t::get_max();
4918}
4919
4920struct C_EnumerateReply : public Context {
4921 bufferlist bl;
4922
4923 Objecter *objecter;
4924 hobject_t *next;
4925 std::list<librados::ListObjectImpl> *result;
4926 const hobject_t end;
4927 const int64_t pool_id;
4928 Context *on_finish;
4929
4930 epoch_t epoch;
4931 int budget;
4932
4933 C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
4934 std::list<librados::ListObjectImpl> *result_,
4935 const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
4936 objecter(objecter_), next(next_), result(result_),
4937 end(end_), pool_id(pool_id_), on_finish(on_finish_),
4938 epoch(0), budget(0)
4939 {}
4940
4941 void finish(int r) override {
4942 objecter->_enumerate_reply(
4943 bl, r, end, pool_id, budget, epoch, result, next, on_finish);
4944 }
4945};
4946
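// Begin (or continue) a hash-ordered object enumeration over one pool and
// namespace, bounded by [start, end). Up to max entries are returned via
// *result and *next records where to resume. A minimal caller sketch,
// assuming a C_SaferCond-style waiter is available:
//
//   std::list<librados::ListObjectImpl> ls;
//   hobject_t next;
//   C_SaferCond cond;
//   objecter->enumerate_objects(pool_id, "", objecter->enumerate_objects_begin(),
//                               objecter->enumerate_objects_end(), 100,
//                               bufferlist(), &ls, &next, &cond);
//   int r = cond.wait();   // loop, passing next as the new start, until next.is_max()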
4947void Objecter::enumerate_objects(
4948 int64_t pool_id,
4949 const std::string &ns,
4950 const hobject_t &start,
4951 const hobject_t &end,
4952 const uint32_t max,
4953 const bufferlist &filter_bl,
4954 std::list<librados::ListObjectImpl> *result,
4955 hobject_t *next,
4956 Context *on_finish)
4957{
4958 assert(result);
4959
4960 if (!end.is_max() && start > end) {
4961 lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
4962 on_finish->complete(-EINVAL);
4963 return;
4964 }
4965
4966 if (max < 1) {
4967 lderr(cct) << __func__ << ": result size may not be zero" << dendl;
4968 on_finish->complete(-EINVAL);
4969 return;
4970 }
4971
4972 if (start.is_max()) {
4973 on_finish->complete(0);
4974 return;
4975 }
4976
4977 shared_lock rl(rwlock);
4978 assert(osdmap->get_epoch());
4979 if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
4980 rl.unlock();
4981 lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
4982 on_finish->complete(-EOPNOTSUPP);
4983 return;
4984 }
4985 const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
4986 if (!p) {
4987 lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
4988 << osdmap->get_epoch() << dendl;
4989 rl.unlock();
4990 on_finish->complete(-ENOENT);
4991 return;
4992 } else {
4993 rl.unlock();
4994 }
4995
4996 ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;
4997
4998 // Stash completion state
4999 C_EnumerateReply *on_ack = new C_EnumerateReply(
5000 this, next, result, end, pool_id, on_finish);
5001
5002 ObjectOperation op;
5003 op.pg_nls(max, filter_bl, start, 0);
5004
5005 // Issue. See you later in _enumerate_reply
5006 object_locator_t oloc(pool_id, ns);
5007 pg_read(start.get_hash(), oloc, op,
5008 &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
5009}
5010
5011void Objecter::_enumerate_reply(
5012 bufferlist &bl,
5013 int r,
5014 const hobject_t &end,
5015 const int64_t pool_id,
5016 int budget,
5017 epoch_t reply_epoch,
5018 std::list<librados::ListObjectImpl> *result,
5019 hobject_t *next,
5020 Context *on_finish)
5021{
5022 if (budget > 0) {
5023 put_op_budget_bytes(budget);
5024 }
5025
5026 if (r < 0) {
5027 ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
5028 on_finish->complete(r);
5029 return;
5030 }
5031
5032 assert(next != NULL);
5033
5034 // Decode the results
5035 bufferlist::iterator iter = bl.begin();
5036 pg_nls_response_t response;
5037
5038 // XXX extra_info doesn't seem used anywhere?
5039 bufferlist extra_info;
5040 ::decode(response, iter);
5041 if (!iter.end()) {
5042 ::decode(extra_info, iter);
5043 }
5044
5045 ldout(cct, 10) << __func__ << ": got " << response.entries.size()
5046 << " handle " << response.handle
5047 << " reply_epoch " << reply_epoch << dendl;
5048 ldout(cct, 20) << __func__ << ": response.entries.size "
5049 << response.entries.size() << ", response.entries "
5050 << response.entries << dendl;
5051 if (response.handle <= end) {
5052 *next = response.handle;
5053 } else {
5054 ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
5055 << dendl;
5056 *next = end;
5057
5058 // drop anything after 'end'
5059 shared_lock rl(rwlock);
5060 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
5061 if (!pool) {
5062 // pool is gone, drop any results which are now meaningless.
5063 rl.unlock();
5064 on_finish->complete(-ENOENT);
5065 return;
5066 }
5067 while (!response.entries.empty()) {
5068 uint32_t hash = response.entries.back().locator.empty() ?
5069 pool->hash_key(response.entries.back().oid,
5070 response.entries.back().nspace) :
5071 pool->hash_key(response.entries.back().locator,
5072 response.entries.back().nspace);
5073 hobject_t last(response.entries.back().oid,
5074 response.entries.back().locator,
5075 CEPH_NOSNAP,
5076 hash,
5077 pool_id,
5078 response.entries.back().nspace);
5079 if (last < end)
5080 break;
5081 ldout(cct, 20) << __func__ << " dropping item " << last
5082 << " >= end " << end << dendl;
5083 response.entries.pop_back();
5084 }
5085 rl.unlock();
5086 }
5087 if (!response.entries.empty()) {
5088 result->merge(response.entries);
5089 }
5090
5091 // release the listing context's budget once all
5092 // OPs (in the session) are finished
5093#if 0
5094 put_nlist_context_budget(list_context);
5095#endif
5096 on_finish->complete(r);
5097 return;
5098}
5099
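// scrub_ls support: wraps the CEPH_OSD_OP_SCRUBLS PG op. The encoded
// scrub_ls_arg_t selects objects (type 0) or snapsets (type 1); the reply is
// decoded by C_ObjectOperation_scrub_ls into the caller's vector together
// with the scrub interval.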
5100namespace {
5101 using namespace librados;
5102
5103 template <typename T>
5104 void do_decode(std::vector<T>& items, std::vector<bufferlist>& bls)
5105 {
5106 for (auto bl : bls) {
5107 auto p = bl.begin();
5108 T t;
5109 decode(t, p);
5110 items.push_back(t);
5111 }
5112 }
5113
5114 struct C_ObjectOperation_scrub_ls : public Context {
5115 bufferlist bl;
5116 uint32_t *interval;
5117 std::vector<inconsistent_obj_t> *objects = nullptr;
5118 std::vector<inconsistent_snapset_t> *snapsets = nullptr;
5119 int *rval;
5120
5121 C_ObjectOperation_scrub_ls(uint32_t *interval,
5122 std::vector<inconsistent_obj_t> *objects,
5123 int *rval)
5124 : interval(interval), objects(objects), rval(rval) {}
5125 C_ObjectOperation_scrub_ls(uint32_t *interval,
5126 std::vector<inconsistent_snapset_t> *snapsets,
5127 int *rval)
5128 : interval(interval), snapsets(snapsets), rval(rval) {}
5129 void finish(int r) override {
5130 if (r < 0 && r != -EAGAIN) {
5131 if (rval)
5132 *rval = r;
5133 return;
5134 }
5135
5136 if (rval)
5137 *rval = 0;
5138
5139 try {
5140 decode();
5141 } catch (buffer::error&) {
5142 if (rval)
5143 *rval = -EIO;
5144 }
5145 }
5146 private:
5147 void decode() {
5148 scrub_ls_result_t result;
5149 auto p = bl.begin();
5150 result.decode(p);
5151 *interval = result.interval;
5152 if (objects) {
5153 do_decode(*objects, result.vals);
5154 } else {
5155 do_decode(*snapsets, result.vals);
5156 }
5157 }
5158 };
5159
5160 template <typename T>
5161 void do_scrub_ls(::ObjectOperation *op,
5162 const scrub_ls_arg_t& arg,
5163 std::vector<T> *items,
5164 uint32_t *interval,
5165 int *rval)
5166 {
5167 OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
5168 op->flags |= CEPH_OSD_FLAG_PGOP;
5169 assert(interval);
5170 arg.encode(osd_op.indata);
5171 unsigned p = op->ops.size() - 1;
5172 auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
5173 op->out_handler[p] = h;
5174 op->out_bl[p] = &h->bl;
5175 op->out_rval[p] = rval;
5176 }
5177}
5178
5179void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5180 uint64_t max_to_get,
5181 std::vector<librados::inconsistent_obj_t> *objects,
5182 uint32_t *interval,
5183 int *rval)
5184{
5185 scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
5186 do_scrub_ls(this, arg, objects, interval, rval);
5187}
5188
5189void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5190 uint64_t max_to_get,
5191 std::vector<librados::inconsistent_snapset_t> *snapsets,
5192 uint32_t *interval,
5193 int *rval)
5194{
5195 scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
5196 do_scrub_ls(this, arg, snapsets, interval, rval);
5197}