]> git.proxmox.com Git - ceph.git/blob - ceph/src/osdc/Objecter.h
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / osdc / Objecter.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OBJECTER_H
16 #define CEPH_OBJECTER_H
17
18 #include <condition_variable>
19 #include <list>
20 #include <map>
21 #include <mutex>
22 #include <memory>
23 #include <sstream>
24 #include <type_traits>
25
26 #include <boost/thread/shared_mutex.hpp>
27
28 #include "include/ceph_assert.h"
29 #include "include/buffer.h"
30 #include "include/types.h"
31 #include "include/rados/rados_types.hpp"
32
33 #include "common/admin_socket.h"
34 #include "common/ceph_time.h"
35 #include "common/ceph_timer.h"
36 #include "common/config_obs.h"
37 #include "common/shunique_lock.h"
38 #include "common/zipkin_trace.h"
39 #include "common/Finisher.h"
40 #include "common/Throttle.h"
41
42 #include "messages/MOSDOp.h"
43 #include "msg/Dispatcher.h"
44 #include "osd/OSDMap.h"
45
46
47 class Context;
48 class Messenger;
49 class OSDMap;
50 class MonClient;
51 class Message;
52 class Finisher;
53
54 class MPoolOpReply;
55
56 class MGetPoolStatsReply;
57 class MStatfsReply;
58 class MCommandReply;
59 class MWatchNotify;
60
61 class PerfCounters;
62
63 // -----------------------------------------
64
65 struct ObjectOperation {
66 vector<OSDOp> ops;
67 int flags;
68 int priority;
69
70 vector<bufferlist*> out_bl;
71 vector<Context*> out_handler;
72 vector<int*> out_rval;
73
74 ObjectOperation() : flags(0), priority(0) {}
75 ~ObjectOperation() {
76 while (!out_handler.empty()) {
77 delete out_handler.back();
78 out_handler.pop_back();
79 }
80 }
81
82 size_t size() {
83 return ops.size();
84 }
85
86 void set_last_op_flags(int flags) {
87 ceph_assert(!ops.empty());
88 ops.rbegin()->op.flags = flags;
89 }
90
91 class C_TwoContexts;
92 /**
93 * Add a callback to run when this operation completes,
94 * after any other callbacks for it.
95 */
96 void add_handler(Context *extra);
97
98 OSDOp& add_op(int op) {
99 int s = ops.size();
100 ops.resize(s+1);
101 ops[s].op.op = op;
102 out_bl.resize(s+1);
103 out_bl[s] = NULL;
104 out_handler.resize(s+1);
105 out_handler[s] = NULL;
106 out_rval.resize(s+1);
107 out_rval[s] = NULL;
108 return ops[s];
109 }
110 void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) {
111 OSDOp& osd_op = add_op(op);
112 osd_op.op.extent.offset = off;
113 osd_op.op.extent.length = len;
114 osd_op.indata.claim_append(bl);
115 }
116 void add_writesame(int op, uint64_t off, uint64_t write_len,
117 bufferlist& bl) {
118 OSDOp& osd_op = add_op(op);
119 osd_op.op.writesame.offset = off;
120 osd_op.op.writesame.length = write_len;
121 osd_op.op.writesame.data_length = bl.length();
122 osd_op.indata.claim_append(bl);
123 }
124 void add_xattr(int op, const char *name, const bufferlist& data) {
125 OSDOp& osd_op = add_op(op);
126 osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
127 osd_op.op.xattr.value_len = data.length();
128 if (name)
129 osd_op.indata.append(name, osd_op.op.xattr.name_len);
130 osd_op.indata.append(data);
131 }
132 void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
133 uint8_t cmp_mode, const bufferlist& data) {
134 OSDOp& osd_op = add_op(op);
135 osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
136 osd_op.op.xattr.value_len = data.length();
137 osd_op.op.xattr.cmp_op = cmp_op;
138 osd_op.op.xattr.cmp_mode = cmp_mode;
139 if (name)
140 osd_op.indata.append(name, osd_op.op.xattr.name_len);
141 osd_op.indata.append(data);
142 }
143 void add_call(int op, const char *cname, const char *method,
144 bufferlist &indata,
145 bufferlist *outbl, Context *ctx, int *prval) {
146 OSDOp& osd_op = add_op(op);
147
148 unsigned p = ops.size() - 1;
149 out_handler[p] = ctx;
150 out_bl[p] = outbl;
151 out_rval[p] = prval;
152
153 osd_op.op.cls.class_len = strlen(cname);
154 osd_op.op.cls.method_len = strlen(method);
155 osd_op.op.cls.indata_len = indata.length();
156 osd_op.indata.append(cname, osd_op.op.cls.class_len);
157 osd_op.indata.append(method, osd_op.op.cls.method_len);
158 osd_op.indata.append(indata);
159 }
  // Append an unfiltered PGLS/PGNLS listing sub-op.  The cookie encodes
  // the listing position to resume from.
  void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
		epoch_t start_epoch) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.pgls.count = count;
    osd_op.op.pgls.start_epoch = start_epoch;
    encode(cookie, osd_op.indata);
  }
  // Append a filtered PGLS/PGNLS listing sub-op.  Payload layout (the
  // OSD decodes in this order): "pg", "filter", <filter bl>, <cookie>.
  void add_pgls_filter(int op, uint64_t count, const bufferlist& filter,
		       collection_list_handle_t cookie, epoch_t start_epoch) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.pgls.count = count;
    osd_op.op.pgls.start_epoch = start_epoch;
    string cname = "pg";
    string mname = "filter";
    encode(cname, osd_op.indata);
    encode(mname, osd_op.indata);
    osd_op.indata.append(filter);
    encode(cookie, osd_op.indata);
  }
179 void add_alloc_hint(int op, uint64_t expected_object_size,
180 uint64_t expected_write_size,
181 uint32_t flags) {
182 OSDOp& osd_op = add_op(op);
183 osd_op.op.alloc_hint.expected_object_size = expected_object_size;
184 osd_op.op.alloc_hint.expected_write_size = expected_write_size;
185 osd_op.op.alloc_hint.flags = flags;
186 }
187
188 // ------
189
190 // pg
  // Legacy whole-PG object listing; unfiltered when the filter bl is empty.
  void pg_ls(uint64_t count, bufferlist& filter,
	     collection_list_handle_t cookie, epoch_t start_epoch) {
    if (filter.length() == 0)
      add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
    else
      add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie,
		      start_epoch);
    // Listing ops address a whole PG rather than a single object.
    flags |= CEPH_OSD_FLAG_PGOP;
  }

  // "New" whole-PG object listing; unfiltered when the filter bl is empty.
  void pg_nls(uint64_t count, const bufferlist& filter,
	      collection_list_handle_t cookie, epoch_t start_epoch) {
    if (filter.length() == 0)
      add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch);
    else
      add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie,
		      start_epoch);
    // Listing ops address a whole PG rather than a single object.
    flags |= CEPH_OSD_FLAG_PGOP;
  }
210
  // List scrub-detected inconsistencies (objects / snapsets) starting
  // after `start_after`; implementations live in Objecter.cc.  *interval
  // receives the scrub interval the listing belongs to.
  void scrub_ls(const librados::object_id_t& start_after,
		uint64_t max_to_get,
		std::vector<librados::inconsistent_obj_t> *objects,
		uint32_t *interval,
		int *rval);
  void scrub_ls(const librados::object_id_t& start_after,
		uint64_t max_to_get,
		std::vector<librados::inconsistent_snapset_t> *objects,
		uint32_t *interval,
		int *rval);

  // Create the object; with excl=true the op fails if it already exists
  // (CEPH_OSD_OP_FLAG_EXCL).
  void create(bool excl) {
    OSDOp& o = add_op(CEPH_OSD_OP_CREATE);
    o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0);
  }
226
  // Completion that decodes a STAT reply (size + mtime) and fans the
  // values out to whichever of the optional output pointers were given.
  struct C_ObjectOperation_stat : public Context {
    bufferlist bl;             // raw reply payload, filled in by the Objecter
    uint64_t *psize;           // optional out: object size
    ceph::real_time *pmtime;   // optional out: mtime as real_time
    time_t *ptime;             // optional out: mtime as time_t
    struct timespec *pts;      // optional out: mtime as timespec
    int *prval;                // optional out: decode error (-EIO) sink
    C_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts,
			   int *prval)
      : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {}
    void finish(int r) override {
      if (r >= 0) {
	auto p = bl.cbegin();
	try {
	  uint64_t size;
	  ceph::real_time mtime;
	  decode(size, p);
	  decode(mtime, p);
	  if (psize)
	    *psize = size;
	  if (pmtime)
	    *pmtime = mtime;
	  if (ptime)
	    *ptime = ceph::real_clock::to_time_t(mtime);
	  if (pts)
	    *pts = ceph::real_clock::to_timespec(mtime);
	} catch (buffer::error& e) {
	  // Malformed reply: report EIO instead of letting decode throw out.
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
  // STAT: fetch object size and mtime.  The three overloads deliver the
  // mtime in different types via C_ObjectOperation_stat.
  void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, pmtime, NULL, NULL,
							   prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
    out_rval[p] = prval;
  }
  void stat(uint64_t *psize, time_t *ptime, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, ptime, NULL,
							   prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
    out_rval[p] = prval;
  }
  void stat(uint64_t *psize, struct timespec *pts, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, NULL, pts,
							   prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
    out_rval[p] = prval;
  }
287 // object cmpext
288 struct C_ObjectOperation_cmpext : public Context {
289 int *prval;
290 explicit C_ObjectOperation_cmpext(int *prval)
291 : prval(prval) {}
292
293 void finish(int r) {
294 if (prval)
295 *prval = r;
296 }
297 };
298
299 void cmpext(uint64_t off, bufferlist& cmp_bl, int *prval) {
300 add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
301 unsigned p = ops.size() - 1;
302 C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
303 out_handler[p] = h;
304 out_rval[p] = prval;
305 }
306
307 // Used by C API
308 void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) {
309 bufferlist cmp_bl;
310 cmp_bl.append(cmp_buf, cmp_len);
311 add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
312 unsigned p = ops.size() - 1;
313 C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
314 out_handler[p] = h;
315 out_rval[p] = prval;
316 }
317
318 void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval,
319 Context* ctx) {
320 bufferlist bl;
321 add_data(CEPH_OSD_OP_READ, off, len, bl);
322 unsigned p = ops.size() - 1;
323 out_bl[p] = pbl;
324 out_rval[p] = prval;
325 out_handler[p] = ctx;
326 }
327
  // Completion that decodes a SPARSE_READ reply into an extent map
  // (offset -> length) plus the packed data for those extents.
  struct C_ObjectOperation_sparse_read : public Context {
    bufferlist bl;                          // raw reply payload
    bufferlist *data_bl;                    // out: packed extent data
    std::map<uint64_t, uint64_t> *extents;  // out: offset -> length map
    int *prval;                             // optional out: error sink
    C_ObjectOperation_sparse_read(bufferlist *data_bl,
				  std::map<uint64_t, uint64_t> *extents,
				  int *prval)
      : data_bl(data_bl), extents(extents), prval(prval) {}
    void finish(int r) override {
      auto iter = bl.cbegin();
      if (r >= 0) {
	// NOTE: it's possible the sub-op has not been executed but the result
	// code remains zeroed. Avoid the costly exception handling on a
	// potential IO path.
	if (bl.length() > 0) {
	  try {
	    decode(*extents, iter);
	    decode(*data_bl, iter);
	  } catch (buffer::error& e) {
	    if (prval)
	      *prval = -EIO;
	  }
	} else if (prval) {
	  // Success code but empty payload: report as an I/O error.
	  *prval = -EIO;
	}
      }
    }
  };
