// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/io/hdfs.h"
#include "arrow/io/hdfs_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"

using std::size_t;

namespace arrow {

using internal::IOErrorFromErrno;

namespace io {

namespace {

std::string TranslateErrno(int error_code) {
  std::stringstream ss;
  ss << error_code << " (" << strerror(error_code) << ")";
  if (error_code == 255) {
    // Unknown error can occur if the host is correct but the port is not
    ss << " Please check that you are connecting to the correct HDFS RPC port";
  }
  return ss.str();
}

}  // namespace

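// Wraps the libhdfs convention of returning -1 on failure: converts the
// (thread-local) errno into an IOError with a human-readable message.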
#define CHECK_FAILURE(RETURN_VALUE, WHAT)                                               \
  do {                                                                                  \
    if (RETURN_VALUE == -1) {                                                           \
      return Status::IOError("HDFS ", WHAT, " failed, errno: ", TranslateErrno(errno)); \
    }                                                                                   \
  } while (0)

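// Default read buffer size passed to hdfsOpenFile: 1 << 16 = 64 KiB.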
static constexpr int kDefaultHdfsBufferSize = 1 << 16;

// ----------------------------------------------------------------------
// File reading

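// State and operations shared by readable files and output streams below.
// The hdfsFS/hdfsFile members are opaque libhdfs handles, so copying the
// raw pointers is safe; they are released through CloseFile() on Close().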
class HdfsAnyFileImpl {
 public:
  void set_members(const std::string& path, internal::LibHdfsShim* driver, hdfsFS fs,
                   hdfsFile handle) {
    path_ = path;
    driver_ = driver;
    fs_ = fs;
    file_ = handle;
    is_open_ = true;
  }

  Status Seek(int64_t position) {
    RETURN_NOT_OK(CheckClosed());
    int ret = driver_->Seek(fs_, file_, position);
    CHECK_FAILURE(ret, "seek");
    return Status::OK();
  }

  Result<int64_t> Tell() {
    RETURN_NOT_OK(CheckClosed());
    int64_t ret = driver_->Tell(fs_, file_);
    CHECK_FAILURE(ret, "tell");
    return ret;
  }

  bool is_open() const { return is_open_; }

 protected:
  Status CheckClosed() {
    if (!is_open_) {
      return Status::Invalid("Operation on closed HDFS file");
    }
    return Status::OK();
  }

  std::string path_;

  internal::LibHdfsShim* driver_;

  // For threadsafety
  std::mutex lock_;

  // These are pointers in libhdfs, so OK to copy
  hdfsFS fs_;
  hdfsFile file_;

  bool is_open_;
};

namespace {

Status GetPathInfoFailed(const std::string& path) {
  std::stringstream ss;
  ss << "Calling GetPathInfo for " << path << " failed. errno: " << TranslateErrno(errno);
  return Status::IOError(ss.str());
}

}  // namespace

// Private implementation for read-only files
class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
 public:
  explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {}

  Status Close() {
    if (is_open_) {
      // is_open_ must be set to false at the beginning: the destructor also
      // attempts to close the stream, and if this first close fails, the
      // error would not propagate properly and the second close initiated
      // by the destructor would cause a segfault
      is_open_ = false;
      int ret = driver_->CloseFile(fs_, file_);
      CHECK_FAILURE(ret, "CloseFile");
    }
    return Status::OK();
  }

  bool closed() const { return !is_open_; }

  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, uint8_t* buffer) {
    RETURN_NOT_OK(CheckClosed());
    if (!driver_->HasPread()) {
      std::lock_guard<std::mutex> guard(lock_);
      RETURN_NOT_OK(Seek(position));
      return Read(nbytes, buffer);
    }

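    // libhdfs Pread takes the read length as a tSize (int32_t), so reads
    // larger than INT32_MAX bytes are issued in chunks.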
    constexpr int64_t kMaxBlockSize = std::numeric_limits<int32_t>::max();
    int64_t total_bytes = 0;
    while (nbytes > 0) {
      const auto block_size = static_cast<tSize>(std::min(kMaxBlockSize, nbytes));
      tSize ret =
          driver_->Pread(fs_, file_, static_cast<tOffset>(position), buffer, block_size);
      CHECK_FAILURE(ret, "read");
      DCHECK_LE(ret, block_size);
      if (ret == 0) {
        break;  // EOF
      }
      buffer += ret;
      total_bytes += ret;
      position += ret;
      nbytes -= ret;
    }
    return total_bytes;
  }

  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) {
    RETURN_NOT_OK(CheckClosed());

    ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
                          ReadAt(position, nbytes, buffer->mutable_data()));
    if (bytes_read < nbytes) {
      RETURN_NOT_OK(buffer->Resize(bytes_read));
      buffer->ZeroPadding();
    }
    return std::move(buffer);
  }

  Result<int64_t> Read(int64_t nbytes, void* buffer) {
    RETURN_NOT_OK(CheckClosed());

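    // hdfsRead may return fewer bytes than requested; keep reading until
    // nbytes have been read or the driver reports EOF (a return of 0).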
    int64_t total_bytes = 0;
    while (total_bytes < nbytes) {
      tSize ret = driver_->Read(
          fs_, file_, reinterpret_cast<uint8_t*>(buffer) + total_bytes,
          static_cast<tSize>(std::min<int64_t>(buffer_size_, nbytes - total_bytes)));
      CHECK_FAILURE(ret, "read");
      total_bytes += ret;
      if (ret == 0) {
        break;
      }
    }
    return total_bytes;
  }

  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) {
    RETURN_NOT_OK(CheckClosed());

    ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data()));
    if (bytes_read < nbytes) {
      RETURN_NOT_OK(buffer->Resize(bytes_read));
    }
    return std::move(buffer);
  }

  Result<int64_t> GetSize() {
    RETURN_NOT_OK(CheckClosed());

    hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str());
    if (entry == nullptr) {
      return GetPathInfoFailed(path_);
    }
    int64_t size = entry->mSize;
    driver_->FreeFileInfo(entry, 1);
    return size;
  }

  void set_memory_pool(MemoryPool* pool) { pool_ = pool; }

  void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; }

 private:
  MemoryPool* pool_;
  int32_t buffer_size_;
};

HdfsReadableFile::HdfsReadableFile(const io::IOContext& io_context) {
  impl_.reset(new HdfsReadableFileImpl(io_context.pool()));
}

HdfsReadableFile::~HdfsReadableFile() { DCHECK_OK(impl_->Close()); }

Status HdfsReadableFile::Close() { return impl_->Close(); }

bool HdfsReadableFile::closed() const { return impl_->closed(); }

Result<int64_t> HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, void* buffer) {
  return impl_->ReadAt(position, nbytes, reinterpret_cast<uint8_t*>(buffer));
}

Result<std::shared_ptr<Buffer>> HdfsReadableFile::ReadAt(int64_t position,
                                                         int64_t nbytes) {
  return impl_->ReadAt(position, nbytes);
}

Result<int64_t> HdfsReadableFile::Read(int64_t nbytes, void* buffer) {
  return impl_->Read(nbytes, buffer);
}

Result<std::shared_ptr<Buffer>> HdfsReadableFile::Read(int64_t nbytes) {
  return impl_->Read(nbytes);
}

Result<int64_t> HdfsReadableFile::GetSize() { return impl_->GetSize(); }

Status HdfsReadableFile::Seek(int64_t position) { return impl_->Seek(position); }

Result<int64_t> HdfsReadableFile::Tell() const { return impl_->Tell(); }

// ----------------------------------------------------------------------
// File writing

// Private implementation for writable-only files
class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
 public:
  HdfsOutputStreamImpl() {}

  Status Close() {
    if (is_open_) {
      // is_open_ must be set to false at the beginning: the destructor also
      // attempts to close the stream, and if this first close fails, the
      // error would not propagate properly and the second close initiated
      // by the destructor would cause a segfault
      is_open_ = false;
      RETURN_NOT_OK(FlushInternal());
      int ret = driver_->CloseFile(fs_, file_);
      CHECK_FAILURE(ret, "CloseFile");
    }
    return Status::OK();
  }

  bool closed() const { return !is_open_; }

  Status Flush() {
    RETURN_NOT_OK(CheckClosed());

    return FlushInternal();
  }

  Status Write(const uint8_t* buffer, int64_t nbytes) {
    RETURN_NOT_OK(CheckClosed());

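    // Like Pread above, hdfsWrite takes a tSize (int32_t) length, so large
    // writes are broken into chunks of at most INT32_MAX bytes.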
    constexpr int64_t kMaxBlockSize = std::numeric_limits<int32_t>::max();

    std::lock_guard<std::mutex> guard(lock_);
    while (nbytes > 0) {
      const auto block_size = static_cast<tSize>(std::min(kMaxBlockSize, nbytes));
      tSize ret = driver_->Write(fs_, file_, buffer, block_size);
      CHECK_FAILURE(ret, "Write");
      DCHECK_LE(ret, block_size);
      buffer += ret;
      nbytes -= ret;
    }
    return Status::OK();
  }

 protected:
  Status FlushInternal() {
    int ret = driver_->Flush(fs_, file_);
    CHECK_FAILURE(ret, "Flush");
    return Status::OK();
  }
};

HdfsOutputStream::HdfsOutputStream() { impl_.reset(new HdfsOutputStreamImpl()); }

HdfsOutputStream::~HdfsOutputStream() { DCHECK_OK(impl_->Close()); }

Status HdfsOutputStream::Close() { return impl_->Close(); }

bool HdfsOutputStream::closed() const { return impl_->closed(); }

Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes) {
  return impl_->Write(reinterpret_cast<const uint8_t*>(buffer), nbytes);
}

Status HdfsOutputStream::Flush() { return impl_->Flush(); }

Result<int64_t> HdfsOutputStream::Tell() const { return impl_->Tell(); }

// ----------------------------------------------------------------------
// HDFS client

// TODO(wesm): this could throw std::bad_alloc in the course of copying strings
// into the path info object
static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) {
  out->kind = input->mKind == kObjectKindFile ? ObjectType::FILE : ObjectType::DIRECTORY;
  out->name = std::string(input->mName);
  out->owner = std::string(input->mOwner);
  out->group = std::string(input->mGroup);

  out->last_access_time = static_cast<int32_t>(input->mLastAccess);
  out->last_modified_time = static_cast<int32_t>(input->mLastMod);
  out->size = static_cast<int64_t>(input->mSize);

  out->replication = input->mReplication;
  out->block_size = input->mBlockSize;

  out->permissions = input->mPermissions;
}

// Private implementation
class HadoopFileSystem::HadoopFileSystemImpl {
 public:
  HadoopFileSystemImpl() : driver_(NULLPTR), port_(0), fs_(NULLPTR) {}

  Status Connect(const HdfsConnectionConfig* config) {
    RETURN_NOT_OK(ConnectLibHdfs(&driver_));

    // connect to HDFS with the builder object
    hdfsBuilder* builder = driver_->NewBuilder();
    if (!config->host.empty()) {
      driver_->BuilderSetNameNode(builder, config->host.c_str());
    }
    driver_->BuilderSetNameNodePort(builder, static_cast<tPort>(config->port));
    if (!config->user.empty()) {
      driver_->BuilderSetUserName(builder, config->user.c_str());
    }
    if (!config->kerb_ticket.empty()) {
      driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
    }

    for (const auto& kv : config->extra_conf) {
      int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str());
      CHECK_FAILURE(ret, "confsetstr");
    }

    driver_->BuilderSetForceNewInstance(builder);
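    // Note: hdfsBuilderConnect frees the builder whether or not the
    // connection attempt succeeds, so it must not be reused afterwards.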
    fs_ = driver_->BuilderConnect(builder);

    if (fs_ == nullptr) {
      return Status::IOError("HDFS connection failed");
    }
    namenode_host_ = config->host;
    port_ = config->port;
    user_ = config->user;
    kerb_ticket_ = config->kerb_ticket;

    return Status::OK();
  }

  Status MakeDirectory(const std::string& path) {
    int ret = driver_->MakeDirectory(fs_, path.c_str());
    CHECK_FAILURE(ret, "create directory");
    return Status::OK();
  }

  Status Delete(const std::string& path, bool recursive) {
    int ret = driver_->Delete(fs_, path.c_str(), static_cast<int>(recursive));
    CHECK_FAILURE(ret, "delete");
    return Status::OK();
  }

  Status Disconnect() {
    int ret = driver_->Disconnect(fs_);
    CHECK_FAILURE(ret, "hdfsFS::Disconnect");
    return Status::OK();
  }

  bool Exists(const std::string& path) {
    // hdfsExists does not distinguish between RPC failure and the file not
    // existing
    int ret = driver_->Exists(fs_, path.c_str());
    return ret == 0;
  }

  Status GetCapacity(int64_t* nbytes) {
    tOffset ret = driver_->GetCapacity(fs_);
    CHECK_FAILURE(ret, "GetCapacity");
    *nbytes = ret;
    return Status::OK();
  }

  Status GetUsed(int64_t* nbytes) {
    tOffset ret = driver_->GetUsed(fs_);
    CHECK_FAILURE(ret, "GetUsed");
    *nbytes = ret;
    return Status::OK();
  }

  Status GetWorkingDirectory(std::string* out) {
    char buffer[2048];
    if (driver_->GetWorkingDirectory(fs_, buffer, sizeof(buffer) - 1) == nullptr) {
      return Status::IOError("HDFS GetWorkingDirectory failed, errno: ",
                             TranslateErrno(errno));
    }
    *out = buffer;
    return Status::OK();
  }

  Status GetPathInfo(const std::string& path, HdfsPathInfo* info) {
    hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str());

    if (entry == nullptr) {
      return GetPathInfoFailed(path);
    }

    SetPathInfo(entry, info);
    driver_->FreeFileInfo(entry, 1);

    return Status::OK();
  }

  Status Stat(const std::string& path, FileStatistics* stat) {
    HdfsPathInfo info;
    RETURN_NOT_OK(GetPathInfo(path, &info));

    stat->size = info.size;
    stat->kind = info.kind;
    return Status::OK();
  }

  Status GetChildren(const std::string& path, std::vector<std::string>* listing) {
    std::vector<HdfsPathInfo> detailed_listing;
    RETURN_NOT_OK(ListDirectory(path, &detailed_listing));
    for (const auto& info : detailed_listing) {
      listing->push_back(info.name);
    }
    return Status::OK();
  }

  Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
    int num_entries = 0;
    errno = 0;
    hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries);

    if (entries == nullptr) {
      // If the directory is empty, entries is NULL but errno is 0. Non-zero
      // errno indicates error
      //
      // Note: errno is thread-local
      //
      // XXX(wesm): ARROW-2300; we found with Hadoop 2.6 that libhdfs would set
      // errno 2/ENOENT for empty directories. To be more robust to this we
      // double check this case
      if ((errno == 0) || (errno == ENOENT && Exists(path))) {
        num_entries = 0;
      } else {
        return Status::IOError("HDFS list directory failed, errno: ",
                               TranslateErrno(errno));
      }
    }

    // Allocate additional space for elements
    int vec_offset = static_cast<int>(listing->size());
    listing->resize(vec_offset + num_entries);

    for (int i = 0; i < num_entries; ++i) {
      SetPathInfo(entries + i, &(*listing)[vec_offset + i]);
    }

    // Free libhdfs file info
    driver_->FreeFileInfo(entries, num_entries);

    return Status::OK();
  }

  Status OpenReadable(const std::string& path, int32_t buffer_size,
                      const io::IOContext& io_context,
                      std::shared_ptr<HdfsReadableFile>* file) {
    errno = 0;
    hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);

    if (handle == nullptr) {
      if (errno) {
        return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed");
      } else {
        return Status::IOError("Opening HDFS file '", path, "' failed");
      }
    }

    // std::make_shared does not work with private ctors
    *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile(io_context));
    (*file)->impl_->set_members(path, driver_, fs_, handle);
    (*file)->impl_->set_buffer_size(buffer_size);

    return Status::OK();
  }

  Status OpenWritable(const std::string& path, bool append, int32_t buffer_size,
                      int16_t replication, int64_t default_block_size,
                      std::shared_ptr<HdfsOutputStream>* file) {
    int flags = O_WRONLY;
    if (append) flags |= O_APPEND;

    errno = 0;
    hdfsFile handle =
        driver_->OpenFile(fs_, path.c_str(), flags, buffer_size, replication,
                          static_cast<tSize>(default_block_size));

    if (handle == nullptr) {
      if (errno) {
        return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed");
      } else {
        return Status::IOError("Opening HDFS file '", path, "' failed");
      }
    }

    // std::make_shared does not work with private ctors
    *file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
    (*file)->impl_->set_members(path, driver_, fs_, handle);

    return Status::OK();
  }

  Status Rename(const std::string& src, const std::string& dst) {
    int ret = driver_->Rename(fs_, src.c_str(), dst.c_str());
    CHECK_FAILURE(ret, "Rename");
    return Status::OK();
  }

  Status Copy(const std::string& src, const std::string& dst) {
    int ret = driver_->Copy(fs_, src.c_str(), fs_, dst.c_str());
    CHECK_FAILURE(ret, "Copy");
    return Status::OK();
  }

  Status Move(const std::string& src, const std::string& dst) {
    int ret = driver_->Move(fs_, src.c_str(), fs_, dst.c_str());
    CHECK_FAILURE(ret, "Move");
    return Status::OK();
  }

  Status Chmod(const std::string& path, int mode) {
    int ret = driver_->Chmod(fs_, path.c_str(), static_cast<short>(mode));  // NOLINT
    CHECK_FAILURE(ret, "Chmod");
    return Status::OK();
  }

  Status Chown(const std::string& path, const char* owner, const char* group) {
    int ret = driver_->Chown(fs_, path.c_str(), owner, group);
    CHECK_FAILURE(ret, "Chown");
    return Status::OK();
  }

 private:
  internal::LibHdfsShim* driver_;

  std::string namenode_host_;
  std::string user_;
  int port_;
  std::string kerb_ticket_;

  hdfsFS fs_;
};

// ----------------------------------------------------------------------
// Public API for HDFSClient

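// Example usage (a sketch; the host, port and path below are hypothetical
// and error handling is elided):
//
//   HdfsConnectionConfig config;
//   config.host = "namenode.example.com";
//   config.port = 8020;
//   std::shared_ptr<HadoopFileSystem> fs;
//   ARROW_RETURN_NOT_OK(HadoopFileSystem::Connect(&config, &fs));
//   std::shared_ptr<HdfsReadableFile> file;
//   ARROW_RETURN_NOT_OK(fs->OpenReadable("/user/example/data.bin", &file));
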
HadoopFileSystem::HadoopFileSystem() { impl_.reset(new HadoopFileSystemImpl()); }

HadoopFileSystem::~HadoopFileSystem() {}

Status HadoopFileSystem::Connect(const HdfsConnectionConfig* config,
                                 std::shared_ptr<HadoopFileSystem>* fs) {
  // ctor is private, make_shared will not work
  *fs = std::shared_ptr<HadoopFileSystem>(new HadoopFileSystem());

  RETURN_NOT_OK((*fs)->impl_->Connect(config));
  return Status::OK();
}

Status HadoopFileSystem::MakeDirectory(const std::string& path) {
  return impl_->MakeDirectory(path);
}

Status HadoopFileSystem::Delete(const std::string& path, bool recursive) {
  return impl_->Delete(path, recursive);
}

Status HadoopFileSystem::DeleteDirectory(const std::string& path) {
  return Delete(path, true);
}

Status HadoopFileSystem::Disconnect() { return impl_->Disconnect(); }

bool HadoopFileSystem::Exists(const std::string& path) { return impl_->Exists(path); }

Status HadoopFileSystem::GetPathInfo(const std::string& path, HdfsPathInfo* info) {
  return impl_->GetPathInfo(path, info);
}

Status HadoopFileSystem::Stat(const std::string& path, FileStatistics* stat) {
  return impl_->Stat(path, stat);
}

Status HadoopFileSystem::GetCapacity(int64_t* nbytes) {
  return impl_->GetCapacity(nbytes);
}

Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); }

Status HadoopFileSystem::GetWorkingDirectory(std::string* out) {
  return impl_->GetWorkingDirectory(out);
}

Status HadoopFileSystem::GetChildren(const std::string& path,
                                     std::vector<std::string>* listing) {
  return impl_->GetChildren(path, listing);
}

Status HadoopFileSystem::ListDirectory(const std::string& path,
                                       std::vector<HdfsPathInfo>* listing) {
  return impl_->ListDirectory(path, listing);
}

Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return impl_->OpenReadable(path, buffer_size, io::default_io_context(), file);
}

Status HadoopFileSystem::OpenReadable(const std::string& path,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return OpenReadable(path, kDefaultHdfsBufferSize, io::default_io_context(), file);
}

Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
                                      const io::IOContext& io_context,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return impl_->OpenReadable(path, buffer_size, io_context, file);
}

Status HadoopFileSystem::OpenReadable(const std::string& path,
                                      const io::IOContext& io_context,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return OpenReadable(path, kDefaultHdfsBufferSize, io_context, file);
}

Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      int32_t buffer_size, int16_t replication,
                                      int64_t default_block_size,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return impl_->OpenWritable(path, append, buffer_size, replication, default_block_size,
                             file);
}

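// Passing zeros lets libhdfs fall back to its configured defaults for buffer
// size, replication and block size (per the hdfsOpenFile contract).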
Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return OpenWritable(path, append, 0, 0, 0, file);
}

Status HadoopFileSystem::Chmod(const std::string& path, int mode) {
  return impl_->Chmod(path, mode);
}

Status HadoopFileSystem::Chown(const std::string& path, const char* owner,
                               const char* group) {
  return impl_->Chown(path, owner, group);
}

Status HadoopFileSystem::Rename(const std::string& src, const std::string& dst) {
  return impl_->Rename(src, dst);
}

Status HadoopFileSystem::Copy(const std::string& src, const std::string& dst) {
  return impl_->Copy(src, dst);
}

Status HadoopFileSystem::Move(const std::string& src, const std::string& dst) {
  return impl_->Move(src, dst);
}

// ----------------------------------------------------------------------
// Allow public API users to check whether we are set up correctly

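// This only attempts to locate and load the libhdfs shared library and
// resolve the required symbols; it does not connect to any HDFS cluster.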
Status HaveLibHdfs() {
  internal::LibHdfsShim* driver;
  return internal::ConnectLibHdfs(&driver);
}

}  // namespace io
}  // namespace arrow