]> git.proxmox.com Git - mirror_ubuntu-bionic-kernel.git/blobdiff - kernel/trace/ring_buffer.c
ring-buffer: Check for valid buffer before changing size
[mirror_ubuntu-bionic-kernel.git] / kernel / trace / ring_buffer.c
index cf8d11e91efdf92d95dad58d6fa771d2ac998786..1d0f6a8a0e5e83680c0df3b28836a5f6a2103a39 100644 (file)
@@ -23,6 +23,8 @@
 #include <asm/local.h>
 #include "trace.h"
 
+static void update_pages_handler(struct work_struct *work);
+
 /*
  * The ring buffer header is special. We must manually up keep it.
  */
@@ -449,6 +451,7 @@ struct ring_buffer_per_cpu {
        raw_spinlock_t                  reader_lock;    /* serialize readers */
        arch_spinlock_t                 lock;
        struct lock_class_key           lock_key;
+       unsigned int                    nr_pages;
        struct list_head                *pages;
        struct buffer_page              *head_page;     /* read from head */
        struct buffer_page              *tail_page;     /* write to tail */
@@ -466,13 +469,18 @@ struct ring_buffer_per_cpu {
        unsigned long                   read_bytes;
        u64                             write_stamp;
        u64                             read_stamp;
+       /* ring buffer pages to update, > 0 to add, < 0 to remove */
+       int                             nr_pages_to_update;
+       struct list_head                new_pages; /* new pages to add */
+       struct work_struct              update_pages_work;
+       struct completion               update_done;
 };
 
 struct ring_buffer {
-       unsigned                        pages;
        unsigned                        flags;
        int                             cpus;
        atomic_t                        record_disabled;
+       atomic_t                        resize_disabled;
        cpumask_var_t                   cpumask;
 
        struct lock_class_key           *reader_lock_key;
@@ -937,6 +945,10 @@ static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
        struct list_head *head = cpu_buffer->pages;
        struct buffer_page *bpage, *tmp;
 
+       /* Reset the head page if it exists */
+       if (cpu_buffer->head_page)
+               rb_set_head_page(cpu_buffer);
+
        rb_head_page_deactivate(cpu_buffer);
 
        if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
@@ -963,14 +975,10 @@ static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
        return 0;
 }
 
-static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
-                            unsigned nr_pages)
+static int __rb_allocate_pages(int nr_pages, struct list_head *pages, int cpu)
 {
+       int i;
        struct buffer_page *bpage, *tmp;
-       LIST_HEAD(pages);
-       unsigned i;
-
-       WARN_ON(!nr_pages);
 
        for (i = 0; i < nr_pages; i++) {
                struct page *page;
@@ -981,15 +989,13 @@ static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
                 */
                bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
                                    GFP_KERNEL | __GFP_NORETRY,
-                                   cpu_to_node(cpu_buffer->cpu));
+                                   cpu_to_node(cpu));
                if (!bpage)
                        goto free_pages;
 
-               rb_check_bpage(cpu_buffer, bpage);
+               list_add(&bpage->list, pages);
 
-               list_add(&bpage->list, &pages);
-
-               page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
+               page = alloc_pages_node(cpu_to_node(cpu),
                                        GFP_KERNEL | __GFP_NORETRY, 0);
                if (!page)
                        goto free_pages;
@@ -997,6 +1003,27 @@ static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
                rb_init_page(bpage->page);
        }
 