357 void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t,uint64_t> *m,
358 bufferlist *data_bl, int *prval) {
359 bufferlist bl;
360 add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
361 unsigned p = ops.size() - 1;
362 C_ObjectOperation_sparse_read *h =
363 new C_ObjectOperation_sparse_read(data_bl, m, prval);
364 out_bl[p] = &h->bl;
365 out_handler[p] = h;
366 out_rval[p] = prval;
367 }
368 void write(uint64_t off, bufferlist& bl,
369 uint64_t truncate_size,
370 uint32_t truncate_seq) {
371 add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
372 OSDOp& o = *ops.rbegin();
373 o.op.extent.truncate_size = truncate_size;
374 o.op.extent.truncate_seq = truncate_seq;
375 }
376 void write(uint64_t off, bufferlist& bl) {
377 write(off, bl, 0, 0);
378 }
379 void write_full(bufferlist& bl) {
380 add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
381 }
382 void writesame(uint64_t off, uint64_t write_len, bufferlist& bl) {
383 add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl);
384 }
385 void append(bufferlist& bl) {
386 add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
387 }
388 void zero(uint64_t off, uint64_t len) {
389 bufferlist bl;
390 add_data(CEPH_OSD_OP_ZERO, off, len, bl);
391 }
392 void truncate(uint64_t off) {
393 bufferlist bl;
394 add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl);
395 }
396 void remove() {
397 bufferlist bl;
398 add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
399 }
400 void mapext(uint64_t off, uint64_t len) {
401 bufferlist bl;
402 add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
403 }
404 void sparse_read(uint64_t off, uint64_t len) {
405 bufferlist bl;
406 add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
407 }
408
409 void checksum(uint8_t type, const bufferlist &init_value_bl,
410 uint64_t off, uint64_t len, size_t chunk_size,
411 bufferlist *pbl, int *prval, Context *ctx) {
412 OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
413 osd_op.op.checksum.offset = off;
414 osd_op.op.checksum.length = len;
415 osd_op.op.checksum.type = type;
416 osd_op.op.checksum.chunk_size = chunk_size;
417 osd_op.indata.append(init_value_bl);
418
419 unsigned p = ops.size() - 1;
420 out_bl[p] = pbl;
421 out_rval[p] = prval;
422 out_handler[p] = ctx;
423 }
424
425 // object attrs
426 void getxattr(const char *name, bufferlist *pbl, int *prval) {
427 bufferlist bl;
428 add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
429 unsigned p = ops.size() - 1;
430 out_bl[p] = pbl;
431 out_rval[p] = prval;
432 }
  // Completion that decodes a string->bufferlist map reply (xattrs or
  // omap values) and, when requested, works out whether the listing was
  // truncated.
  struct C_ObjectOperation_decodevals : public Context {
    uint64_t max_entries;                     // limit the request asked for
    bufferlist bl;                            // raw reply payload
    std::map<std::string,bufferlist> *pattrs; // optional out: decoded map
    bool *ptruncated;                         // optional out: more remain?
    int *prval;                               // optional out: error sink
    C_ObjectOperation_decodevals(uint64_t m, std::map<std::string,bufferlist> *pa,
				 bool *pt, int *pr)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
      if (ptruncated) {
	*ptruncated = false;
      }
    }
    void finish(int r) override {
      if (r >= 0) {
	auto p = bl.cbegin();
	try {
	  if (pattrs)
	    decode(*pattrs, p);
	  if (ptruncated) {
	    std::map<std::string,bufferlist> ignore;
	    if (!pattrs) {
	      // Still have to decode the map to advance past it.
	      decode(ignore, p);
	      pattrs = &ignore;
	    }
	    if (!p.end()) {
	      decode(*ptruncated, p);
	    } else {
	      // the OSD did not provide this. since old OSDs do not
	      // enforce omap result limits either, we can infer it from
	      // the size of the result
	      *ptruncated = (pattrs->size() == max_entries);
	    }
	  }
	}
	catch (buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
  // Completion that decodes a string-set reply (omap key listing) and,
  // when requested, works out whether the listing was truncated.
  struct C_ObjectOperation_decodekeys : public Context {
    uint64_t max_entries;          // limit the request asked for
    bufferlist bl;                 // raw reply payload
    std::set<std::string> *pattrs; // optional out: decoded key set
    bool *ptruncated;              // optional out: more keys remain?
    int *prval;                    // optional out: error sink
    C_ObjectOperation_decodekeys(uint64_t m, std::set<std::string> *pa, bool *pt,
				 int *pr)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
      if (ptruncated) {
	*ptruncated = false;
      }
    }
    void finish(int r) override {
      if (r >= 0) {
	auto p = bl.cbegin();
	try {
	  if (pattrs)
	    decode(*pattrs, p);
	  if (ptruncated) {
	    std::set<std::string> ignore;
	    if (!pattrs) {
	      // Still have to decode the set to advance past it.
	      decode(ignore, p);
	      pattrs = &ignore;
	    }
	    if (!p.end()) {
	      decode(*ptruncated, p);
	    } else {
	      // the OSD did not provide this. since old OSDs do not
	      // enforce omap result limits either, we can infer it from
	      // the size of the result
	      *ptruncated = (pattrs->size() == max_entries);
	    }
	  }
	}
	catch (buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
517 struct C_ObjectOperation_decodewatchers : public Context {
518 bufferlist bl;
519 list<obj_watch_t> *pwatchers;
520 int *prval;
521 C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr)
522 : pwatchers(pw), prval(pr) {}
523 void finish(int r) override {
524 if (r >= 0) {
525 auto p = bl.cbegin();
526 try {
527 obj_list_watch_response_t resp;
528 decode(resp, p);
529 if (pwatchers) {
530 for (list<watch_item_t>::iterator i = resp.entries.begin() ;
531 i != resp.entries.end() ; ++i) {
532 obj_watch_t ow;
533 string sa = i->addr.get_legacy_str();
534 strncpy(ow.addr, sa.c_str(), 256);
535 ow.watcher_id = i->name.num();
536 ow.cookie = i->cookie;
537 ow.timeout_seconds = i->timeout_seconds;
538 pwatchers->push_back(ow);
539 }
540 }
541 }
542 catch (buffer::error& e) {
543 if (prval)
544 *prval = -EIO;
545 }
546 }
547 }
548 };
  // Completion that decodes a LIST_SNAPS reply into a librados
  // snap_set_t (clone list plus the snap sequence number).
  struct C_ObjectOperation_decodesnaps : public Context {
    bufferlist bl;               // raw reply payload
    librados::snap_set_t *psnaps; // optional out: decoded snap set
    int *prval;                  // optional out: error sink
    C_ObjectOperation_decodesnaps(librados::snap_set_t *ps, int *pr)
      : psnaps(ps), prval(pr) {}
    void finish(int r) override {
      if (r >= 0) {
	auto p = bl.cbegin();
	try {
	  obj_list_snap_response_t resp;
	  decode(resp, p);
	  if (psnaps) {
	    psnaps->clones.clear();
	    // Convert each internal clone_info into the public
	    // librados::clone_info_t representation.
	    for (vector<clone_info>::iterator ci = resp.clones.begin();
		 ci != resp.clones.end();
		 ++ci) {
	      librados::clone_info_t clone;

	      clone.cloneid = ci->cloneid;
	      clone.snaps.reserve(ci->snaps.size());
	      clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
				 ci->snaps.end());
	      clone.overlap = ci->overlap;
	      clone.size = ci->size;

	      psnaps->clones.push_back(clone);
	    }
	    psnaps->seq = resp.seq;
	  }
	} catch (buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
586 void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) {
587 add_op(CEPH_OSD_OP_GETXATTRS);
588 if (pattrs || prval) {
589 unsigned p = ops.size() - 1;
590 C_ObjectOperation_decodevals *h
591 = new C_ObjectOperation_decodevals(0, pattrs, nullptr, prval);
592 out_handler[p] = h;
593 out_bl[p] = &h->bl;
594 out_rval[p] = prval;
595 }
596 }
597 void setxattr(const char *name, const bufferlist& bl) {
598 add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
599 }
600 void setxattr(const char *name, const string& s) {
601 bufferlist bl;
602 bl.append(s);
603 add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
604 }
605 void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
606 const bufferlist& bl) {
607 add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
608 }
609 void rmxattr(const char *name) {
610 bufferlist bl;
611 add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
612 }
613 void setxattrs(map<string, bufferlist>& attrs) {
614 bufferlist bl;
615 encode(attrs, bl);
616 add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
617 }
  // Remove xattrs under `prefix` and install the encoded `attrs` set.
  void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {
    bufferlist bl;
    encode(attrs, bl);
    add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
  }

  // trivialmap
  // Apply an already-encoded TMAP update (legacy trivial-map op).
  void tmap_update(bufferlist& bl) {
    add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
  }
628
629 // objectmap
630 void omap_get_keys(const string &start_after,
631 uint64_t max_to_get,
632 std::set<std::string> *out_set,
633 bool *ptruncated,
634 int *prval) {
635 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
636 bufferlist bl;
637 encode(start_after, bl);
638 encode(max_to_get, bl);
639 op.op.extent.offset = 0;
640 op.op.extent.length = bl.length();
641 op.indata.claim_append(bl);
642 if (prval || ptruncated || out_set) {
643 unsigned p = ops.size() - 1;
644 C_ObjectOperation_decodekeys *h =
645 new C_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval);
646 out_handler[p] = h;
647 out_bl[p] = &h->bl;
648 out_rval[p] = prval;
649 }
650 }
651
652 void omap_get_vals(const string &start_after,
653 const string &filter_prefix,
654 uint64_t max_to_get,
655 std::map<std::string, bufferlist> *out_set,
656 bool *ptruncated,
657 int *prval) {
658 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
659 bufferlist bl;
660 encode(start_after, bl);
661 encode(max_to_get, bl);
662 encode(filter_prefix, bl);
663 op.op.extent.offset = 0;
664 op.op.extent.length = bl.length();
665 op.indata.claim_append(bl);
666 if (prval || out_set || ptruncated) {
667 unsigned p = ops.size() - 1;
668 C_ObjectOperation_decodevals *h =
669 new C_ObjectOperation_decodevals(max_to_get, out_set, ptruncated, prval);
670 out_handler[p] = h;
671 out_bl[p] = &h->bl;
672 out_rval[p] = prval;
673 }
674 }
675
676 void omap_get_vals_by_keys(const std::set<std::string> &to_get,
677 std::map<std::string, bufferlist> *out_set,
678 int *prval) {
679 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
680 bufferlist bl;
681 encode(to_get, bl);
682 op.op.extent.offset = 0;
683 op.op.extent.length = bl.length();
684 op.indata.claim_append(bl);
685 if (prval || out_set) {
686 unsigned p = ops.size() - 1;
687 C_ObjectOperation_decodevals *h =
688 new C_ObjectOperation_decodevals(0, out_set, nullptr, prval);
689 out_handler[p] = h;
690 out_bl[p] = &h->bl;
691 out_rval[p] = prval;
692 }
693 }
694
695 void omap_cmp(const std::map<std::string, pair<bufferlist,int> > &assertions,
696 int *prval) {
697 OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
698 bufferlist bl;
699 encode(assertions, bl);
700 op.op.extent.offset = 0;
701 op.op.extent.length = bl.length();
702 op.indata.claim_append(bl);
703 if (prval) {
704 unsigned p = ops.size() - 1;
705 out_rval[p] = prval;
706 }
707 }
708
  // Completion that decodes a COPY_GET reply (object_copy_data_t) and
  // fans every field out to the optional output pointers.  On -ENOENT
  // only the request ids are copied out (see comment in finish()).
  struct C_ObjectOperation_copyget : public Context {
    bufferlist bl;                 // raw reply payload
    object_copy_cursor_t *cursor;  // in/out: position to resume copying from
    uint64_t *out_size;
    ceph::real_time *out_mtime;
    std::map<std::string,bufferlist> *out_attrs;
    bufferlist *out_data, *out_omap_header, *out_omap_data;
    vector<snapid_t> *out_snaps;
    snapid_t *out_snap_seq;
    uint32_t *out_flags;
    uint32_t *out_data_digest;
    uint32_t *out_omap_digest;
    mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids;
    mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes;
    uint64_t *out_truncate_seq;
    uint64_t *out_truncate_size;
    int *prval;                    // optional out: decode error sink
    C_ObjectOperation_copyget(object_copy_cursor_t *c,
			      uint64_t *s,
			      ceph::real_time *m,
			      std::map<std::string,bufferlist> *a,
			      bufferlist *d, bufferlist *oh,
			      bufferlist *o,
			      std::vector<snapid_t> *osnaps,
			      snapid_t *osnap_seq,
			      uint32_t *flags,
			      uint32_t *dd,
			      uint32_t *od,
			      mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *oreqids,
			      mempool::osd_pglog::map<uint32_t, int> *oreqid_return_codes,
			      uint64_t *otseq,
			      uint64_t *otsize,
			      int *r)
      : cursor(c),
	out_size(s), out_mtime(m),
	out_attrs(a), out_data(d), out_omap_header(oh),
	out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
	out_flags(flags), out_data_digest(dd), out_omap_digest(od),
	out_reqids(oreqids),
	out_reqid_return_codes(oreqid_return_codes),
	out_truncate_seq(otseq),
	out_truncate_size(otsize),
	prval(r) {}
    void finish(int r) override {
      // reqids are copied on ENOENT
      if (r < 0 && r != -ENOENT)
	return;
      try {
	auto p = bl.cbegin();
	object_copy_data_t copy_reply;
	decode(copy_reply, p);
	if (r == -ENOENT) {
	  if (out_reqids)
	    *out_reqids = copy_reply.reqids;
	  return;
	}
	if (out_size)
	  *out_size = copy_reply.size;
	if (out_mtime)
	  *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
	if (out_attrs)
	  *out_attrs = copy_reply.attrs;
	if (out_data)
	  out_data->claim_append(copy_reply.data);
	if (out_omap_header)
	  out_omap_header->claim_append(copy_reply.omap_header);
	if (out_omap_data)
	  *out_omap_data = copy_reply.omap_data;
	if (out_snaps)
	  *out_snaps = copy_reply.snaps;
	if (out_snap_seq)
	  *out_snap_seq = copy_reply.snap_seq;
	if (out_flags)
	  *out_flags = copy_reply.flags;
	if (out_data_digest)
	  *out_data_digest = copy_reply.data_digest;
	if (out_omap_digest)
	  *out_omap_digest = copy_reply.omap_digest;
	if (out_reqids)
	  *out_reqids = copy_reply.reqids;
	if (out_reqid_return_codes)
	  *out_reqid_return_codes = copy_reply.reqid_return_codes;
	if (out_truncate_seq)
	  *out_truncate_seq = copy_reply.truncate_seq;
	if (out_truncate_size)
	  *out_truncate_size = copy_reply.truncate_size;
	// Advance the caller's cursor so the next COPY_GET resumes here.
	*cursor = copy_reply.cursor;
      } catch (buffer::error& e) {
	if (prval)
	  *prval = -EIO;
      }
    }
  };
802
  // COPY_GET: read up to `max` bytes of object state (data, attrs, omap,
  // snaps, digests, ...) starting at *cursor; each out-pointer may be
  // null if the caller does not want that field.
  void copy_get(object_copy_cursor_t *cursor,
		uint64_t max,
		uint64_t *out_size,
		ceph::real_time *out_mtime,
		std::map<std::string,bufferlist> *out_attrs,
		bufferlist *out_data,
		bufferlist *out_omap_header,
		bufferlist *out_omap_data,
		vector<snapid_t> *out_snaps,
		snapid_t *out_snap_seq,
		uint32_t *out_flags,
		uint32_t *out_data_digest,
		uint32_t *out_omap_digest,
		mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids,
		mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes,
		uint64_t *truncate_seq,
		uint64_t *truncate_size,
		int *prval) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
    osd_op.op.copy_get.max = max;
    encode(*cursor, osd_op.indata);
    encode(max, osd_op.indata);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_copyget *h =
      new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
				    out_attrs, out_data, out_omap_header,
				    out_omap_data, out_snaps, out_snap_seq,
				    out_flags, out_data_digest,
				    out_omap_digest, out_reqids,
				    out_reqid_return_codes, truncate_seq,
				    truncate_size, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }
838
  // Clear the object's dirty flag in the cache tier.
  void undirty() {
    add_op(CEPH_OSD_OP_UNDIRTY);
  }

  // Completion that decodes an ISDIRTY reply into a bool.
  struct C_ObjectOperation_isdirty : public Context {
    bufferlist bl;   // raw reply payload
    bool *pisdirty;  // optional out: dirty flag
    int *prval;      // optional out: error sink
    C_ObjectOperation_isdirty(bool *p, int *r)
      : pisdirty(p), prval(r) {}
    void finish(int r) override {
      if (r < 0)
	return;
      try {
	auto p = bl.cbegin();
	bool isdirty;
	decode(isdirty, p);
	if (pisdirty)
	  *pisdirty = isdirty;
      } catch (buffer::error& e) {
	if (prval)
	  *prval = -EIO;
      }
    }
  };

  // Query whether the object is dirty in the cache tier.
  void is_dirty(bool *pisdirty, int *prval) {
    add_op(CEPH_OSD_OP_ISDIRTY);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_isdirty *h =
      new C_ObjectOperation_isdirty(pisdirty, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }
874
  // Completion that decodes a HITSET_LS reply (a list of time intervals)
  // and delivers it either as real_time pairs or, for the legacy
  // interface, as time_t pairs.
  struct C_ObjectOperation_hit_set_ls : public Context {
    bufferlist bl;  // raw reply payload
    std::list< std::pair<time_t, time_t> > *ptls;                    // optional out (legacy)
    std::list< std::pair<ceph::real_time, ceph::real_time> > *putls; // optional out
    int *prval;     // optional out: error sink
    C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
				 std::list< std::pair<ceph::real_time,
						      ceph::real_time> > *ut,
				 int *r)
      : ptls(t), putls(ut), prval(r) {}
    void finish(int r) override {
      if (r < 0)
	return;
      try {
	auto p = bl.cbegin();
	std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
	decode(ls, p);
	if (ptls) {
	  ptls->clear();
	  for (auto p = ls.begin(); p != ls.end(); ++p)
	    // round initial timestamp up to the next full second to
	    // keep this a valid interval.
	    ptls->push_back(
	      make_pair(ceph::real_clock::to_time_t(
			  ceph::ceil(p->first,
				     // Sadly, no time literals until C++14.
				     std::chrono::seconds(1))),
			ceph::real_clock::to_time_t(p->second)));
	}
	if (putls)
	  putls->swap(ls);
      } catch (buffer::error& e) {
	r = -EIO;
      }
      if (prval)
	*prval = r;
    }
  };
913
  /**
   * list available HitSets.
   *
   * We will get back a list of time intervals. Note that the most
   * recent range may have an empty end timestamp if it is still
   * accumulating.
   *
   * @param pls [out] list of time intervals
   * @param prval [out] return value
   */
  void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
    add_op(CEPH_OSD_OP_PG_HITSET_LS);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_hit_set_ls *h =
      new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }
  // Same as above, delivering the intervals as real_time pairs.
  void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls,
		  int *prval) {
    add_op(CEPH_OSD_OP_PG_HITSET_LS);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_hit_set_ls *h =
      new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }
943
  /**
   * get HitSet
   *
   * Return an encoded HitSet that includes the provided time
   * interval.
   *
   * @param stamp [in] timestamp
   * @param pbl [out] target buffer for encoded HitSet
   * @param prval [out] return value
   */
  void hit_set_get(ceph::real_time stamp, bufferlist *pbl, int *prval) {
    OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
    op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    out_bl[p] = pbl;
  }

  // Fetch the omap header blob into *bl.
  void omap_get_header(bufferlist *bl, int *prval) {
    add_op(CEPH_OSD_OP_OMAPGETHEADER);
    unsigned p = ops.size() - 1;
    out_bl[p] = bl;
    out_rval[p] = prval;
  }
968
969 void omap_set(const map<string, bufferlist> &map) {
970 bufferlist bl;
971 encode(map, bl);
972 add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
973 }
974
975 void omap_set_header(bufferlist &bl) {
976 add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
977 }
978
979 void omap_clear() {
980 add_op(CEPH_OSD_OP_OMAPCLEAR);
981 }
982
983 void omap_rm_keys(const std::set<std::string> &to_remove) {
984 bufferlist bl;
985 encode(to_remove, bl);
986 add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
987 }
988
  // object classes
  // Invoke an object-class method, discarding any output.
  void call(const char *cname, const char *method, bufferlist &indata) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
  }

  // Invoke an object-class method with output buffer, completion
  // context, and return-code sink (each may be null).
  void call(const char *cname, const char *method, bufferlist &indata,
	    bufferlist *outdata, Context *ctx, int *prval) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
  }
