// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Pull-driven write event handler
 * Copyright (C) 2019 David Lamparter
 */
14 #define PULLWR_THRESH 16384 /* size at which we start to call write() */
15 #define PULLWR_MAXSPIN 2500 /* max µs to spend grabbing more data */
19 struct event_master
*tm
;
20 /* writer == NULL <=> we're idle */
24 void (*fill
)(void *, struct pullwr
*);
25 void (*err
)(void *, struct pullwr
*, bool);
27 /* ring buffer (although it's "un-ringed" on resizing, it WILL wrap
28 * around if data is trickling in while keeping it at a constant size)
30 size_t bufsz
, valid
, pos
;
31 uint64_t total_written
;
34 size_t thresh
; /* PULLWR_THRESH */
35 int64_t maxspin
; /* PULLWR_MAXSPIN */
38 DEFINE_MTYPE_STATIC(LIB
, PULLWR_HEAD
, "pull-driven write controller");
39 DEFINE_MTYPE_STATIC(LIB
, PULLWR_BUF
, "pull-driven write buffer");
41 static void pullwr_run(struct event
*t
);
43 struct pullwr
*_pullwr_new(struct event_master
*tm
, int fd
, void *arg
,
44 void (*fill
)(void *, struct pullwr
*),
45 void (*err
)(void *, struct pullwr
*, bool))
47 struct pullwr
*pullwr
;
49 pullwr
= XCALLOC(MTYPE_PULLWR_HEAD
, sizeof(*pullwr
));
56 pullwr
->thresh
= PULLWR_THRESH
;
57 pullwr
->maxspin
= PULLWR_MAXSPIN
;
62 void pullwr_del(struct pullwr
*pullwr
)
64 EVENT_OFF(pullwr
->writer
);
66 XFREE(MTYPE_PULLWR_BUF
, pullwr
->buffer
);
67 XFREE(MTYPE_PULLWR_HEAD
, pullwr
);
70 void pullwr_cfg(struct pullwr
*pullwr
, int64_t max_spin_usec
,
71 size_t write_threshold
)
73 pullwr
->maxspin
= max_spin_usec
?: PULLWR_MAXSPIN
;
74 pullwr
->thresh
= write_threshold
?: PULLWR_THRESH
;
77 void pullwr_bump(struct pullwr
*pullwr
)
82 event_add_timer(pullwr
->tm
, pullwr_run
, pullwr
, 0, &pullwr
->writer
);
85 static size_t pullwr_iov(struct pullwr
*pullwr
, struct iovec
*iov
)
89 if (pullwr
->valid
== 0)
92 if (pullwr
->pos
+ pullwr
->valid
<= pullwr
->bufsz
) {
93 iov
[0].iov_base
= pullwr
->buffer
+ pullwr
->pos
;
94 iov
[0].iov_len
= pullwr
->valid
;
98 len1
= pullwr
->bufsz
- pullwr
->pos
;
100 iov
[0].iov_base
= pullwr
->buffer
+ pullwr
->pos
;
101 iov
[0].iov_len
= len1
;
102 iov
[1].iov_base
= pullwr
->buffer
;
103 iov
[1].iov_len
= pullwr
->valid
- len1
;
107 static void pullwr_resize(struct pullwr
*pullwr
, size_t need
)
110 size_t niov
, newsize
;
113 /* the buffer is maintained at pullwr->thresh * 2 since we'll be
114 * trying to fill it as long as it's anywhere below pullwr->thresh.
115 * That means we frequently end up a little short of it and then write
116 * something that goes over the threshold. So, just use double.
120 if (pullwr
->bufsz
- pullwr
->valid
>= need
)
123 newsize
= MAX((pullwr
->valid
+ need
) * 2, pullwr
->thresh
* 2);
124 newbuf
= XMALLOC(MTYPE_PULLWR_BUF
, newsize
);
125 } else if (!pullwr
->valid
) {
126 /* resize down, buffer empty */
131 if (pullwr
->bufsz
- pullwr
->valid
< pullwr
->thresh
)
133 newsize
= MAX(pullwr
->valid
, pullwr
->thresh
* 2);
134 newbuf
= XMALLOC(MTYPE_PULLWR_BUF
, newsize
);
137 niov
= pullwr_iov(pullwr
, iov
);
139 memcpy(newbuf
, iov
[0].iov_base
, iov
[0].iov_len
);
141 memcpy(newbuf
+ iov
[0].iov_len
,
142 iov
[1].iov_base
, iov
[1].iov_len
);
145 XFREE(MTYPE_PULLWR_BUF
, pullwr
->buffer
);
146 pullwr
->buffer
= newbuf
;
147 pullwr
->bufsz
= newsize
;
151 void pullwr_write(struct pullwr
*pullwr
, const void *data
, size_t len
)
153 pullwr_resize(pullwr
, len
);
155 if (pullwr
->pos
+ pullwr
->valid
> pullwr
->bufsz
) {
158 pos
= (pullwr
->pos
+ pullwr
->valid
) % pullwr
->bufsz
;
159 memcpy(pullwr
->buffer
+ pos
, data
, len
);
162 max1
= pullwr
->bufsz
- (pullwr
->pos
+ pullwr
->valid
);
163 max1
= MIN(max1
, len
);
165 memcpy(pullwr
->buffer
+ pullwr
->pos
+ pullwr
->valid
,
170 memcpy(pullwr
->buffer
, (char *)data
+ max1
, len1
);
173 pullwr
->valid
+= len
;
178 static void pullwr_run(struct event
*t
)
180 struct pullwr
*pullwr
= EVENT_ARG(t
);
182 size_t niov
, lastvalid
;
185 bool maxspun
= false;
190 lastvalid
= pullwr
->valid
- 1;
191 while (pullwr
->valid
< pullwr
->thresh
192 && pullwr
->valid
!= lastvalid
194 lastvalid
= pullwr
->valid
;
195 pullwr
->fill(pullwr
->arg
, pullwr
);
197 /* check after doing at least one fill() call so we
198 * don't spin without making progress on slow boxes
200 if (!maxspun
&& monotime_since(&t0
, NULL
)
205 if (pullwr
->valid
== 0) {
206 /* we made a fill() call above that didn't feed any
207 * data in, and we have nothing more queued, so we go
208 * into idle, i.e. no calling event_add_write()
210 pullwr_resize(pullwr
, 0);
214 niov
= pullwr_iov(pullwr
, iov
);
217 nwr
= writev(pullwr
->fd
, iov
, niov
);
219 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)
221 pullwr
->err(pullwr
->arg
, pullwr
, false);
226 pullwr
->err(pullwr
->arg
, pullwr
, true);
230 pullwr
->total_written
+= nwr
;
231 pullwr
->valid
-= nwr
;
233 pullwr
->pos
%= pullwr
->bufsz
;
234 } while (pullwr
->valid
== 0 && !maxspun
);
235 /* pullwr->valid != 0 implies we did an incomplete write, i.e. socket
236 * is full and we go wait until it's available for writing again.
239 event_add_write(pullwr
->tm
, pullwr_run
, pullwr
, pullwr
->fd
,
242 /* if we hit the time limit, just keep the buffer, we'll probably need
243 * it anyway & another run is already coming up.
246 pullwr_resize(pullwr
, 0);
249 void pullwr_stats(struct pullwr
*pullwr
, uint64_t *total_written
,
250 size_t *pending
, size_t *kernel_pending
)
254 *total_written
= pullwr
->total_written
;
255 *pending
= pullwr
->valid
;
257 if (ioctl(pullwr
->fd
, TIOCOUTQ
, &tmp
) != 0)
259 *kernel_pending
= tmp
;