]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_OBJECTER_H | |
16 | #define CEPH_OBJECTER_H | |
17 | ||
18 | #include <condition_variable> | |
19 | #include <list> | |
20 | #include <map> | |
21 | #include <mutex> | |
22 | #include <memory> | |
23 | #include <sstream> | |
24 | #include <type_traits> | |
25 | ||
26 | #include <boost/thread/shared_mutex.hpp> | |
27 | ||
28 | #include "include/assert.h" | |
29 | #include "include/buffer.h" | |
30 | #include "include/types.h" | |
31 | #include "include/rados/rados_types.hpp" | |
32 | ||
33 | #include "common/admin_socket.h" | |
34 | #include "common/ceph_time.h" | |
35 | #include "common/ceph_timer.h" | |
36 | #include "common/Finisher.h" | |
37 | #include "common/shunique_lock.h" | |
38 | #include "common/zipkin_trace.h" | |
39 | ||
40 | #include "messages/MOSDOp.h" | |
41 | #include "osd/OSDMap.h" | |
42 | ||
43 | using namespace std; | |
44 | ||
45 | class Context; | |
46 | class Messenger; | |
47 | class OSDMap; | |
48 | class MonClient; | |
49 | class Message; | |
50 | class Finisher; | |
51 | ||
52 | class MPoolOpReply; | |
53 | ||
54 | class MGetPoolStatsReply; | |
55 | class MStatfsReply; | |
56 | class MCommandReply; | |
57 | class MWatchNotify; | |
58 | ||
59 | class PerfCounters; | |
60 | ||
61 | // ----------------------------------------- | |
62 | ||
63 | struct ObjectOperation { | |
64 | vector<OSDOp> ops; | |
65 | int flags; | |
66 | int priority; | |
67 | ||
68 | vector<bufferlist*> out_bl; | |
69 | vector<Context*> out_handler; | |
70 | vector<int*> out_rval; | |
71 | ||
  // Start with no flags and default priority.
  ObjectOperation() : flags(0), priority(0) {}
  // The operation owns any completion Contexts registered in
  // out_handler; delete the ones that were never consumed.
  ~ObjectOperation() {
    while (!out_handler.empty()) {
      delete out_handler.back();
      out_handler.pop_back();
    }
  }
79 | ||
80 | size_t size() { | |
81 | return ops.size(); | |
82 | } | |
83 | ||
  // Overwrite the per-op flags of the most recently added sub-op.
  // NOTE: the parameter shadows the member 'flags'; only the sub-op's
  // flag word is touched here, not the whole-operation flags.
  void set_last_op_flags(int flags) {
    assert(!ops.empty());
    ops.rbegin()->op.flags = flags;
  }
88 | ||
89 | class C_TwoContexts; | |
90 | /** | |
91 | * Add a callback to run when this operation completes, | |
92 | * after any other callbacks for it. | |
93 | */ | |
94 | void add_handler(Context *extra); | |
95 | ||
96 | OSDOp& add_op(int op) { | |
97 | int s = ops.size(); | |
98 | ops.resize(s+1); | |
99 | ops[s].op.op = op; | |
100 | out_bl.resize(s+1); | |
101 | out_bl[s] = NULL; | |
102 | out_handler.resize(s+1); | |
103 | out_handler[s] = NULL; | |
104 | out_rval.resize(s+1); | |
105 | out_rval[s] = NULL; | |
106 | return ops[s]; | |
107 | } | |
  // Append an extent-style sub-op covering [off, off+len); 'bl' is
  // consumed (claimed) as the op's input data.
  void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.extent.offset = off;
    osd_op.op.extent.length = len;
    osd_op.indata.claim_append(bl);
  }
  // Append a WRITESAME sub-op: replicate the pattern in 'bl' across
  // write_len bytes starting at 'off'.  'bl' is consumed.
  void add_writesame(int op, uint64_t off, uint64_t write_len,
		     bufferlist& bl) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.writesame.offset = off;
    osd_op.op.writesame.length = write_len;
    osd_op.op.writesame.data_length = bl.length();
    osd_op.indata.claim_append(bl);
  }
  // Append an xattr sub-op; indata carries the name followed by the
  // value.  A NULL name encodes a zero-length name (used by
  // RESETXATTRS).
  void add_xattr(int op, const char *name, const bufferlist& data) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
    osd_op.op.xattr.value_len = data.length();
    if (name)
      osd_op.indata.append(name);
    osd_op.indata.append(data);
  }
  // Append an xattr comparison sub-op (CMPXATTR): compare the stored
  // xattr against 'data' using cmp_op (eq/gt/...) and cmp_mode
  // (string/u64).
  void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
		     uint8_t cmp_mode, const bufferlist& data) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
    osd_op.op.xattr.value_len = data.length();
    osd_op.op.xattr.cmp_op = cmp_op;
    osd_op.op.xattr.cmp_mode = cmp_mode;
    if (name)
      osd_op.indata.append(name);
    osd_op.indata.append(data);
  }
  // Append a CALL sub-op invoking cls method cname.method with
  // 'indata'.  outbl/ctx/prval are registered in the parallel output
  // slots so the reply payload, completion callback, and return code
  // are routed back to the caller.
  void add_call(int op, const char *cname, const char *method,
		bufferlist &indata,
		bufferlist *outbl, Context *ctx, int *prval) {
    OSDOp& osd_op = add_op(op);

    unsigned p = ops.size() - 1;
    out_handler[p] = ctx;
    out_bl[p] = outbl;
    out_rval[p] = prval;

    osd_op.op.cls.class_len = strlen(cname);
    osd_op.op.cls.method_len = strlen(method);
    osd_op.op.cls.indata_len = indata.length();
    osd_op.indata.append(cname, osd_op.op.cls.class_len);
    osd_op.indata.append(method, osd_op.op.cls.method_len);
    osd_op.indata.append(indata);
  }
  // Append a PGLS/PGNLS sub-op: list up to 'count' objects, resuming
  // from 'cookie'; start_epoch guards against PG interval changes.
  void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
		epoch_t start_epoch) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.pgls.count = count;
    osd_op.op.pgls.start_epoch = start_epoch;
    ::encode(cookie, osd_op.indata);
  }
  // As add_pgls, but with a server-side "pg.filter" applied; the
  // encoded class/method names and filter precede the cookie in indata.
  void add_pgls_filter(int op, uint64_t count, const bufferlist& filter,
		       collection_list_handle_t cookie, epoch_t start_epoch) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.pgls.count = count;
    osd_op.op.pgls.start_epoch = start_epoch;
    string cname = "pg";
    string mname = "filter";
    ::encode(cname, osd_op.indata);
    ::encode(mname, osd_op.indata);
    osd_op.indata.append(filter);
    ::encode(cookie, osd_op.indata);
  }
  // Append an allocation hint advising the OSD of the expected object
  // size and write size.
  void add_alloc_hint(int op, uint64_t expected_object_size,
		      uint64_t expected_write_size,
		      uint32_t flags) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.alloc_hint.expected_object_size = expected_object_size;
    osd_op.op.alloc_hint.expected_write_size = expected_write_size;
    osd_op.op.alloc_hint.flags = flags;
  }
185 | ||
186 | // ------ | |
187 | ||
188 | // pg | |
  // List objects in a PG (legacy PGLS); a non-empty 'filter' selects
  // the filtered variant.  Marks the whole operation as a PG op.
  void pg_ls(uint64_t count, bufferlist& filter,
	     collection_list_handle_t cookie, epoch_t start_epoch) {
    if (filter.length() == 0)
      add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
    else
      add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie,
		      start_epoch);
    flags |= CEPH_OSD_FLAG_PGOP;
  }

  // Namespace-aware PG listing (PGNLS); otherwise as pg_ls().
  void pg_nls(uint64_t count, const bufferlist& filter,
	      collection_list_handle_t cookie, epoch_t start_epoch) {
    if (filter.length() == 0)
      add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch);
    else
      add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie,
		      start_epoch);
    flags |= CEPH_OSD_FLAG_PGOP;
  }
208 | ||
  // Queue a scrub-results listing: fetch up to max_to_get inconsistent
  // objects (resp. snapsets) after 'start_after'; *interval receives
  // the scrub interval the results belong to.  Defined out of line.
  void scrub_ls(const librados::object_id_t& start_after,
		uint64_t max_to_get,
		std::vector<librados::inconsistent_obj_t> *objects,
		uint32_t *interval,
		int *rval);
  void scrub_ls(const librados::object_id_t& start_after,
		uint64_t max_to_get,
		std::vector<librados::inconsistent_snapset_t> *objects,
		uint32_t *interval,
		int *rval);
219 | ||
220 | void create(bool excl) { | |
221 | OSDOp& o = add_op(CEPH_OSD_OP_CREATE); | |
222 | o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0); | |
223 | } | |
224 | ||
  // Completion for CEPH_OSD_OP_STAT: decodes (size, mtime) from the
  // reply and fans the mtime out into whichever representations the
  // caller asked for (real_time, time_t, timespec).  Any out pointer
  // may be NULL.
  struct C_ObjectOperation_stat : public Context {
    bufferlist bl;   // raw reply payload, filled via out_bl
    uint64_t *psize;
    ceph::real_time *pmtime;
    time_t *ptime;
    struct timespec *pts;
    int *prval;
    C_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts,
			   int *prval)
      : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {}
    void finish(int r) override {
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	try {
	  uint64_t size;
	  ceph::real_time mtime;
	  ::decode(size, p);
	  ::decode(mtime, p);
	  if (psize)
	    *psize = size;
	  if (pmtime)
	    *pmtime = mtime;
	  if (ptime)
	    *ptime = ceph::real_clock::to_time_t(mtime);
	  if (pts)
	    *pts = ceph::real_clock::to_timespec(mtime);
	} catch (buffer::error& e) {
	  // malformed reply: surface -EIO through the rval slot
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
  // stat: size + mtime as ceph::real_time.
  void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, pmtime, NULL, NULL,
							   prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
    out_rval[p] = prval;
  }
  // stat: size + mtime as time_t.
  void stat(uint64_t *psize, time_t *ptime, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, ptime, NULL,
							   prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
    out_rval[p] = prval;
  }
  // stat: size + mtime as struct timespec.
  void stat(uint64_t *psize, struct timespec *pts, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, NULL, pts,
							   prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
    out_rval[p] = prval;
  }
285 | // object cmpext | |
286 | struct C_ObjectOperation_cmpext : public Context { | |
287 | int *prval; | |
288 | C_ObjectOperation_cmpext(int *prval) | |
289 | : prval(prval) {} | |
290 | ||
291 | void finish(int r) { | |
292 | if (prval) | |
293 | *prval = r; | |
294 | } | |
295 | }; | |
296 | ||
  // Compare cmp_bl against the on-disk extent at 'off'; the rval
  // reports the comparison result.  cmp_bl is consumed.
  void cmpext(uint64_t off, bufferlist& cmp_bl, int *prval) {
    add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
    out_handler[p] = h;
    out_rval[p] = prval;
  }

  // Used by C API: same as above, but takes a raw buffer that is
  // copied into a temporary bufferlist first.
  void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) {
    bufferlist cmp_bl;
    cmp_bl.append(cmp_buf, cmp_len);
    add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval);
    out_handler[p] = h;
    out_rval[p] = prval;
  }
315 | ||
  // Queue a read of [off, off+len); the reply data lands in *pbl, the
  // return code in *prval, and 'ctx' (may be NULL) fires on completion.
  void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval,
	    Context* ctx) {
    bufferlist bl;   // reads carry no input payload
    add_data(CEPH_OSD_OP_READ, off, len, bl);
    unsigned p = ops.size() - 1;
    out_bl[p] = pbl;
    out_rval[p] = prval;
    out_handler[p] = ctx;
  }
