1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Cloudwatt <libre.licensing@cloudwatt.com>
7 * Copyright (C) 2014 Red Hat <contact@redhat.com>
9 * Author: Loic Dachary <loic@dachary.org>
11 * This library is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License as published by the Free Software Foundation; either
14 * version 2.1 of the License, or (at your option) any later version.
21 #include "include/str_map.h"
22 #include "common/debug.h"
23 #include "crush/CrushWrapper.h"
24 #include "osd/osd_types.h"
25 #include "include/stringify.h"
26 #include "erasure-code/ErasureCodePlugin.h"
27 #include "json_spirit/json_spirit_writer.h"
29 #include "ErasureCodeLrc.h"
31 // re-include our assert to clobber boost's
32 #include "include/assert.h"
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_osd
37 #define dout_prefix _prefix(_dout)
41 static ostream
& _prefix(std::ostream
* _dout
)
43 return *_dout
<< "ErasureCodeLrc: ";
46 int ErasureCodeLrc::create_ruleset(const string
&name
,
50 if (crush
.rule_exists(name
)) {
51 *ss
<< "rule " << name
<< " exists";
54 if (!crush
.name_exists(ruleset_root
)) {
55 *ss
<< "root item " << ruleset_root
<< " does not exist";
58 int root
= crush
.get_item_id(ruleset_root
);
62 for (rno
= 0; rno
< crush
.get_max_rules(); rno
++) {
63 if (!crush
.rule_exists(rno
) && !crush
.ruleset_exists(rno
))
68 int steps
= 4 + ruleset_steps
.size();
70 int max_rep
= get_chunk_count();
72 ret
= crush
.add_rule(steps
, ruleset
, pg_pool_t::TYPE_ERASURE
,
73 min_rep
, max_rep
, rno
);
77 ret
= crush
.set_rule_step(rno
, step
++, CRUSH_RULE_SET_CHOOSELEAF_TRIES
, 5, 0);
79 ret
= crush
.set_rule_step(rno
, step
++, CRUSH_RULE_SET_CHOOSE_TRIES
, 100, 0);
81 ret
= crush
.set_rule_step(rno
, step
++, CRUSH_RULE_TAKE
, root
, 0);
83 // [ [ "choose", "rack", 2 ],
84 // [ "chooseleaf", "host", 5 ] ]
85 for (vector
<Step
>::const_iterator i
= ruleset_steps
.begin();
86 i
!= ruleset_steps
.end();
88 int op
= i
->op
== "chooseleaf" ?
89 CRUSH_RULE_CHOOSELEAF_INDEP
: CRUSH_RULE_CHOOSE_INDEP
;
90 int type
= crush
.get_type_id(i
->type
);
92 *ss
<< "unknown crush type " << i
->type
;
95 ret
= crush
.set_rule_step(rno
, step
++, op
, i
->n
, type
);
98 ret
= crush
.set_rule_step(rno
, step
++, CRUSH_RULE_EMIT
, 0, 0);
100 crush
.set_rule_name(rno
, name
);
104 int ErasureCodeLrc::layers_description(const ErasureCodeProfile
&profile
,
105 json_spirit::mArray
*description
,
108 if (profile
.count("layers") == 0) {
109 *ss
<< "could not find 'layers' in " << profile
<< std::endl
;
110 return ERROR_LRC_DESCRIPTION
;
112 string str
= profile
.find("layers")->second
;
114 json_spirit::mValue json
;
115 json_spirit::read_or_throw(str
, json
);
117 if (json
.type() != json_spirit::array_type
) {
118 *ss
<< "layers='" << str
119 << "' must be a JSON array but is of type "
120 << json
.type() << " instead" << std::endl
;
121 return ERROR_LRC_ARRAY
;
123 *description
= json
.get_array();
124 } catch (json_spirit::Error_position
&e
) {
125 *ss
<< "failed to parse layers='" << str
<< "'"
126 << " at line " << e
.line_
<< ", column " << e
.column_
127 << " : " << e
.reason_
<< std::endl
;
128 return ERROR_LRC_PARSE_JSON
;
133 int ErasureCodeLrc::layers_parse(string description_string
,
134 json_spirit::mArray description
,
138 for (vector
<json_spirit::mValue
>::iterator i
= description
.begin();
139 i
!= description
.end();
141 if (i
->type() != json_spirit::array_type
) {
142 stringstream json_string
;
143 json_spirit::write(*i
, json_string
);
144 *ss
<< "each element of the array "
145 << description_string
<< " must be a JSON array but "
146 << json_string
.str() << " at position " << position
147 << " is of type " << i
->type() << " instead" << std::endl
;
148 return ERROR_LRC_ARRAY
;
150 json_spirit::mArray layer_json
= i
->get_array();
151 ErasureCodeProfile profile
;
153 for (vector
<json_spirit::mValue
>::iterator j
= layer_json
.begin();
154 j
!= layer_json
.end();
157 if (j
->type() != json_spirit::str_type
) {
158 stringstream element
;
159 json_spirit::write(*j
, element
);
160 *ss
<< "the first element of the entry "
161 << element
.str() << " (first is zero) "
162 << position
<< " in " << description_string
163 << " is of type " << (*j
).type() << " instead of string" << std::endl
;
164 return ERROR_LRC_STR
;
166 layers
.push_back(Layer(j
->get_str()));
167 Layer
&layer
= layers
.back();
168 layer
.chunks_map
= j
->get_str();
169 } else if(index
== 1) {
170 Layer
&layer
= layers
.back();
171 if (j
->type() != json_spirit::str_type
&&
172 j
->type() != json_spirit::obj_type
) {
173 stringstream element
;
174 json_spirit::write(*j
, element
);
175 *ss
<< "the second element of the entry "
176 << element
.str() << " (first is zero) "
177 << position
<< " in " << description_string
178 << " is of type " << (*j
).type() << " instead of string or object"
180 return ERROR_LRC_CONFIG_OPTIONS
;
182 if (j
->type() == json_spirit::str_type
) {
183 int err
= get_json_str_map(j
->get_str(), *ss
, &layer
.profile
);
186 } else if (j
->type() == json_spirit::obj_type
) {
187 json_spirit::mObject o
= j
->get_obj();
189 for (map
<string
, json_spirit::mValue
>::iterator i
= o
.begin();
192 layer
.profile
[i
->first
] = i
->second
.get_str();
196 // ignore trailing elements
203 int ErasureCodeLrc::layers_init(ostream
*ss
)
205 ErasureCodePluginRegistry
®istry
= ErasureCodePluginRegistry::instance();
206 for (unsigned int i
= 0; i
< layers
.size(); i
++) {
207 Layer
&layer
= layers
[i
];
209 for(std::string::iterator it
= layer
.chunks_map
.begin();
210 it
!= layer
.chunks_map
.end();
213 layer
.data
.push_back(position
);
215 layer
.coding
.push_back(position
);
216 if (*it
== 'c' || *it
== 'D')
217 layer
.chunks_as_set
.insert(position
);
220 layer
.chunks
= layer
.data
;
221 layer
.chunks
.insert(layer
.chunks
.end(),
222 layer
.coding
.begin(), layer
.coding
.end());
223 if (layer
.profile
.find("k") == layer
.profile
.end())
224 layer
.profile
["k"] = stringify(layer
.data
.size());
225 if (layer
.profile
.find("m") == layer
.profile
.end())
226 layer
.profile
["m"] = stringify(layer
.coding
.size());
227 if (layer
.profile
.find("plugin") == layer
.profile
.end())
228 layer
.profile
["plugin"] = "jerasure";
229 if (layer
.profile
.find("technique") == layer
.profile
.end())
230 layer
.profile
["technique"] = "reed_sol_van";
231 int err
= registry
.factory(layer
.profile
["plugin"],
242 int ErasureCodeLrc::layers_sanity_checks(string description_string
,
247 if (layers
.size() < 1) {
248 *ss
<< "layers parameter has " << layers
.size()
249 << " which is less than the minimum of one. "
250 << description_string
<< std::endl
;
251 return ERROR_LRC_LAYERS_COUNT
;
253 for (vector
<Layer
>::const_iterator layer
= layers
.begin();
254 layer
!= layers
.end();
256 if (chunk_count
!= layer
->chunks_map
.length()) {
257 *ss
<< "the first element of the array at position "
258 << position
<< " (starting from zero) "
259 << " is the string '" << layer
->chunks_map
260 << " found in the layers parameter "
261 << description_string
<< ". It is expected to be "
262 << chunk_count
<< " characters long but is "
263 << layer
->chunks_map
.length() << " characters long instead "
265 return ERROR_LRC_MAPPING_SIZE
;
271 int ErasureCodeLrc::parse(ErasureCodeProfile
&profile
,
274 int r
= ErasureCode::parse(profile
, ss
);
278 return parse_ruleset(profile
, ss
);
281 const string
ErasureCodeLrc::DEFAULT_KML("-1");
283 int ErasureCodeLrc::parse_kml(ErasureCodeProfile
&profile
,
286 int err
= ErasureCode::parse(profile
, ss
);
287 const int DEFAULT_INT
= -1;
289 err
|= to_int("k", profile
, &k
, DEFAULT_KML
, ss
);
290 err
|= to_int("m", profile
, &m
, DEFAULT_KML
, ss
);
291 err
|= to_int("l", profile
, &l
, DEFAULT_KML
, ss
);
293 if (k
== DEFAULT_INT
&& m
== DEFAULT_INT
&& l
== DEFAULT_INT
)
296 if ((k
!= DEFAULT_INT
|| m
!= DEFAULT_INT
|| l
!= DEFAULT_INT
) &&
297 (k
== DEFAULT_INT
|| m
== DEFAULT_INT
|| l
== DEFAULT_INT
)) {
298 *ss
<< "All of k, m, l must be set or none of them in "
299 << profile
<< std::endl
;
300 return ERROR_LRC_ALL_OR_NOTHING
;
303 const char *generated
[] = { "mapping",
307 for (int i
= 0; i
< 3; i
++) {
308 if (profile
.count(generated
[i
])) {
309 *ss
<< "The " << generated
[i
] << " parameter cannot be set "
310 << "when k, m, l are set in " << profile
<< std::endl
;
311 return ERROR_LRC_GENERATED
;
316 *ss
<< "k + m must be a multiple of l in "
317 << profile
<< std::endl
;
318 return ERROR_LRC_K_M_MODULO
;
321 int local_group_count
= (k
+ m
) / l
;
323 if (k
% local_group_count
) {
324 *ss
<< "k must be a multiple of (k + m) / l in "
325 << profile
<< std::endl
;
326 return ERROR_LRC_K_MODULO
;
329 if (m
% local_group_count
) {
330 *ss
<< "m must be a multiple of (k + m) / l in "
331 << profile
<< std::endl
;
332 return ERROR_LRC_M_MODULO
;
336 for (int i
= 0; i
< local_group_count
; i
++) {
337 mapping
+= string(k
/ local_group_count
, 'D') +
338 string(m
/ local_group_count
, '_') + "_";
340 profile
["mapping"] = mapping
;
342 string layers
= "[ ";
346 for (int i
= 0; i
< local_group_count
; i
++) {
347 layers
+= string(k
/ local_group_count
, 'D') +
348 string(m
/ local_group_count
, 'c') + "_";
350 layers
+= "\", \"\" ],";
353 for (int i
= 0; i
< local_group_count
; i
++) {
355 for (int j
= 0; j
< local_group_count
; j
++) {
357 layers
+= string(l
, 'D') + "c";
359 layers
+= string(l
+ 1, '_');
361 layers
+= "\", \"\" ],";
363 profile
["layers"] = layers
+ "]";
365 ErasureCodeProfile::const_iterator parameter
;
366 string ruleset_locality
;
367 parameter
= profile
.find("ruleset-locality");
368 if (parameter
!= profile
.end())
369 ruleset_locality
= parameter
->second
;
370 string ruleset_failure_domain
= "host";
371 parameter
= profile
.find("ruleset-failure-domain");
372 if (parameter
!= profile
.end())
373 ruleset_failure_domain
= parameter
->second
;
375 if (ruleset_locality
!= "") {
376 ruleset_steps
.clear();
377 ruleset_steps
.push_back(Step("choose", ruleset_locality
,
379 ruleset_steps
.push_back(Step("chooseleaf", ruleset_failure_domain
,
381 } else if (ruleset_failure_domain
!= "") {
382 ruleset_steps
.clear();
383 ruleset_steps
.push_back(Step("chooseleaf", ruleset_failure_domain
, 0));
389 int ErasureCodeLrc::parse_ruleset(ErasureCodeProfile
&profile
,
393 err
|= to_string("ruleset-root", profile
,
397 if (profile
.count("ruleset-steps") != 0) {
398 ruleset_steps
.clear();
399 string str
= profile
.find("ruleset-steps")->second
;
400 json_spirit::mArray description
;
402 json_spirit::mValue json
;
403 json_spirit::read_or_throw(str
, json
);
405 if (json
.type() != json_spirit::array_type
) {
406 *ss
<< "ruleset-steps='" << str
407 << "' must be a JSON array but is of type "
408 << json
.type() << " instead" << std::endl
;
409 return ERROR_LRC_ARRAY
;
411 description
= json
.get_array();
412 } catch (json_spirit::Error_position
&e
) {
413 *ss
<< "failed to parse ruleset-steps='" << str
<< "'"
414 << " at line " << e
.line_
<< ", column " << e
.column_
415 << " : " << e
.reason_
<< std::endl
;
416 return ERROR_LRC_PARSE_JSON
;
420 for (vector
<json_spirit::mValue
>::iterator i
= description
.begin();
421 i
!= description
.end();
423 if (i
->type() != json_spirit::array_type
) {
424 stringstream json_string
;
425 json_spirit::write(*i
, json_string
);
426 *ss
<< "element of the array "
427 << str
<< " must be a JSON array but "
428 << json_string
.str() << " at position " << position
429 << " is of type " << i
->type() << " instead" << std::endl
;
430 return ERROR_LRC_ARRAY
;
432 int r
= parse_ruleset_step(str
, i
->get_array(), ss
);
440 int ErasureCodeLrc::parse_ruleset_step(string description_string
,
441 json_spirit::mArray description
,
444 stringstream json_string
;
445 json_spirit::write(description
, json_string
);
450 for (vector
<json_spirit::mValue
>::iterator i
= description
.begin();
451 i
!= description
.end();
453 if ((position
== 0 || position
== 1) &&
454 i
->type() != json_spirit::str_type
) {
455 *ss
<< "element " << position
<< " of the array "
456 << json_string
.str() << " found in " << description_string
457 << " must be a JSON string but is of type "
458 << i
->type() << " instead" << std::endl
;
459 return position
== 0 ? ERROR_LRC_RULESET_OP
: ERROR_LRC_RULESET_TYPE
;
461 if (position
== 2 && i
->type() != json_spirit::int_type
) {
462 *ss
<< "element " << position
<< " of the array "
463 << json_string
.str() << " found in " << description_string
464 << " must be a JSON int but is of type "
465 << i
->type() << " instead" << std::endl
;
466 return ERROR_LRC_RULESET_N
;
476 ruleset_steps
.push_back(Step(op
, type
, n
));
480 int ErasureCodeLrc::init(ErasureCodeProfile
&profile
,
485 r
= parse_kml(profile
, ss
);
489 r
= parse(profile
, ss
);
493 json_spirit::mArray description
;
494 r
= layers_description(profile
, &description
, ss
);
498 string description_string
= profile
.find("layers")->second
;
500 dout(10) << "init(" << description_string
<< ")" << dendl
;
502 r
= layers_parse(description_string
, description
, ss
);
510 if (profile
.count("mapping") == 0) {
511 *ss
<< "the 'mapping' profile is missing from " << profile
;
512 return ERROR_LRC_MAPPING
;
514 string mapping
= profile
.find("mapping")->second
;
515 data_chunk_count
= 0;
516 for(std::string::iterator it
= mapping
.begin(); it
!= mapping
.end(); ++it
) {
520 chunk_count
= mapping
.length();
522 r
= layers_sanity_checks(description_string
, ss
);
527 // When initialized with kml, the profile parameters
528 // that were generated should not be stored because
529 // they would otherwise be exposed to the caller.
531 if (profile
.find("l") != profile
.end() &&
532 profile
.find("l")->second
!= DEFAULT_KML
) {
533 profile
.erase("mapping");
534 profile
.erase("layers");
536 ErasureCode::init(profile
, ss
);
540 set
<int> ErasureCodeLrc::get_erasures(const set
<int> &want
,
541 const set
<int> &available
) const
544 set_difference(want
.begin(), want
.end(),
545 available
.begin(), available
.end(),
546 inserter(result
, result
.end()));
550 unsigned int ErasureCodeLrc::get_chunk_size(unsigned int object_size
) const
552 return layers
.front().erasure_code
->get_chunk_size(object_size
);
555 void p(const set
<int> &s
) { cerr
<< s
; } // for gdb
557 int ErasureCodeLrc::minimum_to_decode(const set
<int> &want_to_read
,
558 const set
<int> &available_chunks
,
561 dout(20) << __func__
<< " want_to_read " << want_to_read
562 << " available_chunks " << available_chunks
<< dendl
;
564 set
<int> erasures_total
;
565 set
<int> erasures_not_recovered
;
566 set
<int> erasures_want
;
567 for (unsigned int i
= 0; i
< get_chunk_count(); ++i
) {
568 if (available_chunks
.count(i
) == 0) {
569 erasures_total
.insert(i
);
570 erasures_not_recovered
.insert(i
);
571 if (want_to_read
.count(i
) != 0)
572 erasures_want
.insert(i
);
579 // When no chunk is missing there is no need to read more than what
582 if (erasures_want
.empty()) {
583 *minimum
= want_to_read
;
584 dout(20) << __func__
<< " minimum == want_to_read == "
585 << want_to_read
<< dendl
;
592 // Try to recover erasures with as few chunks as possible.
594 for (vector
<Layer
>::reverse_iterator i
= layers
.rbegin();
598 // If this layer has no chunk that we want, skip it.
601 set_intersection(want_to_read
.begin(), want_to_read
.end(),
602 i
->chunks_as_set
.begin(), i
->chunks_as_set
.end(),
603 inserter(layer_want
, layer_want
.end()));
604 if (layer_want
.empty())
607 // Are some of the chunks we want missing ?
609 set
<int> layer_erasures
;
610 set_intersection(layer_want
.begin(), layer_want
.end(),
611 erasures_want
.begin(), erasures_want
.end(),
612 inserter(layer_erasures
, layer_erasures
.end()));
613 set
<int> layer_minimum
;
614 if (layer_erasures
.empty()) {
616 // The chunks we want are available, this is the minimum we need
619 layer_minimum
= layer_want
;
622 set_intersection(i
->chunks_as_set
.begin(), i
->chunks_as_set
.end(),
623 erasures_not_recovered
.begin(), erasures_not_recovered
.end(),
624 inserter(erasures
, erasures
.end()));
626 if (erasures
.size() > i
->erasure_code
->get_coding_chunk_count()) {
628 // There are too many erasures for this layer to recover: skip
629 // it and hope that an upper layer will be do better.
634 // Get all available chunks in that layer to recover the
637 set_difference(i
->chunks_as_set
.begin(), i
->chunks_as_set
.end(),
638 erasures_not_recovered
.begin(), erasures_not_recovered
.end(),
639 inserter(layer_minimum
, layer_minimum
.end()));
641 // Chunks recovered by this layer are removed from the list of
642 // erasures so that upper levels do not attempt to recover
645 for (set
<int>::const_iterator j
= erasures
.begin();
648 erasures_not_recovered
.erase(*j
);
649 if (erasures_want
.count(*j
))
650 erasures_want
.erase(*j
);
654 minimum
->insert(layer_minimum
.begin(), layer_minimum
.end());
656 if (erasures_want
.empty()) {
657 minimum
->insert(want_to_read
.begin(), want_to_read
.end());
658 for (set
<int>::const_iterator i
= erasures_total
.begin();
659 i
!= erasures_total
.end();
661 if (minimum
->count(*i
))
664 dout(20) << __func__
<< " minimum = " << *minimum
<< dendl
;
673 // The previous strategy failed to recover from all erasures.
675 // Try to recover as many chunks as possible, even from layers
676 // that do not contain chunks that we want, in the hope that it
677 // will help the upper layers.
679 set
<int> erasures_total
;
680 for (unsigned int i
= 0; i
< get_chunk_count(); ++i
) {
681 if (available_chunks
.count(i
) == 0)
682 erasures_total
.insert(i
);
685 for (vector
<Layer
>::reverse_iterator i
= layers
.rbegin();
688 set
<int> layer_erasures
;
689 set_intersection(i
->chunks_as_set
.begin(), i
->chunks_as_set
.end(),
690 erasures_total
.begin(), erasures_total
.end(),
691 inserter(layer_erasures
, layer_erasures
.end()));
693 // If this layer has no erasure, skip it
695 if (layer_erasures
.empty())
698 if (layer_erasures
.size() > 0 &&
699 layer_erasures
.size() <= i
->erasure_code
->get_coding_chunk_count()) {
701 // chunks recovered by this layer are removed from the list of
702 // erasures so that upper levels know they can rely on their
705 for (set
<int>::const_iterator j
= layer_erasures
.begin();
706 j
!= layer_erasures
.end();
708 erasures_total
.erase(*j
);
712 if (erasures_total
.empty()) {
714 // Do not try to be smart about what chunks are necessary to
715 // recover, use all available chunks.
717 *minimum
= available_chunks
;
718 dout(20) << __func__
<< " minimum == available_chunks == "
719 << available_chunks
<< dendl
;
724 derr
<< __func__
<< " not enough chunks in " << available_chunks
725 << " to read " << want_to_read
<< dendl
;
729 int ErasureCodeLrc::encode_chunks(const set
<int> &want_to_encode
,
730 map
<int, bufferlist
> *encoded
)
732 unsigned int top
= layers
.size();
733 for (vector
<Layer
>::reverse_iterator i
= layers
.rbegin();
737 if (includes(i
->chunks_as_set
.begin(), i
->chunks_as_set
.end(),
738 want_to_encode
.begin(), want_to_encode
.end()))
742 for (unsigned int i
= top
; i
< layers
.size(); ++i
) {
743 const Layer
&layer
= layers
[i
];
744 set
<int> layer_want_to_encode
;
745 map
<int, bufferlist
> layer_encoded
;
747 for (vector
<int>::const_iterator c
= layer
.chunks
.begin();
748 c
!= layer
.chunks
.end();
750 layer_encoded
[j
] = (*encoded
)[*c
];
751 if (want_to_encode
.find(*c
) != want_to_encode
.end())
752 layer_want_to_encode
.insert(j
);
755 int err
= layer
.erasure_code
->encode_chunks(layer_want_to_encode
,
758 derr
<< __func__
<< " layer " << layer
.chunks_map
759 << " failed with " << err
<< " trying to encode "
760 << layer_want_to_encode
<< dendl
;
767 int ErasureCodeLrc::decode_chunks(const set
<int> &want_to_read
,
768 const map
<int, bufferlist
> &chunks
,
769 map
<int, bufferlist
> *decoded
)
771 set
<int> available_chunks
;
773 for (unsigned int i
= 0; i
< get_chunk_count(); ++i
) {
774 if (chunks
.count(i
) != 0)
775 available_chunks
.insert(i
);
780 set
<int> want_to_read_erasures
;
782 for (vector
<Layer
>::reverse_iterator layer
= layers
.rbegin();
783 layer
!= layers
.rend();
785 set
<int> layer_erasures
;
786 set_intersection(layer
->chunks_as_set
.begin(), layer
->chunks_as_set
.end(),
787 erasures
.begin(), erasures
.end(),
788 inserter(layer_erasures
, layer_erasures
.end()));
790 if (layer_erasures
.size() >
791 layer
->erasure_code
->get_coding_chunk_count()) {
792 // skip because there are too many erasures for this layer to recover
793 } else if(layer_erasures
.size() == 0) {
794 // skip because all chunks are already available
796 set
<int> layer_want_to_read
;
797 map
<int, bufferlist
> layer_chunks
;
798 map
<int, bufferlist
> layer_decoded
;
800 for (vector
<int>::const_iterator c
= layer
->chunks
.begin();
801 c
!= layer
->chunks
.end();
804 // Pick chunks from *decoded* instead of *chunks* to re-use
805 // chunks recovered by previous layers. In other words
806 // *chunks* does not change but *decoded* gradually improves
807 // as more layers recover from erasures.
809 if (erasures
.count(*c
) == 0)
810 layer_chunks
[j
] = (*decoded
)[*c
];
811 if (want_to_read
.count(*c
) != 0)
812 layer_want_to_read
.insert(j
);
813 layer_decoded
[j
] = (*decoded
)[*c
];
816 int err
= layer
->erasure_code
->decode_chunks(layer_want_to_read
,
820 derr
<< __func__
<< " layer " << layer
->chunks_map
821 << " failed with " << err
<< " trying to decode "
822 << layer_want_to_read
<< " with " << available_chunks
<< dendl
;
826 for (vector
<int>::const_iterator c
= layer
->chunks
.begin();
827 c
!= layer
->chunks
.end();
829 (*decoded
)[*c
] = layer_decoded
[j
];
831 if (erasures
.count(*c
) != 0)
834 want_to_read_erasures
.clear();
835 set_intersection(erasures
.begin(), erasures
.end(),
836 want_to_read
.begin(), want_to_read
.end(),
837 inserter(want_to_read_erasures
, want_to_read_erasures
.end()));
838 if (want_to_read_erasures
.size() == 0)
843 if (want_to_read_erasures
.size() > 0) {
844 derr
<< __func__
<< " want to read " << want_to_read
845 << " with available_chunks = " << available_chunks
846 << " end up being unable to read " << want_to_read_erasures
<< dendl
;