10 from time
import gmtime
, strftime
11 from .multisite
import Zone
13 from botocore
.client
import Config
# Module-level logger used by the pubsub helpers in this file.
log = logging.getLogger('rgw_multi.tests')
def put_object_tagging(conn, bucket_name, key, tags):
    """Upload a small fixed-content object with the given tag set.

    A boto3 S3 client is built from the host, port and credentials held by
    ``conn``; ``tags`` is passed through as the ``Tagging`` argument of
    ``put_object``. Returns the ``put_object`` response.
    """
    endpoint = 'http://' + conn.host + ':' + str(conn.port)
    s3_client = boto3.client(
        's3',
        endpoint_url=endpoint,
        aws_access_key_id=conn.aws_access_key_id,
        aws_secret_access_key=conn.aws_secret_access_key,
        config=Config(signature_version='s3'))
    return s3_client.put_object(Body='aaaaaaaaaaa',
                                Bucket=bucket_name,
                                Key=key,
                                Tagging=tags)
def get_object_tagging(conn, bucket, object_key):
    """Return the tag set of one object.

    A boto3 S3 client is built from the host, port and credentials held by
    ``conn``. The response of ``get_object_tagging`` for ``object_key`` in
    ``bucket`` is returned unchanged.
    """
    client = boto3.client('s3',
                          endpoint_url='http://'+conn.host+':'+str(conn.port),
                          aws_access_key_id=conn.aws_access_key_id,
                          aws_secret_access_key=conn.aws_secret_access_key,
                          config=Config(signature_version='s3'))
    # both identifiers are required by the S3 API; they are the only inputs
    # this helper receives besides the connection
    return client.get_object_tagging(Bucket=bucket, Key=object_key)
38 class PSZone(Zone
): # pylint: disable=too-many-ancestors
39 """ PubSub zone class """
def __init__(self, name, zonegroup=None, cluster=None, data=None,
             zone_id=None, gateways=None, full_sync='false',
             retention_days='7'):
    """Store the pubsub tier settings, then initialise the base zone.

    ``full_sync`` and ``retention_days`` are stored as given; ``create``
    concatenates them into the tier configuration string. The master zone
    is read from ``zonegroup``, so a zonegroup must actually be supplied
    even though the parameter has a ``None`` default.
    """
    self.full_sync = full_sync
    self.retention_days = retention_days
    # remembered so that the zone can be told to sync from the master only
    self.master_zone = zonegroup.master_zone
    super(PSZone, self).__init__(name, zonegroup, cluster, data,
                                 zone_id, gateways)
46 def is_read_only(self
):
def syncs_from(self, zone_name):
    """Tell whether ``zone_name`` is the name of this zone's master zone."""
    master_name = self.master_zone.name
    return zone_name == master_name
def create(self, cluster, args=None, **kwargs):
    """Create the zone as a tier that syncs from the master zone only.

    The tier type, the sync source and a tier configuration built from
    ``full_sync`` and ``retention_days`` are appended to ``args``, and the
    result is handed to ``json_command`` as a ``create`` command.

    ``args`` may be omitted. A list passed by the caller is extended in
    place. ``kwargs`` is accepted for signature compatibility and unused.
    Returns whatever ``json_command`` returns.
    """
    if args is None:
        # the options are appended with "+=", so the default must be a list
        args = []
    tier_config = ','.join(['start_with_full_sync=' + self.full_sync,
                            'event_retention_days=' + self.retention_days])
    args += ['--tier-type', self.tier_type(),
             '--sync-from-all=0',
             '--sync-from', self.master_zone.name,
             '--tier-config', tier_config]
    return self.json_command(cluster, 'create', args)
