]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/hdfs/env_hdfs.h
buildsys: change download over to reef release
[ceph.git] / ceph / src / rocksdb / hdfs / env_hdfs.h
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6
7#pragma once
8#include <algorithm>
9#include <stdio.h>
10#include <time.h>
11#include <iostream>
12#include "port/sys_time.h"
13#include "rocksdb/env.h"
14#include "rocksdb/status.h"
15
16#ifdef USE_HDFS
17#include <hdfs.h>
18
f67539c2 19namespace ROCKSDB_NAMESPACE {
7c673cae
FG
20
// Exception type used to report a problem with the arguments that were
// supplied; it is thrown while the program is executing.
class HdfsUsageException : public std::exception {};
24
// A simple exception that indicates something went wrong that is not
// recoverable. The intention is for the message to be printed (with
// nothing else) and for the process to terminate.
class HdfsFatalException : public std::exception {
 public:
  // Keeps a private copy of `s`; that copy is what what() reports.
  explicit HdfsFatalException(const std::string& s) : what_(s) {}

  ~HdfsFatalException() noexcept override = default;

  // Returns the message given at construction. The pointer refers to storage
  // owned by this object, so it is only usable while the object is alive.
  const char* what() const noexcept override { return what_.c_str(); }

 private:
  const std::string what_;
};
38
39//
40// The HDFS environment for rocksdb. This class overrides all the
41// file/dir access methods and delegates the thread-mgmt methods to the
42// default posix environment.
43//
44class HdfsEnv : public Env {
45
46 public:
47 explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) {
48 posixEnv = Env::Default();
49 fileSys_ = connectToPath(fsname_);
50 }
51
52 virtual ~HdfsEnv() {
53 fprintf(stderr, "Destroying HdfsEnv::Default()\n");
54 hdfsDisconnect(fileSys_);
55 }
56
494da23a
TL
57 Status NewSequentialFile(const std::string& fname,
58 std::unique_ptr<SequentialFile>* result,
59 const EnvOptions& options) override;
7c673cae 60
494da23a
TL
61 Status NewRandomAccessFile(const std::string& fname,
62 std::unique_ptr<RandomAccessFile>* result,
63 const EnvOptions& options) override;
7c673cae 64
494da23a
TL
65 Status NewWritableFile(const std::string& fname,
66 std::unique_ptr<WritableFile>* result,
67 const EnvOptions& options) override;
7c673cae 68
494da23a
TL
69 Status NewDirectory(const std::string& name,
70 std::unique_ptr<Directory>* result) override;
7c673cae 71
494da23a 72 Status FileExists(const std::string& fname) override;
7c673cae 73
494da23a
TL
74 Status GetChildren(const std::string& path,
75 std::vector<std::string>* result) override;
7c673cae 76
494da23a 77 Status DeleteFile(const std::string& fname) override;
7c673cae 78
494da23a 79 Status CreateDir(const std::string& name) override;
7c673cae 80
494da23a 81 Status CreateDirIfMissing(const std::string& name) override;
7c673cae 82
494da23a 83 Status DeleteDir(const std::string& name) override;
7c673cae 84
494da23a 85 Status GetFileSize(const std::string& fname, uint64_t* size) override;
7c673cae 86
494da23a
TL
87 Status GetFileModificationTime(const std::string& fname,
88 uint64_t* file_mtime) override;
7c673cae 89
494da23a 90 Status RenameFile(const std::string& src, const std::string& target) override;
7c673cae 91
494da23a
TL
92 Status LinkFile(const std::string& /*src*/,
93 const std::string& /*target*/) override {
7c673cae
FG
94 return Status::NotSupported(); // not supported
95 }
96
494da23a 97 Status LockFile(const std::string& fname, FileLock** lock) override;
7c673cae 98
494da23a 99 Status UnlockFile(FileLock* lock) override;
7c673cae 100
494da23a
TL
101 Status NewLogger(const std::string& fname,
102 std::shared_ptr<Logger>* result) override;
7c673cae 103
20effc67
TL
104 Status IsDirectory(const std::string& path, bool* is_dir) override;
105
494da23a
TL
106 void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,
107 void* tag = nullptr,
108 void (*unschedFunction)(void* arg) = 0) override {
7c673cae
FG
109 posixEnv->Schedule(function, arg, pri, tag, unschedFunction);
110 }
111
494da23a 112 int UnSchedule(void* tag, Priority pri) override {
7c673cae
FG
113 return posixEnv->UnSchedule(tag, pri);
114 }
115
494da23a 116 void StartThread(void (*function)(void* arg), void* arg) override {
7c673cae
FG
117 posixEnv->StartThread(function, arg);
118 }
119
494da23a 120 void WaitForJoin() override { posixEnv->WaitForJoin(); }
7c673cae 121
494da23a 122 unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
7c673cae
FG
123 return posixEnv->GetThreadPoolQueueLen(pri);
124 }
125
494da23a 126 Status GetTestDirectory(std::string* path) override {
7c673cae
FG
127 return posixEnv->GetTestDirectory(path);
128 }
129
494da23a 130 uint64_t NowMicros() override { return posixEnv->NowMicros(); }
7c673cae 131
494da23a 132 void SleepForMicroseconds(int micros) override {
7c673cae
FG
133 posixEnv->SleepForMicroseconds(micros);
134 }
135
494da23a 136 Status GetHostName(char* name, uint64_t len) override {
7c673cae
FG
137 return posixEnv->GetHostName(name, len);
138 }
139
494da23a 140 Status GetCurrentTime(int64_t* unix_time) override {
7c673cae
FG
141 return posixEnv->GetCurrentTime(unix_time);
142 }
143
494da23a
TL
144 Status GetAbsolutePath(const std::string& db_path,
145 std::string* output_path) override {
7c673cae
FG
146 return posixEnv->GetAbsolutePath(db_path, output_path);
147 }
148
494da23a 149 void SetBackgroundThreads(int number, Priority pri = LOW) override {
7c673cae
FG
150 posixEnv->SetBackgroundThreads(number, pri);
151 }
152
494da23a 153 int GetBackgroundThreads(Priority pri = LOW) override {
11fdf7f2
TL
154 return posixEnv->GetBackgroundThreads(pri);
155 }
156
494da23a 157 void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
7c673cae
FG
158 posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
159 }
160
494da23a 161 std::string TimeToString(uint64_t number) override {
7c673cae
FG
162 return posixEnv->TimeToString(number);
163 }
164
20effc67 165 static uint64_t gettid() { return Env::Default()->GetThreadID(); }
7c673cae 166
494da23a 167 uint64_t GetThreadID() const override { return HdfsEnv::gettid(); }
7c673cae
FG
168
169 private:
170 std::string fsname_; // string of the form "hdfs://hostname:port/"
171 hdfsFS fileSys_; // a single FileSystem object for all files
172 Env* posixEnv; // This object is derived from Env, but not from
173 // posixEnv. We have posixEnv as an encapsulated
174 // object here so that we can use posix timers,
175 // posix threads, etc.
176
177 static const std::string kProto;
178 static const std::string pathsep;
179
180 /**
181 * If the URI is specified of the form hdfs://server:port/path,
182 * then connect to the specified cluster
183 * else connect to default.
184 */
185 hdfsFS connectToPath(const std::string& uri) {
186 if (uri.empty()) {
187 return nullptr;
188 }
189 if (uri.find(kProto) != 0) {
190 // uri doesn't start with hdfs:// -> use default:0, which is special
191 // to libhdfs.
192 return hdfsConnectNewInstance("default", 0);
193 }
194 const std::string hostport = uri.substr(kProto.length());
195
196 std::vector <std::string> parts;
197 split(hostport, ':', parts);
198 if (parts.size() != 2) {
199 throw HdfsFatalException("Bad uri for hdfs " + uri);
200 }
201 // parts[0] = hosts, parts[1] = port/xxx/yyy
202 std::string host(parts[0]);
203 std::string remaining(parts[1]);
204
494da23a 205 int rem = static_cast<int>(remaining.find(pathsep));
7c673cae
FG
206 std::string portStr = (rem == 0 ? remaining :
207 remaining.substr(0, rem));
208
20effc67 209 tPort port = static_cast<tPort>(atoi(portStr.c_str()));
7c673cae
FG
210 if (port == 0) {
211 throw HdfsFatalException("Bad host-port for hdfs " + uri);
212 }
213 hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);
214 return fs;
215 }
216
// Splits `s` at every occurrence of `delim` and stores the pieces in `elems`.
//
// `elems` is cleared first. Pieces appear in their original order and never
// contain `delim`; empty pieces are kept, so the result always holds one more
// element than there are delimiters in `s` (an empty `s` gives one empty
// element).
void split(const std::string &s, char delim,
           std::vector<std::string> &elems) {
  elems.clear();
  size_t prev = 0;
  size_t pos = s.find(delim);
  while (pos != std::string::npos) {
    // substr() takes a length, not an end index.
    elems.push_back(s.substr(prev, pos - prev));
    prev = pos + 1;
    pos = s.find(delim, prev);
  }
  // Whatever follows the last delimiter (possibly nothing).
  elems.push_back(s.substr(prev));
}
229};
230
f67539c2 231} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
232
233#else // USE_HDFS
234
f67539c2 235namespace ROCKSDB_NAMESPACE {
7c673cae 236
7c673cae
FG
237class HdfsEnv : public Env {
238
239 public:
11fdf7f2 240 explicit HdfsEnv(const std::string& /*fsname*/) {
7c673cae
FG
241 fprintf(stderr, "You have not build rocksdb with HDFS support\n");
242 fprintf(stderr, "Please see hdfs/README for details\n");
243 abort();
244 }
245
246 virtual ~HdfsEnv() {
247 }
248
249 virtual Status NewSequentialFile(const std::string& fname,
494da23a 250 std::unique_ptr<SequentialFile>* result,
7c673cae
FG
251 const EnvOptions& options) override;
252
494da23a
TL
253 virtual Status NewRandomAccessFile(
254 const std::string& /*fname*/,
255 std::unique_ptr<RandomAccessFile>* /*result*/,
256 const EnvOptions& /*options*/) override {
20effc67 257 return Status::NotSupported();
7c673cae
FG
258 }
259
11fdf7f2 260 virtual Status NewWritableFile(const std::string& /*fname*/,
494da23a 261 std::unique_ptr<WritableFile>* /*result*/,
11fdf7f2 262 const EnvOptions& /*options*/) override {
20effc67 263 return Status::NotSupported();
7c673cae
FG
264 }
265
11fdf7f2 266 virtual Status NewDirectory(const std::string& /*name*/,
494da23a 267 std::unique_ptr<Directory>* /*result*/) override {
20effc67 268 return Status::NotSupported();
7c673cae
FG
269 }
270
11fdf7f2 271 virtual Status FileExists(const std::string& /*fname*/) override {
20effc67 272 return Status::NotSupported();
7c673cae
FG
273 }
274
11fdf7f2
TL
275 virtual Status GetChildren(const std::string& /*path*/,
276 std::vector<std::string>* /*result*/) override {
20effc67 277 return Status::NotSupported();
7c673cae
FG
278 }
279
11fdf7f2 280 virtual Status DeleteFile(const std::string& /*fname*/) override {
20effc67 281 return Status::NotSupported();
7c673cae
FG
282 }
283
11fdf7f2 284 virtual Status CreateDir(const std::string& /*name*/) override {
20effc67 285 return Status::NotSupported();
11fdf7f2 286 }
7c673cae 287
11fdf7f2 288 virtual Status CreateDirIfMissing(const std::string& /*name*/) override {
20effc67 289 return Status::NotSupported();
7c673cae
FG
290 }
291
11fdf7f2 292 virtual Status DeleteDir(const std::string& /*name*/) override {
20effc67 293 return Status::NotSupported();
11fdf7f2 294 }
7c673cae 295
11fdf7f2
TL
296 virtual Status GetFileSize(const std::string& /*fname*/,
297 uint64_t* /*size*/) override {
20effc67 298 return Status::NotSupported();
7c673cae
FG
299 }
300
11fdf7f2
TL
301 virtual Status GetFileModificationTime(const std::string& /*fname*/,
302 uint64_t* /*time*/) override {
20effc67 303 return Status::NotSupported();
7c673cae
FG
304 }
305
11fdf7f2
TL
306 virtual Status RenameFile(const std::string& /*src*/,
307 const std::string& /*target*/) override {
20effc67 308 return Status::NotSupported();
7c673cae
FG
309 }
310
11fdf7f2
TL
311 virtual Status LinkFile(const std::string& /*src*/,
312 const std::string& /*target*/) override {
20effc67 313 return Status::NotSupported();
7c673cae
FG
314 }
315
11fdf7f2
TL
316 virtual Status LockFile(const std::string& /*fname*/,
317 FileLock** /*lock*/) override {
20effc67 318 return Status::NotSupported();
7c673cae
FG
319 }
320
20effc67
TL
321 virtual Status UnlockFile(FileLock* /*lock*/) override {
322 return Status::NotSupported();
323 }
7c673cae 324
11fdf7f2 325 virtual Status NewLogger(const std::string& /*fname*/,
494da23a 326 std::shared_ptr<Logger>* /*result*/) override {
20effc67
TL
327 return Status::NotSupported();
328 }
329
330 Status IsDirectory(const std::string& /*path*/, bool* /*is_dir*/) override {
331 return Status::NotSupported();
7c673cae
FG
332 }
333
11fdf7f2
TL
334 virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/,
335 Priority /*pri*/ = LOW, void* /*tag*/ = nullptr,
336 void (* /*unschedFunction*/)(void* arg) = 0) override {}
7c673cae 337
11fdf7f2 338 virtual int UnSchedule(void* /*tag*/, Priority /*pri*/) override { return 0; }
7c673cae 339
11fdf7f2
TL
340 virtual void StartThread(void (* /*function*/)(void* arg),
341 void* /*arg*/) override {}
7c673cae
FG
342
343 virtual void WaitForJoin() override {}
344
345 virtual unsigned int GetThreadPoolQueueLen(
11fdf7f2 346 Priority /*pri*/ = LOW) const override {
7c673cae
FG
347 return 0;
348 }
349
11fdf7f2 350 virtual Status GetTestDirectory(std::string* /*path*/) override {
20effc67 351 return Status::NotSupported();
11fdf7f2 352 }
7c673cae
FG
353
354 virtual uint64_t NowMicros() override { return 0; }
355
11fdf7f2 356 virtual void SleepForMicroseconds(int /*micros*/) override {}
7c673cae 357
11fdf7f2 358 virtual Status GetHostName(char* /*name*/, uint64_t /*len*/) override {
20effc67 359 return Status::NotSupported();
7c673cae
FG
360 }
361
11fdf7f2 362 virtual Status GetCurrentTime(int64_t* /*unix_time*/) override {
20effc67 363 return Status::NotSupported();
11fdf7f2 364 }
7c673cae 365
11fdf7f2
TL
366 virtual Status GetAbsolutePath(const std::string& /*db_path*/,
367 std::string* /*outputpath*/) override {
20effc67 368 return Status::NotSupported();
7c673cae
FG
369 }
370
11fdf7f2
TL
371 virtual void SetBackgroundThreads(int /*number*/,
372 Priority /*pri*/ = LOW) override {}
373 virtual int GetBackgroundThreads(Priority /*pri*/ = LOW) override {
374 return 0;
7c673cae 375 }
11fdf7f2
TL
376 virtual void IncBackgroundThreadsIfNeeded(int /*number*/,
377 Priority /*pri*/) override {}
378 virtual std::string TimeToString(uint64_t /*number*/) override { return ""; }
7c673cae
FG
379
380 virtual uint64_t GetThreadID() const override {
381 return 0;
382 }
383};
f67539c2 384} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
385
386#endif // USE_HDFS