]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | import json |
2 | import requests.compat | |
3 | import logging | |
4 | ||
5 | import boto | |
6 | import boto.s3.connection | |
7 | ||
8 | import dateutil.parser | |
9 | import datetime | |
10 | ||
11 | import re | |
12 | ||
13 | from nose.tools import eq_ as eq | |
f67539c2 TL |
14 | from itertools import zip_longest # type: ignore |
15 | from urllib.parse import urlparse | |
11fdf7f2 TL |
16 | |
17 | from .multisite import * | |
18 | from .tools import * | |
19 | ||
20 | log = logging.getLogger(__name__) | |
21 | ||
def get_key_ver(k):
    """Return the key's version id, or the literal 'null' for unversioned keys."""
    return k.version_id if k.version_id else 'null'
26 | ||
def unquote(s):
    """Strip one pair of surrounding double quotes from *s*, if present.

    Guards len(s) >= 2 so that an empty string no longer raises IndexError
    and a lone '"' is returned unchanged instead of being emptied (in the
    old code s[0] and s[-1] were the same character).
    """
    if len(s) >= 2 and s[0] == '"' and s[-1] == '"':
        return s[1:-1]
    return s
31 | ||
def check_object_eq(k1, k2, check_extra = True):
    """Assert that k1 and k2 describe the same object: name, user metadata,
    content headers, etag, mtime (within 1s), size and version."""
    assert k1
    assert k2
    log.debug('comparing key name=%s', k1.name)

    # straight attribute-for-attribute comparisons
    for attr in ('name', 'metadata', 'content_type', 'content_encoding',
                 'content_disposition', 'content_language'):
        eq(getattr(k1, attr), getattr(k2, attr))
    # cache_control / owner / storage_class / encrypted intentionally not compared

    eq(unquote(k1.etag), unquote(k2.etag))

    log.debug('k1.last_modified=%s k2.last_modified=%s', k1.last_modified, k2.last_modified)
    mtime1 = dateutil.parser.parse(k1.last_modified)
    mtime2 = dateutil.parser.parse(k2.last_modified)
    # allow up to 1s skew to handle different time resolution between zones
    assert abs((mtime1 - mtime2).total_seconds()) < 1

    eq(k1.size, k2.size)
    eq(get_key_ver(k1), get_key_ver(k2))
57 | ||
def make_request(conn, method, bucket, key, query_args, headers):
    """Issue a raw S3 request via *conn* and return the response.

    Raises boto.exception.S3ResponseError for any non-2xx status.
    """
    response = conn.make_request(method, bucket=bucket, key=key,
                                 query_args=query_args, headers=headers)
    if not 200 <= response.status < 300:
        raise boto.exception.S3ResponseError(response.status, response.reason, response.read())
    return response
