]> git.proxmox.com Git - mirror_ubuntu-jammy-kernel.git/blame - kernel/trace/ring_buffer.c
ring_buffer: remove raw from local_irq_save
[mirror_ubuntu-jammy-kernel.git] / kernel / trace / ring_buffer.c
CommitLineData
7a8e76a3
SR
1/*
2 * Generic ring buffer
3 *
4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5 */
6#include <linux/ring_buffer.h>
7#include <linux/spinlock.h>
8#include <linux/debugfs.h>
9#include <linux/uaccess.h>
10#include <linux/module.h>
11#include <linux/percpu.h>
12#include <linux/mutex.h>
13#include <linux/sched.h> /* used for sched_clock() (for now) */
14#include <linux/init.h>
15#include <linux/hash.h>
16#include <linux/list.h>
17#include <linux/fs.h>
18
19/* Up this if you want to test the TIME_EXTENTS and normalization */
20#define DEBUG_SHIFT 0
21
22/* FIXME!!! */
23u64 ring_buffer_time_stamp(int cpu)
24{
25 /* shift to debug/test normalization and TIME_EXTENTS */
26 return sched_clock() << DEBUG_SHIFT;
27}
28
29void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
30{
31 /* Just stupid testing the normalize function and deltas */
32 *ts >>= DEBUG_SHIFT;
33}
34
/* Size of the event header that rb_event_length() adds to a data payload. */
35#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
/* The event len field counts units of (1 << RB_ALIGNMENT_SHIFT) bytes. */
36#define RB_ALIGNMENT_SHIFT 2
37#define RB_ALIGNMENT (1 << RB_ALIGNMENT_SHIFT)
/* Payloads larger than this keep their length in array[0] (see rb_update_event()). */
38#define RB_MAX_SMALL_DATA 28
39
/* Fixed lengths that rb_event_length() reports for the time-keeping events. */
40enum {
41 RB_LEN_TIME_EXTEND = 8,
42 RB_LEN_TIME_STAMP = 16,
43};
44
45/* inline for ring buffer fast paths */
46static inline unsigned
47rb_event_length(struct ring_buffer_event *event)
48{
49 unsigned length;
50
51 switch (event->type) {
52 case RINGBUF_TYPE_PADDING:
53 /* undefined */
54 return -1;
55
56 case RINGBUF_TYPE_TIME_EXTEND:
57 return RB_LEN_TIME_EXTEND;
58
59 case RINGBUF_TYPE_TIME_STAMP:
60 return RB_LEN_TIME_STAMP;
61
62 case RINGBUF_TYPE_DATA:
63 if (event->len)
64 length = event->len << RB_ALIGNMENT_SHIFT;
65 else
66 length = event->array[0];
67 return length + RB_EVNT_HDR_SIZE;
68 default:
69 BUG();
70 }
71 /* not hit */
72 return 0;
73}
74
/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Exported wrapper around rb_event_length(); the returned value includes
 * the event header for data events.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned total = rb_event_length(event);

	return total;
}
83
84/* inline for ring buffer fast paths */
85static inline void *
86rb_event_data(struct ring_buffer_event *event)
87{
88 BUG_ON(event->type != RINGBUF_TYPE_DATA);
89 /* If length is in len field, then array[0] has the data */
90 if (event->len)
91 return (void *)&event->array[0];
92 /* Otherwise length is in array[0] and array[1] has the data */
93 return (void *)&event->array[1];
94}
95
/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 *
 * Exported wrapper around rb_event_data(); @event must be a data event.
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	void *body = rb_event_data(event);

	return body;
}
104
/* Iterate @cpu over every CPU set in @buffer's cpumask. */
105#define for_each_buffer_cpu(buffer, cpu) \
106 for_each_cpu_mask(cpu, buffer->cpumask)
107
/*
 * An event's time_delta holds the low TS_SHIFT bits of a delta (TS_MASK).
 * TS_DELTA_TEST selects every bit above that; test_time_stamp() uses it to
 * detect a delta that does not fit.
 */
108#define TS_SHIFT 27
109#define TS_MASK ((1ULL << TS_SHIFT) - 1)
110#define TS_DELTA_TEST (~TS_MASK)
111
112/*
113 * This hack stolen from mm/slob.c.
114 * We can store per page timing information in the page frame of the page.
115 * Thanks to Peter Zijlstra for suggesting this idea.
116 */
/*
 * The anonymous struct is overlaid on struct page through the union, so a
 * pointer obtained from virt_to_page() can be cast to struct buffer_page
 * (see rb_allocate_pages()).  ring_buffer_alloc() checks that this
 * structure is not larger than struct page.
 */
117struct buffer_page {
118 union {
119 struct {
120 unsigned long flags; /* mandatory */
121 atomic_t _count; /* mandatory */
122 u64 time_stamp; /* page time stamp */
123 unsigned size; /* size of page data */
124 struct list_head list; /* list of free pages */
125 };
126 struct page page;
127 };
128};
129
ed56829c
SR
130/*
131 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
132 * this issue out.
133 */
134static inline void free_buffer_page(struct buffer_page *bpage)
135{
136 reset_page_mapcount(&bpage->page);
137 bpage->page.mapping = NULL;
138 __free_page(&bpage->page);
139}
140
7a8e76a3
SR
141/*
142 * We need to fit the time_stamp delta into 27 bits.
143 */
144static inline int test_time_stamp(u64 delta)
145{
146 if (delta & TS_DELTA_TEST)
147 return 1;
148 return 0;
149}
150
151#define BUF_PAGE_SIZE PAGE_SIZE
152
153/*
154 * head_page == tail_page && head == tail then buffer is empty.
155 */
/*
 * Per-CPU part of a ring buffer: the list of data pages plus the current
 * read (head) and write (tail) positions within that list.
 */
156struct ring_buffer_per_cpu {
/* CPU number and owning ring buffer, both set in rb_allocate_cpu_buffer() */
157 int cpu;
158 struct ring_buffer *buffer;
/* taken by the write paths and by ring_buffer_lock() */
159 spinlock_t lock;
160 struct lock_class_key lock_key;
/* list head linking the buffer_page entries through buffer_page.list */
161 struct list_head pages;
/* head and tail are byte offsets into head_page and tail_page (see rb_page_index()) */
162 unsigned long head; /* read from head */
163 unsigned long tail; /* write to tail */
164 struct buffer_page *head_page;
165 struct buffer_page *tail_page;
/* data events discarded by rb_update_overflow() when the head page is overwritten */
166 unsigned long overrun;
/* incremented by rb_commit(); decremented when a data event is consumed or overrun */
167 unsigned long entries;
168 u64 write_stamp;
169 u64 read_stamp;
/* non-zero makes the write paths refuse to record on this CPU */
170 atomic_t record_disabled;
171};
172
/*
 * Top-level ring buffer: settings shared by all CPUs plus an array of
 * per-CPU buffers indexed by CPU number.
 */
