// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//

#include "rocksdb/env.h"
#include "hdfs/env_hdfs.h"

#ifdef USE_HDFS
#ifndef ROCKSDB_HDFS_FILE_C
#define ROCKSDB_HDFS_FILE_C

#include <algorithm>
#include <stdio.h>
#include <string.h>  // strerror(), strrchr()
#include <sys/time.h>
#include <time.h>
#include <iostream>
#include <sstream>
#include "rocksdb/status.h"
#include "util/string_util.h"

#define HDFS_EXISTS 0
#define HDFS_DOESNT_EXIST -1
#define HDFS_SUCCESS 0

//
// This file defines an HDFS environment for rocksdb. It uses the libhdfs
// API to access HDFS. All HDFS files created by one instance of rocksdb
// will reside on the same HDFS cluster.
//
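// A minimal usage sketch (the URI below is a placeholder; this assumes
// rocksdb was built with -DUSE_HDFS and linked against libhdfs):
//
//   rocksdb::Env* hdfs_env = nullptr;
//   rocksdb::Status s =
//       rocksdb::NewHdfsEnv(&hdfs_env, "hdfs://namenode:9000/");
//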

namespace rocksdb {

namespace {

// Build a Status from an errno value; ENOSPC maps to NoSpace, everything
// else to a generic IOError.
static Status IOError(const std::string& context, int err_number) {
  return (err_number == ENOSPC) ?
      Status::NoSpace(context, strerror(err_number)) :
      Status::IOError(context, strerror(err_number));
}

// Assume that there is one global logger for now. It is not thread-safe,
// but it need not be because the logger is initialized at db-open time.
static Logger* mylog = nullptr;

// Used for reading a file from HDFS. It implements both the sequential-read
// and the random-read access methods.
class HdfsReadableFile : virtual public SequentialFile,
                         virtual public RandomAccessFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog,
                    "[hdfs] HdfsReadableFile opened file %s hfile_=%p\n",
                    filename_.c_str(), hfile_);
  }

  virtual ~HdfsReadableFile() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
                    filename_.c_str());
    hdfsCloseFile(fileSys_, hfile_);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
  }

  bool isValid() {
    return hfile_ != nullptr;
  }

  // sequential access, read data at the current offset in the file
  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %zu\n",
                    filename_.c_str(), n);

    char* buffer = scratch;
    size_t total_bytes_read = 0;
    tSize bytes_read = 0;
    tSize remaining_bytes = (tSize)n;

    // Read a total of n bytes repeatedly until we hit an error or eof
    while (remaining_bytes > 0) {
      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
      if (bytes_read <= 0) {
        break;
      }
      assert(bytes_read <= remaining_bytes);

      total_bytes_read += bytes_read;
      remaining_bytes -= bytes_read;
      buffer += bytes_read;
    }
    assert(total_bytes_read <= n);

    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
                    filename_.c_str());

    if (bytes_read < 0) {
      s = IOError(filename_, errno);
    } else {
      *result = Slice(scratch, total_bytes_read);
    }

    return s;
  }

  // random access, read data from the specified offset in the file
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
                    filename_.c_str());
    ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset,
                                   (void*)scratch, (tSize)n);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
                    filename_.c_str());
    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
    if (bytes_read < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }

  virtual Status Skip(uint64_t n) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
                    filename_.c_str());
    // get the current offset in the file
    tOffset current = hdfsTell(fileSys_, hfile_);
    if (current < 0) {
      return IOError(filename_, errno);
    }
    // seek to the new offset in the file
    tOffset newoffset = current + n;
    int val = hdfsSeek(fileSys_, hfile_, newoffset);
    if (val < 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  // returns true if we are at the end of file, false otherwise
  bool feof() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
                    filename_.c_str());
    return hdfsTell(fileSys_, hfile_) == fileSize();
  }

  // the current size of the file
  tOffset fileSize() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
                    filename_.c_str());
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
    tOffset size = 0L;
    if (pFileInfo != nullptr) {
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    } else {
      throw HdfsFatalException("fileSize on unknown file " + filename_);
    }
    return size;
  }
};
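
// A minimal read sketch (hypothetical: `fs` would come from hdfsConnect(),
// and "/a/file" is a placeholder path):
//
//   HdfsReadableFile file(fs, "/a/file");
//   if (file.isValid()) {
//     char scratch[4096];
//     Slice data;
//     file.Read(sizeof(scratch), &data, scratch);      // sequential read
//     file.Read(0, sizeof(scratch), &data, scratch);   // pread at offset 0
//   }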

// Appends to an existing file in HDFS.
class HdfsWritableFile : public WritableFile {
 private:
  hdfsFS fileSys_;
  std::string filename_;
  hdfsFile hfile_;

 public:
  HdfsWritableFile(hdfsFS fileSys, const std::string& fname)
      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
                    filename_.c_str());
    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
                    filename_.c_str());
    assert(hfile_ != nullptr);
  }

  virtual ~HdfsWritableFile() {
    if (hfile_ != nullptr) {
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                      filename_.c_str());
      hdfsCloseFile(fileSys_, hfile_);
      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                      filename_.c_str());
      hfile_ = nullptr;
    }
  }

  // If the file was successfully created, then this returns true.
  // Otherwise returns false.
  bool isValid() {
    return hfile_ != nullptr;
  }

  // The name of the file, mostly needed for debug logging.
  const std::string& getName() {
    return filename_;
  }

  virtual Status Append(const Slice& data) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
                    filename_.c_str());
    const char* src = data.data();
    size_t left = data.size();
    tSize ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
                    filename_.c_str());
    // hdfsWrite returns -1 on error; a short write is also treated as
    // a failure here.
    if (ret < 0 || static_cast<size_t>(ret) != left) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Flush() {
    return Status::OK();
  }

  virtual Status Sync() {
    Status s;
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
                    filename_.c_str());
    if (hdfsFlush(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    if (hdfsHSync(fileSys_, hfile_) == -1) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
                    filename_.c_str());
    return Status::OK();
  }

  // This is used by HdfsLogger to write data to the debug log file
  virtual Status Append(const char* src, size_t size) {
    if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  virtual Status Close() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
                    filename_.c_str());
    if (hdfsCloseFile(fileSys_, hfile_) != 0) {
      return IOError(filename_, errno);
    }
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
                    filename_.c_str());
    hfile_ = nullptr;
    return Status::OK();
  }
};
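
// A minimal write sketch (hypothetical: `fs` would come from hdfsConnect(),
// and "/a/log" is a placeholder path):
//
//   HdfsWritableFile file(fs, "/a/log");
//   if (file.isValid()) {
//     file.Append(Slice("hello\n"));
//     file.Sync();    // hdfsFlush + hdfsHSync
//     file.Close();
//   }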

// The object that implements the debug logs to reside in HDFS.
class HdfsLogger : public Logger {
 private:
  HdfsWritableFile* file_;
  uint64_t (*gettid_)();  // Return the thread id for the current thread

 public:
  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
      : file_(f), gettid_(gettid) {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
                    file_->getName().c_str());
  }

  virtual ~HdfsLogger() {
    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
                    file_->getName().c_str());
    delete file_;
    if (mylog != nullptr && mylog == this) {
      mylog = nullptr;
    }
  }

  virtual void Logv(const char* format, va_list ap) {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
      char* base;
      int bufsize;
      if (iter == 0) {
        bufsize = sizeof(buffer);
        base = buffer;
      } else {
        bufsize = 30000;
        base = new char[bufsize];
      }
      char* p = base;
      char* limit = base + bufsize;

      struct timeval now_tv;
      gettimeofday(&now_tv, nullptr);
      const time_t seconds = now_tv.tv_sec;
      struct tm t;
      localtime_r(&seconds, &t);
      p += snprintf(p, limit - p,
                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                    t.tm_year + 1900,
                    t.tm_mon + 1,
                    t.tm_mday,
                    t.tm_hour,
                    t.tm_min,
                    t.tm_sec,
                    static_cast<int>(now_tv.tv_usec),
                    static_cast<long long unsigned int>(thread_id));

      // Print the message
      if (p < limit) {
        va_list backup_ap;
        va_copy(backup_ap, ap);
        p += vsnprintf(p, limit - p, format, backup_ap);
        va_end(backup_ap);
      }

      // Truncate to available space if necessary
      if (p >= limit) {
        if (iter == 0) {
          continue;  // Try again with larger buffer
        } else {
          p = limit - 1;
        }
      }

      // Add newline if necessary
      if (p == base || p[-1] != '\n') {
        *p++ = '\n';
      }

      assert(p <= limit);
      file_->Append(base, p - base);
      file_->Flush();
      if (base != buffer) {
        delete[] base;
      }
      break;
    }
  }
};

}  // namespace

// Finally, the hdfs environment

const std::string HdfsEnv::kProto = "hdfs://";
const std::string HdfsEnv::pathsep = "/";

// open a file for sequential reading
Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  unique_ptr<SequentialFile>* result,
                                  const EnvOptions& options) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<SequentialFile*>(f));
  return Status::OK();
}

// open a file for random reading
Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
                                    unique_ptr<RandomAccessFile>* result,
                                    const EnvOptions& options) {
  result->reset();
  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<RandomAccessFile*>(f));
  return Status::OK();
}

// create a new file for writing
Status HdfsEnv::NewWritableFile(const std::string& fname,
                                unique_ptr<WritableFile>* result,
                                const EnvOptions& options) {
  result->reset();
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  result->reset(dynamic_cast<WritableFile*>(f));
  return Status::OK();
}

class HdfsDirectory : public Directory {
 public:
  explicit HdfsDirectory(int fd) : fd_(fd) {}
  ~HdfsDirectory() {}

  virtual Status Fsync() { return Status::OK(); }

 private:
  int fd_;
};

Status HdfsEnv::NewDirectory(const std::string& name,
                             unique_ptr<Directory>* result) {
  int value = hdfsExists(fileSys_, name.c_str());
  switch (value) {
    case HDFS_EXISTS:
      result->reset(new HdfsDirectory(0));
      return Status::OK();
    default:  // fail if the directory doesn't exist
      ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + " on path " + name + ".\n");
  }
}

Status HdfsEnv::FileExists(const std::string& fname) {
  int value = hdfsExists(fileSys_, fname.c_str());
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
      return Status::IOError("hdfsExists call failed with error " +
                             ToString(value) + " on path " + fname + ".\n");
  }
}

Status HdfsEnv::GetChildren(const std::string& path,
                            std::vector<std::string>* result) {
  int value = hdfsExists(fileSys_, path.c_str());
  switch (value) {
    case HDFS_EXISTS: {  // directory exists
      int numEntries = 0;
      hdfsFileInfo* pHdfsFileInfo =
          hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
      if (numEntries >= 0) {
        for (int i = 0; i < numEntries; i++) {
          // keep only the basename of each entry
          char* pathname = pHdfsFileInfo[i].mName;
          char* filename = strrchr(pathname, '/');
          if (filename != nullptr) {
            result->push_back(filename + 1);
          }
        }
        if (pHdfsFileInfo != nullptr) {
          hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
        }
      } else {
        // numEntries < 0 indicates an error
        ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
        throw HdfsFatalException(
            "hdfsListDirectory call failed with a negative error code.\n");
      }
      break;
    }
    case HDFS_DOESNT_EXIST:  // directory does not exist, exit
      return Status::NotFound();
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
  return Status::OK();
}

Status HdfsEnv::DeleteFile(const std::string& fname) {
  if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) {
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::CreateDir(const std::string& name) {
  if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}

Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
  const int value = hdfsExists(fileSys_, name.c_str());
  // Not atomic. The state might change between hdfsExists and CreateDir.
  switch (value) {
    case HDFS_EXISTS:
      return Status::OK();
    case HDFS_DOESNT_EXIST:
      return CreateDir(name);
    default:  // anything else should be an error
      ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
      throw HdfsFatalException("hdfsExists call failed with error " +
                               ToString(value) + ".\n");
  }
}

Status HdfsEnv::DeleteDir(const std::string& name) {
  return DeleteFile(name);
}

Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
  *size = 0L;
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *size = pFileInfo->mSize;
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

Status HdfsEnv::GetFileModificationTime(const std::string& fname,
                                        uint64_t* time) {
  hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
  if (pFileInfo != nullptr) {
    *time = static_cast<uint64_t>(pFileInfo->mLastMod);
    hdfsFreeFileInfo(pFileInfo, 1);
    return Status::OK();
  }
  return IOError(fname, errno);
}

// The rename is not atomic. HDFS does not allow a rename if the target
// already exists, so we delete the target before attempting the rename.
Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
  hdfsDelete(fileSys_, target.c_str(), 1);
  if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(src, errno);
}

Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) {
  // there isn't a very good way to atomically check and create a file
  // via libhdfs
  *lock = nullptr;
  return Status::OK();
}

Status HdfsEnv::UnlockFile(FileLock* lock) {
  return Status::OK();
}

Status HdfsEnv::NewLogger(const std::string& fname,
                          shared_ptr<Logger>* result) {
  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
  if (f == nullptr || !f->isValid()) {
    delete f;
    *result = nullptr;
    return IOError(fname, errno);
  }
  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
  result->reset(h);
  if (mylog == nullptr) {
    // mylog = h; // uncomment this for detailed logging
  }
  return Status::OK();
}

// The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  *hdfs_env = new HdfsEnv(fsname);
  return Status::OK();
}
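
// Continuing the sketch from the top of the file: the Env returned by
// NewHdfsEnv() is plugged into a DB through Options (the DB path below is
// a placeholder, not part of this file):
//
//   Options options;
//   options.create_if_missing = true;
//   options.env = hdfs_env;
//   DB* db = nullptr;
//   Status s = DB::Open(options, "/rocksdb_on_hdfs/testdb", &db);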

}  // namespace rocksdb

#endif  // ROCKSDB_HDFS_FILE_C

#else  // USE_HDFS

// dummy placeholders used when HDFS is not available
namespace rocksdb {

Status HdfsEnv::NewSequentialFile(const std::string& fname,
                                  unique_ptr<SequentialFile>* result,
                                  const EnvOptions& options) {
  return Status::NotSupported("Not compiled with hdfs support");
}

Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
  return Status::NotSupported("Not compiled with hdfs support");
}

}  // namespace rocksdb

#endif  // USE_HDFS