]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | /** -*- C++ -*- |
2 | * Copyright (C) 2009-2011 New Dream Network | |
3 | * | |
4 | * This file is part of Hypertable. | |
5 | * | |
6 | * Hypertable is free software; you can redistribute it and/or | |
7 | * modify it under the terms of the GNU General Public License | |
8 | * as published by the Free Software Foundation; either version 2 | |
9 | * of the License, or any later version. | |
10 | * | |
11 | * Hypertable is distributed in the hope that it will be useful, | |
12 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 | * GNU General Public License for more details. | |
15 | * | |
16 | * You should have received a copy of the GNU General Public License | |
17 | * along with Hypertable. If not, see <http://www.gnu.org/licenses/> | |
18 | * | |
19 | * Authors: | |
20 | * Gregory Farnum <gfarnum@gmail.com> | |
21 | * Colin McCabe <cmccabe@alumni.cmu.edu> | |
22 | */ | |
23 | ||
24 | #include "Common/Compat.h" | |
25 | ||
26 | #include "CephBroker.h" | |
27 | #include "Common/Error.h" | |
28 | #include "Common/FileUtils.h" | |
29 | #include "Common/Filesystem.h" | |
30 | #include "Common/System.h" | |
31 | ||
32 | #include <cephfs/libcephfs.h> | |
33 | #include <dirent.h> | |
34 | #include <errno.h> | |
35 | #include <fcntl.h> | |
36 | #include <poll.h> | |
37 | #include <string> | |
38 | #include <sys/types.h> | |
39 | #include <sys/uio.h> | |
40 | #include <unistd.h> | |
41 | ||
42 | using namespace Hypertable; | |
43 | ||
31f18b77 | 44 | std::atomic<int> CephBroker::ms_next_fd{0}; |
7c673cae FG |
45 | |
/* Adapters for the two strerror_r() flavours.  Overload resolution picks the
 * one that matches the return type of the libc actually in use. */

/* XSI flavour (returns int): the message, if any, was written into buf. */
static inline const char *strerror_r_text(int, const char *buf)
{
  return buf[0] != '\0' ? buf : "Unknown error";
}

/* GNU flavour (returns char *): the message is the returned pointer, which
 * may or may not be buf. */
static inline const char *strerror_r_text(const char *msg, const char *buf)
{
  return msg ? msg : buf;
}

/* A thread-safe version of strerror.
 *
 * err may be given as a positive errno value or as the negated value that
 * libcephfs calls return; both yield the same text. */
static std::string cpp_strerror(int err)
{
  char buf[128];
  buf[0] = '\0';
  if (err < 0)
    err = -err;
  return std::string(strerror_r_text(strerror_r(err, buf, sizeof(buf)), buf));
}
56 | ||
57 | OpenFileDataCeph::OpenFileDataCeph(struct ceph_mount_info *cmount_, const String& fname, | |
58 | int _fd, int _flags) | |
59 | : cmount(cmount_), fd(_fd), flags(_flags), filename(fname) | |
60 | { | |
61 | } | |
62 | ||
63 | OpenFileDataCeph::~OpenFileDataCeph() { | |
64 | ceph_close(cmount, fd); | |
65 | } | |
66 | ||
67 | CephBroker::CephBroker(PropertiesPtr& cfg) | |
68 | : cmount(NULL) | |
69 | { | |
70 | int ret; | |
71 | String id(cfg->get_str("CephBroker.Id")); | |
72 | m_verbose = cfg->get_bool("Hypertable.Verbose"); | |
73 | m_root_dir = cfg->get_str("CephBroker.RootDir"); | |
74 | String mon_addr(cfg->get_str("CephBroker.MonAddr")); | |
75 | ||
76 | HT_INFO("Calling ceph_create"); | |
77 | ret = ceph_create(&cmount, id.empty() ? NULL : id.c_str()); | |
78 | if (ret) { | |
79 | throw Hypertable::Exception(ret, "ceph_create failed"); | |
80 | } | |
81 | ret = ceph_conf_set(cmount, "mon_host", mon_addr.c_str()); | |
82 | if (ret) { | |
83 | ceph_shutdown(cmount); | |
84 | throw Hypertable::Exception(ret, "ceph_conf_set(mon_addr) failed"); | |
85 | } | |
86 | ||
87 | // For Ceph debugging, uncomment these lines | |
88 | //ceph_conf_set(cmount, "debug_client", "1"); | |
89 | //ceph_conf_set(cmount, "debug_ms", "1"); | |
90 | ||
91 | HT_INFO("Calling ceph_mount"); | |
92 | ret = ceph_mount(cmount, m_root_dir.empty() ? NULL : m_root_dir.c_str()); | |
93 | if (ret) { | |
94 | ceph_shutdown(cmount); | |
95 | throw Hypertable::Exception(ret, "ceph_mount failed"); | |
96 | } | |
97 | HT_INFO("Mounted Ceph filesystem."); | |
98 | } | |
99 | ||
100 | CephBroker::~CephBroker() | |
101 | { | |
102 | ceph_shutdown(cmount); | |
103 | cmount = NULL; | |
104 | } | |
105 | ||
106 | void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, | |
107 | uint32_t flags, uint32_t bufsz) { | |
108 | int fd, ceph_fd; | |
109 | String abspath; | |
110 | HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz); | |
111 | ||
112 | make_abs_path(fname, abspath); | |
113 | ||
114 | fd = atomic_inc_return(&ms_next_fd); | |
115 | ||
116 | if ((ceph_fd = ceph_open(cmount, abspath.c_str(), O_RDONLY, 0)) < 0) { | |
117 | report_error(cb, -ceph_fd); | |
118 | return; | |
119 | } | |
120 | HT_INFOF("open (%s) fd=%d ceph_fd=%d", fname, fd, ceph_fd); | |
121 | ||
122 | { | |
123 | struct sockaddr_in addr; | |
124 | OpenFileDataCephPtr fdata(new OpenFileDataCeph(cmount, abspath, ceph_fd, O_RDONLY)); | |
125 | ||
126 | cb->get_address(addr); | |
127 | ||
128 | m_open_file_map.create(fd, addr, fdata); | |
129 | ||
130 | cb->response(fd); | |
131 | } | |
132 | } | |
133 | ||
134 | void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, uint32_t flags, | |
135 | int32_t bufsz, int16_t replication, int64_t blksz){ | |
136 | int fd, ceph_fd; | |
137 | int oflags; | |
138 | String abspath; | |
139 | ||
140 | make_abs_path(fname, abspath); | |
141 | HT_DEBUGF("create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld", | |
142 | fname, flags, bufsz, (int)replication, (Lld)blksz); | |
143 | ||
144 | fd = atomic_inc_return(&ms_next_fd); | |
145 | ||
146 | if (flags & Filesystem::OPEN_FLAG_OVERWRITE) | |
147 | oflags = O_WRONLY | O_CREAT | O_TRUNC; | |
148 | else | |
149 | oflags = O_WRONLY | O_CREAT | O_APPEND; | |
150 | ||
151 | //make sure the directories in the path exist | |
152 | String directory = abspath.substr(0, abspath.rfind('/')); | |
153 | int r; | |
154 | HT_INFOF("Calling mkdirs on %s", directory.c_str()); | |
155 | if((r=ceph_mkdirs(cmount, directory.c_str(), 0644)) < 0 && r!=-EEXIST) { | |
156 | HT_ERRORF("create failed on mkdirs: dname='%s' - %d", directory.c_str(), -r); | |
157 | report_error(cb, -r); | |
158 | return; | |
159 | } | |
160 | ||
161 | //create file | |
162 | if ((ceph_fd = ceph_open(cmount, abspath.c_str(), oflags, 0644)) < 0) { | |
163 | std::string errs(cpp_strerror(-ceph_fd)); | |
164 | HT_ERRORF("open failed: file=%s - %s", abspath.c_str(), errs.c_str()); | |
165 | report_error(cb, ceph_fd); | |
166 | return; | |
167 | } | |
168 | ||
169 | HT_INFOF("create %s = %d", fname, ceph_fd); | |
170 | ||
171 | { | |
172 | struct sockaddr_in addr; | |
173 | OpenFileDataCephPtr fdata (new OpenFileDataCeph(cmount, fname, ceph_fd, O_WRONLY)); | |
174 | ||
175 | cb->get_address(addr); | |
176 | ||
177 | m_open_file_map.create(fd, addr, fdata); | |
178 | ||
179 | cb->response(fd); | |
180 | } | |
181 | } | |
182 | ||
183 | void CephBroker::close(ResponseCallback *cb, uint32_t fd) { | |
184 | if (m_verbose) { | |
185 | HT_INFOF("close fd=%d", fd); | |
186 | } | |
187 | OpenFileDataCephPtr fdata; | |
188 | m_open_file_map.get(fd, fdata); | |
189 | m_open_file_map.remove(fd); | |
190 | cb->response_ok(); | |
191 | } | |
192 | ||
193 | void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount) { | |
194 | OpenFileDataCephPtr fdata; | |
195 | ssize_t nread; | |
196 | uint64_t offset; | |
197 | StaticBuffer buf(new uint8_t [amount], amount); | |
198 | ||
199 | HT_DEBUGF("read fd=%d amount = %d", fd, amount); | |
200 | ||
201 | if (!m_open_file_map.get(fd, fdata)) { | |
202 | char errbuf[32]; | |
203 | sprintf(errbuf, "%d", fd); | |
204 | cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); | |
205 | HT_ERRORF("bad file handle: %d", fd); | |
206 | return; | |
207 | } | |
208 | ||
209 | if ((offset = ceph_lseek(cmount, fdata->fd, 0, SEEK_CUR)) < 0) { | |
210 | std::string errs(cpp_strerror((int)-offset)); | |
211 | HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", | |
212 | fd, fdata->fd, errs.c_str()); | |
213 | report_error(cb, offset); | |
214 | return; | |
215 | } | |
216 | ||
217 | if ((nread = ceph_read(cmount, fdata->fd, (char *)buf.base, amount, 0)) < 0 ) { | |
218 | HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata->fd, amount); | |
219 | report_error(cb, -nread); | |
220 | return; | |
221 | } | |
222 | ||
223 | buf.size = nread; | |
224 | cb->response(offset, buf); | |
225 | } | |
226 | ||
227 | void CephBroker::append(ResponseCallbackAppend *cb, uint32_t fd, | |
228 | uint32_t amount, const void *data, bool sync) | |
229 | { | |
230 | OpenFileDataCephPtr fdata; | |
231 | ssize_t nwritten; | |
232 | uint64_t offset; | |
233 | ||
234 | HT_DEBUG_OUT << "append fd="<< fd <<" amount="<< amount <<" data='" | |
235 | << format_bytes(20, data, amount) <<" sync="<< sync << HT_END; | |
236 | ||
237 | if (!m_open_file_map.get(fd, fdata)) { | |
238 | char errbuf[32]; | |
239 | sprintf(errbuf, "%d", fd); | |
240 | cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); | |
241 | return; | |
242 | } | |
243 | ||
244 | if ((offset = (uint64_t)ceph_lseek(cmount, fdata->fd, 0, SEEK_CUR)) < 0) { | |
245 | std::string errs(cpp_strerror((int)-offset)); | |
246 | HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd, | |
247 | errs.c_str()); | |
248 | report_error(cb, offset); | |
249 | return; | |
250 | } | |
251 | ||
252 | if ((nwritten = ceph_write(cmount, fdata->fd, (const char *)data, amount, 0)) < 0) { | |
253 | std::string errs(cpp_strerror(nwritten)); | |
254 | HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s", | |
255 | fd, fdata->fd, amount, errs.c_str()); | |
256 | report_error(cb, -nwritten); | |
257 | return; | |
258 | } | |
259 | ||
260 | int r; | |
261 | if (sync && ((r = ceph_fsync(cmount, fdata->fd, true)) != 0)) { | |
262 | std::string errs(cpp_strerror(errno)); | |
263 | HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, errs.c_str()); | |
264 | report_error(cb, r); | |
265 | return; | |
266 | } | |
267 | ||
268 | cb->response(offset, nwritten); | |
269 | } | |
270 | ||
271 | void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset) { | |
272 | OpenFileDataCephPtr fdata; | |
273 | ||
274 | HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd, (Llu)offset); | |
275 | ||
276 | if (!m_open_file_map.get(fd, fdata)) { | |
277 | char errbuf[32]; | |
278 | sprintf(errbuf, "%d", fd); | |
279 | cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); | |
280 | return; | |
281 | } | |
282 | loff_t res = ceph_lseek(cmount, fdata->fd, offset, SEEK_SET); | |
283 | if (res < 0) { | |
284 | std::string errs(cpp_strerror((int)res)); | |
285 | HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s", | |
286 | fd, fdata->fd, (Llu)offset, errs.c_str()); | |
287 | report_error(cb, offset); | |
288 | return; | |
289 | } | |
290 | ||
291 | cb->response_ok(); | |
292 | } | |
293 | ||
294 | void CephBroker::remove(ResponseCallback *cb, const char *fname) { | |
295 | String abspath; | |
296 | ||
297 | HT_DEBUGF("remove file='%s'", fname); | |
298 | ||
299 | make_abs_path(fname, abspath); | |
300 | ||
301 | int r; | |
302 | if ((r = ceph_unlink(cmount, abspath.c_str())) < 0) { | |
303 | std::string errs(cpp_strerror(r)); | |
304 | HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), errs.c_str()); | |
305 | report_error(cb, r); | |
306 | return; | |
307 | } | |
308 | cb->response_ok(); | |
309 | } | |
310 | ||
311 | void CephBroker::length(ResponseCallbackLength *cb, const char *fname, bool) { | |
312 | int r; | |
313 | struct ceph_statx stx; | |
314 | ||
315 | HT_DEBUGF("length file='%s'", fname); | |
316 | ||
317 | if ((r = ceph_statx(cmount, fname, &stx, CEPH_STATX_SIZE, AT_SYMLINK_NOFOLLOW)) < 0) { | |
318 | String abspath; | |
319 | make_abs_path(fname, abspath); | |
320 | std::string errs(cpp_strerror(r)); | |
321 | HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(), errs.c_str()); | |
322 | report_error(cb,- r); | |
323 | return; | |
324 | } | |
325 | cb->response(stx.stx_size); | |
326 | } | |
327 | ||
328 | void CephBroker::pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset, | |
329 | uint32_t amount, bool) { | |
330 | OpenFileDataCephPtr fdata; | |
331 | ssize_t nread; | |
332 | StaticBuffer buf(new uint8_t [amount], amount); | |
333 | ||
334 | HT_DEBUGF("pread fd=%d offset=%llu amount=%d", fd, (Llu)offset, amount); | |
335 | ||
336 | if (!m_open_file_map.get(fd, fdata)) { | |
337 | char errbuf[32]; | |
338 | sprintf(errbuf, "%d", fd); | |
339 | cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); | |
340 | return; | |
341 | } | |
342 | ||
343 | if ((nread = ceph_read(cmount, fdata->fd, (char *)buf.base, amount, offset)) < 0) { | |
344 | std::string errs(cpp_strerror(nread)); | |
345 | HT_ERRORF("pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu - %s", fd, fdata->fd, | |
346 | amount, (Llu)offset, errs.c_str()); | |
347 | report_error(cb, nread); | |
348 | return; | |
349 | } | |
350 | ||
351 | buf.size = nread; | |
352 | ||
353 | cb->response(offset, buf); | |
354 | } | |
355 | ||
356 | void CephBroker::mkdirs(ResponseCallback *cb, const char *dname) { | |
357 | String absdir; | |
358 | ||
359 | HT_DEBUGF("mkdirs dir='%s'", dname); | |
360 | ||
361 | make_abs_path(dname, absdir); | |
362 | int r; | |
363 | if((r=ceph_mkdirs(cmount, absdir.c_str(), 0644)) < 0 && r!=-EEXIST) { | |
364 | HT_ERRORF("mkdirs failed: dname='%s' - %d", absdir.c_str(), -r); | |
365 | report_error(cb, -r); | |
366 | return; | |
367 | } | |
368 | cb->response_ok(); | |
369 | } | |
370 | ||
371 | void CephBroker::rmdir(ResponseCallback *cb, const char *dname) { | |
372 | String absdir; | |
373 | int r; | |
374 | ||
375 | make_abs_path(dname, absdir); | |
376 | if((r = rmdir_recursive(absdir.c_str())) < 0) { | |
377 | HT_ERRORF("failed to remove dir %s, got error %d", absdir.c_str(), r); | |
378 | report_error(cb, -r); | |
379 | return; | |
380 | } | |
381 | cb->response_ok(); | |
382 | } | |
383 | ||
384 | int CephBroker::rmdir_recursive(const char *directory) { | |
385 | struct ceph_dir_result *dirp; | |
386 | struct dirent de; | |
387 | struct ceph_statx stx; | |
388 | int r; | |
389 | if ((r = ceph_opendir(cmount, directory, &dirp)) < 0) | |
390 | return r; //failed to open | |
391 | while ((r = ceph_readdirplus_r(cmount, dirp, &de, &stx, CEPH_STATX_INO, AT_NO_ATTR_SYNC, NULL)) > 0) { | |
392 | String new_dir = de.d_name; | |
393 | if(!(new_dir.compare(".")==0 || new_dir.compare("..")==0)) { | |
394 | new_dir = directory; | |
395 | new_dir += '/'; | |
396 | new_dir += de.d_name; | |
397 | if (S_ISDIR(stx.stx_mode)) { //it's a dir, clear it out... | |
398 | if((r=rmdir_recursive(new_dir.c_str())) < 0) return r; | |
399 | } else { //delete this file | |
400 | if((r=ceph_unlink(cmount, new_dir.c_str())) < 0) return r; | |
401 | } | |
402 | } | |
403 | } | |
404 | if (r < 0) return r; //we got an error | |
405 | if ((r = ceph_closedir(cmount, dirp)) < 0) return r; | |
406 | return ceph_rmdir(cmount, directory); | |
407 | } | |
408 | ||
409 | void CephBroker::flush(ResponseCallback *cb, uint32_t fd) { | |
410 | OpenFileDataCephPtr fdata; | |
411 | ||
412 | HT_DEBUGF("flush fd=%d", fd); | |
413 | ||
414 | if (!m_open_file_map.get(fd, fdata)) { | |
415 | char errbuf[32]; | |
416 | sprintf(errbuf, "%d", fd); | |
417 | cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); | |
418 | return; | |
419 | } | |
420 | ||
421 | int r; | |
422 | if ((r = ceph_fsync(cmount, fdata->fd, true)) != 0) { | |
423 | std::string errs(cpp_strerror(r)); | |
424 | HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, errs.c_str()); | |
425 | report_error(cb, -r); | |
426 | return; | |
427 | } | |
428 | ||
429 | cb->response_ok(); | |
430 | } | |
431 | ||
432 | void CephBroker::status(ResponseCallback *cb) { | |
433 | cb->response_ok(); | |
434 | /*perhaps a total cheat, but both the local and Kosmos brokers | |
435 | included in Hypertable also do this. */ | |
436 | } | |
437 | ||
438 | void CephBroker::shutdown(ResponseCallback *cb) { | |
439 | m_open_file_map.remove_all(); | |
440 | cb->response_ok(); | |
441 | poll(0, 0, 2000); | |
442 | } | |
443 | ||
444 | void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) { | |
445 | std::vector<String> listing; | |
446 | String absdir; | |
447 | ||
448 | HT_DEBUGF("Readdir dir='%s'", dname); | |
449 | ||
450 | //get from ceph in a buffer | |
451 | make_abs_path(dname, absdir); | |
452 | ||
453 | struct ceph_dir_result *dirp; | |
454 | ceph_opendir(cmount, absdir.c_str(), &dirp); | |
455 | int r; | |
456 | int buflen = 100; //good default? | |
457 | char *buf = new char[buflen]; | |
458 | String *ent; | |
459 | int bufpos; | |
460 | while (1) { | |
461 | r = ceph_getdnames(cmount, dirp, buf, buflen); | |
462 | if (r==-ERANGE) { //expand the buffer | |
463 | delete [] buf; | |
464 | buflen *= 2; | |
465 | buf = new char[buflen]; | |
466 | continue; | |
467 | } | |
468 | if (r<=0) break; | |
469 | ||
470 | //if we make it here, we got at least one name, maybe more | |
471 | bufpos = 0; | |
472 | while (bufpos<r) {//make new strings and add them to listing | |
473 | ent = new String(buf+bufpos); | |
474 | if (ent->compare(".") && ent->compare("..")) | |
475 | listing.push_back(*ent); | |
476 | bufpos+=ent->size()+1; | |
477 | delete ent; | |
478 | } | |
479 | } | |
480 | delete [] buf; | |
481 | ceph_closedir(cmount, dirp); | |
482 | ||
483 | if (r < 0) report_error(cb, -r); //Ceph shouldn't return r<0 on getdnames | |
484 | //(except for ERANGE) so if it happens this is bad | |
485 | cb->response(listing); | |
486 | } | |
487 | ||
488 | void CephBroker::exists(ResponseCallbackExists *cb, const char *fname) { | |
489 | String abspath; | |
490 | struct ceph_statx stx; | |
491 | ||
492 | HT_DEBUGF("exists file='%s'", fname); | |
493 | make_abs_path(fname, abspath); | |
494 | cb->response(ceph_statx(cmount, abspath.c_str(), &stx, 0, AT_SYMLINK_NOFOLLOW) == 0); | |
495 | } | |
496 | ||
497 | void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst) { | |
498 | String src_abs; | |
499 | String dest_abs; | |
500 | int r; | |
501 | ||
502 | make_abs_path(src, src_abs); | |
503 | make_abs_path(dst, dest_abs); | |
504 | if ((r = ceph_rename(cmount, src_abs.c_str(), dest_abs.c_str())) <0 ) { | |
505 | report_error(cb, r); | |
506 | return; | |
507 | } | |
508 | cb->response_ok(); | |
509 | } | |
510 | ||
511 | void CephBroker::debug(ResponseCallback *cb, int32_t command, | |
512 | StaticBuffer &serialized_parameters) { | |
513 | HT_ERROR("debug commands not implemented!"); | |
514 | cb->error(Error::NOT_IMPLEMENTED, format("Debug commands not supported")); | |
515 | } | |
516 | ||
517 | void CephBroker::report_error(ResponseCallback *cb, int error) { | |
518 | char errbuf[128]; | |
519 | errbuf[0] = 0; | |
520 | ||
521 | strerror_r(error, errbuf, 128); | |
522 | ||
523 | cb->error(Error::DFSBROKER_IO_ERROR, errbuf); | |
524 | } | |
525 | ||
526 |