]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_OBJECTER_H | |
16 | #define CEPH_OBJECTER_H | |
17 | ||
18 | #include <condition_variable> | |
19 | #include <list> | |
20 | #include <map> | |
21 | #include <mutex> | |
22 | #include <memory> | |
23 | #include <sstream> | |
24 | #include <type_traits> | |
25 | ||
26 | #include <boost/thread/shared_mutex.hpp> | |
27 | ||
28 | #include "include/assert.h" | |
29 | #include "include/buffer.h" | |
30 | #include "include/types.h" | |
31 | #include "include/rados/rados_types.hpp" | |
32 | ||
33 | #include "common/admin_socket.h" | |
34 | #include "common/ceph_time.h" | |
35 | #include "common/ceph_timer.h" | |
36 | #include "common/Finisher.h" | |
37 | #include "common/shunique_lock.h" | |
38 | #include "common/zipkin_trace.h" | |
39 | ||
40 | #include "messages/MOSDOp.h" | |
41 | #include "osd/OSDMap.h" | |
42 | ||
43 | using namespace std; | |
44 | ||
45 | class Context; | |
46 | class Messenger; | |
47 | class OSDMap; | |
48 | class MonClient; | |
49 | class Message; | |
50 | class Finisher; | |
51 | ||
52 | class MPoolOpReply; | |
53 | ||
54 | class MGetPoolStatsReply; | |
55 | class MStatfsReply; | |
56 | class MCommandReply; | |
57 | class MWatchNotify; | |
58 | ||
59 | class PerfCounters; | |
60 | ||
61 | // ----------------------------------------- | |
62 | ||
63 | struct ObjectOperation { | |
64 | vector<OSDOp> ops; | |
65 | int flags; | |
66 | int priority; | |
67 | ||
68 | vector<bufferlist*> out_bl; | |
69 | vector<Context*> out_handler; | |
70 | vector<int*> out_rval; | |
71 | ||
  // Start with no flags and default priority; parallel out_* vectors grow with ops.
  ObjectOperation() : flags(0), priority(0) {}
73 | ~ObjectOperation() { | |
74 | while (!out_handler.empty()) { | |
75 | delete out_handler.back(); | |
76 | out_handler.pop_back(); | |
77 | } | |
78 | } | |
79 | ||
80 | size_t size() { | |
81 | return ops.size(); | |
82 | } | |
83 | ||
  // Overwrite the per-op flags of the most recently added sub-op.
  // NOTE: the parameter deliberately shadows the member 'flags' — this
  // sets the OSDOp's own flag word, not the operation-wide flags.
  void set_last_op_flags(int flags) {
    assert(!ops.empty());
    ops.rbegin()->op.flags = flags;
  }
88 | ||
89 | class C_TwoContexts; | |
90 | /** | |
91 | * Add a callback to run when this operation completes, | |
92 | * after any other callbacks for it. | |
93 | */ | |
94 | void add_handler(Context *extra); | |
95 | ||
96 | OSDOp& add_op(int op) { | |
97 | int s = ops.size(); | |
98 | ops.resize(s+1); | |
99 | ops[s].op.op = op; | |
100 | out_bl.resize(s+1); | |
101 | out_bl[s] = NULL; | |
102 | out_handler.resize(s+1); | |
103 | out_handler[s] = NULL; | |
104 | out_rval.resize(s+1); | |
105 | out_rval[s] = NULL; | |
106 | return ops[s]; | |
107 | } | |
108 | void add_data(int op, uint64_t off, uint64_t len, bufferlist& bl) { | |
109 | OSDOp& osd_op = add_op(op); | |
110 | osd_op.op.extent.offset = off; | |
111 | osd_op.op.extent.length = len; | |
112 | osd_op.indata.claim_append(bl); | |
113 | } | |
114 | void add_writesame(int op, uint64_t off, uint64_t write_len, | |
115 | bufferlist& bl) { | |
116 | OSDOp& osd_op = add_op(op); | |
117 | osd_op.op.writesame.offset = off; | |
118 | osd_op.op.writesame.length = write_len; | |
119 | osd_op.op.writesame.data_length = bl.length(); | |
120 | osd_op.indata.claim_append(bl); | |
121 | } | |
122 | void add_xattr(int op, const char *name, const bufferlist& data) { | |
123 | OSDOp& osd_op = add_op(op); | |
124 | osd_op.op.xattr.name_len = (name ? strlen(name) : 0); | |
125 | osd_op.op.xattr.value_len = data.length(); | |
126 | if (name) | |
127 | osd_op.indata.append(name); | |
128 | osd_op.indata.append(data); | |
129 | } | |
130 | void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, | |
131 | uint8_t cmp_mode, const bufferlist& data) { | |
132 | OSDOp& osd_op = add_op(op); | |
133 | osd_op.op.xattr.name_len = (name ? strlen(name) : 0); | |
134 | osd_op.op.xattr.value_len = data.length(); | |
135 | osd_op.op.xattr.cmp_op = cmp_op; | |
136 | osd_op.op.xattr.cmp_mode = cmp_mode; | |
137 | if (name) | |
138 | osd_op.indata.append(name); | |
139 | osd_op.indata.append(data); | |
140 | } | |
141 | void add_call(int op, const char *cname, const char *method, | |
142 | bufferlist &indata, | |
143 | bufferlist *outbl, Context *ctx, int *prval) { | |
144 | OSDOp& osd_op = add_op(op); | |
145 | ||
146 | unsigned p = ops.size() - 1; | |
147 | out_handler[p] = ctx; | |
148 | out_bl[p] = outbl; | |
149 | out_rval[p] = prval; | |
150 | ||
151 | osd_op.op.cls.class_len = strlen(cname); | |
152 | osd_op.op.cls.method_len = strlen(method); | |
153 | osd_op.op.cls.indata_len = indata.length(); | |
154 | osd_op.indata.append(cname, osd_op.op.cls.class_len); | |
155 | osd_op.indata.append(method, osd_op.op.cls.method_len); | |
156 | osd_op.indata.append(indata); | |
157 | } | |
158 | void add_pgls(int op, uint64_t count, collection_list_handle_t cookie, | |
159 | epoch_t start_epoch) { | |
160 | OSDOp& osd_op = add_op(op); | |
161 | osd_op.op.pgls.count = count; | |
162 | osd_op.op.pgls.start_epoch = start_epoch; | |
163 | ::encode(cookie, osd_op.indata); | |
164 | } | |
165 | void add_pgls_filter(int op, uint64_t count, const bufferlist& filter, | |
166 | collection_list_handle_t cookie, epoch_t start_epoch) { | |
167 | OSDOp& osd_op = add_op(op); | |
168 | osd_op.op.pgls.count = count; | |
169 | osd_op.op.pgls.start_epoch = start_epoch; | |
170 | string cname = "pg"; | |
171 | string mname = "filter"; | |
172 | ::encode(cname, osd_op.indata); | |
173 | ::encode(mname, osd_op.indata); | |
174 | osd_op.indata.append(filter); | |
175 | ::encode(cookie, osd_op.indata); | |
176 | } | |
177 | void add_alloc_hint(int op, uint64_t expected_object_size, | |
178 | uint64_t expected_write_size, | |
179 | uint32_t flags) { | |
180 | OSDOp& osd_op = add_op(op); | |
181 | osd_op.op.alloc_hint.expected_object_size = expected_object_size; | |
182 | osd_op.op.alloc_hint.expected_write_size = expected_write_size; | |
183 | osd_op.op.alloc_hint.flags = flags; | |
184 | } | |
185 | ||
186 | // ------ | |
187 | ||
188 | // pg | |
189 | void pg_ls(uint64_t count, bufferlist& filter, | |
190 | collection_list_handle_t cookie, epoch_t start_epoch) { | |
191 | if (filter.length() == 0) | |
192 | add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch); | |
193 | else | |
194 | add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie, | |
195 | start_epoch); | |
196 | flags |= CEPH_OSD_FLAG_PGOP; | |
197 | } | |
198 | ||
199 | void pg_nls(uint64_t count, const bufferlist& filter, | |
200 | collection_list_handle_t cookie, epoch_t start_epoch) { | |
201 | if (filter.length() == 0) | |
202 | add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch); | |
203 | else | |
204 | add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie, | |
205 | start_epoch); | |
206 | flags |= CEPH_OSD_FLAG_PGOP; | |
207 | } | |
208 | ||
209 | void scrub_ls(const librados::object_id_t& start_after, | |
210 | uint64_t max_to_get, | |
211 | std::vector<librados::inconsistent_obj_t> *objects, | |
212 | uint32_t *interval, | |
213 | int *rval); | |
214 | void scrub_ls(const librados::object_id_t& start_after, | |
215 | uint64_t max_to_get, | |
216 | std::vector<librados::inconsistent_snapset_t> *objects, | |
217 | uint32_t *interval, | |
218 | int *rval); | |
219 | ||
220 | void create(bool excl) { | |
221 | OSDOp& o = add_op(CEPH_OSD_OP_CREATE); | |
222 | o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0); | |
223 | } | |
224 | ||
  // Completion for STAT: decodes (size, mtime) from the reply and fans
  // the mtime out into whichever time representations the caller asked for.
  struct C_ObjectOperation_stat : public Context {
    bufferlist bl;            // raw reply payload, filled in before finish()
    uint64_t *psize;          // out: object size (may be NULL)
    ceph::real_time *pmtime;  // out: mtime as real_time (may be NULL)
    time_t *ptime;            // out: mtime as time_t (may be NULL)
    struct timespec *pts;     // out: mtime as timespec (may be NULL)
    int *prval;               // out: set to -EIO on decode failure
    C_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts,
			   int *prval)
      : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval) {}
    void finish(int r) override {
      // only decode on success; outputs are untouched on error
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	try {
	  uint64_t size;
	  ceph::real_time mtime;
	  ::decode(size, p);
	  ::decode(mtime, p);
	  if (psize)
	    *psize = size;
	  if (pmtime)
	    *pmtime = mtime;
	  if (ptime)
	    *ptime = ceph::real_clock::to_time_t(mtime);
	  if (pts)
	    *pts = ceph::real_clock::to_timespec(mtime);
	} catch (buffer::error& e) {
	  // truncated/garbled reply: surface as I/O error via prval
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
258 | void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) { | |
259 | add_op(CEPH_OSD_OP_STAT); | |
260 | unsigned p = ops.size() - 1; | |
261 | C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, pmtime, NULL, NULL, | |
262 | prval); | |
263 | out_bl[p] = &h->bl; | |
264 | out_handler[p] = h; | |
265 | out_rval[p] = prval; | |
266 | } | |
267 | void stat(uint64_t *psize, time_t *ptime, int *prval) { | |
268 | add_op(CEPH_OSD_OP_STAT); | |
269 | unsigned p = ops.size() - 1; | |
270 | C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, ptime, NULL, | |
271 | prval); | |
272 | out_bl[p] = &h->bl; | |
273 | out_handler[p] = h; | |
274 | out_rval[p] = prval; | |
275 | } | |
276 | void stat(uint64_t *psize, struct timespec *pts, int *prval) { | |
277 | add_op(CEPH_OSD_OP_STAT); | |
278 | unsigned p = ops.size() - 1; | |
279 | C_ObjectOperation_stat *h = new C_ObjectOperation_stat(psize, NULL, NULL, pts, | |
280 | prval); | |
281 | out_bl[p] = &h->bl; | |
282 | out_handler[p] = h; | |
283 | out_rval[p] = prval; | |
284 | } | |
285 | // object cmpext | |
286 | struct C_ObjectOperation_cmpext : public Context { | |
287 | int *prval; | |
288 | C_ObjectOperation_cmpext(int *prval) | |
289 | : prval(prval) {} | |
290 | ||
291 | void finish(int r) { | |
292 | if (prval) | |
293 | *prval = r; | |
294 | } | |
295 | }; | |
296 | ||
297 | void cmpext(uint64_t off, bufferlist& cmp_bl, int *prval) { | |
298 | add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl); | |
299 | unsigned p = ops.size() - 1; | |
300 | C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval); | |
301 | out_handler[p] = h; | |
302 | out_rval[p] = prval; | |
303 | } | |
304 | ||
305 | // Used by C API | |
306 | void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) { | |
307 | bufferlist cmp_bl; | |
308 | cmp_bl.append(cmp_buf, cmp_len); | |
309 | add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl); | |
310 | unsigned p = ops.size() - 1; | |
311 | C_ObjectOperation_cmpext *h = new C_ObjectOperation_cmpext(prval); | |
312 | out_handler[p] = h; | |
313 | out_rval[p] = prval; | |
314 | } | |
315 | ||
316 | void read(uint64_t off, uint64_t len, bufferlist *pbl, int *prval, | |
317 | Context* ctx) { | |
318 | bufferlist bl; | |
319 | add_data(CEPH_OSD_OP_READ, off, len, bl); | |
320 | unsigned p = ops.size() - 1; | |
321 | out_bl[p] = pbl; | |
322 | out_rval[p] = prval; | |
323 | out_handler[p] = ctx; | |
324 | } | |
325 | ||
  // Completion for SPARSE_READ: decodes the extent map and data blob
  // from the reply into the caller's outputs.
  struct C_ObjectOperation_sparse_read : public Context {
    bufferlist bl;                         // raw reply payload
    bufferlist *data_bl;                   // out: concatenated extent data
    std::map<uint64_t, uint64_t> *extents; // out: offset -> length map
    int *prval;                            // out: -EIO on decode failure
    C_ObjectOperation_sparse_read(bufferlist *data_bl,
				  std::map<uint64_t, uint64_t> *extents,
				  int *prval)
      : data_bl(data_bl), extents(extents), prval(prval) {}
    void finish(int r) override {
      bufferlist::iterator iter = bl.begin();
      if (r >= 0) {
	// NOTE: it's possible the sub-op has not been executed but the result
	// code remains zeroed. Avoid the costly exception handling on a
	// potential IO path.
	if (bl.length() > 0) {
	  try {
	    ::decode(*extents, iter);
	    ::decode(*data_bl, iter);
	  } catch (buffer::error& e) {
	    if (prval)
	      *prval = -EIO;
	  }
	} else if (prval) {
	  // empty reply with "success": treat as I/O error (see NOTE above)
	  *prval = -EIO;
	}
      }
    }
  };
