]> git.proxmox.com Git - ceph.git/blame - ceph/src/osdc/Objecter.h
bump version to 12.0.3-pve3
[ceph.git] / ceph / src / osdc / Objecter.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_OBJECTER_H
16#define CEPH_OBJECTER_H
17
18#include <condition_variable>
19#include <list>
20#include <map>
21#include <mutex>
22#include <memory>
23#include <sstream>
24#include <type_traits>
25
26#include <boost/thread/shared_mutex.hpp>
27
28#include "include/assert.h"
29#include "include/buffer.h"
30#include "include/types.h"
31#include "include/rados/rados_types.hpp"
32
33#include "common/admin_socket.h"
34#include "common/ceph_time.h"
35#include "common/ceph_timer.h"
36#include "common/Finisher.h"
37#include "common/shunique_lock.h"
38#include "common/zipkin_trace.h"
39
40#include "messages/MOSDOp.h"
41#include "osd/OSDMap.h"
42
43using namespace std;
44
45class Context;
46class Messenger;
47class OSDMap;
48class MonClient;
49class Message;
50class Finisher;
51
52class MPoolOpReply;
53
54class MGetPoolStatsReply;
55class MStatfsReply;
56class MCommandReply;
57class MWatchNotify;
58
59class PerfCounters;
60
61// -----------------------------------------
62
63struct ObjectOperation {
64 vector<OSDOp> ops;
65 int flags;
66 int priority;
67
68 vector<bufferlist*> out_bl;
69 vector<Context*> out_handler;
70 vector<int*> out_rval;
71
72 ObjectOperation() : flags(0), priority(0) {}
  // Destructor: delete any per-op completion Contexts still owned by
  // this operation so they do not leak.  out_handler entries are set
  // by the various add_*/read/stat helpers and popped here in LIFO
  // order; entries may be NULL (delete on NULL is a no-op).
  ~ObjectOperation() {
    while (!out_handler.empty()) {
      delete out_handler.back();
      out_handler.pop_back();
    }
  }
79
80 size_t size() {
81 return ops.size();
82 }
83
84 void set_last_op_flags(int flags) {
85 assert(!ops.empty());
86 ops.rbegin()->op.flags = flags;
87 }
88
89 class C_TwoContexts;
90 /**
91 * Add a callback to run when this operation completes,
92 * after any other callbacks for it.
93 */
94 void add_handler(Context *extra);
95
96 OSDOp& add_op(int op) {
97 int s = ops.size();
98 ops.resize(s+1);
99 ops[s].op.op = op;
100 out_bl.resize(s+1);
101 out_bl[s] = NULL;
102 out_handler.resize(s+1);
103 out_handler[s] = NULL;
104 out_rval.resize(s+1);
105 out_rval[s] = NULL;
106 return ops[s];
107 }
  // Append an extent-style op (read/write/zero/...) covering
  // [off, off+len) whose input payload is `bl`.
  // NOTE: claim_append() moves the buffers out of `bl` — the caller's
  // bufferlist is consumed (left empty), not copied.
  void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.extent.offset = off;
    osd_op.op.extent.length = len;
    osd_op.indata.claim_append(bl);
  }