+       return 0;
+
+free_pages:
+       list_for_each_entry_safe(bpage, tmp, pages, list) {
+               list_del_init(&bpage->list);
+               free_buffer_page(bpage);
+       }
+
+       return -ENOMEM;
+}
+
+static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
+                            unsigned nr_pages)
+{
+       LIST_HEAD(pages);
+
+       WARN_ON(!nr_pages);
+
+       if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu))
+               return -ENOMEM;
+
        /*
         * The ring buffer page list is a circular list that does not
         * start and end with a list head. All page list items point to
@@ -1005,20 +1032,15 @@ static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
        cpu_buffer->pages = pages.next;
        list_del(&pages);
 
+       cpu_buffer->nr_pages = nr_pages;
+
        rb_check_pages(cpu_buffer);
 
        return 0;
-
- free_pages:
-       list_for_each_entry_safe(bpage, tmp, &pages, list) {
-               list_del_init(&bpage->list);
-               free_buffer_page(bpage);
-       }
-       return -ENOMEM;
 }
 
 static struct ring_buffer_per_cpu *
-rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
+rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
        struct buffer_page *bpage;
@@ -1035,6 +1057,8 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
        raw_spin_lock_init(&cpu_buffer->reader_lock);
        lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
        cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
+       INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
+       init_completion(&cpu_buffer->update_done);
 
        bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
                            GFP_KERNEL, cpu_to_node(cpu));
@@ -1052,7 +1076,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
 
        INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
 
-       ret = rb_allocate_pages(cpu_buffer, buffer->pages);
+       ret = rb_allocate_pages(cpu_buffer, nr_pages);
        if (ret < 0)
                goto fail_free_reader;
 
@@ -1113,7 +1137,7 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
 {
        struct ring_buffer *buffer;
        int bsize;
-       int cpu;
+       int cpu, nr_pages;
 
        /* keep it in its own cache line */
        buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
@@ -1124,14 +1148,14 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
        if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
                goto fail_free_buffer;
 
-       buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
+       nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
        buffer->flags = flags;
        buffer->clock = trace_clock_local;
        buffer->reader_lock_key = key;
 
        /* need at least two pages */
-       if (buffer->pages < 2)
-               buffer->pages = 2;
+       if (nr_pages < 2)
+               nr_pages = 2;
 
        /*
         * In case of non-hotplug cpu, if the ring-buffer is allocated
@@ -1154,7 +1178,7 @@ struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
 
        for_each_buffer_cpu(buffer, cpu) {
                buffer->buffers[cpu] =
-                       rb_allocate_cpu_buffer(buffer, cpu);
+                       rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
                if (!buffer->buffers[cpu])
                        goto fail_free_buffers;
        }
@@ -1222,58 +1246,222 @@ void ring_buffer_set_clock(struct ring_buffer *buffer,
 
 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
 
-static void
-rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
+static inline unsigned long rb_page_entries(struct buffer_page *bpage)
 {
-       struct buffer_page *bpage;
-       struct list_head *p;
-       unsigned i;
+       return local_read(&bpage->entries) & RB_WRITE_MASK;
+}
+
+static inline unsigned long rb_page_write(struct buffer_page *bpage)
+{
+       return local_read(&bpage->write) & RB_WRITE_MASK;
+}
+
+static int
+rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned int nr_pages)
+{
+       struct list_head *tail_page, *to_remove, *next_page;
+       struct buffer_page *to_remove_page, *tmp_iter_page;
+       struct buffer_page *last_page, *first_page;
+       unsigned int nr_removed;
+       unsigned long head_bit;
+       int page_entries;
+
+       head_bit = 0;
 
        raw_spin_lock_irq(&cpu_buffer->reader_lock);
-       rb_head_page_deactivate(cpu_buffer);
+       atomic_inc(&cpu_buffer->record_disabled);
+       /*
+        * We don't race with the readers since we have acquired the reader
+        * lock. We also don't race with writers after disabling recording.
+        * This makes it easy to figure out the first and the last page to be
+        * removed from the list. We unlink all the pages in between including
+        * the first and last pages. This is done in a busy loop so that we
+        * lose the least number of traces.
+        * The pages are freed after we restart recording and unlock readers.
+        */
+       tail_page = &cpu_buffer->tail_page->list;
 
-       for (i = 0; i < nr_pages; i++) {
-               if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
-                       goto out;
-               p = cpu_buffer->pages->next;
-               bpage = list_entry(p, struct buffer_page, list);
-               list_del_init(&bpage->list);
-               free_buffer_page(bpage);
+       /*
+        * tail page might be on reader page, we remove the next page
+        * from the ring buffer
+        */
+       if (cpu_buffer->tail_page == cpu_buffer->reader_page)
+               tail_page = rb_list_head(tail_page->next);
+       to_remove = tail_page;
+
+       /* start of pages to remove */
+       first_page = list_entry(rb_list_head(to_remove->next),
+                               struct buffer_page, list);
+
+       for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
+               to_remove = rb_list_head(to_remove)->next;
+               head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
        }
