]> git.proxmox.com Git - mirror_ubuntu-bionic-kernel.git/blame - net/ceph/osd_client.c
libceph: a major OSD client update
[mirror_ubuntu-bionic-kernel.git] / net / ceph / osd_client.c
CommitLineData
a4ce40a9 1
3d14c5d2 2#include <linux/ceph/ceph_debug.h>
f24e9980 3
3d14c5d2 4#include <linux/module.h>
f24e9980
SW
5#include <linux/err.h>
6#include <linux/highmem.h>
7#include <linux/mm.h>
8#include <linux/pagemap.h>
9#include <linux/slab.h>
10#include <linux/uaccess.h>
68b4476b
YS
11#ifdef CONFIG_BLOCK
12#include <linux/bio.h>
13#endif
f24e9980 14
3d14c5d2
YS
15#include <linux/ceph/libceph.h>
16#include <linux/ceph/osd_client.h>
17#include <linux/ceph/messenger.h>
18#include <linux/ceph/decode.h>
19#include <linux/ceph/auth.h>
20#include <linux/ceph/pagelist.h>
f24e9980 21
c16e7869 22#define OSD_OPREPLY_FRONT_LEN 512
0d59ab81 23
5522ae0b
AE
24static struct kmem_cache *ceph_osd_request_cache;
25
9e32789f 26static const struct ceph_connection_operations osd_con_ops;
f24e9980 27
f24e9980
SW
28/*
29 * Implement client access to distributed object storage cluster.
30 *
31 * All data objects are stored within a cluster/cloud of OSDs, or
32 * "object storage devices." (Note that Ceph OSDs have _nothing_ to
33 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
34 * remote daemons serving up and coordinating consistent and safe
35 * access to storage.
36 *
37 * Cluster membership and the mapping of data objects onto storage devices
38 * are described by the osd map.
39 *
40 * We keep track of pending OSD requests (read, write), resubmit
41 * requests to different OSDs when the cluster topology/data layout
42 * change, or retry the affected requests when the communications
43 * channel with an OSD is reset.
44 */
45
5aea3dcd
ID
46static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
47static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
48
#if 1
/*
 * Lock-debugging helpers.  Compiled in unconditionally here (#if 1);
 * the #else branch provides no-op stubs should these ever be disabled.
 */

/*
 * Best-effort test for "held for write": if we can briefly take the
 * semaphore for read, it was not write-locked.  Only meaningful as a
 * WARN_ON-style sanity check, not for synchronization decisions.
 */
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
{
	bool wrlocked = true;

	if (unlikely(down_read_trylock(sem))) {
		wrlocked = false;
		up_read(sem);
	}

	return wrlocked;
}
/* Assert osdc->lock is held (read or write). */
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_locked(&osdc->lock));
}
/* Assert osdc->lock is held for write. */
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
}
/*
 * Assert the per-OSD state is protected: either osd->lock together
 * with osdc->lock held for read, or osdc->lock held for write.
 */
static inline void verify_osd_locked(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	WARN_ON(!(mutex_is_locked(&osd->lock) &&
		  rwsem_is_locked(&osdc->lock)) &&
		!rwsem_is_wrlocked(&osdc->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
#endif
82
f24e9980
SW
/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * On success *objnum/*objoff/*objlen describe the object extent, and
 * *plen is shrunk to the portion of the file extent that fits in that
 * single object.  Returns 0 or a negative error from the mapping call.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
			u64 *objnum, u64 *objoff, u64 *objlen)
{
	u64 orig_len = *plen;
	int r;

	/* object extent? */
	r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
					  objoff, objlen);
	if (r < 0)
		return r;
	if (*objlen < orig_len) {
		/* extent crossed an object boundary; truncate to this object */
		*plen = *objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);

	return 0;
}
111
c54d47bf
AE
112static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
113{
114 memset(osd_data, 0, sizeof (*osd_data));
115 osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
116}
117
a4ce40a9 118static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
43bfe5de
AE
119 struct page **pages, u64 length, u32 alignment,
120 bool pages_from_pool, bool own_pages)
121{
122 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
123 osd_data->pages = pages;
124 osd_data->length = length;
125 osd_data->alignment = alignment;
126 osd_data->pages_from_pool = pages_from_pool;
127 osd_data->own_pages = own_pages;
128}
43bfe5de 129
a4ce40a9 130static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
43bfe5de
AE
131 struct ceph_pagelist *pagelist)
132{
133 osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
134 osd_data->pagelist = pagelist;
135}
43bfe5de
AE
136
#ifdef CONFIG_BLOCK
/* Attach a bio chain of @bio_length bytes to @osd_data. */
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
			struct bio *bio, size_t bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio = bio;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */
146
8a703a38
IC
/*
 * Yield a pointer to field @fld inside the @typ union member of op
 * @whch of request @oreq, with a bounds check on the op index.
 * Arguments are evaluated exactly once.
 */
#define osd_req_op_data(oreq, whch, typ, fld)				\
({									\
	struct ceph_osd_request *__oreq = (oreq);			\
	unsigned int __whch = (whch);					\
	BUG_ON(__whch >= __oreq->r_num_ops);				\
	&__oreq->r_ops[__whch].typ.fld;					\
})
863c7eb5 154
49719778
AE
155static struct ceph_osd_data *
156osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
157{
158 BUG_ON(which >= osd_req->r_num_ops);
159
160 return &osd_req->r_ops[which].raw_data_in;
161}
162
a4ce40a9
AE
/* Return the extent data descriptor of op @which. */
struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);
170
49719778
AE
/* Set a page vector as the raw incoming data buffer of op @which. */
void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_raw_data_in(osd_req, which);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
183
a4ce40a9 184void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
406e2c9f
AE
185 unsigned int which, struct page **pages,
186 u64 length, u32 alignment,
a4ce40a9
AE
187 bool pages_from_pool, bool own_pages)
188{
189 struct ceph_osd_data *osd_data;
190
863c7eb5 191 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
a4ce40a9
AE
192 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
193 pages_from_pool, own_pages);
a4ce40a9
AE
194}
195EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
196
/* Set a pagelist as the data buffer of extent op @which. */
void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
206
#ifdef CONFIG_BLOCK
/* Set a bio chain as the data buffer of extent op @which. */
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
			unsigned int which, struct bio *bio, size_t bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */
218
/* Attach the class/method name pagelist to CALL op @which (request_info). */
static void osd_req_op_cls_request_info_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
228
/*
 * Attach a pagelist as the input payload of CALL op @which and grow the
 * op's recorded input lengths accordingly.
 */
void osd_req_op_cls_request_data_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
	/* account payload both in the CALL encoding and the op total */
	osd_req->r_ops[which].cls.indata_len += pagelist->length;
	osd_req->r_ops[which].indata_len += pagelist->length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
241
/*
 * Attach a page vector as the input payload of CALL op @which and grow
 * the op's recorded input lengths accordingly.
 */
void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
	/* account payload both in the CALL encoding and the op total */
	osd_req->r_ops[which].cls.indata_len += length;
	osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
255
a4ce40a9
AE
256void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
257 unsigned int which, struct page **pages, u64 length,
258 u32 alignment, bool pages_from_pool, bool own_pages)
259{
260 struct ceph_osd_data *osd_data;
261
863c7eb5 262 osd_data = osd_req_op_data(osd_req, which, cls, response_data);
a4ce40a9
AE
263 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
264 pages_from_pool, own_pages);
a4ce40a9
AE
265}
266EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
267
23c08a9c
AE
268static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
269{
270 switch (osd_data->type) {
271 case CEPH_OSD_DATA_TYPE_NONE:
272 return 0;
273 case CEPH_OSD_DATA_TYPE_PAGES:
274 return osd_data->length;
275 case CEPH_OSD_DATA_TYPE_PAGELIST:
276 return (u64)osd_data->pagelist->length;
277#ifdef CONFIG_BLOCK
278 case CEPH_OSD_DATA_TYPE_BIO:
279 return (u64)osd_data->bio_length;
280#endif /* CONFIG_BLOCK */
281 default:
282 WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
283 return 0;
284 }
285}
286
c54d47bf
AE
287static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
288{
5476492f 289 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
c54d47bf
AE
290 int num_pages;
291
292 num_pages = calc_pages_for((u64)osd_data->alignment,
293 (u64)osd_data->length);
294 ceph_release_page_vector(osd_data->pages, num_pages);
295 }
5476492f
AE
296 ceph_osd_data_init(osd_data);
297}
298
/*
 * Release the data buffers attached to op @which, dispatching on the
 * opcode since each op type stores its buffers in a different union
 * member.  Ops with no attached data are a no-op.
 */
static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];

	switch (op->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		ceph_osd_data_release(&op->extent.osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		ceph_osd_data_release(&op->cls.request_info);
		ceph_osd_data_release(&op->cls.request_data);
		ceph_osd_data_release(&op->cls.response_data);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		ceph_osd_data_release(&op->xattr.osd_data);
		break;
	case CEPH_OSD_OP_STAT:
		ceph_osd_data_release(&op->raw_data_in);
		break;
	default:
		break;
	}
}
329
/*
 * Assumes @t is zero-initialized.
 *
 * Size/min_size start at -1 ("unknown") and the request is homeless
 * (no OSD assigned) until it is mapped against an osdmap.
 */
static void target_init(struct ceph_osd_request_target *t)
{
	ceph_oid_init(&t->base_oid);
	ceph_oloc_init(&t->base_oloc);
	ceph_oid_init(&t->target_oid);
	ceph_oloc_init(&t->target_oloc);

	ceph_osds_init(&t->acting);
	ceph_osds_init(&t->up);
	t->size = -1;
	t->min_size = -1;

	t->osd = CEPH_HOMELESS_OSD;
}
347
/* Free oid name storage; olocs hold no allocated state here. */
static void target_destroy(struct ceph_osd_request_target *t)
{
	ceph_oid_destroy(&t->base_oid);
	ceph_oid_destroy(&t->target_oid);
}
353
f24e9980
SW
/*
 * requests
 */

/*
 * kref release callback: tear down a request once the last reference
 * is dropped.  The request must already be unlinked from the osdc
 * (rbtree node clear, not lingering, no OSD) -- warn otherwise.
 */
static void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
					struct ceph_osd_request, r_kref);
	unsigned int which;

	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
	     req->r_request, req->r_reply);
	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
	WARN_ON(!list_empty(&req->r_linger_item));
	WARN_ON(!list_empty(&req->r_linger_osd_item));
	WARN_ON(req->r_osd);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);

	for (which = 0; which < req->r_num_ops; which++)
		osd_req_op_data_release(req, which);

	target_destroy(&req->r_t);
	ceph_put_snap_context(req->r_snapc);

	/* free through the same allocator the request came from */
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
		kmem_cache_free(ceph_osd_request_cache, req);
	else
		kfree(req);
}
9e94af20
ID
/* Take an additional reference on @req. */
void ceph_osdc_get_request(struct ceph_osd_request *req)
{
	dout("%s %p (was %d)\n", __func__, req,
	     atomic_read(&req->r_kref.refcount));
	kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);
396
/*
 * Drop a reference on @req, freeing it via ceph_osdc_release_request()
 * when the count hits zero.  NULL is tolerated as a no-op.
 */