114 void add_writesame(int op, uint64_t off, uint64_t write_len,
115 bufferlist& bl) {
116 OSDOp& osd_op = add_op(op);
117 osd_op.op.writesame.offset = off;
118 osd_op.op.writesame.length = write_len;
119 osd_op.op.writesame.data_length = bl.length();
120 osd_op.indata.claim_append(bl);
121 }
122 void add_xattr(int op, const char *name, const bufferlist& data) {
123 OSDOp& osd_op = add_op(op);
124 osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
125 osd_op.op.xattr.value_len = data.length();
126 if (name)
127 osd_op.indata.append(name);
128 osd_op.indata.append(data);
129 }
130 void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
131 uint8_t cmp_mode, const bufferlist& data) {
132 OSDOp& osd_op = add_op(op);
133 osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
134 osd_op.op.xattr.value_len = data.length();
135 osd_op.op.xattr.cmp_op = cmp_op;
136 osd_op.op.xattr.cmp_mode = cmp_mode;
137 if (name)
138 osd_op.indata.append(name);
139 osd_op.indata.append(data);
140 }
141 void add_call(int op, const char *cname, const char *method,
142 bufferlist &indata,
143 bufferlist *outbl, Context *ctx, int *prval) {
144 OSDOp& osd_op = add_op(op);
145
146 unsigned p = ops.size() - 1;
147 out_handler[p] = ctx;
148 out_bl[p] = outbl;
149 out_rval[p] = prval;
150
151 osd_op.op.cls.class_len = strlen(cname);
152 osd_op.op.cls.method_len = strlen(method);
153 osd_op.op.cls.indata_len = indata.length();
154 osd_op.indata.append(cname, osd_op.op.cls.class_len);
155 osd_op.indata.append(method, osd_op.op.cls.method_len);
156 osd_op.indata.append(indata);
157 }
158 void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
159 epoch_t start_epoch) {
160 OSDOp& osd_op = add_op(op);
161 osd_op.op.pgls.count = count;
162 osd_op.op.pgls.start_epoch = start_epoch;
163 ::encode(cookie, osd_op.indata);
164 }
165 void add_pgls_filter(int op, uint64_t count, const bufferlist& filter,
166 collection_list_handle_t cookie, epoch_t start_epoch) {
167 OSDOp& osd_op = add_op(op);
168 osd_op.op.pgls.count = count;
169 osd_op.op.pgls.start_epoch = start_epoch;
170 string cname = "pg";
171 string mname = "filter";
172 ::encode(cname, osd_op.indata);
173 ::encode(mname, osd_op.indata);
174 osd_op.indata.append(filter);
175 ::encode(cookie, osd_op.indata);
176 }
177 void add_alloc_hint(int op, uint64_t expected_object_size,
178 uint64_t expected_write_size,
179 uint32_t flags) {
180 OSDOp& osd_op = add_op(op);
181 osd_op.op.alloc_hint.expected_object_size = expected_object_size;
182 osd_op.op.alloc_hint.expected_write_size = expected_write_size;
183 osd_op.op.alloc_hint.flags = flags;
184 }
185
186 // ------
187
188 // pg
189 void pg_ls(uint64_t count, bufferlist& filter,
190 collection_list_handle_t cookie, epoch_t start_epoch) {
191 if (filter.length() == 0)
192 add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
193 else
194 add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie,
195 start_epoch);
196 flags |= CEPH_OSD_FLAG_PGOP;
197 }
198
199 void pg_nls(uint64_t count, const bufferlist& filter,
200 collection_list_handle_t cookie, epoch_t start_epoch) {
201 if (filter.length() == 0)
202 add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch);
203 else
204 add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie,
205 start_epoch);
206 flags |= CEPH_OSD_FLAG_PGOP;
207 }
208
209 void scrub_ls(const librados::object_id_t& start_after,
210 uint64_t max_to_get,
211 std::vector<librados::inconsistent_obj_t> *objects,
212 uint32_t *interval,
213 int *rval);
214 void scrub_ls(const librados::object_id_t& start_after,
215 uint64_t max_to_get,
216 std::vector<librados::inconsistent_snapset_t> *objects,
217 uint32_t *interval,
218 int *rval);
219
220 void create(bool excl) {
221 OSDOp& o = add_op(CEPH_OSD_OP_CREATE);
222 o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0);
223 }
224
  // Completion for CEPH_OSD_OP_STAT: decodes {size, mtime} from the
  // reply buffer and fans the mtime out into whichever representation
  // the caller asked for (real_time, time_t, and/or timespec).
  struct C_ObjectOperation_stat : public Context {
    bufferlist bl;          // raw reply payload, filled in by dispatch
    uint64_t *psize;        // object size out (may be NULL)
    ceph::real_time *pmtime; // mtime out as real_time (may be NULL)
    time_t *ptime;          // mtime out as time_t (may be NULL)
    struct timespec *pts;   // mtime out as timespec (may be NULL)
    int *prval;             // decode-error return slot (may be NULL)
    C_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts,
			   int *prval)
      : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {}
    void finish(int r) override {
      // Only decode on success; on error the outputs are left untouched.
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	try {
	  uint64_t size;
	  ceph::real_time mtime;
	  ::decode(size, p);
	  ::decode(mtime, p);
	  if (psize)
	    *psize = size;
	  if (pmtime)
	    *pmtime = mtime;
	  if (ptime)
	    *ptime = ceph::real_clock::to_time_t(mtime);
	  if (pts)
	    *pts = ceph::real_clock::to_timespec(mtime);
	} catch (buffer::error& e) {
	  // short/corrupt reply: report -EIO via the rval slot
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
258 void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {
259 add_op(CEPH_OSD_OP_STAT);
260 unsigned p = ops.size() - 1;
261 C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, pmtime, NULL, NULL,
262 prval);
263 out_bl[p] = &h->bl;
264 out_handler[p] = h;
265 out_rval[p] = prval;
266 }
267 void stat(uint64_t *psize, time_t *ptime, int *prval) {
268 add_op(CEPH_OSD_OP_STAT);
269 unsigned p = ops.size() - 1;
270 C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, ptime, NULL,
271 prval);
272 out_bl[p] = &h->bl;
273 out_handler[p] = h;
274 out_rval[p] = prval;
275 }
276 void stat(uint64_t *psize, struct timespec *pts, int *prval) {
277 add_op(CEPH_OSD_OP_STAT);
278 unsigned p = ops.size() - 1;
279 C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, NULL, pts,
280 prval);
281 out_bl[p] = &h->bl;
282 out_handler[p] = h;
283 out_rval[p] = prval;
284 }
285 // object cmpext
286 struct C_ObjectOperation_cmpext : public Context {
287 int *prval;
288 C_ObjectOperation_cmpext(int *prval)
289 : prval(prval) {}
290
291 void finish(int r) {
292 if (prval)
293 *prval = r;
294 }
295 };
296
297 void cmpext(uint64_t off, bufferlist& cmp_bl, int *prval) {
298 add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
299 unsigned p = ops.size() - 1;
300 C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
301 out_handler[p] = h;
302 out_rval[p] = prval;
303 }
304
305 // Used by C API
306 void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) {
307 bufferlist cmp_bl;
308 cmp_bl.append(cmp_buf, cmp_len);
309 add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
310 unsigned p = ops.size() - 1;
311 C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
312 out_handler[p] = h;
313 out_rval[p] = prval;
314 }
315
316 void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval,
317 Context* ctx) {
318 bufferlist bl;
319 add_data(CEPH_OSD_OP_READ, off, len, bl);
320 unsigned p = ops.size() - 1;
321 out_bl[p] = pbl;
322 out_rval[p] = prval;
323 out_handler[p] = ctx;
324 }
325
326 struct C_ObjectOperation_sparse_read : public Context {
327 bufferlist bl;
328 bufferlist *data_bl;
329 std::map<uint64_t, uint64_t> *extents;
330 int *prval;
331 C_ObjectOperation_sparse_read(bufferlist *data_bl,
332 std::map<uint64_t, uint64_t> *extents,
333 int *prval)
334 : data_bl(data_bl), extents(extents), prval(prval) {}
335 void finish(int r) override {
336 bufferlist::iterator iter = bl.begin();
337 if (r >= 0) {
338 try {
339 ::decode(*extents, iter);
340 ::decode(*data_bl, iter);
341 } catch (buffer::error& e) {
342 if (prval)
343 *prval = -EIO;
344 }
345 }
346 }
347 };
348 void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t,uint64_t> *m,
349 bufferlist *data_bl, int *prval) {
350 bufferlist bl;
351 add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
352 unsigned p = ops.size() - 1;
353 C_ObjectOperation_sparse_read *h =
354 new C_ObjectOperation_sparse_read(data_bl, m, prval);
355 out_bl[p] = &h->bl;
356 out_handler[p] = h;
357 out_rval[p] = prval;
358 }
359 void write(uint64_t off, bufferlist& bl,
360 uint64_t truncate_size,
361 uint32_t truncate_seq) {
362 add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
363 OSDOp& o = *ops.rbegin();
364 o.op.extent.truncate_size = truncate_size;
365 o.op.extent.truncate_seq = truncate_seq;
366 }
367 void write(uint64_t off, bufferlist& bl) {
368 write(off, bl, 0, 0);
369 }
370 void write_full(bufferlist& bl) {
371 add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
372 }
373 void writesame(uint64_t off, uint64_t write_len, bufferlist& bl) {
374 add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl);
375 }
376 void append(bufferlist& bl) {
377 add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
378 }
379 void zero(uint64_t off, uint64_t len) {
380 bufferlist bl;
381 add_data(CEPH_OSD_OP_ZERO, off, len, bl);
382 }
383 void truncate(uint64_t off) {
384 bufferlist bl;
385 add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl);
386 }
387 void remove() {
388 bufferlist bl;
389 add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
390 }
391 void mapext(uint64_t off, uint64_t len) {
392 bufferlist bl;
393 add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
394 }
395 void sparse_read(uint64_t off, uint64_t len) {
396 bufferlist bl;
397 add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
398 }
399
400 void checksum(uint8_t type, const bufferlist &init_value_bl,
401 uint64_t off, uint64_t len, size_t chunk_size,
402 bufferlist *pbl, int *prval, Context *ctx) {
403 OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
404 osd_op.op.checksum.offset = off;
405 osd_op.op.checksum.length = len;
406 osd_op.op.checksum.type = type;
407 osd_op.op.checksum.chunk_size = chunk_size;
408 osd_op.indata.append(init_value_bl);
409
410 unsigned p = ops.size() - 1;
411 out_bl[p] = pbl;
412 out_rval[p] = prval;
413 out_handler[p] = ctx;
414 }
415
416 // object attrs
417 void getxattr(const char *name, bufferlist *pbl, int *prval) {
418 bufferlist bl;
419 add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
420 unsigned p = ops.size() - 1;
421 out_bl[p] = pbl;
422 out_rval[p] = prval;
423 }
  // Completion that decodes a map<string,bufferlist> (omap values or
  // xattrs) out of the reply, optionally detecting whether the result
  // was truncated at max_entries.
  struct C_ObjectOperation_decodevals : public Context {
    uint64_t max_entries;   // requested limit; used to infer truncation
    bufferlist bl;          // raw reply payload, filled in by dispatch
    std::map<std::string,bufferlist> *pattrs; // decoded entries (may be NULL)
    bool *ptruncated;       // set true if more entries remain (may be NULL)
    int *prval;             // decode-error return slot (may be NULL)
    C_ObjectOperation_decodevals(uint64_t m, std::map<std::string,bufferlist> *pa,
				 bool *pt, int *pr)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
      // default to "not truncated" so the flag is defined even if the
      // op fails before finish() decodes anything
      if (ptruncated) {
	*ptruncated = false;
      }
    }
    void finish(int r) override {
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	try {
	  if (pattrs)
	    ::decode(*pattrs, p);
	  if (ptruncated) {
	    std::map<std::string,bufferlist> ignore;
	    if (!pattrs) {
	      // we still need the entry count below, so decode into a
	      // throwaway map and point pattrs at it
	      ::decode(ignore, p);
	      pattrs = &ignore;
	    }
	    if (!p.end()) {
	      // newer OSDs append an explicit truncated flag
	      ::decode(*ptruncated, p);
	    } else {
	      // the OSD did not provide this. since old OSDs do not
	      // enforce omap result limits either, we can infer it from
	      // the size of the result
	      *ptruncated = (pattrs->size() == max_entries);
	    }
	  }
	}
	catch (buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
466 struct C_ObjectOperation_decodekeys : public Context {
467 uint64_t max_entries;
468 bufferlist bl;
469 std::set<std::string> *pattrs;
470 bool *ptruncated;
471 int *prval;
472 C_ObjectOperation_decodekeys(uint64_t m, std::set<std::string> *pa, bool *pt,
473 int *pr)
474 : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
475 if (ptruncated) {
476 *ptruncated = false;
477 }
478 }
479 void finish(int r) override {
480 if (r >= 0) {
481 bufferlist::iterator p = bl.begin();
482 try {
483 if (pattrs)
484 ::decode(*pattrs, p);
485 if (ptruncated) {
486 std::set<std::string> ignore;
487 if (!pattrs) {
488 ::decode(ignore, p);
489 pattrs = &ignore;
490 }
491 if (!p.end()) {
492 ::decode(*ptruncated, p);
493 } else {
494 // the OSD did not provide this. since old OSDs do not
495 // enfoce omap result limits either, we can infer it from
496 // the size of the result
497 *ptruncated = (pattrs->size() == max_entries);
498 }
499 }
500 }
501 catch (buffer::error& e) {
502 if (prval)
503 *prval = -EIO;
504 }
505 }
506 }
507 };
508 struct C_ObjectOperation_decodewatchers : public Context {
509 bufferlist bl;
510 list<obj_watch_t> *pwatchers;
511 int *prval;
512 C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr)
513 : pwatchers(pw), prval(pr) {}
514 void finish(int r) override {
515 if (r >= 0) {
516 bufferlist::iterator p = bl.begin();
517 try {
518 obj_list_watch_response_t resp;
519 ::decode(resp, p);
520 if (pwatchers) {
521 for (list<watch_item_t>::iterator i = resp.entries.begin() ;
522 i != resp.entries.end() ; ++i) {
523 obj_watch_t ow;
524 ostringstream sa;
525 sa << i->addr;
526 strncpy(ow.addr, sa.str().c_str(), 256);
527 ow.watcher_id = i->name.num();
528 ow.cookie = i->cookie;
529 ow.timeout_seconds = i->timeout_seconds;
530 pwatchers->push_back(ow);
531 }
532 }
533 }
534 catch (buffer::error& e) {
535 if (prval)
536 *prval = -EIO;
537 }
538 }
539 }
540 };
541 struct C_ObjectOperation_decodesnaps : public Context {
542 bufferlist bl;
543 librados::snap_set_t *psnaps;
544 int *prval;
545 C_ObjectOperation_decodesnaps(librados::snap_set_t *ps, int *pr)
546 : psnaps(ps), prval(pr) {}
547 void finish(int r) override {
548 if (r >= 0) {
549 bufferlist::iterator p = bl.begin();
550 try {
551 obj_list_snap_response_t resp;
552 ::decode(resp, p);
553 if (psnaps) {
554 psnaps->clones.clear();
555 for (vector<clone_info>::iterator ci = resp.clones.begin();
556 ci != resp.clones.end();
557 ++ci) {
558 librados::clone_info_t clone;
559
560 clone.cloneid = ci->cloneid;
561 clone.snaps.reserve(ci->snaps.size());
562 clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
563 ci->snaps.end());
564 clone.overlap = ci->overlap;
565 clone.size = ci->size;
566
567 psnaps->clones.push_back(clone);
568 }
569 psnaps->seq = resp.seq;
570 }
571 } catch (buffer::error& e) {
572 if (prval)
573 *prval = -EIO;
574 }
575 }
576 }
577 };
578 void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) {
579 add_op(CEPH_OSD_OP_GETXATTRS);
580 if (pattrs || prval) {
581 unsigned p = ops.size() - 1;
582 C_ObjectOperation_decodevals *h
583 = new C_ObjectOperation_decodevals(0, pattrs, nullptr, prval);
584 out_handler[p] = h;
585 out_bl[p] = &h->bl;
586 out_rval[p] = prval;
587 }
588 }
589 void setxattr(const char *name, const bufferlist& bl) {
590 add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
591 }
592 void setxattr(const char *name, const string& s) {
593 bufferlist bl;
594 bl.append(s);
595 add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
596 }
597 void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
598 const bufferlist& bl) {
599 add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
600 }
601 void rmxattr(const char *name) {
602 bufferlist bl;
603 add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
604 }
605 void setxattrs(map<string, bufferlist>& attrs) {
606 bufferlist bl;
607 ::encode(attrs, bl);
608 add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
609 }
610 void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {
611 bufferlist bl;
612 ::encode(attrs, bl);
613 add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
614 }
615
616 // trivialmap
617 void tmap_update(bufferlist& bl) {
618 add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
619 }
620 void tmap_put(bufferlist& bl) {
621 add_data(CEPH_OSD_OP_TMAPPUT, 0, bl.length(), bl);
622 }
623 void tmap_get(bufferlist *pbl, int *prval) {
624 add_op(CEPH_OSD_OP_TMAPGET);
625 unsigned p = ops.size() - 1;
626 out_bl[p] = pbl;
627 out_rval[p] = prval;
628 }
629 void tmap_get() {
630 add_op(CEPH_OSD_OP_TMAPGET);
631 }
632 void tmap_to_omap(bool nullok=false) {
633 OSDOp& osd_op = add_op(CEPH_OSD_OP_TMAP2OMAP);
634 if (nullok)
635 osd_op.op.tmap2omap.flags = CEPH_OSD_TMAP2OMAP_NULLOK;
636 }
637
638 // objectmap
639 void omap_get_keys(const string &start_after,
640 uint64_t max_to_get,
641 std::set<std::string> *out_set,
642 bool *ptruncated,
643 int *prval) {
644 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
645 bufferlist bl;
646 ::encode(start_after, bl);
647 ::encode(max_to_get, bl);
648 op.op.extent.offset = 0;
649 op.op.extent.length = bl.length();
650 op.indata.claim_append(bl);
651 if (prval || ptruncated || out_set) {
652 unsigned p = ops.size() - 1;
653 C_ObjectOperation_decodekeys *h =
654 new C_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval);
655 out_handler[p] = h;
656 out_bl[p] = &h->bl;
657 out_rval[p] = prval;
658 }
659 }
660
661 void omap_get_vals(const string &start_after,
662 const string &filter_prefix,
663 uint64_t max_to_get,
664 std::map<std::string, bufferlist> *out_set,
665 bool *ptruncated,
666 int *prval) {
667 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
668 bufferlist bl;
669 ::encode(start_after, bl);
670 ::encode(max_to_get, bl);
671 ::encode(filter_prefix, bl);
672 op.op.extent.offset = 0;
673 op.op.extent.length = bl.length();
674 op.indata.claim_append(bl);
675 if (prval || out_set || ptruncated) {
676 unsigned p = ops.size() - 1;
677 C_ObjectOperation_decodevals *h =
678 new C_ObjectOperation_decodevals(max_to_get, out_set, ptruncated, prval);
679 out_handler[p] = h;
680 out_bl[p] = &h->bl;
681 out_rval[p] = prval;
682 }
683 }
684
685 void omap_get_vals_by_keys(const std::set<std::string> &to_get,
686 std::map<std::string, bufferlist> *out_set,
687 int *prval) {
688 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
689 bufferlist bl;
690 ::encode(to_get, bl);
691 op.op.extent.offset = 0;
692 op.op.extent.length = bl.length();
693 op.indata.claim_append(bl);
694 if (prval || out_set) {
695 unsigned p = ops.size() - 1;
696 C_ObjectOperation_decodevals *h =
697 new C_ObjectOperation_decodevals(0, out_set, nullptr, prval);
698 out_handler[p] = h;
699 out_bl[p] = &h->bl;
700 out_rval[p] = prval;
701 }
702 }
703
704 void omap_cmp(const std::map<std::string, pair<bufferlist,int> > &assertions,
705 int *prval) {
706 OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
707 bufferlist bl;
708 ::encode(assertions, bl);
709 op.op.extent.offset = 0;
710 op.op.extent.length = bl.length();
711 op.indata.claim_append(bl);
712 if (prval) {
713 unsigned p = ops.size() - 1;
714 out_rval[p] = prval;
715 }
716 }
717
718 struct C_ObjectOperation_copyget : public Context {
719 bufferlist bl;
720 object_copy_cursor_t *cursor;
721 uint64_t *out_size;
722 ceph::real_time *out_mtime;
723 std::map<std::string,bufferlist> *out_attrs;
724 bufferlist *out_data, *out_omap_header, *out_omap_data;
725 vector<snapid_t> *out_snaps;
726 snapid_t *out_snap_seq;
727 uint32_t *out_flags;
728 uint32_t *out_data_digest;
729 uint32_t *out_omap_digest;
730 vector<pair<osd_reqid_t, version_t> > *out_reqids;
731 uint64_t *out_truncate_seq;
732 uint64_t *out_truncate_size;
733 int *prval;
734 C_ObjectOperation_copyget(object_copy_cursor_t *c,
735 uint64_t *s,
736 ceph::real_time *m,
737 std::map<std::string,bufferlist> *a,
738 bufferlist *d, bufferlist *oh,
739 bufferlist *o,
740 std::vector<snapid_t> *osnaps,
741 snapid_t *osnap_seq,
742 uint32_t *flags,
743 uint32_t *dd,
744 uint32_t *od,
745 vector<pair<osd_reqid_t, version_t> > *oreqids,
746 uint64_t *otseq,
747 uint64_t *otsize,
748 int *r)
749 : cursor(c),
750 out_size(s), out_mtime(m),
751 out_attrs(a), out_data(d), out_omap_header(oh),
752 out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
753 out_flags(flags), out_data_digest(dd), out_omap_digest(od),
754 out_reqids(oreqids),
755 out_truncate_seq(otseq),
756 out_truncate_size(otsize),
757 prval(r) {}
758 void finish(int r) override {
759 // reqids are copied on ENOENT
760 if (r < 0 && r != -ENOENT)
761 return;
762 try {
763 bufferlist::iterator p = bl.begin();
764 object_copy_data_t copy_reply;
765 ::decode(copy_reply, p);
766 if (r == -ENOENT) {
767 if (out_reqids)
768 *out_reqids = copy_reply.reqids;
769 return;
770 }
771 if (out_size)
772 *out_size = copy_reply.size;
773 if (out_mtime)
774 *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
775 if (out_attrs)
776 *out_attrs = copy_reply.attrs;
777 if (out_data)
778 out_data->claim_append(copy_reply.data);
779 if (out_omap_header)
780 out_omap_header->claim_append(copy_reply.omap_header);
781 if (out_omap_data)
782 *out_omap_data = copy_reply.omap_data;
783 if (out_snaps)
784 *out_snaps = copy_reply.snaps;
785 if (out_snap_seq)
786 *out_snap_seq = copy_reply.snap_seq;
787 if (out_flags)
788 *out_flags = copy_reply.flags;
789 if (out_data_digest)
790 *out_data_digest = copy_reply.data_digest;
791 if (out_omap_digest)
792 *out_omap_digest = copy_reply.omap_digest;
793 if (out_reqids)
794 *out_reqids = copy_reply.reqids;
795 if (out_truncate_seq)
796 *out_truncate_seq = copy_reply.truncate_seq;
797 if (out_truncate_size)
798 *out_truncate_size = copy_reply.truncate_size;
799 *cursor = copy_reply.cursor;
800 } catch (buffer::error& e) {
801 if (prval)
802 *prval = -EIO;
803 }
804 }
805 };
806
807 void copy_get(object_copy_cursor_t *cursor,
808 uint64_t max,
809 uint64_t *out_size,
810 ceph::real_time *out_mtime,
811 std::map<std::string,bufferlist> *out_attrs,
812 bufferlist *out_data,
813 bufferlist *out_omap_header,
814 bufferlist *out_omap_data,
815 vector<snapid_t> *out_snaps,
816 snapid_t *out_snap_seq,
817 uint32_t *out_flags,
818 uint32_t *out_data_digest,
819 uint32_t *out_omap_digest,
820 vector<pair<osd_reqid_t, version_t> > *out_reqids,
821 uint64_t *truncate_seq,
822 uint64_t *truncate_size,
823 int *prval) {
824 OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
825 osd_op.op.copy_get.max = max;
826 ::encode(*cursor, osd_op.indata);
827 ::encode(max, osd_op.indata);
828 unsigned p = ops.size() - 1;
829 out_rval[p] = prval;
830 C_ObjectOperation_copyget *h =
831 new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
832 out_attrs, out_data, out_omap_header,
833 out_omap_data, out_snaps, out_snap_seq,
834 out_flags, out_data_digest,
835 out_omap_digest, out_reqids, truncate_seq,
836 truncate_size, prval);
837 out_bl[p] = &h->bl;
838 out_handler[p] = h;
839 }
840
841 void undirty() {
842 add_op(CEPH_OSD_OP_UNDIRTY);
843 }
844
845 struct C_ObjectOperation_isdirty : public Context {
846 bufferlist bl;
847 bool *pisdirty;
848 int *prval;
849 C_ObjectOperation_isdirty(bool *p, int *r)
850 : pisdirty(p), prval(r) {}
851 void finish(int r) override {
852 if (r < 0)
853 return;
854 try {
855 bufferlist::iterator p = bl.begin();
856 bool isdirty;
857 ::decode(isdirty, p);
858 if (pisdirty)
859 *pisdirty = isdirty;
860 } catch (buffer::error& e) {
861 if (prval)
862 *prval = -EIO;
863 }
864 }
865 };
866
867 void is_dirty(bool *pisdirty, int *prval) {
868 add_op(CEPH_OSD_OP_ISDIRTY);
869 unsigned p = ops.size() - 1;
870 out_rval[p] = prval;
871 C_ObjectOperation_isdirty *h =
872 new C_ObjectOperation_isdirty(pisdirty, prval);
873 out_bl[p] = &h->bl;
874 out_handler[p] = h;
875 }
876
877 struct C_ObjectOperation_hit_set_ls : public Context {
878 bufferlist bl;
879 std::list< std::pair<time_t, time_t> > *ptls;
880 std::list< std::pair<ceph::real_time, ceph::real_time> > *putls;
881 int *prval;
882 C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
883 std::list< std::pair<ceph::real_time,
884 ceph::real_time> > *ut,
885 int *r)
886 : ptls(t), putls(ut), prval(r) {}
887 void finish(int r) override {
888 if (r < 0)
889 return;
890 try {
891 bufferlist::iterator p = bl.begin();
892 std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
893 ::decode(ls, p);
894 if (ptls) {
895 ptls->clear();
896 for (auto p = ls.begin(); p != ls.end(); ++p)
897 // round initial timestamp up to the next full second to
898 // keep this a valid interval.
899 ptls->push_back(
900 make_pair(ceph::real_clock::to_time_t(
901 ceph::ceil(p->first,
902 // Sadly, no time literals until C++14.
903 std::chrono::seconds(1))),
904 ceph::real_clock::to_time_t(p->second)));
905 }
906 if (putls)
907 putls->swap(ls);
908 } catch (buffer::error& e) {
909 r = -EIO;
910 }
911 if (prval)
912 *prval = r;
913 }
914 };
915
916 /**
917 * list available HitSets.
918 *
919 * We will get back a list of time intervals. Note that the most
920 * recent range may have an empty end timestamp if it is still
921 * accumulating.
922 *
923 * @param pls [out] list of time intervals
924 * @param prval [out] return value
925 */
926 void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
927 add_op(CEPH_OSD_OP_PG_HITSET_LS);
928 unsigned p = ops.size() - 1;
929 out_rval[p] = prval;
930 C_ObjectOperation_hit_set_ls *h =
931 new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
932 out_bl[p] = &h->bl;
933 out_handler[p] = h;
934 }
935 void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls,
936 int *prval) {
937 add_op(CEPH_OSD_OP_PG_HITSET_LS);
938 unsigned p = ops.size() - 1;
939 out_rval[p] = prval;
940 C_ObjectOperation_hit_set_ls *h =
941 new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
942 out_bl[p] = &h->bl;
943 out_handler[p] = h;
944 }
945
946 /**
947 * get HitSet
948 *
949 * Return an encoded HitSet that includes the provided time
950 * interval.
951 *
952 * @param stamp [in] timestamp
953 * @param pbl [out] target buffer for encoded HitSet
954 * @param prval [out] return value
955 */
956 void hit_set_get(ceph::real_time stamp, bufferlist *pbl, int *prval) {
957 OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
958 op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp);
959 unsigned p = ops.size() - 1;
960 out_rval[p] = prval;
961 out_bl[p] = pbl;
962 }
963
964 void omap_get_header(bufferlist *bl, int *prval) {
965 add_op(CEPH_OSD_OP_OMAPGETHEADER);
966 unsigned p = ops.size() - 1;
967 out_bl[p] = bl;
968 out_rval[p] = prval;
969 }
970
971 void omap_set(const map<string, bufferlist> &map) {
972 bufferlist bl;
973 ::encode(map, bl);
974 add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
975 }
976
977 void omap_set_header(bufferlist &bl) {
978 add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
979 }
980
981 void omap_clear() {
982 add_op(CEPH_OSD_OP_OMAPCLEAR);
983 }
984
985 void omap_rm_keys(const std::set<std::string> &to_remove) {
986 bufferlist bl;
987 ::encode(to_remove, bl);
988 add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
989 }
990
991 // object classes
992 void call(const char *cname, const char *method, bufferlist &indata) {
993 add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
994 }
995
996 void call(const char *cname, const char *method, bufferlist &indata,
997 bufferlist *outdata, Context *ctx, int *prval) {
998 add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
999 }
1000
1001 // watch/notify
1002 void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) {
1003 OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH);
1004 osd_op.op.watch.cookie = cookie;
1005 osd_op.op.watch.op = op;
1006 osd_op.op.watch.timeout = timeout;
1007 }
1008
1009 void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
1010 bufferlist &bl, bufferlist *inbl) {
1011 OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
1012 osd_op.op.notify.cookie = cookie;
1013 ::encode(prot_ver, *inbl);
1014 ::encode(timeout, *inbl);
1015 ::encode(bl, *inbl);
1016 osd_op.indata.append(*inbl);
1017 }
1018
1019 void notify_ack(uint64_t notify_id, uint64_t cookie,
1020 bufferlist& reply_bl) {
1021 OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
1022 bufferlist bl;
1023 ::encode(notify_id, bl);
1024 ::encode(cookie, bl);
1025 ::encode(reply_bl, bl);
1026 osd_op.indata.append(bl);
1027 }
1028
1029 void list_watchers(list<obj_watch_t> *out,
1030 int *prval) {
1031 (void)add_op(CEPH_OSD_OP_LIST_WATCHERS);
1032 if (prval || out) {
1033 unsigned p = ops.size() - 1;
1034 C_ObjectOperation_decodewatchers *h =
1035 new C_ObjectOperation_decodewatchers(out, prval);
1036 out_handler[p] = h;
1037 out_bl[p] = &h->bl;
1038 out_rval[p] = prval;
1039 }
1040 }
1041
1042 void list_snaps(librados::snap_set_t *out, int *prval) {
1043 (void)add_op(CEPH_OSD_OP_LIST_SNAPS);
1044 if (prval || out) {
1045 unsigned p = ops.size() - 1;
1046 C_ObjectOperation_decodesnaps *h =
1047 new C_ObjectOperation_decodesnaps(out, prval);
1048 out_handler[p] = h;
1049 out_bl[p] = &h->bl;
1050 out_rval[p] = prval;
1051 }
1052 }
1053
1054 void assert_version(uint64_t ver) {
1055 OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
1056 osd_op.op.assert_ver.ver = ver;
1057 }
1058
1059 void cmpxattr(const char *name, const bufferlist& val,
1060 int op, int mode) {
1061 add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
1062 OSDOp& o = *ops.rbegin();
1063 o.op.xattr.cmp_op = op;
1064 o.op.xattr.cmp_mode = mode;
1065 }
1066
1067 void rollback(uint64_t snapid) {
1068 OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
1069 osd_op.op.snap.snapid = snapid;
1070 }
1071
1072 void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
1073 version_t src_version, unsigned flags,
1074 unsigned src_fadvise_flags) {
1075 OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
1076 osd_op.op.copy_from.snapid = snapid;
1077 osd_op.op.copy_from.src_version = src_version;
1078 osd_op.op.copy_from.flags = flags;
1079 osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
1080 ::encode(src, osd_op.indata);
1081 ::encode(src_oloc, osd_op.indata);
1082 }
1083
  /**
   * writeback content to backing tier
   *
   * If object is marked dirty in the cache tier, write back content
   * to backing tier. If the object is clean this is a no-op.
   *
   * If writeback races with an update, the update will block.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   *
   * @see cache_try_flush() for the non-blocking variant
   */
  void cache_flush() {
    add_op(CEPH_OSD_OP_CACHE_FLUSH);
  }
