*
*/
+#include <algorithm>
#include <cstdint>
#include <numeric>
#include <optional>
op->exec(fifo::op::CLASS, fifo::op::UPDATE_META, in);
}
-void part_init(lr::ObjectWriteOperation* op, std::string_view tag,
- fifo::data_params params)
+void part_init(lr::ObjectWriteOperation* op, fifo::data_params params)
{
fifo::op::init_part ip;
- ip.tag = tag;
ip.params = params;
cb::list in;
op->exec(fifo::op::CLASS, fifo::op::INIT_PART, in);
}
-int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
+int push_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
std::deque<cb::list> data_bufs, std::uint64_t tid,
optional_yield y)
{
lr::ObjectWriteOperation op;
fifo::op::push_part pp;
- pp.tag = tag;
+ op.assert_exists();
+
pp.data_bufs = data_bufs;
pp.total_len = 0;
return retval;
}
-void push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
+void push_part(lr::IoCtx& ioctx, const std::string& oid,
std::deque<cb::list> data_bufs, std::uint64_t tid,
lr::AioCompletion* c)
{
lr::ObjectWriteOperation op;
fifo::op::push_part pp;
- pp.tag = tag;
pp.data_bufs = data_bufs;
pp.total_len = 0;
}
void trim_part(lr::ObjectWriteOperation* op,
- std::optional<std::string_view> tag,
std::uint64_t ofs, bool exclusive)
{
fifo::op::trim_part tp;
- tp.tag = tag;
tp.ofs = ofs;
tp.exclusive = exclusive;
}
int list_part(const DoutPrefixProvider *dpp, lr::IoCtx& ioctx, const std::string& oid,
- std::optional<std::string_view> tag, std::uint64_t ofs,
- std::uint64_t max_entries,
+ std::uint64_t ofs, std::uint64_t max_entries,
std::vector<fifo::part_list_entry>* entries,
- bool* more, bool* full_part, std::string* ptag,
+ bool* more, bool* full_part,
std::uint64_t tid, optional_yield y)
{
lr::ObjectReadOperation op;
fifo::op::list_part lp;
- lp.tag = tag;
lp.ofs = ofs;
lp.max_entries = max_entries;
if (entries) *entries = std::move(reply.entries);
if (more) *more = reply.more;
if (full_part) *full_part = reply.full_part;
- if (ptag) *ptag = reply.tag;
} catch (const cb::error& err) {
ldpp_dout(dpp, -1)
<< __PRETTY_FUNCTION__ << ":" << __LINE__
std::vector<fifo::part_list_entry>* entries;
bool* more;
bool* full_part;
- std::string* ptag;
std::uint64_t tid;
list_entry_completion(CephContext* cct, int* r_out, std::vector<fifo::part_list_entry>* entries,
- bool* more, bool* full_part, std::string* ptag,
- std::uint64_t tid)
+ bool* more, bool* full_part, std::uint64_t tid)
: cct(cct), r_out(r_out), entries(entries), more(more),
- full_part(full_part), ptag(ptag), tid(tid) {}
+ full_part(full_part), tid(tid) {}
virtual ~list_entry_completion() = default;
void handle_completion(int r, bufferlist& bl) override {
if (r >= 0) try {
if (entries) *entries = std::move(reply.entries);
if (more) *more = reply.more;
if (full_part) *full_part = reply.full_part;
- if (ptag) *ptag = reply.tag;
} catch (const cb::error& err) {
lderr(cct)
<< __PRETTY_FUNCTION__ << ":" << __LINE__
};
lr::ObjectReadOperation list_part(CephContext* cct,
- std::optional<std::string_view> tag,
std::uint64_t ofs,
std::uint64_t max_entries,
int* r_out,
std::vector<fifo::part_list_entry>* entries,
bool* more, bool* full_part,
- std::string* ptag, std::uint64_t tid)
+ std::uint64_t tid)
{
lr::ObjectReadOperation op;
fifo::op::list_part lp;
- lp.tag = tag;
lp.ofs = ofs;
lp.max_entries = max_entries;
encode(lp, in);
op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
new list_entry_completion(cct, r_out, entries, more, full_part,
- ptag, tid));
+ tid));
return op;
}
return m;
}
-std::string FIFO::generate_tag() const
-{
- static constexpr auto HEADER_TAG_SIZE = 16;
- return gen_rand_alphanumeric_plain(static_cast<CephContext*>(ioctx.cct()),
- HEADER_TAG_SIZE);
-}
-
-
int FIFO::apply_update(const DoutPrefixProvider *dpp,
fifo::info* info,
const fifo::objv& objv,
std::unique_lock l(m);
if (objv != info->version) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " version mismatch, canceling: tid=" << tid << dendl;
- return -ECANCELED;
- }
- auto err = info->apply_update(update);
- if (err) {
- ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " error applying update: " << *err << " tid=" << tid << dendl;
+ << " version mismatch, canceling: tid=" << tid << dendl;
return -ECANCELED;
}
- ++info->version.ver;
-
+ info->apply_update(update);
return {};
}
<< " entering: tid=" << tid << dendl;
lr::ObjectWriteOperation op;
bool canceled = false;
- update_meta(&op, info.version, update);
+ update_meta(&op, version, update);
auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
if (r >= 0 || r == -ECANCELED) {
canceled = (r == -ECANCELED);
assert(r >= 0);
}
-int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
+int FIFO::create_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
optional_yield y)
{
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
op.create(false); /* We don't need exclusivity, part_init ensures
we're creating from the same journal entry. */
std::unique_lock l(m);
- part_init(&op, tag, info.params);
+ part_init(&op, info.params);
auto oid = info.part_oid(part_num);
l.unlock();
auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
return r;
}
-int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::string_view tag, std::uint64_t tid,
+int FIFO::remove_part(const DoutPrefixProvider *dpp, int64_t part_num, std::uint64_t tid,
optional_yield y)
{
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
l.unlock();
int r = 0;
- for (auto& [n, entry] : tmpjournal) {
+ for (auto& entry : tmpjournal) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " processing entry: entry=" << entry << " tid=" << tid
<< dendl;
switch (entry.op) {
case fifo::journal_entry::Op::create:
- r = create_part(dpp, entry.part_num, entry.part_tag, tid, y);
+ r = create_part(dpp, entry.part_num, tid, y);
if (entry.part_num > new_max) {
new_max = entry.part_num;
}
}
break;
case fifo::journal_entry::Op::remove:
- r = remove_part(dpp, entry.part_num, entry.part_tag, tid, y);
+ r = remove_part(dpp, entry.part_num, tid, y);
if (r == -ENOENT) r = 0;
if (entry.part_num >= new_tail) {
new_tail = entry.part_num + 1;
<< " update canceled, retrying: i=" << i << " tid="
<< tid << dendl;
for (auto& e : processed) {
- auto jiter = info.journal.find(e.part_num);
- /* journal entry was already processed */
- if (jiter == info.journal.end() ||
- !(jiter->second == e)) {
- continue;
+ if (info.journal.contains(e)) {
+ new_processed.push_back(e);
}
- new_processed.push_back(e);
}
processed = std::move(new_processed);
}
return r;
}
-int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, optional_yield y)
+int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp,
+ std::int64_t new_part_num, bool is_head,
+ std::uint64_t tid, optional_yield y)
{
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
std::unique_lock l(m);
- std::vector jentries = { info.next_journal_entry(generate_tag()) };
- if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
+ std::vector<fifo::journal_entry> jentries{{ fifo::journal_entry::Op::create, new_part_num }};
+ if (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) &&
+ (!is_head || info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}))) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " new part journaled, but not processed: tid="
}
return r;
}
- std::int64_t new_head_part_num = info.head_part_num;
auto version = info.version;
if (is_head) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " needs new head: tid=" << tid << dendl;
- auto new_head_jentry = jentries.front();
- new_head_jentry.op = fifo::journal_entry::Op::set_head;
- new_head_part_num = jentries.front().part_num;
- jentries.push_back(std::move(new_head_jentry));
+ jentries.push_back({ fifo::journal_entry::Op::set_head, new_part_num });
}
l.unlock();
r = _update_meta(dpp, u, version, &canceled, tid, y);
if (r >= 0 && canceled) {
std::unique_lock l(m);
- auto found = (info.journal.find(jentries.front().part_num) !=
- info.journal.end());
- if ((info.max_push_part_num >= jentries.front().part_num &&
- info.head_part_num >= new_head_part_num)) {
+ version = info.version;
+ auto found = (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) ||
+ info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}));
+ if ((info.max_push_part_num >= new_part_num &&
+ info.head_part_num >= new_part_num)) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " raced, but journaled and processed: i=" << i
<< " tid=" << tid << dendl;
return r;
}
-int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y)
+int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp,
+ std::int64_t new_head_part_num,
+ std::uint64_t tid, optional_yield y)
{
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " entering: tid=" << tid << dendl;
+ << " entering: tid=" << tid << dendl;
std::unique_lock l(m);
- std::int64_t new_head_num = info.head_part_num + 1;
auto max_push_part_num = info.max_push_part_num;
auto version = info.version;
l.unlock();
int r = 0;
- if (max_push_part_num < new_head_num) {
+ if (max_push_part_num < new_head_part_num) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " need new part: tid=" << tid << dendl;
- r = _prepare_new_part(dpp, true, tid, y);
+ r = _prepare_new_part(dpp, new_head_part_num, true, tid, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " _prepare_new_part failed: r=" << r
return r;
}
std::unique_lock l(m);
- if (info.max_push_part_num < new_head_num) {
+ if (info.max_push_part_num < new_head_part_num) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " inconsistency, push part less than head part: "
<< " tid=" << tid << dendl;
return 0;
}
+ fifo::journal_entry jentry;
+ jentry.op = fifo::journal_entry::Op::set_head;
+ jentry.part_num = new_head_part_num;
+
+ r = 0;
bool canceled = true;
for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+ canceled = false;
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " updating head: i=" << i << " tid=" << tid << dendl;
- auto u = fifo::update{}.head_part_num(new_head_num);
+ << " updating metadata: i=" << i << " tid=" << tid << dendl;
+ auto u = fifo::update{}.journal_entries_add({{ jentry }});
r = _update_meta(dpp, u, version, &canceled, tid, y);
+ if (r >= 0 && canceled) {
+ std::unique_lock l(m);
+ auto found = (info.journal.contains({fifo::journal_entry::Op::create, new_head_part_num}) ||
+ info.journal.contains({fifo::journal_entry::Op::set_head, new_head_part_num}));
+ version = info.version;
+ if ((info.head_part_num >= new_head_part_num)) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, but journaled and processed: i=" << i
+ << " tid=" << tid << dendl;
+ return 0;
+ }
+ if (found) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, journaled but not processed: i=" << i
+ << " tid=" << tid << dendl;
+ canceled = false;
+ }
+ l.unlock();
+ }
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " _update_meta failed: update=" << u << " r=" << r
<< " tid=" << tid << dendl;
return r;
}
- std::unique_lock l(m);
- auto head_part_num = info.head_part_num;
- version = info.version;
- l.unlock();
- if (canceled && (head_part_num >= new_head_num)) {
- ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " raced, but completed by the other caller: i=" << i
- << " tid=" << tid << dendl;
- canceled = false;
- }
}
if (canceled) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " canceled too many times, giving up: tid=" << tid << dendl;
return -ECANCELED;
}
- return 0;
+ r = process_journal(dpp, tid, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " process_journal failed: r=" << r << " tid=" << tid << dendl;
+ }
+ return r;
}
struct NewPartPreparer : public Completion<NewPartPreparer> {
FIFO* f;
std::vector<fifo::journal_entry> jentries;
int i = 0;
- std::int64_t new_head_part_num;
+ std::int64_t new_part_num;
bool canceled = false;
uint64_t tid;
NewPartPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
std::vector<fifo::journal_entry> jentries,
- std::int64_t new_head_part_num,
+ std::int64_t new_part_num,
std::uint64_t tid)
: Completion(dpp, super), f(f), jentries(std::move(jentries)),
- new_head_part_num(new_head_part_num), tid(tid) {}
+ new_part_num(new_part_num), tid(tid) {}
void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
if (canceled) {
std::unique_lock l(f->m);
- auto iter = f->info.journal.find(jentries.front().part_num);
+ auto found = (f->info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) ||
+ f->info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}));
auto max_push_part_num = f->info.max_push_part_num;
auto head_part_num = f->info.head_part_num;
auto version = f->info.version;
- auto found = (iter != f->info.journal.end());
l.unlock();
- if ((max_push_part_num >= jentries.front().part_num &&
- head_part_num >= new_head_part_num)) {
+ if ((max_push_part_num >= new_part_num &&
+ head_part_num >= new_part_num)) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " raced, but journaled and processed: i=" << i
<< " tid=" << tid << dendl;
}
};
-void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid,
- lr::AioCompletion* c)
+void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num,
+ bool is_head, std::uint64_t tid, lr::AioCompletion* c)
{
std::unique_lock l(m);
- std::vector jentries = { info.next_journal_entry(generate_tag()) };
- if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
+ std::vector<fifo::journal_entry> jentries{{fifo::journal_entry::Op::create, new_part_num}};
+ if (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) &&
+ (!is_head || info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}))) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " new part journaled, but not processed: tid="
process_journal(dpp, tid, c);
return;
}
- std::int64_t new_head_part_num = info.head_part_num;
auto version = info.version;
if (is_head) {
- auto new_head_jentry = jentries.front();
- new_head_jentry.op = fifo::journal_entry::Op::set_head;
- new_head_part_num = jentries.front().part_num;
- jentries.push_back(std::move(new_head_jentry));
+ jentries.push_back({ fifo::journal_entry::Op::set_head, new_part_num });
}
l.unlock();
auto n = std::make_unique<NewPartPreparer>(dpp, this, c, jentries,
- new_head_part_num, tid);
+ new_part_num, tid);
auto np = n.get();
_update_meta(dpp, fifo::update{}.journal_entries_add(jentries), version,
&np->canceled, tid, NewPartPreparer::call(std::move(n)));
FIFO* f;
int i = 0;
bool newpart;
- std::int64_t new_head_num;
+ std::int64_t new_head_part_num;
bool canceled = false;
std::uint64_t tid;
NewHeadPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
- bool newpart, std::int64_t new_head_num, std::uint64_t tid)
- : Completion(dpp, super), f(f), newpart(newpart), new_head_num(new_head_num),
- tid(tid) {}
+ bool newpart, std::int64_t new_head_part_num,
+ std::uint64_t tid)
+ : Completion(dpp, super), f(f), newpart(newpart),
+ new_head_part_num(new_head_part_num), tid(tid) {}
void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
if (newpart)
return;
}
std::unique_lock l(f->m);
- if (f->info.max_push_part_num < new_head_num) {
+ if (f->info.max_push_part_num < new_head_part_num) {
l.unlock();
lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " _prepare_new_part failed: r=" << r
}
void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
- std::unique_lock l(f->m);
- auto head_part_num = f->info.head_part_num;
- auto version = f->info.version;
- l.unlock();
-
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " entering: tid=" << tid << dendl;
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " _update_meta failed: r=" << r
+ << " _update_meta failed: r=" << r
<< " tid=" << tid << dendl;
complete(std::move(p), r);
return;
}
+
if (canceled) {
+ std::unique_lock l(f->m);
+ auto found = (f->info.journal.contains({fifo::journal_entry::Op::create, new_head_part_num }) ||
+ f->info.journal.contains({fifo::journal_entry::Op::set_head, new_head_part_num }));
+ auto head_part_num = f->info.head_part_num;
+ auto version = f->info.version;
+
+ l.unlock();
+ if ((head_part_num >= new_head_part_num)) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, but journaled and processed: i=" << i
+ << " tid=" << tid << dendl;
+ complete(std::move(p), 0);
+ return;
+ }
if (i >= MAX_RACE_RETRIES) {
- ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " canceled too many times, giving up: tid=" << tid << dendl;
complete(std::move(p), -ECANCELED);
return;
}
-
- // Raced, but there's still work to do!
- if (head_part_num < new_head_num) {
- canceled = false;
+ if (!found) {
++i;
- ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " updating head: i=" << i << " tid=" << tid << dendl;
- f->_update_meta(dpp, fifo::update{}.head_part_num(new_head_num),
- version, &this->canceled, tid, call(std::move(p)));
+ fifo::journal_entry jentry;
+ jentry.op = fifo::journal_entry::Op::set_head;
+ jentry.part_num = new_head_part_num;
+ f->_update_meta(dpp, fifo::update{}
+ .journal_entries_add({{jentry}}),
+ version, &canceled, tid, call(std::move(p)));
return;
+ } else {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " raced, journaled but not processed: i=" << i
+ << " tid=" << tid << dendl;
+ canceled = false;
}
+ // Fall through. We still need to process the journal.
}
- ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " succeeded : i=" << i << " tid=" << tid << dendl;
- complete(std::move(p), 0);
+ f->process_journal(dpp, tid, super());
return;
}
};
-void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c)
+void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num,
+ std::uint64_t tid, lr::AioCompletion* c)
{
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
std::unique_lock l(m);
- int64_t new_head_num = info.head_part_num + 1;
auto max_push_part_num = info.max_push_part_num;
auto version = info.version;
l.unlock();
- if (max_push_part_num < new_head_num) {
+ if (max_push_part_num < new_head_part_num) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " need new part: tid=" << tid << dendl;
- auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_num,
+ auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_part_num,
tid);
- _prepare_new_part(dpp, true, tid, NewHeadPreparer::call(std::move(n)));
+ _prepare_new_part(dpp, new_head_part_num, true, tid,
+ NewHeadPreparer::call(std::move(n)));
} else {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " updating head: tid=" << tid << dendl;
- auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_num,
+ auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_part_num,
tid);
auto np = n.get();
- _update_meta(dpp, fifo::update{}.head_part_num(new_head_num), version,
+ fifo::journal_entry jentry;
+ jentry.op = fifo::journal_entry::Op::set_head;
+ jentry.part_num = new_head_part_num;
+ _update_meta(dpp, fifo::update{}.journal_entries_add({{jentry}}), version,
&np->canceled, tid, NewHeadPreparer::call(std::move(n)));
}
}
<< " entering: tid=" << tid << dendl;
std::unique_lock l(m);
auto head_part_num = info.head_part_num;
- auto tag = info.head_tag;
const auto part_oid = info.part_oid(head_part_num);
l.unlock();
- auto r = push_part(dpp, ioctx, part_oid, tag, data_bufs, tid, y);
+ auto r = push_part(dpp, ioctx, part_oid, data_bufs, tid, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " push_part failed: r=" << r << " tid=" << tid << dendl;
{
std::unique_lock l(m);
auto head_part_num = info.head_part_num;
- auto tag = info.head_tag;
const auto part_oid = info.part_oid(head_part_num);
l.unlock();
- push_part(ioctx, part_oid, tag, data_bufs, tid, c);
+ push_part(ioctx, part_oid, data_bufs, tid, c);
}
int FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
- std::optional<std::string_view> tag,
bool exclusive, std::uint64_t tid,
optional_yield y)
{
std::unique_lock l(m);
const auto part_oid = info.part_oid(part_num);
l.unlock();
- rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
+ rgw::cls::fifo::trim_part(&op, ofs, exclusive);
auto r = rgw_rados_operate(dpp, ioctx, part_oid, &op, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
}
void FIFO::trim_part(const DoutPrefixProvider *dpp, int64_t part_num, uint64_t ofs,
- std::optional<std::string_view> tag,
bool exclusive, std::uint64_t tid,
lr::AioCompletion* c)
{
std::unique_lock l(m);
const auto part_oid = info.part_oid(part_num);
l.unlock();
- rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
+ rgw::cls::fifo::trim_part(&op, ofs, exclusive);
auto r = ioctx.aio_operate(part_oid, c, &op);
ceph_assert(r >= 0);
}
auto tid = ++next_tid;
auto max_entry_size = info.params.max_entry_size;
auto need_new_head = info.need_new_head();
+ auto head_part_num = info.head_part_num;
l.unlock();
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
if (need_new_head) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " need new head tid=" << tid << dendl;
- r = _prepare_new_head(dpp, tid, y);
+ r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " _prepare_new_head failed: r=" << r
<< " batch=" << batch.size() << " retries=" << retries
<< " tid=" << tid << dendl;
std::unique_lock l(m);
+ head_part_num = info.head_part_num;
auto max_part_size = info.params.max_part_size;
auto overhead = part_entry_overhead;
l.unlock();
canceled = true;
++retries;
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << " need new head tid=" << tid << dendl;
- r = _prepare_new_head(dpp, tid, y);
+ << " need new head tid=" << tid << dendl;
+ r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " prepare_new_head failed: r=" << r
r = 0;
continue;
}
+ if (r == -ENOENT) {
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " racing client trimmed part, rereading metadata "
+ << "tid=" << tid << dendl;
+ canceled = true;
+ ++retries;
+ r = read_meta(dpp, y);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " read_meta failed: r=" << r
+ << " tid=" << tid << dendl;
+ return r;
+ }
+ r = 0;
+ continue;
+ }
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " push_entries failed: r=" << r
std::deque<cb::list> remaining;
std::deque<cb::list> batch;
int i = 0;
+ std::int64_t head_part_num;
std::uint64_t tid;
- bool new_heading = false;
+ enum { pushing, new_heading, meta_reading } state = pushing;
void prep_then_push(const DoutPrefixProvider *dpp, Ptr&& p, const unsigned successes) {
std::unique_lock l(f->m);
auto max_part_size = f->info.params.max_part_size;
auto part_entry_overhead = f->part_entry_overhead;
+ head_part_num = f->info.head_part_num;
l.unlock();
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
}
void new_head(const DoutPrefixProvider *dpp, Ptr&& p) {
- new_heading = true;
- f->_prepare_new_head(dpp, tid, call(std::move(p)));
+ state = new_heading;
+ f->_prepare_new_head(dpp, head_part_num + 1, tid, call(std::move(p)));
+ }
+
+ void read_meta(const DoutPrefixProvider *dpp, Ptr&& p) {
+ ++i;
+ state = meta_reading;
+ f->read_meta(dpp, tid, call(std::move(p)));
}
void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
- if (!new_heading) {
+ switch (state) {
+ case pushing:
if (r == -ERANGE) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " need new head tid=" << tid << dendl;
new_head(dpp, std::move(p));
return;
}
+ if (r == -ENOENT) {
+ if (i > MAX_RACE_RETRIES) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " racing client deleted part, but we're out"
+ << " of retries: tid=" << tid << dendl;
+ complete(std::move(p), r);
+ }
+ ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " racing client deleted part: tid=" << tid << dendl;
+ read_meta(dpp, std::move(p));
+ return;
+ }
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " push_entries failed: r=" << r
}
i = 0; // We've made forward progress, so reset the race counter!
prep_then_push(dpp, std::move(p), r);
- } else {
+ break;
+
+ case new_heading:
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " prepare_new_head failed: r=" << r
complete(std::move(p), r);
return;
}
- new_heading = false;
+ state = pushing;
handle_new_head(dpp, std::move(p), r);
+ break;
+
+ case meta_reading:
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " read_meta failed: r=" << r
+ << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ }
+ state = pushing;
+ prep_then_push(dpp, std::move(p), r);
+ break;
}
}
}
Pusher(const DoutPrefixProvider *dpp, FIFO* f, std::deque<cb::list>&& remaining,
- std::uint64_t tid, lr::AioCompletion* super)
+ std::int64_t head_part_num, std::uint64_t tid,
+ lr::AioCompletion* super)
: Completion(dpp, super), f(f), remaining(std::move(remaining)),
- tid(tid) {}
+ head_part_num(head_part_num), tid(tid) {}
};
void FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs,
auto tid = ++next_tid;
auto max_entry_size = info.params.max_entry_size;
auto need_new_head = info.need_new_head();
+ auto head_part_num = info.head_part_num;
l.unlock();
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
auto p = std::make_unique<Pusher>(dpp, this, std::deque<cb::list>(data_bufs.begin(), data_bufs.end()),
- tid, c);
+ head_part_num, tid, c);
// Validate sizes
for (const auto& bl : data_bufs) {
if (bl.length() > max_entry_size) {
auto part_oid = info.part_oid(part_num);
l.unlock();
- r = list_part(dpp, ioctx, part_oid, {}, ofs, max_entries, &entries,
- &part_more, &part_full, nullptr, tid, y);
+ r = list_part(dpp, ioctx, part_oid, ofs, max_entries, &entries,
+ &part_more, &part_full, tid, y);
if (r == -ENOENT) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " missing part, rereading metadata"
<< " pn=" << pn << " tid=" << tid << dendl;
std::unique_lock l(m);
l.unlock();
- r = trim_part(dpp, pn, max_part_size, std::nullopt, false, tid, y);
+ r = trim_part(dpp, pn, max_part_size, false, tid, y);
if (r < 0 && r == -ENOENT) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " trim_part failed: r=" << r
}
++pn;
}
- r = trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid, y);
+ r = trim_part(dpp, part_num, ofs, exclusive, tid, y);
if (r < 0 && r != -ENOENT) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " trim_part failed: r=" << r
if (pn < part_num) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " pn=" << pn << " tid=" << tid << dendl;
- fifo->trim_part(dpp, pn++, max_part_size, std::nullopt,
- false, tid, call(std::move(p)));
+ fifo->trim_part(dpp, pn++, max_part_size, false, tid,
+ call(std::move(p)));
} else {
update = true;
canceled = tail_part_num < part_num;
- fifo->trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid,
- call(std::move(p)));
+ fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p)));
}
return;
}
std::unique_lock l(fifo->m);
const auto max_part_size = fifo->info.params.max_part_size;
l.unlock();
- fifo->trim_part(dpp, pn++, max_part_size, std::nullopt,
- false, tid, call(std::move(p)));
+ fifo->trim_part(dpp, pn++, max_part_size, false, tid,
+ call(std::move(p)));
return;
}
l.unlock();
update = true;
canceled = tail_part_num < part_num;
- fifo->trim_part(dpp, part_num, ofs, std::nullopt, exclusive, tid,
- call(std::move(p)));
+ fifo->trim_part(dpp, part_num, ofs, exclusive, tid, call(std::move(p)));
return;
}
} else {
trimmer->update = true;
}
- trim_part(dpp, pn, ofs, std::nullopt, exclusive,
- tid, Trimmer::call(std::move(trimmer)));
+ trim_part(dpp, pn, ofs, exclusive, tid, Trimmer::call(std::move(trimmer)));
}
int FIFO::get_part_info(const DoutPrefixProvider *dpp, int64_t part_num,
FIFO* const fifo;
std::vector<fifo::journal_entry> processed;
- std::multimap<std::int64_t, fifo::journal_entry> journal;
- std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
+ decltype(fifo->info.journal) journal;
+ decltype(journal)::iterator iter;
std::int64_t new_tail;
std::int64_t new_head;
std::int64_t new_max;
pp_callback,
} state;
- void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num,
- std::string_view tag) {
+ void create_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
state = entry_callback;
op.create(false); /* We don't need exclusivity, part_init ensures
we're creating from the same journal entry. */
std::unique_lock l(fifo->m);
- part_init(&op, tag, fifo->info.params);
+ part_init(&op, fifo->info.params);
auto oid = fifo->info.part_oid(part_num);
l.unlock();
auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
return;
}
- void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num,
- std::string_view tag) {
+ void remove_part(const DoutPrefixProvider *dpp, Ptr&& p, int64_t part_num) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
state = entry_callback;
std::vector<fifo::journal_entry> new_processed;
std::unique_lock l(fifo->m);
for (auto& e : processed) {
- auto jiter = fifo->info.journal.find(e.part_num);
- /* journal entry was already processed */
- if (jiter == fifo->info.journal.end() ||
- !(jiter->second == e)) {
- continue;
+ if (fifo->info.journal.contains(e)) {
+ new_processed.push_back(e);
}
- new_processed.push_back(e);
}
processed = std::move(new_processed);
}
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " processing entry: entry=" << *iter
<< " tid=" << tid << dendl;
- const auto entry = iter->second;
+ const auto entry = *iter;
switch (entry.op) {
case fifo::journal_entry::Op::create:
- create_part(dpp, std::move(p), entry.part_num, entry.part_tag);
+ create_part(dpp, std::move(p), entry.part_num);
return;
case fifo::journal_entry::Op::set_head:
if (entry.part_num > new_head) {
++iter;
continue;
case fifo::journal_entry::Op::remove:
- remove_part(dpp, std::move(p), entry.part_num, entry.part_tag);
+ remove_part(dpp, std::move(p), entry.part_num);
return;
default:
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
switch (state) {
case entry_callback:
- finish_je(dpp, std::move(p), r, iter->second);
+ finish_je(dpp, std::move(p), r, *iter);
return;
case pp_callback:
auto c = canceled;
l.unlock();
read = false;
- auto op = list_part(f->cct, {}, ofs, max_entries, &r_out,
- &entries, &part_more, &part_full,
- nullptr, tid);
+ auto op = list_part(f->cct, ofs, max_entries, &r_out,
+ &entries, &part_more, &part_full, tid);
f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr);
} else {
complete(std::move(p), 0);