// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//

#include "rocksdb/env.h"
#include "hdfs/env_hdfs.h"

#ifdef USE_HDFS
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C

#include <stdio.h>
#include <sys/time.h>
#include <time.h>
#include <algorithm>
#include <iostream>
#include <sstream>
#include "rocksdb/status.h"
#include "util/logging.h"
#include "util/string_util.h"

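// Return codes used to interpret hdfsExists(): libhdfs returns 0 when the
// path exists; this file treats -1 as "not found" and any other value as an
// error.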
#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0

//
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// API to access HDFS. All HDFS files created by one instance of rocksdb
// will reside on the same HDFS cluster.
//

namespace rocksdb {

namespace {

// Build a Status from an errno value, mapping common errors to specific
// status codes.
static Status IOError(const std::string& context, int err_number) {
  return (err_number == ENOSPC)
             ? Status::NoSpace(context, strerror(err_number))
             : (err_number == ENOENT)
                   ? Status::PathNotFound(context, strerror(err_number))
                   : Status::IOError(context, strerror(err_number));
}

// Assume there is one global logger for now. It is not thread-safe, but it
// does not need to be: the logger is initialized at db-open time.
static Logger* mylog = nullptr;

// Used for reading a file from HDFS. It implements both sequential-read
// access methods as well as random read access methods.
class HdfsReadableFile : virtual public SequentialFile,
                         virtual public RandomAccessFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog,
                    "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
                    filename_.c_str(), hfile_);
  }

  virtual ~HdfsReadableFile() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
                    filename_.c_str());
    hdfsCloseFile(fileSys_, hfile_);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
  }

  bool isValid() {
    return hfile_ != nullptr;
  }

  // sequential access, read data at current offset in file
  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
                    filename_.c_str(), n);

    char* buffer = scratch;
    size_t total_bytes_read = 0;
    tSize bytes_read = 0;
    tSize remaining_bytes = (tSize)n;

    // Read a total of n bytes repeatedly until we hit error or eof
    while (remaining_bytes > 0) {
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
      if (bytes_read <= 0) {
        break;
      }
      assert(bytes_read <= remaining_bytes);

      total_bytes_read += bytes_read;
      remaining_bytes -= bytes_read;
      buffer += bytes_read;
    }
    assert(total_bytes_read <= n);

    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
                    filename_.c_str());

    if (bytes_read < 0) {
      s = IOError(filename_, errno);
    } else {
      *result = Slice(scratch, total_bytes_read);
    }

    return s;
  }

  // random access, read data from specified offset in file
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
                    filename_.c_str());
    ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset,
                                   (void*)scratch, (tSize)n);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
                    filename_.c_str());
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
    if (bytes_read < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }

  virtual Status Skip(uint64_t n) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
                    filename_.c_str());
    // get current offset from file
    tOffset current = hdfsTell(fileSys_, hfile_);
    if (current < 0) {
      return IOError(filename_, errno);
    }
    // seek to new offset in file
    tOffset newoffset = current + n;
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
    if (val < 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  // returns true if we are at the end of file, false otherwise
  bool feof() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
                    filename_.c_str());
    if (hdfsTell(fileSys_, hfile_) == fileSize()) {
      return true;
    }
    return false;
  }

  // the current size of the file
  tOffset fileSize() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
                    filename_.c_str());
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
    tOffset size = 0L;
    if (pFileInfo != nullptr) {
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    } else {
      throw HdfsFatalException("fileSize on unknown file " + filename_);
    }
    return size;
  }
};

// Appends to an existing file in HDFS.
class HdfsWritableFile : public WritableFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
                    filename_.c_str());
    assert(hfile_ != nullptr);
  }
  virtual ~HdfsWritableFile() {
    if (hfile_ != nullptr) {
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                      filename_.c_str());
      hdfsCloseFile(fileSys_, hfile_);
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                      filename_.c_str());
      hfile_ = nullptr;
    }
  }

  // If the file was successfully created, then this returns true.
  // Otherwise returns false.
  bool isValid() {
    return hfile_ != nullptr;
  }

  // The name of the file, mostly needed for debug logging.
  const std::string& getName() {
    return filename_;
  }

  virtual Status Append(const Slice& data) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
                    filename_.c_str());
    const char* src = data.data();
    size_t left = data.size();
    size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
                    filename_.c_str());
    if (ret != left) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

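  // Flush is a no-op here: writes are buffered inside libhdfs, and real
  // durability is provided by Sync() below via hdfsFlush()/hdfsHSync().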
  virtual Status Flush() {
    return Status::OK();
  }

  virtual Status Sync() {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
                    filename_.c_str());
    if (hdfsFlush(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    if (hdfsHSync(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
                    filename_.c_str());
    return Status::OK();
  }

  // This is used by HdfsLogger to write data to the debug log file
  virtual Status Append(const char* src, size_t size) {
    if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
        static_cast<tSize>(size)) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Close() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                    filename_.c_str());
    if (hdfsCloseFile(fileSys_, hfile_) != 0) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
    return Status::OK();
  }
};

// Logger implementation that writes the debug log to a file in HDFS.
class HdfsLogger : public Logger {
 private:
  HdfsWritableFile* file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread

  Status HdfsCloseHelper() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
                    file_->getName().c_str());
    if (mylog != nullptr && mylog == this) {
      mylog = nullptr;
    }
    return Status::OK();
  }

 protected:
  virtual Status CloseImpl() override { return HdfsCloseHelper(); }

 public:
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
      : file_(f), gettid_(gettid) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
                    file_->getName().c_str());
  }

  ~HdfsLogger() override {
    if (!closed_) {
      closed_ = true;
      HdfsCloseHelper();
    }
  }

  using Logger::Logv;
  void Logv(const char* format, va_list ap) override {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      localtime_r(&seconds, &t);
      p += snprintf(p, limit - p,
                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                    t.tm_year + 1900,
                    t.tm_mon + 1,
                    t.tm_mday,
                    t.tm_hour,
                    t.tm_min,
                    t.tm_sec,
                    static_cast<int>(now_tv.tv_usec),
                    static_cast<long long unsigned int>(thread_id));

      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with larger buffer
        } else {
          p = limit - 1;
        }
      }

      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      file_->Append(base, p - base);
      file_->Flush();
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
};

}  // namespace

// Finally, the hdfs environment

const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";

// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  std::unique_ptr<SequentialFile>* result,
                                  const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<SequentialFile*>(f));
  return Status::OK();
}

// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                    std::unique_ptr<RandomAccessFile>* result,
                                    const EnvOptions& /*options*/) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<RandomAccessFile*>(f));
  return Status::OK();
}

// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
                                std::unique_ptr<WritableFile>* result,
                                const EnvOptions& /*options*/) {
  result->reset();
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<WritableFile*>(f));
  return Status::OK();
}

class HdfsDirectory : public Directory {
 public:
  explicit HdfsDirectory(int fd) : fd_(fd) {}
  ~HdfsDirectory() {}

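  // Fsync is a no-op: libhdfs exposes no directory-level sync, and HDFS
  // directory metadata is maintained by the NameNode rather than in the
  // directory itself.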
  Status Fsync() override { return Status::OK(); }

  int GetFd() const { return fd_; }

 private:
  int fd_;
};

Status HdfsEnv::NewDirectory(const std::string& name,
                             std::unique_ptr<Directory>* result) {
  int value = hdfsExists(fileSys_, name.c_str());
  switch (value) {
    case HDFS_EXISTS:
      result->reset(new HdfsDirectory(0));
      return Status::OK();
    default:  // fail if the directory doesn't exist
      ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + " on path " + name +
                               ".\n");
  }
}

Status HdfsEnv::FileExists(const std::string& fname) {
  int value = hdfsExists(fileSys_, fname.c_str());
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
      return Status::IOError("hdfsExists call failed with error " +
                             ToString(value) + " on path " + fname + ".\n");
  }
}

Status HdfsEnv::GetChildren(const std::string& path,
                            std::vector<std::string>* result) {
  int value = hdfsExists(fileSys_, path.c_str());
  switch (value) {
    case HDFS_EXISTS: {  // directory exists
      int numEntries = 0;
      hdfsFileInfo* pHdfsFileInfo = nullptr;
      pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
      if (numEntries >= 0) {
        for (int i = 0; i < numEntries; i++) {
          std::string pathname(pHdfsFileInfo[i].mName);
          size_t pos = pathname.rfind("/");
          if (std::string::npos != pos) {
            result->push_back(pathname.substr(pos + 1));
          }
        }
        if (pHdfsFileInfo != nullptr) {
          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
        }
      } else {
        // numEntries < 0 indicates error
        ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
        throw HdfsFatalException(
            "hdfsListDirectory call failed with negative error.\n");
      }
      break;
    }
    case HDFS_DOESNT_EXIST:  // directory does not exist, exit
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
  return Status::OK();
}

Status HdfsEnv::DeleteFile(const std::string& fname) {
  if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::CreateDir(const std::string& name) {
  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}

Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
  const int value = hdfsExists(fileSys_, name.c_str());
  // Not atomic. State might change between hdfsExists and CreateDir.
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return CreateDir(name);
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
}

Status HdfsEnv::DeleteDir(const std::string& name) {
  return DeleteFile(name);
}

Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
  *size = 0L;
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *size = pFileInfo->mSize;
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::GetFileModificationTime(const std::string& fname,
                                        uint64_t* time) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *time = static_cast<uint64_t>(pFileInfo->mLastMod);
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

// The rename is not atomic. HDFS does not allow a rename if the target
// already exists, so we delete the target before attempting the rename.
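// Note: if a crash occurs after the delete but before the rename completes,
// the target is lost; callers that rely on atomic rename semantics should be
// aware of this window.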
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
  hdfsDelete(fileSys_, target.c_str(), 1);
  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(src, errno);
}

Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
  // there isn't a very good way to atomically check and create
  // a file via libhdfs
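  // Returning a nullptr FileLock means DB locking is effectively disabled on
  // HDFS; single-writer access must be guaranteed by the caller.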
  *lock = nullptr;
  return Status::OK();
}

Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); }

Status HdfsEnv::NewLogger(const std::string& fname,
                          std::shared_ptr<Logger>* result) {
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
  result->reset(h);
  if (mylog == nullptr) {
    // mylog = h; // uncomment this for detailed logging
  }
  return Status::OK();
}

// The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  *hdfs_env = new HdfsEnv(fsname);
  return Status::OK();
}
}  // namespace rocksdb

#endif  // ROCKSDB_HDFS_FILE_C

#else  // USE_HDFS

// dummy placeholders used when HDFS is not available
namespace rocksdb {
Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
                                  std::unique_ptr<SequentialFile>* /*result*/,
                                  const EnvOptions& /*options*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}

Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
  return Status::NotSupported("Not compiled with hdfs support");
}
}  // namespace rocksdb

#endif