1097
  /**
   * try to writeback content to backing tier (non-blocking variant
   * of cache_flush())
   *
   * If object is marked dirty in the cache tier, write back content
   * to backing tier. If the object is clean this is a no-op.
   *
   * If writeback races with an update, return EAGAIN. Requires that
   * the SKIPRWLOCKS flag be set.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_try_flush() {
    add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
  }
1112
  /**
   * evict object from cache tier
   *
   * If object is marked clean, remove the object from the cache tier.
   * Otherwise, return EBUSY.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   *
   * @note a dirty object must be flushed (cache_flush()) before it
   *       can be evicted
   */
  void cache_evict() {
    add_op(CEPH_OSD_OP_CACHE_EVICT);
  }
1124
  /**
   * Give the OSD an advisory hint about this object's expected size
   * and write pattern.
   *
   * @param expected_object_size expected eventual object size, in bytes
   * @param expected_write_size expected size of individual writes, in bytes
   * @param flags alloc-hint flags (passed through to the OSD)
   */
  void set_alloc_hint(uint64_t expected_object_size,
                      uint64_t expected_write_size,
		      uint32_t flags) {
    add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size,
                   expected_write_size, flags);

    // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
    // not worth a feature bit. Set FAILOK per-op flag to make
    // sure older osds don't trip over an unsupported opcode.
    set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
  }
