]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/opentelemetry-cpp/ext/include/opentelemetry/ext/http/server/socket_tools.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / jaegertracing / opentelemetry-cpp / ext / include / opentelemetry / ext / http / server / socket_tools.h
1 // Copyright The OpenTelemetry Authors
2 // SPDX-License-Identifier: Apache-2.0
3 #pragma once
4
5 #include <algorithm>
6 #include <atomic>
7 #include <cassert>
8 #include <cstddef>
9 #include <cstring>
10 #include <iostream>
11 #include <map>
12 #include <sstream>
13 #include <string>
14 #include <thread>
15 #include <vector>
16
17 #ifdef _WIN32
18
19 //# include <Windows.h>
20
21 # include <winsock2.h>
22
23 // TODO: consider NOMINMAX
24 # undef min
25 # undef max
26 # pragma comment(lib, "ws2_32.lib")
27
28 #else
29
30 # include <unistd.h>
31
32 # ifdef __linux__
33 # include <sys/epoll.h>
34 # endif
35
36 # if __APPLE__
37 # include "TargetConditionals.h"
38 // Use kqueue on mac
39 # include <sys/event.h>
40 # include <sys/time.h>
41 # include <sys/types.h>
42 # endif
43
44 // Common POSIX headers for Linux and Mac OS X
45 # include <arpa/inet.h>
46 # include <fcntl.h>
47 # include <netdb.h>
48 # include <netinet/in.h>
49 # include <netinet/tcp.h>
50 # include <sys/socket.h>
51
52 #endif
53
54 #ifndef _Out_cap_
55 # define _Out_cap_(size)
56 #endif
57
58 #if defined(HAVE_CONSOLE_LOG) && !defined(LOG_DEBUG)
59 // Log to console if there's no standard log facility defined
60 # include <cstdio>
61 # ifndef LOG_DEBUG
62 # define LOG_DEBUG(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__)
63 # define LOG_TRACE(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__)
64 # define LOG_INFO(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__)
65 # define LOG_WARN(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__)
66 # define LOG_ERROR(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__)
67 # endif
68 #endif
69
70 #ifndef LOG_DEBUG
71 // Don't log anything if there's no standard log facility defined
72 # define LOG_DEBUG(fmt_, ...)
73 # define LOG_TRACE(fmt_, ...)
74 # define LOG_INFO(fmt_, ...)
75 # define LOG_WARN(fmt_, ...)
76 # define LOG_ERROR(fmt_, ...)
77 #endif
78
79 namespace common
80 {
81
82 /// <summary>
83 /// A simple thread, derived class overloads onThread() method.
84 /// </summary>
85 struct Thread
86 {
87 std::thread m_thread;
88
89 std::atomic<bool> m_terminate{false};
90
91 /// <summary>
92 /// Thread Constructor
93 /// </summary>
94 /// <returns>Thread</returns>
95 Thread() {}
96
97 /// <summary>
98 /// Start Thread
99 /// </summary>
100 void startThread()
101 {
102 m_terminate = false;
103 m_thread = std::thread([&]() { this->onThread(); });
104 }
105
106 /// <summary>
107 /// Join Thread
108 /// </summary>
109 void joinThread()
110 {
111 m_terminate = true;
112 if (m_thread.joinable())
113 {
114 m_thread.join();
115 }
116 }
117
118 /// <summary>
119 /// Indicates if this thread should terminate
120 /// </summary>
121 /// <returns></returns>
122 bool shouldTerminate() const { return m_terminate; }
123
124 /// <summary>
125 /// Must be implemented by children
126 /// </summary>
127 virtual void onThread() = 0;
128
129 /// <summary>
130 /// Thread destructor
131 /// </summary>
132 /// <returns></returns>
133 virtual ~Thread() noexcept {}
134 };
135
136 }; // namespace common
137 namespace SocketTools
138 {
139
140 #ifdef _WIN32
141 // WinSocks need extra (de)initialization, solved by a global object here,
142 // whose constructor/destructor will be called before and after main().
143 struct WsaInitializer
144 {
145 WsaInitializer()
146 {
147 WSADATA wsaData;
148 WSAStartup(MAKEWORD(2, 2), &wsaData);
149 }
150
151 ~WsaInitializer() { WSACleanup(); }
152 };
153
154 static WsaInitializer g_wsaInitializer;
155
156 #endif
157
158 /// <summary>
159 /// Encapsulation of sockaddr(_in)
160 /// </summary>
161 struct SocketAddr
162 {
163 static u_long const Loopback = 0x7F000001;
164
165 sockaddr m_data;
166
167 /// <summary>
168 /// SocketAddr constructor
169 /// </summary>
170 /// <returns>SocketAddr</returns>
171 SocketAddr() { memset(&m_data, 0, sizeof(m_data)); }
172
173 SocketAddr(u_long addr, int port)
174 {
175 sockaddr_in &inet4 = reinterpret_cast<sockaddr_in &>(m_data);
176 inet4.sin_family = AF_INET;
177 inet4.sin_port = htons(static_cast<unsigned short>(port));
178 inet4.sin_addr.s_addr = htonl(addr);
179 }
180
181 SocketAddr(char const *addr)
182 {
183 #ifdef _WIN32
184 INT addrlen = sizeof(m_data);
185 WCHAR buf[200];
186 for (int i = 0; i < sizeof(buf) && addr[i]; i++)
187 {
188 buf[i] = addr[i];
189 }
190 buf[199] = L'\0';
191 ::WSAStringToAddressW(buf, AF_INET, nullptr, &m_data, &addrlen);
192 #else
193 sockaddr_in &inet4 = reinterpret_cast<sockaddr_in &>(m_data);
194 inet4.sin_family = AF_INET;
195 char const *colon = strchr(addr, ':');
196 if (colon)
197 {
198 inet4.sin_port = htons(atoi(colon + 1));
199 char buf[16];
200 memcpy(buf, addr, (std::min<ptrdiff_t>)(15, colon - addr));
201 buf[15] = '\0';
202 ::inet_pton(AF_INET, buf, &inet4.sin_addr);
203 }
204 else
205 {
206 inet4.sin_port = 0;
207 ::inet_pton(AF_INET, addr, &inet4.sin_addr);
208 }
209 #endif
210 }
211
212 SocketAddr(SocketAddr const &other) = default;
213
214 SocketAddr &operator=(SocketAddr const &other) = default;
215
216 operator sockaddr *() { return &m_data; }
217
218 operator const sockaddr *() const { return &m_data; }
219
220 int port() const
221 {
222 switch (m_data.sa_family)
223 {
224 case AF_INET: {
225 sockaddr_in const &inet4 = reinterpret_cast<sockaddr_in const &>(m_data);
226 return ntohs(inet4.sin_port);
227 }
228
229 default:
230 return -1;
231 }
232 }
233
234 std::string toString() const
235 {
236 std::ostringstream os;
237
238 switch (m_data.sa_family)
239 {
240 case AF_INET: {
241 sockaddr_in const &inet4 = reinterpret_cast<sockaddr_in const &>(m_data);
242 u_long addr = ntohl(inet4.sin_addr.s_addr);
243 os << (addr >> 24) << '.' << ((addr >> 16) & 255) << '.' << ((addr >> 8) & 255) << '.'
244 << (addr & 255);
245 os << ':' << ntohs(inet4.sin_port);
246 break;
247 }
248
249 default:
250 os << "[?AF?" << m_data.sa_family << ']';
251 }
252 return os.str();
253 }
254 };
255
256 /// <summary>
257 /// Encapsulation of a socket (non-exclusive ownership)
258 /// </summary>
259 struct Socket
260 {
261 #ifdef _WIN32
262 typedef SOCKET Type;
263 static Type const Invalid = INVALID_SOCKET;
264 #else
265 typedef int Type;
266 static Type const Invalid = -1;
267 #endif
268
269 Type m_sock;
270
271 Socket(Type sock = Invalid) : m_sock(sock) {}
272
273 Socket(int af, int type, int proto) { m_sock = ::socket(af, type, proto); }
274
275 ~Socket() {}
276
277 operator Socket::Type() const { return m_sock; }
278
279 bool operator==(Socket const &other) const { return (m_sock == other.m_sock); }
280
281 bool operator!=(Socket const &other) const { return (m_sock != other.m_sock); }
282
283 bool operator<(Socket const &other) const { return (m_sock < other.m_sock); }
284
285 bool invalid() const { return (m_sock == Invalid); }
286
287 void setNonBlocking()
288 {
289 assert(m_sock != Invalid);
290 #ifdef _WIN32
291 u_long value = 1;
292 ::ioctlsocket(m_sock, FIONBIO, &value);
293 #else
294 int flags = ::fcntl(m_sock, F_GETFL, 0);
295 ::fcntl(m_sock, F_SETFL, flags | O_NONBLOCK);
296 #endif
297 }
298
299 bool setReuseAddr()
300 {
301 assert(m_sock != Invalid);
302 #ifdef _WIN32
303 BOOL value = TRUE;
304 #else
305 int value = 1;
306 #endif
307 return (::setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&value),
308 sizeof(value)) == 0);
309 }
310
311 bool setNoDelay()
312 {
313 assert(m_sock != Invalid);
314 #ifdef _WIN32
315 BOOL value = TRUE;
316 #else
317 int value = 1;
318 #endif
319 return (::setsockopt(m_sock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char *>(&value),
320 sizeof(value)) == 0);
321 }
322
323 bool connect(SocketAddr const &addr)
324 {
325 assert(m_sock != Invalid);
326 return (::connect(m_sock, addr, sizeof(addr)) == 0);
327 }
328
329 void close()
330 {
331 assert(m_sock != Invalid);
332 #ifdef _WIN32
333 ::closesocket(m_sock);
334 #else
335 ::close(m_sock);
336 #endif
337 m_sock = Invalid;
338 }
339
340 int recv(_Out_cap_(size) void *buffer, unsigned size)
341 {
342 assert(m_sock != Invalid);
343 int flags = 0;
344 return static_cast<int>(::recv(m_sock, reinterpret_cast<char *>(buffer), size, flags));
345 }
346
347 int send(void const *buffer, unsigned size)
348 {
349 assert(m_sock != Invalid);
350 return static_cast<int>(::send(m_sock, reinterpret_cast<char const *>(buffer), size, 0));
351 }
352
353 bool bind(SocketAddr const &addr)
354 {
355 assert(m_sock != Invalid);
356 return (::bind(m_sock, addr, sizeof(addr)) == 0);
357 }
358
359 bool getsockname(SocketAddr &addr) const
360 {
361 assert(m_sock != Invalid);
362 #ifdef _WIN32
363 int addrlen = sizeof(addr);
364 #else
365 socklen_t addrlen = sizeof(addr);
366 #endif
367 return (::getsockname(m_sock, addr, &addrlen) == 0);
368 }
369
370 bool listen(int backlog)
371 {
372 assert(m_sock != Invalid);
373 return (::listen(m_sock, backlog) == 0);
374 }
375
376 bool accept(Socket &csock, SocketAddr &caddr)
377 {
378 assert(m_sock != Invalid);
379 #ifdef _WIN32
380 int addrlen = sizeof(caddr);
381 #else
382 socklen_t addrlen = sizeof(caddr);
383 #endif
384 csock = ::accept(m_sock, caddr, &addrlen);
385 return !csock.invalid();
386 }
387
388 bool shutdown(int how)
389 {
390 assert(m_sock != Invalid);
391 return (::shutdown(m_sock, how) == 0);
392 }
393
394 int error() const
395 {
396 #ifdef _WIN32
397 return ::WSAGetLastError();
398 #else
399 return errno;
400 #endif
401 }
402
403 enum
404 {
405 #ifdef _WIN32
406 ErrorWouldBlock = WSAEWOULDBLOCK
407 #else
408 ErrorWouldBlock = EWOULDBLOCK
409 #endif
410 };
411
412 enum
413 {
414 #ifdef _WIN32
415 ShutdownReceive = SD_RECEIVE,
416 ShutdownSend = SD_SEND,
417 ShutdownBoth = SD_BOTH
418 #else
419 ShutdownReceive = SHUT_RD,
420 ShutdownSend = SHUT_WR,
421 ShutdownBoth = SHUT_RDWR
422 #endif
423 };
424 };
425
426 /// <summary>
427 /// Socket Data
428 /// </summary>
429 struct SocketData
430 {
431 Socket socket;
432 int flags;
433
434 SocketData() : socket(), flags(0) {}
435
436 bool operator==(Socket s) { return (socket == s); }
437 };
438
439 /// <summary>
440 /// Socket Reactor
441 /// </summary>
442 struct Reactor : protected common::Thread
443 {
444 /// <summary>
445 /// Socket State callback
446 /// </summary>
447 class SocketCallback
448 {
449 public:
450 virtual void onSocketReadable(Socket sock) = 0;
451 virtual void onSocketWritable(Socket sock) = 0;
452 virtual void onSocketAcceptable(Socket sock) = 0;
453 virtual void onSocketClosed(Socket sock) = 0;
454 };
455
456 /// <summary>
457 /// Socket State
458 /// </summary>
459 enum State
460 {
461 Readable = 1,
462 Writable = 2,
463 Acceptable = 4,
464 Closed = 8
465 };
466
467 SocketCallback &m_callback;
468
469 std::vector<SocketData> m_sockets;
470
471 #ifdef _WIN32
472 /* use WinSock events on Windows */
473 std::vector<WSAEVENT> m_events{};
474 #endif
475
476 #ifdef __linux__
477 /* use epoll on Linux */
478 int m_epollFd;
479 #endif
480
481 #ifdef TARGET_OS_MAC
482 /* use kqueue on Mac */
483 # define KQUEUE_SIZE 32
484 int kq{0};
485 struct kevent m_events[KQUEUE_SIZE];
486 #endif
487
488 public:
489 Reactor(SocketCallback &callback) : m_callback(callback)
490 {
491 #ifdef __linux__
492 # ifdef ANDROID
493 m_epollFd = ::epoll_create(0);
494 # else
495 m_epollFd = ::epoll_create1(0);
496 # endif
497 #endif
498
499 #ifdef TARGET_OS_MAC
500 bzero(&m_events[0], sizeof(m_events));
501 kq = kqueue();
502 #endif
503 }
504
505 ~Reactor()
506 {
507 #ifdef __linux__
508 ::close(m_epollFd);
509 #endif
510 #ifdef TARGET_OS_MAC
511 ::close(kq);
512 #endif
513 }
514
515 /// <summary>
516 /// Add Socket
517 /// </summary>
518 /// <param name="socket"></param>
519 /// <param name="flags"></param>
520 void addSocket(const Socket &socket, int flags)
521 {
522 if (flags == 0)
523 {
524 removeSocket(socket);
525 }
526 else
527 {
528 auto it = std::find(m_sockets.begin(), m_sockets.end(), socket);
529 if (it == m_sockets.end())
530 {
531 LOG_TRACE("Reactor: Adding socket 0x%x with flags 0x%x", static_cast<int>(socket), flags);
532 #ifdef _WIN32
533 m_events.push_back(::WSACreateEvent());
534 #endif
535 #ifdef __linux__
536 epoll_event event = {};
537 event.data.fd = socket;
538 event.events = 0;
539 ::epoll_ctl(m_epollFd, EPOLL_CTL_ADD, socket, &event);
540 #endif
541 #ifdef TARGET_OS_MAC
542 struct kevent event;
543 bzero(&event, sizeof(event));
544 event.ident = socket.m_sock;
545 EV_SET(&event, event.ident, EVFILT_READ, EV_ADD, 0, 0, NULL);
546 kevent(kq, &event, 1, NULL, 0, NULL);
547 EV_SET(&event, event.ident, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
548 kevent(kq, &event, 1, NULL, 0, NULL);
549 #endif
550 m_sockets.push_back(SocketData());
551 m_sockets.back().socket = socket;
552 m_sockets.back().flags = 0;
553 it = m_sockets.end() - 1;
554 }
555 else
556 {
557 LOG_TRACE("Reactor: Updating socket 0x%x with flags 0x%x", static_cast<int>(socket), flags);
558 }
559
560 if (it->flags != flags)
561 {
562 it->flags = flags;
563 #ifdef _WIN32
564 long lNetworkEvents = 0;
565 if (it->flags & Readable)
566 {
567 lNetworkEvents |= FD_READ;
568 }
569 if (it->flags & Writable)
570 {
571 lNetworkEvents |= FD_WRITE;
572 }
573 if (it->flags & Acceptable)
574 {
575 lNetworkEvents |= FD_ACCEPT;
576 }
577 if (it->flags & Closed)
578 {
579 lNetworkEvents |= FD_CLOSE;
580 }
581 auto eventIt = m_events.begin() + std::distance(m_sockets.begin(), it);
582 ::WSAEventSelect(socket, *eventIt, lNetworkEvents);
583 #endif
584 #ifdef __linux__
585 int events = 0;
586 if (it->flags & Readable)
587 {
588 events |= EPOLLIN;
589 }
590 if (it->flags & Writable)
591 {
592 events |= EPOLLOUT;
593 }
594 if (it->flags & Acceptable)
595 {
596 events |= EPOLLIN;
597 }
598 // if (it->flags & Closed) - always handled (EPOLLERR | EPOLLHUP)
599 epoll_event event = {};
600 event.data.fd = socket;
601 event.events = events;
602 ::epoll_ctl(m_epollFd, EPOLL_CTL_MOD, socket, &event);
603 #endif
604 #ifdef TARGET_OS_MAC
605 // TODO: [MG] - Mac OS X socket doesn't currently support updating flags
606 #endif
607 }
608 }
609 }
610
611 /// <summary>
612 /// Remove Socket
613 /// </summary>
614 /// <param name="socket"></param>
615 void removeSocket(const Socket &socket)
616 {
617 LOG_TRACE("Reactor: Removing socket 0x%x", static_cast<int>(socket));
618 auto it = std::find(m_sockets.begin(), m_sockets.end(), socket);
619 if (it != m_sockets.end())
620 {
621 #ifdef _WIN32
622 auto eventIt = m_events.begin() + std::distance(m_sockets.begin(), it);
623 ::WSAEventSelect(it->socket, *eventIt, 0);
624 ::WSACloseEvent(*eventIt);
625 m_events.erase(eventIt);
626 #endif
627 #ifdef __linux__
628 ::epoll_ctl(m_epollFd, EPOLL_CTL_DEL, socket, nullptr);
629 #endif
630 #ifdef TARGET_OS_MAC
631 struct kevent event;
632 bzero(&event, sizeof(event));
633 event.ident = socket;
634 EV_SET(&event, socket, EVFILT_READ, EV_DELETE, 0, 0, NULL);
635 if (-1 == kevent(kq, &event, 1, NULL, 0, NULL))
636 {
637 //// Already removed?
638 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident);
639 }
640 EV_SET(&event, socket, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
641 if (-1 == kevent(kq, &event, 1, NULL, 0, NULL))
642 {
643 //// Already removed?
644 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident);
645 }
646 #endif
647 m_sockets.erase(it);
648 }
649 }
650
651 /// <summary>
652 /// Start server
653 /// </summary>
654 void start()
655 {
656 LOG_INFO("Reactor: Starting...");
657 startThread();
658 }
659
660 /// <summary>
661 /// Stop server
662 /// </summary>
663 void stop()
664 {
665 LOG_INFO("Reactor: Stopping...");
666 joinThread();
667 #ifdef _WIN32
668 for (auto &hEvent : m_events)
669 {
670 ::WSACloseEvent(hEvent);
671 }
672 #else /* Linux and Mac */
673 for (auto &sd : m_sockets)
674 {
675 # ifdef __linux__
676 ::epoll_ctl(m_epollFd, EPOLL_CTL_DEL, sd.socket, nullptr);
677 # endif
678 # ifdef TARGET_OS_MAC
679 struct kevent event;
680 bzero(&event, sizeof(event));
681 event.ident = sd.socket;
682 EV_SET(&event, sd.socket, EVFILT_READ, EV_DELETE, 0, 0, NULL);
683 if (-1 == kevent(kq, &event, 1, NULL, 0, NULL))
684 {
685 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident);
686 }
687 EV_SET(&event, sd.socket, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
688 if (-1 == kevent(kq, &event, 1, NULL, 0, NULL))
689 {
690 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident);
691 }
692 # endif
693 }
694 #endif
695 m_sockets.clear();
696 }
697
698 /// <summary>
699 /// Thread Loop for async events processing
700 /// </summary>
701 virtual void onThread() override
702 {
703 LOG_INFO("Reactor: Thread started");
704 while (!shouldTerminate())
705 {
706 #ifdef _WIN32
707 DWORD dwResult = ::WSAWaitForMultipleEvents(static_cast<DWORD>(m_events.size()),
708 m_events.data(), FALSE, 500, FALSE);
709 if (dwResult == WSA_WAIT_TIMEOUT)
710 {
711 continue;
712 }
713
714 assert(dwResult <= WSA_WAIT_EVENT_0 + m_events.size());
715 int index = dwResult - WSA_WAIT_EVENT_0;
716 Socket socket = m_sockets[index].socket;
717 int flags = m_sockets[index].flags;
718
719 WSANETWORKEVENTS ne;
720 ::WSAEnumNetworkEvents(socket, m_events[index], &ne);
721 LOG_TRACE(
722 "Reactor: Handling socket 0x%x (index %d) with active flags 0x%x "
723 "(armed 0x%x)",
724 static_cast<int>(socket), index, ne.lNetworkEvents, flags);
725
726 if ((flags & Readable) && (ne.lNetworkEvents & FD_READ))
727 {
728 m_callback.onSocketReadable(socket);
729 }
730 if ((flags & Writable) && (ne.lNetworkEvents & FD_WRITE))
731 {
732 m_callback.onSocketWritable(socket);
733 }
734 if ((flags & Acceptable) && (ne.lNetworkEvents & FD_ACCEPT))
735 {
736 m_callback.onSocketAcceptable(socket);
737 }
738 if ((flags & Closed) && (ne.lNetworkEvents & FD_CLOSE))
739 {
740 m_callback.onSocketClosed(socket);
741 }
742 #endif
743
744 #ifdef __linux__
745 epoll_event events[4];
746 int result = ::epoll_wait(m_epollFd, events, sizeof(events) / sizeof(events[0]), 500);
747 if (result == 0 || (result == -1 && errno == EINTR))
748 {
749 continue;
750 }
751
752 assert(result >= 1 && static_cast<size_t>(result) <= sizeof(events) / sizeof(events[0]));
753 for (int i = 0; i < result; i++)
754 {
755 auto it = std::find(m_sockets.begin(), m_sockets.end(), events[i].data.fd);
756 assert(it != m_sockets.end());
757 Socket socket = it->socket;
758 int flags = it->flags;
759
760 LOG_TRACE("Reactor: Handling socket 0x%x active flags 0x%x (armed 0x%x)",
761 static_cast<int>(socket), events[i].events, flags);
762
763 if ((flags & Readable) && (events[i].events & EPOLLIN))
764 {
765 m_callback.onSocketReadable(socket);
766 }
767 if ((flags & Writable) && (events[i].events & EPOLLOUT))
768 {
769 m_callback.onSocketWritable(socket);
770 }
771 if ((flags & Acceptable) && (events[i].events & EPOLLIN))
772 {
773 m_callback.onSocketAcceptable(socket);
774 }
775 if ((flags & Closed) && (events[i].events & (EPOLLHUP | EPOLLERR)))
776 {
777 m_callback.onSocketClosed(socket);
778 }
779 }
780 #endif
781
782 #if defined(TARGET_OS_MAC)
783 unsigned waitms = 500; // never block for more than 500ms
784 struct timespec timeout;
785 timeout.tv_sec = waitms / 1000;
786 timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;
787
788 int nev = kevent(kq, NULL, 0, m_events, KQUEUE_SIZE, &timeout);
789 for (int i = 0; i < nev; i++)
790 {
791 struct kevent &event = m_events[i];
792 int fd = (int)event.ident;
793 auto it = std::find(m_sockets.begin(), m_sockets.end(), fd);
794 assert(it != m_sockets.end());
795 Socket socket = it->socket;
796 int flags = it->flags;
797
798 LOG_TRACE("Handling socket 0x%x active flags 0x%x (armed 0x%x)", static_cast<int>(socket),
799 event.flags, event.fflags);
800
801 if (event.filter == EVFILT_READ)
802 {
803 if (flags & Acceptable)
804 {
805 m_callback.onSocketAcceptable(socket);
806 }
807 if (flags & Readable)
808 {
809 m_callback.onSocketReadable(socket);
810 }
811 continue;
812 }
813
814 if (event.filter == EVFILT_WRITE)
815 {
816 if (flags & Writable)
817 {
818 m_callback.onSocketWritable(socket);
819 }
820 continue;
821 }
822
823 if ((event.flags & EV_EOF) || (event.flags & EV_ERROR))
824 {
825 LOG_TRACE("event.filter=%s", "EVFILT_WRITE");
826 m_callback.onSocketClosed(socket);
827 it->flags = Closed;
828 struct kevent kevt;
829 EV_SET(&kevt, event.ident, EVFILT_READ, EV_DELETE, 0, 0, NULL);
830 if (-1 == kevent(kq, &kevt, 1, NULL, 0, NULL))
831 {
832 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident);
833 }
834 EV_SET(&kevt, event.ident, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
835 if (-1 == kevent(kq, &kevt, 1, NULL, 0, NULL))
836 {
837 LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident);
838 }
839 continue;
840 }
841 LOG_ERROR("Reactor: unhandled kevent!");
842 }
843 #endif
844 }
845 LOG_TRACE("Reactor: Thread done");
846 }
847 };
848
849 } // namespace SocketTools