]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | /* | |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright (C) 2020 Red Hat, Inc | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | */ | |
14 | ||
15 | #include "rgw_sync_error_repo.h" | |
16 | #include "rgw_coroutine.h" | |
17 | #include "rgw_sal.h" | |
18 | #include "services/svc_rados.h" | |
19 | #include "cls/cmpomap/client.h" | |
20 | ||
21 | ceph::real_time rgw_error_repo_decode_value(const bufferlist& bl) | |
22 | { | |
23 | uint64_t value; | |
24 | try { | |
25 | using ceph::decode; | |
26 | decode(value, bl); | |
27 | } catch (const buffer::error&) { | |
28 | value = 0; // empty buffer = 0 | |
29 | } | |
30 | return ceph::real_clock::zero() + ceph::timespan(value); | |
31 | } | |
32 | ||
33 | int rgw_error_repo_write(librados::ObjectWriteOperation& op, | |
34 | const std::string& key, | |
35 | ceph::real_time timestamp) | |
36 | { | |
37 | // overwrite the existing timestamp if value is greater | |
38 | const uint64_t value = timestamp.time_since_epoch().count(); | |
39 | using namespace cls::cmpomap; | |
40 | const bufferlist zero = u64_buffer(0); // compare against 0 for missing keys | |
41 | return cmp_set_vals(op, Mode::U64, Op::GT, {{key, u64_buffer(value)}}, zero); | |
42 | } | |
43 | ||
44 | int rgw_error_repo_remove(librados::ObjectWriteOperation& op, | |
45 | const std::string& key, | |
46 | ceph::real_time timestamp) | |
47 | { | |
48 | // remove the omap key if value >= existing | |
49 | const uint64_t value = timestamp.time_since_epoch().count(); | |
50 | using namespace cls::cmpomap; | |
51 | return cmp_rm_keys(op, Mode::U64, Op::GTE, {{key, u64_buffer(value)}}); | |
52 | } | |
53 | ||
54 | class RGWErrorRepoWriteCR : public RGWSimpleCoroutine { | |
55 | RGWSI_RADOS::Obj obj; | |
56 | std::string key; | |
57 | ceph::real_time timestamp; | |
58 | ||
59 | boost::intrusive_ptr<RGWAioCompletionNotifier> cn; | |
60 | public: | |
61 | RGWErrorRepoWriteCR(RGWSI_RADOS* rados, const rgw_raw_obj& raw_obj, | |
62 | const std::string& key, ceph::real_time timestamp) | |
63 | : RGWSimpleCoroutine(rados->ctx()), | |
64 | obj(rados->obj(raw_obj)), | |
65 | key(key), timestamp(timestamp) | |
66 | {} | |
67 | ||
b3b6e05e | 68 | int send_request(const DoutPrefixProvider *dpp) override { |
f67539c2 TL |
69 | librados::ObjectWriteOperation op; |
70 | int r = rgw_error_repo_write(op, key, timestamp); | |
71 | if (r < 0) { | |
72 | return r; | |
73 | } | |
b3b6e05e | 74 | r = obj.open(dpp); |
f67539c2 TL |
75 | if (r < 0) { |
76 | return r; | |
77 | } | |
78 | ||
79 | cn = stack->create_completion_notifier(); | |
80 | return obj.aio_operate(cn->completion(), &op); | |
81 | } | |
82 | ||
83 | int request_complete() override { | |
84 | return cn->completion()->get_return_value(); | |
85 | } | |
86 | }; | |
87 | ||
88 | RGWCoroutine* rgw_error_repo_write_cr(RGWSI_RADOS* rados, | |
89 | const rgw_raw_obj& obj, | |
90 | const std::string& key, | |
91 | ceph::real_time timestamp) | |
92 | { | |
93 | return new RGWErrorRepoWriteCR(rados, obj, key, timestamp); | |
94 | } | |
95 | ||
96 | ||
97 | class RGWErrorRepoRemoveCR : public RGWSimpleCoroutine { | |
98 | RGWSI_RADOS::Obj obj; | |
99 | std::string key; | |
100 | ceph::real_time timestamp; | |
101 | ||
102 | boost::intrusive_ptr<RGWAioCompletionNotifier> cn; | |
103 | public: | |
104 | RGWErrorRepoRemoveCR(RGWSI_RADOS* rados, const rgw_raw_obj& raw_obj, | |
105 | const std::string& key, ceph::real_time timestamp) | |
106 | : RGWSimpleCoroutine(rados->ctx()), | |
107 | obj(rados->obj(raw_obj)), | |
108 | key(key), timestamp(timestamp) | |
109 | {} | |
110 | ||
b3b6e05e | 111 | int send_request(const DoutPrefixProvider *dpp) override { |
f67539c2 TL |
112 | librados::ObjectWriteOperation op; |
113 | int r = rgw_error_repo_remove(op, key, timestamp); | |
114 | if (r < 0) { | |
115 | return r; | |
116 | } | |
b3b6e05e | 117 | r = obj.open(dpp); |
f67539c2 TL |
118 | if (r < 0) { |
119 | return r; | |
120 | } | |
121 | ||
122 | cn = stack->create_completion_notifier(); | |
123 | return obj.aio_operate(cn->completion(), &op); | |
124 | } | |
125 | ||
126 | int request_complete() override { | |
127 | return cn->completion()->get_return_value(); | |
128 | } | |
129 | }; | |
130 | ||
131 | RGWCoroutine* rgw_error_repo_remove_cr(RGWSI_RADOS* rados, | |
132 | const rgw_raw_obj& obj, | |
133 | const std::string& key, | |
134 | ceph::real_time timestamp) | |
135 | { | |
136 | return new RGWErrorRepoRemoveCR(rados, obj, key, timestamp); | |
137 | } |