325 | ||
  // Completion for CEPH_OSD_OP_SPARSE_READ: decodes the extent map and
  // the packed data from the reply.  NOTE: unlike most helpers here,
  // 'extents' and 'data_bl' are dereferenced unconditionally and must
  // be non-NULL.
  struct C_ObjectOperation_sparse_read : public Context {
    bufferlist bl;   // raw reply payload
    bufferlist *data_bl;
    std::map<uint64_t, uint64_t> *extents;
    int *prval;
    C_ObjectOperation_sparse_read(bufferlist *data_bl,
				  std::map<uint64_t, uint64_t> *extents,
				  int *prval)
      : data_bl(data_bl), extents(extents), prval(prval) {}
    void finish(int r) override {
      bufferlist::iterator iter = bl.begin();
      if (r >= 0) {
	try {
	  ::decode(*extents, iter);
	  ::decode(*data_bl, iter);
	} catch (buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
  // Sparse read of [off, off+len): *m receives the offset->length
  // extent map, *data_bl the concatenated data for those extents.
  void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t,uint64_t> *m,
		   bufferlist *data_bl, int *prval) {
    bufferlist bl;
    add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
    unsigned p = ops.size() - 1;
    C_ObjectOperation_sparse_read *h =
      new C_ObjectOperation_sparse_read(data_bl, m, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
    out_rval[p] = prval;
  }
  // Queue a write; truncate_size/truncate_seq implement the
  // truncation interlock used by the filesystem client.
  void write(uint64_t off, bufferlist& bl,
	     uint64_t truncate_size,
	     uint32_t truncate_seq) {
    add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
    OSDOp& o = *ops.rbegin();
    o.op.extent.truncate_size = truncate_size;
    o.op.extent.truncate_seq = truncate_seq;
  }
  // Plain write with no truncation interlock.
  void write(uint64_t off, bufferlist& bl) {
    write(off, bl, 0, 0);
  }
  // Replace the entire object contents with bl.
  void write_full(bufferlist& bl) {
    add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
  }
  // Replicate bl across write_len bytes starting at off.
  void writesame(uint64_t off, uint64_t write_len, bufferlist& bl) {
    add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl);
  }
  // Append bl to the end of the object.
  void append(bufferlist& bl) {
    add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
  }
  // Zero the byte range [off, off+len).
  void zero(uint64_t off, uint64_t len) {
    bufferlist bl;
    add_data(CEPH_OSD_OP_ZERO, off, len, bl);
  }
  // Truncate the object to 'off' bytes.
  void truncate(uint64_t off) {
    bufferlist bl;
    add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl);
  }
  // Delete the object.
  void remove() {
    bufferlist bl;
    add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
  }
  // Map allocated extents within [off, off+len).
  void mapext(uint64_t off, uint64_t len) {
    bufferlist bl;
    add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
  }
  // Sparse read variant with no decode helper attached; the raw reply
  // is left to the caller.
  void sparse_read(uint64_t off, uint64_t len) {
    bufferlist bl;
    add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
  }
399 | ||
  // Queue a CHECKSUM sub-op over [off, off+len) using algorithm 'type'
  // with per-chunk granularity 'chunk_size'; init_value_bl seeds the
  // checksum.  The encoded result lands in *pbl.
  void checksum(uint8_t type, const bufferlist &init_value_bl,
		uint64_t off, uint64_t len, size_t chunk_size,
		bufferlist *pbl, int *prval, Context *ctx) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
    osd_op.op.checksum.offset = off;
    osd_op.op.checksum.length = len;
    osd_op.op.checksum.type = type;
    osd_op.op.checksum.chunk_size = chunk_size;
    osd_op.indata.append(init_value_bl);

    unsigned p = ops.size() - 1;
    out_bl[p] = pbl;
    out_rval[p] = prval;
    out_handler[p] = ctx;
  }
415 | ||
416 | // object attrs | |
  // Fetch a single xattr value into *pbl.
  void getxattr(const char *name, bufferlist *pbl, int *prval) {
    bufferlist bl;   // no value payload on a get
    add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
    unsigned p = ops.size() - 1;
    out_bl[p] = pbl;
    out_rval[p] = prval;
  }
  // Decodes a map<string,bufferlist> reply (xattrs or omap values) and
  // optionally a trailing "truncated" flag.  If the OSD didn't send
  // the flag, it is inferred from whether the result hit max_entries.
  struct C_ObjectOperation_decodevals : public Context {
    uint64_t max_entries;   // request limit, used to infer truncation
    bufferlist bl;          // raw reply payload
    std::map<std::string,bufferlist> *pattrs;
    bool *ptruncated;
    int *prval;
    C_ObjectOperation_decodevals(uint64_t m, std::map<std::string,bufferlist> *pa,
				 bool *pt, int *pr)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
      if (ptruncated) {
	*ptruncated = false;
      }
    }
    void finish(int r) override {
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	try {
	  if (pattrs)
	    ::decode(*pattrs, p);
	  if (ptruncated) {
	    std::map<std::string,bufferlist> ignore;
	    if (!pattrs) {
	      // still must consume the map to reach the flag
	      ::decode(ignore, p);
	      pattrs = &ignore;
	    }
	    if (!p.end()) {
	      ::decode(*ptruncated, p);
	    } else {
	      // the OSD did not provide this. since old OSDs do not
	      // enforce omap result limits either, we can infer it from
	      // the size of the result
	      *ptruncated = (pattrs->size() == max_entries);
	    }
	  }
	}
	catch (buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
  // Decodes a set<string> reply (omap keys) and optionally a trailing
  // "truncated" flag, mirroring C_ObjectOperation_decodevals.
  struct C_ObjectOperation_decodekeys : public Context {
    uint64_t max_entries;   // request limit, used to infer truncation
    bufferlist bl;          // raw reply payload
    std::set<std::string> *pattrs;
    bool *ptruncated;
    int *prval;
    C_ObjectOperation_decodekeys(uint64_t m, std::set<std::string> *pa, bool *pt,
				 int *pr)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
      if (ptruncated) {
	*ptruncated = false;
      }
    }
    void finish(int r) override {
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	try {
	  if (pattrs)
	    ::decode(*pattrs, p);
	  if (ptruncated) {
	    std::set<std::string> ignore;
	    if (!pattrs) {
	      // still must consume the set to reach the flag
	      ::decode(ignore, p);
	      pattrs = &ignore;
	    }
	    if (!p.end()) {
	      ::decode(*ptruncated, p);
	    } else {
	      // the OSD did not provide this. since old OSDs do not
	      // enforce omap result limits either, we can infer it from
	      // the size of the result
	      *ptruncated = (pattrs->size() == max_entries);
	    }
	  }
	}
	catch (buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
508 | struct C_ObjectOperation_decodewatchers : public Context { | |
509 | bufferlist bl; | |
510 | list<obj_watch_t> *pwatchers; | |
511 | int *prval; | |
512 | C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr) | |
513 | : pwatchers(pw), prval(pr) {} | |
514 | void finish(int r) override { | |
515 | if (r >= 0) { | |
516 | bufferlist::iterator p = bl.begin(); | |
517 | try { | |
518 | obj_list_watch_response_t resp; | |
519 | ::decode(resp, p); | |
520 | if (pwatchers) { | |
521 | for (list<watch_item_t>::iterator i = resp.entries.begin() ; | |
522 | i != resp.entries.end() ; ++i) { | |
523 | obj_watch_t ow; | |
524 | ostringstream sa; | |
525 | sa << i->addr; | |
526 | strncpy(ow.addr, sa.str().c_str(), 256); | |
527 | ow.watcher_id = i->name.num(); | |
528 | ow.cookie = i->cookie; | |
529 | ow.timeout_seconds = i->timeout_seconds; | |
530 | pwatchers->push_back(ow); | |
531 | } | |
532 | } | |
533 | } | |
534 | catch (buffer::error& e) { | |
535 | if (prval) | |
536 | *prval = -EIO; | |
537 | } | |
538 | } | |
539 | } | |
540 | }; | |
  // Decodes an obj_list_snap_response_t into the public
  // librados::snap_set_t representation (per-clone id/snaps/overlap/
  // size plus the snap sequence).
  struct C_ObjectOperation_decodesnaps : public Context {
    bufferlist bl;   // raw reply payload
    librados::snap_set_t *psnaps;
    int *prval;
    C_ObjectOperation_decodesnaps(librados::snap_set_t *ps, int *pr)
      : psnaps(ps), prval(pr) {}
    void finish(int r) override {
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	try {
	  obj_list_snap_response_t resp;
	  ::decode(resp, p);
	  if (psnaps) {
	    psnaps->clones.clear();
	    for (vector<clone_info>::iterator ci = resp.clones.begin();
		 ci != resp.clones.end();
		 ++ci) {
	      librados::clone_info_t clone;

	      clone.cloneid = ci->cloneid;
	      clone.snaps.reserve(ci->snaps.size());
	      clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
				 ci->snaps.end());
	      clone.overlap = ci->overlap;
	      clone.size = ci->size;

	      psnaps->clones.push_back(clone);
	    }
	    psnaps->seq = resp.seq;
	  }
	} catch (buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
  // Fetch all xattrs into *pattrs; decode handled by
  // C_ObjectOperation_decodevals (no truncation flag for xattrs).
  void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) {
    add_op(CEPH_OSD_OP_GETXATTRS);
    if (pattrs || prval) {
      unsigned p = ops.size() - 1;
      C_ObjectOperation_decodevals *h
	= new C_ObjectOperation_decodevals(0, pattrs, nullptr, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }
  // Set one xattr from a bufferlist value.
  void setxattr(const char *name, const bufferlist& bl) {
    add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
  }
  // Set one xattr from a string value.
  void setxattr(const char *name, const string& s) {
    bufferlist bl;
    bl.append(s);
    add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
  }
  // Assert a comparison against a stored xattr (fails the op if the
  // comparison does not hold).
  void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
		const bufferlist& bl) {
    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
  }
  // Remove one xattr.
  void rmxattr(const char *name) {
    bufferlist bl;   // no value payload
    add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
  }
605 | void setxattrs(map<string, bufferlist>& attrs) { | |
606 | bufferlist bl; | |
607 | ::encode(attrs, bl); | |
608 | add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length()); | |
609 | } | |
  // Remove all xattrs whose names start with 'prefix' and install
  // 'attrs' in their place.
  void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {
    bufferlist bl;
    ::encode(attrs, bl);
    add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
  }
615 | ||
616 | // trivialmap | |
  // Apply an encoded tmap update transaction.
  void tmap_update(bufferlist& bl) {
    add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
  }
  // Replace the whole tmap with bl.
  void tmap_put(bufferlist& bl) {
    add_data(CEPH_OSD_OP_TMAPPUT, 0, bl.length(), bl);
  }
  // Fetch the whole tmap into *pbl.
  void tmap_get(bufferlist *pbl, int *prval) {
    add_op(CEPH_OSD_OP_TMAPGET);
    unsigned p = ops.size() - 1;
    out_bl[p] = pbl;
    out_rval[p] = prval;
  }
  // Fetch the tmap, discarding the payload (used for existence checks).
  void tmap_get() {
    add_op(CEPH_OSD_OP_TMAPGET);
  }
  // Convert a legacy tmap object to omap; with nullok the op succeeds
  // even if the object has no tmap.
  void tmap_to_omap(bool nullok=false) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_TMAP2OMAP);
    if (nullok)
      osd_op.op.tmap2omap.flags = CEPH_OSD_TMAP2OMAP_NULLOK;
  }
637 | ||
638 | // objectmap | |
  // List up to max_to_get omap keys after 'start_after' into *out_set;
  // *ptruncated reports whether more keys remain.
  void omap_get_keys(const string &start_after,
		     uint64_t max_to_get,
		     std::set<std::string> *out_set,
		     bool *ptruncated,
		     int *prval) {
    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
    bufferlist bl;
    ::encode(start_after, bl);
    ::encode(max_to_get, bl);
    // omap ops reuse the extent fields to describe the indata span
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval || ptruncated || out_set) {
      unsigned p = ops.size() - 1;
      C_ObjectOperation_decodekeys *h =
	new C_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }
660 | ||
  // List up to max_to_get omap key/value pairs after 'start_after',
  // restricted to keys beginning with filter_prefix; *ptruncated
  // reports whether more remain.
  void omap_get_vals(const string &start_after,
		     const string &filter_prefix,
		     uint64_t max_to_get,
		     std::map<std::string, bufferlist> *out_set,
		     bool *ptruncated,
		     int *prval) {
    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
    bufferlist bl;
    ::encode(start_after, bl);
    ::encode(max_to_get, bl);
    ::encode(filter_prefix, bl);
    // omap ops reuse the extent fields to describe the indata span
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval || out_set || ptruncated) {
      unsigned p = ops.size() - 1;
      C_ObjectOperation_decodevals *h =
	new C_ObjectOperation_decodevals(max_to_get, out_set, ptruncated, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }
684 | ||
  // Fetch the omap values for an explicit key set; missing keys are
  // simply absent from *out_set.
  void omap_get_vals_by_keys(const std::set<std::string> &to_get,
			     std::map<std::string, bufferlist> *out_set,
			     int *prval) {
    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
    bufferlist bl;
    ::encode(to_get, bl);
    // omap ops reuse the extent fields to describe the indata span
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval || out_set) {
      unsigned p = ops.size() - 1;
      C_ObjectOperation_decodevals *h =
	new C_ObjectOperation_decodevals(0, out_set, nullptr, prval);
      out_handler[p] = h;
      out_bl[p] = &h->bl;
      out_rval[p] = prval;
    }
  }
703 | ||
  // Assert omap values match: each key maps to (expected value,
  // comparison op); the whole operation fails if any assertion does.
  void omap_cmp(const std::map<std::string, pair<bufferlist,int> > &assertions,
		int *prval) {
    OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
    bufferlist bl;
    ::encode(assertions, bl);
    // omap ops reuse the extent fields to describe the indata span
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval) {
      unsigned p = ops.size() - 1;
      out_rval[p] = prval;
    }
  }
717 | ||
  // Completion for CEPH_OSD_OP_COPY_GET: decodes an object_copy_data_t
  // and scatters each field into the caller-provided out pointers (any
  // of which may be NULL).  On -ENOENT only the reqid list is copied,
  // since the OSD still reports reqids for deleted objects.
  struct C_ObjectOperation_copyget : public Context {
    bufferlist bl;   // raw reply payload
    object_copy_cursor_t *cursor;
    uint64_t *out_size;
    ceph::real_time *out_mtime;
    std::map<std::string,bufferlist> *out_attrs;
    bufferlist *out_data, *out_omap_header, *out_omap_data;
    vector<snapid_t> *out_snaps;
    snapid_t *out_snap_seq;
    uint32_t *out_flags;
    uint32_t *out_data_digest;
    uint32_t *out_omap_digest;
    mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids;
    uint64_t *out_truncate_seq;
    uint64_t *out_truncate_size;
    int *prval;
    C_ObjectOperation_copyget(object_copy_cursor_t *c,
			      uint64_t *s,
			      ceph::real_time *m,
			      std::map<std::string,bufferlist> *a,
			      bufferlist *d, bufferlist *oh,
			      bufferlist *o,
			      std::vector<snapid_t> *osnaps,
			      snapid_t *osnap_seq,
			      uint32_t *flags,
			      uint32_t *dd,
			      uint32_t *od,
			      mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *oreqids,
			      uint64_t *otseq,
			      uint64_t *otsize,
			      int *r)
      : cursor(c),
	out_size(s), out_mtime(m),
	out_attrs(a), out_data(d), out_omap_header(oh),
	out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
	out_flags(flags), out_data_digest(dd), out_omap_digest(od),
	out_reqids(oreqids),
	out_truncate_seq(otseq),
	out_truncate_size(otsize),
	prval(r) {}
    void finish(int r) override {
      // reqids are copied on ENOENT
      if (r < 0 && r != -ENOENT)
	return;
      try {
	bufferlist::iterator p = bl.begin();
	object_copy_data_t copy_reply;
	::decode(copy_reply, p);
	if (r == -ENOENT) {
	  if (out_reqids)
	    *out_reqids = copy_reply.reqids;
	  return;
	}
	if (out_size)
	  *out_size = copy_reply.size;
	if (out_mtime)
	  *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
	if (out_attrs)
	  *out_attrs = copy_reply.attrs;
	if (out_data)
	  out_data->claim_append(copy_reply.data);
	if (out_omap_header)
	  out_omap_header->claim_append(copy_reply.omap_header);
	if (out_omap_data)
	  *out_omap_data = copy_reply.omap_data;
	if (out_snaps)
	  *out_snaps = copy_reply.snaps;
	if (out_snap_seq)
	  *out_snap_seq = copy_reply.snap_seq;
	if (out_flags)
	  *out_flags = copy_reply.flags;
	if (out_data_digest)
	  *out_data_digest = copy_reply.data_digest;
	if (out_omap_digest)
	  *out_omap_digest = copy_reply.omap_digest;
	if (out_reqids)
	  *out_reqids = copy_reply.reqids;
	if (out_truncate_seq)
	  *out_truncate_seq = copy_reply.truncate_seq;
	if (out_truncate_size)
	  *out_truncate_size = copy_reply.truncate_size;
	// always advance the cursor so the caller can resume the copy
	*cursor = copy_reply.cursor;
      } catch (buffer::error& e) {
	if (prval)
	  *prval = -EIO;
      }
    }
  };
806 | ||
  // Queue a COPY_GET sub-op retrieving up to 'max' bytes of object
  // data/omap per call; see C_ObjectOperation_copyget for how the
  // reply is scattered into the out parameters.  'cursor' is both the
  // resume point sent to the OSD and the slot updated from the reply.
  void copy_get(object_copy_cursor_t *cursor,
		uint64_t max,
		uint64_t *out_size,
		ceph::real_time *out_mtime,
		std::map<std::string,bufferlist> *out_attrs,
		bufferlist *out_data,
		bufferlist *out_omap_header,
		bufferlist *out_omap_data,
		vector<snapid_t> *out_snaps,
		snapid_t *out_snap_seq,
		uint32_t *out_flags,
		uint32_t *out_data_digest,
		uint32_t *out_omap_digest,
		mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids,
		uint64_t *truncate_seq,
		uint64_t *truncate_size,
		int *prval) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
    osd_op.op.copy_get.max = max;
    ::encode(*cursor, osd_op.indata);
    ::encode(max, osd_op.indata);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_copyget *h =
      new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
				    out_attrs, out_data, out_omap_header,
				    out_omap_data, out_snaps, out_snap_seq,
				    out_flags, out_data_digest,
				    out_omap_digest, out_reqids, truncate_seq,
				    truncate_size, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }
840 | ||
  // Clear the object's cache-tier dirty flag.
  void undirty() {
    add_op(CEPH_OSD_OP_UNDIRTY);
  }
844 | ||
  // Completion for CEPH_OSD_OP_ISDIRTY: decodes the boolean dirty flag
  // from the reply.  On error nothing is written (the op's rval is
  // routed separately via out_rval).
  struct C_ObjectOperation_isdirty : public Context {
    bufferlist bl;   // raw reply payload
    bool *pisdirty;
    int *prval;
    C_ObjectOperation_isdirty(bool *p, int *r)
      : pisdirty(p), prval(r) {}
    void finish(int r) override {
      if (r < 0)
	return;
      try {
	bufferlist::iterator p = bl.begin();
	bool isdirty;
	::decode(isdirty, p);
	if (pisdirty)
	  *pisdirty = isdirty;
      } catch (buffer::error& e) {
	if (prval)
	  *prval = -EIO;
      }
    }
  };
866 | ||
  // Query the object's cache-tier dirty flag into *pisdirty.
  void is_dirty(bool *pisdirty, int *prval) {
    add_op(CEPH_OSD_OP_ISDIRTY);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_isdirty *h =
      new C_ObjectOperation_isdirty(pisdirty, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }
876 | ||
  // Completion for PG_HITSET_LS: decodes the interval list and reports
  // it either as real_time pairs (putls) or, for the legacy interface,
  // as time_t pairs (ptls) with the start rounded up to a full second.
  struct C_ObjectOperation_hit_set_ls : public Context {
    bufferlist bl;   // raw reply payload
    std::list< std::pair<time_t, time_t> > *ptls;
    std::list< std::pair<ceph::real_time, ceph::real_time> > *putls;
    int *prval;
    C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
				 std::list< std::pair<ceph::real_time,
						      ceph::real_time> > *ut,
				 int *r)
      : ptls(t), putls(ut), prval(r) {}
    void finish(int r) override {
      if (r < 0)
	return;
      try {
	bufferlist::iterator p = bl.begin();
	std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
	::decode(ls, p);
	if (ptls) {
	  ptls->clear();
	  for (auto p = ls.begin(); p != ls.end(); ++p)
	    // round initial timestamp up to the next full second to
	    // keep this a valid interval.
	    ptls->push_back(
	      make_pair(ceph::real_clock::to_time_t(
			  ceph::ceil(p->first,
				     // Sadly, no time literals until C++14.
				     std::chrono::seconds(1))),
			ceph::real_clock::to_time_t(p->second)));
	}
	if (putls)
	  putls->swap(ls);
      } catch (buffer::error& e) {
	r = -EIO;
      }
      if (prval)
	*prval = r;
    }
  };
915 | ||
916 | /** | |
917 | * list available HitSets. | |
918 | * | |
919 | * We will get back a list of time intervals. Note that the most | |
920 | * recent range may have an empty end timestamp if it is still | |
921 | * accumulating. | |
922 | * | |
923 | * @param pls [out] list of time intervals | |
924 | * @param prval [out] return value | |
925 | */ | |
926 | void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) { | |
927 | add_op(CEPH_OSD_OP_PG_HITSET_LS); | |
928 | unsigned p = ops.size() - 1; | |
929 | out_rval[p] = prval; | |
930 | C_ObjectOperation_hit_set_ls *h = | |
931 | new C_ObjectOperation_hit_set_ls(pls, NULL, prval); | |
932 | out_bl[p] = &h->bl; | |
933 | out_handler[p] = h; | |
934 | } | |
935 | void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls, | |
936 | int *prval) { | |
937 | add_op(CEPH_OSD_OP_PG_HITSET_LS); | |
938 | unsigned p = ops.size() - 1; | |
939 | out_rval[p] = prval; | |
940 | C_ObjectOperation_hit_set_ls *h = | |
941 | new C_ObjectOperation_hit_set_ls(NULL, pls, prval); | |
942 | out_bl[p] = &h->bl; | |
943 | out_handler[p] = h; | |
944 | } | |
945 | ||
946 | /** | |
947 | * get HitSet | |
948 | * | |
949 | * Return an encoded HitSet that includes the provided time | |
950 | * interval. | |
951 | * | |
952 | * @param stamp [in] timestamp | |
953 | * @param pbl [out] target buffer for encoded HitSet | |
954 | * @param prval [out] return value | |
955 | */ | |
956 | void hit_set_get(ceph::real_time stamp, bufferlist *pbl, int *prval) { | |
957 | OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET); | |
958 | op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp); | |
959 | unsigned p = ops.size() - 1; | |
960 | out_rval[p] = prval; | |
961 | out_bl[p] = pbl; | |
962 | } | |
963 | ||
964 | void omap_get_header(bufferlist *bl, int *prval) { | |
965 | add_op(CEPH_OSD_OP_OMAPGETHEADER); | |
966 | unsigned p = ops.size() - 1; | |
967 | out_bl[p] = bl; | |
968 | out_rval[p] = prval; | |
969 | } | |
970 | ||
971 | void omap_set(const map<string, bufferlist> &map) { | |
972 | bufferlist bl; | |
973 | ::encode(map, bl); | |
974 | add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl); | |
975 | } | |
976 | ||
  /// Replace the object's omap header with the contents of @p bl.
  void omap_set_header(bufferlist &bl) {
    add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
  }
