]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/zone_ps.py
daee7a25b4ca880bff9a99576572b15bebb5630f
[ceph.git] / ceph / src / test / rgw / rgw_multi / zone_ps.py
1 import logging
2 import ssl
3 import urllib
4 import hmac
5 import hashlib
6 import base64
7 import xmltodict
8 from http import client as http_client
9 from urllib import parse as urlparse
10 from time import gmtime, strftime
11 from .multisite import Zone
12 import boto3
13 from botocore.client import Config
14
# module-level logger; it is registered under the 'rgw_multi.tests' name
# rather than under this module's own name
log = logging.getLogger('rgw_multi.tests')
16
17
def get_object_tagging(conn, bucket, object_key):
    """Return the boto3 get_object_tagging() response for one object.

    A fresh boto3 S3 client is built on every call from the host, port and
    credentials found on *conn*; it talks plain http and uses the 's3'
    signature version.
    """
    endpoint = 'http://' + conn.host + ':' + str(conn.port)
    s3_client = boto3.client(
        's3',
        endpoint_url=endpoint,
        aws_access_key_id=conn.aws_access_key_id,
        aws_secret_access_key=conn.aws_secret_access_key,
        config=Config(signature_version='s3'),
    )
    return s3_client.get_object_tagging(Bucket=bucket, Key=object_key)
28
29
class PSZone(Zone):  # pylint: disable=too-many-ancestors
    """ PubSub zone class

    A read-only zone of tier type "pubsub" that syncs only from the master
    zone of its zonegroup.
    """
    def __init__(self, name, zonegroup=None, cluster=None, data=None, zone_id=None, gateways=None,
                 full_sync='false', retention_days='7'):
        """Store the pubsub tier settings and initialise the base Zone.

        full_sync and retention_days end up in the '--tier-config' value
        built by create(). Although zonegroup defaults to None, a real
        zonegroup is required because its master_zone is read here.
        """
        self.full_sync = full_sync
        self.retention_days = retention_days
        self.master_zone = zonegroup.master_zone
        super(PSZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)

    def is_read_only(self):
        """A pubsub zone is always read-only."""
        return True

    def tier_type(self):
        """Return the tier type string passed to '--tier-type'."""
        return "pubsub"

    def syncs_from(self, zone_name):
        """Return True only when zone_name is the master zone's name."""
        return zone_name == self.master_zone.name

    def create(self, cluster, args=None, **kwargs):
        """Create the zone with the pubsub tier options appended to args.

        args is an optional list of extra command-line arguments; it is
        copied, so the caller's list is left untouched. Extra keyword
        arguments are accepted but not used by this implementation.
        Returns whatever self.json_command() returns (presumably inherited
        from Zone -- not visible in this file).
        """
        # The previous default of '' made the list concatenation below raise
        # TypeError whenever args was omitted; start from a real list instead.
        args = [] if args is None else list(args)
        # str() keeps this working when the settings are given as bool/int
        tier_config = ','.join(['start_with_full_sync=' + str(self.full_sync),
                                'event_retention_days=' + str(self.retention_days)])
        args += ['--tier-type', self.tier_type(),
                 '--sync-from-all=0',
                 '--sync-from', self.master_zone.name,
                 '--tier-config', tier_config]
        return self.json_command(cluster, 'create', args)

    def has_buckets(self):
        """A pubsub zone never reports buckets."""
        return False
56
57
# request body handed to HTTPConnection.request() by make_request(),
# which never sends a payload
NO_HTTP_BODY = ''
59
60
def _build_query_string(parameters, extra_parameters):
    """Return the '?key=value&...' suffix of a request URL, or '' if empty.

    parameters is a mapping (or sequence of pairs); a value of None yields a
    bare key such as '?events'. extra_parameters is an already-encoded string
    that is appended verbatim.
    """
    parts = []
    if parameters is not None:
        pairs = parameters.items() if hasattr(parameters, 'items') else parameters
        for key, value in pairs:
            if value is None:
                # valueless flag; emitted explicitly instead of stripping
                # '=None' from the encoded string, which would also mangle
                # real values that merely start with "None"
                parts.append(urlparse.quote_plus(str(key)))
            else:
                parts.append(urlparse.urlencode([(key, value)]))
    if extra_parameters:
        parts.append(extra_parameters)
    if not parts:
        return ''
    return '?' + '&'.join(parts)


