]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mirroring/fs/dir_map/update.py
7 from ..utils
import MIRROR_OBJECT_NAME
, DIRECTORY_MAP_PREFIX
, \
8 INSTANCE_ID_PREFIX
, MIRROR_OBJECT_PREFIX
10 log
= logging
.getLogger(__name__
)
14 class UpdateDirMapRequest
:
15 def __init__(self
, ioctx
, update_mapping
, removals
, on_finish_callback
):
17 self
.update_mapping
= update_mapping
18 self
.removals
= removals
19 self
.on_finish_callback
= on_finish_callback
22 def omap_key(dir_path
):
23 return f
'{DIRECTORY_MAP_PREFIX}{dir_path}'
26 log
.info('updating image map')
29 def send_update(self
):
30 log
.debug(f
'pending updates: {len(self.update_mapping)}+{len(self.removals)}')
32 with rados
.WriteOpCtx() as write_op
:
35 dir_keys
= list(self
.update_mapping
.keys())[0:MAX_UPDATE
]
37 for dir_path
in dir_keys
:
38 mapping
= self
.update_mapping
.pop(dir_path
)
39 keys
.append(UpdateDirMapRequest
.omap_key(dir_path
))
40 vals
.append(pickle
.dumps(mapping
))
41 self
.ioctx
.set_omap(write_op
, tuple(keys
), tuple(vals
))
43 slicept
= MAX_UPDATE
- len(dir_keys
)
44 removals
= [UpdateDirMapRequest
.omap_key(dir_path
) for dir_path
in self
.removals
[0:slicept
]]
45 self
.removals
= self
.removals
[slicept
:]
46 self
.ioctx
.remove_omap_keys(write_op
, tuple(removals
))
47 log
.debug(f
'applying {len(keys)} updates, {len(removals)} deletes')
48 self
.ioctx
.operate_aio_write_op(write_op
, MIRROR_OBJECT_NAME
, oncomplete
=self
.handle_update
)
49 except rados
.Error
as e
:
50 log
.error(f
'UpdateDirMapRequest.send_update exception: {e}')
51 self
.finish(-e
.args
[0])
53 def handle_update(self
, completion
):
54 r
= completion
.get_return_value()
55 log
.debug(f
'handle_update: r={r}')
58 elif self
.update_mapping
or self
.removals
:
64 log
.info(f
'finish: r={r}')
65 self
.on_finish_callback(r
)
67 class UpdateInstanceRequest
:
68 def __init__(self
, ioctx
, instances_added
, instances_removed
, on_finish_callback
):
70 self
.instances_added
= instances_added
71 # purge vs remove: purge list is for purging on-disk instance
72 # object. remove is for purging instance map.
73 self
.instances_removed
= instances_removed
.copy()
74 self
.instances_purge
= instances_removed
.copy()
75 self
.on_finish_callback
= on_finish_callback
78 def omap_key(instance_id
):
79 return f
'{INSTANCE_ID_PREFIX}{instance_id}'
82 def cephfs_mirror_object_name(instance_id
):
83 assert instance_id
!= ''
84 return f
'{MIRROR_OBJECT_PREFIX}.{instance_id}'
87 log
.info('updating instances')
90 def send_update(self
):
91 self
.remove_instance_object()
93 def remove_instance_object(self
):
94 log
.debug(f
'pending purges: {len(self.instances_purge)}')
95 if not self
.instances_purge
:
96 self
.update_instance_map()
98 instance_id
= self
.instances_purge
.pop()
99 self
.ioctx
.aio_remove(
100 UpdateInstanceRequest
.cephfs_mirror_object_name(instance_id
), oncomplete
=self
.handle_remove
)
102 def handle_remove(self
, completion
):
103 r
= completion
.get_return_value()
104 log
.debug(f
'handle_remove: r={r}')
105 # cephfs-mirror instances remove their respective instance
106 # objects upon termination. so we handle ENOENT here. note
107 # that when an instance is blocklisted, it wont be able to
108 # purge its instance object, so we do it on its behalf.
109 if not r
== 0 and not r
== -errno
.ENOENT
:
112 self
.remove_instance_object()
114 def update_instance_map(self
):
115 log
.debug(f
'pending updates: {len(self.instances_added)}+{len(self.instances_removed)}')
117 with rados
.WriteOpCtx() as write_op
:
120 instance_ids
= list(self
.instances_added
.keys())[0:MAX_UPDATE
]
122 for instance_id
in instance_ids
:
123 data
= self
.instances_added
.pop(instance_id
)
124 keys
.append(UpdateInstanceRequest
.omap_key(instance_id
))
125 vals
.append(pickle
.dumps(data
))
126 self
.ioctx
.set_omap(write_op
, tuple(keys
), tuple(vals
))
128 slicept
= MAX_UPDATE
- len(instance_ids
)
129 removals
= [UpdateInstanceRequest
.omap_key(instance_id
) \
130 for instance_id
in self
.instances_removed
[0:slicept
]]
131 self
.instances_removed
= self
.instances_removed
[slicept
:]
132 self
.ioctx
.remove_omap_keys(write_op
, tuple(removals
))
133 log
.debug(f
'applying {len(keys)} updates, {len(removals)} deletes')
134 self
.ioctx
.operate_aio_write_op(write_op
, MIRROR_OBJECT_NAME
, oncomplete
=self
.handle_update
)
135 except rados
.Error
as e
:
136 log
.error(f
'UpdateInstanceRequest.update_instance_map exception: {e}')
137 self
.finish(-e
.args
[0])
139 def handle_update(self
, completion
):
140 r
= completion
.get_return_value()
141 log
.debug(f
'handle_update: r={r}')
144 elif self
.instances_added
or self
.instances_removed
:
145 self
.update_instance_map()
150 log
.info(f
'finish: r={r}')
151 self
.on_finish_callback(r
)