]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/env/env_hdfs.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / env / env_hdfs.cc
index 1eaea3a1ce5fdc56188a5a372d57ed9659460b43..3323eeb8af36f3e4a4d6071600ff09fc3a23c794 100644 (file)
 #ifndef ROCKSDB_HDFS_FILE_C
 #define ROCKSDB_HDFS_FILE_C
 
-#include <algorithm>
 #include <stdio.h>
-#include <sys/time.h>
 #include <time.h>
+#include <algorithm>
 #include <iostream>
 #include <sstream>
+#include "logging/logging.h"
 #include "rocksdb/status.h"
 #include "util/string_util.h"
 
 // will reside on the same HDFS cluster.
 //
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 namespace {
 
 // Log error message
 static Status IOError(const std::string& context, int err_number) {
-  return (err_number == ENOSPC) ?
-      Status::NoSpace(context, strerror(err_number)) :
-      Status::IOError(context, strerror(err_number));
+  return (err_number == ENOSPC)
+             ? Status::NoSpace(context, strerror(err_number))
+             : (err_number == ENOENT)
+                   ? Status::PathNotFound(context, strerror(err_number))
+                   : Status::IOError(context, strerror(err_number));
 }
 
 // assume that there is one global logger for now. It is not thread-safe,
@@ -121,8 +123,9 @@ class HdfsReadableFile : virtual public SequentialFile,
     Status s;
     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
                     filename_.c_str());
-    ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset,
-                                   (void*)scratch, (tSize)n);
+    tSize bytes_read =
+        hdfsPread(fileSys_, hfile_, offset, static_cast<void*>(scratch),
+                  static_cast<tSize>(n));
     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
                     filename_.c_str());
     *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
