]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/python/pyarrow/_s3fs.pyx
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / python / pyarrow / _s3fs.pyx
1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
16 # under the License.
17
18 # cython: language_level = 3
19
20 from pyarrow.lib cimport (check_status, pyarrow_wrap_metadata,
21 pyarrow_unwrap_metadata)
22 from pyarrow.lib import frombytes, tobytes, KeyValueMetadata
23 from pyarrow.includes.common cimport *
24 from pyarrow.includes.libarrow cimport *
25 from pyarrow.includes.libarrow_fs cimport *
26 from pyarrow._fs cimport FileSystem
27
28
cpdef enum S3LogLevel:
    # Python-visible mirror of the C++ S3 logging levels, listed from the
    # least verbose (Off) to the most verbose (Trace).
    # Each member takes the value of the matching CS3LogLevel constant,
    # cast through int8_t (the C++ constants are not plain ints, so an
    # explicit cast is needed to use them as cpdef enum values).
    Off = <int8_t> CS3LogLevel_Off
    Fatal = <int8_t> CS3LogLevel_Fatal
    Error = <int8_t> CS3LogLevel_Error
    Warn = <int8_t> CS3LogLevel_Warn
    Info = <int8_t> CS3LogLevel_Info
    Debug = <int8_t> CS3LogLevel_Debug
    Trace = <int8_t> CS3LogLevel_Trace
37
38
def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal):
    """
    Initialize S3 support with the given global options.

    Parameters
    ----------
    log_level : S3LogLevel, default S3LogLevel.Fatal
        Level of logging emitted by the S3 layer.

    Notes
    -----
    A failure status returned by the underlying C++ initialization call is
    surfaced through ``check_status``.
    """
    cdef CS3GlobalOptions global_opts
    # Convert the Python-level enum back to the C++ log level type.
    global_opts.log_level = <CS3LogLevel> log_level
    check_status(CInitializeS3(global_opts))
51
52
def finalize_s3():
    """
    Finalize S3 support.

    Counterpart of ``initialize_s3``. A failure status returned by the
    underlying C++ finalization call is surfaced through ``check_status``.
    """
    check_status(CFinalizeS3())
