#include "crimson/osd/osd_operation_external_tracking.h"
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_connection_priv.h"
+#include "osd/object_state_fmt.h"
namespace {
seastar::logger& logger() {
return get_osd_priv(conn.get()).client_request_conn_pipeline;
}
-ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
+ClientRequest::PGPipeline &ClientRequest::client_pp(PG &pg)
{
return pg.request_pg_pipeline;
}
return interruptor::now();
});
}
- return ihref.enter_stage<interruptor>(pp(pg).await_map, *this
+ return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this
).then_interruptible([this, this_instance_id, &pg, &ihref] {
logger().debug("{}.{}: after await_map stage", *this, this_instance_id);
return ihref.enter_blocker(
m->get_min_epoch(), nullptr);
}).then_interruptible([this, this_instance_id, &pg, &ihref](auto map) {
logger().debug("{}.{}: after wait_for_map", *this, this_instance_id);
- return ihref.enter_stage<interruptor>(pp(pg).wait_for_active, *this);
+ return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
}).then_interruptible([this, this_instance_id, &pg, &ihref]() {
logger().debug(
"{}.{}: after wait_for_active stage", *this, this_instance_id);
ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
{
return ihref.enter_stage<interruptor>(
- pp(*pg).recover_missing,
+ client_pp(*pg).recover_missing,
*this
).then_interruptible(
[this, pg]() mutable {
reply->set_reply_versions(completed->version, completed->user_version);
return conn->send(std::move(reply));
} else {
- return ihref.enter_stage<interruptor>(pp(*pg).get_obc, *this
+ return ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this
).then_interruptible(
[this, pg, &ihref]() mutable -> PG::load_obc_iertr::future<> {
logger().debug("{}: in get_obc stage", *this);
return pg->with_locked_obc(
m->get_hobj(), op_info,
[this, pg, &ihref](auto obc) mutable {
- return ihref.enter_stage<interruptor>(pp(*pg).process, *this
- ).then_interruptible([this, pg, obc, &ihref]() mutable {
- return do_process(ihref, pg, obc);
+ logger().debug("{}: got obc {}", *this, obc->obs);
+ return ihref.enter_stage<interruptor>(
+ client_pp(*pg).process, *this
+ ).then_interruptible([this, pg, obc, &ihref]() mutable {
+ return do_process(ihref, pg, obc);
+ });
});
- });
});
}
});
return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible(
[this, pg, &ihref](auto submitted, auto all_completed) mutable {
return submitted.then_interruptible([this, pg, &ihref] {
- return ihref.enter_stage<interruptor>(pp(*pg).wait_repop, *this);
+ return ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
}).then_interruptible(
[this, pg, all_completed=std::move(all_completed), &ihref]() mutable {
return all_completed.safe_then_interruptible(
[this, pg, &ihref](MURef<MOSDOpReply> reply) {
- return ihref.enter_stage<interruptor>(pp(*pg).send_reply, *this
+ return ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this
).then_interruptible(
[this, reply=std::move(reply)]() mutable {
logger().debug("{}: sending response", *this);