]>
git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/zone_es.py
204b9e47f2e3ee8dfcbe35d41c854d1d13b233c0
6 import boto
.s3
.connection
10 from nose
.tools
import eq_
as eq
11 from itertools
import zip_longest
# type: ignore
13 from .multisite
import *
16 log
= logging
.getLogger(__name__
)
23 def check_object_eq(k1
, k2
, check_extra
= True):
26 log
.debug('comparing key name=%s', k1
.name
)
28 eq(k1
.metadata
, k2
.metadata
)
29 # eq(k1.cache_control, k2.cache_control)
30 eq(k1
.content_type
, k2
.content_type
)
31 # eq(k1.content_encoding, k2.content_encoding)
32 # eq(k1.content_disposition, k2.content_disposition)
33 # eq(k1.content_language, k2.content_language)
35 mtime1
= dateutil
.parser
.parse(k1
.last_modified
)
36 mtime2
= dateutil
.parser
.parse(k2
.last_modified
)
37 assert abs((mtime1
- mtime2
).total_seconds()) < 1 # handle different time resolution
39 eq(k1
.owner
.id, k2
.owner
.id)
40 eq(k1
.owner
.display_name
, k2
.owner
.display_name
)
41 # eq(k1.storage_class, k2.storage_class)
43 eq(get_key_ver(k1
), get_key_ver(k2
))
44 # eq(k1.encrypted, k2.encrypted)
46 def make_request(conn
, method
, bucket
, key
, query_args
, headers
):
47 result
= conn
.make_request(method
, bucket
=bucket
, key
=key
, query_args
=query_args
, headers
=headers
)
48 if result
.status
// 100 != 2:
49 raise boto
.exception
.S3ResponseError(result
.status
, result
.reason
, result
.read())
54 def __init__(self
, conn
, bucket_name
, query
, query_args
= None, marker
= None):
56 self
.bucket_name
= bucket_name
or ''
58 self
.bucket
= boto
.s3
.bucket
.Bucket(name
=bucket_name
)
62 self
.query_args
= query_args
68 query_args
= append_query_arg(self
.query_args
, 'query', requests
.compat
.quote_plus(q
))
69 if self
.max_keys
is not None:
70 query_args
= append_query_arg(query_args
, 'max-keys', self
.max_keys
)
72 query_args
= append_query_arg(query_args
, 'marker', self
.marker
)
74 query_args
= append_query_arg(query_args
, 'format', 'json')
78 result
= make_request(self
.conn
, "GET", bucket
=self
.bucket_name
, key
='', query_args
=query_args
, headers
=headers
)
82 result_dict
= json
.loads(result
.read())
84 for entry
in result_dict
['Objects']:
85 bucket
= self
.conn
.get_bucket(entry
['Bucket'], validate
= False)
86 k
= boto
.s3
.key
.Key(bucket
, entry
['Key'])
88 k
.version_id
= entry
['Instance']
89 k
.etag
= entry
['ETag']
90 k
.owner
= boto
.s3
.user
.User(id=entry
['Owner']['ID'], display_name
=entry
['Owner']['DisplayName'])
91 k
.last_modified
= entry
['LastModified']
92 k
.size
= entry
['Size']
93 k
.content_type
= entry
['ContentType']
94 k
.versioned_epoch
= entry
['VersionedEpoch']
97 for e
in entry
['CustomMetadata']:
98 k
.metadata
[e
['Name']] = str(e
['Value']) # int values will return as int, cast to string for compatibility with object meta response
102 return result_dict
, l
104 def search(self
, drain
= True, sort
= True, sort_key
= None):
110 result
, result_keys
= self
.raw_search()
114 is_done
= not (drain
and (result
['IsTruncated'] == "true"))
115 marker
= result
['Marker']
119 sort_key
= lambda k
: (k
.name
, -k
.versioned_epoch
)
120 l
.sort(key
= sort_key
)
125 class MDSearchConfig
:
126 def __init__(self
, conn
, bucket_name
):
128 self
.bucket_name
= bucket_name
or ''
130 self
.bucket
= boto
.s3
.bucket
.Bucket(name
=bucket_name
)
134 def send_request(self
, conf
, method
):
135 query_args
= 'mdsearch'
138 headers
= { 'X-Amz-Meta-Search': conf
}
140 query_args
= append_query_arg(query_args
, 'format', 'json')
142 return make_request(self
.conn
, method
, bucket
=self
.bucket_name
, key
='', query_args
=query_args
, headers
=headers
)
144 def get_config(self
):
145 result
= self
.send_request(None, 'GET')
146 return json
.loads(result
.read())
148 def set_config(self
, conf
):
149 self
.send_request(conf
, 'POST')
151 def del_config(self
):
152 self
.send_request(None, 'DELETE')
156 def __init__(self
, zone_conn
, name
, conn
):
157 self
.zone_conn
= zone_conn
161 self
.bucket
= boto
.s3
.bucket
.Bucket(name
=name
)
163 def get_all_versions(self
):
168 req
= MDSearch(self
.conn
, self
.name
, 'bucket == ' + self
.name
, marker
=marker
)
170 for k
in req
.search():
177 def __init__(self
, name
, es_endpoint
, zonegroup
= None, cluster
= None, data
= None, zone_id
= None, gateways
= None):
178 self
.es_endpoint
= es_endpoint
179 super(ESZone
, self
).__init
__(name
, zonegroup
, cluster
, data
, zone_id
, gateways
)
181 def is_read_only(self
):
185 return "elasticsearch"
187 def create(self
, cluster
, args
= None, check_retcode
= True):
188 """ create the object with the given arguments """
193 tier_config
= ','.join([ 'endpoint=' + self
.es_endpoint
, 'explicit_custom_meta=false' ])
195 args
+= [ '--tier-type', self
.tier_type(), '--tier-config', tier_config
]
197 return self
.json_command(cluster
, 'create', args
, check_retcode
=check_retcode
)
199 def has_buckets(self
):
202 class Conn(ZoneConn
):
203 def __init__(self
, zone
, credentials
):
204 super(ESZone
.Conn
, self
).__init
__(zone
, credentials
)
206 def get_bucket(self
, bucket_name
):
207 return ESZoneBucket(self
, bucket_name
, self
.conn
)
209 def create_bucket(self
, name
):
210 # should not be here, a bug in the test suite
211 log
.critical('Conn.create_bucket() should not be called in ES zone')
214 def check_bucket_eq(self
, zone_conn
, bucket_name
):
215 assert(zone_conn
.zone
.tier_type() == "rados")
217 log
.info('comparing bucket=%s zones={%s, %s}', bucket_name
, self
.name
, self
.name
)
218 b1
= self
.get_bucket(bucket_name
)
219 b2
= zone_conn
.get_bucket(bucket_name
)
221 log
.debug('bucket1 objects:')
222 for o
in b1
.get_all_versions():
223 log
.debug('o=%s', o
.name
)
224 log
.debug('bucket2 objects:')
225 for o
in b2
.get_all_versions():
226 log
.debug('o=%s', o
.name
)
228 for k1
, k2
in zip_longest(b1
.get_all_versions(), b2
.get_all_versions()):
230 log
.critical('key=%s is missing from zone=%s', k2
.name
, self
.name
)
233 log
.critical('key=%s is missing from zone=%s', k1
.name
, zone_conn
.name
)
236 check_object_eq(k1
, k2
)
239 log
.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name
, self
.name
, zone_conn
.name
)
243 def get_conn(self
, credentials
):
244 return self
.Conn(self
, credentials
)
248 def __init__(self
, cfg
, section
):
249 self
.endpoint
= cfg
.get(section
, 'endpoint')