62 def has_buckets(self
):
def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None):
    """Send one signed HTTP request to the pubsub radosgw.

    Covers topics, notifications and subscriptions.

    conn: object providing host, port, aws_access_key_id and
        aws_secret_access_key.
    method: HTTP verb.
    resource: request path; always part of the signed string.
    parameters: optional dict that is url-encoded into the query string. A
        key whose value is None is sent without a value.
    sign_parameters: when true, the query string is signed as well.
    extra_parameters: optional, already encoded query string fragment that
        is appended as-is.

    Returns a ``(data, status)`` tuple: the raw response body and the HTTP
    status code.
    """
    url_params = ''
    if parameters is not None:
        url_params = urllib.urlencode(parameters)
        # remove 'None' from keys with no values
        # NOTE: this also strips a literal "=None" that is part of a value
        url_params = '?' + url_params.replace('=None', '')
    if extra_parameters is not None:
        # start the query string when there is none yet
        separator = '&' if url_params else '?'
        url_params = url_params + separator + extra_parameters
    string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
    string_to_sign = method + '\n\n\n' + string_date + '\n' + resource
    if sign_parameters:
        string_to_sign += url_params
    signature = base64.b64encode(hmac.new(conn.aws_secret_access_key,
                                          string_to_sign.encode('utf-8'),
                                          hashlib.sha1).digest())
    headers = {'Authorization': 'AWS '+conn.aws_access_key_id+':'+signature,
               # the date is part of the signed string, so it has to be sent
               'Date': string_date,
               'Host': conn.host+':'+str(conn.port)}
    http_conn = httplib.HTTPConnection(conn.host, conn.port)
    if log.getEffectiveLevel() <= logging.DEBUG:
        http_conn.set_debuglevel(5)
    try:
        http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
        response = http_conn.getresponse()
        data = response.read()
        status = response.status
    finally:
        # release the socket even when the request fails
        http_conn.close()
    return data, status
def print_connection_info(conn):
    """Print the connection's host:port and its AWS key pair to stdout."""
    endpoint = conn.host + ':' + str(conn.port)
    print("Host: " + endpoint)
    secret_key = conn.aws_secret_access_key
    print("AWS Secret Key: " + secret_key)
    access_key = conn.aws_access_key_id
    print("AWS Access Key: " + access_key)
