# Source: git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/zone_es.py
6 import boto
.s3
.connection
10 from nose
.tools
import eq_
as eq
12 from itertools
import izip_longest
as zip_longest
14 from itertools
import zip_longest
16 from .multisite
import *
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def check_object_eq(k1, k2, check_extra = True):
    """Assert that two object keys carry matching attributes.

    The user metadata, content type, owner id, owner display name and
    version (as reported by get_key_ver) of k1 and k2 are compared with
    eq(); the two last_modified timestamps must lie less than one second
    apart.

    Raises AssertionError when the timestamps differ by a second or more.

    NOTE(review): no statement visible in this extract consults
    check_extra, and the extract skips several original lines - confirm
    against the upstream file which comparisons it is meant to gate.
    """
    log.debug('comparing key name=%s', k1.name)

    eq(k1.metadata, k2.metadata)
    # eq(k1.cache_control, k2.cache_control)
    eq(k1.content_type, k2.content_type)
    # eq(k1.content_encoding, k2.content_encoding)
    # eq(k1.content_disposition, k2.content_disposition)
    # eq(k1.content_language, k2.content_language)

    mtime1 = dateutil.parser.parse(k1.last_modified)
    mtime2 = dateutil.parser.parse(k2.last_modified)
    assert abs((mtime1 - mtime2).total_seconds()) < 1 # handle different time resolution

    eq(k1.owner.id, k2.owner.id)
    eq(k1.owner.display_name, k2.owner.display_name)
    # eq(k1.storage_class, k2.storage_class)

    eq(get_key_ver(k1), get_key_ver(k2))
    # eq(k1.encrypted, k2.encrypted)
def make_request(conn, method, bucket, key, query_args, headers):
    """Issue a request through conn and return the response on success.

    conn.make_request() is called with the given method, bucket, key,
    query_args and headers.

    Returns the response object so callers can read() its body.

    Raises boto.exception.S3ResponseError (carrying status, reason and
    body) when the response status is not in the 2xx range.
    """
    result = conn.make_request(method, bucket=bucket, key=key, query_args=query_args, headers=headers)
    # Floor division: with true division (Python 3) 204 / 100 == 2.04, which
    # would wrongly treat every 2xx status other than 200 as a failure.
    if result.status // 100 != 2:
        raise boto.exception.S3ResponseError(result.status, result.reason, result.read())
    return result
def append_query_arg(s, n, v):
    """Append the query argument ``n=v`` to the query string s.

    s -- query string built so far; may be None or empty
    n -- argument name
    v -- argument value; formatted with str.format, not URL-quoted here

    Returns s unchanged when v is None or an empty string, ``n=v`` alone
    when s is empty, and ``s&n=v`` otherwise.
    """
    # An absent value (e.g. an unset marker) must not be sent as "n=None".
    if v is None or v == '':
        return s

    nv = '{n}={v}'.format(n=n, v=v)

    # Nothing to join onto yet: avoid producing "None&n=v" or "&n=v".
    if not s:
        return nv

    return '{s}&{nv}'.format(s=s, nv=nv)
def __init__(self, conn, bucket_name, query, query_args = None, marker = None):
    """Record the connection and the parameters of a search request.

    conn        -- connection the search request is sent through
    bucket_name -- bucket to search; stored as '' when falsy
    query       -- search expression
    query_args  -- query string to extend with the search arguments, or None
    marker      -- marker value sent with the request, or None
    """
    # conn, max_keys and marker are read from self when the request is
    # built, so they have to be stored here.
    self.conn = conn
    self.bucket_name = bucket_name or ''
    self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
    self.query = query
    self.query_args = query_args
    # None means "do not send max-keys"; the request builder tests for None.
    self.max_keys = None
    self.marker = marker
78 query_args
= append_query_arg(self
.query_args
, 'query', requests
.compat
.quote_plus(q
))
79 if self
.max_keys
is not None:
80 query_args
= append_query_arg(query_args
, 'max-keys', self
.max_keys
)
82 query_args
= append_query_arg(query_args
, 'marker', self
.marker
)
84 query_args
= append_query_arg(query_args
, 'format', 'json')
88 result
= make_request(self
.conn
, "GET", bucket
=self
.bucket_name
, key
='', query_args
=query_args
, headers
=headers
)
92 result_dict
= json
.loads(result
.read())
94 for entry
in result_dict
['Objects']:
95 bucket
= self
.conn
.get_bucket(entry
['Bucket'], validate
= False)
96 k
= boto
.s3
.key
.Key(bucket
, entry
['Key'])
98 k
.version_id
= entry
['Instance']
99 k
.etag
= entry
['ETag']
100 k
.owner
= boto
.s3
.user
.User(id=entry
['Owner']['ID'], display_name
=entry
['Owner']['DisplayName'])
101 k
.last_modified
= entry
['LastModified']
102 k
.size
= entry
['Size']
103 k
.content_type
= entry
['ContentType']
104 k
.versioned_epoch
= entry
['VersionedEpoch']
107 for e
in entry
['CustomMetadata']:
108 k
.metadata
[e
['Name']] = str(e
['Value']) # int values will return as int, cast to string for compatibility with object meta response
112 return result_dict
, l
114 def search(self
, drain
= True, sort
= True, sort_key
= None):
120 result
, result_keys
= self
.raw_search()
124 is_done
= not (drain
and (result
['IsTruncated'] == "true"))
125 marker
= result
['Marker']
129 sort_key
= lambda k
: (k
.name
, -k
.versioned_epoch
)
130 l
.sort(key
= sort_key
)
class MDSearchConfig:
    """Gets, sets and deletes a bucket's configuration through ``mdsearch`` requests."""

    def __init__(self, conn, bucket_name):
        """Record the connection and the bucket the requests are sent to.

        bucket_name is stored as '' when falsy.
        """
        # send_request() reads self.conn, so the connection must be kept.
        self.conn = conn
        self.bucket_name = bucket_name or ''
        self.bucket = boto.s3.bucket.Bucket(name=bucket_name)

    def send_request(self, conf, method):
        """Send an ``mdsearch`` request with the given HTTP method.

        conf, when not None, is passed in the X-Amz-Meta-Search header.
        Returns whatever make_request() returns.
        """
        query_args = 'mdsearch'

        # get_config() and del_config() pass conf=None; send no header then
        # instead of a header whose value is None.
        headers = {}
        if conf is not None:
            headers['X-Amz-Meta-Search'] = conf

        query_args = append_query_arg(query_args, 'format', 'json')

        return make_request(self.conn, method, bucket=self.bucket_name, key='', query_args=query_args, headers=headers)

    def get_config(self):
        """Return the JSON-decoded body of a GET ``mdsearch`` request."""
        result = self.send_request(None, 'GET')
        return json.loads(result.read())

    def set_config(self, conf):
        """POST conf as the new configuration."""
        self.send_request(conf, 'POST')

    def del_config(self):
        """Send a DELETE ``mdsearch`` request."""
        self.send_request(None, 'DELETE')
