]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/python/pyarrow/_s3fs.pyx
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / python / pyarrow / _s3fs.pyx
diff --git a/ceph/src/arrow/python/pyarrow/_s3fs.pyx b/ceph/src/arrow/python/pyarrow/_s3fs.pyx
new file mode 100644 (file)
index 0000000..5829d74
--- /dev/null
@@ -0,0 +1,284 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow.lib cimport (check_status, pyarrow_wrap_metadata,
+                          pyarrow_unwrap_metadata)
+from pyarrow.lib import frombytes, tobytes, KeyValueMetadata
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_fs cimport *
+from pyarrow._fs cimport FileSystem
+
+
+cpdef enum S3LogLevel:
+    Off = <int8_t> CS3LogLevel_Off
+    Fatal = <int8_t> CS3LogLevel_Fatal
+    Error = <int8_t> CS3LogLevel_Error
+    Warn = <int8_t> CS3LogLevel_Warn
+    Info = <int8_t> CS3LogLevel_Info
+    Debug = <int8_t> CS3LogLevel_Debug
+    Trace = <int8_t> CS3LogLevel_Trace
+
+
+def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal):
+    """
+    Initialize S3 support
+
+    Parameters
+    ----------
+    log_level : S3LogLevel
+        level of logging
+    """
+    cdef CS3GlobalOptions options
+    options.log_level = <CS3LogLevel> log_level
+    check_status(CInitializeS3(options))
+
+
+def finalize_s3():
+    check_status(CFinalizeS3())
+
+
+cdef class S3FileSystem(FileSystem):
+    """
+    S3-backed FileSystem implementation
+
+    If neither access_key nor secret_key are provided, and role_arn is also not
+    provided, then attempts to initialize from AWS environment variables,
+    otherwise both access_key and secret_key must be provided.
+
+    If role_arn is provided instead of access_key and secret_key, temporary
+    credentials will be fetched by issuing a request to STS to assume the
+    specified role.
+
+    Note: S3 buckets are special and the operations available on them may be
+    limited or more expensive than desired.
+
+    Parameters
+    ----------
+    access_key : str, default None
+        AWS Access Key ID. Pass None to use the standard AWS environment
+        variables and/or configuration file.
+    secret_key : str, default None
+        AWS Secret Access key. Pass None to use the standard AWS environment
+        variables and/or configuration file.
+    session_token : str, default None
+        AWS Session Token.  An optional session token, required if access_key
+        and secret_key are temporary credentials from STS.
+    anonymous : boolean, default False
+        Whether to connect anonymously if access_key and secret_key are None.
+        If true, will not attempt to look up credentials using standard AWS
+        configuration methods.
+    role_arn : str, default None
+        AWS Role ARN.  If provided instead of access_key and secret_key,
+        temporary credentials will be fetched by assuming this role.
+    session_name : str, default None
+        An optional identifier for the assumed role session.
+    external_id : str, default None
+        An optional unique identifier that might be required when you assume
+        a role in another account.
+    load_frequency : int, default 900
+        The frequency (in seconds) with which temporary credentials from an
+        assumed role session will be refreshed.
+    region : str, default 'us-east-1'
+        AWS region to connect to.
+    scheme : str, default 'https'
+        S3 connection transport scheme.
+    endpoint_override : str, default None
+        Override region with a connect string such as "localhost:9000"
+    background_writes : boolean, default True
+        Whether file writes will be issued in the background, without
+        blocking.
+    default_metadata : mapping or KeyValueMetadata, default None
+        Default metadata for open_output_stream.  This will be ignored if
+        non-empty metadata is passed to open_output_stream.
+    proxy_options : dict or str, default None
+        If a proxy is used, provide the options here. Supported options are:
+        'scheme' (str: 'http' or 'https'; required), 'host' (str; required),
+        'port' (int; required), 'username' (str; optional),
+        'password' (str; optional).
+        A proxy URI (str) can also be provided, in which case these options
+        will be derived from the provided URI.
+        The following are equivalent::
+
+            S3FileSystem(proxy_options='http://username:password@localhost:8020')
+            S3FileSystem(proxy_options={'scheme': 'http', 'host': 'localhost',
+                                        'port': 8020, 'username': 'username',
+                                        'password': 'password'})
+    """
+
+    cdef:
+        CS3FileSystem* s3fs
+
+    def __init__(self, *, access_key=None, secret_key=None, session_token=None,
+                 bint anonymous=False, region=None, scheme=None,
+                 endpoint_override=None, bint background_writes=True,
+                 default_metadata=None, role_arn=None, session_name=None,
+                 external_id=None, load_frequency=900, proxy_options=None):
+        cdef:
+            CS3Options options
+            shared_ptr[CS3FileSystem] wrapped
+
+        if access_key is not None and secret_key is None:
+            raise ValueError(
+                'In order to initialize with explicit credentials both '
+                'access_key and secret_key must be provided, '
+                '`secret_key` is not set.'
+            )
+        elif access_key is None and secret_key is not None:
+            raise ValueError(
+                'In order to initialize with explicit credentials both '
+                'access_key and secret_key must be provided, '
+                '`access_key` is not set.'
+            )
+
+        elif session_token is not None and (access_key is None or
+                                            secret_key is None):
+            raise ValueError(
+                'In order to initialize a session with temporary credentials, '
+                'both secret_key and access_key must be provided in addition '
+                'to session_token.'
+            )
+
+        elif (access_key is not None or secret_key is not None):
+            if anonymous:
+                raise ValueError(
+                    'Cannot pass anonymous=True together with access_key '
+                    'and secret_key.')
+
+            if role_arn:
+                raise ValueError(
+                    'Cannot provide role_arn with access_key and secret_key')
+
+            if session_token is None:
+                session_token = ""
+
+            options = CS3Options.FromAccessKey(
+                tobytes(access_key),
+                tobytes(secret_key),
+                tobytes(session_token)
+            )
+        elif anonymous:
+            if role_arn:
+                raise ValueError(
+                    'Cannot provide role_arn with anonymous=True')
+
+            options = CS3Options.Anonymous()
+        elif role_arn:
+
+            options = CS3Options.FromAssumeRole(
+                tobytes(role_arn),
+                tobytes(session_name),
+                tobytes(external_id),
+                load_frequency
+            )
+        else:
+            options = CS3Options.Defaults()
+
+        if region is not None:
+            options.region = tobytes(region)
+        if scheme is not None:
+            options.scheme = tobytes(scheme)
+        if endpoint_override is not None:
+            options.endpoint_override = tobytes(endpoint_override)
+        if background_writes is not None:
+            options.background_writes = background_writes
+        if default_metadata is not None:
+            if not isinstance(default_metadata, KeyValueMetadata):
+                default_metadata = KeyValueMetadata(default_metadata)
+            options.default_metadata = pyarrow_unwrap_metadata(
+                default_metadata)
+
+        if proxy_options is not None:
+            if isinstance(proxy_options, dict):
+                options.proxy_options.scheme = tobytes(proxy_options["scheme"])
+                options.proxy_options.host = tobytes(proxy_options["host"])
+                options.proxy_options.port = proxy_options["port"]
+                proxy_username = proxy_options.get("username", None)
+                if proxy_username:
+                    options.proxy_options.username = tobytes(proxy_username)
+                proxy_password = proxy_options.get("password", None)
+                if proxy_password:
+                    options.proxy_options.password = tobytes(proxy_password)
+            elif isinstance(proxy_options, str):
+                options.proxy_options = GetResultValue(
+                    CS3ProxyOptions.FromUriString(tobytes(proxy_options)))
+            else:
+                raise TypeError(
+                    "'proxy_options': expected 'dict' or 'str', "
+                    f"got {type(proxy_options)} instead.")
+
+        with nogil:
+            wrapped = GetResultValue(CS3FileSystem.Make(options))
+
+        self.init(<shared_ptr[CFileSystem]> wrapped)
+
+    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
+        FileSystem.init(self, wrapped)
+        self.s3fs = <CS3FileSystem*> wrapped.get()
+
+    @classmethod
+    def _reconstruct(cls, kwargs):
+        return cls(**kwargs)
+
+    def __reduce__(self):
+        cdef CS3Options opts = self.s3fs.options()
+
+        # if creds were explicitly provided, then use them
+        # else obtain them as they were last time.
+        if opts.credentials_kind == CS3CredentialsKind_Explicit:
+            access_key = frombytes(opts.GetAccessKey())
+            secret_key = frombytes(opts.GetSecretKey())
+            session_token = frombytes(opts.GetSessionToken())
+        else:
+            access_key = None
+            secret_key = None
+            session_token = None
+
+        return (
+            S3FileSystem._reconstruct, (dict(
+                access_key=access_key,
+                secret_key=secret_key,
+                session_token=session_token,
+                anonymous=(opts.credentials_kind ==
+                           CS3CredentialsKind_Anonymous),
+                region=frombytes(opts.region),
+                scheme=frombytes(opts.scheme),
+                endpoint_override=frombytes(opts.endpoint_override),
+                role_arn=frombytes(opts.role_arn),
+                session_name=frombytes(opts.session_name),
+                external_id=frombytes(opts.external_id),
+                load_frequency=opts.load_frequency,
+                background_writes=opts.background_writes,
+                default_metadata=pyarrow_wrap_metadata(opts.default_metadata),
+                proxy_options={'scheme': frombytes(opts.proxy_options.scheme),
+                               'host': frombytes(opts.proxy_options.host),
+                               'port': opts.proxy_options.port,
+                               'username': frombytes(
+                                   opts.proxy_options.username),
+                               'password': frombytes(
+                                   opts.proxy_options.password)}
+            ),)
+        )
+
+    @property
+    def region(self):
+        """
+        The AWS region this filesystem connects to.
+        """
+        return frombytes(self.s3fs.region())