]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/zone_ps.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / test / rgw / rgw_multi / zone_ps.py
1 import logging
2 import ssl
3 import urllib
4 import hmac
5 import hashlib
6 import base64
7 import xmltodict
8 from http import client as http_client
9 from urllib import parse as urlparse
10 from time import gmtime, strftime
11 from .multisite import Zone
12 import boto3
13 from botocore.client import Config
14
# Module-level logger, registered under the 'rgw_multi.tests' name.
log = logging.getLogger(name='rgw_multi.tests')
16
def put_object_tagging(conn, bucket_name, key, tags):
    """Upload a small fixed-content object carrying the given tag set.

    An S3 client is built from the host, port and credentials of ``conn``
    (plain http endpoint, 's3' signature version) and a single PutObject
    call is issued with ``Tagging=tags``.  The boto3 response is returned.
    """
    s3_settings = {
        'endpoint_url': ''.join(('http://', conn.host, ':', str(conn.port))),
        'aws_access_key_id': conn.aws_access_key_id,
        'aws_secret_access_key': conn.aws_secret_access_key,
        'config': Config(signature_version='s3'),
    }
    s3 = boto3.client('s3', **s3_settings)
    return s3.put_object(Body='aaaaaaaaaaa',
                         Bucket=bucket_name,
                         Key=key,
                         Tagging=tags)
24
25
def get_object_tagging(conn, bucket, object_key):
    """Fetch the tag set of one object.

    An S3 client is built from the host, port and credentials of ``conn``
    (plain http endpoint, 's3' signature version).  The boto3
    GetObjectTagging response for ``bucket``/``object_key`` is returned.
    """
    s3_settings = {
        'endpoint_url': ''.join(('http://', conn.host, ':', str(conn.port))),
        'aws_access_key_id': conn.aws_access_key_id,
        'aws_secret_access_key': conn.aws_secret_access_key,
        'config': Config(signature_version='s3'),
    }
    s3 = boto3.client('s3', **s3_settings)
    return s3.get_object_tagging(Bucket=bucket, Key=object_key)
36
37
class PSZone(Zone):  # pylint: disable=too-many-ancestors
    """ PubSub zone class

    A read-only zone whose tier type is "pubsub" and which syncs only from
    the master zone of its zonegroup.
    """
    def __init__(self, name, zonegroup=None, cluster=None, data=None, zone_id=None, gateways=None, full_sync='false', retention_days='7'):
        # both values are kept as strings: they are concatenated into the
        # tier-config string in create()
        self.full_sync = full_sync
        self.retention_days = retention_days
        # NOTE(review): zonegroup defaults to None but is dereferenced here,
        # so in practice a zonegroup must always be supplied
        self.master_zone = zonegroup.master_zone
        super(PSZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)

    def is_read_only(self):
        """ the pubsub zone is always reported as read-only """
        return True

    def tier_type(self):
        """ tier type name passed to '--tier-type' on creation """
        return "pubsub"

    def syncs_from(self, zone_name):
        """ True only when zone_name is the master zone's name """
        return zone_name == self.master_zone.name

    def create(self, cluster, args=None, **kwargs):
        """ create the zone, appending the pubsub tier arguments to args

        args is an optional list of extra command-line arguments; it is not
        modified.  Returns whatever json_command() returns.
        """
        # NOTE(review): **kwargs is accepted but not forwarded to
        # json_command(), as before - confirm whether that is intended
        tier_config = ','.join(['start_with_full_sync=' + self.full_sync,
                                'event_retention_days=' + self.retention_days])
        tier_args = ['--tier-type', self.tier_type(),
                     '--sync-from-all=0',
                     '--sync-from', self.master_zone.name,
                     '--tier-config', tier_config]
        # the default used to be '' which made the list concatenation raise
        # TypeError; start from a (copied) list instead so that the caller's
        # list is left untouched
        base_args = [] if args is None else list(args)
        return self.json_command(cluster, 'create', base_args + tier_args)

    def has_buckets(self):
        """ the pubsub zone reports that it holds no buckets """
        return False
64
65
# Empty body handed to HTTPConnection.request() by make_request().
NO_HTTP_BODY = ""
67
68
def _build_query_string(parameters, extra_parameters):
    """build the URL query string (leading '?' included), '' if there is none"""
    url_params = ''
    if parameters is not None:
        items = parameters.items() if hasattr(parameters, 'items') else parameters
        encoded = []
        for key, value in items:
            if value is None:
                # a key without a value is sent bare (e.g. "?events");
                # encode it with an empty value and drop the trailing '='
                encoded.append(urlparse.urlencode([(key, '')])[:-1])
            else:
                encoded.append(urlparse.urlencode([(key, value)]))
        url_params = '?' + '&'.join(encoded)
    if extra_parameters is not None:
        # extra parameters are appended verbatim (already encoded by the caller)
        separator = '&' if url_params else '?'
        url_params = url_params + separator + extra_parameters
    return url_params