def __init__(self, zone_conn, name, conn):
    """Record the zone connection, bucket name and connection for this bucket.

    zone_conn -- zone connection this bucket was obtained from
    name      -- bucket name
    conn      -- connection used for search requests on this bucket
    """
    self.zone_conn = zone_conn
    # name and conn are read from self when listing versions, so they
    # have to be stored here.
    self.name = name
    self.conn = conn
    self.bucket = boto.s3.bucket.Bucket(name=name)
173 def get_all_versions(self
):
178 req
= MDSearch(self
.conn
, self
.name
, 'bucket == ' + self
.name
, marker
=marker
)
180 for k
in req
.search():
def __init__(self, name, es_endpoint, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
    """Store es_endpoint, then initialise the base class.

    All arguments other than es_endpoint are forwarded unchanged to the
    parent constructor.
    """
    # Assigned before the parent constructor runs, as in the original order.
    self.es_endpoint = es_endpoint
    super(ESZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
191 def is_read_only(self
):
195 return "elasticsearch"
def create(self, cluster, args = None, check_retcode = True):
    """ create the object with the given arguments

    cluster       -- passed through to json_command()
    args          -- optional list of extra command arguments; not modified
    check_retcode -- passed through to json_command()

    The '--tier-type' and '--tier-config' arguments are appended after
    args. Returns the result of json_command().
    """
    tier_config = ','.join([ 'endpoint=' + self.es_endpoint, 'explicit_custom_meta=false' ])
    tier_args = [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]

    # Build a new list: args defaults to None (which cannot be extended
    # with +=), and extending in place would modify the caller's list.
    args = list(args or []) + tier_args

    return self.json_command(cluster, 'create', args, check_retcode=check_retcode)
209 def has_buckets(self
):
class Conn(ZoneConn):
    """Zone connection whose buckets are ESZoneBucket objects."""

    def __init__(self, zone, credentials):
        """Forward zone and credentials to the base class constructor."""
        super(ESZone.Conn, self).__init__(zone, credentials)

    def get_bucket(self, bucket_name):
        """Return an ESZoneBucket for bucket_name on this connection."""
        return ESZoneBucket(self, bucket_name, self.conn)

    def create_bucket(self, name):
        """Log a critical message; buckets are not created through this zone."""
        # should not be here, a bug in the test suite
        log.critical('Conn.create_bucket() should not be called in ES zone')

    def check_bucket_eq(self, zone_conn, bucket_name):
        """Check that bucket_name holds the same object versions in both zones.

        zone_conn must belong to a zone whose tier type is "rados". The
        versions listed by each side are paired up in order and every pair
        is compared with check_object_eq().

        Raises AssertionError when a key is present on one side only.
        """
        assert(zone_conn.zone.tier_type() == "rados")

        log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
        b1 = self.get_bucket(bucket_name)
        b2 = zone_conn.get_bucket(bucket_name)

        # List each side once, so the debug output and the comparison see
        # the same listing and no request is repeated.
        keys1 = list(b1.get_all_versions())
        keys2 = list(b2.get_all_versions())

        log.debug('bucket1 objects:')
        for o in keys1:
            log.debug('o=%s', o.name)
        log.debug('bucket2 objects:')
        for o in keys2:
            log.debug('o=%s', o.name)

        # zip_longest pads the shorter listing with None, which marks a key
        # that the other zone does not have.
        for k1, k2 in zip_longest(keys1, keys2):
            if k1 is None:
                log.critical('key=%s is missing from zone=%s', k2.name, self.name)
                raise AssertionError('key=%s is missing from zone=%s' % (k2.name, self.name))

            if k2 is None:
                log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
                raise AssertionError('key=%s is missing from zone=%s' % (k1.name, zone_conn.name))

            check_object_eq(k1, k2)

        log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
def get_conn(self, credentials):
    """Return a new Conn built from this zone and the given credentials."""
    return self.Conn(self, credentials)