8 from http
import client
as http_client
9 from urllib
import parse
as urlparse
10 from time
import gmtime
, strftime
11 from .multisite
import Zone
13 from botocore
.client
import Config
15 log
= logging
.getLogger('rgw_multi.tests')
17 def put_object_tagging(conn
, bucket_name
, key
, tags
):
18 client
= boto3
.client('s3',
19 endpoint_url
='http://'+conn
.host
+':'+str(conn
.port
),
20 aws_access_key_id
=conn
.aws_access_key_id
,
21 aws_secret_access_key
=conn
.aws_secret_access_key
,
22 config
=Config(signature_version
='s3'))
23 return client
.put_object(Body
='aaaaaaaaaaa', Bucket
=bucket_name
, Key
=key
, Tagging
=tags
)
26 def get_object_tagging(conn
, bucket
, object_key
):
27 client
= boto3
.client('s3',
28 endpoint_url
='http://'+conn
.host
+':'+str(conn
.port
),
29 aws_access_key_id
=conn
.aws_access_key_id
,
30 aws_secret_access_key
=conn
.aws_secret_access_key
,
31 config
=Config(signature_version
='s3'))
32 return client
.get_object_tagging(
38 class PSZone(Zone
): # pylint: disable=too-many-ancestors
39 """ PubSub zone class """
40 def __init__(self
, name
, zonegroup
=None, cluster
=None, data
=None, zone_id
=None, gateways
=None, full_sync
='false', retention_days
='7'):
41 self
.full_sync
= full_sync
42 self
.retention_days
= retention_days
43 self
.master_zone
= zonegroup
.master_zone
44 super(PSZone
, self
).__init
__(name
, zonegroup
, cluster
, data
, zone_id
, gateways
)
46 def is_read_only(self
):
52 def syncs_from(self
, zone_name
):
53 return zone_name
== self
.master_zone
.name
55 def create(self
, cluster
, args
=None, **kwargs
):
58 tier_config
= ','.join(['start_with_full_sync=' + self
.full_sync
, 'event_retention_days=' + self
.retention_days
])
59 args
+= ['--tier-type', self
.tier_type(), '--sync-from-all=0', '--sync-from', self
.master_zone
.name
, '--tier-config', tier_config
]
60 return self
.json_command(cluster
, 'create', args
)
62 def has_buckets(self
):
69 def make_request(conn
, method
, resource
, parameters
=None, sign_parameters
=False, extra_parameters
=None):
70 """generic request sending to pubsub radogw
71 should cover: topics, notificatios and subscriptions
74 if parameters
is not None:
75 url_params
= urlparse
.urlencode(parameters
)
76 # remove 'None' from keys with no values
77 url_params
= url_params
.replace('=None', '')
78 url_params
= '?' + url_params
79 if extra_parameters
is not None:
80 url_params
= url_params
+ '&' + extra_parameters
81 string_date
= strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
82 string_to_sign
= method
+ '\n\n\n' + string_date
+ '\n' + resource
84 string_to_sign
+= url_params
85 signature
= base64
.b64encode(hmac
.new(conn
.aws_secret_access_key
.encode('utf-8'),
86 string_to_sign
.encode('utf-8'),
87 hashlib
.sha1
).digest()).decode('ascii')
88 headers
= {'Authorization': 'AWS '+conn
.aws_access_key_id
+':'+signature
,
90 'Host': conn
.host
+':'+str(conn
.port
)}
91 http_conn
= http_client
.HTTPConnection(conn
.host
, conn
.port
)
92 if log
.getEffectiveLevel() <= 10:
93 http_conn
.set_debuglevel(5)
94 http_conn
.request(method
, resource
+url_params
, NO_HTTP_BODY
, headers
)
95 response
= http_conn
.getresponse()
96 data
= response
.read()
97 status
= response
.status
99 return data
.decode('utf-8'), status
102 def print_connection_info(conn
):
103 """print info of connection"""
104 print("Host: " + conn
.host
+':'+str(conn
.port
))
105 print("AWS Secret Key: " + conn
.aws_secret_access_key
)
106 print("AWS Access Key: " + conn
.aws_access_key_id
)
110 """class to set/get/delete a topic
111 PUT /topics/<topic name>[?push-endpoint=<endpoint>&[<arg1>=<value1>...]]
112 GET /topics/<topic name>
113 DELETE /topics/<topic name>
115 def __init__(self
, conn
, topic_name
, endpoint
=None, endpoint_args
=None):
117 assert topic_name
.strip()
118 self
.resource
= '/topics/'+topic_name
119 if endpoint
is not None:
120 self
.parameters
= {'push-endpoint': endpoint
}
121 self
.extra_parameters
= endpoint_args
123 self
.parameters
= None
124 self
.extra_parameters
= None
126 def send_request(self
, method
, get_list
=False, parameters
=None, extra_parameters
=None):
127 """send request to radosgw"""
129 return make_request(self
.conn
, method
, '/topics')
130 return make_request(self
.conn
, method
, self
.resource
,
131 parameters
=parameters
, extra_parameters
=extra_parameters
)
133 def get_config(self
):
135 return self
.send_request('GET')
137 def set_config(self
):
139 return self
.send_request('PUT', parameters
=self
.parameters
, extra_parameters
=self
.extra_parameters
)
141 def del_config(self
):
143 return self
.send_request('DELETE')
146 """list all topics"""
147 return self
.send_request('GET', get_list
=True)
150 def delete_all_s3_topics(zone
, region
):
152 conn
= zone
.secure_conn
if zone
.secure_conn
is not None else zone
.conn
153 protocol
= 'https' if conn
.is_secure
else 'http'
154 client
= boto3
.client('sns',
155 endpoint_url
=protocol
+'://'+conn
.host
+':'+str(conn
.port
),
156 aws_access_key_id
=conn
.aws_access_key_id
,
157 aws_secret_access_key
=conn
.aws_secret_access_key
,
160 config
=Config(signature_version
='s3'))
162 topics
= client
.list_topics()['Topics']
164 print('topic cleanup, deleting: ' + topic
['TopicArn'])
165 assert client
.delete_topic(TopicArn
=topic
['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
166 except Exception as err
:
167 print('failed to do topic cleanup: ' + str(err
))
170 def delete_all_objects(conn
, bucket_name
):
171 client
= boto3
.client('s3',
172 endpoint_url
='http://'+conn
.host
+':'+str(conn
.port
),
173 aws_access_key_id
=conn
.aws_access_key_id
,
174 aws_secret_access_key
=conn
.aws_secret_access_key
)
177 for key
in client
.list_objects(Bucket
=bucket_name
)['Contents']:
178 objects
.append({'Key': key
['Key']})
179 # delete objects from the bucket
180 response
= client
.delete_objects(Bucket
=bucket_name
,
181 Delete
={'Objects': objects
})
186 """class to set/list/get/delete a topic
187 POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
188 POST ?Action=ListTopics
189 POST ?Action=GetTopic&TopicArn=<topic-arn>
190 POST ?Action=GetTopicAttributes&TopicArn=<topic-arn>
191 POST ?Action=DeleteTopic&TopicArn=<topic-arn>
193 def __init__(self
, conn
, topic_name
, region
, endpoint_args
=None, opaque_data
=None):
195 self
.topic_name
= topic_name
.strip()
196 assert self
.topic_name
199 if endpoint_args
is not None:
200 self
.attributes
= {nvp
[0] : nvp
[1] for nvp
in urlparse
.parse_qsl(endpoint_args
, keep_blank_values
=True)}
201 if opaque_data
is not None:
202 self
.attributes
['OpaqueData'] = opaque_data
203 protocol
= 'https' if conn
.is_secure
else 'http'
204 self
.client
= boto3
.client('sns',
205 endpoint_url
=protocol
+'://'+conn
.host
+':'+str(conn
.port
),
206 aws_access_key_id
=conn
.aws_access_key_id
,
207 aws_secret_access_key
=conn
.aws_secret_access_key
,
210 config
=Config(signature_version
='s3'))
213 def get_config(self
):
215 parameters
= {'Action': 'GetTopic', 'TopicArn': self
.topic_arn
}
216 body
= urlparse
.urlencode(parameters
)
217 string_date
= strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
218 content_type
= 'application/x-www-form-urlencoded; charset=utf-8'
221 string_to_sign
= method
+ '\n\n' + content_type
+ '\n' + string_date
+ '\n' + resource
222 log
.debug('StringTosign: %s', string_to_sign
)
223 signature
= base64
.b64encode(hmac
.new(self
.conn
.aws_secret_access_key
.encode('utf-8'),
224 string_to_sign
.encode('utf-8'),
225 hashlib
.sha1
).digest()).decode('ascii')
226 headers
= {'Authorization': 'AWS '+self
.conn
.aws_access_key_id
+':'+signature
,
228 'Host': self
.conn
.host
+':'+str(self
.conn
.port
),
229 'Content-Type': content_type
}
230 if self
.conn
.is_secure
:
231 http_conn
= http_client
.HTTPSConnection(self
.conn
.host
, self
.conn
.port
,
232 context
=ssl
.create_default_context(cafile
='./cert.pem'))
234 http_conn
= http_client
.HTTPConnection(self
.conn
.host
, self
.conn
.port
)
235 http_conn
.request(method
, resource
, body
, headers
)
236 response
= http_conn
.getresponse()
237 data
= response
.read()
238 status
= response
.status
240 dict_response
= xmltodict
.parse(data
)
241 return dict_response
, status
243 def get_attributes(self
):
244 """get topic attributes"""
245 return self
.client
.get_topic_attributes(TopicArn
=self
.topic_arn
)
247 def set_config(self
):
249 result
= self
.client
.create_topic(Name
=self
.topic_name
, Attributes
=self
.attributes
)
250 self
.topic_arn
= result
['TopicArn']
251 return self
.topic_arn
253 def del_config(self
):
255 result
= self
.client
.delete_topic(TopicArn
=self
.topic_arn
)
256 return result
['ResponseMetadata']['HTTPStatusCode']
259 """list all topics"""
260 # note that boto3 supports list_topics(), however, the result only show ARNs
261 parameters
= {'Action': 'ListTopics'}
262 body
= urlparse
.urlencode(parameters
)
263 string_date
= strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
264 content_type
= 'application/x-www-form-urlencoded; charset=utf-8'
267 string_to_sign
= method
+ '\n\n' + content_type
+ '\n' + string_date
+ '\n' + resource
268 log
.debug('StringTosign: %s', string_to_sign
)
269 signature
= base64
.b64encode(hmac
.new(self
.conn
.aws_secret_access_key
.encode('utf-8'),
270 string_to_sign
.encode('utf-8'),
271 hashlib
.sha1
).digest()).decode('ascii')
272 headers
= {'Authorization': 'AWS '+self
.conn
.aws_access_key_id
+':'+signature
,
274 'Host': self
.conn
.host
+':'+str(self
.conn
.port
),
275 'Content-Type': content_type
}
276 if self
.conn
.is_secure
:
277 http_conn
= http_client
.HTTPSConnection(self
.conn
.host
, self
.conn
.port
,
278 context
=ssl
.create_default_context(cafile
='./cert.pem'))
280 http_conn
= http_client
.HTTPConnection(self
.conn
.host
, self
.conn
.port
)
281 http_conn
.request(method
, resource
, body
, headers
)
282 response
= http_conn
.getresponse()
283 data
= response
.read()
284 status
= response
.status
286 dict_response
= xmltodict
.parse(data
)
287 return dict_response
, status
290 class PSNotification
:
291 """class to set/get/delete a notification
292 PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
293 GET /notifications/bucket/<bucket>
294 DELETE /notifications/bucket/<bucket>?topic=<topic-name>
296 def __init__(self
, conn
, bucket_name
, topic_name
, events
=''):
298 assert bucket_name
.strip()
299 assert topic_name
.strip()
300 self
.resource
= '/notifications/bucket/'+bucket_name
302 self
.parameters
= {'topic': topic_name
, 'events': events
}
304 self
.parameters
= {'topic': topic_name
}
306 def send_request(self
, method
, parameters
=None):
307 """send request to radosgw"""
308 return make_request(self
.conn
, method
, self
.resource
, parameters
)
310 def get_config(self
):
311 """get notification info"""
312 return self
.send_request('GET')
314 def set_config(self
):
315 """set notification"""
316 return self
.send_request('PUT', self
.parameters
)
318 def del_config(self
):
319 """delete notification"""
320 return self
.send_request('DELETE', self
.parameters
)
323 class PSNotificationS3
:
324 """class to set/get/delete an S3 notification
325 PUT /<bucket>?notification
326 GET /<bucket>?notification[=<notification>]
327 DELETE /<bucket>?notification[=<notification>]
329 def __init__(self
, conn
, bucket_name
, topic_conf_list
):
331 assert bucket_name
.strip()
332 self
.bucket_name
= bucket_name
333 self
.resource
= '/'+bucket_name
334 self
.topic_conf_list
= topic_conf_list
335 self
.client
= boto3
.client('s3',
336 endpoint_url
='http://'+conn
.host
+':'+str(conn
.port
),
337 aws_access_key_id
=conn
.aws_access_key_id
,
338 aws_secret_access_key
=conn
.aws_secret_access_key
,
339 config
=Config(signature_version
='s3'))
341 def send_request(self
, method
, parameters
=None):
342 """send request to radosgw"""
343 return make_request(self
.conn
, method
, self
.resource
,
344 parameters
=parameters
, sign_parameters
=True)
346 def get_config(self
, notification
=None):
347 """get notification info"""
349 if notification
is None:
350 response
= self
.client
.get_bucket_notification_configuration(Bucket
=self
.bucket_name
)
351 status
= response
['ResponseMetadata']['HTTPStatusCode']
352 return response
, status
353 parameters
= {'notification': notification
}
354 response
, status
= self
.send_request('GET', parameters
=parameters
)
355 dict_response
= xmltodict
.parse(response
)
356 return dict_response
, status
358 def set_config(self
):
359 """set notification"""
360 response
= self
.client
.put_bucket_notification_configuration(Bucket
=self
.bucket_name
,
361 NotificationConfiguration
={
362 'TopicConfigurations': self
.topic_conf_list
364 status
= response
['ResponseMetadata']['HTTPStatusCode']
365 return response
, status
367 def del_config(self
, notification
=None):
368 """delete notification"""
369 parameters
= {'notification': notification
}
371 return self
.send_request('DELETE', parameters
)
374 class PSSubscription
:
375 """class to set/get/delete a subscription:
376 PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
377 GET /subscriptions/<sub-name>
378 DELETE /subscriptions/<sub-name>
379 also to get list of events, and ack them:
380 GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
381 POST /subscriptions/<sub-name>?ack&event-id=<event-id>
383 def __init__(self
, conn
, sub_name
, topic_name
, endpoint
=None, endpoint_args
=None):
385 assert topic_name
.strip()
386 self
.resource
= '/subscriptions/'+sub_name
387 if endpoint
is not None:
388 self
.parameters
= {'topic': topic_name
, 'push-endpoint': endpoint
}
389 self
.extra_parameters
= endpoint_args
391 self
.parameters
= {'topic': topic_name
}
392 self
.extra_parameters
= None
394 def send_request(self
, method
, parameters
=None, extra_parameters
=None):
395 """send request to radosgw"""
396 return make_request(self
.conn
, method
, self
.resource
,
397 parameters
=parameters
,
398 extra_parameters
=extra_parameters
)
400 def get_config(self
):
401 """get subscription info"""
402 return self
.send_request('GET')
404 def set_config(self
):
405 """set subscription"""
406 return self
.send_request('PUT', parameters
=self
.parameters
, extra_parameters
=self
.extra_parameters
)
408 def del_config(self
, topic
=False):
409 """delete subscription"""
411 return self
.send_request('DELETE', self
.parameters
)
412 return self
.send_request('DELETE')
414 def get_events(self
, max_entries
=None, marker
=None):
415 """ get events from subscription """
416 parameters
= {'events': None}
417 if max_entries
is not None:
418 parameters
['max-entries'] = max_entries
419 if marker
is not None:
420 parameters
['marker'] = marker
421 return self
.send_request('GET', parameters
)
423 def ack_events(self
, event_id
):
424 """ ack events in a subscription """
425 parameters
= {'ack': None, 'event-id': event_id
}
426 return self
.send_request('POST', parameters
)
430 """ pubsub zone configuration """
431 def __init__(self
, cfg
, section
):
432 self
.full_sync
= cfg
.get(section
, 'start_with_full_sync')
433 self
.retention_days
= cfg
.get(section
, 'retention_days')