1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2021 Red Hat, Inc.
9 * This is free software; you can redistribute it and/or modify it under the
10 * terms of the GNU Lesser General Public License version 2.1, as published by
11 * the Free Software Foundation. See file COPYING.
15 #include <boost/smart_ptr/intrusive_ptr.hpp>
16 #include <fmt/format.h>
21 #include <sys/types.h>
29 #include <string_view>
34 #include <sqlite3ext.h>
35 SQLITE_EXTENSION_INIT1
37 #include "include/ceph_assert.h"
38 #include "include/rados/librados.hpp"
40 #include "common/Clock.h"
41 #include "common/Formatter.h"
42 #include "common/ceph_argparse.h"
43 #include "common/ceph_mutex.h"
44 #include "common/common_init.h"
45 #include "common/config.h"
46 #include "common/debug.h"
47 #include "common/errno.h"
48 #include "common/perf_counters.h"
49 #include "common/version.h"
51 #include "include/libcephsqlite.h"
52 #include "SimpleRADOSStriper.h"
// Route this file's debug output to the cephsqlite logging subsystem.
54 #define dout_subsys ceph_subsys_cephsqlite
// Prefix every log line with "cephsqlite: <function>: ".
56 #define dout_prefix *_dout << "cephsqlite: " << __func__ << ": "
// d(): log at level `lvl` on `cct`, tagging the line with the instance id of `cluster`.
57 #define d(cct,cluster,lvl) ldout((cct), (lvl)) << "(client." << cluster->get_instance_id() << ") "
// dv(): d() applied to the local variables named `cct` and `cluster`.
58 #define dv(lvl) d(cct,cluster,(lvl))
// df(): d() applied to the local file handle `f` (its io.cct / io.cluster), followed by the file location.
59 #define df(lvl) d(f->io.cct,f->io.cluster,(lvl)) << f->loc << " "
76 P_OPF_CHECKRESERVEDLOCK
,
79 P_OPF_DEVICECHARACTERISTICS
,
// Intrusive reference-counted pointer to a CephContext.
83 using cctptr
= boost::intrusive_ptr
<CephContext
>;
// Shared pointer to a librados cluster handle.
84 using rsptr
= std::shared_ptr
<librados::Rados
>;
// Application data attached to the "ceph" VFS and reached through getdata().
// It uses a CephContext (cct), a RADOS cluster handle (cluster) and two perf
// counter sets; the entry points shown here take cluster_mutex before
// touching the cluster handle.
86 struct cephsqlite_appdata
{
87 ~cephsqlite_appdata() {
89 std::scoped_lock
lock(cluster_mutex
);
// Unregister both perf counter sets from the context's collection.
93 cct
->get_perfcounters_collection()->remove(logger
.get());
96 cct
->get_perfcounters_collection()->remove(striper_logger
.get());
// Build the "libcephsqlite_vfs" counters: one time-average counter per
// VFS-level (P_OP_*) and per file-level (P_OPF_*) operation.
101 PerfCountersBuilder
plb(cct
.get(), "libcephsqlite_vfs", P_FIRST
, P_LAST
);
102 plb
.add_time_avg(P_OP_OPEN
, "op_open", "Time average of Open operations");
103 plb
.add_time_avg(P_OP_DELETE
, "op_delete", "Time average of Delete operations");
104 plb
.add_time_avg(P_OP_ACCESS
, "op_access", "Time average of Access operations");
105 plb
.add_time_avg(P_OP_FULLPATHNAME
, "op_fullpathname", "Time average of FullPathname operations");
106 plb
.add_time_avg(P_OP_CURRENTTIME
, "op_currenttime", "Time average of Currenttime operations");
107 plb
.add_time_avg(P_OPF_CLOSE
, "opf_close", "Time average of Close file operations");
108 plb
.add_time_avg(P_OPF_READ
, "opf_read", "Time average of Read file operations");
109 plb
.add_time_avg(P_OPF_WRITE
, "opf_write", "Time average of Write file operations");
110 plb
.add_time_avg(P_OPF_TRUNCATE
, "opf_truncate", "Time average of Truncate file operations");
111 plb
.add_time_avg(P_OPF_SYNC
, "opf_sync", "Time average of Sync file operations");
112 plb
.add_time_avg(P_OPF_FILESIZE
, "opf_filesize", "Time average of FileSize file operations");
113 plb
.add_time_avg(P_OPF_LOCK
, "opf_lock", "Time average of Lock file operations");
114 plb
.add_time_avg(P_OPF_UNLOCK
, "opf_unlock", "Time average of Unlock file operations");
115 plb
.add_time_avg(P_OPF_CHECKRESERVEDLOCK
, "opf_checkreservedlock", "Time average of CheckReservedLock file operations");
116 plb
.add_time_avg(P_OPF_FILECONTROL
, "opf_filecontrol", "Time average of FileControl file operations");
117 plb
.add_time_avg(P_OPF_SECTORSIZE
, "opf_sectorsize", "Time average of SectorSize file operations");
118 plb
.add_time_avg(P_OPF_DEVICECHARACTERISTICS
, "opf_devicecharacteristics", "Time average of DeviceCharacteristics file operations");
119 logger
.reset(plb
.create_perf_counters());
// The striper brings its own counter set, created under the name "libcephsqlite_striper".
120 if (int rc
= SimpleRADOSStriper::config_logger(cct
.get(), "libcephsqlite_striper", &striper_logger
); rc
< 0) {
// Publish both counter sets through the context's perf counter collection.
123 cct
->get_perfcounters_collection()->add(logger
.get());
124 cct
->get_perfcounters_collection()->add(striper_logger
.get());
// get_cluster(): hand out the current (cct, cluster) pair under cluster_mutex.
// A failing _open(nullptr) aborts the process.
128 std::pair
<cctptr
, rsptr
> get_cluster() {
129 std::scoped_lock
lock(cluster_mutex
);
131 if (int rc
= _open(nullptr); rc
< 0) {
132 ceph_abort("could not open connection to ceph");
135 return {cct
, cluster
};
138 std::scoped_lock
lock(cluster_mutex
);
142 std::scoped_lock
lock(cluster_mutex
);
// maybe_reconnect(): `_cluster` is the handle the caller was using when it hit
// an error. The "reconnecting" branch is taken only when there is no current
// handle or the caller's handle is still the current one; otherwise the
// handle has already been replaced ("already reconnected").
146 int maybe_reconnect(rsptr _cluster
) {
147 std::scoped_lock
lock(cluster_mutex
);
148 if (!cluster
|| cluster
== _cluster
) {
149 ldout(cct
, 10) << "reconnecting to RADOS" << dendl
;
153 ldout(cct
, 10) << "already reconnected" << dendl
;
// open(): locked entry point (presumably forwards to _open() - confirm).
157 int open(CephContext
* _cct
) {
158 std::scoped_lock
lock(cluster_mutex
);
// logger holds the VFS operation timings; striper_logger is the counter set
// that makestriper() passes to each SimpleRADOSStriper via set_logger().
162 std::unique_ptr
<PerfCounters
> logger
;
163 std::shared_ptr
<PerfCounters
> striper_logger
;
// _open(): the statements below build a library-mode CephContext from the
// CEPH_ARGS environment variable, then set up perf counters and connect.
166 int _open(CephContext
* _cct
) {
168 std::vector
<const char*> env_args
;
169 env_to_vec(env_args
, "CEPH_ARGS");
170 std::string cluster
, conf_file_list
; // unused
171 CephInitParameters iparams
= ceph_argparse_early_args(env_args
, CEPH_ENTITY_TYPE_CLIENT
, &cluster
, &conf_file_list
);
// `false`: adopt the reference returned by common_preinit() instead of adding one.
172 cct
= cctptr(common_preinit(iparams
, CODE_ENVIRONMENT_LIBRARY
, 0), false);
173 cct
->_conf
.parse_config_files(nullptr, &std::cerr
, 0);
174 cct
->_conf
.parse_env(cct
->get_module_type()); // environment variables override
175 cct
->_conf
.apply_changes(nullptr);
176 common_init_finish(cct
.get());
181 if (int rc
= setup_perf(); rc
< 0) {
185 if (int rc
= _connect(); rc
< 0) {
// Create a fresh RADOS handle, initialise it from cct and connect. The new
// handle replaces `cluster` only after connect() has succeeded.
198 auto _cluster
= rsptr(new librados::Rados());
199 ldout(cct
, 5) << "initializing RADOS handle as " << cct
->_conf
->name
<< dendl
;
200 if (int rc
= _cluster
->init_with_context(cct
.get()); rc
< 0) {
201 lderr(cct
) << "cannot initialize RADOS: " << cpp_strerror(rc
) << dendl
;
204 if (int rc
= _cluster
->connect(); rc
< 0) {
205 lderr(cct
) << "cannot connect: " << cpp_strerror(rc
) << dendl
;
208 auto s
= _cluster
->get_addrs();
209 ldout(cct
, 5) << "completed connection to RADOS with address " << s
<< dendl
;
210 cluster
= std::move(_cluster
);
// Mutex taken by the methods above before they use `cluster`.
214 ceph::mutex cluster_mutex
= ceph::make_mutex("libcephsqlite");;
// Location of a database file; parsepath() fills its pool, radosns and name fields.
219 struct cephsqlite_fileloc
{
// I/O state for one file: the cluster handle, an IoCtx created from it and
// the striper that performs the actual reads and writes.
225 struct cephsqlite_fileio
{
227 rsptr cluster
; // anchor for ioctx
228 librados::IoCtx ioctx
;
// Created by makestriper() on top of ioctx.
229 std::unique_ptr
<SimpleRADOSStriper
> rs
;
// Stream insertion for a file location; the logging macros rely on it when they print `loc`.
232 std::ostream
& operator<<(std::ostream
&out
, const cephsqlite_fileloc
& fileloc
) {
// Per-open-file object. Open() constructs it in the storage SQLite provides
// and Close() destroys it in place.
244 struct cephsqlite_file
{
// Owning VFS; the handlers use it to reach the appdata through getdata().
246 struct sqlite3_vfs
* vfs
= nullptr;
248 // There are 5 lock states: https://sqlite.org/c3ref/c_lock_exclusive.html
// Parsed location and I/O state of this file.
250 struct cephsqlite_fileloc loc
{};
251 struct cephsqlite_fileio io
{};
// getdata(vfs): the cephsqlite_appdata stored in vfs->pAppData, as an lvalue.
// Dereferences the pointer unconditionally, so pAppData must be non-null.
255 #define getdata(vfs) (*((cephsqlite_appdata*)((vfs)->pAppData)))
// xLock handler: request lock level `ilock` on `file`.
// The striper lock is taken with rs->lock(0) only when it is not already
// held and a level above SQLITE_LOCK_NONE is requested.
257 static int Lock(sqlite3_file
*file
, int ilock
)
259 auto f
= (cephsqlite_file
*)file
;
260 auto start
= ceph::coarse_mono_clock::now();
261 df(5) << std::hex
<< ilock
<< dendl
;
263 auto& lock
= f
->lock
;
// Invariants: a held striper lock implies a recorded level above NONE, and
// this call never lowers the recorded level.
264 ceph_assert(!f
->io
.rs
->is_locked() || lock
> SQLITE_LOCK_NONE
);
265 ceph_assert(lock
<= ilock
);
266 if (!f
->io
.rs
->is_locked() && ilock
> SQLITE_LOCK_NONE
) {
267 if (int rc
= f
->io
.rs
->lock(0); rc
< 0) {
268 df(5) << "failed: " << rc
<< dendl
;
// A blocklisted client asks the appdata to reconnect, passing the handle this file was using.
269 if (rc
== -EBLOCKLISTED
) {
270 getdata(f
->vfs
).maybe_reconnect(f
->io
.cluster
);
// Record the elapsed time under the opf_lock counter.
277 auto end
= ceph::coarse_mono_clock::now();
278 getdata(f
->vfs
).logger
->tinc(P_OPF_LOCK
, end
-start
);
// xUnlock handler: lower the lock level of `file` to `ilock`.
// The striper lock is released only when the request drops to
// SQLITE_LOCK_NONE (or below) from a level above it.
282 static int Unlock(sqlite3_file
*file
, int ilock
)
284 auto f
= (cephsqlite_file
*)file
;
285 auto start
= ceph::coarse_mono_clock::now();
286 df(5) << std::hex
<< ilock
<< dendl
;
288 auto& lock
= f
->lock
;
// Invariants: either nothing is held, or a level above NONE is recorded and
// the striper lock is held; this call never raises the level.
289 ceph_assert(lock
== SQLITE_LOCK_NONE
|| (lock
> SQLITE_LOCK_NONE
&& f
->io
.rs
->is_locked()));
290 ceph_assert(lock
>= ilock
);
291 if (ilock
<= SQLITE_LOCK_NONE
&& SQLITE_LOCK_NONE
< lock
) {
292 if (int rc
= f
->io
.rs
->unlock(); rc
< 0) {
293 df(5) << "failed: " << rc
<< dendl
;
// A blocklisted client asks the appdata to reconnect, passing the handle this file was using.
294 if (rc
== -EBLOCKLISTED
) {
295 getdata(f
->vfs
).maybe_reconnect(f
->io
.cluster
);
// Record the elapsed time under the opf_unlock counter.
302 auto end
= ceph::coarse_mono_clock::now();
303 getdata(f
->vfs
).logger
->tinc(P_OPF_UNLOCK
, end
-start
);
// xCheckReservedLock handler: branches on this handle's own recorded lock
// level (above SQLITE_LOCK_SHARED) and dumps the striper's lockers to the
// debug stream.
307 static int CheckReservedLock(sqlite3_file
*file
, int *result
)
309 auto f
= (cephsqlite_file
*)file
;
310 auto start
= ceph::coarse_mono_clock::now();
314 auto& lock
= f
->lock
;
315 if (lock
> SQLITE_LOCK_SHARED
) {
// Diagnostic only: writes the current lockers into the active debug stream.
320 f
->io
.rs
->print_lockers(*_dout
);
// Record the elapsed time under the opf_checkreservedlock counter.
323 auto end
= ceph::coarse_mono_clock::now();
324 getdata(f
->vfs
).logger
->tinc(P_OPF_CHECKRESERVEDLOCK
, end
-start
);
328 static int Close(sqlite3_file
*file
)
330 auto f
= (cephsqlite_file
*)file
;
331 auto start
= ceph::coarse_mono_clock::now();
333 f
->~cephsqlite_file();
334 auto end
= ceph::coarse_mono_clock::now();
335 getdata(f
->vfs
).logger
->tinc(P_OPF_CLOSE
, end
-start
);
// xRead handler: read `len` bytes at offset `off` into `buf` through the striper.
// A negative striper result is reported as SQLITE_IOERR_READ; on the short
// read path the rest of the buffer is zero-filled and
// SQLITE_IOERR_SHORT_READ is returned.
339 static int Read(sqlite3_file
*file
, void *buf
, int len
, sqlite_int64 off
)
341 auto f
= (cephsqlite_file
*)file
;
342 auto start
= ceph::coarse_mono_clock::now();
343 df(5) << buf
<< " " << off
<< "~" << len
<< dendl
;
345 if (int rc
= f
->io
.rs
->read(buf
, len
, off
); rc
< 0) {
346 df(5) << "read failed: " << cpp_strerror(rc
) << dendl
;
// A blocklisted client asks the appdata to reconnect, passing the handle this file was using.
347 if (rc
== -EBLOCKLISTED
) {
348 getdata(f
->vfs
).maybe_reconnect(f
->io
.cluster
);
350 return SQLITE_IOERR_READ
;
352 df(5) << "= " << rc
<< dendl
;
// Only reads that did not fail are timed under opf_read.
353 auto end
= ceph::coarse_mono_clock::now();
354 getdata(f
->vfs
).logger
->tinc(P_OPF_READ
, end
-start
);
// Zero the bytes in buf[rc, len) that the striper did not fill.
356 memset((unsigned char*)buf
+rc
, 0, len
-rc
);
357 return SQLITE_IOERR_SHORT_READ
;
// xWrite handler: write `len` bytes from `buf` at offset `off` through the striper.
// A negative striper result is reported as SQLITE_IOERR_WRITE.
364 static int Write(sqlite3_file
*file
, const void *buf
, int len
, sqlite_int64 off
)
366 auto f
= (cephsqlite_file
*)file
;
367 auto start
= ceph::coarse_mono_clock::now();
368 df(5) << off
<< "~" << len
<< dendl
;
370 if (int rc
= f
->io
.rs
->write(buf
, len
, off
); rc
< 0) {
371 df(5) << "write failed: " << cpp_strerror(rc
) << dendl
;
// A blocklisted client asks the appdata to reconnect, passing the handle this file was using.
372 if (rc
== -EBLOCKLISTED
) {
373 getdata(f
->vfs
).maybe_reconnect(f
->io
.cluster
);
375 return SQLITE_IOERR_WRITE
;
377 df(5) << "= " << rc
<< dendl
;
// Only writes that did not fail are timed under opf_write.
378 auto end
= ceph::coarse_mono_clock::now();
379 getdata(f
->vfs
).logger
->tinc(P_OPF_WRITE
, end
-start
);
// xTruncate handler: truncate the striped file to `size` bytes.
385 static int Truncate(sqlite3_file
*file
, sqlite_int64 size
)
387 auto f
= (cephsqlite_file
*)file
;
388 auto start
= ceph::coarse_mono_clock::now();
389 df(5) << size
<< dendl
;
391 if (int rc
= f
->io
.rs
->truncate(size
); rc
< 0) {
392 df(5) << "truncate failed: " << cpp_strerror(rc
) << dendl
;
// A blocklisted client asks the appdata to reconnect, passing the handle this file was using.
393 if (rc
== -EBLOCKLISTED
) {
394 getdata(f
->vfs
).maybe_reconnect(f
->io
.cluster
);
// Record the elapsed time under the opf_truncate counter.
399 auto end
= ceph::coarse_mono_clock::now();
400 getdata(f
->vfs
).logger
->tinc(P_OPF_TRUNCATE
, end
-start
);
// xSync handler: flush the striper. `flags` is only logged by the statements shown here.
404 static int Sync(sqlite3_file
*file
, int flags
)
406 auto f
= (cephsqlite_file
*)file
;
407 auto start
= ceph::coarse_mono_clock::now();
408 df(5) << flags
<< dendl
;
410 if (int rc
= f
->io
.rs
->flush(); rc
< 0) {
411 df(5) << "failed: " << cpp_strerror(rc
) << dendl
;
// A blocklisted client asks the appdata to reconnect, passing the handle this file was using.
412 if (rc
== -EBLOCKLISTED
) {
413 getdata(f
->vfs
).maybe_reconnect(f
->io
.cluster
);
418 df(5) << " = 0" << dendl
;
// Record the elapsed time under the opf_sync counter.
420 auto end
= ceph::coarse_mono_clock::now();
421 getdata(f
->vfs
).logger
->tinc(P_OPF_SYNC
, end
-start
);
// xFileSize handler: stat the striped file and store its size in *osize.
426 static int FileSize(sqlite3_file
*file
, sqlite_int64
*osize
)
428 auto f
= (cephsqlite_file
*)file
;
429 auto start
= ceph::coarse_mono_clock::now();
433 if (int rc
= f
->io
.rs
->stat(&size
); rc
< 0) {
434 df(5) << "stat failed: " << cpp_strerror(rc
) << dendl
;
// A blocklisted client asks the appdata to reconnect, passing the handle this file was using.
435 if (rc
== -EBLOCKLISTED
) {
436 getdata(f
->vfs
).maybe_reconnect(f
->io
.cluster
);
// NOTE(review): Read and Write report failures with SQLITE_IOERR_* codes,
// while any stat failure here becomes SQLITE_NOTFOUND - confirm this is intended.
438 return SQLITE_NOTFOUND
;
441 *osize
= (sqlite_int64
)size
;
443 df(5) << "= " << size
<< dendl
;
// Record the elapsed time under the opf_filesize counter.
445 auto end
= ceph::coarse_mono_clock::now();
446 getdata(f
->vfs
).logger
->tinc(P_OPF_FILESIZE
, end
-start
);
451 static bool parsepath(std::string_view path
, struct cephsqlite_fileloc
* fileloc
)
453 static const std::regex re1
{"^/*(\\*[[:digit:]]+):([[:alnum:]\\-_.]*)/([[:alnum:]\\-._]+)$"};
454 static const std::regex re2
{"^/*([[:alnum:]\\-_.]+):([[:alnum:]\\-_.]*)/([[:alnum:]\\-._]+)$"};
457 if (!std::regex_match(path
.data(), cm
, re1
)) {
458 if (!std::regex_match(path
.data(), cm
, re2
)) {
462 fileloc
->pool
= cm
[1];
463 fileloc
->radosns
= cm
[2];
464 fileloc
->name
= cm
[3];
// makestriper(): prepare `io` for the file at `loc`. It creates an IoCtx on
// the pool, applies the RADOS namespace, builds a SimpleRADOSStriper on the
// file name and configures it from cct. The cluster handle is moved into
// io->cluster at the end.
469 static int makestriper(sqlite3_vfs
* vfs
, cctptr cct
, rsptr cluster
, const cephsqlite_fileloc
& loc
, cephsqlite_fileio
* io
)
473 d(cct
,cluster
,10) << loc
<< dendl
;
// A pool component starting with '*' is a numeric pool id, opened with ioctx_create2().
476 if (loc
.pool
[0] == '*') {
478 int64_t id
= strict_strtoll(loc
.pool
.c_str()+1, 10, &err
);
// NOTE(review): the path regex only guarantees digits after '*'. Confirm
// strict_strtoll cannot report an error (e.g. out of range) for a long
// digit string, since that would abort the process here.
479 ceph_assert(err
.empty());
480 if (int rc
= cluster
->ioctx_create2(id
, io
->ioctx
); rc
< 0) {
// -ENOENT may come from a stale OSDMap: fetch the latest map once
// (guarded by gotmap), presumably to retry - confirm.
481 if (rc
== -ENOENT
&& !gotmap
) {
482 cluster
->wait_for_latest_osdmap();
486 d(cct
,cluster
,1) << "cannot create ioctx: " << cpp_strerror(rc
) << dendl
;
// Otherwise the pool component is a pool name.
490 if (int rc
= cluster
->ioctx_create(loc
.pool
.c_str(), io
->ioctx
); rc
< 0) {
491 if (rc
== -ENOENT
&& !gotmap
) {
492 cluster
->wait_for_latest_osdmap();
496 d(cct
,cluster
,1) << "cannot create ioctx: " << cpp_strerror(rc
) << dendl
;
// An empty namespace component leaves the IoCtx namespace unchanged.
501 if (!loc
.radosns
.empty())
502 io
->ioctx
.set_namespace(loc
.radosns
);
// Build the striper and configure it from the cephsqlite_* options.
504 io
->rs
= std::make_unique
<SimpleRADOSStriper
>(io
->ioctx
, loc
.name
);
505 io
->rs
->set_logger(getdata(vfs
).striper_logger
);
506 io
->rs
->set_lock_timeout(cct
->_conf
.get_val
<std::chrono::milliseconds
>("cephsqlite_lock_renewal_timeout"));
507 io
->rs
->set_lock_interval(cct
->_conf
.get_val
<std::chrono::milliseconds
>("cephsqlite_lock_renewal_interval"));
508 io
->rs
->set_blocklist_the_dead(cct
->_conf
.get_val
<bool>("cephsqlite_blocklist_dead_locker"));
// `cluster` was passed by value, so moving it here hands this copy to io.
509 io
->cluster
= std::move(cluster
);
515 static int SectorSize(sqlite3_file
* sf
)
517 static const int size
= 65536;
518 auto start
= ceph::coarse_mono_clock::now();
519 auto f
= (cephsqlite_file
*)sf
;
520 df(5) << " = " << size
<< dendl
;
521 auto end
= ceph::coarse_mono_clock::now();
522 getdata(f
->vfs
).logger
->tinc(P_OPF_SECTORSIZE
, end
-start
);
526 static int FileControl(sqlite3_file
* sf
, int op
, void *arg
)
528 auto f
= (cephsqlite_file
*)sf
;
529 auto start
= ceph::coarse_mono_clock::now();
530 df(5) << op
<< ", " << arg
<< dendl
;
531 auto end
= ceph::coarse_mono_clock::now();
532 getdata(f
->vfs
).logger
->tinc(P_OPF_FILECONTROL
, end
-start
);
533 return SQLITE_NOTFOUND
;
// xDeviceCharacteristics handler: builds a constant mask of SQLITE_IOCAP_*
// bits in `c` and records the call under the opf_devicecharacteristics counter.
536 static int DeviceCharacteristics(sqlite3_file
* sf
)
538 auto f
= (cephsqlite_file
*)sf
;
539 auto start
= ceph::coarse_mono_clock::now();
// Capability bits OR-ed into the mask.
541 static const int c
= 0
543 |SQLITE_IOCAP_POWERSAFE_OVERWRITE
544 |SQLITE_IOCAP_UNDELETABLE_WHEN_OPEN
545 |SQLITE_IOCAP_SAFE_APPEND
547 auto end
= ceph::coarse_mono_clock::now();
548 getdata(f
->vfs
).logger
->tinc(P_OPF_DEVICECHARACTERISTICS
, end
-start
);
// xOpen handler: construct a cephsqlite_file in the storage behind `file`,
// parse `name` into a location, build the striper and open the striped file
// (creating it first when SQLITE_OPEN_CREATE is set).
552 static int Open(sqlite3_vfs
*vfs
, const char *name
, sqlite3_file
*file
,
553 int flags
, int *oflags
)
// Method table installed into the file's base.pMethods on success.
555 static const sqlite3_io_methods io
= {
560 Truncate
, /* xTruncate */
562 FileSize
, /* xFileSize */
564 Unlock
, /* xUnlock */
565 CheckReservedLock
, /* xCheckReservedLock */
566 FileControl
, /* xFileControl */
567 SectorSize
, /* xSectorSize */
568 DeviceCharacteristics
/* xDeviceCharacteristics */
571 auto start
= ceph::coarse_mono_clock::now();
573 auto [cct
, cluster
] = getdata(vfs
).get_cluster();
575 /* we are not going to create temporary files */
577 dv(-1) << " cannot open temporary database" << dendl
;
578 return SQLITE_CANTOPEN
;
580 auto path
= std::string_view(name
);
// NOTE(review): the ":memory:" case logs the same "temporary database"
// message as the case above - consider a distinct message.
581 if (path
== ":memory:") {
582 dv(-1) << " cannot open temporary database" << dendl
;
586 dv(5) << path
<< " flags=" << std::hex
<< flags
<< dendl
;
// Placement-new: the object lives in the storage SQLite passed in as `file`.
588 auto f
= new (file
)cephsqlite_file();
590 if (!parsepath(path
, &f
->loc
)) {
591 ceph_assert(0); /* xFullPathname validates! */
596 if (int rc
= makestriper(vfs
, cct
, cluster
, f
->loc
, &f
->io
); rc
< 0) {
// Undo the placement-new before reporting the failure.
597 f
->~cephsqlite_file();
598 dv(-1) << "cannot open striper" << dendl
;
602 if (flags
& SQLITE_OPEN_CREATE
) {
603 dv(10) << "OPEN_CREATE" << dendl
;
// -EEXIST from create() is not treated as a failure.
604 if (int rc
= f
->io
.rs
->create(); rc
< 0 && rc
!= -EEXIST
) {
605 if (rc
== -ENOENT
&& !gotmap
) {
606 /* we may have an out of date OSDMap which cancels the op in the
607 * Objecter. Try to get a new one and retry. This is mostly noticeable
608 * in testing when pools are getting created/deleted left and right.
610 dv(5) << "retrying create after getting latest OSDMap" << dendl
;
611 cluster
->wait_for_latest_osdmap();
615 dv(5) << "file cannot be created: " << cpp_strerror(rc
) << dendl
;
620 if (int rc
= f
->io
.rs
->open(); rc
< 0) {
621 if (rc
== -ENOENT
&& !gotmap
) {
622 /* See comment above for create case. */
623 dv(5) << "retrying open after getting latest OSDMap" << dendl
;
624 cluster
->wait_for_latest_osdmap();
628 dv(10) << "cannot open striper: " << cpp_strerror(rc
) << dendl
;
// Success: publish the method table and record the elapsed time under op_open.
635 f
->base
.pMethods
= &io
;
636 auto end
= ceph::coarse_mono_clock::now();
637 getdata(vfs
).logger
->tinc(P_OP_OPEN
, end
-start
);
642 ** Delete the file identified by argument path. If the dsync parameter
643 ** is non-zero, then ensure the file-system modification to delete the
644 ** file has been synced to disk before returning.
// xDelete handler: parse `path`, build a temporary striper for it, take the
// striper lock and remove the striped file. `dsync` is only logged by the
// statements shown here.
646 static int Delete(sqlite3_vfs
* vfs
, const char* path
, int dsync
)
648 auto start
= ceph::coarse_mono_clock::now();
649 auto [cct
, cluster
] = getdata(vfs
).get_cluster();
650 dv(5) << "'" << path
<< "', " << dsync
<< dendl
;
652 cephsqlite_fileloc fileloc
;
653 if (!parsepath(path
, &fileloc
)) {
654 dv(5) << "path does not parse!" << dendl
;
655 return SQLITE_NOTFOUND
;
// Function-local I/O state, separate from any open cephsqlite_file.
658 cephsqlite_fileio io
;
659 if (int rc
= makestriper(vfs
, cct
, cluster
, fileloc
, &io
); rc
< 0) {
660 dv(-1) << "cannot open striper" << dendl
;
// The striper lock is taken before removing.
664 if (int rc
= io
.rs
->lock(0); rc
< 0) {
668 if (int rc
= io
.rs
->remove(); rc
< 0) {
669 dv(5) << "= " << rc
<< dendl
;
670 return SQLITE_IOERR_DELETE
;
673 /* No need to unlock */
674 dv(5) << "= 0" << dendl
;
// Only the success path shown here is timed under op_delete.
675 auto end
= ceph::coarse_mono_clock::now();
676 getdata(vfs
).logger
->tinc(P_OP_DELETE
, end
-start
);
682 ** Query the file-system to see if the named file exists, is readable or
683 ** is both readable and writable.
// xAccess handler: parse `path`, build a temporary striper, then probe the
// file with open() followed by stat(). `flags` is only logged by the
// statements shown here.
685 static int Access(sqlite3_vfs
* vfs
, const char* path
, int flags
, int* result
)
687 auto start
= ceph::coarse_mono_clock::now();
688 auto [cct
, cluster
] = getdata(vfs
).get_cluster();
689 dv(5) << path
<< " " << std::hex
<< flags
<< dendl
;
691 cephsqlite_fileloc fileloc
;
692 if (!parsepath(path
, &fileloc
)) {
693 dv(5) << "path does not parse!" << dendl
;
694 return SQLITE_NOTFOUND
;
// Function-local I/O state, separate from any open cephsqlite_file.
697 cephsqlite_fileio io
;
698 if (int rc
= makestriper(vfs
, cct
, cluster
, fileloc
, &io
); rc
< 0) {
699 dv(-1) << "cannot open striper" << dendl
;
703 if (int rc
= io
.rs
->open(); rc
< 0) {
708 dv(10) << "cannot open striper: " << cpp_strerror(rc
) << dendl
;
// The stat outcome is logged: the error code on failure, "= 0" on success.
715 if (int rc
= io
.rs
->stat(&size
); rc
< 0) {
716 dv(5) << "= " << rc
<< " (" << cpp_strerror(rc
) << ")" << dendl
;
719 dv(5) << "= 0" << dendl
;
// Record the elapsed time under the op_access counter.
723 auto end
= ceph::coarse_mono_clock::now();
724 getdata(vfs
).logger
->tinc(P_OP_ACCESS
, end
-start
);
728 /* This method is only called once for each database. It provides a chance to
729 * reformat the path into a canonical format.
// xFullPathname handler: validate `ipath` with parsepath() and write the
// canonical form "<pool>:<namespace>/<name>" into `opath`, a buffer of
// `opathlen` bytes.
731 static int FullPathname(sqlite3_vfs
* vfs
, const char* ipath
, int opathlen
, char* opath
)
733 auto start
= ceph::coarse_mono_clock::now();
734 auto path
= std::string_view(ipath
);
735 auto [cct
, cluster
] = getdata(vfs
).get_cluster();
736 dv(5) << "1: " << path
<< dendl
;
738 cephsqlite_fileloc fileloc
;
739 if (!parsepath(path
, &fileloc
)) {
740 dv(5) << "path does not parse!" << dendl
;
741 return SQLITE_NOTFOUND
;
743 dv(5) << " parsed " << fileloc
<< dendl
;
745 auto p
= fmt::format("{}:{}/{}", fileloc
.pool
, fileloc
.radosns
, fileloc
.name
);
// ">=" leaves room for the terminating NUL, which makes the strcpy below safe.
746 if (p
.size() >= (size_t)opathlen
) {
747 dv(5) << "path too long!" << dendl
;
748 return SQLITE_CANTOPEN
;
750 strcpy(opath
, p
.c_str());
751 dv(5) << " output " << p
<< dendl
;
// Only the success path shown here is timed under op_fullpathname.
753 auto end
= ceph::coarse_mono_clock::now();
754 getdata(vfs
).logger
->tinc(P_OP_FULLPATHNAME
, end
-start
);
// xCurrentTimeInt64 handler: store the current time in *time as milliseconds
// (the Ceph clock in ms plus a fixed Julian-day offset).
758 static int CurrentTime(sqlite3_vfs
* vfs
, sqlite3_int64
* time
)
760 auto start
= ceph::coarse_mono_clock::now();
761 auto [cct
, cluster
] = getdata(vfs
).get_cluster();
// Logs the address of the output slot, not a time value.
762 dv(5) << time
<< dendl
;
764 auto t
= ceph_clock_now();
// 2440587.5 Julian days times 86400000 ms per day is the fixed offset added
// to the clock's milliseconds. The sum is evaluated in double before being
// stored in the integer *time.
765 *time
= t
.to_msec() + 2440587.5*86400000; /* julian days since 1970 converted to ms */
// Record the elapsed time under the op_currenttime counter.
767 auto end
= ceph::coarse_mono_clock::now();
768 getdata(vfs
).logger
->tinc(P_OP_CURRENTTIME
, end
-start
);
// Public API: open the "ceph" VFS appdata with the caller-supplied
// CephContext and hand back the cluster address string through *ident.
// The extension must already be initialised by SQLite and the "ceph" VFS
// registered; both error logs below call this an API violation otherwise.
772 LIBCEPHSQLITE_API
int cephsqlite_setcct(CephContext
* _cct
, char** ident
)
774 ldout(_cct
, 1) << "cct: " << _cct
<< dendl
;
// sqlite3_api is still null when the extension has not been initialised.
776 if (sqlite3_api
== nullptr) {
777 lderr(_cct
) << "API violation: must have sqlite3 init libcephsqlite" << dendl
;
781 auto vfs
= sqlite3_vfs_find("ceph");
783 lderr(_cct
) << "API violation: must have sqlite3 init libcephsqlite" << dendl
;
787 auto& appd
= getdata(vfs
);
788 if (int rc
= appd
.open(_cct
); rc
< 0) {
792 auto [cct
, cluster
] = appd
.get_cluster();
794 auto s
= cluster
->get_addrs();
// The string is allocated with strdup().
// NOTE(review): presumably the caller must free() it, and strdup() can return null - confirm.
796 *ident
= strdup(s
.c_str());
799 ldout(cct
, 1) << "complete" << dendl
;
// SQL function ceph_perf(): return both perf counter sets (VFS and striper)
// as JSON text inside a "ceph_perf" object section.
804 static void f_perf(sqlite3_context
* ctx
, int argc
, sqlite3_value
** argv
)
// The VFS pointer is the user data passed when the function was registered in autoreg().
806 auto vfs
= (sqlite3_vfs
*)sqlite3_user_data(ctx
);
807 auto [cct
, cluster
] = getdata(vfs
).get_cluster();
809 auto&& appd
= getdata(vfs
);
810 JSONFormatter
f(false);
811 f
.open_object_section("ceph_perf");
812 appd
.logger
->dump_formatted(&f
, false, false);
813 appd
.striper_logger
->dump_formatted(&f
, false, false);
816 CachedStackStringStream css
;
818 auto sv
= css
->strv();
819 dv(20) << " = " << sv
<< dendl
;
// SQLITE_TRANSIENT: SQLite makes its own copy, so `sv` need not outlive this call.
820 sqlite3_result_text(ctx
, sv
.data(), sv
.size(), SQLITE_TRANSIENT
);
// SQL function ceph_status(): return the RADOS instance id ("id") and the
// client addresses ("addr") as JSON text inside a "ceph_status" object section.
824 static void f_status(sqlite3_context
* ctx
, int argc
, sqlite3_value
** argv
)
// The VFS pointer is the user data passed when the function was registered in autoreg().
826 auto vfs
= (sqlite3_vfs
*)sqlite3_user_data(ctx
);
827 auto [cct
, cluster
] = getdata(vfs
).get_cluster();
829 JSONFormatter
f(false);
830 f
.open_object_section("ceph_status");
831 f
.dump_int("id", cluster
->get_instance_id());
832 f
.dump_string("addr", cluster
->get_addrs());
835 CachedStackStringStream css
;
837 auto sv
= css
->strv();
838 dv(20) << " = " << sv
<< dendl
;
// SQLITE_TRANSIENT: SQLite makes its own copy, so `sv` need not outlive this call.
839 sqlite3_result_text(ctx
, sv
.data(), sv
.size(), SQLITE_TRANSIENT
);
// Per-connection hook: register the zero-argument SQL functions ceph_perf()
// and ceph_status() on `db`, passing the "ceph" VFS as their user data.
// It is installed through sqlite3_auto_extension() and also called directly
// from sqlite3_cephsqlite_init().
843 static int autoreg(sqlite3
* db
, char** err
, const struct sqlite3_api_routines
* thunk
)
845 auto vfs
= sqlite3_vfs_find("ceph");
// A missing "ceph" VFS aborts the process.
847 ceph_abort("ceph vfs not found");
850 if (int rc
= sqlite3_create_function(db
, "ceph_perf", 0, SQLITE_UTF8
, vfs
, f_perf
, nullptr, nullptr); rc
) {
854 if (int rc
= sqlite3_create_function(db
, "ceph_status", 0, SQLITE_UTF8
, vfs
, f_status
, nullptr, nullptr); rc
) {
861 /* You may wonder why we have an atexit handler? After all, atexit/exit creates
862 * a mess for multithreaded programs. Well, sqlite3 does not have an API for
863 * orderly removal of extensions. And, in fact, any API we might make
864 * unofficially (such as "sqlite3_cephsqlite_fini") would potentially race with
865 * other threads interacting with sqlite3 + the "ceph" VFS. There is a method
866 * for removing a VFS but it's not called by sqlite3 in any error scenario and
867 * there is no mechanism within sqlite3 to tell a VFS to unregister itself.
869 * This all would be mostly okay if /bin/sqlite3 did not call exit(3), but it
870 * does. (This occurs only for the sqlite3 binary, not when used as a library.)
871 * exit(3) calls destructors on all static-duration structures for the program.
872 * This breaks any outstanding threads created by the librados handle in all
873 * sorts of fantastic ways from C++ exceptions to memory faults. In general,
874 * Ceph libraries are not tolerant of exit(3) (_exit(3) is okay!). Applications
875 * must clean up after themselves or _exit(3).
877 * So, we have an atexit handler for libcephsqlite. This simply shuts down the
878 * RADOS handle. We can be assured that this occurs before any ceph library
879 * static-duration structures are destructed due to ordering guarantees by
880 * exit(3). Generally, we only see this called when the VFS is used by
881 * /bin/sqlite3 and only during sqlite3 error scenarios (like I/O errors
882 * arising from blocklisting).
// atexit handler: when the "ceph" VFS is still registered, take its appdata
// and clear vfs->pAppData. After that, getdata() on this VFS would
// dereference a null pointer.
885 static void cephsqlite_atexit()
887 if (auto vfs
= sqlite3_vfs_find("ceph"); vfs
) {
889 auto&& appd
= getdata(vfs
);
891 vfs
->pAppData
= nullptr;
// SQLite extension entry point. It allocates and registers the "ceph" VFS
// with its appdata, installs the atexit handler, registers autoreg() as an
// auto-extension and applies it to `db`. On success it returns
// SQLITE_OK_LOAD_PERMANENTLY.
896 LIBCEPHSQLITE_API
int sqlite3_cephsqlite_init(sqlite3
* db
, char** err
, const sqlite3_api_routines
* api
)
898 SQLITE_EXTENSION_INIT2(api
);
900 auto vfs
= sqlite3_vfs_find("ceph");
// NOTE(review): the calloc() result is dereferenced below without a null
// check in the statements shown here - confirm allocation failure is handled.
902 vfs
= (sqlite3_vfs
*) calloc(1, sizeof(sqlite3_vfs
));
903 auto appd
= new cephsqlite_appdata
;
// Size of the storage SQLite passes to Open(), which constructs a cephsqlite_file there.
905 vfs
->szOsFile
= sizeof(struct cephsqlite_file
);
906 vfs
->mxPathname
= 4096;
908 vfs
->pAppData
= appd
;
910 vfs
->xDelete
= Delete
;
911 vfs
->xAccess
= Access
;
912 vfs
->xFullPathname
= FullPathname
;
913 vfs
->xCurrentTimeInt64
= CurrentTime
;
// Second argument 0: do not make this the default VFS.
914 if (int rc
= sqlite3_vfs_register(vfs
, 0); rc
) {
921 if (int rc
= std::atexit(cephsqlite_atexit
); rc
) {
922 return SQLITE_INTERNAL
;
// Register autoreg() for future connections, then apply it to this one.
925 if (int rc
= sqlite3_auto_extension((void(*)(void))autoreg
); rc
) {
928 if (int rc
= autoreg(db
, err
, api
); rc
) {
932 return SQLITE_OK_LOAD_PERMANENTLY
;