namespace {
-seastar::logger& logger() {
- return crimson::get_logger(ceph_subsys_seastore_tm);
+seastar::logger& journal_logger() {
+ return crimson::get_logger(ceph_subsys_seastore_journal);
}
}
namespace crimson::os::seastore {
+bool is_aligned(uint64_t offset, uint64_t alignment)
+{
+ return (offset % alignment) == 0;
+}
+
+std::ostream& operator<<(std::ostream &out, const omap_root_t &root)
+{
+ return out << "omap_root{addr=" << root.addr
+ << ", depth=" << root.depth
+ << ", hint=" << root.hint
+ << ", mutated=" << root.mutated
+ << "}";
+}
+
std::ostream& operator<<(std::ostream& out, const seastore_meta_t& meta)
{
return out << meta.seastore_id;
}
-std::ostream &segment_to_stream(std::ostream &out, const segment_id_t &t)
+std::ostream &operator<<(std::ostream &out, const device_id_printer_t &id)
{
- if (t == NULL_SEG_ID)
- return out << "NULL_SEG";
- else if (t == FAKE_SEG_ID)
- return out << "FAKE_SEG";
- else
- return out << t;
+ auto _id = id.id;
+ if (_id == DEVICE_ID_NULL) {
+ return out << "Dev(NULL)";
+ } else if (_id == DEVICE_ID_RECORD_RELATIVE) {
+ return out << "Dev(RR)";
+ } else if (_id == DEVICE_ID_BLOCK_RELATIVE) {
+ return out << "Dev(BR)";
+ } else if (_id == DEVICE_ID_DELAYED) {
+ return out << "Dev(DELAYED)";
+ } else if (_id == DEVICE_ID_FAKE) {
+ return out << "Dev(FAKE)";
+ } else if (_id == DEVICE_ID_ZERO) {
+ return out << "Dev(ZERO)";
+ } else if (_id == DEVICE_ID_ROOT) {
+ return out << "Dev(ROOT)";
+ } else {
+ return out << "Dev(" << (unsigned)_id << ")";
+ }
+}
+
+std::ostream &operator<<(std::ostream &out, const segment_id_t &segment)
+{
+ if (segment == NULL_SEG_ID) {
+ return out << "Seg[NULL]";
+ } else {
+ return out << "Seg[" << device_id_printer_t{segment.device_id()}
+ << "," << segment.device_segment_id()
+ << "]";
+ }
}
-std::ostream &offset_to_stream(std::ostream &out, const segment_off_t &t)
+std::ostream& operator<<(std::ostream& out, segment_type_t t)
{
- if (t == NULL_SEG_OFF)
- return out << "NULL_OFF";
- else
- return out << t;
+ switch(t) {
+ case segment_type_t::JOURNAL:
+ return out << "JOURNAL";
+ case segment_type_t::OOL:
+ return out << "OOL";
+ case segment_type_t::NULL_SEG:
+ return out << "NULL_SEG";
+ default:
+ return out << "INVALID_SEGMENT_TYPE!";
+ }
}
-std::ostream &operator<<(std::ostream &out, const segment_id_t& segment)
+std::ostream& operator<<(std::ostream& out, segment_seq_printer_t seq)
{
- return out << "[" << (uint64_t)segment.device_id() << ","
- << segment.device_segment_id() << "]";
+ if (seq.seq == NULL_SEG_SEQ) {
+ return out << "sseq(NULL)";
+ } else {
+ return out << "sseq(" << seq.seq << ")";
+ }
}
std::ostream &operator<<(std::ostream &out, const paddr_t &rhs)
{
- out << "paddr_t<";
+ auto id = rhs.get_device_id();
+ out << "paddr<";
if (rhs == P_ADDR_NULL) {
- out << "NULL_PADDR";
+ out << "NULL";
} else if (rhs == P_ADDR_MIN) {
- out << "MIN_PADDR";
- } else if (rhs.is_block_relative()) {
- out << "BLOCK_REG";
- } else if (rhs.is_record_relative()) {
- out << "RECORD_REG";
- } else if (rhs.get_device_id() == DEVICE_ID_DELAYED) {
- out << "DELAYED_TEMP";
- } else if (rhs.get_addr_type() == addr_types_t::SEGMENT) {
- const seg_paddr_t& s = rhs.as_seg_paddr();
- segment_to_stream(out, s.get_segment_id());
- out << ", ";
- offset_to_stream(out, s.get_segment_off());
+ out << "MIN";
+ } else if (rhs == P_ADDR_ZERO) {
+ out << "ZERO";
+ } else if (has_device_off(id)) {
+ auto &s = rhs.as_res_paddr();
+ out << device_id_printer_t{id}
+ << ","
+ << s.get_device_off();
+ } else if (rhs.get_addr_type() == paddr_types_t::SEGMENT) {
+ auto &s = rhs.as_seg_paddr();
+ out << s.get_segment_id()
+ << ","
+ << s.get_segment_off();
+ } else if (rhs.get_addr_type() == paddr_types_t::RANDOM_BLOCK) {
+ auto &s = rhs.as_blk_paddr();
+ out << device_id_printer_t{s.get_device_id()}
+ << ","
+ << s.get_device_off();
} else {
- out << "INVALID";
+ out << "INVALID!";
}
return out << ">";
}
+journal_seq_t journal_seq_t::add_offset(
+ journal_type_t type,
+ device_off_t off,
+ device_off_t roll_start,
+ device_off_t roll_size) const
+{
+ assert(offset.is_absolute());
+ assert(off <= DEVICE_OFF_MAX && off >= DEVICE_OFF_MIN);
+ assert(roll_start >= 0);
+ assert(roll_size > 0);
+
+ segment_seq_t jseq = segment_seq;
+ device_off_t joff;
+ if (type == journal_type_t::SEGMENTED) {
+ joff = offset.as_seg_paddr().get_segment_off();
+ } else {
+ assert(type == journal_type_t::RANDOM_BLOCK);
+ auto boff = offset.as_blk_paddr().get_device_off();
+ joff = boff;
+ }
+ auto roll_end = roll_start + roll_size;
+ assert(joff >= roll_start);
+ assert(joff <= roll_end);
+
+ if (off >= 0) {
+ device_off_t new_jseq = jseq + (off / roll_size);
+ joff += (off % roll_size);
+ if (joff >= roll_end) {
+ ++new_jseq;
+ joff -= roll_size;
+ }
+ assert(new_jseq < MAX_SEG_SEQ);
+ jseq = static_cast<segment_seq_t>(new_jseq);
+ } else {
+ device_off_t mod = (-off) / roll_size;
+ joff -= ((-off) % roll_size);
+ if (joff < roll_start) {
+ ++mod;
+ joff += roll_size;
+ }
+ if (jseq >= mod) {
+ jseq -= mod;
+ } else {
+ return JOURNAL_SEQ_MIN;
+ }
+ }
+ assert(joff >= roll_start);
+ assert(joff < roll_end);
+ return journal_seq_t{jseq, make_block_relative_paddr(joff)};
+}
+
+device_off_t journal_seq_t::relative_to(
+ journal_type_t type,
+ const journal_seq_t& r,
+ device_off_t roll_start,
+ device_off_t roll_size) const
+{
+ assert(offset.is_absolute());
+ assert(r.offset.is_absolute());
+ assert(roll_start >= 0);
+ assert(roll_size > 0);
+
+ device_off_t ret = static_cast<device_off_t>(segment_seq) - r.segment_seq;
+ ret *= roll_size;
+ if (type == journal_type_t::SEGMENTED) {
+ ret += (static_cast<device_off_t>(offset.as_seg_paddr().get_segment_off()) -
+ static_cast<device_off_t>(r.offset.as_seg_paddr().get_segment_off()));
+ } else {
+ assert(type == journal_type_t::RANDOM_BLOCK);
+ ret += offset.as_blk_paddr().get_device_off() -
+ r.offset.as_blk_paddr().get_device_off();
+ }
+ assert(ret <= DEVICE_OFF_MAX && ret >= DEVICE_OFF_MIN);
+ return ret;
+}
+
std::ostream &operator<<(std::ostream &out, const journal_seq_t &seq)
{
- return out << "journal_seq_t(segment_seq="
- << seq.segment_seq << ", offset="
- << seq.offset
- << ")";
+ if (seq == JOURNAL_SEQ_NULL) {
+ return out << "JOURNAL_SEQ_NULL";
+ } else if (seq == JOURNAL_SEQ_MIN) {
+ return out << "JOURNAL_SEQ_MIN";
+ } else {
+ return out << "jseq("
+ << segment_seq_printer_t{seq.segment_seq}
+ << ", " << seq.offset
+ << ")";
+ }
}
std::ostream &operator<<(std::ostream &out, extent_types_t t)
return out << "LADDR_INTERNAL";
case extent_types_t::LADDR_LEAF:
return out << "LADDR_LEAF";
+ case extent_types_t::DINK_LADDR_LEAF:
+ return out << "LADDR_LEAF";
case extent_types_t::ONODE_BLOCK_STAGED:
return out << "ONODE_BLOCK_STAGED";
case extent_types_t::OMAP_INNER:
return out << "TEST_BLOCK";
case extent_types_t::TEST_BLOCK_PHYSICAL:
return out << "TEST_BLOCK_PHYSICAL";
+ case extent_types_t::BACKREF_INTERNAL:
+ return out << "BACKREF_INTERNAL";
+ case extent_types_t::BACKREF_LEAF:
+ return out << "BACKREF_LEAF";
case extent_types_t::NONE:
return out << "NONE";
default:
}
}
+std::ostream &operator<<(std::ostream &out, rewrite_gen_printer_t gen)
+{
+ if (gen.gen == NULL_GENERATION) {
+ return out << "GEN_NULL";
+ } else if (gen.gen == INIT_GENERATION) {
+ return out << "GEN_INIT";
+ } else if (gen.gen == INLINE_GENERATION) {
+ return out << "GEN_INL";
+ } else if (gen.gen == OOL_GENERATION) {
+ return out << "GEN_OOL";
+ } else if (gen.gen > REWRITE_GENERATIONS) {
+ return out << "GEN_INVALID(" << (unsigned)gen.gen << ")!";
+ } else {
+ return out << "GEN(" << (unsigned)gen.gen << ")";
+ }
+}
+
+std::ostream &operator<<(std::ostream &out, data_category_t c)
+{
+ switch (c) {
+ case data_category_t::METADATA:
+ return out << "MD";
+ case data_category_t::DATA:
+ return out << "DATA";
+ default:
+ return out << "INVALID_CATEGORY!";
+ }
+}
+
+std::ostream &operator<<(std::ostream &out, sea_time_point_printer_t tp)
+{
+ if (tp.tp == NULL_TIME) {
+ return out << "tp(NULL)";
+ }
+ auto time = seastar::lowres_system_clock::to_time_t(tp.tp);
+ char buf[32];
+ std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", std::localtime(&time));
+ return out << "tp(" << buf << ")";
+}
+
+std::ostream &operator<<(std::ostream &out, mod_time_point_printer_t tp) {
+ auto time = mod_to_timepoint(tp.tp);
+ return out << "mod_" << sea_time_point_printer_t{time};
+}
+
std::ostream &operator<<(std::ostream &out, const laddr_list_t &rhs)
{
bool first = false;
return out << ']';
}
-std::ostream &operator<<(std::ostream &lhs, const delta_info_t &rhs)
+std::ostream &operator<<(std::ostream &out, const delta_info_t &delta)
{
- return lhs << "delta_info_t("
- << "type: " << rhs.type
- << ", paddr: " << rhs.paddr
- << ", laddr: " << rhs.laddr
- << ", prev_crc: " << rhs.prev_crc
- << ", final_crc: " << rhs.final_crc
- << ", length: " << rhs.length
- << ", pversion: " << rhs.pversion
+ return out << "delta_info_t("
+ << "type: " << delta.type
+ << ", paddr: " << delta.paddr
+ << ", laddr: " << delta.laddr
+ << ", prev_crc: " << delta.prev_crc
+ << ", final_crc: " << delta.final_crc
+ << ", length: " << delta.length
+ << ", pversion: " << delta.pversion
+ << ", ext_seq: " << delta.ext_seq
+ << ", seg_type: " << delta.seg_type
<< ")";
}
+std::ostream &operator<<(std::ostream &out, const journal_tail_delta_t &delta)
+{
+ return out << "journal_tail_delta_t("
+ << "alloc_tail=" << delta.alloc_tail
+ << ", dirty_tail=" << delta.dirty_tail
+ << ")";
+}
+
std::ostream &operator<<(std::ostream &out, const extent_info_t &info)
{
return out << "extent_info_t("
std::ostream &operator<<(std::ostream &out, const segment_header_t &header)
{
return out << "segment_header_t("
- << "segment_seq=" << header.journal_segment_seq
- << ", physical_segment_id=" << header.physical_segment_id
- << ", journal_tail=" << header.journal_tail
- << ", segment_nonce=" << header.segment_nonce
- << ", out-of-line=" << header.out_of_line
- << ")";
+ << header.physical_segment_id
+ << " " << header.type
+ << " " << segment_seq_printer_t{header.segment_seq}
+ << " " << header.category
+ << " " << rewrite_gen_printer_t{header.generation}
+ << ", dirty_tail=" << header.dirty_tail
+ << ", alloc_tail=" << header.alloc_tail
+ << ", segment_nonce=" << header.segment_nonce
+ << ")";
+}
+
+std::ostream &operator<<(std::ostream &out, const segment_tail_t &tail)
+{
+ return out << "segment_tail_t("
+ << tail.physical_segment_id
+ << " " << tail.type
+ << " " << segment_seq_printer_t{tail.segment_seq}
+ << ", segment_nonce=" << tail.segment_nonce
+ << ", modify_time=" << mod_time_point_printer_t{tail.modify_time}
+ << ", num_extents=" << tail.num_extents
+ << ")";
}
extent_len_t record_size_t::get_raw_mdlength() const
plain_mdlength += ceph::encoded_sizeof(delta);
}
+std::ostream &operator<<(std::ostream &os, transaction_type_t type)
+{
+ switch (type) {
+ case transaction_type_t::MUTATE:
+ return os << "MUTATE";
+ case transaction_type_t::READ:
+ return os << "READ";
+ case transaction_type_t::TRIM_DIRTY:
+ return os << "TRIM_DIRTY";
+ case transaction_type_t::TRIM_ALLOC:
+ return os << "TRIM_ALLOC";
+ case transaction_type_t::CLEANER_MAIN:
+ return os << "CLEANER_MAIN";
+ case transaction_type_t::CLEANER_COLD:
+ return os << "CLEANER_COLD";
+ case transaction_type_t::MAX:
+ return os << "TRANS_TYPE_NULL";
+ default:
+ return os << "INVALID_TRANS_TYPE("
+ << static_cast<std::size_t>(type)
+ << ")";
+ }
+}
+
+std::ostream &operator<<(std::ostream& out, const record_size_t& rsize)
+{
+ return out << "record_size_t("
+ << "raw_md=" << rsize.get_raw_mdlength()
+ << ", data=" << rsize.dlength
+ << ")";
+}
+
+std::ostream &operator<<(std::ostream& out, const record_t& r)
+{
+ return out << "record_t("
+ << "type=" << r.type
+ << ", num_extents=" << r.extents.size()
+ << ", num_deltas=" << r.deltas.size()
+ << ", modify_time=" << sea_time_point_printer_t{r.modify_time}
+ << ")";
+}
+
+std::ostream &operator<<(std::ostream& out, const record_header_t& r)
+{
+ return out << "record_header_t("
+ << "type=" << r.type
+ << ", num_extents=" << r.extents
+ << ", num_deltas=" << r.deltas
+ << ", modify_time=" << mod_time_point_printer_t{r.modify_time}
+ << ")";
+}
+
+std::ostream& operator<<(std::ostream& out, const record_group_header_t& h)
+{
+ return out << "record_group_header_t("
+ << "num_records=" << h.records
+ << ", mdlength=" << h.mdlength
+ << ", dlength=" << h.dlength
+ << ", nonce=" << h.segment_nonce
+ << ", committed_to=" << h.committed_to
+ << ", data_crc=" << h.data_crc
+ << ")";
+}
+
extent_len_t record_group_size_t::get_raw_mdlength() const
{
return plain_mdlength +
block_size = _block_size;
}
+std::ostream& operator<<(std::ostream& out, const record_group_size_t& size)
+{
+ return out << "record_group_size_t("
+ << "raw_md=" << size.get_raw_mdlength()
+ << ", data=" << size.dlength
+ << ", block_size=" << size.block_size
+ << ", fullness=" << size.get_fullness()
+ << ")";
+}
+
+std::ostream& operator<<(std::ostream& out, const record_group_t& rg)
+{
+ return out << "record_group_t("
+ << "num_records=" << rg.records.size()
+ << ", " << rg.size
+ << ")";
+}
+
ceph::bufferlist encode_record(
record_t&& record,
extent_len_t block_size,
for (auto& r: record_group.records) {
record_header_t rheader{
+ r.type,
(extent_len_t)r.deltas.size(),
(extent_len_t)r.extents.size(),
+ timepoint_to_mod(r.modify_time)
};
encode(rheader, bl);
}
try {
decode(header, bp);
} catch (ceph::buffer::error &e) {
- logger().debug(
+ journal_logger().debug(
"try_decode_records_header: failed, "
"cannot decode record_group_header_t, got {}.",
- e);
+ e.what());
return std::nullopt;
}
if (header.segment_nonce != expected_nonce) {
- logger().debug(
+ journal_logger().debug(
"try_decode_records_header: failed, record_group_header nonce mismatch, "
"read {}, expected {}!",
header.segment_nonce,
test_crc);
bool success = (test_crc == recorded_crc);
if (!success) {
- logger().debug("validate_records_metadata: failed, metadata crc mismatch.");
+ journal_logger().debug(
+ "validate_records_metadata: failed, metadata crc mismatch.");
}
return success;
}
{
bool success = (data_bl.crc32c(-1) == header.data_crc);
if (!success) {
- logger().debug("validate_records_data: failed, data crc mismatch!");
+ journal_logger().debug(
+ "validate_records_data: failed, data crc mismatch!");
}
return success;
}
-namespace {
-
std::optional<std::vector<record_header_t>>
try_decode_record_headers(
const record_group_header_t& header,
try {
decode(i, bliter);
} catch (ceph::buffer::error &e) {
- logger().debug(
+ journal_logger().debug(
"try_decode_record_headers: failed, "
"cannot decode record_header_t, got {}.",
- e);
+ e.what());
return std::nullopt;
}
}
return record_headers;
}
-}
-
std::optional<std::vector<record_extent_infos_t> >
try_decode_extent_infos(
const record_group_header_t& header,
{
auto maybe_headers = try_decode_record_headers(header, md_bl);
if (!maybe_headers) {
- logger().debug(
- "try_decode_extent_infos: failed, cannot decode record headers.");
return std::nullopt;
}
try {
decode(i, bliter);
} catch (ceph::buffer::error &e) {
- logger().debug(
+ journal_logger().debug(
"try_decode_extent_infos: failed, "
"cannot decode extent_info_t, got {}.",
- e);
+ e.what());
return std::nullopt;
}
}
{
auto maybe_record_extent_infos = try_decode_extent_infos(header, md_bl);
if (!maybe_record_extent_infos) {
- logger().debug(
- "try_decode_deltas: failed, cannot decode extent_infos.");
return std::nullopt;
}
result_iter->deltas.resize(r.header.deltas);
for (auto& i: result_iter->deltas) {
try {
- decode(i, bliter);
+ decode(i.second, bliter);
+ i.first = mod_to_timepoint(r.header.modify_time);
} catch (ceph::buffer::error &e) {
- logger().debug(
+ journal_logger().debug(
"try_decode_deltas: failed, "
"cannot decode delta_info_t, got {}.",
- e);
+ e.what());
return std::nullopt;
}
}
for (auto& i: r.extent_infos) {
- auto& seg_addr = record_block_base.as_seg_paddr();
- seg_addr.set_segment_off(seg_addr.get_segment_off() + i.len);
+ record_block_base = record_block_base.add_offset(i.len);
}
++result_iter;
}
return record_deltas;
}
+std::ostream& operator<<(std::ostream& out, placement_hint_t h)
+{
+ switch (h) {
+ case placement_hint_t::HOT:
+ return out << "Hint(HOT)";
+ case placement_hint_t::COLD:
+ return out << "Hint(COLD)";
+ case placement_hint_t::REWRITE:
+ return out << "Hint(REWRITE)";
+ case PLACEMENT_HINT_NULL:
+ return out << "Hint(NULL)";
+ default:
+ return out << "INVALID_PLACEMENT_HINT_TYPE!";
+ }
+}
+
bool can_delay_allocation(device_type_t type) {
// Some types of device may not support delayed allocation, for example PMEM.
- return type <= device_type_t::RANDOM_BLOCK;
+ // All types of device currently support delayed allocation.
+ return true;
}
device_type_t string_to_device_type(std::string type) {
- if (type == "segmented") {
- return device_type_t::SEGMENTED;
+ if (type == "HDD") {
+ return device_type_t::HDD;
}
- if (type == "random_block") {
- return device_type_t::RANDOM_BLOCK;
+ if (type == "SSD") {
+ return device_type_t::SSD;
}
- if (type == "pmem") {
- return device_type_t::PMEM;
+ if (type == "ZNS") {
+ return device_type_t::ZNS;
+ }
+ if (type == "RANDOM_BLOCK_SSD") {
+ return device_type_t::RANDOM_BLOCK_SSD;
}
return device_type_t::NONE;
}
switch (t) {
case device_type_t::NONE:
return out << "NONE";
- case device_type_t::SEGMENTED:
- return out << "SEGMENTED";
- case device_type_t::RANDOM_BLOCK:
- return out << "RANDOM_BLOCK";
- case device_type_t::PMEM:
- return out << "PMEM";
+ case device_type_t::HDD:
+ return out << "HDD";
+ case device_type_t::SSD:
+ return out << "SSD";
+ case device_type_t::ZNS:
+ return out << "ZNS";
+ case device_type_t::EPHEMERAL_COLD:
+ return out << "EPHEMERAL_COLD";
+ case device_type_t::EPHEMERAL_MAIN:
+ return out << "EPHEMERAL_MAIN";
+ case device_type_t::RANDOM_BLOCK_SSD:
+ return out << "RANDOM_BLOCK_SSD";
+ case device_type_t::RANDOM_BLOCK_EPHEMERAL:
+ return out << "RANDOM_BLOCK_EPHEMERAL";
default:
return out << "INVALID_DEVICE_TYPE!";
}
}
-paddr_t convert_blk_paddr_to_paddr(blk_paddr_t addr, size_t block_size,
- uint32_t blocks_per_segment, device_id_t d_id)
+std::ostream& operator<<(std::ostream& out, backend_type_t btype) {
+ if (btype == backend_type_t::SEGMENTED) {
+ return out << "SEGMENTED";
+ } else {
+ return out << "RANDOM_BLOCK";
+ }
+}
+
+std::ostream& operator<<(std::ostream& out, const write_result_t& w)
{
- segment_id_t id = segment_id_t {
- d_id,
- (device_segment_id_t)(addr / (block_size * blocks_per_segment))
- };
- segment_off_t off = addr % (block_size * blocks_per_segment);
- return paddr_t::make_seg_paddr(id, off);
+ return out << "write_result_t("
+ << "start=" << w.start_seq
+ << ", length=" << w.length
+ << ")";
}
-blk_paddr_t convert_paddr_to_blk_paddr(paddr_t addr, size_t block_size,
- uint32_t blocks_per_segment)
+std::ostream& operator<<(std::ostream& out, const record_locator_t& l)
{
- seg_paddr_t& s = addr.as_seg_paddr();
- return (blk_paddr_t)(s.get_segment_id().device_segment_id() *
- (block_size * blocks_per_segment) + s.get_segment_off());
+ return out << "record_locator_t("
+ << "block_base=" << l.record_block_base
+ << ", " << l.write_result
+ << ")";
}
+void scan_valid_records_cursor::emplace_record_group(
+ const record_group_header_t& header, ceph::bufferlist&& md_bl)
+{
+ auto new_committed_to = header.committed_to;
+ ceph_assert(last_committed == JOURNAL_SEQ_NULL ||
+ last_committed <= new_committed_to);
+ last_committed = new_committed_to;
+ pending_record_groups.emplace_back(
+ seq.offset,
+ header,
+ std::move(md_bl));
+ increment_seq(header.dlength + header.mdlength);
+ ceph_assert(new_committed_to == JOURNAL_SEQ_NULL ||
+ new_committed_to < seq);
+}
+
+std::ostream& operator<<(std::ostream& out, const scan_valid_records_cursor& c)
+{
+ return out << "cursor(last_valid_header_found=" << c.last_valid_header_found
+ << ", seq=" << c.seq
+ << ", last_committed=" << c.last_committed
+ << ", pending_record_groups=" << c.pending_record_groups.size()
+ << ", num_consumed_records=" << c.num_consumed_records
+ << ")";
+}
}