/*
 * net/tipc/server.c: TIPC server infrastructure
 *
 * Copyright (c) 2012-2013, Wind River Systems
 * Copyright (c) 2017, Ericsson AB
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "subscr.h"
#include "server.h"
#include "core.h"
#include "socket.h"
#include "addr.h"
#include "msg.h"
#include <net/sock.h>
#include <linux/module.h>

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT	25
#define MAX_RECV_MSG_COUNT	25
#define CF_CONNECTED		1
#define CF_SERVER		2

#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data)
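
/* Note: sk_user_data is assigned in tipc_register_callbacks() and cleared
 * in tipc_close_conn(), both under sk_callback_lock, so sock2con() is only
 * safe to use while that lock is held.
 */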

/**
 * struct tipc_conn - TIPC connection structure
 * @kref: reference counter to connection object
 * @conid: connection identifier
 * @sock: socket handler associated with connection
 * @flags: indicates connection state
 * @server: pointer to connected server
 * @sub_list: list of all pertaining subscriptions
 * @sub_lock: lock protecting the subscription list
 * @rwork: receive work item
 * @rx_action: what to do when connection socket is active
 * @outqueue: pointer to first outbound message in queue
 * @outqueue_lock: control access to the outqueue
 * @swork: send work item
 */
struct tipc_conn {
	struct kref kref;
	int conid;
	struct socket *sock;
	unsigned long flags;
	struct tipc_server *server;
	struct list_head sub_list;
	spinlock_t sub_lock; /* for subscription list */
	struct work_struct rwork;
	int (*rx_action) (struct tipc_conn *con);
	struct list_head outqueue;
	spinlock_t outqueue_lock;
	struct work_struct swork;
};

/* An entry waiting to be sent */
struct outqueue_entry {
	bool inactive;
	struct tipc_event evt;
	struct list_head list;
};
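
/* An entry is marked 'inactive' when it carries a TIPC_SUBSCR_TIMEOUT
 * event (see tipc_conn_queue_evt()); tipc_send_to_sock() then deletes the
 * subscription before delivering this final event.
 */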

static void tipc_recv_work(struct work_struct *work);
static void tipc_send_work(struct work_struct *work);
static void tipc_clean_outqueues(struct tipc_conn *con);

static bool connected(struct tipc_conn *con)
{
	return con && test_bit(CF_CONNECTED, &con->flags);
}

static void tipc_conn_kref_release(struct kref *kref)
{
	struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
	struct tipc_server *s = con->server;
	struct socket *sock = con->sock;

	if (sock) {
		if (test_bit(CF_SERVER, &con->flags)) {
			__module_get(sock->ops->owner);
			__module_get(sock->sk->sk_prot_creator->owner);
		}
		sock_release(sock);
		con->sock = NULL;
	}
	spin_lock_bh(&s->idr_lock);
	idr_remove(&s->conn_idr, con->conid);
	s->idr_in_use--;
	spin_unlock_bh(&s->idr_lock);
	tipc_clean_outqueues(con);
	kfree(con);
}

static void conn_put(struct tipc_conn *con)
{
	kref_put(&con->kref, tipc_conn_kref_release);
}

static void conn_get(struct tipc_conn *con)
{
	kref_get(&con->kref);
}

static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
{
	struct tipc_conn *con;

	spin_lock_bh(&s->idr_lock);
	con = idr_find(&s->conn_idr, conid);
	if (!connected(con) || !kref_get_unless_zero(&con->kref))
		con = NULL;
	spin_unlock_bh(&s->idr_lock);
	return con;
}
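
/* A successful lookup returns with an extra reference held; the typical
 * pattern, as in tipc_conn_queue_evt(), is:
 *
 *	con = tipc_conn_lookup(s, conid);
 *	if (!con)
 *		return;
 *	...
 *	conn_put(con);
 */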

/* sock_data_ready - interrupt callback indicating the socket has data to read
 * The queued job is launched in tipc_receive_from_sock()
 */
static void sock_data_ready(struct sock *sk)
{
	struct tipc_conn *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (connected(con)) {
		conn_get(con);
		if (!queue_work(con->server->rcv_wq, &con->rwork))
			conn_put(con);
	}
	read_unlock_bh(&sk->sk_callback_lock);
}

/* sock_write_space - interrupt callback after a sendmsg EAGAIN
 * Indicates that there now is more space in the send buffer
 * The queued job is launched in tipc_send_to_sock()
 */
static void sock_write_space(struct sock *sk)
{
	struct tipc_conn *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (connected(con)) {
		conn_get(con);
		if (!queue_work(con->server->send_wq, &con->swork))
			conn_put(con);
	}
	read_unlock_bh(&sk->sk_callback_lock);
}

static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);

	sk->sk_data_ready = sock_data_ready;
	sk->sk_write_space = sock_write_space;
	sk->sk_user_data = con;

	con->sock = sock;

	write_unlock_bh(&sk->sk_callback_lock);
}

/* tipc_con_delete_sub - delete a specific or all subscriptions
 * for a given subscriber
 */
static void tipc_con_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
{
	struct list_head *sub_list = &con->sub_list;
	struct tipc_subscription *sub, *tmp;

	spin_lock_bh(&con->sub_lock);
	list_for_each_entry_safe(sub, tmp, sub_list, subscrp_list) {
		if (!s || !memcmp(s, &sub->evt.s, sizeof(*s)))
			tipc_sub_unsubscribe(sub);
		else if (s)
			break;
	}
	spin_unlock_bh(&con->sub_lock);
}

static void tipc_close_conn(struct tipc_conn *con)
{
	struct sock *sk = con->sock->sk;
	bool disconnect = false;

	write_lock_bh(&sk->sk_callback_lock);
	disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);

	if (disconnect) {
		sk->sk_user_data = NULL;
		if (con->conid)
			tipc_con_delete_sub(con, NULL);
	}
	write_unlock_bh(&sk->sk_callback_lock);

	/* Handle concurrent calls from sending and receiving threads */
	if (!disconnect)
		return;

	/* Don't flush pending works, just let them expire */
	kernel_sock_shutdown(con->sock, SHUT_RDWR);
	conn_put(con);
}
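
/* The conn_put() above drops the initial reference set up by kref_init()
 * in tipc_alloc_conn(); references still held by pending work items keep
 * the connection alive until those items have run.
 */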

static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
{
	struct tipc_conn *con;
	int ret;

	con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC);
	if (!con)
		return ERR_PTR(-ENOMEM);

	kref_init(&con->kref);
	INIT_LIST_HEAD(&con->outqueue);
	INIT_LIST_HEAD(&con->sub_list);
	spin_lock_init(&con->outqueue_lock);
	spin_lock_init(&con->sub_lock);
	INIT_WORK(&con->swork, tipc_send_work);
	INIT_WORK(&con->rwork, tipc_recv_work);

	spin_lock_bh(&s->idr_lock);
	ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
	if (ret < 0) {
		kfree(con);
		spin_unlock_bh(&s->idr_lock);
		return ERR_PTR(-ENOMEM);
	}
	con->conid = ret;
	s->idr_in_use++;
	spin_unlock_bh(&s->idr_lock);

	set_bit(CF_CONNECTED, &con->flags);
	con->server = s;

	return con;
}

static int tipc_con_rcv_sub(struct tipc_server *srv,
			    struct tipc_conn *con,
			    struct tipc_subscr *s)
{
	struct tipc_subscription *sub;

	if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) {
		tipc_con_delete_sub(con, s);
		return 0;
	}
	sub = tipc_sub_subscribe(srv, s, con->conid);
	if (!sub)
		return -1;

	spin_lock_bh(&con->sub_lock);
	list_add(&sub->subscrp_list, &con->sub_list);
	spin_unlock_bh(&con->sub_lock);
	return 0;
}

static int tipc_receive_from_sock(struct tipc_conn *con)
{
	struct tipc_server *srv = con->server;
	struct sock *sk = con->sock->sk;
	struct msghdr msg = {};
	struct tipc_subscr s;
	struct kvec iov;
	int ret;

	iov.iov_base = &s;
	iov.iov_len = sizeof(s);
	msg.msg_name = NULL;
	iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
	ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
	if (ret == -EWOULDBLOCK)
		return -EWOULDBLOCK;
	if (ret > 0) {
		read_lock_bh(&sk->sk_callback_lock);
		ret = tipc_con_rcv_sub(srv, con, &s);
		read_unlock_bh(&sk->sk_callback_lock);
	}
	if (ret < 0)
		tipc_close_conn(con);

	return ret;
}

static int tipc_accept_from_sock(struct tipc_conn *con)
{
	struct socket *sock = con->sock;
	struct socket *newsock;
	struct tipc_conn *newcon;
	int ret;

	ret = kernel_accept(sock, &newsock, O_NONBLOCK);
	if (ret < 0)
		return ret;

	newcon = tipc_alloc_conn(con->server);
	if (IS_ERR(newcon)) {
		ret = PTR_ERR(newcon);
		sock_release(newsock);
		return ret;
	}

	newcon->rx_action = tipc_receive_from_sock;
	tipc_register_callbacks(newsock, newcon);

	/* Wake up receive process in case of 'SYN+' message */
	newsock->sk->sk_data_ready(newsock->sk);
	return ret;
}

static struct socket *tipc_create_listen_sock(struct tipc_conn *con)
{
	struct tipc_server *s = con->server;
	struct socket *sock = NULL;
	int imp = TIPC_CRITICAL_IMPORTANCE;
	int ret;

	ret = sock_create_kern(s->net, AF_TIPC, SOCK_SEQPACKET, 0, &sock);
	if (ret < 0)
		return NULL;
	ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE,
				(char *)&imp, sizeof(imp));
	if (ret < 0)
		goto create_err;
	ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr));
	if (ret < 0)
		goto create_err;

	con->rx_action = tipc_accept_from_sock;
	ret = kernel_listen(sock, 0);
	if (ret < 0)
		goto create_err;

	/* The listening socket is owned and created by the TIPC module
	 * itself, so we must decrease the TIPC module reference count to
	 * keep it at zero after the server socket is created; otherwise
	 * "rmmod" cannot unload the module once it has been inserted.
	 *
	 * sock_create_kern() increases the reference count twice: once for
	 * the owner of the TIPC socket's proto_ops struct, and once for the
	 * owner of the TIPC proto struct. We therefore drop the module
	 * reference count twice here, and of course must bump it twice
	 * again before the socket is closed.
	 */
	module_put(sock->ops->owner);
	module_put(sock->sk->sk_prot_creator->owner);
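	/* CF_SERVER tells tipc_conn_kref_release() to re-take the two module
	 * references dropped above before it finally releases the socket.
	 */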
	set_bit(CF_SERVER, &con->flags);

	return sock;

create_err:
	kernel_sock_shutdown(sock, SHUT_RDWR);
	sock_release(sock);
	return NULL;
}

static int tipc_open_listening_sock(struct tipc_server *s)
{
	struct socket *sock;
	struct tipc_conn *con;

	con = tipc_alloc_conn(s);
	if (IS_ERR(con))
		return PTR_ERR(con);

	sock = tipc_create_listen_sock(con);
	if (!sock) {
		idr_remove(&s->conn_idr, con->conid);
		s->idr_in_use--;
		kfree(con);
		return -EINVAL;
	}

	tipc_register_callbacks(sock, con);
	return 0;
}

static void tipc_clean_outqueues(struct tipc_conn *con)
{
	struct outqueue_entry *e, *safe;

	spin_lock_bh(&con->outqueue_lock);
	list_for_each_entry_safe(e, safe, &con->outqueue, list) {
		list_del(&e->list);
		kfree(e);
	}
	spin_unlock_bh(&con->outqueue_lock);
}

/* tipc_conn_queue_evt - interrupt level call from a subscription instance
 * The queued job is launched in tipc_send_to_sock()
 */
void tipc_conn_queue_evt(struct tipc_server *s, int conid,
			 u32 event, struct tipc_event *evt)
{
	struct outqueue_entry *e;
	struct tipc_conn *con;

	con = tipc_conn_lookup(s, conid);
	if (!con)
		return;

	if (!connected(con))
		goto err;

	e = kmalloc(sizeof(*e), GFP_ATOMIC);
	if (!e)
		goto err;
	e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
	memcpy(&e->evt, evt, sizeof(*evt));
	spin_lock_bh(&con->outqueue_lock);
	list_add_tail(&e->list, &con->outqueue);
	spin_unlock_bh(&con->outqueue_lock);

	if (queue_work(s->send_wq, &con->swork))
		return;
err:
	conn_put(con);
}
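
/* On success, the reference taken by tipc_conn_lookup() is handed over to
 * the queued work item and dropped in tipc_send_work(); if the work item
 * was already pending, or on any earlier error, it is dropped here instead.
 */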

bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
			     u32 upper, u32 filter, int *conid)
{
	struct tipc_subscr sub;
	struct tipc_conn *con;
	int rc;

	sub.seq.type = type;
	sub.seq.lower = lower;
	sub.seq.upper = upper;
	sub.timeout = TIPC_WAIT_FOREVER;
	sub.filter = filter;
	*(u32 *)&sub.usr_handle = port;

	con = tipc_alloc_conn(tipc_topsrv(net));
	if (IS_ERR(con))
		return false;

	*conid = con->conid;
	con->sock = NULL;
	rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub);
	if (rc < 0)
		tipc_close_conn(con);
	return !rc;
}
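
/* A minimal usage sketch for a kernel-internal subscriber; the names
 * portid, svc_type, lo and hi are illustrative only:
 *
 *	int conid;
 *
 *	if (!tipc_topsrv_kern_subscr(net, portid, svc_type, lo, hi,
 *				     TIPC_SUB_PORTS, &conid))
 *		return false;
 *	...
 *	tipc_topsrv_kern_unsubscr(net, conid);
 */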

void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
{
	struct tipc_conn *con;

	con = tipc_conn_lookup(tipc_topsrv(net), conid);
	if (!con)
		return;

	test_and_clear_bit(CF_CONNECTED, &con->flags);
	tipc_con_delete_sub(con, NULL);
	conn_put(con);
	conn_put(con);
}
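
/* The two conn_put() calls above drop both the reference taken by
 * tipc_conn_lookup() and the connection's initial reference; a kernel
 * subscriber has no socket whose close would otherwise trigger release.
 */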

static void tipc_send_kern_top_evt(struct net *net, struct tipc_event *evt)
{
	u32 port = *(u32 *)&evt->s.usr_handle;
	u32 self = tipc_own_addr(net);
	struct sk_buff_head evtq;
	struct sk_buff *skb;

	skb = tipc_msg_create(TOP_SRV, 0, INT_H_SIZE, sizeof(*evt),
			      self, self, port, port, 0);
	if (!skb)
		return;
	msg_set_dest_droppable(buf_msg(skb), true);
	memcpy(msg_data(buf_msg(skb)), evt, sizeof(*evt));
	skb_queue_head_init(&evtq);
	__skb_queue_tail(&evtq, skb);
	tipc_sk_rcv(net, &evtq);
}
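
/* Kernel subscribers have con->sock == NULL, so instead of a socket send,
 * tipc_send_to_sock() below uses tipc_send_kern_top_evt() to loop the
 * event back into the local receive path as a TOP_SRV message addressed
 * to the subscriber's own port.
 */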

static void tipc_send_to_sock(struct tipc_conn *con)
{
	struct list_head *queue = &con->outqueue;
	struct tipc_server *srv = con->server;
	struct outqueue_entry *e;
	struct tipc_event *evt;
	struct msghdr msg;
	struct kvec iov;
	int count = 0;
	int ret;

	spin_lock_bh(&con->outqueue_lock);

	while (!list_empty(queue)) {
		e = list_first_entry(queue, struct outqueue_entry, list);
		evt = &e->evt;
		spin_unlock_bh(&con->outqueue_lock);

		if (e->inactive)
			tipc_con_delete_sub(con, &evt->s);

		memset(&msg, 0, sizeof(msg));
		msg.msg_flags = MSG_DONTWAIT;
		iov.iov_base = evt;
		iov.iov_len = sizeof(*evt);
		msg.msg_name = NULL;

		if (con->sock) {
			ret = kernel_sendmsg(con->sock, &msg, &iov,
					     1, sizeof(*evt));
			if (ret == -EWOULDBLOCK || ret == 0) {
				cond_resched();
				goto out;
			} else if (ret < 0) {
				goto err;
			}
		} else {
			tipc_send_kern_top_evt(srv->net, evt);
		}

		/* Don't starve users filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}
		spin_lock_bh(&con->outqueue_lock);
		list_del(&e->list);
		kfree(e);
	}
	spin_unlock_bh(&con->outqueue_lock);
out:
	return;
err:
	tipc_close_conn(con);
}

static void tipc_recv_work(struct work_struct *work)
{
	struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
	int count = 0;

	while (connected(con)) {
		if (con->rx_action(con))
			break;

		/* Don't flood Rx machine */
		if (++count >= MAX_RECV_MSG_COUNT) {
			cond_resched();
			count = 0;
		}
	}
	conn_put(con);
}

static void tipc_send_work(struct work_struct *work)
{
	struct tipc_conn *con = container_of(work, struct tipc_conn, swork);

	if (connected(con))
		tipc_send_to_sock(con);

	conn_put(con);
}

static void tipc_work_stop(struct tipc_server *s)
{
	destroy_workqueue(s->rcv_wq);
	destroy_workqueue(s->send_wq);
}

static int tipc_work_start(struct tipc_server *s)
{
	s->rcv_wq = alloc_ordered_workqueue("tipc_rcv", 0);
	if (!s->rcv_wq) {
		pr_err("can't start tipc receive workqueue\n");
		return -ENOMEM;
	}

	s->send_wq = alloc_ordered_workqueue("tipc_send", 0);
	if (!s->send_wq) {
		pr_err("can't start tipc send workqueue\n");
		destroy_workqueue(s->rcv_wq);
		return -ENOMEM;
	}

	return 0;
}

int tipc_server_start(struct tipc_server *s)
{
	int ret;

	spin_lock_init(&s->idr_lock);
	idr_init(&s->conn_idr);
	s->idr_in_use = 0;

	ret = tipc_work_start(s);
	if (ret < 0)
		return ret;

	ret = tipc_open_listening_sock(s);
	if (ret < 0)
		tipc_work_stop(s);

	return ret;
}

void tipc_server_stop(struct tipc_server *s)
{
	struct tipc_conn *con;
	int id;

	spin_lock_bh(&s->idr_lock);
	for (id = 0; s->idr_in_use; id++) {
		con = idr_find(&s->conn_idr, id);
		if (con) {
			spin_unlock_bh(&s->idr_lock);
			tipc_close_conn(con);
			spin_lock_bh(&s->idr_lock);
		}
	}
	spin_unlock_bh(&s->idr_lock);

	tipc_work_stop(s);
	idr_destroy(&s->conn_idr);
}