998
  // watch/notify
  // Register/modify a watch on this object; `op` selects the watch
  // sub-operation (CEPH_OSD_WATCH_OP_*).
  void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH);
    osd_op.op.watch.cookie = cookie;
    osd_op.op.watch.op = op;
    osd_op.op.watch.timeout = timeout;
  }

  // Send a notify to the object's watchers.  The payload (proto version,
  // timeout, user bl) is encoded into *inbl, which is also appended to
  // the op's indata.
  void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
	      bufferlist &bl, bufferlist *inbl) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
    osd_op.op.notify.cookie = cookie;
    encode(prot_ver, *inbl);
    encode(timeout, *inbl);
    encode(bl, *inbl);
    osd_op.indata.append(*inbl);
  }

  // Acknowledge a received notify, optionally carrying a reply payload.
  void notify_ack(uint64_t notify_id, uint64_t cookie,
		  bufferlist& reply_bl) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
    bufferlist bl;
    encode(notify_id, bl);
    encode(cookie, bl);
    encode(reply_bl, bl);
    osd_op.indata.append(bl);
  }
1026
1027 void list_watchers(list<obj_watch_t> *out,
1028 int *prval) {
1029 (void)add_op(CEPH_OSD_OP_LIST_WATCHERS);
1030 if (prval || out) {
1031 unsigned p = ops.size() - 1;
1032 C_ObjectOperation_decodewatchers *h =
1033 new C_ObjectOperation_decodewatchers(out, prval);
1034 out_handler[p] = h;
1035 out_bl[p] = &h->bl;
1036 out_rval[p] = prval;
1037 }
1038 }
1039
1040 void list_snaps(librados::snap_set_t *out, int *prval) {
1041 (void)add_op(CEPH_OSD_OP_LIST_SNAPS);
1042 if (prval || out) {
1043 unsigned p = ops.size() - 1;
1044 C_ObjectOperation_decodesnaps *h =
1045 new C_ObjectOperation_decodesnaps(out, prval);
1046 out_handler[p] = h;
1047 out_bl[p] = &h->bl;
1048 out_rval[p] = prval;
1049 }
1050 }
1051
  // Assert the object is at exactly this user version.
  void assert_version(uint64_t ver) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
    osd_op.op.assert_ver.ver = ver;
  }

  // Legacy cmpxattr variant taking op/mode as ints.
  void cmpxattr(const char *name, const bufferlist& val,
		int op, int mode) {
    add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
    OSDOp& o = *ops.rbegin();
    o.op.xattr.cmp_op = op;
    o.op.xattr.cmp_mode = mode;
  }

  // Roll the object back to the given snapshot.
  void rollback(uint64_t snapid) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
    osd_op.op.snap.snapid = snapid;
  }

  // Copy this object's content from another object (src/src_oloc at
  // src_version); used by cache tiering and rados cp.
  void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
		 version_t src_version, unsigned flags,
		 unsigned src_fadvise_flags) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
    osd_op.op.copy_from.snapid = snapid;
    osd_op.op.copy_from.src_version = src_version;
    osd_op.op.copy_from.flags = flags;
    osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
    encode(src, osd_op.indata);
    encode(src_oloc, osd_op.indata);
  }
1081
  /**
   * writeback content to backing tier
   *
   * If object is marked dirty in the cache tier, write back content
   * to backing tier. If the object is clean this is a no-op.
   *
   * If writeback races with an update, the update will block.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_flush() {
    add_op(CEPH_OSD_OP_CACHE_FLUSH);
  }

  /**
   * writeback content to backing tier
   *
   * If object is marked dirty in the cache tier, write back content
   * to backing tier. If the object is clean this is a no-op.
   *
   * If writeback races with an update, return EAGAIN.  Requires that
   * the SKIPRWLOCKS flag be set.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_try_flush() {
    add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
  }

  /**
   * evict object from cache tier
   *
   * If object is marked clean, remove the object from the cache tier.
   * Otherwise, return EBUSY.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_evict() {
    add_op(CEPH_OSD_OP_CACHE_EVICT);
  }
1122
  /*
   * Extensible tier
   */
  /**
   * Redirect this object to target object @tgt (manifest redirect).
   *
   * Note: deliberately reuses the copy_from members of the op union
   * for the target snapid/version; the target oid and locator are
   * encoded into the payload (encode order is the wire format).
   */
  void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc,
		    version_t tgt_version, int flag) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
    osd_op.op.copy_from.snapid = snapid;
    osd_op.op.copy_from.src_version = tgt_version;
    encode(tgt, osd_op.indata);
    encode(tgt_oloc, osd_op.indata);
    set_last_op_flags(flag);
  }
1135
  /**
   * Map byte range [src_offset, src_offset+src_length) of this object
   * onto @tgt_oid at @tgt_offset (manifest chunking).
   *
   * All arguments travel in the payload; encode order is the wire
   * format and must not change.
   */
  void set_chunk(uint64_t src_offset, uint64_t src_length, object_locator_t tgt_oloc,
		 object_t tgt_oid, uint64_t tgt_offset, int flag) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_CHUNK);
    encode(src_offset, osd_op.indata);
    encode(src_length, osd_op.indata);
    encode(tgt_oloc, osd_op.indata);
    encode(tgt_oid, osd_op.indata);
    encode(tgt_offset, osd_op.indata);
    set_last_op_flags(flag);
  }
1146
  /// promote the object into this (cache) tier — semantics implemented
  /// OSD-side; see the osd manifest/tiering code
  void tier_promote() {
    add_op(CEPH_OSD_OP_TIER_PROMOTE);
  }

  /// drop the object's manifest state (undoes set_redirect/set_chunk —
  /// presumably; confirm against osd manifest handling)
  void unset_manifest() {
    add_op(CEPH_OSD_OP_UNSET_MANIFEST);
  }
1154
  /**
   * Hint the OSD about the expected final size and typical write size
   * of this object so it can pick an allocation strategy.
   *
   * @param expected_object_size expected total object size, in bytes
   * @param expected_write_size expected size of individual writes
   * @param flags CEPH_OSD_ALLOC_HINT_FLAG_* values
   */
  void set_alloc_hint(uint64_t expected_object_size,
		      uint64_t expected_write_size,
		      uint32_t flags) {
    add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size,
		   expected_write_size, flags);

    // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
    // not worth a feature bit. Set FAILOK per-op flag to make
    // sure older osds don't trip over an unsupported opcode.
    set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
  }
