]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mirroring/fs/dir_map/update.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / pybind / mgr / mirroring / fs / dir_map / update.py
1 import errno
2 import pickle
3 import logging
4
5 import rados
6
7 from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \
8 INSTANCE_ID_PREFIX, MIRROR_OBJECT_PREFIX
9
10 log = logging.getLogger(__name__)
11
12 MAX_UPDATE = 256
13
class UpdateDirMapRequest:
    """Asynchronously apply directory-mapping updates and removals to the
    mirror object's omap, in batches of at most MAX_UPDATE entries.

    update_mapping: dict of dir_path -> mapping; values are pickled into
                    the omap (consumed by the update request is drained).
    removals: list of dir_paths whose omap keys should be removed.
    on_finish_callback: invoked once with 0 on success or a negative
                        errno on failure.
    """
    def __init__(self, ioctx, update_mapping, removals, on_finish_callback):
        self.ioctx = ioctx
        self.update_mapping = update_mapping
        self.removals = removals
        self.on_finish_callback = on_finish_callback

    @staticmethod
    def omap_key(dir_path):
        """Return the omap key under which a directory's mapping is stored."""
        return f'{DIRECTORY_MAP_PREFIX}{dir_path}'

    def send(self):
        """Kick off the (possibly multi-round) omap update."""
        log.info('updating image map')
        self.send_update()

    def send_update(self):
        """Apply one batch of up to MAX_UPDATE updates plus removals via an
        async write op; handle_update() re-invokes this until both the
        update and removal queues are drained.
        """
        log.debug(f'pending updates: {len(self.update_mapping)}+{len(self.removals)}')
        try:
            with rados.WriteOpCtx() as write_op:
                keys = []
                vals = []
                # snapshot the keys first -- entries are popped from the
                # dict below, so it cannot be iterated directly
                dir_keys = list(self.update_mapping.keys())[0:MAX_UPDATE]
                # gather updates
                for dir_path in dir_keys:
                    mapping = self.update_mapping.pop(dir_path)
                    keys.append(UpdateDirMapRequest.omap_key(dir_path))
                    # NOTE: pickled payloads are produced and consumed by
                    # this module only (trusted data)
                    vals.append(pickle.dumps(mapping))
                self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
                # gather deletes -- fill whatever remains of the batch budget
                slicept = MAX_UPDATE - len(dir_keys)
                removals = [UpdateDirMapRequest.omap_key(dir_path)
                            for dir_path in self.removals[0:slicept]]
                self.removals = self.removals[slicept:]
                self.ioctx.remove_omap_keys(write_op, tuple(removals))
                log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
                self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME,
                                                oncomplete=self.handle_update)
        except rados.Error as e:
            log.error(f'UpdateDirMapRequest.send_update exception: {e}')
            # rados.Error carries the errno as its first argument; report
            # the conventional negative errno
            self.finish(-e.args[0])

    def handle_update(self, completion):
        """Write-op completion callback: finish on error, send the next
        batch if work remains, otherwise report success.
        """
        r = completion.get_return_value()
        log.debug(f'handle_update: r={r}')
        if r != 0:
            self.finish(r)
        elif self.update_mapping or self.removals:
            self.send_update()
        else:
            self.finish(0)

    def finish(self, r):
        """Report the final status to the caller."""
        log.info(f'finish: r={r}')
        self.on_finish_callback(r)
