]> git.proxmox.com Git - ceph.git/blame - ceph/src/librados/IoCtxImpl.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / librados / IoCtxImpl.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <limits.h>
16
17#include "IoCtxImpl.h"
18
11fdf7f2 19#include "librados/librados_c.h"
7c673cae
FG
20#include "librados/AioCompletionImpl.h"
21#include "librados/PoolAsyncCompletionImpl.h"
22#include "librados/RadosClient.h"
11fdf7f2 23#include "include/ceph_assert.h"
7c673cae
FG
24#include "common/valgrind.h"
25#include "common/EventTrace.h"
26
27#define dout_subsys ceph_subsys_rados
28#undef dout_prefix
29#define dout_prefix *_dout << "librados: "
30
f67539c2
TL
31namespace bs = boost::system;
32namespace ca = ceph::async;
33namespace cb = ceph::buffer;
34
7c673cae
FG
35namespace librados {
36namespace {
37
f67539c2 38struct CB_notify_Finish {
7c673cae
FG
39 CephContext *cct;
40 Context *ctx;
41 Objecter *objecter;
42 Objecter::LingerOp *linger_op;
7c673cae
FG
43 bufferlist *preply_bl;
44 char **preply_buf;
45 size_t *preply_buf_len;
46
f67539c2
TL
47 CB_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter,
48 Objecter::LingerOp *_linger_op, bufferlist *_preply_bl,
49 char **_preply_buf, size_t *_preply_buf_len)
7c673cae
FG
50 : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op),
51 preply_bl(_preply_bl), preply_buf(_preply_buf),
f67539c2 52 preply_buf_len(_preply_buf_len) {}
7c673cae 53
f67539c2
TL
54
55 // move-only
56 CB_notify_Finish(const CB_notify_Finish&) = delete;
57 CB_notify_Finish& operator =(const CB_notify_Finish&) = delete;
58 CB_notify_Finish(CB_notify_Finish&&) = default;
59 CB_notify_Finish& operator =(CB_notify_Finish&&) = default;
60
61 void operator()(bs::error_code ec, bufferlist&& reply_bl) {
7c673cae 62 ldout(cct, 10) << __func__ << " completed notify (linger op "
f67539c2 63 << linger_op << "), ec = " << ec << dendl;
7c673cae
FG
64
65 // pass result back to user
66 // NOTE: we do this regardless of what error code we return
67 if (preply_buf) {
68 if (reply_bl.length()) {
69 *preply_buf = (char*)malloc(reply_bl.length());
70 memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length());
71 } else {
72 *preply_buf = NULL;
73 }
74 }
75 if (preply_buf_len)
76 *preply_buf_len = reply_bl.length();
77 if (preply_bl)
f67539c2 78 *preply_bl = std::move(reply_bl);
7c673cae 79
f67539c2 80 ctx->complete(ceph::from_error_code(ec));
7c673cae
FG
81 }
82};
83
f67539c2 84struct CB_aio_linger_cancel {
7c673cae
FG
85 Objecter *objecter;
86 Objecter::LingerOp *linger_op;
87
f67539c2 88 CB_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op)
7c673cae
FG
89 : objecter(_objecter), linger_op(_linger_op)
90 {
91 }
92
f67539c2 93 void operator()() {
7c673cae
FG
94 objecter->linger_cancel(linger_op);
95 }
96};
97
98struct C_aio_linger_Complete : public Context {
99 AioCompletionImpl *c;
100 Objecter::LingerOp *linger_op;
101 bool cancel;
102
103 C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel)
104 : c(_c), linger_op(_linger_op), cancel(_cancel)
105 {
106 c->get();
107 }
108
109 void finish(int r) override {
110 if (cancel || r < 0)
f67539c2
TL
111 boost::asio::defer(c->io->client->finish_strand,
112 CB_aio_linger_cancel(c->io->objecter,
113 linger_op));
7c673cae 114
9f95a23c 115 c->lock.lock();
7c673cae
FG
116 c->rval = r;
117 c->complete = true;
9f95a23c 118 c->cond.notify_all();
7c673cae
FG
119
120 if (c->callback_complete ||
121 c->callback_safe) {
f67539c2 122 boost::asio::defer(c->io->client->finish_strand, CB_AioComplete(c));
7c673cae
FG
123 }
124 c->put_unlock();
125 }
126};
127
128struct C_aio_notify_Complete : public C_aio_linger_Complete {
9f95a23c 129 ceph::mutex lock = ceph::make_mutex("C_aio_notify_Complete::lock");
7c673cae
FG
130 bool acked = false;
131 bool finished = false;
132 int ret_val = 0;
133
134 C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
9f95a23c 135 : C_aio_linger_Complete(_c, _linger_op, false) {
7c673cae
FG
136 }
137
138 void handle_ack(int r) {
139 // invoked by C_aio_notify_Ack
9f95a23c 140 lock.lock();
7c673cae
FG
141 acked = true;
142 complete_unlock(r);
143 }
144
145 void complete(int r) override {
28e407b8 146 // invoked by C_notify_Finish
9f95a23c 147 lock.lock();
7c673cae
FG
148 finished = true;
149 complete_unlock(r);
150 }
151
152 void complete_unlock(int r) {
153 if (ret_val == 0 && r < 0) {
154 ret_val = r;
155 }
156
157 if (acked && finished) {
9f95a23c 158 lock.unlock();
7c673cae
FG
159 cancel = true;
160 C_aio_linger_Complete::complete(ret_val);
161 } else {
9f95a23c 162 lock.unlock();
7c673cae
FG
163 }
164 }
165};
166
167struct C_aio_notify_Ack : public Context {
168 CephContext *cct;
7c673cae
FG
169 C_aio_notify_Complete *oncomplete;
170
f67539c2 171 C_aio_notify_Ack(CephContext *_cct,
7c673cae 172 C_aio_notify_Complete *_oncomplete)
f67539c2 173 : cct(_cct), oncomplete(_oncomplete)
7c673cae
FG
174 {
175 }
176
177 void finish(int r) override
178 {
179 ldout(cct, 10) << __func__ << " linger op " << oncomplete->linger_op << " "
180 << "acked (" << r << ")" << dendl;
181 oncomplete->handle_ack(r);
7c673cae
FG
182 }
183};
184
185struct C_aio_selfmanaged_snap_op_Complete : public Context {
186 librados::RadosClient *client;
187 librados::AioCompletionImpl *c;
188
189 C_aio_selfmanaged_snap_op_Complete(librados::RadosClient *client,
190 librados::AioCompletionImpl *c)
191 : client(client), c(c) {
192 c->get();
193 }
194
195 void finish(int r) override {
9f95a23c 196 c->lock.lock();
7c673cae
FG
197 c->rval = r;
198 c->complete = true;
9f95a23c 199 c->cond.notify_all();
7c673cae
FG
200
201 if (c->callback_complete || c->callback_safe) {
f67539c2 202 boost::asio::defer(client->finish_strand, librados::CB_AioComplete(c));
7c673cae
FG
203 }
204 c->put_unlock();
205 }
206};
207
208struct C_aio_selfmanaged_snap_create_Complete : public C_aio_selfmanaged_snap_op_Complete {
209 snapid_t snapid;
210 uint64_t *dest_snapid;
211
212 C_aio_selfmanaged_snap_create_Complete(librados::RadosClient *client,
213 librados::AioCompletionImpl *c,
214 uint64_t *dest_snapid)
215 : C_aio_selfmanaged_snap_op_Complete(client, c),
216 dest_snapid(dest_snapid) {
217 }
218
219 void finish(int r) override {
220 if (r >= 0) {
221 *dest_snapid = snapid;
222 }
223 C_aio_selfmanaged_snap_op_Complete::finish(r);
224 }
225};
226
227} // anonymous namespace
228} // namespace librados
229
9f95a23c 230librados::IoCtxImpl::IoCtxImpl() = default;
7c673cae
FG
231
232librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter,
233 int64_t poolid, snapid_t s)
9f95a23c 234 : client(c), poolid(poolid), snap_seq(s),
7c673cae 235 notify_timeout(c->cct->_conf->client_notify_timeout),
9f95a23c 236 oloc(poolid),
7c673cae
FG
237 aio_write_seq(0), objecter(objecter)
238{
239}
240
241void librados::IoCtxImpl::set_snap_read(snapid_t s)
242{
243 if (!s)
244 s = CEPH_NOSNAP;
245 ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl;
246 snap_seq = s;
247}
248
249int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector<snapid_t>& snaps)
250{
251 ::SnapContext n;
252 ldout(client->cct, 10) << "set snap write context: seq = " << seq
253 << " and snaps = " << snaps << dendl;
254 n.seq = seq;
255 n.snaps = snaps;
256 if (!n.is_valid())
257 return -EINVAL;
258 snapc = n;
259 return 0;
260}
261
262int librados::IoCtxImpl::get_object_hash_position(
263 const std::string& oid, uint32_t *hash_position)
264{
265 int64_t r = objecter->get_object_hash_position(poolid, oid, oloc.nspace);
266 if (r < 0)
267 return r;
268 *hash_position = (uint32_t)r;
269 return 0;
270}
271
272int librados::IoCtxImpl::get_object_pg_hash_position(
273 const std::string& oid, uint32_t *pg_hash_position)
274{
275 int64_t r = objecter->get_object_pg_hash_position(poolid, oid, oloc.nspace);
276 if (r < 0)
277 return r;
278 *pg_hash_position = (uint32_t)r;
279 return 0;
280}
281
282void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
283{
284 get();
9f95a23c 285 std::scoped_lock l{aio_write_list_lock};
11fdf7f2 286 ceph_assert(c->io == this);
7c673cae
FG
287 c->aio_write_seq = ++aio_write_seq;
288 ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
289 << " write_seq " << aio_write_seq << dendl;
290 aio_write_list.push_back(&c->aio_write_list_item);
7c673cae
FG
291}
292
293void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
294{
295 ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
9f95a23c 296 aio_write_list_lock.lock();
11fdf7f2 297 ceph_assert(c->io == this);
7c673cae
FG
298 c->aio_write_list_item.remove_myself();
299
300 map<ceph_tid_t, std::list<AioCompletionImpl*> >::iterator waiters = aio_write_waiters.begin();
301 while (waiters != aio_write_waiters.end()) {
302 if (!aio_write_list.empty() &&
303 aio_write_list.front()->aio_write_seq <= waiters->first) {
304 ldout(client->cct, 20) << " next outstanding write is " << aio_write_list.front()->aio_write_seq
305 << " <= waiter " << waiters->first
306 << ", stopping" << dendl;
307 break;
308 }
309 ldout(client->cct, 20) << " waking waiters on seq " << waiters->first << dendl;
310 for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
311 it != waiters->second.end(); ++it) {
f67539c2 312 boost::asio::defer(client->finish_strand, CB_AioCompleteAndSafe(*it));
7c673cae
FG
313 (*it)->put();
314 }
315 aio_write_waiters.erase(waiters++);
316 }
317
9f95a23c
TL
318 aio_write_cond.notify_all();
319 aio_write_list_lock.unlock();
7c673cae
FG
320 put();
321}
322
323void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
324{
325 ldout(client->cct, 20) << "flush_aio_writes_async " << this
326 << " completion " << c << dendl;
11fdf7f2 327 std::lock_guard l(aio_write_list_lock);
7c673cae
FG
328 ceph_tid_t seq = aio_write_seq;
329 if (aio_write_list.empty()) {
330 ldout(client->cct, 20) << "flush_aio_writes_async no writes. (tid "
331 << seq << ")" << dendl;
f67539c2 332 boost::asio::defer(client->finish_strand, CB_AioCompleteAndSafe(c));
7c673cae
FG
333 } else {
334 ldout(client->cct, 20) << "flush_aio_writes_async " << aio_write_list.size()
335 << " writes in flight; waiting on tid " << seq << dendl;
336 c->get();
337 aio_write_waiters[seq].push_back(c);
338 }
339}
340
341void librados::IoCtxImpl::flush_aio_writes()
342{
343 ldout(client->cct, 20) << "flush_aio_writes" << dendl;
9f95a23c
TL
344 std::unique_lock l{aio_write_list_lock};
345 aio_write_cond.wait(l, [seq=aio_write_seq, this] {
346 return (aio_write_list.empty() ||
347 aio_write_list.front()->aio_write_seq > seq);
348 });
7c673cae
FG
349}
350
351string librados::IoCtxImpl::get_cached_pool_name()
352{
353 std::string pn;
354 client->pool_get_name(get_id(), &pn);
355 return pn;
356}
357
358// SNAPS
359
360int librados::IoCtxImpl::snap_create(const char *snapName)
361{
362 int reply;
363 string sName(snapName);
364
9f95a23c
TL
365 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::snap_create::mylock");
366 ceph::condition_variable cond;
7c673cae 367 bool done;
9f95a23c 368 Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
f67539c2 369 objecter->create_pool_snap(poolid, sName, onfinish);
7c673cae 370
f67539c2
TL
371 std::unique_lock l{mylock};
372 cond.wait(l, [&done] { return done; });
7c673cae
FG
373 return reply;
374}
375
376int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid)
377{
378 int reply;
379
9f95a23c
TL
380 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::selfmanaged_snap_create::mylock");
381 ceph::condition_variable cond;
7c673cae 382 bool done;
9f95a23c 383 Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
7c673cae 384 snapid_t snapid;
f67539c2 385 objecter->allocate_selfmanaged_snap(poolid, &snapid, onfinish);
7c673cae 386
f67539c2
TL
387 {
388 std::unique_lock l{mylock};
389 cond.wait(l, [&done] { return done; });
7c673cae 390 }
f67539c2
TL
391 if (reply == 0)
392 *psnapid = snapid;
7c673cae
FG
393 return reply;
394}
395
396void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid,
397 AioCompletionImpl *c)
398{
399 C_aio_selfmanaged_snap_create_Complete *onfinish =
400 new C_aio_selfmanaged_snap_create_Complete(client, c, snapid);
f67539c2
TL
401 objecter->allocate_selfmanaged_snap(poolid, &onfinish->snapid,
402 onfinish);
7c673cae
FG
403}
404
405int librados::IoCtxImpl::snap_remove(const char *snapName)
406{
407 int reply;
408 string sName(snapName);
409
9f95a23c
TL
410 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::snap_remove::mylock");
411 ceph::condition_variable cond;
7c673cae 412 bool done;
9f95a23c 413 Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
f67539c2
TL
414 objecter->delete_pool_snap(poolid, sName, onfinish);
415 unique_lock l{mylock};
416 cond.wait(l, [&done] { return done; });
7c673cae
FG
417 return reply;
418}
419
420int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid,
421 ::SnapContext& snapc,
422 uint64_t snapid)
423{
424 int reply;
425
9f95a23c
TL
426 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::snap_rollback::mylock");
427 ceph::condition_variable cond;
7c673cae 428 bool done;
9f95a23c 429 Context *onack = new C_SafeCond(mylock, cond, &done, &reply);
7c673cae
FG
430
431 ::ObjectOperation op;
432 prepare_assert_ops(&op);
433 op.rollback(snapid);
434 objecter->mutate(oid, oloc,
f67539c2
TL
435 op, snapc, ceph::real_clock::now(),
436 extra_op_flags,
7c673cae
FG
437 onack, NULL);
438
9f95a23c
TL
439 std::unique_lock l{mylock};
440 cond.wait(l, [&done] { return done; });
7c673cae
FG
441 return reply;
442}
443
444int librados::IoCtxImpl::rollback(const object_t& oid, const char *snapName)
445{
446 snapid_t snap;
447
448 int r = objecter->pool_snap_by_name(poolid, snapName, &snap);
449 if (r < 0) {
450 return r;
451 }
452
453 return selfmanaged_snap_rollback_object(oid, snapc, snap);
454}
455
456int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid)
457{
458 int reply;
459
9f95a23c
TL
460 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::selfmanaged_snap_remove::mylock");
461 ceph::condition_variable cond;
7c673cae
FG
462 bool done;
463 objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid),
9f95a23c 464 new C_SafeCond(mylock, cond, &done, &reply));
7c673cae 465
9f95a23c
TL
466 std::unique_lock l{mylock};
467 cond.wait(l, [&done] { return done; });
7c673cae
FG
468 return (int)reply;
469}
470
471void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid,
472 AioCompletionImpl *c)
473{
474 Context *onfinish = new C_aio_selfmanaged_snap_op_Complete(client, c);
475 objecter->delete_selfmanaged_snap(poolid, snapid, onfinish);
476}
477
7c673cae
FG
478int librados::IoCtxImpl::snap_list(vector<uint64_t> *snaps)
479{
480 return objecter->pool_snap_list(poolid, snaps);
481}
482
483int librados::IoCtxImpl::snap_lookup(const char *name, uint64_t *snapid)
484{
485 return objecter->pool_snap_by_name(poolid, name, (snapid_t *)snapid);
486}
487
488int librados::IoCtxImpl::snap_get_name(uint64_t snapid, std::string *s)
489{
490 pool_snap_info_t info;
491 int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
492 if (ret < 0) {
493 return ret;
494 }
495 *s = info.name.c_str();
496 return 0;
497}
498
499int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid, time_t *t)
500{
501 pool_snap_info_t info;
502 int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
503 if (ret < 0) {
504 return ret;
505 }
506 *t = info.stamp.sec();
507 return 0;
508}
509
510
511// IO
512
513int librados::IoCtxImpl::nlist(Objecter::NListContext *context, int max_entries)
514{
7c673cae
FG
515 bool done;
516 int r = 0;
9f95a23c
TL
517 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::nlist::mylock");
518 ceph::condition_variable cond;
7c673cae
FG
519
520 if (context->at_end())
521 return 0;
522
523 context->max_entries = max_entries;
524 context->nspace = oloc.nspace;
525
9f95a23c 526 objecter->list_nobjects(context, new C_SafeCond(mylock, cond, &done, &r));
7c673cae 527
9f95a23c
TL
528 std::unique_lock l{mylock};
529 cond.wait(l, [&done] { return done; });
7c673cae
FG
530 return r;
531}
532
533uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
534 uint32_t pos)
535{
536 context->list.clear();
537 return objecter->list_nobjects_seek(context, pos);
538}
539
540uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
541 const rados_object_list_cursor& cursor)
542{
543 context->list.clear();
544 return objecter->list_nobjects_seek(context, *(const hobject_t *)cursor);
545}
546
547rados_object_list_cursor librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext *context)
548{
549 hobject_t *c = new hobject_t;
550
551 objecter->list_nobjects_get_cursor(context, c);
552 return (rados_object_list_cursor)c;
553}
554
555int librados::IoCtxImpl::create(const object_t& oid, bool exclusive)
556{
557 ::ObjectOperation op;
558 prepare_assert_ops(&op);
559 op.create(exclusive);
560 return operate(oid, &op, NULL);
561}
562
563/*
564 * add any version assert operations that are appropriate given the
565 * stat in the IoCtx, either the target version assert or any src
566 * object asserts. these affect a single ioctx operation, so clear
567 * the ioctx state when we're doing.
568 *
569 * return a pointer to the ObjectOperation if we added any events;
570 * this is convenient for passing the extra_ops argument into Objecter
571 * methods.
572 */
573::ObjectOperation *librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation *op)
574{
575 ::ObjectOperation *pop = NULL;
576 if (assert_ver) {
577 op->assert_version(assert_ver);
578 assert_ver = 0;
579 pop = op;
580 }
581 return pop;
582}
583
584int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
585 size_t len, uint64_t off)
586{
587 if (len > UINT_MAX/2)
588 return -E2BIG;
589 ::ObjectOperation op;
590 prepare_assert_ops(&op);
591 bufferlist mybl;
592 mybl.substr_of(bl, 0, len);
593 op.write(off, mybl);
594 return operate(oid, &op, NULL);
595}
596
597int librados::IoCtxImpl::append(const object_t& oid, bufferlist& bl, size_t len)
598{
599 if (len > UINT_MAX/2)
600 return -E2BIG;
601 ::ObjectOperation op;
602 prepare_assert_ops(&op);
603 bufferlist mybl;
604 mybl.substr_of(bl, 0, len);
605 op.append(mybl);
606 return operate(oid, &op, NULL);
607}
608
609int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl)
610{
611 if (bl.length() > UINT_MAX/2)
612 return -E2BIG;
613 ::ObjectOperation op;
614 prepare_assert_ops(&op);
615 op.write_full(bl);
616 return operate(oid, &op, NULL);
617}
618
619int librados::IoCtxImpl::writesame(const object_t& oid, bufferlist& bl,
620 size_t write_len, uint64_t off)
621{
622 if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
623 return -E2BIG;
624 if ((bl.length() == 0) || (write_len % bl.length()))
625 return -EINVAL;
626 ::ObjectOperation op;
627 prepare_assert_ops(&op);
628 bufferlist mybl;
629 mybl.substr_of(bl, 0, bl.length());
630 op.writesame(off, write_len, mybl);
631 return operate(oid, &op, NULL);
632}
633
634int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
635 ceph::real_time *pmtime, int flags)
636{
637 ceph::real_time ut = (pmtime ? *pmtime :
638 ceph::real_clock::now());
639
640 /* can't write to a snapshot */
641 if (snap_seq != CEPH_NOSNAP)
642 return -EROFS;
643
644 if (!o->size())
645 return 0;
646
9f95a23c
TL
647 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::operate::mylock");
648 ceph::condition_variable cond;
7c673cae
FG
649 bool done;
650 int r;
651 version_t ver;
652
9f95a23c 653 Context *oncommit = new C_SafeCond(mylock, cond, &done, &r);
7c673cae
FG
654
655 int op = o->ops[0].op.op;
656 ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid
657 << " nspace=" << oloc.nspace << dendl;
f67539c2
TL
658 Objecter::Op *objecter_op = objecter->prepare_mutate_op(
659 oid, oloc,
660 *o, snapc, ut,
661 flags | extra_op_flags,
662 oncommit, &ver);
7c673cae
FG
663 objecter->op_submit(objecter_op);
664
9f95a23c
TL
665 {
666 std::unique_lock l{mylock};
667 cond.wait(l, [&done] { return done;});
668 }
7c673cae
FG
669 ldout(client->cct, 10) << "Objecter returned from "
670 << ceph_osd_op_name(op) << " r=" << r << dendl;
671
672 set_sync_op_version(ver);
673
674 return r;
675}
676
677int librados::IoCtxImpl::operate_read(const object_t& oid,
678 ::ObjectOperation *o,
679 bufferlist *pbl,
680 int flags)
681{
682 if (!o->size())
683 return 0;
684
9f95a23c
TL
685 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::operate_read::mylock");
686 ceph::condition_variable cond;
7c673cae
FG
687 bool done;
688 int r;
689 version_t ver;
690
9f95a23c 691 Context *onack = new C_SafeCond(mylock, cond, &done, &r);
7c673cae
FG
692
693 int op = o->ops[0].op.op;
694 ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl;
f67539c2
TL
695 Objecter::Op *objecter_op = objecter->prepare_read_op(
696 oid, oloc,
697 *o, snap_seq, pbl,
698 flags | extra_op_flags,
699 onack, &ver);
7c673cae
FG
700 objecter->op_submit(objecter_op);
701
9f95a23c
TL
702 {
703 std::unique_lock l{mylock};
704 cond.wait(l, [&done] { return done; });
705 }
7c673cae
FG
706 ldout(client->cct, 10) << "Objecter returned from "
707 << ceph_osd_op_name(op) << " r=" << r << dendl;
708
709 set_sync_op_version(ver);
710
711 return r;
712}
713
714int librados::IoCtxImpl::aio_operate_read(const object_t &oid,
715 ::ObjectOperation *o,
716 AioCompletionImpl *c,
717 int flags,
718 bufferlist *pbl,
31f18b77 719 const blkin_trace_info *trace_info)
7c673cae 720{
11fdf7f2 721 FUNCTRACE(client->cct);
7c673cae
FG
722 Context *oncomplete = new C_aio_Complete(c);
723
9f95a23c 724#if defined(WITH_EVENTTRACE)
7c673cae
FG
725 ((C_aio_Complete *) oncomplete)->oid = oid;
726#endif
727 c->is_read = true;
728 c->io = this;
729
730 ZTracer::Trace trace;
31f18b77
FG
731 if (trace_info) {
732 ZTracer::Trace parent_trace("", nullptr, trace_info);
733 trace.init("rados operate read", &objecter->trace_endpoint, &parent_trace);
734 }
7c673cae
FG
735
736 trace.event("init root span");
f67539c2
TL
737 Objecter::Op *objecter_op = objecter->prepare_read_op(
738 oid, oloc,
739 *o, snap_seq, pbl, flags | extra_op_flags,
740 oncomplete, &c->objver, nullptr, 0, &trace);
7c673cae
FG
741 objecter->op_submit(objecter_op, &c->tid);
742 trace.event("rados operate read submitted");
743
744 return 0;
745}
746
747int librados::IoCtxImpl::aio_operate(const object_t& oid,
748 ::ObjectOperation *o, AioCompletionImpl *c,
749 const SnapContext& snap_context, int flags,
31f18b77 750 const blkin_trace_info *trace_info)
7c673cae 751{
11fdf7f2 752 FUNCTRACE(client->cct);
7c673cae
FG
753 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
754 auto ut = ceph::real_clock::now();
755 /* can't write to a snapshot */
756 if (snap_seq != CEPH_NOSNAP)
757 return -EROFS;
758
759 Context *oncomplete = new C_aio_Complete(c);
9f95a23c 760#if defined(WITH_EVENTTRACE)
7c673cae
FG
761 ((C_aio_Complete *) oncomplete)->oid = oid;
762#endif
763
764 c->io = this;
765 queue_aio_write(c);
766
767 ZTracer::Trace trace;
31f18b77
FG
768 if (trace_info) {
769 ZTracer::Trace parent_trace("", nullptr, trace_info);
770 trace.init("rados operate", &objecter->trace_endpoint, &parent_trace);
771 }
7c673cae
FG
772
773 trace.event("init root span");
774 Objecter::Op *op = objecter->prepare_mutate_op(
f67539c2 775 oid, oloc, *o, snap_context, ut, flags | extra_op_flags,
7c673cae
FG
776 oncomplete, &c->objver, osd_reqid_t(), &trace);
777 objecter->op_submit(op, &c->tid);
778 trace.event("rados operate op submitted");
779
780 return 0;
781}
782
783int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
784 bufferlist *pbl, size_t len, uint64_t off,
785 uint64_t snapid, const blkin_trace_info *info)
786{
11fdf7f2 787 FUNCTRACE(client->cct);
7c673cae
FG
788 if (len > (size_t) INT_MAX)
789 return -EDOM;
790
791 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
792 Context *oncomplete = new C_aio_Complete(c);
793
9f95a23c 794#if defined(WITH_EVENTTRACE)
7c673cae
FG
795 ((C_aio_Complete *) oncomplete)->oid = oid;
796#endif
797 c->is_read = true;
798 c->io = this;
799 c->blp = pbl;
800
801 ZTracer::Trace trace;
802 if (info)
803 trace.init("rados read", &objecter->trace_endpoint, info);
804
805 Objecter::Op *o = objecter->prepare_read_op(
806 oid, oloc,
f67539c2 807 off, len, snapid, pbl, extra_op_flags,
7c673cae
FG
808 oncomplete, &c->objver, nullptr, 0, &trace);
809 objecter->op_submit(o, &c->tid);
810 return 0;
811}
812
813int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
814 char *buf, size_t len, uint64_t off,
815 uint64_t snapid, const blkin_trace_info *info)
816{
11fdf7f2 817 FUNCTRACE(client->cct);
7c673cae
FG
818 if (len > (size_t) INT_MAX)
819 return -EDOM;
820
821 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
822 Context *oncomplete = new C_aio_Complete(c);
823
9f95a23c 824#if defined(WITH_EVENTTRACE)
7c673cae
FG
825 ((C_aio_Complete *) oncomplete)->oid = oid;
826#endif
827 c->is_read = true;
828 c->io = this;
829 c->bl.clear();
830 c->bl.push_back(buffer::create_static(len, buf));
831 c->blp = &c->bl;
832 c->out_buf = buf;
833
834 ZTracer::Trace trace;
835 if (info)
836 trace.init("rados read", &objecter->trace_endpoint, info);
837
838 Objecter::Op *o = objecter->prepare_read_op(
839 oid, oloc,
f67539c2 840 off, len, snapid, &c->bl, extra_op_flags,
7c673cae
FG
841 oncomplete, &c->objver, nullptr, 0, &trace);
842 objecter->op_submit(o, &c->tid);
843 return 0;
844}
845
846class C_ObjectOperation : public Context {
847public:
848 ::ObjectOperation m_ops;
849 explicit C_ObjectOperation(Context *c) : m_ctx(c) {}
850 void finish(int r) override {
851 m_ctx->complete(r);
852 }
853private:
854 Context *m_ctx;
855};
856
857int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
858 AioCompletionImpl *c,
859 std::map<uint64_t,uint64_t> *m,
860 bufferlist *data_bl, size_t len,
861 uint64_t off, uint64_t snapid)
862{
11fdf7f2 863 FUNCTRACE(client->cct);
7c673cae
FG
864 if (len > (size_t) INT_MAX)
865 return -EDOM;
866
867 Context *nested = new C_aio_Complete(c);
868 C_ObjectOperation *onack = new C_ObjectOperation(nested);
869
9f95a23c 870#if defined(WITH_EVENTTRACE)
7c673cae
FG
871 ((C_aio_Complete *) nested)->oid = oid;
872#endif
873 c->is_read = true;
874 c->io = this;
875
876 onack->m_ops.sparse_read(off, len, m, data_bl, NULL);
877
878 Objecter::Op *o = objecter->prepare_read_op(
879 oid, oloc,
f67539c2 880 onack->m_ops, snapid, NULL, extra_op_flags,
7c673cae
FG
881 onack, &c->objver);
882 objecter->op_submit(o, &c->tid);
883 return 0;
884}
885
886int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
887 AioCompletionImpl *c,
888 uint64_t off,
889 bufferlist& cmp_bl)
890{
891 if (cmp_bl.length() > UINT_MAX/2)
892 return -E2BIG;
893
894 Context *onack = new C_aio_Complete(c);
895
896 c->is_read = true;
897 c->io = this;
898
899 Objecter::Op *o = objecter->prepare_cmpext_op(
f67539c2 900 oid, oloc, off, cmp_bl, snap_seq, extra_op_flags,
7c673cae
FG
901 onack, &c->objver);
902 objecter->op_submit(o, &c->tid);
903
904 return 0;
905}
906
907/* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */
908int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
909 AioCompletionImpl *c,
910 const char *cmp_buf,
911 size_t cmp_len,
912 uint64_t off)
913{
914 if (cmp_len > UINT_MAX/2)
915 return -E2BIG;
916
917 bufferlist cmp_bl;
918 cmp_bl.append(cmp_buf, cmp_len);
919
920 Context *nested = new C_aio_Complete(c);
921 C_ObjectOperation *onack = new C_ObjectOperation(nested);
922
923 c->is_read = true;
924 c->io = this;
925
926 onack->m_ops.cmpext(off, cmp_len, cmp_buf, NULL);
927
928 Objecter::Op *o = objecter->prepare_read_op(
f67539c2 929 oid, oloc, onack->m_ops, snap_seq, NULL, extra_op_flags, onack, &c->objver);
7c673cae
FG
930 objecter->op_submit(o, &c->tid);
931 return 0;
932}
933
934int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
935 const bufferlist& bl, size_t len,
936 uint64_t off, const blkin_trace_info *info)
937{
11fdf7f2 938 FUNCTRACE(client->cct);
7c673cae
FG
939 auto ut = ceph::real_clock::now();
940 ldout(client->cct, 20) << "aio_write " << oid << " " << off << "~" << len << " snapc=" << snapc << " snap_seq=" << snap_seq << dendl;
941 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
942
943 if (len > UINT_MAX/2)
944 return -E2BIG;
945 /* can't write to a snapshot */
946 if (snap_seq != CEPH_NOSNAP)
947 return -EROFS;
948
949 Context *oncomplete = new C_aio_Complete(c);
950
9f95a23c 951#if defined(WITH_EVENTTRACE)
7c673cae
FG
952 ((C_aio_Complete *) oncomplete)->oid = oid;
953#endif
954 ZTracer::Trace trace;
955 if (info)
956 trace.init("rados write", &objecter->trace_endpoint, info);
957
958 c->io = this;
959 queue_aio_write(c);
960
961 Objecter::Op *o = objecter->prepare_write_op(
962 oid, oloc,
f67539c2 963 off, len, snapc, bl, ut, extra_op_flags,
7c673cae
FG
964 oncomplete, &c->objver, nullptr, 0, &trace);
965 objecter->op_submit(o, &c->tid);
966
967 return 0;
968}
969
970int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c,
971 const bufferlist& bl, size_t len)
972{
11fdf7f2 973 FUNCTRACE(client->cct);
7c673cae
FG
974 auto ut = ceph::real_clock::now();
975
976 if (len > UINT_MAX/2)
977 return -E2BIG;
978 /* can't write to a snapshot */
979 if (snap_seq != CEPH_NOSNAP)
980 return -EROFS;
981
982 Context *oncomplete = new C_aio_Complete(c);
9f95a23c 983#if defined(WITH_EVENTTRACE)
7c673cae
FG
984 ((C_aio_Complete *) oncomplete)->oid = oid;
985#endif
986
987 c->io = this;
988 queue_aio_write(c);
989
990 Objecter::Op *o = objecter->prepare_append_op(
991 oid, oloc,
f67539c2 992 len, snapc, bl, ut, extra_op_flags,
7c673cae
FG
993 oncomplete, &c->objver);
994 objecter->op_submit(o, &c->tid);
995
996 return 0;
997}
998
999int librados::IoCtxImpl::aio_write_full(const object_t &oid,
1000 AioCompletionImpl *c,
1001 const bufferlist& bl)
1002{
11fdf7f2 1003 FUNCTRACE(client->cct);
7c673cae
FG
1004 auto ut = ceph::real_clock::now();
1005
1006 if (bl.length() > UINT_MAX/2)
1007 return -E2BIG;
1008 /* can't write to a snapshot */
1009 if (snap_seq != CEPH_NOSNAP)
1010 return -EROFS;
1011
1012 Context *oncomplete = new C_aio_Complete(c);
9f95a23c 1013#if defined(WITH_EVENTTRACE)
7c673cae
FG
1014 ((C_aio_Complete *) oncomplete)->oid = oid;
1015#endif
1016
1017 c->io = this;
1018 queue_aio_write(c);
1019
1020 Objecter::Op *o = objecter->prepare_write_full_op(
1021 oid, oloc,
f67539c2 1022 snapc, bl, ut, extra_op_flags,
7c673cae
FG
1023 oncomplete, &c->objver);
1024 objecter->op_submit(o, &c->tid);
1025
1026 return 0;
1027}
1028
1029int librados::IoCtxImpl::aio_writesame(const object_t &oid,
1030 AioCompletionImpl *c,
1031 const bufferlist& bl,
1032 size_t write_len,
1033 uint64_t off)
1034{
11fdf7f2 1035 FUNCTRACE(client->cct);
7c673cae
FG
1036 auto ut = ceph::real_clock::now();
1037
1038 if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
1039 return -E2BIG;
1040 if ((bl.length() == 0) || (write_len % bl.length()))
1041 return -EINVAL;
1042 /* can't write to a snapshot */
1043 if (snap_seq != CEPH_NOSNAP)
1044 return -EROFS;
1045
1046 Context *oncomplete = new C_aio_Complete(c);
1047
9f95a23c 1048#if defined(WITH_EVENTTRACE)
7c673cae
FG
1049 ((C_aio_Complete *) oncomplete)->oid = oid;
1050#endif
1051 c->io = this;
1052 queue_aio_write(c);
1053
1054 Objecter::Op *o = objecter->prepare_writesame_op(
1055 oid, oloc,
1056 write_len, off,
f67539c2 1057 snapc, bl, ut, extra_op_flags,
7c673cae
FG
1058 oncomplete, &c->objver);
1059 objecter->op_submit(o, &c->tid);
1060
1061 return 0;
1062}
1063
1064int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c, int flags)
1065{
11fdf7f2 1066 FUNCTRACE(client->cct);
7c673cae
FG
1067 auto ut = ceph::real_clock::now();
1068
1069 /* can't write to a snapshot */
1070 if (snap_seq != CEPH_NOSNAP)
1071 return -EROFS;
1072
1073 Context *oncomplete = new C_aio_Complete(c);
1074
9f95a23c 1075#if defined(WITH_EVENTTRACE)
7c673cae
FG
1076 ((C_aio_Complete *) oncomplete)->oid = oid;
1077#endif
1078 c->io = this;
1079 queue_aio_write(c);
1080
1081 Objecter::Op *o = objecter->prepare_remove_op(
1082 oid, oloc,
f67539c2 1083 snapc, ut, flags | extra_op_flags,
7c673cae
FG
1084 oncomplete, &c->objver);
1085 objecter->op_submit(o, &c->tid);
1086
1087 return 0;
1088}
1089
1090
1091int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c,
1092 uint64_t *psize, time_t *pmtime)
1093{
1094 C_aio_stat_Ack *onack = new C_aio_stat_Ack(c, pmtime);
1095 c->is_read = true;
1096 c->io = this;
1097 Objecter::Op *o = objecter->prepare_stat_op(
1098 oid, oloc,
f67539c2 1099 snap_seq, psize, &onack->mtime, extra_op_flags,
7c673cae
FG
1100 onack, &c->objver);
1101 objecter->op_submit(o, &c->tid);
1102 return 0;
1103}
1104
1105int librados::IoCtxImpl::aio_stat2(const object_t& oid, AioCompletionImpl *c,
1106 uint64_t *psize, struct timespec *pts)
1107{
1108 C_aio_stat2_Ack *onack = new C_aio_stat2_Ack(c, pts);
1109 c->is_read = true;
1110 c->io = this;
1111 Objecter::Op *o = objecter->prepare_stat_op(
1112 oid, oloc,
f67539c2 1113 snap_seq, psize, &onack->mtime, extra_op_flags,
7c673cae
FG
1114 onack, &c->objver);
1115 objecter->op_submit(o, &c->tid);
1116 return 0;
1117}
1118
1119int librados::IoCtxImpl::aio_getxattr(const object_t& oid, AioCompletionImpl *c,
1120 const char *name, bufferlist& bl)
1121{
1122 ::ObjectOperation rd;
1123 prepare_assert_ops(&rd);
1124 rd.getxattr(name, &bl, NULL);
1125 int r = aio_operate_read(oid, &rd, c, 0, &bl);
1126 return r;
1127}
1128
1129int librados::IoCtxImpl::aio_rmxattr(const object_t& oid, AioCompletionImpl *c,
1130 const char *name)
1131{
1132 ::ObjectOperation op;
1133 prepare_assert_ops(&op);
1134 op.rmxattr(name);
1135 return aio_operate(oid, &op, c, snapc, 0);
1136}
1137
1138int librados::IoCtxImpl::aio_setxattr(const object_t& oid, AioCompletionImpl *c,
1139 const char *name, bufferlist& bl)
1140{
1141 ::ObjectOperation op;
1142 prepare_assert_ops(&op);
1143 op.setxattr(name, bl);
1144 return aio_operate(oid, &op, c, snapc, 0);
1145}
1146
1147namespace {
1148struct AioGetxattrsData {
1149 AioGetxattrsData(librados::AioCompletionImpl *c, map<string, bufferlist>* attrset,
1150 librados::RadosClient *_client) :
1151 user_completion(c), user_attrset(attrset), client(_client) {}
f67539c2 1152 struct librados::CB_AioCompleteAndSafe user_completion;
7c673cae
FG
1153 map<string, bufferlist> result_attrset;
1154 map<std::string, bufferlist>* user_attrset;
1155 librados::RadosClient *client;
1156};
1157}
1158
1159static void aio_getxattrs_complete(rados_completion_t c, void *arg) {
1160 AioGetxattrsData *cdata = reinterpret_cast<AioGetxattrsData*>(arg);
1161 int rc = rados_aio_get_return_value(c);
1162 cdata->user_attrset->clear();
1163 if (rc >= 0) {
1164 for (map<string,bufferlist>::iterator p = cdata->result_attrset.begin();
1165 p != cdata->result_attrset.end();
1166 ++p) {
1167 ldout(cdata->client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
1168 (*cdata->user_attrset)[p->first] = p->second;
1169 }
1170 }
f67539c2 1171 cdata->user_completion(rc);
7c673cae
FG
1172 ((librados::AioCompletionImpl*)c)->put();
1173 delete cdata;
1174}
1175
1176int librados::IoCtxImpl::aio_getxattrs(const object_t& oid, AioCompletionImpl *c,
1177 map<std::string, bufferlist>& attrset)
1178{
1179 AioGetxattrsData *cdata = new AioGetxattrsData(c, &attrset, client);
1180 ::ObjectOperation rd;
1181 prepare_assert_ops(&rd);
1182 rd.getxattrs(&cdata->result_attrset, NULL);
1183 librados::AioCompletionImpl *comp = new librados::AioCompletionImpl;
1184 comp->set_complete_callback(cdata, aio_getxattrs_complete);
1185 return aio_operate_read(oid, &rd, comp, 0, NULL);
1186}
1187
1188int librados::IoCtxImpl::aio_cancel(AioCompletionImpl *c)
1189{
1190 return objecter->op_cancel(c->tid, -ECANCELED);
1191}
1192
1193
1194int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c,
1195 std::list< std::pair<time_t, time_t> > *pls)
1196{
1197 Context *oncomplete = new C_aio_Complete(c);
1198 c->is_read = true;
1199 c->io = this;
1200
1201 ::ObjectOperation rd;
1202 rd.hit_set_ls(pls, NULL);
1203 object_locator_t oloc(poolid);
1204 Objecter::Op *o = objecter->prepare_pg_read_op(
f67539c2 1205 hash, oloc, rd, NULL, extra_op_flags, oncomplete, NULL, NULL);
7c673cae
FG
1206 objecter->op_submit(o, &c->tid);
1207 return 0;
1208}
1209
1210int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c,
1211 time_t stamp,
1212 bufferlist *pbl)
1213{
1214 Context *oncomplete = new C_aio_Complete(c);
1215 c->is_read = true;
1216 c->io = this;
1217
1218 ::ObjectOperation rd;
1219 rd.hit_set_get(ceph::real_clock::from_time_t(stamp), pbl, 0);
1220 object_locator_t oloc(poolid);
1221 Objecter::Op *o = objecter->prepare_pg_read_op(
f67539c2 1222 hash, oloc, rd, NULL, extra_op_flags, oncomplete, NULL, NULL);
7c673cae
FG
1223 objecter->op_submit(o, &c->tid);
1224 return 0;
1225}
1226
1227int librados::IoCtxImpl::remove(const object_t& oid)
1228{
1229 ::ObjectOperation op;
1230 prepare_assert_ops(&op);
1231 op.remove();
94b18763 1232 return operate(oid, &op, nullptr, librados::OPERATION_FULL_FORCE);
7c673cae
FG
1233}
1234
1235int librados::IoCtxImpl::remove(const object_t& oid, int flags)
1236{
1237 ::ObjectOperation op;
1238 prepare_assert_ops(&op);
1239 op.remove();
1240 return operate(oid, &op, NULL, flags);
1241}
1242
1243int librados::IoCtxImpl::trunc(const object_t& oid, uint64_t size)
1244{
1245 ::ObjectOperation op;
1246 prepare_assert_ops(&op);
1247 op.truncate(size);
1248 return operate(oid, &op, NULL);
1249}
1250
1251int librados::IoCtxImpl::get_inconsistent_objects(const pg_t& pg,
1252 const librados::object_id_t& start_after,
1253 uint64_t max_to_get,
1254 AioCompletionImpl *c,
1255 std::vector<inconsistent_obj_t>* objects,
1256 uint32_t* interval)
1257{
1258 Context *oncomplete = new C_aio_Complete(c);
1259 c->is_read = true;
1260 c->io = this;
1261
1262 ::ObjectOperation op;
11fdf7f2 1263 op.scrub_ls(start_after, max_to_get, objects, interval, &c->rval);
7c673cae
FG
1264 object_locator_t oloc{poolid, pg.ps()};
1265 Objecter::Op *o = objecter->prepare_pg_read_op(
f67539c2 1266 oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP | extra_op_flags, oncomplete,
7c673cae
FG
1267 nullptr, nullptr);
1268 objecter->op_submit(o, &c->tid);
1269 return 0;
1270}
1271
1272int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t& pg,
1273 const librados::object_id_t& start_after,
1274 uint64_t max_to_get,
1275 AioCompletionImpl *c,
1276 std::vector<inconsistent_snapset_t>* snapsets,
1277 uint32_t* interval)
1278{
1279 Context *oncomplete = new C_aio_Complete(c);
1280 c->is_read = true;
1281 c->io = this;
1282
1283 ::ObjectOperation op;
11fdf7f2 1284 op.scrub_ls(start_after, max_to_get, snapsets, interval, &c->rval);
7c673cae
FG
1285 object_locator_t oloc{poolid, pg.ps()};
1286 Objecter::Op *o = objecter->prepare_pg_read_op(
f67539c2 1287 oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP | extra_op_flags, oncomplete,
7c673cae
FG
1288 nullptr, nullptr);
1289 objecter->op_submit(o, &c->tid);
1290 return 0;
1291}
1292
1293int librados::IoCtxImpl::tmap_update(const object_t& oid, bufferlist& cmdbl)
1294{
1295 ::ObjectOperation wr;
1296 prepare_assert_ops(&wr);
1297 wr.tmap_update(cmdbl);
1298 return operate(oid, &wr, NULL);
1299}
1300
7c673cae
FG
1301int librados::IoCtxImpl::exec(const object_t& oid,
1302 const char *cls, const char *method,
1303 bufferlist& inbl, bufferlist& outbl)
1304{
1305 ::ObjectOperation rd;
1306 prepare_assert_ops(&rd);
1307 rd.call(cls, method, inbl);
1308 return operate_read(oid, &rd, &outbl);
1309}
1310
1311int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
1312 const char *cls, const char *method,
1313 bufferlist& inbl, bufferlist *outbl)
1314{
11fdf7f2 1315 FUNCTRACE(client->cct);
7c673cae
FG
1316 Context *oncomplete = new C_aio_Complete(c);
1317
9f95a23c 1318#if defined(WITH_EVENTTRACE)
7c673cae
FG
1319 ((C_aio_Complete *) oncomplete)->oid = oid;
1320#endif
1321 c->is_read = true;
1322 c->io = this;
1323
1324 ::ObjectOperation rd;
1325 prepare_assert_ops(&rd);
1326 rd.call(cls, method, inbl);
1327 Objecter::Op *o = objecter->prepare_read_op(
f67539c2 1328 oid, oloc, rd, snap_seq, outbl, extra_op_flags, oncomplete, &c->objver);
7c673cae
FG
1329 objecter->op_submit(o, &c->tid);
1330 return 0;
1331}
1332
1333int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
1334 const char *cls, const char *method,
1335 bufferlist& inbl, char *buf, size_t out_len)
1336{
11fdf7f2 1337 FUNCTRACE(client->cct);
7c673cae
FG
1338 Context *oncomplete = new C_aio_Complete(c);
1339
9f95a23c 1340#if defined(WITH_EVENTTRACE)
7c673cae
FG
1341 ((C_aio_Complete *) oncomplete)->oid = oid;
1342#endif
1343 c->is_read = true;
1344 c->io = this;
1345 c->bl.clear();
1346 c->bl.push_back(buffer::create_static(out_len, buf));
1347 c->blp = &c->bl;
1348 c->out_buf = buf;
1349
1350 ::ObjectOperation rd;
1351 prepare_assert_ops(&rd);
1352 rd.call(cls, method, inbl);
1353 Objecter::Op *o = objecter->prepare_read_op(
f67539c2 1354 oid, oloc, rd, snap_seq, &c->bl, extra_op_flags, oncomplete, &c->objver);
7c673cae
FG
1355 objecter->op_submit(o, &c->tid);
1356 return 0;
1357}
1358
1359int librados::IoCtxImpl::read(const object_t& oid,
1360 bufferlist& bl, size_t len, uint64_t off)
1361{
1362 if (len > (size_t) INT_MAX)
1363 return -EDOM;
1364 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
1365
1366 ::ObjectOperation rd;
1367 prepare_assert_ops(&rd);
1368 rd.read(off, len, &bl, NULL, NULL);
1369 int r = operate_read(oid, &rd, &bl);
1370 if (r < 0)
1371 return r;
1372
1373 if (bl.length() < len) {
1374 ldout(client->cct, 10) << "Returned length " << bl.length()
1375 << " less than original length "<< len << dendl;
1376 }
1377
1378 return bl.length();
1379}
1380
1381int librados::IoCtxImpl::cmpext(const object_t& oid, uint64_t off,
1382 bufferlist& cmp_bl)
1383{
1384 if (cmp_bl.length() > UINT_MAX/2)
1385 return -E2BIG;
1386
1387 ::ObjectOperation op;
1388 prepare_assert_ops(&op);
1389 op.cmpext(off, cmp_bl, NULL);
1390 return operate_read(oid, &op, NULL);
1391}
1392
1393int librados::IoCtxImpl::mapext(const object_t& oid,
1394 uint64_t off, size_t len,
1395 std::map<uint64_t,uint64_t>& m)
1396{
1397 bufferlist bl;
1398
9f95a23c
TL
1399 ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::read::mylock");
1400 ceph::condition_variable cond;
7c673cae
FG
1401 bool done;
1402 int r;
9f95a23c 1403 Context *onack = new C_SafeCond(mylock, cond, &done, &r);
7c673cae
FG
1404
1405 objecter->mapext(oid, oloc,
f67539c2 1406 off, len, snap_seq, &bl, extra_op_flags,
7c673cae
FG
1407 onack);
1408
9f95a23c
TL
1409 {
1410 unique_lock l{mylock};
1411 cond.wait(l, [&done] { return done;});
1412 }
7c673cae
FG
1413 ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl;
1414
1415 if (r < 0)
1416 return r;
1417
11fdf7f2
TL
1418 auto iter = bl.cbegin();
1419 decode(m, iter);
7c673cae
FG
1420
1421 return m.size();
1422}
1423
1424int librados::IoCtxImpl::sparse_read(const object_t& oid,
1425 std::map<uint64_t,uint64_t>& m,
1426 bufferlist& data_bl, size_t len,
1427 uint64_t off)
1428{
1429 if (len > (size_t) INT_MAX)
1430 return -EDOM;
1431
1432 ::ObjectOperation rd;
1433 prepare_assert_ops(&rd);
1434 rd.sparse_read(off, len, &m, &data_bl, NULL);
1435
1436 int r = operate_read(oid, &rd, NULL);
1437 if (r < 0)
1438 return r;
1439
1440 return m.size();
1441}
1442
1443int librados::IoCtxImpl::checksum(const object_t& oid, uint8_t type,
1444 const bufferlist &init_value, size_t len,
1445 uint64_t off, size_t chunk_size,
1446 bufferlist *pbl)
1447{
1448 if (len > (size_t) INT_MAX) {
1449 return -EDOM;
1450 }
1451
1452 ::ObjectOperation rd;
1453 prepare_assert_ops(&rd);
1454 rd.checksum(type, init_value, off, len, chunk_size, pbl, nullptr, nullptr);
1455
1456 int r = operate_read(oid, &rd, nullptr);
1457 if (r < 0) {
1458 return r;
1459 }
1460
1461 return 0;
1462}
1463
1464int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime)
1465{
1466 uint64_t size;
1467 real_time mtime;
1468
1469 if (!psize)
1470 psize = &size;
1471
1472 ::ObjectOperation rd;
1473 prepare_assert_ops(&rd);
f67539c2 1474 rd.stat(psize, &mtime, nullptr);
7c673cae
FG
1475 int r = operate_read(oid, &rd, NULL);
1476
1477 if (r >= 0 && pmtime) {
1478 *pmtime = real_clock::to_time_t(mtime);
1479 }
1480
1481 return r;
1482}
1483
1484int librados::IoCtxImpl::stat2(const object_t& oid, uint64_t *psize, struct timespec *pts)
1485{
1486 uint64_t size;
1487 ceph::real_time mtime;
1488
1489 if (!psize)
1490 psize = &size;
1491
1492 ::ObjectOperation rd;
1493 prepare_assert_ops(&rd);
f67539c2 1494 rd.stat(psize, &mtime, nullptr);
7c673cae
FG
1495 int r = operate_read(oid, &rd, NULL);
1496 if (r < 0) {
1497 return r;
1498 }
1499
1500 if (pts) {
1501 *pts = ceph::real_clock::to_timespec(mtime);
1502 }
1503
1504 return 0;
1505}
1506
1507int librados::IoCtxImpl::getxattr(const object_t& oid,
1508 const char *name, bufferlist& bl)
1509{
1510 ::ObjectOperation rd;
1511 prepare_assert_ops(&rd);
1512 rd.getxattr(name, &bl, NULL);
1513 int r = operate_read(oid, &rd, &bl);
1514 if (r < 0)
1515 return r;
1516
1517 return bl.length();
1518}
1519
1520int librados::IoCtxImpl::rmxattr(const object_t& oid, const char *name)
1521{
1522 ::ObjectOperation op;
1523 prepare_assert_ops(&op);
1524 op.rmxattr(name);
1525 return operate(oid, &op, NULL);
1526}
1527
1528int librados::IoCtxImpl::setxattr(const object_t& oid,
1529 const char *name, bufferlist& bl)
1530{
1531 ::ObjectOperation op;
1532 prepare_assert_ops(&op);
1533 op.setxattr(name, bl);
1534 return operate(oid, &op, NULL);
1535}
1536
1537int librados::IoCtxImpl::getxattrs(const object_t& oid,
1538 map<std::string, bufferlist>& attrset)
1539{
1540 map<string, bufferlist> aset;
1541
1542 ::ObjectOperation rd;
1543 prepare_assert_ops(&rd);
1544 rd.getxattrs(&aset, NULL);
1545 int r = operate_read(oid, &rd, NULL);
1546
1547 attrset.clear();
1548 if (r >= 0) {
1549 for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); ++p) {
1550 ldout(client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
1551 attrset[p->first.c_str()] = p->second;
1552 }
1553 }
1554
1555 return r;
1556}
1557
1558void librados::IoCtxImpl::set_sync_op_version(version_t ver)
1559{
1560 ANNOTATE_BENIGN_RACE_SIZED(&last_objver, sizeof(last_objver),
1561 "IoCtxImpl last_objver");
1562 last_objver = ver;
1563}
1564
f67539c2
TL
1565namespace librados {
1566void intrusive_ptr_add_ref(IoCtxImpl *p) { p->get(); }
1567void intrusive_ptr_release(IoCtxImpl *p) { p->put(); }
1568}
1569
1570struct WatchInfo {
1571 boost::intrusive_ptr<librados::IoCtxImpl> ioctx;
7c673cae
FG
1572 object_t oid;
1573 librados::WatchCtx *ctx;
1574 librados::WatchCtx2 *ctx2;
7c673cae
FG
1575
1576 WatchInfo(librados::IoCtxImpl *io, object_t o,
f67539c2
TL
1577 librados::WatchCtx *c, librados::WatchCtx2 *c2)
1578 : ioctx(io), oid(o), ctx(c), ctx2(c2) {}
7c673cae
FG
1579
1580 void handle_notify(uint64_t notify_id,
1581 uint64_t cookie,
1582 uint64_t notifier_id,
f67539c2 1583 bufferlist& bl) {
7c673cae
FG
1584 ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
1585 << " cookie " << cookie
1586 << " notifier_id " << notifier_id
1587 << " len " << bl.length()
1588 << dendl;
1589
1590 if (ctx2)
1591 ctx2->handle_notify(notify_id, cookie, notifier_id, bl);
1592 if (ctx) {
1593 ctx->notify(0, 0, bl);
1594
1595 // send ACK back to OSD if using legacy protocol
1596 bufferlist empty;
1597 ioctx->notify_ack(oid, notify_id, cookie, empty);
1598 }
1599 }
f67539c2 1600 void handle_error(uint64_t cookie, int err) {
7c673cae
FG
1601 ldout(ioctx->client->cct, 10) << __func__ << " cookie " << cookie
1602 << " err " << err
1603 << dendl;
1604 if (ctx2)
1605 ctx2->handle_error(cookie, err);
1606 }
f67539c2
TL
1607
1608 void operator()(bs::error_code ec,
1609 uint64_t notify_id,
1610 uint64_t cookie,
1611 uint64_t notifier_id,
1612 bufferlist&& bl) {
1613 if (ec) {
1614 handle_error(cookie, ceph::from_error_code(ec));
1615 } else {
1616 handle_notify(notify_id, cookie, notifier_id, bl);
1617 }
1618 }
1619};
1620
1621// internal WatchInfo that owns the context memory
1622struct InternalWatchInfo : public WatchInfo {
1623 std::unique_ptr<librados::WatchCtx> ctx;
1624 std::unique_ptr<librados::WatchCtx2> ctx2;
1625
1626 InternalWatchInfo(librados::IoCtxImpl *io, object_t o,
1627 librados::WatchCtx *c, librados::WatchCtx2 *c2)
1628 : WatchInfo(io, o, c, c2), ctx(c), ctx2(c2) {}
7c673cae
FG
1629};
1630
1631int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
1632 librados::WatchCtx *ctx,
1633 librados::WatchCtx2 *ctx2,
1634 bool internal)
1635{
1636 return watch(oid, handle, ctx, ctx2, 0, internal);
1637}
1638
1639int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
1640 librados::WatchCtx *ctx,
1641 librados::WatchCtx2 *ctx2,
1642 uint32_t timeout,
1643 bool internal)
1644{
1645 ::ObjectOperation wr;
1646 version_t objver;
1647 C_SaferCond onfinish;
1648
1649 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1650 *handle = linger_op->get_cookie();
f67539c2
TL
1651 if (internal) {
1652 linger_op->handle = InternalWatchInfo(this, oid, ctx, ctx2);
1653 } else {
1654 linger_op->handle = WatchInfo(this, oid, ctx, ctx2);
1655 }
7c673cae
FG
1656 prepare_assert_ops(&wr);
1657 wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
1658 bufferlist bl;
1659 objecter->linger_watch(linger_op, wr,
1660 snapc, ceph::real_clock::now(), bl,
1661 &onfinish,
1662 &objver);
1663
1664 int r = onfinish.wait();
1665
1666 set_sync_op_version(objver);
1667
1668 if (r < 0) {
1669 objecter->linger_cancel(linger_op);
1670 *handle = 0;
1671 }
1672
1673 return r;
1674}
1675
1676int librados::IoCtxImpl::aio_watch(const object_t& oid,
1677 AioCompletionImpl *c,
1678 uint64_t *handle,
1679 librados::WatchCtx *ctx,
1680 librados::WatchCtx2 *ctx2,
1681 bool internal) {
1682 return aio_watch(oid, c, handle, ctx, ctx2, 0, internal);
1683}
1684
1685int librados::IoCtxImpl::aio_watch(const object_t& oid,
1686 AioCompletionImpl *c,
1687 uint64_t *handle,
1688 librados::WatchCtx *ctx,
1689 librados::WatchCtx2 *ctx2,
1690 uint32_t timeout,
1691 bool internal)
1692{
1693 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1694 c->io = this;
1695 Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false);
1696
1697 ::ObjectOperation wr;
1698 *handle = linger_op->get_cookie();
f67539c2
TL
1699 if (internal) {
1700 linger_op->handle = InternalWatchInfo(this, oid, ctx, ctx2);
1701 } else {
1702 linger_op->handle = WatchInfo(this, oid, ctx, ctx2);
1703 }
7c673cae
FG
1704
1705 prepare_assert_ops(&wr);
1706 wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
1707 bufferlist bl;
1708 objecter->linger_watch(linger_op, wr,
1709 snapc, ceph::real_clock::now(), bl,
1710 oncomplete, &c->objver);
1711
1712 return 0;
1713}
1714
1715
1716int librados::IoCtxImpl::notify_ack(
1717 const object_t& oid,
1718 uint64_t notify_id,
1719 uint64_t cookie,
1720 bufferlist& bl)
1721{
1722 ::ObjectOperation rd;
1723 prepare_assert_ops(&rd);
1724 rd.notify_ack(notify_id, cookie, bl);
f67539c2 1725 objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, extra_op_flags, 0, 0);
7c673cae
FG
1726 return 0;
1727}
1728
1729int librados::IoCtxImpl::watch_check(uint64_t cookie)
1730{
f67539c2
TL
1731 auto linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1732 auto r = objecter->linger_check(linger_op);
1733 if (r)
1734 return 1 + std::chrono::duration_cast<
1735 std::chrono::milliseconds>(*r).count();
1736 else
1737 return ceph::from_error_code(r.error());
7c673cae
FG
1738}
1739
1740int librados::IoCtxImpl::unwatch(uint64_t cookie)
1741{
1742 Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1743 C_SaferCond onfinish;
1744 version_t ver = 0;
1745
1746 ::ObjectOperation wr;
1747 prepare_assert_ops(&wr);
1748 wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
1749 objecter->mutate(linger_op->target.base_oid, oloc, wr,
f67539c2 1750 snapc, ceph::real_clock::now(), extra_op_flags,
7c673cae
FG
1751 &onfinish, &ver);
1752 objecter->linger_cancel(linger_op);
1753
1754 int r = onfinish.wait();
1755 set_sync_op_version(ver);
1756 return r;
1757}
1758
1759int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c)
1760{
1761 c->io = this;
1762 Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1763 Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);
1764
1765 ::ObjectOperation wr;
1766 prepare_assert_ops(&wr);
1767 wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
1768 objecter->mutate(linger_op->target.base_oid, oloc, wr,
f67539c2 1769 snapc, ceph::real_clock::now(), extra_op_flags,
7c673cae
FG
1770 oncomplete, &c->objver);
1771 return 0;
1772}
1773
1774int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
1775 uint64_t timeout_ms,
1776 bufferlist *preply_bl,
1777 char **preply_buf, size_t *preply_buf_len)
1778{
1779 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1780
1781 C_SaferCond notify_finish_cond;
f67539c2
TL
1782 linger_op->on_notify_finish =
1783 Objecter::LingerOp::OpComp::create(
1784 objecter->service.get_executor(),
1785 CB_notify_Finish(client->cct, &notify_finish_cond,
1786 objecter, linger_op, preply_bl,
1787 preply_buf, preply_buf_len));
7c673cae
FG
1788 uint32_t timeout = notify_timeout;
1789 if (timeout_ms)
1790 timeout = timeout_ms / 1000;
1791
1792 // Construct RADOS op
1793 ::ObjectOperation rd;
1794 prepare_assert_ops(&rd);
1795 bufferlist inbl;
1796 rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
1797
1798 // Issue RADOS op
1799 C_SaferCond onack;
1800 version_t objver;
1801 objecter->linger_notify(linger_op,
1802 rd, snap_seq, inbl, NULL,
1803 &onack, &objver);
1804
1805 ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
1806 int r = onack.wait();
1807 ldout(client->cct, 10) << __func__ << " linger op " << linger_op
1808 << " acked (" << r << ")" << dendl;
1809
1810 if (r == 0) {
1811 ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
1812 << linger_op << dendl;
1813 r = notify_finish_cond.wait();
1814
1815 } else {
1816 ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
1817 << r << dendl;
28e407b8 1818 notify_finish_cond.wait();
7c673cae
FG
1819 }
1820
1821 objecter->linger_cancel(linger_op);
1822
1823 set_sync_op_version(objver);
1824 return r;
1825}
1826
1827int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
1828 bufferlist& bl, uint64_t timeout_ms,
1829 bufferlist *preply_bl, char **preply_buf,
1830 size_t *preply_buf_len)
1831{
1832 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1833
1834 c->io = this;
1835
1836 C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op);
f67539c2
TL
1837 linger_op->on_notify_finish =
1838 Objecter::LingerOp::OpComp::create(
1839 objecter->service.get_executor(),
1840 CB_notify_Finish(client->cct, oncomplete,
1841 objecter, linger_op,
1842 preply_bl, preply_buf,
1843 preply_buf_len));
1844 Context *onack = new C_aio_notify_Ack(client->cct, oncomplete);
7c673cae
FG
1845
1846 uint32_t timeout = notify_timeout;
1847 if (timeout_ms)
1848 timeout = timeout_ms / 1000;
1849
1850 // Construct RADOS op
1851 ::ObjectOperation rd;
1852 prepare_assert_ops(&rd);
1853 bufferlist inbl;
1854 rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
1855
1856 // Issue RADOS op
1857 objecter->linger_notify(linger_op,
1858 rd, snap_seq, inbl, NULL,
1859 onack, &c->objver);
1860 return 0;
1861}
1862
1863int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
1864 uint64_t expected_object_size,
1865 uint64_t expected_write_size,
1866 uint32_t flags)
1867{
1868 ::ObjectOperation wr;
1869 prepare_assert_ops(&wr);
1870 wr.set_alloc_hint(expected_object_size, expected_write_size, flags);
1871 return operate(oid, &wr, NULL);
1872}
1873
1874version_t librados::IoCtxImpl::last_version()
1875{
1876 return last_objver;
1877}
1878
1879void librados::IoCtxImpl::set_assert_version(uint64_t ver)
1880{
1881 assert_ver = ver;
1882}
1883
1884void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout)
1885{
1886 notify_timeout = timeout;
1887}
1888
1889int librados::IoCtxImpl::cache_pin(const object_t& oid)
1890{
1891 ::ObjectOperation wr;
1892 prepare_assert_ops(&wr);
1893 wr.cache_pin();
1894 return operate(oid, &wr, NULL);
1895}
1896
1897int librados::IoCtxImpl::cache_unpin(const object_t& oid)
1898{
1899 ::ObjectOperation wr;
1900 prepare_assert_ops(&wr);
1901 wr.cache_unpin();
1902 return operate(oid, &wr, NULL);
1903}
1904
1905
1906///////////////////////////// C_aio_stat_Ack ////////////////////////////
1907
1908librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl *_c,
1909 time_t *pm)
1910 : c(_c), pmtime(pm)
1911{
11fdf7f2 1912 ceph_assert(!c->io);
7c673cae
FG
1913 c->get();
1914}
1915
1916void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r)
1917{
9f95a23c 1918 c->lock.lock();
7c673cae
FG
1919 c->rval = r;
1920 c->complete = true;
9f95a23c 1921 c->cond.notify_all();
7c673cae
FG
1922
1923 if (r >= 0 && pmtime) {
1924 *pmtime = real_clock::to_time_t(mtime);
1925 }
1926
1927 if (c->callback_complete) {
f67539c2 1928 boost::asio::defer(c->io->client->finish_strand, CB_AioComplete(c));
7c673cae
FG
1929 }
1930
1931 c->put_unlock();
1932}
1933
1934///////////////////////////// C_aio_stat2_Ack ////////////////////////////
1935
1936librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl *_c,
1937 struct timespec *pt)
1938 : c(_c), pts(pt)
1939{
11fdf7f2 1940 ceph_assert(!c->io);
7c673cae
FG
1941 c->get();
1942}
1943
1944void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r)
1945{
9f95a23c 1946 c->lock.lock();
7c673cae
FG
1947 c->rval = r;
1948 c->complete = true;
9f95a23c 1949 c->cond.notify_all();
7c673cae
FG
1950
1951 if (r >= 0 && pts) {
1952 *pts = real_clock::to_timespec(mtime);
1953 }
1954
1955 if (c->callback_complete) {
f67539c2 1956 boost::asio::defer(c->io->client->finish_strand, CB_AioComplete(c));
7c673cae
FG
1957 }
1958
1959 c->put_unlock();
1960}
1961
1962//////////////////////////// C_aio_Complete ////////////////////////////////
1963
1964librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl *_c)
1965 : c(_c)
1966{
1967 c->get();
1968}
1969
1970void librados::IoCtxImpl::C_aio_Complete::finish(int r)
1971{
9f95a23c 1972 c->lock.lock();
11fdf7f2
TL
1973 // Leave an existing rval unless r != 0
1974 if (r)
1975 c->rval = r; // This clears the error set in C_ObjectOperation_scrub_ls::finish()
7c673cae 1976 c->complete = true;
9f95a23c 1977 c->cond.notify_all();
7c673cae
FG
1978
1979 if (r == 0 && c->blp && c->blp->length() > 0) {
1adf2230
AA
1980 if (c->out_buf && !c->blp->is_contiguous()) {
1981 c->rval = -ERANGE;
1982 } else {
11fdf7f2 1983 if (c->out_buf && !c->blp->is_provided_buffer(c->out_buf))
9f95a23c 1984 c->blp->begin().copy(c->blp->length(), c->out_buf);
11fdf7f2 1985
1adf2230
AA
1986 c->rval = c->blp->length();
1987 }
7c673cae
FG
1988 }
1989
1990 if (c->callback_complete ||
1991 c->callback_safe) {
f67539c2 1992 boost::asio::defer(c->io->client->finish_strand, CB_AioComplete(c));
7c673cae
FG
1993 }
1994
1995 if (c->aio_write_seq) {
1996 c->io->complete_aio_write(c);
1997 }
1998
9f95a23c 1999#if defined(WITH_EVENTTRACE)
7c673cae
FG
2000 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_OP_COMPLETE");
2001#endif
2002 c->put_unlock();
2003}
2004
2005void librados::IoCtxImpl::object_list_slice(
2006 const hobject_t start,
2007 const hobject_t finish,
2008 const size_t n,
2009 const size_t m,
2010 hobject_t *split_start,
2011 hobject_t *split_finish)
2012{
2013 if (start.is_max()) {
2014 *split_start = hobject_t::get_max();
2015 *split_finish = hobject_t::get_max();
2016 return;
2017 }
2018
2019 uint64_t start_hash = hobject_t::_reverse_bits(start.get_hash());
2020 uint64_t finish_hash =
2021 finish.is_max() ? 0x100000000 :
2022 hobject_t::_reverse_bits(finish.get_hash());
2023
2024 uint64_t diff = finish_hash - start_hash;
2025 uint64_t rev_start = start_hash + (diff * n / m);
2026 uint64_t rev_finish = start_hash + (diff * (n + 1) / m);
2027 if (n == 0) {
2028 *split_start = start;
2029 } else {
2030 *split_start = hobject_t(
2031 object_t(), string(), CEPH_NOSNAP,
2032 hobject_t::_reverse_bits(rev_start), poolid, string());
2033 }
2034
2035 if (n == m - 1)
2036 *split_finish = finish;
2037 else if (rev_finish >= 0x100000000)
2038 *split_finish = hobject_t::get_max();
2039 else
2040 *split_finish = hobject_t(
2041 object_t(), string(), CEPH_NOSNAP,
2042 hobject_t::_reverse_bits(rev_finish), poolid, string());
2043}
2044
c07f9fc5
FG
2045int librados::IoCtxImpl::application_enable(const std::string& app_name,
2046 bool force)
2047{
2048 auto c = new PoolAsyncCompletionImpl();
2049 application_enable_async(app_name, force, c);
2050
2051 int r = c->wait();
11fdf7f2 2052 ceph_assert(r == 0);
c07f9fc5
FG
2053
2054 r = c->get_return_value();
2055 c->release();
f67539c2 2056 c->put();
c07f9fc5
FG
2057 if (r < 0) {
2058 return r;
2059 }
2060
2061 return client->wait_for_latest_osdmap();
2062}
2063
2064void librados::IoCtxImpl::application_enable_async(const std::string& app_name,
2065 bool force,
2066 PoolAsyncCompletionImpl *c)
2067{
2068 // pre-Luminous clusters will return -EINVAL and application won't be
2069 // preserved until Luminous is configured as minimim version.
2070 if (!client->get_required_monitor_features().contains_all(
2071 ceph::features::mon::FEATURE_LUMINOUS)) {
f67539c2
TL
2072 boost::asio::defer(client->finish_strand,
2073 [cb = CB_PoolAsync_Safe(c)]() mutable {
2074 cb(-EOPNOTSUPP);
2075 });
c07f9fc5
FG
2076 return;
2077 }
2078
2079 std::stringstream cmd;
2080 cmd << "{"
2081 << "\"prefix\": \"osd pool application enable\","
2082 << "\"pool\": \"" << get_cached_pool_name() << "\","
2083 << "\"app\": \"" << app_name << "\"";
2084 if (force) {
11fdf7f2 2085 cmd << ",\"yes_i_really_mean_it\": true";
c07f9fc5
FG
2086 }
2087 cmd << "}";
2088
2089 std::vector<std::string> cmds;
2090 cmds.push_back(cmd.str());
2091 bufferlist inbl;
2092 client->mon_command_async(cmds, inbl, nullptr, nullptr,
f67539c2 2093 make_lambda_context(CB_PoolAsync_Safe(c)));
c07f9fc5
FG
2094}
2095
2096int librados::IoCtxImpl::application_list(std::set<std::string> *app_names)
2097{
2098 int r = 0;
2099 app_names->clear();
2100 objecter->with_osdmap([&](const OSDMap& o) {
2101 auto pg_pool = o.get_pg_pool(poolid);
2102 if (pg_pool == nullptr) {
2103 r = -ENOENT;
2104 return;
2105 }
2106
2107 for (auto &pair : pg_pool->application_metadata) {
2108 app_names->insert(pair.first);
2109 }
2110 });
2111 return r;
2112}
2113
2114int librados::IoCtxImpl::application_metadata_get(const std::string& app_name,
2115 const std::string &key,
2116 std::string* value)
2117{
2118 int r = 0;
2119 objecter->with_osdmap([&](const OSDMap& o) {
2120 auto pg_pool = o.get_pg_pool(poolid);
2121 if (pg_pool == nullptr) {
2122 r = -ENOENT;
2123 return;
2124 }
2125
2126 auto app_it = pg_pool->application_metadata.find(app_name);
2127 if (app_it == pg_pool->application_metadata.end()) {
2128 r = -ENOENT;
2129 return;
2130 }
2131
2132 auto it = app_it->second.find(key);
2133 if (it == app_it->second.end()) {
2134 r = -ENOENT;
2135 return;
2136 }
2137
2138 *value = it->second;
2139 });
2140 return r;
2141}
2142
2143int librados::IoCtxImpl::application_metadata_set(const std::string& app_name,
2144 const std::string &key,
2145 const std::string& value)
2146{
2147 std::stringstream cmd;
2148 cmd << "{"
2149 << "\"prefix\":\"osd pool application set\","
2150 << "\"pool\":\"" << get_cached_pool_name() << "\","
2151 << "\"app\":\"" << app_name << "\","
2152 << "\"key\":\"" << key << "\","
2153 << "\"value\":\"" << value << "\""
2154 << "}";
2155
2156 std::vector<std::string> cmds;
2157 cmds.push_back(cmd.str());
2158 bufferlist inbl;
2159 int r = client->mon_command(cmds, inbl, nullptr, nullptr);
2160 if (r < 0) {
2161 return r;
2162 }
2163
2164 // ensure we have the latest osd map epoch before proceeding
2165 return client->wait_for_latest_osdmap();
2166}
2167
2168int librados::IoCtxImpl::application_metadata_remove(const std::string& app_name,
2169 const std::string &key)
2170{
2171 std::stringstream cmd;
2172 cmd << "{"
2173 << "\"prefix\":\"osd pool application rm\","
2174 << "\"pool\":\"" << get_cached_pool_name() << "\","
2175 << "\"app\":\"" << app_name << "\","
2176 << "\"key\":\"" << key << "\""
2177 << "}";
2178
2179 std::vector<std::string> cmds;
2180 cmds.push_back(cmd.str());
2181 bufferlist inbl;
2182 int r = client->mon_command(cmds, inbl, nullptr, nullptr);
2183 if (r < 0) {
2184 return r;
2185 }
2186
2187 // ensure we have the latest osd map epoch before proceeding
2188 return client->wait_for_latest_osdmap();
2189}
2190
2191int librados::IoCtxImpl::application_metadata_list(const std::string& app_name,
2192 std::map<std::string, std::string> *values)
2193{
2194 int r = 0;
2195 values->clear();
2196 objecter->with_osdmap([&](const OSDMap& o) {
2197 auto pg_pool = o.get_pg_pool(poolid);
2198 if (pg_pool == nullptr) {
2199 r = -ENOENT;
2200 return;
2201 }
2202
2203 auto it = pg_pool->application_metadata.find(app_name);
2204 if (it == pg_pool->application_metadata.end()) {
2205 r = -ENOENT;
2206 return;
2207 }
2208
2209 *values = it->second;
2210 });
2211 return r;
2212}
2213