355 | void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t,uint64_t> *m, | |
356 | bufferlist *data_bl, int *prval) { | |
357 | bufferlist bl; | |
358 | add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl); | |
359 | unsigned p = ops.size() - 1; | |
360 | C_ObjectOperation_sparse_read *h = | |
361 | new C_ObjectOperation_sparse_read(data_bl, m, prval); | |
362 | out_bl[p] = &h->bl; | |
363 | out_handler[p] = h; | |
364 | out_rval[p] = prval; | |
365 | } | |
366 | void write(uint64_t off, bufferlist& bl, | |
367 | uint64_t truncate_size, | |
368 | uint32_t truncate_seq) { | |
369 | add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl); | |
370 | OSDOp& o = *ops.rbegin(); | |
371 | o.op.extent.truncate_size = truncate_size; | |
372 | o.op.extent.truncate_seq = truncate_seq; | |
373 | } | |
374 | void write(uint64_t off, bufferlist& bl) { | |
375 | write(off, bl, 0, 0); | |
376 | } | |
377 | void write_full(bufferlist& bl) { | |
378 | add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl); | |
379 | } | |
380 | void writesame(uint64_t off, uint64_t write_len, bufferlist& bl) { | |
381 | add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl); | |
382 | } | |
383 | void append(bufferlist& bl) { | |
384 | add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl); | |
385 | } | |
386 | void zero(uint64_t off, uint64_t len) { | |
387 | bufferlist bl; | |
388 | add_data(CEPH_OSD_OP_ZERO, off, len, bl); | |
389 | } | |
390 | void truncate(uint64_t off) { | |
391 | bufferlist bl; | |
392 | add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl); | |
393 | } | |
394 | void remove() { | |
395 | bufferlist bl; | |
396 | add_data(CEPH_OSD_OP_DELETE, 0, 0, bl); | |
397 | } | |
398 | void mapext(uint64_t off, uint64_t len) { | |
399 | bufferlist bl; | |
400 | add_data(CEPH_OSD_OP_MAPEXT, off, len, bl); | |
401 | } | |
402 | void sparse_read(uint64_t off, uint64_t len) { | |
403 | bufferlist bl; | |
404 | add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl); | |
405 | } | |
406 | ||
407 | void checksum(uint8_t type, const bufferlist &init_value_bl, | |
408 | uint64_t off, uint64_t len, size_t chunk_size, | |
409 | bufferlist *pbl, int *prval, Context *ctx) { | |
410 | OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM); | |
411 | osd_op.op.checksum.offset = off; | |
412 | osd_op.op.checksum.length = len; | |
413 | osd_op.op.checksum.type = type; | |
414 | osd_op.op.checksum.chunk_size = chunk_size; | |
415 | osd_op.indata.append(init_value_bl); | |
416 | ||
417 | unsigned p = ops.size() - 1; | |
418 | out_bl[p] = pbl; | |
419 | out_rval[p] = prval; | |
420 | out_handler[p] = ctx; | |
421 | } | |
422 | ||
423 | // object attrs | |
424 | void getxattr(const char *name, bufferlist *pbl, int *prval) { | |
425 | bufferlist bl; | |
426 | add_xattr(CEPH_OSD_OP_GETXATTR, name, bl); | |
427 | unsigned p = ops.size() - 1; | |
428 | out_bl[p] = pbl; | |
429 | out_rval[p] = prval; | |
430 | } | |
  // Completion that decodes a string->bufferlist map reply (xattrs, omap
  // values), plus an optional truncation flag appended by newer OSDs.
  struct C_ObjectOperation_decodevals : public Context {
    uint64_t max_entries;  // request limit, used to infer truncation from old OSDs
    bufferlist bl;         // raw reply payload
    std::map<std::string,bufferlist> *pattrs;  // out: decoded map (may be NULL)
    bool *ptruncated;      // out: more entries remain (may be NULL)
    int *prval;            // out: -EIO on decode failure
    C_ObjectOperation_decodevals(uint64_t m, std::map<std::string,bufferlist> *pa,
				 bool *pt, int *pr)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
      if (ptruncated) {
	*ptruncated = false;
      }
    }
    void finish(int r) override {
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	try {
	  if (pattrs)
	    ::decode(*pattrs, p);
	  if (ptruncated) {
	    // even if the caller didn't want the map, we must decode it
	    // to reach the trailing truncated flag
	    std::map<std::string,bufferlist> ignore;
	    if (!pattrs) {
	      ::decode(ignore, p);
	      pattrs = &ignore;
	    }
	    if (!p.end()) {
	      ::decode(*ptruncated, p);
	    } else {
	      // the OSD did not provide this.  since old OSDs do not
	      // enforce omap result limits either, we can infer it from
	      // the size of the result
	      *ptruncated = (pattrs->size() == max_entries);
	    }
	  }
	}
	catch (buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
  // Completion that decodes a string-set reply (omap keys), plus an
  // optional truncation flag appended by newer OSDs.
  struct C_ObjectOperation_decodekeys : public Context {
    uint64_t max_entries;  // request limit, used to infer truncation from old OSDs
    bufferlist bl;         // raw reply payload
    std::set<std::string> *pattrs;  // out: decoded key set (may be NULL)
    bool *ptruncated;      // out: more keys remain (may be NULL)
    int *prval;            // out: -EIO on decode failure
    C_ObjectOperation_decodekeys(uint64_t m, std::set<std::string> *pa, bool *pt,
				 int *pr)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr) {
      if (ptruncated) {
	*ptruncated = false;
      }
    }
    void finish(int r) override {
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	try {
	  if (pattrs)
	    ::decode(*pattrs, p);
	  if (ptruncated) {
	    // decode into a scratch set if the caller didn't want the keys,
	    // so we can still reach the trailing truncated flag
	    std::set<std::string> ignore;
	    if (!pattrs) {
	      ::decode(ignore, p);
	      pattrs = &ignore;
	    }
	    if (!p.end()) {
	      ::decode(*ptruncated, p);
	    } else {
	      // the OSD did not provide this.  since old OSDs do not
	      // enforce omap result limits either, we can infer it from
	      // the size of the result
	      *ptruncated = (pattrs->size() == max_entries);
	    }
	  }
	}
	catch (buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
515 | struct C_ObjectOperation_decodewatchers : public Context { | |
516 | bufferlist bl; | |
517 | list<obj_watch_t> *pwatchers; | |
518 | int *prval; | |
519 | C_ObjectOperation_decodewatchers(list<obj_watch_t> *pw, int *pr) | |
520 | : pwatchers(pw), prval(pr) {} | |
521 | void finish(int r) override { | |
522 | if (r >= 0) { | |
523 | bufferlist::iterator p = bl.begin(); | |
524 | try { | |
525 | obj_list_watch_response_t resp; | |
526 | ::decode(resp, p); | |
527 | if (pwatchers) { | |
528 | for (list<watch_item_t>::iterator i = resp.entries.begin() ; | |
529 | i != resp.entries.end() ; ++i) { | |
530 | obj_watch_t ow; | |
531 | ostringstream sa; | |
532 | sa << i->addr; | |
533 | strncpy(ow.addr, sa.str().c_str(), 256); | |
534 | ow.watcher_id = i->name.num(); | |
535 | ow.cookie = i->cookie; | |
536 | ow.timeout_seconds = i->timeout_seconds; | |
537 | pwatchers->push_back(ow); | |
538 | } | |
539 | } | |
540 | } | |
541 | catch (buffer::error& e) { | |
542 | if (prval) | |
543 | *prval = -EIO; | |
544 | } | |
545 | } | |
546 | } | |
547 | }; | |
  // Completion for LIST_SNAPS: converts the OSD's clone list into the
  // librados snap_set_t representation.
  struct C_ObjectOperation_decodesnaps : public Context {
    bufferlist bl;                // raw reply payload
    librados::snap_set_t *psnaps; // out: decoded snap set (may be NULL)
    int *prval;                   // out: -EIO on decode failure
    C_ObjectOperation_decodesnaps(librados::snap_set_t *ps, int *pr)
      : psnaps(ps), prval(pr) {}
    void finish(int r) override {
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	try {
	  obj_list_snap_response_t resp;
	  ::decode(resp, p);
	  if (psnaps) {
	    psnaps->clones.clear();
	    // translate each internal clone_info into the public clone_info_t
	    for (vector<clone_info>::iterator ci = resp.clones.begin();
		 ci != resp.clones.end();
		 ++ci) {
	      librados::clone_info_t clone;

	      clone.cloneid = ci->cloneid;
	      clone.snaps.reserve(ci->snaps.size());
	      clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
				 ci->snaps.end());
	      clone.overlap = ci->overlap;
	      clone.size = ci->size;

	      psnaps->clones.push_back(clone);
	    }
	    psnaps->seq = resp.seq;
	  }
	} catch (buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	}
      }
    }
  };
585 | void getxattrs(std::map<std::string,bufferlist> *pattrs, int *prval) { | |
586 | add_op(CEPH_OSD_OP_GETXATTRS); | |
587 | if (pattrs || prval) { | |
588 | unsigned p = ops.size() - 1; | |
589 | C_ObjectOperation_decodevals *h | |
590 | = new C_ObjectOperation_decodevals(0, pattrs, nullptr, prval); | |
591 | out_handler[p] = h; | |
592 | out_bl[p] = &h->bl; | |
593 | out_rval[p] = prval; | |
594 | } | |
595 | } | |
596 | void setxattr(const char *name, const bufferlist& bl) { | |
597 | add_xattr(CEPH_OSD_OP_SETXATTR, name, bl); | |
598 | } | |
599 | void setxattr(const char *name, const string& s) { | |
600 | bufferlist bl; | |
601 | bl.append(s); | |
602 | add_xattr(CEPH_OSD_OP_SETXATTR, name, bl); | |
603 | } | |
604 | void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, | |
605 | const bufferlist& bl) { | |
606 | add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl); | |
607 | } | |
608 | void rmxattr(const char *name) { | |
609 | bufferlist bl; | |
610 | add_xattr(CEPH_OSD_OP_RMXATTR, name, bl); | |
611 | } | |
612 | void setxattrs(map<string, bufferlist>& attrs) { | |
613 | bufferlist bl; | |
614 | ::encode(attrs, bl); | |
615 | add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length()); | |
616 | } | |
617 | void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) { | |
618 | bufferlist bl; | |
619 | ::encode(attrs, bl); | |
620 | add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl); | |
621 | } | |
622 | ||
623 | // trivialmap | |
624 | void tmap_update(bufferlist& bl) { | |
625 | add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl); | |
626 | } | |
627 | void tmap_put(bufferlist& bl) { | |
628 | add_data(CEPH_OSD_OP_TMAPPUT, 0, bl.length(), bl); | |
629 | } | |
630 | void tmap_get(bufferlist *pbl, int *prval) { | |
631 | add_op(CEPH_OSD_OP_TMAPGET); | |
632 | unsigned p = ops.size() - 1; | |
633 | out_bl[p] = pbl; | |
634 | out_rval[p] = prval; | |
635 | } | |
636 | void tmap_get() { | |
637 | add_op(CEPH_OSD_OP_TMAPGET); | |
638 | } | |
639 | void tmap_to_omap(bool nullok=false) { | |
640 | OSDOp& osd_op = add_op(CEPH_OSD_OP_TMAP2OMAP); | |
641 | if (nullok) | |
642 | osd_op.op.tmap2omap.flags = CEPH_OSD_TMAP2OMAP_NULLOK; | |
643 | } | |
644 | ||
645 | // objectmap | |
646 | void omap_get_keys(const string &start_after, | |
647 | uint64_t max_to_get, | |
648 | std::set<std::string> *out_set, | |
649 | bool *ptruncated, | |
650 | int *prval) { | |
651 | OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS); | |
652 | bufferlist bl; | |
653 | ::encode(start_after, bl); | |
654 | ::encode(max_to_get, bl); | |
655 | op.op.extent.offset = 0; | |
656 | op.op.extent.length = bl.length(); | |
657 | op.indata.claim_append(bl); | |
658 | if (prval || ptruncated || out_set) { | |
659 | unsigned p = ops.size() - 1; | |
660 | C_ObjectOperation_decodekeys *h = | |
661 | new C_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval); | |
662 | out_handler[p] = h; | |
663 | out_bl[p] = &h->bl; | |
664 | out_rval[p] = prval; | |
665 | } | |
666 | } | |
667 | ||
668 | void omap_get_vals(const string &start_after, | |
669 | const string &filter_prefix, | |
670 | uint64_t max_to_get, | |
671 | std::map<std::string, bufferlist> *out_set, | |
672 | bool *ptruncated, | |
673 | int *prval) { | |
674 | OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS); | |
675 | bufferlist bl; | |
676 | ::encode(start_after, bl); | |
677 | ::encode(max_to_get, bl); | |
678 | ::encode(filter_prefix, bl); | |
679 | op.op.extent.offset = 0; | |
680 | op.op.extent.length = bl.length(); | |
681 | op.indata.claim_append(bl); | |
682 | if (prval || out_set || ptruncated) { | |
683 | unsigned p = ops.size() - 1; | |
684 | C_ObjectOperation_decodevals *h = | |
685 | new C_ObjectOperation_decodevals(max_to_get, out_set, ptruncated, prval); | |
686 | out_handler[p] = h; | |
687 | out_bl[p] = &h->bl; | |
688 | out_rval[p] = prval; | |
689 | } | |
690 | } | |
691 | ||
692 | void omap_get_vals_by_keys(const std::set<std::string> &to_get, | |
693 | std::map<std::string, bufferlist> *out_set, | |
694 | int *prval) { | |
695 | OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS); | |
696 | bufferlist bl; | |
697 | ::encode(to_get, bl); | |
698 | op.op.extent.offset = 0; | |
699 | op.op.extent.length = bl.length(); | |
700 | op.indata.claim_append(bl); | |
701 | if (prval || out_set) { | |
702 | unsigned p = ops.size() - 1; | |
703 | C_ObjectOperation_decodevals *h = | |
704 | new C_ObjectOperation_decodevals(0, out_set, nullptr, prval); | |
705 | out_handler[p] = h; | |
706 | out_bl[p] = &h->bl; | |
707 | out_rval[p] = prval; | |
708 | } | |
709 | } | |
710 | ||
711 | void omap_cmp(const std::map<std::string, pair<bufferlist,int> > &assertions, | |
712 | int *prval) { | |
713 | OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP); | |
714 | bufferlist bl; | |
715 | ::encode(assertions, bl); | |
716 | op.op.extent.offset = 0; | |
717 | op.op.extent.length = bl.length(); | |
718 | op.indata.claim_append(bl); | |
719 | if (prval) { | |
720 | unsigned p = ops.size() - 1; | |
721 | out_rval[p] = prval; | |
722 | } | |
723 | } | |
724 | ||
  // Completion for COPY_GET: decodes an object_copy_data_t reply and
  // fans every field out to the (individually optional) caller outputs,
  // advancing *cursor for the next chunk of the copy.
  struct C_ObjectOperation_copyget : public Context {
    bufferlist bl;                      // raw reply payload
    object_copy_cursor_t *cursor;       // in/out: copy progress cursor (required)
    uint64_t *out_size;
    ceph::real_time *out_mtime;
    std::map<std::string,bufferlist> *out_attrs;
    bufferlist *out_data, *out_omap_header, *out_omap_data;
    vector<snapid_t> *out_snaps;
    snapid_t *out_snap_seq;
    uint32_t *out_flags;
    uint32_t *out_data_digest;
    uint32_t *out_omap_digest;
    mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids;
    uint64_t *out_truncate_seq;
    uint64_t *out_truncate_size;
    int *prval;                         // out: -EIO on decode failure
    C_ObjectOperation_copyget(object_copy_cursor_t *c,
			      uint64_t *s,
			      ceph::real_time *m,
			      std::map<std::string,bufferlist> *a,
			      bufferlist *d, bufferlist *oh,
			      bufferlist *o,
			      std::vector<snapid_t> *osnaps,
			      snapid_t *osnap_seq,
			      uint32_t *flags,
			      uint32_t *dd,
			      uint32_t *od,
			      mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *oreqids,
			      uint64_t *otseq,
			      uint64_t *otsize,
			      int *r)
      : cursor(c),
	out_size(s), out_mtime(m),
	out_attrs(a), out_data(d), out_omap_header(oh),
	out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
	out_flags(flags), out_data_digest(dd), out_omap_digest(od),
	out_reqids(oreqids),
	out_truncate_seq(otseq),
	out_truncate_size(otsize),
	prval(r) {}
    void finish(int r) override {
      // reqids are copied on ENOENT
      if (r < 0 && r != -ENOENT)
	return;
      try {
	bufferlist::iterator p = bl.begin();
	object_copy_data_t copy_reply;
	::decode(copy_reply, p);
	if (r == -ENOENT) {
	  // object is gone: only the reqid history is meaningful
	  if (out_reqids)
	    *out_reqids = copy_reply.reqids;
	  return;
	}
	if (out_size)
	  *out_size = copy_reply.size;
	if (out_mtime)
	  *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
	if (out_attrs)
	  *out_attrs = copy_reply.attrs;
	if (out_data)
	  out_data->claim_append(copy_reply.data);
	if (out_omap_header)
	  out_omap_header->claim_append(copy_reply.omap_header);
	if (out_omap_data)
	  *out_omap_data = copy_reply.omap_data;
	if (out_snaps)
	  *out_snaps = copy_reply.snaps;
	if (out_snap_seq)
	  *out_snap_seq = copy_reply.snap_seq;
	if (out_flags)
	  *out_flags = copy_reply.flags;
	if (out_data_digest)
	  *out_data_digest = copy_reply.data_digest;
	if (out_omap_digest)
	  *out_omap_digest = copy_reply.omap_digest;
	if (out_reqids)
	  *out_reqids = copy_reply.reqids;
	if (out_truncate_seq)
	  *out_truncate_seq = copy_reply.truncate_seq;
	if (out_truncate_size)
	  *out_truncate_size = copy_reply.truncate_size;
	// advance the cursor so the caller can request the next chunk
	*cursor = copy_reply.cursor;
      } catch (buffer::error& e) {
	if (prval)
	  *prval = -EIO;
      }
    }
  };
