]> git.proxmox.com Git - ceph.git/blame - ceph/src/client/hypertable/CephBroker.cc
update sources to v12.1.0
[ceph.git] / ceph / src / client / hypertable / CephBroker.cc
CommitLineData
7c673cae
FG
1/** -*- C++ -*-
2 * Copyright (C) 2009-2011 New Dream Network
3 *
4 * This file is part of Hypertable.
5 *
6 * Hypertable is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or any later version.
10 *
11 * Hypertable is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with Hypertable. If not, see <http://www.gnu.org/licenses/>
18 *
19 * Authors:
20 * Gregory Farnum <gfarnum@gmail.com>
21 * Colin McCabe <cmccabe@alumni.cmu.edu>
22 */
23
24#include "Common/Compat.h"
25
26#include "CephBroker.h"
27#include "Common/Error.h"
28#include "Common/FileUtils.h"
29#include "Common/Filesystem.h"
30#include "Common/System.h"
31
32#include <cephfs/libcephfs.h>
33#include <dirent.h>
34#include <errno.h>
35#include <fcntl.h>
36#include <poll.h>
37#include <string>
38#include <sys/types.h>
39#include <sys/uio.h>
40#include <unistd.h>
41
42using namespace Hypertable;
43
31f18b77 44std::atomic<int> CephBroker::ms_next_fd{0};
7c673cae
FG
45
/* Adapters for the two incompatible strerror_r flavours.  The XSI/POSIX
 * variant returns an int status and fills the caller's buffer; the GNU
 * variant returns a char* that may point at a static string and leave the
 * buffer untouched.  Overload resolution on strerror_r's return type picks
 * the right adapter at compile time.  `static inline` avoids unused-function
 * warnings for whichever overload the platform does not need. */
static inline const char *strerror_r_result(int rc, const char *buf)
{
  return rc == 0 ? buf : "Unknown error";
}

static inline const char *strerror_r_result(const char *msg, const char * /*buf*/)
{
  return msg;
}

/* A thread-safe version of strerror.
 *
 * @param err errno value; either sign is accepted (libcephfs returns negative
 *            errno codes, so callers frequently pass those directly).
 * @return the human-readable error description.
 */
