]>
git.proxmox.com Git - mirror_ubuntu-focal-kernel.git/blob - fs/ceph/snap.c
1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
4 #include <linux/sort.h>
5 #include <linux/slab.h>
6 #include <linux/iversion.h>
8 #include "mds_client.h"
9 #include <linux/ceph/decode.h>
11 /* unused map expires after 5 minutes */
12 #define CEPH_SNAPID_MAP_TIMEOUT (5 * 60 * HZ)
15 * Snapshots in ceph are driven in large part by cooperation from the
16 * client. In contrast to local file systems or file servers that
17 * implement snapshots at a single point in the system, ceph's
18 * distributed access to storage requires clients to help decide
19 * whether a write logically occurs before or after a recently created
22 * This provides a perfect instantaneous client-wide snapshot. Between
23 * clients, however, snapshots may appear to be applied at slightly
24 * different points in time, depending on delays in delivering the
25 * snapshot notification.
27 * Snapshots are _not_ file system-wide. Instead, each snapshot
28 * applies to the subdirectory nested beneath some directory. This
29 * effectively divides the hierarchy into multiple "realms," where all
30 * of the files contained by each realm share the same set of
31 * snapshots. An individual realm's snap set contains snapshots
32 * explicitly created on that realm, as well as any snaps in its
33 * parent's snap set _after_ the point at which the parent became its
34 * parent (due to, say, a rename). Similarly, snaps from prior parents
35 * during the time intervals during which they were the parent are included.
37 * The client is spared most of this detail, fortunately... it must only
38 * maintain a hierarchy of realms reflecting the current parent/child
39 * realm relationship, and for each realm has an explicit list of snaps
40 * inherited from prior parents.
42 * A snap_realm struct is maintained for realms containing every inode
43 * with an open cap in the system. (The needed snap realm information is
44 * provided by the MDS whenever a cap is issued, i.e., on open.) A 'seq'
45 * version number is used to ensure that as realm parameters change (new
46 * snapshot, new parent, etc.) the client's realm hierarchy is updated.
48 * The realm hierarchy drives the generation of a 'snap context' for each
49 * realm, which simply lists the resulting set of snaps for the realm. This
50 * is attached to any writes sent to OSDs.
53 * Unfortunately error handling is a bit mixed here. If we get a snap
54 * update, but don't have enough memory to update our realm hierarchy,
55 * it's not clear what we can do about it (besides complaining to the
61 * increase ref count for the realm
63 * caller must hold snap_rwsem for write.
65 void ceph_get_snap_realm(struct ceph_mds_client
*mdsc
,
66 struct ceph_snap_realm
*realm
)
68 dout("get_realm %p %d -> %d\n", realm
,
69 atomic_read(&realm
->nref
), atomic_read(&realm
->nref
)+1);
71 * since we _only_ increment realm refs or empty the empty
72 * list with snap_rwsem held, adjusting the empty list here is
73 * safe. we do need to protect against concurrent empty list
76 if (atomic_inc_return(&realm
->nref
) == 1) {
77 spin_lock(&mdsc
->snap_empty_lock
);
78 list_del_init(&realm
->empty_item
);
79 spin_unlock(&mdsc
->snap_empty_lock
);
83 static void __insert_snap_realm(struct rb_root
*root
,
84 struct ceph_snap_realm
*new)
86 struct rb_node
**p
= &root
->rb_node
;
87 struct rb_node
*parent
= NULL
;
88 struct ceph_snap_realm
*r
= NULL
;
92 r
= rb_entry(parent
, struct ceph_snap_realm
, node
);
93 if (new->ino
< r
->ino
)
95 else if (new->ino
> r
->ino
)
101 rb_link_node(&new->node
, parent
, p
);
102 rb_insert_color(&new->node
, root
);
106 * create and get the realm rooted at @ino and bump its ref count.
108 * caller must hold snap_rwsem for write.
110 static struct ceph_snap_realm
*ceph_create_snap_realm(
111 struct ceph_mds_client
*mdsc
,
114 struct ceph_snap_realm
*realm
;
116 realm
= kzalloc(sizeof(*realm
), GFP_NOFS
);
118 return ERR_PTR(-ENOMEM
);
120 atomic_set(&realm
->nref
, 1); /* for caller */
122 INIT_LIST_HEAD(&realm
->children
);
123 INIT_LIST_HEAD(&realm
->child_item
);
124 INIT_LIST_HEAD(&realm
->empty_item
);
125 INIT_LIST_HEAD(&realm
->dirty_item
);
126 INIT_LIST_HEAD(&realm
->inodes_with_caps
);
127 spin_lock_init(&realm
->inodes_with_caps_lock
);
128 __insert_snap_realm(&mdsc
->snap_realms
, realm
);
129 mdsc
->num_snap_realms
++;
131 dout("create_snap_realm %llx %p\n", realm
->ino
, realm
);
136 * lookup the realm rooted at @ino.
138 * caller must hold snap_rwsem for write.
140 static struct ceph_snap_realm
*__lookup_snap_realm(struct ceph_mds_client
*mdsc
,
143 struct rb_node
*n
= mdsc
->snap_realms
.rb_node
;
144 struct ceph_snap_realm
*r
;
147 r
= rb_entry(n
, struct ceph_snap_realm
, node
);
150 else if (ino
> r
->ino
)
153 dout("lookup_snap_realm %llx %p\n", r
->ino
, r
);
160 struct ceph_snap_realm
*ceph_lookup_snap_realm(struct ceph_mds_client
*mdsc
,
163 struct ceph_snap_realm
*r
;
164 r
= __lookup_snap_realm(mdsc
, ino
);
166 ceph_get_snap_realm(mdsc
, r
);
170 static void __put_snap_realm(struct ceph_mds_client
*mdsc
,
171 struct ceph_snap_realm
*realm
);
174 * called with snap_rwsem (write)
176 static void __destroy_snap_realm(struct ceph_mds_client
*mdsc
,
177 struct ceph_snap_realm
*realm
)
179 dout("__destroy_snap_realm %p %llx\n", realm
, realm
->ino
);
181 rb_erase(&realm
->node
, &mdsc
->snap_realms
);
182 mdsc
->num_snap_realms
--;
185 list_del_init(&realm
->child_item
);
186 __put_snap_realm(mdsc
, realm
->parent
);
189 kfree(realm
->prior_parent_snaps
);
191 ceph_put_snap_context(realm
->cached_context
);
196 * caller holds snap_rwsem (write)
198 static void __put_snap_realm(struct ceph_mds_client
*mdsc
,
199 struct ceph_snap_realm
*realm
)
201 dout("__put_snap_realm %llx %p %d -> %d\n", realm
->ino
, realm
,
202 atomic_read(&realm
->nref
), atomic_read(&realm
->nref
)-1);
203 if (atomic_dec_and_test(&realm
->nref
))
204 __destroy_snap_realm(mdsc
, realm
);
208 * caller needn't hold any locks
210 void ceph_put_snap_realm(struct ceph_mds_client
*mdsc
,
211 struct ceph_snap_realm
*realm
)
213 dout("put_snap_realm %llx %p %d -> %d\n", realm
->ino
, realm
,
214 atomic_read(&realm
->nref
), atomic_read(&realm
->nref
)-1);
215 if (!atomic_dec_and_test(&realm
->nref
))
218 if (down_write_trylock(&mdsc
->snap_rwsem
)) {
219 __destroy_snap_realm(mdsc
, realm
);
220 up_write(&mdsc
->snap_rwsem
);
222 spin_lock(&mdsc
->snap_empty_lock
);
223 list_add(&realm
->empty_item
, &mdsc
->snap_empty
);
224 spin_unlock(&mdsc
->snap_empty_lock
);
229 * Clean up any realms whose ref counts have dropped to zero. Note
230 * that this does not include realms who were created but not yet
233 * Called under snap_rwsem (write)
235 static void __cleanup_empty_realms(struct ceph_mds_client
*mdsc
)
237 struct ceph_snap_realm
*realm
;
239 spin_lock(&mdsc
->snap_empty_lock
);
240 while (!list_empty(&mdsc
->snap_empty
)) {
241 realm
= list_first_entry(&mdsc
->snap_empty
,
242 struct ceph_snap_realm
, empty_item
);
243 list_del(&realm
->empty_item
);
244 spin_unlock(&mdsc
->snap_empty_lock
);
245 __destroy_snap_realm(mdsc
, realm
);
246 spin_lock(&mdsc
->snap_empty_lock
);
248 spin_unlock(&mdsc
->snap_empty_lock
);
251 void ceph_cleanup_empty_realms(struct ceph_mds_client
*mdsc
)
253 down_write(&mdsc
->snap_rwsem
);
254 __cleanup_empty_realms(mdsc
);
255 up_write(&mdsc
->snap_rwsem
);
259 * adjust the parent realm of a given @realm. adjust child list, and parent
260 * pointers, and ref counts appropriately.
262 * return true if parent was changed, 0 if unchanged, <0 on error.
264 * caller must hold snap_rwsem for write.
266 static int adjust_snap_realm_parent(struct ceph_mds_client
*mdsc
,
267 struct ceph_snap_realm
*realm
,
270 struct ceph_snap_realm
*parent
;
272 if (realm
->parent_ino
== parentino
)
275 parent
= ceph_lookup_snap_realm(mdsc
, parentino
);
277 parent
= ceph_create_snap_realm(mdsc
, parentino
);
279 return PTR_ERR(parent
);
281 dout("adjust_snap_realm_parent %llx %p: %llx %p -> %llx %p\n",
282 realm
->ino
, realm
, realm
->parent_ino
, realm
->parent
,
285 list_del_init(&realm
->child_item
);
286 ceph_put_snap_realm(mdsc
, realm
->parent
);
288 realm
->parent_ino
= parentino
;
289 realm
->parent
= parent
;
290 list_add(&realm
->child_item
, &parent
->children
);
/*
 * sort() comparator producing descending (reverse) order of u64 values.
 *
 * Returns 1 when *a < *b, -1 when *a > *b and 0 when they are equal, so
 * that larger values sort first.
 */
static int cmpu64_rev(const void *a, const void *b)
{
	const u64 lhs = *(const u64 *)a;
	const u64 rhs = *(const u64 *)b;

	if (lhs < rhs)
		return 1;
	if (lhs > rhs)
		return -1;
	return 0;
}
306 * build the snap context for a given realm.
308 static int build_snap_context(struct ceph_snap_realm
*realm
,
309 struct list_head
* dirty_realms
)
311 struct ceph_snap_realm
*parent
= realm
->parent
;
312 struct ceph_snap_context
*snapc
;
314 u32 num
= realm
->num_prior_parent_snaps
+ realm
->num_snaps
;
317 * build parent context, if it hasn't been built.
318 * conservatively estimate that all parent snaps might be
322 if (!parent
->cached_context
) {
323 err
= build_snap_context(parent
, dirty_realms
);
327 num
+= parent
->cached_context
->num_snaps
;
330 /* do i actually need to update? not if my context seq
331 matches realm seq, and my parent's does too. (this works
332 because rebuild_snap_realms() works _downward_ in
333 hierarchy after each update.) */
334 if (realm
->cached_context
&&
335 realm
->cached_context
->seq
== realm
->seq
&&
337 realm
->cached_context
->seq
>= parent
->cached_context
->seq
)) {
338 dout("build_snap_context %llx %p: %p seq %lld (%u snaps)"
340 realm
->ino
, realm
, realm
->cached_context
,
341 realm
->cached_context
->seq
,
342 (unsigned int)realm
->cached_context
->num_snaps
);
346 /* alloc new snap context */
348 if (num
> (SIZE_MAX
- sizeof(*snapc
)) / sizeof(u64
))
350 snapc
= ceph_create_snap_context(num
, GFP_NOFS
);
354 /* build (reverse sorted) snap vector */
356 snapc
->seq
= realm
->seq
;
360 /* include any of parent's snaps occurring _after_ my
361 parent became my parent */
362 for (i
= 0; i
< parent
->cached_context
->num_snaps
; i
++)
363 if (parent
->cached_context
->snaps
[i
] >=
365 snapc
->snaps
[num
++] =
366 parent
->cached_context
->snaps
[i
];
367 if (parent
->cached_context
->seq
> snapc
->seq
)
368 snapc
->seq
= parent
->cached_context
->seq
;
370 memcpy(snapc
->snaps
+ num
, realm
->snaps
,
371 sizeof(u64
)*realm
->num_snaps
);
372 num
+= realm
->num_snaps
;
373 memcpy(snapc
->snaps
+ num
, realm
->prior_parent_snaps
,
374 sizeof(u64
)*realm
->num_prior_parent_snaps
);
375 num
+= realm
->num_prior_parent_snaps
;
377 sort(snapc
->snaps
, num
, sizeof(u64
), cmpu64_rev
, NULL
);
378 snapc
->num_snaps
= num
;
379 dout("build_snap_context %llx %p: %p seq %lld (%u snaps)\n",
380 realm
->ino
, realm
, snapc
, snapc
->seq
,
381 (unsigned int) snapc
->num_snaps
);
383 ceph_put_snap_context(realm
->cached_context
);
384 realm
->cached_context
= snapc
;
385 /* queue realm for cap_snap creation */
386 list_add_tail(&realm
->dirty_item
, dirty_realms
);
391 * if we fail, clear old (incorrect) cached_context... hopefully
392 * we'll have better luck building it later
394 if (realm
->cached_context
) {
395 ceph_put_snap_context(realm
->cached_context
);
396 realm
->cached_context
= NULL
;
398 pr_err("build_snap_context %llx %p fail %d\n", realm
->ino
,
404 * rebuild snap context for the given realm and all of its children.
406 static void rebuild_snap_realms(struct ceph_snap_realm
*realm
,
407 struct list_head
*dirty_realms
)
409 struct ceph_snap_realm
*child
;
411 dout("rebuild_snap_realms %llx %p\n", realm
->ino
, realm
);
412 build_snap_context(realm
, dirty_realms
);
414 list_for_each_entry(child
, &realm
->children
, child_item
)
415 rebuild_snap_realms(child
, dirty_realms
);
420 * helper to allocate and decode an array of snapids. free prior
423 static int dup_array(u64
**dst
, __le64
*src
, u32 num
)
429 *dst
= kcalloc(num
, sizeof(u64
), GFP_NOFS
);
432 for (i
= 0; i
< num
; i
++)
433 (*dst
)[i
] = get_unaligned_le64(src
+ i
);
440 static bool has_new_snaps(struct ceph_snap_context
*o
,
441 struct ceph_snap_context
*n
)
443 if (n
->num_snaps
== 0)
445 /* snaps are in descending order */
446 return n
->snaps
[0] > o
->seq
;
450 * When a snapshot is applied, the size/mtime inode metadata is queued
451 * in a ceph_cap_snap (one for each snapshot) until writeback
452 * completes and the metadata can be flushed back to the MDS.
454 * However, if a (sync) write is currently in-progress when we apply
455 * the snapshot, we have to wait until the write succeeds or fails
456 * (and a final size/mtime is known). In this case the
457 * cap_snap->writing = 1, and is said to be "pending." When the write
458 * finishes, we __ceph_finish_cap_snap().
460 * Caller must hold snap_rwsem for read (i.e., the realm topology won't
463 void ceph_queue_cap_snap(struct ceph_inode_info
*ci
)
465 struct inode
*inode
= &ci
->vfs_inode
;
466 struct ceph_cap_snap
*capsnap
;
467 struct ceph_snap_context
*old_snapc
, *new_snapc
;
468 struct ceph_buffer
*old_blob
= NULL
;
471 capsnap
= kzalloc(sizeof(*capsnap
), GFP_NOFS
);
473 pr_err("ENOMEM allocating ceph_cap_snap on %p\n", inode
);
477 spin_lock(&ci
->i_ceph_lock
);
478 used
= __ceph_caps_used(ci
);
479 dirty
= __ceph_caps_dirty(ci
);
481 old_snapc
= ci
->i_head_snapc
;
482 new_snapc
= ci
->i_snap_realm
->cached_context
;
485 * If there is a write in progress, treat that as a dirty Fw,
486 * even though it hasn't completed yet; by the time we finish
487 * up this capsnap it will be.
489 if (used
& CEPH_CAP_FILE_WR
)
490 dirty
|= CEPH_CAP_FILE_WR
;
492 if (__ceph_have_pending_cap_snap(ci
)) {
493 /* there is no point in queuing multiple "pending" cap_snaps,
494 as no new writes are allowed to start when pending, so any
495 writes in progress now were started before the previous
496 cap_snap. lucky us. */
497 dout("queue_cap_snap %p already pending\n", inode
);
500 if (ci
->i_wrbuffer_ref_head
== 0 &&
501 !(dirty
& (CEPH_CAP_ANY_EXCL
|CEPH_CAP_FILE_WR
))) {
502 dout("queue_cap_snap %p nothing dirty|writing\n", inode
);
509 * There is no need to send FLUSHSNAP message to MDS if there is
510 * no new snapshot. But when there is dirty pages or on-going
511 * writes, we still need to create cap_snap. cap_snap is needed
512 * by the write path and page writeback path.
514 * also see ceph_try_drop_cap_snap()
516 if (has_new_snaps(old_snapc
, new_snapc
)) {
517 if (dirty
& (CEPH_CAP_ANY_EXCL
|CEPH_CAP_FILE_WR
))
518 capsnap
->need_flush
= true;
520 if (!(used
& CEPH_CAP_FILE_WR
) &&
521 ci
->i_wrbuffer_ref_head
== 0) {
522 dout("queue_cap_snap %p "
523 "no new_snap|dirty_page|writing\n", inode
);
528 dout("queue_cap_snap %p cap_snap %p queuing under %p %s %s\n",
529 inode
, capsnap
, old_snapc
, ceph_cap_string(dirty
),
530 capsnap
->need_flush
? "" : "no_flush");
533 refcount_set(&capsnap
->nref
, 1);
534 INIT_LIST_HEAD(&capsnap
->ci_item
);
536 capsnap
->follows
= old_snapc
->seq
;
537 capsnap
->issued
= __ceph_caps_issued(ci
, NULL
);
538 capsnap
->dirty
= dirty
;
540 capsnap
->mode
= inode
->i_mode
;
541 capsnap
->uid
= inode
->i_uid
;
542 capsnap
->gid
= inode
->i_gid
;
544 if (dirty
& CEPH_CAP_XATTR_EXCL
) {
545 old_blob
= __ceph_build_xattrs_blob(ci
);
546 capsnap
->xattr_blob
=
547 ceph_buffer_get(ci
->i_xattrs
.blob
);
548 capsnap
->xattr_version
= ci
->i_xattrs
.version
;
550 capsnap
->xattr_blob
= NULL
;
551 capsnap
->xattr_version
= 0;
554 capsnap
->inline_data
= ci
->i_inline_version
!= CEPH_INLINE_NONE
;
556 /* dirty page count moved from _head to this cap_snap;
557 all subsequent writes page dirties occur _after_ this
559 capsnap
->dirty_pages
= ci
->i_wrbuffer_ref_head
;
560 ci
->i_wrbuffer_ref_head
= 0;
561 capsnap
->context
= old_snapc
;
562 list_add_tail(&capsnap
->ci_item
, &ci
->i_cap_snaps
);
564 if (used
& CEPH_CAP_FILE_WR
) {
565 dout("queue_cap_snap %p cap_snap %p snapc %p"
566 " seq %llu used WR, now pending\n", inode
,
567 capsnap
, old_snapc
, old_snapc
->seq
);
568 capsnap
->writing
= 1;
570 /* note mtime, size NOW. */
571 __ceph_finish_cap_snap(ci
, capsnap
);
577 if (ci
->i_wrbuffer_ref_head
== 0 &&
579 ci
->i_dirty_caps
== 0 &&
580 ci
->i_flushing_caps
== 0) {
581 ci
->i_head_snapc
= NULL
;
583 ci
->i_head_snapc
= ceph_get_snap_context(new_snapc
);
584 dout(" new snapc is %p\n", new_snapc
);
586 spin_unlock(&ci
->i_ceph_lock
);
588 ceph_buffer_put(old_blob
);
590 ceph_put_snap_context(old_snapc
);
594 * Finalize the size, mtime for a cap_snap.. that is, settle on final values
595 * to be used for the snapshot, to be flushed back to the mds.
597 * If capsnap can now be flushed, add to snap_flush list, and return 1.
599 * Caller must hold i_ceph_lock.
601 int __ceph_finish_cap_snap(struct ceph_inode_info
*ci
,
602 struct ceph_cap_snap
*capsnap
)
604 struct inode
*inode
= &ci
->vfs_inode
;
605 struct ceph_mds_client
*mdsc
= ceph_sb_to_client(inode
->i_sb
)->mdsc
;
607 BUG_ON(capsnap
->writing
);
608 capsnap
->size
= inode
->i_size
;
609 capsnap
->mtime
= inode
->i_mtime
;
610 capsnap
->atime
= inode
->i_atime
;
611 capsnap
->ctime
= inode
->i_ctime
;
612 capsnap
->btime
= ci
->i_btime
;
613 capsnap
->change_attr
= inode_peek_iversion_raw(inode
);
614 capsnap
->time_warp_seq
= ci
->i_time_warp_seq
;
615 capsnap
->truncate_size
= ci
->i_truncate_size
;
616 capsnap
->truncate_seq
= ci
->i_truncate_seq
;
617 if (capsnap
->dirty_pages
) {
618 dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
619 "still has %d dirty pages\n", inode
, capsnap
,
620 capsnap
->context
, capsnap
->context
->seq
,
621 ceph_cap_string(capsnap
->dirty
), capsnap
->size
,
622 capsnap
->dirty_pages
);
626 ci
->i_ceph_flags
|= CEPH_I_FLUSH_SNAPS
;
627 dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
628 inode
, capsnap
, capsnap
->context
,
629 capsnap
->context
->seq
, ceph_cap_string(capsnap
->dirty
),
632 spin_lock(&mdsc
->snap_flush_lock
);
633 if (list_empty(&ci
->i_snap_flush_item
))
634 list_add_tail(&ci
->i_snap_flush_item
, &mdsc
->snap_flush_list
);
635 spin_unlock(&mdsc
->snap_flush_lock
);
636 return 1; /* caller may want to ceph_flush_snaps */
640 * Queue cap_snaps for snap writeback for this realm and its children.
641 * Called under snap_rwsem, so realm topology won't change.
643 static void queue_realm_cap_snaps(struct ceph_snap_realm
*realm
)
645 struct ceph_inode_info
*ci
;
646 struct inode
*lastinode
= NULL
;
648 dout("queue_realm_cap_snaps %p %llx inodes\n", realm
, realm
->ino
);
650 spin_lock(&realm
->inodes_with_caps_lock
);
651 list_for_each_entry(ci
, &realm
->inodes_with_caps
, i_snap_realm_item
) {
652 struct inode
*inode
= igrab(&ci
->vfs_inode
);
655 spin_unlock(&realm
->inodes_with_caps_lock
);
656 /* avoid calling iput_final() while holding
657 * mdsc->snap_rwsem or in mds dispatch threads */
658 ceph_async_iput(lastinode
);
660 ceph_queue_cap_snap(ci
);
661 spin_lock(&realm
->inodes_with_caps_lock
);
663 spin_unlock(&realm
->inodes_with_caps_lock
);
664 ceph_async_iput(lastinode
);
666 dout("queue_realm_cap_snaps %p %llx done\n", realm
, realm
->ino
);
670 * Parse and apply a snapblob "snap trace" from the MDS. This specifies
671 * the snap realm parameters from a given realm and all of its ancestors,
674 * Caller must hold snap_rwsem for write.
676 int ceph_update_snap_trace(struct ceph_mds_client
*mdsc
,
677 void *p
, void *e
, bool deletion
,
678 struct ceph_snap_realm
**realm_ret
)
680 struct ceph_mds_snap_realm
*ri
; /* encoded */
681 __le64
*snaps
; /* encoded */
682 __le64
*prior_parent_snaps
; /* encoded */
683 struct ceph_snap_realm
*realm
= NULL
;
684 struct ceph_snap_realm
*first_realm
= NULL
;
687 LIST_HEAD(dirty_realms
);
689 dout("update_snap_trace deletion=%d\n", deletion
);
691 ceph_decode_need(&p
, e
, sizeof(*ri
), bad
);
694 ceph_decode_need(&p
, e
, sizeof(u64
)*(le32_to_cpu(ri
->num_snaps
) +
695 le32_to_cpu(ri
->num_prior_parent_snaps
)), bad
);
697 p
+= sizeof(u64
) * le32_to_cpu(ri
->num_snaps
);
698 prior_parent_snaps
= p
;
699 p
+= sizeof(u64
) * le32_to_cpu(ri
->num_prior_parent_snaps
);
701 realm
= ceph_lookup_snap_realm(mdsc
, le64_to_cpu(ri
->ino
));
703 realm
= ceph_create_snap_realm(mdsc
, le64_to_cpu(ri
->ino
));
705 err
= PTR_ERR(realm
);
710 /* ensure the parent is correct */
711 err
= adjust_snap_realm_parent(mdsc
, realm
, le64_to_cpu(ri
->parent
));
716 if (le64_to_cpu(ri
->seq
) > realm
->seq
) {
717 dout("update_snap_trace updating %llx %p %lld -> %lld\n",
718 realm
->ino
, realm
, realm
->seq
, le64_to_cpu(ri
->seq
));
719 /* update realm parameters, snap lists */
720 realm
->seq
= le64_to_cpu(ri
->seq
);
721 realm
->created
= le64_to_cpu(ri
->created
);
722 realm
->parent_since
= le64_to_cpu(ri
->parent_since
);
724 realm
->num_snaps
= le32_to_cpu(ri
->num_snaps
);
725 err
= dup_array(&realm
->snaps
, snaps
, realm
->num_snaps
);
729 realm
->num_prior_parent_snaps
=
730 le32_to_cpu(ri
->num_prior_parent_snaps
);
731 err
= dup_array(&realm
->prior_parent_snaps
, prior_parent_snaps
,
732 realm
->num_prior_parent_snaps
);
736 if (realm
->seq
> mdsc
->last_snap_seq
)
737 mdsc
->last_snap_seq
= realm
->seq
;
740 } else if (!realm
->cached_context
) {
741 dout("update_snap_trace %llx %p seq %lld new\n",
742 realm
->ino
, realm
, realm
->seq
);
745 dout("update_snap_trace %llx %p seq %lld unchanged\n",
746 realm
->ino
, realm
, realm
->seq
);
749 dout("done with %llx %p, invalidated=%d, %p %p\n", realm
->ino
,
750 realm
, invalidate
, p
, e
);
752 /* invalidate when we reach the _end_ (root) of the trace */
753 if (invalidate
&& p
>= e
)
754 rebuild_snap_realms(realm
, &dirty_realms
);
759 ceph_put_snap_realm(mdsc
, realm
);
765 * queue cap snaps _after_ we've built the new snap contexts,
766 * so that i_head_snapc can be set appropriately.
768 while (!list_empty(&dirty_realms
)) {
769 realm
= list_first_entry(&dirty_realms
, struct ceph_snap_realm
,
771 list_del_init(&realm
->dirty_item
);
772 queue_realm_cap_snaps(realm
);
776 *realm_ret
= first_realm
;
778 ceph_put_snap_realm(mdsc
, first_realm
);
780 __cleanup_empty_realms(mdsc
);
786 if (realm
&& !IS_ERR(realm
))
787 ceph_put_snap_realm(mdsc
, realm
);
789 ceph_put_snap_realm(mdsc
, first_realm
);
790 pr_err("update_snap_trace error %d\n", err
);
796 * Send any cap_snaps that are queued for flush. Try to carry
797 * s_mutex across multiple snap flushes to avoid locking overhead.
799 * Caller holds no locks.
801 static void flush_snaps(struct ceph_mds_client
*mdsc
)
803 struct ceph_inode_info
*ci
;
805 struct ceph_mds_session
*session
= NULL
;
807 dout("flush_snaps\n");
808 spin_lock(&mdsc
->snap_flush_lock
);
809 while (!list_empty(&mdsc
->snap_flush_list
)) {
810 ci
= list_first_entry(&mdsc
->snap_flush_list
,
811 struct ceph_inode_info
, i_snap_flush_item
);
812 inode
= &ci
->vfs_inode
;
814 spin_unlock(&mdsc
->snap_flush_lock
);
815 ceph_flush_snaps(ci
, &session
);
816 /* avoid calling iput_final() while holding
817 * session->s_mutex or in mds dispatch threads */
818 ceph_async_iput(inode
);
819 spin_lock(&mdsc
->snap_flush_lock
);
821 spin_unlock(&mdsc
->snap_flush_lock
);
824 mutex_unlock(&session
->s_mutex
);
825 ceph_put_mds_session(session
);
827 dout("flush_snaps done\n");
832 * Handle a snap notification from the MDS.
834 * This can take two basic forms: the simplest is just a snap creation
835 * or deletion notification on an existing realm. This should update the
836 * realm and its children.
838 * The more difficult case is realm creation, due to snap creation at a
839 * new point in the file hierarchy, or due to a rename that moves a file or
840 * directory into another realm.
842 void ceph_handle_snap(struct ceph_mds_client
*mdsc
,
843 struct ceph_mds_session
*session
,
844 struct ceph_msg
*msg
)
846 struct super_block
*sb
= mdsc
->fsc
->sb
;
847 int mds
= session
->s_mds
;
851 struct ceph_snap_realm
*realm
= NULL
;
852 void *p
= msg
->front
.iov_base
;
853 void *e
= p
+ msg
->front
.iov_len
;
854 struct ceph_mds_snap_head
*h
;
855 int num_split_inos
, num_split_realms
;
856 __le64
*split_inos
= NULL
, *split_realms
= NULL
;
858 int locked_rwsem
= 0;
861 if (msg
->front
.iov_len
< sizeof(*h
))
864 op
= le32_to_cpu(h
->op
);
865 split
= le64_to_cpu(h
->split
); /* non-zero if we are splitting an
867 num_split_inos
= le32_to_cpu(h
->num_split_inos
);
868 num_split_realms
= le32_to_cpu(h
->num_split_realms
);
869 trace_len
= le32_to_cpu(h
->trace_len
);
872 dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds
,
873 ceph_snap_op_name(op
), split
, trace_len
);
875 mutex_lock(&session
->s_mutex
);
877 mutex_unlock(&session
->s_mutex
);
879 down_write(&mdsc
->snap_rwsem
);
882 if (op
== CEPH_SNAP_OP_SPLIT
) {
883 struct ceph_mds_snap_realm
*ri
;
886 * A "split" breaks part of an existing realm off into
887 * a new realm. The MDS provides a list of inodes
888 * (with caps) and child realms that belong to the new
892 p
+= sizeof(u64
) * num_split_inos
;
894 p
+= sizeof(u64
) * num_split_realms
;
895 ceph_decode_need(&p
, e
, sizeof(*ri
), bad
);
896 /* we will peek at realm info here, but will _not_
897 * advance p, as the realm update will occur below in
898 * ceph_update_snap_trace. */
901 realm
= ceph_lookup_snap_realm(mdsc
, split
);
903 realm
= ceph_create_snap_realm(mdsc
, split
);
908 dout("splitting snap_realm %llx %p\n", realm
->ino
, realm
);
909 for (i
= 0; i
< num_split_inos
; i
++) {
910 struct ceph_vino vino
= {
911 .ino
= le64_to_cpu(split_inos
[i
]),
914 struct inode
*inode
= ceph_find_inode(sb
, vino
);
915 struct ceph_inode_info
*ci
;
916 struct ceph_snap_realm
*oldrealm
;
920 ci
= ceph_inode(inode
);
922 spin_lock(&ci
->i_ceph_lock
);
923 if (!ci
->i_snap_realm
)
926 * If this inode belongs to a realm that was
927 * created after our new realm, we experienced
928 * a race (due to another split notification
929 * arriving from a different MDS). So skip
932 if (ci
->i_snap_realm
->created
>
933 le64_to_cpu(ri
->created
)) {
934 dout(" leaving %p in newer realm %llx %p\n",
935 inode
, ci
->i_snap_realm
->ino
,
939 dout(" will move %p to split realm %llx %p\n",
940 inode
, realm
->ino
, realm
);
942 * Move the inode to the new realm
944 oldrealm
= ci
->i_snap_realm
;
945 spin_lock(&oldrealm
->inodes_with_caps_lock
);
946 list_del_init(&ci
->i_snap_realm_item
);
947 spin_unlock(&oldrealm
->inodes_with_caps_lock
);
949 spin_lock(&realm
->inodes_with_caps_lock
);
950 list_add(&ci
->i_snap_realm_item
,
951 &realm
->inodes_with_caps
);
952 ci
->i_snap_realm
= realm
;
953 if (realm
->ino
== ci
->i_vino
.ino
)
954 realm
->inode
= inode
;
955 spin_unlock(&realm
->inodes_with_caps_lock
);
957 spin_unlock(&ci
->i_ceph_lock
);
959 ceph_get_snap_realm(mdsc
, realm
);
960 ceph_put_snap_realm(mdsc
, oldrealm
);
962 /* avoid calling iput_final() while holding
963 * mdsc->snap_rwsem or in mds dispatch threads */
964 ceph_async_iput(inode
);
968 spin_unlock(&ci
->i_ceph_lock
);
969 ceph_async_iput(inode
);
972 /* we may have taken some of the old realm's children. */
973 for (i
= 0; i
< num_split_realms
; i
++) {
974 struct ceph_snap_realm
*child
=
975 __lookup_snap_realm(mdsc
,
976 le64_to_cpu(split_realms
[i
]));
979 adjust_snap_realm_parent(mdsc
, child
, realm
->ino
);
984 * update using the provided snap trace. if we are deleting a
985 * snap, we can avoid queueing cap_snaps.
987 ceph_update_snap_trace(mdsc
, p
, e
,
988 op
== CEPH_SNAP_OP_DESTROY
, NULL
);
990 if (op
== CEPH_SNAP_OP_SPLIT
)
991 /* we took a reference when we created the realm, above */
992 ceph_put_snap_realm(mdsc
, realm
);
994 __cleanup_empty_realms(mdsc
);
996 up_write(&mdsc
->snap_rwsem
);
1002 pr_err("corrupt snap message from mds%d\n", mds
);
1006 up_write(&mdsc
->snap_rwsem
);
1010 struct ceph_snapid_map
* ceph_get_snapid_map(struct ceph_mds_client
*mdsc
,
1013 struct ceph_snapid_map
*sm
, *exist
;
1014 struct rb_node
**p
, *parent
;
1018 spin_lock(&mdsc
->snapid_map_lock
);
1019 p
= &mdsc
->snapid_map_tree
.rb_node
;
1021 exist
= rb_entry(*p
, struct ceph_snapid_map
, node
);
1022 if (snap
> exist
->snap
) {
1024 } else if (snap
< exist
->snap
) {
1025 p
= &(*p
)->rb_right
;
1027 if (atomic_inc_return(&exist
->ref
) == 1)
1028 list_del_init(&exist
->lru
);
1033 spin_unlock(&mdsc
->snapid_map_lock
);
1035 dout("found snapid map %llx -> %x\n", exist
->snap
, exist
->dev
);
1039 sm
= kmalloc(sizeof(*sm
), GFP_NOFS
);
1043 ret
= get_anon_bdev(&sm
->dev
);
1049 INIT_LIST_HEAD(&sm
->lru
);
1050 atomic_set(&sm
->ref
, 1);
1055 p
= &mdsc
->snapid_map_tree
.rb_node
;
1056 spin_lock(&mdsc
->snapid_map_lock
);
1059 exist
= rb_entry(*p
, struct ceph_snapid_map
, node
);
1060 if (snap
> exist
->snap
)
1062 else if (snap
< exist
->snap
)
1063 p
= &(*p
)->rb_right
;
1069 if (atomic_inc_return(&exist
->ref
) == 1)
1070 list_del_init(&exist
->lru
);
1072 rb_link_node(&sm
->node
, parent
, p
);
1073 rb_insert_color(&sm
->node
, &mdsc
->snapid_map_tree
);
1075 spin_unlock(&mdsc
->snapid_map_lock
);
1077 free_anon_bdev(sm
->dev
);
1079 dout("found snapid map %llx -> %x\n", exist
->snap
, exist
->dev
);
1083 dout("create snapid map %llx -> %x\n", sm
->snap
, sm
->dev
);
1087 void ceph_put_snapid_map(struct ceph_mds_client
* mdsc
,
1088 struct ceph_snapid_map
*sm
)
1092 if (atomic_dec_and_lock(&sm
->ref
, &mdsc
->snapid_map_lock
)) {
1093 if (!RB_EMPTY_NODE(&sm
->node
)) {
1094 sm
->last_used
= jiffies
;
1095 list_add_tail(&sm
->lru
, &mdsc
->snapid_map_lru
);
1096 spin_unlock(&mdsc
->snapid_map_lock
);
1098 /* already cleaned up by
1099 * ceph_cleanup_snapid_map() */
1100 spin_unlock(&mdsc
->snapid_map_lock
);
1106 void ceph_trim_snapid_map(struct ceph_mds_client
*mdsc
)
1108 struct ceph_snapid_map
*sm
;
1112 spin_lock(&mdsc
->snapid_map_lock
);
1115 while (!list_empty(&mdsc
->snapid_map_lru
)) {
1116 sm
= list_first_entry(&mdsc
->snapid_map_lru
,
1117 struct ceph_snapid_map
, lru
);
1118 if (time_after(sm
->last_used
+ CEPH_SNAPID_MAP_TIMEOUT
, now
))
1121 rb_erase(&sm
->node
, &mdsc
->snapid_map_tree
);
1122 list_move(&sm
->lru
, &to_free
);
1124 spin_unlock(&mdsc
->snapid_map_lock
);
1126 while (!list_empty(&to_free
)) {
1127 sm
= list_first_entry(&to_free
, struct ceph_snapid_map
, lru
);
1129 dout("trim snapid map %llx -> %x\n", sm
->snap
, sm
->dev
);
1130 free_anon_bdev(sm
->dev
);
1135 void ceph_cleanup_snapid_map(struct ceph_mds_client
*mdsc
)
1137 struct ceph_snapid_map
*sm
;
1141 spin_lock(&mdsc
->snapid_map_lock
);
1142 while ((p
= rb_first(&mdsc
->snapid_map_tree
))) {
1143 sm
= rb_entry(p
, struct ceph_snapid_map
, node
);
1144 rb_erase(p
, &mdsc
->snapid_map_tree
);
1146 list_move(&sm
->lru
, &to_free
);
1148 spin_unlock(&mdsc
->snapid_map_lock
);
1150 while (!list_empty(&to_free
)) {
1151 sm
= list_first_entry(&to_free
, struct ceph_snapid_map
, lru
);
1153 free_anon_bdev(sm
->dev
);
1154 if (WARN_ON_ONCE(atomic_read(&sm
->ref
))) {
1155 pr_err("snapid map %llx -> %x still in use\n",