def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None):
    """generic request sending to pubsub radosgw
    should cover: topics, notifications and subscriptions

    conn supplies host, port, aws_access_key_id and aws_secret_access_key.
    The request is sent over plain HTTP without a body and is signed with an
    HMAC-SHA1 'AWS <key>:<signature>' Authorization header; the query string
    is part of the signed string only when sign_parameters is true.

    Returns a (body, status) tuple: the response body decoded as UTF-8 and
    the numeric HTTP status.
    """
    url_params = _build_query_string(parameters, extra_parameters)
    string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
    string_to_sign = method + '\n\n\n' + string_date + '\n' + resource
    if sign_parameters:
        string_to_sign += url_params
    signature = base64.b64encode(hmac.new(conn.aws_secret_access_key.encode('utf-8'),
                                          string_to_sign.encode('utf-8'),
                                          hashlib.sha1).digest()).decode('ascii')
    headers = {'Authorization': 'AWS '+conn.aws_access_key_id+':'+signature,
               'Date': string_date,
               'Host': conn.host+':'+str(conn.port)}
    http_conn = http_client.HTTPConnection(conn.host, conn.port)
    try:
        # dump the HTTP exchange when the test logger runs at debug level
        if log.getEffectiveLevel() <= logging.DEBUG:
            http_conn.set_debuglevel(5)
        http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
        response = http_conn.getresponse()
        data = response.read()
        status = response.status
    finally:
        # release the socket even when the request or the read fails
        http_conn.close()
    return data.decode('utf-8'), status
92
93
def print_connection_info(conn):
    """Write the endpoint and both AWS keys of conn to stdout.

    Note that the secret key is printed in clear text.
    """
    endpoint = conn.host + ':' + str(conn.port)
    print("Host: " + endpoint)
    labelled_keys = (
        ("AWS Secret Key: ", conn.aws_secret_access_key),
        ("AWS Access Key: ", conn.aws_access_key_id),
    )
    for label, key in labelled_keys:
        print(label + key)
99
100
class PSTopic:
    """Wrapper around the REST calls that manage one pubsub topic:
    PUT /topics/<topic name>[?push-endpoint=<endpoint>&[<arg1>=<value1>...]]
    GET /topics/<topic name>
    DELETE /topics/<topic name>
    """
    def __init__(self, conn, topic_name, endpoint=None, endpoint_args=None):
        """Remember the connection and prepare the parameters used by set_config().

        endpoint_args is kept only when an endpoint is given.
        """
        self.conn = conn
        assert topic_name.strip()
        self.resource = '/topics/'+topic_name
        with_endpoint = endpoint is not None
        self.parameters = {'push-endpoint': endpoint} if with_endpoint else None
        self.extra_parameters = endpoint_args if with_endpoint else None

    def send_request(self, method, get_list=False, parameters=None, extra_parameters=None):
        """Issue *method* against this topic, or against '/topics' when get_list is set."""
        if not get_list:
            return make_request(self.conn, method, self.resource,
                                parameters=parameters,
                                extra_parameters=extra_parameters)
        # listing addresses the collection and takes no parameters
        return make_request(self.conn, method, '/topics')

    def get_config(self):
        """GET the topic."""
        return self.send_request('GET')

    def set_config(self):
        """PUT the topic together with its stored endpoint parameters."""
        return self.send_request('PUT',
                                 parameters=self.parameters,
                                 extra_parameters=self.extra_parameters)

    def del_config(self):
        """DELETE the topic."""
        return self.send_request('DELETE')

    def get_list(self):
        """GET the list of all topics."""
        return self.send_request('GET', get_list=True)
