// ceph/src/rocksdb/env/env_hdfs.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
7 #include "rocksdb/env.h"
8 #include "hdfs/env_hdfs.h"
11 #ifndef ROCKSDB_HDFS_FILE_C
12 #define ROCKSDB_HDFS_FILE_C
20 #include "rocksdb/status.h"
21 #include "util/logging.h"
22 #include "util/string_util.h"
25 #define HDFS_DOESNT_EXIST -1
26 #define HDFS_SUCCESS 0
29 // This file defines an HDFS environment for rocksdb. It uses the libhdfs
30 // api to access HDFS. All HDFS files created by one instance of rocksdb
31 // will reside on the same HDFS cluster.
39 static Status
IOError(const std::string
& context
, int err_number
) {
40 return (err_number
== ENOSPC
)
41 ? Status::NoSpace(context
, strerror(err_number
))
42 : (err_number
== ENOENT
)
43 ? Status::PathNotFound(context
, strerror(err_number
))
44 : Status::IOError(context
, strerror(err_number
));
47 // assume that there is one global logger for now. It is not thread-safe,
48 // but need not be because the logger is initialized at db-open time.
49 static Logger
* mylog
= nullptr;
51 // Used for reading a file from HDFS. It implements both sequential-read
52 // access methods as well as random read access methods.
53 class HdfsReadableFile
: virtual public SequentialFile
,
54 virtual public RandomAccessFile
{
57 std::string filename_
;
61 HdfsReadableFile(hdfsFS fileSys
, const std::string
& fname
)
62 : fileSys_(fileSys
), filename_(fname
), hfile_(nullptr) {
63 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsReadableFile opening file %s\n",
65 hfile_
= hdfsOpenFile(fileSys_
, filename_
.c_str(), O_RDONLY
, 0, 0, 0);
66 ROCKS_LOG_DEBUG(mylog
,
67 "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
68 filename_
.c_str(), hfile_
);
71 virtual ~HdfsReadableFile() {
72 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsReadableFile closing file %s\n",
74 hdfsCloseFile(fileSys_
, hfile_
);
75 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsReadableFile closed file %s\n",
81 return hfile_
!= nullptr;
84 // sequential access, read data at current offset in file
85 virtual Status
Read(size_t n
, Slice
* result
, char* scratch
) {
87 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsReadableFile reading %s %ld\n",
88 filename_
.c_str(), n
);
90 char* buffer
= scratch
;
91 size_t total_bytes_read
= 0;
93 tSize remaining_bytes
= (tSize
)n
;
95 // Read a total of n bytes repeatedly until we hit error or eof
96 while (remaining_bytes
> 0) {
97 bytes_read
= hdfsRead(fileSys_
, hfile_
, buffer
, remaining_bytes
);
98 if (bytes_read
<= 0) {
101 assert(bytes_read
<= remaining_bytes
);
103 total_bytes_read
+= bytes_read
;
104 remaining_bytes
-= bytes_read
;
105 buffer
+= bytes_read
;
107 assert(total_bytes_read
<= n
);
109 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsReadableFile read %s\n",
112 if (bytes_read
< 0) {
113 s
= IOError(filename_
, errno
);
115 *result
= Slice(scratch
, total_bytes_read
);
121 // random access, read data from specified offset in file
122 virtual Status
Read(uint64_t offset
, size_t n
, Slice
* result
,
123 char* scratch
) const {
125 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsReadableFile preading %s\n",
127 ssize_t bytes_read
= hdfsPread(fileSys_
, hfile_
, offset
,
128 (void*)scratch
, (tSize
)n
);
129 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsReadableFile pread %s\n",
131 *result
= Slice(scratch
, (bytes_read
< 0) ? 0 : bytes_read
);
132 if (bytes_read
< 0) {
133 // An error: return a non-ok status
134 s
= IOError(filename_
, errno
);
139 virtual Status
Skip(uint64_t n
) {
140 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsReadableFile skip %s\n",
142 // get current offset from file
143 tOffset current
= hdfsTell(fileSys_
, hfile_
);
145 return IOError(filename_
, errno
);
147 // seek to new offset in file
148 tOffset newoffset
= current
+ n
;
149 int val
= hdfsSeek(fileSys_
, hfile_
, newoffset
);
151 return IOError(filename_
, errno
);
158 // returns true if we are at the end of file, false otherwise
160 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsReadableFile feof %s\n",
162 if (hdfsTell(fileSys_
, hfile_
) == fileSize()) {
168 // the current size of the file
170 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsReadableFile fileSize %s\n",
172 hdfsFileInfo
* pFileInfo
= hdfsGetPathInfo(fileSys_
, filename_
.c_str());
174 if (pFileInfo
!= nullptr) {
175 size
= pFileInfo
->mSize
;
176 hdfsFreeFileInfo(pFileInfo
, 1);
178 throw HdfsFatalException("fileSize on unknown file " + filename_
);
184 // Appends to an existing file in HDFS.
185 class HdfsWritableFile
: public WritableFile
{
188 std::string filename_
;
192 HdfsWritableFile(hdfsFS fileSys
, const std::string
& fname
)
193 : fileSys_(fileSys
), filename_(fname
) , hfile_(nullptr) {
194 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsWritableFile opening %s\n",
196 hfile_
= hdfsOpenFile(fileSys_
, filename_
.c_str(), O_WRONLY
, 0, 0, 0);
197 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsWritableFile opened %s\n",
199 assert(hfile_
!= nullptr);
201 virtual ~HdfsWritableFile() {
202 if (hfile_
!= nullptr) {
203 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsWritableFile closing %s\n",
205 hdfsCloseFile(fileSys_
, hfile_
);
206 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsWritableFile closed %s\n",
212 // If the file was successfully created, then this returns true.
213 // Otherwise returns false.
215 return hfile_
!= nullptr;
218 // The name of the file, mostly needed for debug logging.
219 const std::string
& getName() {
223 virtual Status
Append(const Slice
& data
) {
224 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsWritableFile Append %s\n",
226 const char* src
= data
.data();
227 size_t left
= data
.size();
228 size_t ret
= hdfsWrite(fileSys_
, hfile_
, src
, static_cast<tSize
>(left
));
229 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsWritableFile Appended %s\n",
232 return IOError(filename_
, errno
);
237 virtual Status
Flush() {
241 virtual Status
Sync() {
243 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsWritableFile Sync %s\n",
245 if (hdfsFlush(fileSys_
, hfile_
) == -1) {
246 return IOError(filename_
, errno
);
248 if (hdfsHSync(fileSys_
, hfile_
) == -1) {
249 return IOError(filename_
, errno
);
251 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsWritableFile Synced %s\n",
256 // This is used by HdfsLogger to write data to the debug log file
257 virtual Status
Append(const char* src
, size_t size
) {
258 if (hdfsWrite(fileSys_
, hfile_
, src
, static_cast<tSize
>(size
)) !=
259 static_cast<tSize
>(size
)) {
260 return IOError(filename_
, errno
);
265 virtual Status
Close() {
266 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsWritableFile closing %s\n",
268 if (hdfsCloseFile(fileSys_
, hfile_
) != 0) {
269 return IOError(filename_
, errno
);
271 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsWritableFile closed %s\n",
278 // The object that implements the debug logs to reside in HDFS.
279 class HdfsLogger
: public Logger
{
281 HdfsWritableFile
* file_
;
282 uint64_t (*gettid_
)(); // Return the thread id for the current thread
284 Status
HdfsCloseHelper() {
285 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsLogger closed %s\n",
286 file_
->getName().c_str());
287 if (mylog
!= nullptr && mylog
== this) {
294 virtual Status
CloseImpl() override
{ return HdfsCloseHelper(); }
297 HdfsLogger(HdfsWritableFile
* f
, uint64_t (*gettid
)())
298 : file_(f
), gettid_(gettid
) {
299 ROCKS_LOG_DEBUG(mylog
, "[hdfs] HdfsLogger opened %s\n",
300 file_
->getName().c_str());
303 ~HdfsLogger() override
{
311 void Logv(const char* format
, va_list ap
) override
{
312 const uint64_t thread_id
= (*gettid_
)();
314 // We try twice: the first time with a fixed-size stack allocated buffer,
315 // and the second time with a much larger dynamically allocated buffer.
317 for (int iter
= 0; iter
< 2; iter
++) {
321 bufsize
= sizeof(buffer
);
325 base
= new char[bufsize
];
328 char* limit
= base
+ bufsize
;
330 struct timeval now_tv
;
331 gettimeofday(&now_tv
, nullptr);
332 const time_t seconds
= now_tv
.tv_sec
;
334 localtime_r(&seconds
, &t
);
335 p
+= snprintf(p
, limit
- p
,
336 "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
343 static_cast<int>(now_tv
.tv_usec
),
344 static_cast<long long unsigned int>(thread_id
));
349 va_copy(backup_ap
, ap
);
350 p
+= vsnprintf(p
, limit
- p
, format
, backup_ap
);
354 // Truncate to available space if necessary
357 continue; // Try again with larger buffer
363 // Add newline if necessary
364 if (p
== base
|| p
[-1] != '\n') {
369 file_
->Append(base
, p
-base
);
371 if (base
!= buffer
) {
381 // Finally, the hdfs environment
383 const std::string
HdfsEnv::kProto
= "hdfs://";
384 const std::string
HdfsEnv::pathsep
= "/";
386 // open a file for sequential reading
387 Status
HdfsEnv::NewSequentialFile(const std::string
& fname
,
388 std::unique_ptr
<SequentialFile
>* result
,
389 const EnvOptions
& /*options*/) {
391 HdfsReadableFile
* f
= new HdfsReadableFile(fileSys_
, fname
);
392 if (f
== nullptr || !f
->isValid()) {
395 return IOError(fname
, errno
);
397 result
->reset(dynamic_cast<SequentialFile
*>(f
));
401 // open a file for random reading
402 Status
HdfsEnv::NewRandomAccessFile(const std::string
& fname
,
403 std::unique_ptr
<RandomAccessFile
>* result
,
404 const EnvOptions
& /*options*/) {
406 HdfsReadableFile
* f
= new HdfsReadableFile(fileSys_
, fname
);
407 if (f
== nullptr || !f
->isValid()) {
410 return IOError(fname
, errno
);
412 result
->reset(dynamic_cast<RandomAccessFile
*>(f
));
416 // create a new file for writing
417 Status
HdfsEnv::NewWritableFile(const std::string
& fname
,
418 std::unique_ptr
<WritableFile
>* result
,
419 const EnvOptions
& /*options*/) {
422 HdfsWritableFile
* f
= new HdfsWritableFile(fileSys_
, fname
);
423 if (f
== nullptr || !f
->isValid()) {
426 return IOError(fname
, errno
);
428 result
->reset(dynamic_cast<WritableFile
*>(f
));
432 class HdfsDirectory
: public Directory
{
434 explicit HdfsDirectory(int fd
) : fd_(fd
) {}
437 Status
Fsync() override
{ return Status::OK(); }
439 int GetFd() const { return fd_
; }
445 Status
HdfsEnv::NewDirectory(const std::string
& name
,
446 std::unique_ptr
<Directory
>* result
) {
447 int value
= hdfsExists(fileSys_
, name
.c_str());
450 result
->reset(new HdfsDirectory(0));
452 default: // fail if the directory doesn't exist
453 ROCKS_LOG_FATAL(mylog
, "NewDirectory hdfsExists call failed");
454 throw HdfsFatalException("hdfsExists call failed with error " +
455 ToString(value
) + " on path " + name
+
460 Status
HdfsEnv::FileExists(const std::string
& fname
) {
461 int value
= hdfsExists(fileSys_
, fname
.c_str());
465 case HDFS_DOESNT_EXIST
:
466 return Status::NotFound();
467 default: // anything else should be an error
468 ROCKS_LOG_FATAL(mylog
, "FileExists hdfsExists call failed");
469 return Status::IOError("hdfsExists call failed with error " +
470 ToString(value
) + " on path " + fname
+ ".\n");
474 Status
HdfsEnv::GetChildren(const std::string
& path
,
475 std::vector
<std::string
>* result
) {
476 int value
= hdfsExists(fileSys_
, path
.c_str());
478 case HDFS_EXISTS
: { // directory exists
480 hdfsFileInfo
* pHdfsFileInfo
= 0;
481 pHdfsFileInfo
= hdfsListDirectory(fileSys_
, path
.c_str(), &numEntries
);
482 if (numEntries
>= 0) {
483 for(int i
= 0; i
< numEntries
; i
++) {
484 std::string
pathname(pHdfsFileInfo
[i
].mName
);
485 size_t pos
= pathname
.rfind("/");
486 if (std::string::npos
!= pos
) {
487 result
->push_back(pathname
.substr(pos
+ 1));
490 if (pHdfsFileInfo
!= nullptr) {
491 hdfsFreeFileInfo(pHdfsFileInfo
, numEntries
);
494 // numEntries < 0 indicates error
495 ROCKS_LOG_FATAL(mylog
, "hdfsListDirectory call failed with error ");
496 throw HdfsFatalException(
497 "hdfsListDirectory call failed negative error.\n");
501 case HDFS_DOESNT_EXIST
: // directory does not exist, exit
502 return Status::NotFound();
503 default: // anything else should be an error
504 ROCKS_LOG_FATAL(mylog
, "GetChildren hdfsExists call failed");
505 throw HdfsFatalException("hdfsExists call failed with error " +
506 ToString(value
) + ".\n");
511 Status
HdfsEnv::DeleteFile(const std::string
& fname
) {
512 if (hdfsDelete(fileSys_
, fname
.c_str(), 1) == 0) {
515 return IOError(fname
, errno
);
518 Status
HdfsEnv::CreateDir(const std::string
& name
) {
519 if (hdfsCreateDirectory(fileSys_
, name
.c_str()) == 0) {
522 return IOError(name
, errno
);
525 Status
HdfsEnv::CreateDirIfMissing(const std::string
& name
) {
526 const int value
= hdfsExists(fileSys_
, name
.c_str());
527 // Not atomic. state might change b/w hdfsExists and CreateDir.
531 case HDFS_DOESNT_EXIST
:
532 return CreateDir(name
);
533 default: // anything else should be an error
534 ROCKS_LOG_FATAL(mylog
, "CreateDirIfMissing hdfsExists call failed");
535 throw HdfsFatalException("hdfsExists call failed with error " +
536 ToString(value
) + ".\n");
540 Status
HdfsEnv::DeleteDir(const std::string
& name
) {
541 return DeleteFile(name
);
544 Status
HdfsEnv::GetFileSize(const std::string
& fname
, uint64_t* size
) {
546 hdfsFileInfo
* pFileInfo
= hdfsGetPathInfo(fileSys_
, fname
.c_str());
547 if (pFileInfo
!= nullptr) {
548 *size
= pFileInfo
->mSize
;
549 hdfsFreeFileInfo(pFileInfo
, 1);
552 return IOError(fname
, errno
);
555 Status
HdfsEnv::GetFileModificationTime(const std::string
& fname
,
557 hdfsFileInfo
* pFileInfo
= hdfsGetPathInfo(fileSys_
, fname
.c_str());
558 if (pFileInfo
!= nullptr) {
559 *time
= static_cast<uint64_t>(pFileInfo
->mLastMod
);
560 hdfsFreeFileInfo(pFileInfo
, 1);
563 return IOError(fname
, errno
);
567 // The rename is not atomic. HDFS does not allow a renaming if the
568 // target already exists. So, we delete the target before attempting the
570 Status
HdfsEnv::RenameFile(const std::string
& src
, const std::string
& target
) {
571 hdfsDelete(fileSys_
, target
.c_str(), 1);
572 if (hdfsRename(fileSys_
, src
.c_str(), target
.c_str()) == 0) {
575 return IOError(src
, errno
);
578 Status
HdfsEnv::LockFile(const std::string
& /*fname*/, FileLock
** lock
) {
579 // there isn's a very good way to atomically check and create
580 // a file via libhdfs
585 Status
HdfsEnv::UnlockFile(FileLock
* /*lock*/) { return Status::OK(); }
587 Status
HdfsEnv::NewLogger(const std::string
& fname
,
588 std::shared_ptr
<Logger
>* result
) {
589 HdfsWritableFile
* f
= new HdfsWritableFile(fileSys_
, fname
);
590 if (f
== nullptr || !f
->isValid()) {
593 return IOError(fname
, errno
);
595 HdfsLogger
* h
= new HdfsLogger(f
, &HdfsEnv::gettid
);
597 if (mylog
== nullptr) {
598 // mylog = h; // uncomment this for detailed logging
603 // The factory method for creating an HDFS Env
604 Status
NewHdfsEnv(Env
** hdfs_env
, const std::string
& fsname
) {
605 *hdfs_env
= new HdfsEnv(fsname
);
608 } // namespace rocksdb
610 #endif // ROCKSDB_HDFS_FILE_C
614 // dummy placeholders used when HDFS is not available
616 Status
HdfsEnv::NewSequentialFile(const std::string
& /*fname*/,
617 std::unique_ptr
<SequentialFile
>* /*result*/,
618 const EnvOptions
& /*options*/) {
619 return Status::NotSupported("Not compiled with hdfs support");
622 Status
NewHdfsEnv(Env
** /*hdfs_env*/, const std::string
& /*fsname*/) {
623 return Status::NotSupported("Not compiled with hdfs support");