// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//

#include "rocksdb/env.h"
#include "hdfs/env_hdfs.h"

#ifdef USE_HDFS
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C

#include <stdio.h>
#include <time.h>
#include <algorithm>
#include <iostream>
#include <sstream>
#include "logging/logging.h"
#include "rocksdb/status.h"
#include "util/string_util.h"

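// Return-value convention assumed for hdfsExists(): it returns 0 when the
// path exists and -1 when it does not; anything else is treated as an error.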
#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0

//
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// API to access HDFS. All HDFS files created by one instance of rocksdb
// will reside on the same HDFS cluster.
//

namespace ROCKSDB_NAMESPACE {

namespace {

// Map an errno value onto an appropriate rocksdb Status.
static Status IOError(const std::string& context, int err_number) {
  return (err_number == ENOSPC)
             ? Status::NoSpace(context, strerror(err_number))
             : (err_number == ENOENT)
                   ? Status::PathNotFound(context, strerror(err_number))
                   : Status::IOError(context, strerror(err_number));
}

// Assume that there is one global logger for now. It is not thread-safe,
// but it need not be, because the logger is initialized at db-open time.
static Logger* mylog = nullptr;

// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile : virtual public SequentialFile,
                         virtual public RandomAccessFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog,
                    "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
                    filename_.c_str(), hfile_);
  }

  virtual ~HdfsReadableFile() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
                    filename_.c_str());
    hdfsCloseFile(fileSys_, hfile_);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
  }

  bool isValid() { return hfile_ != nullptr; }

  // sequential access, read data at current offset in file
  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
                    filename_.c_str(), n);

    char* buffer = scratch;
    size_t total_bytes_read = 0;
    tSize bytes_read = 0;
    tSize remaining_bytes = (tSize)n;

    // Read a total of n bytes repeatedly until we hit an error or EOF.
    while (remaining_bytes > 0) {
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
      if (bytes_read <= 0) {
        break;
      }
      assert(bytes_read <= remaining_bytes);

      total_bytes_read += bytes_read;
      remaining_bytes -= bytes_read;
      buffer += bytes_read;
    }
    assert(total_bytes_read <= n);

    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
                    filename_.c_str());

    if (bytes_read < 0) {
      s = IOError(filename_, errno);
    } else {
      *result = Slice(scratch, total_bytes_read);
    }

    return s;
  }

  // random access, read data from specified offset in file
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
                    filename_.c_str());
    tSize bytes_read =
        hdfsPread(fileSys_, hfile_, offset, static_cast<void*>(scratch),
                  static_cast<tSize>(n));
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
                    filename_.c_str());
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
    if (bytes_read < 0) {
      // An error: return a non-ok status.
      s = IOError(filename_, errno);
    }
    return s;
  }

  virtual Status Skip(uint64_t n) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
                    filename_.c_str());
    // get current offset from file
    tOffset current = hdfsTell(fileSys_, hfile_);
    if (current < 0) {
      return IOError(filename_, errno);
    }
    // seek to new offset in file
    tOffset newoffset = current + n;
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
    if (val < 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  // returns true if we are at the end of file, false otherwise
  bool feof() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
                    filename_.c_str());
    return hdfsTell(fileSys_, hfile_) == fileSize();
  }

  // the current size of the file
  tOffset fileSize() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
                    filename_.c_str());
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
    tOffset size = 0L;
    if (pFileInfo != nullptr) {
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    } else {
      throw HdfsFatalException("fileSize on unknown file " + filename_);
    }
    return size;
  }
};
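
// Illustrative sketch (not part of this file): reading a file back through
// the generic Env interface, assuming "env" points at a constructed HdfsEnv
// and "fname" names an existing HDFS file.
//
//   std::unique_ptr<SequentialFile> file;
//   Status s = env->NewSequentialFile(fname, &file, EnvOptions());
//   if (s.ok()) {
//     char scratch[4096];
//     Slice data;
//     s = file->Read(sizeof(scratch), &data, scratch);
//   }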

// Appends to an existing file in HDFS.
class HdfsWritableFile : public WritableFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname,
                   const EnvOptions& options)
      : WritableFile(options),
        fileSys_(fileSys),
        filename_(fname),
        hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
                    filename_.c_str());
    assert(hfile_ != nullptr);
  }

  virtual ~HdfsWritableFile() {
    if (hfile_ != nullptr) {
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                      filename_.c_str());
      hdfsCloseFile(fileSys_, hfile_);
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                      filename_.c_str());
      hfile_ = nullptr;
    }
  }

  // Returns true if the file was successfully created, false otherwise.
  bool isValid() { return hfile_ != nullptr; }

  // The name of the file, mostly needed for debug logging.
  const std::string& getName() { return filename_; }

  virtual Status Append(const Slice& data) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
                    filename_.c_str());
    const char* src = data.data();
    size_t left = data.size();
    size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
                    filename_.c_str());
    if (ret != left) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Flush() { return Status::OK(); }

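  // Note on Sync(): hdfsFlush() flushes the client-side buffer, while
  // hdfsHSync() is documented in the libhdfs headers as behaving like POSIX
  // fsync, pushing buffered data out to the datanodes. (Descriptive note;
  // exact durability guarantees depend on the Hadoop version in use.)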
  virtual Status Sync() {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
                    filename_.c_str());
    if (hdfsFlush(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    if (hdfsHSync(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
                    filename_.c_str());
    return Status::OK();
  }

  // This is used by HdfsLogger to write data to the debug log file.
  virtual Status Append(const char* src, size_t size) {
    if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
        static_cast<tSize>(size)) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Close() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                    filename_.c_str());
    if (hdfsCloseFile(fileSys_, hfile_) != 0) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
    return Status::OK();
  }
};
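
// Illustrative sketch (not part of this file): the write path through the
// generic Env interface, assuming "env" points at a constructed HdfsEnv and
// the path is a placeholder.
//
//   std::unique_ptr<WritableFile> file;
//   Status s = env->NewWritableFile("/a/path/on/hdfs", &file, EnvOptions());
//   if (s.ok()) {
//     s = file->Append(Slice("some data"));
//     if (s.ok()) s = file->Sync();   // hdfsFlush + hdfsHSync
//     if (s.ok()) s = file->Close();
//   }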

// Implements the debug log that resides in HDFS.
class HdfsLogger : public Logger {
 private:
  HdfsWritableFile* file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread

  Status HdfsCloseHelper() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
                    file_->getName().c_str());
    if (mylog != nullptr && mylog == this) {
      mylog = nullptr;
    }
    return Status::OK();
  }

 protected:
  virtual Status CloseImpl() override { return HdfsCloseHelper(); }

 public:
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
      : file_(f), gettid_(gettid) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
                    file_->getName().c_str());
  }

  ~HdfsLogger() override {
    if (!closed_) {
      closed_ = true;
      HdfsCloseHelper();
    }
  }

  using Logger::Logv;
  void Logv(const char* format, va_list ap) override {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack-allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      localtime_r(&seconds, &t);
      p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                    t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
                    t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec),
                    static_cast<long long unsigned int>(thread_id));

      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with a larger buffer
        } else {
          p = limit - 1;
        }
      }

      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      file_->Append(base, p - base);
      file_->Flush();
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
};

}  // namespace

// Finally, the hdfs environment

const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";

// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  std::unique_ptr<SequentialFile>* result,
                                  const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<SequentialFile*>(f));
  return Status::OK();
}

// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                    std::unique_ptr<RandomAccessFile>* result,
                                    const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<RandomAccessFile*>(f));
  return Status::OK();
}

// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
                                std::unique_ptr<WritableFile>* result,
                                const EnvOptions& options) {
  result->reset();
  Status s;
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<WritableFile*>(f));
  return Status::OK();
}

class HdfsDirectory : public Directory {
 public:
  explicit HdfsDirectory(int fd) : fd_(fd) {}
  ~HdfsDirectory() {}

  Status Fsync() override { return Status::OK(); }

  int GetFd() const { return fd_; }

 private:
  int fd_;
};

Status HdfsEnv::NewDirectory(const std::string& name,
                             std::unique_ptr<Directory>* result) {
  int value = hdfsExists(fileSys_, name.c_str());
  switch (value) {
    case HDFS_EXISTS:
      result->reset(new HdfsDirectory(0));
      return Status::OK();
    default:  // fail if the directory doesn't exist
      ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + " on path " + name + ".\n");
  }
}

Status HdfsEnv::FileExists(const std::string& fname) {
  int value = hdfsExists(fileSys_, fname.c_str());
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
      return Status::IOError("hdfsExists call failed with error " +
                             ToString(value) + " on path " + fname + ".\n");
  }
}

Status HdfsEnv::GetChildren(const std::string& path,
                            std::vector<std::string>* result) {
  int value = hdfsExists(fileSys_, path.c_str());
  switch (value) {
    case HDFS_EXISTS: {  // directory exists
      int numEntries = 0;
      hdfsFileInfo* pHdfsFileInfo =
          hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
      if (numEntries >= 0) {
        for (int i = 0; i < numEntries; i++) {
          std::string pathname(pHdfsFileInfo[i].mName);
          size_t pos = pathname.rfind("/");
          if (std::string::npos != pos) {
            result->push_back(pathname.substr(pos + 1));
          }
        }
        if (pHdfsFileInfo != nullptr) {
          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
        }
      } else {
        // numEntries < 0 indicates an error
        ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed");
        throw HdfsFatalException(
            "hdfsListDirectory call failed with a negative error code.\n");
      }
      break;
    }
    case HDFS_DOESNT_EXIST:  // directory does not exist, exit
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
  return Status::OK();
}

Status HdfsEnv::DeleteFile(const std::string& fname) {
  if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::CreateDir(const std::string& name) {
  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}

Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
  const int value = hdfsExists(fileSys_, name.c_str());
  // Not atomic: the state might change between hdfsExists and CreateDir.
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return CreateDir(name);
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
}

Status HdfsEnv::DeleteDir(const std::string& name) { return DeleteFile(name); }

Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
  *size = 0L;
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *size = pFileInfo->mSize;
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::GetFileModificationTime(const std::string& fname,
                                        uint64_t* time) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *time = static_cast<uint64_t>(pFileInfo->mLastMod);
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

// The rename is not atomic. HDFS does not allow renaming if the target
// already exists, so we delete the target before attempting the rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
  hdfsDelete(fileSys_, target.c_str(), 1);
  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(src, errno);
}

Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
  // There isn't a very good way to atomically check and create a file
  // via libhdfs.
  *lock = nullptr;
  return Status::OK();
}

Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }

Status HdfsEnv::NewLogger(const std::string& fname,
                          std::shared_ptr<Logger>* result) {
  // EnvOptions is used exclusively for its `strict_bytes_per_sync` value. That
  // option is only intended for WAL/flush/compaction writes, so turn it off in
  // the logger.
  EnvOptions options;
  options.strict_bytes_per_sync = false;
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname, options);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
  result->reset(h);
  if (mylog == nullptr) {
    // mylog = h; // uncomment this for detailed logging
  }
  return Status::OK();
}

Status HdfsEnv::IsDirectory(const std::string& path, bool* is_dir) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, path.c_str());
  if (pFileInfo != nullptr) {
    if (is_dir != nullptr) {
      *is_dir = (pFileInfo->mKind == kObjectKindDirectory);
    }
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(path, errno);
}

// The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  *hdfs_env = new HdfsEnv(fsname);
  return Status::OK();
}
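
// Illustrative usage (a sketch; the URI and DB path are placeholders):
//
//   Env* hdfs_env = nullptr;
//   Status s = NewHdfsEnv(&hdfs_env, "hdfs://namenode:9000/");
//   if (s.ok()) {
//     Options options;
//     options.env = hdfs_env;  // route all of the DB's file I/O to HDFS
//     DB* db = nullptr;
//     s = DB::Open(options, "/rocksdb_test_db", &db);
//   }
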
}  // namespace ROCKSDB_NAMESPACE

#endif  // ROCKSDB_HDFS_FILE_C

#else  // USE_HDFS

// dummy placeholders used when HDFS is not available
namespace ROCKSDB_NAMESPACE {
Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
                                  std::unique_ptr<SequentialFile>* /*result*/,
                                  const EnvOptions& /*options*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}

Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}
}  // namespace ROCKSDB_NAMESPACE

#endif