1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "Objecter.h"
16 #include "osd/OSDMap.h"
17 #include "Filer.h"
18
19 #include "mon/MonClient.h"
20
21 #include "msg/Messenger.h"
22 #include "msg/Message.h"
23
24 #include "messages/MPing.h"
25 #include "messages/MOSDOp.h"
26 #include "messages/MOSDOpReply.h"
27 #include "messages/MOSDBackoff.h"
28 #include "messages/MOSDMap.h"
29
30 #include "messages/MPoolOp.h"
31 #include "messages/MPoolOpReply.h"
32
33 #include "messages/MGetPoolStats.h"
34 #include "messages/MGetPoolStatsReply.h"
35 #include "messages/MStatfs.h"
36 #include "messages/MStatfsReply.h"
37
38 #include "messages/MMonCommand.h"
39
40 #include "messages/MCommand.h"
41 #include "messages/MCommandReply.h"
42
43 #include "messages/MWatchNotify.h"
44
45 #include <errno.h>
46
47 #include "common/config.h"
48 #include "common/perf_counters.h"
49 #include "common/scrub_types.h"
50 #include "include/str_list.h"
51 #include "common/errno.h"
52 #include "common/EventTrace.h"
53
54 using ceph::real_time;
55 using ceph::real_clock;
56
57 using ceph::mono_clock;
58 using ceph::mono_time;
59
60 using ceph::timespan;
61
62
63 #define dout_subsys ceph_subsys_objecter
64 #undef dout_prefix
65 #define dout_prefix *_dout << messenger->get_myname() << ".objecter "
66
67
68 enum {
69 l_osdc_first = 123200,
70 l_osdc_op_active,
71 l_osdc_op_laggy,
72 l_osdc_op_send,
73 l_osdc_op_send_bytes,
74 l_osdc_op_resend,
75 l_osdc_op_reply,
76
77 l_osdc_op,
78 l_osdc_op_r,
79 l_osdc_op_w,
80 l_osdc_op_rmw,
81 l_osdc_op_pg,
82
83 l_osdc_osdop_stat,
84 l_osdc_osdop_create,
85 l_osdc_osdop_read,
86 l_osdc_osdop_write,
87 l_osdc_osdop_writefull,
88 l_osdc_osdop_writesame,
89 l_osdc_osdop_append,
90 l_osdc_osdop_zero,
91 l_osdc_osdop_truncate,
92 l_osdc_osdop_delete,
93 l_osdc_osdop_mapext,
94 l_osdc_osdop_sparse_read,
95 l_osdc_osdop_clonerange,
96 l_osdc_osdop_getxattr,
97 l_osdc_osdop_setxattr,
98 l_osdc_osdop_cmpxattr,
99 l_osdc_osdop_rmxattr,
100 l_osdc_osdop_resetxattrs,
101 l_osdc_osdop_tmap_up,
102 l_osdc_osdop_tmap_put,
103 l_osdc_osdop_tmap_get,
104 l_osdc_osdop_call,
105 l_osdc_osdop_watch,
106 l_osdc_osdop_notify,
107 l_osdc_osdop_src_cmpxattr,
108 l_osdc_osdop_pgls,
109 l_osdc_osdop_pgls_filter,
110 l_osdc_osdop_other,
111
112 l_osdc_linger_active,
113 l_osdc_linger_send,
114 l_osdc_linger_resend,
115 l_osdc_linger_ping,
116
117 l_osdc_poolop_active,
118 l_osdc_poolop_send,
119 l_osdc_poolop_resend,
120
121 l_osdc_poolstat_active,
122 l_osdc_poolstat_send,
123 l_osdc_poolstat_resend,
124
125 l_osdc_statfs_active,
126 l_osdc_statfs_send,
127 l_osdc_statfs_resend,
128
129 l_osdc_command_active,
130 l_osdc_command_send,
131 l_osdc_command_resend,
132
133 l_osdc_map_epoch,
134 l_osdc_map_full,
135 l_osdc_map_inc,
136
137 l_osdc_osd_sessions,
138 l_osdc_osd_session_open,
139 l_osdc_osd_session_close,
140 l_osdc_osd_laggy,
141
142 l_osdc_osdop_omap_wr,
143 l_osdc_osdop_omap_rd,
144 l_osdc_osdop_omap_del,
145
146 l_osdc_last,
147 };
148
149
150 // config obs ----------------------------
151
152 static const char *config_keys[] = {
153 "crush_location",
154 NULL
155 };
156
157 class Objecter::RequestStateHook : public AdminSocketHook {
158 Objecter *m_objecter;
159 public:
160 explicit RequestStateHook(Objecter *objecter);
161 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
162 bufferlist& out) override;
163 };
164
165 /**
166  * This is a more limited form of C_Contexts, but C_Contexts requires a
167  * ceph_context, which we don't have here.
168 */
169 class ObjectOperation::C_TwoContexts : public Context {
170 Context *first;
171 Context *second;
172 public:
173 C_TwoContexts(Context *first, Context *second) :
174 first(first), second(second) {}
175 void finish(int r) override {
176 first->complete(r);
177 second->complete(r);
178 first = NULL;
179 second = NULL;
180 }
181
182 ~C_TwoContexts() override {
183 delete first;
184 delete second;
185 }
186 };
187
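// Attach an extra completion to the most recently added op's output handler.
// If a handler is already installed, both are wrapped in a C_TwoContexts so
// each Context is completed exactly once; otherwise 'extra' is installed
// directly.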
188 void ObjectOperation::add_handler(Context *extra) {
189 size_t last = out_handler.size() - 1;
190 Context *orig = out_handler[last];
191 if (orig) {
192 Context *wrapper = new C_TwoContexts(orig, extra);
193 out_handler[last] = wrapper;
194 } else {
195 out_handler[last] = extra;
196 }
197 }
198
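// Choose one of the session's sharded completion locks for this object by
// hashing the object name (mod HASH_PRIME, then mod num_locks), presumably to
// spread contention across locks while keeping completions for the same
// object on the same lock.  An empty name returns a default-constructed
// (no-op) lock guard.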
199 Objecter::OSDSession::unique_completion_lock Objecter::OSDSession::get_lock(
200 object_t& oid)
201 {
202 if (oid.name.empty())
203 return unique_completion_lock();
204
205 static constexpr uint32_t HASH_PRIME = 1021;
206 uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size())
207 % HASH_PRIME;
208
209 return unique_completion_lock(completion_locks[h % num_locks],
210 std::defer_lock);
211 }
212
213 const char** Objecter::get_tracked_conf_keys() const
214 {
215 return config_keys;
216 }
217
218
219 void Objecter::handle_conf_change(const struct md_config_t *conf,
220 const std::set <std::string> &changed)
221 {
222 if (changed.count("crush_location")) {
223 update_crush_location();
224 }
225 }
226
227 void Objecter::update_crush_location()
228 {
229 unique_lock wl(rwlock);
230 crush_location = cct->crush_location.get_location();
231 }
232
233 // messages ------------------------------
234
235 /*
236 * initialize only internal data structures, don't initiate cluster interaction
237 */
238 void Objecter::init()
239 {
240 assert(!initialized);
241
242 if (!logger) {
243 PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
244
245 pcb.add_u64(l_osdc_op_active, "op_active", "Operations active", "actv",
246 PerfCountersBuilder::PRIO_CRITICAL);
247 pcb.add_u64(l_osdc_op_laggy, "op_laggy", "Laggy operations");
248 pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
249 pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data", NULL, 0, unit_t(BYTES));
250 pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
251 pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
252
253 pcb.add_u64_counter(l_osdc_op, "op", "Operations");
254 pcb.add_u64_counter(l_osdc_op_r, "op_r", "Read operations", "rd",
255 PerfCountersBuilder::PRIO_CRITICAL);
256 pcb.add_u64_counter(l_osdc_op_w, "op_w", "Write operations", "wr",
257 PerfCountersBuilder::PRIO_CRITICAL);
258 pcb.add_u64_counter(l_osdc_op_rmw, "op_rmw", "Read-modify-write operations",
259 "rdwr", PerfCountersBuilder::PRIO_INTERESTING);
260 pcb.add_u64_counter(l_osdc_op_pg, "op_pg", "PG operation");
261
262 pcb.add_u64_counter(l_osdc_osdop_stat, "osdop_stat", "Stat operations");
263 pcb.add_u64_counter(l_osdc_osdop_create, "osdop_create",
264 "Create object operations");
265 pcb.add_u64_counter(l_osdc_osdop_read, "osdop_read", "Read operations");
266 pcb.add_u64_counter(l_osdc_osdop_write, "osdop_write", "Write operations");
267 pcb.add_u64_counter(l_osdc_osdop_writefull, "osdop_writefull",
268 "Write full object operations");
269 pcb.add_u64_counter(l_osdc_osdop_writesame, "osdop_writesame",
270 "Write same operations");
271 pcb.add_u64_counter(l_osdc_osdop_append, "osdop_append",
272 "Append operation");
273 pcb.add_u64_counter(l_osdc_osdop_zero, "osdop_zero",
274 "Set object to zero operations");
275 pcb.add_u64_counter(l_osdc_osdop_truncate, "osdop_truncate",
276 "Truncate object operations");
277 pcb.add_u64_counter(l_osdc_osdop_delete, "osdop_delete",
278 "Delete object operations");
279 pcb.add_u64_counter(l_osdc_osdop_mapext, "osdop_mapext",
280 "Map extent operations");
281 pcb.add_u64_counter(l_osdc_osdop_sparse_read, "osdop_sparse_read",
282 "Sparse read operations");
283 pcb.add_u64_counter(l_osdc_osdop_clonerange, "osdop_clonerange",
284 "Clone range operations");
285 pcb.add_u64_counter(l_osdc_osdop_getxattr, "osdop_getxattr",
286 "Get xattr operations");
287 pcb.add_u64_counter(l_osdc_osdop_setxattr, "osdop_setxattr",
288 "Set xattr operations");
289 pcb.add_u64_counter(l_osdc_osdop_cmpxattr, "osdop_cmpxattr",
290 "Xattr comparison operations");
291 pcb.add_u64_counter(l_osdc_osdop_rmxattr, "osdop_rmxattr",
292 "Remove xattr operations");
293 pcb.add_u64_counter(l_osdc_osdop_resetxattrs, "osdop_resetxattrs",
294 "Reset xattr operations");
295 pcb.add_u64_counter(l_osdc_osdop_tmap_up, "osdop_tmap_up",
296 "TMAP update operations");
297 pcb.add_u64_counter(l_osdc_osdop_tmap_put, "osdop_tmap_put",
298 "TMAP put operations");
299 pcb.add_u64_counter(l_osdc_osdop_tmap_get, "osdop_tmap_get",
300 "TMAP get operations");
301 pcb.add_u64_counter(l_osdc_osdop_call, "osdop_call",
302 "Call (execute) operations");
303 pcb.add_u64_counter(l_osdc_osdop_watch, "osdop_watch",
304 "Watch by object operations");
305 pcb.add_u64_counter(l_osdc_osdop_notify, "osdop_notify",
306 "Notify about object operations");
307 pcb.add_u64_counter(l_osdc_osdop_src_cmpxattr, "osdop_src_cmpxattr",
308 "Extended attribute comparison in multi operations");
309 pcb.add_u64_counter(l_osdc_osdop_pgls, "osdop_pgls");
310 pcb.add_u64_counter(l_osdc_osdop_pgls_filter, "osdop_pgls_filter");
311 pcb.add_u64_counter(l_osdc_osdop_other, "osdop_other", "Other operations");
312
313 pcb.add_u64(l_osdc_linger_active, "linger_active",
314 "Active lingering operations");
315 pcb.add_u64_counter(l_osdc_linger_send, "linger_send",
316 "Sent lingering operations");
317 pcb.add_u64_counter(l_osdc_linger_resend, "linger_resend",
318 "Resent lingering operations");
319 pcb.add_u64_counter(l_osdc_linger_ping, "linger_ping",
320 "Sent pings to lingering operations");
321
322 pcb.add_u64(l_osdc_poolop_active, "poolop_active",
323 "Active pool operations");
324 pcb.add_u64_counter(l_osdc_poolop_send, "poolop_send",
325 "Sent pool operations");
326 pcb.add_u64_counter(l_osdc_poolop_resend, "poolop_resend",
327 "Resent pool operations");
328
329 pcb.add_u64(l_osdc_poolstat_active, "poolstat_active",
330 "Active get pool stat operations");
331 pcb.add_u64_counter(l_osdc_poolstat_send, "poolstat_send",
332 "Pool stat operations sent");
333 pcb.add_u64_counter(l_osdc_poolstat_resend, "poolstat_resend",
334 "Resent pool stats");
335
336 pcb.add_u64(l_osdc_statfs_active, "statfs_active", "Statfs operations");
337 pcb.add_u64_counter(l_osdc_statfs_send, "statfs_send", "Sent FS stats");
338 pcb.add_u64_counter(l_osdc_statfs_resend, "statfs_resend",
339 "Resent FS stats");
340
341 pcb.add_u64(l_osdc_command_active, "command_active", "Active commands");
342 pcb.add_u64_counter(l_osdc_command_send, "command_send",
343 "Sent commands");
344 pcb.add_u64_counter(l_osdc_command_resend, "command_resend",
345 "Resent commands");
346
347 pcb.add_u64(l_osdc_map_epoch, "map_epoch", "OSD map epoch");
348 pcb.add_u64_counter(l_osdc_map_full, "map_full",
349 "Full OSD maps received");
350 pcb.add_u64_counter(l_osdc_map_inc, "map_inc",
351 "Incremental OSD maps received");
352
353 pcb.add_u64(l_osdc_osd_sessions, "osd_sessions",
354 "Open sessions"); // open sessions
355 pcb.add_u64_counter(l_osdc_osd_session_open, "osd_session_open",
356 "Sessions opened");
357 pcb.add_u64_counter(l_osdc_osd_session_close, "osd_session_close",
358 "Sessions closed");
359 pcb.add_u64(l_osdc_osd_laggy, "osd_laggy", "Laggy OSD sessions");
360
361 pcb.add_u64_counter(l_osdc_osdop_omap_wr, "omap_wr",
362 "OSD OMAP write operations");
363 pcb.add_u64_counter(l_osdc_osdop_omap_rd, "omap_rd",
364 "OSD OMAP read operations");
365 pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del",
366 "OSD OMAP delete operations");
367
368 logger = pcb.create_perf_counters();
369 cct->get_perfcounters_collection()->add(logger);
370 }
371
372 m_request_state_hook = new RequestStateHook(this);
373 AdminSocket* admin_socket = cct->get_admin_socket();
374 int ret = admin_socket->register_command("objecter_requests",
375 "objecter_requests",
376 m_request_state_hook,
377 "show in-progress osd requests");
378
379   /* Don't warn on EEXIST; this happens if multiple ceph clients
380    * are instantiated from one process */
381 if (ret < 0 && ret != -EEXIST) {
382 lderr(cct) << "error registering admin socket command: "
383 << cpp_strerror(ret) << dendl;
384 }
385
386 update_crush_location();
387
388 cct->_conf->add_observer(this);
389
390 initialized = true;
391 }
392
393 /*
394 * ok, cluster interaction can happen
395 */
396 void Objecter::start(const OSDMap* o)
397 {
398 shared_lock rl(rwlock);
399
400 start_tick();
401 if (o) {
402 osdmap->deepish_copy_from(*o);
403 } else if (osdmap->get_epoch() == 0) {
404 _maybe_request_map();
405 }
406 }
407
408 void Objecter::shutdown()
409 {
410 assert(initialized);
411
412 unique_lock wl(rwlock);
413
414 initialized = false;
415
416 cct->_conf->remove_observer(this);
417
418 map<int,OSDSession*>::iterator p;
419 while (!osd_sessions.empty()) {
420 p = osd_sessions.begin();
421 close_session(p->second);
422 }
423
424 while(!check_latest_map_lingers.empty()) {
425 map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
426 i->second->put();
427 check_latest_map_lingers.erase(i->first);
428 }
429
430 while(!check_latest_map_ops.empty()) {
431 map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
432 i->second->put();
433 check_latest_map_ops.erase(i->first);
434 }
435
436 while(!check_latest_map_commands.empty()) {
437 map<ceph_tid_t, CommandOp*>::iterator i
438 = check_latest_map_commands.begin();
439 i->second->put();
440 check_latest_map_commands.erase(i->first);
441 }
442
443 while(!poolstat_ops.empty()) {
444 map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
445 delete i->second;
446 poolstat_ops.erase(i->first);
447 }
448
449 while(!statfs_ops.empty()) {
450 map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
451 delete i->second;
452 statfs_ops.erase(i->first);
453 }
454
455 while(!pool_ops.empty()) {
456 map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
457 delete i->second;
458 pool_ops.erase(i->first);
459 }
460
461 ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
462 while(!homeless_session->linger_ops.empty()) {
463 std::map<uint64_t, LingerOp*>::iterator i
464 = homeless_session->linger_ops.begin();
465 ldout(cct, 10) << " linger_op " << i->first << dendl;
466 LingerOp *lop = i->second;
467 {
468 OSDSession::unique_lock swl(homeless_session->lock);
469 _session_linger_op_remove(homeless_session, lop);
470 }
471 linger_ops.erase(lop->linger_id);
472 linger_ops_set.erase(lop);
473 lop->put();
474 }
475
476 while(!homeless_session->ops.empty()) {
477 std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
478 ldout(cct, 10) << " op " << i->first << dendl;
479 Op *op = i->second;
480 {
481 OSDSession::unique_lock swl(homeless_session->lock);
482 _session_op_remove(homeless_session, op);
483 }
484 op->put();
485 }
486
487 while(!homeless_session->command_ops.empty()) {
488 std::map<ceph_tid_t, CommandOp*>::iterator i
489 = homeless_session->command_ops.begin();
490 ldout(cct, 10) << " command_op " << i->first << dendl;
491 CommandOp *cop = i->second;
492 {
493 OSDSession::unique_lock swl(homeless_session->lock);
494 _session_command_op_remove(homeless_session, cop);
495 }
496 cop->put();
497 }
498
499 if (tick_event) {
500 if (timer.cancel_event(tick_event)) {
501 ldout(cct, 10) << " successfully canceled tick" << dendl;
502 }
503 tick_event = 0;
504 }
505
506 if (logger) {
507 cct->get_perfcounters_collection()->remove(logger);
508 delete logger;
509 logger = NULL;
510 }
511
512   // Let go of Objecter write lock so timer thread can shut down
513 wl.unlock();
514
515 // Outside of lock to avoid cycle WRT calls to RequestStateHook
516 // This is safe because we guarantee no concurrent calls to
517 // shutdown() with the ::initialized check at start.
518 if (m_request_state_hook) {
519 AdminSocket* admin_socket = cct->get_admin_socket();
520 admin_socket->unregister_command("objecter_requests");
521 delete m_request_state_hook;
522 m_request_state_hook = NULL;
523 }
524 }
525
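// Build and submit the (re)registration op for a linger (watch/notify)
// target.  A watch that has already registered gets a single
// CEPH_OSD_WATCH_OP_RECONNECT op with a bumped register_gen; otherwise the
// original ops are resubmitted (and, for a notify, the reply payload is
// captured so the notify_id can be decoded later).  Any previous registration
// op still in flight is cancelled first, and should_resend is false because a
// fresh op is built on every resend.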
526 void Objecter::_send_linger(LingerOp *info,
527 shunique_lock& sul)
528 {
529 assert(sul.owns_lock() && sul.mutex() == &rwlock);
530
531 vector<OSDOp> opv;
532 Context *oncommit = NULL;
533 LingerOp::shared_lock watchl(info->watch_lock);
534 bufferlist *poutbl = NULL;
535 if (info->registered && info->is_watch) {
536 ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect"
537 << dendl;
538 opv.push_back(OSDOp());
539 opv.back().op.op = CEPH_OSD_OP_WATCH;
540 opv.back().op.watch.cookie = info->get_cookie();
541 opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
542 opv.back().op.watch.gen = ++info->register_gen;
543 oncommit = new C_Linger_Reconnect(this, info);
544 } else {
545 ldout(cct, 15) << "send_linger " << info->linger_id << " register"
546 << dendl;
547 opv = info->ops;
548 C_Linger_Commit *c = new C_Linger_Commit(this, info);
549 if (!info->is_watch) {
550 info->notify_id = 0;
551 poutbl = &c->outbl;
552 }
553 oncommit = c;
554 }
555 watchl.unlock();
556 Op *o = new Op(info->target.base_oid, info->target.base_oloc,
557 opv, info->target.flags | CEPH_OSD_FLAG_READ,
558 oncommit, info->pobjver);
559 o->outbl = poutbl;
560 o->snapid = info->snap;
561 o->snapc = info->snapc;
562 o->mtime = info->mtime;
563
564 o->target = info->target;
565 o->tid = ++last_tid;
566
567 // do not resend this; we will send a new op to reregister
568 o->should_resend = false;
569
570 if (info->register_tid) {
571     // repeat send. cancel old registration op, if any.
572 OSDSession::unique_lock sl(info->session->lock);
573 if (info->session->ops.count(info->register_tid)) {
574 Op *o = info->session->ops[info->register_tid];
575 _op_cancel_map_check(o);
576 _cancel_linger_op(o);
577 }
578 sl.unlock();
579
580 _op_submit(o, sul, &info->register_tid);
581 } else {
582 // first send
583 _op_submit_with_budget(o, sul, &info->register_tid);
584 }
585
586 logger->inc(l_osdc_linger_send);
587 }
588
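// Completion of the registration op: hand the result to on_reg_commit (and to
// on_notify_finish on error), mark the linger as registered, and for notifies
// decode the notify_id from the reply payload.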
589 void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
590 {
591 LingerOp::unique_lock wl(info->watch_lock);
592 ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
593 if (info->on_reg_commit) {
594 info->on_reg_commit->complete(r);
595 info->on_reg_commit = NULL;
596 }
597 if (r < 0 && info->on_notify_finish) {
598 info->on_notify_finish->complete(r);
599 info->on_notify_finish = nullptr;
600 }
601
602 // only tell the user the first time we do this
603 info->registered = true;
604 info->pobjver = NULL;
605
606 if (!info->is_watch) {
607 // make note of the notify_id
608 bufferlist::iterator p = outbl.begin();
609 try {
610 ::decode(info->notify_id, p);
611 ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id
612 << dendl;
613 }
614 catch (buffer::error& e) {
615 }
616 }
617 }
618
619 struct C_DoWatchError : public Context {
620 Objecter *objecter;
621 Objecter::LingerOp *info;
622 int err;
623 C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
624 : objecter(o), info(i), err(r) {
625 info->get();
626 info->_queued_async();
627 }
628 void finish(int r) override {
629 Objecter::unique_lock wl(objecter->rwlock);
630 bool canceled = info->canceled;
631 wl.unlock();
632
633 if (!canceled) {
634 info->watch_context->handle_error(info->get_cookie(), err);
635 }
636
637 info->finished_async();
638 info->put();
639 }
640 };
641
642 int Objecter::_normalize_watch_error(int r)
643 {
644 // translate ENOENT -> ENOTCONN so that a delete->disconnection
645   // notification and a failure to reconnect because we raced with
646 // the delete appear the same to the user.
647 if (r == -ENOENT)
648 r = -ENOTCONN;
649 return r;
650 }
651
652 void Objecter::_linger_reconnect(LingerOp *info, int r)
653 {
654 ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
655 << " (last_error " << info->last_error << ")" << dendl;
656 if (r < 0) {
657 LingerOp::unique_lock wl(info->watch_lock);
658 if (!info->last_error) {
659 r = _normalize_watch_error(r);
660 info->last_error = r;
661 if (info->watch_context) {
662 finisher->queue(new C_DoWatchError(this, info, r));
663 }
664 }
665 wl.unlock();
666 }
667 }
668
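// Send a CEPH_OSD_WATCH_OP_PING for an established watch.  The ack path
// (_linger_ping) records watch_valid_thru, which linger_check() later uses to
// report how long ago the watch was last known to be valid.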
669 void Objecter::_send_linger_ping(LingerOp *info)
670 {
671 // rwlock is locked unique
672 // info->session->lock is locked
673
674 if (cct->_conf->objecter_inject_no_watch_ping) {
675 ldout(cct, 10) << __func__ << " " << info->linger_id << " SKIPPING"
676 << dendl;
677 return;
678 }
679 if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
680 ldout(cct, 10) << __func__ << " PAUSERD" << dendl;
681 return;
682 }
683
684 ceph::mono_time now = ceph::mono_clock::now();
685 ldout(cct, 10) << __func__ << " " << info->linger_id << " now " << now
686 << dendl;
687
688 vector<OSDOp> opv(1);
689 opv[0].op.op = CEPH_OSD_OP_WATCH;
690 opv[0].op.watch.cookie = info->get_cookie();
691 opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
692 opv[0].op.watch.gen = info->register_gen;
693 C_Linger_Ping *onack = new C_Linger_Ping(this, info);
694 Op *o = new Op(info->target.base_oid, info->target.base_oloc,
695 opv, info->target.flags | CEPH_OSD_FLAG_READ,
696 onack, NULL, NULL);
697 o->target = info->target;
698 o->should_resend = false;
699 _send_op_account(o);
700 MOSDOp *m = _prepare_osd_op(o);
701 o->tid = ++last_tid;
702 _session_op_assign(info->session, o);
703 _send_op(o, m);
704 info->ping_tid = o->tid;
705
706 onack->sent = now;
707 logger->inc(l_osdc_linger_ping);
708 }
709
710 void Objecter::_linger_ping(LingerOp *info, int r, mono_time sent,
711 uint32_t register_gen)
712 {
713 LingerOp::unique_lock l(info->watch_lock);
714 ldout(cct, 10) << __func__ << " " << info->linger_id
715 << " sent " << sent << " gen " << register_gen << " = " << r
716 << " (last_error " << info->last_error
717 << " register_gen " << info->register_gen << ")" << dendl;
718 if (info->register_gen == register_gen) {
719 if (r == 0) {
720 info->watch_valid_thru = sent;
721 } else if (r < 0 && !info->last_error) {
722 r = _normalize_watch_error(r);
723 info->last_error = r;
724 if (info->watch_context) {
725 finisher->queue(new C_DoWatchError(this, info, r));
726 }
727 }
728 } else {
729 ldout(cct, 20) << " ignoring old gen" << dendl;
730 }
731 }
732
733 int Objecter::linger_check(LingerOp *info)
734 {
735 LingerOp::shared_lock l(info->watch_lock);
736
737 mono_time stamp = info->watch_valid_thru;
738 if (!info->watch_pending_async.empty())
739 stamp = MIN(info->watch_valid_thru, info->watch_pending_async.front());
740 auto age = mono_clock::now() - stamp;
741
742 ldout(cct, 10) << __func__ << " " << info->linger_id
743 << " err " << info->last_error
744 << " age " << age << dendl;
745 if (info->last_error)
746 return info->last_error;
747 // return a safe upper bound (we are truncating to ms)
748 return
749 1 + std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
750 }
751
752 void Objecter::linger_cancel(LingerOp *info)
753 {
754 unique_lock wl(rwlock);
755 _linger_cancel(info);
756 info->put();
757 }
758
759 void Objecter::_linger_cancel(LingerOp *info)
760 {
761 // rwlock is locked unique
762 ldout(cct, 20) << __func__ << " linger_id=" << info->linger_id << dendl;
763 if (!info->canceled) {
764 OSDSession *s = info->session;
765 OSDSession::unique_lock sl(s->lock);
766 _session_linger_op_remove(s, info);
767 sl.unlock();
768
769 linger_ops.erase(info->linger_id);
770 linger_ops_set.erase(info);
771 assert(linger_ops.size() == linger_ops_set.size());
772
773 info->canceled = true;
774 info->put();
775
776 logger->dec(l_osdc_linger_active);
777 }
778 }
779
780
781
782 Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
783 const object_locator_t& oloc,
784 int flags)
785 {
786 LingerOp *info = new LingerOp;
787 info->target.base_oid = oid;
788 info->target.base_oloc = oloc;
789 if (info->target.base_oloc.key == oid)
790 info->target.base_oloc.key.clear();
791 info->target.flags = flags;
792 info->watch_valid_thru = mono_clock::now();
793
794 unique_lock l(rwlock);
795
796 // Acquire linger ID
797 info->linger_id = ++max_linger_id;
798 ldout(cct, 10) << __func__ << " info " << info
799 << " linger_id " << info->linger_id
800 << " cookie " << info->get_cookie()
801 << dendl;
802 linger_ops[info->linger_id] = info;
803 linger_ops_set.insert(info);
804 assert(linger_ops.size() == linger_ops_set.size());
805
806 info->get(); // for the caller
807 return info;
808 }
809
810 ceph_tid_t Objecter::linger_watch(LingerOp *info,
811 ObjectOperation& op,
812 const SnapContext& snapc,
813 real_time mtime,
814 bufferlist& inbl,
815 Context *oncommit,
816 version_t *objver)
817 {
818 info->is_watch = true;
819 info->snapc = snapc;
820 info->mtime = mtime;
821 info->target.flags |= CEPH_OSD_FLAG_WRITE;
822 info->ops = op.ops;
823 info->inbl = inbl;
824 info->poutbl = NULL;
825 info->pobjver = objver;
826 info->on_reg_commit = oncommit;
827
828 shunique_lock sul(rwlock, ceph::acquire_unique);
829 _linger_submit(info, sul);
830 logger->inc(l_osdc_linger_active);
831
832 return info->linger_id;
833 }
834
835 ceph_tid_t Objecter::linger_notify(LingerOp *info,
836 ObjectOperation& op,
837 snapid_t snap, bufferlist& inbl,
838 bufferlist *poutbl,
839 Context *onfinish,
840 version_t *objver)
841 {
842 info->snap = snap;
843 info->target.flags |= CEPH_OSD_FLAG_READ;
844 info->ops = op.ops;
845 info->inbl = inbl;
846 info->poutbl = poutbl;
847 info->pobjver = objver;
848 info->on_reg_commit = onfinish;
849
850 shunique_lock sul(rwlock, ceph::acquire_unique);
851 _linger_submit(info, sul);
852 logger->inc(l_osdc_linger_active);
853
854 return info->linger_id;
855 }
856
857 void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
858 {
859 assert(sul.owns_lock() && sul.mutex() == &rwlock);
860 assert(info->linger_id);
861
862 // Populate Op::target
863 OSDSession *s = NULL;
864 _calc_target(&info->target, nullptr);
865
866 // Create LingerOp<->OSDSession relation
867 int r = _get_session(info->target.osd, &s, sul);
868 assert(r == 0);
869 OSDSession::unique_lock sl(s->lock);
870 _session_linger_op_assign(s, info);
871 sl.unlock();
872 put_session(s);
873
874 _send_linger(info, sul);
875 }
876
877 struct C_DoWatchNotify : public Context {
878 Objecter *objecter;
879 Objecter::LingerOp *info;
880 MWatchNotify *msg;
881 C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
882 : objecter(o), info(i), msg(m) {
883 info->get();
884 info->_queued_async();
885 msg->get();
886 }
887 void finish(int r) override {
888 objecter->_do_watch_notify(info, msg);
889 }
890 };
891
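// Dispatch an incoming watch/notify message.  The cookie is trusted only if
// it is still present in linger_ops_set.  Disconnect events record last_error
// and queue the error callback; notify completions are finished inline
// (ignoring duplicates caused by reconnect races); watch notifies are queued
// to the finisher via C_DoWatchNotify.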
892 void Objecter::handle_watch_notify(MWatchNotify *m)
893 {
894 shared_lock l(rwlock);
895 if (!initialized) {
896 return;
897 }
898
899 LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
900 if (linger_ops_set.count(info) == 0) {
901 ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
902 return;
903 }
904 LingerOp::unique_lock wl(info->watch_lock);
905 if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
906 if (!info->last_error) {
907 info->last_error = -ENOTCONN;
908 if (info->watch_context) {
909 finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
910 }
911 }
912 } else if (!info->is_watch) {
913 // we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline
914 // since we know the only user (librados) is safe to call in
915 // fast-dispatch context
916 if (info->notify_id &&
917 info->notify_id != m->notify_id) {
918 ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
919 << " != " << info->notify_id << ", ignoring" << dendl;
920 } else if (info->on_notify_finish) {
921 info->notify_result_bl->claim(m->get_data());
922 info->on_notify_finish->complete(m->return_code);
923
924 // if we race with reconnect we might get a second notify; only
925 // notify the caller once!
926 info->on_notify_finish = NULL;
927 }
928 } else {
929 finisher->queue(new C_DoWatchNotify(this, info, m));
930 }
931 }
932
933 void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
934 {
935 ldout(cct, 10) << __func__ << " " << *m << dendl;
936
937 shared_lock l(rwlock);
938 assert(initialized);
939
940 if (info->canceled) {
941 l.unlock();
942 goto out;
943 }
944
945 // notify completion?
946 assert(info->is_watch);
947 assert(info->watch_context);
948 assert(m->opcode != CEPH_WATCH_EVENT_DISCONNECT);
949
950 l.unlock();
951
952 switch (m->opcode) {
953 case CEPH_WATCH_EVENT_NOTIFY:
954 info->watch_context->handle_notify(m->notify_id, m->cookie,
955 m->notifier_gid, m->bl);
956 break;
957 }
958
959 out:
960 info->finished_async();
961 info->put();
962 m->put();
963 }
964
965 bool Objecter::ms_dispatch(Message *m)
966 {
967 ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
968 if (!initialized)
969 return false;
970
971 switch (m->get_type()) {
972   // these we exclusively handle
973 case CEPH_MSG_OSD_OPREPLY:
974 handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
975 return true;
976
977 case CEPH_MSG_OSD_BACKOFF:
978 handle_osd_backoff(static_cast<MOSDBackoff*>(m));
979 return true;
980
981 case CEPH_MSG_WATCH_NOTIFY:
982 handle_watch_notify(static_cast<MWatchNotify*>(m));
983 m->put();
984 return true;
985
986 case MSG_COMMAND_REPLY:
987 if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
988 handle_command_reply(static_cast<MCommandReply*>(m));
989 return true;
990 } else {
991 return false;
992 }
993
994 case MSG_GETPOOLSTATSREPLY:
995 handle_get_pool_stats_reply(static_cast<MGetPoolStatsReply*>(m));
996 return true;
997
998 case CEPH_MSG_POOLOP_REPLY:
999 handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
1000 return true;
1001
1002 case CEPH_MSG_STATFS_REPLY:
1003 handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
1004 return true;
1005
1006 // these we give others a chance to inspect
1007
1008 // MDS, OSD
1009 case CEPH_MSG_OSD_MAP:
1010 handle_osd_map(static_cast<MOSDMap*>(m));
1011 return false;
1012 }
1013 return false;
1014 }
1015
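// After an osdmap change, walk one session's linger ops, regular ops and
// command ops, recomputing each target.  Ops whose mapping changed (or that
// must be resent because a map was skipped or the pool/cluster is full) are
// collected into the need_resend* containers; ops whose pool appears to be
// gone are routed through the pool-DNE / map-DNE checks.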
1016 void Objecter::_scan_requests(OSDSession *s,
1017 bool force_resend,
1018 bool cluster_full,
1019 map<int64_t, bool> *pool_full_map,
1020 map<ceph_tid_t, Op*>& need_resend,
1021 list<LingerOp*>& need_resend_linger,
1022 map<ceph_tid_t, CommandOp*>& need_resend_command,
1023 shunique_lock& sul)
1024 {
1025 assert(sul.owns_lock() && sul.mutex() == &rwlock);
1026
1027 list<LingerOp*> unregister_lingers;
1028
1029 OSDSession::unique_lock sl(s->lock);
1030
1031 // check for changed linger mappings (_before_ regular ops)
1032 map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
1033 while (lp != s->linger_ops.end()) {
1034 LingerOp *op = lp->second;
1035 assert(op->session == s);
1036 // check_linger_pool_dne() may touch linger_ops; prevent iterator
1037 // invalidation
1038 ++lp;
1039 ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
1040 bool unregister, force_resend_writes = cluster_full;
1041 int r = _recalc_linger_op_target(op, sul);
1042 if (pool_full_map)
1043 force_resend_writes = force_resend_writes ||
1044 (*pool_full_map)[op->target.base_oloc.pool];
1045 switch (r) {
1046 case RECALC_OP_TARGET_NO_ACTION:
1047 if (!force_resend && !force_resend_writes)
1048 break;
1049 // -- fall-thru --
1050 case RECALC_OP_TARGET_NEED_RESEND:
1051 need_resend_linger.push_back(op);
1052 _linger_cancel_map_check(op);
1053 break;
1054 case RECALC_OP_TARGET_POOL_DNE:
1055 _check_linger_pool_dne(op, &unregister);
1056 if (unregister) {
1057 ldout(cct, 10) << " need to unregister linger op "
1058 << op->linger_id << dendl;
1059 op->get();
1060 unregister_lingers.push_back(op);
1061 }
1062 break;
1063 }
1064 }
1065
1066 // check for changed request mappings
1067 map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
1068 while (p != s->ops.end()) {
1069 Op *op = p->second;
1070 ++p; // check_op_pool_dne() may touch ops; prevent iterator invalidation
1071 ldout(cct, 10) << " checking op " << op->tid << dendl;
1072 bool force_resend_writes = cluster_full;
1073 if (pool_full_map)
1074 force_resend_writes = force_resend_writes ||
1075 (*pool_full_map)[op->target.base_oloc.pool];
1076 int r = _calc_target(&op->target,
1077 op->session ? op->session->con.get() : nullptr);
1078 switch (r) {
1079 case RECALC_OP_TARGET_NO_ACTION:
1080 if (!force_resend && !(force_resend_writes && op->respects_full()))
1081 break;
1082 // -- fall-thru --
1083 case RECALC_OP_TARGET_NEED_RESEND:
1084 if (op->session) {
1085 _session_op_remove(op->session, op);
1086 }
1087 need_resend[op->tid] = op;
1088 _op_cancel_map_check(op);
1089 break;
1090 case RECALC_OP_TARGET_POOL_DNE:
1091 _check_op_pool_dne(op, &sl);
1092 break;
1093 }
1094 }
1095
1096 // commands
1097 map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
1098 while (cp != s->command_ops.end()) {
1099 CommandOp *c = cp->second;
1100 ++cp;
1101 ldout(cct, 10) << " checking command " << c->tid << dendl;
1102 bool force_resend_writes = cluster_full;
1103 if (pool_full_map)
1104 force_resend_writes = force_resend_writes ||
1105 (*pool_full_map)[c->target_pg.pool()];
1106 int r = _calc_command_target(c, sul);
1107 switch (r) {
1108 case RECALC_OP_TARGET_NO_ACTION:
1109 // resend if skipped map; otherwise do nothing.
1110 if (!force_resend && !force_resend_writes)
1111 break;
1112 // -- fall-thru --
1113 case RECALC_OP_TARGET_NEED_RESEND:
1114 need_resend_command[c->tid] = c;
1115 if (c->session) {
1116 _session_command_op_remove(c->session, c);
1117 }
1118 _command_cancel_map_check(c);
1119 break;
1120 case RECALC_OP_TARGET_POOL_DNE:
1121 case RECALC_OP_TARGET_OSD_DNE:
1122 case RECALC_OP_TARGET_OSD_DOWN:
1123 _check_command_map_dne(c);
1124 break;
1125 }
1126 }
1127
1128 sl.unlock();
1129
1130 for (list<LingerOp*>::iterator iter = unregister_lingers.begin();
1131 iter != unregister_lingers.end();
1132 ++iter) {
1133 _linger_cancel(*iter);
1134 (*iter)->put();
1135 }
1136 }
1137
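// Process an incoming MOSDMap: apply incremental or full maps epoch by epoch
// (requesting any that are missing), rescan every session for ops that need
// to be resent or whose pool disappeared, close sessions to OSDs that went
// down or changed address, resend what needs resending, and complete any
// Contexts that were waiting for an epoch we now have.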
1138 void Objecter::handle_osd_map(MOSDMap *m)
1139 {
1140 shunique_lock sul(rwlock, acquire_unique);
1141 if (!initialized)
1142 return;
1143
1144 assert(osdmap);
1145
1146 if (m->fsid != monc->get_fsid()) {
1147 ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
1148 << " != " << monc->get_fsid() << dendl;
1149 return;
1150 }
1151
1152 bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1153 bool cluster_full = _osdmap_full_flag();
1154 bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full ||
1155 _osdmap_has_pool_full();
1156 map<int64_t, bool> pool_full_map;
1157 for (map<int64_t, pg_pool_t>::const_iterator it
1158 = osdmap->get_pools().begin();
1159 it != osdmap->get_pools().end(); ++it)
1160 pool_full_map[it->first] = _osdmap_pool_full(it->second);
1161
1162
1163 list<LingerOp*> need_resend_linger;
1164 map<ceph_tid_t, Op*> need_resend;
1165 map<ceph_tid_t, CommandOp*> need_resend_command;
1166
1167 if (m->get_last() <= osdmap->get_epoch()) {
1168 ldout(cct, 3) << "handle_osd_map ignoring epochs ["
1169 << m->get_first() << "," << m->get_last()
1170 << "] <= " << osdmap->get_epoch() << dendl;
1171 } else {
1172 ldout(cct, 3) << "handle_osd_map got epochs ["
1173 << m->get_first() << "," << m->get_last()
1174 << "] > " << osdmap->get_epoch() << dendl;
1175
1176 if (osdmap->get_epoch()) {
1177 bool skipped_map = false;
1178 // we want incrementals
1179 for (epoch_t e = osdmap->get_epoch() + 1;
1180 e <= m->get_last();
1181 e++) {
1182
1183 if (osdmap->get_epoch() == e-1 &&
1184 m->incremental_maps.count(e)) {
1185 ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
1186 << dendl;
1187 OSDMap::Incremental inc(m->incremental_maps[e]);
1188 osdmap->apply_incremental(inc);
1189
1190 emit_blacklist_events(inc);
1191
1192 logger->inc(l_osdc_map_inc);
1193 }
1194 else if (m->maps.count(e)) {
1195 ldout(cct, 3) << "handle_osd_map decoding full epoch " << e << dendl;
1196 OSDMap *new_osdmap = new OSDMap();
1197 new_osdmap->decode(m->maps[e]);
1198
1199 emit_blacklist_events(*osdmap, *new_osdmap);
1200
1201 osdmap = new_osdmap;
1202
1203 logger->inc(l_osdc_map_full);
1204 }
1205 else {
1206 if (e >= m->get_oldest()) {
1207 ldout(cct, 3) << "handle_osd_map requesting missing epoch "
1208 << osdmap->get_epoch()+1 << dendl;
1209 _maybe_request_map();
1210 break;
1211 }
1212 ldout(cct, 3) << "handle_osd_map missing epoch "
1213 << osdmap->get_epoch()+1
1214 << ", jumping to " << m->get_oldest() << dendl;
1215 e = m->get_oldest() - 1;
1216 skipped_map = true;
1217 continue;
1218 }
1219 logger->set(l_osdc_map_epoch, osdmap->get_epoch());
1220
1221 cluster_full = cluster_full || _osdmap_full_flag();
1222 update_pool_full_map(pool_full_map);
1223
1224 // check all outstanding requests on every epoch
1225 _scan_requests(homeless_session, skipped_map, cluster_full,
1226 &pool_full_map, need_resend,
1227 need_resend_linger, need_resend_command, sul);
1228 for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1229 p != osd_sessions.end(); ) {
1230 OSDSession *s = p->second;
1231 _scan_requests(s, skipped_map, cluster_full,
1232 &pool_full_map, need_resend,
1233 need_resend_linger, need_resend_command, sul);
1234 ++p;
1235 // osd down or addr change?
1236 if (!osdmap->is_up(s->osd) ||
1237 (s->con &&
1238 s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
1239 close_session(s);
1240 }
1241 }
1242
1243 assert(e == osdmap->get_epoch());
1244 }
1245
1246 } else {
1247 // first map. we want the full thing.
1248 if (m->maps.count(m->get_last())) {
1249 for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
1250 p != osd_sessions.end(); ++p) {
1251 OSDSession *s = p->second;
1252 _scan_requests(s, false, false, NULL, need_resend,
1253 need_resend_linger, need_resend_command, sul);
1254 }
1255 ldout(cct, 3) << "handle_osd_map decoding full epoch "
1256 << m->get_last() << dendl;
1257 osdmap->decode(m->maps[m->get_last()]);
1258
1259 _scan_requests(homeless_session, false, false, NULL,
1260 need_resend, need_resend_linger,
1261 need_resend_command, sul);
1262 } else {
1263 ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
1264 << dendl;
1265 monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
1266 monc->renew_subs();
1267 }
1268 }
1269 }
1270
1271 // make sure need_resend targets reflect latest map
1272 for (auto p = need_resend.begin(); p != need_resend.end(); ) {
1273 Op *op = p->second;
1274 if (op->target.epoch < osdmap->get_epoch()) {
1275 ldout(cct, 10) << __func__ << " checking op " << p->first << dendl;
1276 int r = _calc_target(&op->target, nullptr);
1277 if (r == RECALC_OP_TARGET_POOL_DNE) {
1278 p = need_resend.erase(p);
1279 _check_op_pool_dne(op, nullptr);
1280 } else {
1281 ++p;
1282 }
1283 } else {
1284 ++p;
1285 }
1286 }
1287
1288 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
1289 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag()
1290 || _osdmap_has_pool_full();
1291
1292 // was/is paused?
1293 if (was_pauserd || was_pausewr || pauserd || pausewr ||
1294 osdmap->get_epoch() < epoch_barrier) {
1295 _maybe_request_map();
1296 }
1297
1298 // resend requests
1299 for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
1300 p != need_resend.end(); ++p) {
1301 Op *op = p->second;
1302 OSDSession *s = op->session;
1303 bool mapped_session = false;
1304 if (!s) {
1305 int r = _map_session(&op->target, &s, sul);
1306 assert(r == 0);
1307 mapped_session = true;
1308 } else {
1309 get_session(s);
1310 }
1311 OSDSession::unique_lock sl(s->lock);
1312 if (mapped_session) {
1313 _session_op_assign(s, op);
1314 }
1315 if (op->should_resend) {
1316 if (!op->session->is_homeless() && !op->target.paused) {
1317 logger->inc(l_osdc_op_resend);
1318 _send_op(op);
1319 }
1320 } else {
1321 _op_cancel_map_check(op);
1322 _cancel_linger_op(op);
1323 }
1324 sl.unlock();
1325 put_session(s);
1326 }
1327 for (list<LingerOp*>::iterator p = need_resend_linger.begin();
1328 p != need_resend_linger.end(); ++p) {
1329 LingerOp *op = *p;
1330 if (!op->session) {
1331 _calc_target(&op->target, nullptr);
1332 OSDSession *s = NULL;
1333 const int r = _get_session(op->target.osd, &s, sul);
1334 assert(r == 0);
1335 assert(s != NULL);
1336 op->session = s;
1337 put_session(s);
1338 }
1339 if (!op->session->is_homeless()) {
1340 logger->inc(l_osdc_linger_resend);
1341 _send_linger(op, sul);
1342 }
1343 }
1344 for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
1345 p != need_resend_command.end(); ++p) {
1346 CommandOp *c = p->second;
1347 if (c->target.osd >= 0) {
1348 _assign_command_session(c, sul);
1349 if (c->session && !c->session->is_homeless()) {
1350 _send_command(c);
1351 }
1352 }
1353 }
1354
1355 _dump_active();
1356
1357 // finish any Contexts that were waiting on a map update
1358 map<epoch_t,list< pair< Context*, int > > >::iterator p =
1359 waiting_for_map.begin();
1360 while (p != waiting_for_map.end() &&
1361 p->first <= osdmap->get_epoch()) {
1362     // go through the list and call the onfinish methods
1363 for (list<pair<Context*, int> >::iterator i = p->second.begin();
1364 i != p->second.end(); ++i) {
1365 i->first->complete(i->second);
1366 }
1367 waiting_for_map.erase(p++);
1368 }
1369
1370 monc->sub_got("osdmap", osdmap->get_epoch());
1371
1372 if (!waiting_for_map.empty()) {
1373 _maybe_request_map();
1374 }
1375 }
1376
1377 void Objecter::enable_blacklist_events()
1378 {
1379 unique_lock wl(rwlock);
1380
1381 blacklist_events_enabled = true;
1382 }
1383
1384 void Objecter::consume_blacklist_events(std::set<entity_addr_t> *events)
1385 {
1386 unique_lock wl(rwlock);
1387
1388 if (events->empty()) {
1389 events->swap(blacklist_events);
1390 } else {
1391 for (const auto &i : blacklist_events) {
1392 events->insert(i);
1393 }
1394 blacklist_events.clear();
1395 }
1396 }
1397
1398 void Objecter::emit_blacklist_events(const OSDMap::Incremental &inc)
1399 {
1400 if (!blacklist_events_enabled) {
1401 return;
1402 }
1403
1404 for (const auto &i : inc.new_blacklist) {
1405 blacklist_events.insert(i.first);
1406 }
1407 }
1408
1409 void Objecter::emit_blacklist_events(const OSDMap &old_osd_map,
1410 const OSDMap &new_osd_map)
1411 {
1412 if (!blacklist_events_enabled) {
1413 return;
1414 }
1415
1416 std::set<entity_addr_t> old_set;
1417 std::set<entity_addr_t> new_set;
1418
1419 old_osd_map.get_blacklist(&old_set);
1420 new_osd_map.get_blacklist(&new_set);
1421
1422 std::set<entity_addr_t> delta_set;
1423 std::set_difference(
1424 new_set.begin(), new_set.end(), old_set.begin(), old_set.end(),
1425 std::inserter(delta_set, delta_set.begin()));
1426 blacklist_events.insert(delta_set.begin(), delta_set.end());
1427 }
1428
1429 // op pool check
1430
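// Completion for the "latest osdmap version" query issued when an op's pool
// cannot be found.  The reported latest epoch becomes the op's map_dne_bound;
// once the local map reaches that epoch without the pool reappearing,
// _check_op_pool_dne() concludes the pool does not exist and fails the op
// with -ENOENT.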
1431 void Objecter::C_Op_Map_Latest::finish(int r)
1432 {
1433 if (r == -EAGAIN || r == -ECANCELED)
1434 return;
1435
1436 lgeneric_subdout(objecter->cct, objecter, 10)
1437 << "op_map_latest r=" << r << " tid=" << tid
1438 << " latest " << latest << dendl;
1439
1440 Objecter::unique_lock wl(objecter->rwlock);
1441
1442 map<ceph_tid_t, Op*>::iterator iter =
1443 objecter->check_latest_map_ops.find(tid);
1444 if (iter == objecter->check_latest_map_ops.end()) {
1445 lgeneric_subdout(objecter->cct, objecter, 10)
1446 << "op_map_latest op "<< tid << " not found" << dendl;
1447 return;
1448 }
1449
1450 Op *op = iter->second;
1451 objecter->check_latest_map_ops.erase(iter);
1452
1453 lgeneric_subdout(objecter->cct, objecter, 20)
1454 << "op_map_latest op "<< op << dendl;
1455
1456 if (op->map_dne_bound == 0)
1457 op->map_dne_bound = latest;
1458
1459 OSDSession::unique_lock sl(op->session->lock, defer_lock);
1460 objecter->_check_op_pool_dne(op, &sl);
1461
1462 op->put();
1463 }
1464
1465 int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name,
1466 snapid_t *snap) const
1467 {
1468 shared_lock rl(rwlock);
1469
1470 auto& pools = osdmap->get_pools();
1471 auto iter = pools.find(poolid);
1472 if (iter == pools.end()) {
1473 return -ENOENT;
1474 }
1475 const pg_pool_t& pg_pool = iter->second;
1476 for (auto p = pg_pool.snaps.begin();
1477 p != pg_pool.snaps.end();
1478 ++p) {
1479 if (p->second.name == snap_name) {
1480 *snap = p->first;
1481 return 0;
1482 }
1483 }
1484 return -ENOENT;
1485 }
1486
1487 int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap,
1488 pool_snap_info_t *info) const
1489 {
1490 shared_lock rl(rwlock);
1491
1492 auto& pools = osdmap->get_pools();
1493 auto iter = pools.find(poolid);
1494 if (iter == pools.end()) {
1495 return -ENOENT;
1496 }
1497 const pg_pool_t& pg_pool = iter->second;
1498 auto p = pg_pool.snaps.find(snap);
1499 if (p == pg_pool.snaps.end())
1500 return -ENOENT;
1501 *info = p->second;
1502
1503 return 0;
1504 }
1505
1506 int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
1507 {
1508 shared_lock rl(rwlock);
1509
1510 const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
1511 if (!pi)
1512 return -ENOENT;
1513 for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
1514 p != pi->snaps.end();
1515 ++p) {
1516 snaps->push_back(p->first);
1517 }
1518 return 0;
1519 }
1520
1521 // sl may be unlocked.
1522 void Objecter::_check_op_pool_dne(Op *op, unique_lock *sl)
1523 {
1524 // rwlock is locked unique
1525
1526 if (op->target.pool_ever_existed) {
1527 // the pool previously existed and now it does not, which means it
1528 // was deleted.
1529 op->map_dne_bound = osdmap->get_epoch();
1530 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1531 << " pool previously exists but now does not"
1532 << dendl;
1533 } else {
1534 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1535 << " current " << osdmap->get_epoch()
1536 << " map_dne_bound " << op->map_dne_bound
1537 << dendl;
1538 }
1539 if (op->map_dne_bound > 0) {
1540 if (osdmap->get_epoch() >= op->map_dne_bound) {
1541 // we had a new enough map
1542 ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
1543 << " concluding pool " << op->target.base_pgid.pool()
1544 << " dne" << dendl;
1545 if (op->onfinish) {
1546 op->onfinish->complete(-ENOENT);
1547 }
1548
1549 OSDSession *s = op->session;
1550 if (s) {
1551 assert(s != NULL);
1552 assert(sl->mutex() == &s->lock);
1553 bool session_locked = sl->owns_lock();
1554 if (!session_locked) {
1555 sl->lock();
1556 }
1557 _finish_op(op, 0);
1558 if (!session_locked) {
1559 sl->unlock();
1560 }
1561 } else {
1562 _finish_op(op, 0); // no session
1563 }
1564 }
1565 } else {
1566 _send_op_map_check(op);
1567 }
1568 }
1569
1570 void Objecter::_send_op_map_check(Op *op)
1571 {
1572 // rwlock is locked unique
1573 // ask the monitor
1574 if (check_latest_map_ops.count(op->tid) == 0) {
1575 op->get();
1576 check_latest_map_ops[op->tid] = op;
1577 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, op->tid);
1578 monc->get_version("osdmap", &c->latest, NULL, c);
1579 }
1580 }
1581
1582 void Objecter::_op_cancel_map_check(Op *op)
1583 {
1584 // rwlock is locked unique
1585 map<ceph_tid_t, Op*>::iterator iter =
1586 check_latest_map_ops.find(op->tid);
1587 if (iter != check_latest_map_ops.end()) {
1588 Op *op = iter->second;
1589 op->put();
1590 check_latest_map_ops.erase(iter);
1591 }
1592 }
1593
1594 // linger pool check
1595
1596 void Objecter::C_Linger_Map_Latest::finish(int r)
1597 {
1598 if (r == -EAGAIN || r == -ECANCELED) {
1599 // ignore callback; we will retry in resend_mon_ops()
1600 return;
1601 }
1602
1603 unique_lock wl(objecter->rwlock);
1604
1605 map<uint64_t, LingerOp*>::iterator iter =
1606 objecter->check_latest_map_lingers.find(linger_id);
1607 if (iter == objecter->check_latest_map_lingers.end()) {
1608 return;
1609 }
1610
1611 LingerOp *op = iter->second;
1612 objecter->check_latest_map_lingers.erase(iter);
1613
1614 if (op->map_dne_bound == 0)
1615 op->map_dne_bound = latest;
1616
1617 bool unregister;
1618 objecter->_check_linger_pool_dne(op, &unregister);
1619
1620 if (unregister) {
1621 objecter->_linger_cancel(op);
1622 }
1623
1624 op->put();
1625 }
1626
1627 void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
1628 {
1629 // rwlock is locked unique
1630
1631 *need_unregister = false;
1632
1633 if (op->register_gen > 0) {
1634 ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1635 << " pool previously existed but now does not"
1636 << dendl;
1637 op->map_dne_bound = osdmap->get_epoch();
1638 } else {
1639 ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
1640 << " current " << osdmap->get_epoch()
1641 << " map_dne_bound " << op->map_dne_bound
1642 << dendl;
1643 }
1644 if (op->map_dne_bound > 0) {
1645 if (osdmap->get_epoch() >= op->map_dne_bound) {
1646 LingerOp::unique_lock wl{op->watch_lock};
1647 if (op->on_reg_commit) {
1648 op->on_reg_commit->complete(-ENOENT);
1649 op->on_reg_commit = nullptr;
1650 }
1651 if (op->on_notify_finish) {
1652 op->on_notify_finish->complete(-ENOENT);
1653 op->on_notify_finish = nullptr;
1654 }
1655 *need_unregister = true;
1656 }
1657 } else {
1658 _send_linger_map_check(op);
1659 }
1660 }
1661
1662 void Objecter::_send_linger_map_check(LingerOp *op)
1663 {
1664 // ask the monitor
1665 if (check_latest_map_lingers.count(op->linger_id) == 0) {
1666 op->get();
1667 check_latest_map_lingers[op->linger_id] = op;
1668 C_Linger_Map_Latest *c = new C_Linger_Map_Latest(this, op->linger_id);
1669 monc->get_version("osdmap", &c->latest, NULL, c);
1670 }
1671 }
1672
1673 void Objecter::_linger_cancel_map_check(LingerOp *op)
1674 {
1675 // rwlock is locked unique
1676
1677 map<uint64_t, LingerOp*>::iterator iter =
1678 check_latest_map_lingers.find(op->linger_id);
1679 if (iter != check_latest_map_lingers.end()) {
1680 LingerOp *op = iter->second;
1681 op->put();
1682 check_latest_map_lingers.erase(iter);
1683 }
1684 }
1685
1686 // command pool check
1687
1688 void Objecter::C_Command_Map_Latest::finish(int r)
1689 {
1690 if (r == -EAGAIN || r == -ECANCELED) {
1691 // ignore callback; we will retry in resend_mon_ops()
1692 return;
1693 }
1694
1695 unique_lock wl(objecter->rwlock);
1696
1697 map<uint64_t, CommandOp*>::iterator iter =
1698 objecter->check_latest_map_commands.find(tid);
1699 if (iter == objecter->check_latest_map_commands.end()) {
1700 return;
1701 }
1702
1703 CommandOp *c = iter->second;
1704 objecter->check_latest_map_commands.erase(iter);
1705
1706 if (c->map_dne_bound == 0)
1707 c->map_dne_bound = latest;
1708
1709 OSDSession::unique_lock sul(c->session->lock);
1710 objecter->_check_command_map_dne(c);
1711 sul.unlock();
1712
1713 c->put();
1714 }
1715
1716 void Objecter::_check_command_map_dne(CommandOp *c)
1717 {
1718 // rwlock is locked unique
1719 // session is locked unique
1720
1721 ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
1722 << " current " << osdmap->get_epoch()
1723 << " map_dne_bound " << c->map_dne_bound
1724 << dendl;
1725 if (c->map_dne_bound > 0) {
1726 if (osdmap->get_epoch() >= c->map_dne_bound) {
1727 _finish_command(c, c->map_check_error, c->map_check_error_str);
1728 }
1729 } else {
1730 _send_command_map_check(c);
1731 }
1732 }
1733
1734 void Objecter::_send_command_map_check(CommandOp *c)
1735 {
1736 // rwlock is locked unique
1737 // session is locked unique
1738
1739 // ask the monitor
1740 if (check_latest_map_commands.count(c->tid) == 0) {
1741 c->get();
1742 check_latest_map_commands[c->tid] = c;
1743 C_Command_Map_Latest *f = new C_Command_Map_Latest(this, c->tid);
1744 monc->get_version("osdmap", &f->latest, NULL, f);
1745 }
1746 }
1747
1748 void Objecter::_command_cancel_map_check(CommandOp *c)
1749 {
1750   // rwlock is locked unique
1751
1752 map<uint64_t, CommandOp*>::iterator iter =
1753 check_latest_map_commands.find(c->tid);
1754 if (iter != check_latest_map_commands.end()) {
1755 CommandOp *c = iter->second;
1756 c->put();
1757 check_latest_map_commands.erase(iter);
1758 }
1759 }
1760
1761
1762 /**
1763 * Look up OSDSession by OSD id.
1764 *
1765 * @returns 0 on success, or -EAGAIN if the lock context requires
1766 * promotion to write.
1767 */
1768 int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
1769 {
1770 assert(sul && sul.mutex() == &rwlock);
1771
1772 if (osd < 0) {
1773 *session = homeless_session;
1774 ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless"
1775 << dendl;
1776 return 0;
1777 }
1778
1779 map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
1780 if (p != osd_sessions.end()) {
1781 OSDSession *s = p->second;
1782 s->get();
1783 *session = s;
1784 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1785 << s->get_nref() << dendl;
1786 return 0;
1787 }
1788 if (!sul.owns_lock()) {
1789 return -EAGAIN;
1790 }
1791 OSDSession *s = new OSDSession(cct, osd);
1792 osd_sessions[osd] = s;
1793 s->con = messenger->get_connection(osdmap->get_inst(osd));
1794 s->con->set_priv(s->get());
1795 logger->inc(l_osdc_osd_session_open);
1796 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1797 s->get();
1798 *session = s;
1799 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " "
1800 << s->get_nref() << dendl;
1801 return 0;
1802 }
1803
1804 void Objecter::put_session(Objecter::OSDSession *s)
1805 {
1806 if (s && !s->is_homeless()) {
1807 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1808 << s->get_nref() << dendl;
1809 s->put();
1810 }
1811 }
1812
1813 void Objecter::get_session(Objecter::OSDSession *s)
1814 {
1815 assert(s != NULL);
1816
1817 if (!s->is_homeless()) {
1818 ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " "
1819 << s->get_nref() << dendl;
1820 s->get();
1821 }
1822 }
1823
1824 void Objecter::_reopen_session(OSDSession *s)
1825 {
1826 // rwlock is locked unique
1827 // s->lock is locked
1828
1829 entity_inst_t inst = osdmap->get_inst(s->osd);
1830 ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
1831 << inst << dendl;
1832 if (s->con) {
1833 s->con->set_priv(NULL);
1834 s->con->mark_down();
1835 logger->inc(l_osdc_osd_session_close);
1836 }
1837 s->con = messenger->get_connection(inst);
1838 s->con->set_priv(s->get());
1839 s->incarnation++;
1840 logger->inc(l_osdc_osd_session_open);
1841 }
1842
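// Tear down a session: mark its connection down, detach every linger op,
// regular op and command op from it, and reattach them to the homeless
// session so they can be re-targeted once a usable OSD mapping appears.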
1843 void Objecter::close_session(OSDSession *s)
1844 {
1845 // rwlock is locked unique
1846
1847 ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
1848 if (s->con) {
1849 s->con->set_priv(NULL);
1850 s->con->mark_down();
1851 logger->inc(l_osdc_osd_session_close);
1852 }
1853 OSDSession::unique_lock sl(s->lock);
1854
1855 std::list<LingerOp*> homeless_lingers;
1856 std::list<CommandOp*> homeless_commands;
1857 std::list<Op*> homeless_ops;
1858
1859 while (!s->linger_ops.empty()) {
1860 std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
1861 ldout(cct, 10) << " linger_op " << i->first << dendl;
1862 homeless_lingers.push_back(i->second);
1863 _session_linger_op_remove(s, i->second);
1864 }
1865
1866 while (!s->ops.empty()) {
1867 std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
1868 ldout(cct, 10) << " op " << i->first << dendl;
1869 homeless_ops.push_back(i->second);
1870 _session_op_remove(s, i->second);
1871 }
1872
1873 while (!s->command_ops.empty()) {
1874 std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
1875 ldout(cct, 10) << " command_op " << i->first << dendl;
1876 homeless_commands.push_back(i->second);
1877 _session_command_op_remove(s, i->second);
1878 }
1879
1880 osd_sessions.erase(s->osd);
1881 sl.unlock();
1882 put_session(s);
1883
1884 // Assign any leftover ops to the homeless session
1885 {
1886 OSDSession::unique_lock hsl(homeless_session->lock);
1887 for (std::list<LingerOp*>::iterator i = homeless_lingers.begin();
1888 i != homeless_lingers.end(); ++i) {
1889 _session_linger_op_assign(homeless_session, *i);
1890 }
1891 for (std::list<Op*>::iterator i = homeless_ops.begin();
1892 i != homeless_ops.end(); ++i) {
1893 _session_op_assign(homeless_session, *i);
1894 }
1895 for (std::list<CommandOp*>::iterator i = homeless_commands.begin();
1896 i != homeless_commands.end(); ++i) {
1897 _session_command_op_assign(homeless_session, *i);
1898 }
1899 }
1900
1901 logger->set(l_osdc_osd_sessions, osd_sessions.size());
1902 }
1903
1904 void Objecter::wait_for_osd_map()
1905 {
1906 unique_lock l(rwlock);
1907 if (osdmap->get_epoch()) {
1908 l.unlock();
1909 return;
1910 }
1911
1912 // Leave this since it goes with C_SafeCond
1913 Mutex lock("");
1914 Cond cond;
1915 bool done;
1916 lock.Lock();
1917 C_SafeCond *context = new C_SafeCond(&lock, &cond, &done, NULL);
1918 waiting_for_map[0].push_back(pair<Context*, int>(context, 0));
1919 l.unlock();
1920 while (!done)
1921 cond.Wait(lock);
1922 lock.Unlock();
1923 }
1924
1925 struct C_Objecter_GetVersion : public Context {
1926 Objecter *objecter;
1927 uint64_t oldest, newest;
1928 Context *fin;
1929 C_Objecter_GetVersion(Objecter *o, Context *c)
1930 : objecter(o), oldest(0), newest(0), fin(c) {}
1931 void finish(int r) override {
1932 if (r >= 0) {
1933 objecter->get_latest_version(oldest, newest, fin);
1934 } else if (r == -EAGAIN) { // try again as instructed
1935 objecter->wait_for_latest_osdmap(fin);
1936 } else {
1937 // it doesn't return any other error codes!
1938 ceph_abort();
1939 }
1940 }
1941 };
1942
1943 void Objecter::wait_for_latest_osdmap(Context *fin)
1944 {
1945 ldout(cct, 10) << __func__ << dendl;
1946 C_Objecter_GetVersion *c = new C_Objecter_GetVersion(this, fin);
1947 monc->get_version("osdmap", &c->newest, &c->oldest, c);
1948 }
1949
1950 void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
1951 {
1952 unique_lock wl(rwlock);
1953 _get_latest_version(oldest, newest, fin);
1954 }
1955
1956 void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest,
1957 Context *fin)
1958 {
1959 // rwlock is locked unique
1960 if (osdmap->get_epoch() >= newest) {
1961 ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
1962 if (fin)
1963 fin->complete(0);
1964 return;
1965 }
1966
1967 ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
1968 _wait_for_new_map(fin, newest, 0);
1969 }
1970
1971 void Objecter::maybe_request_map()
1972 {
1973 shared_lock rl(rwlock);
1974 _maybe_request_map();
1975 }
1976
1977 void Objecter::_maybe_request_map()
1978 {
1979 // rwlock is locked
1980 int flag = 0;
1981 if (_osdmap_full_flag()
1982 || osdmap->test_flag(CEPH_OSDMAP_PAUSERD)
1983 || osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
1984 ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next "
1985 "osd map (FULL flag is set)" << dendl;
1986 } else {
1987 ldout(cct, 10)
1988 << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
1989 flag = CEPH_SUBSCRIBE_ONETIME;
1990 }
1991 epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
1992 if (monc->sub_want("osdmap", epoch, flag)) {
1993 monc->renew_subs();
1994 }
1995 }
1996
1997 void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
1998 {
1999 // rwlock is locked unique
2000 waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
2001 _maybe_request_map();
2002 }
2003
2004
2005 /**
2006 * Use this together with wait_for_map: this is a pre-check to avoid
2007 * allocating a Context for wait_for_map if we can see that we
2008 * definitely already have the epoch.
2009 *
2010 * This does *not* replace the need to handle the return value of
2011 * wait_for_map: just because we don't have it in this pre-check
2012 * doesn't mean we won't have it when calling back into wait_for_map,
2013 * since the objecter lock is dropped in between.
2014 */
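// A hedged usage sketch of the pre-check pattern described above
// (C_OnMap and do_work() are hypothetical caller-side names):
//
//   if (objecter->have_map(e)) {
//     do_work();                            // cheap check, no Context needed
//   } else {
//     Context *fin = new C_OnMap(/* ... */);
//     if (objecter->wait_for_map(e, fin)) {
//       delete fin;                         // raced: map arrived in between
//       do_work();
//     }                                     // else fin completes on arrival
//   }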
2015 bool Objecter::have_map(const epoch_t epoch)
2016 {
2017 shared_lock rl(rwlock);
2018 if (osdmap->get_epoch() >= epoch) {
2019 return true;
2020 } else {
2021 return false;
2022 }
2023 }
2024
2025 bool Objecter::wait_for_map(epoch_t epoch, Context *c, int err)
2026 {
2027 unique_lock wl(rwlock);
2028 if (osdmap->get_epoch() >= epoch) {
2029 return true;
2030 }
2031 _wait_for_new_map(c, epoch, err);
2032 return false;
2033 }
2034
2035 void Objecter::kick_requests(OSDSession *session)
2036 {
2037 ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
2038
2039 map<uint64_t, LingerOp *> lresend;
2040 unique_lock wl(rwlock);
2041
2042 OSDSession::unique_lock sl(session->lock);
2043 _kick_requests(session, lresend);
2044 sl.unlock();
2045
2046 _linger_ops_resend(lresend, wl);
2047 }
2048
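// Requeue a session's work after a connection reset: backoffs are dropped,
// resendable non-paused ops are resent in tid order, ops marked
// should_resend=false are cancelled, command ops are resent, and linger ops
// are collected into lresend for the caller to resend once the session lock
// has been released.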
2049 void Objecter::_kick_requests(OSDSession *session,
2050 map<uint64_t, LingerOp *>& lresend)
2051 {
2052 // rwlock is locked unique
2053
2054 // clear backoffs
2055 session->backoffs.clear();
2056 session->backoffs_by_id.clear();
2057
2058 // resend ops
2059 map<ceph_tid_t,Op*> resend; // resend in tid order
2060 for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
2061 p != session->ops.end();) {
2062 Op *op = p->second;
2063 ++p;
2064 logger->inc(l_osdc_op_resend);
2065 if (op->should_resend) {
2066 if (!op->target.paused)
2067 resend[op->tid] = op;
2068 } else {
2069 _op_cancel_map_check(op);
2070 _cancel_linger_op(op);
2071 }
2072 }
2073
2074 while (!resend.empty()) {
2075 _send_op(resend.begin()->second);
2076 resend.erase(resend.begin());
2077 }
2078
2079 // resend lingers
2080 for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin();
2081 j != session->linger_ops.end(); ++j) {
2082 LingerOp *op = j->second;
2083 op->get();
2084 logger->inc(l_osdc_linger_resend);
2085 assert(lresend.count(j->first) == 0);
2086 lresend[j->first] = op;
2087 }
2088
2089 // resend commands
2090 map<uint64_t,CommandOp*> cresend; // resend in order
2091 for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin();
2092 k != session->command_ops.end(); ++k) {
2093 logger->inc(l_osdc_command_resend);
2094 cresend[k->first] = k->second;
2095 }
2096 while (!cresend.empty()) {
2097 _send_command(cresend.begin()->second);
2098 cresend.erase(cresend.begin());
2099 }
2100 }
2101
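// Resend the linger ops collected by _kick_requests: the caller's unique
// lock is temporarily wrapped in the shunique_lock that _send_linger
// expects, cancelled lingers are skipped, and the reference taken in
// _kick_requests is dropped for each entry.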
2102 void Objecter::_linger_ops_resend(map<uint64_t, LingerOp *>& lresend,
2103 unique_lock& ul)
2104 {
2105 assert(ul.owns_lock());
2106 shunique_lock sul(std::move(ul));
2107 while (!lresend.empty()) {
2108 LingerOp *op = lresend.begin()->second;
2109 if (!op->canceled) {
2110 _send_linger(op, sul);
2111 }
2112 op->put();
2113 lresend.erase(lresend.begin());
2114 }
2115 ul = unique_lock(sul.release_to_unique());
2116 }
2117
2118 void Objecter::start_tick()
2119 {
2120 assert(tick_event == 0);
2121 tick_event =
2122 timer.add_event(ceph::make_timespan(cct->_conf->objecter_tick_interval),
2123 &Objecter::tick, this);
2124 }
2125
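// Periodic timer callback: flag laggy ops, ping watch lingers that are
// registered and error-free, ping any OSD serving laggy/linger/command ops
// to detect resets of the lossy reply channel, then reschedule unless we
// raced with shutdown.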
2126 void Objecter::tick()
2127 {
2128 shared_lock rl(rwlock);
2129
2130 ldout(cct, 10) << "tick" << dendl;
2131
2132 // we are only called by C_Tick
2133 tick_event = 0;
2134
2135 if (!initialized) {
2136 // we raced with shutdown
2137 ldout(cct, 10) << __func__ << " raced with shutdown" << dendl;
2138 return;
2139 }
2140
2141 set<OSDSession*> toping;
2142
2143
2144 // look for laggy requests
2145 auto cutoff = ceph::mono_clock::now();
2146 cutoff -= ceph::make_timespan(cct->_conf->objecter_timeout); // timeout
2147
2148 unsigned laggy_ops = 0;
2149
2150 for (map<int,OSDSession*>::iterator siter = osd_sessions.begin();
2151 siter != osd_sessions.end(); ++siter) {
2152 OSDSession *s = siter->second;
2153 OSDSession::lock_guard l(s->lock);
2154 bool found = false;
2155 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
2156 p != s->ops.end();
2157 ++p) {
2158 Op *op = p->second;
2159 assert(op->session);
2160 if (op->stamp < cutoff) {
2161 ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd
2162 << " is laggy" << dendl;
2163 found = true;
2164 ++laggy_ops;
2165 }
2166 }
2167 for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
2168 p != s->linger_ops.end();
2169 ++p) {
2170 LingerOp *op = p->second;
2171 LingerOp::unique_lock wl(op->watch_lock);
2172 assert(op->session);
2173 ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first
2174 << " (osd." << op->session->osd << ")" << dendl;
2175 found = true;
2176 if (op->is_watch && op->registered && !op->last_error)
2177 _send_linger_ping(op);
2178 }
2179 for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
2180 p != s->command_ops.end();
2181 ++p) {
2182 CommandOp *op = p->second;
2183 assert(op->session);
2184 ldout(cct, 10) << " pinging osd that serves command tid " << p->first
2185 << " (osd." << op->session->osd << ")" << dendl;
2186 found = true;
2187 }
2188 if (found)
2189 toping.insert(s);
2190 }
2191 if (num_homeless_ops || !toping.empty()) {
2192 _maybe_request_map();
2193 }
2194
2195 logger->set(l_osdc_op_laggy, laggy_ops);
2196 logger->set(l_osdc_osd_laggy, toping.size());
2197
2198 if (!toping.empty()) {
2199 // send a ping to these osds, to ensure we detect any session resets
2200 // (osd reply message policy is lossy)
2201 for (set<OSDSession*>::const_iterator i = toping.begin();
2202 i != toping.end();
2203 ++i) {
2204 (*i)->con->send_message(new MPing);
2205 }
2206 }
2207
2208 // Make sure we don't reschedule if we wake up after shutdown
2209 if (initialized) {
2210 tick_event = timer.reschedule_me(ceph::make_timespan(
2211 cct->_conf->objecter_tick_interval));
2212 }
2213 }
2214
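// Resubmit every monitor-targeted request (pool stats, statfs, pool ops)
// and re-issue the pending latest-osdmap version checks for ops, lingers
// and commands; presumably invoked after the monitor session is
// re-established.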
2215 void Objecter::resend_mon_ops()
2216 {
2217 unique_lock wl(rwlock);
2218
2219 ldout(cct, 10) << "resend_mon_ops" << dendl;
2220
2221 for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin();
2222 p != poolstat_ops.end();
2223 ++p) {
2224 _poolstat_submit(p->second);
2225 logger->inc(l_osdc_poolstat_resend);
2226 }
2227
2228 for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin();
2229 p != statfs_ops.end();
2230 ++p) {
2231 _fs_stats_submit(p->second);
2232 logger->inc(l_osdc_statfs_resend);
2233 }
2234
2235 for (map<ceph_tid_t,PoolOp*>::iterator p = pool_ops.begin();
2236 p != pool_ops.end();
2237 ++p) {
2238 _pool_op_submit(p->second);
2239 logger->inc(l_osdc_poolop_resend);
2240 }
2241
2242 for (map<ceph_tid_t, Op*>::iterator p = check_latest_map_ops.begin();
2243 p != check_latest_map_ops.end();
2244 ++p) {
2245 C_Op_Map_Latest *c = new C_Op_Map_Latest(this, p->second->tid);
2246 monc->get_version("osdmap", &c->latest, NULL, c);
2247 }
2248
2249 for (map<uint64_t, LingerOp*>::iterator p = check_latest_map_lingers.begin();
2250 p != check_latest_map_lingers.end();
2251 ++p) {
2252 C_Linger_Map_Latest *c
2253 = new C_Linger_Map_Latest(this, p->second->linger_id);
2254 monc->get_version("osdmap", &c->latest, NULL, c);
2255 }
2256
2257 for (map<uint64_t, CommandOp*>::iterator p
2258 = check_latest_map_commands.begin();
2259 p != check_latest_map_commands.end();
2260 ++p) {
2261 C_Command_Map_Latest *c = new C_Command_Map_Latest(this, p->second->tid);
2262 monc->get_version("osdmap", &c->latest, NULL, c);
2263 }
2264 }
2265
2266 // read | write ---------------------------
2267
2268 void Objecter::op_submit(Op *op, ceph_tid_t *ptid, int *ctx_budget)
2269 {
2270 shunique_lock rl(rwlock, ceph::acquire_shared);
2271 ceph_tid_t tid = 0;
2272 if (!ptid)
2273 ptid = &tid;
2274 op->trace.event("op submit");
2275 _op_submit_with_budget(op, rl, ptid, ctx_budget);
2276 }
2277
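// Take throttle budget for the op unless it is already budgeted at the
// context level (taking budget may block and drop rwlock), arm the optional
// osd_timeout timer that cancels the op with -ETIMEDOUT, then hand off to
// _op_submit.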
2278 void Objecter::_op_submit_with_budget(Op *op, shunique_lock& sul,
2279 ceph_tid_t *ptid,
2280 int *ctx_budget)
2281 {
2282 assert(initialized);
2283
2284 assert(op->ops.size() == op->out_bl.size());
2285 assert(op->ops.size() == op->out_rval.size());
2286 assert(op->ops.size() == op->out_handler.size());
2287
2288 // throttle; do this before we look at any state, because
2289 // _take_op_budget() may drop our lock while it blocks.
2290 if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) {
2291 int op_budget = _take_op_budget(op, sul);
2292 // take and pass out the budget for the first OP
2293 // in the context session
2294 if (ctx_budget && (*ctx_budget == -1)) {
2295 *ctx_budget = op_budget;
2296 }
2297 }
2298
2299 if (osd_timeout > timespan(0)) {
2300 if (op->tid == 0)
2301 op->tid = ++last_tid;
2302 auto tid = op->tid;
2303 op->ontimeout = timer.add_event(osd_timeout,
2304 [this, tid]() {
2305 op_cancel(tid, -ETIMEDOUT); });
2306 }
2307
2308 _op_submit(op, sul, ptid);
2309 }
2310
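// Bump perf counters for a newly submitted op: in-flight totals, the
// read/write/rmw/pg-op classification, and one per-opcode counter for each
// OSDOp the request carries.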
2311 void Objecter::_send_op_account(Op *op)
2312 {
2313 inflight_ops++;
2314
2315 // add to gather set(s)
2316 if (op->onfinish) {
2317 num_in_flight++;
2318 } else {
2319 ldout(cct, 20) << " note: not requesting reply" << dendl;
2320 }
2321
2322 logger->inc(l_osdc_op_active);
2323 logger->inc(l_osdc_op);
2324
2325 if ((op->target.flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)) ==
2326 (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
2327 logger->inc(l_osdc_op_rmw);
2328 else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
2329 logger->inc(l_osdc_op_w);
2330 else if (op->target.flags & CEPH_OSD_FLAG_READ)
2331 logger->inc(l_osdc_op_r);
2332
2333 if (op->target.flags & CEPH_OSD_FLAG_PGOP)
2334 logger->inc(l_osdc_op_pg);
2335
2336 for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); ++p) {
2337 int code = l_osdc_osdop_other;
2338 switch (p->op.op) {
2339 case CEPH_OSD_OP_STAT: code = l_osdc_osdop_stat; break;
2340 case CEPH_OSD_OP_CREATE: code = l_osdc_osdop_create; break;
2341 case CEPH_OSD_OP_READ: code = l_osdc_osdop_read; break;
2342 case CEPH_OSD_OP_WRITE: code = l_osdc_osdop_write; break;
2343 case CEPH_OSD_OP_WRITEFULL: code = l_osdc_osdop_writefull; break;
2344 case CEPH_OSD_OP_WRITESAME: code = l_osdc_osdop_writesame; break;
2345 case CEPH_OSD_OP_APPEND: code = l_osdc_osdop_append; break;
2346 case CEPH_OSD_OP_ZERO: code = l_osdc_osdop_zero; break;
2347 case CEPH_OSD_OP_TRUNCATE: code = l_osdc_osdop_truncate; break;
2348 case CEPH_OSD_OP_DELETE: code = l_osdc_osdop_delete; break;
2349 case CEPH_OSD_OP_MAPEXT: code = l_osdc_osdop_mapext; break;
2350 case CEPH_OSD_OP_SPARSE_READ: code = l_osdc_osdop_sparse_read; break;
2351 case CEPH_OSD_OP_GETXATTR: code = l_osdc_osdop_getxattr; break;
2352 case CEPH_OSD_OP_SETXATTR: code = l_osdc_osdop_setxattr; break;
2353 case CEPH_OSD_OP_CMPXATTR: code = l_osdc_osdop_cmpxattr; break;
2354 case CEPH_OSD_OP_RMXATTR: code = l_osdc_osdop_rmxattr; break;
2355 case CEPH_OSD_OP_RESETXATTRS: code = l_osdc_osdop_resetxattrs; break;
2356 case CEPH_OSD_OP_TMAPUP: code = l_osdc_osdop_tmap_up; break;
2357 case CEPH_OSD_OP_TMAPPUT: code = l_osdc_osdop_tmap_put; break;
2358 case CEPH_OSD_OP_TMAPGET: code = l_osdc_osdop_tmap_get; break;
2359
2360 // OMAP read operations
2361 case CEPH_OSD_OP_OMAPGETVALS:
2362 case CEPH_OSD_OP_OMAPGETKEYS:
2363 case CEPH_OSD_OP_OMAPGETHEADER:
2364 case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
2365 case CEPH_OSD_OP_OMAP_CMP: code = l_osdc_osdop_omap_rd; break;
2366
2367 // OMAP write operations
2368 case CEPH_OSD_OP_OMAPSETVALS:
2369 case CEPH_OSD_OP_OMAPSETHEADER: code = l_osdc_osdop_omap_wr; break;
2370
2371 // OMAP del operations
2372 case CEPH_OSD_OP_OMAPCLEAR:
2373 case CEPH_OSD_OP_OMAPRMKEYS: code = l_osdc_osdop_omap_del; break;
2374
2375 case CEPH_OSD_OP_CALL: code = l_osdc_osdop_call; break;
2376 case CEPH_OSD_OP_WATCH: code = l_osdc_osdop_watch; break;
2377 case CEPH_OSD_OP_NOTIFY: code = l_osdc_osdop_notify; break;
2378 }
2379 if (code)
2380 logger->inc(code);
2381 }
2382 }
2383
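// Core submit path: compute the target PG/OSD, obtain a session (retrying
// with the lock held unique if the osdmap changed underneath us), decide
// whether the op must stay paused (epoch barrier, PAUSERD/PAUSEWR, full
// pool or cluster), assign a tid and session, and send it if the target
// OSD has a usable (non-homeless) session.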
2384 void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
2385 {
2386 // rwlock is locked
2387
2388 ldout(cct, 10) << __func__ << " op " << op << dendl;
2389
2390 // pick target
2391 assert(op->session == NULL);
2392 OSDSession *s = NULL;
2393
2394 bool check_for_latest_map = _calc_target(&op->target, nullptr)
2395 == RECALC_OP_TARGET_POOL_DNE;
2396
2397 // Try to get a session, including a retry if we need to take the write lock
2398 int r = _get_session(op->target.osd, &s, sul);
2399 if (r == -EAGAIN ||
2400 (check_for_latest_map && sul.owns_lock_shared())) {
2401 epoch_t orig_epoch = osdmap->get_epoch();
2402 sul.unlock();
2403 if (cct->_conf->objecter_debug_inject_relock_delay) {
2404 sleep(1);
2405 }
2406 sul.lock();
2407 if (orig_epoch != osdmap->get_epoch()) {
2408 // map changed; recalculate mapping
2409 ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
2410 << dendl;
2411 check_for_latest_map = _calc_target(&op->target, nullptr)
2412 == RECALC_OP_TARGET_POOL_DNE;
2413 if (s) {
2414 put_session(s);
2415 s = NULL;
2416 r = -EAGAIN;
2417 }
2418 }
2419 }
2420 if (r == -EAGAIN) {
2421 assert(s == NULL);
2422 r = _get_session(op->target.osd, &s, sul);
2423 }
2424 assert(r == 0);
2425 assert(s); // may be homeless
2426
2427 _send_op_account(op);
2428
2429 // send?
2430
2431 assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
2432
2433 if (osdmap_full_try) {
2434 op->target.flags |= CEPH_OSD_FLAG_FULL_TRY;
2435 }
2436
2437 bool need_send = false;
2438
2439 if (osdmap->get_epoch() < epoch_barrier) {
2440 ldout(cct, 10) << " barrier, paused " << op << " tid " << op->tid
2441 << dendl;
2442 op->target.paused = true;
2443 _maybe_request_map();
2444 } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
2445 osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
2446 ldout(cct, 10) << " paused modify " << op << " tid " << op->tid
2447 << dendl;
2448 op->target.paused = true;
2449 _maybe_request_map();
2450 } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
2451 osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
2452 ldout(cct, 10) << " paused read " << op << " tid " << op->tid
2453 << dendl;
2454 op->target.paused = true;
2455 _maybe_request_map();
2456 } else if (op->respects_full() &&
2457 (_osdmap_full_flag() ||
2458 _osdmap_pool_full(op->target.base_oloc.pool))) {
2459 ldout(cct, 0) << " FULL, paused modify " << op << " tid "
2460 << op->tid << dendl;
2461 op->target.paused = true;
2462 _maybe_request_map();
2463 } else if (!s->is_homeless()) {
2464 need_send = true;
2465 } else {
2466 _maybe_request_map();
2467 }
2468
2469 MOSDOp *m = NULL;
2470 if (need_send) {
2471 m = _prepare_osd_op(op);
2472 }
2473
2474 OSDSession::unique_lock sl(s->lock);
2475 if (op->tid == 0)
2476 op->tid = ++last_tid;
2477
2478 ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
2479 << " '" << op->target.base_oloc << "' '"
2480 << op->target.target_oloc << "' " << op->ops << " tid "
2481 << op->tid << " osd." << (!s->is_homeless() ? s->osd : -1)
2482 << dendl;
2483
2484 _session_op_assign(s, op);
2485
2486 if (need_send) {
2487 _send_op(op, m);
2488 }
2489
2490 // Last chance to touch the Op here; after the session lock is released it
2491 // can be freed at any time by the response handler.
2492 ceph_tid_t tid = op->tid;
2493 if (check_for_latest_map) {
2494 _send_op_map_check(op);
2495 }
2496 if (ptid)
2497 *ptid = tid;
2498 op = NULL;
2499
2500 sl.unlock();
2501 put_session(s);
2502
2503 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
2504 }
2505
2506 int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
2507 {
2508 assert(initialized);
2509
2510 OSDSession::unique_lock sl(s->lock);
2511
2512 map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
2513 if (p == s->ops.end()) {
2514 ldout(cct, 10) << __func__ << " tid " << tid << " dne in session "
2515 << s->osd << dendl;
2516 return -ENOENT;
2517 }
2518
2519 if (s->con) {
2520 ldout(cct, 20) << " revoking rx buffer for " << tid
2521 << " on " << s->con << dendl;
2522 s->con->revoke_rx_buffer(tid);
2523 }
2524
2525 ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
2526 << dendl;
2527 Op *op = p->second;
2528 if (op->onfinish) {
2529 num_in_flight--;
2530 op->onfinish->complete(r);
2531 op->onfinish = NULL;
2532 }
2533 _op_cancel_map_check(op);
2534 _finish_op(op, r);
2535 sl.unlock();
2536
2537 return 0;
2538 }
2539
2540 int Objecter::op_cancel(ceph_tid_t tid, int r)
2541 {
2542 int ret = 0;
2543
2544 unique_lock wl(rwlock);
2545 ret = _op_cancel(tid, r);
2546
2547 return ret;
2548 }
2549
2550 int Objecter::op_cancel(const vector<ceph_tid_t>& tids, int r)
2551 {
2552 unique_lock wl(rwlock);
2553 ldout(cct,10) << __func__ << " " << tids << dendl;
2554 for (auto tid : tids) {
2555 _op_cancel(tid, r);
2556 }
2557 return 0;
2558 }
2559
2560 int Objecter::_op_cancel(ceph_tid_t tid, int r)
2561 {
2562 int ret = 0;
2563
2564 ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r
2565 << dendl;
2566
2567 start:
2568
2569 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2570 siter != osd_sessions.end(); ++siter) {
2571 OSDSession *s = siter->second;
2572 OSDSession::shared_lock sl(s->lock);
2573 if (s->ops.find(tid) != s->ops.end()) {
2574 sl.unlock();
2575 ret = op_cancel(s, tid, r);
2576 if (ret == -ENOENT) {
2577 /* oh no! raced, maybe tid moved to another session, restarting */
2578 goto start;
2579 }
2580 return ret;
2581 }
2582 }
2583
2584 ldout(cct, 5) << __func__ << ": tid " << tid
2585 << " not found in live sessions" << dendl;
2586
2587 // Handle case where the op is in homeless session
2588 OSDSession::shared_lock sl(homeless_session->lock);
2589 if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
2590 sl.unlock();
2591 ret = op_cancel(homeless_session, tid, r);
2592 if (ret == -ENOENT) {
2593 /* oh no! raced, maybe tid moved to another session, restarting */
2594 goto start;
2595 } else {
2596 return ret;
2597 }
2598 } else {
2599 sl.unlock();
2600 }
2601
2602 ldout(cct, 5) << __func__ << ": tid " << tid
2603 << " not found in homeless session" << dendl;
2604
2605 return ret;
2606 }
2607
2608
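// Cancel every in-flight write op (optionally restricted to one pool) with
// result r and return the osdmap epoch at which the cancellations happened,
// or -1 if nothing was cancelled.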
2609 epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
2610 {
2611 unique_lock wl(rwlock);
2612
2613 std::vector<ceph_tid_t> to_cancel;
2614 bool found = false;
2615
2616 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
2617 siter != osd_sessions.end(); ++siter) {
2618 OSDSession *s = siter->second;
2619 OSDSession::shared_lock sl(s->lock);
2620 for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin();
2621 op_i != s->ops.end(); ++op_i) {
2622 if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE
2623 && (pool == -1 || op_i->second->target.target_oloc.pool == pool)) {
2624 to_cancel.push_back(op_i->first);
2625 }
2626 }
2627 sl.unlock();
2628
2629 for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin();
2630 titer != to_cancel.end();
2631 ++titer) {
2632 int cancel_result = op_cancel(s, *titer, r);
2633 // We hold rwlock across search and cancellation, so cancels
2634 // should always succeed
2635 assert(cancel_result == 0);
2636 }
2637 if (!found && to_cancel.size())
2638 found = true;
2639 to_cancel.clear();
2640 }
2641
2642 const epoch_t epoch = osdmap->get_epoch();
2643
2644 wl.unlock();
2645
2646 if (found) {
2647 return epoch;
2648 } else {
2649 return -1;
2650 }
2651 }
2652
2653 bool Objecter::is_pg_changed(
2654 int oldprimary,
2655 const vector<int>& oldacting,
2656 int newprimary,
2657 const vector<int>& newacting,
2658 bool any_change)
2659 {
2660 if (OSDMap::primary_changed(
2661 oldprimary,
2662 oldacting,
2663 newprimary,
2664 newacting))
2665 return true;
2666 if (any_change && oldacting != newacting)
2667 return true;
2668 return false; // same primary (tho replicas may have changed)
2669 }
2670
2671 bool Objecter::target_should_be_paused(op_target_t *t)
2672 {
2673 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2674 bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
2675 bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) ||
2676 _osdmap_full_flag() || _osdmap_pool_full(*pi);
2677
2678 return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
2679 (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
2680 (osdmap->get_epoch() < epoch_barrier);
2681 }
2682
2683 /**
2684 * Locking public accessor for _osdmap_full_flag
2685 */
2686 bool Objecter::osdmap_full_flag() const
2687 {
2688 shared_lock rl(rwlock);
2689
2690 return _osdmap_full_flag();
2691 }
2692
2693 bool Objecter::osdmap_pool_full(const int64_t pool_id) const
2694 {
2695 shared_lock rl(rwlock);
2696
2697 if (_osdmap_full_flag()) {
2698 return true;
2699 }
2700
2701 return _osdmap_pool_full(pool_id);
2702 }
2703
2704 bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
2705 {
2706 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
2707 if (pool == NULL) {
2708 ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
2709 return false;
2710 }
2711
2712 return _osdmap_pool_full(*pool);
2713 }
2714
2715 bool Objecter::_osdmap_has_pool_full() const
2716 {
2717 for (map<int64_t, pg_pool_t>::const_iterator it
2718 = osdmap->get_pools().begin();
2719 it != osdmap->get_pools().end(); ++it) {
2720 if (_osdmap_pool_full(it->second))
2721 return true;
2722 }
2723 return false;
2724 }
2725
2726 bool Objecter::_osdmap_pool_full(const pg_pool_t &p) const
2727 {
2728 return p.has_flag(pg_pool_t::FLAG_FULL) && honor_osdmap_full;
2729 }
2730
2731 /**
2732 * Wrapper around osdmap->test_flag for special handling of the FULL flag.
2733 */
2734 bool Objecter::_osdmap_full_flag() const
2735 {
2736 // Ignore the FULL flag unless the caller has honor_osdmap_full set
2737 return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
2738 }
2739
2740 void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
2741 {
2742 for (map<int64_t, pg_pool_t>::const_iterator it
2743 = osdmap->get_pools().begin();
2744 it != osdmap->get_pools().end(); ++it) {
2745 if (pool_full_map.find(it->first) == pool_full_map.end()) {
2746 pool_full_map[it->first] = _osdmap_pool_full(it->second);
2747 } else {
2748 pool_full_map[it->first] = _osdmap_pool_full(it->second) ||
2749 pool_full_map[it->first];
2750 }
2751 }
2752 }
2753
2754 int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
2755 const string& ns)
2756 {
2757 shared_lock rl(rwlock);
2758 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2759 if (!p)
2760 return -ENOENT;
2761 return p->hash_key(key, ns);
2762 }
2763
2764 int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
2765 const string& ns)
2766 {
2767 shared_lock rl(rwlock);
2768 const pg_pool_t *p = osdmap->get_pg_pool(pool);
2769 if (!p)
2770 return -ENOENT;
2771 return p->raw_hash_to_pg(p->hash_key(key, ns));
2772 }
2773
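// Recompute where a target should go under the current osdmap: apply cache
// tiering overlays, map the object to a PG and its up/acting sets, detect
// forced resends, new intervals and PG splits, clear the paused flag once
// pausing conditions have lifted, and pick a replica for balanced or
// localized reads.  Returns RECALC_OP_TARGET_{POOL_DNE,NEED_RESEND,NO_ACTION}.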
2774 int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
2775 {
2776 // rwlock is locked
2777 bool is_read = t->flags & CEPH_OSD_FLAG_READ;
2778 bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
2779 t->epoch = osdmap->get_epoch();
2780 ldout(cct,20) << __func__ << " epoch " << t->epoch
2781 << " base " << t->base_oid << " " << t->base_oloc
2782 << " precalc_pgid " << (int)t->precalc_pgid
2783 << " pgid " << t->base_pgid
2784 << (is_read ? " is_read" : "")
2785 << (is_write ? " is_write" : "")
2786 << dendl;
2787
2788 const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
2789 if (!pi) {
2790 t->osd = -1;
2791 return RECALC_OP_TARGET_POOL_DNE;
2792 }
2793 ldout(cct,30) << __func__ << " base pi " << pi
2794 << " pg_num " << pi->get_pg_num() << dendl;
2795
2796 bool force_resend = false;
2797 if (osdmap->get_epoch() == pi->last_force_op_resend) {
2798 if (t->last_force_resend < pi->last_force_op_resend) {
2799 t->last_force_resend = pi->last_force_op_resend;
2800 force_resend = true;
2801 } else if (t->last_force_resend == 0) {
2802 force_resend = true;
2803 }
2804 }
2805
2806 // apply tiering
2807 t->target_oid = t->base_oid;
2808 t->target_oloc = t->base_oloc;
2809 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
2810 if (is_read && pi->has_read_tier())
2811 t->target_oloc.pool = pi->read_tier;
2812 if (is_write && pi->has_write_tier())
2813 t->target_oloc.pool = pi->write_tier;
2814 pi = osdmap->get_pg_pool(t->target_oloc.pool);
2815 if (!pi) {
2816 t->osd = -1;
2817 return RECALC_OP_TARGET_POOL_DNE;
2818 }
2819 }
2820
2821 pg_t pgid;
2822 if (t->precalc_pgid) {
2823 assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
2824 assert(t->base_oid.name.empty()); // make sure this is a pg op
2825 assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
2826 pgid = t->base_pgid;
2827 } else {
2828 int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
2829 pgid);
2830 if (ret == -ENOENT) {
2831 t->osd = -1;
2832 return RECALC_OP_TARGET_POOL_DNE;
2833 }
2834 }
2835 ldout(cct,20) << __func__ << " target " << t->target_oid << " "
2836 << t->target_oloc << " -> pgid " << pgid << dendl;
2837 ldout(cct,30) << __func__ << " target pi " << pi
2838 << " pg_num " << pi->get_pg_num() << dendl;
2839 t->pool_ever_existed = true;
2840
2841 int size = pi->size;
2842 int min_size = pi->min_size;
2843 unsigned pg_num = pi->get_pg_num();
2844 int up_primary, acting_primary;
2845 vector<int> up, acting;
2846 osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
2847 &acting, &acting_primary);
2848 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
2849 bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
2850 unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
2851 pg_t prev_pgid(prev_seed, pgid.pool());
2852 if (any_change && PastIntervals::is_new_interval(
2853 t->acting_primary,
2854 acting_primary,
2855 t->acting,
2856 acting,
2857 t->up_primary,
2858 up_primary,
2859 t->up,
2860 up,
2861 t->size,
2862 size,
2863 t->min_size,
2864 min_size,
2865 t->pg_num,
2866 pg_num,
2867 t->sort_bitwise,
2868 sort_bitwise,
2869 t->recovery_deletes,
2870 recovery_deletes,
2871 prev_pgid)) {
2872 force_resend = true;
2873 }
2874
2875 bool unpaused = false;
2876 if (t->paused && !target_should_be_paused(t)) {
2877 t->paused = false;
2878 unpaused = true;
2879 }
2880
2881 bool legacy_change =
2882 t->pgid != pgid ||
2883 is_pg_changed(
2884 t->acting_primary, t->acting, acting_primary, acting,
2885 t->used_replica || any_change);
2886 bool split = false;
2887 if (t->pg_num) {
2888 split = prev_pgid.is_split(t->pg_num, pg_num, nullptr);
2889 }
2890
2891 if (legacy_change || split || force_resend) {
2892 t->pgid = pgid;
2893 t->acting = acting;
2894 t->acting_primary = acting_primary;
2895 t->up_primary = up_primary;
2896 t->up = up;
2897 t->size = size;
2898 t->min_size = min_size;
2899 t->pg_num = pg_num;
2900 t->pg_num_mask = pi->get_pg_num_mask();
2901 osdmap->get_primary_shard(
2902 pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
2903 &t->actual_pgid);
2904 t->sort_bitwise = sort_bitwise;
2905 t->recovery_deletes = recovery_deletes;
2906 ldout(cct, 10) << __func__ << " "
2907 << " raw pgid " << pgid << " -> actual " << t->actual_pgid
2908 << " acting " << acting
2909 << " primary " << acting_primary << dendl;
2910 t->used_replica = false;
2911 if (acting_primary == -1) {
2912 t->osd = -1;
2913 } else {
2914 int osd;
2915 bool read = is_read && !is_write;
2916 if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
2917 int p = rand() % acting.size();
2918 if (p)
2919 t->used_replica = true;
2920 osd = acting[p];
2921 ldout(cct, 10) << " chose random osd." << osd << " of " << acting
2922 << dendl;
2923 } else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
2924 acting.size() > 1) {
2925 // look for a local replica. prefer the primary if the
2926 // distance is the same.
2927 int best = -1;
2928 int best_locality = 0;
2929 for (unsigned i = 0; i < acting.size(); ++i) {
2930 int locality = osdmap->crush->get_common_ancestor_distance(
2931 cct, acting[i], crush_location);
2932 ldout(cct, 20) << __func__ << " localize: rank " << i
2933 << " osd." << acting[i]
2934 << " locality " << locality << dendl;
2935 if (i == 0 ||
2936 (locality >= 0 && best_locality >= 0 &&
2937 locality < best_locality) ||
2938 (best_locality < 0 && locality >= 0)) {
2939 best = i;
2940 best_locality = locality;
2941 if (i)
2942 t->used_replica = true;
2943 }
2944 }
2945 assert(best >= 0);
2946 osd = acting[best];
2947 } else {
2948 osd = acting_primary;
2949 }
2950 t->osd = osd;
2951 }
2952 }
2953 if (legacy_change || unpaused || force_resend) {
2954 return RECALC_OP_TARGET_NEED_RESEND;
2955 }
2956 if (split &&
2957 (osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS ||
2958 HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
2959 RESEND_ON_SPLIT))) {
2960 return RECALC_OP_TARGET_NEED_RESEND;
2961 }
2962 return RECALC_OP_TARGET_NO_ACTION;
2963 }
2964
2965 int Objecter::_map_session(op_target_t *target, OSDSession **s,
2966 shunique_lock& sul)
2967 {
2968 _calc_target(target, nullptr);
2969 return _get_session(target->osd, s, sul);
2970 }
2971
2972 void Objecter::_session_op_assign(OSDSession *to, Op *op)
2973 {
2974 // to->lock is locked
2975 assert(op->session == NULL);
2976 assert(op->tid);
2977
2978 get_session(to);
2979 op->session = to;
2980 to->ops[op->tid] = op;
2981
2982 if (to->is_homeless()) {
2983 num_homeless_ops++;
2984 }
2985
2986 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
2987 }
2988
2989 void Objecter::_session_op_remove(OSDSession *from, Op *op)
2990 {
2991 assert(op->session == from);
2992 // from->lock is locked
2993
2994 if (from->is_homeless()) {
2995 num_homeless_ops--;
2996 }
2997
2998 from->ops.erase(op->tid);
2999 put_session(from);
3000 op->session = NULL;
3001
3002 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3003 }
3004
3005 void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
3006 {
3007 // to lock is locked unique
3008 assert(op->session == NULL);
3009
3010 if (to->is_homeless()) {
3011 num_homeless_ops++;
3012 }
3013
3014 get_session(to);
3015 op->session = to;
3016 to->linger_ops[op->linger_id] = op;
3017
3018 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id
3019 << dendl;
3020 }
3021
3022 void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
3023 {
3024 assert(from == op->session);
3025 // from->lock is locked unique
3026
3027 if (from->is_homeless()) {
3028 num_homeless_ops--;
3029 }
3030
3031 from->linger_ops.erase(op->linger_id);
3032 put_session(from);
3033 op->session = NULL;
3034
3035 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id
3036 << dendl;
3037 }
3038
3039 void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
3040 {
3041 assert(from == op->session);
3042 // from->lock is locked
3043
3044 if (from->is_homeless()) {
3045 num_homeless_ops--;
3046 }
3047
3048 from->command_ops.erase(op->tid);
3049 put_session(from);
3050 op->session = NULL;
3051
3052 ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
3053 }
3054
3055 void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
3056 {
3057 // to->lock is locked
3058 assert(op->session == NULL);
3059 assert(op->tid);
3060
3061 if (to->is_homeless()) {
3062 num_homeless_ops++;
3063 }
3064
3065 get_session(to);
3066 op->session = to;
3067 to->command_ops[op->tid] = op;
3068
3069 ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
3070 }
3071
3072 int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
3073 shunique_lock& sul)
3074 {
3075 // rwlock is locked unique
3076
3077 int r = _calc_target(&linger_op->target, nullptr, true);
3078 if (r == RECALC_OP_TARGET_NEED_RESEND) {
3079 ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
3080 << " pgid " << linger_op->target.pgid
3081 << " acting " << linger_op->target.acting << dendl;
3082
3083 OSDSession *s = NULL;
3084 r = _get_session(linger_op->target.osd, &s, sul);
3085 assert(r == 0);
3086
3087 if (linger_op->session != s) {
3088 // NB locking two sessions (s and linger_op->session) at the
3089 // same time here is only safe because we are the only one that
3090 // takes two, and we are holding rwlock for write. Disable
3091 // lockdep because it doesn't know that.
3092 OSDSession::unique_lock sl(s->lock);
3093 _session_linger_op_remove(linger_op->session, linger_op);
3094 _session_linger_op_assign(s, linger_op);
3095 }
3096
3097 put_session(s);
3098 return RECALC_OP_TARGET_NEED_RESEND;
3099 }
3100 return r;
3101 }
3102
3103 void Objecter::_cancel_linger_op(Op *op)
3104 {
3105 ldout(cct, 15) << "cancel_op " << op->tid << dendl;
3106
3107 assert(!op->should_resend);
3108 if (op->onfinish) {
3109 delete op->onfinish;
3110 num_in_flight--;
3111 }
3112
3113 _finish_op(op, 0);
3114 }
3115
3116 void Objecter::_finish_op(Op *op, int r)
3117 {
3118 ldout(cct, 15) << "finish_op " << op->tid << dendl;
3119
3120 // op->session->lock is locked unique or op->session is null
3121
3122 if (!op->ctx_budgeted && op->budgeted)
3123 put_op_budget(op);
3124
3125 if (op->ontimeout && r != -ETIMEDOUT)
3126 timer.cancel_event(op->ontimeout);
3127
3128 if (op->session) {
3129 _session_op_remove(op->session, op);
3130 }
3131
3132 logger->dec(l_osdc_op_active);
3133
3134 assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
3135
3136 inflight_ops--;
3137
3138 op->put();
3139 }
3140
3141 void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
3142 {
3143 ldout(cct, 15) << "finish_op " << tid << dendl;
3144 shared_lock rl(rwlock);
3145
3146 OSDSession::unique_lock wl(session->lock);
3147
3148 map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
3149 if (iter == session->ops.end())
3150 return;
3151
3152 Op *op = iter->second;
3153
3154 _finish_op(op, 0);
3155 }
3156
3157 MOSDOp *Objecter::_prepare_osd_op(Op *op)
3158 {
3159 // rwlock is locked
3160
3161 int flags = op->target.flags;
3162 flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
3163
3164 // Nothing checks this any longer, but needed for compatibility with
3165 // pre-luminous osds
3166 flags |= CEPH_OSD_FLAG_ONDISK;
3167
3168 if (!honor_osdmap_full)
3169 flags |= CEPH_OSD_FLAG_FULL_FORCE;
3170
3171 op->target.paused = false;
3172 op->stamp = ceph::mono_clock::now();
3173
3174 hobject_t hobj = op->target.get_hobj();
3175 MOSDOp *m = new MOSDOp(client_inc, op->tid,
3176 hobj, op->target.actual_pgid,
3177 osdmap->get_epoch(),
3178 flags, op->features);
3179
3180 m->set_snapid(op->snapid);
3181 m->set_snap_seq(op->snapc.seq);
3182 m->set_snaps(op->snapc.snaps);
3183
3184 m->ops = op->ops;
3185 m->set_mtime(op->mtime);
3186 m->set_retry_attempt(op->attempts++);
3187
3188 if (!op->trace.valid() && cct->_conf->osdc_blkin_trace_all) {
3189 op->trace.init("op", &trace_endpoint);
3190 }
3191
3192 if (op->priority)
3193 m->set_priority(op->priority);
3194 else
3195 m->set_priority(cct->_conf->osd_client_op_priority);
3196
3197 if (op->reqid != osd_reqid_t()) {
3198 m->set_reqid(op->reqid);
3199 }
3200
3201 logger->inc(l_osdc_op_send);
3202 ssize_t sum = 0;
3203 for (unsigned i = 0; i < m->ops.size(); i++) {
3204 sum += m->ops[i].indata.length();
3205 }
3206 logger->inc(l_osdc_op_send_bytes, sum);
3207
3208 return m;
3209 }
3210
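// Send (or hold back) a prepared op on its session: if a registered backoff
// covers the object the op stays queued on the session; otherwise the
// MOSDOp is built or re-encoded as needed, rx buffers are revoked/posted,
// and the message is sent on the session connection.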
3211 void Objecter::_send_op(Op *op, MOSDOp *m)
3212 {
3213 // rwlock is locked
3214 // op->session->lock is locked
3215
3216 // backoff?
3217 auto p = op->session->backoffs.find(op->target.actual_pgid);
3218 if (p != op->session->backoffs.end()) {
3219 hobject_t hoid = op->target.get_hobj();
3220 auto q = p->second.lower_bound(hoid);
3221 if (q != p->second.begin()) {
3222 --q;
3223 if (hoid >= q->second.end) {
3224 ++q;
3225 }
3226 }
3227 if (q != p->second.end()) {
3228 ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
3229 << "," << q->second.end << ")" << dendl;
3230 int r = cmp(hoid, q->second.begin);
3231 if (r == 0 || (r > 0 && hoid < q->second.end)) {
3232 ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
3233 << " id " << q->second.id << " on " << hoid
3234 << ", queuing " << op << " tid " << op->tid << dendl;
3235 return;
3236 }
3237 }
3238 }
3239
3240 if (!m) {
3241 assert(op->tid > 0);
3242 m = _prepare_osd_op(op);
3243 }
3244
3245 if (op->target.actual_pgid != m->get_spg()) {
3246 ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
3247 << m->get_spg() << " to " << op->target.actual_pgid
3248 << ", updating and reencoding" << dendl;
3249 m->set_spg(op->target.actual_pgid);
3250 m->clear_payload(); // reencode
3251 }
3252
3253 ldout(cct, 15) << "_send_op " << op->tid << " to "
3254 << op->target.actual_pgid << " on osd." << op->session->osd
3255 << dendl;
3256
3257 ConnectionRef con = op->session->con;
3258 assert(con);
3259
3260 // preallocated rx buffer?
3261 if (op->con) {
3262 ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on "
3263 << op->con << dendl;
3264 op->con->revoke_rx_buffer(op->tid);
3265 }
3266 if (op->outbl &&
3267 op->ontimeout == 0 && // only post rx_buffer if no timeout; see #9582
3268 op->outbl->length()) {
3269 ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con
3270 << dendl;
3271 op->con = con;
3272 op->con->post_rx_buffer(op->tid, *op->outbl);
3273 }
3274
3275 op->incarnation = op->session->incarnation;
3276
3277 m->set_tid(op->tid);
3278
3279 if (op->trace.valid()) {
3280 m->trace.init("op msg", nullptr, &op->trace);
3281 }
3282 op->session->con->send_message(m);
3283 }
3284
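// Estimate the throttle cost of an op: writes cost their input data length,
// data reads cost the requested extent length, and attr reads cost the
// xattr name plus value lengths.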
3285 int Objecter::calc_op_budget(Op *op)
3286 {
3287 int op_budget = 0;
3288 for (vector<OSDOp>::iterator i = op->ops.begin();
3289 i != op->ops.end();
3290 ++i) {
3291 if (i->op.op & CEPH_OSD_OP_MODE_WR) {
3292 op_budget += i->indata.length();
3293 } else if (ceph_osd_op_mode_read(i->op.op)) {
3294 if (ceph_osd_op_type_data(i->op.op)) {
3295 if ((int64_t)i->op.extent.length > 0)
3296 op_budget += (int64_t)i->op.extent.length;
3297 } else if (ceph_osd_op_type_attr(i->op.op)) {
3298 op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
3299 }
3300 }
3301 }
3302 return op_budget;
3303 }
3304
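// Acquire byte and op throttle budget for the op.  If either throttle is
// exhausted, rwlock is dropped while we block on the throttle and then
// re-taken in the same mode (shared or unique) it was held in before.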
3305 void Objecter::_throttle_op(Op *op,
3306 shunique_lock& sul,
3307 int op_budget)
3308 {
3309 assert(sul && sul.mutex() == &rwlock);
3310 bool locked_for_write = sul.owns_lock();
3311
3312 if (!op_budget)
3313 op_budget = calc_op_budget(op);
3314 if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
3315 sul.unlock();
3316 op_throttle_bytes.get(op_budget);
3317 if (locked_for_write)
3318 sul.lock();
3319 else
3320 sul.lock_shared();
3321 }
3322 if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
3323 sul.unlock();
3324 op_throttle_ops.get(1);
3325 if (locked_for_write)
3326 sul.lock();
3327 else
3328 sul.lock_shared();
3329 }
3330 }
3331
3332 void Objecter::unregister_op(Op *op)
3333 {
3334 OSDSession::unique_lock sl(op->session->lock);
3335 op->session->ops.erase(op->tid);
3336 sl.unlock();
3337 put_session(op->session);
3338 op->session = NULL;
3339
3340 inflight_ops--;
3341 }
3342
3343 /* This function DOES put the passed message before returning */
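// Reply handling in outline: drop stray replies and replies from older
// attempts, resubmit on redirect or -EAGAIN, copy out data and per-op
// rvals, run per-op handlers, then finish the op and run onfinish under the
// per-object completion lock so completions for one object stay ordered.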
3344 void Objecter::handle_osd_op_reply(MOSDOpReply *m)
3345 {
3346 ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
3347
3348 // get tid
3349 ceph_tid_t tid = m->get_tid();
3350
3351 shunique_lock sul(rwlock, ceph::acquire_shared);
3352 if (!initialized) {
3353 m->put();
3354 return;
3355 }
3356
3357 ConnectionRef con = m->get_connection();
3358 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
3359 if (!s || s->con != con) {
3360 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3361 if (s) {
3362 s->put();
3363 }
3364 m->put();
3365 return;
3366 }
3367
3368 OSDSession::unique_lock sl(s->lock);
3369
3370 map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
3371 if (iter == s->ops.end()) {
3372 ldout(cct, 7) << "handle_osd_op_reply " << tid
3373 << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ?
3374 " onnvram" : " ack"))
3375 << " ... stray" << dendl;
3376 sl.unlock();
3377 put_session(s);
3378 m->put();
3379 return;
3380 }
3381
3382 ldout(cct, 7) << "handle_osd_op_reply " << tid
3383 << (m->is_ondisk() ? " ondisk" :
3384 (m->is_onnvram() ? " onnvram" : " ack"))
3385 << " uv " << m->get_user_version()
3386 << " in " << m->get_pg()
3387 << " attempt " << m->get_retry_attempt()
3388 << dendl;
3389 Op *op = iter->second;
3390 op->trace.event("osd op reply");
3391
3392 if (retry_writes_after_first_reply && op->attempts == 1 &&
3393 (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
3394 ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
3395 if (op->onfinish) {
3396 num_in_flight--;
3397 }
3398 _session_op_remove(s, op);
3399 sl.unlock();
3400 put_session(s);
3401
3402 _op_submit(op, sul, NULL);
3403 m->put();
3404 return;
3405 }
3406
3407 if (m->get_retry_attempt() >= 0) {
3408 if (m->get_retry_attempt() != (op->attempts - 1)) {
3409 ldout(cct, 7) << " ignoring reply from attempt "
3410 << m->get_retry_attempt()
3411 << " from " << m->get_source_inst()
3412 << "; last attempt " << (op->attempts - 1) << " sent to "
3413 << op->session->con->get_peer_addr() << dendl;
3414 m->put();
3415 sl.unlock();
3416 put_session(s);
3417 return;
3418 }
3419 } else {
3420 // we don't know the request attempt because the server is old, so
3421 // just accept this one. we may do ACK callbacks we shouldn't
3422 // have, but that is better than doing callbacks out of order.
3423 }
3424
3425 Context *onfinish = 0;
3426
3427 int rc = m->get_result();
3428
3429 if (m->is_redirect_reply()) {
3430 ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
3431 if (op->onfinish)
3432 num_in_flight--;
3433 _session_op_remove(s, op);
3434 sl.unlock();
3435 put_session(s);
3436
3437 // FIXME: two redirects could race and reorder
3438
3439 op->tid = 0;
3440 m->get_redirect().combine_with_locator(op->target.target_oloc,
3441 op->target.target_oid.name);
3442 op->target.flags |= (CEPH_OSD_FLAG_REDIRECTED | CEPH_OSD_FLAG_IGNORE_OVERLAY);
3443 _op_submit(op, sul, NULL);
3444 m->put();
3445 return;
3446 }
3447
3448 if (rc == -EAGAIN) {
3449 ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
3450 if (op->onfinish)
3451 num_in_flight--;
3452 _session_op_remove(s, op);
3453 sl.unlock();
3454 put_session(s);
3455
3456 op->tid = 0;
3457 op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3458 CEPH_OSD_FLAG_LOCALIZE_READS);
3459 op->target.pgid = pg_t();
3460 _op_submit(op, sul, NULL);
3461 m->put();
3462 return;
3463 }
3464
3465 sul.unlock();
3466
3467 if (op->objver)
3468 *op->objver = m->get_user_version();
3469 if (op->reply_epoch)
3470 *op->reply_epoch = m->get_map_epoch();
3471 if (op->data_offset)
3472 *op->data_offset = m->get_header().data_off;
3473
3474 // got data?
3475 if (op->outbl) {
3476 if (op->con)
3477 op->con->revoke_rx_buffer(op->tid);
3478 m->claim_data(*op->outbl);
3479 op->outbl = 0;
3480 }
3481
3482 // per-op result demuxing
3483 vector<OSDOp> out_ops;
3484 m->claim_ops(out_ops);
3485
3486 if (out_ops.size() != op->ops.size())
3487 ldout(cct, 0) << "WARNING: tid " << op->tid << " reply ops " << out_ops
3488 << " != request ops " << op->ops
3489 << " from " << m->get_source_inst() << dendl;
3490
3491 vector<bufferlist*>::iterator pb = op->out_bl.begin();
3492 vector<int*>::iterator pr = op->out_rval.begin();
3493 vector<Context*>::iterator ph = op->out_handler.begin();
3494 assert(op->out_bl.size() == op->out_rval.size());
3495 assert(op->out_bl.size() == op->out_handler.size());
3496 vector<OSDOp>::iterator p = out_ops.begin();
3497 for (unsigned i = 0;
3498 p != out_ops.end() && pb != op->out_bl.end();
3499 ++i, ++p, ++pb, ++pr, ++ph) {
3500 ldout(cct, 10) << " op " << i << " rval " << p->rval
3501 << " len " << p->outdata.length() << dendl;
3502 if (*pb)
3503 **pb = p->outdata;
3504 // set rval before running handlers so that handlers
3505 // can change it if e.g. decoding fails
3506 if (*pr)
3507 **pr = ceph_to_hostos_errno(p->rval);
3508 if (*ph) {
3509 ldout(cct, 10) << " op " << i << " handler " << *ph << dendl;
3510 (*ph)->complete(ceph_to_hostos_errno(p->rval));
3511 *ph = NULL;
3512 }
3513 }
3514
3515 // NOTE: since we only ever request ONDISK, we assume we will only
3516 // ever get back one (type of) ack.
3517
3518 if (op->onfinish) {
3519 num_in_flight--;
3520 onfinish = op->onfinish;
3521 op->onfinish = NULL;
3522 }
3523 logger->inc(l_osdc_op_reply);
3524
3525 /* get it before we call _finish_op() */
3526 auto completion_lock = s->get_lock(op->target.base_oid);
3527
3528 ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
3529 _finish_op(op, 0);
3530
3531 ldout(cct, 5) << num_in_flight << " in flight" << dendl;
3532
3533 // serialize completions
3534 if (completion_lock.mutex()) {
3535 completion_lock.lock();
3536 }
3537 sl.unlock();
3538
3539 // do callbacks
3540 if (onfinish) {
3541 onfinish->complete(rc);
3542 }
3543 if (completion_lock.mutex()) {
3544 completion_lock.unlock();
3545 }
3546
3547 m->put();
3548 put_session(s);
3549 }
3550
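// Handle a backoff message from an OSD: BLOCK registers the range and is
// acked with the original backoff's epoch; UNBLOCK drops the registered
// range and resends any queued ops that fall inside it.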
3551 void Objecter::handle_osd_backoff(MOSDBackoff *m)
3552 {
3553 ldout(cct, 10) << __func__ << " " << *m << dendl;
3554 shunique_lock sul(rwlock, ceph::acquire_shared);
3555 if (!initialized) {
3556 m->put();
3557 return;
3558 }
3559
3560 ConnectionRef con = m->get_connection();
3561 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
3562 if (!s || s->con != con) {
3563 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
3564 if (s)
3565 s->put();
3566 m->put();
3567 return;
3568 }
3569
3570 get_session(s);
3571 s->put(); // from get_priv() above
3572
3573 OSDSession::unique_lock sl(s->lock);
3574
3575 switch (m->op) {
3576 case CEPH_OSD_BACKOFF_OP_BLOCK:
3577 {
3578 // register
3579 OSDBackoff& b = s->backoffs[m->pgid][m->begin];
3580 s->backoffs_by_id.insert(make_pair(m->id, &b));
3581 b.pgid = m->pgid;
3582 b.id = m->id;
3583 b.begin = m->begin;
3584 b.end = m->end;
3585
3586 // ack with original backoff's epoch so that the osd can discard this if
3587 // there was a pg split.
3588 Message *r = new MOSDBackoff(m->pgid,
3589 m->map_epoch,
3590 CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
3591 m->id, m->begin, m->end);
3592 // this priority must match the MOSDOps from _prepare_osd_op
3593 r->set_priority(cct->_conf->osd_client_op_priority);
3594 con->send_message(r);
3595 }
3596 break;
3597
3598 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
3599 {
3600 auto p = s->backoffs_by_id.find(m->id);
3601 if (p != s->backoffs_by_id.end()) {
3602 OSDBackoff *b = p->second;
3603 if (b->begin != m->begin &&
3604 b->end != m->end) {
3605 lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
3606 << " unblock on ["
3607 << m->begin << "," << m->end << ") but backoff is ["
3608 << b->begin << "," << b->end << ")" << dendl;
3609 // hrmpf, unblock it anyway.
3610 }
3611 ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
3612 << " id " << b->id
3613 << " [" << b->begin << "," << b->end
3614 << ")" << dendl;
3615 auto spgp = s->backoffs.find(b->pgid);
3616 assert(spgp != s->backoffs.end());
3617 spgp->second.erase(b->begin);
3618 if (spgp->second.empty()) {
3619 s->backoffs.erase(spgp);
3620 }
3621 s->backoffs_by_id.erase(p);
3622
3623 // check for any ops to resend
3624 for (auto& q : s->ops) {
3625 if (q.second->target.actual_pgid == m->pgid) {
3626 int r = q.second->target.contained_by(m->begin, m->end);
3627 ldout(cct, 20) << __func__ << " contained_by " << r << " on "
3628 << q.second->target.get_hobj() << dendl;
3629 if (r) {
3630 _send_op(q.second);
3631 }
3632 }
3633 }
3634 } else {
3635 lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
3636 << " unblock on ["
3637 << m->begin << "," << m->end << ") but backoff dne" << dendl;
3638 }
3639 }
3640 break;
3641
3642 default:
3643 ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
3644 }
3645
3646 sul.unlock();
3647 sl.unlock();
3648
3649 m->put();
3650 put_session(s);
3651 }
3652
3653 uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3654 uint32_t pos)
3655 {
3656 shared_lock rl(rwlock);
3657 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3658 pos, list_context->pool_id, string());
3659 ldout(cct, 10) << __func__ << list_context
3660 << " pos " << pos << " -> " << list_context->pos << dendl;
3661 pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
3662 list_context->current_pg = actual.ps();
3663 list_context->at_end_of_pool = false;
3664 return pos;
3665 }
3666
3667 uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
3668 const hobject_t& cursor)
3669 {
3670 shared_lock rl(rwlock);
3671 ldout(cct, 10) << "list_nobjects_seek " << list_context << dendl;
3672 list_context->pos = cursor;
3673 list_context->at_end_of_pool = false;
3674 pg_t actual = osdmap->raw_pg_to_pg(pg_t(cursor.get_hash(), list_context->pool_id));
3675 list_context->current_pg = actual.ps();
3676 list_context->sort_bitwise = true;
3677 return list_context->current_pg;
3678 }
3679
3680 void Objecter::list_nobjects_get_cursor(NListContext *list_context,
3681 hobject_t *cursor)
3682 {
3683 shared_lock rl(rwlock);
3684 if (list_context->list.empty()) {
3685 *cursor = list_context->pos;
3686 } else {
3687 const librados::ListObjectImpl& entry = list_context->list.front();
3688 const string *key = (entry.locator.empty() ? &entry.oid : &entry.locator);
3689 uint32_t h = osdmap->get_pg_pool(list_context->pool_id)->hash_key(*key, entry.nspace);
3690 *cursor = hobject_t(entry.oid, entry.locator, list_context->pool_snap_seq, h, list_context->pool_id, entry.nspace);
3691 }
3692 }
3693
3694 void Objecter::list_nobjects(NListContext *list_context, Context *onfinish)
3695 {
3696 ldout(cct, 10) << __func__ << " pool_id " << list_context->pool_id
3697 << " pool_snap_seq " << list_context->pool_snap_seq
3698 << " max_entries " << list_context->max_entries
3699 << " list_context " << list_context
3700 << " onfinish " << onfinish
3701 << " current_pg " << list_context->current_pg
3702 << " pos " << list_context->pos << dendl;
3703
3704 shared_lock rl(rwlock);
3705 const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
3706 if (!pool) { // pool is gone
3707 rl.unlock();
3708 put_nlist_context_budget(list_context);
3709 onfinish->complete(-ENOENT);
3710 return;
3711 }
3712 int pg_num = pool->get_pg_num();
3713 bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
3714
3715 if (list_context->pos.is_min()) {
3716 list_context->starting_pg_num = 0;
3717 list_context->sort_bitwise = sort_bitwise;
3718 list_context->starting_pg_num = pg_num;
3719 }
3720 if (list_context->sort_bitwise != sort_bitwise) {
3721 list_context->pos = hobject_t(
3722 object_t(), string(), CEPH_NOSNAP,
3723 list_context->current_pg, list_context->pool_id, string());
3724 list_context->sort_bitwise = sort_bitwise;
3725 ldout(cct, 10) << " hobject sort order changed, restarting this pg at "
3726 << list_context->pos << dendl;
3727 }
3728 if (list_context->starting_pg_num != pg_num) {
3729 if (!sort_bitwise) {
3730 // start reading from the beginning; the pgs have changed
3731 ldout(cct, 10) << " pg_num changed; restarting with " << pg_num << dendl;
3732 list_context->pos = collection_list_handle_t();
3733 }
3734 list_context->starting_pg_num = pg_num;
3735 }
3736
3737 if (list_context->pos.is_max()) {
3738 ldout(cct, 20) << __func__ << " end of pool, list "
3739 << list_context->list << dendl;
3740 if (list_context->list.empty()) {
3741 list_context->at_end_of_pool = true;
3742 }
3743 // release the listing context's budget once all
3744 // OPs (in the session) are finished
3745 put_nlist_context_budget(list_context);
3746 onfinish->complete(0);
3747 return;
3748 }
3749
3750 ObjectOperation op;
3751 op.pg_nls(list_context->max_entries, list_context->filter,
3752 list_context->pos, osdmap->get_epoch());
3753 list_context->bl.clear();
3754 C_NList *onack = new C_NList(list_context, onfinish, this);
3755 object_locator_t oloc(list_context->pool_id, list_context->nspace);
3756
3757 // note current_pg in case we don't have (or lose) SORTBITWISE
3758 list_context->current_pg = pool->raw_hash_to_pg(list_context->pos.get_hash());
3759 rl.unlock();
3760
3761 pg_read(list_context->current_pg, oloc, op,
3762 &list_context->bl, 0, onack, &onack->epoch,
3763 &list_context->ctx_budget);
3764 }
3765
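// Digest one pg_nls response: advance the cursor (stepping to the next PG
// by hand when talking to a legacy, non-sortbitwise OSD), splice the
// returned entries into the accumulated list, and either complete once
// max_entries results are gathered or call list_nobjects again to continue.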
3766 void Objecter::_nlist_reply(NListContext *list_context, int r,
3767 Context *final_finish, epoch_t reply_epoch)
3768 {
3769 ldout(cct, 10) << __func__ << " " << list_context << dendl;
3770
3771 bufferlist::iterator iter = list_context->bl.begin();
3772 pg_nls_response_t response;
3773 bufferlist extra_info;
3774 ::decode(response, iter);
3775 if (!iter.end()) {
3776 ::decode(extra_info, iter);
3777 }
3778
3779 // if the osd returns 1 (newer code), or the handle is MAX, it means
3780 // we hit the end of the pg.
3781 if ((response.handle.is_max() || r == 1) &&
3782 !list_context->sort_bitwise) {
3783 // legacy OSD and !sortbitwise, figure out the next PG on our own
3784 ++list_context->current_pg;
3785 if (list_context->current_pg == list_context->starting_pg_num) {
3786 // end of pool
3787 list_context->pos = hobject_t::get_max();
3788 } else {
3789 // next pg
3790 list_context->pos = hobject_t(object_t(), string(), CEPH_NOSNAP,
3791 list_context->current_pg,
3792 list_context->pool_id, string());
3793 }
3794 } else {
3795 list_context->pos = response.handle;
3796 }
3797
3798 int response_size = response.entries.size();
3799 ldout(cct, 20) << " response.entries.size " << response_size
3800 << ", response.entries " << response.entries
3801 << ", handle " << response.handle
3802 << ", tentative new pos " << list_context->pos << dendl;
3803 list_context->extra_info.append(extra_info);
3804 if (response_size) {
3805 list_context->list.splice(list_context->list.end(), response.entries);
3806 }
3807
3808 if (list_context->list.size() >= list_context->max_entries) {
3809 ldout(cct, 20) << " hit max, returning results so far, "
3810 << list_context->list << dendl;
3811 // release the listing context's budget once all
3812 // OPs (in the session) are finished
3813 put_nlist_context_budget(list_context);
3814 final_finish->complete(0);
3815 return;
3816 }
3817
3818 // continue!
3819 list_nobjects(list_context, final_finish);
3820 }
3821
3822 void Objecter::put_nlist_context_budget(NListContext *list_context)
3823 {
3824 if (list_context->ctx_budget >= 0) {
3825 ldout(cct, 10) << " release listing context's budget " <<
3826 list_context->ctx_budget << dendl;
3827 put_op_budget_bytes(list_context->ctx_budget);
3828 list_context->ctx_budget = -1;
3829 }
3830 }
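
/*
 * Illustrative usage sketch (not part of Objecter): the listing machinery
 * above is typically driven by calling list_nobjects() repeatedly on one
 * NListContext until it reports the end of the pool.  The names 'objecter'
 * and 'pool_id', and the exact NListContext setup, are assumptions made for
 * this sketch only; C_SaferCond is used to wait synchronously.
 */
#if 0
  Objecter::NListContext lc;
  lc.pool_id = pool_id;            // pool to enumerate
  lc.max_entries = 1024;           // batch size per call
  while (!lc.at_end_of_pool) {
    C_SaferCond cond;
    objecter->list_nobjects(&lc, &cond);
    int r = cond.wait();           // 0 on success
    if (r < 0)
      break;
    // consume lc.list here, then clear it before requesting the next batch
    lc.list.clear();
  }
#endif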
3831
3832 // snapshots
3833
3834 int Objecter::create_pool_snap(int64_t pool, string& snap_name,
3835 Context *onfinish)
3836 {
3837 unique_lock wl(rwlock);
3838 ldout(cct, 10) << "create_pool_snap; pool: " << pool << "; snap: "
3839 << snap_name << dendl;
3840
3841 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3842 if (!p)
3843 return -EINVAL;
3844 if (p->snap_exists(snap_name.c_str()))
3845 return -EEXIST;
3846
3847 PoolOp *op = new PoolOp;
3848 if (!op)
3849 return -ENOMEM;
3850 op->tid = ++last_tid;
3851 op->pool = pool;
3852 op->name = snap_name;
3853 op->onfinish = onfinish;
3854 op->pool_op = POOL_OP_CREATE_SNAP;
3855 pool_ops[op->tid] = op;
3856
3857 pool_op_submit(op);
3858
3859 return 0;
3860 }
3861
3862 struct C_SelfmanagedSnap : public Context {
3863 bufferlist bl;
3864 snapid_t *psnapid;
3865 Context *fin;
3866 C_SelfmanagedSnap(snapid_t *ps, Context *f) : psnapid(ps), fin(f) {}
3867 void finish(int r) override {
3868 if (r == 0) {
3869 bufferlist::iterator p = bl.begin();
3870 ::decode(*psnapid, p);
3871 }
3872 fin->complete(r);
3873 }
3874 };
3875
3876 int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
3877 Context *onfinish)
3878 {
3879 unique_lock wl(rwlock);
3880 ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
3881 PoolOp *op = new PoolOp;
3882 if (!op) return -ENOMEM;
3883 op->tid = ++last_tid;
3884 op->pool = pool;
3885 C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
3886 op->onfinish = fin;
3887 op->blp = &fin->bl;
3888 op->pool_op = POOL_OP_CREATE_UNMANAGED_SNAP;
3889 pool_ops[op->tid] = op;
3890
3891 pool_op_submit(op);
3892 return 0;
3893 }
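
/*
 * Illustrative sketch (not part of Objecter): allocating a self-managed
 * snap id and waiting for the monitor's reply.  'objecter' and 'pool_id'
 * are assumed caller-side values.
 */
#if 0
  snapid_t new_snap;
  C_SaferCond cond;
  objecter->allocate_selfmanaged_snap(pool_id, &new_snap, &cond);
  int r = cond.wait();             // on success, new_snap holds the snap id
#endif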
3894
3895 int Objecter::delete_pool_snap(int64_t pool, string& snap_name,
3896 Context *onfinish)
3897 {
3898 unique_lock wl(rwlock);
3899 ldout(cct, 10) << "delete_pool_snap; pool: " << pool << "; snap: "
3900 << snap_name << dendl;
3901
3902 const pg_pool_t *p = osdmap->get_pg_pool(pool);
3903 if (!p)
3904 return -EINVAL;
3905 if (!p->snap_exists(snap_name.c_str()))
3906 return -ENOENT;
3907
3908 PoolOp *op = new PoolOp;
3909 if (!op)
3910 return -ENOMEM;
3911 op->tid = ++last_tid;
3912 op->pool = pool;
3913 op->name = snap_name;
3914 op->onfinish = onfinish;
3915 op->pool_op = POOL_OP_DELETE_SNAP;
3916 pool_ops[op->tid] = op;
3917
3918 pool_op_submit(op);
3919
3920 return 0;
3921 }
3922
3923 int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3924 Context *onfinish)
3925 {
3926 unique_lock wl(rwlock);
3927 ldout(cct, 10) << "delete_selfmanaged_snap; pool: " << pool << "; snap: "
3928 << snap << dendl;
3929 PoolOp *op = new PoolOp;
3930 if (!op) return -ENOMEM;
3931 op->tid = ++last_tid;
3932 op->pool = pool;
3933 op->onfinish = onfinish;
3934 op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
3935 op->snapid = snap;
3936 pool_ops[op->tid] = op;
3937
3938 pool_op_submit(op);
3939
3940 return 0;
3941 }
3942
3943 int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
3944 int crush_rule)
3945 {
3946 unique_lock wl(rwlock);
3947 ldout(cct, 10) << "create_pool name=" << name << dendl;
3948
3949 if (osdmap->lookup_pg_pool_name(name) >= 0)
3950 return -EEXIST;
3951
3952 PoolOp *op = new PoolOp;
3953 if (!op)
3954 return -ENOMEM;
3955 op->tid = ++last_tid;
3956 op->pool = 0;
3957 op->name = name;
3958 op->onfinish = onfinish;
3959 op->pool_op = POOL_OP_CREATE;
3960 pool_ops[op->tid] = op;
3961 op->auid = auid;
3962 op->crush_rule = crush_rule;
3963
3964 pool_op_submit(op);
3965
3966 return 0;
3967 }
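
/*
 * Illustrative sketch (not part of Objecter): creating a pool and blocking
 * until the monitor acknowledges it.  'objecter' is an assumed caller-side
 * Objecter*; auid and crush_rule are left at their defaults as declared in
 * Objecter.h (an assumption of this sketch).
 */
#if 0
  std::string name = "example_pool";   // illustrative pool name
  C_SaferCond created;
  int r = objecter->create_pool(name, &created);
  if (r == 0)
    r = created.wait();                // 0 once POOL_OP_CREATE completes
#endif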
3968
3969 int Objecter::delete_pool(int64_t pool, Context *onfinish)
3970 {
3971 unique_lock wl(rwlock);
3972 ldout(cct, 10) << "delete_pool " << pool << dendl;
3973
3974 if (!osdmap->have_pg_pool(pool))
3975 return -ENOENT;
3976
3977 _do_delete_pool(pool, onfinish);
3978 return 0;
3979 }
3980
3981 int Objecter::delete_pool(const string &pool_name, Context *onfinish)
3982 {
3983 unique_lock wl(rwlock);
3984 ldout(cct, 10) << "delete_pool " << pool_name << dendl;
3985
3986 int64_t pool = osdmap->lookup_pg_pool_name(pool_name);
3987 if (pool < 0)
3988 return pool;
3989
3990 _do_delete_pool(pool, onfinish);
3991 return 0;
3992 }
3993
3994 void Objecter::_do_delete_pool(int64_t pool, Context *onfinish)
3995 {
3996 PoolOp *op = new PoolOp;
3997 op->tid = ++last_tid;
3998 op->pool = pool;
3999 op->name = "delete";
4000 op->onfinish = onfinish;
4001 op->pool_op = POOL_OP_DELETE;
4002 pool_ops[op->tid] = op;
4003 pool_op_submit(op);
4004 }
4005
4006 /**
4007 * change the auid owner of a pool by contacting the monitor.
4008 * This requires the current connection to have write permissions
4009 * on both the pool's current auid and the new (parameter) auid.
4010 * Uses the standard Context callback when done.
4011 */
4012 int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
4013 {
4014 unique_lock wl(rwlock);
4015 ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
4016 PoolOp *op = new PoolOp;
4017 if (!op) return -ENOMEM;
4018 op->tid = ++last_tid;
4019 op->pool = pool;
4020 op->name = "change_pool_auid";
4021 op->onfinish = onfinish;
4022 op->pool_op = POOL_OP_AUID_CHANGE;
4023 op->auid = auid;
4024 pool_ops[op->tid] = op;
4025
4026 logger->set(l_osdc_poolop_active, pool_ops.size());
4027
4028 pool_op_submit(op);
4029 return 0;
4030 }
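
/*
 * Illustrative sketch (not part of Objecter): changing a pool's auid and
 * waiting synchronously on the reply.  'objecter', 'pool_id' and 'new_auid'
 * are assumed caller-side values.
 */
#if 0
  C_SaferCond cond;
  objecter->change_pool_auid(pool_id, &cond, new_auid);
  int r = cond.wait();             // 0 on success, -errno otherwise
#endif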
4031
4032 void Objecter::pool_op_submit(PoolOp *op)
4033 {
4034 // rwlock is locked
4035 if (mon_timeout > timespan(0)) {
4036 op->ontimeout = timer.add_event(mon_timeout,
4037 [this, op]() {
4038 pool_op_cancel(op->tid, -ETIMEDOUT); });
4039 }
4040 _pool_op_submit(op);
4041 }
4042
4043 void Objecter::_pool_op_submit(PoolOp *op)
4044 {
4045 // rwlock is locked unique
4046
4047 ldout(cct, 10) << "_pool_op_submit " << op->tid << dendl;
4048 MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
4049 op->name, op->pool_op,
4050 op->auid, last_seen_osdmap_version);
4051 if (op->snapid) m->snapid = op->snapid;
4052 if (op->crush_rule) m->crush_rule = op->crush_rule;
4053 monc->send_mon_message(m);
4054 op->last_submit = ceph::mono_clock::now();
4055
4056 logger->inc(l_osdc_poolop_send);
4057 }
4058
4059 /**
4060 * Handle a reply to a PoolOp message. Check that we sent the message
4061 * and give the caller responsibility for the returned bufferlist.
4062 * Then either call the finisher or stash the PoolOp, depending on whether we
4063 * have a new enough map.
4064 * Lastly, clean up the message and PoolOp.
4065 */
4066 void Objecter::handle_pool_op_reply(MPoolOpReply *m)
4067 {
4068 FUNCTRACE();
4069 shunique_lock sul(rwlock, acquire_shared);
4070 if (!initialized) {
4071 sul.unlock();
4072 m->put();
4073 return;
4074 }
4075
4076 ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
4077 ceph_tid_t tid = m->get_tid();
4078 map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
4079 if (iter != pool_ops.end()) {
4080 PoolOp *op = iter->second;
4081 ldout(cct, 10) << "have request " << tid << " at " << op << " Op: "
4082 << ceph_pool_op_name(op->pool_op) << dendl;
4083 if (op->blp)
4084 op->blp->claim(m->response_data);
4085 if (m->version > last_seen_osdmap_version)
4086 last_seen_osdmap_version = m->version;
4087 if (osdmap->get_epoch() < m->epoch) {
4088 sul.unlock();
4089 sul.lock();
4090 // recheck op existence since we have let go of rwlock
4091 // (for promotion) above.
4092 iter = pool_ops.find(tid);
4093 if (iter == pool_ops.end())
4094 goto done; // op is gone.
4095 if (osdmap->get_epoch() < m->epoch) {
4096 ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch
4097 << " before calling back" << dendl;
4098 _wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
4099 } else {
4100 // map epoch changed, probably because a MOSDMap message
4101 // sneaked in. Do caller-specified callback now or else
4102 // we lose it forever.
4103 assert(op->onfinish);
4104 op->onfinish->complete(m->replyCode);
4105 }
4106 } else {
4107 assert(op->onfinish);
4108 op->onfinish->complete(m->replyCode);
4109 }
4110 op->onfinish = NULL;
4111 if (!sul.owns_lock()) {
4112 sul.unlock();
4113 sul.lock();
4114 }
4115 iter = pool_ops.find(tid);
4116 if (iter != pool_ops.end()) {
4117 _finish_pool_op(op, 0);
4118 }
4119 } else {
4120 ldout(cct, 10) << "unknown request " << tid << dendl;
4121 }
4122
4123 done:
4124 // Not strictly necessary, since we'll release it on return.
4125 sul.unlock();
4126
4127 ldout(cct, 10) << "done" << dendl;
4128 m->put();
4129 }
4130
4131 int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
4132 {
4133 assert(initialized);
4134
4135 unique_lock wl(rwlock);
4136
4137 map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
4138 if (it == pool_ops.end()) {
4139 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4140 return -ENOENT;
4141 }
4142
4143 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4144
4145 PoolOp *op = it->second;
4146 if (op->onfinish)
4147 op->onfinish->complete(r);
4148
4149 _finish_pool_op(op, r);
4150 return 0;
4151 }
4152
4153 void Objecter::_finish_pool_op(PoolOp *op, int r)
4154 {
4155 // rwlock is locked unique
4156 pool_ops.erase(op->tid);
4157 logger->set(l_osdc_poolop_active, pool_ops.size());
4158
4159 if (op->ontimeout && r != -ETIMEDOUT) {
4160 timer.cancel_event(op->ontimeout);
4161 }
4162
4163 delete op;
4164 }
4165
4166 // pool stats
4167
4168 void Objecter::get_pool_stats(list<string>& pools,
4169 map<string,pool_stat_t> *result,
4170 Context *onfinish)
4171 {
4172 ldout(cct, 10) << "get_pool_stats " << pools << dendl;
4173
4174 PoolStatOp *op = new PoolStatOp;
4175 op->tid = ++last_tid;
4176 op->pools = pools;
4177 op->pool_stats = result;
4178 op->onfinish = onfinish;
4179 if (mon_timeout > timespan(0)) {
4180 op->ontimeout = timer.add_event(mon_timeout,
4181 [this, op]() {
4182 pool_stat_op_cancel(op->tid,
4183 -ETIMEDOUT); });
4184 } else {
4185 op->ontimeout = 0;
4186 }
4187
4188 unique_lock wl(rwlock);
4189
4190 poolstat_ops[op->tid] = op;
4191
4192 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4193
4194 _poolstat_submit(op);
4195 }
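
/*
 * Illustrative sketch (not part of Objecter): requesting stats for a set of
 * pools and blocking on the MGetPoolStatsReply.  The pool names and
 * 'objecter' are assumptions of this sketch.
 */
#if 0
  std::list<std::string> pools = {"rbd", "cephfs_data"};
  std::map<std::string, pool_stat_t> stats;
  C_SaferCond cond;
  objecter->get_pool_stats(pools, &stats, &cond);
  int r = cond.wait();             // 0 on success; stats is then populated
#endif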
4196
4197 void Objecter::_poolstat_submit(PoolStatOp *op)
4198 {
4199 ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
4200 monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid,
4201 op->pools,
4202 last_seen_pgmap_version));
4203 op->last_submit = ceph::mono_clock::now();
4204
4205 logger->inc(l_osdc_poolstat_send);
4206 }
4207
4208 void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
4209 {
4210 ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
4211 ceph_tid_t tid = m->get_tid();
4212
4213 unique_lock wl(rwlock);
4214 if (!initialized) {
4215 m->put();
4216 return;
4217 }
4218
4219 map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
4220 if (iter != poolstat_ops.end()) {
4221 PoolStatOp *op = iter->second;
4222 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4223 *op->pool_stats = m->pool_stats;
4224 if (m->version > last_seen_pgmap_version) {
4225 last_seen_pgmap_version = m->version;
4226 }
4227 op->onfinish->complete(0);
4228 _finish_pool_stat_op(op, 0);
4229 } else {
4230 ldout(cct, 10) << "unknown request " << tid << dendl;
4231 }
4232 ldout(cct, 10) << "done" << dendl;
4233 m->put();
4234 }
4235
4236 int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
4237 {
4238 assert(initialized);
4239
4240 unique_lock wl(rwlock);
4241
4242 map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
4243 if (it == poolstat_ops.end()) {
4244 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4245 return -ENOENT;
4246 }
4247
4248 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4249
4250 PoolStatOp *op = it->second;
4251 if (op->onfinish)
4252 op->onfinish->complete(r);
4253 _finish_pool_stat_op(op, r);
4254 return 0;
4255 }
4256
4257 void Objecter::_finish_pool_stat_op(PoolStatOp *op, int r)
4258 {
4259 // rwlock is locked unique
4260
4261 poolstat_ops.erase(op->tid);
4262 logger->set(l_osdc_poolstat_active, poolstat_ops.size());
4263
4264 if (op->ontimeout && r != -ETIMEDOUT)
4265 timer.cancel_event(op->ontimeout);
4266
4267 delete op;
4268 }
4269
4270 void Objecter::get_fs_stats(ceph_statfs& result,
4271 boost::optional<int64_t> data_pool,
4272 Context *onfinish)
4273 {
4274 ldout(cct, 10) << "get_fs_stats" << dendl;
4275 unique_lock l(rwlock);
4276
4277 StatfsOp *op = new StatfsOp;
4278 op->tid = ++last_tid;
4279 op->stats = &result;
4280 op->data_pool = data_pool;
4281 op->onfinish = onfinish;
4282 if (mon_timeout > timespan(0)) {
4283 op->ontimeout = timer.add_event(mon_timeout,
4284 [this, op]() {
4285 statfs_op_cancel(op->tid,
4286 -ETIMEDOUT); });
4287 } else {
4288 op->ontimeout = 0;
4289 }
4290 statfs_ops[op->tid] = op;
4291
4292 logger->set(l_osdc_statfs_active, statfs_ops.size());
4293
4294 _fs_stats_submit(op);
4295 }
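
/*
 * Illustrative sketch (not part of Objecter): fetching cluster-wide statfs
 * data.  'objecter' is an assumed caller-side Objecter*; passing boost::none
 * asks for whole-cluster rather than per-data-pool stats.
 */
#if 0
  struct ceph_statfs stats;
  C_SaferCond cond;
  objecter->get_fs_stats(stats, boost::none, &cond);
  int r = cond.wait();     // on success, stats.kb/kb_used/kb_avail are set
#endif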
4296
4297 void Objecter::_fs_stats_submit(StatfsOp *op)
4298 {
4299 // rwlock is locked unique
4300
4301 ldout(cct, 10) << "_fs_stats_submit " << op->tid << dendl;
4302 monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid,
4303 op->data_pool,
4304 last_seen_pgmap_version));
4305 op->last_submit = ceph::mono_clock::now();
4306
4307 logger->inc(l_osdc_statfs_send);
4308 }
4309
4310 void Objecter::handle_fs_stats_reply(MStatfsReply *m)
4311 {
4312 unique_lock wl(rwlock);
4313 if (!initialized) {
4314 m->put();
4315 return;
4316 }
4317
4318 ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
4319 ceph_tid_t tid = m->get_tid();
4320
4321 if (statfs_ops.count(tid)) {
4322 StatfsOp *op = statfs_ops[tid];
4323 ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
4324 *(op->stats) = m->h.st;
4325 if (m->h.version > last_seen_pgmap_version)
4326 last_seen_pgmap_version = m->h.version;
4327 op->onfinish->complete(0);
4328 _finish_statfs_op(op, 0);
4329 } else {
4330 ldout(cct, 10) << "unknown request " << tid << dendl;
4331 }
4332 m->put();
4333 ldout(cct, 10) << "done" << dendl;
4334 }
4335
4336 int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
4337 {
4338 assert(initialized);
4339
4340 unique_lock wl(rwlock);
4341
4342 map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
4343 if (it == statfs_ops.end()) {
4344 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4345 return -ENOENT;
4346 }
4347
4348 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4349
4350 StatfsOp *op = it->second;
4351 if (op->onfinish)
4352 op->onfinish->complete(r);
4353 _finish_statfs_op(op, r);
4354 return 0;
4355 }
4356
4357 void Objecter::_finish_statfs_op(StatfsOp *op, int r)
4358 {
4359 // rwlock is locked unique
4360
4361 statfs_ops.erase(op->tid);
4362 logger->set(l_osdc_statfs_active, statfs_ops.size());
4363
4364 if (op->ontimeout && r != -ETIMEDOUT)
4365 timer.cancel_event(op->ontimeout);
4366
4367 delete op;
4368 }
4369
4370 // scatter/gather
4371
4372 void Objecter::_sg_read_finish(vector<ObjectExtent>& extents,
4373 vector<bufferlist>& resultbl,
4374 bufferlist *bl, Context *onfinish)
4375 {
4376 // all done
4377 ldout(cct, 15) << "_sg_read_finish" << dendl;
4378
4379 if (extents.size() > 1) {
4380 Striper::StripedReadResult r;
4381 vector<bufferlist>::iterator bit = resultbl.begin();
4382 for (vector<ObjectExtent>::iterator eit = extents.begin();
4383 eit != extents.end();
4384 ++eit, ++bit) {
4385 r.add_partial_result(cct, *bit, eit->buffer_extents);
4386 }
4387 bl->clear();
4388 r.assemble_result(cct, *bl, false);
4389 } else {
4390 ldout(cct, 15) << " only one frag" << dendl;
4391 bl->claim(resultbl[0]);
4392 }
4393
4394 // done
4395 uint64_t bytes_read = bl->length();
4396 ldout(cct, 7) << "_sg_read_finish " << bytes_read << " bytes" << dendl;
4397
4398 if (onfinish) {
4399 onfinish->complete(bytes_read);// > 0 ? bytes_read:m->get_result());
4400 }
4401 }
4402
4403
4404 void Objecter::ms_handle_connect(Connection *con)
4405 {
4406 ldout(cct, 10) << "ms_handle_connect " << con << dendl;
4407 if (!initialized)
4408 return;
4409
4410 if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
4411 resend_mon_ops();
4412 }
4413
4414 bool Objecter::ms_handle_reset(Connection *con)
4415 {
4416 if (!initialized)
4417 return false;
4418 if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
4419 unique_lock wl(rwlock);
4420
4421 OSDSession *session = static_cast<OSDSession*>(con->get_priv());
4422 if (session) {
4423 ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
4424 << " osd." << session->osd << dendl;
4425 if (!initialized) {
4426 wl.unlock();
4427 return false;
4428 }
4429 map<uint64_t, LingerOp *> lresend;
4430 OSDSession::unique_lock sl(session->lock);
4431 _reopen_session(session);
4432 _kick_requests(session, lresend);
4433 sl.unlock();
4434 _linger_ops_resend(lresend, wl);
4435 wl.unlock();
4436 maybe_request_map();
4437 session->put();
4438 }
4439 return true;
4440 }
4441 return false;
4442 }
4443
4444 void Objecter::ms_handle_remote_reset(Connection *con)
4445 {
4446 /*
4447 * treat these the same.
4448 */
4449 ms_handle_reset(con);
4450 }
4451
4452 bool Objecter::ms_handle_refused(Connection *con)
4453 {
4454 // just log for now
4455 if (osdmap && (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD)) {
4456 int osd = osdmap->identify_osd(con->get_peer_addr());
4457 if (osd >= 0) {
4458 ldout(cct, 1) << "ms_handle_refused on osd." << osd << dendl;
4459 }
4460 }
4461 return false;
4462 }
4463
4464 bool Objecter::ms_get_authorizer(int dest_type,
4465 AuthAuthorizer **authorizer,
4466 bool force_new)
4467 {
4468 if (!initialized)
4469 return false;
4470 if (dest_type == CEPH_ENTITY_TYPE_MON)
4471 return true;
4472 *authorizer = monc->build_authorizer(dest_type);
4473 return *authorizer != NULL;
4474 }
4475
4476 void Objecter::op_target_t::dump(Formatter *f) const
4477 {
4478 f->dump_stream("pg") << pgid;
4479 f->dump_int("osd", osd);
4480 f->dump_stream("object_id") << base_oid;
4481 f->dump_stream("object_locator") << base_oloc;
4482 f->dump_stream("target_object_id") << target_oid;
4483 f->dump_stream("target_object_locator") << target_oloc;
4484 f->dump_int("paused", (int)paused);
4485 f->dump_int("used_replica", (int)used_replica);
4486 f->dump_int("precalc_pgid", (int)precalc_pgid);
4487 }
4488
4489 void Objecter::_dump_active(OSDSession *s)
4490 {
4491 for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
4492 p != s->ops.end();
4493 ++p) {
4494 Op *op = p->second;
4495 ldout(cct, 20) << op->tid << "\t" << op->target.pgid
4496 << "\tosd." << (op->session ? op->session->osd : -1)
4497 << "\t" << op->target.base_oid
4498 << "\t" << op->ops << dendl;
4499 }
4500 }
4501
4502 void Objecter::_dump_active()
4503 {
4504 ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless"
4505 << dendl;
4506 for (map<int, OSDSession *>::iterator siter = osd_sessions.begin();
4507 siter != osd_sessions.end(); ++siter) {
4508 OSDSession *s = siter->second;
4509 OSDSession::shared_lock sl(s->lock);
4510 _dump_active(s);
4511 sl.unlock();
4512 }
4513 _dump_active(homeless_session);
4514 }
4515
4516 void Objecter::dump_active()
4517 {
4518 shared_lock rl(rwlock);
4519 _dump_active();
4520 rl.unlock();
4521 }
4522
4523 void Objecter::dump_requests(Formatter *fmt)
4524 {
4525 // Read-lock on Objecter held here
4526 fmt->open_object_section("requests");
4527 dump_ops(fmt);
4528 dump_linger_ops(fmt);
4529 dump_pool_ops(fmt);
4530 dump_pool_stat_ops(fmt);
4531 dump_statfs_ops(fmt);
4532 dump_command_ops(fmt);
4533 fmt->close_section(); // requests object
4534 }
4535
4536 void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
4537 {
4538 for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
4539 p != s->ops.end();
4540 ++p) {
4541 Op *op = p->second;
4542 fmt->open_object_section("op");
4543 fmt->dump_unsigned("tid", op->tid);
4544 op->target.dump(fmt);
4545 fmt->dump_stream("last_sent") << op->stamp;
4546 fmt->dump_int("attempts", op->attempts);
4547 fmt->dump_stream("snapid") << op->snapid;
4548 fmt->dump_stream("snap_context") << op->snapc;
4549 fmt->dump_stream("mtime") << op->mtime;
4550
4551 fmt->open_array_section("osd_ops");
4552 for (vector<OSDOp>::const_iterator it = op->ops.begin();
4553 it != op->ops.end();
4554 ++it) {
4555 fmt->dump_stream("osd_op") << *it;
4556 }
4557 fmt->close_section(); // osd_ops array
4558
4559 fmt->close_section(); // op object
4560 }
4561 }
4562
4563 void Objecter::dump_ops(Formatter *fmt)
4564 {
4565 // Read-lock on Objecter held
4566 fmt->open_array_section("ops");
4567 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4568 siter != osd_sessions.end(); ++siter) {
4569 OSDSession *s = siter->second;
4570 OSDSession::shared_lock sl(s->lock);
4571 _dump_ops(s, fmt);
4572 sl.unlock();
4573 }
4574 _dump_ops(homeless_session, fmt);
4575 fmt->close_section(); // ops array
4576 }
4577
4578 void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
4579 {
4580 for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
4581 p != s->linger_ops.end();
4582 ++p) {
4583 LingerOp *op = p->second;
4584 fmt->open_object_section("linger_op");
4585 fmt->dump_unsigned("linger_id", op->linger_id);
4586 op->target.dump(fmt);
4587 fmt->dump_stream("snapid") << op->snap;
4588 fmt->dump_stream("registered") << op->registered;
4589 fmt->close_section(); // linger_op object
4590 }
4591 }
4592
4593 void Objecter::dump_linger_ops(Formatter *fmt)
4594 {
4595 // We have a read-lock on the objecter
4596 fmt->open_array_section("linger_ops");
4597 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4598 siter != osd_sessions.end(); ++siter) {
4599 OSDSession *s = siter->second;
4600 OSDSession::shared_lock sl(s->lock);
4601 _dump_linger_ops(s, fmt);
4602 sl.unlock();
4603 }
4604 _dump_linger_ops(homeless_session, fmt);
4605 fmt->close_section(); // linger_ops array
4606 }
4607
4608 void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
4609 {
4610 for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
4611 p != s->command_ops.end();
4612 ++p) {
4613 CommandOp *op = p->second;
4614 fmt->open_object_section("command_op");
4615 fmt->dump_unsigned("command_id", op->tid);
4616 fmt->dump_int("osd", op->session ? op->session->osd : -1);
4617 fmt->open_array_section("command");
4618 for (vector<string>::const_iterator q = op->cmd.begin();
4619 q != op->cmd.end(); ++q)
4620 fmt->dump_string("word", *q);
4621 fmt->close_section();
4622 if (op->target_osd >= 0)
4623 fmt->dump_int("target_osd", op->target_osd);
4624 else
4625 fmt->dump_stream("target_pg") << op->target_pg;
4626 fmt->close_section(); // command_op object
4627 }
4628 }
4629
4630 void Objecter::dump_command_ops(Formatter *fmt)
4631 {
4632 // We have a read-lock on the Objecter here
4633 fmt->open_array_section("command_ops");
4634 for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin();
4635 siter != osd_sessions.end(); ++siter) {
4636 OSDSession *s = siter->second;
4637 OSDSession::shared_lock sl(s->lock);
4638 _dump_command_ops(s, fmt);
4639 sl.unlock();
4640 }
4641 _dump_command_ops(homeless_session, fmt);
4642 fmt->close_section(); // command_ops array
4643 }
4644
4645 void Objecter::dump_pool_ops(Formatter *fmt) const
4646 {
4647 fmt->open_array_section("pool_ops");
4648 for (map<ceph_tid_t, PoolOp*>::const_iterator p = pool_ops.begin();
4649 p != pool_ops.end();
4650 ++p) {
4651 PoolOp *op = p->second;
4652 fmt->open_object_section("pool_op");
4653 fmt->dump_unsigned("tid", op->tid);
4654 fmt->dump_int("pool", op->pool);
4655 fmt->dump_string("name", op->name);
4656 fmt->dump_int("operation_type", op->pool_op);
4657 fmt->dump_unsigned("auid", op->auid);
4658 fmt->dump_unsigned("crush_rule", op->crush_rule);
4659 fmt->dump_stream("snapid") << op->snapid;
4660 fmt->dump_stream("last_sent") << op->last_submit;
4661 fmt->close_section(); // pool_op object
4662 }
4663 fmt->close_section(); // pool_ops array
4664 }
4665
4666 void Objecter::dump_pool_stat_ops(Formatter *fmt) const
4667 {
4668 fmt->open_array_section("pool_stat_ops");
4669 for (map<ceph_tid_t, PoolStatOp*>::const_iterator p = poolstat_ops.begin();
4670 p != poolstat_ops.end();
4671 ++p) {
4672 PoolStatOp *op = p->second;
4673 fmt->open_object_section("pool_stat_op");
4674 fmt->dump_unsigned("tid", op->tid);
4675 fmt->dump_stream("last_sent") << op->last_submit;
4676
4677 fmt->open_array_section("pools");
4678 for (list<string>::const_iterator it = op->pools.begin();
4679 it != op->pools.end();
4680 ++it) {
4681 fmt->dump_string("pool", *it);
4682 }
4683 fmt->close_section(); // pools array
4684
4685 fmt->close_section(); // pool_stat_op object
4686 }
4687 fmt->close_section(); // pool_stat_ops array
4688 }
4689
4690 void Objecter::dump_statfs_ops(Formatter *fmt) const
4691 {
4692 fmt->open_array_section("statfs_ops");
4693 for (map<ceph_tid_t, StatfsOp*>::const_iterator p = statfs_ops.begin();
4694 p != statfs_ops.end();
4695 ++p) {
4696 StatfsOp *op = p->second;
4697 fmt->open_object_section("statfs_op");
4698 fmt->dump_unsigned("tid", op->tid);
4699 fmt->dump_stream("last_sent") << op->last_submit;
4700 fmt->close_section(); // statfs_op object
4701 }
4702 fmt->close_section(); // statfs_ops array
4703 }
4704
4705 Objecter::RequestStateHook::RequestStateHook(Objecter *objecter) :
4706 m_objecter(objecter)
4707 {
4708 }
4709
4710 bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap,
4711 std::string format, bufferlist& out)
4712 {
4713 Formatter *f = Formatter::create(format, "json-pretty", "json-pretty");
4714 shared_lock rl(m_objecter->rwlock);
4715 m_objecter->dump_requests(f);
4716 f->flush(out);
4717 delete f;
4718 return true;
4719 }
4720
4721 void Objecter::blacklist_self(bool set)
4722 {
4723 ldout(cct, 10) << "blacklist_self " << (set ? "add" : "rm") << dendl;
4724
4725 vector<string> cmd;
4726 cmd.push_back("{\"prefix\":\"osd blacklist\", ");
4727 if (set)
4728 cmd.push_back("\"blacklistop\":\"add\",");
4729 else
4730 cmd.push_back("\"blacklistop\":\"rm\",");
4731 stringstream ss;
4732 ss << messenger->get_myaddr();
4733 cmd.push_back("\"addr\":\"" + ss.str() + "\"");
4734
4735 MMonCommand *m = new MMonCommand(monc->get_fsid());
4736 m->cmd = cmd;
4737
4738 monc->send_mon_message(m);
4739 }
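
/*
 * For reference, the fragments above assemble into a single mon command of
 * the form (the address shown is illustrative only):
 *
 *   {"prefix":"osd blacklist", "blacklistop":"add", "addr":"192.168.0.1:0/1234"}
 *
 * i.e. the same request the "ceph osd blacklist add|rm <addr>" CLI issues.
 */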
4740
4741 // commands
4742
4743 void Objecter::handle_command_reply(MCommandReply *m)
4744 {
4745 unique_lock wl(rwlock);
4746 if (!initialized) {
4747 m->put();
4748 return;
4749 }
4750
4751 ConnectionRef con = m->get_connection();
4752 OSDSession *s = static_cast<OSDSession*>(con->get_priv());
4753 if (!s || s->con != con) {
4754 ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
4755 m->put();
4756 if (s)
4757 s->put();
4758 return;
4759 }
4760
4761 OSDSession::shared_lock sl(s->lock);
4762 map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
4763 if (p == s->command_ops.end()) {
4764 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4765 << " not found" << dendl;
4766 m->put();
4767 sl.unlock();
4768 if (s)
4769 s->put();
4770 return;
4771 }
4772
4773 CommandOp *c = p->second;
4774 if (!c->session ||
4775 m->get_connection() != c->session->con) {
4776 ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
4777 << " got reply from wrong connection "
4778 << m->get_connection() << " " << m->get_source_inst()
4779 << dendl;
4780 m->put();
4781 sl.unlock();
4782 if (s)
4783 s->put();
4784 return;
4785 }
4786 if (c->poutbl) {
4787 c->poutbl->claim(m->get_data());
4788 }
4789
4790 sl.unlock();
4791
4792 OSDSession::unique_lock sul(s->lock);
4793 _finish_command(c, m->r, m->rs);
4794 sul.unlock();
4795
4796 m->put();
4797 if (s)
4798 s->put();
4799 }
4800
4801 void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
4802 {
4803 shunique_lock sul(rwlock, ceph::acquire_unique);
4804
4805 ceph_tid_t tid = ++last_tid;
4806 ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
4807 c->tid = tid;
4808
4809 {
4810 OSDSession::unique_lock hs_wl(homeless_session->lock);
4811 _session_command_op_assign(homeless_session, c);
4812 }
4813
4814 _calc_command_target(c, sul);
4815 _assign_command_session(c, sul);
4816 if (osd_timeout > timespan(0)) {
4817 c->ontimeout = timer.add_event(osd_timeout,
4818 [this, c, tid]() {
4819 command_op_cancel(c->session, tid,
4820 -ETIMEDOUT); });
4821 }
4822
4823 if (!c->session->is_homeless()) {
4824 _send_command(c);
4825 } else {
4826 _maybe_request_map();
4827 }
4828 if (c->map_check_error)
4829 _send_command_map_check(c);
4830 *ptid = tid;
4831
4832 logger->inc(l_osdc_command_active);
4833 }
4834
4835 int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
4836 {
4837 assert(sul.owns_lock() && sul.mutex() == &rwlock);
4838
4839 c->map_check_error = 0;
4840
4841 // ignore overlays, just like we do with pg ops
4842 c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
4843
4844 if (c->target_osd >= 0) {
4845 if (!osdmap->exists(c->target_osd)) {
4846 c->map_check_error = -ENOENT;
4847 c->map_check_error_str = "osd dne";
4848 c->target.osd = -1;
4849 return RECALC_OP_TARGET_OSD_DNE;
4850 }
4851 if (osdmap->is_down(c->target_osd)) {
4852 c->map_check_error = -ENXIO;
4853 c->map_check_error_str = "osd down";
4854 c->target.osd = -1;
4855 return RECALC_OP_TARGET_OSD_DOWN;
4856 }
4857 c->target.osd = c->target_osd;
4858 } else {
4859 int ret = _calc_target(&(c->target), nullptr, true);
4860 if (ret == RECALC_OP_TARGET_POOL_DNE) {
4861 c->map_check_error = -ENOENT;
4862 c->map_check_error_str = "pool dne";
4863 c->target.osd = -1;
4864 return ret;
4865 } else if (ret == RECALC_OP_TARGET_OSD_DOWN) {
4866 c->map_check_error = -ENXIO;
4867 c->map_check_error_str = "osd down";
4868 c->target.osd = -1;
4869 return ret;
4870 }
4871 }
4872
4873 OSDSession *s;
4874 int r = _get_session(c->target.osd, &s, sul);
4875 assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4876
4877 if (c->session != s) {
4878 put_session(s);
4879 return RECALC_OP_TARGET_NEED_RESEND;
4880 }
4881
4882 put_session(s);
4883
4884 ldout(cct, 20) << "_calc_command_target " << c->tid << " no change, "
4885 << c->session << dendl;
4886
4887 return RECALC_OP_TARGET_NO_ACTION;
4888 }
4889
4890 void Objecter::_assign_command_session(CommandOp *c,
4891 shunique_lock& sul)
4892 {
4893 assert(sul.owns_lock() && sul.mutex() == &rwlock);
4894
4895 OSDSession *s;
4896 int r = _get_session(c->target.osd, &s, sul);
4897 assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
4898
4899 if (c->session != s) {
4900 if (c->session) {
4901 OSDSession *cs = c->session;
4902 OSDSession::unique_lock csl(cs->lock);
4903 _session_command_op_remove(c->session, c);
4904 csl.unlock();
4905 }
4906 OSDSession::unique_lock sl(s->lock);
4907 _session_command_op_assign(s, c);
4908 }
4909
4910 put_session(s);
4911 }
4912
4913 void Objecter::_send_command(CommandOp *c)
4914 {
4915 ldout(cct, 10) << "_send_command " << c->tid << dendl;
4916 assert(c->session);
4917 assert(c->session->con);
4918 MCommand *m = new MCommand(monc->monmap.fsid);
4919 m->cmd = c->cmd;
4920 m->set_data(c->inbl);
4921 m->set_tid(c->tid);
4922 c->session->con->send_message(m);
4923 logger->inc(l_osdc_command_send);
4924 }
4925
4926 int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
4927 {
4928 assert(initialized);
4929
4930 unique_lock wl(rwlock);
4931
4932 map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
4933 if (it == s->command_ops.end()) {
4934 ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
4935 return -ENOENT;
4936 }
4937
4938 ldout(cct, 10) << __func__ << " tid " << tid << dendl;
4939
4940 CommandOp *op = it->second;
4941 _command_cancel_map_check(op);
4942 OSDSession::unique_lock sl(op->session->lock);
4943 _finish_command(op, r, "");
4944 sl.unlock();
4945 return 0;
4946 }
4947
4948 void Objecter::_finish_command(CommandOp *c, int r, string rs)
4949 {
4950 // rwlock is locked unique
4951 // session lock is locked
4952
4953 ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " "
4954 << rs << dendl;
4955 if (c->prs)
4956 *c->prs = rs;
4957 if (c->onfinish)
4958 c->onfinish->complete(r);
4959
4960 if (c->ontimeout && r != -ETIMEDOUT)
4961 timer.cancel_event(c->ontimeout);
4962
4963 OSDSession *s = c->session;
4964 _session_command_op_remove(c->session, c);
4965
4966 c->put();
4967
4968 logger->dec(l_osdc_command_active);
4969 }
4970
4971 Objecter::OSDSession::~OSDSession()
4972 {
4973 // Caller is responsible for re-assigning or
4974 // destroying any ops that were assigned to us
4975 assert(ops.empty());
4976 assert(linger_ops.empty());
4977 assert(command_ops.empty());
4978 }
4979
4980 Objecter::~Objecter()
4981 {
4982 delete osdmap;
4983
4984 assert(homeless_session->get_nref() == 1);
4985 assert(num_homeless_ops == 0);
4986 homeless_session->put();
4987
4988 assert(osd_sessions.empty());
4989 assert(poolstat_ops.empty());
4990 assert(statfs_ops.empty());
4991 assert(pool_ops.empty());
4992 assert(waiting_for_map.empty());
4993 assert(linger_ops.empty());
4994 assert(check_latest_map_lingers.empty());
4995 assert(check_latest_map_ops.empty());
4996 assert(check_latest_map_commands.empty());
4997
4998 assert(!m_request_state_hook);
4999 assert(!logger);
5000 }
5001
5002 /**
5003 * Wait until this OSD map epoch is received before
5004 * sending any more operations to OSDs. Use this
5005 * when it is known that the client can't trust
5006 * anything from before this epoch (e.g. due to
5007 * client blacklist at this epoch).
5008 */
5009 void Objecter::set_epoch_barrier(epoch_t epoch)
5010 {
5011 unique_lock wl(rwlock);
5012
5013 ldout(cct, 7) << __func__ << ": barrier " << epoch << " (was "
5014 << epoch_barrier << ") current epoch " << osdmap->get_epoch()
5015 << dendl;
5016 if (epoch > epoch_barrier) {
5017 epoch_barrier = epoch;
5018 _maybe_request_map();
5019 }
5020 }
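
/*
 * Illustrative sketch (not part of Objecter): a client that has just learned
 * it was blacklisted as of some epoch would raise the barrier so that no op
 * is sent using older maps.  'objecter' and 'blacklist_epoch' are assumed
 * caller-side values.
 */
#if 0
  objecter->set_epoch_barrier(blacklist_epoch);
#endif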
5021
5022
5023
5024 hobject_t Objecter::enumerate_objects_begin()
5025 {
5026 return hobject_t();
5027 }
5028
5029 hobject_t Objecter::enumerate_objects_end()
5030 {
5031 return hobject_t::get_max();
5032 }
5033
5034 struct C_EnumerateReply : public Context {
5035 bufferlist bl;
5036
5037 Objecter *objecter;
5038 hobject_t *next;
5039 std::list<librados::ListObjectImpl> *result;
5040 const hobject_t end;
5041 const int64_t pool_id;
5042 Context *on_finish;
5043
5044 epoch_t epoch;
5045 int budget;
5046
5047 C_EnumerateReply(Objecter *objecter_, hobject_t *next_,
5048 std::list<librados::ListObjectImpl> *result_,
5049 const hobject_t end_, const int64_t pool_id_, Context *on_finish_) :
5050 objecter(objecter_), next(next_), result(result_),
5051 end(end_), pool_id(pool_id_), on_finish(on_finish_),
5052 epoch(0), budget(0)
5053 {}
5054
5055 void finish(int r) override {
5056 objecter->_enumerate_reply(
5057 bl, r, end, pool_id, budget, epoch, result, next, on_finish);
5058 }
5059 };
5060
5061 void Objecter::enumerate_objects(
5062 int64_t pool_id,
5063 const std::string &ns,
5064 const hobject_t &start,
5065 const hobject_t &end,
5066 const uint32_t max,
5067 const bufferlist &filter_bl,
5068 std::list<librados::ListObjectImpl> *result,
5069 hobject_t *next,
5070 Context *on_finish)
5071 {
5072 assert(result);
5073
5074 if (!end.is_max() && start > end) {
5075 lderr(cct) << __func__ << ": start " << start << " > end " << end << dendl;
5076 on_finish->complete(-EINVAL);
5077 return;
5078 }
5079
5080 if (max < 1) {
5081 lderr(cct) << __func__ << ": result size may not be zero" << dendl;
5082 on_finish->complete(-EINVAL);
5083 return;
5084 }
5085
5086 if (start.is_max()) {
5087 on_finish->complete(0);
5088 return;
5089 }
5090
5091 shared_lock rl(rwlock);
5092 assert(osdmap->get_epoch());
5093 if (!osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
5094 rl.unlock();
5095 lderr(cct) << __func__ << ": SORTBITWISE cluster flag not set" << dendl;
5096 on_finish->complete(-EOPNOTSUPP);
5097 return;
5098 }
5099 const pg_pool_t *p = osdmap->get_pg_pool(pool_id);
5100 if (!p) {
5101 lderr(cct) << __func__ << ": pool " << pool_id << " DNE in osd epoch "
5102 << osdmap->get_epoch() << dendl;
5103 rl.unlock();
5104 on_finish->complete(-ENOENT);
5105 return;
5106 } else {
5107 rl.unlock();
5108 }
5109
5110 ldout(cct, 20) << __func__ << ": start=" << start << " end=" << end << dendl;
5111
5112 // Stash completion state
5113 C_EnumerateReply *on_ack = new C_EnumerateReply(
5114 this, next, result, end, pool_id, on_finish);
5115
5116 ObjectOperation op;
5117 op.pg_nls(max, filter_bl, start, 0);
5118
5119 // Issue. See you later in _enumerate_reply
5120 object_locator_t oloc(pool_id, ns);
5121 pg_read(start.get_hash(), oloc, op,
5122 &on_ack->bl, 0, on_ack, &on_ack->epoch, &on_ack->budget);
5123 }
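
/*
 * Illustrative sketch (not part of Objecter): paging through a pool with
 * enumerate_objects(), using the begin/end cursors defined above.
 * 'objecter' and 'pool_id' are assumed caller-side values; the batch size
 * and the empty filter are arbitrary choices for this sketch.
 */
#if 0
  hobject_t cursor = objecter->enumerate_objects_begin();
  const hobject_t end = objecter->enumerate_objects_end();
  while (cursor != end) {
    std::list<librados::ListObjectImpl> batch;
    hobject_t next;
    C_SaferCond cond;
    objecter->enumerate_objects(pool_id, "", cursor, end, 128,
                                bufferlist(), &batch, &next, &cond);
    int r = cond.wait();
    if (r < 0)
      break;
    // process 'batch' here
    cursor = next;
  }
#endif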
5124
5125 void Objecter::_enumerate_reply(
5126 bufferlist &bl,
5127 int r,
5128 const hobject_t &end,
5129 const int64_t pool_id,
5130 int budget,
5131 epoch_t reply_epoch,
5132 std::list<librados::ListObjectImpl> *result,
5133 hobject_t *next,
5134 Context *on_finish)
5135 {
5136 if (budget > 0) {
5137 put_op_budget_bytes(budget);
5138 }
5139
5140 if (r < 0) {
5141 ldout(cct, 4) << __func__ << ": remote error " << r << dendl;
5142 on_finish->complete(r);
5143 return;
5144 }
5145
5146 assert(next != NULL);
5147
5148 // Decode the results
5149 bufferlist::iterator iter = bl.begin();
5150 pg_nls_response_t response;
5151
5152 // XXX extra_info doesn't seem used anywhere?
5153 bufferlist extra_info;
5154 ::decode(response, iter);
5155 if (!iter.end()) {
5156 ::decode(extra_info, iter);
5157 }
5158
5159 ldout(cct, 10) << __func__ << ": got " << response.entries.size()
5160 << " handle " << response.handle
5161 << " reply_epoch " << reply_epoch << dendl;
5162 ldout(cct, 20) << __func__ << ": response.entries.size "
5163 << response.entries.size() << ", response.entries "
5164 << response.entries << dendl;
5165 if (response.handle <= end) {
5166 *next = response.handle;
5167 } else {
5168 ldout(cct, 10) << __func__ << ": adjusted next down to end " << end
5169 << dendl;
5170 *next = end;
5171
5172 // drop anything after 'end'
5173 shared_lock rl(rwlock);
5174 const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
5175 if (!pool) {
5176 // pool is gone, drop any results which are now meaningless.
5177 rl.unlock();
5178 on_finish->complete(-ENOENT);
5179 return;
5180 }
5181 while (!response.entries.empty()) {
5182 uint32_t hash = response.entries.back().locator.empty() ?
5183 pool->hash_key(response.entries.back().oid,
5184 response.entries.back().nspace) :
5185 pool->hash_key(response.entries.back().locator,
5186 response.entries.back().nspace);
5187 hobject_t last(response.entries.back().oid,
5188 response.entries.back().locator,
5189 CEPH_NOSNAP,
5190 hash,
5191 pool_id,
5192 response.entries.back().nspace);
5193 if (last < end)
5194 break;
5195 ldout(cct, 20) << __func__ << " dropping item " << last
5196 << " >= end " << end << dendl;
5197 response.entries.pop_back();
5198 }
5199 rl.unlock();
5200 }
5201 if (!response.entries.empty()) {
5202 result->merge(response.entries);
5203 }
5204
5205 // release the listing context's budget once all
5206 // OPs (in the session) are finished
5207 #if 0
5208 put_nlist_context_budget(list_context);
5209 #endif
5210 on_finish->complete(r);
5211 return;
5212 }
5213
5214 namespace {
5215 using namespace librados;
5216
5217 template <typename T>
5218 void do_decode(std::vector<T>& items, std::vector<bufferlist>& bls)
5219 {
5220 for (auto bl : bls) {
5221 auto p = bl.begin();
5222 T t;
5223 decode(t, p);
5224 items.push_back(t);
5225 }
5226 }
5227
5228 struct C_ObjectOperation_scrub_ls : public Context {
5229 bufferlist bl;
5230 uint32_t *interval;
5231 std::vector<inconsistent_obj_t> *objects = nullptr;
5232 std::vector<inconsistent_snapset_t> *snapsets = nullptr;
5233 int *rval;
5234
5235 C_ObjectOperation_scrub_ls(uint32_t *interval,
5236 std::vector<inconsistent_obj_t> *objects,
5237 int *rval)
5238 : interval(interval), objects(objects), rval(rval) {}
5239 C_ObjectOperation_scrub_ls(uint32_t *interval,
5240 std::vector<inconsistent_snapset_t> *snapsets,
5241 int *rval)
5242 : interval(interval), snapsets(snapsets), rval(rval) {}
5243 void finish(int r) override {
5244 if (r < 0 && r != -EAGAIN) {
5245 if (rval)
5246 *rval = r;
5247 return;
5248 }
5249
5250 if (rval)
5251 *rval = 0;
5252
5253 try {
5254 decode();
5255 } catch (buffer::error&) {
5256 if (rval)
5257 *rval = -EIO;
5258 }
5259 }
5260 private:
5261 void decode() {
5262 scrub_ls_result_t result;
5263 auto p = bl.begin();
5264 result.decode(p);
5265 *interval = result.interval;
5266 if (objects) {
5267 do_decode(*objects, result.vals);
5268 } else {
5269 do_decode(*snapsets, result.vals);
5270 }
5271 }
5272 };
5273
5274 template <typename T>
5275 void do_scrub_ls(::ObjectOperation *op,
5276 const scrub_ls_arg_t& arg,
5277 std::vector<T> *items,
5278 uint32_t *interval,
5279 int *rval)
5280 {
5281 OSDOp& osd_op = op->add_op(CEPH_OSD_OP_SCRUBLS);
5282 op->flags |= CEPH_OSD_FLAG_PGOP;
5283 assert(interval);
5284 arg.encode(osd_op.indata);
5285 unsigned p = op->ops.size() - 1;
5286 auto *h = new C_ObjectOperation_scrub_ls{interval, items, rval};
5287 op->out_handler[p] = h;
5288 op->out_bl[p] = &h->bl;
5289 op->out_rval[p] = rval;
5290 }
5291 }
5292
5293 void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5294 uint64_t max_to_get,
5295 std::vector<librados::inconsistent_obj_t> *objects,
5296 uint32_t *interval,
5297 int *rval)
5298 {
5299 scrub_ls_arg_t arg = {*interval, 0, start_after, max_to_get};
5300 do_scrub_ls(this, arg, objects, interval, rval);
5301 }
5302
5303 void ::ObjectOperation::scrub_ls(const librados::object_id_t& start_after,
5304 uint64_t max_to_get,
5305 std::vector<librados::inconsistent_snapset_t> *snapsets,
5306 uint32_t *interval,
5307 int *rval)
5308 {
5309 scrub_ls_arg_t arg = {*interval, 1, start_after, max_to_get};
5310 do_scrub_ls(this, arg, snapsets, interval, rval);
5311 }
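
/*
 * Illustrative sketch (not part of Objecter): building a scrub_ls PG op to
 * retrieve inconsistent objects found by scrub.  The interval handling is an
 * assumption of this sketch: a previously returned interval (or 0 on the
 * first call) is passed in and updated from the reply; how the op is then
 * submitted as a PG op against the target pg is outside this sketch.
 */
#if 0
  ::ObjectOperation op;
  std::vector<librados::inconsistent_obj_t> objects;
  uint32_t interval = 0;           // updated from the reply
  int rval = 0;
  op.scrub_ls(librados::object_id_t(), 100, &objects, &interval, &rval);
#endif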