# Extracted from: ceph/src/test/rgw/rgw_multi/zone_cloud.py
# (mirror: git.proxmox.com Git - ceph.git)
import datetime
import logging
import re

import boto
import boto.s3.connection
import dateutil.parser

from nose.tools import eq_ as eq

try:
    from itertools import izip_longest as zip_longest  # type: ignore
except ImportError:
    from itertools import zip_longest

from six.moves.urllib.parse import urlparse

from .multisite import *

log = logging.getLogger(__name__)
def unquote(s):
    """Strip one pair of surrounding double quotes from *s* (an etag), if present.

    Etags are sometimes reported quoted and sometimes not, depending on the
    API used to fetch them; comparisons are done on the unquoted form.
    """
    # length guard: avoid IndexError on '' and treat a lone '"' as-is
    if len(s) >= 2 and s[0] == '"' and s[-1] == '"':
        return s[1:-1]
    return s
def check_object_eq(k1, k2, check_extra = True):
    """Assert that two bucket keys describe the same object.

    Compares name, metadata, content headers, (unquoted) etag, mtime to
    one-second resolution, size and version.  The commented-out checks are
    attributes that are not guaranteed to match across a cloud-tier sync.

    :param k1: key from the first zone
    :param k2: key from the second zone
    :param check_extra: kept for interface compatibility; currently unused
    """
    assert k1
    assert k2
    log.debug('comparing key name=%s', k1.name)
    eq(k1.name, k2.name)
    eq(k1.metadata, k2.metadata)
    # eq(k1.cache_control, k2.cache_control)
    eq(k1.content_type, k2.content_type)
    eq(k1.content_encoding, k2.content_encoding)
    eq(k1.content_disposition, k2.content_disposition)
    eq(k1.content_language, k2.content_language)

    # etags may or may not be quoted depending on how they were fetched
    eq(unquote(k1.etag), unquote(k2.etag))

    mtime1 = dateutil.parser.parse(k1.last_modified)
    mtime2 = dateutil.parser.parse(k2.last_modified)
    log.debug('k1.last_modified=%s k2.last_modified=%s', k1.last_modified, k2.last_modified)
    assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution

    # eq(k1.owner.id, k2.owner.id)
    # eq(k1.owner.display_name, k2.owner.display_name)
    # eq(k1.storage_class, k2.storage_class)
    eq(k1.size, k2.size)
    # get_key_ver is defined earlier in this module (not visible in this excerpt)
    eq(get_key_ver(k1), get_key_ver(k2))
    # eq(k1.encrypted, k2.encrypted)
def make_request(conn, method, bucket, key, query_args, headers):
    """Issue a raw S3 request through *conn* and return the response.

    :param conn: boto S3 connection
    :param method: HTTP method, e.g. 'GET'
    :param bucket: bucket name
    :param key: object key
    :param query_args: extra query string arguments (or None)
    :param headers: dict of request headers
    :raises boto.exception.S3ResponseError: on any non-2xx status
    :returns: the HTTP response object
    """
    result = conn.make_request(method, bucket=bucket, key=key,
                               query_args=query_args, headers=headers)
    if result.status // 100 != 2:
        raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
    # restore the missing success-path return: callers need the response body
    return result
