// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_OBJECTER_H
#define CEPH_OBJECTER_H

#include <condition_variable>
#include <list>
#include <map>
#include <mutex>
#include <memory>
#include <sstream>
#include <type_traits>

#include <boost/thread/shared_mutex.hpp>

#include "include/assert.h"
#include "include/buffer.h"
#include "include/types.h"
#include "include/rados/rados_types.hpp"

#include "common/admin_socket.h"
#include "common/ceph_time.h"
#include "common/ceph_timer.h"
#include "common/Finisher.h"
#include "common/shunique_lock.h"
#include "common/zipkin_trace.h"

#include "messages/MOSDOp.h"
#include "osd/OSDMap.h"

using namespace std;

class Context;
class Messenger;
class OSDMap;
class MonClient;
class Message;
class Finisher;

class MPoolOpReply;

class MGetPoolStatsReply;
class MStatfsReply;
class MCommandReply;
class MWatchNotify;

class PerfCounters;

// -----------------------------------------

struct ObjectOperation {
  vector<OSDOp> ops;
  int flags;
  int priority;

  vector<bufferlist*> out_bl;
  vector<Context*> out_handler;
  vector<int*> out_rval;

  ObjectOperation() : flags(0), priority(0) {}
  ~ObjectOperation() {
    while (!out_handler.empty()) {
      delete out_handler.back();
      out_handler.pop_back();
    }
  }

  size_t size() {
    return ops.size();
  }

  void set_last_op_flags(int flags) {
    assert(!ops.empty());
    ops.rbegin()->op.flags = flags;
  }

  class C_TwoContexts;
  /**
   * Add a callback to run when this operation completes,
   * after any other callbacks for it.
   */
  void add_handler(Context *extra);

  OSDOp& add_op(int op) {
    int s = ops.size();
    ops.resize(s+1);
    ops[s].op.op = op;
    out_bl.resize(s+1);
    out_bl[s] = NULL;
    out_handler.resize(s+1);
    out_handler[s] = NULL;
    out_rval.resize(s+1);
    out_rval[s] = NULL;
    return ops[s];
  }
  void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.extent.offset = off;
    osd_op.op.extent.length = len;
    osd_op.indata.claim_append(bl);
  }
  void add_writesame(int op, uint64_t off, uint64_t write_len,
                     bufferlist& bl) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.writesame.offset = off;
    osd_op.op.writesame.length = write_len;
    osd_op.op.writesame.data_length = bl.length();
    osd_op.indata.claim_append(bl);
  }
  void add_xattr(int op, const char *name, const bufferlist& data) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
    osd_op.op.xattr.value_len = data.length();
    if (name)
      osd_op.indata.append(name);
    osd_op.indata.append(data);
  }
  void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
                     uint8_t cmp_mode, const bufferlist& data) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
    osd_op.op.xattr.value_len = data.length();
    osd_op.op.xattr.cmp_op = cmp_op;
    osd_op.op.xattr.cmp_mode = cmp_mode;
    if (name)
      osd_op.indata.append(name);
    osd_op.indata.append(data);
  }
  void add_call(int op, const char *cname, const char *method,
                bufferlist &indata,
                bufferlist *outbl, Context *ctx, int *prval) {
    OSDOp& osd_op = add_op(op);

    unsigned p = ops.size() - 1;
    out_handler[p] = ctx;
    out_bl[p] = outbl;
    out_rval[p] = prval;

    osd_op.op.cls.class_len = strlen(cname);
    osd_op.op.cls.method_len = strlen(method);
    osd_op.op.cls.indata_len = indata.length();
    osd_op.indata.append(cname, osd_op.op.cls.class_len);
    osd_op.indata.append(method, osd_op.op.cls.method_len);
    osd_op.indata.append(indata);
  }
  void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
                epoch_t start_epoch) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.pgls.count = count;
    osd_op.op.pgls.start_epoch = start_epoch;
    ::encode(cookie, osd_op.indata);
  }
  void add_pgls_filter(int op, uint64_t count, const bufferlist& filter,
                       collection_list_handle_t cookie, epoch_t start_epoch) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.pgls.count = count;
    osd_op.op.pgls.start_epoch = start_epoch;
    string cname = "pg";
    string mname = "filter";
    ::encode(cname, osd_op.indata);
    ::encode(mname, osd_op.indata);
    osd_op.indata.append(filter);
    ::encode(cookie, osd_op.indata);
  }
  void add_alloc_hint(int op, uint64_t expected_object_size,
                      uint64_t expected_write_size,
                      uint32_t flags) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.alloc_hint.expected_object_size = expected_object_size;
    osd_op.op.alloc_hint.expected_write_size = expected_write_size;
    osd_op.op.alloc_hint.flags = flags;
  }

  // ------

  // pg
  void pg_ls(uint64_t count, bufferlist& filter,
             collection_list_handle_t cookie, epoch_t start_epoch) {
    if (filter.length() == 0)
      add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
    else
      add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie,
                      start_epoch);
    flags |= CEPH_OSD_FLAG_PGOP;
  }

  void pg_nls(uint64_t count, const bufferlist& filter,
              collection_list_handle_t cookie, epoch_t start_epoch) {
    if (filter.length() == 0)
      add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch);
    else
      add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie,
                      start_epoch);
    flags |= CEPH_OSD_FLAG_PGOP;
  }

  void scrub_ls(const librados::object_id_t& start_after,
                uint64_t max_to_get,
                std::vector<librados::inconsistent_obj_t> *objects,
                uint32_t *interval,
                int *rval);
  void scrub_ls(const librados::object_id_t& start_after,
                uint64_t max_to_get,
                std::vector<librados::inconsistent_snapset_t> *objects,
                uint32_t *interval,
                int *rval);

  void create(bool excl) {
    OSDOp& o = add_op(CEPH_OSD_OP_CREATE);
    o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0);
  }

  struct C_ObjectOperation_stat : public Context {
    bufferlist bl;
    uint64_t *psize;
    ceph::real_time *pmtime;
    time_t *ptime;
    struct timespec *pts;
    int *prval;
    C_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt,
                           struct timespec *_pts, int *prval)
      : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {}
    void finish(int r) override {
      if (r >= 0) {
        bufferlist::iterator p = bl.begin();
        try {
          uint64_t size;
          ceph::real_time mtime;
          ::decode(size, p);
          ::decode(mtime, p);
          if (psize)
            *psize = size;
          if (pmtime)
            *pmtime = mtime;
          if (ptime)
            *ptime = ceph::real_clock::to_time_t(mtime);
          if (pts)
            *pts = ceph::real_clock::to_timespec(mtime);
        } catch (buffer::error& e) {
          if (prval)
            *prval = -EIO;
        }
      }
    }
  };
  void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, pmtime,
                                                           NULL, NULL, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
    out_rval[p] = prval;
  }
  void stat(uint64_t *psize, time_t *ptime, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, ptime,
                                                           NULL, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
    out_rval[p] = prval;
  }
  void stat(uint64_t *psize, struct timespec *pts, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, NULL,
                                                           pts, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
    out_rval[p] = prval;
  }
  // object cmpext
  struct C_ObjectOperation_cmpext : public Context {
    int *prval;
    C_ObjectOperation_cmpext(int *prval)
      : prval(prval) {}

    void finish(int r) {
      if (prval)
        *prval = r;
    }
  };

  void cmpext(uint64_t off, bufferlist& cmp_bl, int *prval) {
    add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
    out_handler[p] = h;
    out_rval[p] = prval;
  }

  // Used by C API
  void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) {
    bufferlist cmp_bl;
    cmp_bl.append(cmp_buf, cmp_len);
    add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
    out_handler[p] = h;
    out_rval[p] = prval;
  }

  void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval,
            Context* ctx) {
    bufferlist bl;
    add_data(CEPH_OSD_OP_READ, off, len, bl);
    unsigned p = ops.size() - 1;
    out_bl[p] = pbl;
    out_rval[p] = prval;
    out_handler[p] = ctx;
  }

  struct C_ObjectOperation_sparse_read : public Context {
    bufferlist bl;
    bufferlist *data_bl;
    std::map<uint64_t, uint64_t> *extents;
    int *prval;
    C_ObjectOperation_sparse_read(bufferlist *data_bl,
                                  std::map<uint64_t, uint64_t> *extents,
                                  int *prval)
      : data_bl(data_bl), extents(extents), prval(prval) {}
    void finish(int r) override {
      bufferlist::iterator iter = bl.begin();
      if (r >= 0) {
        // NOTE: it's possible the sub-op has not been executed but the result
        // code remains zeroed. Avoid the costly exception handling on a
        // potential IO path.
        if (bl.length() > 0) {
          try {
            ::decode(*extents, iter);
            ::decode(*data_bl, iter);
          } catch (buffer::error& e) {
            if (prval)
              *prval = -EIO;
          }
        } else if (prval) {
          *prval = -EIO;
        }
      }
    }
  };
  void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t,uint64_t> *m,
                   bufferlist *data_bl, int *prval) {
    bufferlist bl;
    add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_sparse_read *h =
      new C_ObjectOperation_sparse_read(data_bl, m, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
    out_rval[p] = prval;
  }
  void write(uint64_t off, bufferlist& bl,
             uint64_t truncate_size,
             uint32_t truncate_seq) {
    add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
    OSDOp& o = *ops.rbegin();
    o.op.extent.truncate_size = truncate_size;
    o.op.extent.truncate_seq = truncate_seq;
  }
  void write(uint64_t off, bufferlist& bl) {
    write(off, bl, 0, 0);
  }
  void write_full(bufferlist& bl) {
    add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
  }
  void writesame(uint64_t off, uint64_t write_len, bufferlist& bl) {
    add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl);
  }
  void append(bufferlist& bl) {
    add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
  }
  void zero(uint64_t off, uint64_t len) {
    bufferlist bl;
    add_data(CEPH_OSD_OP_ZERO, off, len, bl);
  }
  void truncate(uint64_t off) {
    bufferlist bl;
    add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl);
  }
  void remove() {
    bufferlist bl;
    add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
  }
  void mapext(uint64_t off, uint64_t len) {
    bufferlist bl;
    add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
  }
  void sparse_read(uint64_t off, uint64_t len) {
    bufferlist bl;
    add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
  }

  void checksum(uint8_t type, const bufferlist &init_value_bl,
                uint64_t off, uint64_t len, size_t chunk_size,
                bufferlist *pbl, int *prval, Context *ctx) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
    osd_op.op.checksum.offset = off;
    osd_op.op.checksum.length = len;
    osd_op.op.checksum.type = type;
    osd_op.op.checksum.chunk_size = chunk_size;
    osd_op.indata.append(init_value_bl);

    unsigned p = ops.size() - 1;
    out_bl[p] = pbl;
    out_rval[p] = prval;
    out_handler[p] = ctx;
  }

  // object attrs
  void getxattr(const char *name, bufferlist *pbl, int *prval) {
    bufferlist bl;
    add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
    unsigned p = ops.size() - 1;
    out_bl[p] = pbl;
    out_rval[p] = prval;
  }
  struct C_ObjectOperation_decodevals : public Context {
    uint64_t max_entries;
    bufferlist bl;
    std::map<std::string,bufferlist> *pattrs;
    bool *ptruncated;
    int *prval;
    C_ObjectOperation_decodevals(uint64_t m, std::map<std::string,bufferlist> *pa,
                                 bool *pt, int *pr)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
      if (ptruncated) {
        *ptruncated = false;
      }
    }
    void finish(int r) override {
      if (r >= 0) {
        bufferlist::iterator p = bl.begin();
        try {
          if (pattrs)
            ::decode(*pattrs, p);
          if (ptruncated) {
            std::map<std::string,bufferlist> ignore;
            if (!pattrs) {
              ::decode(ignore, p);
              pattrs = &ignore;
            }
            if (!p.end()) {
              ::decode(*ptruncated, p);
            } else {
              // the OSD did not provide this. since old OSDs do not
              // enforce omap result limits either, we can infer it from
              // the size of the result
              *ptruncated = (pattrs->size() == max_entries);
            }
          }
        }
        catch (buffer::error& e) {
          if (prval)
            *prval = -EIO;
        }
      }
    }
  };
  struct C_ObjectOperation_decodekeys : public Context {
    uint64_t max_entries;
    bufferlist bl;
    std::set<std::string> *pattrs;
    bool *ptruncated;
    int *prval;
    C_ObjectOperation_decodekeys(uint64_t m, std::set<std::string> *pa, bool *pt,
                                 int *pr)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
      if (ptruncated) {
        *ptruncated = false;
      }
    }
    void finish(int r) override {
      if (r >= 0) {
        bufferlist::iterator p = bl.begin();
        try {
          if (pattrs)
            ::decode(*pattrs, p);
          if (ptruncated) {
            std::set<std::string> ignore;
            if (!pattrs) {
              ::decode(ignore, p);
              pattrs = &ignore;
            }
            if (!p.end()) {
              ::decode(*ptruncated, p);
            } else {
              // the OSD did not provide this. since old OSDs do not
              // enforce omap result limits either, we can infer it from
              // the size of the result
              *ptruncated = (pattrs->size() == max_entries);
            }
          }
        }
        catch (buffer::error& e) {
          if (prval)
            *prval = -EIO;
        }
      }
    }
  };
  struct C_ObjectOperation_decodewatchers : public Context {
    bufferlist bl;
    list<obj_watch_t> *pwatchers;
    int *prval;
    C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr)
      : pwatchers(pw), prval(pr) {}
    void finish(int r) override {
      if (r >= 0) {
        bufferlist::iterator p = bl.begin();
        try {
          obj_list_watch_response_t resp;
          ::decode(resp, p);
          if (pwatchers) {
            for (list<watch_item_t>::iterator i = resp.entries.begin() ;
                 i != resp.entries.end() ; ++i) {
              obj_watch_t ow;
              ostringstream sa;
              sa << i->addr;
              strncpy(ow.addr, sa.str().c_str(), 256);
              ow.watcher_id = i->name.num();
              ow.cookie = i->cookie;
              ow.timeout_seconds = i->timeout_seconds;
              pwatchers->push_back(ow);
            }
          }
        }
        catch (buffer::error& e) {
          if (prval)
            *prval = -EIO;
        }
      }
    }
  };
  struct C_ObjectOperation_decodesnaps : public Context {
    bufferlist bl;
    librados::snap_set_t *psnaps;
    int *prval;
    C_ObjectOperation_decodesnaps(librados::snap_set_t *ps, int *pr)
      : psnaps(ps), prval(pr) {}
    void finish(int r) override {
      if (r >= 0) {
        bufferlist::iterator p = bl.begin();
        try {
          obj_list_snap_response_t resp;
          ::decode(resp, p);
          if (psnaps) {
            psnaps->clones.clear();
            for (vector<clone_info>::iterator ci = resp.clones.begin();
                 ci != resp.clones.end();
                 ++ci) {
              librados::clone_info_t clone;

              clone.cloneid = ci->cloneid;
              clone.snaps.reserve(ci->snaps.size());
              clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
                                 ci->snaps.end());
              clone.overlap = ci->overlap;
              clone.size = ci->size;

              psnaps->clones.push_back(clone);
            }
            psnaps->seq = resp.seq;
          }
        } catch (buffer::error& e) {
          if (prval)
            *prval = -EIO;
        }
      }
    }
  };
  void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) {
    add_op(CEPH_OSD_OP_GETXATTRS);
    if (pattrs || prval) {
      unsigned p = ops.size() - 1;
      C_ObjectOperation_decodevals *h
        = new C_ObjectOperation_decodevals(0, pattrs, nullptr, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }
  void setxattr(const char *name, const bufferlist& bl) {
    add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
  }
  void setxattr(const char *name, const string& s) {
    bufferlist bl;
    bl.append(s);
    add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
  }
  void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
                const bufferlist& bl) {
    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
  }
  void rmxattr(const char *name) {
    bufferlist bl;
    add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
  }
  void setxattrs(map<string, bufferlist>& attrs) {
    bufferlist bl;
    ::encode(attrs, bl);
    add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
  }
  void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {
    bufferlist bl;
    ::encode(attrs, bl);
    add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
  }

  // trivialmap
  void tmap_update(bufferlist& bl) {
    add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
  }
  void tmap_put(bufferlist& bl) {
    add_data(CEPH_OSD_OP_TMAPPUT, 0, bl.length(), bl);
  }
  void tmap_get(bufferlist *pbl, int *prval) {
    add_op(CEPH_OSD_OP_TMAPGET);
    unsigned p = ops.size() - 1;
    out_bl[p] = pbl;
    out_rval[p] = prval;
  }
  void tmap_get() {
    add_op(CEPH_OSD_OP_TMAPGET);
  }
  void tmap_to_omap(bool nullok=false) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_TMAP2OMAP);
    if (nullok)
      osd_op.op.tmap2omap.flags = CEPH_OSD_TMAP2OMAP_NULLOK;
  }

  // objectmap
  void omap_get_keys(const string &start_after,
                     uint64_t max_to_get,
                     std::set<std::string> *out_set,
                     bool *ptruncated,
                     int *prval) {
    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
    bufferlist bl;
    ::encode(start_after, bl);
    ::encode(max_to_get, bl);
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval || ptruncated || out_set) {
      unsigned p = ops.size() - 1;
      C_ObjectOperation_decodekeys *h =
        new C_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }

  void omap_get_vals(const string &start_after,
                     const string &filter_prefix,
                     uint64_t max_to_get,
                     std::map<std::string, bufferlist> *out_set,
                     bool *ptruncated,
                     int *prval) {
    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
    bufferlist bl;
    ::encode(start_after, bl);
    ::encode(max_to_get, bl);
    ::encode(filter_prefix, bl);
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval || out_set || ptruncated) {
      unsigned p = ops.size() - 1;
      C_ObjectOperation_decodevals *h =
        new C_ObjectOperation_decodevals(max_to_get, out_set, ptruncated, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }
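
  // Illustrative usage sketch (assumed caller-side code; 'last_key' and
  // the other names are hypothetical): paging through omap values 512
  // entries at a time. 'truncated' reports whether another call is
  // needed; with older OSDs it is inferred from the result size (see
  // C_ObjectOperation_decodevals above).
  //
  //   std::map<std::string, bufferlist> vals;
  //   bool truncated = false;
  //   int rval = 0;
  //   ObjectOperation op;
  //   op.omap_get_vals(last_key, "" /* no prefix filter */, 512,
  //                    &vals, &truncated, &rval);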

  void omap_get_vals_by_keys(const std::set<std::string> &to_get,
                             std::map<std::string, bufferlist> *out_set,
                             int *prval) {
    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
    bufferlist bl;
    ::encode(to_get, bl);
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval || out_set) {
      unsigned p = ops.size() - 1;
      C_ObjectOperation_decodevals *h =
        new C_ObjectOperation_decodevals(0, out_set, nullptr, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }

  void omap_cmp(const std::map<std::string, pair<bufferlist,int> > &assertions,
                int *prval) {
    OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
    bufferlist bl;
    ::encode(assertions, bl);
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval) {
      unsigned p = ops.size() - 1;
      out_rval[p] = prval;
    }
  }

  struct C_ObjectOperation_copyget : public Context {
    bufferlist bl;
    object_copy_cursor_t *cursor;
    uint64_t *out_size;
    ceph::real_time *out_mtime;
    std::map<std::string,bufferlist> *out_attrs;
    bufferlist *out_data, *out_omap_header, *out_omap_data;
    vector<snapid_t> *out_snaps;
    snapid_t *out_snap_seq;
    uint32_t *out_flags;
    uint32_t *out_data_digest;
    uint32_t *out_omap_digest;
    mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids;
    uint64_t *out_truncate_seq;
    uint64_t *out_truncate_size;
    int *prval;
    C_ObjectOperation_copyget(object_copy_cursor_t *c,
                              uint64_t *s,
                              ceph::real_time *m,
                              std::map<std::string,bufferlist> *a,
                              bufferlist *d, bufferlist *oh,
                              bufferlist *o,
                              std::vector<snapid_t> *osnaps,
                              snapid_t *osnap_seq,
                              uint32_t *flags,
                              uint32_t *dd,
                              uint32_t *od,
                              mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *oreqids,
                              uint64_t *otseq,
                              uint64_t *otsize,
                              int *r)
      : cursor(c),
        out_size(s), out_mtime(m),
        out_attrs(a), out_data(d), out_omap_header(oh),
        out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
        out_flags(flags), out_data_digest(dd), out_omap_digest(od),
        out_reqids(oreqids),
        out_truncate_seq(otseq),
        out_truncate_size(otsize),
        prval(r) {}
    void finish(int r) override {
      // reqids are copied on ENOENT
      if (r < 0 && r != -ENOENT)
        return;
      try {
        bufferlist::iterator p = bl.begin();
        object_copy_data_t copy_reply;
        ::decode(copy_reply, p);
        if (r == -ENOENT) {
          if (out_reqids)
            *out_reqids = copy_reply.reqids;
          return;
        }
        if (out_size)
          *out_size = copy_reply.size;
        if (out_mtime)
          *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
        if (out_attrs)
          *out_attrs = copy_reply.attrs;
        if (out_data)
          out_data->claim_append(copy_reply.data);
        if (out_omap_header)
          out_omap_header->claim_append(copy_reply.omap_header);
        if (out_omap_data)
          *out_omap_data = copy_reply.omap_data;
        if (out_snaps)
          *out_snaps = copy_reply.snaps;
        if (out_snap_seq)
          *out_snap_seq = copy_reply.snap_seq;
        if (out_flags)
          *out_flags = copy_reply.flags;
        if (out_data_digest)
          *out_data_digest = copy_reply.data_digest;
        if (out_omap_digest)
          *out_omap_digest = copy_reply.omap_digest;
        if (out_reqids)
          *out_reqids = copy_reply.reqids;
        if (out_truncate_seq)
          *out_truncate_seq = copy_reply.truncate_seq;
        if (out_truncate_size)
          *out_truncate_size = copy_reply.truncate_size;
        *cursor = copy_reply.cursor;
      } catch (buffer::error& e) {
        if (prval)
          *prval = -EIO;
      }
    }
  };

  void copy_get(object_copy_cursor_t *cursor,
                uint64_t max,
                uint64_t *out_size,
                ceph::real_time *out_mtime,
                std::map<std::string,bufferlist> *out_attrs,
                bufferlist *out_data,
                bufferlist *out_omap_header,
                bufferlist *out_omap_data,
                vector<snapid_t> *out_snaps,
                snapid_t *out_snap_seq,
                uint32_t *out_flags,
                uint32_t *out_data_digest,
                uint32_t *out_omap_digest,
                mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids,
                uint64_t *truncate_seq,
                uint64_t *truncate_size,
                int *prval) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
    osd_op.op.copy_get.max = max;
    ::encode(*cursor, osd_op.indata);
    ::encode(max, osd_op.indata);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_copyget *h =
      new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
                                    out_attrs, out_data, out_omap_header,
                                    out_omap_data, out_snaps, out_snap_seq,
                                    out_flags, out_data_digest,
                                    out_omap_digest, out_reqids, truncate_seq,
                                    truncate_size, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }

  void undirty() {
    add_op(CEPH_OSD_OP_UNDIRTY);
  }

  struct C_ObjectOperation_isdirty : public Context {
    bufferlist bl;
    bool *pisdirty;
    int *prval;
    C_ObjectOperation_isdirty(bool *p, int *r)
      : pisdirty(p), prval(r) {}
    void finish(int r) override {
      if (r < 0)
        return;
      try {
        bufferlist::iterator p = bl.begin();
        bool isdirty;
        ::decode(isdirty, p);
        if (pisdirty)
          *pisdirty = isdirty;
      } catch (buffer::error& e) {
        if (prval)
          *prval = -EIO;
      }
    }
  };

  void is_dirty(bool *pisdirty, int *prval) {
    add_op(CEPH_OSD_OP_ISDIRTY);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_isdirty *h =
      new C_ObjectOperation_isdirty(pisdirty, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }

  struct C_ObjectOperation_hit_set_ls : public Context {
    bufferlist bl;
    std::list< std::pair<time_t, time_t> > *ptls;
    std::list< std::pair<ceph::real_time, ceph::real_time> > *putls;
    int *prval;
    C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
                                 std::list< std::pair<ceph::real_time,
                                                      ceph::real_time> > *ut,
                                 int *r)
      : ptls(t), putls(ut), prval(r) {}
    void finish(int r) override {
      if (r < 0)
        return;
      try {
        bufferlist::iterator p = bl.begin();
        std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
        ::decode(ls, p);
        if (ptls) {
          ptls->clear();
          for (auto p = ls.begin(); p != ls.end(); ++p)
            // round initial timestamp up to the next full second to
            // keep this a valid interval.
            ptls->push_back(
              make_pair(ceph::real_clock::to_time_t(
                          ceph::ceil(p->first,
                                     // Sadly, no time literals until C++14.
                                     std::chrono::seconds(1))),
                        ceph::real_clock::to_time_t(p->second)));
        }
        if (putls)
          putls->swap(ls);
      } catch (buffer::error& e) {
        r = -EIO;
      }
      if (prval)
        *prval = r;
    }
  };

  /**
   * list available HitSets.
   *
   * We will get back a list of time intervals. Note that the most
   * recent range may have an empty end timestamp if it is still
   * accumulating.
   *
   * @param pls [out] list of time intervals
   * @param prval [out] return value
   */
  void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
    add_op(CEPH_OSD_OP_PG_HITSET_LS);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_hit_set_ls *h =
      new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }
  void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls,
                  int *prval) {
    add_op(CEPH_OSD_OP_PG_HITSET_LS);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_hit_set_ls *h =
      new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }

  /**
   * get HitSet
   *
   * Return an encoded HitSet that includes the provided time
   * interval.
   *
   * @param stamp [in] timestamp
   * @param pbl [out] target buffer for encoded HitSet
   * @param prval [out] return value
   */
  void hit_set_get(ceph::real_time stamp, bufferlist *pbl, int *prval) {
    OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
    op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    out_bl[p] = pbl;
  }

  void omap_get_header(bufferlist *bl, int *prval) {
    add_op(CEPH_OSD_OP_OMAPGETHEADER);
    unsigned p = ops.size() - 1;
    out_bl[p] = bl;
    out_rval[p] = prval;
  }

  void omap_set(const map<string, bufferlist> &map) {
    bufferlist bl;
    ::encode(map, bl);
    add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
  }

  void omap_set_header(bufferlist &bl) {
    add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
  }

  void omap_clear() {
    add_op(CEPH_OSD_OP_OMAPCLEAR);
  }

  void omap_rm_keys(const std::set<std::string> &to_remove) {
    bufferlist bl;
    ::encode(to_remove, bl);
    add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
  }

  // object classes
  void call(const char *cname, const char *method, bufferlist &indata) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
  }

  void call(const char *cname, const char *method, bufferlist &indata,
            bufferlist *outdata, Context *ctx, int *prval) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
  }

  // watch/notify
  void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH);
    osd_op.op.watch.cookie = cookie;
    osd_op.op.watch.op = op;
    osd_op.op.watch.timeout = timeout;
  }

  void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
              bufferlist &bl, bufferlist *inbl) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
    osd_op.op.notify.cookie = cookie;
    ::encode(prot_ver, *inbl);
    ::encode(timeout, *inbl);
    ::encode(bl, *inbl);
    osd_op.indata.append(*inbl);
  }
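
  // Illustrative usage sketch (assumed caller-side code; names are
  // hypothetical): notify() encodes prot_ver, timeout and the payload
  // into *inbl and then copies it into the op's indata, so both
  // bufferlists are only staging buffers owned by the caller.
  //
  //   bufferlist payload, inbl;
  //   ObjectOperation op;
  //   op.notify(cookie, 1 /* prot_ver */, 10 /* timeout, seconds */,
  //             payload, &inbl);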

  void notify_ack(uint64_t notify_id, uint64_t cookie,
                  bufferlist& reply_bl) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
    bufferlist bl;
    ::encode(notify_id, bl);
    ::encode(cookie, bl);
    ::encode(reply_bl, bl);
    osd_op.indata.append(bl);
  }

  void list_watchers(list<obj_watch_t> *out,
                     int *prval) {
    (void)add_op(CEPH_OSD_OP_LIST_WATCHERS);
    if (prval || out) {
      unsigned p = ops.size() - 1;
      C_ObjectOperation_decodewatchers *h =
        new C_ObjectOperation_decodewatchers(out, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }

  void list_snaps(librados::snap_set_t *out, int *prval) {
    (void)add_op(CEPH_OSD_OP_LIST_SNAPS);
    if (prval || out) {
      unsigned p = ops.size() - 1;
      C_ObjectOperation_decodesnaps *h =
        new C_ObjectOperation_decodesnaps(out, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }

  void assert_version(uint64_t ver) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
    osd_op.op.assert_ver.ver = ver;
  }

  void cmpxattr(const char *name, const bufferlist& val,
                int op, int mode) {
    add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
    OSDOp& o = *ops.rbegin();
    o.op.xattr.cmp_op = op;
    o.op.xattr.cmp_mode = mode;
  }

  void rollback(uint64_t snapid) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
    osd_op.op.snap.snapid = snapid;
  }

  void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
                 version_t src_version, unsigned flags,
                 unsigned src_fadvise_flags) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
    osd_op.op.copy_from.snapid = snapid;
    osd_op.op.copy_from.src_version = src_version;
    osd_op.op.copy_from.flags = flags;
    osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
    ::encode(src, osd_op.indata);
    ::encode(src_oloc, osd_op.indata);
  }

  /**
   * writeback content to backing tier
   *
   * If object is marked dirty in the cache tier, write back content
   * to backing tier. If the object is clean this is a no-op.
   *
   * If writeback races with an update, the update will block.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_flush() {
    add_op(CEPH_OSD_OP_CACHE_FLUSH);
  }

  /**
   * writeback content to backing tier
   *
   * If object is marked dirty in the cache tier, write back content
   * to backing tier. If the object is clean this is a no-op.
   *
   * If writeback races with an update, return EAGAIN. Requires that
   * the SKIPRWLOCKS flag be set.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_try_flush() {
    add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
  }

  /**
   * evict object from cache tier
   *
   * If object is marked clean, remove the object from the cache tier.
   * Otherwise, return EBUSY.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_evict() {
    add_op(CEPH_OSD_OP_CACHE_EVICT);
  }
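
  // Illustrative usage sketch (assumed caller-side code): a best-effort
  // flush that neither blocks on racing writes nor triggers a promote.
  // The flags are op flags supplied when the compound op is submitted.
  //
  //   ObjectOperation op;
  //   op.cache_try_flush();
  //   // submit with CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_SKIPRWLOCKS;
  //   // on -EAGAIN retry, and once clean follow up with cache_evict().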

  /*
   * Extensible tier
   */
  void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc,
                    version_t tgt_version) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
    osd_op.op.copy_from.snapid = snapid;
    osd_op.op.copy_from.src_version = tgt_version;
    ::encode(tgt, osd_op.indata);
    ::encode(tgt_oloc, osd_op.indata);
  }

  void set_alloc_hint(uint64_t expected_object_size,
                      uint64_t expected_write_size,
                      uint32_t flags) {
    add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size,
                   expected_write_size, flags);

    // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
    // not worth a feature bit. Set FAILOK per-op flag to make
    // sure older osds don't trip over an unsupported opcode.
    set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
  }

  void dup(vector<OSDOp>& sops) {
    ops = sops;
    out_bl.resize(sops.size());
    out_handler.resize(sops.size());
    out_rval.resize(sops.size());
    for (uint32_t i = 0; i < sops.size(); i++) {
      out_bl[i] = &sops[i].outdata;
      out_handler[i] = NULL;
      out_rval[i] = &sops[i].rval;
    }
  }

  /**
   * Pin/unpin an object in cache tier
   */
  void cache_pin() {
    add_op(CEPH_OSD_OP_CACHE_PIN);
  }

  void cache_unpin() {
    add_op(CEPH_OSD_OP_CACHE_UNPIN);
  }
};
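
// Illustrative usage sketch (assumed caller-side code): an
// ObjectOperation batches several sub-ops into one OSD request, and the
// per-op out-parameters registered above are filled in when the reply
// is decoded. A compound stat+read could be composed like this before
// being submitted through the Objecter:
//
//   ObjectOperation op;
//   uint64_t size;
//   int stat_rval, read_rval;
//   bufferlist data;
//   op.stat(&size, (ceph::real_time*)nullptr, &stat_rval);
//   op.read(0, 4096, &data, &read_rval, nullptr);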


// ----------------


class Objecter : public md_config_obs_t, public Dispatcher {
public:
  // config observer bits
  const char** get_tracked_conf_keys() const override;
  void handle_conf_change(const struct md_config_t *conf,
                          const std::set <std::string> &changed) override;

public:
  Messenger *messenger;
  MonClient *monc;
  Finisher *finisher;
  ZTracer::Endpoint trace_endpoint;
private:
  OSDMap *osdmap;
public:
  using Dispatcher::cct;
  std::multimap<string,string> crush_location;

  std::atomic<bool> initialized{false};

private:
  std::atomic<uint64_t> last_tid{0};
  std::atomic<unsigned> inflight_ops{0};
  std::atomic<int> client_inc{-1};
  uint64_t max_linger_id;
  std::atomic<unsigned> num_in_flight{0};
  std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op
  bool keep_balanced_budget;
  bool honor_osdmap_full;
  bool osdmap_full_try;

  // If this is true, accumulate a set of blacklisted entities
  // to be drained by consume_blacklist_events.
  bool blacklist_events_enabled;
  std::set<entity_addr_t> blacklist_events;

public:
  void maybe_request_map();

  void enable_blacklist_events();
private:

  void _maybe_request_map();

  version_t last_seen_osdmap_version;
  version_t last_seen_pgmap_version;

  mutable boost::shared_mutex rwlock;
  using lock_guard = std::unique_lock<decltype(rwlock)>;
  using unique_lock = std::unique_lock<decltype(rwlock)>;
  using shared_lock = boost::shared_lock<decltype(rwlock)>;
  using shunique_lock = ceph::shunique_lock<decltype(rwlock)>;
  ceph::timer<ceph::mono_clock> timer;

  PerfCounters *logger;

  uint64_t tick_event;

  void start_tick();
  void tick();
  void update_crush_location();

  class RequestStateHook;

  RequestStateHook *m_request_state_hook;

public:
  /*** track pending operations ***/
  // read
public:

  struct OSDSession;

  struct op_target_t {
    int flags = 0;

    epoch_t epoch = 0;  ///< latest epoch we calculated the mapping

    object_t base_oid;
    object_locator_t base_oloc;
    object_t target_oid;
    object_locator_t target_oloc;

    ///< true if we are directed at base_pgid, not base_oid
    bool precalc_pgid = false;

    ///< true if we have ever mapped to a valid pool
    bool pool_ever_existed = false;

    ///< explicit pg target, if any
    pg_t base_pgid;

    pg_t pgid; ///< last (raw) pg we mapped to
    spg_t actual_pgid; ///< last (actual) spg_t we mapped to
    unsigned pg_num = 0; ///< last pg_num we mapped to
    unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
    vector<int> up; ///< set of up osds for last pg we mapped to
    vector<int> acting; ///< set of acting osds for last pg we mapped to
    int up_primary = -1; ///< last up_primary we mapped to
    int acting_primary = -1; ///< last acting_primary we mapped to
    int size = -1; ///< the size of the pool when we were last mapped
    int min_size = -1; ///< the min size of the pool when we were last mapped
    bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise
    bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering

    bool used_replica = false;
    bool paused = false;

    int osd = -1;      ///< the final target osd, or -1

    epoch_t last_force_resend = 0;

    op_target_t(object_t oid, object_locator_t oloc, int flags)
      : flags(flags),
        base_oid(oid),
        base_oloc(oloc)
      {}

    op_target_t(pg_t pgid)
      : base_oloc(pgid.pool(), pgid.ps()),
        precalc_pgid(true),
        base_pgid(pgid)
      {}

    op_target_t() = default;

    hobject_t get_hobj() {
      return hobject_t(target_oid,
                       target_oloc.key,
                       CEPH_NOSNAP,
                       target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
                       target_oloc.pool,
                       target_oloc.nspace);
    }

    bool contained_by(const hobject_t& begin, const hobject_t& end) {
      hobject_t h = get_hobj();
      int r = cmp(h, begin);
      return r == 0 || (r > 0 && h < end);
    }

    void dump(Formatter *f) const;
  };

  struct Op : public RefCountedObject {
    OSDSession *session;
    int incarnation;

    op_target_t target;

    ConnectionRef con;  // for rx buffer only
    uint64_t features;  // explicitly specified op features

    vector<OSDOp> ops;

    snapid_t snapid;
    SnapContext snapc;
    ceph::real_time mtime;

    bufferlist *outbl;
    vector<bufferlist*> out_bl;
    vector<Context*> out_handler;
    vector<int*> out_rval;

    int priority;
    Context *onfinish;
    uint64_t ontimeout;

    ceph_tid_t tid;
    int attempts;

    version_t *objver;
    epoch_t *reply_epoch;

    ceph::mono_time stamp;

    epoch_t map_dne_bound;

    bool budgeted;

    /// true if we should resend this message on failure
    bool should_resend;

    /// true if the throttle budget is get/put on a series of OPs,
    /// instead of per OP basis, when this flag is set, the budget is
    /// acquired before sending the very first OP of the series and
    /// released upon receiving the last OP reply.
    bool ctx_budgeted;

    int *data_offset;

    osd_reqid_t reqid; // explicitly setting reqid
    ZTracer::Trace trace;

    Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
       int f, Context *fin, version_t *ov, int *offset = NULL,
       ZTracer::Trace *parent_trace = nullptr) :
      session(NULL), incarnation(0),
      target(o, ol, f),
      con(NULL),
      features(CEPH_FEATURES_SUPPORTED_DEFAULT),
      snapid(CEPH_NOSNAP),
      outbl(NULL),
      priority(0),
      onfinish(fin),
      ontimeout(0),
      tid(0),
      attempts(0),
      objver(ov),
      reply_epoch(NULL),
      map_dne_bound(0),
      budgeted(false),
      should_resend(true),
      ctx_budgeted(false),
      data_offset(offset) {
      ops.swap(op);

      /* initialize out_* to match op vector */
      out_bl.resize(ops.size());
      out_rval.resize(ops.size());
      out_handler.resize(ops.size());
      for (unsigned i = 0; i < ops.size(); i++) {
        out_bl[i] = NULL;
        out_handler[i] = NULL;
        out_rval[i] = NULL;
      }

      if (target.base_oloc.key == o)
        target.base_oloc.key.clear();

      if (parent_trace && parent_trace->valid()) {
        trace.init("op", nullptr, parent_trace);
        trace.event("start");
      }
    }

    bool operator<(const Op& other) const {
      return tid < other.tid;
    }

    bool respects_full() const {
      return
        (target.flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
        !(target.flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
    }

  private:
    ~Op() override {
      while (!out_handler.empty()) {
        delete out_handler.back();
        out_handler.pop_back();
      }
      trace.event("finish");
    }
  };

  struct C_Op_Map_Latest : public Context {
    Objecter *objecter;
    ceph_tid_t tid;
    version_t latest;
    C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
                                                 latest(0) {}
    void finish(int r) override;
  };

  struct C_Command_Map_Latest : public Context {
    Objecter *objecter;
    uint64_t tid;
    version_t latest;
    C_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
                                                      latest(0) {}
    void finish(int r) override;
  };

  struct C_Stat : public Context {
    bufferlist bl;
    uint64_t *psize;
    ceph::real_time *pmtime;
    Context *fin;
    C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) :
      psize(ps), pmtime(pm), fin(c) {}
    void finish(int r) override {
      if (r >= 0) {
        bufferlist::iterator p = bl.begin();
        uint64_t s;
        ceph::real_time m;
        ::decode(s, p);
        ::decode(m, p);
        if (psize)
          *psize = s;
        if (pmtime)
          *pmtime = m;
      }
      fin->complete(r);
    }
  };

  struct C_GetAttrs : public Context {
    bufferlist bl;
    map<string,bufferlist>& attrset;
    Context *fin;
    C_GetAttrs(map<string, bufferlist>& set, Context *c) : attrset(set),
                                                           fin(c) {}
    void finish(int r) override {
      if (r >= 0) {
        bufferlist::iterator p = bl.begin();
        ::decode(attrset, p);
      }
      fin->complete(r);
    }
  };


  // Pools and statistics
  struct NListContext {
    collection_list_handle_t pos;

    // these are for !sortbitwise compat only
    int current_pg = 0;
    int starting_pg_num = 0;
    bool sort_bitwise = false;

    bool at_end_of_pool = false; ///< publicly visible end flag

    int64_t pool_id = -1;
    int pool_snap_seq = 0;
    uint64_t max_entries = 0;
    string nspace;

    bufferlist bl;   // raw data read to here
    std::list<librados::ListObjectImpl> list;

    bufferlist filter;

    bufferlist extra_info;

    // The budget associated with this context, once it is set (>= 0),
    // the budget is not get/released on OP basis, instead the budget
    // is acquired before sending the first OP and released upon receiving
    // the last op reply.
    int ctx_budget = -1;

    bool at_end() const {
      return at_end_of_pool;
    }

    uint32_t get_pg_hash_position() const {
      return pos.get_hash();
    }
  };

  struct C_NList : public Context {
    NListContext *list_context;
    Context *final_finish;
    Objecter *objecter;
    epoch_t epoch;
    C_NList(NListContext *lc, Context * finish, Objecter *ob) :
      list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
    void finish(int r) override {
      if (r >= 0) {
        objecter->_nlist_reply(list_context, r, final_finish, epoch);
      } else {
        final_finish->complete(r);
      }
    }
  };

  struct PoolStatOp {
    ceph_tid_t tid;
    list<string> pools;

    map<string,pool_stat_t> *pool_stats;
    Context *onfinish;
    uint64_t ontimeout;

    ceph::mono_time last_submit;
  };

  struct StatfsOp {
    ceph_tid_t tid;
    struct ceph_statfs *stats;
    boost::optional<int64_t> data_pool;
    Context *onfinish;
    uint64_t ontimeout;

    ceph::mono_time last_submit;
  };

  struct PoolOp {
    ceph_tid_t tid;
    int64_t pool;
    string name;
    Context *onfinish;
    uint64_t ontimeout;
    int pool_op;
    uint64_t auid;
    int16_t crush_rule;
    snapid_t snapid;
    bufferlist *blp;

    ceph::mono_time last_submit;
    PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0),
               auid(0), crush_rule(0), snapid(0), blp(NULL) {}
  };

  // -- osd commands --
  struct CommandOp : public RefCountedObject {
    OSDSession *session = nullptr;
    ceph_tid_t tid = 0;
    vector<string> cmd;
    bufferlist inbl;
    bufferlist *poutbl = nullptr;
    string *prs = nullptr;

    // target_osd == -1 means target_pg is valid
    const int target_osd = -1;
    const pg_t target_pg;

    op_target_t target;

    epoch_t map_dne_bound = 0;
    int map_check_error = 0; // error to return if map check fails
    const char *map_check_error_str = nullptr;

    Context *onfinish = nullptr;
    uint64_t ontimeout = 0;
    ceph::mono_time last_submit;

    CommandOp(
      int target_osd,
      const vector<string> &cmd,
      bufferlist inbl,
      bufferlist *poutbl,
      string *prs,
      Context *onfinish)
      : cmd(cmd),
        inbl(inbl),
        poutbl(poutbl),
        prs(prs),
        target_osd(target_osd),
        onfinish(onfinish) {}

    CommandOp(
      pg_t pgid,
      const vector<string> &cmd,
      bufferlist inbl,
      bufferlist *poutbl,
      string *prs,
      Context *onfinish)
      : cmd(cmd),
        inbl(inbl),
        poutbl(poutbl),
        prs(prs),
        target_pg(pgid),
        target(pgid),
        onfinish(onfinish) {}

  };

  void submit_command(CommandOp *c, ceph_tid_t *ptid);
  int _calc_command_target(CommandOp *c, shunique_lock &sul);
  void _assign_command_session(CommandOp *c, shunique_lock &sul);
  void _send_command(CommandOp *c);
  int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
  void _finish_command(CommandOp *c, int r, string rs);
  void handle_command_reply(MCommandReply *m);


  // -- lingering ops --

  struct WatchContext {
    // this simply mirrors librados WatchCtx2
    virtual void handle_notify(uint64_t notify_id,
                               uint64_t cookie,
                               uint64_t notifier_id,
                               bufferlist& bl) = 0;
    virtual void handle_error(uint64_t cookie, int err) = 0;
    virtual ~WatchContext() {}
  };

  struct LingerOp : public RefCountedObject {
    uint64_t linger_id;

    op_target_t target;

    snapid_t snap;
    SnapContext snapc;
    ceph::real_time mtime;

    vector<OSDOp> ops;
    bufferlist inbl;
    bufferlist *poutbl;
    version_t *pobjver;

    bool is_watch;
    ceph::mono_time watch_valid_thru; ///< send time for last acked ping
    int last_error; ///< error from last failed ping|reconnect, if any
    boost::shared_mutex watch_lock;
    using lock_guard = std::unique_lock<decltype(watch_lock)>;
    using unique_lock = std::unique_lock<decltype(watch_lock)>;
    using shared_lock = boost::shared_lock<decltype(watch_lock)>;
    using shunique_lock = ceph::shunique_lock<decltype(watch_lock)>;

    // queue of pending async operations, with the timestamp of
    // when they were queued.
    list<ceph::mono_time> watch_pending_async;

    uint32_t register_gen;
    bool registered;
    bool canceled;
    Context *on_reg_commit;

    // we trigger these from an async finisher
    Context *on_notify_finish;
    bufferlist *notify_result_bl;
    uint64_t notify_id;

    WatchContext *watch_context;

    OSDSession *session;

    ceph_tid_t register_tid;
    ceph_tid_t ping_tid;
    epoch_t map_dne_bound;

    void _queued_async() {
      // watch_lock must be locked unique
1711 watch_pending_async.push_back(ceph::mono_clock::now());
1712 }
1713 void finished_async() {
1714 unique_lock l(watch_lock);
1715 assert(!watch_pending_async.empty());
1716 watch_pending_async.pop_front();
1717 }
1718
1719 LingerOp() : linger_id(0),
1720 target(object_t(), object_locator_t(), 0),
1721 snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
1722 is_watch(false), last_error(0),
1723 register_gen(0),
1724 registered(false),
1725 canceled(false),
1726 on_reg_commit(NULL),
1727 on_notify_finish(NULL),
1728 notify_result_bl(NULL),
1729 notify_id(0),
1730 watch_context(NULL),
1731 session(NULL),
1732 register_tid(0),
1733 ping_tid(0),
1734 map_dne_bound(0) {}
1735
1736 // no copy!
1737 const LingerOp &operator=(const LingerOp& r);
1738 LingerOp(const LingerOp& o);
1739
1740 uint64_t get_cookie() {
1741 return reinterpret_cast<uint64_t>(this);
1742 }
1743
1744 private:
1745 ~LingerOp() override {
1746 delete watch_context;
1747 }
1748 };
1749
1750 struct C_Linger_Commit : public Context {
1751 Objecter *objecter;
1752 LingerOp *info;
1753 bufferlist outbl; // used for notify only
1754 C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {
1755 info->get();
1756 }
1757 ~C_Linger_Commit() override {
1758 info->put();
1759 }
1760 void finish(int r) override {
1761 objecter->_linger_commit(info, r, outbl);
1762 }
1763 };
1764
1765 struct C_Linger_Reconnect : public Context {
1766 Objecter *objecter;
1767 LingerOp *info;
1768 C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {
1769 info->get();
1770 }
1771 ~C_Linger_Reconnect() override {
1772 info->put();
1773 }
1774 void finish(int r) override {
1775 objecter->_linger_reconnect(info, r);
1776 }
1777 };
1778
1779 struct C_Linger_Ping : public Context {
1780 Objecter *objecter;
1781 LingerOp *info;
1782 ceph::mono_time sent;
1783 uint32_t register_gen;
1784 C_Linger_Ping(Objecter *o, LingerOp *l)
1785 : objecter(o), info(l), register_gen(info->register_gen) {
1786 info->get();
1787 }
1788 ~C_Linger_Ping() override {
1789 info->put();
1790 }
1791 void finish(int r) override {
1792 objecter->_linger_ping(info, r, sent, register_gen);
1793 }
1794 };
1795
1796 struct C_Linger_Map_Latest : public Context {
1797 Objecter *objecter;
1798 uint64_t linger_id;
1799 version_t latest;
1800 C_Linger_Map_Latest(Objecter *o, uint64_t id) :
1801 objecter(o), linger_id(id), latest(0) {}
1802 void finish(int r) override;
1803 };
1804
1805 // -- osd sessions --
1806 struct OSDBackoff {
1807 spg_t pgid;
1808 uint64_t id;
1809 hobject_t begin, end;
1810 };
1811
1812 struct OSDSession : public RefCountedObject {
1813 boost::shared_mutex lock;
1814 using lock_guard = std::lock_guard<decltype(lock)>;
1815 using unique_lock = std::unique_lock<decltype(lock)>;
1816 using shared_lock = boost::shared_lock<decltype(lock)>;
1817 using shunique_lock = ceph::shunique_lock<decltype(lock)>;
1818
1819 // pending ops
1820 map<ceph_tid_t,Op*> ops;
1821 map<uint64_t, LingerOp*> linger_ops;
1822 map<ceph_tid_t,CommandOp*> command_ops;
1823
1824 // backoffs
1825 map<spg_t,map<hobject_t,OSDBackoff>> backoffs;
1826 map<uint64_t,OSDBackoff*> backoffs_by_id;
1827
1828 int osd;
1829 int incarnation;
1830 ConnectionRef con;
1831 int num_locks;
1832 std::unique_ptr<std::mutex[]> completion_locks;
1833 using unique_completion_lock = std::unique_lock<
1834 decltype(completion_locks)::element_type>;
1835
1836
1837 OSDSession(CephContext *cct, int o) :
1838 osd(o), incarnation(0), con(NULL),
1839 num_locks(cct->_conf->objecter_completion_locks_per_session),
1840 completion_locks(new std::mutex[num_locks]) {}
1841
1842 ~OSDSession() override;
1843
1844 bool is_homeless() { return (osd == -1); }
1845
1846 unique_completion_lock get_lock(object_t& oid);
1847 };
1848 map<int,OSDSession*> osd_sessions;
1849
1850 bool osdmap_full_flag() const;
1851 bool osdmap_pool_full(const int64_t pool_id) const;
1852
1853 private:
1854
1855 /**
1856 * Test pg_pool_t::FLAG_FULL on a pool
1857 *
1858 * @return true if the pool exists and has the flag set, or
1859 * the global full flag is set, else false
1860 */
1861 bool _osdmap_pool_full(const int64_t pool_id) const;
1862 bool _osdmap_pool_full(const pg_pool_t &p) const;
1863 void update_pool_full_map(map<int64_t, bool>& pool_full_map);
1864
1865 map<uint64_t, LingerOp*> linger_ops;
1866 // we use this just to confirm a cookie is valid before dereferencing the ptr
1867 set<LingerOp*> linger_ops_set;
1868
1869 map<ceph_tid_t,PoolStatOp*> poolstat_ops;
1870 map<ceph_tid_t,StatfsOp*> statfs_ops;
1871 map<ceph_tid_t,PoolOp*> pool_ops;
1872 std::atomic<unsigned> num_homeless_ops{0};
1873
1874 OSDSession *homeless_session;
1875
1876 // ops waiting for an osdmap with a new pool or confirmation that
1877 // the pool does not exist (may be expanded to other uses later)
1878 map<uint64_t, LingerOp*> check_latest_map_lingers;
1879 map<ceph_tid_t, Op*> check_latest_map_ops;
1880 map<ceph_tid_t, CommandOp*> check_latest_map_commands;
1881
1882 map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
1883
1884 ceph::timespan mon_timeout;
1885 ceph::timespan osd_timeout;
1886
1887 MOSDOp *_prepare_osd_op(Op *op);
1888 void _send_op(Op *op, MOSDOp *m = NULL);
1889 void _send_op_account(Op *op);
1890 void _cancel_linger_op(Op *op);
1891 void finish_op(OSDSession *session, ceph_tid_t tid);
1892 void _finish_op(Op *op, int r);
1893 static bool is_pg_changed(
1894 int oldprimary,
1895 const vector<int>& oldacting,
1896 int newprimary,
1897 const vector<int>& newacting,
1898 bool any_change=false);
1899 enum recalc_op_target_result {
1900 RECALC_OP_TARGET_NO_ACTION = 0,
1901 RECALC_OP_TARGET_NEED_RESEND,
1902 RECALC_OP_TARGET_POOL_DNE,
1903 RECALC_OP_TARGET_OSD_DNE,
1904 RECALC_OP_TARGET_OSD_DOWN,
1905 };
1906 bool _osdmap_full_flag() const;
1907 bool _osdmap_has_pool_full() const;
1908
1909 bool target_should_be_paused(op_target_t *op);
1910 int _calc_target(op_target_t *t, Connection *con,
1911 bool any_change = false);
1912 int _map_session(op_target_t *op, OSDSession **s,
1913 shunique_lock& lc);
1914
1915 void _session_op_assign(OSDSession *s, Op *op);
1916 void _session_op_remove(OSDSession *s, Op *op);
1917 void _session_linger_op_assign(OSDSession *to, LingerOp *op);
1918 void _session_linger_op_remove(OSDSession *from, LingerOp *op);
1919 void _session_command_op_assign(OSDSession *to, CommandOp *op);
1920 void _session_command_op_remove(OSDSession *from, CommandOp *op);
1921
1922 int _assign_op_target_session(Op *op, shunique_lock& lc,
1923 bool src_session_locked,
1924 bool dst_session_locked);
1925 int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc);
1926
1927 void _linger_submit(LingerOp *info, shunique_lock& sul);
1928 void _send_linger(LingerOp *info, shunique_lock& sul);
1929 void _linger_commit(LingerOp *info, int r, bufferlist& outbl);
1930 void _linger_reconnect(LingerOp *info, int r);
1931 void _send_linger_ping(LingerOp *info);
1932 void _linger_ping(LingerOp *info, int r, ceph::mono_time sent,
1933 uint32_t register_gen);
1934 int _normalize_watch_error(int r);
1935
1936 friend class C_DoWatchError;
1937 public:
1938 void linger_callback_flush(Context *ctx) {
1939 finisher->queue(ctx);
1940 }
1941
1942 private:
1943 void _check_op_pool_dne(Op *op, unique_lock *sl);
1944 void _send_op_map_check(Op *op);
1945 void _op_cancel_map_check(Op *op);
1946 void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
1947 void _send_linger_map_check(LingerOp *op);
1948 void _linger_cancel_map_check(LingerOp *op);
1949 void _check_command_map_dne(CommandOp *op);
1950 void _send_command_map_check(CommandOp *op);
1951 void _command_cancel_map_check(CommandOp *op);
1952
1953 void kick_requests(OSDSession *session);
1954 void _kick_requests(OSDSession *session, map<uint64_t, LingerOp *>& lresend);
1955 void _linger_ops_resend(map<uint64_t, LingerOp *>& lresend, unique_lock& ul);
1956
1957 int _get_session(int osd, OSDSession **session, shunique_lock& sul);
1958 void put_session(OSDSession *s);
1959 void get_session(OSDSession *s);
1960 void _reopen_session(OSDSession *session);
1961 void close_session(OSDSession *session);
1962
1963 void _nlist_reply(NListContext *list_context, int r, Context *final_finish,
1964 epoch_t reply_epoch);
1965
1966 void resend_mon_ops();
1967
1968 /**
1969 * Handle a budget for in-flight ops: budget is taken whenever an op
1970 * goes into the ops map and returned whenever an op is removed from
1971 * the map. If _throttle_op() needs to throttle, it temporarily
1972 * unlocks the caller's shunique_lock on rwlock while it waits.
1973 */
1974 int calc_op_budget(Op *op);
1975 void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
1976 int _take_op_budget(Op *op, shunique_lock& sul) {
1977 assert(sul && sul.mutex() == &rwlock);
1978 int op_budget = calc_op_budget(op);
1979 if (keep_balanced_budget) {
1980 _throttle_op(op, sul, op_budget);
1981 } else {
1982 op_throttle_bytes.take(op_budget);
1983 op_throttle_ops.take(1);
1984 }
1985 op->budgeted = true;
1986 return op_budget;
1987 }
1988 void put_op_budget_bytes(int op_budget) {
1989 assert(op_budget >= 0);
1990 op_throttle_bytes.put(op_budget);
1991 op_throttle_ops.put(1);
1992 }
1993 void put_op_budget(Op *op) {
1994 assert(op->budgeted);
1995 int op_budget = calc_op_budget(op);
1996 put_op_budget_bytes(op_budget);
1997 }
1998 void put_nlist_context_budget(NListContext *list_context);
1999 Throttle op_throttle_bytes, op_throttle_ops;
2000
2001 public:
2002 Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
2003 Finisher *fin,
2004 double mon_timeout,
2005 double osd_timeout) :
2006 Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
2007 trace_endpoint("0.0.0.0", 0, "Objecter"),
2008 osdmap(new OSDMap),
2009 max_linger_id(0),
2010 keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
2011 blacklist_events_enabled(false),
2012 last_seen_osdmap_version(0), last_seen_pgmap_version(0),
2013 logger(NULL), tick_event(0), m_request_state_hook(NULL),
2014 homeless_session(new OSDSession(cct, -1)),
2015 mon_timeout(ceph::make_timespan(mon_timeout)),
2016 osd_timeout(ceph::make_timespan(osd_timeout)),
2017 op_throttle_bytes(cct, "objecter_bytes",
2018 cct->_conf->objecter_inflight_op_bytes),
2019 op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
2020 epoch_barrier(0),
2021 retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
2022 { }
2023 ~Objecter() override;
2024
2025 void init();
2026 void start(const OSDMap *o = nullptr);
2027 void shutdown();
2028
2029 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2030 // whatever functionality you want to use the OSDMap in a lambda like:
2031 //
2032 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2033 //
2034 // or
2035 //
2036 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2037 //
2038 // Do not call into something that will try to lock the OSDMap from
2039 // here or you will have great woe and misery.
2040
2041 template<typename Callback, typename...Args>
2042 auto with_osdmap(Callback&& cb, Args&&... args) const ->
2043 decltype(cb(*osdmap, std::forward<Args>(args)...)) {
2044 shared_lock l(rwlock);
2045 return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
2046 }
2047
2048
2049 /**
2050 * Tell the objecter to throttle outgoing ops according to its
2051 * budget (in _conf). If you do this, op submission can block: the
2052 * submitting thread drops the Objecter's rwlock and sleeps until
2053 * incoming replies return enough budget for the op to proceed,
2054 * then reacquires the lock and continues.
2055 */
2056 void set_balanced_budget() { keep_balanced_budget = true; }
2057 void unset_balanced_budget() { keep_balanced_budget = false; }
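// Usage sketch (hypothetical caller code, not part of this header):
// enable budgeted throttling around a burst of submissions, so that
// op_submit() blocks rather than exceeding objecter_inflight_op_bytes
// and objecter_inflight_ops:
//
//   objecter->set_balanced_budget();
//   // ... submit many ops; submission may sleep until budget frees ...
//   objecter->unset_balanced_budget();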
2058
2059 void set_honor_osdmap_full() { honor_osdmap_full = true; }
2060 void unset_honor_osdmap_full() { honor_osdmap_full = false; }
2061
2062 void set_osdmap_full_try() { osdmap_full_try = true; }
2063 void unset_osdmap_full_try() { osdmap_full_try = false; }
2064
2065 void _scan_requests(OSDSession *s,
2066 bool force_resend,
2067 bool cluster_full,
2068 map<int64_t, bool> *pool_full_map,
2069 map<ceph_tid_t, Op*>& need_resend,
2070 list<LingerOp*>& need_resend_linger,
2071 map<ceph_tid_t, CommandOp*>& need_resend_command,
2072 shunique_lock& sul);
2073
2074 int64_t get_object_hash_position(int64_t pool, const string& key,
2075 const string& ns);
2076 int64_t get_object_pg_hash_position(int64_t pool, const string& key,
2077 const string& ns);
2078
2079 // messages
2080 public:
2081 bool ms_dispatch(Message *m) override;
2082 bool ms_can_fast_dispatch_any() const override {
2083 return true;
2084 }
2085 bool ms_can_fast_dispatch(const Message *m) const override {
2086 switch (m->get_type()) {
2087 case CEPH_MSG_OSD_OPREPLY:
2088 case CEPH_MSG_WATCH_NOTIFY:
2089 return true;
2090 default:
2091 return false;
2092 }
2093 }
2094 void ms_fast_dispatch(Message *m) override {
2095 if (!ms_dispatch(m)) {
2096 m->put();
2097 }
2098 }
2099
2100 void handle_osd_op_reply(class MOSDOpReply *m);
2101 void handle_osd_backoff(class MOSDBackoff *m);
2102 void handle_watch_notify(class MWatchNotify *m);
2103 void handle_osd_map(class MOSDMap *m);
2104 void wait_for_osd_map();
2105
2106 /**
2107 * Get list of entities blacklisted since this was last called,
2108 * and reset the list.
2109 *
2110 * Uses a std::set because typical use case is to compare some
2111 * other list of clients to see which overlap with the blacklisted
2112 * addrs.
2113 *
2114 */
2115 void consume_blacklist_events(std::set<entity_addr_t> *events);
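// Usage sketch (hypothetical caller code): drain the addrs
// blacklisted since the previous call and compare them against some
// other set of clients:
//
//   std::set<entity_addr_t> events;
//   objecter->consume_blacklist_events(&events);
//   for (const auto& addr : events) {
//     // check addr against our own client list
//   }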
2116
2117 int pool_snap_by_name(int64_t poolid,
2118 const char *snap_name,
2119 snapid_t *snap) const;
2120 int pool_snap_get_info(int64_t poolid, snapid_t snap,
2121 pool_snap_info_t *info) const;
2122 int pool_snap_list(int64_t poolid, vector<uint64_t> *snaps);
2123 private:
2124
2125 void emit_blacklist_events(const OSDMap::Incremental &inc);
2126 void emit_blacklist_events(const OSDMap &old_osd_map,
2127 const OSDMap &new_osd_map);
2128
2129 // low-level
2130 void _op_submit(Op *op, shunique_lock& lc, ceph_tid_t *ptid);
2131 void _op_submit_with_budget(Op *op, shunique_lock& lc,
2132 ceph_tid_t *ptid,
2133 int *ctx_budget = NULL);
2134 inline void unregister_op(Op *op);
2135
2136 // public interface
2137 public:
2138 void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
2139 bool is_active() {
2140 shared_lock l(rwlock);
2141 return inflight_ops || !linger_ops.empty() ||
2142 !poolstat_ops.empty() || !statfs_ops.empty();
2143 }
2144
2145 /**
2146 * Output in-flight requests
2147 */
2148 void _dump_active(OSDSession *s);
2149 void _dump_active();
2150 void dump_active();
2151 void dump_requests(Formatter *fmt);
2152 void _dump_ops(const OSDSession *s, Formatter *fmt);
2153 void dump_ops(Formatter *fmt);
2154 void _dump_linger_ops(const OSDSession *s, Formatter *fmt);
2155 void dump_linger_ops(Formatter *fmt);
2156 void _dump_command_ops(const OSDSession *s, Formatter *fmt);
2157 void dump_command_ops(Formatter *fmt);
2158 void dump_pool_ops(Formatter *fmt) const;
2159 void dump_pool_stat_ops(Formatter *fmt) const;
2160 void dump_statfs_ops(Formatter *fmt) const;
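// Usage sketch (hypothetical; assumes JSONFormatter from
// common/Formatter.h): render the in-flight requests as JSON:
//
//   JSONFormatter f(true);
//   objecter->dump_requests(&f);
//   f.flush(std::cout);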
2161
2162 int get_client_incarnation() const { return client_inc; }
2163 void set_client_incarnation(int inc) { client_inc = inc; }
2164
2165 bool have_map(epoch_t epoch);
2166 /// wait for epoch; true if we already have it
2167 bool wait_for_map(epoch_t epoch, Context *c, int err=0);
2168 void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
2169 void wait_for_latest_osdmap(Context *fin);
2170 void get_latest_version(epoch_t oldest, epoch_t newest, Context *fin);
2171 void _get_latest_version(epoch_t oldest, epoch_t newest, Context *fin);
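// Usage sketch (hypothetical): wait_for_map() queues the Context only
// when the epoch is not yet available; if it returns true the caller
// still owns the Context:
//
//   if (objecter->wait_for_map(e, on_ready))
//     on_ready->complete(0);  // already have epoch e; run callback now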
2172
2173 /** Get the current set of global op flags */
2174 int get_global_op_flags() const { return global_op_flags; }
2175 /** Atomically OR a flag into the global op flags */
2176 void add_global_op_flags(int flag) {
2177 global_op_flags.fetch_or(flag);
2178 }
2179 /** Clear the passed flags from the global op flag set */
2180 void clear_global_op_flag(int flags) {
2181 global_op_flags.fetch_and(~flags);
2182 }
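// Usage sketch (hypothetical): make subsequently submitted ops carry
// an extra flag, then drop it again:
//
//   objecter->add_global_op_flags(CEPH_OSD_FLAG_IGNORE_CACHE);
//   // ... ops submitted here include the flag ...
//   objecter->clear_global_op_flag(CEPH_OSD_FLAG_IGNORE_CACHE);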
2183
2184 /// cancel an in-progress request with the given return code
2185 private:
2186 int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
2187 int _op_cancel(ceph_tid_t tid, int r);
2188 public:
2189 int op_cancel(ceph_tid_t tid, int r);
2190 int op_cancel(const vector<ceph_tid_t>& tidls, int r);
2191
2192 /**
2193 * Any write op which is in progress at the start of this call shall no
2194 * longer be in progress when this call ends. Operations started after the
2195 * start of this call may still be in progress when this call ends.
2196 *
2197 * @return the latest possible epoch in which a cancelled op could have
2198 * existed, or -1 if nothing was cancelled.
2199 */
2200 epoch_t op_cancel_writes(int r, int64_t pool=-1);
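// Usage sketch (hypothetical): fail every in-flight write to pool 3
// with -EAGAIN (e.g. after noticing we were blacklisted) and learn
// the newest epoch a cancelled op could have reached:
//
//   epoch_t e = objecter->op_cancel_writes(-EAGAIN, 3);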
2201
2202 // commands
2203 void osd_command(int osd, const std::vector<string>& cmd,
2204 const bufferlist& inbl, ceph_tid_t *ptid,
2205 bufferlist *poutbl, string *prs, Context *onfinish) {
2206 assert(osd >= 0);
2207 CommandOp *c = new CommandOp(
2208 osd,
2209 cmd,
2210 inbl,
2211 poutbl,
2212 prs,
2213 onfinish);
2214 submit_command(c, ptid);
2215 }
2216 void pg_command(pg_t pgid, const vector<string>& cmd,
2217 const bufferlist& inbl, ceph_tid_t *ptid,
2218 bufferlist *poutbl, string *prs, Context *onfinish) {
2219 CommandOp *c = new CommandOp(
2220 pgid,
2221 cmd,
2222 inbl,
2223 poutbl,
2224 prs,
2225 onfinish);
2226 submit_command(c, ptid);
2227 }
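// Usage sketch (hypothetical; the JSON command string is illustrative,
// not defined by this header): send a command to osd.0:
//
//   ceph_tid_t tid;
//   bufferlist inbl, outbl;
//   string outs;
//   objecter->osd_command(0, {"{\"prefix\": \"version\"}"},
//                         inbl, &tid, &outbl, &outs, onfinish);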
2228
2229 // mid-level helpers
2230 Op *prepare_mutate_op(
2231 const object_t& oid, const object_locator_t& oloc,
2232 ObjectOperation& op, const SnapContext& snapc,
2233 ceph::real_time mtime, int flags,
2234 Context *oncommit, version_t *objver = NULL,
2235 osd_reqid_t reqid = osd_reqid_t(),
2236 ZTracer::Trace *parent_trace = nullptr) {
2237 Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
2238 CEPH_OSD_FLAG_WRITE, oncommit, objver, nullptr, parent_trace);
2239 o->priority = op.priority;
2240 o->mtime = mtime;
2241 o->snapc = snapc;
2242 o->out_rval.swap(op.out_rval);
2243 o->reqid = reqid;
2244 return o;
2245 }
2246 ceph_tid_t mutate(
2247 const object_t& oid, const object_locator_t& oloc,
2248 ObjectOperation& op, const SnapContext& snapc,
2249 ceph::real_time mtime, int flags,
2250 Context *oncommit, version_t *objver = NULL,
2251 osd_reqid_t reqid = osd_reqid_t()) {
2252 Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
2253 oncommit, objver, reqid);
2254 ceph_tid_t tid;
2255 op_submit(o, &tid);
2256 return tid;
2257 }
2258 Op *prepare_read_op(
2259 const object_t& oid, const object_locator_t& oloc,
2260 ObjectOperation& op,
2261 snapid_t snapid, bufferlist *pbl, int flags,
2262 Context *onack, version_t *objver = NULL,
2263 int *data_offset = NULL,
2264 uint64_t features = 0,
2265 ZTracer::Trace *parent_trace = nullptr) {
2266 Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
2267 CEPH_OSD_FLAG_READ, onack, objver, data_offset, parent_trace);
2268 o->priority = op.priority;
2269 o->snapid = snapid;
2270 o->outbl = pbl;
2271 if (!o->outbl && op.size() == 1 && op.out_bl[0]->length())
2272 o->outbl = op.out_bl[0];
2273 o->out_bl.swap(op.out_bl);
2274 o->out_handler.swap(op.out_handler);
2275 o->out_rval.swap(op.out_rval);
2276 return o;
2277 }
2278 ceph_tid_t read(
2279 const object_t& oid, const object_locator_t& oloc,
2280 ObjectOperation& op,
2281 snapid_t snapid, bufferlist *pbl, int flags,
2282 Context *onack, version_t *objver = NULL,
2283 int *data_offset = NULL,
2284 uint64_t features = 0) {
2285 Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver,
2286 data_offset);
2287 if (features)
2288 o->features = features;
2289 ceph_tid_t tid;
2290 op_submit(o, &tid);
2291 return tid;
2292 }
2293 Op *prepare_pg_read_op(
2294 uint32_t hash, object_locator_t oloc,
2295 ObjectOperation& op, bufferlist *pbl, int flags,
2296 Context *onack, epoch_t *reply_epoch,
2297 int *ctx_budget) {
2298 Op *o = new Op(object_t(), oloc,
2299 op.ops,
2300 flags | global_op_flags | CEPH_OSD_FLAG_READ |
2301 CEPH_OSD_FLAG_IGNORE_OVERLAY,
2302 onack, NULL);
2303 o->target.precalc_pgid = true;
2304 o->target.base_pgid = pg_t(hash, oloc.pool);
2305 o->priority = op.priority;
2306 o->snapid = CEPH_NOSNAP;
2307 o->outbl = pbl;
2308 o->out_bl.swap(op.out_bl);
2309 o->out_handler.swap(op.out_handler);
2310 o->out_rval.swap(op.out_rval);
2311 o->reply_epoch = reply_epoch;
2312 if (ctx_budget) {
2313 // budget is tracked by listing context
2314 o->ctx_budgeted = true;
2315 }
2316 return o;
2317 }
2318 ceph_tid_t pg_read(
2319 uint32_t hash, object_locator_t oloc,
2320 ObjectOperation& op, bufferlist *pbl, int flags,
2321 Context *onack, epoch_t *reply_epoch,
2322 int *ctx_budget) {
2323 Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
2324 onack, reply_epoch, ctx_budget);
2325 ceph_tid_t tid;
2326 op_submit(o, &tid, ctx_budget);
2327 return tid;
2328 }
2329
2330 // caller owns a ref
2331 LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
2332 int flags);
2333 ceph_tid_t linger_watch(LingerOp *info,
2334 ObjectOperation& op,
2335 const SnapContext& snapc, ceph::real_time mtime,
2336 bufferlist& inbl,
2337 Context *onfinish,
2338 version_t *objver);
2339 ceph_tid_t linger_notify(LingerOp *info,
2340 ObjectOperation& op,
2341 snapid_t snap, bufferlist& inbl,
2342 bufferlist *poutbl,
2343 Context *onack,
2344 version_t *objver);
2345 int linger_check(LingerOp *info);
2346 void linger_cancel(LingerOp *info); // releases a reference
2347 void _linger_cancel(LingerOp *info);
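// Usage sketch (hypothetical; assumes ObjectOperation::watch(),
// LingerOp::get_cookie() and CEPH_OSD_WATCH_OP_WATCH, along the lines
// of how librados registers a watch):
//
//   LingerOp *info = objecter->linger_register(oid, oloc, 0);
//   ObjectOperation wr;
//   wr.watch(info->get_cookie(), CEPH_OSD_WATCH_OP_WATCH);
//   bufferlist inbl;
//   objecter->linger_watch(info, wr, snapc, ceph::real_clock::now(),
//                          inbl, oncommit, nullptr);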
2348
2349 void _do_watch_notify(LingerOp *info, MWatchNotify *m);
2350
2351 /**
2352 * set up initial ops in the op vector, and allocate a final op slot.
2353 *
2354 * The caller is responsible for filling in the final ops_count ops.
2355 *
2356 * @param ops op vector
2357 * @param ops_count number of final ops the caller will fill in
2358 * @param extra_ops pointer to [array of] initial op[s]
2359 * @return index of final op (for caller to fill in)
2360 */
2361 int init_ops(vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) {
2362 int i;
2363 int extra = 0;
2364
2365 if (extra_ops)
2366 extra = extra_ops->ops.size();
2367
2368 ops.resize(ops_count + extra);
2369
2370 for (i=0; i<extra; i++) {
2371 ops[i] = extra_ops->ops[i];
2372 }
2373
2374 return i;
2375 }
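// Usage sketch (mirrors prepare_stat_op() below): reserve one final
// slot after any extra_ops and fill it in:
//
//   vector<OSDOp> ops;
//   int i = init_ops(ops, 1, extra_ops);
//   ops[i].op.op = CEPH_OSD_OP_STAT;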
2376
2377
2378 // high-level helpers
2379 Op *prepare_stat_op(
2380 const object_t& oid, const object_locator_t& oloc,
2381 snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
2382 int flags, Context *onfinish, version_t *objver = NULL,
2383 ObjectOperation *extra_ops = NULL) {
2384 vector<OSDOp> ops;
2385 int i = init_ops(ops, 1, extra_ops);
2386 ops[i].op.op = CEPH_OSD_OP_STAT;
2387 C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
2388 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2389 CEPH_OSD_FLAG_READ, fin, objver);
2390 o->snapid = snap;
2391 o->outbl = &fin->bl;
2392 return o;
2393 }
2394 ceph_tid_t stat(
2395 const object_t& oid, const object_locator_t& oloc,
2396 snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
2397 int flags, Context *onfinish, version_t *objver = NULL,
2398 ObjectOperation *extra_ops = NULL) {
2399 Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
2400 onfinish, objver, extra_ops);
2401 ceph_tid_t tid;
2402 op_submit(o, &tid);
2403 return tid;
2404 }
2405
2406 Op *prepare_read_op(
2407 const object_t& oid, const object_locator_t& oloc,
2408 uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
2409 int flags, Context *onfinish, version_t *objver = NULL,
2410 ObjectOperation *extra_ops = NULL, int op_flags = 0,
2411 ZTracer::Trace *parent_trace = nullptr) {
2412 vector<OSDOp> ops;
2413 int i = init_ops(ops, 1, extra_ops);
2414 ops[i].op.op = CEPH_OSD_OP_READ;
2415 ops[i].op.extent.offset = off;
2416 ops[i].op.extent.length = len;
2417 ops[i].op.extent.truncate_size = 0;
2418 ops[i].op.extent.truncate_seq = 0;
2419 ops[i].op.flags = op_flags;
2420 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2421 CEPH_OSD_FLAG_READ, onfinish, objver, nullptr, parent_trace);
2422 o->snapid = snap;
2423 o->outbl = pbl;
2424 return o;
2425 }
2426 ceph_tid_t read(
2427 const object_t& oid, const object_locator_t& oloc,
2428 uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
2429 int flags, Context *onfinish, version_t *objver = NULL,
2430 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2431 Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
2432 onfinish, objver, extra_ops, op_flags);
2433 ceph_tid_t tid;
2434 op_submit(o, &tid);
2435 return tid;
2436 }
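// Usage sketch (hypothetical names): read 4096 bytes at offset 0 of
// object "foo" in pool pool_id at the head snapshot:
//
//   bufferlist bl;
//   objecter->read(object_t("foo"), object_locator_t(pool_id),
//                  0, 4096, CEPH_NOSNAP, &bl, 0, onfinish);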
2437
2438 Op *prepare_cmpext_op(
2439 const object_t& oid, const object_locator_t& oloc,
2440 uint64_t off, bufferlist &cmp_bl,
2441 snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
2442 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2443 vector<OSDOp> ops;
2444 int i = init_ops(ops, 1, extra_ops);
2445 ops[i].op.op = CEPH_OSD_OP_CMPEXT;
2446 ops[i].op.extent.offset = off;
2447 ops[i].op.extent.length = cmp_bl.length();
2448 ops[i].op.extent.truncate_size = 0;
2449 ops[i].op.extent.truncate_seq = 0;
2450 ops[i].indata = cmp_bl;
2451 ops[i].op.flags = op_flags;
2452 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2453 CEPH_OSD_FLAG_READ, onfinish, objver);
2454 o->snapid = snap;
2455 return o;
2456 }
2457
2458 ceph_tid_t cmpext(
2459 const object_t& oid, const object_locator_t& oloc,
2460 uint64_t off, bufferlist &cmp_bl,
2461 snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
2462 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2463 Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
2464 flags, onfinish, objver, extra_ops, op_flags);
2465 ceph_tid_t tid;
2466 op_submit(o, &tid);
2467 return tid;
2468 }
2469
2470 ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
2471 uint64_t off, uint64_t len, snapid_t snap,
2472 bufferlist *pbl, int flags, uint64_t trunc_size,
2473 __u32 trunc_seq, Context *onfinish,
2474 version_t *objver = NULL,
2475 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2476 vector<OSDOp> ops;
2477 int i = init_ops(ops, 1, extra_ops);
2478 ops[i].op.op = CEPH_OSD_OP_READ;
2479 ops[i].op.extent.offset = off;
2480 ops[i].op.extent.length = len;
2481 ops[i].op.extent.truncate_size = trunc_size;
2482 ops[i].op.extent.truncate_seq = trunc_seq;
2483 ops[i].op.flags = op_flags;
2484 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2485 CEPH_OSD_FLAG_READ, onfinish, objver);
2486 o->snapid = snap;
2487 o->outbl = pbl;
2488 ceph_tid_t tid;
2489 op_submit(o, &tid);
2490 return tid;
2491 }
2492 ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc,
2493 uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
2494 int flags, Context *onfinish, version_t *objver = NULL,
2495 ObjectOperation *extra_ops = NULL) {
2496 vector<OSDOp> ops;
2497 int i = init_ops(ops, 1, extra_ops);
2498 ops[i].op.op = CEPH_OSD_OP_MAPEXT;
2499 ops[i].op.extent.offset = off;
2500 ops[i].op.extent.length = len;
2501 ops[i].op.extent.truncate_size = 0;
2502 ops[i].op.extent.truncate_seq = 0;
2503 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2504 CEPH_OSD_FLAG_READ, onfinish, objver);
2505 o->snapid = snap;
2506 o->outbl = pbl;
2507 ceph_tid_t tid;
2508 op_submit(o, &tid);
2509 return tid;
2510 }
2511 ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
2512 const char *name, snapid_t snap, bufferlist *pbl, int flags,
2513 Context *onfinish,
2514 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2515 vector<OSDOp> ops;
2516 int i = init_ops(ops, 1, extra_ops);
2517 ops[i].op.op = CEPH_OSD_OP_GETXATTR;
2518 ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2519 ops[i].op.xattr.value_len = 0;
2520 if (name)
2521 ops[i].indata.append(name);
2522 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2523 CEPH_OSD_FLAG_READ, onfinish, objver);
2524 o->snapid = snap;
2525 o->outbl = pbl;
2526 ceph_tid_t tid;
2527 op_submit(o, &tid);
2528 return tid;
2529 }
2530
2531 ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc,
2532 snapid_t snap, map<string,bufferlist>& attrset,
2533 int flags, Context *onfinish, version_t *objver = NULL,
2534 ObjectOperation *extra_ops = NULL) {
2535 vector<OSDOp> ops;
2536 int i = init_ops(ops, 1, extra_ops);
2537 ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
2538 C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
2539 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2540 CEPH_OSD_FLAG_READ, fin, objver);
2541 o->snapid = snap;
2542 o->outbl = &fin->bl;
2543 ceph_tid_t tid;
2544 op_submit(o, &tid);
2545 return tid;
2546 }
2547
2548 ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc,
2549 snapid_t snap, bufferlist *pbl, int flags,
2550 Context *onfinish, version_t *objver = NULL,
2551 ObjectOperation *extra_ops = NULL) {
2552 return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags |
2553 CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
2554 }
2555
2556
2557 // writes
2558 ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
2559 vector<OSDOp>& ops, ceph::real_time mtime,
2560 const SnapContext& snapc, int flags,
2561 Context *oncommit,
2562 version_t *objver = NULL) {
2563 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2564 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2565 o->mtime = mtime;
2566 o->snapc = snapc;
2567 ceph_tid_t tid;
2568 op_submit(o, &tid);
2569 return tid;
2570 }
2571 Op *prepare_write_op(
2572 const object_t& oid, const object_locator_t& oloc,
2573 uint64_t off, uint64_t len, const SnapContext& snapc,
2574 const bufferlist &bl, ceph::real_time mtime, int flags,
2575 Context *oncommit, version_t *objver = NULL,
2576 ObjectOperation *extra_ops = NULL, int op_flags = 0,
2577 ZTracer::Trace *parent_trace = nullptr) {
2578 vector<OSDOp> ops;
2579 int i = init_ops(ops, 1, extra_ops);
2580 ops[i].op.op = CEPH_OSD_OP_WRITE;
2581 ops[i].op.extent.offset = off;
2582 ops[i].op.extent.length = len;
2583 ops[i].op.extent.truncate_size = 0;
2584 ops[i].op.extent.truncate_seq = 0;
2585 ops[i].indata = bl;
2586 ops[i].op.flags = op_flags;
2587 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2588 CEPH_OSD_FLAG_WRITE, oncommit, objver,
2589 nullptr, parent_trace);
2590 o->mtime = mtime;
2591 o->snapc = snapc;
2592 return o;
2593 }
2594 ceph_tid_t write(
2595 const object_t& oid, const object_locator_t& oloc,
2596 uint64_t off, uint64_t len, const SnapContext& snapc,
2597 const bufferlist &bl, ceph::real_time mtime, int flags,
2598 Context *oncommit, version_t *objver = NULL,
2599 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2600 Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags,
2601 oncommit, objver, extra_ops, op_flags);
2602 ceph_tid_t tid;
2603 op_submit(o, &tid);
2604 return tid;
2605 }
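// Usage sketch (hypothetical names): write a small buffer at offset 0:
//
//   bufferlist bl;
//   bl.append("hello");
//   objecter->write(object_t("foo"), object_locator_t(pool_id),
//                   0, bl.length(), snapc, bl,
//                   ceph::real_clock::now(), 0, oncommit);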
2606 Op *prepare_append_op(
2607 const object_t& oid, const object_locator_t& oloc,
2608 uint64_t len, const SnapContext& snapc,
2609 const bufferlist &bl, ceph::real_time mtime, int flags,
2610 Context *oncommit,
2611 version_t *objver = NULL,
2612 ObjectOperation *extra_ops = NULL) {
2613 vector<OSDOp> ops;
2614 int i = init_ops(ops, 1, extra_ops);
2615 ops[i].op.op = CEPH_OSD_OP_APPEND;
2616 ops[i].op.extent.offset = 0;
2617 ops[i].op.extent.length = len;
2618 ops[i].op.extent.truncate_size = 0;
2619 ops[i].op.extent.truncate_seq = 0;
2620 ops[i].indata = bl;
2621 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2622 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2623 o->mtime = mtime;
2624 o->snapc = snapc;
2625 return o;
2626 }
2627 ceph_tid_t append(
2628 const object_t& oid, const object_locator_t& oloc,
2629 uint64_t len, const SnapContext& snapc,
2630 const bufferlist &bl, ceph::real_time mtime, int flags,
2631 Context *oncommit,
2632 version_t *objver = NULL,
2633 ObjectOperation *extra_ops = NULL) {
2634 Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags,
2635 oncommit, objver, extra_ops);
2636 ceph_tid_t tid;
2637 op_submit(o, &tid);
2638 return tid;
2639 }
2640 ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
2641 uint64_t off, uint64_t len, const SnapContext& snapc,
2642 const bufferlist &bl, ceph::real_time mtime, int flags,
2643 uint64_t trunc_size, __u32 trunc_seq,
2644 Context *oncommit,
2645 version_t *objver = NULL,
2646 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2647 vector<OSDOp> ops;
2648 int i = init_ops(ops, 1, extra_ops);
2649 ops[i].op.op = CEPH_OSD_OP_WRITE;
2650 ops[i].op.extent.offset = off;
2651 ops[i].op.extent.length = len;
2652 ops[i].op.extent.truncate_size = trunc_size;
2653 ops[i].op.extent.truncate_seq = trunc_seq;
2654 ops[i].indata = bl;
2655 ops[i].op.flags = op_flags;
2656 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2657 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2658 o->mtime = mtime;
2659 o->snapc = snapc;
2660 ceph_tid_t tid;
2661 op_submit(o, &tid);
2662 return tid;
2663 }
2664 Op *prepare_write_full_op(
2665 const object_t& oid, const object_locator_t& oloc,
2666 const SnapContext& snapc, const bufferlist &bl,
2667 ceph::real_time mtime, int flags,
2668 Context *oncommit, version_t *objver = NULL,
2669 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2670 vector<OSDOp> ops;
2671 int i = init_ops(ops, 1, extra_ops);
2672 ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
2673 ops[i].op.extent.offset = 0;
2674 ops[i].op.extent.length = bl.length();
2675 ops[i].indata = bl;
2676 ops[i].op.flags = op_flags;
2677 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2678 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2679 o->mtime = mtime;
2680 o->snapc = snapc;
2681 return o;
2682 }
2683 ceph_tid_t write_full(
2684 const object_t& oid, const object_locator_t& oloc,
2685 const SnapContext& snapc, const bufferlist &bl,
2686 ceph::real_time mtime, int flags,
2687 Context *oncommit, version_t *objver = NULL,
2688 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2689 Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags,
2690 oncommit, objver, extra_ops, op_flags);
2691 ceph_tid_t tid;
2692 op_submit(o, &tid);
2693 return tid;
2694 }
2695 Op *prepare_writesame_op(
2696 const object_t& oid, const object_locator_t& oloc,
2697 uint64_t write_len, uint64_t off,
2698 const SnapContext& snapc, const bufferlist &bl,
2699 ceph::real_time mtime, int flags,
2700 Context *oncommit, version_t *objver = NULL,
2701 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2702
2703 vector<OSDOp> ops;
2704 int i = init_ops(ops, 1, extra_ops);
2705 ops[i].op.op = CEPH_OSD_OP_WRITESAME;
2706 ops[i].op.writesame.offset = off;
2707 ops[i].op.writesame.length = write_len;
2708 ops[i].op.writesame.data_length = bl.length();
2709 ops[i].indata = bl;
2710 ops[i].op.flags = op_flags;
2711 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2712 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2713 o->mtime = mtime;
2714 o->snapc = snapc;
2715 return o;
2716 }
2717 ceph_tid_t writesame(
2718 const object_t& oid, const object_locator_t& oloc,
2719 uint64_t write_len, uint64_t off,
2720 const SnapContext& snapc, const bufferlist &bl,
2721 ceph::real_time mtime, int flags,
2722 Context *oncommit, version_t *objver = NULL,
2723 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2724
2725 Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl,
2726 mtime, flags, oncommit, objver,
2727 extra_ops, op_flags);
2728
2729 ceph_tid_t tid;
2730 op_submit(o, &tid);
2731 return tid;
2732 }
2733 ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
2734 const SnapContext& snapc, ceph::real_time mtime, int flags,
2735 uint64_t trunc_size, __u32 trunc_seq,
2736 Context *oncommit, version_t *objver = NULL,
2737 ObjectOperation *extra_ops = NULL) {
2738 vector<OSDOp> ops;
2739 int i = init_ops(ops, 1, extra_ops);
2740 ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
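// note: TRUNCATE carries the new object size in extent.offset
// (mirrored in truncate_size below)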
2741 ops[i].op.extent.offset = trunc_size;
2742 ops[i].op.extent.truncate_size = trunc_size;
2743 ops[i].op.extent.truncate_seq = trunc_seq;
2744 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2745 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2746 o->mtime = mtime;
2747 o->snapc = snapc;
2748 ceph_tid_t tid;
2749 op_submit(o, &tid);
2750 return tid;
2751 }
2752 ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc,
2753 uint64_t off, uint64_t len, const SnapContext& snapc,
2754 ceph::real_time mtime, int flags, Context *oncommit,
2755 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2756 vector<OSDOp> ops;
2757 int i = init_ops(ops, 1, extra_ops);
2758 ops[i].op.op = CEPH_OSD_OP_ZERO;
2759 ops[i].op.extent.offset = off;
2760 ops[i].op.extent.length = len;
2761 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2762 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2763 o->mtime = mtime;
2764 o->snapc = snapc;
2765 ceph_tid_t tid;
2766 op_submit(o, &tid);
2767 return tid;
2768 }
2769 ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
2770 const SnapContext& snapc, snapid_t snapid,
2771 ceph::real_time mtime, Context *oncommit,
2772 version_t *objver = NULL,
2773 ObjectOperation *extra_ops = NULL) {
2774 vector<OSDOp> ops;
2775 int i = init_ops(ops, 1, extra_ops);
2776 ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
2777 ops[i].op.snap.snapid = snapid;
2778 Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver);
2779 o->mtime = mtime;
2780 o->snapc = snapc;
2781 ceph_tid_t tid;
2782 op_submit(o, &tid);
2783 return tid;
2784 }
2785 ceph_tid_t create(const object_t& oid, const object_locator_t& oloc,
2786 const SnapContext& snapc, ceph::real_time mtime, int global_flags,
2787 int create_flags, Context *oncommit,
2788 version_t *objver = NULL,
2789 ObjectOperation *extra_ops = NULL) {
2790 vector<OSDOp> ops;
2791 int i = init_ops(ops, 1, extra_ops);
2792 ops[i].op.op = CEPH_OSD_OP_CREATE;
2793 ops[i].op.flags = create_flags;
2794 Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags |
2795 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2796 o->mtime = mtime;
2797 o->snapc = snapc;
2798 ceph_tid_t tid;
2799 op_submit(o, &tid);
2800 return tid;
2801 }
2802 Op *prepare_remove_op(
2803 const object_t& oid, const object_locator_t& oloc,
2804 const SnapContext& snapc, ceph::real_time mtime, int flags,
2805 Context *oncommit,
2806 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2807 vector<OSDOp> ops;
2808 int i = init_ops(ops, 1, extra_ops);
2809 ops[i].op.op = CEPH_OSD_OP_DELETE;
2810 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2811 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2812 o->mtime = mtime;
2813 o->snapc = snapc;
2814 return o;
2815 }
2816 ceph_tid_t remove(
2817 const object_t& oid, const object_locator_t& oloc,
2818 const SnapContext& snapc, ceph::real_time mtime, int flags,
2819 Context *oncommit,
2820 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2821 Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags,
2822 oncommit, objver, extra_ops);
2823 ceph_tid_t tid;
2824 op_submit(o, &tid);
2825 return tid;
2826 }
2827
2828 ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
2829 const char *name, const SnapContext& snapc, const bufferlist &bl,
2830 ceph::real_time mtime, int flags,
2831 Context *oncommit,
2832 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2833 vector<OSDOp> ops;
2834 int i = init_ops(ops, 1, extra_ops);
2835 ops[i].op.op = CEPH_OSD_OP_SETXATTR;
2836 ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2837 ops[i].op.xattr.value_len = bl.length();
2838 if (name)
2839 ops[i].indata.append(name);
2840 ops[i].indata.append(bl);
2841 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2842 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2843 o->mtime = mtime;
2844 o->snapc = snapc;
2845 ceph_tid_t tid;
2846 op_submit(o, &tid);
2847 return tid;
2848 }
2849 ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
2850 const char *name, const SnapContext& snapc,
2851 ceph::real_time mtime, int flags,
2852 Context *oncommit,
2853 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2854 vector<OSDOp> ops;
2855 int i = init_ops(ops, 1, extra_ops);
2856 ops[i].op.op = CEPH_OSD_OP_RMXATTR;
2857 ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
2858 ops[i].op.xattr.value_len = 0;
2859 if (name)
2860 ops[i].indata.append(name);
2861 Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
2862 CEPH_OSD_FLAG_WRITE, oncommit, objver);
2863 o->mtime = mtime;
2864 o->snapc = snapc;
2865 ceph_tid_t tid;
2866 op_submit(o, &tid);
2867 return tid;
2868 }
2869
2870 void list_nobjects(NListContext *p, Context *onfinish);
2871 uint32_t list_nobjects_seek(NListContext *p, uint32_t pos);
2872 uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c);
2873 void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c);
2874
2875 hobject_t enumerate_objects_begin();
2876 hobject_t enumerate_objects_end();
2877 //hobject_t enumerate_objects_begin(int n, int m);
2878 void enumerate_objects(
2879 int64_t pool_id,
2880 const std::string &ns,
2881 const hobject_t &start,
2882 const hobject_t &end,
2883 const uint32_t max,
2884 const bufferlist &filter_bl,
2885 std::list<librados::ListObjectImpl> *result,
2886 hobject_t *next,
2887 Context *on_finish);
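// Usage sketch (hypothetical): fetch up to 100 objects from the
// default namespace of a pool, starting from the beginning:
//
//   std::list<librados::ListObjectImpl> result;
//   hobject_t next;
//   bufferlist filter;  // empty: no filter
//   objecter->enumerate_objects(pool_id, "",
//                               objecter->enumerate_objects_begin(),
//                               objecter->enumerate_objects_end(),
//                               100, filter, &result, &next, on_finish);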
2888
2889 void _enumerate_reply(
2890 bufferlist &bl,
2891 int r,
2892 const hobject_t &end,
2893 const int64_t pool_id,
2894 int budget,
2895 epoch_t reply_epoch,
2896 std::list<librados::ListObjectImpl> *result,
2897 hobject_t *next,
2898 Context *on_finish);
2899 friend class C_EnumerateReply;
2900
2901 // -------------------------
2902 // pool ops
2903 private:
2904 void pool_op_submit(PoolOp *op);
2905 void _pool_op_submit(PoolOp *op);
2906 void _finish_pool_op(PoolOp *op, int r);
2907 void _do_delete_pool(int64_t pool, Context *onfinish);
2908 public:
2909 int create_pool_snap(int64_t pool, string& snapName, Context *onfinish);
2910 int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
2911 Context *onfinish);
2912 int delete_pool_snap(int64_t pool, string& snapName, Context *onfinish);
2913 int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish);
2914
2915 int create_pool(string& name, Context *onfinish, uint64_t auid=0,
2916 int crush_rule=-1);
2917 int delete_pool(int64_t pool, Context *onfinish);
2918 int delete_pool(const string& name, Context *onfinish);
2919 int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid);
2920
2921 void handle_pool_op_reply(MPoolOpReply *m);
2922 int pool_op_cancel(ceph_tid_t tid, int r);
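// Usage sketch (hypothetical): create and later delete a pool by name;
// completion is signalled through the Contexts:
//
//   string name("mypool");
//   objecter->create_pool(name, on_created);
//   // ... later:
//   objecter->delete_pool(name, on_deleted);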
2923
2924 // --------------------------
2925 // pool stats
2926 private:
2927 void _poolstat_submit(PoolStatOp *op);
2928 public:
2929 void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
2930 void get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result,
2931 Context *onfinish);
2932 int pool_stat_op_cancel(ceph_tid_t tid, int r);
2933 void _finish_pool_stat_op(PoolStatOp *op, int r);
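// Usage sketch (hypothetical): fetch stats for a couple of pools:
//
//   list<string> pools = {"rbd", "cephfs_data"};
//   map<string, pool_stat_t> stats;
//   objecter->get_pool_stats(pools, &stats, onfinish);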
2934
2935 // ---------------------------
2936 // df stats
2937 private:
2938 void _fs_stats_submit(StatfsOp *op);
2939 public:
2940 void handle_fs_stats_reply(MStatfsReply *m);
2941 void get_fs_stats(struct ceph_statfs& result, boost::optional<int64_t> poolid,
2942 Context *onfinish);
2943 int statfs_op_cancel(ceph_tid_t tid, int r);
2944 void _finish_statfs_op(StatfsOp *op, int r);
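// Usage sketch (hypothetical): cluster-wide df, no per-pool filter:
//
//   struct ceph_statfs stats;
//   objecter->get_fs_stats(stats, boost::none, onfinish);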
2945
2946 // ---------------------------
2947 // some scatter/gather hackery
2948
2949 void _sg_read_finish(vector<ObjectExtent>& extents,
2950 vector<bufferlist>& resultbl,
2951 bufferlist *bl, Context *onfinish);
2952
2953 struct C_SGRead : public Context {
2954 Objecter *objecter;
2955 vector<ObjectExtent> extents;
2956 vector<bufferlist> resultbl;
2957 bufferlist *bl;
2958 Context *onfinish;
2959 C_SGRead(Objecter *ob,
2960 vector<ObjectExtent>& e, vector<bufferlist>& r, bufferlist *b,
2961 Context *c) :
2962 objecter(ob), bl(b), onfinish(c) {
2963 extents.swap(e);
2964 resultbl.swap(r);
2965 }
2966 void finish(int r) override {
2967 objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
2968 }
2969 };
2970
2971 void sg_read_trunc(vector<ObjectExtent>& extents, snapid_t snap,
2972 bufferlist *bl, int flags, uint64_t trunc_size,
2973 __u32 trunc_seq, Context *onfinish, int op_flags = 0) {
2974 if (extents.size() == 1) {
2975 read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
2976 extents[0].length, snap, bl, flags, extents[0].truncate_size,
2977 trunc_seq, onfinish, 0, 0, op_flags);
2978 } else {
2979 C_GatherBuilder gather(cct);
2980 vector<bufferlist> resultbl(extents.size());
2981 int i=0;
2982 for (vector<ObjectExtent>::iterator p = extents.begin();
2983 p != extents.end();
2984 ++p) {
2985 read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++],
2986 flags, p->truncate_size, trunc_seq, gather.new_sub(),
2987 0, 0, op_flags);
2988 }
2989 gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
2990 gather.activate();
2991 }
2992 }
2993
2994 void sg_read(vector<ObjectExtent>& extents, snapid_t snap, bufferlist *bl,
2995 int flags, Context *onfinish, int op_flags = 0) {
2996 sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags);
2997 }
2998
2999 void sg_write_trunc(vector<ObjectExtent>& extents, const SnapContext& snapc,
3000 const bufferlist& bl, ceph::real_time mtime, int flags,
3001 uint64_t trunc_size, __u32 trunc_seq,
3002 Context *oncommit, int op_flags = 0) {
3003 if (extents.size() == 1) {
3004 write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
3005 extents[0].length, snapc, bl, mtime, flags,
3006 extents[0].truncate_size, trunc_seq, oncommit,
3007 0, 0, op_flags);
3008 } else {
3009 C_GatherBuilder gcom(cct, oncommit);
3010 for (vector<ObjectExtent>::iterator p = extents.begin();
3011 p != extents.end();
3012 ++p) {
3013 bufferlist cur;
3014 for (vector<pair<uint64_t,uint64_t> >::iterator bit
3015 = p->buffer_extents.begin();
3016 bit != p->buffer_extents.end();
3017 ++bit)
3018 bl.copy(bit->first, bit->second, cur);
3019 assert(cur.length() == p->length);
3020 write_trunc(p->oid, p->oloc, p->offset, p->length,
3021 snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
3022 oncommit ? gcom.new_sub():0,
3023 0, 0, op_flags);
3024 }
3025 gcom.activate();
3026 }
3027 }
3028
3029 void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc,
3030 const bufferlist& bl, ceph::real_time mtime, int flags,
3031 Context *oncommit, int op_flags = 0) {
3032 sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit,
3033 op_flags);
3034 }
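// Usage sketch (hypothetical; extents would typically be produced by
// Striper::file_to_extents()): gather a striped read into one bl:
//
//   vector<ObjectExtent> extents;
//   // ... fill extents for the file range being read ...
//   bufferlist bl;
//   objecter->sg_read(extents, CEPH_NOSNAP, &bl, 0, onfinish);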
3035
3036 void ms_handle_connect(Connection *con) override;
3037 bool ms_handle_reset(Connection *con) override;
3038 void ms_handle_remote_reset(Connection *con) override;
3039 bool ms_handle_refused(Connection *con) override;
3040 bool ms_get_authorizer(int dest_type,
3041 AuthAuthorizer **authorizer,
3042 bool force_new) override;
3043
3044 void blacklist_self(bool set);
3045
3046 private:
3047 epoch_t epoch_barrier;
3048 bool retry_writes_after_first_reply;
3049 public:
3050 void set_epoch_barrier(epoch_t epoch);
3051
3052 PerfCounters *get_logger() {
3053 return logger;
3054 }
3055 };
3056
3057 #endif