55
56
cdef class S3FileSystem(FileSystem):
    """
    S3-backed FileSystem implementation

    If neither access_key nor secret_key are provided, and role_arn is also not
    provided, then attempts to initialize from AWS environment variables,
    otherwise both access_key and secret_key must be provided.

    If role_arn is provided instead of access_key and secret_key, temporary
    credentials will be fetched by issuing a request to STS to assume the
    specified role.

    Note: S3 buckets are special and the operations available on them may be
    limited or more expensive than desired.

    Parameters
    ----------
    access_key : str, default None
        AWS Access Key ID. Pass None to use the standard AWS environment
        variables and/or configuration file.
    secret_key : str, default None
        AWS Secret Access key. Pass None to use the standard AWS environment
        variables and/or configuration file.
    session_token : str, default None
        AWS Session Token. An optional session token, required if access_key
        and secret_key are temporary credentials from STS.
    anonymous : boolean, default False
        Whether to connect anonymously if access_key and secret_key are None.
        If true, will not attempt to look up credentials using standard AWS
        configuration methods.
    role_arn : str, default None
        AWS Role ARN. If provided instead of access_key and secret_key,
        temporary credentials will be fetched by assuming this role.
    session_name : str, default None
        An optional identifier for the assumed role session. None is treated
        as an empty string.
    external_id : str, default None
        An optional unique identifier that might be required when you assume
        a role in another account. None is treated as an empty string.
    load_frequency : int, default 900
        The frequency (in seconds) with which temporary credentials from an
        assumed role session will be refreshed.
    region : str, default 'us-east-1'
        AWS region to connect to.
    scheme : str, default 'https'
        S3 connection transport scheme.
    endpoint_override : str, default None
        Override region with a connect string such as "localhost:9000"
    background_writes : boolean, default True
        Whether file writes will be issued in the background, without
        blocking.
    default_metadata : mapping or KeyValueMetadata, default None
        Default metadata for open_output_stream. This will be ignored if
        non-empty metadata is passed to open_output_stream.
    proxy_options : dict or str, default None
        If a proxy is used, provide the options here. Supported options are:
        'scheme' (str: 'http' or 'https'; required), 'host' (str; required),
        'port' (int; required), 'username' (str; optional),
        'password' (str; optional).
        A proxy URI (str) can also be provided, in which case these options
        will be derived from the provided URI.
        The following are equivalent::

            S3FileSystem(proxy_options='http://username:password@localhost:8020')
            S3FileSystem(proxy_options={'scheme': 'http', 'host': 'localhost',
                                        'port': 8020, 'username': 'username',
                                        'password': 'password'})

    Raises
    ------
    ValueError
        If only one of access_key/secret_key is given, if session_token is
        given without both keys, or if mutually exclusive credential options
        (explicit keys, anonymous, role_arn) are combined.
    TypeError
        If proxy_options is neither a dict nor a str.
    """

    cdef:
        # Borrowed, typed view of the wrapped C++ filesystem; ownership is
        # held by the shared_ptr passed to FileSystem.init().
        CS3FileSystem* s3fs

    def __init__(self, *, access_key=None, secret_key=None, session_token=None,
                 bint anonymous=False, region=None, scheme=None,
                 endpoint_override=None, bint background_writes=True,
                 default_metadata=None, role_arn=None, session_name=None,
                 external_id=None, load_frequency=900, proxy_options=None):
        cdef:
            CS3Options options
            shared_ptr[CS3FileSystem] wrapped

        # --- Credentials: pick exactly one credential source. ---
        if access_key is not None and secret_key is None:
            raise ValueError(
                'In order to initialize with explicit credentials both '
                'access_key and secret_key must be provided, '
                '`secret_key` is not set.'
            )
        elif access_key is None and secret_key is not None:
            raise ValueError(
                'In order to initialize with explicit credentials both '
                'access_key and secret_key must be provided, '
                '`access_key` is not set.'
            )

        elif session_token is not None and (access_key is None or
                                            secret_key is None):
            raise ValueError(
                'In order to initialize a session with temporary credentials, '
                'both secret_key and access_key must be provided in addition '
                'to session_token.'
            )

        elif (access_key is not None or secret_key is not None):
            # Both keys are set here (the branches above rule out only one).
            if anonymous:
                raise ValueError(
                    'Cannot pass anonymous=True together with access_key '
                    'and secret_key.')

            if role_arn:
                raise ValueError(
                    'Cannot provide role_arn with access_key and secret_key')

            if session_token is None:
                session_token = ""

            options = CS3Options.FromAccessKey(
                tobytes(access_key),
                tobytes(secret_key),
                tobytes(session_token)
            )
        elif anonymous:
            if role_arn:
                raise ValueError(
                    'Cannot provide role_arn with anonymous=True')

            options = CS3Options.Anonymous()
        elif role_arn:
            # session_name and external_id are optional, but FromAssumeRole
            # takes C++ strings: tobytes() does not turn None into bytes, so
            # None would fail the string conversion with a TypeError.
            if session_name is None:
                session_name = ''
            if external_id is None:
                external_id = ''

            options = CS3Options.FromAssumeRole(
                tobytes(role_arn),
                tobytes(session_name),
                tobytes(external_id),
                load_frequency
            )
        else:
            options = CS3Options.Defaults()

        # --- Connection settings: only override what the caller passed. ---
        if region is not None:
            options.region = tobytes(region)
        if scheme is not None:
            options.scheme = tobytes(scheme)
        if endpoint_override is not None:
            options.endpoint_override = tobytes(endpoint_override)
        # background_writes is a C bint and therefore never None.
        options.background_writes = background_writes
        if default_metadata is not None:
            if not isinstance(default_metadata, KeyValueMetadata):
                default_metadata = KeyValueMetadata(default_metadata)
            options.default_metadata = pyarrow_unwrap_metadata(
                default_metadata)

        # --- Proxy settings: accepted as a dict of fields or a URI string. ---
        if proxy_options is not None:
            if isinstance(proxy_options, dict):
                options.proxy_options.scheme = tobytes(proxy_options["scheme"])
                options.proxy_options.host = tobytes(proxy_options["host"])
                options.proxy_options.port = proxy_options["port"]
                proxy_username = proxy_options.get("username", None)
                if proxy_username:
                    options.proxy_options.username = tobytes(proxy_username)
                proxy_password = proxy_options.get("password", None)
                if proxy_password:
                    options.proxy_options.password = tobytes(proxy_password)
            elif isinstance(proxy_options, str):
                options.proxy_options = GetResultValue(
                    CS3ProxyOptions.FromUriString(tobytes(proxy_options)))
            else:
                raise TypeError(
                    "'proxy_options': expected 'dict' or 'str', "
                    f"got {type(proxy_options)} instead.")

        # Building the filesystem may touch the network; release the GIL.
        with nogil:
            wrapped = GetResultValue(CS3FileSystem.Make(options))

        self.init(<shared_ptr[CFileSystem]> wrapped)

    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
        FileSystem.init(self, wrapped)
        # Cache a typed pointer so S3-specific accessors avoid a cast.
        self.s3fs = <CS3FileSystem*> wrapped.get()

    @classmethod
    def _reconstruct(cls, kwargs):
        """Rebuild an instance from the keyword arguments of __reduce__."""
        return cls(**kwargs)

    def __reduce__(self):
        """
        Support pickling by capturing the options the filesystem was built
        with and replaying them through the constructor.
        """
        cdef CS3Options opts = self.s3fs.options()

        # if creds were explicitly provided, then use them
        # else obtain them as they were last time.
        if opts.credentials_kind == CS3CredentialsKind_Explicit:
            access_key = frombytes(opts.GetAccessKey())
            secret_key = frombytes(opts.GetSecretKey())
            session_token = frombytes(opts.GetSessionToken())
        else:
            access_key = None
            secret_key = None
            session_token = None

        return (
            S3FileSystem._reconstruct, (dict(
                access_key=access_key,
                secret_key=secret_key,
                session_token=session_token,
                anonymous=(opts.credentials_kind ==
                           CS3CredentialsKind_Anonymous),
                region=frombytes(opts.region),
                scheme=frombytes(opts.scheme),
                endpoint_override=frombytes(opts.endpoint_override),
                role_arn=frombytes(opts.role_arn),
                session_name=frombytes(opts.session_name),
                external_id=frombytes(opts.external_id),
                load_frequency=opts.load_frequency,
                background_writes=opts.background_writes,
                default_metadata=pyarrow_wrap_metadata(opts.default_metadata),
                proxy_options={'scheme': frombytes(opts.proxy_options.scheme),
                               'host': frombytes(opts.proxy_options.host),
                               'port': opts.proxy_options.port,
                               'username': frombytes(
                                   opts.proxy_options.username),
                               'password': frombytes(
                                   opts.proxy_options.password)}
            ),)
        )

    @property
    def region(self):
        """
        The AWS region this filesystem connects to.
        """
        return frombytes(self.s3fs.region())