def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None):
    """generic request sending to pubsub radosgw
    should cover: topics, notifications and subscriptions

    conn supplies host, port, aws_access_key_id and aws_secret_access_key.
    parameters is a mapping (or sequence of pairs) that is URL-encoded into
    the query string; keys whose value is None are sent without a value.
    extra_parameters is a pre-encoded string appended to the query string.
    The request is signed with an HMAC-SHA1 "AWS <key>:<signature>" header
    over method, date and resource (plus the query string when
    sign_parameters is set) and sent over plain HTTP with an empty body.

    Returns a tuple of (response body decoded as UTF-8, HTTP status).
    """
    url_params = _build_query_string(parameters, extra_parameters)
    string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
    string_to_sign = method + '\n\n\n' + string_date + '\n' + resource
    if sign_parameters:
        string_to_sign += url_params
    signature = base64.b64encode(hmac.new(conn.aws_secret_access_key.encode('utf-8'),
                                          string_to_sign.encode('utf-8'),
                                          hashlib.sha1).digest()).decode('ascii')
    headers = {'Authorization': 'AWS '+conn.aws_access_key_id+':'+signature,
               'Date': string_date,
               'Host': conn.host+':'+str(conn.port)}
    http_conn = http_client.HTTPConnection(conn.host, conn.port)
    try:
        # dump the HTTP exchange when the logger is at debug level
        if log.getEffectiveLevel() <= logging.DEBUG:
            http_conn.set_debuglevel(5)
        http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
        response = http_conn.getresponse()
        data = response.read()
        status = response.status
    finally:
        # release the socket even when the request or the read fails
        http_conn.close()
    return data.decode('utf-8'), status
100
101
def print_connection_info(conn):
    """write the endpoint and both AWS keys of a connection to stdout"""
    report = (
        ("Host: ", ''.join((conn.host, ':', str(conn.port)))),
        ("AWS Secret Key: ", conn.aws_secret_access_key),
        ("AWS Access Key: ", conn.aws_access_key_id),
    )
    for label, value in report:
        print(label + value)
107
108
class PSTopic:
    """REST wrapper around a single pubsub topic
    PUT /topics/<topic name>[?push-endpoint=<endpoint>&[<arg1>=<value1>...]]
    GET /topics/<topic name>
    DELETE /topics/<topic name>
    """
    def __init__(self, conn, topic_name, endpoint=None, endpoint_args=None):
        assert topic_name.strip()
        self.conn = conn
        self.resource = '/topics/' + topic_name
        # endpoint arguments are only kept when an endpoint was given
        with_endpoint = endpoint is not None
        self.parameters = {'push-endpoint': endpoint} if with_endpoint else None
        self.extra_parameters = endpoint_args if with_endpoint else None

    def send_request(self, method, get_list=False, parameters=None, extra_parameters=None):
        """issue the request against this topic, or against '/topics' when listing"""
        if not get_list:
            return make_request(self.conn, method, self.resource,
                                parameters=parameters,
                                extra_parameters=extra_parameters)
        return make_request(self.conn, method, '/topics')

    def get_config(self):
        """fetch the topic information"""
        return self.send_request('GET')

    def set_config(self):
        """create the topic using the stored endpoint parameters"""
        return self.send_request('PUT',
                                 parameters=self.parameters,
                                 extra_parameters=self.extra_parameters)

    def del_config(self):
        """remove the topic"""
        return self.send_request('DELETE')

    def get_list(self):
        """fetch the list of all topics"""
        return self.send_request('GET', get_list=True)
148
149
def delete_all_s3_topics(zone, region):
    """Best-effort removal of every topic listed by the zone's SNS endpoint.

    The zone's secure connection is preferred when it exists.  Any failure
    (including a non-200 delete status) is printed and swallowed, so this
    never raises.
    """
    try:
        conn = zone.conn if zone.secure_conn is None else zone.secure_conn
        scheme = 'https' if conn.is_secure else 'http'
        sns_settings = {
            'endpoint_url': ''.join((scheme, '://', conn.host, ':', str(conn.port))),
            'aws_access_key_id': conn.aws_access_key_id,
            'aws_secret_access_key': conn.aws_secret_access_key,
            'region_name': region,
            'verify': './cert.pem',
            'config': Config(signature_version='s3'),
        }
        sns = boto3.client('sns', **sns_settings)

        for entry in sns.list_topics()['Topics']:
            arn = entry['TopicArn']
            print('topic cleanup, deleting: ' + arn)
            outcome = sns.delete_topic(TopicArn=arn)
            assert outcome['ResponseMetadata']['HTTPStatusCode'] == 200
    except Exception as err:
        print('failed to do topic cleanup: ' + str(err))