@@ -186,8 +189,12 @@ class HdfsWritableFile: public WritableFile {
   hdfsFile hfile_;
 
  public:
-  HdfsWritableFile(hdfsFS fileSys, const std::string& fname)
-      : fileSys_(fileSys), filename_(fname) , hfile_(nullptr) {
+  HdfsWritableFile(hdfsFS fileSys, const std::string& fname,
+                   const EnvOptions& options)
+      : WritableFile(options),
+        fileSys_(fileSys),
+        filename_(fname),
+        hfile_(nullptr) {
     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
                     filename_.c_str());
     hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
@@ -222,7 +229,7 @@ class HdfsWritableFile: public WritableFile {
                     filename_.c_str());
     const char* src = data.data();
     size_t left = data.size();
-    size_t ret = hdfsWrite(fileSys_, hfile_, src, left);
+    size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
                     filename_.c_str());
     if (ret != left) {
@@ -252,7 +259,8 @@ class HdfsWritableFile: public WritableFile {
 
   // This is used by HdfsLogger to write data to the debug log file
   virtual Status Append(const char* src, size_t size) {
-    if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) {
+    if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
+        static_cast<tSize>(size)) {
       return IOError(filename_, errno);
     }
     return Status::OK();
@@ -280,11 +288,10 @@ class HdfsLogger : public Logger {
   Status HdfsCloseHelper() {
     ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
                     file_->getName().c_str());
-    Status s = file_->Close();
     if (mylog != nullptr && mylog == this) {
       mylog = nullptr;
     }
-    return s;
+    return Status::OK();
   }
 
  protected:
@@ -297,14 +304,15 @@ class HdfsLogger : public Logger {
                     file_->getName().c_str());
   }
 
-  virtual ~HdfsLogger() {
+  ~HdfsLogger() override {
     if (!closed_) {
       closed_ = true;
       HdfsCloseHelper();
     }
   }
 
-  virtual void Logv(const char* format, va_list ap) {
+  using Logger::Logv;
+  void Logv(const char* format, va_list ap) override {
     const uint64_t thread_id = (*gettid_)();
 
     // We try twice: the first time with a fixed-size stack allocated buffer,
@@ -381,8 +389,8 @@ const std::string HdfsEnv::pathsep = "/";
 
 // open a file for sequential reading
 Status HdfsEnv::NewSequentialFile(const std::string& fname,
-                                  unique_ptr<SequentialFile>* result,
-                                  const EnvOptions& options) {
+                                  std::unique_ptr<SequentialFile>* result,
+                                  const EnvOptions& /*options*/) {
   result->reset();
   HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
   if (f == nullptr || !f->isValid()) {
@@ -396,8 +404,8 @@ Status HdfsEnv::NewSequentialFile(const std::string& fname,
 
 // open a file for random reading
 Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
-                                    unique_ptr<RandomAccessFile>* result,
-                                    const EnvOptions& options) {
+                                    std::unique_ptr<RandomAccessFile>* result,
+                                    const EnvOptions& /*options*/) {
   result->reset();
   HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
   if (f == nullptr || !f->isValid()) {
@@ -411,11 +419,11 @@ Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
 
 // create a new file for writing
 Status HdfsEnv::NewWritableFile(const std::string& fname,
-                                unique_ptr<WritableFile>* result,
+                                std::unique_ptr<WritableFile>* result,
                                 const EnvOptions& options) {
   result->reset();
   Status s;
-  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
+  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
   if (f == nullptr || !f->isValid()) {
     delete f;
     *result = nullptr;
@@ -430,14 +438,16 @@ class HdfsDirectory : public Directory {
   explicit HdfsDirectory(int fd) : fd_(fd) {}
   ~HdfsDirectory() {}
 
-  virtual Status Fsync() { return Status::OK(); }
+  Status Fsync() override { return Status::OK(); }
+
+  int GetFd() const { return fd_; }
 
  private:
   int fd_;
 };
 
 Status HdfsEnv::NewDirectory(const std::string& name,
-                             unique_ptr<Directory>* result) {
+                             std::unique_ptr<Directory>* result) {
   int value = hdfsExists(fileSys_, name.c_str());
   switch (value) {
     case HDFS_EXISTS:
@@ -475,10 +485,10 @@ Status HdfsEnv::GetChildren(const std::string& path,
     pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
     if (numEntries >= 0) {
       for(int i = 0; i < numEntries; i++) {
-        char* pathname = pHdfsFileInfo[i].mName;
-        char* filename = std::rindex(pathname, '/');
-        if (filename != nullptr) {
-          result->push_back(filename+1);
+        std::string pathname(pHdfsFileInfo[i].mName);
+        size_t pos = pathname.rfind("/");
+        if (std::string::npos != pos) {
+          result->push_back(pathname.substr(pos + 1));
         }
       }
       if (pHdfsFileInfo != nullptr) {
@@ -569,20 +579,23 @@ Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
   return IOError(src, errno);
 }
 
-Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
+Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
   // there isn's a very good way to atomically check and create
   // a file via libhdfs
   *lock = nullptr;
   return Status::OK();
 }
 
-Status HdfsEnv::UnlockFile(FileLock* lock) {
-  return Status::OK();
-}
+Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }
 
 Status HdfsEnv::NewLogger(const std::string& fname,
-                          shared_ptr<Logger>* result) {
-  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
+                          std::shared_ptr<Logger>* result) {
+  // EnvOptions is used exclusively for its `strict_bytes_per_sync` value. That
+  // option is only intended for WAL/flush/compaction writes, so turn it off in
+  // the logger.
+  EnvOptions options;
+  options.strict_bytes_per_sync = false;
+  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
   if (f == nullptr || !f->isValid()) {
     delete f;
     *result = nullptr;
@@ -596,28 +609,40 @@ Status HdfsEnv::NewLogger(const std::string& fname,
   return Status::OK();
 }
 
+Status HdfsEnv::IsDirectory(const std::string& path, bool* is_dir) {
+  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, path.c_str());
+  if (pFileInfo != nullptr) {
+    if (is_dir != nullptr) {
+      *is_dir = (pFileInfo->mKind == kObjectKindDirectory);
+    }
+    hdfsFreeFileInfo(pFileInfo, 1);
+    return Status::OK();
+  }
+  return IOError(path, errno);
+}
+
 // The factory method for creating an HDFS Env
 Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
   *hdfs_env = new HdfsEnv(fsname);
   return Status::OK();
 }
-}  // namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE
 
 #endif // ROCKSDB_HDFS_FILE_C
 
 #else // USE_HDFS
 
 // dummy placeholders used when HDFS is not available
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
-                                  unique_ptr<SequentialFile>* /*result*/,
+                                  std::unique_ptr<SequentialFile>* /*result*/,
                                   const EnvOptions& /*options*/) {
   return Status::NotSupported("Not compiled with hdfs support");
- }
+}
 
  Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
    return Status::NotSupported("Not compiled with hdfs support");
  }
-}
+ }  // namespace ROCKSDB_NAMESPACE
 
 #endif