1166
  /**
   * Reset this operation to mirror @sops.
   *
   * Copies @sops into ops and points each per-op output slot (data
   * buffer, rval) back into the corresponding element of @sops, so
   * results are written into the caller's vector.  No completion
   * handlers are installed.  @sops must therefore outlive this op.
   */
  void dup(vector<OSDOp>& sops) {
    ops = sops;
    out_bl.resize(sops.size());
    out_handler.resize(sops.size());
    out_rval.resize(sops.size());
    for (uint32_t i = 0; i < sops.size(); i++) {
      out_bl[i] = &sops[i].outdata;
      out_handler[i] = NULL;
      out_rval[i] = &sops[i].rval;
    }
  }
1178
  /**
   * Pin/unpin an object in cache tier
   *
   * Both ops are opcode-only (no payload).
   */
  void cache_pin() {
    add_op(CEPH_OSD_OP_CACHE_PIN);
  }

  void cache_unpin() {
    add_op(CEPH_OSD_OP_CACHE_UNPIN);
  }
1189 };
1190
1191
1192 // ----------------
1193
1194
1195 class Objecter : public md_config_obs_t, public Dispatcher {
1196 public:
1197 // config observer bits
1198 const char** get_tracked_conf_keys() const override;
1199 void handle_conf_change(const ConfigProxy& conf,
1200 const std::set <std::string> &changed) override;
1201
1202 public:
1203 Messenger *messenger;
1204 MonClient *monc;
1205 Finisher *finisher;
1206 ZTracer::Endpoint trace_endpoint;
1207 private:
1208 OSDMap *osdmap;
1209 public:
1210 using Dispatcher::cct;
1211 std::multimap<string,string> crush_location;
1212
1213 std::atomic<bool> initialized{false};
1214
1215 private:
1216 std::atomic<uint64_t> last_tid{0};
1217 std::atomic<unsigned> inflight_ops{0};
1218 std::atomic<int> client_inc{-1};
1219 uint64_t max_linger_id;
1220 std::atomic<unsigned> num_in_flight{0};
1221 std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op
1222 bool keep_balanced_budget;
1223 bool honor_osdmap_full;
1224 bool osdmap_full_try;
1225
1226 // If this is true, accumulate a set of blacklisted entities
1227 // to be drained by consume_blacklist_events.
1228 bool blacklist_events_enabled;
1229 std::set<entity_addr_t> blacklist_events;
1230
1231 public:
1232 void maybe_request_map();
1233
1234 void enable_blacklist_events();
1235 private:
1236
1237 void _maybe_request_map();
1238
1239 version_t last_seen_osdmap_version;
1240 version_t last_seen_pgmap_version;
1241
1242 mutable std::shared_mutex rwlock;
1243 using lock_guard = std::lock_guard<decltype(rwlock)>;
1244 using unique_lock = std::unique_lock<decltype(rwlock)>;
1245 using shared_lock = boost::shared_lock<decltype(rwlock)>;
1246 using shunique_lock = ceph::shunique_lock<decltype(rwlock)>;
1247 ceph::timer<ceph::coarse_mono_clock> timer;
1248
1249 PerfCounters *logger;
1250
1251 uint64_t tick_event;
1252
1253 void start_tick();
1254 void tick();
1255 void update_crush_location();
1256
1257 class RequestStateHook;
1258
1259 RequestStateHook *m_request_state_hook;
1260
1261 public:
1262 /*** track pending operations ***/
1263 // read
1264
1265 struct OSDSession;
1266
  /**
   * Addressing state for one outstanding request.
   *
   * Records both what the caller asked for (base_*) and what the most
   * recent osdmap resolved it to (target_*, pgid, up/acting, osd), so
   * _calc_target() can detect when the mapping changed and a resend
   * is required.
   */
  struct op_target_t {
    int flags = 0;

    epoch_t epoch = 0;  ///< latest epoch we calculated the mapping

    object_t base_oid;
    object_locator_t base_oloc;
    object_t target_oid;
    object_locator_t target_oloc;

    ///< true if we are directed at base_pgid, not base_oid
    bool precalc_pgid = false;

    ///< true if we have ever mapped to a valid pool
    bool pool_ever_existed = false;

    ///< explicit pg target, if any
    pg_t base_pgid;

    pg_t pgid; ///< last (raw) pg we mapped to
    spg_t actual_pgid; ///< last (actual) spg_t we mapped to
    unsigned pg_num = 0; ///< last pg_num we mapped to
    unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
    unsigned pg_num_pending = 0; ///< last pg_num_pending we mapped to
    vector<int> up; ///< set of up osds for last pg we mapped to
    vector<int> acting; ///< set of acting osds for last pg we mapped to
    int up_primary = -1; ///< last up_primary we mapped to
    int acting_primary = -1;  ///< last acting_primary we mapped to
    int size = -1; ///< the size of the pool when we were last mapped
    int min_size = -1; ///< the min size of the pool when we were last mapped
    bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise
    bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering

    bool used_replica = false;  ///< whether we targeted a replica (balanced/localized reads)
    bool paused = false;        ///< op is held (e.g. full/pause flags); see target_should_be_paused

    int osd = -1;      ///< the final target osd, or -1

    epoch_t last_force_resend = 0;

    op_target_t(object_t oid, object_locator_t oloc, int flags)
      : flags(flags),
	base_oid(oid),
	base_oloc(oloc)
      {}

    /// target an explicit pg rather than an object
    explicit op_target_t(pg_t pgid)
      : base_oloc(pgid.pool(), pgid.ps()),
	precalc_pgid(true),
	base_pgid(pgid)
      {}

    op_target_t() = default;

    /// hobject_t this target currently resolves to
    hobject_t get_hobj() {
      return hobject_t(target_oid,
		       target_oloc.key,
		       CEPH_NOSNAP,
		       target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
		       target_oloc.pool,
		       target_oloc.nspace);
    }

    /// true if this target lies within [begin, end) in hobject order
    bool contained_by(const hobject_t& begin, const hobject_t& end) {
      hobject_t h = get_hobj();
      int r = cmp(h, begin);
      return r == 0 || (r > 0 && h < end);
    }

    void dump(Formatter *f) const;
  };
1338
  /**
   * One in-flight OSD request.
   *
   * Reference counted; tracked by the OSDSession it is currently
   * assigned to.  The out_* vectors parallel @ops: per sub-op, an
   * optional destination buffer, completion handler, and return-code
   * slot.
   */
  struct Op : public RefCountedObject {
    OSDSession *session;   ///< session we are attached to (NULL until assigned)
    int incarnation;

    op_target_t target;

    ConnectionRef con;  // for rx buffer only
    uint64_t features;  // explicitly specified op features

    vector<OSDOp> ops;

    snapid_t snapid;
    SnapContext snapc;
    ceph::real_time mtime;

    bufferlist *outbl;
    vector<bufferlist*> out_bl;
    vector<Context*> out_handler;
    vector<int*> out_rval;

    int priority;
    Context *onfinish;
    uint64_t ontimeout;

    ceph_tid_t tid;
    int attempts;

    version_t *objver;
    epoch_t *reply_epoch;

    ceph::coarse_mono_time stamp;

    epoch_t map_dne_bound;

    int budget;   ///< throttle budget taken for this op; -1 if none yet

    /// true if we should resend this message on failure
    bool should_resend;

    /// true if the throttle budget is get/put on a series of OPs,
    /// instead of per OP basis, when this flag is set, the budget is
    /// acquired before sending the very first OP of the series and
    /// released upon receiving the last OP reply.
    bool ctx_budgeted;

    int *data_offset;

    osd_reqid_t reqid; // explicitly setting reqid
    ZTracer::Trace trace;

    Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
       int f, Context *fin, version_t *ov, int *offset = NULL,
       ZTracer::Trace *parent_trace = nullptr) :
      session(NULL), incarnation(0),
      target(o, ol, f),
      con(NULL),
      features(CEPH_FEATURES_SUPPORTED_DEFAULT),
      snapid(CEPH_NOSNAP),
      outbl(NULL),
      priority(0),
      onfinish(fin),
      ontimeout(0),
      tid(0),
      attempts(0),
      objver(ov),
      reply_epoch(NULL),
      map_dne_bound(0),
      budget(-1),
      should_resend(true),
      ctx_budgeted(false),
      data_offset(offset) {
      // steal the caller's op vector (caller's @op is left empty)
      ops.swap(op);

      /* initialize out_* to match op vector */
      out_bl.resize(ops.size());
      out_rval.resize(ops.size());
      out_handler.resize(ops.size());
      for (unsigned i = 0; i < ops.size(); i++) {
	out_bl[i] = NULL;
	out_handler[i] = NULL;
	out_rval[i] = NULL;
      }

      // a locator key identical to the oid carries no information; drop it
      if (target.base_oloc.key == o)
	target.base_oloc.key.clear();

      if (parent_trace && parent_trace->valid()) {
	trace.init("op", nullptr, parent_trace);
	trace.event("start");
      }
    }

    /// order ops by tid, i.e. by submission order
    bool operator<(const Op& other) const {
      return tid < other.tid;
    }

    /// true if this op should be held back by cluster/pool full flags
    bool respects_full() const {
      return
	(target.flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
	!(target.flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
    }

  private:
    ~Op() override {
      // delete any per-op handlers that were never consumed by a reply
      while (!out_handler.empty()) {
	delete out_handler.back();
	out_handler.pop_back();
      }
      trace.event("finish");
    }
  };
1450
  /// Completion for a "latest osdmap version" check issued on behalf
  /// of op @tid; @latest is filled in before finish() runs (presumably
  /// by the map-check machinery — see Objecter.cc).
  struct C_Op_Map_Latest : public Context {
    Objecter *objecter;
    ceph_tid_t tid;
    version_t latest;
    C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
						 latest(0) {}
    void finish(int r) override;
  };
1459
  /// Same as C_Op_Map_Latest, but on behalf of a CommandOp with the
  /// given @tid; finish() is defined in Objecter.cc.
  struct C_Command_Map_Latest : public Context {
    Objecter *objecter;
    uint64_t tid;
    version_t latest;
    C_Command_Map_Latest(Objecter *o, ceph_tid_t t) :  objecter(o), tid(t),
						       latest(0) {}
    void finish(int r) override;
  };
1468
  /**
   * Completion for a STAT op: decodes (size, mtime) out of the raw
   * reply in @bl, stores them through the optional @psize/@pmtime
   * pointers, then forwards the result code to @fin.
   */
  struct C_Stat : public Context {
    bufferlist bl;              // raw reply payload (filled via out_bl)
    uint64_t *psize;
    ceph::real_time *pmtime;
    Context *fin;
    C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) :
      psize(ps), pmtime(pm), fin(c) {}
    void finish(int r) override {
      if (r >= 0) {
	auto p = bl.cbegin();
	uint64_t s;
	ceph::real_time m;
	// NOTE: decode throws on a short/garbled buffer; r >= 0 is
	// assumed to imply a well-formed reply
	decode(s, p);
	decode(m, p);
	if (psize)
	  *psize = s;
	if (pmtime)
	  *pmtime = m;
      }
      fin->complete(r);
    }
  };
1491
  /**
   * Completion for a getxattrs op: decodes the name->value attr map
   * out of @bl into the caller's @attrset, then forwards the result
   * code to @fin.
   */
  struct C_GetAttrs : public Context {
    bufferlist bl;              // raw reply payload (filled via out_bl)
    map<string,bufferlist>& attrset;
    Context *fin;
    C_GetAttrs(map<string, bufferlist>& set, Context *c) : attrset(set),
							   fin(c) {}
    void finish(int r) override {
      if (r >= 0) {
	auto p = bl.cbegin();
	decode(attrset, p);
      }
      fin->complete(r);
    }
  };
1506
1507
1508 // Pools and statistics
  // Pools and statistics
  /**
   * Cursor + accumulator for enumerating the objects of one pool
   * (NLIST); a single instance persists across the whole listing.
   */
  struct NListContext {
    collection_list_handle_t pos;   // current listing position

    // these are for !sortbitwise compat only
    int current_pg = 0;
    int starting_pg_num = 0;
    bool sort_bitwise = false;

    bool at_end_of_pool = false; ///< publicly visible end flag

    int64_t pool_id = -1;
    int pool_snap_seq = 0;
    uint64_t max_entries = 0;   // max entries to fetch per listing op
    string nspace;              // namespace filter

    bufferlist bl;   // raw data read to here
    std::list<librados::ListObjectImpl> list;   // decoded results so far

    bufferlist filter;      // opaque server-side filter payload

    bufferlist extra_info;

    // The budget associated with this context, once it is set (>= 0),
    // the budget is not get/released on OP basis, instead the budget
    // is acquired before sending the first OP and released upon receiving
    // the last op reply.
    int ctx_budget = -1;

    bool at_end() const {
      return at_end_of_pool;
    }

    uint32_t get_pg_hash_position() const {
      return pos.get_hash();
    }
  };
1545
  /**
   * Completion for one NLIST op: on success hand the context to
   * _nlist_reply() for decoding/continuation; on error just propagate
   * the code to the caller's @final_finish.
   */
  struct C_NList : public Context {
    NListContext *list_context;
    Context *final_finish;
    Objecter *objecter;
    epoch_t epoch;       // reply epoch (set before finish() runs)
    C_NList(NListContext *lc, Context * finish, Objecter *ob) :
      list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
    void finish(int r) override {
      if (r >= 0) {
	objecter->_nlist_reply(list_context, r, final_finish, epoch);
      } else {
	final_finish->complete(r);
      }
    }
  };
1561
  /// Pending per-pool statistics request (sent to the mon).
  struct PoolStatOp {
    ceph_tid_t tid;
    list<string> pools;                  // pools to query

    map<string,pool_stat_t> *pool_stats; // result destination
    bool *per_pool;                      // filled in from the reply
    Context *onfinish;
    uint64_t ontimeout;                  // timeout event handle, 0 if none

    ceph::coarse_mono_time last_submit;  // for resend_mon_ops / tick
  };
1573
  /// Pending cluster statfs request, optionally scoped to @data_pool.
  struct StatfsOp {
    ceph_tid_t tid;
    struct ceph_statfs *stats;           // result destination
    boost::optional<int64_t> data_pool;  // restrict stats to this pool if set
    Context *onfinish;
    uint64_t ontimeout;                  // timeout event handle, 0 if none

    ceph::coarse_mono_time last_submit;  // for resend_mon_ops / tick
  };
1583
  /// Pending pool mutation (create/delete pool or snap, ...) sent to
  /// the mon; completed by handle_pool_op_reply.
  struct PoolOp {
    ceph_tid_t tid;
    int64_t pool;
    string name;
    Context *onfinish;
    uint64_t ontimeout;     // timeout event handle, 0 if none
    int pool_op;            // POOL_OP_* code
    int16_t crush_rule;
    snapid_t snapid;
    bufferlist *blp;        // optional reply payload destination

    ceph::coarse_mono_time last_submit;
    PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0),
	       crush_rule(0), snapid(0), blp(NULL) {}
  };