813 | ||
  // Queue a COPY_GET chunk fetch: pulls up to 'max' bytes of object
  // state (data, attrs, omap, snaps, digests, ...) starting from
  // *cursor; every out-pointer is optional.  The wire payload is the
  // encoded cursor followed by max.
  void copy_get(object_copy_cursor_t *cursor,
		uint64_t max,
		uint64_t *out_size,
		ceph::real_time *out_mtime,
		std::map<std::string,bufferlist> *out_attrs,
		bufferlist *out_data,
		bufferlist *out_omap_header,
		bufferlist *out_omap_data,
		vector<snapid_t> *out_snaps,
		snapid_t *out_snap_seq,
		uint32_t *out_flags,
		uint32_t *out_data_digest,
		uint32_t *out_omap_digest,
		mempool::osd_pglog::vector<pair<osd_reqid_t, version_t> > *out_reqids,
		uint64_t *truncate_seq,
		uint64_t *truncate_size,
		int *prval) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
    osd_op.op.copy_get.max = max;
    ::encode(*cursor, osd_op.indata);
    ::encode(max, osd_op.indata);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    // the completion decodes the reply and advances *cursor
    C_ObjectOperation_copyget *h =
      new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
				    out_attrs, out_data, out_omap_header,
				    out_omap_data, out_snaps, out_snap_seq,
				    out_flags, out_data_digest,
				    out_omap_digest, out_reqids, truncate_seq,
				    truncate_size, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }
847 | ||
848 | void undirty() { | |
849 | add_op(CEPH_OSD_OP_UNDIRTY); | |
850 | } | |
851 | ||
  // Completion for ISDIRTY: decodes the dirty-flag reply into *pisdirty.
  struct C_ObjectOperation_isdirty : public Context {
    bufferlist bl;   // raw reply payload
    bool *pisdirty;  // out: dirty flag (may be NULL)
    int *prval;      // out: -EIO on decode failure
    C_ObjectOperation_isdirty(bool *p, int *r)
      : pisdirty(p), prval(r) {}
    void finish(int r) override {
      if (r < 0)
	return;
      try {
	bufferlist::iterator p = bl.begin();
	bool isdirty;
	::decode(isdirty, p);
	if (pisdirty)
	  *pisdirty = isdirty;
      } catch (buffer::error& e) {
	if (prval)
	  *prval = -EIO;
      }
    }
  };
873 | ||
874 | void is_dirty(bool *pisdirty, int *prval) { | |
875 | add_op(CEPH_OSD_OP_ISDIRTY); | |
876 | unsigned p = ops.size() - 1; | |
877 | out_rval[p] = prval; | |
878 | C_ObjectOperation_isdirty *h = | |
879 | new C_ObjectOperation_isdirty(pisdirty, prval); | |
880 | out_bl[p] = &h->bl; | |
881 | out_handler[p] = h; | |
882 | } | |
883 | ||
  // Completion for PG_HITSET_LS: decodes the hitset interval list and
  // delivers it as real_time pairs and/or time_t pairs, depending on
  // which output the caller supplied.
  struct C_ObjectOperation_hit_set_ls : public Context {
    bufferlist bl;  // raw reply payload
    std::list< std::pair<time_t, time_t> > *ptls;                        // out: time_t intervals (may be NULL)
    std::list< std::pair<ceph::real_time, ceph::real_time> > *putls;     // out: real_time intervals (may be NULL)
    int *prval;     // out: -EIO on decode failure, else the op result
    C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
				 std::list< std::pair<ceph::real_time,
						      ceph::real_time> > *ut,
				 int *r)
      : ptls(t), putls(ut), prval(r) {}
    void finish(int r) override {
      if (r < 0)
	return;
      try {
	bufferlist::iterator p = bl.begin();
	std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
	::decode(ls, p);
	if (ptls) {
	  ptls->clear();
	  for (auto p = ls.begin(); p != ls.end(); ++p)
	    // round initial timestamp up to the next full second to
	    // keep this a valid interval.
	    ptls->push_back(
	      make_pair(ceph::real_clock::to_time_t(
			  ceph::ceil(p->first,
				     // Sadly, no time literals until C++14.
				     std::chrono::seconds(1))),
			ceph::real_clock::to_time_t(p->second)));
	}
	if (putls)
	  putls->swap(ls);
      } catch (buffer::error& e) {
	r = -EIO;
      }
      if (prval)
	*prval = r;
    }
  };
922 | ||
  /**
   * list available HitSets.
   *
   * We will get back a list of time intervals. Note that the most
   * recent range may have an empty end timestamp if it is still
   * accumulating.
   *
   * Legacy time_t flavor: interval starts are rounded up to the next
   * full second by the decode handler.
   *
   * @param pls [out] list of time intervals
   * @param prval [out] return value
   */
  void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
    add_op(CEPH_OSD_OP_PG_HITSET_LS);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_hit_set_ls *h =
      new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
    out_bl[p] = &h->bl;
    out_handler[p] = h;
  }
942 | void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls, | |
943 | int *prval) { | |
944 | add_op(CEPH_OSD_OP_PG_HITSET_LS); | |
945 | unsigned p = ops.size() - 1; | |
946 | out_rval[p] = prval; | |
947 | C_ObjectOperation_hit_set_ls *h = | |
948 | new C_ObjectOperation_hit_set_ls(NULL, pls, prval); | |
949 | out_bl[p] = &h->bl; | |
950 | out_handler[p] = h; | |
951 | } | |
952 | ||
953 | /** | |
954 | * get HitSet | |
955 | * | |
956 | * Return an encoded HitSet that includes the provided time | |
957 | * interval. | |
958 | * | |
959 | * @param stamp [in] timestamp | |
960 | * @param pbl [out] target buffer for encoded HitSet | |
961 | * @param prval [out] return value | |
962 | */ | |
963 | void hit_set_get(ceph::real_time stamp, bufferlist *pbl, int *prval) { | |
964 | OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET); | |
965 | op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp); | |
966 | unsigned p = ops.size() - 1; | |
967 | out_rval[p] = prval; | |
968 | out_bl[p] = pbl; | |
969 | } | |
970 | ||
971 | void omap_get_header(bufferlist *bl, int *prval) { | |
972 | add_op(CEPH_OSD_OP_OMAPGETHEADER); | |
973 | unsigned p = ops.size() - 1; | |
974 | out_bl[p] = bl; | |
975 | out_rval[p] = prval; | |
976 | } | |
977 | ||
978 | void omap_set(const map<string, bufferlist> &map) { | |
979 | bufferlist bl; | |
980 | ::encode(map, bl); | |
981 | add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl); | |
982 | } | |
983 | ||
  /// Replace the object's omap header with the given buffer.
  void omap_set_header(bufferlist &bl) {
    add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
  }
987 | ||
  /// Remove all omap keys and values (header handling per OSD op semantics).
  void omap_clear() {
    add_op(CEPH_OSD_OP_OMAPCLEAR);
  }
991 | ||
992 | void omap_rm_keys(const std::set<std::string> &to_remove) { | |
993 | bufferlist bl; | |
994 | ::encode(to_remove, bl); | |
995 | add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl); | |
996 | } | |
997 | ||
  // object classes
  /// Invoke an object-class method; output and return value discarded.
  void call(const char *cname, const char *method, bufferlist &indata) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
  }

  /// Invoke an object-class method, capturing output data, a completion
  /// context, and the per-op return value.
  void call(const char *cname, const char *method, bufferlist &indata,
	    bufferlist *outdata, Context *ctx, int *prval) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
  }
1007 | ||
  // watch/notify
  /// Register/modify a watch on the object.
  /// @param cookie caller-chosen watch identifier
  /// @param op     watch sub-op (e.g. add/remove/ping)
  /// @param timeout watch timeout in seconds; 0 means OSD default
  void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH);
    osd_op.op.watch.cookie = cookie;
    osd_op.op.watch.op = op;
    osd_op.op.watch.timeout = timeout;
  }
1015 | ||
  /// Send a notify to the object's watchers.
  /// The payload (prot_ver, timeout, bl) is encoded into *inbl, which is
  /// then appended as the op's input data — the caller's *inbl is mutated
  /// and must outlive this call.
  void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
	      bufferlist &bl, bufferlist *inbl) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
    osd_op.op.notify.cookie = cookie;
    ::encode(prot_ver, *inbl);
    ::encode(timeout, *inbl);
    ::encode(bl, *inbl);
    osd_op.indata.append(*inbl);
  }
1025 | ||
1026 | void notify_ack(uint64_t notify_id, uint64_t cookie, | |
1027 | bufferlist& reply_bl) { | |
1028 | OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK); | |
1029 | bufferlist bl; | |
1030 | ::encode(notify_id, bl); | |
1031 | ::encode(cookie, bl); | |
1032 | ::encode(reply_bl, bl); | |
1033 | osd_op.indata.append(bl); | |
1034 | } | |
1035 | ||
1036 | void list_watchers(list<obj_watch_t> *out, | |
1037 | int *prval) { | |
1038 | (void)add_op(CEPH_OSD_OP_LIST_WATCHERS); | |
1039 | if (prval || out) { | |
1040 | unsigned p = ops.size() - 1; | |
1041 | C_ObjectOperation_decodewatchers *h = | |
1042 | new C_ObjectOperation_decodewatchers(out, prval); | |
1043 | out_handler[p] = h; | |
1044 | out_bl[p] = &h->bl; | |
1045 | out_rval[p] = prval; | |
1046 | } | |
1047 | } | |
1048 | ||
1049 | void list_snaps(librados::snap_set_t *out, int *prval) { | |
1050 | (void)add_op(CEPH_OSD_OP_LIST_SNAPS); | |
1051 | if (prval || out) { | |
1052 | unsigned p = ops.size() - 1; | |
1053 | C_ObjectOperation_decodesnaps *h = | |
1054 | new C_ObjectOperation_decodesnaps(out, prval); | |
1055 | out_handler[p] = h; | |
1056 | out_bl[p] = &h->bl; | |
1057 | out_rval[p] = prval; | |
1058 | } | |
1059 | } | |
1060 | ||
  /// Fail the op unless the object's version equals @p ver.
  void assert_version(uint64_t ver) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
    osd_op.op.assert_ver.ver = ver;
  }
1065 | ||
1066 | void cmpxattr(const char *name, const bufferlist& val, | |
1067 | int op, int mode) { | |
1068 | add_xattr(CEPH_OSD_OP_CMPXATTR, name, val); | |
1069 | OSDOp& o = *ops.rbegin(); | |
1070 | o.op.xattr.cmp_op = op; | |
1071 | o.op.xattr.cmp_mode = mode; | |
1072 | } | |
1073 | ||
  /// Roll the object back to the given snapshot.
  void rollback(uint64_t snapid) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
    osd_op.op.snap.snapid = snapid;
  }
1078 | ||
  /// Copy the contents of a source object into this one.
  /// The source oid and locator travel in the op's input data; the
  /// encode order (src, then src_oloc) is part of the wire format.
  void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
		 version_t src_version, unsigned flags,
		 unsigned src_fadvise_flags) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
    osd_op.op.copy_from.snapid = snapid;
    osd_op.op.copy_from.src_version = src_version;
    osd_op.op.copy_from.flags = flags;
    osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
    ::encode(src, osd_op.indata);
    ::encode(src_oloc, osd_op.indata);
  }
