1 /*
2 * linux/net/sunrpc/svc.c
3 *
4 * High-level RPC service routines
5 *
6 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
7 *
8 * Multiple thread pools and NUMAisation
9 * Copyright (c) 2006 Silicon Graphics, Inc.
10 * by Greg Banks <gnb@melbourne.sgi.com>
11 */
12
13 #include <linux/linkage.h>
14 #include <linux/sched/signal.h>
15 #include <linux/errno.h>
16 #include <linux/net.h>
17 #include <linux/in.h>
18 #include <linux/mm.h>
19 #include <linux/interrupt.h>
20 #include <linux/module.h>
21 #include <linux/kthread.h>
22 #include <linux/slab.h>
23
24 #include <linux/sunrpc/types.h>
25 #include <linux/sunrpc/xdr.h>
26 #include <linux/sunrpc/stats.h>
27 #include <linux/sunrpc/svcsock.h>
28 #include <linux/sunrpc/clnt.h>
29 #include <linux/sunrpc/bc_xprt.h>
30
31 #include <trace/events/sunrpc.h>
32
33 #define RPCDBG_FACILITY RPCDBG_SVCDSP
34
35 static void svc_unregister(const struct svc_serv *serv, struct net *net);
36
37 void svc_tcp_prep_reply_hdr(struct svc_rqst *rqstp);
38
39 #define svc_serv_is_pooled(serv) ((serv)->sv_ops->svo_function)
40
41 #define SVC_POOL_DEFAULT SVC_POOL_GLOBAL
42
43 /*
44 * Structure for mapping cpus to pools and vice versa.
45 * Set up once during sunrpc initialisation.
46 */
47 struct svc_pool_map svc_pool_map = {
48 .mode = SVC_POOL_DEFAULT
49 };
50 EXPORT_SYMBOL_GPL(svc_pool_map);
51
52 static DEFINE_MUTEX(svc_pool_map_mutex); /* protects svc_pool_map.count only */
53
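/*
 * Parser for the "pool_mode" module parameter. The mode can only be
 * changed while the pool map is unused (svc_pool_map.count == 0);
 * otherwise -EBUSY is returned.
 */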
54 static int
55 param_set_pool_mode(const char *val, const struct kernel_param *kp)
56 {
57 int *ip = (int *)kp->arg;
58 struct svc_pool_map *m = &svc_pool_map;
59 int err;
60
61 mutex_lock(&svc_pool_map_mutex);
62
63 err = -EBUSY;
64 if (m->count)
65 goto out;
66
67 err = 0;
68 if (!strncmp(val, "auto", 4))
69 *ip = SVC_POOL_AUTO;
70 else if (!strncmp(val, "global", 6))
71 *ip = SVC_POOL_GLOBAL;
72 else if (!strncmp(val, "percpu", 6))
73 *ip = SVC_POOL_PERCPU;
74 else if (!strncmp(val, "pernode", 7))
75 *ip = SVC_POOL_PERNODE;
76 else
77 err = -EINVAL;
78
79 out:
80 mutex_unlock(&svc_pool_map_mutex);
81 return err;
82 }
83
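/* Report the current pool_mode setting as a human-readable string. */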
84 static int
85 param_get_pool_mode(char *buf, const struct kernel_param *kp)
86 {
87 int *ip = (int *)kp->arg;
88
89 switch (*ip)
90 {
91 case SVC_POOL_AUTO:
92 return strlcpy(buf, "auto", 20);
93 case SVC_POOL_GLOBAL:
94 return strlcpy(buf, "global", 20);
95 case SVC_POOL_PERCPU:
96 return strlcpy(buf, "percpu", 20);
97 case SVC_POOL_PERNODE:
98 return strlcpy(buf, "pernode", 20);
99 default:
100 return sprintf(buf, "%d", *ip);
101 }
102 }
103
104 module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
105 &svc_pool_map.mode, 0644);
106
107 /*
108 * Detect best pool mapping mode heuristically,
109 * according to the machine's topology.
110 */
111 static int
112 svc_pool_map_choose_mode(void)
113 {
114 unsigned int node;
115
116 if (nr_online_nodes > 1) {
117 /*
118 * Actually have multiple NUMA nodes,
119 * so split pools on NUMA node boundaries
120 */
121 return SVC_POOL_PERNODE;
122 }
123
124 node = first_online_node;
125 if (nr_cpus_node(node) > 2) {
126 /*
127 * Non-trivial SMP, or CONFIG_NUMA on
128 * non-NUMA hardware, e.g. with a generic
129 * x86_64 kernel on Xeons. In this case we
130 * want to divide the pools on cpu boundaries.
131 */
132 return SVC_POOL_PERCPU;
133 }
134
135 /* default: one global pool */
136 return SVC_POOL_GLOBAL;
137 }
138
139 /*
140 * Allocate the to_pool[] and pool_to[] arrays.
141 * Returns 0 on success or an errno.
142 */
143 static int
144 svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
145 {
146 m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
147 if (!m->to_pool)
148 goto fail;
149 m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
150 if (!m->pool_to)
151 goto fail_free;
152
153 return 0;
154
155 fail_free:
156 kfree(m->to_pool);
157 m->to_pool = NULL;
158 fail:
159 return -ENOMEM;
160 }
161
162 /*
163 * Initialise the pool map for SVC_POOL_PERCPU mode.
164 * Returns number of pools or <0 on error.
165 */
166 static int
167 svc_pool_map_init_percpu(struct svc_pool_map *m)
168 {
169 unsigned int maxpools = nr_cpu_ids;
170 unsigned int pidx = 0;
171 unsigned int cpu;
172 int err;
173
174 err = svc_pool_map_alloc_arrays(m, maxpools);
175 if (err)
176 return err;
177
178 for_each_online_cpu(cpu) {
179 BUG_ON(pidx >= maxpools);
180 m->to_pool[cpu] = pidx;
181 m->pool_to[pidx] = cpu;
182 pidx++;
183 }
184 /* cpus brought online later all get mapped to pool0, sorry */
185
186 return pidx;
187 };
188
189
190 /*
191 * Initialise the pool map for SVC_POOL_PERNODE mode.
192 * Returns number of pools or <0 on error.
193 */
194 static int
195 svc_pool_map_init_pernode(struct svc_pool_map *m)
196 {
197 unsigned int maxpools = nr_node_ids;
198 unsigned int pidx = 0;
199 unsigned int node;
200 int err;
201
202 err = svc_pool_map_alloc_arrays(m, maxpools);
203 if (err)
204 return err;
205
206 for_each_node_with_cpus(node) {
207 /* some architectures (e.g. SN2) have cpuless nodes */
208 BUG_ON(pidx > maxpools);
209 m->to_pool[node] = pidx;
210 m->pool_to[pidx] = node;
211 pidx++;
212 }
213 /* nodes brought online later all get mapped to pool0, sorry */
214
215 return pidx;
216 }
217
218
219 /*
220 * Add a reference to the global map of cpus to pools (and
221 * vice versa). Initialise the map if we're the first user.
222 * Returns the number of pools.
223 */
224 unsigned int
225 svc_pool_map_get(void)
226 {
227 struct svc_pool_map *m = &svc_pool_map;
228 int npools = -1;
229
230 mutex_lock(&svc_pool_map_mutex);
231
232 if (m->count++) {
233 mutex_unlock(&svc_pool_map_mutex);
234 return m->npools;
235 }
236
237 if (m->mode == SVC_POOL_AUTO)
238 m->mode = svc_pool_map_choose_mode();
239
240 switch (m->mode) {
241 case SVC_POOL_PERCPU:
242 npools = svc_pool_map_init_percpu(m);
243 break;
244 case SVC_POOL_PERNODE:
245 npools = svc_pool_map_init_pernode(m);
246 break;
247 }
248
249 if (npools < 0) {
250 /* default, or memory allocation failure */
251 npools = 1;
252 m->mode = SVC_POOL_GLOBAL;
253 }
254 m->npools = npools;
255
256 mutex_unlock(&svc_pool_map_mutex);
257 return m->npools;
258 }
259 EXPORT_SYMBOL_GPL(svc_pool_map_get);
260
261 /*
262 * Drop a reference to the global map of cpus to pools.
263 * When the last reference is dropped, the map data is
264 * freed; this allows the sysadmin to change the pool
265 * mode using the pool_mode module option without
266 * rebooting or re-loading sunrpc.ko.
267 */
268 void
269 svc_pool_map_put(void)
270 {
271 struct svc_pool_map *m = &svc_pool_map;
272
273 mutex_lock(&svc_pool_map_mutex);
274
275 if (!--m->count) {
276 kfree(m->to_pool);
277 m->to_pool = NULL;
278 kfree(m->pool_to);
279 m->pool_to = NULL;
280 m->npools = 0;
281 }
282
283 mutex_unlock(&svc_pool_map_mutex);
284 }
285 EXPORT_SYMBOL_GPL(svc_pool_map_put);
286
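/*
 * Return the NUMA node backing the given pool, or NUMA_NO_NODE when the
 * pool map is not in use (e.g. SVC_POOL_GLOBAL mode, or a service that
 * never called svc_pool_map_get()).
 */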
287 static int svc_pool_map_get_node(unsigned int pidx)
288 {
289 const struct svc_pool_map *m = &svc_pool_map;
290
291 if (m->count) {
292 if (m->mode == SVC_POOL_PERCPU)
293 return cpu_to_node(m->pool_to[pidx]);
294 if (m->mode == SVC_POOL_PERNODE)
295 return m->pool_to[pidx];
296 }
297 return NUMA_NO_NODE;
298 }
299 /*
300 * Set the given thread's cpus_allowed mask so that it
301 * will only run on cpus in the given pool.
302 */
303 static inline void
304 svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
305 {
306 struct svc_pool_map *m = &svc_pool_map;
307 unsigned int node = m->pool_to[pidx];
308
309 /*
310 * The caller checks for sv_nrpools > 1, which
311 * implies that we've been initialized.
312 */
313 WARN_ON_ONCE(m->count == 0);
314 if (m->count == 0)
315 return;
316
317 switch (m->mode) {
318 case SVC_POOL_PERCPU:
319 {
320 set_cpus_allowed_ptr(task, cpumask_of(node));
321 break;
322 }
323 case SVC_POOL_PERNODE:
324 {
325 set_cpus_allowed_ptr(task, cpumask_of_node(node));
326 break;
327 }
328 }
329 }
330
331 /*
332 * Use the mapping mode to choose a pool for a given CPU.
333 * Used when enqueueing an incoming RPC. Always returns
334 * a non-NULL pool pointer.
335 */
336 struct svc_pool *
337 svc_pool_for_cpu(struct svc_serv *serv, int cpu)
338 {
339 struct svc_pool_map *m = &svc_pool_map;
340 unsigned int pidx = 0;
341
342 /*
343 * An uninitialised map happens in a pure client when
344 * lockd is brought up, so silently treat it the
345 * same as SVC_POOL_GLOBAL.
346 */
347 if (svc_serv_is_pooled(serv)) {
348 switch (m->mode) {
349 case SVC_POOL_PERCPU:
350 pidx = m->to_pool[cpu];
351 break;
352 case SVC_POOL_PERNODE:
353 pidx = m->to_pool[cpu_to_node(cpu)];
354 break;
355 }
356 }
357 return &serv->sv_pools[pidx % serv->sv_nrpools];
358 }
359
360 int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
361 {
362 int err;
363
364 err = rpcb_create_local(net);
365 if (err)
366 return err;
367
368 /* Remove any stale portmap registrations */
369 svc_unregister(serv, net);
370 return 0;
371 }
372 EXPORT_SYMBOL_GPL(svc_rpcb_setup);
373
374 void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
375 {
376 svc_unregister(serv, net);
377 rpcb_put_local(net);
378 }
379 EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);
380
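/*
 * Return 1 if the service has at least one program version that should be
 * advertised to rpcbind (i.e. not marked vs_hidden), 0 otherwise.
 */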
381 static int svc_uses_rpcbind(struct svc_serv *serv)
382 {
383 struct svc_program *progp;
384 unsigned int i;
385
386 for (progp = serv->sv_program; progp; progp = progp->pg_next) {
387 for (i = 0; i < progp->pg_nvers; i++) {
388 if (progp->pg_vers[i] == NULL)
389 continue;
390 if (!progp->pg_vers[i]->vs_hidden)
391 return 1;
392 }
393 }
394
395 return 0;
396 }
397
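/*
 * Prepare rpcbind for this service in the given net namespace: create the
 * local rpcbind client and clear any stale registrations. A no-op when
 * every program version is hidden from rpcbind.
 */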
398 int svc_bind(struct svc_serv *serv, struct net *net)
399 {
400 if (!svc_uses_rpcbind(serv))
401 return 0;
402 return svc_rpcb_setup(serv, net);
403 }
404 EXPORT_SYMBOL_GPL(svc_bind);
405
406 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
407 static void
408 __svc_init_bc(struct svc_serv *serv)
409 {
410 INIT_LIST_HEAD(&serv->sv_cb_list);
411 spin_lock_init(&serv->sv_cb_lock);
412 init_waitqueue_head(&serv->sv_cb_waitq);
413 }
414 #else
415 static void
416 __svc_init_bc(struct svc_serv *serv)
417 {
418 }
419 #endif
420
421 /*
422 * Create an RPC service
423 */
424 static struct svc_serv *
425 __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
426 const struct svc_serv_ops *ops)
427 {
428 struct svc_serv *serv;
429 unsigned int vers;
430 unsigned int xdrsize;
431 unsigned int i;
432
433 if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
434 return NULL;
435 serv->sv_name = prog->pg_name;
436 serv->sv_program = prog;
437 serv->sv_nrthreads = 1;
438 serv->sv_stats = prog->pg_stats;
439 if (bufsize > RPCSVC_MAXPAYLOAD)
440 bufsize = RPCSVC_MAXPAYLOAD;
441 serv->sv_max_payload = bufsize? bufsize : 4096;
442 serv->sv_max_mesg = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
443 serv->sv_ops = ops;
444 xdrsize = 0;
445 while (prog) {
446 prog->pg_lovers = prog->pg_nvers-1;
447 for (vers=0; vers<prog->pg_nvers ; vers++)
448 if (prog->pg_vers[vers]) {
449 prog->pg_hivers = vers;
450 if (prog->pg_lovers > vers)
451 prog->pg_lovers = vers;
452 if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
453 xdrsize = prog->pg_vers[vers]->vs_xdrsize;
454 }
455 prog = prog->pg_next;
456 }
457 serv->sv_xdrsize = xdrsize;
458 INIT_LIST_HEAD(&serv->sv_tempsocks);
459 INIT_LIST_HEAD(&serv->sv_permsocks);
460 timer_setup(&serv->sv_temptimer, NULL, 0);
461 spin_lock_init(&serv->sv_lock);
462
463 __svc_init_bc(serv);
464
465 serv->sv_nrpools = npools;
466 serv->sv_pools =
467 kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
468 GFP_KERNEL);
469 if (!serv->sv_pools) {
470 kfree(serv);
471 return NULL;
472 }
473
474 for (i = 0; i < serv->sv_nrpools; i++) {
475 struct svc_pool *pool = &serv->sv_pools[i];
476
477 dprintk("svc: initialising pool %u for %s\n",
478 i, serv->sv_name);
479
480 pool->sp_id = i;
481 INIT_LIST_HEAD(&pool->sp_sockets);
482 INIT_LIST_HEAD(&pool->sp_all_threads);
483 spin_lock_init(&pool->sp_lock);
484 }
485
486 return serv;
487 }
488
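/* Create an unpooled service with a single, global pool. */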
489 struct svc_serv *
490 svc_create(struct svc_program *prog, unsigned int bufsize,
491 const struct svc_serv_ops *ops)
492 {
493 return __svc_create(prog, bufsize, /*npools*/1, ops);
494 }
495 EXPORT_SYMBOL_GPL(svc_create);
496
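/*
 * Create a pooled service. Takes a reference on the global pool map, which
 * determines how many pools the service gets.
 */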
497 struct svc_serv *
498 svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
499 const struct svc_serv_ops *ops)
500 {
501 struct svc_serv *serv;
502 unsigned int npools = svc_pool_map_get();
503
504 serv = __svc_create(prog, bufsize, npools, ops);
505 if (!serv)
506 goto out_err;
507 return serv;
508 out_err:
509 svc_pool_map_put();
510 return NULL;
511 }
512 EXPORT_SYMBOL_GPL(svc_create_pooled);
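/*
 * Illustrative sketch (not code from this file) of how a caller such as
 * nfsd might drive a pooled service with the helpers in this file; the
 * program, ops and thread count below are hypothetical:
 *
 *	serv = svc_create_pooled(&my_program, bufsize, &my_ops);
 *	if (!serv)
 *		return -ENOMEM;
 *	error = svc_bind(serv, net);                 - rpcbind setup
 *	error = svc_set_num_threads(serv, NULL, 8);  - start 8 threads
 *	...
 *	svc_shutdown_net(serv, net);                 - close transports
 *	svc_destroy(serv);                           - drop the last reference
 */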
513
514 void svc_shutdown_net(struct svc_serv *serv, struct net *net)
515 {
516 svc_close_net(serv, net);
517
518 if (serv->sv_ops->svo_shutdown)
519 serv->sv_ops->svo_shutdown(serv, net);
520 }
521 EXPORT_SYMBOL_GPL(svc_shutdown_net);
522
523 /*
524 * Destroy an RPC service. Should be called with appropriate locking to
525 * protect sv_nrthreads, sv_permsocks and sv_tempsocks.
526 */
527 void
528 svc_destroy(struct svc_serv *serv)
529 {
530 dprintk("svc: svc_destroy(%s, %d)\n",
531 serv->sv_program->pg_name,
532 serv->sv_nrthreads);
533
534 if (serv->sv_nrthreads) {
535 if (--(serv->sv_nrthreads) != 0) {
536 svc_sock_update_bufs(serv);
537 return;
538 }
539 } else
540 printk("svc_destroy: no threads for serv=%p!\n", serv);
541
542 del_timer_sync(&serv->sv_temptimer);
543
544 /*
545 * The last user is gone, so all sockets should have been destroyed by
546 * this point. Check this.
547 */
548 BUG_ON(!list_empty(&serv->sv_permsocks));
549 BUG_ON(!list_empty(&serv->sv_tempsocks));
550
551 cache_clean_deferred(serv);
552
553 if (svc_serv_is_pooled(serv))
554 svc_pool_map_put();
555
556 kfree(serv->sv_pools);
557 kfree(serv);
558 }
559 EXPORT_SYMBOL_GPL(svc_destroy);
560
561 /*
562 * Allocate an RPC server's buffer space.
563 * We allocate pages and place them in rq_argpages.
564 */
565 static int
566 svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
567 {
568 unsigned int pages, arghi;
569
570 /* bc_xprt uses fore channel allocated buffers */
571 if (svc_is_backchannel(rqstp))
572 return 1;
573
574 pages = size / PAGE_SIZE + 1; /* extra page as we hold both request and reply.
575 * We assume each is at most one page
576 */
577 arghi = 0;
578 WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
579 if (pages > RPCSVC_MAXPAGES)
580 pages = RPCSVC_MAXPAGES;
581 while (pages) {
582 struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);
583 if (!p)
584 break;
585 rqstp->rq_pages[arghi++] = p;
586 pages--;
587 }
588 return pages == 0;
589 }
590
591 /*
592 * Release an RPC server buffer
593 */
594 static void
595 svc_release_buffer(struct svc_rqst *rqstp)
596 {
597 unsigned int i;
598
599 for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
600 if (rqstp->rq_pages[i])
601 put_page(rqstp->rq_pages[i]);
602 }
603
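/*
 * Allocate and initialise a request structure for one server thread on the
 * given NUMA node, including its XDR argument/result buffers and the
 * per-request page array.
 */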
604 struct svc_rqst *
605 svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
606 {
607 struct svc_rqst *rqstp;
608
609 rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
610 if (!rqstp)
611 return rqstp;
612
613 __set_bit(RQ_BUSY, &rqstp->rq_flags);
614 spin_lock_init(&rqstp->rq_lock);
615 rqstp->rq_server = serv;
616 rqstp->rq_pool = pool;
617
618 rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
619 if (!rqstp->rq_argp)
620 goto out_enomem;
621
622 rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
623 if (!rqstp->rq_resp)
624 goto out_enomem;
625
626 if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
627 goto out_enomem;
628
629 return rqstp;
630 out_enomem:
631 svc_rqst_free(rqstp);
632 return NULL;
633 }
634 EXPORT_SYMBOL_GPL(svc_rqst_alloc);
635
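/*
 * Allocate a request for a new thread and account for it in the server and
 * in the chosen pool. Returns ERR_PTR(-ENOMEM) if allocation fails.
 */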
636 struct svc_rqst *
637 svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
638 {
639 struct svc_rqst *rqstp;
640
641 rqstp = svc_rqst_alloc(serv, pool, node);
642 if (!rqstp)
643 return ERR_PTR(-ENOMEM);
644
645 serv->sv_nrthreads++;
646 spin_lock_bh(&pool->sp_lock);
647 pool->sp_nrthreads++;
648 list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
649 spin_unlock_bh(&pool->sp_lock);
650 return rqstp;
651 }
652 EXPORT_SYMBOL_GPL(svc_prepare_thread);
653
654 /*
655 * Choose a pool in which to create a new thread, for svc_set_num_threads
656 */
657 static inline struct svc_pool *
658 choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
659 {
660 if (pool != NULL)
661 return pool;
662
663 return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
664 }
665
666 /*
667 * Choose a thread to kill, for svc_set_num_threads
668 */
669 static inline struct task_struct *
670 choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
671 {
672 unsigned int i;
673 struct task_struct *task = NULL;
674
675 if (pool != NULL) {
676 spin_lock_bh(&pool->sp_lock);
677 } else {
678 /* choose a pool in round-robin fashion */
679 for (i = 0; i < serv->sv_nrpools; i++) {
680 pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
681 spin_lock_bh(&pool->sp_lock);
682 if (!list_empty(&pool->sp_all_threads))
683 goto found_pool;
684 spin_unlock_bh(&pool->sp_lock);
685 }
686 return NULL;
687 }
688
689 found_pool:
690 if (!list_empty(&pool->sp_all_threads)) {
691 struct svc_rqst *rqstp;
692
693 /*
694 * Remove from the pool->sp_all_threads list
695 * so we don't try to kill it again.
696 */
697 rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
698 set_bit(RQ_VICTIM, &rqstp->rq_flags);
699 list_del_rcu(&rqstp->rq_all);
700 task = rqstp->rq_task;
701 }
702 spin_unlock_bh(&pool->sp_lock);
703
704 return task;
705 }
706
707 /* create nrservs new server kthreads, choosing pools round-robin unless one was given */
708 static int
709 svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
710 {
711 struct svc_rqst *rqstp;
712 struct task_struct *task;
713 struct svc_pool *chosen_pool;
714 unsigned int state = serv->sv_nrthreads-1;
715 int node;
716
717 do {
718 nrservs--;
719 chosen_pool = choose_pool(serv, pool, &state);
720
721 node = svc_pool_map_get_node(chosen_pool->sp_id);
722 rqstp = svc_prepare_thread(serv, chosen_pool, node);
723 if (IS_ERR(rqstp))
724 return PTR_ERR(rqstp);
725
726 __module_get(serv->sv_ops->svo_module);
727 task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
728 node, "%s", serv->sv_name);
729 if (IS_ERR(task)) {
730 module_put(serv->sv_ops->svo_module);
731 svc_exit_thread(rqstp);
732 return PTR_ERR(task);
733 }
734
735 rqstp->rq_task = task;
736 if (serv->sv_nrpools > 1)
737 svc_pool_map_set_cpumask(task, chosen_pool->sp_id);
738
739 svc_sock_update_bufs(serv);
740 wake_up_process(task);
741 } while (nrservs > 0);
742
743 return 0;
744 }
745
746
747 /* ask surplus threads to exit by sending them SIGINT */
748 static int
749 svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
750 {
751 struct task_struct *task;
752 unsigned int state = serv->sv_nrthreads-1;
753
754 /* destroy old threads */
755 do {
756 task = choose_victim(serv, pool, &state);
757 if (task == NULL)
758 break;
759 send_sig(SIGINT, task, 1);
760 nrservs++;
761 } while (nrservs < 0);
762
763 return 0;
764 }
765
766 /*
767 * Create or destroy threads as needed to bring the total to the
768 * given number. If `pool' is non-NULL, this applies only to threads
769 * in that pool; otherwise it round-robins between all pools. The
770 * caller must ensure mutual exclusion between this and server
771 * startup or shutdown.
772 *
773 * Destroying threads relies on the service threads filling in
774 * rqstp->rq_task, which only the nfs ones do. Assumes the serv
775 * has been created using svc_create_pooled().
776 *
777 * Based on code that used to be in nfsd_svc() but tweaked
778 * to be pool-aware.
779 */
780 int
781 svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
782 {
783 if (pool == NULL) {
784 /* The -1 assumes caller has done a svc_get() */
785 nrservs -= (serv->sv_nrthreads-1);
786 } else {
787 spin_lock_bh(&pool->sp_lock);
788 nrservs -= pool->sp_nrthreads;
789 spin_unlock_bh(&pool->sp_lock);
790 }
791
792 if (nrservs > 0)
793 return svc_start_kthreads(serv, pool, nrservs);
794 if (nrservs < 0)
795 return svc_signal_kthreads(serv, pool, nrservs);
796 return 0;
797 }
798 EXPORT_SYMBOL_GPL(svc_set_num_threads);
799
800 /* stop surplus threads with kthread_stop(), waiting for each to exit */
801 static int
802 svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
803 {
804 struct task_struct *task;
805 unsigned int state = serv->sv_nrthreads-1;
806
807 /* destroy old threads */
808 do {
809 task = choose_victim(serv, pool, &state);
810 if (task == NULL)
811 break;
812 kthread_stop(task);
813 nrservs++;
814 } while (nrservs < 0);
815 return 0;
816 }
817
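/*
 * Like svc_set_num_threads(), but surplus threads are stopped with
 * kthread_stop(), which waits for each thread to exit, rather than being
 * signalled with SIGINT.
 */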
818 int
819 svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
820 {
821 if (pool == NULL) {
822 /* The -1 assumes caller has done a svc_get() */
823 nrservs -= (serv->sv_nrthreads-1);
824 } else {
825 spin_lock_bh(&pool->sp_lock);
826 nrservs -= pool->sp_nrthreads;
827 spin_unlock_bh(&pool->sp_lock);
828 }
829
830 if (nrservs > 0)
831 return svc_start_kthreads(serv, pool, nrservs);
832 if (nrservs < 0)
833 return svc_stop_kthreads(serv, pool, nrservs);
834 return 0;
835 }
836 EXPORT_SYMBOL_GPL(svc_set_num_threads_sync);
837
838 /*
839 * Called from a server thread as it's exiting. Caller must hold the "service
840 * mutex" for the service.
841 */
842 void
843 svc_rqst_free(struct svc_rqst *rqstp)
844 {
845 svc_release_buffer(rqstp);
846 kfree(rqstp->rq_resp);
847 kfree(rqstp->rq_argp);
848 kfree(rqstp->rq_auth_data);
849 kfree_rcu(rqstp, rq_rcu_head);
850 }
851 EXPORT_SYMBOL_GPL(svc_rqst_free);
852
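/*
 * Called by a server thread as it exits: remove it from its pool, free its
 * svc_rqst, and drop its reference on the server. svc_destroy() frees the
 * server once the last thread is gone.
 */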
853 void
854 svc_exit_thread(struct svc_rqst *rqstp)
855 {
856 struct svc_serv *serv = rqstp->rq_server;
857 struct svc_pool *pool = rqstp->rq_pool;
858
859 spin_lock_bh(&pool->sp_lock);
860 pool->sp_nrthreads--;
861 if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
862 list_del_rcu(&rqstp->rq_all);
863 spin_unlock_bh(&pool->sp_lock);
864
865 svc_rqst_free(rqstp);
866
867 /* Release the server */
868 if (serv)
869 svc_destroy(serv);
870 }
871 EXPORT_SYMBOL_GPL(svc_exit_thread);
872
873 /*
874 * Register an "inet" protocol family netid with the local
875 * rpcbind daemon via an rpcbind v4 SET request.
876 *
877 * No netconfig infrastructure is available in the kernel, so
878 * we map IP_ protocol numbers to netids by hand.
879 *
880 * Returns zero on success; a negative errno value is returned
881 * if any error occurs.
882 */
883 static int __svc_rpcb_register4(struct net *net, const u32 program,
884 const u32 version,
885 const unsigned short protocol,
886 const unsigned short port)
887 {
888 const struct sockaddr_in sin = {
889 .sin_family = AF_INET,
890 .sin_addr.s_addr = htonl(INADDR_ANY),
891 .sin_port = htons(port),
892 };
893 const char *netid;
894 int error;
895
896 switch (protocol) {
897 case IPPROTO_UDP:
898 netid = RPCBIND_NETID_UDP;
899 break;
900 case IPPROTO_TCP:
901 netid = RPCBIND_NETID_TCP;
902 break;
903 default:
904 return -ENOPROTOOPT;
905 }
906
907 error = rpcb_v4_register(net, program, version,
908 (const struct sockaddr *)&sin, netid);
909
910 /*
911 * User space didn't support rpcbind v4, so retry this
912 * registration request with the legacy rpcbind v2 protocol.
913 */
914 if (error == -EPROTONOSUPPORT)
915 error = rpcb_register(net, program, version, protocol, port);
916
917 return error;
918 }
919
920 #if IS_ENABLED(CONFIG_IPV6)
921 /*
922 * Register an "inet6" protocol family netid with the local
923 * rpcbind daemon via an rpcbind v4 SET request.
924 *
925 * No netconfig infrastructure is available in the kernel, so
926 * we map IP_ protocol numbers to netids by hand.
927 *
928 * Returns zero on success; a negative errno value is returned
929 * if any error occurs.
930 */
931 static int __svc_rpcb_register6(struct net *net, const u32 program,
932 const u32 version,
933 const unsigned short protocol,
934 const unsigned short port)
935 {
936 const struct sockaddr_in6 sin6 = {
937 .sin6_family = AF_INET6,
938 .sin6_addr = IN6ADDR_ANY_INIT,
939 .sin6_port = htons(port),
940 };
941 const char *netid;
942 int error;
943
944 switch (protocol) {
945 case IPPROTO_UDP:
946 netid = RPCBIND_NETID_UDP6;
947 break;
948 case IPPROTO_TCP:
949 netid = RPCBIND_NETID_TCP6;
950 break;
951 default:
952 return -ENOPROTOOPT;
953 }
954
955 error = rpcb_v4_register(net, program, version,
956 (const struct sockaddr *)&sin6, netid);
957
958 /*
959 * User space didn't support rpcbind version 4, so we won't
960 * use a PF_INET6 listener.
961 */
962 if (error == -EPROTONOSUPPORT)
963 error = -EAFNOSUPPORT;
964
965 return error;
966 }
967 #endif /* IS_ENABLED(CONFIG_IPV6) */
968
969 /*
970 * Register a kernel RPC service via rpcbind version 4.
971 *
972 * Returns zero on success; a negative errno value is returned
973 * if any error occurs.
974 */
975 static int __svc_register(struct net *net, const char *progname,
976 const u32 program, const u32 version,
977 const int family,
978 const unsigned short protocol,
979 const unsigned short port)
980 {
981 int error = -EAFNOSUPPORT;
982
983 switch (family) {
984 case PF_INET:
985 error = __svc_rpcb_register4(net, program, version,
986 protocol, port);
987 break;
988 #if IS_ENABLED(CONFIG_IPV6)
989 case PF_INET6:
990 error = __svc_rpcb_register6(net, program, version,
991 protocol, port);
992 #endif
993 }
994
995 return error;
996 }
997
998 /**
999 * svc_register - register an RPC service with the local portmapper
1000 * @serv: svc_serv struct for the service to register
1001 * @net: net namespace for the service to register
1002 * @family: protocol family of service's listener socket
1003 * @proto: transport protocol number to advertise
1004 * @port: port to advertise
1005 *
1006 * Service is registered for any address in the passed-in protocol family
1007 */
1008 int svc_register(const struct svc_serv *serv, struct net *net,
1009 const int family, const unsigned short proto,
1010 const unsigned short port)
1011 {
1012 struct svc_program *progp;
1013 const struct svc_version *vers;
1014 unsigned int i;
1015 int error = 0;
1016
1017 WARN_ON_ONCE(proto == 0 && port == 0);
1018 if (proto == 0 && port == 0)
1019 return -EINVAL;
1020
1021 for (progp = serv->sv_program; progp; progp = progp->pg_next) {
1022 for (i = 0; i < progp->pg_nvers; i++) {
1023 vers = progp->pg_vers[i];
1024 if (vers == NULL)
1025 continue;
1026
1027 dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
1028 progp->pg_name,
1029 i,
1030 proto == IPPROTO_UDP? "udp" : "tcp",
1031 port,
1032 family,
1033 vers->vs_hidden ?
1034 " (but not telling portmap)" : "");
1035
1036 if (vers->vs_hidden)
1037 continue;
1038
1039 /*
1040 * Don't register a UDP port if we need congestion
1041 * control.
1042 */
1043 if (vers->vs_need_cong_ctrl && proto == IPPROTO_UDP)
1044 continue;
1045
1046 error = __svc_register(net, progp->pg_name, progp->pg_prog,
1047 i, family, proto, port);
1048
1049 if (vers->vs_rpcb_optnl) {
1050 error = 0;
1051 continue;
1052 }
1053
1054 if (error < 0) {
1055 printk(KERN_WARNING "svc: failed to register "
1056 "%sv%u RPC service (errno %d).\n",
1057 progp->pg_name, i, -error);
1058 break;
1059 }
1060 }
1061 }
1062
1063 return error;
1064 }
1065
1066 /*
1067 * If user space is running rpcbind, it should take the v4 UNSET
1068 * and clear everything for this [program, version]. If user space
1069 * is running portmap, it will reject the v4 UNSET, but won't have
1070 * any "inet6" entries anyway. So a PMAP_UNSET should be sufficient
1071 * in this case to clear all existing entries for [program, version].
1072 */
1073 static void __svc_unregister(struct net *net, const u32 program, const u32 version,
1074 const char *progname)
1075 {
1076 int error;
1077
1078 error = rpcb_v4_register(net, program, version, NULL, "");
1079
1080 /*
1081 * User space didn't support rpcbind v4, so retry this
1082 * request with the legacy rpcbind v2 protocol.
1083 */
1084 if (error == -EPROTONOSUPPORT)
1085 error = rpcb_register(net, program, version, 0, 0);
1086
1087 dprintk("svc: %s(%sv%u), error %d\n",
1088 __func__, progname, version, error);
1089 }
1090
1091 /*
1092 * All netids, bind addresses and ports registered for [program, version]
1093 * are removed from the local rpcbind database (if the service is not
1094 * hidden) to make way for a new instance of the service.
1095 *
1096 * The result of unregistration is reported via dprintk for those who want
1097 * verification of the result, but is otherwise not important.
1098 */
1099 static void svc_unregister(const struct svc_serv *serv, struct net *net)
1100 {
1101 struct svc_program *progp;
1102 unsigned long flags;
1103 unsigned int i;
1104
1105 clear_thread_flag(TIF_SIGPENDING);
1106
1107 for (progp = serv->sv_program; progp; progp = progp->pg_next) {
1108 for (i = 0; i < progp->pg_nvers; i++) {
1109 if (progp->pg_vers[i] == NULL)
1110 continue;
1111 if (progp->pg_vers[i]->vs_hidden)
1112 continue;
1113
1114 dprintk("svc: attempting to unregister %sv%u\n",
1115 progp->pg_name, i);
1116 __svc_unregister(net, progp->pg_prog, i, progp->pg_name);
1117 }
1118 }
1119
1120 spin_lock_irqsave(&current->sighand->siglock, flags);
1121 recalc_sigpending();
1122 spin_unlock_irqrestore(&current->sighand->siglock, flags);
1123 }
1124
1125 /*
1126 * dprintk the given error with the address of the client that caused it.
1127 */
1128 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
1129 static __printf(2, 3)
1130 void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
1131 {
1132 struct va_format vaf;
1133 va_list args;
1134 char buf[RPC_MAX_ADDRBUFLEN];
1135
1136 va_start(args, fmt);
1137
1138 vaf.fmt = fmt;
1139 vaf.va = &args;
1140
1141 dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);
1142
1143 va_end(args);
1144 }
1145 #else
1146 static __printf(2,3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
1147 #endif
1148
1149 /*
1150 * Common routine for processing the RPC request.
1151 */
1152 static int
1153 svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
1154 {
1155 struct svc_program *progp;
1156 const struct svc_version *versp = NULL; /* compiler food */
1157 const struct svc_procedure *procp = NULL;
1158 struct svc_serv *serv = rqstp->rq_server;
1159 __be32 *statp;
1160 u32 prog, vers, proc;
1161 __be32 auth_stat, rpc_stat;
1162 int auth_res;
1163 __be32 *reply_statp;
1164
1165 rpc_stat = rpc_success;
1166
1167 if (argv->iov_len < 6*4)
1168 goto err_short_len;
1169
1170 /* Will be turned off by GSS integrity and privacy services */
1171 set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
1172 /* Will be turned off only when NFSv4 Sessions are used */
1173 set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
1174 clear_bit(RQ_DROPME, &rqstp->rq_flags);
1175
1176 /* Setup reply header */
1177 if (rqstp->rq_prot == IPPROTO_TCP)
1178 svc_tcp_prep_reply_hdr(rqstp);
1179
1180 svc_putu32(resv, rqstp->rq_xid);
1181
1182 vers = svc_getnl(argv);
1183
1184 /* First words of reply: */
1185 svc_putnl(resv, 1); /* REPLY */
1186
1187 if (vers != 2) /* RPC version number */
1188 goto err_bad_rpc;
1189
1190 /* Save position in case we later decide to reject: */
1191 reply_statp = resv->iov_base + resv->iov_len;
1192
1193 svc_putnl(resv, 0); /* ACCEPT */
1194
1195 rqstp->rq_prog = prog = svc_getnl(argv); /* program number */
1196 rqstp->rq_vers = vers = svc_getnl(argv); /* version number */
1197 rqstp->rq_proc = proc = svc_getnl(argv); /* procedure number */
1198
1199 for (progp = serv->sv_program; progp; progp = progp->pg_next)
1200 if (prog == progp->pg_prog)
1201 break;
1202
1203 /*
1204 * Decode auth data, and add verifier to reply buffer.
1205 * We do this before anything else in order to get a decent
1206 * auth verifier.
1207 */
1208 auth_res = svc_authenticate(rqstp, &auth_stat);
1209 /* Also give the program a chance to reject this call: */
1210 if (auth_res == SVC_OK && progp) {
1211 auth_stat = rpc_autherr_badcred;
1212 auth_res = progp->pg_authenticate(rqstp);
1213 }
1214 switch (auth_res) {
1215 case SVC_OK:
1216 break;
1217 case SVC_GARBAGE:
1218 goto err_garbage;
1219 case SVC_SYSERR:
1220 rpc_stat = rpc_system_err;
1221 goto err_bad;
1222 case SVC_DENIED:
1223 goto err_bad_auth;
1224 case SVC_CLOSE:
1225 goto close;
1226 case SVC_DROP:
1227 goto dropit;
1228 case SVC_COMPLETE:
1229 goto sendit;
1230 }
1231
1232 if (progp == NULL)
1233 goto err_bad_prog;
1234
1235 if (vers >= progp->pg_nvers ||
1236 !(versp = progp->pg_vers[vers]))
1237 goto err_bad_vers;
1238
1239 /*
1240 * Some protocol versions (namely NFSv4) require some form of
1241 * congestion control. (See RFC 7530 section 3.1 paragraph 2)
1242 * In other words, UDP is not allowed. We mark those when setting
1243 * up the svc_xprt, and verify that here.
1244 *
1245 * The spec is not very clear about what error should be returned
1246 * when someone tries to access a server that is listening on UDP
1247 * for lower versions. RPC_PROG_MISMATCH seems to be the closest
1248 * fit.
1249 */
1250 if (versp->vs_need_cong_ctrl && rqstp->rq_xprt &&
1251 !test_bit(XPT_CONG_CTRL, &rqstp->rq_xprt->xpt_flags))
1252 goto err_bad_vers;
1253
1254 procp = versp->vs_proc + proc;
1255 if (proc >= versp->vs_nproc || !procp->pc_func)
1256 goto err_bad_proc;
1257 rqstp->rq_procinfo = procp;
1258
1259 /* Syntactic check complete */
1260 serv->sv_stats->rpccnt++;
1261
1262 /* Build the reply header. */
1263 statp = resv->iov_base +resv->iov_len;
1264 svc_putnl(resv, RPC_SUCCESS);
1265
1266 /* Bump per-procedure stats counter */
1267 versp->vs_count[proc]++;
1268
1269 /* Initialize storage for argp and resp */
1270 memset(rqstp->rq_argp, 0, procp->pc_argsize);
1271 memset(rqstp->rq_resp, 0, procp->pc_ressize);
1272
1273 /* un-reserve some of the out-queue now that we have a
1274 * better idea of reply size
1275 */
1276 if (procp->pc_xdrressize)
1277 svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);
1278
1279 /* Call the function that processes the request. */
1280 if (!versp->vs_dispatch) {
1281 /*
1282 * Decode arguments
1283 * XXX: why do we ignore the return value?
1284 */
1285 if (procp->pc_decode &&
1286 !procp->pc_decode(rqstp, argv->iov_base))
1287 goto err_garbage;
1288
1289 *statp = procp->pc_func(rqstp);
1290
1291 /* Encode reply */
1292 if (*statp == rpc_drop_reply ||
1293 test_bit(RQ_DROPME, &rqstp->rq_flags)) {
1294 if (procp->pc_release)
1295 procp->pc_release(rqstp);
1296 goto dropit;
1297 }
1298 if (*statp == rpc_autherr_badcred) {
1299 if (procp->pc_release)
1300 procp->pc_release(rqstp);
1301 goto err_bad_auth;
1302 }
1303 if (*statp == rpc_success && procp->pc_encode &&
1304 !procp->pc_encode(rqstp, resv->iov_base + resv->iov_len)) {
1305 dprintk("svc: failed to encode reply\n");
1306 /* serv->sv_stats->rpcsystemerr++; */
1307 *statp = rpc_system_err;
1308 }
1309 } else {
1310 dprintk("svc: calling dispatcher\n");
1311 if (!versp->vs_dispatch(rqstp, statp)) {
1312 /* Release reply info */
1313 if (procp->pc_release)
1314 procp->pc_release(rqstp);
1315 goto dropit;
1316 }
1317 }
1318
1319 /* Check RPC status result */
1320 if (*statp != rpc_success)
1321 resv->iov_len = ((void*)statp) - resv->iov_base + 4;
1322
1323 /* Release reply info */
1324 if (procp->pc_release)
1325 procp->pc_release(rqstp);
1326
1327 if (procp->pc_encode == NULL)
1328 goto dropit;
1329
1330 sendit:
1331 if (svc_authorise(rqstp))
1332 goto close;
1333 return 1; /* Caller can now send it */
1334
1335 dropit:
1336 svc_authorise(rqstp); /* doesn't hurt to call this twice */
1337 dprintk("svc: svc_process dropit\n");
1338 return 0;
1339
1340 close:
1341 if (rqstp->rq_xprt && test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
1342 svc_close_xprt(rqstp->rq_xprt);
1343 dprintk("svc: svc_process close\n");
1344 return 0;
1345
1346 err_short_len:
1347 svc_printk(rqstp, "short len %zd, dropping request\n",
1348 argv->iov_len);
1349 goto close;
1350
1351 err_bad_rpc:
1352 serv->sv_stats->rpcbadfmt++;
1353 svc_putnl(resv, 1); /* REJECT */
1354 svc_putnl(resv, 0); /* RPC_MISMATCH */
1355 svc_putnl(resv, 2); /* Only RPCv2 supported */
1356 svc_putnl(resv, 2);
1357 goto sendit;
1358
1359 err_bad_auth:
1360 dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
1361 serv->sv_stats->rpcbadauth++;
1362 /* Restore write pointer to location of accept status: */
1363 xdr_ressize_check(rqstp, reply_statp);
1364 svc_putnl(resv, 1); /* REJECT */
1365 svc_putnl(resv, 1); /* AUTH_ERROR */
1366 svc_putnl(resv, ntohl(auth_stat)); /* status */
1367 goto sendit;
1368
1369 err_bad_prog:
1370 dprintk("svc: unknown program %d\n", prog);
1371 serv->sv_stats->rpcbadfmt++;
1372 svc_putnl(resv, RPC_PROG_UNAVAIL);
1373 goto sendit;
1374
1375 err_bad_vers:
1376 svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
1377 vers, prog, progp->pg_name);
1378
1379 serv->sv_stats->rpcbadfmt++;
1380 svc_putnl(resv, RPC_PROG_MISMATCH);
1381 svc_putnl(resv, progp->pg_lovers);
1382 svc_putnl(resv, progp->pg_hivers);
1383 goto sendit;
1384
1385 err_bad_proc:
1386 svc_printk(rqstp, "unknown procedure (%d)\n", proc);
1387
1388 serv->sv_stats->rpcbadfmt++;
1389 svc_putnl(resv, RPC_PROC_UNAVAIL);
1390 goto sendit;
1391
1392 err_garbage:
1393 svc_printk(rqstp, "failed to decode args\n");
1394
1395 rpc_stat = rpc_garbage_args;
1396 err_bad:
1397 serv->sv_stats->rpcbadfmt++;
1398 svc_putnl(resv, ntohl(rpc_stat));
1399 goto sendit;
1400 }
1401
1402 /*
1403 * Process the RPC request.
1404 */
1405 int
1406 svc_process(struct svc_rqst *rqstp)
1407 {
1408 struct kvec *argv = &rqstp->rq_arg.head[0];
1409 struct kvec *resv = &rqstp->rq_res.head[0];
1410 struct svc_serv *serv = rqstp->rq_server;
1411 u32 dir;
1412
1413 /*
1414 * Set up the response xdr_buf.
1415 * Initially it has just one page
1416 */
1417 rqstp->rq_next_page = &rqstp->rq_respages[1];
1418 resv->iov_base = page_address(rqstp->rq_respages[0]);
1419 resv->iov_len = 0;
1420 rqstp->rq_res.pages = rqstp->rq_respages + 1;
1421 rqstp->rq_res.len = 0;
1422 rqstp->rq_res.page_base = 0;
1423 rqstp->rq_res.page_len = 0;
1424 rqstp->rq_res.buflen = PAGE_SIZE;
1425 rqstp->rq_res.tail[0].iov_base = NULL;
1426 rqstp->rq_res.tail[0].iov_len = 0;
1427
1428 dir = svc_getnl(argv);
1429 if (dir != 0) {
1430 /* direction != CALL */
1431 svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
1432 serv->sv_stats->rpcbadfmt++;
1433 goto out_drop;
1434 }
1435
1436 /* Returns 1 for send, 0 for drop */
1437 if (likely(svc_process_common(rqstp, argv, resv))) {
1438 int ret = svc_send(rqstp);
1439
1440 trace_svc_process(rqstp, ret);
1441 return ret;
1442 }
1443 out_drop:
1444 trace_svc_process(rqstp, 0);
1445 svc_drop(rqstp);
1446 return 0;
1447 }
1448 EXPORT_SYMBOL_GPL(svc_process);
1449
1450 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1451 /*
1452 * Process a backchannel RPC request that arrived over an existing
1453 * outbound connection
1454 */
1455 int
1456 bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
1457 struct svc_rqst *rqstp)
1458 {
1459 struct kvec *argv = &rqstp->rq_arg.head[0];
1460 struct kvec *resv = &rqstp->rq_res.head[0];
1461 struct rpc_task *task;
1462 int proc_error;
1463 int error;
1464
1465 dprintk("svc: %s(%p)\n", __func__, req);
1466
1467 /* Build the svc_rqst used by the common processing routine */
1468 rqstp->rq_xid = req->rq_xid;
1469 rqstp->rq_prot = req->rq_xprt->prot;
1470 rqstp->rq_server = serv;
1471 rqstp->rq_bc_net = req->rq_xprt->xprt_net;
1472
1473 rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
1474 memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
1475 memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
1476 memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));
1477
1478 /* Adjust the argument buffer length */
1479 rqstp->rq_arg.len = req->rq_private_buf.len;
1480 if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
1481 rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
1482 rqstp->rq_arg.page_len = 0;
1483 } else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
1484 rqstp->rq_arg.page_len)
1485 rqstp->rq_arg.page_len = rqstp->rq_arg.len -
1486 rqstp->rq_arg.head[0].iov_len;
1487 else
1488 rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
1489 rqstp->rq_arg.page_len;
1490
1491 /* reset result send buffer "put" position */
1492 resv->iov_len = 0;
1493
1494 /*
1495 * Skip the next two words because they've already been
1496 * processed in the transport
1497 */
1498 svc_getu32(argv); /* XID */
1499 svc_getnl(argv); /* CALLDIR */
1500
1501 /* Parse and execute the bc call */
1502 proc_error = svc_process_common(rqstp, argv, resv);
1503
1504 atomic_inc(&req->rq_xprt->bc_free_slots);
1505 if (!proc_error) {
1506 /* Processing error: drop the request */
1507 xprt_free_bc_request(req);
1508 return 0;
1509 }
1510
1511 /* Finally, send the reply synchronously */
1512 memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
1513 task = rpc_run_bc_task(req);
1514 if (IS_ERR(task)) {
1515 error = PTR_ERR(task);
1516 goto out;
1517 }
1518
1519 WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
1520 error = task->tk_status;
1521 rpc_put_task(task);
1522
1523 out:
1524 dprintk("svc: %s(), error=%d\n", __func__, error);
1525 return error;
1526 }
1527 EXPORT_SYMBOL_GPL(bc_svc_process);
1528 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1529
1530 /*
1531 * Return (transport-specific) limit on the rpc payload.
1532 */
1533 u32 svc_max_payload(const struct svc_rqst *rqstp)
1534 {
1535 u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;
1536
1537 if (rqstp->rq_server->sv_max_payload < max)
1538 max = rqstp->rq_server->sv_max_payload;
1539 return max;
1540 }
1541 EXPORT_SYMBOL_GPL(svc_max_payload);