]>
git.proxmox.com Git - mirror_frr.git/blob - lib/pullwr.c
/*
 * Pull-driven write event handler
 * Copyright (C) 2019  David Lamparter
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */
/* Default buffered size at which we start to call write(); used when no
 * explicit threshold is configured.
 */
#define PULLWR_THRESH 16384

/* Default max µs to spend grabbing more data; used when no explicit spin
 * limit is configured.
 */
#define PULLWR_MAXSPIN 2500
32 struct thread_master
*tm
;
33 /* writer == NULL <=> we're idle */
34 struct thread
*writer
;
37 void (*fill
)(void *, struct pullwr
*);
38 void (*err
)(void *, struct pullwr
*, bool);
	/* ring buffer (although it's "un-ringed" on resizing, it WILL wrap
	 * around if data is trickling in while keeping it at a constant size)
	 */
43 size_t bufsz
, valid
, pos
;
44 uint64_t total_written
;
47 size_t thresh
; /* PULLWR_THRESH */
48 int64_t maxspin
; /* PULLWR_MAXSPIN */
51 DEFINE_MTYPE_STATIC(LIB
, PULLWR_HEAD
, "pull-driven write controller")
52 DEFINE_MTYPE_STATIC(LIB
, PULLWR_BUF
, "pull-driven write buffer")
54 static int pullwr_run(struct thread
*t
);
/* Create a new pull-driven writer.
 *
 * Allocates a zero-filled struct pullwr with XCALLOC under
 * MTYPE_PULLWR_HEAD and initialises the write threshold and spin limit to
 * the PULLWR_THRESH / PULLWR_MAXSPIN defaults.
 *
 * NOTE(review): several statements of this function are not visible here
 * (the embedded numbering jumps from 63 to 70 and no return is shown).
 * Presumably they store tm, fd and the fill/err callbacks and return the
 * new object - confirm against the complete file.
 */
56 struct pullwr
*_pullwr_new(struct thread_master
*tm
, int fd
,
58 void (*fill
)(void *, struct pullwr
*),
59 void (*err
)(void *, struct pullwr
*, bool))
61 struct pullwr
*pullwr
;
63 pullwr
= XCALLOC(MTYPE_PULLWR_HEAD
, sizeof(*pullwr
));
70 pullwr
->thresh
= PULLWR_THRESH
;
71 pullwr
->maxspin
= PULLWR_MAXSPIN
;
76 void pullwr_del(struct pullwr
*pullwr
)
78 THREAD_OFF(pullwr
->writer
);
80 XFREE(MTYPE_PULLWR_BUF
, pullwr
->buffer
);
81 XFREE(MTYPE_PULLWR_HEAD
, pullwr
);
84 void pullwr_cfg(struct pullwr
*pullwr
, int64_t max_spin_usec
,
85 size_t write_threshold
)
87 pullwr
->maxspin
= max_spin_usec
?: PULLWR_MAXSPIN
;
88 pullwr
->thresh
= write_threshold
?: PULLWR_THRESH
;
/* Kick the writer: register pullwr_run on pullwr->tm through
 * thread_add_timer() with a timer argument of 0, passing pullwr as the
 * callback argument and storing the thread handle in pullwr->writer.
 *
 * NOTE(review): the lines between the signature and the
 * thread_add_timer() call are not visible here (embedded numbering jumps
 * 91 -> 96); presumably a guard for an already scheduled writer - confirm
 * against the complete file.
 */
91 void pullwr_bump(struct pullwr
*pullwr
)
96 thread_add_timer(pullwr
->tm
, pullwr_run
, pullwr
, 0, &pullwr
->writer
);
/* Describe the pending data of the ring buffer (pullwr->valid bytes
 * starting at offset pullwr->pos) as iovec entries in iov[].
 *
 * - pos + valid <= bufsz: the data is contiguous, iov[0] covers all of it.
 * - otherwise the data wraps: iov[0] covers the bufsz - pos bytes up to the
 *   end of the buffer, iov[1] covers the remaining valid - len1 bytes from
 *   the start of the buffer.
 *
 * NOTE(review): the return statements, the len1 declaration and the body
 * of the valid == 0 check are not visible here.  Callers use the result as
 * an iovec count, so presumably 0 / 1 / 2 is returned - confirm against the
 * complete file.
 */