1599
1600 // -- osd commands --
  // -- osd commands --
  /**
   * An outstanding OSD command, addressed either to a specific OSD
   * (target_osd >= 0) or to whichever OSD currently serves target_pg
   * (target_osd == -1).  Reference counted; tracked per session.
   */
  struct CommandOp : public RefCountedObject {
    OSDSession *session = nullptr;
    ceph_tid_t tid = 0;
    vector<string> cmd;            // command words
    bufferlist inbl;               // command input payload
    bufferlist *poutbl = nullptr;  // reply payload destination
    string *prs = nullptr;         // reply status string destination

    // target_osd == -1 means target_pg is valid
    const int target_osd = -1;
    const pg_t target_pg;

    op_target_t target;

    epoch_t map_dne_bound = 0;
    int map_check_error = 0;           // error to return if map check fails
    const char *map_check_error_str = nullptr;

    Context *onfinish = nullptr;
    uint64_t ontimeout = 0;            // timeout event handle, 0 if none
    ceph::coarse_mono_time last_submit;

    /// construct a command aimed at a specific osd
    CommandOp(
      int target_osd,
      const vector<string> &cmd,
      bufferlist inbl,
      bufferlist *poutbl,
      string *prs,
      Context *onfinish)
      : cmd(cmd),
	inbl(inbl),
	poutbl(poutbl),
	prs(prs),
	target_osd(target_osd),
	onfinish(onfinish) {}

    /// construct a command aimed at whichever osd serves @pgid
    CommandOp(
      pg_t pgid,
      const vector<string> &cmd,
      bufferlist inbl,
      bufferlist *poutbl,
      string *prs,
      Context *onfinish)
      : cmd(cmd),
	inbl(inbl),
	poutbl(poutbl),
	prs(prs),
	target_pg(pgid),
	target(pgid),
	onfinish(onfinish) {}

  };
1653
1654 void submit_command(CommandOp *c, ceph_tid_t *ptid);
1655 int _calc_command_target(CommandOp *c, shunique_lock &sul);
1656 void _assign_command_session(CommandOp *c, shunique_lock &sul);
1657 void _send_command(CommandOp *c);
1658 int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
1659 void _finish_command(CommandOp *c, int r, string rs);
1660 void handle_command_reply(MCommandReply *m);
1661
1662
1663 // -- lingering ops --
1664
  /**
   * Abstract callback interface for watch events; mirrors
   * librados WatchCtx2.  An instance attached to a LingerOp is owned
   * by it and deleted in ~LingerOp.
   */
  struct WatchContext {
    // this simply mirrors librados WatchCtx2
    virtual void handle_notify(uint64_t notify_id,
			       uint64_t cookie,
			       uint64_t notifier_id,
			       bufferlist& bl) = 0;
    virtual void handle_error(uint64_t cookie, int err) = 0;
    virtual ~WatchContext() {}
  };
1674
  /**
   * A persistent ("linger") operation: a watch, or a notify awaiting
   * completion.  Re-registered as needed when the mapping or session
   * changes (see _linger_reconnect / _linger_ops_resend).
   * Reference counted.
   */
  struct LingerOp : public RefCountedObject {
    uint64_t linger_id;

    op_target_t target;

    snapid_t snap;
    SnapContext snapc;
    ceph::real_time mtime;

    vector<OSDOp> ops;
    bufferlist inbl;
    bufferlist *poutbl;
    version_t *pobjver;

    bool is_watch;   ///< watch if true, notify if false
    ceph::coarse_mono_time watch_valid_thru; ///< send time for last acked ping
    int last_error;  ///< error from last failed ping|reconnect, if any
    std::shared_mutex watch_lock;
    using lock_guard = std::unique_lock<decltype(watch_lock)>;
    using unique_lock = std::unique_lock<decltype(watch_lock)>;
    using shared_lock = boost::shared_lock<decltype(watch_lock)>;
    using shunique_lock = ceph::shunique_lock<decltype(watch_lock)>;

    // queue of pending async operations, with the timestamp of
    // when they were queued.
    list<ceph::coarse_mono_time> watch_pending_async;

    uint32_t register_gen;  ///< registration generation; snapshotted by
                            ///< pings to detect re-register races
    bool registered;
    bool canceled;
    Context *on_reg_commit;

    // we trigger these from an async finisher
    Context *on_notify_finish;
    bufferlist *notify_result_bl;
    uint64_t notify_id;

    WatchContext *watch_context;   ///< owned; deleted in ~LingerOp

    OSDSession *session;

    Objecter *objecter;
    int ctx_budget;
    ceph_tid_t register_tid;
    ceph_tid_t ping_tid;
    epoch_t map_dne_bound;

    void _queued_async() {
      // watch_lock must be locked unique
      watch_pending_async.push_back(ceph::coarse_mono_clock::now());
    }
    void finished_async() {
      unique_lock l(watch_lock);
      ceph_assert(!watch_pending_async.empty());
      watch_pending_async.pop_front();
    }

    explicit LingerOp(Objecter *o) : linger_id(0),
				     target(object_t(), object_locator_t(), 0),
				     snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
				     is_watch(false), last_error(0),
				     register_gen(0),
				     registered(false),
				     canceled(false),
				     on_reg_commit(NULL),
				     on_notify_finish(NULL),
				     notify_result_bl(NULL),
				     notify_id(0),
				     watch_context(NULL),
				     session(NULL),
				     objecter(o),
				     ctx_budget(-1),
				     register_tid(0),
				     ping_tid(0),
				     map_dne_bound(0) {}

    const LingerOp &operator=(const LingerOp& r) = delete;
    LingerOp(const LingerOp& o) = delete;

    /// the watch cookie is simply this object's address
    uint64_t get_cookie() {
      return reinterpret_cast<uint64_t>(this);
    }

  private:
    ~LingerOp() override {
      delete watch_context;
    }
  };
1763
  /**
   * Completion for linger registration commit; pins @info with a ref
   * for its own lifetime.  @outbl carries the notify reply payload
   * (notify case only).
   */
  struct C_Linger_Commit : public Context {
    Objecter *objecter;
    LingerOp *info;
    bufferlist outbl;  // used for notify only
    C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {
      info->get();
    }
    ~C_Linger_Commit() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_commit(info, r, outbl);
    }
  };
1778
  /**
   * Completion for a watch reconnect attempt; pins @info with a ref
   * for its own lifetime and forwards the result to _linger_reconnect.
   */
  struct C_Linger_Reconnect : public Context {
    Objecter *objecter;
    LingerOp *info;
    C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {
      info->get();
    }
    ~C_Linger_Reconnect() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_reconnect(info, r);
    }
  };
1792
  /**
   * Completion for a watch ping.  @sent is filled in by the sender
   * before submission; @register_gen snapshots the watch's
   * registration generation so _linger_ping can ignore replies that
   * raced with a re-registration.  Pins @info with a ref.
   */
  struct C_Linger_Ping : public Context {
    Objecter *objecter;
    LingerOp *info;
    ceph::coarse_mono_time sent;
    uint32_t register_gen;
    C_Linger_Ping(Objecter *o, LingerOp *l)
      : objecter(o), info(l), register_gen(info->register_gen) {
      info->get();
    }
    ~C_Linger_Ping() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_ping(info, r, sent, register_gen);
    }
  };
1809
  /// Completion for a "latest osdmap version" check on behalf of the
  /// linger op with @linger_id; finish() is defined in Objecter.cc.
  struct C_Linger_Map_Latest : public Context {
    Objecter *objecter;
    uint64_t linger_id;
    version_t latest;
    C_Linger_Map_Latest(Objecter *o, uint64_t id) :
      objecter(o), linger_id(id), latest(0) {}
    void finish(int r) override;
  };
1818
1819 // -- osd sessions --
  // -- osd sessions --
  /// One backoff received from an OSD for hobject range [begin, end)
  /// within @pgid; identified by @id (semantics handled in
  /// handle_osd_backoff — confirm there).
  struct OSDBackoff {
    spg_t pgid;
    uint64_t id;
    hobject_t begin, end;
  };
1825
  /**
   * Per-OSD state: the connection plus every pending op, linger op,
   * command, and backoff targeted at that OSD.  osd == -1 denotes the
   * special "homeless" session holding ops with no valid target yet.
   * Reference counted.
   */
  struct OSDSession : public RefCountedObject {
    std::shared_mutex lock;
    using lock_guard = std::lock_guard<decltype(lock)>;
    using unique_lock = std::unique_lock<decltype(lock)>;
    using shared_lock = boost::shared_lock<decltype(lock)>;
    using shunique_lock = ceph::shunique_lock<decltype(lock)>;

    // pending ops
    map<ceph_tid_t,Op*> ops;
    map<uint64_t, LingerOp*> linger_ops;
    map<ceph_tid_t,CommandOp*> command_ops;

    // backoffs
    map<spg_t,map<hobject_t,OSDBackoff>> backoffs;
    map<uint64_t,OSDBackoff*> backoffs_by_id;

    int osd;
    int incarnation;
    ConnectionRef con;
    // completion locks are sharded (presumably by oid hash — see
    // get_lock's implementation) to reduce contention
    int num_locks;
    std::unique_ptr<std::mutex[]> completion_locks;
    using unique_completion_lock = std::unique_lock<
      decltype(completion_locks)::element_type>;


    OSDSession(CephContext *cct, int o) :
      osd(o), incarnation(0), con(NULL),
      num_locks(cct->_conf->objecter_completion_locks_per_session),
      completion_locks(new std::mutex[num_locks]) {}

    ~OSDSession() override;

    /// true for the placeholder session owning not-yet-mappable ops
    bool is_homeless() { return (osd == -1); }

    /// pick the completion lock shard for @oid
    unique_completion_lock get_lock(object_t& oid);
  };
