]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/simple-stream.hh
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / include / seastar / core / simple-stream.hh
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2016 Scylladb, Ltd.
20 */
21
22#pragma once
23#include <seastar/core/sstring.hh>
24#include <seastar/util/variant_utils.hh>
25
26namespace seastar {
27
28class measuring_output_stream {
29 size_t _size = 0;
30public:
1e59de90 31 void write(const char*, size_t size) {
11fdf7f2
TL
32 _size += size;
33 }
34
35 size_t size() const {
36 return _size;
37 }
38};
39
40template<typename>
41class memory_output_stream;
42
43class simple_memory_input_stream;
44
45template<typename Iterator>
46class fragmented_memory_input_stream;
47
48template<typename Iterator>
49class memory_input_stream;
50
51class simple_memory_output_stream {
52 char* _p = nullptr;
53 size_t _size = 0;
54public:
55 using has_with_stream = std::false_type;
56 simple_memory_output_stream() {}
57 simple_memory_output_stream(char* p, size_t size, size_t start = 0) : _p(p + start), _size(size) {}
58 char* begin() { return _p; }
59
60 [[gnu::always_inline]]
61 void skip(size_t size) {
62 if (size > _size) {
63 throw std::out_of_range("serialization buffer overflow");
64 }
65 _p += size;
66 _size -= size;
67 }
68
69 [[gnu::always_inline]]
70 simple_memory_output_stream write_substream(size_t size) {
71 if (size > _size) {
72 throw std::out_of_range("serialization buffer overflow");
73 }
74 simple_memory_output_stream substream(_p, size);
75 skip(size);
76 return substream;
77 }
78
79 [[gnu::always_inline]]
80 void write(const char* p, size_t size) {
81 if (size > _size) {
82 throw std::out_of_range("serialization buffer overflow");
83 }
84 std::copy_n(p, size, _p);
85 skip(size);
86 }
87
88 [[gnu::always_inline]]
89 void fill(char c, size_t size) {
90 if (size > _size) {
91 throw std::out_of_range("serialization buffer overflow");
92 }
93 std::fill_n(_p, size, c);
94 skip(size);
95 }
96
97 [[gnu::always_inline]]
1e59de90 98 size_t size() const {
11fdf7f2
TL
99 return _size;
100 }
101
102 // simple_memory_output_stream is a write cursor that keeps a mutable view of some
103 // underlying buffer and provides write interface. to_input_stream() converts it
104 // to a read cursor that points to the same part of the buffer but provides
105 // read interface.
106 simple_memory_input_stream to_input_stream() const;
107};
108
109template<typename Iterator>
110class fragmented_memory_output_stream {
111 using simple = simple_memory_output_stream ;
112
113 Iterator _it;
114 simple _current;
115 size_t _size = 0;
116
117 friend class memory_input_stream<Iterator>;
118private:
119 template<typename Func>
120 //requires requires(Func f, view bv) { { f(bv) } -> void; }
121 void for_each_fragment(size_t size, Func&& func) {
122 if (size > _size) {
123 throw std::out_of_range("serialization buffer overflow");
124 }
125 _size -= size;
126 while (size) {
127 if (!_current.size()) {
128 _current = simple(reinterpret_cast<char*>((*_it).get_write()), (*_it).size());
129 _it++;
130 }
131 auto this_size = std::min(_current.size(), size);
132 func(_current.write_substream(this_size));
133 size -= this_size;
134 }
135 }
136 fragmented_memory_output_stream(Iterator it, simple_memory_output_stream bv, size_t size)
137 : _it(it), _current(bv), _size(size) { }
138public:
139 using has_with_stream = std::false_type;
140 using iterator_type = Iterator;
141
142 fragmented_memory_output_stream() = default;
143
144 fragmented_memory_output_stream(Iterator it, size_t size)
145 : _it(it), _size(size) {
146 }
147
148 void skip(size_t size) {
149 for_each_fragment(size, [] (auto) { });
150 }
151 memory_output_stream<Iterator> write_substream(size_t size) {
152 if (size > _size) {
153 throw std::out_of_range("serialization buffer overflow");
154 }
155 if (_current.size() >= size) {
156 _size -= size;
157 return _current.write_substream(size);
158 }
159 fragmented_memory_output_stream substream(_it, _current, size);
160 skip(size);
161 return substream;
162 }
163 void write(const char* p, size_t size) {
164 for_each_fragment(size, [&p] (auto bv) {
165 std::copy_n(p, bv.size(), bv.begin());
166 p += bv.size();
167 });
168 }
169 void fill(char c, size_t size) {
170 for_each_fragment(size, [c] (simple fragment) {
171 std::fill_n(fragment.begin(), fragment.size(), c);
172 });
173 }
1e59de90 174 size_t size() const {
11fdf7f2
TL
175 return _size;
176 }
177
178 // fragmented_memory_input_stream is a write cursor that keeps a mutable view of some
179 // underlying fragmented buffer and provides write interface. to_input_stream() converts
180 // it to a read cursor that points to the same part of the buffer but provides read interface.
181 fragmented_memory_input_stream<Iterator> to_input_stream() const;
182};
183
184template<typename Iterator>
185class memory_output_stream {
186public:
187 using simple = simple_memory_output_stream;
188 using fragmented = fragmented_memory_output_stream<Iterator>;
189
190private:
191 const bool _is_simple;
192 using fragmented_type = fragmented;
193 union {
194 simple _simple;
195 fragmented_type _fragmented;
196 };
197public:
198 template<typename StreamVisitor>
199 [[gnu::always_inline]]
200 decltype(auto) with_stream(StreamVisitor&& visitor) {
201 if (__builtin_expect(_is_simple, true)) {
202 return visitor(_simple);
203 }
204 return visitor(_fragmented);
205 }
206
207 template<typename StreamVisitor>
208 [[gnu::always_inline]]
209 decltype(auto) with_stream(StreamVisitor&& visitor) const {
210 if (__builtin_expect(_is_simple, true)) {
211 return visitor(_simple);
212 }
213 return visitor(_fragmented);
214 }
215public:
216 using has_with_stream = std::true_type;
217 using iterator_type = Iterator;
218 memory_output_stream()
219 : _is_simple(true), _simple() {}
220 memory_output_stream(simple stream)
221 : _is_simple(true), _simple(std::move(stream)) {}
222 memory_output_stream(fragmented stream)
223 : _is_simple(false), _fragmented(std::move(stream)) {}
224
225 [[gnu::always_inline]]
226 memory_output_stream(const memory_output_stream& other) noexcept : _is_simple(other._is_simple) {
227 // Making this copy constructor noexcept makes copy assignment simpler.
228 // Besides, performance of memory_output_stream relies on the fact that both
229 // fragmented and simple input stream are PODs and the branch below
230 // is optimized away, so throwable copy constructors aren't something
231 // we want.
232 static_assert(std::is_nothrow_copy_constructible<fragmented>::value,
233 "seastar::memory_output_stream::fragmented should be copy constructible");
234 static_assert(std::is_nothrow_copy_constructible<simple>::value,
235 "seastar::memory_output_stream::simple should be copy constructible");
236 if (_is_simple) {
237 new (&_simple) simple(other._simple);
238 } else {
239 new (&_fragmented) fragmented_type(other._fragmented);
240 }
241 }
242
243 [[gnu::always_inline]]
244 memory_output_stream(memory_output_stream&& other) noexcept : _is_simple(other._is_simple) {
245 if (_is_simple) {
246 new (&_simple) simple(std::move(other._simple));
247 } else {
248 new (&_fragmented) fragmented_type(std::move(other._fragmented));
249 }
250 }
251
252 [[gnu::always_inline]]
253 memory_output_stream& operator=(const memory_output_stream& other) noexcept {
254 // Copy constructor being noexcept makes copy assignment simpler.
255 static_assert(std::is_nothrow_copy_constructible<memory_output_stream>::value,
256 "memory_output_stream copy constructor shouldn't throw");
257 if (this != &other) {
258 this->~memory_output_stream();
259 new (this) memory_output_stream(other);
260 }
261 return *this;
262 }
263
264 [[gnu::always_inline]]
265 memory_output_stream& operator=(memory_output_stream&& other) noexcept {
266 if (this != &other) {
267 this->~memory_output_stream();
268 new (this) memory_output_stream(std::move(other));
269 }
270 return *this;
271 }
272
273 [[gnu::always_inline]]
274 ~memory_output_stream() {
275 if (_is_simple) {
276 _simple.~simple();
277 } else {
278 _fragmented.~fragmented_type();
279 }
280 }
281
282 [[gnu::always_inline]]
283 void skip(size_t size) {
284 with_stream([size] (auto& stream) {
285 stream.skip(size);
286 });
287 }
288
289 [[gnu::always_inline]]
290 memory_output_stream write_substream(size_t size) {
291 return with_stream([size] (auto& stream) -> memory_output_stream {
292 return stream.write_substream(size);
293 });
294 }
295
296 [[gnu::always_inline]]
297 void write(const char* p, size_t size) {
298 with_stream([p, size] (auto& stream) {
299 stream.write(p, size);
300 });
301 }
302
303 [[gnu::always_inline]]
304 void fill(char c, size_t size) {
305 with_stream([c, size] (auto& stream) {
306 stream.fill(c, size);
307 });
308 }
309
310 [[gnu::always_inline]]
311 size_t size() const {
312 return with_stream([] (auto& stream) {
313 return stream.size();
314 });
315 }
316
317 memory_input_stream<Iterator> to_input_stream() const;
318};
319
320class simple_memory_input_stream {
321 using simple = simple_memory_input_stream;
322
323 const char* _p = nullptr;
324 size_t _size = 0;
325public:
326 using has_with_stream = std::false_type;
327 simple_memory_input_stream() = default;
328 simple_memory_input_stream(const char* p, size_t size) : _p(p), _size(size) {}
329
330 const char* begin() const { return _p; }
331
332 [[gnu::always_inline]]
333 void skip(size_t size) {
334 if (size > _size) {
335 throw std::out_of_range("deserialization buffer underflow");
336 }
337 _p += size;
338 _size -= size;
339 }
340
341 [[gnu::always_inline]]
342 simple read_substream(size_t size) {
343 if (size > _size) {
344 throw std::out_of_range("deserialization buffer underflow");
345 }
346 simple substream(_p, size);
347 skip(size);
348 return substream;
349 }
350
351 [[gnu::always_inline]]
352 void read(char* p, size_t size) {
353 if (size > _size) {
354 throw std::out_of_range("deserialization buffer underflow");
355 }
356 std::copy_n(_p, size, p);
357 skip(size);
358 }
359
360 template<typename Output>
361 [[gnu::always_inline]]
362 void copy_to(Output& out) const {
363 out.write(_p, _size);
364 }
365
366 [[gnu::always_inline]]
1e59de90 367 size_t size() const {
11fdf7f2
TL
368 return _size;
369 }
370};
371
372template<typename Iterator>
373class fragmented_memory_input_stream {
374 using simple = simple_memory_input_stream;
375 using fragmented = fragmented_memory_input_stream;
376
377 Iterator _it;
378 simple _current;
379 size_t _size;
380private:
381 template<typename Func>
382 //requires requires(Func f, view bv) { { f(bv) } -> void; }
383 void for_each_fragment(size_t size, Func&& func) {
384 if (size > _size) {
385 throw std::out_of_range("deserialization buffer underflow");
386 }
387 _size -= size;
388 while (size) {
389 if (!_current.size()) {
390 _current = simple(reinterpret_cast<const char*>((*_it).begin()), (*_it).size());
391 _it++;
392 }
393 auto this_size = std::min(_current.size(), size);
394 func(_current.read_substream(this_size));
395 size -= this_size;
396 }
397 }
398 fragmented_memory_input_stream(Iterator it, simple bv, size_t size)
399 : _it(it), _current(bv), _size(size) { }
400 friend class fragmented_memory_output_stream<Iterator>;
401public:
402 using has_with_stream = std::false_type;
403 using iterator_type = Iterator;
404 fragmented_memory_input_stream(Iterator it, size_t size)
405 : _it(it), _size(size) {
406 }
407
408 void skip(size_t size) {
409 for_each_fragment(size, [] (auto) { });
410 }
411 fragmented read_substream(size_t size) {
412 if (size > _size) {
413 throw std::out_of_range("deserialization buffer underflow");
414 }
415 fragmented substream(_it, _current, size);
416 skip(size);
417 return substream;
418 }
419 void read(char* p, size_t size) {
420 for_each_fragment(size, [&p] (auto bv) {
421 p = std::copy_n(bv.begin(), bv.size(), p);
422 });
423 }
424 template<typename Output>
425 void copy_to(Output& out) {
426 for_each_fragment(_size, [&out] (auto bv) {
427 bv.copy_to(out);
428 });
429 }
1e59de90 430 size_t size() const {
11fdf7f2
TL
431 return _size;
432 }
433
434 const char* first_fragment_data() const { return _current.begin(); }
435 size_t first_fragment_size() const { return _current.size(); }
436 Iterator fragment_iterator() const { return _it; }
437};
438
439/*
440template<typename Visitor>
441concept bool StreamVisitor() {
442 return requires(Visitor visitor, simple& simple, fragmented& fragmented) {
443 visitor(simple);
444 visitor(fragmented);
445 };
446}
447*/
448// memory_input_stream performs type erasure optimized for cases where
449// simple is used.
450// By using a lot of [[gnu::always_inline]] attributes this class attempts to
451// make the compiler generate code with simple functions inlined
452// directly in the user of the intput_stream.
453template<typename Iterator>
454class memory_input_stream {
455public:
456 using simple = simple_memory_input_stream;
457 using fragmented = fragmented_memory_input_stream<Iterator>;
458private:
459 const bool _is_simple;
460 using fragmented_type = fragmented;
461 union {
462 simple _simple;
463 fragmented_type _fragmented;
464 };
465public:
466 template<typename StreamVisitor>
467 [[gnu::always_inline]]
468 decltype(auto) with_stream(StreamVisitor&& visitor) {
469 if (__builtin_expect(_is_simple, true)) {
470 return visitor(_simple);
471 }
472 return visitor(_fragmented);
473 }
474
475 template<typename StreamVisitor>
476 [[gnu::always_inline]]
477 decltype(auto) with_stream(StreamVisitor&& visitor) const {
478 if (__builtin_expect(_is_simple, true)) {
479 return visitor(_simple);
480 }
481 return visitor(_fragmented);
482 }
483public:
484 using has_with_stream = std::true_type;
485 using iterator_type = Iterator;
486 memory_input_stream(simple stream)
487 : _is_simple(true), _simple(std::move(stream)) {}
488 memory_input_stream(fragmented stream)
489 : _is_simple(false), _fragmented(std::move(stream)) {}
490
491 [[gnu::always_inline]]
492 memory_input_stream(const memory_input_stream& other) noexcept : _is_simple(other._is_simple) {
493 // Making this copy constructor noexcept makes copy assignment simpler.
494 // Besides, performance of memory_input_stream relies on the fact that both
495 // fragmented and simple input stream are PODs and the branch below
496 // is optimized away, so throwable copy constructors aren't something
497 // we want.
498 static_assert(std::is_nothrow_copy_constructible<fragmented>::value,
499 "seastar::memory_input_stream::fragmented should be copy constructible");
500 static_assert(std::is_nothrow_copy_constructible<simple>::value,
501 "seastar::memory_input_stream::simple should be copy constructible");
502 if (_is_simple) {
503 new (&_simple) simple(other._simple);
504 } else {
505 new (&_fragmented) fragmented_type(other._fragmented);
506 }
507 }
508
509 [[gnu::always_inline]]
510 memory_input_stream(memory_input_stream&& other) noexcept : _is_simple(other._is_simple) {
511 if (_is_simple) {
512 new (&_simple) simple(std::move(other._simple));
513 } else {
514 new (&_fragmented) fragmented_type(std::move(other._fragmented));
515 }
516 }
517
518 [[gnu::always_inline]]
519 memory_input_stream& operator=(const memory_input_stream& other) noexcept {
520 // Copy constructor being noexcept makes copy assignment simpler.
521 static_assert(std::is_nothrow_copy_constructible<memory_input_stream>::value,
522 "memory_input_stream copy constructor shouldn't throw");
523 if (this != &other) {
524 this->~memory_input_stream();
525 new (this) memory_input_stream(other);
526 }
527 return *this;
528 }
529
530 [[gnu::always_inline]]
531 memory_input_stream& operator=(memory_input_stream&& other) noexcept {
532 if (this != &other) {
533 this->~memory_input_stream();
534 new (this) memory_input_stream(std::move(other));
535 }
536 return *this;
537 }
538
539 [[gnu::always_inline]]
540 ~memory_input_stream() {
541 if (_is_simple) {
542 _simple.~simple_memory_input_stream();
543 } else {
544 _fragmented.~fragmented_type();
545 }
546 }
547
548 [[gnu::always_inline]]
549 void skip(size_t size) {
550 with_stream([size] (auto& stream) {
551 stream.skip(size);
552 });
553 }
554
555 [[gnu::always_inline]]
556 memory_input_stream read_substream(size_t size) {
557 return with_stream([size] (auto& stream) -> memory_input_stream {
558 return stream.read_substream(size);
559 });
560 }
561
562 [[gnu::always_inline]]
563 void read(char* p, size_t size) {
564 with_stream([p, size] (auto& stream) {
565 stream.read(p, size);
566 });
567 }
568
569 template<typename Output>
570 [[gnu::always_inline]]
571 void copy_to(Output& out) {
572 with_stream([&out] (auto& stream) {
573 stream.copy_to(out);
574 });
575 }
576
577 [[gnu::always_inline]]
578 size_t size() const {
579 return with_stream([] (auto& stream) {
580 return stream.size();
581 });
582 }
583
584 template<typename Stream, typename StreamVisitor>
585 friend decltype(auto) with_serialized_stream(Stream& stream, StreamVisitor&& visitor);
586};
587
588inline simple_memory_input_stream simple_memory_output_stream::to_input_stream() const {
589 return simple_memory_input_stream(_p, _size);
590}
591
592template<typename Iterator>
593inline fragmented_memory_input_stream<Iterator> fragmented_memory_output_stream<Iterator>::to_input_stream() const {
594 return fragmented_memory_input_stream<Iterator>(_it, _current.to_input_stream(), _size);
595}
596
597template<typename Iterator>
598inline memory_input_stream<Iterator> memory_output_stream<Iterator>::to_input_stream() const {
599 return with_stream(make_visitor(
600 [] (const simple_memory_output_stream& ostream) -> memory_input_stream<Iterator> {
601 return ostream.to_input_stream();
602 },
603 [] (const fragmented_memory_output_stream<Iterator>& ostream) -> memory_input_stream<Iterator> {
604 return ostream.to_input_stream();
605 }
606 ));
607}
608
609// The purpose of the with_serialized_stream() is to minimize number of dynamic
610// dispatches. For example, a lot of IDL-generated code looks like this:
611// auto some_value() const {
612// return seastar::with_serialized_stream(v, [] (auto& v) {
613// auto in = v;
614// ser::skip(in, boost::type<type1>());
615// ser::skip(in, boost::type<type2>());
616// return deserialize(in, boost::type<type3>());
617// });
618// }
619// Using with_stream() there is at most one dynamic dispatch per such
620// function, instead of one per each skip() and deserialize() call.
621
622template<typename Stream, typename StreamVisitor, typename = std::enable_if_t<Stream::has_with_stream::value>>
623[[gnu::always_inline]]
624 static inline decltype(auto)
625 with_serialized_stream(Stream& stream, StreamVisitor&& visitor) {
626 return stream.with_stream(std::forward<StreamVisitor>(visitor));
627}
628
629template<typename Stream, typename StreamVisitor, typename = std::enable_if_t<!Stream::has_with_stream::value>, typename = void>
630[[gnu::always_inline]]
631 static inline decltype(auto)
632 with_serialized_stream(Stream& stream, StreamVisitor&& visitor) {
633 return visitor(stream);
634}
635
636using simple_input_stream = simple_memory_input_stream;
637using simple_output_stream = simple_memory_output_stream;
638
639}