]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "librbd/watcher/Notifier.h" | |
5 | #include "common/WorkQueue.h" | |
6 | #include "librbd/ImageCtx.h" | |
7 | #include "librbd/Utils.h" | |
8 | #include "librbd/watcher/Types.h" | |
9 | ||
// Logging setup for this file: messages logged below (e.g. via ldout) are
// prefixed with the class name, the address of the current object (`this`)
// and the name of the enclosing function (`__func__`).
// NOTE(review): dout_subsys presumably routes these messages to the rbd debug
// subsystem -- confirm against the Ceph logging macros.
10 | #define dout_subsys ceph_subsys_rbd | |
11 | #undef dout_prefix | |
12 | #define dout_prefix *_dout << "librbd::watcher::Notifier: " \ | |
13 | << this << " " << __func__ << ": " | |
14 | ||
15 | namespace librbd { | |
16 | namespace watcher { | |
17 | ||
// Timeout value handed to IoCtx::aio_notify() for every notification issued
// by Notifier::notify().
// NOTE(review): presumably expressed in milliseconds (i.e. 5 seconds) --
// confirm against the librados aio_notify() documentation.
18 | const uint64_t Notifier::NOTIFY_TIMEOUT = 5000; | |
19 | ||
20 | Notifier::C_AioNotify::C_AioNotify(Notifier *notifier, NotifyResponse *response, | |
21 | Context *on_finish) | |
22 | : notifier(notifier), response(response), on_finish(on_finish) { | |
23 | } | |
24 | ||
25 | void Notifier::C_AioNotify::finish(int r) { | |
26 | if (response != nullptr) { | |
27 | if (r == 0 || r == -ETIMEDOUT) { | |
28 | try { | |
11fdf7f2 TL |
29 | auto it = out_bl.cbegin(); |
30 | decode(*response, it); | |
7c673cae FG |
31 | } catch (const buffer::error &err) { |
32 | r = -EBADMSG; | |
33 | } | |
34 | } | |
35 | } | |
36 | notifier->handle_notify(r, on_finish); | |
37 | } | |
38 | ||
39 | Notifier::Notifier(ContextWQ *work_queue, IoCtx &ioctx, const std::string &oid) | |
40 | : m_work_queue(work_queue), m_ioctx(ioctx), m_oid(oid), | |
41 | m_aio_notify_lock(util::unique_lock_name( | |
42 | "librbd::object_watcher::Notifier::m_aio_notify_lock", this)) { | |
43 | m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); | |
44 | } | |
45 | ||
46 | Notifier::~Notifier() { | |
47 | Mutex::Locker aio_notify_locker(m_aio_notify_lock); | |
11fdf7f2 | 48 | ceph_assert(m_pending_aio_notifies == 0); |
7c673cae FG |
49 | } |
50 | ||
51 | void Notifier::flush(Context *on_finish) { | |
52 | Mutex::Locker aio_notify_locker(m_aio_notify_lock); | |
53 | if (m_pending_aio_notifies == 0) { | |
54 | m_work_queue->queue(on_finish, 0); | |
55 | return; | |
56 | } | |
57 | ||
58 | m_aio_notify_flush_ctxs.push_back(on_finish); | |
59 | } | |
60 | ||
61 | void Notifier::notify(bufferlist &bl, NotifyResponse *response, | |
62 | Context *on_finish) { | |
63 | { | |
64 | Mutex::Locker aio_notify_locker(m_aio_notify_lock); | |
65 | ++m_pending_aio_notifies; | |
66 | ||
67 | ldout(m_cct, 20) << "pending=" << m_pending_aio_notifies << dendl; | |
68 | } | |
69 | ||
70 | C_AioNotify *ctx = new C_AioNotify(this, response, on_finish); | |
71 | librados::AioCompletion *comp = util::create_rados_callback(ctx); | |
72 | int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, &ctx->out_bl); | |
11fdf7f2 | 73 | ceph_assert(r == 0); |
7c673cae FG |
74 | comp->release(); |
75 | } | |
76 | ||
77 | void Notifier::handle_notify(int r, Context *on_finish) { | |
78 | ldout(m_cct, 20) << "r=" << r << dendl; | |
79 | ||
80 | Mutex::Locker aio_notify_locker(m_aio_notify_lock); | |
11fdf7f2 | 81 | ceph_assert(m_pending_aio_notifies > 0); |
7c673cae FG |
82 | --m_pending_aio_notifies; |
83 | ||
84 | ldout(m_cct, 20) << "pending=" << m_pending_aio_notifies << dendl; | |
85 | if (m_pending_aio_notifies == 0) { | |
86 | for (auto ctx : m_aio_notify_flush_ctxs) { | |
87 | m_work_queue->queue(ctx, 0); | |
88 | } | |
89 | m_aio_notify_flush_ctxs.clear(); | |
90 | } | |
91 | ||
92 | if (on_finish != nullptr) { | |
93 | m_work_queue->queue(on_finish, r); | |
94 | } | |
95 | } | |
96 | ||
97 | } // namespace watcher | |
98 | } // namespace librbd |