1136
1137 void dup(vector<OSDOp>& sops) {
1138 ops = sops;
1139 out_bl.resize(sops.size());
1140 out_handler.resize(sops.size());
1141 out_rval.resize(sops.size());
1142 for (uint32_t i = 0; i < sops.size(); i++) {
1143 out_bl[i] = &sops[i].outdata;
1144 out_handler[i] = NULL;
1145 out_rval[i] = &sops[i].rval;
1146 }
1147 }
1148
  /**
   * Pin/unpin an object in cache tier
   */
  void cache_pin() {
    add_op(CEPH_OSD_OP_CACHE_PIN);
  }

  // inverse of cache_pin()
  void cache_unpin() {
    add_op(CEPH_OSD_OP_CACHE_UNPIN);
  }
1159};
1160
1161
1162// ----------------
1163
1164
1165class Objecter : public md_config_obs_t, public Dispatcher {
1166public:
1167 // config observer bits
1168 const char** get_tracked_conf_keys() const override;
1169 void handle_conf_change(const struct md_config_t *conf,
1170 const std::set <std::string> &changed) override;
1171
1172public:
1173 Messenger *messenger;
1174 MonClient *monc;
1175 Finisher *finisher;
1176 ZTracer::Endpoint trace_endpoint;
1177private:
1178 OSDMap *osdmap;
1179public:
1180 using Dispatcher::cct;
1181 std::multimap<string,string> crush_location;
1182
1183 atomic_t initialized;
1184
1185private:
1186 atomic64_t last_tid;
1187 atomic_t inflight_ops;
1188 atomic_t client_inc;
1189 uint64_t max_linger_id;
1190 atomic_t num_in_flight;
1191 atomic_t global_op_flags; // flags which are applied to each IO op
1192 bool keep_balanced_budget;
1193 bool honor_osdmap_full;
1194 bool osdmap_full_try;
1195
1196public:
1197 void maybe_request_map();
1198private:
1199
1200 void _maybe_request_map();
1201
1202 version_t last_seen_osdmap_version;
1203 version_t last_seen_pgmap_version;
1204
1205 mutable boost::shared_mutex rwlock;
1206 using lock_guard = std::unique_lock<decltype(rwlock)>;
1207 using unique_lock = std::unique_lock<decltype(rwlock)>;
1208 using shared_lock = boost::shared_lock<decltype(rwlock)>;
1209 using shunique_lock = ceph::shunique_lock<decltype(rwlock)>;
1210 ceph::timer<ceph::mono_clock> timer;
1211
1212 PerfCounters *logger;
1213
1214 uint64_t tick_event;
1215
1216 void start_tick();
1217 void tick();
1218 void update_crush_location();
1219
1220 class RequestStateHook;
1221
1222 RequestStateHook *m_request_state_hook;
1223
1224public:
1225 /*** track pending operations ***/
1226 // read
1227 public:
1228
1229 struct OSDSession;
1230
  /// Addressing state for one op: the logical target (oid/oloc, or an
  /// explicit pg) plus the most recently computed osdmap mapping.
  struct op_target_t {
    int flags = 0;

    epoch_t epoch = 0;  ///< latest epoch we calculated the mapping

    object_t base_oid;
    object_locator_t base_oloc;
    object_t target_oid;
    object_locator_t target_oloc;

    ///< true if we are directed at base_pgid, not base_oid
    bool precalc_pgid = false;

    ///< true if we have ever mapped to a valid pool
    bool pool_ever_existed = false;

    ///< explicit pg target, if any
    pg_t base_pgid;

    pg_t pgid; ///< last (raw) pg we mapped to
    spg_t actual_pgid; ///< last (actual) spg_t we mapped to
    unsigned pg_num = 0; ///< last pg_num we mapped to
    unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
    vector<int> up; ///< set of up osds for last pg we mapped to
    vector<int> acting; ///< set of acting osds for last pg we mapped to
    int up_primary = -1; ///< last up_primary we mapped to
    int acting_primary = -1;  ///< last acting_primary we mapped to
    int size = -1; ///< the size of the pool when we were last mapped
    int min_size = -1; ///< the min size of the pool when we were last mapped
    bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise

    bool used_replica = false;
    bool paused = false;

    int osd = -1;      ///< the final target osd, or -1

    epoch_t last_force_resend = 0;

    // target by object name + locator
    op_target_t(object_t oid, object_locator_t oloc, int flags)
      : flags(flags),
        base_oid(oid),
        base_oloc(oloc)
      {}

    // target an explicit pg
    op_target_t(pg_t pgid)
      : base_oloc(pgid.pool(), pgid.ps()),
        precalc_pgid(true),
        base_pgid(pgid)
      {}

    op_target_t() = default;

    /// build the hobject_t for the current (post-mapping) target
    hobject_t get_hobj() {
      return hobject_t(target_oid,
		       target_oloc.key,
		       CEPH_NOSNAP,
		       target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
		       target_oloc.pool,
		       target_oloc.nspace);
    }

    /// true if this target's hobject lies in the half-open range
    /// [begin, end)
    bool contained_by(const hobject_t& begin, const hobject_t& end) {
      hobject_t h = get_hobj();
      int r = cmp(h, begin);
      return r == 0 || (r > 0 && h < end);
    }

    void dump(Formatter *f) const;
  };
1300
  /// a single in-flight (or queued) client request to an OSD
  struct Op : public RefCountedObject {
    OSDSession *session;   ///< session the op is currently assigned to, if any
    int incarnation;

    op_target_t target;    ///< where the op maps on the osdmap

    ConnectionRef con;  // for rx buffer only
    uint64_t features;  // explicitly specified op features

    vector<OSDOp> ops;  ///< the per-op payloads (swapped in by the ctor)

    snapid_t snapid;    ///< read snapshot (CEPH_NOSNAP by default)
    SnapContext snapc;  ///< write snap context
    ceph::real_time mtime;

    bufferlist *outbl;            ///< single whole-reply output buffer, if any
    vector<bufferlist*> out_bl;   ///< per-op output buffers
    // per-op completion handlers; owned by this Op (deleted in the dtor)
    vector<Context*> out_handler;
    vector<int*> out_rval;        ///< per-op return-value destinations

    int priority;
    Context *onfinish;   ///< completion callback
    uint64_t ontimeout;

    ceph_tid_t tid;      ///< transaction id (0 until assigned)
    int attempts;        ///< number of send attempts

    version_t *objver;      ///< where to report the resulting object version
    epoch_t *reply_epoch;   ///< where to report the reply's map epoch

    ceph::mono_time stamp;

    epoch_t map_dne_bound;  ///< see _check_op_pool_dne()

    bool budgeted;  ///< true once throttle budget was taken (see _take_op_budget)

    /// true if we should resend this message on failure
    bool should_resend;

    /// true if the throttle budget is get/put on a series of OPs,
    /// instead of per OP basis, when this flag is set, the budget is
    /// acquired before sending the very first OP of the series and
    /// released upon receiving the last OP reply.
    bool ctx_budgeted;

    int *data_offset;

    osd_reqid_t reqid; // explicitly setting reqid
    ZTracer::Trace trace;

    // Note: 'op' is consumed (its contents are swapped into this->ops).
    Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
       int f, Context *fin, version_t *ov, int *offset = NULL,
       ZTracer::Trace *parent_trace = nullptr) :
      session(NULL), incarnation(0),
      target(o, ol, f),
      con(NULL),
      features(CEPH_FEATURES_SUPPORTED_DEFAULT),
      snapid(CEPH_NOSNAP),
      outbl(NULL),
      priority(0),
      onfinish(fin),
      ontimeout(0),
      tid(0),
      attempts(0),
      objver(ov),
      reply_epoch(NULL),
      map_dne_bound(0),
      budgeted(false),
      should_resend(true),
      ctx_budgeted(false),
      data_offset(offset) {
      ops.swap(op);

      /* initialize out_* to match op vector */
      out_bl.resize(ops.size());
      out_rval.resize(ops.size());
      out_handler.resize(ops.size());
      for (unsigned i = 0; i < ops.size(); i++) {
	out_bl[i] = NULL;
	out_handler[i] = NULL;
	out_rval[i] = NULL;
      }

      // a locator key equal to the object name is redundant; drop it
      if (target.base_oloc.key == o)
	target.base_oloc.key.clear();

      if (parent_trace && parent_trace->valid())
	trace.init("op", nullptr, parent_trace);
    }

    /// ordered by tid
    bool operator<(const Op& other) const {
      return tid < other.tid;
    }

    /// true if this op must honor the osdmap/pool full flags
    bool respects_full() const {
      return
	(target.flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
	!(target.flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
    }

  private:
    // refcounted: destroyed via put(); frees the owned out handlers
    ~Op() override {
      while (!out_handler.empty()) {
	delete out_handler.back();
	out_handler.pop_back();
      }
    }
  };
1409
  /// completion for a latest-osdmap-version check made on behalf of op
  /// 'tid'; finish() is defined out of line (in Objecter.cc)
  struct C_Op_Map_Latest : public Context {
    Objecter *objecter;
    ceph_tid_t tid;
    version_t latest;   // filled in before finish() runs
    C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
						 latest(0) {}
    void finish(int r) override;
  };
1418
  /// as C_Op_Map_Latest, but for a CommandOp; finish() defined out of line
  struct C_Command_Map_Latest : public Context {
    Objecter *objecter;
    uint64_t tid;
    version_t latest;   // filled in before finish() runs
    C_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
						      latest(0) {}
    void finish(int r) override;
  };
1427
1428 struct C_Stat : public Context {
1429 bufferlist bl;
1430 uint64_t *psize;
1431 ceph::real_time *pmtime;
1432 Context *fin;
1433 C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) :
1434 psize(ps), pmtime(pm), fin(c) {}
1435 void finish(int r) override {
1436 if (r >= 0) {
1437 bufferlist::iterator p = bl.begin();
1438 uint64_t s;
1439 ceph::real_time m;
1440 ::decode(s, p);
1441 ::decode(m, p);
1442 if (psize)
1443 *psize = s;
1444 if (pmtime)
1445 *pmtime = m;
1446 }
1447 fin->complete(r);
1448 }
1449 };
1450
1451 struct C_GetAttrs : public Context {
1452 bufferlist bl;
1453 map<string,bufferlist>& attrset;
1454 Context *fin;
1455 C_GetAttrs(map<string, bufferlist>& set, Context *c) : attrset(set),
1456 fin(c) {}
1457 void finish(int r) override {
1458 if (r >= 0) {
1459 bufferlist::iterator p = bl.begin();
1460 ::decode(attrset, p);
1461 }
1462 fin->complete(r);
1463 }
1464 };
1465
1466
1467 // Pools and statistics
  // Pools and statistics
  /// cursor state for iterating over the objects of a pool (nlist)
  struct NListContext {
    collection_list_handle_t pos;   ///< current iteration position

    // these are for !sortbitwise compat only
    int current_pg = 0;
    int starting_pg_num = 0;
    bool sort_bitwise = false;

    bool at_end_of_pool = false; ///< publicly visible end flag

    int64_t pool_id = -1;
    int pool_snap_seq = 0;
    uint64_t max_entries = 0;   ///< max entries to fetch per request
    string nspace;              ///< namespace to restrict the listing to

    bufferlist bl; // raw data read to here
    std::list<librados::ListObjectImpl> list;  ///< decoded results

    bufferlist filter;      ///< optional server-side filter

    bufferlist extra_info;

    // The budget associated with this context, once it is set (>= 0),
    // the budget is not get/released on OP basis, instead the budget
    // is acquired before sending the first OP and released upon receiving
    // the last op reply.
    int ctx_budget = -1;

    bool at_end() const {
      return at_end_of_pool;
    }

    uint32_t get_pg_hash_position() const {
      return pos.get_hash();
    }
  };
1504
1505 struct C_NList : public Context {
1506 NListContext *list_context;
1507 Context *final_finish;
1508 Objecter *objecter;
1509 epoch_t epoch;
1510 C_NList(NListContext *lc, Context * finish, Objecter *ob) :
1511 list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
1512 void finish(int r) override {
1513 if (r >= 0) {
1514 objecter->_nlist_reply(list_context, r, final_finish, epoch);
1515 } else {
1516 final_finish->complete(r);
1517 }
1518 }
1519 };
1520
  /// a pending pool-stats request for one or more pools
  struct PoolStatOp {
    ceph_tid_t tid;
    list<string> pools;   ///< pools to query

    map<string,pool_stat_t> *pool_stats;  ///< where results are stored
    Context *onfinish;
    uint64_t ontimeout;

    ceph::mono_time last_submit;   ///< for resend bookkeeping
  };
1531
  /// a pending cluster statfs request
  struct StatfsOp {
    ceph_tid_t tid;
    struct ceph_statfs *stats;  ///< where the result is stored
    Context *onfinish;
    uint64_t ontimeout;

    ceph::mono_time last_submit;   ///< for resend bookkeeping
  };
1540
  /// a pending pool operation (create/delete/snap etc., per pool_op code)
  struct PoolOp {
    ceph_tid_t tid;
    int64_t pool;
    string name;
    Context *onfinish;
    uint64_t ontimeout;
    int pool_op;        ///< which pool operation (POOL_OP_* code)
    uint64_t auid;
    int16_t crush_rule;
    snapid_t snapid;
    bufferlist *blp;    ///< optional reply payload destination

    ceph::mono_time last_submit;   ///< for resend bookkeeping
    PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0),
	       auid(0), crush_rule(0), snapid(0), blp(NULL) {}
  };
