1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/watcher/Notifier.h"
5 #include "common/WorkQueue.h"
6 #include "librbd/ImageCtx.h"
7 #include "librbd/Utils.h"
8 #include "librbd/watcher/Types.h"
10 #define dout_subsys ceph_subsys_rbd
12 #define dout_prefix *_dout << "librbd::watcher::Notifier: " \
13 << this << " " << __func__ << ": "
18 const uint64_t Notifier::NOTIFY_TIMEOUT
= 5000;
20 Notifier::C_AioNotify::C_AioNotify(Notifier
*notifier
, NotifyResponse
*response
,
22 : notifier(notifier
), response(response
), on_finish(on_finish
) {
25 void Notifier::C_AioNotify::finish(int r
) {
26 if (response
!= nullptr) {
27 if (r
== 0 || r
== -ETIMEDOUT
) {
29 auto it
= out_bl
.cbegin();
30 decode(*response
, it
);
31 } catch (const buffer::error
&err
) {
36 notifier
->handle_notify(r
, on_finish
);
39 Notifier::Notifier(ContextWQ
*work_queue
, IoCtx
&ioctx
, const std::string
&oid
)
40 : m_work_queue(work_queue
), m_ioctx(ioctx
), m_oid(oid
),
41 m_aio_notify_lock(ceph::make_mutex(util::unique_lock_name(
42 "librbd::object_watcher::Notifier::m_aio_notify_lock", this))) {
43 m_cct
= reinterpret_cast<CephContext
*>(m_ioctx
.cct());
46 Notifier::~Notifier() {
47 std::lock_guard aio_notify_locker
{m_aio_notify_lock
};
48 ceph_assert(m_pending_aio_notifies
== 0);
51 void Notifier::flush(Context
*on_finish
) {
52 std::lock_guard aio_notify_locker
{m_aio_notify_lock
};
53 if (m_pending_aio_notifies
== 0) {
54 m_work_queue
->queue(on_finish
, 0);
58 m_aio_notify_flush_ctxs
.push_back(on_finish
);
61 void Notifier::notify(bufferlist
&bl
, NotifyResponse
*response
,
64 std::lock_guard aio_notify_locker
{m_aio_notify_lock
};
65 ++m_pending_aio_notifies
;
67 ldout(m_cct
, 20) << "pending=" << m_pending_aio_notifies
<< dendl
;
70 C_AioNotify
*ctx
= new C_AioNotify(this, response
, on_finish
);
71 librados::AioCompletion
*comp
= util::create_rados_callback(ctx
);
72 int r
= m_ioctx
.aio_notify(m_oid
, comp
, bl
, NOTIFY_TIMEOUT
, &ctx
->out_bl
);
77 void Notifier::handle_notify(int r
, Context
*on_finish
) {
78 ldout(m_cct
, 20) << "r=" << r
<< dendl
;
80 std::lock_guard aio_notify_locker
{m_aio_notify_lock
};
81 ceph_assert(m_pending_aio_notifies
> 0);
82 --m_pending_aio_notifies
;
84 ldout(m_cct
, 20) << "pending=" << m_pending_aio_notifies
<< dendl
;
85 if (m_pending_aio_notifies
== 0) {
86 for (auto ctx
: m_aio_notify_flush_ctxs
) {
87 m_work_queue
->queue(ctx
, 0);
89 m_aio_notify_flush_ctxs
.clear();
92 if (on_finish
!= nullptr) {
93 m_work_queue
->queue(on_finish
, r
);
97 } // namespace watcher