net/ceph/osd_client.c (mirror_ubuntu-bionic-kernel.git, blame view at
"libceph: pass length to ceph_calc_file_object_mapping()")

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

static const struct ceph_connection_operations osd_con_ops;

static void send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req);

static int op_has_extent(int op)
{
	return (op == CEPH_OSD_OP_READ ||
		op == CEPH_OSD_OP_WRITE);
}

int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
			 struct ceph_file_layout *layout,
			 u64 snapid,
			 u64 off, u64 *plen, u64 *bno,
			 struct ceph_osd_request *req,
			 struct ceph_osd_req_op *op)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	u64 orig_len = *plen;
	u64 objoff, objlen;    /* extent in object */
	int r;

	reqhead->snapid = cpu_to_le64(snapid);

	/* object extent? */
	r = ceph_calc_file_object_mapping(layout, off, orig_len, bno,
					  &objoff, &objlen);
	if (r < 0)
		return r;
	if (objlen < orig_len) {
		*plen = objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	if (op_has_extent(op->op)) {
		u32 osize = le32_to_cpu(layout->fl_object_size);
		op->extent.offset = objoff;
		op->extent.length = objlen;
		if (op->extent.truncate_size <= off - objoff) {
			op->extent.truncate_size = 0;
		} else {
			op->extent.truncate_size -= off - objoff;
			if (op->extent.truncate_size > osize)
				op->extent.truncate_size = osize;
		}
	}
	req->r_num_pages = calc_pages_for(off, *plen);
	req->r_page_alignment = off & ~PAGE_MASK;
	if (op->op == CEPH_OSD_OP_WRITE)
		op->payload_len = *plen;

	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
	     *bno, objoff, objlen, req->r_num_pages);
	return 0;
}
EXPORT_SYMBOL(ceph_calc_raw_layout);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_osd_client *osdc,
		       struct ceph_vino vino,
		       struct ceph_file_layout *layout,
		       u64 off, u64 *plen,
		       struct ceph_osd_request *req,
		       struct ceph_osd_req_op *op)
{
	u64 bno;
	int r;

	r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
				 plen, &bno, req, op);
	if (r < 0)
		return r;

	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
	req->r_oid_len = strlen(req->r_oid);

	return r;
}

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_con_filling_msg) {
		dout("%s revoking pages %p from con %p\n", __func__,
		     req->r_pages, req->r_con_filling_msg);
		ceph_msg_revoke_incoming(req->r_reply);
		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
	}
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
#ifdef CONFIG_BLOCK
	if (req->r_bio)
		bio_put(req->r_bio);
#endif
	ceph_put_snap_context(req->r_snapc);
	ceph_pagelist_release(&req->r_trail);
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);

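/*
 * Count the ops in an array terminated by an op code of 0.
 */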
static int get_num_ops(struct ceph_osd_req_op *ops)
{
	int i = 0;

	while (ops[i].op)
		i++;

	return i;
}

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       int flags,
					       struct ceph_snap_context *snapc,
					       struct ceph_osd_req_op *ops,
					       bool use_mempool,
					       gfp_t gfp_flags,
					       struct page **pages,
					       struct bio *bio)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	int num_op = get_num_ops(ops);
	size_t msg_size = sizeof(struct ceph_osd_request_head);

	msg_size += num_op*sizeof(struct ceph_osd_op);

	if (use_mempool) {
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
		memset(req, 0, sizeof(*req));
	} else {
		req = kzalloc(sizeof(*req), gfp_flags);
	}
	if (req == NULL)
		return NULL;

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	INIT_LIST_HEAD(&req->r_linger_item);
	INIT_LIST_HEAD(&req->r_linger_osd);
	INIT_LIST_HEAD(&req->r_req_lru_item);
	INIT_LIST_HEAD(&req->r_osd_item);

	req->r_flags = flags;

	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	req->r_reply = msg;

	ceph_pagelist_init(&req->r_trail);

	/* create request message; allow space for oid */
	msg_size += MAX_OBJ_NAME_SIZE;
	if (snapc)
		msg_size += sizeof(u64) * snapc->num_snaps;
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}

	memset(msg->front.iov_base, 0, msg->front.iov_len);

	req->r_request = msg;
	req->r_pages = pages;
#ifdef CONFIG_BLOCK
	if (bio) {
		req->r_bio = bio;
		bio_get(req->r_bio);
	}
#endif

	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

static void osd_req_encode_op(struct ceph_osd_request *req,
			      struct ceph_osd_op *dst,
			      struct ceph_osd_req_op *src)
{
	dst->op = cpu_to_le16(src->op);

	switch (src->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		dst->extent.offset =
			cpu_to_le64(src->extent.offset);
		dst->extent.length =
			cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;

	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		ceph_pagelist_append(&req->r_trail, src->xattr.name,
				     src->xattr.name_len);
		ceph_pagelist_append(&req->r_trail, src->xattr.val,
				     src->xattr.value_len);
		break;
	case CEPH_OSD_OP_CALL:
		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);

		ceph_pagelist_append(&req->r_trail, src->cls.class_name,
				     src->cls.class_len);
		ceph_pagelist_append(&req->r_trail, src->cls.method_name,
				     src->cls.method_len);
		ceph_pagelist_append(&req->r_trail, src->cls.indata,
				     src->cls.indata_len);
		break;
	case CEPH_OSD_OP_ROLLBACK:
		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	case CEPH_OSD_OP_NOTIFY:
	{
		__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
		__le32 timeout = cpu_to_le32(src->watch.timeout);

		ceph_pagelist_append(&req->r_trail,
				     &prot_ver, sizeof(prot_ver));
		ceph_pagelist_append(&req->r_trail,
				     &timeout, sizeof(timeout));
	}
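		/* fall through: NOTIFY also encodes the watch fields below */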
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(src->watch.ver);
		dst->watch.flag = src->watch.flag;
		break;
	default:
		pr_err("unrecognized osd opcode %d\n", dst->op);
		WARN_ON(1);
		break;
	}
	dst->payload_len = cpu_to_le32(src->payload_len);
}

/*
 * build new request AND message
 *
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, u64 len,
			     struct ceph_osd_req_op *src_ops,
			     struct ceph_snap_context *snapc,
			     struct timespec *mtime)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_request_head *head;
	struct ceph_osd_req_op *src_op;
	struct ceph_osd_op *op;
	void *p;
	int num_op = get_num_ops(src_ops);
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int flags = req->r_flags;
	u64 data_len = 0;
	int i;

	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now. */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);


	/* fill in oid */
	head->object_len = cpu_to_le32(req->r_oid_len);
	memcpy(p, req->r_oid, req->r_oid_len);
	p += req->r_oid_len;

	src_op = src_ops;
	while (src_op->op) {
		osd_req_encode_op(req, op, src_op);
		src_op++;
		op++;
	}

	data_len += req->r_trail.length;

	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(len + data_len);
	} else if (data_len) {
		req->r_request->hdr.data_off = 0;
		req->r_request->hdr.data_len = cpu_to_le32(data_len);
	}

	req->r_request->page_alignment = req->r_page_alignment;

	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
	return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       int do_sync,
					       u32 truncate_seq,
					       u64 truncate_size,
					       struct timespec *mtime,
					       bool use_mempool, int num_reply,
					       int page_align)
{
	struct ceph_osd_req_op ops[3];
	struct ceph_osd_request *req;
	int r;

	ops[0].op = opcode;
	ops[0].extent.truncate_seq = truncate_seq;
	ops[0].extent.truncate_size = truncate_size;
	ops[0].payload_len = 0;

	if (do_sync) {
		ops[1].op = CEPH_OSD_OP_STARTSYNC;
		ops[1].payload_len = 0;
		ops[2].op = 0;
	} else
		ops[1].op = 0;

	req = ceph_osdc_alloc_request(osdc, flags,
				      snapc, ops,
				      use_mempool,
				      GFP_NOFS, NULL, NULL);
	if (!req)
		return ERR_PTR(-ENOMEM);

	/* calculate max write size */
	r = calc_layout(osdc, vino, layout, off, plen, req, ops);
	if (r < 0)
		return ERR_PTR(r);
	req->r_file_layout = *layout; /* keep a copy */

	/* in case it differs from natural (file) alignment that
	   calc_layout filled in for us */
	req->r_num_pages = calc_pages_for(page_align, *plen);
	req->r_page_alignment = page_align;

	ceph_osdc_build_request(req, off, *plen, ops,
				snapc,
				mtime);

	return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *new)
{
	struct rb_node **p = &osdc->requests.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_osd_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
						 u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

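/*
 * Return the first registered request with tid >= @tid, or NULL if
 * there is none.
 */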
static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}

/*
 * Resubmit requests pending on the given osd.
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
				struct ceph_osd *osd)
{
	struct ceph_osd_request *req, *nreq;
	int err;

	dout("__kick_osd_requests osd%d\n", osd->o_osd);
	err = __reset_osd(osdc, osd);
	if (err)
		return;

	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
		if (!req->r_linger)
			req->r_flags |= CEPH_OSD_FLAG_RETRY;
	}

	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
				 r_linger_osd) {
		/*
		 * reregister request prior to unregistering linger so
		 * that r_osd is preserved.
		 */
		BUG_ON(!list_empty(&req->r_req_lru_item));
		__register_request(osdc, req);
		list_add(&req->r_req_lru_item, &osdc->req_unsent);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
		__unregister_linger_request(osdc, req);
		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
	}
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	__kick_osd_requests(osdc, osd);
	mutex_unlock(&osdc->request_mutex);
	send_queued(osdc);
	up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	osd->o_osd = onum;
	RB_CLEAR_NODE(&osd->o_node);
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_linger_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}

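/*
 * Take a reference on an osd session, unless the last reference has
 * already been dropped.
 */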
static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		if (ac->ops && ac->ops->destroy_authorizer)
			ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
		kfree(osd);
	}
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);
}

static void remove_all_osds(struct ceph_osd_client *osdc)
{
	dout("%s %p\n", __func__, osdc);
	mutex_lock(&osdc->request_mutex);
	while (!RB_EMPTY_ROOT(&osdc->osds)) {
		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
						struct ceph_osd, o_node);
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd, *nosd;

	dout("remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (time_before(jiffies, osd->lru_ttl))
			break;
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_entity_addr *peer_addr;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests) &&
	    list_empty(&osd->o_linger_requests)) {
		__remove_osd(osdc, osd);

		return -ENODEV;
	}

	peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
	    !ceph_con_opened(&osd->o_con)) {
		struct ceph_osd_request *req;

		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;

		return -EAGAIN;
	}

	ceph_con_close(&osd->o_con);
	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
	osd->o_incarnation++;

	return 0;
}

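/*
 * We keep osd sessions in an rbtree, sorted by ->o_osd.
 */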
static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
	struct rb_node **p = &osdc->osds.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd *osd = NULL;

	dout("__insert_osd %p osd%d\n", new, new->o_osd);
	while (*p) {
		parent = *p;
		osd = rb_entry(parent, struct ceph_osd, o_node);
		if (new->o_osd < osd->o_osd)
			p = &(*p)->rb_left;
		else if (new->o_osd > osd->o_osd)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->o_node, parent, p);
	rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
	struct ceph_osd *osd;
	struct rb_node *n = osdc->osds.rb_node;

	while (n) {
		osd = rb_entry(n, struct ceph_osd, o_node);
		if (o < osd->o_osd)
			n = n->rb_left;
		else if (o > osd->o_osd)
			n = n->rb_right;
		else
			return osd;
	}
	return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req)
{
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;
	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
}

static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	__register_request(osdc, req);
	mutex_unlock(&osdc->request_mutex);
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	if (RB_EMPTY_NODE(&req->r_node)) {
		dout("__unregister_request %p tid %lld not registered\n",
		     req, req->r_tid);
		return;
	}

	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_msg_revoke(req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_linger_item))
			req->r_osd = NULL;
	}

	list_del_init(&req->r_req_lru_item);
	ceph_osdc_put_request(req);

	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_msg_revoke(req->r_request);
		req->r_sent = 0;
	}
}

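/*
 * Linger requests stay registered across resends and map changes
 * until the caller explicitly unregisters them; see
 * ceph_osdc_set_request_linger() below.
 */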
static void __register_linger_request(struct ceph_osd_client *osdc,
				      struct ceph_osd_request *req)
{
	dout("__register_linger_request %p\n", req);
	list_add_tail(&req->r_linger_item, &osdc->req_linger);
	if (req->r_osd)
		list_add_tail(&req->r_linger_osd,
			      &req->r_osd->o_linger_requests);
}

static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req)
{
	dout("__unregister_linger_request %p\n", req);
	list_del_init(&req->r_linger_item);
	if (req->r_osd) {
		list_del_init(&req->r_linger_osd);

		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_osd_item))
			req->r_osd = NULL;
	}
}

void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
					 struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	if (req->r_linger) {
		__unregister_linger_request(osdc, req);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);

void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
				  struct ceph_osd_request *req)
{
	if (!req->r_linger) {
		dout("set_request_linger %p\n", req);
		req->r_linger = 1;
		/*
		 * caller is now responsible for calling
		 * unregister_linger_request
		 */
		ceph_osdc_get_request(req);
	}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
 * (unsent, homeless) or leave on in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
			 struct ceph_osd_request *req, int force_resend)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_request %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
				      &req->r_file_layout, osdc->osdmap);
	if (err) {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
		return err;
	}
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	if ((!force_resend &&
	     req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc, o);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con,
			      CEPH_ENTITY_TYPE_OSD, o,
			      &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
}

/*
 * Send any requests in the queue (req_unsent).
 */
static void send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("send_queued\n");
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
		__send_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests who have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req;
	struct ceph_osd *osd;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	struct list_head slow_osds;
	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	mutex_unlock(&osdc->request_mutex);
	send_queued(osdc);
	up_read(&osdc->map_sem);
}

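/*
 * Periodic work to close out osd sessions that have sat idle on the
 * lru for longer than osd_idle_ttl.
 */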
static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

static void complete_request(struct ceph_osd_request *req)
{
	if (req->r_safe_callback)
		req->r_safe_callback(req, NULL);
	complete_all(&req->r_safe_completion);  /* fsync waiter */
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
	struct ceph_osd_request *req;
	u64 tid;
	int numops, object_len, flags;
	s32 result;

	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*rhead))
		goto bad;
	numops = le32_to_cpu(rhead->num_ops);
	object_len = le32_to_cpu(rhead->object_len);
	result = le32_to_cpu(rhead->result);
	if (msg->front.iov_len != sizeof(*rhead) + object_len +
	    numops * sizeof(struct ceph_osd_op))
		goto bad;
	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		mutex_unlock(&osdc->request_mutex);
		return;
	}
	ceph_osdc_get_request(req);
	flags = le32_to_cpu(rhead->flags);

	/*
	 * if this connection filled our message, drop our reference now, to
	 * avoid a (safe but slower) revoke later.
	 */
	if (req->r_con_filling_msg == con && req->r_reply == msg) {
		dout(" dropping con_filling_msg ref %p\n", con);
		req->r_con_filling_msg = NULL;
		con->ops->put(con);
	}

	if (!req->r_got_reply) {
		unsigned int bytes;

		req->r_result = le32_to_cpu(rhead->result);
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version = rhead->reassert_version;

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
		__register_linger_request(osdc, req);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK)
		complete_request(req);

done:
	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
	ceph_osdc_put_request(req);
	return;

bad:
	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
	       (int)sizeof(*rhead));
	ceph_msg_dump(msg);
}

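/*
 * Reset any osd whose address changed in the new map, or that is no
 * longer up, so that its requeued requests go to the right target.
 */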
static void reset_changed_osds(struct ceph_osd_client *osdc)
{
	struct rb_node *p, *n;

	for (p = rb_first(&osdc->osds); p; p = n) {
		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);

		n = rb_next(p);
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap,
					 osd->o_osd),
			   sizeof(struct ceph_entity_addr)) != 0)
			__reset_osd(osdc, osd);
	}
}

/*
 * Requeue requests whose mapping to an OSD has changed.  If requests map to
 * no osd, request a new map.
 *
 * Caller should hold map_sem for read.
 */
static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
{
	struct ceph_osd_request *req, *nreq;
	struct rb_node *p;
	int needmap = 0;
	int err;

	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
	mutex_lock(&osdc->request_mutex);
	for (p = rb_first(&osdc->requests); p; ) {
		req = rb_entry(p, struct ceph_osd_request, r_node);
		p = rb_next(p);

		/*
		 * For linger requests that have not yet been
		 * registered, move them to the linger list; they'll
		 * be sent to the osd in the loop below.  Unregister
		 * the request before re-registering it as a linger
		 * request to ensure the __map_request() below
		 * will decide it needs to be sent.
		 */
		if (req->r_linger && list_empty(&req->r_linger_item)) {
			dout("%p tid %llu restart on osd%d\n",
			     req, req->r_tid,
			     req->r_osd ? req->r_osd->o_osd : -1);
			__unregister_request(osdc, req);
			__register_linger_request(osdc, req);
			continue;
		}

		err = __map_request(osdc, req, force_resend);
		if (err < 0)
			continue;  /* error */
		if (req->r_osd == NULL) {
			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
			needmap++;  /* request a newer map */
		} else if (err > 0) {
			if (!req->r_linger) {
				dout("%p tid %llu requeued on osd%d\n", req,
				     req->r_tid,
				     req->r_osd ? req->r_osd->o_osd : -1);
				req->r_flags |= CEPH_OSD_FLAG_RETRY;
			}
		}
	}

	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
				 r_linger_item) {
		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

		err = __map_request(osdc, req, force_resend);
		dout("__map_request returned %d\n", err);
		if (err == 0)
			continue;  /* no change and no osd was specified */
		if (err < 0)
			continue;  /* hrm! */
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;  /* request a newer map */
			continue;
		}

		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		__register_request(osdc, req);
		__unregister_linger_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
	reset_changed_osds(osdc);
}


/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;

	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  &osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
			kick_requests(osdc, 0);
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			int skipped_map = 0;

			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap) {
				if (oldmap->epoch + 1 < newmap->epoch)
					skipped_map = 1;
				ceph_osdmap_destroy(oldmap);
			}
			kick_requests(osdc, skipped_map);
		}
		p += maplen;
		nr_maps--;
	}

done:
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);

	/*
	 * subscribe to subsequent osdmap updates if full to ensure
	 * we find out when we are no longer full and stop returning
	 * ENOSPC.
	 */
	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
		ceph_monc_request_next_osdmap(&osdc->client->monc);

	send_queued(osdc);
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
	return;
}

/*
 * watch/notify callback event infrastructure
 *
 * These callbacks are used both for watch and notify operations.
 */
static void __release_event(struct kref *kref)
{
	struct ceph_osd_event *event =
		container_of(kref, struct ceph_osd_event, kref);

	dout("__release_event %p\n", event);
	kfree(event);
}

static void get_event(struct ceph_osd_event *event)
{
	kref_get(&event->kref);
}

void ceph_osdc_put_event(struct ceph_osd_event *event)
{
	kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);

static void __insert_event(struct ceph_osd_client *osdc,
			   struct ceph_osd_event *new)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (new->cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (new->cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &osdc->event_tree);
}

static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
					   u64 cookie)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			return event;
	}
	return NULL;
}

static void __remove_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	if (!RB_EMPTY_NODE(&event->node)) {
		dout("__remove_event removed %p\n", event);
		rb_erase(&event->node, &osdc->event_tree);
		ceph_osdc_put_event(event);
	} else {
		dout("__remove_event didn't remove %p\n", event);
	}
}

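/*
 * Register a new watch/notify event.  On success the caller holds a
 * reference on *pevent and is responsible for dropping it, normally
 * via ceph_osdc_cancel_event().
 */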
int ceph_osdc_create_event(struct ceph_osd_client *osdc,
			   void (*event_cb)(u64, u64, u8, void *),
			   int one_shot, void *data,
			   struct ceph_osd_event **pevent)
{
	struct ceph_osd_event *event;

	event = kmalloc(sizeof(*event), GFP_NOIO);
	if (!event)
		return -ENOMEM;

	dout("create_event %p\n", event);
	event->cb = event_cb;
	event->one_shot = one_shot;
	event->data = data;
	event->osdc = osdc;
	INIT_LIST_HEAD(&event->osd_node);
	RB_CLEAR_NODE(&event->node);
	kref_init(&event->kref);   /* one ref for us */
	kref_get(&event->kref);    /* one ref for the caller */
	init_completion(&event->completion);

	spin_lock(&osdc->event_lock);
	event->cookie = ++osdc->event_count;
	__insert_event(osdc, event);
	spin_unlock(&osdc->event_lock);

	*pevent = event;
	return 0;
}
EXPORT_SYMBOL(ceph_osdc_create_event);

void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	dout("cancel_event %p\n", event);
	spin_lock(&osdc->event_lock);
	__remove_event(event);
	spin_unlock(&osdc->event_lock);
	ceph_osdc_put_event(event); /* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);


static void do_event_work(struct work_struct *work)
{
	struct ceph_osd_event_work *event_work =
		container_of(work, struct ceph_osd_event_work, work);
	struct ceph_osd_event *event = event_work->event;
	u64 ver = event_work->ver;
	u64 notify_id = event_work->notify_id;
	u8 opcode = event_work->opcode;

	dout("do_event_work completing %p\n", event);
	event->cb(ver, notify_id, opcode, event->data);
	complete(&event->completion);
	dout("do_event_work completed %p\n", event);
	ceph_osdc_put_event(event);
	kfree(event_work);
}


/*
 * Process osd watch notifications
 */
void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end;
	u8 proto_ver;
	u64 cookie, ver, notify_id;
	u8 opcode;
	struct ceph_osd_event *event;
	struct ceph_osd_event_work *event_work;

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	ceph_decode_8_safe(&p, end, proto_ver, bad);
	ceph_decode_8_safe(&p, end, opcode, bad);
	ceph_decode_64_safe(&p, end, cookie, bad);
	ceph_decode_64_safe(&p, end, ver, bad);
	ceph_decode_64_safe(&p, end, notify_id, bad);

	spin_lock(&osdc->event_lock);
	event = __find_event(osdc, cookie);
	if (event) {
		get_event(event);
		if (event->one_shot)
			__remove_event(event);
	}
	spin_unlock(&osdc->event_lock);
	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
	     cookie, ver, event);
	if (event) {
		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
		if (!event_work) {
			dout("ERROR: could not allocate event_work\n");
			goto done_err;
		}
		INIT_WORK(&event_work->work, do_event_work);
		event_work->event = event;
		event_work->ver = ver;
		event_work->notify_id = notify_id;
		event_work->opcode = opcode;
		if (!queue_work(osdc->notify_wq, &event_work->work)) {
			dout("WARNING: failed to queue notify event work\n");
			goto done_err;
		}
	}

	return;

done_err:
	complete(&event->completion);
	ceph_osdc_put_event(event);
	return;

bad:
	pr_err("osdc handle_watch_notify corrupt msg\n");
	return;
}

int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
{
	int err;

	dout("wait_event %p\n", event);
	err = wait_for_completion_interruptible_timeout(&event->completion,
							timeout * HZ);
	ceph_osdc_put_event(event);
	if (err > 0)
		err = 0;
	dout("wait_event %p returns %d\n", event, err);
	return err;
}
EXPORT_SYMBOL(ceph_osdc_wait_event);

/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;
#ifdef CONFIG_BLOCK
	req->r_request->bio = req->r_bio;
#endif
	req->r_request->trail = &req->r_trail;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * a racing kick_requests() may have sent the message for us
	 * while we dropped request_mutex above, so only send now if
	 * the request still hasn't been touched yet.
	 */
	if (req->r_sent == 0) {
		rc = __map_request(osdc, req, 0);
		if (rc < 0) {
			if (nofail) {
				dout("osdc_start_request failed map, "
				     " will retry %lld\n", req->r_tid);
				rc = 0;
			}
			goto out_unlock;
		}
		if (req->r_osd == NULL) {
			dout("send_request %p no up osds in pg\n", req);
			ceph_monc_request_next_osdmap(&osdc->client->monc);
		} else {
			__send_request(osdc, req);
		}
		rc = 0;
	}

out_unlock:
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);

/*
 * wait for a request to complete
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	int rc;

	rc = wait_for_completion_interruptible(&req->r_completion);
	if (rc < 0) {
		mutex_lock(&osdc->request_mutex);
		__cancel_request(req);
		__unregister_request(osdc, req);
		mutex_unlock(&osdc->request_mutex);
		complete_request(req);
		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
		return rc;
	}

	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
	return req->r_result;
}
EXPORT_SYMBOL(ceph_osdc_wait_request);
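
/*
 * Typical synchronous use of the request API above (a sketch only;
 * the variable names are illustrative -- compare ceph_osdc_readpages()
 * below):
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, 0, truncate_seq, truncate_size,
 *				    NULL, false, 1, page_align);
 *	rc = ceph_osdc_start_request(osdc, req, false);
 *	if (!rc)
 *		rc = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 */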

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);
f24e9980
SW
1776
1777/*
1778 * init, shutdown
1779 */
1780int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1781{
1782 int err;
1783
1784 dout("init\n");
1785 osdc->client = client;
1786 osdc->osdmap = NULL;
1787 init_rwsem(&osdc->map_sem);
1788 init_completion(&osdc->map_waiters);
1789 osdc->last_requested_map = 0;
1790 mutex_init(&osdc->request_mutex);
f24e9980
SW
1791 osdc->last_tid = 0;
1792 osdc->osds = RB_ROOT;
f5a2041b 1793 INIT_LIST_HEAD(&osdc->osd_lru);
f24e9980 1794 osdc->requests = RB_ROOT;
422d2cb8 1795 INIT_LIST_HEAD(&osdc->req_lru);
6f6c7006
SW
1796 INIT_LIST_HEAD(&osdc->req_unsent);
1797 INIT_LIST_HEAD(&osdc->req_notarget);
a40c4f10 1798 INIT_LIST_HEAD(&osdc->req_linger);
f24e9980
SW
1799 osdc->num_requests = 0;
1800 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
f5a2041b 1801 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
a40c4f10
YS
1802 spin_lock_init(&osdc->event_lock);
1803 osdc->event_tree = RB_ROOT;
1804 osdc->event_count = 0;
f5a2041b
YS
1805
1806 schedule_delayed_work(&osdc->osds_timeout_work,
3d14c5d2 1807 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
f24e9980 1808
5f44f142 1809 err = -ENOMEM;
f24e9980
SW
1810 osdc->req_mempool = mempool_create_kmalloc_pool(10,
1811 sizeof(struct ceph_osd_request));
1812 if (!osdc->req_mempool)
5f44f142 1813 goto out;
f24e9980 1814
d50b409f
SW
1815 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
1816 OSD_OP_FRONT_LEN, 10, true,
4f48280e 1817 "osd_op");
f24e9980 1818 if (err < 0)
5f44f142 1819 goto out_mempool;
d50b409f 1820 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
4f48280e
SW
1821 OSD_OPREPLY_FRONT_LEN, 10, true,
1822 "osd_op_reply");
c16e7869
SW
1823 if (err < 0)
1824 goto out_msgpool;
a40c4f10
YS
1825
1826 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1827 if (!osdc->notify_wq) {
1828 /* create_singlethread_workqueue() returns NULL, not ERR_PTR */
1829 err = -ENOMEM;
1830 goto out_msgpool;
1831 }
f24e9980 1832 return 0;
5f44f142 1833
c16e7869
SW
1834out_msgpool:
1835 ceph_msgpool_destroy(&osdc->msgpool_op);
5f44f142
SW
1836out_mempool:
1837 mempool_destroy(osdc->req_mempool);
1838out:
1839 return err;
f24e9980 1840}
3d14c5d2 1841EXPORT_SYMBOL(ceph_osdc_init);
f24e9980
SW
1842
1843void ceph_osdc_stop(struct ceph_osd_client *osdc)
1844{
a40c4f10
YS
1845 flush_workqueue(osdc->notify_wq);
1846 destroy_workqueue(osdc->notify_wq);
f24e9980 1847 cancel_delayed_work_sync(&osdc->timeout_work);
f5a2041b 1848 cancel_delayed_work_sync(&osdc->osds_timeout_work);
f24e9980
SW
1849 if (osdc->osdmap) {
1850 ceph_osdmap_destroy(osdc->osdmap);
1851 osdc->osdmap = NULL;
1852 }
aca420bc 1853 remove_all_osds(osdc);
f24e9980
SW
1854 mempool_destroy(osdc->req_mempool);
1855 ceph_msgpool_destroy(&osdc->msgpool_op);
c16e7869 1856 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
f24e9980 1857}
3d14c5d2 1858EXPORT_SYMBOL(ceph_osdc_stop);
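/*
 * Lifecycle sketch (illustrative, not from this file): an osd_client
 * is embedded in a ceph_client and bracketed by init/stop, with
 * ceph_osdc_sync() as the write barrier before shutdown.  'client'
 * is assumed to be an already-initialized ceph_client.
 */
static int example_osdc_lifecycle(struct ceph_client *client,
				  struct ceph_osd_client *osdc)
{
	int err = ceph_osdc_init(osdc, client);

	if (err < 0)
		return err;
	/* ... submit requests ... */
	ceph_osdc_sync(osdc);	/* wait for in-flight writes to commit */
	ceph_osdc_stop(osdc);
	return 0;
}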
f24e9980
SW
1859
1860/*
1861 * Read some contiguous pages. If we cross a stripe boundary, shorten
1862 * *plen. Return number of bytes read, or error.
1863 */
1864int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1865 struct ceph_vino vino, struct ceph_file_layout *layout,
1866 u64 off, u64 *plen,
1867 u32 truncate_seq, u64 truncate_size,
b7495fc2 1868 struct page **pages, int num_pages, int page_align)
f24e9980
SW
1869{
1870 struct ceph_osd_request *req;
1871 int rc = 0;
1872
1873 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1874 vino.snap, off, *plen);
1875 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1876 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1877 NULL, 0, truncate_seq, truncate_size, NULL,
b7495fc2 1878 false, 1, page_align);
6816282d
SW
1879 if (IS_ERR(req))
1880 return PTR_ERR(req);
f24e9980
SW
1881
1882 /* it may be a short read due to an object boundary */
1883 req->r_pages = pages;
f24e9980 1884
b7495fc2
SW
1885 dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
1886 off, *plen, req->r_num_pages, page_align);
f24e9980
SW
1887
1888 rc = ceph_osdc_start_request(osdc, req, false);
1889 if (!rc)
1890 rc = ceph_osdc_wait_request(osdc, req);
1891
1892 ceph_osdc_put_request(req);
1893 dout("readpages result %d\n", rc);
1894 return rc;
1895}
3d14c5d2 1896EXPORT_SYMBOL(ceph_osdc_readpages);
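/*
 * Hypothetical caller sketch (not in the original source): pages must
 * already be allocated, page_align is the intra-page offset of 'off',
 * and the page count follows from calc_pages_for(), matching how the
 * request is sized above.  *plen may come back shortened at an object
 * boundary; zero truncate_seq/truncate_size assume no truncation is
 * in flight.
 */
static int example_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
			struct ceph_file_layout *layout,
			u64 off, u64 len, struct page **pages)
{
	int page_align = off & ~PAGE_MASK;
	int num_pages = calc_pages_for(page_align, len);

	return ceph_osdc_readpages(osdc, vino, layout, off, &len,
				   0, 0, pages, num_pages, page_align);
}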
f24e9980
SW
1897
1898/*
1899 * do a synchronous write on N pages
1900 */
1901int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1902 struct ceph_file_layout *layout,
1903 struct ceph_snap_context *snapc,
1904 u64 off, u64 len,
1905 u32 truncate_seq, u64 truncate_size,
1906 struct timespec *mtime,
1907 struct page **pages, int num_pages,
1908 int flags, int do_sync, bool nofail)
1909{
1910 struct ceph_osd_request *req;
1911 int rc = 0;
b7495fc2 1912 int page_align = off & ~PAGE_MASK;
f24e9980
SW
1913
1914 BUG_ON(vino.snap != CEPH_NOSNAP);
1915 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1916 CEPH_OSD_OP_WRITE,
1917 flags | CEPH_OSD_FLAG_ONDISK |
1918 CEPH_OSD_FLAG_WRITE,
1919 snapc, do_sync,
1920 truncate_seq, truncate_size, mtime,
b7495fc2 1921 nofail, 1, page_align);
6816282d
SW
1922 if (IS_ERR(req))
1923 return PTR_ERR(req);
f24e9980
SW
1924
1925 /* it may be a short write due to an object boundary */
1926 req->r_pages = pages;
f24e9980
SW
1927 dout("writepages %llu~%llu (%d pages)\n", off, len,
1928 req->r_num_pages);
1929
1930 rc = ceph_osdc_start_request(osdc, req, nofail);
1931 if (!rc)
1932 rc = ceph_osdc_wait_request(osdc, req);
1933
1934 ceph_osdc_put_request(req);
1935 if (rc == 0)
1936 rc = len;
1937 dout("writepages result %d\n", rc);
1938 return rc;
1939}
3d14c5d2 1940EXPORT_SYMBOL(ceph_osdc_writepages);
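/*
 * Hypothetical caller sketch (not in the original source): a simple
 * synchronous write.  CEPH_OSD_FLAG_ONDISK/WRITE are added by
 * ceph_osdc_writepages() itself, and on success it returns the number
 * of bytes written; the zero truncate_seq/truncate_size, flags and
 * do_sync values are assumptions for the simplest case.
 */
static int example_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len, struct page **pages,
			 struct timespec *mtime)
{
	int num_pages = calc_pages_for(off & ~PAGE_MASK, len);

	return ceph_osdc_writepages(osdc, vino, layout, snapc, off, len,
				    0, 0, mtime, pages, num_pages,
				    0, 0, false);
}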
f24e9980
SW
1941
1942/*
1943 * handle incoming message
1944 */
1945static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1946{
1947 struct ceph_osd *osd = con->private;
32c895e7 1948 struct ceph_osd_client *osdc;
f24e9980
SW
1949 int type = le16_to_cpu(msg->hdr.type);
1950
1951 if (!osd)
4a32f93d 1952 goto out;
32c895e7 1953 osdc = osd->o_osdc;
f24e9980
SW
1954
1955 switch (type) {
1956 case CEPH_MSG_OSD_MAP:
1957 ceph_osdc_handle_map(osdc, msg);
1958 break;
1959 case CEPH_MSG_OSD_OPREPLY:
350b1c32 1960 handle_reply(osdc, msg, con);
f24e9980 1961 break;
a40c4f10
YS
1962 case CEPH_MSG_WATCH_NOTIFY:
1963 handle_watch_notify(osdc, msg);
1964 break;
f24e9980
SW
1965
1966 default:
1967 pr_err("received unknown message type %d %s\n", type,
1968 ceph_msg_type_name(type));
1969 }
4a32f93d 1970out:
f24e9980
SW
1971 ceph_msg_put(msg);
1972}
1973
5b3a4db3 1974/*
21b667f6
SW
1975 * lookup and return message for incoming reply. set up reply message
1976 * pages.
5b3a4db3
SW
1977 */
1978static struct ceph_msg *get_reply(struct ceph_connection *con,
2450418c
YS
1979 struct ceph_msg_header *hdr,
1980 int *skip)
f24e9980
SW
1981{
1982 struct ceph_osd *osd = con->private;
1983 struct ceph_osd_client *osdc = osd->o_osdc;
2450418c 1984 struct ceph_msg *m;
0547a9b3 1985 struct ceph_osd_request *req;
5b3a4db3
SW
1986 int front = le32_to_cpu(hdr->front_len);
1987 int data_len = le32_to_cpu(hdr->data_len);
0547a9b3 1988 u64 tid;
f24e9980 1989
0547a9b3
YS
1990 tid = le64_to_cpu(hdr->tid);
1991 mutex_lock(&osdc->request_mutex);
1992 req = __lookup_request(osdc, tid);
1993 if (!req) {
1994 *skip = 1;
1995 m = NULL;
756a16a5
SW
1996 dout("get_reply unknown tid %llu from osd%d\n", tid,
1997 osd->o_osd);
0547a9b3
YS
1998 goto out;
1999 }
c16e7869
SW
2000
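	/*
	 * If a previous connection instance was still filling this
	 * request's preallocated reply, revoke it first so the message
	 * can be reused for the reply arriving on this connection.
	 */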
2001 if (req->r_con_filling_msg) {
8921d114 2002 dout("%s revoking msg %p from old con %p\n", __func__,
c16e7869 2003 req->r_reply, req->r_con_filling_msg);
8921d114 2004 ceph_msg_revoke_incoming(req->r_reply);
0d47766f 2005 req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
6f46cb29 2006 req->r_con_filling_msg = NULL;
0547a9b3
YS
2007 }
2008
c16e7869
SW
2009 if (front > req->r_reply->front.iov_len) {
2010 pr_warning("get_reply front %d > preallocated %d\n",
2011 front, (int)req->r_reply->front.iov_len);
b61c2763 2012 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
a79832f2 2013 if (!m)
c16e7869
SW
2014 goto out;
2015 ceph_msg_put(req->r_reply);
2016 req->r_reply = m;
2017 }
2018 m = ceph_msg_get(req->r_reply);
2019
0547a9b3 2020 if (data_len > 0) {
b7495fc2 2021 int want = calc_pages_for(req->r_page_alignment, data_len);
21b667f6
SW
2022
2023 if (unlikely(req->r_num_pages < want)) {
9bb0ce2b
SW
2024 pr_warning("tid %lld reply has %d bytes %d pages, we"
2025 " had only %d pages ready\n", tid, data_len,
2026 want, req->r_num_pages);
0547a9b3
YS
2027 *skip = 1;
2028 ceph_msg_put(m);
a79832f2 2029 m = NULL;
21b667f6 2030 goto out;
0547a9b3 2031 }
21b667f6
SW
2032 m->pages = req->r_pages;
2033 m->nr_pages = req->r_num_pages;
c5c6b19d 2034 m->page_alignment = req->r_page_alignment;
68b4476b
YS
2035#ifdef CONFIG_BLOCK
2036 m->bio = req->r_bio;
2037#endif
0547a9b3 2038 }
5b3a4db3 2039 *skip = 0;
0d47766f 2040 req->r_con_filling_msg = con->ops->get(con);
c16e7869 2041 dout("get_reply tid %lld %p\n", tid, m);
0547a9b3
YS
2042
2043out:
2044 mutex_unlock(&osdc->request_mutex);
2450418c 2045 return m;
5b3a4db3
SW
2046
2047}
2048
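/*
 * Allocate (or look up) a message to receive an incoming frame into.
 * Setting *skip asks the messenger to discard the incoming message
 * instead of delivering it.
 */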
2049static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2050 struct ceph_msg_header *hdr,
2051 int *skip)
2052{
2053 struct ceph_osd *osd = con->private;
2054 int type = le16_to_cpu(hdr->type);
2055 int front = le32_to_cpu(hdr->front_len);
2056
1c20f2d2 2057 *skip = 0;
5b3a4db3
SW
2058 switch (type) {
2059 case CEPH_MSG_OSD_MAP:
a40c4f10 2060 case CEPH_MSG_WATCH_NOTIFY:
b61c2763 2061 return ceph_msg_new(type, front, GFP_NOFS, false);
5b3a4db3
SW
2062 case CEPH_MSG_OSD_OPREPLY:
2063 return get_reply(con, hdr, skip);
2064 default:
2065 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
2066 osd->o_osd);
2067 *skip = 1;
2068 return NULL;
2069 }
f24e9980
SW
2070}
2071
2072/*
2073 * Wrappers to refcount containing ceph_osd struct
2074 */
2075static struct ceph_connection *get_osd_con(struct ceph_connection *con)
2076{
2077 struct ceph_osd *osd = con->private;
2078 if (get_osd(osd))
2079 return con;
2080 return NULL;
2081}
2082
2083static void put_osd_con(struct ceph_connection *con)
2084{
2085 struct ceph_osd *osd = con->private;
2086 put_osd(osd);
2087}
2088
4e7a5dcd
SW
2089/*
2090 * authentication
2091 */
a3530df3
AE
2092/*
2093 * Note: returned pointer is the address of a structure that's
2094 * managed separately. Caller must *not* attempt to free it.
2095 */
2096static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
8f43fb53 2097 int *proto, int force_new)
4e7a5dcd
SW
2098{
2099 struct ceph_osd *o = con->private;
2100 struct ceph_osd_client *osdc = o->o_osdc;
2101 struct ceph_auth_client *ac = osdc->client->monc.auth;
74f1869f 2102 struct ceph_auth_handshake *auth = &o->o_auth;
4e7a5dcd 2103
74f1869f 2104 if (force_new && auth->authorizer) {
a255651d
AE
2105 if (ac->ops && ac->ops->destroy_authorizer)
2106 ac->ops->destroy_authorizer(ac, auth->authorizer);
74f1869f
AE
2107 auth->authorizer = NULL;
2108 }
a255651d 2109 if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
a3530df3
AE
2110 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2111 auth);
4e7a5dcd 2112 if (ret)
a3530df3 2113 return ERR_PTR(ret);
4e7a5dcd 2114 }
4e7a5dcd 2115 *proto = ac->protocol;
74f1869f 2116
a3530df3 2117 return auth;
4e7a5dcd
SW
2118}
2119
2120
2121static int verify_authorizer_reply(struct ceph_connection *con, int len)
2122{
2123 struct ceph_osd *o = con->private;
2124 struct ceph_osd_client *osdc = o->o_osdc;
2125 struct ceph_auth_client *ac = osdc->client->monc.auth;
2126
a255651d
AE
2127 /*
2128 * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
2129 * XXX which do we do: succeed or fail?
2130 */
6c4a1915 2131 return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
4e7a5dcd
SW
2132}
2133
9bd2e6f8
SW
2134static int invalidate_authorizer(struct ceph_connection *con)
2135{
2136 struct ceph_osd *o = con->private;
2137 struct ceph_osd_client *osdc = o->o_osdc;
2138 struct ceph_auth_client *ac = osdc->client->monc.auth;
2139
a255651d 2140 if (ac->ops && ac->ops->invalidate_authorizer)
9bd2e6f8
SW
2141 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2142
2143 return ceph_monc_validate_auth(&osdc->client->monc);
2144}
4e7a5dcd 2145
9e32789f 2146static const struct ceph_connection_operations osd_con_ops = {
f24e9980
SW
2147 .get = get_osd_con,
2148 .put = put_osd_con,
2149 .dispatch = dispatch,
4e7a5dcd
SW
2150 .get_authorizer = get_authorizer,
2151 .verify_authorizer_reply = verify_authorizer_reply,
9bd2e6f8 2152 .invalidate_authorizer = invalidate_authorizer,
f24e9980 2153 .alloc_msg = alloc_msg,
81b024e7 2154 .fault = osd_reset,
f24e9980 2155};