return check
return CheckFileInput
# If the mgr loses its lock on the database because e.g. the pgs were
# transiently down, then close it and allow it to be reopened.
MAX_DBCLEANUP_RETRIES = 3


def MgrModuleRecoverDB(func: Callable) -> Callable:
    """Wrap a module method so that sqlite database errors trigger a reopen.

    The wrapped callable is invoked as ``func(self, *args, **kwargs)``.  When
    it raises ``sqlite3.DatabaseError`` (or a subclass), the error is logged,
    the module's database is closed and reopened through ``self.close_db()``
    and ``self.open_db()``, and ``func`` is called again.

    At most ``MAX_DBCLEANUP_RETRIES`` reopen attempts are made, i.e. ``func``
    runs up to ``MAX_DBCLEANUP_RETRIES + 1`` times.  Once the budget is
    exhausted the last ``sqlite3.DatabaseError`` is re-raised unchanged.
    Exceptions of any other type are never caught here, and an exception
    raised by ``close_db()``/``open_db()`` themselves propagates immediately.

    :param func: method taking the module instance as its first argument.
    :return: a wrapper with the same name, docstring and signature as *func*.
    """
    # The annotation is a quoted forward reference so that applying the
    # decorator does not require ``MgrModule`` to be defined yet.
    @functools.wraps(func)
    def check(self: "MgrModule", *args: Any, **kwargs: Any) -> Any:
        retries = 0
        while True:
            try:
                return func(self, *args, **kwargs)
            except sqlite3.DatabaseError as e:
                self.log.error(f"Caught fatal database error: {e}")
                retries += 1
                if retries > MAX_DBCLEANUP_RETRIES:
                    # Retry budget exhausted: surface the last error as-is.
                    raise
                self.log.debug("attempting reopen of database")
                self.close_db()
                # NOTE(review): open_db() is annotated as returning the new
                # connection (or None) and that value is dropped here;
                # presumably the handle is re-established on the next
                # database access -- confirm against the db accessor.
                self.open_db()
                # allow retry of func(...)
    # Publish func's own signature on the wrapper so that introspection does
    # not report the generic (self, *args, **kwargs) form.
    check.__signature__ = inspect.signature(func)  # type: ignore[attr-defined]
    return check
+
def CLIRequiresDB(func: HandlerFuncType) -> HandlerFuncType:
@functools.wraps(func)
def check(self: MgrModule, *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
if version <= 0:
self.log.info(f"creating main.db for {self.module_name}")
assert self.SCHEMA is not None
- cur = db.executescript(self.SCHEMA)
+ db.executescript(self.SCHEMA)
self.update_schema_version(db, 1)
else:
assert self.SCHEMA_VERSIONED is not None
db.row_factory = sqlite3.Row
self.load_schema(db)
def close_db(self) -> None:
    """Close the module's cached database connection, if there is one.

    The whole operation runs while holding ``self._db_lock``.  When
    ``self._db`` is already ``None`` nothing happens.  Otherwise the
    connection's ``close()`` is called and ``self._db`` is reset to ``None``.

    The reset happens even when ``close()`` raises, so a connection that
    could not be closed cleanly is not kept around and handed out again;
    the exception from ``close()`` still propagates to the caller.
    """
    with self._db_lock:
        if self._db is None:
            return
        try:
            self._db.close()
        finally:
            # Forget the handle unconditionally: after a failed close() the
            # connection cannot be trusted, and leaving it in place would
            # prevent a fresh one from being opened.
            self._db = None
+
def open_db(self) -> Optional[sqlite3.Connection]:
if not self.pool_exists(self.MGR_POOL_NAME):
if not self.have_enough_osds():
uri = f"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph";
self.log.debug(f"using uri {uri}")
db = sqlite3.connect(uri, check_same_thread=False, uri=True)
+ # if libcephsqlite reconnects, update the addrv for blocklist
+ with db:
+ cur = db.execute('SELECT json_extract(ceph_status(), "$.addr");')
+ (addrv,) = cur.fetchone()
+ assert addrv is not None
+ self.log.debug(f"new libcephsqlite addrv = {addrv}")
+ self._ceph_register_client("libcephsqlite", addrv, True)
self.configure_db(db)
return db
if self._rados:
addrs = self._rados.get_addrs()
self._rados.shutdown()
- self._ceph_unregister_client(addrs)
+ self._ceph_unregister_client(None, addrs)
self._rados = None
@API.expose
@API.expose
@profile_method()
- def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
+ def get_unlabeled_perf_counters(self, prio_limit: int = PRIO_USEFUL,
services: Sequence[str] = ("mds", "mon", "osd",
"rbd-mirror", "rgw",
"tcmu-runner")) -> Dict[str, dict]:
ctx_capsule = self.get_context()
self._rados = rados.Rados(context=ctx_capsule)
self._rados.connect()
- self._ceph_register_client(self._rados.get_addrs())
+ self._ceph_register_client(None, self._rados.get_addrs(), False)
return self._rados
@staticmethod