// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/env.h"
#include "hdfs/env_hdfs.h"

#ifdef USE_HDFS
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C

#include <stdio.h>
#include <sys/time.h>
#include <time.h>
#include "rocksdb/status.h"
#include "util/string_util.h"

#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// api to access HDFS. All HDFS files created by one instance of rocksdb
// will reside on the same HDFS cluster.
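// Example usage (a sketch, not part of this file's API surface): a client
// would typically obtain the Env through the NewHdfsEnv factory defined below
// and hand it to RocksDB via Options. The NameNode address and DB path are
// illustrative only.
//
//   rocksdb::Env* hdfs_env = nullptr;
//   rocksdb::Status s = rocksdb::NewHdfsEnv(&hdfs_env, "hdfs://namenode:8020/");
//   rocksdb::Options options;
//   options.env = hdfs_env;
//   rocksdb::DB* db = nullptr;
//   s = rocksdb::DB::Open(options, "/rocksdb/example_db", &db);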
namespace rocksdb {

static Status IOError(const std::string& context, int err_number) {
  return (err_number == ENOSPC)
             ? Status::NoSpace(context, strerror(err_number))
             : Status::IOError(context, strerror(err_number));
}

// assume that there is one global logger for now. It is not thread-safe,
// but need not be because the logger is initialized at db-open time.
static Logger* mylog = nullptr;
// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile : virtual public SequentialFile,
                         virtual public RandomAccessFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog,
                    "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
                    filename_.c_str(), hfile_);
  }

  virtual ~HdfsReadableFile() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
                    filename_.c_str());
    hdfsCloseFile(fileSys_, hfile_);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
  }

  // returns true iff the file was successfully opened
  bool isValid() {
    return hfile_ != nullptr;
  }
  // sequential access, read data at current offset in file
  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
                    filename_.c_str(), n);

    char* buffer = scratch;
    size_t total_bytes_read = 0;
    tSize bytes_read = 0;
    tSize remaining_bytes = (tSize)n;

    // Read a total of n bytes repeatedly until we hit error or eof
    while (remaining_bytes > 0) {
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
      if (bytes_read <= 0) {
        break;
      }
      assert(bytes_read <= remaining_bytes);

      total_bytes_read += bytes_read;
      remaining_bytes -= bytes_read;
      buffer += bytes_read;
    }
    assert(total_bytes_read <= n);

    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
                    filename_.c_str());

    if (bytes_read < 0) {
      s = IOError(filename_, errno);
    }
    *result = Slice(scratch, total_bytes_read);
    return s;
  }
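  // Caller-side sketch (illustrative, not taken from RocksDB internals): the
  // sequential reader is consumed in caller-owned chunks, e.g.
  //
  //   char scratch[4096];
  //   Slice chunk;
  //   Status s = file->Read(sizeof(scratch), &chunk, scratch);
  //   // a chunk smaller than requested means EOF or a short read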
  // random access, read data from specified offset in file
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
                    filename_.c_str());
    ssize_t bytes_read =
        hdfsPread(fileSys_, hfile_, offset, (void*)scratch, (tSize)n);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
                    filename_.c_str());
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
    if (bytes_read < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }
  virtual Status Skip(uint64_t n) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
                    filename_.c_str());
    // get current offset from file
    tOffset current = hdfsTell(fileSys_, hfile_);
    if (current < 0) {
      return IOError(filename_, errno);
    }
    // seek to new offset in file
    tOffset newoffset = current + n;
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
    if (val < 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }
 private:
  // returns true if we are at the end of file, false otherwise
  bool feof() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
                    filename_.c_str());
    if (hdfsTell(fileSys_, hfile_) == fileSize()) {
      return true;
    }
    return false;
  }

  // the current size of the file
  tOffset fileSize() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
                    filename_.c_str());
    tOffset size = 0L;
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
    if (pFileInfo != nullptr) {
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    } else {
      throw HdfsFatalException("fileSize on unknown file " + filename_);
    }
    return size;
  }
};
// Appends to an existing file in HDFS.
class HdfsWritableFile : public WritableFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
                    filename_.c_str());
    assert(hfile_ != nullptr);
  }

  virtual ~HdfsWritableFile() {
    if (hfile_ != nullptr) {
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                      filename_.c_str());
      hdfsCloseFile(fileSys_, hfile_);
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                      filename_.c_str());
      hfile_ = nullptr;
    }
  }

  // If the file was successfully created, then this returns true.
  // Otherwise returns false.
  bool isValid() {
    return hfile_ != nullptr;
  }
  // The name of the file, mostly needed for debug logging.
  const std::string& getName() {
    return filename_;
  }

  virtual Status Append(const Slice& data) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
                    filename_.c_str());
    const char* src = data.data();
    size_t left = data.size();
    size_t ret = hdfsWrite(fileSys_, hfile_, src, left);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
                    filename_.c_str());
    if (ret != left) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }
  virtual Status Flush() {
    return Status::OK();
  }

  virtual Status Sync() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
                    filename_.c_str());
    if (hdfsFlush(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    if (hdfsHSync(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
                    filename_.c_str());
    return Status::OK();
  }
  // This is used by HdfsLogger to write data to the debug log file
  virtual Status Append(const char* src, size_t size) {
    if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Close() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                    filename_.c_str());
    if (hdfsCloseFile(fileSys_, hfile_) != 0) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
    return Status::OK();
  }
};
// The object that implements the debug logs to reside in HDFS.
class HdfsLogger : public Logger {
 private:
  HdfsWritableFile* file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread

 public:
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
      : file_(f), gettid_(gettid) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
                    file_->getName().c_str());
  }

  virtual ~HdfsLogger() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
                    file_->getName().c_str());
    delete file_;
    if (mylog != nullptr && mylog == this) {
      mylog = nullptr;
    }
  }
  virtual void Logv(const char* format, va_list ap) {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      localtime_r(&seconds, &t);
      p += snprintf(p, limit - p,
                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                    t.tm_year + 1900,
                    t.tm_mon + 1,
                    t.tm_mday,
                    t.tm_hour,
                    t.tm_min,
                    t.tm_sec,
                    static_cast<int>(now_tv.tv_usec),
                    static_cast<long long unsigned int>(thread_id));

      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with larger buffer
        } else {
          p = limit - 1;
        }
      }

      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      file_->Append(base, p - base);
      file_->Flush();
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
};
// Finally, the hdfs environment

const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";
// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  unique_ptr<SequentialFile>* result,
                                  const EnvOptions& options) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<SequentialFile*>(f));
  return Status::OK();
}
// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                    unique_ptr<RandomAccessFile>* result,
                                    const EnvOptions& options) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<RandomAccessFile*>(f));
  return Status::OK();
}
// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
                                unique_ptr<WritableFile>* result,
                                const EnvOptions& options) {
  result->reset();
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<WritableFile*>(f));
  return Status::OK();
}
class HdfsDirectory : public Directory {
 public:
  explicit HdfsDirectory(int fd) : fd_(fd) {}
  ~HdfsDirectory() {}

  virtual Status Fsync() { return Status::OK(); }

 private:
  int fd_;
};
Status HdfsEnv::NewDirectory(const std::string& name,
                             unique_ptr<Directory>* result) {
  int value = hdfsExists(fileSys_, name.c_str());
  switch (value) {
    case HDFS_EXISTS:
      result->reset(new HdfsDirectory(0));
      return Status::OK();
    default:  // fail if the directory doesn't exist
      ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + " on path " + name +
                               ".\n");
  }
}
Status HdfsEnv::FileExists(const std::string& fname) {
  int value = hdfsExists(fileSys_, fname.c_str());
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
      return Status::IOError("hdfsExists call failed with error " +
                             ToString(value) + " on path " + fname + ".\n");
  }
}
Status HdfsEnv::GetChildren(const std::string& path,
                            std::vector<std::string>* result) {
  int value = hdfsExists(fileSys_, path.c_str());
  switch (value) {
    case HDFS_EXISTS: {  // directory exists
      int numEntries = 0;
      hdfsFileInfo* pHdfsFileInfo = 0;
      pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
      if (numEntries >= 0) {
        for (int i = 0; i < numEntries; i++) {
          char* pathname = pHdfsFileInfo[i].mName;
          char* filename = std::rindex(pathname, '/');
          if (filename != nullptr) {
            result->push_back(filename + 1);
          }
        }
        if (pHdfsFileInfo != nullptr) {
          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
        }
      } else {
        // numEntries < 0 indicates error
        ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
        throw HdfsFatalException(
            "hdfsListDirectory call failed negative error.\n");
      }
      break;
    }
    case HDFS_DOESNT_EXIST:  // directory does not exist, exit
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
  return Status::OK();
}
Status HdfsEnv::DeleteFile(const std::string& fname) {
  if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::CreateDir(const std::string& name) {
  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}
Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
  const int value = hdfsExists(fileSys_, name.c_str());
  // Not atomic. state might change b/w hdfsExists and CreateDir.
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return CreateDir(name);
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
}

Status HdfsEnv::DeleteDir(const std::string& name) {
  return DeleteFile(name);
}
Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
  *size = 0L;
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *size = pFileInfo->mSize;
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}
Status HdfsEnv::GetFileModificationTime(const std::string& fname,
                                        uint64_t* time) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *time = static_cast<uint64_t>(pFileInfo->mLastMod);
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}
// The rename is not atomic. HDFS does not allow a renaming if the
// target already exists. So, we delete the target before attempting the
// rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
  hdfsDelete(fileSys_, target.c_str(), 1);
  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(src, errno);
}
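// Note on the delete-then-rename sequence above: if the process dies between
// hdfsDelete and hdfsRename, the target path is missing even though the
// source still exists, so callers should not treat RenameFile as
// all-or-nothing.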
Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
  // there isn't a very good way to atomically check and create
  // a file via libhdfs
  *lock = nullptr;
  return Status::OK();
}

Status HdfsEnv::UnlockFile(FileLock* lock) {
  return Status::OK();
}
Status HdfsEnv::NewLogger(const std::string& fname,
                          shared_ptr<Logger>* result) {
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
  result->reset(h);
  if (mylog == nullptr) {
    // mylog = h; // uncomment this for detailed logging
  }
  return Status::OK();
}
// The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  *hdfs_env = new HdfsEnv(fsname);
  return Status::OK();
}
}  // namespace rocksdb

#endif  // ROCKSDB_HDFS_FILE_C

#else  // USE_HDFS

// dummy placeholders used when HDFS is not available
namespace rocksdb {
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  unique_ptr<SequentialFile>* result,
                                  const EnvOptions& options) {
  return Status::NotSupported("Not compiled with hdfs support");
}

Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  return Status::NotSupported("Not compiled with hdfs support");
}

}  // namespace rocksdb

#endif  // USE_HDFS