-       if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
-               goto out;
 
-       rb_reset_cpu(cpu_buffer);
-       rb_check_pages(cpu_buffer);
+       next_page = rb_list_head(to_remove)->next;
 
-out:
+       /*
+        * Now we remove all pages between tail_page and next_page.
+        * Make sure that we have head_bit value preserved for the
+        * next page
+        */
+       tail_page->next = (struct list_head *)((unsigned long)next_page |
+                                               head_bit);
+       next_page = rb_list_head(next_page);
+       next_page->prev = tail_page;
+
+       /* make sure pages points to a valid page in the ring buffer */
+       cpu_buffer->pages = next_page;
+
+       /* update head page */
+       if (head_bit)
+               cpu_buffer->head_page = list_entry(next_page,
+                                               struct buffer_page, list);
+
+       /*
+        * change read pointer to make sure any read iterators reset
+        * themselves
+        */
+       cpu_buffer->read = 0;
+
+       /* pages are removed, resume tracing and then free the pages */
+       atomic_dec(&cpu_buffer->record_disabled);
        raw_spin_unlock_irq(&cpu_buffer->reader_lock);
+
+       RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
+
+       /* last buffer page to remove */
+       last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
+                               list);
+       tmp_iter_page = first_page;
+
+       do {
+               to_remove_page = tmp_iter_page;
+               rb_inc_page(cpu_buffer, &tmp_iter_page);
+
+               /* update the counters */
+               page_entries = rb_page_entries(to_remove_page);
+               if (page_entries) {
+                       /*
+                        * If something was added to this page, it was full
+                        * since it is not the tail page. So we deduct the
+                        * bytes consumed in ring buffer from here.
+                        * No need to update overruns, since this page is
+                        * deleted from ring buffer and its entries are
+                        * already accounted for.
+                        */
+                       local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
+               }
+
+               /*
+                * We have already removed references to this list item, just
+                * free up the buffer_page and its page
+                */
+               free_buffer_page(to_remove_page);
+               nr_removed--;
+
+       } while (to_remove_page != last_page);
+
+       RB_WARN_ON(cpu_buffer, nr_removed);
+
+       return nr_removed == 0;
 }
 
