// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/io/hdfs.h"
#include "arrow/io/hdfs_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"

using std::size_t;

namespace arrow {

using internal::IOErrorFromErrno;

namespace io {

namespace {

std::string TranslateErrno(int error_code) {
  std::stringstream ss;
  ss << error_code << " (" << strerror(error_code) << ")";
  if (error_code == 255) {
    // Unknown error can occur if the host is correct but the port is not
    ss << " Please check that you are connecting to the correct HDFS RPC port";
  }
  return ss.str();
}

}  // namespace

#define CHECK_FAILURE(RETURN_VALUE, WHAT)                                               \
  do {                                                                                  \
    if (RETURN_VALUE == -1) {                                                           \
      return Status::IOError("HDFS ", WHAT, " failed, errno: ", TranslateErrno(errno)); \
    }                                                                                   \
  } while (0)

static constexpr int kDefaultHdfsBufferSize = 1 << 16;

// ----------------------------------------------------------------------
// File reading

class HdfsAnyFileImpl {
 public:
  void set_members(const std::string& path, internal::LibHdfsShim* driver, hdfsFS fs,
                   hdfsFile handle) {
    path_ = path;
    driver_ = driver;
    fs_ = fs;
    file_ = handle;
    is_open_ = true;
  }

  Status Seek(int64_t position) {
    RETURN_NOT_OK(CheckClosed());
    int ret = driver_->Seek(fs_, file_, position);
    CHECK_FAILURE(ret, "seek");
    return Status::OK();
  }

  Result<int64_t> Tell() {
    RETURN_NOT_OK(CheckClosed());
    int64_t ret = driver_->Tell(fs_, file_);
    CHECK_FAILURE(ret, "tell");
    return ret;
  }

  bool is_open() const { return is_open_; }

 protected:
  Status CheckClosed() {
    if (!is_open_) {
      return Status::Invalid("Operation on closed HDFS file");
    }
    return Status::OK();
  }

  std::string path_;

  internal::LibHdfsShim* driver_;

  // For thread safety
  std::mutex lock_;

  // These are pointers in libhdfs, so OK to copy
  hdfsFS fs_;
  hdfsFile file_;

  bool is_open_;
};

namespace {

Status GetPathInfoFailed(const std::string& path) {
  std::stringstream ss;
  ss << "Calling GetPathInfo for " << path << " failed. errno: " << TranslateErrno(errno);
  return Status::IOError(ss.str());
}

}  // namespace

// Private implementation for read-only files
class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
 public:
  explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {}

  Status Close() {
    if (is_open_) {
      // is_open_ must be set to false in the beginning, because the destructor
      // attempts to close the stream again, and if the first close fails, then
      // the error doesn't get propagated properly and the second close
      // initiated by the destructor raises a segfault
      is_open_ = false;
      int ret = driver_->CloseFile(fs_, file_);
      CHECK_FAILURE(ret, "CloseFile");
    }
    return Status::OK();
  }

  bool closed() const { return !is_open_; }

  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, uint8_t* buffer) {
    RETURN_NOT_OK(CheckClosed());
    if (!driver_->HasPread()) {
      std::lock_guard<std::mutex> guard(lock_);
      RETURN_NOT_OK(Seek(position));
      return Read(nbytes, buffer);
    }

    constexpr int64_t kMaxBlockSize = std::numeric_limits<int32_t>::max();
    int64_t total_bytes = 0;
    while (nbytes > 0) {
      const auto block_size = static_cast<tSize>(std::min(kMaxBlockSize, nbytes));
      tSize ret =
          driver_->Pread(fs_, file_, static_cast<tOffset>(position), buffer, block_size);
      CHECK_FAILURE(ret, "read");
      DCHECK_LE(ret, block_size);
      if (ret == 0) {
        break;  // EOF
      }
      buffer += ret;
      total_bytes += ret;
      position += ret;
      nbytes -= ret;
    }
    return total_bytes;
  }

  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) {
    RETURN_NOT_OK(CheckClosed());

    ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
                          ReadAt(position, nbytes, buffer->mutable_data()));
    if (bytes_read < nbytes) {
      RETURN_NOT_OK(buffer->Resize(bytes_read));
      buffer->ZeroPadding();
    }
    return std::move(buffer);
  }

  Result<int64_t> Read(int64_t nbytes, void* buffer) {
    RETURN_NOT_OK(CheckClosed());

    int64_t total_bytes = 0;
    while (total_bytes < nbytes) {
      tSize ret = driver_->Read(
          fs_, file_, reinterpret_cast<uint8_t*>(buffer) + total_bytes,
          static_cast<tSize>(std::min<int64_t>(buffer_size_, nbytes - total_bytes)));
      CHECK_FAILURE(ret, "read");
      total_bytes += ret;
      if (ret == 0) {
        break;
      }
    }
    return total_bytes;
  }

  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) {
    RETURN_NOT_OK(CheckClosed());

    ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data()));
    if (bytes_read < nbytes) {
      RETURN_NOT_OK(buffer->Resize(bytes_read));
    }
    return std::move(buffer);
  }

  Result<int64_t> GetSize() {
    RETURN_NOT_OK(CheckClosed());

    hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path_.c_str());
    if (entry == nullptr) {
      return GetPathInfoFailed(path_);
    }
    int64_t size = entry->mSize;
    driver_->FreeFileInfo(entry, 1);
    return size;
  }

  void set_memory_pool(MemoryPool* pool) { pool_ = pool; }

  void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; }

 private:
  MemoryPool* pool_;
  int32_t buffer_size_;
};

HdfsReadableFile::HdfsReadableFile(const io::IOContext& io_context) {
  impl_.reset(new HdfsReadableFileImpl(io_context.pool()));
}

HdfsReadableFile::~HdfsReadableFile() { DCHECK_OK(impl_->Close()); }

Status HdfsReadableFile::Close() { return impl_->Close(); }

bool HdfsReadableFile::closed() const { return impl_->closed(); }

Result<int64_t> HdfsReadableFile::ReadAt(int64_t position, int64_t nbytes, void* buffer) {
  return impl_->ReadAt(position, nbytes, reinterpret_cast<uint8_t*>(buffer));
}

Result<std::shared_ptr<Buffer>> HdfsReadableFile::ReadAt(int64_t position,
                                                         int64_t nbytes) {
  return impl_->ReadAt(position, nbytes);
}

Result<int64_t> HdfsReadableFile::Read(int64_t nbytes, void* buffer) {
  return impl_->Read(nbytes, buffer);
}

Result<std::shared_ptr<Buffer>> HdfsReadableFile::Read(int64_t nbytes) {
  return impl_->Read(nbytes);
}

Result<int64_t> HdfsReadableFile::GetSize() { return impl_->GetSize(); }

Status HdfsReadableFile::Seek(int64_t position) { return impl_->Seek(position); }

Result<int64_t> HdfsReadableFile::Tell() const { return impl_->Tell(); }
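
// A minimal read-side usage sketch for the methods above (illustrative only;
// "fs" is an already-connected HadoopFileSystem and the path is a
// hypothetical placeholder):
//
//   std::shared_ptr<HdfsReadableFile> file;
//   RETURN_NOT_OK(fs->OpenReadable("/data/example.bin", &file));
//   ARROW_ASSIGN_OR_RAISE(int64_t size, file->GetSize());
//   // ReadAt is positional; when the loaded libhdfs provides Pread, it does
//   // not move the stream cursor and needs no lock
//   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buf,
//                         file->ReadAt(0, std::min<int64_t>(size, 4096)));
//   RETURN_NOT_OK(file->Close());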

// ----------------------------------------------------------------------
// File writing

// Private implementation for writable-only files
class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
 public:
  HdfsOutputStreamImpl() {}

  Status Close() {
    if (is_open_) {
      // is_open_ must be set to false in the beginning, because the destructor
      // attempts to close the stream again, and if the first close fails, then
      // the error doesn't get propagated properly and the second close
      // initiated by the destructor raises a segfault
      is_open_ = false;
      RETURN_NOT_OK(FlushInternal());
      int ret = driver_->CloseFile(fs_, file_);
      CHECK_FAILURE(ret, "CloseFile");
    }
    return Status::OK();
  }

  bool closed() const { return !is_open_; }

  Status Flush() {
    RETURN_NOT_OK(CheckClosed());

    return FlushInternal();
  }

  Status Write(const uint8_t* buffer, int64_t nbytes) {
    RETURN_NOT_OK(CheckClosed());

    constexpr int64_t kMaxBlockSize = std::numeric_limits<int32_t>::max();

    std::lock_guard<std::mutex> guard(lock_);
    while (nbytes > 0) {
      const auto block_size = static_cast<tSize>(std::min(kMaxBlockSize, nbytes));
      tSize ret = driver_->Write(fs_, file_, buffer, block_size);
      CHECK_FAILURE(ret, "Write");
      DCHECK_LE(ret, block_size);
      buffer += ret;
      nbytes -= ret;
    }
    return Status::OK();
  }

 protected:
  Status FlushInternal() {
    int ret = driver_->Flush(fs_, file_);
    CHECK_FAILURE(ret, "Flush");
    return Status::OK();
  }
};

HdfsOutputStream::HdfsOutputStream() { impl_.reset(new HdfsOutputStreamImpl()); }

HdfsOutputStream::~HdfsOutputStream() { DCHECK_OK(impl_->Close()); }

Status HdfsOutputStream::Close() { return impl_->Close(); }

bool HdfsOutputStream::closed() const { return impl_->closed(); }

Status HdfsOutputStream::Write(const void* buffer, int64_t nbytes) {
  return impl_->Write(reinterpret_cast<const uint8_t*>(buffer), nbytes);
}

Status HdfsOutputStream::Flush() { return impl_->Flush(); }

Result<int64_t> HdfsOutputStream::Tell() const { return impl_->Tell(); }
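
// A minimal write-side usage sketch for the methods above (illustrative only;
// "fs" is an already-connected HadoopFileSystem and the path is a
// hypothetical placeholder). Write() chunks large buffers internally, since a
// single libhdfs write takes at most INT32_MAX bytes:
//
//   std::shared_ptr<HdfsOutputStream> stream;
//   RETURN_NOT_OK(fs->OpenWritable("/tmp/example.bin", /*append=*/false, &stream));
//   const std::string payload = "hello hdfs";
//   RETURN_NOT_OK(stream->Write(payload.data(), static_cast<int64_t>(payload.size())));
//   RETURN_NOT_OK(stream->Flush());
//   RETURN_NOT_OK(stream->Close());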

// ----------------------------------------------------------------------
// HDFS client

// TODO(wesm): this could throw std::bad_alloc in the course of copying strings
// into the path info object
static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) {
  out->kind = input->mKind == kObjectKindFile ? ObjectType::FILE : ObjectType::DIRECTORY;
  out->name = std::string(input->mName);
  out->owner = std::string(input->mOwner);
  out->group = std::string(input->mGroup);

  out->last_access_time = static_cast<int32_t>(input->mLastAccess);
  out->last_modified_time = static_cast<int32_t>(input->mLastMod);
  out->size = static_cast<int64_t>(input->mSize);

  out->replication = input->mReplication;
  out->block_size = input->mBlockSize;

  out->permissions = input->mPermissions;
}

// Private implementation
class HadoopFileSystem::HadoopFileSystemImpl {
 public:
  HadoopFileSystemImpl() : driver_(NULLPTR), port_(0), fs_(NULLPTR) {}

  Status Connect(const HdfsConnectionConfig* config) {
    RETURN_NOT_OK(ConnectLibHdfs(&driver_));

    // connect to HDFS with the builder object
    hdfsBuilder* builder = driver_->NewBuilder();
    if (!config->host.empty()) {
      driver_->BuilderSetNameNode(builder, config->host.c_str());
    }
    driver_->BuilderSetNameNodePort(builder, static_cast<tPort>(config->port));
    if (!config->user.empty()) {
      driver_->BuilderSetUserName(builder, config->user.c_str());
    }
    if (!config->kerb_ticket.empty()) {
      driver_->BuilderSetKerbTicketCachePath(builder, config->kerb_ticket.c_str());
    }

    for (const auto& kv : config->extra_conf) {
      int ret = driver_->BuilderConfSetStr(builder, kv.first.c_str(), kv.second.c_str());
      CHECK_FAILURE(ret, "confsetstr");
    }

    driver_->BuilderSetForceNewInstance(builder);
    fs_ = driver_->BuilderConnect(builder);

    if (fs_ == nullptr) {
      return Status::IOError("HDFS connection failed");
    }
    namenode_host_ = config->host;
    port_ = config->port;
    user_ = config->user;
    kerb_ticket_ = config->kerb_ticket;

    return Status::OK();
  }

  Status MakeDirectory(const std::string& path) {
    int ret = driver_->MakeDirectory(fs_, path.c_str());
    CHECK_FAILURE(ret, "create directory");
    return Status::OK();
  }

  Status Delete(const std::string& path, bool recursive) {
    int ret = driver_->Delete(fs_, path.c_str(), static_cast<int>(recursive));
    CHECK_FAILURE(ret, "delete");
    return Status::OK();
  }

  Status Disconnect() {
    int ret = driver_->Disconnect(fs_);
    CHECK_FAILURE(ret, "hdfsFS::Disconnect");
    return Status::OK();
  }

  bool Exists(const std::string& path) {
    // hdfsExists does not distinguish between RPC failure and the file not
    // existing
    int ret = driver_->Exists(fs_, path.c_str());
    return ret == 0;
  }

  Status GetCapacity(int64_t* nbytes) {
    tOffset ret = driver_->GetCapacity(fs_);
    CHECK_FAILURE(ret, "GetCapacity");
    *nbytes = ret;
    return Status::OK();
  }

  Status GetUsed(int64_t* nbytes) {
    tOffset ret = driver_->GetUsed(fs_);
    CHECK_FAILURE(ret, "GetUsed");
    *nbytes = ret;
    return Status::OK();
  }

  Status GetWorkingDirectory(std::string* out) {
    char buffer[2048];
    if (driver_->GetWorkingDirectory(fs_, buffer, sizeof(buffer) - 1) == nullptr) {
      return Status::IOError("HDFS GetWorkingDirectory failed, errno: ",
                             TranslateErrno(errno));
    }
    *out = buffer;
    return Status::OK();
  }

  Status GetPathInfo(const std::string& path, HdfsPathInfo* info) {
    hdfsFileInfo* entry = driver_->GetPathInfo(fs_, path.c_str());

    if (entry == nullptr) {
      return GetPathInfoFailed(path);
    }

    SetPathInfo(entry, info);
    driver_->FreeFileInfo(entry, 1);

    return Status::OK();
  }

  Status Stat(const std::string& path, FileStatistics* stat) {
    HdfsPathInfo info;
    RETURN_NOT_OK(GetPathInfo(path, &info));

    stat->size = info.size;
    stat->kind = info.kind;
    return Status::OK();
  }

  Status GetChildren(const std::string& path, std::vector<std::string>* listing) {
    std::vector<HdfsPathInfo> detailed_listing;
    RETURN_NOT_OK(ListDirectory(path, &detailed_listing));
    for (const auto& info : detailed_listing) {
      listing->push_back(info.name);
    }
    return Status::OK();
  }

  Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
    int num_entries = 0;
    errno = 0;
    hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries);

    if (entries == nullptr) {
      // If the directory is empty, entries is NULL but errno is 0. Non-zero
      // errno indicates error
      //
      // Note: errno is thread-local
      //
      // XXX(wesm): ARROW-2300; we found with Hadoop 2.6 that libhdfs would set
      // errno 2/ENOENT for empty directories. To be more robust to this we
      // double check this case
      if ((errno == 0) || (errno == ENOENT && Exists(path))) {
        num_entries = 0;
      } else {
        return Status::IOError("HDFS list directory failed, errno: ",
                               TranslateErrno(errno));
      }
    }

    // Allocate additional space for elements
    int vec_offset = static_cast<int>(listing->size());
    listing->resize(vec_offset + num_entries);

    for (int i = 0; i < num_entries; ++i) {
      SetPathInfo(entries + i, &(*listing)[vec_offset + i]);
    }

    // Free libhdfs file info
    driver_->FreeFileInfo(entries, num_entries);

    return Status::OK();
  }

  Status OpenReadable(const std::string& path, int32_t buffer_size,
                      const io::IOContext& io_context,
                      std::shared_ptr<HdfsReadableFile>* file) {
    errno = 0;
    hdfsFile handle = driver_->OpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);

    if (handle == nullptr) {
      if (errno) {
        return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed");
      } else {
        return Status::IOError("Opening HDFS file '", path, "' failed");
      }
    }

    // std::make_shared does not work with private ctors
    *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile(io_context));
    (*file)->impl_->set_members(path, driver_, fs_, handle);
    (*file)->impl_->set_buffer_size(buffer_size);

    return Status::OK();
  }

  Status OpenWritable(const std::string& path, bool append, int32_t buffer_size,
                      int16_t replication, int64_t default_block_size,
                      std::shared_ptr<HdfsOutputStream>* file) {
    int flags = O_WRONLY;
    if (append) flags |= O_APPEND;

    errno = 0;
    hdfsFile handle =
        driver_->OpenFile(fs_, path.c_str(), flags, buffer_size, replication,
                          static_cast<tSize>(default_block_size));

    if (handle == nullptr) {
      if (errno) {
        return IOErrorFromErrno(errno, "Opening HDFS file '", path, "' failed");
      } else {
        return Status::IOError("Opening HDFS file '", path, "' failed");
      }
    }

    // std::make_shared does not work with private ctors
    *file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
    (*file)->impl_->set_members(path, driver_, fs_, handle);

    return Status::OK();
  }

  Status Rename(const std::string& src, const std::string& dst) {
    int ret = driver_->Rename(fs_, src.c_str(), dst.c_str());
    CHECK_FAILURE(ret, "Rename");
    return Status::OK();
  }

  Status Copy(const std::string& src, const std::string& dst) {
    int ret = driver_->Copy(fs_, src.c_str(), fs_, dst.c_str());
    CHECK_FAILURE(ret, "Copy");
    return Status::OK();
  }

  Status Move(const std::string& src, const std::string& dst) {
    int ret = driver_->Move(fs_, src.c_str(), fs_, dst.c_str());
    CHECK_FAILURE(ret, "Move");
    return Status::OK();
  }

  Status Chmod(const std::string& path, int mode) {
    int ret = driver_->Chmod(fs_, path.c_str(), static_cast<short>(mode));  // NOLINT
    CHECK_FAILURE(ret, "Chmod");
    return Status::OK();
  }

  Status Chown(const std::string& path, const char* owner, const char* group) {
    int ret = driver_->Chown(fs_, path.c_str(), owner, group);
    CHECK_FAILURE(ret, "Chown");
    return Status::OK();
  }

 private:
  internal::LibHdfsShim* driver_;

  std::string namenode_host_;
  std::string user_;
  int port_;
  std::string kerb_ticket_;

  hdfsFS fs_;
};

// ----------------------------------------------------------------------
// Public API for HDFSClient

HadoopFileSystem::HadoopFileSystem() { impl_.reset(new HadoopFileSystemImpl()); }

HadoopFileSystem::~HadoopFileSystem() {}

Status HadoopFileSystem::Connect(const HdfsConnectionConfig* config,
                                 std::shared_ptr<HadoopFileSystem>* fs) {
  // ctor is private, make_shared will not work
  *fs = std::shared_ptr<HadoopFileSystem>(new HadoopFileSystem());

  RETURN_NOT_OK((*fs)->impl_->Connect(config));
  return Status::OK();
}

Status HadoopFileSystem::MakeDirectory(const std::string& path) {
  return impl_->MakeDirectory(path);
}

Status HadoopFileSystem::Delete(const std::string& path, bool recursive) {
  return impl_->Delete(path, recursive);
}

Status HadoopFileSystem::DeleteDirectory(const std::string& path) {
  return Delete(path, true);
}

Status HadoopFileSystem::Disconnect() { return impl_->Disconnect(); }

bool HadoopFileSystem::Exists(const std::string& path) { return impl_->Exists(path); }

Status HadoopFileSystem::GetPathInfo(const std::string& path, HdfsPathInfo* info) {
  return impl_->GetPathInfo(path, info);
}

Status HadoopFileSystem::Stat(const std::string& path, FileStatistics* stat) {
  return impl_->Stat(path, stat);
}

Status HadoopFileSystem::GetCapacity(int64_t* nbytes) {
  return impl_->GetCapacity(nbytes);
}

Status HadoopFileSystem::GetUsed(int64_t* nbytes) { return impl_->GetUsed(nbytes); }

Status HadoopFileSystem::GetWorkingDirectory(std::string* out) {
  return impl_->GetWorkingDirectory(out);
}

Status HadoopFileSystem::GetChildren(const std::string& path,
                                     std::vector<std::string>* listing) {
  return impl_->GetChildren(path, listing);
}

Status HadoopFileSystem::ListDirectory(const std::string& path,
                                       std::vector<HdfsPathInfo>* listing) {
  return impl_->ListDirectory(path, listing);
}

Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return impl_->OpenReadable(path, buffer_size, io::default_io_context(), file);
}

Status HadoopFileSystem::OpenReadable(const std::string& path,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return OpenReadable(path, kDefaultHdfsBufferSize, io::default_io_context(), file);
}

Status HadoopFileSystem::OpenReadable(const std::string& path, int32_t buffer_size,
                                      const io::IOContext& io_context,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return impl_->OpenReadable(path, buffer_size, io_context, file);
}

Status HadoopFileSystem::OpenReadable(const std::string& path,
                                      const io::IOContext& io_context,
                                      std::shared_ptr<HdfsReadableFile>* file) {
  return OpenReadable(path, kDefaultHdfsBufferSize, io_context, file);
}

Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      int32_t buffer_size, int16_t replication,
                                      int64_t default_block_size,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return impl_->OpenWritable(path, append, buffer_size, replication, default_block_size,
                             file);
}

Status HadoopFileSystem::OpenWritable(const std::string& path, bool append,
                                      std::shared_ptr<HdfsOutputStream>* file) {
  return OpenWritable(path, append, 0, 0, 0, file);
}

Status HadoopFileSystem::Chmod(const std::string& path, int mode) {
  return impl_->Chmod(path, mode);
}

Status HadoopFileSystem::Chown(const std::string& path, const char* owner,
                               const char* group) {
  return impl_->Chown(path, owner, group);
}

Status HadoopFileSystem::Rename(const std::string& src, const std::string& dst) {
  return impl_->Rename(src, dst);
}

Status HadoopFileSystem::Copy(const std::string& src, const std::string& dst) {
  return impl_->Copy(src, dst);
}

Status HadoopFileSystem::Move(const std::string& src, const std::string& dst) {
  return impl_->Move(src, dst);
}

// ----------------------------------------------------------------------
// Allow public API users to check whether we are set up correctly

Status HaveLibHdfs() {
  internal::LibHdfsShim* driver;
  return internal::ConnectLibHdfs(&driver);
}
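
// A minimal end-to-end connection sketch (illustrative only; the host, port,
// and user are hypothetical placeholders):
//
//   RETURN_NOT_OK(HaveLibHdfs());  // check that libhdfs can be loaded first
//   HdfsConnectionConfig config;
//   config.host = "namenode.example.com";
//   config.port = 8020;
//   config.user = "analyst";
//   std::shared_ptr<HadoopFileSystem> fs;
//   RETURN_NOT_OK(HadoopFileSystem::Connect(&config, &fs));
//   // ... perform I/O through fs ...
//   RETURN_NOT_OK(fs->Disconnect());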

}  // namespace io
}  // namespace arrow