]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/rgw/rgw_multi/tools.py
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / test / rgw / rgw_multi / tools.py
CommitLineData
31f18b77
FG
1import json
2import boto
3
4def append_attr_value(d, attr, attrv):
5 if attrv and len(str(attrv)) > 0:
6 d[attr] = attrv
7
8def append_attr(d, k, attr):
9 try:
10 attrv = getattr(k, attr)
11 except:
12 return
13 append_attr_value(d, attr, attrv)
14
15def get_attrs(k, attrs):
16 d = {}
17 for a in attrs:
18 append_attr(d, k, a)
19
20 return d
21
22def append_query_arg(s, n, v):
23 if not v:
24 return s
25 nv = '{n}={v}'.format(n=n, v=v)
26 if not s:
27 return nv
28 return '{s}&{nv}'.format(s=s, nv=nv)
29
30class KeyJSONEncoder(boto.s3.key.Key):
31 @staticmethod
32 def default(k, versioned=False):
33 attrs = ['bucket', 'name', 'size', 'last_modified', 'metadata', 'cache_control',
34 'content_type', 'content_disposition', 'content_language',
35 'owner', 'storage_class', 'md5', 'version_id', 'encrypted',
36 'delete_marker', 'expiry_date', 'VersionedEpoch', 'RgwxTag']
37 d = get_attrs(k, attrs)
38 d['etag'] = k.etag[1:-1]
39 if versioned:
40 d['is_latest'] = k.is_latest
41 return d
42
43class DeleteMarkerJSONEncoder(boto.s3.key.Key):
44 @staticmethod
45 def default(k):
46 attrs = ['name', 'version_id', 'last_modified', 'owner']
47 d = get_attrs(k, attrs)
48 d['delete_marker'] = True
49 d['is_latest'] = k.is_latest
50 return d
51
52class UserJSONEncoder(boto.s3.user.User):
53 @staticmethod
54 def default(k):
55 attrs = ['id', 'display_name']
56 return get_attrs(k, attrs)
57
58class BucketJSONEncoder(boto.s3.bucket.Bucket):
59 @staticmethod
60 def default(k):
61 attrs = ['name', 'creation_date']
62 return get_attrs(k, attrs)
63
64class BotoJSONEncoder(json.JSONEncoder):
65 def default(self, obj):
66 if isinstance(obj, boto.s3.key.Key):
67 return KeyJSONEncoder.default(obj)
68 if isinstance(obj, boto.s3.deletemarker.DeleteMarker):
69 return DeleteMarkerJSONEncoder.default(obj)
70 if isinstance(obj, boto.s3.user.User):
71 return UserJSONEncoder.default(obj)
72 if isinstance(obj, boto.s3.prefix.Prefix):
73 return (lambda x: {'prefix': x.name})(obj)
74 if isinstance(obj, boto.s3.bucket.Bucket):
75 return BucketJSONEncoder.default(obj)
76 return json.JSONEncoder.default(self, obj)
77
78
79def dump_json(o, cls=BotoJSONEncoder):
80 return json.dumps(o, cls=cls, indent=4)
81
11fdf7f2
TL
82def assert_raises(excClass, callableObj, *args, **kwargs):
83 """
84 Like unittest.TestCase.assertRaises, but returns the exception.
85 """
86 try:
87 callableObj(*args, **kwargs)
88 except excClass as e:
89 return e
90 else:
91 if hasattr(excClass, '__name__'):
92 excName = excClass.__name__
93 else:
94 excName = str(excClass)
95 raise AssertionError("%s not raised" % excName)
96
31f18b77 97