#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OPREPLY_FRONT_LEN	512

static struct kmem_cache	*ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void link_linger(struct ceph_osd *osd,
			struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
			  struct ceph_osd_linger_request *lreq);

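/*
 * Debug-only locking assertions.  rwsem_is_wrlocked() needs no lockdep
 * support: if down_read_trylock() fails on a semaphore that is known to
 * be held, the holder must be a writer; if it succeeds, the semaphore
 * wasn't write-locked and the read lock is dropped again immediately.
 */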
#if 1
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
{
	bool wrlocked = true;

	if (unlikely(down_read_trylock(sem))) {
		wrlocked = false;
		up_read(sem);
	}

	return wrlocked;
}
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_locked(&osdc->lock));
}
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_osd_locked(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	WARN_ON(!(mutex_is_locked(&osd->lock) &&
		  rwsem_is_locked(&osdc->lock)) &&
		!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
{
	WARN_ON(!mutex_is_locked(&lreq->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
#endif

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
			u64 *objnum, u64 *objoff, u64 *objlen)
{
	u64 orig_len = *plen;
	int r;

	/* object extent? */
	r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
					  objoff, objlen);
	if (r < 0)
		return r;
	if (*objlen < orig_len) {
		*plen = *objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);

	return 0;
}

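/*
 * Worked example (a sketch, assuming the default layout with 4 MB
 * objects and no striping): a 1 MB write at file offset 3.5 MB crosses
 * an object boundary, so calc_layout() maps it to objnum 0, objoff
 * 3.5 MB, objlen 512 KB and trims *plen to 512 KB; the caller issues
 * the remaining 512 KB as a separate request against the next object.
 */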
static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
	memset(osd_data, 0, sizeof (*osd_data));
	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
			struct page **pages, u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
	osd_data->pages = pages;
	osd_data->length = length;
	osd_data->alignment = alignment;
	osd_data->pages_from_pool = pages_from_pool;
	osd_data->own_pages = own_pages;
}

static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
			struct ceph_pagelist *pagelist)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
	osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
			struct bio *bio, size_t bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio = bio;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

#define osd_req_op_data(oreq, whch, typ, fld)				\
({									\
	struct ceph_osd_request *__oreq = (oreq);			\
	unsigned int __whch = (whch);					\
	BUG_ON(__whch >= __oreq->r_num_ops);				\
	&__oreq->r_ops[__whch].typ.fld;					\
})

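/*
 * osd_req_op_data() is a statement expression rather than a function so
 * that a single definition can address any per-op field: e.g.
 * osd_req_op_data(req, 0, extent, osd_data) evaluates its arguments
 * once, bounds-checks the op index against r_num_ops and yields
 * &req->r_ops[0].extent.osd_data.
 */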
static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
	BUG_ON(which >= osd_req->r_num_ops);

	return &osd_req->r_ops[which].raw_data_in;
}

struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_raw_data_in(osd_req, which);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
			unsigned int which, struct bio *bio, size_t bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

static void osd_req_op_cls_request_info_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}

void osd_req_op_cls_request_data_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
	osd_req->r_ops[which].cls.indata_len += pagelist->length;
	osd_req->r_ops[which].indata_len += pagelist->length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
	osd_req->r_ops[which].cls.indata_len += length;
	osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);

static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
	switch (osd_data->type) {
	case CEPH_OSD_DATA_TYPE_NONE:
		return 0;
	case CEPH_OSD_DATA_TYPE_PAGES:
		return osd_data->length;
	case CEPH_OSD_DATA_TYPE_PAGELIST:
		return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
	case CEPH_OSD_DATA_TYPE_BIO:
		return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
	default:
		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
		return 0;
	}
}

static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
		int num_pages;

		num_pages = calc_pages_for((u64)osd_data->alignment,
						(u64)osd_data->length);
		ceph_release_page_vector(osd_data->pages, num_pages);
	}
	ceph_osd_data_init(osd_data);
}

static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
				unsigned int which)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];

	switch (op->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		ceph_osd_data_release(&op->extent.osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		ceph_osd_data_release(&op->cls.request_info);
		ceph_osd_data_release(&op->cls.request_data);
		ceph_osd_data_release(&op->cls.response_data);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		ceph_osd_data_release(&op->xattr.osd_data);
		break;
	case CEPH_OSD_OP_STAT:
		ceph_osd_data_release(&op->raw_data_in);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		ceph_osd_data_release(&op->notify_ack.request_data);
		break;
	case CEPH_OSD_OP_NOTIFY:
		ceph_osd_data_release(&op->notify.request_data);
		ceph_osd_data_release(&op->notify.response_data);
		break;
	default:
		break;
	}
}

/*
 * Assumes @t is zero-initialized.
 */
static void target_init(struct ceph_osd_request_target *t)
{
	ceph_oid_init(&t->base_oid);
	ceph_oloc_init(&t->base_oloc);
	ceph_oid_init(&t->target_oid);
	ceph_oloc_init(&t->target_oloc);

	ceph_osds_init(&t->acting);
	ceph_osds_init(&t->up);
	t->size = -1;
	t->min_size = -1;

	t->osd = CEPH_HOMELESS_OSD;
}

static void target_copy(struct ceph_osd_request_target *dest,
			const struct ceph_osd_request_target *src)
{
	ceph_oid_copy(&dest->base_oid, &src->base_oid);
	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
	ceph_oid_copy(&dest->target_oid, &src->target_oid);
	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);

	dest->pgid = src->pgid; /* struct */
	dest->pg_num = src->pg_num;
	dest->pg_num_mask = src->pg_num_mask;
	ceph_osds_copy(&dest->acting, &src->acting);
	ceph_osds_copy(&dest->up, &src->up);
	dest->size = src->size;
	dest->min_size = src->min_size;
	dest->sort_bitwise = src->sort_bitwise;

	dest->flags = src->flags;
	dest->paused = src->paused;

	dest->osd = src->osd;
}

static void target_destroy(struct ceph_osd_request_target *t)
{
	ceph_oid_destroy(&t->base_oid);
	ceph_oid_destroy(&t->target_oid);
}

/*
 * requests
 */
static void request_release_checks(struct ceph_osd_request *req)
{
	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
	WARN_ON(!list_empty(&req->r_unsafe_item));
	WARN_ON(req->r_osd);
}

static void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
					    struct ceph_osd_request, r_kref);
	unsigned int which;

	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
	     req->r_request, req->r_reply);
	request_release_checks(req);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);

	for (which = 0; which < req->r_num_ops; which++)
		osd_req_op_data_release(req, which);

	target_destroy(&req->r_t);
	ceph_put_snap_context(req->r_snapc);

	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
		kmem_cache_free(ceph_osd_request_cache, req);
	else
		kfree(req);
}

void ceph_osdc_get_request(struct ceph_osd_request *req)
{
	dout("%s %p (was %d)\n", __func__, req,
	     atomic_read(&req->r_kref.refcount));
	kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

void ceph_osdc_put_request(struct ceph_osd_request *req)
{
	if (req) {
		dout("%s %p (was %d)\n", __func__, req,
		     atomic_read(&req->r_kref.refcount));
		kref_put(&req->r_kref, ceph_osdc_release_request);
	}
}
EXPORT_SYMBOL(ceph_osdc_put_request);

static void request_init(struct ceph_osd_request *req)
{
	/* req only, each op is zeroed in _osd_req_op_init() */
	memset(req, 0, sizeof(*req));

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	target_init(&req->r_t);
}

/*
 * This is ugly, but it allows us to reuse linger registration and ping
 * requests, keeping the structure of the code around send_linger{_ping}()
 * reasonable.  Setting up a min_nr=2 mempool for each linger request
 * and dealing with copying ops (this blasts req only, watch op remains
 * intact) isn't any better.
 */
static void request_reinit(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	bool mempool = req->r_mempool;
	unsigned int num_ops = req->r_num_ops;
	u64 snapid = req->r_snapid;
	struct ceph_snap_context *snapc = req->r_snapc;
	bool linger = req->r_linger;
	struct ceph_msg *request_msg = req->r_request;
	struct ceph_msg *reply_msg = req->r_reply;

	dout("%s req %p\n", __func__, req);
	WARN_ON(atomic_read(&req->r_kref.refcount) != 1);
	request_release_checks(req);

	WARN_ON(atomic_read(&request_msg->kref.refcount) != 1);
	WARN_ON(atomic_read(&reply_msg->kref.refcount) != 1);
	target_destroy(&req->r_t);

	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = snapid;
	req->r_snapc = snapc;
	req->r_linger = linger;
	req->r_request = request_msg;
	req->r_reply = reply_msg;
}

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       struct ceph_snap_context *snapc,
					       unsigned int num_ops,
					       bool use_mempool,
					       gfp_t gfp_flags)
{
	struct ceph_osd_request *req;

	if (use_mempool) {
		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
	} else {
		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
		req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
			      gfp_flags);
	}
	if (unlikely(!req))
		return NULL;

	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = CEPH_NOSNAP;
	req->r_snapc = ceph_get_snap_context(snapc);

	dout("%s req %p\n", __func__, req);
	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_msg *msg;
	int msg_size;

	WARN_ON(ceph_oid_empty(&req->r_base_oid));

	/* create request message */
	msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
	msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
	msg_size += 1 + 8 + 4 + 4; /* pgid */
	msg_size += 4 + req->r_base_oid.name_len; /* oid */
	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8; /* snapid */
	msg_size += 8; /* snap_seq */
	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
	msg_size += 4; /* retry_attempt */

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	memset(msg->front.iov_base, 0, msg->front.iov_len);
	req->r_request = msg;

	/* create reply message */
	msg_size = OSD_OPREPLY_FRONT_LEN;
	msg_size += req->r_base_oid.name_len;
	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	req->r_reply = msg;

	return 0;
}
EXPORT_SYMBOL(ceph_osdc_alloc_messages);

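/*
 * r_base_oid, r_base_oloc and the ops must be filled in before the
 * messages are allocated -- the front sizes above are derived from
 * them.  A minimal sketch of the sequence (the object name and pool_id
 * below are hypothetical):
 *
 *	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
 *	ceph_oid_printf(&req->r_base_oid, "%s", "some_object");
 *	req->r_base_oloc.pool = pool_id;
 *	osd_req_op_init(req, 0, CEPH_OSD_OP_STAT, 0);
 *	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
 */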
static bool osd_req_opcode_valid(u16 opcode)
{
	switch (opcode) {
#define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
	default:
		return false;
	}
}

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 */
static struct ceph_osd_req_op *
_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
		 u16 opcode, u32 flags)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	BUG_ON(!osd_req_opcode_valid(opcode));

	op = &osd_req->r_ops[which];
	memset(op, 0, sizeof (*op));
	op->op = opcode;
	op->flags = flags;

	return op;
}

void osd_req_op_init(struct ceph_osd_request *osd_req,
		     unsigned int which, u16 opcode, u32 flags)
{
	(void)_osd_req_op_init(osd_req, which, opcode, flags);
}
EXPORT_SYMBOL(osd_req_op_init);

void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
			    unsigned int which, u16 opcode,
			    u64 offset, u64 length,
			    u64 truncate_size, u32 truncate_seq)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	size_t payload_len = 0;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
	       opcode != CEPH_OSD_OP_TRUNCATE);

	op->extent.offset = offset;
	op->extent.length = length;
	op->extent.truncate_size = truncate_size;
	op->extent.truncate_seq = truncate_seq;
	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
		payload_len += length;

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);

void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
				unsigned int which, u64 length)
{
	struct ceph_osd_req_op *op;
	u64 previous;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];
	previous = op->extent.length;

	if (length == previous)
		return;		/* Nothing to do */
	BUG_ON(length > previous);

	op->extent.length = length;
	op->indata_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update);

void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
				unsigned int which, u64 offset_inc)
{
	struct ceph_osd_req_op *op, *prev_op;

	BUG_ON(which + 1 >= osd_req->r_num_ops);

	prev_op = &osd_req->r_ops[which];
	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
	/* dup previous one */
	op->indata_len = prev_op->indata_len;
	op->outdata_len = prev_op->outdata_len;
	op->extent = prev_op->extent;
	/* adjust offset */
	op->extent.offset += offset_inc;
	op->extent.length -= offset_inc;

	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= offset_inc;
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);

void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
			u16 opcode, const char *class, const char *method)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len = 0;
	size_t size;

	BUG_ON(opcode != CEPH_OSD_OP_CALL);

	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
	BUG_ON(!pagelist);
	ceph_pagelist_init(pagelist);

	op->cls.class_name = class;
	size = strlen(class);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.class_len = size;
	ceph_pagelist_append(pagelist, class, size);
	payload_len += size;

	op->cls.method_name = method;
	size = strlen(method);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.method_len = size;
	ceph_pagelist_append(pagelist, method, size);
	payload_len += size;

	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);

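/*
 * Typical CEPH_OSD_OP_CALL setup, sketched ("lock"/"assert_locked" are
 * placeholders for a real object class and method, in_pl/pages for
 * caller-owned buffers):
 *
 *	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, "lock", "assert_locked");
 *	osd_req_op_cls_request_data_pagelist(req, 0, in_pl);
 *	osd_req_op_cls_response_data_pages(req, 0, pages, PAGE_SIZE, 0,
 *					   false, false);
 */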
int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
			  u16 opcode, const char *name, const void *value,
			  size_t size, u8 cmp_op, u8 cmp_mode)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len;

	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
	if (!pagelist)
		return -ENOMEM;

	ceph_pagelist_init(pagelist);

	payload_len = strlen(name);
	op->xattr.name_len = payload_len;
	ceph_pagelist_append(pagelist, name, payload_len);

	op->xattr.value_len = size;
	ceph_pagelist_append(pagelist, value, size);
	payload_len += size;

	op->xattr.cmp_op = cmp_op;
	op->xattr.cmp_mode = cmp_mode;

	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
	op->indata_len = payload_len;
	return 0;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);

/*
 * @watch_opcode: CEPH_OSD_WATCH_OP_*
 */
static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
				  u64 cookie, u8 watch_opcode)
{
	struct ceph_osd_req_op *op;

	op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
	op->watch.cookie = cookie;
	op->watch.op = watch_opcode;
	op->watch.gen = 0;
}

void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
				unsigned int which,
				u64 expected_object_size,
				u64 expected_write_size)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      CEPH_OSD_OP_SETALLOCHINT,
						      0);

	op->alloc_hint.expected_object_size = expected_object_size;
	op->alloc_hint.expected_write_size = expected_write_size;

	/*
	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
	 * not worth a feature bit.  Set FAILOK per-op flag to make
	 * sure older osds don't trip over an unsupported opcode.
	 */
	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
}
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);

static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
				struct ceph_osd_data *osd_data)
{
	u64 length = ceph_osd_data_length(osd_data);

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		BUG_ON(length > (u64) SIZE_MAX);
		if (length)
			ceph_msg_data_add_pages(msg, osd_data->pages,
					length, osd_data->alignment);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		BUG_ON(!length);
		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
		ceph_msg_data_add_bio(msg, osd_data->bio, length);
#endif
	} else {
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
	}
}

static u32 osd_req_encode_op(struct ceph_osd_op *dst,
			     const struct ceph_osd_req_op *src)
{
	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
		pr_err("unrecognized osd opcode %d\n", src->op);

		return 0;
	}

	switch (src->op) {
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
	case CEPH_OSD_OP_ZERO:
	case CEPH_OSD_OP_TRUNCATE:
		dst->extent.offset = cpu_to_le64(src->extent.offset);
		dst->extent.length = cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;
	case CEPH_OSD_OP_CALL:
		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(0);
		dst->watch.op = src->watch.op;
		dst->watch.gen = cpu_to_le32(src->watch.gen);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		break;
	case CEPH_OSD_OP_NOTIFY:
		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		dst->alloc_hint.expected_object_size =
		    cpu_to_le64(src->alloc_hint.expected_object_size);
		dst->alloc_hint.expected_write_size =
		    cpu_to_le64(src->alloc_hint.expected_write_size);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		break;
	case CEPH_OSD_OP_CREATE:
	case CEPH_OSD_OP_DELETE:
		break;
	default:
		pr_err("unsupported osd opcode %s\n",
			ceph_osd_op_name(src->op));
		WARN_ON(1);

		return 0;
	}

	dst->op = cpu_to_le16(src->op);
	dst->flags = cpu_to_le32(src->flags);
	dst->payload_len = cpu_to_le32(src->indata_len);

	return src->indata_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       unsigned int which, int num_ops,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       u32 truncate_seq,
					       u64 truncate_size,
					       bool use_mempool)
{
	struct ceph_osd_request *req;
	u64 objnum = 0;
	u64 objoff = 0;
	u64 objlen = 0;
	int r;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
					GFP_NOFS);
	if (!req) {
		r = -ENOMEM;
		goto fail;
	}

	/* calculate max write size */
	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
	if (r)
		goto fail;

	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
		osd_req_op_init(req, which, opcode, 0);
	} else {
		u32 object_size = le32_to_cpu(layout->fl_object_size);
		u32 object_base = off - objoff;
		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
			if (truncate_size <= object_base) {
				truncate_size = 0;
			} else {
				truncate_size -= object_base;
				if (truncate_size > object_size)
					truncate_size = object_size;
			}
		}
		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
				       truncate_size, truncate_seq);
	}

	req->r_flags = flags;
	req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);

	req->r_snapid = vino.snap;
	if (flags & CEPH_OSD_FLAG_WRITE)
		req->r_data_offset = off;

	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
	if (r)
		goto fail;

	return req;

fail:
	ceph_osdc_put_request(req);
	return ERR_PTR(r);
}
EXPORT_SYMBOL(ceph_osdc_new_request);
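
/*
 * Example caller flow, sketched after the filesystem writeback path
 * (values and variable names are illustrative only):
 *
 *	u64 len = wsize;
 *	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
 *				    0, 1, CEPH_OSD_OP_WRITE,
 *				    CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
 *				    snapc, truncate_seq, truncate_size, true);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
 */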

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)

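/*
 * DEFINE_RB_FUNCS (linux/ceph/libceph.h) expands to typed rbtree
 * helpers keyed by r_tid: insert_request(), erase_request() and
 * lookup_request(), used below to manage the per-OSD request trees.
 */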
static bool osd_homeless(struct ceph_osd *osd)
{
	return osd->o_osd == CEPH_HOMELESS_OSD;
}

static bool osd_registered(struct ceph_osd *osd)
{
	verify_osdc_locked(osd->o_osdc);

	return !RB_EMPTY_NODE(&osd->o_node);
}

/*
 * Assumes @osd is zero-initialized.
 */
static void osd_init(struct ceph_osd *osd)
{
	atomic_set(&osd->o_ref, 1);
	RB_CLEAR_NODE(&osd->o_node);
	osd->o_requests = RB_ROOT;
	osd->o_linger_requests = RB_ROOT;
	INIT_LIST_HEAD(&osd->o_osd_lru);
	INIT_LIST_HEAD(&osd->o_keepalive_item);
	osd->o_incarnation = 1;
	mutex_init(&osd->lock);
}

static void osd_cleanup(struct ceph_osd *osd)
{
	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
	WARN_ON(!list_empty(&osd->o_osd_lru));
	WARN_ON(!list_empty(&osd->o_keepalive_item));

	if (osd->o_auth.authorizer) {
		WARN_ON(osd_homeless(osd));
		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
	}
}

SW
1014/*
1015 * Track open sessions with osds.
1016 */
e10006f8 1017static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
f24e9980
SW
1018{
1019 struct ceph_osd *osd;
1020
0247a0cf
ID
1021 WARN_ON(onum == CEPH_HOMELESS_OSD);
1022
7a28f59b 1023 osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
0247a0cf 1024 osd_init(osd);
f24e9980 1025 osd->o_osdc = osdc;
e10006f8 1026 osd->o_osd = onum;
f24e9980 1027
b7a9e5dd 1028 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
4e7a5dcd 1029
f24e9980
SW
1030 return osd;
1031}
1032
static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		osd_cleanup(osd);
		kfree(osd);
	}
}

DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)

static void __move_osd_to_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));

	spin_lock(&osdc->osd_lru_lock);
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	spin_unlock(&osdc->osd_lru_lock);

	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}

static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests))
		__move_osd_to_lru(osd);
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	spin_lock(&osdc->osd_lru_lock);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
	spin_unlock(&osdc->osd_lru_lock);
}

/*
 * Close the connection and assign any leftover requests to the
 * homeless session.
 */
static void close_osd(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct rb_node *n;

	verify_osdc_wrlocked(osdc);
	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	ceph_con_close(&osd->o_con);

	for (n = rb_first(&osd->o_requests); n; ) {
		struct ceph_osd_request *req =
		    rb_entry(n, struct ceph_osd_request, r_node);

		n = rb_next(n); /* unlink_request() */

		dout(" reassigning req %p tid %llu\n", req, req->r_tid);
		unlink_request(osd, req);
		link_request(&osdc->homeless_osd, req);
	}
	for (n = rb_first(&osd->o_linger_requests); n; ) {
		struct ceph_osd_linger_request *lreq =
		    rb_entry(n, struct ceph_osd_linger_request, node);

		n = rb_next(n); /* unlink_linger() */

		dout(" reassigning lreq %p linger_id %llu\n", lreq,
		     lreq->linger_id);
		unlink_linger(osd, lreq);
		link_linger(&osdc->homeless_osd, lreq);
	}

	__remove_osd_from_lru(osd);
	erase_osd(&osdc->osds, osd);
	put_osd(osd);
}

/*
 * reset osd connect
 */
static int reopen_osd(struct ceph_osd *osd)
{
	struct ceph_entity_addr *peer_addr;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests)) {
		close_osd(osd);
		return -ENODEV;
	}

	peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
	    !ceph_con_opened(&osd->o_con)) {
		struct rb_node *n;

		dout("osd addr hasn't changed and connection never opened, "
		     "letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
			struct ceph_osd_request *req =
			    rb_entry(n, struct ceph_osd_request, r_node);
			req->r_stamp = jiffies;
		}

		return -EAGAIN;
	}

	ceph_con_close(&osd->o_con);
	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
	osd->o_incarnation++;

	return 0;
}

static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
					  bool wrlocked)
{
	struct ceph_osd *osd;

	if (wrlocked)
		verify_osdc_wrlocked(osdc);
	else
		verify_osdc_locked(osdc);

	if (o != CEPH_HOMELESS_OSD)
		osd = lookup_osd(&osdc->osds, o);
	else
		osd = &osdc->homeless_osd;
	if (!osd) {
		if (!wrlocked)
			return ERR_PTR(-EAGAIN);

		osd = create_osd(osdc, o);
		insert_osd(&osdc->osds, osd);
		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
			      &osdc->osdmap->osd_addr[osd->o_osd]);
	}

	dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
	return osd;
}

/*
 * Create request <-> OSD session relation.
 *
 * @req has to be assigned a tid, @osd may be homeless.
 */
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
	verify_osd_locked(osd);
	WARN_ON(!req->r_tid || req->r_osd);
	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
	     req, req->r_tid);

	if (!osd_homeless(osd))
		__remove_osd_from_lru(osd);
	else
		atomic_inc(&osd->o_osdc->num_homeless);

	get_osd(osd);
	insert_request(&osd->o_requests, req);
	req->r_osd = osd;
}

static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
	verify_osd_locked(osd);
	WARN_ON(req->r_osd != osd);
	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
	     req, req->r_tid);

	req->r_osd = NULL;
	erase_request(&osd->o_requests, req);
	put_osd(osd);

	if (!osd_homeless(osd))
		maybe_move_osd_to_lru(osd);
	else
		atomic_dec(&osd->o_osdc->num_homeless);
}

static bool __pool_full(struct ceph_pg_pool_info *pi)
{
	return pi->flags & CEPH_POOL_FLAG_FULL;
}

static bool have_pool_full(struct ceph_osd_client *osdc)
{
	struct rb_node *n;

	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
		struct ceph_pg_pool_info *pi =
		    rb_entry(n, struct ceph_pg_pool_info, node);

		if (__pool_full(pi))
			return true;
	}

	return false;
}

static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
{
	struct ceph_pg_pool_info *pi;

	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
	if (!pi)
		return false;

	return __pool_full(pi);
}

/*
 * Returns whether a request should be blocked from being sent
 * based on the current osdmap and osd_client settings.
 */
static bool target_should_be_paused(struct ceph_osd_client *osdc,
				    const struct ceph_osd_request_target *t,
				    struct ceph_pg_pool_info *pi)
{
	bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
	bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
		       ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
		       __pool_full(pi);

	WARN_ON(pi->id != t->base_oloc.pool);
	return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
	       (t->flags & CEPH_OSD_FLAG_WRITE && pausewr);
}

enum calc_target_result {
	CALC_TARGET_NO_ACTION = 0,
	CALC_TARGET_NEED_RESEND,
	CALC_TARGET_POOL_DNE,
};

static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
					   struct ceph_osd_request_target *t,
					   u32 *last_force_resend,
					   bool any_change)
{
	struct ceph_pg_pool_info *pi;
	struct ceph_pg pgid, last_pgid;
	struct ceph_osds up, acting;
	bool force_resend = false;
	bool need_check_tiering = false;
	bool need_resend = false;
	bool sort_bitwise = ceph_osdmap_flag(osdc->osdmap,
					     CEPH_OSDMAP_SORTBITWISE);
	enum calc_target_result ct_res;
	int ret;

	pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
	if (!pi) {
		t->osd = CEPH_HOMELESS_OSD;
		ct_res = CALC_TARGET_POOL_DNE;
		goto out;
	}

	if (osdc->osdmap->epoch == pi->last_force_request_resend) {
		if (last_force_resend &&
		    *last_force_resend < pi->last_force_request_resend) {
			*last_force_resend = pi->last_force_request_resend;
			force_resend = true;
		} else if (!last_force_resend) {
			force_resend = true;
		}
	}
	if (ceph_oid_empty(&t->target_oid) || force_resend) {
		ceph_oid_copy(&t->target_oid, &t->base_oid);
		need_check_tiering = true;
	}
	if (ceph_oloc_empty(&t->target_oloc) || force_resend) {
		ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
		need_check_tiering = true;
	}

	if (need_check_tiering &&
	    (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
		if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
			t->target_oloc.pool = pi->read_tier;
		if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
			t->target_oloc.pool = pi->write_tier;
	}

	ret = ceph_object_locator_to_pg(osdc->osdmap, &t->target_oid,
					&t->target_oloc, &pgid);
	if (ret) {
		WARN_ON(ret != -ENOENT);
		t->osd = CEPH_HOMELESS_OSD;
		ct_res = CALC_TARGET_POOL_DNE;
		goto out;
	}
	last_pgid.pool = pgid.pool;
	last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);

	ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting);
	if (any_change &&
	    ceph_is_new_interval(&t->acting,
				 &acting,
				 &t->up,
				 &up,
				 t->size,
				 pi->size,
				 t->min_size,
				 pi->min_size,
				 t->pg_num,
				 pi->pg_num,
				 t->sort_bitwise,
				 sort_bitwise,
				 &last_pgid))
		force_resend = true;

	if (t->paused && !target_should_be_paused(osdc, t, pi)) {
		t->paused = false;
		need_resend = true;
	}

	if (ceph_pg_compare(&t->pgid, &pgid) ||
	    ceph_osds_changed(&t->acting, &acting, any_change) ||
	    force_resend) {
		t->pgid = pgid; /* struct */
		ceph_osds_copy(&t->acting, &acting);
		ceph_osds_copy(&t->up, &up);
		t->size = pi->size;
		t->min_size = pi->min_size;
		t->pg_num = pi->pg_num;
		t->pg_num_mask = pi->pg_num_mask;
		t->sort_bitwise = sort_bitwise;

		t->osd = acting.primary;
		need_resend = true;
	}

	ct_res = need_resend ? CALC_TARGET_NEED_RESEND : CALC_TARGET_NO_ACTION;
out:
	dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
	return ct_res;
}

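/*
 * In rough pseudocode, the object -> PG -> OSD pipeline implemented
 * above (a sketch only; tiering and interval checks omitted):
 *
 *	pgid.pool = target_oloc.pool;
 *	pgid.seed = hash(target_oid.name) stable_mod pg_num;
 *	up/acting = crush(osdmap, pgid);
 *	t->osd    = acting.primary;
 */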
static void setup_request_data(struct ceph_osd_request *req,
			       struct ceph_msg *msg)
{
	u32 data_len = 0;
	int i;

	if (!list_empty(&msg->data))
		return;

	WARN_ON(msg->data_length);
	for (i = 0; i < req->r_num_ops; i++) {
		struct ceph_osd_req_op *op = &req->r_ops[i];

		switch (op->op) {
		/* request */
		case CEPH_OSD_OP_WRITE:
		case CEPH_OSD_OP_WRITEFULL:
			WARN_ON(op->indata_len != op->extent.length);
			ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
			break;
		case CEPH_OSD_OP_SETXATTR:
		case CEPH_OSD_OP_CMPXATTR:
			WARN_ON(op->indata_len != op->xattr.name_len +
						  op->xattr.value_len);
			ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
			break;
		case CEPH_OSD_OP_NOTIFY_ACK:
			ceph_osdc_msg_data_add(msg,
					       &op->notify_ack.request_data);
			break;

		/* reply */
		case CEPH_OSD_OP_STAT:
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->raw_data_in);
			break;
		case CEPH_OSD_OP_READ:
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->extent.osd_data);
			break;

		/* both */
		case CEPH_OSD_OP_CALL:
			WARN_ON(op->indata_len != op->cls.class_len +
						  op->cls.method_len +
						  op->cls.indata_len);
			ceph_osdc_msg_data_add(msg, &op->cls.request_info);
			/* optional, can be NONE */
			ceph_osdc_msg_data_add(msg, &op->cls.request_data);
			/* optional, can be NONE */
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->cls.response_data);
			break;
		case CEPH_OSD_OP_NOTIFY:
			ceph_osdc_msg_data_add(msg,
					       &op->notify.request_data);
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->notify.response_data);
			break;
		}

		data_len += op->indata_len;
	}

	WARN_ON(data_len != msg->data_length);
}

static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	u32 data_len = 0;
	int i;

	if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
		/* snapshots aren't writeable */
		WARN_ON(req->r_snapid != CEPH_NOSNAP);
	} else {
		WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
			req->r_data_offset || req->r_snapc);
	}

	setup_request_data(req, msg);

	ceph_encode_32(&p, 1); /* client_inc, always 1 */
	ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
	ceph_encode_32(&p, req->r_flags);
	ceph_encode_timespec(p, &req->r_mtime);
	p += sizeof(struct ceph_timespec);
	/* aka reassert_version */
	memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
	p += sizeof(req->r_replay_version);

	/* oloc */
	ceph_encode_8(&p, 4);
	ceph_encode_8(&p, 4);
	ceph_encode_32(&p, 8 + 4 + 4);
	ceph_encode_64(&p, req->r_t.target_oloc.pool);
	ceph_encode_32(&p, -1); /* preferred */
	ceph_encode_32(&p, 0); /* key len */

	/* pgid */
	ceph_encode_8(&p, 1);
	ceph_encode_64(&p, req->r_t.pgid.pool);
	ceph_encode_32(&p, req->r_t.pgid.seed);
	ceph_encode_32(&p, -1); /* preferred */

	/* oid */
	ceph_encode_32(&p, req->r_t.target_oid.name_len);
	memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len);
	p += req->r_t.target_oid.name_len;

	/* ops, can imply data */
	ceph_encode_16(&p, req->r_num_ops);
	for (i = 0; i < req->r_num_ops; i++) {
		data_len += osd_req_encode_op(p, &req->r_ops[i]);
		p += sizeof(struct ceph_osd_op);
	}

	ceph_encode_64(&p, req->r_snapid); /* snapid */
	if (req->r_snapc) {
		ceph_encode_64(&p, req->r_snapc->seq);
		ceph_encode_32(&p, req->r_snapc->num_snaps);
		for (i = 0; i < req->r_snapc->num_snaps; i++)
			ceph_encode_64(&p, req->r_snapc->snaps[i]);
	} else {
		ceph_encode_64(&p, 0); /* snap_seq */
		ceph_encode_32(&p, 0); /* snaps len */
	}

	ceph_encode_32(&p, req->r_attempts); /* retry_attempt */

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	msg->hdr.data_len = cpu_to_le32(data_len);
	/*
	 * The header "data_off" is a hint to the receiver allowing it
	 * to align received data into its buffers such that there's no
	 * need to re-copy it before writing it to disk (direct I/O).
	 */
	msg->hdr.data_off = cpu_to_le16(req->r_data_offset);

	dout("%s req %p oid %*pE oid_len %d front %zu data %u\n", __func__,
	     req, req->r_t.target_oid.name_len, req->r_t.target_oid.name,
	     req->r_t.target_oid.name_len, msg->front.iov_len, data_len);
}

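/*
 * Note that the encoding above must stay in lockstep with the msg_size
 * arithmetic in ceph_osdc_alloc_messages(): every field encoded here is
 * pre-accounted there, so if BUG_ON(p > end) fires the two have
 * diverged.
 */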
/*
 * @req has to be assigned a tid and registered.
 */
static void send_request(struct ceph_osd_request *req)
{
	struct ceph_osd *osd = req->r_osd;

	verify_osd_locked(osd);
	WARN_ON(osd->o_osd != req->r_t.osd);

	/*
	 * We may have a previously queued request message hanging
	 * around.  Cancel it to avoid corrupting the msgr.
	 */
	if (req->r_sent)
		ceph_msg_revoke(req->r_request);

	req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
	if (req->r_attempts)
		req->r_flags |= CEPH_OSD_FLAG_RETRY;
	else
		WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);

	encode_request(req, req->r_request);

	dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n",
	     __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
	     req->r_t.osd, req->r_flags, req->r_attempts);

	req->r_t.paused = false;
	req->r_stamp = jiffies;
	req->r_attempts++;

	req->r_sent = osd->o_incarnation;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
}

static void maybe_request_map(struct ceph_osd_client *osdc)
{
	bool continuous = false;

	verify_osdc_locked(osdc);
	WARN_ON(!osdc->osdmap->epoch);

	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
	    ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
	    ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
		dout("%s osdc %p continuous\n", __func__, osdc);
		continuous = true;
	} else {
		dout("%s osdc %p onetime\n", __func__, osdc);
	}

	if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
			       osdc->osdmap->epoch + 1, continuous))
		ceph_monc_renew_subs(&osdc->client->monc);
}

static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_osd *osd;
	bool need_send = false;
	bool promoted = false;

	WARN_ON(req->r_tid || req->r_got_reply);
	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);

again:
	calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
	osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
	if (IS_ERR(osd)) {
		WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
		goto promote;
	}

	if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
	    ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
		dout("req %p pausewr\n", req);
		req->r_t.paused = true;
		maybe_request_map(osdc);
	} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
		   ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
		dout("req %p pauserd\n", req);
		req->r_t.paused = true;
		maybe_request_map(osdc);
	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
		   !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
				     CEPH_OSD_FLAG_FULL_FORCE)) &&
		   (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
		    pool_full(osdc, req->r_t.base_oloc.pool))) {
		dout("req %p full/pool_full\n", req);
		pr_warn_ratelimited("FULL or reached pool quota\n");
		req->r_t.paused = true;
		maybe_request_map(osdc);
	} else if (!osd_homeless(osd)) {
		need_send = true;
	} else {
		maybe_request_map(osdc);
	}

	mutex_lock(&osd->lock);
	/*
	 * Assign the tid atomically with send_request() to protect
	 * multiple writes to the same object from racing with each
	 * other, resulting in out of order ops on the OSDs.
	 */
	req->r_tid = atomic64_inc_return(&osdc->last_tid);
	link_request(osd, req);
	if (need_send)
		send_request(req);
	mutex_unlock(&osd->lock);

	if (promoted)
		downgrade_write(&osdc->lock);
	return;

promote:
	up_read(&osdc->lock);
	down_write(&osdc->lock);
	wrlocked = true;
	promoted = true;
	goto again;
}

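/*
 * The promote: path exists because lookup_create_osd() may need to
 * create a new session, which requires osdc->lock held for write.  An
 * rwsem cannot be upgraded in place, so the read lock is dropped, the
 * write lock is taken, and the target is recomputed under the new lock
 * since the osdmap may have changed in the window.
 */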
static void account_request(struct ceph_osd_request *req)
{
	unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;

	if (req->r_flags & CEPH_OSD_FLAG_READ) {
		WARN_ON(req->r_flags & mask);
		req->r_flags |= CEPH_OSD_FLAG_ACK;
	} else if (req->r_flags & CEPH_OSD_FLAG_WRITE)
		WARN_ON(!(req->r_flags & mask));
	else
		WARN_ON(1);

	WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask);
	atomic_inc(&req->r_osdc->num_requests);
}

static void submit_request(struct ceph_osd_request *req, bool wrlocked)
{
	ceph_osdc_get_request(req);
	account_request(req);
	__submit_request(req, wrlocked);
}

static void __finish_request(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_osd *osd = req->r_osd;

	verify_osd_locked(osd);
	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);

	unlink_request(osd, req);
	atomic_dec(&osdc->num_requests);

	/*
	 * If an OSD has failed or returned and a request has been sent
	 * twice, it's possible to get a reply and end up here while the
	 * request message is queued for delivery.  We will ignore the
	 * reply, so not a big deal, but better to try and catch it.
	 */
	ceph_msg_revoke(req->r_request);
	ceph_msg_revoke_incoming(req->r_reply);
}

static void finish_request(struct ceph_osd_request *req)
{
	__finish_request(req);
	ceph_osdc_put_request(req);
}

fe5da05e
ID
1721static void __complete_request(struct ceph_osd_request *req)
1722{
1723 if (req->r_callback)
1724 req->r_callback(req);
1725 else
1726 complete_all(&req->r_completion);
1727}
1728
5aea3dcd
ID
1729static void cancel_request(struct ceph_osd_request *req)
1730{
1731 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
1732
1733 finish_request(req);
1734}
1735
922dab61
ID
1736/*
1737 * lingering requests, watch/notify v2 infrastructure
1738 */
1739static void linger_release(struct kref *kref)
1740{
1741 struct ceph_osd_linger_request *lreq =
1742 container_of(kref, struct ceph_osd_linger_request, kref);
1743
1744 dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
1745 lreq->reg_req, lreq->ping_req);
1746 WARN_ON(!RB_EMPTY_NODE(&lreq->node));
1747 WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
1748 WARN_ON(!list_empty(&lreq->scan_item));
b07d3c4b 1749 WARN_ON(!list_empty(&lreq->pending_lworks));
922dab61
ID
1750 WARN_ON(lreq->osd);
1751
1752 if (lreq->reg_req)
1753 ceph_osdc_put_request(lreq->reg_req);
1754 if (lreq->ping_req)
1755 ceph_osdc_put_request(lreq->ping_req);
1756 target_destroy(&lreq->t);
1757 kfree(lreq);
1758}
1759
1760static void linger_put(struct ceph_osd_linger_request *lreq)
1761{
1762 if (lreq)
1763 kref_put(&lreq->kref, linger_release);
1764}
1765
1766static struct ceph_osd_linger_request *
1767linger_get(struct ceph_osd_linger_request *lreq)
1768{
1769 kref_get(&lreq->kref);
1770 return lreq;
1771}
1772
1773static struct ceph_osd_linger_request *
1774linger_alloc(struct ceph_osd_client *osdc)
1775{
1776 struct ceph_osd_linger_request *lreq;
1777
1778 lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
1779 if (!lreq)
1780 return NULL;
1781
1782 kref_init(&lreq->kref);
1783 mutex_init(&lreq->lock);
1784 RB_CLEAR_NODE(&lreq->node);
1785 RB_CLEAR_NODE(&lreq->osdc_node);
1786 INIT_LIST_HEAD(&lreq->scan_item);
b07d3c4b 1787 INIT_LIST_HEAD(&lreq->pending_lworks);
922dab61 1788 init_completion(&lreq->reg_commit_wait);
19079203 1789 init_completion(&lreq->notify_finish_wait);
922dab61
ID
1790
1791 lreq->osdc = osdc;
1792 target_init(&lreq->t);
1793
1794 dout("%s lreq %p\n", __func__, lreq);
1795 return lreq;
1796}
1797
1798DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
1799DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
1800
1801/*
1802 * Create linger request <-> OSD session relation.
1803 *
1804 * @lreq has to be registered, @osd may be homeless.
1805 */
1806static void link_linger(struct ceph_osd *osd,
1807 struct ceph_osd_linger_request *lreq)
1808{
1809 verify_osd_locked(osd);
1810 WARN_ON(!lreq->linger_id || lreq->osd);
1811 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
1812 osd->o_osd, lreq, lreq->linger_id);
1813
1814 if (!osd_homeless(osd))
1815 __remove_osd_from_lru(osd);
1816 else
1817 atomic_inc(&osd->o_osdc->num_homeless);
1818
1819 get_osd(osd);
1820 insert_linger(&osd->o_linger_requests, lreq);
1821 lreq->osd = osd;
1822}
1823
1824static void unlink_linger(struct ceph_osd *osd,
1825 struct ceph_osd_linger_request *lreq)
1826{
1827 verify_osd_locked(osd);
1828 WARN_ON(lreq->osd != osd);
1829 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
1830 osd->o_osd, lreq, lreq->linger_id);
1831
1832 lreq->osd = NULL;
1833 erase_linger(&osd->o_linger_requests, lreq);
1834 put_osd(osd);
1835
1836 if (!osd_homeless(osd))
1837 maybe_move_osd_to_lru(osd);
1838 else
1839 atomic_dec(&osd->o_osdc->num_homeless);
1840}
1841
1842static bool __linger_registered(struct ceph_osd_linger_request *lreq)
1843{
1844 verify_osdc_locked(lreq->osdc);
1845
1846 return !RB_EMPTY_NODE(&lreq->osdc_node);
1847}
1848
1849static bool linger_registered(struct ceph_osd_linger_request *lreq)
1850{
1851 struct ceph_osd_client *osdc = lreq->osdc;
1852 bool registered;
1853
1854 down_read(&osdc->lock);
1855 registered = __linger_registered(lreq);
1856 up_read(&osdc->lock);
1857
1858 return registered;
1859}
1860
1861static void linger_register(struct ceph_osd_linger_request *lreq)
1862{
1863 struct ceph_osd_client *osdc = lreq->osdc;
1864
1865 verify_osdc_wrlocked(osdc);
1866 WARN_ON(lreq->linger_id);
1867
1868 linger_get(lreq);
1869 lreq->linger_id = ++osdc->last_linger_id;
1870 insert_linger_osdc(&osdc->linger_requests, lreq);
1871}
1872
1873static void linger_unregister(struct ceph_osd_linger_request *lreq)
1874{
1875 struct ceph_osd_client *osdc = lreq->osdc;
1876
1877 verify_osdc_wrlocked(osdc);
1878
1879 erase_linger_osdc(&osdc->linger_requests, lreq);
1880 linger_put(lreq);
1881}
1882
1883static void cancel_linger_request(struct ceph_osd_request *req)
1884{
1885 struct ceph_osd_linger_request *lreq = req->r_priv;
1886
1887 WARN_ON(!req->r_linger);
1888 cancel_request(req);
1889 linger_put(lreq);
1890}
1891
1892struct linger_work {
1893 struct work_struct work;
1894 struct ceph_osd_linger_request *lreq;
b07d3c4b
ID
1895 struct list_head pending_item;
1896 unsigned long queued_stamp;
922dab61
ID
1897
1898 union {
1899 struct {
1900 u64 notify_id;
1901 u64 notifier_id;
1902 void *payload; /* points into @msg front */
1903 size_t payload_len;
1904
1905 struct ceph_msg *msg; /* for ceph_msg_put() */
1906 } notify;
1907 struct {
1908 int err;
1909 } error;
1910 };
1911};
1912
1913static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
1914 work_func_t workfn)
1915{
1916 struct linger_work *lwork;
1917
1918 lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
1919 if (!lwork)
1920 return NULL;
1921
1922 INIT_WORK(&lwork->work, workfn);
b07d3c4b 1923 INIT_LIST_HEAD(&lwork->pending_item);
922dab61
ID
1924 lwork->lreq = linger_get(lreq);
1925
1926 return lwork;
1927}
1928
1929static void lwork_free(struct linger_work *lwork)
1930{
1931 struct ceph_osd_linger_request *lreq = lwork->lreq;
1932
b07d3c4b
ID
1933 mutex_lock(&lreq->lock);
1934 list_del(&lwork->pending_item);
1935 mutex_unlock(&lreq->lock);
1936
922dab61
ID
1937 linger_put(lreq);
1938 kfree(lwork);
1939}
1940
1941static void lwork_queue(struct linger_work *lwork)
1942{
1943 struct ceph_osd_linger_request *lreq = lwork->lreq;
1944 struct ceph_osd_client *osdc = lreq->osdc;
1945
1946 verify_lreq_locked(lreq);
b07d3c4b
ID
1947 WARN_ON(!list_empty(&lwork->pending_item));
1948
1949 lwork->queued_stamp = jiffies;
1950 list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
922dab61
ID
1951 queue_work(osdc->notify_wq, &lwork->work);
1952}
1953
1954static void do_watch_notify(struct work_struct *w)
1955{
1956 struct linger_work *lwork = container_of(w, struct linger_work, work);
1957 struct ceph_osd_linger_request *lreq = lwork->lreq;
1958
1959 if (!linger_registered(lreq)) {
1960 dout("%s lreq %p not registered\n", __func__, lreq);
1961 goto out;
1962 }
1963
19079203 1964 WARN_ON(!lreq->is_watch);
922dab61
ID
1965 dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
1966 __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
1967 lwork->notify.payload_len);
1968 lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
1969 lwork->notify.notifier_id, lwork->notify.payload,
1970 lwork->notify.payload_len);
1971
1972out:
1973 ceph_msg_put(lwork->notify.msg);
1974 lwork_free(lwork);
1975}
1976
1977static void do_watch_error(struct work_struct *w)
1978{
1979 struct linger_work *lwork = container_of(w, struct linger_work, work);
1980 struct ceph_osd_linger_request *lreq = lwork->lreq;
1981
1982 if (!linger_registered(lreq)) {
1983 dout("%s lreq %p not registered\n", __func__, lreq);
1984 goto out;
1985 }
1986
1987 dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
1988 lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
1989
1990out:
1991 lwork_free(lwork);
1992}
1993
1994static void queue_watch_error(struct ceph_osd_linger_request *lreq)
1995{
1996 struct linger_work *lwork;
1997
1998 lwork = lwork_alloc(lreq, do_watch_error);
1999 if (!lwork) {
2000 pr_err("failed to allocate error-lwork\n");
2001 return;
2002 }
2003
2004 lwork->error.err = lreq->last_error;
2005 lwork_queue(lwork);
2006}
2007
2008static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
2009 int result)
2010{
2011 if (!completion_done(&lreq->reg_commit_wait)) {
2012 lreq->reg_commit_error = (result <= 0 ? result : 0);
2013 complete_all(&lreq->reg_commit_wait);
2014 }
2015}
2016
2017static void linger_commit_cb(struct ceph_osd_request *req)
2018{
2019 struct ceph_osd_linger_request *lreq = req->r_priv;
2020
2021 mutex_lock(&lreq->lock);
2022 dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
2023 lreq->linger_id, req->r_result);
2024 WARN_ON(!__linger_registered(lreq));
2025 linger_reg_commit_complete(lreq, req->r_result);
2026 lreq->committed = true;
2027
19079203
ID
2028 if (!lreq->is_watch) {
2029 struct ceph_osd_data *osd_data =
2030 osd_req_op_data(req, 0, notify, response_data);
2031 void *p = page_address(osd_data->pages[0]);
2032
2033 WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
2034 osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
2035
2036 /* make note of the notify_id */
2037 if (req->r_ops[0].outdata_len >= sizeof(u64)) {
2038 lreq->notify_id = ceph_decode_64(&p);
2039 dout("lreq %p notify_id %llu\n", lreq,
2040 lreq->notify_id);
2041 } else {
2042 dout("lreq %p no notify_id\n", lreq);
2043 }
2044 }
2045
922dab61
ID
2046 mutex_unlock(&lreq->lock);
2047 linger_put(lreq);
2048}
2049
2050static int normalize_watch_error(int err)
2051{
2052 /*
2053 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
2054 * notification and a failure to reconnect because we raced with
2055 * the delete appear the same to the user.
2056 */
2057 if (err == -ENOENT)
2058 err = -ENOTCONN;
2059
2060 return err;
2061}
2062
2063static void linger_reconnect_cb(struct ceph_osd_request *req)
2064{
2065 struct ceph_osd_linger_request *lreq = req->r_priv;
2066
2067 mutex_lock(&lreq->lock);
2068 dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
2069 lreq, lreq->linger_id, req->r_result, lreq->last_error);
2070 if (req->r_result < 0) {
2071 if (!lreq->last_error) {
2072 lreq->last_error = normalize_watch_error(req->r_result);
2073 queue_watch_error(lreq);
2074 }
2075 }
2076
2077 mutex_unlock(&lreq->lock);
2078 linger_put(lreq);
2079}
2080
2081static void send_linger(struct ceph_osd_linger_request *lreq)
2082{
2083 struct ceph_osd_request *req = lreq->reg_req;
2084 struct ceph_osd_req_op *op = &req->r_ops[0];
2085
2086 verify_osdc_wrlocked(req->r_osdc);
2087 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
2088
2089 if (req->r_osd)
2090 cancel_linger_request(req);
2091
2092 request_reinit(req);
2093 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
2094 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
2095 req->r_flags = lreq->t.flags;
2096 req->r_mtime = lreq->mtime;
2097
2098 mutex_lock(&lreq->lock);
19079203 2099 if (lreq->is_watch && lreq->committed) {
922dab61
ID
2100 WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2101 op->watch.cookie != lreq->linger_id);
2102 op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
2103 op->watch.gen = ++lreq->register_gen;
2104 dout("lreq %p reconnect register_gen %u\n", lreq,
2105 op->watch.gen);
2106 req->r_callback = linger_reconnect_cb;
2107 } else {
19079203
ID
2108 if (!lreq->is_watch)
2109 lreq->notify_id = 0;
2110 else
2111 WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
922dab61
ID
2112 dout("lreq %p register\n", lreq);
2113 req->r_callback = linger_commit_cb;
2114 }
2115 mutex_unlock(&lreq->lock);
2116
2117 req->r_priv = linger_get(lreq);
2118 req->r_linger = true;
2119
2120 submit_request(req, true);
2121}
2122
2123static void linger_ping_cb(struct ceph_osd_request *req)
2124{
2125 struct ceph_osd_linger_request *lreq = req->r_priv;
2126
2127 mutex_lock(&lreq->lock);
2128 dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
2129 __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
2130 lreq->last_error);
2131 if (lreq->register_gen == req->r_ops[0].watch.gen) {
b07d3c4b
ID
2132 if (!req->r_result) {
2133 lreq->watch_valid_thru = lreq->ping_sent;
2134 } else if (!lreq->last_error) {
922dab61
ID
2135 lreq->last_error = normalize_watch_error(req->r_result);
2136 queue_watch_error(lreq);
2137 }
2138 } else {
2139 dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
2140 lreq->register_gen, req->r_ops[0].watch.gen);
2141 }
2142
2143 mutex_unlock(&lreq->lock);
2144 linger_put(lreq);
2145}
2146
2147static void send_linger_ping(struct ceph_osd_linger_request *lreq)
2148{
2149 struct ceph_osd_client *osdc = lreq->osdc;
2150 struct ceph_osd_request *req = lreq->ping_req;
2151 struct ceph_osd_req_op *op = &req->r_ops[0];
2152
2153 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
2154 dout("%s PAUSERD\n", __func__);
2155 return;
2156 }
2157
2158 lreq->ping_sent = jiffies;
2159 dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
2160 __func__, lreq, lreq->linger_id, lreq->ping_sent,
2161 lreq->register_gen);
2162
2163 if (req->r_osd)
2164 cancel_linger_request(req);
2165
2166 request_reinit(req);
2167 target_copy(&req->r_t, &lreq->t);
2168
2169 WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2170 op->watch.cookie != lreq->linger_id ||
2171 op->watch.op != CEPH_OSD_WATCH_OP_PING);
2172 op->watch.gen = lreq->register_gen;
2173 req->r_callback = linger_ping_cb;
2174 req->r_priv = linger_get(lreq);
2175 req->r_linger = true;
2176
2177 ceph_osdc_get_request(req);
2178 account_request(req);
2179 req->r_tid = atomic64_inc_return(&osdc->last_tid);
2180 link_request(lreq->osd, req);
2181 send_request(req);
2182}
2183
2184static void linger_submit(struct ceph_osd_linger_request *lreq)
2185{
2186 struct ceph_osd_client *osdc = lreq->osdc;
2187 struct ceph_osd *osd;
2188
2189 calc_target(osdc, &lreq->t, &lreq->last_force_resend, false);
2190 osd = lookup_create_osd(osdc, lreq->t.osd, true);
2191 link_linger(osd, lreq);
2192
2193 send_linger(lreq);
2194}
2195
2196/*
2197 * @lreq has to be both registered and linked.
2198 */
2199static void __linger_cancel(struct ceph_osd_linger_request *lreq)
2200{
19079203 2201 if (lreq->is_watch && lreq->ping_req->r_osd)
922dab61
ID
2202 cancel_linger_request(lreq->ping_req);
2203 if (lreq->reg_req->r_osd)
2204 cancel_linger_request(lreq->reg_req);
2205 unlink_linger(lreq->osd, lreq);
2206 linger_unregister(lreq);
2207}
2208
2209static void linger_cancel(struct ceph_osd_linger_request *lreq)
2210{
2211 struct ceph_osd_client *osdc = lreq->osdc;
2212
2213 down_write(&osdc->lock);
2214 if (__linger_registered(lreq))
2215 __linger_cancel(lreq);
2216 up_write(&osdc->lock);
2217}
2218
2219static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
2220{
2221 int ret;
2222
2223 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
2224 ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
2225 return ret ?: lreq->reg_commit_error;
2226}
2227
19079203
ID
2228static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
2229{
2230 int ret;
2231
2232 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
2233 ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
2234 return ret ?: lreq->notify_finish_error;
2235}
2236
f24e9980 2237/*
fbca9635
ID
2238 * Timeout callback, called every N seconds.  When one or more OSD
2239 * requests have been active for more than N seconds, we send a
2240 * keepalive (tag + timestamp) to their OSDs to ensure that any
2241 * communications channel reset is detected.
f24e9980
SW
2242 */
2243static void handle_timeout(struct work_struct *work)
2244{
2245 struct ceph_osd_client *osdc =
2246 container_of(work, struct ceph_osd_client, timeout_work.work);
a319bf56 2247 struct ceph_options *opts = osdc->client->options;
5aea3dcd
ID
2248 unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
2249 LIST_HEAD(slow_osds);
2250 struct rb_node *n, *p;
f24e9980 2251
5aea3dcd
ID
2252 dout("%s osdc %p\n", __func__, osdc);
2253 down_write(&osdc->lock);
f24e9980 2254
422d2cb8
YS
2255 /*
2256 * ping osds that are a bit slow. this ensures that if there
2257 * is a break in the TCP connection we will notice, and reopen
2258 * a connection with that osd (from the fault callback).
2259 */
5aea3dcd
ID
2260 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
2261 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2262 bool found = false;
2263
2264 for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
2265 struct ceph_osd_request *req =
2266 rb_entry(p, struct ceph_osd_request, r_node);
2267
2268 if (time_before(req->r_stamp, cutoff)) {
2269 dout(" req %p tid %llu on osd%d is laggy\n",
2270 req, req->r_tid, osd->o_osd);
2271 found = true;
2272 }
2273 }
922dab61
ID
2274 for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
2275 struct ceph_osd_linger_request *lreq =
2276 rb_entry(p, struct ceph_osd_linger_request, node);
2277
2278 dout(" lreq %p linger_id %llu is served by osd%d\n",
2279 lreq, lreq->linger_id, osd->o_osd);
2280 found = true;
2281
2282 mutex_lock(&lreq->lock);
19079203 2283 if (lreq->is_watch && lreq->committed && !lreq->last_error)
922dab61
ID
2284 send_linger_ping(lreq);
2285 mutex_unlock(&lreq->lock);
2286 }
422d2cb8 2287
5aea3dcd
ID
2288 if (found)
2289 list_move_tail(&osd->o_keepalive_item, &slow_osds);
422d2cb8 2290 }
5aea3dcd
ID
2291
2292 if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
2293 maybe_request_map(osdc);
2294
422d2cb8 2295 while (!list_empty(&slow_osds)) {
5aea3dcd
ID
2296 struct ceph_osd *osd = list_first_entry(&slow_osds,
2297 struct ceph_osd,
2298 o_keepalive_item);
422d2cb8 2299 list_del_init(&osd->o_keepalive_item);
f24e9980
SW
2300 ceph_con_keepalive(&osd->o_con);
2301 }
2302
5aea3dcd 2303 up_write(&osdc->lock);
fbca9635
ID
2304 schedule_delayed_work(&osdc->timeout_work,
2305 osdc->client->options->osd_keepalive_timeout);
f24e9980
SW
2306}
2307
f5a2041b
YS
2308static void handle_osds_timeout(struct work_struct *work)
2309{
2310 struct ceph_osd_client *osdc =
2311 container_of(work, struct ceph_osd_client,
2312 osds_timeout_work.work);
a319bf56 2313 unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
42a2c09f 2314 struct ceph_osd *osd, *nosd;
f5a2041b 2315
42a2c09f 2316 dout("%s osdc %p\n", __func__, osdc);
5aea3dcd 2317 down_write(&osdc->lock);
42a2c09f
ID
2318 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
2319 if (time_before(jiffies, osd->lru_ttl))
2320 break;
2321
5aea3dcd 2322 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
922dab61 2323 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
5aea3dcd 2324 close_osd(osd);
42a2c09f 2325 }
f5a2041b 2326
5aea3dcd 2327 up_write(&osdc->lock);
f5a2041b
YS
2328 schedule_delayed_work(&osdc->osds_timeout_work,
2329 round_jiffies_relative(delay));
2330}
2331
205ee118
ID
2332static int ceph_oloc_decode(void **p, void *end,
2333 struct ceph_object_locator *oloc)
2334{
2335 u8 struct_v, struct_cv;
2336 u32 len;
2337 void *struct_end;
2338 int ret = 0;
2339
2340 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
2341 struct_v = ceph_decode_8(p);
2342 struct_cv = ceph_decode_8(p);
2343 if (struct_v < 3) {
2344 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
2345 struct_v, struct_cv);
2346 goto e_inval;
2347 }
2348 if (struct_cv > 6) {
2349 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
2350 struct_v, struct_cv);
2351 goto e_inval;
2352 }
2353 len = ceph_decode_32(p);
2354 ceph_decode_need(p, end, len, e_inval);
2355 struct_end = *p + len;
2356
2357 oloc->pool = ceph_decode_64(p);
2358 *p += 4; /* skip preferred */
2359
2360 len = ceph_decode_32(p);
2361 if (len > 0) {
2362 pr_warn("ceph_object_locator::key is set\n");
2363 goto e_inval;
2364 }
2365
2366 if (struct_v >= 5) {
2367 len = ceph_decode_32(p);
2368 if (len > 0) {
2369 pr_warn("ceph_object_locator::nspace is set\n");
2370 goto e_inval;
2371 }
2372 }
2373
2374 if (struct_v >= 6) {
2375 s64 hash = ceph_decode_64(p);
2376 if (hash != -1) {
2377 pr_warn("ceph_object_locator::hash is set\n");
2378 goto e_inval;
2379 }
2380 }
2381
2382 /* skip the rest */
2383 *p = struct_end;
2384out:
2385 return ret;
2386
2387e_inval:
2388 ret = -EINVAL;
2389 goto out;
2390}
2391
2392static int ceph_redirect_decode(void **p, void *end,
2393 struct ceph_request_redirect *redir)
2394{
2395 u8 struct_v, struct_cv;
2396 u32 len;
2397 void *struct_end;
2398 int ret;
2399
2400 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
2401 struct_v = ceph_decode_8(p);
2402 struct_cv = ceph_decode_8(p);
2403 if (struct_cv > 1) {
2404 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
2405 struct_v, struct_cv);
2406 goto e_inval;
2407 }
2408 len = ceph_decode_32(p);
2409 ceph_decode_need(p, end, len, e_inval);
2410 struct_end = *p + len;
2411
2412 ret = ceph_oloc_decode(p, end, &redir->oloc);
2413 if (ret)
2414 goto out;
2415
2416 len = ceph_decode_32(p);
2417 if (len > 0) {
2418 pr_warn("ceph_request_redirect::object_name is set\n");
2419 goto e_inval;
2420 }
2421
2422 len = ceph_decode_32(p);
2423 *p += len; /* skip osd_instructions */
2424
2425 /* skip the rest */
2426 *p = struct_end;
2427out:
2428 return ret;
2429
2430e_inval:
2431 ret = -EINVAL;
2432 goto out;
2433}
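
/*
 * Illustrative sketch (not part of the original file): the versioned
 * decode pattern that ceph_oloc_decode() and ceph_redirect_decode() above
 * both follow.  Each encoded struct carries (version, compat version,
 * byte length); the length lets an old decoder skip trailing fields added
 * by newer encoders.  struct example_payload is made up for the example.
 */
struct example_payload {
	u64 id;
};

static int example_struct_decode(void **p, void *end,
				 struct example_payload *out)
{
	u8 struct_v, struct_cv;
	u32 len;
	void *struct_end;

	/* header: version, compat version, length of the encoded body */
	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
	struct_v = ceph_decode_8(p);
	struct_cv = ceph_decode_8(p);
	if (struct_v < 1 || struct_cv > 1)
		goto e_inval;		/* encoding we can't understand */
	len = ceph_decode_32(p);
	ceph_decode_need(p, end, len, e_inval);
	struct_end = *p + len;

	ceph_decode_need(p, end, sizeof(u64), e_inval);
	out->id = ceph_decode_64(p);

	/* skip any newer fields we don't know about */
	*p = struct_end;
	return 0;

e_inval:
	return -EINVAL;
}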
2434
fe5da05e
ID
2435struct MOSDOpReply {
2436 struct ceph_pg pgid;
2437 u64 flags;
2438 int result;
2439 u32 epoch;
2440 int num_ops;
2441 u32 outdata_len[CEPH_OSD_MAX_OPS];
2442 s32 rval[CEPH_OSD_MAX_OPS];
2443 int retry_attempt;
2444 struct ceph_eversion replay_version;
2445 u64 user_version;
2446 struct ceph_request_redirect redirect;
2447};
25845472 2448
fe5da05e 2449static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
f24e9980 2450{
fe5da05e
ID
2451 void *p = msg->front.iov_base;
2452 void *const end = p + msg->front.iov_len;
2453 u16 version = le16_to_cpu(msg->hdr.version);
2454 struct ceph_eversion bad_replay_version;
b0b31a8f 2455 u8 decode_redir;
fe5da05e
ID
2456 u32 len;
2457 int ret;
2458 int i;
1b83bef2 2459
fe5da05e
ID
2460 ceph_decode_32_safe(&p, end, len, e_inval);
2461 ceph_decode_need(&p, end, len, e_inval);
2462 p += len; /* skip oid */
1b83bef2 2463
fe5da05e
ID
2464 ret = ceph_decode_pgid(&p, end, &m->pgid);
2465 if (ret)
2466 return ret;
1b83bef2 2467
fe5da05e
ID
2468 ceph_decode_64_safe(&p, end, m->flags, e_inval);
2469 ceph_decode_32_safe(&p, end, m->result, e_inval);
2470 ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
2471 memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
2472 p += sizeof(bad_replay_version);
2473 ceph_decode_32_safe(&p, end, m->epoch, e_inval);
1b83bef2 2474
fe5da05e
ID
2475 ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
2476 if (m->num_ops > ARRAY_SIZE(m->outdata_len))
2477 goto e_inval;
1b83bef2 2478
fe5da05e
ID
2479 ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
2480 e_inval);
2481 for (i = 0; i < m->num_ops; i++) {
1b83bef2 2482 struct ceph_osd_op *op = p;
1b83bef2 2483
fe5da05e 2484 m->outdata_len[i] = le32_to_cpu(op->payload_len);
1b83bef2
SW
2485 p += sizeof(*op);
2486 }
1b83bef2 2487
fe5da05e
ID
2488 ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
2489 for (i = 0; i < m->num_ops; i++)
2490 ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
f24e9980 2491
fe5da05e
ID
2492 if (version >= 5) {
2493 ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
2494 memcpy(&m->replay_version, p, sizeof(m->replay_version));
2495 p += sizeof(m->replay_version);
2496 ceph_decode_64_safe(&p, end, m->user_version, e_inval);
2497 } else {
2498 m->replay_version = bad_replay_version; /* struct */
2499 m->user_version = le64_to_cpu(m->replay_version.version);
2500 }
eb845ff1 2501
fe5da05e
ID
2502 if (version >= 6) {
2503 if (version >= 7)
2504 ceph_decode_8_safe(&p, end, decode_redir, e_inval);
b0b31a8f
ID
2505 else
2506 decode_redir = 1;
2507 } else {
2508 decode_redir = 0;
2509 }
2510
2511 if (decode_redir) {
fe5da05e
ID
2512 ret = ceph_redirect_decode(&p, end, &m->redirect);
2513 if (ret)
2514 return ret;
205ee118 2515 } else {
fe5da05e 2516 ceph_oloc_init(&m->redirect.oloc);
205ee118 2517 }
f24e9980 2518
fe5da05e
ID
2519 return 0;
2520
2521e_inval:
2522 return -EINVAL;
2523}
2524
2525/*
2526 * We are done with @req if
2527 * - @m is a safe reply, or
2528 * - @m is an unsafe reply and we didn't want a safe one
2529 */
2530static bool done_request(const struct ceph_osd_request *req,
2531 const struct MOSDOpReply *m)
2532{
2533 return (m->result < 0 ||
2534 (m->flags & CEPH_OSD_FLAG_ONDISK) ||
2535 !(req->r_flags & CEPH_OSD_FLAG_ONDISK));
2536}
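
/*
 * Illustrative sketch (not part of the original file): the two-phase
 * (unsafe, then safe) completion model dispatched by handle_reply()
 * below.  Per account_request() above, r_unsafe_callback requires the
 * request to carry CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK: the
 * callback fires with unsafe=true when the OSD acks the op in memory,
 * and with unsafe=false (plus r_safe_completion) once it is on disk.
 * See the dispatch table in the comment below.
 */
static void my_unsafe_cb(struct ceph_osd_request *req, bool unsafe)
{
	if (unsafe) {
		/* acked but not yet durable: keep resources pinned */
	} else {
		/* committed to disk: safe to release them */
	}
}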
205ee118 2537
fe5da05e
ID
2538/*
2539 * handle osd op reply. either call the callback if it is specified,
2540 * or do the completion to wake up the waiting thread.
2541 *
2542 * ->r_unsafe_callback is set?    yes                    no
2543 *
2544 * first reply is OK (needed      r_cb/r_completion,     r_cb/r_completion,
2545 * any or needed/got safe)        r_safe_completion      r_safe_completion
2546 *
2547 * first reply is unsafe          r_unsafe_cb(true)      (nothing)
2548 *
2549 * when we get the safe reply     r_unsafe_cb(false),    r_cb/r_completion,
2550 *                                r_safe_completion      r_safe_completion
2551 */
5aea3dcd 2552static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
fe5da05e 2553{
5aea3dcd 2554 struct ceph_osd_client *osdc = osd->o_osdc;
fe5da05e
ID
2555 struct ceph_osd_request *req;
2556 struct MOSDOpReply m;
2557 u64 tid = le64_to_cpu(msg->hdr.tid);
2558 u32 data_len = 0;
2559 bool already_acked;
2560 int ret;
2561 int i;
2562
2563 dout("%s msg %p tid %llu\n", __func__, msg, tid);
2564
5aea3dcd
ID
2565 down_read(&osdc->lock);
2566 if (!osd_registered(osd)) {
2567 dout("%s osd%d unknown\n", __func__, osd->o_osd);
2568 goto out_unlock_osdc;
2569 }
2570 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
2571
2572 mutex_lock(&osd->lock);
2573 req = lookup_request(&osd->o_requests, tid);
fe5da05e 2574 if (!req) {
5aea3dcd
ID
2575 dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
2576 goto out_unlock_session;
fe5da05e 2577 }
fe5da05e
ID
2578
2579 ret = decode_MOSDOpReply(msg, &m);
2580 if (ret) {
2581 pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
2582 req->r_tid, ret);
2583 ceph_msg_dump(msg);
2584 goto fail_request;
2585 }
2586 dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
2587 __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
2588 m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
2589 le64_to_cpu(m.replay_version.version), m.user_version);
2590
2591 if (m.retry_attempt >= 0) {
2592 if (m.retry_attempt != req->r_attempts - 1) {
2593 dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
2594 req, req->r_tid, m.retry_attempt,
2595 req->r_attempts - 1);
5aea3dcd 2596 goto out_unlock_session;
fe5da05e
ID
2597 }
2598 } else {
2599 WARN_ON(1); /* MOSDOpReply v4 is assumed */
2600 }
2601
2602 if (!ceph_oloc_empty(&m.redirect.oloc)) {
2603 dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
2604 m.redirect.oloc.pool);
5aea3dcd
ID
2605 unlink_request(osd, req);
2606 mutex_unlock(&osd->lock);
205ee118 2607
fe5da05e 2608 ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
5aea3dcd
ID
2609 req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
2610 req->r_tid = 0;
2611 __submit_request(req, false);
2612 goto out_unlock_osdc;
205ee118
ID
2613 }
2614
fe5da05e
ID
2615 if (m.num_ops != req->r_num_ops) {
2616 pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
2617 req->r_num_ops, req->r_tid);
2618 goto fail_request;
f24e9980 2619 }
fe5da05e
ID
2620 for (i = 0; i < req->r_num_ops; i++) {
2621 dout(" req %p tid %llu op %d rval %d len %u\n", req,
2622 req->r_tid, i, m.rval[i], m.outdata_len[i]);
2623 req->r_ops[i].rval = m.rval[i];
2624 req->r_ops[i].outdata_len = m.outdata_len[i];
2625 data_len += m.outdata_len[i];
2626 }
2627 if (data_len != le32_to_cpu(msg->hdr.data_len)) {
2628 pr_err("sum of lens %u != %u for tid %llu\n", data_len,
2629 le32_to_cpu(msg->hdr.data_len), req->r_tid);
2630 goto fail_request;
2631 }
2632 dout("%s req %p tid %llu acked %d result %d data_len %u\n", __func__,
2633 req, req->r_tid, req->r_got_reply, m.result, data_len);
2634
2635 already_acked = req->r_got_reply;
2636 if (!already_acked) {
2637 req->r_result = m.result ?: data_len;
2638 req->r_replay_version = m.replay_version; /* struct */
2639 req->r_got_reply = true;
2640 } else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
2641 dout("req %p tid %llu dup ack\n", req, req->r_tid);
5aea3dcd 2642 goto out_unlock_session;
fe5da05e
ID
2643 }
2644
2645 if (done_request(req, &m)) {
5aea3dcd 2646 __finish_request(req);
fe5da05e
ID
2647 if (req->r_linger) {
2648 WARN_ON(req->r_unsafe_callback);
922dab61
ID
2649 dout("req %p tid %llu cb (locked)\n", req, req->r_tid);
2650 __complete_request(req);
fe5da05e
ID
2651 }
2652 }
f24e9980 2653
5aea3dcd
ID
2654 mutex_unlock(&osd->lock);
2655 up_read(&osdc->lock);
f24e9980 2656
fe5da05e
ID
2657 if (done_request(req, &m)) {
2658 if (already_acked && req->r_unsafe_callback) {
2659 dout("req %p tid %llu safe-cb\n", req, req->r_tid);
61c5d6bf 2660 req->r_unsafe_callback(req, false);
922dab61 2661 } else if (!req->r_linger) {
fe5da05e
ID
2662 dout("req %p tid %llu cb\n", req, req->r_tid);
2663 __complete_request(req);
2664 }
2665 } else {
2666 if (req->r_unsafe_callback) {
2667 dout("req %p tid %llu unsafe-cb\n", req, req->r_tid);
2668 req->r_unsafe_callback(req, true);
2669 } else {
2670 WARN_ON(1);
2671 }
61c5d6bf 2672 }
fe5da05e
ID
2673 if (m.flags & CEPH_OSD_FLAG_ONDISK)
2674 complete_all(&req->r_safe_completion);
f24e9980 2675
f24e9980
SW
2676 ceph_osdc_put_request(req);
2677 return;
2678
fe5da05e 2679fail_request:
37c89bde 2680 req->r_result = -EIO;
5aea3dcd 2681 __finish_request(req);
fe5da05e
ID
2682 __complete_request(req);
2683 complete_all(&req->r_safe_completion);
5aea3dcd
ID
2684out_unlock_session:
2685 mutex_unlock(&osd->lock);
2686out_unlock_osdc:
2687 up_read(&osdc->lock);
f24e9980
SW
2688}
2689
42c1b124
ID
2690static void set_pool_was_full(struct ceph_osd_client *osdc)
2691{
2692 struct rb_node *n;
2693
2694 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
2695 struct ceph_pg_pool_info *pi =
2696 rb_entry(n, struct ceph_pg_pool_info, node);
2697
2698 pi->was_full = __pool_full(pi);
2699 }
2700}
2701
5aea3dcd 2702static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
f24e9980 2703{
5aea3dcd 2704 struct ceph_pg_pool_info *pi;
f24e9980 2705
5aea3dcd
ID
2706 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
2707 if (!pi)
2708 return false;
f24e9980 2709
5aea3dcd 2710 return pi->was_full && !__pool_full(pi);
422d2cb8
YS
2711}
2712
922dab61
ID
2713static enum calc_target_result
2714recalc_linger_target(struct ceph_osd_linger_request *lreq)
2715{
2716 struct ceph_osd_client *osdc = lreq->osdc;
2717 enum calc_target_result ct_res;
2718
2719 ct_res = calc_target(osdc, &lreq->t, &lreq->last_force_resend, true);
2720 if (ct_res == CALC_TARGET_NEED_RESEND) {
2721 struct ceph_osd *osd;
2722
2723 osd = lookup_create_osd(osdc, lreq->t.osd, true);
2724 if (osd != lreq->osd) {
2725 unlink_linger(lreq->osd, lreq);
2726 link_linger(osd, lreq);
2727 }
2728 }
2729
2730 return ct_res;
2731}
2732
422d2cb8 2733/*
5aea3dcd 2734 * Requeue requests whose mapping to an OSD has changed.
422d2cb8 2735 */
5aea3dcd
ID
2736static void scan_requests(struct ceph_osd *osd,
2737 bool force_resend,
2738 bool cleared_full,
2739 bool check_pool_cleared_full,
2740 struct rb_root *need_resend,
2741 struct list_head *need_resend_linger)
422d2cb8 2742{
5aea3dcd
ID
2743 struct ceph_osd_client *osdc = osd->o_osdc;
2744 struct rb_node *n;
2745 bool force_resend_writes;
2746
922dab61
ID
2747 for (n = rb_first(&osd->o_linger_requests); n; ) {
2748 struct ceph_osd_linger_request *lreq =
2749 rb_entry(n, struct ceph_osd_linger_request, node);
2750 enum calc_target_result ct_res;
2751
2752 n = rb_next(n); /* recalc_linger_target() */
2753
2754 dout("%s lreq %p linger_id %llu\n", __func__, lreq,
2755 lreq->linger_id);
2756 ct_res = recalc_linger_target(lreq);
2757 switch (ct_res) {
2758 case CALC_TARGET_NO_ACTION:
2759 force_resend_writes = cleared_full ||
2760 (check_pool_cleared_full &&
2761 pool_cleared_full(osdc, lreq->t.base_oloc.pool));
2762 if (!force_resend && !force_resend_writes)
2763 break;
2764
2765 /* fall through */
2766 case CALC_TARGET_NEED_RESEND:
2767 /*
2768 * scan_requests() for the previous epoch(s)
2769 * may have already added it to the list, since
2770 * it's not unlinked here.
2771 */
2772 if (list_empty(&lreq->scan_item))
2773 list_add_tail(&lreq->scan_item, need_resend_linger);
2774 break;
2775 case CALC_TARGET_POOL_DNE:
2776 break;
2777 }
2778 }
2779
5aea3dcd
ID
2780 for (n = rb_first(&osd->o_requests); n; ) {
2781 struct ceph_osd_request *req =
2782 rb_entry(n, struct ceph_osd_request, r_node);
2783 enum calc_target_result ct_res;
2784
2785 n = rb_next(n); /* unlink_request() */
2786
2787 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2788 ct_res = calc_target(osdc, &req->r_t,
2789 &req->r_last_force_resend, false);
2790 switch (ct_res) {
2791 case CALC_TARGET_NO_ACTION:
2792 force_resend_writes = cleared_full ||
2793 (check_pool_cleared_full &&
2794 pool_cleared_full(osdc, req->r_t.base_oloc.pool));
2795 if (!force_resend &&
2796 (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
2797 !force_resend_writes))
2798 break;
2799
2800 /* fall through */
2801 case CALC_TARGET_NEED_RESEND:
2802 unlink_request(osd, req);
2803 insert_request(need_resend, req);
2804 break;
2805 case CALC_TARGET_POOL_DNE:
2806 break;
b0494532 2807 }
6f6c7006 2808 }
422d2cb8 2809}
6f6c7006 2810
42c1b124 2811static int handle_one_map(struct ceph_osd_client *osdc,
5aea3dcd
ID
2812 void *p, void *end, bool incremental,
2813 struct rb_root *need_resend,
2814 struct list_head *need_resend_linger)
42c1b124
ID
2815{
2816 struct ceph_osdmap *newmap;
2817 struct rb_node *n;
2818 bool skipped_map = false;
2819 bool was_full;
2820
2821 was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
2822 set_pool_was_full(osdc);
2823
2824 if (incremental)
2825 newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
2826 else
2827 newmap = ceph_osdmap_decode(&p, end);
2828 if (IS_ERR(newmap))
2829 return PTR_ERR(newmap);
2830
2831 if (newmap != osdc->osdmap) {
2832 /*
2833 * Preserve ->was_full before destroying the old map.
2834 * For pools that weren't in the old map, ->was_full
2835 * should be false.
2836 */
2837 for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
2838 struct ceph_pg_pool_info *pi =
2839 rb_entry(n, struct ceph_pg_pool_info, node);
2840 struct ceph_pg_pool_info *old_pi;
2841
2842 old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
2843 if (old_pi)
2844 pi->was_full = old_pi->was_full;
2845 else
2846 WARN_ON(pi->was_full);
2847 }
2848
2849 if (osdc->osdmap->epoch &&
2850 osdc->osdmap->epoch + 1 < newmap->epoch) {
2851 WARN_ON(incremental);
2852 skipped_map = true;
2853 }
2854
2855 ceph_osdmap_destroy(osdc->osdmap);
2856 osdc->osdmap = newmap;
2857 }
2858
2859 was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
5aea3dcd
ID
2860 scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
2861 need_resend, need_resend_linger);
2862
2863 for (n = rb_first(&osdc->osds); n; ) {
2864 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2865
2866 n = rb_next(n); /* close_osd() */
2867
2868 scan_requests(osd, skipped_map, was_full, true, need_resend,
2869 need_resend_linger);
2870 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
2871 memcmp(&osd->o_con.peer_addr,
2872 ceph_osd_addr(osdc->osdmap, osd->o_osd),
2873 sizeof(struct ceph_entity_addr)))
2874 close_osd(osd);
2875 }
42c1b124
ID
2876
2877 return 0;
2878}
6f6c7006 2879
5aea3dcd
ID
2880static void kick_requests(struct ceph_osd_client *osdc,
2881 struct rb_root *need_resend,
2882 struct list_head *need_resend_linger)
2883{
922dab61 2884 struct ceph_osd_linger_request *lreq, *nlreq;
5aea3dcd
ID
2885 struct rb_node *n;
2886
2887 for (n = rb_first(need_resend); n; ) {
2888 struct ceph_osd_request *req =
2889 rb_entry(n, struct ceph_osd_request, r_node);
2890 struct ceph_osd *osd;
2891
2892 n = rb_next(n);
2893 erase_request(need_resend, req); /* before link_request() */
2894
2895 WARN_ON(req->r_osd);
2896 calc_target(osdc, &req->r_t, NULL, false);
2897 osd = lookup_create_osd(osdc, req->r_t.osd, true);
2898 link_request(osd, req);
2899 if (!req->r_linger) {
2900 if (!osd_homeless(osd) && !req->r_t.paused)
2901 send_request(req);
922dab61
ID
2902 } else {
2903 cancel_linger_request(req);
5aea3dcd
ID
2904 }
2905 }
922dab61
ID
2906
2907 list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
2908 if (!osd_homeless(lreq->osd))
2909 send_linger(lreq);
2910
2911 list_del_init(&lreq->scan_item);
2912 }
5aea3dcd
ID
2913}
2914
f24e9980
SW
2915/*
2916 * Process updated osd map.
2917 *
2918 * The message contains any number of incremental and full maps, normally
2919 * indicating some sort of topology change in the cluster. Kick requests
2920 * off to different OSDs as needed.
2921 */
2922void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
2923{
42c1b124
ID
2924 void *p = msg->front.iov_base;
2925 void *const end = p + msg->front.iov_len;
f24e9980
SW
2926 u32 nr_maps, maplen;
2927 u32 epoch;
f24e9980 2928 struct ceph_fsid fsid;
5aea3dcd
ID
2929 struct rb_root need_resend = RB_ROOT;
2930 LIST_HEAD(need_resend_linger);
42c1b124
ID
2931 bool handled_incremental = false;
2932 bool was_pauserd, was_pausewr;
2933 bool pauserd, pausewr;
2934 int err;
f24e9980 2935
42c1b124 2936 dout("%s have %u\n", __func__, osdc->osdmap->epoch);
5aea3dcd 2937 down_write(&osdc->lock);
f24e9980
SW
2938
2939 /* verify fsid */
2940 ceph_decode_need(&p, end, sizeof(fsid), bad);
2941 ceph_decode_copy(&p, &fsid, sizeof(fsid));
0743304d 2942 if (ceph_check_fsid(osdc->client, &fsid) < 0)
42c1b124 2943 goto bad;
f24e9980 2944
42c1b124
ID
2945 was_pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
2946 was_pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
2947 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
2948 have_pool_full(osdc);
9a1ea2db 2949
f24e9980
SW
2950 /* incremental maps */
2951 ceph_decode_32_safe(&p, end, nr_maps, bad);
2952 dout(" %d inc maps\n", nr_maps);
2953 while (nr_maps > 0) {
2954 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
c89136ea
SW
2955 epoch = ceph_decode_32(&p);
2956 maplen = ceph_decode_32(&p);
f24e9980 2957 ceph_decode_need(&p, end, maplen, bad);
42c1b124
ID
2958 if (osdc->osdmap->epoch &&
2959 osdc->osdmap->epoch + 1 == epoch) {
f24e9980
SW
2960 dout("applying incremental map %u len %d\n",
2961 epoch, maplen);
5aea3dcd
ID
2962 err = handle_one_map(osdc, p, p + maplen, true,
2963 &need_resend, &need_resend_linger);
42c1b124 2964 if (err)
f24e9980 2965 goto bad;
42c1b124 2966 handled_incremental = true;
f24e9980
SW
2967 } else {
2968 dout("ignoring incremental map %u len %d\n",
2969 epoch, maplen);
2970 }
42c1b124 2971 p += maplen;
f24e9980
SW
2972 nr_maps--;
2973 }
42c1b124 2974 if (handled_incremental)
f24e9980
SW
2975 goto done;
2976
2977 /* full maps */
2978 ceph_decode_32_safe(&p, end, nr_maps, bad);
2979 dout(" %d full maps\n", nr_maps);
2980 while (nr_maps) {
2981 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
c89136ea
SW
2982 epoch = ceph_decode_32(&p);
2983 maplen = ceph_decode_32(&p);
f24e9980
SW
2984 ceph_decode_need(&p, end, maplen, bad);
2985 if (nr_maps > 1) {
2986 dout("skipping non-latest full map %u len %d\n",
2987 epoch, maplen);
e5253a7b 2988 } else if (osdc->osdmap->epoch >= epoch) {
f24e9980
SW
2989 dout("skipping full map %u len %d, "
2990 "older than our %u\n", epoch, maplen,
2991 osdc->osdmap->epoch);
2992 } else {
2993 dout("taking full map %u len %d\n", epoch, maplen);
5aea3dcd
ID
2994 err = handle_one_map(osdc, p, p + maplen, false,
2995 &need_resend, &need_resend_linger);
42c1b124 2996 if (err)
f24e9980 2997 goto bad;
f24e9980
SW
2998 }
2999 p += maplen;
3000 nr_maps--;
3001 }
3002
3003done:
cd634fb6
SW
3004 /*
3005 * subscribe to subsequent osdmap updates if full to ensure
3006 * we find out when we are no longer full and stop returning
3007 * ENOSPC.
3008 */
42c1b124
ID
3009 pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
3010 pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
3011 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
3012 have_pool_full(osdc);
3013 if (was_pauserd || was_pausewr || pauserd || pausewr)
3014 maybe_request_map(osdc);
cd634fb6 3015
5aea3dcd 3016 kick_requests(osdc, &need_resend, &need_resend_linger);
42c1b124
ID
3017
3018 ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
3019 osdc->osdmap->epoch);
5aea3dcd 3020 up_write(&osdc->lock);
03066f23 3021 wake_up_all(&osdc->client->auth_wq);
f24e9980
SW
3022 return;
3023
3024bad:
3025 pr_err("osdc handle_map corrupt msg\n");
9ec7cab1 3026 ceph_msg_dump(msg);
5aea3dcd
ID
3027 up_write(&osdc->lock);
3028}
3029
3030/*
3031 * Resubmit requests pending on the given osd.
3032 */
3033static void kick_osd_requests(struct ceph_osd *osd)
3034{
3035 struct rb_node *n;
3036
922dab61 3037 for (n = rb_first(&osd->o_requests); n; ) {
5aea3dcd
ID
3038 struct ceph_osd_request *req =
3039 rb_entry(n, struct ceph_osd_request, r_node);
3040
922dab61
ID
3041 n = rb_next(n); /* cancel_linger_request() */
3042
5aea3dcd
ID
3043 if (!req->r_linger) {
3044 if (!req->r_t.paused)
3045 send_request(req);
922dab61
ID
3046 } else {
3047 cancel_linger_request(req);
5aea3dcd
ID
3048 }
3049 }
922dab61
ID
3050 for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
3051 struct ceph_osd_linger_request *lreq =
3052 rb_entry(n, struct ceph_osd_linger_request, node);
3053
3054 send_linger(lreq);
3055 }
5aea3dcd
ID
3056}
3057
3058/*
3059 * If the osd connection drops, we need to resubmit all requests.
3060 */
3061static void osd_fault(struct ceph_connection *con)
3062{
3063 struct ceph_osd *osd = con->private;
3064 struct ceph_osd_client *osdc = osd->o_osdc;
3065
3066 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
3067
3068 down_write(&osdc->lock);
3069 if (!osd_registered(osd)) {
3070 dout("%s osd%d unknown\n", __func__, osd->o_osd);
3071 goto out_unlock;
3072 }
3073
3074 if (!reopen_osd(osd))
3075 kick_osd_requests(osd);
3076 maybe_request_map(osdc);
3077
3078out_unlock:
3079 up_write(&osdc->lock);
f24e9980
SW
3080}
3081
a40c4f10
YS
3082/*
3083 * Process osd watch notifications
3084 */
3c663bbd
AE
3085static void handle_watch_notify(struct ceph_osd_client *osdc,
3086 struct ceph_msg *msg)
a40c4f10 3087{
922dab61
ID
3088 void *p = msg->front.iov_base;
3089 void *const end = p + msg->front.iov_len;
3090 struct ceph_osd_linger_request *lreq;
3091 struct linger_work *lwork;
3092 u8 proto_ver, opcode;
3093 u64 cookie, notify_id;
3094 u64 notifier_id = 0;
19079203 3095 s32 return_code = 0;
922dab61
ID
3096 void *payload = NULL;
3097 u32 payload_len = 0;
a40c4f10
YS
3098
3099 ceph_decode_8_safe(&p, end, proto_ver, bad);
3100 ceph_decode_8_safe(&p, end, opcode, bad);
3101 ceph_decode_64_safe(&p, end, cookie, bad);
922dab61 3102 p += 8; /* skip ver */
a40c4f10
YS
3103 ceph_decode_64_safe(&p, end, notify_id, bad);
3104
922dab61
ID
3105 if (proto_ver >= 1) {
3106 ceph_decode_32_safe(&p, end, payload_len, bad);
3107 ceph_decode_need(&p, end, payload_len, bad);
3108 payload = p;
3109 p += payload_len;
3110 }
3111
3112 if (le16_to_cpu(msg->hdr.version) >= 2)
19079203 3113 ceph_decode_32_safe(&p, end, return_code, bad);
922dab61
ID
3114
3115 if (le16_to_cpu(msg->hdr.version) >= 3)
3116 ceph_decode_64_safe(&p, end, notifier_id, bad);
3117
3118 down_read(&osdc->lock);
3119 lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
3120 if (!lreq) {
3121 dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
3122 cookie);
3123 goto out_unlock_osdc;
3124 }
3125
3126 mutex_lock(&lreq->lock);
19079203
ID
3127 dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
3128 opcode, cookie, lreq, lreq->is_watch);
922dab61
ID
3129 if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
3130 if (!lreq->last_error) {
3131 lreq->last_error = -ENOTCONN;
3132 queue_watch_error(lreq);
3133 }
19079203
ID
3134 } else if (!lreq->is_watch) {
3135 /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
3136 if (lreq->notify_id && lreq->notify_id != notify_id) {
3137 dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
3138 lreq->notify_id, notify_id);
3139 } else if (!completion_done(&lreq->notify_finish_wait)) {
3140 struct ceph_msg_data *data =
3141 list_first_entry_or_null(&msg->data,
3142 struct ceph_msg_data,
3143 links);
3144
3145 if (data) {
3146 if (lreq->preply_pages) {
3147 WARN_ON(data->type !=
3148 CEPH_MSG_DATA_PAGES);
3149 *lreq->preply_pages = data->pages;
3150 *lreq->preply_len = data->length;
3151 } else {
3152 ceph_release_page_vector(data->pages,
3153 calc_pages_for(0, data->length));
3154 }
3155 }
3156 lreq->notify_finish_error = return_code;
3157 complete_all(&lreq->notify_finish_wait);
3158 }
922dab61
ID
3159 } else {
3160 /* CEPH_WATCH_EVENT_NOTIFY */
3161 lwork = lwork_alloc(lreq, do_watch_notify);
3162 if (!lwork) {
3163 pr_err("failed to allocate notify-lwork\n");
3164 goto out_unlock_lreq;
a40c4f10 3165 }
a40c4f10 3166
922dab61
ID
3167 lwork->notify.notify_id = notify_id;
3168 lwork->notify.notifier_id = notifier_id;
3169 lwork->notify.payload = payload;
3170 lwork->notify.payload_len = payload_len;
3171 lwork->notify.msg = ceph_msg_get(msg);
3172 lwork_queue(lwork);
91883cd2 3173 }
a40c4f10 3174
922dab61
ID
3175out_unlock_lreq:
3176 mutex_unlock(&lreq->lock);
3177out_unlock_osdc:
3178 up_read(&osdc->lock);
a40c4f10
YS
3179 return;
3180
3181bad:
3182 pr_err("osdc handle_watch_notify corrupt msg\n");
a40c4f10
YS
3183}
3184
70636773
AE
3185/*
3186 * Register request, send initial attempt.
3187 */
3188int ceph_osdc_start_request(struct ceph_osd_client *osdc,
3189 struct ceph_osd_request *req,
3190 bool nofail)
3191{
5aea3dcd
ID
3192 down_read(&osdc->lock);
3193 submit_request(req, false);
3194 up_read(&osdc->lock);
0bbfdfe8 3195
5aea3dcd 3196 return 0;
f24e9980 3197}
3d14c5d2 3198EXPORT_SYMBOL(ceph_osdc_start_request);
f24e9980 3199
c9f9b93d
ID
3200/*
3201 * Unregister a registered request. The request is not completed (i.e.
3202 * no callbacks or wakeups) - higher layers are supposed to know what
3203 * they are canceling.
3204 */
3205void ceph_osdc_cancel_request(struct ceph_osd_request *req)
3206{
3207 struct ceph_osd_client *osdc = req->r_osdc;
3208
5aea3dcd 3209 down_write(&osdc->lock);
5aea3dcd
ID
3210 if (req->r_osd)
3211 cancel_request(req);
3212 up_write(&osdc->lock);
c9f9b93d
ID
3213}
3214EXPORT_SYMBOL(ceph_osdc_cancel_request);
3215
f24e9980 3216/*
42b06965 3217 * @timeout: in jiffies, 0 means "wait forever"
f24e9980 3218 */
42b06965
ID
3219static int wait_request_timeout(struct ceph_osd_request *req,
3220 unsigned long timeout)
f24e9980 3221{
42b06965 3222 long left;
c9f9b93d 3223
42b06965
ID
3224 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3225 left = wait_for_completion_interruptible_timeout(&req->r_completion,
3226 ceph_timeout_jiffies(timeout));
3227 if (left <= 0) {
3228 left = left ?: -ETIMEDOUT;
c9f9b93d 3229 ceph_osdc_cancel_request(req);
fe5da05e
ID
3230
3231 /* kludge - need to wake ceph_osdc_sync() */
3232 complete_all(&req->r_safe_completion);
42b06965
ID
3233 } else {
3234 left = req->r_result; /* completed */
f24e9980
SW
3235 }
3236
42b06965
ID
3237 return left;
3238}
3239
3240/*
3241 * wait for a request to complete
3242 */
3243int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
3244 struct ceph_osd_request *req)
3245{
3246 return wait_request_timeout(req, 0);
f24e9980 3247}
3d14c5d2 3248EXPORT_SYMBOL(ceph_osdc_wait_request);
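
/*
 * Illustrative sketch (not part of the original file): the synchronous
 * submit-and-wait pattern used by callers such as ceph_osdc_notify_ack()
 * below.  build_example_req() is a hypothetical stand-in for whatever
 * allocates and populates the request (cf. the ceph_osdc_alloc_request()
 * callers in this file).
 */
static struct ceph_osd_request *build_example_req(struct ceph_osd_client *);

static int example_sync_op(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	int ret;

	req = build_example_req(osdc);
	if (IS_ERR(req))
		return PTR_ERR(req);

	ceph_osdc_start_request(osdc, req, false);
	ret = ceph_osdc_wait_request(osdc, req); /* cancels on signal */

	ceph_osdc_put_request(req);		 /* drop our ref */
	return ret;
}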
f24e9980
SW
3249
3250/*
3251 * sync - wait for all in-flight requests to flush. avoid starvation.
3252 */
3253void ceph_osdc_sync(struct ceph_osd_client *osdc)
3254{
5aea3dcd
ID
3255 struct rb_node *n, *p;
3256 u64 last_tid = atomic64_read(&osdc->last_tid);
f24e9980 3257
5aea3dcd
ID
3258again:
3259 down_read(&osdc->lock);
3260 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
3261 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3262
3263 mutex_lock(&osd->lock);
3264 for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
3265 struct ceph_osd_request *req =
3266 rb_entry(p, struct ceph_osd_request, r_node);
3267
3268 if (req->r_tid > last_tid)
3269 break;
3270
3271 if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
3272 continue;
f24e9980 3273
5aea3dcd
ID
3274 ceph_osdc_get_request(req);
3275 mutex_unlock(&osd->lock);
3276 up_read(&osdc->lock);
3277 dout("%s waiting on req %p tid %llu last_tid %llu\n",
3278 __func__, req, req->r_tid, last_tid);
3279 wait_for_completion(&req->r_safe_completion);
3280 ceph_osdc_put_request(req);
3281 goto again;
3282 }
f24e9980 3283
5aea3dcd 3284 mutex_unlock(&osd->lock);
f24e9980 3285 }
5aea3dcd
ID
3286
3287 up_read(&osdc->lock);
3288 dout("%s done last_tid %llu\n", __func__, last_tid);
f24e9980 3289}
3d14c5d2 3290EXPORT_SYMBOL(ceph_osdc_sync);
f24e9980 3291
922dab61
ID
3292static struct ceph_osd_request *
3293alloc_linger_request(struct ceph_osd_linger_request *lreq)
3294{
3295 struct ceph_osd_request *req;
3296
3297 req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
3298 if (!req)
3299 return NULL;
3300
3301 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
3302 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
3303
3304 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
3305 ceph_osdc_put_request(req);
3306 return NULL;
3307 }
3308
3309 return req;
3310}
3311
3312/*
3313 * Returns a handle, caller owns a ref.
3314 */
3315struct ceph_osd_linger_request *
3316ceph_osdc_watch(struct ceph_osd_client *osdc,
3317 struct ceph_object_id *oid,
3318 struct ceph_object_locator *oloc,
3319 rados_watchcb2_t wcb,
3320 rados_watcherrcb_t errcb,
3321 void *data)
3322{
3323 struct ceph_osd_linger_request *lreq;
3324 int ret;
3325
3326 lreq = linger_alloc(osdc);
3327 if (!lreq)
3328 return ERR_PTR(-ENOMEM);
3329
19079203 3330 lreq->is_watch = true;
922dab61
ID
3331 lreq->wcb = wcb;
3332 lreq->errcb = errcb;
3333 lreq->data = data;
b07d3c4b 3334 lreq->watch_valid_thru = jiffies;
922dab61
ID
3335
3336 ceph_oid_copy(&lreq->t.base_oid, oid);
3337 ceph_oloc_copy(&lreq->t.base_oloc, oloc);
3338 lreq->t.flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
3339 lreq->mtime = CURRENT_TIME;
3340
3341 lreq->reg_req = alloc_linger_request(lreq);
3342 if (!lreq->reg_req) {
3343 ret = -ENOMEM;
3344 goto err_put_lreq;
3345 }
3346
3347 lreq->ping_req = alloc_linger_request(lreq);
3348 if (!lreq->ping_req) {
3349 ret = -ENOMEM;
3350 goto err_put_lreq;
3351 }
3352
3353 down_write(&osdc->lock);
3354 linger_register(lreq); /* before osd_req_op_* */
3355 osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
3356 CEPH_OSD_WATCH_OP_WATCH);
3357 osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
3358 CEPH_OSD_WATCH_OP_PING);
3359 linger_submit(lreq);
3360 up_write(&osdc->lock);
3361
3362 ret = linger_reg_commit_wait(lreq);
3363 if (ret) {
3364 linger_cancel(lreq);
3365 goto err_put_lreq;
3366 }
3367
3368 return lreq;
3369
3370err_put_lreq:
3371 linger_put(lreq);
3372 return ERR_PTR(ret);
3373}
3374EXPORT_SYMBOL(ceph_osdc_watch);
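
/*
 * Illustrative sketch (not part of the original file): establishing and
 * tearing down a watch.  The callback signatures are inferred from the
 * wcb/errcb invocations in do_watch_notify() and do_watch_error() above;
 * the my_* names are made up.
 */
static void my_watch_cb(void *arg, u64 notify_id, u64 cookie,
			u64 notifier_id, void *payload, size_t payload_len)
{
	/* runs from osdc->notify_wq for each CEPH_WATCH_EVENT_NOTIFY */
}

static void my_watch_errcb(void *arg, u64 cookie, int err)
{
	/* watch is broken (e.g. -ENOTCONN); arrange for recovery */
}

static int example_watch(struct ceph_osd_client *osdc,
			 struct ceph_object_id *oid,
			 struct ceph_object_locator *oloc)
{
	struct ceph_osd_linger_request *handle;

	handle = ceph_osdc_watch(osdc, oid, oloc, my_watch_cb,
				 my_watch_errcb, NULL);
	if (IS_ERR(handle))
		return PTR_ERR(handle);

	/* ... the watch persists across OSD map changes and faults ... */

	return ceph_osdc_unwatch(osdc, handle);	/* releases our ref */
}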
3375
3376/*
3377 * Releases a ref.
3378 *
3379 * Times out after mount_timeout to preserve rbd unmap behaviour
3380 * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
3381 * with mount_timeout").
3382 */
3383int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
3384 struct ceph_osd_linger_request *lreq)
3385{
3386 struct ceph_options *opts = osdc->client->options;
3387 struct ceph_osd_request *req;
3388 int ret;
3389
3390 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
3391 if (!req)
3392 return -ENOMEM;
3393
3394 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
3395 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
3396 req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
3397 req->r_mtime = CURRENT_TIME;
3398 osd_req_op_watch_init(req, 0, lreq->linger_id,
3399 CEPH_OSD_WATCH_OP_UNWATCH);
3400
3401 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3402 if (ret)
3403 goto out_put_req;
3404
3405 ceph_osdc_start_request(osdc, req, false);
3406 linger_cancel(lreq);
3407 linger_put(lreq);
3408 ret = wait_request_timeout(req, opts->mount_timeout);
3409
3410out_put_req:
3411 ceph_osdc_put_request(req);
3412 return ret;
3413}
3414EXPORT_SYMBOL(ceph_osdc_unwatch);
3415
3416static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
3417 u64 notify_id, u64 cookie, void *payload,
3418 size_t payload_len)
3419{
3420 struct ceph_osd_req_op *op;
3421 struct ceph_pagelist *pl;
3422 int ret;
3423
3424 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
3425
3426 pl = kmalloc(sizeof(*pl), GFP_NOIO);
3427 if (!pl)
3428 return -ENOMEM;
3429
3430 ceph_pagelist_init(pl);
3431 ret = ceph_pagelist_encode_64(pl, notify_id);
3432 ret |= ceph_pagelist_encode_64(pl, cookie);
3433 if (payload) {
3434 ret |= ceph_pagelist_encode_32(pl, payload_len);
3435 ret |= ceph_pagelist_append(pl, payload, payload_len);
3436 } else {
3437 ret |= ceph_pagelist_encode_32(pl, 0);
3438 }
3439 if (ret) {
3440 ceph_pagelist_release(pl);
3441 return -ENOMEM;
3442 }
3443
3444 ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
3445 op->indata_len = pl->length;
3446 return 0;
3447}
3448
3449int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
3450 struct ceph_object_id *oid,
3451 struct ceph_object_locator *oloc,
3452 u64 notify_id,
3453 u64 cookie,
3454 void *payload,
3455 size_t payload_len)
3456{
3457 struct ceph_osd_request *req;
3458 int ret;
3459
3460 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
3461 if (!req)
3462 return -ENOMEM;
3463
3464 ceph_oid_copy(&req->r_base_oid, oid);
3465 ceph_oloc_copy(&req->r_base_oloc, oloc);
3466 req->r_flags = CEPH_OSD_FLAG_READ;
3467
3468 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3469 if (ret)
3470 goto out_put_req;
3471
3472 ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
3473 payload_len);
3474 if (ret)
3475 goto out_put_req;
3476
3477 ceph_osdc_start_request(osdc, req, false);
3478 ret = ceph_osdc_wait_request(osdc, req);
3479
3480out_put_req:
3481 ceph_osdc_put_request(req);
3482 return ret;
3483}
3484EXPORT_SYMBOL(ceph_osdc_notify_ack);
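
/*
 * Illustrative sketch (not part of the original file): a watch callback
 * normally acknowledges each notify so that the notifier's blocked
 * ceph_osdc_notify() can finish.  struct my_watch_ctx is a hypothetical
 * per-watch context that the caller passed as @data to ceph_osdc_watch().
 */
struct my_watch_ctx {
	struct ceph_osd_client *osdc;
	struct ceph_object_id oid;
	struct ceph_object_locator oloc;
};

static void my_acking_watch_cb(void *arg, u64 notify_id, u64 cookie,
			       u64 notifier_id, void *payload,
			       size_t payload_len)
{
	struct my_watch_ctx *ctx = arg;

	ceph_osdc_notify_ack(ctx->osdc, &ctx->oid, &ctx->oloc,
			     notify_id, cookie, NULL, 0);
}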
3485
19079203
ID
3486static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
3487 u64 cookie, u32 prot_ver, u32 timeout,
3488 void *payload, size_t payload_len)
3489{
3490 struct ceph_osd_req_op *op;
3491 struct ceph_pagelist *pl;
3492 int ret;
3493
3494 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
3495 op->notify.cookie = cookie;
3496
3497 pl = kmalloc(sizeof(*pl), GFP_NOIO);
3498 if (!pl)
3499 return -ENOMEM;
3500
3501 ceph_pagelist_init(pl);
3502 ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
3503 ret |= ceph_pagelist_encode_32(pl, timeout);
3504 ret |= ceph_pagelist_encode_32(pl, payload_len);
3505 ret |= ceph_pagelist_append(pl, payload, payload_len);
3506 if (ret) {
3507 ceph_pagelist_release(pl);
3508 return -ENOMEM;
3509 }
3510
3511 ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
3512 op->indata_len = pl->length;
3513 return 0;
3514}
3515
3516/*
3517 * @timeout: in seconds
3518 *
3519 * @preply_{pages,len} are initialized both on success and error.
3520 * The caller is responsible for:
3521 *
3522 * ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
3523 */
3524int ceph_osdc_notify(struct ceph_osd_client *osdc,
3525 struct ceph_object_id *oid,
3526 struct ceph_object_locator *oloc,
3527 void *payload,
3528 size_t payload_len,
3529 u32 timeout,
3530 struct page ***preply_pages,
3531 size_t *preply_len)
3532{
3533 struct ceph_osd_linger_request *lreq;
3534 struct page **pages;
3535 int ret;
3536
3537 WARN_ON(!timeout);
3538 if (preply_pages) {
3539 *preply_pages = NULL;
3540 *preply_len = 0;
3541 }
3542
3543 lreq = linger_alloc(osdc);
3544 if (!lreq)
3545 return -ENOMEM;
3546
3547 lreq->preply_pages = preply_pages;
3548 lreq->preply_len = preply_len;
3549
3550 ceph_oid_copy(&lreq->t.base_oid, oid);
3551 ceph_oloc_copy(&lreq->t.base_oloc, oloc);
3552 lreq->t.flags = CEPH_OSD_FLAG_READ;
3553
3554 lreq->reg_req = alloc_linger_request(lreq);
3555 if (!lreq->reg_req) {
3556 ret = -ENOMEM;
3557 goto out_put_lreq;
3558 }
3559
3560 /* for notify_id */
3561 pages = ceph_alloc_page_vector(1, GFP_NOIO);
3562 if (IS_ERR(pages)) {
3563 ret = PTR_ERR(pages);
3564 goto out_put_lreq;
3565 }
3566
3567 down_write(&osdc->lock);
3568 linger_register(lreq); /* before osd_req_op_* */
3569 ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
3570 timeout, payload, payload_len);
3571 if (ret) {
3572 linger_unregister(lreq);
3573 up_write(&osdc->lock);
3574 ceph_release_page_vector(pages, 1);
3575 goto out_put_lreq;
3576 }
3577 ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
3578 response_data),
3579 pages, PAGE_SIZE, 0, false, true);
3580 linger_submit(lreq);
3581 up_write(&osdc->lock);
3582
3583 ret = linger_reg_commit_wait(lreq);
3584 if (!ret)
3585 ret = linger_notify_finish_wait(lreq);
3586 else
3587 dout("lreq %p failed to initiate notify %d\n", lreq, ret);
3588
3589 linger_cancel(lreq);
3590out_put_lreq:
3591 linger_put(lreq);
3592 return ret;
3593}
3594EXPORT_SYMBOL(ceph_osdc_notify);
3595
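/*
 * A minimal usage sketch for ceph_osdc_notify() (example_notify is a
 * hypothetical helper; osdc, oid and oloc are assumed to be set up by
 * the caller).  Note the release of the reply pages, per the comment
 * above ceph_osdc_notify().
 */
static int example_notify(struct ceph_osd_client *osdc,
			  struct ceph_object_id *oid,
			  struct ceph_object_locator *oloc)
{
	struct page **reply_pages;
	size_t reply_len;
	char payload[] = "ping";
	int ret;

	ret = ceph_osdc_notify(osdc, oid, oloc, payload, sizeof(payload),
			       10, &reply_pages, &reply_len); /* 10 s */
	if (reply_pages)
		ceph_release_page_vector(reply_pages,
					 calc_pages_for(0, reply_len));
	return ret;
}
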
b07d3c4b
ID
3596/*
3597 * Return the number of milliseconds since the watch was last
3598 * confirmed, or an error. If there is an error, the watch is no
3599 * longer valid, and should be destroyed with ceph_osdc_unwatch().
3600 */
3601int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
3602 struct ceph_osd_linger_request *lreq)
3603{
3604 unsigned long stamp, age;
3605 int ret;
3606
3607 down_read(&osdc->lock);
3608 mutex_lock(&lreq->lock);
3609 stamp = lreq->watch_valid_thru;
3610 if (!list_empty(&lreq->pending_lworks)) {
3611 struct linger_work *lwork =
3612 list_first_entry(&lreq->pending_lworks,
3613 struct linger_work,
3614 pending_item);
3615
3616 if (time_before(lwork->queued_stamp, stamp))
3617 stamp = lwork->queued_stamp;
3618 }
3619 age = jiffies - stamp;
3620 dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
3621 lreq, lreq->linger_id, age, lreq->last_error);
3622 /* we are truncating to msecs, so return a safe upper bound */
3623 ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);
3624
3625 mutex_unlock(&lreq->lock);
3626 up_read(&osdc->lock);
3627 return ret;
3628}
3629
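/*
 * A sketch of acting on the age returned above (example_watch_is_fresh
 * is a hypothetical helper; lreq is assumed to be the handle returned
 * by ceph_osdc_watch(), defined earlier in this file).
 */
static bool example_watch_is_fresh(struct ceph_osd_client *osdc,
				   struct ceph_osd_linger_request *lreq)
{
	int ret = ceph_osdc_watch_check(osdc, lreq);

	if (ret < 0)
		return false;	/* invalid - unwatch and re-watch */

	/* confirmed within the last 30 seconds? */
	return ret <= 30 * MSEC_PER_SEC;
}
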
dd935f44
JD
3630/*
3631 * Drain all pending notify callbacks - for use after a watch is
3632 * unregistered, to make sure no more callbacks for it will be invoked.
3633 */
f6479449 3634void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
dd935f44
JD
3635{
3636 flush_workqueue(osdc->notify_wq);
3637}
3638EXPORT_SYMBOL(ceph_osdc_flush_notifies);
3639
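/*
 * A sketch of the teardown ordering this helper exists for
 * (example_teardown_watch is hypothetical; ceph_osdc_unwatch() is
 * defined earlier in this file): unregister the watch first, then
 * drain the workqueue so no notify callback can run afterwards.
 */
static void example_teardown_watch(struct ceph_osd_client *osdc,
				   struct ceph_osd_linger_request *lreq)
{
	ceph_osdc_unwatch(osdc, lreq);
	ceph_osdc_flush_notifies(osdc);
}
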
3640
f24e9980
SW
3641/*
3642 * init, shutdown
3643 */
3644int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
3645{
3646 int err;
3647
3648 dout("init\n");
3649 osdc->client = client;
5aea3dcd 3650 init_rwsem(&osdc->lock);
f24e9980 3651 osdc->osds = RB_ROOT;
f5a2041b 3652 INIT_LIST_HEAD(&osdc->osd_lru);
9dd2845c 3653 spin_lock_init(&osdc->osd_lru_lock);
5aea3dcd
ID
3654 osd_init(&osdc->homeless_osd);
3655 osdc->homeless_osd.o_osdc = osdc;
3656 osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
922dab61 3657 osdc->linger_requests = RB_ROOT;
f24e9980 3658 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
f5a2041b
YS
3659 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
3660
5f44f142 3661 err = -ENOMEM;
e5253a7b
ID
3662 osdc->osdmap = ceph_osdmap_alloc();
3663 if (!osdc->osdmap)
3664 goto out;
3665
9e767adb
ID
3666 osdc->req_mempool = mempool_create_slab_pool(10,
3667 ceph_osd_request_cache);
f24e9980 3668 if (!osdc->req_mempool)
e5253a7b 3669 goto out_map;
f24e9980 3670
d50b409f 3671 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
711da55d 3672 PAGE_SIZE, 10, true, "osd_op");
f24e9980 3673 if (err < 0)
5f44f142 3674 goto out_mempool;
d50b409f 3675 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
711da55d 3676 PAGE_SIZE, 10, true, "osd_op_reply");
c16e7869
SW
3677 if (err < 0)
3678 goto out_msgpool;
a40c4f10 3679
dbcae088 3680 err = -ENOMEM;
a40c4f10 3681 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
dbcae088 3682 if (!osdc->notify_wq)
c172ec5c
ID
3683 goto out_msgpool_reply;
3684
fbca9635
ID
3685 schedule_delayed_work(&osdc->timeout_work,
3686 osdc->client->options->osd_keepalive_timeout);
b37ee1b9
ID
3687 schedule_delayed_work(&osdc->osds_timeout_work,
3688 round_jiffies_relative(osdc->client->options->osd_idle_ttl));
3689
f24e9980 3690 return 0;
5f44f142 3691
c172ec5c
ID
3692out_msgpool_reply:
3693 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
c16e7869
SW
3694out_msgpool:
3695 ceph_msgpool_destroy(&osdc->msgpool_op);
5f44f142
SW
3696out_mempool:
3697 mempool_destroy(osdc->req_mempool);
e5253a7b
ID
3698out_map:
3699 ceph_osdmap_destroy(osdc->osdmap);
5f44f142
SW
3700out:
3701 return err;
f24e9980
SW
3702}
3703
3704void ceph_osdc_stop(struct ceph_osd_client *osdc)
3705{
a40c4f10
YS
3706 flush_workqueue(osdc->notify_wq);
3707 destroy_workqueue(osdc->notify_wq);
f24e9980 3708 cancel_delayed_work_sync(&osdc->timeout_work);
f5a2041b 3709 cancel_delayed_work_sync(&osdc->osds_timeout_work);
42a2c09f 3710
5aea3dcd 3711 down_write(&osdc->lock);
42a2c09f
ID
3712 while (!RB_EMPTY_ROOT(&osdc->osds)) {
3713 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
3714 struct ceph_osd, o_node);
5aea3dcd 3715 close_osd(osd);
42a2c09f 3716 }
5aea3dcd
ID
3717 up_write(&osdc->lock);
3718 WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
3719 osd_cleanup(&osdc->homeless_osd);
3720
3721 WARN_ON(!list_empty(&osdc->osd_lru));
922dab61 3722 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
5aea3dcd
ID
3723 WARN_ON(atomic_read(&osdc->num_requests));
3724 WARN_ON(atomic_read(&osdc->num_homeless));
42a2c09f 3725
e5253a7b 3726 ceph_osdmap_destroy(osdc->osdmap);
f24e9980
SW
3727 mempool_destroy(osdc->req_mempool);
3728 ceph_msgpool_destroy(&osdc->msgpool_op);
c16e7869 3729 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
f24e9980
SW
3730}
3731
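/*
 * A lifetime sketch (example_osdc_lifetime is hypothetical; in the
 * kernel, init/stop are driven by ceph_create_client() and
 * ceph_destroy_client(), with the osd_client embedded in
 * struct ceph_client).
 */
static int example_osdc_lifetime(struct ceph_client *client,
				 struct ceph_osd_client *osdc)
{
	int ret = ceph_osdc_init(osdc, client);

	if (ret)
		return ret;

	/* ... submit requests ... */

	ceph_osdc_stop(osdc);
	return 0;
}
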
3732/*
3733 * Read some contiguous pages. If we cross a stripe boundary, shorten
3734 * *plen. Return number of bytes read, or error.
3735 */
3736int ceph_osdc_readpages(struct ceph_osd_client *osdc,
3737 struct ceph_vino vino, struct ceph_file_layout *layout,
3738 u64 off, u64 *plen,
3739 u32 truncate_seq, u64 truncate_size,
b7495fc2 3740 struct page **pages, int num_pages, int page_align)
f24e9980
SW
3741{
3742 struct ceph_osd_request *req;
3743 int rc = 0;
3744
3745 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
3746 vino.snap, off, *plen);
715e4cd4 3747 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
f24e9980 3748 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
acead002 3749 NULL, truncate_seq, truncate_size,
153e5167 3750 false);
6816282d
SW
3751 if (IS_ERR(req))
3752 return PTR_ERR(req);
f24e9980
SW
3753
3754 /* it may be a short read due to an object boundary */
406e2c9f 3755 osd_req_op_extent_osd_data_pages(req, 0,
a4ce40a9 3756 pages, *plen, page_align, false, false);
f24e9980 3757
e0c59487 3758 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
43bfe5de 3759 off, *plen, *plen, page_align);
f24e9980
SW
3760
3761 rc = ceph_osdc_start_request(osdc, req, false);
3762 if (!rc)
3763 rc = ceph_osdc_wait_request(osdc, req);
3764
3765 ceph_osdc_put_request(req);
3766 dout("readpages result %d\n", rc);
3767 return rc;
3768}
3d14c5d2 3769EXPORT_SYMBOL(ceph_osdc_readpages);
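
/*
 * A minimal sketch of a synchronous read (example_read is a
 * hypothetical helper; vino and layout come from the inode in real
 * callers, and truncate_seq/truncate_size are zeroed here for
 * simplicity).  len may come back shortened at an object boundary.
 */
static int example_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
			struct ceph_file_layout *layout, u64 off, u64 len)
{
	int num_pages = calc_pages_for(off, len);
	struct page **pages;
	int ret;

	pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = ceph_osdc_readpages(osdc, vino, layout, off, &len, 0, 0,
				  pages, num_pages, off & ~PAGE_MASK);

	ceph_release_page_vector(pages, num_pages);
	return ret;	/* bytes read, or error */
}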
f24e9980
SW
3770
3771/*
3772 * do a synchronous write on N pages
3773 */
3774int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
3775 struct ceph_file_layout *layout,
3776 struct ceph_snap_context *snapc,
3777 u64 off, u64 len,
3778 u32 truncate_seq, u64 truncate_size,
3779 struct timespec *mtime,
24808826 3780 struct page **pages, int num_pages)
f24e9980
SW
3781{
3782 struct ceph_osd_request *req;
3783 int rc = 0;
b7495fc2 3784 int page_align = off & ~PAGE_MASK;
f24e9980 3785
715e4cd4 3786 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
f24e9980 3787 CEPH_OSD_OP_WRITE,
24808826 3788 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
acead002 3789 snapc, truncate_seq, truncate_size,
153e5167 3790 true);
6816282d
SW
3791 if (IS_ERR(req))
3792 return PTR_ERR(req);
f24e9980
SW
3793
3794 /* it may be a short write due to an object boundary */
406e2c9f 3795 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
43bfe5de
AE
3796 false, false);
3797 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
f24e9980 3798
bb873b53 3799 req->r_mtime = *mtime;
87f979d3 3800 rc = ceph_osdc_start_request(osdc, req, true);
f24e9980
SW
3801 if (!rc)
3802 rc = ceph_osdc_wait_request(osdc, req);
3803
3804 ceph_osdc_put_request(req);
3805 if (rc == 0)
3806 rc = len;
3807 dout("writepages result %d\n", rc);
3808 return rc;
3809}
3d14c5d2 3810EXPORT_SYMBOL(ceph_osdc_writepages);
f24e9980 3811
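/*
 * A minimal sketch of a synchronous write (example_write is a
 * hypothetical helper; snapc must be a valid snap context in real
 * callers, and the page vector must cover off~len with the same
 * in-page alignment as off).
 */
static int example_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 struct page **pages, int num_pages)
{
	struct timespec mtime;

	getnstimeofday(&mtime);	/* pre-timespec64 API, matching this file */
	return ceph_osdc_writepages(osdc, vino, layout, snapc, off, len,
				    0, 0, &mtime, pages, num_pages);
}
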
5522ae0b
AE
3812int ceph_osdc_setup(void)
3813{
3f1af42a
ID
3814 size_t size = sizeof(struct ceph_osd_request) +
3815 CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
3816
5522ae0b 3817 BUG_ON(ceph_osd_request_cache);
3f1af42a
ID
3818 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
3819 0, 0, NULL);
5522ae0b
AE
3820
3821 return ceph_osd_request_cache ? 0 : -ENOMEM;
3822}
3823EXPORT_SYMBOL(ceph_osdc_setup);
3824
3825void ceph_osdc_cleanup(void)
3826{
3827 BUG_ON(!ceph_osd_request_cache);
3828 kmem_cache_destroy(ceph_osd_request_cache);
3829 ceph_osd_request_cache = NULL;
3830}
3831EXPORT_SYMBOL(ceph_osdc_cleanup);
3832
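/*
 * A module-lifetime sketch (hypothetical; in the kernel these are
 * called from libceph's own module init/exit, not by end users).
 */
static int __init example_module_init(void)
{
	return ceph_osdc_setup();	/* create the request slab */
}

static void __exit example_module_exit(void)
{
	ceph_osdc_cleanup();		/* destroy it again */
}
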
f24e9980
SW
3833/*
3834 * handle incoming message
3835 */
3836static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3837{
3838 struct ceph_osd *osd = con->private;
5aea3dcd 3839 struct ceph_osd_client *osdc = osd->o_osdc;
f24e9980
SW
3840 int type = le16_to_cpu(msg->hdr.type);
3841
f24e9980
SW
3842 switch (type) {
3843 case CEPH_MSG_OSD_MAP:
3844 ceph_osdc_handle_map(osdc, msg);
3845 break;
3846 case CEPH_MSG_OSD_OPREPLY:
5aea3dcd 3847 handle_reply(osd, msg);
f24e9980 3848 break;
a40c4f10
YS
3849 case CEPH_MSG_WATCH_NOTIFY:
3850 handle_watch_notify(osdc, msg);
3851 break;
f24e9980
SW
3852
3853 default:
3854 pr_err("received unknown message type %d %s\n", type,
3855 ceph_msg_type_name(type));
3856 }
5aea3dcd 3857
f24e9980
SW
3858 ceph_msg_put(msg);
3859}
3860
5b3a4db3 3861/*
d15f9d69
ID
3862 * Look up and return the message for an incoming reply. Don't try
3863 * to handle a data portion larger than the preallocated buffer -
3864 * for now, just skip the message.
5b3a4db3
SW
3865 */
3866static struct ceph_msg *get_reply(struct ceph_connection *con,
2450418c
YS
3867 struct ceph_msg_header *hdr,
3868 int *skip)
f24e9980
SW
3869{
3870 struct ceph_osd *osd = con->private;
3871 struct ceph_osd_client *osdc = osd->o_osdc;
5aea3dcd 3872 struct ceph_msg *m = NULL;
0547a9b3 3873 struct ceph_osd_request *req;
3f0a4ac5 3874 int front_len = le32_to_cpu(hdr->front_len);
5b3a4db3 3875 int data_len = le32_to_cpu(hdr->data_len);
5aea3dcd 3876 u64 tid = le64_to_cpu(hdr->tid);
f24e9980 3877
5aea3dcd
ID
3878 down_read(&osdc->lock);
3879 if (!osd_registered(osd)) {
3880 dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
3881 *skip = 1;
3882 goto out_unlock_osdc;
3883 }
3884 WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
3885
3886 mutex_lock(&osd->lock);
3887 req = lookup_request(&osd->o_requests, tid);
0547a9b3 3888 if (!req) {
cd8140c6
ID
3889 dout("%s osd%d tid %llu unknown, skipping\n", __func__,
3890 osd->o_osd, tid);
d15f9d69 3891 *skip = 1;
5aea3dcd 3892 goto out_unlock_session;
0547a9b3 3893 }
c16e7869 3894
ace6d3a9 3895 ceph_msg_revoke_incoming(req->r_reply);
0547a9b3 3896
f2be82b0 3897 if (front_len > req->r_reply->front_alloc_len) {
d15f9d69
ID
3898 pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
3899 __func__, osd->o_osd, req->r_tid, front_len,
3900 req->r_reply->front_alloc_len);
3f0a4ac5
ID
3901 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
3902 false);
a79832f2 3903 if (!m)
5aea3dcd 3904 goto out_unlock_session;
c16e7869
SW
3905 ceph_msg_put(req->r_reply);
3906 req->r_reply = m;
3907 }
0fff87ec 3908
d15f9d69
ID
3909 if (data_len > req->r_reply->data_length) {
3910 pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
3911 __func__, osd->o_osd, req->r_tid, data_len,
3912 req->r_reply->data_length);
3913 m = NULL;
3914 *skip = 1;
5aea3dcd 3915 goto out_unlock_session;
0547a9b3 3916 }
d15f9d69
ID
3917
3918 m = ceph_msg_get(req->r_reply);
c16e7869 3919 dout("get_reply tid %lld %p\n", tid, m);
0547a9b3 3920
5aea3dcd
ID
3921out_unlock_session:
3922 mutex_unlock(&osd->lock);
3923out_unlock_osdc:
3924 up_read(&osdc->lock);
2450418c 3925 return m;
5b3a4db3
SW
3926}
3927
19079203
ID
3928/*
3929 * TODO: switch to a msg-owned pagelist
3930 */
3931static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
3932{
3933 struct ceph_msg *m;
3934 int type = le16_to_cpu(hdr->type);
3935 u32 front_len = le32_to_cpu(hdr->front_len);
3936 u32 data_len = le32_to_cpu(hdr->data_len);
3937
3938 m = ceph_msg_new(type, front_len, GFP_NOIO, false);
3939 if (!m)
3940 return NULL;
3941
3942 if (data_len) {
3943 struct page **pages;
3944 struct ceph_osd_data osd_data;
3945
3946 pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
3947 GFP_NOIO);
3948		if (IS_ERR(pages)) {
3949 ceph_msg_put(m);
3950 return NULL;
3951 }
3952
3953 ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
3954 false);
3955 ceph_osdc_msg_data_add(m, &osd_data);
3956 }
3957
3958 return m;
3959}
3960
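/*
 * Worked example of the page math above, assuming 4 KiB pages: for a
 * 10000-byte data portion, calc_pages_for(0, 10000) rounds up to whole
 * pages, i.e. DIV_ROUND_UP(10000, 4096) = 3 pages.
 */
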
5b3a4db3
SW
3961static struct ceph_msg *alloc_msg(struct ceph_connection *con,
3962 struct ceph_msg_header *hdr,
3963 int *skip)
3964{
3965 struct ceph_osd *osd = con->private;
3966 int type = le16_to_cpu(hdr->type);
5b3a4db3 3967
1c20f2d2 3968 *skip = 0;
5b3a4db3
SW
3969 switch (type) {
3970 case CEPH_MSG_OSD_MAP:
a40c4f10 3971 case CEPH_MSG_WATCH_NOTIFY:
19079203 3972 return alloc_msg_with_page_vector(hdr);
5b3a4db3
SW
3973 case CEPH_MSG_OSD_OPREPLY:
3974 return get_reply(con, hdr, skip);
3975 default:
5aea3dcd
ID
3976 pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
3977 osd->o_osd, type);
5b3a4db3
SW
3978 *skip = 1;
3979 return NULL;
3980 }
f24e9980
SW
3981}
3982
3983/*
3984 * Wrappers to refcount containing ceph_osd struct
3985 */
3986static struct ceph_connection *get_osd_con(struct ceph_connection *con)
3987{
3988 struct ceph_osd *osd = con->private;
3989 if (get_osd(osd))
3990 return con;
3991 return NULL;
3992}
3993
3994static void put_osd_con(struct ceph_connection *con)
3995{
3996 struct ceph_osd *osd = con->private;
3997 put_osd(osd);
3998}
3999
4e7a5dcd
SW
4000/*
4001 * authentication
4002 */
a3530df3
AE
4003/*
4004 * Note: returned pointer is the address of a structure that's
4005 * managed separately. Caller must *not* attempt to free it.
4006 */
4007static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
8f43fb53 4008 int *proto, int force_new)
4e7a5dcd
SW
4009{
4010 struct ceph_osd *o = con->private;
4011 struct ceph_osd_client *osdc = o->o_osdc;
4012 struct ceph_auth_client *ac = osdc->client->monc.auth;
74f1869f 4013 struct ceph_auth_handshake *auth = &o->o_auth;
4e7a5dcd 4014
74f1869f 4015 if (force_new && auth->authorizer) {
6c1ea260 4016 ceph_auth_destroy_authorizer(auth->authorizer);
74f1869f
AE
4017 auth->authorizer = NULL;
4018 }
27859f97
SW
4019 if (!auth->authorizer) {
4020 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
4021 auth);
4e7a5dcd 4022 if (ret)
a3530df3 4023 return ERR_PTR(ret);
27859f97
SW
4024 } else {
4025 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
0bed9b5c
SW
4026 auth);
4027 if (ret)
4028 return ERR_PTR(ret);
4e7a5dcd 4029 }
4e7a5dcd 4030 *proto = ac->protocol;
74f1869f 4031
a3530df3 4032 return auth;
4e7a5dcd
SW
4033}
4034
4035
4036static int verify_authorizer_reply(struct ceph_connection *con, int len)
4037{
4038 struct ceph_osd *o = con->private;
4039 struct ceph_osd_client *osdc = o->o_osdc;
4040 struct ceph_auth_client *ac = osdc->client->monc.auth;
4041
27859f97 4042 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
4e7a5dcd
SW
4043}
4044
9bd2e6f8
SW
4045static int invalidate_authorizer(struct ceph_connection *con)
4046{
4047 struct ceph_osd *o = con->private;
4048 struct ceph_osd_client *osdc = o->o_osdc;
4049 struct ceph_auth_client *ac = osdc->client->monc.auth;
4050
27859f97 4051 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
9bd2e6f8
SW
4052 return ceph_monc_validate_auth(&osdc->client->monc);
4053}
4e7a5dcd 4054
79dbd1ba 4055static int osd_sign_message(struct ceph_msg *msg)
33d07337 4056{
79dbd1ba 4057 struct ceph_osd *o = msg->con->private;
33d07337 4058 struct ceph_auth_handshake *auth = &o->o_auth;
79dbd1ba 4059
33d07337
YZ
4060 return ceph_auth_sign_message(auth, msg);
4061}
4062
79dbd1ba 4063static int osd_check_message_signature(struct ceph_msg *msg)
33d07337 4064{
79dbd1ba 4065 struct ceph_osd *o = msg->con->private;
33d07337 4066 struct ceph_auth_handshake *auth = &o->o_auth;
79dbd1ba 4067
33d07337
YZ
4068 return ceph_auth_check_message_signature(auth, msg);
4069}
4070
9e32789f 4071static const struct ceph_connection_operations osd_con_ops = {
f24e9980
SW
4072 .get = get_osd_con,
4073 .put = put_osd_con,
4074 .dispatch = dispatch,
4e7a5dcd
SW
4075 .get_authorizer = get_authorizer,
4076 .verify_authorizer_reply = verify_authorizer_reply,
9bd2e6f8 4077 .invalidate_authorizer = invalidate_authorizer,
f24e9980 4078 .alloc_msg = alloc_msg,
79dbd1ba
ID
4079 .sign_message = osd_sign_message,
4080 .check_message_signature = osd_check_message_signature,
5aea3dcd 4081 .fault = osd_fault,
f24e9980 4082};