1557
1558 // -- osd commands --
  /// a command directed at a specific OSD or PG
  struct CommandOp : public RefCountedObject {
    OSDSession *session = nullptr;
    ceph_tid_t tid = 0;
    vector<string> cmd;            ///< the command itself
    bufferlist inbl;               ///< input payload
    bufferlist *poutbl = nullptr;  ///< where to put the output payload
    string *prs = nullptr;         ///< where to put the output status string

    // target_osd == -1 means target_pg is valid
    const int target_osd = -1;
    const pg_t target_pg;

    op_target_t target;

    epoch_t map_dne_bound = 0;
    int map_check_error = 0; // error to return if map check fails
    const char *map_check_error_str = nullptr;

    Context *onfinish = nullptr;
    uint64_t ontimeout = 0;
    ceph::mono_time last_submit;

    /// construct a command targeting a specific OSD
    CommandOp(
      int target_osd,
      const vector<string> &cmd,
      bufferlist inbl,
      bufferlist *poutbl,
      string *prs,
      Context *onfinish)
      : cmd(cmd),
	inbl(inbl),
	poutbl(poutbl),
	prs(prs),
	target_osd(target_osd),
	onfinish(onfinish) {}

    /// construct a command targeting a specific PG
    CommandOp(
      pg_t pgid,
      const vector<string> &cmd,
      bufferlist inbl,
      bufferlist *poutbl,
      string *prs,
      Context *onfinish)
      : cmd(cmd),
	inbl(inbl),
	poutbl(poutbl),
	prs(prs),
	target_pg(pgid),
	target(pgid),
	onfinish(onfinish) {}

  };
1611
1612 void submit_command(CommandOp *c, ceph_tid_t *ptid);
1613 int _calc_command_target(CommandOp *c, shunique_lock &sul);
1614 void _assign_command_session(CommandOp *c, shunique_lock &sul);
1615 void _send_command(CommandOp *c);
1616 int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
1617 void _finish_command(CommandOp *c, int r, string rs);
1618 void handle_command_reply(MCommandReply *m);
1619
1620
1621 // -- lingering ops --
1622
  /// user-supplied callback interface for watch/notify events
  struct WatchContext {
    // this simply mirrors librados WatchCtx2
    virtual void handle_notify(uint64_t notify_id,
			       uint64_t cookie,
			       uint64_t notifier_id,
			       bufferlist& bl) = 0;
    virtual void handle_error(uint64_t cookie, int err) = 0;
    virtual ~WatchContext() {}
  };
1632
  /// state for a long-lived ("linger") operation: a watch or a notify
  /// that the Objecter keeps registered across resends
  struct LingerOp : public RefCountedObject {
    uint64_t linger_id;

    op_target_t target;

    snapid_t snap;
    SnapContext snapc;
    ceph::real_time mtime;

    vector<OSDOp> ops;
    bufferlist inbl;
    bufferlist *poutbl;
    version_t *pobjver;

    bool is_watch;   ///< true for watch, false for notify
    ceph::mono_time watch_valid_thru; ///< send time for last acked ping
    int last_error; ///< error from last failed ping|reconnect, if any
    // protects the watch state below (pending-async queue etc.)
    boost::shared_mutex watch_lock;
    using lock_guard = std::unique_lock<decltype(watch_lock)>;
    using unique_lock = std::unique_lock<decltype(watch_lock)>;
    using shared_lock = boost::shared_lock<decltype(watch_lock)>;
    using shunique_lock = ceph::shunique_lock<decltype(watch_lock)>;

    // queue of pending async operations, with the timestamp of
    // when they were queued.
    list<ceph::mono_time> watch_pending_async;

    uint32_t register_gen;   ///< bumped on re-registration
    bool registered;
    bool canceled;
    Context *on_reg_commit;

    // we trigger these from an async finisher
    Context *on_notify_finish;
    bufferlist *notify_result_bl;
    uint64_t notify_id;

    WatchContext *watch_context;   ///< owned; deleted in the dtor

    OSDSession *session;

    ceph_tid_t register_tid;
    ceph_tid_t ping_tid;
    epoch_t map_dne_bound;

    void _queued_async() {
      // watch_lock must be locked unique by the caller
      watch_pending_async.push_back(ceph::mono_clock::now());
    }
    void finished_async() {
      unique_lock l(watch_lock);
      assert(!watch_pending_async.empty());
      watch_pending_async.pop_front();
    }

    LingerOp() : linger_id(0),
		 target(object_t(), object_locator_t(), 0),
		 snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
		 is_watch(false), last_error(0),
		 register_gen(0),
		 registered(false),
		 canceled(false),
		 on_reg_commit(NULL),
		 on_notify_finish(NULL),
		 notify_result_bl(NULL),
		 notify_id(0),
		 watch_context(NULL),
		 session(NULL),
		 register_tid(0),
		 ping_tid(0),
		 map_dne_bound(0) {}

    // no copy!
    const LingerOp &operator=(const LingerOp& r);
    LingerOp(const LingerOp& o);

    /// the watch cookie is simply this object's address
    uint64_t get_cookie() {
      return reinterpret_cast<uint64_t>(this);
    }

  private:
    // refcounted: destroyed via put()
    ~LingerOp() override {
      delete watch_context;
    }
  };
1718
  /// completion forwarding a linger op's commit result (and, for
  /// notifies, the reply payload) to Objecter::_linger_commit()
  struct C_Linger_Commit : public Context {
    Objecter *objecter;
    LingerOp *info;
    bufferlist outbl;  // used for notify only
    C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {
      info->get();   // pin the linger op for this callback's lifetime
    }
    ~C_Linger_Commit() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_commit(info, r, outbl);
    }
  };
1733
  /// completion forwarding a linger reconnect result to
  /// Objecter::_linger_reconnect()
  struct C_Linger_Reconnect : public Context {
    Objecter *objecter;
    LingerOp *info;
    C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {
      info->get();   // pin the linger op for this callback's lifetime
    }
    ~C_Linger_Reconnect() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_reconnect(info, r);
    }
  };
1747
  /// completion for a watch ping; carries the send time and the
  /// registration generation so stale acks can be detected
  struct C_Linger_Ping : public Context {
    Objecter *objecter;
    LingerOp *info;
    ceph::mono_time sent;      // set when the ping is sent
    uint32_t register_gen;     // generation at ping time
    C_Linger_Ping(Objecter *o, LingerOp *l)
      : objecter(o), info(l), register_gen(info->register_gen) {
      info->get();   // pin the linger op for this callback's lifetime
    }
    ~C_Linger_Ping() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_ping(info, r, sent, register_gen);
    }
  };
1764
  /// completion for a latest-osdmap-version check made on behalf of a
  /// linger op; finish() is defined out of line
  struct C_Linger_Map_Latest : public Context {
    Objecter *objecter;
    uint64_t linger_id;
    version_t latest;   // filled in before finish() runs
    C_Linger_Map_Latest(Objecter *o, uint64_t id) :
      objecter(o), linger_id(id), latest(0) {}
    void finish(int r) override;
  };
1773
1774 // -- osd sessions --
  // -- osd sessions --
  /// one backoff received from an OSD: covers the hobject range
  /// [begin, end] within pgid (tracked in OSDSession::backoffs)
  struct OSDBackoff {
    spg_t pgid;
    uint64_t id;
    hobject_t begin, end;
  };
1780
  /// per-OSD session: the connection plus all ops currently assigned
  /// to that OSD
  struct OSDSession : public RefCountedObject {
    // guards the maps below; shared for reads, unique for updates
    boost::shared_mutex lock;
    using lock_guard = std::lock_guard<decltype(lock)>;
    using unique_lock = std::unique_lock<decltype(lock)>;
    using shared_lock = boost::shared_lock<decltype(lock)>;
    using shunique_lock = ceph::shunique_lock<decltype(lock)>;

    // pending ops
    map<ceph_tid_t,Op*> ops;
    map<uint64_t, LingerOp*> linger_ops;
    map<ceph_tid_t,CommandOp*> command_ops;

    // backoffs
    map<spg_t,map<hobject_t,OSDBackoff>> backoffs;
    map<uint64_t,OSDBackoff*> backoffs_by_id;

    int osd;             ///< osd id, or -1 for the homeless session
    int incarnation;
    ConnectionRef con;
    // completion handling is sharded over num_locks mutexes,
    // selected per object via get_lock()
    int num_locks;
    std::unique_ptr<std::mutex[]> completion_locks;
    using unique_completion_lock = std::unique_lock<
      decltype(completion_locks)::element_type>;


    OSDSession(CephContext *cct, int o) :
      osd(o), incarnation(0), con(NULL),
      num_locks(cct->_conf->objecter_completion_locks_per_session),
      completion_locks(new std::mutex[num_locks]) {}

    ~OSDSession() override;

    bool is_homeless() { return (osd == -1); }

    unique_completion_lock get_lock(object_t& oid);
  };
