]>
Commit | Line | Data |
---|---|---|
acddc0ed | 1 | // SPDX-License-Identifier: GPL-2.0-or-later |
5c52c06c DL |
2 | /* |
3 | * Pull-driven write event handler | |
4 | * Copyright (C) 2019 David Lamparter | |
5c52c06c DL |
5 | */ |
6 | ||
7 | #include "zebra.h" | |
8 | ||
9 | #include "pullwr.h" | |
10 | #include "memory.h" | |
11 | #include "monotime.h" | |
12 | ||
13 | /* defaults */ | |
14 | #define PULLWR_THRESH 16384 /* size at which we start to call write() */ | |
15 | #define PULLWR_MAXSPIN 2500 /* max µs to spend grabbing more data */ | |
16 | ||
/* State of one pull-driven writer: the file descriptor being written to,
 * the event loop it is scheduled on, the user callbacks, and the ring
 * buffer holding data that has been queued but not yet written.
 */
struct pullwr {
	/* file descriptor handed to writev() */
	int fd;
	/* event loop the timer / write events are registered on */
	struct thread_master *tm;
	/* scheduled event; writer == NULL <=> we're idle */
	struct thread *writer;

	/* opaque pointer passed back as first argument of fill() and err() */
	void *arg;
	/* invoked from pullwr_run() to pull more data into the buffer */
	void (*fill)(void *, struct pullwr *);
	/* invoked from pullwr_run() when writev() fails (bool == false) or
	 * returns 0 (bool == true)
	 */
	void (*err)(void *, struct pullwr *, bool);

	/* ring buffer (although it's "un-ringed" on resizing, it WILL wrap
	 * around if data is trickling in while keeping it at a constant size)
	 */
	size_t bufsz;		/* allocated size of buffer */
	size_t valid;		/* number of queued bytes */
	size_t pos;		/* offset of the first queued byte */
	uint64_t total_written;	/* running sum of writev() results */
	char *buffer;

