]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/rgw/bucket_notification/api.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / rgw / bucket_notification / api.py
1 import logging
2 import ssl
3 import urllib
4 import hmac
5 import hashlib
6 import base64
7 import xmltodict
8 from http import client as http_client
9 from urllib import parse as urlparse
10 from time import gmtime, strftime
11 import boto3
12 from botocore.client import Config
13 import os
14 import subprocess
15
16 log = logging.getLogger('bucket_notification.tests')
17
18 NO_HTTP_BODY = ''
19
20 def put_object_tagging(conn, bucket_name, key, tags):
21 client = boto3.client('s3',
22 endpoint_url='http://'+conn.host+':'+str(conn.port),
23 aws_access_key_id=conn.aws_access_key_id,
24 aws_secret_access_key=conn.aws_secret_access_key)
25 return client.put_object(Body='aaaaaaaaaaa', Bucket=bucket_name, Key=key, Tagging=tags)
26
27 def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None):
28 """generic request sending to pubsub radogw
29 should cover: topics, notificatios and subscriptions
30 """
31 url_params = ''
32 if parameters is not None:
33 url_params = urlparse.urlencode(parameters)
34 # remove 'None' from keys with no values
35 url_params = url_params.replace('=None', '')
36 url_params = '?' + url_params
37 if extra_parameters is not None:
38 url_params = url_params + '&' + extra_parameters
39 string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
40 string_to_sign = method + '\n\n\n' + string_date + '\n' + resource
41 if sign_parameters:
42 string_to_sign += url_params
43 signature = base64.b64encode(hmac.new(conn.aws_secret_access_key.encode('utf-8'),
44 string_to_sign.encode('utf-8'),
45 hashlib.sha1).digest()).decode('ascii')
46 headers = {'Authorization': 'AWS '+conn.aws_access_key_id+':'+signature,
47 'Date': string_date,
48 'Host': conn.host+':'+str(conn.port)}
49 http_conn = http_client.HTTPConnection(conn.host, conn.port)
50 if log.getEffectiveLevel() <= 10:
51 http_conn.set_debuglevel(5)
52 http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
53 response = http_conn.getresponse()
54 data = response.read()
55 status = response.status
56 http_conn.close()
57 return data.decode('utf-8'), status
58
59 def delete_all_s3_topics(zone, region):
60 try:
61 conn = zone.secure_conn if zone.secure_conn is not None else zone.conn
62 protocol = 'https' if conn.is_secure else 'http'
63 client = boto3.client('sns',
64 endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
65 aws_access_key_id=conn.aws_access_key_id,
66 aws_secret_access_key=conn.aws_secret_access_key,
67 region_name=region,
68 verify='./cert.pem')
69
70 topics = client.list_topics()['Topics']
71 for topic in topics:
72 print('topic cleanup, deleting: ' + topic['TopicArn'])
73 assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
74 except Exception as err:
75 print('failed to do topic cleanup: ' + str(err))
76
77 def delete_all_objects(conn, bucket_name):
78 client = boto3.client('s3',
79 endpoint_url='http://'+conn.host+':'+str(conn.port),
80 aws_access_key_id=conn.aws_access_key_id,
81 aws_secret_access_key=conn.aws_secret_access_key)
82
83 objects = []
84 for key in client.list_objects(Bucket=bucket_name)['Contents']:
85 objects.append({'Key': key['Key']})
86 # delete objects from the bucket
87 response = client.delete_objects(Bucket=bucket_name,
88 Delete={'Objects': objects})
89
90
91 class PSTopicS3:
92 """class to set/list/get/delete a topic
93 POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
94 POST ?Action=ListTopics
95 POST ?Action=GetTopic&TopicArn=<topic-arn>
96 POST ?Action=DeleteTopic&TopicArn=<topic-arn>
97 """
98 def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
99 self.conn = conn
100 self.topic_name = topic_name.strip()
101 assert self.topic_name
102 self.topic_arn = ''
103 self.attributes = {}
104 if endpoint_args is not None:
105 self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
106 if opaque_data is not None:
107 self.attributes['OpaqueData'] = opaque_data
108 protocol = 'https' if conn.is_secure else 'http'
109 self.client = boto3.client('sns',
110 endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
111 aws_access_key_id=conn.aws_access_key_id,
112 aws_secret_access_key=conn.aws_secret_access_key,
113 region_name=region,
114 verify='./cert.pem')
115
116 def get_config(self):
117 """get topic info"""
118 parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
119 body = urlparse.urlencode(parameters)
120 string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
121 content_type = 'application/x-www-form-urlencoded; charset=utf-8'
122 resource = '/'
123 method = 'POST'
124 string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
125 log.debug('StringTosign: %s', string_to_sign)
126 signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
127 string_to_sign.encode('utf-8'),
128 hashlib.sha1).digest()).decode('ascii')
129 headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
130 'Date': string_date,
131 'Host': self.conn.host+':'+str(self.conn.port),
132 'Content-Type': content_type}
133 if self.conn.is_secure:
134 http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
135 context=ssl.create_default_context(cafile='./cert.pem'))
136 else:
137 http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
138 http_conn.request(method, resource, body, headers)
139 response = http_conn.getresponse()
140 data = response.read()
141 status = response.status
142 http_conn.close()
143 dict_response = xmltodict.parse(data)
144 return dict_response, status
145
146 def set_config(self):
147 """set topic"""
148 result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
149 self.topic_arn = result['TopicArn']
150 return self.topic_arn
151
152 def del_config(self):
153 """delete topic"""
154 result = self.client.delete_topic(TopicArn=self.topic_arn)
155 return result['ResponseMetadata']['HTTPStatusCode']
156
157 def get_list(self):
158 """list all topics"""
159 # note that boto3 supports list_topics(), however, the result only show ARNs
160 parameters = {'Action': 'ListTopics'}
161 body = urlparse.urlencode(parameters)
162 string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
163 content_type = 'application/x-www-form-urlencoded; charset=utf-8'
164 resource = '/'
165 method = 'POST'
166 string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
167 log.debug('StringTosign: %s', string_to_sign)
168 signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
169 string_to_sign.encode('utf-8'),
170 hashlib.sha1).digest()).decode('ascii')
171 headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
172 'Date': string_date,
173 'Host': self.conn.host+':'+str(self.conn.port),
174 'Content-Type': content_type}
175 if self.conn.is_secure:
176 http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
177 context=ssl.create_default_context(cafile='./cert.pem'))
178 else:
179 http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
180 http_conn.request(method, resource, body, headers)
181 response = http_conn.getresponse()
182 data = response.read()
183 status = response.status
184 http_conn.close()
185 dict_response = xmltodict.parse(data)
186 return dict_response, status
187
188 class PSNotificationS3:
189 """class to set/get/delete an S3 notification
190 PUT /<bucket>?notification
191 GET /<bucket>?notification[=<notification>]
192 DELETE /<bucket>?notification[=<notification>]
193 """
194 def __init__(self, conn, bucket_name, topic_conf_list):
195 self.conn = conn
196 assert bucket_name.strip()
197 self.bucket_name = bucket_name
198 self.resource = '/'+bucket_name
199 self.topic_conf_list = topic_conf_list
200 self.client = boto3.client('s3',
201 endpoint_url='http://'+conn.host+':'+str(conn.port),
202 aws_access_key_id=conn.aws_access_key_id,
203 aws_secret_access_key=conn.aws_secret_access_key)
204
205 def send_request(self, method, parameters=None):
206 """send request to radosgw"""
207 return make_request(self.conn, method, self.resource,
208 parameters=parameters, sign_parameters=True)
209
210 def get_config(self, notification=None):
211 """get notification info"""
212 parameters = None
213 if notification is None:
214 response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
215 status = response['ResponseMetadata']['HTTPStatusCode']
216 return response, status
217 parameters = {'notification': notification}
218 response, status = self.send_request('GET', parameters=parameters)
219 dict_response = xmltodict.parse(response)
220 return dict_response, status
221
222 def set_config(self):
223 """set notification"""
224 response = self.client.put_bucket_notification_configuration(Bucket=self.bucket_name,
225 NotificationConfiguration={
226 'TopicConfigurations': self.topic_conf_list
227 })
228 status = response['ResponseMetadata']['HTTPStatusCode']
229 return response, status
230
231 def del_config(self, notification=None):
232 """delete notification"""
233 parameters = {'notification': notification}
234
235 return self.send_request('DELETE', parameters)
236
237
238 test_path = os.path.normpath(os.path.dirname(os.path.realpath(__file__))) + '/../'
239
240 def bash(cmd, **kwargs):
241 log.debug('running command: %s', ' '.join(cmd))
242 kwargs['stdout'] = subprocess.PIPE
243 process = subprocess.Popen(cmd, **kwargs)
244 s = process.communicate()[0].decode('utf-8')
245 return (s, process.returncode)
246
247 def admin(args, **kwargs):
248 """ radosgw-admin command """
249 cmd = [test_path + 'test-rgw-call.sh', 'call_rgw_admin', 'noname'] + args
250 return bash(cmd, **kwargs)
251