1817 map<int,OSDSession*> osd_sessions;
1818
1819 bool osdmap_full_flag() const;
1820 bool osdmap_pool_full(const int64_t pool_id) const;
1821
1822 private:
1823
1824 /**
1825 * Test pg_pool_t::FLAG_FULL on a pool
1826 *
1827 * @return true if the pool exists and has the flag set, or
1828 * the global full flag is set, else false
1829 */
1830 bool _osdmap_pool_full(const int64_t pool_id) const;
1831 bool _osdmap_pool_full(const pg_pool_t &p) const;
1832 void update_pool_full_map(map<int64_t, bool>& pool_full_map);
1833
1834 map<uint64_t, LingerOp*> linger_ops;
1835 // we use this just to confirm a cookie is valid before dereferencing the ptr
1836 set<LingerOp*> linger_ops_set;
1837
1838 map<ceph_tid_t,PoolStatOp*> poolstat_ops;
1839 map<ceph_tid_t,StatfsOp*> statfs_ops;
1840 map<ceph_tid_t,PoolOp*> pool_ops;
1841 atomic_t num_homeless_ops;
1842
1843 OSDSession *homeless_session;
1844
1845 // ops waiting for an osdmap with a new pool or confirmation that
1846 // the pool does not exist (may be expanded to other uses later)
1847 map<uint64_t, LingerOp*> check_latest_map_lingers;
1848 map<ceph_tid_t, Op*> check_latest_map_ops;
1849 map<ceph_tid_t, CommandOp*> check_latest_map_commands;
1850
1851 map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
1852
1853 ceph::timespan mon_timeout;
1854 ceph::timespan osd_timeout;
1855
1856 MOSDOp *_prepare_osd_op(Op *op);
1857 void _send_op(Op *op, MOSDOp *m = NULL);
1858 void _send_op_account(Op *op);
1859 void _cancel_linger_op(Op *op);
1860 void finish_op(OSDSession *session, ceph_tid_t tid);
1861 void _finish_op(Op *op, int r);
1862 static bool is_pg_changed(
1863 int oldprimary,
1864 const vector<int>& oldacting,
1865 int newprimary,
1866 const vector<int>& newacting,
1867 bool any_change=false);
1868 enum recalc_op_target_result {
1869 RECALC_OP_TARGET_NO_ACTION = 0,
1870 RECALC_OP_TARGET_NEED_RESEND,
1871 RECALC_OP_TARGET_POOL_DNE,
1872 RECALC_OP_TARGET_OSD_DNE,
1873 RECALC_OP_TARGET_OSD_DOWN,
1874 };
1875 bool _osdmap_full_flag() const;
1876 bool _osdmap_has_pool_full() const;
1877
1878 bool target_should_be_paused(op_target_t *op);
1879 int _calc_target(op_target_t *t, Connection *con,
1880 bool any_change = false);
1881 int _map_session(op_target_t *op, OSDSession **s,
1882 shunique_lock& lc);
1883
1884 void _session_op_assign(OSDSession *s, Op *op);
1885 void _session_op_remove(OSDSession *s, Op *op);
1886 void _session_linger_op_assign(OSDSession *to, LingerOp *op);
1887 void _session_linger_op_remove(OSDSession *from, LingerOp *op);
1888 void _session_command_op_assign(OSDSession *to, CommandOp *op);
1889 void _session_command_op_remove(OSDSession *from, CommandOp *op);
1890
1891 int _assign_op_target_session(Op *op, shunique_lock& lc,
1892 bool src_session_locked,
1893 bool dst_session_locked);
1894 int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc);
1895
1896 void _linger_submit(LingerOp *info, shunique_lock& sul);
1897 void _send_linger(LingerOp *info, shunique_lock& sul);
1898 void _linger_commit(LingerOp *info, int r, bufferlist& outbl);
1899 void _linger_reconnect(LingerOp *info, int r);
1900 void _send_linger_ping(LingerOp *info);
1901 void _linger_ping(LingerOp *info, int r, ceph::mono_time sent,
1902 uint32_t register_gen);
1903 int _normalize_watch_error(int r);
1904
1905 friend class C_DoWatchError;
1906public:
  /// queue 'ctx' on the finisher used for linger (watch/notify) callbacks
  void linger_callback_flush(Context *ctx) {
    finisher->queue(ctx);
  }
1910
1911private:
1912 void _check_op_pool_dne(Op *op, unique_lock *sl);
1913 void _send_op_map_check(Op *op);
1914 void _op_cancel_map_check(Op *op);
1915 void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
1916 void _send_linger_map_check(LingerOp *op);
1917 void _linger_cancel_map_check(LingerOp *op);
1918 void _check_command_map_dne(CommandOp *op);
1919 void _send_command_map_check(CommandOp *op);
1920 void _command_cancel_map_check(CommandOp *op);
1921
1922 void kick_requests(OSDSession *session);
1923 void _kick_requests(OSDSession *session, map<uint64_t, LingerOp *>& lresend);
1924 void _linger_ops_resend(map<uint64_t, LingerOp *>& lresend, unique_lock& ul);
1925
1926 int _get_session(int osd, OSDSession **session, shunique_lock& sul);
1927 void put_session(OSDSession *s);
1928 void get_session(OSDSession *s);
1929 void _reopen_session(OSDSession *session);
1930 void close_session(OSDSession *session);
1931
1932 void _nlist_reply(NListContext *list_context, int r, Context *final_finish,
1933 epoch_t reply_epoch);
1934
1935 void resend_mon_ops();
1936
1937 /**
1938 * handle a budget for in-flight ops
1939 * budget is taken whenever an op goes into the ops map
1940 * and returned whenever an op is removed from the map
1941 * If throttle_op needs to throttle it will unlock client_lock.
1942 */
1943 int calc_op_budget(Op *op);
1944 void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
1945 int _take_op_budget(Op *op, shunique_lock& sul) {
1946 assert(sul && sul.mutex() == &rwlock);
1947 int op_budget = calc_op_budget(op);
1948 if (keep_balanced_budget) {
1949 _throttle_op(op, sul, op_budget);
1950 } else {
1951 op_throttle_bytes.take(op_budget);
1952 op_throttle_ops.take(1);
1953 }
1954 op->budgeted = true;
1955 return op_budget;
1956 }
  /// return 'op_budget' bytes and one op slot to the throttles
  void put_op_budget_bytes(int op_budget) {
    assert(op_budget >= 0);
    op_throttle_bytes.put(op_budget);
    op_throttle_ops.put(1);
  }
  /// return the budget previously taken for 'op' (see _take_op_budget)
  void put_op_budget(Op *op) {
    assert(op->budgeted);
    int op_budget = calc_op_budget(op);
    put_op_budget_bytes(op_budget);
  }
1967 void put_nlist_context_budget(NListContext *list_context);
1968 Throttle op_throttle_bytes, op_throttle_ops;
1969
1970 public:
  /**
   * @param cct_ ceph context
   * @param m cluster messenger (not owned)
   * @param mc mon client (not owned)
   * @param fin finisher used for async callbacks (not owned)
   * @param mon_timeout mon op timeout (seconds, via ceph::make_timespan)
   * @param osd_timeout osd op timeout (seconds, via ceph::make_timespan)
   */
  Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
	   Finisher *fin,
	   double mon_timeout,
	   double osd_timeout) :
    Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
    trace_endpoint("0.0.0.0", 0, "Objecter"),
    osdmap(new OSDMap), initialized(0), last_tid(0), client_inc(-1),
    max_linger_id(0), num_in_flight(0), global_op_flags(0),
    keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
    last_seen_osdmap_version(0), last_seen_pgmap_version(0),
    logger(NULL), tick_event(0), m_request_state_hook(NULL),
    num_homeless_ops(0),
    homeless_session(new OSDSession(cct, -1)),
    mon_timeout(ceph::make_timespan(mon_timeout)),
    osd_timeout(ceph::make_timespan(osd_timeout)),
    op_throttle_bytes(cct, "objecter_bytes",
		      cct->_conf->objecter_inflight_op_bytes),
    op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
    epoch_barrier(0),
    retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
  { }
1992 ~Objecter() override;
1993
1994 void init();
1995 void start(const OSDMap *o = nullptr);
1996 void shutdown();
1997
1998 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
1999 // whatever functionality you want to use the OSDMap in a lambda like:
2000 //
2001 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2002 //
2003 // or
2004 //
2005 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2006 //
2007 // Do not call into something that will try to lock the OSDMap from
2008 // here or you will have great woe and misery.
2009
  // run 'cb' against the current OSDMap while holding rwlock shared;
  // see the usage comment above
  template<typename Callback, typename...Args>
  auto with_osdmap(Callback&& cb, Args&&... args) const ->
    decltype(cb(*osdmap, std::forward<Args>(args)...)) {
    shared_lock l(rwlock);
    return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
  }
2016
2017
2018 /**
2019 * Tell the objecter to throttle outgoing ops according to its
2020 * budget (in _conf). If you do this, ops can block, in
2021 * which case it will unlock client_lock and sleep until
2022 * incoming messages reduce the used budget low enough for
2023 * the ops to continue going; then it will lock client_lock again.
2024 */
  void set_balanced_budget() { keep_balanced_budget = true; }
  void unset_balanced_budget() { keep_balanced_budget = false; }

  // toggle whether ops respect the osdmap/pool full flags
  void set_honor_osdmap_full() { honor_osdmap_full = true; }
  void unset_honor_osdmap_full() { honor_osdmap_full = false; }

  // toggle attempting writes even when the cluster/pool is flagged full
  void set_osdmap_full_try() { osdmap_full_try = true; }
  void unset_osdmap_full_try() { osdmap_full_try = false; }
2033
2034 void _scan_requests(OSDSession *s,
2035 bool force_resend,
2036 bool cluster_full,
2037 map<int64_t, bool> *pool_full_map,
2038 map<ceph_tid_t, Op*>& need_resend,
2039 list<LingerOp*>& need_resend_linger,
2040 map<ceph_tid_t, CommandOp*>& need_resend_command,
2041 shunique_lock& sul);
2042
2043 int64_t get_object_hash_position(int64_t pool, const string& key,
2044 const string& ns);
2045 int64_t get_object_pg_hash_position(int64_t pool, const string& key,
2046 const string& ns);
2047
2048 // messages
2049 public:
2050 bool ms_dispatch(Message *m) override;
  /// this dispatcher can always accept fast dispatch
  bool ms_can_fast_dispatch_any() const override {
    return true;
  }
2054 bool ms_can_fast_dispatch(const Message *m) const override {
2055 switch (m->get_type()) {
2056 case CEPH_MSG_OSD_OPREPLY:
2057 case CEPH_MSG_WATCH_NOTIFY:
2058 return true;
2059 default:
2060 return false;
2061 }
2062 }
  /// the fast path simply reuses the regular dispatch logic
  void ms_fast_dispatch(Message *m) override {
    ms_dispatch(m);
  }
2066
2067 void handle_osd_op_reply(class MOSDOpReply *m);
2068 void handle_osd_backoff(class MOSDBackoff *m);
2069 void handle_watch_notify(class MWatchNotify *m);
2070 void handle_osd_map(class MOSDMap *m);
2071 void wait_for_osd_map();
2072
2073 int pool_snap_by_name(int64_t poolid,
2074 const char *snap_name,
2075 snapid_t *snap) const;
2076 int pool_snap_get_info(int64_t poolid, snapid_t snap,
2077 pool_snap_info_t *info) const;
2078 int pool_snap_list(int64_t poolid, vector<uint64_t> *snaps);
2079private:
2080
2081 // low-level
2082 void _op_submit(Op *op, shunique_lock& lc, ceph_tid_t *ptid);
2083 void _op_submit_with_budget(Op *op, shunique_lock& lc,
2084 ceph_tid_t *ptid,
2085 int *ctx_budget = NULL);
2086 inline void unregister_op(Op *op);
2087
2088 // public interface
2089public:
2090 void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
2091 bool is_active() {
2092 shared_lock l(rwlock);
2093 return !((!inflight_ops.read()) && linger_ops.empty() &&
2094 poolstat_ops.empty() && statfs_ops.empty());
2095 }
2096
2097 /**
2098 * Output in-flight requests
2099 */
2100 void _dump_active(OSDSession *s);
2101 void _dump_active();
2102 void dump_active();
2103 void dump_requests(Formatter *fmt);
2104 void _dump_ops(const OSDSession *s, Formatter *fmt);
2105 void dump_ops(Formatter *fmt);
2106 void _dump_linger_ops(const OSDSession *s, Formatter *fmt);
2107 void dump_linger_ops(Formatter *fmt);
2108 void _dump_command_ops(const OSDSession *s, Formatter *fmt);
2109 void dump_command_ops(Formatter *fmt);
2110 void dump_pool_ops(Formatter *fmt) const;
2111 void dump_pool_stat_ops(Formatter *fmt) const;
2112 void dump_statfs_ops(Formatter *fmt) const;
2113
  /// client incarnation accessors
  int get_client_incarnation() const { return client_inc.read(); }
  void set_client_incarnation(int inc) { client_inc.set(inc); }
2116
2117 bool have_map(epoch_t epoch);
2118 /// wait for epoch; true if we already have it
2119 bool wait_for_map(epoch_t epoch, Context *c, int err=0);
2120 void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
2121 void wait_for_latest_osdmap(Context *fin);
2122 void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
2123 void _get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
2124
  /** Get the current set of global op flags */
  int get_global_op_flags() { return global_op_flags.read(); }
  /** Add a flag to the global op flags; note this read-modify-write is
      not a truly atomic operation */
  void add_global_op_flags(int flag) {
    global_op_flags.set(global_op_flags.read() | flag);
  }
  /** Clear the passed flags from the global op flag set; like add, this
      read-modify-write is not a truly atomic operation */
  void clear_global_op_flag(int flags) {
    global_op_flags.set(global_op_flags.read() & ~flags);
  }
2136
  /// cancel an in-progress request with the given return code
private:
  // Internal variants: per-session cancel, and a scan across sessions.
  int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
  int _op_cancel(ceph_tid_t tid, int r);
public:
  int op_cancel(ceph_tid_t tid, int r);

  /**
   * Any write op which is in progress at the start of this call shall no
   * longer be in progress when this call ends. Operations started after the
   * start of this call may still be in progress when this call ends.
   *
   * @return the latest possible epoch in which a cancelled op could have
   * existed, or -1 if nothing was cancelled.
   */
  epoch_t op_cancel_writes(int r, int64_t pool=-1);
