git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/mgr_util.py
import ceph 16.2.6
[ceph.git] / ceph / src / pybind / mgr / mgr_util.py
index 29a6f631477ee39c88b11aa879cd8cd8ce5dbb62..d865d31092aa9cc410b7231763c4e63ffbcd52d3 100644 (file)
@@ -13,6 +13,7 @@ import logging
 import sys
 from threading import Lock, Condition, Event
 from typing import no_type_check
+import urllib
 from functools import wraps
 if sys.version_info >= (3, 3):
     from threading import Timer
@@ -20,6 +21,9 @@ else:
     from threading import _Timer as Timer
 
 from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator
+
+from ceph.deployment.utils import wrap_ipv6
+
 T = TypeVar('T')
 
 if TYPE_CHECKING:
@@ -181,10 +185,11 @@ class CephfsConnectionPool(object):
     # TODO: make this configurable
     TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
     CONNECTION_IDLE_INTERVAL = 60.0  # seconds
+    MAX_CONCURRENT_CONNECTIONS = 5   # max number of concurrent connections per volume
 
     def __init__(self, mgr: Module_T):
         self.mgr = mgr
-        self.connections: Dict[str, CephfsConnectionPool.Connection] = {}
+        self.connections: Dict[str, List[CephfsConnectionPool.Connection]] = {}
         self.lock = Lock()
         self.cond = Condition(self.lock)
         self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
@@ -194,57 +199,87 @@ class CephfsConnectionPool(object):
     def cleanup_connections(self) -> None:
         with self.lock:
             logger.info("scanning for idle connections..")
-            idle_fs = [fs_name for fs_name, conn in self.connections.items()
-                       if conn.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL)]
-            for fs_name in idle_fs:
-                logger.info("cleaning up connection for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name)
+            idle_conns = []
+            for fs_name, connections in self.connections.items():
+                logger.debug(f'fs_name ({fs_name}) connections ({connections})')
+                for connection in connections:
+                    if connection.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL):
+                        idle_conns.append((fs_name, connection))
+            logger.info(f'cleaning up connections: {idle_conns}')
+            for idle_conn in idle_conns:
+                self._del_connection(idle_conn[0], idle_conn[1])
 
     def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
         with self.lock:
-            conn = None
             try:
-                conn = self.connections.get(fs_name, None)
-                if conn:
-                    if conn.is_connection_valid():
-                        return conn.get_fs_handle()
+                min_shared = 0
+                shared_connection = None
+                connections = self.connections.setdefault(fs_name, [])
+                logger.debug(f'[get] volume: ({fs_name}) connection: ({connections})')
+                if connections:
+                    min_shared = connections[0].ops_in_progress
+                    shared_connection = connections[0]
+                for connection in list(connections):
+                    logger.debug(f'[get] connection: {connection} usage: {connection.ops_in_progress}')
+                    if connection.ops_in_progress == 0:
+                        if connection.is_connection_valid():
+                            logger.debug(f'[get] connection ({connection}) can be reused')
+                            return connection.get_fs_handle()
+                        else:
+                            # filesystem id changed beneath us (or the filesystem does not exist).
+                            # this is possible if the filesystem got removed (and recreated with
+                            # same name) via "ceph fs rm/new" mon command.
+                            logger.warning(f'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
+                            # note -- this will mutate @connections too
+                            self._del_connection(fs_name, connection)
                     else:
-                        # filesystem id changed beneath us (or the filesystem does not exist).
-                        # this is possible if the filesystem got removed (and recreated with
-                        # same name) via "ceph fs rm/new" mon command.
-                        logger.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
-                        self._del_fs_handle(fs_name)
-                conn = CephfsConnectionPool.Connection(self.mgr, fs_name)
-                conn.connect()
+                        if connection.ops_in_progress < min_shared:
+                            min_shared = connection.ops_in_progress
+                            shared_connection = connection
+                # when we end up here, there are no "free" connections, so either spin up a new
+                # one or share the existing connection with the fewest ops in progress.
+                if len(connections) < CephfsConnectionPool.MAX_CONCURRENT_CONNECTIONS:
+                    logger.debug('[get] spawning new connection since no connection is unused and we still have room for more')
+                    connection = CephfsConnectionPool.Connection(self.mgr, fs_name)
+                    connection.connect()
+                    self.connections[fs_name].append(connection)
+                    return connection.get_fs_handle()
+                else:
+                    assert shared_connection is not None
+                    logger.debug(f'[get] using shared connection ({shared_connection})')
+                    return shared_connection.get_fs_handle()
             except cephfs.Error as e:
                 # try to provide a better error string if possible
                 if e.args[0] == errno.ENOENT:
                     raise CephfsConnectionException(
                         -errno.ENOENT, "FS '{0}' not found".format(fs_name))
                 raise CephfsConnectionException(-e.args[0], e.args[1])
-            self.connections[fs_name] = conn
-            return conn.get_fs_handle()
 
-    def put_fs_handle(self, fs_name: str) -> None:
+    def put_fs_handle(self, fs_name: str, fs_handle: cephfs.LibCephFS) -> None:
         with self.lock:
-            conn = self.connections.get(fs_name, None)
-            if conn:
-                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
+            connections = self.connections.get(fs_name, [])
+            for connection in connections:
+                if connection.fs == fs_handle:
+                    logger.debug(f'[put] connection: {connection} usage: {connection.ops_in_progress}')
+                    connection.put_fs_handle(notify=lambda: self.cond.notifyAll())
 
-    def _del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
-        conn = self.connections.pop(fs_name, None)
-        if conn:
-            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+    def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
+        self.connections[fs_name].remove(connection)
+        connection.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
 
-    def del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
+    def _del_connections(self, fs_name: str, wait: bool = False) -> None:
+        for connection in list(self.connections.get(fs_name, [])):
+            self._del_connection(fs_name, connection, wait)
+
+    def del_connections(self, fs_name: str, wait: bool = False) -> None:
         with self.lock:
-            self._del_fs_handle(fs_name, wait)
+            self._del_connections(fs_name, wait)
 
-    def del_all_handles(self) -> None:
+    def del_all_connections(self) -> None:
         with self.lock:
             for fs_name in list(self.connections.keys()):
                 logger.info("waiting for pending ops for '{}'".format(fs_name))
-                self._del_fs_handle(fs_name, wait=True)
+                self._del_connections(fs_name, wait=True)
                 logger.info("pending ops completed for '{}'".format(fs_name))
             # no new connections should have been initialized since its
             # guarded on shutdown.
@@ -265,7 +300,7 @@ class CephfsClient(Generic[Module_T]):
         # first, note that we're shutting down
         self.stopping.set()
         # second, delete all libcephfs handles from connection pool
-        self.connection_pool.del_all_handles()
+        self.connection_pool.del_all_connections()
 
     def get_fs(self, fs_name: str) -> Optional["cephfs.LibCephFS"]:
         fs_map = self.mgr.get('fs_map')
@@ -305,7 +340,7 @@ def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCep
     try:
         yield fs_handle
     finally:
-        fsc.connection_pool.put_fs_handle(fs_name)
+        fsc.connection_pool.put_fs_handle(fs_name, fs_handle)
 
 
 def colorize(msg: str, color: int, dark: bool = False) -> str:
@@ -400,6 +435,45 @@ def get_default_addr():
         return result
 
 
+def build_url(host: str, scheme: Optional[str] = None, port: Optional[int] = None, path: str = '') -> str:
+    """
+    Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
+    automatically.
+
+    >>> build_url('example.com', 'https', 443)
+    'https://example.com:443'
+
+    >>> build_url(host='example.com', port=443)
+    '//example.com:443'
+
+    >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
+    'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'
+
+    >>> build_url('example.com', 'https', 443, path='/metrics')
+    'https://example.com:443/metrics'
+
+    :param host: Consisting of either a registered name (including but not limited to
+                 a hostname) or an IP address.
+    :type host: str
+    :param scheme: The scheme, e.g. http, https or ftp. Left out of the URL when empty.
+    :type scheme: str
+    :param port: Port appended to the host as ``:port``; skipped when None or 0.
+    :type port: int
+    :param path: Path component placed after the host and port.
+    :type path: str
+    :rtype: str
+    """
+    # A bare `import urllib` does not load the `parse` submodule, so import it here
+    # rather than relying on some other module having imported it first.
+    from urllib.parse import ParseResult
+    netloc = wrap_ipv6(host)
+    if port:
+        netloc += ':{}'.format(port)
+    pr = ParseResult(scheme=scheme if scheme else '', netloc=netloc, path=path,
+                     params='', query='', fragment='')
+    return pr.geturl()
+
+
 class ServerConfigException(Exception):
     pass