	size_t thresh;		/* PULLWR_THRESH */
	int64_t maxspin;	/* PULLWR_MAXSPIN */
};
37 | ||
bf8d3d6a DL |
38 | DEFINE_MTYPE_STATIC(LIB, PULLWR_HEAD, "pull-driven write controller"); |
39 | DEFINE_MTYPE_STATIC(LIB, PULLWR_BUF, "pull-driven write buffer"); | |
5c52c06c | 40 | |
cc9f21da | 41 | static void pullwr_run(struct thread *t); |
5c52c06c DL |
42 | |
43 | struct pullwr *_pullwr_new(struct thread_master *tm, int fd, | |
44 | void *arg, | |
45 | void (*fill)(void *, struct pullwr *), | |
46 | void (*err)(void *, struct pullwr *, bool)) | |
47 | { | |
48 | struct pullwr *pullwr; | |
49 | ||
50 | pullwr = XCALLOC(MTYPE_PULLWR_HEAD, sizeof(*pullwr)); | |
51 | pullwr->fd = fd; | |
52 | pullwr->tm = tm; | |
53 | pullwr->arg = arg; | |
54 | pullwr->fill = fill; | |
55 | pullwr->err = err; | |
56 | ||
57 | pullwr->thresh = PULLWR_THRESH; | |
58 | pullwr->maxspin = PULLWR_MAXSPIN; | |
59 | ||
60 | return pullwr; | |
61 | } | |
62 | ||
63 | void pullwr_del(struct pullwr *pullwr) | |
64 | { | |
65 | THREAD_OFF(pullwr->writer); | |
66 | ||
67 | XFREE(MTYPE_PULLWR_BUF, pullwr->buffer); | |
68 | XFREE(MTYPE_PULLWR_HEAD, pullwr); | |
69 | } | |
70 | ||
71 | void pullwr_cfg(struct pullwr *pullwr, int64_t max_spin_usec, | |
72 | size_t write_threshold) | |
73 | { | |
74 | pullwr->maxspin = max_spin_usec ?: PULLWR_MAXSPIN; | |
75 | pullwr->thresh = write_threshold ?: PULLWR_THRESH; | |
76 | } | |
77 | ||
78 | void pullwr_bump(struct pullwr *pullwr) | |
79 | { | |
80 | if (pullwr->writer) | |
81 | return; | |
82 | ||
83 | thread_add_timer(pullwr->tm, pullwr_run, pullwr, 0, &pullwr->writer); | |
84 | } | |
85 | ||
86 | static size_t pullwr_iov(struct pullwr *pullwr, struct iovec *iov) | |
87 | { | |
88 | size_t len1; | |
89 | ||
90 | if (pullwr->valid == 0) | |
91 | return 0; | |
92 | ||
93 | if (pullwr->pos + pullwr->valid <= pullwr->bufsz) { | |
94 | iov[0].iov_base = pullwr->buffer + pullwr->pos; | |
95 | iov[0].iov_len = pullwr->valid; | |
96 | return 1; | |
97 | } | |
98 | ||
99 | len1 = pullwr->bufsz - pullwr->pos; | |
100 | ||
101 | iov[0].iov_base = pullwr->buffer + pullwr->pos; | |
102 | iov[0].iov_len = len1; | |
103 | iov[1].iov_base = pullwr->buffer; | |
104 | iov[1].iov_len = pullwr->valid - len1; | |
105 | return 2; | |
106 | } | |
107 | ||
108 | static void pullwr_resize(struct pullwr *pullwr, size_t need) | |
109 | { | |
110 | struct iovec iov[2]; | |
111 | size_t niov, newsize; | |
112 | char *newbuf; | |
113 | ||
114 | /* the buffer is maintained at pullwr->thresh * 2 since we'll be | |
115 | * trying to fill it as long as it's anywhere below pullwr->thresh. | |
116 | * That means we frequently end up a little short of it and then write | |
117 | * something that goes over the threshold. So, just use double. | |
118 | */ | |
119 | if (need) { | |
120 | /* resize up */ | |
121 | if (pullwr->bufsz - pullwr->valid >= need) | |
122 | return; | |
123 | ||
124 | newsize = MAX((pullwr->valid + need) * 2, pullwr->thresh * 2); | |
125 | newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize); | |
126 | } else if (!pullwr->valid) { | |
127 | /* resize down, buffer empty */ | |
128 | newsize = 0; | |
129 | newbuf = NULL; | |
130 | } else { | |
131 | /* resize down */ | |
132 | if (pullwr->bufsz - pullwr->valid < pullwr->thresh) | |
133 | return; | |
134 | newsize = MAX(pullwr->valid, pullwr->thresh * 2); | |
135 | newbuf = XMALLOC(MTYPE_PULLWR_BUF, newsize); | |
136 | } | |
137 | ||
138 | niov = pullwr_iov(pullwr, iov); | |
139 | if (niov >= 1) { | |
140 | memcpy(newbuf, iov[0].iov_base, iov[0].iov_len); | |
141 | if (niov >= 2) | |
142 | memcpy(newbuf + iov[0].iov_len, | |
143 | iov[1].iov_base, iov[1].iov_len); | |
144 | } | |
145 | ||
146 | XFREE(MTYPE_PULLWR_BUF, pullwr->buffer); | |
147 | pullwr->buffer = newbuf; | |
148 | pullwr->bufsz = newsize; | |
149 | pullwr->pos = 0; | |
150 | } | |
151 | ||
152 | void pullwr_write(struct pullwr *pullwr, const void *data, size_t len) | |
153 | { | |
154 | pullwr_resize(pullwr, len); | |
155 | ||
156 | if (pullwr->pos + pullwr->valid > pullwr->bufsz) { | |
157 | size_t pos; | |
158 | ||
159 | pos = (pullwr->pos + pullwr->valid) % pullwr->bufsz; | |
160 | memcpy(pullwr->buffer + pos, data, len); | |
161 | } else { | |
162 | size_t max1, len1; | |
163 | max1 = pullwr->bufsz - (pullwr->pos + pullwr->valid); | |
164 | max1 = MIN(max1, len); | |
165 | ||
166 | memcpy(pullwr->buffer + pullwr->pos + pullwr->valid, | |
167 | data, max1); | |
168 | len1 = len - max1; | |
169 | ||
170 | if (len1) | |
171 | memcpy(pullwr->buffer, (char *)data + max1, len1); | |
172 | ||
173 | } | |
174 | pullwr->valid += len; | |
175 | ||
176 | pullwr_bump(pullwr); | |
177 | } | |
178 | ||
cc9f21da | 179 | static void pullwr_run(struct thread *t) |
5c52c06c DL |
180 | { |
181 | struct pullwr *pullwr = THREAD_ARG(t); | |
182 | struct iovec iov[2]; | |
183 | size_t niov, lastvalid; | |
184 | ssize_t nwr; | |
185 | struct timeval t0; | |
186 | bool maxspun = false; | |
187 | ||
188 | monotime(&t0); | |
189 | ||
190 | do { | |
191 | lastvalid = pullwr->valid - 1; | |
192 | while (pullwr->valid < pullwr->thresh | |
193 | && pullwr->valid != lastvalid | |
194 | && !maxspun) { | |
195 | lastvalid = pullwr->valid; | |
196 | pullwr->fill(pullwr->arg, pullwr); | |
197 | ||
198 | /* check after doing at least one fill() call so we | |
199 | * don't spin without making progress on slow boxes | |
200 | */ | |
201 | if (!maxspun && monotime_since(&t0, NULL) | |
202 | >= pullwr->maxspin) | |
203 | maxspun = true; | |
204 | } | |
205 | ||
206 | if (pullwr->valid == 0) { | |
207 | /* we made a fill() call above that didn't feed any | |
208 | * data in, and we have nothing more queued, so we go | |
209 | * into idle, i.e. no calling thread_add_write() | |
210 | */ | |
211 | pullwr_resize(pullwr, 0); | |
cc9f21da | 212 | return; |
5c52c06c DL |
213 | } |
214 | ||
215 | niov = pullwr_iov(pullwr, iov); | |
216 | assert(niov); | |
217 | ||
218 | nwr = writev(pullwr->fd, iov, niov); | |
219 | if (nwr < 0) { | |
220 | if (errno == EAGAIN || errno == EWOULDBLOCK) | |
221 | break; | |
222 | pullwr->err(pullwr->arg, pullwr, false); | |
cc9f21da | 223 | return; |
5c52c06c DL |
224 | } |
225 | ||
226 | if (nwr == 0) { | |
227 | pullwr->err(pullwr->arg, pullwr, true); | |
cc9f21da | 228 | return; |
5c52c06c DL |
229 | } |
230 | ||
231 | pullwr->total_written += nwr; | |
232 | pullwr->valid -= nwr; | |
233 | pullwr->pos += nwr; | |
234 | pullwr->pos %= pullwr->bufsz; | |
235 | } while (pullwr->valid == 0 && !maxspun); | |
236 | /* pullwr->valid != 0 implies we did an incomplete write, i.e. socket | |
237 | * is full and we go wait until it's available for writing again. | |
238 | */ | |
239 | ||
240 | thread_add_write(pullwr->tm, pullwr_run, pullwr, pullwr->fd, | |
241 | &pullwr->writer); | |
242 | ||
243 | /* if we hit the time limit, just keep the buffer, we'll probably need | |
244 | * it anyway & another run is already coming up. | |
245 | */ | |
246 | if (!maxspun) | |
247 | pullwr_resize(pullwr, 0); | |
5c52c06c DL |
248 | } |
249 | ||
250 | void pullwr_stats(struct pullwr *pullwr, uint64_t *total_written, | |
251 | size_t *pending, size_t *kernel_pending) | |
252 | { | |
253 | int tmp; | |
254 | ||
255 | *total_written = pullwr->total_written; | |
256 | *pending = pullwr->valid; | |
257 | ||
258 | if (ioctl(pullwr->fd, TIOCOUTQ, &tmp) != 0) | |
259 | tmp = 0; | |
260 | *kernel_pending = tmp; | |
261 | } |