]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/telegraf/basesocket.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / telegraf / basesocket.py
CommitLineData
11fdf7f2 1import socket
20effc67
TL
2from urllib.parse import ParseResult
3from typing import Any, Dict, Optional, Tuple, Union
11fdf7f2
TL
4
5
class BaseSocket(object):
    """Small wrapper around ``socket.socket`` addressed by a parsed URL.

    The URL scheme selects the address family and socket type (see
    ``schemes``).  For UNIX-domain schemes the URL path is used as the
    socket address; for every other scheme the address is the
    ``(hostname, port)`` pair taken from the URL.

    Instances can be used as context managers: entering connects the
    socket and leaving closes it.
    """

    # Maps each supported URL scheme to the (address family, socket type)
    # pair handed to socket.socket().
    schemes = {
        'unixgram': (socket.AF_UNIX, socket.SOCK_DGRAM),
        'unix': (socket.AF_UNIX, socket.SOCK_STREAM),
        'tcp': (socket.AF_INET, socket.SOCK_STREAM),
        'tcp6': (socket.AF_INET6, socket.SOCK_STREAM),
        'udp': (socket.AF_INET, socket.SOCK_DGRAM),
        'udp6': (socket.AF_INET6, socket.SOCK_DGRAM),
    }

    def __init__(self, url: ParseResult) -> None:
        """Create an unconnected socket matching *url*.

        :param url: parsed URL whose scheme is one of ``schemes``
        :raises RuntimeError: if the scheme is not supported, or if a
            non-UNIX URL lacks a hostname or a (non-zero) port
        """
        self.url = url

        try:
            socket_family, socket_type = self.schemes[self.url.scheme]
        except KeyError:
            # Interpolate the scheme into the message; passing it as a
            # second constructor argument would leave '%s' unformatted.
            raise RuntimeError(
                'Unsupported socket type: %s' % self.url.scheme) from None

        # Work out the address before opening the socket so an invalid
        # URL does not leave an open descriptor behind.
        if socket_family == socket.AF_UNIX:
            self.address: Union[str, Tuple[str, int]] = self.url.path
        else:
            hostname = self.url.hostname
            port = self.url.port
            # Explicit check instead of ``assert``: assertions are removed
            # when Python runs with -O.
            if not hostname or not port:
                raise RuntimeError(
                    'Missing hostname or port in URL: %s' % self.url.geturl())
            self.address = (hostname, port)

        self.sock = socket.socket(family=socket_family, type=socket_type)

    def connect(self) -> None:
        """Connect the underlying socket to ``self.address``."""
        self.sock.connect(self.address)

    def close(self) -> None:
        """Close the underlying socket."""
        self.sock.close()

    def send(self, data: str, flags: int = 0) -> int:
        """Send *data* encoded as UTF-8 and followed by a newline.

        :param data: text to transmit
        :param flags: flags forwarded to ``socket.send``
        :return: the value returned by ``socket.send`` (bytes sent)
        """
        return self.sock.send(data.encode('utf-8') + b'\n', flags)

    def __del__(self) -> None:
        # __init__ may have raised before ``self.sock`` was assigned; the
        # finalizer still runs in that case, so look the attribute up
        # defensively.
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()

    def __enter__(self) -> 'BaseSocket':
        """Connect the socket and return this object."""
        self.connect()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Close the socket; exceptions from the ``with`` body propagate."""
        self.close()