2153
2154 // commands
2155 void osd_command(int osd, const std::vector<string>& cmd,
2156 const bufferlist& inbl, ceph_tid_t *ptid,
2157 bufferlist *poutbl, string *prs, Context *onfinish) {
2158 assert(osd >= 0);
2159 CommandOp *c = new CommandOp(
2160 osd,
2161 cmd,
2162 inbl,
2163 poutbl,
2164 prs,
2165 onfinish);
2166 submit_command(c, ptid);
2167 }
2168 void pg_command(pg_t pgid, vector<string>& cmd,
2169 const bufferlist& inbl, ceph_tid_t *ptid,
2170 bufferlist *poutbl, string *prs, Context *onfinish) {
2171 CommandOp *c = new CommandOp(
2172 pgid,
2173 cmd,
2174 inbl,
2175 poutbl,
2176 prs,
2177 onfinish);
2178 submit_command(c, ptid);
2179 }
2180
  // mid-level helpers
  // Build (without submitting) a write Op from a compound ObjectOperation.
  // Flags are combined with the global flags and CEPH_OSD_FLAG_WRITE.
  // Note: only out_rval is swapped out of 'op'; out_bl/out_handler stay
  // behind, unlike prepare_read_op() — presumably because a write reply
  // carries no output payload (TODO confirm).
  Op *prepare_mutate_op(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op, const SnapContext& snapc,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    osd_reqid_t reqid = osd_reqid_t(),
    ZTracer::Trace *parent_trace = nullptr) {
    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver, nullptr, parent_trace);
    o->priority = op.priority;
    o->mtime = mtime;
    o->snapc = snapc;
    o->out_rval.swap(op.out_rval);
    o->reqid = reqid;
    return o;
  }
2198 ceph_tid_t mutate(
2199 const object_t& oid, const object_locator_t& oloc,
2200 ObjectOperation& op, const SnapContext& snapc,
2201 ceph::real_time mtime, int flags,
2202 Context *oncommit, version_t *objver = NULL,
2203 osd_reqid_t reqid = osd_reqid_t()) {
2204 Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
2205 oncommit, objver, reqid);
2206 ceph_tid_t tid;
2207 op_submit(o, &tid);
2208 return tid;
2209 }
2210 Op *prepare_read_op(
2211 const object_t& oid, const object_locator_t& oloc,
2212 ObjectOperation& op,
2213 snapid_t snapid, bufferlist *pbl, int flags,
2214 Context *onack, version_t *objver = NULL,
2215 int *data_offset = NULL,
2216 uint64_t features = 0,
2217 ZTracer::Trace *parent_trace = nullptr) {
2218 Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() |
2219 CEPH_OSD_FLAG_READ, onack, objver, data_offset, parent_trace);
2220 o->priority = op.priority;
2221 o->snapid = snapid;
2222 o->outbl = pbl;
2223 if (!o->outbl && op.size() == 1 && op.out_bl[0]->length())
2224 o->outbl = op.out_bl[0];
2225 o->out_bl.swap(op.out_bl);
2226 o->out_handler.swap(op.out_handler);
2227 o->out_rval.swap(op.out_rval);
2228 return o;
2229 }
  // Submit a compound read built by prepare_read_op(); returns the tid.
  // 'features' is applied here (not forwarded to prepare_read_op).
  ceph_tid_t read(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op,
    snapid_t snapid, bufferlist *pbl, int flags,
    Context *onack, version_t *objver = NULL,
    int *data_offset = NULL,
    uint64_t features = 0) {
    Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver,
                            data_offset);
    if (features)
      o->features = features;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (without submitting) a read addressed to a PG rather than an
  // object: the target pgid is precalculated from (hash, pool) and the
  // overlay is ignored.  Used for PG-scoped listings.
  Op *prepare_pg_read_op(
    uint32_t hash, object_locator_t oloc,
    ObjectOperation& op, bufferlist *pbl, int flags,
    Context *onack, epoch_t *reply_epoch,
    int *ctx_budget) {
    Op *o = new Op(object_t(), oloc,
                   op.ops,
                   flags | global_op_flags.read() | CEPH_OSD_FLAG_READ |
                   CEPH_OSD_FLAG_IGNORE_OVERLAY,
                   onack, NULL);
    o->target.precalc_pgid = true;
    o->target.base_pgid = pg_t(hash, oloc.pool);
    o->priority = op.priority;
    o->snapid = CEPH_NOSNAP;
    o->outbl = pbl;
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->out_rval.swap(op.out_rval);
    o->reply_epoch = reply_epoch;
    if (ctx_budget) {
      // budget is tracked by listing context
      o->ctx_budgeted = true;
    }
    return o;
  }
2270 ceph_tid_t pg_read(
2271 uint32_t hash, object_locator_t oloc,
2272 ObjectOperation& op, bufferlist *pbl, int flags,
2273 Context *onack, epoch_t *reply_epoch,
2274 int *ctx_budget) {
2275 Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
2276 onack, reply_epoch, ctx_budget);
2277 ceph_tid_t tid;
2278 op_submit(o, &tid, ctx_budget);
2279 return tid;
2280 }
2281
  // Watch/notify ("linger") operations: long-lived ops re-sent across
  // map changes.
  // caller owns a ref
  LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
                            int flags);
  ceph_tid_t linger_watch(LingerOp *info,
                          ObjectOperation& op,
                          const SnapContext& snapc, ceph::real_time mtime,
                          bufferlist& inbl,
                          Context *onfinish,
                          version_t *objver);
  ceph_tid_t linger_notify(LingerOp *info,
                          ObjectOperation& op,
                          snapid_t snap, bufferlist& inbl,
                          bufferlist *poutbl,
                          Context *onack,
                          version_t *objver);
  int linger_check(LingerOp *info);
  void linger_cancel(LingerOp *info); // releases a reference
  void _linger_cancel(LingerOp *info);

  // Dispatch an incoming watch-notify message to its LingerOp.
  void _do_watch_notify(LingerOp *info, MWatchNotify *m);
2302
2303 /**
2304 * set up initial ops in the op vector, and allocate a final op slot.
2305 *
2306 * The caller is responsible for filling in the final ops_count ops.
2307 *
2308 * @param ops op vector
2309 * @param ops_count number of final ops the caller will fill in
2310 * @param extra_ops pointer to [array of] initial op[s]
2311 * @return index of final op (for caller to fill in)
2312 */
2313 int init_ops(vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) {
2314 int i;
2315 int extra = 0;
2316
2317 if (extra_ops)
2318 extra = extra_ops->ops.size();
2319
2320 ops.resize(ops_count + extra);
2321
2322 for (i=0; i<extra; i++) {
2323 ops[i] = extra_ops->ops[i];
2324 }
2325
2326 return i;
2327 }
2328
2329
  // high-level helpers
  // Build a STAT read; the C_Stat completion decodes size/mtime from the
  // reply buffer (o->outbl points at fin->bl) before calling onfinish.
  Op *prepare_stat_op(
    const object_t& oid, const object_locator_t& oloc,
    snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_STAT;
    C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    o->outbl = &fin->bl;
    return o;
  }
2346 ceph_tid_t stat(
2347 const object_t& oid, const object_locator_t& oloc,
2348 snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
2349 int flags, Context *onfinish, version_t *objver = NULL,
2350 ObjectOperation *extra_ops = NULL) {
2351 Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
2352 onfinish, objver, extra_ops);
2353 ceph_tid_t tid;
2354 op_submit(o, &tid);
2355 return tid;
2356 }
2357
  // Build an extent READ of [off, off+len) with no truncation interval
  // (truncate_size/seq zeroed); reply data lands in *pbl.
  Op *prepare_read_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_READ, onfinish, objver, nullptr, parent_trace);
    o->snapid = snap;
    o->outbl = pbl;
    return o;
  }
2378 ceph_tid_t read(
2379 const object_t& oid, const object_locator_t& oloc,
2380 uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
2381 int flags, Context *onfinish, version_t *objver = NULL,
2382 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2383 Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
2384 onfinish, objver, extra_ops, op_flags);
2385 ceph_tid_t tid;
2386 op_submit(o, &tid);
2387 return tid;
2388 }
2389
  // Build a CMPEXT op: compare the on-disk extent at 'off' against
  // cmp_bl (length taken from cmp_bl).  A READ-flagged op despite
  // carrying input data.
  Op *prepare_cmpext_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, bufferlist &cmp_bl,
    snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_CMPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = cmp_bl.length();
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = cmp_bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    return o;
  }
2409
2410 ceph_tid_t cmpext(
2411 const object_t& oid, const object_locator_t& oloc,
2412 uint64_t off, bufferlist &cmp_bl,
2413 snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
2414 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2415 Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
2416 flags, onfinish, objver, extra_ops, op_flags);
2417 ceph_tid_t tid;
2418 op_submit(o, &tid);
2419 return tid;
2420 }
2421
  // Extent READ carrying an explicit truncation interval
  // (trunc_size/trunc_seq), as used by file layouts; submits immediately.
  ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
                        uint64_t off, uint64_t len, snapid_t snap,
                        bufferlist *pbl, int flags, uint64_t trunc_size,
                        __u32 trunc_seq, Context *onfinish,
                        version_t *objver = NULL,
                        ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // MAPEXT: read the extent map (not the data) for [off, off+len);
  // submits immediately.
  ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc,
                    uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
                    int flags, Context *onfinish, version_t *objver = NULL,
                    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_MAPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // GETXATTR: the attribute name travels in indata with its length in
  // xattr.name_len; a NULL name yields an empty-name request.  Submits
  // immediately.
  ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
                      const char *name, snapid_t snap, bufferlist *pbl, int flags,
                      Context *onfinish,
                      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = 0;
    if (name)
      ops[i].indata.append(name);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2482
  // GETXATTRS: the C_GetAttrs completion decodes the reply buffer into
  // 'attrset' (o->outbl points at fin->bl) before calling onfinish.
  // Submits immediately.
  ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc,
                       snapid_t snap, map<string,bufferlist>& attrset,
                       int flags, Context *onfinish, version_t *objver = NULL,
                       ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
    C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    o->outbl = &fin->bl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2499
