]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/hdfs/env_hdfs.h
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / hdfs / env_hdfs.h
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6
7 #pragma once
8 #include <algorithm>
9 #include <stdio.h>
10 #include <time.h>
11 #include <iostream>
12 #include "port/sys_time.h"
13 #include "rocksdb/env.h"
14 #include "rocksdb/status.h"
15
16 #ifdef USE_HDFS
17 #include <hdfs.h>
18
19 namespace rocksdb {
20
// Exception type used to signal, at run time, that the arguments supplied
// by the caller are not acceptable. It adds nothing to std::exception; the
// distinct type is what lets handlers tell it apart.
class HdfsUsageException : public std::exception {
};
24
// A simple exception that indicates something went wrong that is not
// recoverable. The intention is for the message to be printed (with
// nothing else) and the process to terminate.
class HdfsFatalException : public std::exception {
 public:
  // Stores a private copy of `s`; it is the text later returned by what().
  explicit HdfsFatalException(const std::string& s) : what_(s) {}

  // `noexcept` replaces the deprecated dynamic exception specification
  // `throw()`; `override` lets the compiler check the match against
  // std::exception.
  ~HdfsFatalException() noexcept override {}

  // Returns the message given at construction. The pointer refers to
  // storage owned by this object.
  const char* what() const noexcept override { return what_.c_str(); }

 private:
  const std::string what_;  // message text, fixed at construction
};
38
39 //
40 // The HDFS environment for rocksdb. This class overrides all the
41 // file/dir access methods and delegates the thread-mgmt methods to the
42 // default posix environment.
43 //
44 class HdfsEnv : public Env {
45
46 public:
47 explicit HdfsEnv(const std::string& fsname) : fsname_(fsname) {
48 posixEnv = Env::Default();
49 fileSys_ = connectToPath(fsname_);
50 }
51
52 virtual ~HdfsEnv() {
53 fprintf(stderr, "Destroying HdfsEnv::Default()\n");
54 hdfsDisconnect(fileSys_);
55 }
56
57 Status NewSequentialFile(const std::string& fname,
58 std::unique_ptr<SequentialFile>* result,
59 const EnvOptions& options) override;
60
61 Status NewRandomAccessFile(const std::string& fname,
62 std::unique_ptr<RandomAccessFile>* result,
63 const EnvOptions& options) override;
64
65 Status NewWritableFile(const std::string& fname,
66 std::unique_ptr<WritableFile>* result,
67 const EnvOptions& options) override;
68
69 Status NewDirectory(const std::string& name,
70 std::unique_ptr<Directory>* result) override;
71
72 Status FileExists(const std::string& fname) override;
73
74 Status GetChildren(const std::string& path,
75 std::vector<std::string>* result) override;
76
77 Status DeleteFile(const std::string& fname) override;
78
79 Status CreateDir(const std::string& name) override;
80
81 Status CreateDirIfMissing(const std::string& name) override;
82
83 Status DeleteDir(const std::string& name) override;
84
85 Status GetFileSize(const std::string& fname, uint64_t* size) override;
86
87 Status GetFileModificationTime(const std::string& fname,
88 uint64_t* file_mtime) override;
89
90 Status RenameFile(const std::string& src, const std::string& target) override;
91
92 Status LinkFile(const std::string& /*src*/,
93 const std::string& /*target*/) override {
94 return Status::NotSupported(); // not supported
95 }
96
97 Status LockFile(const std::string& fname, FileLock** lock) override;
98
99 Status UnlockFile(FileLock* lock) override;
100
101 Status NewLogger(const std::string& fname,
102 std::shared_ptr<Logger>* result) override;
103
104 void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,
105 void* tag = nullptr,
106 void (*unschedFunction)(void* arg) = 0) override {
107 posixEnv->Schedule(function, arg, pri, tag, unschedFunction);
108 }
109
110 int UnSchedule(void* tag, Priority pri) override {
111 return posixEnv->UnSchedule(tag, pri);
112 }
113
114 void StartThread(void (*function)(void* arg), void* arg) override {
115 posixEnv->StartThread(function, arg);
116 }
117
118 void WaitForJoin() override { posixEnv->WaitForJoin(); }
119
120 unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
121 return posixEnv->GetThreadPoolQueueLen(pri);
122 }
123
124 Status GetTestDirectory(std::string* path) override {
125 return posixEnv->GetTestDirectory(path);
126 }
127
128 uint64_t NowMicros() override { return posixEnv->NowMicros(); }
129
130 void SleepForMicroseconds(int micros) override {
131 posixEnv->SleepForMicroseconds(micros);
132 }
133
134 Status GetHostName(char* name, uint64_t len) override {
135 return posixEnv->GetHostName(name, len);
136 }
137
138 Status GetCurrentTime(int64_t* unix_time) override {
139 return posixEnv->GetCurrentTime(unix_time);
140 }
141
142 Status GetAbsolutePath(const std::string& db_path,
143 std::string* output_path) override {
144 return posixEnv->GetAbsolutePath(db_path, output_path);
145 }
146
147 void SetBackgroundThreads(int number, Priority pri = LOW) override {
148 posixEnv->SetBackgroundThreads(number, pri);
149 }
150
151 int GetBackgroundThreads(Priority pri = LOW) override {
152 return posixEnv->GetBackgroundThreads(pri);
153 }
154
155 void IncBackgroundThreadsIfNeeded(int number, Priority pri) override {
156 posixEnv->IncBackgroundThreadsIfNeeded(number, pri);
157 }
158
159 std::string TimeToString(uint64_t number) override {
160 return posixEnv->TimeToString(number);
161 }
162
163 static uint64_t gettid() {
164 assert(sizeof(pthread_t) <= sizeof(uint64_t));
165 return (uint64_t)pthread_self();
166 }
167
168 uint64_t GetThreadID() const override { return HdfsEnv::gettid(); }
169
170 private:
171 std::string fsname_; // string of the form "hdfs://hostname:port/"
172 hdfsFS fileSys_; // a single FileSystem object for all files
173 Env* posixEnv; // This object is derived from Env, but not from
174 // posixEnv. We have posixnv as an encapsulated
175 // object here so that we can use posix timers,
176 // posix threads, etc.
177
178 static const std::string kProto;
179 static const std::string pathsep;
180
181 /**
182 * If the URI is specified of the form hdfs://server:port/path,
183 * then connect to the specified cluster
184 * else connect to default.
185 */
186 hdfsFS connectToPath(const std::string& uri) {
187 if (uri.empty()) {
188 return nullptr;
189 }
190 if (uri.find(kProto) != 0) {
191 // uri doesn't start with hdfs:// -> use default:0, which is special
192 // to libhdfs.
193 return hdfsConnectNewInstance("default", 0);
194 }
195 const std::string hostport = uri.substr(kProto.length());
196
197 std::vector <std::string> parts;
198 split(hostport, ':', parts);
199 if (parts.size() != 2) {
200 throw HdfsFatalException("Bad uri for hdfs " + uri);
201 }
202 // parts[0] = hosts, parts[1] = port/xxx/yyy
203 std::string host(parts[0]);
204 std::string remaining(parts[1]);
205
206 int rem = static_cast<int>(remaining.find(pathsep));
207 std::string portStr = (rem == 0 ? remaining :
208 remaining.substr(0, rem));
209
210 tPort port;
211 port = atoi(portStr.c_str());
212 if (port == 0) {
213 throw HdfsFatalException("Bad host-port for hdfs " + uri);
214 }
215 hdfsFS fs = hdfsConnectNewInstance(host.c_str(), port);
216 return fs;
217 }
218
219 void split(const std::string &s, char delim,
220 std::vector<std::string> &elems) {
221 elems.clear();
222 size_t prev = 0;
223 size_t pos = s.find(delim);
224 while (pos != std::string::npos) {
225 elems.push_back(s.substr(prev, pos));
226 prev = pos + 1;
227 pos = s.find(delim, prev);
228 }
229 elems.push_back(s.substr(prev, s.size()));
230 }
231 };
232
233 } // namespace rocksdb
234
235 #else // USE_HDFS
236
237
238 namespace rocksdb {
239
240 static const Status notsup;
241
242 class HdfsEnv : public Env {
243
244 public:
245 explicit HdfsEnv(const std::string& /*fsname*/) {
246 fprintf(stderr, "You have not build rocksdb with HDFS support\n");
247 fprintf(stderr, "Please see hdfs/README for details\n");
248 abort();
249 }
250
251 virtual ~HdfsEnv() {
252 }
253
254 virtual Status NewSequentialFile(const std::string& fname,
255 std::unique_ptr<SequentialFile>* result,
256 const EnvOptions& options) override;
257
258 virtual Status NewRandomAccessFile(
259 const std::string& /*fname*/,
260 std::unique_ptr<RandomAccessFile>* /*result*/,
261 const EnvOptions& /*options*/) override {
262 return notsup;
263 }
264
265 virtual Status NewWritableFile(const std::string& /*fname*/,
266 std::unique_ptr<WritableFile>* /*result*/,
267 const EnvOptions& /*options*/) override {
268 return notsup;
269 }
270
271 virtual Status NewDirectory(const std::string& /*name*/,
272 std::unique_ptr<Directory>* /*result*/) override {
273 return notsup;
274 }
275
276 virtual Status FileExists(const std::string& /*fname*/) override {
277 return notsup;
278 }
279
280 virtual Status GetChildren(const std::string& /*path*/,
281 std::vector<std::string>* /*result*/) override {
282 return notsup;
283 }
284
285 virtual Status DeleteFile(const std::string& /*fname*/) override {
286 return notsup;
287 }
288
289 virtual Status CreateDir(const std::string& /*name*/) override {
290 return notsup;
291 }
292
293 virtual Status CreateDirIfMissing(const std::string& /*name*/) override {
294 return notsup;
295 }
296
297 virtual Status DeleteDir(const std::string& /*name*/) override {
298 return notsup;
299 }
300
301 virtual Status GetFileSize(const std::string& /*fname*/,
302 uint64_t* /*size*/) override {
303 return notsup;
304 }
305
306 virtual Status GetFileModificationTime(const std::string& /*fname*/,
307 uint64_t* /*time*/) override {
308 return notsup;
309 }
310
311 virtual Status RenameFile(const std::string& /*src*/,
312 const std::string& /*target*/) override {
313 return notsup;
314 }
315
316 virtual Status LinkFile(const std::string& /*src*/,
317 const std::string& /*target*/) override {
318 return notsup;
319 }
320
321 virtual Status LockFile(const std::string& /*fname*/,
322 FileLock** /*lock*/) override {
323 return notsup;
324 }
325
326 virtual Status UnlockFile(FileLock* /*lock*/) override { return notsup; }
327
328 virtual Status NewLogger(const std::string& /*fname*/,
329 std::shared_ptr<Logger>* /*result*/) override {
330 return notsup;
331 }
332
333 virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/,
334 Priority /*pri*/ = LOW, void* /*tag*/ = nullptr,
335 void (* /*unschedFunction*/)(void* arg) = 0) override {}
336
337 virtual int UnSchedule(void* /*tag*/, Priority /*pri*/) override { return 0; }
338
339 virtual void StartThread(void (* /*function*/)(void* arg),
340 void* /*arg*/) override {}
341
342 virtual void WaitForJoin() override {}
343
344 virtual unsigned int GetThreadPoolQueueLen(
345 Priority /*pri*/ = LOW) const override {
346 return 0;
347 }
348
349 virtual Status GetTestDirectory(std::string* /*path*/) override {
350 return notsup;
351 }
352
353 virtual uint64_t NowMicros() override { return 0; }
354
355 virtual void SleepForMicroseconds(int /*micros*/) override {}
356
357 virtual Status GetHostName(char* /*name*/, uint64_t /*len*/) override {
358 return notsup;
359 }
360
361 virtual Status GetCurrentTime(int64_t* /*unix_time*/) override {
362 return notsup;
363 }
364
365 virtual Status GetAbsolutePath(const std::string& /*db_path*/,
366 std::string* /*outputpath*/) override {
367 return notsup;
368 }
369
370 virtual void SetBackgroundThreads(int /*number*/,
371 Priority /*pri*/ = LOW) override {}
372 virtual int GetBackgroundThreads(Priority /*pri*/ = LOW) override {
373 return 0;
374 }
375 virtual void IncBackgroundThreadsIfNeeded(int /*number*/,
376 Priority /*pri*/) override {}
377 virtual std::string TimeToString(uint64_t /*number*/) override { return ""; }
378
379 virtual uint64_t GetThreadID() const override {
380 return 0;
381 }
382 };
383 }
384
385 #endif // USE_HDFS