]> git.proxmox.com Git - ceph.git/blame - ceph/src/librados/IoCtxImpl.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / librados / IoCtxImpl.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <limits.h>
16
17#include "IoCtxImpl.h"
18
11fdf7f2 19#include "librados/librados_c.h"
7c673cae
FG
20#include "librados/AioCompletionImpl.h"
21#include "librados/PoolAsyncCompletionImpl.h"
22#include "librados/RadosClient.h"
11fdf7f2 23#include "include/ceph_assert.h"
7c673cae
FG
24#include "common/valgrind.h"
25#include "common/EventTrace.h"
26
27#define dout_subsys ceph_subsys_rados
28#undef dout_prefix
29#define dout_prefix *_dout << "librados: "
30
20effc67
TL
31using std::string;
32using std::map;
33using std::unique_lock;
34using std::vector;
35
f67539c2
TL
36namespace bs = boost::system;
37namespace ca = ceph::async;
38namespace cb = ceph::buffer;
39
7c673cae
FG
40namespace librados {
41namespace {
42
f67539c2 43struct CB_notify_Finish {
7c673cae
FG
44 CephContext *cct;
45 Context *ctx;
46 Objecter *objecter;
47 Objecter::LingerOp *linger_op;
7c673cae
FG
48 bufferlist *preply_bl;
49 char **preply_buf;
50 size_t *preply_buf_len;
51
f67539c2
TL
52 CB_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter,
53 Objecter::LingerOp *_linger_op, bufferlist *_preply_bl,
54 char **_preply_buf, size_t *_preply_buf_len)
7c673cae
FG
55 : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op),
56 preply_bl(_preply_bl), preply_buf(_preply_buf),
f67539c2 57 preply_buf_len(_preply_buf_len) {}
7c673cae 58
f67539c2
TL
59
60 // move-only
61 CB_notify_Finish(const CB_notify_Finish&) = delete;
62 CB_notify_Finish& operator =(const CB_notify_Finish&) = delete;
63 CB_notify_Finish(CB_notify_Finish&&) = default;
64 CB_notify_Finish& operator =(CB_notify_Finish&&) = default;
65
66 void operator()(bs::error_code ec, bufferlist&& reply_bl) {
7c673cae 67 ldout(cct, 10) << __func__ << " completed notify (linger op "
f67539c2 68 << linger_op << "), ec = " << ec << dendl;
7c673cae
FG
69
70 // pass result back to user
71 // NOTE: we do this regardless of what error code we return
72 if (preply_buf) {
73 if (reply_bl.length()) {
74 *preply_buf = (char*)malloc(reply_bl.length());
75 memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length());
76 } else {
77 *preply_buf = NULL;
78 }
79 }
80 if (preply_buf_len)
81 *preply_buf_len = reply_bl.length();
82 if (preply_bl)
f67539c2 83 *preply_bl = std::move(reply_bl);
7c673cae 84
f67539c2 85 ctx->complete(ceph::from_error_code(ec));
7c673cae
FG
86 }
87};
88
f67539c2 89struct CB_aio_linger_cancel {
7c673cae
FG
90 Objecter *objecter;
91 Objecter::LingerOp *linger_op;
92
f67539c2 93 CB_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op)
7c673cae
FG
94 : objecter(_objecter), linger_op(_linger_op)
95 {
96 }
97
f67539c2 98 void operator()() {
7c673cae
FG
99 objecter->linger_cancel(linger_op);
100 }
101};
102
103struct C_aio_linger_Complete : public Context {
104 AioCompletionImpl *c;
105 Objecter::LingerOp *linger_op;
106 bool cancel;
107
108 C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel)
109 : c(_c), linger_op(_linger_op), cancel(_cancel)
110 {
111 c->get();
112 }
113
114 void finish(int r) override {
115 if (cancel || r < 0)
f67539c2
TL
116 boost::asio::defer(c->io->client->finish_strand,
117 CB_aio_linger_cancel(c->io->objecter,
118 linger_op));
7c673cae 119
9f95a23c 120 c->lock.lock();
7c673cae
FG
121 c->rval = r;
122 c->complete = true;
9f95a23c 123 c->cond.notify_all();
7c673cae
FG
124
125 if (c->callback_complete ||
126 c->callback_safe) {
f67539c2 127 boost::asio::defer(c->io->client->finish_strand, CB_AioComplete(c));
7c673cae
FG
128 }
129 c->put_unlock();
130 }
131};
132
133struct C_aio_notify_Complete : public C_aio_linger_Complete {
9f95a23c 134 ceph::mutex lock = ceph::make_mutex("C_aio_notify_Complete::lock");
7c673cae
FG
135 bool acked = false;
136 bool finished = false;
137 int ret_val = 0;
138
139 C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
9f95a23c 140 : C_aio_linger_Complete(_c, _linger_op, false) {
7c673cae
FG
141 }
142
143 void handle_ack(int r) {
144 // invoked by C_aio_notify_Ack
9f95a23c 145 lock.lock();
7c673cae
FG
146 acked = true;
147 complete_unlock(r);
148 }
149
150 void complete(int r) override {
28e407b8 151 // invoked by C_notify_Finish
9f95a23c 152 lock.lock();
7c673cae
FG
153 finished = true;
154 complete_unlock(r);
155 }
156
157 void complete_unlock(int r) {
158 if (ret_val == 0 && r < 0) {
159 ret_val = r;
160 }
161
162 if (acked && finished) {
9f95a23c 163 lock.unlock();
7c673cae
FG
164 cancel = true;
165 C_aio_linger_Complete::complete(ret_val);
166 } else {
9f95a23c 167 lock.unlock();
7c673cae
FG
168 }
169 }
170};
171
172struct C_aio_notify_Ack : public Context {
173 CephContext *cct;
7c673cae
FG
174 C_aio_notify_Complete *oncomplete;
175
f67539c2 176 C_aio_notify_Ack(CephContext *_cct,
7c673cae 177 C_aio_notify_Complete *_oncomplete)
f67539c2 178 : cct(_cct), oncomplete(_oncomplete)
7c673cae
FG
179 {
180 }
181
182 void finish(int r) override
183 {
184 ldout(cct, 10) << __func__ << " linger op " << oncomplete->linger_op << " "
185 << "acked (" << r << ")" << dendl;
186 oncomplete->handle_ack(r);
7c673cae
FG
187 }
188};
189
190struct C_aio_selfmanaged_snap_op_Complete : public Context {
191 librados::RadosClient *client;
192 librados::AioCompletionImpl *c;
193
194 C_aio_selfmanaged_snap_op_Complete(librados::RadosClient *client,
195 librados::AioCompletionImpl *c)
196 : client(client), c(c) {
197 c->get();
198 }
199
200 void finish(int r) override {
9f95a23c 201 c->lock.lock();
7c673cae
FG
202 c->rval = r;
203 c->complete = true;
9f95a23c 204 c->cond.notify_all();
7c673cae
FG
205
206 if (c->callback_complete || c->callback_safe) {
f67539c2 207 boost::asio::defer(client->finish_strand, librados::CB_AioComplete(c));
7c673cae
FG
208 }
209 c->put_unlock();
210 }
211};
212
213struct C_aio_selfmanaged_snap_create_Complete : public C_aio_selfmanaged_snap_op_Complete {
214 snapid_t snapid;
215 uint64_t *dest_snapid;
216
217 C_aio_selfmanaged_snap_create_Complete(librados::RadosClient *client,
218 librados::AioCompletionImpl *c,
219 uint64_t *dest_snapid)
220 : C_aio_selfmanaged_snap_op_Complete(client, c),
221 dest_snapid(dest_snapid) {
222 }
223
224 void finish(int r) override {
225 if (r >= 0) {
226 *dest_snapid = snapid;
227 }
228 C_aio_selfmanaged_snap_op_Complete::finish(r);
229 }
230};
231
232} // anonymous namespace
233} // namespace librados
234
9f95a23c 235librados::IoCtxImpl::IoCtxImpl() = default;
7c673cae
FG
236
237librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter,
238 int64_t poolid, snapid_t s)
9f95a23c 239 : client(c), poolid(poolid), snap_seq(s),
7c673cae 240 notify_timeout(c->cct->_conf->client_notify_timeout),
9f95a23c 241 oloc(poolid),
7c673cae
FG
242 aio_write_seq(0), objecter(objecter)
243{
244}
245
246void librados::IoCtxImpl::set_snap_read(snapid_t s)
247{
248 if (!s)
249 s = CEPH_NOSNAP;
250 ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl;
251 snap_seq = s;
252}
253
254int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector<snapid_t>& snaps)
255{
256 ::SnapContext n;
257 ldout(client->cct, 10) << "set snap write context: seq = " << seq
258 << " and snaps = " << snaps << dendl;
259 n.seq = seq;
260 n.snaps = snaps;
261 if (!n.is_valid())
262 return -EINVAL;
263 snapc = n;
264 return 0;
265}
266
267int librados::IoCtxImpl::get_object_hash_position(
268 const std::string& oid, uint32_t *hash_position)
269{
270 int64_t r = objecter->get_object_hash_position(poolid, oid, oloc.nspace);
271 if (r < 0)
272 return r;
273 *hash_position = (uint32_t)r;
274 return 0;
275}
276
277int librados::IoCtxImpl::get_object_pg_hash_position(
278 const std::string& oid, uint32_t *pg_hash_position)
279{
280 int64_t r = objecter->get_object_pg_hash_position(poolid, oid, oloc.nspace);
281 if (r < 0)
282 return r;
283 *pg_hash_position = (uint32_t)r;
284 return 0;
285}
286
287void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
288{
289 get();
9f95a23c 290 std::scoped_lock l{aio_write_list_lock};
11fdf7f2 291 ceph_assert(c->io == this);
7c673cae
FG
292 c->aio_write_seq = ++aio_write_seq;
293 ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
294 << " write_seq " << aio_write_seq << dendl;
295 aio_write_list.push_back(&c->aio_write_list_item);
7c673cae
FG
296}
297
298void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
299{
300 ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
9f95a23c 301 aio_write_list_lock.lock();
11fdf7f2 302 ceph_assert(c->io == this);
7c673cae
FG
303 c->aio_write_list_item.remove_myself();
304
305 map<ceph_tid_t, std::list<AioCompletionImpl*> >::iterator waiters = aio_write_waiters.begin();
306 while (waiters != aio_write_waiters.end()) {
307 if (!aio_write_list.empty() &&
308 aio_write_list.front()->aio_write_seq <= waiters->first) {
309 ldout(client->cct, 20) << " next outstanding write is " << aio_write_list.front()->aio_write_seq
310 << " <= waiter " << waiters->first
311 << ", stopping" << dendl;
312 break;
313 }
314 ldout(client->cct, 20) << " waking waiters on seq " << waiters->first << dendl;
315 for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
316 it != waiters->second.end(); ++it) {
f67539c2 317 boost::asio::defer(client->finish_strand, CB_AioCompleteAndSafe(*it));
7c673cae
FG
318 (*it)->put();
319 }
320 aio_write_waiters.erase(waiters++);
321 }
322
9f95a23c
TL
323 aio_write_cond.notify_all();
324 aio_write_list_lock.unlock();
7c673cae
FG
325 put();
326}
327
328void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
329{
330 ldout(client->cct, 20) << "flush_aio_writes_async " << this
331 << " completion " << c << dendl;
11fdf7f2 332 std::lock_guard l(aio_write_list_lock);
7c673cae
FG
333 ceph_tid_t seq = aio_write_seq;
334 if (aio_write_list.empty()) {
335 ldout(client->cct, 20) << "flush_aio_writes_async no writes. (tid "
336 << seq << ")" << dendl;
f67539c2 337 boost::asio::defer(client->finish_strand, CB_AioCompleteAndSafe(c));
7c673cae
FG
338 } else {
339 ldout(client->cct, 20) << "flush_aio_writes_async " << aio_write_list.size()
340 << " writes in flight; waiting on tid " << seq << dendl;
341 c->get();
342 aio_write_waiters[seq].push_back(c);
343 }
344}
345
346void librados::IoCtxImpl::flush_aio_writes()
347{
348 ldout(client->cct, 20) << "flush_aio_writes" << dendl;
9f95a23c
TL
349 std::unique_lock l{aio_write_list_lock};
350 aio_write_cond.wait(l, [seq=aio_write_seq, this] {
351 return (aio_write_list.empty() ||
352 aio_write_list.front()->aio_write_seq > seq);
353 });
7c673cae
FG
354}
355
356string librados::IoCtxImpl::get_cached_pool_name()
357{
358 std::string pn;
359 client->pool_get_name(get_id(), &pn);
360 return pn;
361}
362
363// SNAPS
364
365int librados::IoCtxImpl::snap_create(const char *snapName)
366{
367 int reply;
368 string sName(snapName);
369
9f95a23c
TL
370 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::snap_create::mylock");
371 ceph::condition_variable cond;
7c673cae 372 bool done;
9f95a23c 373 Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
f67539c2 374 objecter->create_pool_snap(poolid, sName, onfinish);
7c673cae 375
f67539c2
TL
376 std::unique_lock l{mylock};
377 cond.wait(l, [&done] { return done; });
7c673cae
FG
378 return reply;
379}
380
381int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid)
382{
383 int reply;
384
9f95a23c
TL
385 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::selfmanaged_snap_create::mylock");
386 ceph::condition_variable cond;
7c673cae 387 bool done;
9f95a23c 388 Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
7c673cae 389 snapid_t snapid;
f67539c2 390 objecter->allocate_selfmanaged_snap(poolid, &snapid, onfinish);
7c673cae 391
f67539c2
TL
392 {
393 std::unique_lock l{mylock};
394 cond.wait(l, [&done] { return done; });
7c673cae 395 }
f67539c2
TL
396 if (reply == 0)
397 *psnapid = snapid;
7c673cae
FG
398 return reply;
399}
400
401void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid,
402 AioCompletionImpl *c)
403{
404 C_aio_selfmanaged_snap_create_Complete *onfinish =
405 new C_aio_selfmanaged_snap_create_Complete(client, c, snapid);
f67539c2
TL
406 objecter->allocate_selfmanaged_snap(poolid, &onfinish->snapid,
407 onfinish);
7c673cae
FG
408}
409
410int librados::IoCtxImpl::snap_remove(const char *snapName)
411{
412 int reply;
413 string sName(snapName);
414
9f95a23c
TL
415 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::snap_remove::mylock");
416 ceph::condition_variable cond;
7c673cae 417 bool done;
9f95a23c 418 Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
f67539c2
TL
419 objecter->delete_pool_snap(poolid, sName, onfinish);
420 unique_lock l{mylock};
421 cond.wait(l, [&done] { return done; });
7c673cae
FG
422 return reply;
423}
424
425int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid,
426 ::SnapContext& snapc,
427 uint64_t snapid)
428{
429 int reply;
430
9f95a23c
TL
431 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::snap_rollback::mylock");
432 ceph::condition_variable cond;
7c673cae 433 bool done;
9f95a23c 434 Context *onack = new C_SafeCond(mylock, cond, &done, &reply);
7c673cae
FG
435
436 ::ObjectOperation op;
437 prepare_assert_ops(&op);
438 op.rollback(snapid);
439 objecter->mutate(oid, oloc,
f67539c2
TL
440 op, snapc, ceph::real_clock::now(),
441 extra_op_flags,
7c673cae
FG
442 onack, NULL);
443
9f95a23c
TL
444 std::unique_lock l{mylock};
445 cond.wait(l, [&done] { return done; });
7c673cae
FG
446 return reply;
447}
448
449int librados::IoCtxImpl::rollback(const object_t& oid, const char *snapName)
450{
451 snapid_t snap;
452
453 int r = objecter->pool_snap_by_name(poolid, snapName, &snap);
454 if (r < 0) {
455 return r;
456 }
457
458 return selfmanaged_snap_rollback_object(oid, snapc, snap);
459}
460
461int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid)
462{
463 int reply;
464
9f95a23c
TL
465 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::selfmanaged_snap_remove::mylock");
466 ceph::condition_variable cond;
7c673cae
FG
467 bool done;
468 objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid),
9f95a23c 469 new C_SafeCond(mylock, cond, &done, &reply));
7c673cae 470
9f95a23c
TL
471 std::unique_lock l{mylock};
472 cond.wait(l, [&done] { return done; });
7c673cae
FG
473 return (int)reply;
474}
475
476void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid,
477 AioCompletionImpl *c)
478{
479 Context *onfinish = new C_aio_selfmanaged_snap_op_Complete(client, c);
480 objecter->delete_selfmanaged_snap(poolid, snapid, onfinish);
481}
482
7c673cae
FG
483int librados::IoCtxImpl::snap_list(vector<uint64_t> *snaps)
484{
485 return objecter->pool_snap_list(poolid, snaps);
486}
487
488int librados::IoCtxImpl::snap_lookup(const char *name, uint64_t *snapid)
489{
490 return objecter->pool_snap_by_name(poolid, name, (snapid_t *)snapid);
491}
492
493int librados::IoCtxImpl::snap_get_name(uint64_t snapid, std::string *s)
494{
495 pool_snap_info_t info;
496 int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
497 if (ret < 0) {
498 return ret;
499 }
500 *s = info.name.c_str();
501 return 0;
502}
503
504int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid, time_t *t)
505{
506 pool_snap_info_t info;
507 int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
508 if (ret < 0) {
509 return ret;
510 }
511 *t = info.stamp.sec();
512 return 0;
513}
514
515
516// IO
517
518int librados::IoCtxImpl::nlist(Objecter::NListContext *context, int max_entries)
519{
7c673cae
FG
520 bool done;
521 int r = 0;
9f95a23c
TL
522 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::nlist::mylock");
523 ceph::condition_variable cond;
7c673cae
FG
524
525 if (context->at_end())
526 return 0;
527
528 context->max_entries = max_entries;
529 context->nspace = oloc.nspace;
530
9f95a23c 531 objecter->list_nobjects(context, new C_SafeCond(mylock, cond, &done, &r));
7c673cae 532
9f95a23c
TL
533 std::unique_lock l{mylock};
534 cond.wait(l, [&done] { return done; });
7c673cae
FG
535 return r;
536}
537
538uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
539 uint32_t pos)
540{
541 context->list.clear();
542 return objecter->list_nobjects_seek(context, pos);
543}
544
545uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
546 const rados_object_list_cursor& cursor)
547{
548 context->list.clear();
549 return objecter->list_nobjects_seek(context, *(const hobject_t *)cursor);
550}
551
552rados_object_list_cursor librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext *context)
553{
554 hobject_t *c = new hobject_t;
555
556 objecter->list_nobjects_get_cursor(context, c);
557 return (rados_object_list_cursor)c;
558}
559
560int librados::IoCtxImpl::create(const object_t& oid, bool exclusive)
561{
562 ::ObjectOperation op;
563 prepare_assert_ops(&op);
564 op.create(exclusive);
565 return operate(oid, &op, NULL);
566}
567
568/*
569 * add any version assert operations that are appropriate given the
570 * stat in the IoCtx, either the target version assert or any src
571 * object asserts. these affect a single ioctx operation, so clear
572 * the ioctx state when we're doing.
573 *
574 * return a pointer to the ObjectOperation if we added any events;
575 * this is convenient for passing the extra_ops argument into Objecter
576 * methods.
577 */
578::ObjectOperation *librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation *op)
579{
580 ::ObjectOperation *pop = NULL;
581 if (assert_ver) {
582 op->assert_version(assert_ver);
583 assert_ver = 0;
584 pop = op;
585 }
586 return pop;
587}
588
589int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
590 size_t len, uint64_t off)
591{
592 if (len > UINT_MAX/2)
593 return -E2BIG;
594 ::ObjectOperation op;
595 prepare_assert_ops(&op);
596 bufferlist mybl;
597 mybl.substr_of(bl, 0, len);
598 op.write(off, mybl);
599 return operate(oid, &op, NULL);
600}
601
602int librados::IoCtxImpl::append(const object_t& oid, bufferlist& bl, size_t len)
603{
604 if (len > UINT_MAX/2)
605 return -E2BIG;
606 ::ObjectOperation op;
607 prepare_assert_ops(&op);
608 bufferlist mybl;
609 mybl.substr_of(bl, 0, len);
610 op.append(mybl);
611 return operate(oid, &op, NULL);
612}
613
614int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl)
615{
616 if (bl.length() > UINT_MAX/2)
617 return -E2BIG;
618 ::ObjectOperation op;
619 prepare_assert_ops(&op);
620 op.write_full(bl);
621 return operate(oid, &op, NULL);
622}
623
624int librados::IoCtxImpl::writesame(const object_t& oid, bufferlist& bl,
625 size_t write_len, uint64_t off)
626{
627 if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
628 return -E2BIG;
629 if ((bl.length() == 0) || (write_len % bl.length()))
630 return -EINVAL;
631 ::ObjectOperation op;
632 prepare_assert_ops(&op);
633 bufferlist mybl;
634 mybl.substr_of(bl, 0, bl.length());
635 op.writesame(off, write_len, mybl);
636 return operate(oid, &op, NULL);
637}
638
639int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
640 ceph::real_time *pmtime, int flags)
641{
642 ceph::real_time ut = (pmtime ? *pmtime :
643 ceph::real_clock::now());
644
645 /* can't write to a snapshot */
646 if (snap_seq != CEPH_NOSNAP)
647 return -EROFS;
648
649 if (!o->size())
650 return 0;
651
9f95a23c
TL
652 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::operate::mylock");
653 ceph::condition_variable cond;
7c673cae
FG
654 bool done;
655 int r;
656 version_t ver;
657
9f95a23c 658 Context *oncommit = new C_SafeCond(mylock, cond, &done, &r);
7c673cae
FG
659
660 int op = o->ops[0].op.op;
661 ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid
662 << " nspace=" << oloc.nspace << dendl;
f67539c2
TL
663 Objecter::Op *objecter_op = objecter->prepare_mutate_op(
664 oid, oloc,
665 *o, snapc, ut,
666 flags | extra_op_flags,
667 oncommit, &ver);
7c673cae
FG
668 objecter->op_submit(objecter_op);
669
9f95a23c
TL
670 {
671 std::unique_lock l{mylock};
672 cond.wait(l, [&done] { return done;});
673 }
7c673cae
FG
674 ldout(client->cct, 10) << "Objecter returned from "
675 << ceph_osd_op_name(op) << " r=" << r << dendl;
676
677 set_sync_op_version(ver);
678
679 return r;
680}
681
682int librados::IoCtxImpl::operate_read(const object_t& oid,
683 ::ObjectOperation *o,
684 bufferlist *pbl,
685 int flags)
686{
687 if (!o->size())
688 return 0;
689
9f95a23c
TL
690 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::operate_read::mylock");
691 ceph::condition_variable cond;
7c673cae
FG
692 bool done;
693 int r;
694 version_t ver;
695
9f95a23c 696 Context *onack = new C_SafeCond(mylock, cond, &done, &r);
7c673cae
FG
697
698 int op = o->ops[0].op.op;
699 ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl;
f67539c2
TL
700 Objecter::Op *objecter_op = objecter->prepare_read_op(
701 oid, oloc,
702 *o, snap_seq, pbl,
703 flags | extra_op_flags,
704 onack, &ver);
7c673cae
FG
705 objecter->op_submit(objecter_op);
706
9f95a23c
TL
707 {
708 std::unique_lock l{mylock};
709 cond.wait(l, [&done] { return done; });
710 }
7c673cae
FG
711 ldout(client->cct, 10) << "Objecter returned from "
712 << ceph_osd_op_name(op) << " r=" << r << dendl;
713
714 set_sync_op_version(ver);
715
716 return r;
717}
718
719int librados::IoCtxImpl::aio_operate_read(const object_t &oid,
720 ::ObjectOperation *o,
721 AioCompletionImpl *c,
722 int flags,
723 bufferlist *pbl,
31f18b77 724 const blkin_trace_info *trace_info)
7c673cae 725{
11fdf7f2 726 FUNCTRACE(client->cct);
7c673cae
FG
727 Context *oncomplete = new C_aio_Complete(c);
728
9f95a23c 729#if defined(WITH_EVENTTRACE)
7c673cae
FG
730 ((C_aio_Complete *) oncomplete)->oid = oid;
731#endif
732 c->is_read = true;
733 c->io = this;
734
735 ZTracer::Trace trace;
31f18b77
FG
736 if (trace_info) {
737 ZTracer::Trace parent_trace("", nullptr, trace_info);
738 trace.init("rados operate read", &objecter->trace_endpoint, &parent_trace);
739 }
7c673cae
FG
740
741 trace.event("init root span");
f67539c2
TL
742 Objecter::Op *objecter_op = objecter->prepare_read_op(
743 oid, oloc,
744 *o, snap_seq, pbl, flags | extra_op_flags,
745 oncomplete, &c->objver, nullptr, 0, &trace);
7c673cae
FG
746 objecter->op_submit(objecter_op, &c->tid);
747 trace.event("rados operate read submitted");
748
749 return 0;
750}
751
752int librados::IoCtxImpl::aio_operate(const object_t& oid,
753 ::ObjectOperation *o, AioCompletionImpl *c,
1e59de90
TL
754 const SnapContext& snap_context,
755 const ceph::real_time *pmtime, int flags,
31f18b77 756 const blkin_trace_info *trace_info)
7c673cae 757{
11fdf7f2 758 FUNCTRACE(client->cct);
7c673cae 759 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
1e59de90 760 const ceph::real_time ut = (pmtime ? *pmtime : ceph::real_clock::now());
7c673cae
FG
761 /* can't write to a snapshot */
762 if (snap_seq != CEPH_NOSNAP)
763 return -EROFS;
764
765 Context *oncomplete = new C_aio_Complete(c);
9f95a23c 766#if defined(WITH_EVENTTRACE)
7c673cae
FG
767 ((C_aio_Complete *) oncomplete)->oid = oid;
768#endif
769
770 c->io = this;
771 queue_aio_write(c);
772
773 ZTracer::Trace trace;
31f18b77
FG
774 if (trace_info) {
775 ZTracer::Trace parent_trace("", nullptr, trace_info);
776 trace.init("rados operate", &objecter->trace_endpoint, &parent_trace);
777 }
7c673cae
FG
778
779 trace.event("init root span");
780 Objecter::Op *op = objecter->prepare_mutate_op(
f67539c2 781 oid, oloc, *o, snap_context, ut, flags | extra_op_flags,
7c673cae
FG
782 oncomplete, &c->objver, osd_reqid_t(), &trace);
783 objecter->op_submit(op, &c->tid);
784 trace.event("rados operate op submitted");
785
786 return 0;
787}
788
789int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
790 bufferlist *pbl, size_t len, uint64_t off,
791 uint64_t snapid, const blkin_trace_info *info)
792{
11fdf7f2 793 FUNCTRACE(client->cct);
7c673cae
FG
794 if (len > (size_t) INT_MAX)
795 return -EDOM;
796
797 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
798 Context *oncomplete = new C_aio_Complete(c);
799
9f95a23c 800#if defined(WITH_EVENTTRACE)
7c673cae
FG
801 ((C_aio_Complete *) oncomplete)->oid = oid;
802#endif
803 c->is_read = true;
804 c->io = this;
805 c->blp = pbl;
806
807 ZTracer::Trace trace;
808 if (info)
809 trace.init("rados read", &objecter->trace_endpoint, info);
810
811 Objecter::Op *o = objecter->prepare_read_op(
812 oid, oloc,
f67539c2 813 off, len, snapid, pbl, extra_op_flags,
7c673cae
FG
814 oncomplete, &c->objver, nullptr, 0, &trace);
815 objecter->op_submit(o, &c->tid);
816 return 0;
817}
818
819int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
820 char *buf, size_t len, uint64_t off,
821 uint64_t snapid, const blkin_trace_info *info)
822{
11fdf7f2 823 FUNCTRACE(client->cct);
7c673cae
FG
824 if (len > (size_t) INT_MAX)
825 return -EDOM;
826
827 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
828 Context *oncomplete = new C_aio_Complete(c);
829
9f95a23c 830#if defined(WITH_EVENTTRACE)
7c673cae
FG
831 ((C_aio_Complete *) oncomplete)->oid = oid;
832#endif
833 c->is_read = true;
834 c->io = this;
835 c->bl.clear();
836 c->bl.push_back(buffer::create_static(len, buf));
837 c->blp = &c->bl;
838 c->out_buf = buf;
839
840 ZTracer::Trace trace;
841 if (info)
842 trace.init("rados read", &objecter->trace_endpoint, info);
843
844 Objecter::Op *o = objecter->prepare_read_op(
845 oid, oloc,
f67539c2 846 off, len, snapid, &c->bl, extra_op_flags,
7c673cae
FG
847 oncomplete, &c->objver, nullptr, 0, &trace);
848 objecter->op_submit(o, &c->tid);
849 return 0;
850}
851
852class C_ObjectOperation : public Context {
853public:
854 ::ObjectOperation m_ops;
855 explicit C_ObjectOperation(Context *c) : m_ctx(c) {}
856 void finish(int r) override {
857 m_ctx->complete(r);
858 }
859private:
860 Context *m_ctx;
861};
862
863int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
864 AioCompletionImpl *c,
865 std::map<uint64_t,uint64_t> *m,
866 bufferlist *data_bl, size_t len,
867 uint64_t off, uint64_t snapid)
868{
11fdf7f2 869 FUNCTRACE(client->cct);
7c673cae
FG
870 if (len > (size_t) INT_MAX)
871 return -EDOM;
872
873 Context *nested = new C_aio_Complete(c);
874 C_ObjectOperation *onack = new C_ObjectOperation(nested);
875
9f95a23c 876#if defined(WITH_EVENTTRACE)
7c673cae
FG
877 ((C_aio_Complete *) nested)->oid = oid;
878#endif
879 c->is_read = true;
880 c->io = this;
881
882 onack->m_ops.sparse_read(off, len, m, data_bl, NULL);
883
884 Objecter::Op *o = objecter->prepare_read_op(
885 oid, oloc,
f67539c2 886 onack->m_ops, snapid, NULL, extra_op_flags,
7c673cae
FG
887 onack, &c->objver);
888 objecter->op_submit(o, &c->tid);
889 return 0;
890}
891
892int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
893 AioCompletionImpl *c,
894 uint64_t off,
895 bufferlist& cmp_bl)
896{
897 if (cmp_bl.length() > UINT_MAX/2)
898 return -E2BIG;
899
900 Context *onack = new C_aio_Complete(c);
901
902 c->is_read = true;
903 c->io = this;
904
905 Objecter::Op *o = objecter->prepare_cmpext_op(
f67539c2 906 oid, oloc, off, cmp_bl, snap_seq, extra_op_flags,
7c673cae
FG
907 onack, &c->objver);
908 objecter->op_submit(o, &c->tid);
909
910 return 0;
911}
912
913/* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */
914int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
915 AioCompletionImpl *c,
916 const char *cmp_buf,
917 size_t cmp_len,
918 uint64_t off)
919{
920 if (cmp_len > UINT_MAX/2)
921 return -E2BIG;
922
923 bufferlist cmp_bl;
924 cmp_bl.append(cmp_buf, cmp_len);
925
926 Context *nested = new C_aio_Complete(c);
927 C_ObjectOperation *onack = new C_ObjectOperation(nested);
928
929 c->is_read = true;
930 c->io = this;
931
932 onack->m_ops.cmpext(off, cmp_len, cmp_buf, NULL);
933
934 Objecter::Op *o = objecter->prepare_read_op(
f67539c2 935 oid, oloc, onack->m_ops, snap_seq, NULL, extra_op_flags, onack, &c->objver);
7c673cae
FG
936 objecter->op_submit(o, &c->tid);
937 return 0;
938}
939
940int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
941 const bufferlist& bl, size_t len,
942 uint64_t off, const blkin_trace_info *info)
943{
11fdf7f2 944 FUNCTRACE(client->cct);
7c673cae
FG
945 auto ut = ceph::real_clock::now();
946 ldout(client->cct, 20) << "aio_write " << oid << " " << off << "~" << len << " snapc=" << snapc << " snap_seq=" << snap_seq << dendl;
947 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
948
949 if (len > UINT_MAX/2)
950 return -E2BIG;
951 /* can't write to a snapshot */
952 if (snap_seq != CEPH_NOSNAP)
953 return -EROFS;
954
955 Context *oncomplete = new C_aio_Complete(c);
956
9f95a23c 957#if defined(WITH_EVENTTRACE)
7c673cae
FG
958 ((C_aio_Complete *) oncomplete)->oid = oid;
959#endif
960 ZTracer::Trace trace;
961 if (info)
962 trace.init("rados write", &objecter->trace_endpoint, info);
963
964 c->io = this;
965 queue_aio_write(c);
966
967 Objecter::Op *o = objecter->prepare_write_op(
968 oid, oloc,
f67539c2 969 off, len, snapc, bl, ut, extra_op_flags,
7c673cae
FG
970 oncomplete, &c->objver, nullptr, 0, &trace);
971 objecter->op_submit(o, &c->tid);
972
973 return 0;
974}
975
976int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c,
977 const bufferlist& bl, size_t len)
978{
11fdf7f2 979 FUNCTRACE(client->cct);
7c673cae
FG
980 auto ut = ceph::real_clock::now();
981
982 if (len > UINT_MAX/2)
983 return -E2BIG;
984 /* can't write to a snapshot */
985 if (snap_seq != CEPH_NOSNAP)
986 return -EROFS;
987
988 Context *oncomplete = new C_aio_Complete(c);
9f95a23c 989#if defined(WITH_EVENTTRACE)
7c673cae
FG
990 ((C_aio_Complete *) oncomplete)->oid = oid;
991#endif
992
993 c->io = this;
994 queue_aio_write(c);
995
996 Objecter::Op *o = objecter->prepare_append_op(
997 oid, oloc,
f67539c2 998 len, snapc, bl, ut, extra_op_flags,
7c673cae
FG
999 oncomplete, &c->objver);
1000 objecter->op_submit(o, &c->tid);
1001
1002 return 0;
1003}
1004
1005int librados::IoCtxImpl::aio_write_full(const object_t &oid,
1006 AioCompletionImpl *c,
1007 const bufferlist& bl)
1008{
11fdf7f2 1009 FUNCTRACE(client->cct);
7c673cae
FG
1010 auto ut = ceph::real_clock::now();
1011
1012 if (bl.length() > UINT_MAX/2)
1013 return -E2BIG;
1014 /* can't write to a snapshot */
1015 if (snap_seq != CEPH_NOSNAP)
1016 return -EROFS;
1017
1018 Context *oncomplete = new C_aio_Complete(c);
9f95a23c 1019#if defined(WITH_EVENTTRACE)
7c673cae
FG
1020 ((C_aio_Complete *) oncomplete)->oid = oid;
1021#endif
1022
1023 c->io = this;
1024 queue_aio_write(c);
1025
1026 Objecter::Op *o = objecter->prepare_write_full_op(
1027 oid, oloc,
f67539c2 1028 snapc, bl, ut, extra_op_flags,
7c673cae
FG
1029 oncomplete, &c->objver);
1030 objecter->op_submit(o, &c->tid);
1031
1032 return 0;
1033}
1034
1035int librados::IoCtxImpl::aio_writesame(const object_t &oid,
1036 AioCompletionImpl *c,
1037 const bufferlist& bl,
1038 size_t write_len,
1039 uint64_t off)
1040{
11fdf7f2 1041 FUNCTRACE(client->cct);
7c673cae
FG
1042 auto ut = ceph::real_clock::now();
1043
1044 if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
1045 return -E2BIG;
1046 if ((bl.length() == 0) || (write_len % bl.length()))
1047 return -EINVAL;
1048 /* can't write to a snapshot */
1049 if (snap_seq != CEPH_NOSNAP)
1050 return -EROFS;
1051
1052 Context *oncomplete = new C_aio_Complete(c);
1053
9f95a23c 1054#if defined(WITH_EVENTTRACE)
7c673cae
FG
1055 ((C_aio_Complete *) oncomplete)->oid = oid;
1056#endif
1057 c->io = this;
1058 queue_aio_write(c);
1059
1060 Objecter::Op *o = objecter->prepare_writesame_op(
1061 oid, oloc,
1062 write_len, off,
f67539c2 1063 snapc, bl, ut, extra_op_flags,
7c673cae
FG
1064 oncomplete, &c->objver);
1065 objecter->op_submit(o, &c->tid);
1066
1067 return 0;
1068}
1069
1070int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c, int flags)
1071{
11fdf7f2 1072 FUNCTRACE(client->cct);
7c673cae
FG
1073 auto ut = ceph::real_clock::now();
1074
1075 /* can't write to a snapshot */
1076 if (snap_seq != CEPH_NOSNAP)
1077 return -EROFS;
1078
1079 Context *oncomplete = new C_aio_Complete(c);
1080
9f95a23c 1081#if defined(WITH_EVENTTRACE)
7c673cae
FG
1082 ((C_aio_Complete *) oncomplete)->oid = oid;
1083#endif
1084 c->io = this;
1085 queue_aio_write(c);
1086
1087 Objecter::Op *o = objecter->prepare_remove_op(
1088 oid, oloc,
f67539c2 1089 snapc, ut, flags | extra_op_flags,
7c673cae
FG
1090 oncomplete, &c->objver);
1091 objecter->op_submit(o, &c->tid);
1092
1093 return 0;
1094}
1095
1096
1097int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c,
1098 uint64_t *psize, time_t *pmtime)
1099{
1100 C_aio_stat_Ack *onack = new C_aio_stat_Ack(c, pmtime);
1101 c->is_read = true;
1102 c->io = this;
1103 Objecter::Op *o = objecter->prepare_stat_op(
1104 oid, oloc,
f67539c2 1105 snap_seq, psize, &onack->mtime, extra_op_flags,
7c673cae
FG
1106 onack, &c->objver);
1107 objecter->op_submit(o, &c->tid);
1108 return 0;
1109}
1110
1111int librados::IoCtxImpl::aio_stat2(const object_t& oid, AioCompletionImpl *c,
1112 uint64_t *psize, struct timespec *pts)
1113{
1114 C_aio_stat2_Ack *onack = new C_aio_stat2_Ack(c, pts);
1115 c->is_read = true;
1116 c->io = this;
1117 Objecter::Op *o = objecter->prepare_stat_op(
1118 oid, oloc,
f67539c2 1119 snap_seq, psize, &onack->mtime, extra_op_flags,
7c673cae
FG
1120 onack, &c->objver);
1121 objecter->op_submit(o, &c->tid);
1122 return 0;
1123}
1124
1125int librados::IoCtxImpl::aio_getxattr(const object_t& oid, AioCompletionImpl *c,
1126 const char *name, bufferlist& bl)
1127{
1128 ::ObjectOperation rd;
1129 prepare_assert_ops(&rd);
1130 rd.getxattr(name, &bl, NULL);
1131 int r = aio_operate_read(oid, &rd, c, 0, &bl);
1132 return r;
1133}
1134
1135int librados::IoCtxImpl::aio_rmxattr(const object_t& oid, AioCompletionImpl *c,
1136 const char *name)
1137{
1138 ::ObjectOperation op;
1139 prepare_assert_ops(&op);
1140 op.rmxattr(name);
1e59de90 1141 return aio_operate(oid, &op, c, snapc, nullptr, 0);
7c673cae
FG
1142}
1143
1144int librados::IoCtxImpl::aio_setxattr(const object_t& oid, AioCompletionImpl *c,
1145 const char *name, bufferlist& bl)
1146{
1147 ::ObjectOperation op;
1148 prepare_assert_ops(&op);
1149 op.setxattr(name, bl);
1e59de90 1150 return aio_operate(oid, &op, c, snapc, nullptr, 0);
7c673cae
FG
1151}
1152
1153namespace {
1154struct AioGetxattrsData {
1155 AioGetxattrsData(librados::AioCompletionImpl *c, map<string, bufferlist>* attrset,
1156 librados::RadosClient *_client) :
1157 user_completion(c), user_attrset(attrset), client(_client) {}
f67539c2 1158 struct librados::CB_AioCompleteAndSafe user_completion;
7c673cae
FG
1159 map<string, bufferlist> result_attrset;
1160 map<std::string, bufferlist>* user_attrset;
1161 librados::RadosClient *client;
1162};
1163}
1164
1165static void aio_getxattrs_complete(rados_completion_t c, void *arg) {
1166 AioGetxattrsData *cdata = reinterpret_cast<AioGetxattrsData*>(arg);
1167 int rc = rados_aio_get_return_value(c);
1168 cdata->user_attrset->clear();
1169 if (rc >= 0) {
1170 for (map<string,bufferlist>::iterator p = cdata->result_attrset.begin();
1171 p != cdata->result_attrset.end();
1172 ++p) {
1173 ldout(cdata->client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
1174 (*cdata->user_attrset)[p->first] = p->second;
1175 }
1176 }
f67539c2 1177 cdata->user_completion(rc);
7c673cae
FG
1178 ((librados::AioCompletionImpl*)c)->put();
1179 delete cdata;
1180}
1181
1182int librados::IoCtxImpl::aio_getxattrs(const object_t& oid, AioCompletionImpl *c,
1183 map<std::string, bufferlist>& attrset)
1184{
1185 AioGetxattrsData *cdata = new AioGetxattrsData(c, &attrset, client);
1186 ::ObjectOperation rd;
1187 prepare_assert_ops(&rd);
1188 rd.getxattrs(&cdata->result_attrset, NULL);
1189 librados::AioCompletionImpl *comp = new librados::AioCompletionImpl;
1190 comp->set_complete_callback(cdata, aio_getxattrs_complete);
1191 return aio_operate_read(oid, &rd, comp, 0, NULL);
1192}
1193
1194int librados::IoCtxImpl::aio_cancel(AioCompletionImpl *c)
1195{
1196 return objecter->op_cancel(c->tid, -ECANCELED);
1197}
1198
1199
1200int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c,
1201 std::list< std::pair<time_t, time_t> > *pls)
1202{
1203 Context *oncomplete = new C_aio_Complete(c);
1204 c->is_read = true;
1205 c->io = this;
1206
1207 ::ObjectOperation rd;
1208 rd.hit_set_ls(pls, NULL);
1209 object_locator_t oloc(poolid);
1210 Objecter::Op *o = objecter->prepare_pg_read_op(
f67539c2 1211 hash, oloc, rd, NULL, extra_op_flags, oncomplete, NULL, NULL);
7c673cae
FG
1212 objecter->op_submit(o, &c->tid);
1213 return 0;
1214}
1215
1216int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c,
1217 time_t stamp,
1218 bufferlist *pbl)
1219{
1220 Context *oncomplete = new C_aio_Complete(c);
1221 c->is_read = true;
1222 c->io = this;
1223
1224 ::ObjectOperation rd;
1225 rd.hit_set_get(ceph::real_clock::from_time_t(stamp), pbl, 0);
1226 object_locator_t oloc(poolid);
1227 Objecter::Op *o = objecter->prepare_pg_read_op(
f67539c2 1228 hash, oloc, rd, NULL, extra_op_flags, oncomplete, NULL, NULL);
7c673cae
FG
1229 objecter->op_submit(o, &c->tid);
1230 return 0;
1231}
1232
1233int librados::IoCtxImpl::remove(const object_t& oid)
1234{
1235 ::ObjectOperation op;
1236 prepare_assert_ops(&op);
1237 op.remove();
94b18763 1238 return operate(oid, &op, nullptr, librados::OPERATION_FULL_FORCE);
7c673cae
FG
1239}
1240
1241int librados::IoCtxImpl::remove(const object_t& oid, int flags)
1242{
1243 ::ObjectOperation op;
1244 prepare_assert_ops(&op);
1245 op.remove();
1246 return operate(oid, &op, NULL, flags);
1247}
1248
1249int librados::IoCtxImpl::trunc(const object_t& oid, uint64_t size)
1250{
1251 ::ObjectOperation op;
1252 prepare_assert_ops(&op);
1253 op.truncate(size);
1254 return operate(oid, &op, NULL);
1255}
1256
1257int librados::IoCtxImpl::get_inconsistent_objects(const pg_t& pg,
1258 const librados::object_id_t& start_after,
1259 uint64_t max_to_get,
1260 AioCompletionImpl *c,
1261 std::vector<inconsistent_obj_t>* objects,
1262 uint32_t* interval)
1263{
1264 Context *oncomplete = new C_aio_Complete(c);
1265 c->is_read = true;
1266 c->io = this;
1267
1268 ::ObjectOperation op;
11fdf7f2 1269 op.scrub_ls(start_after, max_to_get, objects, interval, &c->rval);
7c673cae
FG
1270 object_locator_t oloc{poolid, pg.ps()};
1271 Objecter::Op *o = objecter->prepare_pg_read_op(
f67539c2 1272 oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP | extra_op_flags, oncomplete,
7c673cae
FG
1273 nullptr, nullptr);
1274 objecter->op_submit(o, &c->tid);
1275 return 0;
1276}
1277
1278int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t& pg,
1279 const librados::object_id_t& start_after,
1280 uint64_t max_to_get,
1281 AioCompletionImpl *c,
1282 std::vector<inconsistent_snapset_t>* snapsets,
1283 uint32_t* interval)
1284{
1285 Context *oncomplete = new C_aio_Complete(c);
1286 c->is_read = true;
1287 c->io = this;
1288
1289 ::ObjectOperation op;
11fdf7f2 1290 op.scrub_ls(start_after, max_to_get, snapsets, interval, &c->rval);
7c673cae
FG
1291 object_locator_t oloc{poolid, pg.ps()};
1292 Objecter::Op *o = objecter->prepare_pg_read_op(
f67539c2 1293 oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP | extra_op_flags, oncomplete,
7c673cae
FG
1294 nullptr, nullptr);
1295 objecter->op_submit(o, &c->tid);
1296 return 0;
1297}
1298
1299int librados::IoCtxImpl::tmap_update(const object_t& oid, bufferlist& cmdbl)
1300{
1301 ::ObjectOperation wr;
1302 prepare_assert_ops(&wr);
1303 wr.tmap_update(cmdbl);
1304 return operate(oid, &wr, NULL);
1305}
1306
7c673cae
FG
1307int librados::IoCtxImpl::exec(const object_t& oid,
1308 const char *cls, const char *method,
1309 bufferlist& inbl, bufferlist& outbl)
1310{
1311 ::ObjectOperation rd;
1312 prepare_assert_ops(&rd);
1313 rd.call(cls, method, inbl);
1314 return operate_read(oid, &rd, &outbl);
1315}
1316
1317int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
1318 const char *cls, const char *method,
1319 bufferlist& inbl, bufferlist *outbl)
1320{
11fdf7f2 1321 FUNCTRACE(client->cct);
7c673cae
FG
1322 Context *oncomplete = new C_aio_Complete(c);
1323
9f95a23c 1324#if defined(WITH_EVENTTRACE)
7c673cae
FG
1325 ((C_aio_Complete *) oncomplete)->oid = oid;
1326#endif
1327 c->is_read = true;
1328 c->io = this;
1329
1330 ::ObjectOperation rd;
1331 prepare_assert_ops(&rd);
1332 rd.call(cls, method, inbl);
1333 Objecter::Op *o = objecter->prepare_read_op(
f67539c2 1334 oid, oloc, rd, snap_seq, outbl, extra_op_flags, oncomplete, &c->objver);
7c673cae
FG
1335 objecter->op_submit(o, &c->tid);
1336 return 0;
1337}
1338
1339int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
1340 const char *cls, const char *method,
1341 bufferlist& inbl, char *buf, size_t out_len)
1342{
11fdf7f2 1343 FUNCTRACE(client->cct);
7c673cae
FG
1344 Context *oncomplete = new C_aio_Complete(c);
1345
9f95a23c 1346#if defined(WITH_EVENTTRACE)
7c673cae
FG
1347 ((C_aio_Complete *) oncomplete)->oid = oid;
1348#endif
1349 c->is_read = true;
1350 c->io = this;
1351 c->bl.clear();
1352 c->bl.push_back(buffer::create_static(out_len, buf));
1353 c->blp = &c->bl;
1354 c->out_buf = buf;
1355
1356 ::ObjectOperation rd;
1357 prepare_assert_ops(&rd);
1358 rd.call(cls, method, inbl);
1359 Objecter::Op *o = objecter->prepare_read_op(
f67539c2 1360 oid, oloc, rd, snap_seq, &c->bl, extra_op_flags, oncomplete, &c->objver);
7c673cae
FG
1361 objecter->op_submit(o, &c->tid);
1362 return 0;
1363}
1364
1365int librados::IoCtxImpl::read(const object_t& oid,
1366 bufferlist& bl, size_t len, uint64_t off)
1367{
1368 if (len > (size_t) INT_MAX)
1369 return -EDOM;
1370 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
1371
1372 ::ObjectOperation rd;
1373 prepare_assert_ops(&rd);
1374 rd.read(off, len, &bl, NULL, NULL);
1375 int r = operate_read(oid, &rd, &bl);
1376 if (r < 0)
1377 return r;
1378
1379 if (bl.length() < len) {
1380 ldout(client->cct, 10) << "Returned length " << bl.length()
1381 << " less than original length "<< len << dendl;
1382 }
1383
1384 return bl.length();
1385}
1386
1387int librados::IoCtxImpl::cmpext(const object_t& oid, uint64_t off,
1388 bufferlist& cmp_bl)
1389{
1390 if (cmp_bl.length() > UINT_MAX/2)
1391 return -E2BIG;
1392
1393 ::ObjectOperation op;
1394 prepare_assert_ops(&op);
1395 op.cmpext(off, cmp_bl, NULL);
1396 return operate_read(oid, &op, NULL);
1397}
1398
1399int librados::IoCtxImpl::mapext(const object_t& oid,
1400 uint64_t off, size_t len,
1401 std::map<uint64_t,uint64_t>& m)
1402{
1403 bufferlist bl;
1404
9f95a23c
TL
1405 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::read::mylock");
1406 ceph::condition_variable cond;
7c673cae
FG
1407 bool done;
1408 int r;
9f95a23c 1409 Context *onack = new C_SafeCond(mylock, cond, &done, &r);
7c673cae
FG
1410
1411 objecter->mapext(oid, oloc,
f67539c2 1412 off, len, snap_seq, &bl, extra_op_flags,
7c673cae
FG
1413 onack);
1414
9f95a23c
TL
1415 {
1416 unique_lock l{mylock};
1417 cond.wait(l, [&done] { return done;});
1418 }
7c673cae
FG
1419 ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl;
1420
1421 if (r < 0)
1422 return r;
1423
11fdf7f2
TL
1424 auto iter = bl.cbegin();
1425 decode(m, iter);
7c673cae
FG
1426
1427 return m.size();
1428}
1429
1430int librados::IoCtxImpl::sparse_read(const object_t& oid,
1431 std::map<uint64_t,uint64_t>& m,
1432 bufferlist& data_bl, size_t len,
1433 uint64_t off)
1434{
1435 if (len > (size_t) INT_MAX)
1436 return -EDOM;
1437
1438 ::ObjectOperation rd;
1439 prepare_assert_ops(&rd);
1440 rd.sparse_read(off, len, &m, &data_bl, NULL);
1441
1442 int r = operate_read(oid, &rd, NULL);
1443 if (r < 0)
1444 return r;
1445
1446 return m.size();
1447}
1448
1449int librados::IoCtxImpl::checksum(const object_t& oid, uint8_t type,
1450 const bufferlist &init_value, size_t len,
1451 uint64_t off, size_t chunk_size,
1452 bufferlist *pbl)
1453{
1454 if (len > (size_t) INT_MAX) {
1455 return -EDOM;
1456 }
1457
1458 ::ObjectOperation rd;
1459 prepare_assert_ops(&rd);
1460 rd.checksum(type, init_value, off, len, chunk_size, pbl, nullptr, nullptr);
1461
1462 int r = operate_read(oid, &rd, nullptr);
1463 if (r < 0) {
1464 return r;
1465 }
1466
1467 return 0;
1468}
1469
1470int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime)
1471{
1472 uint64_t size;
1473 real_time mtime;
1474
1475 if (!psize)
1476 psize = &size;
1477
1478 ::ObjectOperation rd;
1479 prepare_assert_ops(&rd);
f67539c2 1480 rd.stat(psize, &mtime, nullptr);
7c673cae
FG
1481 int r = operate_read(oid, &rd, NULL);
1482
1483 if (r >= 0 && pmtime) {
1484 *pmtime = real_clock::to_time_t(mtime);
1485 }
1486
1487 return r;
1488}
1489
1490int librados::IoCtxImpl::stat2(const object_t& oid, uint64_t *psize, struct timespec *pts)
1491{
1492 uint64_t size;
1493 ceph::real_time mtime;
1494
1495 if (!psize)
1496 psize = &size;
1497
1498 ::ObjectOperation rd;
1499 prepare_assert_ops(&rd);
f67539c2 1500 rd.stat(psize, &mtime, nullptr);
7c673cae
FG
1501 int r = operate_read(oid, &rd, NULL);
1502 if (r < 0) {
1503 return r;
1504 }
1505
1506 if (pts) {
1507 *pts = ceph::real_clock::to_timespec(mtime);
1508 }
1509
1510 return 0;
1511}
1512
1513int librados::IoCtxImpl::getxattr(const object_t& oid,
1514 const char *name, bufferlist& bl)
1515{
1516 ::ObjectOperation rd;
1517 prepare_assert_ops(&rd);
1518 rd.getxattr(name, &bl, NULL);
1519 int r = operate_read(oid, &rd, &bl);
1520 if (r < 0)
1521 return r;
1522
1523 return bl.length();
1524}
1525
1526int librados::IoCtxImpl::rmxattr(const object_t& oid, const char *name)
1527{
1528 ::ObjectOperation op;
1529 prepare_assert_ops(&op);
1530 op.rmxattr(name);
1531 return operate(oid, &op, NULL);
1532}
1533
1534int librados::IoCtxImpl::setxattr(const object_t& oid,
1535 const char *name, bufferlist& bl)
1536{
1537 ::ObjectOperation op;
1538 prepare_assert_ops(&op);
1539 op.setxattr(name, bl);
1540 return operate(oid, &op, NULL);
1541}
1542
1543int librados::IoCtxImpl::getxattrs(const object_t& oid,
1544 map<std::string, bufferlist>& attrset)
1545{
1546 map<string, bufferlist> aset;
1547
1548 ::ObjectOperation rd;
1549 prepare_assert_ops(&rd);
1550 rd.getxattrs(&aset, NULL);
1551 int r = operate_read(oid, &rd, NULL);
1552
1553 attrset.clear();
1554 if (r >= 0) {
1555 for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); ++p) {
1556 ldout(client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
1557 attrset[p->first.c_str()] = p->second;
1558 }
1559 }
1560
1561 return r;
1562}
1563
1564void librados::IoCtxImpl::set_sync_op_version(version_t ver)
1565{
1566 ANNOTATE_BENIGN_RACE_SIZED(&last_objver, sizeof(last_objver),
1567 "IoCtxImpl last_objver");
1568 last_objver = ver;
1569}
1570
f67539c2
TL
1571namespace librados {
1572void intrusive_ptr_add_ref(IoCtxImpl *p) { p->get(); }
1573void intrusive_ptr_release(IoCtxImpl *p) { p->put(); }
1574}
1575
1576struct WatchInfo {
1577 boost::intrusive_ptr<librados::IoCtxImpl> ioctx;
7c673cae
FG
1578 object_t oid;
1579 librados::WatchCtx *ctx;
1580 librados::WatchCtx2 *ctx2;
7c673cae
FG
1581
1582 WatchInfo(librados::IoCtxImpl *io, object_t o,
f67539c2
TL
1583 librados::WatchCtx *c, librados::WatchCtx2 *c2)
1584 : ioctx(io), oid(o), ctx(c), ctx2(c2) {}
7c673cae
FG
1585
1586 void handle_notify(uint64_t notify_id,
1587 uint64_t cookie,
1588 uint64_t notifier_id,
f67539c2 1589 bufferlist& bl) {
7c673cae
FG
1590 ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
1591 << " cookie " << cookie
1592 << " notifier_id " << notifier_id
1593 << " len " << bl.length()
1594 << dendl;
1595
1596 if (ctx2)
1597 ctx2->handle_notify(notify_id, cookie, notifier_id, bl);
1598 if (ctx) {
1599 ctx->notify(0, 0, bl);
1600
1601 // send ACK back to OSD if using legacy protocol
1602 bufferlist empty;
1603 ioctx->notify_ack(oid, notify_id, cookie, empty);
1604 }
1605 }
f67539c2 1606 void handle_error(uint64_t cookie, int err) {
7c673cae
FG
1607 ldout(ioctx->client->cct, 10) << __func__ << " cookie " << cookie
1608 << " err " << err
1609 << dendl;
1610 if (ctx2)
1611 ctx2->handle_error(cookie, err);
1612 }
f67539c2
TL
1613
1614 void operator()(bs::error_code ec,
1615 uint64_t notify_id,
1616 uint64_t cookie,
1617 uint64_t notifier_id,
1618 bufferlist&& bl) {
1619 if (ec) {
1620 handle_error(cookie, ceph::from_error_code(ec));
1621 } else {
1622 handle_notify(notify_id, cookie, notifier_id, bl);
1623 }
1624 }
1625};
1626
1627// internal WatchInfo that owns the context memory
1628struct InternalWatchInfo : public WatchInfo {
1629 std::unique_ptr<librados::WatchCtx> ctx;
1630 std::unique_ptr<librados::WatchCtx2> ctx2;
1631
1632 InternalWatchInfo(librados::IoCtxImpl *io, object_t o,
1633 librados::WatchCtx *c, librados::WatchCtx2 *c2)
1634 : WatchInfo(io, o, c, c2), ctx(c), ctx2(c2) {}
7c673cae
FG
1635};
1636
1637int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
1638 librados::WatchCtx *ctx,
1639 librados::WatchCtx2 *ctx2,
1640 bool internal)
1641{
1642 return watch(oid, handle, ctx, ctx2, 0, internal);
1643}
1644
1645int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
1646 librados::WatchCtx *ctx,
1647 librados::WatchCtx2 *ctx2,
1648 uint32_t timeout,
1649 bool internal)
1650{
1651 ::ObjectOperation wr;
1652 version_t objver;
1653 C_SaferCond onfinish;
1654
20effc67
TL
1655 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc,
1656 extra_op_flags);
7c673cae 1657 *handle = linger_op->get_cookie();
f67539c2
TL
1658 if (internal) {
1659 linger_op->handle = InternalWatchInfo(this, oid, ctx, ctx2);
1660 } else {
1661 linger_op->handle = WatchInfo(this, oid, ctx, ctx2);
1662 }
7c673cae
FG
1663 prepare_assert_ops(&wr);
1664 wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
1665 bufferlist bl;
1666 objecter->linger_watch(linger_op, wr,
1667 snapc, ceph::real_clock::now(), bl,
1668 &onfinish,
1669 &objver);
1670
1671 int r = onfinish.wait();
1672
1673 set_sync_op_version(objver);
1674
1675 if (r < 0) {
1676 objecter->linger_cancel(linger_op);
1677 *handle = 0;
1678 }
1679
1680 return r;
1681}
1682
1683int librados::IoCtxImpl::aio_watch(const object_t& oid,
1684 AioCompletionImpl *c,
1685 uint64_t *handle,
1686 librados::WatchCtx *ctx,
1687 librados::WatchCtx2 *ctx2,
1688 bool internal) {
1689 return aio_watch(oid, c, handle, ctx, ctx2, 0, internal);
1690}
1691
1692int librados::IoCtxImpl::aio_watch(const object_t& oid,
1693 AioCompletionImpl *c,
1694 uint64_t *handle,
1695 librados::WatchCtx *ctx,
1696 librados::WatchCtx2 *ctx2,
1697 uint32_t timeout,
1698 bool internal)
1699{
20effc67
TL
1700 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc,
1701 extra_op_flags);
7c673cae
FG
1702 c->io = this;
1703 Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false);
1704
1705 ::ObjectOperation wr;
1706 *handle = linger_op->get_cookie();
f67539c2
TL
1707 if (internal) {
1708 linger_op->handle = InternalWatchInfo(this, oid, ctx, ctx2);
1709 } else {
1710 linger_op->handle = WatchInfo(this, oid, ctx, ctx2);
1711 }
7c673cae
FG
1712
1713 prepare_assert_ops(&wr);
1714 wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
1715 bufferlist bl;
1716 objecter->linger_watch(linger_op, wr,
1717 snapc, ceph::real_clock::now(), bl,
1718 oncomplete, &c->objver);
1719
1720 return 0;
1721}
1722
1723
1724int librados::IoCtxImpl::notify_ack(
1725 const object_t& oid,
1726 uint64_t notify_id,
1727 uint64_t cookie,
1728 bufferlist& bl)
1729{
1730 ::ObjectOperation rd;
1731 prepare_assert_ops(&rd);
1732 rd.notify_ack(notify_id, cookie, bl);
f67539c2 1733 objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, extra_op_flags, 0, 0);
7c673cae
FG
1734 return 0;
1735}
1736
1737int librados::IoCtxImpl::watch_check(uint64_t cookie)
1738{
f67539c2
TL
1739 auto linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1740 auto r = objecter->linger_check(linger_op);
1741 if (r)
1742 return 1 + std::chrono::duration_cast<
1743 std::chrono::milliseconds>(*r).count();
1744 else
1745 return ceph::from_error_code(r.error());
7c673cae
FG
1746}
1747
1748int librados::IoCtxImpl::unwatch(uint64_t cookie)
1749{
1750 Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1751 C_SaferCond onfinish;
1752 version_t ver = 0;
1753
1754 ::ObjectOperation wr;
1755 prepare_assert_ops(&wr);
1756 wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
1757 objecter->mutate(linger_op->target.base_oid, oloc, wr,
f67539c2 1758 snapc, ceph::real_clock::now(), extra_op_flags,
7c673cae
FG
1759 &onfinish, &ver);
1760 objecter->linger_cancel(linger_op);
1761
1762 int r = onfinish.wait();
1763 set_sync_op_version(ver);
1764 return r;
1765}
1766
1767int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c)
1768{
1769 c->io = this;
1770 Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1771 Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);
1772
1773 ::ObjectOperation wr;
1774 prepare_assert_ops(&wr);
1775 wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
1776 objecter->mutate(linger_op->target.base_oid, oloc, wr,
f67539c2 1777 snapc, ceph::real_clock::now(), extra_op_flags,
7c673cae
FG
1778 oncomplete, &c->objver);
1779 return 0;
1780}
1781
1782int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
1783 uint64_t timeout_ms,
1784 bufferlist *preply_bl,
1785 char **preply_buf, size_t *preply_buf_len)
1786{
20effc67
TL
1787 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc,
1788 extra_op_flags);
7c673cae
FG
1789
1790 C_SaferCond notify_finish_cond;
f67539c2
TL
1791 linger_op->on_notify_finish =
1792 Objecter::LingerOp::OpComp::create(
1793 objecter->service.get_executor(),
1794 CB_notify_Finish(client->cct, &notify_finish_cond,
1795 objecter, linger_op, preply_bl,
1796 preply_buf, preply_buf_len));
7c673cae
FG
1797 uint32_t timeout = notify_timeout;
1798 if (timeout_ms)
1799 timeout = timeout_ms / 1000;
1800
1801 // Construct RADOS op
1802 ::ObjectOperation rd;
1803 prepare_assert_ops(&rd);
1804 bufferlist inbl;
1805 rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
1806
1807 // Issue RADOS op
1808 C_SaferCond onack;
1809 version_t objver;
1810 objecter->linger_notify(linger_op,
1811 rd, snap_seq, inbl, NULL,
1812 &onack, &objver);
1813
1814 ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
1815 int r = onack.wait();
1816 ldout(client->cct, 10) << __func__ << " linger op " << linger_op
1817 << " acked (" << r << ")" << dendl;
1818
1819 if (r == 0) {
1820 ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
1821 << linger_op << dendl;
1822 r = notify_finish_cond.wait();
1823
1824 } else {
1825 ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
1826 << r << dendl;
28e407b8 1827 notify_finish_cond.wait();
7c673cae
FG
1828 }
1829
1830 objecter->linger_cancel(linger_op);
1831
1832 set_sync_op_version(objver);
1833 return r;
1834}
1835
1836int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
1837 bufferlist& bl, uint64_t timeout_ms,
1838 bufferlist *preply_bl, char **preply_buf,
1839 size_t *preply_buf_len)
1840{
20effc67
TL
1841 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc,
1842 extra_op_flags);
7c673cae
FG
1843
1844 c->io = this;
1845
1846 C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op);
f67539c2
TL
1847 linger_op->on_notify_finish =
1848 Objecter::LingerOp::OpComp::create(
1849 objecter->service.get_executor(),
1850 CB_notify_Finish(client->cct, oncomplete,
1851 objecter, linger_op,
1852 preply_bl, preply_buf,
1853 preply_buf_len));
1854 Context *onack = new C_aio_notify_Ack(client->cct, oncomplete);
7c673cae
FG
1855
1856 uint32_t timeout = notify_timeout;
1857 if (timeout_ms)
1858 timeout = timeout_ms / 1000;
1859
1860 // Construct RADOS op
1861 ::ObjectOperation rd;
1862 prepare_assert_ops(&rd);
1863 bufferlist inbl;
1864 rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
1865
1866 // Issue RADOS op
1867 objecter->linger_notify(linger_op,
1868 rd, snap_seq, inbl, NULL,
1869 onack, &c->objver);
1870 return 0;
1871}
1872
1873int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
1874 uint64_t expected_object_size,
1875 uint64_t expected_write_size,
1876 uint32_t flags)
1877{
1878 ::ObjectOperation wr;
1879 prepare_assert_ops(&wr);
1880 wr.set_alloc_hint(expected_object_size, expected_write_size, flags);
1881 return operate(oid, &wr, NULL);
1882}
1883
1884version_t librados::IoCtxImpl::last_version()
1885{
1886 return last_objver;
1887}
1888
1889void librados::IoCtxImpl::set_assert_version(uint64_t ver)
1890{
1891 assert_ver = ver;
1892}
1893
1894void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout)
1895{
1896 notify_timeout = timeout;
1897}
1898
1899int librados::IoCtxImpl::cache_pin(const object_t& oid)
1900{
1901 ::ObjectOperation wr;
1902 prepare_assert_ops(&wr);
1903 wr.cache_pin();
1904 return operate(oid, &wr, NULL);
1905}
1906
1907int librados::IoCtxImpl::cache_unpin(const object_t& oid)
1908{
1909 ::ObjectOperation wr;
1910 prepare_assert_ops(&wr);
1911 wr.cache_unpin();
1912 return operate(oid, &wr, NULL);
1913}
1914
1915
1916///////////////////////////// C_aio_stat_Ack ////////////////////////////
1917
1918librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl *_c,
1919 time_t *pm)
1920 : c(_c), pmtime(pm)
1921{
11fdf7f2 1922 ceph_assert(!c->io);
7c673cae
FG
1923 c->get();
1924}
1925
1926void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r)
1927{
9f95a23c 1928 c->lock.lock();
7c673cae
FG
1929 c->rval = r;
1930 c->complete = true;
9f95a23c 1931 c->cond.notify_all();
7c673cae
FG
1932
1933 if (r >= 0 && pmtime) {
1934 *pmtime = real_clock::to_time_t(mtime);
1935 }
1936
1937 if (c->callback_complete) {
f67539c2 1938 boost::asio::defer(c->io->client->finish_strand, CB_AioComplete(c));
7c673cae
FG
1939 }
1940
1941 c->put_unlock();
1942}
1943
1944///////////////////////////// C_aio_stat2_Ack ////////////////////////////
1945
1946librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl *_c,
1947 struct timespec *pt)
1948 : c(_c), pts(pt)
1949{
11fdf7f2 1950 ceph_assert(!c->io);
7c673cae
FG
1951 c->get();
1952}
1953
1954void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r)
1955{
9f95a23c 1956 c->lock.lock();
7c673cae
FG
1957 c->rval = r;
1958 c->complete = true;
9f95a23c 1959 c->cond.notify_all();
7c673cae
FG
1960
1961 if (r >= 0 && pts) {
1962 *pts = real_clock::to_timespec(mtime);
1963 }
1964
1965 if (c->callback_complete) {
f67539c2 1966 boost::asio::defer(c->io->client->finish_strand, CB_AioComplete(c));
7c673cae
FG
1967 }
1968
1969 c->put_unlock();
1970}
1971
1972//////////////////////////// C_aio_Complete ////////////////////////////////
1973
1974librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl *_c)
1975 : c(_c)
1976{
1977 c->get();
1978}
1979
1980void librados::IoCtxImpl::C_aio_Complete::finish(int r)
1981{
9f95a23c 1982 c->lock.lock();
11fdf7f2
TL
1983 // Leave an existing rval unless r != 0
1984 if (r)
1985 c->rval = r; // This clears the error set in C_ObjectOperation_scrub_ls::finish()
7c673cae 1986 c->complete = true;
9f95a23c 1987 c->cond.notify_all();
7c673cae
FG
1988
1989 if (r == 0 && c->blp && c->blp->length() > 0) {
1adf2230
AA
1990 if (c->out_buf && !c->blp->is_contiguous()) {
1991 c->rval = -ERANGE;
1992 } else {
11fdf7f2 1993 if (c->out_buf && !c->blp->is_provided_buffer(c->out_buf))
9f95a23c 1994 c->blp->begin().copy(c->blp->length(), c->out_buf);
11fdf7f2 1995
1adf2230
AA
1996 c->rval = c->blp->length();
1997 }
7c673cae
FG
1998 }
1999
2000 if (c->callback_complete ||
2001 c->callback_safe) {
f67539c2 2002 boost::asio::defer(c->io->client->finish_strand, CB_AioComplete(c));
7c673cae
FG
2003 }
2004
2005 if (c->aio_write_seq) {
2006 c->io->complete_aio_write(c);
2007 }
2008
9f95a23c 2009#if defined(WITH_EVENTTRACE)
7c673cae
FG
2010 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_OP_COMPLETE");
2011#endif
2012 c->put_unlock();
2013}
2014
2015void librados::IoCtxImpl::object_list_slice(
2016 const hobject_t start,
2017 const hobject_t finish,
2018 const size_t n,
2019 const size_t m,
2020 hobject_t *split_start,
2021 hobject_t *split_finish)
2022{
2023 if (start.is_max()) {
2024 *split_start = hobject_t::get_max();
2025 *split_finish = hobject_t::get_max();
2026 return;
2027 }
2028
2029 uint64_t start_hash = hobject_t::_reverse_bits(start.get_hash());
2030 uint64_t finish_hash =
2031 finish.is_max() ? 0x100000000 :
2032 hobject_t::_reverse_bits(finish.get_hash());
2033
2034 uint64_t diff = finish_hash - start_hash;
2035 uint64_t rev_start = start_hash + (diff * n / m);
2036 uint64_t rev_finish = start_hash + (diff * (n + 1) / m);
2037 if (n == 0) {
2038 *split_start = start;
2039 } else {
2040 *split_start = hobject_t(
2041 object_t(), string(), CEPH_NOSNAP,
2042 hobject_t::_reverse_bits(rev_start), poolid, string());
2043 }
2044
2045 if (n == m - 1)
2046 *split_finish = finish;
2047 else if (rev_finish >= 0x100000000)
2048 *split_finish = hobject_t::get_max();
2049 else
2050 *split_finish = hobject_t(
2051 object_t(), string(), CEPH_NOSNAP,
2052 hobject_t::_reverse_bits(rev_finish), poolid, string());
2053}
2054
c07f9fc5
FG
2055int librados::IoCtxImpl::application_enable(const std::string& app_name,
2056 bool force)
2057{
2058 auto c = new PoolAsyncCompletionImpl();
2059 application_enable_async(app_name, force, c);
2060
2061 int r = c->wait();
11fdf7f2 2062 ceph_assert(r == 0);
c07f9fc5
FG
2063
2064 r = c->get_return_value();
2065 c->release();
f67539c2 2066 c->put();
c07f9fc5
FG
2067 if (r < 0) {
2068 return r;
2069 }
2070
2071 return client->wait_for_latest_osdmap();
2072}
2073
2074void librados::IoCtxImpl::application_enable_async(const std::string& app_name,
2075 bool force,
2076 PoolAsyncCompletionImpl *c)
2077{
2078 // pre-Luminous clusters will return -EINVAL and application won't be
2079 // preserved until Luminous is configured as minimim version.
2080 if (!client->get_required_monitor_features().contains_all(
2081 ceph::features::mon::FEATURE_LUMINOUS)) {
f67539c2
TL
2082 boost::asio::defer(client->finish_strand,
2083 [cb = CB_PoolAsync_Safe(c)]() mutable {
2084 cb(-EOPNOTSUPP);
2085 });
c07f9fc5
FG
2086 return;
2087 }
2088
2089 std::stringstream cmd;
2090 cmd << "{"
2091 << "\"prefix\": \"osd pool application enable\","
2092 << "\"pool\": \"" << get_cached_pool_name() << "\","
2093 << "\"app\": \"" << app_name << "\"";
2094 if (force) {
11fdf7f2 2095 cmd << ",\"yes_i_really_mean_it\": true";
c07f9fc5
FG
2096 }
2097 cmd << "}";
2098
2099 std::vector<std::string> cmds;
2100 cmds.push_back(cmd.str());
2101 bufferlist inbl;
2102 client->mon_command_async(cmds, inbl, nullptr, nullptr,
f67539c2 2103 make_lambda_context(CB_PoolAsync_Safe(c)));
c07f9fc5
FG
2104}
2105
2106int librados::IoCtxImpl::application_list(std::set<std::string> *app_names)
2107{
2108 int r = 0;
2109 app_names->clear();
2110 objecter->with_osdmap([&](const OSDMap& o) {
2111 auto pg_pool = o.get_pg_pool(poolid);
2112 if (pg_pool == nullptr) {
2113 r = -ENOENT;
2114 return;
2115 }
2116
2117 for (auto &pair : pg_pool->application_metadata) {
2118 app_names->insert(pair.first);
2119 }
2120 });
2121 return r;
2122}
2123
2124int librados::IoCtxImpl::application_metadata_get(const std::string& app_name,
2125 const std::string &key,
2126 std::string* value)
2127{
2128 int r = 0;
2129 objecter->with_osdmap([&](const OSDMap& o) {
2130 auto pg_pool = o.get_pg_pool(poolid);
2131 if (pg_pool == nullptr) {
2132 r = -ENOENT;
2133 return;
2134 }
2135
2136 auto app_it = pg_pool->application_metadata.find(app_name);
2137 if (app_it == pg_pool->application_metadata.end()) {
2138 r = -ENOENT;
2139 return;
2140 }
2141
2142 auto it = app_it->second.find(key);
2143 if (it == app_it->second.end()) {
2144 r = -ENOENT;
2145 return;
2146 }
2147
2148 *value = it->second;
2149 });
2150 return r;
2151}
2152
2153int librados::IoCtxImpl::application_metadata_set(const std::string& app_name,
2154 const std::string &key,
2155 const std::string& value)
2156{
2157 std::stringstream cmd;
2158 cmd << "{"
2159 << "\"prefix\":\"osd pool application set\","
2160 << "\"pool\":\"" << get_cached_pool_name() << "\","
2161 << "\"app\":\"" << app_name << "\","
2162 << "\"key\":\"" << key << "\","
2163 << "\"value\":\"" << value << "\""
2164 << "}";
2165
2166 std::vector<std::string> cmds;
2167 cmds.push_back(cmd.str());
2168 bufferlist inbl;
2169 int r = client->mon_command(cmds, inbl, nullptr, nullptr);
2170 if (r < 0) {
2171 return r;
2172 }
2173
2174 // ensure we have the latest osd map epoch before proceeding
2175 return client->wait_for_latest_osdmap();
2176}
2177
2178int librados::IoCtxImpl::application_metadata_remove(const std::string& app_name,
2179 const std::string &key)
2180{
2181 std::stringstream cmd;
2182 cmd << "{"
2183 << "\"prefix\":\"osd pool application rm\","
2184 << "\"pool\":\"" << get_cached_pool_name() << "\","
2185 << "\"app\":\"" << app_name << "\","
2186 << "\"key\":\"" << key << "\""
2187 << "}";
2188
2189 std::vector<std::string> cmds;
2190 cmds.push_back(cmd.str());
2191 bufferlist inbl;
2192 int r = client->mon_command(cmds, inbl, nullptr, nullptr);
2193 if (r < 0) {
2194 return r;
2195 }
2196
2197 // ensure we have the latest osd map epoch before proceeding
2198 return client->wait_for_latest_osdmap();
2199}
2200
2201int librados::IoCtxImpl::application_metadata_list(const std::string& app_name,
2202 std::map<std::string, std::string> *values)
2203{
2204 int r = 0;
2205 values->clear();
2206 objecter->with_osdmap([&](const OSDMap& o) {
2207 auto pg_pool = o.get_pg_pool(poolid);
2208 if (pg_pool == nullptr) {
2209 r = -ENOENT;
2210 return;
2211 }
2212
2213 auto it = pg_pool->application_metadata.find(app_name);
2214 if (it == pg_pool->application_metadata.end()) {
2215 r = -ENOENT;
2216 return;
2217 }
2218
2219 *values = it->second;
2220 });
2221 return r;
2222}
2223