X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Frocksdb%2Fenv%2Fenv_hdfs.cc;h=3323eeb8af36f3e4a4d6071600ff09fc3a23c794;hb=20effc670b57271cb089376d6d0800990e5218d5;hp=1eaea3a1ce5fdc56188a5a372d57ed9659460b43;hpb=11fdf7f228cb605e22a0e495ebabd3329db96b81;p=ceph.git diff --git a/ceph/src/rocksdb/env/env_hdfs.cc b/ceph/src/rocksdb/env/env_hdfs.cc index 1eaea3a1c..3323eeb8a 100644 --- a/ceph/src/rocksdb/env/env_hdfs.cc +++ b/ceph/src/rocksdb/env/env_hdfs.cc @@ -11,12 +11,12 @@ #ifndef ROCKSDB_HDFS_FILE_C #define ROCKSDB_HDFS_FILE_C -#include #include -#include #include +#include #include #include +#include "logging/logging.h" #include "rocksdb/status.h" #include "util/string_util.h" @@ -30,15 +30,17 @@ // will reside on the same HDFS cluster. // -namespace rocksdb { +namespace ROCKSDB_NAMESPACE { namespace { // Log error message static Status IOError(const std::string& context, int err_number) { - return (err_number == ENOSPC) ? - Status::NoSpace(context, strerror(err_number)) : - Status::IOError(context, strerror(err_number)); + return (err_number == ENOSPC) + ? Status::NoSpace(context, strerror(err_number)) + : (err_number == ENOENT) + ? Status::PathNotFound(context, strerror(err_number)) + : Status::IOError(context, strerror(err_number)); } // assume that there is one global logger for now. It is not thread-safe, @@ -121,8 +123,9 @@ class HdfsReadableFile : virtual public SequentialFile, Status s; ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n", filename_.c_str()); - ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset, - (void*)scratch, (tSize)n); + tSize bytes_read = + hdfsPread(fileSys_, hfile_, offset, static_cast(scratch), + static_cast(n)); ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n", filename_.c_str()); *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read); @@ -186,8 +189,12 @@ class HdfsWritableFile: public WritableFile { hdfsFile hfile_; public: - HdfsWritableFile(hdfsFS fileSys, const std::string& fname) - : fileSys_(fileSys), filename_(fname) , hfile_(nullptr) { + HdfsWritableFile(hdfsFS fileSys, const std::string& fname, + const EnvOptions& options) + : WritableFile(options), + fileSys_(fileSys), + filename_(fname), + hfile_(nullptr) { ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n", filename_.c_str()); hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0); @@ -222,7 +229,7 @@ class HdfsWritableFile: public WritableFile { filename_.c_str()); const char* src = data.data(); size_t left = data.size(); - size_t ret = hdfsWrite(fileSys_, hfile_, src, left); + size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast(left)); ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n", filename_.c_str()); if (ret != left) { @@ -252,7 +259,8 @@ class HdfsWritableFile: public WritableFile { // This is used by HdfsLogger to write data to the debug log file virtual Status Append(const char* src, size_t size) { - if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) { + if (hdfsWrite(fileSys_, hfile_, src, static_cast(size)) != + static_cast(size)) { return IOError(filename_, errno); } return Status::OK(); @@ -280,11 +288,10 @@ class HdfsLogger : public Logger { Status HdfsCloseHelper() { ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n", file_->getName().c_str()); - Status s = file_->Close(); if (mylog != nullptr && mylog == this) { mylog = nullptr; } - return s; + return Status::OK(); } protected: @@ -297,14 +304,15 @@ class HdfsLogger : public Logger { file_->getName().c_str()); } - virtual ~HdfsLogger() { + ~HdfsLogger() override { if (!closed_) { closed_ = true; HdfsCloseHelper(); } } - virtual void Logv(const char* format, va_list ap) { + using Logger::Logv; + void Logv(const char* format, va_list ap) override { const uint64_t thread_id = (*gettid_)(); // We try twice: the first time with a fixed-size stack allocated buffer, @@ -381,8 +389,8 @@ const std::string HdfsEnv::pathsep = "/"; // open a file for sequential reading Status HdfsEnv::NewSequentialFile(const std::string& fname, - unique_ptr* result, - const EnvOptions& options) { + std::unique_ptr* result, + const EnvOptions& /*options*/) { result->reset(); HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); if (f == nullptr || !f->isValid()) { @@ -396,8 +404,8 @@ Status HdfsEnv::NewSequentialFile(const std::string& fname, // open a file for random reading Status HdfsEnv::NewRandomAccessFile(const std::string& fname, - unique_ptr* result, - const EnvOptions& options) { + std::unique_ptr* result, + const EnvOptions& /*options*/) { result->reset(); HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); if (f == nullptr || !f->isValid()) { @@ -411,11 +419,11 @@ Status HdfsEnv::NewRandomAccessFile(const std::string& fname, // create a new file for writing Status HdfsEnv::NewWritableFile(const std::string& fname, - unique_ptr* result, + std::unique_ptr* result, const EnvOptions& options) { result->reset(); Status s; - HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname); + HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options); if (f == nullptr || !f->isValid()) { delete f; *result = nullptr; @@ -430,14 +438,16 @@ class HdfsDirectory : public Directory { explicit HdfsDirectory(int fd) : fd_(fd) {} ~HdfsDirectory() {} - virtual Status Fsync() { return Status::OK(); } + Status Fsync() override { return Status::OK(); } + + int GetFd() const { return fd_; } private: int fd_; }; Status HdfsEnv::NewDirectory(const std::string& name, - unique_ptr* result) { + std::unique_ptr* result) { int value = hdfsExists(fileSys_, name.c_str()); switch (value) { case HDFS_EXISTS: @@ -475,10 +485,10 @@ Status HdfsEnv::GetChildren(const std::string& path, pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries); if (numEntries >= 0) { for(int i = 0; i < numEntries; i++) { - char* pathname = pHdfsFileInfo[i].mName; - char* filename = std::rindex(pathname, '/'); - if (filename != nullptr) { - result->push_back(filename+1); + std::string pathname(pHdfsFileInfo[i].mName); + size_t pos = pathname.rfind("/"); + if (std::string::npos != pos) { + result->push_back(pathname.substr(pos + 1)); } } if (pHdfsFileInfo != nullptr) { @@ -569,20 +579,23 @@ Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) { return IOError(src, errno); } -Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) { +Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) { // there isn's a very good way to atomically check and create // a file via libhdfs *lock = nullptr; return Status::OK(); } -Status HdfsEnv::UnlockFile(FileLock* lock) { - return Status::OK(); -} +Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); } Status HdfsEnv::NewLogger(const std::string& fname, - shared_ptr* result) { - HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname); + std::shared_ptr* result) { + // EnvOptions is used exclusively for its `strict_bytes_per_sync` value. That + // option is only intended for WAL/flush/compaction writes, so turn it off in + // the logger. + EnvOptions options; + options.strict_bytes_per_sync = false; + HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options); if (f == nullptr || !f->isValid()) { delete f; *result = nullptr; @@ -596,28 +609,40 @@ Status HdfsEnv::NewLogger(const std::string& fname, return Status::OK(); } +Status HdfsEnv::IsDirectory(const std::string& path, bool* is_dir) { + hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, path.c_str()); + if (pFileInfo != nullptr) { + if (is_dir != nullptr) { + *is_dir = (pFileInfo->mKind == kObjectKindDirectory); + } + hdfsFreeFileInfo(pFileInfo, 1); + return Status::OK(); + } + return IOError(path, errno); +} + // The factory method for creating an HDFS Env Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) { *hdfs_env = new HdfsEnv(fsname); return Status::OK(); } -} // namespace rocksdb +} // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_HDFS_FILE_C #else // USE_HDFS // dummy placeholders used when HDFS is not available -namespace rocksdb { +namespace ROCKSDB_NAMESPACE { Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/, - unique_ptr* /*result*/, + std::unique_ptr* /*result*/, const EnvOptions& /*options*/) { return Status::NotSupported("Not compiled with hdfs support"); - } +} Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) { return Status::NotSupported("Not compiled with hdfs support"); } -} + } // namespace ROCKSDB_NAMESPACE #endif