173struct ring_buffer {
174 unsigned long size;
/* number of pages given to each per-CPU buffer */
175 unsigned pages;
/* RB_FL_* attributes passed to ring_buffer_alloc() */
176 unsigned flags;
/* set to nr_cpu_ids in ring_buffer_alloc(); sizes the buffers array */
177 int cpus;
/* copy of cpu_possible_map taken in ring_buffer_alloc() */
178 cpumask_t cpumask;
/* non-zero makes the write paths refuse to record on any CPU */
179 atomic_t record_disabled;
180
/* held by ring_buffer_resize() while pages are added or removed */
181 struct mutex mutex;
182
183 struct ring_buffer_per_cpu **buffers;
184};
185
/*
 * Read position of an iterator over one per-CPU buffer.  It mirrors the
 * head/head_page/read_stamp fields of struct ring_buffer_per_cpu, but is
 * private to the iterator.
 */
186struct ring_buffer_iter {
187 struct ring_buffer_per_cpu *cpu_buffer;
/* byte offset into head_page (see rb_iter_head_event()) */
188 unsigned long head;
189 struct buffer_page *head_page;
190 u64 read_stamp;
191};
192
/*
 * RB_WARN_ON - integrity assertion for the ring buffer
 * @buffer: object whose record_disabled counter is bumped on failure
 * @cond:   condition that indicates corruption when true
 *
 * When @cond holds, recording on @buffer is disabled, a warning is
 * emitted and the *calling function* returns -1.  The macro may therefore
 * only be used in functions that return an int.
 *
 * The body is wrapped in do { } while (0) so the macro behaves as a single
 * statement (safe inside an unbraced if/else), and @buffer is
 * parenthesized so any pointer expression may be passed.
 */
#define RB_WARN_ON(buffer, cond)				\
	do {							\
		if (unlikely(cond)) {				\
			atomic_inc(&(buffer)->record_disabled);	\
			WARN_ON(1);				\
			return -1;				\
		}						\
	} while (0)