140
141
class PSTopicS3:
    """class to set/list/get/delete a topic
    POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
    POST ?Action=ListTopics
    POST ?Action=GetTopic&TopicArn=<topic-arn>
    POST ?Action=GetTopicAttributes&TopicArn=<topic-arn>
    POST ?Action=DeleteTopic&TopicArn=<topic-arn>

    Create, delete and GetTopicAttributes go through a boto3 'sns' client;
    GetTopic and ListTopics are sent as hand-signed POST requests.
    topic_arn stays '' until set_config() has been called.
    """
    def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
        """Prepare the topic attributes and the boto3 SNS client.

        endpoint_args is a URL query string whose name/value pairs become
        topic attributes (blank values are kept); opaque_data, when given,
        is stored under the 'OpaqueData' attribute.
        """
        self.conn = conn
        self.topic_name = topic_name.strip()
        assert self.topic_name
        self.topic_arn = ''
        self.attributes = {}
        if endpoint_args is not None:
            self.attributes = dict(urlparse.parse_qsl(endpoint_args, keep_blank_values=True))
        if opaque_data is not None:
            self.attributes['OpaqueData'] = opaque_data
        protocol = 'https' if conn.is_secure else 'http'
        self.client = boto3.client('sns',
                                   endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
                                   aws_access_key_id=conn.aws_access_key_id,
                                   aws_secret_access_key=conn.aws_secret_access_key,
                                   region_name=region,
                                   verify='./cert.pem',
                                   config=Config(signature_version='s3'))

    def _post_action(self, parameters):
        """POST form-encoded *parameters* to '/' with an HMAC-SHA1 'AWS' signature.

        Returns (parsed_xml, status): the response body parsed by xmltodict
        and the numeric HTTP status.
        """
        body = urlparse.urlencode(parameters)
        string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
        content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        resource = '/'
        method = 'POST'
        string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
        log.debug('StringTosign: %s', string_to_sign)
        signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
                                              string_to_sign.encode('utf-8'),
                                              hashlib.sha1).digest()).decode('ascii')
        headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
                   'Date': string_date,
                   'Host': self.conn.host+':'+str(self.conn.port),
                   'Content-Type': content_type}
        if self.conn.is_secure:
            http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
                                                    context=ssl.create_default_context(cafile='./cert.pem'))
        else:
            http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
        try:
            http_conn.request(method, resource, body, headers)
            response = http_conn.getresponse()
            data = response.read()
            status = response.status
        finally:
            # release the socket even when the request or the read fails
            http_conn.close()
        return xmltodict.parse(data), status

    def get_config(self):
        """get topic info; returns (parsed_xml, status)"""
        return self._post_action({'Action': 'GetTopic', 'TopicArn': self.topic_arn})

    def get_attributes(self):
        """get topic attributes; returns the boto3 response"""
        return self.client.get_topic_attributes(TopicArn=self.topic_arn)

    def set_config(self):
        """set topic; stores and returns the ARN reported by create_topic"""
        result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
        self.topic_arn = result['TopicArn']
        return self.topic_arn

    def del_config(self):
        """delete topic; returns the HTTP status code of the boto3 response"""
        result = self.client.delete_topic(TopicArn=self.topic_arn)
        return result['ResponseMetadata']['HTTPStatusCode']

    def get_list(self):
        """list all topics; returns (parsed_xml, status)"""
        # note that boto3 supports list_topics(), however, the result only show ARNs
        return self._post_action({'Action': 'ListTopics'})
245
246
class PSNotification:
    """Wrapper around the REST calls that manage a bucket notification:
    PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
    GET /notifications/bucket/<bucket>
    DELETE /notifications/bucket/<bucket>?topic=<topic-name>
    """
    def __init__(self, conn, bucket_name, topic_name, events=''):
        """Build the resource path and the query parameters for this notification.

        The 'events' parameter is only included when events is not blank.
        """
        self.conn = conn
        assert bucket_name.strip()
        assert topic_name.strip()
        self.resource = '/notifications/bucket/'+bucket_name
        self.parameters = {'topic': topic_name}
        if events.strip():
            self.parameters['events'] = events

    def send_request(self, method, parameters=None):
        """Forward *method* and the optional parameters to make_request()."""
        return make_request(self.conn, method, self.resource, parameters)

    def get_config(self):
        """GET the notification."""
        return self.send_request('GET')

    def set_config(self):
        """PUT the notification with the stored parameters."""
        return self.send_request('PUT', self.parameters)

    def del_config(self):
        """DELETE the notification, passing the stored parameters."""
        return self.send_request('DELETE', self.parameters)
