]>
Commit | Line | Data |
---|---|---|
1 | # Licensed to the Apache Software Foundation (ASF) under one | |
2 | # or more contributor license agreements. See the NOTICE file | |
3 | # distributed with this work for additional information | |
4 | # regarding copyright ownership. The ASF licenses this file | |
5 | # to you under the Apache License, Version 2.0 (the | |
6 | # "License"); you may not use this file except in compliance | |
7 | # with the License. You may obtain a copy of the License at | |
8 | # | |
9 | # http://www.apache.org/licenses/LICENSE-2.0 | |
10 | # | |
11 | # Unless required by applicable law or agreed to in writing, | |
12 | # software distributed under the License is distributed on an | |
13 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | # KIND, either express or implied. See the License for the | |
15 | # specific language governing permissions and limitations | |
16 | # under the License. | |
17 | ||
18 | # cython: language_level = 3 | |
19 | ||
20 | from pyarrow.lib cimport (check_status, pyarrow_wrap_metadata, | |
21 | pyarrow_unwrap_metadata) | |
22 | from pyarrow.lib import frombytes, tobytes, KeyValueMetadata | |
23 | from pyarrow.includes.common cimport * | |
24 | from pyarrow.includes.libarrow cimport * | |
25 | from pyarrow.includes.libarrow_fs cimport * | |
26 | from pyarrow._fs cimport FileSystem | |
27 | ||
28 | ||
# Log verbosity levels accepted by initialize_s3(). Each member takes the
# value of the matching CS3LogLevel_* constant, cast to int8_t.
cpdef enum S3LogLevel:
    Off = <int8_t> CS3LogLevel_Off
    Fatal = <int8_t> CS3LogLevel_Fatal
    Error = <int8_t> CS3LogLevel_Error
    Warn = <int8_t> CS3LogLevel_Warn
    Info = <int8_t> CS3LogLevel_Info
    Debug = <int8_t> CS3LogLevel_Debug
    Trace = <int8_t> CS3LogLevel_Trace
37 | ||
38 | ||
def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal):
    """
    Initialize S3 support.

    Builds a CS3GlobalOptions value carrying the requested log level,
    calls CInitializeS3 with it and passes the resulting status to
    check_status.

    Parameters
    ----------
    log_level : S3LogLevel, default S3LogLevel.Fatal
        Level of logging.
    """
    cdef CS3GlobalOptions global_options
    # The Python-visible enum member is cast back to the C-level enum type.
    global_options.log_level = <CS3LogLevel> log_level
    check_status(CInitializeS3(global_options))
51 | ||
52 | ||
def finalize_s3():
    """
    Finalize S3 support.

    Calls CFinalizeS3 and passes the resulting status to check_status.
    """
    check_status(CFinalizeS3())
