]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | ||
7 | #pragma once | |
8 | #include <algorithm> | |
9 | #include <stdio.h> | |
10 | #include <time.h> | |
11 | #include <iostream> | |
12 | #include "port/sys_time.h" | |
13 | #include "rocksdb/env.h" | |
14 | #include "rocksdb/status.h" | |
15 | ||
16 | #ifdef USE_HDFS | |
17 | #include <hdfs.h> | |
18 | ||
f67539c2 | 19 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
20 | |
// Exception type raised at run time to signal a problem with the
// arguments that were supplied. It carries no payload of its own; it
// only adds a distinct type on top of std::exception.
class HdfsUsageException : public std::exception {
};
24 | ||
// A simple exception that indicates something went wrong that is not
// recoverable. The intention is for the message to be printed (with
// nothing else) and the process terminate.
//
// The message passed to the constructor is copied into the exception and
// returned verbatim by what().
class HdfsFatalException : public std::exception {
 public:
  // s: human-readable description of the fatal condition.
  explicit HdfsFatalException(const std::string& s) : what_(s) {}

  // `noexcept` replaces the deprecated dynamic exception specification
  // `throw()`; both promise that the function does not throw.
  ~HdfsFatalException() noexcept override {}

  // Returns the message supplied at construction. The pointer remains valid
  // for as long as this exception object is alive.
  const char* what() const noexcept override { return what_.c_str(); }