-static void
-rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
-               struct list_head *pages, unsigned nr_pages)
+static int
+rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
 {
-       struct buffer_page *bpage;
-       struct list_head *p;
-       unsigned i;
+       struct list_head *pages = &cpu_buffer->new_pages;
+       int retries, success;
 
        raw_spin_lock_irq(&cpu_buffer->reader_lock);
-       rb_head_page_deactivate(cpu_buffer);
+       /*
+        * We are holding the reader lock, so the reader page won't be swapped
+        * in the ring buffer. Now we are racing with the writer trying to
+        * move head page and the tail page.
+        * We are going to adapt the reader page update process where:
+        * 1. We first splice the start and end of list of new pages between
+        *    the head page and its previous page.
+        * 2. We cmpxchg the prev_page->next to point from head page to the
+        *    start of new pages list.
+        * 3. Finally, we update the head->prev to the end of new list.
+        *
+        * We will try this process 10 times, to make sure that we don't keep
+        * spinning.
+        */
+       retries = 10;
+       success = 0;
+       while (retries--) {
+               struct list_head *head_page, *prev_page, *r;
+               struct list_head *last_page, *first_page;
+               struct list_head *head_page_with_bit;
 
-       for (i = 0; i < nr_pages; i++) {
-               if (RB_WARN_ON(cpu_buffer, list_empty(pages)))
-                       goto out;
-               p = pages->next;
-               bpage = list_entry(p, struct buffer_page, list);
-               list_del_init(&bpage->list);
-               list_add_tail(&bpage->list, cpu_buffer->pages);
+               head_page = &rb_set_head_page(cpu_buffer)->list;
+               prev_page = head_page->prev;
+
+               first_page = pages->next;
+               last_page  = pages->prev;
+
+               head_page_with_bit = (struct list_head *)
+                                    ((unsigned long)head_page | RB_PAGE_HEAD);
+
+               last_page->next = head_page_with_bit;
+               first_page->prev = prev_page;
+
+               r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
+
+               if (r == head_page_with_bit) {
+                       /*
+                        * yay, we replaced the page pointer to our new list,
+                        * now, we just have to update to head page's prev
+                        * pointer to point to end of list
+                        */
+                       head_page->prev = last_page;
+                       success = 1;
+                       break;
+               }
        }
-       rb_reset_cpu(cpu_buffer);
-       rb_check_pages(cpu_buffer);
 
-out:
+       if (success)
+               INIT_LIST_HEAD(pages);
+       /*
+        * If we weren't successful in adding in new pages, warn and stop
+        * tracing
+        */
+       RB_WARN_ON(cpu_buffer, !success);
        raw_spin_unlock_irq(&cpu_buffer->reader_lock);
+
+       /* free pages if they weren't inserted */
+       if (!success) {
+               struct buffer_page *bpage, *tmp;
+               list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
+                                        list) {
+                       list_del_init(&bpage->list);
+                       free_buffer_page(bpage);
+               }
+       }
+       return success;
+}
+
+static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
+{
+       int success;
+
+       if (cpu_buffer->nr_pages_to_update > 0)
+               success = rb_insert_pages(cpu_buffer);
+       else
+               success = rb_remove_pages(cpu_buffer,
+                                       -cpu_buffer->nr_pages_to_update);
+
+       if (success)
+               cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
+}
+
+static void update_pages_handler(struct work_struct *work)
+{
+       struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
+                       struct ring_buffer_per_cpu, update_pages_work);
+       rb_update_pages(cpu_buffer);
+       complete(&cpu_buffer->update_done);
 }
 
 /**
@@ -1283,16 +1471,14 @@ out:
  *
  * Minimum size is 2 * BUF_PAGE_SIZE.
  *
- * Returns -1 on failure.
+ * Returns 0 on success and < 0 on failure.
  */
-int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
+int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
+                       int cpu_id)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
-       unsigned nr_pages, rm_pages, new_pages;
-       struct buffer_page *bpage, *tmp;
-       unsigned long buffer_size;
-       LIST_HEAD(pages);
-       int i, cpu;
+       unsigned nr_pages;
+       int cpu, err = 0;
 
        /*
         * Always succeed at resizing a non-existent buffer:
@@ -1300,115 +1486,161 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
        if (!buffer)
                return size;
 
+       /* Make sure the requested buffer exists */
+       if (cpu_id != RING_BUFFER_ALL_CPUS &&
+           !cpumask_test_cpu(cpu_id, buffer->cpumask))
+               return size;
+
        size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
        size *= BUF_PAGE_SIZE;
-       buffer_size = buffer->pages * BUF_PAGE_SIZE;
 
        /* we need a minimum of two pages */
        if (size < BUF_PAGE_SIZE * 2)
                size = BUF_PAGE_SIZE * 2;
 
-       if (size == buffer_size)
-               return size;
-
-       atomic_inc(&buffer->record_disabled);
+       nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
 
-       /* Make sure all writers are done with this buffer. */
-       synchronize_sched();
+       /*
+        * Don't succeed if resizing is disabled, as a reader might be
+        * manipulating the ring buffer and is expecting a sane state while
+        * this is true.
+        */
+       if (atomic_read(&buffer->resize_disabled))
+               return -EBUSY;
 
+       /* prevent another thread from changing buffer sizes */
        mutex_lock(&buffer->mutex);
-       get_online_cpus();
-
-       nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
 
-       if (size < buffer_size) {
+       if (cpu_id == RING_BUFFER_ALL_CPUS) {
+               /* calculate the pages to update */
+               for_each_buffer_cpu(buffer, cpu) {
+                       cpu_buffer = buffer->buffers[cpu];
 
-               /* easy case, just free pages */
-               if (RB_WARN_ON(buffer, nr_pages >= buffer->pages))
-                       goto out_fail;
+                       cpu_buffer->nr_pages_to_update = nr_pages -
+                                                       cpu_buffer->nr_pages;
+                       /*
+                        * nothing more to do for removing pages or no update
+                        */
+                       if (cpu_buffer->nr_pages_to_update <= 0)
+                               continue;
+                       /*
+                        * to add pages, make sure all new pages can be
+                        * allocated without receiving ENOMEM
+                        */
+                       INIT_LIST_HEAD(&cpu_buffer->new_pages);
+                       if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
+                                               &cpu_buffer->new_pages, cpu)) {
+                               /* not enough memory for new pages */
+                               err = -ENOMEM;
+                               goto out_err;
+                       }
+               }
 
