# git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/rgw_multi/zone_ps.py
# (page header, not a Python import) import ceph 14.2.5
# [ceph.git] / ceph / src / test / rgw / rgw_multi / zone_ps.py
1 import logging
2 import httplib
3 import urllib
4 import urlparse
5 import hmac
6 import hashlib
7 import base64
8 import xmltodict
9 from time import gmtime, strftime
10 from .multisite import Zone
11 import boto3
12 from botocore.client import Config
13
# Module-level logger, obtained under the 'rgw_multi.tests' logger name.
log = logging.getLogger('rgw_multi.tests')
15
16
class PSZone(Zone):  # pylint: disable=too-many-ancestors
    """ PubSub zone class

    Holds the pubsub tier settings (full_sync, retention_days) and the
    master zone of the zonegroup, which create() uses as the sync source.
    """
    def __init__(self, name, zonegroup=None, cluster=None, data=None, zone_id=None, gateways=None, full_sync='false', retention_days='7'):
        self.full_sync = full_sync
        self.retention_days = retention_days
        # zonegroup is dereferenced here, so despite the None default a real
        # zonegroup object has to be supplied
        self.master_zone = zonegroup.master_zone
        super(PSZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)

    def is_read_only(self):
        """this zone type always reports itself as read-only"""
        return True

    def tier_type(self):
        """tier type name passed to '--tier-type'"""
        return "pubsub"

    def create(self, cluster, args=None, **kwargs):
        """create the zone with the pubsub tier arguments appended

        cluster: forwarded to json_command()
        args: optional list of extra command arguments; it is copied, the
              caller's list is left untouched
        returns whatever json_command() returns
        """
        # always work on a fresh list: the previous '' default could not be
        # extended with a list (TypeError), and extending the caller's list in
        # place leaked the tier arguments back to the caller
        args = [] if args is None else list(args)
        tier_config = ','.join(['start_with_full_sync=' + self.full_sync,
                                'event_retention_days=' + self.retention_days])
        args += ['--tier-type', self.tier_type(),
                 '--sync-from-all=0',
                 '--sync-from', self.master_zone.name,
                 '--tier-config', tier_config]
        return self.json_command(cluster, 'create', args)

    def has_buckets(self):
        """this zone type always reports that it has no buckets"""
        return False
40
41
# Empty body passed to HTTPConnection.request() by make_request().
NO_HTTP_BODY = ''
43
44
def print_connection_info(conn):
    """print endpoint and credentials of a connection

    NOTE(review): another print_connection_info is defined further down in
    this file and rebinds the name at import time - confirm which one is
    meant to be kept.
    """
    endpoint = ':'.join((conn.host, str(conn.port)))
    print('Endpoint: ' + endpoint)
    access_key = conn.aws_access_key_id
    print('AWS Access Key:: ' + access_key)
    secret_key = conn.aws_secret_access_key
    print('AWS Secret Key:: ' + secret_key)
50
51
def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None):
    """generic request sending to pubsub radosgw
    should cover: topics, notifications and subscriptions

    conn: object providing host, port, aws_access_key_id and
          aws_secret_access_key
    method: HTTP method name
    resource: request path, also part of the signed string
    parameters: optional dict (or sequence of pairs) of query parameters;
                a value of None yields a bare key without '=<value>'
    sign_parameters: when true, the query string is appended to the signed
                     string
    extra_parameters: optional, already encoded query fragment appended as-is

    returns a (data, status) tuple: response body and HTTP status code
    """
    url_params = ''
    if parameters is not None:
        pairs = parameters.items() if hasattr(parameters, 'items') else parameters
        encoded = []
        for key, value in pairs:
            if value is None:
                # flag-style parameter: key only. Handled per item so that a
                # real value that merely starts with "None" is left intact
                encoded.append(urllib.quote_plus(str(key)))
            else:
                encoded.append(urllib.urlencode({key: value}))
        url_params = '?' + '&'.join(encoded)
    if extra_parameters is not None:
        # start the query string if there were no regular parameters
        url_params += ('&' if url_params else '?') + extra_parameters
    string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
    string_to_sign = method + '\n\n\n' + string_date + '\n' + resource
    if sign_parameters:
        string_to_sign += url_params
    # HMAC-SHA1 of the string to sign, keyed with the secret key
    signature = base64.b64encode(hmac.new(conn.aws_secret_access_key,
                                          string_to_sign.encode('utf-8'),
                                          hashlib.sha1).digest())
    headers = {'Authorization': 'AWS '+conn.aws_access_key_id+':'+signature,
               'Date': string_date,
               'Host': conn.host+':'+str(conn.port)}
    http_conn = httplib.HTTPConnection(conn.host, conn.port)
    try:
        if log.getEffectiveLevel() <= logging.DEBUG:
            http_conn.set_debuglevel(5)
        http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
        response = http_conn.getresponse()
        data = response.read()
        status = response.status
    finally:
        # release the connection even when the request or the read fails
        http_conn.close()
    return data, status