168
169
def delete_all_objects(conn, bucket_name):
    """Delete every object in a bucket.

    An S3 client is built from the host, port and credentials of ``conn``
    (plain http endpoint).  The bucket listing is walked page by page and
    each page of keys is removed with one DeleteObjects call.

    Returns the response of the last DeleteObjects call, or None when the
    bucket was already empty and nothing had to be deleted.
    """
    client = boto3.client('s3',
                          endpoint_url='http://'+conn.host+':'+str(conn.port),
                          aws_access_key_id=conn.aws_access_key_id,
                          aws_secret_access_key=conn.aws_secret_access_key)

    response = None
    # a single list_objects call returns only one page of keys, so use the
    # paginator to make sure the whole bucket is covered
    for page in client.get_paginator('list_objects').paginate(Bucket=bucket_name):
        # 'Contents' is missing from the listing of an empty bucket
        objects = [{'Key': entry['Key']} for entry in page.get('Contents', [])]
        if objects:
            # delete objects from the bucket
            response = client.delete_objects(Bucket=bucket_name,
                                             Delete={'Objects': objects})
    return response
183
184
class PSTopicS3:
    """class to set/list/get/delete a topic
    POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
    POST ?Action=ListTopics
    POST ?Action=GetTopic&TopicArn=<topic-arn>
    POST ?Action=GetTopicAttributes&TopicArn=<topic-arn>
    POST ?Action=DeleteTopic&TopicArn=<topic-arn>
    """
    def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
        self.conn = conn
        self.topic_name = topic_name.strip()
        assert self.topic_name
        # filled in by set_config() once the topic has been created
        self.topic_arn = ''
        self.attributes = {}
        if endpoint_args is not None:
            # endpoint_args is a query-string style "k1=v1&k2=v2" list
            self.attributes = dict(urlparse.parse_qsl(endpoint_args, keep_blank_values=True))
        if opaque_data is not None:
            self.attributes['OpaqueData'] = opaque_data
        protocol = 'https' if conn.is_secure else 'http'
        self.client = boto3.client('sns',
                                   endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
                                   aws_access_key_id=conn.aws_access_key_id,
                                   aws_secret_access_key=conn.aws_secret_access_key,
                                   region_name=region,
                                   verify='./cert.pem',
                                   config=Config(signature_version='s3'))

    def _post_action(self, parameters):
        """send a signed, form-encoded POST to '/' and parse the XML reply

        parameters holds the 'Action' and its arguments.  The request carries
        an HMAC-SHA1 "AWS <key>:<signature>" authorization header and goes
        over HTTPS (validated against ./cert.pem) when the connection is
        secure, plain HTTP otherwise.
        Returns a tuple of (xmltodict-parsed response, HTTP status).
        """
        body = urlparse.urlencode(parameters)
        string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
        content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        resource = '/'
        method = 'POST'
        string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
        log.debug('StringTosign: %s', string_to_sign)
        signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
                                              string_to_sign.encode('utf-8'),
                                              hashlib.sha1).digest()).decode('ascii')
        headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
                   'Date': string_date,
                   'Host': self.conn.host+':'+str(self.conn.port),
                   'Content-Type': content_type}
        if self.conn.is_secure:
            http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
                                                    context=ssl.create_default_context(cafile='./cert.pem'))
        else:
            http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
        try:
            http_conn.request(method, resource, body, headers)
            response = http_conn.getresponse()
            data = response.read()
            status = response.status
        finally:
            # release the socket even when the request or the read fails
            http_conn.close()
        return xmltodict.parse(data), status

    def get_config(self):
        """get topic info"""
        return self._post_action({'Action': 'GetTopic', 'TopicArn': self.topic_arn})

    def get_attributes(self):
        """get topic attributes"""
        return self.client.get_topic_attributes(TopicArn=self.topic_arn)

    def set_config(self):
        """set topic; stores and returns the ARN of the created topic"""
        result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
        self.topic_arn = result['TopicArn']
        return self.topic_arn

    def del_config(self):
        """delete topic; returns the HTTP status code of the response"""
        result = self.client.delete_topic(TopicArn=self.topic_arn)
        return result['ResponseMetadata']['HTTPStatusCode']

    def get_list(self):
        """list all topics"""
        # note that boto3 supports list_topics(), however, the result only show ARNs
        return self._post_action({'Action': 'ListTopics'})