class CloudKey:
    """Wrap a key stored in the cloud endpoint so it presents the attributes
    of the original (source-zone) object, reconstructed from the rgwx-*
    metadata headers that the cloud sync module stores on the object."""

    def __init__(self, zone_bucket, k):
        self.zone_bucket = zone_bucket

        # we need two keys: when listing buckets, we get keys that only contain partial data
        # but we need to have the full data so that we could use all the meta-rgwx- headers
        # that are needed in order to create a correct representation of the object
        self.key = k
        self.rgwx_key = k # assuming k has all the meta info on, if not then we'll update it in update()

        self.update()

    def update(self):
        """Recompute the source-object view (name, version, mtime, etag,
        user metadata, content headers) from the rgwx-* metadata."""
        k = self.key
        rk = self.rgwx_key

        self.size = rk.size

        orig_name = rk.metadata.get('rgwx-source-key')
        if not orig_name:
            # listing entry had only partial data: fetch the full object so
            # all the rgwx-* headers are present
            self.rgwx_key = self.zone_bucket.bucket.get_key(k.name, version_id = k.version_id)
            rk = self.rgwx_key
            orig_name = rk.metadata.get('rgwx-source-key')

        self.name = orig_name
        self.version_id = rk.metadata.get('rgwx-source-version-id')

        ve = rk.metadata.get('rgwx-versioned-epoch')
        if ve:
            self.versioned_epoch = int(ve)
        else:
            self.versioned_epoch = 0

        mt = rk.metadata.get('rgwx-source-mtime')
        if mt:
            # source mtime is stored as a unix timestamp; render it in the
            # HTTP date format used by boto's last_modified
            self.last_modified = datetime.datetime.utcfromtimestamp(float(mt)).strftime('%a, %d %b %Y %H:%M:%S GMT')
        else:
            self.last_modified = k.last_modified

        et = rk.metadata.get('rgwx-source-etag')
        if rk.etag.find('-') >= 0 or et.find('-') >= 0:
            # in this case we will use the source etag as it was uploaded via multipart upload
            # in one of the zones, so there's no way to make sure etags are calculated the same
            # way. In the other case we'd just want to keep the etag that was generated in the
            # regular upload mechanism, which should be consistent in both ends
            self.etag = et
        else:
            self.etag = rk.etag

        if k.etag[0] == '"' and self.etag[0] != '"': # inconsistent etag quoting when listing bucket vs object get
            self.etag = '"' + self.etag + '"'

        # expose only the user metadata; rgwx-* entries are sync bookkeeping
        new_meta = {}
        for meta_key, meta_val in k.metadata.items():
            if not meta_key.startswith('rgwx-'):
                new_meta[meta_key] = meta_val

        self.metadata = new_meta

        self.cache_control = k.cache_control
        self.content_type = k.content_type
        self.content_encoding = k.content_encoding
        self.content_disposition = k.content_disposition
        self.content_language = k.content_language

    def get_contents_as_string(self, encoding=None):
        """Read the object body; then refresh our cached view, since the read
        also (re)loaded the underlying key's metadata."""
        r = self.key.get_contents_as_string(encoding=encoding)

        # the previous call changed the status of the source object, as it loaded
        # its metadata as a side effect
        self.rgwx_key = self.key
        self.update()

        return r
class CloudZoneBucket:
    """View of a logical bucket inside the cloud endpoint: the configured
    target_path is expanded into a real cloud bucket plus an object prefix."""

    def __init__(self, zone_conn, target_path, name):
        self.zone_conn = zone_conn
        self.name = name
        self.cloud_conn = zone_conn.zone.cloud_conn

        # work on a copy; normalize to a trailing '/' and expand ${bucket}
        target_path = target_path[:]
        if target_path[-1] != '/':
            target_path += '/'
        target_path = target_path.replace('${bucket}', name)

        # first path component is the cloud bucket, the rest is the prefix
        tp = target_path.split('/', 1)

        if len(tp) == 1:
            self.target_bucket = target_path
            self.target_prefix = ''
        else:
            self.target_bucket = tp[0]
            self.target_prefix = tp[1]

        log.debug('target_path=%s target_bucket=%s target_prefix=%s', target_path, self.target_bucket, self.target_prefix)
        self.bucket = self.cloud_conn.get_bucket(self.target_bucket)

    def get_all_versions(self):
        """Yield CloudKey entries under the target prefix, ordered by
        (name, descending versioned epoch) to match multisite listing order."""
        keys = []
        for k in self.bucket.get_all_keys(prefix=self.target_prefix):
            new_key = CloudKey(self, k)

            log.debug('appending o=[\'%s\', \'%s\', \'%d\']', new_key.name, new_key.version_id, new_key.versioned_epoch)
            keys.append(new_key)

        sort_key = lambda k: (k.name, -k.versioned_epoch)
        keys.sort(key = sort_key)

        for new_key in keys:
            yield new_key

    def get_key(self, name, version_id=None):
        """Fetch a single object (optionally a specific version) as a CloudKey."""
        return CloudKey(self, self.bucket.get_key(name, version_id=version_id))
def parse_endpoint(endpoint):
    """Split an endpoint URL into its boto connection parameters.

    :param endpoint: URL such as 'http://host:8000' or 'https://host'
    :returns: (host, port, is_secure); port defaults to 443 for https
              and 80 for http when not given explicitly
    """
    o = urlparse(endpoint)

    netloc = o.netloc.split(':')

    host = netloc[0]
    if len(netloc) > 1:
        port = int(netloc[1])
    else:
        port = None

    is_secure = (o.scheme == 'https')

    if not port:
        port = 443 if is_secure else 80

    return host, port, is_secure
