2 #include <linux/ceph/ceph_debug.h>
4 #include <linux/module.h>
5 #include <linux/slab.h>
8 #include <linux/ceph/libceph.h>
9 #include <linux/ceph/osdmap.h>
10 #include <linux/ceph/decode.h>
11 #include <linux/crush/hash.h>
12 #include <linux/crush/mapper.h>
14 char *ceph_osdmap_state_str(char *str
, int len
, int state
)
19 if ((state
& CEPH_OSD_EXISTS
) && (state
& CEPH_OSD_UP
))
20 snprintf(str
, len
, "exists, up");
21 else if (state
& CEPH_OSD_EXISTS
)
22 snprintf(str
, len
, "exists");
23 else if (state
& CEPH_OSD_UP
)
24 snprintf(str
, len
, "up");
26 snprintf(str
, len
, "doesn't exist");
/*
 * Number of significant bits in @t, i.e. the smallest n such that
 * t < 2^n.  calc_bits_of(0) == 0.
 */
static int calc_bits_of(unsigned int t)
{
	int b = 0;

	while (t) {
		t = t >> 1;
		b++;
	}
	return b;
}
44 * the foo_mask is the smallest value 2^n-1 that is >= foo.
46 static void calc_pg_masks(struct ceph_pg_pool_info
*pi
)
48 pi
->pg_num_mask
= (1 << calc_bits_of(pi
->pg_num
-1)) - 1;
49 pi
->pgp_num_mask
= (1 << calc_bits_of(pi
->pgp_num
-1)) - 1;
55 static int crush_decode_uniform_bucket(void **p
, void *end
,
56 struct crush_bucket_uniform
*b
)
58 dout("crush_decode_uniform_bucket %p to %p\n", *p
, end
);
59 ceph_decode_need(p
, end
, (1+b
->h
.size
) * sizeof(u32
), bad
);
60 b
->item_weight
= ceph_decode_32(p
);
66 static int crush_decode_list_bucket(void **p
, void *end
,
67 struct crush_bucket_list
*b
)
70 dout("crush_decode_list_bucket %p to %p\n", *p
, end
);
71 b
->item_weights
= kcalloc(b
->h
.size
, sizeof(u32
), GFP_NOFS
);
72 if (b
->item_weights
== NULL
)
74 b
->sum_weights
= kcalloc(b
->h
.size
, sizeof(u32
), GFP_NOFS
);
75 if (b
->sum_weights
== NULL
)
77 ceph_decode_need(p
, end
, 2 * b
->h
.size
* sizeof(u32
), bad
);
78 for (j
= 0; j
< b
->h
.size
; j
++) {
79 b
->item_weights
[j
] = ceph_decode_32(p
);
80 b
->sum_weights
[j
] = ceph_decode_32(p
);
87 static int crush_decode_tree_bucket(void **p
, void *end
,
88 struct crush_bucket_tree
*b
)
91 dout("crush_decode_tree_bucket %p to %p\n", *p
, end
);
92 ceph_decode_8_safe(p
, end
, b
->num_nodes
, bad
);
93 b
->node_weights
= kcalloc(b
->num_nodes
, sizeof(u32
), GFP_NOFS
);
94 if (b
->node_weights
== NULL
)
96 ceph_decode_need(p
, end
, b
->num_nodes
* sizeof(u32
), bad
);
97 for (j
= 0; j
< b
->num_nodes
; j
++)
98 b
->node_weights
[j
] = ceph_decode_32(p
);
104 static int crush_decode_straw_bucket(void **p
, void *end
,
105 struct crush_bucket_straw
*b
)
108 dout("crush_decode_straw_bucket %p to %p\n", *p
, end
);
109 b
->item_weights
= kcalloc(b
->h
.size
, sizeof(u32
), GFP_NOFS
);
110 if (b
->item_weights
== NULL
)
112 b
->straws
= kcalloc(b
->h
.size
, sizeof(u32
), GFP_NOFS
);
113 if (b
->straws
== NULL
)
115 ceph_decode_need(p
, end
, 2 * b
->h
.size
* sizeof(u32
), bad
);
116 for (j
= 0; j
< b
->h
.size
; j
++) {
117 b
->item_weights
[j
] = ceph_decode_32(p
);
118 b
->straws
[j
] = ceph_decode_32(p
);
125 static int crush_decode_straw2_bucket(void **p
, void *end
,
126 struct crush_bucket_straw2
*b
)
129 dout("crush_decode_straw2_bucket %p to %p\n", *p
, end
);
130 b
->item_weights
= kcalloc(b
->h
.size
, sizeof(u32
), GFP_NOFS
);
131 if (b
->item_weights
== NULL
)
133 ceph_decode_need(p
, end
, b
->h
.size
* sizeof(u32
), bad
);
134 for (j
= 0; j
< b
->h
.size
; j
++)
135 b
->item_weights
[j
] = ceph_decode_32(p
);
141 static int skip_name_map(void **p
, void *end
)
144 ceph_decode_32_safe(p
, end
, len
,bad
);
148 ceph_decode_32_safe(p
, end
, strlen
, bad
);
156 static struct crush_map
*crush_decode(void *pbyval
, void *end
)
162 void *start
= pbyval
;
166 dout("crush_decode %p to %p len %d\n", *p
, end
, (int)(end
- *p
));
168 c
= kzalloc(sizeof(*c
), GFP_NOFS
);
170 return ERR_PTR(-ENOMEM
);
172 /* set tunables to default values */
173 c
->choose_local_tries
= 2;
174 c
->choose_local_fallback_tries
= 5;
175 c
->choose_total_tries
= 19;
176 c
->chooseleaf_descend_once
= 0;
178 ceph_decode_need(p
, end
, 4*sizeof(u32
), bad
);
179 magic
= ceph_decode_32(p
);
180 if (magic
!= CRUSH_MAGIC
) {
181 pr_err("crush_decode magic %x != current %x\n",
182 (unsigned int)magic
, (unsigned int)CRUSH_MAGIC
);
185 c
->max_buckets
= ceph_decode_32(p
);
186 c
->max_rules
= ceph_decode_32(p
);
187 c
->max_devices
= ceph_decode_32(p
);
189 c
->buckets
= kcalloc(c
->max_buckets
, sizeof(*c
->buckets
), GFP_NOFS
);
190 if (c
->buckets
== NULL
)
192 c
->rules
= kcalloc(c
->max_rules
, sizeof(*c
->rules
), GFP_NOFS
);
193 if (c
->rules
== NULL
)
197 for (i
= 0; i
< c
->max_buckets
; i
++) {
200 struct crush_bucket
*b
;
202 ceph_decode_32_safe(p
, end
, alg
, bad
);
204 c
->buckets
[i
] = NULL
;
207 dout("crush_decode bucket %d off %x %p to %p\n",
208 i
, (int)(*p
-start
), *p
, end
);
211 case CRUSH_BUCKET_UNIFORM
:
212 size
= sizeof(struct crush_bucket_uniform
);
214 case CRUSH_BUCKET_LIST
:
215 size
= sizeof(struct crush_bucket_list
);
217 case CRUSH_BUCKET_TREE
:
218 size
= sizeof(struct crush_bucket_tree
);
220 case CRUSH_BUCKET_STRAW
:
221 size
= sizeof(struct crush_bucket_straw
);
223 case CRUSH_BUCKET_STRAW2
:
224 size
= sizeof(struct crush_bucket_straw2
);
231 b
= c
->buckets
[i
] = kzalloc(size
, GFP_NOFS
);
235 ceph_decode_need(p
, end
, 4*sizeof(u32
), bad
);
236 b
->id
= ceph_decode_32(p
);
237 b
->type
= ceph_decode_16(p
);
238 b
->alg
= ceph_decode_8(p
);
239 b
->hash
= ceph_decode_8(p
);
240 b
->weight
= ceph_decode_32(p
);
241 b
->size
= ceph_decode_32(p
);
243 dout("crush_decode bucket size %d off %x %p to %p\n",
244 b
->size
, (int)(*p
-start
), *p
, end
);
246 b
->items
= kcalloc(b
->size
, sizeof(__s32
), GFP_NOFS
);
247 if (b
->items
== NULL
)
249 b
->perm
= kcalloc(b
->size
, sizeof(u32
), GFP_NOFS
);
254 ceph_decode_need(p
, end
, b
->size
*sizeof(u32
), bad
);
255 for (j
= 0; j
< b
->size
; j
++)
256 b
->items
[j
] = ceph_decode_32(p
);
259 case CRUSH_BUCKET_UNIFORM
:
260 err
= crush_decode_uniform_bucket(p
, end
,
261 (struct crush_bucket_uniform
*)b
);
265 case CRUSH_BUCKET_LIST
:
266 err
= crush_decode_list_bucket(p
, end
,
267 (struct crush_bucket_list
*)b
);
271 case CRUSH_BUCKET_TREE
:
272 err
= crush_decode_tree_bucket(p
, end
,
273 (struct crush_bucket_tree
*)b
);
277 case CRUSH_BUCKET_STRAW
:
278 err
= crush_decode_straw_bucket(p
, end
,
279 (struct crush_bucket_straw
*)b
);
283 case CRUSH_BUCKET_STRAW2
:
284 err
= crush_decode_straw2_bucket(p
, end
,
285 (struct crush_bucket_straw2
*)b
);
293 dout("rule vec is %p\n", c
->rules
);
294 for (i
= 0; i
< c
->max_rules
; i
++) {
296 struct crush_rule
*r
;
298 ceph_decode_32_safe(p
, end
, yes
, bad
);
300 dout("crush_decode NO rule %d off %x %p to %p\n",
301 i
, (int)(*p
-start
), *p
, end
);
306 dout("crush_decode rule %d off %x %p to %p\n",
307 i
, (int)(*p
-start
), *p
, end
);
310 ceph_decode_32_safe(p
, end
, yes
, bad
);
311 #if BITS_PER_LONG == 32
313 if (yes
> (ULONG_MAX
- sizeof(*r
))
314 / sizeof(struct crush_rule_step
))
317 r
= c
->rules
[i
] = kmalloc(sizeof(*r
) +
318 yes
*sizeof(struct crush_rule_step
),
322 dout(" rule %d is at %p\n", i
, r
);
324 ceph_decode_copy_safe(p
, end
, &r
->mask
, 4, bad
); /* 4 u8's */
325 ceph_decode_need(p
, end
, r
->len
*3*sizeof(u32
), bad
);
326 for (j
= 0; j
< r
->len
; j
++) {
327 r
->steps
[j
].op
= ceph_decode_32(p
);
328 r
->steps
[j
].arg1
= ceph_decode_32(p
);
329 r
->steps
[j
].arg2
= ceph_decode_32(p
);
333 /* ignore trailing name maps. */
334 for (num_name_maps
= 0; num_name_maps
< 3; num_name_maps
++) {
335 err
= skip_name_map(p
, end
);
341 ceph_decode_need(p
, end
, 3*sizeof(u32
), done
);
342 c
->choose_local_tries
= ceph_decode_32(p
);
343 c
->choose_local_fallback_tries
= ceph_decode_32(p
);
344 c
->choose_total_tries
= ceph_decode_32(p
);
345 dout("crush decode tunable choose_local_tries = %d\n",
346 c
->choose_local_tries
);
347 dout("crush decode tunable choose_local_fallback_tries = %d\n",
348 c
->choose_local_fallback_tries
);
349 dout("crush decode tunable choose_total_tries = %d\n",
350 c
->choose_total_tries
);
352 ceph_decode_need(p
, end
, sizeof(u32
), done
);
353 c
->chooseleaf_descend_once
= ceph_decode_32(p
);
354 dout("crush decode tunable chooseleaf_descend_once = %d\n",
355 c
->chooseleaf_descend_once
);
357 ceph_decode_need(p
, end
, sizeof(u8
), done
);
358 c
->chooseleaf_vary_r
= ceph_decode_8(p
);
359 dout("crush decode tunable chooseleaf_vary_r = %d\n",
360 c
->chooseleaf_vary_r
);
362 /* skip straw_calc_version, allowed_bucket_algs */
363 ceph_decode_need(p
, end
, sizeof(u8
) + sizeof(u32
), done
);
364 *p
+= sizeof(u8
) + sizeof(u32
);
366 ceph_decode_need(p
, end
, sizeof(u8
), done
);
367 c
->chooseleaf_stable
= ceph_decode_8(p
);
368 dout("crush decode tunable chooseleaf_stable = %d\n",
369 c
->chooseleaf_stable
);
372 dout("crush_decode success\n");
378 dout("crush_decode fail %d\n", err
);
384 * rbtree of pg_mapping for handling pg_temp (explicit mapping of pgid
385 * to a set of osds) and primary_temp (explicit primary setting)
387 static int pgid_cmp(struct ceph_pg l
, struct ceph_pg r
)
400 static int __insert_pg_mapping(struct ceph_pg_mapping
*new,
401 struct rb_root
*root
)
403 struct rb_node
**p
= &root
->rb_node
;
404 struct rb_node
*parent
= NULL
;
405 struct ceph_pg_mapping
*pg
= NULL
;
408 dout("__insert_pg_mapping %llx %p\n", *(u64
*)&new->pgid
, new);
411 pg
= rb_entry(parent
, struct ceph_pg_mapping
, node
);
412 c
= pgid_cmp(new->pgid
, pg
->pgid
);
421 rb_link_node(&new->node
, parent
, p
);
422 rb_insert_color(&new->node
, root
);
426 static struct ceph_pg_mapping
*__lookup_pg_mapping(struct rb_root
*root
,
429 struct rb_node
*n
= root
->rb_node
;
430 struct ceph_pg_mapping
*pg
;
434 pg
= rb_entry(n
, struct ceph_pg_mapping
, node
);
435 c
= pgid_cmp(pgid
, pg
->pgid
);
441 dout("__lookup_pg_mapping %lld.%x got %p\n",
442 pgid
.pool
, pgid
.seed
, pg
);
449 static int __remove_pg_mapping(struct rb_root
*root
, struct ceph_pg pgid
)
451 struct ceph_pg_mapping
*pg
= __lookup_pg_mapping(root
, pgid
);
454 dout("__remove_pg_mapping %lld.%x %p\n", pgid
.pool
, pgid
.seed
,
456 rb_erase(&pg
->node
, root
);
460 dout("__remove_pg_mapping %lld.%x dne\n", pgid
.pool
, pgid
.seed
);
465 * rbtree of pg pool info
467 static int __insert_pg_pool(struct rb_root
*root
, struct ceph_pg_pool_info
*new)
469 struct rb_node
**p
= &root
->rb_node
;
470 struct rb_node
*parent
= NULL
;
471 struct ceph_pg_pool_info
*pi
= NULL
;
475 pi
= rb_entry(parent
, struct ceph_pg_pool_info
, node
);
476 if (new->id
< pi
->id
)
478 else if (new->id
> pi
->id
)
484 rb_link_node(&new->node
, parent
, p
);
485 rb_insert_color(&new->node
, root
);
489 static struct ceph_pg_pool_info
*__lookup_pg_pool(struct rb_root
*root
, u64 id
)
491 struct ceph_pg_pool_info
*pi
;
492 struct rb_node
*n
= root
->rb_node
;
495 pi
= rb_entry(n
, struct ceph_pg_pool_info
, node
);
498 else if (id
> pi
->id
)
506 struct ceph_pg_pool_info
*ceph_pg_pool_by_id(struct ceph_osdmap
*map
, u64 id
)
508 return __lookup_pg_pool(&map
->pg_pools
, id
);
511 const char *ceph_pg_pool_name_by_id(struct ceph_osdmap
*map
, u64 id
)
513 struct ceph_pg_pool_info
*pi
;
515 if (id
== CEPH_NOPOOL
)
518 if (WARN_ON_ONCE(id
> (u64
) INT_MAX
))
521 pi
= __lookup_pg_pool(&map
->pg_pools
, (int) id
);
523 return pi
? pi
->name
: NULL
;
525 EXPORT_SYMBOL(ceph_pg_pool_name_by_id
);
527 int ceph_pg_poolid_by_name(struct ceph_osdmap
*map
, const char *name
)
531 for (rbp
= rb_first(&map
->pg_pools
); rbp
; rbp
= rb_next(rbp
)) {
532 struct ceph_pg_pool_info
*pi
=
533 rb_entry(rbp
, struct ceph_pg_pool_info
, node
);
534 if (pi
->name
&& strcmp(pi
->name
, name
) == 0)
539 EXPORT_SYMBOL(ceph_pg_poolid_by_name
);
541 static void __remove_pg_pool(struct rb_root
*root
, struct ceph_pg_pool_info
*pi
)
543 rb_erase(&pi
->node
, root
);
548 static int decode_pool(void **p
, void *end
, struct ceph_pg_pool_info
*pi
)
554 ceph_decode_need(p
, end
, 2 + 4, bad
);
555 ev
= ceph_decode_8(p
); /* encoding version */
556 cv
= ceph_decode_8(p
); /* compat version */
558 pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev
, cv
);
562 pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev
, cv
);
565 len
= ceph_decode_32(p
);
566 ceph_decode_need(p
, end
, len
, bad
);
569 pi
->type
= ceph_decode_8(p
);
570 pi
->size
= ceph_decode_8(p
);
571 pi
->crush_ruleset
= ceph_decode_8(p
);
572 pi
->object_hash
= ceph_decode_8(p
);
574 pi
->pg_num
= ceph_decode_32(p
);
575 pi
->pgp_num
= ceph_decode_32(p
);
577 *p
+= 4 + 4; /* skip lpg* */
578 *p
+= 4; /* skip last_change */
579 *p
+= 8 + 4; /* skip snap_seq, snap_epoch */
582 num
= ceph_decode_32(p
);
584 *p
+= 8; /* snapid key */
585 *p
+= 1 + 1; /* versions */
586 len
= ceph_decode_32(p
);
590 /* skip removed_snaps */
591 num
= ceph_decode_32(p
);
594 *p
+= 8; /* skip auid */
595 pi
->flags
= ceph_decode_64(p
);
596 *p
+= 4; /* skip crash_replay_interval */
599 *p
+= 1; /* skip min_size */
602 *p
+= 8 + 8; /* skip quota_max_* */
606 num
= ceph_decode_32(p
);
609 *p
+= 8; /* skip tier_of */
610 *p
+= 1; /* skip cache_mode */
612 pi
->read_tier
= ceph_decode_64(p
);
613 pi
->write_tier
= ceph_decode_64(p
);
619 /* ignore the rest */
629 static int decode_pool_names(void **p
, void *end
, struct ceph_osdmap
*map
)
631 struct ceph_pg_pool_info
*pi
;
635 ceph_decode_32_safe(p
, end
, num
, bad
);
636 dout(" %d pool names\n", num
);
638 ceph_decode_64_safe(p
, end
, pool
, bad
);
639 ceph_decode_32_safe(p
, end
, len
, bad
);
640 dout(" pool %llu len %d\n", pool
, len
);
641 ceph_decode_need(p
, end
, len
, bad
);
642 pi
= __lookup_pg_pool(&map
->pg_pools
, pool
);
644 char *name
= kstrndup(*p
, len
, GFP_NOFS
);
650 dout(" name is %s\n", pi
->name
);
663 void ceph_osdmap_destroy(struct ceph_osdmap
*map
)
665 dout("osdmap_destroy %p\n", map
);
667 crush_destroy(map
->crush
);
668 while (!RB_EMPTY_ROOT(&map
->pg_temp
)) {
669 struct ceph_pg_mapping
*pg
=
670 rb_entry(rb_first(&map
->pg_temp
),
671 struct ceph_pg_mapping
, node
);
672 rb_erase(&pg
->node
, &map
->pg_temp
);
675 while (!RB_EMPTY_ROOT(&map
->primary_temp
)) {
676 struct ceph_pg_mapping
*pg
=
677 rb_entry(rb_first(&map
->primary_temp
),
678 struct ceph_pg_mapping
, node
);
679 rb_erase(&pg
->node
, &map
->primary_temp
);
682 while (!RB_EMPTY_ROOT(&map
->pg_pools
)) {
683 struct ceph_pg_pool_info
*pi
=
684 rb_entry(rb_first(&map
->pg_pools
),
685 struct ceph_pg_pool_info
, node
);
686 __remove_pg_pool(&map
->pg_pools
, pi
);
688 kfree(map
->osd_state
);
689 kfree(map
->osd_weight
);
690 kfree(map
->osd_addr
);
691 kfree(map
->osd_primary_affinity
);
696 * Adjust max_osd value, (re)allocate arrays.
698 * The new elements are properly initialized.
700 static int osdmap_set_max_osd(struct ceph_osdmap
*map
, int max
)
704 struct ceph_entity_addr
*addr
;
707 state
= krealloc(map
->osd_state
, max
*sizeof(*state
), GFP_NOFS
);
710 map
->osd_state
= state
;
712 weight
= krealloc(map
->osd_weight
, max
*sizeof(*weight
), GFP_NOFS
);
715 map
->osd_weight
= weight
;
717 addr
= krealloc(map
->osd_addr
, max
*sizeof(*addr
), GFP_NOFS
);
720 map
->osd_addr
= addr
;
722 for (i
= map
->max_osd
; i
< max
; i
++) {
723 map
->osd_state
[i
] = 0;
724 map
->osd_weight
[i
] = CEPH_OSD_OUT
;
725 memset(map
->osd_addr
+ i
, 0, sizeof(*map
->osd_addr
));
728 if (map
->osd_primary_affinity
) {
731 affinity
= krealloc(map
->osd_primary_affinity
,
732 max
*sizeof(*affinity
), GFP_NOFS
);
735 map
->osd_primary_affinity
= affinity
;
737 for (i
= map
->max_osd
; i
< max
; i
++)
738 map
->osd_primary_affinity
[i
] =
739 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY
;
747 #define OSDMAP_WRAPPER_COMPAT_VER 7
748 #define OSDMAP_CLIENT_DATA_COMPAT_VER 1
751 * Return 0 or error. On success, *v is set to 0 for old (v6) osdmaps,
752 * to struct_v of the client_data section for new (v7 and above)
755 static int get_osdmap_client_data_v(void **p
, void *end
,
756 const char *prefix
, u8
*v
)
760 ceph_decode_8_safe(p
, end
, struct_v
, e_inval
);
764 ceph_decode_8_safe(p
, end
, struct_compat
, e_inval
);
765 if (struct_compat
> OSDMAP_WRAPPER_COMPAT_VER
) {
766 pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
767 struct_v
, struct_compat
,
768 OSDMAP_WRAPPER_COMPAT_VER
, prefix
);
771 *p
+= 4; /* ignore wrapper struct_len */
773 ceph_decode_8_safe(p
, end
, struct_v
, e_inval
);
774 ceph_decode_8_safe(p
, end
, struct_compat
, e_inval
);
775 if (struct_compat
> OSDMAP_CLIENT_DATA_COMPAT_VER
) {
776 pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
777 struct_v
, struct_compat
,
778 OSDMAP_CLIENT_DATA_COMPAT_VER
, prefix
);
781 *p
+= 4; /* ignore client data struct_len */
786 ceph_decode_16_safe(p
, end
, version
, e_inval
);
788 pr_warn("got v %d < 6 of %s ceph_osdmap\n",
793 /* old osdmap enconding */
804 static int __decode_pools(void **p
, void *end
, struct ceph_osdmap
*map
,
809 ceph_decode_32_safe(p
, end
, n
, e_inval
);
811 struct ceph_pg_pool_info
*pi
;
815 ceph_decode_64_safe(p
, end
, pool
, e_inval
);
817 pi
= __lookup_pg_pool(&map
->pg_pools
, pool
);
818 if (!incremental
|| !pi
) {
819 pi
= kzalloc(sizeof(*pi
), GFP_NOFS
);
825 ret
= __insert_pg_pool(&map
->pg_pools
, pi
);
832 ret
= decode_pool(p
, end
, pi
);
843 static int decode_pools(void **p
, void *end
, struct ceph_osdmap
*map
)
845 return __decode_pools(p
, end
, map
, false);
848 static int decode_new_pools(void **p
, void *end
, struct ceph_osdmap
*map
)
850 return __decode_pools(p
, end
, map
, true);
853 static int __decode_pg_temp(void **p
, void *end
, struct ceph_osdmap
*map
,
858 ceph_decode_32_safe(p
, end
, n
, e_inval
);
864 ret
= ceph_decode_pgid(p
, end
, &pgid
);
868 ceph_decode_32_safe(p
, end
, len
, e_inval
);
870 ret
= __remove_pg_mapping(&map
->pg_temp
, pgid
);
871 BUG_ON(!incremental
&& ret
!= -ENOENT
);
873 if (!incremental
|| len
> 0) {
874 struct ceph_pg_mapping
*pg
;
876 ceph_decode_need(p
, end
, len
*sizeof(u32
), e_inval
);
878 if (len
> (UINT_MAX
- sizeof(*pg
)) / sizeof(u32
))
881 pg
= kzalloc(sizeof(*pg
) + len
*sizeof(u32
), GFP_NOFS
);
886 pg
->pg_temp
.len
= len
;
887 for (i
= 0; i
< len
; i
++)
888 pg
->pg_temp
.osds
[i
] = ceph_decode_32(p
);
890 ret
= __insert_pg_mapping(pg
, &map
->pg_temp
);
904 static int decode_pg_temp(void **p
, void *end
, struct ceph_osdmap
*map
)
906 return __decode_pg_temp(p
, end
, map
, false);
909 static int decode_new_pg_temp(void **p
, void *end
, struct ceph_osdmap
*map
)
911 return __decode_pg_temp(p
, end
, map
, true);
914 static int __decode_primary_temp(void **p
, void *end
, struct ceph_osdmap
*map
,
919 ceph_decode_32_safe(p
, end
, n
, e_inval
);
925 ret
= ceph_decode_pgid(p
, end
, &pgid
);
929 ceph_decode_32_safe(p
, end
, osd
, e_inval
);
931 ret
= __remove_pg_mapping(&map
->primary_temp
, pgid
);
932 BUG_ON(!incremental
&& ret
!= -ENOENT
);
934 if (!incremental
|| osd
!= (u32
)-1) {
935 struct ceph_pg_mapping
*pg
;
937 pg
= kzalloc(sizeof(*pg
), GFP_NOFS
);
942 pg
->primary_temp
.osd
= osd
;
944 ret
= __insert_pg_mapping(pg
, &map
->primary_temp
);
958 static int decode_primary_temp(void **p
, void *end
, struct ceph_osdmap
*map
)
960 return __decode_primary_temp(p
, end
, map
, false);
963 static int decode_new_primary_temp(void **p
, void *end
,
964 struct ceph_osdmap
*map
)
966 return __decode_primary_temp(p
, end
, map
, true);
969 u32
ceph_get_primary_affinity(struct ceph_osdmap
*map
, int osd
)
971 BUG_ON(osd
>= map
->max_osd
);
973 if (!map
->osd_primary_affinity
)
974 return CEPH_OSD_DEFAULT_PRIMARY_AFFINITY
;
976 return map
->osd_primary_affinity
[osd
];
979 static int set_primary_affinity(struct ceph_osdmap
*map
, int osd
, u32 aff
)
981 BUG_ON(osd
>= map
->max_osd
);
983 if (!map
->osd_primary_affinity
) {
986 map
->osd_primary_affinity
= kmalloc(map
->max_osd
*sizeof(u32
),
988 if (!map
->osd_primary_affinity
)
991 for (i
= 0; i
< map
->max_osd
; i
++)
992 map
->osd_primary_affinity
[i
] =
993 CEPH_OSD_DEFAULT_PRIMARY_AFFINITY
;
996 map
->osd_primary_affinity
[osd
] = aff
;
1001 static int decode_primary_affinity(void **p
, void *end
,
1002 struct ceph_osdmap
*map
)
1006 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1008 kfree(map
->osd_primary_affinity
);
1009 map
->osd_primary_affinity
= NULL
;
1012 if (len
!= map
->max_osd
)
1015 ceph_decode_need(p
, end
, map
->max_osd
*sizeof(u32
), e_inval
);
1017 for (i
= 0; i
< map
->max_osd
; i
++) {
1020 ret
= set_primary_affinity(map
, i
, ceph_decode_32(p
));
1031 static int decode_new_primary_affinity(void **p
, void *end
,
1032 struct ceph_osdmap
*map
)
1036 ceph_decode_32_safe(p
, end
, n
, e_inval
);
1041 ceph_decode_32_safe(p
, end
, osd
, e_inval
);
1042 ceph_decode_32_safe(p
, end
, aff
, e_inval
);
1044 ret
= set_primary_affinity(map
, osd
, aff
);
1048 pr_info("osd%d primary-affinity 0x%x\n", osd
, aff
);
1058 * decode a full map.
1060 static int osdmap_decode(void **p
, void *end
, struct ceph_osdmap
*map
)
1069 dout("%s %p to %p len %d\n", __func__
, *p
, end
, (int)(end
- *p
));
1071 err
= get_osdmap_client_data_v(p
, end
, "full", &struct_v
);
1075 /* fsid, epoch, created, modified */
1076 ceph_decode_need(p
, end
, sizeof(map
->fsid
) + sizeof(u32
) +
1077 sizeof(map
->created
) + sizeof(map
->modified
), e_inval
);
1078 ceph_decode_copy(p
, &map
->fsid
, sizeof(map
->fsid
));
1079 epoch
= map
->epoch
= ceph_decode_32(p
);
1080 ceph_decode_copy(p
, &map
->created
, sizeof(map
->created
));
1081 ceph_decode_copy(p
, &map
->modified
, sizeof(map
->modified
));
1084 err
= decode_pools(p
, end
, map
);
1089 err
= decode_pool_names(p
, end
, map
);
1093 ceph_decode_32_safe(p
, end
, map
->pool_max
, e_inval
);
1095 ceph_decode_32_safe(p
, end
, map
->flags
, e_inval
);
1098 ceph_decode_32_safe(p
, end
, max
, e_inval
);
1100 /* (re)alloc osd arrays */
1101 err
= osdmap_set_max_osd(map
, max
);
1105 /* osd_state, osd_weight, osd_addrs->client_addr */
1106 ceph_decode_need(p
, end
, 3*sizeof(u32
) +
1107 map
->max_osd
*(1 + sizeof(*map
->osd_weight
) +
1108 sizeof(*map
->osd_addr
)), e_inval
);
1110 if (ceph_decode_32(p
) != map
->max_osd
)
1113 ceph_decode_copy(p
, map
->osd_state
, map
->max_osd
);
1115 if (ceph_decode_32(p
) != map
->max_osd
)
1118 for (i
= 0; i
< map
->max_osd
; i
++)
1119 map
->osd_weight
[i
] = ceph_decode_32(p
);
1121 if (ceph_decode_32(p
) != map
->max_osd
)
1124 ceph_decode_copy(p
, map
->osd_addr
, map
->max_osd
*sizeof(*map
->osd_addr
));
1125 for (i
= 0; i
< map
->max_osd
; i
++)
1126 ceph_decode_addr(&map
->osd_addr
[i
]);
1129 err
= decode_pg_temp(p
, end
, map
);
1134 if (struct_v
>= 1) {
1135 err
= decode_primary_temp(p
, end
, map
);
1140 /* primary_affinity */
1141 if (struct_v
>= 2) {
1142 err
= decode_primary_affinity(p
, end
, map
);
1146 /* XXX can this happen? */
1147 kfree(map
->osd_primary_affinity
);
1148 map
->osd_primary_affinity
= NULL
;
1152 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1153 map
->crush
= crush_decode(*p
, min(*p
+ len
, end
));
1154 if (IS_ERR(map
->crush
)) {
1155 err
= PTR_ERR(map
->crush
);
1161 /* ignore the rest */
1164 dout("full osdmap epoch %d max_osd %d\n", map
->epoch
, map
->max_osd
);
1170 pr_err("corrupt full osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1171 err
, epoch
, (int)(*p
- start
), *p
, start
, end
);
1172 print_hex_dump(KERN_DEBUG
, "osdmap: ",
1173 DUMP_PREFIX_OFFSET
, 16, 1,
1174 start
, end
- start
, true);
1179 * Allocate and decode a full map.
1181 struct ceph_osdmap
*ceph_osdmap_decode(void **p
, void *end
)
1183 struct ceph_osdmap
*map
;
1186 map
= kzalloc(sizeof(*map
), GFP_NOFS
);
1188 return ERR_PTR(-ENOMEM
);
1190 map
->pg_temp
= RB_ROOT
;
1191 map
->primary_temp
= RB_ROOT
;
1192 mutex_init(&map
->crush_scratch_mutex
);
1194 ret
= osdmap_decode(p
, end
, map
);
1196 ceph_osdmap_destroy(map
);
1197 return ERR_PTR(ret
);
1204 * decode and apply an incremental map update.
1206 struct ceph_osdmap
*osdmap_apply_incremental(void **p
, void *end
,
1207 struct ceph_osdmap
*map
)
1209 struct crush_map
*newcrush
= NULL
;
1210 struct ceph_fsid fsid
;
1212 struct ceph_timespec modified
;
1216 __s32 new_flags
, max
;
1221 dout("%s %p to %p len %d\n", __func__
, *p
, end
, (int)(end
- *p
));
1223 err
= get_osdmap_client_data_v(p
, end
, "inc", &struct_v
);
1227 /* fsid, epoch, modified, new_pool_max, new_flags */
1228 ceph_decode_need(p
, end
, sizeof(fsid
) + sizeof(u32
) + sizeof(modified
) +
1229 sizeof(u64
) + sizeof(u32
), e_inval
);
1230 ceph_decode_copy(p
, &fsid
, sizeof(fsid
));
1231 epoch
= ceph_decode_32(p
);
1232 BUG_ON(epoch
!= map
->epoch
+1);
1233 ceph_decode_copy(p
, &modified
, sizeof(modified
));
1234 new_pool_max
= ceph_decode_64(p
);
1235 new_flags
= ceph_decode_32(p
);
1238 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1240 dout("apply_incremental full map len %d, %p to %p\n",
1242 return ceph_osdmap_decode(p
, min(*p
+len
, end
));
1246 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1248 newcrush
= crush_decode(*p
, min(*p
+len
, end
));
1249 if (IS_ERR(newcrush
)) {
1250 err
= PTR_ERR(newcrush
);
1259 map
->flags
= new_flags
;
1260 if (new_pool_max
>= 0)
1261 map
->pool_max
= new_pool_max
;
1264 ceph_decode_32_safe(p
, end
, max
, e_inval
);
1266 err
= osdmap_set_max_osd(map
, max
);
1272 map
->modified
= modified
;
1275 crush_destroy(map
->crush
);
1276 map
->crush
= newcrush
;
1281 err
= decode_new_pools(p
, end
, map
);
1285 /* new_pool_names */
1286 err
= decode_pool_names(p
, end
, map
);
1291 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1293 struct ceph_pg_pool_info
*pi
;
1295 ceph_decode_64_safe(p
, end
, pool
, e_inval
);
1296 pi
= __lookup_pg_pool(&map
->pg_pools
, pool
);
1298 __remove_pg_pool(&map
->pg_pools
, pi
);
1302 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1305 struct ceph_entity_addr addr
;
1306 ceph_decode_32_safe(p
, end
, osd
, e_inval
);
1307 ceph_decode_copy_safe(p
, end
, &addr
, sizeof(addr
), e_inval
);
1308 ceph_decode_addr(&addr
);
1309 pr_info("osd%d up\n", osd
);
1310 BUG_ON(osd
>= map
->max_osd
);
1311 map
->osd_state
[osd
] |= CEPH_OSD_UP
| CEPH_OSD_EXISTS
;
1312 map
->osd_addr
[osd
] = addr
;
1316 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1320 ceph_decode_32_safe(p
, end
, osd
, e_inval
);
1321 xorstate
= **(u8
**)p
;
1322 (*p
)++; /* clean flag */
1324 xorstate
= CEPH_OSD_UP
;
1325 if (xorstate
& CEPH_OSD_UP
)
1326 pr_info("osd%d down\n", osd
);
1327 if (osd
< map
->max_osd
)
1328 map
->osd_state
[osd
] ^= xorstate
;
1332 ceph_decode_32_safe(p
, end
, len
, e_inval
);
1335 ceph_decode_need(p
, end
, sizeof(u32
)*2, e_inval
);
1336 osd
= ceph_decode_32(p
);
1337 off
= ceph_decode_32(p
);
1338 pr_info("osd%d weight 0x%x %s\n", osd
, off
,
1339 off
== CEPH_OSD_IN
? "(in)" :
1340 (off
== CEPH_OSD_OUT
? "(out)" : ""));
1341 if (osd
< map
->max_osd
)
1342 map
->osd_weight
[osd
] = off
;
1346 err
= decode_new_pg_temp(p
, end
, map
);
1350 /* new_primary_temp */
1351 if (struct_v
>= 1) {
1352 err
= decode_new_primary_temp(p
, end
, map
);
1357 /* new_primary_affinity */
1358 if (struct_v
>= 2) {
1359 err
= decode_new_primary_affinity(p
, end
, map
);
1364 /* ignore the rest */
1367 dout("inc osdmap epoch %d max_osd %d\n", map
->epoch
, map
->max_osd
);
1373 pr_err("corrupt inc osdmap (%d) epoch %d off %d (%p of %p-%p)\n",
1374 err
, epoch
, (int)(*p
- start
), *p
, start
, end
);
1375 print_hex_dump(KERN_DEBUG
, "osdmap: ",
1376 DUMP_PREFIX_OFFSET
, 16, 1,
1377 start
, end
- start
, true);
1379 crush_destroy(newcrush
);
1380 return ERR_PTR(err
);
1383 void ceph_oid_copy(struct ceph_object_id
*dest
,
1384 const struct ceph_object_id
*src
)
1386 WARN_ON(!ceph_oid_empty(dest
));
1388 if (src
->name
!= src
->inline_name
) {
1389 /* very rare, see ceph_object_id definition */
1390 dest
->name
= kmalloc(src
->name_len
+ 1,
1391 GFP_NOIO
| __GFP_NOFAIL
);
1394 memcpy(dest
->name
, src
->name
, src
->name_len
+ 1);
1395 dest
->name_len
= src
->name_len
;
1397 EXPORT_SYMBOL(ceph_oid_copy
);
1399 static __printf(2, 0)
1400 int oid_printf_vargs(struct ceph_object_id
*oid
, const char *fmt
, va_list ap
)
1404 WARN_ON(!ceph_oid_empty(oid
));
1406 len
= vsnprintf(oid
->inline_name
, sizeof(oid
->inline_name
), fmt
, ap
);
1407 if (len
>= sizeof(oid
->inline_name
))
1410 oid
->name_len
= len
;
1415 * If oid doesn't fit into inline buffer, BUG.
1417 void ceph_oid_printf(struct ceph_object_id
*oid
, const char *fmt
, ...)
1422 BUG_ON(oid_printf_vargs(oid
, fmt
, ap
));
1425 EXPORT_SYMBOL(ceph_oid_printf
);
1427 static __printf(3, 0)
1428 int oid_aprintf_vargs(struct ceph_object_id
*oid
, gfp_t gfp
,
1429 const char *fmt
, va_list ap
)
1435 len
= oid_printf_vargs(oid
, fmt
, aq
);
1439 char *external_name
;
1441 external_name
= kmalloc(len
+ 1, gfp
);
1445 oid
->name
= external_name
;
1446 WARN_ON(vsnprintf(oid
->name
, len
+ 1, fmt
, ap
) != len
);
1447 oid
->name_len
= len
;
1454 * If oid doesn't fit into inline buffer, allocate.
1456 int ceph_oid_aprintf(struct ceph_object_id
*oid
, gfp_t gfp
,
1457 const char *fmt
, ...)
1463 ret
= oid_aprintf_vargs(oid
, gfp
, fmt
, ap
);
1468 EXPORT_SYMBOL(ceph_oid_aprintf
);
1470 void ceph_oid_destroy(struct ceph_object_id
*oid
)
1472 if (oid
->name
!= oid
->inline_name
)
1475 EXPORT_SYMBOL(ceph_oid_destroy
);
1477 static bool osds_valid(const struct ceph_osds
*set
)
1480 if (set
->size
> 0 && set
->primary
>= 0)
1483 /* empty can_shift_osds set */
1484 if (!set
->size
&& set
->primary
== -1)
1487 /* empty !can_shift_osds set - all NONE */
1488 if (set
->size
> 0 && set
->primary
== -1) {
1491 for (i
= 0; i
< set
->size
; i
++) {
1492 if (set
->osds
[i
] != CRUSH_ITEM_NONE
)
1502 void ceph_osds_copy(struct ceph_osds
*dest
, const struct ceph_osds
*src
)
1504 memcpy(dest
->osds
, src
->osds
, src
->size
* sizeof(src
->osds
[0]));
1505 dest
->size
= src
->size
;
1506 dest
->primary
= src
->primary
;
1510 * calculate file layout from given offset, length.
1511 * fill in correct oid, logical length, and object extent
1514 * for now, we write only a single su, until we can
1515 * pass a stride back to the caller.
1517 int ceph_calc_file_object_mapping(struct ceph_file_layout
*layout
,
1520 u64
*oxoff
, u64
*oxlen
)
1522 u32 osize
= le32_to_cpu(layout
->fl_object_size
);
1523 u32 su
= le32_to_cpu(layout
->fl_stripe_unit
);
1524 u32 sc
= le32_to_cpu(layout
->fl_stripe_count
);
1525 u32 bl
, stripeno
, stripepos
, objsetno
;
1529 dout("mapping %llu~%llu osize %u fl_su %u\n", off
, len
,
1531 if (su
== 0 || sc
== 0)
1533 su_per_object
= osize
/ su
;
1534 if (su_per_object
== 0)
1536 dout("osize %u / su %u = su_per_object %u\n", osize
, su
,
1539 if ((su
& ~PAGE_MASK
) != 0)
1542 /* bl = *off / su; */
1546 dout("off %llu / su %u = bl %u\n", off
, su
, bl
);
1549 stripepos
= bl
% sc
;
1550 objsetno
= stripeno
/ su_per_object
;
1552 *ono
= objsetno
* sc
+ stripepos
;
1553 dout("objset %u * sc %u = ono %u\n", objsetno
, sc
, (unsigned int)*ono
);
1555 /* *oxoff = *off % layout->fl_stripe_unit; # offset in su */
1557 su_offset
= do_div(t
, su
);
1558 *oxoff
= su_offset
+ (stripeno
% su_per_object
) * su
;
1561 * Calculate the length of the extent being written to the selected
1562 * object. This is the minimum of the full length requested (len) or
1563 * the remainder of the current stripe being written to.
1565 *oxlen
= min_t(u64
, len
, su
- su_offset
);
1567 dout(" obj extent %llu~%llu\n", *oxoff
, *oxlen
);
1571 dout(" invalid layout\n");
1577 EXPORT_SYMBOL(ceph_calc_file_object_mapping
);
1580 * Map an object into a PG.
1582 * Should only be called with target_oid and target_oloc (as opposed to
1583 * base_oid and base_oloc), since tiering isn't taken into account.
1585 int ceph_object_locator_to_pg(struct ceph_osdmap
*osdmap
,
1586 struct ceph_object_id
*oid
,
1587 struct ceph_object_locator
*oloc
,
1588 struct ceph_pg
*raw_pgid
)
1590 struct ceph_pg_pool_info
*pi
;
1592 pi
= ceph_pg_pool_by_id(osdmap
, oloc
->pool
);
1596 raw_pgid
->pool
= oloc
->pool
;
1597 raw_pgid
->seed
= ceph_str_hash(pi
->object_hash
, oid
->name
,
1600 dout("%s %*pE -> raw_pgid %llu.%x\n", __func__
, oid
->name_len
,
1601 oid
->name
, raw_pgid
->pool
, raw_pgid
->seed
);
1604 EXPORT_SYMBOL(ceph_object_locator_to_pg
);
/*
 * Map a raw PG (full precision ps) into an actual PG.
 */
static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
			 const struct ceph_pg *raw_pgid,
			 struct ceph_pg *pgid)
{
	pgid->pool = raw_pgid->pool;
	/* fold the full-precision seed down into the pool's pg_num PGs */
	pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
				     pi->pg_num_mask);
}
1619 * Map a raw PG (full precision ps) into a placement ps (placement
1620 * seed). Include pool id in that value so that different pools don't
1621 * use the same seeds.
1623 static u32
raw_pg_to_pps(struct ceph_pg_pool_info
*pi
,
1624 const struct ceph_pg
*raw_pgid
)
1626 if (pi
->flags
& CEPH_POOL_FLAG_HASHPSPOOL
) {
1627 /* hash pool id and seed so that pool PGs do not overlap */
1628 return crush_hash32_2(CRUSH_HASH_RJENKINS1
,
1629 ceph_stable_mod(raw_pgid
->seed
,
1635 * legacy behavior: add ps and pool together. this is
1636 * not a great approach because the PGs from each pool
1637 * will overlap on top of each other: 0.5 == 1.4 ==
1640 return ceph_stable_mod(raw_pgid
->seed
, pi
->pgp_num
,
1642 (unsigned)raw_pgid
->pool
;
1646 static int do_crush(struct ceph_osdmap
*map
, int ruleno
, int x
,
1647 int *result
, int result_max
,
1648 const __u32
*weight
, int weight_max
)
1652 BUG_ON(result_max
> CEPH_PG_MAX_SIZE
);
1654 mutex_lock(&map
->crush_scratch_mutex
);
1655 r
= crush_do_rule(map
->crush
, ruleno
, x
, result
, result_max
,
1656 weight
, weight_max
, map
->crush_scratch_ary
);
1657 mutex_unlock(&map
->crush_scratch_mutex
);
/*
 * Calculate raw set (CRUSH output) for given PG.  The result may
 * contain nonexistent OSDs.  ->primary is undefined for a raw set.
 *
 * Placement seed (CRUSH input) is returned through @ppps.
 *
 * On any failure @raw is left empty (size 0) and an error is logged;
 * there is no error return.
 */
static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   const struct ceph_pg *raw_pgid,
			   struct ceph_osds *raw,
			   u32 *ppps)
{
	u32 pps = raw_pg_to_pps(pi, raw_pgid);
	int ruleno;
	int len;

	ceph_osds_init(raw);
	if (ppps)
		*ppps = pps;

	ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
				 pi->size);
	if (ruleno < 0) {
		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
		       pi->id, pi->crush_ruleset, pi->type, pi->size);
		return;
	}

	/* never ask CRUSH for more OSDs than raw->osds can hold */
	len = do_crush(osdmap, ruleno, pps, raw->osds,
		       min_t(int, pi->size, ARRAY_SIZE(raw->osds)),
		       osdmap->osd_weight, osdmap->max_osd);
	if (len < 0) {
		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
		       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
		       pi->size);
		return;
	}

	raw->size = len;
}
/*
 * Given raw set, calculate up set and up primary.  By definition of an
 * up set, the result won't contain nonexistent or down OSDs.
 *
 * This is done in-place - on return @set is the up set.  If it's
 * empty, ->primary will remain undefined.
 */
static void raw_to_up_osds(struct ceph_osdmap *osdmap,
			   struct ceph_pg_pool_info *pi,
			   struct ceph_osds *set)
{
	int i;

	/* ->primary is undefined for a raw set */
	BUG_ON(set->primary != -1);

	if (ceph_can_shift_osds(pi)) {
		int removed = 0;

		/* shift left: compact out down/nonexistent OSDs in-place */
		for (i = 0; i < set->size; i++) {
			if (ceph_osd_is_down(osdmap, set->osds[i])) {
				removed++;
				continue;
			}
			if (removed)
				set->osds[i - removed] = set->osds[i];
		}
		set->size -= removed;
		if (set->size > 0)
			set->primary = set->osds[0];
	} else {
		/*
		 * set down/dne devices to NONE; positions must be
		 * preserved (e.g. erasure-coded pools), so no shifting.
		 * Walking backwards leaves ->primary as the first
		 * usable OSD in the set.
		 */
		for (i = set->size - 1; i >= 0; i--) {
			if (ceph_osd_is_down(osdmap, set->osds[i]))
				set->osds[i] = CRUSH_ITEM_NONE;
			else
				set->primary = set->osds[i];
		}
	}
}
/*
 * Apply per-OSD primary_affinity to the up set: possibly pick a
 * different member of @up as primary, so that OSDs with a lower
 * affinity get a proportionally smaller share of primary roles.
 * No-op when no OSD in @up has a non-default affinity.
 */
static void apply_primary_affinity(struct ceph_osdmap *osdmap,
				   struct ceph_pg_pool_info *pi,
				   u32 pps,
				   struct ceph_osds *up)
{
	int i;
	int pos = -1;

	/*
	 * Do we have any non-default primary_affinity values for these
	 * osds?
	 */
	if (!osdmap->osd_primary_affinity)
		return;

	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];

		if (osd != CRUSH_ITEM_NONE &&
		    osdmap->osd_primary_affinity[osd] !=
					CEPH_OSD_DEFAULT_PRIMARY_AFFINITY) {
			break;
		}
	}
	if (i == up->size)
		return;

	/*
	 * Pick the primary.  Feed both the seed (for the pg) and the
	 * osd into the hash/rng so that a proportional fraction of an
	 * osd's pgs get rejected as primary.
	 */
	for (i = 0; i < up->size; i++) {
		int osd = up->osds[i];
		u32 aff;

		if (osd == CRUSH_ITEM_NONE)
			continue;

		aff = osdmap->osd_primary_affinity[osd];
		if (aff < CEPH_OSD_MAX_PRIMARY_AFFINITY &&
		    (crush_hash32_2(CRUSH_HASH_RJENKINS1,
				    pps, osd) >> 16) >= aff) {
			/*
			 * We chose not to use this primary.  Note it
			 * anyway as a fallback in case we don't pick
			 * anyone else, but keep looking.
			 */
			if (pos < 0)
				pos = i;
		} else {
			pos = i;
			break;
		}
	}
	if (pos < 0)
		return;

	up->primary = up->osds[pos];

	if (ceph_can_shift_osds(pi) && pos > 0) {
		/* move the new primary to the front */
		for (i = pos; i > 0; i--)
			up->osds[i] = up->osds[i - 1];
		up->osds[0] = up->primary;
	}
}
/*
 * Get pg_temp and primary_temp mappings for given PG.
 *
 * Note that a PG may have none, only pg_temp, only primary_temp or
 * both pg_temp and primary_temp mappings.  This means @temp isn't
 * always a valid OSD set on return: in the "only primary_temp" case,
 * @temp will have its ->primary >= 0 but ->size == 0.
 */
static void get_temp_osds(struct ceph_osdmap *osdmap,
			  struct ceph_pg_pool_info *pi,
			  const struct ceph_pg *raw_pgid,
			  struct ceph_osds *temp)
{
	struct ceph_pg pgid;
	struct ceph_pg_mapping *pg;
	int i;

	/* pg_temp/primary_temp are keyed by the actual (folded) pgid */
	raw_pg_to_pg(pi, raw_pgid, &pgid);
	ceph_osds_init(temp);

	/* pg_temp? */
	pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
	if (pg) {
		for (i = 0; i < pg->pg_temp.len; i++) {
			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
				/*
				 * Shiftable pools drop down OSDs;
				 * otherwise positions are preserved
				 * with a NONE placeholder.
				 */
				if (ceph_can_shift_osds(pi))
					continue;

				temp->osds[temp->size++] = CRUSH_ITEM_NONE;
			} else {
				temp->osds[temp->size++] =
				    pg->pg_temp.osds[i];
			}
		}

		/* apply pg_temp's primary */
		for (i = 0; i < temp->size; i++) {
			if (temp->osds[i] != CRUSH_ITEM_NONE) {
				temp->primary = temp->osds[i];
				break;
			}
		}
	}

	/* primary_temp? (overrides pg_temp's primary if both exist) */
	pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
	if (pg)
		temp->primary = pg->primary_temp.osd;
}
/*
 * Map a PG to its acting set as well as its up set.
 *
 * Acting set is used for data mapping purposes, while up set can be
 * recorded for detecting interval changes and deciding whether to
 * resend a request.
 */
void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
			       const struct ceph_pg *raw_pgid,
			       struct ceph_osds *up,
			       struct ceph_osds *acting)
{
	struct ceph_pg_pool_info *pi;
	u32 pps;

	pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
	if (!pi) {
		/* unknown pool: both sets come back empty */
		ceph_osds_init(up);
		ceph_osds_init(acting);
		goto out;
	}

	pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
	raw_to_up_osds(osdmap, pi, up);
	apply_primary_affinity(osdmap, pi, pps, up);
	get_temp_osds(osdmap, pi, raw_pgid, acting);
	if (!acting->size) {
		/* no pg_temp override: acting set == up set */
		memcpy(acting->osds, up->osds,
		       up->size * sizeof(up->osds[0]));
		acting->size = up->size;
		/* keep a primary_temp override if get_temp_osds() set one */
		if (acting->primary == -1)
			acting->primary = up->primary;
	}
out:
	WARN_ON(!osds_valid(up) || !osds_valid(acting));
}
1899 * Return primary osd for given pgid, or -1 if none.
1901 int ceph_calc_pg_primary(struct ceph_osdmap
*osdmap
, struct ceph_pg pgid
)
1903 struct ceph_osds up
, acting
;
1905 ceph_pg_to_up_acting_osds(osdmap
, &pgid
, &up
, &acting
);
1906 return acting
.primary
;
1908 EXPORT_SYMBOL(ceph_calc_pg_primary
);