5 from typing
import List
, Dict
, Any
, Tuple
, cast
, Optional
7 from ceph
.deployment
.service_spec
import IngressSpec
8 from mgr_util
import build_url
9 from cephadm
.utils
import resolve_ip
10 from orchestrator
import OrchestratorError
11 from cephadm
.services
.cephadmservice
import CephadmDaemonDeploySpec
, CephService
13 logger
= logging
.getLogger(__name__
)
16 class IngressService(CephService
):
19 def primary_daemon_type(self
) -> str:
22 def per_host_daemon_type(self
) -> Optional
[str]:
27 daemon_spec
: CephadmDaemonDeploySpec
,
28 ) -> CephadmDaemonDeploySpec
:
29 if daemon_spec
.daemon_type
== 'haproxy':
30 return self
.haproxy_prepare_create(daemon_spec
)
31 if daemon_spec
.daemon_type
== 'keepalived':
32 return self
.keepalived_prepare_create(daemon_spec
)
33 assert False, "unexpected daemon type"
37 daemon_spec
: CephadmDaemonDeploySpec
38 ) -> Tuple
[Dict
[str, Any
], List
[str]]:
39 if daemon_spec
.daemon_type
== 'haproxy':
40 return self
.haproxy_generate_config(daemon_spec
)
42 return self
.keepalived_generate_config(daemon_spec
)
43 assert False, "unexpected daemon type"
45 def haproxy_prepare_create(
47 daemon_spec
: CephadmDaemonDeploySpec
,
48 ) -> CephadmDaemonDeploySpec
:
49 assert daemon_spec
.daemon_type
== 'haproxy'
51 daemon_id
= daemon_spec
.daemon_id
52 host
= daemon_spec
.host
53 spec
= cast(IngressSpec
, self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
55 logger
.debug('prepare_create haproxy.%s on host %s with spec %s' % (
56 daemon_id
, host
, spec
))
58 daemon_spec
.final_config
, daemon_spec
.deps
= self
.haproxy_generate_config(daemon_spec
)
62 def haproxy_generate_config(
64 daemon_spec
: CephadmDaemonDeploySpec
,
65 ) -> Tuple
[Dict
[str, Any
], List
[str]]:
66 spec
= cast(IngressSpec
, self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
67 assert spec
.backend_service
68 if spec
.backend_service
not in self
.mgr
.spec_store
:
70 f
'{spec.service_name()} backend service {spec.backend_service} does not exist')
71 backend_spec
= self
.mgr
.spec_store
[spec
.backend_service
].spec
72 daemons
= self
.mgr
.cache
.get_daemons_by_service(spec
.backend_service
)
73 deps
= [d
.name() for d
in daemons
]
76 pw_key
= f
'{spec.service_name()}/monitor_password'
77 password
= self
.mgr
.get_store(pw_key
)
79 if not spec
.monitor_password
:
80 password
= ''.join(random
.choice(string
.ascii_lowercase
) for _
in range(20))
81 self
.mgr
.set_store(pw_key
, password
)
83 if spec
.monitor_password
:
84 self
.mgr
.set_store(pw_key
, None)
85 if spec
.monitor_password
:
86 password
= spec
.monitor_password
88 if backend_spec
.service_type
== 'nfs':
90 by_rank
= {d
.rank
: d
for d
in daemons
if d
.rank
is not None}
93 # try to establish how many ranks we *should* have
94 num_ranks
= backend_spec
.placement
.count
96 num_ranks
= 1 + max(by_rank
.keys())
98 for rank
in range(num_ranks
):
103 'name': f
"{spec.backend_service}.{rank}",
104 'ip': d
.ip
or resolve_ip(self
.mgr
.inventory
.get_addr(str(d
.hostname
))),
108 # offline/missing server; leave rank in place
110 'name': f
"{spec.backend_service}.{rank}",
119 'ip': d
.ip
or resolve_ip(self
.mgr
.inventory
.get_addr(str(d
.hostname
))),
121 } for d
in daemons
if d
.ports
124 haproxy_conf
= self
.mgr
.template
.render(
125 'services/ingress/haproxy.cfg.j2',
130 'user': spec
.monitor_user
or 'admin',
131 'password': password
,
132 'ip': str(spec
.virtual_ip
).split('/')[0] or daemon_spec
.ip
or '*',
133 'frontend_port': daemon_spec
.ports
[0] if daemon_spec
.ports
else spec
.frontend_port
,
134 'monitor_port': daemon_spec
.ports
[1] if daemon_spec
.ports
else spec
.monitor_port
,
139 "haproxy.cfg": haproxy_conf
,
143 ssl_cert
= spec
.ssl_cert
144 if isinstance(ssl_cert
, list):
145 ssl_cert
= '\n'.join(ssl_cert
)
146 config_files
['files']['haproxy.pem'] = ssl_cert
148 return config_files
, sorted(deps
)
150 def keepalived_prepare_create(
152 daemon_spec
: CephadmDaemonDeploySpec
,
153 ) -> CephadmDaemonDeploySpec
:
154 assert daemon_spec
.daemon_type
== 'keepalived'
156 daemon_id
= daemon_spec
.daemon_id
157 host
= daemon_spec
.host
158 spec
= cast(IngressSpec
, self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
160 logger
.debug('prepare_create keepalived.%s on host %s with spec %s' % (
161 daemon_id
, host
, spec
))
163 daemon_spec
.final_config
, daemon_spec
.deps
= self
.keepalived_generate_config(daemon_spec
)
167 def keepalived_generate_config(
169 daemon_spec
: CephadmDaemonDeploySpec
,
170 ) -> Tuple
[Dict
[str, Any
], List
[str]]:
171 spec
= cast(IngressSpec
, self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
172 assert spec
.backend_service
175 pw_key
= f
'{spec.service_name()}/keepalived_password'
176 password
= self
.mgr
.get_store(pw_key
)
178 if not spec
.keepalived_password
:
179 password
= ''.join(random
.choice(string
.ascii_lowercase
) for _
in range(20))
180 self
.mgr
.set_store(pw_key
, password
)
182 if spec
.keepalived_password
:
183 self
.mgr
.set_store(pw_key
, None)
184 if spec
.keepalived_password
:
185 password
= spec
.keepalived_password
187 daemons
= self
.mgr
.cache
.get_daemons_by_service(spec
.service_name())
190 raise OrchestratorError(
191 f
'Failed to generate keepalived.conf: No daemons deployed for {spec.service_name()}')
193 deps
= sorted([d
.name() for d
in daemons
if d
.daemon_type
== 'haproxy'])
195 host
= daemon_spec
.host
196 hosts
= sorted(list(set([host
] + [str(d
.hostname
) for d
in daemons
])))
199 bare_ip
= str(spec
.virtual_ip
).split('/')[0]
201 for subnet
, ifaces
in self
.mgr
.cache
.networks
.get(host
, {}).items():
202 if ifaces
and ipaddress
.ip_address(bare_ip
) in ipaddress
.ip_network(subnet
):
203 interface
= list(ifaces
.keys())[0]
205 f
'{bare_ip} is in {subnet} on {host} interface {interface}'
208 # try to find interface by matching spec.virtual_interface_networks
209 if not interface
and spec
.virtual_interface_networks
:
210 for subnet
, ifaces
in self
.mgr
.cache
.networks
.get(host
, {}).items():
211 if subnet
in spec
.virtual_interface_networks
:
212 interface
= list(ifaces
.keys())[0]
214 f
'{spec.virtual_ip} will be configured on {host} interface '
215 f
'{interface} (which has guiding subnet {subnet})'
219 raise OrchestratorError(
220 f
"Unable to identify interface for {spec.virtual_ip} on {host}"
223 # script to monitor health
224 script
= '/usr/bin/false'
226 if d
.hostname
== host
:
227 if d
.daemon_type
== 'haproxy':
229 port
= d
.ports
[1] # monitoring port
230 script
= f
'/usr/bin/curl {build_url(scheme="http", host=d.ip or "localhost", port=port)}/health'
233 # set state. first host in placement is master all others backups
238 # remove host, daemon is being deployed on from hosts list for
239 # other_ips in conf file and converter to ips
242 other_ips
= [resolve_ip(self
.mgr
.inventory
.get_addr(h
)) for h
in hosts
]
244 keepalived_conf
= self
.mgr
.template
.render(
245 'services/ingress/keepalived.conf.j2',
249 'password': password
,
250 'interface': interface
,
252 'other_ips': other_ips
,
253 'host_ip': resolve_ip(self
.mgr
.inventory
.get_addr(host
)),
259 "keepalived.conf": keepalived_conf
,
263 return config_file
, deps