void ceph_osdc_put_request(struct ceph_osd_request *req)
{
	if (req) {
		dout("%s %p (was %d)\n", __func__, req,
		     atomic_read(&req->r_kref.refcount));
		kref_put(&req->r_kref, ceph_osdc_release_request);
	}
}
EXPORT_SYMBOL(ceph_osdc_put_request);
68b4476b 406
3499e8a5 407struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
f24e9980 408 struct ceph_snap_context *snapc,
1b83bef2 409 unsigned int num_ops,
3499e8a5 410 bool use_mempool,
54a54007 411 gfp_t gfp_flags)
f24e9980
SW
412{
413 struct ceph_osd_request *req;
1b83bef2 414
f24e9980 415 if (use_mempool) {
3f1af42a 416 BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
3499e8a5 417 req = mempool_alloc(osdc->req_mempool, gfp_flags);
3f1af42a
ID
418 } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
419 req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
f24e9980 420 } else {
3f1af42a
ID
421 BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
422 req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
423 gfp_flags);
f24e9980 424 }
3f1af42a 425 if (unlikely(!req))
a79832f2 426 return NULL;
f24e9980 427
3f1af42a
ID
428 /* req only, each op is zeroed in _osd_req_op_init() */
429 memset(req, 0, sizeof(*req));
430
f24e9980
SW
431 req->r_osdc = osdc;
432 req->r_mempool = use_mempool;
79528734 433 req->r_num_ops = num_ops;
84127282
ID
434 req->r_snapid = CEPH_NOSNAP;
435 req->r_snapc = ceph_get_snap_context(snapc);
68b4476b 436
415e49a9 437 kref_init(&req->r_kref);
f24e9980
SW
438 init_completion(&req->r_completion);
439 init_completion(&req->r_safe_completion);
a978fa20 440 RB_CLEAR_NODE(&req->r_node);
f24e9980 441 INIT_LIST_HEAD(&req->r_unsafe_item);
a40c4f10 442 INIT_LIST_HEAD(&req->r_linger_item);
1d0326b1 443 INIT_LIST_HEAD(&req->r_linger_osd_item);
cd43045c 444
a66dd383 445 target_init(&req->r_t);
22116525 446
13d1ad16
ID
447 dout("%s req %p\n", __func__, req);
448 return req;
449}
450EXPORT_SYMBOL(ceph_osdc_alloc_request);
3f1af42a 451
/*
 * Allocate the request and reply messages for @req, sizing the request
 * front by summing the encoded sizes of the MOSDOp header fields.  The
 * base oid must already be set.  Returns 0 or -ENOMEM; on partial
 * failure the request message is left attached and is released with
 * the request.
 */
int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_msg *msg;
	int msg_size;

	WARN_ON(ceph_oid_empty(&req->r_base_oid));

	/* create request message */
	msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
	msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
	msg_size += 1 + 8 + 4 + 4; /* pgid */
	msg_size += 4 + req->r_base_oid.name_len; /* oid */
	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8; /* snapid */
	msg_size += 8; /* snap_seq */
	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
	msg_size += 4; /* retry_attempt */

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	memset(msg->front.iov_base, 0, msg->front.iov_len);
	req->r_request = msg;

	/* create reply message */
	msg_size = OSD_OPREPLY_FRONT_LEN;
	msg_size += req->r_base_oid.name_len;
	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	req->r_reply = msg;

	return 0;
}
EXPORT_SYMBOL(ceph_osdc_alloc_messages);
3499e8a5 499
a8dd0a37 500static bool osd_req_opcode_valid(u16 opcode)
68b4476b 501{
a8dd0a37 502 switch (opcode) {
70b5bfa3
ID
503#define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true;
504__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
505#undef GENERATE_CASE
a8dd0a37
AE
506 default:
507 return false;
508 }
509}
510
/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 *
 * Zeroes op slot @which and stamps it with @opcode/@flags; returns the
 * op so the specialized initializers can fill in their union member.
 */
static struct ceph_osd_req_op *
_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
			u16 opcode, u32 flags)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	BUG_ON(!osd_req_opcode_valid(opcode));

	op = &osd_req->r_ops[which];
	memset(op, 0, sizeof (*op));
	op->op = opcode;
	op->flags = flags;

	return op;
}
532
/* Public wrapper around _osd_req_op_init() that discards the op pointer. */
void osd_req_op_init(struct ceph_osd_request *osd_req,
		     unsigned int which, u16 opcode, u32 flags)
{
	(void)_osd_req_op_init(osd_req, which, opcode, flags);
}
EXPORT_SYMBOL(osd_req_op_init);
539
/*
 * Initialize op @which as an extent op (read/write/writefull/zero/
 * truncate) over [offset, offset+length).  Only write-type ops carry
 * outgoing payload, so only they account @length in indata_len.
 */
void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
			unsigned int which, u16 opcode,
			u64 offset, u64 length,
			u64 truncate_size, u32 truncate_seq)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	size_t payload_len = 0;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
	       opcode != CEPH_OSD_OP_TRUNCATE);

	op->extent.offset = offset;
	op->extent.length = length;
	op->extent.truncate_size = truncate_size;
	op->extent.truncate_seq = truncate_seq;
	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
		payload_len += length;

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);
563
c99d2d4a
AE
564void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
565 unsigned int which, u64 length)
e5975c7c 566{
c99d2d4a
AE
567 struct ceph_osd_req_op *op;
568 u64 previous;
569
570 BUG_ON(which >= osd_req->r_num_ops);
571 op = &osd_req->r_ops[which];
572 previous = op->extent.length;
e5975c7c
AE
573
574 if (length == previous)
575 return; /* Nothing to do */
576 BUG_ON(length > previous);
577
578 op->extent.length = length;
de2aa102 579 op->indata_len -= previous - length;
e5975c7c
AE
580}
581EXPORT_SYMBOL(osd_req_op_extent_update);
582
/*
 * Clone extent op @which into slot @which + 1, advancing the clone's
 * offset by @offset_inc and shrinking its length (and, for write-type
 * ops, its payload accounting) by the same amount.  Used to split an
 * extent op across a boundary.
 */
void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
			unsigned int which, u64 offset_inc)
{
	struct ceph_osd_req_op *op, *prev_op;

	BUG_ON(which + 1 >= osd_req->r_num_ops);

	prev_op = &osd_req->r_ops[which];
	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
	/* dup previous one */
	op->indata_len = prev_op->indata_len;
	op->outdata_len = prev_op->outdata_len;
	op->extent = prev_op->extent;
	/* adjust offset */
	op->extent.offset += offset_inc;
	op->extent.length -= offset_inc;

	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= offset_inc;
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);
604
/*
 * Initialize op @which as a CALL to @class::@method.  The class and
 * method names are appended to a freshly allocated pagelist that
 * becomes the op's request_info; @class and @method themselves must
 * stay live for the duration of the request (only pointers are kept
 * for encoding the name lengths).
 *
 * NOTE(review): allocation failure is handled with BUG_ON rather than
 * an error return -- presumably acceptable for GFP_NOFS-sized
 * allocations here, but worth confirming against callers.
 */
void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
			u16 opcode, const char *class, const char *method)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len = 0;
	size_t size;

	BUG_ON(opcode != CEPH_OSD_OP_CALL);

	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
	BUG_ON(!pagelist);
	ceph_pagelist_init(pagelist);

	op->cls.class_name = class;
	size = strlen(class);
	BUG_ON(size > (size_t) U8_MAX);	/* encoded as a single byte */
	op->cls.class_len = size;
	ceph_pagelist_append(pagelist, class, size);
	payload_len += size;

	op->cls.method_name = method;
	size = strlen(method);
	BUG_ON(size > (size_t) U8_MAX);	/* encoded as a single byte */
	op->cls.method_len = size;
	ceph_pagelist_append(pagelist, method, size);
	payload_len += size;

	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
8c042b0d 639
/*
 * Initialize op @which as a SETXATTR or CMPXATTR op on xattr @name
 * with @value of @size bytes.  Name and value are copied into a new
 * pagelist attached as the op's data.  @cmp_op/@cmp_mode are only
 * meaningful for CMPXATTR.  Returns 0 or -ENOMEM.
 */
int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
			u16 opcode, const char *name, const void *value,
			size_t size, u8 cmp_op, u8 cmp_mode)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len;

	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
	if (!pagelist)
		return -ENOMEM;

	ceph_pagelist_init(pagelist);

	payload_len = strlen(name);
	op->xattr.name_len = payload_len;
	ceph_pagelist_append(pagelist, name, payload_len);

	op->xattr.value_len = size;
	ceph_pagelist_append(pagelist, value, size);
	payload_len += size;

	op->xattr.cmp_op = cmp_op;
	op->xattr.cmp_mode = cmp_mode;

	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
	op->indata_len = payload_len;
	return 0;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);
673
c99d2d4a
AE
674void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
675 unsigned int which, u16 opcode,
33803f33
AE
676 u64 cookie, u64 version, int flag)
677{
144cba14
YZ
678 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
679 opcode, 0);
33803f33 680
c99d2d4a 681 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
33803f33
AE
682
683 op->watch.cookie = cookie;
9ef1ee5a 684 op->watch.ver = version;
33803f33 685 if (opcode == CEPH_OSD_OP_WATCH && flag)
c99d2d4a 686 op->watch.flag = (u8)1;
33803f33
AE
687}
688EXPORT_SYMBOL(osd_req_op_watch_init);
689
/*
 * Initialize op @which as a SETALLOCHINT advising the OSD of the
 * expected final object size and typical write size.
 */
void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
			unsigned int which,
			u64 expected_object_size,
			u64 expected_write_size)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      CEPH_OSD_OP_SETALLOCHINT,
						      0);

	op->alloc_hint.expected_object_size = expected_object_size;
	op->alloc_hint.expected_write_size = expected_write_size;

	/*
	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
	 * not worth a feature bit.  Set FAILOK per-op flag to make
	 * sure older osds don't trip over an unsupported opcode.
	 */
	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
}
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
710
/*
 * Attach the buffers described by @osd_data to @msg using the matching
 * messenger data-add helper.  TYPE_NONE attaches nothing; any other
 * unknown type trips the BUG_ON.
 */
static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
			struct ceph_osd_data *osd_data)
{
	u64 length = ceph_osd_data_length(osd_data);

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		BUG_ON(length > (u64) SIZE_MAX);
		if (length)
			ceph_msg_data_add_pages(msg, osd_data->pages,
					length, osd_data->alignment);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		BUG_ON(!length);
		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
		ceph_msg_data_add_bio(msg, osd_data->bio, length);
#endif
	} else {
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
	}
}
732
/*
 * Encode in-memory op @src into on-the-wire form @dst (little-endian
 * fields per ceph_osd_op layout).  Returns the number of outgoing data
 * bytes the op contributes, or 0 for unrecognized/unsupported opcodes
 * (with a warning).
 */
static u32 osd_req_encode_op(struct ceph_osd_op *dst,
			     const struct ceph_osd_req_op *src)
{
	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
		pr_err("unrecognized osd opcode %d\n", src->op);

		return 0;
	}

	switch (src->op) {
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
	case CEPH_OSD_OP_ZERO:
	case CEPH_OSD_OP_TRUNCATE:
		dst->extent.offset = cpu_to_le64(src->extent.offset);
		dst->extent.length = cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;
	case CEPH_OSD_OP_CALL:
		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(src->watch.ver);
		dst->watch.flag = src->watch.flag;
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		dst->alloc_hint.expected_object_size =
		    cpu_to_le64(src->alloc_hint.expected_object_size);
		dst->alloc_hint.expected_write_size =
		    cpu_to_le64(src->alloc_hint.expected_write_size);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		break;
	case CEPH_OSD_OP_CREATE:
	case CEPH_OSD_OP_DELETE:
		break;
	default:
		/* valid opcode but not implemented by this client */
		pr_err("unsupported osd opcode %s\n",
			ceph_osd_op_name(src->op));
		WARN_ON(1);

		return 0;
	}

	dst->op = cpu_to_le16(src->op);
	dst->flags = cpu_to_le32(src->flags);
	dst->payload_len = cpu_to_le32(src->indata_len);

	return src->indata_len;
}
800
/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * *plen is shrunk in place if the extent crosses an object boundary.
 * Returns the request or an ERR_PTR on failure.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       unsigned int which, int num_ops,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       u32 truncate_seq,
					       u64 truncate_size,
					       bool use_mempool)
{
	struct ceph_osd_request *req;
	u64 objnum = 0;
	u64 objoff = 0;
	u64 objlen = 0;
	int r;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
					GFP_NOFS);
	if (!req) {
		r = -ENOMEM;
		goto fail;
	}

	/* calculate max write size */
	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
	if (r)
		goto fail;

	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
		/* create/delete carry no extent */
		osd_req_op_init(req, which, opcode, 0);
	} else {
		u32 object_size = le32_to_cpu(layout->fl_object_size);
		u32 object_base = off - objoff;
		/*
		 * Translate the file-relative truncate point into an
		 * object-relative one, clamped to [0, object_size].
		 * truncate_seq == 1 with truncate_size == -1ULL means
		 * "never truncated" and needs no adjustment.
		 */
		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
			if (truncate_size <= object_base) {
				truncate_size = 0;
			} else {
				truncate_size -= object_base;
				if (truncate_size > object_size)
					truncate_size = object_size;
			}
		}
		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
				       truncate_size, truncate_seq);
	}

	req->r_flags = flags;
	req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);

	req->r_snapid = vino.snap;
	if (flags & CEPH_OSD_FLAG_WRITE)
		req->r_data_offset = off;

	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
	if (r)
		goto fail;

	return req;