980 | ||
  /// Remove all omap keys/values from the object.
  void omap_clear() {
    add_op(CEPH_OSD_OP_OMAPCLEAR);
  }
984 | ||
985 | void omap_rm_keys(const std::set<std::string> &to_remove) { | |
986 | bufferlist bl; | |
987 | ::encode(to_remove, bl); | |
988 | add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl); | |
989 | } | |
990 | ||
991 | // object classes | |
  /// Invoke object-class method @p cname.@p method with @p indata,
  /// discarding output, completion context, and return value.
  void call(const char *cname, const char *method, bufferlist &indata) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
  }
995 | ||
  /// Invoke object-class method @p cname.@p method with @p indata,
  /// capturing output data, a completion context, and the return value.
  void call(const char *cname, const char *method, bufferlist &indata,
	    bufferlist *outdata, Context *ctx, int *prval) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
  }
1000 | ||
1001 | // watch/notify | |
1002 | void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) { | |
1003 | OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH); | |
1004 | osd_op.op.watch.cookie = cookie; | |
1005 | osd_op.op.watch.op = op; | |
1006 | osd_op.op.watch.timeout = timeout; | |
1007 | } | |
1008 | ||
  /// Send a notify on the object.  The payload appended to *inbl is
  /// (prot_ver, timeout, bl) — the encode order is wire format, do not
  /// reorder.
  void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
	      bufferlist &bl, bufferlist *inbl) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
    osd_op.op.notify.cookie = cookie;
    ::encode(prot_ver, *inbl);
    ::encode(timeout, *inbl);
    ::encode(bl, *inbl);
    osd_op.indata.append(*inbl);
  }
1018 | ||
1019 | void notify_ack(uint64_t notify_id, uint64_t cookie, | |
1020 | bufferlist& reply_bl) { | |
1021 | OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK); | |
1022 | bufferlist bl; | |
1023 | ::encode(notify_id, bl); | |
1024 | ::encode(cookie, bl); | |
1025 | ::encode(reply_bl, bl); | |
1026 | osd_op.indata.append(bl); | |
1027 | } | |
1028 | ||
1029 | void list_watchers(list<obj_watch_t> *out, | |
1030 | int *prval) { | |
1031 | (void)add_op(CEPH_OSD_OP_LIST_WATCHERS); | |
1032 | if (prval || out) { | |
1033 | unsigned p = ops.size() - 1; | |
1034 | C_ObjectOperation_decodewatchers *h = | |
1035 | new C_ObjectOperation_decodewatchers(out, prval); | |
1036 | out_handler[p] = h; | |
1037 | out_bl[p] = &h->bl; | |
1038 | out_rval[p] = prval; | |
1039 | } | |
1040 | } | |
1041 | ||
1042 | void list_snaps(librados::snap_set_t *out, int *prval) { | |
1043 | (void)add_op(CEPH_OSD_OP_LIST_SNAPS); | |
1044 | if (prval || out) { | |
1045 | unsigned p = ops.size() - 1; | |
1046 | C_ObjectOperation_decodesnaps *h = | |
1047 | new C_ObjectOperation_decodesnaps(out, prval); | |
1048 | out_handler[p] = h; | |
1049 | out_bl[p] = &h->bl; | |
1050 | out_rval[p] = prval; | |
1051 | } | |
1052 | } | |
1053 | ||
1054 | void assert_version(uint64_t ver) { | |
1055 | OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER); | |
1056 | osd_op.op.assert_ver.ver = ver; | |
1057 | } | |
1058 | ||
1059 | void cmpxattr(const char *name, const bufferlist& val, | |
1060 | int op, int mode) { | |
1061 | add_xattr(CEPH_OSD_OP_CMPXATTR, name, val); | |
1062 | OSDOp& o = *ops.rbegin(); | |
1063 | o.op.xattr.cmp_op = op; | |
1064 | o.op.xattr.cmp_mode = mode; | |
1065 | } | |
1066 | ||
1067 | void rollback(uint64_t snapid) { | |
1068 | OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK); | |
1069 | osd_op.op.snap.snapid = snapid; | |
1070 | } | |
1071 | ||
  /**
   * Copy content into this object from a source object.
   *
   * @param src source object name
   * @param snapid source snapshot id
   * @param src_oloc source object locator
   * @param src_version expected version of the source
   * @param flags copy-from op flags
   * @param src_fadvise_flags fadvise flags applied to the source side
   */
  void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
		 version_t src_version, unsigned flags,
		 unsigned src_fadvise_flags) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
    osd_op.op.copy_from.snapid = snapid;
    osd_op.op.copy_from.src_version = src_version;
    osd_op.op.copy_from.flags = flags;
    osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
    // src and its locator travel in indata, in this order
    ::encode(src, osd_op.indata);
    ::encode(src_oloc, osd_op.indata);
  }
1083 | ||
1084 | /** | |
1085 | * writeback content to backing tier | |
1086 | * | |
1087 | * If object is marked dirty in the cache tier, write back content | |
1088 | * to backing tier. If the object is clean this is a no-op. | |
1089 | * | |
1090 | * If writeback races with an update, the update will block. | |
1091 | * | |
1092 | * use with IGNORE_CACHE to avoid triggering promote. | |
1093 | */ | |
  void cache_flush() {
    // blocking flush; semantics documented in the block comment above
    add_op(CEPH_OSD_OP_CACHE_FLUSH);
  }
1097 | ||
1098 | /** | |
1099 | * writeback content to backing tier | |
1100 | * | |
1101 | * If object is marked dirty in the cache tier, write back content | |
1102 | * to backing tier. If the object is clean this is a no-op. | |
1103 | * | |
1104 | * If writeback races with an update, return EAGAIN. Requires that | |
1105 | * the SKIPRWLOCKS flag be set. | |
1106 | * | |
1107 | * use with IGNORE_CACHE to avoid triggering promote. | |
1108 | */ | |
  void cache_try_flush() {
    // non-blocking flush (EAGAIN on race); see block comment above
    add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
  }
1112 | ||
1113 | /** | |
1114 | * evict object from cache tier | |
1115 | * | |
1116 | * If object is marked clean, remove the object from the cache tier. | |
1117 | * Otherwise, return EBUSY. | |
1118 | * | |
1119 | * use with IGNORE_CACHE to avoid triggering promote. | |
1120 | */ | |
  void cache_evict() {
    // evict a clean object from the cache tier; see block comment above
    add_op(CEPH_OSD_OP_CACHE_EVICT);
  }
1124 | ||
31f18b77 FG |
1125 | /* |
1126 | * Extensible tier | |
1127 | */ | |
  /// Redirect this object to target object @p tgt / @p tgt_oloc.
  /// NOTE: reuses the copy_from fields of the op union to carry the
  /// snapid and target version.
  void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc,
		    version_t tgt_version) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
    osd_op.op.copy_from.snapid = snapid;
    osd_op.op.copy_from.src_version = tgt_version;
    ::encode(tgt, osd_op.indata);
    ::encode(tgt_oloc, osd_op.indata);
  }
1136 | ||
7c673cae FG |
  /// Advise the OSD of the expected object size / write size.
  /// Purely advisory: failures from OSDs that don't know the opcode
  /// are ignored via the per-op FAILOK flag.
  void set_alloc_hint(uint64_t expected_object_size,
		      uint64_t expected_write_size,
		      uint32_t flags) {
    add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size,
		   expected_write_size, flags);

    // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
    // not worth a feature bit.  Set FAILOK per-op flag to make
    // sure older osds don't trip over an unsupported opcode.
    set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
  }
1148 | ||
  /// Copy the op vector from @p sops and reset our per-op out state.
  /// NOTE(review): out_bl/out_rval point into the *source* vector's
  /// outdata/rval fields, not our copy — so the caller must keep sops
  /// alive for the duration of the operation; confirm at call sites.
  void dup(vector<OSDOp>& sops) {
    ops = sops;
    out_bl.resize(sops.size());
    out_handler.resize(sops.size());
    out_rval.resize(sops.size());
    for (uint32_t i = 0; i < sops.size(); i++) {
      out_bl[i] = &sops[i].outdata;
      out_handler[i] = NULL;
      out_rval[i] = &sops[i].rval;
    }
  }
