]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/cpp/src/arrow/io/hdfs_internal.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / arrow / io / hdfs_internal.cc
CommitLineData
1d09f67e
TL
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// This shim interface to libhdfs (for runtime shared library loading) has been
19// adapted from the SFrame project, released under the ASF-compatible 3-clause
20// BSD license
21//
22// Using this required having the $JAVA_HOME and $HADOOP_HOME environment
23// variables set, so that libjvm and libhdfs can be located easily
24
25// Copyright (C) 2015 Dato, Inc.
26// All rights reserved.
27//
28// This software may be modified and distributed under the terms
29// of the BSD license. See the LICENSE file for details.
30
31#include "arrow/io/hdfs_internal.h"
32
#include <cerrno>
#include <cstdint>
#include <cstdlib>
#include <mutex>
#include <sstream>  // IWYU pragma: keep
#include <string>
#include <utility>
#include <vector>
40
41#ifndef _WIN32
42#include <dlfcn.h>
43#endif
44
45#include "arrow/result.h"
46#include "arrow/status.h"
47#include "arrow/util/io_util.h"
48#include "arrow/util/logging.h"
49
50namespace arrow {
51
52using internal::GetEnvVarNative;
53using internal::PlatformFilename;
54#ifdef _WIN32
55using internal::WinErrorMessage;
56#endif
57
58namespace io {
59namespace internal {
60
61namespace {
62
63void* GetLibrarySymbol(LibraryHandle handle, const char* symbol) {
64 if (handle == NULL) return NULL;
65#ifndef _WIN32
66 return dlsym(handle, symbol);
67#else
68
69 void* ret = reinterpret_cast<void*>(GetProcAddress(handle, symbol));
70 if (ret == NULL) {
71 // logstream(LOG_INFO) << "GetProcAddress error: "
72 // << get_last_err_str(GetLastError()) << std::endl;
73 }
74 return ret;
75#endif
76}
77
// Resolve SHIM->SYMBOL_NAME from SHIM->handle unless it is already resolved,
// and make the enclosing function return an IOError Status naming the symbol
// if it is still unavailable. Only usable inside functions returning Status.
#define GET_SYMBOL_REQUIRED(SHIM, SYMBOL_NAME)                           \
  do {                                                                   \
    if (!(SHIM)->SYMBOL_NAME) {                                          \
      *reinterpret_cast<void**>(&(SHIM)->SYMBOL_NAME) =                  \
          GetLibrarySymbol((SHIM)->handle, "" #SYMBOL_NAME);             \
    }                                                                    \
    if (!(SHIM)->SYMBOL_NAME)                                            \
      return Status::IOError("Getting symbol " #SYMBOL_NAME " failed");  \
  } while (0)
87
// Best-effort variant of GET_SYMBOL_REQUIRED: resolve SHIM->SYMBOL_NAME if it
// is not yet resolved, leaving it null when the library lacks the symbol.
// Wrapped in do/while(0) so that `GET_SYMBOL(...);` is a single statement and
// can be used safely in an unbraced if/else.
#define GET_SYMBOL(SHIM, SYMBOL_NAME)                          \
  do {                                                         \
    if (!(SHIM)->SYMBOL_NAME) {                                \
      *reinterpret_cast<void**>(&(SHIM)->SYMBOL_NAME) =        \
          GetLibrarySymbol((SHIM)->handle, "" #SYMBOL_NAME);   \
    }                                                          \
  } while (0)
93
94LibraryHandle libjvm_handle = nullptr;
95
96// Helper functions for dlopens
97Result<std::vector<PlatformFilename>> get_potential_libjvm_paths();
98Result<std::vector<PlatformFilename>> get_potential_libhdfs_paths();
99Result<LibraryHandle> try_dlopen(const std::vector<PlatformFilename>& potential_paths,
100 const char* name);
101
102Result<std::vector<PlatformFilename>> MakeFilenameVector(
103 const std::vector<std::string>& names) {
104 std::vector<PlatformFilename> filenames(names.size());
105 for (size_t i = 0; i < names.size(); ++i) {
106 ARROW_ASSIGN_OR_RAISE(filenames[i], PlatformFilename::FromString(names[i]));
107 }
108 return filenames;
109}
110
111void AppendEnvVarFilename(const char* var_name,
112 std::vector<PlatformFilename>* filenames) {
113 auto maybe_env_var = GetEnvVarNative(var_name);
114 if (maybe_env_var.ok()) {
115 filenames->emplace_back(std::move(*maybe_env_var));
116 }
117}
118
119void AppendEnvVarFilename(const char* var_name, const char* suffix,
120 std::vector<PlatformFilename>* filenames) {
121 auto maybe_env_var = GetEnvVarNative(var_name);
122 if (maybe_env_var.ok()) {
123 auto maybe_env_var_with_suffix =
124 PlatformFilename(std::move(*maybe_env_var)).Join(suffix);
125 if (maybe_env_var_with_suffix.ok()) {
126 filenames->emplace_back(std::move(*maybe_env_var_with_suffix));
127 }
128 }
129}
130
131void InsertEnvVarFilename(const char* var_name,
132 std::vector<PlatformFilename>* filenames) {
133 auto maybe_env_var = GetEnvVarNative(var_name);
134 if (maybe_env_var.ok()) {
135 filenames->emplace(filenames->begin(), PlatformFilename(std::move(*maybe_env_var)));
136 }
137}
138
139Result<std::vector<PlatformFilename>> get_potential_libhdfs_paths() {
140 std::vector<PlatformFilename> potential_paths;
141 std::string file_name;
142
143// OS-specific file name
144#ifdef _WIN32
145 file_name = "hdfs.dll";
146#elif __APPLE__
147 file_name = "libhdfs.dylib";
148#else
149 file_name = "libhdfs.so";
150#endif
151
152 // Common paths
153 ARROW_ASSIGN_OR_RAISE(auto search_paths, MakeFilenameVector({"", "."}));
154
155 // Path from environment variable
156 AppendEnvVarFilename("HADOOP_HOME", "lib/native", &search_paths);
157 AppendEnvVarFilename("ARROW_LIBHDFS_DIR", &search_paths);
158
159 // All paths with file name
160 for (const auto& path : search_paths) {
161 ARROW_ASSIGN_OR_RAISE(auto full_path, path.Join(file_name));
162 potential_paths.push_back(std::move(full_path));
163 }
164
165 return potential_paths;
166}
167
168Result<std::vector<PlatformFilename>> get_potential_libjvm_paths() {
169 std::vector<PlatformFilename> potential_paths;
170
171 std::vector<PlatformFilename> search_prefixes;
172 std::vector<PlatformFilename> search_suffixes;
173 std::string file_name;
174
175// From heuristics
176#ifdef _WIN32
177 ARROW_ASSIGN_OR_RAISE(search_prefixes, MakeFilenameVector({""}));
178 ARROW_ASSIGN_OR_RAISE(search_suffixes,
179 MakeFilenameVector({"/jre/bin/server", "/bin/server"}));
180 file_name = "jvm.dll";
181#elif __APPLE__
182 ARROW_ASSIGN_OR_RAISE(search_prefixes, MakeFilenameVector({""}));
183 ARROW_ASSIGN_OR_RAISE(search_suffixes,
184 MakeFilenameVector({"/jre/lib/server", "/lib/server"}));
185 file_name = "libjvm.dylib";
186
187// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are
188// expecting users to set an environment variable
189#else
190#if defined(__aarch64__)
191 const std::string prefix_arch{"arm64"};
192 const std::string suffix_arch{"aarch64"};
193#else
194 const std::string prefix_arch{"amd64"};
195 const std::string suffix_arch{"amd64"};
196#endif
197 ARROW_ASSIGN_OR_RAISE(
198 search_prefixes,
199 MakeFilenameVector({
200 "/usr/lib/jvm/default-java", // ubuntu / debian distros
201 "/usr/lib/jvm/java", // rhel6
202 "/usr/lib/jvm", // centos6
203 "/usr/lib64/jvm", // opensuse 13
204 "/usr/local/lib/jvm/default-java", // alt ubuntu / debian distros
205 "/usr/local/lib/jvm/java", // alt rhel6
206 "/usr/local/lib/jvm", // alt centos6
207 "/usr/local/lib64/jvm", // alt opensuse 13
208 "/usr/local/lib/jvm/java-8-openjdk-" +
209 prefix_arch, // alt ubuntu / debian distros
210 "/usr/lib/jvm/java-8-openjdk-" + prefix_arch, // alt ubuntu / debian distros
211 "/usr/local/lib/jvm/java-7-openjdk-" +
212 prefix_arch, // alt ubuntu / debian distros
213 "/usr/lib/jvm/java-7-openjdk-" + prefix_arch, // alt ubuntu / debian distros
214 "/usr/local/lib/jvm/java-6-openjdk-" +
215 prefix_arch, // alt ubuntu / debian distros
216 "/usr/lib/jvm/java-6-openjdk-" + prefix_arch, // alt ubuntu / debian distros
217 "/usr/lib/jvm/java-7-oracle", // alt ubuntu
218 "/usr/lib/jvm/java-8-oracle", // alt ubuntu
219 "/usr/lib/jvm/java-6-oracle", // alt ubuntu
220 "/usr/local/lib/jvm/java-7-oracle", // alt ubuntu
221 "/usr/local/lib/jvm/java-8-oracle", // alt ubuntu
222 "/usr/local/lib/jvm/java-6-oracle", // alt ubuntu
223 "/usr/lib/jvm/default", // alt centos
224 "/usr/java/latest" // alt centos
225 }));
226 ARROW_ASSIGN_OR_RAISE(
227 search_suffixes,
228 MakeFilenameVector({"", "/lib/server", "/jre/lib/" + suffix_arch + "/server",
229 "/lib/" + suffix_arch + "/server"}));
230 file_name = "libjvm.so";
231#endif
232
233 // From direct environment variable
234 InsertEnvVarFilename("JAVA_HOME", &search_prefixes);
235
236 // Generate cross product between search_prefixes, search_suffixes, and file_name
237 for (auto& prefix : search_prefixes) {
238 for (auto& suffix : search_suffixes) {
239 ARROW_ASSIGN_OR_RAISE(auto path, prefix.Join(suffix).Join(file_name));
240 potential_paths.push_back(std::move(path));
241 }
242 }
243
244 return potential_paths;
245}
246
247#ifndef _WIN32
248Result<LibraryHandle> try_dlopen(const std::vector<PlatformFilename>& potential_paths,
249 const char* name) {
250 std::string error_message = "unknown error";
251 LibraryHandle handle;
252
253 for (const auto& p : potential_paths) {
254 handle = dlopen(p.ToNative().c_str(), RTLD_NOW | RTLD_LOCAL);
255
256 if (handle != NULL) {
257 return handle;
258 } else {
259 const char* err_msg = dlerror();
260 if (err_msg != NULL) {
261 error_message = err_msg;
262 }
263 }
264 }
265
266 return Status::IOError("Unable to load ", name, ": ", error_message);
267}
268
269#else
270Result<LibraryHandle> try_dlopen(const std::vector<PlatformFilename>& potential_paths,
271 const char* name) {
272 std::string error_message;
273 LibraryHandle handle;
274
275 for (const auto& p : potential_paths) {
276 handle = LoadLibraryW(p.ToNative().c_str());
277 if (handle != NULL) {
278 return handle;
279 } else {
280 error_message = WinErrorMessage(GetLastError());
281 }
282 }
283
284 return Status::IOError("Unable to load ", name, ": ", error_message);
285}
286#endif // _WIN32
287
// The single process-wide shim instance. ConnectLibHdfs() initializes it on
// its first call and hands out pointers to it.
LibHdfsShim libhdfs_shim;
289
290} // namespace
291
// Resolve every libhdfs entry point the shim cannot work without.
// Pointers that are already resolved are not looked up again, so repeated
// calls (ConnectLibHdfs calls this on each successful connect) only retry the
// missing ones. Returns an IOError naming the first missing symbol in the
// order listed below, or OK when all are present.
Status LibHdfsShim::GetRequiredSymbols() {
  // Builder and filesystem-level methods
  GET_SYMBOL_REQUIRED(this, hdfsNewBuilder);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNode);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetNameNodePort);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetUserName);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetKerbTicketCachePath);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderSetForceNewInstance);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderConfSetStr);
  GET_SYMBOL_REQUIRED(this, hdfsBuilderConnect);
  GET_SYMBOL_REQUIRED(this, hdfsCreateDirectory);
  GET_SYMBOL_REQUIRED(this, hdfsDelete);
  GET_SYMBOL_REQUIRED(this, hdfsDisconnect);
  GET_SYMBOL_REQUIRED(this, hdfsExists);
  GET_SYMBOL_REQUIRED(this, hdfsFreeFileInfo);
  GET_SYMBOL_REQUIRED(this, hdfsGetCapacity);
  GET_SYMBOL_REQUIRED(this, hdfsGetUsed);
  GET_SYMBOL_REQUIRED(this, hdfsGetPathInfo);
  GET_SYMBOL_REQUIRED(this, hdfsListDirectory);
  GET_SYMBOL_REQUIRED(this, hdfsChown);
  GET_SYMBOL_REQUIRED(this, hdfsChmod);

  // File methods
  GET_SYMBOL_REQUIRED(this, hdfsCloseFile);
  GET_SYMBOL_REQUIRED(this, hdfsFlush);
  GET_SYMBOL_REQUIRED(this, hdfsOpenFile);
  GET_SYMBOL_REQUIRED(this, hdfsRead);
  GET_SYMBOL_REQUIRED(this, hdfsSeek);
  GET_SYMBOL_REQUIRED(this, hdfsTell);
  GET_SYMBOL_REQUIRED(this, hdfsWrite);

  // Optional symbols (hdfsPread, hdfsCopy, ...) are resolved lazily by the
  // individual wrappers via GET_SYMBOL.
  return Status::OK();
}
324
325Status ConnectLibHdfs(LibHdfsShim** driver) {
326 static std::mutex lock;
327 std::lock_guard<std::mutex> guard(lock);
328
329 LibHdfsShim* shim = &libhdfs_shim;
330
331 static bool shim_attempted = false;
332 if (!shim_attempted) {
333 shim_attempted = true;
334
335 shim->Initialize();
336
337 ARROW_ASSIGN_OR_RAISE(auto libjvm_potential_paths, get_potential_libjvm_paths());
338 ARROW_ASSIGN_OR_RAISE(libjvm_handle, try_dlopen(libjvm_potential_paths, "libjvm"));
339
340 ARROW_ASSIGN_OR_RAISE(auto libhdfs_potential_paths, get_potential_libhdfs_paths());
341 ARROW_ASSIGN_OR_RAISE(shim->handle, try_dlopen(libhdfs_potential_paths, "libhdfs"));
342 } else if (shim->handle == nullptr) {
343 return Status::IOError("Prior attempt to load libhdfs failed");
344 }
345
346 *driver = shim;
347 return shim->GetRequiredSymbols();
348}
349
350///////////////////////////////////////////////////////////////////////////
351// HDFS thin wrapper methods
352
353hdfsBuilder* LibHdfsShim::NewBuilder(void) { return this->hdfsNewBuilder(); }
354
355void LibHdfsShim::BuilderSetNameNode(hdfsBuilder* bld, const char* nn) {
356 this->hdfsBuilderSetNameNode(bld, nn);
357}
358
359void LibHdfsShim::BuilderSetNameNodePort(hdfsBuilder* bld, tPort port) {
360 this->hdfsBuilderSetNameNodePort(bld, port);
361}
362
363void LibHdfsShim::BuilderSetUserName(hdfsBuilder* bld, const char* userName) {
364 this->hdfsBuilderSetUserName(bld, userName);
365}
366
367void LibHdfsShim::BuilderSetKerbTicketCachePath(hdfsBuilder* bld,
368 const char* kerbTicketCachePath) {
369 this->hdfsBuilderSetKerbTicketCachePath(bld, kerbTicketCachePath);
370}
371
372void LibHdfsShim::BuilderSetForceNewInstance(hdfsBuilder* bld) {
373 this->hdfsBuilderSetForceNewInstance(bld);
374}
375
376hdfsFS LibHdfsShim::BuilderConnect(hdfsBuilder* bld) {
377 return this->hdfsBuilderConnect(bld);
378}
379
380int LibHdfsShim::BuilderConfSetStr(hdfsBuilder* bld, const char* key, const char* val) {
381 return this->hdfsBuilderConfSetStr(bld, key, val);
382}
383
384int LibHdfsShim::Disconnect(hdfsFS fs) { return this->hdfsDisconnect(fs); }
385
386hdfsFile LibHdfsShim::OpenFile(hdfsFS fs, const char* path, int flags, int bufferSize,
387 short replication, tSize blocksize) { // NOLINT
388 return this->hdfsOpenFile(fs, path, flags, bufferSize, replication, blocksize);
389}
390
391int LibHdfsShim::CloseFile(hdfsFS fs, hdfsFile file) {
392 return this->hdfsCloseFile(fs, file);
393}
394
395int LibHdfsShim::Exists(hdfsFS fs, const char* path) {
396 return this->hdfsExists(fs, path);
397}
398
399int LibHdfsShim::Seek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
400 return this->hdfsSeek(fs, file, desiredPos);
401}
402
403tOffset LibHdfsShim::Tell(hdfsFS fs, hdfsFile file) { return this->hdfsTell(fs, file); }
404
405tSize LibHdfsShim::Read(hdfsFS fs, hdfsFile file, void* buffer, tSize length) {
406 return this->hdfsRead(fs, file, buffer, length);
407}
408
409bool LibHdfsShim::HasPread() {
410 GET_SYMBOL(this, hdfsPread);
411 return this->hdfsPread != nullptr;
412}
413
414tSize LibHdfsShim::Pread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
415 tSize length) {
416 GET_SYMBOL(this, hdfsPread);
417 DCHECK(this->hdfsPread);
418 return this->hdfsPread(fs, file, position, buffer, length);
419}
420
421tSize LibHdfsShim::Write(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) {
422 return this->hdfsWrite(fs, file, buffer, length);
423}
424
425int LibHdfsShim::Flush(hdfsFS fs, hdfsFile file) { return this->hdfsFlush(fs, file); }
426
427int LibHdfsShim::Available(hdfsFS fs, hdfsFile file) {
428 GET_SYMBOL(this, hdfsAvailable);
429 if (this->hdfsAvailable)
430 return this->hdfsAvailable(fs, file);
431 else
432 return 0;
433}
434
435int LibHdfsShim::Copy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
436 GET_SYMBOL(this, hdfsCopy);
437 if (this->hdfsCopy)
438 return this->hdfsCopy(srcFS, src, dstFS, dst);
439 else
440 return 0;
441}
442
443int LibHdfsShim::Move(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
444 GET_SYMBOL(this, hdfsMove);
445 if (this->hdfsMove)
446 return this->hdfsMove(srcFS, src, dstFS, dst);
447 else
448 return 0;
449}
450
451int LibHdfsShim::Delete(hdfsFS fs, const char* path, int recursive) {
452 return this->hdfsDelete(fs, path, recursive);
453}
454
455int LibHdfsShim::Rename(hdfsFS fs, const char* oldPath, const char* newPath) {
456 GET_SYMBOL(this, hdfsRename);
457 if (this->hdfsRename)
458 return this->hdfsRename(fs, oldPath, newPath);
459 else
460 return 0;
461}
462
463char* LibHdfsShim::GetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) {
464 GET_SYMBOL(this, hdfsGetWorkingDirectory);
465 if (this->hdfsGetWorkingDirectory) {
466 return this->hdfsGetWorkingDirectory(fs, buffer, bufferSize);
467 } else {
468 return NULL;
469 }
470}
471
472int LibHdfsShim::SetWorkingDirectory(hdfsFS fs, const char* path) {
473 GET_SYMBOL(this, hdfsSetWorkingDirectory);
474 if (this->hdfsSetWorkingDirectory) {
475 return this->hdfsSetWorkingDirectory(fs, path);
476 } else {
477 return 0;
478 }
479}
480
481int LibHdfsShim::MakeDirectory(hdfsFS fs, const char* path) {
482 return this->hdfsCreateDirectory(fs, path);
483}
484
485int LibHdfsShim::SetReplication(hdfsFS fs, const char* path, int16_t replication) {
486 GET_SYMBOL(this, hdfsSetReplication);
487 if (this->hdfsSetReplication) {
488 return this->hdfsSetReplication(fs, path, replication);
489 } else {
490 return 0;
491 }
492}
493
494hdfsFileInfo* LibHdfsShim::ListDirectory(hdfsFS fs, const char* path, int* numEntries) {
495 return this->hdfsListDirectory(fs, path, numEntries);
496}
497
498hdfsFileInfo* LibHdfsShim::GetPathInfo(hdfsFS fs, const char* path) {
499 return this->hdfsGetPathInfo(fs, path);
500}
501
502void LibHdfsShim::FreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) {
503 this->hdfsFreeFileInfo(hdfsFileInfo, numEntries);
504}
505
506char*** LibHdfsShim::GetHosts(hdfsFS fs, const char* path, tOffset start,
507 tOffset length) {
508 GET_SYMBOL(this, hdfsGetHosts);
509 if (this->hdfsGetHosts) {
510 return this->hdfsGetHosts(fs, path, start, length);
511 } else {
512 return NULL;
513 }
514}
515
516void LibHdfsShim::FreeHosts(char*** blockHosts) {
517 GET_SYMBOL(this, hdfsFreeHosts);
518 if (this->hdfsFreeHosts) {
519 this->hdfsFreeHosts(blockHosts);
520 }
521}
522
523tOffset LibHdfsShim::GetDefaultBlockSize(hdfsFS fs) {
524 GET_SYMBOL(this, hdfsGetDefaultBlockSize);
525 if (this->hdfsGetDefaultBlockSize) {
526 return this->hdfsGetDefaultBlockSize(fs);
527 } else {
528 return 0;
529 }
530}
531
532tOffset LibHdfsShim::GetCapacity(hdfsFS fs) { return this->hdfsGetCapacity(fs); }
533
534tOffset LibHdfsShim::GetUsed(hdfsFS fs) { return this->hdfsGetUsed(fs); }
535
536int LibHdfsShim::Chown(hdfsFS fs, const char* path, const char* owner,
537 const char* group) {
538 return this->hdfsChown(fs, path, owner, group);
539}
540
541int LibHdfsShim::Chmod(hdfsFS fs, const char* path, short mode) { // NOLINT
542 return this->hdfsChmod(fs, path, mode);
543}
544
545int LibHdfsShim::Utime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
546 GET_SYMBOL(this, hdfsUtime);
547 if (this->hdfsUtime) {
548 return this->hdfsUtime(fs, path, mtime, atime);
549 } else {
550 return 0;
551 }
552}
553
554} // namespace internal
555} // namespace io
556} // namespace arrow