]> git.proxmox.com Git - ceph.git/blame - ceph/src/osdc/Filer.cc
import 15.2.4
[ceph.git] / ceph / src / osdc / Filer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include <mutex>
17#include <algorithm>
18#include "Filer.h"
19#include "osd/OSDMap.h"
20#include "Striper.h"
21
22#include "messages/MOSDOp.h"
23#include "messages/MOSDOpReply.h"
24#include "messages/MOSDMap.h"
25
26#include "msg/Messenger.h"
27
28#include "include/Context.h"
29
30#include "common/Finisher.h"
31#include "common/config.h"
32
33#define dout_subsys ceph_subsys_filer
34#undef dout_prefix
35#define dout_prefix *_dout << objecter->messenger->get_myname() << ".filer "
36
37class Filer::C_Probe : public Context {
38public:
39 Filer *filer;
40 Probe *probe;
41 object_t oid;
42 uint64_t size;
43 ceph::real_time mtime;
44 C_Probe(Filer *f, Probe *p, object_t o) : filer(f), probe(p), oid(o),
45 size(0) {}
46 void finish(int r) override {
47 if (r == -ENOENT) {
48 r = 0;
11fdf7f2 49 ceph_assert(size == 0);
7c673cae
FG
50 }
51
52 bool probe_complete;
53 {
54 Probe::unique_lock pl(probe->lock);
55 if (r != 0) {
56 probe->err = r;
57 }
58
59 probe_complete = filer->_probed(probe, oid, size, mtime, pl);
11fdf7f2 60 ceph_assert(!pl.owns_lock());
7c673cae
FG
61 }
62 if (probe_complete) {
63 probe->onfinish->complete(probe->err);
64 delete probe;
65 }
66 }
67};
68
69int Filer::probe(inodeno_t ino,
70 file_layout_t *layout,
71 snapid_t snapid,
72 uint64_t start_from,
73 uint64_t *end, // LB, when !fwd
74 ceph::real_time *pmtime,
75 bool fwd,
76 int flags,
77 Context *onfinish)
78{
79 ldout(cct, 10) << "probe " << (fwd ? "fwd ":"bwd ")
80 << hex << ino << dec
81 << " starting from " << start_from
82 << dendl;
83
11fdf7f2 84 ceph_assert(snapid); // (until there is a non-NOSNAP write)
7c673cae
FG
85
86 Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime,
87 flags, fwd, onfinish);
88
89 return probe_impl(probe, layout, start_from, end);
90}
91
92int Filer::probe(inodeno_t ino,
93 file_layout_t *layout,
94 snapid_t snapid,
95 uint64_t start_from,
96 uint64_t *end, // LB, when !fwd
97 utime_t *pmtime,
98 bool fwd,
99 int flags,
100 Context *onfinish)
101{
102 ldout(cct, 10) << "probe " << (fwd ? "fwd ":"bwd ")
103 << hex << ino << dec
104 << " starting from " << start_from
105 << dendl;
106
11fdf7f2 107 ceph_assert(snapid); // (until there is a non-NOSNAP write)
7c673cae
FG
108
109 Probe *probe = new Probe(ino, *layout, snapid, start_from, end, pmtime,
110 flags, fwd, onfinish);
111 return probe_impl(probe, layout, start_from, end);
112}
113
114int Filer::probe_impl(Probe* probe, file_layout_t *layout,
115 uint64_t start_from, uint64_t *end) // LB, when !fwd
116{
117 // period (bytes before we jump unto a new set of object(s))
118 uint64_t period = layout->get_period();
119
120 // start with 1+ periods.
121 probe->probing_len = period;
122 if (probe->fwd) {
123 if (start_from % period)
124 probe->probing_len += period - (start_from % period);
125 } else {
11fdf7f2 126 ceph_assert(start_from > *end);
7c673cae
FG
127 if (start_from % period)
128 probe->probing_len -= period - (start_from % period);
129 probe->probing_off -= probe->probing_len;
130 }
131
132 Probe::unique_lock pl(probe->lock);
133 _probe(probe, pl);
11fdf7f2 134 ceph_assert(!pl.owns_lock());
7c673cae
FG
135
136 return 0;
137}
138
139
140
141/**
142 * probe->lock must be initially locked, this function will release it
143 */
144void Filer::_probe(Probe *probe, Probe::unique_lock& pl)
145{
11fdf7f2 146 ceph_assert(pl.owns_lock() && pl.mutex() == &probe->lock);
7c673cae
FG
147
148 ldout(cct, 10) << "_probe " << hex << probe->ino << dec
149 << " " << probe->probing_off << "~" << probe->probing_len
150 << dendl;
151
152 // map range onto objects
153 probe->known_size.clear();
154 probe->probing.clear();
155 Striper::file_to_extents(cct, probe->ino, &probe->layout, probe->probing_off,
156 probe->probing_len, 0, probe->probing);
157
158 std::vector<ObjectExtent> stat_extents;
159 for (vector<ObjectExtent>::iterator p = probe->probing.begin();
160 p != probe->probing.end();
161 ++p) {
162 ldout(cct, 10) << "_probe probing " << p->oid << dendl;
163 probe->ops.insert(p->oid);
164 stat_extents.push_back(*p);
165 }
166
167 pl.unlock();
168 for (std::vector<ObjectExtent>::iterator i = stat_extents.begin();
169 i != stat_extents.end(); ++i) {
170 C_Probe *c = new C_Probe(this, probe, i->oid);
171 objecter->stat(i->oid, i->oloc, probe->snapid, &c->size, &c->mtime,
172 probe->flags | CEPH_OSD_FLAG_RWORDERED,
173 new C_OnFinisher(c, finisher));
174 }
175}
176
177/**
178 * probe->lock must be initially held, and will be released by this function.
179 *
180 * @return true if probe is complete and Probe object may be freed.
181 */
182bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size,
183 ceph::real_time mtime, Probe::unique_lock& pl)
184{
11fdf7f2 185 ceph_assert(pl.owns_lock() && pl.mutex() == &probe->lock);
7c673cae
FG
186
187 ldout(cct, 10) << "_probed " << probe->ino << " object " << oid
188 << " has size " << size << " mtime " << mtime << dendl;
189
190 probe->known_size[oid] = size;
191 if (mtime > probe->max_mtime)
192 probe->max_mtime = mtime;
193
11fdf7f2 194 ceph_assert(probe->ops.count(oid));
7c673cae
FG
195 probe->ops.erase(oid);
196
197 if (!probe->ops.empty()) {
198 pl.unlock();
199 return false; // waiting for more!
200 }
201
202 if (probe->err) { // we hit an error, propagate back up
203 pl.unlock();
204 return true;
205 }
206
207 // analyze!
208 uint64_t end = 0;
209
210 if (!probe->fwd) {
211 std::reverse(probe->probing.begin(), probe->probing.end());
212 }
213
214 for (vector<ObjectExtent>::iterator p = probe->probing.begin();
215 p != probe->probing.end();
216 ++p) {
217 uint64_t shouldbe = p->length + p->offset;
218 ldout(cct, 10) << "_probed " << probe->ino << " object " << hex
219 << p->oid << dec << " should be " << shouldbe
220 << ", actual is " << probe->known_size[p->oid]
221 << dendl;
222
223 if (!probe->found_size) {
11fdf7f2 224 ceph_assert(probe->known_size[p->oid] <= shouldbe);
7c673cae
FG
225
226 if ((probe->fwd && probe->known_size[p->oid] == shouldbe) ||
227 (!probe->fwd && probe->known_size[p->oid] == 0 &&
228 probe->probing_off > 0))
229 continue; // keep going
230
231 // aha, we found the end!
232 // calc offset into buffer_extent to get distance from probe->from.
233 uint64_t oleft = probe->known_size[p->oid] - p->offset;
234 for (vector<pair<uint64_t, uint64_t> >::iterator i
235 = p->buffer_extents.begin();
236 i != p->buffer_extents.end();
237 ++i) {
238 if (oleft <= (uint64_t)i->second) {
239 end = probe->probing_off + i->first + oleft;
240 ldout(cct, 10) << "_probed end is in buffer_extent " << i->first
241 << "~" << i->second << " off " << oleft
242 << ", from was " << probe->probing_off << ", end is "
243 << end << dendl;
244
245 probe->found_size = true;
246 ldout(cct, 10) << "_probed found size at " << end << dendl;
247 *probe->psize = end;
248
249 if (!probe->pmtime &&
250 !probe->pumtime) // stop if we don't need mtime too
251 break;
252 }
253 oleft -= i->second;
254 }
255 }
256 break;
257 }
258
259 if (!probe->found_size || (probe->probing_off && (probe->pmtime ||
260 probe->pumtime))) {
261 // keep probing!
262 ldout(cct, 10) << "_probed probing further" << dendl;
263
264 uint64_t period = probe->layout.get_period();
265 if (probe->fwd) {
266 probe->probing_off += probe->probing_len;
11fdf7f2 267 ceph_assert(probe->probing_off % period == 0);
7c673cae
FG
268 probe->probing_len = period;
269 } else {
270 // previous period.
11fdf7f2 271 ceph_assert(probe->probing_off % period == 0);
7c673cae
FG
272 probe->probing_len = period;
273 probe->probing_off -= period;
274 }
275 _probe(probe, pl);
11fdf7f2 276 ceph_assert(!pl.owns_lock());
7c673cae
FG
277 return false;
278 } else if (probe->pmtime) {
279 ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
280 *probe->pmtime = probe->max_mtime;
281 } else if (probe->pumtime) {
282 ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
283 *probe->pumtime = ceph::real_clock::to_ceph_timespec(probe->max_mtime);
284 }
285 // done!
286 pl.unlock();
287 return true;
288}
289
290
291// -----------------------
292
293struct PurgeRange {
294 std::mutex lock;
295 typedef std::lock_guard<std::mutex> lock_guard;
296 typedef std::unique_lock<std::mutex> unique_lock;
297 inodeno_t ino;
298 file_layout_t layout;
299 SnapContext snapc;
300 uint64_t first, num;
301 ceph::real_time mtime;
302 int flags;
303 Context *oncommit;
304 int uncommitted;
e306af50 305 int err = 0;
7c673cae
FG
306 PurgeRange(inodeno_t i, const file_layout_t& l, const SnapContext& sc,
307 uint64_t fo, uint64_t no, ceph::real_time t, int fl,
308 Context *fin)
309 : ino(i), layout(l), snapc(sc), first(fo), num(no), mtime(t), flags(fl),
310 oncommit(fin), uncommitted(0) {}
311};
312
313int Filer::purge_range(inodeno_t ino,
314 const file_layout_t *layout,
315 const SnapContext& snapc,
316 uint64_t first_obj, uint64_t num_obj,
317 ceph::real_time mtime,
318 int flags,
319 Context *oncommit)
320{
11fdf7f2 321 ceph_assert(num_obj > 0);
7c673cae
FG
322
323 // single object? easy!
324 if (num_obj == 1) {
325 object_t oid = file_object_t(ino, first_obj);
326 object_locator_t oloc = OSDMap::file_to_object_locator(*layout);
9f95a23c 327 ldout(cct, 10) << "purge_range removing " << oid << dendl;
7c673cae
FG
328 objecter->remove(oid, oloc, snapc, mtime, flags, oncommit);
329 return 0;
330 }
331
332 PurgeRange *pr = new PurgeRange(ino, *layout, snapc, first_obj,
333 num_obj, mtime, flags, oncommit);
334
e306af50 335 _do_purge_range(pr, 0, 0);
7c673cae
FG
336 return 0;
337}
338
339struct C_PurgeRange : public Context {
340 Filer *filer;
341 PurgeRange *pr;
342 C_PurgeRange(Filer *f, PurgeRange *p) : filer(f), pr(p) {}
343 void finish(int r) override {
e306af50 344 filer->_do_purge_range(pr, 1, r);
7c673cae
FG
345 }
346};
347
e306af50 348void Filer::_do_purge_range(PurgeRange *pr, int fin, int err)
7c673cae
FG
349{
350 PurgeRange::unique_lock prl(pr->lock);
e306af50
TL
351 if (err && err != -ENOENT)
352 pr->err = err;
7c673cae
FG
353 pr->uncommitted -= fin;
354 ldout(cct, 10) << "_do_purge_range " << pr->ino << " objects " << pr->first
355 << "~" << pr->num << " uncommitted " << pr->uncommitted
356 << dendl;
357
358 if (pr->num == 0 && pr->uncommitted == 0) {
e306af50 359 pr->oncommit->complete(pr->err);
7c673cae
FG
360 prl.unlock();
361 delete pr;
362 return;
363 }
364
365 std::vector<object_t> remove_oids;
366
367 int max = cct->_conf->filer_max_purge_ops - pr->uncommitted;
368 while (pr->num > 0 && max > 0) {
369 remove_oids.push_back(file_object_t(pr->ino, pr->first));
370 pr->uncommitted++;
371 pr->first++;
372 pr->num--;
373 max--;
374 }
375 prl.unlock();
376
377 // Issue objecter ops outside pr->lock to avoid lock dependency loop
378 for (const auto& oid : remove_oids) {
379 object_locator_t oloc = OSDMap::file_to_object_locator(pr->layout);
380 objecter->remove(oid, oloc, pr->snapc, pr->mtime, pr->flags,
381 new C_OnFinisher(new C_PurgeRange(this, pr), finisher));
382 }
383}
384
385// -----------------------
386struct TruncRange {
387 std::mutex lock;
388 typedef std::lock_guard<std::mutex> lock_guard;
389 typedef std::unique_lock<std::mutex> unique_lock;
390 inodeno_t ino;
391 file_layout_t layout;
392 SnapContext snapc;
393 ceph::real_time mtime;
394 int flags;
395 Context *oncommit;
396 int uncommitted;
397 uint64_t offset;
398 uint64_t length;
399 uint32_t truncate_seq;
400 TruncRange(inodeno_t i, const file_layout_t& l, const SnapContext& sc,
401 ceph::real_time t, int fl, Context *fin,
402 uint64_t off, uint64_t len, uint32_t ts)
403 : ino(i), layout(l), snapc(sc), mtime(t), flags(fl), oncommit(fin),
404 uncommitted(0), offset(off), length(len), truncate_seq(ts) {}
405};
406
407void Filer::truncate(inodeno_t ino,
408 file_layout_t *layout,
409 const SnapContext& snapc,
410 uint64_t offset,
411 uint64_t len,
412 __u32 truncate_seq,
413 ceph::real_time mtime,
414 int flags,
415 Context *oncommit)
416{
417 uint64_t period = layout->get_period();
418 uint64_t num_objs = Striper::get_num_objects(*layout, len + (offset % period));
419 if (num_objs == 1) {
420 vector<ObjectExtent> extents;
421 Striper::file_to_extents(cct, ino, layout, offset, len, 0, extents);
422 vector<OSDOp> ops(1);
423 ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC;
424 ops[0].op.extent.truncate_seq = truncate_seq;
425 ops[0].op.extent.truncate_size = extents[0].offset;
426 objecter->_modify(extents[0].oid, extents[0].oloc, ops, mtime, snapc,
427 flags, oncommit);
428 return;
429 }
430
431 if (len > 0 && (offset + len) % period)
432 len += period - ((offset + len) % period);
433
434 TruncRange *tr = new TruncRange(ino, *layout, snapc, mtime, flags, oncommit,
435 offset, len, truncate_seq);
436 _do_truncate_range(tr, 0);
437}
438
439struct C_TruncRange : public Context {
440 Filer *filer;
441 TruncRange *tr;
442 C_TruncRange(Filer *f, TruncRange *t) : filer(f), tr(t) {}
443 void finish(int r) override {
444 filer->_do_truncate_range(tr, 1);
445 }
446};
447
448void Filer::_do_truncate_range(TruncRange *tr, int fin)
449{
450 TruncRange::unique_lock trl(tr->lock);
451 tr->uncommitted -= fin;
452 ldout(cct, 10) << "_do_truncate_range " << tr->ino << " objects " << tr->offset
453 << "~" << tr->length << " uncommitted " << tr->uncommitted
454 << dendl;
455
456 if (tr->length == 0 && tr->uncommitted == 0) {
457 tr->oncommit->complete(0);
458 trl.unlock();
459 delete tr;
460 return;
461 }
462
463 vector<ObjectExtent> extents;
464
465 int max = cct->_conf->filer_max_truncate_ops - tr->uncommitted;
466 if (max > 0 && tr->length > 0) {
467 uint64_t len = tr->layout.get_period() * max;
468 if (len > tr->length)
469 len = tr->length;
470
471 uint64_t offset = tr->offset + tr->length - len;
472 Striper::file_to_extents(cct, tr->ino, &tr->layout, offset, len, 0, extents);
473 tr->uncommitted += extents.size();
474 tr->length -= len;
475 }
476
477 trl.unlock();
478
479 // Issue objecter ops outside tr->lock to avoid lock dependency loop
480 for (const auto& p : extents) {
481 vector<OSDOp> ops(1);
482 ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC;
483 ops[0].op.extent.truncate_size = p.offset;
484 ops[0].op.extent.truncate_seq = tr->truncate_seq;
485 objecter->_modify(p.oid, p.oloc, ops, tr->mtime, tr->snapc, tr->flags,
486 new C_OnFinisher(new C_TruncRange(this, tr), finisher));
487 }
488}