import rados
from mgr_module import CommandResult
-from mgr_util import get_most_recent_rate, get_time_series_rates
+from mgr_util import get_most_recent_rate, get_time_series_rates, name_to_config_section
from .. import mgr
return pool
return None
+ @classmethod
+ def get_encryption_config(cls, daemon_name):
+ kms_vault_configured = False
+ s3_vault_configured = False
+ kms_backend: str = ''
+ sse_s3_backend: str = ''
+ vault_stats = []
+ full_daemon_name = 'rgw.' + daemon_name
+
+ kms_backend = CephService.send_command('mon', 'config get',
+ who=name_to_config_section(full_daemon_name),
+ key='rgw_crypt_s3_kms_backend')
+ sse_s3_backend = CephService.send_command('mon', 'config get',
+ who=name_to_config_section(full_daemon_name),
+ key='rgw_crypt_sse_s3_backend')
+
+ if kms_backend.strip() == 'vault':
+ kms_vault_auth: str = CephService.send_command('mon', 'config get',
+ who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
+ key='rgw_crypt_vault_auth')
+ kms_vault_engine: str = CephService.send_command('mon', 'config get',
+ who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
+ key='rgw_crypt_vault_secret_engine')
+ kms_vault_address: str = CephService.send_command('mon', 'config get',
+ who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
+ key='rgw_crypt_vault_addr')
+ kms_vault_token: str = CephService.send_command('mon', 'config get',
+ who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
+ key='rgw_crypt_vault_token_file') # noqa E501 #pylint: disable=line-too-long
+ if (kms_vault_auth.strip() != "" and kms_vault_engine.strip() != "" and kms_vault_address.strip() != ""): # noqa E501 #pylint: disable=line-too-long
+ if(kms_vault_auth == 'token' and kms_vault_token.strip() == ""):
+ kms_vault_configured = False
+ else:
+ kms_vault_configured = True
+
+ if sse_s3_backend.strip() == 'vault':
+ s3_vault_auth: str = CephService.send_command('mon', 'config get',
+ who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
+ key='rgw_crypt_sse_s3_vault_auth')
+ s3_vault_engine: str = CephService.send_command('mon',
+ 'config get',
+ who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
+ key='rgw_crypt_sse_s3_vault_secret_engine') # noqa E501 #pylint: disable=line-too-long
+ s3_vault_address: str = CephService.send_command('mon', 'config get',
+ who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
+ key='rgw_crypt_sse_s3_vault_addr')
+ s3_vault_token: str = CephService.send_command('mon', 'config get',
+ who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
+ key='rgw_crypt_sse_s3_vault_token_file') # noqa E501 #pylint: disable=line-too-long
+
+ if (s3_vault_auth.strip() != "" and s3_vault_engine.strip() != "" and s3_vault_address.strip() != ""): # noqa E501 #pylint: disable=line-too-long
+ if(s3_vault_auth == 'token' and s3_vault_token.strip() == ""):
+ s3_vault_configured = False
+ else:
+ s3_vault_configured = True
+
+ vault_stats.append(kms_vault_configured)
+ vault_stats.append(s3_vault_configured)
+ return vault_stats
+
+ @classmethod
+ def set_encryption_config(cls, encryption_type, kms_provider, auth_method,
+ secret_engine, secret_path, namespace, address,
+ token, daemon_name, ssl_cert, client_cert, client_key):
+ full_daemon_name = 'rgw.' + daemon_name
+ if encryption_type == 'aws:kms':
+
+ KMS_CONFIG = [
+ ['rgw_crypt_s3_kms_backend', kms_provider],
+ ['rgw_crypt_vault_auth', auth_method],
+ ['rgw_crypt_vault_prefix', secret_path],
+ ['rgw_crypt_vault_namespace', namespace],
+ ['rgw_crypt_vault_secret_engine', secret_engine],
+ ['rgw_crypt_vault_addr', address],
+ ['rgw_crypt_vault_token_file', token],
+ ['rgw_crypt_vault_ssl_cacert', ssl_cert],
+ ['rgw_crypt_vault_ssl_clientcert', client_cert],
+ ['rgw_crypt_vault_ssl_clientkey', client_key]
+ ]
+
+ for (key, value) in KMS_CONFIG:
+ if value == 'null':
+ continue
+ CephService.send_command('mon', 'config set',
+ who=name_to_config_section(full_daemon_name),
+ name=key, value=value)
+
+ if encryption_type == 'AES256':
+
+ SSE_S3_CONFIG = [
+ ['rgw_crypt_sse_s3_backend', kms_provider],
+ ['rgw_crypt_sse_s3_vault_auth', auth_method],
+ ['rgw_crypt_sse_s3_vault_prefix', secret_path],
+ ['rgw_crypt_sse_s3_vault_namespace', namespace],
+ ['rgw_crypt_sse_s3_vault_secret_engine', secret_engine],
+ ['rgw_crypt_sse_s3_vault_addr', address],
+ ['rgw_crypt_sse_s3_vault_token_file', token],
+ ['rgw_crypt_sse_s3_vault_ssl_cacert', ssl_cert],
+ ['rgw_crypt_sse_s3_vault_ssl_clientcert', client_cert],
+ ['rgw_crypt_sse_s3_vault_ssl_clientkey', client_key]
+ ]
+
+ for (key, value) in SSE_S3_CONFIG:
+ if value == 'null':
+ continue
+ CephService.send_command('mon', 'config set',
+ who=name_to_config_section(full_daemon_name),
+ name=key, value=value)
+
+ return {}
+
@classmethod
def get_pool_pg_status(cls, pool_name):
# type: (str) -> dict