]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/cpp/src/arrow/io/hdfs_test.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / io / hdfs_test.cc
diff --git a/ceph/src/arrow/cpp/src/arrow/io/hdfs_test.cc b/ceph/src/arrow/cpp/src/arrow/io/hdfs_test.cc
new file mode 100644 (file)
index 0000000..2ebf950
--- /dev/null
@@ -0,0 +1,464 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <sstream>  // IWYU pragma: keep
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include <boost/filesystem.hpp>  // NOLINT
+
+#include "arrow/buffer.h"
+#include "arrow/io/hdfs.h"
+#include "arrow/io/hdfs_internal.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+
+namespace arrow {
+namespace io {
+
+// Generates `size` pseudo-random bytes (fixed seed 0, so deterministic).
+std::vector<uint8_t> RandomData(int64_t size) {
+  std::vector<uint8_t> out(static_cast<size_t>(size));
+  random_bytes(size, 0, out.data());
+  return out;
+}
+
+class TestHadoopFileSystem : public ::testing::Test {
+ public:
+  Status MakeScratchDir() {
+    if (client_->Exists(scratch_dir_)) {
+      RETURN_NOT_OK((client_->Delete(scratch_dir_, true)));
+    }
+    return client_->MakeDirectory(scratch_dir_);
+  }
+
+  Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size,
+                        bool append = false, int buffer_size = 0, int16_t replication = 0,
+                        int default_block_size = 0) {
+    std::shared_ptr<HdfsOutputStream> file;
+    RETURN_NOT_OK(client_->OpenWritable(path, append, buffer_size, replication,
+                                        default_block_size, &file));
+
+    RETURN_NOT_OK(file->Write(buffer, size));
+    RETURN_NOT_OK(file->Close());
+
+    return Status::OK();
+  }
+
+  std::string ScratchPath(const std::string& name) {
+    std::stringstream ss;
+    ss << scratch_dir_ << "/" << name;
+    return ss.str();
+  }
+
+  std::string HdfsAbsPath(const std::string& relpath) {
+    std::stringstream ss;
+    ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath;
+    return ss.str();
+  }
+
+  // Set up shared state between unit tests
+  void SetUp() {
+    internal::LibHdfsShim* driver_shim;
+
+    client_ = nullptr;
+    scratch_dir_ =
+        boost::filesystem::unique_path(boost::filesystem::temp_directory_path() /
+                                       "arrow-hdfs/scratch-%%%%")
+            .string();
+
+    loaded_driver_ = false;
+
+    Status msg = ConnectLibHdfs(&driver_shim);
+    if (!msg.ok()) {
+      if (std::getenv("ARROW_HDFS_TEST_LIBHDFS_REQUIRE")) {
+        FAIL() << "Loading libhdfs failed: " << msg.ToString();
+      } else {
+        std::cout << "Loading libhdfs failed, skipping tests gracefully: "
+                  << msg.ToString() << std::endl;
+      }
+      return;
+    }
+
+    loaded_driver_ = true;
+
+    const char* host = std::getenv("ARROW_HDFS_TEST_HOST");
+    const char* port = std::getenv("ARROW_HDFS_TEST_PORT");
+    const char* user = std::getenv("ARROW_HDFS_TEST_USER");
+
+    ASSERT_TRUE(user != nullptr) << "Set ARROW_HDFS_TEST_USER";
+
+    conf_.host = host == nullptr ? "localhost" : host;
+    conf_.user = user;
+    conf_.port = port == nullptr ? 20500 : atoi(port);
+
+    ASSERT_OK(HadoopFileSystem::Connect(&conf_, &client_));
+  }
+
+  void TearDown() {
+    if (client_) {
+      if (client_->Exists(scratch_dir_)) {
+        ARROW_EXPECT_OK(client_->Delete(scratch_dir_, true));
+      }
+      ARROW_EXPECT_OK(client_->Disconnect());
+    }
+  }
+
+  HdfsConnectionConfig conf_;
+  bool loaded_driver_;
+
+  // Resources shared amongst unit tests
+  std::string scratch_dir_;
+  std::shared_ptr<HadoopFileSystem> client_;
+};
+
+// Skips the current test when SetUp() failed to load libhdfs.
+#define SKIP_IF_NO_DRIVER()                        \
+  if (!this->loaded_driver_) {                     \
+    GTEST_SKIP() << "Driver not loaded, skipping"; \
+  }
+
+TEST_F(TestHadoopFileSystem, ConnectsAgain) {
+  SKIP_IF_NO_DRIVER();
+
+  // A second, independent connection to the same cluster should succeed
+  // while the fixture's client is still live.
+  std::shared_ptr<HadoopFileSystem> second_client;
+  ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &second_client));
+  ASSERT_OK(second_client->Disconnect());
+}
+
+TEST_F(TestHadoopFileSystem, MultipleClients) {
+  SKIP_IF_NO_DRIVER();
+
+  ASSERT_OK(this->MakeScratchDir());
+
+  // Open two independent connections to the same cluster.
+  std::shared_ptr<HadoopFileSystem> first;
+  std::shared_ptr<HadoopFileSystem> second;
+  ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &first));
+  ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &second));
+  ASSERT_OK(first->Disconnect());
+
+  // The second client keeps working after the first one has shut down.
+  std::vector<HdfsPathInfo> listing;
+  ASSERT_OK(second->ListDirectory(this->scratch_dir_, &listing));
+  ASSERT_OK(second->Disconnect());
+}
+
+TEST_F(TestHadoopFileSystem, MakeDirectory) {
+  SKIP_IF_NO_DRIVER();
+
+  const std::string dir_path = this->ScratchPath("create-directory");
+
+  // Start from a clean slate in case a previous run left the directory.
+  if (this->client_->Exists(dir_path)) {
+    ASSERT_OK(this->client_->Delete(dir_path, true));
+  }
+
+  ASSERT_OK(this->client_->MakeDirectory(dir_path));
+  ASSERT_TRUE(this->client_->Exists(dir_path));
+
+  // A freshly created directory must be empty.
+  std::vector<HdfsPathInfo> contents;
+  ARROW_EXPECT_OK(this->client_->ListDirectory(dir_path, &contents));
+  ASSERT_EQ(0, contents.size());
+
+  // After recursive deletion, listing the directory is an IO error.
+  ARROW_EXPECT_OK(this->client_->Delete(dir_path, true));
+  ASSERT_FALSE(this->client_->Exists(dir_path));
+  ASSERT_RAISES(IOError, this->client_->ListDirectory(dir_path, &contents));
+}
+
+TEST_F(TestHadoopFileSystem, GetCapacityUsed) {
+  SKIP_IF_NO_DRIVER();
+
+  // Who knows what is actually in your DFS cluster, but expect it to have
+  // positive used bytes and capacity
+  int64_t capacity = 0;
+  ASSERT_OK(this->client_->GetCapacity(&capacity));
+  ASSERT_LT(0, capacity);
+
+  int64_t used = 0;
+  ASSERT_OK(this->client_->GetUsed(&used));
+  ASSERT_LT(0, used);
+}
+
+TEST_F(TestHadoopFileSystem, GetPathInfo) {
+  SKIP_IF_NO_DRIVER();
+
+  ASSERT_OK(this->MakeScratchDir());
+
+  // Directory info
+  HdfsPathInfo info;
+  ASSERT_OK(this->client_->GetPathInfo(this->scratch_dir_, &info));
+  ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
+  ASSERT_EQ(this->HdfsAbsPath(this->scratch_dir_), info.name);
+  ASSERT_EQ(this->conf_.user, info.owner);
+
+  // TODO(wesm): test group, other attrs
+
+  // File info
+  const int kSize = 100;
+  const auto file_path = this->ScratchPath("test-file");
+  const std::vector<uint8_t> payload = RandomData(kSize);
+
+  ASSERT_OK(this->WriteDummyFile(file_path, payload.data(), kSize));
+  ASSERT_OK(this->client_->GetPathInfo(file_path, &info));
+
+  ASSERT_EQ(ObjectType::FILE, info.kind);
+  ASSERT_EQ(this->HdfsAbsPath(file_path), info.name);
+  ASSERT_EQ(this->conf_.user, info.owner);
+  ASSERT_EQ(kSize, info.size);
+}
+
+TEST_F(TestHadoopFileSystem, GetPathInfoNotExist) {
+  // ARROW-2919: Test that the error message is reasonable
+  SKIP_IF_NO_DRIVER();
+
+  ASSERT_OK(this->MakeScratchDir());
+  auto path = this->ScratchPath("path-does-not-exist");
+
+  HdfsPathInfo info;
+  Status s = this->client_->GetPathInfo(path, &info);
+  ASSERT_TRUE(s.IsIOError());
+
+  const std::string error_message = s.ToString();
+
+  // Check that the file path is found in the error message.
+  // (ASSERT_NE against npos states the intent directly; the previous
+  // ASSERT_LT(find, npos) only worked because npos is the maximum size_t.)
+  ASSERT_NE(std::string::npos, error_message.find(path));
+}
+
+TEST_F(TestHadoopFileSystem, AppendToFile) {
+  SKIP_IF_NO_DRIVER();
+
+  ASSERT_OK(this->MakeScratchDir());
+
+  const auto path = this->ScratchPath("test-file");
+  const int kSize = 100;
+  const std::vector<uint8_t> payload = RandomData(kSize);
+
+  // Write once, then append the same bytes a second time.
+  ASSERT_OK(this->WriteDummyFile(path, payload.data(), kSize));
+  ASSERT_OK(this->WriteDummyFile(path, payload.data(), kSize, /*append=*/true));
+
+  // The file should now be exactly twice as large.
+  HdfsPathInfo info;
+  ASSERT_OK(this->client_->GetPathInfo(path, &info));
+  ASSERT_EQ(kSize * 2, info.size);
+}
+
+TEST_F(TestHadoopFileSystem, ListDirectory) {
+  SKIP_IF_NO_DRIVER();
+
+  const int size = 100;
+  std::vector<uint8_t> data = RandomData(size);
+
+  auto p1 = this->ScratchPath("test-file-1");
+  auto p2 = this->ScratchPath("test-file-2");
+  auto d1 = this->ScratchPath("test-dir-1");
+
+  ASSERT_OK(this->MakeScratchDir());
+  ASSERT_OK(this->WriteDummyFile(p1, data.data(), size));
+  ASSERT_OK(this->WriteDummyFile(p2, data.data(), size / 2));
+  ASSERT_OK(this->client_->MakeDirectory(d1));
+
+  std::vector<HdfsPathInfo> listing;
+  ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing));
+
+  // Do it again, appends!
+  ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing));
+
+  ASSERT_EQ(6, static_cast<int>(listing.size()));
+
+  // Argh, well, shouldn't expect the listing to be in any particular order
+  for (size_t i = 0; i < listing.size(); ++i) {
+    const HdfsPathInfo& info = listing[i];
+    if (info.name == this->HdfsAbsPath(p1)) {
+      ASSERT_EQ(ObjectType::FILE, info.kind);
+      ASSERT_EQ(size, info.size);
+    } else if (info.name == this->HdfsAbsPath(p2)) {
+      ASSERT_EQ(ObjectType::FILE, info.kind);
+      ASSERT_EQ(size / 2, info.size);
+    } else if (info.name == this->HdfsAbsPath(d1)) {
+      ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
+    } else {
+      FAIL() << "Unexpected path: " << info.name;
+    }
+  }
+}
+
+// Exercises GetSize, sequential Read, ReadAt, Seek and Tell on a 100-byte
+// file.  The statement order matters: sequential reads advance the file
+// position that the later EOF check relies on.
+TEST_F(TestHadoopFileSystem, ReadableMethods) {
+  SKIP_IF_NO_DRIVER();
+
+  ASSERT_OK(this->MakeScratchDir());
+
+  auto path = this->ScratchPath("test-file");
+  const int size = 100;
+
+  std::vector<uint8_t> data = RandomData(size);
+  ASSERT_OK(this->WriteDummyFile(path, data.data(), size));
+
+  std::shared_ptr<HdfsReadableFile> file;
+  ASSERT_OK(this->client_->OpenReadable(path, &file));
+
+  // Test GetSize -- move this into its own unit test if ever needed
+  ASSERT_OK_AND_EQ(size, file->GetSize());
+
+  uint8_t buffer[50];
+
+  // Two sequential 50-byte reads consume the whole file.
+  ASSERT_OK_AND_EQ(50, file->Read(50, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));
+
+  ASSERT_OK_AND_EQ(50, file->Read(50, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50));
+
+  // EOF
+  ASSERT_OK_AND_EQ(0, file->Read(1, buffer));
+
+  // ReadAt to EOF
+  // Request 100 bytes at offset 60: only the remaining 40 are returned.
+  ASSERT_OK_AND_EQ(40, file->ReadAt(60, 100, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, 40));
+
+  // Seek, Tell
+  ASSERT_OK(file->Seek(60));
+  ASSERT_OK_AND_EQ(60, file->Tell());
+}
+
+TEST_F(TestHadoopFileSystem, LargeFile) {
+  SKIP_IF_NO_DRIVER();
+
+  ASSERT_OK(this->MakeScratchDir());
+
+  auto path = this->ScratchPath("test-large-file");
+  const int size = 1000000;
+
+  std::vector<uint8_t> data = RandomData(size);
+  ASSERT_OK(this->WriteDummyFile(path, data.data(), size));
+
+  std::shared_ptr<HdfsReadableFile> file;
+  ASSERT_OK(this->client_->OpenReadable(path, &file));
+
+  ASSERT_FALSE(file->closed());
+
+  ASSERT_OK_AND_ASSIGN(auto buffer, AllocateBuffer(size));
+
+  ASSERT_OK_AND_EQ(size, file->Read(size, buffer->mutable_data()));
+  ASSERT_EQ(0, std::memcmp(buffer->data(), data.data(), size));
+
+  // explicit buffer size
+  std::shared_ptr<HdfsReadableFile> file2;
+  ASSERT_OK(this->client_->OpenReadable(path, 1 << 18, &file2));
+
+  ASSERT_OK_AND_ASSIGN(auto buffer2, AllocateBuffer(size));
+
+  ASSERT_OK_AND_EQ(size, file2->Read(size, buffer2->mutable_data()));
+  ASSERT_EQ(0, std::memcmp(buffer2->data(), data.data(), size));
+}
+
+TEST_F(TestHadoopFileSystem, RenameFile) {
+  SKIP_IF_NO_DRIVER();
+  ASSERT_OK(this->MakeScratchDir());
+
+  const auto src = this->ScratchPath("src-file");
+  const auto dst = this->ScratchPath("dst-file");
+  const int kSize = 100;
+
+  std::vector<uint8_t> payload = RandomData(kSize);
+  ASSERT_OK(this->WriteDummyFile(src, payload.data(), kSize));
+
+  ASSERT_OK(this->client_->Rename(src, dst));
+
+  // The source is gone and the destination exists.
+  ASSERT_FALSE(this->client_->Exists(src));
+  ASSERT_TRUE(this->client_->Exists(dst));
+}
+
+TEST_F(TestHadoopFileSystem, ChmodChown) {
+  SKIP_IF_NO_DRIVER();
+  ASSERT_OK(this->MakeScratchDir());
+
+  const auto path = this->ScratchPath("path-to-chmod");
+  const int kSize = 100;
+
+  std::vector<uint8_t> payload = RandomData(kSize);
+  ASSERT_OK(this->WriteDummyFile(path, payload.data(), kSize));
+
+  // Chmod: the new permissions should be reflected by GetPathInfo.
+  const int16_t mode = 0755;
+  HdfsPathInfo info;
+  ASSERT_OK(this->client_->Chmod(path, mode));
+  ASSERT_OK(this->client_->GetPathInfo(path, &info));
+  ASSERT_EQ(mode, info.permissions);
+
+  // Chown: both owner and group should change.
+  const std::string owner = "hadoop";
+  const std::string group = "hadoop";
+  ASSERT_OK(this->client_->Chown(path, owner.c_str(), group.c_str()));
+  ASSERT_OK(this->client_->GetPathInfo(path, &info));
+  ASSERT_EQ("hadoop", info.owner);
+  ASSERT_EQ("hadoop", info.group);
+}
+
+TEST_F(TestHadoopFileSystem, ThreadSafety) {
+  SKIP_IF_NO_DRIVER();
+  ASSERT_OK(this->MakeScratchDir());
+
+  auto src_path = this->ScratchPath("threadsafety");
+
+  std::string data = "foobar";
+  ASSERT_OK(this->WriteDummyFile(src_path, reinterpret_cast<const uint8_t*>(data.c_str()),
+                                 static_cast<int64_t>(data.size())));
+
+  std::shared_ptr<HdfsReadableFile> file;
+  ASSERT_OK(this->client_->OpenReadable(src_path, &file));
+
+  std::atomic<int> correct_count(0);
+  int niter = 1000;
+
+  auto ReadData = [&file, &correct_count, &data, &niter]() {
+    for (int i = 0; i < niter; ++i) {
+      std::shared_ptr<Buffer> buffer;
+      if (i % 2 == 0) {
+        ASSERT_OK_AND_ASSIGN(buffer, file->ReadAt(3, 3));
+        if (0 == memcmp(data.c_str() + 3, buffer->data(), 3)) {
+          correct_count += 1;
+        }
+      } else {
+        ASSERT_OK_AND_ASSIGN(buffer, file->ReadAt(0, 4));
+        if (0 == memcmp(data.c_str() + 0, buffer->data(), 4)) {
+          correct_count += 1;
+        }
+      }
+    }
+  };
+
+  std::thread thread1(ReadData);
+  std::thread thread2(ReadData);
+  std::thread thread3(ReadData);
+  std::thread thread4(ReadData);
+
+  thread1.join();
+  thread2.join();
+  thread3.join();
+  thread4.join();
+
+  ASSERT_EQ(niter * 4, correct_count);
+}
+
+}  // namespace io
+}  // namespace arrow