class CloudZone(Zone):
    """A read-only zone backed by an external S3-compatible cloud endpoint.

    Objects are synced from the other zones into target_path on the cloud
    endpoint; this class wires up the boto connection and the radosgw-admin
    tier configuration for such a zone.
    """

    def __init__(self, name, cloud_endpoint, credentials, source_bucket, target_path,
                 zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
        self.cloud_endpoint = cloud_endpoint
        self.credentials = credentials
        self.source_bucket = source_bucket
        self.target_path = target_path

        # expand the placeholders supported by the cloud tier configuration
        self.target_path = self.target_path.replace('${zone}', name)
        # self.target_path = self.target_path.replace('${zone_id}', zone_id)
        self.target_path = self.target_path.replace('${zonegroup}', zonegroup.name)
        self.target_path = self.target_path.replace('${zonegroup_id}', zonegroup.id)

        log.debug('target_path=%s', self.target_path)

        host, port, is_secure = parse_endpoint(cloud_endpoint)

        self.cloud_conn = boto.connect_s3(
                aws_access_key_id = credentials.access_key,
                aws_secret_access_key = credentials.secret,
                host = host,
                port = port,
                is_secure = is_secure,
                calling_format = boto.s3.connection.OrdinaryCallingFormat())
        super(CloudZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)

    def tier_type(self):
        # restored: create() below relies on this tier identifier
        return "cloud"

    def is_read_only(self):
        # objects are only synced into the cloud endpoint, never written through it
        return True

    def create(self, cluster, args = None, check_retcode = True):
        """ create the object with the given arguments """
        if args is None:
            args = []  # args is extended as a list below

        tier_config = ','.join([ 'connection.endpoint=' + self.cloud_endpoint,
                                 'connection.access_key=' + self.credentials.access_key,
                                 'connection.secret=' + self.credentials.secret,
                                 'target_path=' + re.escape(self.target_path)])

        args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]

        return self.json_command(cluster, 'create', args, check_retcode=check_retcode)

    def has_buckets(self):
        return False

    class Conn(ZoneConn):
        def __init__(self, zone, credentials):
            super(CloudZone.Conn, self).__init__(zone, credentials)

        def get_bucket(self, bucket_name):
            return CloudZoneBucket(self, self.zone.target_path, bucket_name)

        def create_bucket(self, name):
            # should not be here, a bug in the test suite
            log.critical('Conn.create_bucket() should not be called in cloud zone')
            assert False

        def check_bucket_eq(self, zone_conn, bucket_name):
            """Assert this cloud zone's view of *bucket_name* matches the
            rados zone reached through *zone_conn*."""
            # the peer must be a regular rados zone
            assert(zone_conn.zone.tier_type() == "rados")

            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, self.name)
            b1 = self.get_bucket(bucket_name)
            b2 = zone_conn.get_bucket(bucket_name)

            log.debug('bucket1 objects:')
            for o in b1.get_all_versions():
                log.debug('o=%s', o.name)
            log.debug('bucket2 objects:')
            for o in b2.get_all_versions():
                log.debug('o=%s', o.name)

            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
                if k1 is None:
                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
                    assert False
                if k2 is None:
                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
                    assert False

                check_object_eq(k1, k2)

            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)

            return True

    def get_conn(self, credentials):
        return self.Conn(self, credentials)
class CloudZoneConfig:
    """Cloud zone settings read from one section of the test configuration.

    Required options: endpoint, access_key, secret.
    Optional: target_path (default 'rgw-${zonegroup_id}/${bucket}') and
    source_bucket (default '*', i.e. sync every bucket).
    """

    def __init__(self, cfg, section):
        self.endpoint = cfg.get(section, 'endpoint')
        access_key = cfg.get(section, 'access_key')
        secret = cfg.get(section, 'secret')
        self.credentials = Credentials(access_key, secret)
        try:
            self.target_path = cfg.get(section, 'target_path')
        except Exception:
            # option not present: fall back to a per-zonegroup default path
            self.target_path = 'rgw-${zonegroup_id}/${bucket}'
        try:
            self.source_bucket = cfg.get(section, 'source_bucket')
        except Exception:
            self.source_bucket = '*'