]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/EventPoll.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / msg / async / EventPoll.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2022 Rafael Lopez <rafael.lopez@softiron.com>
7 *
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include "common/errno.h"
17 #include "EventPoll.h"
18
19 #include <unistd.h>
20 #define dout_subsys ceph_subsys_ms
21
22 #undef dout_prefix
23 #define dout_prefix *_dout << "PollDriver."
24
25 #ifndef POLL_ADD
26 #define POLL_ADD 1
27 #ifndef POLL_MOD
28 #define POLL_MOD 2
29 #ifndef POLL_DEL
30 #define POLL_DEL 3
31 #endif
32 #endif
33 #endif
34
35 int PollDriver::init(EventCenter *c, int nevent) {
36 // pfds array will auto scale up to hard_max_pfds, which should be
37 // greater than total daemons/op_threads (todo: cfg option?)
38 hard_max_pfds = 8192;
39 // 128 seems a good starting point, cover clusters up to ~350 OSDs
40 // with default ms_async_op_threads
41 max_pfds = 128;
42
43 pfds = (POLLFD*)calloc(max_pfds, sizeof(POLLFD));
44 if (!pfds) {
45 lderr(cct) << __func__ << " unable to allocate memory " << dendl;
46 return -ENOMEM;
47 }
48
49 //initialise pfds
50 for(int i = 0; i < max_pfds; i++){
51 pfds[i].fd = -1;
52 pfds[i].events = 0;
53 pfds[i].revents = 0;
54 }
55 return 0;
56 }
57
58 // Helper func to register/unregister interest in a FD's events by
59 // manipulating it's entry in pfds array
60 int PollDriver::poll_ctl(int fd, int op, int events) {
61 int pos = 0;
62 if (op == POLL_ADD) {
63 // Find an empty pollfd slot
64 for(pos = 0; pos < max_pfds ; pos++){
65 if(pfds[pos].fd == -1){
66 pfds[pos].fd = fd;
67 pfds[pos].events = events;
68 pfds[pos].revents = 0;
69 return 0;
70 }
71 }
72 // We ran out of slots, try to increase
73 if (max_pfds < hard_max_pfds) {
74 ldout(cct, 10) << __func__ << " exhausted pollfd slots"
75 << ", doubling to " << max_pfds*2 << dendl;
76 pfds = (POLLFD*)realloc(pfds, max_pfds*2*sizeof(POLLFD));
77 if (!pfds) {
78 lderr(cct) << __func__ << " unable to realloc for more pollfd slots"
79 << dendl;
80 return -ENOMEM;
81 }
82 // Initialise new slots
83 for (int i = max_pfds ; i < max_pfds*2 ; i++){
84 pfds[i].fd = -1;
85 pfds[i].events = 0;
86 pfds[i].revents = 0;
87 }
88 max_pfds = max_pfds*2;
89 pfds[pos].fd = fd;
90 pfds[pos].events = events;
91 pfds[pos].revents = 0;
92 return 0;
93 } else {
94 // Hit hard limit
95 lderr(cct) << __func__ << " hard limit for file descriptors per op"
96 << " thread reached (" << hard_max_pfds << ")" << dendl;
97 return -EMFILE;
98 }
99 } else if (op == POLL_MOD) {
100 for (pos = 0; pos < max_pfds; pos++ ){
101 if (pfds[pos].fd == fd) {
102 pfds[pos].events = events;
103 return 0;
104 }
105 }
106 } else if (op == POLL_DEL) {
107 for (pos = 0; pos < max_pfds; pos++ ){
108 if (pfds[pos].fd == fd) {
109 pfds[pos].fd = -1;
110 pfds[pos].events = 0;
111 return 0;
112 }
113 }
114 }
115 return 0;
116 }
117
118 int PollDriver::add_event(int fd, int cur_mask, int add_mask) {
119 ldout(cct, 10) << __func__ << " add event to fd=" << fd << " mask="
120 << add_mask << dendl;
121 int op, events = 0;
122 op = cur_mask == EVENT_NONE ? POLL_ADD: POLL_MOD;
123
124 add_mask |= cur_mask; /* Merge old events */
125 if (add_mask & EVENT_READABLE) {
126 events |= POLLIN;
127 }
128 if (add_mask & EVENT_WRITABLE) {
129 events |= POLLOUT;
130 }
131 int ret = poll_ctl(fd, op, events);
132 return ret;
133 }
134
135 int PollDriver::del_event(int fd, int cur_mask, int delmask) {
136 ldout(cct, 10) << __func__ << " del event fd=" << fd << " cur mask="
137 << cur_mask << dendl;
138 int op, events = 0;
139 int mask = cur_mask & (~delmask);
140
141 if (mask != EVENT_NONE) {
142 op = POLL_MOD;
143 if (mask & EVENT_READABLE) {
144 events |= POLLIN;
145 }
146 if (mask & EVENT_WRITABLE) {
147 events |= POLLOUT;
148 }
149 } else {
150 op = POLL_DEL;
151 }
152 poll_ctl(fd, op, events);
153 return 0;
154 }
155
156 int PollDriver::resize_events(int newsize) {
157 return 0;
158 }
159
160 int PollDriver::event_wait(std::vector<FiredFileEvent> &fired_events,
161 struct timeval *tvp) {
162 int retval, numevents = 0;
163 #ifdef _WIN32
164 retval = WSAPoll(pfds, max_pfds,
165 tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
166 #else
167 retval = poll(pfds, max_pfds,
168 tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
169 #endif
170 if (retval > 0) {
171 for (int j = 0; j < max_pfds; j++) {
172 if (pfds[j].fd != -1) {
173 int mask = 0;
174 struct FiredFileEvent fe;
175 if (pfds[j].revents & POLLIN) {
176 mask |= EVENT_READABLE;
177 }
178 if (pfds[j].revents & POLLOUT) {
179 mask |= EVENT_WRITABLE;
180 }
181 if (pfds[j].revents & POLLHUP) {
182 mask |= EVENT_READABLE | EVENT_WRITABLE;
183 }
184 if (pfds[j].revents & POLLERR) {
185 mask |= EVENT_READABLE | EVENT_WRITABLE;
186 }
187 if (mask) {
188 fe.fd = pfds[j].fd;
189 fe.mask = mask;
190 fired_events.push_back(fe);
191 numevents++;
192 }
193 }
194 }
195 }
196 return numevents;
197 }