110 """class to set/get/delete a topic
111 PUT /topics/<topic name>[?push-endpoint=<endpoint>&[<arg1>=<value1>...]]
112 GET /topics/<topic name>
113 DELETE /topics/<topic name>
def __init__(self, conn, topic_name, endpoint=None, endpoint_args=None):
    """Describe a topic addressed as ``/topics/<topic name>``.

    conn: connection later handed to ``make_request`` by ``send_request``.
    topic_name: must not be blank.
    endpoint: optional push endpoint; when given it is sent as the
        ``push-endpoint`` parameter and ``endpoint_args`` is kept as the
        extra query string. Without an endpoint both are ``None``.
    """
    # send_request reads self.conn, so it has to be stored here
    self.conn = conn
    assert topic_name.strip()
    self.resource = '/topics/'+topic_name
    if endpoint is not None:
        self.parameters = {'push-endpoint': endpoint}
        self.extra_parameters = endpoint_args
    else:
        self.parameters = None
        self.extra_parameters = None
def send_request(self, method, get_list=False, parameters=None, extra_parameters=None):
    """Send a request to radosgw.

    With ``get_list`` set, the request goes to the ``/topics`` collection
    and the other arguments are not used. Otherwise it goes to this topic's
    own resource together with ``parameters`` and ``extra_parameters``.
    Returns the result of ``make_request``.
    """
    if get_list:
        return make_request(self.conn, method, '/topics')
    return make_request(self.conn, method, self.resource,
                        parameters=parameters, extra_parameters=extra_parameters)
def get_config(self):
    """Issue a GET through ``send_request`` and return its result."""
    http_method = 'GET'
    return self.send_request(http_method)
def set_config(self):
    """Issue a PUT carrying the stored parameters and extra parameters."""
    request_kwargs = {
        'parameters': self.parameters,
        'extra_parameters': self.extra_parameters,
    }
    return self.send_request('PUT', **request_kwargs)
def del_config(self):
    """Issue a DELETE through ``send_request`` and return its result."""
    http_method = 'DELETE'
    return self.send_request(http_method)
146 """list all topics"""
147 return self
.send_request('GET', get_list
=True)
def delete_all_s3_topics(zone, region):
    """Best-effort cleanup: delete every topic listed by the SNS endpoint.

    The zone's secure connection is used when it has one, otherwise its
    plain connection. ``region`` is passed to the client as its region
    name. Any failure is printed and swallowed, so this never raises into
    the caller.
    """
    try:
        conn = zone.secure_conn if zone.secure_conn is not None else zone.conn
        protocol = 'https' if conn.is_secure else 'http'
        client = boto3.client('sns',
                              endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
                              aws_access_key_id=conn.aws_access_key_id,
                              aws_secret_access_key=conn.aws_secret_access_key,
                              region_name=region,
                              # NOTE(review): same CA file that the raw HTTPS
                              # requests in this file trust - confirm
                              verify='./cert.pem',
                              config=Config(signature_version='s3'))

        topics = client.list_topics()['Topics']
        for topic in topics:
            print('topic cleanup, deleting: ' + topic['TopicArn'])
            # keep the call out of an assert so "python -O" cannot strip it
            result = client.delete_topic(TopicArn=topic['TopicArn'])
            status = result['ResponseMetadata']['HTTPStatusCode']
            if status != 200:
                raise RuntimeError('delete_topic returned HTTP ' + str(status))
    except Exception as err:
        # deliberate catch-all: cleanup must not abort the caller
        print('failed to do topic cleanup: ' + str(err))
def delete_all_objects(conn, bucket_name):
    """Delete the objects returned by one listing of ``bucket_name``.

    A boto3 S3 client is built from the host, port and credentials held by
    ``conn``. Returns the ``delete_objects`` response, or ``None`` when the
    listing is empty and there is nothing to delete.

    NOTE: only a single ``list_objects`` call is made, so a bucket holding
    more keys than one listing returns is not emptied completely.
    """
    client = boto3.client('s3',
                          endpoint_url='http://'+conn.host+':'+str(conn.port),
                          aws_access_key_id=conn.aws_access_key_id,
                          aws_secret_access_key=conn.aws_secret_access_key)

    listing = client.list_objects(Bucket=bucket_name)
    # the listing of an empty bucket has no 'Contents' entry at all
    objects = [{'Key': entry['Key']} for entry in listing.get('Contents', [])]
    if not objects:
        return None
    # delete objects from the bucket
    return client.delete_objects(Bucket=bucket_name,
                                 Delete={'Objects': objects})
186 """class to set/list/get/delete a topic
187 POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
188 POST ?Action=ListTopics
189 POST ?Action=GetTopic&TopicArn=<topic-arn>
190 POST ?Action=DeleteTopic&TopicArn=<topic-arn>
def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
    """Describe a topic that is managed through the SNS-style API.

    conn: connection providing host, port, is_secure and the AWS key pair;
        kept for the raw signed requests made by other methods.
    topic_name: surrounding whitespace is stripped; must not be blank.
    region: passed to the SNS client as its region name.
    endpoint_args: optional query string whose name/value pairs become the
        topic attributes (blank values are kept).
    opaque_data: optional value stored as the ``OpaqueData`` attribute.
    """
    self.conn = conn
    self.topic_name = topic_name.strip()
    assert self.topic_name
    # filled in by set_config(); defined here so the attribute always exists
    self.topic_arn = ''
    # always a dict, so OpaqueData can be added without endpoint_args
    self.attributes = {}
    if endpoint_args is not None:
        self.attributes = dict(urlparse.parse_qsl(endpoint_args, keep_blank_values=True))
    if opaque_data is not None:
        self.attributes['OpaqueData'] = opaque_data
    protocol = 'https' if conn.is_secure else 'http'
    self.client = boto3.client('sns',
                               endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
                               aws_access_key_id=conn.aws_access_key_id,
                               aws_secret_access_key=conn.aws_secret_access_key,
                               region_name=region,
                               # NOTE(review): same CA file that the raw HTTPS
                               # requests in this file trust - confirm
                               verify='./cert.pem',
                               config=Config(signature_version='s3'))
def get_config(self):
    """Fetch the topic's details with a hand-signed ``GetTopic`` POST.

    The request body carries ``Action=GetTopic`` and the topic ARN. HTTPS
    with the local ``./cert.pem`` CA file is used when the connection is
    secure, plain HTTP otherwise.

    Returns a ``(dict_response, status)`` tuple: the XML body parsed by
    ``xmltodict`` and the HTTP status code.
    """
    parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
    body = urllib.urlencode(parameters)
    string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
    content_type = 'application/x-www-form-urlencoded; charset=utf-8'
    # the action is POSTed to the endpoint root
    resource = '/'
    method = 'POST'
    string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
    log.debug('StringTosign: %s', string_to_sign)
    signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key,
                                          string_to_sign.encode('utf-8'),
                                          hashlib.sha1).digest())
    headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
               # the date is part of the signed string, so it has to be sent
               'Date': string_date,
               'Host': self.conn.host+':'+str(self.conn.port),
               'Content-Type': content_type}
    if self.conn.is_secure:
        http_conn = httplib.HTTPSConnection(self.conn.host, self.conn.port,
                                            context=ssl.create_default_context(cafile='./cert.pem'))
    else:
        http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
    try:
        http_conn.request(method, resource, body, headers)
        response = http_conn.getresponse()
        data = response.read()
        status = response.status
    finally:
        # release the socket even when the request fails
        http_conn.close()
    dict_response = xmltodict.parse(data)
    return dict_response, status
