# ceph/src/test/rgw/rgw_multi/zone_cloud.py

import logging
import datetime
import dateutil.parser
import re
import boto
import boto.s3.connection

from nose.tools import eq_ as eq
from itertools import zip_longest  # type: ignore
from urllib.parse import urlparse

from .multisite import *

log = logging.getLogger(__name__)

def unquote(s):
    # strip surrounding double quotes (etags are quoted inconsistently)
    if s[0] == '"' and s[-1] == '"':
        return s[1:-1]
    return s

def check_object_eq(k1, k2, check_extra = True):
    log.debug('comparing key name=%s', k1.name)
    eq(k1.metadata, k2.metadata)
    # eq(k1.cache_control, k2.cache_control)
    eq(k1.content_type, k2.content_type)
    eq(k1.content_encoding, k2.content_encoding)
    eq(k1.content_disposition, k2.content_disposition)
    eq(k1.content_language, k2.content_language)

    eq(unquote(k1.etag), unquote(k2.etag))

    mtime1 = dateutil.parser.parse(k1.last_modified)
    mtime2 = dateutil.parser.parse(k2.last_modified)
    log.debug('k1.last_modified=%s k2.last_modified=%s', k1.last_modified, k2.last_modified)
    assert abs((mtime1 - mtime2).total_seconds()) < 1  # handle different time resolution

    # eq(k1.owner.id, k2.owner.id)
    # eq(k1.owner.display_name, k2.owner.display_name)
    # eq(k1.storage_class, k2.storage_class)

    eq(get_key_ver(k1), get_key_ver(k2))
    # eq(k1.encrypted, k2.encrypted)

def make_request(conn, method, bucket, key, query_args, headers):
    result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers)
    if result.status // 100 != 2:
        raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
    return result
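
# Illustrative (hypothetical) call, e.g. listing object versions through the raw API:
#   result = make_request(conn, 'GET', 'some-bucket', '', query_args='versions', headers=None)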

class CloudKey:
    def __init__(self, zone_bucket, k):
        self.zone_bucket = zone_bucket

        # we need two keys: when listing buckets, we get keys that only contain partial data
        # but we need to have the full data so that we could use all the meta-rgwx- headers
        # that are needed in order to create a correct representation of the object

        self.key = k
        self.rgwx_key = k  # assuming k has all the meta info on, if not then we'll update it in update()
        self.update()

    def update(self):
        k = self.key
        rk = self.rgwx_key

        orig_name = rk.metadata.get('rgwx-source-key')
        if not orig_name:
            # the listing entry lacked the rgwx- metadata, fetch the full object
            self.rgwx_key = self.zone_bucket.bucket.get_key(k.name, version_id = k.version_id)
            rk = self.rgwx_key
            orig_name = rk.metadata.get('rgwx-source-key')

        self.name = orig_name
        self.version_id = rk.metadata.get('rgwx-source-version-id')

        ve = rk.metadata.get('rgwx-versioned-epoch')
        if ve:
            self.versioned_epoch = int(ve)
        else:
            self.versioned_epoch = 0

        mt = rk.metadata.get('rgwx-source-mtime')
        if mt:
            self.last_modified = datetime.datetime.utcfromtimestamp(float(mt)).strftime('%a, %d %b %Y %H:%M:%S GMT')
        else:
            self.last_modified = k.last_modified

        et = rk.metadata.get('rgwx-source-etag')
        if rk.etag.find('-') >= 0 or et.find('-') >= 0:
            # in this case we will use the source etag as it was uploaded via multipart upload
            # in one of the zones, so there's no way to make sure etags are calculated the same
            # way. In the other case we'd just want to keep the etag that was generated in the
            # regular upload mechanism, which should be consistent in both ends
            self.etag = et
        else:
            self.etag = rk.etag

        if k.etag[0] == '"' and self.etag[0] != '"':  # inconsistent etag quoting when listing bucket vs object get
            self.etag = '"' + self.etag + '"'

        new_meta = {}
        for meta_key, meta_val in k.metadata.items():
            if not meta_key.startswith('rgwx-'):
                new_meta[meta_key] = meta_val

        self.metadata = new_meta

        self.cache_control = k.cache_control
        self.content_type = k.content_type
        self.content_encoding = k.content_encoding
        self.content_disposition = k.content_disposition
        self.content_language = k.content_language
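
    # For reference, the rgwx- metadata on a cloud-tiered object describes the source
    # object; an illustrative (hypothetical) set of values:
    #   rgwx-source-key=obj1, rgwx-source-version-id=null, rgwx-versioned-epoch=2,
    #   rgwx-source-mtime=1600000000.0, rgwx-source-etag=37b51d194a7513e45b56f6524f2d51f2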

    def get_contents_as_string(self, encoding=None):
        r = self.key.get_contents_as_string(encoding=encoding)

        # the previous call changed the status of the source object, as it loaded
        # its metadata, so refresh our view of it
        self.rgwx_key = self.key
        self.update()

        return r

class CloudZoneBucket:
    def __init__(self, zone_conn, target_path, name):
        self.zone_conn = zone_conn
        self.cloud_conn = zone_conn.zone.cloud_conn

        target_path = target_path[:]
        if target_path[-1] != '/':
            target_path += '/'
        target_path = target_path.replace('${bucket}', name)

        tp = target_path.split('/', 1)

        if len(tp) == 1:
            self.target_bucket = target_path
            self.target_prefix = ''
        else:
            self.target_bucket = tp[0]
            self.target_prefix = tp[1]

        log.debug('target_path=%s target_bucket=%s target_prefix=%s', target_path, self.target_bucket, self.target_prefix)
        self.bucket = self.cloud_conn.get_bucket(self.target_bucket)
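
        # For example (illustrative values): a target_path of 'rgw-1234/${bucket}'
        # (zonegroup placeholders already expanded by CloudZone) with name='mybucket'
        # maps to target_bucket='rgw-1234' and target_prefix='mybucket/' on the cloud
        # endpoint.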

    def get_all_versions(self):
        l = []

        for k in self.bucket.get_all_keys(prefix=self.target_prefix):
            new_key = CloudKey(self, k)

            log.debug('appending o=[\'%s\', \'%s\', \'%d\']', new_key.name, new_key.version_id, new_key.versioned_epoch)
            l.append(new_key)

        sort_key = lambda k: (k.name, -k.versioned_epoch)
        l.sort(key = sort_key)

        for new_key in l:
            yield new_key

    def get_key(self, name, version_id=None):
        return CloudKey(self, self.bucket.get_key(name, version_id=version_id))

def parse_endpoint(endpoint):
    o = urlparse(endpoint)

    netloc = o.netloc.split(':')
    host = netloc[0]

    port = None
    if len(netloc) > 1:
        port = int(netloc[1])

    is_secure = False
    if o.scheme == 'https':
        is_secure = True

    if not port:
        port = 443 if is_secure else 80

    return host, port, is_secure
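
# Illustrative expansions (assuming the default-port fallback sketched above):
#   parse_endpoint('http://10.0.0.1:8000')   -> ('10.0.0.1', 8000, False)
#   parse_endpoint('https://s3.example.com') -> ('s3.example.com', 443, True)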

class CloudZone(Zone):
    def __init__(self, name, cloud_endpoint, credentials, source_bucket, target_path,
                 zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
        self.cloud_endpoint = cloud_endpoint
        self.credentials = credentials
        self.source_bucket = source_bucket
        self.target_path = target_path

        self.target_path = self.target_path.replace('${zone}', name)
        # self.target_path = self.target_path.replace('${zone_id}', zone_id)
        self.target_path = self.target_path.replace('${zonegroup}', zonegroup.name)
        self.target_path = self.target_path.replace('${zonegroup_id}', zonegroup.id)

        log.debug('target_path=%s', self.target_path)

        host, port, is_secure = parse_endpoint(cloud_endpoint)

        self.cloud_conn = boto.connect_s3(
            aws_access_key_id = credentials.access_key,
            aws_secret_access_key = credentials.secret,
            host = host,
            port = port,
            is_secure = is_secure,
            calling_format = boto.s3.connection.OrdinaryCallingFormat())
        super(CloudZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)

    def is_read_only(self):
        return True

    def create(self, cluster, args = None, check_retcode = True):
        """ create the object with the given arguments """

        tier_config = ','.join([ 'connection.endpoint=' + self.cloud_endpoint,
                                 'connection.access_key=' + self.credentials.access_key,
                                 'connection.secret=' + self.credentials.secret,
                                 'target_path=' + re.escape(self.target_path)])

        args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]

        return self.json_command(cluster, 'create', args, check_retcode=check_retcode)
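
    # For reference, tier_config is a single comma-separated key=value string; with
    # illustrative values it would look roughly like:
    #   connection.endpoint=http://10.0.0.1:8000,connection.access_key=KEY,connection.secret=SECRET,target_path=...
    # (target_path is passed through re.escape() before being embedded).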

    def has_buckets(self):
        return False

    class Conn(ZoneConn):
        def __init__(self, zone, credentials):
            super(CloudZone.Conn, self).__init__(zone, credentials)

        def get_bucket(self, bucket_name):
            return CloudZoneBucket(self, self.zone.target_path, bucket_name)

        def create_bucket(self, name):
            # should not be here, a bug in the test suite
            log.critical('Conn.create_bucket() should not be called in cloud zone')

        def check_bucket_eq(self, zone_conn, bucket_name):
            assert(zone_conn.zone.tier_type() == "rados")

            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
            b1 = self.get_bucket(bucket_name)
            b2 = zone_conn.get_bucket(bucket_name)

            log.debug('bucket1 objects:')
            for o in b1.get_all_versions():
                log.debug('o=%s', o.name)
            log.debug('bucket2 objects:')
            for o in b2.get_all_versions():
                log.debug('o=%s', o.name)

            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
                if k1 is None:
                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
                    assert False
                if k2 is None:
                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
                    assert False

                check_object_eq(k1, k2)

            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)

    def get_conn(self, credentials):
        return self.Conn(self, credentials)
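
    # Typical use from the multisite tests (sketch; names are illustrative):
    #   cloud_conn = cloud_zone.get_conn(credentials)
    #   cloud_conn.check_bucket_eq(rados_zone_conn, 'mybucket')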

class CloudZoneConfig:
    def __init__(self, cfg, section):
        self.endpoint = cfg.get(section, 'endpoint')
        access_key = cfg.get(section, 'access_key')
        secret = cfg.get(section, 'secret')
        self.credentials = Credentials(access_key, secret)
        try:
            self.target_path = cfg.get(section, 'target_path')
        except:
            self.target_path = 'rgw-${zonegroup_id}/${bucket}'

        try:
            self.source_bucket = cfg.get(section, 'source_bucket')
        except:
            self.source_bucket = '*'
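
# An illustrative config section consumed by CloudZoneConfig (section name and values
# are hypothetical):
#
#   [cloud]
#   endpoint = http://10.0.0.1:8000
#   access_key = TESTACCESSKEY
#   secret = TESTSECRET
#   target_path = rgw-${zonegroup_id}/${bucket}
#   source_bucket = *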