// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstring>
#include <memory>
#include <unordered_map>
#include <utility>

#include "arrow/filesystem/hdfs.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/filesystem/util_internal.h"
#include "arrow/io/hdfs.h"
#include "arrow/io/hdfs_internal.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/value_parsing.h"
#include "arrow/util/windows_fixup.h"

namespace arrow {

using internal::ParseValue;
using internal::Uri;

namespace fs {

using internal::GetAbstractPathParent;
using internal::MakeAbstractPathRelative;
using internal::RemoveLeadingSlash;
class HadoopFileSystem::Impl {
 public:
  Impl(HdfsOptions options, const io::IOContext& io_context)
      : options_(std::move(options)), io_context_(io_context) {}

  ~Impl() {
    Status st = Close();
    if (!st.ok()) {
      ARROW_LOG(WARNING) << "Failed to disconnect hdfs client: " << st.ToString();
    }
  }

  Status Init() {
    io::internal::LibHdfsShim* driver_shim;
    RETURN_NOT_OK(ConnectLibHdfs(&driver_shim));
    RETURN_NOT_OK(io::HadoopFileSystem::Connect(&options_.connection_config, &client_));
    return Status::OK();
  }

  Status Close() {
    if (client_) {
      RETURN_NOT_OK(client_->Disconnect());
    }
    return Status::OK();
  }

  HdfsOptions options() const { return options_; }
  Result<FileInfo> GetFileInfo(const std::string& path) {
    // It has unfortunately been a frequent logic error to pass URIs down
    // to GetFileInfo (e.g. ARROW-10264).  Unlike other filesystems, HDFS
    // silently accepts URIs but returns different results than if given the
    // equivalent in-filesystem paths.  Instead of raising cryptic errors
    // later, notify the underlying problem immediately.
    if (path.substr(0, 5) == "hdfs:") {
      return Status::Invalid("GetFileInfo must not be passed a URI, got: ", path);
    }
    FileInfo info;
    io::HdfsPathInfo path_info;
    auto status = client_->GetPathInfo(path, &path_info);
    info.set_path(path);
    if (status.IsIOError()) {
      info.set_type(FileType::NotFound);
      return info;
    }
    RETURN_NOT_OK(status);
    PathInfoToFileInfo(path_info, &info);
    return info;
  }
  Status StatSelector(const std::string& wd, const std::string& path,
                      const FileSelector& select, int nesting_depth,
                      std::vector<FileInfo>* out) {
    std::vector<io::HdfsPathInfo> children;
    Status st = client_->ListDirectory(path, &children);
    if (!st.ok()) {
      if (select.allow_not_found) {
        ARROW_ASSIGN_OR_RAISE(auto info, GetFileInfo(path));
        if (info.type() == FileType::NotFound) {
          return Status::OK();
        }
      }
      return st;
    }
    for (const auto& child_path_info : children) {
      // HDFS returns an absolute "URI" here, need to extract path relative to wd
      // XXX: unfortunately, this is not a real URI as special characters
      // are not %-escaped... hence parsing it as URI would fail.
      std::string child_path;
      if (!wd.empty()) {
        if (child_path_info.name.substr(0, wd.length()) != wd) {
          return Status::IOError("HDFS returned path '", child_path_info.name,
                                 "' that is not a child of '", wd, "'");
        }
        child_path = child_path_info.name.substr(wd.length());
      } else {
        child_path = child_path_info.name;
      }

      FileInfo info;
      info.set_path(child_path);
      PathInfoToFileInfo(child_path_info, &info);
      const bool is_dir = info.type() == FileType::Directory;
      out->push_back(std::move(info));
      if (is_dir && select.recursive && nesting_depth < select.max_recursion) {
        RETURN_NOT_OK(StatSelector(wd, child_path, select, nesting_depth + 1, out));
      }
    }
    return Status::OK();
  }
  Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) {
    // See GetFileInfo(const std::string&) above.
    if (select.base_dir.substr(0, 5) == "hdfs:") {
      return Status::Invalid("FileSelector.base_dir must not be a URI, got: ",
                             select.base_dir);
    }

    std::vector<FileInfo> results;

    // Fetch working directory.
    // If select.base_dir is relative, we need to trim it from the start
    // of paths returned by ListDirectory.
    // If select.base_dir is absolute, we need to trim the "URI authority"
    // portion of the working directory.
    std::string wd;
    RETURN_NOT_OK(client_->GetWorkingDirectory(&wd));

    if (!select.base_dir.empty() && select.base_dir.front() == '/') {
      // base_dir is absolute, only keep the URI authority portion.
      // As mentioned in StatSelector() above, the URI may contain unescaped
      // special chars and therefore may not be a valid URI, so we parse by hand.
      auto pos = wd.find("://");  // start of host:port portion
      if (pos == std::string::npos) {
        return Status::IOError("Unexpected HDFS working directory URI: ", wd);
      }
      pos = wd.find("/", pos + 3);  // end of host:port portion
      if (pos == std::string::npos) {
        return Status::IOError("Unexpected HDFS working directory URI: ", wd);
      }
      wd = wd.substr(0, pos);  // keep up until host:port (included)
    } else if (!wd.empty() && wd.back() != '/') {
      // For a relative lookup, trim leading slashes
      wd += "/";
    }

    if (!select.base_dir.empty()) {
      ARROW_ASSIGN_OR_RAISE(auto info, GetFileInfo(select.base_dir));
      if (info.type() == FileType::File) {
        return Status::IOError(
            "GetFileInfo expects base_dir of selector to be a directory, but '",
            select.base_dir, "' is a file");
      }
    }
    RETURN_NOT_OK(StatSelector(wd, select.base_dir, select, 0, &results));
    return results;
  }
  Status CreateDir(const std::string& path, bool recursive) {
    if (IsDirectory(path)) {
      return Status::OK();
    }
    if (!recursive) {
      const auto parent = GetAbstractPathParent(path).first;
      if (!parent.empty() && !IsDirectory(parent)) {
        return Status::IOError("Cannot create directory '", path,
                               "': parent is not a directory");
      }
    }
    RETURN_NOT_OK(client_->MakeDirectory(path));
    return Status::OK();
  }
  Status DeleteDir(const std::string& path) {
    if (!IsDirectory(path)) {
      return Status::IOError("Cannot delete directory '", path, "': not a directory");
    }
    RETURN_NOT_OK(client_->DeleteDirectory(path));
    return Status::OK();
  }

  Status DeleteDirContents(const std::string& path) {
    if (!IsDirectory(path)) {
      return Status::IOError("Cannot delete contents of directory '", path,
                             "': not a directory");
    }
    std::vector<std::string> file_list;
    RETURN_NOT_OK(client_->GetChildren(path, &file_list));
    for (auto file : file_list) {
      RETURN_NOT_OK(client_->Delete(file, /*recursive=*/true));
    }
    return Status::OK();
  }
  Status DeleteFile(const std::string& path) {
    if (IsDirectory(path)) {
      return Status::IOError("path is a directory");
    }
    RETURN_NOT_OK(client_->Delete(path));
    return Status::OK();
  }

  Status Move(const std::string& src, const std::string& dest) {
    auto st = client_->Rename(src, dest);
    if (st.IsIOError() && IsFile(src) && IsFile(dest)) {
      // Allow file -> file clobber
      RETURN_NOT_OK(client_->Delete(dest));
      st = client_->Rename(src, dest);
    }
    return st;
  }
  Status CopyFile(const std::string& src, const std::string& dest) {
    return client_->Copy(src, dest);
  }

  Result<std::shared_ptr<io::InputStream>> OpenInputStream(const std::string& path) {
    std::shared_ptr<io::HdfsReadableFile> file;
    RETURN_NOT_OK(client_->OpenReadable(path, io_context_, &file));
    return file;
  }

  Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(const std::string& path) {
    std::shared_ptr<io::HdfsReadableFile> file;
    RETURN_NOT_OK(client_->OpenReadable(path, io_context_, &file));
    return file;
  }

  Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(const std::string& path) {
    bool append = false;
    return OpenOutputStreamGeneric(path, append);
  }

  Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(const std::string& path) {
    bool append = true;
    return OpenOutputStreamGeneric(path, append);
  }
 protected:
  const HdfsOptions options_;
  const io::IOContext io_context_;
  std::shared_ptr<::arrow::io::HadoopFileSystem> client_;

  void PathInfoToFileInfo(const io::HdfsPathInfo& info, FileInfo* out) {
    if (info.kind == io::ObjectType::DIRECTORY) {
      out->set_type(FileType::Directory);
      out->set_size(kNoSize);
    } else if (info.kind == io::ObjectType::FILE) {
      out->set_type(FileType::File);
      out->set_size(info.size);
    }
    out->set_mtime(ToTimePoint(info.last_modified_time));
  }
  Result<std::shared_ptr<io::OutputStream>> OpenOutputStreamGeneric(
      const std::string& path, bool append) {
    std::shared_ptr<io::HdfsOutputStream> stream;
    RETURN_NOT_OK(client_->OpenWritable(path, append, options_.buffer_size,
                                        options_.replication,
                                        options_.default_block_size, &stream));
    return stream;
  }

  bool IsDirectory(const std::string& path) {
    io::HdfsPathInfo info;
    return GetPathInfo(path, &info) && info.kind == io::ObjectType::DIRECTORY;
  }

  bool IsFile(const std::string& path) {
    io::HdfsPathInfo info;
    return GetPathInfo(path, &info) && info.kind == io::ObjectType::FILE;
  }

  bool GetPathInfo(const std::string& path, io::HdfsPathInfo* info) {
    return client_->GetPathInfo(path, info).ok();
  }

  TimePoint ToTimePoint(int secs) {
    std::chrono::nanoseconds ns_count(static_cast<int64_t>(secs) * 1000000000);
    return TimePoint(std::chrono::duration_cast<TimePoint::duration>(ns_count));
  }
};
void HdfsOptions::ConfigureEndPoint(std::string host, int port) {
  connection_config.host = std::move(host);
  connection_config.port = port;
}

void HdfsOptions::ConfigureUser(std::string user_name) {
  connection_config.user = std::move(user_name);
}

void HdfsOptions::ConfigureKerberosTicketCachePath(std::string path) {
  connection_config.kerb_ticket = std::move(path);
}

void HdfsOptions::ConfigureReplication(int16_t replication) {
  this->replication = replication;
}

void HdfsOptions::ConfigureBufferSize(int32_t buffer_size) {
  this->buffer_size = buffer_size;
}

void HdfsOptions::ConfigureBlockSize(int64_t default_block_size) {
  this->default_block_size = default_block_size;
}

void HdfsOptions::ConfigureExtraConf(std::string key, std::string val) {
  connection_config.extra_conf.emplace(std::move(key), std::move(val));
}
bool HdfsOptions::Equals(const HdfsOptions& other) const {
  return (buffer_size == other.buffer_size && replication == other.replication &&
          default_block_size == other.default_block_size &&
          connection_config.host == other.connection_config.host &&
          connection_config.port == other.connection_config.port &&
          connection_config.user == other.connection_config.user &&
          connection_config.kerb_ticket == other.connection_config.kerb_ticket &&
          connection_config.extra_conf == other.connection_config.extra_conf);
}
Result<HdfsOptions> HdfsOptions::FromUri(const Uri& uri) {
  HdfsOptions options;

  std::unordered_map<std::string, std::string> options_map;
  ARROW_ASSIGN_OR_RAISE(const auto options_items, uri.query_items());
  for (const auto& kv : options_items) {
    options_map.emplace(kv.first, kv.second);
  }

  std::string host;
  host = uri.scheme() + "://" + uri.host();

  // configure endpoint
  const auto port = uri.port();
  if (port == -1) {
    // default port will be determined by hdfs FileSystem impl
    options.ConfigureEndPoint(host, 0);
  } else {
    options.ConfigureEndPoint(host, port);
  }

  // configure replication
  auto it = options_map.find("replication");
  if (it != options_map.end()) {
    const auto& v = it->second;
    int16_t replication;
    if (!ParseValue<Int16Type>(v.data(), v.size(), &replication)) {
      return Status::Invalid("Invalid value for option 'replication': '", v, "'");
    }
    options.ConfigureReplication(replication);
    options_map.erase(it);
  }

  // configure buffer_size
  it = options_map.find("buffer_size");
  if (it != options_map.end()) {
    const auto& v = it->second;
    int32_t buffer_size;
    if (!ParseValue<Int32Type>(v.data(), v.size(), &buffer_size)) {
      return Status::Invalid("Invalid value for option 'buffer_size': '", v, "'");
    }
    options.ConfigureBufferSize(buffer_size);
    options_map.erase(it);
  }

  // configure default_block_size
  it = options_map.find("default_block_size");
  if (it != options_map.end()) {
    const auto& v = it->second;
    int64_t default_block_size;
    if (!ParseValue<Int64Type>(v.data(), v.size(), &default_block_size)) {
      return Status::Invalid("Invalid value for option 'default_block_size': '", v, "'");
    }
    options.ConfigureBlockSize(default_block_size);
    options_map.erase(it);
  }

  // configure user
  it = options_map.find("user");
  if (it != options_map.end()) {
    const auto& user = it->second;
    options.ConfigureUser(user);
    options_map.erase(it);
  }

  // configure kerberos
  it = options_map.find("kerb_ticket");
  if (it != options_map.end()) {
    const auto& ticket = it->second;
    options.ConfigureKerberosTicketCachePath(ticket);
    options_map.erase(it);
  }

  // configure other options
  for (const auto& it : options_map) {
    options.ConfigureExtraConf(it.first, it.second);
  }

  return options;
}
Result<HdfsOptions> HdfsOptions::FromUri(const std::string& uri_string) {
  Uri uri;
  RETURN_NOT_OK(uri.Parse(uri_string));
  return FromUri(uri);
}
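// Illustrative note (not part of the upstream file): FromUri() recognizes the
// query options handled above ("replication", "buffer_size",
// "default_block_size", "user", "kerb_ticket"); any other query option is
// forwarded to libhdfs as extra configuration via ConfigureExtraConf().
// Hostnames and values below are placeholders, e.g.:
//
//   hdfs://namenode:8020/?user=alice&replication=3
//   hdfs://namenode/?buffer_size=65536&default_block_size=134217728&kerb_ticket=/tmp/krb5cc_1000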
HadoopFileSystem::HadoopFileSystem(const HdfsOptions& options,
                                   const io::IOContext& io_context)
    : FileSystem(io_context), impl_(new Impl{options, io_context_}) {
  default_async_is_sync_ = false;
}

HadoopFileSystem::~HadoopFileSystem() {}

Result<std::shared_ptr<HadoopFileSystem>> HadoopFileSystem::Make(
    const HdfsOptions& options, const io::IOContext& io_context) {
  std::shared_ptr<HadoopFileSystem> ptr(new HadoopFileSystem(options, io_context));
  RETURN_NOT_OK(ptr->impl_->Init());
  return ptr;
}
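// Illustrative usage sketch (an assumption, not part of the upstream file): it
// relies on the declarations in "arrow/filesystem/hdfs.h" and on
// io::default_io_context(); host, port, user and paths are placeholders.
//
//   HdfsOptions options;
//   options.ConfigureEndPoint("namenode.example.com", 8020);
//   options.ConfigureUser("alice");
//   auto maybe_fs = HadoopFileSystem::Make(options, io::default_io_context());
//   if (maybe_fs.ok()) {
//     std::shared_ptr<HadoopFileSystem> fs = *maybe_fs;
//     FileSelector sel;
//     sel.base_dir = "/data";
//     sel.recursive = true;
//     auto infos = fs->GetFileInfo(sel);  // Result<std::vector<FileInfo>>
//   }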
Result<FileInfo> HadoopFileSystem::GetFileInfo(const std::string& path) {
  return impl_->GetFileInfo(path);
}

HdfsOptions HadoopFileSystem::options() const { return impl_->options(); }

bool HadoopFileSystem::Equals(const FileSystem& other) const {
  if (this == &other) {
    return true;
  }
  if (other.type_name() != type_name()) {
    return false;
  }
  const auto& hdfs = ::arrow::internal::checked_cast<const HadoopFileSystem&>(other);
  return options().Equals(hdfs.options());
}

Result<std::vector<FileInfo>> HadoopFileSystem::GetFileInfo(const FileSelector& select) {
  return impl_->GetFileInfo(select);
}

Status HadoopFileSystem::CreateDir(const std::string& path, bool recursive) {
  return impl_->CreateDir(path, recursive);
}

Status HadoopFileSystem::DeleteDir(const std::string& path) {
  return impl_->DeleteDir(path);
}

Status HadoopFileSystem::DeleteDirContents(const std::string& path) {
  if (internal::IsEmptyPath(path)) {
    return internal::InvalidDeleteDirContents(path);
  }
  return impl_->DeleteDirContents(path);
}

Status HadoopFileSystem::DeleteRootDirContents() { return impl_->DeleteDirContents(""); }

Status HadoopFileSystem::DeleteFile(const std::string& path) {
  return impl_->DeleteFile(path);
}

Status HadoopFileSystem::Move(const std::string& src, const std::string& dest) {
  return impl_->Move(src, dest);
}

Status HadoopFileSystem::CopyFile(const std::string& src, const std::string& dest) {
  return impl_->CopyFile(src, dest);
}

Result<std::shared_ptr<io::InputStream>> HadoopFileSystem::OpenInputStream(
    const std::string& path) {
  return impl_->OpenInputStream(path);
}

Result<std::shared_ptr<io::RandomAccessFile>> HadoopFileSystem::OpenInputFile(
    const std::string& path) {
  return impl_->OpenInputFile(path);
}

Result<std::shared_ptr<io::OutputStream>> HadoopFileSystem::OpenOutputStream(
    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
  return impl_->OpenOutputStream(path);
}

Result<std::shared_ptr<io::OutputStream>> HadoopFileSystem::OpenAppendStream(
    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
  return impl_->OpenAppendStream(path);
}

}  // namespace fs
}  // namespace arrow
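// Illustrative write/read sketch (an assumed call pattern against the methods
// defined above, not part of the upstream file; the path is a placeholder):
//
//   auto maybe_out = fs->OpenOutputStream("/data/example.bin", /*metadata=*/nullptr);
//   if (maybe_out.ok()) {
//     auto out = *maybe_out;
//     auto st = out->Write("hello", 5);  // io::OutputStream::Write(data, nbytes)
//     if (st.ok()) st = out->Close();
//   }
//   auto maybe_in = fs->OpenInputStream("/data/example.bin");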