1160 | ||
1161 | /** | |
1162 | * Pin/unpin an object in cache tier | |
1163 | */ | |
  /// Pin the object in the cache tier.
  void cache_pin() {
    add_op(CEPH_OSD_OP_CACHE_PIN);
  }
1167 | ||
  /// Unpin a previously pinned object in the cache tier.
  void cache_unpin() {
    add_op(CEPH_OSD_OP_CACHE_UNPIN);
  }
1171 | }; | |
1172 | ||
1173 | ||
1174 | // ---------------- | |
1175 | ||
1176 | ||
1177 | class Objecter : public md_config_obs_t, public Dispatcher { | |
1178 | public: | |
1179 | // config observer bits | |
1180 | const char** get_tracked_conf_keys() const override; | |
1181 | void handle_conf_change(const struct md_config_t *conf, | |
1182 | const std::set <std::string> &changed) override; | |
1183 | ||
1184 | public: | |
1185 | Messenger *messenger; | |
1186 | MonClient *monc; | |
1187 | Finisher *finisher; | |
1188 | ZTracer::Endpoint trace_endpoint; | |
1189 | private: | |
1190 | OSDMap *osdmap; | |
1191 | public: | |
1192 | using Dispatcher::cct; | |
1193 | std::multimap<string,string> crush_location; | |
1194 | ||
31f18b77 | 1195 | std::atomic<bool> initialized{false}; |
7c673cae FG |
1196 | |
1197 | private: | |
31f18b77 FG |
1198 | std::atomic<uint64_t> last_tid{0}; |
1199 | std::atomic<unsigned> inflight_ops{0}; | |
1200 | std::atomic<int> client_inc{-1}; | |
7c673cae | 1201 | uint64_t max_linger_id; |
31f18b77 FG |
1202 | std::atomic<unsigned> num_in_flight{0}; |
1203 | std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op | |
7c673cae FG |
1204 | bool keep_balanced_budget; |
1205 | bool honor_osdmap_full; | |
1206 | bool osdmap_full_try; | |
1207 | ||
31f18b77 FG |
1208 | // If this is true, accumulate a set of blacklisted entities |
1209 | // to be drained by consume_blacklist_events. | |
1210 | bool blacklist_events_enabled; | |
1211 | std::set<entity_addr_t> blacklist_events; | |
1212 | ||
7c673cae FG |
1213 | public: |
1214 | void maybe_request_map(); | |
31f18b77 FG |
1215 | |
1216 | void enable_blacklist_events(); | |
7c673cae FG |
1217 | private: |
1218 | ||
1219 | void _maybe_request_map(); | |
1220 | ||
1221 | version_t last_seen_osdmap_version; | |
1222 | version_t last_seen_pgmap_version; | |
1223 | ||
1224 | mutable boost::shared_mutex rwlock; | |
1225 | using lock_guard = std::unique_lock<decltype(rwlock)>; | |
1226 | using unique_lock = std::unique_lock<decltype(rwlock)>; | |
1227 | using shared_lock = boost::shared_lock<decltype(rwlock)>; | |
1228 | using shunique_lock = ceph::shunique_lock<decltype(rwlock)>; | |
1229 | ceph::timer<ceph::mono_clock> timer; | |
1230 | ||
1231 | PerfCounters *logger; | |
1232 | ||
1233 | uint64_t tick_event; | |
1234 | ||
1235 | void start_tick(); | |
1236 | void tick(); | |
1237 | void update_crush_location(); | |
1238 | ||
1239 | class RequestStateHook; | |
1240 | ||
1241 | RequestStateHook *m_request_state_hook; | |
1242 | ||
1243 | public: | |
1244 | /*** track pending operations ***/ | |
1245 | // read | |
1246 | public: | |
1247 | ||
1248 | struct OSDSession; | |
1249 | ||
  /// Object -> PG -> OSD mapping state remembered for one request, so
  /// resends can detect when the mapping has changed.
  struct op_target_t {
    int flags = 0;

    epoch_t epoch = 0;  ///< latest epoch we calculated the mapping

    object_t base_oid;
    object_locator_t base_oloc;
    object_t target_oid;
    object_locator_t target_oloc;

    ///< true if we are directed at base_pgid, not base_oid
    bool precalc_pgid = false;

    ///< true if we have ever mapped to a valid pool
    bool pool_ever_existed = false;

    ///< explicit pg target, if any
    pg_t base_pgid;

    pg_t pgid; ///< last (raw) pg we mapped to
    spg_t actual_pgid; ///< last (actual) spg_t we mapped to
    unsigned pg_num = 0; ///< last pg_num we mapped to
    unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
    vector<int> up; ///< set of up osds for last pg we mapped to
    vector<int> acting; ///< set of acting osds for last pg we mapped to
    int up_primary = -1; ///< last up_primary we mapped to
    int acting_primary = -1;  ///< last acting_primary we mapped to
    int size = -1; ///< the size of the pool when we were last mapped
    int min_size = -1; ///< the min size of the pool when we were last mapped
    bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise
    bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering

    bool used_replica = false;
    bool paused = false;

    int osd = -1;      ///< the final target osd, or -1

    epoch_t last_force_resend = 0;

    op_target_t(object_t oid, object_locator_t oloc, int flags)
      : flags(flags),
	base_oid(oid),
	base_oloc(oloc)
      {}

    op_target_t(pg_t pgid)
      : base_oloc(pgid.pool(), pgid.ps()),
	precalc_pgid(true),
	base_pgid(pgid)
      {}

    op_target_t() = default;

    /// Build the hobject_t for the current (resolved) target.
    hobject_t get_hobj() {
      return hobject_t(target_oid,
		       target_oloc.key,
		       CEPH_NOSNAP,
		       target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
		       target_oloc.pool,
		       target_oloc.nspace);
    }

    /// True if the target equals @p begin or sorts strictly between
    /// @p begin and @p end.
    bool contained_by(const hobject_t& begin, const hobject_t& end) {
      hobject_t h = get_hobj();
      int r = cmp(h, begin);
      return r == 0 || (r > 0 && h < end);
    }

    void dump(Formatter *f) const;
  };
1320 | ||
  /// One in-flight OSD request: the op vector, its target mapping,
  /// per-op output destinations, and resend/budget bookkeeping.
  struct Op : public RefCountedObject {
    OSDSession *session;      ///< session this op is currently bound to
    int incarnation;

    op_target_t target;       ///< object->pg->osd mapping state

    ConnectionRef con;  // for rx buffer only
    uint64_t features;  // explicitly specified op features

    vector<OSDOp> ops;

    snapid_t snapid;
    SnapContext snapc;
    ceph::real_time mtime;

    bufferlist *outbl;             ///< result buffer for the whole op
    vector<bufferlist*> out_bl;    ///< per-op output buffers (parallel to ops)
    vector<Context*> out_handler;  ///< per-op decode handlers (owned; freed in dtor)
    vector<int*> out_rval;         ///< per-op return-value destinations

    int priority;
    Context *onfinish;
    uint64_t ontimeout;

    ceph_tid_t tid;
    int attempts;

    version_t *objver;
    epoch_t *reply_epoch;

    ceph::mono_time stamp;

    epoch_t map_dne_bound;

    bool budgeted;

    /// true if we should resend this message on failure
    bool should_resend;

    /// true if the throttle budget is get/put on a series of OPs,
    /// instead of per OP basis, when this flag is set, the budget is
    /// acquired before sending the very first OP of the series and
    /// released upon receiving the last OP reply.
    bool ctx_budgeted;

    int *data_offset;

    osd_reqid_t reqid; // explicitly setting reqid
    ZTracer::Trace trace;

    Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
       int f, Context *fin, version_t *ov, int *offset = NULL,
       ZTracer::Trace *parent_trace = nullptr) :
      session(NULL), incarnation(0),
      target(o, ol, f),
      con(NULL),
      features(CEPH_FEATURES_SUPPORTED_DEFAULT),
      snapid(CEPH_NOSNAP),
      outbl(NULL),
      priority(0),
      onfinish(fin),
      ontimeout(0),
      tid(0),
      attempts(0),
      objver(ov),
      reply_epoch(NULL),
      map_dne_bound(0),
      budgeted(false),
      should_resend(true),
      ctx_budgeted(false),
      data_offset(offset) {
      // take ownership of the caller's op vector
      ops.swap(op);

      /* initialize out_* to match op vector */
      out_bl.resize(ops.size());
      out_rval.resize(ops.size());
      out_handler.resize(ops.size());
      for (unsigned i = 0; i < ops.size(); i++) {
	out_bl[i] = NULL;
	out_handler[i] = NULL;
	out_rval[i] = NULL;
      }

      // drop a locator key that merely repeats the oid
      if (target.base_oloc.key == o)
	target.base_oloc.key.clear();

      if (parent_trace && parent_trace->valid()) {
	trace.init("op", nullptr, parent_trace);
	trace.event("start");
      }
    }

    /// Order ops by transaction id.
    bool operator<(const Op& other) const {
      return tid < other.tid;
    }

    /// True if this op should be held back by the full flag: it writes,
    /// and does not carry a FULL_TRY/FULL_FORCE override.
    bool respects_full() const {
      return
	(target.flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
	!(target.flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
    }

  private:
    ~Op() override {
      // out_handler entries are owned by this Op
      while (!out_handler.empty()) {
	delete out_handler.back();
	out_handler.pop_back();
      }
      trace.event("finish");
    }
  };
1432 | ||
1433 | struct C_Op_Map_Latest : public Context { | |
1434 | Objecter *objecter; | |
1435 | ceph_tid_t tid; | |
1436 | version_t latest; | |
1437 | C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t), | |
1438 | latest(0) {} | |
1439 | void finish(int r) override; | |
1440 | }; | |
1441 | ||
  /// Completion carrying the newest map version learned for command
  /// @c tid; finish() is defined in Objecter.cc.
  struct C_Command_Map_Latest : public Context {
    Objecter *objecter;
    uint64_t tid;
    version_t latest;
    C_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
						      latest(0) {}
    void finish(int r) override;
  };
1450 | ||
1451 | struct C_Stat : public Context { | |
1452 | bufferlist bl; | |
1453 | uint64_t *psize; | |
1454 | ceph::real_time *pmtime; | |
1455 | Context *fin; | |
1456 | C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) : | |
1457 | psize(ps), pmtime(pm), fin(c) {} | |
1458 | void finish(int r) override { | |
1459 | if (r >= 0) { | |
1460 | bufferlist::iterator p = bl.begin(); | |
1461 | uint64_t s; | |
1462 | ceph::real_time m; | |
1463 | ::decode(s, p); | |
1464 | ::decode(m, p); | |
1465 | if (psize) | |
1466 | *psize = s; | |
1467 | if (pmtime) | |
1468 | *pmtime = m; | |
1469 | } | |
1470 | fin->complete(r); | |
1471 | } | |
1472 | }; | |
1473 | ||
1474 | struct C_GetAttrs : public Context { | |
1475 | bufferlist bl; | |
1476 | map<string,bufferlist>& attrset; | |
1477 | Context *fin; | |
1478 | C_GetAttrs(map<string, bufferlist>& set, Context *c) : attrset(set), | |
1479 | fin(c) {} | |
1480 | void finish(int r) override { | |
1481 | if (r >= 0) { | |
1482 | bufferlist::iterator p = bl.begin(); | |
1483 | ::decode(attrset, p); | |
1484 | } | |
1485 | fin->complete(r); | |
1486 | } | |
1487 | }; | |
1488 | ||
1489 | ||
1490 | // Pools and statistics | |
  /// Cursor and accumulated state for enumerating the objects of a pool
  /// (nlist); carried across successive list ops until at_end().
  struct NListContext {
    collection_list_handle_t pos;   ///< resume cursor

    // these are for !sortbitwise compat only
    int current_pg = 0;
    int starting_pg_num = 0;
    bool sort_bitwise = false;

    bool at_end_of_pool = false; ///< publicly visible end flag

    int64_t pool_id = -1;
    int pool_snap_seq = 0;
    uint64_t max_entries = 0;    ///< max entries to request per op
    string nspace;               ///< namespace filter

    bufferlist bl;   // raw data read to here
    std::list<librados::ListObjectImpl> list;  ///< decoded results

    bufferlist filter;       ///< optional server-side filter payload

    bufferlist extra_info;

    // The budget associated with this context, once it is set (>= 0),
    // the budget is not get/released on OP basis, instead the budget
    // is acquired before sending the first OP and released upon receiving
    // the last op reply.
    int ctx_budget = -1;

    bool at_end() const {
      return at_end_of_pool;
    }

    uint32_t get_pg_hash_position() const {
      return pos.get_hash();
    }
  };
1527 | ||
  /// Completion glue for nlist: on success feeds the reply back into
  /// the Objecter (_nlist_reply); on error completes the caller's
  /// context directly.
  struct C_NList : public Context {
    NListContext *list_context;
    Context *final_finish;
    Objecter *objecter;
    epoch_t epoch;
    C_NList(NListContext *lc, Context * finish, Objecter *ob) :
      list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
    void finish(int r) override {
      if (r >= 0) {
	objecter->_nlist_reply(list_context, r, final_finish, epoch);
      } else {
	final_finish->complete(r);
      }
    }
  };
1543 | ||
  /// State for an in-flight per-pool statistics request.
  struct PoolStatOp {
    ceph_tid_t tid;
    list<string> pools;                    ///< pools being queried

    map<string,pool_stat_t> *pool_stats;   ///< result destination
    Context *onfinish;                     ///< caller's completion
    uint64_t ontimeout;                    ///< timeout timer handle, if any

    ceph::mono_time last_submit;           ///< last (re)send time
  };
1554 | ||
  /// State for an in-flight cluster statfs request.
  struct StatfsOp {
    ceph_tid_t tid;
    struct ceph_statfs *stats;             ///< result destination
    boost::optional<int64_t> data_pool;    ///< optional specific pool
    Context *onfinish;                     ///< caller's completion
    uint64_t ontimeout;                    ///< timeout timer handle, if any

    ceph::mono_time last_submit;           ///< last (re)send time
  };
