]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | import json |
2 | import requests.compat | |
3 | import logging | |
4 | ||
5 | import boto | |
6 | import boto.s3.connection | |
7 | ||
8 | import dateutil.parser | |
9 | import datetime | |
10 | ||
11 | import re | |
12 | ||
13 | from nose.tools import eq_ as eq | |
14 | try: | |
9f95a23c | 15 | from itertools import izip_longest as zip_longest # type: ignore |
11fdf7f2 TL |
16 | except ImportError: |
17 | from itertools import zip_longest | |
18 | ||
19 | from six.moves.urllib.parse import urlparse | |
20 | ||
21 | from .multisite import * | |
22 | from .tools import * | |
23 | ||
24 | log = logging.getLogger(__name__) | |
25 | ||
def get_key_ver(k):
    """Return the key's version id, or the literal 'null' for unversioned keys."""
    return k.version_id or 'null'
30 | ||
def unquote(s):
    """Strip one pair of surrounding double quotes from *s*, if present.

    Returns *s* unchanged when it is not wrapped in quotes.  The length
    guard fixes two defects in the original: an IndexError on the empty
    string, and the single character '"' being wrongly reduced to ''.
    """
    if len(s) >= 2 and s[0] == '"' and s[-1] == '"':
        return s[1:-1]
    return s
35 | ||
def check_object_eq(k1, k2, check_extra = True):
    """Assert that two bucket keys describe the same object.

    Compares name, user metadata, the content-* headers, etag (ignoring
    quoting differences), mtime (to 1-second resolution), size and
    version id.  `check_extra` is currently unused; owner/storage-class/
    encryption checks are intentionally disabled.
    """
    assert k1
    assert k2
    log.debug('comparing key name=%s', k1.name)
    eq(k1.name, k2.name)
    eq(k1.metadata, k2.metadata)
    # NOTE: cache_control is deliberately not compared
    for header in ('content_type', 'content_encoding',
                   'content_disposition', 'content_language'):
        eq(getattr(k1, header), getattr(k2, header))

    # etags may or may not be wrapped in quotes depending on how the key
    # was fetched; strip quoting before comparing
    eq(unquote(k1.etag), unquote(k2.etag))

    mtime_a = dateutil.parser.parse(k1.last_modified)
    mtime_b = dateutil.parser.parse(k2.last_modified)
    log.debug('k1.last_modified=%s k2.last_modified=%s', k1.last_modified, k2.last_modified)
    # zones may store timestamps at different resolutions
    assert abs((mtime_a - mtime_b).total_seconds()) < 1

    eq(k1.size, k2.size)
    eq(get_key_ver(k1), get_key_ver(k2))
61 | ||
def make_request(conn, method, bucket, key, query_args, headers):
    """Issue a raw S3 request through *conn*.

    Returns the response on any 2xx status; raises S3ResponseError
    (carrying status, reason and body) otherwise.
    """
    response = conn.make_request(method, bucket=bucket, key=key,
                                 query_args=query_args, headers=headers)
    if not 200 <= response.status < 300:
        raise boto.exception.S3ResponseError(response.status, response.reason, response.read())
    return response