1090 | ||
  /**
   * writeback content to backing tier
   *
   * If object is marked dirty in the cache tier, write back content
   * to backing tier. If the object is clean this is a no-op.
   *
   * If writeback races with an update, the update will block.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_flush() {
    add_op(CEPH_OSD_OP_CACHE_FLUSH);
  }
1104 | ||
  /**
   * writeback content to backing tier
   *
   * If object is marked dirty in the cache tier, write back content
   * to backing tier. If the object is clean this is a no-op.
   *
   * If writeback races with an update, return EAGAIN. Requires that
   * the SKIPRWLOCKS flag be set.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_try_flush() {
    add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
  }
1119 | ||
  /**
   * evict object from cache tier
   *
   * If object is marked clean, remove the object from the cache tier.
   * Otherwise, return EBUSY.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_evict() {
    add_op(CEPH_OSD_OP_CACHE_EVICT);
  }
1131 | ||
31f18b77 FG |
1132 | /* |
1133 | * Extensible tier | |
1134 | */ | |
1135 | void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc, | |
1136 | version_t tgt_version) { | |
1137 | OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT); | |
1138 | osd_op.op.copy_from.snapid = snapid; | |
1139 | osd_op.op.copy_from.src_version = tgt_version; | |
1140 | ::encode(tgt, osd_op.indata); | |
1141 | ::encode(tgt_oloc, osd_op.indata); | |
1142 | } | |
1143 | ||
7c673cae FG |
1144 | void set_alloc_hint(uint64_t expected_object_size, |
1145 | uint64_t expected_write_size, | |
1146 | uint32_t flags) { | |
1147 | add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size, | |
1148 | expected_write_size, flags); | |
1149 | ||
1150 | // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed | |
1151 | // not worth a feature bit. Set FAILOK per-op flag to make | |
1152 | // sure older osds don't trip over an unsupported opcode. | |
1153 | set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK); | |
1154 | } | |
1155 | ||
  /// Rebuild this operation as a copy of an existing op vector.
  /// Output buffers/rvals are pointed INTO the caller's @p sops entries
  /// (outdata/rval), so @p sops must outlive this ObjectOperation.
  void dup(vector<OSDOp>& sops) {
    ops = sops;
    out_bl.resize(sops.size());
    out_handler.resize(sops.size());
    out_rval.resize(sops.size());
    for (uint32_t i = 0; i < sops.size(); i++) {
      out_bl[i] = &sops[i].outdata;
      out_handler[i] = NULL;
      out_rval[i] = &sops[i].rval;
    }
  }
1167 | ||
  /**
   * Pin/unpin an object in cache tier
   */
  /// Pin the object so it cannot be flushed/evicted from the cache tier.
  void cache_pin() {
    add_op(CEPH_OSD_OP_CACHE_PIN);
  }

  /// Release a previous cache_pin().
  void cache_unpin() {
    add_op(CEPH_OSD_OP_CACHE_UNPIN);
  }
1178 | }; | |
1179 | ||
1180 | ||
1181 | // ---------------- | |
1182 | ||
1183 | ||
1184 | class Objecter : public md_config_obs_t, public Dispatcher { | |
1185 | public: | |
1186 | // config observer bits | |
1187 | const char** get_tracked_conf_keys() const override; | |
1188 | void handle_conf_change(const struct md_config_t *conf, | |
1189 | const std::set <std::string> &changed) override; | |
1190 | ||
1191 | public: | |
1192 | Messenger *messenger; | |
1193 | MonClient *monc; | |
1194 | Finisher *finisher; | |
1195 | ZTracer::Endpoint trace_endpoint; | |
1196 | private: | |
1197 | OSDMap *osdmap; | |
1198 | public: | |
1199 | using Dispatcher::cct; | |
1200 | std::multimap<string,string> crush_location; | |
1201 | ||
31f18b77 | 1202 | std::atomic<bool> initialized{false}; |
7c673cae FG |
1203 | |
1204 | private: | |
31f18b77 FG |
1205 | std::atomic<uint64_t> last_tid{0}; |
1206 | std::atomic<unsigned> inflight_ops{0}; | |
1207 | std::atomic<int> client_inc{-1}; | |
7c673cae | 1208 | uint64_t max_linger_id; |
31f18b77 FG |
1209 | std::atomic<unsigned> num_in_flight{0}; |
1210 | std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op | |
7c673cae FG |
1211 | bool keep_balanced_budget; |
1212 | bool honor_osdmap_full; | |
1213 | bool osdmap_full_try; | |
1214 | ||
31f18b77 FG |
1215 | // If this is true, accumulate a set of blacklisted entities |
1216 | // to be drained by consume_blacklist_events. | |
1217 | bool blacklist_events_enabled; | |
1218 | std::set<entity_addr_t> blacklist_events; | |
1219 | ||
7c673cae FG |
1220 | public: |
1221 | void maybe_request_map(); | |
31f18b77 FG |
1222 | |
1223 | void enable_blacklist_events(); | |
7c673cae FG |
1224 | private: |
1225 | ||
1226 | void _maybe_request_map(); | |
1227 | ||
1228 | version_t last_seen_osdmap_version; | |
1229 | version_t last_seen_pgmap_version; | |
1230 | ||
1231 | mutable boost::shared_mutex rwlock; | |
1232 | using lock_guard = std::unique_lock<decltype(rwlock)>; | |
1233 | using unique_lock = std::unique_lock<decltype(rwlock)>; | |
1234 | using shared_lock = boost::shared_lock<decltype(rwlock)>; | |
1235 | using shunique_lock = ceph::shunique_lock<decltype(rwlock)>; | |
1236 | ceph::timer<ceph::mono_clock> timer; | |
1237 | ||
1238 | PerfCounters *logger; | |
1239 | ||
1240 | uint64_t tick_event; | |
1241 | ||
1242 | void start_tick(); | |
1243 | void tick(); | |
1244 | void update_crush_location(); | |
1245 | ||
1246 | class RequestStateHook; | |
1247 | ||
1248 | RequestStateHook *m_request_state_hook; | |
1249 | ||
1250 | public: | |
1251 | /*** track pending operations ***/ | |
1252 | // read | |
1253 | public: | |
1254 | ||
1255 | struct OSDSession; | |
1256 | ||
  // Cached result of mapping an object (or explicit pg) to its current
  // placement: target pool/pg, up/acting sets, and flags describing how
  // the mapping was computed.  Recomputed as new osdmaps arrive.
  struct op_target_t {
    int flags = 0;

    epoch_t epoch = 0;  ///< latest epoch we calculated the mapping

    object_t base_oid;
    object_locator_t base_oloc;
    object_t target_oid;
    object_locator_t target_oloc;

    ///< true if we are directed at base_pgid, not base_oid
    bool precalc_pgid = false;

    ///< true if we have ever mapped to a valid pool
    bool pool_ever_existed = false;

    ///< explicit pg target, if any
    pg_t base_pgid;

    pg_t pgid; ///< last (raw) pg we mapped to
    spg_t actual_pgid; ///< last (actual) spg_t we mapped to
    unsigned pg_num = 0; ///< last pg_num we mapped to
    unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
    vector<int> up; ///< set of up osds for last pg we mapped to
    vector<int> acting; ///< set of acting osds for last pg we mapped to
    int up_primary = -1; ///< last up_primary we mapped to
    int acting_primary = -1;  ///< last acting_primary we mapped to
    int size = -1; ///< the size of the pool when we were last mapped
    int min_size = -1; ///< the min size of the pool when we were last mapped
    bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise
    bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering

    bool used_replica = false;
    bool paused = false;

    int osd = -1;      ///< the final target osd, or -1

    epoch_t last_force_resend = 0;

    /// Target an object by oid + locator.
    op_target_t(object_t oid, object_locator_t oloc, int flags)
      : flags(flags),
	base_oid(oid),
	base_oloc(oloc)
      {}

    /// Target an explicit pg directly (sets precalc_pgid).
    op_target_t(pg_t pgid)
      : base_oloc(pgid.pool(), pgid.ps()),
	precalc_pgid(true),
	base_pgid(pgid)
      {}

    op_target_t() = default;

    /// Build the hobject_t for the current (post-mapping) target.
    hobject_t get_hobj() {
      return hobject_t(target_oid,
		       target_oloc.key,
		       CEPH_NOSNAP,
		       target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
		       target_oloc.pool,
		       target_oloc.nspace);
    }

    /// True if the target object lies in [begin, end) of hobject order.
    bool contained_by(const hobject_t& begin, const hobject_t& end) {
      hobject_t h = get_hobj();
      int r = cmp(h, begin);
      return r == 0 || (r > 0 && h < end);
    }

    void dump(Formatter *f) const;
  };
1327 | ||
  // One in-flight OSD operation: the op vector, its target mapping,
  // per-op output slots, and resend/trace bookkeeping.  Refcounted;
  // the destructor frees any unconsumed per-op handlers.
  struct Op : public RefCountedObject {
    OSDSession *session;   // session we are (or were) queued on; NULL if none
    int incarnation;       // session incarnation at send time

    op_target_t target;    // where this op maps in the current osdmap

    ConnectionRef con;  // for rx buffer only
    uint64_t features;  // explicitly specified op features

    vector<OSDOp> ops;  // the actual sub-ops to execute

    snapid_t snapid;
    SnapContext snapc;
    ceph::real_time mtime;

    bufferlist *outbl;             // whole-op output buffer (may be NULL)
    vector<bufferlist*> out_bl;    // per-sub-op output buffers
    vector<Context*> out_handler;  // per-sub-op decode handlers (owned)
    vector<int*> out_rval;         // per-sub-op return-value slots

    int priority;
    Context *onfinish;   // completion callback
    uint64_t ontimeout;  // timer event id, 0 if none

    ceph_tid_t tid;
    int attempts;        // number of (re)sends so far

    version_t *objver;      // [out] object version from the reply
    epoch_t *reply_epoch;   // [out] epoch the reply was processed at

    ceph::mono_time stamp;  // time of last send

    epoch_t map_dne_bound;  // epoch bound for "pool does not exist" checks

    bool budgeted;          // true if throttle budget was taken for this op

    /// true if we should resend this message on failure
    bool should_resend;

    /// true if the throttle budget is get/put on a series of OPs,
    /// instead of per OP basis, when this flag is set, the budget is
    /// acquired before sending the very first OP of the series and
    /// released upon receiving the last OP reply.
    bool ctx_budgeted;

    int *data_offset;

    osd_reqid_t reqid; // explicitly setting reqid
    ZTracer::Trace trace;

    // Takes ownership of the caller's op vector via swap; sizes the
    // out_* vectors to match and NULLs every slot.
    Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
       int f, Context *fin, version_t *ov, int *offset = NULL,
       ZTracer::Trace *parent_trace = nullptr) :
      session(NULL), incarnation(0),
      target(o, ol, f),
      con(NULL),
      features(CEPH_FEATURES_SUPPORTED_DEFAULT),
      snapid(CEPH_NOSNAP),
      outbl(NULL),
      priority(0),
      onfinish(fin),
      ontimeout(0),
      tid(0),
      attempts(0),
      objver(ov),
      reply_epoch(NULL),
      map_dne_bound(0),
      budgeted(false),
      should_resend(true),
      ctx_budgeted(false),
      data_offset(offset) {
      ops.swap(op);

      /* initialize out_* to match op vector */
      out_bl.resize(ops.size());
      out_rval.resize(ops.size());
      out_handler.resize(ops.size());
      for (unsigned i = 0; i < ops.size(); i++) {
	out_bl[i] = NULL;
	out_handler[i] = NULL;
	out_rval[i] = NULL;
      }

      // a locator key equal to the oid is redundant; drop it
      if (target.base_oloc.key == o)
	target.base_oloc.key.clear();

      if (parent_trace && parent_trace->valid()) {
	trace.init("op", nullptr, parent_trace);
	trace.event("start");
      }
    }

    /// Order ops by tid (submission order).
    bool operator<(const Op& other) const {
      return tid < other.tid;
    }

    /// True if this op should be held back by full-pool/full-cluster flags.
    bool respects_full() const {
      return
	(target.flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
	!(target.flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
    }

  private:
    // private: must be released via put() (RefCountedObject)
    ~Op() override {
      // free any handlers that were never fired
      while (!out_handler.empty()) {
	delete out_handler.back();
	out_handler.pop_back();
      }
      trace.event("finish");
    }
  };
1439 | ||
  // Completion for a "get latest osdmap version" query issued on behalf
  // of a pending Op (identified by tid); finish() is defined out of line.
  struct C_Op_Map_Latest : public Context {
    Objecter *objecter;
    ceph_tid_t tid;      // op this check belongs to
    version_t latest;    // filled in with the latest map version
    C_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t),
						 latest(0) {}
    void finish(int r) override;
  };
1448 | ||
  // Same as C_Op_Map_Latest, but for a pending CommandOp.
  struct C_Command_Map_Latest : public Context {
    Objecter *objecter;
    uint64_t tid;        // command this check belongs to
    version_t latest;    // filled in with the latest map version
    C_Command_Map_Latest(Objecter *o, ceph_tid_t t) :  objecter(o), tid(t),
						       latest(0) {}
    void finish(int r) override;
  };
1457 | ||
  // Completion for a stat op: decodes (size, mtime) from the reply into
  // the caller's out-params, then chains to the caller's context.
  struct C_Stat : public Context {
    bufferlist bl;            // raw reply payload
    uint64_t *psize;          // [out] object size (may be NULL)
    ceph::real_time *pmtime;  // [out] modification time (may be NULL)
    Context *fin;             // caller's completion; always fired
    C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) :
      psize(ps), pmtime(pm), fin(c) {}
    void finish(int r) override {
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	uint64_t s;
	ceph::real_time m;
	::decode(s, p);
	::decode(m, p);
	if (psize)
	  *psize = s;
	if (pmtime)
	  *pmtime = m;
      }
      fin->complete(r);
    }
  };
1480 | ||
  // Completion for a getxattrs op: decodes the attribute map from the
  // reply into the caller's map, then chains to the caller's context.
  struct C_GetAttrs : public Context {
    bufferlist bl;                     // raw reply payload
    map<string,bufferlist>& attrset;   // [out] decoded attributes
    Context *fin;                      // caller's completion; always fired
    C_GetAttrs(map<string, bufferlist>& set, Context *c) : attrset(set),
							   fin(c) {}
    void finish(int r) override {
      if (r >= 0) {
	bufferlist::iterator p = bl.begin();
	::decode(attrset, p);
      }
      fin->complete(r);
    }
  };
