8 from http
import client
as http_client
9 from urllib
import parse
as urlparse
10 from time
import gmtime
, strftime
12 from botocore
.client
import Config
# Module-level logger used by the request helpers and command runners in this file.
log = logging.getLogger('bucket_notification.tests')
def put_object_tagging(conn, bucket_name, key, tags):
    """Upload a small fixed-content object with the given tag set.

    Builds an S3 client from the host, port and credentials carried by
    ``conn`` (plain HTTP), then issues a single ``put_object`` call.

    :param conn: connection object exposing ``host``, ``port``,
        ``aws_access_key_id`` and ``aws_secret_access_key``
    :param bucket_name: bucket to write into
    :param key: object key to create
    :param tags: value forwarded unchanged as the ``Tagging`` argument
    :return: whatever ``put_object`` returns
    """
    endpoint = 'http://'+conn.host+':'+str(conn.port)
    s3 = boto3.client(
        's3',
        endpoint_url=endpoint,
        aws_access_key_id=conn.aws_access_key_id,
        aws_secret_access_key=conn.aws_secret_access_key,
    )
    upload_args = {
        'Body': 'aaaaaaaaaaa',
        'Bucket': bucket_name,
        'Key': key,
        'Tagging': tags,
    }
    return s3.put_object(**upload_args)
def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None):
    """generic request sending to pubsub radosgw
    should cover: topics, notifications and subscriptions

    The request is signed with an HMAC-SHA1 "AWS <key>:<signature>"
    Authorization header and sent over plain HTTP to ``conn.host:conn.port``.

    :param conn: connection object exposing ``host``, ``port``,
        ``aws_access_key_id`` and ``aws_secret_access_key``
    :param method: HTTP verb
    :param resource: request path
    :param parameters: optional mapping (or sequence of pairs) that is
        URL-encoded into the query string; a value of ``None`` yields a
        bare key with no ``=value`` part
    :param sign_parameters: when true, the query string is appended to the
        string that gets signed
    :param extra_parameters: optional pre-encoded string appended to the
        query string after a ``'&'``
    :return: tuple ``(response body decoded as UTF-8, HTTP status)``
    """
    # start from an empty query string so the name is always bound, also when
    # no parameters are passed
    url_params = ''
    if parameters is not None:
        url_params = urlparse.urlencode(parameters)
        # remove 'None' from keys with no values
        url_params = url_params.replace('=None', '')
        url_params = '?' + url_params
    if extra_parameters is not None:
        # always joined with '&', exactly as before: callers may pass a
        # resource that already carries its own '?...' part
        url_params = url_params + '&' + extra_parameters
    string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
    string_to_sign = method + '\n\n\n' + string_date + '\n' + resource
    if sign_parameters:
        string_to_sign += url_params
    digest = hmac.new(conn.aws_secret_access_key.encode('utf-8'),
                      string_to_sign.encode('utf-8'),
                      hashlib.sha1).digest()
    signature = base64.b64encode(digest).decode('ascii')
    headers = {'Authorization': 'AWS '+conn.aws_access_key_id+':'+signature,
               # the date is part of the signed string, so the receiver needs
               # the identical value to verify the signature
               'Date': string_date,
               'Host': conn.host+':'+str(conn.port)}
    http_conn = http_client.HTTPConnection(conn.host, conn.port)
    try:
        # dump the HTTP exchange when the module logger is at DEBUG or lower
        if log.getEffectiveLevel() <= logging.DEBUG:
            http_conn.set_debuglevel(5)
        http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
        response = http_conn.getresponse()
        data = response.read()
        status = response.status
    finally:
        # release the socket even when the request or the read fails
        http_conn.close()
    return data.decode('utf-8'), status
def delete_all_s3_topics(zone, region):
    """Best-effort removal of every topic reported by the SNS-compatible endpoint.

    Uses ``zone.secure_conn`` when it is set, otherwise ``zone.conn``. Any
    failure (listing, deletion, unexpected status) is printed and swallowed;
    nothing is raised and nothing is returned. The first failure stops the
    cleanup.

    :param zone: object exposing ``secure_conn`` and ``conn``
    :param region: region name handed to the SNS client
    """
    try:
        conn = zone.secure_conn if zone.secure_conn is not None else zone.conn
        protocol = 'https' if conn.is_secure else 'http'
        client = boto3.client('sns',
                              endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
                              aws_access_key_id=conn.aws_access_key_id,
                              aws_secret_access_key=conn.aws_secret_access_key,
                              region_name=region,
                              # NOTE(review): CA bundle path mirrors the cafile used for
                              # raw HTTPS connections elsewhere in this file - confirm
                              verify='./cert.pem')

        topics = client.list_topics()['Topics']
        for topic in topics:
            print('topic cleanup, deleting: ' + topic['TopicArn'])
            # the call is kept outside of an assert so the deletion still
            # happens when Python runs with assertions disabled (-O)
            result = client.delete_topic(TopicArn=topic['TopicArn'])
            status = result['ResponseMetadata']['HTTPStatusCode']
            if status != 200:
                raise RuntimeError('deleting topic ' + topic['TopicArn'] +
                                   ' returned status ' + str(status))
    except Exception as err:
        print('failed to do topic cleanup: ' + str(err))
