#include "osd/PeeringState.h"
-#include "messages/MOSDPGQuery.h"
#include "messages/MOSDPGCreate2.h"
#include "common/Formatter.h"
+#include "crimson/common/exception.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/osd.h"
#include "crimson/osd/osd_operations/compound_peering_request.h"
seastar::promise<BufferedRecoveryMessages> promise;
// assuming crimson-osd won't need to be compatible with pre-octopus
// releases
- BufferedRecoveryMessages ctx{ceph_release_t::octopus};
+ BufferedRecoveryMessages ctx;
compound_state() = default;
~compound_state() {
promise.set_value(std::move(ctx));
PeeringSubEvent(compound_state_ref state, Args &&... args) :
RemotePeeringEvent(std::forward<Args>(args)...), state(state) {}
- seastar::future<> complete_rctx(Ref<crimson::osd::PG> pg) final {
+ PeeringEvent::interruptible_future<>
+ complete_rctx(Ref<crimson::osd::PG> pg) final {
logger().debug("{}: submitting ctx transaction", *this);
state->ctx.accept_buffered_messages(ctx);
state = {};
}
};
-std::vector<OperationRef> handle_pg_create(
+std::vector<crimson::OperationRef> handle_pg_create(
OSD &osd,
crimson::net::ConnectionRef conn,
compound_state_ref state,
Ref<MOSDPGCreate2> m)
{
- std::vector<OperationRef> ret;
- for (auto &p : m->pgs) {
- const spg_t &pgid = p.first;
- const auto &[created, created_stamp] = p.second;
+ std::vector<crimson::OperationRef> ret;
+ for (auto& [pgid, when] : m->pgs) {
+ const auto &[created, created_stamp] = when;
auto q = m->pg_extra.find(pgid);
ceph_assert(q != m->pg_extra.end());
+ auto& [history, pi] = q->second;
logger().debug(
- "{}, {} {} e{} @{} history {} pi {}",
- __func__,
- pgid,
- created,
- created_stamp,
- q->second.first,
- q->second.second);
- if (!q->second.second.empty() &&
- m->epoch < q->second.second.get_bounds().second) {
+ "{}: {} e{} @{} "
+ "history {} pi {}",
+ __func__, pgid, created, created_stamp,
+ history, pi);
+ if (!pi.empty() &&
+ m->epoch < pi.get_bounds().second) {
logger().error(
- "got pg_create on {} epoch {} unmatched past_intervals (history {})",
- pgid,
- m->epoch,
- q->second.second,
- q->second.first);
+ "got pg_create on {} epoch {} "
+ "unmatched past_intervals {} (history {})",
+ pgid, m->epoch,
+ pi, history);
} else {
auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
state,
m->epoch,
NullEvt(),
true,
- new PGCreateInfo(
- pgid,
- m->epoch,
- q->second.first,
- q->second.second,
- true)).first;
+ new PGCreateInfo(pgid, m->epoch, history, pi, true)).first;
ret.push_back(op);
}
}
return ret;
}
-struct SubOpBlocker : BlockerT<SubOpBlocker> {
+struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> {
static constexpr const char * type_name = "CompoundOpBlocker";
- std::vector<OperationRef> subops;
- SubOpBlocker(std::vector<OperationRef> &&subops) : subops(subops) {}
+ std::vector<crimson::OperationRef> subops;
+ SubOpBlocker(std::vector<crimson::OperationRef> &&subops)
+ : subops(subops) {}
virtual void dump_detail(Formatter *f) const {
f->open_array_section("dependent_operations");
IRef ref = this;
logger().info("{}: about to fork future", *this);
- return with_blocking_future(
- blocker->make_blocking_future(state->promise.get_future())
- ).then([this, blocker=std::move(blocker)](auto &&ctx) {
- logger().info("{}: sub events complete", *this);
- return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
- }).then([this, ref=std::move(ref)] {
- logger().info("{}: complete", *this);
+ return crimson::common::handle_system_shutdown(
+ [this, ref, blocker=std::move(blocker), state]() mutable {
+ return with_blocking_future(
+ blocker->make_blocking_future(state->promise.get_future())
+ ).then([this, blocker=std::move(blocker)](auto &&ctx) {
+ logger().info("{}: sub events complete", *this);
+ return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
+ }).then([this, ref=std::move(ref)] {
+ logger().info("{}: complete", *this);
+ });
});
}