]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/zone_ps.py
e22200e2665eadf9be3b227daa5dd6d89cbec9ad
[ceph.git] / ceph / src / test / rgw / rgw_multi / zone_ps.py
1 import logging
2 import httplib
3 import ssl
4 import urllib
5 import urlparse
6 import hmac
7 import hashlib
8 import base64
9 import xmltodict
10 from time import gmtime, strftime
11 from .multisite import Zone
12 import boto3
13 from botocore.client import Config
14
15 log = logging.getLogger('rgw_multi.tests')
16
17 def put_object_tagging(conn, bucket_name, key, tags):
18 client = boto3.client('s3',
19 endpoint_url='http://'+conn.host+':'+str(conn.port),
20 aws_access_key_id=conn.aws_access_key_id,
21 aws_secret_access_key=conn.aws_secret_access_key,
22 config=Config(signature_version='s3'))
23 return client.put_object(Body='aaaaaaaaaaa', Bucket=bucket_name, Key=key, Tagging=tags)
24
25
26 def get_object_tagging(conn, bucket, object_key):
27 client = boto3.client('s3',
28 endpoint_url='http://'+conn.host+':'+str(conn.port),
29 aws_access_key_id=conn.aws_access_key_id,
30 aws_secret_access_key=conn.aws_secret_access_key,
31 config=Config(signature_version='s3'))
32 return client.get_object_tagging(
33 Bucket=bucket,
34 Key=object_key
35 )
36
37
38 class PSZone(Zone): # pylint: disable=too-many-ancestors
39 """ PubSub zone class """
40 def __init__(self, name, zonegroup=None, cluster=None, data=None, zone_id=None, gateways=None, full_sync='false', retention_days ='7'):
41 self.full_sync = full_sync
42 self.retention_days = retention_days
43 self.master_zone = zonegroup.master_zone
44 super(PSZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
45
46 def is_read_only(self):
47 return True
48
49 def tier_type(self):
50 return "pubsub"
51
52 def syncs_from(self, zone_name):
53 return zone_name == self.master_zone.name
54
55 def create(self, cluster, args=None, **kwargs):
56 if args is None:
57 args = ''
58 tier_config = ','.join(['start_with_full_sync=' + self.full_sync, 'event_retention_days=' + self.retention_days])
59 args += ['--tier-type', self.tier_type(), '--sync-from-all=0', '--sync-from', self.master_zone.name, '--tier-config', tier_config]
60 return self.json_command(cluster, 'create', args)
61
62 def has_buckets(self):
63 return False
64
65
66 NO_HTTP_BODY = ''
67
68
69 def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None):
70 """generic request sending to pubsub radogw
71 should cover: topics, notificatios and subscriptions
72 """
73 url_params = ''
74 if parameters is not None:
75 url_params = urllib.urlencode(parameters)
76 # remove 'None' from keys with no values
77 url_params = url_params.replace('=None', '')
78 url_params = '?' + url_params
79 if extra_parameters is not None:
80 url_params = url_params + '&' + extra_parameters
81 string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
82 string_to_sign = method + '\n\n\n' + string_date + '\n' + resource
83 if sign_parameters:
84 string_to_sign += url_params
85 signature = base64.b64encode(hmac.new(conn.aws_secret_access_key,
86 string_to_sign.encode('utf-8'),
87 hashlib.sha1).digest())
88 headers = {'Authorization': 'AWS '+conn.aws_access_key_id+':'+signature,
89 'Date': string_date,
90 'Host': conn.host+':'+str(conn.port)}
91 http_conn = httplib.HTTPConnection(conn.host, conn.port)
92 if log.getEffectiveLevel() <= 10:
93 http_conn.set_debuglevel(5)
94 http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
95 response = http_conn.getresponse()
96 data = response.read()
97 status = response.status
98 http_conn.close()
99 return data, status
100
101
102 def print_connection_info(conn):
103 """print info of connection"""
104 print("Host: " + conn.host+':'+str(conn.port))
105 print("AWS Secret Key: " + conn.aws_secret_access_key)
106 print("AWS Access Key: " + conn.aws_access_key_id)
107
108
109 class PSTopic:
110 """class to set/get/delete a topic
111 PUT /topics/<topic name>[?push-endpoint=<endpoint>&[<arg1>=<value1>...]]
112 GET /topics/<topic name>
113 DELETE /topics/<topic name>
114 """
115 def __init__(self, conn, topic_name, endpoint=None, endpoint_args=None):
116 self.conn = conn
117 assert topic_name.strip()
118 self.resource = '/topics/'+topic_name
119 if endpoint is not None:
120 self.parameters = {'push-endpoint': endpoint}
121 self.extra_parameters = endpoint_args
122 else:
123 self.parameters = None
124 self.extra_parameters = None
125
126 def send_request(self, method, get_list=False, parameters=None, extra_parameters=None):
127 """send request to radosgw"""
128 if get_list:
129 return make_request(self.conn, method, '/topics')
130 return make_request(self.conn, method, self.resource,
131 parameters=parameters, extra_parameters=extra_parameters)
132
133 def get_config(self):
134 """get topic info"""
135 return self.send_request('GET')
136
137 def set_config(self):
138 """set topic"""
139 return self.send_request('PUT', parameters=self.parameters, extra_parameters=self.extra_parameters)
140
141 def del_config(self):
142 """delete topic"""
143 return self.send_request('DELETE')
144
145 def get_list(self):
146 """list all topics"""
147 return self.send_request('GET', get_list=True)
148
149
150 def delete_all_s3_topics(zone, region):
151 try:
152 conn = zone.secure_conn if zone.secure_conn is not None else zone.conn
153 protocol = 'https' if conn.is_secure else 'http'
154 client = boto3.client('sns',
155 endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
156 aws_access_key_id=conn.aws_access_key_id,
157 aws_secret_access_key=conn.aws_secret_access_key,
158 region_name=region,
159 verify='./cert.pem',
160 config=Config(signature_version='s3'))
161
162 topics = client.list_topics()['Topics']
163 for topic in topics:
164 print('topic cleanup, deleting: ' + topic['TopicArn'])
165 assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
166 except Exception as err:
167 print('failed to do topic cleanup: ' + str(err))
168
169
170 def delete_all_objects(conn, bucket_name):
171 client = boto3.client('s3',
172 endpoint_url='http://'+conn.host+':'+str(conn.port),
173 aws_access_key_id=conn.aws_access_key_id,
174 aws_secret_access_key=conn.aws_secret_access_key)
175
176 objects = []
177 for key in client.list_objects(Bucket=bucket_name)['Contents']:
178 objects.append({'Key': key['Key']})
179 # delete objects from the bucket
180 response = client.delete_objects(Bucket=bucket_name,
181 Delete={'Objects': objects})
182 return response
183
184
185 class PSTopicS3:
186 """class to set/list/get/delete a topic
187 POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
188 POST ?Action=ListTopics
189 POST ?Action=GetTopic&TopicArn=<topic-arn>
190 POST ?Action=DeleteTopic&TopicArn=<topic-arn>
191 """
192 def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
193 self.conn = conn
194 self.topic_name = topic_name.strip()
195 assert self.topic_name
196 self.topic_arn = ''
197 self.attributes = {}
198 if endpoint_args is not None:
199 self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
200 if opaque_data is not None:
201 self.attributes['OpaqueData'] = opaque_data
202 protocol = 'https' if conn.is_secure else 'http'
203 self.client = boto3.client('sns',
204 endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
205 aws_access_key_id=conn.aws_access_key_id,
206 aws_secret_access_key=conn.aws_secret_access_key,
207 region_name=region,
208 verify='./cert.pem',
209 config=Config(signature_version='s3'))
210
211
212 def get_config(self):
213 """get topic info"""
214 parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
215 body = urllib.urlencode(parameters)
216 string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
217 content_type = 'application/x-www-form-urlencoded; charset=utf-8'
218 resource = '/'
219 method = 'POST'
220 string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
221 log.debug('StringTosign: %s', string_to_sign)
222 signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key,
223 string_to_sign.encode('utf-8'),
224 hashlib.sha1).digest())
225 headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
226 'Date': string_date,
227 'Host': self.conn.host+':'+str(self.conn.port),
228 'Content-Type': content_type}
229 if self.conn.is_secure:
230 http_conn = httplib.HTTPSConnection(self.conn.host, self.conn.port,
231 context=ssl.create_default_context(cafile='./cert.pem'))
232 else:
233 http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
234 http_conn.request(method, resource, body, headers)
235 response = http_conn.getresponse()
236 data = response.read()
237 status = response.status
238 http_conn.close()
239 dict_response = xmltodict.parse(data)
240 return dict_response, status
241
242 def set_config(self):
243 """set topic"""
244 result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
245 self.topic_arn = result['TopicArn']
246 return self.topic_arn
247
248 def del_config(self):
249 """delete topic"""
250 result = self.client.delete_topic(TopicArn=self.topic_arn)
251 return result['ResponseMetadata']['HTTPStatusCode']
252
253 def get_list(self):
254 """list all topics"""
255 # note that boto3 supports list_topics(), however, the result only show ARNs
256 parameters = {'Action': 'ListTopics'}
257 body = urllib.urlencode(parameters)
258 string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
259 content_type = 'application/x-www-form-urlencoded; charset=utf-8'
260 resource = '/'
261 method = 'POST'
262 string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
263 log.debug('StringTosign: %s', string_to_sign)
264 signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key,
265 string_to_sign.encode('utf-8'),
266 hashlib.sha1).digest())
267 headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
268 'Date': string_date,
269 'Host': self.conn.host+':'+str(self.conn.port),
270 'Content-Type': content_type}
271 if self.conn.is_secure:
272 http_conn = httplib.HTTPSConnection(self.conn.host, self.conn.port,
273 context=ssl.create_default_context(cafile='./cert.pem'))
274 else:
275 http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
276 http_conn.request(method, resource, body, headers)
277 response = http_conn.getresponse()
278 data = response.read()
279 status = response.status
280 http_conn.close()
281 dict_response = xmltodict.parse(data)
282 return dict_response, status
283
284
285 class PSNotification:
286 """class to set/get/delete a notification
287 PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
288 GET /notifications/bucket/<bucket>
289 DELETE /notifications/bucket/<bucket>?topic=<topic-name>
290 """
291 def __init__(self, conn, bucket_name, topic_name, events=''):
292 self.conn = conn
293 assert bucket_name.strip()
294 assert topic_name.strip()
295 self.resource = '/notifications/bucket/'+bucket_name
296 if events.strip():
297 self.parameters = {'topic': topic_name, 'events': events}
298 else:
299 self.parameters = {'topic': topic_name}
300
301 def send_request(self, method, parameters=None):
302 """send request to radosgw"""
303 return make_request(self.conn, method, self.resource, parameters)
304
305 def get_config(self):
306 """get notification info"""
307 return self.send_request('GET')
308
309 def set_config(self):
310 """set notification"""
311 return self.send_request('PUT', self.parameters)
312
313 def del_config(self):
314 """delete notification"""
315 return self.send_request('DELETE', self.parameters)
316
317
318 class PSNotificationS3:
319 """class to set/get/delete an S3 notification
320 PUT /<bucket>?notification
321 GET /<bucket>?notification[=<notification>]
322 DELETE /<bucket>?notification[=<notification>]
323 """
324 def __init__(self, conn, bucket_name, topic_conf_list):
325 self.conn = conn
326 assert bucket_name.strip()
327 self.bucket_name = bucket_name
328 self.resource = '/'+bucket_name
329 self.topic_conf_list = topic_conf_list
330 self.client = boto3.client('s3',
331 endpoint_url='http://'+conn.host+':'+str(conn.port),
332 aws_access_key_id=conn.aws_access_key_id,
333 aws_secret_access_key=conn.aws_secret_access_key,
334 config=Config(signature_version='s3'))
335
336 def send_request(self, method, parameters=None):
337 """send request to radosgw"""
338 return make_request(self.conn, method, self.resource,
339 parameters=parameters, sign_parameters=True)
340
341 def get_config(self, notification=None):
342 """get notification info"""
343 parameters = None
344 if notification is None:
345 response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
346 status = response['ResponseMetadata']['HTTPStatusCode']
347 return response, status
348 parameters = {'notification': notification}
349 response, status = self.send_request('GET', parameters=parameters)
350 dict_response = xmltodict.parse(response)
351 return dict_response, status
352
353 def set_config(self):
354 """set notification"""
355 response = self.client.put_bucket_notification_configuration(Bucket=self.bucket_name,
356 NotificationConfiguration={
357 'TopicConfigurations': self.topic_conf_list
358 })
359 status = response['ResponseMetadata']['HTTPStatusCode']
360 return response, status
361
362 def del_config(self, notification=None):
363 """delete notification"""
364 parameters = {'notification': notification}
365
366 return self.send_request('DELETE', parameters)
367
368
369 class PSSubscription:
370 """class to set/get/delete a subscription:
371 PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
372 GET /subscriptions/<sub-name>
373 DELETE /subscriptions/<sub-name>
374 also to get list of events, and ack them:
375 GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
376 POST /subscriptions/<sub-name>?ack&event-id=<event-id>
377 """
378 def __init__(self, conn, sub_name, topic_name, endpoint=None, endpoint_args=None):
379 self.conn = conn
380 assert topic_name.strip()
381 self.resource = '/subscriptions/'+sub_name
382 if endpoint is not None:
383 self.parameters = {'topic': topic_name, 'push-endpoint': endpoint}
384 self.extra_parameters = endpoint_args
385 else:
386 self.parameters = {'topic': topic_name}
387 self.extra_parameters = None
388
389 def send_request(self, method, parameters=None, extra_parameters=None):
390 """send request to radosgw"""
391 return make_request(self.conn, method, self.resource,
392 parameters=parameters,
393 extra_parameters=extra_parameters)
394
395 def get_config(self):
396 """get subscription info"""
397 return self.send_request('GET')
398
399 def set_config(self):
400 """set subscription"""
401 return self.send_request('PUT', parameters=self.parameters, extra_parameters=self.extra_parameters)
402
403 def del_config(self, topic=False):
404 """delete subscription"""
405 if topic:
406 return self.send_request('DELETE', self.parameters)
407 return self.send_request('DELETE')
408
409 def get_events(self, max_entries=None, marker=None):
410 """ get events from subscription """
411 parameters = {'events': None}
412 if max_entries is not None:
413 parameters['max-entries'] = max_entries
414 if marker is not None:
415 parameters['marker'] = marker
416 return self.send_request('GET', parameters)
417
418 def ack_events(self, event_id):
419 """ ack events in a subscription """
420 parameters = {'ack': None, 'event-id': event_id}
421 return self.send_request('POST', parameters)
422
423
424 class PSZoneConfig:
425 """ pubsub zone configuration """
426 def __init__(self, cfg, section):
427 self.full_sync = cfg.get(section, 'start_with_full_sync')
428 self.retention_days = cfg.get(section, 'retention_days')