83
84
def print_connection_info(conn):
    """print host and credentials of a connection"""
    host_and_port = ':'.join((conn.host, str(conn.port)))
    print("Host: " + host_and_port)
    secret_key = conn.aws_secret_access_key
    print("AWS Secret Key: " + secret_key)
    access_key = conn.aws_access_key_id
    print("AWS Access Key: " + access_key)
90
91
class PSTopic:
    """topic operations against the pubsub REST API:
    PUT /topics/<topic name>[?push-endpoint=<endpoint>&[<arg1>=<value1>...]]
    GET /topics/<topic name>
    DELETE /topics/<topic name>
    """
    def __init__(self, conn, topic_name, endpoint=None, endpoint_args=None):
        self.conn = conn
        assert topic_name.strip()
        self.resource = '/topics/' + topic_name
        # endpoint arguments are only kept together with an endpoint
        with_endpoint = endpoint is not None
        self.parameters = {'push-endpoint': endpoint} if with_endpoint else None
        self.extra_parameters = endpoint_args if with_endpoint else None

    def send_request(self, method, get_list=False, parameters=None, extra_parameters=None):
        """issue the request; with get_list the topic collection is addressed"""
        if not get_list:
            return make_request(self.conn, method, self.resource,
                                parameters=parameters,
                                extra_parameters=extra_parameters)
        return make_request(self.conn, method, '/topics')

    def get_config(self):
        """fetch the topic"""
        return self.send_request('GET')

    def set_config(self):
        """create the topic using the stored endpoint parameters"""
        return self.send_request('PUT',
                                 parameters=self.parameters,
                                 extra_parameters=self.extra_parameters)

    def del_config(self):
        """remove the topic"""
        return self.send_request('DELETE')

    def get_list(self):
        """fetch the list of all topics"""
        return self.send_request('GET', get_list=True)
131
132
def delete_all_s3_topics(conn, region):
    """best-effort deletion of all topics listed by the SNS-compatible API

    conn: object providing host, port, aws_access_key_id and
          aws_secret_access_key
    region: region name handed to the boto3 client

    Failures are logged and reported on stdout, they are not raised.
    Deletion stops at the first topic that fails.
    """
    try:
        client = boto3.client('sns',
                              endpoint_url='http://'+conn.host+':'+str(conn.port),
                              aws_access_key_id=conn.aws_access_key_id,
                              aws_secret_access_key=conn.aws_secret_access_key,
                              region_name=region,
                              config=Config(signature_version='s3'))

        topics = client.list_topics()['Topics']
        for topic in topics:
            topic_arn = topic['TopicArn']
            print('topic cleanup, deleting: ' + topic_arn)
            status = client.delete_topic(TopicArn=topic_arn)['ResponseMetadata']['HTTPStatusCode']
            # explicit check instead of assert, so it also runs under -O
            if status != 200:
                raise RuntimeError('deleting topic ' + topic_arn + ' returned HTTP status ' + str(status))
    except Exception:
        # cleanup stays best-effort, but KeyboardInterrupt/SystemExit are no
        # longer swallowed and the cause is kept in the log
        log.exception('topic cleanup failed')
        print('failed to do topic cleanup. if there are topics they may need to be manually deleted')
