1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
24 #include <sstream> // IWYU pragma: keep
29 #include <gtest/gtest.h>
31 #include <boost/filesystem.hpp> // NOLINT
33 #include "arrow/buffer.h"
34 #include "arrow/io/hdfs.h"
35 #include "arrow/io/hdfs_internal.h"
36 #include "arrow/io/interfaces.h"
37 #include "arrow/status.h"
38 #include "arrow/testing/gtest_util.h"
39 #include "arrow/testing/util.h"
// Returns a std::vector of `size` bytes filled by random_bytes().
// The constant second argument (0) is presumably the RNG seed, which would
// make the data identical across calls — confirm against arrow/testing/util.h.
44 std::vector
<uint8_t> RandomData(int64_t size
) {
45 std::vector
<uint8_t> buffer(size
);
46 random_bytes(size
, 0, buffer
.data());
// gtest fixture for the HDFS client tests.
// The setup code loads libhdfs, connects client_ using settings read from the
// ARROW_HDFS_TEST_* environment variables, and chooses a unique scratch_dir_.
// The teardown code removes scratch_dir_ and disconnects client_.
50 class TestHadoopFileSystem
: public ::testing::Test
{
// Ensures scratch_dir_ exists and is freshly created: an existing directory is
// deleted first (the `true` flag presumably requests recursive deletion), then
// it is recreated with MakeDirectory, whose Status is returned.
52 Status
MakeScratchDir() {
53 if (client_
->Exists(scratch_dir_
)) {
54 RETURN_NOT_OK((client_
->Delete(scratch_dir_
, true)));
56 return client_
->MakeDirectory(scratch_dir_
);
// Writes `size` bytes from `buffer` to `path`, then closes the file.
// append, buffer_size, replication and default_block_size are forwarded
// unchanged to client_->OpenWritable; a value of 0 for the numeric options
// presumably selects the HDFS default (TODO confirm).
// Errors from OpenWritable, Write or Close are propagated via RETURN_NOT_OK.
59 Status
WriteDummyFile(const std::string
& path
, const uint8_t* buffer
, int64_t size
,
60 bool append
= false, int buffer_size
= 0, int16_t replication
= 0,
61 int default_block_size
= 0) {
62 std::shared_ptr
<HdfsOutputStream
> file
;
63 RETURN_NOT_OK(client_
->OpenWritable(path
, append
, buffer_size
, replication
,
64 default_block_size
, &file
));
66 RETURN_NOT_OK(file
->Write(buffer
, size
));
67 RETURN_NOT_OK(file
->Close());
// Path of `name` inside the scratch directory: scratch_dir_ + "/" + name.
72 std::string
ScratchPath(const std::string
& name
) {
74 ss
<< scratch_dir_
<< "/" << name
;
// Fully qualified form "hdfs://" + conf_.host + ":" + conf_.port + relpath.
// The tests compare this against HdfsPathInfo::name.
78 std::string
HdfsAbsPath(const std::string
& relpath
) {
80 ss
<< "hdfs://" << conf_
.host
<< ":" << conf_
.port
<< relpath
;
84 // Set up shared state between unit tests
86 internal::LibHdfsShim
* driver_shim
;
// Randomized scratch directory name: boost::filesystem::unique_path replaces
// each '%' in the pattern with a random hexadecimal digit.
90 boost::filesystem::unique_path(boost::filesystem::temp_directory_path() /
91 "arrow-hdfs/scratch-%%%%")
// Load the libhdfs driver. loaded_driver_ starts out false.
// If loading fails: when ARROW_HDFS_TEST_LIBHDFS_REQUIRE is set the test
// FAILs; otherwise a message is printed and loaded_driver_ stays false, so
// SKIP_IF_NO_DRIVER() skips the tests.
94 loaded_driver_
= false;
96 Status msg
= ConnectLibHdfs(&driver_shim
);
98 if (std::getenv("ARROW_HDFS_TEST_LIBHDFS_REQUIRE")) {
99 FAIL() << "Loading libhdfs failed: " << msg
.ToString();
101 std::cout
<< "Loading libhdfs failed, skipping tests gracefully: "
102 << msg
.ToString() << std::endl
;
107 loaded_driver_
= true;
// Connection settings from the environment. The host defaults to "localhost"
// and the port to 20500 when unset; ARROW_HDFS_TEST_USER must be set.
// NOTE(review): atoi() returns 0 for a malformed port string instead of
// reporting an error.
109 const char* host
= std::getenv("ARROW_HDFS_TEST_HOST");
110 const char* port
= std::getenv("ARROW_HDFS_TEST_PORT");
111 const char* user
= std::getenv("ARROW_HDFS_TEST_USER");
113 ASSERT_TRUE(user
!= nullptr) << "Set ARROW_HDFS_TEST_USER";
115 conf_
.host
= host
== nullptr ? "localhost" : host
;
117 conf_
.port
= port
== nullptr ? 20500 : atoi(port
);
119 ASSERT_OK(HadoopFileSystem::Connect(&conf_
, &client_
));
// Teardown: delete scratch_dir_ if it exists, then disconnect client_.
// Both steps use the non-fatal ARROW_EXPECT_OK, so a failed delete does not
// prevent the disconnect.
124 if (client_
->Exists(scratch_dir_
)) {
125 ARROW_EXPECT_OK(client_
->Delete(scratch_dir_
, true));
127 ARROW_EXPECT_OK(client_
->Disconnect());
// Connection settings used for client_; tests also read them directly
// (e.g. to open extra clients or compare .user with file owners).
131 HdfsConnectionConfig conf_
;
134 // Resources shared amongst unit tests
135 std::string scratch_dir_
;
136 std::shared_ptr
<HadoopFileSystem
> client_
;
// Skips the current test via GTEST_SKIP() when the fixture could not load the
// libhdfs driver (loaded_driver_ is false).
139 #define SKIP_IF_NO_DRIVER() \
140 if (!this->loaded_driver_) { \
141 GTEST_SKIP() << "Driver not loaded, skipping"; \
// A second, independent client can connect with the fixture's conf_ and
// disconnect cleanly.
144 TEST_F(TestHadoopFileSystem
, ConnectsAgain
) {
147 std::shared_ptr
<HadoopFileSystem
> client
;
148 ASSERT_OK(HadoopFileSystem::Connect(&this->conf_
, &client
));
149 ASSERT_OK(client
->Disconnect());
// Two clients built from the same conf_: disconnecting client1 must not
// break client2, which can still list the scratch directory.
152 TEST_F(TestHadoopFileSystem
, MultipleClients
) {
155 ASSERT_OK(this->MakeScratchDir());
157 std::shared_ptr
<HadoopFileSystem
> client1
;
158 std::shared_ptr
<HadoopFileSystem
> client2
;
159 ASSERT_OK(HadoopFileSystem::Connect(&this->conf_
, &client1
));
160 ASSERT_OK(HadoopFileSystem::Connect(&this->conf_
, &client2
));
161 ASSERT_OK(client1
->Disconnect());
163 // client2 continues to function after equivalent client1 has shutdown
164 std::vector
<HdfsPathInfo
> listing
;
165 ASSERT_OK(client2
->ListDirectory(this->scratch_dir_
, &listing
));
166 ASSERT_OK(client2
->Disconnect());
// MakeDirectory creates an empty directory. After Delete, the directory no
// longer exists and ListDirectory on it returns an IOError.
169 TEST_F(TestHadoopFileSystem
, MakeDirectory
) {
172 std::string path
= this->ScratchPath("create-directory");
174 if (this->client_
->Exists(path
)) {
175 ASSERT_OK(this->client_
->Delete(path
, true));
178 ASSERT_OK(this->client_
->MakeDirectory(path
));
179 ASSERT_TRUE(this->client_
->Exists(path
));
180 std::vector
<HdfsPathInfo
> listing
;
181 ARROW_EXPECT_OK(this->client_
->ListDirectory(path
, &listing
));
182 ASSERT_EQ(0, listing
.size());
183 ARROW_EXPECT_OK(this->client_
->Delete(path
, true));
184 ASSERT_FALSE(this->client_
->Exists(path
));
185 ASSERT_RAISES(IOError
, this->client_
->ListDirectory(path
, &listing
));
// GetCapacity and GetUsed both report a strictly positive byte count.
188 TEST_F(TestHadoopFileSystem
, GetCapacityUsed
) {
191 // Who knows what is actually in your DFS cluster, but expect it to have
192 // positive used bytes and capacity
194 ASSERT_OK(this->client_
->GetCapacity(&nbytes
));
195 ASSERT_LT(0, nbytes
);
197 ASSERT_OK(this->client_
->GetUsed(&nbytes
));
198 ASSERT_LT(0, nbytes
);
// GetPathInfo reports kind, fully qualified name and owner for the scratch
// directory. For a freshly written 100-byte file it reports the same fields
// plus its size.
201 TEST_F(TestHadoopFileSystem
, GetPathInfo
) {
206 ASSERT_OK(this->MakeScratchDir());
209 ASSERT_OK(this->client_
->GetPathInfo(this->scratch_dir_
, &info
));
210 ASSERT_EQ(ObjectType::DIRECTORY
, info
.kind
);
211 ASSERT_EQ(this->HdfsAbsPath(this->scratch_dir_
), info
.name
);
212 ASSERT_EQ(this->conf_
.user
, info
.owner
);
214 // TODO(wesm): test group, other attrs
216 auto path
= this->ScratchPath("test-file");
218 const int size
= 100;
220 std::vector
<uint8_t> buffer
= RandomData(size
);
222 ASSERT_OK(this->WriteDummyFile(path
, buffer
.data(), size
));
223 ASSERT_OK(this->client_
->GetPathInfo(path
, &info
));
225 ASSERT_EQ(ObjectType::FILE, info
.kind
);
226 ASSERT_EQ(this->HdfsAbsPath(path
), info
.name
);
227 ASSERT_EQ(this->conf_
.user
, info
.owner
);
228 ASSERT_EQ(size
, info
.size
);
// GetPathInfo on a missing path returns an IOError whose message contains
// the offending path.
231 TEST_F(TestHadoopFileSystem
, GetPathInfoNotExist
) {
232 // ARROW-2919: Test that the error message is reasonable
235 ASSERT_OK(this->MakeScratchDir());
236 auto path
= this->ScratchPath("path-does-not-exist");
239 Status s
= this->client_
->GetPathInfo(path
, &info
);
240 ASSERT_TRUE(s
.IsIOError());
242 const std::string error_message
= s
.ToString();
244 // Check that the file path is found in the error message
245 ASSERT_LT(error_message
.find(path
), std::string::npos
);
// Writes the same 100 bytes twice, the second time with append=true (the
// fourth WriteDummyFile argument). The file must end up twice as large.
248 TEST_F(TestHadoopFileSystem
, AppendToFile
) {
251 ASSERT_OK(this->MakeScratchDir());
253 auto path
= this->ScratchPath("test-file");
254 const int size
= 100;
256 std::vector
<uint8_t> buffer
= RandomData(size
);
257 ASSERT_OK(this->WriteDummyFile(path
, buffer
.data(), size
));
260 ASSERT_OK(this->WriteDummyFile(path
, buffer
.data(), size
, true));
263 ASSERT_OK(this->client_
->GetPathInfo(path
, &info
));
264 ASSERT_EQ(size
* 2, info
.size
);
// Lists a scratch directory containing two files (100 and 50 bytes) and one
// subdirectory. ListDirectory is called twice into the same vector. The
// expected count of 6 relies on ListDirectory appending to its output rather
// than clearing it. Each entry's kind and size are checked by name, and any
// unexpected name fails the test.
267 TEST_F(TestHadoopFileSystem
, ListDirectory
) {
270 const int size
= 100;
271 std::vector
<uint8_t> data
= RandomData(size
);
273 auto p1
= this->ScratchPath("test-file-1");
274 auto p2
= this->ScratchPath("test-file-2");
275 auto d1
= this->ScratchPath("test-dir-1");
277 ASSERT_OK(this->MakeScratchDir());
278 ASSERT_OK(this->WriteDummyFile(p1
, data
.data(), size
));
279 ASSERT_OK(this->WriteDummyFile(p2
, data
.data(), size
/ 2));
280 ASSERT_OK(this->client_
->MakeDirectory(d1
));
282 std::vector
<HdfsPathInfo
> listing
;
283 ASSERT_OK(this->client_
->ListDirectory(this->scratch_dir_
, &listing
));
285 // Do it again, appends!
286 ASSERT_OK(this->client_
->ListDirectory(this->scratch_dir_
, &listing
));
288 ASSERT_EQ(6, static_cast<int>(listing
.size()));
290 // Argh, well, shouldn't expect the listing to be in any particular order
291 for (size_t i
= 0; i
< listing
.size(); ++i
) {
292 const HdfsPathInfo
& info
= listing
[i
];
293 if (info
.name
== this->HdfsAbsPath(p1
)) {
294 ASSERT_EQ(ObjectType::FILE, info
.kind
);
295 ASSERT_EQ(size
, info
.size
);
296 } else if (info
.name
== this->HdfsAbsPath(p2
)) {
297 ASSERT_EQ(ObjectType::FILE, info
.kind
);
298 ASSERT_EQ(size
/ 2, info
.size
);
299 } else if (info
.name
== this->HdfsAbsPath(d1
)) {
300 ASSERT_EQ(ObjectType::DIRECTORY
, info
.kind
);
302 FAIL() << "Unexpected path: " << info
.name
;
// Exercises HdfsReadableFile on a 100-byte file: GetSize, two sequential
// 50-byte Reads, a Read at end of file, ReadAt past the end, and Seek/Tell.
307 TEST_F(TestHadoopFileSystem
, ReadableMethods
) {
310 ASSERT_OK(this->MakeScratchDir());
312 auto path
= this->ScratchPath("test-file");
313 const int size
= 100;
315 std::vector
<uint8_t> data
= RandomData(size
);
316 ASSERT_OK(this->WriteDummyFile(path
, data
.data(), size
));
318 std::shared_ptr
<HdfsReadableFile
> file
;
319 ASSERT_OK(this->client_
->OpenReadable(path
, &file
));
321 // Test GetSize -- move this into its own unit test if ever needed
322 ASSERT_OK_AND_EQ(size
, file
->GetSize());
326 ASSERT_OK_AND_EQ(50, file
->Read(50, buffer
));
327 ASSERT_EQ(0, std::memcmp(buffer
, data
.data(), 50));
329 ASSERT_OK_AND_EQ(50, file
->Read(50, buffer
));
330 ASSERT_EQ(0, std::memcmp(buffer
, data
.data() + 50, 50));
// After 100 bytes have been consumed, a further Read returns 0 bytes.
333 ASSERT_OK_AND_EQ(0, file
->Read(1, buffer
));
// ReadAt asking for 100 bytes at offset 60 returns only the 40 bytes that
// exist before end of file.
336 ASSERT_OK_AND_EQ(40, file
->ReadAt(60, 100, buffer
));
337 ASSERT_EQ(0, std::memcmp(buffer
, data
.data() + 60, 40));
// Seek followed by Tell reports the position that was sought.
340 ASSERT_OK(file
->Seek(60));
341 ASSERT_OK_AND_EQ(60, file
->Tell());
// Writes a 1,000,000-byte file and reads it back in a single Read call,
// comparing against the written data. This is done twice: once with the
// default buffer size, and once with an explicit 1 << 18 (256 KiB) buffer size.
344 TEST_F(TestHadoopFileSystem
, LargeFile
) {
347 ASSERT_OK(this->MakeScratchDir());
349 auto path
= this->ScratchPath("test-large-file");
350 const int size
= 1000000;
352 std::vector
<uint8_t> data
= RandomData(size
);
353 ASSERT_OK(this->WriteDummyFile(path
, data
.data(), size
));
355 std::shared_ptr
<HdfsReadableFile
> file
;
356 ASSERT_OK(this->client_
->OpenReadable(path
, &file
));
358 ASSERT_FALSE(file
->closed());
360 ASSERT_OK_AND_ASSIGN(auto buffer
, AllocateBuffer(size
));
362 ASSERT_OK_AND_EQ(size
, file
->Read(size
, buffer
->mutable_data()));
363 ASSERT_EQ(0, std::memcmp(buffer
->data(), data
.data(), size
));
365 // explicit buffer size
366 std::shared_ptr
<HdfsReadableFile
> file2
;
367 ASSERT_OK(this->client_
->OpenReadable(path
, 1 << 18, &file2
));
369 ASSERT_OK_AND_ASSIGN(auto buffer2
, AllocateBuffer(size
));
371 ASSERT_OK_AND_EQ(size
, file2
->Read(size
, buffer2
->mutable_data()));
372 ASSERT_EQ(0, std::memcmp(buffer2
->data(), data
.data(), size
));
// After Rename(src, dst), the source path no longer exists and the
// destination path does.
375 TEST_F(TestHadoopFileSystem
, RenameFile
) {
377 ASSERT_OK(this->MakeScratchDir());
379 auto src_path
= this->ScratchPath("src-file");
380 auto dst_path
= this->ScratchPath("dst-file");
381 const int size
= 100;
383 std::vector
<uint8_t> data
= RandomData(size
);
384 ASSERT_OK(this->WriteDummyFile(src_path
, data
.data(), size
));
386 ASSERT_OK(this->client_
->Rename(src_path
, dst_path
));
388 ASSERT_FALSE(this->client_
->Exists(src_path
));
389 ASSERT_TRUE(this->client_
->Exists(dst_path
));
// Chmod sets the permissions later reported by GetPathInfo. Chown sets the
// owner and group, which GetPathInfo then reports.
// NOTE(review): owner/group are hard-coded to "hadoop"; presumably that
// user/group must exist on the test cluster — confirm.
392 TEST_F(TestHadoopFileSystem
, ChmodChown
) {
394 ASSERT_OK(this->MakeScratchDir());
396 auto path
= this->ScratchPath("path-to-chmod");
399 const int size
= 100;
401 std::vector
<uint8_t> data
= RandomData(size
);
402 ASSERT_OK(this->WriteDummyFile(path
, data
.data(), size
));
405 ASSERT_OK(this->client_
->Chmod(path
, mode
));
406 ASSERT_OK(this->client_
->GetPathInfo(path
, &info
));
407 ASSERT_EQ(mode
, info
.permissions
);
409 std::string owner
= "hadoop";
410 std::string group
= "hadoop";
411 ASSERT_OK(this->client_
->Chown(path
, owner
.c_str(), group
.c_str()));
412 ASSERT_OK(this->client_
->GetPathInfo(path
, &info
));
413 ASSERT_EQ("hadoop", info
.owner
);
414 ASSERT_EQ("hadoop", info
.group
);
// Four threads run ReadData concurrently against one shared HdfsReadableFile
// opened on the 6-byte file "foobar".
// Each ReadAt result is compared with the matching slice of `data`, and
// matches are tallied in the atomic correct_count. The test finally asserts
// correct_count == niter * 4 (4 threads x niter iterations).
417 TEST_F(TestHadoopFileSystem
, ThreadSafety
) {
419 ASSERT_OK(this->MakeScratchDir());
421 auto src_path
= this->ScratchPath("threadsafety");
423 std::string data
= "foobar";
424 ASSERT_OK(this->WriteDummyFile(src_path
, reinterpret_cast<const uint8_t*>(data
.c_str()),
425 static_cast<int64_t>(data
.size())));
427 std::shared_ptr
<HdfsReadableFile
> file
;
428 ASSERT_OK(this->client_
->OpenReadable(src_path
, &file
));
430 std::atomic
<int> correct_count(0);
433 auto ReadData
= [&file
, &correct_count
, &data
, &niter
]() {
434 for (int i
= 0; i
< niter
; ++i
) {
435 std::shared_ptr
<Buffer
> buffer
;
437 ASSERT_OK_AND_ASSIGN(buffer
, file
->ReadAt(3, 3));
438 if (0 == memcmp(data
.c_str() + 3, buffer
->data(), 3)) {
442 ASSERT_OK_AND_ASSIGN(buffer
, file
->ReadAt(0, 4));
443 if (0 == memcmp(data
.c_str() + 0, buffer
->data(), 4)) {
450 std::thread
thread1(ReadData
);
451 std::thread
thread2(ReadData
);
452 std::thread
thread3(ReadData
);
453 std::thread
thread4(ReadData
);
460 ASSERT_EQ(niter
* 4, correct_count
);