8 from http
import client
as http_client
9 from urllib
import parse
as urlparse
10 from time
import gmtime
, strftime
11 from .multisite
import Zone
13 from botocore
.client
import Config
15 log
= logging
.getLogger('rgw_multi.tests')
18 def get_object_tagging(conn
, bucket
, object_key
):
19 client
= boto3
.client('s3',
20 endpoint_url
='http://'+conn
.host
+':'+str(conn
.port
),
21 aws_access_key_id
=conn
.aws_access_key_id
,
22 aws_secret_access_key
=conn
.aws_secret_access_key
,
23 config
=Config(signature_version
='s3'))
24 return client
.get_object_tagging(
30 class PSZone(Zone
): # pylint: disable=too-many-ancestors
31 """ PubSub zone class """
32 def __init__(self
, name
, zonegroup
=None, cluster
=None, data
=None, zone_id
=None, gateways
=None, full_sync
='false', retention_days
='7'):
33 self
.full_sync
= full_sync
34 self
.retention_days
= retention_days
35 self
.master_zone
= zonegroup
.master_zone
36 super(PSZone
, self
).__init
__(name
, zonegroup
, cluster
, data
, zone_id
, gateways
)
38 def is_read_only(self
):
44 def syncs_from(self
, zone_name
):
45 return zone_name
== self
.master_zone
.name
47 def create(self
, cluster
, args
=None, **kwargs
):
50 tier_config
= ','.join(['start_with_full_sync=' + self
.full_sync
, 'event_retention_days=' + self
.retention_days
])
51 args
+= ['--tier-type', self
.tier_type(), '--sync-from-all=0', '--sync-from', self
.master_zone
.name
, '--tier-config', tier_config
]
52 return self
.json_command(cluster
, 'create', args
)
54 def has_buckets(self
):
61 def make_request(conn
, method
, resource
, parameters
=None, sign_parameters
=False, extra_parameters
=None):
62 """generic request sending to pubsub radogw
63 should cover: topics, notificatios and subscriptions
66 if parameters
is not None:
67 url_params
= urlparse
.urlencode(parameters
)
68 # remove 'None' from keys with no values
69 url_params
= url_params
.replace('=None', '')
70 url_params
= '?' + url_params
71 if extra_parameters
is not None:
72 url_params
= url_params
+ '&' + extra_parameters
73 string_date
= strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
74 string_to_sign
= method
+ '\n\n\n' + string_date
+ '\n' + resource
76 string_to_sign
+= url_params
77 signature
= base64
.b64encode(hmac
.new(conn
.aws_secret_access_key
.encode('utf-8'),
78 string_to_sign
.encode('utf-8'),
79 hashlib
.sha1
).digest()).decode('ascii')
80 headers
= {'Authorization': 'AWS '+conn
.aws_access_key_id
+':'+signature
,
82 'Host': conn
.host
+':'+str(conn
.port
)}
83 http_conn
= http_client
.HTTPConnection(conn
.host
, conn
.port
)
84 if log
.getEffectiveLevel() <= 10:
85 http_conn
.set_debuglevel(5)
86 http_conn
.request(method
, resource
+url_params
, NO_HTTP_BODY
, headers
)
87 response
= http_conn
.getresponse()
88 data
= response
.read()
89 status
= response
.status
91 return data
.decode('utf-8'), status
94 def print_connection_info(conn
):
95 """print info of connection"""
96 print("Host: " + conn
.host
+':'+str(conn
.port
))
97 print("AWS Secret Key: " + conn
.aws_secret_access_key
)
98 print("AWS Access Key: " + conn
.aws_access_key_id
)
102 """class to set/get/delete a topic
103 PUT /topics/<topic name>[?push-endpoint=<endpoint>&[<arg1>=<value1>...]]
104 GET /topics/<topic name>
105 DELETE /topics/<topic name>
107 def __init__(self
, conn
, topic_name
, endpoint
=None, endpoint_args
=None):
109 assert topic_name
.strip()
110 self
.resource
= '/topics/'+topic_name
111 if endpoint
is not None:
112 self
.parameters
= {'push-endpoint': endpoint
}
113 self
.extra_parameters
= endpoint_args
115 self
.parameters
= None
116 self
.extra_parameters
= None
118 def send_request(self
, method
, get_list
=False, parameters
=None, extra_parameters
=None):
119 """send request to radosgw"""
121 return make_request(self
.conn
, method
, '/topics')
122 return make_request(self
.conn
, method
, self
.resource
,
123 parameters
=parameters
, extra_parameters
=extra_parameters
)
125 def get_config(self
):
127 return self
.send_request('GET')
129 def set_config(self
):
131 return self
.send_request('PUT', parameters
=self
.parameters
, extra_parameters
=self
.extra_parameters
)
133 def del_config(self
):
135 return self
.send_request('DELETE')
138 """list all topics"""
139 return self
.send_request('GET', get_list
=True)
143 """class to set/list/get/delete a topic
144 POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
145 POST ?Action=ListTopics
146 POST ?Action=GetTopic&TopicArn=<topic-arn>
147 POST ?Action=GetTopicAttributes&TopicArn=<topic-arn>
148 POST ?Action=DeleteTopic&TopicArn=<topic-arn>
150 def __init__(self
, conn
, topic_name
, region
, endpoint_args
=None, opaque_data
=None):
152 self
.topic_name
= topic_name
.strip()
153 assert self
.topic_name
156 if endpoint_args
is not None:
157 self
.attributes
= {nvp
[0] : nvp
[1] for nvp
in urlparse
.parse_qsl(endpoint_args
, keep_blank_values
=True)}
158 if opaque_data
is not None:
159 self
.attributes
['OpaqueData'] = opaque_data
160 protocol
= 'https' if conn
.is_secure
else 'http'
161 self
.client
= boto3
.client('sns',
162 endpoint_url
=protocol
+'://'+conn
.host
+':'+str(conn
.port
),
163 aws_access_key_id
=conn
.aws_access_key_id
,
164 aws_secret_access_key
=conn
.aws_secret_access_key
,
167 config
=Config(signature_version
='s3'))
170 def get_config(self
):
172 parameters
= {'Action': 'GetTopic', 'TopicArn': self
.topic_arn
}
173 body
= urlparse
.urlencode(parameters
)
174 string_date
= strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
175 content_type
= 'application/x-www-form-urlencoded; charset=utf-8'
178 string_to_sign
= method
+ '\n\n' + content_type
+ '\n' + string_date
+ '\n' + resource
179 log
.debug('StringTosign: %s', string_to_sign
)
180 signature
= base64
.b64encode(hmac
.new(self
.conn
.aws_secret_access_key
.encode('utf-8'),
181 string_to_sign
.encode('utf-8'),
182 hashlib
.sha1
).digest()).decode('ascii')
183 headers
= {'Authorization': 'AWS '+self
.conn
.aws_access_key_id
+':'+signature
,
185 'Host': self
.conn
.host
+':'+str(self
.conn
.port
),
186 'Content-Type': content_type
}
187 if self
.conn
.is_secure
:
188 http_conn
= http_client
.HTTPSConnection(self
.conn
.host
, self
.conn
.port
,
189 context
=ssl
.create_default_context(cafile
='./cert.pem'))
191 http_conn
= http_client
.HTTPConnection(self
.conn
.host
, self
.conn
.port
)
192 http_conn
.request(method
, resource
, body
, headers
)
193 response
= http_conn
.getresponse()
194 data
= response
.read()
195 status
= response
.status
197 dict_response
= xmltodict
.parse(data
)
198 return dict_response
, status
200 def get_attributes(self
):
201 """get topic attributes"""
202 return self
.client
.get_topic_attributes(TopicArn
=self
.topic_arn
)
204 def set_config(self
):
206 result
= self
.client
.create_topic(Name
=self
.topic_name
, Attributes
=self
.attributes
)
207 self
.topic_arn
= result
['TopicArn']
208 return self
.topic_arn
210 def del_config(self
):
212 result
= self
.client
.delete_topic(TopicArn
=self
.topic_arn
)
213 return result
['ResponseMetadata']['HTTPStatusCode']
216 """list all topics"""
217 # note that boto3 supports list_topics(), however, the result only show ARNs
218 parameters
= {'Action': 'ListTopics'}
219 body
= urlparse
.urlencode(parameters
)
220 string_date
= strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
221 content_type
= 'application/x-www-form-urlencoded; charset=utf-8'
224 string_to_sign
= method
+ '\n\n' + content_type
+ '\n' + string_date
+ '\n' + resource
225 log
.debug('StringTosign: %s', string_to_sign
)
226 signature
= base64
.b64encode(hmac
.new(self
.conn
.aws_secret_access_key
.encode('utf-8'),
227 string_to_sign
.encode('utf-8'),
228 hashlib
.sha1
).digest()).decode('ascii')
229 headers
= {'Authorization': 'AWS '+self
.conn
.aws_access_key_id
+':'+signature
,
231 'Host': self
.conn
.host
+':'+str(self
.conn
.port
),
232 'Content-Type': content_type
}
233 if self
.conn
.is_secure
:
234 http_conn
= http_client
.HTTPSConnection(self
.conn
.host
, self
.conn
.port
,
235 context
=ssl
.create_default_context(cafile
='./cert.pem'))
237 http_conn
= http_client
.HTTPConnection(self
.conn
.host
, self
.conn
.port
)
238 http_conn
.request(method
, resource
, body
, headers
)
239 response
= http_conn
.getresponse()
240 data
= response
.read()
241 status
= response
.status
243 dict_response
= xmltodict
.parse(data
)
244 return dict_response
, status
247 class PSNotification
:
248 """class to set/get/delete a notification
249 PUT /notifications/bucket/<bucket>?topic=<topic-name>[&events=<event>[,<event>]]
250 GET /notifications/bucket/<bucket>
251 DELETE /notifications/bucket/<bucket>?topic=<topic-name>
253 def __init__(self
, conn
, bucket_name
, topic_name
, events
=''):
255 assert bucket_name
.strip()
256 assert topic_name
.strip()
257 self
.resource
= '/notifications/bucket/'+bucket_name
259 self
.parameters
= {'topic': topic_name
, 'events': events
}
261 self
.parameters
= {'topic': topic_name
}
263 def send_request(self
, method
, parameters
=None):
264 """send request to radosgw"""
265 return make_request(self
.conn
, method
, self
.resource
, parameters
)
267 def get_config(self
):
268 """get notification info"""
269 return self
.send_request('GET')
271 def set_config(self
):
272 """set notification"""
273 return self
.send_request('PUT', self
.parameters
)
275 def del_config(self
):
276 """delete notification"""
277 return self
.send_request('DELETE', self
.parameters
)
280 class PSNotificationS3
:
281 """class to set/get/delete an S3 notification
282 PUT /<bucket>?notification
283 GET /<bucket>?notification[=<notification>]
284 DELETE /<bucket>?notification[=<notification>]
286 def __init__(self
, conn
, bucket_name
, topic_conf_list
):
288 assert bucket_name
.strip()
289 self
.bucket_name
= bucket_name
290 self
.resource
= '/'+bucket_name
291 self
.topic_conf_list
= topic_conf_list
292 self
.client
= boto3
.client('s3',
293 endpoint_url
='http://'+conn
.host
+':'+str(conn
.port
),
294 aws_access_key_id
=conn
.aws_access_key_id
,
295 aws_secret_access_key
=conn
.aws_secret_access_key
,
296 config
=Config(signature_version
='s3'))
298 def send_request(self
, method
, parameters
=None):
299 """send request to radosgw"""
300 return make_request(self
.conn
, method
, self
.resource
,
301 parameters
=parameters
, sign_parameters
=True)
303 def get_config(self
, notification
=None):
304 """get notification info"""
306 if notification
is None:
307 response
= self
.client
.get_bucket_notification_configuration(Bucket
=self
.bucket_name
)
308 status
= response
['ResponseMetadata']['HTTPStatusCode']
309 return response
, status
310 parameters
= {'notification': notification
}
311 response
, status
= self
.send_request('GET', parameters
=parameters
)
312 dict_response
= xmltodict
.parse(response
)
313 return dict_response
, status
315 def set_config(self
):
316 """set notification"""
317 response
= self
.client
.put_bucket_notification_configuration(Bucket
=self
.bucket_name
,
318 NotificationConfiguration
={
319 'TopicConfigurations': self
.topic_conf_list
321 status
= response
['ResponseMetadata']['HTTPStatusCode']
322 return response
, status
324 def del_config(self
, notification
=None):
325 """delete notification"""
326 parameters
= {'notification': notification
}
328 return self
.send_request('DELETE', parameters
)
331 class PSSubscription
:
332 """class to set/get/delete a subscription:
333 PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
334 GET /subscriptions/<sub-name>
335 DELETE /subscriptions/<sub-name>
336 also to get list of events, and ack them:
337 GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
338 POST /subscriptions/<sub-name>?ack&event-id=<event-id>
340 def __init__(self
, conn
, sub_name
, topic_name
, endpoint
=None, endpoint_args
=None):
342 assert topic_name
.strip()
343 self
.resource
= '/subscriptions/'+sub_name
344 if endpoint
is not None:
345 self
.parameters
= {'topic': topic_name
, 'push-endpoint': endpoint
}
346 self
.extra_parameters
= endpoint_args
348 self
.parameters
= {'topic': topic_name
}
349 self
.extra_parameters
= None
351 def send_request(self
, method
, parameters
=None, extra_parameters
=None):
352 """send request to radosgw"""
353 return make_request(self
.conn
, method
, self
.resource
,
354 parameters
=parameters
,
355 extra_parameters
=extra_parameters
)
357 def get_config(self
):
358 """get subscription info"""
359 return self
.send_request('GET')
361 def set_config(self
):
362 """set subscription"""
363 return self
.send_request('PUT', parameters
=self
.parameters
, extra_parameters
=self
.extra_parameters
)
365 def del_config(self
, topic
=False):
366 """delete subscription"""
368 return self
.send_request('DELETE', self
.parameters
)
369 return self
.send_request('DELETE')
371 def get_events(self
, max_entries
=None, marker
=None):
372 """ get events from subscription """
373 parameters
= {'events': None}
374 if max_entries
is not None:
375 parameters
['max-entries'] = max_entries
376 if marker
is not None:
377 parameters
['marker'] = marker
378 return self
.send_request('GET', parameters
)
380 def ack_events(self
, event_id
):
381 """ ack events in a subscription """
382 parameters
= {'ack': None, 'event-id': event_id
}
383 return self
.send_request('POST', parameters
)
387 """ pubsub zone configuration """
388 def __init__(self
, cfg
, section
):
389 self
.full_sync
= cfg
.get(section
, 'start_with_full_sync')
390 self
.retention_days
= cfg
.get(section
, 'retention_days')