]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/filesystem/hdfs.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / filesystem / hdfs.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <chrono>
19 #include <cstring>
20 #include <unordered_map>
21 #include <utility>
22
23 #include "arrow/filesystem/hdfs.h"
24 #include "arrow/filesystem/path_util.h"
25 #include "arrow/filesystem/util_internal.h"
26 #include "arrow/io/hdfs.h"
27 #include "arrow/io/hdfs_internal.h"
28 #include "arrow/util/checked_cast.h"
29 #include "arrow/util/logging.h"
30 #include "arrow/util/value_parsing.h"
31 #include "arrow/util/windows_fixup.h"
32
33 namespace arrow {
34
35 using internal::ParseValue;
36 using internal::Uri;
37
38 namespace fs {
39
40 using internal::GetAbstractPathParent;
41 using internal::MakeAbstractPathRelative;
42 using internal::RemoveLeadingSlash;
43
44 class HadoopFileSystem::Impl {
45 public:
46 Impl(HdfsOptions options, const io::IOContext& io_context)
47 : options_(std::move(options)), io_context_(io_context) {}
48
49 ~Impl() {
50 Status st = Close();
51 if (!st.ok()) {
52 ARROW_LOG(WARNING) << "Failed to disconnect hdfs client: " << st.ToString();
53 }
54 }
55
56 Status Init() {
57 io::internal::LibHdfsShim* driver_shim;
58 RETURN_NOT_OK(ConnectLibHdfs(&driver_shim));
59 RETURN_NOT_OK(io::HadoopFileSystem::Connect(&options_.connection_config, &client_));
60 return Status::OK();
61 }
62
63 Status Close() {
64 if (client_) {
65 RETURN_NOT_OK(client_->Disconnect());
66 }
67 return Status::OK();
68 }
69
70 HdfsOptions options() const { return options_; }
71
72 Result<FileInfo> GetFileInfo(const std::string& path) {
73 // It has unfortunately been a frequent logic error to pass URIs down
74 // to GetFileInfo (e.g. ARROW-10264). Unlike other filesystems, HDFS
75 // silently accepts URIs but returns different results than if given the
76 // equivalent in-filesystem paths. Instead of raising cryptic errors
77 // later, notify the underlying problem immediately.
78 if (path.substr(0, 5) == "hdfs:") {
79 return Status::Invalid("GetFileInfo must not be passed a URI, got: ", path);
80 }
81 FileInfo info;
82 io::HdfsPathInfo path_info;
83 auto status = client_->GetPathInfo(path, &path_info);
84 info.set_path(path);
85 if (status.IsIOError()) {
86 info.set_type(FileType::NotFound);
87 return info;
88 }
89
90 PathInfoToFileInfo(path_info, &info);
91 return info;
92 }
93
94 Status StatSelector(const std::string& wd, const std::string& path,
95 const FileSelector& select, int nesting_depth,
96 std::vector<FileInfo>* out) {
97 std::vector<io::HdfsPathInfo> children;
98 Status st = client_->ListDirectory(path, &children);
99 if (!st.ok()) {
100 if (select.allow_not_found) {
101 ARROW_ASSIGN_OR_RAISE(auto info, GetFileInfo(path));
102 if (info.type() == FileType::NotFound) {
103 return Status::OK();
104 }
105 }
106 return st;
107 }
108 for (const auto& child_path_info : children) {
109 // HDFS returns an absolute "URI" here, need to extract path relative to wd
110 // XXX: unfortunately, this is not a real URI as special characters
111 // are not %-escaped... hence parsing it as URI would fail.
112 std::string child_path;
113 if (!wd.empty()) {
114 if (child_path_info.name.substr(0, wd.length()) != wd) {
115 return Status::IOError("HDFS returned path '", child_path_info.name,
116 "' that is not a child of '", wd, "'");
117 }
118 child_path = child_path_info.name.substr(wd.length());
119 } else {
120 child_path = child_path_info.name;
121 }
122
123 FileInfo info;
124 info.set_path(child_path);
125 PathInfoToFileInfo(child_path_info, &info);
126 const bool is_dir = info.type() == FileType::Directory;
127 out->push_back(std::move(info));
128 if (is_dir && select.recursive && nesting_depth < select.max_recursion) {
129 RETURN_NOT_OK(StatSelector(wd, child_path, select, nesting_depth + 1, out));
130 }
131 }
132 return Status::OK();
133 }
134
135 Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) {
136 // See GetFileInfo(const std::string&) above.
137 if (select.base_dir.substr(0, 5) == "hdfs:") {
138 return Status::Invalid("FileSelector.base_dir must not be a URI, got: ",
139 select.base_dir);
140 }
141 std::vector<FileInfo> results;
142
143 // Fetch working directory.
144 // If select.base_dir is relative, we need to trim it from the start
145 // of paths returned by ListDirectory.
146 // If select.base_dir is absolute, we need to trim the "URI authority"
147 // portion of the working directory.
148 std::string wd;
149 RETURN_NOT_OK(client_->GetWorkingDirectory(&wd));
150
151 if (!select.base_dir.empty() && select.base_dir.front() == '/') {
152 // base_dir is absolute, only keep the URI authority portion.
153 // As mentioned in StatSelector() above, the URI may contain unescaped
154 // special chars and therefore may not be a valid URI, so we parse by hand.
155 auto pos = wd.find("://"); // start of host:port portion
156 if (pos == std::string::npos) {
157 return Status::IOError("Unexpected HDFS working directory URI: ", wd);
158 }
159 pos = wd.find("/", pos + 3); // end of host:port portion
160 if (pos == std::string::npos) {
161 return Status::IOError("Unexpected HDFS working directory URI: ", wd);
162 }
163 wd = wd.substr(0, pos); // keep up until host:port (included)
164 } else if (!wd.empty() && wd.back() != '/') {
165 // For a relative lookup, trim leading slashes
166 wd += '/';
167 }
168
169 if (!select.base_dir.empty()) {
170 ARROW_ASSIGN_OR_RAISE(auto info, GetFileInfo(select.base_dir));
171 if (info.type() == FileType::File) {
172 return Status::IOError(
173 "GetFileInfo expects base_dir of selector to be a directory, but '",
174 select.base_dir, "' is a file");
175 }
176 }
177 RETURN_NOT_OK(StatSelector(wd, select.base_dir, select, 0, &results));
178 return results;
179 }
180
181 Status CreateDir(const std::string& path, bool recursive) {
182 if (IsDirectory(path)) {
183 return Status::OK();
184 }
185 if (!recursive) {
186 const auto parent = GetAbstractPathParent(path).first;
187 if (!parent.empty() && !IsDirectory(parent)) {
188 return Status::IOError("Cannot create directory '", path,
189 "': parent is not a directory");
190 }
191 }
192 RETURN_NOT_OK(client_->MakeDirectory(path));
193 return Status::OK();
194 }
195
196 Status DeleteDir(const std::string& path) {
197 if (!IsDirectory(path)) {
198 return Status::IOError("Cannot delete directory '", path, "': not a directory");
199 }
200 RETURN_NOT_OK(client_->DeleteDirectory(path));
201 return Status::OK();
202 }
203
204 Status DeleteDirContents(const std::string& path) {
205 if (!IsDirectory(path)) {
206 return Status::IOError("Cannot delete contents of directory '", path,
207 "': not a directory");
208 }
209 std::vector<std::string> file_list;
210 RETURN_NOT_OK(client_->GetChildren(path, &file_list));
211 for (auto file : file_list) {
212 RETURN_NOT_OK(client_->Delete(file, /*recursive=*/true));
213 }
214 return Status::OK();
215 }
216
217 Status DeleteFile(const std::string& path) {
218 if (IsDirectory(path)) {
219 return Status::IOError("path is a directory");
220 }
221 RETURN_NOT_OK(client_->Delete(path));
222 return Status::OK();
223 }
224
225 Status Move(const std::string& src, const std::string& dest) {
226 auto st = client_->Rename(src, dest);
227 if (st.IsIOError() && IsFile(src) && IsFile(dest)) {
228 // Allow file -> file clobber
229 RETURN_NOT_OK(client_->Delete(dest));
230 st = client_->Rename(src, dest);
231 }
232 return st;
233 }
234
235 Status CopyFile(const std::string& src, const std::string& dest) {
236 return client_->Copy(src, dest);
237 }
238
239 Result<std::shared_ptr<io::InputStream>> OpenInputStream(const std::string& path) {
240 std::shared_ptr<io::HdfsReadableFile> file;
241 RETURN_NOT_OK(client_->OpenReadable(path, io_context_, &file));
242 return file;
243 }
244
245 Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(const std::string& path) {
246 std::shared_ptr<io::HdfsReadableFile> file;
247 RETURN_NOT_OK(client_->OpenReadable(path, io_context_, &file));
248 return file;
249 }
250
251 Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(const std::string& path) {
252 bool append = false;
253 return OpenOutputStreamGeneric(path, append);
254 }
255
256 Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(const std::string& path) {
257 bool append = true;
258 return OpenOutputStreamGeneric(path, append);
259 }
260
261 protected:
262 const HdfsOptions options_;
263 const io::IOContext io_context_;
264 std::shared_ptr<::arrow::io::HadoopFileSystem> client_;
265
266 void PathInfoToFileInfo(const io::HdfsPathInfo& info, FileInfo* out) {
267 if (info.kind == io::ObjectType::DIRECTORY) {
268 out->set_type(FileType::Directory);
269 out->set_size(kNoSize);
270 } else if (info.kind == io::ObjectType::FILE) {
271 out->set_type(FileType::File);
272 out->set_size(info.size);
273 }
274 out->set_mtime(ToTimePoint(info.last_modified_time));
275 }
276
277 Result<std::shared_ptr<io::OutputStream>> OpenOutputStreamGeneric(
278 const std::string& path, bool append) {
279 std::shared_ptr<io::HdfsOutputStream> stream;
280 RETURN_NOT_OK(client_->OpenWritable(path, append, options_.buffer_size,
281 options_.replication, options_.default_block_size,
282 &stream));
283 return stream;
284 }
285
286 bool IsDirectory(const std::string& path) {
287 io::HdfsPathInfo info;
288 return GetPathInfo(path, &info) && info.kind == io::ObjectType::DIRECTORY;
289 }
290
291 bool IsFile(const std::string& path) {
292 io::HdfsPathInfo info;
293 return GetPathInfo(path, &info) && info.kind == io::ObjectType::FILE;
294 }
295
296 bool GetPathInfo(const std::string& path, io::HdfsPathInfo* info) {
297 return client_->GetPathInfo(path, info).ok();
298 }
299
300 TimePoint ToTimePoint(int secs) {
301 std::chrono::nanoseconds ns_count(static_cast<int64_t>(secs) * 1000000000);
302 return TimePoint(std::chrono::duration_cast<TimePoint::duration>(ns_count));
303 }
304 };
305
// Set the namenode host and port on the connection configuration.
void HdfsOptions::ConfigureEndPoint(std::string host, int port) {
  connection_config.host = std::move(host);
  connection_config.port = port;
}
310
// Set the username used when connecting to the cluster.
void HdfsOptions::ConfigureUser(std::string user_name) {
  connection_config.user = std::move(user_name);
}
314
// Set the path of the Kerberos ticket cache used for authentication.
void HdfsOptions::ConfigureKerberosTicketCachePath(std::string path) {
  connection_config.kerb_ticket = std::move(path);
}
318
// Set the replication factor applied to newly written files.
void HdfsOptions::ConfigureReplication(int16_t replication) {
  this->replication = replication;
}
322
// Set the buffer size used when opening writable files.
void HdfsOptions::ConfigureBufferSize(int32_t buffer_size) {
  this->buffer_size = buffer_size;
}
326
// Set the default HDFS block size used when opening writable files.
void HdfsOptions::ConfigureBlockSize(int64_t default_block_size) {
  this->default_block_size = default_block_size;
}
330
// Add an arbitrary Hadoop configuration key/value pair.
// Note: emplace keeps the first value if the key was already present.
void HdfsOptions::ConfigureExtraConf(std::string key, std::string val) {
  connection_config.extra_conf.emplace(std::move(key), std::move(val));
}
334
335 bool HdfsOptions::Equals(const HdfsOptions& other) const {
336 return (buffer_size == other.buffer_size && replication == other.replication &&
337 default_block_size == other.default_block_size &&
338 connection_config.host == other.connection_config.host &&
339 connection_config.port == other.connection_config.port &&
340 connection_config.user == other.connection_config.user &&
341 connection_config.kerb_ticket == other.connection_config.kerb_ticket &&
342 connection_config.extra_conf == other.connection_config.extra_conf);
343 }
344
345 Result<HdfsOptions> HdfsOptions::FromUri(const Uri& uri) {
346 HdfsOptions options;
347
348 std::unordered_map<std::string, std::string> options_map;
349 ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
350 for (const auto& kv : options_items) {
351 options_map.emplace(kv.first, kv.second);
352 }
353
354 std::string host;
355 host = uri.scheme() + "://" + uri.host();
356
357 // configure endpoint
358 const auto port = uri.port();
359 if (port == -1) {
360 // default port will be determined by hdfs FileSystem impl
361 options.ConfigureEndPoint(host, 0);
362 } else {
363 options.ConfigureEndPoint(host, port);
364 }
365
366 // configure replication
367 auto it = options_map.find("replication");
368 if (it != options_map.end()) {
369 const auto& v = it->second;
370 int16_t replication;
371 if (!ParseValue<Int16Type>(v.data(), v.size(), &replication)) {
372 return Status::Invalid("Invalid value for option 'replication': '", v, "'");
373 }
374 options.ConfigureReplication(replication);
375 options_map.erase(it);
376 }
377
378 // configure buffer_size
379 it = options_map.find("buffer_size");
380 if (it != options_map.end()) {
381 const auto& v = it->second;
382 int32_t buffer_size;
383 if (!ParseValue<Int32Type>(v.data(), v.size(), &buffer_size)) {
384 return Status::Invalid("Invalid value for option 'buffer_size': '", v, "'");
385 }
386 options.ConfigureBufferSize(buffer_size);
387 options_map.erase(it);
388 }
389
390 // configure default_block_size
391 it = options_map.find("default_block_size");
392 if (it != options_map.end()) {
393 const auto& v = it->second;
394 int64_t default_block_size;
395 if (!ParseValue<Int64Type>(v.data(), v.size(), &default_block_size)) {
396 return Status::Invalid("Invalid value for option 'default_block_size': '", v, "'");
397 }
398 options.ConfigureBlockSize(default_block_size);
399 options_map.erase(it);
400 }
401
402 // configure user
403 it = options_map.find("user");
404 if (it != options_map.end()) {
405 const auto& user = it->second;
406 options.ConfigureUser(user);
407 options_map.erase(it);
408 }
409
410 // configure kerberos
411 it = options_map.find("kerb_ticket");
412 if (it != options_map.end()) {
413 const auto& ticket = it->second;
414 options.ConfigureKerberosTicketCachePath(ticket);
415 options_map.erase(it);
416 }
417
418 // configure other options
419 for (const auto& it : options_map) {
420 options.ConfigureExtraConf(it.first, it.second);
421 }
422
423 return options;
424 }
425
// Convenience overload: parse `uri_string` and delegate to FromUri(const Uri&).
// Returns an error Status if the string is not a valid URI.
Result<HdfsOptions> HdfsOptions::FromUri(const std::string& uri_string) {
  Uri uri;
  RETURN_NOT_OK(uri.Parse(uri_string));
  return FromUri(uri);
}
431
// Private constructor: the connection is not established here, that happens
// in Make() via Impl::Init().
HadoopFileSystem::HadoopFileSystem(const HdfsOptions& options,
                                   const io::IOContext& io_context)
    : FileSystem(io_context), impl_(new Impl{options, io_context_}) {
  // Async operations are delegated to the IO thread pool rather than run
  // inline (default_async_is_sync_ is defined on the FileSystem base class).
  default_async_is_sync_ = false;
}
437
438 HadoopFileSystem::~HadoopFileSystem() {}
439
// Factory: construct a HadoopFileSystem and connect it to the cluster.
// Two-phase construction lets the fallible Init() (driver load + connect)
// report errors through the returned Result.
Result<std::shared_ptr<HadoopFileSystem>> HadoopFileSystem::Make(
    const HdfsOptions& options, const io::IOContext& io_context) {
  std::shared_ptr<HadoopFileSystem> ptr(new HadoopFileSystem(options, io_context));
  RETURN_NOT_OK(ptr->impl_->Init());
  return ptr;
}
446
// Stat a single path; rejects URIs (see Impl::GetFileInfo).
Result<FileInfo> HadoopFileSystem::GetFileInfo(const std::string& path) {
  return impl_->GetFileInfo(path);
}
450
451 HdfsOptions HadoopFileSystem::options() const { return impl_->options(); }
452
453 bool HadoopFileSystem::Equals(const FileSystem& other) const {
454 if (this == &other) {
455 return true;
456 }
457 if (other.type_name() != type_name()) {
458 return false;
459 }
460 const auto& hdfs = ::arrow::internal::checked_cast<const HadoopFileSystem&>(other);
461 return options().Equals(hdfs.options());
462 }
463
// Stat all entries matching the selector (optionally recursive).
Result<std::vector<FileInfo>> HadoopFileSystem::GetFileInfo(const FileSelector& select) {
  return impl_->GetFileInfo(select);
}
467
// Create a directory; with recursive=true missing ancestors are created too.
Status HadoopFileSystem::CreateDir(const std::string& path, bool recursive) {
  return impl_->CreateDir(path, recursive);
}
471
// Delete a directory and its contents; errors if `path` is not a directory.
Status HadoopFileSystem::DeleteDir(const std::string& path) {
  return impl_->DeleteDir(path);
}
475
// Delete the children of `path`, keeping the directory itself.
// An empty path is rejected here; wiping the root must go through the
// explicit DeleteRootDirContents() entry point instead.
Status HadoopFileSystem::DeleteDirContents(const std::string& path) {
  if (internal::IsEmptyPath(path)) {
    return internal::InvalidDeleteDirContents(path);
  }
  return impl_->DeleteDirContents(path);
}
482
483 Status HadoopFileSystem::DeleteRootDirContents() { return impl_->DeleteDirContents(""); }
484
// Delete a single file; errors if `path` is a directory.
Status HadoopFileSystem::DeleteFile(const std::string& path) {
  return impl_->DeleteFile(path);
}
488
// Rename src to dest; file-over-file clobber is handled in Impl::Move.
Status HadoopFileSystem::Move(const std::string& src, const std::string& dest) {
  return impl_->Move(src, dest);
}
492
// Copy src to dest via the HDFS client's Copy operation.
Status HadoopFileSystem::CopyFile(const std::string& src, const std::string& dest) {
  return impl_->CopyFile(src, dest);
}
496
// Open `path` for sequential reading.
Result<std::shared_ptr<io::InputStream>> HadoopFileSystem::OpenInputStream(
    const std::string& path) {
  return impl_->OpenInputStream(path);
}
501
// Open `path` for random-access reading.
Result<std::shared_ptr<io::RandomAccessFile>> HadoopFileSystem::OpenInputFile(
    const std::string& path) {
  return impl_->OpenInputFile(path);
}
506
// Open `path` for writing (truncating).
// `metadata` is accepted for interface compatibility but not used here.
Result<std::shared_ptr<io::OutputStream>> HadoopFileSystem::OpenOutputStream(
    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
  return impl_->OpenOutputStream(path);
}
511
// Open `path` for appending.
// `metadata` is accepted for interface compatibility but not used here.
Result<std::shared_ptr<io::OutputStream>> HadoopFileSystem::OpenAppendStream(
    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
  return impl_->OpenAppendStream(path);
}
516
517 } // namespace fs
518 } // namespace arrow