seastar::future<> Watch::start_notify(NotifyRef notify)
{
- logger().info("{} adding notify(id={})", __func__, notify->ninfo.notify_id);
+ logger().debug("{} gid={} cookie={} starting notify(id={})",
+ __func__, get_watcher_gid(), get_cookie(),
+ notify->ninfo.notify_id);
auto [ it, emplaced ] = in_progress_notifies.emplace(std::move(notify));
ceph_assert(emplaced);
ceph_assert(is_alive());
const uint64_t notify_id,
const ceph::bufferlist& reply_bl)
{
- logger().info("{}", __func__);
- return seastar::do_for_each(in_progress_notifies,
- [this_shared=shared_from_this(), reply_bl] (auto notify) {
- return notify->complete_watcher(this_shared, reply_bl);
- }
- ).then([this] {
- in_progress_notifies.clear();
+ logger().debug("{} gid={} cookie={} notify_id={}",
+ __func__, get_watcher_gid(), get_cookie(), notify_id);
+ const auto it = in_progress_notifies.find(notify_id);
+ if (it == std::end(in_progress_notifies)) {
+ logger().error("{} notify_id={} not found on the in-progess list."
+ " Supressing but this should not happen.",
+ __func__, notify_id);
return seastar::now();
- });
+ }
+ auto notify = *it;
+ logger().debug("Watch::notify_ack gid={} cookie={} found notify(id={})",
+ get_watcher_gid(),
+ get_cookie(),
+ notify->get_id());
+ // let's ensure we're extending the life-time till end of this method
+ static_assert(std::is_same_v<decltype(notify), NotifyRef>);
+ in_progress_notifies.erase(it);
+ return notify->complete_watcher(shared_from_this(), reply_bl);
}
seastar::future<> Watch::send_disconnect_msg()
void Watch::discard_state()
{
+ logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
ceph_assert(obc);
in_progress_notifies.clear();
timeout_timer.cancel();
seastar::future<> Watch::remove()
{
- logger().info("{}", __func__);
+ logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
// in contrast to ceph-osd crimson sends CEPH_WATCH_EVENT_DISCONNECT directly
// from the timeout handler and _after_ CEPH_WATCH_EVENT_NOTIFY_COMPLETE.
// this simplifies the Watch::remove() interface as callers aren't obliged
// becomes an implementation detail of Watch.
return seastar::do_for_each(in_progress_notifies,
[this_shared=shared_from_this()] (auto notify) {
+ logger().debug("Watch::remove gid={} cookie={} notify(id={})",
+ this_shared->get_watcher_gid(),
+ this_shared->get_cookie(),
+ notify->ninfo.notify_id);
return notify->remove_watcher(this_shared);
}).then([this] {
discard_state();
void Watch::cancel_notify(const uint64_t notify_id)
{
- logger().info("{} notify_id={}", __func__, notify_id);
+ logger().debug("{} gid={} cookie={} notify(id={})",
+ __func__, get_watcher_gid(), get_cookie(),
+ notify_id);
const auto it = in_progress_notifies.find(notify_id);
assert(it != std::end(in_progress_notifies));
in_progress_notifies.erase(it);
std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs)
{
out << "notify_reply_t{watcher_gid=" << rhs.watcher_gid
- << ", watcher_cookie=" << rhs.watcher_cookie
- << ", bl=" << rhs.bl << "}";
+ << ", watcher_cookie=" << rhs.watcher_cookie << "}";
return out;
}
user_version(user_version)
{}
+Notify::~Notify()
+{
+ logger().debug("{} for notify(id={})", __func__, ninfo.notify_id);
+}
+
seastar::future<> Notify::remove_watcher(WatchRef watch)
{
+ logger().debug("{} for notify(id={})", __func__, ninfo.notify_id);
+
if (discarded || complete) {
+ logger().debug("{} for notify(id={}) discarded/complete already"
+ " discarded: {} complete: {}", __func__,
+ ninfo.notify_id, discarded ,complete);
return seastar::now();
}
[[maybe_unused]] const auto num_removed = watchers.erase(watch);
WatchRef watch,
const ceph::bufferlist& reply_bl)
{
+ logger().debug("{} for notify(id={})", __func__, ninfo.notify_id);
+
if (discarded || complete) {
+ logger().debug("{} for notify(id={}) discarded/complete already"
+ " discarded: {} complete: {}", __func__,
+ ninfo.notify_id, discarded ,complete);
return seastar::now();
}
notify_replies.emplace(notify_reply_t{