]>
git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/hdfs/env_hdfs.h
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
12 #include "port/sys_time.h"
13 #include "rocksdb/env.h"
14 #include "rocksdb/status.h"
21 // Thrown during execution when there is an issue with the supplied
// arguments (NOTE(review): the original sentence is cut off after
// "supplied"; "arguments" is presumed -- confirm against upstream).
// The class body is empty: it adds nothing to std::exception and serves
// only as a distinct exception type.
23 class HdfsUsageException
: public std::exception
{ };
25 // A simple exception that indicates something went wrong that is not
26 // recoverable. The intention is for the message to be printed (with
27 // nothing else) and the process to terminate.
28 class HdfsFatalException
: public std::exception
{
30 explicit HdfsFatalException(const std::string
& s
) : what_(s
) { }
31 virtual ~HdfsFatalException() throw() { }
32 virtual const char* what() const throw() {
36 const std::string what_
;
40 // The HDFS environment for rocksdb. This class overrides all the
41 // file/dir access methods and delegates the thread-mgmt methods to the
42 // default posix environment.
44 class HdfsEnv
: public Env
{
47 explicit HdfsEnv(const std::string
& fsname
) : fsname_(fsname
) {
48 posixEnv
= Env::Default();
49 fileSys_
= connectToPath(fsname_
);
53 fprintf(stderr
, "Destroying HdfsEnv::Default()\n");
54 hdfsDisconnect(fileSys_
);
// Override of Env::NewSequentialFile. Takes the file name, an out-parameter
// (unique_ptr<SequentialFile>*) and the EnvOptions; reports the outcome as a
// Status. Declaration only -- the definition is not part of this chunk.
57 Status
NewSequentialFile(const std::string
& fname
,
58 std::unique_ptr
<SequentialFile
>* result
,
59 const EnvOptions
& options
) override
;
// Override of Env::NewRandomAccessFile. Takes the file name, an out-parameter
// (unique_ptr<RandomAccessFile>*) and the EnvOptions; reports the outcome as
// a Status. Declaration only -- the definition is not part of this chunk.
61 Status
NewRandomAccessFile(const std::string
& fname
,
62 std::unique_ptr
<RandomAccessFile
>* result
,
63 const EnvOptions
& options
) override
;
// Override of Env::NewWritableFile. Takes the file name, an out-parameter
// (unique_ptr<WritableFile>*) and the EnvOptions; reports the outcome as a
// Status. Declaration only -- the definition is not part of this chunk.
65 Status
NewWritableFile(const std::string
& fname
,
66 std::unique_ptr
<WritableFile
>* result
,
67 const EnvOptions
& options
) override
;
// Override of Env::NewDirectory. Takes a directory name and an out-parameter
// (unique_ptr<Directory>*); reports the outcome as a Status.
// Declaration only -- the definition is not part of this chunk.
69 Status
NewDirectory(const std::string
& name
,
70 std::unique_ptr
<Directory
>* result
) override
;
// Override of Env::FileExists. Takes a file name and answers with a Status
// rather than a bool. Declaration only -- defined outside this chunk.
72 Status
FileExists(const std::string
& fname
) override
;
// Override of Env::GetChildren. Takes a path and an out-parameter vector of
// strings; reports the outcome as a Status.
// Declaration only -- the definition is not part of this chunk.
74 Status
GetChildren(const std::string
& path
,
75 std::vector
<std::string
>* result
) override
;
// Override of Env::DeleteFile. Takes a file name and returns a Status.
// Declaration only -- the definition is not part of this chunk.
77 Status
DeleteFile(const std::string
& fname
) override
;
// Override of Env::CreateDir. Takes a directory name and returns a Status.
// Declaration only -- the definition is not part of this chunk.
79 Status
CreateDir(const std::string
& name
) override
;
// Override of Env::CreateDirIfMissing. Takes a directory name and returns a
// Status. Declaration only -- the definition is not part of this chunk.
81 Status
CreateDirIfMissing(const std::string
& name
) override
;
// Override of Env::DeleteDir. Takes a directory name and returns a Status.
// Declaration only -- the definition is not part of this chunk.
83 Status
DeleteDir(const std::string
& name
) override
;
// Override of Env::GetFileSize. Takes a file name and an out-parameter
// (uint64_t* size); reports the outcome as a Status.
// Declaration only -- the definition is not part of this chunk.
85 Status
GetFileSize(const std::string
& fname
, uint64_t* size
) override
;
// Override of Env::GetFileModificationTime. Takes a file name and an
// out-parameter (uint64_t* file_mtime); reports the outcome as a Status.
// Declaration only -- the definition is not part of this chunk.
87 Status
GetFileModificationTime(const std::string
& fname
,
88 uint64_t* file_mtime
) override
;
// Override of Env::RenameFile. Takes a source name and a target name and
// returns a Status. Declaration only -- defined outside this chunk.
90 Status
RenameFile(const std::string
& src
, const std::string
& target
) override
;
92 Status
LinkFile(const std::string
& /*src*/,
93 const std::string
& /*target*/) override
{
94 return Status::NotSupported(); // not supported
// Override of Env::LockFile. Takes a file name and an out-parameter
// (FileLock** lock); reports the outcome as a Status.
// Declaration only -- the definition is not part of this chunk.
97 Status
LockFile(const std::string
& fname
, FileLock
** lock
) override
;
// Override of Env::UnlockFile. Takes a FileLock* and returns a Status.
// Declaration only -- the definition is not part of this chunk.
99 Status
UnlockFile(FileLock
* lock
) override
;
// Override of Env::NewLogger. Takes a file name and an out-parameter
// (shared_ptr<Logger>*); reports the outcome as a Status.
// Declaration only -- the definition is not part of this chunk.
101 Status
NewLogger(const std::string
& fname
,
102 std::shared_ptr
<Logger
>* result
) override
;
104 void Schedule(void (*function
)(void* arg
), void* arg
, Priority pri
= LOW
,
106 void (*unschedFunction
)(void* arg
) = 0) override
{
107 posixEnv
->Schedule(function
, arg
, pri
, tag
, unschedFunction
);
110 int UnSchedule(void* tag
, Priority pri
) override
{
111 return posixEnv
->UnSchedule(tag
, pri
);
114 void StartThread(void (*function
)(void* arg
), void* arg
) override
{
115 posixEnv
->StartThread(function
, arg
);
// Thread management is not implemented by this class itself: the call is
// forwarded unchanged to the encapsulated posixEnv.
118 void WaitForJoin() override
{ posixEnv
->WaitForJoin(); }
120 unsigned int GetThreadPoolQueueLen(Priority pri
= LOW
) const override
{
121 return posixEnv
->GetThreadPoolQueueLen(pri
);
124 Status
GetTestDirectory(std::string
* path
) override
{
125 return posixEnv
->GetTestDirectory(path
);
// Returns whatever posixEnv->NowMicros() returns; the clock comes from the
// encapsulated posix environment, not from HDFS.
128 uint64_t NowMicros() override
{ return posixEnv
->NowMicros(); }
130 void SleepForMicroseconds(int micros
) override
{
131 posixEnv
->SleepForMicroseconds(micros
);
134 Status
GetHostName(char* name
, uint64_t len
) override
{
135 return posixEnv
->GetHostName(name
, len
);
138 Status
GetCurrentTime(int64_t* unix_time
) override
{
139 return posixEnv
->GetCurrentTime(unix_time
);
142 Status
GetAbsolutePath(const std::string
& db_path
,
143 std::string
* output_path
) override
{
144 return posixEnv
->GetAbsolutePath(db_path
, output_path
);
147 void SetBackgroundThreads(int number
, Priority pri
= LOW
) override
{
148 posixEnv
->SetBackgroundThreads(number
, pri
);
151 int GetBackgroundThreads(Priority pri
= LOW
) override
{
152 return posixEnv
->GetBackgroundThreads(pri
);
155 void IncBackgroundThreadsIfNeeded(int number
, Priority pri
) override
{
156 posixEnv
->IncBackgroundThreadsIfNeeded(number
, pri
);
159 std::string
TimeToString(uint64_t number
) override
{
160 return posixEnv
->TimeToString(number
);
163 static uint64_t gettid() {
164 assert(sizeof(pthread_t
) <= sizeof(uint64_t));
165 return (uint64_t)pthread_self();
// Returns the id of the calling thread by delegating to the static helper
// HdfsEnv::gettid(), which casts pthread_self() to uint64_t.
168 uint64_t GetThreadID() const override
{ return HdfsEnv::gettid(); }
171 std::string fsname_
; // string of the form "hdfs://hostname:port/"
172 hdfsFS fileSys_
; // a single FileSystem object for all files
173 Env
* posixEnv
; // This object is derived from Env, but not from
174 // posixEnv. We have posixEnv as an encapsulated
175 // object here so that we can use posix timers,
176 // posix threads, etc.
178 static const std::string kProto
;
179 static const std::string pathsep
;
182 * If the URI is of the form hdfs://server:port/path,
183 * then connect to the specified cluster
184 * else connect to default.
186 hdfsFS
connectToPath(const std::string
& uri
) {
190 if (uri
.find(kProto
) != 0) {
191 // uri doesn't start with hdfs:// -> use default:0, which is special
193 return hdfsConnectNewInstance("default", 0);
195 const std::string hostport
= uri
.substr(kProto
.length());
197 std::vector
<std::string
> parts
;
198 split(hostport
, ':', parts
);
199 if (parts
.size() != 2) {
200 throw HdfsFatalException("Bad uri for hdfs " + uri
);
202 // parts[0] = host, parts[1] = port/xxx/yyy
203 std::string
host(parts
[0]);
204 std::string
remaining(parts
[1]);
206 int rem
= static_cast<int>(remaining
.find(pathsep
));
207 std::string portStr
= (rem
== 0 ? remaining
:
208 remaining
.substr(0, rem
));
211 port
= atoi(portStr
.c_str());
213 throw HdfsFatalException("Bad host-port for hdfs " + uri
);
215 hdfsFS fs
= hdfsConnectNewInstance(host
.c_str(), port
);
219 void split(const std::string
&s
, char delim
,
220 std::vector
<std::string
> &elems
) {
223 size_t pos
= s
.find(delim
);
224 while (pos
!= std::string::npos
) {
225 elems
.push_back(s
.substr(prev
, pos
));
227 pos
= s
.find(delim
, prev
);
229 elems
.push_back(s
.substr(prev
, s
.size()));
233 } // namespace rocksdb
240 static const Status notsup
;
242 class HdfsEnv
: public Env
{
245 explicit HdfsEnv(const std::string
& /*fsname*/) {
246 fprintf(stderr
, "You have not build rocksdb with HDFS support\n");
247 fprintf(stderr
, "Please see hdfs/README for details\n");
// Member of the fallback HdfsEnv (the variant whose constructor reports that
// rocksdb was built without HDFS support). Unlike the neighbouring stubs,
// this override is only declared here; no inline body is given.
254 virtual Status
NewSequentialFile(const std::string
& fname
,
255 std::unique_ptr
<SequentialFile
>* result
,
256 const EnvOptions
& options
) override
;
258 virtual Status
NewRandomAccessFile(
259 const std::string
& /*fname*/,
260 std::unique_ptr
<RandomAccessFile
>* /*result*/,
261 const EnvOptions
& /*options*/) override
{
265 virtual Status
NewWritableFile(const std::string
& /*fname*/,
266 std::unique_ptr
<WritableFile
>* /*result*/,
267 const EnvOptions
& /*options*/) override
{
271 virtual Status
NewDirectory(const std::string
& /*name*/,
272 std::unique_ptr
<Directory
>* /*result*/) override
{
276 virtual Status
FileExists(const std::string
& /*fname*/) override
{
280 virtual Status
GetChildren(const std::string
& /*path*/,
281 std::vector
<std::string
>* /*result*/) override
{
285 virtual Status
DeleteFile(const std::string
& /*fname*/) override
{
289 virtual Status
CreateDir(const std::string
& /*name*/) override
{
293 virtual Status
CreateDirIfMissing(const std::string
& /*name*/) override
{
297 virtual Status
DeleteDir(const std::string
& /*name*/) override
{
301 virtual Status
GetFileSize(const std::string
& /*fname*/,
302 uint64_t* /*size*/) override
{
306 virtual Status
GetFileModificationTime(const std::string
& /*fname*/,
307 uint64_t* /*time*/) override
{
311 virtual Status
RenameFile(const std::string
& /*src*/,
312 const std::string
& /*target*/) override
{
316 virtual Status
LinkFile(const std::string
& /*src*/,
317 const std::string
& /*target*/) override
{
321 virtual Status
LockFile(const std::string
& /*fname*/,
322 FileLock
** /*lock*/) override
{
// Fallback stub: ignores its argument and returns the shared static
// `notsup` Status.
326 virtual Status
UnlockFile(FileLock
* /*lock*/) override
{ return notsup
; }
328 virtual Status
NewLogger(const std::string
& /*fname*/,
329 std::shared_ptr
<Logger
>* /*result*/) override
{
// Fallback stub: every parameter is unnamed/ignored and the body is empty,
// so the supplied function is never invoked.
333 virtual void Schedule(void (* /*function*/)(void* arg
), void* /*arg*/,
334 Priority
/*pri*/ = LOW
, void* /*tag*/ = nullptr,
335 void (* /*unschedFunction*/)(void* arg
) = 0) override
{}
// Fallback stub: ignores both arguments and always returns 0.
337 virtual int UnSchedule(void* /*tag*/, Priority
/*pri*/) override
{ return 0; }
// Fallback stub: empty body, so no thread is started and the supplied
// function is never called.
339 virtual void StartThread(void (* /*function*/)(void* arg
),
340 void* /*arg*/) override
{}
// Fallback stub: empty body, returns immediately.
342 virtual void WaitForJoin() override
{}
344 virtual unsigned int GetThreadPoolQueueLen(
345 Priority
/*pri*/ = LOW
) const override
{
349 virtual Status
GetTestDirectory(std::string
* /*path*/) override
{
// Fallback stub: does not read any clock; always returns 0.
353 virtual uint64_t NowMicros() override
{ return 0; }
// Fallback stub: ignores the requested duration; the body is empty, so no
// sleep takes place.
355 virtual void SleepForMicroseconds(int /*micros*/) override
{}
357 virtual Status
GetHostName(char* /*name*/, uint64_t /*len*/) override
{
361 virtual Status
GetCurrentTime(int64_t* /*unix_time*/) override
{
365 virtual Status
GetAbsolutePath(const std::string
& /*db_path*/,
366 std::string
* /*outputpath*/) override
{
// Fallback stub: ignores both arguments; the body is empty.
370 virtual void SetBackgroundThreads(int /*number*/,
371 Priority
/*pri*/ = LOW
) override
{}
372 virtual int GetBackgroundThreads(Priority
/*pri*/ = LOW
) override
{
// Fallback stub: ignores both arguments; the body is empty.
375 virtual void IncBackgroundThreadsIfNeeded(int /*number*/,
376 Priority
/*pri*/) override
{}
// Fallback stub: ignores the timestamp and always returns an empty string.
377 virtual std::string
TimeToString(uint64_t /*number*/) override
{ return ""; }
379 virtual uint64_t GetThreadID() const override
{