-               rm_pages = buffer->pages - nr_pages;
+               get_online_cpus();
+               /*
+                * Fire off all the required work handlers
+                * We can't schedule on offline CPUs, but it's not necessary
+                * since we can change their buffer sizes without any race.
+                */
+               for_each_buffer_cpu(buffer, cpu) {
+                       cpu_buffer = buffer->buffers[cpu];
+                       if (!cpu_buffer->nr_pages_to_update)
+                               continue;
+
+                       if (cpu_online(cpu))
+                               schedule_work_on(cpu,
+                                               &cpu_buffer->update_pages_work);
+                       else
+                               rb_update_pages(cpu_buffer);
+               }
 
+               /* wait for all the updates to complete */
                for_each_buffer_cpu(buffer, cpu) {
                        cpu_buffer = buffer->buffers[cpu];
-                       rb_remove_pages(cpu_buffer, rm_pages);
+                       if (!cpu_buffer->nr_pages_to_update)
+                               continue;
+
+                       if (cpu_online(cpu))
+                               wait_for_completion(&cpu_buffer->update_done);
+                       cpu_buffer->nr_pages_to_update = 0;
                }
-               goto out;
-       }
 
-       /*
-        * This is a bit more difficult. We only want to add pages
-        * when we can allocate enough for all CPUs. We do this
-        * by allocating all the pages and storing them on a local
-        * link list. If we succeed in our allocation, then we
-        * add these pages to the cpu_buffers. Otherwise we just free
-        * them all and return -ENOMEM;
-        */
-       if (RB_WARN_ON(buffer, nr_pages <= buffer->pages))
-               goto out_fail;
+               put_online_cpus();
+       } else {
+               cpu_buffer = buffer->buffers[cpu_id];
 
-       new_pages = nr_pages - buffer->pages;
+               if (nr_pages == cpu_buffer->nr_pages)
+                       goto out;
 
-       for_each_buffer_cpu(buffer, cpu) {
-               for (i = 0; i < new_pages; i++) {
-                       struct page *page;
-                       /*
-                        * __GFP_NORETRY flag makes sure that the allocation
-                        * fails gracefully without invoking oom-killer and
-                        * the system is not destabilized.
-                        */
-                       bpage = kzalloc_node(ALIGN(sizeof(*bpage),
-                                                 cache_line_size()),
-                                           GFP_KERNEL | __GFP_NORETRY,
-                                           cpu_to_node(cpu));
-                       if (!bpage)
-                               goto free_pages;
-                       list_add(&bpage->list, &pages);
-                       page = alloc_pages_node(cpu_to_node(cpu),
-                                               GFP_KERNEL | __GFP_NORETRY, 0);
-                       if (!page)
-                               goto free_pages;
-                       bpage->page = page_address(page);
-                       rb_init_page(bpage->page);
+               cpu_buffer->nr_pages_to_update = nr_pages -
+                                               cpu_buffer->nr_pages;
+
+               INIT_LIST_HEAD(&cpu_buffer->new_pages);
+               if (cpu_buffer->nr_pages_to_update > 0 &&
+                       __rb_allocate_pages(cpu_buffer->nr_pages_to_update,
+                                           &cpu_buffer->new_pages, cpu_id)) {
+                       err = -ENOMEM;
+                       goto out_err;
                }
-       }
 
-       for_each_buffer_cpu(buffer, cpu) {
-               cpu_buffer = buffer->buffers[cpu];
-               rb_insert_pages(cpu_buffer, &pages, new_pages);
-       }
+               get_online_cpus();
 
-       if (RB_WARN_ON(buffer, !list_empty(&pages)))
-               goto out_fail;
+               if (cpu_online(cpu_id)) {
+                       schedule_work_on(cpu_id,
+                                        &cpu_buffer->update_pages_work);
+                       wait_for_completion(&cpu_buffer->update_done);
+               } else
+                       rb_update_pages(cpu_buffer);
+
+               cpu_buffer->nr_pages_to_update = 0;
+               put_online_cpus();
+       }
 
  out:
-       buffer->pages = nr_pages;
-       put_online_cpus();
+       /*
+        * The ring buffer resize can happen with the ring buffer
+        * enabled, so that the update disturbs the tracing as little
+        * as possible. But if the buffer is disabled, we do not need
+        * to worry about that, and we can take the time to verify
+        * that the buffer is not corrupt.
+        */
+       if (atomic_read(&buffer->record_disabled)) {
+               atomic_inc(&buffer->record_disabled);
+               /*
+                * Even though the buffer was disabled, we must make sure
+                * that it is truly disabled before calling rb_check_pages.
+                * There could have been a race between checking
+                * record_disable and incrementing it.
+                */
+               synchronize_sched();
+               for_each_buffer_cpu(buffer, cpu) {
+                       cpu_buffer = buffer->buffers[cpu];
+                       rb_check_pages(cpu_buffer);
+               }
+               atomic_dec(&buffer->record_disabled);
+       }
+
        mutex_unlock(&buffer->mutex);
+       return size;
 
-       atomic_dec(&buffer->record_disabled);
+ out_err:
+       for_each_buffer_cpu(buffer, cpu) {
+               struct buffer_page *bpage, *tmp;
 
-       return size;
+               cpu_buffer = buffer->buffers[cpu];
+               cpu_buffer->nr_pages_to_update = 0;
 
- free_pages:
-       list_for_each_entry_safe(bpage, tmp, &pages, list) {
-               list_del_init(&bpage->list);
-               free_buffer_page(bpage);
-       }
-       put_online_cpus();
-       mutex_unlock(&buffer->mutex);
-       atomic_dec(&buffer->record_disabled);
-       return -ENOMEM;
+               if (list_empty(&cpu_buffer->new_pages))
+                       continue;
 
-       /*
-        * Something went totally wrong, and we are too paranoid
-        * to even clean up the mess.
-        */
- out_fail:
-       put_online_cpus();
+               list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
+                                       list) {
+                       list_del_init(&bpage->list);
+                       free_buffer_page(bpage);
+               }
+       }
        mutex_unlock(&buffer->mutex);