67 | ||
class CloudKey:
    """Wrapper presenting a cloud-tiered boto key with its source-zone attributes.

    Objects synced to the cloud tier carry the original object's attributes
    in 'rgwx-*' metadata entries.  This class reads those entries and exposes
    name, version_id, versioned_epoch, last_modified, etag, size and user
    metadata with the same attribute interface check_object_eq() expects
    from a regular key.
    """
    def __init__(self, zone_bucket, k):
        # parent CloudZoneBucket; needed later to re-fetch full key metadata
        self.zone_bucket = zone_bucket

        # we need two keys: when listing buckets, we get keys that only contain partial data
        # but we need to have the full data so that we could use all the meta-rgwx- headers
        # that are needed in order to create a correct representation of the object
        self.key = k
        self.rgwx_key = k # assuming k has all the meta info on, if not then we'll update it in update()
        self.update()

    def update(self):
        """Recompute the source-object view from the rgwx-* metadata.

        If the current rgwx_key lacks 'rgwx-source-key' (a partial key
        obtained from a bucket listing), the full key is re-fetched from
        the cloud bucket first.
        """
        k = self.key
        rk = self.rgwx_key

        self.size = rk.size
        orig_name = rk.metadata.get('rgwx-source-key')
        if not orig_name:
            # partial listing entry: fetch the full object to get its metadata
            self.rgwx_key = self.zone_bucket.bucket.get_key(k.name, version_id = k.version_id)
            rk = self.rgwx_key
            orig_name = rk.metadata.get('rgwx-source-key')

        self.name = orig_name
        self.version_id = rk.metadata.get('rgwx-source-version-id')

        # versioned epoch defaults to 0 when the metadata entry is absent
        ve = rk.metadata.get('rgwx-versioned-epoch')
        if ve:
            self.versioned_epoch = int(ve)
        else:
            self.versioned_epoch = 0

        # prefer the source mtime (a unix timestamp), formatted in the
        # HTTP date style boto uses for last_modified
        mt = rk.metadata.get('rgwx-source-mtime')
        if mt:
            self.last_modified = datetime.datetime.utcfromtimestamp(float(mt)).strftime('%a, %d %b %Y %H:%M:%S GMT')
        else:
            self.last_modified = k.last_modified

        # NOTE(review): et is None if 'rgwx-source-etag' is missing, making
        # et.find() raise — presumably the sync always sets it; TODO confirm
        et = rk.metadata.get('rgwx-source-etag')
        if rk.etag.find('-') >= 0 or et.find('-') >= 0:
            # in this case we will use the source etag as it was uploaded via multipart upload
            # in one of the zones, so there's no way to make sure etags are calculated the same
            # way. In the other case we'd just want to keep the etag that was generated in the
            # regular upload mechanism, which should be consistent in both ends
            self.etag = et
        else:
            self.etag = rk.etag

        if k.etag[0] == '"' and self.etag[0] != '"': # inconsistent etag quoting when listing bucket vs object get
            self.etag = '"' + self.etag + '"'

        # user-visible metadata excludes the internal rgwx-* entries
        new_meta = {}
        for meta_key, meta_val in k.metadata.items():
            if not meta_key.startswith('rgwx-'):
                new_meta[meta_key] = meta_val

        self.metadata = new_meta

        self.cache_control = k.cache_control
        self.content_type = k.content_type
        self.content_encoding = k.content_encoding
        self.content_disposition = k.content_disposition
        self.content_language = k.content_language


    def get_contents_as_string(self, encoding=None):
        """Read the object body; refresh cached attributes afterwards."""
        r = self.key.get_contents_as_string(encoding=encoding)

        # the previous call changed the status of the source object, as it loaded
        # its metadata

        self.rgwx_key = self.key
        self.update()

        return r
142 | ||
11fdf7f2 TL |
143 | |
class CloudZoneBucket:
    """View of a bucket's objects as stored in the cloud tier.

    The tier's target_path template (with '${bucket}' substituted) is split
    into the actual cloud bucket name and an object-name prefix under it.
    """
    def __init__(self, zone_conn, target_path, name):
        self.zone_conn = zone_conn
        self.name = name
        self.cloud_conn = zone_conn.zone.cloud_conn

        path = target_path[:]
        if path[-1] != '/':
            path += '/'
        path = path.replace('${bucket}', name)

        # first path component is the cloud bucket, the rest is the prefix
        parts = path.split('/', 1)
        if len(parts) == 1:
            self.target_bucket, self.target_prefix = path, ''
        else:
            self.target_bucket, self.target_prefix = parts

        log.debug('target_path=%s target_bucket=%s target_prefix=%s', path, self.target_bucket, self.target_prefix)
        self.bucket = self.cloud_conn.get_bucket(self.target_bucket)

    def get_all_versions(self):
        """Yield every stored version as a CloudKey, ordered by name and
        newest (highest versioned epoch) first."""
        versions = []
        for raw_key in self.bucket.get_all_keys(prefix=self.target_prefix):
            cloud_key = CloudKey(self, raw_key)
            log.debug('appending o=[\'%s\', \'%s\', \'%d\']', cloud_key.name, cloud_key.version_id, cloud_key.versioned_epoch)
            versions.append(cloud_key)

        versions.sort(key=lambda ck: (ck.name, -ck.versioned_epoch))

        for cloud_key in versions:
            yield cloud_key

    def get_key(self, name, version_id=None):
        """Fetch a single object (optionally a specific version) as a CloudKey."""
        raw_key = self.bucket.get_key(name, version_id=version_id)
        return CloudKey(self, raw_key)
185 | ||
186 | ||
def parse_endpoint(endpoint):
    """Split an endpoint URL into a (host, port, is_secure) triple.

    An explicit ':port' in the netloc wins; otherwise the scheme's default
    is used (443 for https, 80 for everything else).
    """
    parsed = urlparse(endpoint)

    host, _, port_text = parsed.netloc.partition(':')

    port = int(port_text) if port_text else parsed.port

    is_secure = (parsed.scheme == 'https')

    if not port:
        port = 443 if is_secure else 80

    return host, port, is_secure