199
200/**
201 * check_pages - integrity check of buffer pages
202 * @cpu_buffer: CPU buffer with pages to test
203 *
204 * As a safty measure we check to make sure the data pages have not
205 * been corrupted.
206 */
207static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
208{
209 struct list_head *head = &cpu_buffer->pages;
210 struct buffer_page *page, *tmp;
211
212 RB_WARN_ON(cpu_buffer, head->next->prev != head);
213 RB_WARN_ON(cpu_buffer, head->prev->next != head);
214
215 list_for_each_entry_safe(page, tmp, head, list) {
216 RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
217 RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
218 }
219
220 return 0;
221}
222
223static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
224{
225 return cpu_buffer->head_page->size;
226}
227
228static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
229 unsigned nr_pages)
230{
231 struct list_head *head = &cpu_buffer->pages;
232 struct buffer_page *page, *tmp;
233 unsigned long addr;
234 LIST_HEAD(pages);
235 unsigned i;
236
237 for (i = 0; i < nr_pages; i++) {
238 addr = __get_free_page(GFP_KERNEL);
239 if (!addr)
240 goto free_pages;
241 page = (struct buffer_page *)virt_to_page(addr);
242 list_add(&page->list, &pages);
243 }
244
245 list_splice(&pages, head);
246
247 rb_check_pages(cpu_buffer);
248
249 return 0;
250
251 free_pages:
252 list_for_each_entry_safe(page, tmp, &pages, list) {
253 list_del_init(&page->list);
ed56829c 254 free_buffer_page(page);
7a8e76a3
SR
255 }
256 return -ENOMEM;
257}
258
259static struct ring_buffer_per_cpu *
260rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
261{
262 struct ring_buffer_per_cpu *cpu_buffer;
263 int ret;
264
265 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
266 GFP_KERNEL, cpu_to_node(cpu));
267 if (!cpu_buffer)
268 return NULL;
269
270 cpu_buffer->cpu = cpu;
271 cpu_buffer->buffer = buffer;
272 spin_lock_init(&cpu_buffer->lock);
273 INIT_LIST_HEAD(&cpu_buffer->pages);
274
275 ret = rb_allocate_pages(cpu_buffer, buffer->pages);
276 if (ret < 0)
277 goto fail_free_buffer;
278
279 cpu_buffer->head_page
280 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
281 cpu_buffer->tail_page
282 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
283
284 return cpu_buffer;
285
286 fail_free_buffer:
287 kfree(cpu_buffer);
288 return NULL;
289}
290
291static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
292{
293 struct list_head *head = &cpu_buffer->pages;
294 struct buffer_page *page, *tmp;
295
296 list_for_each_entry_safe(page, tmp, head, list) {
297 list_del_init(&page->list);
ed56829c 298 free_buffer_page(page);
7a8e76a3
SR
299 }
300 kfree(cpu_buffer);
301}
302
a7b13743
SR
303/*
304 * Causes compile errors if the struct buffer_page gets bigger
305 * than the struct page.
306 */
307extern int ring_buffer_page_too_big(void);
308
7a8e76a3
SR
309/**
310 * ring_buffer_alloc - allocate a new ring_buffer
311 * @size: the size in bytes that is needed.
312 * @flags: attributes to set for the ring buffer.
313 *
314 * Currently the only flag that is available is the RB_FL_OVERWRITE
315 * flag. This flag means that the buffer will overwrite old data
316 * when the buffer wraps. If this flag is not set, the buffer will
317 * drop data when the tail hits the head.
318 */
319struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
320{
321 struct ring_buffer *buffer;
322 int bsize;
323 int cpu;
324
a7b13743
SR
325 /* Paranoid! Optimizes out when all is well */
326 if (sizeof(struct buffer_page) > sizeof(struct page))
327 ring_buffer_page_too_big();
328
329
7a8e76a3
SR
330 /* keep it in its own cache line */
331 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
332 GFP_KERNEL);
333 if (!buffer)
334 return NULL;
335
336 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
337 buffer->flags = flags;
338
339 /* need at least two pages */
340 if (buffer->pages == 1)
341 buffer->pages++;
342
343 buffer->cpumask = cpu_possible_map;
344 buffer->cpus = nr_cpu_ids;
345
346 bsize = sizeof(void *) * nr_cpu_ids;
347 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
348 GFP_KERNEL);
349 if (!buffer->buffers)
350 goto fail_free_buffer;
351
352 for_each_buffer_cpu(buffer, cpu) {
353 buffer->buffers[cpu] =
354 rb_allocate_cpu_buffer(buffer, cpu);
355 if (!buffer->buffers[cpu])
356 goto fail_free_buffers;
357 }
358
359 mutex_init(&buffer->mutex);
360
361 return buffer;
362
363 fail_free_buffers:
364 for_each_buffer_cpu(buffer, cpu) {
365 if (buffer->buffers[cpu])
366 rb_free_cpu_buffer(buffer->buffers[cpu]);
367 }
368 kfree(buffer->buffers);
369
370 fail_free_buffer:
371 kfree(buffer);
372 return NULL;
373}
374
375/**
376 * ring_buffer_free - free a ring buffer.
377 * @buffer: the buffer to free.
378 */
379void
380ring_buffer_free(struct ring_buffer *buffer)
381{
382 int cpu;
383
384 for_each_buffer_cpu(buffer, cpu)
385 rb_free_cpu_buffer(buffer->buffers[cpu]);
386
387 kfree(buffer);
388}
389
390static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
391
392static void
393rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
394{
395 struct buffer_page *page;
396 struct list_head *p;
397 unsigned i;
398
399 atomic_inc(&cpu_buffer->record_disabled);
400 synchronize_sched();
401
402 for (i = 0; i < nr_pages; i++) {
403 BUG_ON(list_empty(&cpu_buffer->pages));
404 p = cpu_buffer->pages.next;
405 page = list_entry(p, struct buffer_page, list);
406 list_del_init(&page->list);
ed56829c 407 free_buffer_page(page);
7a8e76a3
SR
408 }
409 BUG_ON(list_empty(&cpu_buffer->pages));
410
411 rb_reset_cpu(cpu_buffer);
412
413 rb_check_pages(cpu_buffer);
414
415 atomic_dec(&cpu_buffer->record_disabled);
416
417}
418
419static void
420rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
421 struct list_head *pages, unsigned nr_pages)
422{
423 struct buffer_page *page;
424 struct list_head *p;
425 unsigned i;
426
427 atomic_inc(&cpu_buffer->record_disabled);
428 synchronize_sched();
429
430 for (i = 0; i < nr_pages; i++) {
431 BUG_ON(list_empty(pages));
432 p = pages->next;
433 page = list_entry(p, struct buffer_page, list);
434 list_del_init(&page->list);
435 list_add_tail(&page->list, &cpu_buffer->pages);
436 }
437 rb_reset_cpu(cpu_buffer);
438
439 rb_check_pages(cpu_buffer);
440
441 atomic_dec(&cpu_buffer->record_disabled);
442}
443
444/**
445 * ring_buffer_resize - resize the ring buffer
446 * @buffer: the buffer to resize.
447 * @size: the new size.
448 *
449 * The tracer is responsible for making sure that the buffer is
450 * not being used while changing the size.
451 * Note: We may be able to change the above requirement by using
452 * RCU synchronizations.
453 *
454 * Minimum size is 2 * BUF_PAGE_SIZE.
455 *
456 * Returns -1 on failure.
457 */
458int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
459{
460 struct ring_buffer_per_cpu *cpu_buffer;
461 unsigned nr_pages, rm_pages, new_pages;
462 struct buffer_page *page, *tmp;
463 unsigned long buffer_size;
464 unsigned long addr;
465 LIST_HEAD(pages);
466 int i, cpu;
467
468 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
469 size *= BUF_PAGE_SIZE;
470 buffer_size = buffer->pages * BUF_PAGE_SIZE;
471
472 /* we need a minimum of two pages */
473 if (size < BUF_PAGE_SIZE * 2)
474 size = BUF_PAGE_SIZE * 2;
475
476 if (size == buffer_size)
477 return size;
478
479 mutex_lock(&buffer->mutex);
480
481 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
482
483 if (size < buffer_size) {
484
485 /* easy case, just free pages */
486 BUG_ON(nr_pages >= buffer->pages);
487
488 rm_pages = buffer->pages - nr_pages;
489
490 for_each_buffer_cpu(buffer, cpu) {
491 cpu_buffer = buffer->buffers[cpu];
492 rb_remove_pages(cpu_buffer, rm_pages);
493 }
494 goto out;
495 }
496
497 /*
498 * This is a bit more difficult. We only want to add pages
499 * when we can allocate enough for all CPUs. We do this
500 * by allocating all the pages and storing them on a local
501 * link list. If we succeed in our allocation, then we
502 * add these pages to the cpu_buffers. Otherwise we just free
503 * them all and return -ENOMEM;
504 */
505 BUG_ON(nr_pages <= buffer->pages);
506 new_pages = nr_pages - buffer->pages;
507
508 for_each_buffer_cpu(buffer, cpu) {
509 for (i = 0; i < new_pages; i++) {
510 addr = __get_free_page(GFP_KERNEL);
511 if (!addr)
512 goto free_pages;
513 page = (struct buffer_page *)virt_to_page(addr);
514 list_add(&page->list, &pages);
515 }
516 }
517
518 for_each_buffer_cpu(buffer, cpu) {
519 cpu_buffer = buffer->buffers[cpu];
520 rb_insert_pages(cpu_buffer, &pages, new_pages);
521 }
522
523 BUG_ON(!list_empty(&pages));
524
525 out:
526 buffer->pages = nr_pages;
527 mutex_unlock(&buffer->mutex);
528
529 return size;
530
531 free_pages:
532 list_for_each_entry_safe(page, tmp, &pages, list) {
533 list_del_init(&page->list);
ed56829c 534 free_buffer_page(page);
7a8e76a3
SR
535 }
536 return -ENOMEM;
537}
538
539static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
540{
541 return cpu_buffer->head_page == cpu_buffer->tail_page &&
542 cpu_buffer->head == cpu_buffer->tail;
543}
544
545static inline int rb_null_event(struct ring_buffer_event *event)
546{
547 return event->type == RINGBUF_TYPE_PADDING;
548}
549
550static inline void *rb_page_index(struct buffer_page *page, unsigned index)
551{
552 void *addr = page_address(&page->page);
553
554 return addr + index;
555}
556
557static inline struct ring_buffer_event *
558rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
559{
560 return rb_page_index(cpu_buffer->head_page,
561 cpu_buffer->head);
562}
563
564static inline struct ring_buffer_event *
565rb_iter_head_event(struct ring_buffer_iter *iter)
566{
567 return rb_page_index(iter->head_page,
568 iter->head);
569}
570
571/*
572 * When the tail hits the head and the buffer is in overwrite mode,
573 * the head jumps to the next page and all content on the previous
574 * page is discarded. But before doing so, we update the overrun
575 * variable of the buffer.
576 */
577static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
578{
579 struct ring_buffer_event *event;
580 unsigned long head;
581
582 for (head = 0; head < rb_head_size(cpu_buffer);
583 head += rb_event_length(event)) {
584
585 event = rb_page_index(cpu_buffer->head_page, head);
586 BUG_ON(rb_null_event(event));
587 /* Only count data entries */
588 if (event->type != RINGBUF_TYPE_DATA)
589 continue;
590 cpu_buffer->overrun++;
591 cpu_buffer->entries--;
592 }
593}
594
595static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
596 struct buffer_page **page)
597{
598 struct list_head *p = (*page)->list.next;
599
600 if (p == &cpu_buffer->pages)
601 p = p->next;
602
603 *page = list_entry(p, struct buffer_page, list);
604}
605
606static inline void
607rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
608{
609 cpu_buffer->tail_page->time_stamp = *ts;
610 cpu_buffer->write_stamp = *ts;
611}
612
613static void rb_reset_read_page(struct ring_buffer_per_cpu *cpu_buffer)
614{
615 cpu_buffer->read_stamp = cpu_buffer->head_page->time_stamp;
616 cpu_buffer->head = 0;
617}
618
619static void
620rb_reset_iter_read_page(struct ring_buffer_iter *iter)
621{
622 iter->read_stamp = iter->head_page->time_stamp;
623 iter->head = 0;
624}
625
626/**
627 * ring_buffer_update_event - update event type and data
628 * @event: the even to update
629 * @type: the type of event
630 * @length: the size of the event field in the ring buffer
631 *
632 * Update the type and data fields of the event. The length
633 * is the actual size that is written to the ring buffer,
634 * and with this, we can determine what to place into the
635 * data field.
636 */
637static inline void
638rb_update_event(struct ring_buffer_event *event,
639 unsigned type, unsigned length)
640{
641 event->type = type;
642
643 switch (type) {
644
645 case RINGBUF_TYPE_PADDING:
646 break;
647
648 case RINGBUF_TYPE_TIME_EXTEND:
649 event->len =
650 (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
651 >> RB_ALIGNMENT_SHIFT;
652 break;
653
654 case RINGBUF_TYPE_TIME_STAMP:
655 event->len =
656 (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
657 >> RB_ALIGNMENT_SHIFT;
658 break;
659
660 case RINGBUF_TYPE_DATA:
661 length -= RB_EVNT_HDR_SIZE;
662 if (length > RB_MAX_SMALL_DATA) {
663 event->len = 0;
664 event->array[0] = length;
665 } else
666 event->len =
667 (length + (RB_ALIGNMENT-1))
668 >> RB_ALIGNMENT_SHIFT;
669 break;
670 default:
671 BUG();
672 }
673}
674
675static inline unsigned rb_calculate_event_length(unsigned length)
676{
677 struct ring_buffer_event event; /* Used only for sizeof array */
678
679 /* zero length can cause confusions */
680 if (!length)
681 length = 1;
682
683 if (length > RB_MAX_SMALL_DATA)
684 length += sizeof(event.array[0]);
685
686 length += RB_EVNT_HDR_SIZE;
687 length = ALIGN(length, RB_ALIGNMENT);
688
689 return length;
690}
691
692static struct ring_buffer_event *
693__rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
694 unsigned type, unsigned long length, u64 *ts)
695{
696 struct buffer_page *head_page, *tail_page;
697 unsigned long tail;
698 struct ring_buffer *buffer = cpu_buffer->buffer;
699 struct ring_buffer_event *event;
700
701 tail_page = cpu_buffer->tail_page;
702 head_page = cpu_buffer->head_page;
703 tail = cpu_buffer->tail;
704
705 if (tail + length > BUF_PAGE_SIZE) {
706 struct buffer_page *next_page = tail_page;
707
708 rb_inc_page(cpu_buffer, &next_page);
709
710 if (next_page == head_page) {
711 if (!(buffer->flags & RB_FL_OVERWRITE))
712 return NULL;
713
714 /* count overflows */
715 rb_update_overflow(cpu_buffer);
716
717 rb_inc_page(cpu_buffer, &head_page);
718 cpu_buffer->head_page = head_page;
719 rb_reset_read_page(cpu_buffer);
720 }
721
722 if (tail != BUF_PAGE_SIZE) {
723 event = rb_page_index(tail_page, tail);
724 /* page padding */
725 event->type = RINGBUF_TYPE_PADDING;
726 }
727
728 tail_page->size = tail;
729 tail_page = next_page;
730 tail_page->size = 0;
731 tail = 0;
732 cpu_buffer->tail_page = tail_page;
733 cpu_buffer->tail = tail;
734 rb_add_stamp(cpu_buffer, ts);
735 }
736
737 BUG_ON(tail + length > BUF_PAGE_SIZE);
738
739 event = rb_page_index(tail_page, tail);
740 rb_update_event(event, type, length);
741
742 return event;
743}
744
745static int
746rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
747 u64 *ts, u64 *delta)
748{
749 struct ring_buffer_event *event;
750 static int once;
751
752 if (unlikely(*delta > (1ULL << 59) && !once++)) {
753 printk(KERN_WARNING "Delta way too big! %llu"
754 " ts=%llu write stamp = %llu\n",
755 *delta, *ts, cpu_buffer->write_stamp);
756 WARN_ON(1);
757 }
758
759 /*
760 * The delta is too big, we to add a
761 * new timestamp.
762 */
763 event = __rb_reserve_next(cpu_buffer,
764 RINGBUF_TYPE_TIME_EXTEND,
765 RB_LEN_TIME_EXTEND,
766 ts);
767 if (!event)
768 return -1;
769
770 /* check to see if we went to the next page */
771 if (cpu_buffer->tail) {
772 /* Still on same page, update timestamp */
773 event->time_delta = *delta & TS_MASK;
774 event->array[0] = *delta >> TS_SHIFT;
775 /* commit the time event */
776 cpu_buffer->tail +=
777 rb_event_length(event);
778 cpu_buffer->write_stamp = *ts;
779 *delta = 0;
780 }
781
782 return 0;
783}
784
785static struct ring_buffer_event *
786rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
787 unsigned type, unsigned long length)
788{
789 struct ring_buffer_event *event;
790 u64 ts, delta;
791
792 ts = ring_buffer_time_stamp(cpu_buffer->cpu);
793
794 if (cpu_buffer->tail) {
795 delta = ts - cpu_buffer->write_stamp;
796
797 if (test_time_stamp(delta)) {
798 int ret;
799
800 ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
801 if (ret < 0)
802 return NULL;
803 }
804 } else {
805 rb_add_stamp(cpu_buffer, &ts);
806 delta = 0;
807 }
808
809 event = __rb_reserve_next(cpu_buffer, type, length, &ts);
810 if (!event)
811 return NULL;
812
813 /* If the reserve went to the next page, our delta is zero */
814 if (!cpu_buffer->tail)
815 delta = 0;
816
817 event->time_delta = delta;
818
819 return event;
820}
821
822/**
823 * ring_buffer_lock_reserve - reserve a part of the buffer
824 * @buffer: the ring buffer to reserve from
825 * @length: the length of the data to reserve (excluding event header)
826 * @flags: a pointer to save the interrupt flags
827 *
828 * Returns a reseverd event on the ring buffer to copy directly to.
829 * The user of this interface will need to get the body to write into
830 * and can use the ring_buffer_event_data() interface.
831 *
832 * The length is the length of the data needed, not the event length
833 * which also includes the event header.
834 *
835 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
836 * If NULL is returned, then nothing has been allocated or locked.
837 */
838struct ring_buffer_event *
839ring_buffer_lock_reserve(struct ring_buffer *buffer,
840 unsigned long length,
841 unsigned long *flags)
842{
843 struct ring_buffer_per_cpu *cpu_buffer;
844 struct ring_buffer_event *event;
845 int cpu;
846
847 if (atomic_read(&buffer->record_disabled))
848 return NULL;
849
70255b5e 850 local_irq_save(*flags);
7a8e76a3
SR
851 cpu = raw_smp_processor_id();
852
853 if (!cpu_isset(cpu, buffer->cpumask))
854 goto out_irq;
855
856 cpu_buffer = buffer->buffers[cpu];
857 spin_lock(&cpu_buffer->lock);
858
859 if (atomic_read(&cpu_buffer->record_disabled))
860 goto no_record;
861
862 length = rb_calculate_event_length(length);
863 if (length > BUF_PAGE_SIZE)
864 return NULL;
865
866 event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
867 if (!event)
868 goto no_record;
869
870 return event;
871
872 no_record:
873 spin_unlock(&cpu_buffer->lock);
874 out_irq:
875 local_irq_restore(*flags);
876 return NULL;
877}
878
879static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
880 struct ring_buffer_event *event)
881{
882 cpu_buffer->tail += rb_event_length(event);
883 cpu_buffer->tail_page->size = cpu_buffer->tail;
884 cpu_buffer->write_stamp += event->time_delta;
885 cpu_buffer->entries++;
886}
887
888/**
889 * ring_buffer_unlock_commit - commit a reserved
890 * @buffer: The buffer to commit to
891 * @event: The event pointer to commit.
892 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
893 *
894 * This commits the data to the ring buffer, and releases any locks held.
895 *
896 * Must be paired with ring_buffer_lock_reserve.
897 */
898int ring_buffer_unlock_commit(struct ring_buffer *buffer,
899 struct ring_buffer_event *event,
900 unsigned long flags)
901{
902 struct ring_buffer_per_cpu *cpu_buffer;
903 int cpu = raw_smp_processor_id();
904
905 cpu_buffer = buffer->buffers[cpu];
906
907 assert_spin_locked(&cpu_buffer->lock);
908
909 rb_commit(cpu_buffer, event);
910
911 spin_unlock(&cpu_buffer->lock);
70255b5e 912 local_irq_restore(flags);
7a8e76a3
SR
913
914 return 0;
915}
916
917/**
918 * ring_buffer_write - write data to the buffer without reserving
919 * @buffer: The ring buffer to write to.
920 * @length: The length of the data being written (excluding the event header)
921 * @data: The data to write to the buffer.
922 *
923 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
924 * one function. If you already have the data to write to the buffer, it
925 * may be easier to simply call this function.
926 *
927 * Note, like ring_buffer_lock_reserve, the length is the length of the data
928 * and not the length of the event which would hold the header.
929 */
930int ring_buffer_write(struct ring_buffer *buffer,
931 unsigned long length,
932 void *data)
933{
934 struct ring_buffer_per_cpu *cpu_buffer;
935 struct ring_buffer_event *event;
936 unsigned long event_length, flags;
937 void *body;
938 int ret = -EBUSY;
939 int cpu;
940
941 if (atomic_read(&buffer->record_disabled))
942 return -EBUSY;
943
944 local_irq_save(flags);
945 cpu = raw_smp_processor_id();
946
947 if (!cpu_isset(cpu, buffer->cpumask))
948 goto out_irq;
949
950 cpu_buffer = buffer->buffers[cpu];
951 spin_lock(&cpu_buffer->lock);
952
953 if (atomic_read(&cpu_buffer->record_disabled))
954 goto out;
955
956 event_length = rb_calculate_event_length(length);
957 event = rb_reserve_next_event(cpu_buffer,
958 RINGBUF_TYPE_DATA, event_length);
959 if (!event)
960 goto out;
961
962 body = rb_event_data(event);
963
964 memcpy(body, data, length);
965
966 rb_commit(cpu_buffer, event);
967
968 ret = 0;
969 out:
970 spin_unlock(&cpu_buffer->lock);
971 out_irq:
972 local_irq_restore(flags);
973
974 return ret;
975}
976
977/**
978 * ring_buffer_lock - lock the ring buffer
979 * @buffer: The ring buffer to lock
980 * @flags: The place to store the interrupt flags
981 *
982 * This locks all the per CPU buffers.
983 *
984 * Must be unlocked by ring_buffer_unlock.
985 */
986void ring_buffer_lock(struct ring_buffer *buffer, unsigned long *flags)
987{
988 struct ring_buffer_per_cpu *cpu_buffer;
989 int cpu;
990
991 local_irq_save(*flags);
992
993 for_each_buffer_cpu(buffer, cpu) {
994 cpu_buffer = buffer->buffers[cpu];
995 spin_lock(&cpu_buffer->lock);
996 }
997}
998
999/**
1000 * ring_buffer_unlock - unlock a locked buffer
1001 * @buffer: The locked buffer to unlock
1002 * @flags: The interrupt flags received by ring_buffer_lock
1003 */
1004void ring_buffer_unlock(struct ring_buffer *buffer, unsigned long flags)
1005{
1006 struct ring_buffer_per_cpu *cpu_buffer;
1007 int cpu;
1008
1009 for (cpu = buffer->cpus - 1; cpu >= 0; cpu--) {
1010 if (!cpu_isset(cpu, buffer->cpumask))
1011 continue;
1012 cpu_buffer = buffer->buffers[cpu];
1013 spin_unlock(&cpu_buffer->lock);
1014 }
1015
1016 local_irq_restore(flags);
1017}
1018
1019/**
1020 * ring_buffer_record_disable - stop all writes into the buffer
1021 * @buffer: The ring buffer to stop writes to.
1022 *
1023 * This prevents all writes to the buffer. Any attempt to write
1024 * to the buffer after this will fail and return NULL.
1025 *
1026 * The caller should call synchronize_sched() after this.
1027 */
1028void ring_buffer_record_disable(struct ring_buffer *buffer)
1029{
1030 atomic_inc(&buffer->record_disabled);
1031}
1032
1033/**
1034 * ring_buffer_record_enable - enable writes to the buffer
1035 * @buffer: The ring buffer to enable writes
1036 *
1037 * Note, multiple disables will need the same number of enables
1038 * to truely enable the writing (much like preempt_disable).
1039 */
1040void ring_buffer_record_enable(struct ring_buffer *buffer)
1041{
1042 atomic_dec(&buffer->record_disabled);
1043}
1044
1045/**
1046 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1047 * @buffer: The ring buffer to stop writes to.
1048 * @cpu: The CPU buffer to stop
1049 *
1050 * This prevents all writes to the buffer. Any attempt to write
1051 * to the buffer after this will fail and return NULL.
1052 *
1053 * The caller should call synchronize_sched() after this.
1054 */
1055void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1056{
1057 struct ring_buffer_per_cpu *cpu_buffer;
1058
1059 if (!cpu_isset(cpu, buffer->cpumask))
1060 return;
1061
1062 cpu_buffer = buffer->buffers[cpu];
1063 atomic_inc(&cpu_buffer->record_disabled);
1064}
1065
1066/**
1067 * ring_buffer_record_enable_cpu - enable writes to the buffer
1068 * @buffer: The ring buffer to enable writes
1069 * @cpu: The CPU to enable.
1070 *
1071 * Note, multiple disables will need the same number of enables
1072 * to truely enable the writing (much like preempt_disable).
1073 */
1074void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1075{
1076 struct ring_buffer_per_cpu *cpu_buffer;
1077
1078 if (!cpu_isset(cpu, buffer->cpumask))
1079 return;
1080
1081 cpu_buffer = buffer->buffers[cpu];
1082 atomic_dec(&cpu_buffer->record_disabled);
1083}
1084
1085/**
1086 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1087 * @buffer: The ring buffer
1088 * @cpu: The per CPU buffer to get the entries from.
1089 */
1090unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1091{
1092 struct ring_buffer_per_cpu *cpu_buffer;
1093
1094 if (!cpu_isset(cpu, buffer->cpumask))
1095 return 0;
1096
1097 cpu_buffer = buffer->buffers[cpu];
1098 return cpu_buffer->entries;
1099}
1100
1101/**
1102 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1103 * @buffer: The ring buffer
1104 * @cpu: The per CPU buffer to get the number of overruns from
1105 */
1106unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1107{
1108 struct ring_buffer_per_cpu *cpu_buffer;
1109
1110 if (!cpu_isset(cpu, buffer->cpumask))
1111 return 0;
1112
1113 cpu_buffer = buffer->buffers[cpu];
1114 return cpu_buffer->overrun;
1115}
1116
1117/**
1118 * ring_buffer_entries - get the number of entries in a buffer
1119 * @buffer: The ring buffer
1120 *
1121 * Returns the total number of entries in the ring buffer
1122 * (all CPU entries)
1123 */
1124unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1125{
1126 struct ring_buffer_per_cpu *cpu_buffer;
1127 unsigned long entries = 0;
1128 int cpu;
1129
1130 /* if you care about this being correct, lock the buffer */
1131 for_each_buffer_cpu(buffer, cpu) {
1132 cpu_buffer = buffer->buffers[cpu];
1133 entries += cpu_buffer->entries;
1134 }
1135
1136 return entries;
1137}
1138
1139/**
1140 * ring_buffer_overrun_cpu - get the number of overruns in buffer
1141 * @buffer: The ring buffer
1142 *
1143 * Returns the total number of overruns in the ring buffer
1144 * (all CPU entries)
1145 */
1146unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1147{
1148 struct ring_buffer_per_cpu *cpu_buffer;
1149 unsigned long overruns = 0;
1150 int cpu;
1151
1152 /* if you care about this being correct, lock the buffer */
1153 for_each_buffer_cpu(buffer, cpu) {
1154 cpu_buffer = buffer->buffers[cpu];
1155 overruns += cpu_buffer->overrun;
1156 }
1157
1158 return overruns;
1159}
1160
1161/**
1162 * ring_buffer_iter_reset - reset an iterator
1163 * @iter: The iterator to reset
1164 *
1165 * Resets the iterator, so that it will start from the beginning
1166 * again.
1167 */
1168void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1169{
1170 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1171
1172 iter->head_page = cpu_buffer->head_page;
1173 iter->head = cpu_buffer->head;
1174 rb_reset_iter_read_page(iter);
1175}
1176
1177/**
1178 * ring_buffer_iter_empty - check if an iterator has no more to read
1179 * @iter: The iterator to check
1180 */
1181int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1182{
1183 struct ring_buffer_per_cpu *cpu_buffer;
1184
1185 cpu_buffer = iter->cpu_buffer;
1186
1187 return iter->head_page == cpu_buffer->tail_page &&
1188 iter->head == cpu_buffer->tail;
1189}
1190
1191static void
1192rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1193 struct ring_buffer_event *event)
1194{
1195 u64 delta;
1196
1197 switch (event->type) {
1198 case RINGBUF_TYPE_PADDING:
1199 return;
1200
1201 case RINGBUF_TYPE_TIME_EXTEND:
1202 delta = event->array[0];
1203 delta <<= TS_SHIFT;
1204 delta += event->time_delta;
1205 cpu_buffer->read_stamp += delta;
1206 return;
1207
1208 case RINGBUF_TYPE_TIME_STAMP:
1209 /* FIXME: not implemented */
1210 return;
1211
1212 case RINGBUF_TYPE_DATA:
1213 cpu_buffer->read_stamp += event->time_delta;
1214 return;
1215
1216 default:
1217 BUG();
1218 }
1219 return;
1220}
1221
1222static void
1223rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1224 struct ring_buffer_event *event)
1225{
1226 u64 delta;
1227
1228 switch (event->type) {
1229 case RINGBUF_TYPE_PADDING:
1230 return;
1231
1232 case RINGBUF_TYPE_TIME_EXTEND:
1233 delta = event->array[0];
1234 delta <<= TS_SHIFT;
1235 delta += event->time_delta;
1236 iter->read_stamp += delta;
1237 return;
1238
1239 case RINGBUF_TYPE_TIME_STAMP:
1240 /* FIXME: not implemented */
1241 return;
1242
1243 case RINGBUF_TYPE_DATA:
1244 iter->read_stamp += event->time_delta;
1245 return;
1246
1247 default:
1248 BUG();
1249 }
1250 return;
1251}
1252
1253static void rb_advance_head(struct ring_buffer_per_cpu *cpu_buffer)
1254{
1255 struct ring_buffer_event *event;
1256 unsigned length;
1257
1258 /*
1259 * Check if we are at the end of the buffer.
1260 */
1261 if (cpu_buffer->head >= cpu_buffer->head_page->size) {
1262 BUG_ON(cpu_buffer->head_page == cpu_buffer->tail_page);
1263 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1264 rb_reset_read_page(cpu_buffer);
1265 return;
1266 }
1267
1268 event = rb_head_event(cpu_buffer);
1269
1270 if (event->type == RINGBUF_TYPE_DATA)
1271 cpu_buffer->entries--;
1272
1273 length = rb_event_length(event);
1274
1275 /*
1276 * This should not be called to advance the header if we are
1277 * at the tail of the buffer.
1278 */
1279 BUG_ON((cpu_buffer->head_page == cpu_buffer->tail_page) &&
1280 (cpu_buffer->head + length > cpu_buffer->tail));
1281
1282 rb_update_read_stamp(cpu_buffer, event);
1283
1284 cpu_buffer->head += length;
1285
1286 /* check for end of page */
1287 if ((cpu_buffer->head >= cpu_buffer->head_page->size) &&
1288 (cpu_buffer->head_page != cpu_buffer->tail_page))
1289 rb_advance_head(cpu_buffer);
1290}
1291
1292static void rb_advance_iter(struct ring_buffer_iter *iter)
1293{
1294 struct ring_buffer *buffer;
1295 struct ring_buffer_per_cpu *cpu_buffer;
1296 struct ring_buffer_event *event;
1297 unsigned length;
1298
1299 cpu_buffer = iter->cpu_buffer;
1300 buffer = cpu_buffer->buffer;
1301
1302 /*
1303 * Check if we are at the end of the buffer.
1304 */
1305 if (iter->head >= iter->head_page->size) {
1306 BUG_ON(iter->head_page == cpu_buffer->tail_page);
1307 rb_inc_page(cpu_buffer, &iter->head_page);
1308 rb_reset_iter_read_page(iter);
1309 return;
1310 }
1311
1312 event = rb_iter_head_event(iter);
1313
1314 length = rb_event_length(event);
1315
1316 /*
1317 * This should not be called to advance the header if we are
1318 * at the tail of the buffer.
1319 */
1320 BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
1321 (iter->head + length > cpu_buffer->tail));
1322
1323 rb_update_iter_read_stamp(iter, event);
1324
1325 iter->head += length;
1326
1327 /* check for end of page padding */
1328 if ((iter->head >= iter->head_page->size) &&
1329 (iter->head_page != cpu_buffer->tail_page))
1330 rb_advance_iter(iter);
1331}
1332
1333/**
1334 * ring_buffer_peek - peek at the next event to be read
1335 * @buffer: The ring buffer to read
1336 * @cpu: The cpu to peak at
1337 * @ts: The timestamp counter of this event.
1338 *
1339 * This will return the event that will be read next, but does
1340 * not consume the data.
1341 */
1342struct ring_buffer_event *
1343ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1344{
1345 struct ring_buffer_per_cpu *cpu_buffer;
1346 struct ring_buffer_event *event;
1347
1348 if (!cpu_isset(cpu, buffer->cpumask))
1349 return NULL;
1350
1351 cpu_buffer = buffer->buffers[cpu];
1352
1353 again:
1354 if (rb_per_cpu_empty(cpu_buffer))
1355 return NULL;
1356
1357 event = rb_head_event(cpu_buffer);
1358
1359 switch (event->type) {
1360 case RINGBUF_TYPE_PADDING:
1361 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1362 rb_reset_read_page(cpu_buffer);
1363 goto again;
1364
1365 case RINGBUF_TYPE_TIME_EXTEND:
1366 /* Internal data, OK to advance */
1367 rb_advance_head(cpu_buffer);
1368 goto again;
1369
1370 case RINGBUF_TYPE_TIME_STAMP:
1371 /* FIXME: not implemented */
1372 rb_advance_head(cpu_buffer);
1373 goto again;
1374
1375 case RINGBUF_TYPE_DATA:
1376 if (ts) {
1377 *ts = cpu_buffer->read_stamp + event->time_delta;
1378 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1379 }
1380 return event;
1381
1382 default:
1383 BUG();
1384 }
1385
1386 return NULL;
1387}
1388
1389/**
1390 * ring_buffer_iter_peek - peek at the next event to be read
1391 * @iter: The ring buffer iterator
1392 * @ts: The timestamp counter of this event.
1393 *
1394 * This will return the event that will be read next, but does
1395 * not increment the iterator.
1396 */
1397struct ring_buffer_event *
1398ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1399{
1400 struct ring_buffer *buffer;
1401 struct ring_buffer_per_cpu *cpu_buffer;
1402 struct ring_buffer_event *event;
1403
1404 if (ring_buffer_iter_empty(iter))
1405 return NULL;
1406
1407 cpu_buffer = iter->cpu_buffer;
1408 buffer = cpu_buffer->buffer;
1409
1410 again:
1411 if (rb_per_cpu_empty(cpu_buffer))
1412 return NULL;
1413
1414 event = rb_iter_head_event(iter);
1415
1416 switch (event->type) {
1417 case RINGBUF_TYPE_PADDING:
1418 rb_inc_page(cpu_buffer, &iter->head_page);
1419 rb_reset_iter_read_page(iter);
1420 goto again;
1421
1422 case RINGBUF_TYPE_TIME_EXTEND:
1423 /* Internal data, OK to advance */
1424 rb_advance_iter(iter);
1425 goto again;
1426
1427 case RINGBUF_TYPE_TIME_STAMP:
1428 /* FIXME: not implemented */
1429 rb_advance_iter(iter);
1430 goto again;
1431
1432 case RINGBUF_TYPE_DATA:
1433 if (ts) {
1434 *ts = iter->read_stamp + event->time_delta;
1435 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1436 }
1437 return event;
1438
1439 default:
1440 BUG();
1441 }
1442
1443 return NULL;
1444}
1445
1446/**
1447 * ring_buffer_consume - return an event and consume it
1448 * @buffer: The ring buffer to get the next event from
1449 *
1450 * Returns the next event in the ring buffer, and that event is consumed.
1451 * Meaning, that sequential reads will keep returning a different event,
1452 * and eventually empty the ring buffer if the producer is slower.
1453 */
1454struct ring_buffer_event *
1455ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1456{
1457 struct ring_buffer_per_cpu *cpu_buffer;
1458 struct ring_buffer_event *event;
1459
1460 if (!cpu_isset(cpu, buffer->cpumask))
1461 return NULL;
1462
1463 event = ring_buffer_peek(buffer, cpu, ts);
1464 if (!event)
1465 return NULL;
1466
1467 cpu_buffer = buffer->buffers[cpu];
1468 rb_advance_head(cpu_buffer);
1469
1470 return event;
1471}
1472
1473/**
1474 * ring_buffer_read_start - start a non consuming read of the buffer
1475 * @buffer: The ring buffer to read from
1476 * @cpu: The cpu buffer to iterate over
1477 *
1478 * This starts up an iteration through the buffer. It also disables
1479 * the recording to the buffer until the reading is finished.
1480 * This prevents the reading from being corrupted. This is not
1481 * a consuming read, so a producer is not expected.
1482 *
1483 * Must be paired with ring_buffer_finish.
1484 */
1485struct ring_buffer_iter *
1486ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1487{
1488 struct ring_buffer_per_cpu *cpu_buffer;
1489 struct ring_buffer_iter *iter;
1490
1491 if (!cpu_isset(cpu, buffer->cpumask))
1492 return NULL;
1493
1494 iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1495 if (!iter)
1496 return NULL;
1497
1498 cpu_buffer = buffer->buffers[cpu];
1499
1500 iter->cpu_buffer = cpu_buffer;
1501
1502 atomic_inc(&cpu_buffer->record_disabled);
1503 synchronize_sched();
1504
1505 spin_lock(&cpu_buffer->lock);
1506 iter->head = cpu_buffer->head;
1507 iter->head_page = cpu_buffer->head_page;
1508 rb_reset_iter_read_page(iter);
1509 spin_unlock(&cpu_buffer->lock);
1510
1511 return iter;
1512}
1513
1514/**
1515 * ring_buffer_finish - finish reading the iterator of the buffer
1516 * @iter: The iterator retrieved by ring_buffer_start
1517 *
1518 * This re-enables the recording to the buffer, and frees the
1519 * iterator.
1520 */
1521void
1522ring_buffer_read_finish(struct ring_buffer_iter *iter)
1523{
1524 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1525
1526 atomic_dec(&cpu_buffer->record_disabled);
1527 kfree(iter);
1528}
1529
1530/**
1531 * ring_buffer_read - read the next item in the ring buffer by the iterator
1532 * @iter: The ring buffer iterator
1533 * @ts: The time stamp of the event read.
1534 *
1535 * This reads the next event in the ring buffer and increments the iterator.
1536 */
1537struct ring_buffer_event *
1538ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
1539{
1540 struct ring_buffer_event *event;
1541
1542 event = ring_buffer_iter_peek(iter, ts);
1543 if (!event)
1544 return NULL;
1545
1546 rb_advance_iter(iter);
1547
1548 return event;
1549}
1550
1551/**
1552 * ring_buffer_size - return the size of the ring buffer (in bytes)
1553 * @buffer: The ring buffer.
1554 */
1555unsigned long ring_buffer_size(struct ring_buffer *buffer)
1556{
1557 return BUF_PAGE_SIZE * buffer->pages;
1558}
1559
1560static void
1561rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
1562{
1563 cpu_buffer->head_page
1564 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1565 cpu_buffer->tail_page
1566 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1567
1568 cpu_buffer->head = cpu_buffer->tail = 0;
1569 cpu_buffer->overrun = 0;
1570 cpu_buffer->entries = 0;
1571}
1572
1573/**
1574 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
1575 * @buffer: The ring buffer to reset a per cpu buffer of
1576 * @cpu: The CPU buffer to be reset
1577 */
1578void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
1579{
1580 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1581 unsigned long flags;
1582
1583 if (!cpu_isset(cpu, buffer->cpumask))
1584 return;
1585
70255b5e 1586 local_irq_save(flags);
7a8e76a3
SR
1587 spin_lock(&cpu_buffer->lock);
1588
1589 rb_reset_cpu(cpu_buffer);
1590
1591 spin_unlock(&cpu_buffer->lock);
70255b5e 1592 local_irq_restore(flags);
7a8e76a3
SR
1593}
1594
1595/**
1596 * ring_buffer_reset - reset a ring buffer
1597 * @buffer: The ring buffer to reset all cpu buffers
1598 */
1599void ring_buffer_reset(struct ring_buffer *buffer)
1600{
1601 unsigned long flags;
1602 int cpu;
1603
1604 ring_buffer_lock(buffer, &flags);
1605
1606 for_each_buffer_cpu(buffer, cpu)
1607 rb_reset_cpu(buffer->buffers[cpu]);
1608
1609 ring_buffer_unlock(buffer, flags);
1610}
1611
1612/**
1613 * rind_buffer_empty - is the ring buffer empty?
1614 * @buffer: The ring buffer to test
1615 */
1616int ring_buffer_empty(struct ring_buffer *buffer)
1617{
1618 struct ring_buffer_per_cpu *cpu_buffer;
1619 int cpu;
1620
1621 /* yes this is racy, but if you don't like the race, lock the buffer */
1622 for_each_buffer_cpu(buffer, cpu) {
1623 cpu_buffer = buffer->buffers[cpu];
1624 if (!rb_per_cpu_empty(cpu_buffer))
1625 return 0;
1626 }
1627 return 1;
1628}
1629
1630/**
1631 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
1632 * @buffer: The ring buffer
1633 * @cpu: The CPU buffer to test
1634 */
1635int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
1636{
1637 struct ring_buffer_per_cpu *cpu_buffer;
1638
1639 if (!cpu_isset(cpu, buffer->cpumask))
1640 return 1;
1641
1642 cpu_buffer = buffer->buffers[cpu];
1643 return rb_per_cpu_empty(cpu_buffer);
1644}
1645
1646/**
1647 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
1648 * @buffer_a: One buffer to swap with
1649 * @buffer_b: The other buffer to swap with
1650 *
1651 * This function is useful for tracers that want to take a "snapshot"
1652 * of a CPU buffer and has another back up buffer lying around.
1653 * it is expected that the tracer handles the cpu buffer not being
1654 * used at the moment.
1655 */
1656int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
1657 struct ring_buffer *buffer_b, int cpu)
1658{
1659 struct ring_buffer_per_cpu *cpu_buffer_a;
1660 struct ring_buffer_per_cpu *cpu_buffer_b;
1661
1662 if (!cpu_isset(cpu, buffer_a->cpumask) ||
1663 !cpu_isset(cpu, buffer_b->cpumask))
1664 return -EINVAL;
1665
1666 /* At least make sure the two buffers are somewhat the same */
1667 if (buffer_a->size != buffer_b->size ||
1668 buffer_a->pages != buffer_b->pages)
1669 return -EINVAL;
1670
1671 cpu_buffer_a = buffer_a->buffers[cpu];
1672 cpu_buffer_b = buffer_b->buffers[cpu];
1673
1674 /*
1675 * We can't do a synchronize_sched here because this
1676 * function can be called in atomic context.
1677 * Normally this will be called from the same CPU as cpu.
1678 * If not it's up to the caller to protect this.
1679 */
1680 atomic_inc(&cpu_buffer_a->record_disabled);
1681 atomic_inc(&cpu_buffer_b->record_disabled);
1682
1683 buffer_a->buffers[cpu] = cpu_buffer_b;
1684 buffer_b->buffers[cpu] = cpu_buffer_a;
1685
1686 cpu_buffer_b->buffer = buffer_a;
1687 cpu_buffer_a->buffer = buffer_b;
1688
1689 atomic_dec(&cpu_buffer_a->record_disabled);
1690 atomic_dec(&cpu_buffer_b->record_disabled);
1691
1692 return 0;
1693}
1694