#define dout_subsys ceph_subsys_cephsqlite
#undef dout_prefix
#define dout_prefix *_dout << "cephsqlite: " << __func__ << ": "
-#define d(vfs,lvl) ldout(getcct(vfs), (lvl)) << "(client." << getdata(vfs).cluster.get_instance_id() << ") "
-#define dv(lvl) d(vfs,(lvl))
-#define df(lvl) d(f->vfs,(lvl)) << f->loc << " "
+#define d(cct,cluster,lvl) ldout((cct), (lvl)) << "(client." << cluster->get_instance_id() << ") "
+#define dv(lvl) d(cct,cluster,(lvl))
+#define df(lvl) d(f->io.cct,f->io.cluster,(lvl)) << f->loc << " "
enum {
P_FIRST = 0xf0000,
P_LAST,
};
+using cctptr = boost::intrusive_ptr<CephContext>;
+using rsptr = std::shared_ptr<librados::Rados>;
+
struct cephsqlite_appdata {
~cephsqlite_appdata() {
+ {
+ std::scoped_lock lock(cluster_mutex);
+ _disconnect();
+ }
if (logger) {
cct->get_perfcounters_collection()->remove(logger.get());
}
if (striper_logger) {
cct->get_perfcounters_collection()->remove(striper_logger.get());
}
- cluster.shutdown();
}
int setup_perf() {
ceph_assert(cct);
cct->get_perfcounters_collection()->add(striper_logger.get());
return 0;
}
- int init_cluster() {
+
+ std::pair<cctptr, rsptr> get_cluster() {
+ std::scoped_lock lock(cluster_mutex);
+ if (!cct) {
+ if (int rc = _open(nullptr); rc < 0) {
+ ceph_abort("could not open connection to ceph");
+ }
+ }
+ return {cct, cluster};
+ }
+ int connect() {
+ std::scoped_lock lock(cluster_mutex);
+ return _connect();
+ }
+ int reconnect() {
+ std::scoped_lock lock(cluster_mutex);
+ _disconnect();
+ return _connect();
+ }
+ int maybe_reconnect(rsptr _cluster) {
+ std::scoped_lock lock(cluster_mutex);
+ if (!cluster || cluster == _cluster) {
+ ldout(cct, 10) << "reconnecting to RADOS" << dendl;
+ _disconnect();
+ return _connect();
+ } else {
+ ldout(cct, 10) << "already reconnected" << dendl;
+ return 0;
+ }
+ }
+ int open(CephContext* _cct) {
+ std::scoped_lock lock(cluster_mutex);
+ return _open(_cct);
+ }
+
+ std::unique_ptr<PerfCounters> logger;
+ std::shared_ptr<PerfCounters> striper_logger;
+
+private:
+ int _open(CephContext* _cct) {
+ if (!_cct) {
+ std::vector<const char*> env_args;
+ env_to_vec(env_args, "CEPH_ARGS");
+ std::string cluster, conf_file_list; // unused
+ CephInitParameters iparams = ceph_argparse_early_args(env_args, CEPH_ENTITY_TYPE_CLIENT, &cluster, &conf_file_list);
+ cct = cctptr(common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, 0), false);
+ cct->_conf.parse_config_files(nullptr, &std::cerr, 0);
+ cct->_conf.parse_env(cct->get_module_type()); // environment variables override
+ cct->_conf.apply_changes(nullptr);
+ common_init_finish(cct.get());
+ } else {
+ cct = cctptr(_cct);
+ }
+
+ if (int rc = setup_perf(); rc < 0) {
+ return rc;
+ }
+
+ if (int rc = _connect(); rc < 0) {
+ return rc;
+ }
+
+ return 0;
+ }
+ void _disconnect() {
+ if (cluster) {
+ cluster.reset();
+ }
+ }
+ int _connect() {
ceph_assert(cct);
+ auto _cluster = rsptr(new librados::Rados());
ldout(cct, 5) << "initializing RADOS handle as " << cct->_conf->name << dendl;
- if (int rc = cluster.init_with_context(cct.get()); rc < 0) {
+ if (int rc = _cluster->init_with_context(cct.get()); rc < 0) {
lderr(cct) << "cannot initialize RADOS: " << cpp_strerror(rc) << dendl;
return rc;
}
- if (int rc = cluster.connect(); rc < 0) {
+ if (int rc = _cluster->connect(); rc < 0) {
lderr(cct) << "cannot connect: " << cpp_strerror(rc) << dendl;
return rc;
}
- auto s = cluster.get_addrs();
+ auto s = _cluster->get_addrs();
ldout(cct, 5) << "completed connection to RADOS with address " << s << dendl;
+ cluster = std::move(_cluster);
return 0;
}
- boost::intrusive_ptr<CephContext> cct;
- std::unique_ptr<PerfCounters> logger;
- std::shared_ptr<PerfCounters> striper_logger;
- librados::Rados cluster;
+ ceph::mutex cluster_mutex = ceph::make_mutex("libcephsqlite");;
+ cctptr cct;
+ rsptr cluster;
};
struct cephsqlite_fileloc {
};
struct cephsqlite_fileio {
+ cctptr cct;
+ rsptr cluster; // anchor for ioctx
librados::IoCtx ioctx;
std::unique_ptr<SimpleRADOSStriper> rs;
};
#define getdata(vfs) (*((cephsqlite_appdata*)((vfs)->pAppData)))
-static CephContext* getcct(sqlite3_vfs* vfs)
-{
- auto&& appd = getdata(vfs);
- auto& cct = appd.cct;
- if (cct) {
- return cct.get();
- }
-
- /* bootstrap cct */
- std::vector<const char*> env_args;
- env_to_vec(env_args, "CEPH_ARGS");
- std::string cluster, conf_file_list; // unused
- CephInitParameters iparams = ceph_argparse_early_args(env_args, CEPH_ENTITY_TYPE_CLIENT, &cluster, &conf_file_list);
- cct = boost::intrusive_ptr<CephContext>(common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, 0), false);
- cct->_conf.parse_config_files(nullptr, &std::cerr, 0);
- cct->_conf.parse_env(cct->get_module_type()); // environment variables override
- cct->_conf.apply_changes(nullptr);
- common_init_finish(cct.get());
-
- if (int rc = appd.setup_perf(); rc < 0) {
- ceph_abort("cannot setup perf counters");
- }
-
- if (int rc = appd.init_cluster(); rc < 0) {
- ceph_abort("cannot setup RADOS cluster handle");
- }
-
- return cct.get();
-}
-
static int Lock(sqlite3_file *file, int ilock)
{
auto f = (cephsqlite_file*)file;
if (!f->io.rs->is_locked() && ilock > SQLITE_LOCK_NONE) {
if (int rc = f->io.rs->lock(0); rc < 0) {
df(5) << "failed: " << rc << dendl;
+ if (rc == -EBLOCKLISTED) {
+ getdata(f->vfs).maybe_reconnect(f->io.cluster);
+ }
return SQLITE_IOERR;
}
}
if (ilock <= SQLITE_LOCK_NONE && SQLITE_LOCK_NONE < lock) {
if (int rc = f->io.rs->unlock(); rc < 0) {
df(5) << "failed: " << rc << dendl;
+ if (rc == -EBLOCKLISTED) {
+ getdata(f->vfs).maybe_reconnect(f->io.cluster);
+ }
return SQLITE_IOERR;
}
}
if (int rc = f->io.rs->read(buf, len, off); rc < 0) {
df(5) << "read failed: " << cpp_strerror(rc) << dendl;
+ if (rc == -EBLOCKLISTED) {
+ getdata(f->vfs).maybe_reconnect(f->io.cluster);
+ }
return SQLITE_IOERR_READ;
} else {
df(5) << "= " << rc << dendl;
auto end = ceph::coarse_mono_clock::now();
getdata(f->vfs).logger->tinc(P_OPF_READ, end-start);
if (rc < len) {
- memset(buf, 0, len-rc);
+ memset((unsigned char*)buf+rc, 0, len-rc);
return SQLITE_IOERR_SHORT_READ;
} else {
return SQLITE_OK;
if (int rc = f->io.rs->write(buf, len, off); rc < 0) {
df(5) << "write failed: " << cpp_strerror(rc) << dendl;
+ if (rc == -EBLOCKLISTED) {
+ getdata(f->vfs).maybe_reconnect(f->io.cluster);
+ }
return SQLITE_IOERR_WRITE;
} else {
df(5) << "= " << rc << dendl;
if (int rc = f->io.rs->truncate(size); rc < 0) {
df(5) << "truncate failed: " << cpp_strerror(rc) << dendl;
+ if (rc == -EBLOCKLISTED) {
+ getdata(f->vfs).maybe_reconnect(f->io.cluster);
+ }
return SQLITE_IOERR;
}
if (int rc = f->io.rs->flush(); rc < 0) {
df(5) << "failed: " << cpp_strerror(rc) << dendl;
+ if (rc == -EBLOCKLISTED) {
+ getdata(f->vfs).maybe_reconnect(f->io.cluster);
+ }
return SQLITE_IOERR;
}
uint64_t size = 0;
if (int rc = f->io.rs->stat(&size); rc < 0) {
df(5) << "stat failed: " << cpp_strerror(rc) << dendl;
+ if (rc == -EBLOCKLISTED) {
+ getdata(f->vfs).maybe_reconnect(f->io.cluster);
+ }
return SQLITE_NOTFOUND;
}
return true;
}
-static int makestriper(sqlite3_vfs* vfs, const cephsqlite_fileloc& loc, cephsqlite_fileio* io)
+static int makestriper(sqlite3_vfs* vfs, cctptr cct, rsptr cluster, const cephsqlite_fileloc& loc, cephsqlite_fileio* io)
{
- auto&& appd = getdata(vfs);
- auto& cct = appd.cct;
- auto& cluster = appd.cluster;
bool gotmap = false;
- dv(10) << loc << dendl;
+ d(cct,cluster,10) << loc << dendl;
enoent_retry:
if (loc.pool[0] == '*') {
std::string err;
int64_t id = strict_strtoll(loc.pool.c_str()+1, 10, &err);
ceph_assert(err.empty());
- if (int rc = cluster.ioctx_create2(id, io->ioctx); rc < 0) {
+ if (int rc = cluster->ioctx_create2(id, io->ioctx); rc < 0) {
if (rc == -ENOENT && !gotmap) {
- cluster.wait_for_latest_osdmap();
+ cluster->wait_for_latest_osdmap();
gotmap = true;
goto enoent_retry;
}
- dv(10) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
+ d(cct,cluster,1) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
return rc;
}
} else {
- if (int rc = cluster.ioctx_create(loc.pool.c_str(), io->ioctx); rc < 0) {
+ if (int rc = cluster->ioctx_create(loc.pool.c_str(), io->ioctx); rc < 0) {
if (rc == -ENOENT && !gotmap) {
- cluster.wait_for_latest_osdmap();
+ cluster->wait_for_latest_osdmap();
gotmap = true;
goto enoent_retry;
}
- dv(10) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
+ d(cct,cluster,1) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
return rc;
}
}
io->ioctx.set_namespace(loc.radosns);
io->rs = std::make_unique<SimpleRADOSStriper>(io->ioctx, loc.name);
- io->rs->set_logger(appd.striper_logger);
+ io->rs->set_logger(getdata(vfs).striper_logger);
io->rs->set_lock_timeout(cct->_conf.get_val<std::chrono::milliseconds>("cephsqlite_lock_renewal_timeout"));
io->rs->set_lock_interval(cct->_conf.get_val<std::chrono::milliseconds>("cephsqlite_lock_renewal_interval"));
io->rs->set_blocklist_the_dead(cct->_conf.get_val<bool>("cephsqlite_blocklist_dead_locker"));
+ io->cluster = std::move(cluster);
+ io->cct = cct;
return 0;
}
auto start = ceph::coarse_mono_clock::now();
bool gotmap = false;
- auto& cluster = getdata(vfs).cluster;
+ auto [cct, cluster] = getdata(vfs).get_cluster();
/* we are not going to create temporary files */
if (name == NULL) {
f->flags = flags;
enoent_retry:
- if (int rc = makestriper(vfs, f->loc, &f->io); rc < 0) {
+ if (int rc = makestriper(vfs, cct, cluster, f->loc, &f->io); rc < 0) {
f->~cephsqlite_file();
- dv(5) << "cannot open striper" << dendl;
+ dv(-1) << "cannot open striper" << dendl;
return SQLITE_IOERR;
}
* in testing when pools are getting created/deleted left and right.
*/
dv(5) << "retrying create after getting latest OSDMap" << dendl;
- cluster.wait_for_latest_osdmap();
+ cluster->wait_for_latest_osdmap();
gotmap = true;
goto enoent_retry;
}
if (rc == -ENOENT && !gotmap) {
/* See comment above for create case. */
dv(5) << "retrying open after getting latest OSDMap" << dendl;
- cluster.wait_for_latest_osdmap();
+ cluster->wait_for_latest_osdmap();
gotmap = true;
goto enoent_retry;
}
static int Delete(sqlite3_vfs* vfs, const char* path, int dsync)
{
auto start = ceph::coarse_mono_clock::now();
+ auto [cct, cluster] = getdata(vfs).get_cluster();
dv(5) << "'" << path << "', " << dsync << dendl;
cephsqlite_fileloc fileloc;
}
cephsqlite_fileio io;
- if (int rc = makestriper(vfs, fileloc, &io); rc < 0) {
- dv(5) << "cannot open striper" << dendl;
+ if (int rc = makestriper(vfs, cct, cluster, fileloc, &io); rc < 0) {
+ dv(-1) << "cannot open striper" << dendl;
return SQLITE_IOERR;
}
static int Access(sqlite3_vfs* vfs, const char* path, int flags, int* result)
{
auto start = ceph::coarse_mono_clock::now();
+ auto [cct, cluster] = getdata(vfs).get_cluster();
dv(5) << path << " " << std::hex << flags << dendl;
cephsqlite_fileloc fileloc;
}
cephsqlite_fileio io;
- if (int rc = makestriper(vfs, fileloc, &io); rc < 0) {
- dv(5) << "cannot open striper" << dendl;
+ if (int rc = makestriper(vfs, cct, cluster, fileloc, &io); rc < 0) {
+ dv(-1) << "cannot open striper" << dendl;
return SQLITE_IOERR;
}
{
auto start = ceph::coarse_mono_clock::now();
auto path = std::string_view(ipath);
-
+ auto [cct, cluster] = getdata(vfs).get_cluster();
dv(5) << "1: " << path << dendl;
cephsqlite_fileloc fileloc;
static int CurrentTime(sqlite3_vfs* vfs, sqlite3_int64* time)
{
auto start = ceph::coarse_mono_clock::now();
+ auto [cct, cluster] = getdata(vfs).get_cluster();
dv(5) << time << dendl;
auto t = ceph_clock_now();
return SQLITE_OK;
}
-LIBCEPHSQLITE_API int cephsqlite_setcct(CephContext* cct, char** ident)
+LIBCEPHSQLITE_API int cephsqlite_setcct(CephContext* _cct, char** ident)
{
- ldout(cct, 1) << "cct: " << cct << dendl;
+ ldout(_cct, 1) << "cct: " << _cct << dendl;
if (sqlite3_api == nullptr) {
- lderr(cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
+ lderr(_cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
return -EINVAL;
}
auto vfs = sqlite3_vfs_find("ceph");
if (!vfs) {
- lderr(cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
+ lderr(_cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
return -EINVAL;
}
auto& appd = getdata(vfs);
- appd.cct = cct;
- if (int rc = appd.setup_perf(); rc < 0) {
- appd.cct = nullptr;
- return rc;
- }
- if (int rc = appd.init_cluster(); rc < 0) {
- appd.cct = nullptr;
+ if (int rc = appd.open(_cct); rc < 0) {
return rc;
}
- auto s = appd.cluster.get_addrs();
+ auto [cct, cluster] = appd.get_cluster();
+
+ auto s = cluster->get_addrs();
if (ident) {
*ident = strdup(s.c_str());
}
static void f_perf(sqlite3_context* ctx, int argc, sqlite3_value** argv)
{
auto vfs = (sqlite3_vfs*)sqlite3_user_data(ctx);
+ auto [cct, cluster] = getdata(vfs).get_cluster();
dv(10) << dendl;
auto&& appd = getdata(vfs);
JSONFormatter f(false);
static void f_status(sqlite3_context* ctx, int argc, sqlite3_value** argv)
{
auto vfs = (sqlite3_vfs*)sqlite3_user_data(ctx);
+ auto [cct, cluster] = getdata(vfs).get_cluster();
dv(10) << dendl;
- auto&& appd = getdata(vfs);
JSONFormatter f(false);
f.open_object_section("ceph_status");
- f.dump_int("id", appd.cluster.get_instance_id());
- f.dump_string("addr", appd.cluster.get_addrs());
+ f.dump_int("id", cluster->get_instance_id());
+ f.dump_string("addr", cluster->get_addrs());
f.close_section();
{
CachedStackStringStream css;