]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/demos/file_demo.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / demos / file_demo.cc
CommitLineData
f67539c2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2020 ScyllaDB
20 */
21
22
23// Demonstration of seastar::with_file
24
25#include <cstring>
26#include <limits>
27#include <random>
28
29#include <seastar/core/app-template.hh>
30
31#include <seastar/core/aligned_buffer.hh>
32#include <seastar/core/file.hh>
33#include <seastar/core/fstream.hh>
34#include <seastar/core/seastar.hh>
35#include <seastar/core/sstring.hh>
36#include <seastar/core/temporary_buffer.hh>
37#include <seastar/core/loop.hh>
20effc67 38#include <seastar/core/io_intent.hh>
f67539c2
TL
39#include <seastar/util/log.hh>
40#include <seastar/util/tmp_file.hh>
41
42using namespace seastar;
43
// Byte count used throughout this demo both as the size and as the alignment
// of every DMA buffer, and as the length of every DMA read/write.
constexpr size_t aligned_size{4096};
45
46future<> verify_data_file(file& f, temporary_buffer<char>& rbuf, const temporary_buffer<char>& wbuf) {
47 return f.dma_read(0, rbuf.get_write(), aligned_size).then([&rbuf, &wbuf] (size_t count) {
48 assert(count == aligned_size);
49 fmt::print(" verifying {} bytes\n", count);
50 assert(!memcmp(rbuf.get(), wbuf.get(), aligned_size));
51 });
52}
53
54future<file> open_data_file(sstring meta_filename, temporary_buffer<char>& rbuf) {
55 fmt::print(" retrieving data filename from {}\n", meta_filename);
56 return with_file(open_file_dma(meta_filename, open_flags::ro), [&rbuf] (file& f) {
57 return f.dma_read(0, rbuf.get_write(), aligned_size).then([&rbuf] (size_t count) {
58 assert(count == aligned_size);
59 auto data_filename = sstring(rbuf.get());
60 fmt::print(" opening {}\n", data_filename);
61 return open_file_dma(data_filename, open_flags::ro);
62 });
63 });
64}
65
66future<> demo_with_file() {
67 fmt::print("Demonstrating with_file():\n");
68 return tmp_dir::do_with_thread([] (tmp_dir& t) {
69 auto rnd = std::mt19937(std::random_device()());
70 auto dist = std::uniform_int_distribution<char>(0, std::numeric_limits<char>::max());
71 auto wbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size);
72 sstring meta_filename = (t.get_path() / "meta_file").native();
73 sstring data_filename = (t.get_path() / "data_file").native();
74
75 // `with_file` is used to create/open `filename` just around the call to `dma_write`
76 auto write_to_file = [] (const sstring filename, temporary_buffer<char>& wbuf) {
77 auto count = with_file(open_file_dma(filename, open_flags::rw | open_flags::create), [&wbuf] (file& f) {
78 return f.dma_write(0, wbuf.get(), aligned_size);
79 }).get0();
80 assert(count == aligned_size);
81 };
82
83 // print the data_filename into the write buffer
84 std::fill(wbuf.get_write(), wbuf.get_write() + aligned_size, 0);
85 std::copy(data_filename.cbegin(), data_filename.cend(), wbuf.get_write());
86
87 // and write it to `meta_filename`
88 fmt::print(" writing \"{}\" into {}\n", data_filename, meta_filename);
89
90 write_to_file(meta_filename, wbuf);
91
92 // now write some random data into data_filename
93 fmt::print(" writing random data into {}\n", data_filename);
94 std::generate(wbuf.get_write(), wbuf.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); });
95
96 write_to_file(data_filename, wbuf);
97
98 // verify the data via meta_filename
99 fmt::print(" verifying data...\n");
100 auto rbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size);
101
102 with_file(open_data_file(meta_filename, rbuf), [&rbuf, &wbuf] (file& f) {
103 return verify_data_file(f, rbuf, wbuf);
104 }).get();
105 });
106}
107
108future<> demo_with_file_close_on_failure() {
109 fmt::print("\nDemonstrating with_file_close_on_failure():\n");
110 return tmp_dir::do_with_thread([] (tmp_dir& t) {
111 auto rnd = std::mt19937(std::random_device()());
112 auto dist = std::uniform_int_distribution<char>(0, std::numeric_limits<char>::max());
113 auto wbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size);
114 sstring meta_filename = (t.get_path() / "meta_file").native();
115 sstring data_filename = (t.get_path() / "data_file").native();
116
117 // with_file_close_on_failure will close the opened file only if
118 // `make_file_output_stream` returns an error. Otherwise, in the error-free path,
119 // the opened file is moved to `file_output_stream` that in-turn closes it
120 // when the stream is closed.
121 auto make_output_stream = [] (const sstring filename) {
122 return with_file_close_on_failure(open_file_dma(std::move(filename), open_flags::rw | open_flags::create), [] (file f) {
123 return make_file_output_stream(std::move(f), aligned_size);
124 });
125 };
126
127 // writes the buffer one byte at a time, to demonstrate output stream
128 auto write_to_stream = [] (output_stream<char>& o, const temporary_buffer<char>& wbuf) {
129 return seastar::do_for_each(wbuf, [&o] (char c) {
130 return o.write(&c, 1);
131 }).finally([&o] {
132 return o.close();
133 });
134 };
135
136 // print the data_filename into the write buffer
137 std::fill(wbuf.get_write(), wbuf.get_write() + aligned_size, 0);
138 std::copy(data_filename.cbegin(), data_filename.cend(), wbuf.get_write());
139
140 // and write it to `meta_filename`
141 fmt::print(" writing \"{}\" into {}\n", data_filename, meta_filename);
142
143 // with_file_close_on_failure will close the opened file only if
144 // `make_file_output_stream` returns an error. Otherwise, in the error-free path,
145 // the opened file is moved to `file_output_stream` that in-turn closes it
146 // when the stream is closed.
147 output_stream<char> o = make_output_stream(meta_filename).get0();
148
149 write_to_stream(o, wbuf).get();
150
151 // now write some random data into data_filename
152 fmt::print(" writing random data into {}\n", data_filename);
153 std::generate(wbuf.get_write(), wbuf.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); });
154
155 o = make_output_stream(data_filename).get0();
156
157 write_to_stream(o, wbuf).get();
158
159 // verify the data via meta_filename
160 fmt::print(" verifying data...\n");
161 auto rbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size);
162
163 with_file(open_data_file(meta_filename, rbuf), [&rbuf, &wbuf] (file& f) {
164 return verify_data_file(f, rbuf, wbuf);
165 }).get();
166 });
167}
168
20effc67
TL
169static constexpr size_t half_aligned_size = aligned_size / 2;
170
171future<> demo_with_io_intent() {
172 fmt::print("\nDemonstrating demo_with_io_intent():\n");
173 return tmp_dir::do_with_thread([] (tmp_dir& t) {
174 sstring filename = (t.get_path() / "testfile.tmp").native();
175 auto f = open_file_dma(filename, open_flags::rw | open_flags::create).get0();
176
177 auto rnd = std::mt19937(std::random_device()());
178 auto dist = std::uniform_int_distribution<char>(0, std::numeric_limits<char>::max());
179
180 auto wbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size);
181 fmt::print(" writing random data into {}\n", filename);
182 std::generate(wbuf.get_write(), wbuf.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); });
183
184 f.dma_write(0, wbuf.get(), aligned_size).get();
185
186 auto wbuf_n = temporary_buffer<char>::aligned(aligned_size, aligned_size);
187 fmt::print(" starting to overwrite {} with other random data in two steps\n", filename);
188 std::generate(wbuf_n.get_write(), wbuf_n.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); });
189
190 io_intent intent;
191 auto f1 = f.dma_write(0, wbuf_n.get(), half_aligned_size);
192 auto f2 = f.dma_write(half_aligned_size, wbuf_n.get() + half_aligned_size, half_aligned_size, default_priority_class(), &intent);
193
194 fmt::print(" cancel the 2nd overwriting\n");
195 intent.cancel();
196
197 fmt::print(" wait for overwriting IOs to complete\n");
198 f1.get();
199
200 bool cancelled = false;
201 try {
202 f2.get();
203 // The file::dma_write doesn't preemt, but if it
204 // suddenly will, the 2nd write will pass before
205 // the intent would be cancelled
206 fmt::print(" 2nd write won the race with cancellation\n");
207 } catch (cancelled_error& ex) {
208 cancelled = true;
209 }
210
211 fmt::print(" verifying data...\n");
212 auto rbuf = allocate_aligned_buffer<unsigned char>(aligned_size, aligned_size);
213 f.dma_read(0, rbuf.get(), aligned_size).get();
214
215 // First part of the buffer must coincide with the overwritten data
216 assert(!memcmp(rbuf.get(), wbuf_n.get(), half_aligned_size));
217
218 if (cancelled) {
219 // Second part -- with the old data ...
220 assert(!memcmp(rbuf.get() + half_aligned_size, wbuf.get() + half_aligned_size, half_aligned_size));
221 } else {
222 // ... or with new if the cancellation didn't happen
223 assert(!memcmp(rbuf.get() + half_aligned_size, wbuf.get() + half_aligned_size, half_aligned_size));
224 }
225 });
226}
227
f67539c2
TL
228int main(int ac, char** av) {
229 app_template app;
230 return app.run(ac, av, [] {
231 return demo_with_file().then([] {
20effc67
TL
232 return demo_with_file_close_on_failure().then([] {
233 return demo_with_io_intent();
234 });
f67539c2
TL
235 });
236 });
237}