99 static size_t pullwr_iov(struct pullwr
*pullwr
, struct iovec
*iov
)
103 if (pullwr
->valid
== 0)
/* contiguous case: everything fits between pos and the end of the buffer */
106 if (pullwr
->pos
+ pullwr
->valid
<= pullwr
->bufsz
) {
107 iov
[0].iov_base
= pullwr
->buffer
+ pullwr
->pos
;
108 iov
[0].iov_len
= pullwr
->valid
;
/* wrapped case: len1 is the number of bytes from pos to the buffer end */
112 len1
= pullwr
->bufsz
- pullwr
->pos
;
114 iov
[0].iov_base
= pullwr
->buffer
+ pullwr
->pos
;
115 iov
[0].iov_len
= len1
;
116 iov
[1].iov_base
= pullwr
->buffer
;
117 iov
[1].iov_len
= pullwr
->valid
- len1
;
121 static void pullwr_resize(struct pullwr
*pullwr
, size_t need
)
124 size_t niov
, newsize
;
	/* the buffer is maintained at pullwr->thresh * 2 since we'll be
	 * trying to fill it as long as it's anywhere below pullwr->thresh.
	 * That means we frequently end up a little short of it and then write
	 * something that goes over the threshold.  So, just use double.
	 */
134 if (pullwr
->bufsz
- pullwr
->valid
>= need
)
137 newsize
= MAX((pullwr
->valid
+ need
) * 2, pullwr
->thresh
* 2);
138 newbuf
= XMALLOC(MTYPE_PULLWR_BUF
, newsize
);
139 } else if (!pullwr
->valid
) {
140 /* resize down, buffer empty */
145 if (pullwr
->bufsz
- pullwr
->valid
< pullwr
->thresh
)
147 newsize
= MAX(pullwr
->valid
, pullwr
->thresh
* 2);
148 newbuf
= XMALLOC(MTYPE_PULLWR_BUF
, newsize
);
151 niov
= pullwr_iov(pullwr
, iov
);
153 memcpy(newbuf
, iov
[0].iov_base
, iov
[0].iov_len
);
155 memcpy(newbuf
+ iov
[0].iov_len
,
156 iov
[1].iov_base
, iov
[1].iov_len
);
159 XFREE(MTYPE_PULLWR_BUF
, pullwr
->buffer
);
160 pullwr
->buffer
= newbuf
;
161 pullwr
->bufsz
= newsize
;
/* Append len bytes from data to the ring buffer.
 *
 * pullwr_resize(pullwr, len) is called first, then the bytes are copied
 * in behind the currently valid data and pullwr->valid grows by len.
 *
 * - pos + valid > bufsz: the write position has already wrapped, so the
 *   data is copied in one piece at offset (pos + valid) % bufsz.
 * - otherwise: at most max1 = MIN(bufsz - (pos + valid), len) bytes fit
 *   before the end of the buffer; the rest, starting at data + max1, is
 *   copied to the start of the buffer.
 *
 * NOTE(review): the declarations of pos/max1/len1, the remaining arguments
 * of the first memcpy in the second branch and the computation of len1 are
 * not visible here; presumably len1 = len - max1 - confirm against the
 * complete file.
 */
165 void pullwr_write(struct pullwr
*pullwr
, const void *data
, size_t len
)
167 pullwr_resize(pullwr
, len
);
169 if (pullwr
->pos
+ pullwr
->valid
> pullwr
->bufsz
) {
172 pos
= (pullwr
->pos
+ pullwr
->valid
) % pullwr
->bufsz
;
173 memcpy(pullwr
->buffer
+ pos
, data
, len
);
/* space left between the current end of data and the end of the buffer */
176 max1
= pullwr
->bufsz
- (pullwr
->pos
+ pullwr
->valid
);
177 max1
= MIN(max1
, len
);
179 memcpy(pullwr
->buffer
+ pullwr
->pos
+ pullwr
->valid
,
184 memcpy(pullwr
->buffer
, (char *)data
+ max1
, len1
);
187 pullwr
->valid
+= len
;
192 static int pullwr_run(struct thread
*t
)
194 struct pullwr
*pullwr
= THREAD_ARG(t
);
196 size_t niov
, lastvalid
;
199 bool maxspun
= false;
204 lastvalid
= pullwr
->valid
- 1;
205 while (pullwr
->valid
< pullwr
->thresh
206 && pullwr
->valid
!= lastvalid
208 lastvalid
= pullwr
->valid
;
209 pullwr
->fill(pullwr
->arg
, pullwr
);
			/* check after doing at least one fill() call so we
			 * don't spin without making progress on slow boxes
			 */
214 if (!maxspun
&& monotime_since(&t0
, NULL
)
219 if (pullwr
->valid
== 0) {
			/* we made a fill() call above that didn't feed any
			 * data in, and we have nothing more queued, so we go
			 * into idle, i.e. no calling thread_add_write()
			 */
224 pullwr_resize(pullwr
, 0);
228 niov
= pullwr_iov(pullwr
, iov
);
231 nwr
= writev(pullwr
->fd
, iov
, niov
);
233 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)
235 pullwr
->err(pullwr
->arg
, pullwr
, false);
240 pullwr
->err(pullwr
->arg
, pullwr
, true);
244 pullwr
->total_written
+= nwr
;
245 pullwr
->valid
-= nwr
;
247 pullwr
->pos
%= pullwr
->bufsz
;
248 } while (pullwr
->valid
== 0 && !maxspun
);
	/* pullwr->valid != 0 implies we did an incomplete write, i.e. socket
	 * is full and we go wait until it's available for writing again.
	 */
253 thread_add_write(pullwr
->tm
, pullwr_run
, pullwr
, pullwr
->fd
,
	/* if we hit the time limit, just keep the buffer, we'll probably need
	 * it anyway & another run is already coming up.
	 */
260 pullwr_resize(pullwr
, 0);
/* Report counters of a pullwr instance through the three out-parameters.
 *
 * *total_written  receives pullwr->total_written.
 * *pending        receives pullwr->valid, the number of bytes still held
 *                 in the buffer.
 * *kernel_pending receives the value obtained from the TIOCOUTQ ioctl on
 *                 pullwr->fd.
 *
 * NOTE(review): the declaration of tmp and the statement executed when
 * ioctl() fails are not visible here; presumably tmp is reset to 0 in that
 * case - confirm against the complete file.
 */
264 void pullwr_stats(struct pullwr
*pullwr
, uint64_t *total_written
,
265 size_t *pending
, size_t *kernel_pending
)
269 *total_written
= pullwr
->total_written
;
270 *pending
= pullwr
->valid
;
272 if (ioctl(pullwr
->fd
, TIOCOUTQ
, &tmp
) != 0)
274 *kernel_pending
= tmp
;