]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2020 ScyllaDB | |
20 | */ | |
21 | ||
22 | ||
23 | // Demonstration of seastar::with_file | |
24 | ||
25 | #include <cstring> | |
26 | #include <limits> | |
27 | #include <random> | |
28 | ||
29 | #include <seastar/core/app-template.hh> | |
30 | ||
31 | #include <seastar/core/aligned_buffer.hh> | |
32 | #include <seastar/core/file.hh> | |
33 | #include <seastar/core/fstream.hh> | |
34 | #include <seastar/core/seastar.hh> | |
35 | #include <seastar/core/sstring.hh> | |
36 | #include <seastar/core/temporary_buffer.hh> | |
37 | #include <seastar/core/loop.hh> | |
20effc67 | 38 | #include <seastar/core/io_intent.hh> |
f67539c2 TL |
39 | #include <seastar/util/log.hh> |
40 | #include <seastar/util/tmp_file.hh> | |
41 | ||
42 | using namespace seastar; | |
43 | ||
44 | constexpr size_t aligned_size = 4096; | |
45 | ||
46 | future<> verify_data_file(file& f, temporary_buffer<char>& rbuf, const temporary_buffer<char>& wbuf) { | |
47 | return f.dma_read(0, rbuf.get_write(), aligned_size).then([&rbuf, &wbuf] (size_t count) { | |
48 | assert(count == aligned_size); | |
49 | fmt::print(" verifying {} bytes\n", count); | |
50 | assert(!memcmp(rbuf.get(), wbuf.get(), aligned_size)); | |
51 | }); | |
52 | } | |
53 | ||
54 | future<file> open_data_file(sstring meta_filename, temporary_buffer<char>& rbuf) { | |
55 | fmt::print(" retrieving data filename from {}\n", meta_filename); | |
56 | return with_file(open_file_dma(meta_filename, open_flags::ro), [&rbuf] (file& f) { | |
57 | return f.dma_read(0, rbuf.get_write(), aligned_size).then([&rbuf] (size_t count) { | |
58 | assert(count == aligned_size); | |
59 | auto data_filename = sstring(rbuf.get()); | |
60 | fmt::print(" opening {}\n", data_filename); | |
61 | return open_file_dma(data_filename, open_flags::ro); | |
62 | }); | |
63 | }); | |
64 | } | |
65 | ||
66 | future<> demo_with_file() { | |
67 | fmt::print("Demonstrating with_file():\n"); | |
68 | return tmp_dir::do_with_thread([] (tmp_dir& t) { | |
69 | auto rnd = std::mt19937(std::random_device()()); | |
70 | auto dist = std::uniform_int_distribution<char>(0, std::numeric_limits<char>::max()); | |
71 | auto wbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size); | |
72 | sstring meta_filename = (t.get_path() / "meta_file").native(); | |
73 | sstring data_filename = (t.get_path() / "data_file").native(); | |
74 | ||
75 | // `with_file` is used to create/open `filename` just around the call to `dma_write` | |
76 | auto write_to_file = [] (const sstring filename, temporary_buffer<char>& wbuf) { | |
77 | auto count = with_file(open_file_dma(filename, open_flags::rw | open_flags::create), [&wbuf] (file& f) { | |
78 | return f.dma_write(0, wbuf.get(), aligned_size); | |
79 | }).get0(); | |
80 | assert(count == aligned_size); | |
81 | }; | |
82 | ||
83 | // print the data_filename into the write buffer | |
84 | std::fill(wbuf.get_write(), wbuf.get_write() + aligned_size, 0); | |
85 | std::copy(data_filename.cbegin(), data_filename.cend(), wbuf.get_write()); | |
86 | ||
87 | // and write it to `meta_filename` | |
88 | fmt::print(" writing \"{}\" into {}\n", data_filename, meta_filename); | |
89 | ||
90 | write_to_file(meta_filename, wbuf); | |
91 | ||
92 | // now write some random data into data_filename | |
93 | fmt::print(" writing random data into {}\n", data_filename); | |
94 | std::generate(wbuf.get_write(), wbuf.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); }); | |
95 | ||
96 | write_to_file(data_filename, wbuf); | |
97 | ||
98 | // verify the data via meta_filename | |
99 | fmt::print(" verifying data...\n"); | |
100 | auto rbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size); | |
101 | ||
102 | with_file(open_data_file(meta_filename, rbuf), [&rbuf, &wbuf] (file& f) { | |
103 | return verify_data_file(f, rbuf, wbuf); | |
104 | }).get(); | |
105 | }); | |
106 | } | |
107 | ||
108 | future<> demo_with_file_close_on_failure() { | |
109 | fmt::print("\nDemonstrating with_file_close_on_failure():\n"); | |
110 | return tmp_dir::do_with_thread([] (tmp_dir& t) { | |
111 | auto rnd = std::mt19937(std::random_device()()); | |
112 | auto dist = std::uniform_int_distribution<char>(0, std::numeric_limits<char>::max()); | |
113 | auto wbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size); | |
114 | sstring meta_filename = (t.get_path() / "meta_file").native(); | |
115 | sstring data_filename = (t.get_path() / "data_file").native(); | |
116 | ||
117 | // with_file_close_on_failure will close the opened file only if | |
118 | // `make_file_output_stream` returns an error. Otherwise, in the error-free path, | |
119 | // the opened file is moved to `file_output_stream` that in-turn closes it | |
120 | // when the stream is closed. | |
121 | auto make_output_stream = [] (const sstring filename) { | |
122 | return with_file_close_on_failure(open_file_dma(std::move(filename), open_flags::rw | open_flags::create), [] (file f) { | |
123 | return make_file_output_stream(std::move(f), aligned_size); | |
124 | }); | |
125 | }; | |
126 | ||
127 | // writes the buffer one byte at a time, to demonstrate output stream | |
128 | auto write_to_stream = [] (output_stream<char>& o, const temporary_buffer<char>& wbuf) { | |
129 | return seastar::do_for_each(wbuf, [&o] (char c) { | |
130 | return o.write(&c, 1); | |
131 | }).finally([&o] { | |
132 | return o.close(); | |
133 | }); | |
134 | }; | |
135 | ||
136 | // print the data_filename into the write buffer | |
137 | std::fill(wbuf.get_write(), wbuf.get_write() + aligned_size, 0); | |
138 | std::copy(data_filename.cbegin(), data_filename.cend(), wbuf.get_write()); | |
139 | ||
140 | // and write it to `meta_filename` | |
141 | fmt::print(" writing \"{}\" into {}\n", data_filename, meta_filename); | |
142 | ||
143 | // with_file_close_on_failure will close the opened file only if | |
144 | // `make_file_output_stream` returns an error. Otherwise, in the error-free path, | |
145 | // the opened file is moved to `file_output_stream` that in-turn closes it | |
146 | // when the stream is closed. | |
147 | output_stream<char> o = make_output_stream(meta_filename).get0(); | |
148 | ||
149 | write_to_stream(o, wbuf).get(); | |
150 | ||
151 | // now write some random data into data_filename | |
152 | fmt::print(" writing random data into {}\n", data_filename); | |
153 | std::generate(wbuf.get_write(), wbuf.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); }); | |
154 | ||
155 | o = make_output_stream(data_filename).get0(); | |
156 | ||
157 | write_to_stream(o, wbuf).get(); | |
158 | ||
159 | // verify the data via meta_filename | |
160 | fmt::print(" verifying data...\n"); | |
161 | auto rbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size); | |
162 | ||
163 | with_file(open_data_file(meta_filename, rbuf), [&rbuf, &wbuf] (file& f) { | |
164 | return verify_data_file(f, rbuf, wbuf); | |
165 | }).get(); | |
166 | }); | |
167 | } | |
168 | ||
20effc67 TL |
169 | static constexpr size_t half_aligned_size = aligned_size / 2; |
170 | ||
171 | future<> demo_with_io_intent() { | |
172 | fmt::print("\nDemonstrating demo_with_io_intent():\n"); | |
173 | return tmp_dir::do_with_thread([] (tmp_dir& t) { | |
174 | sstring filename = (t.get_path() / "testfile.tmp").native(); | |
175 | auto f = open_file_dma(filename, open_flags::rw | open_flags::create).get0(); | |
176 | ||
177 | auto rnd = std::mt19937(std::random_device()()); | |
178 | auto dist = std::uniform_int_distribution<char>(0, std::numeric_limits<char>::max()); | |
179 | ||
180 | auto wbuf = temporary_buffer<char>::aligned(aligned_size, aligned_size); | |
181 | fmt::print(" writing random data into {}\n", filename); | |
182 | std::generate(wbuf.get_write(), wbuf.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); }); | |
183 | ||
184 | f.dma_write(0, wbuf.get(), aligned_size).get(); | |
185 | ||
186 | auto wbuf_n = temporary_buffer<char>::aligned(aligned_size, aligned_size); | |
187 | fmt::print(" starting to overwrite {} with other random data in two steps\n", filename); | |
188 | std::generate(wbuf_n.get_write(), wbuf_n.get_write() + aligned_size, [&dist, &rnd] { return dist(rnd); }); | |
189 | ||
190 | io_intent intent; | |
191 | auto f1 = f.dma_write(0, wbuf_n.get(), half_aligned_size); | |
192 | auto f2 = f.dma_write(half_aligned_size, wbuf_n.get() + half_aligned_size, half_aligned_size, default_priority_class(), &intent); | |
193 | ||
194 | fmt::print(" cancel the 2nd overwriting\n"); | |
195 | intent.cancel(); | |
196 | ||
197 | fmt::print(" wait for overwriting IOs to complete\n"); | |
198 | f1.get(); | |
199 | ||
200 | bool cancelled = false; | |
201 | try { | |
202 | f2.get(); | |
203 | // The file::dma_write doesn't preemt, but if it | |
204 | // suddenly will, the 2nd write will pass before | |
205 | // the intent would be cancelled | |
206 | fmt::print(" 2nd write won the race with cancellation\n"); | |
207 | } catch (cancelled_error& ex) { | |
208 | cancelled = true; | |
209 | } | |
210 | ||
211 | fmt::print(" verifying data...\n"); | |
212 | auto rbuf = allocate_aligned_buffer<unsigned char>(aligned_size, aligned_size); | |
213 | f.dma_read(0, rbuf.get(), aligned_size).get(); | |
214 | ||
215 | // First part of the buffer must coincide with the overwritten data | |
216 | assert(!memcmp(rbuf.get(), wbuf_n.get(), half_aligned_size)); | |
217 | ||
218 | if (cancelled) { | |
219 | // Second part -- with the old data ... | |
220 | assert(!memcmp(rbuf.get() + half_aligned_size, wbuf.get() + half_aligned_size, half_aligned_size)); | |
221 | } else { | |
222 | // ... or with new if the cancellation didn't happen | |
223 | assert(!memcmp(rbuf.get() + half_aligned_size, wbuf.get() + half_aligned_size, half_aligned_size)); | |
224 | } | |
225 | }); | |
226 | } | |
227 | ||
f67539c2 TL |
228 | int main(int ac, char** av) { |
229 | app_template app; | |
230 | return app.run(ac, av, [] { | |
231 | return demo_with_file().then([] { | |
20effc67 TL |
232 | return demo_with_file_close_on_failure().then([] { |
233 | return demo_with_io_intent(); | |
234 | }); | |
f67539c2 TL |
235 | }); |
236 | }); | |
237 | } |