def set_config(self):
    """Create the topic through the SNS client and remember its ARN.

    The topic is created with the stored name and attributes. The ARN
    found in the reply is stored as ``topic_arn`` and returned.
    """
    create_kwargs = {'Name': self.topic_name, 'Attributes': self.attributes}
    reply = self.client.create_topic(**create_kwargs)
    arn = reply['TopicArn']
    self.topic_arn = arn
    return arn
def del_config(self):
    """Delete the topic by its ARN and return the HTTP status of the reply."""
    reply = self.client.delete_topic(TopicArn=self.topic_arn)
    metadata = reply['ResponseMetadata']
    return metadata['HTTPStatusCode']
254 """list all topics"""
255 # note that boto3 supports list_topics(), however, the result only show ARNs
256 parameters
= {'Action': 'ListTopics'}
257 body
= urllib
.urlencode(parameters
)
258 string_date
= strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
259 content_type
= 'application/x-www-form-urlencoded; charset=utf-8'
262 string_to_sign
= method
+ '\n\n' + content_type
+ '\n' + string_date
+ '\n' + resource
263 log
.debug('StringTosign: %s', string_to_sign
)
264 signature
= base64
.b64encode(hmac
.new(self
.conn
.aws_secret_access_key
,
265 string_to_sign
.encode('utf-8'),
266 hashlib
.sha1
).digest())
267 headers
= {'Authorization': 'AWS '+self
.conn
.aws_access_key_id
+':'+signature
,
269 'Host': self
.conn
.host
+':'+str(self
.conn
.port
),
270 'Content-Type': content_type
}
271 if self
.conn
.is_secure
:
272 http_conn
= httplib
.HTTPSConnection(self
.conn
.host
, self
.conn
.port
,
273 context
=ssl
.create_default_context(cafile
='./cert.pem'))
275 http_conn
= httplib
.HTTPConnection(self
.conn
.host
, self
.conn
.port
)
276 http_conn
.request(method
, resource
, body
, headers
)
277 response
= http_conn
.getresponse()
278 data
= response
.read()
279 status
= response
.status
281 dict_response
= xmltodict
.parse(data
)
282 return dict_response
, status
class PSNotification:
    """Set/get/delete a notification.

    PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
    GET /notifications/bucket/<bucket>
    DELETE /notifications/bucket/<bucket>?topic=<topic-name>
    """
    def __init__(self, conn, bucket_name, topic_name, events=''):
        """Store the connection and build the resource path and parameters.

        ``bucket_name`` and ``topic_name`` must not be blank. ``events`` is
        sent as the ``events`` parameter only when it is non-empty.
        """
        # send_request reads self.conn, so it has to be stored here
        self.conn = conn
        assert bucket_name.strip()
        assert topic_name.strip()
        self.resource = '/notifications/bucket/'+bucket_name
        if events:
            self.parameters = {'topic': topic_name, 'events': events}
        else:
            self.parameters = {'topic': topic_name}

    def send_request(self, method, parameters=None):
        """send request to radosgw"""
        return make_request(self.conn, method, self.resource, parameters)

    def get_config(self):
        """get notification info"""
        return self.send_request('GET')

    def set_config(self):
        """set notification"""
        return self.send_request('PUT', self.parameters)

    def del_config(self):
        """delete notification"""
        return self.send_request('DELETE', self.parameters)