1862 map<int,OSDSession*> osd_sessions;
1863
1864 bool osdmap_full_flag() const;
1865 bool osdmap_pool_full(const int64_t pool_id) const;
1866
1867 private:
1868
1869 /**
1870 * Test pg_pool_t::FLAG_FULL on a pool
1871 *
1872 * @return true if the pool exists and has the flag set, or
1873 * the global full flag is set, else false
1874 */
1875 bool _osdmap_pool_full(const int64_t pool_id) const;
1876 bool _osdmap_pool_full(const pg_pool_t &p) const;
1877 void update_pool_full_map(map<int64_t, bool>& pool_full_map);
1878
1879 map<uint64_t, LingerOp*> linger_ops;
1880 // we use this just to confirm a cookie is valid before dereferencing the ptr
1881 set<LingerOp*> linger_ops_set;
1882
1883 map<ceph_tid_t,PoolStatOp*> poolstat_ops;
1884 map<ceph_tid_t,StatfsOp*> statfs_ops;
1885 map<ceph_tid_t,PoolOp*> pool_ops;
1886 std::atomic<unsigned> num_homeless_ops{0};
1887
1888 OSDSession *homeless_session;
1889
1890 // ops waiting for an osdmap with a new pool or confirmation that
1891 // the pool does not exist (may be expanded to other uses later)
1892 map<uint64_t, LingerOp*> check_latest_map_lingers;
1893 map<ceph_tid_t, Op*> check_latest_map_ops;
1894 map<ceph_tid_t, CommandOp*> check_latest_map_commands;
1895
1896 map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
1897
1898 ceph::timespan mon_timeout;
1899 ceph::timespan osd_timeout;
1900
1901 MOSDOp *_prepare_osd_op(Op *op);
1902 void _send_op(Op *op);
1903 void _send_op_account(Op *op);
1904 void _cancel_linger_op(Op *op);
1905 void _finish_op(Op *op, int r);
1906 static bool is_pg_changed(
1907 int oldprimary,
1908 const vector<int>& oldacting,
1909 int newprimary,
1910 const vector<int>& newacting,
1911 bool any_change=false);
1912 enum recalc_op_target_result {
1913 RECALC_OP_TARGET_NO_ACTION = 0,
1914 RECALC_OP_TARGET_NEED_RESEND,
1915 RECALC_OP_TARGET_POOL_DNE,
1916 RECALC_OP_TARGET_OSD_DNE,
1917 RECALC_OP_TARGET_OSD_DOWN,
1918 };
1919 bool _osdmap_full_flag() const;
1920 bool _osdmap_has_pool_full() const;
1921 void _prune_snapc(
1922 const mempool::osdmap::map<int64_t, OSDMap::snap_interval_set_t>& new_removed_snaps,
1923 Op *op);
1924
1925 bool target_should_be_paused(op_target_t *op);
1926 int _calc_target(op_target_t *t, Connection *con,
1927 bool any_change = false);
1928 int _map_session(op_target_t *op, OSDSession **s,
1929 shunique_lock& lc);
1930
1931 void _session_op_assign(OSDSession *s, Op *op);
1932 void _session_op_remove(OSDSession *s, Op *op);
1933 void _session_linger_op_assign(OSDSession *to, LingerOp *op);
1934 void _session_linger_op_remove(OSDSession *from, LingerOp *op);
1935 void _session_command_op_assign(OSDSession *to, CommandOp *op);
1936 void _session_command_op_remove(OSDSession *from, CommandOp *op);
1937
1938 int _assign_op_target_session(Op *op, shunique_lock& lc,
1939 bool src_session_locked,
1940 bool dst_session_locked);
1941 int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc);
1942
1943 void _linger_submit(LingerOp *info, shunique_lock& sul);
1944 void _send_linger(LingerOp *info, shunique_lock& sul);
1945 void _linger_commit(LingerOp *info, int r, bufferlist& outbl);
1946 void _linger_reconnect(LingerOp *info, int r);
1947 void _send_linger_ping(LingerOp *info);
1948 void _linger_ping(LingerOp *info, int r, ceph::coarse_mono_time sent,
1949 uint32_t register_gen);
1950 int _normalize_watch_error(int r);
1951
1952 friend class C_DoWatchError;
1953 public:
  /// queue @ctx on the finisher; since linger callbacks run on the same
  /// finisher, @ctx fires after all previously queued ones (assuming
  /// FIFO Finisher ordering — confirm in Finisher)
  void linger_callback_flush(Context *ctx) {
    finisher->queue(ctx);
  }
1957
1958 private:
1959 void _check_op_pool_dne(Op *op, unique_lock *sl);
1960 void _send_op_map_check(Op *op);
1961 void _op_cancel_map_check(Op *op);
1962 void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
1963 void _send_linger_map_check(LingerOp *op);
1964 void _linger_cancel_map_check(LingerOp *op);
1965 void _check_command_map_dne(CommandOp *op);
1966 void _send_command_map_check(CommandOp *op);
1967 void _command_cancel_map_check(CommandOp *op);
1968
1969 void _kick_requests(OSDSession *session, map<uint64_t, LingerOp *>& lresend);
1970 void _linger_ops_resend(map<uint64_t, LingerOp *>& lresend, unique_lock& ul);
1971
1972 int _get_session(int osd, OSDSession **session, shunique_lock& sul);
1973 void put_session(OSDSession *s);
1974 void get_session(OSDSession *s);
1975 void _reopen_session(OSDSession *session);
1976 void close_session(OSDSession *session);
1977
1978 void _nlist_reply(NListContext *list_context, int r, Context *final_finish,
1979 epoch_t reply_epoch);
1980
1981 void resend_mon_ops();
1982
1983 /**
1984 * handle a budget for in-flight ops
1985 * budget is taken whenever an op goes into the ops map
1986 * and returned whenever an op is removed from the map
1987 * If throttle_op needs to throttle it will unlock client_lock.
1988 */
1989 int calc_op_budget(const vector<OSDOp>& ops);
1990 void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
  /**
   * Take throttle budget (bytes + one op slot) for @op.
   *
   * Caller must hold rwlock, witnessed by @sul.  With
   * keep_balanced_budget set this goes through _throttle_op, which may
   * block (dropping the lock — see the comment above calc_op_budget);
   * otherwise the budget is taken via Throttle::take.
   *
   * @return the byte budget taken (also recorded in op->budget)
   */
  int _take_op_budget(Op *op, shunique_lock& sul) {
    ceph_assert(sul && sul.mutex() == &rwlock);
    int op_budget = calc_op_budget(op->ops);
    if (keep_balanced_budget) {
      _throttle_op(op, sul, op_budget);
    } else { // update take_linger_budget to match this!
      op_throttle_bytes.take(op_budget);
      op_throttle_ops.take(1);
    }
    op->budget = op_budget;
    return op_budget;
  }
2003 int take_linger_budget(LingerOp *info);
2004 friend class WatchContext; // to invoke put_up_budget_bytes
  /// return @op_budget bytes and one op slot to the throttles
  /// (inverse of _take_op_budget / take_linger_budget)
  void put_op_budget_bytes(int op_budget) {
    ceph_assert(op_budget >= 0);
    op_throttle_bytes.put(op_budget);
    op_throttle_ops.put(1);
  }
2010 void put_nlist_context_budget(NListContext *list_context);
2011 Throttle op_throttle_bytes, op_throttle_ops;
2012
2013 public:
  /**
   * @param cct_ ceph context
   * @param m messenger used to talk to OSDs
   * @param mc mon client, for map subscriptions and mon-directed ops
   * @param fin finisher on which async callbacks are queued
   * @param mon_timeout mon op timeout (presumably seconds; converted
   *        via ceph::make_timespan), 0 = no timeout
   * @param osd_timeout osd op timeout, same units, 0 = no timeout
   */
  Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
	   Finisher *fin,
	   double mon_timeout,
	   double osd_timeout) :
    Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
    trace_endpoint("0.0.0.0", 0, "Objecter"),
    osdmap(new OSDMap),
    max_linger_id(0),
    keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
    blacklist_events_enabled(false),
    last_seen_osdmap_version(0), last_seen_pgmap_version(0),
    logger(NULL), tick_event(0), m_request_state_hook(NULL),
    homeless_session(new OSDSession(cct, -1)),
    mon_timeout(ceph::make_timespan(mon_timeout)),
    osd_timeout(ceph::make_timespan(osd_timeout)),
    op_throttle_bytes(cct, "objecter_bytes",
		      cct->_conf->objecter_inflight_op_bytes),
    op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
    epoch_barrier(0),
    retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
  { }
2035 ~Objecter() override;
2036
2037 void init();
2038 void start(const OSDMap *o = nullptr);
2039 void shutdown();
2040
2041 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2042 // whatever functionality you want to use the OSDMap in a lambda like:
2043 //
2044 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2045 //
2046 // or
2047 //
2048 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2049 //
2050 // Do not call into something that will try to lock the OSDMap from
2051 // here or you will have great woe and misery.
2052
  /// invoke @cb with (const OSDMap&, args...) while holding rwlock
  /// shared; returns whatever @cb returns (see usage notes above)
  template<typename Callback, typename...Args>
  auto with_osdmap(Callback&& cb, Args&&... args) const ->
    decltype(cb(*osdmap, std::forward<Args>(args)...)) {
    shared_lock l(rwlock);
    return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
  }
2059
2060
  /**
   * Tell the objecter to throttle outgoing ops according to its
   * budget (in _conf). If you do this, ops can block, in
   * which case it will unlock client_lock and sleep until
   * incoming messages reduce the used budget low enough for
   * the ops to continue going; then it will lock client_lock again.
   */
  void set_balanced_budget() { keep_balanced_budget = true; }
  void unset_balanced_budget() { keep_balanced_budget = false; }

  // honor_osdmap_full: presumably gates write ops while the cluster or
  // pool is flagged full — confirm in target_should_be_paused
  void set_honor_osdmap_full() { honor_osdmap_full = true; }
  void unset_honor_osdmap_full() { honor_osdmap_full = false; }

  // osdmap_full_try: attempt ops despite full flags (FULL_TRY-style
  // behavior — confirm in _calc_target/op submission)
  void set_osdmap_full_try() { osdmap_full_try = true; }
  void unset_osdmap_full_try() { osdmap_full_try = false; }
2076
2077 void _scan_requests(
2078 OSDSession *s,
2079 bool skipped_map,
2080 bool cluster_full,
2081 map<int64_t, bool> *pool_full_map,
2082 map<ceph_tid_t, Op*>& need_resend,
2083 list<LingerOp*>& need_resend_linger,
2084 map<ceph_tid_t, CommandOp*>& need_resend_command,
2085 shunique_lock& sul,
2086 const mempool::osdmap::map<int64_t,OSDMap::snap_interval_set_t> *gap_removed_snaps);
2087
2088 int64_t get_object_hash_position(int64_t pool, const string& key,
2089 const string& ns);
2090 int64_t get_object_pg_hash_position(int64_t pool, const string& key,
2091 const string& ns);
2092
2093 // messages
2094 public:
2095 bool ms_dispatch(Message *m) override;
  /// some message types are fast-dispatchable (see ms_can_fast_dispatch)
  bool ms_can_fast_dispatch_any() const override {
    return true;
  }
2099 bool ms_can_fast_dispatch(const Message *m) const override {
2100 switch (m->get_type()) {
2101 case CEPH_MSG_OSD_OPREPLY:
2102 case CEPH_MSG_WATCH_NOTIFY:
2103 return true;
2104 default:
2105 return false;
2106 }
2107 }
  /// fast dispatch reuses the regular dispatch path; drop the message
  /// ref ourselves if it was not consumed
  void ms_fast_dispatch(Message *m) override {
    if (!ms_dispatch(m)) {
      m->put();
    }
  }
2113
2114 void handle_osd_op_reply(class MOSDOpReply *m);
2115 void handle_osd_backoff(class MOSDBackoff *m);
2116 void handle_watch_notify(class MWatchNotify *m);
2117 void handle_osd_map(class MOSDMap *m);
2118 void wait_for_osd_map();
2119
2120 /**
2121 * Get list of entities blacklisted since this was last called,
2122 * and reset the list.
2123 *
2124 * Uses a std::set because typical use case is to compare some
2125 * other list of clients to see which overlap with the blacklisted
2126 * addrs.
2127 *
2128 */
2129 void consume_blacklist_events(std::set<entity_addr_t> *events);
2130
2131 int pool_snap_by_name(int64_t poolid,
2132 const char *snap_name,
2133 snapid_t *snap) const;
2134 int pool_snap_get_info(int64_t poolid, snapid_t snap,
2135 pool_snap_info_t *info) const;
2136 int pool_snap_list(int64_t poolid, vector<uint64_t> *snaps);
2137 private:
2138
2139 void emit_blacklist_events(const OSDMap::Incremental &inc);
2140 void emit_blacklist_events(const OSDMap &old_osd_map,
2141 const OSDMap &new_osd_map);
2142
  // low-level
  // _op_submit assumes the caller already holds the appropriate lock (lc);
  // the _with_budget variant additionally accounts throttle budget, or
  // records it in *ctx_budget when the caller tracks budget itself.
  void _op_submit(Op *op, shunique_lock& lc, ceph_tid_t *ptid);
  void _op_submit_with_budget(Op *op, shunique_lock& lc,
			      ceph_tid_t *ptid,
			      int *ctx_budget = NULL);
  // public interface
 public:
  // Submit an op; the assigned transaction id is returned via *ptid.
  void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
2151 bool is_active() {
2152 shared_lock l(rwlock);
2153 return !((!inflight_ops) && linger_ops.empty() &&
2154 poolstat_ops.empty() && statfs_ops.empty());
2155 }
2156
  /**
   * Output in-flight requests
   *
   * The underscore-prefixed variants assume the relevant locks are held;
   * the public ones acquire them.  Each dump_* emits one request category
   * (ops, linger ops, command ops, pool ops, pool-stat ops, statfs ops)
   * into the given Formatter; dump_requests emits them all.
   */
  void _dump_active(OSDSession *s);
  void _dump_active();
  void dump_active();
  void dump_requests(Formatter *fmt);
  void _dump_ops(const OSDSession *s, Formatter *fmt);
  void dump_ops(Formatter *fmt);
  void _dump_linger_ops(const OSDSession *s, Formatter *fmt);
  void dump_linger_ops(Formatter *fmt);
  void _dump_command_ops(const OSDSession *s, Formatter *fmt);
  void dump_command_ops(Formatter *fmt);
  void dump_pool_ops(Formatter *fmt) const;
  void dump_pool_stat_ops(Formatter *fmt) const;
  void dump_statfs_ops(Formatter *fmt) const;
2173
  // client incarnation number, used to disambiguate requests issued by
  // successive instances of the same client
  int get_client_incarnation() const { return client_inc; }
  void set_client_incarnation(int inc) { client_inc = inc; }
