// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 SUSE LINUX GmbH
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
15 #include "ImageSyncThrottler.h"
16 #include "ImageSync.h"
17 #include "common/ceph_context.h"
// Logging settings for the dout() statements in this file: the context and
// subsystem to log under, and a per-line prefix carrying the class name, the
// object address and the calling function.
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
// Discard any prefix an earlier header may have installed before defining
// this file's own; #undef of a macro that is not defined is a no-op.
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::ImageSyncThrottler:: " << this \
                           << " " << __func__ << ": "
24 using std::unique_ptr
;
31 template <typename ImageCtxT
>
32 struct ImageSyncThrottler
<ImageCtxT
>::C_SyncHolder
: public Context
{
33 ImageSyncThrottler
<ImageCtxT
> *m_sync_throttler
;
34 PoolImageId m_local_pool_image_id
;
35 ImageSync
<ImageCtxT
> *m_sync
= nullptr;
38 C_SyncHolder(ImageSyncThrottler
<ImageCtxT
> *sync_throttler
,
39 const PoolImageId
&local_pool_image_id
, Context
*on_finish
)
40 : m_sync_throttler(sync_throttler
),
41 m_local_pool_image_id(local_pool_image_id
), m_on_finish(on_finish
) {
44 void finish(int r
) override
{
45 m_sync_throttler
->handle_sync_finished(this);
46 m_on_finish
->complete(r
);
51 ImageSyncThrottler
<I
>::ImageSyncThrottler()
52 : m_max_concurrent_syncs(g_ceph_context
->_conf
->rbd_mirror_concurrent_image_syncs
),
53 m_lock("rbd::mirror::ImageSyncThrottler")
55 dout(20) << "Initialized max_concurrent_syncs=" << m_max_concurrent_syncs
57 g_ceph_context
->_conf
->add_observer(this);
61 ImageSyncThrottler
<I
>::~ImageSyncThrottler() {
63 Mutex::Locker
l(m_lock
);
64 assert(m_sync_queue
.empty());
65 assert(m_inflight_syncs
.empty());
68 g_ceph_context
->_conf
->remove_observer(this);
72 void ImageSyncThrottler
<I
>::start_sync(I
*local_image_ctx
, I
*remote_image_ctx
,
73 SafeTimer
*timer
, Mutex
*timer_lock
,
74 const std::string
&mirror_uuid
,
76 MirrorPeerClientMeta
*client_meta
,
77 ContextWQ
*work_queue
,
79 ProgressContext
*progress_ctx
) {
82 PoolImageId
pool_image_id(local_image_ctx
->md_ctx
.get_id(),
84 C_SyncHolder
*sync_holder_ctx
= new C_SyncHolder(this, pool_image_id
,
86 sync_holder_ctx
->m_sync
= ImageSync
<I
>::create(local_image_ctx
,
87 remote_image_ctx
, timer
,
88 timer_lock
, mirror_uuid
,
89 journaler
, client_meta
,
90 work_queue
, sync_holder_ctx
,
92 sync_holder_ctx
->m_sync
->get();
96 Mutex::Locker
l(m_lock
);
98 if (m_inflight_syncs
.size() < m_max_concurrent_syncs
) {
99 assert(m_inflight_syncs
.count(pool_image_id
) == 0);
100 m_inflight_syncs
[pool_image_id
] = sync_holder_ctx
;
102 dout(10) << "ready to start image sync for local_image_id "
103 << local_image_ctx
->id
<< " [" << m_inflight_syncs
.size() << "/"
104 << m_max_concurrent_syncs
<< "]" << dendl
;
106 m_sync_queue
.push_front(sync_holder_ctx
);
107 dout(10) << "image sync for local_image_id " << local_image_ctx
->id
108 << " has been queued" << dendl
;
113 sync_holder_ctx
->m_sync
->send();
117 template <typename I
>
118 void ImageSyncThrottler
<I
>::cancel_sync(librados::IoCtx
&local_io_ctx
,
119 const std::string local_image_id
) {
122 C_SyncHolder
*sync_holder
= nullptr;
123 bool running_sync
= true;
126 Mutex::Locker
l(m_lock
);
127 if (m_inflight_syncs
.empty()) {
128 // no image sync currently running and neither waiting
132 PoolImageId
local_pool_image_id(local_io_ctx
.get_id(),
134 auto it
= m_inflight_syncs
.find(local_pool_image_id
);
135 if (it
!= m_inflight_syncs
.end()) {
136 sync_holder
= it
->second
;
140 for (auto it
= m_sync_queue
.begin(); it
!= m_sync_queue
.end(); ++it
) {
141 if ((*it
)->m_local_pool_image_id
== local_pool_image_id
) {
143 m_sync_queue
.erase(it
);
144 running_sync
= false;
153 dout(10) << "canceled running image sync for local_image_id "
154 << sync_holder
->m_local_pool_image_id
.second
<< dendl
;
155 sync_holder
->m_sync
->cancel();
157 dout(10) << "canceled waiting image sync for local_image_id "
158 << sync_holder
->m_local_pool_image_id
.second
<< dendl
;
159 sync_holder
->m_on_finish
->complete(-ECANCELED
);
160 sync_holder
->m_sync
->put();
166 template <typename I
>
167 void ImageSyncThrottler
<I
>::handle_sync_finished(C_SyncHolder
*sync_holder
) {
170 C_SyncHolder
*next_sync_holder
= nullptr;
173 Mutex::Locker
l(m_lock
);
174 m_inflight_syncs
.erase(sync_holder
->m_local_pool_image_id
);
176 if (m_inflight_syncs
.size() < m_max_concurrent_syncs
&&
177 !m_sync_queue
.empty()) {
178 next_sync_holder
= m_sync_queue
.back();
179 m_sync_queue
.pop_back();
182 m_inflight_syncs
.count(next_sync_holder
->m_local_pool_image_id
) == 0);
183 m_inflight_syncs
[next_sync_holder
->m_local_pool_image_id
] =
185 dout(10) << "ready to start image sync for local_image_id "
186 << next_sync_holder
->m_local_pool_image_id
.second
187 << " [" << m_inflight_syncs
.size() << "/"
188 << m_max_concurrent_syncs
<< "]" << dendl
;
191 dout(10) << "currently running image syncs [" << m_inflight_syncs
.size()
192 << "/" << m_max_concurrent_syncs
<< "]" << dendl
;
195 if (next_sync_holder
) {
196 next_sync_holder
->m_sync
->send();
200 template <typename I
>
201 void ImageSyncThrottler
<I
>::set_max_concurrent_syncs(uint32_t max
) {
202 dout(20) << " max=" << max
<< dendl
;
206 std::list
<C_SyncHolder
*> next_sync_holders
;
208 Mutex::Locker
l(m_lock
);
209 this->m_max_concurrent_syncs
= max
;
211 // Start waiting syncs in the case of available free slots
212 while(m_inflight_syncs
.size() < m_max_concurrent_syncs
213 && !m_sync_queue
.empty()) {
214 C_SyncHolder
*next_sync_holder
= m_sync_queue
.back();
215 next_sync_holders
.push_back(next_sync_holder
);
216 m_sync_queue
.pop_back();
219 m_inflight_syncs
.count(next_sync_holder
->m_local_pool_image_id
) == 0);
220 m_inflight_syncs
[next_sync_holder
->m_local_pool_image_id
] =
223 dout(10) << "ready to start image sync for local_image_id "
224 << next_sync_holder
->m_local_pool_image_id
.second
225 << " [" << m_inflight_syncs
.size() << "/"
226 << m_max_concurrent_syncs
<< "]" << dendl
;
230 for (const auto& sync_holder
: next_sync_holders
) {
231 sync_holder
->m_sync
->send();
235 template <typename I
>
236 void ImageSyncThrottler
<I
>::print_status(Formatter
*f
, stringstream
*ss
) {
237 Mutex::Locker
l(m_lock
);
240 f
->dump_int("max_parallel_syncs", m_max_concurrent_syncs
);
241 f
->dump_int("running_syncs", m_inflight_syncs
.size());
242 f
->dump_int("waiting_syncs", m_sync_queue
.size());
246 *ss
<< "max_parallel_syncs=" << m_max_concurrent_syncs
<< ", ";
247 *ss
<< "running_syncs=" << m_inflight_syncs
.size() << ", ";
248 *ss
<< "waiting_syncs=" << m_sync_queue
.size() << " ]";
252 template <typename I
>
253 const char** ImageSyncThrottler
<I
>::get_tracked_conf_keys() const {
254 static const char* KEYS
[] = {
255 "rbd_mirror_concurrent_image_syncs",
261 template <typename I
>
262 void ImageSyncThrottler
<I
>::handle_conf_change(
263 const struct md_config_t
*conf
,
264 const set
<string
> &changed
) {
265 if (changed
.count("rbd_mirror_concurrent_image_syncs")) {
266 set_max_concurrent_syncs(conf
->rbd_mirror_concurrent_image_syncs
);
270 } // namespace mirror
273 template class rbd::mirror::ImageSyncThrottler
<librbd::ImageCtx
>;