288
289
class PSNotification:
    """REST wrapper around the notification of one bucket on one topic
    PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
    GET /notifications/bucket/<bucket>
    DELETE /notifications/bucket/<bucket>?topic=<topic-name>
    """
    def __init__(self, conn, bucket_name, topic_name, events=''):
        self.conn = conn
        assert bucket_name.strip()
        assert topic_name.strip()
        self.resource = '/notifications/bucket/' + bucket_name
        # the 'events' filter is only sent when it is non-blank
        query = {'topic': topic_name}
        if events.strip():
            query['events'] = events
        self.parameters = query

    def send_request(self, method, parameters=None):
        """issue the request against this bucket's notification resource"""
        return make_request(self.conn, method, self.resource, parameters)

    def get_config(self):
        """fetch the notification information"""
        return self.send_request('GET')

    def set_config(self):
        """create the notification using the stored parameters"""
        return self.send_request('PUT', self.parameters)

    def del_config(self):
        """remove the notification using the stored parameters"""
        return self.send_request('DELETE', self.parameters)
321
322
class PSNotificationS3:
    """S3-compatible wrapper around the notifications of one bucket
    PUT /<bucket>?notification
    GET /<bucket>?notification[=<notification>]
    DELETE /<bucket>?notification[=<notification>]
    """
    def __init__(self, conn, bucket_name, topic_conf_list):
        self.conn = conn
        assert bucket_name.strip()
        self.bucket_name = bucket_name
        self.resource = '/' + bucket_name
        self.topic_conf_list = topic_conf_list
        s3_settings = {
            'endpoint_url': ''.join(('http://', conn.host, ':', str(conn.port))),
            'aws_access_key_id': conn.aws_access_key_id,
            'aws_secret_access_key': conn.aws_secret_access_key,
            'config': Config(signature_version='s3'),
        }
        self.client = boto3.client('s3', **s3_settings)

    def send_request(self, method, parameters=None):
        """issue a raw request on the bucket, with the query string included in the signature"""
        return make_request(self.conn, method, self.resource,
                            parameters=parameters, sign_parameters=True)

    def get_config(self, notification=None):
        """fetch all notifications via boto3, or a single named one via a raw request"""
        if notification is not None:
            raw, status = self.send_request('GET', parameters={'notification': notification})
            return xmltodict.parse(raw), status
        response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
        return response, response['ResponseMetadata']['HTTPStatusCode']

    def set_config(self):
        """store the configured topic list as the bucket's notification configuration"""
        configuration = {'TopicConfigurations': self.topic_conf_list}
        response = self.client.put_bucket_notification_configuration(
            Bucket=self.bucket_name,
            NotificationConfiguration=configuration)
        return response, response['ResponseMetadata']['HTTPStatusCode']

    def del_config(self, notification=None):
        """remove one named notification, or send a bare 'notification' key when none is named"""
        return self.send_request('DELETE', {'notification': notification})
372
373
class PSSubscription:
    """REST wrapper around a single subscription:
    PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
    GET /subscriptions/<sub-name>
    DELETE /subscriptions/<sub-name>
    also to get list of events, and ack them:
    GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
    POST /subscriptions/<sub-name>?ack&event-id=<event-id>
    """
    def __init__(self, conn, sub_name, topic_name, endpoint=None, endpoint_args=None):
        self.conn = conn
        assert topic_name.strip()
        self.resource = '/subscriptions/' + sub_name
        # endpoint arguments are only kept when an endpoint was given
        query = {'topic': topic_name}
        extras = None
        if endpoint is not None:
            query['push-endpoint'] = endpoint
            extras = endpoint_args
        self.parameters = query
        self.extra_parameters = extras

    def send_request(self, method, parameters=None, extra_parameters=None):
        """issue the request against this subscription's resource"""
        return make_request(self.conn, method, self.resource,
                            parameters=parameters,
                            extra_parameters=extra_parameters)

    def get_config(self):
        """fetch the subscription information"""
        return self.send_request('GET')

    def set_config(self):
        """create the subscription using the stored parameters"""
        return self.send_request('PUT',
                                 parameters=self.parameters,
                                 extra_parameters=self.extra_parameters)

    def del_config(self, topic=False):
        """remove the subscription, sending the stored parameters only when topic is set"""
        if not topic:
            return self.send_request('DELETE')
        return self.send_request('DELETE', self.parameters)

    def get_events(self, max_entries=None, marker=None):
        """ fetch events from the subscription, optionally limited and/or starting at a marker """
        parameters = {'events': None}
        optional = (('max-entries', max_entries), ('marker', marker))
        parameters.update((name, value) for name, value in optional if value is not None)
        return self.send_request('GET', parameters)

    def ack_events(self, event_id):
        """ acknowledge one event of the subscription """
        return self.send_request('POST', {'ack': None, 'event-id': event_id})
427
428
class PSZoneConfig:
    """ settings of a pubsub zone, read from one section of a config object """
    def __init__(self, cfg, section):
        # attribute name -> option name looked up in the given section
        option_map = (
            ('full_sync', 'start_with_full_sync'),
            ('retention_days', 'retention_days'),
        )
        for attribute, option in option_map:
            setattr(self, attribute, cfg.get(section, option))