def delete_all_objects(conn, bucket_name):
    """Delete every object listed in ``bucket_name``.

    The listing is walked page by page and each non-empty page is removed
    with one ``delete_objects`` call. An empty bucket results in no delete
    request at all.

    :param conn: connection object exposing ``host``, ``port``,
        ``aws_access_key_id`` and ``aws_secret_access_key``
    :param bucket_name: bucket to empty
    """
    client = boto3.client('s3',
                          endpoint_url='http://'+conn.host+':'+str(conn.port),
                          aws_access_key_id=conn.aws_access_key_id,
                          aws_secret_access_key=conn.aws_secret_access_key)

    paginator = client.get_paginator('list_objects')
    for page in paginator.paginate(Bucket=bucket_name):
        # a listing page of an empty bucket carries no 'Contents' entry
        objects = [{'Key': entry['Key']} for entry in page.get('Contents', [])]
        if not objects:
            continue
        # delete objects from the bucket
        client.delete_objects(Bucket=bucket_name,
                              Delete={'Objects': objects})
class PSTopicS3:
    """class to set/list/get/delete a topic
    POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
    POST ?Action=ListTopics
    POST ?Action=GetTopic&TopicArn=<topic-arn>
    POST ?Action=DeleteTopic&TopicArn=<topic-arn>
    """
    def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
        """Prepare a topic description and the SNS client used to manage it.

        :param conn: connection object exposing ``host``, ``port``,
            ``is_secure``, ``aws_access_key_id`` and ``aws_secret_access_key``
        :param topic_name: topic name; surrounding whitespace is stripped and
            the result must be non-empty
        :param region: region name handed to the SNS client
        :param endpoint_args: optional query-string style ``k=v&k2=v2`` text
            parsed into the topic attributes (blank values are kept)
        :param opaque_data: optional value stored as the ``OpaqueData`` attribute
        """
        # kept for the hand-signed requests issued by get_config()/get_list()
        self.conn = conn
        self.topic_name = topic_name.strip()
        assert self.topic_name
        # filled in by set_config()
        self.topic_arn = ''
        # always a dict, so set_config() and the OpaqueData assignment below
        # work when no endpoint arguments are given
        self.attributes = {}
        if endpoint_args is not None:
            self.attributes = dict(urlparse.parse_qsl(endpoint_args, keep_blank_values=True))
        if opaque_data is not None:
            self.attributes['OpaqueData'] = opaque_data
        protocol = 'https' if conn.is_secure else 'http'
        self.client = boto3.client('sns',
                                   endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
                                   aws_access_key_id=conn.aws_access_key_id,
                                   aws_secret_access_key=conn.aws_secret_access_key,
                                   region_name=region,
                                   # NOTE(review): CA bundle path mirrors the cafile used
                                   # in _post_action() - confirm
                                   verify='./cert.pem')

    def _post_action(self, parameters):
        """POST form-encoded ``parameters`` with a hand-built AWS signature.

        :return: tuple ``(response XML parsed by xmltodict, HTTP status)``
        """
        # NOTE(review): verb and path follow the "POST ?Action=..." forms in
        # the class docstring - confirm the path against the server API
        method = 'POST'
        resource = '/'
        body = urlparse.urlencode(parameters)
        string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
        content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
        log.debug('StringTosign: %s', string_to_sign)
        digest = hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
                          string_to_sign.encode('utf-8'),
                          hashlib.sha1).digest()
        signature = base64.b64encode(digest).decode('ascii')
        headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
                   # the date is part of the signed string, so it has to be sent too
                   'Date': string_date,
                   'Host': self.conn.host+':'+str(self.conn.port),
                   'Content-Type': content_type}
        if self.conn.is_secure:
            http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
                                                    context=ssl.create_default_context(cafile='./cert.pem'))
        else:
            http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
        try:
            http_conn.request(method, resource, body, headers)
            response = http_conn.getresponse()
            data = response.read()
            status = response.status
        finally:
            # release the socket even when the request or the read fails
            http_conn.close()
        dict_response = xmltodict.parse(data)
        return dict_response, status

    def get_config(self):
        """get topic info

        Sends ``Action=GetTopic`` for ``self.topic_arn``.

        :return: tuple ``(parsed response dict, HTTP status)``
        """
        parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
        return self._post_action(parameters)

    def set_config(self):
        """set topic

        Creates the topic through the SNS client using the stored name and
        attributes, remembers the returned ARN and returns it.
        """
        result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
        self.topic_arn = result['TopicArn']
        return self.topic_arn

    def del_config(self):
        """delete topic

        :return: HTTP status code reported for the delete call
        """
        result = self.client.delete_topic(TopicArn=self.topic_arn)
        return result['ResponseMetadata']['HTTPStatusCode']

    def get_list(self):
        """list all topics

        :return: tuple ``(parsed response dict, HTTP status)``
        """
        # note that boto3 supports list_topics(), however, the result only show ARNs
        parameters = {'Action': 'ListTopics'}
        return self._post_action(parameters)