fail:
	ceph_osdc_put_request(req);
	return ERR_PTR(r);
}
EXPORT_SYMBOL(ceph_osdc_new_request);
f24e9980
SW
882
883/*
884 * We keep osd requests in an rbtree, sorted by ->r_tid.
885 */
fcd00b68 886DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
f24e9980 887
/* True iff @osd is the "homeless" placeholder (no real OSD assigned). */
static bool osd_homeless(struct ceph_osd *osd)
{
	return osd->o_osd == CEPH_HOMELESS_OSD;
}
892
5aea3dcd 893static bool osd_registered(struct ceph_osd *osd)
f24e9980 894{
5aea3dcd 895 verify_osdc_locked(osd->o_osdc);
f24e9980 896
5aea3dcd 897 return !RB_EMPTY_NODE(&osd->o_node);
f24e9980
SW
898}
899
/*
 * Assumes @osd is zero-initialized.
 *
 * Starts with one reference held by the creator.
 */
static void osd_init(struct ceph_osd *osd)
{
	atomic_set(&osd->o_ref, 1);
	RB_CLEAR_NODE(&osd->o_node);
	osd->o_requests = RB_ROOT;
	INIT_LIST_HEAD(&osd->o_linger_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	INIT_LIST_HEAD(&osd->o_keepalive_item);
	osd->o_incarnation = 1;
	mutex_init(&osd->lock);
}
914
/*
 * Final teardown of an OSD session that has already been emptied and
 * unlinked (the WARN_ONs assert that).  Frees the authorizer, which
 * the homeless OSD never holds.
 */
static void osd_cleanup(struct ceph_osd *osd)
{
	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
	WARN_ON(!list_empty(&osd->o_linger_requests));
	WARN_ON(!list_empty(&osd->o_osd_lru));
	WARN_ON(!list_empty(&osd->o_keepalive_item));

	if (osd->o_auth.authorizer) {
		WARN_ON(osd_homeless(osd));
		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
	}
}
928
/*
 * Track open sessions with osds.
 */

/*
 * Allocate and initialize a session for osd @onum, including its
 * messenger connection.  Allocation cannot fail (__GFP_NOFAIL).
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	WARN_ON(onum == CEPH_HOMELESS_OSD);

	osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
	osd_init(osd);
	osd->o_osdc = osdc;
	osd->o_osd = onum;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	return osd;
}
947
/*
 * Take a reference on @osd unless it is already on its way to being
 * freed.  Returns @osd on success, NULL if the refcount was zero.
 */
static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
        if (atomic_inc_not_zero(&osd->o_ref)) {
                /* NOTE: the values printed here are racy debug output only */
                dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
                     atomic_read(&osd->o_ref));
                return osd;
        } else {
                dout("get_osd %p FAIL\n", osd);
                return NULL;
        }
}
959
/* Drop a reference on @osd; cleans up and frees it on the last put. */
static void put_osd(struct ceph_osd *osd)
{
        dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
             atomic_read(&osd->o_ref) - 1);
        if (atomic_dec_and_test(&osd->o_ref)) {
                osd_cleanup(osd);
                kfree(osd);
        }
}
969
fcd00b68
ID
/* lookup_osd()/insert_osd()/erase_osd() helpers, keyed by OSD id */
DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
971
/*
 * Put an idle session on the osdc LRU so handle_osds_timeout() can
 * close it once osd_idle_ttl expires.  @osd must not be on the LRU.
 */
static void __move_osd_to_lru(struct ceph_osd *osd)
{
        struct ceph_osd_client *osdc = osd->o_osdc;

        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
        BUG_ON(!list_empty(&osd->o_osd_lru));

        spin_lock(&osdc->osd_lru_lock);
        list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
        spin_unlock(&osdc->osd_lru_lock);

        osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}
985
/* Move @osd to the idle LRU iff it has no regular or linger requests. */
static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
        if (RB_EMPTY_ROOT(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests))
                __move_osd_to_lru(osd);
}
992
f5a2041b
YS
/* Take @osd off the idle LRU, if it is currently on it. */
static void __remove_osd_from_lru(struct ceph_osd *osd)
{
        struct ceph_osd_client *osdc = osd->o_osdc;

        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

        spin_lock(&osdc->osd_lru_lock);
        if (!list_empty(&osd->o_osd_lru))
                list_del_init(&osd->o_osd_lru);
        spin_unlock(&osdc->osd_lru_lock);
}
1004
5aea3dcd
ID
/*
 * Close the connection and assign any leftover requests to the
 * homeless session.
 *
 * Caller must hold osdc->lock for write.  Drops the tree's reference,
 * so @osd may be freed on return.
 */
static void close_osd(struct ceph_osd *osd)
{
        struct ceph_osd_client *osdc = osd->o_osdc;
        struct rb_node *n;

        verify_osdc_wrlocked(osdc);
        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

        ceph_con_close(&osd->o_con);

        for (n = rb_first(&osd->o_requests); n; ) {
                struct ceph_osd_request *req =
                    rb_entry(n, struct ceph_osd_request, r_node);

                n = rb_next(n); /* advance before unlink_request() erases */

                dout(" reassigning req %p tid %llu\n", req, req->r_tid);
                unlink_request(osd, req);
                link_request(&osdc->homeless_osd, req);
        }

        __remove_osd_from_lru(osd);
        erase_osd(&osdc->osds, osd);
        put_osd(osd);
}
1034
f24e9980
SW
/*
 * reset osd connect
 *
 * Returns -ENODEV after closing an empty session, -EAGAIN when the
 * peer address is unchanged and the connection was never opened (let
 * the messenger retry), 0 after an actual reconnect (which also bumps
 * o_incarnation so stale replies can be detected).
 */
static int reopen_osd(struct ceph_osd *osd)
{
        struct ceph_entity_addr *peer_addr;

        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

        if (RB_EMPTY_ROOT(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                close_osd(osd);
                return -ENODEV;
        }

        peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
        if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
            !ceph_con_opened(&osd->o_con)) {
                struct rb_node *n;

                dout("osd addr hasn't changed and connection never opened, "
                     "letting msgr retry\n");
                /* touch each r_stamp for handle_timeout()'s benefit */
                for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
                        struct ceph_osd_request *req =
                            rb_entry(n, struct ceph_osd_request, r_node);
                        req->r_stamp = jiffies;
                }

                return -EAGAIN;
        }

        ceph_con_close(&osd->o_con);
        ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
        osd->o_incarnation++;

        return 0;
}
1073
5aea3dcd
ID
/*
 * Find the session for OSD @o, creating and connecting it if needed.
 * Creation requires osdc->lock held for write; with only the read lock
 * held, a missing session yields ERR_PTR(-EAGAIN) so the caller can
 * promote to the write lock and retry.
 */
static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
                                          bool wrlocked)
{
        struct ceph_osd *osd;

        if (wrlocked)
                verify_osdc_wrlocked(osdc);
        else
                verify_osdc_locked(osdc);

        if (o != CEPH_HOMELESS_OSD)
                osd = lookup_osd(&osdc->osds, o);
        else
                osd = &osdc->homeless_osd;
        if (!osd) {
                if (!wrlocked)
                        return ERR_PTR(-EAGAIN);

                osd = create_osd(osdc, o);
                insert_osd(&osdc->osds, osd);
                ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
                              &osdc->osdmap->osd_addr[osd->o_osd]);
        }

        dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
        return osd;
}
1101
/*
 * Create request <-> OSD session relation.
 *
 * @req has to be assigned a tid, @osd may be homeless.
 */
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
        verify_osd_locked(osd);
        WARN_ON(!req->r_tid || req->r_osd);
        dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
             req, req->r_tid);

        if (!osd_homeless(osd))
                __remove_osd_from_lru(osd);     /* session is busy again */
        else
                atomic_inc(&osd->o_osdc->num_homeless);

        get_osd(osd);   /* session reference held by the request */
        insert_request(&osd->o_requests, req);
        req->r_osd = osd;
}
1123
5aea3dcd
ID
/* Undo link_request(); may move @osd back onto the idle LRU. */
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
        verify_osd_locked(osd);
        WARN_ON(req->r_osd != osd);
        dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
             req, req->r_tid);

        req->r_osd = NULL;
        erase_request(&osd->o_requests, req);
        put_osd(osd);   /* drop the ref taken in link_request() */

        if (!osd_homeless(osd))
                maybe_move_osd_to_lru(osd);
        else
                atomic_dec(&osd->o_osdc->num_homeless);
}
1140
/*
 * Register a lingering request with @osd.  Takes an extra request
 * reference, dropped again in __unregister_linger_request().
 */
static void __register_linger_request(struct ceph_osd *osd,
                                      struct ceph_osd_request *req)
{
        dout("%s %p tid %llu\n", __func__, req, req->r_tid);
        WARN_ON(!req->r_linger);

        ceph_osdc_get_request(req);
        list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger);
        list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests);
        __remove_osd_from_lru(osd);
        req->r_osd = osd;
}
1153
/*
 * Remove a lingering request from the global and per-OSD linger lists
 * and drop the reference taken at registration time.  A request that
 * was never registered is left alone.
 */
static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req)
{
        WARN_ON(!req->r_linger);

        if (list_empty(&req->r_linger_item)) {
                dout("%s %p tid %llu not registered\n", __func__, req,
                     req->r_tid);
                return;
        }

        dout("%s %p tid %llu\n", __func__, req, req->r_tid);
        list_del_init(&req->r_linger_item);

        if (req->r_osd) {
                list_del_init(&req->r_linger_osd_item);
                maybe_move_osd_to_lru(req->r_osd);
                /* clear the session link only once it has no requests */
                if (RB_EMPTY_ROOT(&req->r_osd->o_requests))
                        req->r_osd = NULL;
        }
        ceph_osdc_put_request(req);
}
1176
a40c4f10
YS
/* Mark @req as lingering, i.e. resent across OSD and map changes. */
void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
                                  struct ceph_osd_request *req)
{
        if (!req->r_linger) {
                dout("set_request_linger %p\n", req);
                req->r_linger = 1;
        }
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);
1186
63244fa1
ID
/* true if the pool carries the FULL flag in the current osdmap */
static bool __pool_full(struct ceph_pg_pool_info *pi)
{
        return pi->flags & CEPH_POOL_FLAG_FULL;
}
1191
42c1b124
ID
1192static bool have_pool_full(struct ceph_osd_client *osdc)
1193{
1194 struct rb_node *n;
1195
1196 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
1197 struct ceph_pg_pool_info *pi =
1198 rb_entry(n, struct ceph_pg_pool_info, node);
1199
1200 if (__pool_full(pi))
1201 return true;
1202 }
1203
1204 return false;
1205}
1206
5aea3dcd
ID
1207static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
1208{
1209 struct ceph_pg_pool_info *pi;
1210
1211 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
1212 if (!pi)
1213 return false;
1214
1215 return __pool_full(pi);
1216}
1217
d29adb34
JD
/*
 * Returns whether a request should be blocked from being sent
 * based on the current osdmap and osd_client settings.
 *
 * Reads pause on PAUSERD; writes pause on PAUSEWR, a FULL map or a
 * full pool.
 */
