9 from time
import gmtime
, strftime
10 from .multisite
import Zone
12 from botocore
.client
import Config
14 log
= logging
.getLogger('rgw_multi.tests')
17 class PSZone(Zone
): # pylint: disable=too-many-ancestors
18 """ PubSub zone class """
19 def __init__(self
, name
, zonegroup
=None, cluster
=None, data
=None, zone_id
=None, gateways
=None, full_sync
='false', retention_days
='7'):
20 self
.full_sync
= full_sync
21 self
.retention_days
= retention_days
22 self
.master_zone
= zonegroup
.master_zone
23 super(PSZone
, self
).__init
__(name
, zonegroup
, cluster
, data
, zone_id
, gateways
)
25 def is_read_only(self
):
31 def create(self
, cluster
, args
=None, **kwargs
):
34 tier_config
= ','.join(['start_with_full_sync=' + self
.full_sync
, 'event_retention_days=' + self
.retention_days
])
35 args
+= ['--tier-type', self
.tier_type(), '--sync-from-all=0', '--sync-from', self
.master_zone
.name
, '--tier-config', tier_config
]
36 return self
.json_command(cluster
, 'create', args
)
38 def has_buckets(self
):
45 def print_connection_info(conn
):
46 """print connection details"""
47 print('Endpoint: ' + conn
.host
+ ':' + str(conn
.port
))
48 print('AWS Access Key:: ' + conn
.aws_access_key_id
)
49 print('AWS Secret Key:: ' + conn
.aws_secret_access_key
)
52 def make_request(conn
, method
, resource
, parameters
=None, sign_parameters
=False, extra_parameters
=None):
53 """generic request sending to pubsub radogw
54 should cover: topics, notificatios and subscriptions
57 if parameters
is not None:
58 url_params
= urllib
.urlencode(parameters
)
59 # remove 'None' from keys with no values
60 url_params
= url_params
.replace('=None', '')
61 url_params
= '?' + url_params
62 if extra_parameters
is not None:
63 url_params
= url_params
+ '&' + extra_parameters
64 string_date
= strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
65 string_to_sign
= method
+ '\n\n\n' + string_date
+ '\n' + resource
67 string_to_sign
+= url_params
68 signature
= base64
.b64encode(hmac
.new(conn
.aws_secret_access_key
,
69 string_to_sign
.encode('utf-8'),
70 hashlib
.sha1
).digest())
71 headers
= {'Authorization': 'AWS '+conn
.aws_access_key_id
+':'+signature
,
73 'Host': conn
.host
+':'+str(conn
.port
)}
74 http_conn
= httplib
.HTTPConnection(conn
.host
, conn
.port
)
75 if log
.getEffectiveLevel() <= 10:
76 http_conn
.set_debuglevel(5)
77 http_conn
.request(method
, resource
+url_params
, NO_HTTP_BODY
, headers
)
78 response
= http_conn
.getresponse()
79 data
= response
.read()
80 status
= response
.status
85 def print_connection_info(conn
):
86 """print info of connection"""
87 print("Host: " + conn
.host
+':'+str(conn
.port
))
88 print("AWS Secret Key: " + conn
.aws_secret_access_key
)
89 print("AWS Access Key: " + conn
.aws_access_key_id
)
93 """class to set/get/delete a topic
94 PUT /topics/<topic name>[?push-endpoint=<endpoint>&[<arg1>=<value1>...]]
95 GET /topics/<topic name>
96 DELETE /topics/<topic name>
98 def __init__(self
, conn
, topic_name
, endpoint
=None, endpoint_args
=None):
100 assert topic_name
.strip()
101 self
.resource
= '/topics/'+topic_name
102 if endpoint
is not None:
103 self
.parameters
= {'push-endpoint': endpoint
}
104 self
.extra_parameters
= endpoint_args
106 self
.parameters
= None
107 self
.extra_parameters
= None
109 def send_request(self
, method
, get_list
=False, parameters
=None, extra_parameters
=None):
110 """send request to radosgw"""
112 return make_request(self
.conn
, method
, '/topics')
113 return make_request(self
.conn
, method
, self
.resource
,
114 parameters
=parameters
, extra_parameters
=extra_parameters
)
116 def get_config(self
):
118 return self
.send_request('GET')
120 def set_config(self
):
122 return self
.send_request('PUT', parameters
=self
.parameters
, extra_parameters
=self
.extra_parameters
)
124 def del_config(self
):
126 return self
.send_request('DELETE')
129 """list all topics"""
130 return self
.send_request('GET', get_list
=True)
133 def delete_all_s3_topics(conn
, region
):
135 client
= boto3
.client('sns',
136 endpoint_url
='http://'+conn
.host
+':'+str(conn
.port
),
137 aws_access_key_id
=conn
.aws_access_key_id
,
138 aws_secret_access_key
=conn
.aws_secret_access_key
,
140 config
=Config(signature_version
='s3'))
142 topics
= client
.list_topics()['Topics']
144 print 'topic cleanup, deleting: ' + topic
['TopicArn']
145 assert client
.delete_topic(TopicArn
=topic
['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
147 print 'failed to do topic cleanup. if there are topics they may need to be manually deleted'
151 """class to set/list/get/delete a topic
152 POST ?Action=CreateTopic&Name=<topic name>&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
153 POST ?Action=ListTopics
154 POST ?Action=GetTopic&TopicArn=<topic-arn>
155 POST ?Action=DeleteTopic&TopicArn=<topic-arn>
157 def __init__(self
, conn
, topic_name
, region
, endpoint_args
=None):
159 self
.topic_name
= topic_name
.strip()
160 assert self
.topic_name
163 if endpoint_args
is not None:
164 self
.attributes
= {nvp
[0] : nvp
[1] for nvp
in urlparse
.parse_qsl(endpoint_args
, keep_blank_values
=True)}
165 self
.client
= boto3
.client('sns',
166 endpoint_url
='http://'+conn
.host
+':'+str(conn
.port
),
167 aws_access_key_id
=conn
.aws_access_key_id
,
168 aws_secret_access_key
=conn
.aws_secret_access_key
,
170 config
=Config(signature_version
='s3'))
173 def get_config(self
):
175 parameters
= {'Action': 'GetTopic', 'TopicArn': self
.topic_arn
}
176 body
= urllib
.urlencode(parameters
)
177 string_date
= strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
178 content_type
= 'application/x-www-form-urlencoded; charset=utf-8'
181 string_to_sign
= method
+ '\n\n' + content_type
+ '\n' + string_date
+ '\n' + resource
182 log
.debug('StringTosign: %s', string_to_sign
)
183 signature
= base64
.b64encode(hmac
.new(self
.conn
.aws_secret_access_key
,
184 string_to_sign
.encode('utf-8'),
185 hashlib
.sha1
).digest())
186 headers
= {'Authorization': 'AWS '+self
.conn
.aws_access_key_id
+':'+signature
,
188 'Host': self
.conn
.host
+':'+str(self
.conn
.port
),
189 'Content-Type': content_type
}
190 http_conn
= httplib
.HTTPConnection(self
.conn
.host
, self
.conn
.port
)
191 if log
.getEffectiveLevel() <= 10:
192 http_conn
.set_debuglevel(5)
193 http_conn
.request(method
, resource
, body
, headers
)
194 response
= http_conn
.getresponse()
195 data
= response
.read()
196 status
= response
.status
198 dict_response
= xmltodict
.parse(data
)
199 return dict_response
, status
201 def set_config(self
):
203 result
= self
.client
.create_topic(Name
=self
.topic_name
, Attributes
=self
.attributes
)
204 self
.topic_arn
= result
['TopicArn']
205 return self
.topic_arn
207 def del_config(self
):
209 result
= self
.client
.delete_topic(TopicArn
=self
.topic_arn
)
210 return result
['ResponseMetadata']['HTTPStatusCode']
213 """list all topics"""
214 return self
.client
.list_topics()
217 class PSNotification
:
218 """class to set/get/delete a notification
219 PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
220 GET /notifications/bucket/<bucket>
221 DELETE /notifications/bucket/<bucket>?topic=<topic-name>
223 def __init__(self
, conn
, bucket_name
, topic_name
, events
=''):
225 assert bucket_name
.strip()
226 assert topic_name
.strip()
227 self
.resource
= '/notifications/bucket/'+bucket_name
229 self
.parameters
= {'topic': topic_name
, 'events': events
}
231 self
.parameters
= {'topic': topic_name
}
233 def send_request(self
, method
, parameters
=None):
234 """send request to radosgw"""
235 return make_request(self
.conn
, method
, self
.resource
, parameters
)
237 def get_config(self
):
238 """get notification info"""
239 return self
.send_request('GET')
241 def set_config(self
):
242 """set notification"""
243 return self
.send_request('PUT', self
.parameters
)
245 def del_config(self
):
246 """delete notification"""
247 return self
.send_request('DELETE', self
.parameters
)
250 class PSNotificationS3
:
251 """class to set/get/delete an S3 notification
252 PUT /<bucket>?notification
253 GET /<bucket>?notification[=<notification>]
254 DELETE /<bucket>?notification[=<notification>]
256 def __init__(self
, conn
, bucket_name
, topic_conf_list
):
258 assert bucket_name
.strip()
259 self
.bucket_name
= bucket_name
260 self
.resource
= '/'+bucket_name
261 self
.topic_conf_list
= topic_conf_list
262 self
.client
= boto3
.client('s3',
263 endpoint_url
='http://'+conn
.host
+':'+str(conn
.port
),
264 aws_access_key_id
=conn
.aws_access_key_id
,
265 aws_secret_access_key
=conn
.aws_secret_access_key
,
266 config
=Config(signature_version
='s3'))
268 def send_request(self
, method
, parameters
=None):
269 """send request to radosgw"""
270 return make_request(self
.conn
, method
, self
.resource
,
271 parameters
=parameters
, sign_parameters
=True)
273 def get_config(self
, notification
=None):
274 """get notification info"""
276 if notification
is None:
277 response
= self
.client
.get_bucket_notification_configuration(Bucket
=self
.bucket_name
)
278 status
= response
['ResponseMetadata']['HTTPStatusCode']
279 return response
, status
280 parameters
= {'notification': notification
}
281 response
, status
= self
.send_request('GET', parameters
=parameters
)
282 dict_response
= xmltodict
.parse(response
)
283 return dict_response
, status
285 def set_config(self
):
286 """set notification"""
287 response
= self
.client
.put_bucket_notification_configuration(Bucket
=self
.bucket_name
,
288 NotificationConfiguration
={
289 'TopicConfigurations': self
.topic_conf_list
291 status
= response
['ResponseMetadata']['HTTPStatusCode']
292 return response
, status
294 def del_config(self
, notification
=None):
295 """delete notification"""
296 parameters
= {'notification': notification
}
298 return self
.send_request('DELETE', parameters
)
301 class PSSubscription
:
302 """class to set/get/delete a subscription:
303 PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
304 GET /subscriptions/<sub-name>
305 DELETE /subscriptions/<sub-name>
306 also to get list of events, and ack them:
307 GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
308 POST /subscriptions/<sub-name>?ack&event-id=<event-id>
310 def __init__(self
, conn
, sub_name
, topic_name
, endpoint
=None, endpoint_args
=None):
312 assert topic_name
.strip()
313 self
.resource
= '/subscriptions/'+sub_name
314 if endpoint
is not None:
315 self
.parameters
= {'topic': topic_name
, 'push-endpoint': endpoint
}
316 self
.extra_parameters
= endpoint_args
318 self
.parameters
= {'topic': topic_name
}
319 self
.extra_parameters
= None
321 def send_request(self
, method
, parameters
=None, extra_parameters
=None):
322 """send request to radosgw"""
323 return make_request(self
.conn
, method
, self
.resource
,
324 parameters
=parameters
,
325 extra_parameters
=extra_parameters
)
327 def get_config(self
):
328 """get subscription info"""
329 return self
.send_request('GET')
331 def set_config(self
):
332 """set subscription"""
333 return self
.send_request('PUT', parameters
=self
.parameters
, extra_parameters
=self
.extra_parameters
)
335 def del_config(self
, topic
=False):
336 """delete subscription"""
338 return self
.send_request('DELETE', self
.parameters
)
339 return self
.send_request('DELETE')
341 def get_events(self
, max_entries
=None, marker
=None):
342 """ get events from subscription """
343 parameters
= {'events': None}
344 if max_entries
is not None:
345 parameters
['max-entries'] = max_entries
346 if marker
is not None:
347 parameters
['marker'] = marker
348 return self
.send_request('GET', parameters
)
350 def ack_events(self
, event_id
):
351 """ ack events in a subscription """
352 parameters
= {'ack': None, 'event-id': event_id
}
353 return self
.send_request('POST', parameters
)
357 """ pubsub zone configuration """
358 def __init__(self
, cfg
, section
):
359 self
.full_sync
= cfg
.get(section
, 'start_with_full_sync')
360 self
.retention_days
= cfg
.get(section
, 'retention_days')