// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * Quagga Work Queue Support.
 *
 * Copyright (C) 2005 Sun Microsystems, Inc.
 */

#include <zebra.h>
#include "frrevent.h"
#include "memory.h"
#include "workqueue.h"
#include "linklist.h"
#include "command.h"
#include "log.h"

DEFINE_MTYPE(LIB, WORK_QUEUE, "Work queue");
DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_ITEM, "Work queue item");
DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_NAME, "Work queue name string");

/* master list of work_queues */
static struct list _work_queues;
/* pointer primarily to avoid an otherwise harmless warning on
 * ALL_LIST_ELEMENTS_RO
 */
static struct list *work_queues = &_work_queues;

#define WORK_QUEUE_MIN_GRANULARITY 1

static struct work_queue_item *work_queue_item_new(struct work_queue *wq)
{
	struct work_queue_item *item;

	assert(wq);

	item = XCALLOC(MTYPE_WORK_QUEUE_ITEM, sizeof(struct work_queue_item));

	return item;
}

static void work_queue_item_free(struct work_queue_item *item)
{
	XFREE(MTYPE_WORK_QUEUE_ITEM, item);
}

static void work_queue_item_remove(struct work_queue *wq,
				   struct work_queue_item *item)
{
	assert(item && item->data);

	/* call private data deletion callback if needed */
	if (wq->spec.del_item_data)
		wq->spec.del_item_data(wq, item->data);

	work_queue_item_dequeue(wq, item);

	work_queue_item_free(item);
}

/* create new work queue */
struct work_queue *work_queue_new(struct event_loop *m, const char *queue_name)
{
	struct work_queue *new;

	new = XCALLOC(MTYPE_WORK_QUEUE, sizeof(struct work_queue));

	new->name = XSTRDUP(MTYPE_WORK_QUEUE_NAME, queue_name);
	new->master = m;
	SET_FLAG(new->flags, WQ_UNPLUGGED);

	STAILQ_INIT(&new->items);

	listnode_add(work_queues, new);

	new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;

	/* Default values, can be overridden by caller */
	new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
	new->spec.yield = EVENT_YIELD_TIME_SLOT;
	new->spec.retry = WORK_QUEUE_DEFAULT_RETRY;

	return new;
}

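/* A minimal usage sketch, not part of this file: how a daemon might set
 * up a queue right after work_queue_new().  The callback names and the
 * 'master' event loop pointer are hypothetical; the wq->spec fields shown
 * are the ones this file actually reads.
 *
 *   static wq_item_status my_workfunc(struct work_queue *wq, void *data);
 *   static void my_del_item_data(struct work_queue *wq, void *data);
 *
 *   struct work_queue *wq = work_queue_new(master, "example queue");
 *
 *   wq->spec.workfunc = my_workfunc;           // processes one item
 *   wq->spec.del_item_data = my_del_item_data; // frees item->data on removal
 *   wq->spec.max_retries = 3;
 *   wq->spec.hold = 50;                        // ms to wait before first run
 */
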
void work_queue_free_and_null(struct work_queue **wqp)
{
	struct work_queue *wq = *wqp;

	EVENT_OFF(wq->thread);

	while (!work_queue_empty(wq)) {
		struct work_queue_item *item = work_queue_last_item(wq);

		work_queue_item_remove(wq, item);
	}

	listnode_delete(work_queues, wq);

	XFREE(MTYPE_WORK_QUEUE_NAME, wq->name);
	XFREE(MTYPE_WORK_QUEUE, wq);

	*wqp = NULL;
}

bool work_queue_is_scheduled(struct work_queue *wq)
{
	return event_is_scheduled(wq->thread);
}

static int work_queue_schedule(struct work_queue *wq, unsigned int delay)
{
	/* if appropriate, schedule work queue thread */
	if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) &&
	    !event_is_scheduled(wq->thread) && !work_queue_empty(wq)) {
		/* Schedule timer if there's a delay, otherwise just schedule
		 * as an 'event'
		 */
		if (delay > 0) {
			event_add_timer_msec(wq->master, work_queue_run, wq,
					     delay, &wq->thread);
			event_ignore_late_timer(wq->thread);
		} else
			event_add_event(wq->master, work_queue_run, wq, 0,
					&wq->thread);

		/* set thread yield time, if needed */
		if (event_is_scheduled(wq->thread) &&
		    wq->spec.yield != EVENT_YIELD_TIME_SLOT)
			event_set_yield_time(wq->thread, wq->spec.yield);
		return 1;
	} else
		return 0;
}

void work_queue_add(struct work_queue *wq, void *data)
{
	struct work_queue_item *item;

	assert(wq);

	item = work_queue_item_new(wq);

	item->data = data;
	work_queue_item_enqueue(wq, item);

	work_queue_schedule(wq, wq->spec.hold);
}

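/* A sketch of the workfunc contract, as work_queue_run() below treats the
 * return codes; resource_ready() and process_one() are hypothetical
 * helpers:
 *
 *   static wq_item_status my_workfunc(struct work_queue *wq, void *data)
 *   {
 *           if (!resource_ready(data))
 *                   return WQ_RETRY_LATER; // stop this run, retry after
 *                                          // spec.retry ms
 *           if (process_one(data) < 0)
 *                   return WQ_RETRY_NOW;   // re-run now, bounded by
 *                                          // spec.max_retries
 *           return WQ_SUCCESS;             // item is removed from the queue
 *   }
 */
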
static void work_queue_item_requeue(struct work_queue *wq,
				    struct work_queue_item *item)
{
	work_queue_item_dequeue(wq, item);

	/* attach to end of list */
	work_queue_item_enqueue(wq, item);
}

DEFUN (show_work_queues,
       show_work_queues_cmd,
       "show work-queues",
       SHOW_STR
       "Work Queue information\n")
{
	struct listnode *node;
	struct work_queue *wq;

	vty_out(vty, "%c %8s %5s %8s %8s %21s\n", ' ', "List", "(ms) ",
		"Q. Runs", "Yields", "Cycle Counts ");
	vty_out(vty, "%c %8s %5s %8s %8s %7s %6s %8s %6s %s\n", 'P', "Items",
		"Hold", "Total", "Total", "Best", "Gran.", "Total", "Avg.",
		"Name");

	for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) {
		vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n",
			(CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
			work_queue_item_count(wq), wq->spec.hold, wq->runs,
			wq->yields, wq->cycles.best, wq->cycles.granularity,
			wq->cycles.total,
			(wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs)
				   : 0,
			wq->name);
	}

	return CMD_SUCCESS;
}

void workqueue_cmd_init(void)
{
	install_element(VIEW_NODE, &show_work_queues_cmd);
}

/* 'plug' a queue: stop it from being scheduled,
 * i.e., prevent the queue from draining.
 */
void work_queue_plug(struct work_queue *wq)
{
	EVENT_OFF(wq->thread);

	UNSET_FLAG(wq->flags, WQ_UNPLUGGED);
}

/* unplug the queue and schedule it again, if appropriate,
 * i.e., allow the queue to be drained again
 */
void work_queue_unplug(struct work_queue *wq)
{
	SET_FLAG(wq->flags, WQ_UNPLUGGED);

	/* if a thread isn't already waiting, add one */
	work_queue_schedule(wq, wq->spec.hold);
}

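/* A sketch of the batching pattern plug/unplug enables (make_update() is
 * a hypothetical item constructor): hold the queue shut while enqueueing
 * a burst of items, then let one scheduled run drain them all.
 *
 *   work_queue_plug(wq);
 *   for (unsigned int i = 0; i < n; i++)
 *           work_queue_add(wq, make_update(i));
 *   work_queue_unplug(wq);
 */
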
/* timer thread to process a work queue
 * will reschedule itself if required,
 * otherwise the work_queue_add() of a new item will reschedule it
 */
void work_queue_run(struct event *thread)
{
	struct work_queue *wq;
	struct work_queue_item *item, *titem;
	wq_item_status ret = WQ_SUCCESS;
	unsigned int cycles = 0;
	char yielded = 0;

	wq = EVENT_ARG(thread);

	assert(wq);

	/* calculate cycle granularity:
	 * list iteration == 1 run
	 * listnode processing == 1 cycle
	 * granularity == # cycles between checks whether we should yield.
	 *
	 * granularity should be > 0, and can increase slowly after each run
	 * to provide some hysteresis, but not past cycles.best or 2*cycles.
	 *
	 * Best: starts low, can only increase
	 *
	 * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
	 *              if we run to end of time slot, can increase otherwise
	 *              by a small factor.
	 *
	 * We could use just the average and save some work, however we want
	 * to be able to adjust quickly to CPU pressure. The average won't
	 * shift much if the daemon has been running a long time.
	 */
	if (wq->cycles.granularity == 0)
		wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;

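	/* Worked example of the resulting cadence: with granularity == 8,
	 * the (cycles % granularity) test at the bottom of the loop below
	 * calls event_should_yield() only after items 8, 16, 24, ...  A
	 * larger granularity means fewer clock checks per run; a smaller
	 * one means faster reaction to CPU pressure.
	 */
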
	STAILQ_FOREACH_SAFE (item, &wq->items, wq, titem) {
		assert(item->data);

		/* don't run items which are past their allowed retries */
		if (item->ran > wq->spec.max_retries) {
			work_queue_item_remove(wq, item);
			continue;
		}

		/* run and take care of items that want to be retried
		 * immediately
		 */
		do {
			ret = wq->spec.workfunc(wq, item->data);
			item->ran++;
		} while ((ret == WQ_RETRY_NOW)
			 && (item->ran < wq->spec.max_retries));

		switch (ret) {
		case WQ_QUEUE_BLOCKED: {
			/* decrement item->ran again, because this isn't an
			 * item-specific error, and fall through to
			 * WQ_RETRY_LATER
			 */
			item->ran--;
		}
		case WQ_RETRY_LATER: {
			goto stats;
		}
		case WQ_REQUEUE: {
			item->ran--;
			work_queue_item_requeue(wq, item);
			/* If a single node is being used with a meta-queue
			 * (e.g., zebra), update the next node as we don't
			 * want to exit the thread and reschedule it after
			 * every node. By definition, WQ_REQUEUE is meant to
			 * continue the processing; the yield logic will kick
			 * in to terminate the thread when time has exceeded.
			 */
			if (titem == NULL)
				titem = item;
			break;
		}
		case WQ_RETRY_NOW:
			/* a RETRY_NOW that gets here has exceeded
			 * max_retries, same as ERROR
			 */
			/* fallthru */
		case WQ_SUCCESS:
		default: {
			work_queue_item_remove(wq, item);
			break;
		}
		}

		/* completed cycle */
		cycles++;

		/* test if we should yield */
		if (!(cycles % wq->cycles.granularity) &&
		    event_should_yield(thread)) {
			yielded = 1;
			goto stats;
		}
	}

stats:

#define WQ_HYSTERESIS_FACTOR 4

	/* we yielded, check whether granularity should be reduced */
	if (yielded && (cycles < wq->cycles.granularity)) {
		wq->cycles.granularity =
			((cycles > 0) ? cycles : WORK_QUEUE_MIN_GRANULARITY);
	}
	/* otherwise, should granularity increase? */
	else if (cycles >= (wq->cycles.granularity)) {
		if (cycles > wq->cycles.best)
			wq->cycles.best = cycles;

		/* along with the yielded check, provides hysteresis for
		 * granularity
		 */
		if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
			      * WQ_HYSTERESIS_FACTOR))
			wq->cycles.granularity *=
				WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
		else if (cycles
			 > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
			wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
	}
#undef WQ_HYSTERESIS_FACTOR
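
	/* Worked example of the adjustment above, assuming
	 * WQ_HYSTERESIS_FACTOR == 4 and granularity == 10: a run of more
	 * than 160 cycles (10 * 4 * 4) quadruples granularity to 40; a run
	 * of 41..160 cycles only bumps it to 14; yielding after fewer than
	 * 10 cycles drops granularity straight to the cycle count achieved.
	 */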

	wq->runs++;
	wq->cycles.total += cycles;
	if (yielded)
		wq->yields++;

	/* Is the queue done yet? If it is, call the completion callback. */
	if (!work_queue_empty(wq)) {
		if (ret == WQ_RETRY_LATER || ret == WQ_QUEUE_BLOCKED)
			work_queue_schedule(wq, wq->spec.retry);
		else
			work_queue_schedule(wq, 0);

	} else if (wq->spec.completion_func)
		wq->spec.completion_func(wq);
}