static bool target_should_be_paused(struct ceph_osd_client *osdc,
                                    const struct ceph_osd_request_target *t,
                                    struct ceph_pg_pool_info *pi)
{
        bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
        bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
                       ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
                       __pool_full(pi);

        WARN_ON(pi->id != t->base_oloc.pool);
        return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
               (t->flags & CEPH_OSD_FLAG_WRITE && pausewr);
}
1235
63244fa1
ID
/* Outcome of calc_target() for a request target. */
enum calc_target_result {
        CALC_TARGET_NO_ACTION = 0,      /* mapping unchanged, nothing to do */
        CALC_TARGET_NEED_RESEND,        /* target changed, resend request */
        CALC_TARGET_POOL_DNE,           /* target pool does not exist */
};
1241
/*
 * (Re)compute the target for @t from the current osdmap: apply cache
 * tiering, map the object to a PG and pick the acting primary OSD.
 * Returns whether the request needs to be resent (new interval, forced
 * resend or unpause) or the pool is gone.
 */
static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
                                           struct ceph_osd_request_target *t,
                                           u32 *last_force_resend,
                                           bool any_change)
{
        struct ceph_pg_pool_info *pi;
        struct ceph_pg pgid, last_pgid;
        struct ceph_osds up, acting;
        bool force_resend = false;
        bool need_check_tiering = false;
        bool need_resend = false;
        bool sort_bitwise = ceph_osdmap_flag(osdc->osdmap,
                                             CEPH_OSDMAP_SORTBITWISE);
        enum calc_target_result ct_res;
        int ret;

        pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
        if (!pi) {
                t->osd = CEPH_HOMELESS_OSD;
                ct_res = CALC_TARGET_POOL_DNE;
                goto out;
        }

        /* honor a pool-wide forced resend, at most once per epoch bump */
        if (osdc->osdmap->epoch == pi->last_force_request_resend) {
                if (last_force_resend &&
                    *last_force_resend < pi->last_force_request_resend) {
                        *last_force_resend = pi->last_force_request_resend;
                        force_resend = true;
                } else if (!last_force_resend) {
                        force_resend = true;
                }
        }
        if (ceph_oid_empty(&t->target_oid) || force_resend) {
                ceph_oid_copy(&t->target_oid, &t->base_oid);
                need_check_tiering = true;
        }
        if (ceph_oloc_empty(&t->target_oloc) || force_resend) {
                ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
                need_check_tiering = true;
        }

        /* redirect to the pool's cache tier unless overlays are ignored */
        if (need_check_tiering &&
            (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
                if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
                        t->target_oloc.pool = pi->read_tier;
                if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
                        t->target_oloc.pool = pi->write_tier;
        }

        ret = ceph_object_locator_to_pg(osdc->osdmap, &t->target_oid,
                                        &t->target_oloc, &pgid);
        if (ret) {
                WARN_ON(ret != -ENOENT);
                t->osd = CEPH_HOMELESS_OSD;
                ct_res = CALC_TARGET_POOL_DNE;
                goto out;
        }
        last_pgid.pool = pgid.pool;
        last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);

        /* force a resend if the PG entered a new interval */
        ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting);
        if (any_change &&
            ceph_is_new_interval(&t->acting,
                                 &acting,
                                 &t->up,
                                 &up,
                                 t->size,
                                 pi->size,
                                 t->min_size,
                                 pi->min_size,
                                 t->pg_num,
                                 pi->pg_num,
                                 t->sort_bitwise,
                                 sort_bitwise,
                                 &last_pgid))
                force_resend = true;

        if (t->paused && !target_should_be_paused(osdc, t, pi)) {
                t->paused = false;
                need_resend = true;
        }

        if (ceph_pg_compare(&t->pgid, &pgid) ||
            ceph_osds_changed(&t->acting, &acting, any_change) ||
            force_resend) {
                /* cache the new mapping in the target */
                t->pgid = pgid; /* struct */
                ceph_osds_copy(&t->acting, &acting);
                ceph_osds_copy(&t->up, &up);
                t->size = pi->size;
                t->min_size = pi->min_size;
                t->pg_num = pi->pg_num;
                t->pg_num_mask = pi->pg_num_mask;
                t->sort_bitwise = sort_bitwise;

                t->osd = acting.primary;
                need_resend = true;
        }

        ct_res = need_resend ? CALC_TARGET_NEED_RESEND : CALC_TARGET_NO_ACTION;
out:
        dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
        return ct_res;
}
1345
bb873b53
ID
/*
 * Attach each op's data items to the outgoing request message (for
 * request payloads) and/or to the preallocated reply message (for
 * response payloads).  Idempotent: a message that already has data
 * attached is left untouched, so resends don't double-add.
 */
static void setup_request_data(struct ceph_osd_request *req,
                               struct ceph_msg *msg)
{
        u32 data_len = 0;
        int i;

        if (!list_empty(&msg->data))
                return;

        WARN_ON(msg->data_length);
        for (i = 0; i < req->r_num_ops; i++) {
                struct ceph_osd_req_op *op = &req->r_ops[i];

                switch (op->op) {
                /* request */
                case CEPH_OSD_OP_WRITE:
                case CEPH_OSD_OP_WRITEFULL:
                        WARN_ON(op->indata_len != op->extent.length);
                        ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
                        break;
                case CEPH_OSD_OP_SETXATTR:
                case CEPH_OSD_OP_CMPXATTR:
                        WARN_ON(op->indata_len != op->xattr.name_len +
                                                  op->xattr.value_len);
                        ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
                        break;

                /* reply */
                case CEPH_OSD_OP_STAT:
                        ceph_osdc_msg_data_add(req->r_reply,
                                               &op->raw_data_in);
                        break;
                case CEPH_OSD_OP_READ:
                        ceph_osdc_msg_data_add(req->r_reply,
                                               &op->extent.osd_data);
                        break;

                /* both */
                case CEPH_OSD_OP_CALL:
                        WARN_ON(op->indata_len != op->cls.class_len +
                                                  op->cls.method_len +
                                                  op->cls.indata_len);
                        ceph_osdc_msg_data_add(msg, &op->cls.request_info);
                        /* optional, can be NONE */
                        ceph_osdc_msg_data_add(msg, &op->cls.request_data);
                        /* optional, can be NONE */
                        ceph_osdc_msg_data_add(req->r_reply,
                                               &op->cls.response_data);
                        break;
                }

                data_len += op->indata_len;
        }

        WARN_ON(data_len != msg->data_length);
}
1402
/*
 * Encode @req into the front of @msg as an MOSDOp v4 message and fill
 * in the message header.  The field order below is wire format and
 * must match the OSD's decoder exactly.
 */
static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        void *p = msg->front.iov_base;
        void *const end = p + msg->front_alloc_len;
        u32 data_len = 0;
        int i;

        if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
                /* snapshots aren't writeable */
                WARN_ON(req->r_snapid != CEPH_NOSNAP);
        } else {
                /* reads must not carry write-only attributes */
                WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
                        req->r_data_offset || req->r_snapc);
        }

        setup_request_data(req, msg);

        ceph_encode_32(&p, 1); /* client_inc, always 1 */
        ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
        ceph_encode_32(&p, req->r_flags);
        ceph_encode_timespec(p, &req->r_mtime);
        p += sizeof(struct ceph_timespec);
        /* aka reassert_version */
        memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
        p += sizeof(req->r_replay_version);

        /* oloc */
        ceph_encode_8(&p, 4);           /* struct_v */
        ceph_encode_8(&p, 4);           /* struct_compat */
        ceph_encode_32(&p, 8 + 4 + 4);  /* struct len: pool + preferred + key */
        ceph_encode_64(&p, req->r_t.target_oloc.pool);
        ceph_encode_32(&p, -1); /* preferred */
        ceph_encode_32(&p, 0); /* key len */

        /* pgid */
        ceph_encode_8(&p, 1);
        ceph_encode_64(&p, req->r_t.pgid.pool);
        ceph_encode_32(&p, req->r_t.pgid.seed);
        ceph_encode_32(&p, -1); /* preferred */

        /* oid */
        ceph_encode_32(&p, req->r_t.target_oid.name_len);
        memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len);
        p += req->r_t.target_oid.name_len;

        /* ops, can imply data */
        ceph_encode_16(&p, req->r_num_ops);
        for (i = 0; i < req->r_num_ops; i++) {
                data_len += osd_req_encode_op(p, &req->r_ops[i]);
                p += sizeof(struct ceph_osd_op);
        }

        ceph_encode_64(&p, req->r_snapid); /* snapid */
        if (req->r_snapc) {
                ceph_encode_64(&p, req->r_snapc->seq);
                ceph_encode_32(&p, req->r_snapc->num_snaps);
                for (i = 0; i < req->r_snapc->num_snaps; i++)
                        ceph_encode_64(&p, req->r_snapc->snaps[i]);
        } else {
                ceph_encode_64(&p, 0); /* snap_seq */
                ceph_encode_32(&p, 0); /* snaps len */
        }

        ceph_encode_32(&p, req->r_attempts); /* retry_attempt */

        BUG_ON(p > end);
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
        msg->hdr.data_len = cpu_to_le32(data_len);
        /*
         * The header "data_off" is a hint to the receiver allowing it
         * to align received data into its buffers such that there's no
         * need to re-copy it before writing it to disk (direct I/O).
         */
        msg->hdr.data_off = cpu_to_le16(req->r_data_offset);

        dout("%s req %p oid %*pE oid_len %d front %zu data %u\n", __func__,
             req, req->r_t.target_oid.name_len, req->r_t.target_oid.name,
             req->r_t.target_oid.name_len, msg->front.iov_len, data_len);
}
1484
/*
 * @req has to be assigned a tid and registered.
 *
 * Encodes and queues the request on its session's connection; bumps
 * r_attempts and records r_sent/r_stamp for timeout handling.
 */
static void send_request(struct ceph_osd_request *req)
{
        struct ceph_osd *osd = req->r_osd;

        verify_osd_locked(osd);
        WARN_ON(osd->o_osd != req->r_t.osd);

        /*
         * We may have a previously queued request message hanging
         * around.  Cancel it to avoid corrupting the msgr.
         */
        if (req->r_sent)
                ceph_msg_revoke(req->r_request);

        req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
        if (req->r_attempts)
                req->r_flags |= CEPH_OSD_FLAG_RETRY;
        else
                WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);

        encode_request(req, req->r_request);

        dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n",
             __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
             req->r_t.osd, req->r_flags, req->r_attempts);

        req->r_t.paused = false;
        req->r_stamp = jiffies;         /* for handle_timeout() */
        req->r_attempts++;

        req->r_sent = osd->o_incarnation;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
        ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
}
1522
42c1b124
ID
/*
 * Ask the monitor for newer osdmaps: a continuous subscription while a
 * pause/full flag is set (we need to see it clear), otherwise a onetime
 * request for the next epoch.  Caller must hold osdc->lock.
 */
static void maybe_request_map(struct ceph_osd_client *osdc)
{
        bool continuous = false;

        verify_osdc_locked(osdc);
        WARN_ON(!osdc->osdmap->epoch);

        if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
            ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
            ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
                dout("%s osdc %p continuous\n", __func__, osdc);
                continuous = true;
        } else {
                dout("%s osdc %p onetime\n", __func__, osdc);
        }

        if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
                               osdc->osdmap->epoch + 1, continuous))
                ceph_monc_renew_subs(&osdc->client->monc);
}
1543
/*
 * Map @req to its target OSD and send it, unless a pause/full condition
 * applies, in which case it is linked but left unsent (paused).  Called
 * with osdc->lock held for read (or write if @wrlocked); promotes to
 * the write lock when a new session must be created.
 */
static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
{
        struct ceph_osd_client *osdc = req->r_osdc;
        struct ceph_osd *osd;
        bool need_send = false;
        bool promoted = false;

        WARN_ON(req->r_tid || req->r_got_reply);
        dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);

