2 ceph-mgr Ansible orchestrator module
4 The external Orchestrator is the Ansible runner service (RESTful https service)
7 # pylint: disable=abstract-method, no-member, bad-continuation
12 from mgr_module
import MgrModule
15 from .ansible_runner_svc
import Client
, PlayBookExecution
, ExecutionStatusCode
,\
16 AnsibleRunnerServiceError
18 from .output_wizards
import ProcessInventory
, ProcessPlaybookResult
, \
21 # Time to clean the completions list
# Names of the playbooks used by the orchestrator operations

# Playbook used in the "get_inventory" method.
# This playbook is expected to provide a list of the storage devices in the
# host where the playbook is executed.
GET_STORAGE_DEVICES_CATALOG_PLAYBOOK = "storage-inventory.yml"

# Playbook used in the "create_osds" method
ADD_OSD_PLAYBOOK = "add-osd.yml"

# Playbook used in the "remove_osds" method
REMOVE_OSD_PLAYBOOK = "shrink-osd.yml"

# Default name for the inventory group for hosts managed by the Orchestrator
ORCHESTRATOR_GROUP = "orchestrator"

# URLs for Ansible Runner Service operations.
# They are templates: fill the placeholders with str.format() before use.

# Add or remove a host in one group
URL_ADD_RM_HOSTS = "api/v1/hosts/{host_name}/groups/{inventory_group}"

# Retrieve the groups where the host is included
URL_GET_HOST_GROUPS = "api/v1/hosts/{host_name}"

# Manage (e.g. create) one inventory group
URL_MANAGE_GROUP = "api/v1/groups/{group_name}"

