]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/libcephsqlite.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / libcephsqlite.cc
index f533780c548bd99bda9555b8902c995872a1f3cb..b4fb968413bfb0ef962782ab1268b0f2c5acc998 100644 (file)
@@ -54,9 +54,9 @@ SQLITE_EXTENSION_INIT1
 #define dout_subsys ceph_subsys_cephsqlite
 #undef dout_prefix
 #define dout_prefix *_dout << "cephsqlite: " << __func__ << ": "
-#define d(vfs,lvl) ldout(getcct(vfs), (lvl)) << "(client." << getdata(vfs).cluster.get_instance_id() << ") "
-#define dv(lvl) d(vfs,(lvl))
-#define df(lvl) d(f->vfs,(lvl)) << f->loc << " "
+#define d(cct,cluster,lvl) ldout((cct), (lvl)) << "(client." << cluster->get_instance_id() << ") "
+#define dv(lvl) d(cct,cluster,(lvl))
+#define df(lvl) d(f->io.cct,f->io.cluster,(lvl)) << f->loc << " "
 
 enum {
   P_FIRST = 0xf0000,
@@ -80,15 +80,21 @@ enum {
   P_LAST,
 };
 
+using cctptr = boost::intrusive_ptr<CephContext>;
+using rsptr = std::shared_ptr<librados::Rados>;
+
 struct cephsqlite_appdata {
   ~cephsqlite_appdata() {
+    {
+      std::scoped_lock lock(cluster_mutex);
+      _disconnect();
+    }
     if (logger) {
       cct->get_perfcounters_collection()->remove(logger.get());
     }
     if (striper_logger) {
       cct->get_perfcounters_collection()->remove(striper_logger.get());
     }
-    cluster.shutdown();
   }
   int setup_perf() {
     ceph_assert(cct);
@@ -118,26 +124,96 @@ struct cephsqlite_appdata {
     cct->get_perfcounters_collection()->add(striper_logger.get());
     return 0;
   }
-  int init_cluster() {
+
+  std::pair<cctptr, rsptr> get_cluster() {
+    std::scoped_lock lock(cluster_mutex);
+    if (!cct) {
+      if (int rc = _open(nullptr); rc < 0) {
+        ceph_abort("could not open connection to ceph");
+      }
+    }
+    return {cct, cluster};
+  }
+  int connect() {
+    std::scoped_lock lock(cluster_mutex);
+    return _connect();
+  }
+  int reconnect() {
+    std::scoped_lock lock(cluster_mutex);
+    _disconnect();
+    return _connect();
+  }
+  int maybe_reconnect(rsptr _cluster) {
+    std::scoped_lock lock(cluster_mutex);
+    if (!cluster || cluster == _cluster) {
+      ldout(cct, 10) << "reconnecting to RADOS" << dendl;
+      _disconnect();
+      return _connect();
+    } else {
+      ldout(cct, 10) << "already reconnected" << dendl;
+      return 0;
+    }
+  }
+  int open(CephContext* _cct) {
+    std::scoped_lock lock(cluster_mutex);
+    return _open(_cct);
+  }
+
+  std::unique_ptr<PerfCounters> logger;
+  std::shared_ptr<PerfCounters> striper_logger;
+
+private:
+  int _open(CephContext* _cct) {
+    if (!_cct) {
+      std::vector<const char*> env_args;
+      env_to_vec(env_args, "CEPH_ARGS");
+      std::string cluster, conf_file_list; // unused
+      CephInitParameters iparams = ceph_argparse_early_args(env_args, CEPH_ENTITY_TYPE_CLIENT, &cluster, &conf_file_list);
+      cct = cctptr(common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, 0), false);
+      cct->_conf.parse_config_files(nullptr, &std::cerr, 0);
+      cct->_conf.parse_env(cct->get_module_type()); // environment variables override
+      cct->_conf.apply_changes(nullptr);
+      common_init_finish(cct.get());
+    } else {
+      cct = cctptr(_cct);
+    }
+
+    if (int rc = setup_perf(); rc < 0) {
+      return rc;
+    }
+
+    if (int rc = _connect(); rc < 0) {
+      return rc;
+    }
+
+    return 0;
+  }
+  void _disconnect() {
+    if (cluster) {
+      cluster.reset();
+    }
+  }
+  int _connect() {
     ceph_assert(cct);
+    auto _cluster = rsptr(new librados::Rados());
     ldout(cct, 5) << "initializing RADOS handle as " << cct->_conf->name << dendl;
-    if (int rc = cluster.init_with_context(cct.get()); rc < 0) {
+    if (int rc = _cluster->init_with_context(cct.get()); rc < 0) {
       lderr(cct) << "cannot initialize RADOS: " << cpp_strerror(rc) << dendl;
       return rc;
     }
-    if (int rc = cluster.connect(); rc < 0) {
+    if (int rc = _cluster->connect(); rc < 0) {
       lderr(cct) << "cannot connect: " << cpp_strerror(rc) << dendl;
       return rc;
     }
-    auto s = cluster.get_addrs();
+    auto s = _cluster->get_addrs();
     ldout(cct, 5) << "completed connection to RADOS with address " << s << dendl;
+    cluster = std::move(_cluster);
     return 0;
   }
 
-  boost::intrusive_ptr<CephContext> cct;
-  std::unique_ptr<PerfCounters> logger;
-  std::shared_ptr<PerfCounters> striper_logger;
-  librados::Rados cluster;
+  ceph::mutex cluster_mutex = ceph::make_mutex("libcephsqlite");;
+  cctptr cct;
+  rsptr cluster;
 };
 
 struct cephsqlite_fileloc {
@@ -147,6 +223,8 @@ struct cephsqlite_fileloc {
 };
 
 struct cephsqlite_fileio {
+  cctptr cct;
+  rsptr cluster; // anchor for ioctx
   librados::IoCtx ioctx;
   std::unique_ptr<SimpleRADOSStriper> rs;
 };
@@ -176,36 +254,6 @@ struct cephsqlite_file {
 
 #define getdata(vfs) (*((cephsqlite_appdata*)((vfs)->pAppData)))
 
-static CephContext* getcct(sqlite3_vfs* vfs)
-{
-  auto&& appd = getdata(vfs);
-  auto& cct = appd.cct;
-  if (cct) {
-    return cct.get();
-  }
-
-  /* bootstrap cct */
-  std::vector<const char*> env_args;
-  env_to_vec(env_args, "CEPH_ARGS");
-  std::string cluster, conf_file_list; // unused
-  CephInitParameters iparams = ceph_argparse_early_args(env_args, CEPH_ENTITY_TYPE_CLIENT, &cluster, &conf_file_list);
-  cct = boost::intrusive_ptr<CephContext>(common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, 0), false);
-  cct->_conf.parse_config_files(nullptr, &std::cerr, 0);
-  cct->_conf.parse_env(cct->get_module_type()); // environment variables override
-  cct->_conf.apply_changes(nullptr);
-  common_init_finish(cct.get());
-
-  if (int rc = appd.setup_perf(); rc < 0) {
-    ceph_abort("cannot setup perf counters");
-  }
-
-  if (int rc = appd.init_cluster(); rc < 0) {
-    ceph_abort("cannot setup RADOS cluster handle");
-  }
-
-  return cct.get();
-}
-
 static int Lock(sqlite3_file *file, int ilock)
 {
   auto f = (cephsqlite_file*)file;
@@ -218,6 +266,9 @@ static int Lock(sqlite3_file *file, int ilock)
   if (!f->io.rs->is_locked() && ilock > SQLITE_LOCK_NONE) {
     if (int rc = f->io.rs->lock(0); rc < 0) {
       df(5) << "failed: " << rc << dendl;
+      if (rc == -EBLOCKLISTED) {
+        getdata(f->vfs).maybe_reconnect(f->io.cluster);
+      }
       return SQLITE_IOERR;
     }
   }
@@ -240,6 +291,9 @@ static int Unlock(sqlite3_file *file, int ilock)
   if (ilock <= SQLITE_LOCK_NONE && SQLITE_LOCK_NONE < lock) {
     if (int rc = f->io.rs->unlock(); rc < 0) {
       df(5) << "failed: " << rc << dendl;
+      if (rc == -EBLOCKLISTED) {
+        getdata(f->vfs).maybe_reconnect(f->io.cluster);
+      }
       return SQLITE_IOERR;
     }
   }
@@ -290,13 +344,16 @@ static int Read(sqlite3_file *file, void *buf, int len, sqlite_int64 off)
 
   if (int rc = f->io.rs->read(buf, len, off); rc < 0) {
     df(5) << "read failed: " << cpp_strerror(rc) << dendl;
+    if (rc == -EBLOCKLISTED) {
+      getdata(f->vfs).maybe_reconnect(f->io.cluster);
+    }
     return SQLITE_IOERR_READ;
   } else {
     df(5) << "= " << rc << dendl;
     auto end = ceph::coarse_mono_clock::now();
     getdata(f->vfs).logger->tinc(P_OPF_READ, end-start);
     if (rc < len) {
-      memset(buf, 0, len-rc);
+      memset((unsigned char*)buf+rc, 0, len-rc);
       return SQLITE_IOERR_SHORT_READ;
     } else {
       return SQLITE_OK;
@@ -312,6 +369,9 @@ static int Write(sqlite3_file *file, const void *buf, int len, sqlite_int64 off)
 
   if (int rc = f->io.rs->write(buf, len, off); rc < 0) {
     df(5) << "write failed: " << cpp_strerror(rc) << dendl;
+    if (rc == -EBLOCKLISTED) {
+      getdata(f->vfs).maybe_reconnect(f->io.cluster);
+    }
     return SQLITE_IOERR_WRITE;
   } else {
     df(5) << "= " << rc << dendl;
@@ -330,6 +390,9 @@ static int Truncate(sqlite3_file *file, sqlite_int64 size)
 
   if (int rc = f->io.rs->truncate(size); rc < 0) {
     df(5) << "truncate failed: " << cpp_strerror(rc) << dendl;
+    if (rc == -EBLOCKLISTED) {
+      getdata(f->vfs).maybe_reconnect(f->io.cluster);
+    }
     return SQLITE_IOERR;
   }
 
@@ -346,6 +409,9 @@ static int Sync(sqlite3_file *file, int flags)
 
   if (int rc = f->io.rs->flush(); rc < 0) {
     df(5) << "failed: " << cpp_strerror(rc) << dendl;
+    if (rc == -EBLOCKLISTED) {
+      getdata(f->vfs).maybe_reconnect(f->io.cluster);
+    }
     return SQLITE_IOERR;
   }
 
@@ -366,6 +432,9 @@ static int FileSize(sqlite3_file *file, sqlite_int64 *osize)
   uint64_t size = 0;
   if (int rc = f->io.rs->stat(&size); rc < 0) {
     df(5) << "stat failed: " << cpp_strerror(rc) << dendl;
+    if (rc == -EBLOCKLISTED) {
+      getdata(f->vfs).maybe_reconnect(f->io.cluster);
+    }
     return SQLITE_NOTFOUND;
   }
 
@@ -397,37 +466,34 @@ static bool parsepath(std::string_view path, struct cephsqlite_fileloc* fileloc)
   return true;
 }
 
-static int makestriper(sqlite3_vfs* vfs, const cephsqlite_fileloc& loc, cephsqlite_fileio* io)
+static int makestriper(sqlite3_vfs* vfs, cctptr cct, rsptr cluster, const cephsqlite_fileloc& loc, cephsqlite_fileio* io)
 {
-  auto&& appd = getdata(vfs);
-  auto& cct = appd.cct;
-  auto& cluster = appd.cluster;
   bool gotmap = false;
 
-  dv(10) << loc << dendl;
+  d(cct,cluster,10) << loc << dendl;
 
 enoent_retry:
   if (loc.pool[0] == '*') {
     std::string err;
     int64_t id = strict_strtoll(loc.pool.c_str()+1, 10, &err);
     ceph_assert(err.empty());
-    if (int rc = cluster.ioctx_create2(id, io->ioctx); rc < 0) {
+    if (int rc = cluster->ioctx_create2(id, io->ioctx); rc < 0) {
       if (rc == -ENOENT && !gotmap) {
-        cluster.wait_for_latest_osdmap();
+        cluster->wait_for_latest_osdmap();
         gotmap = true;
         goto enoent_retry;
       }
-      dv(10) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
+      d(cct,cluster,1) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
       return rc;
     }
   } else {
-    if (int rc = cluster.ioctx_create(loc.pool.c_str(), io->ioctx); rc < 0) {
+    if (int rc = cluster->ioctx_create(loc.pool.c_str(), io->ioctx); rc < 0) {
       if (rc == -ENOENT && !gotmap) {
-        cluster.wait_for_latest_osdmap();
+        cluster->wait_for_latest_osdmap();
         gotmap = true;
         goto enoent_retry;
       }
-      dv(10) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
+      d(cct,cluster,1) << "cannot create ioctx: " << cpp_strerror(rc) << dendl;
       return rc;
     }
   }
@@ -436,10 +502,12 @@ enoent_retry:
     io->ioctx.set_namespace(loc.radosns);
 
   io->rs = std::make_unique<SimpleRADOSStriper>(io->ioctx, loc.name);
-  io->rs->set_logger(appd.striper_logger);
+  io->rs->set_logger(getdata(vfs).striper_logger);
   io->rs->set_lock_timeout(cct->_conf.get_val<std::chrono::milliseconds>("cephsqlite_lock_renewal_timeout"));
   io->rs->set_lock_interval(cct->_conf.get_val<std::chrono::milliseconds>("cephsqlite_lock_renewal_interval"));
   io->rs->set_blocklist_the_dead(cct->_conf.get_val<bool>("cephsqlite_blocklist_dead_locker"));
+  io->cluster = std::move(cluster);
+  io->cct = cct;
 
   return 0;
 }
@@ -502,7 +570,7 @@ static int Open(sqlite3_vfs *vfs, const char *name, sqlite3_file *file,
 
   auto start = ceph::coarse_mono_clock::now();
   bool gotmap = false;
-  auto& cluster = getdata(vfs).cluster;
+  auto [cct, cluster] = getdata(vfs).get_cluster();
 
   /* we are not going to create temporary files */
   if (name == NULL) {
@@ -525,9 +593,9 @@ static int Open(sqlite3_vfs *vfs, const char *name, sqlite3_file *file,
   f->flags = flags;
 
 enoent_retry:
-  if (int rc = makestriper(vfs, f->loc, &f->io); rc < 0) {
+  if (int rc = makestriper(vfs, cct, cluster, f->loc, &f->io); rc < 0) {
     f->~cephsqlite_file();
-    dv(5) << "cannot open striper" << dendl;
+    dv(-1) << "cannot open striper" << dendl;
     return SQLITE_IOERR;
   }
 
@@ -540,7 +608,7 @@ enoent_retry:
          * in testing when pools are getting created/deleted left and right.
          */
         dv(5) << "retrying create after getting latest OSDMap" << dendl;
-        cluster.wait_for_latest_osdmap();
+        cluster->wait_for_latest_osdmap();
         gotmap = true;
         goto enoent_retry;
       }
@@ -553,7 +621,7 @@ enoent_retry:
     if (rc == -ENOENT && !gotmap) {
       /* See comment above for create case. */
       dv(5) << "retrying open after getting latest OSDMap" << dendl;
-      cluster.wait_for_latest_osdmap();
+      cluster->wait_for_latest_osdmap();
       gotmap = true;
       goto enoent_retry;
     }
@@ -578,6 +646,7 @@ enoent_retry:
 static int Delete(sqlite3_vfs* vfs, const char* path, int dsync)
 {
   auto start = ceph::coarse_mono_clock::now();
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(5) << "'" << path << "', " << dsync << dendl;
 
   cephsqlite_fileloc fileloc;
@@ -587,8 +656,8 @@ static int Delete(sqlite3_vfs* vfs, const char* path, int dsync)
   }
 
   cephsqlite_fileio io;
-  if (int rc = makestriper(vfs, fileloc, &io); rc < 0) {
-    dv(5) << "cannot open striper" << dendl;
+  if (int rc = makestriper(vfs, cct, cluster, fileloc, &io); rc < 0) {
+    dv(-1) << "cannot open striper" << dendl;
     return SQLITE_IOERR;
   }
 
@@ -616,6 +685,7 @@ static int Delete(sqlite3_vfs* vfs, const char* path, int dsync)
 static int Access(sqlite3_vfs* vfs, const char* path, int flags, int* result)
 {
   auto start = ceph::coarse_mono_clock::now();
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(5) << path << " " << std::hex << flags << dendl;
 
   cephsqlite_fileloc fileloc;
@@ -625,8 +695,8 @@ static int Access(sqlite3_vfs* vfs, const char* path, int flags, int* result)
   }
 
   cephsqlite_fileio io;
-  if (int rc = makestriper(vfs, fileloc, &io); rc < 0) {
-    dv(5) << "cannot open striper" << dendl;
+  if (int rc = makestriper(vfs, cct, cluster, fileloc, &io); rc < 0) {
+    dv(-1) << "cannot open striper" << dendl;
     return SQLITE_IOERR;
   }
 
@@ -662,7 +732,7 @@ static int FullPathname(sqlite3_vfs* vfs, const char* ipath, int opathlen, char*
 {
   auto start = ceph::coarse_mono_clock::now();
   auto path = std::string_view(ipath);
-
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(5) << "1: " <<  path << dendl;
 
   cephsqlite_fileloc fileloc;
@@ -688,6 +758,7 @@ static int FullPathname(sqlite3_vfs* vfs, const char* ipath, int opathlen, char*
 static int CurrentTime(sqlite3_vfs* vfs, sqlite3_int64* time)
 {
   auto start = ceph::coarse_mono_clock::now();
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(5) << time << dendl;
 
   auto t = ceph_clock_now();
@@ -698,33 +769,29 @@ static int CurrentTime(sqlite3_vfs* vfs, sqlite3_int64* time)
   return SQLITE_OK;
 }
 
-LIBCEPHSQLITE_API int cephsqlite_setcct(CephContext* cct, char** ident)
+LIBCEPHSQLITE_API int cephsqlite_setcct(CephContext* _cct, char** ident)
 {
-  ldout(cct, 1) << "cct: " << cct << dendl;
+  ldout(_cct, 1) << "cct: " << _cct << dendl;
 
   if (sqlite3_api == nullptr) {
-    lderr(cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
+    lderr(_cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
     return -EINVAL;
   }
 
   auto vfs = sqlite3_vfs_find("ceph");
   if (!vfs) {
-    lderr(cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
+    lderr(_cct) << "API violation: must have sqlite3 init libcephsqlite" << dendl;
     return -EINVAL;
   }
 
   auto& appd = getdata(vfs);
-  appd.cct = cct;
-  if (int rc = appd.setup_perf(); rc < 0) {
-    appd.cct = nullptr;
-    return rc;
-  }
-  if (int rc = appd.init_cluster(); rc < 0) {
-    appd.cct = nullptr;
+  if (int rc = appd.open(_cct); rc < 0) {
     return rc;
   }
 
-  auto s = appd.cluster.get_addrs();
+  auto [cct, cluster] = appd.get_cluster();
+
+  auto s = cluster->get_addrs();
   if (ident) {
     *ident = strdup(s.c_str());
   }
@@ -737,6 +804,7 @@ LIBCEPHSQLITE_API int cephsqlite_setcct(CephContext* cct, char** ident)
 static void f_perf(sqlite3_context* ctx, int argc, sqlite3_value** argv)
 {
   auto vfs = (sqlite3_vfs*)sqlite3_user_data(ctx);
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(10) << dendl;
   auto&& appd = getdata(vfs);
   JSONFormatter f(false);
@@ -756,12 +824,12 @@ static void f_perf(sqlite3_context* ctx, int argc, sqlite3_value** argv)
 static void f_status(sqlite3_context* ctx, int argc, sqlite3_value** argv)
 {
   auto vfs = (sqlite3_vfs*)sqlite3_user_data(ctx);
+  auto [cct, cluster] = getdata(vfs).get_cluster();
   dv(10) << dendl;
-  auto&& appd = getdata(vfs);
   JSONFormatter f(false);
   f.open_object_section("ceph_status");
-  f.dump_int("id", appd.cluster.get_instance_id());
-  f.dump_string("addr", appd.cluster.get_addrs());
+  f.dump_int("id", cluster->get_instance_id());
+  f.dump_string("addr", cluster->get_addrs());
   f.close_section();
   {
     CachedStackStringStream css;