/*
 * Quagga Work Queue Support.
 *
 * Copyright (C) 2005 Sun Microsystems, Inc.
 *
 * This file is part of GNU Zebra.
 *
 * Quagga is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * Quagga is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; see the file COPYING; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <zebra.h>
#include "thread.h"
#include "memory.h"
#include "workqueue.h"
#include "linklist.h"
#include "command.h"
#include "log.h"

DEFINE_MTYPE(LIB, WORK_QUEUE, "Work queue")
DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_ITEM, "Work queue item")
DEFINE_MTYPE_STATIC(LIB, WORK_QUEUE_NAME, "Work queue name string")

/* master list of work_queues */
static struct list _work_queues;
/* pointer primarily to avoid an otherwise harmless warning on
 * ALL_LIST_ELEMENTS_RO
 */
static struct list *work_queues = &_work_queues;

#define WORK_QUEUE_MIN_GRANULARITY 1

static struct work_queue_item *work_queue_item_new(struct work_queue *wq)
{
        struct work_queue_item *item;

        assert(wq);

        item = XCALLOC(MTYPE_WORK_QUEUE_ITEM, sizeof(struct work_queue_item));

        return item;
}

static void work_queue_item_free(struct work_queue_item *item)
{
        XFREE(MTYPE_WORK_QUEUE_ITEM, item);
}

static void work_queue_item_remove(struct work_queue *wq,
                                   struct work_queue_item *item)
{
        assert(item && item->data);

        /* call private data deletion callback if needed */
        if (wq->spec.del_item_data)
                wq->spec.del_item_data(wq, item->data);

        work_queue_item_dequeue(wq, item);

        work_queue_item_free(item);
}

/* create new work queue */
struct work_queue *work_queue_new(struct thread_master *m,
                                  const char *queue_name)
{
        struct work_queue *new;

        new = XCALLOC(MTYPE_WORK_QUEUE, sizeof(struct work_queue));

        new->name = XSTRDUP(MTYPE_WORK_QUEUE_NAME, queue_name);
        new->master = m;
        SET_FLAG(new->flags, WQ_UNPLUGGED);

        STAILQ_INIT(&new->items);

        listnode_add(work_queues, new);

        new->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;

        /* Default values, can be overridden by caller */
        new->spec.hold = WORK_QUEUE_DEFAULT_HOLD;
        new->spec.yield = THREAD_YIELD_TIME_SLOT;
        new->spec.retry = WORK_QUEUE_DEFAULT_RETRY;

        return new;
}

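/* A minimal sketch of how a caller might set up a queue.  The names
 * 'example_process' and 'example_del' are illustrative only, not part of
 * this library; 'master' is the daemon's thread master:
 *
 *      static wq_item_status example_process(struct work_queue *wq,
 *                                            void *data)
 *      {
 *              ... do one unit of work on 'data' ...
 *              return WQ_SUCCESS;
 *      }
 *
 *      static void example_del(struct work_queue *wq, void *data)
 *      {
 *              XFREE(MTYPE_TMP, data);
 *      }
 *
 *      struct work_queue *wq = work_queue_new(master, "example queue");
 *
 *      wq->spec.workfunc = example_process;
 *      wq->spec.del_item_data = example_del;
 *      wq->spec.max_retries = 3;
 */
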
void work_queue_free_original(struct work_queue *wq)
{
        if (wq->thread != NULL)
                thread_cancel(wq->thread);

        while (!work_queue_empty(wq)) {
                struct work_queue_item *item = work_queue_last_item(wq);

                work_queue_item_remove(wq, item);
        }

        listnode_delete(work_queues, wq);

        XFREE(MTYPE_WORK_QUEUE_NAME, wq->name);
        XFREE(MTYPE_WORK_QUEUE, wq);
}

void work_queue_free_and_null(struct work_queue **wq)
{
        work_queue_free_original(*wq);
        *wq = NULL;
}

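/* Callers should prefer the pointer-nulling variant, so a freed queue
 * cannot be reused through a stale pointer:
 *
 *      work_queue_free_and_null(&wq);
 */
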
bool work_queue_is_scheduled(struct work_queue *wq)
{
        return (wq->thread != NULL);
}

static int work_queue_schedule(struct work_queue *wq, unsigned int delay)
{
        /* if appropriate, schedule work queue thread */
        if (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) && (wq->thread == NULL)
            && !work_queue_empty(wq)) {
                wq->thread = NULL;

                /* Schedule timer if there's a delay, otherwise just schedule
                 * as an 'event'
                 */
                if (delay > 0)
                        thread_add_timer_msec(wq->master, work_queue_run, wq,
                                              delay, &wq->thread);
                else
                        thread_add_event(wq->master, work_queue_run, wq, 0,
                                         &wq->thread);

                /* set thread yield time, if needed */
                if (wq->thread && wq->spec.yield != THREAD_YIELD_TIME_SLOT)
                        thread_set_yield_time(wq->thread, wq->spec.yield);
                return 1;
        } else
                return 0;
}

void work_queue_add(struct work_queue *wq, void *data)
{
        struct work_queue_item *item;

        assert(wq);

        item = work_queue_item_new(wq);

        item->data = data;
        work_queue_item_enqueue(wq, item);

        work_queue_schedule(wq, wq->spec.hold);
}

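/* Note that the queue does nothing with 'data' beyond what the caller's
 * del_item_data callback implements, so a typical caller allocates per-item
 * state and lets that callback free it.  A sketch ('example_ctx' is
 * illustrative only):
 *
 *      struct example_ctx *ctx = XCALLOC(MTYPE_TMP, sizeof(*ctx));
 *      work_queue_add(wq, ctx);
 */
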
static void work_queue_item_requeue(struct work_queue *wq,
                                    struct work_queue_item *item)
{
        work_queue_item_dequeue(wq, item);

        /* attach to end of list */
        work_queue_item_enqueue(wq, item);
}

DEFUN (show_work_queues,
       show_work_queues_cmd,
       "show work-queues",
       SHOW_STR
       "Work Queue information\n")
{
        struct listnode *node;
        struct work_queue *wq;

        vty_out(vty, "%c %8s %5s %8s %8s %21s\n", ' ', "List", "(ms) ",
                "Q. Runs", "Yields", "Cycle Counts ");
        vty_out(vty, "%c %8s %5s %8s %8s %7s %6s %8s %6s %s\n", 'P', "Items",
                "Hold", "Total", "Total", "Best", "Gran.", "Total", "Avg.",
                "Name");

        for (ALL_LIST_ELEMENTS_RO(work_queues, node, wq)) {
                vty_out(vty, "%c %8d %5d %8ld %8ld %7d %6d %8ld %6u %s\n",
                        (CHECK_FLAG(wq->flags, WQ_UNPLUGGED) ? ' ' : 'P'),
                        work_queue_item_count(wq), wq->spec.hold, wq->runs,
                        wq->yields, wq->cycles.best, wq->cycles.granularity,
                        wq->cycles.total,
                        (wq->runs) ? (unsigned int)(wq->cycles.total / wq->runs)
                                   : 0,
                        wq->name);
        }

        return CMD_SUCCESS;
}

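/* Example output (shape only; the numbers and queue name below are
 * illustrative, a 'P' in the first column marks a plugged queue):
 *
 *      frr# show work-queues
 *            List (ms )  Q. Runs   Yields          Cycle Counts
 *      P    Items  Hold    Total    Total    Best  Gran.    Total   Avg. Name
 *               0    10     1823        2     170      8    25061     13 example queue
 */
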
void workqueue_cmd_init(void)
{
        install_element(VIEW_NODE, &show_work_queues_cmd);
}

/* 'plug' a queue: stop it from being scheduled,
 * i.e. prevent the queue from draining.
 */
void work_queue_plug(struct work_queue *wq)
{
        if (wq->thread)
                thread_cancel(wq->thread);

        wq->thread = NULL;

        UNSET_FLAG(wq->flags, WQ_UNPLUGGED);
}

/* unplug queue, schedule it again if appropriate,
 * i.e. allow the queue to be drained again
 */
void work_queue_unplug(struct work_queue *wq)
{
        SET_FLAG(wq->flags, WQ_UNPLUGGED);

        /* if thread isn't already waiting, add one */
        work_queue_schedule(wq, wq->spec.hold);
}
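
/* Plug/unplug is useful for batching: a caller can plug the queue, enqueue
 * many items without the queue starting to drain, then unplug to process
 * them in one scheduled run.  A sketch (the loop body is illustrative):
 *
 *      work_queue_plug(wq);
 *      for (i = 0; i < n; i++)
 *              work_queue_add(wq, data[i]);
 *      work_queue_unplug(wq);
 */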

/* timer thread to process a work queue
 * will reschedule itself if required,
 * otherwise it is rescheduled via work_queue_add()/work_queue_schedule()
 * when new work arrives
 */
int work_queue_run(struct thread *thread)
{
        struct work_queue *wq;
        struct work_queue_item *item, *titem;
        wq_item_status ret = WQ_SUCCESS;
        unsigned int cycles = 0;
        char yielded = 0;

        wq = THREAD_ARG(thread);

        assert(wq);

        wq->thread = NULL;

        /* calculate cycle granularity:
         * list iteration == 1 run
         * listnode processing == 1 cycle
         * granularity == # cycles between checks whether we should yield.
         *
         * granularity should be > 0, and can increase slowly after each run
         * to provide some hysteresis, but not past cycles.best or 2*cycles.
         *
         * Best: starts low, can only increase
         *
         * Granularity: starts at WORK_QUEUE_MIN_GRANULARITY, can be decreased
         *              if we run to end of time slot, can increase otherwise
         *              by a small factor.
         *
         * We could use just the average and save some work, however we want
         * to be able to adjust quickly to CPU pressure.  The average won't
         * shift much if the daemon has been running a long time.
         */
        if (wq->cycles.granularity == 0)
                wq->cycles.granularity = WORK_QUEUE_MIN_GRANULARITY;

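        /* Worked example of the adaptation performed at the 'stats' label
         * below (WQ_HYSTERESIS_FACTOR is 4): with granularity 10, a run
         * that completes 200 cycles without yielding (200 > 10 * 4 * 4)
         * ramps granularity up to 40; if the next run is forced to yield
         * after only 20 cycles, granularity drops back to 20; a later run
         * of 90 cycles (90 > 20 * 4) then nudges it up to 24.
         */
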
        STAILQ_FOREACH_SAFE (item, &wq->items, wq, titem) {
                assert(item && item->data);

                /* don't run items which are past their allowed retries */
                if (item->ran > wq->spec.max_retries) {
                        /* run error handler, if any */
                        if (wq->spec.errorfunc)
                                wq->spec.errorfunc(wq, item->data);
                        work_queue_item_remove(wq, item);
                        continue;
                }

                /* run and take care of items that want to be retried
                 * immediately
                 */
                do {
                        ret = wq->spec.workfunc(wq, item->data);
                        item->ran++;
                } while ((ret == WQ_RETRY_NOW)
                         && (item->ran < wq->spec.max_retries));

                switch (ret) {
                case WQ_QUEUE_BLOCKED: {
                        /* decrement item->ran again, because this isn't an
                         * item-specific error, and fall through to
                         * WQ_RETRY_LATER
                         */
                        item->ran--;
                }
                /* fallthrough */
                case WQ_RETRY_LATER: {
                        goto stats;
                }
                case WQ_REQUEUE: {
                        item->ran--;
                        work_queue_item_requeue(wq, item);
                        /* If a single node is being used with a meta-queue
                         * (e.g., zebra), update the next node as we don't
                         * want to exit the thread and reschedule it after
                         * every node.  By definition, WQ_REQUEUE is meant to
                         * continue the processing; the yield logic will kick
                         * in to terminate the thread when time has exceeded.
                         */
                        if (titem == NULL)
                                titem = item;
                        break;
                }
                case WQ_RETRY_NOW:
                /* a RETRY_NOW that gets here has exceeded max_tries, same as
                 * ERROR */
                case WQ_ERROR: {
                        /* run the error handler on the item's data, as for
                         * the retry check above */
                        if (wq->spec.errorfunc)
                                wq->spec.errorfunc(wq, item->data);
                }
                /* fallthrough */
                case WQ_SUCCESS:
                default: {
                        work_queue_item_remove(wq, item);
                        break;
                }
                }

                /* completed cycle */
                cycles++;

                /* test if we should yield */
                if (!(cycles % wq->cycles.granularity)
                    && thread_should_yield(thread)) {
                        yielded = 1;
                        goto stats;
                }
        }

stats:

#define WQ_HYSTERESIS_FACTOR 4

        /* we yielded, check whether granularity should be reduced */
        if (yielded && (cycles < wq->cycles.granularity)) {
                wq->cycles.granularity =
                        ((cycles > 0) ? cycles : WORK_QUEUE_MIN_GRANULARITY);
        }
        /* otherwise, should granularity increase? */
        else if (cycles >= (wq->cycles.granularity)) {
                if (cycles > wq->cycles.best)
                        wq->cycles.best = cycles;

                /* along with the yielded check, provides hysteresis for
                 * granularity
                 */
                if (cycles > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR
                              * WQ_HYSTERESIS_FACTOR))
                        wq->cycles.granularity *=
                                WQ_HYSTERESIS_FACTOR; /* quick ramp-up */
                else if (cycles
                         > (wq->cycles.granularity * WQ_HYSTERESIS_FACTOR))
                        wq->cycles.granularity += WQ_HYSTERESIS_FACTOR;
        }
#undef WQ_HYSTERESIS_FACTOR

        wq->runs++;
        wq->cycles.total += cycles;
        if (yielded)
                wq->yields++;

#if 0
        printf ("%s: cycles %d, new: best %d, worst %d\n",
                __func__, cycles, wq->cycles.best, wq->cycles.granularity);
#endif


        /* Is the queue done yet? If it is, call the completion callback. */
        if (!work_queue_empty(wq)) {
                if (ret == WQ_RETRY_LATER || ret == WQ_QUEUE_BLOCKED)
                        work_queue_schedule(wq, wq->spec.retry);
                else
                        work_queue_schedule(wq, 0);

        } else if (wq->spec.completion_func)
                wq->spec.completion_func(wq);

        return 0;
}