1495 | ||
1496 | ||
1497 | // Pools and statistics | |
  // Cursor state for an object-listing (nlist) traversal of a pool;
  // carries position, filters, accumulated results, and the optional
  // context-wide throttle budget.
  struct NListContext {
    collection_list_handle_t pos;  // current listing position

    // these are for !sortbitwise compat only
    int current_pg = 0;
    int starting_pg_num = 0;
    bool sort_bitwise = false;

    bool at_end_of_pool = false; ///< publicly visible end flag

    int64_t pool_id = -1;
    int pool_snap_seq = 0;
    uint64_t max_entries = 0;    // max objects to return per listing op
    string nspace;               // namespace to restrict the listing to

    bufferlist bl;   // raw data read to here
    std::list<librados::ListObjectImpl> list;  // decoded results

    bufferlist filter;     // optional server-side filter

    bufferlist extra_info;

    // The budget associated with this context, once it is set (>= 0),
    // the budget is not get/released on OP basis, instead the budget
    // is acquired before sending the first OP and released upon receiving
    // the last op reply.
    int ctx_budget = -1;

    /// True once the whole pool has been traversed.
    bool at_end() const {
      return at_end_of_pool;
    }

    /// Hash position of the cursor (for progress reporting).
    uint32_t get_pg_hash_position() const {
      return pos.get_hash();
    }
  };
1534 | ||
  // Completion for one nlist op: on success feeds the reply back into
  // the Objecter to advance the listing; on error completes the caller
  // directly with the error code.
  struct C_NList : public Context {
    NListContext *list_context;
    Context *final_finish;   // caller's completion
    Objecter *objecter;
    epoch_t epoch;           // epoch the reply was received at
    C_NList(NListContext *lc, Context * finish, Objecter *ob) :
      list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
    void finish(int r) override {
      if (r >= 0) {
	objecter->_nlist_reply(list_context, r, final_finish, epoch);
      } else {
	final_finish->complete(r);
      }
    }
  };
1550 | ||
  // Pending "get pool stats" request to the monitor.
  struct PoolStatOp {
    ceph_tid_t tid;
    list<string> pools;   // pools to query

    map<string,pool_stat_t> *pool_stats;  // [out] per-pool results
    Context *onfinish;    // completion callback
    uint64_t ontimeout;   // timer event id, 0 if none

    ceph::mono_time last_submit;  // for resend-on-timeout tracking
  };
1561 | ||
  // Pending cluster statfs request to the monitor.
  struct StatfsOp {
    ceph_tid_t tid;
    struct ceph_statfs *stats;           // [out] cluster statistics
    boost::optional<int64_t> data_pool;  // restrict stats to one pool, if set
    Context *onfinish;    // completion callback
    uint64_t ontimeout;   // timer event id, 0 if none

    ceph::mono_time last_submit;  // for resend-on-timeout tracking
  };
1571 | ||
  // Pending pool mutation (create/delete/snap/...) sent to the monitor.
  struct PoolOp {
    ceph_tid_t tid;
    int64_t pool;        // target pool id
    string name;         // pool or snapshot name, depending on pool_op
    Context *onfinish;   // completion callback
    uint64_t ontimeout;  // timer event id, 0 if none
    int pool_op;         // POOL_OP_* code
    uint64_t auid;
    int16_t crush_rule;
    snapid_t snapid;
    bufferlist *blp;     // [out] reply payload, if the op returns one

    ceph::mono_time last_submit;  // for resend-on-timeout tracking
    PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(0), pool_op(0),
	       auid(0), crush_rule(0), snapid(0), blp(NULL) {}
  };
1588 | ||
1589 | // -- osd commands -- | |
  // -- osd commands --
  // Pending admin command addressed either to a specific OSD or to the
  // primary of a specific pg.  Refcounted like Op.
  struct CommandOp : public RefCountedObject {
    OSDSession *session = nullptr;
    ceph_tid_t tid = 0;
    vector<string> cmd;         // command and arguments
    bufferlist inbl;            // command input payload
    bufferlist *poutbl = nullptr;  // [out] reply payload
    string *prs = nullptr;         // [out] reply status string

    // target_osd == -1 means target_pg is valid
    const int target_osd = -1;
    const pg_t target_pg;

    op_target_t target;         // resolved placement for pg-targeted commands

    epoch_t map_dne_bound = 0;
    int map_check_error = 0;           // error to return if map check fails
    const char *map_check_error_str = nullptr;

    Context *onfinish = nullptr;  // completion callback
    uint64_t ontimeout = 0;       // timer event id, 0 if none
    ceph::mono_time last_submit;

    /// Target a specific osd by id.
    CommandOp(
      int target_osd,
      const vector<string> &cmd,
      bufferlist inbl,
      bufferlist *poutbl,
      string *prs,
      Context *onfinish)
      : cmd(cmd),
	inbl(inbl),
	poutbl(poutbl),
	prs(prs),
	target_osd(target_osd),
	onfinish(onfinish) {}

    /// Target the primary osd of a specific pg.
    CommandOp(
      pg_t pgid,
      const vector<string> &cmd,
      bufferlist inbl,
      bufferlist *poutbl,
      string *prs,
      Context *onfinish)
      : cmd(cmd),
	inbl(inbl),
	poutbl(poutbl),
	prs(prs),
	target_pg(pgid),
	target(pgid),
	onfinish(onfinish) {}

  };
1642 | ||
1643 | void submit_command(CommandOp *c, ceph_tid_t *ptid); | |
1644 | int _calc_command_target(CommandOp *c, shunique_lock &sul); | |
1645 | void _assign_command_session(CommandOp *c, shunique_lock &sul); | |
1646 | void _send_command(CommandOp *c); | |
1647 | int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r); | |
1648 | void _finish_command(CommandOp *c, int r, string rs); | |
1649 | void handle_command_reply(MCommandReply *m); | |
1650 | ||
1651 | ||
1652 | // -- lingering ops -- | |
1653 | ||
  // Abstract callback interface for watch events; mirrors librados
  // WatchCtx2.  Implementations are owned by the LingerOp they are
  // attached to (deleted in ~LingerOp).
  struct WatchContext {
    // this simply mirrors librados WatchCtx2
    virtual void handle_notify(uint64_t notify_id,
			       uint64_t cookie,
			       uint64_t notifier_id,
			       bufferlist& bl) = 0;
    virtual void handle_error(uint64_t cookie, int err) = 0;
    virtual ~WatchContext() {}
  };
1663 | ||
  // A long-lived (linger) operation: a watch or a notify that must be
  // re-registered after map changes or session resets.  Refcounted;
  // identified by linger_id, addressed externally via get_cookie().
  struct LingerOp : public RefCountedObject {
    uint64_t linger_id;

    op_target_t target;   // current placement of the watched object

    snapid_t snap;
    SnapContext snapc;
    ceph::real_time mtime;

    vector<OSDOp> ops;
    bufferlist inbl;
    bufferlist *poutbl;   // [out] reply payload (notify)
    version_t *pobjver;   // [out] object version

    bool is_watch;        // watch (true) vs notify (false)
    ceph::mono_time watch_valid_thru; ///< send time for last acked ping
    int last_error;  ///< error from last failed ping|reconnect, if any
    boost::shared_mutex watch_lock;
    using lock_guard = std::unique_lock<decltype(watch_lock)>;
    using unique_lock = std::unique_lock<decltype(watch_lock)>;
    using shared_lock = boost::shared_lock<decltype(watch_lock)>;
    using shunique_lock = ceph::shunique_lock<decltype(watch_lock)>;

    // queue of pending async operations, with the timestamp of
    // when they were queued.
    list<ceph::mono_time> watch_pending_async;

    uint32_t register_gen;   // bumped on each (re)registration
    bool registered;
    bool canceled;
    Context *on_reg_commit;  // fired when registration commits

    // we trigger these from an async finisher
    Context *on_notify_finish;
    bufferlist *notify_result_bl;
    uint64_t notify_id;

    WatchContext *watch_context;  // owned; deleted in the destructor

    OSDSession *session;

    ceph_tid_t register_tid;
    ceph_tid_t ping_tid;
    epoch_t map_dne_bound;

    /// Record a queued async event.
    void _queued_async() {
      // watch_lock must be locked unique
      watch_pending_async.push_back(ceph::mono_clock::now());
    }
    /// Retire the oldest queued async event (takes watch_lock itself).
    void finished_async() {
      unique_lock l(watch_lock);
      assert(!watch_pending_async.empty());
      watch_pending_async.pop_front();
    }

    LingerOp() : linger_id(0),
		 target(object_t(), object_locator_t(), 0),
		 snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
		 is_watch(false), last_error(0),
		 register_gen(0),
		 registered(false),
		 canceled(false),
		 on_reg_commit(NULL),
		 on_notify_finish(NULL),
		 notify_result_bl(NULL),
		 notify_id(0),
		 watch_context(NULL),
		 session(NULL),
		 register_tid(0),
		 ping_tid(0),
		 map_dne_bound(0) {}

    // no copy!
    const LingerOp &operator=(const LingerOp& r);
    LingerOp(const LingerOp& o);

    /// The watch cookie handed to the OSD is simply this object's address.
    uint64_t get_cookie() {
      return reinterpret_cast<uint64_t>(this);
    }

  private:
    // private: must be released via put() (RefCountedObject)
    ~LingerOp() override {
      delete watch_context;
    }
  };
1749 | ||
  // Completion for a linger registration commit; holds a ref on the
  // LingerOp for its own lifetime.
  struct C_Linger_Commit : public Context {
    Objecter *objecter;
    LingerOp *info;
    bufferlist outbl;  // used for notify only
    C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {
      info->get();
    }
    ~C_Linger_Commit() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_commit(info, r, outbl);
    }
  };
1764 | ||
  // Completion for a linger reconnect attempt; holds a ref on the
  // LingerOp for its own lifetime.
  struct C_Linger_Reconnect : public Context {
    Objecter *objecter;
    LingerOp *info;
    C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {
      info->get();
    }
    ~C_Linger_Reconnect() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_reconnect(info, r);
    }
  };
1778 | ||
  // Completion for a watch ping; remembers the registration generation
  // at send time so stale pings (after a re-register) can be ignored.
  struct C_Linger_Ping : public Context {
    Objecter *objecter;
    LingerOp *info;
    ceph::mono_time sent;    // when the ping was sent
    uint32_t register_gen;   // generation the ping belongs to
    C_Linger_Ping(Objecter *o, LingerOp *l)
      : objecter(o), info(l), register_gen(info->register_gen) {
      info->get();
    }
    ~C_Linger_Ping() override {
      info->put();
    }
    void finish(int r) override {
      objecter->_linger_ping(info, r, sent, register_gen);
    }
  };
1795 | ||
  // Completion for a "get latest osdmap version" query issued on behalf
  // of a linger op; finish() is defined out of line.
  struct C_Linger_Map_Latest : public Context {
    Objecter *objecter;
    uint64_t linger_id;   // linger op this check belongs to
    version_t latest;     // filled in with the latest map version
    C_Linger_Map_Latest(Objecter *o, uint64_t id) :
      objecter(o), linger_id(id), latest(0) {}
    void finish(int r) override;
  };
1804 | ||
1805 | // -- osd sessions -- | |
  // -- osd sessions --
  // One backoff interval received from an OSD: requests for objects in
  // [begin, end) of pg pgid must be held until the backoff is released.
  struct OSDBackoff {
    spg_t pgid;
    uint64_t id;             // backoff id assigned by the OSD
    hobject_t begin, end;    // half-open object range being backed off
  };
1811 | ||
  // Per-OSD session state: the connection, all pending ops addressed to
  // that OSD, active backoffs, and a pool of completion locks sharded
  // by object id.  Refcounted.
  struct OSDSession : public RefCountedObject {
    boost::shared_mutex lock;
    using lock_guard = std::lock_guard<decltype(lock)>;
    using unique_lock = std::unique_lock<decltype(lock)>;
    using shared_lock = boost::shared_lock<decltype(lock)>;
    using shunique_lock = ceph::shunique_lock<decltype(lock)>;

    // pending ops
    map<ceph_tid_t,Op*> ops;
    map<uint64_t, LingerOp*> linger_ops;
    map<ceph_tid_t,CommandOp*> command_ops;

    // backoffs
    map<spg_t,map<hobject_t,OSDBackoff>> backoffs;
    map<uint64_t,OSDBackoff*> backoffs_by_id;

    int osd;           // osd id; -1 for the homeless session
    int incarnation;   // bumped when the session is reset
    ConnectionRef con;
    int num_locks;     // size of the completion-lock shard array
    std::unique_ptr<std::mutex[]> completion_locks;
    using unique_completion_lock = std::unique_lock<
      decltype(completion_locks)::element_type>;


    OSDSession(CephContext *cct, int o) :
      osd(o), incarnation(0), con(NULL),
      num_locks(cct->_conf->objecter_completion_locks_per_session),
      completion_locks(new std::mutex[num_locks]) {}

    ~OSDSession() override;

    /// True for the placeholder session holding ops with no target osd.
    bool is_homeless() { return (osd == -1); }

    /// Pick (and acquire) the completion-lock shard for the given object.
    unique_completion_lock get_lock(object_t& oid);
  };
  // all live sessions, keyed by osd id (guarded by rwlock)
  map<int,OSDSession*> osd_sessions;

  bool osdmap_full_flag() const;
  bool osdmap_pool_full(const int64_t pool_id) const;

 private:

  /**
   * Test pg_pool_t::FLAG_FULL on a pool
   *
   * @return true if the pool exists and has the flag set, or
   * the global full flag is set, else false
   */
  bool _osdmap_pool_full(const int64_t pool_id) const;
  bool _osdmap_pool_full(const pg_pool_t &p) const;
  void update_pool_full_map(map<int64_t, bool>& pool_full_map);

  // registered linger (watch/notify) ops, keyed by linger_id
  map<uint64_t, LingerOp*> linger_ops;
  // we use this just to confirm a cookie is valid before dereferencing the ptr
  set<LingerOp*> linger_ops_set;

  // in-flight monitor-targeted operations, keyed by tid
  map<ceph_tid_t,PoolStatOp*> poolstat_ops;
  map<ceph_tid_t,StatfsOp*> statfs_ops;
  map<ceph_tid_t,PoolOp*> pool_ops;
  // count of ops parked on homeless_session (no mappable OSD yet)
  std::atomic<unsigned> num_homeless_ops{0};

  // session for ops that currently map to no OSD (osd == -1)
  OSDSession *homeless_session;

  // ops waiting for an osdmap with a new pool or confirmation that
  // the pool does not exist (may be expanded to other uses later)
  map<uint64_t, LingerOp*> check_latest_map_lingers;
  map<ceph_tid_t, Op*> check_latest_map_ops;
  map<ceph_tid_t, CommandOp*> check_latest_map_commands;

  // contexts (with error codes) to fire once we have the given epoch
  map<epoch_t,list< pair<Context*, int> > > waiting_for_map;

  ceph::timespan mon_timeout;
  ceph::timespan osd_timeout;

  // low-level op send/finish plumbing (leading '_' = caller holds rwlock)
  MOSDOp *_prepare_osd_op(Op *op);
  void _send_op(Op *op, MOSDOp *m = NULL);
  void _send_op_account(Op *op);
  void _cancel_linger_op(Op *op);
  void finish_op(OSDSession *session, ceph_tid_t tid);
  void _finish_op(Op *op, int r);
  static bool is_pg_changed(
    int oldprimary,
    const vector<int>& oldacting,
    int newprimary,
    const vector<int>& newacting,
    bool any_change=false);
  // result of recomputing an op's target after an osdmap change
  enum recalc_op_target_result {
    RECALC_OP_TARGET_NO_ACTION = 0,
    RECALC_OP_TARGET_NEED_RESEND,
    RECALC_OP_TARGET_POOL_DNE,
    RECALC_OP_TARGET_OSD_DNE,
    RECALC_OP_TARGET_OSD_DOWN,
  };
  bool _osdmap_full_flag() const;
  bool _osdmap_has_pool_full() const;

  bool target_should_be_paused(op_target_t *op);
  int _calc_target(op_target_t *t, Connection *con,
                   bool any_change = false);
  int _map_session(op_target_t *op, OSDSession **s,
                   shunique_lock& lc);

  // move ops between sessions and their per-session maps
  void _session_op_assign(OSDSession *s, Op *op);
  void _session_op_remove(OSDSession *s, Op *op);
  void _session_linger_op_assign(OSDSession *to, LingerOp *op);
  void _session_linger_op_remove(OSDSession *from, LingerOp *op);
  void _session_command_op_assign(OSDSession *to, CommandOp *op);
  void _session_command_op_remove(OSDSession *from, CommandOp *op);

  int _assign_op_target_session(Op *op, shunique_lock& lc,
                                bool src_session_locked,
                                bool dst_session_locked);
  int _recalc_linger_op_target(LingerOp *op, shunique_lock& lc);

  // watch/notify (linger) lifecycle internals
  void _linger_submit(LingerOp *info, shunique_lock& sul);
  void _send_linger(LingerOp *info, shunique_lock& sul);
  void _linger_commit(LingerOp *info, int r, bufferlist& outbl);
  void _linger_reconnect(LingerOp *info, int r);
  void _send_linger_ping(LingerOp *info);
  void _linger_ping(LingerOp *info, int r, ceph::mono_time sent,
                    uint32_t register_gen);
  int _normalize_watch_error(int r);

  friend class C_DoWatchError;
 public:
  // Queue ctx on the finisher thread, behind any callbacks already
  // queued there.  NOTE(review): "flush" semantics rely on the
  // Finisher running queued contexts in order — confirm if depended on.
  void linger_callback_flush(Context *ctx) {
    finisher->queue(ctx);
  }