211 | ||
212 | ||
class CloudZone(Zone):
    """A zone whose contents are tiered to an external S3-compatible cloud
    endpoint.  The zone is read-only on the RGW side; objects are synced
    into a target bucket/prefix on the remote cloud."""

    def __init__(self, name, cloud_endpoint, credentials, source_bucket, target_path,
                 zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
        self.cloud_endpoint = cloud_endpoint
        self.credentials = credentials
        self.source_bucket = source_bucket
        self.target_path = target_path

        # expand the placeholders supported by the tier's target path
        self.target_path = self.target_path.replace('${zone}', name)
        # self.target_path = self.target_path.replace('${zone_id}', zone_id)
        self.target_path = self.target_path.replace('${zonegroup}', zonegroup.name)
        self.target_path = self.target_path.replace('${zonegroup_id}', zonegroup.id)

        log.debug('target_path=%s', self.target_path)

        host, port, is_secure = parse_endpoint(cloud_endpoint)

        self.cloud_conn = boto.connect_s3(
            aws_access_key_id = credentials.access_key,
            aws_secret_access_key = credentials.secret,
            host = host,
            port = port,
            is_secure = is_secure,
            calling_format = boto.s3.connection.OrdinaryCallingFormat())
        super(CloudZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)


    def is_read_only(self):
        """Cloud zones never accept writes through RGW."""
        return True

    def tier_type(self):
        """Tier type string as understood by radosgw-admin."""
        return "cloud"

    def create(self, cluster, args = None, check_retcode = True):
        """Create the zone with the given arguments.

        *args* is an optional list of extra command-line arguments; the
        cloud tier type and connection configuration are appended to it.
        """
        if args is None:
            # bug fix: this used to default to '' (a str), so the list
            # concatenation below raised TypeError whenever no args were given
            args = []

        tier_config = ','.join([ 'connection.endpoint=' + self.cloud_endpoint,
                                 'connection.access_key=' + self.credentials.access_key,
                                 'connection.secret=' + self.credentials.secret,
                                 'target_path=' + re.escape(self.target_path)])

        args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]

        return self.json_command(cluster, 'create', args, check_retcode=check_retcode)

    def has_buckets(self):
        """Bucket listing is not supported on the cloud zone itself."""
        return False

    class Conn(ZoneConn):
        def __init__(self, zone, credentials):
            super(CloudZone.Conn, self).__init__(zone, credentials)

        def get_bucket(self, bucket_name):
            """Return a CloudZoneBucket view of the synced bucket."""
            return CloudZoneBucket(self, self.zone.target_path, bucket_name)

        def create_bucket(self, name):
            # should not be here, a bug in the test suite
            log.critical('Conn.create_bucket() should not be called in cloud zone')
            assert False

        def check_bucket_eq(self, zone_conn, bucket_name):
            """Verify the cloud copy of *bucket_name* matches the rados zone's,
            comparing every object version in sorted order."""
            assert(zone_conn.zone.tier_type() == "rados")

            # bug fix: the second zone in this message was previously
            # self.name again; it should identify the peer zone
            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
            b1 = self.get_bucket(bucket_name)
            b2 = zone_conn.get_bucket(bucket_name)

            log.debug('bucket1 objects:')
            for o in b1.get_all_versions():
                log.debug('o=%s', o.name)
            log.debug('bucket2 objects:')
            for o in b2.get_all_versions():
                log.debug('o=%s', o.name)

            # zip_longest pads the shorter side with None, which flags a
            # version missing from one of the zones
            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
                if k1 is None:
                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
                    assert False
                if k2 is None:
                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
                    assert False

                check_object_eq(k1, k2)


            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)

            return True

    def get_conn(self, credentials):
        return self.Conn(self, credentials)
307 | ||
308 | ||
class CloudZoneConfig:
    """Cloud-tier settings read from one section of the test config file.

    'endpoint', 'access_key' and 'secret' are required; 'target_path' and
    'source_bucket' are optional and fall back to defaults.
    """
    def __init__(self, cfg, section):
        self.endpoint = cfg.get(section, 'endpoint')
        access_key = cfg.get(section, 'access_key')
        secret = cfg.get(section, 'secret')
        self.credentials = Credentials(access_key, secret)
        # the optional settings below fall back to defaults when missing;
        # bug fix: the bare `except:` clauses also swallowed SystemExit and
        # KeyboardInterrupt — narrowed to Exception
        try:
            self.target_path = cfg.get(section, 'target_path')
        except Exception:
            self.target_path = 'rgw-${zonegroup_id}/${bucket}'

        try:
            self.source_bucket = cfg.get(section, 'source_bucket')
        except Exception:
            self.source_bucket = '*'
324 |