1564 | ||
1565 | struct PoolOp { | |
1566 | ceph_tid_t tid; | |
1567 | int64_t pool; | |
1568 | string name; | |
1569 | Context *onfinish; | |
1570 | uint64_t ontimeout; | |
1571 | int pool_op; | |
1572 | uint64_t auid; | |
1573 | int16_t crush_rule; | |
1574 | snapid_t snapid; | |
1575 | bufferlist *blp; | |
1576 | ||
1577 | ceph::mono_time last_submit; | |
1578 | PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0), | |
1579 | auid(0), crush_rule(0), snapid(0), blp(NULL) {} | |
1580 | }; | |
1581 | ||
1582 | // -- osd commands -- | |
  /// An in-flight OSD command, targeted either at an OSD id or at a PG.
  struct CommandOp : public RefCountedObject {
    OSDSession *session = nullptr;
    ceph_tid_t tid = 0;
    vector<string> cmd;        ///< command words
    bufferlist inbl;           ///< input payload
    bufferlist *poutbl = nullptr;  ///< output payload destination
    string *prs = nullptr;         ///< output status-string destination

    // target_osd == -1 means target_pg is valid
    const int target_osd = -1;
    const pg_t target_pg;

    op_target_t target;

    epoch_t map_dne_bound = 0;
    int map_check_error = 0; // error to return if map check fails
    const char *map_check_error_str = nullptr;

    Context *onfinish = nullptr;
    uint64_t ontimeout = 0;
    ceph::mono_time last_submit;

    /// Construct a command targeted at a specific OSD id.
    CommandOp(
      int target_osd,
      const vector<string> &cmd,
      bufferlist inbl,
      bufferlist *poutbl,
      string *prs,
      Context *onfinish)
      : cmd(cmd),
	inbl(inbl),
	poutbl(poutbl),
	prs(prs),
	target_osd(target_osd),
	onfinish(onfinish) {}

    /// Construct a command targeted at a specific PG.
    CommandOp(
      pg_t pgid,
      const vector<string> &cmd,
      bufferlist inbl,
      bufferlist *poutbl,
      string *prs,
      Context *onfinish)
      : cmd(cmd),
	inbl(inbl),
	poutbl(poutbl),
	prs(prs),
	target_pg(pgid),
	target(pgid),
	onfinish(onfinish) {}

  };
1635 | ||
1636 | void submit_command(CommandOp *c, ceph_tid_t *ptid); | |
1637 | int _calc_command_target(CommandOp *c, shunique_lock &sul); | |
1638 | void _assign_command_session(CommandOp *c, shunique_lock &sul); | |
1639 | void _send_command(CommandOp *c); | |
1640 | int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r); | |
1641 | void _finish_command(CommandOp *c, int r, string rs); | |
1642 | void handle_command_reply(MCommandReply *m); | |
1643 | ||
1644 | ||
1645 | // -- lingering ops -- | |
1646 | ||
  /// Callback interface for watch/notify event delivery.
  struct WatchContext {
    // this simply mirrors librados WatchCtx2
    virtual void handle_notify(uint64_t notify_id,
			       uint64_t cookie,
			       uint64_t notifier_id,
			       bufferlist& bl) = 0;
    virtual void handle_error(uint64_t cookie, int err) = 0;
    virtual ~WatchContext() {}
  };
1656 | ||
  /// A "lingering" operation: long-lived watch/notify state that is
  /// tracked beyond a single request (registration, pings, reconnects).
  struct LingerOp : public RefCountedObject {
    uint64_t linger_id;

    op_target_t target;

    snapid_t snap;
    SnapContext snapc;
    ceph::real_time mtime;

    vector<OSDOp> ops;
    bufferlist inbl;
    bufferlist *poutbl;
    version_t *pobjver;

    bool is_watch;    ///< watch (true) vs notify (false)
    ceph::mono_time watch_valid_thru; ///< send time for last acked ping
    int last_error;  ///< error from last failed ping|reconnect, if any
    boost::shared_mutex watch_lock;
    using lock_guard = std::unique_lock<decltype(watch_lock)>;
    using unique_lock = std::unique_lock<decltype(watch_lock)>;
    using shared_lock = boost::shared_lock<decltype(watch_lock)>;
    using shunique_lock = ceph::shunique_lock<decltype(watch_lock)>;

    // queue of pending async operations, with the timestamp of
    // when they were queued.
    list<ceph::mono_time> watch_pending_async;

    uint32_t register_gen;
    bool registered;
    bool canceled;
    Context *on_reg_commit;

    // we trigger these from an async finisher
    Context *on_notify_finish;
    bufferlist *notify_result_bl;
    uint64_t notify_id;

    WatchContext *watch_context;  ///< owned; deleted in dtor

    OSDSession *session;

    ceph_tid_t register_tid;
    ceph_tid_t ping_tid;
    epoch_t map_dne_bound;

    void _queued_async() {
      // watch_lock must be locked unique
      watch_pending_async.push_back(ceph::mono_clock::now());
    }
    void finished_async() {
      unique_lock l(watch_lock);
      assert(!watch_pending_async.empty());
      watch_pending_async.pop_front();
    }

    LingerOp() : linger_id(0),
		 target(object_t(), object_locator_t(), 0),
		 snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
		 is_watch(false), last_error(0),
		 register_gen(0),
		 registered(false),
		 canceled(false),
		 on_reg_commit(NULL),
		 on_notify_finish(NULL),
		 notify_result_bl(NULL),
		 notify_id(0),
		 watch_context(NULL),
		 session(NULL),
		 register_tid(0),
		 ping_tid(0),
		 map_dne_bound(0) {}

    // no copy!
    const LingerOp &operator=(const LingerOp& r);
    LingerOp(const LingerOp& o);

    /// The object's address doubles as the watch cookie.
    uint64_t get_cookie() {
      return reinterpret_cast<uint64_t>(this);
    }

  private:
    ~LingerOp() override {
      delete watch_context;
    }
  };
1742 | ||
  /// Completion for linger registration; pins the LingerOp via refcount
  /// for the lifetime of the callback.
  struct C_Linger_Commit : public Context {
    Objecter *objecter;
    LingerOp *info;
    bufferlist outbl;  // used for notify only
    C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {
      info->get();
    }
    ~C_Linger_Commit() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_commit(info, r, outbl);
    }
  };
1757 | ||
  /// Completion for a linger reconnect attempt; pins the LingerOp via
  /// refcount for the lifetime of the callback.
  struct C_Linger_Reconnect : public Context {
    Objecter *objecter;
    LingerOp *info;
    C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {
      info->get();
    }
    ~C_Linger_Reconnect() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_reconnect(info, r);
    }
  };
1771 | ||
  /// Completion for a watch ping; remembers the registration generation
  /// at send time so stale acks can be recognized.
  struct C_Linger_Ping : public Context {
    Objecter *objecter;
    LingerOp *info;
    ceph::mono_time sent;    ///< set by the sender before dispatch
    uint32_t register_gen;   ///< info->register_gen captured at creation
    C_Linger_Ping(Objecter *o, LingerOp *l)
      : objecter(o), info(l), register_gen(info->register_gen) {
      info->get();
    }
    ~C_Linger_Ping() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_ping(info, r, sent, register_gen);
    }
  };
1788 | ||
  /// Completion carrying the newest map version learned for a linger op;
  /// finish() is defined in Objecter.cc.
  struct C_Linger_Map_Latest : public Context {
    Objecter *objecter;
    uint64_t linger_id;
    version_t latest;
    C_Linger_Map_Latest(Objecter *o, uint64_t id) :
      objecter(o), linger_id(id), latest(0) {}
    void finish(int r) override;
  };
1797 | ||
1798 | // -- osd sessions -- | |
  // -- osd sessions --
  /// A backoff received from an OSD, identified by pg, id, and the
  /// hobject range it covers.
  struct OSDBackoff {
    spg_t pgid;
    uint64_t id;
    hobject_t begin, end;
  };
1804 | ||
  /// Per-OSD session state: the connection, all pending ops bound to
  /// this OSD, received backoffs, and sharded completion locks.
  struct OSDSession : public RefCountedObject {
    boost::shared_mutex lock;
    using lock_guard = std::lock_guard<decltype(lock)>;
    using unique_lock = std::unique_lock<decltype(lock)>;
    using shared_lock = boost::shared_lock<decltype(lock)>;
    using shunique_lock = ceph::shunique_lock<decltype(lock)>;

    // pending ops
    map<ceph_tid_t,Op*> ops;
    map<uint64_t, LingerOp*> linger_ops;
    map<ceph_tid_t,CommandOp*> command_ops;

    // backoffs
    map<spg_t,map<hobject_t,OSDBackoff>> backoffs;
    map<uint64_t,OSDBackoff*> backoffs_by_id;

    int osd;
    int incarnation;
    ConnectionRef con;
    int num_locks;
    // completion locks are sharded (count from config) to reduce
    // contention; a lock is picked per object via get_lock()
    std::unique_ptr<std::mutex[]> completion_locks;
    using unique_completion_lock = std::unique_lock<
      decltype(completion_locks)::element_type>;


    OSDSession(CephContext *cct, int o) :
      osd(o), incarnation(0), con(NULL),
      num_locks(cct->_conf->objecter_completion_locks_per_session),
      completion_locks(new std::mutex[num_locks]) {}

    ~OSDSession() override;

    /// true for the placeholder session not bound to a real OSD
    bool is_homeless() { return (osd == -1); }

    unique_completion_lock get_lock(object_t& oid);
  };
  // Open sessions by osd id (the homeless session is kept separately
  // in homeless_session, not in this map).
  map<int,OSDSession*> osd_sessions;

  bool osdmap_full_flag() const;
  bool osdmap_pool_full(const int64_t pool_id) const;

 private:

  /**
   * Test pg_pool_t::FLAG_FULL on a pool
   *
   * @return true if the pool exists and has the flag set, or
   * the global full flag is set, else false
   */
  bool _osdmap_pool_full(const int64_t pool_id) const;
  bool _osdmap_pool_full(const pg_pool_t &p) const;
  void update_pool_full_map(map<int64_t, bool>& pool_full_map);

  // registered linger (watch/notify) ops by linger id
  map<uint64_t, LingerOp*> linger_ops;
  // we use this just to confirm a cookie is valid before dereferencing the ptr
  set<LingerOp*> linger_ops_set;

  // in-flight mon-targeted requests, by tid
  map<ceph_tid_t,PoolStatOp*> poolstat_ops;
  map<ceph_tid_t,StatfsOp*> statfs_ops;
  map<ceph_tid_t,PoolOp*> pool_ops;
  // count of ops parked on the homeless session (no current target)
  std::atomic<unsigned> num_homeless_ops{0};

  // session (osd == -1) holding ops that map to no live OSD
  OSDSession *homeless_session;

  // ops waiting for an osdmap with a new pool or confirmation that
  // the pool does not exist (may be expanded to other uses later)
  map<uint64_t, LingerOp*> check_latest_map_lingers;
  map<ceph_tid_t, Op*> check_latest_map_ops;
  map<ceph_tid_t, CommandOp*> check_latest_map_commands;

  // contexts to complete (with the stored error code) once we have an
  // osdmap of at least the keyed epoch
  map<epoch_t,list< pair<Context*, int> > > waiting_for_map;

  ceph::timespan mon_timeout;  // from constructor arg (seconds)
  ceph::timespan osd_timeout;  // from constructor arg (seconds)
1879 | ||
1880 | MOSDOp *_prepare_osd_op(Op *op); | |
1881 | void _send_op(Op *op, MOSDOp *m = NULL); | |
1882 | void _send_op_account(Op *op); | |
1883 | void _cancel_linger_op(Op *op); | |
1884 | void finish_op(OSDSession *session, ceph_tid_t tid); | |
1885 | void _finish_op(Op *op, int r); | |
1886 | static bool is_pg_changed( | |
1887 | int oldprimary, | |
1888 | const vector<int>& oldacting, | |
1889 | int newprimary, | |
1890 | const vector<int>& newacting, | |
1891 | bool any_change=false); | |
1892 | enum recalc_op_target_result { | |
1893 | RECALC_OP_TARGET_NO_ACTION = 0, | |
1894 | RECALC_OP_TARGET_NEED_RESEND, | |
1895 | RECALC_OP_TARGET_POOL_DNE, | |
1896 | RECALC_OP_TARGET_OSD_DNE, | |
1897 | RECALC_OP_TARGET_OSD_DOWN, | |
1898 | }; | |
1899 | bool _osdmap_full_flag() const; | |
1900 | bool _osdmap_has_pool_full() const; | |
1901 | ||
1902 | bool target_should_be_paused(op_target_t *op); | |
1903 | int _calc_target(op_target_t *t, Connection *con, | |
1904 | bool any_change = false); | |
1905 | int _map_session(op_target_t *op, OSDSession **s, | |
1906 | shunique_lock& lc); | |
1907 | ||
1908 | void _session_op_assign(OSDSession *s, Op *op); | |
1909 | void _session_op_remove(OSDSession *s, Op *op); | |
1910 | void _session_linger_op_assign(OSDSession *to, LingerOp *op); | |
1911 | void _session_linger_op_remove(OSDSession *from, LingerOp *op); | |
1912 | void _session_command_op_assign(OSDSession *to, CommandOp *op); | |
1913 | void _session_command_op_remove(OSDSession *from, CommandOp *op); | |
1914 | ||
1915 | int _assign_op_target_session(Op *op, shunique_lock& lc, | |
1916 | bool src_session_locked, | |
1917 | bool dst_session_locked); | |
1918 | int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc); | |
1919 | ||
1920 | void _linger_submit(LingerOp *info, shunique_lock& sul); | |
1921 | void _send_linger(LingerOp *info, shunique_lock& sul); | |
1922 | void _linger_commit(LingerOp *info, int r, bufferlist& outbl); | |
1923 | void _linger_reconnect(LingerOp *info, int r); | |
1924 | void _send_linger_ping(LingerOp *info); | |
1925 | void _linger_ping(LingerOp *info, int r, ceph::mono_time sent, | |
1926 | uint32_t register_gen); | |
1927 | int _normalize_watch_error(int r); | |
1928 | ||
1929 | friend class C_DoWatchError; | |
1930 | public: | |
1931 | void linger_callback_flush(Context *ctx) { | |
1932 | finisher->queue(ctx); | |
1933 | } | |
1934 | ||
1935 | private: | |
1936 | void _check_op_pool_dne(Op *op, unique_lock *sl); | |
1937 | void _send_op_map_check(Op *op); | |
1938 | void _op_cancel_map_check(Op *op); | |
1939 | void _check_linger_pool_dne(LingerOp *op, bool *need_unregister); | |
1940 | void _send_linger_map_check(LingerOp *op); | |
1941 | void _linger_cancel_map_check(LingerOp *op); | |
1942 | void _check_command_map_dne(CommandOp *op); | |
1943 | void _send_command_map_check(CommandOp *op); | |
1944 | void _command_cancel_map_check(CommandOp *op); | |
1945 | ||
1946 | void kick_requests(OSDSession *session); | |
1947 | void _kick_requests(OSDSession *session, map<uint64_t, LingerOp *>& lresend); | |
1948 | void _linger_ops_resend(map<uint64_t, LingerOp *>& lresend, unique_lock& ul); | |
1949 | ||
1950 | int _get_session(int osd, OSDSession **session, shunique_lock& sul); | |
1951 | void put_session(OSDSession *s); | |
1952 | void get_session(OSDSession *s); | |
1953 | void _reopen_session(OSDSession *session); | |
1954 | void close_session(OSDSession *session); | |
1955 | ||
1956 | void _nlist_reply(NListContext *list_context, int r, Context *final_finish, | |
1957 | epoch_t reply_epoch); | |
1958 | ||
1959 | void resend_mon_ops(); | |
1960 | ||
1961 | /** | |
1962 | * handle a budget for in-flight ops | |
1963 | * budget is taken whenever an op goes into the ops map | |
1964 | * and returned whenever an op is removed from the map | |
1965 | * If throttle_op needs to throttle it will unlock client_lock. | |
1966 | */ | |
1967 | int calc_op_budget(Op *op); | |
1968 | void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0); | |
  // Reserve throttle budget for op before it is submitted.  The
  // caller must hold rwlock via sul.  With balanced budgeting,
  // _throttle_op() may drop the lock and block until budget frees up;
  // otherwise the throttles are charged unconditionally via take().
  // Returns the number of bytes charged (see put_op_budget_bytes).
  int _take_op_budget(Op *op, shunique_lock& sul) {
    assert(sul && sul.mutex() == &rwlock);
    int op_budget = calc_op_budget(op);
    if (keep_balanced_budget) {
      _throttle_op(op, sul, op_budget);
    } else {
      op_throttle_bytes.take(op_budget);
      op_throttle_ops.take(1);
    }
    op->budgeted = true;  // so put_op_budget() knows to return it
    return op_budget;
  }
  // Return previously-taken budget to the throttles (one op slot plus
  // op_budget bytes).  A negative value would corrupt the accounting.
  void put_op_budget_bytes(int op_budget) {
    assert(op_budget >= 0);
    op_throttle_bytes.put(op_budget);
    op_throttle_ops.put(1);
  }
  // Return op's budget; valid only if _take_op_budget() charged it
  // (op->budgeted).  Recomputes the same value calc_op_budget gave.
  void put_op_budget(Op *op) {
    assert(op->budgeted);
    int op_budget = calc_op_budget(op);
    put_op_budget_bytes(op_budget);
  }