2500 ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc,
2501 snapid_t snap, bufferlist *pbl, int flags,
2502 Context *onfinish, version_t *objver = NULL,
2503 ObjectOperation *extra_ops = NULL) {
2504 return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags.read() |
2505 CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
2506 }
2507
2508
  // writes
  // Submit a raw vector of write sub-ops (caller already populated 'ops');
  // flags gain global flags plus CEPH_OSD_FLAG_WRITE.
  ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
                     vector<OSDOp>& ops, ceph::real_time mtime,
                     const SnapContext& snapc, int flags,
                     Context *oncommit,
                     version_t *objver = NULL) {
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (without submitting) an extent WRITE of 'bl' at [off, off+len),
  // with no truncation interval.
  Op *prepare_write_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, const SnapContext& snapc,
    const bufferlist &bl, ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITE;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver,
                   nullptr, parent_trace);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2546 ceph_tid_t write(
2547 const object_t& oid, const object_locator_t& oloc,
2548 uint64_t off, uint64_t len, const SnapContext& snapc,
2549 const bufferlist &bl, ceph::real_time mtime, int flags,
2550 Context *oncommit, version_t *objver = NULL,
2551 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2552 Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags,
2553 oncommit, objver, extra_ops, op_flags);
2554 ceph_tid_t tid;
2555 op_submit(o, &tid);
2556 return tid;
2557 }
  // Build (without submitting) an APPEND of 'bl'; offset is fixed at 0
  // because the OSD appends at the current object size.
  Op *prepare_append_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t len, const SnapContext& snapc,
    const bufferlist &bl, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_APPEND;
    ops[i].op.extent.offset = 0;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = bl;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2579 ceph_tid_t append(
2580 const object_t& oid, const object_locator_t& oloc,
2581 uint64_t len, const SnapContext& snapc,
2582 const bufferlist &bl, ceph::real_time mtime, int flags,
2583 Context *oncommit,
2584 version_t *objver = NULL,
2585 ObjectOperation *extra_ops = NULL) {
2586 Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags,
2587 oncommit, objver, extra_ops);
2588 ceph_tid_t tid;
2589 op_submit(o, &tid);
2590 return tid;
2591 }
  // Extent WRITE carrying an explicit truncation interval
  // (trunc_size/trunc_seq); submits immediately.
  ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
                         uint64_t off, uint64_t len, const SnapContext& snapc,
                         const bufferlist &bl, ceph::real_time mtime, int flags,
                         uint64_t trunc_size, __u32 trunc_seq,
                         Context *oncommit,
                         version_t *objver = NULL,
                         ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITE;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (without submitting) a WRITEFULL: replace the entire object
  // contents with 'bl' (offset 0, length = bl.length()).
  Op *prepare_write_full_op(
    const object_t& oid, const object_locator_t& oloc,
    const SnapContext& snapc, const bufferlist &bl,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
    ops[i].op.extent.offset = 0;
    ops[i].op.extent.length = bl.length();
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2635 ceph_tid_t write_full(
2636 const object_t& oid, const object_locator_t& oloc,
2637 const SnapContext& snapc, const bufferlist &bl,
2638 ceph::real_time mtime, int flags,
2639 Context *oncommit, version_t *objver = NULL,
2640 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2641 Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags,
2642 oncommit, objver, extra_ops, op_flags);
2643 ceph_tid_t tid;
2644 op_submit(o, &tid);
2645 return tid;
2646 }
  // Build (without submitting) a WRITESAME: replicate the data_length-byte
  // pattern in 'bl' across write_len bytes starting at 'off'.
  Op *prepare_writesame_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t write_len, uint64_t off,
    const SnapContext& snapc, const bufferlist &bl,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {

    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITESAME;
    ops[i].op.writesame.offset = off;
    ops[i].op.writesame.length = write_len;
    ops[i].op.writesame.data_length = bl.length();
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2669 ceph_tid_t writesame(
2670 const object_t& oid, const object_locator_t& oloc,
2671 uint64_t write_len, uint64_t off,
2672 const SnapContext& snapc, const bufferlist &bl,
2673 ceph::real_time mtime, int flags,
2674 Context *oncommit, version_t *objver = NULL,
2675 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
2676
2677 Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl,
2678 mtime, flags, oncommit, objver,
2679 extra_ops, op_flags);
2680
2681 ceph_tid_t tid;
2682 op_submit(o, &tid);
2683 return tid;
2684 }
  // TRUNCATE to trunc_size.  Both extent.offset and extent.truncate_size
  // carry trunc_size (the on-wire encoding for truncate); submits
  // immediately.
  ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
                   const SnapContext& snapc, ceph::real_time mtime, int flags,
                   uint64_t trunc_size, __u32 trunc_seq,
                   Context *oncommit, version_t *objver = NULL,
                   ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
    ops[i].op.extent.offset = trunc_size;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // ZERO the byte range [off, off+len); submits immediately.
  ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc,
                  uint64_t off, uint64_t len, const SnapContext& snapc,
                  ceph::real_time mtime, int flags, Context *oncommit,
                  version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_ZERO;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // ROLLBACK the object to the given snapshot; submits immediately.
  // NOTE(review): unlike every sibling helper, this passes bare
  // CEPH_OSD_FLAG_WRITE without ORing in global_op_flags.read() — looks
  // deliberate but worth confirming.
  ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
                             const SnapContext& snapc, snapid_t snapid,
                             ceph::real_time mtime, Context *oncommit,
                             version_t *objver = NULL,
                             ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
    ops[i].op.snap.snapid = snapid;
    Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // CREATE the object.  'create_flags' (e.g. exclusive-create) go into
  // the per-op flags field; 'global_flags' into the request flags.
  // Submits immediately.
  ceph_tid_t create(const object_t& oid, const object_locator_t& oloc,
                    const SnapContext& snapc, ceph::real_time mtime, int global_flags,
                    int create_flags, Context *oncommit,
                    version_t *objver = NULL,
                    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_CREATE;
    ops[i].op.flags = create_flags;
    Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (without submitting) a DELETE of the object.
  Op *prepare_remove_op(
    const object_t& oid, const object_locator_t& oloc,
    const SnapContext& snapc, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_DELETE;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2768 ceph_tid_t remove(
2769 const object_t& oid, const object_locator_t& oloc,
2770 const SnapContext& snapc, ceph::real_time mtime, int flags,
2771 Context *oncommit,
2772 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
2773 Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags,
2774 oncommit, objver, extra_ops);
2775 ceph_tid_t tid;
2776 op_submit(o, &tid);
2777 return tid;
2778 }
2779
  // SETXATTR: indata carries the attribute name followed by its value;
  // lengths are recorded in xattr.name_len/value_len.  Submits immediately.
  ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
                      const char *name, const SnapContext& snapc, const bufferlist &bl,
                      ceph::real_time mtime, int flags,
                      Context *oncommit,
                      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_SETXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = bl.length();
    if (name)
      ops[i].indata.append(name);
    ops[i].indata.append(bl);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // RMXATTR: remove the named attribute (name travels in indata).
  // Submits immediately.
  ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
                         const char *name, const SnapContext& snapc,
                         ceph::real_time mtime, int flags,
                         Context *oncommit,
                         version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_RMXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = 0;
    if (name)
      ops[i].indata.append(name);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2821
  // Object listing (namespace-aware) and paged enumeration over a PG/pool
  // hash range.
  void list_nobjects(NListContext *p, Context *onfinish);
  uint32_t list_nobjects_seek(NListContext *p, uint32_t pos);
  uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c);
  void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c);

  hobject_t enumerate_objects_begin();
  hobject_t enumerate_objects_end();
  //hobject_t enumerate_objects_begin(int n, int m);
  // Fetch up to 'max' objects in [start, end) of the given pool/namespace;
  // '*next' is set to the resume cursor for the following call.
  void enumerate_objects(
    int64_t pool_id,
    const std::string &ns,
    const hobject_t &start,
    const hobject_t &end,
    const uint32_t max,
    const bufferlist &filter_bl,
    std::list<librados::ListObjectImpl> *result,
    hobject_t *next,
    Context *on_finish);

  // Reply path for enumerate_objects (invoked via C_EnumerateReply).
  void _enumerate_reply(
    bufferlist &bl,
    int r,
    const hobject_t &end,
    const int64_t pool_id,
    int budget,
    epoch_t reply_epoch,
    std::list<librados::ListObjectImpl> *result,
    hobject_t *next,
    Context *on_finish);
  friend class C_EnumerateReply;
2852
  // -------------------------
  // pool ops
private:
  // Internal submission/completion plumbing for pool operations.
  void pool_op_submit(PoolOp *op);
  void _pool_op_submit(PoolOp *op);
  void _finish_pool_op(PoolOp *op, int r);
  void _do_delete_pool(int64_t pool, Context *onfinish);
public:
  int create_pool_snap(int64_t pool, string& snapName, Context *onfinish);
  int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
                                Context *onfinish);
  int delete_pool_snap(int64_t pool, string& snapName, Context *onfinish);
  int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish);

  int create_pool(string& name, Context *onfinish, uint64_t auid=0,
                  int crush_rule=-1);
  int delete_pool(int64_t pool, Context *onfinish);
  int delete_pool(const string& name, Context *onfinish);
  int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid);

  void handle_pool_op_reply(MPoolOpReply *m);
  int pool_op_cancel(ceph_tid_t tid, int r);

  // --------------------------
  // pool stats
private:
  void _poolstat_submit(PoolStatOp *op);
public:
  void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
  void get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result,
                      Context *onfinish);
  int pool_stat_op_cancel(ceph_tid_t tid, int r);
  void _finish_pool_stat_op(PoolStatOp *op, int r);

  // ---------------------------
  // df stats
private:
  void _fs_stats_submit(StatfsOp *op);
public:
  void handle_fs_stats_reply(MStatfsReply *m);
  void get_fs_stats(struct ceph_statfs& result, Context *onfinish);
  int statfs_op_cancel(ceph_tid_t tid, int r);
  void _finish_statfs_op(StatfsOp *op, int r);
2896
  // ---------------------------
  // some scatter/gather hackery

  // Reassemble per-extent read results into the caller's single buffer,
  // then complete onfinish.
  void _sg_read_finish(vector<ObjectExtent>& extents,
                       vector<bufferlist>& resultbl,
                       bufferlist *bl, Context *onfinish);
2903
  // Gather-finisher for multi-extent reads: on completion, stitches the
  // per-extent buffers back together via _sg_read_finish().  The
  // constructor swap()s the extent and result vectors, stealing the
  // caller's contents instead of copying them.
  struct C_SGRead : public Context {
    Objecter *objecter;
    vector<ObjectExtent> extents;
    vector<bufferlist> resultbl;
    bufferlist *bl;          // caller's destination buffer
    Context *onfinish;       // caller's completion
    C_SGRead(Objecter *ob,
             vector<ObjectExtent>& e, vector<bufferlist>& r, bufferlist *b,
             Context *c) :
      objecter(ob), bl(b), onfinish(c) {
      extents.swap(e);
      resultbl.swap(r);
    }
    void finish(int r) override {
      objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
    }
  };
2921
  // Scatter/gather read across one or more object extents.  The
  // single-extent case is issued directly; otherwise one read per extent
  // is gathered and C_SGRead reassembles the result into *bl.
  void sg_read_trunc(vector<ObjectExtent>& extents, snapid_t snap,
                     bufferlist *bl, int flags, uint64_t trunc_size,
                     __u32 trunc_seq, Context *onfinish, int op_flags = 0) {
    if (extents.size() == 1) {
      read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
                 extents[0].length, snap, bl, flags, extents[0].truncate_size,
                 trunc_seq, onfinish, 0, 0, op_flags);
    } else {
      C_GatherBuilder gather(cct);
      vector<bufferlist> resultbl(extents.size());
      int i=0;
      for (vector<ObjectExtent>::iterator p = extents.begin();
           p != extents.end();
           ++p) {
        read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++],
                   flags, p->truncate_size, trunc_seq, gather.new_sub(),
                   0, 0, op_flags);
      }
      // C_SGRead swap()s extents/resultbl out of our locals.
      gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
      gather.activate();
    }
  }
2944
  // Scatter/gather read with no truncation interval (trunc_size/seq = 0).
  void sg_read(vector<ObjectExtent>& extents, snapid_t snap, bufferlist *bl,
               int flags, Context *onfinish, int op_flags = 0) {
    sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags);
  }
2949
  // Scatter/gather write: split the source buffer across object extents.
  // Single-extent writes go straight through; otherwise each extent's
  // slice is assembled from bl via buffer_extents and the commits are
  // gathered into oncommit.
  void sg_write_trunc(vector<ObjectExtent>& extents, const SnapContext& snapc,
                      const bufferlist& bl, ceph::real_time mtime, int flags,
                      uint64_t trunc_size, __u32 trunc_seq,
                      Context *oncommit, int op_flags = 0) {
    if (extents.size() == 1) {
      write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
                  extents[0].length, snapc, bl, mtime, flags,
                  extents[0].truncate_size, trunc_seq, oncommit,
                  0, 0, op_flags);
    } else {
      C_GatherBuilder gcom(cct, oncommit);
      for (vector<ObjectExtent>::iterator p = extents.begin();
           p != extents.end();
           ++p) {
        // Assemble this extent's payload from the (offset,length) pieces
        // it maps to in the caller's buffer.
        bufferlist cur;
        for (vector<pair<uint64_t,uint64_t> >::iterator bit
               = p->buffer_extents.begin();
             bit != p->buffer_extents.end();
             ++bit)
          bl.copy(bit->first, bit->second, cur);
        assert(cur.length() == p->length);
        write_trunc(p->oid, p->oloc, p->offset, p->length,
                    snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
                    oncommit ? gcom.new_sub():0,
                    0, 0, op_flags);
      }
      gcom.activate();
    }
  }
2979
  // Scatter/gather write with no truncation interval (trunc_size/seq = 0).
  void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc,
                const bufferlist& bl, ceph::real_time mtime, int flags,
                Context *oncommit, int op_flags = 0) {
    sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit,
                   op_flags);
  }
2986
  // Messenger dispatch hooks (connection lifecycle + auth).
  void ms_handle_connect(Connection *con) override;
  bool ms_handle_reset(Connection *con) override;
  void ms_handle_remote_reset(Connection *con) override;
  bool ms_handle_refused(Connection *con) override;
  bool ms_get_authorizer(int dest_type,
                         AuthAuthorizer **authorizer,
                         bool force_new) override;

  // Add/remove this client from the OSD blacklist.
  void blacklist_self(bool set);

private:
  // Ops are held back until the osdmap reaches this epoch.
  epoch_t epoch_barrier;
  bool retry_writes_after_first_reply;
public:
  void set_epoch_barrier(epoch_t epoch);

  // Perf counters for this Objecter instance.
  PerfCounters *get_logger() {
    return logger;
  }
3006};
3007
3008#endif