66
class UpdateInstanceRequest:
    """Asynchronously update the set of registered cephfs-mirror
    instances: first purge the on-disk object of each removed instance,
    then batch-update the instance map stored in the mirror object's
    omap (MAX_UPDATE entries per write op).

    instances_added: dict of instance_id -> data to record in the map.
    instances_removed: list of instance_ids to purge and unmap.
    on_finish_callback: invoked once with 0 on success or a negative
                        errno on failure.
    """
    def __init__(self, ioctx, instances_added, instances_removed, on_finish_callback):
        self.ioctx = ioctx
        self.instances_added = instances_added
        # purge vs remove: purge list is for purging on-disk instance
        # object. remove is for purging instance map.
        self.instances_removed = instances_removed.copy()
        self.instances_purge = instances_removed.copy()
        self.on_finish_callback = on_finish_callback

    @staticmethod
    def omap_key(instance_id):
        """Return the omap key under which an instance's record is stored."""
        return f'{INSTANCE_ID_PREFIX}{instance_id}'

    @staticmethod
    def cephfs_mirror_object_name(instance_id):
        """Return the name of an instance's on-disk mirror object."""
        assert instance_id != ''
        return f'{MIRROR_OBJECT_PREFIX}.{instance_id}'

    def send(self):
        """Kick off the purge-then-map-update sequence."""
        log.info('updating instances')
        self.send_update()

    def send_update(self):
        # instance objects are purged first; update_instance_map() runs
        # once the purge queue is drained
        self.remove_instance_object()

    def remove_instance_object(self):
        """Asynchronously remove the next pending instance object, or move
        on to the instance map once the purge queue is empty.
        """
        log.debug(f'pending purges: {len(self.instances_purge)}')
        if not self.instances_purge:
            self.update_instance_map()
            return
        instance_id = self.instances_purge.pop()
        self.ioctx.aio_remove(
            UpdateInstanceRequest.cephfs_mirror_object_name(instance_id),
            oncomplete=self.handle_remove)

    def handle_remove(self, completion):
        """Object-removal completion callback: tolerate ENOENT, abort on
        any other error, otherwise continue purging.
        """
        r = completion.get_return_value()
        log.debug(f'handle_remove: r={r}')
        # cephfs-mirror instances remove their respective instance
        # objects upon termination. so we handle ENOENT here. note
        # that when an instance is blocklisted, it wont be able to
        # purge its instance object, so we do it on its behalf.
        if r not in (0, -errno.ENOENT):
            self.finish(r)
            return
        self.remove_instance_object()

    def update_instance_map(self):
        """Apply one batch of up to MAX_UPDATE instance-map updates plus
        removals via an async write op; handle_update() re-invokes this
        until both queues are drained.
        """
        log.debug(f'pending updates: {len(self.instances_added)}+{len(self.instances_removed)}')
        try:
            with rados.WriteOpCtx() as write_op:
                keys = []
                vals = []
                # snapshot the keys first -- entries are popped from the
                # dict below, so it cannot be iterated directly
                instance_ids = list(self.instances_added.keys())[0:MAX_UPDATE]
                # gather updates
                for instance_id in instance_ids:
                    data = self.instances_added.pop(instance_id)
                    keys.append(UpdateInstanceRequest.omap_key(instance_id))
                    # NOTE: pickled payloads are produced and consumed by
                    # this module only (trusted data)
                    vals.append(pickle.dumps(data))
                self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
                # gather deletes -- fill whatever remains of the batch budget
                slicept = MAX_UPDATE - len(instance_ids)
                removals = [UpdateInstanceRequest.omap_key(instance_id)
                            for instance_id in self.instances_removed[0:slicept]]
                self.instances_removed = self.instances_removed[slicept:]
                self.ioctx.remove_omap_keys(write_op, tuple(removals))
                log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
                self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME,
                                                oncomplete=self.handle_update)
        except rados.Error as e:
            log.error(f'UpdateInstanceRequest.update_instance_map exception: {e}')
            # rados.Error carries the errno as its first argument; report
            # the conventional negative errno
            self.finish(-e.args[0])

    def handle_update(self, completion):
        """Write-op completion callback: finish on error, send the next
        batch if work remains, otherwise report success.
        """
        r = completion.get_return_value()
        log.debug(f'handle_update: r={r}')
        if r != 0:
            self.finish(r)
        elif self.instances_added or self.instances_removed:
            self.update_instance_map()
        else:
            self.finish(0)

    def finish(self, r):
        """Report the final status to the caller."""
        log.info(f'finish: r={r}')
        self.on_finish_callback(r)