# Source: git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/telegraf/basesocket.py
2 from urllib
.parse
import ParseResult
3 from typing
import Any
, Dict
, Optional
, Tuple
, Union
6 class BaseSocket(object):
# Scheme name -> (address family, socket type) pairs.
# __init__ indexes self.schemes with the URL scheme and unpacks the result
# into (socket_family, socket_type).
# NOTE(review): presumably these entries are the body of that `schemes`
# mapping; its opening and closing lines are not present in this extract
# (embedded source numbering jumps from 6 to 8 and from 13 to 16) — confirm
# against the upstream file.
8 'unixgram': (socket
.AF_UNIX
, socket
.SOCK_DGRAM
),
9 'unix': (socket
.AF_UNIX
, socket
.SOCK_STREAM
),
10 'tcp': (socket
.AF_INET
, socket
.SOCK_STREAM
),
11 'tcp6': (socket
.AF_INET6
, socket
.SOCK_STREAM
),
12 'udp': (socket
.AF_INET
, socket
.SOCK_DGRAM
),
13 'udp6': (socket
.AF_INET6
, socket
.SOCK_DGRAM
),
# Build the socket described by a parsed URL.
#
# Visible steps:
#   * look up (socket_family, socket_type) in self.schemes by self.url.scheme;
#   * create self.sock with that family and type;
#   * when the socket family is AF_UNIX, self.address is the URL path;
#   * a (hostname, port) tuple is also assigned to self.address, after
#     asserting that the hostname is truthy.
#
# NOTE(review): the embedded source numbering skips 17-19, 21, 23, 27 and 29,
# so lines are missing from this extract. Presumably they hold the
# `self.url = url` assignment, the try/except around the scheme lookup, the
# `else:` that separates the AF_UNIX branch from the (hostname, port) branch,
# and one more statement before the tuple assignment — confirm against the
# upstream file before editing this method.
16 def __init__(self
, url
: ParseResult
) -> None:
20 socket_family
, socket_type
= self
.schemes
[self
.url
.scheme
]
# NOTE(review): the format string and the scheme are passed to RuntimeError as
# two separate arguments, so '%s' is never interpolated; the exception's args
# become ('Unsupported socket type: %s', <scheme>). Interpolating with
# `% self.url.scheme` would produce the intended message.
22 raise RuntimeError('Unsupported socket type: %s', self
.url
.scheme
)
24 self
.sock
= socket
.socket(family
=socket_family
, type=socket_type
)
25 if self
.sock
.family
== socket
.AF_UNIX
:
26 self
.address
: Union
[str, Tuple
[str, int]] = self
.url
.path
28 assert self
.url
.hostname
30 self
.address
= (self
.url
.hostname
, self
.url
.port
)
def connect(self) -> None:
    """Connect the underlying socket to ``self.address``.

    The result of ``self.sock.connect`` is returned unchanged.
    """
    return self.sock.connect(self.address)
35 def close(self
) -> None:
def send(self, data: str, flags: int = 0) -> int:
    """Send *data* as a newline-terminated UTF-8 message.

    Args:
        data: Text to transmit. It is UTF-8 encoded and a single newline
            byte is appended before sending.
        flags: Passed through unchanged as the second argument of
            ``self.sock.send``.

    Returns:
        The value returned by ``self.sock.send``.
    """
    return self.sock.send(data.encode('utf-8') + b'\n', flags)
41 def __del__(self
) -> None:
44 def __enter__(self
) -> 'BaseSocket':
48 def __exit__(self
, exc_type
: Any
, exc_val
: Any
, exc_tb
: Any
) -> None: