// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "arrow/buffer.h"
#include "arrow/io/hdfs.h"
#include "arrow/io/hdfs_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
namespace arrow {

using internal::IOErrorFromErrno;

namespace io {
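// Render an errno value as "code (message)" for inclusion in error Statuses.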
std::string TranslateErrno(int error_code) {
  std::stringstream ss;
  ss << error_code << " (" << strerror(error_code) << ")";
  if (error_code == 255) {
    // Unknown error can occur if the host is correct but the port is not
    ss << " Please check that you are connecting to the correct HDFS RPC port";
  }
  return ss.str();
}
#define CHECK_FAILURE(RETURN_VALUE, WHAT)                                               \
  do {                                                                                  \
    if (RETURN_VALUE == -1) {                                                           \
      return Status::IOError("HDFS ", WHAT, " failed, errno: ", TranslateErrno(errno)); \
    }                                                                                   \
  } while (0)
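// Default buffer size (64 KiB) forwarded to hdfsOpenFile when the caller does not
// request a specific value.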
static constexpr int kDefaultHdfsBufferSize = 1 << 16;
// ----------------------------------------------------------------------
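// State and operations shared by the readable and writable HDFS file wrappers:
// the libhdfs shim, the filesystem/file handles, and Seek/Tell.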
class HdfsAnyFileImpl {
 public:
  void set_members(const std::string& path, internal::LibHdfsShim* driver, hdfsFS fs,
                   hdfsFile handle) {
    path_ = path;
    driver_ = driver;
    fs_ = fs;
    file_ = handle;
    is_open_ = true;
  }
  Status Seek(int64_t position) {
    RETURN_NOT_OK(CheckClosed());
    int ret = driver_->Seek(fs_, file_, position);
    CHECK_FAILURE(ret, "seek");
    return Status::OK();
  }
  Result<int64_t> Tell() {
    RETURN_NOT_OK(CheckClosed());
    int64_t ret = driver_->Tell(fs_, file_);
    CHECK_FAILURE(ret, "tell");
    return ret;
  }
  bool is_open() const { return is_open_; }
 protected:
  Status CheckClosed() {
    if (!is_open_) {
      return Status::Invalid("Operation on closed HDFS file");
    }
    return Status::OK();
  }
  std::string path_;

  internal::LibHdfsShim* driver_;

  // These are pointers in libhdfs, so OK to copy
  hdfsFS fs_;
  hdfsFile file_;

  bool is_open_;
  std::mutex lock_;
};
Status GetPathInfoFailed(const std::string& path) {
  std::stringstream ss;
  ss << "Calling GetPathInfo for " << path << " failed. errno: " << TranslateErrno(errno);
  return Status::IOError(ss.str());
}
// Private implementation for read-only files
class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
 public:
  explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {}
  Status Close() {
    if (is_open_) {
      // is_open_ must be set to false in the beginning, because the destructor
      // attempts to close the stream again, and if the first close fails, then
      // the error doesn't get propagated properly and the second close
      // initiated by the destructor raises a segfault
      is_open_ = false;
      int ret = driver_->CloseFile(fs_, file_);
      CHECK_FAILURE(ret, "CloseFile");
    }
    return Status::OK();
  }
  bool closed() const { return !is_open_; }
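  // Positional read: uses hdfsPread when the loaded libhdfs provides it; otherwise
  // falls back to Seek + Read, serialized under lock_ so concurrent callers do not
  // interleave the seek and the read.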
  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, uint8_t* buffer) {
    RETURN_NOT_OK(CheckClosed());
    if (!driver_->HasPread()) {
      std::lock_guard<std::mutex> guard(lock_);
      RETURN_NOT_OK(Seek(position));
      return Read(nbytes, buffer);
    }

    constexpr int64_t kMaxBlockSize = std::numeric_limits<int32_t>::max();
    int64_t total_bytes = 0;
    while (nbytes > 0) {
      const auto block_size = static_cast<tSize>(std::min(kMaxBlockSize, nbytes));
      tSize ret =
          driver_->Pread(fs_, file_, static_cast<tOffset>(position), buffer, block_size);
      CHECK_FAILURE(ret, "read");
      DCHECK_LE(ret, block_size);
      if (ret == 0) {
        break;
      }
      total_bytes += ret;
      position += ret;
      buffer += ret;
      nbytes -= ret;
    }
    return total_bytes;
  }
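  // Convenience overload: allocates a buffer from pool_, reads into it, and shrinks
  // (and zero-pads) the buffer if fewer than nbytes bytes were available.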
  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) {
    RETURN_NOT_OK(CheckClosed());

    ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
                          ReadAt(position, nbytes, buffer->mutable_data()));
    if (bytes_read < nbytes) {
      RETURN_NOT_OK(buffer->Resize(bytes_read));
      buffer->ZeroPadding();
    }
    return std::move(buffer);
  }
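  // Sequential read at the current file position, issued in chunks of at most
  // buffer_size_ bytes because hdfsRead takes a 32-bit (tSize) length.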
  Result<int64_t> Read(int64_t nbytes, void* buffer) {
    RETURN_NOT_OK(CheckClosed());

    int64_t total_bytes = 0;
    while (total_bytes < nbytes) {
      tSize ret = driver_->Read(
          fs_, file_, reinterpret_cast<uint8_t*>(buffer) + total_bytes,
          static_cast<tSize>(std::min<int64_t>(buffer_size_, nbytes - total_bytes)));
      CHECK_FAILURE(ret, "read");
      total_bytes += ret;
      if (ret == 0) {
        break;
      }
    }
    return total_bytes;
  }
  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) {
    RETURN_NOT_OK(CheckClosed());

    ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data()));
    if (bytes_read < nbytes) {
      RETURN_NOT_OK(buffer->Resize(bytes_read));
    }
    return std::move(buffer);
  }
  Result<int64_t> GetSize() {
    RETURN_NOT_OK(CheckClosed());

    hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str());
    if (entry == nullptr) {
      return GetPathInfoFailed(path_);
    }

    int64_t size = entry->mSize;
    driver_->FreeFileInfo(entry, 1);
    return size;
  }
  void set_memory_pool(MemoryPool* pool) { pool_ = pool; }
  void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; }
 private:
  MemoryPool* pool_;
  int32_t buffer_size_;
};
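// The public HdfsReadableFile methods simply forward to the pimpl above.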
HdfsReadableFile::HdfsReadableFile(const io::IOContext& io_context) {
  impl_.reset(new HdfsReadableFileImpl(io_context.pool()));
}
HdfsReadableFile::~HdfsReadableFile() { DCHECK_OK(impl_->Close()); }
Status HdfsReadableFile::Close() { return impl_->Close(); }
bool HdfsReadableFile::closed() const { return impl_->closed(); }
Result<int64_t> HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, void* buffer) {
  return impl_->ReadAt(position, nbytes, reinterpret_cast<uint8_t*>(buffer));
}
Result<std::shared_ptr<Buffer>> HdfsReadableFile::ReadAt(int64_t position,
                                                         int64_t nbytes) {
  return impl_->ReadAt(position, nbytes);
}
Result<int64_t> HdfsReadableFile::Read(int64_t nbytes, void* buffer) {
  return impl_->Read(nbytes, buffer);
}
Result<std::shared_ptr<Buffer>> HdfsReadableFile::Read(int64_t nbytes) {
  return impl_->Read(nbytes);
}
Result<int64_t> HdfsReadableFile::GetSize() { return impl_->GetSize(); }
Status HdfsReadableFile::Seek(int64_t position) { return impl_->Seek(position); }
Result<int64_t> HdfsReadableFile::Tell() const { return impl_->Tell(); }
// ----------------------------------------------------------------------

// Private implementation for writable-only files
class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
 public:
  HdfsOutputStreamImpl() {}
  Status Close() {
    if (is_open_) {
      // is_open_ must be set to false in the beginning, because the destructor
      // attempts to close the stream again, and if the first close fails, then
      // the error doesn't get propagated properly and the second close
      // initiated by the destructor raises a segfault
      is_open_ = false;
      RETURN_NOT_OK(FlushInternal());
      int ret = driver_->CloseFile(fs_, file_);
      CHECK_FAILURE(ret, "CloseFile");
    }
    return Status::OK();
  }
  bool closed() const { return !is_open_; }
  Status Flush() {
    RETURN_NOT_OK(CheckClosed());
    return FlushInternal();
  }
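  // Writes are serialized under lock_ and issued in chunks of at most INT32_MAX
  // bytes, since hdfsWrite takes a 32-bit (tSize) length argument.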
  Status Write(const uint8_t* buffer, int64_t nbytes) {
    RETURN_NOT_OK(CheckClosed());

    constexpr int64_t kMaxBlockSize = std::numeric_limits<int32_t>::max();

    std::lock_guard<std::mutex> guard(lock_);
    while (nbytes > 0) {
      const auto block_size = static_cast<tSize>(std::min(kMaxBlockSize, nbytes));
      tSize ret = driver_->Write(fs_, file_, buffer, block_size);
      CHECK_FAILURE(ret, "Write");
      DCHECK_LE(ret, block_size);
      buffer += ret;
      nbytes -= ret;
    }
    return Status::OK();
  }
  Status FlushInternal() {
    int ret = driver_->Flush(fs_, file_);
    CHECK_FAILURE(ret, "Flush");
    return Status::OK();
  }
};
HdfsOutputStream::HdfsOutputStream() { impl_.reset(new HdfsOutputStreamImpl()); }
HdfsOutputStream::~HdfsOutputStream() { DCHECK_OK(impl_->Close()); }
Status HdfsOutputStream::Close() { return impl_->Close(); }
bool HdfsOutputStream::closed() const { return impl_->closed(); }
Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes) {
  return impl_->Write(reinterpret_cast<const uint8_t*>(buffer), nbytes);
}
Status HdfsOutputStream::Flush() { return impl_->Flush(); }
Result<int64_t> HdfsOutputStream::Tell() const { return impl_->Tell(); }
// ----------------------------------------------------------------------

// TODO(wesm): this could throw std::bad_alloc in the course of copying strings
// into the path info object
static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) {
  out->kind = input->mKind == kObjectKindFile ? ObjectType::FILE : ObjectType::DIRECTORY;
  out->name = std::string(input->mName);
  out->owner = std::string(input->mOwner);
  out->group = std::string(input->mGroup);

  out->last_access_time = static_cast<int32_t>(input->mLastAccess);
  out->last_modified_time = static_cast<int32_t>(input->mLastMod);
  out->size = static_cast<int64_t>(input->mSize);

  out->replication = input->mReplication;
  out->block_size = input->mBlockSize;

  out->permissions = input->mPermissions;
}
// Private implementation
class HadoopFileSystem::HadoopFileSystemImpl {
 public:
  HadoopFileSystemImpl() : driver_(NULLPTR), port_(0), fs_(NULLPTR) {}
  Status Connect(const HdfsConnectionConfig* config) {
    RETURN_NOT_OK(ConnectLibHdfs(&driver_));

    // connect to HDFS with the builder object
    hdfsBuilder* builder = driver_->NewBuilder();
    if (!config->host.empty()) {
      driver_->BuilderSetNameNode(builder, config->host.c_str());
    }
    driver_->BuilderSetNameNodePort(builder, static_cast<tPort>(config->port));
    if (!config->user.empty()) {
      driver_->BuilderSetUserName(builder, config->user.c_str());
    }
    if (!config->kerb_ticket.empty()) {
      driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
    }

    for (const auto& kv : config->extra_conf) {
      int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str());
      CHECK_FAILURE(ret, "confsetstr");
    }

    driver_->BuilderSetForceNewInstance(builder);
    fs_ = driver_->BuilderConnect(builder);

    if (fs_ == nullptr) {
      return Status::IOError("HDFS connection failed");
    }
    namenode_host_ = config->host;
    port_ = config->port;
    user_ = config->user;
    kerb_ticket_ = config->kerb_ticket;

    return Status::OK();
  }
  Status MakeDirectory(const std::string& path) {
    int ret = driver_->MakeDirectory(fs_, path.c_str());
    CHECK_FAILURE(ret, "create directory");
    return Status::OK();
  }
  Status Delete(const std::string& path, bool recursive) {
    int ret = driver_->Delete(fs_, path.c_str(), static_cast<int>(recursive));
    CHECK_FAILURE(ret, "delete");
    return Status::OK();
  }
  Status Disconnect() {
    int ret = driver_->Disconnect(fs_);
    CHECK_FAILURE(ret, "hdfsFS::Disconnect");
    return Status::OK();
  }
  bool Exists(const std::string& path) {
    // hdfsExists does not distinguish between RPC failure and the file not
    // existing
    int ret = driver_->Exists(fs_, path.c_str());
    return ret == 0;
  }
  Status GetCapacity(int64_t* nbytes) {
    tOffset ret = driver_->GetCapacity(fs_);
    CHECK_FAILURE(ret, "GetCapacity");
    *nbytes = ret;
    return Status::OK();
  }
  Status GetUsed(int64_t* nbytes) {
    tOffset ret = driver_->GetUsed(fs_);
    CHECK_FAILURE(ret, "GetUsed");
    *nbytes = ret;
    return Status::OK();
  }
  Status GetWorkingDirectory(std::string* out) {
    char buffer[2048];
    if (driver_->GetWorkingDirectory(fs_, buffer, sizeof(buffer) - 1) == nullptr) {
      return Status::IOError("HDFS GetWorkingDirectory failed, errno: ",
                             TranslateErrno(errno));
    }
    *out = buffer;
    return Status::OK();
  }
  Status GetPathInfo(const std::string& path, HdfsPathInfo* info) {
    hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str());

    if (entry == nullptr) {
      return GetPathInfoFailed(path);
    }

    SetPathInfo(entry, info);
    driver_->FreeFileInfo(entry, 1);

    return Status::OK();
  }
  Status Stat(const std::string& path, FileStatistics* stat) {
    HdfsPathInfo info;
    RETURN_NOT_OK(GetPathInfo(path, &info));

    stat->size = info.size;
    stat->kind = info.kind;
    return Status::OK();
  }
  Status GetChildren(const std::string& path, std::vector<std::string>* listing) {
    std::vector<HdfsPathInfo> detailed_listing;
    RETURN_NOT_OK(ListDirectory(path, &detailed_listing));
    for (const auto& info : detailed_listing) {
      listing->push_back(info.name);
    }
    return Status::OK();
  }
  Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
    int num_entries = 0;
    errno = 0;
    hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries);

    if (entries == nullptr) {
      // If the directory is empty, entries is NULL but errno is 0. Non-zero
      // errno indicates error
      //
      // Note: errno is thread-local
      //
      // XXX(wesm): ARROW-2300; we found with Hadoop 2.6 that libhdfs would set
      // errno 2/ENOENT for empty directories. To be more robust to this we
      // double check this case
      if ((errno == 0) || (errno == ENOENT && Exists(path))) {
        num_entries = 0;
      } else {
        return Status::IOError("HDFS list directory failed, errno: ",
                               TranslateErrno(errno));
      }
    }

    // Allocate additional space for elements
    int vec_offset = static_cast<int>(listing->size());
    listing->resize(vec_offset + num_entries);

    for (int i = 0; i < num_entries; ++i) {
      SetPathInfo(entries + i, &(*listing)[vec_offset + i]);
    }

    // Free libhdfs file info
    driver_->FreeFileInfo(entries, num_entries);

    return Status::OK();
  }
  Status OpenReadable(const std::string& path, int32_t buffer_size,
                      const io::IOContext& io_context,
                      std::shared_ptr<HdfsReadableFile>* file) {
    hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);

    if (handle == nullptr) {
      if (errno) {
        return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed");
      } else {
        return Status::IOError("Opening HDFS file '", path, "' failed");
      }
    }

    // std::make_shared does not work with private ctors
    *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile(io_context));
    (*file)->impl_->set_members(path, driver_, fs_, handle);
    (*file)->impl_->set_buffer_size(buffer_size);

    return Status::OK();
  }
  Status OpenWritable(const std::string& path, bool append, int32_t buffer_size,
                      int16_t replication, int64_t default_block_size,
                      std::shared_ptr<HdfsOutputStream>* file) {
    int flags = O_WRONLY;
    if (append) flags |= O_APPEND;

    hdfsFile handle =
        driver_->OpenFile(fs_, path.c_str(), flags, buffer_size, replication,
                          static_cast<tSize>(default_block_size));

    if (handle == nullptr) {
      if (errno) {
        return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed");
      } else {
        return Status::IOError("Opening HDFS file '", path, "' failed");
      }
    }

    // std::make_shared does not work with private ctors
    *file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
    (*file)->impl_->set_members(path, driver_, fs_, handle);

    return Status::OK();
  }
  Status Rename(const std::string& src, const std::string& dst) {
    int ret = driver_->Rename(fs_, src.c_str(), dst.c_str());
    CHECK_FAILURE(ret, "Rename");
    return Status::OK();
  }
  Status Copy(const std::string& src, const std::string& dst) {
    int ret = driver_->Copy(fs_, src.c_str(), fs_, dst.c_str());
    CHECK_FAILURE(ret, "Copy");
    return Status::OK();
  }
  Status Move(const std::string& src, const std::string& dst) {
    int ret = driver_->Move(fs_, src.c_str(), fs_, dst.c_str());
    CHECK_FAILURE(ret, "Move");
    return Status::OK();
  }
  Status Chmod(const std::string& path, int mode) {
    int ret = driver_->Chmod(fs_, path.c_str(), static_cast<short>(mode));  // NOLINT
    CHECK_FAILURE(ret, "Chmod");
    return Status::OK();
  }
  Status Chown(const std::string& path, const char* owner, const char* group) {
    int ret = driver_->Chown(fs_, path.c_str(), owner, group);
    CHECK_FAILURE(ret, "Chown");
    return Status::OK();
  }
 private:
  internal::LibHdfsShim* driver_;

  std::string namenode_host_;
  std::string user_;
  int port_;
  std::string kerb_ticket_;

  hdfsFS fs_;
};
// ----------------------------------------------------------------------
// Public API for HDFSClient
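// A minimal usage sketch (illustrative only; the host, port and path below are
// assumptions, not values from this file):
//
//   HdfsConnectionConfig config;
//   config.host = "namenode.example.com";
//   config.port = 8020;
//   std::shared_ptr<HadoopFileSystem> fs;
//   ARROW_RETURN_NOT_OK(HadoopFileSystem::Connect(&config, &fs));
//   std::shared_ptr<HdfsReadableFile> file;
//   ARROW_RETURN_NOT_OK(fs->OpenReadable("/data/example.parquet", &file));
//   ARROW_ASSIGN_OR_RAISE(auto buffer, file->Read(1024));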
HadoopFileSystem::HadoopFileSystem() { impl_.reset(new HadoopFileSystemImpl()); }
HadoopFileSystem::~HadoopFileSystem() {}
Status HadoopFileSystem::Connect(const HdfsConnectionConfig* config,
                                 std::shared_ptr<HadoopFileSystem>* fs) {
  // ctor is private, make_shared will not work
  *fs = std::shared_ptr<HadoopFileSystem>(new HadoopFileSystem());

  RETURN_NOT_OK((*fs)->impl_->Connect(config));
  return Status::OK();
}
Status HadoopFileSystem::MakeDirectory(const std::string& path) {
  return impl_->MakeDirectory(path);
}
Status HadoopFileSystem::Delete(const std::string& path, bool recursive) {
  return impl_->Delete(path, recursive);
}
Status HadoopFileSystem::DeleteDirectory(const std::string& path) {
  return Delete(path, true);
}
Status HadoopFileSystem::Disconnect() { return impl_->Disconnect(); }
bool HadoopFileSystem::Exists(const std::string& path) { return impl_->Exists(path); }
Status HadoopFileSystem::GetPathInfo(const std::string& path, HdfsPathInfo* info) {
  return impl_->GetPathInfo(path, info);
}
Status HadoopFileSystem::Stat(const std::string& path, FileStatistics* stat) {
  return impl_->Stat(path, stat);
}
Status HadoopFileSystem::GetCapacity(int64_t* nbytes) {
  return impl_->GetCapacity(nbytes);
}
Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); }
Status HadoopFileSystem::GetWorkingDirectory(std::string* out) {
  return impl_->GetWorkingDirectory(out);
}
Status HadoopFileSystem::GetChildren(const std::string& path,
                                     std::vector<std::string>* listing) {
  return impl_->GetChildren(path, listing);
}
Status HadoopFileSystem::ListDirectory(const std::string& path,
                                       std::vector<HdfsPathInfo>* listing) {
  return impl_->ListDirectory(path, listing);
}
Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return impl_->OpenReadable(path, buffer_size, io::default_io_context(), file);
}
Status HadoopFileSystem::OpenReadable(const std::string& path,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return OpenReadable(path, kDefaultHdfsBufferSize, io::default_io_context(), file);
}
Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
                                      const io::IOContext& io_context,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return impl_->OpenReadable(path, buffer_size, io_context, file);
}
Status HadoopFileSystem::OpenReadable(const std::string& path,
                                      const io::IOContext& io_context,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return OpenReadable(path, kDefaultHdfsBufferSize, io_context, file);
}
Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      int32_t buffer_size, int16_t replication,
                                      int64_t default_block_size,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return impl_->OpenWritable(path, append, buffer_size, replication, default_block_size,
                             file);
}
Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return OpenWritable(path, append, 0, 0, 0, file);
}
Status HadoopFileSystem::Chmod(const std::string& path, int mode) {
  return impl_->Chmod(path, mode);
}
Status HadoopFileSystem::Chown(const std::string& path, const char* owner,
                               const char* group) {
  return impl_->Chown(path, owner, group);
}
Status HadoopFileSystem::Rename(const std::string& src, const std::string& dst) {
  return impl_->Rename(src, dst);
}
Status HadoopFileSystem::Copy(const std::string& src, const std::string& dst) {
  return impl_->Copy(src, dst);
}
Status HadoopFileSystem::Move(const std::string& src, const std::string& dst) {
  return impl_->Move(src, dst);
}
// ----------------------------------------------------------------------
// Allow public API users to check whether we are set up correctly

Status HaveLibHdfs() {
  internal::LibHdfsShim* driver;
  return internal::ConnectLibHdfs(&driver);
}

}  // namespace io
}  // namespace arrow