again:
        calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
        osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
        if (IS_ERR(osd)) {
                /* only -EAGAIN under the read lock is expected here */
                WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
                goto promote;
        }

        if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
            ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
                dout("req %p pausewr\n", req);
                req->r_t.paused = true;
                maybe_request_map(osdc);
        } else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
                   ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
                dout("req %p pauserd\n", req);
                req->r_t.paused = true;
                maybe_request_map(osdc);
        } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
                   !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
                                     CEPH_OSD_FLAG_FULL_FORCE)) &&
                   (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
                    pool_full(osdc, req->r_t.base_oloc.pool))) {
                dout("req %p full/pool_full\n", req);
                pr_warn_ratelimited("FULL or reached pool quota\n");
                req->r_t.paused = true;
                maybe_request_map(osdc);
        } else if (!osd_homeless(osd)) {
                need_send = true;
        } else {
                /* no usable target yet - wait for a newer map */
                maybe_request_map(osdc);
        }

        mutex_lock(&osd->lock);
        /*
         * Assign the tid atomically with send_request() to protect
         * multiple writes to the same object from racing with each
         * other, resulting in out of order ops on the OSDs.
         */
        req->r_tid = atomic64_inc_return(&osdc->last_tid);
        link_request(osd, req);
        if (need_send)
                send_request(req);
        mutex_unlock(&osd->lock);

        if (promoted)
                downgrade_write(&osdc->lock);
        return;

promote:
        up_read(&osdc->lock);
        down_write(&osdc->lock);
        wrlocked = true;
        promoted = true;
        goto again;
}
1610
/*
 * Sanity-check the ACK/ONDISK flags for the request type (reads get an
 * implicit ACK, writes must already carry one of the two) and bump the
 * outstanding request count.
 */
static void account_request(struct ceph_osd_request *req)
{
        unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;

        if (req->r_flags & CEPH_OSD_FLAG_READ) {
                WARN_ON(req->r_flags & mask);
                req->r_flags |= CEPH_OSD_FLAG_ACK;
        } else if (req->r_flags & CEPH_OSD_FLAG_WRITE)
                WARN_ON(!(req->r_flags & mask));
        else
                WARN_ON(1);     /* neither read nor write */

        /* an unsafe callback only makes sense if both flags are set */
        WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask);
        atomic_inc(&req->r_osdc->num_requests);
}
1626
/* Take a submission reference, account and submit @req. */
static void submit_request(struct ceph_osd_request *req, bool wrlocked)
{
        ceph_osdc_get_request(req);
        account_request(req);
        __submit_request(req, wrlocked);
}
1633
/*
 * Unlink @req from its session and unaccount it; revokes any queued
 * outgoing copy and the incoming reply buffer.  Caller must hold the
 * session lock and still owns the submission reference.
 */
static void __finish_request(struct ceph_osd_request *req)
{
        struct ceph_osd_client *osdc = req->r_osdc;
        struct ceph_osd *osd = req->r_osd;

        verify_osd_locked(osd);
        dout("%s req %p tid %llu\n", __func__, req, req->r_tid);

        unlink_request(osd, req);
        atomic_dec(&osdc->num_requests);

        /*
         * If an OSD has failed or returned and a request has been sent
         * twice, it's possible to get a reply and end up here while the
         * request message is queued for delivery.  We will ignore the
         * reply, so not a big deal, but better to try and catch it.
         */
        ceph_msg_revoke(req->r_request);
        ceph_msg_revoke_incoming(req->r_reply);
}
1654
/* __finish_request() plus dropping the submission reference. */
static void finish_request(struct ceph_osd_request *req)
{
        __finish_request(req);
        ceph_osdc_put_request(req);
}
1660
fe5da05e
ID
1661static void __complete_request(struct ceph_osd_request *req)
1662{
1663 if (req->r_callback)
1664 req->r_callback(req);
1665 else
1666 complete_all(&req->r_completion);
1667}
1668
5aea3dcd
ID
/* Abort @req without completing it: unlink and drop the submission ref. */
static void cancel_request(struct ceph_osd_request *req)
{
        dout("%s req %p tid %llu\n", __func__, req, req->r_tid);

        finish_request(req);
}
1675
/*
 * Timeout callback, called every N seconds.  When 1 or more OSD
 * requests has been active for more than N seconds, we send a keepalive
 * (tag + timestamp) to its OSD to ensure any communications channel
 * reset is detected.
 */
static void handle_timeout(struct work_struct *work)
{
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
        struct ceph_options *opts = osdc->client->options;
        unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
        LIST_HEAD(slow_osds);
        struct rb_node *n, *p;

        dout("%s osdc %p\n", __func__, osdc);
        down_write(&osdc->lock);

        /*
         * ping osds that are a bit slow.  this ensures that if there
         * is a break in the TCP connection we will notice, and reopen
         * a connection with that osd (from the fault callback).
         */
        for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
                struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
                bool found = false;

                /* any request on this session older than the cutoff? */
                for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
                        struct ceph_osd_request *req =
                            rb_entry(p, struct ceph_osd_request, r_node);

                        if (time_before(req->r_stamp, cutoff)) {
                                dout(" req %p tid %llu on osd%d is laggy\n",
                                     req, req->r_tid, osd->o_osd);
                                found = true;
                        }
                }

                if (found)
                        list_move_tail(&osd->o_keepalive_item, &slow_osds);
        }

        if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
                maybe_request_map(osdc);

        while (!list_empty(&slow_osds)) {
                struct ceph_osd *osd = list_first_entry(&slow_osds,
                                                        struct ceph_osd,
                                                        o_keepalive_item);
                list_del_init(&osd->o_keepalive_item);
                ceph_con_keepalive(&osd->o_con);
        }

        up_write(&osdc->lock);
        schedule_delayed_work(&osdc->timeout_work,
                              osdc->client->options->osd_keepalive_timeout);
}
1733
f5a2041b
YS
/*
 * Periodic work: close sessions that have sat on the idle LRU longer
 * than osd_idle_ttl, then re-arm itself.
 */
static void handle_osds_timeout(struct work_struct *work)
{
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client,
                             osds_timeout_work.work);
        unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
        struct ceph_osd *osd, *nosd;

        dout("%s osdc %p\n", __func__, osdc);
        down_write(&osdc->lock);
        list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
                /* the LRU is in expiry order, so stop at the first live one */
                if (time_before(jiffies, osd->lru_ttl))
                        break;

                WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
                WARN_ON(!list_empty(&osd->o_linger_requests));
                close_osd(osd);
        }

        up_write(&osdc->lock);
        schedule_delayed_work(&osdc->osds_timeout_work,
                              round_jiffies_relative(delay));
}
1757
205ee118
ID
/*
 * Decode a ceph_object_locator off the wire into @oloc.  Only plain
 * pool locators are supported: a set key, nspace or hash is rejected
 * with -EINVAL, as are encodings older than v3 or newer than cv 6.
 *
 * NOTE(review): individual fields inside the struct are read without
 * per-field bounds checks; this relies on the advertised struct length
 * being honest -- confirm against the wire-format spec.
 */
static int ceph_oloc_decode(void **p, void *end,
                            struct ceph_object_locator *oloc)
{
        u8 struct_v, struct_cv;
        u32 len;
        void *struct_end;
        int ret = 0;

        ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
        struct_v = ceph_decode_8(p);
        struct_cv = ceph_decode_8(p);
        if (struct_v < 3) {
                pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
                        struct_v, struct_cv);
                goto e_inval;
        }
        if (struct_cv > 6) {
                pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
                        struct_v, struct_cv);
                goto e_inval;
        }
        len = ceph_decode_32(p);
        ceph_decode_need(p, end, len, e_inval);
        struct_end = *p + len;

        oloc->pool = ceph_decode_64(p);
        *p += 4; /* skip preferred */

        len = ceph_decode_32(p);
        if (len > 0) {
                pr_warn("ceph_object_locator::key is set\n");
                goto e_inval;
        }

        if (struct_v >= 5) {
                len = ceph_decode_32(p);
                if (len > 0) {
                        pr_warn("ceph_object_locator::nspace is set\n");
                        goto e_inval;
                }
        }

        if (struct_v >= 6) {
                s64 hash = ceph_decode_64(p);
                if (hash != -1) {
                        pr_warn("ceph_object_locator::hash is set\n");
                        goto e_inval;
                }
        }

        /* skip the rest */
        *p = struct_end;
out:
        return ret;

e_inval:
        ret = -EINVAL;
        goto out;
}
1817
/*
 * Decode a ceph_request_redirect off the wire into @redir.  Only the
 * embedded oloc is kept; a set object_name is rejected with -EINVAL and
 * osd_instructions are skipped.
 */
static int ceph_redirect_decode(void **p, void *end,
                                struct ceph_request_redirect *redir)
{
        u8 struct_v, struct_cv;
        u32 len;
        void *struct_end;
        int ret;

        ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
        struct_v = ceph_decode_8(p);
        struct_cv = ceph_decode_8(p);
        if (struct_cv > 1) {
                pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
                        struct_v, struct_cv);
                goto e_inval;
        }
        len = ceph_decode_32(p);
        ceph_decode_need(p, end, len, e_inval);
        struct_end = *p + len;

        ret = ceph_oloc_decode(p, end, &redir->oloc);
        if (ret)
                goto out;

        len = ceph_decode_32(p);
        if (len > 0) {
                pr_warn("ceph_request_redirect::object_name is set\n");
                goto e_inval;
        }

        len = ceph_decode_32(p);
        *p += len; /* skip osd_instructions */

        /* skip the rest */
        *p = struct_end;
out:
        return ret;

e_inval:
        ret = -EINVAL;
        goto out;
}
1860
fe5da05e
ID
/* Decoded fields of an MOSDOpReply message; see decode_MOSDOpReply(). */
struct MOSDOpReply {
        struct ceph_pg pgid;
        u64 flags;
        int result;
        u32 epoch;
        int num_ops;
        u32 outdata_len[CEPH_OSD_MAX_OPS];      /* per-op payload length */
        s32 rval[CEPH_OSD_MAX_OPS];             /* per-op return value */
        int retry_attempt;
        struct ceph_eversion replay_version;
        u64 user_version;
        struct ceph_request_redirect redirect;
};
25845472 1874
/*
 * Decode the front of an MOSDOpReply message into @m.  Handles the
 * encoding differences across message versions: replay/user version
 * moved in v5, and the redirect became conditional in v7.  Returns 0
 * or -EINVAL on a malformed message.
 */
static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
{
        void *p = msg->front.iov_base;
        void *const end = p + msg->front.iov_len;
        u16 version = le16_to_cpu(msg->hdr.version);
        struct ceph_eversion bad_replay_version;
        u8 decode_redir;
        u32 len;
        int ret;
        int i;

        ceph_decode_32_safe(&p, end, len, e_inval);
        ceph_decode_need(&p, end, len, e_inval);
        p += len; /* skip oid */

        ret = ceph_decode_pgid(&p, end, &m->pgid);
        if (ret)
                return ret;

        ceph_decode_64_safe(&p, end, m->flags, e_inval);
        ceph_decode_32_safe(&p, end, m->result, e_inval);
        /* pre-v5 position of replay_version; superseded below if v5+ */
        ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
        memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
        p += sizeof(bad_replay_version);
        ceph_decode_32_safe(&p, end, m->epoch, e_inval);

        ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
        if (m->num_ops > ARRAY_SIZE(m->outdata_len))
                goto e_inval;

        ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
                         e_inval);
        for (i = 0; i < m->num_ops; i++) {
                struct ceph_osd_op *op = p;

                m->outdata_len[i] = le32_to_cpu(op->payload_len);
                p += sizeof(*op);
        }

        ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
        for (i = 0; i < m->num_ops; i++)
                ceph_decode_32_safe(&p, end, m->rval[i], e_inval);

        if (version >= 5) {
                ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
                memcpy(&m->replay_version, p, sizeof(m->replay_version));
                p += sizeof(m->replay_version);
                ceph_decode_64_safe(&p, end, m->user_version, e_inval);
        } else {
                m->replay_version = bad_replay_version; /* struct */
                m->user_version = le64_to_cpu(m->replay_version.version);
        }

        if (version >= 6) {
                if (version >= 7)
                        ceph_decode_8_safe(&p, end, decode_redir, e_inval);
                else
                        decode_redir = 1;       /* v6 always has a redirect */
        } else {
                decode_redir = 0;
        }

        if (decode_redir) {
                ret = ceph_redirect_decode(&p, end, &m->redirect);
                if (ret)
                        return ret;
        } else {
                ceph_oloc_init(&m->redirect.oloc);
        }

        return 0;

