1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Cloudwatt <libre.licensing@cloudwatt.com>
7 * Copyright (C) 2014 Red Hat <contact@redhat.com>
9 * Author: Loic Dachary <loic@dachary.org>
11 * This library is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License as published by the Free Software Foundation; either
14 * version 2.1 of the License, or (at your option) any later version.
21 #include "include/str_map.h"
22 #include "common/debug.h"
23 #include "crush/CrushWrapper.h"
24 #include "osd/osd_types.h"
25 #include "include/stringify.h"
26 #include "erasure-code/ErasureCodePlugin.h"
27 #include "json_spirit/json_spirit_writer.h"
29 #include "ErasureCodeLrc.h"
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_osd
34 #define dout_prefix _prefix(_dout)
39 static ostream
& _prefix(std::ostream
* _dout
)
41 return *_dout
<< "ErasureCodeLrc: ";
44 int ErasureCodeLrc::create_rule(const string
&name
,
48 if (crush
.rule_exists(name
)) {
49 *ss
<< "rule " << name
<< " exists";
52 if (!crush
.name_exists(rule_root
)) {
53 *ss
<< "root item " << rule_root
<< " does not exist";
56 int root
= crush
.get_item_id(rule_root
);
57 if (rule_device_class
.size()) {
58 if (!crush
.class_exists(rule_device_class
)) {
59 *ss
<< "device class " << rule_device_class
<< " does not exist";
62 int c
= crush
.get_class_id(rule_device_class
);
63 if (crush
.class_bucket
.count(root
) == 0 ||
64 crush
.class_bucket
[root
].count(c
) == 0) {
65 *ss
<< "root item " << rule_root
<< " has no devices with class "
69 root
= crush
.class_bucket
[root
][c
];
73 for (rno
= 0; rno
< crush
.get_max_rules(); rno
++) {
74 if (!crush
.rule_exists(rno
))
78 int steps
= 4 + rule_steps
.size();
80 ret
= crush
.add_rule(rno
, steps
, pg_pool_t::TYPE_ERASURE
);
81 ceph_assert(ret
== rno
);
84 ret
= crush
.set_rule_step(rno
, step
++, CRUSH_RULE_SET_CHOOSELEAF_TRIES
, 5, 0);
85 ceph_assert(ret
== 0);
86 ret
= crush
.set_rule_step(rno
, step
++, CRUSH_RULE_SET_CHOOSE_TRIES
, 100, 0);
87 ceph_assert(ret
== 0);
88 ret
= crush
.set_rule_step(rno
, step
++, CRUSH_RULE_TAKE
, root
, 0);
89 ceph_assert(ret
== 0);
90 // [ [ "choose", "rack", 2 ],
91 // [ "chooseleaf", "host", 5 ] ]
92 for (vector
<Step
>::const_iterator i
= rule_steps
.begin();
93 i
!= rule_steps
.end();
95 int op
= i
->op
== "chooseleaf" ?
96 CRUSH_RULE_CHOOSELEAF_INDEP
: CRUSH_RULE_CHOOSE_INDEP
;
97 int type
= crush
.get_type_id(i
->type
);
99 *ss
<< "unknown crush type " << i
->type
;
102 ret
= crush
.set_rule_step(rno
, step
++, op
, i
->n
, type
);
103 ceph_assert(ret
== 0);
105 ret
= crush
.set_rule_step(rno
, step
++, CRUSH_RULE_EMIT
, 0, 0);
106 ceph_assert(ret
== 0);
107 crush
.set_rule_name(rno
, name
);
111 int ErasureCodeLrc::layers_description(const ErasureCodeProfile
&profile
,
112 json_spirit::mArray
*description
,
115 if (profile
.count("layers") == 0) {
116 *ss
<< "could not find 'layers' in " << profile
<< std::endl
;
117 return ERROR_LRC_DESCRIPTION
;
119 string str
= profile
.find("layers")->second
;
121 json_spirit::mValue json
;
122 json_spirit::read_or_throw(str
, json
);
124 if (json
.type() != json_spirit::array_type
) {
125 *ss
<< "layers='" << str
126 << "' must be a JSON array but is of type "
127 << json
.type() << " instead" << std::endl
;
128 return ERROR_LRC_ARRAY
;
130 *description
= json
.get_array();
131 } catch (json_spirit::Error_position
&e
) {
132 *ss
<< "failed to parse layers='" << str
<< "'"
133 << " at line " << e
.line_
<< ", column " << e
.column_
134 << " : " << e
.reason_
<< std::endl
;
135 return ERROR_LRC_PARSE_JSON
;
140 int ErasureCodeLrc::layers_parse(const string
&description_string
,
141 json_spirit::mArray description
,
145 for (vector
<json_spirit::mValue
>::iterator i
= description
.begin();
146 i
!= description
.end();
148 if (i
->type() != json_spirit::array_type
) {
149 stringstream json_string
;
150 json_spirit::write(*i
, json_string
);
151 *ss
<< "each element of the array "
152 << description_string
<< " must be a JSON array but "
153 << json_string
.str() << " at position " << position
154 << " is of type " << i
->type() << " instead" << std::endl
;
155 return ERROR_LRC_ARRAY
;
157 json_spirit::mArray layer_json
= i
->get_array();
158 ErasureCodeProfile profile
;
160 for (vector
<json_spirit::mValue
>::iterator j
= layer_json
.begin();
161 j
!= layer_json
.end();
164 if (j
->type() != json_spirit::str_type
) {
165 stringstream element
;
166 json_spirit::write(*j
, element
);
167 *ss
<< "the first element of the entry "
168 << element
.str() << " (first is zero) "
169 << position
<< " in " << description_string
170 << " is of type " << (*j
).type() << " instead of string" << std::endl
;
171 return ERROR_LRC_STR
;
173 layers
.push_back(Layer(j
->get_str()));
174 Layer
&layer
= layers
.back();
175 layer
.chunks_map
= j
->get_str();
176 } else if(index
== 1) {
177 Layer
&layer
= layers
.back();
178 if (j
->type() != json_spirit::str_type
&&
179 j
->type() != json_spirit::obj_type
) {
180 stringstream element
;
181 json_spirit::write(*j
, element
);
182 *ss
<< "the second element of the entry "
183 << element
.str() << " (first is zero) "
184 << position
<< " in " << description_string
185 << " is of type " << (*j
).type() << " instead of string or object"
187 return ERROR_LRC_CONFIG_OPTIONS
;
189 if (j
->type() == json_spirit::str_type
) {
190 int err
= get_json_str_map(j
->get_str(), *ss
, &layer
.profile
);
193 } else if (j
->type() == json_spirit::obj_type
) {
194 json_spirit::mObject o
= j
->get_obj();
196 for (map
<string
, json_spirit::mValue
>::iterator i
= o
.begin();
199 layer
.profile
[i
->first
] = i
->second
.get_str();
203 // ignore trailing elements
210 int ErasureCodeLrc::layers_init(ostream
*ss
)
212 ErasureCodePluginRegistry
®istry
= ErasureCodePluginRegistry::instance();
213 for (unsigned int i
= 0; i
< layers
.size(); i
++) {
214 Layer
&layer
= layers
[i
];
216 for(std::string::iterator it
= layer
.chunks_map
.begin();
217 it
!= layer
.chunks_map
.end();
220 layer
.data
.push_back(position
);
222 layer
.coding
.push_back(position
);
223 if (*it
== 'c' || *it
== 'D')
224 layer
.chunks_as_set
.insert(position
);
227 layer
.chunks
= layer
.data
;
228 layer
.chunks
.insert(layer
.chunks
.end(),
229 layer
.coding
.begin(), layer
.coding
.end());
230 if (layer
.profile
.find("k") == layer
.profile
.end())
231 layer
.profile
["k"] = stringify(layer
.data
.size());
232 if (layer
.profile
.find("m") == layer
.profile
.end())
233 layer
.profile
["m"] = stringify(layer
.coding
.size());
234 if (layer
.profile
.find("plugin") == layer
.profile
.end())
235 layer
.profile
["plugin"] = "jerasure";
236 if (layer
.profile
.find("technique") == layer
.profile
.end())
237 layer
.profile
["technique"] = "reed_sol_van";
238 int err
= registry
.factory(layer
.profile
["plugin"],
249 int ErasureCodeLrc::layers_sanity_checks(const string
&description_string
,
254 if (layers
.size() < 1) {
255 *ss
<< "layers parameter has " << layers
.size()
256 << " which is less than the minimum of one. "
257 << description_string
<< std::endl
;
258 return ERROR_LRC_LAYERS_COUNT
;
260 for (vector
<Layer
>::const_iterator layer
= layers
.begin();
261 layer
!= layers
.end();
263 if (chunk_count
!= layer
->chunks_map
.length()) {
264 *ss
<< "the first element of the array at position "
265 << position
<< " (starting from zero) "
266 << " is the string '" << layer
->chunks_map
267 << " found in the layers parameter "
268 << description_string
<< ". It is expected to be "
269 << chunk_count
<< " characters long but is "
270 << layer
->chunks_map
.length() << " characters long instead "
272 return ERROR_LRC_MAPPING_SIZE
;
278 int ErasureCodeLrc::parse(ErasureCodeProfile
&profile
,
281 int r
= ErasureCode::parse(profile
, ss
);
285 return parse_rule(profile
, ss
);
288 const string
ErasureCodeLrc::DEFAULT_KML("-1");
290 int ErasureCodeLrc::parse_kml(ErasureCodeProfile
&profile
,
293 int err
= ErasureCode::parse(profile
, ss
);
294 const int DEFAULT_INT
= -1;
296 err
|= to_int("k", profile
, &k
, DEFAULT_KML
, ss
);
297 err
|= to_int("m", profile
, &m
, DEFAULT_KML
, ss
);
298 err
|= to_int("l", profile
, &l
, DEFAULT_KML
, ss
);
300 if (k
== DEFAULT_INT
&& m
== DEFAULT_INT
&& l
== DEFAULT_INT
)
303 if ((k
!= DEFAULT_INT
|| m
!= DEFAULT_INT
|| l
!= DEFAULT_INT
) &&
304 (k
== DEFAULT_INT
|| m
== DEFAULT_INT
|| l
== DEFAULT_INT
)) {
305 *ss
<< "All of k, m, l must be set or none of them in "
306 << profile
<< std::endl
;
307 return ERROR_LRC_ALL_OR_NOTHING
;
310 const char *generated
[] = { "mapping",
314 for (int i
= 0; i
< 3; i
++) {
315 if (profile
.count(generated
[i
])) {
316 *ss
<< "The " << generated
[i
] << " parameter cannot be set "
317 << "when k, m, l are set in " << profile
<< std::endl
;
318 return ERROR_LRC_GENERATED
;
322 if (l
== 0 || (k
+ m
) % l
) {
323 *ss
<< "k + m must be a multiple of l in "
324 << profile
<< std::endl
;
325 return ERROR_LRC_K_M_MODULO
;
328 int local_group_count
= (k
+ m
) / l
;
330 if (k
% local_group_count
) {
331 *ss
<< "k must be a multiple of (k + m) / l in "
332 << profile
<< std::endl
;
333 return ERROR_LRC_K_MODULO
;
336 if (m
% local_group_count
) {
337 *ss
<< "m must be a multiple of (k + m) / l in "
338 << profile
<< std::endl
;
339 return ERROR_LRC_M_MODULO
;
343 for (int i
= 0; i
< local_group_count
; i
++) {
344 mapping
+= string(k
/ local_group_count
, 'D') +
345 string(m
/ local_group_count
, '_') + "_";
347 profile
["mapping"] = mapping
;
349 string layers
= "[ ";
353 for (int i
= 0; i
< local_group_count
; i
++) {
354 layers
+= string(k
/ local_group_count
, 'D') +
355 string(m
/ local_group_count
, 'c') + "_";
357 layers
+= "\", \"\" ],";
360 for (int i
= 0; i
< local_group_count
; i
++) {
362 for (int j
= 0; j
< local_group_count
; j
++) {
364 layers
+= string(l
, 'D') + "c";
366 layers
+= string(l
+ 1, '_');
368 layers
+= "\", \"\" ],";
370 profile
["layers"] = layers
+ "]";
372 ErasureCodeProfile::const_iterator parameter
;
373 string rule_locality
;
374 parameter
= profile
.find("crush-locality");
375 if (parameter
!= profile
.end())
376 rule_locality
= parameter
->second
;
377 string rule_failure_domain
= "host";
378 parameter
= profile
.find("crush-failure-domain");
379 if (parameter
!= profile
.end())
380 rule_failure_domain
= parameter
->second
;
382 if (rule_locality
!= "") {
384 rule_steps
.push_back(Step("choose", rule_locality
,
386 rule_steps
.push_back(Step("chooseleaf", rule_failure_domain
,
388 } else if (rule_failure_domain
!= "") {
390 rule_steps
.push_back(Step("chooseleaf", rule_failure_domain
, 0));
396 int ErasureCodeLrc::parse_rule(ErasureCodeProfile
&profile
,
400 err
|= to_string("crush-root", profile
,
403 err
|= to_string("crush-device-class", profile
,
409 if (profile
.count("crush-steps") != 0) {
411 string str
= profile
.find("crush-steps")->second
;
412 json_spirit::mArray description
;
414 json_spirit::mValue json
;
415 json_spirit::read_or_throw(str
, json
);
417 if (json
.type() != json_spirit::array_type
) {
418 *ss
<< "crush-steps='" << str
419 << "' must be a JSON array but is of type "
420 << json
.type() << " instead" << std::endl
;
421 return ERROR_LRC_ARRAY
;
423 description
= json
.get_array();
424 } catch (json_spirit::Error_position
&e
) {
425 *ss
<< "failed to parse crush-steps='" << str
<< "'"
426 << " at line " << e
.line_
<< ", column " << e
.column_
427 << " : " << e
.reason_
<< std::endl
;
428 return ERROR_LRC_PARSE_JSON
;
432 for (vector
<json_spirit::mValue
>::iterator i
= description
.begin();
433 i
!= description
.end();
435 if (i
->type() != json_spirit::array_type
) {
436 stringstream json_string
;
437 json_spirit::write(*i
, json_string
);
438 *ss
<< "element of the array "
439 << str
<< " must be a JSON array but "
440 << json_string
.str() << " at position " << position
441 << " is of type " << i
->type() << " instead" << std::endl
;
442 return ERROR_LRC_ARRAY
;
444 int r
= parse_rule_step(str
, i
->get_array(), ss
);
452 int ErasureCodeLrc::parse_rule_step(const string
&description_string
,
453 json_spirit::mArray description
,
456 stringstream json_string
;
457 json_spirit::write(description
, json_string
);
462 for (vector
<json_spirit::mValue
>::iterator i
= description
.begin();
463 i
!= description
.end();
465 if ((position
== 0 || position
== 1) &&
466 i
->type() != json_spirit::str_type
) {
467 *ss
<< "element " << position
<< " of the array "
468 << json_string
.str() << " found in " << description_string
469 << " must be a JSON string but is of type "
470 << i
->type() << " instead" << std::endl
;
471 return position
== 0 ? ERROR_LRC_RULE_OP
: ERROR_LRC_RULE_TYPE
;
473 if (position
== 2 && i
->type() != json_spirit::int_type
) {
474 *ss
<< "element " << position
<< " of the array "
475 << json_string
.str() << " found in " << description_string
476 << " must be a JSON int but is of type "
477 << i
->type() << " instead" << std::endl
;
478 return ERROR_LRC_RULE_N
;
488 rule_steps
.push_back(Step(op
, type
, n
));
492 int ErasureCodeLrc::init(ErasureCodeProfile
&profile
,
497 r
= parse_kml(profile
, ss
);
501 r
= parse(profile
, ss
);
505 json_spirit::mArray description
;
506 r
= layers_description(profile
, &description
, ss
);
510 string description_string
= profile
.find("layers")->second
;
512 dout(10) << "init(" << description_string
<< ")" << dendl
;
514 r
= layers_parse(description_string
, description
, ss
);
522 if (profile
.count("mapping") == 0) {
523 *ss
<< "the 'mapping' profile is missing from " << profile
;
524 return ERROR_LRC_MAPPING
;
526 string mapping
= profile
.find("mapping")->second
;
527 data_chunk_count
= count(begin(mapping
), end(mapping
), 'D');
528 chunk_count
= mapping
.length();
530 r
= layers_sanity_checks(description_string
, ss
);
535 // When initialized with kml, the profile parameters
536 // that were generated should not be stored because
537 // they would otherwise be exposed to the caller.
539 if (profile
.find("l") != profile
.end() &&
540 profile
.find("l")->second
!= DEFAULT_KML
) {
541 profile
.erase("mapping");
542 profile
.erase("layers");
544 ErasureCode::init(profile
, ss
);
548 set
<int> ErasureCodeLrc::get_erasures(const set
<int> &want
,
549 const set
<int> &available
) const
552 set_difference(want
.begin(), want
.end(),
553 available
.begin(), available
.end(),
554 inserter(result
, result
.end()));
558 unsigned int ErasureCodeLrc::get_chunk_size(unsigned int object_size
) const
560 return layers
.front().erasure_code
->get_chunk_size(object_size
);
563 void p(const set
<int> &s
) { cerr
<< s
; } // for gdb
565 int ErasureCodeLrc::_minimum_to_decode(const set
<int> &want_to_read
,
566 const set
<int> &available_chunks
,
569 dout(20) << __func__
<< " want_to_read " << want_to_read
570 << " available_chunks " << available_chunks
<< dendl
;
572 set
<int> erasures_total
;
573 set
<int> erasures_not_recovered
;
574 set
<int> erasures_want
;
575 for (unsigned int i
= 0; i
< get_chunk_count(); ++i
) {
576 if (available_chunks
.count(i
) == 0) {
577 erasures_total
.insert(i
);
578 erasures_not_recovered
.insert(i
);
579 if (want_to_read
.count(i
) != 0)
580 erasures_want
.insert(i
);
587 // When no chunk is missing there is no need to read more than what
590 if (erasures_want
.empty()) {
591 *minimum
= want_to_read
;
592 dout(20) << __func__
<< " minimum == want_to_read == "
593 << want_to_read
<< dendl
;
600 // Try to recover erasures with as few chunks as possible.
602 for (vector
<Layer
>::reverse_iterator i
= layers
.rbegin();
606 // If this layer has no chunk that we want, skip it.
609 set_intersection(want_to_read
.begin(), want_to_read
.end(),
610 i
->chunks_as_set
.begin(), i
->chunks_as_set
.end(),
611 inserter(layer_want
, layer_want
.end()));
612 if (layer_want
.empty())
615 // Are some of the chunks we want missing ?
617 set
<int> layer_erasures
;
618 set_intersection(layer_want
.begin(), layer_want
.end(),
619 erasures_want
.begin(), erasures_want
.end(),
620 inserter(layer_erasures
, layer_erasures
.end()));
621 set
<int> layer_minimum
;
622 if (layer_erasures
.empty()) {
624 // The chunks we want are available, this is the minimum we need
627 layer_minimum
= layer_want
;
630 set_intersection(i
->chunks_as_set
.begin(), i
->chunks_as_set
.end(),
631 erasures_not_recovered
.begin(), erasures_not_recovered
.end(),
632 inserter(erasures
, erasures
.end()));
634 if (erasures
.size() > i
->erasure_code
->get_coding_chunk_count()) {
636 // There are too many erasures for this layer to recover: skip
637 // it and hope that an upper layer will be do better.
642 // Get all available chunks in that layer to recover the
645 set_difference(i
->chunks_as_set
.begin(), i
->chunks_as_set
.end(),
646 erasures_not_recovered
.begin(), erasures_not_recovered
.end(),
647 inserter(layer_minimum
, layer_minimum
.end()));
649 // Chunks recovered by this layer are removed from the list of
650 // erasures so that upper levels do not attempt to recover
653 for (set
<int>::const_iterator j
= erasures
.begin();
656 erasures_not_recovered
.erase(*j
);
657 erasures_want
.erase(*j
);
661 minimum
->insert(layer_minimum
.begin(), layer_minimum
.end());
663 if (erasures_want
.empty()) {
664 minimum
->insert(want_to_read
.begin(), want_to_read
.end());
665 for (set
<int>::const_iterator i
= erasures_total
.begin();
666 i
!= erasures_total
.end();
668 if (minimum
->count(*i
))
671 dout(20) << __func__
<< " minimum = " << *minimum
<< dendl
;
680 // The previous strategy failed to recover from all erasures.
682 // Try to recover as many chunks as possible, even from layers
683 // that do not contain chunks that we want, in the hope that it
684 // will help the upper layers.
686 set
<int> erasures_total
;
687 for (unsigned int i
= 0; i
< get_chunk_count(); ++i
) {
688 if (available_chunks
.count(i
) == 0)
689 erasures_total
.insert(i
);
692 for (vector
<Layer
>::reverse_iterator i
= layers
.rbegin();
695 set
<int> layer_erasures
;
696 set_intersection(i
->chunks_as_set
.begin(), i
->chunks_as_set
.end(),
697 erasures_total
.begin(), erasures_total
.end(),
698 inserter(layer_erasures
, layer_erasures
.end()));
700 // If this layer has no erasure, skip it
702 if (layer_erasures
.empty())
705 if (layer_erasures
.size() > 0 &&
706 layer_erasures
.size() <= i
->erasure_code
->get_coding_chunk_count()) {
708 // chunks recovered by this layer are removed from the list of
709 // erasures so that upper levels know they can rely on their
712 for (set
<int>::const_iterator j
= layer_erasures
.begin();
713 j
!= layer_erasures
.end();
715 erasures_total
.erase(*j
);
719 if (erasures_total
.empty()) {
721 // Do not try to be smart about what chunks are necessary to
722 // recover, use all available chunks.
724 *minimum
= available_chunks
;
725 dout(20) << __func__
<< " minimum == available_chunks == "
726 << available_chunks
<< dendl
;
731 derr
<< __func__
<< " not enough chunks in " << available_chunks
732 << " to read " << want_to_read
<< dendl
;
736 int ErasureCodeLrc::encode_chunks(const set
<int> &want_to_encode
,
737 map
<int, bufferlist
> *encoded
)
739 unsigned int top
= layers
.size();
740 for (vector
<Layer
>::reverse_iterator i
= layers
.rbegin();
744 if (includes(i
->chunks_as_set
.begin(), i
->chunks_as_set
.end(),
745 want_to_encode
.begin(), want_to_encode
.end()))
749 for (unsigned int i
= top
; i
< layers
.size(); ++i
) {
750 const Layer
&layer
= layers
[i
];
751 set
<int> layer_want_to_encode
;
752 map
<int, bufferlist
> layer_encoded
;
754 for (const auto& c
: layer
.chunks
) {
755 std::swap(layer_encoded
[j
], (*encoded
)[c
]);
756 if (want_to_encode
.find(c
) != want_to_encode
.end())
757 layer_want_to_encode
.insert(j
);
760 int err
= layer
.erasure_code
->encode_chunks(layer_want_to_encode
,
763 for (const auto& c
: layer
.chunks
) {
764 std::swap(layer_encoded
[j
++], (*encoded
)[c
]);
767 derr
<< __func__
<< " layer " << layer
.chunks_map
768 << " failed with " << err
<< " trying to encode "
769 << layer_want_to_encode
<< dendl
;
776 int ErasureCodeLrc::decode_chunks(const set
<int> &want_to_read
,
777 const map
<int, bufferlist
> &chunks
,
778 map
<int, bufferlist
> *decoded
)
780 set
<int> available_chunks
;
782 for (unsigned int i
= 0; i
< get_chunk_count(); ++i
) {
783 if (chunks
.count(i
) != 0)
784 available_chunks
.insert(i
);
789 set
<int> want_to_read_erasures
;
791 for (vector
<Layer
>::reverse_iterator layer
= layers
.rbegin();
792 layer
!= layers
.rend();
794 set
<int> layer_erasures
;
795 set_intersection(layer
->chunks_as_set
.begin(), layer
->chunks_as_set
.end(),
796 erasures
.begin(), erasures
.end(),
797 inserter(layer_erasures
, layer_erasures
.end()));
799 if (layer_erasures
.size() >
800 layer
->erasure_code
->get_coding_chunk_count()) {
801 // skip because there are too many erasures for this layer to recover
802 } else if(layer_erasures
.size() == 0) {
803 // skip because all chunks are already available
805 set
<int> layer_want_to_read
;
806 map
<int, bufferlist
> layer_chunks
;
807 map
<int, bufferlist
> layer_decoded
;
809 for (vector
<int>::const_iterator c
= layer
->chunks
.begin();
810 c
!= layer
->chunks
.end();
813 // Pick chunks from *decoded* instead of *chunks* to re-use
814 // chunks recovered by previous layers. In other words
815 // *chunks* does not change but *decoded* gradually improves
816 // as more layers recover from erasures.
818 if (erasures
.count(*c
) == 0)
819 layer_chunks
[j
] = (*decoded
)[*c
];
820 if (want_to_read
.count(*c
) != 0)
821 layer_want_to_read
.insert(j
);
822 layer_decoded
[j
] = (*decoded
)[*c
];
825 int err
= layer
->erasure_code
->decode_chunks(layer_want_to_read
,
829 derr
<< __func__
<< " layer " << layer
->chunks_map
830 << " failed with " << err
<< " trying to decode "
831 << layer_want_to_read
<< " with " << available_chunks
<< dendl
;
835 for (vector
<int>::const_iterator c
= layer
->chunks
.begin();
836 c
!= layer
->chunks
.end();
838 (*decoded
)[*c
] = layer_decoded
[j
];
842 want_to_read_erasures
.clear();
843 set_intersection(erasures
.begin(), erasures
.end(),
844 want_to_read
.begin(), want_to_read
.end(),
845 inserter(want_to_read_erasures
, want_to_read_erasures
.end()));
846 if (want_to_read_erasures
.size() == 0)
851 if (want_to_read_erasures
.size() > 0) {
852 derr
<< __func__
<< " want to read " << want_to_read
853 << " with available_chunks = " << available_chunks
854 << " end up being unable to read " << want_to_read_erasures
<< dendl
;