#include "ceph_debug.h"

#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>

#include "super.h"
#include "osd_client.h"
#include "messenger.h"
#include "decode.h"
#include "auth.h"

#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

static const struct ceph_connection_operations osd_con_ops;
static int __kick_requests(struct ceph_osd_client *osdc,
			  struct ceph_osd *kickosd);

static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
			struct ceph_file_layout *layout,
			u64 snapid,
			u64 off, u64 *plen, u64 *bno,
			struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_osd_op *op = (void *)(reqhead + 1);
	u64 orig_len = *plen;
	u64 objoff, objlen;    /* extent in object */

	reqhead->snapid = cpu_to_le64(snapid);

	/* object extent?  pass *plen by reference so the caller sees the
	 * extent shortened at an object boundary. */
	ceph_calc_file_object_mapping(layout, off, plen, bno,
				      &objoff, &objlen);
	if (*plen < orig_len)
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);

	op->extent.offset = cpu_to_le64(objoff);
	op->extent.length = cpu_to_le64(objlen);
	req->r_num_pages = calc_pages_for(off, *plen);

	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
	     *bno, objoff, objlen, req->r_num_pages);
}

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */
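
/*
 * Typical request lifecycle, as a rough sketch (hypothetical caller;
 * error handling omitted, and everything other than the ceph_osdc_*
 * calls is made up for illustration):
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, ...);
 *	rc = ceph_osdc_start_request(osdc, req, false);
 *	if (!rc)
 *		rc = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 *
 * ceph_osdc_readpages() and ceph_osdc_writepages() below follow
 * exactly this pattern.
 */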

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static void calc_layout(struct ceph_osd_client *osdc,
			struct ceph_vino vino,
			struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			struct ceph_osd_request *req)
{
	u64 bno;

	ceph_calc_raw_layout(osdc, layout, vino.snap, off, plen, &bno, req);

	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
	req->r_oid_len = strlen(req->r_oid);
}
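
/*
 * Worked example (hypothetical numbers, assuming a simple layout with
 * 4 MB objects and no striping): a request for off=6 MB, *plen=4 MB
 * maps into object bno=1 at objoff=2 MB.  Only 2 MB remain in that
 * object, so objlen=2 MB and *plen is shortened to 2 MB; the caller
 * must issue the remaining 2 MB as a separate request.  For ino 0x1234
 * the resulting oid would be "1234.00000001".
 */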

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_con_filling_msg) {
		dout("release_request revoking pages %p from con %p\n",
		     req->r_pages, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg,
					req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
	}
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
	ceph_put_snap_context(req->r_snapc);
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       int flags,
					       struct ceph_snap_context *snapc,
					       int do_sync,
					       bool use_mempool,
					       gfp_t gfp_flags,
					       struct page **pages)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	int num_op = 1 + do_sync;
	size_t msg_size = sizeof(struct ceph_osd_request_head) +
			  num_op*sizeof(struct ceph_osd_op);
	if (use_mempool) {
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
		memset(req, 0, sizeof(*req));
	} else {
		req = kzalloc(sizeof(*req), gfp_flags);
	}
	if (req == NULL)
		return NULL;

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	req->r_flags = flags;

	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
				   OSD_OPREPLY_FRONT_LEN, gfp_flags);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	req->r_reply = msg;

	/* create request message; allow space for oid */
	msg_size += 40;
	if (snapc)
		msg_size += sizeof(u64) * snapc->num_snaps;
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
	memset(msg->front.iov_base, 0, msg->front.iov_len);

	req->r_request = msg;
	req->r_pages = pages;

	return req;
}
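
/*
 * For a feel of the front sizing above (symbolic, no byte counts
 * assumed): a write with do_sync set and a snap context holding two
 * snaps reserves
 *
 *	msg_size = sizeof(struct ceph_osd_request_head)
 *		 + 2 * sizeof(struct ceph_osd_op)	(write + startsync)
 *		 + 40					(oid)
 *		 + 2 * sizeof(u64)			(snap ids)
 *
 * ceph_osdc_build_request() then trims front.iov_len down to the
 * bytes actually encoded.
 */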

/*
 * build new request AND message
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, u64 *plen,
			     int opcode,
			     struct ceph_snap_context *snapc,
			     int do_sync,
			     u32 truncate_seq,
			     u64 truncate_size,
			     struct timespec *mtime,
			     const char *oid,
			     int oid_len)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_request_head *head;
	struct ceph_osd_op *op;
	void *p;
	int num_op = 1 + do_sync;
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int i;
	int flags = req->r_flags;

	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now. */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);
	op->op = cpu_to_le16(opcode);

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(*plen);
		op->payload_len = cpu_to_le32(*plen);
	}
	op->extent.truncate_size = cpu_to_le64(truncate_size);
	op->extent.truncate_seq = cpu_to_le32(truncate_seq);

	/* fill in oid */
	head->object_len = cpu_to_le32(oid_len);
	memcpy(p, oid, oid_len);
	p += oid_len;

	if (do_sync) {
		op++;
		op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
	}
	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
	return;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       int do_sync,
					       u32 truncate_seq,
					       u64 truncate_size,
					       struct timespec *mtime,
					       bool use_mempool, int num_reply)
{
	struct ceph_osd_request *req =
		ceph_osdc_alloc_request(osdc, flags,
					snapc, do_sync,
					use_mempool,
					GFP_NOFS, NULL);
	if (!req)	/* alloc_request returns NULL, not ERR_PTR */
		return NULL;

	/* calculate max write size */
	calc_layout(osdc, vino, layout, off, plen, req);
	req->r_file_layout = *layout;  /* keep a copy */

	ceph_osdc_build_request(req, off, plen, opcode,
				snapc, do_sync,
				truncate_seq, truncate_size,
				mtime,
				req->r_oid, req->r_oid_len);

	return req;
}
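
/*
 * The alloc/build split above also lets the two steps be driven
 * separately when the target object is already known.  A minimal
 * sketch (hypothetical caller; layout, off, len, bno, oid and oid_len
 * are made up, and errors are not handled):
 *
 *	req = ceph_osdc_alloc_request(osdc, CEPH_OSD_FLAG_READ, NULL, 0,
 *				      false, GFP_NOFS, NULL);
 *	ceph_calc_raw_layout(osdc, layout, CEPH_NOSNAP, off, &len,
 *			     &bno, req);
 *	ceph_osdc_build_request(req, off, &len, CEPH_OSD_OP_READ,
 *				NULL, 0, 0, 0, NULL, oid, oid_len);
 *
 * ceph_osdc_new_request() is the convenience wrapper that performs
 * the file-to-object mapping in between.
 */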

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *new)
{
	struct rb_node **p = &osdc->requests.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_osd_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
						 u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}


/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	kick_requests(osdc, osd);
	up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(osdc->client->msgr, &osd->o_con);
	osd->o_con.private = osd;
	osd->o_con.ops = &osd_con_ops;
	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		if (osd->o_authorizer)
			ac->ops->destroy_authorizer(ac, osd->o_authorizer);
		kfree(osd);
	}
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
{
	struct ceph_osd *osd, *nosd;

	dout("__remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (!remove_all && time_before(jiffies, osd->lru_ttl))
			break;
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_osd_request *req;
	int ret = 0;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests)) {
		__remove_osd(osdc, osd);
	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
			  &osd->o_con.peer_addr,
			  sizeof(osd->o_con.peer_addr)) == 0 &&
		   !ceph_con_opened(&osd->o_con)) {
		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;
		ret = -EAGAIN;
	} else {
		ceph_con_close(&osd->o_con);
		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
		osd->o_incarnation++;
	}
	return ret;
}

static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
	struct rb_node **p = &osdc->osds.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd *osd = NULL;

	while (*p) {
		parent = *p;
		osd = rb_entry(parent, struct ceph_osd, o_node);
		if (new->o_osd < osd->o_osd)
			p = &(*p)->rb_left;
		else if (new->o_osd > osd->o_osd)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->o_node, parent, p);
	rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
	struct ceph_osd *osd;
	struct rb_node *n = osdc->osds.rb_node;

	while (n) {
		osd = rb_entry(n, struct ceph_osd, o_node);
		if (o < osd->o_osd)
			n = n->rb_left;
		else if (o > osd->o_osd)
			n = n->rb_right;
		else
			return osd;
	}
	return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->mount_args->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	INIT_LIST_HEAD(&req->r_req_lru_item);

	dout("register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;

	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests))
			__move_osd_to_lru(osdc, req->r_osd);
		req->r_osd = NULL;
	}

	ceph_osdc_put_request(req);

	list_del_init(&req->r_req_lru_item);
	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
		req->r_sent = 0;
	}
	list_del_init(&req->r_req_lru_item);
}

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_osds(struct ceph_osd_client *osdc,
		      struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_osds %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
				      &req->r_file_layout, osdc->osdmap);
	if (err)
		return err;
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	if ((req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc);
		if (!req->r_osd)
			goto out;

		dout("map_osds osd %p is osd%d\n", req->r_osd, o);
		req->r_osd->o_osd = o;
		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static int __send_request(struct ceph_osd_client *osdc,
			  struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;
	int err;

	err = __map_osds(osdc, req);
	if (err < 0)
		return err;
	if (req->r_osd == NULL) {
		dout("send_request %p no up osds in pg\n", req);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
		return 0;
	}

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
	return 0;
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests who have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req, *last_req = NULL;
	struct ceph_osd *osd;
	unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
	unsigned long keepalive =
		osdc->client->mount_args->osd_keepalive_timeout * HZ;
	unsigned long last_stamp = 0;
	struct rb_node *p;
	struct list_head slow_osds;

	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);
	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			int err;

			dout("osdc resending prev failed %lld\n", req->r_tid);
			err = __send_request(osdc, req);
			if (err)
				dout("osdc failed again on %lld\n", req->r_tid);
			else
				req->r_resend = false;
			continue;
		}
	}

	/*
	 * reset osds that appear to be _really_ unresponsive.  this
	 * is a failsafe measure.. we really shouldn't be getting to
	 * this point if the system is working properly.  the monitors
	 * should mark the osd as failed and we should find out about
	 * it from an updated osd map.
	 */
	while (timeout && !list_empty(&osdc->req_lru)) {
		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
				 r_req_lru_item);

		if (time_before(jiffies, req->r_stamp + timeout))
			break;

		BUG_ON(req == last_req && req->r_stamp == last_stamp);
		last_req = req;
		last_stamp = req->r_stamp;

		osd = req->r_osd;
		BUG_ON(!osd);
		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
			   req->r_tid, osd->o_osd);
		__kick_requests(osdc, osd);
	}

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	mutex_unlock(&osdc->request_mutex);

	up_read(&osdc->map_sem);
}
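
/*
 * Concrete illustration (hypothetical mount options, not defaults we
 * assert here): with osd_timeout=60 and osd_keepalive_timeout=5, a
 * request untouched for more than 5s gets a keepalive ping on its
 * osd's connection, while one untouched for more than 60s triggers
 * __kick_requests() on that osd, closing and reopening the connection
 * and resending with CEPH_OSD_FLAG_RETRY set.
 */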

static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->mount_args->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc, 0);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
	struct ceph_osd_request *req;
	u64 tid;
	int numops, object_len, flags;
	s32 result;

	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*rhead))
		goto bad;
	numops = le32_to_cpu(rhead->num_ops);
	object_len = le32_to_cpu(rhead->object_len);
	result = le32_to_cpu(rhead->result);
	if (msg->front.iov_len != sizeof(*rhead) + object_len +
	    numops * sizeof(struct ceph_osd_op))
		goto bad;
	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);

	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		mutex_unlock(&osdc->request_mutex);
		return;
	}
	ceph_osdc_get_request(req);
	flags = le32_to_cpu(rhead->flags);

	/*
	 * if this connection filled our message, drop our reference now, to
	 * avoid a (safe but slower) revoke later.
	 */
	if (req->r_con_filling_msg == con && req->r_reply == msg) {
		dout(" dropping con_filling_msg ref %p\n", con);
		req->r_con_filling_msg = NULL;
		ceph_con_put(con);
	}

	if (!req->r_got_reply) {
		unsigned bytes;

		req->r_result = le32_to_cpu(rhead->result);
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version = rhead->reassert_version;

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK) {
		if (req->r_safe_callback)
			req->r_safe_callback(req, msg);
		complete_all(&req->r_safe_completion);  /* fsync waiter */
	}

done:
	ceph_osdc_put_request(req);
	return;

bad:
	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
	       (int)sizeof(*rhead));
	ceph_msg_dump(msg);
}


static int __kick_requests(struct ceph_osd_client *osdc,
			  struct ceph_osd *kickosd)
{
	struct ceph_osd_request *req;
	struct rb_node *p, *n;
	int needmap = 0;
	int err;

	dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
	if (kickosd) {
		err = __reset_osd(osdc, kickosd);
		if (err == -EAGAIN)
			return 1;
	} else {
		for (p = rb_first(&osdc->osds); p; p = n) {
			struct ceph_osd *osd =
				rb_entry(p, struct ceph_osd, o_node);

			n = rb_next(p);
			if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
			    memcmp(&osd->o_con.peer_addr,
				   ceph_osd_addr(osdc->osdmap,
						 osd->o_osd),
				   sizeof(struct ceph_entity_addr)) != 0)
				__reset_osd(osdc, osd);
		}
	}

	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			dout(" r_resend set on tid %llu\n", req->r_tid);
			__cancel_request(req);
			goto kick;
		}
		if (req->r_osd && kickosd == req->r_osd) {
			__cancel_request(req);
			goto kick;
		}

		err = __map_osds(osdc, req);
		if (err == 0)
			continue;	/* no change */
		if (err < 0) {
			/*
			 * FIXME: really, we should set the request
			 * error and fail if this isn't a 'nofail'
			 * request, but that's a fair bit more
			 * complicated to do.  So retry!
			 */
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
			continue;
		}
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;  /* request a newer map */
			continue;
		}

kick:
		dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		req->r_flags |= CEPH_OSD_FLAG_RETRY;
		err = __send_request(osdc, req);
		if (err) {
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
		}
	}

	return needmap;
}

/*
 * Resubmit osd requests whose osd or osd address has changed.  Request
 * a new osd map if osds are down, or we are otherwise unable to determine
 * how to direct a request.
 *
 * Close connections to down osds.
 *
 * If @kickosd is specified, resubmit requests for that specific osd.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static void kick_requests(struct ceph_osd_client *osdc,
			  struct ceph_osd *kickosd)
{
	int needmap;

	mutex_lock(&osdc->request_mutex);
	needmap = __kick_requests(osdc, kickosd);
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
}

/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
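/*
 * On the wire (as decoded below), the front of an OSDMAP message is:
 *
 *	struct ceph_fsid fsid;
 *	u32 nr_inc_maps;	followed by nr_inc_maps of:
 *		u32 epoch, u32 len, incremental map of 'len' bytes
 *	u32 nr_full_maps;	followed by nr_full_maps of:
 *		u32 epoch, u32 len, full map of 'len' bytes
 *
 * Incrementals are applied in order if they extend our current epoch;
 * otherwise only the last (newest) full map is decoded and taken.
 */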
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;

	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap)
				ceph_osdmap_destroy(oldmap);
		}
		p += maplen;
		nr_maps--;
	}

done:
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
	if (newmap)
		kick_requests(osdc, NULL);
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
	return;
}

/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * a racing kick_requests() may have sent the message for us
	 * while we dropped request_mutex above, so only send now if
	 * the request still hasn't been touched yet.
	 */
	if (req->r_sent == 0) {
		rc = __send_request(osdc, req);
		if (rc) {
			if (nofail) {
				dout("osdc_start_request failed send, "
				     " marking %lld\n", req->r_tid);
				req->r_resend = true;
				rc = 0;
			} else {
				__unregister_request(osdc, req);
			}
		}
	}
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
}

/*
 * wait for a request to complete
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	int rc;

	rc = wait_for_completion_interruptible(&req->r_completion);
	if (rc < 0) {
		mutex_lock(&osdc->request_mutex);
		__cancel_request(req);
		__unregister_request(osdc, req);
		mutex_unlock(&osdc->request_mutex);
		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
		return rc;
	}

	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
	return req->r_result;
}

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}

/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	INIT_LIST_HEAD(&osdc->req_lru);
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);

	schedule_delayed_work(&osdc->osds_timeout_work,
	    round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));

	err = -ENOMEM;
	osdc->req_mempool = mempool_create_kmalloc_pool(10,
					sizeof(struct ceph_osd_request));
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
				"osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
				OSD_OPREPLY_FRONT_LEN, 10, true,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;
	return 0;

out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}

void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	remove_old_osds(osdc, 1);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, truncate_seq, truncate_size, NULL,
				    false, 1);
	if (!req)
		return -ENOMEM;

	/* it may be a short read due to an object boundary */
	req->r_pages = pages;

	dout("readpages final extent is %llu~%llu (%d pages)\n",
	     off, *plen, req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
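
/*
 * Hypothetical readpages call, just to make the contract concrete
 * (sizing the page vector via calc_pages_for() is the caller's job):
 *
 *	u64 len = PAGE_SIZE * n;
 *	rc = ceph_osdc_readpages(osdc, vino, &ci->i_layout, off, &len,
 *				 truncate_seq, truncate_size, pages, n);
 *
 * On return, len may have been shortened at an object boundary and
 * rc is the number of bytes read (or a negative error).
 */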

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages,
			 int flags, int do_sync, bool nofail)
{
	struct ceph_osd_request *req;
	int rc = 0;

	BUG_ON(vino.snap != CEPH_NOSNAP);
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
				    CEPH_OSD_OP_WRITE,
				    flags | CEPH_OSD_FLAG_ONDISK |
					    CEPH_OSD_FLAG_WRITE,
				    snapc, do_sync,
				    truncate_seq, truncate_size, mtime,
				    nofail, 1);
	if (!req)
		return -ENOMEM;

	/* it may be a short write due to an object boundary */
	req->r_pages = pages;
	dout("writepages %llu~%llu (%d pages)\n", off, len,
	     req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, nofail);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int type = le16_to_cpu(msg->hdr.type);

	if (!osd)
		goto out;
	osdc = osd->o_osdc;

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osdc, msg, con);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
			osd->o_osd);
		goto out;
	}

	if (req->r_con_filling_msg) {
		dout("get_reply revoking msg %p from old con %p\n",
		     req->r_reply, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
		req->r_con_filling_msg = NULL;
	}

	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		unsigned data_off = le16_to_cpu(hdr->data_off);
		int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);

		if (unlikely(req->r_num_pages < want)) {
			pr_warning("tid %lld reply %d > expected %d pages\n",
				   tid, want, m->nr_pages);
			*skip = 1;
			ceph_msg_put(m);
			m = NULL;
			goto out;
		}
		m->pages = req->r_pages;
		m->nr_pages = req->r_num_pages;
	}
	*skip = 0;
	req->r_con_filling_msg = ceph_con_get(con);
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;
}

static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		return ceph_msg_new(type, front, GFP_NOFS);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */
static int get_authorizer(struct ceph_connection *con,
			  void **buf, int *len, int *proto,
			  void **reply_buf, int *reply_len, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	int ret = 0;

	if (force_new && o->o_authorizer) {
		ac->ops->destroy_authorizer(ac, o->o_authorizer);
		o->o_authorizer = NULL;
	}
	if (o->o_authorizer == NULL) {
		ret = ac->ops->create_authorizer(
			ac, CEPH_ENTITY_TYPE_OSD,
			&o->o_authorizer,
			&o->o_authorizer_buf,
			&o->o_authorizer_buf_len,
			&o->o_authorizer_reply_buf,
			&o->o_authorizer_reply_buf_len);
		if (ret)
			return ret;
	}

	*proto = ac->protocol;
	*buf = o->o_authorizer_buf;
	*len = o->o_authorizer_buf_len;
	*reply_buf = o->o_authorizer_reply_buf;
	*reply_len = o->o_authorizer_reply_buf_len;
	return 0;
}


static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	if (ac->ops->invalidate_authorizer)
		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);

	return ceph_monc_validate_auth(&osdc->client->monc);
}

static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,
};