5 from typing
import List
, Dict
, Any
, Tuple
, cast
, Optional
7 from ceph
.deployment
.service_spec
import IngressSpec
8 from mgr_util
import build_url
9 from cephadm
.utils
import resolve_ip
10 from orchestrator
import OrchestratorError
11 from cephadm
.services
.cephadmservice
import CephadmDaemonDeploySpec
, CephService
13 logger
= logging
.getLogger(__name__
)
16 class IngressService(CephService
):
18 MAX_KEEPALIVED_PASS_LEN
= 8
20 def primary_daemon_type(self
) -> str:
23 def per_host_daemon_type(self
) -> Optional
[str]:
28 daemon_spec
: CephadmDaemonDeploySpec
,
29 ) -> CephadmDaemonDeploySpec
:
30 if daemon_spec
.daemon_type
== 'haproxy':
31 return self
.haproxy_prepare_create(daemon_spec
)
32 if daemon_spec
.daemon_type
== 'keepalived':
33 return self
.keepalived_prepare_create(daemon_spec
)
34 assert False, "unexpected daemon type"
38 daemon_spec
: CephadmDaemonDeploySpec
39 ) -> Tuple
[Dict
[str, Any
], List
[str]]:
40 if daemon_spec
.daemon_type
== 'haproxy':
41 return self
.haproxy_generate_config(daemon_spec
)
43 return self
.keepalived_generate_config(daemon_spec
)
44 assert False, "unexpected daemon type"
46 def haproxy_prepare_create(
48 daemon_spec
: CephadmDaemonDeploySpec
,
49 ) -> CephadmDaemonDeploySpec
:
50 assert daemon_spec
.daemon_type
== 'haproxy'
52 daemon_id
= daemon_spec
.daemon_id
53 host
= daemon_spec
.host
54 spec
= cast(IngressSpec
, self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
56 logger
.debug('prepare_create haproxy.%s on host %s with spec %s' % (
57 daemon_id
, host
, spec
))
59 daemon_spec
.final_config
, daemon_spec
.deps
= self
.haproxy_generate_config(daemon_spec
)
63 def haproxy_generate_config(
65 daemon_spec
: CephadmDaemonDeploySpec
,
66 ) -> Tuple
[Dict
[str, Any
], List
[str]]:
67 spec
= cast(IngressSpec
, self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
68 assert spec
.backend_service
69 if spec
.backend_service
not in self
.mgr
.spec_store
:
71 f
'{spec.service_name()} backend service {spec.backend_service} does not exist')
72 backend_spec
= self
.mgr
.spec_store
[spec
.backend_service
].spec
73 daemons
= self
.mgr
.cache
.get_daemons_by_service(spec
.backend_service
)
74 deps
= [d
.name() for d
in daemons
]
77 pw_key
= f
'{spec.service_name()}/monitor_password'
78 password
= self
.mgr
.get_store(pw_key
)
80 if not spec
.monitor_password
:
81 password
= ''.join(random
.choice(string
.ascii_lowercase
)
82 for _
in range(self
.MAX_KEEPALIVED_PASS_LEN
))
83 self
.mgr
.set_store(pw_key
, password
)
85 if spec
.monitor_password
:
86 self
.mgr
.set_store(pw_key
, None)
87 if spec
.monitor_password
:
88 password
= spec
.monitor_password
90 if backend_spec
.service_type
== 'nfs':
92 by_rank
= {d
.rank
: d
for d
in daemons
if d
.rank
is not None}
95 # try to establish how many ranks we *should* have
96 num_ranks
= backend_spec
.placement
.count
98 num_ranks
= 1 + max(by_rank
.keys())
100 for rank
in range(num_ranks
):
105 'name': f
"{spec.backend_service}.{rank}",
106 'ip': d
.ip
or resolve_ip(self
.mgr
.inventory
.get_addr(str(d
.hostname
))),
110 # offline/missing server; leave rank in place
112 'name': f
"{spec.backend_service}.{rank}",
121 'ip': d
.ip
or resolve_ip(self
.mgr
.inventory
.get_addr(str(d
.hostname
))),
123 } for d
in daemons
if d
.ports
126 haproxy_conf
= self
.mgr
.template
.render(
127 'services/ingress/haproxy.cfg.j2',
132 'user': spec
.monitor_user
or 'admin',
133 'password': password
,
134 'ip': "*" if spec
.virtual_ips_list
else str(spec
.virtual_ip
).split('/')[0] or daemon_spec
.ip
or '*',
135 'frontend_port': daemon_spec
.ports
[0] if daemon_spec
.ports
else spec
.frontend_port
,
136 'monitor_port': daemon_spec
.ports
[1] if daemon_spec
.ports
else spec
.monitor_port
,
141 "haproxy.cfg": haproxy_conf
,
145 ssl_cert
= spec
.ssl_cert
146 if isinstance(ssl_cert
, list):
147 ssl_cert
= '\n'.join(ssl_cert
)
148 config_files
['files']['haproxy.pem'] = ssl_cert
150 return config_files
, sorted(deps
)
152 def keepalived_prepare_create(
154 daemon_spec
: CephadmDaemonDeploySpec
,
155 ) -> CephadmDaemonDeploySpec
:
156 assert daemon_spec
.daemon_type
== 'keepalived'
158 daemon_id
= daemon_spec
.daemon_id
159 host
= daemon_spec
.host
160 spec
= cast(IngressSpec
, self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
162 logger
.debug('prepare_create keepalived.%s on host %s with spec %s' % (
163 daemon_id
, host
, spec
))
165 daemon_spec
.final_config
, daemon_spec
.deps
= self
.keepalived_generate_config(daemon_spec
)
169 def keepalived_generate_config(
171 daemon_spec
: CephadmDaemonDeploySpec
,
172 ) -> Tuple
[Dict
[str, Any
], List
[str]]:
173 spec
= cast(IngressSpec
, self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
174 assert spec
.backend_service
177 pw_key
= f
'{spec.service_name()}/keepalived_password'
178 password
= self
.mgr
.get_store(pw_key
)
180 if not spec
.keepalived_password
:
181 password
= ''.join(random
.choice(string
.ascii_lowercase
)
182 for _
in range(self
.MAX_KEEPALIVED_PASS_LEN
))
183 self
.mgr
.set_store(pw_key
, password
)
185 if spec
.keepalived_password
:
186 self
.mgr
.set_store(pw_key
, None)
187 if spec
.keepalived_password
:
188 password
= spec
.keepalived_password
190 daemons
= self
.mgr
.cache
.get_daemons_by_service(spec
.service_name())
193 raise OrchestratorError(
194 f
'Failed to generate keepalived.conf: No daemons deployed for {spec.service_name()}')
196 deps
= sorted([d
.name() for d
in daemons
if d
.daemon_type
== 'haproxy'])
198 host
= daemon_spec
.host
199 hosts
= sorted(list(set([host
] + [str(d
.hostname
) for d
in daemons
])))
204 bare_ips
.append(str(spec
.virtual_ip
).split('/')[0])
205 elif spec
.virtual_ips_list
:
206 bare_ips
= [str(vip
).split('/')[0] for vip
in spec
.virtual_ips_list
]
208 for bare_ip
in bare_ips
:
209 for subnet
, ifaces
in self
.mgr
.cache
.networks
.get(host
, {}).items():
210 if ifaces
and ipaddress
.ip_address(bare_ip
) in ipaddress
.ip_network(subnet
):
211 interface
= list(ifaces
.keys())[0]
213 f
'{bare_ip} is in {subnet} on {host} interface {interface}'
219 # try to find interface by matching spec.virtual_interface_networks
220 if not interface
and spec
.virtual_interface_networks
:
221 for subnet
, ifaces
in self
.mgr
.cache
.networks
.get(host
, {}).items():
222 if subnet
in spec
.virtual_interface_networks
:
223 interface
= list(ifaces
.keys())[0]
225 f
'{spec.virtual_ip} will be configured on {host} interface '
226 f
'{interface} (which has guiding subnet {subnet})'
230 raise OrchestratorError(
231 f
"Unable to identify interface for {spec.virtual_ip} on {host}"
234 # script to monitor health
235 script
= '/usr/bin/false'
237 if d
.hostname
== host
:
238 if d
.daemon_type
== 'haproxy':
240 port
= d
.ports
[1] # monitoring port
241 script
= f
'/usr/bin/curl {build_url(scheme="http", host=d.ip or "localhost", port=port)}/health'
248 # Set state and priority. Have one master for each VIP. Or at least the first one as master if only one VIP.
250 virtual_ips
.append(spec
.virtual_ip
)
252 states
.append('MASTER')
253 priorities
.append(100)
255 states
.append('BACKUP')
256 priorities
.append(90)
258 elif spec
.virtual_ips_list
:
259 virtual_ips
= spec
.virtual_ips_list
260 if len(virtual_ips
) > len(hosts
):
261 raise OrchestratorError(
262 "Number of virtual IPs for ingress is greater than number of available hosts"
264 for x
in range(len(virtual_ips
)):
266 states
.append('MASTER')
267 priorities
.append(100)
269 states
.append('BACKUP')
270 priorities
.append(90)
272 # remove host, daemon is being deployed on from hosts list for
273 # other_ips in conf file and converter to ips
276 other_ips
= [resolve_ip(self
.mgr
.inventory
.get_addr(h
)) for h
in hosts
]
278 keepalived_conf
= self
.mgr
.template
.render(
279 'services/ingress/keepalived.conf.j2',
283 'password': password
,
284 'interface': interface
,
285 'virtual_ips': virtual_ips
,
287 'priorities': priorities
,
288 'other_ips': other_ips
,
289 'host_ip': resolve_ip(self
.mgr
.inventory
.get_addr(host
)),
295 "keepalived.conf": keepalived_conf
,
299 return config_file
, deps