]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/zone_es.py
204b9e47f2e3ee8dfcbe35d41c854d1d13b233c0
[ceph.git] / ceph / src / test / rgw / rgw_multi / zone_es.py
1 import json
2 import requests.compat
3 import logging
4
5 import boto
6 import boto.s3.connection
7
8 import dateutil.parser
9
10 from nose.tools import eq_ as eq
11 from itertools import zip_longest # type: ignore
12
13 from .multisite import *
14 from .tools import *
15
16 log = logging.getLogger(__name__)
17
def get_key_ver(k):
    """Return the key's version id, or 'null' when the key is unversioned."""
    return k.version_id if k.version_id else 'null'
22
def check_object_eq(k1, k2, check_extra = True):
    """Assert that two boto keys describe the same object.

    Compares name, metadata, content type, etag, mtime and version; with
    check_extra also compares the owner. Raises via eq()/assert on mismatch.
    """
    assert k1
    assert k2
    log.debug('comparing key name=%s', k1.name)
    eq(k1.name, k2.name)
    eq(k1.metadata, k2.metadata)
    eq(k1.content_type, k2.content_type)
    eq(k1.etag, k2.etag)
    # the two zones may store mtime at different resolutions, so accept
    # anything under one second of skew instead of requiring equality
    delta = dateutil.parser.parse(k1.last_modified) - dateutil.parser.parse(k2.last_modified)
    assert abs(delta.total_seconds()) < 1
    if check_extra:
        eq(k1.owner.id, k2.owner.id)
        eq(k1.owner.display_name, k2.owner.display_name)
    eq(k1.size, k2.size)
    eq(get_key_ver(k1), get_key_ver(k2))
45
def make_request(conn, method, bucket, key, query_args, headers):
    """Issue a raw S3 request through conn and return the response.

    Raises boto.exception.S3ResponseError unless the status is 2xx.
    """
    resp = conn.make_request(method, bucket=bucket, key=key,
                             query_args=query_args, headers=headers)
    if resp.status // 100 != 2:
        raise boto.exception.S3ResponseError(resp.status, resp.reason, resp.read())
    return resp
51
52
class MDSearch:
    """Runs RGW metadata-search ('query') requests against a bucket (or,
    with an empty bucket_name, against the whole account) and converts the
    JSON results back into boto Key objects."""

    def __init__(self, conn, bucket_name, query, query_args = None, marker = None):
        self.conn = conn
        self.bucket_name = bucket_name or ''
        if bucket_name:
            self.bucket = boto.s3.bucket.Bucket(name=bucket_name)
        else:
            self.bucket = None
        self.query = query
        self.query_args = query_args
        self.max_keys = None
        # pagination cursor; advanced by search() as pages are drained
        self.marker = marker

    def raw_search(self):
        """Issue one search request and return (result_dict, keys).

        result_dict is the decoded JSON response (including 'IsTruncated'
        and 'Marker'); keys is a list of boto Key objects built from its
        'Objects' entries.
        """
        q = self.query or ''
        query_args = append_query_arg(self.query_args, 'query', requests.compat.quote_plus(q))
        if self.max_keys is not None:
            query_args = append_query_arg(query_args, 'max-keys', self.max_keys)
        if self.marker:
            query_args = append_query_arg(query_args, 'marker', self.marker)

        query_args = append_query_arg(query_args, 'format', 'json')

        headers = {}

        result = make_request(self.conn, "GET", bucket=self.bucket_name, key='', query_args=query_args, headers=headers)

        l = []

        result_dict = json.loads(result.read())

        for entry in result_dict['Objects']:
            bucket = self.conn.get_bucket(entry['Bucket'], validate = False)
            k = boto.s3.key.Key(bucket, entry['Key'])

            k.version_id = entry['Instance']
            k.etag = entry['ETag']
            k.owner = boto.s3.user.User(id=entry['Owner']['ID'], display_name=entry['Owner']['DisplayName'])
            k.last_modified = entry['LastModified']
            k.size = entry['Size']
            k.content_type = entry['ContentType']
            k.versioned_epoch = entry['VersionedEpoch']

            k.metadata = {}
            for e in entry['CustomMetadata']:
                # int values will return as int, cast to string for compatibility with object meta response
                k.metadata[e['Name']] = str(e['Value'])

            l.append(k)

        return result_dict, l

    def search(self, drain = True, sort = True, sort_key = None):
        """Run the search, optionally draining all pages, and return the keys.

        With drain=True keeps requesting until the server reports no more
        truncated results; with sort=True the keys are sorted by sort_key
        (default: name ascending, newest versioned_epoch first).
        """
        l = []

        is_done = False

        while not is_done:
            result, result_keys = self.raw_search()

            l = l + result_keys

            is_done = not (drain and (result['IsTruncated'] == "true"))
            # BUG FIX: the marker must be stored on self so the next
            # raw_search() call continues after this page; the previous
            # code bound it to a local, which made truncated searches
            # re-fetch the first page forever.
            self.marker = result['Marker']

        if sort:
            if not sort_key:
                sort_key = lambda k: (k.name, -k.versioned_epoch)
            l.sort(key = sort_key)

        return l