63 | ||
class CloudKey:
    """Wrap a boto key stored in the cloud tier and expose the attributes of
    the original source object (name, version, etag, mtime, user metadata),
    reconstructed from the rgwx-* metadata radosgw writes on tiered objects.
    """

    def __init__(self, zone_bucket, k):
        self.zone_bucket = zone_bucket

        # we need two keys: when listing buckets, we get keys that only contain partial data
        # but we need to have the full data so that we could use all the meta-rgwx- headers
        # that are needed in order to create a correct representation of the object
        self.key = k
        self.rgwx_key = k # assuming k has all the meta info on, if not then we'll update it in update()
        self.update()

    def update(self):
        """Recompute the source-object attributes from self.key / self.rgwx_key."""
        k = self.key
        rk = self.rgwx_key

        self.size = rk.size
        orig_name = rk.metadata.get('rgwx-source-key')
        if not orig_name:
            # partial listing entry: fetch the full key to get the rgwx-* headers
            self.rgwx_key = self.zone_bucket.bucket.get_key(k.name, version_id = k.version_id)
            rk = self.rgwx_key
            orig_name = rk.metadata.get('rgwx-source-key')

        self.name = orig_name
        self.version_id = rk.metadata.get('rgwx-source-version-id')

        ve = rk.metadata.get('rgwx-versioned-epoch')
        self.versioned_epoch = int(ve) if ve else 0

        mt = rk.metadata.get('rgwx-source-mtime')
        if mt:
            # render the source mtime as an HTTP date in UTC; timezone-aware
            # replacement for the deprecated datetime.utcfromtimestamp()
            self.last_modified = datetime.datetime.fromtimestamp(
                float(mt), datetime.timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')
        else:
            self.last_modified = k.last_modified

        et = rk.metadata.get('rgwx-source-etag')
        # BUG FIX: et can be None when the rgwx-source-etag header is missing;
        # the old code called et.find() unconditionally (AttributeError) and
        # could also store None as the etag
        if et is not None and (rk.etag.find('-') >= 0 or et.find('-') >= 0):
            # in this case we will use the source etag as it was uploaded via multipart upload
            # in one of the zones, so there's no way to make sure etags are calculated the same
            # way. In the other case we'd just want to keep the etag that was generated in the
            # regular upload mechanism, which should be consistent in both ends
            self.etag = et
        else:
            self.etag = rk.etag

        if k.etag[0] == '"' and self.etag[0] != '"': # inconsistent etag quoting when listing bucket vs object get
            self.etag = '"' + self.etag + '"'

        # expose only the user metadata, dropping the internal rgwx-* entries
        self.metadata = {meta_key: meta_val
                         for meta_key, meta_val in k.metadata.items()
                         if not meta_key.startswith('rgwx-')}

        self.cache_control = k.cache_control
        self.content_type = k.content_type
        self.content_encoding = k.content_encoding
        self.content_disposition = k.content_disposition
        self.content_language = k.content_language

    def get_contents_as_string(self, encoding=None):
        """Return the object body; also refreshes our metadata view, since the
        read loads the key's full set of headers."""
        r = self.key.get_contents_as_string(encoding=encoding)

        # the previous call changed the status of the source object, as it loaded
        # its metadata
        self.rgwx_key = self.key
        self.update()

        return r
138 | ||
11fdf7f2 TL |
139 | |
class CloudZoneBucket:
    """View of the cloud-tier target bucket that holds the synced copies of a
    source-zone bucket's objects."""

    def __init__(self, zone_conn, target_path, name):
        self.zone_conn = zone_conn
        self.name = name
        self.cloud_conn = zone_conn.zone.cloud_conn

        # expand the target path template and normalize it to end with '/'
        path = target_path[:]
        if path[-1] != '/':
            path += '/'
        path = path.replace('${bucket}', name)

        bucket_and_prefix = path.split('/', 1)
        if len(bucket_and_prefix) == 1:
            self.target_bucket = path
            self.target_prefix = ''
        else:
            self.target_bucket, self.target_prefix = bucket_and_prefix

        log.debug('target_path=%s target_bucket=%s target_prefix=%s',
                  path, self.target_bucket, self.target_prefix)
        self.bucket = self.cloud_conn.get_bucket(self.target_bucket)

    def get_all_versions(self):
        """Yield CloudKey objects under our prefix, ordered by name and then
        by newest versioned epoch first."""
        keys = []
        for listed in self.bucket.get_all_keys(prefix=self.target_prefix):
            wrapped = CloudKey(self, listed)
            log.debug('appending o=[\'%s\', \'%s\', \'%d\']',
                      wrapped.name, wrapped.version_id, wrapped.versioned_epoch)
            keys.append(wrapped)

        keys.sort(key=lambda ck: (ck.name, -ck.versioned_epoch))
        yield from keys

    def get_key(self, name, version_id=None):
        """Fetch a single object as a CloudKey."""
        return CloudKey(self, self.bucket.get_key(name, version_id=version_id))
181 | ||
182 | ||
def parse_endpoint(endpoint):
    """Break an endpoint URL into (host, port, is_secure).

    An explicit port in the netloc wins; otherwise fall back to the parsed
    URL's port, and finally to 443/80 depending on the scheme.
    """
    parsed = urlparse(endpoint)

    pieces = parsed.netloc.split(':')
    host = pieces[0]
    port = int(pieces[1]) if len(pieces) > 1 else parsed.port

    is_secure = (parsed.scheme == 'https')

    if not port:
        port = 443 if is_secure else 80

    return host, port, is_secure
207 | ||
208 | ||
class CloudZone(Zone):
    """A read-only zone backed by an external S3-compatible cloud endpoint.

    radosgw syncs objects out to self.target_path under that endpoint; the
    nested Conn provides bucket access and cross-zone comparison helpers.
    """

    def __init__(self, name, cloud_endpoint, credentials, source_bucket, target_path,
                 zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
        self.cloud_endpoint = cloud_endpoint
        self.credentials = credentials
        self.source_bucket = source_bucket
        self.target_path = target_path

        # expand the path template; ${bucket} is expanded later, per bucket
        self.target_path = self.target_path.replace('${zone}', name)
        # self.target_path = self.target_path.replace('${zone_id}', zone_id)
        self.target_path = self.target_path.replace('${zonegroup}', zonegroup.name)
        self.target_path = self.target_path.replace('${zonegroup_id}', zonegroup.id)

        log.debug('target_path=%s', self.target_path)

        host, port, is_secure = parse_endpoint(cloud_endpoint)

        self.cloud_conn = boto.connect_s3(
            aws_access_key_id = credentials.access_key,
            aws_secret_access_key = credentials.secret,
            host = host,
            port = port,
            is_secure = is_secure,
            calling_format = boto.s3.connection.OrdinaryCallingFormat())
        super(CloudZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)


    def is_read_only(self):
        return True

    def tier_type(self):
        return "cloud"

    def create(self, cluster, args = None, check_retcode = True):
        """ create the object with the given arguments """

        if args is None:
            # BUG FIX: this default used to be '' (a str); the `args += [...]`
            # below then raised TypeError, so the default path could never work
            args = []

        tier_config = ','.join([ 'connection.endpoint=' + self.cloud_endpoint,
                                 'connection.access_key=' + self.credentials.access_key,
                                 'connection.secret=' + self.credentials.secret,
                                 'target_path=' + re.escape(self.target_path)])

        args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]

        return self.json_command(cluster, 'create', args, check_retcode=check_retcode)

    def has_buckets(self):
        return False

    def has_roles(self):
        return False

    class Conn(ZoneConn):
        def __init__(self, zone, credentials):
            super(CloudZone.Conn, self).__init__(zone, credentials)

        def get_bucket(self, bucket_name):
            return CloudZoneBucket(self, self.zone.target_path, bucket_name)

        def create_bucket(self, name):
            # should not be here, a bug in the test suite
            log.critical('Conn.create_bucket() should not be called in cloud zone')
            assert False

        def check_bucket_eq(self, zone_conn, bucket_name):
            """Compare every object version in this cloud bucket against the
            corresponding rados zone bucket; assert on any mismatch."""
            assert(zone_conn.zone.tier_type() == "rados")

            # BUG FIX: the second zone name used to be logged as self.name twice
            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
            b1 = self.get_bucket(bucket_name)
            b2 = zone_conn.get_bucket(bucket_name)

            log.debug('bucket1 objects:')
            for o in b1.get_all_versions():
                log.debug('o=%s', o.name)
            log.debug('bucket2 objects:')
            for o in b2.get_all_versions():
                log.debug('o=%s', o.name)

            # both generators yield keys sorted by (name, -versioned_epoch), so a
            # lockstep walk pairs up matching versions; None marks a missing key
            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
                if k1 is None:
                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
                    assert False
                if k2 is None:
                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
                    assert False

                check_object_eq(k1, k2)

            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)

            return True

        def create_role(self, path, rolename, policy_document, tag_list):
            # roles are not supported on a cloud zone
            assert False

    def get_conn(self, credentials):
        return self.Conn(self, credentials)
309 | ||
310 | ||
class CloudZoneConfig:
    """Parse a cloud-zone section of the test configuration: endpoint,
    credentials, and optional target_path / source_bucket overrides."""

    def __init__(self, cfg, section):
        self.endpoint = cfg.get(section, 'endpoint')
        access_key = cfg.get(section, 'access_key')
        secret = cfg.get(section, 'secret')
        self.credentials = Credentials(access_key, secret)
        try:
            self.target_path = cfg.get(section, 'target_path')
        except Exception:
            # option missing: fall back to the default template
            # (was a bare `except:`, which also swallowed KeyboardInterrupt/SystemExit)
            self.target_path = 'rgw-${zonegroup_id}/${bucket}'

        try:
            self.source_bucket = cfg.get(section, 'source_bucket')
        except Exception:
            # option missing: sync all buckets by default
            self.source_bucket = '*'
326 |