class PSNotificationS3:
    """class to set/get/delete an S3 notification
    PUT /<bucket>?notification
    GET /<bucket>?notification[=<notification>]
    DELETE /<bucket>?notification[=<notification>]
    """
    def __init__(self, conn, bucket_name, topic_conf_list):
        """Remember the target bucket and build the S3 client.

        :param conn: connection object exposing ``host``, ``port``,
            ``aws_access_key_id`` and ``aws_secret_access_key``
        :param bucket_name: bucket the notifications belong to; must not be
            blank
        :param topic_conf_list: value sent as ``TopicConfigurations`` by
            ``set_config``
        """
        # send_request() signs raw requests with this connection's credentials
        self.conn = conn
        assert bucket_name.strip()
        self.bucket_name = bucket_name
        self.resource = '/'+bucket_name
        self.topic_conf_list = topic_conf_list
        self.client = boto3.client('s3',
                                   endpoint_url='http://'+conn.host+':'+str(conn.port),
                                   aws_access_key_id=conn.aws_access_key_id,
                                   aws_secret_access_key=conn.aws_secret_access_key)

    def send_request(self, method, parameters=None):
        """send request to radosgw

        The query parameters are included in the signed string.

        :return: whatever ``make_request`` returns
        """
        return make_request(self.conn, method, self.resource,
                            parameters=parameters, sign_parameters=True)

    def get_config(self, notification=None):
        """get notification info

        Without ``notification`` the whole bucket configuration is fetched
        through the S3 client; with it, a raw GET carrying a
        ``notification`` query parameter is sent and the XML reply parsed.

        :return: tuple ``(response dict, HTTP status)``
        """
        if notification is None:
            response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
            status = response['ResponseMetadata']['HTTPStatusCode']
            return response, status
        parameters = {'notification': notification}
        response, status = self.send_request('GET', parameters=parameters)
        dict_response = xmltodict.parse(response)
        return dict_response, status

    def set_config(self):
        """set notification

        :return: tuple ``(client response, HTTP status)``
        """
        response = self.client.put_bucket_notification_configuration(
            Bucket=self.bucket_name,
            NotificationConfiguration={
                'TopicConfigurations': self.topic_conf_list
            })
        status = response['ResponseMetadata']['HTTPStatusCode']
        return response, status

    def del_config(self, notification=None):
        """delete notification

        With ``notification`` left as ``None`` the request carries a bare
        ``notification`` query key.

        :return: whatever ``send_request`` returns
        """
        parameters = {'notification': notification}

        return self.send_request('DELETE', parameters)
# String prefix (with trailing slash) pointing at the parent of the directory
# that holds this file; script names are appended to it directly.
_module_dir = os.path.dirname(os.path.realpath(__file__))
test_path = os.path.normpath(_module_dir) + '/../'
def bash(cmd, **kwargs):
    """Run ``cmd`` as a subprocess and capture its standard output.

    :param cmd: sequence of strings forming the command line
    :param kwargs: extra keyword arguments for ``subprocess.Popen``; any
        ``stdout`` entry is overridden with a pipe
    :return: tuple ``(stdout decoded as UTF-8, process return code)``
    """
    log.debug('running command: %s', ' '.join(cmd))
    popen_args = dict(kwargs, stdout=subprocess.PIPE)
    child = subprocess.Popen(cmd, **popen_args)
    captured = child.communicate()[0]
    return (captured.decode('utf-8'), child.returncode)
def admin(args, **kwargs):
    """ radosgw-admin command

    Prepends the ``test-rgw-call.sh`` wrapper invocation to ``args`` (a
    list) and runs it through ``bash``, returning its result unchanged.
    """
    wrapper = test_path + 'test-rgw-call.sh'
    full_cmd = [wrapper, 'call_rgw_admin', 'noname']
    full_cmd = full_cmd + args
    return bash(full_cmd, **kwargs)