 private:
  const std::string what_;  // message returned by what()
};
38 | ||
39 | // | |
40 | // The HDFS environment for rocksdb. This class overrides all the | |
41 | // file/dir access methods and delegates the thread-mgmt methods to the | |
42 | // default posix environment. | |
43 | // | |
44 | class HdfsEnv : public Env { | |
45 | ||
46 | public: | |
47 | explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) { | |
48 | posixEnv = Env::Default(); | |
49 | fileSys_ = connectToPath(fsname_); | |
50 | } | |
51 | ||
52 | virtual ~HdfsEnv() { | |
53 | fprintf(stderr, "Destroying HdfsEnv::Default()\n"); | |
54 | hdfsDisconnect(fileSys_); | |
55 | } | |
56 | ||
494da23a TL |
57 | Status NewSequentialFile(const std::string& fname, |
58 | std::unique_ptr<SequentialFile>* result, | |
59 | const EnvOptions& options) override; | |
7c673cae | 60 | |
494da23a TL |
61 | Status NewRandomAccessFile(const std::string& fname, |
62 | std::unique_ptr<RandomAccessFile>* result, | |
63 | const EnvOptions& options) override; | |
7c673cae | 64 | |
494da23a TL |
65 | Status NewWritableFile(const std::string& fname, |
66 | std::unique_ptr<WritableFile>* result, | |
67 | const EnvOptions& options) override; | |
7c673cae | 68 | |
494da23a TL |
69 | Status NewDirectory(const std::string& name, |
70 | std::unique_ptr<Directory>* result) override; | |
7c673cae | 71 | |
494da23a | 72 | Status FileExists(const std::string& fname) override; |
7c673cae | 73 | |
494da23a TL |
74 | Status GetChildren(const std::string& path, |
75 | std::vector<std::string>* result) override; | |
7c673cae | 76 | |
494da23a | 77 | Status DeleteFile(const std::string& fname) override; |
7c673cae | 78 | |
494da23a | 79 | Status CreateDir(const std::string& name) override; |
7c673cae | 80 | |
494da23a | 81 | Status CreateDirIfMissing(const std::string& name) override; |
7c673cae | 82 | |
494da23a | 83 | Status DeleteDir(const std::string& name) override; |
7c673cae | 84 | |
494da23a | 85 | Status GetFileSize(const std::string& fname, uint64_t* size) override; |
7c673cae | 86 | |
494da23a TL |
87 | Status GetFileModificationTime(const std::string& fname, |
88 | uint64_t* file_mtime) override; | |
7c673cae | 89 | |
494da23a | 90 | Status RenameFile(const std::string& src, const std::string& target) override; |
7c673cae | 91 | |
494da23a TL |
92 | Status LinkFile(const std::string& /*src*/, |
93 | const std::string& /*target*/) override { | |
7c673cae FG |
94 | return Status::NotSupported(); // not supported |
95 | } | |
96 | ||
494da23a | 97 | Status LockFile(const std::string& fname, FileLock** lock) override; |
7c673cae | 98 | |
494da23a | 99 | Status UnlockFile(FileLock* lock) override; |
7c673cae | 100 | |
494da23a TL |
101 | Status NewLogger(const std::string& fname, |
102 | std::shared_ptr<Logger>* result) override; | |
7c673cae | 103 | |
20effc67 TL |
104 | Status IsDirectory(const std::string& path, bool* is_dir) override; |
105 | ||
494da23a TL |
106 | void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW, |
107 | void* tag = nullptr, | |
108 | void (*unschedFunction)(void* arg) = 0) override { | |
7c673cae FG |
109 | posixEnv->Schedule(function, arg, pri, tag, unschedFunction); |
110 | } | |
111 | ||
494da23a | 112 | int UnSchedule(void* tag, Priority pri) override { |
7c673cae FG |
113 | return posixEnv->UnSchedule(tag, pri); |
114 | } | |
115 | ||
494da23a | 116 | void StartThread(void (*function)(void* arg), void* arg) override { |
7c673cae FG |
117 | posixEnv->StartThread(function, arg); |
118 | } | |
119 | ||
494da23a | 120 | void WaitForJoin() override { posixEnv->WaitForJoin(); } |
7c673cae | 121 | |
494da23a | 122 | unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override { |
7c673cae FG |
123 | return posixEnv->GetThreadPoolQueueLen(pri); |
124 | } | |
125 | ||
494da23a | 126 | Status GetTestDirectory(std::string* path) override { |
7c673cae FG |
127 | return posixEnv->GetTestDirectory(path); |
128 | } | |
129 | ||
494da23a | 130 | uint64_t NowMicros() override { return posixEnv->NowMicros(); } |
7c673cae | 131 | |
494da23a | 132 | void SleepForMicroseconds(int micros) override { |
7c673cae FG |
133 | posixEnv->SleepForMicroseconds(micros); |
134 | } | |
135 | ||
494da23a | 136 | Status GetHostName(char* name, uint64_t len) override { |
7c673cae FG |
137 | return posixEnv->GetHostName(name, len); |
138 | } | |
139 | ||
494da23a | 140 | Status GetCurrentTime(int64_t* unix_time) override { |
7c673cae FG |
141 | return posixEnv->GetCurrentTime(unix_time); |
142 | } | |
143 | ||
494da23a TL |
144 | Status GetAbsolutePath(const std::string& db_path, |
145 | std::string* output_path) override { | |
7c673cae FG |
146 | return posixEnv->GetAbsolutePath(db_path, output_path); |
147 | } | |
148 | ||
494da23a | 149 | void SetBackgroundThreads(int number, Priority pri = LOW) override { |
7c673cae FG |
150 | posixEnv->SetBackgroundThreads(number, pri); |
151 | } | |
152 | ||
494da23a | 153 | int GetBackgroundThreads(Priority pri = LOW) override { |
11fdf7f2 TL |
154 | return posixEnv->GetBackgroundThreads(pri); |
155 | } | |
156 | ||
494da23a | 157 | void IncBackgroundThreadsIfNeeded(int number, Priority pri) override { |
7c673cae FG |
158 | posixEnv->IncBackgroundThreadsIfNeeded(number, pri); |
159 | } | |
160 | ||
494da23a | 161 | std::string TimeToString(uint64_t number) override { |
7c673cae FG |
162 | return posixEnv->TimeToString(number); |
163 | } | |
164 | ||
20effc67 | 165 | static uint64_t gettid() { return Env::Default()->GetThreadID(); } |
7c673cae | 166 | |
494da23a | 167 | uint64_t GetThreadID() const override { return HdfsEnv::gettid(); } |
7c673cae FG |
168 | |
169 | private: | |
170 | std::string fsname_; // string of the form "hdfs://hostname:port/" | |
171 | hdfsFS fileSys_; // a single FileSystem object for all files | |
172 | Env* posixEnv; // This object is derived from Env, but not from | |
173 | // posixEnv. We have posixnv as an encapsulated | |
174 | // object here so that we can use posix timers, | |
175 | // posix threads, etc. | |
176 | ||
177 | static const std::string kProto; | |
178 | static const std::string pathsep; | |
179 | ||
180 | /** | |
181 | * If the URI is specified of the form hdfs://server:port/path, | |
182 | * then connect to the specified cluster | |
183 | * else connect to default. | |
184 | */ | |
185 | hdfsFS connectToPath(const std::string& uri) { | |
186 | if (uri.empty()) { | |
187 | return nullptr; | |
188 | } | |
189 | if (uri.find(kProto) != 0) { | |
190 | // uri doesn't start with hdfs:// -> use default:0, which is special | |
191 | // to libhdfs. | |
192 | return hdfsConnectNewInstance("default", 0); | |
193 | } | |
194 | const std::string hostport = uri.substr(kProto.length()); | |
195 | ||
196 | std::vector <std::string> parts; | |
197 | split(hostport, ':', parts); | |
198 | if (parts.size() != 2) { | |
199 | throw HdfsFatalException("Bad uri for hdfs " + uri); | |
200 | } | |
201 | // parts[0] = hosts, parts[1] = port/xxx/yyy | |
202 | std::string host(parts[0]); | |
203 | std::string remaining(parts[1]); | |
204 | ||
494da23a | 205 | int rem = static_cast<int>(remaining.find(pathsep)); |
7c673cae FG |
206 | std::string portStr = (rem == 0 ? remaining : |
207 | remaining.substr(0, rem)); | |
208 | ||
20effc67 | 209 | tPort port = static_cast<tPort>(atoi(portStr.c_str())); |
7c673cae FG |
210 | if (port == 0) { |
211 | throw HdfsFatalException("Bad host-port for hdfs " + uri); | |
212 | } | |
213 | hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port); | |
214 | return fs; | |
215 | } | |
216 | ||
217 | void split(const std::string &s, char delim, | |
218 | std::vector<std::string> &elems) { | |
219 | elems.clear(); | |
220 | size_t prev = 0; | |
221 | size_t pos = s.find(delim); | |
222 | while (pos != std::string::npos) { | |
223 | elems.push_back(s.substr(prev, pos)); | |
224 | prev = pos + 1; | |
225 | pos = s.find(delim, prev); | |
226 | } | |
227 | elems.push_back(s.substr(prev, s.size())); | |
228 | } | |
229 | }; | |
230 | ||
f67539c2 | 231 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
232 | |
233 | #else // USE_HDFS | |
234 | ||
f67539c2 | 235 | namespace ROCKSDB_NAMESPACE { |
7c673cae | 236 | |
7c673cae FG |
237 | class HdfsEnv : public Env { |
238 | ||
239 | public: | |
11fdf7f2 | 240 | explicit HdfsEnv(const std::string& /*fsname*/) { |
7c673cae FG |
241 | fprintf(stderr, "You have not build rocksdb with HDFS support\n"); |
242 | fprintf(stderr, "Please see hdfs/README for details\n"); | |
243 | abort(); | |
244 | } | |
245 | ||
246 | virtual ~HdfsEnv() { | |
247 | } | |
248 | ||
249 | virtual Status NewSequentialFile(const std::string& fname, | |
494da23a | 250 | std::unique_ptr<SequentialFile>* result, |
7c673cae FG |
251 | const EnvOptions& options) override; |
252 | ||
494da23a TL |
253 | virtual Status NewRandomAccessFile( |
254 | const std::string& /*fname*/, | |
255 | std::unique_ptr<RandomAccessFile>* /*result*/, | |
256 | const EnvOptions& /*options*/) override { | |
20effc67 | 257 | return Status::NotSupported(); |
7c673cae FG |
258 | } |
259 | ||
11fdf7f2 | 260 | virtual Status NewWritableFile(const std::string& /*fname*/, |
494da23a | 261 | std::unique_ptr<WritableFile>* /*result*/, |
11fdf7f2 | 262 | const EnvOptions& /*options*/) override { |
20effc67 | 263 | return Status::NotSupported(); |
7c673cae FG |
264 | } |
265 | ||
11fdf7f2 | 266 | virtual Status NewDirectory(const std::string& /*name*/, |
494da23a | 267 | std::unique_ptr<Directory>* /*result*/) override { |
20effc67 | 268 | return Status::NotSupported(); |
7c673cae FG |
269 | } |
270 | ||
11fdf7f2 | 271 | virtual Status FileExists(const std::string& /*fname*/) override { |
20effc67 | 272 | return Status::NotSupported(); |
7c673cae FG |
273 | } |
274 | ||
11fdf7f2 TL |
275 | virtual Status GetChildren(const std::string& /*path*/, |
276 | std::vector<std::string>* /*result*/) override { | |
20effc67 | 277 | return Status::NotSupported(); |
7c673cae FG |
278 | } |
279 | ||
11fdf7f2 | 280 | virtual Status DeleteFile(const std::string& /*fname*/) override { |
20effc67 | 281 | return Status::NotSupported(); |
7c673cae FG |
282 | } |
283 | ||
11fdf7f2 | 284 | virtual Status CreateDir(const std::string& /*name*/) override { |
20effc67 | 285 | return Status::NotSupported(); |
11fdf7f2 | 286 | } |
7c673cae | 287 | |
11fdf7f2 | 288 | virtual Status CreateDirIfMissing(const std::string& /*name*/) override { |
20effc67 | 289 | return Status::NotSupported(); |
7c673cae FG |
290 | } |
291 | ||
11fdf7f2 | 292 | virtual Status DeleteDir(const std::string& /*name*/) override { |
20effc67 | 293 | return Status::NotSupported(); |
11fdf7f2 | 294 | } |
7c673cae | 295 | |
11fdf7f2 TL |
296 | virtual Status GetFileSize(const std::string& /*fname*/, |
297 | uint64_t* /*size*/) override { | |
20effc67 | 298 | return Status::NotSupported(); |
7c673cae FG |
299 | } |
300 | ||
11fdf7f2 TL |
301 | virtual Status GetFileModificationTime(const std::string& /*fname*/, |
302 | uint64_t* /*time*/) override { | |
20effc67 | 303 | return Status::NotSupported(); |
7c673cae FG |
304 | } |
305 | ||
11fdf7f2 TL |
306 | virtual Status RenameFile(const std::string& /*src*/, |
307 | const std::string& /*target*/) override { | |
20effc67 | 308 | return Status::NotSupported(); |
7c673cae FG |
309 | } |
310 | ||
11fdf7f2 TL |
311 | virtual Status LinkFile(const std::string& /*src*/, |
312 | const std::string& /*target*/) override { | |
20effc67 | 313 | return Status::NotSupported(); |
7c673cae FG |
314 | } |
315 | ||
11fdf7f2 TL |
316 | virtual Status LockFile(const std::string& /*fname*/, |
317 | FileLock** /*lock*/) override { | |
20effc67 | 318 | return Status::NotSupported(); |
7c673cae FG |
319 | } |
320 | ||
20effc67 TL |
321 | virtual Status UnlockFile(FileLock* /*lock*/) override { |
322 | return Status::NotSupported(); | |
323 | } | |
7c673cae | 324 | |
11fdf7f2 | 325 | virtual Status NewLogger(const std::string& /*fname*/, |
494da23a | 326 | std::shared_ptr<Logger>* /*result*/) override { |
20effc67 TL |
327 | return Status::NotSupported(); |
328 | } | |
329 | ||
330 | Status IsDirectory(const std::string& /*path*/, bool* /*is_dir*/) override { | |
331 | return Status::NotSupported(); | |
7c673cae FG |
332 | } |
333 | ||
11fdf7f2 TL |
334 | virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/, |
335 | Priority /*pri*/ = LOW, void* /*tag*/ = nullptr, | |
336 | void (* /*unschedFunction*/)(void* arg) = 0) override {} | |
7c673cae | 337 | |
11fdf7f2 | 338 | virtual int UnSchedule(void* /*tag*/, Priority /*pri*/) override { return 0; } |
7c673cae | 339 | |
11fdf7f2 TL |
340 | virtual void StartThread(void (* /*function*/)(void* arg), |
341 | void* /*arg*/) override {} | |
7c673cae FG |
342 | |
343 | virtual void WaitForJoin() override {} | |
344 | ||
345 | virtual unsigned int GetThreadPoolQueueLen( | |
11fdf7f2 | 346 | Priority /*pri*/ = LOW) const override { |
7c673cae FG |
347 | return 0; |
348 | } | |
349 | ||
11fdf7f2 | 350 | virtual Status GetTestDirectory(std::string* /*path*/) override { |
20effc67 | 351 | return Status::NotSupported(); |
11fdf7f2 | 352 | } |
7c673cae FG |
353 | |
354 | virtual uint64_t NowMicros() override { return 0; } | |
355 | ||
11fdf7f2 | 356 | virtual void SleepForMicroseconds(int /*micros*/) override {} |
7c673cae | 357 | |
11fdf7f2 | 358 | virtual Status GetHostName(char* /*name*/, uint64_t /*len*/) override { |
20effc67 | 359 | return Status::NotSupported(); |
7c673cae FG |
360 | } |
361 | ||
11fdf7f2 | 362 | virtual Status GetCurrentTime(int64_t* /*unix_time*/) override { |
20effc67 | 363 | return Status::NotSupported(); |
11fdf7f2 | 364 | } |
7c673cae | 365 | |
11fdf7f2 TL |
366 | virtual Status GetAbsolutePath(const std::string& /*db_path*/, |
367 | std::string* /*outputpath*/) override { | |
20effc67 | 368 | return Status::NotSupported(); |
7c673cae FG |
369 | } |
370 | ||
11fdf7f2 TL |
371 | virtual void SetBackgroundThreads(int /*number*/, |
372 | Priority /*pri*/ = LOW) override {} | |
373 | virtual int GetBackgroundThreads(Priority /*pri*/ = LOW) override { | |
374 | return 0; | |
7c673cae | 375 | } |
11fdf7f2 TL |
376 | virtual void IncBackgroundThreadsIfNeeded(int /*number*/, |
377 | Priority /*pri*/) override {} | |
378 | virtual std::string TimeToString(uint64_t /*number*/) override { return ""; } | |
7c673cae FG |
379 | |
380 | virtual uint64_t GetThreadID() const override { | |
381 | return 0; | |
382 | } | |
383 | }; | |
f67539c2 | 384 | } // namespace ROCKSDB_NAMESPACE |
7c673cae FG |
385 | |
386 | #endif // USE_HDFS |