]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | /* | |
3 | * This file is open source software, licensed to you under the terms | |
4 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
5 | * distributed with this work for additional information regarding copyright | |
6 | * ownership. You may not use this file except in compliance with the License. | |
7 | * | |
8 | * You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | /* | |
20 | * Copyright (C) 2014 Cloudius Systems, Ltd. | |
21 | */ | |
22 | /* | |
23 | * Ceph - scalable distributed file system | |
24 | * | |
25 | * Copyright (C) 2015 XSky <haomai@xsky.com> | |
26 | * | |
27 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
28 | * | |
29 | * This is free software; you can redistribute it and/or | |
30 | * modify it under the terms of the GNU Lesser General Public | |
31 | * License version 2.1, as published by the Free Software | |
32 | * Foundation. See file COPYING. | |
33 | * | |
34 | */ | |
35 | ||
36 | #ifndef CEPH_MSG_PACKET_UTIL_H_ | |
37 | #define CEPH_MSG_PACKET_UTIL_H_ | |
38 | ||
39 | #include <map> | |
40 | #include <iostream> | |
41 | ||
42 | #include "Packet.h" | |
43 | ||
44 | template <typename Offset, typename Tag> | |
45 | class packet_merger { | |
46 | private: | |
47 | static uint64_t& linearizations_ref() { | |
48 | static thread_local uint64_t linearization_count; | |
49 | return linearization_count; | |
50 | } | |
51 | public: | |
52 | std::map<Offset, Packet> map; | |
53 | ||
54 | static uint64_t linearizations() { | |
55 | return linearizations_ref(); | |
56 | } | |
57 | ||
58 | void merge(Offset offset, Packet p) { | |
59 | bool insert = true; | |
60 | auto beg = offset; | |
61 | auto end = beg + p.len(); | |
62 | // Fisrt, try to merge the packet with existing segment | |
63 | for (auto it = map.begin(); it != map.end();) { | |
64 | auto& seg_pkt = it->second; | |
65 | auto seg_beg = it->first; | |
66 | auto seg_end = seg_beg + seg_pkt.len(); | |
67 | // There are 6 cases: | |
68 | if (seg_beg <= beg && end <= seg_end) { | |
69 | // 1) seg_beg beg end seg_end | |
70 | // We already have data in this packet | |
71 | return; | |
72 | } else if (beg <= seg_beg && seg_end <= end) { | |
73 | // 2) beg seg_beg seg_end end | |
74 | // The new segment contains more data than this old segment | |
75 | // Delete the old one, insert the new one | |
76 | it = map.erase(it); | |
77 | insert = true; | |
78 | break; | |
79 | } else if (beg < seg_beg && seg_beg <= end && end <= seg_end) { | |
80 | // 3) beg seg_beg end seg_end | |
81 | // Merge two segments, trim front of old segment | |
82 | auto trim = end - seg_beg; | |
83 | seg_pkt.trim_front(trim); | |
84 | p.append(std::move(seg_pkt)); | |
85 | // Delete the old one, insert the new one | |
86 | it = map.erase(it); | |
87 | insert = true; | |
88 | break; | |
89 | } else if (seg_beg <= beg && beg <= seg_end && seg_end < end) { | |
90 | // 4) seg_beg beg seg_end end | |
91 | // Merge two segments, trim front of new segment | |
92 | auto trim = seg_end - beg; | |
93 | p.trim_front(trim); | |
94 | // Append new data to the old segment, keep the old segment | |
95 | seg_pkt.append(std::move(p)); | |
96 | seg_pkt.linearize(); | |
97 | ++linearizations_ref(); | |
98 | insert = false; | |
99 | break; | |
100 | } else { | |
101 | // 5) beg end < seg_beg seg_end | |
102 | // or | |
103 | // 6) seg_beg seg_end < beg end | |
104 | // Can not merge with this segment, keep looking | |
105 | it++; | |
106 | insert = true; | |
107 | } | |
108 | } | |
109 | ||
110 | if (insert) { | |
111 | p.linearize(); | |
112 | ++linearizations_ref(); | |
113 | map.emplace(beg, std::move(p)); | |
114 | } | |
115 | ||
116 | // Second, merge adjacent segments after this packet has been merged, | |
117 | // becasue this packet might fill a "whole" and make two adjacent | |
118 | // segments mergable | |
119 | for (auto it = map.begin(); it != map.end();) { | |
120 | // The first segment | |
121 | auto& seg_pkt = it->second; | |
122 | auto seg_beg = it->first; | |
123 | auto seg_end = seg_beg + seg_pkt.len(); | |
124 | ||
125 | // The second segment | |
126 | auto it_next = it; | |
127 | it_next++; | |
128 | if (it_next == map.end()) { | |
129 | break; | |
130 | } | |
131 | auto& p = it_next->second; | |
132 | auto beg = it_next->first; | |
133 | auto end = beg + p.len(); | |
134 | ||
135 | // Merge the the second segment into first segment if possible | |
136 | if (seg_beg <= beg && beg <= seg_end && seg_end < end) { | |
137 | // Merge two segments, trim front of second segment | |
138 | auto trim = seg_end - beg; | |
139 | p.trim_front(trim); | |
140 | // Append new data to the first segment, keep the first segment | |
141 | seg_pkt.append(std::move(p)); | |
142 | ||
143 | // Delete the second segment | |
144 | map.erase(it_next); | |
145 | ||
146 | // Keep merging this first segment with its new next packet | |
147 | // So we do not update the iterator: it | |
148 | continue; | |
149 | } else if (end <= seg_end) { | |
150 | // The first segment has all the data in the second segment | |
151 | // Delete the second segment | |
152 | map.erase(it_next); | |
153 | continue; | |
154 | } else if (seg_end < beg) { | |
155 | // Can not merge first segment with second segment | |
156 | it = it_next; | |
157 | continue; | |
158 | } else { | |
159 | // If we reach here, we have a bug with merge. | |
160 | std::cout << "packet_merger: merge error\n"; | |
161 | abort(); | |
162 | break; | |
163 | } | |
164 | } | |
165 | } | |
166 | }; | |
167 | ||
168 | #endif |