1991 | void put_nlist_context_budget(NListContext *list_context); | |
1992 | Throttle op_throttle_bytes, op_throttle_ops; | |
1993 | ||
1994 | public: | |
  // Construct an Objecter.  mon_timeout/osd_timeout are in seconds
  // and converted to ceph::timespan; throttle limits come from the
  // objecter_inflight_* config options.  Note init()/start() are
  // separate steps (presumably required before use -- see below).
  Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
	   Finisher *fin,
	   double mon_timeout,
	   double osd_timeout) :
    Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
    trace_endpoint("0.0.0.0", 0, "Objecter"),
    osdmap(new OSDMap),
    max_linger_id(0),
    keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
    blacklist_events_enabled(false),
    last_seen_osdmap_version(0), last_seen_pgmap_version(0),
    logger(NULL), tick_event(0), m_request_state_hook(NULL),
    // homeless session collects ops that map to no live OSD
    homeless_session(new OSDSession(cct, -1)),
    mon_timeout(ceph::make_timespan(mon_timeout)),
    osd_timeout(ceph::make_timespan(osd_timeout)),
    op_throttle_bytes(cct, "objecter_bytes",
		      cct->_conf->objecter_inflight_op_bytes),
    op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
    epoch_barrier(0),
    retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
  { }
  ~Objecter() override;
2017 | ||
2018 | void init(); | |
2019 | void start(const OSDMap *o = nullptr); | |
2020 | void shutdown(); | |
2021 | ||
2022 | // These two templates replace osdmap_(get)|(put)_read. Simply wrap | |
2023 | // whatever functionality you want to use the OSDMap in a lambda like: | |
2024 | // | |
2025 | // with_osdmap([](const OSDMap& o) { o.do_stuff(); }); | |
2026 | // | |
2027 | // or | |
2028 | // | |
2029 | // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); }); | |
2030 | // | |
2031 | // Do not call into something that will try to lock the OSDMap from | |
2032 | // here or you will have great woe and misery. | |
2033 | ||
2034 | template<typename Callback, typename...Args> | |
2035 | auto with_osdmap(Callback&& cb, Args&&... args) const -> | |
2036 | decltype(cb(*osdmap, std::forward<Args>(args)...)) { | |
2037 | shared_lock l(rwlock); | |
2038 | return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...); | |
2039 | } | |
2040 | ||
2041 | ||
2042 | /** | |
2043 | * Tell the objecter to throttle outgoing ops according to its | |
2044 | * budget (in _conf). If you do this, ops can block, in | |
2045 | * which case it will unlock client_lock and sleep until | |
2046 | * incoming messages reduce the used budget low enough for | |
2047 | * the ops to continue going; then it will lock client_lock again. | |
2048 | */ | |
  void set_balanced_budget() { keep_balanced_budget = true; }
  void unset_balanced_budget() { keep_balanced_budget = false; }

  // When honor_osdmap_full is unset, ops are sent even while the
  // cluster/pool full flag is raised (presumably for callers that
  // must write regardless, e.g. to delete data -- confirm at callers).
  void set_honor_osdmap_full() { honor_osdmap_full = true; }
  void unset_honor_osdmap_full() { honor_osdmap_full = false; }

  // osdmap_full_try: best-effort attempt despite full flags.
  void set_osdmap_full_try() { osdmap_full_try = true; }
  void unset_osdmap_full_try() { osdmap_full_try = false; }
2057 | ||
2058 | void _scan_requests(OSDSession *s, | |
2059 | bool force_resend, | |
2060 | bool cluster_full, | |
2061 | map<int64_t, bool> *pool_full_map, | |
2062 | map<ceph_tid_t, Op*>& need_resend, | |
2063 | list<LingerOp*>& need_resend_linger, | |
2064 | map<ceph_tid_t, CommandOp*>& need_resend_command, | |
2065 | shunique_lock& sul); | |
2066 | ||
2067 | int64_t get_object_hash_position(int64_t pool, const string& key, | |
2068 | const string& ns); | |
2069 | int64_t get_object_pg_hash_position(int64_t pool, const string& key, | |
2070 | const string& ns); | |
2071 | ||
2072 | // messages | |
2073 | public: | |
2074 | bool ms_dispatch(Message *m) override; | |
  // The Objecter is always willing to fast-dispatch some message
  // types; ms_can_fast_dispatch() decides per message.
  bool ms_can_fast_dispatch_any() const override {
    return true;
  }
2078 | bool ms_can_fast_dispatch(const Message *m) const override { | |
2079 | switch (m->get_type()) { | |
2080 | case CEPH_MSG_OSD_OPREPLY: | |
2081 | case CEPH_MSG_WATCH_NOTIFY: | |
2082 | return true; | |
2083 | default: | |
2084 | return false; | |
2085 | } | |
2086 | } | |
2087 | void ms_fast_dispatch(Message *m) override { | |
224ce89b WB |
2088 | if (!ms_dispatch(m)) { |
2089 | m->put(); | |
2090 | } | |
7c673cae FG |
2091 | } |
2092 | ||
2093 | void handle_osd_op_reply(class MOSDOpReply *m); | |
2094 | void handle_osd_backoff(class MOSDBackoff *m); | |
2095 | void handle_watch_notify(class MWatchNotify *m); | |
2096 | void handle_osd_map(class MOSDMap *m); | |
2097 | void wait_for_osd_map(); | |
2098 | ||
31f18b77 FG |
2099 | /** |
2100 | * Get list of entities blacklisted since this was last called, | |
2101 | * and reset the list. | |
2102 | * | |
2103 | * Uses a std::set because typical use case is to compare some | |
2104 | * other list of clients to see which overlap with the blacklisted | |
2105 | * addrs. | |
2106 | * | |
2107 | */ | |
2108 | void consume_blacklist_events(std::set<entity_addr_t> *events); | |
2109 | ||
7c673cae FG |
2110 | int pool_snap_by_name(int64_t poolid, |
2111 | const char *snap_name, | |
2112 | snapid_t *snap) const; | |
2113 | int pool_snap_get_info(int64_t poolid, snapid_t snap, | |
2114 | pool_snap_info_t *info) const; | |
2115 | int pool_snap_list(int64_t poolid, vector<uint64_t> *snaps); | |
2116 | private: | |
2117 | ||
31f18b77 FG |
2118 | void emit_blacklist_events(const OSDMap::Incremental &inc); |
2119 | void emit_blacklist_events(const OSDMap &old_osd_map, | |
2120 | const OSDMap &new_osd_map); | |
2121 | ||
7c673cae FG |
2122 | // low-level |
2123 | void _op_submit(Op *op, shunique_lock& lc, ceph_tid_t *ptid); | |
2124 | void _op_submit_with_budget(Op *op, shunique_lock& lc, | |
2125 | ceph_tid_t *ptid, | |
2126 | int *ctx_budget = NULL); | |
2127 | inline void unregister_op(Op *op); | |
2128 | ||
2129 | // public interface | |
2130 | public: | |
2131 | void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL); | |
2132 | bool is_active() { | |
2133 | shared_lock l(rwlock); | |
31f18b77 | 2134 | return !((!inflight_ops) && linger_ops.empty() && |
7c673cae FG |
2135 | poolstat_ops.empty() && statfs_ops.empty()); |
2136 | } | |
2137 | ||
2138 | /** | |
2139 | * Output in-flight requests | |
2140 | */ | |
2141 | void _dump_active(OSDSession *s); | |
2142 | void _dump_active(); | |
2143 | void dump_active(); | |
2144 | void dump_requests(Formatter *fmt); | |
2145 | void _dump_ops(const OSDSession *s, Formatter *fmt); | |
2146 | void dump_ops(Formatter *fmt); | |
2147 | void _dump_linger_ops(const OSDSession *s, Formatter *fmt); | |
2148 | void dump_linger_ops(Formatter *fmt); | |
2149 | void _dump_command_ops(const OSDSession *s, Formatter *fmt); | |
2150 | void dump_command_ops(Formatter *fmt); | |
2151 | void dump_pool_ops(Formatter *fmt) const; | |
2152 | void dump_pool_stat_ops(Formatter *fmt) const; | |
2153 | void dump_statfs_ops(Formatter *fmt) const; | |
2154 | ||
31f18b77 FG |
2155 | int get_client_incarnation() const { return client_inc; } |
2156 | void set_client_incarnation(int inc) { client_inc = inc; } | |
7c673cae FG |
2157 | |
  bool have_map(epoch_t epoch);
  /// wait for epoch; true if we already have it
  bool wait_for_map(epoch_t epoch, Context *c, int err=0);
  void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
  void wait_for_latest_osdmap(Context *fin);
  // NOTE(review): "neweset" is a typo for "newest"; harmless in a
  // declaration since C++ callers bind arguments positionally.
  void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
  void _get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
2165 | ||
  /** Get the current set of global op flags */
  int get_global_op_flags() const { return global_op_flags; }
  /** Add a flag to the global op flags.  The fetch_or itself is an
   * atomic RMW; only a read-modify sequence across separate calls
   * would be racy. */
  void add_global_op_flags(int flag) {
    global_op_flags.fetch_or(flag);
  }
  /** Clear the passed flags from the global op flag set (atomic
   * fetch_and with the complement). */
  void clear_global_op_flag(int flags) {
    global_op_flags.fetch_and(~flags);
  }
2176 | ||
2177 | /// cancel an in-progress request with the given return code | |
2178 | private: | |
2179 | int op_cancel(OSDSession *s, ceph_tid_t tid, int r); | |
2180 | int _op_cancel(ceph_tid_t tid, int r); | |
2181 | public: | |
2182 | int op_cancel(ceph_tid_t tid, int r); | |
2183 | ||
2184 | /** | |
2185 | * Any write op which is in progress at the start of this call shall no | |
2186 | * longer be in progress when this call ends. Operations started after the | |
2187 | * start of this call may still be in progress when this call ends. | |
2188 | * | |
2189 | * @return the latest possible epoch in which a cancelled op could have | |
2190 | * existed, or -1 if nothing was cancelled. | |
2191 | */ | |
2192 | epoch_t op_cancel_writes(int r, int64_t pool=-1); | |
2193 | ||
2194 | // commands | |
  // Send an admin command to a specific OSD.  Output buffer/status
  // string are filled in before onfinish fires; *ptid receives the
  // command tid.  osd must be a real id (use pg_command for pg
  // targets).
  void osd_command(int osd, const std::vector<string>& cmd,
		   const bufferlist& inbl, ceph_tid_t *ptid,
		   bufferlist *poutbl, string *prs, Context *onfinish) {
    assert(osd >= 0);
    CommandOp *c = new CommandOp(
      osd,
      cmd,
      inbl,
      poutbl,
      prs,
      onfinish);
    submit_command(c, ptid);
  }
  // Send an admin command addressed to whichever OSD serves pgid.
  void pg_command(pg_t pgid, const vector<string>& cmd,
		  const bufferlist& inbl, ceph_tid_t *ptid,
		  bufferlist *poutbl, string *prs, Context *onfinish) {
    CommandOp *c = new CommandOp(
      pgid,
      cmd,
      inbl,
      poutbl,
      prs,
      onfinish);
    submit_command(c, ptid);
  }