1941 | ||
 private:
  // map-check machinery: detect "pool does not exist" for pending ops
  void _check_op_pool_dne(Op *op, unique_lock *sl);
  void _send_op_map_check(Op *op);
  void _op_cancel_map_check(Op *op);
  void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
  void _send_linger_map_check(LingerOp *op);
  void _linger_cancel_map_check(LingerOp *op);
  void _check_command_map_dne(CommandOp *op);
  void _send_command_map_check(CommandOp *op);
  void _command_cancel_map_check(CommandOp *op);

  // resend everything pending on a session (e.g. after reconnect)
  void kick_requests(OSDSession *session);
  void _kick_requests(OSDSession *session, map<uint64_t, LingerOp *>& lresend);
  void _linger_ops_resend(map<uint64_t, LingerOp *>& lresend, unique_lock& ul);

  // session lifecycle (ref-counted OSDSession objects)
  int _get_session(int osd, OSDSession **session, shunique_lock& sul);
  void put_session(OSDSession *s);
  void get_session(OSDSession *s);
  void _reopen_session(OSDSession *session);
  void close_session(OSDSession *session);

  void _nlist_reply(NListContext *list_context, int r, Context *final_finish,
                    epoch_t reply_epoch);

  void resend_mon_ops();

  /**
   * handle a budget for in-flight ops
   * budget is taken whenever an op goes into the ops map
   * and returned whenever an op is removed from the map
   * If throttle_op needs to throttle it will unlock client_lock.
   */
  int calc_op_budget(Op *op);
  void _throttle_op(Op *op, shunique_lock& sul, int op_size = 0);
  // Take budget for op; may block (dropping sul) when balanced
  // budgeting is on, otherwise force-takes past the throttle limits.
  // Returns the number of bytes charged.
  int _take_op_budget(Op *op, shunique_lock& sul) {
    assert(sul && sul.mutex() == &rwlock);
    int op_budget = calc_op_budget(op);
    if (keep_balanced_budget) {
      _throttle_op(op, sul, op_budget);
    } else {
      // unconditional take: never blocks, may overshoot the limit
      op_throttle_bytes.take(op_budget);
      op_throttle_ops.take(1);
    }
    op->budgeted = true;  // mark so put_op_budget() knows to return it
    return op_budget;
  }
  // Return op_budget bytes (and one op slot) to the throttles.
  void put_op_budget_bytes(int op_budget) {
    assert(op_budget >= 0);
    op_throttle_bytes.put(op_budget);
    op_throttle_ops.put(1);
  }
  // Return the budget previously taken for op (recomputed, so the op's
  // size must not have changed since _take_op_budget).
  void put_op_budget(Op *op) {
    assert(op->budgeted);
    int op_budget = calc_op_budget(op);
    put_op_budget_bytes(op_budget);
  }
  void put_nlist_context_budget(NListContext *list_context);
  Throttle op_throttle_bytes, op_throttle_ops;
 public:
  /// Construct an Objecter.
  /// @param cct_ ceph context (conf, logging)
  /// @param m messenger used to reach OSDs
  /// @param mc monitor client (pool ops, stats, map subscriptions)
  /// @param fin finisher on which completion contexts are queued
  /// @param mon_timeout / osd_timeout per-target timeouts in seconds
  ///        (converted to ceph::timespan; note the members deliberately
  ///        shadow the parameters in the init list)
  Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
           Finisher *fin,
           double mon_timeout,
           double osd_timeout) :
    Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
    trace_endpoint("0.0.0.0", 0, "Objecter"),
    osdmap(new OSDMap),
    max_linger_id(0),
    keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
    blacklist_events_enabled(false),
    last_seen_osdmap_version(0), last_seen_pgmap_version(0),
    logger(NULL), tick_event(0), m_request_state_hook(NULL),
    homeless_session(new OSDSession(cct, -1)),
    mon_timeout(ceph::make_timespan(mon_timeout)),
    osd_timeout(ceph::make_timespan(osd_timeout)),
    op_throttle_bytes(cct, "objecter_bytes",
                      cct->_conf->objecter_inflight_op_bytes),
    op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
    epoch_barrier(0),
    retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
  { }
  ~Objecter() override;

  void init();
  // start dispatching; optionally seed with an initial osdmap
  void start(const OSDMap *o = nullptr);
  void shutdown();

  // These two templates replace osdmap_(get)|(put)_read. Simply wrap
  // whatever functionality you want to use the OSDMap in a lambda like:
  //
  // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
  //
  // or
  //
  // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
  //
  // Do not call into something that will try to lock the OSDMap from
  // here or you will have great woe and misery.
2040 | ||
  // Run cb(*osdmap, args...) under a shared (read) lock on rwlock and
  // return its result; see the usage comment above.
  template<typename Callback, typename...Args>
  auto with_osdmap(Callback&& cb, Args&&... args) const ->
    decltype(cb(*osdmap, std::forward<Args>(args)...)) {
    shared_lock l(rwlock);
    return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
  }
2047 | ||
2048 | ||
  /**
   * Tell the objecter to throttle outgoing ops according to its
   * budget (in _conf). If you do this, ops can block, in
   * which case it will unlock client_lock and sleep until
   * incoming messages reduce the used budget low enough for
   * the ops to continue going; then it will lock client_lock again.
   */
  void set_balanced_budget() { keep_balanced_budget = true; }
  void unset_balanced_budget() { keep_balanced_budget = false; }

  // whether to pause writes when the osdmap's full flag is set
  void set_honor_osdmap_full() { honor_osdmap_full = true; }
  void unset_honor_osdmap_full() { honor_osdmap_full = false; }

  // try ops even when the cluster/pool reports full
  void set_osdmap_full_try() { osdmap_full_try = true; }
  void unset_osdmap_full_try() { osdmap_full_try = false; }

  // Scan a session's ops after an osdmap change, collecting those that
  // need to be resent (ops, lingers, commands) into the out-params.
  void _scan_requests(OSDSession *s,
                      bool force_resend,
                      bool cluster_full,
                      map<int64_t, bool> *pool_full_map,
                      map<ceph_tid_t, Op*>& need_resend,
                      list<LingerOp*>& need_resend_linger,
                      map<ceph_tid_t, CommandOp*>& need_resend_command,
                      shunique_lock& sul);

  int64_t get_object_hash_position(int64_t pool, const string& key,
                                   const string& ns);
  int64_t get_object_pg_hash_position(int64_t pool, const string& key,
                                      const string& ns);
  // messages
 public:
  bool ms_dispatch(Message *m) override;
  // fast dispatch is opt-in per message type (see ms_can_fast_dispatch)
  bool ms_can_fast_dispatch_any() const override {
    return true;
  }
2085 | bool ms_can_fast_dispatch(const Message *m) const override { | |
2086 | switch (m->get_type()) { | |
2087 | case CEPH_MSG_OSD_OPREPLY: | |
2088 | case CEPH_MSG_WATCH_NOTIFY: | |
2089 | return true; | |
2090 | default: | |
2091 | return false; | |
2092 | } | |
2093 | } | |
  // Fast path simply reuses ms_dispatch; drop our ref if the message
  // was not consumed.
  void ms_fast_dispatch(Message *m) override {
    if (!ms_dispatch(m)) {
      m->put();
    }
  }
2099 | ||
  // per-message handlers invoked from ms_dispatch
  void handle_osd_op_reply(class MOSDOpReply *m);
  void handle_osd_backoff(class MOSDBackoff *m);
  void handle_watch_notify(class MWatchNotify *m);
  void handle_osd_map(class MOSDMap *m);
  // block until the first osdmap has been received
  void wait_for_osd_map();

  /**
   * Get list of entities blacklisted since this was last called,
   * and reset the list.
   *
   * Uses a std::set because typical use case is to compare some
   * other list of clients to see which overlap with the blacklisted
   * addrs.
   *
   */
  void consume_blacklist_events(std::set<entity_addr_t> *events);

  // pool snapshot lookups against the cached osdmap
  int pool_snap_by_name(int64_t poolid,
                        const char *snap_name,
                        snapid_t *snap) const;
  int pool_snap_get_info(int64_t poolid, snapid_t snap,
                         pool_snap_info_t *info) const;
  int pool_snap_list(int64_t poolid, vector<uint64_t> *snaps);
 private:

  // accumulate blacklist additions from map updates for
  // consume_blacklist_events()
  void emit_blacklist_events(const OSDMap::Incremental &inc);
  void emit_blacklist_events(const OSDMap &old_osd_map,
                             const OSDMap &new_osd_map);

  // low-level
  void _op_submit(Op *op, shunique_lock& lc, ceph_tid_t *ptid);
  void _op_submit_with_budget(Op *op, shunique_lock& lc,
                              ceph_tid_t *ptid,
                              int *ctx_budget);
  inline void unregister_op(Op *op);

  // public interface
 public:
  void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
2139 | bool is_active() { | |
2140 | shared_lock l(rwlock); | |
31f18b77 | 2141 | return !((!inflight_ops) && linger_ops.empty() && |
7c673cae FG |
2142 | poolstat_ops.empty() && statfs_ops.empty()); |
2143 | } | |
2144 | ||
  /**
   * Output in-flight requests
   */
  void _dump_active(OSDSession *s);
  void _dump_active();
  void dump_active();
  void dump_requests(Formatter *fmt);
  void _dump_ops(const OSDSession *s, Formatter *fmt);
  void dump_ops(Formatter *fmt);
  void _dump_linger_ops(const OSDSession *s, Formatter *fmt);
  void dump_linger_ops(Formatter *fmt);
  void _dump_command_ops(const OSDSession *s, Formatter *fmt);
  void dump_command_ops(Formatter *fmt);
  void dump_pool_ops(Formatter *fmt) const;
  void dump_pool_stat_ops(Formatter *fmt) const;
  void dump_statfs_ops(Formatter *fmt) const;

  int get_client_incarnation() const { return client_inc; }
  void set_client_incarnation(int inc) { client_inc = inc; }

  bool have_map(epoch_t epoch);
  /// wait for epoch; true if we already have it
  bool wait_for_map(epoch_t epoch, Context *c, int err=0);
  void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
  void wait_for_latest_osdmap(Context *fin);
  // NOTE(review): "neweset" is a long-standing typo for "newest" —
  // parameter name only, no behavioral impact.
  void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
  void _get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);

  /** Get the current set of global op flags */
  int get_global_op_flags() const { return global_op_flags; }
  /** Add a flag to the global op flags, not really atomic operation */
  void add_global_op_flags(int flag) {
    global_op_flags.fetch_or(flag);
  }
  /** Clear the passed flags from the global op flag set */
  void clear_global_op_flag(int flags) {
    global_op_flags.fetch_and(~flags);
  }
2183 | ||
  /// cancel an in-progress request with the given return code
 private:
  int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
  int _op_cancel(ceph_tid_t tid, int r);
 public:
  int op_cancel(ceph_tid_t tid, int r);
  // cancel several tids at once; r is delivered to each op's callback
  int op_cancel(const vector<ceph_tid_t>& tidls, int r);

  /**
   * Any write op which is in progress at the start of this call shall no
   * longer be in progress when this call ends. Operations started after the
   * start of this call may still be in progress when this call ends.
   *
   * @return the latest possible epoch in which a cancelled op could have
   * existed, or -1 if nothing was cancelled.
   */
  epoch_t op_cancel_writes(int r, int64_t pool=-1);
  // commands

  // Send an admin command to a specific OSD; output buffer / status
  // string are filled in before onfinish fires, tid returned via ptid.
  void osd_command(int osd, const std::vector<string>& cmd,
                   const bufferlist& inbl, ceph_tid_t *ptid,
                   bufferlist *poutbl, string *prs, Context *onfinish) {
    assert(osd >= 0);  // must name a real OSD; no homeless commands
    CommandOp *c = new CommandOp(
      osd,
      cmd,
      inbl,
      poutbl,
      prs,
      onfinish);
    submit_command(c, ptid);
  }
  // Same, but targeted at whichever OSD currently hosts pgid.
  void pg_command(pg_t pgid, const vector<string>& cmd,
                  const bufferlist& inbl, ceph_tid_t *ptid,
                  bufferlist *poutbl, string *prs, Context *onfinish) {
    CommandOp *c = new CommandOp(
      pgid,
      cmd,
      inbl,
      poutbl,
      prs,
      onfinish);
    submit_command(c, ptid);
  }
