import logging
-import httplib
+import ssl
import urllib
import hmac
import hashlib
import base64
+import xmltodict
+from http import client as http_client
+from urllib import parse as urlparse
from time import gmtime, strftime
-from multisite import Zone
+from .multisite import Zone
+import boto3
+from botocore.client import Config
log = logging.getLogger('rgw_multi.tests')
+def get_object_tagging(conn, bucket, object_key):
+ client = boto3.client('s3',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ config=Config(signature_version='s3'))
+ return client.get_object_tagging(
+ Bucket=bucket,
+ Key=object_key
+ )
+
+
class PSZone(Zone): # pylint: disable=too-many-ancestors
""" PubSub zone class """
+ def __init__(self, name, zonegroup=None, cluster=None, data=None, zone_id=None, gateways=None, full_sync='false', retention_days ='7'):
+ self.full_sync = full_sync
+ self.retention_days = retention_days
+ self.master_zone = zonegroup.master_zone
+ super(PSZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
+
def is_read_only(self):
return True
def tier_type(self):
return "pubsub"
+ def syncs_from(self, zone_name):
+ return zone_name == self.master_zone.name
+
def create(self, cluster, args=None, **kwargs):
if args is None:
args = ''
- args += ['--tier-type', self.tier_type()]
+ tier_config = ','.join(['start_with_full_sync=' + self.full_sync, 'event_retention_days=' + self.retention_days])
+ args += ['--tier-type', self.tier_type(), '--sync-from-all=0', '--sync-from', self.master_zone.name, '--tier-config', tier_config]
return self.json_command(cluster, 'create', args)
def has_buckets(self):
NO_HTTP_BODY = ''
-def make_request(conn, method, resource, parameters=None):
+def make_request(conn, method, resource, parameters=None, sign_parameters=False, extra_parameters=None):
"""generic request sending to pubsub radogw
should cover: topics, notificatios and subscriptions
"""
url_params = ''
if parameters is not None:
- url_params = urllib.urlencode(parameters)
+ url_params = urlparse.urlencode(parameters)
# remove 'None' from keys with no values
url_params = url_params.replace('=None', '')
url_params = '?' + url_params
+ if extra_parameters is not None:
+ url_params = url_params + '&' + extra_parameters
string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
string_to_sign = method + '\n\n\n' + string_date + '\n' + resource
- signature = base64.b64encode(hmac.new(conn.aws_secret_access_key,
+ if sign_parameters:
+ string_to_sign += url_params
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key.encode('utf-8'),
string_to_sign.encode('utf-8'),
- hashlib.sha1).digest())
+ hashlib.sha1).digest()).decode('ascii')
headers = {'Authorization': 'AWS '+conn.aws_access_key_id+':'+signature,
'Date': string_date,
'Host': conn.host+':'+str(conn.port)}
- http_conn = httplib.HTTPConnection(conn.host, conn.port)
+ http_conn = http_client.HTTPConnection(conn.host, conn.port)
if log.getEffectiveLevel() <= 10:
http_conn.set_debuglevel(5)
http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
data = response.read()
status = response.status
http_conn.close()
- return data, status
+ return data.decode('utf-8'), status
+
+
+def print_connection_info(conn):
+ """print info of connection"""
+ print("Host: " + conn.host+':'+str(conn.port))
+ print("AWS Secret Key: " + conn.aws_secret_access_key)
+ print("AWS Access Key: " + conn.aws_access_key_id)
class PSTopic:
"""class to set/get/delete a topic
- PUT /topics/<topic name>
+ PUT /topics/<topic name>[?push-endpoint=<endpoint>&[<arg1>=<value1>...]]
GET /topics/<topic name>
DELETE /topics/<topic name>
"""
- def __init__(self, conn, topic_name):
+ def __init__(self, conn, topic_name, endpoint=None, endpoint_args=None):
self.conn = conn
assert topic_name.strip()
self.resource = '/topics/'+topic_name
+ if endpoint is not None:
+ self.parameters = {'push-endpoint': endpoint}
+ self.extra_parameters = endpoint_args
+ else:
+ self.parameters = None
+ self.extra_parameters = None
- def send_request(self, method):
+ def send_request(self, method, get_list=False, parameters=None, extra_parameters=None):
"""send request to radosgw"""
- return make_request(self.conn, method, self.resource)
+ if get_list:
+ return make_request(self.conn, method, '/topics')
+ return make_request(self.conn, method, self.resource,
+ parameters=parameters, extra_parameters=extra_parameters)
def get_config(self):
"""get topic info"""
def set_config(self):
"""set topic"""
- return self.send_request('PUT')
+ return self.send_request('PUT', parameters=self.parameters, extra_parameters=self.extra_parameters)
def del_config(self):
"""delete topic"""
return self.send_request('DELETE')
+
+ def get_list(self):
+ """list all topics"""
+ return self.send_request('GET', get_list=True)
+
+
+class PSTopicS3:
+ """class to set/list/get/delete a topic
+ POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
+ POST ?Action=ListTopics
+ POST ?Action=GetTopic&TopicArn=<topic-arn>
+ POST ?Action=GetTopicAttributes&TopicArn=<topic-arn>
+ POST ?Action=DeleteTopic&TopicArn=<topic-arn>
+ """
+ def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
+ self.conn = conn
+ self.topic_name = topic_name.strip()
+ assert self.topic_name
+ self.topic_arn = ''
+ self.attributes = {}
+ if endpoint_args is not None:
+ self.attributes = {nvp[0] : nvp[1] for nvp in urlparse.parse_qsl(endpoint_args, keep_blank_values=True)}
+ if opaque_data is not None:
+ self.attributes['OpaqueData'] = opaque_data
+ protocol = 'https' if conn.is_secure else 'http'
+ self.client = boto3.client('sns',
+ endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ region_name=region,
+ verify='./cert.pem',
+ config=Config(signature_version='s3'))
+
+
+ def get_config(self):
+ """get topic info"""
+ parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
+ body = urlparse.urlencode(parameters)
+ string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
+ content_type = 'application/x-www-form-urlencoded; charset=utf-8'
+ resource = '/'
+ method = 'POST'
+ string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
+ log.debug('StringTosign: %s', string_to_sign)
+ signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
+ string_to_sign.encode('utf-8'),
+ hashlib.sha1).digest()).decode('ascii')
+ headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
+ 'Date': string_date,
+ 'Host': self.conn.host+':'+str(self.conn.port),
+ 'Content-Type': content_type}
+ if self.conn.is_secure:
+ http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
+ context=ssl.create_default_context(cafile='./cert.pem'))
+ else:
+ http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
+ http_conn.request(method, resource, body, headers)
+ response = http_conn.getresponse()
+ data = response.read()
+ status = response.status
+ http_conn.close()
+ dict_response = xmltodict.parse(data)
+ return dict_response, status
+
+ def get_attributes(self):
+ """get topic attributes"""
+ return self.client.get_topic_attributes(TopicArn=self.topic_arn)
+
+ def set_config(self):
+ """set topic"""
+ result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
+ self.topic_arn = result['TopicArn']
+ return self.topic_arn
+
+ def del_config(self):
+ """delete topic"""
+ result = self.client.delete_topic(TopicArn=self.topic_arn)
+ return result['ResponseMetadata']['HTTPStatusCode']
+
+ def get_list(self):
+ """list all topics"""
+ # note that boto3 supports list_topics(), however, the result only show ARNs
+ parameters = {'Action': 'ListTopics'}
+ body = urlparse.urlencode(parameters)
+ string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
+ content_type = 'application/x-www-form-urlencoded; charset=utf-8'
+ resource = '/'
+ method = 'POST'
+ string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
+ log.debug('StringTosign: %s', string_to_sign)
+ signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
+ string_to_sign.encode('utf-8'),
+ hashlib.sha1).digest()).decode('ascii')
+ headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
+ 'Date': string_date,
+ 'Host': self.conn.host+':'+str(self.conn.port),
+ 'Content-Type': content_type}
+ if self.conn.is_secure:
+ http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
+ context=ssl.create_default_context(cafile='./cert.pem'))
+ else:
+ http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
+ http_conn.request(method, resource, body, headers)
+ response = http_conn.getresponse()
+ data = response.read()
+ status = response.status
+ http_conn.close()
+ dict_response = xmltodict.parse(data)
+ return dict_response, status
class PSNotification:
return self.send_request('GET')
def set_config(self):
- """setnotification"""
+ """set notification"""
return self.send_request('PUT', self.parameters)
def del_config(self):
return self.send_request('DELETE', self.parameters)
+class PSNotificationS3:
+ """class to set/get/delete an S3 notification
+ PUT /<bucket>?notification
+ GET /<bucket>?notification[=<notification>]
+ DELETE /<bucket>?notification[=<notification>]
+ """
+ def __init__(self, conn, bucket_name, topic_conf_list):
+ self.conn = conn
+ assert bucket_name.strip()
+ self.bucket_name = bucket_name
+ self.resource = '/'+bucket_name
+ self.topic_conf_list = topic_conf_list
+ self.client = boto3.client('s3',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key,
+ config=Config(signature_version='s3'))
+
+ def send_request(self, method, parameters=None):
+ """send request to radosgw"""
+ return make_request(self.conn, method, self.resource,
+ parameters=parameters, sign_parameters=True)
+
+ def get_config(self, notification=None):
+ """get notification info"""
+ parameters = None
+ if notification is None:
+ response = self.client.get_bucket_notification_configuration(Bucket=self.bucket_name)
+ status = response['ResponseMetadata']['HTTPStatusCode']
+ return response, status
+ parameters = {'notification': notification}
+ response, status = self.send_request('GET', parameters=parameters)
+ dict_response = xmltodict.parse(response)
+ return dict_response, status
+
+ def set_config(self):
+ """set notification"""
+ response = self.client.put_bucket_notification_configuration(Bucket=self.bucket_name,
+ NotificationConfiguration={
+ 'TopicConfigurations': self.topic_conf_list
+ })
+ status = response['ResponseMetadata']['HTTPStatusCode']
+ return response, status
+
+ def del_config(self, notification=None):
+ """delete notification"""
+ parameters = {'notification': notification}
+
+ return self.send_request('DELETE', parameters)
+
+
class PSSubscription:
"""class to set/get/delete a subscription:
- PUT /subscriptions/<sub-name>?topic=<topic-name>
+ PUT /subscriptions/<sub-name>?topic=<topic-name>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]
GET /subscriptions/<sub-name>
DELETE /subscriptions/<sub-name>
also to get list of events, and ack them:
GET /subscriptions/<sub-name>?events[&max-entries=<max-entries>][&marker=<marker>]
POST /subscriptions/<sub-name>?ack&event-id=<event-id>
"""
- def __init__(self, conn, sub_name, topic_name):
+ def __init__(self, conn, sub_name, topic_name, endpoint=None, endpoint_args=None):
self.conn = conn
assert topic_name.strip()
self.resource = '/subscriptions/'+sub_name
- self.parameters = {'topic': topic_name}
+ if endpoint is not None:
+ self.parameters = {'topic': topic_name, 'push-endpoint': endpoint}
+ self.extra_parameters = endpoint_args
+ else:
+ self.parameters = {'topic': topic_name}
+ self.extra_parameters = None
- def send_request(self, method, parameters=None):
+ def send_request(self, method, parameters=None, extra_parameters=None):
"""send request to radosgw"""
- return make_request(self.conn, method, self.resource, parameters)
+ return make_request(self.conn, method, self.resource,
+ parameters=parameters,
+ extra_parameters=extra_parameters)
def get_config(self):
"""get subscription info"""
def set_config(self):
"""set subscription"""
- return self.send_request('PUT', self.parameters)
+ return self.send_request('PUT', parameters=self.parameters, extra_parameters=self.extra_parameters)
- def del_config(self):
+ def del_config(self, topic=False):
"""delete subscription"""
+ if topic:
+ return self.send_request('DELETE', self.parameters)
return self.send_request('DELETE')
def get_events(self, max_entries=None, marker=None):
""" ack events in a subscription """
parameters = {'ack': None, 'event-id': event_id}
return self.send_request('POST', parameters)
+
+
+class PSZoneConfig:
+ """ pubsub zone configuration """
+ def __init__(self, cfg, section):
+ self.full_sync = cfg.get(section, 'start_with_full_sync')
+ self.retention_days = cfg.get(section, 'retention_days')