2220 | ||
2221 | // mid-level helpers | |
  // Build (but do not submit) a write Op from an ObjectOperation.
  // Takes ownership of op's out_rval vector (swapped out); the
  // returned Op is heap-allocated and owned by the caller until
  // op_submit().
  Op *prepare_mutate_op(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op, const SnapContext& snapc,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    osd_reqid_t reqid = osd_reqid_t(),
    ZTracer::Trace *parent_trace = nullptr) {
    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver, nullptr, parent_trace);
    o->priority = op.priority;
    o->mtime = mtime;
    o->snapc = snapc;
    o->out_rval.swap(op.out_rval);
    o->reqid = reqid;
    return o;
  }
  // Convenience wrapper: prepare + submit, returning the new tid.
  ceph_tid_t mutate(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op, const SnapContext& snapc,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    osd_reqid_t reqid = osd_reqid_t()) {
    Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
			      oncommit, objver, reqid);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2250 | Op *prepare_read_op( | |
2251 | const object_t& oid, const object_locator_t& oloc, | |
2252 | ObjectOperation& op, | |
2253 | snapid_t snapid, bufferlist *pbl, int flags, | |
2254 | Context *onack, version_t *objver = NULL, | |
2255 | int *data_offset = NULL, | |
2256 | uint64_t features = 0, | |
2257 | ZTracer::Trace *parent_trace = nullptr) { | |
31f18b77 | 2258 | Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags | |
7c673cae FG |
2259 | CEPH_OSD_FLAG_READ, onack, objver, data_offset, parent_trace); |
2260 | o->priority = op.priority; | |
2261 | o->snapid = snapid; | |
2262 | o->outbl = pbl; | |
2263 | if (!o->outbl && op.size() == 1 && op.out_bl[0]->length()) | |
2264 | o->outbl = op.out_bl[0]; | |
2265 | o->out_bl.swap(op.out_bl); | |
2266 | o->out_handler.swap(op.out_handler); | |
2267 | o->out_rval.swap(op.out_rval); | |
2268 | return o; | |
2269 | } | |
2270 | ceph_tid_t read( | |
2271 | const object_t& oid, const object_locator_t& oloc, | |
2272 | ObjectOperation& op, | |
2273 | snapid_t snapid, bufferlist *pbl, int flags, | |
2274 | Context *onack, version_t *objver = NULL, | |
2275 | int *data_offset = NULL, | |
2276 | uint64_t features = 0) { | |
2277 | Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver, | |
2278 | data_offset); | |
2279 | if (features) | |
2280 | o->features = features; | |
2281 | ceph_tid_t tid; | |
2282 | op_submit(o, &tid); | |
2283 | return tid; | |
2284 | } | |
  // Build a read Op addressed to a whole PG (no object name): used
  // for pg listing.  base_pgid is computed from the caller's hash
  // position, and pg remapping is bypassed via precalc_pgid.
  Op *prepare_pg_read_op(
    uint32_t hash, object_locator_t oloc,
    ObjectOperation& op, bufferlist *pbl, int flags,
    Context *onack, epoch_t *reply_epoch,
    int *ctx_budget) {
    Op *o = new Op(object_t(), oloc,
		   op.ops,
		   flags | global_op_flags | CEPH_OSD_FLAG_READ |
		   CEPH_OSD_FLAG_IGNORE_OVERLAY,
		   onack, NULL);
    o->target.precalc_pgid = true;
    o->target.base_pgid = pg_t(hash, oloc.pool);
    o->priority = op.priority;
    o->snapid = CEPH_NOSNAP;
    o->outbl = pbl;
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->out_rval.swap(op.out_rval);
    o->reply_epoch = reply_epoch;
    if (ctx_budget) {
      // budget is tracked by listing context
      o->ctx_budgeted = true;
    }
    return o;
  }
  // Convenience wrapper: prepare + submit (passing ctx_budget through
  // to op_submit), returning the new tid.
  ceph_tid_t pg_read(
    uint32_t hash, object_locator_t oloc,
    ObjectOperation& op, bufferlist *pbl, int flags,
    Context *onack, epoch_t *reply_epoch,
    int *ctx_budget) {
    Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
			       onack, reply_epoch, ctx_budget);
    ceph_tid_t tid;
    op_submit(o, &tid, ctx_budget);
    return tid;
  }
2321 | ||
2322 | // caller owns a ref | |
2323 | LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc, | |
2324 | int flags); | |
2325 | ceph_tid_t linger_watch(LingerOp *info, | |
2326 | ObjectOperation& op, | |
2327 | const SnapContext& snapc, ceph::real_time mtime, | |
2328 | bufferlist& inbl, | |
2329 | Context *onfinish, | |
2330 | version_t *objver); | |
2331 | ceph_tid_t linger_notify(LingerOp *info, | |
2332 | ObjectOperation& op, | |
2333 | snapid_t snap, bufferlist& inbl, | |
2334 | bufferlist *poutbl, | |
2335 | Context *onack, | |
2336 | version_t *objver); | |
2337 | int linger_check(LingerOp *info); | |
2338 | void linger_cancel(LingerOp *info); // releases a reference | |
2339 | void _linger_cancel(LingerOp *info); | |
2340 | ||
2341 | void _do_watch_notify(LingerOp *info, MWatchNotify *m); | |
2342 | ||
2343 | /** | |
2344 | * set up initial ops in the op vector, and allocate a final op slot. | |
2345 | * | |
2346 | * The caller is responsible for filling in the final ops_count ops. | |
2347 | * | |
2348 | * @param ops op vector | |
2349 | * @param ops_count number of final ops the caller will fill in | |
2350 | * @param extra_ops pointer to [array of] initial op[s] | |
2351 | * @return index of final op (for caller to fill in) | |
2352 | */ | |
2353 | int init_ops(vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) { | |
2354 | int i; | |
2355 | int extra = 0; | |
2356 | ||
2357 | if (extra_ops) | |
2358 | extra = extra_ops->ops.size(); | |
2359 | ||
2360 | ops.resize(ops_count + extra); | |
2361 | ||
2362 | for (i=0; i<extra; i++) { | |
2363 | ops[i] = extra_ops->ops[i]; | |
2364 | } | |
2365 | ||
2366 | return i; | |
2367 | } | |
2368 | ||
2369 | ||
2370 | // high-level helpers | |
  // Build a STAT read; C_Stat decodes size/mtime from the reply
  // buffer into *psize/*pmtime before calling onfinish.
  Op *prepare_stat_op(
    const object_t& oid, const object_locator_t& oloc,
    snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_STAT;
    C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    o->outbl = &fin->bl;  // reply payload decoded by C_Stat
    return o;
  }
  // Convenience wrapper: prepare + submit, returning the new tid.
  ceph_tid_t stat(
    const object_t& oid, const object_locator_t& oloc,
    snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
			    onfinish, objver, extra_ops);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2397 | ||
  // Build an extent READ of [off, off+len) into *pbl (no truncate
  // handling; see read_trunc for that).
  Op *prepare_read_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver, nullptr, parent_trace);
    o->snapid = snap;
    o->outbl = pbl;
    return o;
  }
  // Convenience wrapper: prepare + submit, returning the new tid.
  ceph_tid_t read(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
			    onfinish, objver, extra_ops, op_flags);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2429 | ||
  // Build a CMPEXT op: compare cmp_bl against on-disk data at off.
  // Flagged READ even though it carries input data (the comparison
  // does not modify the object).
  Op *prepare_cmpext_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, bufferlist &cmp_bl,
    snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_CMPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = cmp_bl.length();
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = cmp_bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    return o;
  }

  // Convenience wrapper: prepare + submit, returning the new tid.
  ceph_tid_t cmpext(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, bufferlist &cmp_bl,
    snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
			      flags, onfinish, objver, extra_ops, op_flags);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2461 | ||
  // Extent READ with explicit truncate_size/seq (used when the file
  // layer tracks truncation epochs); submits immediately.
  ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
			uint64_t off, uint64_t len, snapid_t snap,
			bufferlist *pbl, int flags, uint64_t trunc_size,
			__u32 trunc_seq, Context *onfinish,
			version_t *objver = NULL,
			ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // MAPEXT: read the extent map (not the data) for [off, off+len);
  // reply payload lands in *pbl.  Submits immediately.
  ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc,
		    uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
		    int flags, Context *onfinish, version_t *objver = NULL,
		    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_MAPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // GETXATTR: fetch one xattr's value into *pbl.  A NULL name sends a
  // zero-length name (server-side behavior for that case not visible
  // here).  Submits immediately.
  ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
	      const char *name, snapid_t snap, bufferlist *pbl, int flags,
	      Context *onfinish,
	      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = 0;
    if (name)
      ops[i].indata.append(name);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2522 | ||
  // GETXATTRS: fetch all xattrs; C_GetAttrs decodes the reply buffer
  // into attrset before calling onfinish.  Submits immediately.
  ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc,
		       snapid_t snap, map<string,bufferlist>& attrset,
		       int flags, Context *onfinish, version_t *objver = NULL,
		       ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
    C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    o->outbl = &fin->bl;  // raw reply; decoded by C_GetAttrs
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2539 | ||
  // Read the whole object (off=0, len=0 means "entire object" for
  // CEPH_OSD_OP_READ).  Note global_op_flags|CEPH_OSD_FLAG_READ is
  // OR'd here *and* again inside read(); redundant but harmless
  // since OR is idempotent.
  ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc,
		       snapid_t snap, bufferlist *pbl, int flags,
		       Context *onfinish, version_t *objver = NULL,
		       ObjectOperation *extra_ops = NULL) {
    return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags |
		CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
  }
2547 | ||
2548 | ||
2549 | // writes | |
  // Submit a write built from a caller-provided raw op vector.
  ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
		     vector<OSDOp>& ops, ceph::real_time mtime,
		     const SnapContext& snapc, int flags,
		     Context *oncommit,
		     version_t *objver = NULL) {
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (but do not submit) an extent WRITE of bl to [off, off+len).
  Op *prepare_write_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, const SnapContext& snapc,
    const bufferlist &bl, ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITE;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver,
		   nullptr, parent_trace);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
  // Convenience wrapper: prepare + submit, returning the new tid.
  ceph_tid_t write(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, const SnapContext& snapc,
    const bufferlist &bl, ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags,
			     oncommit, objver, extra_ops, op_flags);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (but do not submit) an APPEND of len bytes from bl; the OSD
  // determines the write offset, so extent.offset stays 0.
  Op *prepare_append_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t len, const SnapContext& snapc,
    const bufferlist &bl, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_APPEND;
    ops[i].op.extent.offset = 0;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = bl;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
  // Convenience wrapper: prepare + submit, returning the new tid.
  ceph_tid_t append(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t len, const SnapContext& snapc,
    const bufferlist &bl, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags,
			      oncommit, objver, extra_ops);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2632 | ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc, | |