class PSNotificationS3:
    """Set/get/delete an S3 notification.

    PUT /<bucket>?notification
    GET /<bucket>?notification[=<notification>]
    DELETE /<bucket>?notification[=<notification>]
    """
    def __init__(self, conn, bucket_name, topic_conf_list):
        """Store the connection and settings and build a boto3 S3 client.

        ``bucket_name`` must not be blank. ``topic_conf_list`` is sent as
        the ``TopicConfigurations`` of the bucket by ``set_config``.
        """
        # send_request reads self.conn, so it has to be stored here
        self.conn = conn
        assert bucket_name.strip()
        self.bucket_name = bucket_name
        self.resource = '/'+bucket_name
        self.topic_conf_list = topic_conf_list
        self.client = boto3.client('s3',
                                   endpoint_url='http://'+conn.host+':'+str(conn.port),
                                   aws_access_key_id=conn.aws_access_key_id,
                                   aws_secret_access_key=conn.aws_secret_access_key,
                                   config=Config(signature_version='s3'))

    def send_request(self, method, parameters=None):
        """send request to radosgw"""
        return make_request(self.conn, method, self.resource,
                            parameters=parameters, sign_parameters=True)

    def get_config(self, notification=None):
        """Get notification info.

        Without ``notification`` the whole bucket configuration is fetched
        through boto3 and returned as ``(response, status)``. With a name,
        a raw signed GET is sent and its XML body is returned parsed, again
        together with the status.
        """
        if notification is None:
            response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
            status = response['ResponseMetadata']['HTTPStatusCode']
            return response, status
        parameters = {'notification': notification}
        response, status = self.send_request('GET', parameters=parameters)
        dict_response = xmltodict.parse(response)
        return dict_response, status

    def set_config(self):
        """Set the notification; returns ``(response, status)``."""
        response = self.client.put_bucket_notification_configuration(
            Bucket=self.bucket_name,
            NotificationConfiguration={
                'TopicConfigurations': self.topic_conf_list
            })
        status = response['ResponseMetadata']['HTTPStatusCode']
        return response, status

    def del_config(self, notification=None):
        """Delete one notification, or all of them when no name is given."""
        # a None value is sent as a bare "notification" key
        parameters = {'notification': notification}
        return self.send_request('DELETE', parameters)
class PSSubscription:
    """Set/get/delete a subscription:

    PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
    GET /subscriptions/<sub-name>
    DELETE /subscriptions/<sub-name>

    Also get the list of events, and ack them:

    GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
    POST /subscriptions/<sub-name>?ack&event-id=<event-id>
    """
    def __init__(self, conn, sub_name, topic_name, endpoint=None, endpoint_args=None):
        """Store the connection and build the resource path and parameters.

        ``topic_name`` must not be blank. With an ``endpoint`` the
        parameters also carry ``push-endpoint`` and ``endpoint_args`` is
        kept as the extra query string; otherwise only the topic is sent.
        """
        # send_request reads self.conn, so it has to be stored here
        self.conn = conn
        assert topic_name.strip()
        self.resource = '/subscriptions/'+sub_name
        if endpoint is not None:
            self.parameters = {'topic': topic_name, 'push-endpoint': endpoint}
            self.extra_parameters = endpoint_args
        else:
            self.parameters = {'topic': topic_name}
            self.extra_parameters = None

    def send_request(self, method, parameters=None, extra_parameters=None):
        """send request to radosgw"""
        return make_request(self.conn, method, self.resource,
                            parameters=parameters,
                            extra_parameters=extra_parameters)

    def get_config(self):
        """get subscription info"""
        return self.send_request('GET')

    def set_config(self):
        """set subscription"""
        return self.send_request('PUT', parameters=self.parameters, extra_parameters=self.extra_parameters)

    def del_config(self, topic=False):
        """Delete the subscription; with ``topic`` set, send the stored parameters too."""
        if topic:
            return self.send_request('DELETE', self.parameters)
        return self.send_request('DELETE')

    def get_events(self, max_entries=None, marker=None):
        """ get events from subscription """
        # None marks "events" as a key without a value
        parameters = {'events': None}
        if max_entries is not None:
            parameters['max-entries'] = max_entries
        if marker is not None:
            parameters['marker'] = marker
        return self.send_request('GET', parameters)

    def ack_events(self, event_id):
        """ ack events in a subscription """
        parameters = {'ack': None, 'event-id': event_id}
        return self.send_request('POST', parameters)
425 """ pubsub zone configuration """
def __init__(self, cfg, section):
    """Read the pubsub zone settings from one section of ``cfg``.

    ``start_with_full_sync`` is stored as ``full_sync`` and
    ``retention_days`` under the same name, both exactly as ``cfg.get``
    returns them.
    """
    read_option = cfg.get
    self.full_sync = read_option(section, 'start_with_full_sync')
    self.retention_days = read_option(section, 'retention_days')