2176
2177 bool have_map(epoch_t epoch);
2178 /// wait for epoch; true if we already have it
2179 bool wait_for_map(epoch_t epoch, Context *c, int err=0);
2180 void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
2181 void wait_for_latest_osdmap(Context *fin);
2182 void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
2183
  /** Get the current set of global op flags */
  int get_global_op_flags() const { return global_op_flags; }
  /** Add a flag to the global op flags, not really atomic operation */
  void add_global_op_flags(int flag) {
    global_op_flags.fetch_or(flag);
  }
  /** Clear the passed flags from the global op flag set */
  void clear_global_op_flag(int flags) {
    global_op_flags.fetch_and(~flags);
  }
2194
  /// cancel an in-progress request with the given return code
 private:
  // per-session / lock-held variants of the public op_cancel below
  int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
  int _op_cancel(ceph_tid_t tid, int r);
 public:
  int op_cancel(ceph_tid_t tid, int r);
  int op_cancel(const vector<ceph_tid_t>& tidls, int r);

  /**
   * Any write op which is in progress at the start of this call shall no
   * longer be in progress when this call ends. Operations started after the
   * start of this call may still be in progress when this call ends.
   *
   * @return the latest possible epoch in which a cancelled op could have
   * existed, or -1 if nothing was cancelled.
   */
  epoch_t op_cancel_writes(int r, int64_t pool=-1);
2212
  // commands
  // Send an admin command directly to a specific OSD.  Reply payload and
  // status string are written to *poutbl / *prs before onfinish fires.
  void osd_command(int osd, const std::vector<string>& cmd,
		   const bufferlist& inbl, ceph_tid_t *ptid,
		   bufferlist *poutbl, string *prs, Context *onfinish) {
    ceph_assert(osd >= 0);  // must name a concrete OSD id
    CommandOp *c = new CommandOp(
      osd,
      cmd,
      inbl,
      poutbl,
      prs,
      onfinish);
    submit_command(c, ptid);
  }
  // Send an admin command targeted at a PG (routing decided by
  // submit_command; presumably the PG's primary — see .cc).
  void pg_command(pg_t pgid, const vector<string>& cmd,
		  const bufferlist& inbl, ceph_tid_t *ptid,
		  bufferlist *poutbl, string *prs, Context *onfinish) {
    CommandOp *c = new CommandOp(
      pgid,
      cmd,
      inbl,
      poutbl,
      prs,
      onfinish);
    submit_command(c, ptid);
  }
2239
  // mid-level helpers

  // Build (but do not submit) a write Op from an ObjectOperation.  The
  // per-op rval slots are stolen from `op` via swap.
  Op *prepare_mutate_op(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op, const SnapContext& snapc,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    osd_reqid_t reqid = osd_reqid_t(),
    ZTracer::Trace *parent_trace = nullptr) {
    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver, nullptr, parent_trace);
    o->priority = op.priority;
    o->mtime = mtime;
    o->snapc = snapc;
    o->out_rval.swap(op.out_rval);  // transfer result slots to the Op
    o->reqid = reqid;
    return o;
  }
2257 ceph_tid_t mutate(
2258 const object_t& oid, const object_locator_t& oloc,
2259 ObjectOperation& op, const SnapContext& snapc,
2260 ceph::real_time mtime, int flags,
2261 Context *oncommit, version_t *objver = NULL,
2262 osd_reqid_t reqid = osd_reqid_t()) {
2263 Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
2264 oncommit, objver, reqid);
2265 ceph_tid_t tid;
2266 op_submit(o, &tid);
2267 return tid;
2268 }
  // Build (but do not submit) a read Op from an ObjectOperation.  Output
  // buffers, completion handlers and rval slots are stolen from `op`.
  // NOTE: `features` is accepted here but is applied by the read()
  // wrapper, not in this function.
  Op *prepare_read_op(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op,
    snapid_t snapid, bufferlist *pbl, int flags,
    Context *onack, version_t *objver = NULL,
    int *data_offset = NULL,
    uint64_t features = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onack, objver, data_offset, parent_trace);
    o->priority = op.priority;
    o->snapid = snapid;
    o->outbl = pbl;
    // single-op read with a preloaded out_bl: read straight into it
    if (!o->outbl && op.size() == 1 && op.out_bl[0]->length())
      o->outbl = op.out_bl[0];
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->out_rval.swap(op.out_rval);
    return o;
  }
2289 ceph_tid_t read(
2290 const object_t& oid, const object_locator_t& oloc,
2291 ObjectOperation& op,
2292 snapid_t snapid, bufferlist *pbl, int flags,
2293 Context *onack, version_t *objver = NULL,
2294 int *data_offset = NULL,
2295 uint64_t features = 0) {
2296 Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver,
2297 data_offset);
2298 if (features)
2299 o->features = features;
2300 ceph_tid_t tid;
2301 op_submit(o, &tid);
2302 return tid;
2303 }
  // Build (but do not submit) a read targeted at a raw PG (used for
  // object listing/enumeration): the object name is empty and the PG is
  // precomputed from `hash` and the pool.
  Op *prepare_pg_read_op(
    uint32_t hash, object_locator_t oloc,
    ObjectOperation& op, bufferlist *pbl, int flags,
    Context *onack, epoch_t *reply_epoch,
    int *ctx_budget) {
    Op *o = new Op(object_t(), oloc,
		   op.ops,
		   flags | global_op_flags | CEPH_OSD_FLAG_READ |
		   CEPH_OSD_FLAG_IGNORE_OVERLAY,
		   onack, NULL);
    o->target.precalc_pgid = true;             // skip name-based placement
    o->target.base_pgid = pg_t(hash, oloc.pool);
    o->priority = op.priority;
    o->snapid = CEPH_NOSNAP;
    o->outbl = pbl;
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->out_rval.swap(op.out_rval);
    o->reply_epoch = reply_epoch;
    if (ctx_budget) {
      // budget is tracked by listing context
      o->ctx_budgeted = true;
    }
    return o;
  }
2329 ceph_tid_t pg_read(
2330 uint32_t hash, object_locator_t oloc,
2331 ObjectOperation& op, bufferlist *pbl, int flags,
2332 Context *onack, epoch_t *reply_epoch,
2333 int *ctx_budget) {
2334 Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
2335 onack, reply_epoch, ctx_budget);
2336 ceph_tid_t tid;
2337 op_submit(o, &tid, ctx_budget);
2338 return tid;
2339 }
2340
  // caller owns a ref
  LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
			    int flags);
  // establish a watch on the registered linger op
  ceph_tid_t linger_watch(LingerOp *info,
			  ObjectOperation& op,
			  const SnapContext& snapc, ceph::real_time mtime,
			  bufferlist& inbl,
			  Context *onfinish,
			  version_t *objver);
  // send a notify; replies are collected into *poutbl
  ceph_tid_t linger_notify(LingerOp *info,
			   ObjectOperation& op,
			   snapid_t snap, bufferlist& inbl,
			   bufferlist *poutbl,
			   Context *onack,
			   version_t *objver);
  int linger_check(LingerOp *info);
  void linger_cancel(LingerOp *info); // releases a reference
  void _linger_cancel(LingerOp *info);

  // deliver an incoming watch/notify message to its linger op
  void _do_watch_notify(LingerOp *info, MWatchNotify *m);
2361
2362 /**
2363 * set up initial ops in the op vector, and allocate a final op slot.
2364 *
2365 * The caller is responsible for filling in the final ops_count ops.
2366 *
2367 * @param ops op vector
2368 * @param ops_count number of final ops the caller will fill in
2369 * @param extra_ops pointer to [array of] initial op[s]
2370 * @return index of final op (for caller to fill in)
2371 */
2372 int init_ops(vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) {
2373 int i;
2374 int extra = 0;
2375
2376 if (extra_ops)
2377 extra = extra_ops->ops.size();
2378
2379 ops.resize(ops_count + extra);
2380
2381 for (i=0; i<extra; i++) {
2382 ops[i] = extra_ops->ops[i];
2383 }
2384
2385 return i;
2386 }
2387
2388
  // high-level helpers

  // Build a STAT op; on completion C_Stat decodes the reply into
  // *psize / *pmtime and then calls onfinish.
  Op *prepare_stat_op(
    const object_t& oid, const object_locator_t& oloc,
    snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_STAT;
    C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    o->outbl = &fin->bl;  // reply payload lands here for C_Stat to decode
    return o;
  }
2405 ceph_tid_t stat(
2406 const object_t& oid, const object_locator_t& oloc,
2407 snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
2408 int flags, Context *onfinish, version_t *objver = NULL,
2409 ObjectOperation *extra_ops = NULL) {
2410 Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
2411 onfinish, objver, extra_ops);
2412 ceph_tid_t tid;
2413 op_submit(o, &tid);
2414 return tid;
2415 }
2416
  // Build an extent read of [off, off+len) into *pbl (no truncation
  // handling; see read_trunc for that).
  Op *prepare_read_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver, nullptr, parent_trace);
    o->snapid = snap;
    o->outbl = pbl;
    return o;
  }
2437 ceph_tid_t read(
2438 const object_t& oid, const object_locator_t& oloc,
2439 uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
2440 int flags, Context *onfinish, version_t *objver = NULL,
2441 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2442 Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
2443 onfinish, objver, extra_ops, op_flags);
2444 ceph_tid_t tid;
2445 op_submit(o, &tid);
2446 return tid;
2447 }
2448
  // Build a CMPEXT op: server-side comparison of cmp_bl against the
  // object extent starting at `off` (length taken from cmp_bl).
  Op *prepare_cmpext_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, bufferlist &cmp_bl,
    snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_CMPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = cmp_bl.length();
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = cmp_bl;  // the comparison data rides in the request
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    return o;
  }
2468
2469 ceph_tid_t cmpext(
2470 const object_t& oid, const object_locator_t& oloc,
2471 uint64_t off, bufferlist &cmp_bl,
2472 snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
2473 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2474 Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
2475 flags, onfinish, objver, extra_ops, op_flags);
2476 ceph_tid_t tid;
2477 op_submit(o, &tid);
2478 return tid;
2479 }
2480
  // Extent read that carries truncation metadata (truncate_size/seq),
  // used by file layouts where the object may have been logically
  // truncated.
  ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
			uint64_t off, uint64_t len, snapid_t snap,
			bufferlist *pbl, int flags, uint64_t trunc_size,
			__u32 trunc_seq, Context *onfinish,
			version_t *objver = NULL,
			ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Read the object's extent map for [off, off+len) into *pbl.
  ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc,
		    uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
		    int flags, Context *onfinish, version_t *objver = NULL,
		    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_MAPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Fetch a single xattr value into *pbl.  A null name is sent as a
  // zero-length name.
  ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
	      const char *name, snapid_t snap, bufferlist *pbl, int flags,
	      Context *onfinish,
	      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = 0;
    if (name)
      ops[i].indata.append(name, ops[i].op.xattr.name_len);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }

  // Fetch all xattrs; C_GetAttrs decodes the reply into `attrset` before
  // calling onfinish.
  ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc,
		       snapid_t snap, map<string,bufferlist>& attrset,
		       int flags, Context *onfinish, version_t *objver = NULL,
		       ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
    C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    o->outbl = &fin->bl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }

  // Read the whole object (offset 0, length 0 == "to the end").  Note the
  // global_op_flags / CEPH_OSD_FLAG_READ are ORed in again inside read();
  // the duplication here is harmless.
  ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc,
		       snapid_t snap, bufferlist *pbl, int flags,
		       Context *onfinish, version_t *objver = NULL,
		       ObjectOperation *extra_ops = NULL) {
    return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags |
		CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
  }
