4 * Copyright (c) 2016 Tom Herbert <tom@herbertland.com>
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2
8 * as published by the Free Software Foundation.
#include <linux/bpf.h>
#include <linux/errno.h>
#include <linux/errqueue.h>
#include <linux/file.h>
#include <linux/jiffies.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/net.h>
#include <linux/netdevice.h>
#include <linux/poll.h>
#include <linux/rculist.h>
#include <linux/skbuff.h>
#include <linux/socket.h>
#include <linux/uaccess.h>
#include <linux/workqueue.h>
#include <net/strparser.h>
#include <net/netns/generic.h>
/* Workqueue on which each parser's rx_work item is queued. */
static struct workqueue_struct *strp_wq;
34 /* Internal cb structure. struct strp_rx_msg must be first for passing
37 struct strp_rx_msg strp
;
42 static inline struct _strp_rx_msg
*_strp_rx_msg(struct sk_buff
*skb
)
44 return (struct _strp_rx_msg
*)((void *)skb
->cb
+
45 offsetof(struct qdisc_skb_cb
, data
));
49 static void strp_abort_rx_strp(struct strparser
*strp
, int err
)
51 struct sock
*csk
= strp
->sk
;
53 /* Unrecoverable error in receive */
55 del_timer(&strp
->rx_msg_timer
);
62 /* Report an error on the lower socket */
64 csk
->sk_error_report(csk
);
67 static void strp_start_rx_timer(struct strparser
*strp
)
69 if (strp
->sk
->sk_rcvtimeo
)
70 mod_timer(&strp
->rx_msg_timer
, strp
->sk
->sk_rcvtimeo
);
74 static void strp_parser_err(struct strparser
*strp
, int err
,
75 read_descriptor_t
*desc
)
78 kfree_skb(strp
->rx_skb_head
);
79 strp
->rx_skb_head
= NULL
;
80 strp
->cb
.abort_parser(strp
, err
);
83 /* Lower socket lock held */
84 static int strp_tcp_recv(read_descriptor_t
*desc
, struct sk_buff
*orig_skb
,
85 unsigned int orig_offset
, size_t orig_len
)
87 struct strparser
*strp
= (struct strparser
*)desc
->arg
.data
;
88 struct _strp_rx_msg
*rxm
;
89 struct sk_buff
*head
, *skb
;
90 size_t eaten
= 0, cand_len
;
93 bool cloned_orig
= false;
98 head
= strp
->rx_skb_head
;
100 /* Message already in progress */
102 rxm
= _strp_rx_msg(head
);
103 if (unlikely(rxm
->early_eaten
)) {
104 /* Already some number of bytes on the receive sock
105 * data saved in rx_skb_head, just indicate they
108 eaten
= orig_len
<= rxm
->early_eaten
?
109 orig_len
: rxm
->early_eaten
;
110 rxm
->early_eaten
-= eaten
;
115 if (unlikely(orig_offset
)) {
116 /* Getting data with a non-zero offset when a message is
117 * in progress is not expected. If it does happen, we
118 * need to clone and pull since we can't deal with
119 * offsets in the skbs for a message expect in the head.
121 orig_skb
= skb_clone(orig_skb
, GFP_ATOMIC
);
123 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
124 desc
->error
= -ENOMEM
;
127 if (!pskb_pull(orig_skb
, orig_offset
)) {
128 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
130 desc
->error
= -ENOMEM
;
137 if (!strp
->rx_skb_nextp
) {
138 /* We are going to append to the frags_list of head.
139 * Need to unshare the frag_list.
141 err
= skb_unclone(head
, GFP_ATOMIC
);
143 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
148 if (unlikely(skb_shinfo(head
)->frag_list
)) {
149 /* We can't append to an sk_buff that already
150 * has a frag_list. We create a new head, point
151 * the frag_list of that to the old head, and
152 * then are able to use the old head->next for
153 * appending to the message.
155 if (WARN_ON(head
->next
)) {
156 desc
->error
= -EINVAL
;
160 skb
= alloc_skb(0, GFP_ATOMIC
);
162 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
163 desc
->error
= -ENOMEM
;
166 skb
->len
= head
->len
;
167 skb
->data_len
= head
->len
;
168 skb
->truesize
= head
->truesize
;
169 *_strp_rx_msg(skb
) = *_strp_rx_msg(head
);
170 strp
->rx_skb_nextp
= &head
->next
;
171 skb_shinfo(skb
)->frag_list
= head
;
172 strp
->rx_skb_head
= skb
;
176 &skb_shinfo(head
)->frag_list
;
181 while (eaten
< orig_len
) {
182 /* Always clone since we will consume something */
183 skb
= skb_clone(orig_skb
, GFP_ATOMIC
);
185 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
186 desc
->error
= -ENOMEM
;
190 cand_len
= orig_len
- eaten
;
192 head
= strp
->rx_skb_head
;
195 strp
->rx_skb_head
= head
;
196 /* Will set rx_skb_nextp on next packet if needed */
197 strp
->rx_skb_nextp
= NULL
;
198 rxm
= _strp_rx_msg(head
);
199 memset(rxm
, 0, sizeof(*rxm
));
200 rxm
->strp
.offset
= orig_offset
+ eaten
;
202 /* Unclone since we may be appending to an skb that we
203 * already share a frag_list with.
205 err
= skb_unclone(skb
, GFP_ATOMIC
);
207 STRP_STATS_INCR(strp
->stats
.rx_mem_fail
);
212 rxm
= _strp_rx_msg(head
);
213 *strp
->rx_skb_nextp
= skb
;
214 strp
->rx_skb_nextp
= &skb
->next
;
215 head
->data_len
+= skb
->len
;
216 head
->len
+= skb
->len
;
217 head
->truesize
+= skb
->truesize
;
220 if (!rxm
->strp
.full_len
) {
223 len
= (*strp
->cb
.parse_msg
)(strp
, head
);
226 /* Need more header to determine length */
227 if (!rxm
->accum_len
) {
228 /* Start RX timer for new message */
229 strp_start_rx_timer(strp
);
231 rxm
->accum_len
+= cand_len
;
233 STRP_STATS_INCR(strp
->stats
.rx_need_more_hdr
);
234 WARN_ON(eaten
!= orig_len
);
236 } else if (len
< 0) {
237 if (len
== -ESTRPIPE
&& rxm
->accum_len
) {
239 strp
->rx_unrecov_intr
= 1;
241 strp
->rx_interrupted
= 1;
243 strp_parser_err(strp
, err
, desc
);
245 } else if (len
> strp
->sk
->sk_rcvbuf
) {
246 /* Message length exceeds maximum allowed */
247 STRP_STATS_INCR(strp
->stats
.rx_msg_too_big
);
248 strp_parser_err(strp
, -EMSGSIZE
, desc
);
250 } else if (len
<= (ssize_t
)head
->len
-
251 skb
->len
- rxm
->strp
.offset
) {
252 /* Length must be into new skb (and also
255 STRP_STATS_INCR(strp
->stats
.rx_bad_hdr_len
);
256 strp_parser_err(strp
, -EPROTO
, desc
);
260 rxm
->strp
.full_len
= len
;
263 extra
= (ssize_t
)(rxm
->accum_len
+ cand_len
) -
267 /* Message not complete yet. */
268 if (rxm
->strp
.full_len
- rxm
->accum_len
>
270 /* Don't have the whole messages in the socket
271 * buffer. Set strp->rx_need_bytes to wait for
272 * the rest of the message. Also, set "early
273 * eaten" since we've already buffered the skb
274 * but don't consume yet per tcp_read_sock.
277 if (!rxm
->accum_len
) {
278 /* Start RX timer for new message */
279 strp_start_rx_timer(strp
);
282 strp
->rx_need_bytes
= rxm
->strp
.full_len
-
284 rxm
->accum_len
+= cand_len
;
285 rxm
->early_eaten
= cand_len
;
286 STRP_STATS_ADD(strp
->stats
.rx_bytes
, cand_len
);
287 desc
->count
= 0; /* Stop reading socket */
290 rxm
->accum_len
+= cand_len
;
292 WARN_ON(eaten
!= orig_len
);
296 /* Positive extra indicates ore bytes than needed for the
300 WARN_ON(extra
> cand_len
);
302 eaten
+= (cand_len
- extra
);
304 /* Hurray, we have a new message! */
305 del_timer(&strp
->rx_msg_timer
);
306 strp
->rx_skb_head
= NULL
;
307 STRP_STATS_INCR(strp
->stats
.rx_msgs
);
309 /* Give skb to upper layer */
310 strp
->cb
.rcv_msg(strp
, head
);
312 if (unlikely(strp
->rx_paused
)) {
313 /* Upper layer paused strp */
321 STRP_STATS_ADD(strp
->stats
.rx_bytes
, eaten
);
/* Default read_sock_done callback: hand the read result back unchanged. */
static int default_read_sock_done(struct strparser *strp, int err)
{
	return err;
}
331 /* Called with lock held on lower socket */
332 static int strp_tcp_read_sock(struct strparser
*strp
)
334 read_descriptor_t desc
;
336 desc
.arg
.data
= strp
;
338 desc
.count
= 1; /* give more than one skb per call */
340 /* sk should be locked here, so okay to do tcp_read_sock */
341 tcp_read_sock(strp
->sk
, &desc
, strp_tcp_recv
);
343 desc
.error
= strp
->cb
.read_sock_done(strp
, desc
.error
);
348 /* Lower sock lock held */
349 void strp_tcp_data_ready(struct strparser
*strp
)
351 struct sock
*csk
= strp
->sk
;
353 if (unlikely(strp
->rx_stopped
))
356 /* This check is needed to synchronize with do_strp_rx_work.
357 * do_strp_rx_work acquires a process lock (lock_sock) whereas
358 * the lock held here is bh_lock_sock. The two locks can be
359 * held by different threads at the same time, but bh_lock_sock
360 * allows a thread in BH context to safely check if the process
361 * lock is held. In this case, if the lock is held, queue work.
363 if (sock_owned_by_user(csk
)) {
364 queue_work(strp_wq
, &strp
->rx_work
);
371 if (strp
->rx_need_bytes
) {
372 if (tcp_inq(csk
) >= strp
->rx_need_bytes
)
373 strp
->rx_need_bytes
= 0;
378 if (strp_tcp_read_sock(strp
) == -ENOMEM
)
379 queue_work(strp_wq
, &strp
->rx_work
);
381 EXPORT_SYMBOL_GPL(strp_tcp_data_ready
);
383 static void do_strp_rx_work(struct strparser
*strp
)
385 read_descriptor_t rd_desc
;
386 struct sock
*csk
= strp
->sk
;
388 /* We need the read lock to synchronize with strp_tcp_data_ready. We
389 * need the socket lock for calling tcp_read_sock.
393 if (unlikely(csk
->sk_user_data
!= strp
))
396 if (unlikely(strp
->rx_stopped
))
402 rd_desc
.arg
.data
= strp
;
404 if (strp_tcp_read_sock(strp
) == -ENOMEM
)
405 queue_work(strp_wq
, &strp
->rx_work
);
411 static void strp_rx_work(struct work_struct
*w
)
413 do_strp_rx_work(container_of(w
, struct strparser
, rx_work
));
416 static void strp_rx_msg_timeout(unsigned long arg
)
418 struct strparser
*strp
= (struct strparser
*)arg
;
420 /* Message assembly timed out */
421 STRP_STATS_INCR(strp
->stats
.rx_msg_timeouts
);
423 strp
->cb
.abort_parser(strp
, ETIMEDOUT
);
424 release_sock(strp
->sk
);
427 int strp_init(struct strparser
*strp
, struct sock
*csk
,
428 struct strp_callbacks
*cb
)
430 if (!cb
|| !cb
->rcv_msg
|| !cb
->parse_msg
)
433 memset(strp
, 0, sizeof(*strp
));
437 setup_timer(&strp
->rx_msg_timer
, strp_rx_msg_timeout
,
438 (unsigned long)strp
);
440 INIT_WORK(&strp
->rx_work
, strp_rx_work
);
442 strp
->cb
.rcv_msg
= cb
->rcv_msg
;
443 strp
->cb
.parse_msg
= cb
->parse_msg
;
444 strp
->cb
.read_sock_done
= cb
->read_sock_done
? : default_read_sock_done
;
445 strp
->cb
.abort_parser
= cb
->abort_parser
? : strp_abort_rx_strp
;
449 EXPORT_SYMBOL_GPL(strp_init
);
451 /* strp must already be stopped so that strp_tcp_recv will no longer be called.
452 * Note that strp_done is not called with the lower socket held.
454 void strp_done(struct strparser
*strp
)
456 WARN_ON(!strp
->rx_stopped
);
458 del_timer_sync(&strp
->rx_msg_timer
);
459 cancel_work_sync(&strp
->rx_work
);
461 if (strp
->rx_skb_head
) {
462 kfree_skb(strp
->rx_skb_head
);
463 strp
->rx_skb_head
= NULL
;
466 EXPORT_SYMBOL_GPL(strp_done
);
468 void strp_stop(struct strparser
*strp
)
470 strp
->rx_stopped
= 1;
472 EXPORT_SYMBOL_GPL(strp_stop
);
474 void strp_check_rcv(struct strparser
*strp
)
476 queue_work(strp_wq
, &strp
->rx_work
);
478 EXPORT_SYMBOL_GPL(strp_check_rcv
);
480 static int __init
strp_mod_init(void)
482 strp_wq
= create_singlethread_workqueue("kstrp");
487 static void __exit
strp_mod_exit(void)
490 module_init(strp_mod_init
);
491 module_exit(strp_mod_exit
);
492 MODULE_LICENSE("GPL");