2633 | uint64_t off, uint64_t len, const SnapContext& snapc, | |
2634 | const bufferlist &bl, ceph::real_time mtime, int flags, | |
2635 | uint64_t trunc_size, __u32 trunc_seq, | |
2636 | Context *oncommit, | |
2637 | version_t *objver = NULL, | |
2638 | ObjectOperation *extra_ops = NULL, int op_flags = 0) { | |
2639 | vector<OSDOp> ops; | |
2640 | int i = init_ops(ops, 1, extra_ops); | |
2641 | ops[i].op.op = CEPH_OSD_OP_WRITE; | |
2642 | ops[i].op.extent.offset = off; | |
2643 | ops[i].op.extent.length = len; | |
2644 | ops[i].op.extent.truncate_size = trunc_size; | |
2645 | ops[i].op.extent.truncate_seq = trunc_seq; | |
2646 | ops[i].indata = bl; | |
2647 | ops[i].op.flags = op_flags; | |
31f18b77 | 2648 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2649 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2650 | o->mtime = mtime; | |
2651 | o->snapc = snapc; | |
2652 | ceph_tid_t tid; | |
2653 | op_submit(o, &tid); | |
2654 | return tid; | |
2655 | } | |
2656 | Op *prepare_write_full_op( | |
2657 | const object_t& oid, const object_locator_t& oloc, | |
2658 | const SnapContext& snapc, const bufferlist &bl, | |
2659 | ceph::real_time mtime, int flags, | |
2660 | Context *oncommit, version_t *objver = NULL, | |
2661 | ObjectOperation *extra_ops = NULL, int op_flags = 0) { | |
2662 | vector<OSDOp> ops; | |
2663 | int i = init_ops(ops, 1, extra_ops); | |
2664 | ops[i].op.op = CEPH_OSD_OP_WRITEFULL; | |
2665 | ops[i].op.extent.offset = 0; | |
2666 | ops[i].op.extent.length = bl.length(); | |
2667 | ops[i].indata = bl; | |
2668 | ops[i].op.flags = op_flags; | |
31f18b77 | 2669 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2670 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2671 | o->mtime = mtime; | |
2672 | o->snapc = snapc; | |
2673 | return o; | |
2674 | } | |
2675 | ceph_tid_t write_full( | |
2676 | const object_t& oid, const object_locator_t& oloc, | |
2677 | const SnapContext& snapc, const bufferlist &bl, | |
2678 | ceph::real_time mtime, int flags, | |
2679 | Context *oncommit, version_t *objver = NULL, | |
2680 | ObjectOperation *extra_ops = NULL, int op_flags = 0) { | |
2681 | Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags, | |
2682 | oncommit, objver, extra_ops, op_flags); | |
2683 | ceph_tid_t tid; | |
2684 | op_submit(o, &tid); | |
2685 | return tid; | |
2686 | } | |
2687 | Op *prepare_writesame_op( | |
2688 | const object_t& oid, const object_locator_t& oloc, | |
2689 | uint64_t write_len, uint64_t off, | |
2690 | const SnapContext& snapc, const bufferlist &bl, | |
2691 | ceph::real_time mtime, int flags, | |
2692 | Context *oncommit, version_t *objver = NULL, | |
2693 | ObjectOperation *extra_ops = NULL, int op_flags = 0) { | |
2694 | ||
2695 | vector<OSDOp> ops; | |
2696 | int i = init_ops(ops, 1, extra_ops); | |
2697 | ops[i].op.op = CEPH_OSD_OP_WRITESAME; | |
2698 | ops[i].op.writesame.offset = off; | |
2699 | ops[i].op.writesame.length = write_len; | |
2700 | ops[i].op.writesame.data_length = bl.length(); | |
2701 | ops[i].indata = bl; | |
2702 | ops[i].op.flags = op_flags; | |
31f18b77 | 2703 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2704 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2705 | o->mtime = mtime; | |
2706 | o->snapc = snapc; | |
2707 | return o; | |
2708 | } | |
2709 | ceph_tid_t writesame( | |
2710 | const object_t& oid, const object_locator_t& oloc, | |
2711 | uint64_t write_len, uint64_t off, | |
2712 | const SnapContext& snapc, const bufferlist &bl, | |
2713 | ceph::real_time mtime, int flags, | |
2714 | Context *oncommit, version_t *objver = NULL, | |
2715 | ObjectOperation *extra_ops = NULL, int op_flags = 0) { | |
2716 | ||
2717 | Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl, | |
2718 | mtime, flags, oncommit, objver, | |
2719 | extra_ops, op_flags); | |
2720 | ||
2721 | ceph_tid_t tid; | |
2722 | op_submit(o, &tid); | |
2723 | return tid; | |
2724 | } | |
2725 | ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc, | |
2726 | const SnapContext& snapc, ceph::real_time mtime, int flags, | |
2727 | uint64_t trunc_size, __u32 trunc_seq, | |
2728 | Context *oncommit, version_t *objver = NULL, | |
2729 | ObjectOperation *extra_ops = NULL) { | |
2730 | vector<OSDOp> ops; | |
2731 | int i = init_ops(ops, 1, extra_ops); | |
2732 | ops[i].op.op = CEPH_OSD_OP_TRUNCATE; | |
2733 | ops[i].op.extent.offset = trunc_size; | |
2734 | ops[i].op.extent.truncate_size = trunc_size; | |
2735 | ops[i].op.extent.truncate_seq = trunc_seq; | |
31f18b77 | 2736 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2737 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2738 | o->mtime = mtime; | |
2739 | o->snapc = snapc; | |
2740 | ceph_tid_t tid; | |
2741 | op_submit(o, &tid); | |
2742 | return tid; | |
2743 | } | |
2744 | ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc, | |
2745 | uint64_t off, uint64_t len, const SnapContext& snapc, | |
2746 | ceph::real_time mtime, int flags, Context *oncommit, | |
2747 | version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { | |
2748 | vector<OSDOp> ops; | |
2749 | int i = init_ops(ops, 1, extra_ops); | |
2750 | ops[i].op.op = CEPH_OSD_OP_ZERO; | |
2751 | ops[i].op.extent.offset = off; | |
2752 | ops[i].op.extent.length = len; | |
31f18b77 | 2753 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2754 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2755 | o->mtime = mtime; | |
2756 | o->snapc = snapc; | |
2757 | ceph_tid_t tid; | |
2758 | op_submit(o, &tid); | |
2759 | return tid; | |
2760 | } | |
2761 | ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc, | |
2762 | const SnapContext& snapc, snapid_t snapid, | |
2763 | ceph::real_time mtime, Context *oncommit, | |
2764 | version_t *objver = NULL, | |
2765 | ObjectOperation *extra_ops = NULL) { | |
2766 | vector<OSDOp> ops; | |
2767 | int i = init_ops(ops, 1, extra_ops); | |
2768 | ops[i].op.op = CEPH_OSD_OP_ROLLBACK; | |
2769 | ops[i].op.snap.snapid = snapid; | |
2770 | Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver); | |
2771 | o->mtime = mtime; | |
2772 | o->snapc = snapc; | |
2773 | ceph_tid_t tid; | |
2774 | op_submit(o, &tid); | |
2775 | return tid; | |
2776 | } | |
2777 | ceph_tid_t create(const object_t& oid, const object_locator_t& oloc, | |
2778 | const SnapContext& snapc, ceph::real_time mtime, int global_flags, | |
2779 | int create_flags, Context *oncommit, | |
2780 | version_t *objver = NULL, | |
2781 | ObjectOperation *extra_ops = NULL) { | |
2782 | vector<OSDOp> ops; | |
2783 | int i = init_ops(ops, 1, extra_ops); | |
2784 | ops[i].op.op = CEPH_OSD_OP_CREATE; | |
2785 | ops[i].op.flags = create_flags; | |
31f18b77 | 2786 | Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags | |
7c673cae FG |
2787 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2788 | o->mtime = mtime; | |
2789 | o->snapc = snapc; | |
2790 | ceph_tid_t tid; | |
2791 | op_submit(o, &tid); | |
2792 | return tid; | |
2793 | } | |
2794 | Op *prepare_remove_op( | |
2795 | const object_t& oid, const object_locator_t& oloc, | |
2796 | const SnapContext& snapc, ceph::real_time mtime, int flags, | |
2797 | Context *oncommit, | |
2798 | version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { | |
2799 | vector<OSDOp> ops; | |
2800 | int i = init_ops(ops, 1, extra_ops); | |
2801 | ops[i].op.op = CEPH_OSD_OP_DELETE; | |
31f18b77 | 2802 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2803 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2804 | o->mtime = mtime; | |
2805 | o->snapc = snapc; | |
2806 | return o; | |
2807 | } | |
2808 | ceph_tid_t remove( | |
2809 | const object_t& oid, const object_locator_t& oloc, | |
2810 | const SnapContext& snapc, ceph::real_time mtime, int flags, | |
2811 | Context *oncommit, | |
2812 | version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { | |
2813 | Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags, | |
2814 | oncommit, objver, extra_ops); | |
2815 | ceph_tid_t tid; | |
2816 | op_submit(o, &tid); | |
2817 | return tid; | |
2818 | } | |
2819 | ||
2820 | ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc, | |
2821 | const char *name, const SnapContext& snapc, const bufferlist &bl, | |
2822 | ceph::real_time mtime, int flags, | |
2823 | Context *oncommit, | |
2824 | version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { | |
2825 | vector<OSDOp> ops; | |
2826 | int i = init_ops(ops, 1, extra_ops); | |
2827 | ops[i].op.op = CEPH_OSD_OP_SETXATTR; | |
2828 | ops[i].op.xattr.name_len = (name ? strlen(name) : 0); | |
2829 | ops[i].op.xattr.value_len = bl.length(); | |
2830 | if (name) | |
2831 | ops[i].indata.append(name); | |
2832 | ops[i].indata.append(bl); | |
31f18b77 | 2833 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2834 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2835 | o->mtime = mtime; | |
2836 | o->snapc = snapc; | |
2837 | ceph_tid_t tid; | |
2838 | op_submit(o, &tid); | |
2839 | return tid; | |
2840 | } | |
2841 | ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc, | |
2842 | const char *name, const SnapContext& snapc, | |
2843 | ceph::real_time mtime, int flags, | |
2844 | Context *oncommit, | |
2845 | version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { | |
2846 | vector<OSDOp> ops; | |
2847 | int i = init_ops(ops, 1, extra_ops); | |
2848 | ops[i].op.op = CEPH_OSD_OP_RMXATTR; | |
2849 | ops[i].op.xattr.name_len = (name ? strlen(name) : 0); | |
2850 | ops[i].op.xattr.value_len = 0; | |
2851 | if (name) | |
2852 | ops[i].indata.append(name); | |
31f18b77 | 2853 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2854 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2855 | o->mtime = mtime; | |
2856 | o->snapc = snapc; | |
2857 | ceph_tid_t tid; | |
2858 | op_submit(o, &tid); | |
2859 | return tid; | |
2860 | } | |
2861 | ||
2862 | void list_nobjects(NListContext *p, Context *onfinish); | |
2863 | uint32_t list_nobjects_seek(NListContext *p, uint32_t pos); | |
2864 | uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c); | |
2865 | void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c); | |
2866 | ||
2867 | hobject_t enumerate_objects_begin(); | |
2868 | hobject_t enumerate_objects_end(); | |
2869 | //hobject_t enumerate_objects_begin(int n, int m); | |
2870 | void enumerate_objects( | |
2871 | int64_t pool_id, | |
2872 | const std::string &ns, | |
2873 | const hobject_t &start, | |
2874 | const hobject_t &end, | |
2875 | const uint32_t max, | |
2876 | const bufferlist &filter_bl, | |
2877 | std::list<librados::ListObjectImpl> *result, | |
2878 | hobject_t *next, | |
2879 | Context *on_finish); | |
2880 | ||
2881 | void _enumerate_reply( | |
2882 | bufferlist &bl, | |
2883 | int r, | |
2884 | const hobject_t &end, | |
2885 | const int64_t pool_id, | |
2886 | int budget, | |
2887 | epoch_t reply_epoch, | |
2888 | std::list<librados::ListObjectImpl> *result, | |
2889 | hobject_t *next, | |
2890 | Context *on_finish); | |
2891 | friend class C_EnumerateReply; | |
2892 | ||
2893 | // ------------------------- | |
2894 | // pool ops | |
2895 | private: | |
2896 | void pool_op_submit(PoolOp *op); | |
2897 | void _pool_op_submit(PoolOp *op); | |
2898 | void _finish_pool_op(PoolOp *op, int r); | |
2899 | void _do_delete_pool(int64_t pool, Context *onfinish); | |
2900 | public: | |
2901 | int create_pool_snap(int64_t pool, string& snapName, Context *onfinish); | |
2902 | int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, | |
2903 | Context *onfinish); | |
2904 | int delete_pool_snap(int64_t pool, string& snapName, Context *onfinish); | |
2905 | int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish); | |
2906 | ||
2907 | int create_pool(string& name, Context *onfinish, uint64_t auid=0, | |
2908 | int crush_rule=-1); | |
2909 | int delete_pool(int64_t pool, Context *onfinish); | |
2910 | int delete_pool(const string& name, Context *onfinish); | |
2911 | int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid); | |
2912 | ||
2913 | void handle_pool_op_reply(MPoolOpReply *m); | |
2914 | int pool_op_cancel(ceph_tid_t tid, int r); | |
2915 | ||
2916 | // -------------------------- | |
2917 | // pool stats | |
2918 | private: | |
2919 | void _poolstat_submit(PoolStatOp *op); | |
2920 | public: | |
2921 | void handle_get_pool_stats_reply(MGetPoolStatsReply *m); | |
2922 | void get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result, | |
2923 | Context *onfinish); | |
2924 | int pool_stat_op_cancel(ceph_tid_t tid, int r); | |
2925 | void _finish_pool_stat_op(PoolStatOp *op, int r); | |
2926 | ||
2927 | // --------------------------- | |
2928 | // df stats | |
2929 | private: | |
2930 | void _fs_stats_submit(StatfsOp *op); | |
2931 | public: | |
2932 | void handle_fs_stats_reply(MStatfsReply *m); | |
d2e6a577 FG |
2933 | void get_fs_stats(struct ceph_statfs& result, boost::optional<int64_t> poolid, |
2934 | Context *onfinish); | |
7c673cae FG |
2935 | int statfs_op_cancel(ceph_tid_t tid, int r); |
2936 | void _finish_statfs_op(StatfsOp *op, int r); | |
2937 | ||
2938 | // --------------------------- | |
2939 | // some scatter/gather hackery | |
2940 | ||
2941 | void _sg_read_finish(vector<ObjectExtent>& extents, | |
2942 | vector<bufferlist>& resultbl, | |
2943 | bufferlist *bl, Context *onfinish); | |
2944 | ||
2945 | struct C_SGRead : public Context { | |
2946 | Objecter *objecter; | |
2947 | vector<ObjectExtent> extents; | |
2948 | vector<bufferlist> resultbl; | |
2949 | bufferlist *bl; | |
2950 | Context *onfinish; | |
2951 | C_SGRead(Objecter *ob, | |
2952 | vector<ObjectExtent>& e, vector<bufferlist>& r, bufferlist *b, | |
2953 | Context *c) : | |
2954 | objecter(ob), bl(b), onfinish(c) { | |
2955 | extents.swap(e); | |
2956 | resultbl.swap(r); | |
2957 | } | |
2958 | void finish(int r) override { | |
2959 | objecter->_sg_read_finish(extents, resultbl, bl, onfinish); | |
2960 | } | |
2961 | }; | |
2962 | ||
2963 | void sg_read_trunc(vector<ObjectExtent>& extents, snapid_t snap, | |
2964 | bufferlist *bl, int flags, uint64_t trunc_size, | |
2965 | __u32 trunc_seq, Context *onfinish, int op_flags = 0) { | |
2966 | if (extents.size() == 1) { | |
2967 | read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, | |
2968 | extents[0].length, snap, bl, flags, extents[0].truncate_size, | |
2969 | trunc_seq, onfinish, 0, 0, op_flags); | |
2970 | } else { | |
2971 | C_GatherBuilder gather(cct); | |
2972 | vector<bufferlist> resultbl(extents.size()); | |
2973 | int i=0; | |
2974 | for (vector<ObjectExtent>::iterator p = extents.begin(); | |
2975 | p != extents.end(); | |
2976 | ++p) { | |
2977 | read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++], | |
2978 | flags, p->truncate_size, trunc_seq, gather.new_sub(), | |
2979 | 0, 0, op_flags); | |
2980 | } | |
2981 | gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish)); | |
2982 | gather.activate(); | |
2983 | } | |
2984 | } | |
2985 | ||
2986 | void sg_read(vector<ObjectExtent>& extents, snapid_t snap, bufferlist *bl, | |
2987 | int flags, Context *onfinish, int op_flags = 0) { | |
2988 | sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags); | |
2989 | } | |
2990 | ||
2991 | void sg_write_trunc(vector<ObjectExtent>& extents, const SnapContext& snapc, | |
2992 | const bufferlist& bl, ceph::real_time mtime, int flags, | |
2993 | uint64_t trunc_size, __u32 trunc_seq, | |
2994 | Context *oncommit, int op_flags = 0) { | |
2995 | if (extents.size() == 1) { | |
2996 | write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, | |
2997 | extents[0].length, snapc, bl, mtime, flags, | |
2998 | extents[0].truncate_size, trunc_seq, oncommit, | |
2999 | 0, 0, op_flags); | |
3000 | } else { | |
3001 | C_GatherBuilder gcom(cct, oncommit); | |
3002 | for (vector<ObjectExtent>::iterator p = extents.begin(); | |
3003 | p != extents.end(); | |
3004 | ++p) { | |
3005 | bufferlist cur; | |
3006 | for (vector<pair<uint64_t,uint64_t> >::iterator bit | |
3007 | = p->buffer_extents.begin(); | |
3008 | bit != p->buffer_extents.end(); | |
3009 | ++bit) | |
3010 | bl.copy(bit->first, bit->second, cur); | |
3011 | assert(cur.length() == p->length); | |
3012 | write_trunc(p->oid, p->oloc, p->offset, p->length, | |
3013 | snapc, cur, mtime, flags, p->truncate_size, trunc_seq, | |
3014 | oncommit ? gcom.new_sub():0, | |
3015 | 0, 0, op_flags); | |
3016 | } | |
3017 | gcom.activate(); | |
3018 | } | |
3019 | } | |
3020 | ||
3021 | void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc, | |
3022 | const bufferlist& bl, ceph::real_time mtime, int flags, | |
3023 | Context *oncommit, int op_flags = 0) { | |
3024 | sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit, | |
3025 | op_flags); | |
3026 | } | |
3027 | ||
3028 | void ms_handle_connect(Connection *con) override; | |
3029 | bool ms_handle_reset(Connection *con) override; | |
3030 | void ms_handle_remote_reset(Connection *con) override; | |
3031 | bool ms_handle_refused(Connection *con) override; | |
3032 | bool ms_get_authorizer(int dest_type, | |
3033 | AuthAuthorizer **authorizer, | |
3034 | bool force_new) override; | |
3035 | ||
3036 | void blacklist_self(bool set); | |
3037 | ||
3038 | private: | |
3039 | epoch_t epoch_barrier; | |
3040 | bool retry_writes_after_first_reply; | |
3041 | public: | |
3042 | void set_epoch_barrier(epoch_t epoch); | |
3043 | ||
// Accessor for this Objecter's PerfCounters set (the "logger" member).
3044 | PerfCounters *get_logger() { | |
3045 | return logger; | |
3046 | } | |
3047 | }; | |
3048 | ||
3049 | #endif |