55 | ||
56 | ||
cdef class S3FileSystem(FileSystem):
    """
    S3-backed FileSystem implementation

    If neither access_key nor secret_key are provided, and role_arn is also not
    provided, then attempts to initialize from AWS environment variables,
    otherwise both access_key and secret_key must be provided.

    If role_arn is provided instead of access_key and secret_key, temporary
    credentials will be fetched by issuing a request to STS to assume the
    specified role.

    Note: S3 buckets are special and the operations available on them may be
    limited or more expensive than desired.

    Parameters
    ----------
    access_key : str, default None
        AWS Access Key ID. Pass None to use the standard AWS environment
        variables and/or configuration file.
    secret_key : str, default None
        AWS Secret Access key. Pass None to use the standard AWS environment
        variables and/or configuration file.
    session_token : str, default None
        AWS Session Token.  An optional session token, required if access_key
        and secret_key are temporary credentials from STS.
    anonymous : boolean, default False
        Whether to connect anonymously if access_key and secret_key are None.
        If true, will not attempt to look up credentials using standard AWS
        configuration methods.
    role_arn : str, default None
        AWS Role ARN.  If provided instead of access_key and secret_key,
        temporary credentials will be fetched by assuming this role.
    session_name : str, default None
        An optional identifier for the assumed role session. None is
        passed on as an empty string.
    external_id : str, default None
        An optional unique identifier that might be required when you assume
        a role in another account. None is passed on as an empty string.
    load_frequency : int, default 900
        The frequency (in seconds) with which temporary credentials from an
        assumed role session will be refreshed.
    region : str, default 'us-east-1'
        AWS region to connect to.
    scheme : str, default 'https'
        S3 connection transport scheme.
    endpoint_override : str, default None
        Override region with a connect string such as "localhost:9000"
    background_writes : boolean, default True
        Whether file writes will be issued in the background, without
        blocking.
    default_metadata : mapping or KeyValueMetadata, default None
        Default metadata for open_output_stream.  This will be ignored if
        non-empty metadata is passed to open_output_stream.
    proxy_options : dict or str, default None
        If a proxy is used, provide the options here. Supported options are:
        'scheme' (str: 'http' or 'https'; required), 'host' (str; required),
        'port' (int; required), 'username' (str; optional),
        'password' (str; optional).
        A proxy URI (str) can also be provided, in which case these options
        will be derived from the provided URI.
        The following are equivalent::

            S3FileSystem(proxy_options='http://username:password@localhost:8020')
            S3FileSystem(proxy_options={'scheme': 'http', 'host': 'localhost',
                                        'port': 8020, 'username': 'username',
                                        'password': 'password'})
    """

    cdef:
        # Typed view of the wrapped filesystem, assigned in init().
        CS3FileSystem* s3fs

    def __init__(self, *, access_key=None, secret_key=None, session_token=None,
                 bint anonymous=False, region=None, scheme=None,
                 endpoint_override=None, bint background_writes=True,
                 default_metadata=None, role_arn=None, session_name=None,
                 external_id=None, load_frequency=900, proxy_options=None):
        cdef:
            CS3Options options
            shared_ptr[CS3FileSystem] wrapped

        # Step 1: validate the credential arguments and pick the matching
        # CS3Options factory. access_key and secret_key must come as a pair.
        if access_key is not None and secret_key is None:
            raise ValueError(
                'In order to initialize with explicit credentials both '
                'access_key and secret_key must be provided, '
                '`secret_key` is not set.'
            )
        elif access_key is None and secret_key is not None:
            raise ValueError(
                'In order to initialize with explicit credentials both '
                'access_key and secret_key must be provided, '
                '`access_key` is not set.'
            )

        elif session_token is not None and (access_key is None or
                                            secret_key is None):
            raise ValueError(
                'In order to initialize a session with temporary credentials, '
                'both secret_key and access_key must be provided in addition '
                'to session_token.'
            )

        elif (access_key is not None or secret_key is not None):
            # Explicit credentials: mutually exclusive with anonymous access
            # and with role assumption.
            if anonymous:
                raise ValueError(
                    'Cannot pass anonymous=True together with access_key '
                    'and secret_key.')

            if role_arn:
                raise ValueError(
                    'Cannot provide role_arn with access_key and secret_key')

            if session_token is None:
                session_token = ""

            options = CS3Options.FromAccessKey(
                tobytes(access_key),
                tobytes(secret_key),
                tobytes(session_token)
            )
        elif anonymous:
            if role_arn:
                raise ValueError(
                    'Cannot provide role_arn with anonymous=True')

            options = CS3Options.Anonymous()
        elif role_arn:
            # session_name and external_id are optional and default to None.
            # Substitute an empty string so that a string, never None, is
            # handed to FromAssumeRole (same treatment as session_token
            # in the explicit-credentials branch above).
            if session_name is None:
                session_name = ''
            if external_id is None:
                external_id = ''

            options = CS3Options.FromAssumeRole(
                tobytes(role_arn),
                tobytes(session_name),
                tobytes(external_id),
                load_frequency
            )
        else:
            options = CS3Options.Defaults()

        # Step 2: apply the optional overrides on top of the chosen options.
        if region is not None:
            options.region = tobytes(region)
        if scheme is not None:
            options.scheme = tobytes(scheme)
        if endpoint_override is not None:
            options.endpoint_override = tobytes(endpoint_override)
        if background_writes is not None:
            options.background_writes = background_writes
        if default_metadata is not None:
            # Accept any mapping by converting it to KeyValueMetadata first.
            if not isinstance(default_metadata, KeyValueMetadata):
                default_metadata = KeyValueMetadata(default_metadata)
            options.default_metadata = pyarrow_unwrap_metadata(
                default_metadata)

        # Step 3: proxy configuration, given either as a dict of fields or
        # as a single URI string.
        if proxy_options is not None:
            if isinstance(proxy_options, dict):
                # 'scheme', 'host' and 'port' are required keys; a missing
                # one raises KeyError here.
                options.proxy_options.scheme = tobytes(proxy_options["scheme"])
                options.proxy_options.host = tobytes(proxy_options["host"])
                options.proxy_options.port = proxy_options["port"]
                # Empty or missing username/password are left unset.
                proxy_username = proxy_options.get("username", None)
                if proxy_username:
                    options.proxy_options.username = tobytes(proxy_username)
                proxy_password = proxy_options.get("password", None)
                if proxy_password:
                    options.proxy_options.password = tobytes(proxy_password)
            elif isinstance(proxy_options, str):
                options.proxy_options = GetResultValue(
                    CS3ProxyOptions.FromUriString(tobytes(proxy_options)))
            else:
                raise TypeError(
                    "'proxy_options': expected 'dict' or 'str', "
                    f"got {type(proxy_options)} instead.")

        # Step 4: build the filesystem with the GIL released.
        with nogil:
            wrapped = GetResultValue(CS3FileSystem.Make(options))

        self.init(<shared_ptr[CFileSystem]> wrapped)

    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
        # Let the base class store the shared pointer, then keep a raw
        # pointer cast to CS3FileSystem for the S3-specific calls below.
        FileSystem.init(self, wrapped)
        self.s3fs = <CS3FileSystem*> wrapped.get()

    @classmethod
    def _reconstruct(cls, kwargs):
        """
        Rebuild an instance from the keyword dict produced by __reduce__.
        """
        return cls(**kwargs)

    def __reduce__(self):
        """
        Return ``(S3FileSystem._reconstruct, (kwargs,))`` for pickling.

        The kwargs are read back from the options of the wrapped filesystem.
        Note that explicitly provided credentials (access key, secret key,
        session token) are included in the returned state.
        """
        cdef CS3Options opts = self.s3fs.options()

        # if creds were explicitly provided, then use them
        # else obtain them as they were last time.
        if opts.credentials_kind == CS3CredentialsKind_Explicit:
            access_key = frombytes(opts.GetAccessKey())
            secret_key = frombytes(opts.GetSecretKey())
            session_token = frombytes(opts.GetSessionToken())
        else:
            access_key = None
            secret_key = None
            session_token = None

        return (
            S3FileSystem._reconstruct, (dict(
                access_key=access_key,
                secret_key=secret_key,
                session_token=session_token,
                anonymous=(opts.credentials_kind ==
                           CS3CredentialsKind_Anonymous),
                region=frombytes(opts.region),
                scheme=frombytes(opts.scheme),
                endpoint_override=frombytes(opts.endpoint_override),
                role_arn=frombytes(opts.role_arn),
                session_name=frombytes(opts.session_name),
                external_id=frombytes(opts.external_id),
                load_frequency=opts.load_frequency,
                background_writes=opts.background_writes,
                default_metadata=pyarrow_wrap_metadata(opts.default_metadata),
                proxy_options={'scheme': frombytes(opts.proxy_options.scheme),
                               'host': frombytes(opts.proxy_options.host),
                               'port': opts.proxy_options.port,
                               'username': frombytes(
                                   opts.proxy_options.username),
                               'password': frombytes(
                                   opts.proxy_options.password)}
            ),)
        )

    @property
    def region(self):
        """
        The AWS region this filesystem connects to.
        """
        return frombytes(self.s3fs.region())