e_inval:
        return -EINVAL;
}
1950
/*
 * We are done with @req if
 *  - @m is a safe reply, or
 *  - @m is an unsafe reply and we didn't want a safe one
 * (an error result also terminates the request)
 */
static bool done_request(const struct ceph_osd_request *req,
                         const struct MOSDOpReply *m)
{
        return (m->result < 0 ||
                (m->flags & CEPH_OSD_FLAG_ONDISK) ||
                !(req->r_flags & CEPH_OSD_FLAG_ONDISK));
}
205ee118 1963
fe5da05e
ID
/*
 * handle osd op reply. either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 *
 * ->r_unsafe_callback is set?	yes			no
 *
 * first reply is OK (needed	r_cb/r_completion,	r_cb/r_completion,
 * any or needed/got safe)	r_safe_completion	r_safe_completion
 *
 * first reply is unsafe	r_unsafe_cb(true)	(nothing)
 *
 * when we get the safe reply	r_unsafe_cb(false),	r_cb/r_completion,
 *				r_safe_completion	r_safe_completion
 */
static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
{
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_osd_request *req;
	struct MOSDOpReply m;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 data_len = 0;
	bool already_acked;
	int ret;
	int i;

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	/* lock order: osdc->lock (read) first, then per-OSD mutex */
	down_read(&osdc->lock);
	if (!osd_registered(osd)) {
		dout("%s osd%d unknown\n", __func__, osd->o_osd);
		goto out_unlock_osdc;
	}
	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));

	mutex_lock(&osd->lock);
	req = lookup_request(&osd->o_requests, tid);
	if (!req) {
		dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
		goto out_unlock_session;
	}

	ret = decode_MOSDOpReply(msg, &m);
	if (ret) {
		pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
		       req->r_tid, ret);
		ceph_msg_dump(msg);
		goto fail_request;
	}
	dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
	     __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
	     m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
	     le64_to_cpu(m.replay_version.version), m.user_version);

	/* drop replies for attempts other than the most recent resend */
	if (m.retry_attempt >= 0) {
		if (m.retry_attempt != req->r_attempts - 1) {
			dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
			     req, req->r_tid, m.retry_attempt,
			     req->r_attempts - 1);
			goto out_unlock_session;
		}
	} else {
		WARN_ON(1); /* MOSDOpReply v4 is assumed */
	}

	/* OSD asked us to resend to a different pool/namespace */
	if (!ceph_oloc_empty(&m.redirect.oloc)) {
		dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
		     m.redirect.oloc.pool);
		unlink_request(osd, req);
		mutex_unlock(&osd->lock);

		ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
		req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
		req->r_tid = 0;		/* resubmit acquires a fresh tid */
		__submit_request(req, false);
		goto out_unlock_osdc;
	}

	if (m.num_ops != req->r_num_ops) {
		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
		       req->r_num_ops, req->r_tid);
		goto fail_request;
	}
	/* copy per-op results and cross-check total payload length */
	for (i = 0; i < req->r_num_ops; i++) {
		dout(" req %p tid %llu op %d rval %d len %u\n", req,
		     req->r_tid, i, m.rval[i], m.outdata_len[i]);
		req->r_ops[i].rval = m.rval[i];
		req->r_ops[i].outdata_len = m.outdata_len[i];
		data_len += m.outdata_len[i];
	}
	if (data_len != le32_to_cpu(msg->hdr.data_len)) {
		pr_err("sum of lens %u != %u for tid %llu\n", data_len,
		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
		goto fail_request;
	}
	dout("%s req %p tid %llu acked %d result %d data_len %u\n", __func__,
	     req, req->r_tid, req->r_got_reply, m.result, data_len);

	/* record the result only on the first (ack or safe) reply */
	already_acked = req->r_got_reply;
	if (!already_acked) {
		req->r_result = m.result ?: data_len;
		req->r_replay_version = m.replay_version; /* struct */
		req->r_got_reply = true;
	} else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
		dout("req %p tid %llu dup ack\n", req, req->r_tid);
		goto out_unlock_session;
	}

	if (done_request(req, &m)) {
		__finish_request(req);
		if (req->r_linger) {
			WARN_ON(req->r_unsafe_callback);
			__register_linger_request(osd, req);
		}
	}

	/* callbacks below run without either lock held */
	mutex_unlock(&osd->lock);
	up_read(&osdc->lock);

	if (done_request(req, &m)) {
		if (already_acked && req->r_unsafe_callback) {
			dout("req %p tid %llu safe-cb\n", req, req->r_tid);
			req->r_unsafe_callback(req, false);
		} else {
			dout("req %p tid %llu cb\n", req, req->r_tid);
			__complete_request(req);
		}
	} else {
		if (req->r_unsafe_callback) {
			dout("req %p tid %llu unsafe-cb\n", req, req->r_tid);
			req->r_unsafe_callback(req, true);
		} else {
			WARN_ON(1);
		}
	}
	if (m.flags & CEPH_OSD_FLAG_ONDISK)
		complete_all(&req->r_safe_completion);

	ceph_osdc_put_request(req);
	return;

fail_request:
	req->r_result = -EIO;
	__finish_request(req);
	__complete_request(req);
	complete_all(&req->r_safe_completion);
out_unlock_session:
	mutex_unlock(&osd->lock);
out_unlock_osdc:
	up_read(&osdc->lock);
}
2114
42c1b124
ID
2115static void set_pool_was_full(struct ceph_osd_client *osdc)
2116{
2117 struct rb_node *n;
2118
2119 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
2120 struct ceph_pg_pool_info *pi =
2121 rb_entry(n, struct ceph_pg_pool_info, node);
2122
2123 pi->was_full = __pool_full(pi);
2124 }
2125}
2126
5aea3dcd 2127static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
f24e9980 2128{
5aea3dcd 2129 struct ceph_pg_pool_info *pi;
f24e9980 2130
5aea3dcd
ID
2131 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
2132 if (!pi)
2133 return false;
f24e9980 2134
5aea3dcd 2135 return pi->was_full && !__pool_full(pi);
422d2cb8
YS
2136}
2137
/*
 * Requeue requests whose mapping to an OSD has changed.
 *
 * Walks @osd's request tree; any request whose target moved (or that
 * must be force-resent) is moved from the OSD to the @need_resend tree
 * for kick_requests() to re-dispatch.
 * NOTE(review): @need_resend_linger is not referenced in this body —
 * presumably reserved for linger handling elsewhere; confirm.
 */
static void scan_requests(struct ceph_osd *osd,
			  bool force_resend,
			  bool cleared_full,
			  bool check_pool_cleared_full,
			  struct rb_root *need_resend,
			  struct list_head *need_resend_linger)
{
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct rb_node *n;
	bool force_resend_writes;

	for (n = rb_first(&osd->o_requests); n; ) {
		struct ceph_osd_request *req =
		    rb_entry(n, struct ceph_osd_request, r_node);
		enum calc_target_result ct_res;

		/* advance before the node may be erased */
		n = rb_next(n); /* unlink_request() */

		dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
		ct_res = calc_target(osdc, &req->r_t,
				     &req->r_last_force_resend, false);
		switch (ct_res) {
		case CALC_TARGET_NO_ACTION:
			/* writes are resent when a full condition clears */
			force_resend_writes = cleared_full ||
			    (check_pool_cleared_full &&
			     pool_cleared_full(osdc, req->r_t.base_oloc.pool));
			if (!force_resend &&
			    (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
			     !force_resend_writes))
				break;

			/* fall through */
		case CALC_TARGET_NEED_RESEND:
			unlink_request(osd, req);
			insert_request(need_resend, req);
			break;
		case CALC_TARGET_POOL_DNE:
			/* pool vanished; leave the request where it is */
			break;
		}
	}
}
6f6c7006 2182
/*
 * Decode and install one osdmap (incremental or full), then rescan all
 * requests against the new map.  Requests that need to move are
 * collected into @need_resend / @need_resend_linger; OSD sessions whose
 * peer went down or changed address are closed.
 *
 * Returns 0 or a negative errno from map decoding.
 */
static int handle_one_map(struct ceph_osd_client *osdc,
			  void *p, void *end, bool incremental,
			  struct rb_root *need_resend,
			  struct list_head *need_resend_linger)
{
	struct ceph_osdmap *newmap;
	struct rb_node *n;
	bool skipped_map = false;
	bool was_full;

	was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
	set_pool_was_full(osdc);

	if (incremental)
		newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
	else
		newmap = ceph_osdmap_decode(&p, end);
	if (IS_ERR(newmap))
		return PTR_ERR(newmap);

	if (newmap != osdc->osdmap) {
		/*
		 * Preserve ->was_full before destroying the old map.
		 * For pools that weren't in the old map, ->was_full
		 * should be false.
		 */
		for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
			struct ceph_pg_pool_info *pi =
			    rb_entry(n, struct ceph_pg_pool_info, node);
			struct ceph_pg_pool_info *old_pi;

			old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
			if (old_pi)
				pi->was_full = old_pi->was_full;
			else
				WARN_ON(pi->was_full);
		}

		/* a gap in epochs means we missed at least one map */
		if (osdc->osdmap->epoch &&
		    osdc->osdmap->epoch + 1 < newmap->epoch) {
			WARN_ON(incremental);
			skipped_map = true;
		}

		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = newmap;
	}

	/* true only if we transitioned from full to not-full */
	was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
	scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
		      need_resend, need_resend_linger);

	for (n = rb_first(&osdc->osds); n; ) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);

		/* advance first: close_osd() erases the node */
		n = rb_next(n); /* close_osd() */

		scan_requests(osd, skipped_map, was_full, true, need_resend,
			      need_resend_linger);
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap, osd->o_osd),
			   sizeof(struct ceph_entity_addr)))
			close_osd(osd);
	}

	return 0;
}
6f6c7006 2251
5aea3dcd
ID
/*
 * Re-dispatch requests collected by scan_requests(): recompute each
 * request's target, link it to the (possibly new) OSD session and send
 * it if the session is live and the request isn't paused.
 */
static void kick_requests(struct ceph_osd_client *osdc,
			  struct rb_root *need_resend,
			  struct list_head *need_resend_linger)
{
	struct rb_node *n;

	for (n = rb_first(need_resend); n; ) {
		struct ceph_osd_request *req =
		    rb_entry(n, struct ceph_osd_request, r_node);
		struct ceph_osd *osd;

		n = rb_next(n);
		/* a request lives in exactly one tree at a time */
		erase_request(need_resend, req); /* before link_request() */

		WARN_ON(req->r_osd);
		calc_target(osdc, &req->r_t, NULL, false);
		osd = lookup_create_osd(osdc, req->r_t.osd, true);
		link_request(osd, req);
		if (!req->r_linger) {
			if (!osd_homeless(osd) && !req->r_t.paused)
				send_request(req);
		}
	}
}
2276
f24e9980
SW
/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const end = p + msg->front.iov_len;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_fsid fsid;
	struct rb_root need_resend = RB_ROOT;
	LIST_HEAD(need_resend_linger);
	bool handled_incremental = false;
	bool was_pauserd, was_pausewr;
	bool pauserd, pausewr;
	int err;

	dout("%s have %u\n", __func__, osdc->osdmap->epoch);
	down_write(&osdc->lock);

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		goto bad;

	/* remember pause/full state so we can detect transitions below */
	was_pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
	was_pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
		      ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
		      have_pool_full(osdc);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		/* only apply an incremental that follows our epoch exactly */
		if (osdc->osdmap->epoch &&
		    osdc->osdmap->epoch + 1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			err = handle_one_map(osdc, p, p + maplen, true,
					     &need_resend, &need_resend_linger);
			if (err)
				goto bad;
			handled_incremental = true;
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p += maplen;
		nr_maps--;
	}
	if (handled_incremental)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			/* only the last full map in the message matters */
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			dout("taking full map %u len %d\n", epoch, maplen);
			err = handle_one_map(osdc, p, p + maplen, false,
					     &need_resend, &need_resend_linger);
			if (err)
				goto bad;
		}
		p += maplen;
		nr_maps--;
	}