-       atomic_dec(&buffer->record_disabled);
-       return -1;
+       return err;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_resize);
 
@@ -1447,21 +1679,11 @@ rb_iter_head_event(struct ring_buffer_iter *iter)
        return __rb_page_index(iter->head_page, iter->head);
 }
 
-static inline unsigned long rb_page_write(struct buffer_page *bpage)
-{
-       return local_read(&bpage->write) & RB_WRITE_MASK;
-}
-
 static inline unsigned rb_page_commit(struct buffer_page *bpage)
 {
        return local_read(&bpage->page->commit);
 }
 
-static inline unsigned long rb_page_entries(struct buffer_page *bpage)
-{
-       return local_read(&bpage->entries) & RB_WRITE_MASK;
-}
-
 /* Size is determined by what has been committed */
 static inline unsigned rb_page_size(struct buffer_page *bpage)
 {
@@ -1510,7 +1732,7 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
         * assign the commit to the tail.
         */
  again:
-       max_count = cpu_buffer->buffer->pages * 100;
+       max_count = cpu_buffer->nr_pages * 100;
 
        while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
                if (RB_WARN_ON(cpu_buffer, !(--max_count)))
@@ -3486,6 +3708,7 @@ ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu)
 
        iter->cpu_buffer = cpu_buffer;
 
+       atomic_inc(&buffer->resize_disabled);
        atomic_inc(&cpu_buffer->record_disabled);
 
        return iter;
