]> git.proxmox.com Git - ceph.git/blob - ceph/src/osdc/Filer.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / osdc / Filer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #include <mutex>
17 #include <algorithm>
18 #include "Filer.h"
19 #include "osd/OSDMap.h"
20 #include "Striper.h"
21
22 #include "messages/MOSDOp.h"
23 #include "messages/MOSDOpReply.h"
24 #include "messages/MOSDMap.h"
25
26 #include "msg/Messenger.h"
27
28 #include "include/Context.h"
29
30 #include "common/Finisher.h"
31 #include "common/config.h"
32
33 #define dout_subsys ceph_subsys_filer
34 #undef dout_prefix
35 #define dout_prefix *_dout << objecter->messenger->get_myname() << ".filer "
36
37 class Filer::C_Probe : public Context {
38 public:
39 Filer *filer;
40 Probe *probe;
41 object_t oid;
42 uint64_t size;
43 ceph::real_time mtime;
44 C_Probe(Filer *f, Probe *p, object_t o) : filer(f), probe(p), oid(o),
45 size(0) {}
46 void finish(int r) override {
47 if (r == -ENOENT) {
48 r = 0;
49 assert(size == 0);
50 }
51
52 bool probe_complete;
53 {
54 Probe::unique_lock pl(probe->lock);
55 if (r != 0) {
56 probe->err = r;
57 }
58
59 probe_complete = filer->_probed(probe, oid, size, mtime, pl);
60 assert(!pl.owns_lock());
61 }
62 if (probe_complete) {
63 probe->onfinish->complete(probe->err);
64 delete probe;
65 }
66 }
67 };
68
69 int Filer::probe(inodeno_t ino,
70 file_layout_t *layout,
71 snapid_t snapid,
72 uint64_t start_from,
73 uint64_t *end, // LB, when !fwd
74 ceph::real_time *pmtime,
75 bool fwd,
76 int flags,
77 Context *onfinish)
78 {
79 ldout(cct, 10) << "probe " << (fwd ? "fwd ":"bwd ")
80 << hex << ino << dec
81 << " starting from " << start_from
82 << dendl;
83
84 assert(snapid); // (until there is a non-NOSNAP write)
85
86 Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime,
87 flags, fwd, onfinish);
88
89 return probe_impl(probe, layout, start_from, end);
90 }
91
92 int Filer::probe(inodeno_t ino,
93 file_layout_t *layout,
94 snapid_t snapid,
95 uint64_t start_from,
96 uint64_t *end, // LB, when !fwd
97 utime_t *pmtime,
98 bool fwd,
99 int flags,
100 Context *onfinish)
101 {
102 ldout(cct, 10) << "probe " << (fwd ? "fwd ":"bwd ")
103 << hex << ino << dec
104 << " starting from " << start_from
105 << dendl;
106
107 assert(snapid); // (until there is a non-NOSNAP write)
108
109 Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime,
110 flags, fwd, onfinish);
111 return probe_impl(probe, layout, start_from, end);
112 }
113
114 int Filer::probe_impl(Probe* probe, file_layout_t *layout,
115 uint64_t start_from, uint64_t *end) // LB, when !fwd
116 {
117 // period (bytes before we jump unto a new set of object(s))
118 uint64_t period = layout->get_period();
119
120 // start with 1+ periods.
121 probe->probing_len = period;
122 if (probe->fwd) {
123 if (start_from % period)
124 probe->probing_len += period - (start_from % period);
125 } else {
126 assert(start_from > *end);
127 if (start_from % period)
128 probe->probing_len -= period - (start_from % period);
129 probe->probing_off -= probe->probing_len;
130 }
131
132 Probe::unique_lock pl(probe->lock);
133 _probe(probe, pl);
134 assert(!pl.owns_lock());
135
136 return 0;
137 }
138
139
140
141 /**
142 * probe->lock must be initially locked, this function will release it
143 */
144 void Filer::_probe(Probe *probe, Probe::unique_lock& pl)
145 {
146 assert(pl.owns_lock() && pl.mutex() == &probe->lock);
147
148 ldout(cct, 10) << "_probe " << hex << probe->ino << dec
149 << " " << probe->probing_off << "~" << probe->probing_len
150 << dendl;
151
152 // map range onto objects
153 probe->known_size.clear();
154 probe->probing.clear();
155 Striper::file_to_extents(cct, probe->ino, &probe->layout, probe->probing_off,
156 probe->probing_len, 0, probe->probing);
157
158 std::vector<ObjectExtent> stat_extents;
159 for (vector<ObjectExtent>::iterator p = probe->probing.begin();
160 p != probe->probing.end();
161 ++p) {
162 ldout(cct, 10) << "_probe probing " << p->oid << dendl;
163 probe->ops.insert(p->oid);
164 stat_extents.push_back(*p);
165 }
166
167 pl.unlock();
168 for (std::vector<ObjectExtent>::iterator i = stat_extents.begin();
169 i != stat_extents.end(); ++i) {
170 C_Probe *c = new C_Probe(this, probe, i->oid);
171 objecter->stat(i->oid, i->oloc, probe->snapid, &c->size, &c->mtime,
172 probe->flags | CEPH_OSD_FLAG_RWORDERED,
173 new C_OnFinisher(c, finisher));
174 }
175 }
176
177 /**
178 * probe->lock must be initially held, and will be released by this function.
179 *
180 * @return true if probe is complete and Probe object may be freed.
181 */
182 bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size,
183 ceph::real_time mtime, Probe::unique_lock& pl)
184 {
185 assert(pl.owns_lock() && pl.mutex() == &probe->lock);
186
187 ldout(cct, 10) << "_probed " << probe->ino << " object " << oid
188 << " has size " << size << " mtime " << mtime << dendl;
189
190 probe->known_size[oid] = size;
191 if (mtime > probe->max_mtime)
192 probe->max_mtime = mtime;
193
194 assert(probe->ops.count(oid));
195 probe->ops.erase(oid);
196
197 if (!probe->ops.empty()) {
198 pl.unlock();
199 return false; // waiting for more!
200 }
201
202 if (probe->err) { // we hit an error, propagate back up
203 pl.unlock();
204 return true;
205 }
206
207 // analyze!
208 uint64_t end = 0;
209
210 if (!probe->fwd) {
211 std::reverse(probe->probing.begin(), probe->probing.end());
212 }
213
214 for (vector<ObjectExtent>::iterator p = probe->probing.begin();
215 p != probe->probing.end();
216 ++p) {
217 uint64_t shouldbe = p->length + p->offset;
218 ldout(cct, 10) << "_probed " << probe->ino << " object " << hex
219 << p->oid << dec << " should be " << shouldbe
220 << ", actual is " << probe->known_size[p->oid]
221 << dendl;
222
223 if (!probe->found_size) {
224 assert(probe->known_size[p->oid] <= shouldbe);
225
226 if ((probe->fwd && probe->known_size[p->oid] == shouldbe) ||
227 (!probe->fwd && probe->known_size[p->oid] == 0 &&
228 probe->probing_off > 0))
229 continue; // keep going
230
231 // aha, we found the end!
232 // calc offset into buffer_extent to get distance from probe->from.
233 uint64_t oleft = probe->known_size[p->oid] - p->offset;
234 for (vector<pair<uint64_t, uint64_t> >::iterator i
235 = p->buffer_extents.begin();
236 i != p->buffer_extents.end();
237 ++i) {
238 if (oleft <= (uint64_t)i->second) {
239 end = probe->probing_off + i->first + oleft;
240 ldout(cct, 10) << "_probed end is in buffer_extent " << i->first
241 << "~" << i->second << " off " << oleft
242 << ", from was " << probe->probing_off << ", end is "
243 << end << dendl;
244
245 probe->found_size = true;
246 ldout(cct, 10) << "_probed found size at " << end << dendl;
247 *probe->psize = end;
248
249 if (!probe->pmtime &&
250 !probe->pumtime) // stop if we don't need mtime too
251 break;
252 }
253 oleft -= i->second;
254 }
255 }
256 break;
257 }
258
259 if (!probe->found_size || (probe->probing_off && (probe->pmtime ||
260 probe->pumtime))) {
261 // keep probing!
262 ldout(cct, 10) << "_probed probing further" << dendl;
263
264 uint64_t period = probe->layout.get_period();
265 if (probe->fwd) {
266 probe->probing_off += probe->probing_len;
267 assert(probe->probing_off % period == 0);
268 probe->probing_len = period;
269 } else {
270 // previous period.
271 assert(probe->probing_off % period == 0);
272 probe->probing_len = period;
273 probe->probing_off -= period;
274 }
275 _probe(probe, pl);
276 assert(!pl.owns_lock());
277 return false;
278 } else if (probe->pmtime) {
279 ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
280 *probe->pmtime = probe->max_mtime;
281 } else if (probe->pumtime) {
282 ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
283 *probe->pumtime = ceph::real_clock::to_ceph_timespec(probe->max_mtime);
284 }
285 // done!
286 pl.unlock();
287 return true;
288 }
289
290
291 // -----------------------
292
293 struct PurgeRange {
294 std::mutex lock;
295 typedef std::lock_guard<std::mutex> lock_guard;
296 typedef std::unique_lock<std::mutex> unique_lock;
297 inodeno_t ino;
298 file_layout_t layout;
299 SnapContext snapc;
300 uint64_t first, num;
301 ceph::real_time mtime;
302 int flags;
303 Context *oncommit;
304 int uncommitted;
305 PurgeRange(inodeno_t i, const file_layout_t& l, const SnapContext& sc,
306 uint64_t fo, uint64_t no, ceph::real_time t, int fl,
307 Context *fin)
308 : ino(i), layout(l), snapc(sc), first(fo), num(no), mtime(t), flags(fl),
309 oncommit(fin), uncommitted(0) {}
310 };
311
312 int Filer::purge_range(inodeno_t ino,
313 const file_layout_t *layout,
314 const SnapContext& snapc,
315 uint64_t first_obj, uint64_t num_obj,
316 ceph::real_time mtime,
317 int flags,
318 Context *oncommit)
319 {
320 assert(num_obj > 0);
321
322 // single object? easy!
323 if (num_obj == 1) {
324 object_t oid = file_object_t(ino, first_obj);
325 object_locator_t oloc = OSDMap::file_to_object_locator(*layout);
326 objecter->remove(oid, oloc, snapc, mtime, flags, oncommit);
327 return 0;
328 }
329
330 PurgeRange *pr = new PurgeRange(ino, *layout, snapc, first_obj,
331 num_obj, mtime, flags, oncommit);
332
333 _do_purge_range(pr, 0);
334 return 0;
335 }
336
337 struct C_PurgeRange : public Context {
338 Filer *filer;
339 PurgeRange *pr;
340 C_PurgeRange(Filer *f, PurgeRange *p) : filer(f), pr(p) {}
341 void finish(int r) override {
342 filer->_do_purge_range(pr, 1);
343 }
344 };
345
346 void Filer::_do_purge_range(PurgeRange *pr, int fin)
347 {
348 PurgeRange::unique_lock prl(pr->lock);
349 pr->uncommitted -= fin;
350 ldout(cct, 10) << "_do_purge_range " << pr->ino << " objects " << pr->first
351 << "~" << pr->num << " uncommitted " << pr->uncommitted
352 << dendl;
353
354 if (pr->num == 0 && pr->uncommitted == 0) {
355 pr->oncommit->complete(0);
356 prl.unlock();
357 delete pr;
358 return;
359 }
360
361 std::vector<object_t> remove_oids;
362
363 int max = cct->_conf->filer_max_purge_ops - pr->uncommitted;
364 while (pr->num > 0 && max > 0) {
365 remove_oids.push_back(file_object_t(pr->ino, pr->first));
366 pr->uncommitted++;
367 pr->first++;
368 pr->num--;
369 max--;
370 }
371 prl.unlock();
372
373 // Issue objecter ops outside pr->lock to avoid lock dependency loop
374 for (const auto& oid : remove_oids) {
375 object_locator_t oloc = OSDMap::file_to_object_locator(pr->layout);
376 objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags,
377 new C_OnFinisher(new C_PurgeRange(this, pr), finisher));
378 }
379 }
380
381 // -----------------------
382 struct TruncRange {
383 std::mutex lock;
384 typedef std::lock_guard<std::mutex> lock_guard;
385 typedef std::unique_lock<std::mutex> unique_lock;
386 inodeno_t ino;
387 file_layout_t layout;
388 SnapContext snapc;
389 ceph::real_time mtime;
390 int flags;
391 Context *oncommit;
392 int uncommitted;
393 uint64_t offset;
394 uint64_t length;
395 uint32_t truncate_seq;
396 TruncRange(inodeno_t i, const file_layout_t& l, const SnapContext& sc,
397 ceph::real_time t, int fl, Context *fin,
398 uint64_t off, uint64_t len, uint32_t ts)
399 : ino(i), layout(l), snapc(sc), mtime(t), flags(fl), oncommit(fin),
400 uncommitted(0), offset(off), length(len), truncate_seq(ts) {}
401 };
402
403 void Filer::truncate(inodeno_t ino,
404 file_layout_t *layout,
405 const SnapContext& snapc,
406 uint64_t offset,
407 uint64_t len,
408 __u32 truncate_seq,
409 ceph::real_time mtime,
410 int flags,
411 Context *oncommit)
412 {
413 uint64_t period = layout->get_period();
414 uint64_t num_objs = Striper::get_num_objects(*layout, len + (offset % period));
415 if (num_objs == 1) {
416 vector<ObjectExtent> extents;
417 Striper::file_to_extents(cct, ino, layout, offset, len, 0, extents);
418 vector<OSDOp> ops(1);
419 ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC;
420 ops[0].op.extent.truncate_seq = truncate_seq;
421 ops[0].op.extent.truncate_size = extents[0].offset;
422 objecter->_modify(extents[0].oid, extents[0].oloc, ops, mtime, snapc,
423 flags, oncommit);
424 return;
425 }
426
427 if (len > 0 && (offset + len) % period)
428 len += period - ((offset + len) % period);
429
430 TruncRange *tr = new TruncRange(ino, *layout, snapc, mtime, flags, oncommit,
431 offset, len, truncate_seq);
432 _do_truncate_range(tr, 0);
433 }
434
435 struct C_TruncRange : public Context {
436 Filer *filer;
437 TruncRange *tr;
438 C_TruncRange(Filer *f, TruncRange *t) : filer(f), tr(t) {}
439 void finish(int r) override {
440 filer->_do_truncate_range(tr, 1);
441 }
442 };
443
444 void Filer::_do_truncate_range(TruncRange *tr, int fin)
445 {
446 TruncRange::unique_lock trl(tr->lock);
447 tr->uncommitted -= fin;
448 ldout(cct, 10) << "_do_truncate_range " << tr->ino << " objects " << tr->offset
449 << "~" << tr->length << " uncommitted " << tr->uncommitted
450 << dendl;
451
452 if (tr->length == 0 && tr->uncommitted == 0) {
453 tr->oncommit->complete(0);
454 trl.unlock();
455 delete tr;
456 return;
457 }
458
459 vector<ObjectExtent> extents;
460
461 int max = cct->_conf->filer_max_truncate_ops - tr->uncommitted;
462 if (max > 0 && tr->length > 0) {
463 uint64_t len = tr->layout.get_period() * max;
464 if (len > tr->length)
465 len = tr->length;
466
467 uint64_t offset = tr->offset + tr->length - len;
468 Striper::file_to_extents(cct, tr->ino, &tr->layout, offset, len, 0, extents);
469 tr->uncommitted += extents.size();
470 tr->length -= len;
471 }
472
473 trl.unlock();
474
475 // Issue objecter ops outside tr->lock to avoid lock dependency loop
476 for (const auto& p : extents) {
477 vector<OSDOp> ops(1);
478 ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC;
479 ops[0].op.extent.truncate_size = p.offset;
480 ops[0].op.extent.truncate_seq = tr->truncate_seq;
481 objecter->_modify(p.oid, p.oloc, ops, tr->mtime, tr->snapc, tr->flags,
482 new C_OnFinisher(new C_TruncRange(this, tr), finisher));
483 }
484 }