done:
	/*
	 * subscribe to subsequent osdmap updates if full to ensure
	 * we find out when we are no longer full and stop returning
	 * ENOSPC.
	 */
	pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
	pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
		  ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
		  have_pool_full(osdc);
	if (was_pauserd || was_pausewr || pauserd || pausewr)
		maybe_request_map(osdc);

	kick_requests(osdc, &need_resend, &need_resend_linger);

	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
			  osdc->osdmap->epoch);
	up_write(&osdc->lock);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->lock);
}
2391
2392/*
2393 * Resubmit requests pending on the given osd.
2394 */
2395static void kick_osd_requests(struct ceph_osd *osd)
2396{
2397 struct rb_node *n;
2398
2399 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
2400 struct ceph_osd_request *req =
2401 rb_entry(n, struct ceph_osd_request, r_node);
2402
2403 if (!req->r_linger) {
2404 if (!req->r_t.paused)
2405 send_request(req);
2406 }
2407 }
2408}
2409
2410/*
2411 * If the osd connection drops, we need to resubmit all requests.
2412 */
2413static void osd_fault(struct ceph_connection *con)
2414{
2415 struct ceph_osd *osd = con->private;
2416 struct ceph_osd_client *osdc = osd->o_osdc;
2417
2418 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
2419
2420 down_write(&osdc->lock);
2421 if (!osd_registered(osd)) {
2422 dout("%s osd%d unknown\n", __func__, osd->o_osd);
2423 goto out_unlock;
2424 }
2425
2426 if (!reopen_osd(osd))
2427 kick_osd_requests(osd);
2428 maybe_request_map(osdc);
2429
2430out_unlock:
2431 up_write(&osdc->lock);
f24e9980
SW
2432}
2433
a40c4f10
YS
2434/*
2435 * watch/notify callback event infrastructure
2436 *
2437 * These callbacks are used both for watch and notify operations.
2438 */
2439static void __release_event(struct kref *kref)
2440{
2441 struct ceph_osd_event *event =
2442 container_of(kref, struct ceph_osd_event, kref);
2443
2444 dout("__release_event %p\n", event);
2445 kfree(event);
2446}
2447
2448static void get_event(struct ceph_osd_event *event)
2449{
2450 kref_get(&event->kref);
2451}
2452
2453void ceph_osdc_put_event(struct ceph_osd_event *event)
2454{
2455 kref_put(&event->kref, __release_event);
2456}
2457EXPORT_SYMBOL(ceph_osdc_put_event);
2458
2459static void __insert_event(struct ceph_osd_client *osdc,
2460 struct ceph_osd_event *new)
2461{
2462 struct rb_node **p = &osdc->event_tree.rb_node;
2463 struct rb_node *parent = NULL;
2464 struct ceph_osd_event *event = NULL;
2465
2466 while (*p) {
2467 parent = *p;
2468 event = rb_entry(parent, struct ceph_osd_event, node);
2469 if (new->cookie < event->cookie)
2470 p = &(*p)->rb_left;
2471 else if (new->cookie > event->cookie)
2472 p = &(*p)->rb_right;
2473 else
2474 BUG();
2475 }
2476
2477 rb_link_node(&new->node, parent, p);
2478 rb_insert_color(&new->node, &osdc->event_tree);
2479}
2480
2481static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
2482 u64 cookie)
2483{
2484 struct rb_node **p = &osdc->event_tree.rb_node;
2485 struct rb_node *parent = NULL;
2486 struct ceph_osd_event *event = NULL;
2487
2488 while (*p) {
2489 parent = *p;
2490 event = rb_entry(parent, struct ceph_osd_event, node);
2491 if (cookie < event->cookie)
2492 p = &(*p)->rb_left;
2493 else if (cookie > event->cookie)
2494 p = &(*p)->rb_right;
2495 else
2496 return event;
2497 }
2498 return NULL;
2499}
2500
2501static void __remove_event(struct ceph_osd_event *event)
2502{
2503 struct ceph_osd_client *osdc = event->osdc;
2504
2505 if (!RB_EMPTY_NODE(&event->node)) {
2506 dout("__remove_event removed %p\n", event);
2507 rb_erase(&event->node, &osdc->event_tree);
2508 ceph_osdc_put_event(event);
2509 } else {
2510 dout("__remove_event didn't remove %p\n", event);
2511 }
2512}
2513
/*
 * Allocate and register a watch/notify event.
 *
 * @event_cb: callback invoked as (ver, notify_id, opcode, data)
 * @data: opaque cookie passed back to @event_cb
 * @pevent: on success, set to the new event (caller owns one reference)
 *
 * Returns 0 or -ENOMEM.  Release with ceph_osdc_cancel_event() /
 * ceph_osdc_put_event().
 */
int ceph_osdc_create_event(struct ceph_osd_client *osdc,
			   void (*event_cb)(u64, u64, u8, void *),
			   void *data, struct ceph_osd_event **pevent)
{
	struct ceph_osd_event *event;

	event = kmalloc(sizeof(*event), GFP_NOIO);
	if (!event)
		return -ENOMEM;

	dout("create_event %p\n", event);
	event->cb = event_cb;
	event->one_shot = 0;
	event->data = data;
	event->osdc = osdc;
	INIT_LIST_HEAD(&event->osd_node);
	RB_CLEAR_NODE(&event->node);
	kref_init(&event->kref);   /* one ref for us */
	kref_get(&event->kref);    /* one ref for the caller */

	/* cookie allocation and tree insertion must be atomic */
	spin_lock(&osdc->event_lock);
	event->cookie = ++osdc->event_count;
	__insert_event(osdc, event);
	spin_unlock(&osdc->event_lock);

	*pevent = event;
	return 0;
}
EXPORT_SYMBOL(ceph_osdc_create_event);
2543
/*
 * Unregister @event and drop the caller's reference.  The event is
 * freed once all outstanding references (e.g. queued notifications)
 * are dropped.
 */
void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	dout("cancel_event %p\n", event);
	spin_lock(&osdc->event_lock);
	__remove_event(event);		/* drops the tree's ref */
	spin_unlock(&osdc->event_lock);
	ceph_osdc_put_event(event); /* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);
2555
2556
2557static void do_event_work(struct work_struct *work)
2558{
2559 struct ceph_osd_event_work *event_work =
2560 container_of(work, struct ceph_osd_event_work, work);
2561 struct ceph_osd_event *event = event_work->event;
2562 u64 ver = event_work->ver;
2563 u64 notify_id = event_work->notify_id;
2564 u8 opcode = event_work->opcode;
2565
2566 dout("do_event_work completing %p\n", event);
2567 event->cb(ver, notify_id, opcode, event->data);
a40c4f10
YS
2568 dout("do_event_work completed %p\n", event);
2569 ceph_osdc_put_event(event);
2570 kfree(event_work);
2571}
2572
2573
/*
 * Process osd watch notifications
 *
 * Decodes the notification, looks up the registered event by cookie
 * and queues the callback on osdc->notify_wq so it runs outside this
 * message-dispatch context.
 */
static void handle_watch_notify(struct ceph_osd_client *osdc,
				struct ceph_msg *msg)
{
	void *p, *end;
	u8 proto_ver;
	u64 cookie, ver, notify_id;
	u8 opcode;
	struct ceph_osd_event *event;
	struct ceph_osd_event_work *event_work;

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	ceph_decode_8_safe(&p, end, proto_ver, bad);
	ceph_decode_8_safe(&p, end, opcode, bad);
	ceph_decode_64_safe(&p, end, cookie, bad);
	ceph_decode_64_safe(&p, end, ver, bad);
	ceph_decode_64_safe(&p, end, notify_id, bad);

	spin_lock(&osdc->event_lock);
	event = __find_event(osdc, cookie);
	if (event) {
		BUG_ON(event->one_shot);
		get_event(event);	/* dropped by do_event_work() */
	}
	spin_unlock(&osdc->event_lock);
	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
	     cookie, ver, event);
	if (event) {
		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
		if (!event_work) {
			/* notification is dropped on allocation failure */
			pr_err("couldn't allocate event_work\n");
			ceph_osdc_put_event(event);
			return;
		}
		INIT_WORK(&event_work->work, do_event_work);
		event_work->event = event;
		event_work->ver = ver;
		event_work->notify_id = notify_id;
		event_work->opcode = opcode;

		queue_work(osdc->notify_wq, &event_work->work);
	}

	return;

bad:
	pr_err("osdc handle_watch_notify corrupt msg\n");
}
2626
70636773
AE
/*
 * Register request, send initial attempt.
 *
 * NOTE(review): @nofail is currently unused in this body; always
 * returns 0 — confirm callers don't rely on failure here.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	down_read(&osdc->lock);
	submit_request(req, false);
	up_read(&osdc->lock);

	return 0;
}
EXPORT_SYMBOL(ceph_osdc_start_request);
f24e9980 2641
c9f9b93d
ID
/*
 * Unregister a registered request.  The request is not completed (i.e.
 * no callbacks or wakeups) - higher layers are supposed to know what
 * they are canceling.
 */
void ceph_osdc_cancel_request(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;

	down_write(&osdc->lock);
	if (req->r_linger)
		__unregister_linger_request(osdc, req);
	if (req->r_osd)		/* only if still linked to a session */
		cancel_request(req);
	up_write(&osdc->lock);
}
EXPORT_SYMBOL(ceph_osdc_cancel_request);
2659
f24e9980
SW
/*
 * wait for a request to complete
 *
 * Returns the request result, or a negative errno if the wait was
 * interrupted (in which case the request is canceled).
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	int rc;

	dout("%s %p tid %llu\n", __func__, req, req->r_tid);

	rc = wait_for_completion_interruptible(&req->r_completion);
	if (rc < 0) {
		dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
		ceph_osdc_cancel_request(req);

		/* kludge - need to wake ceph_osdc_sync() */
		complete_all(&req->r_safe_completion);
		return rc;
	}

	dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid,
	     req->r_result);
	return req->r_result;
}
EXPORT_SYMBOL(ceph_osdc_wait_request);
f24e9980
SW
2685
/*
 * sync - wait for all in-flight requests to flush. avoid starvation.
 *
 * Only writes submitted before entry (tid <= last_tid) are waited for;
 * the scan restarts from the top after each wait because the trees may
 * have changed while the locks were dropped.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct rb_node *n, *p;
	u64 last_tid = atomic64_read(&osdc->last_tid);

again:
	down_read(&osdc->lock);
	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);

		mutex_lock(&osd->lock);
		for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
			struct ceph_osd_request *req =
			    rb_entry(p, struct ceph_osd_request, r_node);

			/* tids increase; nothing newer needs waiting on */
			if (req->r_tid > last_tid)
				break;

			if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
				continue;

			/* hold a ref across the unlocked wait */
			ceph_osdc_get_request(req);
			mutex_unlock(&osd->lock);
			up_read(&osdc->lock);
			dout("%s waiting on req %p tid %llu last_tid %llu\n",
			     __func__, req, req->r_tid, last_tid);
			wait_for_completion(&req->r_safe_completion);
			ceph_osdc_put_request(req);
			goto again;
		}

		mutex_unlock(&osd->lock);
	}

	up_read(&osdc->lock);
	dout("%s done last_tid %llu\n", __func__, last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);