148
149
class PSTopicS3:
    """class to set/list/get/delete a topic
    POST ?Action=CreateTopic&Name=<topic name>&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
    POST ?Action=ListTopics
    POST ?Action=GetTopic&TopicArn=<topic-arn>
    POST ?Action=DeleteTopic&TopicArn=<topic-arn>
    """
    def __init__(self, conn, topic_name, region, endpoint_args=None):
        """
        conn: object providing host, port, aws_access_key_id and
              aws_secret_access_key
        topic_name: topic name, must not be blank
        region: region name handed to the boto3 client
        endpoint_args: optional query-string style 'k=v&k2=v2' text that is
                       parsed into the topic attributes
        """
        self.conn = conn
        self.topic_name = topic_name.strip()
        assert self.topic_name
        # filled in by set_config() once the topic was created
        self.topic_arn = ''
        self.attributes = {}
        if endpoint_args is not None:
            self.attributes = dict(urlparse.parse_qsl(endpoint_args, keep_blank_values=True))
        self.client = boto3.client('sns',
                                   endpoint_url='http://'+conn.host+':'+str(conn.port),
                                   aws_access_key_id=conn.aws_access_key_id,
                                   aws_secret_access_key=conn.aws_secret_access_key,
                                   region_name=region,
                                   config=Config(signature_version='s3'))

    def get_config(self):
        """get topic info

        Sends a hand-signed 'GetTopic' POST for self.topic_arn.
        returns a (dict_response, status) tuple: the XML reply parsed by
        xmltodict and the HTTP status code
        """
        parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
        body = urllib.urlencode(parameters)
        string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
        content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        resource = '/'
        method = 'POST'
        string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
        log.debug('StringTosign: %s', string_to_sign)
        # HMAC-SHA1 of the string to sign, keyed with the secret key
        signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key,
                                              string_to_sign.encode('utf-8'),
                                              hashlib.sha1).digest())
        headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
                   'Date': string_date,
                   'Host': self.conn.host+':'+str(self.conn.port),
                   'Content-Type': content_type}
        http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
        try:
            if log.getEffectiveLevel() <= logging.DEBUG:
                http_conn.set_debuglevel(5)
            http_conn.request(method, resource, body, headers)
            response = http_conn.getresponse()
            data = response.read()
            status = response.status
        finally:
            # release the connection even when the request or the read fails
            http_conn.close()
        dict_response = xmltodict.parse(data)
        return dict_response, status

    def set_config(self):
        """set topic; remembers and returns the ARN of the created topic"""
        result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
        self.topic_arn = result['TopicArn']
        return self.topic_arn

    def del_config(self):
        """delete topic; returns the HTTP status code of the reply"""
        result = self.client.delete_topic(TopicArn=self.topic_arn)
        return result['ResponseMetadata']['HTTPStatusCode']

    def get_list(self):
        """list all topics; returns the client's list_topics() reply"""
        return self.client.list_topics()
215
216
class PSNotification:
    """notification operations against the pubsub REST API:
    PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
    GET /notifications/bucket/<bucket>
    DELETE /notifications/bucket/<bucket>?topic=<topic-name>
    """
    def __init__(self, conn, bucket_name, topic_name, events=''):
        self.conn = conn
        assert bucket_name.strip()
        assert topic_name.strip()
        self.resource = '/notifications/bucket/' + bucket_name
        # the events filter is only sent when it is not blank
        self.parameters = {'topic': topic_name}
        if events.strip():
            self.parameters['events'] = events

    def send_request(self, method, parameters=None):
        """issue the request for this bucket's notification resource"""
        return make_request(self.conn, method, self.resource, parameters)

    def get_config(self):
        """fetch the notification"""
        return self.send_request('GET')

    def set_config(self):
        """create the notification with the stored parameters"""
        return self.send_request('PUT', self.parameters)

    def del_config(self):
        """remove the notification, passing the stored parameters"""
        return self.send_request('DELETE', self.parameters)
