]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telegraf/basesocket.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / telegraf / basesocket.py
1 import socket
2 from urllib.parse import ParseResult
3 from typing import Any, Dict, Optional, Tuple, Union
4
5
class BaseSocket(object):
    """Small client-socket wrapper whose socket kind is chosen by URL scheme.

    The scheme of the parsed URL selects the address family and socket type
    (see ``schemes``). For ``AF_UNIX`` sockets the URL path is used as the
    address; for all other families the address is ``(hostname, port)``.

    Instances can be used as context managers: entering connects the socket
    and leaving closes it.
    """

    # URL scheme -> (address family, socket type) passed to socket.socket().
    schemes = {
        'unixgram': (socket.AF_UNIX, socket.SOCK_DGRAM),
        'unix': (socket.AF_UNIX, socket.SOCK_STREAM),
        'tcp': (socket.AF_INET, socket.SOCK_STREAM),
        'tcp6': (socket.AF_INET6, socket.SOCK_STREAM),
        'udp': (socket.AF_INET, socket.SOCK_DGRAM),
        'udp6': (socket.AF_INET6, socket.SOCK_DGRAM),
    }

    def __init__(self, url: ParseResult) -> None:
        """Create (but do not connect) the socket described by *url*.

        :param url: parsed URL; its scheme must be a key of ``schemes``.
        :raises RuntimeError: if the URL scheme is not supported.
        """
        self.url = url

        try:
            socket_family, socket_type = self.schemes[self.url.scheme]
        except KeyError:
            # Interpolate the scheme into the message; passing it as a second
            # exception argument would render the message as a raw tuple.
            raise RuntimeError(
                f'Unsupported socket type: {self.url.scheme}') from None

        self.sock = socket.socket(family=socket_family, type=socket_type)
        if self.sock.family == socket.AF_UNIX:
            self.address: Union[str, Tuple[str, int]] = self.url.path
        else:
            # NOTE(review): these asserts are skipped under ``python -O`` and
            # also reject port 0; kept as-is so callers see the same
            # exception type as before.
            assert self.url.hostname
            assert self.url.port
            self.address = (self.url.hostname, self.url.port)

    def connect(self) -> None:
        """Connect the underlying socket to ``self.address``."""
        return self.sock.connect(self.address)

    def close(self) -> None:
        """Close the underlying socket."""
        self.sock.close()

    def send(self, data: str, flags: int = 0) -> int:
        """Send *data* encoded as UTF-8 with a trailing newline appended.

        :param data: text to transmit.
        :param flags: flags forwarded to ``socket.send``.
        :returns: the value returned by ``socket.send`` (number of bytes
            sent, which may be fewer than the full payload).
        """
        return self.sock.send(data.encode('utf-8') + b'\n', flags)

    def __del__(self) -> None:
        """Close the socket on finalization, if one was ever created."""
        # __init__ may have raised before ``self.sock`` was assigned (e.g.
        # unsupported scheme); finalizing such an object must not raise.
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()

    def __enter__(self) -> 'BaseSocket':
        """Connect and return this instance for use in a ``with`` block."""
        self.connect()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Close the socket when leaving the ``with`` block."""
        self.close()