]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/rgw/rgw_multi/zone_cloud.py
import 15.2.4
[ceph.git] / ceph / src / test / rgw / rgw_multi / zone_cloud.py
CommitLineData
11fdf7f2
TL
1import json
2import requests.compat
3import logging
4
5import boto
6import boto.s3.connection
7
8import dateutil.parser
9import datetime
10
11import re
12
13from nose.tools import eq_ as eq
14try:
9f95a23c 15 from itertools import izip_longest as zip_longest # type: ignore
11fdf7f2
TL
16except ImportError:
17 from itertools import zip_longest
18
19from six.moves.urllib.parse import urlparse
20
21from .multisite import *
22from .tools import *
23
24log = logging.getLogger(__name__)
25
def get_key_ver(k):
    """Return the key's version id, or 'null' for a non-versioned key."""
    return k.version_id if k.version_id else 'null'
30
def unquote(s):
    """Strip one pair of surrounding double quotes from *s*, if present.

    Used to normalize etags, which may or may not arrive quoted.
    Returns *s* unchanged when it is empty or not fully quoted; the
    length guard avoids an IndexError on an empty string (the previous
    code indexed s[0] unconditionally) and avoids turning a lone '"'
    into an empty string.
    """
    if len(s) >= 2 and s[0] == '"' and s[-1] == '"':
        return s[1:-1]
    return s
35
def check_object_eq(k1, k2, check_extra = True):
    """Assert that two bucket keys describe the same object.

    Compares name, user metadata, content headers, etag, mtime (to within
    one second), size and version. Owner/storage-class/encryption checks
    are intentionally disabled, as are cache_control comparisons.
    """
    assert k1
    assert k2
    log.debug('comparing key name=%s', k1.name)

    # simple attribute-for-attribute comparisons
    for attr in ('name', 'metadata', 'content_type', 'content_encoding',
                 'content_disposition', 'content_language'):
        eq(getattr(k1, attr), getattr(k2, attr))
    # eq(k1.cache_control, k2.cache_control)

    eq(unquote(k1.etag), unquote(k2.etag))

    t1 = dateutil.parser.parse(k1.last_modified)
    t2 = dateutil.parser.parse(k2.last_modified)
    log.debug('k1.last_modified=%s k2.last_modified=%s', k1.last_modified, k2.last_modified)
    # handle different time resolution between the two ends
    assert abs((t1 - t2).total_seconds()) < 1
    # if check_extra:
    #     eq(k1.owner.id, k2.owner.id)
    #     eq(k1.owner.display_name, k2.owner.display_name)
    # eq(k1.storage_class, k2.storage_class)
    eq(k1.size, k2.size)
    eq(get_key_ver(k1), get_key_ver(k2))
    # eq(k1.encrypted, k2.encrypted)
61
def make_request(conn, method, bucket, key, query_args, headers):
    """Issue a raw S3 request through *conn* and return the response.

    Raises boto.exception.S3ResponseError for any non-2xx status.
    """
    response = conn.make_request(method, bucket=bucket, key=key,
                                 query_args=query_args, headers=headers)
    if response.status // 100 != 2:
        raise boto.exception.S3ResponseError(response.status, response.reason, response.read())
    return response
