// SPDX-License-Identifier: GPL-2.0
/* Watch queue and general notification mechanism, built on pipes
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * See Documentation/watch_queue.rst
 */
10 #define pr_fmt(fmt) "watchq: " fmt
11 #include <linux/module.h>
12 #include <linux/init.h>
13 #include <linux/sched.h>
14 #include <linux/slab.h>
15 #include <linux/printk.h>
16 #include <linux/miscdevice.h>
19 #include <linux/pagemap.h>
20 #include <linux/poll.h>
21 #include <linux/uaccess.h>
22 #include <linux/vmalloc.h>
23 #include <linux/file.h>
24 #include <linux/security.h>
25 #include <linux/cred.h>
26 #include <linux/sched/signal.h>
27 #include <linux/watch_queue.h>
28 #include <linux/pipe_fs_i.h>
30 MODULE_DESCRIPTION("Watch queue");
31 MODULE_AUTHOR("Red Hat, Inc.");
32 MODULE_LICENSE("GPL");
34 #define WATCH_QUEUE_NOTE_SIZE 128
35 #define WATCH_QUEUE_NOTES_PER_PAGE (PAGE_SIZE / WATCH_QUEUE_NOTE_SIZE)
37 static void watch_queue_pipe_buf_release(struct pipe_inode_info
*pipe
,
38 struct pipe_buffer
*buf
)
40 struct watch_queue
*wqueue
= (struct watch_queue
*)buf
->private;
44 /* We need to work out which note within the page this refers to, but
45 * the note might have been maximum size, so merely ANDing the offset
46 * off doesn't work. OTOH, the note must've been more than zero size.
48 bit
= buf
->offset
+ buf
->len
;
49 if ((bit
& (WATCH_QUEUE_NOTE_SIZE
- 1)) == 0)
50 bit
-= WATCH_QUEUE_NOTE_SIZE
;
51 bit
/= WATCH_QUEUE_NOTE_SIZE
;
56 set_bit(bit
, wqueue
->notes_bitmap
);
57 generic_pipe_buf_release(pipe
, buf
);
60 // No try_steal function => no stealing
61 #define watch_queue_pipe_buf_try_steal NULL
63 /* New data written to a pipe may be appended to a buffer with this type. */
64 static const struct pipe_buf_operations watch_queue_pipe_buf_ops
= {
65 .release
= watch_queue_pipe_buf_release
,
66 .try_steal
= watch_queue_pipe_buf_try_steal
,
67 .get
= generic_pipe_buf_get
,
71 * Post a notification to a watch queue.
73 static bool post_one_notification(struct watch_queue
*wqueue
,
74 struct watch_notification
*n
)
77 struct pipe_inode_info
*pipe
= wqueue
->pipe
;
78 struct pipe_buffer
*buf
;
80 unsigned int head
, tail
, mask
, note
, offset
, len
;
86 spin_lock_irq(&pipe
->rd_wait
.lock
);
91 mask
= pipe
->ring_size
- 1;
94 if (pipe_full(head
, tail
, pipe
->ring_size
))
97 note
= find_first_bit(wqueue
->notes_bitmap
, wqueue
->nr_notes
);
98 if (note
>= wqueue
->nr_notes
)
101 page
= wqueue
->notes
[note
/ WATCH_QUEUE_NOTES_PER_PAGE
];
102 offset
= note
% WATCH_QUEUE_NOTES_PER_PAGE
* WATCH_QUEUE_NOTE_SIZE
;
104 len
= n
->info
& WATCH_INFO_LENGTH
;
105 p
= kmap_atomic(page
);
106 memcpy(p
+ offset
, n
, len
);
109 buf
= &pipe
->bufs
[head
& mask
];
111 buf
->private = (unsigned long)wqueue
;
112 buf
->ops
= &watch_queue_pipe_buf_ops
;
113 buf
->offset
= offset
;
115 buf
->flags
= PIPE_BUF_FLAG_WHOLE
;
116 smp_store_release(&pipe
->head
, head
+ 1); /* vs pipe_read() */
118 if (!test_and_clear_bit(note
, wqueue
->notes_bitmap
)) {
119 spin_unlock_irq(&pipe
->rd_wait
.lock
);
122 wake_up_interruptible_sync_poll_locked(&pipe
->rd_wait
, EPOLLIN
| EPOLLRDNORM
);
126 spin_unlock_irq(&pipe
->rd_wait
.lock
);
128 kill_fasync(&pipe
->fasync_readers
, SIGIO
, POLL_IN
);
132 buf
= &pipe
->bufs
[(head
- 1) & mask
];
133 buf
->flags
|= PIPE_BUF_FLAG_LOSS
;
138 * Apply filter rules to a notification.
140 static bool filter_watch_notification(const struct watch_filter
*wf
,
141 const struct watch_notification
*n
)
143 const struct watch_type_filter
*wt
;
144 unsigned int st_bits
= sizeof(wt
->subtype_filter
[0]) * 8;
145 unsigned int st_index
= n
->subtype
/ st_bits
;
146 unsigned int st_bit
= 1U << (n
->subtype
% st_bits
);
149 if (!test_bit(n
->type
, wf
->type_filter
))
152 for (i
= 0; i
< wf
->nr_filters
; i
++) {
153 wt
= &wf
->filters
[i
];
154 if (n
->type
== wt
->type
&&
155 (wt
->subtype_filter
[st_index
] & st_bit
) &&
156 (n
->info
& wt
->info_mask
) == wt
->info_filter
)
160 return false; /* If there is a filter, the default is to reject. */
164 * __post_watch_notification - Post an event notification
165 * @wlist: The watch list to post the event to.
166 * @n: The notification record to post.
167 * @cred: The creds of the process that triggered the notification.
168 * @id: The ID to match on the watch.
170 * Post a notification of an event into a set of watch queues and let the users
173 * The size of the notification should be set in n->info & WATCH_INFO_LENGTH and
174 * should be in units of sizeof(*n).
176 void __post_watch_notification(struct watch_list
*wlist
,
177 struct watch_notification
*n
,
178 const struct cred
*cred
,
181 const struct watch_filter
*wf
;
182 struct watch_queue
*wqueue
;
185 if (((n
->info
& WATCH_INFO_LENGTH
) >> WATCH_INFO_LENGTH__SHIFT
) == 0) {
192 hlist_for_each_entry_rcu(watch
, &wlist
->watchers
, list_node
) {
195 n
->info
&= ~WATCH_INFO_ID
;
196 n
->info
|= watch
->info_id
;
198 wqueue
= rcu_dereference(watch
->queue
);
199 wf
= rcu_dereference(wqueue
->filter
);
200 if (wf
&& !filter_watch_notification(wf
, n
))
203 if (security_post_notification(watch
->cred
, cred
, n
) < 0)
206 post_one_notification(wqueue
, n
);
211 EXPORT_SYMBOL(__post_watch_notification
);
214 * Allocate sufficient pages to preallocation for the requested number of
217 long watch_queue_set_size(struct pipe_inode_info
*pipe
, unsigned int nr_notes
)
219 struct watch_queue
*wqueue
= pipe
->watch_queue
;
221 unsigned long *bitmap
;
222 unsigned long user_bufs
;
224 int ret
, i
, nr_pages
;
232 nr_notes
> 512) /* TODO: choose a better hard limit */
235 nr_pages
= (nr_notes
+ WATCH_QUEUE_NOTES_PER_PAGE
- 1);
236 nr_pages
/= WATCH_QUEUE_NOTES_PER_PAGE
;
237 user_bufs
= account_pipe_buffers(pipe
->user
, pipe
->nr_accounted
, nr_pages
);
239 if (nr_pages
> pipe
->max_usage
&&
240 (too_many_pipe_buffers_hard(user_bufs
) ||
241 too_many_pipe_buffers_soft(user_bufs
)) &&
242 pipe_is_unprivileged_user()) {
247 nr_notes
= nr_pages
* WATCH_QUEUE_NOTES_PER_PAGE
;
248 ret
= pipe_resize_ring(pipe
, roundup_pow_of_two(nr_notes
));
252 pages
= kcalloc(sizeof(struct page
*), nr_pages
, GFP_KERNEL
);
256 for (i
= 0; i
< nr_pages
; i
++) {
257 pages
[i
] = alloc_page(GFP_KERNEL
);
260 pages
[i
]->index
= i
* WATCH_QUEUE_NOTES_PER_PAGE
;
263 bmsize
= (nr_notes
+ BITS_PER_LONG
- 1) / BITS_PER_LONG
;
264 bmsize
*= sizeof(unsigned long);
265 bitmap
= kmalloc(bmsize
, GFP_KERNEL
);
269 memset(bitmap
, 0xff, bmsize
);
270 wqueue
->notes
= pages
;
271 wqueue
->notes_bitmap
= bitmap
;
272 wqueue
->nr_pages
= nr_pages
;
273 wqueue
->nr_notes
= nr_notes
;
277 for (i
= 0; i
< nr_pages
; i
++)
278 __free_page(pages
[i
]);
281 (void) account_pipe_buffers(pipe
->user
, nr_pages
, pipe
->nr_accounted
);
286 * Set the filter on a watch queue.
288 long watch_queue_set_filter(struct pipe_inode_info
*pipe
,
289 struct watch_notification_filter __user
*_filter
)
291 struct watch_notification_type_filter
*tf
;
292 struct watch_notification_filter filter
;
293 struct watch_type_filter
*q
;
294 struct watch_filter
*wfilter
;
295 struct watch_queue
*wqueue
= pipe
->watch_queue
;
296 int ret
, nr_filter
= 0, i
;
302 /* Remove the old filter */
307 /* Grab the user's filter specification */
308 if (copy_from_user(&filter
, _filter
, sizeof(filter
)) != 0)
310 if (filter
.nr_filters
== 0 ||
311 filter
.nr_filters
> 16 ||
312 filter
.__reserved
!= 0)
315 tf
= memdup_user(_filter
->filters
, filter
.nr_filters
* sizeof(*tf
));
320 for (i
= 0; i
< filter
.nr_filters
; i
++) {
321 if ((tf
[i
].info_filter
& ~tf
[i
].info_mask
) ||
322 tf
[i
].info_mask
& WATCH_INFO_LENGTH
)
324 /* Ignore any unknown types */
325 if (tf
[i
].type
>= WATCH_TYPE__NR
)
330 /* Now we need to build the internal filter from only the relevant
331 * user-specified filters.
334 wfilter
= kzalloc(struct_size(wfilter
, filters
, nr_filter
), GFP_KERNEL
);
337 wfilter
->nr_filters
= nr_filter
;
339 q
= wfilter
->filters
;
340 for (i
= 0; i
< filter
.nr_filters
; i
++) {
341 if (tf
[i
].type
>= WATCH_TYPE__NR
)
344 q
->type
= tf
[i
].type
;
345 q
->info_filter
= tf
[i
].info_filter
;
346 q
->info_mask
= tf
[i
].info_mask
;
347 q
->subtype_filter
[0] = tf
[i
].subtype_filter
[0];
348 __set_bit(q
->type
, wfilter
->type_filter
);
355 wfilter
= rcu_replace_pointer(wqueue
->filter
, wfilter
,
356 lockdep_is_held(&pipe
->mutex
));
359 kfree_rcu(wfilter
, rcu
);
367 static void __put_watch_queue(struct kref
*kref
)
369 struct watch_queue
*wqueue
=
370 container_of(kref
, struct watch_queue
, usage
);
371 struct watch_filter
*wfilter
;
374 for (i
= 0; i
< wqueue
->nr_pages
; i
++)
375 __free_page(wqueue
->notes
[i
]);
376 bitmap_free(wqueue
->notes_bitmap
);
378 wfilter
= rcu_access_pointer(wqueue
->filter
);
380 kfree_rcu(wfilter
, rcu
);
381 kfree_rcu(wqueue
, rcu
);
385 * put_watch_queue - Dispose of a ref on a watchqueue.
386 * @wqueue: The watch queue to unref.
388 void put_watch_queue(struct watch_queue
*wqueue
)
390 kref_put(&wqueue
->usage
, __put_watch_queue
);
392 EXPORT_SYMBOL(put_watch_queue
);
394 static void free_watch(struct rcu_head
*rcu
)
396 struct watch
*watch
= container_of(rcu
, struct watch
, rcu
);
398 put_watch_queue(rcu_access_pointer(watch
->queue
));
399 atomic_dec(&watch
->cred
->user
->nr_watches
);
400 put_cred(watch
->cred
);
403 static void __put_watch(struct kref
*kref
)
405 struct watch
*watch
= container_of(kref
, struct watch
, usage
);
407 call_rcu(&watch
->rcu
, free_watch
);
413 static void put_watch(struct watch
*watch
)
415 kref_put(&watch
->usage
, __put_watch
);
419 * init_watch - Initialise a watch
420 * @watch: The watch to initialise.
421 * @wqueue: The queue to assign.
423 * Initialise a watch and set the watch queue.
425 void init_watch(struct watch
*watch
, struct watch_queue
*wqueue
)
427 kref_init(&watch
->usage
);
428 INIT_HLIST_NODE(&watch
->list_node
);
429 INIT_HLIST_NODE(&watch
->queue_node
);
430 rcu_assign_pointer(watch
->queue
, wqueue
);
434 * add_watch_to_object - Add a watch on an object to a watch list
435 * @watch: The watch to add
436 * @wlist: The watch list to add to
438 * @watch->queue must have been set to point to the queue to post notifications
439 * to and the watch list of the object to be watched. @watch->cred must also
440 * have been set to the appropriate credentials and a ref taken on them.
442 * The caller must pin the queue and the list both and must hold the list
443 * locked against racing watch additions/removals.
445 int add_watch_to_object(struct watch
*watch
, struct watch_list
*wlist
)
447 struct watch_queue
*wqueue
= rcu_access_pointer(watch
->queue
);
450 hlist_for_each_entry(w
, &wlist
->watchers
, list_node
) {
451 struct watch_queue
*wq
= rcu_access_pointer(w
->queue
);
452 if (wqueue
== wq
&& watch
->id
== w
->id
)
456 watch
->cred
= get_current_cred();
457 rcu_assign_pointer(watch
->watch_list
, wlist
);
459 if (atomic_inc_return(&watch
->cred
->user
->nr_watches
) >
460 task_rlimit(current
, RLIMIT_NOFILE
)) {
461 atomic_dec(&watch
->cred
->user
->nr_watches
);
462 put_cred(watch
->cred
);
466 spin_lock_bh(&wqueue
->lock
);
467 kref_get(&wqueue
->usage
);
468 kref_get(&watch
->usage
);
469 hlist_add_head(&watch
->queue_node
, &wqueue
->watches
);
470 spin_unlock_bh(&wqueue
->lock
);
472 hlist_add_head(&watch
->list_node
, &wlist
->watchers
);
475 EXPORT_SYMBOL(add_watch_to_object
);
478 * remove_watch_from_object - Remove a watch or all watches from an object.
479 * @wlist: The watch list to remove from
480 * @wq: The watch queue of interest (ignored if @all is true)
481 * @id: The ID of the watch to remove (ignored if @all is true)
482 * @all: True to remove all objects
484 * Remove a specific watch or all watches from an object. A notification is
485 * sent to the watcher to tell them that this happened.
487 int remove_watch_from_object(struct watch_list
*wlist
, struct watch_queue
*wq
,
490 struct watch_notification_removal n
;
491 struct watch_queue
*wqueue
;
498 spin_lock(&wlist
->lock
);
499 hlist_for_each_entry(watch
, &wlist
->watchers
, list_node
) {
501 (watch
->id
== id
&& rcu_access_pointer(watch
->queue
) == wq
))
504 spin_unlock(&wlist
->lock
);
509 hlist_del_init_rcu(&watch
->list_node
);
510 rcu_assign_pointer(watch
->watch_list
, NULL
);
511 spin_unlock(&wlist
->lock
);
513 /* We now own the reference on watch that used to belong to wlist. */
515 n
.watch
.type
= WATCH_TYPE_META
;
516 n
.watch
.subtype
= WATCH_META_REMOVAL_NOTIFICATION
;
517 n
.watch
.info
= watch
->info_id
| watch_sizeof(n
.watch
);
520 n
.watch
.info
= watch
->info_id
| watch_sizeof(n
);
522 wqueue
= rcu_dereference(watch
->queue
);
524 /* We don't need the watch list lock for the next bit as RCU is
525 * protecting *wqueue from deallocation.
528 post_one_notification(wqueue
, &n
.watch
);
530 spin_lock_bh(&wqueue
->lock
);
532 if (!hlist_unhashed(&watch
->queue_node
)) {
533 hlist_del_init_rcu(&watch
->queue_node
);
537 spin_unlock_bh(&wqueue
->lock
);
540 if (wlist
->release_watch
) {
541 void (*release_watch
)(struct watch
*);
543 release_watch
= wlist
->release_watch
;
545 (*release_watch
)(watch
);
550 if (all
&& !hlist_empty(&wlist
->watchers
))
556 EXPORT_SYMBOL(remove_watch_from_object
);
559 * Remove all the watches that are contributory to a queue. This has the
560 * potential to race with removal of the watches by the destruction of the
561 * objects being watched or with the distribution of notifications.
563 void watch_queue_clear(struct watch_queue
*wqueue
)
565 struct watch_list
*wlist
;
570 spin_lock_bh(&wqueue
->lock
);
572 /* Prevent new notifications from being stored. */
573 wqueue
->defunct
= true;
575 while (!hlist_empty(&wqueue
->watches
)) {
576 watch
= hlist_entry(wqueue
->watches
.first
, struct watch
, queue_node
);
577 hlist_del_init_rcu(&watch
->queue_node
);
578 /* We now own a ref on the watch. */
579 spin_unlock_bh(&wqueue
->lock
);
581 /* We can't do the next bit under the queue lock as we need to
582 * get the list lock - which would cause a deadlock if someone
583 * was removing from the opposite direction at the same time or
584 * posting a notification.
586 wlist
= rcu_dereference(watch
->watch_list
);
588 void (*release_watch
)(struct watch
*);
590 spin_lock(&wlist
->lock
);
592 release
= !hlist_unhashed(&watch
->list_node
);
594 hlist_del_init_rcu(&watch
->list_node
);
595 rcu_assign_pointer(watch
->watch_list
, NULL
);
597 /* We now own a second ref on the watch. */
600 release_watch
= wlist
->release_watch
;
601 spin_unlock(&wlist
->lock
);
606 /* This might need to call dput(), so
607 * we have to drop all the locks.
609 (*release_watch
)(watch
);
617 spin_lock_bh(&wqueue
->lock
);
620 spin_unlock_bh(&wqueue
->lock
);
625 * get_watch_queue - Get a watch queue from its file descriptor.
626 * @fd: The fd to query.
628 struct watch_queue
*get_watch_queue(int fd
)
630 struct pipe_inode_info
*pipe
;
631 struct watch_queue
*wqueue
= ERR_PTR(-EINVAL
);
636 pipe
= get_pipe_info(f
.file
, false);
637 if (pipe
&& pipe
->watch_queue
) {
638 wqueue
= pipe
->watch_queue
;
639 kref_get(&wqueue
->usage
);
646 EXPORT_SYMBOL(get_watch_queue
);
649 * Initialise a watch queue
651 int watch_queue_init(struct pipe_inode_info
*pipe
)
653 struct watch_queue
*wqueue
;
655 wqueue
= kzalloc(sizeof(*wqueue
), GFP_KERNEL
);
660 kref_init(&wqueue
->usage
);
661 spin_lock_init(&wqueue
->lock
);
662 INIT_HLIST_HEAD(&wqueue
->watches
);
664 pipe
->watch_queue
= wqueue
;