1 // Copyright The OpenTelemetry Authors
2 // SPDX-License-Identifier: Apache-2.0
19 //# include <Windows.h>
21 # include <winsock2.h>
23 // TODO: consider NOMINMAX
26 # pragma comment(lib, "ws2_32.lib")
33 # include <sys/epoll.h>
37 # include "TargetConditionals.h"
39 # include <sys/event.h>
40 # include <sys/time.h>
41 # include <sys/types.h>
44 // Common POSIX headers for Linux and Mac OS X
45 # include <arpa/inet.h>
48 # include <netinet/in.h>
49 # include <netinet/tcp.h>
50 # include <sys/socket.h>
55 # define _Out_cap_(size)
58 #if defined(HAVE_CONSOLE_LOG) && !defined(LOG_DEBUG)
59 // Log to console if there's no standard log facility defined
62 # define LOG_DEBUG(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__)
63 # define LOG_TRACE(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__)
64 # define LOG_INFO(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__)
65 # define LOG_WARN(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__)
66 # define LOG_ERROR(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__)
71 // Don't log anything if there's no standard log facility defined
72 # define LOG_DEBUG(fmt_, ...)
73 # define LOG_TRACE(fmt_, ...)
74 # define LOG_INFO(fmt_, ...)
75 # define LOG_WARN(fmt_, ...)
76 # define LOG_ERROR(fmt_, ...)
83 /// A simple thread, derived class overloads onThread() method.
89 std::atomic
<bool> m_terminate
{false};
92 /// Thread Constructor
94 /// <returns>Thread</returns>
103 m_thread
= std::thread([&]() { this->onThread(); });
112 if (m_thread
.joinable())
119 /// Indicates if this thread should terminate
121 /// <returns></returns>
122 bool shouldTerminate() const { return m_terminate
; }
125 /// Must be implemented by children
127 virtual void onThread() = 0;
130 /// Thread destructor
132 /// <returns></returns>
133 virtual ~Thread() noexcept
{}
136 }; // namespace common
137 namespace SocketTools
141 // WinSocks need extra (de)initialization, solved by a global object here,
142 // whose constructor/destructor will be called before and after main().
143 struct WsaInitializer
148 WSAStartup(MAKEWORD(2, 2), &wsaData
);
151 ~WsaInitializer() { WSACleanup(); }
154 static WsaInitializer g_wsaInitializer
;
159 /// Encapsulation of sockaddr(_in)
163 static u_long
const Loopback
= 0x7F000001;
168 /// SocketAddr constructor
170 /// <returns>SocketAddr</returns>
171 SocketAddr() { memset(&m_data
, 0, sizeof(m_data
)); }
173 SocketAddr(u_long addr
, int port
)
175 sockaddr_in
&inet4
= reinterpret_cast<sockaddr_in
&>(m_data
);
176 inet4
.sin_family
= AF_INET
;
177 inet4
.sin_port
= htons(static_cast<unsigned short>(port
));
178 inet4
.sin_addr
.s_addr
= htonl(addr
);
181 SocketAddr(char const *addr
)
184 INT addrlen
= sizeof(m_data
);
186 for (int i
= 0; i
< sizeof(buf
) && addr
[i
]; i
++)
191 ::WSAStringToAddressW(buf
, AF_INET
, nullptr, &m_data
, &addrlen
);
193 sockaddr_in
&inet4
= reinterpret_cast<sockaddr_in
&>(m_data
);
194 inet4
.sin_family
= AF_INET
;
195 char const *colon
= strchr(addr
, ':');
198 inet4
.sin_port
= htons(atoi(colon
+ 1));
200 memcpy(buf
, addr
, (std::min
<ptrdiff_t>)(15, colon
- addr
));
202 ::inet_pton(AF_INET
, buf
, &inet4
.sin_addr
);
207 ::inet_pton(AF_INET
, addr
, &inet4
.sin_addr
);
212 SocketAddr(SocketAddr
const &other
) = default;
214 SocketAddr
&operator=(SocketAddr
const &other
) = default;
216 operator sockaddr
*() { return &m_data
; }
218 operator const sockaddr
*() const { return &m_data
; }
222 switch (m_data
.sa_family
)
225 sockaddr_in
const &inet4
= reinterpret_cast<sockaddr_in
const &>(m_data
);
226 return ntohs(inet4
.sin_port
);
234 std::string
toString() const
236 std::ostringstream os
;
238 switch (m_data
.sa_family
)
241 sockaddr_in
const &inet4
= reinterpret_cast<sockaddr_in
const &>(m_data
);
242 u_long addr
= ntohl(inet4
.sin_addr
.s_addr
);
243 os
<< (addr
>> 24) << '.' << ((addr
>> 16) & 255) << '.' << ((addr
>> 8) & 255) << '.'
245 os
<< ':' << ntohs(inet4
.sin_port
);
250 os
<< "[?AF?" << m_data
.sa_family
<< ']';
257 /// Encapsulation of a socket (non-exclusive ownership)
263 static Type
const Invalid
= INVALID_SOCKET
;
266 static Type
const Invalid
= -1;
271 Socket(Type sock
= Invalid
) : m_sock(sock
) {}
273 Socket(int af
, int type
, int proto
) { m_sock
= ::socket(af
, type
, proto
); }
277 operator Socket::Type() const { return m_sock
; }
279 bool operator==(Socket
const &other
) const { return (m_sock
== other
.m_sock
); }
281 bool operator!=(Socket
const &other
) const { return (m_sock
!= other
.m_sock
); }
283 bool operator<(Socket
const &other
) const { return (m_sock
< other
.m_sock
); }
285 bool invalid() const { return (m_sock
== Invalid
); }
287 void setNonBlocking()
289 assert(m_sock
!= Invalid
);
292 ::ioctlsocket(m_sock
, FIONBIO
, &value
);
294 int flags
= ::fcntl(m_sock
, F_GETFL
, 0);
295 ::fcntl(m_sock
, F_SETFL
, flags
| O_NONBLOCK
);
301 assert(m_sock
!= Invalid
);
307 return (::setsockopt(m_sock
, SOL_SOCKET
, SO_REUSEADDR
, reinterpret_cast<char *>(&value
),
308 sizeof(value
)) == 0);
313 assert(m_sock
!= Invalid
);
319 return (::setsockopt(m_sock
, IPPROTO_TCP
, TCP_NODELAY
, reinterpret_cast<char *>(&value
),
320 sizeof(value
)) == 0);
323 bool connect(SocketAddr
const &addr
)
325 assert(m_sock
!= Invalid
);
326 return (::connect(m_sock
, addr
, sizeof(addr
)) == 0);
331 assert(m_sock
!= Invalid
);
333 ::closesocket(m_sock
);
340 int recv(_Out_cap_(size
) void *buffer
, unsigned size
)
342 assert(m_sock
!= Invalid
);
344 return static_cast<int>(::recv(m_sock
, reinterpret_cast<char *>(buffer
), size
, flags
));
347 int send(void const *buffer
, unsigned size
)
349 assert(m_sock
!= Invalid
);
350 return static_cast<int>(::send(m_sock
, reinterpret_cast<char const *>(buffer
), size
, 0));
353 bool bind(SocketAddr
const &addr
)
355 assert(m_sock
!= Invalid
);
356 return (::bind(m_sock
, addr
, sizeof(addr
)) == 0);
359 bool getsockname(SocketAddr
&addr
) const
361 assert(m_sock
!= Invalid
);
363 int addrlen
= sizeof(addr
);
365 socklen_t addrlen
= sizeof(addr
);
367 return (::getsockname(m_sock
, addr
, &addrlen
) == 0);
370 bool listen(int backlog
)
372 assert(m_sock
!= Invalid
);
373 return (::listen(m_sock
, backlog
) == 0);
376 bool accept(Socket
&csock
, SocketAddr
&caddr
)
378 assert(m_sock
!= Invalid
);
380 int addrlen
= sizeof(caddr
);
382 socklen_t addrlen
= sizeof(caddr
);
384 csock
= ::accept(m_sock
, caddr
, &addrlen
);
385 return !csock
.invalid();
388 bool shutdown(int how
)
390 assert(m_sock
!= Invalid
);
391 return (::shutdown(m_sock
, how
) == 0);
397 return ::WSAGetLastError();
406 ErrorWouldBlock
= WSAEWOULDBLOCK
408 ErrorWouldBlock
= EWOULDBLOCK
415 ShutdownReceive
= SD_RECEIVE
,
416 ShutdownSend
= SD_SEND
,
417 ShutdownBoth
= SD_BOTH
419 ShutdownReceive
= SHUT_RD
,
420 ShutdownSend
= SHUT_WR
,
421 ShutdownBoth
= SHUT_RDWR
434 SocketData() : socket(), flags(0) {}
436 bool operator==(Socket s
) { return (socket
== s
); }
442 struct Reactor
: protected common::Thread
445 /// Socket State callback
450 virtual void onSocketReadable(Socket sock
) = 0;
451 virtual void onSocketWritable(Socket sock
) = 0;
452 virtual void onSocketAcceptable(Socket sock
) = 0;
453 virtual void onSocketClosed(Socket sock
) = 0;
467 SocketCallback
&m_callback
;
469 std::vector
<SocketData
> m_sockets
;
472 /* use WinSock events on Windows */
473 std::vector
<WSAEVENT
> m_events
{};
477 /* use epoll on Linux */
482 /* use kqueue on Mac */
483 # define KQUEUE_SIZE 32
485 struct kevent m_events
[KQUEUE_SIZE
];
489 Reactor(SocketCallback
&callback
) : m_callback(callback
)
493 m_epollFd
= ::epoll_create(0);
495 m_epollFd
= ::epoll_create1(0);
500 bzero(&m_events
[0], sizeof(m_events
));
518 /// <param name="socket"></param>
519 /// <param name="flags"></param>
520 void addSocket(const Socket
&socket
, int flags
)
524 removeSocket(socket
);
528 auto it
= std::find(m_sockets
.begin(), m_sockets
.end(), socket
);
529 if (it
== m_sockets
.end())
531 LOG_TRACE("Reactor: Adding socket 0x%x with flags 0x%x", static_cast<int>(socket
), flags
);
533 m_events
.push_back(::WSACreateEvent());
536 epoll_event event
= {};
537 event
.data
.fd
= socket
;
539 ::epoll_ctl(m_epollFd
, EPOLL_CTL_ADD
, socket
, &event
);
543 bzero(&event
, sizeof(event
));
544 event
.ident
= socket
.m_sock
;
545 EV_SET(&event
, event
.ident
, EVFILT_READ
, EV_ADD
, 0, 0, NULL
);
546 kevent(kq
, &event
, 1, NULL
, 0, NULL
);
547 EV_SET(&event
, event
.ident
, EVFILT_WRITE
, EV_ADD
, 0, 0, NULL
);
548 kevent(kq
, &event
, 1, NULL
, 0, NULL
);
550 m_sockets
.push_back(SocketData());
551 m_sockets
.back().socket
= socket
;
552 m_sockets
.back().flags
= 0;
553 it
= m_sockets
.end() - 1;
557 LOG_TRACE("Reactor: Updating socket 0x%x with flags 0x%x", static_cast<int>(socket
), flags
);
560 if (it
->flags
!= flags
)
564 long lNetworkEvents
= 0;
565 if (it
->flags
& Readable
)
567 lNetworkEvents
|= FD_READ
;
569 if (it
->flags
& Writable
)
571 lNetworkEvents
|= FD_WRITE
;
573 if (it
->flags
& Acceptable
)
575 lNetworkEvents
|= FD_ACCEPT
;
577 if (it
->flags
& Closed
)
579 lNetworkEvents
|= FD_CLOSE
;
581 auto eventIt
= m_events
.begin() + std::distance(m_sockets
.begin(), it
);
582 ::WSAEventSelect(socket
, *eventIt
, lNetworkEvents
);
586 if (it
->flags
& Readable
)
590 if (it
->flags
& Writable
)
594 if (it
->flags
& Acceptable
)
598 // if (it->flags & Closed) - always handled (EPOLLERR | EPOLLHUP)
599 epoll_event event
= {};
600 event
.data
.fd
= socket
;
601 event
.events
= events
;
602 ::epoll_ctl(m_epollFd
, EPOLL_CTL_MOD
, socket
, &event
);
605 // TODO: [MG] - Mac OS X socket doesn't currently support updating flags
614 /// <param name="socket"></param>
615 void removeSocket(const Socket
&socket
)
617 LOG_TRACE("Reactor: Removing socket 0x%x", static_cast<int>(socket
));
618 auto it
= std::find(m_sockets
.begin(), m_sockets
.end(), socket
);
619 if (it
!= m_sockets
.end())
622 auto eventIt
= m_events
.begin() + std::distance(m_sockets
.begin(), it
);
623 ::WSAEventSelect(it
->socket
, *eventIt
, 0);
624 ::WSACloseEvent(*eventIt
);
625 m_events
.erase(eventIt
);
628 ::epoll_ctl(m_epollFd
, EPOLL_CTL_DEL
, socket
, nullptr);
632 bzero(&event
, sizeof(event
));
633 event
.ident
= socket
;
634 EV_SET(&event
, socket
, EVFILT_READ
, EV_DELETE
, 0, 0, NULL
);
635 if (-1 == kevent(kq
, &event
, 1, NULL
, 0, NULL
))
637 //// Already removed?
638 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event
.ident
);
640 EV_SET(&event
, socket
, EVFILT_WRITE
, EV_DELETE
, 0, 0, NULL
);
641 if (-1 == kevent(kq
, &event
, 1, NULL
, 0, NULL
))
643 //// Already removed?
644 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event
.ident
);
656 LOG_INFO("Reactor: Starting...");
665 LOG_INFO("Reactor: Stopping...");
668 for (auto &hEvent
: m_events
)
670 ::WSACloseEvent(hEvent
);
672 #else /* Linux and Mac */
673 for (auto &sd
: m_sockets
)
676 ::epoll_ctl(m_epollFd
, EPOLL_CTL_DEL
, sd
.socket
, nullptr);
678 # ifdef TARGET_OS_MAC
680 bzero(&event
, sizeof(event
));
681 event
.ident
= sd
.socket
;
682 EV_SET(&event
, sd
.socket
, EVFILT_READ
, EV_DELETE
, 0, 0, NULL
);
683 if (-1 == kevent(kq
, &event
, 1, NULL
, 0, NULL
))
685 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event
.ident
);
687 EV_SET(&event
, sd
.socket
, EVFILT_WRITE
, EV_DELETE
, 0, 0, NULL
);
688 if (-1 == kevent(kq
, &event
, 1, NULL
, 0, NULL
))
690 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event
.ident
);
699 /// Thread Loop for async events processing
701 virtual void onThread() override
703 LOG_INFO("Reactor: Thread started");
704 while (!shouldTerminate())
707 DWORD dwResult
= ::WSAWaitForMultipleEvents(static_cast<DWORD
>(m_events
.size()),
708 m_events
.data(), FALSE
, 500, FALSE
);
709 if (dwResult
== WSA_WAIT_TIMEOUT
)
714 assert(dwResult
<= WSA_WAIT_EVENT_0
+ m_events
.size());
715 int index
= dwResult
- WSA_WAIT_EVENT_0
;
716 Socket socket
= m_sockets
[index
].socket
;
717 int flags
= m_sockets
[index
].flags
;
720 ::WSAEnumNetworkEvents(socket
, m_events
[index
], &ne
);
722 "Reactor: Handling socket 0x%x (index %d) with active flags 0x%x "
724 static_cast<int>(socket
), index
, ne
.lNetworkEvents
, flags
);
726 if ((flags
& Readable
) && (ne
.lNetworkEvents
& FD_READ
))
728 m_callback
.onSocketReadable(socket
);
730 if ((flags
& Writable
) && (ne
.lNetworkEvents
& FD_WRITE
))
732 m_callback
.onSocketWritable(socket
);
734 if ((flags
& Acceptable
) && (ne
.lNetworkEvents
& FD_ACCEPT
))
736 m_callback
.onSocketAcceptable(socket
);
738 if ((flags
& Closed
) && (ne
.lNetworkEvents
& FD_CLOSE
))
740 m_callback
.onSocketClosed(socket
);
745 epoll_event events
[4];
746 int result
= ::epoll_wait(m_epollFd
, events
, sizeof(events
) / sizeof(events
[0]), 500);
747 if (result
== 0 || (result
== -1 && errno
== EINTR
))
752 assert(result
>= 1 && static_cast<size_t>(result
) <= sizeof(events
) / sizeof(events
[0]));
753 for (int i
= 0; i
< result
; i
++)
755 auto it
= std::find(m_sockets
.begin(), m_sockets
.end(), events
[i
].data
.fd
);
756 assert(it
!= m_sockets
.end());
757 Socket socket
= it
->socket
;
758 int flags
= it
->flags
;
760 LOG_TRACE("Reactor: Handling socket 0x%x active flags 0x%x (armed 0x%x)",
761 static_cast<int>(socket
), events
[i
].events
, flags
);
763 if ((flags
& Readable
) && (events
[i
].events
& EPOLLIN
))
765 m_callback
.onSocketReadable(socket
);
767 if ((flags
& Writable
) && (events
[i
].events
& EPOLLOUT
))
769 m_callback
.onSocketWritable(socket
);
771 if ((flags
& Acceptable
) && (events
[i
].events
& EPOLLIN
))
773 m_callback
.onSocketAcceptable(socket
);
775 if ((flags
& Closed
) && (events
[i
].events
& (EPOLLHUP
| EPOLLERR
)))
777 m_callback
.onSocketClosed(socket
);
782 #if defined(TARGET_OS_MAC)
783 unsigned waitms
= 500; // never block for more than 500ms
784 struct timespec timeout
;
785 timeout
.tv_sec
= waitms
/ 1000;
786 timeout
.tv_nsec
= (waitms
% 1000) * 1000 * 1000;
788 int nev
= kevent(kq
, NULL
, 0, m_events
, KQUEUE_SIZE
, &timeout
);
789 for (int i
= 0; i
< nev
; i
++)
791 struct kevent
&event
= m_events
[i
];
792 int fd
= (int)event
.ident
;
793 auto it
= std::find(m_sockets
.begin(), m_sockets
.end(), fd
);
794 assert(it
!= m_sockets
.end());
795 Socket socket
= it
->socket
;
796 int flags
= it
->flags
;
798 LOG_TRACE("Handling socket 0x%x active flags 0x%x (armed 0x%x)", static_cast<int>(socket
),
799 event
.flags
, event
.fflags
);
801 if (event
.filter
== EVFILT_READ
)
803 if (flags
& Acceptable
)
805 m_callback
.onSocketAcceptable(socket
);
807 if (flags
& Readable
)
809 m_callback
.onSocketReadable(socket
);
814 if (event
.filter
== EVFILT_WRITE
)
816 if (flags
& Writable
)
818 m_callback
.onSocketWritable(socket
);
823 if ((event
.flags
& EV_EOF
) || (event
.flags
& EV_ERROR
))
825 LOG_TRACE("event.filter=%s", "EVFILT_WRITE");
826 m_callback
.onSocketClosed(socket
);
829 EV_SET(&kevt
, event
.ident
, EVFILT_READ
, EV_DELETE
, 0, 0, NULL
);
830 if (-1 == kevent(kq
, &kevt
, 1, NULL
, 0, NULL
))
832 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event
.ident
);
834 EV_SET(&kevt
, event
.ident
, EVFILT_WRITE
, EV_DELETE
, 0, 0, NULL
);
835 if (-1 == kevent(kq
, &kevt
, 1, NULL
, 0, NULL
))
837 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event
.ident
);
841 LOG_ERROR("Reactor: unhandled kevent!");
845 LOG_TRACE("Reactor: Thread done");
849 } // namespace SocketTools