67
class CloudKey:
    """A boto key stored in the cloud tier, presented as it looked in the
    source RGW zone.

    The cloud sync module attaches 'rgwx-*' metadata headers on upload;
    this wrapper decodes them back into name/version/mtime/etag attributes
    so a cloud-tier object can be compared against its rados-zone source.
    """
    def __init__(self, zone_bucket, k):
        # zone_bucket: the owning CloudZoneBucket; k: the raw boto key
        self.zone_bucket = zone_bucket

        # we need two keys: when listing buckets, we get keys that only contain partial data
        # but we need to have the full data so that we could use all the meta-rgwx- headers
        # that are needed in order to create a correct representation of the object
        self.key = k
        self.rgwx_key = k # assuming k has all the meta info on, if not then we'll update it in update()
        self.update()

    def update(self):
        """Recompute the source-zone view of this object from the rgwx-*
        metadata, re-fetching the full key when the listing gave only a
        partial one (no 'rgwx-source-key' header present)."""
        k = self.key
        rk = self.rgwx_key

        self.size = rk.size
        orig_name = rk.metadata.get('rgwx-source-key')
        if not orig_name:
            # partial listing entry: do a full GET to obtain the rgwx-* headers
            self.rgwx_key = self.zone_bucket.bucket.get_key(k.name, version_id = k.version_id)
            rk = self.rgwx_key
            orig_name = rk.metadata.get('rgwx-source-key')

        self.name = orig_name
        self.version_id = rk.metadata.get('rgwx-source-version-id')

        ve = rk.metadata.get('rgwx-versioned-epoch')
        if ve:
            self.versioned_epoch = int(ve)
        else:
            self.versioned_epoch = 0

        mt = rk.metadata.get('rgwx-source-mtime')
        if mt:
            # source mtime is stored as a unix timestamp; render it in the
            # HTTP date format boto uses for last_modified
            self.last_modified = datetime.datetime.utcfromtimestamp(float(mt)).strftime('%a, %d %b %Y %H:%M:%S GMT')
        else:
            self.last_modified = k.last_modified

        # NOTE(review): assumes 'rgwx-source-etag' is always present once the
        # full key was fetched; et would be None otherwise — confirm upstream
        et = rk.metadata.get('rgwx-source-etag')
        if rk.etag.find('-') >= 0 or et.find('-') >= 0:
            # in this case we will use the source etag as it was uploaded via multipart upload
            # in one of the zones, so there's no way to make sure etags are calculated the same
            # way. In the other case we'd just want to keep the etag that was generated in the
            # regular upload mechanism, which should be consistent in both ends
            self.etag = et
        else:
            self.etag = rk.etag

        if k.etag[0] == '"' and self.etag[0] != '"': # inconsistent etag quoting when listing bucket vs object get
            self.etag = '"' + self.etag + '"'

        # strip the sync module's bookkeeping headers from user-visible metadata
        new_meta = {}
        for meta_key, meta_val in k.metadata.items():
            if not meta_key.startswith('rgwx-'):
                new_meta[meta_key] = meta_val

        self.metadata = new_meta

        self.cache_control = k.cache_control
        self.content_type = k.content_type
        self.content_encoding = k.content_encoding
        self.content_disposition = k.content_disposition
        self.content_language = k.content_language


    def get_contents_as_string(self, encoding=None):
        """Read the object body, then refresh our derived attributes: the GET
        populates the underlying key's metadata as a side effect."""
        r = self.key.get_contents_as_string(encoding=encoding)

        # the previous call changed the status of the source object, as it loaded
        # its metadata

        self.rgwx_key = self.key
        self.update()

        return r
142
11fdf7f2
TL
143
class CloudZoneBucket:
    """A bucket as seen through a cloud zone: objects live in the cloud
    endpoint under target_bucket/target_prefix derived from target_path."""

    def __init__(self, zone_conn, target_path, name):
        self.zone_conn = zone_conn
        self.name = name
        self.cloud_conn = zone_conn.zone.cloud_conn

        # expand the path template; ensure a trailing separator so the
        # prefix is well-formed
        path = target_path[:]
        if path[-1] != '/':
            path += '/'
        path = path.replace('${bucket}', name)

        # first path component is the cloud bucket, the rest is the prefix
        head, sep, rest = path.partition('/')
        if sep:
            self.target_bucket = head
            self.target_prefix = rest
        else:
            self.target_bucket = path
            self.target_prefix = ''

        log.debug('target_path=%s target_bucket=%s target_prefix=%s', path, self.target_bucket, self.target_prefix)
        self.bucket = self.cloud_conn.get_bucket(self.target_bucket)

    def get_all_versions(self):
        """Yield CloudKeys under this bucket's prefix, sorted by name with
        the newest versioned epoch first."""
        collected = []
        for raw_key in self.bucket.get_all_keys(prefix=self.target_prefix):
            cloud_key = CloudKey(self, raw_key)
            log.debug('appending o=[\'%s\', \'%s\', \'%d\']', cloud_key.name, cloud_key.version_id, cloud_key.versioned_epoch)
            collected.append(cloud_key)

        collected.sort(key=lambda c: (c.name, -c.versioned_epoch))

        for cloud_key in collected:
            yield cloud_key

    def get_key(self, name, version_id=None):
        """Fetch a single object (optionally a specific version) as a CloudKey."""
        return CloudKey(self, self.bucket.get_key(name, version_id=version_id))
185
186
def parse_endpoint(endpoint):
    """Split an endpoint URL into a (host, port, is_secure) triple.

    A port given explicitly in the netloc wins; otherwise the port defaults
    to 443 for https and 80 for any other scheme.
    """
    parsed = urlparse(endpoint)

    host, _, explicit_port = parsed.netloc.partition(':')

    port = int(explicit_port) if explicit_port else parsed.port

    is_secure = (parsed.scheme == 'https')

    if not port:
        port = 443 if is_secure else 80

    return host, port, is_secure
211
212
class CloudZone(Zone):
    """An RGW zone with tier type 'cloud': objects are synced out to an
    external S3-compatible endpoint instead of being stored in rados.

    The zone is read-only from the RGW side; its contents are inspected
    through a direct boto connection to the cloud endpoint.
    """
    def __init__(self, name, cloud_endpoint, credentials, source_bucket, target_path,
                 zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
        self.cloud_endpoint = cloud_endpoint
        self.credentials = credentials
        self.source_bucket = source_bucket
        self.target_path = target_path

        # expand path template variables (${zone_id} expansion is disabled)
        self.target_path = self.target_path.replace('${zone}', name)
        # self.target_path = self.target_path.replace('${zone_id}', zone_id)
        self.target_path = self.target_path.replace('${zonegroup}', zonegroup.name)
        self.target_path = self.target_path.replace('${zonegroup_id}', zonegroup.id)

        log.debug('target_path=%s', self.target_path)

        host, port, is_secure = parse_endpoint(cloud_endpoint)

        self.cloud_conn = boto.connect_s3(
            aws_access_key_id = credentials.access_key,
            aws_secret_access_key = credentials.secret,
            host = host,
            port = port,
            is_secure = is_secure,
            calling_format = boto.s3.connection.OrdinaryCallingFormat())
        super(CloudZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)


    def is_read_only(self):
        # a cloud zone only receives synced data; it cannot be written directly
        return True

    def tier_type(self):
        return "cloud"

    def create(self, cluster, args = None, check_retcode = True):
        """ create the object with the given arguments """

        if args is None:
            # must default to a list: 'args' is extended with a list below
            # (the previous default of '' raised TypeError on 'str += list')
            args = []

        tier_config = ','.join([ 'connection.endpoint=' + self.cloud_endpoint,
                                 'connection.access_key=' + self.credentials.access_key,
                                 'connection.secret=' + self.credentials.secret,
                                 'target_path=' + re.escape(self.target_path)])

        args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]

        return self.json_command(cluster, 'create', args, check_retcode=check_retcode)

    def has_buckets(self):
        # bucket listings must go through the cloud connection, not radosgw-admin
        return False

    class Conn(ZoneConn):
        """Connection wrapper exposing cloud-tier buckets for comparison."""
        def __init__(self, zone, credentials):
            super(CloudZone.Conn, self).__init__(zone, credentials)

        def get_bucket(self, bucket_name):
            return CloudZoneBucket(self, self.zone.target_path, bucket_name)

        def create_bucket(self, name):
            # should not be here, a bug in the test suite
            log.critical('Conn.create_bucket() should not be called in cloud zone')
            assert False

        def check_bucket_eq(self, zone_conn, bucket_name):
            """Assert this cloud zone's view of *bucket_name* matches the
            rados zone behind *zone_conn*, object by object."""
            assert(zone_conn.zone.tier_type() == "rados")

            # fixed: second zone name previously logged self.name twice
            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
            b1 = self.get_bucket(bucket_name)
            b2 = zone_conn.get_bucket(bucket_name)

            log.debug('bucket1 objects:')
            for o in b1.get_all_versions():
                log.debug('o=%s', o.name)
            log.debug('bucket2 objects:')
            for o in b2.get_all_versions():
                log.debug('o=%s', o.name)

            # zip_longest pads the shorter listing with None, which flags
            # objects missing from either side
            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
                if k1 is None:
                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
                    assert False
                if k2 is None:
                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
                    assert False

                check_object_eq(k1, k2)


            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)

            return True

    def get_conn(self, credentials):
        return self.Conn(self, credentials)
307
308
class CloudZoneConfig:
    """Cloud-zone settings read from one section of the test config file.

    Required options: endpoint, access_key, secret.
    Optional: target_path (default 'rgw-${zonegroup_id}/${bucket}') and
    source_bucket (default '*').
    """
    def __init__(self, cfg, section):
        self.endpoint = cfg.get(section, 'endpoint')
        access_key = cfg.get(section, 'access_key')
        secret = cfg.get(section, 'secret')
        self.credentials = Credentials(access_key, secret)
        # optional settings fall back to defaults; catch Exception instead of
        # the previous bare except so KeyboardInterrupt/SystemExit propagate
        try:
            self.target_path = cfg.get(section, 'target_path')
        except Exception:
            self.target_path = 'rgw-${zonegroup_id}/${bucket}'

        try:
            self.source_bucket = cfg.get(section, 'source_bucket')
        except Exception:
            self.source_bucket = '*'
324