import sys
from threading import Lock, Condition, Event
from typing import no_type_check
+import urllib
from functools import wraps
if sys.version_info >= (3, 3):
from threading import Timer
from threading import _Timer as Timer
from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator
+
+from ceph.deployment.utils import wrap_ipv6
+
T = TypeVar('T')
if TYPE_CHECKING:
# TODO: make this configurable
TIMER_TASK_RUN_INTERVAL = 30.0 # seconds
CONNECTION_IDLE_INTERVAL = 60.0 # seconds
+ MAX_CONCURRENT_CONNECTIONS = 5 # max number of concurrent connections per volume
def __init__(self, mgr: Module_T):
self.mgr = mgr
- self.connections: Dict[str, CephfsConnectionPool.Connection] = {}
+ self.connections: Dict[str, List[CephfsConnectionPool.Connection]] = {}
self.lock = Lock()
self.cond = Condition(self.lock)
self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
def cleanup_connections(self) -> None:
with self.lock:
logger.info("scanning for idle connections..")
- idle_fs = [fs_name for fs_name, conn in self.connections.items()
- if conn.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL)]
- for fs_name in idle_fs:
- logger.info("cleaning up connection for '{}'".format(fs_name))
- self._del_fs_handle(fs_name)
+ idle_conns = []
+ for fs_name, connections in self.connections.items():
+ logger.debug(f'fs_name ({fs_name}) connections ({connections})')
+ for connection in connections:
+ if connection.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL):
+ idle_conns.append((fs_name, connection))
+ logger.info(f'cleaning up connections: {idle_conns}')
+ for idle_conn in idle_conns:
+ self._del_connection(idle_conn[0], idle_conn[1])
def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
with self.lock:
- conn = None
try:
- conn = self.connections.get(fs_name, None)
- if conn:
- if conn.is_connection_valid():
- return conn.get_fs_handle()
+ min_shared = 0
+ shared_connection = None
+ connections = self.connections.setdefault(fs_name, [])
+ logger.debug(f'[get] volume: ({fs_name}) connection: ({connections})')
+ if connections:
+ min_shared = connections[0].ops_in_progress
+ shared_connection = connections[0]
+ for connection in list(connections):
+ logger.debug(f'[get] connection: {connection} usage: {connection.ops_in_progress}')
+ if connection.ops_in_progress == 0:
+ if connection.is_connection_valid():
+ logger.debug(f'[get] connection ({connection}) can be reused')
+ return connection.get_fs_handle()
+ else:
+ # filesystem id changed beneath us (or the filesystem does not exist).
+ # this is possible if the filesystem got removed (and recreated with
+ # same name) via "ceph fs rm/new" mon command.
+ logger.warning(f'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
+ # note -- this will mutate @connections too
+ self._del_connection(fs_name, connection)
else:
- # filesystem id changed beneath us (or the filesystem does not exist).
- # this is possible if the filesystem got removed (and recreated with
- # same name) via "ceph fs rm/new" mon command.
- logger.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
- self._del_fs_handle(fs_name)
- conn = CephfsConnectionPool.Connection(self.mgr, fs_name)
- conn.connect()
+ if connection.ops_in_progress < min_shared:
+ min_shared = connection.ops_in_progress
+ shared_connection = connection
+ # when we end up here, there are no "free" connections. so either spin up a new
+ # one or share it.
+ if len(connections) < CephfsConnectionPool.MAX_CONCURRENT_CONNECTIONS:
+ logger.debug('[get] spawning new connection since no connection is unused and we still have room for more')
+ connection = CephfsConnectionPool.Connection(self.mgr, fs_name)
+ connection.connect()
+ self.connections[fs_name].append(connection)
+ return connection.get_fs_handle()
+ else:
+ assert shared_connection is not None
+ logger.debug(f'[get] using shared connection ({shared_connection})')
+ return shared_connection.get_fs_handle()
except cephfs.Error as e:
# try to provide a better error string if possible
if e.args[0] == errno.ENOENT:
raise CephfsConnectionException(
-errno.ENOENT, "FS '{0}' not found".format(fs_name))
raise CephfsConnectionException(-e.args[0], e.args[1])
- self.connections[fs_name] = conn
- return conn.get_fs_handle()
- def put_fs_handle(self, fs_name: str) -> None:
+ def put_fs_handle(self, fs_name: str, fs_handle: cephfs.LibCephFS) -> None:
with self.lock:
- conn = self.connections.get(fs_name, None)
- if conn:
- conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
+ connections = self.connections.get(fs_name, [])
+ for connection in connections:
+ if connection.fs == fs_handle:
+ logger.debug(f'[put] connection: {connection} usage: {connection.ops_in_progress}')
+ connection.put_fs_handle(notify=lambda: self.cond.notifyAll())
- def _del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
- conn = self.connections.pop(fs_name, None)
- if conn:
- conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
+ def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
+ self.connections[fs_name].remove(connection)
+ connection.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
- def del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
+ def _del_connections(self, fs_name: str, wait: bool = False) -> None:
+ for connection in list(self.connections.get(fs_name, [])):
+ self._del_connection(fs_name, connection, wait)
+
+ def del_connections(self, fs_name: str, wait: bool = False) -> None:
with self.lock:
- self._del_fs_handle(fs_name, wait)
+ self._del_connections(fs_name, wait)
- def del_all_handles(self) -> None:
+ def del_all_connections(self) -> None:
with self.lock:
for fs_name in list(self.connections.keys()):
logger.info("waiting for pending ops for '{}'".format(fs_name))
- self._del_fs_handle(fs_name, wait=True)
+ self._del_connections(fs_name, wait=True)
logger.info("pending ops completed for '{}'".format(fs_name))
# no new connections should have been initialized since its
# guarded on shutdown.
# first, note that we're shutting down
self.stopping.set()
# second, delete all libcephfs handles from connection pool
- self.connection_pool.del_all_handles()
+ self.connection_pool.del_all_connections()
def get_fs(self, fs_name: str) -> Optional["cephfs.LibCephFS"]:
fs_map = self.mgr.get('fs_map')
try:
yield fs_handle
finally:
- fsc.connection_pool.put_fs_handle(fs_name)
+ fsc.connection_pool.put_fs_handle(fs_name, fs_handle)
def colorize(msg: str, color: int, dark: bool = False) -> str:
return result
+def build_url(host: str, scheme: Optional[str] = None, port: Optional[int] = None, path: str = '') -> str:
+ """
+ Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
+ automatically.
+
+ >>> build_url('example.com', 'https', 443)
+ 'https://example.com:443'
+
+ >>> build_url(host='example.com', port=443)
+ '//example.com:443'
+
+ >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
+ 'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'
+
+ >>> build_url('example.com', 'https', 443, path='/metrics')
+ 'https://example.com:443/metrics'
+
+
+ :param scheme: The scheme, e.g. http, https or ftp.
+ :type scheme: str
+ :param host: Consisting of either a registered name (including but not limited to
+ a hostname) or an IP address.
+ :type host: str
+ :type port: int
+ :rtype: str
+ """
+ netloc = wrap_ipv6(host)
+ if port:
+ netloc += ':{}'.format(port)
+ pr = urllib.parse.ParseResult(
+ scheme=scheme if scheme else '',
+ netloc=netloc,
+ path=path,
+ params='',
+ query='',
+ fragment='')
+ return pr.geturl()
+
+
class ServerConfigException(Exception):
pass