static std::string cpp_strerror(int err)
{
  char buf[128];
  buf[0] = '\0';
  if (err < 0)
    err = -err;
  return std::string(strerror_r_result(strerror_r(err, buf, sizeof(buf)), buf));
}
56
57OpenFileDataCeph::OpenFileDataCeph(struct ceph_mount_info *cmount_, const String& fname,
58 int _fd, int _flags)
59 : cmount(cmount_), fd(_fd), flags(_flags), filename(fname)
60{
61}
62
63OpenFileDataCeph::~OpenFileDataCeph() {
64 ceph_close(cmount, fd);
65}
66
67CephBroker::CephBroker(PropertiesPtr& cfg)
68 : cmount(NULL)
69{
70 int ret;
71 String id(cfg->get_str("CephBroker.Id"));
72 m_verbose = cfg->get_bool("Hypertable.Verbose");
73 m_root_dir = cfg->get_str("CephBroker.RootDir");
74 String mon_addr(cfg->get_str("CephBroker.MonAddr"));
75
76 HT_INFO("Calling ceph_create");
77 ret = ceph_create(&cmount, id.empty() ? NULL : id.c_str());
78 if (ret) {
79 throw Hypertable::Exception(ret, "ceph_create failed");
80 }
81 ret = ceph_conf_set(cmount, "mon_host", mon_addr.c_str());
82 if (ret) {
83 ceph_shutdown(cmount);
84 throw Hypertable::Exception(ret, "ceph_conf_set(mon_addr) failed");
85 }
86
87 // For Ceph debugging, uncomment these lines
88 //ceph_conf_set(cmount, "debug_client", "1");
89 //ceph_conf_set(cmount, "debug_ms", "1");
90
91 HT_INFO("Calling ceph_mount");
92 ret = ceph_mount(cmount, m_root_dir.empty() ? NULL : m_root_dir.c_str());
93 if (ret) {
94 ceph_shutdown(cmount);
95 throw Hypertable::Exception(ret, "ceph_mount failed");
96 }
97 HT_INFO("Mounted Ceph filesystem.");
98}
99
100CephBroker::~CephBroker()
101{
102 ceph_shutdown(cmount);
103 cmount = NULL;
104}
105
106void CephBroker::open(ResponseCallbackOpen *cb, const char *fname,
107 uint32_t flags, uint32_t bufsz) {
108 int fd, ceph_fd;
109 String abspath;
110 HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz);
111
112 make_abs_path(fname, abspath);
113
114 fd = atomic_inc_return(&ms_next_fd);
115
116 if ((ceph_fd = ceph_open(cmount, abspath.c_str(), O_RDONLY, 0)) < 0) {
117 report_error(cb, -ceph_fd);
118 return;
119 }
120 HT_INFOF("open (%s) fd=%d ceph_fd=%d", fname, fd, ceph_fd);
121
122 {
123 struct sockaddr_in addr;
124 OpenFileDataCephPtr fdata(new OpenFileDataCeph(cmount, abspath, ceph_fd, O_RDONLY));
125
126 cb->get_address(addr);
127
128 m_open_file_map.create(fd, addr, fdata);
129
130 cb->response(fd);
131 }
132}
133
134void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, uint32_t flags,
135 int32_t bufsz, int16_t replication, int64_t blksz){
136 int fd, ceph_fd;
137 int oflags;
138 String abspath;
139
140 make_abs_path(fname, abspath);
141 HT_DEBUGF("create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld",
142 fname, flags, bufsz, (int)replication, (Lld)blksz);
143
144 fd = atomic_inc_return(&ms_next_fd);
145
146 if (flags & Filesystem::OPEN_FLAG_OVERWRITE)
147 oflags = O_WRONLY | O_CREAT | O_TRUNC;
148 else
149 oflags = O_WRONLY | O_CREAT | O_APPEND;
150
151 //make sure the directories in the path exist
152 String directory = abspath.substr(0, abspath.rfind('/'));
153 int r;
154 HT_INFOF("Calling mkdirs on %s", directory.c_str());
155 if((r=ceph_mkdirs(cmount, directory.c_str(), 0644)) < 0 && r!=-EEXIST) {
156 HT_ERRORF("create failed on mkdirs: dname='%s' - %d", directory.c_str(), -r);
157 report_error(cb, -r);
158 return;
159 }
160
161 //create file
162 if ((ceph_fd = ceph_open(cmount, abspath.c_str(), oflags, 0644)) < 0) {
163 std::string errs(cpp_strerror(-ceph_fd));
164 HT_ERRORF("open failed: file=%s - %s", abspath.c_str(), errs.c_str());
165 report_error(cb, ceph_fd);
166 return;
167 }
168
169 HT_INFOF("create %s = %d", fname, ceph_fd);
170
171 {
172 struct sockaddr_in addr;
173 OpenFileDataCephPtr fdata (new OpenFileDataCeph(cmount, fname, ceph_fd, O_WRONLY));
174
175 cb->get_address(addr);
176
177 m_open_file_map.create(fd, addr, fdata);
178
179 cb->response(fd);
180 }
181}
182
183void CephBroker::close(ResponseCallback *cb, uint32_t fd) {
184 if (m_verbose) {
185 HT_INFOF("close fd=%d", fd);
186 }
187 OpenFileDataCephPtr fdata;
188 m_open_file_map.get(fd, fdata);
189 m_open_file_map.remove(fd);
190 cb->response_ok();
191}
192
193void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount) {
194 OpenFileDataCephPtr fdata;
195 ssize_t nread;
196 uint64_t offset;
197 StaticBuffer buf(new uint8_t [amount], amount);
198
199 HT_DEBUGF("read fd=%d amount = %d", fd, amount);
200
201 if (!m_open_file_map.get(fd, fdata)) {
202 char errbuf[32];
203 sprintf(errbuf, "%d", fd);
204 cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
205 HT_ERRORF("bad file handle: %d", fd);
206 return;
207 }
208
209 if ((offset = ceph_lseek(cmount, fdata->fd, 0, SEEK_CUR)) < 0) {
210 std::string errs(cpp_strerror((int)-offset));
211 HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s",
212 fd, fdata->fd, errs.c_str());
213 report_error(cb, offset);
214 return;
215 }
216
217 if ((nread = ceph_read(cmount, fdata->fd, (char *)buf.base, amount, 0)) < 0 ) {
218 HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata->fd, amount);
219 report_error(cb, -nread);
220 return;
221 }
222
223 buf.size = nread;
224 cb->response(offset, buf);
225}
226
227void CephBroker::append(ResponseCallbackAppend *cb, uint32_t fd,
228 uint32_t amount, const void *data, bool sync)
229{
230 OpenFileDataCephPtr fdata;
231 ssize_t nwritten;
232 uint64_t offset;
233
234 HT_DEBUG_OUT << "append fd="<< fd <<" amount="<< amount <<" data='"
235 << format_bytes(20, data, amount) <<" sync="<< sync << HT_END;
236
237 if (!m_open_file_map.get(fd, fdata)) {
238 char errbuf[32];
239 sprintf(errbuf, "%d", fd);
240 cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
241 return;
242 }
243
244 if ((offset = (uint64_t)ceph_lseek(cmount, fdata->fd, 0, SEEK_CUR)) < 0) {
245 std::string errs(cpp_strerror((int)-offset));
246 HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd,
247 errs.c_str());
248 report_error(cb, offset);
249 return;
250 }
251
252 if ((nwritten = ceph_write(cmount, fdata->fd, (const char *)data, amount, 0)) < 0) {
253 std::string errs(cpp_strerror(nwritten));
254 HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s",
255 fd, fdata->fd, amount, errs.c_str());
256 report_error(cb, -nwritten);
257 return;
258 }
259
260 int r;
261 if (sync && ((r = ceph_fsync(cmount, fdata->fd, true)) != 0)) {
262 std::string errs(cpp_strerror(errno));
263 HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, errs.c_str());
264 report_error(cb, r);
265 return;
266 }
267
268 cb->response(offset, nwritten);
269}
270
271void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset) {
272 OpenFileDataCephPtr fdata;
273
274 HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd, (Llu)offset);
275
276 if (!m_open_file_map.get(fd, fdata)) {
277 char errbuf[32];
278 sprintf(errbuf, "%d", fd);
279 cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
280 return;
281 }
282 loff_t res = ceph_lseek(cmount, fdata->fd, offset, SEEK_SET);
283 if (res < 0) {
284 std::string errs(cpp_strerror((int)res));
285 HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s",
286 fd, fdata->fd, (Llu)offset, errs.c_str());
287 report_error(cb, offset);
288 return;
289 }
290
291 cb->response_ok();
292}
293
294void CephBroker::remove(ResponseCallback *cb, const char *fname) {
295 String abspath;
296
297 HT_DEBUGF("remove file='%s'", fname);
298
299 make_abs_path(fname, abspath);
300
301 int r;
302 if ((r = ceph_unlink(cmount, abspath.c_str())) < 0) {
303 std::string errs(cpp_strerror(r));
304 HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), errs.c_str());
305 report_error(cb, r);
306 return;
307 }
308 cb->response_ok();
309}
310
311void CephBroker::length(ResponseCallbackLength *cb, const char *fname, bool) {
312 int r;
313 struct ceph_statx stx;
314
315 HT_DEBUGF("length file='%s'", fname);
316
317 if ((r = ceph_statx(cmount, fname, &stx, CEPH_STATX_SIZE, AT_SYMLINK_NOFOLLOW)) < 0) {
318 String abspath;
319 make_abs_path(fname, abspath);
320 std::string errs(cpp_strerror(r));
321 HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(), errs.c_str());
322 report_error(cb,- r);
323 return;
324 }
325 cb->response(stx.stx_size);
326}
327
328void CephBroker::pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset,
329 uint32_t amount, bool) {
330 OpenFileDataCephPtr fdata;
331 ssize_t nread;
332 StaticBuffer buf(new uint8_t [amount], amount);
333
334 HT_DEBUGF("pread fd=%d offset=%llu amount=%d", fd, (Llu)offset, amount);
335
336 if (!m_open_file_map.get(fd, fdata)) {
337 char errbuf[32];
338 sprintf(errbuf, "%d", fd);
339 cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
340 return;
341 }
342
343 if ((nread = ceph_read(cmount, fdata->fd, (char *)buf.base, amount, offset)) < 0) {
344 std::string errs(cpp_strerror(nread));
345 HT_ERRORF("pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu - %s", fd, fdata->fd,
346 amount, (Llu)offset, errs.c_str());
347 report_error(cb, nread);
348 return;
349 }
350
351 buf.size = nread;
352
353 cb->response(offset, buf);
354}
355
356void CephBroker::mkdirs(ResponseCallback *cb, const char *dname) {
357 String absdir;
358
359 HT_DEBUGF("mkdirs dir='%s'", dname);
360
361 make_abs_path(dname, absdir);
362 int r;
363 if((r=ceph_mkdirs(cmount, absdir.c_str(), 0644)) < 0 && r!=-EEXIST) {
364 HT_ERRORF("mkdirs failed: dname='%s' - %d", absdir.c_str(), -r);
365 report_error(cb, -r);
366 return;
367 }
368 cb->response_ok();
369}
370
371void CephBroker::rmdir(ResponseCallback *cb, const char *dname) {
372 String absdir;
373 int r;
374
375 make_abs_path(dname, absdir);
376 if((r = rmdir_recursive(absdir.c_str())) < 0) {
377 HT_ERRORF("failed to remove dir %s, got error %d", absdir.c_str(), r);
378 report_error(cb, -r);
379 return;
380 }
381 cb->response_ok();
382}
383
384int CephBroker::rmdir_recursive(const char *directory) {
385 struct ceph_dir_result *dirp;
386 struct dirent de;
387 struct ceph_statx stx;
388 int r;
389 if ((r = ceph_opendir(cmount, directory, &dirp)) < 0)
390 return r; //failed to open
391 while ((r = ceph_readdirplus_r(cmount, dirp, &de, &stx, CEPH_STATX_INO, AT_NO_ATTR_SYNC, NULL)) > 0) {
392 String new_dir = de.d_name;
393 if(!(new_dir.compare(".")==0 || new_dir.compare("..")==0)) {
394 new_dir = directory;
395 new_dir += '/';
396 new_dir += de.d_name;
397 if (S_ISDIR(stx.stx_mode)) { //it's a dir, clear it out...
398 if((r=rmdir_recursive(new_dir.c_str())) < 0) return r;
399 } else { //delete this file
400 if((r=ceph_unlink(cmount, new_dir.c_str())) < 0) return r;
401 }
402 }
403 }
404 if (r < 0) return r; //we got an error
405 if ((r = ceph_closedir(cmount, dirp)) < 0) return r;
406 return ceph_rmdir(cmount, directory);
407}
408
409void CephBroker::flush(ResponseCallback *cb, uint32_t fd) {
410 OpenFileDataCephPtr fdata;
411
412 HT_DEBUGF("flush fd=%d", fd);
413
414 if (!m_open_file_map.get(fd, fdata)) {
415 char errbuf[32];
416 sprintf(errbuf, "%d", fd);
417 cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
418 return;
419 }
420
421 int r;
422 if ((r = ceph_fsync(cmount, fdata->fd, true)) != 0) {
423 std::string errs(cpp_strerror(r));
424 HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, errs.c_str());
425 report_error(cb, -r);
426 return;
427 }
428
429 cb->response_ok();
430}
431
432void CephBroker::status(ResponseCallback *cb) {
433 cb->response_ok();
434 /*perhaps a total cheat, but both the local and Kosmos brokers
435 included in Hypertable also do this. */
436}
437
438void CephBroker::shutdown(ResponseCallback *cb) {
439 m_open_file_map.remove_all();
440 cb->response_ok();
441 poll(0, 0, 2000);
442}
443
444void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) {
445 std::vector<String> listing;
446 String absdir;
447
448 HT_DEBUGF("Readdir dir='%s'", dname);
449
450 //get from ceph in a buffer
451 make_abs_path(dname, absdir);
452
453 struct ceph_dir_result *dirp;
454 ceph_opendir(cmount, absdir.c_str(), &dirp);
455 int r;
456 int buflen = 100; //good default?
457 char *buf = new char[buflen];
458 String *ent;
459 int bufpos;
460 while (1) {
461 r = ceph_getdnames(cmount, dirp, buf, buflen);
462 if (r==-ERANGE) { //expand the buffer
463 delete [] buf;
464 buflen *= 2;
465 buf = new char[buflen];
466 continue;
467 }
468 if (r<=0) break;
469
470 //if we make it here, we got at least one name, maybe more
471 bufpos = 0;
472 while (bufpos<r) {//make new strings and add them to listing
473 ent = new String(buf+bufpos);
474 if (ent->compare(".") && ent->compare(".."))
475 listing.push_back(*ent);
476 bufpos+=ent->size()+1;
477 delete ent;
478 }
479 }
480 delete [] buf;
481 ceph_closedir(cmount, dirp);
482
483 if (r < 0) report_error(cb, -r); //Ceph shouldn't return r<0 on getdnames
484 //(except for ERANGE) so if it happens this is bad
485 cb->response(listing);
486}
487
488void CephBroker::exists(ResponseCallbackExists *cb, const char *fname) {
489 String abspath;
490 struct ceph_statx stx;
491
492 HT_DEBUGF("exists file='%s'", fname);
493 make_abs_path(fname, abspath);
494 cb->response(ceph_statx(cmount, abspath.c_str(), &stx, 0, AT_SYMLINK_NOFOLLOW) == 0);
495}
496
497void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst) {
498 String src_abs;
499 String dest_abs;
500 int r;
501
502 make_abs_path(src, src_abs);
503 make_abs_path(dst, dest_abs);
504 if ((r = ceph_rename(cmount, src_abs.c_str(), dest_abs.c_str())) <0 ) {
505 report_error(cb, r);
506 return;
507 }
508 cb->response_ok();
509}
510
511void CephBroker::debug(ResponseCallback *cb, int32_t command,
512 StaticBuffer &serialized_parameters) {
513 HT_ERROR("debug commands not implemented!");
514 cb->error(Error::NOT_IMPLEMENTED, format("Debug commands not supported"));
515}
516
517void CephBroker::report_error(ResponseCallback *cb, int error) {
518 char errbuf[128];
519 errbuf[0] = 0;
520
521 strerror_r(error, errbuf, 128);
522
523 cb->error(Error::DFSBROKER_IO_ERROR, errbuf);
524}
525
526