# Hosts collection (queried to list the hosts in the inventory)
URL_GET_HOSTS = "api/v1/hosts"
52 class AnsibleReadOperation(orchestrator
.ReadCompletion
):
53 """ A read operation means to obtain information from the cluster.
55 def __init__(self
, client
, logger
):
57 :param client : Ansible Runner Service Client
58 :param logger : The object used to log messages
60 super(AnsibleReadOperation
, self
).__init
__()
63 self
._is
_complete
= False
64 self
._is
_errored
= False
66 self
._status
= ExecutionStatusCode
.NOT_LAUNCHED
68 # Object used to process operation result in different ways
69 self
.output_wizard
= None
71 # Error description in operation
74 # Ansible Runner Service client
75 self
.ar_client
= client
80 # OutputWizard object used to process the result
81 self
.output_wizard
= None
84 def is_complete(self
):
85 return self
._is
_complete
89 return self
._is
_errored
97 """Retrieve the current status of the operation and update state
100 raise NotImplementedError()
102 class ARSOperation(AnsibleReadOperation
):
103 """Execute an Ansible Runner Service Operation
106 def __init__(self
, client
, logger
, url
, get_operation
=True, payload
=None):
108 :param client : Ansible Runner Service Client
109 :param logger : The object used to log messages
110 :param url : The Ansible Runner Service URL that provides
112 :param get_operation : True if operation is provided using an http GET
113 :param payload : http request payload
115 super(ARSOperation
, self
).__init
__(client
, logger
)
118 self
.get_operation
= get_operation
119 self
.payload
= payload
122 return "Ansible Runner Service: {operation} {url}".format(
123 operation
="GET" if self
.get_operation
else "POST",
128 """ Execute the Ansible Runner Service operation and update the status
129 and result of the underlying Completion object.
132 # Execute the right kind of http request
133 if self
.get_operation
:
134 response
= self
.ar_client
.http_get(self
.url
)
136 response
= self
.ar_client
.http_post(self
.url
, self
.payload
)
138 # If no connection errors, the operation is complete
139 self
._is
_complete
= True
141 # Depending of the response, status and result is updated
143 self
._is
_errored
= True
144 self
._status
= ExecutionStatusCode
.ERROR
145 self
._result
= "Ansible Runner Service not Available"
147 self
._is
_errored
= (response
.status_code
!= requests
.codes
.ok
)
149 if not self
._is
_errored
:
150 self
._status
= ExecutionStatusCode
.SUCCESS
151 if self
.output_wizard
:
152 self
._result
= self
.output_wizard
.process(self
.url
,
155 self
._result
= response
.text
157 self
._status
= ExecutionStatusCode
.ERROR
158 self
._result
= response
.reason
163 class PlaybookOperation(AnsibleReadOperation
):
164 """Execute a playbook using the Ansible Runner Service
167 def __init__(self
, client
, playbook
, logger
, result_pattern
,
171 :param client : Ansible Runner Service Client
172 :param playbook : The playbook to execute
173 :param logger : The object used to log messages
174 :param result_pattern: The "pattern" to discover what execution events
175 have the information deemed as result
176 :param params : http request payload for the playbook execution
177 :param querystr_dict : http request querystring for the playbook
178 execution (DO NOT MODIFY HERE)
181 super(PlaybookOperation
, self
).__init
__(client
, logger
)
184 self
.playbook
= playbook
186 # An aditional filter of result events based in the event
187 self
.event_filter
= ""
189 # Playbook execution object
190 self
.pb_execution
= PlayBookExecution(client
,
198 return "Playbook {playbook_name}".format(playbook_name
=self
.playbook
)
202 """Check the status of the playbook execution and update the status
203 and result of the underlying Completion object.
206 if self
._status
in [ExecutionStatusCode
.ON_GOING
,
207 ExecutionStatusCode
.NOT_LAUNCHED
]:
208 self
._status
= self
.pb_execution
.get_status()
210 self
._is
_complete
= (self
._status
== ExecutionStatusCode
.SUCCESS
) or \
211 (self
._status
== ExecutionStatusCode
.ERROR
)
213 self
._is
_errored
= (self
._status
== ExecutionStatusCode
.ERROR
)
215 if self
._is
_complete
:
220 def execute_playbook(self
):
221 """Launch the execution of the playbook with the parameters configured
224 self
.pb_execution
.launch()
225 except AnsibleRunnerServiceError
:
226 self
._status
= ExecutionStatusCode
.ERROR
229 def update_result(self
):
230 """Output of the read operation
232 The result of the playbook execution can be customized through the
233 function provided as 'process_output' attribute
235 :return string: Result of the operation formatted if it is possible
238 processed_result
= []
240 if self
._is
_complete
:
241 raw_result
= self
.pb_execution
.get_result(self
.event_filter
)
243 if self
.output_wizard
:
244 processed_result
= self
.output_wizard
.process(self
.pb_execution
.play_uuid
,
247 processed_result
= raw_result
249 self
._result
= processed_result
252 class AnsibleChangeOperation(orchestrator
.WriteCompletion
):
253 """Operations that changes the "cluster" state
255 Modifications/Changes (writes) are a two-phase thing, firstly execute
256 the playbook that is going to change elements in the Ceph Cluster.
257 When the playbook finishes execution (independently of the result),
258 the modification/change operation has finished.
261 super(AnsibleChangeOperation
, self
).__init
__()
263 self
._status
= ExecutionStatusCode
.NOT_LAUNCHED
266 # Object used to process operation result in different ways
267 self
.output_wizard
= None
271 """Return the status code of the operation
273 raise NotImplementedError()
276 def is_persistent(self
):
278 Has the operation updated the orchestrator's configuration
279 persistently? Typically this would indicate that an update
280 had been written to a manifest, but that the update
281 had not necessarily been pushed out to the cluster.
283 :return Boolean: True if the execution of the Ansible Playbook or the
284 operation over the Ansible Runner Service has finished
287 return self
._status
in [ExecutionStatusCode
.SUCCESS
,
288 ExecutionStatusCode
.ERROR
]
291 def is_effective(self
):
292 """Has the operation taken effect on the cluster?
293 For example, if we were adding a service, has it come up and appeared
294 in Ceph's cluster maps?
296 In the case of Ansible, this will be True if the playbooks has been
297 executed succesfully.
299 :return Boolean: if the playbook/ARS operation has been executed
303 return self
._status
== ExecutionStatusCode
.SUCCESS
306 def is_errored(self
):
307 return self
._status
== ExecutionStatusCode
.ERROR
class HttpOperation(object):
    """A class to ease the management of http operations.

    It is a plain record describing one http request: where to send it,
    which verb to use and what to send.
    """

    def __init__(self, url, http_operation, payload="", query_string="{}"):
        """
        :param url            : URL of the request
        :param http_operation : Name of the http verb, in lower case
                                (the consumers in this file compare it with
                                "post", "delete" and "get")
        :param payload        : http request payload
        :param query_string   : http request query string
        """
        # The url must be stored: the code executing the operations reads
        # the "url" attribute of every HttpOperation it receives.
        self.url = url
        self.http_operation = http_operation
        self.payload = payload
        self.query_string = query_string
324 class ARSChangeOperation(AnsibleChangeOperation
):
325 """Execute one or more Ansible Runner Service Operations that implies
326 a change in the cluster
328 def __init__(self
, client
, logger
, operations
):
330 :param client : Ansible Runner Service Client
331 :param logger : The object used to log messages
332 :param operations : A list of http_operation objects
333 :param payload : dict with http request payload
335 super(ARSChangeOperation
, self
).__init
__()
337 assert operations
, "At least one operation is needed"
338 self
.ar_client
= client
340 self
.operations
= operations
343 # Use the last operation as the main
344 return "Ansible Runner Service: {operation} {url}".format(
345 operation
=self
.operations
[-1].http_operation
,
346 url
=self
.operations
[-1].url
)
350 """Execute the Ansible Runner Service operations and update the status
351 and result of the underlying Completion object.
354 for my_request
in self
.operations
:
355 # Execute the right kind of http request
357 if my_request
.http_operation
== "post":
358 response
= self
.ar_client
.http_post(my_request
.url
,
360 my_request
.query_string
)
361 elif my_request
.http_operation
== "delete":
362 response
= self
.ar_client
.http_delete(my_request
.url
)
363 elif my_request
.http_operation
== "get":
364 response
= self
.ar_client
.http_get(my_request
.url
)
366 # Any problem executing the secuence of operations will
367 # produce an errored completion object.
368 if response
.status_code
!= requests
.codes
.ok
:
369 self
._status
= ExecutionStatusCode
.ERROR
370 self
._result
= response
.text
373 # Any kind of error communicating with ARS or preventing
374 # to have a right http response
375 except AnsibleRunnerServiceError
as ex
:
376 self
._status
= ExecutionStatusCode
.ERROR
377 self
._result
= str(ex
)
380 # If this point is reached, all the operations has been succesfuly
381 # executed, and the final result is updated
382 self
._status
= ExecutionStatusCode
.SUCCESS
383 if self
.output_wizard
:
384 self
._result
= self
.output_wizard
.process("", response
.text
)
386 self
._result
= response
.text
390 class Module(MgrModule
, orchestrator
.Orchestrator
):
391 """An Orchestrator that uses <Ansible Runner Service> to perform operations
395 {'name': 'server_url'},
396 {'name': 'username'},
397 {'name': 'password'},
398 {'name': 'verify_server'} # Check server identity (Boolean/path to CA bundle)
401 def __init__(self
, *args
, **kwargs
):
402 super(Module
, self
).__init
__(*args
, **kwargs
)
406 self
.all_completions
= []
408 self
.ar_client
= None
411 """ Check if Ansible Runner service is working
414 return (True, "Everything ready")
def wait(self, completions):
    """Given a list of Completion instances, check their progress and
    report whether all of them have finished.

    :param completions: list of Completion instances
    :Returns          : True if everything is done.
    """

    # Check progress and update status in each operation.
    # Accessing the "status" attribute of the completion does the trick.
    for operation in completions:
        self.log.info("<%s> status:%s", operation, operation.status)

    # Build a real list: in Python 3 "filter" returns an iterator, which
    # has no len().
    pending = [operation for operation in completions
               if not operation.is_complete]

    ops_pending = len(pending)
    self.log.info("Operations pending: %s", ops_pending)

    return ops_pending == 0
437 """ Mandatory for standby modules
439 self
.log
.info("Starting Ansible Orchestrator module ...")
441 # Verify config options (Just that settings are available)
444 # Ansible runner service client
446 self
.ar_client
= Client(server_url
=self
.get_module_option('server_url', ''),
447 user
=self
.get_module_option('username', ''),
448 password
=self
.get_module_option('password', ''),
449 verify_server
=self
.get_module_option('verify_server', True),
451 except AnsibleRunnerServiceError
:
452 self
.log
.exception("Ansible Runner Service not available. "
453 "Check external server status/TLS identity or "
454 "connection options. If configuration options changed"
455 " try to disable/enable the module.")
463 self
.log
.info('Stopping Ansible orchestrator module')
466 def get_inventory(self
, node_filter
=None, refresh
=False):
469 :param : node_filter instance
470 :param : refresh any cached state
471 :Return : A AnsibleReadOperation instance (Completion Object)
474 # Create a new read completion object for execute the playbook
475 playbook_operation
= PlaybookOperation(client
=self
.ar_client
,
476 playbook
=GET_STORAGE_DEVICES_CATALOG_PLAYBOOK
,
478 result_pattern
="list storage inventory",
482 # Assign the process_output function
483 playbook_operation
.output_wizard
= ProcessInventory(self
.ar_client
,
485 playbook_operation
.event_filter
= "runner_on_ok"
487 # Execute the playbook to obtain data
488 self
._launch
_operation
(playbook_operation
)
490 return playbook_operation
492 def create_osds(self
, drive_group
, all_hosts
):
493 """Create one or more OSDs within a single Drive Group.
494 If no host provided the operation affects all the host in the OSDS role
497 :param drive_group: (orchestrator.DriveGroupSpec),
498 Drive group with the specification of drives to use
499 :param all_hosts : (List[str]),
500 List of hosts where the OSD's must be created
503 # Transform drive group specification to Ansible playbook parameters
504 host
, osd_spec
= dg_2_ansible(drive_group
)
506 # Create a new read completion object for execute the playbook
507 playbook_operation
= PlaybookOperation(client
=self
.ar_client
,
508 playbook
=ADD_OSD_PLAYBOOK
,
512 querystr_dict
={"limit": host
})
514 # Filter to get the result
515 playbook_operation
.output_wizard
= ProcessPlaybookResult(self
.ar_client
,
517 playbook_operation
.event_filter
= "playbook_on_stats"
519 # Execute the playbook
520 self
._launch
_operation
(playbook_operation
)
522 return playbook_operation
524 def remove_osds(self
, osd_ids
):
527 :param osd_ids: List of osd's to be removed (List[int])
530 extravars
= {'osd_to_kill': ",".join([str(osd_id
) for osd_id
in osd_ids
]),
531 'ireallymeanit':'yes'}
533 # Create a new read completion object for execute the playbook
534 playbook_operation
= PlaybookOperation(client
=self
.ar_client
,
535 playbook
=REMOVE_OSD_PLAYBOOK
,
540 # Filter to get the result
541 playbook_operation
.output_wizard
= ProcessPlaybookResult(self
.ar_client
,
543 playbook_operation
.event_filter
= "playbook_on_stats"
545 # Execute the playbook
546 self
._launch
_operation
(playbook_operation
)
548 return playbook_operation
551 """Provides a list Inventory nodes
554 host_ls_op
= ARSOperation(self
.ar_client
, self
.log
, URL_GET_HOSTS
)
556 host_ls_op
.output_wizard
= ProcessHostsList(self
.ar_client
,
561 def add_host(self
, host
):
563 Add a host to the Ansible Runner Service inventory in the "orchestrator"
566 :param host: hostname
567 :returns : orchestrator.WriteCompletion
570 url_group
= URL_MANAGE_GROUP
.format(group_name
=ORCHESTRATOR_GROUP
)
573 # Create the orchestrator default group if not exist.
574 # If exists we ignore the error response
575 dummy_response
= self
.ar_client
.http_post(url_group
, "", {})
577 # Here, the default group exists so...
578 # Prepare the operation for adding the new host
579 add_url
= URL_ADD_RM_HOSTS
.format(host_name
=host
,
580 inventory_group
=ORCHESTRATOR_GROUP
)
582 operations
= [HttpOperation(add_url
, "post")]
584 except AnsibleRunnerServiceError
as ex
:
585 # Problems with the external orchestrator.
586 # Prepare the operation to return the error in a Completion object.
587 self
.log
.exception("Error checking <orchestrator> group: %s", ex
)
588 operations
= [HttpOperation(url_group
, "post")]
590 return ARSChangeOperation(self
.ar_client
, self
.log
, operations
)
592 def remove_host(self
, host
):
594 Remove a host from all the groups in the Ansible Runner Service
597 :param host: hostname
598 :returns : orchestrator.WriteCompletion
605 # Get the list of groups where the host is included
606 groups_url
= URL_GET_HOST_GROUPS
.format(host_name
=host
)
607 response
= self
.ar_client
.http_get(groups_url
)
609 if response
.status_code
== requests
.codes
.ok
:
610 host_groups
= json
.loads(response
.text
)["data"]["groups"]
612 except AnsibleRunnerServiceError
:
613 self
.log
.exception("Error retrieving host groups")
616 # Error retrieving the groups, prepare the completion object to
617 # execute the problematic operation just to provide the error
619 operations
= [HttpOperation(groups_url
, "get")]
621 # Build the operations list
622 operations
= list(map(lambda x
:
623 HttpOperation(URL_ADD_RM_HOSTS
.format(
629 return ARSChangeOperation(self
.ar_client
, self
.log
, operations
)
def _launch_operation(self, ansible_operation):
    """Launch the operation and add the operation to the completion objects
    ongoing.

    :ansible_operation: A read/write ansible operation (completion object)
    """

    # Execute the playbook
    ansible_operation.execute_playbook()

    # Add the operation to the list of things ongoing
    self.all_completions.append(ansible_operation)
644 def verify_config(self
):
645 """ Verify configuration options for the Ansible orchestrator module
649 if not self
.get_module_option('server_url', ''):
650 msg
= "No Ansible Runner Service base URL <server_name>:<port>." \
651 "Try 'ceph config set mgr mgr/{0}/server_url " \
652 "<server name/ip>:<port>'".format(self
.module_name
)
656 if not self
.get_module_option('username', ''):
657 msg
= "No Ansible Runner Service user. " \
658 "Try 'ceph config set mgr mgr/{0}/username " \
659 "<string value>'".format(self
.module_name
)
663 if not self
.get_module_option('password', ''):
664 msg
= "No Ansible Runner Service User password. " \
665 "Try 'ceph config set mgr mgr/{0}/password " \
666 "<string value>'".format(self
.module_name
)
670 if not self
.get_module_option('verify_server', ''):
671 msg
= "TLS server identity verification is enabled by default." \
672 "Use 'ceph config set mgr mgr/{0}/verify_server False' " \
673 "to disable it. Use 'ceph config set mgr mgr/{0}/verify_server " \
674 "<path>' to point the CA bundle path used for " \
675 "verification".format(self
.module_name
)
681 # TODO: Use OrchestratorValidationError
682 raise Exception(client_msg
)
686 # Auxiliary functions
687 #==============================================================================
def dg_2_ansible(drive_group):
    """ Transform a drive group specification into:

        a host     : limit the playbook execution to this host
        a osd_spec : dict of parameters to pass to the Ansible playbook used
                     to create the osds

        :param drive_group: (type: DriveGroupSpec)
        :return: tuple (host, osd_spec). host is None when the host pattern
                 is "*" (no limit in the playbook execution)

        TODO: Possible this function will be removed/or modified heavily when
        the ansible playbook to create osd's use ceph volume batch with
        drive group parameter
    """

    # Limit the execution of the playbook to certain hosts
    # TODO: Now only accepted "*" (all the hosts) or a host_name in the
    # drive_group.host_pattern
    # This attribute is intended to be used with "fnmatch" patterns, so when
    # this become effective it will be needed to use the "get_inventory" method
    # in order to have a list of hosts to be filtered with the "host_pattern"
    if drive_group.host_pattern in ["*"]:
        host = None  # No limit in the playbook
    else:
        # For the moment, we assume that we only have 1 host
        host = drive_group.host_pattern

    # Compose the OSD configuration.
    # Only the first data device path is used.
    osd = {}
    osd["data"] = drive_group.data_devices.paths[0]
    # Other parameters will be extracted in the same way
    #osd["dmcrypt"] = drive_group.encryption

    # lvm_volumes parameters
    # (by the moment is what is accepted in the current playbook)
    osd_spec = {"lvm_volumes": [osd]}

    #Global scope variables also can be included in the osd_spec
    #osd_spec["osd_objectstore"] = drive_group.objectstore

    return host, osd_spec