]>
git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/EventPoll.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2022 Rafael Lopez <rafael.lopez@softiron.com>
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
16 #include "common/errno.h"
17 #include "EventPoll.h"
20 #define dout_subsys ceph_subsys_ms
23 #define dout_prefix *_dout << "PollDriver."
35 int PollDriver::init(EventCenter
*c
, int nevent
) {
36 // pfds array will auto scale up to hard_max_pfds, which should be
37 // greater than total daemons/op_threads (todo: cfg option?)
39 // 128 seems a good starting point, cover clusters up to ~350 OSDs
40 // with default ms_async_op_threads
43 pfds
= (POLLFD
*)calloc(max_pfds
, sizeof(POLLFD
));
45 lderr(cct
) << __func__
<< " unable to allocate memory " << dendl
;
50 for(int i
= 0; i
< max_pfds
; i
++){
58 // Helper func to register/unregister interest in a FD's events by
59 // manipulating it's entry in pfds array
60 int PollDriver::poll_ctl(int fd
, int op
, int events
) {
63 // Find an empty pollfd slot
64 for(pos
= 0; pos
< max_pfds
; pos
++){
65 if(pfds
[pos
].fd
== -1){
67 pfds
[pos
].events
= events
;
68 pfds
[pos
].revents
= 0;
72 // We ran out of slots, try to increase
73 if (max_pfds
< hard_max_pfds
) {
74 ldout(cct
, 10) << __func__
<< " exhausted pollfd slots"
75 << ", doubling to " << max_pfds
*2 << dendl
;
76 pfds
= (POLLFD
*)realloc(pfds
, max_pfds
*2*sizeof(POLLFD
));
78 lderr(cct
) << __func__
<< " unable to realloc for more pollfd slots"
82 // Initialise new slots
83 for (int i
= max_pfds
; i
< max_pfds
*2 ; i
++){
88 max_pfds
= max_pfds
*2;
90 pfds
[pos
].events
= events
;
91 pfds
[pos
].revents
= 0;
95 lderr(cct
) << __func__
<< " hard limit for file descriptors per op"
96 << " thread reached (" << hard_max_pfds
<< ")" << dendl
;
99 } else if (op
== POLL_MOD
) {
100 for (pos
= 0; pos
< max_pfds
; pos
++ ){
101 if (pfds
[pos
].fd
== fd
) {
102 pfds
[pos
].events
= events
;
106 } else if (op
== POLL_DEL
) {
107 for (pos
= 0; pos
< max_pfds
; pos
++ ){
108 if (pfds
[pos
].fd
== fd
) {
110 pfds
[pos
].events
= 0;
118 int PollDriver::add_event(int fd
, int cur_mask
, int add_mask
) {
119 ldout(cct
, 10) << __func__
<< " add event to fd=" << fd
<< " mask="
120 << add_mask
<< dendl
;
122 op
= cur_mask
== EVENT_NONE
? POLL_ADD
: POLL_MOD
;
124 add_mask
|= cur_mask
; /* Merge old events */
125 if (add_mask
& EVENT_READABLE
) {
128 if (add_mask
& EVENT_WRITABLE
) {
131 int ret
= poll_ctl(fd
, op
, events
);
135 int PollDriver::del_event(int fd
, int cur_mask
, int delmask
) {
136 ldout(cct
, 10) << __func__
<< " del event fd=" << fd
<< " cur mask="
137 << cur_mask
<< dendl
;
139 int mask
= cur_mask
& (~delmask
);
141 if (mask
!= EVENT_NONE
) {
143 if (mask
& EVENT_READABLE
) {
146 if (mask
& EVENT_WRITABLE
) {
152 poll_ctl(fd
, op
, events
);
156 int PollDriver::resize_events(int newsize
) {
160 int PollDriver::event_wait(std::vector
<FiredFileEvent
> &fired_events
,
161 struct timeval
*tvp
) {
162 int retval
, numevents
= 0;
164 retval
= WSAPoll(pfds
, max_pfds
,
165 tvp
? (tvp
->tv_sec
*1000 + tvp
->tv_usec
/1000) : -1);
167 retval
= poll(pfds
, max_pfds
,
168 tvp
? (tvp
->tv_sec
*1000 + tvp
->tv_usec
/1000) : -1);
171 for (int j
= 0; j
< max_pfds
; j
++) {
172 if (pfds
[j
].fd
!= -1) {
174 struct FiredFileEvent fe
;
175 if (pfds
[j
].revents
& POLLIN
) {
176 mask
|= EVENT_READABLE
;
178 if (pfds
[j
].revents
& POLLOUT
) {
179 mask
|= EVENT_WRITABLE
;
181 if (pfds
[j
].revents
& POLLHUP
) {
182 mask
|= EVENT_READABLE
| EVENT_WRITABLE
;
184 if (pfds
[j
].revents
& POLLERR
) {
185 mask
|= EVENT_READABLE
| EVENT_WRITABLE
;
190 fired_events
.push_back(fe
);