@@ -3548,7 +3771,14 @@ ring_buffer_read_finish(struct ring_buffer_iter *iter)
 {
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
 
+       /*
+        * Ring buffer is disabled from recording, here's a good place
+        * to check the integrity of the ring buffer. 
+        */
+       rb_check_pages(cpu_buffer);
+
        atomic_dec(&cpu_buffer->record_disabled);
+       atomic_dec(&cpu_buffer->buffer->resize_disabled);
        kfree(iter);
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
@@ -3588,9 +3818,18 @@ EXPORT_SYMBOL_GPL(ring_buffer_read);
  * ring_buffer_size - return the size of the ring buffer (in bytes)
  * @buffer: The ring buffer.
  */
-unsigned long ring_buffer_size(struct ring_buffer *buffer)
+unsigned long ring_buffer_size(struct ring_buffer *buffer, int cpu)
 {
-       return BUF_PAGE_SIZE * buffer->pages;
+       /*
+        * Earlier, this method returned
+        *      BUF_PAGE_SIZE * buffer->nr_pages
+        * Since the nr_pages field is now removed, we have converted this to
+        * return the per cpu buffer value.
+        */
+       if (!cpumask_test_cpu(cpu, buffer->cpumask))
+               return 0;
+
+       return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_size);
 
@@ -3611,6 +3850,7 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
        cpu_buffer->commit_page = cpu_buffer->head_page;
 
        INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
+       INIT_LIST_HEAD(&cpu_buffer->new_pages);
        local_set(&cpu_buffer->reader_page->write, 0);
        local_set(&cpu_buffer->reader_page->entries, 0);
        local_set(&cpu_buffer->reader_page->page->commit, 0);
@@ -3647,8 +3887,12 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return;
 
+       atomic_inc(&buffer->resize_disabled);
        atomic_inc(&cpu_buffer->record_disabled);
 
+       /* Make sure all commits have finished */
+       synchronize_sched();
+
        raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 
        if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
@@ -3664,6 +3908,7 @@ void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
        raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
        atomic_dec(&cpu_buffer->record_disabled);
+       atomic_dec(&buffer->resize_disabled);
 }
 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
 
@@ -3765,8 +4010,11 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
            !cpumask_test_cpu(cpu, buffer_b->cpumask))
                goto out;
 
+       cpu_buffer_a = buffer_a->buffers[cpu];
+       cpu_buffer_b = buffer_b->buffers[cpu];
+
        /* At least make sure the two buffers are somewhat the same */
-       if (buffer_a->pages != buffer_b->pages)
+       if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
                goto out;
 
        ret = -EAGAIN;
@@ -3780,9 +4028,6 @@ int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
        if (atomic_read(&buffer_b->record_disabled))
                goto out;
 
-       cpu_buffer_a = buffer_a->buffers[cpu];
-       cpu_buffer_b = buffer_b->buffers[cpu];
-
        if (atomic_read(&cpu_buffer_a->record_disabled))
                goto out;
 
@@ -4071,6 +4316,8 @@ static int rb_cpu_notify(struct notifier_block *self,
        struct ring_buffer *buffer =
                container_of(self, struct ring_buffer, cpu_notify);
        long cpu = (long)hcpu;
+       int cpu_i, nr_pages_same;
+       unsigned int nr_pages;
 
        switch (action) {
        case CPU_UP_PREPARE:
@@ -4078,8 +4325,23 @@ static int rb_cpu_notify(struct notifier_block *self,
                if (cpumask_test_cpu(cpu, buffer->cpumask))
                        return NOTIFY_OK;
 
+               nr_pages = 0;
+               nr_pages_same = 1;
+               /* check if all cpu sizes are same */
+               for_each_buffer_cpu(buffer, cpu_i) {
+                       /* fill in the size from first enabled cpu */
+                       if (nr_pages == 0)
+                               nr_pages = buffer->buffers[cpu_i]->nr_pages;
+                       if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
+                               nr_pages_same = 0;
+                               break;
+                       }
+               }
+               /* allocate minimum pages, user can later expand it */
+               if (!nr_pages_same)
+                       nr_pages = 2;
                buffer->buffers[cpu] =
-                       rb_allocate_cpu_buffer(buffer, cpu);
+                       rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
                if (!buffer->buffers[cpu]) {
                        WARN(1, "failed to allocate ring buffer on CPU %ld\n",
                             cpu);