import json
+import base64
+import binascii
+import errno
from abc import abstractmethod
self.stderr = orig.stdout
else:
self.message = message
- self.retcode = 0
- self.stdout = None
- self.stderr = None
+ self.retcode = -errno.EINVAL
+ self.stdout = ''
+ self.stderr = message
class RGWAMCmdRunException(RGWAMException):
pass
@abstractmethod
- def apply_rgw(self, svc_id, realm_name, zone_name, port=None):
+ def apply_rgw(self, spec):
pass
@abstractmethod
class RealmToken(JSONObj):
- def __init__(self, realm_id, endpoint, uid, access_key, secret):
+ def __init__(self, realm_name, realm_id, endpoint, access_key, secret):
+ self.realm_name = realm_name
self.realm_id = realm_id
self.endpoint = endpoint
- self.uid = uid
self.access_key = access_key
self.secret = secret
+ @classmethod
+ def from_base64_str(cls, realm_token_b64):
+ try:
+ realm_token_b = base64.b64decode(realm_token_b64)
+ realm_token_s = realm_token_b.decode('utf-8')
+ realm_token = json.loads(realm_token_s)
+ return cls(**realm_token)
+ except binascii.Error:
+ return None
+
class RGWZone(JSONObj):
def __init__(self, zone_dict):
return self.find_zonegroup_by_id(self.master_zonegroup)
return self.zonegroups_by_name.get(zonegroup)
+ def get_master_zonegroup(self):
+ return self.find_zonegroup_by_id(self.master_zonegroup)
+
def find_zonegroup_by_id(self, zonegroup):
return self.zonegroups_by_id.get(zonegroup)
is_system = d.get('system') or 'false'
self.system = (is_system == 'true')
+
+ def add_key(self, access_key, secret):
+ self.keys.append(RGWAccessKey({'user': self.uid,
+ 'access_key': access_key,
+ 'secret_key': secret}))
+
+ def get_key(self, index):
+ return self.keys[index] if index < len(self.keys) else None