2228 | ||
  // mid-level helpers

  // Build (but do not submit) a write Op from an assembled
  // ObjectOperation; ownership of op's out_rval vector is moved into
  // the new Op.  Caller submits via op_submit() or mutate().
  Op *prepare_mutate_op(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op, const SnapContext& snapc,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    osd_reqid_t reqid = osd_reqid_t(),
    ZTracer::Trace *parent_trace = nullptr) {
    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver, nullptr, parent_trace);
    o->priority = op.priority;
    o->mtime = mtime;
    o->snapc = snapc;
    o->out_rval.swap(op.out_rval);
    o->reqid = reqid;
    return o;
  }
  // prepare + submit in one step; returns the assigned tid
  ceph_tid_t mutate(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op, const SnapContext& snapc,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    osd_reqid_t reqid = osd_reqid_t()) {
    Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
                              oncommit, objver, reqid);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (but do not submit) a read Op from an assembled
  // ObjectOperation; output buffers/handlers are moved into the Op.
  // NOTE(review): the 'features' parameter is unused here — callers
  // that need it (see read() below) set o->features after the call.
  Op *prepare_read_op(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op,
    snapid_t snapid, bufferlist *pbl, int flags,
    Context *onack, version_t *objver = NULL,
    int *data_offset = NULL,
    uint64_t features = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onack, objver, data_offset, parent_trace);
    o->priority = op.priority;
    o->snapid = snapid;
    o->outbl = pbl;
    // single-op read with a pre-sized per-op buffer: read directly
    // into it instead of a separate outbl
    if (!o->outbl && op.size() == 1 && op.out_bl[0]->length())
      o->outbl = op.out_bl[0];
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->out_rval.swap(op.out_rval);
    return o;
  }
  // prepare + submit a compound read; returns the assigned tid
  ceph_tid_t read(
    const object_t& oid, const object_locator_t& oloc,
    ObjectOperation& op,
    snapid_t snapid, bufferlist *pbl, int flags,
    Context *onack, version_t *objver = NULL,
    int *data_offset = NULL,
    uint64_t features = 0) {
    Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver,
                            data_offset);
    if (features)
      o->features = features;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build a read op addressed to a whole PG (by hash) rather than an
  // object — used for pgls-style listing.  The pgid is precalculated
  // so no object-name hashing occurs.
  Op *prepare_pg_read_op(
    uint32_t hash, object_locator_t oloc,
    ObjectOperation& op, bufferlist *pbl, int flags,
    Context *onack, epoch_t *reply_epoch,
    int *ctx_budget) {
    Op *o = new Op(object_t(), oloc,
                   op.ops,
                   flags | global_op_flags | CEPH_OSD_FLAG_READ |
                   CEPH_OSD_FLAG_IGNORE_OVERLAY,
                   onack, NULL);
    o->target.precalc_pgid = true;
    o->target.base_pgid = pg_t(hash, oloc.pool);
    o->priority = op.priority;
    o->snapid = CEPH_NOSNAP;
    o->outbl = pbl;
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->out_rval.swap(op.out_rval);
    o->reply_epoch = reply_epoch;
    if (ctx_budget) {
      // budget is tracked by listing context
      o->ctx_budgeted = true;
    }
    return o;
  }
  // prepare + submit a PG read; ctx_budget is forwarded to op_submit
  ceph_tid_t pg_read(
    uint32_t hash, object_locator_t oloc,
    ObjectOperation& op, bufferlist *pbl, int flags,
    Context *onack, epoch_t *reply_epoch,
    int *ctx_budget) {
    Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
                               onack, reply_epoch, ctx_budget);
    ceph_tid_t tid;
    op_submit(o, &tid, ctx_budget);
    return tid;
  }
2329 | ||
  // caller owns a ref
  LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
                            int flags);
  // establish a watch on a registered linger op; returns its tid
  ceph_tid_t linger_watch(LingerOp *info,
                          ObjectOperation& op,
                          const SnapContext& snapc, ceph::real_time mtime,
                          bufferlist& inbl,
                          Context *onfinish,
                          version_t *objver);
  // send a notify through a registered linger op; returns its tid
  ceph_tid_t linger_notify(LingerOp *info,
                           ObjectOperation& op,
                           snapid_t snap, bufferlist& inbl,
                           bufferlist *poutbl,
                           Context *onack,
                           version_t *objver);
  int linger_check(LingerOp *info);
  void linger_cancel(LingerOp *info); // releases a reference
  void _linger_cancel(LingerOp *info);

  void _do_watch_notify(LingerOp *info, MWatchNotify *m);

  /**
   * set up initial ops in the op vector, and allocate a final op slot.
   *
   * The caller is responsible for filling in the final ops_count ops.
   *
   * @param ops op vector
   * @param ops_count number of final ops the caller will fill in
   * @param extra_ops pointer to [array of] initial op[s]
   * @return index of final op (for caller to fill in)
   */
2361 | int init_ops(vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) { | |
2362 | int i; | |
2363 | int extra = 0; | |
2364 | ||
2365 | if (extra_ops) | |
2366 | extra = extra_ops->ops.size(); | |
2367 | ||
2368 | ops.resize(ops_count + extra); | |
2369 | ||
2370 | for (i=0; i<extra; i++) { | |
2371 | ops[i] = extra_ops->ops[i]; | |
2372 | } | |
2373 | ||
2374 | return i; | |
2375 | } | |
2376 | ||
2377 | ||
  // high-level helpers

  // Build a STAT op; size/mtime are decoded from the reply by C_Stat
  // and written through psize/pmtime before onfinish fires.
  Op *prepare_stat_op(
    const object_t& oid, const object_locator_t& oloc,
    snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_STAT;
    C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    o->outbl = &fin->bl;  // reply payload decoded by C_Stat
    return o;
  }
  // prepare + submit a stat; returns the assigned tid
  ceph_tid_t stat(
    const object_t& oid, const object_locator_t& oloc,
    snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
                            onfinish, objver, extra_ops);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2405 | ||
  // Build a plain extent READ of [off, off+len); data lands in *pbl.
  Op *prepare_read_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;   // no truncate semantics
    ops[i].op.extent.truncate_seq = 0;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onfinish, objver, nullptr, parent_trace);
    o->snapid = snap;
    o->outbl = pbl;
    return o;
  }
  // prepare + submit an extent read; returns the assigned tid
  ceph_tid_t read(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
                            onfinish, objver, extra_ops, op_flags);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2437 | ||
  // Build a CMPEXT op: compare cmp_bl against on-disk data at 'off';
  // length is implied by cmp_bl.  (Issued with the READ flag.)
  Op *prepare_cmpext_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, bufferlist &cmp_bl,
    snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_CMPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = cmp_bl.length();
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = cmp_bl;  // the bytes to compare against
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    return o;
  }

  // prepare + submit a cmpext; returns the assigned tid
  ceph_tid_t cmpext(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, bufferlist &cmp_bl,
    snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
                              flags, onfinish, objver, extra_ops, op_flags);
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2469 | ||
  // Extent read that honors truncate_size/truncate_seq (for files
  // being resized concurrently); prepares and submits in one step.
  ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
                        uint64_t off, uint64_t len, snapid_t snap,
                        bufferlist *pbl, int flags, uint64_t trunc_size,
                        __u32 trunc_seq, Context *onfinish,
                        version_t *objver = NULL,
                        ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // MAPEXT: query the allocated-extent map of [off, off+len) rather
  // than the data itself; result is encoded into *pbl.
  ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc,
                    uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl,
                    int flags, Context *onfinish, version_t *objver = NULL,
                    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_MAPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Fetch one xattr by name; value is returned raw in *pbl.
  // A NULL name is tolerated (zero-length name).
  ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
                      const char *name, snapid_t snap, bufferlist *pbl, int flags,
                      Context *onfinish,
                      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = 0;
    if (name)
      ops[i].indata.append(name);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }

  // Fetch all xattrs; C_GetAttrs decodes the reply into 'attrset'
  // before onfinish fires.
  ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc,
                       snapid_t snap, map<string,bufferlist>& attrset,
                       int flags, Context *onfinish, version_t *objver = NULL,
                       ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
    C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    o->outbl = &fin->bl;  // raw reply, decoded by C_GetAttrs
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2547 | ||
  // Read the entire object: offset 0, length 0 (length 0 means
  // "to end of object" for CEPH_OSD_OP_READ).
  ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc,
                       snapid_t snap, bufferlist *pbl, int flags,
                       Context *onfinish, version_t *objver = NULL,
                       ObjectOperation *extra_ops = NULL) {
    return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags |
                CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
  }
