// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <chrono>
#include <cstring>
#include <unordered_map>
#include <utility>

#include "arrow/filesystem/hdfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/io/hdfs.h"
#include "arrow/io/hdfs_internal.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/value_parsing.h"
#include "arrow/util/windows_fixup.h"

namespace arrow {

using internal::ParseValue;
using internal::Uri;

namespace fs {

using internal::GetAbstractPathParent;
using internal::MakeAbstractPathRelative;
using internal::RemoveLeadingSlash;

class HadoopFileSystem::Impl {
 public:
  Impl(HdfsOptions options, const io::IOContext& io_context)
      : options_(std::move(options)), io_context_(io_context) {}

  ~Impl() {
    Status st = Close();
    if (!st.ok()) {
      ARROW_LOG(WARNING) << "Failed to disconnect hdfs client: " << st.ToString();
    }
  }

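  // Verify that the libhdfs driver can be loaded before connecting the client,
  // so that a missing or broken libhdfs installation fails fast.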
  Status Init() {
    io::internal::LibHdfsShim* driver_shim;
    RETURN_NOT_OK(ConnectLibHdfs(&driver_shim));
    RETURN_NOT_OK(io::HadoopFileSystem::Connect(&options_.connection_config, &client_));
    return Status::OK();
  }

  Status Close() {
    if (client_) {
      RETURN_NOT_OK(client_->Disconnect());
    }
    return Status::OK();
  }

  HdfsOptions options() const { return options_; }

  Result<FileInfo> GetFileInfo(const std::string& path) {
    // It has unfortunately been a frequent logic error to pass URIs down
    // to GetFileInfo (e.g. ARROW-10264). Unlike other filesystems, HDFS
    // silently accepts URIs but returns different results than if given the
    // equivalent in-filesystem paths. Instead of raising cryptic errors
    // later, surface the underlying problem immediately.
    if (path.substr(0, 5) == "hdfs:") {
      return Status::Invalid("GetFileInfo must not be passed a URI, got: ", path);
    }
    FileInfo info;
    io::HdfsPathInfo path_info;
    auto status = client_->GetPathInfo(path, &path_info);
    info.set_path(path);
    if (status.IsIOError()) {
      info.set_type(FileType::NotFound);
      return info;
    }

    PathInfoToFileInfo(path_info, &info);
    return info;
  }

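  // Lists `path` and appends a FileInfo for each child to `out`, recursing up
  // to select.max_recursion levels. `wd` is the prefix (working directory or
  // URI authority) to strip from the absolute names returned by HDFS.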
  Status StatSelector(const std::string& wd, const std::string& path,
                      const FileSelector& select, int nesting_depth,
                      std::vector<FileInfo>* out) {
    std::vector<io::HdfsPathInfo> children;
    Status st = client_->ListDirectory(path, &children);
    if (!st.ok()) {
      if (select.allow_not_found) {
        ARROW_ASSIGN_OR_RAISE(auto info, GetFileInfo(path));
        if (info.type() == FileType::NotFound) {
          return Status::OK();
        }
      }
      return st;
    }
    for (const auto& child_path_info : children) {
      // HDFS returns an absolute "URI" here; we need to extract the path
      // relative to wd.
      // XXX: unfortunately, this is not a real URI as special characters
      // are not %-escaped... hence parsing it as a URI would fail.
      std::string child_path;
      if (!wd.empty()) {
        if (child_path_info.name.substr(0, wd.length()) != wd) {
          return Status::IOError("HDFS returned path '", child_path_info.name,
                                 "' that is not a child of '", wd, "'");
        }
        child_path = child_path_info.name.substr(wd.length());
      } else {
        child_path = child_path_info.name;
      }

      FileInfo info;
      info.set_path(child_path);
      PathInfoToFileInfo(child_path_info, &info);
      const bool is_dir = info.type() == FileType::Directory;
      out->push_back(std::move(info));
      if (is_dir && select.recursive && nesting_depth < select.max_recursion) {
        RETURN_NOT_OK(StatSelector(wd, child_path, select, nesting_depth + 1, out));
      }
    }
    return Status::OK();
  }

  Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) {
    // See GetFileInfo(const std::string&) above.
    if (select.base_dir.substr(0, 5) == "hdfs:") {
      return Status::Invalid("FileSelector.base_dir must not be a URI, got: ",
                             select.base_dir);
    }
    std::vector<FileInfo> results;

    // Fetch the working directory.
    // If select.base_dir is relative, we need to trim it from the start
    // of paths returned by ListDirectory.
    // If select.base_dir is absolute, we need to trim the "URI authority"
    // portion of the working directory.
    std::string wd;
    RETURN_NOT_OK(client_->GetWorkingDirectory(&wd));

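    // For example (hypothetical values): a working directory of
    // "hdfs://namenode:8020/user/alice" is trimmed to "hdfs://namenode:8020"
    // when base_dir is absolute, and kept as "hdfs://namenode:8020/user/alice/"
    // (with a trailing slash) when base_dir is relative.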
    if (!select.base_dir.empty() && select.base_dir.front() == '/') {
      // base_dir is absolute, only keep the URI authority portion.
      // As mentioned in StatSelector() above, the URI may contain unescaped
      // special chars and therefore may not be a valid URI, so we parse by hand.
      auto pos = wd.find("://");  // start of host:port portion
      if (pos == std::string::npos) {
        return Status::IOError("Unexpected HDFS working directory URI: ", wd);
      }
      pos = wd.find("/", pos + 3);  // end of host:port portion
      if (pos == std::string::npos) {
        return Status::IOError("Unexpected HDFS working directory URI: ", wd);
      }
      wd = wd.substr(0, pos);  // keep up until host:port (included)
    } else if (!wd.empty() && wd.back() != '/') {
      // base_dir is relative: ensure the working directory ends with a slash
      // so that it can be stripped as a prefix of the returned child paths.
      wd += '/';
    }

    if (!select.base_dir.empty()) {
      ARROW_ASSIGN_OR_RAISE(auto info, GetFileInfo(select.base_dir));
      if (info.type() == FileType::File) {
        return Status::IOError(
            "GetFileInfo expects base_dir of selector to be a directory, but '",
            select.base_dir, "' is a file");
      }
    }
    RETURN_NOT_OK(StatSelector(wd, select.base_dir, select, 0, &results));
    return results;
  }

  Status CreateDir(const std::string& path, bool recursive) {
    if (IsDirectory(path)) {
      return Status::OK();
    }
    if (!recursive) {
      const auto parent = GetAbstractPathParent(path).first;
      if (!parent.empty() && !IsDirectory(parent)) {
        return Status::IOError("Cannot create directory '", path,
                               "': parent is not a directory");
      }
    }
    RETURN_NOT_OK(client_->MakeDirectory(path));
    return Status::OK();
  }

  Status DeleteDir(const std::string& path) {
    if (!IsDirectory(path)) {
      return Status::IOError("Cannot delete directory '", path, "': not a directory");
    }
    RETURN_NOT_OK(client_->DeleteDirectory(path));
    return Status::OK();
  }

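  // Deletes each child entry individually (recursively for subdirectories)
  // so that the directory itself is left in place.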
  Status DeleteDirContents(const std::string& path) {
    if (!IsDirectory(path)) {
      return Status::IOError("Cannot delete contents of directory '", path,
                             "': not a directory");
    }
    std::vector<std::string> file_list;
    RETURN_NOT_OK(client_->GetChildren(path, &file_list));
    for (const auto& file : file_list) {
      RETURN_NOT_OK(client_->Delete(file, /*recursive=*/true));
    }
    return Status::OK();
  }

  Status DeleteFile(const std::string& path) {
    if (IsDirectory(path)) {
      return Status::IOError("Cannot delete file '", path, "': it is a directory");
    }
    RETURN_NOT_OK(client_->Delete(path));
    return Status::OK();
  }

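  // HDFS Rename() refuses to overwrite an existing destination; to honor the
  // FileSystem contract of replacing a destination file, delete it and retry
  // the rename.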
  Status Move(const std::string& src, const std::string& dest) {
    auto st = client_->Rename(src, dest);
    if (st.IsIOError() && IsFile(src) && IsFile(dest)) {
      // Allow file -> file clobber
      RETURN_NOT_OK(client_->Delete(dest));
      st = client_->Rename(src, dest);
    }
    return st;
  }

  Status CopyFile(const std::string& src, const std::string& dest) {
    return client_->Copy(src, dest);
  }

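  // Both methods open the same HdfsReadableFile; OpenInputStream merely
  // exposes it through the narrower sequential InputStream interface.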
  Result<std::shared_ptr<io::InputStream>> OpenInputStream(const std::string& path) {
    std::shared_ptr<io::HdfsReadableFile> file;
    RETURN_NOT_OK(client_->OpenReadable(path, io_context_, &file));
    return file;
  }

  Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(const std::string& path) {
    std::shared_ptr<io::HdfsReadableFile> file;
    RETURN_NOT_OK(client_->OpenReadable(path, io_context_, &file));
    return file;
  }

  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(const std::string& path) {
    bool append = false;
    return OpenOutputStreamGeneric(path, append);
  }

  Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(const std::string& path) {
    bool append = true;
    return OpenOutputStreamGeneric(path, append);
  }

 protected:
  const HdfsOptions options_;
  const io::IOContext io_context_;
  std::shared_ptr<::arrow::io::HadoopFileSystem> client_;

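  // Maps libhdfs path metadata onto Arrow's FileInfo. Directories get
  // kNoSize, since a directory size is not meaningful here.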
  void PathInfoToFileInfo(const io::HdfsPathInfo& info, FileInfo* out) {
    if (info.kind == io::ObjectType::DIRECTORY) {
      out->set_type(FileType::Directory);
      out->set_size(kNoSize);
    } else if (info.kind == io::ObjectType::FILE) {
      out->set_type(FileType::File);
      out->set_size(info.size);
    }
    out->set_mtime(ToTimePoint(info.last_modified_time));
  }

  Result<std::shared_ptr<io::OutputStream>> OpenOutputStreamGeneric(
      const std::string& path, bool append) {
    std::shared_ptr<io::HdfsOutputStream> stream;
    RETURN_NOT_OK(client_->OpenWritable(path, append, options_.buffer_size,
                                        options_.replication,
                                        options_.default_block_size, &stream));
    return stream;
  }

  bool IsDirectory(const std::string& path) {
    io::HdfsPathInfo info;
    return GetPathInfo(path, &info) && info.kind == io::ObjectType::DIRECTORY;
  }

  bool IsFile(const std::string& path) {
    io::HdfsPathInfo info;
    return GetPathInfo(path, &info) && info.kind == io::ObjectType::FILE;
  }

  bool GetPathInfo(const std::string& path, io::HdfsPathInfo* info) {
    return client_->GetPathInfo(path, info).ok();
  }

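  // HDFS reports modification time in whole seconds; widen to int64_t before
  // scaling to nanoseconds to avoid 32-bit overflow.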
  TimePoint ToTimePoint(int secs) {
    std::chrono::nanoseconds ns_count(static_cast<int64_t>(secs) * 1000000000);
    return TimePoint(std::chrono::duration_cast<TimePoint::duration>(ns_count));
  }
};

void HdfsOptions::ConfigureEndPoint(std::string host, int port) {
  connection_config.host = std::move(host);
  connection_config.port = port;
}

void HdfsOptions::ConfigureUser(std::string user_name) {
  connection_config.user = std::move(user_name);
}

void HdfsOptions::ConfigureKerberosTicketCachePath(std::string path) {
  connection_config.kerb_ticket = std::move(path);
}

void HdfsOptions::ConfigureReplication(int16_t replication) {
  this->replication = replication;
}

void HdfsOptions::ConfigureBufferSize(int32_t buffer_size) {
  this->buffer_size = buffer_size;
}

void HdfsOptions::ConfigureBlockSize(int64_t default_block_size) {
  this->default_block_size = default_block_size;
}

void HdfsOptions::ConfigureExtraConf(std::string key, std::string val) {
  connection_config.extra_conf.emplace(std::move(key), std::move(val));
}

bool HdfsOptions::Equals(const HdfsOptions& other) const {
  return (buffer_size == other.buffer_size && replication == other.replication &&
          default_block_size == other.default_block_size &&
          connection_config.host == other.connection_config.host &&
          connection_config.port == other.connection_config.port &&
          connection_config.user == other.connection_config.user &&
          connection_config.kerb_ticket == other.connection_config.kerb_ticket &&
          connection_config.extra_conf == other.connection_config.extra_conf);
}

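// Builds options from a URI of the form (hypothetical values)
// "hdfs://namenode:8020/?replication=3&user=alice". The recognized query keys
// are replication, buffer_size, default_block_size, user and kerb_ticket; any
// remaining key/value pairs are forwarded verbatim as extra libhdfs
// configuration.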
Result<HdfsOptions> HdfsOptions::FromUri(const Uri& uri) {
  HdfsOptions options;

  std::unordered_map<std::string, std::string> options_map;
  ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
  for (const auto& kv : options_items) {
    options_map.emplace(kv.first, kv.second);
  }

  const std::string host = uri.scheme() + "://" + uri.host();

  // configure endpoint
  const auto port = uri.port();
  if (port == -1) {
    // default port will be determined by the hdfs FileSystem impl
    options.ConfigureEndPoint(host, 0);
  } else {
    options.ConfigureEndPoint(host, port);
  }

  // configure replication
  auto it = options_map.find("replication");
  if (it != options_map.end()) {
    const auto& v = it->second;
    int16_t replication;
    if (!ParseValue<Int16Type>(v.data(), v.size(), &replication)) {
      return Status::Invalid("Invalid value for option 'replication': '", v, "'");
    }
    options.ConfigureReplication(replication);
    options_map.erase(it);
  }

  // configure buffer_size
  it = options_map.find("buffer_size");
  if (it != options_map.end()) {
    const auto& v = it->second;
    int32_t buffer_size;
    if (!ParseValue<Int32Type>(v.data(), v.size(), &buffer_size)) {
      return Status::Invalid("Invalid value for option 'buffer_size': '", v, "'");
    }
    options.ConfigureBufferSize(buffer_size);
    options_map.erase(it);
  }

  // configure default_block_size
  it = options_map.find("default_block_size");
  if (it != options_map.end()) {
    const auto& v = it->second;
    int64_t default_block_size;
    if (!ParseValue<Int64Type>(v.data(), v.size(), &default_block_size)) {
      return Status::Invalid("Invalid value for option 'default_block_size': '", v, "'");
    }
    options.ConfigureBlockSize(default_block_size);
    options_map.erase(it);
  }

  // configure user
  it = options_map.find("user");
  if (it != options_map.end()) {
    const auto& user = it->second;
    options.ConfigureUser(user);
    options_map.erase(it);
  }

  // configure kerberos
  it = options_map.find("kerb_ticket");
  if (it != options_map.end()) {
    const auto& ticket = it->second;
    options.ConfigureKerberosTicketCachePath(ticket);
    options_map.erase(it);
  }

  // configure other options
  for (const auto& kv : options_map) {
    options.ConfigureExtraConf(kv.first, kv.second);
  }

  return options;
}

Result<HdfsOptions> HdfsOptions::FromUri(const std::string& uri_string) {
  Uri uri;
  RETURN_NOT_OK(uri.Parse(uri_string));
  return FromUri(uri);
}

HadoopFileSystem::HadoopFileSystem(const HdfsOptions& options,
                                   const io::IOContext& io_context)
    : FileSystem(io_context), impl_(new Impl{options, io_context_}) {
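  // Run the generic async variants on the I/O executor rather than inline.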
  default_async_is_sync_ = false;
}

HadoopFileSystem::~HadoopFileSystem() {}

Result<std::shared_ptr<HadoopFileSystem>> HadoopFileSystem::Make(
    const HdfsOptions& options, const io::IOContext& io_context) {
  std::shared_ptr<HadoopFileSystem> ptr(new HadoopFileSystem(options, io_context));
  RETURN_NOT_OK(ptr->impl_->Init());
  return ptr;
}

Result<FileInfo> HadoopFileSystem::GetFileInfo(const std::string& path) {
  return impl_->GetFileInfo(path);
}

HdfsOptions HadoopFileSystem::options() const { return impl_->options(); }

bool HadoopFileSystem::Equals(const FileSystem& other) const {
  if (this == &other) {
    return true;
  }
  if (other.type_name() != type_name()) {
    return false;
  }
  const auto& hdfs = ::arrow::internal::checked_cast<const HadoopFileSystem&>(other);
  return options().Equals(hdfs.options());
}

Result<std::vector<FileInfo>> HadoopFileSystem::GetFileInfo(const FileSelector& select) {
  return impl_->GetFileInfo(select);
}

Status HadoopFileSystem::CreateDir(const std::string& path, bool recursive) {
  return impl_->CreateDir(path, recursive);
}

Status HadoopFileSystem::DeleteDir(const std::string& path) {
  return impl_->DeleteDir(path);
}

Status HadoopFileSystem::DeleteDirContents(const std::string& path) {
  if (internal::IsEmptyPath(path)) {
    return internal::InvalidDeleteDirContents(path);
  }
  return impl_->DeleteDirContents(path);
}

Status HadoopFileSystem::DeleteRootDirContents() { return impl_->DeleteDirContents(""); }

Status HadoopFileSystem::DeleteFile(const std::string& path) {
  return impl_->DeleteFile(path);
}

Status HadoopFileSystem::Move(const std::string& src, const std::string& dest) {
  return impl_->Move(src, dest);
}

Status HadoopFileSystem::CopyFile(const std::string& src, const std::string& dest) {
  return impl_->CopyFile(src, dest);
}

Result<std::shared_ptr<io::InputStream>> HadoopFileSystem::OpenInputStream(
    const std::string& path) {
  return impl_->OpenInputStream(path);
}

Result<std::shared_ptr<io::RandomAccessFile>> HadoopFileSystem::OpenInputFile(
    const std::string& path) {
  return impl_->OpenInputFile(path);
}

Result<std::shared_ptr<io::OutputStream>> HadoopFileSystem::OpenOutputStream(
    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
  return impl_->OpenOutputStream(path);
}

Result<std::shared_ptr<io::OutputStream>> HadoopFileSystem::OpenAppendStream(
    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
  return impl_->OpenAppendStream(path);
}

}  // namespace fs
}  // namespace arrow