2 #include <linux/ceph/ceph_debug.h>
4 #include <linux/module.h>
5 #include <linux/slab.h>
8 #include <linux/ceph/libceph.h>
9 #include <linux/ceph/osdmap.h>
10 #include <linux/ceph/decode.h>
11 #include <linux/crush/hash.h>
12 #include <linux/crush/mapper.h>
14 char *ceph_osdmap_state_str(char *str
, int len
, int state
)
19 if ((state
& CEPH_OSD_EXISTS
) && (state
& CEPH_OSD_UP
))
20 snprintf(str
, len
, "exists, up");
21 else if (state
& CEPH_OSD_EXISTS
)
22 snprintf(str
, len
, "exists");
23 else if (state
& CEPH_OSD_UP
)
24 snprintf(str
, len
, "up");
26 snprintf(str
, len
, "doesn't exist");
/*
 * Number of significant bits in @t, i.e. the smallest b such that
 * t < 2^b.  calc_bits_of(0) == 0.
 */
static int calc_bits_of(unsigned int t)
{
	int b = 0;

	while (t) {
		t = t >> 1;
		b++;
	}
	return b;
}
44 * the foo_mask is the smallest value 2^n-1 that is >= foo.
46 static void calc_pg_masks(struct ceph_pg_pool_info
*pi
)
48 pi
->pg_num_mask
= (1 << calc_bits_of(pi
->pg_num
-1)) - 1;
49 pi
->pgp_num_mask
= (1 << calc_bits_of(pi
->pgp_num
-1)) - 1;
55 static int crush_decode_uniform_bucket(void **p
, void *end
,
56 struct crush_bucket_uniform
*b
)
58 dout("crush_decode_uniform_bucket %p to %p\n", *p
, end
);
59 ceph_decode_need(p
, end
, (1+b
->h
.size
) * sizeof(u32
), bad
);
60 b
->item_weight
= ceph_decode_32(p
);
66 static int crush_decode_list_bucket(void **p
, void *end
,
67 struct crush_bucket_list
*b
)
70 dout("crush_decode_list_bucket %p to %p\n", *p
, end
);
71 b
->item_weights
= kcalloc(b
->h
.size
, sizeof(u32
), GFP_NOFS
);
72 if (b
->item_weights
== NULL
)
74 b
->sum_weights
= kcalloc(b
->h
.size
, sizeof(u32
), GFP_NOFS
);
75 if (b
->sum_weights
== NULL
)
77 ceph_decode_need(p
, end
, 2 * b
->h
.size
* sizeof(u32
), bad
);
78 for (j
= 0; j
< b
->h
.size
; j
++) {
79 b
->item_weights
[j
] = ceph_decode_32(p
);
80 b
->sum_weights
[j
] = ceph_decode_32(p
);
87 static int crush_decode_tree_bucket(void **p
, void *end
,
88 struct crush_bucket_tree
*b
)
91 dout("crush_decode_tree_bucket %p to %p\n", *p
, end
);
92 ceph_decode_8_safe(p
, end
, b
->num_nodes
, bad
);
93 b
->node_weights
= kcalloc(b
->num_nodes
, sizeof(u32
), GFP_NOFS
);
94 if (b
->node_weights
== NULL
)
96 ceph_decode_need(p
, end
, b
->num_nodes
* sizeof(u32
), bad
);
97 for (j
= 0; j
< b
->num_nodes
; j
++)
98 b
->node_weights
[j
] = ceph_decode_32(p
);
104 static int crush_decode_straw_bucket(void **p
, void *end
,
105 struct crush_bucket_straw
*b
)
108 dout("crush_decode_straw_bucket %p to %p\n", *p
, end
);
109 b
->item_weights
= kcalloc(b
->h
.size
, sizeof(u32
), GFP_NOFS
);
110 if (b
->item_weights
== NULL
)
112 b
->straws
= kcalloc(b
->h
.size
, sizeof(u32
), GFP_NOFS
);
113 if (b
->straws
== NULL
)
115 ceph_decode_need(p
, end
, 2 * b
->h
.size
* sizeof(u32
), bad
);
116 for (j
= 0; j
< b
->h
.size
; j
++) {
117 b
->item_weights
[j
] = ceph_decode_32(p
);
118 b
->straws
[j
] = ceph_decode_32(p
);
125 static int crush_decode_straw2_bucket(void **p
, void *end
,
126 struct crush_bucket_straw2
*b
)
129 dout("crush_decode_straw2_bucket %p to %p\n", *p
, end
);
130 b
->item_weights
= kcalloc(b
->h
.size
, sizeof(u32
), GFP_NOFS
);
131 if (b
->item_weights
== NULL
)
133 ceph_decode_need(p
, end
, b
->h
.size
* sizeof(u32
), bad
);
134 for (j
= 0; j
< b
->h
.size
; j
++)
135 b
->item_weights
[j
] = ceph_decode_32(p
);
141 static int skip_name_map(void **p
, void *end
)
144 ceph_decode_32_safe(p
, end
, len
,bad
);
148 ceph_decode_32_safe(p
, end
, strlen
, bad
);
156 static void crush_finalize(struct crush_map
*c
)
160 /* Space for the array of pointers to per-bucket workspace */
161 c
->working_size
= sizeof(struct crush_work
) +
162 c
->max_buckets
* sizeof(struct crush_work_bucket
*);
164 for (b
= 0; b
< c
->max_buckets
; b
++) {
168 switch (c
->buckets
[b
]->alg
) {
171 * The base case, permutation variables and
172 * the pointer to the permutation array.
174 c
->working_size
+= sizeof(struct crush_work_bucket
);
177 /* Every bucket has a permutation array. */
178 c
->working_size
+= c
->buckets
[b
]->size
* sizeof(__u32
);
182 static struct crush_map
*crush_decode(void *pbyval
, void *end
)
188 void *start
= pbyval
;
192 dout("crush_decode %p to %p len %d\n", *p
, end
, (int)(end
- *p
));
194 c
= kzalloc(sizeof(*c
), GFP_NOFS
);
196 return ERR_PTR(-ENOMEM
);
198 /* set tunables to default values */
199 c
->choose_local_tries
= 2;
200 c
->choose_local_fallback_tries
= 5;
201 c
->choose_total_tries
= 19;
202 c
->chooseleaf_descend_once
= 0;
204 ceph_decode_need(p
, end
, 4*sizeof(u32
), bad
);
205 magic
= ceph_decode_32(p
);
206 if (magic
!= CRUSH_MAGIC
) {
207 pr_err("crush_decode magic %x != current %x\n",
208 (unsigned int)magic
, (unsigned int)CRUSH_MAGIC
);
211 c
->max_buckets
= ceph_decode_32(p
);
212 c
->max_rules
= ceph_decode_32(p
);
213 c
->max_devices
= ceph_decode_32(p
);
215 c
->buckets
= kcalloc(c
->max_buckets
, sizeof(*c
->buckets
), GFP_NOFS
);
216 if (c
->buckets
== NULL
)
218 c
->rules
= kcalloc(c
->max_rules
, sizeof(*c
->rules
), GFP_NOFS
);
219 if (c
->rules
== NULL
)
223 for (i
= 0; i
< c
->max_buckets
; i
++) {
226 struct crush_bucket
*b
;
228 ceph_decode_32_safe(p
, end
, alg
, bad
);
230 c
->buckets
[i
] = NULL
;
233 dout("crush_decode bucket %d off %x %p to %p\n",
234 i
, (int)(*p
-start
), *p
, end
);
237 case CRUSH_BUCKET_UNIFORM
:
238 size
= sizeof(struct crush_bucket_uniform
);
240 case CRUSH_BUCKET_LIST
:
241 size
= sizeof(struct crush_bucket_list
);
243 case CRUSH_BUCKET_TREE
:
244 size
= sizeof(struct crush_bucket_tree
);
246 case CRUSH_BUCKET_STRAW
:
247 size
= sizeof(struct crush_bucket_straw
);
249 case CRUSH_BUCKET_STRAW2
:
250 size
= sizeof(struct crush_bucket_straw2
);
257 b
= c
->buckets
[i
] = kzalloc(size
, GFP_NOFS
);
261 ceph_decode_need(p
, end
, 4*sizeof(u32
), bad
);
262 b
->id
= ceph_decode_32(p
);
263 b
->type
= ceph_decode_16(p
);
264 b
->alg
= ceph_decode_8(p
);
265 b
->hash
= ceph_decode_8(p
);
266 b
->weight
= ceph_decode_32(p
);
267 b
->size
= ceph_decode_32(p
);
269 dout("crush_decode bucket size %d off %x %p to %p\n",
270 b
->size
, (int)(*p
-start
), *p
, end
);
272 b
->items
= kcalloc(b
->size
, sizeof(__s32
), GFP_NOFS
);
273 if (b
->items
== NULL
)
276 ceph_decode_need(p
, end
, b
->size
*sizeof(u32
), bad
);
277 for (j
= 0; j
< b
->size
; j
++)
278 b
->items
[j
] = ceph_decode_32(p
);
281 case CRUSH_BUCKET_UNIFORM
:
282 err
= crush_decode_uniform_bucket(p
, end
,
283 (struct crush_bucket_uniform
*)b
);
287 case CRUSH_BUCKET_LIST
:
288 err
= crush_decode_list_bucket(p
, end
,
289 (struct crush_bucket_list
*)b
);
293 case CRUSH_BUCKET_TREE
:
294 err
= crush_decode_tree_bucket(p
, end
,
295 (struct crush_bucket_tree
*)b
);
299 case CRUSH_BUCKET_STRAW
:
300 err
= crush_decode_straw_bucket(p
, end
,
301 (struct crush_bucket_straw
*)b
);
305 case CRUSH_BUCKET_STRAW2
:
306 err
= crush_decode_straw2_bucket(p
, end
,
307 (struct crush_bucket_straw2
*)b
);
315 dout("rule vec is %p\n", c
->rules
);
316 for (i
= 0; i
< c
->max_rules
; i
++) {
318 struct crush_rule
*r
;
320 ceph_decode_32_safe(p
, end
, yes
, bad
);
322 dout("crush_decode NO rule %d off %x %p to %p\n",
323 i
, (int)(*p
-start
), *p
, end
);
328 dout("crush_decode rule %d off %x %p to %p\n",
329 i
, (int)(*p
-start
), *p
, end
);
332 ceph_decode_32_safe(p
, end
, yes
, bad
);
333 #if BITS_PER_LONG == 32
335 if (yes
> (ULONG_MAX
- sizeof(*r
))
336 / sizeof(struct crush_rule_step
))
339 r
= c
->rules
[i
] = kmalloc(sizeof(*r
) +
340 yes
*sizeof(struct crush_rule_step
),
344 dout(" rule %d is at %p\n", i
, r
);
346 ceph_decode_copy_safe(p
, end
, &r
->mask
, 4, bad
); /* 4 u8's */
347 ceph_decode_need(p
, end
, r
->len
*3*sizeof(u32
), bad
);
348 for (j
= 0; j
< r
->len
; j
++) {
349 r
->steps
[j
].op
= ceph_decode_32(p
);
350 r
->steps
[j
].arg1
= ceph_decode_32(p
);
351 r
->steps
[j
].arg2
= ceph_decode_32(p
);
355 /* ignore trailing name maps. */
356 for (num_name_maps
= 0; num_name_maps
< 3; num_name_maps
++) {
357 err
= skip_name_map(p
, end
);
363 ceph_decode_need(p
, end
, 3*sizeof(u32
), done
);
364 c
->choose_local_tries
= ceph_decode_32(p
);
365 c
->choose_local_fallback_tries
= ceph_decode_32(p
);
366 c
->choose_total_tries
= ceph_decode_32(p
);
367 dout("crush decode tunable choose_local_tries = %d\n",
368 c
->choose_local_tries
);
369 dout("crush decode tunable choose_local_fallback_tries = %d\n",
370 c
->choose_local_fallback_tries
);
371 dout("crush decode tunable choose_total_tries = %d\n",
372 c
->choose_total_tries
);
374 ceph_decode_need(p
, end
, sizeof(u32
), done
);
375 c
->chooseleaf_descend_once
= ceph_decode_32(p
);
376 dout("crush decode tunable chooseleaf_descend_once = %d\n",
377 c
->chooseleaf_descend_once
);
379 ceph_decode_need(p
, end
, sizeof(u8
), done
);
380 c
->chooseleaf_vary_r
= ceph_decode_8(p
);
381 dout("crush decode tunable chooseleaf_vary_r = %d\n",
382 c
->chooseleaf_vary_r
);
384 /* skip straw_calc_version, allowed_bucket_algs */
385 ceph_decode_need(p
, end
, sizeof(u8
) + sizeof(u32
), done
);
386 *p
+= sizeof(u8
) + sizeof(u32
);
388 ceph_decode_need(p
, end
, sizeof(u8
), done
);
389 c
->chooseleaf_stable
= ceph_decode_8(p
);
390 dout("crush decode tunable chooseleaf_stable = %d\n",
391 c
->chooseleaf_stable
);
396 dout("crush_decode success\n");
402 dout("crush_decode fail %d\n", err
);
407 int ceph_pg_compare(const struct ceph_pg
*lhs
, const struct ceph_pg
*rhs
)
409 if (lhs
->pool
< rhs
->pool
)
411 if (lhs
->pool
> rhs
->pool
)
413 if (lhs
->seed
< rhs
->seed
)
415 if (lhs
->seed
> rhs
->seed
)
422 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
423 * to a set of osds) and primary_temp (explicit primary setting)
425 static int __insert_pg_mapping(struct ceph_pg_mapping
*new,
426 struct rb_root
*root
)
428 struct rb_node
**p
= &root
->rb_node
;
429 struct rb_node
*parent
= NULL
;
430 struct ceph_pg_mapping
*pg
= NULL
;
433 dout("__insert_pg_mapping %llx %p\n", *(u64
*)&new->pgid
, new);
436 pg
= rb_entry(parent
, struct ceph_pg_mapping
, node
);
437 c
= ceph_pg_compare(&new->pgid
, &pg
->pgid
);
446 rb_link_node(&new->node
, parent
, p
);
447 rb_insert_color(&new->node
, root
);
451 static struct ceph_pg_mapping
*__lookup_pg_mapping(struct rb_root
*root
,
454 struct rb_node
*n
= root
->rb_node
;
455 struct ceph_pg_mapping
*pg
;
459 pg
= rb_entry(n
, struct ceph_pg_mapping
, node
);
460 c
= ceph_pg_compare(&pgid
, &pg
->pgid
);
466 dout("__lookup_pg_mapping %lld.%x got %p\n",
467 pgid
.pool
, pgid
.seed
, pg
);
474 static int __remove_pg_mapping(struct rb_root
*root
, struct ceph_pg pgid
)
476 struct ceph_pg_mapping
*pg
= __lookup_pg_mapping(root
, pgid
);
479 dout("__remove_pg_mapping %lld.%x %p\n", pgid
.pool
, pgid
.seed
,
481 rb_erase(&pg
->node
, root
);
485 dout("__remove_pg_mapping %lld.%x dne\n", pgid
.pool
, pgid
.seed
);
490 * rbtree of pg pool info
492 static int __insert_pg_pool(struct rb_root
*root
, struct ceph_pg_pool_info
*new)
494 struct rb_node
**p
= &root
->rb_node
;
495 struct rb_node
*parent
= NULL
;
496 struct ceph_pg_pool_info
*pi
= NULL
;
500 pi
= rb_entry(parent
, struct ceph_pg_pool_info
, node
);
501 if (new->id
< pi
->id
)
503 else if (new->id
> pi
->id
)
509 rb_link_node(&new->node
, parent
, p
);
510 rb_insert_color(&new->node
, root
);
514 static struct ceph_pg_pool_info
*__lookup_pg_pool(struct rb_root
*root
, u64 id
)
516 struct ceph_pg_pool_info
*pi
;
517 struct rb_node
*n
= root
->rb_node
;
520 pi
= rb_entry(n
, struct ceph_pg_pool_info
, node
);
523 else if (id
> pi
->id
)
531 struct ceph_pg_pool_info
*ceph_pg_pool_by_id(struct ceph_osdmap
*map
, u64 id
)
533 return __lookup_pg_pool(&map
->pg_pools
, id
);
536 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap
*map
, u64 id
)
538 struct ceph_pg_pool_info
*pi
;
540 if (id
== CEPH_NOPOOL
)
543 if (WARN_ON_ONCE(id
> (u64
) INT_MAX
))
546 pi
= __lookup_pg_pool(&map
->pg_pools
, (int) id
);
548 return pi
? pi
->name
: NULL
;
550 EXPORT_SYMBOL(ceph_pg_pool_name_by_id
);
552 int ceph_pg_poolid_by_name(struct ceph_osdmap
*map
, const char *name
)
556 for (rbp
= rb_first(&map
->pg_pools
); rbp
; rbp
= rb_next(rbp
)) {
557 struct ceph_pg_pool_info
*pi
=
558 rb_entry(rbp
, struct ceph_pg_pool_info
, node
);
559 if (pi
->name
&& strcmp(pi
->name
, name
) == 0)
564 EXPORT_SYMBOL(ceph_pg_poolid_by_name
);
566 static void __remove_pg_pool(struct rb_root
*root
, struct ceph_pg_pool_info
*pi
)
568 rb_erase(&pi
->node
, root
);
573 static int decode_pool(void **p
, void *end
, struct ceph_pg_pool_info
*pi
)
579 ceph_decode_need(p
, end
, 2 + 4, bad
);
580 ev
= ceph_decode_8(p
); /* encoding version */
581 cv
= ceph_decode_8(p
); /* compat version */
583 pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev
, cv
);
587 pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev
, cv
);
590 len
= ceph_decode_32(p
);
591 ceph_decode_need(p
, end
, len
, bad
);
594 pi
->type
= ceph_decode_8(p
);
595 pi
->size
= ceph_decode_8(p
);
596 pi
->crush_ruleset
= ceph_decode_8(p
);
597 pi
->object_hash
= ceph_decode_8(p
);
599 pi
->pg_num
= ceph_decode_32(p
);
600 pi
->pgp_num
= ceph_decode_32(p
);
602 *p
+= 4 + 4; /* skip lpg* */
603 *p
+= 4; /* skip last_change */
604 *p
+= 8 + 4; /* skip snap_seq, snap_epoch */
607 num
= ceph_decode_32(p
);
609 *p
+= 8; /* snapid key */
610 *p
+= 1 + 1; /* versions */
611 len
= ceph_decode_32(p
);
615 /* skip removed_snaps */
616 num
= ceph_decode_32(p
);
619 *p
+= 8; /* skip auid */
620 pi
->flags
= ceph_decode_64(p
);
621 *p
+= 4; /* skip crash_replay_interval */
624 pi
->min_size
= ceph_decode_8(p
);
626 pi
->min_size
= pi
->size
- pi
->size
/ 2;
629 *p
+= 8 + 8; /* skip quota_max_* */
633 num
= ceph_decode_32(p
);
636 *p
+= 8; /* skip tier_of */
637 *p
+= 1; /* skip cache_mode */
639 pi
->read_tier
= ceph_decode_64(p
);
640 pi
->write_tier
= ceph_decode_64(p
);
647 /* skip properties */
648 num
= ceph_decode_32(p
);
650 len
= ceph_decode_32(p
);
652 len
= ceph_decode_32(p
);
658 /* skip hit_set_params */
659 *p
+= 1 + 1; /* versions */
660 len
= ceph_decode_32(p
);
663 *p
+= 4; /* skip hit_set_period */
664 *p
+= 4; /* skip hit_set_count */
668 *p
+= 4; /* skip stripe_width */
671 *p
+= 8; /* skip target_max_bytes */
672 *p
+= 8; /* skip target_max_objects */
673 *p
+= 4; /* skip cache_target_dirty_ratio_micro */
674 *p
+= 4; /* skip cache_target_full_ratio_micro */
675 *p
+= 4; /* skip cache_min_flush_age */
676 *p
+= 4; /* skip cache_min_evict_age */
680 /* skip erasure_code_profile */
681 len
= ceph_decode_32(p
);
686 pi
->last_force_request_resend
= ceph_decode_32(p
);
688 pi
->last_force_request_resend
= 0;
690 /* ignore the rest */
700 static int decode_pool_names(void **p
, void *end
, struct ceph_osdmap
*map
)
702 struct ceph_pg_pool_info
*pi
;
706 ceph_decode_32_safe(p
, end
, num
, bad
);
707 dout(" %d pool names\n", num
);
709 ceph_decode_64_safe(p
, end
, pool
, bad
);
710 ceph_decode_32_safe(p
, end
, len
, bad
);
711 dout(" pool %llu len %d\n", pool
, len
);
712 ceph_decode_need(p
, end
, len
, bad
);
713 pi
= __lookup_pg_pool(&map
->pg_pools
, pool
);
715 char *name
= kstrndup(*p
, len
, GFP_NOFS
);
721 dout(" name is %s\n", pi
->name
);
734 struct ceph_osdmap
*ceph_osdmap_alloc(void)
736 struct ceph_osdmap
*map
;
738 map
= kzalloc(sizeof(*map
), GFP_NOIO
);
742 map
->pg_pools
= RB_ROOT
;
744 map
->pg_temp
= RB_ROOT
;
745 map
->primary_temp
= RB_ROOT
;
746 mutex_init(&map
->crush_workspace_mutex
);
751 void ceph_osdmap_destroy(struct ceph_osdmap
*map
)
753 dout("osdmap_destroy %p\n", map
);
755 crush_destroy(map
->crush
);
756 while (!RB_EMPTY_ROOT(&map
->pg_temp
)) {
757 struct ceph_pg_mapping
*pg
=
758 rb_entry(rb_first(&map
->pg_temp
),
759 struct ceph_pg_mapping
, node
);
760 rb_erase(&pg
->node
, &map
->pg_temp
);
763 while (!RB_EMPTY_ROOT(&map
->primary_temp
)) {
764 struct ceph_pg_mapping
*pg
=
765 rb_entry(rb_first(&map
->primary_temp
),
766 struct ceph_pg_mapping
, node
);
767 rb_erase(&pg
->node
, &map
->primary_temp
);
770 while (!RB_EMPTY_ROOT(&map
->pg_pools
)) {
771 struct ceph_pg_pool_info
*pi
=
772 rb_entry(rb_first(&map
->pg_pools
),
773 struct ceph_pg_pool_info
, node
);
774 __remove_pg_pool(&map
->pg_pools
, pi
);
776 kfree(map
->osd_state
);
777 kfree(map
->osd_weight
);
778 kfree(map
->osd_addr
);
779 kfree(map
->osd_primary_affinity
);
780 kfree(map
->crush_workspace
);
785 * Adjust max_osd value, (re)allocate arrays.
787 * The new elements are properly initialized.
789 static int osdmap_set_max_osd(struct ceph_osdmap
*map
, int max
)
793 struct ceph_entity_addr
*addr
;
796 state
= krealloc(map
->osd_state
, max
*sizeof(*state
), GFP_NOFS
);
799 map
->osd_state
= state
;
801 weight
= krealloc(map
->osd_weight
, max
*sizeof(*weight
), GFP_NOFS
);
804 map
->osd_weight
= weight
;
806 addr
= krealloc(map
->osd_addr
, max
*sizeof(*addr
), GFP_NOFS
);
809 map
->osd_addr
= addr
;
811 for (i
= map
->max_osd
; i
< max
; i
++) {
812 map
->osd_state
[i
] = 0;
813 map
->osd_weight
[i
] = CEPH_OSD_OUT
;
814 memset(map
->osd_addr
+ i
, 0, sizeof(*map
->osd_addr
));
817 if (map
->osd_primary_affinity
) {
820 affinity
= krealloc(map
->osd_primary_affinity
,
821 max
*sizeof(*affinity
), GFP_NOFS
);
824 map
->osd_primary_affinity
= affinity
;
826 for (i
= map
->max_osd
; i
< max
; i
++)
827 map
->osd_primary_affinity
[i
] =
828 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY
;
836 static int osdmap_set_crush(struct ceph_osdmap
*map
, struct crush_map
*crush
)
842 return PTR_ERR(crush
);
844 work_size
= crush_work_size(crush
, CEPH_PG_MAX_SIZE
);
845 dout("%s work_size %zu bytes\n", __func__
, work_size
);
846 workspace
= kmalloc(work_size
, GFP_NOIO
);
848 crush_destroy(crush
);
851 crush_init_workspace(crush
, workspace
);
854 crush_destroy(map
->crush
);
855 kfree(map
->crush_workspace
);
857 map
->crush_workspace
= workspace
;
861 #define OSDMAP_WRAPPER_COMPAT_VER 7
862 #define OSDMAP_CLIENT_DATA_COMPAT_VER 1
865 * Return 0 or error. On success, *v is set to 0 for old (v6) osdmaps,
866 * to struct_v of the client_data section for new (v7 and above)
869 static int get_osdmap_client_data_v(void **p
, void *end
,
870 const char *prefix
, u8
*v
)
874 ceph_decode_8_safe(p
, end
, struct_v
, e_inval
);
878 ceph_decode_8_safe(p
, end
, struct_compat
, e_inval
);
879 if (struct_compat
> OSDMAP_WRAPPER_COMPAT_VER
) {
880 pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
881 struct_v
, struct_compat
,
882 OSDMAP_WRAPPER_COMPAT_VER
, prefix
);
885 *p
+= 4; /* ignore wrapper struct_len */
887 ceph_decode_8_safe(p
, end
, struct_v
, e_inval
);
888 ceph_decode_8_safe(p
, end
, struct_compat
, e_inval
);
889 if (struct_compat
> OSDMAP_CLIENT_DATA_COMPAT_VER
) {
890 pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
891 struct_v
, struct_compat
,
892 OSDMAP_CLIENT_DATA_COMPAT_VER
, prefix
);
895 *p
+= 4; /* ignore client data struct_len */
900 ceph_decode_16_safe(p
, end
, version
, e_inval
);
902 pr_warn("got v %d < 6 of %s ceph_osdmap\n",
907 /* old osdmap enconding */
918 static int __decode_pools(void **p
, void *end
, struct ceph_osdmap
*map
,
923 ceph_decode_32_safe(p
, end
, n
, e_inval
);
925 struct ceph_pg_pool_info
*pi
;
929 ceph_decode_64_safe(p
, end
, pool
, e_inval
);
931 pi
= __lookup_pg_pool(&map
->pg_pools
, pool
);
932 if (!incremental
|| !pi
) {
933 pi
= kzalloc(sizeof(*pi
), GFP_NOFS
);
939 ret
= __insert_pg_pool(&map
->pg_pools
, pi
);
946 ret
= decode_pool(p
, end
, pi
);
957 static int decode_pools(void **p
, void *end
, struct ceph_osdmap
*map
)
959 return __decode_pools(p
, end
, map
, false);
962 static int decode_new_pools(void **p
, void *end
, struct ceph_osdmap
*map
)
964 return __decode_pools(p
, end
, map
, true);
967 static int __decode_pg_temp(void **p
, void *end
, struct ceph_osdmap
*map
,
972 ceph_decode_32_safe(p
, end
, n
, e_inval
);
978 ret
= ceph_decode_pgid(p
, end
, &pgid
);
982 ceph_decode_32_safe(p
, end
, len
, e_inval
);
984 ret
= __remove_pg_mapping(&map
->pg_temp
, pgid
);
985 BUG_ON(!incremental
&& ret
!= -ENOENT
);
987 if (!incremental
|| len
> 0) {
988 struct ceph_pg_mapping
*pg
;
990 ceph_decode_need(p
, end
, len
*sizeof(u32
), e_inval
);
992 if (len
> (UINT_MAX
- sizeof(*pg
)) / sizeof(u32
))
995 pg
= kzalloc(sizeof(*pg
) + len
*sizeof(u32
), GFP_NOFS
);
1000 pg
->pg_temp
.len
= len
;
1001 for (i
= 0; i
< len
; i
++)
1002 pg
->pg_temp
.osds
[i
] = ceph_decode_32(p
);
1004 ret
= __insert_pg_mapping(pg
, &map
->pg_temp
);
1018 static int decode_pg_temp(void **p
, void *end
, struct ceph_osdmap
*map
)
1020 return __decode_pg_temp(p
, end
, map
, false);
1023 static int decode_new_pg_temp(void **p
, void *end
, struct ceph_osdmap
*map
)
1025 return __decode_pg_temp(p
, end
, map
, true);
1028 static int __decode_primary_temp(void **p
, void *end
, struct ceph_osdmap
*map
,
1033 ceph_decode_32_safe(p
, end
, n
, e_inval
);
1035 struct ceph_pg pgid
;
1039 ret
= ceph_decode_pgid(p
, end
, &pgid
);
1043 ceph_decode_32_safe(p
, end
, osd
, e_inval
);
1045 ret
= __remove_pg_mapping(&map
->primary_temp
, pgid
);
1046 BUG_ON(!incremental
&& ret
!= -ENOENT
);
1048 if (!incremental
|| osd
!= (u32
)-1) {
1049 struct ceph_pg_mapping
*pg
;
1051 pg
= kzalloc(sizeof(*pg
), GFP_NOFS
);
1056 pg
->primary_temp
.osd
= osd
;
1058 ret
= __insert_pg_mapping(pg
, &map
->primary_temp
);
1072 static int decode_primary_temp(void **p
, void *end
, struct ceph_osdmap
*map
)
1074 return __decode_primary_temp(p
, end
, map
, false);
1077 static int decode_new_primary_temp(void **p
, void *end
,
1078 struct ceph_osdmap
*map
)
1080 return __decode_primary_temp(p
, end
, map
, true);
1083 u32
ceph_get_primary_affinity(struct ceph_osdmap
*map
, int osd
)
1085 BUG_ON(osd
>= map
->max_osd
);
1087 if (!map
->osd_primary_affinity
)
1088 return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY
;
1090 return map
->osd_primary_affinity
[osd
];
1093 static int set_primary_affinity(struct ceph_osdmap
*map
, int osd
, u32 aff
)
1095 BUG_ON(osd
>= map
->max_osd
);
1097 if (!map
->osd_primary_affinity
) {
1100 map
->osd_primary_affinity
= kmalloc(map
->max_osd
*sizeof(u32
),
1102 if (!map
->osd_primary_affinity
)
1105 for (i
= 0; i
< map
->max_osd
; i
++)
1106 map
->osd_primary_affinity
[i
] =
1107 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY
;
1110 map
->osd_primary_affinity
[osd
] = aff
;
1115 static int decode_primary_affinity(void **p
, void *end
,
1116 struct ceph_osdmap
*map
)
1120 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1122 kfree(map
->osd_primary_affinity
);
1123 map
->osd_primary_affinity
= NULL
;
1126 if (len
!= map
->max_osd
)
1129 ceph_decode_need(p
, end
, map
->max_osd
*sizeof(u32
), e_inval
);
1131 for (i
= 0; i
< map
->max_osd
; i
++) {
1134 ret
= set_primary_affinity(map
, i
, ceph_decode_32(p
));
1145 static int decode_new_primary_affinity(void **p
, void *end
,
1146 struct ceph_osdmap
*map
)
1150 ceph_decode_32_safe(p
, end
, n
, e_inval
);
1155 ceph_decode_32_safe(p
, end
, osd
, e_inval
);
1156 ceph_decode_32_safe(p
, end
, aff
, e_inval
);
1158 ret
= set_primary_affinity(map
, osd
, aff
);
1162 pr_info("osd%d primary-affinity 0x%x\n", osd
, aff
);
1172 * decode a full map.
1174 static int osdmap_decode(void **p
, void *end
, struct ceph_osdmap
*map
)
1183 dout("%s %p to %p len %d\n", __func__
, *p
, end
, (int)(end
- *p
));
1185 err
= get_osdmap_client_data_v(p
, end
, "full", &struct_v
);
1189 /* fsid, epoch, created, modified */
1190 ceph_decode_need(p
, end
, sizeof(map
->fsid
) + sizeof(u32
) +
1191 sizeof(map
->created
) + sizeof(map
->modified
), e_inval
);
1192 ceph_decode_copy(p
, &map
->fsid
, sizeof(map
->fsid
));
1193 epoch
= map
->epoch
= ceph_decode_32(p
);
1194 ceph_decode_copy(p
, &map
->created
, sizeof(map
->created
));
1195 ceph_decode_copy(p
, &map
->modified
, sizeof(map
->modified
));
1198 err
= decode_pools(p
, end
, map
);
1203 err
= decode_pool_names(p
, end
, map
);
1207 ceph_decode_32_safe(p
, end
, map
->pool_max
, e_inval
);
1209 ceph_decode_32_safe(p
, end
, map
->flags
, e_inval
);
1212 ceph_decode_32_safe(p
, end
, max
, e_inval
);
1214 /* (re)alloc osd arrays */
1215 err
= osdmap_set_max_osd(map
, max
);
1219 /* osd_state, osd_weight, osd_addrs->client_addr */
1220 ceph_decode_need(p
, end
, 3*sizeof(u32
) +
1221 map
->max_osd
*(1 + sizeof(*map
->osd_weight
) +
1222 sizeof(*map
->osd_addr
)), e_inval
);
1224 if (ceph_decode_32(p
) != map
->max_osd
)
1227 ceph_decode_copy(p
, map
->osd_state
, map
->max_osd
);
1229 if (ceph_decode_32(p
) != map
->max_osd
)
1232 for (i
= 0; i
< map
->max_osd
; i
++)
1233 map
->osd_weight
[i
] = ceph_decode_32(p
);
1235 if (ceph_decode_32(p
) != map
->max_osd
)
1238 ceph_decode_copy(p
, map
->osd_addr
, map
->max_osd
*sizeof(*map
->osd_addr
));
1239 for (i
= 0; i
< map
->max_osd
; i
++)
1240 ceph_decode_addr(&map
->osd_addr
[i
]);
1243 err
= decode_pg_temp(p
, end
, map
);
1248 if (struct_v
>= 1) {
1249 err
= decode_primary_temp(p
, end
, map
);
1254 /* primary_affinity */
1255 if (struct_v
>= 2) {
1256 err
= decode_primary_affinity(p
, end
, map
);
1260 /* XXX can this happen? */
1261 kfree(map
->osd_primary_affinity
);
1262 map
->osd_primary_affinity
= NULL
;
1266 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1267 err
= osdmap_set_crush(map
, crush_decode(*p
, min(*p
+ len
, end
)));
1271 /* ignore the rest */
1274 dout("full osdmap epoch %d max_osd %d\n", map
->epoch
, map
->max_osd
);
1280 pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1281 err
, epoch
, (int)(*p
- start
), *p
, start
, end
);
1282 print_hex_dump(KERN_DEBUG
, "osdmap: ",
1283 DUMP_PREFIX_OFFSET
, 16, 1,
1284 start
, end
- start
, true);
1289 * Allocate and decode a full map.
1291 struct ceph_osdmap
*ceph_osdmap_decode(void **p
, void *end
)
1293 struct ceph_osdmap
*map
;
1296 map
= ceph_osdmap_alloc();
1298 return ERR_PTR(-ENOMEM
);
1300 ret
= osdmap_decode(p
, end
, map
);
1302 ceph_osdmap_destroy(map
);
1303 return ERR_PTR(ret
);
1310 * Encoding order is (new_up_client, new_state, new_weight). Need to
1311 * apply in the (new_weight, new_state, new_up_client) order, because
1312 * an incremental map may look like e.g.
1314 * new_up_client: { osd=6, addr=... } # set osd_state and addr
1315 * new_state: { osd=6, xorstate=EXISTS } # clear osd_state
1317 static int decode_new_up_state_weight(void **p
, void *end
,
1318 struct ceph_osdmap
*map
)
1320 void *new_up_client
;
1322 void *new_weight_end
;
1326 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1327 len
*= sizeof(u32
) + sizeof(struct ceph_entity_addr
);
1328 ceph_decode_need(p
, end
, len
, e_inval
);
1332 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1333 len
*= sizeof(u32
) + sizeof(u8
);
1334 ceph_decode_need(p
, end
, len
, e_inval
);
1338 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1343 ceph_decode_need(p
, end
, 2*sizeof(u32
), e_inval
);
1344 osd
= ceph_decode_32(p
);
1345 w
= ceph_decode_32(p
);
1346 BUG_ON(osd
>= map
->max_osd
);
1347 pr_info("osd%d weight 0x%x %s\n", osd
, w
,
1348 w
== CEPH_OSD_IN
? "(in)" :
1349 (w
== CEPH_OSD_OUT
? "(out)" : ""));
1350 map
->osd_weight
[osd
] = w
;
1353 * If we are marking in, set the EXISTS, and clear the
1354 * AUTOOUT and NEW bits.
1357 map
->osd_state
[osd
] |= CEPH_OSD_EXISTS
;
1358 map
->osd_state
[osd
] &= ~(CEPH_OSD_AUTOOUT
|
1362 new_weight_end
= *p
;
1364 /* new_state (up/down) */
1366 len
= ceph_decode_32(p
);
1372 osd
= ceph_decode_32(p
);
1373 xorstate
= ceph_decode_8(p
);
1375 xorstate
= CEPH_OSD_UP
;
1376 BUG_ON(osd
>= map
->max_osd
);
1377 if ((map
->osd_state
[osd
] & CEPH_OSD_UP
) &&
1378 (xorstate
& CEPH_OSD_UP
))
1379 pr_info("osd%d down\n", osd
);
1380 if ((map
->osd_state
[osd
] & CEPH_OSD_EXISTS
) &&
1381 (xorstate
& CEPH_OSD_EXISTS
)) {
1382 pr_info("osd%d does not exist\n", osd
);
1383 map
->osd_weight
[osd
] = CEPH_OSD_IN
;
1384 ret
= set_primary_affinity(map
, osd
,
1385 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY
);
1388 memset(map
->osd_addr
+ osd
, 0, sizeof(*map
->osd_addr
));
1389 map
->osd_state
[osd
] = 0;
1391 map
->osd_state
[osd
] ^= xorstate
;
1397 len
= ceph_decode_32(p
);
1400 struct ceph_entity_addr addr
;
1402 osd
= ceph_decode_32(p
);
1403 ceph_decode_copy(p
, &addr
, sizeof(addr
));
1404 ceph_decode_addr(&addr
);
1405 BUG_ON(osd
>= map
->max_osd
);
1406 pr_info("osd%d up\n", osd
);
1407 map
->osd_state
[osd
] |= CEPH_OSD_EXISTS
| CEPH_OSD_UP
;
1408 map
->osd_addr
[osd
] = addr
;
1411 *p
= new_weight_end
;
1419 * decode and apply an incremental map update.
1421 struct ceph_osdmap
*osdmap_apply_incremental(void **p
, void *end
,
1422 struct ceph_osdmap
*map
)
1424 struct ceph_fsid fsid
;
1426 struct ceph_timespec modified
;
1430 __s32 new_flags
, max
;
1435 dout("%s %p to %p len %d\n", __func__
, *p
, end
, (int)(end
- *p
));
1437 err
= get_osdmap_client_data_v(p
, end
, "inc", &struct_v
);
1441 /* fsid, epoch, modified, new_pool_max, new_flags */
1442 ceph_decode_need(p
, end
, sizeof(fsid
) + sizeof(u32
) + sizeof(modified
) +
1443 sizeof(u64
) + sizeof(u32
), e_inval
);
1444 ceph_decode_copy(p
, &fsid
, sizeof(fsid
));
1445 epoch
= ceph_decode_32(p
);
1446 BUG_ON(epoch
!= map
->epoch
+1);
1447 ceph_decode_copy(p
, &modified
, sizeof(modified
));
1448 new_pool_max
= ceph_decode_64(p
);
1449 new_flags
= ceph_decode_32(p
);
1452 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1454 dout("apply_incremental full map len %d, %p to %p\n",
1456 return ceph_osdmap_decode(p
, min(*p
+len
, end
));
1460 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1462 err
= osdmap_set_crush(map
,
1463 crush_decode(*p
, min(*p
+ len
, end
)));
1471 map
->flags
= new_flags
;
1472 if (new_pool_max
>= 0)
1473 map
->pool_max
= new_pool_max
;
1476 ceph_decode_32_safe(p
, end
, max
, e_inval
);
1478 err
= osdmap_set_max_osd(map
, max
);
1484 map
->modified
= modified
;
1487 err
= decode_new_pools(p
, end
, map
);
1491 /* new_pool_names */
1492 err
= decode_pool_names(p
, end
, map
);
1497 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1499 struct ceph_pg_pool_info
*pi
;
1501 ceph_decode_64_safe(p
, end
, pool
, e_inval
);
1502 pi
= __lookup_pg_pool(&map
->pg_pools
, pool
);
1504 __remove_pg_pool(&map
->pg_pools
, pi
);
1507 /* new_up_client, new_state, new_weight */
1508 err
= decode_new_up_state_weight(p
, end
, map
);
1513 err
= decode_new_pg_temp(p
, end
, map
);
1517 /* new_primary_temp */
1518 if (struct_v
>= 1) {
1519 err
= decode_new_primary_temp(p
, end
, map
);
1524 /* new_primary_affinity */
1525 if (struct_v
>= 2) {
1526 err
= decode_new_primary_affinity(p
, end
, map
);
1531 /* ignore the rest */
1534 dout("inc osdmap epoch %d max_osd %d\n", map
->epoch
, map
->max_osd
);
1540 pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1541 err
, epoch
, (int)(*p
- start
), *p
, start
, end
);
1542 print_hex_dump(KERN_DEBUG
, "osdmap: ",
1543 DUMP_PREFIX_OFFSET
, 16, 1,
1544 start
, end
- start
, true);
1545 return ERR_PTR(err
);
1548 void ceph_oloc_copy(struct ceph_object_locator
*dest
,
1549 const struct ceph_object_locator
*src
)
1551 WARN_ON(!ceph_oloc_empty(dest
));
1552 WARN_ON(dest
->pool_ns
); /* empty() only covers ->pool */
1554 dest
->pool
= src
->pool
;
1556 dest
->pool_ns
= ceph_get_string(src
->pool_ns
);
1558 EXPORT_SYMBOL(ceph_oloc_copy
);
1560 void ceph_oloc_destroy(struct ceph_object_locator
*oloc
)
1562 ceph_put_string(oloc
->pool_ns
);
1564 EXPORT_SYMBOL(ceph_oloc_destroy
);
1566 void ceph_oid_copy(struct ceph_object_id
*dest
,
1567 const struct ceph_object_id
*src
)
1569 WARN_ON(!ceph_oid_empty(dest
));
1571 if (src
->name
!= src
->inline_name
) {
1572 /* very rare, see ceph_object_id definition */
1573 dest
->name
= kmalloc(src
->name_len
+ 1,
1574 GFP_NOIO
| __GFP_NOFAIL
);
1577 memcpy(dest
->name
, src
->name
, src
->name_len
+ 1);
1578 dest
->name_len
= src
->name_len
;
1580 EXPORT_SYMBOL(ceph_oid_copy
);
1582 static __printf(2, 0)
1583 int oid_printf_vargs(struct ceph_object_id
*oid
, const char *fmt
, va_list ap
)
1587 WARN_ON(!ceph_oid_empty(oid
));
1589 len
= vsnprintf(oid
->inline_name
, sizeof(oid
->inline_name
), fmt
, ap
);
1590 if (len
>= sizeof(oid
->inline_name
))
1593 oid
->name_len
= len
;
1598 * If oid doesn't fit into inline buffer, BUG.
1600 void ceph_oid_printf(struct ceph_object_id
*oid
, const char *fmt
, ...)
1605 BUG_ON(oid_printf_vargs(oid
, fmt
, ap
));
1608 EXPORT_SYMBOL(ceph_oid_printf
);
/*
 * Format an oid, falling back to a heap allocation when the result
 * does not fit into the inline buffer.  The argument list is consumed
 * twice (once by the inline attempt that also measures the length,
 * once to format into the external buffer), hence the va_copy().
 * Returns 0 or -ENOMEM.
 */
static __printf(3, 0)
int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp,
		      const char *fmt, va_list ap)
{
	va_list aq;
	int len;

	va_copy(aq, ap);
	len = oid_printf_vargs(oid, fmt, aq);
	va_end(aq);

	if (len) {
		char *external_name;

		external_name = kmalloc(len + 1, gfp);
		if (!external_name)
			return -ENOMEM;

		oid->name = external_name;
		/* second pass must produce the same length */
		WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len);
		oid->name_len = len;
	}

	return 0;
}
/*
 * If oid doesn't fit into inline buffer, allocate.
 */
int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp,
		     const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = oid_aprintf_vargs(oid, gfp, fmt, ap);
	va_end(ap);

	return ret;
}
EXPORT_SYMBOL(ceph_oid_aprintf);
/*
 * Free an oid's name iff it was heap-allocated (i.e. does not point at
 * the embedded inline buffer).
 */
void ceph_oid_destroy(struct ceph_object_id *oid)
{
	if (oid->name != oid->inline_name)
		kfree(oid->name);
}
EXPORT_SYMBOL(ceph_oid_destroy);
/*
 * Compare two OSD sets by size and contents only; ->primary is
 * deliberately ignored (see osds_equal() for the full comparison).
 */
static bool __osds_equal(const struct ceph_osds *lhs,
			 const struct ceph_osds *rhs)
{
	if (lhs->size == rhs->size &&
	    !memcmp(lhs->osds, rhs->osds, rhs->size * sizeof(rhs->osds[0])))
		return true;

	return false;
}
/*
 * Full OSD set comparison: contents and primary.
 */
static bool osds_equal(const struct ceph_osds *lhs,
		       const struct ceph_osds *rhs)
{
	if (__osds_equal(lhs, rhs) &&
	    lhs->primary == rhs->primary)
		return true;

	return false;
}
/*
 * Sanity-check an OSD set: it must either have a primary, be
 * completely empty, or (for pools whose sets cannot shift) consist
 * entirely of CRUSH_ITEM_NONE slots.
 */
static bool osds_valid(const struct ceph_osds *set)
{
	/* non-empty set */
	if (set->size > 0 && set->primary >= 0)
		return true;

	/* empty can_shift_osds set */
	if (!set->size && set->primary == -1)
		return true;

	/* empty !can_shift_osds set - all NONE */
	if (set->size > 0 && set->primary == -1) {
		int i;

		for (i = 0; i < set->size; i++) {
			if (set->osds[i] != CRUSH_ITEM_NONE)
				return false;
		}
		return true;
	}

	return false;
}
/*
 * Copy an OSD set: contents, size and primary.
 */
void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
{
	memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
	dest->size = src->size;
	dest->primary = src->primary;
}
/*
 * Does growing pg_num from @old_pg_num to @new_pg_num split PG @pgid?
 * Enumerates candidate child seeds (@pgid->seed with a higher bit set)
 * and reports true if any child below @new_pg_num folds back onto
 * @pgid->seed under the old pg_num via ceph_stable_mod().
 */
static bool is_split(const struct ceph_pg *pgid,
		     u32 old_pg_num,
		     u32 new_pg_num)
{
	int old_bits = calc_bits_of(old_pg_num);
	int old_mask = (1 << old_bits) - 1;
	int n;

	WARN_ON(pgid->seed >= old_pg_num);
	if (new_pg_num <= old_pg_num)
		return false;

	for (n = 1; ; n++) {
		int next_bit = n << (old_bits - 1);
		u32 s = next_bit | pgid->seed;

		if (s < old_pg_num || s == pgid->seed)
			continue;
		if (s >= new_pg_num)
			break;

		s = ceph_stable_mod(s, old_pg_num, old_mask);
		if (s == pgid->seed)
			return true;
	}

	return false;
}
/*
 * Has the mapping interval changed?  Any change to the acting or up
 * set, to the pool's size/min_size, a PG split, or a flip of the
 * sort-bitwise flag starts a new interval.
 */
bool ceph_is_new_interval(const struct ceph_osds *old_acting,
			  const struct ceph_osds *new_acting,
			  const struct ceph_osds *old_up,
			  const struct ceph_osds *new_up,
			  int old_size,
			  int new_size,
			  int old_min_size,
			  int new_min_size,
			  u32 old_pg_num,
			  u32 new_pg_num,
			  bool old_sort_bitwise,
			  bool new_sort_bitwise,
			  const struct ceph_pg *pgid)
{
	return !osds_equal(old_acting, new_acting) ||
	       !osds_equal(old_up, new_up) ||
	       old_size != new_size ||
	       old_min_size != new_min_size ||
	       is_split(pgid, old_pg_num, new_pg_num) ||
	       old_sort_bitwise != new_sort_bitwise;
}
/*
 * Return @osd's position (rank) within @acting, or -1 if it is not a
 * member.
 */
static int calc_pg_rank(int osd, const struct ceph_osds *acting)
{
	int i;

	for (i = 0; i < acting->size; i++) {
		if (acting->osds[i] == osd)
			return i;
	}

	return -1;
}
/*
 * Did the acting primary change between two maps?  True on any
 * empty/non-empty transition, a different primary osd, or the same
 * primary appearing at a different rank within the set.
 */
static bool primary_changed(const struct ceph_osds *old_acting,
			    const struct ceph_osds *new_acting)
{
	if (!old_acting->size && !new_acting->size)
		return false;  /* both still empty */

	if (!old_acting->size ^ !new_acting->size)
		return true;  /* was empty, now not, or vice versa */

	if (old_acting->primary != new_acting->primary)
		return true;  /* primary changed */

	if (calc_pg_rank(old_acting->primary, old_acting) !=
	    calc_pg_rank(new_acting->primary, new_acting))
		return true;

	return false;  /* same primary (tho replicas may have changed) */
}
/*
 * Report whether the acting set changed in a way the caller cares
 * about: the primary changed, or, when @any_change is set, the set
 * contents changed at all.
 */
bool ceph_osds_changed(const struct ceph_osds *old_acting,
		       const struct ceph_osds *new_acting,
		       bool any_change)
{
	if (primary_changed(old_acting, new_acting))
		return true;

	if (any_change && !__osds_equal(old_acting, new_acting))
		return true;

	return false;
}
/*
 * calculate file layout from given offset, length.
 * fill in correct oid, logical length, and object extent
 * offset
 *
 * for now, we write only a single su, until we can
 * pass a stride back to the caller.
 *
 * Returns 0 and fills *ono (object number), *oxoff (offset within the
 * object) and *oxlen (extent length within the object); returns
 * -EINVAL (with the outputs zeroed) for an invalid layout.
 */
int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
				  u64 off, u64 len,
				  u64 *ono,
				  u64 *oxoff, u64 *oxlen)
{
	u32 osize = layout->object_size;
	u32 su = layout->stripe_unit;
	u32 sc = layout->stripe_count;
	u32 bl, stripeno, stripepos, objsetno;
	u32 su_per_object;
	u64 t, su_offset;

	dout("mapping %llu~%llu osize %u fl_su %u\n", off, len,
	     osize, su);
	if (su == 0 || sc == 0)
		goto invalid;
	su_per_object = osize / su;
	if (su_per_object == 0)
		goto invalid;
	dout("osize %u / su %u = su_per_object %u\n", osize, su,
	     su_per_object);

	/* stripe unit must be a whole number of pages */
	if ((su & ~PAGE_MASK) != 0)
		goto invalid;

	/* bl = *off / su; */
	t = off;
	do_div(t, su);
	bl = t;
	dout("off %llu / su %u = bl %u\n", off, su, bl);

	stripeno = bl / sc;
	stripepos = bl % sc;
	objsetno = stripeno / su_per_object;

	*ono = objsetno * sc + stripepos;
	dout("objset %u * sc %u = ono %u\n", objsetno, sc, (unsigned int)*ono);

	/* *oxoff = *off % layout->fl_stripe_unit;  # offset in su */
	t = off;
	su_offset = do_div(t, su);
	*oxoff = su_offset + (stripeno % su_per_object) * su;

	/*
	 * Calculate the length of the extent being written to the selected
	 * object. This is the minimum of the full length requested (len) or
	 * the remainder of the current stripe being written to.
	 */
	*oxlen = min_t(u64, len, su - su_offset);

	dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
	return 0;

invalid:
	dout(" invalid layout\n");
	*ono = 0;
	*oxoff = 0;
	*oxlen = 0;
	return -EINVAL;
}
EXPORT_SYMBOL(ceph_calc_file_object_mapping);
/*
 * Map an object into a PG.
 *
 * Should only be called with target_oid and target_oloc (as opposed to
 * base_oid and base_oloc), since tiering isn't taken into account.
 *
 * The raw PG seed is the pool's object hash over the name, or, when a
 * namespace is present, over "<ns>\037<name>" (NOTE(review): the
 * separator byte is not visible in this view — '\037' matches the
 * upstream convention; confirm).  Returns 0, -ENOENT for an unknown
 * pool, or -ENOMEM.
 */
int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
			      struct ceph_object_id *oid,
			      struct ceph_object_locator *oloc,
			      struct ceph_pg *raw_pgid)
{
	struct ceph_pg_pool_info *pi;

	pi = ceph_pg_pool_by_id(osdmap, oloc->pool);
	if (!pi)
		return -ENOENT;

	if (!oloc->pool_ns) {
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
					       oid->name_len);
		dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
		     raw_pgid->pool, raw_pgid->seed);
	} else {
		char stack_buf[256];
		char *buf = stack_buf;
		int nsl = oloc->pool_ns->len;
		size_t total = nsl + 1 + oid->name_len;

		/* fall back to the heap for long ns+name combinations */
		if (total > sizeof(stack_buf)) {
			buf = kmalloc(total, GFP_NOIO);
			if (!buf)
				return -ENOMEM;
		}
		memcpy(buf, oloc->pool_ns->str, nsl);
		buf[nsl] = '\037';
		memcpy(buf + nsl + 1, oid->name, oid->name_len);
		raw_pgid->pool = oloc->pool;
		raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
		if (buf != stack_buf)
			kfree(buf);
		dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
		     oid->name, nsl, oloc->pool_ns->str,
		     raw_pgid->pool, raw_pgid->seed);
	}
	return 0;
}
EXPORT_SYMBOL(ceph_object_locator_to_pg);
/*
 * Map a raw PG (full precision ps) into an actual PG, by folding the
 * seed into the pool's pg_num range with ceph_stable_mod().
 */
static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
			 const struct ceph_pg *raw_pgid,
			 struct ceph_pg *pgid)
{
	pgid->pool = raw_pgid->pool;
	pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
				     pi->pg_num_mask);
}
/*
 * Map a raw PG (full precision ps) into a placement ps (placement
 * seed). Include pool id in that value so that different pools don't
 * use the same seeds.
 */
static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
			 const struct ceph_pg *raw_pgid)
{
	if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
		/* hash pool id and seed so that pool PGs do not overlap */
		return crush_hash32_2(CRUSH_HASH_RJENKINS1,
				      ceph_stable_mod(raw_pgid->seed,
						      pi->pgp_num,
						      pi->pgp_num_mask),
				      raw_pgid->pool);
	} else {
		/*
		 * legacy behavior: add ps and pool together. this is
		 * not a great approach because the PGs from each pool
		 * will overlap on top of each other: 0.5 == 1.4 ==
		 * 2.3.
		 */
		return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
				       pi->pgp_num_mask) +
		       (unsigned)raw_pgid->pool;
	}
}
/*
 * Run a CRUSH rule.  The map's scratch workspace is shared, so the
 * call is serialized by crush_workspace_mutex.  Returns the number of
 * OSDs placed in @result, or a negative error (per crush_do_rule()).
 */
static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
		    int *result, int result_max,
		    const __u32 *weight, int weight_max)
{
	int r;

	/* @result must fit a full-width OSD set */
	BUG_ON(result_max > CEPH_PG_MAX_SIZE);

	mutex_lock(&map->crush_workspace_mutex);
	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
			  weight, weight_max, map->crush_workspace);
	mutex_unlock(&map->crush_workspace_mutex);

	return r;
}
/*
 * Calculate raw set (CRUSH output) for given PG. The result may
 * contain nonexistent OSDs. ->primary is undefined for a raw set.
 *
 * Placement seed (CRUSH input) is returned through @ppps.
 *
 * On any failure (no rule, pool wider than the osds array, CRUSH
 * error) @raw is left empty.
 */
static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   const struct ceph_pg *raw_pgid,
			   struct ceph_osds *raw,
			   u32 *ppps)
{
	u32 pps = raw_pg_to_pps(pi, raw_pgid);
	int ruleno;
	int len;

	ceph_osds_init(raw);
	if (ppps)
		*ppps = pps;

	ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
				 pi->size);
	if (ruleno < 0) {
		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size);
		return;
	}

	if (pi->size > ARRAY_SIZE(raw->osds)) {
		pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size,
		       ARRAY_SIZE(raw->osds));
		return;
	}

	len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
		       osdmap->osd_weight, osdmap->max_osd);
	if (len < 0) {
		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
		       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
		       pi->size);
		return;
	}

	raw->size = len;
}
/*
 * Given raw set, calculate up set and up primary. By definition of an
 * up set, the result won't contain nonexistent or down OSDs.
 *
 * This is done in-place - on return @set is the up set. If it's
 * empty, ->primary will remain undefined.
 */
static void raw_to_up_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   struct ceph_osds *set)
{
	int i;

	/* ->primary is undefined for a raw set */
	BUG_ON(set->primary != -1);

	if (ceph_can_shift_osds(pi)) {
		int removed = 0;

		/* shift left: drop down OSDs, compact the rest */
		for (i = 0; i < set->size; i++) {
			if (ceph_osd_is_down(osdmap, set->osds[i])) {
				removed++;
				continue;
			}
			if (removed)
				set->osds[i - removed] = set->osds[i];
		}
		set->size -= removed;
		if (set->size > 0)
			set->primary = set->osds[0];
	} else {
		/* set down/dne devices to NONE */
		for (i = set->size - 1; i >= 0; i--) {
			if (ceph_osd_is_down(osdmap, set->osds[i]))
				set->osds[i] = CRUSH_ITEM_NONE;
			else
				/* iterating backwards leaves the first up OSD */
				set->primary = set->osds[i];
		}
	}
}
/*
 * Possibly override the up primary based on per-OSD primary-affinity
 * values.  No-op unless some OSD in @up has a non-default affinity.
 */
static void apply_primary_affinity(struct ceph_osdmap *osdmap,
				   struct ceph_pg_pool_info *pi,
				   u32 pps,
				   struct ceph_osds *up)
{
	int i;
	int pos = -1;

	/*
	 * Do we have any non-default primary_affinity values for these
	 * osds?
	 */
	if (!osdmap->osd_primary_affinity)
		return;

	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];

		if (osd != CRUSH_ITEM_NONE &&
		    osdmap->osd_primary_affinity[osd] !=
					CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
			break;
		}
	}
	if (i == up->size)
		return;

	/*
	 * Pick the primary. Feed both the seed (for the pg) and the
	 * osd into the hash/rng so that a proportional fraction of an
	 * osd's pgs get rejected as primary.
	 */
	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];
		u32 aff;

		if (osd == CRUSH_ITEM_NONE)
			continue;

		aff = osdmap->osd_primary_affinity[osd];
		if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
		    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
				    pps, osd) >> 16) >= aff) {
			/*
			 * We chose not to use this primary. Note it
			 * anyway as a fallback in case we don't pick
			 * anyone else, but keep looking.
			 */
			if (pos < 0)
				pos = i;
		} else {
			pos = i;
			break;
		}
	}
	if (pos < 0)
		return;

	up->primary = up->osds[pos];

	if (ceph_can_shift_osds(pi) && pos > 0) {
		/* move the new primary to the front */
		for (i = pos; i > 0; i--)
			up->osds[i] = up->osds[i - 1];
		up->osds[0] = up->primary;
	}
}
/*
 * Get pg_temp and primary_temp mappings for given PG.
 *
 * Note that a PG may have none, only pg_temp, only primary_temp or
 * both pg_temp and primary_temp mappings. This means @temp isn't
 * always a valid OSD set on return: in the "only primary_temp" case,
 * @temp will have its ->primary >= 0 but ->size == 0.
 */
static void get_temp_osds(struct ceph_osdmap *osdmap,
			  struct ceph_pg_pool_info *pi,
			  const struct ceph_pg *raw_pgid,
			  struct ceph_osds *temp)
{
	struct ceph_pg pgid;
	struct ceph_pg_mapping *pg;
	int i;

	raw_pg_to_pg(pi, raw_pgid, &pgid);
	ceph_osds_init(temp);

	/* pg_temp? */
	pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
	if (pg) {
		for (i = 0; i < pg->pg_temp.len; i++) {
			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
				/* shiftable sets simply drop down OSDs */
				if (ceph_can_shift_osds(pi))
					continue;

				temp->osds[temp->size++] = CRUSH_ITEM_NONE;
			} else {
				temp->osds[temp->size++] = pg->pg_temp.osds[i];
			}
		}

		/* apply pg_temp's primary */
		for (i = 0; i < temp->size; i++) {
			if (temp->osds[i] != CRUSH_ITEM_NONE) {
				temp->primary = temp->osds[i];
				break;
			}
		}
	}

	/* primary_temp? */
	pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
	if (pg)
		temp->primary = pg->primary_temp.osd;
}
/*
 * Map a PG to its acting set as well as its up set.
 *
 * Acting set is used for data mapping purposes, while up set can be
 * recorded for detecting interval changes and deciding whether to
 * resend a request.
 */
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
			       const struct ceph_pg *raw_pgid,
			       struct ceph_osds *up,
			       struct ceph_osds *acting)
{
	struct ceph_pg_pool_info *pi;
	u32 pps;

	pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
	if (!pi) {
		/* unknown pool: both sets come back empty */
		ceph_osds_init(up);
		ceph_osds_init(acting);
		goto out;
	}

	pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
	raw_to_up_osds(osdmap, pi, up);
	apply_primary_affinity(osdmap, pi, pps, up);
	get_temp_osds(osdmap, pi, raw_pgid, acting);
	if (!acting->size) {
		/* no pg_temp override - acting is the up set */
		memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
		acting->size = up->size;
		if (acting->primary == -1)
			acting->primary = up->primary;
	}
out:
	WARN_ON(!osds_valid(up) || !osds_valid(acting));
}
/*
 * Return acting primary for given PG, or -1 if none.
 */
int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
			      const struct ceph_pg *raw_pgid)
{
	struct ceph_osds up, acting;

	ceph_pg_to_up_acting_osds(osdmap, raw_pgid, &up, &acting);
	return acting.primary;
}
EXPORT_SYMBOL(ceph_pg_to_acting_primary);