2555 | ||
2556 | ||
2557 | // writes | |
2558 | ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc, | |
2559 | vector<OSDOp>& ops, ceph::real_time mtime, | |
2560 | const SnapContext& snapc, int flags, | |
2561 | Context *oncommit, | |
2562 | version_t *objver = NULL) { | |
31f18b77 | 2563 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2564 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2565 | o->mtime = mtime; | |
2566 | o->snapc = snapc; | |
2567 | ceph_tid_t tid; | |
2568 | op_submit(o, &tid); | |
2569 | return tid; | |
2570 | } | |
2571 | Op *prepare_write_op( | |
2572 | const object_t& oid, const object_locator_t& oloc, | |
2573 | uint64_t off, uint64_t len, const SnapContext& snapc, | |
2574 | const bufferlist &bl, ceph::real_time mtime, int flags, | |
2575 | Context *oncommit, version_t *objver = NULL, | |
2576 | ObjectOperation *extra_ops = NULL, int op_flags = 0, | |
2577 | ZTracer::Trace *parent_trace = nullptr) { | |
2578 | vector<OSDOp> ops; | |
2579 | int i = init_ops(ops, 1, extra_ops); | |
2580 | ops[i].op.op = CEPH_OSD_OP_WRITE; | |
2581 | ops[i].op.extent.offset = off; | |
2582 | ops[i].op.extent.length = len; | |
2583 | ops[i].op.extent.truncate_size = 0; | |
2584 | ops[i].op.extent.truncate_seq = 0; | |
2585 | ops[i].indata = bl; | |
2586 | ops[i].op.flags = op_flags; | |
31f18b77 | 2587 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2588 | CEPH_OSD_FLAG_WRITE, oncommit, objver, |
2589 | nullptr, parent_trace); | |
2590 | o->mtime = mtime; | |
2591 | o->snapc = snapc; | |
2592 | return o; | |
2593 | } | |
2594 | ceph_tid_t write( | |
2595 | const object_t& oid, const object_locator_t& oloc, | |
2596 | uint64_t off, uint64_t len, const SnapContext& snapc, | |
2597 | const bufferlist &bl, ceph::real_time mtime, int flags, | |
2598 | Context *oncommit, version_t *objver = NULL, | |
2599 | ObjectOperation *extra_ops = NULL, int op_flags = 0) { | |
2600 | Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags, | |
2601 | oncommit, objver, extra_ops, op_flags); | |
2602 | ceph_tid_t tid; | |
2603 | op_submit(o, &tid); | |
2604 | return tid; | |
2605 | } | |
  /// Build (without submitting) a CEPH_OSD_OP_APPEND of bl (len bytes).
  /// The offset is fixed at 0; the OSD appends at the current object end.
  /// Returns a heap-allocated Op for the caller to pass to op_submit().
  Op *prepare_append_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t len, const SnapContext& snapc,
    const bufferlist &bl, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_APPEND;
    ops[i].op.extent.offset = 0;      // append ignores offset
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = bl;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2627 | ceph_tid_t append( | |
2628 | const object_t& oid, const object_locator_t& oloc, | |
2629 | uint64_t len, const SnapContext& snapc, | |
2630 | const bufferlist &bl, ceph::real_time mtime, int flags, | |
2631 | Context *oncommit, | |
2632 | version_t *objver = NULL, | |
2633 | ObjectOperation *extra_ops = NULL) { | |
2634 | Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags, | |
2635 | oncommit, objver, extra_ops); | |
2636 | ceph_tid_t tid; | |
2637 | op_submit(o, &tid); | |
2638 | return tid; | |
2639 | } | |
  /// Submit a CEPH_OSD_OP_WRITE that also carries a truncation hint
  /// (trunc_size/trunc_seq), used by file layouts where a truncate may
  /// race with the write. Returns the submitted op's tid.
  ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
			 uint64_t off, uint64_t len, const SnapContext& snapc,
			 const bufferlist &bl, ceph::real_time mtime, int flags,
			 uint64_t trunc_size, __u32 trunc_seq,
			 Context *oncommit,
			 version_t *objver = NULL,
			 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITE;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    // Truncation hint forwarded verbatim to the OSD.
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2664 | Op *prepare_write_full_op( | |
2665 | const object_t& oid, const object_locator_t& oloc, | |
2666 | const SnapContext& snapc, const bufferlist &bl, | |
2667 | ceph::real_time mtime, int flags, | |
2668 | Context *oncommit, version_t *objver = NULL, | |
2669 | ObjectOperation *extra_ops = NULL, int op_flags = 0) { | |
2670 | vector<OSDOp> ops; | |
2671 | int i = init_ops(ops, 1, extra_ops); | |
2672 | ops[i].op.op = CEPH_OSD_OP_WRITEFULL; | |
2673 | ops[i].op.extent.offset = 0; | |
2674 | ops[i].op.extent.length = bl.length(); | |
2675 | ops[i].indata = bl; | |
2676 | ops[i].op.flags = op_flags; | |
31f18b77 | 2677 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2678 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2679 | o->mtime = mtime; | |
2680 | o->snapc = snapc; | |
2681 | return o; | |
2682 | } | |
2683 | ceph_tid_t write_full( | |
2684 | const object_t& oid, const object_locator_t& oloc, | |
2685 | const SnapContext& snapc, const bufferlist &bl, | |
2686 | ceph::real_time mtime, int flags, | |
2687 | Context *oncommit, version_t *objver = NULL, | |
2688 | ObjectOperation *extra_ops = NULL, int op_flags = 0) { | |
2689 | Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags, | |
2690 | oncommit, objver, extra_ops, op_flags); | |
2691 | ceph_tid_t tid; | |
2692 | op_submit(o, &tid); | |
2693 | return tid; | |
2694 | } | |
  /// Build (without submitting) a CEPH_OSD_OP_WRITESAME: replicate the
  /// bl pattern (data_length = bl.length()) across [off, off+write_len).
  /// Returns a heap-allocated Op for the caller to pass to op_submit().
  Op *prepare_writesame_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t write_len, uint64_t off,
    const SnapContext& snapc, const bufferlist &bl,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {

    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITESAME;
    ops[i].op.writesame.offset = off;
    ops[i].op.writesame.length = write_len;        // total bytes written
    ops[i].op.writesame.data_length = bl.length(); // pattern size
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2717 | ceph_tid_t writesame( | |
2718 | const object_t& oid, const object_locator_t& oloc, | |
2719 | uint64_t write_len, uint64_t off, | |
2720 | const SnapContext& snapc, const bufferlist &bl, | |
2721 | ceph::real_time mtime, int flags, | |
2722 | Context *oncommit, version_t *objver = NULL, | |
2723 | ObjectOperation *extra_ops = NULL, int op_flags = 0) { | |
2724 | ||
2725 | Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl, | |
2726 | mtime, flags, oncommit, objver, | |
2727 | extra_ops, op_flags); | |
2728 | ||
2729 | ceph_tid_t tid; | |
2730 | op_submit(o, &tid); | |
2731 | return tid; | |
2732 | } | |
  /// Truncate oid to trunc_size. Note that extent.offset deliberately
  /// carries the truncation point for CEPH_OSD_OP_TRUNCATE, alongside the
  /// truncate_size/truncate_seq hint fields. Returns the op's tid.
  ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
		   const SnapContext& snapc, ceph::real_time mtime, int flags,
		   uint64_t trunc_size, __u32 trunc_seq,
		   Context *oncommit, version_t *objver = NULL,
		   ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
    ops[i].op.extent.offset = trunc_size;  // truncate point, not a data offset
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2752 | ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc, | |
2753 | uint64_t off, uint64_t len, const SnapContext& snapc, | |
2754 | ceph::real_time mtime, int flags, Context *oncommit, | |
2755 | version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { | |
2756 | vector<OSDOp> ops; | |
2757 | int i = init_ops(ops, 1, extra_ops); | |
2758 | ops[i].op.op = CEPH_OSD_OP_ZERO; | |
2759 | ops[i].op.extent.offset = off; | |
2760 | ops[i].op.extent.length = len; | |
31f18b77 | 2761 | Op *o = new Op(oid, oloc, ops, flags | global_op_flags | |
7c673cae FG |
2762 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2763 | o->mtime = mtime; | |
2764 | o->snapc = snapc; | |
2765 | ceph_tid_t tid; | |
2766 | op_submit(o, &tid); | |
2767 | return tid; | |
2768 | } | |
  /// Roll oid back to the contents it had at snapshot snapid
  /// (CEPH_OSD_OP_ROLLBACK). Returns the submitted op's tid.
  ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
			     const SnapContext& snapc, snapid_t snapid,
			     ceph::real_time mtime, Context *oncommit,
			     version_t *objver = NULL,
			     ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
    ops[i].op.snap.snapid = snapid;
    // NOTE(review): unlike every other write helper in this class, this
    // does not OR global_op_flags into the op flags — confirm whether
    // that omission is intentional.
    Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2785 | ceph_tid_t create(const object_t& oid, const object_locator_t& oloc, | |
2786 | const SnapContext& snapc, ceph::real_time mtime, int global_flags, | |
2787 | int create_flags, Context *oncommit, | |
2788 | version_t *objver = NULL, | |
2789 | ObjectOperation *extra_ops = NULL) { | |
2790 | vector<OSDOp> ops; | |
2791 | int i = init_ops(ops, 1, extra_ops); | |
2792 | ops[i].op.op = CEPH_OSD_OP_CREATE; | |
2793 | ops[i].op.flags = create_flags; | |
31f18b77 | 2794 | Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags | |
7c673cae FG |
2795 | CEPH_OSD_FLAG_WRITE, oncommit, objver); |
2796 | o->mtime = mtime; | |
2797 | o->snapc = snapc; | |
2798 | ceph_tid_t tid; | |
2799 | op_submit(o, &tid); | |
2800 | return tid; | |
2801 | } | |
  /// Build (without submitting) a CEPH_OSD_OP_DELETE for oid.
  /// Returns a heap-allocated Op for the caller to pass to op_submit().
  Op *prepare_remove_op(
    const object_t& oid, const object_locator_t& oloc,
    const SnapContext& snapc, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_DELETE;
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
2816 | ceph_tid_t remove( | |
2817 | const object_t& oid, const object_locator_t& oloc, | |
2818 | const SnapContext& snapc, ceph::real_time mtime, int flags, | |
2819 | Context *oncommit, | |
2820 | version_t *objver = NULL, ObjectOperation *extra_ops = NULL) { | |
2821 | Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags, | |
2822 | oncommit, objver, extra_ops); | |
2823 | ceph_tid_t tid; | |
2824 | op_submit(o, &tid); | |
2825 | return tid; | |
2826 | } | |
2827 | ||
  /// Set xattr `name` on oid to the bytes in bl (CEPH_OSD_OP_SETXATTR).
  /// Wire format: indata carries the name bytes followed by the value;
  /// xattr.name_len/value_len delimit the two. A NULL name is encoded as
  /// a zero-length name. Returns the submitted op's tid.
  ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
	      const char *name, const SnapContext& snapc, const bufferlist &bl,
	      ceph::real_time mtime, int flags,
	      Context *oncommit,
	      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_SETXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = bl.length();
    if (name)
      ops[i].indata.append(name);   // name first...
    ops[i].indata.append(bl);       // ...then the value
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  /// Remove xattr `name` from oid (CEPH_OSD_OP_RMXATTR). The name bytes
  /// travel in indata with xattr.name_len giving their length; value_len
  /// is always 0 for a removal. Returns the submitted op's tid.
  ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
	      const char *name, const SnapContext& snapc,
	      ceph::real_time mtime, int flags,
	      Context *oncommit,
	      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    vector<OSDOp> ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_RMXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = 0;
    if (name)
      ops[i].indata.append(name);
    Op *o = new Op(oid, oloc, ops, flags | global_op_flags |
		   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
2869 | ||
2870 | void list_nobjects(NListContext *p, Context *onfinish); | |
2871 | uint32_t list_nobjects_seek(NListContext *p, uint32_t pos); | |
2872 | uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c); | |
2873 | void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c); | |
2874 | ||
2875 | hobject_t enumerate_objects_begin(); | |
2876 | hobject_t enumerate_objects_end(); | |
2877 | //hobject_t enumerate_objects_begin(int n, int m); | |
2878 | void enumerate_objects( | |
2879 | int64_t pool_id, | |
2880 | const std::string &ns, | |
2881 | const hobject_t &start, | |
2882 | const hobject_t &end, | |
2883 | const uint32_t max, | |
2884 | const bufferlist &filter_bl, | |
2885 | std::list<librados::ListObjectImpl> *result, | |
2886 | hobject_t *next, | |
2887 | Context *on_finish); | |
2888 | ||
2889 | void _enumerate_reply( | |
2890 | bufferlist &bl, | |
2891 | int r, | |
2892 | const hobject_t &end, | |
2893 | const int64_t pool_id, | |
2894 | int budget, | |
2895 | epoch_t reply_epoch, | |
2896 | std::list<librados::ListObjectImpl> *result, | |
2897 | hobject_t *next, | |
2898 | Context *on_finish); | |
2899 | friend class C_EnumerateReply; | |
2900 | ||
2901 | // ------------------------- | |
2902 | // pool ops | |
2903 | private: | |
2904 | void pool_op_submit(PoolOp *op); | |
2905 | void _pool_op_submit(PoolOp *op); | |
2906 | void _finish_pool_op(PoolOp *op, int r); | |
2907 | void _do_delete_pool(int64_t pool, Context *onfinish); | |
2908 | public: | |
2909 | int create_pool_snap(int64_t pool, string& snapName, Context *onfinish); | |
2910 | int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, | |
2911 | Context *onfinish); | |
2912 | int delete_pool_snap(int64_t pool, string& snapName, Context *onfinish); | |
2913 | int delete_selfmanaged_snap(int64_t pool, snapid_t snap, Context *onfinish); | |
2914 | ||
2915 | int create_pool(string& name, Context *onfinish, uint64_t auid=0, | |
2916 | int crush_rule=-1); | |
2917 | int delete_pool(int64_t pool, Context *onfinish); | |
2918 | int delete_pool(const string& name, Context *onfinish); | |
2919 | int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid); | |
2920 | ||
2921 | void handle_pool_op_reply(MPoolOpReply *m); | |
2922 | int pool_op_cancel(ceph_tid_t tid, int r); | |
2923 | ||
2924 | // -------------------------- | |
2925 | // pool stats | |
2926 | private: | |
2927 | void _poolstat_submit(PoolStatOp *op); | |
2928 | public: | |
2929 | void handle_get_pool_stats_reply(MGetPoolStatsReply *m); | |
2930 | void get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result, | |
2931 | Context *onfinish); | |
2932 | int pool_stat_op_cancel(ceph_tid_t tid, int r); | |
2933 | void _finish_pool_stat_op(PoolStatOp *op, int r); | |
2934 | ||
2935 | // --------------------------- | |
2936 | // df stats | |
2937 | private: | |
2938 | void _fs_stats_submit(StatfsOp *op); | |
2939 | public: | |
2940 | void handle_fs_stats_reply(MStatfsReply *m); | |
d2e6a577 FG |
2941 | void get_fs_stats(struct ceph_statfs& result, boost::optional<int64_t> poolid, |
2942 | Context *onfinish); | |
7c673cae FG |
2943 | int statfs_op_cancel(ceph_tid_t tid, int r); |
2944 | void _finish_statfs_op(StatfsOp *op, int r); | |
2945 | ||
2946 | // --------------------------- | |
2947 | // some scatter/gather hackery | |
2948 | ||
2949 | void _sg_read_finish(vector<ObjectExtent>& extents, | |
2950 | vector<bufferlist>& resultbl, | |
2951 | bufferlist *bl, Context *onfinish); | |
2952 | ||
  /// Completion for a scatter/gather read: once all per-extent reads have
  /// finished, hands the extent map and the per-extent buffers to
  /// _sg_read_finish() to assemble the final bufferlist for the caller.
  struct C_SGRead : public Context {
    Objecter *objecter;
    vector<ObjectExtent> extents;   // layout of the logical read
    vector<bufferlist> resultbl;    // one buffer per extent, same order
    bufferlist *bl;                 // caller's destination buffer
    Context *onfinish;              // caller's completion
    // Note: the constructor swap()s e and r — it steals their contents
    // and leaves the caller's vectors empty.
    C_SGRead(Objecter *ob,
	     vector<ObjectExtent>& e, vector<bufferlist>& r, bufferlist *b,
	     Context *c) :
      objecter(ob), bl(b), onfinish(c) {
      extents.swap(e);
      resultbl.swap(r);
    }
    void finish(int r) override {
      objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
    }
  };
2970 | ||
2971 | void sg_read_trunc(vector<ObjectExtent>& extents, snapid_t snap, | |
2972 | bufferlist *bl, int flags, uint64_t trunc_size, | |
2973 | __u32 trunc_seq, Context *onfinish, int op_flags = 0) { | |
2974 | if (extents.size() == 1) { | |
2975 | read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset, | |
2976 | extents[0].length, snap, bl, flags, extents[0].truncate_size, | |
2977 | trunc_seq, onfinish, 0, 0, op_flags); | |
2978 | } else { | |
2979 | C_GatherBuilder gather(cct); | |
2980 | vector<bufferlist> resultbl(extents.size()); | |
2981 | int i=0; | |
2982 | for (vector<ObjectExtent>::iterator p = extents.begin(); | |
2983 | p != extents.end(); | |
2984 | ++p) { | |
2985 | read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++], | |
2986 | flags, p->truncate_size, trunc_seq, gather.new_sub(), | |
2987 | 0, 0, op_flags); | |
2988 | } | |
2989 | gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish)); | |
2990 | gather.activate(); | |
2991 | } | |
2992 | } | |
2993 | ||
  /// Scatter/gather read with no truncation hint (trunc_size/seq = 0).
  void sg_read(vector<ObjectExtent>& extents, snapid_t snap, bufferlist *bl,
	       int flags, Context *onfinish, int op_flags = 0) {
    sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags);
  }
2998 | ||
  /// Scatter/gather write with truncation hint: slice bl according to each
  /// extent's buffer_extents and issue one write_trunc per object extent.
  /// oncommit (if non-NULL) fires once all per-extent writes commit.
  void sg_write_trunc(vector<ObjectExtent>& extents, const SnapContext& snapc,
		      const bufferlist& bl, ceph::real_time mtime, int flags,
		      uint64_t trunc_size, __u32 trunc_seq,
		      Context *oncommit, int op_flags = 0) {
    // Fast path: a single extent writes bl directly.
    if (extents.size() == 1) {
      write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
		  extents[0].length, snapc, bl, mtime, flags,
		  extents[0].truncate_size, trunc_seq, oncommit,
		  0, 0, op_flags);
    } else {
      C_GatherBuilder gcom(cct, oncommit);
      for (vector<ObjectExtent>::iterator p = extents.begin();
	   p != extents.end();
	   ++p) {
	// Gather this extent's slices of bl (in buffer_extents order)
	// into one contiguous buffer for the write.
	bufferlist cur;
	for (vector<pair<uint64_t,uint64_t> >::iterator bit
	       = p->buffer_extents.begin();
	     bit != p->buffer_extents.end();
	     ++bit)
	  bl.copy(bit->first, bit->second, cur);
	assert(cur.length() == p->length);
	write_trunc(p->oid, p->oloc, p->offset, p->length,
		    snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
		    oncommit ? gcom.new_sub():0,  // subs only if caller waits
		    0, 0, op_flags);
      }
      gcom.activate();
    }
  }
3028 | ||
  /// Scatter/gather write with no truncation hint (trunc_size/seq = 0).
  void sg_write(vector<ObjectExtent>& extents, const SnapContext& snapc,
		const bufferlist& bl, ceph::real_time mtime, int flags,
		Context *oncommit, int op_flags = 0) {
    sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit,
		   op_flags);
  }
3035 | ||
3036 | void ms_handle_connect(Connection *con) override; | |
3037 | bool ms_handle_reset(Connection *con) override; | |
3038 | void ms_handle_remote_reset(Connection *con) override; | |
3039 | bool ms_handle_refused(Connection *con) override; | |
3040 | bool ms_get_authorizer(int dest_type, | |
3041 | AuthAuthorizer **authorizer, | |
3042 | bool force_new) override; | |
3043 | ||
3044 | void blacklist_self(bool set); | |
3045 | ||
3046 | private: | |
3047 | epoch_t epoch_barrier; | |
3048 | bool retry_writes_after_first_reply; | |
3049 | public: | |
3050 | void set_epoch_barrier(epoch_t epoch); | |
3051 | ||
  /// Accessor for the Objecter's performance counters (may be NULL —
  /// TODO confirm whether logger is always initialized before use).
  PerfCounters *get_logger() {
    return logger;
  }
3055 | }; | |
3056 | ||
3057 | #endif |