// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "rocksdb/env.h"
#include "hdfs/env_hdfs.h"

#ifdef USE_HDFS
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C

#include <stdio.h>
#include <sys/time.h>
#include <time.h>

#include "logging/logging.h"
#include "rocksdb/status.h"
#include "util/string_util.h"

#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// api to access HDFS. All HDFS files created by one instance of rocksdb
// will reside on the same HDFS cluster.
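
// Example wiring (an illustrative sketch, not part of this file): the env is
// created through the NewHdfsEnv() factory defined near the bottom of this
// file and handed to the DB via Options::env. The namenode URI below is a
// placeholder.
//
//   rocksdb::Env* hdfs_env = nullptr;
//   rocksdb::Status s =
//       rocksdb::NewHdfsEnv(&hdfs_env, "hdfs://localhost:9000/");
//   if (s.ok()) {
//     rocksdb::Options options;
//     options.create_if_missing = true;
//     options.env = hdfs_env;  // all file I/O now goes through libhdfs
//     rocksdb::DB* db = nullptr;
//     s = rocksdb::DB::Open(options, "/rocksdb_on_hdfs", &db);
//   }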
namespace ROCKSDB_NAMESPACE {

namespace {
static Status IOError(const std::string& context, int err_number) {
  return (err_number == ENOSPC)
             ? Status::NoSpace(context, strerror(err_number))
             : (err_number == ENOENT)
                   ? Status::PathNotFound(context, strerror(err_number))
                   : Status::IOError(context, strerror(err_number));
}
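
// For example (illustrative): IOError("/a/b/MANIFEST", ENOENT) yields
// Status::PathNotFound carrying strerror(ENOENT), ENOSPC maps to
// Status::NoSpace, and any other errno falls through to Status::IOError.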

// assume that there is one global logger for now. It is not thread-safe,
// but need not be because the logger is initialized at db-open time.
static Logger* mylog = nullptr;

// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile : virtual public SequentialFile,
                         virtual public RandomAccessFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog,
                    "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
                    filename_.c_str(), hfile_);
  }

  virtual ~HdfsReadableFile() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
                    filename_.c_str());
    hdfsCloseFile(fileSys_, hfile_);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
  }

  // If the file was successfully opened, then this returns true.
  bool isValid() { return hfile_ != nullptr; }

  // sequential access, read data at current offset in file
  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
                    filename_.c_str(), n);

    char* buffer = scratch;
    size_t total_bytes_read = 0;
    tSize bytes_read = 0;
    tSize remaining_bytes = (tSize)n;

    // Read a total of n bytes repeatedly until we hit error or eof
    while (remaining_bytes > 0) {
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
      if (bytes_read <= 0) {
        break;
      }
      assert(bytes_read <= remaining_bytes);

      total_bytes_read += bytes_read;
      remaining_bytes -= bytes_read;
      buffer += bytes_read;
    }
    assert(total_bytes_read <= n);

    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
                    filename_.c_str());

    if (bytes_read < 0) {
      s = IOError(filename_, errno);
    } else {
      *result = Slice(scratch, total_bytes_read);
    }
    return s;
  }
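
  // Note: hdfsRead() may legitimately return fewer bytes than requested (for
  // example at an HDFS block boundary), which is why Read() above loops until
  // n bytes have accumulated, EOF is hit (0), or an error occurs (< 0).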

  // random access, read data from specified offset in file
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
                    filename_.c_str());
    tSize bytes_read =
        hdfsPread(fileSys_, hfile_, offset, static_cast<void*>(scratch),
                  static_cast<tSize>(n));
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
                    filename_.c_str());
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
    if (bytes_read < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }

  virtual Status Skip(uint64_t n) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
                    filename_.c_str());
    // get current offset from file
    tOffset current = hdfsTell(fileSys_, hfile_);
    if (current < 0) {
      return IOError(filename_, errno);
    }
    // seek to new offset in file
    tOffset newoffset = current + n;
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
    if (val < 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  // returns true if we are at the end of file, false otherwise
  bool feof() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
                    filename_.c_str());
    if (hdfsTell(fileSys_, hfile_) == fileSize()) {
      return true;
    }
    return false;
  }

  // the current size of the file
  tOffset fileSize() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
                    filename_.c_str());
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
    tOffset size = 0L;
    if (pFileInfo != nullptr) {
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    } else {
      throw HdfsFatalException("fileSize on unknown file " + filename_);
    }
    return size;
  }
};
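
// Illustrative use of the two read paths above (a sketch; `fs`, the path and
// the buffer are hypothetical):
//
//   HdfsReadableFile file(fs, "/path/in/hdfs");
//   char buf[4096];
//   Slice result;
//   file.Read(sizeof(buf), &result, buf);                 // sequential read
//   file.Read(/*offset=*/0, sizeof(buf), &result, buf);   // positional pread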

// Appends to an existing file in HDFS.
class HdfsWritableFile : public WritableFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname,
                   const EnvOptions& options)
      : WritableFile(options),
        fileSys_(fileSys),
        filename_(fname),
        hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
                    filename_.c_str());
    assert(hfile_ != nullptr);
  }

  virtual ~HdfsWritableFile() {
    if (hfile_ != nullptr) {
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                      filename_.c_str());
      hdfsCloseFile(fileSys_, hfile_);
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                      filename_.c_str());
      hfile_ = nullptr;
    }
  }

  // If the file was successfully created, then this returns true.
  // Otherwise returns false.
  bool isValid() { return hfile_ != nullptr; }

  // The name of the file, mostly needed for debug logging.
  const std::string& getName() { return filename_; }

  virtual Status Append(const Slice& data) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
                    filename_.c_str());
    const char* src = data.data();
    size_t left = data.size();
    size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
                    filename_.c_str());
    if (ret != left) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Flush() { return Status::OK(); }

  virtual Status Sync() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
                    filename_.c_str());
    if (hdfsFlush(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    if (hdfsHSync(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
                    filename_.c_str());
    return Status::OK();
  }

  // This is used by HdfsLogger to write data to the debug log file
  virtual Status Append(const char* src, size_t size) {
    if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
        static_cast<tSize>(size)) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Close() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                    filename_.c_str());
    if (hdfsCloseFile(fileSys_, hfile_) != 0) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
    return Status::OK();
  }
};

// The object that implements the debug logs to reside in HDFS.
class HdfsLogger : public Logger {
 private:
  HdfsWritableFile* file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread

  Status HdfsCloseHelper() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
                    file_->getName().c_str());
    if (mylog != nullptr && mylog == this) {
      mylog = nullptr;
    }
    return Status::OK();
  }

 protected:
  virtual Status CloseImpl() override { return HdfsCloseHelper(); }

 public:
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
      : file_(f), gettid_(gettid) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
                    file_->getName().c_str());
  }

  ~HdfsLogger() override {
    if (!closed_) {
      closed_ = true;
      HdfsCloseHelper();
    }
  }

  void Logv(const char* format, va_list ap) override {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      localtime_r(&seconds, &t);
      p += snprintf(p, limit - p,
                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                    t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
                    t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec),
                    static_cast<long long unsigned int>(thread_id));

      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with larger buffer
        } else {
          p = limit - 1;
        }
      }

      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      file_->Append(base, p - base);
      file_->Flush();
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
};

}  // namespace
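
// A line emitted by Logv() above has the shape (illustrative values):
//
//   2021/07/01-13:45:30.123456 7f2a8e5f <formatted message>
//
// i.e. wall-clock time down to microseconds, the thread id in hex, then the
// message, newline-terminated.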

// Finally, the hdfs environment

const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";

// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  std::unique_ptr<SequentialFile>* result,
                                  const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<SequentialFile*>(f));
  return Status::OK();
}

// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                    std::unique_ptr<RandomAccessFile>* result,
                                    const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<RandomAccessFile*>(f));
  return Status::OK();
}

// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
                                std::unique_ptr<WritableFile>* result,
                                const EnvOptions& options) {
  result->reset();
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<WritableFile*>(f));
  return Status::OK();
}

class HdfsDirectory : public Directory {
 public:
  explicit HdfsDirectory(int fd) : fd_(fd) {}
  ~HdfsDirectory() {}

  Status Fsync() override { return Status::OK(); }

  int GetFd() const { return fd_; }

 private:
  int fd_;
};

Status HdfsEnv::NewDirectory(const std::string& name,
                             std::unique_ptr<Directory>* result) {
  int value = hdfsExists(fileSys_, name.c_str());
  switch (value) {
    case HDFS_EXISTS:
      result->reset(new HdfsDirectory(0));
      return Status::OK();
    default:  // fail if the directory doesn't exist
      ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + " on path " + name +
                               ".\n");
  }
}

Status HdfsEnv::FileExists(const std::string& fname) {
  int value = hdfsExists(fileSys_, fname.c_str());
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
      return Status::IOError("hdfsExists call failed with error " +
                             ToString(value) + " on path " + fname + ".\n");
  }
}

Status HdfsEnv::GetChildren(const std::string& path,
                            std::vector<std::string>* result) {
  int value = hdfsExists(fileSys_, path.c_str());
  switch (value) {
    case HDFS_EXISTS: {  // directory exists
      int numEntries = 0;
      hdfsFileInfo* pHdfsFileInfo = 0;
      pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
      if (numEntries >= 0) {
        for (int i = 0; i < numEntries; i++) {
          std::string pathname(pHdfsFileInfo[i].mName);
          size_t pos = pathname.rfind("/");
          if (std::string::npos != pos) {
            result->push_back(pathname.substr(pos + 1));
          }
        }
        if (pHdfsFileInfo != nullptr) {
          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
        }
      } else {
        // numEntries < 0 indicates error
        ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
        throw HdfsFatalException(
            "hdfsListDirectory call failed negative error.\n");
      }
      break;
    }
    case HDFS_DOESNT_EXIST:  // directory does not exist, exit
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
  return Status::OK();
}

Status HdfsEnv::DeleteFile(const std::string& fname) {
  if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::CreateDir(const std::string& name) {
  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}

Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
  const int value = hdfsExists(fileSys_, name.c_str());
  // Not atomic. State might change between hdfsExists and CreateDir.
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return CreateDir(name);
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
}

Status HdfsEnv::DeleteDir(const std::string& name) { return DeleteFile(name); }

Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
  *size = 0L;
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *size = pFileInfo->mSize;
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::GetFileModificationTime(const std::string& fname,
                                        uint64_t* time) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *time = static_cast<uint64_t>(pFileInfo->mLastMod);
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

// The rename is not atomic. HDFS does not allow a renaming if the
// target already exists. So, we delete the target before attempting the
// rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
  hdfsDelete(fileSys_, target.c_str(), 1);
  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(src, errno);
}
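
// Note: the delete-then-rename above is not crash-safe; a failure between
// hdfsDelete() and hdfsRename() can leave the old target removed without the
// rename having taken place.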

Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
  // there isn't a very good way to atomically check and create
  // a file via libhdfs
  *lock = nullptr;
  return Status::OK();
}

Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }

Status HdfsEnv::NewLogger(const std::string& fname,
                          std::shared_ptr<Logger>* result) {
  // EnvOptions is used exclusively for its `strict_bytes_per_sync` value. That
  // option is only intended for WAL/flush/compaction writes, so turn it off in
  // the logger.
  EnvOptions options;
  options.strict_bytes_per_sync = false;
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
  result->reset(h);
  if (mylog == nullptr) {
    // mylog = h; // uncomment this for detailed logging
  }
  return Status::OK();
}

Status HdfsEnv::IsDirectory(const std::string& path, bool* is_dir) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, path.c_str());
  if (pFileInfo != nullptr) {
    if (is_dir != nullptr) {
      *is_dir = (pFileInfo->mKind == kObjectKindDirectory);
    }
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(path, errno);
}

// The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  *hdfs_env = new HdfsEnv(fsname);
  return Status::OK();
}

}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_HDFS_FILE_C

#else  // USE_HDFS

// dummy placeholders used when HDFS is not available
namespace ROCKSDB_NAMESPACE {
Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
                                  std::unique_ptr<SequentialFile>* /*result*/,
                                  const EnvOptions& /*options*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}

Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}
}  // namespace ROCKSDB_NAMESPACE

#endif  // USE_HDFS