2566
2567
  // writes
  // Low-level write from a pre-built OSDOp vector; most callers use the
  // typed helpers below instead.
  ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
		     vector<OSDOp>& ops, ceph::real_time mtime,
		     const SnapContext& snapc, int flags,
		     Context *oncommit,
		     version_t *objver = NULL) {
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build an extent write of bl at [off, off+len) (no truncation
  // metadata; see write_trunc).
  Op *prepare_write_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, const SnapContext& snapc,
    const bufferlist &bl, ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITE;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver,
		   nullptr, parent_trace);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2605 ceph_tid_t write(
2606 const object_t& oid, const object_locator_t& oloc,
2607 uint64_t off, uint64_t len, const SnapContext& snapc,
2608 const bufferlist &bl, ceph::real_time mtime, int flags,
2609 Context *oncommit, version_t *objver = NULL,
2610 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2611 Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags,
2612 oncommit, objver, extra_ops, op_flags);
2613 ceph_tid_t tid;
2614 op_submit(o, &tid);
2615 return tid;
2616 }
  // Build an APPEND op: the OSD writes bl at the current object end, so
  // extent.offset is always 0 here.
  Op *prepare_append_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t len, const SnapContext& snapc,
    const bufferlist &bl, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_APPEND;
    ops[i].op.extent.offset = 0;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = bl;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2638 ceph_tid_t append(
2639 const object_t& oid, const object_locator_t& oloc,
2640 uint64_t len, const SnapContext& snapc,
2641 const bufferlist &bl, ceph::real_time mtime, int flags,
2642 Context *oncommit,
2643 version_t *objver = NULL,
2644 ObjectOperation *extra_ops = NULL) {
2645 Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags,
2646 oncommit, objver, extra_ops);
2647 ceph_tid_t tid;
2648 op_submit(o, &tid);
2649 return tid;
2650 }
  // Extent write carrying truncation metadata (truncate_size/seq), used by
  // file layouts; built and submitted in one step.
  ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
			 uint64_t off, uint64_t len, const SnapContext& snapc,
			 const bufferlist &bl, ceph::real_time mtime, int flags,
			 uint64_t trunc_size, __u32 trunc_seq,
			 Context *oncommit,
			 version_t *objver = NULL,
			 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITE;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build a WRITEFULL op: replace the entire object contents with bl.
  Op *prepare_write_full_op(
    const object_t& oid, const object_locator_t& oloc,
    const SnapContext& snapc, const bufferlist &bl,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
    ops[i].op.extent.offset = 0;
    ops[i].op.extent.length = bl.length();
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2694 ceph_tid_t write_full(
2695 const object_t& oid, const object_locator_t& oloc,
2696 const SnapContext& snapc, const bufferlist &bl,
2697 ceph::real_time mtime, int flags,
2698 Context *oncommit, version_t *objver = NULL,
2699 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2700 Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags,
2701 oncommit, objver, extra_ops, op_flags);
2702 ceph_tid_t tid;
2703 op_submit(o, &tid);
2704 return tid;
2705 }
  // Build a WRITESAME op: replicate bl across [off, off+write_len); the
  // OSD repeats the data_length-sized pattern to fill the range.
  Op *prepare_writesame_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t write_len, uint64_t off,
    const SnapContext& snapc, const bufferlist &bl,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {

    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITESAME;
    ops[i].op.writesame.offset = off;
    ops[i].op.writesame.length = write_len;
    ops[i].op.writesame.data_length = bl.length();
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2728 ceph_tid_t writesame(
2729 const object_t& oid, const object_locator_t& oloc,
2730 uint64_t write_len, uint64_t off,
2731 const SnapContext& snapc, const bufferlist &bl,
2732 ceph::real_time mtime, int flags,
2733 Context *oncommit, version_t *objver = NULL,
2734 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2735
2736 Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl,
2737 mtime, flags, oncommit, objver,
2738 extra_ops, op_flags);
2739
2740 ceph_tid_t tid;
2741 op_submit(o, &tid);
2742 return tid;
2743 }
  // Truncate the object to trunc_size (carried in extent.offset as well as
  // in the truncation metadata).
  ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
		   const SnapContext& snapc, ceph::real_time mtime, int flags,
		   uint64_t trunc_size, __u32 trunc_seq,
		   Context *oncommit, version_t *objver = NULL,
		   ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
    ops[i].op.extent.offset = trunc_size;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Zero the byte range [off, off+len).
  ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc,
		  uint64_t off, uint64_t len, const SnapContext& snapc,
		  ceph::real_time mtime, int flags, Context *oncommit,
		  version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_ZERO;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Roll the object back to the given snapshot.
  // NOTE(review): unlike every other write helper here this does NOT OR in
  // global_op_flags — looks unintentional; confirm before changing.
  ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
			     const SnapContext& snapc, snapid_t snapid,
			     ceph::real_time mtime, Context *oncommit,
			     version_t *objver = NULL,
			     ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
    ops[i].op.snap.snapid = snapid;
    Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Create the object; create_flags (e.g. exclusive-create) go into the
  // per-op flags, not the request flags.
  ceph_tid_t create(const object_t& oid, const object_locator_t& oloc,
		    const SnapContext& snapc, ceph::real_time mtime, int global_flags,
		    int create_flags, Context *oncommit,
		    version_t *objver = NULL,
		    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_CREATE;
    ops[i].op.flags = create_flags;
    Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build a DELETE op for the object.
  Op *prepare_remove_op(
    const object_t& oid, const object_locator_t& oloc,
    const SnapContext& snapc, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_DELETE;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2827 ceph_tid_t remove(
2828 const object_t& oid, const object_locator_t& oloc,
2829 const SnapContext& snapc, ceph::real_time mtime, int flags,
2830 Context *oncommit,
2831 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2832 Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags,
2833 oncommit, objver, extra_ops);
2834 ceph_tid_t tid;
2835 op_submit(o, &tid);
2836 return tid;
2837 }
2838
  // Set an xattr: the request payload carries the name followed by the
  // value bytes; name/value lengths travel in the op header.
  ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
	      const char *name, const SnapContext& snapc, const bufferlist &bl,
	      ceph::real_time mtime, int flags,
	      Context *oncommit,
	      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_SETXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = bl.length();
    if (name)
      ops[i].indata.append(name, ops[i].op.xattr.name_len);
    ops[i].indata.append(bl);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Remove an xattr by name.
  ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
	      const char *name, const SnapContext& snapc,
	      ceph::real_time mtime, int flags,
	      Context *oncommit,
	      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_RMXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = 0;
    if (name)
      ops[i].indata.append(name, ops[i].op.xattr.name_len);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2880
  // object listing (namespace-aware); seek variants reposition the listing
  // cursor and return/record the new position
  void list_nobjects(NListContext *p, Context *onfinish);
  uint32_t list_nobjects_seek(NListContext *p, uint32_t pos);
  uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c);
  void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c);

  // cursors bounding a full-pool enumeration
  hobject_t enumerate_objects_begin();
  hobject_t enumerate_objects_end();
  //hobject_t enumerate_objects_begin(int n, int m);
  // Enumerate up to `max` objects of pool_id/ns in [start, end) matching
  // filter_bl; *next receives the cursor to resume from.
  void enumerate_objects(
    int64_t pool_id,
    const std::string &ns,
    const hobject_t &start,
    const hobject_t &end,
    const uint32_t max,
    const bufferlist &filter_bl,
    std::list<librados::ListObjectImpl> *result,
    hobject_t *next,
    Context *on_finish);

  // completion path for enumerate_objects: decodes the reply in bl
  void _enumerate_reply(
    bufferlist &bl,
    int r,
    const hobject_t &end,
    const int64_t pool_id,
    int budget,
    epoch_t reply_epoch,
    std::list<librados::ListObjectImpl> *result,
    hobject_t *next,
    Context *on_finish);
  friend class C_EnumerateReply;
2911
  // -------------------------
  // pool ops
 private:
  // submit / resubmit / complete a pool op (monitor-directed)
  void pool_op_submit(PoolOp *op);
  void _pool_op_submit(PoolOp *op);
  void _finish_pool_op(PoolOp *op, int r);
  void _do_delete_pool(int64_t pool, Context *onfinish);
 public:
  // pool-level snapshot management
  int create_pool_snap(int64_t pool, string& snapName, Context *onfinish);
  int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
				Context *onfinish);
  int delete_pool_snap(int64_t pool, string& snapName, Context *onfinish);
  int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish);

  // pool create/delete (by name or id)
  int create_pool(string& name, Context *onfinish,
		  int crush_rule=-1);
  int delete_pool(int64_t pool, Context *onfinish);
  int delete_pool(const string& name, Context *onfinish);

  void handle_pool_op_reply(MPoolOpReply *m);
  int pool_op_cancel(ceph_tid_t tid, int r);

  // --------------------------
  // pool stats
 private:
  void _poolstat_submit(PoolStatOp *op);
 public:
  void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
  // fetch per-pool stats for the named pools into *result
  void get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result,
		      bool *per_pool,
		      Context *onfinish);
  int pool_stat_op_cancel(ceph_tid_t tid, int r);
  void _finish_pool_stat_op(PoolStatOp *op, int r);

  // ---------------------------
  // df stats
 private:
  void _fs_stats_submit(StatfsOp *op);
 public:
  void handle_fs_stats_reply(MStatfsReply *m);
  // cluster-wide (or, with poolid, per-pool) statfs
  void get_fs_stats(struct ceph_statfs& result, boost::optional<int64_t> poolid,
		    Context *onfinish);
  int statfs_op_cancel(ceph_tid_t tid, int r);
  void _finish_statfs_op(StatfsOp *op, int r);
2956
  // ---------------------------
  // some scatter/gather hackery

  // Reassemble per-extent read buffers (resultbl) into the caller's single
  // buffer *bl according to the extent map, then complete onfinish.
  void _sg_read_finish(vector<ObjectExtent>& extents,
		       vector<bufferlist>& resultbl,
		       bufferlist *bl, Context *onfinish);
2963
  // Gather-completion for scatter/gather reads: holds the extent map and
  // the per-extent result buffers, and stitches them into *bl via
  // _sg_read_finish once all sub-reads have completed.
  struct C_SGRead : public Context {
    Objecter *objecter;
    vector<ObjectExtent> extents;  // logical->object extent mapping
    vector<bufferlist> resultbl;   // one buffer per extent, same order
    bufferlist *bl;                // caller's destination buffer
    Context *onfinish;
    C_SGRead(Objecter *ob,
	     vector<ObjectExtent>& e, vector<bufferlist>& r, bufferlist *b,
	     Context *c) :
      objecter(ob), bl(b), onfinish(c) {
      // take the vectors by swap to avoid copying buffer contents
      extents.swap(e);
      resultbl.swap(r);
    }
    void finish(int r) override {
      objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
    }
  };
2981
  // Scatter/gather read with truncation metadata.  Single-extent case goes
  // straight to read_trunc; otherwise each extent is read into its own
  // buffer and C_SGRead reassembles them into *bl when all complete.
  void sg_read_trunc(vector<ObjectExtent>& extents, snapid_t snap,
		     bufferlist *bl, int flags, uint64_t trunc_size,
		     __u32 trunc_seq, Context *onfinish, int op_flags = 0) {
    if (extents.size() == 1) {
      read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
		 extents[0].length, snap, bl, flags, extents[0].truncate_size,
		 trunc_seq, onfinish, 0, 0, op_flags);
    } else {
      C_GatherBuilder gather(cct);
      vector<bufferlist> resultbl(extents.size());
      int i=0;
      for (vector<ObjectExtent>::iterator p = extents.begin();
	   p != extents.end();
	   ++p) {
	read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++],
		   flags, p->truncate_size, trunc_seq, gather.new_sub(),
		   0, 0, op_flags);
      }
      // C_SGRead swaps extents/resultbl into itself; fires after all subs
      gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
      gather.activate();
    }
  }
3004
  // Scatter/gather read without truncation metadata (trunc_size/seq = 0).
  void sg_read(vector<ObjectExtent>& extents, snapid_t snap, bufferlist *bl,
	       int flags, Context *onfinish, int op_flags = 0) {
    sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags);
  }
3009
  // Scatter/gather write with truncation metadata.  Single-extent case goes
  // straight to write_trunc; otherwise the relevant slices of bl are copied
  // out per extent and the commits are gathered into oncommit.
  void sg_write_trunc(vector<ObjectExtent>& extents, const SnapContext& snapc,
		      const bufferlist& bl, ceph::real_time mtime, int flags,
		      uint64_t trunc_size, __u32 trunc_seq,
		      Context *oncommit, int op_flags = 0) {
    if (extents.size() == 1) {
      write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
		  extents[0].length, snapc, bl, mtime, flags,
		  extents[0].truncate_size, trunc_seq, oncommit,
		  0, 0, op_flags);
    } else {
      C_GatherBuilder gcom(cct, oncommit);
      for (vector<ObjectExtent>::iterator p = extents.begin();
	   p != extents.end();
	   ++p) {
	bufferlist cur;
	// gather this extent's slices of the source buffer
	for (vector<pair<uint64_t,uint64_t> >::iterator bit
	       = p->buffer_extents.begin();
	     bit != p->buffer_extents.end();
	     ++bit)
	  bl.copy(bit->first, bit->second, cur);
	ceph_assert(cur.length() == p->length);
	write_trunc(p->oid, p->oloc, p->offset, p->length,
		    snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
		    oncommit ? gcom.new_sub():0,  // no sub needed without oncommit
		    0, 0, op_flags);
      }
      gcom.activate();
    }
  }
3039
  // Scatter/gather write without truncation metadata (trunc_size/seq = 0).
  void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc,
		const bufferlist& bl, ceph::real_time mtime, int flags,
		Context *oncommit, int op_flags = 0) {
    sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit,
		   op_flags);
  }
3046
  // Dispatcher connection lifecycle hooks
  void ms_handle_connect(Connection *con) override;
  bool ms_handle_reset(Connection *con) override;
  void ms_handle_remote_reset(Connection *con) override;
  bool ms_handle_refused(Connection *con) override;
  bool ms_get_authorizer(int dest_type,
			 AuthAuthorizer **authorizer) override;

  // add (or, with set=false, remove) our own address on the OSD blacklist
  void blacklist_self(bool set);
3055
 private:
  // NOTE(review): presumably a floor on acceptable OSDMap epochs — see
  // set_epoch_barrier in the .cc for the authoritative semantics.
  epoch_t epoch_barrier;
  bool retry_writes_after_first_reply;
 public:
  void set_epoch_barrier(epoch_t epoch);

  // perf counters owned by this Objecter
  PerfCounters *get_logger() {
    return logger;
  }
3065 };
3066
3067 #endif