]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/cephadm/registry.py
2 from typing
import List
, Dict
, Tuple
3 from requests
import Response
8 def __init__(self
, url
: str):
12 def api_domain(self
) -> str:
13 if self
._url
== 'docker.io':
14 return 'registry-1.docker.io'
17 def get_token(self
, response
: Response
) -> str:
18 realm
, params
= self
.parse_www_authenticate(response
.headers
['Www-Authenticate'])
19 r
= requests
.get(realm
, params
=params
)
22 if 'access_token' in ret
:
23 return ret
['access_token']
26 raise ValueError(f
'Unknown token reply {ret}')
28 def parse_www_authenticate(self
, text
: str) -> Tuple
[str, Dict
[str, str]]:
29 # 'Www-Authenticate': 'Bearer realm="https://auth.docker.io/token",service="registry.docker.io",scope="repository:ceph/ceph:pull"'
30 r
: Dict
[str, str] = {}
31 for token
in text
.split(','):
32 key
, value
= token
.split('=', 1)
33 r
[key
] = value
.strip('"')
34 realm
= r
.pop('Bearer realm')
37 def get_tags(self
, image
: str) -> List
[str]:
39 headers
= {'Accept': 'application/json'}
40 url
= f
'https://{self.api_domain}/v2/{image}/tags/list'
43 r
= requests
.get(url
, headers
=headers
)
44 except requests
.exceptions
.ConnectionError
as e
:
45 msg
= f
"Cannot get tags from url '{url}': {e}"
46 raise ValueError(msg
) from e
47 if r
.status_code
== 401:
48 if 'Authorization' in headers
:
49 raise ValueError('failed authentication')
50 token
= self
.get_token(r
)
51 headers
['Authorization'] = f
'Bearer {token}'
55 new_tags
= r
.json()['tags']
58 if 'Link' not in r
.headers
:
61 # strip < > brackets off and prepend the domain
62 url
= f
'https://{self.api_domain}' + r
.headers
['Link'].split(';')[0][1:-1]