# ceph/src/test/rgw/rgw_multi/zone_cloud.py
import json
import requests.compat
import logging

import boto
import boto.s3.connection

import dateutil.parser
import datetime

import re

from nose.tools import eq_ as eq
from itertools import zip_longest # type: ignore
from urllib.parse import urlparse

from .multisite import *
from .tools import *

log = logging.getLogger(__name__)

def get_key_ver(k):
    if not k.version_id:
        return 'null'
    return k.version_id

def unquote(s):
    if s[0] == '"' and s[-1] == '"':
        return s[1:-1]
    return s

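# Quoting sketch (hypothetical values): etags show up quoted or unquoted
# depending on the code path, so unquote() normalizes before comparison:
#
#   unquote('"d41d8cd98f00b204e9800998ecf8427e"')  # -> 'd41d8cd98f00b204e9800998ecf8427e'
#   unquote('d41d8cd98f00b204e9800998ecf8427e')    # -> unchanged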

def check_object_eq(k1, k2, check_extra = True):
    assert k1
    assert k2
    log.debug('comparing key name=%s', k1.name)
    eq(k1.name, k2.name)
    eq(k1.metadata, k2.metadata)
    # eq(k1.cache_control, k2.cache_control)
    eq(k1.content_type, k2.content_type)
    eq(k1.content_encoding, k2.content_encoding)
    eq(k1.content_disposition, k2.content_disposition)
    eq(k1.content_language, k2.content_language)

    eq(unquote(k1.etag), unquote(k2.etag))

    mtime1 = dateutil.parser.parse(k1.last_modified)
    mtime2 = dateutil.parser.parse(k2.last_modified)
    log.debug('k1.last_modified=%s k2.last_modified=%s', k1.last_modified, k2.last_modified)
    assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution
    # if check_extra:
    #     eq(k1.owner.id, k2.owner.id)
    #     eq(k1.owner.display_name, k2.owner.display_name)
    # eq(k1.storage_class, k2.storage_class)
    eq(k1.size, k2.size)
    eq(get_key_ver(k1), get_key_ver(k2))
    # eq(k1.encrypted, k2.encrypted)

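# Usage sketch (hypothetical keys): the helper raises AssertionError on the
# first mismatch, so callers compare replicas pairwise:
#
#   check_object_eq(cloud_bucket.get_key('obj'), rados_bucket.get_key('obj'))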

def make_request(conn, method, bucket, key, query_args, headers):
    result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers)
    if result.status // 100 != 2:
        raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
    return result

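# Usage sketch (hypothetical arguments), assuming conn is a boto S3Connection:
#
#   result = make_request(conn, 'GET', 'mybucket', 'mykey',
#                         query_args=None, headers=None)
#   body = result.read()  # any non-2xx status already raised S3ResponseError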

class CloudKey:
    def __init__(self, zone_bucket, k):
        self.zone_bucket = zone_bucket

        # we need two keys: when listing a bucket we get keys that only carry
        # partial data, but we need the full data so that we can use all the
        # rgwx- metadata headers required to build a correct representation
        # of the source object
        self.key = k
        self.rgwx_key = k # assume k carries all the meta info; if not, update() will fetch it
        self.update()

    def update(self):
        k = self.key
        rk = self.rgwx_key

        self.size = rk.size
        orig_name = rk.metadata.get('rgwx-source-key')
        if not orig_name:
            self.rgwx_key = self.zone_bucket.bucket.get_key(k.name, version_id = k.version_id)
            rk = self.rgwx_key
            orig_name = rk.metadata.get('rgwx-source-key')

        self.name = orig_name
        self.version_id = rk.metadata.get('rgwx-source-version-id')

        ve = rk.metadata.get('rgwx-versioned-epoch')
        if ve:
            self.versioned_epoch = int(ve)
        else:
            self.versioned_epoch = 0

        mt = rk.metadata.get('rgwx-source-mtime')
        if mt:
            self.last_modified = datetime.datetime.utcfromtimestamp(float(mt)).strftime('%a, %d %b %Y %H:%M:%S GMT')
        else:
            self.last_modified = k.last_modified

        et = rk.metadata.get('rgwx-source-etag')
        if rk.etag.find('-') >= 0 or et.find('-') >= 0:
            # in this case we use the source etag, as the object was uploaded
            # via multipart upload in one of the zones, so there is no way to
            # make sure the etags are calculated the same way. Otherwise we
            # keep the etag generated by the regular upload mechanism, which
            # should be consistent on both ends
            self.etag = et
        else:
            self.etag = rk.etag

        if k.etag[0] == '"' and self.etag[0] != '"': # inconsistent etag quoting when listing bucket vs object get
            self.etag = '"' + self.etag + '"'

        new_meta = {}
        for meta_key, meta_val in k.metadata.items():
            if not meta_key.startswith('rgwx-'):
                new_meta[meta_key] = meta_val

        self.metadata = new_meta

        self.cache_control = k.cache_control
        self.content_type = k.content_type
        self.content_encoding = k.content_encoding
        self.content_disposition = k.content_disposition
        self.content_language = k.content_language

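    # The cloud sync module records the source object's attributes in rgwx-*
    # user metadata; update() reads entries shaped roughly like this
    # (illustrative values):
    #
    #   {'rgwx-source-key': 'foo.txt',
    #    'rgwx-source-version-id': 'null',
    #    'rgwx-versioned-epoch': '2',
    #    'rgwx-source-mtime': '1551349200.0',
    #    'rgwx-source-etag': 'd41d8cd98f00b204e9800998ecf8427e'}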

    def get_contents_as_string(self, encoding=None):
        r = self.key.get_contents_as_string(encoding=encoding)

        # the previous call changed the state of the source object, as it
        # loaded its metadata, so refresh our cached view

        self.rgwx_key = self.key
        self.update()

        return r


class CloudZoneBucket:
    def __init__(self, zone_conn, target_path, name):
        self.zone_conn = zone_conn
        self.name = name
        self.cloud_conn = zone_conn.zone.cloud_conn

        target_path = target_path[:]
        if target_path[-1] != '/':
            target_path += '/'
        target_path = target_path.replace('${bucket}', name)

        tp = target_path.split('/', 1)

        if len(tp) == 1:
            self.target_bucket = target_path
            self.target_prefix = ''
        else:
            self.target_bucket = tp[0]
            self.target_prefix = tp[1]

        log.debug('target_path=%s target_bucket=%s target_prefix=%s', target_path, self.target_bucket, self.target_prefix)
        self.bucket = self.cloud_conn.get_bucket(self.target_bucket)

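    # Expansion sketch (hypothetical values): with
    # target_path='rgw-${zonegroup_id}/${bucket}' and a source bucket 'b1',
    # objects are looked up in cloud bucket 'rgw-<zonegroup_id>' under the
    # key prefix 'b1/'.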

    def get_all_versions(self):
        l = []

        for k in self.bucket.get_all_keys(prefix=self.target_prefix):
            new_key = CloudKey(self, k)

            log.debug('appending o=[\'%s\', \'%s\', \'%d\']', new_key.name, new_key.version_id, new_key.versioned_epoch)
            l.append(new_key)

        # sort by name, newest versioned epoch first, to match the version
        # listing order of the source zone
        sort_key = lambda k: (k.name, -k.versioned_epoch)
        l.sort(key = sort_key)

        for new_key in l:
            yield new_key

    def get_key(self, name, version_id=None):
        return CloudKey(self, self.bucket.get_key(name, version_id=version_id))


def parse_endpoint(endpoint):
    o = urlparse(endpoint)

    netloc = o.netloc.split(':')

    host = netloc[0]

    if len(netloc) > 1:
        port = int(netloc[1])
    else:
        port = o.port

    is_secure = False

    if o.scheme == 'https':
        is_secure = True

    if not port:
        if is_secure:
            port = 443
        else:
            port = 80

    return host, port, is_secure

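# Usage sketch:
#
#   parse_endpoint('http://s3.example.com:8000')  # -> ('s3.example.com', 8000, False)
#   parse_endpoint('https://s3.example.com')      # -> ('s3.example.com', 443, True)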

class CloudZone(Zone):
    def __init__(self, name, cloud_endpoint, credentials, source_bucket, target_path,
                 zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
        self.cloud_endpoint = cloud_endpoint
        self.credentials = credentials
        self.source_bucket = source_bucket
        self.target_path = target_path

        self.target_path = self.target_path.replace('${zone}', name)
        # self.target_path = self.target_path.replace('${zone_id}', zone_id)
        self.target_path = self.target_path.replace('${zonegroup}', zonegroup.name)
        self.target_path = self.target_path.replace('${zonegroup_id}', zonegroup.id)

        log.debug('target_path=%s', self.target_path)

        host, port, is_secure = parse_endpoint(cloud_endpoint)

        self.cloud_conn = boto.connect_s3(
                aws_access_key_id = credentials.access_key,
                aws_secret_access_key = credentials.secret,
                host = host,
                port = port,
                is_secure = is_secure,
                calling_format = boto.s3.connection.OrdinaryCallingFormat())
        super(CloudZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)

    def is_read_only(self):
        return True

    def tier_type(self):
        return "cloud"

    def create(self, cluster, args = None, check_retcode = True):
        """ create the object with the given arguments """

        if args is None:
            args = [] # must be a list: the tier options are appended below

        tier_config = ','.join([ 'connection.endpoint=' + self.cloud_endpoint,
                                 'connection.access_key=' + self.credentials.access_key,
                                 'connection.secret=' + self.credentials.secret,
                                 'target_path=' + re.escape(self.target_path)])

        args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]

        return self.json_command(cluster, 'create', args, check_retcode=check_retcode)

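    # Roughly equivalent radosgw-admin invocation (illustrative values):
    #
    #   radosgw-admin zone create --rgw-zone=<name> --tier-type cloud \
    #       --tier-config connection.endpoint=http://cloud:8000,connection.access_key=KEY,connection.secret=SECRET,target_path=rgw-zg/b1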

    def has_buckets(self):
        return False

    class Conn(ZoneConn):
        def __init__(self, zone, credentials):
            super(CloudZone.Conn, self).__init__(zone, credentials)

        def get_bucket(self, bucket_name):
            return CloudZoneBucket(self, self.zone.target_path, bucket_name)

        def create_bucket(self, name):
            # should not be here, a bug in the test suite
            log.critical('Conn.create_bucket() should not be called in cloud zone')
            assert False

        def check_bucket_eq(self, zone_conn, bucket_name):
            assert(zone_conn.zone.tier_type() == "rados")

            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
            b1 = self.get_bucket(bucket_name)
            b2 = zone_conn.get_bucket(bucket_name)

            log.debug('bucket1 objects:')
            for o in b1.get_all_versions():
                log.debug('o=%s', o.name)
            log.debug('bucket2 objects:')
            for o in b2.get_all_versions():
                log.debug('o=%s', o.name)

            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
                if k1 is None:
                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
                    assert False
                if k2 is None:
                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
                    assert False

                check_object_eq(k1, k2)

            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)

            return True

    def get_conn(self, credentials):
        return self.Conn(self, credentials)


class CloudZoneConfig:
    def __init__(self, cfg, section):
        self.endpoint = cfg.get(section, 'endpoint')
        access_key = cfg.get(section, 'access_key')
        secret = cfg.get(section, 'secret')
        self.credentials = Credentials(access_key, secret)
        try:
            self.target_path = cfg.get(section, 'target_path')
        except:
            self.target_path = 'rgw-${zonegroup_id}/${bucket}'

        try:
            self.source_bucket = cfg.get(section, 'source_bucket')
        except:
            self.source_bucket = '*'