import logging
-import httplib
import ssl
import urllib
-import urlparse
import hmac
import hashlib
import base64
import xmltodict
+from http import client as http_client
+from urllib import parse as urlparse
from time import gmtime, strftime
from .multisite import Zone
import boto3
log = logging.getLogger('rgw_multi.tests')
-def put_object_tagging(conn, bucket_name, key, tags):
- client = boto3.client('s3',
- endpoint_url='http://'+conn.host+':'+str(conn.port),
- aws_access_key_id=conn.aws_access_key_id,
- aws_secret_access_key=conn.aws_secret_access_key,
- config=Config(signature_version='s3'))
- return client.put_object(Body='aaaaaaaaaaa', Bucket=bucket_name, Key=key, Tagging=tags)
-
def get_object_tagging(conn, bucket, object_key):
client = boto3.client('s3',
"""
url_params = ''
if parameters is not None:
- url_params = urllib.urlencode(parameters)
+ url_params = urlparse.urlencode(parameters)
# remove 'None' from keys with no values
url_params = url_params.replace('=None', '')
url_params = '?' + url_params
string_to_sign = method + '\n\n\n' + string_date + '\n' + resource
if sign_parameters:
string_to_sign += url_params
- signature = base64.b64encode(hmac.new(conn.aws_secret_access_key,
+ signature = base64.b64encode(hmac.new(conn.aws_secret_access_key.encode('utf-8'),
string_to_sign.encode('utf-8'),
- hashlib.sha1).digest())
+ hashlib.sha1).digest()).decode('ascii')
headers = {'Authorization': 'AWS '+conn.aws_access_key_id+':'+signature,
'Date': string_date,
'Host': conn.host+':'+str(conn.port)}
- http_conn = httplib.HTTPConnection(conn.host, conn.port)
+ http_conn = http_client.HTTPConnection(conn.host, conn.port)
if log.getEffectiveLevel() <= 10:
http_conn.set_debuglevel(5)
http_conn.request(method, resource+url_params, NO_HTTP_BODY, headers)
data = response.read()
status = response.status
http_conn.close()
- return data, status
+ return data.decode('utf-8'), status
def print_connection_info(conn):
return self.send_request('GET', get_list=True)
-def delete_all_s3_topics(zone, region):
- try:
- conn = zone.secure_conn if zone.secure_conn is not None else zone.conn
- protocol = 'https' if conn.is_secure else 'http'
- client = boto3.client('sns',
- endpoint_url=protocol+'://'+conn.host+':'+str(conn.port),
- aws_access_key_id=conn.aws_access_key_id,
- aws_secret_access_key=conn.aws_secret_access_key,
- region_name=region,
- verify='./cert.pem',
- config=Config(signature_version='s3'))
-
- topics = client.list_topics()['Topics']
- for topic in topics:
- print('topic cleanup, deleting: ' + topic['TopicArn'])
- assert client.delete_topic(TopicArn=topic['TopicArn'])['ResponseMetadata']['HTTPStatusCode'] == 200
- except Exception as err:
- print('failed to do topic cleanup: ' + str(err))
-
-
-def delete_all_objects(conn, bucket_name):
- client = boto3.client('s3',
- endpoint_url='http://'+conn.host+':'+str(conn.port),
- aws_access_key_id=conn.aws_access_key_id,
- aws_secret_access_key=conn.aws_secret_access_key)
-
- objects = []
- for key in client.list_objects(Bucket=bucket_name)['Contents']:
- objects.append({'Key': key['Key']})
- # delete objects from the bucket
- response = client.delete_objects(Bucket=bucket_name,
- Delete={'Objects': objects})
- return response
-
-
class PSTopicS3:
"""class to set/list/get/delete a topic
POST ?Action=CreateTopic&Name=<topic name>[&OpaqueData=<data>[&push-endpoint=<endpoint>&[<arg1>=<value1>...]]]
POST ?Action=ListTopics
POST ?Action=GetTopic&TopicArn=<topic-arn>
+ POST ?Action=GetTopicAttributes&TopicArn=<topic-arn>
POST ?Action=DeleteTopic&TopicArn=<topic-arn>
"""
def __init__(self, conn, topic_name, region, endpoint_args=None, opaque_data=None):
def get_config(self):
"""get topic info"""
parameters = {'Action': 'GetTopic', 'TopicArn': self.topic_arn}
- body = urllib.urlencode(parameters)
+ body = urlparse.urlencode(parameters)
string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
content_type = 'application/x-www-form-urlencoded; charset=utf-8'
resource = '/'
method = 'POST'
string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
log.debug('StringTosign: %s', string_to_sign)
- signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key,
+ signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
string_to_sign.encode('utf-8'),
- hashlib.sha1).digest())
+ hashlib.sha1).digest()).decode('ascii')
headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
'Date': string_date,
'Host': self.conn.host+':'+str(self.conn.port),
'Content-Type': content_type}
if self.conn.is_secure:
- http_conn = httplib.HTTPSConnection(self.conn.host, self.conn.port,
+ http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
context=ssl.create_default_context(cafile='./cert.pem'))
else:
- http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
+ http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
http_conn.request(method, resource, body, headers)
response = http_conn.getresponse()
data = response.read()
dict_response = xmltodict.parse(data)
return dict_response, status
+ def get_attributes(self):
+ """get topic attributes"""
+ return self.client.get_topic_attributes(TopicArn=self.topic_arn)
+
def set_config(self):
"""set topic"""
result = self.client.create_topic(Name=self.topic_name, Attributes=self.attributes)
"""list all topics"""
# note that boto3 supports list_topics(), however, the result only show ARNs
parameters = {'Action': 'ListTopics'}
- body = urllib.urlencode(parameters)
+ body = urlparse.urlencode(parameters)
string_date = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
content_type = 'application/x-www-form-urlencoded; charset=utf-8'
resource = '/'
method = 'POST'
string_to_sign = method + '\n\n' + content_type + '\n' + string_date + '\n' + resource
log.debug('StringTosign: %s', string_to_sign)
- signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key,
+ signature = base64.b64encode(hmac.new(self.conn.aws_secret_access_key.encode('utf-8'),
string_to_sign.encode('utf-8'),
- hashlib.sha1).digest())
+ hashlib.sha1).digest()).decode('ascii')
headers = {'Authorization': 'AWS '+self.conn.aws_access_key_id+':'+signature,
'Date': string_date,
'Host': self.conn.host+':'+str(self.conn.port),
'Content-Type': content_type}
if self.conn.is_secure:
- http_conn = httplib.HTTPSConnection(self.conn.host, self.conn.port,
+ http_conn = http_client.HTTPSConnection(self.conn.host, self.conn.port,
context=ssl.create_default_context(cafile='./cert.pem'))
else:
- http_conn = httplib.HTTPConnection(self.conn.host, self.conn.port)
+ http_conn = http_client.HTTPConnection(self.conn.host, self.conn.port)
http_conn.request(method, resource, body, headers)
response = http_conn.getresponse()
data = response.read()