]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | ||
7 | #include "rocksdb/env.h" | |
8 | #include "hdfs/env_hdfs.h" | |
9 | ||
10 | #ifdef USE_HDFS | |
11 | #ifndef ROCKSDB_HDFS_FILE_C | |
12 | #define ROCKSDB_HDFS_FILE_C | |
13 | ||
14 | #include <algorithm> | |
15 | #include <stdio.h> | |
16 | #include <sys/time.h> | |
17 | #include <time.h> | |
18 | #include <iostream> | |
19 | #include <sstream> | |
20 | #include "rocksdb/status.h" | |
21 | #include "util/string_util.h" | |
22 | ||
23 | #define HDFS_EXISTS 0 | |
24 | #define HDFS_DOESNT_EXIST -1 | |
25 | #define HDFS_SUCCESS 0 | |
26 | ||
27 | // | |
28 | // This file defines an HDFS environment for rocksdb. It uses the libhdfs | |
29 | // api to access HDFS. All HDFS files created by one instance of rocksdb | |
30 | // will reside on the same HDFS cluster. | |
31 | // | |
32 | ||
33 | namespace rocksdb { | |
34 | ||
35 | namespace { | |
36 | ||
37 | // Log error message | |
38 | static Status IOError(const std::string& context, int err_number) { | |
39 | return (err_number == ENOSPC) ? | |
40 | Status::NoSpace(context, strerror(err_number)) : | |
41 | Status::IOError(context, strerror(err_number)); | |
42 | } | |
43 | ||
44 | // assume that there is one global logger for now. It is not thread-safe, | |
45 | // but need not be because the logger is initialized at db-open time. | |
46 | static Logger* mylog = nullptr; | |
47 | ||
48 | // Used for reading a file from HDFS. It implements both sequential-read | |
49 | // access methods as well as random read access methods. | |
50 | class HdfsReadableFile : virtual public SequentialFile, | |
51 | virtual public RandomAccessFile { | |
52 | private: | |
53 | hdfsFS fileSys_; | |
54 | std::string filename_; | |
55 | hdfsFile hfile_; | |
56 | ||
57 | public: | |
58 | HdfsReadableFile(hdfsFS fileSys, const std::string& fname) | |
59 | : fileSys_(fileSys), filename_(fname), hfile_(nullptr) { | |
60 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n", | |
61 | filename_.c_str()); | |
62 | hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0); | |
63 | ROCKS_LOG_DEBUG(mylog, | |
64 | "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n", | |
65 | filename_.c_str(), hfile_); | |
66 | } | |
67 | ||
68 | virtual ~HdfsReadableFile() { | |
69 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n", | |
70 | filename_.c_str()); | |
71 | hdfsCloseFile(fileSys_, hfile_); | |
72 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n", | |
73 | filename_.c_str()); | |
74 | hfile_ = nullptr; | |
75 | } | |
76 | ||
77 | bool isValid() { | |
78 | return hfile_ != nullptr; | |
79 | } | |
80 | ||
81 | // sequential access, read data at current offset in file | |
82 | virtual Status Read(size_t n, Slice* result, char* scratch) { | |
83 | Status s; | |
84 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n", | |
85 | filename_.c_str(), n); | |
86 | ||
87 | char* buffer = scratch; | |
88 | size_t total_bytes_read = 0; | |
89 | tSize bytes_read = 0; | |
90 | tSize remaining_bytes = (tSize)n; | |
91 | ||
92 | // Read a total of n bytes repeatedly until we hit error or eof | |
93 | while (remaining_bytes > 0) { | |
94 | bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes); | |
95 | if (bytes_read <= 0) { | |
96 | break; | |
97 | } | |
98 | assert(bytes_read <= remaining_bytes); | |
99 | ||
100 | total_bytes_read += bytes_read; | |
101 | remaining_bytes -= bytes_read; | |
102 | buffer += bytes_read; | |
103 | } | |
104 | assert(total_bytes_read <= n); | |
105 | ||
106 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n", | |
107 | filename_.c_str()); | |
108 | ||
109 | if (bytes_read < 0) { | |
110 | s = IOError(filename_, errno); | |
111 | } else { | |
112 | *result = Slice(scratch, total_bytes_read); | |
113 | } | |
114 | ||
115 | return s; | |
116 | } | |
117 | ||
118 | // random access, read data from specified offset in file | |
119 | virtual Status Read(uint64_t offset, size_t n, Slice* result, | |
120 | char* scratch) const { | |
121 | Status s; | |
122 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n", | |
123 | filename_.c_str()); | |
124 | ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset, | |
125 | (void*)scratch, (tSize)n); | |
126 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n", | |
127 | filename_.c_str()); | |
128 | *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read); | |
129 | if (bytes_read < 0) { | |
130 | // An error: return a non-ok status | |
131 | s = IOError(filename_, errno); | |
132 | } | |
133 | return s; | |
134 | } | |
135 | ||
136 | virtual Status Skip(uint64_t n) { | |
137 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n", | |
138 | filename_.c_str()); | |
139 | // get current offset from file | |
140 | tOffset current = hdfsTell(fileSys_, hfile_); | |
141 | if (current < 0) { | |
142 | return IOError(filename_, errno); | |
143 | } | |
144 | // seek to new offset in file | |
145 | tOffset newoffset = current + n; | |
146 | int val = hdfsSeek(fileSys_, hfile_, newoffset); | |
147 | if (val < 0) { | |
148 | return IOError(filename_, errno); | |
149 | } | |
150 | return Status::OK(); | |
151 | } | |
152 | ||
153 | private: | |
154 | ||
155 | // returns true if we are at the end of file, false otherwise | |
156 | bool feof() { | |
157 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n", | |
158 | filename_.c_str()); | |
159 | if (hdfsTell(fileSys_, hfile_) == fileSize()) { | |
160 | return true; | |
161 | } | |
162 | return false; | |
163 | } | |
164 | ||
165 | // the current size of the file | |
166 | tOffset fileSize() { | |
167 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n", | |
168 | filename_.c_str()); | |
169 | hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str()); | |
170 | tOffset size = 0L; | |
171 | if (pFileInfo != nullptr) { | |
172 | size = pFileInfo->mSize; | |
173 | hdfsFreeFileInfo(pFileInfo, 1); | |
174 | } else { | |
175 | throw HdfsFatalException("fileSize on unknown file " + filename_); | |
176 | } | |
177 | return size; | |
178 | } | |
179 | }; | |
180 | ||
181 | // Appends to an existing file in HDFS. | |
182 | class HdfsWritableFile: public WritableFile { | |
183 | private: | |
184 | hdfsFS fileSys_; | |
185 | std::string filename_; | |
186 | hdfsFile hfile_; | |
187 | ||
188 | public: | |
189 | HdfsWritableFile(hdfsFS fileSys, const std::string& fname) | |
190 | : fileSys_(fileSys), filename_(fname) , hfile_(nullptr) { | |
191 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n", | |
192 | filename_.c_str()); | |
193 | hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0); | |
194 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n", | |
195 | filename_.c_str()); | |
196 | assert(hfile_ != nullptr); | |
197 | } | |
198 | virtual ~HdfsWritableFile() { | |
199 | if (hfile_ != nullptr) { | |
200 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n", | |
201 | filename_.c_str()); | |
202 | hdfsCloseFile(fileSys_, hfile_); | |
203 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n", | |
204 | filename_.c_str()); | |
205 | hfile_ = nullptr; | |
206 | } | |
207 | } | |
208 | ||
209 | // If the file was successfully created, then this returns true. | |
210 | // Otherwise returns false. | |
211 | bool isValid() { | |
212 | return hfile_ != nullptr; | |
213 | } | |
214 | ||
215 | // The name of the file, mostly needed for debug logging. | |
216 | const std::string& getName() { | |
217 | return filename_; | |
218 | } | |
219 | ||
220 | virtual Status Append(const Slice& data) { | |
221 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n", | |
222 | filename_.c_str()); | |
223 | const char* src = data.data(); | |
224 | size_t left = data.size(); | |
225 | size_t ret = hdfsWrite(fileSys_, hfile_, src, left); | |
226 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n", | |
227 | filename_.c_str()); | |
228 | if (ret != left) { | |
229 | return IOError(filename_, errno); | |
230 | } | |
231 | return Status::OK(); | |
232 | } | |
233 | ||
234 | virtual Status Flush() { | |
235 | return Status::OK(); | |
236 | } | |
237 | ||
238 | virtual Status Sync() { | |
239 | Status s; | |
240 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n", | |
241 | filename_.c_str()); | |
242 | if (hdfsFlush(fileSys_, hfile_) == -1) { | |
243 | return IOError(filename_, errno); | |
244 | } | |
245 | if (hdfsHSync(fileSys_, hfile_) == -1) { | |
246 | return IOError(filename_, errno); | |
247 | } | |
248 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n", | |
249 | filename_.c_str()); | |
250 | return Status::OK(); | |
251 | } | |
252 | ||
253 | // This is used by HdfsLogger to write data to the debug log file | |
254 | virtual Status Append(const char* src, size_t size) { | |
255 | if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) { | |
256 | return IOError(filename_, errno); | |
257 | } | |
258 | return Status::OK(); | |
259 | } | |
260 | ||
261 | virtual Status Close() { | |
262 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n", | |
263 | filename_.c_str()); | |
264 | if (hdfsCloseFile(fileSys_, hfile_) != 0) { | |
265 | return IOError(filename_, errno); | |
266 | } | |
267 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n", | |
268 | filename_.c_str()); | |
269 | hfile_ = nullptr; | |
270 | return Status::OK(); | |
271 | } | |
272 | }; | |
273 | ||
274 | // The object that implements the debug logs to reside in HDFS. | |
275 | class HdfsLogger : public Logger { | |
276 | private: | |
277 | HdfsWritableFile* file_; | |
278 | uint64_t (*gettid_)(); // Return the thread id for the current thread | |
279 | ||
11fdf7f2 TL |
280 | Status HdfsCloseHelper() { |
281 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n", | |
282 | file_->getName().c_str()); | |
283 | Status s = file_->Close(); | |
284 | if (mylog != nullptr && mylog == this) { | |
285 | mylog = nullptr; | |
286 | } | |
287 | return s; | |
288 | } | |
289 | ||
290 | protected: | |
291 | virtual Status CloseImpl() override { return HdfsCloseHelper(); } | |
292 | ||
7c673cae FG |
293 | public: |
294 | HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)()) | |
295 | : file_(f), gettid_(gettid) { | |
296 | ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n", | |
297 | file_->getName().c_str()); | |
298 | } | |
299 | ||
300 | virtual ~HdfsLogger() { | |
11fdf7f2 TL |
301 | if (!closed_) { |
302 | closed_ = true; | |
303 | HdfsCloseHelper(); | |
7c673cae FG |
304 | } |
305 | } | |
306 | ||
307 | virtual void Logv(const char* format, va_list ap) { | |
308 | const uint64_t thread_id = (*gettid_)(); | |
309 | ||
310 | // We try twice: the first time with a fixed-size stack allocated buffer, | |
311 | // and the second time with a much larger dynamically allocated buffer. | |
312 | char buffer[500]; | |
313 | for (int iter = 0; iter < 2; iter++) { | |
314 | char* base; | |
315 | int bufsize; | |
316 | if (iter == 0) { | |
317 | bufsize = sizeof(buffer); | |
318 | base = buffer; | |
319 | } else { | |
320 | bufsize = 30000; | |
321 | base = new char[bufsize]; | |
322 | } | |
323 | char* p = base; | |
324 | char* limit = base + bufsize; | |
325 | ||
326 | struct timeval now_tv; | |
327 | gettimeofday(&now_tv, nullptr); | |
328 | const time_t seconds = now_tv.tv_sec; | |
329 | struct tm t; | |
330 | localtime_r(&seconds, &t); | |
331 | p += snprintf(p, limit - p, | |
332 | "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", | |
333 | t.tm_year + 1900, | |
334 | t.tm_mon + 1, | |
335 | t.tm_mday, | |
336 | t.tm_hour, | |
337 | t.tm_min, | |
338 | t.tm_sec, | |
339 | static_cast<int>(now_tv.tv_usec), | |
340 | static_cast<long long unsigned int>(thread_id)); | |
341 | ||
342 | // Print the message | |
343 | if (p < limit) { | |
344 | va_list backup_ap; | |
345 | va_copy(backup_ap, ap); | |
346 | p += vsnprintf(p, limit - p, format, backup_ap); | |
347 | va_end(backup_ap); | |
348 | } | |
349 | ||
350 | // Truncate to available space if necessary | |
351 | if (p >= limit) { | |
352 | if (iter == 0) { | |
353 | continue; // Try again with larger buffer | |
354 | } else { | |
355 | p = limit - 1; | |
356 | } | |
357 | } | |
358 | ||
359 | // Add newline if necessary | |
360 | if (p == base || p[-1] != '\n') { | |
361 | *p++ = '\n'; | |
362 | } | |
363 | ||
364 | assert(p <= limit); | |
365 | file_->Append(base, p-base); | |
366 | file_->Flush(); | |
367 | if (base != buffer) { | |
368 | delete[] base; | |
369 | } | |
370 | break; | |
371 | } | |
372 | } | |
373 | }; | |
374 | ||
375 | } // namespace | |
376 | ||
377 | // Finally, the hdfs environment | |
378 | ||
379 | const std::string HdfsEnv::kProto = "hdfs://"; | |
380 | const std::string HdfsEnv::pathsep = "/"; | |
381 | ||
382 | // open a file for sequential reading | |
383 | Status HdfsEnv::NewSequentialFile(const std::string& fname, | |
384 | unique_ptr<SequentialFile>* result, | |
385 | const EnvOptions& options) { | |
386 | result->reset(); | |
387 | HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); | |
388 | if (f == nullptr || !f->isValid()) { | |
389 | delete f; | |
390 | *result = nullptr; | |
391 | return IOError(fname, errno); | |
392 | } | |
393 | result->reset(dynamic_cast<SequentialFile*>(f)); | |
394 | return Status::OK(); | |
395 | } | |
396 | ||
397 | // open a file for random reading | |
398 | Status HdfsEnv::NewRandomAccessFile(const std::string& fname, | |
399 | unique_ptr<RandomAccessFile>* result, | |
400 | const EnvOptions& options) { | |
401 | result->reset(); | |
402 | HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); | |
403 | if (f == nullptr || !f->isValid()) { | |
404 | delete f; | |
405 | *result = nullptr; | |
406 | return IOError(fname, errno); | |
407 | } | |
408 | result->reset(dynamic_cast<RandomAccessFile*>(f)); | |
409 | return Status::OK(); | |
410 | } | |
411 | ||
412 | // create a new file for writing | |
413 | Status HdfsEnv::NewWritableFile(const std::string& fname, | |
414 | unique_ptr<WritableFile>* result, | |
415 | const EnvOptions& options) { | |
416 | result->reset(); | |
417 | Status s; | |
418 | HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname); | |
419 | if (f == nullptr || !f->isValid()) { | |
420 | delete f; | |
421 | *result = nullptr; | |
422 | return IOError(fname, errno); | |
423 | } | |
424 | result->reset(dynamic_cast<WritableFile*>(f)); | |
425 | return Status::OK(); | |
426 | } | |
427 | ||
428 | class HdfsDirectory : public Directory { | |
429 | public: | |
430 | explicit HdfsDirectory(int fd) : fd_(fd) {} | |
431 | ~HdfsDirectory() {} | |
432 | ||
433 | virtual Status Fsync() { return Status::OK(); } | |
434 | ||
435 | private: | |
436 | int fd_; | |
437 | }; | |
438 | ||
439 | Status HdfsEnv::NewDirectory(const std::string& name, | |
440 | unique_ptr<Directory>* result) { | |
441 | int value = hdfsExists(fileSys_, name.c_str()); | |
442 | switch (value) { | |
443 | case HDFS_EXISTS: | |
444 | result->reset(new HdfsDirectory(0)); | |
445 | return Status::OK(); | |
446 | default: // fail if the directory doesn't exist | |
447 | ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed"); | |
448 | throw HdfsFatalException("hdfsExists call failed with error " + | |
449 | ToString(value) + " on path " + name + | |
450 | ".\n"); | |
451 | } | |
452 | } | |
453 | ||
454 | Status HdfsEnv::FileExists(const std::string& fname) { | |
455 | int value = hdfsExists(fileSys_, fname.c_str()); | |
456 | switch (value) { | |
457 | case HDFS_EXISTS: | |
458 | return Status::OK(); | |
459 | case HDFS_DOESNT_EXIST: | |
460 | return Status::NotFound(); | |
461 | default: // anything else should be an error | |
462 | ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed"); | |
463 | return Status::IOError("hdfsExists call failed with error " + | |
464 | ToString(value) + " on path " + fname + ".\n"); | |
465 | } | |
466 | } | |
467 | ||
468 | Status HdfsEnv::GetChildren(const std::string& path, | |
469 | std::vector<std::string>* result) { | |
470 | int value = hdfsExists(fileSys_, path.c_str()); | |
471 | switch (value) { | |
472 | case HDFS_EXISTS: { // directory exists | |
473 | int numEntries = 0; | |
474 | hdfsFileInfo* pHdfsFileInfo = 0; | |
475 | pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries); | |
476 | if (numEntries >= 0) { | |
477 | for(int i = 0; i < numEntries; i++) { | |
478 | char* pathname = pHdfsFileInfo[i].mName; | |
479 | char* filename = std::rindex(pathname, '/'); | |
480 | if (filename != nullptr) { | |
481 | result->push_back(filename+1); | |
482 | } | |
483 | } | |
484 | if (pHdfsFileInfo != nullptr) { | |
485 | hdfsFreeFileInfo(pHdfsFileInfo, numEntries); | |
486 | } | |
487 | } else { | |
488 | // numEntries < 0 indicates error | |
489 | ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error "); | |
490 | throw HdfsFatalException( | |
491 | "hdfsListDirectory call failed negative error.\n"); | |
492 | } | |
493 | break; | |
494 | } | |
495 | case HDFS_DOESNT_EXIST: // directory does not exist, exit | |
496 | return Status::NotFound(); | |
497 | default: // anything else should be an error | |
498 | ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed"); | |
499 | throw HdfsFatalException("hdfsExists call failed with error " + | |
500 | ToString(value) + ".\n"); | |
501 | } | |
502 | return Status::OK(); | |
503 | } | |
504 | ||
505 | Status HdfsEnv::DeleteFile(const std::string& fname) { | |
506 | if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) { | |
507 | return Status::OK(); | |
508 | } | |
509 | return IOError(fname, errno); | |
510 | }; | |
511 | ||
512 | Status HdfsEnv::CreateDir(const std::string& name) { | |
513 | if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) { | |
514 | return Status::OK(); | |
515 | } | |
516 | return IOError(name, errno); | |
517 | }; | |
518 | ||
519 | Status HdfsEnv::CreateDirIfMissing(const std::string& name) { | |
520 | const int value = hdfsExists(fileSys_, name.c_str()); | |
521 | // Not atomic. state might change b/w hdfsExists and CreateDir. | |
522 | switch (value) { | |
523 | case HDFS_EXISTS: | |
524 | return Status::OK(); | |
525 | case HDFS_DOESNT_EXIST: | |
526 | return CreateDir(name); | |
527 | default: // anything else should be an error | |
528 | ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed"); | |
529 | throw HdfsFatalException("hdfsExists call failed with error " + | |
530 | ToString(value) + ".\n"); | |
531 | } | |
532 | }; | |
533 | ||
534 | Status HdfsEnv::DeleteDir(const std::string& name) { | |
535 | return DeleteFile(name); | |
536 | }; | |
537 | ||
538 | Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) { | |
539 | *size = 0L; | |
540 | hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str()); | |
541 | if (pFileInfo != nullptr) { | |
542 | *size = pFileInfo->mSize; | |
543 | hdfsFreeFileInfo(pFileInfo, 1); | |
544 | return Status::OK(); | |
545 | } | |
546 | return IOError(fname, errno); | |
547 | } | |
548 | ||
549 | Status HdfsEnv::GetFileModificationTime(const std::string& fname, | |
550 | uint64_t* time) { | |
551 | hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str()); | |
552 | if (pFileInfo != nullptr) { | |
553 | *time = static_cast<uint64_t>(pFileInfo->mLastMod); | |
554 | hdfsFreeFileInfo(pFileInfo, 1); | |
555 | return Status::OK(); | |
556 | } | |
557 | return IOError(fname, errno); | |
558 | ||
559 | } | |
560 | ||
561 | // The rename is not atomic. HDFS does not allow a renaming if the | |
562 | // target already exists. So, we delete the target before attempting the | |
563 | // rename. | |
564 | Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) { | |
565 | hdfsDelete(fileSys_, target.c_str(), 1); | |
566 | if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) { | |
567 | return Status::OK(); | |
568 | } | |
569 | return IOError(src, errno); | |
570 | } | |
571 | ||
572 | Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) { | |
573 | // there isn's a very good way to atomically check and create | |
574 | // a file via libhdfs | |
575 | *lock = nullptr; | |
576 | return Status::OK(); | |
577 | } | |
578 | ||
579 | Status HdfsEnv::UnlockFile(FileLock* lock) { | |
580 | return Status::OK(); | |
581 | } | |
582 | ||
583 | Status HdfsEnv::NewLogger(const std::string& fname, | |
584 | shared_ptr<Logger>* result) { | |
585 | HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname); | |
586 | if (f == nullptr || !f->isValid()) { | |
587 | delete f; | |
588 | *result = nullptr; | |
589 | return IOError(fname, errno); | |
590 | } | |
591 | HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid); | |
592 | result->reset(h); | |
593 | if (mylog == nullptr) { | |
594 | // mylog = h; // uncomment this for detailed logging | |
595 | } | |
596 | return Status::OK(); | |
597 | } | |
598 | ||
599 | // The factory method for creating an HDFS Env | |
600 | Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) { | |
601 | *hdfs_env = new HdfsEnv(fsname); | |
602 | return Status::OK(); | |
603 | } | |
604 | } // namespace rocksdb | |
605 | ||
606 | #endif // ROCKSDB_HDFS_FILE_C | |
607 | ||
608 | #else // USE_HDFS | |
609 | ||
610 | // dummy placeholders used when HDFS is not available | |
611 | namespace rocksdb { | |
11fdf7f2 TL |
612 | Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/, |
613 | unique_ptr<SequentialFile>* /*result*/, | |
614 | const EnvOptions& /*options*/) { | |
615 | return Status::NotSupported("Not compiled with hdfs support"); | |
7c673cae FG |
616 | } |
617 | ||
11fdf7f2 | 618 | Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) { |
7c673cae FG |
619 | return Status::NotSupported("Not compiled with hdfs support"); |
620 | } | |
621 | } | |
622 | ||
623 | #endif |