f24e9980 2727
dd935f44
JD
/*
 * Call all pending notify callbacks - for use after a watch is
 * unregistered, to make sure no more callbacks for it will be invoked
 */
void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
{
	/* all queued notifications run on notify_wq (see do_event_work) */
	flush_workqueue(osdc->notify_wq);
}
EXPORT_SYMBOL(ceph_osdc_flush_notifies);
2737
2738
f24e9980
SW
/*
 * init, shutdown
 */

/*
 * Initialize an OSD client: locks, trees, the homeless OSD session,
 * osdmap, message/request pools, the notify workqueue and the timeout
 * timers.  Returns 0 or a negative errno; on failure everything
 * allocated so far is torn down via the goto chain (reverse order of
 * setup).
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	init_rwsem(&osdc->lock);
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	spin_lock_init(&osdc->osd_lru_lock);
	INIT_LIST_HEAD(&osdc->req_linger);
	/* requests with no live OSD target hang off the homeless session */
	osd_init(&osdc->homeless_osd);
	osdc->homeless_osd.o_osdc = osdc;
	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
	spin_lock_init(&osdc->event_lock);
	osdc->event_tree = RB_ROOT;
	osdc->event_count = 0;

	err = -ENOMEM;
	osdc->osdmap = ceph_osdmap_alloc();
	if (!osdc->osdmap)
		goto out;

	osdc->req_mempool = mempool_create_slab_pool(10,
						     ceph_osd_request_cache);
	if (!osdc->req_mempool)
		goto out_map;

	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
				PAGE_SIZE, 10, true, "osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
				PAGE_SIZE, 10, true, "osd_op_reply");
	if (err < 0)
		goto out_msgpool;

	err = -ENOMEM;
	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
	if (!osdc->notify_wq)
		goto out_msgpool_reply;

	schedule_delayed_work(&osdc->timeout_work,
			      osdc->client->options->osd_keepalive_timeout);
	schedule_delayed_work(&osdc->osds_timeout_work,
	    round_jiffies_relative(osdc->client->options->osd_idle_ttl));

	return 0;

out_msgpool_reply:
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out_map:
	ceph_osdmap_destroy(osdc->osdmap);
out:
	return err;
}
2804
/*
 * Tear down an OSD client: stop background work, close all OSD
 * sessions, then free the map and pools.  Mirrors ceph_osdc_init().
 */
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	/* quiesce async work before touching the data structures */
	flush_workqueue(osdc->notify_wq);
	destroy_workqueue(osdc->notify_wq);
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);

	down_write(&osdc->lock);
	while (!RB_EMPTY_ROOT(&osdc->osds)) {
		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
						struct ceph_osd, o_node);
		close_osd(osd);
	}
	up_write(&osdc->lock);
	/* only our own embedded reference should remain */
	WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
	osd_cleanup(&osdc->homeless_osd);

	WARN_ON(!list_empty(&osdc->osd_lru));
	WARN_ON(atomic_read(&osdc->num_requests));
	WARN_ON(atomic_read(&osdc->num_homeless));

	ceph_osdmap_destroy(osdc->osdmap);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
2831
/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	/* *plen may be trimmed to the object boundary by new_request() */
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, truncate_seq, truncate_size,
				    false);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short read due to an object boundary */
	osd_req_op_extent_osd_data_pages(req, 0,
				pages, *plen, page_align, false, false);

	dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
	     off, *plen, *plen, page_align);

	/* synchronous: submit and wait */
	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);
f24e9980
SW
2870
/*
 * do a synchronous write on N pages
 *
 * Returns the number of bytes written (may be shortened at an object
 * boundary), or a negative errno.
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages)
{
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;	/* offset within first page */

	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
				    CEPH_OSD_OP_WRITE,
				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
				    snapc, truncate_seq, truncate_size,
				    true);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short write due to an object boundary */
	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
					 false, false);
	dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);

	req->r_mtime = *mtime;
	rc = ceph_osdc_start_request(osdc, req, true);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;	/* success: report bytes written */
	dout("writepages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);
f24e9980 2911
5522ae0b
AE
/*
 * Create the global slab cache for OSD requests.  Each slot also
 * embeds room for CEPH_OSD_SLAB_OPS ops.  Must be balanced by
 * ceph_osdc_cleanup().  Returns 0 or -ENOMEM.
 */
int ceph_osdc_setup(void)
{
	size_t size = sizeof(struct ceph_osd_request) +
	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);

	BUG_ON(ceph_osd_request_cache);		/* double setup is a bug */
	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
						   0, 0, NULL);

	return ceph_osd_request_cache ? 0 : -ENOMEM;
}
EXPORT_SYMBOL(ceph_osdc_setup);
2924
/* Destroy the slab cache created by ceph_osdc_setup(). */
void ceph_osdc_cleanup(void)
{
	BUG_ON(!ceph_osd_request_cache);	/* setup must have run */
	kmem_cache_destroy(ceph_osd_request_cache);
	ceph_osd_request_cache = NULL;
}
EXPORT_SYMBOL(ceph_osdc_cleanup);
2932
f24e9980
SW
/*
 * handle incoming message
 *
 * Routes a message from an OSD connection to the appropriate handler
 * and consumes the message reference.
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	int type = le16_to_cpu(msg->hdr.type);

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osd, msg);
		break;
	case CEPH_MSG_WATCH_NOTIFY:
		handle_watch_notify(osdc, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}

	ceph_msg_put(msg);	/* consume the ref handed to us */
}
2960
/*
 * Lookup and return message for incoming reply.  Don't try to do
 * anything about a larger than preallocated data portion of the
 * message at the moment - for now, just skip the message.
 *
 * Lock ordering here is osdc->lock (read) then osd->lock; the goto
 * labels unwind them in reverse.  On success a new reference to the
 * request's preallocated reply message is returned; otherwise *skip
 * is set and NULL is returned so the messenger drops the message.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m = NULL;
	struct ceph_osd_request *req;
	int front_len = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid = le64_to_cpu(hdr->tid);

	down_read(&osdc->lock);
	/* session may have been torn down while the reply was in flight */
	if (!osd_registered(osd)) {
		dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
		*skip = 1;
		goto out_unlock_osdc;
	}
	WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));

	mutex_lock(&osd->lock);
	req = lookup_request(&osd->o_requests, tid);
	if (!req) {
		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
		     osd->o_osd, tid);
		*skip = 1;
		goto out_unlock_session;
	}

	/* stop the messenger from delivering into the old reply msg */
	ceph_msg_revoke_incoming(req->r_reply);

	if (front_len > req->r_reply->front_alloc_len) {
		/* front section too small - replace the preallocated msg */
		pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
			__func__, osd->o_osd, req->r_tid, front_len,
			req->r_reply->front_alloc_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
				 false);
		if (!m)
			goto out_unlock_session;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}

	if (data_len > req->r_reply->data_length) {
		/* can't grow the data portion - skip this reply entirely */
		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
			__func__, osd->o_osd, req->r_tid, data_len,
			req->r_reply->data_length);
		m = NULL;
		*skip = 1;
		goto out_unlock_session;
	}

	m = ceph_msg_get(req->r_reply);
	dout("get_reply tid %lld %p\n", tid, m);

out_unlock_session:
	mutex_unlock(&osd->lock);
out_unlock_osdc:
	up_read(&osdc->lock);
	return m;
}
3027
3028static struct ceph_msg *alloc_msg(struct ceph_connection *con,
3029 struct ceph_msg_header *hdr,
3030 int *skip)
3031{
3032 struct ceph_osd *osd = con->private;
3033 int type = le16_to_cpu(hdr->type);
3034 int front = le32_to_cpu(hdr->front_len);
3035
1c20f2d2 3036 *skip = 0;
5b3a4db3
SW
3037 switch (type) {
3038 case CEPH_MSG_OSD_MAP:
a40c4f10 3039 case CEPH_MSG_WATCH_NOTIFY:
b61c2763 3040 return ceph_msg_new(type, front, GFP_NOFS, false);
5b3a4db3
SW
3041 case CEPH_MSG_OSD_OPREPLY:
3042 return get_reply(con, hdr, skip);
3043 default:
5aea3dcd
ID
3044 pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
3045 osd->o_osd, type);
5b3a4db3
SW
3046 *skip = 1;
3047 return NULL;
3048 }
f24e9980
SW
3049}
3050
3051/*
3052 * Wrappers to refcount containing ceph_osd struct
3053 */
3054static struct ceph_connection *get_osd_con(struct ceph_connection *con)
3055{
3056 struct ceph_osd *osd = con->private;
3057 if (get_osd(osd))
3058 return con;
3059 return NULL;
3060}
3061
3062static void put_osd_con(struct ceph_connection *con)
3063{
3064 struct ceph_osd *osd = con->private;
3065 put_osd(osd);
3066}
3067
4e7a5dcd
SW
3068/*
3069 * authentication
3070 */
a3530df3
AE
3071/*
3072 * Note: returned pointer is the address of a structure that's
3073 * managed separately. Caller must *not* attempt to free it.
3074 */
3075static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
8f43fb53 3076 int *proto, int force_new)
4e7a5dcd
SW
3077{
3078 struct ceph_osd *o = con->private;
3079 struct ceph_osd_client *osdc = o->o_osdc;
3080 struct ceph_auth_client *ac = osdc->client->monc.auth;
74f1869f 3081 struct ceph_auth_handshake *auth = &o->o_auth;
4e7a5dcd 3082
74f1869f 3083 if (force_new && auth->authorizer) {
6c1ea260 3084 ceph_auth_destroy_authorizer(auth->authorizer);
74f1869f
AE
3085 auth->authorizer = NULL;
3086 }
27859f97
SW
3087 if (!auth->authorizer) {
3088 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
3089 auth);
4e7a5dcd 3090 if (ret)
a3530df3 3091 return ERR_PTR(ret);
27859f97
SW
3092 } else {
3093 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
0bed9b5c
SW
3094 auth);
3095 if (ret)
3096 return ERR_PTR(ret);
4e7a5dcd 3097 }
4e7a5dcd 3098 *proto = ac->protocol;
74f1869f 3099
a3530df3 3100 return auth;
4e7a5dcd
SW
3101}
3102
3103
3104static int verify_authorizer_reply(struct ceph_connection *con, int len)
3105{
3106 struct ceph_osd *o = con->private;
3107 struct ceph_osd_client *osdc = o->o_osdc;
3108 struct ceph_auth_client *ac = osdc->client->monc.auth;
3109
27859f97 3110 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
4e7a5dcd
SW
3111}
3112
9bd2e6f8
SW
3113static int invalidate_authorizer(struct ceph_connection *con)
3114{
3115 struct ceph_osd *o = con->private;
3116 struct ceph_osd_client *osdc = o->o_osdc;
3117 struct ceph_auth_client *ac = osdc->client->monc.auth;
3118
27859f97 3119 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
9bd2e6f8
SW
3120 return ceph_monc_validate_auth(&osdc->client->monc);
3121}
4e7a5dcd 3122
79dbd1ba 3123static int osd_sign_message(struct ceph_msg *msg)
33d07337 3124{
79dbd1ba 3125 struct ceph_osd *o = msg->con->private;
33d07337 3126 struct ceph_auth_handshake *auth = &o->o_auth;
79dbd1ba 3127
33d07337
YZ
3128 return ceph_auth_sign_message(auth, msg);
3129}
3130
79dbd1ba 3131static int osd_check_message_signature(struct ceph_msg *msg)
33d07337 3132{
79dbd1ba 3133 struct ceph_osd *o = msg->con->private;
33d07337 3134 struct ceph_auth_handshake *auth = &o->o_auth;
79dbd1ba 3135
33d07337
YZ
3136 return ceph_auth_check_message_signature(auth, msg);
3137}
3138
/*
 * Messenger callbacks installed on every OSD session's connection;
 * they route connection events (dispatch, auth, faults, refcounting)
 * into the OSD client.
 */
static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.sign_message = osd_sign_message,
	.check_message_signature = osd_check_message_signature,
	.fault = osd_fault,
};