103
104 def search(self, drain = True, sort = True, sort_key = None):
105 l = []
106
107 is_done = False
108
109 while not is_done:
110 result, result_keys = self.raw_search()
111
112 l = l + result_keys
113
114 is_done = not (drain and (result['IsTruncated'] == "true"))
115 marker = result['Marker']
116
117 if sort:
118 if not sort_key:
119 sort_key = lambda k: (k.name, -k.versioned_epoch)
120 l.sort(key = sort_key)
121
122 return l
123
124
class MDSearchConfig:
    """Reads and writes a bucket's metadata-search configuration through
    the ?mdsearch sub-resource."""

    def __init__(self, conn, bucket_name):
        self.conn = conn
        self.bucket_name = bucket_name or ''
        self.bucket = boto.s3.bucket.Bucket(name=bucket_name) if bucket_name else None

    def send_request(self, conf, method):
        """Send a ?mdsearch request; conf, when given, is carried in the
        X-Amz-Meta-Search header."""
        headers = { 'X-Amz-Meta-Search': conf } if conf else None
        query_args = append_query_arg('mdsearch', 'format', 'json')
        return make_request(self.conn, method, bucket=self.bucket_name, key='',
                            query_args=query_args, headers=headers)

    def get_config(self):
        """Fetch and decode the current search configuration."""
        return json.loads(self.send_request(None, 'GET').read())

    def set_config(self, conf):
        """Install a new search configuration."""
        self.send_request(conf, 'POST')

    def del_config(self):
        """Remove the search configuration."""
        self.send_request(None, 'DELETE')
153
154
class ESZoneBucket:
    """Bucket stand-in for an elasticsearch zone: object listings are
    served by metadata search instead of a real bucket listing."""

    def __init__(self, zone_conn, name, conn):
        self.zone_conn = zone_conn
        self.name = name
        self.conn = conn

        self.bucket = boto.s3.bucket.Bucket(name=name)

    def get_all_versions(self):
        """Yield every object version in this bucket, as reported by a
        'bucket == <name>' metadata search (MDSearch drains all pages and
        sorts by name ascending, newest version first)."""
        # dropped unused marker/is_done locals: MDSearch.search() already
        # handles pagination internally
        req = MDSearch(self.conn, self.name, 'bucket == ' + self.name)
        yield from req.search()
172
173
174
175
class ESZone(Zone):
    """A zone backed by the elasticsearch sync module: metadata is indexed
    into ES and the zone is read-only from the S3 side."""

    def __init__(self, name, es_endpoint, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
        self.es_endpoint = es_endpoint
        super(ESZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)

    def is_read_only(self):
        return True

    def tier_type(self):
        return "elasticsearch"

    def create(self, cluster, args = None, check_retcode = True):
        """ create the object with the given arguments """

        if args is None:
            # BUG FIX: args is extended with a list below; the old default
            # of '' raised TypeError (str += list) whenever create() was
            # called without explicit args
            args = []

        tier_config = ','.join([ 'endpoint=' + self.es_endpoint, 'explicit_custom_meta=false' ])

        args += [ '--tier-type', self.tier_type(), '--tier-config', tier_config ]

        return self.json_command(cluster, 'create', args, check_retcode=check_retcode)

    def has_buckets(self):
        return False

    class Conn(ZoneConn):
        def __init__(self, zone, credentials):
            super(ESZone.Conn, self).__init__(zone, credentials)

        def get_bucket(self, bucket_name):
            return ESZoneBucket(self, bucket_name, self.conn)

        def create_bucket(self, name):
            # should not be here, a bug in the test suite
            log.critical('Conn.create_bucket() should not be called in ES zone')
            assert False

        def check_bucket_eq(self, zone_conn, bucket_name):
            """Assert that this ES zone and a rados zone list identical
            object versions for bucket_name; returns True on success."""
            assert(zone_conn.zone.tier_type() == "rados")

            # BUG FIX: the second zone name was logged as self.name twice;
            # log the peer's name, matching the success message below
            log.info('comparing bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)
            b1 = self.get_bucket(bucket_name)
            b2 = zone_conn.get_bucket(bucket_name)

            log.debug('bucket1 objects:')
            for o in b1.get_all_versions():
                log.debug('o=%s', o.name)
            log.debug('bucket2 objects:')
            for o in b2.get_all_versions():
                log.debug('o=%s', o.name)

            for k1, k2 in zip_longest(b1.get_all_versions(), b2.get_all_versions()):
                if k1 is None:
                    log.critical('key=%s is missing from zone=%s', k2.name, self.name)
                    assert False
                if k2 is None:
                    log.critical('key=%s is missing from zone=%s', k1.name, zone_conn.name)
                    assert False

                check_object_eq(k1, k2)

            log.info('success, bucket identical: bucket=%s zones={%s, %s}', bucket_name, self.name, zone_conn.name)

            return True

    def get_conn(self, credentials):
        return self.Conn(self, credentials)
245
246
class ESZoneConfig:
    """Pulls the elasticsearch endpoint for a zone out of a parsed config
    section (any object exposing cfg.get(section, option))."""

    def __init__(self, cfg, section):
        self.endpoint = cfg.get(section, 'endpoint')
250