248
249
class PSNotificationS3:
    """S3-style bucket notification operations:
    PUT /<bucket>?notification
    GET /<bucket>?notification[=<notification>]
    DELETE /<bucket>?notification[=<notification>]
    """
    def __init__(self, conn, bucket_name, topic_conf_list):
        self.conn = conn
        assert bucket_name.strip()
        self.bucket_name = bucket_name
        self.resource = '/' + bucket_name
        self.topic_conf_list = topic_conf_list
        endpoint = 'http://' + conn.host + ':' + str(conn.port)
        self.client = boto3.client('s3',
                                   endpoint_url=endpoint,
                                   aws_access_key_id=conn.aws_access_key_id,
                                   aws_secret_access_key=conn.aws_secret_access_key,
                                   config=Config(signature_version='s3'))

    def send_request(self, method, parameters=None):
        """issue a request whose signature also covers the query parameters"""
        return make_request(self.conn, method, self.resource,
                            parameters=parameters, sign_parameters=True)

    def get_config(self, notification=None):
        """fetch one named notification, or all of them when no name is given

        returns a (response, status) tuple; for a named notification the
        response is the XML reply parsed by xmltodict, otherwise it is the
        boto3 reply
        """
        if notification is not None:
            raw, status = self.send_request('GET', parameters={'notification': notification})
            return xmltodict.parse(raw), status
        response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
        return response, response['ResponseMetadata']['HTTPStatusCode']

    def set_config(self):
        """store the topic configurations on the bucket

        returns a (response, status) tuple
        """
        notification_conf = {'TopicConfigurations': self.topic_conf_list}
        response = self.client.put_bucket_notification_configuration(
            Bucket=self.bucket_name,
            NotificationConfiguration=notification_conf)
        return response, response['ResponseMetadata']['HTTPStatusCode']

    def del_config(self, notification=None):
        """remove a notification; the name is sent as the 'notification' parameter"""
        return self.send_request('DELETE', {'notification': notification})
299
300
class PSSubscription:
    """subscription operations against the pubsub REST API:
    PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
    GET /subscriptions/<sub-name>
    DELETE /subscriptions/<sub-name>
    plus listing and acknowledging events:
    GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
    POST /subscriptions/<sub-name>?ack&event-id=<event-id>
    """
    def __init__(self, conn, sub_name, topic_name, endpoint=None, endpoint_args=None):
        self.conn = conn
        assert topic_name.strip()
        self.resource = '/subscriptions/' + sub_name
        self.parameters = {'topic': topic_name}
        self.extra_parameters = None
        # endpoint arguments are only kept together with an endpoint
        if endpoint is not None:
            self.parameters['push-endpoint'] = endpoint
            self.extra_parameters = endpoint_args

    def send_request(self, method, parameters=None, extra_parameters=None):
        """issue the request for this subscription's resource"""
        return make_request(self.conn, method, self.resource,
                            parameters=parameters,
                            extra_parameters=extra_parameters)

    def get_config(self):
        """fetch the subscription"""
        return self.send_request('GET')

    def set_config(self):
        """create the subscription with the stored parameters"""
        return self.send_request('PUT',
                                 parameters=self.parameters,
                                 extra_parameters=self.extra_parameters)

    def del_config(self, topic=False):
        """remove the subscription; with topic the stored parameters are sent too"""
        return self.send_request('DELETE', self.parameters if topic else None)

    def get_events(self, max_entries=None, marker=None):
        """ list events of the subscription, optionally limited / continued """
        query = {'events': None}
        for key, value in (('max-entries', max_entries), ('marker', marker)):
            if value is not None:
                query[key] = value
        return self.send_request('GET', query)

    def ack_events(self, event_id):
        """ acknowledge an event of the subscription """
        return self.send_request('POST', {'ack': None, 'event-id': event_id})
354
355
class PSZoneConfig:
    """ settings of a pubsub zone, read from one configuration section """
    def __init__(self, cfg, section):
        def option(name):
            # every setting comes from the same section of cfg
            return cfg.get(section, name)

        self.full_sync = option('start_with_full_sync')
        self.retention_days = option('retention_days')