// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//

#include "rocksdb/env.h"
#include "hdfs/env_hdfs.h"

#ifdef USE_HDFS
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C

#include <stdio.h>
#include <sys/time.h>
#include <time.h>
#include <algorithm>
#include <cstring>
#include <iostream>
#include <sstream>
#include "rocksdb/status.h"
#include "util/string_util.h"

#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0

//
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// API to access HDFS. All HDFS files created by one instance of rocksdb
// will reside on the same HDFS cluster.
//

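// A minimal usage sketch (illustrative only, not part of this file): a
// client obtains the env from NewHdfsEnv(), defined at the bottom of this
// file, and hands it to RocksDB through Options::env. DB::Open and Options
// are the standard RocksDB API; the namenode URI and DB path below are
// hypothetical.
//
//   rocksdb::Env* hdfs_env = nullptr;
//   rocksdb::Status s =
//       rocksdb::NewHdfsEnv(&hdfs_env, "hdfs://namenode:9000/");
//   if (s.ok()) {
//     rocksdb::Options options;
//     options.env = hdfs_env;
//     rocksdb::DB* db = nullptr;
//     s = rocksdb::DB::Open(options, "/rocksdb_test_db", &db);
//   }
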
namespace rocksdb {

namespace {

// Log error message
static Status IOError(const std::string& context, int err_number) {
  return (err_number == ENOSPC)
             ? Status::NoSpace(context, strerror(err_number))
             : Status::IOError(context, strerror(err_number));
}

// Assume there is one global logger for now. It is not thread-safe, but it
// does not need to be: the logger is initialized once at db-open time.
static Logger* mylog = nullptr;

// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile : virtual public SequentialFile,
                         virtual public RandomAccessFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog,
                    "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
                    filename_.c_str(), hfile_);
  }

  virtual ~HdfsReadableFile() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
                    filename_.c_str());
    if (hfile_ != nullptr) {  // the open in the constructor may have failed
      hdfsCloseFile(fileSys_, hfile_);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
  }

  // Returns true only if the file was successfully opened.
  bool isValid() {
    return hfile_ != nullptr;
  }

  // sequential access, read data at current offset in file
  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
                    filename_.c_str(), n);

    char* buffer = scratch;
    size_t total_bytes_read = 0;
    tSize bytes_read = 0;
    tSize remaining_bytes = (tSize)n;

    // Read a total of n bytes repeatedly until we hit error or eof
    while (remaining_bytes > 0) {
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
      if (bytes_read <= 0) {
        break;
      }
      assert(bytes_read <= remaining_bytes);

      total_bytes_read += bytes_read;
      remaining_bytes -= bytes_read;
      buffer += bytes_read;
    }
    assert(total_bytes_read <= n);

    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
                    filename_.c_str());

    if (bytes_read < 0) {
      s = IOError(filename_, errno);
    } else {
      *result = Slice(scratch, total_bytes_read);
    }

    return s;
  }

  // random access, read data from specified offset in file
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
                    filename_.c_str());
    ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset,
                                   (void*)scratch, (tSize)n);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
                    filename_.c_str());
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
    if (bytes_read < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }

  virtual Status Skip(uint64_t n) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
                    filename_.c_str());
    // get current offset from file
    tOffset current = hdfsTell(fileSys_, hfile_);
    if (current < 0) {
      return IOError(filename_, errno);
    }
    // seek to new offset in file
    tOffset newoffset = current + n;
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
    if (val < 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  // returns true if we are at the end of file, false otherwise
  bool feof() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
                    filename_.c_str());
    if (hdfsTell(fileSys_, hfile_) == fileSize()) {
      return true;
    }
    return false;
  }

  // the current size of the file
  tOffset fileSize() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
                    filename_.c_str());
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
    tOffset size = 0L;
    if (pFileInfo != nullptr) {
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    } else {
      throw HdfsFatalException("fileSize on unknown file " + filename_);
    }
    return size;
  }
};

// Appends to an existing file in HDFS.
class HdfsWritableFile : public WritableFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
                    filename_.c_str());
    assert(hfile_ != nullptr);
  }
  virtual ~HdfsWritableFile() {
    if (hfile_ != nullptr) {
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                      filename_.c_str());
      hdfsCloseFile(fileSys_, hfile_);
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                      filename_.c_str());
      hfile_ = nullptr;
    }
  }

  // If the file was successfully created, then this returns true.
  // Otherwise returns false.
  bool isValid() {
    return hfile_ != nullptr;
  }

  // The name of the file, mostly needed for debug logging.
  const std::string& getName() {
    return filename_;
  }

  virtual Status Append(const Slice& data) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
                    filename_.c_str());
    const char* src = data.data();
    size_t left = data.size();
    // hdfsWrite() returns a signed tSize (-1 on error), so keep the result
    // signed rather than letting -1 wrap around in a size_t.
    tSize ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
                    filename_.c_str());
    if (ret != static_cast<tSize>(left)) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Flush() {
    return Status::OK();
  }

  virtual Status Sync() {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
                    filename_.c_str());
    // hdfsFlush() pushes out the client-side buffer; hdfsHSync() additionally
    // asks the datanodes to persist the data, similar to a POSIX fsync.
    if (hdfsFlush(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    if (hdfsHSync(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
                    filename_.c_str());
    return Status::OK();
  }

  // This is used by HdfsLogger to write data to the debug log file
  virtual Status Append(const char* src, size_t size) {
    if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Close() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                    filename_.c_str());
    if (hdfsCloseFile(fileSys_, hfile_) != 0) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
    return Status::OK();
  }
};

// The object that implements the debug logs to reside in HDFS.
class HdfsLogger : public Logger {
 private:
  HdfsWritableFile* file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread

  Status HdfsCloseHelper() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
                    file_->getName().c_str());
    Status s = file_->Close();
    if (mylog != nullptr && mylog == this) {
      mylog = nullptr;
    }
    return s;
  }

 protected:
  virtual Status CloseImpl() override { return HdfsCloseHelper(); }

 public:
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
      : file_(f), gettid_(gettid) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
                    file_->getName().c_str());
  }

  virtual ~HdfsLogger() {
    if (!closed_) {
      closed_ = true;
      HdfsCloseHelper();
    }
  }

  virtual void Logv(const char* format, va_list ap) {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      localtime_r(&seconds, &t);
      p += snprintf(p, limit - p,
                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                    t.tm_year + 1900,
                    t.tm_mon + 1,
                    t.tm_mday,
                    t.tm_hour,
                    t.tm_min,
                    t.tm_sec,
                    static_cast<int>(now_tv.tv_usec),
                    static_cast<long long unsigned int>(thread_id));

      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with larger buffer
        } else {
          p = limit - 1;
        }
      }

      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      file_->Append(base, p - base);
      file_->Flush();
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
};

}  // namespace

// Finally, the hdfs environment

const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";

// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  unique_ptr<SequentialFile>* result,
                                  const EnvOptions& options) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<SequentialFile*>(f));
  return Status::OK();
}

// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                    unique_ptr<RandomAccessFile>* result,
                                    const EnvOptions& options) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<RandomAccessFile*>(f));
  return Status::OK();
}

// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
                                unique_ptr<WritableFile>* result,
                                const EnvOptions& options) {
  result->reset();
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<WritableFile*>(f));
  return Status::OK();
}

class HdfsDirectory : public Directory {
 public:
  explicit HdfsDirectory(int fd) : fd_(fd) {}
  ~HdfsDirectory() {}

  virtual Status Fsync() { return Status::OK(); }

 private:
  int fd_;
};

Status HdfsEnv::NewDirectory(const std::string& name,
                             unique_ptr<Directory>* result) {
  int value = hdfsExists(fileSys_, name.c_str());
  switch (value) {
    case HDFS_EXISTS:
      result->reset(new HdfsDirectory(0));
      return Status::OK();
    default:  // fail if the directory doesn't exist
      ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + " on path " + name + ".\n");
  }
}

Status HdfsEnv::FileExists(const std::string& fname) {
  int value = hdfsExists(fileSys_, fname.c_str());
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
      return Status::IOError("hdfsExists call failed with error " +
                             ToString(value) + " on path " + fname + ".\n");
  }
}

Status HdfsEnv::GetChildren(const std::string& path,
                            std::vector<std::string>* result) {
  int value = hdfsExists(fileSys_, path.c_str());
  switch (value) {
    case HDFS_EXISTS: {  // directory exists
      int numEntries = 0;
      hdfsFileInfo* pHdfsFileInfo = nullptr;
      pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
      if (numEntries >= 0) {
        for (int i = 0; i < numEntries; i++) {
          char* pathname = pHdfsFileInfo[i].mName;
          // std::strrchr (from <cstring>) is the standard equivalent of the
          // non-standard rindex(); keep only the basename of each entry.
          char* filename = std::strrchr(pathname, '/');
          if (filename != nullptr) {
            result->push_back(filename + 1);
          }
        }
        if (pHdfsFileInfo != nullptr) {
          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
        }
      } else {
        // numEntries < 0 indicates error
        ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
        throw HdfsFatalException(
            "hdfsListDirectory call failed with a negative error code.\n");
      }
      break;
    }
    case HDFS_DOESNT_EXIST:  // directory does not exist, exit
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
  return Status::OK();
}

Status HdfsEnv::DeleteFile(const std::string& fname) {
  if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::CreateDir(const std::string& name) {
  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}

Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
  const int value = hdfsExists(fileSys_, name.c_str());
  // Not atomic: the state may change between hdfsExists and CreateDir.
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return CreateDir(name);
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
}

Status HdfsEnv::DeleteDir(const std::string& name) {
  return DeleteFile(name);
}

Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
  *size = 0L;
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *size = pFileInfo->mSize;
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::GetFileModificationTime(const std::string& fname,
                                        uint64_t* time) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *time = static_cast<uint64_t>(pFileInfo->mLastMod);
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

// The rename is not atomic. HDFS does not allow renaming if the target
// already exists, so we delete the target before attempting the rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
  hdfsDelete(fileSys_, target.c_str(), 1);
  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(src, errno);
}

Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
  // There isn't a very good way to atomically check and create a file
  // via libhdfs, so file locking is a no-op here.
  *lock = nullptr;
  return Status::OK();
}

Status HdfsEnv::UnlockFile(FileLock* lock) {
  return Status::OK();
}

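// Because LockFile() always succeeds and hands back a null FileLock, this Env
// provides no cross-process mutual exclusion: nothing here prevents two
// processes from opening the same DB over HDFS. Callers that need locking
// must arrange it externally.
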
Status HdfsEnv::NewLogger(const std::string& fname,
                          shared_ptr<Logger>* result) {
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
  result->reset(h);
  if (mylog == nullptr) {
    // mylog = h; // uncomment this for detailed logging
  }
  return Status::OK();
}

// The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  *hdfs_env = new HdfsEnv(fsname);
  return Status::OK();
}
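
// The caller owns the Env returned above. RocksDB does not take ownership of
// Options::env, so the object must outlive any DB opened with it; a common
// pattern is to keep it in a long-lived (for example, static) holder for the
// life of the process.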
}  // namespace rocksdb

#endif  // ROCKSDB_HDFS_FILE_C

#else  // USE_HDFS

// dummy placeholders used when HDFS is not available
namespace rocksdb {
Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
                                  unique_ptr<SequentialFile>* /*result*/,
                                  const EnvOptions& /*options*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}

Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}
}  // namespace rocksdb

#endif