]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/arrow/io/hdfs_test.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / io / hdfs_test.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <atomic>
19 #include <cstdint>
20 #include <cstdlib>
21 #include <cstring>
22 #include <iostream>
23 #include <memory>
24 #include <sstream> // IWYU pragma: keep
25 #include <string>
26 #include <thread>
27 #include <vector>
28
29 #include <gtest/gtest.h>
30
31 #include <boost/filesystem.hpp> // NOLINT
32
33 #include "arrow/buffer.h"
34 #include "arrow/io/hdfs.h"
35 #include "arrow/io/hdfs_internal.h"
36 #include "arrow/io/interfaces.h"
37 #include "arrow/status.h"
38 #include "arrow/testing/gtest_util.h"
39 #include "arrow/testing/util.h"
40
41 namespace arrow {
42 namespace io {
43
44 std::vector<uint8_t> RandomData(int64_t size) {
45 std::vector<uint8_t> buffer(size);
46 random_bytes(size, 0, buffer.data());
47 return buffer;
48 }
49
50 class TestHadoopFileSystem : public ::testing::Test {
51 public:
52 Status MakeScratchDir() {
53 if (client_->Exists(scratch_dir_)) {
54 RETURN_NOT_OK((client_->Delete(scratch_dir_, true)));
55 }
56 return client_->MakeDirectory(scratch_dir_);
57 }
58
59 Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size,
60 bool append = false, int buffer_size = 0, int16_t replication = 0,
61 int default_block_size = 0) {
62 std::shared_ptr<HdfsOutputStream> file;
63 RETURN_NOT_OK(client_->OpenWritable(path, append, buffer_size, replication,
64 default_block_size, &file));
65
66 RETURN_NOT_OK(file->Write(buffer, size));
67 RETURN_NOT_OK(file->Close());
68
69 return Status::OK();
70 }
71
72 std::string ScratchPath(const std::string& name) {
73 std::stringstream ss;
74 ss << scratch_dir_ << "/" << name;
75 return ss.str();
76 }
77
78 std::string HdfsAbsPath(const std::string& relpath) {
79 std::stringstream ss;
80 ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath;
81 return ss.str();
82 }
83
84 // Set up shared state between unit tests
85 void SetUp() {
86 internal::LibHdfsShim* driver_shim;
87
88 client_ = nullptr;
89 scratch_dir_ =
90 boost::filesystem::unique_path(boost::filesystem::temp_directory_path() /
91 "arrow-hdfs/scratch-%%%%")
92 .string();
93
94 loaded_driver_ = false;
95
96 Status msg = ConnectLibHdfs(&driver_shim);
97 if (!msg.ok()) {
98 if (std::getenv("ARROW_HDFS_TEST_LIBHDFS_REQUIRE")) {
99 FAIL() << "Loading libhdfs failed: " << msg.ToString();
100 } else {
101 std::cout << "Loading libhdfs failed, skipping tests gracefully: "
102 << msg.ToString() << std::endl;
103 }
104 return;
105 }
106
107 loaded_driver_ = true;
108
109 const char* host = std::getenv("ARROW_HDFS_TEST_HOST");
110 const char* port = std::getenv("ARROW_HDFS_TEST_PORT");
111 const char* user = std::getenv("ARROW_HDFS_TEST_USER");
112
113 ASSERT_TRUE(user != nullptr) << "Set ARROW_HDFS_TEST_USER";
114
115 conf_.host = host == nullptr ? "localhost" : host;
116 conf_.user = user;
117 conf_.port = port == nullptr ? 20500 : atoi(port);
118
119 ASSERT_OK(HadoopFileSystem::Connect(&conf_, &client_));
120 }
121
122 void TearDown() {
123 if (client_) {
124 if (client_->Exists(scratch_dir_)) {
125 ARROW_EXPECT_OK(client_->Delete(scratch_dir_, true));
126 }
127 ARROW_EXPECT_OK(client_->Disconnect());
128 }
129 }
130
131 HdfsConnectionConfig conf_;
132 bool loaded_driver_;
133
134 // Resources shared amongst unit tests
135 std::string scratch_dir_;
136 std::shared_ptr<HadoopFileSystem> client_;
137 };
138
// Skip the current test when the fixture's SetUp() could not load libhdfs.
// The do/while(false) wrapper makes the macro a single statement: it is safe
// inside an unbraced if/else, and the caller's trailing ';' terminates it.
#define SKIP_IF_NO_DRIVER()                          \
  do {                                               \
    if (!this->loaded_driver_) {                     \
      GTEST_SKIP() << "Driver not loaded, skipping"; \
    }                                                \
  } while (false)
143
144 TEST_F(TestHadoopFileSystem, ConnectsAgain) {
145 SKIP_IF_NO_DRIVER();
146
147 std::shared_ptr<HadoopFileSystem> client;
148 ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client));
149 ASSERT_OK(client->Disconnect());
150 }
151
152 TEST_F(TestHadoopFileSystem, MultipleClients) {
153 SKIP_IF_NO_DRIVER();
154
155 ASSERT_OK(this->MakeScratchDir());
156
157 std::shared_ptr<HadoopFileSystem> client1;
158 std::shared_ptr<HadoopFileSystem> client2;
159 ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client1));
160 ASSERT_OK(HadoopFileSystem::Connect(&this->conf_, &client2));
161 ASSERT_OK(client1->Disconnect());
162
163 // client2 continues to function after equivalent client1 has shutdown
164 std::vector<HdfsPathInfo> listing;
165 ASSERT_OK(client2->ListDirectory(this->scratch_dir_, &listing));
166 ASSERT_OK(client2->Disconnect());
167 }
168
169 TEST_F(TestHadoopFileSystem, MakeDirectory) {
170 SKIP_IF_NO_DRIVER();
171
172 std::string path = this->ScratchPath("create-directory");
173
174 if (this->client_->Exists(path)) {
175 ASSERT_OK(this->client_->Delete(path, true));
176 }
177
178 ASSERT_OK(this->client_->MakeDirectory(path));
179 ASSERT_TRUE(this->client_->Exists(path));
180 std::vector<HdfsPathInfo> listing;
181 ARROW_EXPECT_OK(this->client_->ListDirectory(path, &listing));
182 ASSERT_EQ(0, listing.size());
183 ARROW_EXPECT_OK(this->client_->Delete(path, true));
184 ASSERT_FALSE(this->client_->Exists(path));
185 ASSERT_RAISES(IOError, this->client_->ListDirectory(path, &listing));
186 }
187
188 TEST_F(TestHadoopFileSystem, GetCapacityUsed) {
189 SKIP_IF_NO_DRIVER();
190
191 // Who knows what is actually in your DFS cluster, but expect it to have
192 // positive used bytes and capacity
193 int64_t nbytes = 0;
194 ASSERT_OK(this->client_->GetCapacity(&nbytes));
195 ASSERT_LT(0, nbytes);
196
197 ASSERT_OK(this->client_->GetUsed(&nbytes));
198 ASSERT_LT(0, nbytes);
199 }
200
201 TEST_F(TestHadoopFileSystem, GetPathInfo) {
202 SKIP_IF_NO_DRIVER();
203
204 HdfsPathInfo info;
205
206 ASSERT_OK(this->MakeScratchDir());
207
208 // Directory info
209 ASSERT_OK(this->client_->GetPathInfo(this->scratch_dir_, &info));
210 ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
211 ASSERT_EQ(this->HdfsAbsPath(this->scratch_dir_), info.name);
212 ASSERT_EQ(this->conf_.user, info.owner);
213
214 // TODO(wesm): test group, other attrs
215
216 auto path = this->ScratchPath("test-file");
217
218 const int size = 100;
219
220 std::vector<uint8_t> buffer = RandomData(size);
221
222 ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size));
223 ASSERT_OK(this->client_->GetPathInfo(path, &info));
224
225 ASSERT_EQ(ObjectType::FILE, info.kind);
226 ASSERT_EQ(this->HdfsAbsPath(path), info.name);
227 ASSERT_EQ(this->conf_.user, info.owner);
228 ASSERT_EQ(size, info.size);
229 }
230
231 TEST_F(TestHadoopFileSystem, GetPathInfoNotExist) {
232 // ARROW-2919: Test that the error message is reasonable
233 SKIP_IF_NO_DRIVER();
234
235 ASSERT_OK(this->MakeScratchDir());
236 auto path = this->ScratchPath("path-does-not-exist");
237
238 HdfsPathInfo info;
239 Status s = this->client_->GetPathInfo(path, &info);
240 ASSERT_TRUE(s.IsIOError());
241
242 const std::string error_message = s.ToString();
243
244 // Check that the file path is found in the error message
245 ASSERT_LT(error_message.find(path), std::string::npos);
246 }
247
248 TEST_F(TestHadoopFileSystem, AppendToFile) {
249 SKIP_IF_NO_DRIVER();
250
251 ASSERT_OK(this->MakeScratchDir());
252
253 auto path = this->ScratchPath("test-file");
254 const int size = 100;
255
256 std::vector<uint8_t> buffer = RandomData(size);
257 ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size));
258
259 // now append
260 ASSERT_OK(this->WriteDummyFile(path, buffer.data(), size, true));
261
262 HdfsPathInfo info;
263 ASSERT_OK(this->client_->GetPathInfo(path, &info));
264 ASSERT_EQ(size * 2, info.size);
265 }
266
267 TEST_F(TestHadoopFileSystem, ListDirectory) {
268 SKIP_IF_NO_DRIVER();
269
270 const int size = 100;
271 std::vector<uint8_t> data = RandomData(size);
272
273 auto p1 = this->ScratchPath("test-file-1");
274 auto p2 = this->ScratchPath("test-file-2");
275 auto d1 = this->ScratchPath("test-dir-1");
276
277 ASSERT_OK(this->MakeScratchDir());
278 ASSERT_OK(this->WriteDummyFile(p1, data.data(), size));
279 ASSERT_OK(this->WriteDummyFile(p2, data.data(), size / 2));
280 ASSERT_OK(this->client_->MakeDirectory(d1));
281
282 std::vector<HdfsPathInfo> listing;
283 ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing));
284
285 // Do it again, appends!
286 ASSERT_OK(this->client_->ListDirectory(this->scratch_dir_, &listing));
287
288 ASSERT_EQ(6, static_cast<int>(listing.size()));
289
290 // Argh, well, shouldn't expect the listing to be in any particular order
291 for (size_t i = 0; i < listing.size(); ++i) {
292 const HdfsPathInfo& info = listing[i];
293 if (info.name == this->HdfsAbsPath(p1)) {
294 ASSERT_EQ(ObjectType::FILE, info.kind);
295 ASSERT_EQ(size, info.size);
296 } else if (info.name == this->HdfsAbsPath(p2)) {
297 ASSERT_EQ(ObjectType::FILE, info.kind);
298 ASSERT_EQ(size / 2, info.size);
299 } else if (info.name == this->HdfsAbsPath(d1)) {
300 ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
301 } else {
302 FAIL() << "Unexpected path: " << info.name;
303 }
304 }
305 }
306
307 TEST_F(TestHadoopFileSystem, ReadableMethods) {
308 SKIP_IF_NO_DRIVER();
309
310 ASSERT_OK(this->MakeScratchDir());
311
312 auto path = this->ScratchPath("test-file");
313 const int size = 100;
314
315 std::vector<uint8_t> data = RandomData(size);
316 ASSERT_OK(this->WriteDummyFile(path, data.data(), size));
317
318 std::shared_ptr<HdfsReadableFile> file;
319 ASSERT_OK(this->client_->OpenReadable(path, &file));
320
321 // Test GetSize -- move this into its own unit test if ever needed
322 ASSERT_OK_AND_EQ(size, file->GetSize());
323
324 uint8_t buffer[50];
325
326 ASSERT_OK_AND_EQ(50, file->Read(50, buffer));
327 ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));
328
329 ASSERT_OK_AND_EQ(50, file->Read(50, buffer));
330 ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50));
331
332 // EOF
333 ASSERT_OK_AND_EQ(0, file->Read(1, buffer));
334
335 // ReadAt to EOF
336 ASSERT_OK_AND_EQ(40, file->ReadAt(60, 100, buffer));
337 ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, 40));
338
339 // Seek, Tell
340 ASSERT_OK(file->Seek(60));
341 ASSERT_OK_AND_EQ(60, file->Tell());
342 }
343
344 TEST_F(TestHadoopFileSystem, LargeFile) {
345 SKIP_IF_NO_DRIVER();
346
347 ASSERT_OK(this->MakeScratchDir());
348
349 auto path = this->ScratchPath("test-large-file");
350 const int size = 1000000;
351
352 std::vector<uint8_t> data = RandomData(size);
353 ASSERT_OK(this->WriteDummyFile(path, data.data(), size));
354
355 std::shared_ptr<HdfsReadableFile> file;
356 ASSERT_OK(this->client_->OpenReadable(path, &file));
357
358 ASSERT_FALSE(file->closed());
359
360 ASSERT_OK_AND_ASSIGN(auto buffer, AllocateBuffer(size));
361
362 ASSERT_OK_AND_EQ(size, file->Read(size, buffer->mutable_data()));
363 ASSERT_EQ(0, std::memcmp(buffer->data(), data.data(), size));
364
365 // explicit buffer size
366 std::shared_ptr<HdfsReadableFile> file2;
367 ASSERT_OK(this->client_->OpenReadable(path, 1 << 18, &file2));
368
369 ASSERT_OK_AND_ASSIGN(auto buffer2, AllocateBuffer(size));
370
371 ASSERT_OK_AND_EQ(size, file2->Read(size, buffer2->mutable_data()));
372 ASSERT_EQ(0, std::memcmp(buffer2->data(), data.data(), size));
373 }
374
375 TEST_F(TestHadoopFileSystem, RenameFile) {
376 SKIP_IF_NO_DRIVER();
377 ASSERT_OK(this->MakeScratchDir());
378
379 auto src_path = this->ScratchPath("src-file");
380 auto dst_path = this->ScratchPath("dst-file");
381 const int size = 100;
382
383 std::vector<uint8_t> data = RandomData(size);
384 ASSERT_OK(this->WriteDummyFile(src_path, data.data(), size));
385
386 ASSERT_OK(this->client_->Rename(src_path, dst_path));
387
388 ASSERT_FALSE(this->client_->Exists(src_path));
389 ASSERT_TRUE(this->client_->Exists(dst_path));
390 }
391
392 TEST_F(TestHadoopFileSystem, ChmodChown) {
393 SKIP_IF_NO_DRIVER();
394 ASSERT_OK(this->MakeScratchDir());
395
396 auto path = this->ScratchPath("path-to-chmod");
397
398 int16_t mode = 0755;
399 const int size = 100;
400
401 std::vector<uint8_t> data = RandomData(size);
402 ASSERT_OK(this->WriteDummyFile(path, data.data(), size));
403
404 HdfsPathInfo info;
405 ASSERT_OK(this->client_->Chmod(path, mode));
406 ASSERT_OK(this->client_->GetPathInfo(path, &info));
407 ASSERT_EQ(mode, info.permissions);
408
409 std::string owner = "hadoop";
410 std::string group = "hadoop";
411 ASSERT_OK(this->client_->Chown(path, owner.c_str(), group.c_str()));
412 ASSERT_OK(this->client_->GetPathInfo(path, &info));
413 ASSERT_EQ("hadoop", info.owner);
414 ASSERT_EQ("hadoop", info.group);
415 }
416
417 TEST_F(TestHadoopFileSystem, ThreadSafety) {
418 SKIP_IF_NO_DRIVER();
419 ASSERT_OK(this->MakeScratchDir());
420
421 auto src_path = this->ScratchPath("threadsafety");
422
423 std::string data = "foobar";
424 ASSERT_OK(this->WriteDummyFile(src_path, reinterpret_cast<const uint8_t*>(data.c_str()),
425 static_cast<int64_t>(data.size())));
426
427 std::shared_ptr<HdfsReadableFile> file;
428 ASSERT_OK(this->client_->OpenReadable(src_path, &file));
429
430 std::atomic<int> correct_count(0);
431 int niter = 1000;
432
433 auto ReadData = [&file, &correct_count, &data, &niter]() {
434 for (int i = 0; i < niter; ++i) {
435 std::shared_ptr<Buffer> buffer;
436 if (i % 2 == 0) {
437 ASSERT_OK_AND_ASSIGN(buffer, file->ReadAt(3, 3));
438 if (0 == memcmp(data.c_str() + 3, buffer->data(), 3)) {
439 correct_count += 1;
440 }
441 } else {
442 ASSERT_OK_AND_ASSIGN(buffer, file->ReadAt(0, 4));
443 if (0 == memcmp(data.c_str() + 0, buffer->data(), 4)) {
444 correct_count += 1;
445 }
446 }
447 }
448 };
449
450 std::thread thread1(ReadData);
451 std::thread thread2(ReadData);
452 std::thread thread3(ReadData);
453 std::thread thread4(ReadData);
454
455 thread1.join();
456 thread2.join();
457 thread3.join();
458 thread4.join();
459
460 ASSERT_EQ(niter * 4, correct_count);
461 }
462
463 } // namespace io
464 } // namespace arrow