278
279
class PSNotificationS3:
    """Wrapper around the S3-style bucket notification calls:
    PUT /<bucket>?notification
    GET /<bucket>?notification[=<notification>]
    DELETE /<bucket>?notification[=<notification>]
    """
    def __init__(self, conn, bucket_name, topic_conf_list):
        """Keep the bucket details and create a boto3 S3 client for conn."""
        self.conn = conn
        assert bucket_name.strip()
        self.bucket_name = bucket_name
        self.resource = '/'+bucket_name
        self.topic_conf_list = topic_conf_list
        endpoint = 'http://'+conn.host+':'+str(conn.port)
        self.client = boto3.client(
            's3',
            endpoint_url=endpoint,
            aws_access_key_id=conn.aws_access_key_id,
            aws_secret_access_key=conn.aws_secret_access_key,
            config=Config(signature_version='s3'),
        )

    def send_request(self, method, parameters=None):
        """Send a raw request whose query parameters are part of the signature."""
        return make_request(self.conn, method, self.resource,
                            parameters=parameters,
                            sign_parameters=True)

    def get_config(self, notification=None):
        """Return (response, status) for all notifications, or for the named one.

        Without a name the boto3 response is returned as is; with a name the
        raw reply is parsed by xmltodict.
        """
        if notification is not None:
            raw, status = self.send_request('GET', parameters={'notification': notification})
            return xmltodict.parse(raw), status
        response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
        return response, response['ResponseMetadata']['HTTPStatusCode']

    def set_config(self):
        """Upload topic_conf_list through boto3 and return (response, status)."""
        configuration = {'TopicConfigurations': self.topic_conf_list}
        response = self.client.put_bucket_notification_configuration(
            Bucket=self.bucket_name,
            NotificationConfiguration=configuration)
        return response, response['ResponseMetadata']['HTTPStatusCode']

    def del_config(self, notification=None):
        """DELETE the named notification; with None a bare 'notification' key is sent."""
        return self.send_request('DELETE', {'notification': notification})
329
330
class PSSubscription:
    """Wrapper around the REST calls that manage a subscription:
    PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
    GET /subscriptions/<sub-name>
    DELETE /subscriptions/<sub-name>
    and around the calls that fetch and acknowledge its events:
    GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
    POST /subscriptions/<sub-name>?ack&event-id=<event-id>
    """
    def __init__(self, conn, sub_name, topic_name, endpoint=None, endpoint_args=None):
        """Build the resource path and the parameters used by set_config().

        endpoint_args is kept only when an endpoint is given.
        """
        self.conn = conn
        assert topic_name.strip()
        self.resource = '/subscriptions/'+sub_name
        self.parameters = {'topic': topic_name}
        self.extra_parameters = None
        if endpoint is not None:
            self.parameters['push-endpoint'] = endpoint
            self.extra_parameters = endpoint_args

    def send_request(self, method, parameters=None, extra_parameters=None):
        """Forward *method* and the optional parameters to make_request()."""
        return make_request(self.conn, method, self.resource,
                            parameters=parameters,
                            extra_parameters=extra_parameters)

    def get_config(self):
        """GET the subscription."""
        return self.send_request('GET')

    def set_config(self):
        """PUT the subscription with the stored parameters."""
        return self.send_request('PUT',
                                 parameters=self.parameters,
                                 extra_parameters=self.extra_parameters)

    def del_config(self, topic=False):
        """DELETE the subscription; the stored parameters are sent only when topic is true."""
        if not topic:
            return self.send_request('DELETE')
        return self.send_request('DELETE', self.parameters)

    def get_events(self, max_entries=None, marker=None):
        """GET the subscription's events, optionally limited and/or starting at marker."""
        parameters = {'events': None}
        optional = (('max-entries', max_entries), ('marker', marker))
        parameters.update((name, value) for name, value in optional if value is not None)
        return self.send_request('GET', parameters)

    def ack_events(self, event_id):
        """POST an acknowledgement for event_id."""
        return self.send_request('POST', {'ack': None, 'event-id': event_id})
384
385
class PSZoneConfig:
    """ pubsub zone settings read from one section of a configuration object """
    def __init__(self, cfg, section):
        """Read 'start_with_full_sync' and 'retention_days' from cfg[section]."""
        option_by_attribute = (
            ('full_sync', 'start_with_full_sync'),
            ('retention_days', 'retention_days'),
        )
        for attribute, option in option_by_attribute:
            setattr(self, attribute, cfg.get(section, option))