]> git.proxmox.com Git - mirror_ubuntu-disco-kernel.git/blame - fs/ceph/caps.c
ceph: adding protection for showing cap reservation info
[mirror_ubuntu-disco-kernel.git] / fs / ceph / caps.c
CommitLineData
b2441318 1// SPDX-License-Identifier: GPL-2.0
3d14c5d2 2#include <linux/ceph/ceph_debug.h>
a8599bd8
SW
3
4#include <linux/fs.h>
5#include <linux/kernel.h>
174cd4b1 6#include <linux/sched/signal.h>
5a0e3ad6 7#include <linux/slab.h>
a8599bd8
SW
8#include <linux/vmalloc.h>
9#include <linux/wait.h>
f1a3d572 10#include <linux/writeback.h>
a8599bd8
SW
11
12#include "super.h"
3d14c5d2 13#include "mds_client.h"
99ccbd22 14#include "cache.h"
3d14c5d2
YS
15#include <linux/ceph/decode.h>
16#include <linux/ceph/messenger.h>
a8599bd8
SW
17
18/*
19 * Capability management
20 *
21 * The Ceph metadata servers control client access to inode metadata
22 * and file data by issuing capabilities, granting clients permission
23 * to read and/or write both inode field and file data to OSDs
24 * (storage nodes). Each capability consists of a set of bits
25 * indicating which operations are allowed.
26 *
27 * If the client holds a *_SHARED cap, the client has a coherent value
28 * that can be safely read from the cached inode.
29 *
30 * In the case of a *_EXCL (exclusive) or FILE_WR capabilities, the
31 * client is allowed to change inode attributes (e.g., file size,
32 * mtime), note its dirty state in the ceph_cap, and asynchronously
33 * flush that metadata change to the MDS.
34 *
35 * In the event of a conflicting operation (perhaps by another
36 * client), the MDS will revoke the conflicting client capabilities.
37 *
38 * In order for a client to cache an inode, it must hold a capability
39 * with at least one MDS server. When inodes are released, release
40 * notifications are batched and periodically sent en masse to the MDS
41 * cluster to release server state.
42 */
43
0e294387 44static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc);
7bc00fdd
YZ
45static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
46 struct ceph_mds_session *session,
47 struct ceph_inode_info *ci,
48 u64 oldest_flush_tid);
a8599bd8
SW
49
50/*
51 * Generate readable cap strings for debugging output.
52 */
53#define MAX_CAP_STR 20
54static char cap_str[MAX_CAP_STR][40];
55static DEFINE_SPINLOCK(cap_str_lock);
56static int last_cap_str;
57
58static char *gcap_string(char *s, int c)
59{
60 if (c & CEPH_CAP_GSHARED)
61 *s++ = 's';
62 if (c & CEPH_CAP_GEXCL)
63 *s++ = 'x';
64 if (c & CEPH_CAP_GCACHE)
65 *s++ = 'c';
66 if (c & CEPH_CAP_GRD)
67 *s++ = 'r';
68 if (c & CEPH_CAP_GWR)
69 *s++ = 'w';
70 if (c & CEPH_CAP_GBUFFER)
71 *s++ = 'b';
72 if (c & CEPH_CAP_GLAZYIO)
73 *s++ = 'l';
74 return s;
75}
76
/*
 * Render @caps as a human-readable string (e.g. "pAsLsXsFscr") for
 * debug output.
 *
 * Returns a pointer into a small ring of static buffers; the result is
 * only valid until MAX_CAP_STR later calls reuse the slot, so use it
 * immediately (e.g. inside a dout()/printk argument list).
 */
const char *ceph_cap_string(int caps)
{
	int i;
	char *s;
	int c;

	/* claim the next slot in the static ring under cap_str_lock */
	spin_lock(&cap_str_lock);
	i = last_cap_str++;
	if (last_cap_str == MAX_CAP_STR)
		last_cap_str = 0;
	spin_unlock(&cap_str_lock);

	s = cap_str[i];

	if (caps & CEPH_CAP_PIN)
		*s++ = 'p';

	c = (caps >> CEPH_CAP_SAUTH) & 3;
	if (c) {
		*s++ = 'A';
		s = gcap_string(s, c);
	}

	c = (caps >> CEPH_CAP_SLINK) & 3;
	if (c) {
		*s++ = 'L';
		s = gcap_string(s, c);
	}

	c = (caps >> CEPH_CAP_SXATTR) & 3;
	if (c) {
		*s++ = 'X';
		s = gcap_string(s, c);
	}

	/* FILE caps occupy all the remaining high bits, so no "& 3" mask */
	c = caps >> CEPH_CAP_SFILE;
	if (c) {
		*s++ = 'F';
		s = gcap_string(s, c);
	}

	if (s == cap_str[i])
		*s++ = '-';	/* no caps at all */
	*s = 0;
	return cap_str[i];
}
123
/*
 * Initialize the client-wide preallocated-cap pool: the free list and
 * the spinlock protecting it and the caps_* counters.
 */
void ceph_caps_init(struct ceph_mds_client *mdsc)
{
	INIT_LIST_HEAD(&mdsc->caps_list);
	spin_lock_init(&mdsc->caps_list_lock);
}
129
/*
 * Free every cap left in the preallocated pool and zero all of the
 * reservation counters.  Called on client teardown.
 */
void ceph_caps_finalize(struct ceph_mds_client *mdsc)
{
	struct ceph_cap *cap;

	spin_lock(&mdsc->caps_list_lock);
	while (!list_empty(&mdsc->caps_list)) {
		cap = list_first_entry(&mdsc->caps_list,
				       struct ceph_cap, caps_item);
		list_del(&cap->caps_item);
		kmem_cache_free(ceph_cap_cachep, cap);
	}
	mdsc->caps_total_count = 0;
	mdsc->caps_avail_count = 0;
	mdsc->caps_use_count = 0;
	mdsc->caps_reserve_count = 0;
	mdsc->caps_min_count = 0;
	spin_unlock(&mdsc->caps_list_lock);
}
148
/*
 * Adjust the minimum number of caps to keep preallocated.  @delta may
 * be negative, but the running minimum must never go below zero.
 */
void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
{
	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_min_count += delta;
	BUG_ON(mdsc->caps_min_count < 0);
	spin_unlock(&mdsc->caps_list_lock);
}
156
e30ee581
ZZ
/*
 * Reserve @need caps for later use via ceph_get_cap(): take as many as
 * possible from the preallocated pool and allocate the rest.  On
 * allocation failure, trim caps from all sessions once to reclaim
 * memory, then retry; if that still fails, undo everything and return
 * -ENOMEM.
 *
 * Called under mdsc->mutex.
 */
int ceph_reserve_caps(struct ceph_mds_client *mdsc,
		      struct ceph_cap_reservation *ctx, int need)
{
	int i, j;
	struct ceph_cap *cap;
	int have;		/* caps taken from the existing pool */
	int alloc = 0;		/* caps freshly allocated below */
	int max_caps;
	bool trimmed = false;	/* only attempt session trimming once */
	struct ceph_mds_session *s;
	LIST_HEAD(newcaps);

	dout("reserve caps ctx=%p need=%d\n", ctx, need);

	/* first reserve any caps that are already allocated */
	spin_lock(&mdsc->caps_list_lock);
	if (mdsc->caps_avail_count >= need)
		have = need;
	else
		have = mdsc->caps_avail_count;
	mdsc->caps_avail_count -= have;
	mdsc->caps_reserve_count += have;
	/* invariant: total == used + reserved + available */
	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);

	for (i = have; i < need; i++) {
retry:
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (!cap) {
			if (!trimmed) {
				/*
				 * Ask every session to drop enough caps
				 * to cover the shortfall, then retry.
				 * mdsc->mutex is dropped around s_mutex
				 * to respect the lock ordering.
				 */
				for (j = 0; j < mdsc->max_sessions; j++) {
					s = __ceph_lookup_mds_session(mdsc, j);
					if (!s)
						continue;
					mutex_unlock(&mdsc->mutex);

					mutex_lock(&s->s_mutex);
					max_caps = s->s_nr_caps - (need - i);
					ceph_trim_caps(mdsc, s, max_caps);
					mutex_unlock(&s->s_mutex);

					ceph_put_mds_session(s);
					mutex_lock(&mdsc->mutex);
				}
				trimmed = true;
				goto retry;
			} else {
				pr_warn("reserve caps ctx=%p ENOMEM "
					"need=%d got=%d\n",
					ctx, need, have + alloc);
				goto out_nomem;
			}
		}
		list_add(&cap->caps_item, &newcaps);
		alloc++;
	}
	BUG_ON(have + alloc != need);

	/* publish the new caps into the pool and account for them */
	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_total_count += alloc;
	mdsc->caps_reserve_count += alloc;
	list_splice(&newcaps, &mdsc->caps_list);

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);

	ctx->count = need;
	dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
	     ctx, mdsc->caps_total_count, mdsc->caps_use_count,
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	return 0;

out_nomem:
	/* free whatever we did manage to allocate ... */
	while (!list_empty(&newcaps)) {
		cap = list_first_entry(&newcaps,
				struct ceph_cap, caps_item);
		list_del(&cap->caps_item);
		kmem_cache_free(ceph_cap_cachep, cap);
	}

	/* ... and give the caps we took from the pool back to it */
	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_avail_count += have;
	mdsc->caps_reserve_count -= have;
	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
	return -ENOMEM;
}
253
37151668
YS
/*
 * Release an unused reservation: move ctx->count caps from the
 * reserved count back to the available pool (the cap structs are
 * already on caps_list).  Always returns 0.
 */
int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
			struct ceph_cap_reservation *ctx)
{
	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
	if (ctx->count) {
		spin_lock(&mdsc->caps_list_lock);
		BUG_ON(mdsc->caps_reserve_count < ctx->count);
		mdsc->caps_reserve_count -= ctx->count;
		mdsc->caps_avail_count += ctx->count;
		ctx->count = 0;
		dout("unreserve caps %d = %d used + %d resv + %d avail\n",
		     mdsc->caps_total_count, mdsc->caps_use_count,
		     mdsc->caps_reserve_count, mdsc->caps_avail_count);
		BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
						 mdsc->caps_reserve_count +
						 mdsc->caps_avail_count);
		spin_unlock(&mdsc->caps_list_lock);
	}
	return 0;
}
274
d9df2783
YZ
/*
 * Take one cap for use, either from a reservation made earlier with
 * ceph_reserve_caps() (@ctx != NULL) or by direct allocation.
 *
 * Returns NULL only in the unreserved case, when allocation fails;
 * a reserved cap is guaranteed to be available.
 */
struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
			      struct ceph_cap_reservation *ctx)
{
	struct ceph_cap *cap = NULL;

	/* temporary, until we do something about cap import/export */
	if (!ctx) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (cap) {
			spin_lock(&mdsc->caps_list_lock);
			mdsc->caps_use_count++;
			mdsc->caps_total_count++;
			spin_unlock(&mdsc->caps_list_lock);
		}
		return cap;
	}

	spin_lock(&mdsc->caps_list_lock);
	dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
	     ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	BUG_ON(!ctx->count);
	BUG_ON(ctx->count > mdsc->caps_reserve_count);
	BUG_ON(list_empty(&mdsc->caps_list));

	/* consume one unit of the reservation */
	ctx->count--;
	mdsc->caps_reserve_count--;
	mdsc->caps_use_count++;

	cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
	list_del(&cap->caps_item);

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
	return cap;
}
312
/*
 * Return a cap that is no longer in use.  The struct is put back on
 * the preallocated pool unless the pool already holds at least
 * caps_reserve_count + caps_min_count entries, in which case it is
 * freed outright.
 */
void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
{
	spin_lock(&mdsc->caps_list_lock);
	dout("put_cap %p %d = %d used + %d resv + %d avail\n",
	     cap, mdsc->caps_total_count, mdsc->caps_use_count,
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	mdsc->caps_use_count--;
	/*
	 * Keep some preallocated caps around (ceph_min_count), to
	 * avoid lots of free/alloc churn.
	 */
	if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
				      mdsc->caps_min_count) {
		mdsc->caps_total_count--;
		kmem_cache_free(ceph_cap_cachep, cap);
	} else {
		mdsc->caps_avail_count++;
		list_add(&cap->caps_item, &mdsc->caps_list);
	}

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
}
337
/*
 * Report the current cap reservation counters.  Any of the output
 * pointers may be NULL.  All counters are sampled under
 * caps_list_lock so the snapshot is self-consistent.
 */
void ceph_reservation_status(struct ceph_fs_client *fsc,
			     int *total, int *avail, int *used, int *reserved,
			     int *min)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;

	spin_lock(&mdsc->caps_list_lock);

	if (total)
		*total = mdsc->caps_total_count;
	if (avail)
		*avail = mdsc->caps_avail_count;
	if (used)
		*used = mdsc->caps_use_count;
	if (reserved)
		*reserved = mdsc->caps_reserve_count;
	if (min)
		*min = mdsc->caps_min_count;

	spin_unlock(&mdsc->caps_list_lock);
}
359
360/*
361 * Find ceph_cap for given mds, if any.
362 *
be655596 363 * Called with i_ceph_lock held.
a8599bd8
SW
364 */
365static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
366{
367 struct ceph_cap *cap;
368 struct rb_node *n = ci->i_caps.rb_node;
369
370 while (n) {
371 cap = rb_entry(n, struct ceph_cap, ci_node);
372 if (mds < cap->mds)
373 n = n->rb_left;
374 else if (mds > cap->mds)
375 n = n->rb_right;
376 else
377 return cap;
378 }
379 return NULL;
380}
381
2bc50259
GF
/*
 * Locked wrapper around __get_cap_for_mds(): look up this inode's cap
 * for @mds under i_ceph_lock.
 */
struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
	struct ceph_cap *cap;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	spin_unlock(&ci->i_ceph_lock);
	return cap;
}
391
a8599bd8 392/*
33caad32 393 * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
a8599bd8 394 */
static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
{
	struct ceph_cap *cap;
	int mds = -1;		/* stays -1 if the inode holds no caps */
	struct rb_node *p;

	/* prefer mds with WR|BUFFER|EXCL caps */
	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		mds = cap->mds;	/* any mds serves as a fallback answer */
		if (cap->issued & (CEPH_CAP_FILE_WR |
				   CEPH_CAP_FILE_BUFFER |
				   CEPH_CAP_FILE_EXCL))
			break;
	}
	return mds;
}
412
413int ceph_get_cap_mds(struct inode *inode)
414{
be655596 415 struct ceph_inode_info *ci = ceph_inode(inode);
a8599bd8 416 int mds;
be655596 417 spin_lock(&ci->i_ceph_lock);
ca81f3f6 418 mds = __ceph_get_cap_mds(ceph_inode(inode));
be655596 419 spin_unlock(&ci->i_ceph_lock);
a8599bd8
SW
420 return mds;
421}
422
423/*
be655596 424 * Called under i_ceph_lock.
a8599bd8
SW
425 */
static void __insert_cap_node(struct ceph_inode_info *ci,
			      struct ceph_cap *new)
{
	struct rb_node **p = &ci->i_caps.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_cap *cap = NULL;

	/* descend to the insertion point, ordered by mds id */
	while (*p) {
		parent = *p;
		cap = rb_entry(parent, struct ceph_cap, ci_node);
		if (new->mds < cap->mds)
			p = &(*p)->rb_left;
		else if (new->mds > cap->mds)
			p = &(*p)->rb_right;
		else
			BUG();	/* caller must never insert a duplicate mds */
	}

	rb_link_node(&new->ci_node, parent, p);
	rb_insert_color(&new->ci_node, &ci->i_caps);
}
447
448/*
449 * (re)set cap hold timeouts, which control the delayed release
450 * of unused caps back to the MDS. Should be called on cap use.
451 */
static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci)
{
	struct ceph_mount_options *ma = mdsc->fsc->mount_options;

	/* push both hold deadlines out from now; round for timer batching */
	ci->i_hold_caps_min = round_jiffies(jiffies +
					    ma->caps_wanted_delay_min * HZ);
	ci->i_hold_caps_max = round_jiffies(jiffies +
					    ma->caps_wanted_delay_max * HZ);
	dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
	     ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
}
464
465/*
466 * (Re)queue cap at the end of the delayed cap release list.
467 *
468 * If I_FLUSH is set, leave the inode at the front of the list.
469 *
be655596 470 * Caller holds i_ceph_lock
a8599bd8
SW
471 * -> we take mdsc->cap_delay_lock
472 */
static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
				struct ceph_inode_info *ci)
{
	__cap_set_timeouts(mdsc, ci);
	dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
	     ci->i_ceph_flags, ci->i_hold_caps_max);
	if (!mdsc->stopping) {
		spin_lock(&mdsc->cap_delay_lock);
		if (!list_empty(&ci->i_cap_delay_list)) {
			/* an inode flagged for immediate flush stays put */
			if (ci->i_ceph_flags & CEPH_I_FLUSH)
				goto no_change;
			list_del_init(&ci->i_cap_delay_list);
		}
		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
no_change:
		spin_unlock(&mdsc->cap_delay_lock);
	}
}
491
492/*
493 * Queue an inode for immediate writeback. Mark inode with I_FLUSH,
494 * indicating we should send a cap message to flush dirty metadata
495 * asap, and move to the front of the delayed cap list.
496 */
static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc,
				      struct ceph_inode_info *ci)
{
	dout("__cap_delay_requeue_front %p\n", &ci->vfs_inode);
	spin_lock(&mdsc->cap_delay_lock);
	ci->i_ceph_flags |= CEPH_I_FLUSH;
	if (!list_empty(&ci->i_cap_delay_list))
		list_del_init(&ci->i_cap_delay_list);
	/* head of the list, not tail: flush this inode first */
	list_add(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
	spin_unlock(&mdsc->cap_delay_lock);
}
508
509/*
510 * Cancel delayed work on cap.
511 *
be655596 512 * Caller must hold i_ceph_lock.
a8599bd8
SW
513 */
static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci)
{
	dout("__cap_delay_cancel %p\n", &ci->vfs_inode);
	/* nothing to do if we are not queued */
	if (list_empty(&ci->i_cap_delay_list))
		return;
	spin_lock(&mdsc->cap_delay_lock);
	list_del_init(&ci->i_cap_delay_list);
	spin_unlock(&mdsc->cap_delay_lock);
}
524
525/*
526 * Common issue checks for add_cap, handle_cap_grant.
527 */
static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
			      unsigned issued)
{
	/* caps we held before this grant, across all mds's */
	unsigned had = __ceph_caps_issued(ci, NULL);

	/*
	 * Each time we receive FILE_CACHE anew, we increment
	 * i_rdcache_gen.
	 */
	if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
	    (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
		ci->i_rdcache_gen++;
	}

	/*
	 * If FILE_SHARED is newly issued, mark dir not complete. We don't
	 * know what happened to this directory while we didn't have the cap.
	 * If FILE_SHARED is being revoked, also mark dir not complete. It
	 * stops on-going cached readdir.
	 */
	if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
		if (issued & CEPH_CAP_FILE_SHARED)
			atomic_inc(&ci->i_shared_gen);
		if (S_ISDIR(ci->vfs_inode.i_mode)) {
			dout(" marking %p NOT complete\n", &ci->vfs_inode);
			__ceph_dir_clear_complete(ci);
		}
	}
}
557
558/*
559 * Add a capability under the given MDS session.
560 *
561 * Caller should hold session snap_rwsem (read) and s_mutex.
562 *
563 * @fmode is the open file mode, if we are opening a file, otherwise
564 * it is < 0. (This is so we can atomically add the cap and add an
565 * open file reference to it.)
566 */
d9df2783
YZ
void ceph_add_cap(struct inode *inode,
		  struct ceph_mds_session *session, u64 cap_id,
		  int fmode, unsigned issued, unsigned wanted,
		  unsigned seq, unsigned mseq, u64 realmino, int flags,
		  struct ceph_cap **new_cap)
{
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap;
	int mds = session->s_mds;
	int actual_wanted;

	dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
	     session->s_mds, cap_id, ceph_cap_string(issued), seq);

	/*
	 * If we are opening the file, include file mode wanted bits
	 * in wanted.
	 */
	if (fmode >= 0)
		wanted |= ceph_caps_for_mode(fmode);

	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		/* first cap from this mds: consume the caller's
		 * preallocated cap and link it into inode + session */
		cap = *new_cap;
		*new_cap = NULL;

		cap->issued = 0;
		cap->implemented = 0;
		cap->mds = mds;
		cap->mds_wanted = 0;
		cap->mseq = 0;

		cap->ci = ci;
		__insert_cap_node(ci, cap);

		/* add to session cap list */
		cap->session = session;
		spin_lock(&session->s_cap_lock);
		list_add_tail(&cap->session_caps, &session->s_caps);
		session->s_nr_caps++;
		spin_unlock(&session->s_cap_lock);
	} else {
		/*
		 * auth mds of the inode changed. we received the cap export
		 * message, but still haven't received the cap import message.
		 * handle_cap_export() updated the new auth MDS' cap.
		 *
		 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
		 * a message that was send before the cap import message. So
		 * don't remove caps.
		 */
		if (ceph_seq_cmp(seq, cap->seq) <= 0) {
			WARN_ON(cap != ci->i_auth_cap);
			WARN_ON(cap->cap_id != cap_id);
			seq = cap->seq;
			mseq = cap->mseq;
			issued |= cap->issued;
			flags |= CEPH_CAP_FLAG_AUTH;
		}
	}

	/* (re)attach to a snap realm if we have none, or if the auth
	 * cap reports a different realm than the one we are in */
	if (!ci->i_snap_realm ||
	    ((flags & CEPH_CAP_FLAG_AUTH) &&
	     realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
		/*
		 * add this inode to the appropriate snap realm
		 */
		struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
							       realmino);
		if (realm) {
			struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
			if (oldrealm) {
				spin_lock(&oldrealm->inodes_with_caps_lock);
				list_del_init(&ci->i_snap_realm_item);
				spin_unlock(&oldrealm->inodes_with_caps_lock);
			}

			spin_lock(&realm->inodes_with_caps_lock);
			ci->i_snap_realm = realm;
			list_add(&ci->i_snap_realm_item,
				 &realm->inodes_with_caps);
			spin_unlock(&realm->inodes_with_caps_lock);

			if (oldrealm)
				ceph_put_snap_realm(mdsc, oldrealm);
		} else {
			pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
			       realmino);
			WARN_ON(!realm);
		}
	}

	__check_cap_issue(ci, cap, issued);

	/*
	 * If we are issued caps we don't want, or the mds' wanted
	 * value appears to be off, queue a check so we'll release
	 * later and/or update the mds wanted value.
	 */
	actual_wanted = __ceph_caps_wanted(ci);
	if ((wanted & ~actual_wanted) ||
	    (issued & ~actual_wanted & CEPH_CAP_ANY_WR)) {
		dout(" issued %s, mds wanted %s, actual %s, queueing\n",
		     ceph_cap_string(issued), ceph_cap_string(wanted),
		     ceph_cap_string(actual_wanted));
		__cap_delay_requeue(mdsc, ci);
	}

	if (flags & CEPH_CAP_FLAG_AUTH) {
		/* only promote to auth cap if this mseq is newer */
		if (!ci->i_auth_cap ||
		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
			ci->i_auth_cap = cap;
			cap->mds_wanted = wanted;
		}
	} else {
		WARN_ON(ci->i_auth_cap == cap);
	}

	dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
	     inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
	     ceph_cap_string(issued|cap->issued), seq, mds);
	cap->cap_id = cap_id;
	cap->issued = issued;
	cap->implemented |= issued;
	/* a newer mseq replaces mds_wanted; an equal/older one only adds */
	if (ceph_seq_cmp(mseq, cap->mseq) > 0)
		cap->mds_wanted = wanted;
	else
		cap->mds_wanted |= wanted;
	cap->seq = seq;
	cap->issue_seq = seq;
	cap->mseq = mseq;
	cap->cap_gen = session->s_cap_gen;

	if (fmode >= 0)
		__ceph_get_fmode(ci, fmode);
}
704
705/*
706 * Return true if cap has not timed out and belongs to the current
707 * generation of the MDS session (i.e. has not gone 'stale' due to
708 * us losing touch with the mds).
709 */
static int __cap_is_valid(struct ceph_cap *cap)
{
	unsigned long ttl;
	u32 gen;

	/* sample the session generation and ttl together */
	spin_lock(&cap->session->s_gen_ttl_lock);
	gen = cap->session->s_cap_gen;
	ttl = cap->session->s_cap_ttl;
	spin_unlock(&cap->session->s_gen_ttl_lock);

	/* stale if issued under an older generation, or past the ttl */
	if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
		dout("__cap_is_valid %p cap %p issued %s "
		     "but STALE (gen %u vs %u)\n", &cap->ci->vfs_inode,
		     cap, ceph_cap_string(cap->issued), cap->cap_gen, gen);
		return 0;
	}

	return 1;
}
729
730/*
731 * Return set of valid cap bits issued to us. Note that caps time
732 * out, and may be invalidated in bulk if the client session times out
733 * and session->s_cap_gen is bumped.
734 */
int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
{
	/* snap caps count as issued even with no mds cap */
	int have = ci->i_snap_caps;
	struct ceph_cap *cap;
	struct rb_node *p;

	if (implemented)
		*implemented = 0;
	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;
		dout("__ceph_caps_issued %p cap %p issued %s\n",
		     &ci->vfs_inode, cap, ceph_cap_string(cap->issued));
		have |= cap->issued;
		if (implemented)
			*implemented |= cap->implemented;
	}
	/*
	 * exclude caps issued by non-auth MDS, but are been revoking
	 * by the auth MDS. The non-auth MDS should be revoking/exporting
	 * these caps, but the message is delayed.
	 */
	if (ci->i_auth_cap) {
		cap = ci->i_auth_cap;
		have &= ~cap->implemented | cap->issued;
	}
	return have;
}
764
765/*
766 * Get cap bits issued by caps other than @ocap
767 */
768int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
769{
770 int have = ci->i_snap_caps;
771 struct ceph_cap *cap;
772 struct rb_node *p;
773
774 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
775 cap = rb_entry(p, struct ceph_cap, ci_node);
776 if (cap == ocap)
777 continue;
778 if (!__cap_is_valid(cap))
779 continue;
780 have |= cap->issued;
781 }
782 return have;
783}
784
785/*
786 * Move a cap to the end of the LRU (oldest caps at list head, newest
787 * at list tail).
788 */
static void __touch_cap(struct ceph_cap *cap)
{
	struct ceph_mds_session *s = cap->session;

	spin_lock(&s->s_cap_lock);
	if (!s->s_cap_iterator) {
		dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
		     s->s_mds);
		list_move_tail(&cap->session_caps, &s->s_caps);
	} else {
		/* someone is walking s_caps; don't reorder under them */
		dout("__touch_cap %p cap %p mds%d NOP, iterating over caps\n",
		     &cap->ci->vfs_inode, cap, s->s_mds);
	}
	spin_unlock(&s->s_cap_lock);
}
804
805/*
806 * Check if we hold the given mask. If so, move the cap(s) to the
807 * front of their respective LRUs. (This is the preferred way for
808 * callers to check for caps they want.)
809 */
int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
{
	struct ceph_cap *cap;
	struct rb_node *p;
	int have = ci->i_snap_caps;

	/* snap caps alone may already satisfy the mask */
	if ((have & mask) == mask) {
		dout("__ceph_caps_issued_mask %p snap issued %s"
		     " (mask %s)\n", &ci->vfs_inode,
		     ceph_cap_string(have),
		     ceph_cap_string(mask));
		return 1;
	}

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;
		/* a single cap may satisfy the whole mask ... */
		if ((cap->issued & mask) == mask) {
			dout("__ceph_caps_issued_mask %p cap %p issued %s"
			     " (mask %s)\n", &ci->vfs_inode, cap,
			     ceph_cap_string(cap->issued),
			     ceph_cap_string(mask));
			if (touch)
				__touch_cap(cap);
			return 1;
		}

		/* does a combination of caps satisfy mask? */
		have |= cap->issued;
		if ((have & mask) == mask) {
			dout("__ceph_caps_issued_mask %p combo issued %s"
			     " (mask %s)\n", &ci->vfs_inode,
			     ceph_cap_string(cap->issued),
			     ceph_cap_string(mask));
			if (touch) {
				struct rb_node *q;

				/* touch this + preceding caps */
				__touch_cap(cap);
				for (q = rb_first(&ci->i_caps); q != p;
				     q = rb_next(q)) {
					cap = rb_entry(q, struct ceph_cap,
						       ci_node);
					if (!__cap_is_valid(cap))
						continue;
					__touch_cap(cap);
				}
			}
			return 1;
		}
	}

	return 0;
}
865
866/*
867 * Return true if mask caps are currently being revoked by an MDS.
868 */
6ee6b953
YZ
869int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
870 struct ceph_cap *ocap, int mask)
a8599bd8 871{
a8599bd8
SW
872 struct ceph_cap *cap;
873 struct rb_node *p;
a8599bd8 874
a8599bd8
SW
875 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
876 cap = rb_entry(p, struct ceph_cap, ci_node);
9563f88c 877 if (cap != ocap &&
6ee6b953
YZ
878 (cap->implemented & ~cap->issued & mask))
879 return 1;
a8599bd8 880 }
6ee6b953
YZ
881 return 0;
882}
883
/*
 * Locked wrapper: return nonzero if any of this inode's caps is
 * currently revoking bits in @mask.
 */
int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
{
	struct inode *inode = &ci->vfs_inode;
	int ret;

	spin_lock(&ci->i_ceph_lock);
	ret = __ceph_caps_revoking_other(ci, NULL, mask);
	spin_unlock(&ci->i_ceph_lock);
	dout("ceph_caps_revoking %p %s = %d\n", inode,
	     ceph_cap_string(mask), ret);
	return ret;
}
896
/*
 * Derive the set of caps currently in active use from the inode's
 * reference counters and cached page state.
 */
int __ceph_caps_used(struct ceph_inode_info *ci)
{
	int used = 0;
	if (ci->i_pin_ref)
		used |= CEPH_CAP_PIN;
	if (ci->i_rd_ref)
		used |= CEPH_CAP_FILE_RD;
	if (ci->i_rdcache_ref ||
	    (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
	     ci->vfs_inode.i_data.nrpages))
		used |= CEPH_CAP_FILE_CACHE;
	if (ci->i_wr_ref)
		used |= CEPH_CAP_FILE_WR;
	if (ci->i_wb_ref || ci->i_wrbuffer_ref)
		used |= CEPH_CAP_FILE_BUFFER;
	return used;
}
914
915/*
916 * wanted, by virtue of open file modes
917 */
int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
	int i, bits = 0;
	/* collect one bit per open file mode currently held */
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (ci->i_nr_by_mode[i])
			bits |= 1 << i;
	}
	if (bits == 0)
		return 0;
	/* translate the mode bits into the corresponding cap set */
	return ceph_caps_for_mode(bits >> 1);
}
929
930/*
931 * Return caps we have registered with the MDS(s) as 'wanted'.
932 */
int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
{
	struct ceph_cap *cap;
	struct rb_node *p;
	int mds_wanted = 0;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		/* optionally skip stale caps */
		if (check && !__cap_is_valid(cap))
			continue;
		/* only the auth cap's file-write wants count */
		if (cap == ci->i_auth_cap)
			mds_wanted |= cap->mds_wanted;
		else
			mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR);
	}
	return mds_wanted;
}
950
951/*
be655596 952 * called under i_ceph_lock
a8599bd8 953 */
0f439c74
YZ
/* true when the inode holds at most one cap (first == last in the tree) */
static int __ceph_is_single_caps(struct ceph_inode_info *ci)
{
	return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
}
958
a8599bd8
SW
/* true when the inode holds any cap at all */
static int __ceph_is_any_caps(struct ceph_inode_info *ci)
{
	return !RB_EMPTY_ROOT(&ci->i_caps);
}
963
9215aeea
YZ
/* locked wrapper around __ceph_is_any_caps() */
int ceph_is_any_caps(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int ret;

	spin_lock(&ci->i_ceph_lock);
	ret = __ceph_is_any_caps(ci);
	spin_unlock(&ci->i_ceph_lock);

	return ret;
}
975
db40cc17
YZ
/*
 * Detach this inode from its snap realm and drop the realm reference.
 * ci->i_snap_realm must be non-NULL.  NOTE(review): callers in this
 * file invoke this under i_ceph_lock — confirm before adding new ones.
 */
static void drop_inode_snap_realm(struct ceph_inode_info *ci)
{
	struct ceph_snap_realm *realm = ci->i_snap_realm;
	spin_lock(&realm->inodes_with_caps_lock);
	list_del_init(&ci->i_snap_realm_item);
	ci->i_snap_realm_counter++;
	ci->i_snap_realm = NULL;
	spin_unlock(&realm->inodes_with_caps_lock);
	ceph_put_snap_realm(ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc,
			    realm);
}
987
a8599bd8 988/*
f818a736
SW
989 * Remove a cap. Take steps to deal with a racing iterate_session_caps.
990 *
be655596 991 * caller should hold i_ceph_lock.
a6369741 992 * caller will not hold session s_mutex if called from destroy_inode.
a8599bd8 993 */
void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
{
	struct ceph_mds_session *session = cap->session;
	struct ceph_inode_info *ci = cap->ci;
	struct ceph_mds_client *mdsc =
		ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
	int removed = 0;	/* unlinked from the session list? */

	dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);

	/* remove from session list */
	spin_lock(&session->s_cap_lock);
	if (session->s_cap_iterator == cap) {
		/* not yet, we are iterating over this very cap */
		dout("__ceph_remove_cap delaying %p removal from session %p\n",
		     cap, cap->session);
	} else {
		list_del_init(&cap->session_caps);
		session->s_nr_caps--;
		cap->session = NULL;
		removed = 1;
	}
	/* protect backpointer with s_cap_lock: see iterate_session_caps */
	cap->ci = NULL;

	/*
	 * s_cap_reconnect is protected by s_cap_lock. no one changes
	 * s_cap_gen while session is in the reconnect state.
	 */
	if (queue_release &&
	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
		cap->queue_release = 1;
		if (removed) {
			/* hand the cap over to the release queue */
			list_add_tail(&cap->session_caps,
				      &session->s_cap_releases);
			session->s_num_cap_releases++;
			removed = 0;
		}
	} else {
		cap->queue_release = 0;
	}
	/* remember the inode for the release message */
	cap->cap_ino = ci->i_vino.ino;

	spin_unlock(&session->s_cap_lock);

	/* remove from inode list */
	rb_erase(&cap->ci_node, &ci->i_caps);
	if (ci->i_auth_cap == cap)
		ci->i_auth_cap = NULL;

	if (removed)
		ceph_put_cap(mdsc, cap);

	/* when reconnect denied, we remove session caps forcibly,
	 * i_wr_ref can be non-zero. If there are ongoing write,
	 * keep i_snap_realm.
	 */
	if (!__ceph_is_any_caps(ci) && ci->i_wr_ref == 0 && ci->i_snap_realm)
		drop_inode_snap_realm(ci);

	if (!__ceph_is_any_real_caps(ci))
		__cap_delay_cancel(mdsc, ci);
}
1057
0ff8bfb3
JL
/*
 * Argument bundle for send_cap_msg(): everything needed to encode one
 * CEPH_MSG_CLIENT_CAPS message to an MDS, captured under i_ceph_lock
 * so the message can be built after the lock is dropped.
 */
struct cap_msg_args {
	struct ceph_mds_session *session;
	u64 ino, cid, follows;			/* inode, cap id, snap follows */
	u64 flush_tid, oldest_flush_tid, size, max_size;
	u64 xattr_version;
	struct ceph_buffer *xattr_buf;		/* NULL if no xattr blob to send */
	struct timespec atime, mtime, ctime;
	int op, caps, wanted, dirty;		/* cap op and cap bit masks */
	u32 seq, issue_seq, mseq, time_warp_seq;
	u32 flags;				/* CEPH_CLIENT_CAPS_* advisory flags */
	kuid_t uid;
	kgid_t gid;
	umode_t mode;
	bool inline_data;			/* inode has inline data */
};
1073
a8599bd8
SW
1074/*
1075 * Build and send a cap message to the given MDS.
1076 *
1077 * Caller should be holding s_mutex.
1078 */
static int send_cap_msg(struct cap_msg_args *arg)
{
	struct ceph_mds_caps *fc;
	struct ceph_msg *msg;
	void *p;
	size_t extra_len;
	struct timespec zerotime = {0};
	struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;

	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
	     " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
	     " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op),
	     arg->cid, arg->ino, ceph_cap_string(arg->caps),
	     ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty),
	     arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid,
	     arg->mseq, arg->follows, arg->size, arg->max_size,
	     arg->xattr_version,
	     arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);

	/* flock buffer size + inline version + inline data size +
	 * osd_epoch_barrier + oldest_flush_tid */
	extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4;
	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
			   GFP_NOFS, false);
	if (!msg)
		return -ENOMEM;

	msg->hdr.version = cpu_to_le16(10);
	msg->hdr.tid = cpu_to_le64(arg->flush_tid);

	/* fixed-size head section, followed by versioned extras below */
	fc = msg->front.iov_base;
	memset(fc, 0, sizeof(*fc));

	fc->cap_id = cpu_to_le64(arg->cid);
	fc->op = cpu_to_le32(arg->op);
	fc->seq = cpu_to_le32(arg->seq);
	fc->issue_seq = cpu_to_le32(arg->issue_seq);
	fc->migrate_seq = cpu_to_le32(arg->mseq);
	fc->caps = cpu_to_le32(arg->caps);
	fc->wanted = cpu_to_le32(arg->wanted);
	fc->dirty = cpu_to_le32(arg->dirty);
	fc->ino = cpu_to_le64(arg->ino);
	fc->snap_follows = cpu_to_le64(arg->follows);

	fc->size = cpu_to_le64(arg->size);
	fc->max_size = cpu_to_le64(arg->max_size);
	ceph_encode_timespec(&fc->mtime, &arg->mtime);
	ceph_encode_timespec(&fc->atime, &arg->atime);
	ceph_encode_timespec(&fc->ctime, &arg->ctime);
	fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq);

	fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid));
	fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid));
	fc->mode = cpu_to_le32(arg->mode);

	fc->xattr_version = cpu_to_le64(arg->xattr_version);
	if (arg->xattr_buf) {
		/* xattr blob travels in the message middle section */
		msg->middle = ceph_buffer_get(arg->xattr_buf);
		fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
		msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
	}

	/* versioned extras follow the head struct; encode order must
	 * match the protocol version declared in msg->hdr.version */
	p = fc + 1;
	/* flock buffer size (version 2) */
	ceph_encode_32(&p, 0);
	/* inline version (version 4) */
	ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
	/* inline data size */
	ceph_encode_32(&p, 0);
	/*
	 * osd_epoch_barrier (version 5)
	 * The epoch_barrier is protected osdc->lock, so READ_ONCE here in
	 * case it was recently changed
	 */
	ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier));
	/* oldest_flush_tid (version 6) */
	ceph_encode_64(&p, arg->oldest_flush_tid);

	/*
	 * caller_uid/caller_gid (version 7)
	 *
	 * Currently, we don't properly track which caller dirtied the caps
	 * last, and force a flush of them when there is a conflict. For now,
	 * just set this to 0:0, to emulate how the MDS has worked up to now.
	 */
	ceph_encode_32(&p, 0);
	ceph_encode_32(&p, 0);

	/* pool namespace (version 8) (mds always ignores this) */
	ceph_encode_32(&p, 0);

	/*
	 * btime and change_attr (version 9)
	 *
	 * We just zero these out for now, as the MDS ignores them unless
	 * the requisite feature flags are set (which we don't do yet).
	 */
	ceph_encode_timespec(p, &zerotime);
	p += sizeof(struct ceph_timespec);
	ceph_encode_64(&p, 0);

	/* Advisory flags (version 10) */
	ceph_encode_32(&p, arg->flags);

	ceph_con_send(&arg->session->s_con, msg);
	return 0;
}
1186
1187/*
a6369741 1188 * Queue cap releases when an inode is dropped from our cache. Since
be655596 1189 * inode is about to be destroyed, there is no need for i_ceph_lock.
a8599bd8
SW
1190 */
1191void ceph_queue_caps_release(struct inode *inode)
1192{
1193 struct ceph_inode_info *ci = ceph_inode(inode);
1194 struct rb_node *p;
1195
a8599bd8
SW
1196 p = rb_first(&ci->i_caps);
1197 while (p) {
1198 struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
a8599bd8 1199 p = rb_next(p);
a096b09a 1200 __ceph_remove_cap(cap, true);
a8599bd8 1201 }
a8599bd8
SW
1202}
1203
1204/*
1205 * Send a cap msg on the given inode. Update our caps state, then
be655596 1206 * drop i_ceph_lock and send the message.
a8599bd8
SW
1207 *
1208 * Make note of max_size reported/requested from mds, revoked caps
1209 * that have now been implemented.
1210 *
1211 * Make half-hearted attempt ot to invalidate page cache if we are
1212 * dropping RDCACHE. Note that this will leave behind locked pages
1213 * that we'll then need to deal with elsewhere.
1214 *
1215 * Return non-zero if delayed release, or we experienced an error
1216 * such that the caller should requeue + retry later.
1217 *
be655596 1218 * called with i_ceph_lock, then drops it.
a8599bd8
SW
1219 * caller should hold snap_rwsem (read), s_mutex.
1220 */
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
		      int op, bool sync, int used, int want, int retain,
		      int flushing, u64 flush_tid, u64 oldest_flush_tid)
	__releases(cap->ci->i_ceph_lock)
{
	struct ceph_inode_info *ci = cap->ci;
	struct inode *inode = &ci->vfs_inode;
	struct cap_msg_args arg;
	int held, revoking;
	int wake = 0;
	int delayed = 0;
	int ret;

	held = cap->issued | cap->implemented;
	revoking = cap->implemented & ~cap->issued;
	/* never retain bits the MDS is revoking from us */
	retain &= ~revoking;

	dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
	     inode, cap, cap->session,
	     ceph_cap_string(held), ceph_cap_string(held & retain),
	     ceph_cap_string(revoking));
	BUG_ON((retain & CEPH_CAP_PIN) == 0);

	arg.session = cap->session;

	/* don't release wanted unless we've waited a bit. */
	if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
	    time_before(jiffies, ci->i_hold_caps_min)) {
		dout(" delaying issued %s -> %s, wanted %s -> %s on send\n",
		     ceph_cap_string(cap->issued),
		     ceph_cap_string(cap->issued & retain),
		     ceph_cap_string(cap->mds_wanted),
		     ceph_cap_string(want));
		want |= cap->mds_wanted;
		retain |= cap->issued;
		delayed = 1;
	}
	ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH);
	if (want & ~cap->mds_wanted) {
		/* user space may open/close single file frequently.
		 * This avoids droping mds_wanted immediately after
		 * requesting new mds_wanted.
		 */
		__cap_set_timeouts(mdsc, ci);
	}

	cap->issued &= retain;  /* drop bits we don't want */
	if (cap->implemented & ~cap->issued) {
		/*
		 * Wake up any waiters on wanted -> needed transition.
		 * This is due to the weird transition from buffered
		 * to sync IO... we need to flush dirty pages _before_
		 * allowing sync writes to avoid reordering.
		 */
		wake = 1;
	}
	cap->implemented &= cap->issued | used;
	cap->mds_wanted = want;

	/* snapshot everything the message needs while still holding
	 * i_ceph_lock; the message itself is built after unlock */
	arg.ino = ceph_vino(inode).ino;
	arg.cid = cap->cap_id;
	arg.follows = flushing ? ci->i_head_snapc->seq : 0;
	arg.flush_tid = flush_tid;
	arg.oldest_flush_tid = oldest_flush_tid;

	arg.size = inode->i_size;
	ci->i_reported_size = arg.size;
	arg.max_size = ci->i_wanted_max_size;
	ci->i_requested_max_size = arg.max_size;

	if (flushing & CEPH_CAP_XATTR_EXCL) {
		__ceph_build_xattrs_blob(ci);
		arg.xattr_version = ci->i_xattrs.version;
		arg.xattr_buf = ci->i_xattrs.blob;
	} else {
		arg.xattr_buf = NULL;
	}

	arg.mtime = inode->i_mtime;
	arg.atime = inode->i_atime;
	arg.ctime = inode->i_ctime;

	arg.op = op;
	arg.caps = cap->implemented;
	arg.wanted = want;
	arg.dirty = flushing;

	arg.seq = cap->seq;
	arg.issue_seq = cap->issue_seq;
	arg.mseq = cap->mseq;
	arg.time_warp_seq = ci->i_time_warp_seq;

	arg.uid = inode->i_uid;
	arg.gid = inode->i_gid;
	arg.mode = inode->i_mode;

	arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
	if (list_empty(&ci->i_cap_snaps))
		arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP;
	else
		arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
	if (sync)
		arg.flags |= CEPH_CLIENT_CAPS_SYNC;

	spin_unlock(&ci->i_ceph_lock);

	ret = send_cap_msg(&arg);
	if (ret < 0) {
		/* caller requeues the cap check on nonzero return */
		dout("error sending cap msg, must requeue %p\n", inode);
		delayed = 1;
	}

	if (wake)
		wake_up_all(&ci->i_cap_wq);

	return delayed;
}
1338
0e294387
YZ
1339static inline int __send_flush_snap(struct inode *inode,
1340 struct ceph_mds_session *session,
1341 struct ceph_cap_snap *capsnap,
1342 u32 mseq, u64 oldest_flush_tid)
1343{
0ff8bfb3
JL
1344 struct cap_msg_args arg;
1345
1346 arg.session = session;
1347 arg.ino = ceph_vino(inode).ino;
1348 arg.cid = 0;
1349 arg.follows = capsnap->follows;
1350 arg.flush_tid = capsnap->cap_flush.tid;
1351 arg.oldest_flush_tid = oldest_flush_tid;
1352
1353 arg.size = capsnap->size;
1354 arg.max_size = 0;
1355 arg.xattr_version = capsnap->xattr_version;
1356 arg.xattr_buf = capsnap->xattr_blob;
1357
1358 arg.atime = capsnap->atime;
1359 arg.mtime = capsnap->mtime;
1360 arg.ctime = capsnap->ctime;
1361
1362 arg.op = CEPH_CAP_OP_FLUSHSNAP;
1363 arg.caps = capsnap->issued;
1364 arg.wanted = 0;
1365 arg.dirty = capsnap->dirty;
1366
1367 arg.seq = 0;
1368 arg.issue_seq = 0;
1369 arg.mseq = mseq;
1370 arg.time_warp_seq = capsnap->time_warp_seq;
1371
1372 arg.uid = capsnap->uid;
1373 arg.gid = capsnap->gid;
1374 arg.mode = capsnap->mode;
1375
1376 arg.inline_data = capsnap->inline_data;
1e4ef0c6 1377 arg.flags = 0;
0ff8bfb3
JL
1378
1379 return send_cap_msg(&arg);
0e294387
YZ
1380}
1381
a8599bd8
SW
1382/*
1383 * When a snapshot is taken, clients accumulate dirty metadata on
1384 * inodes with capabilities in ceph_cap_snaps to describe the file
1385 * state at the time the snapshot was taken. This must be flushed
1386 * asynchronously back to the MDS once sync writes complete and dirty
1387 * data is written out.
1388 *
be655596 1389 * Called under i_ceph_lock. Takes s_mutex as needed.
a8599bd8 1390 */
ed9b430c
YZ
static void __ceph_flush_snaps(struct ceph_inode_info *ci,
			       struct ceph_mds_session *session)
	__releases(ci->i_ceph_lock)
	__acquires(ci->i_ceph_lock)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_cap_snap *capsnap;
	u64 oldest_flush_tid = 0;
	u64 first_tid = 1, last_tid = 0;

	dout("__flush_snaps %p session %p\n", inode, session);

	/* pass 1: assign flush tids to every flush-ready capsnap and
	 * queue them on the mdsc/inode flushing lists */
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		/*
		 * we need to wait for sync writes to complete and for dirty
		 * pages to be written out.
		 */
		if (capsnap->dirty_pages || capsnap->writing)
			break;

		/* should be removed by ceph_try_drop_cap_snap() */
		BUG_ON(!capsnap->need_flush);

		/* only flush each capsnap once */
		if (capsnap->cap_flush.tid > 0) {
			dout(" already flushed %p, skipping\n", capsnap);
			continue;
		}

		spin_lock(&mdsc->cap_dirty_lock);
		capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
		list_add_tail(&capsnap->cap_flush.g_list,
			      &mdsc->cap_flush_list);
		if (oldest_flush_tid == 0)
			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
		if (list_empty(&ci->i_flushing_item)) {
			list_add_tail(&ci->i_flushing_item,
				      &session->s_cap_flushing);
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		list_add_tail(&capsnap->cap_flush.i_list,
			      &ci->i_cap_flush_list);

		if (first_tid == 1)
			first_tid = capsnap->cap_flush.tid;
		last_tid = capsnap->cap_flush.tid;
	}

	ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;

	/* pass 2: send a FLUSHSNAP message per queued capsnap in tid
	 * order; i_ceph_lock is dropped around each send, so re-find
	 * the next flush by tid each iteration */
	while (first_tid <= last_tid) {
		struct ceph_cap *cap = ci->i_auth_cap;
		struct ceph_cap_flush *cf;
		int ret;

		/* auth cap may have moved while the lock was dropped */
		if (!(cap && cap->session == session)) {
			dout("__flush_snaps %p auth cap %p not mds%d, "
			     "stop\n", inode, cap, session->s_mds);
			break;
		}

		ret = -ENOENT;
		list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
			if (cf->tid >= first_tid) {
				ret = 0;
				break;
			}
		}
		if (ret < 0)
			break;

		first_tid = cf->tid + 1;

		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
		/* pin the capsnap across the unlocked send */
		refcount_inc(&capsnap->nref);
		spin_unlock(&ci->i_ceph_lock);

		dout("__flush_snaps %p capsnap %p tid %llu %s\n",
		     inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty));

		ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
					oldest_flush_tid);
		if (ret < 0) {
			pr_err("__flush_snaps: error sending cap flushsnap, "
			       "ino (%llx.%llx) tid %llu follows %llu\n",
			       ceph_vinop(inode), cf->tid, capsnap->follows);
		}

		ceph_put_cap_snap(capsnap);
		spin_lock(&ci->i_ceph_lock);
	}
}
a8599bd8 1485
ed9b430c
YZ
/*
 * Flush any queued capsnaps on this inode to the auth MDS.  If
 * *psession is supplied it is the caller's locked session and is
 * returned (possibly swapped for the correct one) via *psession;
 * otherwise the session acquired here is released before returning.
 */
void ceph_flush_snaps(struct ceph_inode_info *ci,
		      struct ceph_mds_session **psession)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	struct ceph_mds_session *session = NULL;
	int mds;

	dout("ceph_flush_snaps %p\n", inode);
	if (psession)
		session = *psession;
retry:
	spin_lock(&ci->i_ceph_lock);
	if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
		dout(" no capsnap needs flush, doing nothing\n");
		goto out;
	}
	if (!ci->i_auth_cap) {
		dout(" no auth cap (migrating?), doing nothing\n");
		goto out;
	}

	/* we must hold the s_mutex of the auth cap's session; if we
	 * hold some other session's mutex, swap it for the right one */
	mds = ci->i_auth_cap->session->s_mds;
	if (session && session->s_mds != mds) {
		dout(" oops, wrong session %p mutex\n", session);
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		session = NULL;
	}
	if (!session) {
		/* lock order is s_mutex before i_ceph_lock, so drop the
		 * inode lock, take the session mutex, and retry */
		spin_unlock(&ci->i_ceph_lock);
		mutex_lock(&mdsc->mutex);
		session = __ceph_lookup_mds_session(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
		if (session) {
			dout(" inverting session/ino locks on %p\n", session);
			mutex_lock(&session->s_mutex);
		}
		goto retry;
	}

	// make sure flushsnap messages are sent in proper order.
	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
		__kick_flushing_caps(mdsc, session, ci, 0);
		ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
	}

	__ceph_flush_snaps(ci, session);
out:
	spin_unlock(&ci->i_ceph_lock);

	if (psession) {
		*psession = session;
	} else if (session) {
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
	}
	/* we flushed them all; remove this inode from the queue */
	spin_lock(&mdsc->snap_flush_lock);
	list_del_init(&ci->i_snap_flush_item);
	spin_unlock(&mdsc->snap_flush_lock);
}
1548
76e3b390 1549/*
fca65b4a
SW
1550 * Mark caps dirty. If inode is newly dirty, return the dirty flags.
1551 * Caller is then responsible for calling __mark_inode_dirty with the
1552 * returned flags value.
76e3b390 1553 */
f66fd9f0
YZ
int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
			   struct ceph_cap_flush **pcf)
{
	struct ceph_mds_client *mdsc =
		ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
	struct inode *inode = &ci->vfs_inode;
	int was = ci->i_dirty_caps;
	int dirty = 0;

	if (!ci->i_auth_cap) {
		pr_warn("__mark_dirty_caps %p %llx mask %s, "
			"but no auth cap (session was closed?)\n",
			inode, ceph_ino(inode), ceph_cap_string(mask));
		return 0;
	}

	dout("__mark_dirty_caps %p %s dirty %s -> %s\n", &ci->vfs_inode,
	     ceph_cap_string(mask), ceph_cap_string(was),
	     ceph_cap_string(was | mask));
	ci->i_dirty_caps |= mask;
	if (was == 0) {
		/* first dirty bit: consume the caller's preallocated
		 * cap_flush so the eventual flush can't fail on alloc */
		WARN_ON_ONCE(ci->i_prealloc_cap_flush);
		swap(ci->i_prealloc_cap_flush, *pcf);

		if (!ci->i_head_snapc) {
			/* pin the snap context the dirty data belongs to */
			WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
			ci->i_head_snapc = ceph_get_snap_context(
				ci->i_snap_realm->cached_context);
		}
		dout(" inode %p now dirty snapc %p auth cap %p\n",
		     &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
		BUG_ON(!list_empty(&ci->i_dirty_item));
		spin_lock(&mdsc->cap_dirty_lock);
		list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
		spin_unlock(&mdsc->cap_dirty_lock);
		if (ci->i_flushing_caps == 0) {
			/* inode ref is dropped when the flush completes */
			ihold(inode);
			dirty |= I_DIRTY_SYNC;
		}
	} else {
		WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
	}
	BUG_ON(list_empty(&ci->i_dirty_item));
	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
	    (mask & CEPH_CAP_FILE_BUFFER))
		dirty |= I_DIRTY_DATASYNC;
	__cap_delay_requeue(mdsc, ci);
	return dirty;
}
1603
f66fd9f0
YZ
1604struct ceph_cap_flush *ceph_alloc_cap_flush(void)
1605{
1606 return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
1607}
1608
1609void ceph_free_cap_flush(struct ceph_cap_flush *cf)
1610{
1611 if (cf)
1612 kmem_cache_free(ceph_cap_flush_cachep, cf);
1613}
1614
a2971c8c
YZ
1615static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
1616{
e4500b5e 1617 if (!list_empty(&mdsc->cap_flush_list)) {
a2971c8c 1618 struct ceph_cap_flush *cf =
e4500b5e
YZ
1619 list_first_entry(&mdsc->cap_flush_list,
1620 struct ceph_cap_flush, g_list);
a2971c8c
YZ
1621 return cf->tid;
1622 }
1623 return 0;
1624}
1625
c8799fc4
YZ
1626/*
1627 * Remove cap_flush from the mdsc's or inode's flushing cap list.
1628 * Return true if caller needs to wake up flush waiters.
1629 */
1630static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
1631 struct ceph_inode_info *ci,
1632 struct ceph_cap_flush *cf)
1633{
1634 struct ceph_cap_flush *prev;
1635 bool wake = cf->wake;
1636 if (mdsc) {
1637 /* are there older pending cap flushes? */
1638 if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
1639 prev = list_prev_entry(cf, g_list);
1640 prev->wake = true;
1641 wake = false;
1642 }
1643 list_del(&cf->g_list);
1644 } else if (ci) {
1645 if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
1646 prev = list_prev_entry(cf, i_list);
1647 prev->wake = true;
1648 wake = false;
1649 }
1650 list_del(&cf->i_list);
1651 } else {
1652 BUG_ON(1);
1653 }
1654 return wake;
1655}
1656
a8599bd8
SW
1657/*
1658 * Add dirty inode to the flushing list. Assigned a seq number so we
1659 * can wait for caps to flush without starving.
cdc35f96 1660 *
be655596 1661 * Called under i_ceph_lock.
a8599bd8 1662 */
cdc35f96 1663static int __mark_caps_flushing(struct inode *inode,
c8799fc4 1664 struct ceph_mds_session *session, bool wake,
a2971c8c 1665 u64 *flush_tid, u64 *oldest_flush_tid)
a8599bd8 1666{
3d14c5d2 1667 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
a8599bd8 1668 struct ceph_inode_info *ci = ceph_inode(inode);
f66fd9f0 1669 struct ceph_cap_flush *cf = NULL;
cdc35f96 1670 int flushing;
50b885b9 1671
cdc35f96 1672 BUG_ON(ci->i_dirty_caps == 0);
a8599bd8 1673 BUG_ON(list_empty(&ci->i_dirty_item));
f66fd9f0 1674 BUG_ON(!ci->i_prealloc_cap_flush);
cdc35f96
SW
1675
1676 flushing = ci->i_dirty_caps;
1677 dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n",
1678 ceph_cap_string(flushing),
1679 ceph_cap_string(ci->i_flushing_caps),
1680 ceph_cap_string(ci->i_flushing_caps | flushing));
1681 ci->i_flushing_caps |= flushing;
1682 ci->i_dirty_caps = 0;
afcdaea3 1683 dout(" inode %p now !dirty\n", inode);
cdc35f96 1684
f66fd9f0 1685 swap(cf, ci->i_prealloc_cap_flush);
553adfd9 1686 cf->caps = flushing;
c8799fc4 1687 cf->wake = wake;
553adfd9 1688
a8599bd8 1689 spin_lock(&mdsc->cap_dirty_lock);
afcdaea3
SW
1690 list_del_init(&ci->i_dirty_item);
1691
553adfd9 1692 cf->tid = ++mdsc->last_cap_flush_tid;
e4500b5e 1693 list_add_tail(&cf->g_list, &mdsc->cap_flush_list);
a2971c8c 1694 *oldest_flush_tid = __get_oldest_flush_tid(mdsc);
553adfd9 1695
a8599bd8
SW
1696 if (list_empty(&ci->i_flushing_item)) {
1697 list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
1698 mdsc->num_cap_flushing++;
a8599bd8
SW
1699 }
1700 spin_unlock(&mdsc->cap_dirty_lock);
cdc35f96 1701
e4500b5e 1702 list_add_tail(&cf->i_list, &ci->i_cap_flush_list);
553adfd9
YZ
1703
1704 *flush_tid = cf->tid;
cdc35f96 1705 return flushing;
a8599bd8
SW
1706}
1707
5ecad6fd
SW
1708/*
1709 * try to invalidate mapping pages without blocking.
1710 */
5ecad6fd
SW
1711static int try_nonblocking_invalidate(struct inode *inode)
1712{
1713 struct ceph_inode_info *ci = ceph_inode(inode);
1714 u32 invalidating_gen = ci->i_rdcache_gen;
1715
be655596 1716 spin_unlock(&ci->i_ceph_lock);
5ecad6fd 1717 invalidate_mapping_pages(&inode->i_data, 0, -1);
be655596 1718 spin_lock(&ci->i_ceph_lock);
5ecad6fd 1719
18a38193 1720 if (inode->i_data.nrpages == 0 &&
5ecad6fd
SW
1721 invalidating_gen == ci->i_rdcache_gen) {
1722 /* success. */
1723 dout("try_nonblocking_invalidate %p success\n", inode);
cd045cb4
SW
1724 /* save any racing async invalidate some trouble */
1725 ci->i_rdcache_revoking = ci->i_rdcache_gen - 1;
5ecad6fd
SW
1726 return 0;
1727 }
1728 dout("try_nonblocking_invalidate %p failed\n", inode);
1729 return -1;
1730}
1731
efb0ca76
YZ
1732bool __ceph_should_report_size(struct ceph_inode_info *ci)
1733{
1734 loff_t size = ci->vfs_inode.i_size;
1735 /* mds will adjust max size according to the reported size */
1736 if (ci->i_flushing_caps & CEPH_CAP_FILE_WR)
1737 return false;
1738 if (size >= ci->i_max_size)
1739 return true;
1740 /* half of previous max_size increment has been used */
1741 if (ci->i_max_size > ci->i_reported_size &&
1742 (size << 1) >= ci->i_max_size + ci->i_reported_size)
1743 return true;
1744 return false;
1745}
1746
a8599bd8
SW
1747/*
1748 * Swiss army knife function to examine currently used and wanted
1749 * versus held caps. Release, flush, ack revoked caps to mds as
1750 * appropriate.
1751 *
1752 * CHECK_CAPS_NODELAY - caller is delayed work and we should not delay
1753 * cap release further.
1754 * CHECK_CAPS_AUTHONLY - we should only check the auth cap
1755 * CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
1756 * further delay.
1757 */
1758void ceph_check_caps(struct ceph_inode_info *ci, int flags,
1759 struct ceph_mds_session *session)
1760{
3d14c5d2
YS
1761 struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
1762 struct ceph_mds_client *mdsc = fsc->mdsc;
a8599bd8
SW
1763 struct inode *inode = &ci->vfs_inode;
1764 struct ceph_cap *cap;
a2971c8c 1765 u64 flush_tid, oldest_flush_tid;
395c312b 1766 int file_wanted, used, cap_used;
a8599bd8 1767 int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */
cbd03635 1768 int issued, implemented, want, retain, revoking, flushing = 0;
a8599bd8
SW
1769 int mds = -1; /* keep track of how far we've gone through i_caps list
1770 to avoid an infinite loop on retry */
1771 struct rb_node *p;
0f439c74
YZ
1772 int delayed = 0, sent = 0;
1773 bool no_delay = flags & CHECK_CAPS_NODELAY;
3609404f 1774 bool queue_invalidate = false;
3609404f 1775 bool tried_invalidate = false;
a8599bd8
SW
1776
1777 /* if we are unmounting, flush any unused caps immediately. */
1778 if (mdsc->stopping)
0f439c74 1779 no_delay = true;
a8599bd8 1780
be655596 1781 spin_lock(&ci->i_ceph_lock);
a8599bd8
SW
1782
1783 if (ci->i_ceph_flags & CEPH_I_FLUSH)
1784 flags |= CHECK_CAPS_FLUSH;
1785
0f439c74
YZ
1786 if (!(flags & CHECK_CAPS_AUTHONLY) ||
1787 (ci->i_auth_cap && __ceph_is_single_caps(ci)))
1788 __cap_delay_cancel(mdsc, ci);
1789
a8599bd8
SW
1790 goto retry_locked;
1791retry:
be655596 1792 spin_lock(&ci->i_ceph_lock);
a8599bd8
SW
1793retry_locked:
1794 file_wanted = __ceph_caps_file_wanted(ci);
1795 used = __ceph_caps_used(ci);
cbd03635
SW
1796 issued = __ceph_caps_issued(ci, &implemented);
1797 revoking = implemented & ~issued;
a8599bd8 1798
41445999
YZ
1799 want = file_wanted;
1800 retain = file_wanted | used | CEPH_CAP_PIN;
a8599bd8 1801 if (!mdsc->stopping && inode->i_nlink > 0) {
41445999 1802 if (file_wanted) {
a8599bd8 1803 retain |= CEPH_CAP_ANY; /* be greedy */
32ec4397
YZ
1804 } else if (S_ISDIR(inode->i_mode) &&
1805 (issued & CEPH_CAP_FILE_SHARED) &&
1806 __ceph_dir_is_complete(ci)) {
1807 /*
1808 * If a directory is complete, we want to keep
1809 * the exclusive cap. So that MDS does not end up
1810 * revoking the shared cap on every create/unlink
1811 * operation.
1812 */
1813 want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
1814 retain |= want;
a8599bd8 1815 } else {
32ec4397 1816
a8599bd8
SW
1817 retain |= CEPH_CAP_ANY_SHARED;
1818 /*
1819 * keep RD only if we didn't have the file open RW,
1820 * because then the mds would revoke it anyway to
1821 * journal max_size=0.
1822 */
1823 if (ci->i_max_size == 0)
1824 retain |= CEPH_CAP_ANY_RD;
1825 }
1826 }
1827
1828 dout("check_caps %p file_want %s used %s dirty %s flushing %s"
cbd03635 1829 " issued %s revoking %s retain %s %s%s%s\n", inode,
a8599bd8
SW
1830 ceph_cap_string(file_wanted),
1831 ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
1832 ceph_cap_string(ci->i_flushing_caps),
cbd03635 1833 ceph_cap_string(issued), ceph_cap_string(revoking),
a8599bd8
SW
1834 ceph_cap_string(retain),
1835 (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
1836 (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "",
1837 (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "");
1838
1839 /*
1840 * If we no longer need to hold onto old our caps, and we may
1841 * have cached pages, but don't want them, then try to invalidate.
1842 * If we fail, it's because pages are locked.... try again later.
1843 */
0f439c74 1844 if ((!no_delay || mdsc->stopping) &&
fdd4e158 1845 !S_ISDIR(inode->i_mode) && /* ignore readdir cache */
9abd4db7 1846 !(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */
fdd4e158 1847 inode->i_data.nrpages && /* have cached pages */
5e804ac4
YZ
1848 (revoking & (CEPH_CAP_FILE_CACHE|
1849 CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */
a8599bd8 1850 !tried_invalidate) {
a8599bd8 1851 dout("check_caps trying to invalidate on %p\n", inode);
5ecad6fd 1852 if (try_nonblocking_invalidate(inode) < 0) {
ee612d95
YZ
1853 dout("check_caps queuing invalidate\n");
1854 queue_invalidate = true;
1855 ci->i_rdcache_revoking = ci->i_rdcache_gen;
a8599bd8 1856 }
3609404f 1857 tried_invalidate = true;
a8599bd8
SW
1858 goto retry_locked;
1859 }
1860
a8599bd8
SW
1861 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
1862 cap = rb_entry(p, struct ceph_cap, ci_node);
a8599bd8
SW
1863
1864 /* avoid looping forever */
1865 if (mds >= cap->mds ||
1866 ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
1867 continue;
1868
1869 /* NOTE: no side-effects allowed, until we take s_mutex */
1870
395c312b
YZ
1871 cap_used = used;
1872 if (ci->i_auth_cap && cap != ci->i_auth_cap)
1873 cap_used &= ~ci->i_auth_cap->issued;
1874
a8599bd8 1875 revoking = cap->implemented & ~cap->issued;
395c312b 1876 dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
9abd4db7
YZ
1877 cap->mds, cap, ceph_cap_string(cap_used),
1878 ceph_cap_string(cap->issued),
088b3f5e
SW
1879 ceph_cap_string(cap->implemented),
1880 ceph_cap_string(revoking));
a8599bd8
SW
1881
1882 if (cap == ci->i_auth_cap &&
1883 (cap->issued & CEPH_CAP_FILE_WR)) {
1884 /* request larger max_size from MDS? */
1885 if (ci->i_wanted_max_size > ci->i_max_size &&
1886 ci->i_wanted_max_size > ci->i_requested_max_size) {
1887 dout("requesting new max_size\n");
1888 goto ack;
1889 }
1890
1891 /* approaching file_max? */
efb0ca76 1892 if (__ceph_should_report_size(ci)) {
a8599bd8
SW
1893 dout("i_size approaching max_size\n");
1894 goto ack;
1895 }
1896 }
1897 /* flush anything dirty? */
7bc00fdd
YZ
1898 if (cap == ci->i_auth_cap) {
1899 if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) {
1900 dout("flushing dirty caps\n");
1901 goto ack;
1902 }
1903 if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
1904 dout("flushing snap caps\n");
1905 goto ack;
1906 }
a8599bd8
SW
1907 }
1908
1909 /* completed revocation? going down and there are no caps? */
395c312b 1910 if (revoking && (revoking & cap_used) == 0) {
a8599bd8
SW
1911 dout("completed revocation of %s\n",
1912 ceph_cap_string(cap->implemented & ~cap->issued));
1913 goto ack;
1914 }
1915
1916 /* want more caps from mds? */
1917 if (want & ~(cap->mds_wanted | cap->issued))
1918 goto ack;
1919
1920 /* things we might delay */
1921 if ((cap->issued & ~retain) == 0 &&
1922 cap->mds_wanted == want)
1923 continue; /* nope, all good */
1924
0f439c74 1925 if (no_delay)
a8599bd8
SW
1926 goto ack;
1927
1928 /* delay? */
1929 if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
1930 time_before(jiffies, ci->i_hold_caps_max)) {
1931 dout(" delaying issued %s -> %s, wanted %s -> %s\n",
1932 ceph_cap_string(cap->issued),
1933 ceph_cap_string(cap->issued & retain),
1934 ceph_cap_string(cap->mds_wanted),
1935 ceph_cap_string(want));
1936 delayed++;
1937 continue;
1938 }
1939
1940ack:
e9964c10
SW
1941 if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
1942 dout(" skipping %p I_NOFLUSH set\n", inode);
1943 continue;
1944 }
1945
a8599bd8
SW
1946 if (session && session != cap->session) {
1947 dout("oops, wrong session %p mutex\n", session);
1948 mutex_unlock(&session->s_mutex);
1949 session = NULL;
1950 }
1951 if (!session) {
1952 session = cap->session;
1953 if (mutex_trylock(&session->s_mutex) == 0) {
1954 dout("inverting session/ino locks on %p\n",
1955 session);
be655596 1956 spin_unlock(&ci->i_ceph_lock);
a8599bd8
SW
1957 if (took_snap_rwsem) {
1958 up_read(&mdsc->snap_rwsem);
1959 took_snap_rwsem = 0;
1960 }
1961 mutex_lock(&session->s_mutex);
1962 goto retry;
1963 }
1964 }
7bc00fdd
YZ
1965
1966 /* kick flushing and flush snaps before sending normal
1967 * cap message */
1968 if (cap == ci->i_auth_cap &&
1969 (ci->i_ceph_flags &
1970 (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
1971 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
24d063ac 1972 __kick_flushing_caps(mdsc, session, ci, 0);
7bc00fdd
YZ
1973 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
1974 }
ed9b430c
YZ
1975 if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
1976 __ceph_flush_snaps(ci, session);
1977
7bc00fdd
YZ
1978 goto retry_locked;
1979 }
1980
a8599bd8
SW
1981 /* take snap_rwsem after session mutex */
1982 if (!took_snap_rwsem) {
1983 if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
1984 dout("inverting snap/in locks on %p\n",
1985 inode);
be655596 1986 spin_unlock(&ci->i_ceph_lock);
a8599bd8
SW
1987 down_read(&mdsc->snap_rwsem);
1988 took_snap_rwsem = 1;
1989 goto retry;
1990 }
1991 took_snap_rwsem = 1;
1992 }
1993
553adfd9 1994 if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
c8799fc4 1995 flushing = __mark_caps_flushing(inode, session, false,
a2971c8c
YZ
1996 &flush_tid,
1997 &oldest_flush_tid);
553adfd9 1998 } else {
24be0c48 1999 flushing = 0;
553adfd9 2000 flush_tid = 0;
a2971c8c
YZ
2001 spin_lock(&mdsc->cap_dirty_lock);
2002 oldest_flush_tid = __get_oldest_flush_tid(mdsc);
2003 spin_unlock(&mdsc->cap_dirty_lock);
553adfd9 2004 }
a8599bd8
SW
2005
2006 mds = cap->mds; /* remember mds, so we don't repeat */
2007 sent++;
2008
be655596 2009 /* __send_cap drops i_ceph_lock */
1e4ef0c6
JL
2010 delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false,
2011 cap_used, want, retain, flushing,
2012 flush_tid, oldest_flush_tid);
be655596 2013 goto retry; /* retake i_ceph_lock and restart our cap scan. */
a8599bd8
SW
2014 }
2015
0f439c74
YZ
2016 /* Reschedule delayed caps release if we delayed anything */
2017 if (delayed)
a8599bd8
SW
2018 __cap_delay_requeue(mdsc, ci);
2019
be655596 2020 spin_unlock(&ci->i_ceph_lock);
a8599bd8 2021
cbd03635 2022 if (queue_invalidate)
3c6f6b79 2023 ceph_queue_invalidate(inode);
cbd03635 2024
cdc2ce05 2025 if (session)
a8599bd8
SW
2026 mutex_unlock(&session->s_mutex);
2027 if (took_snap_rwsem)
2028 up_read(&mdsc->snap_rwsem);
2029}
2030
a8599bd8
SW
/*
 * Try to flush dirty caps back to the auth MDS.
 *
 * Returns the set of caps that were marked flushing (0 if nothing was
 * dirty or the flush was skipped), and stores the flush tid of the
 * last queued flush in *ptid so the caller can wait on it with
 * caps_are_flushed().
 */
static int try_flush_caps(struct inode *inode, u64 *ptid)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_session *session = NULL;
	int flushing = 0;
	u64 flush_tid = 0, oldest_flush_tid = 0;

retry:
	spin_lock(&ci->i_ceph_lock);
	if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
		/* flushing administratively disabled for this inode */
		spin_unlock(&ci->i_ceph_lock);
		dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
		goto out;
	}
	if (ci->i_dirty_caps && ci->i_auth_cap) {
		struct ceph_cap *cap = ci->i_auth_cap;
		int used = __ceph_caps_used(ci);
		int want = __ceph_caps_wanted(ci);
		int delayed;

		/*
		 * We must hold the auth cap's session mutex before sending.
		 * Lock ordering is s_mutex -> i_ceph_lock, so drop the
		 * spinlock, take the mutex, and retry from the top.
		 */
		if (!session || session != cap->session) {
			spin_unlock(&ci->i_ceph_lock);
			if (session)
				mutex_unlock(&session->s_mutex);
			session = cap->session;
			mutex_lock(&session->s_mutex);
			goto retry;
		}
		/* session not usable yet (e.g. still opening/reconnecting) */
		if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) {
			spin_unlock(&ci->i_ceph_lock);
			goto out;
		}

		flushing = __mark_caps_flushing(inode, session, true,
						&flush_tid, &oldest_flush_tid);

		/* __send_cap drops i_ceph_lock */
		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true,
				     used, want, (cap->issued | cap->implemented),
				     flushing, flush_tid, oldest_flush_tid);

		if (delayed) {
			/* send was deferred; requeue for delayed processing */
			spin_lock(&ci->i_ceph_lock);
			__cap_delay_requeue(mdsc, ci);
			spin_unlock(&ci->i_ceph_lock);
		}
	} else {
		/*
		 * Nothing newly dirty; if flushes are already in flight,
		 * report the last one so the caller can wait for it, and
		 * ask to be woken when it completes.
		 */
		if (!list_empty(&ci->i_cap_flush_list)) {
			struct ceph_cap_flush *cf =
				list_last_entry(&ci->i_cap_flush_list,
						struct ceph_cap_flush, i_list);
			cf->wake = true;
			flush_tid = cf->tid;
		}
		flushing = ci->i_flushing_caps;
		spin_unlock(&ci->i_ceph_lock);
	}
out:
	if (session)
		mutex_unlock(&session->s_mutex);

	*ptid = flush_tid;
	return flushing;
}
2099
2100/*
2101 * Return true if we've flushed caps through the given flush_tid.
2102 */
553adfd9 2103static int caps_are_flushed(struct inode *inode, u64 flush_tid)
a8599bd8
SW
2104{
2105 struct ceph_inode_info *ci = ceph_inode(inode);
553adfd9 2106 int ret = 1;
a8599bd8 2107
be655596 2108 spin_lock(&ci->i_ceph_lock);
e4500b5e
YZ
2109 if (!list_empty(&ci->i_cap_flush_list)) {
2110 struct ceph_cap_flush * cf =
2111 list_first_entry(&ci->i_cap_flush_list,
2112 struct ceph_cap_flush, i_list);
553adfd9 2113 if (cf->tid <= flush_tid)
a8599bd8 2114 ret = 0;
89b52fe1 2115 }
be655596 2116 spin_unlock(&ci->i_ceph_lock);
a8599bd8
SW
2117 return ret;
2118}
2119
da819c81 2120/*
68cd5b4b 2121 * wait for any unsafe requests to complete.
da819c81 2122 */
68cd5b4b 2123static int unsafe_request_wait(struct inode *inode)
da819c81
YZ
2124{
2125 struct ceph_inode_info *ci = ceph_inode(inode);
68cd5b4b
YZ
2126 struct ceph_mds_request *req1 = NULL, *req2 = NULL;
2127 int ret, err = 0;
da819c81
YZ
2128
2129 spin_lock(&ci->i_unsafe_lock);
68cd5b4b
YZ
2130 if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) {
2131 req1 = list_last_entry(&ci->i_unsafe_dirops,
2132 struct ceph_mds_request,
2133 r_unsafe_dir_item);
2134 ceph_mdsc_get_request(req1);
2135 }
2136 if (!list_empty(&ci->i_unsafe_iops)) {
2137 req2 = list_last_entry(&ci->i_unsafe_iops,
2138 struct ceph_mds_request,
2139 r_unsafe_target_item);
2140 ceph_mdsc_get_request(req2);
2141 }
2142 spin_unlock(&ci->i_unsafe_lock);
da819c81 2143
4945a084 2144 dout("unsafe_request_wait %p wait on tid %llu %llu\n",
68cd5b4b
YZ
2145 inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
2146 if (req1) {
2147 ret = !wait_for_completion_timeout(&req1->r_safe_completion,
2148 ceph_timeout_jiffies(req1->r_timeout));
da819c81 2149 if (ret)
68cd5b4b
YZ
2150 err = -EIO;
2151 ceph_mdsc_put_request(req1);
2152 }
2153 if (req2) {
2154 ret = !wait_for_completion_timeout(&req2->r_safe_completion,
2155 ceph_timeout_jiffies(req2->r_timeout));
2156 if (ret)
2157 err = -EIO;
2158 ceph_mdsc_put_request(req2);
2159 }
2160 return err;
da819c81
YZ
2161}
2162
/*
 * fsync/fdatasync entry point: flush dirty data, unsafe requests,
 * and (for full fsync) dirty cap metadata back to the MDS.
 */
int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
{
	struct inode *inode = file->f_mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 flush_tid;
	int ret;
	int dirty;

	dout("fsync %p%s\n", inode, datasync ? " datasync" : "");

	/* write back and wait on file data first */
	ret = file_write_and_wait_range(file, start, end);
	if (ret < 0)
		goto out;

	/* datasync skips the metadata (caps) flush below */
	if (datasync)
		goto out;

	inode_lock(inode);

	dirty = try_flush_caps(inode, &flush_tid);
	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));

	/* make sure any unsafe (not-yet-committed) MDS ops are durable */
	ret = unsafe_request_wait(inode);

	/*
	 * only wait on non-file metadata writeback (the mds
	 * can recover size and mtime, so we don't need to
	 * wait for that)
	 */
	if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
		ret = wait_event_interruptible(ci->i_cap_wq,
					caps_are_flushed(inode, flush_tid));
	}
	inode_unlock(inode);
out:
	dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret);
	return ret;
}
2201
/*
 * Flush any dirty caps back to the mds. If we aren't asked to wait,
 * queue inode for flush but don't do so immediately, because we can
 * get by with fewer MDS messages if we wait for data writeback to
 * complete first.
 */
int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 flush_tid;
	int err = 0;
	int dirty;
	/* only synchronous, non-sync_fs writeback waits here */
	int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync);

	dout("write_inode %p wait=%d\n", inode, wait);
	if (wait) {
		/* flush now and block until the MDS acks the flush */
		dirty = try_flush_caps(inode, &flush_tid);
		if (dirty)
			err = wait_event_interruptible(ci->i_cap_wq,
				       caps_are_flushed(inode, flush_tid));
	} else {
		struct ceph_mds_client *mdsc =
			ceph_sb_to_client(inode->i_sb)->mdsc;

		/* defer: push the inode to the front of the delay queue */
		spin_lock(&ci->i_ceph_lock);
		if (__ceph_caps_dirty(ci))
			__cap_delay_requeue_front(mdsc, ci);
		spin_unlock(&ci->i_ceph_lock);
	}
	return err;
}
2233
0e294387
YZ
/*
 * Re-send all pending cap flushes (and snap flushes) for @ci to its
 * auth MDS session, e.g. after an MDS restart or session reconnect.
 *
 * Caller holds ci->i_ceph_lock.  The lock is dropped while each message
 * is sent (by __send_cap, or explicitly around __send_flush_snap) and
 * reacquired at the bottom of the loop — hence the sparse annotations.
 */
static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session,
				 struct ceph_inode_info *ci,
				 u64 oldest_flush_tid)
	__releases(ci->i_ceph_lock)
	__acquires(ci->i_ceph_lock)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_cap *cap;
	struct ceph_cap_flush *cf;
	int ret;
	u64 first_tid = 0;

	list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
		/*
		 * skip entries already handled; because we drop the lock
		 * inside the loop, the list may be re-walked and first_tid
		 * tracks our resume point.
		 */
		if (cf->tid < first_tid)
			continue;

		cap = ci->i_auth_cap;
		if (!(cap && cap->session == session)) {
			pr_err("%p auth cap %p not mds%d ???\n",
			       inode, cap, session->s_mds);
			break;
		}

		first_tid = cf->tid + 1;

		if (cf->caps) {
			/* ordinary cap flush */
			dout("kick_flushing_caps %p cap %p tid %llu %s\n",
			     inode, cap, cf->tid, ceph_cap_string(cf->caps));
			ci->i_ceph_flags |= CEPH_I_NODELAY;
			ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
					  false, __ceph_caps_used(ci),
					  __ceph_caps_wanted(ci),
					  cap->issued | cap->implemented,
					  cf->caps, cf->tid, oldest_flush_tid);
			if (ret) {
				pr_err("kick_flushing_caps: error sending "
					"cap flush, ino (%llx.%llx) "
					"tid %llu flushing %s\n",
					ceph_vinop(inode), cf->tid,
					ceph_cap_string(cf->caps));
			}
		} else {
			/* cf->caps == 0 marks a snap flush embedded in a cap_snap */
			struct ceph_cap_snap *capsnap =
					container_of(cf, struct ceph_cap_snap,
						    cap_flush);
			dout("kick_flushing_caps %p capsnap %p tid %llu %s\n",
			     inode, capsnap, cf->tid,
			     ceph_cap_string(capsnap->dirty));

			/* hold a ref across the unlocked send */
			refcount_inc(&capsnap->nref);
			spin_unlock(&ci->i_ceph_lock);

			ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
						oldest_flush_tid);
			if (ret < 0) {
				pr_err("kick_flushing_caps: error sending "
					"cap flushsnap, ino (%llx.%llx) "
					"tid %llu follows %llu\n",
					ceph_vinop(inode), cf->tid,
					capsnap->follows);
			}

			ceph_put_cap_snap(capsnap);
		}

		/* both branches return with the lock dropped; retake it */
		spin_lock(&ci->i_ceph_lock);
	}
}
2303
e548e9b9
YZ
/*
 * Early (pre-reconnect) pass over a session's flushing inodes.
 *
 * For inodes whose flushing caps were revoked, re-send the flush now so
 * the MDS processes it before issuing those caps to another client; the
 * rest are marked CEPH_I_KICK_FLUSH to be kicked later by
 * ceph_kick_flushing_caps().
 */
void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	u64 oldest_flush_tid;

	dout("early_kick_flushing_caps mds%d\n", session->s_mds);

	spin_lock(&mdsc->cap_dirty_lock);
	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
	spin_unlock(&mdsc->cap_dirty_lock);

	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
		spin_lock(&ci->i_ceph_lock);
		cap = ci->i_auth_cap;
		if (!(cap && cap->session == session)) {
			/* auth cap moved elsewhere; should not happen */
			pr_err("%p auth cap %p not mds%d ???\n",
				&ci->vfs_inode, cap, session->s_mds);
			spin_unlock(&ci->i_ceph_lock);
			continue;
		}


		/*
		 * if flushing caps were revoked, we re-send the cap flush
		 * in client reconnect stage. This guarantees MDS * processes
		 * the cap flush message before issuing the flushing caps to
		 * other client.
		 */
		if ((cap->issued & ci->i_flushing_caps) !=
		    ci->i_flushing_caps) {
			ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
			__kick_flushing_caps(mdsc, session, ci,
					     oldest_flush_tid);
		} else {
			/* defer to ceph_kick_flushing_caps() */
			ci->i_ceph_flags |= CEPH_I_KICK_FLUSH;
		}

		spin_unlock(&ci->i_ceph_lock);
	}
}
2346
a8599bd8
SW
/*
 * Re-send pending cap flushes for every inode on this session that was
 * flagged CEPH_I_KICK_FLUSH (see ceph_early_kick_flushing_caps()).
 */
void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session)
{
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	u64 oldest_flush_tid;

	dout("kick_flushing_caps mds%d\n", session->s_mds);

	spin_lock(&mdsc->cap_dirty_lock);
	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
	spin_unlock(&mdsc->cap_dirty_lock);

	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
		spin_lock(&ci->i_ceph_lock);
		cap = ci->i_auth_cap;
		if (!(cap && cap->session == session)) {
			/* auth cap moved elsewhere; should not happen */
			pr_err("%p auth cap %p not mds%d ???\n",
				&ci->vfs_inode, cap, session->s_mds);
			spin_unlock(&ci->i_ceph_lock);
			continue;
		}
		if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
			ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
			__kick_flushing_caps(mdsc, session, ci,
					     oldest_flush_tid);
		}
		spin_unlock(&ci->i_ceph_lock);
	}
}
2377
088b3f5e
SW
/*
 * Kick pending cap flushes for a single inode, moving it onto its auth
 * cap session's flushing list first.
 *
 * Called with ci->i_ceph_lock held; releases it on all paths (note the
 * __releases annotation).
 */
static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session,
				     struct inode *inode)
	__releases(ci->i_ceph_lock)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap;

	cap = ci->i_auth_cap;
	dout("kick_flushing_inode_caps %p flushing %s\n", inode,
	     ceph_cap_string(ci->i_flushing_caps));

	if (!list_empty(&ci->i_cap_flush_list)) {
		u64 oldest_flush_tid;
		/* re-home the inode on the (possibly new) auth session */
		spin_lock(&mdsc->cap_dirty_lock);
		list_move_tail(&ci->i_flushing_item,
			       &cap->session->s_cap_flushing);
		oldest_flush_tid = __get_oldest_flush_tid(mdsc);
		spin_unlock(&mdsc->cap_dirty_lock);

		ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
		__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
		spin_unlock(&ci->i_ceph_lock);
	} else {
		spin_unlock(&ci->i_ceph_lock);
	}
}
2405
a8599bd8
SW
2406
/*
 * Take references to capabilities we hold, so that we don't release
 * them to the MDS prematurely.
 *
 * @got: mask of caps to take a reference on
 * @snap_rwsem_locked: caller asserts snap_rwsem is held; required when
 *                     a first FILE_WR ref pins the head snap context.
 *
 * Protected by i_ceph_lock.
 */
static void __take_cap_refs(struct ceph_inode_info *ci, int got,
			    bool snap_rwsem_locked)
{
	if (got & CEPH_CAP_PIN)
		ci->i_pin_ref++;
	if (got & CEPH_CAP_FILE_RD)
		ci->i_rd_ref++;
	if (got & CEPH_CAP_FILE_CACHE)
		ci->i_rdcache_ref++;
	if (got & CEPH_CAP_FILE_WR) {
		/* first writer pins the current snap context */
		if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
			BUG_ON(!snap_rwsem_locked);
			ci->i_head_snapc = ceph_get_snap_context(
				ci->i_snap_realm->cached_context);
		}
		ci->i_wr_ref++;
	}
	if (got & CEPH_CAP_FILE_BUFFER) {
		/* first buffered-write ref pins the inode itself */
		if (ci->i_wb_ref == 0)
			ihold(&ci->vfs_inode);
		ci->i_wb_ref++;
		dout("__take_cap_refs %p wb %d -> %d (?)\n",
		     &ci->vfs_inode, ci->i_wb_ref-1, ci->i_wb_ref);
	}
}
2438
/*
 * Try to grab cap references. Specify those refs we @want, and the
 * minimal set we @need. Also include the larger offset we are writing
 * to (when applicable), and check against max_size here as well.
 * Note that caller is responsible for ensuring max_size increases are
 * requested from the MDS.
 *
 * Returns 1 when a decision was reached (refs taken, or *err set to a
 * negative errno: -EBADF, -EAGAIN, -EROFS, -EIO, -ESTALE), 0 when the
 * caller should wait and retry.  On success *got holds the caps taken.
 * @nonblock: if true, never sleep on snap_rwsem (fail with -EAGAIN).
 */
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
			    loff_t endoff, bool nonblock, int *got, int *err)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	int ret = 0;
	int have, implemented;
	int file_wanted;
	bool snap_rwsem_locked = false;

	dout("get_cap_refs %p need %s want %s\n", inode,
	     ceph_cap_string(need), ceph_cap_string(want));

again:
	spin_lock(&ci->i_ceph_lock);

	/* make sure file is actually open */
	file_wanted = __ceph_caps_file_wanted(ci);
	if ((file_wanted & need) != need) {
		dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
		     ceph_cap_string(need), ceph_cap_string(file_wanted));
		*err = -EBADF;
		ret = 1;
		goto out_unlock;
	}

	/* finish pending truncate */
	while (ci->i_truncate_pending) {
		spin_unlock(&ci->i_ceph_lock);
		/* drop snap_rwsem before potentially sleeping in vmtruncate */
		if (snap_rwsem_locked) {
			up_read(&mdsc->snap_rwsem);
			snap_rwsem_locked = false;
		}
		__ceph_do_pending_vmtruncate(inode);
		spin_lock(&ci->i_ceph_lock);
	}

	have = __ceph_caps_issued(ci, &implemented);

	if (have & need & CEPH_CAP_FILE_WR) {
		/* writing past max_size needs an MDS grant first */
		if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
			dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
			     inode, endoff, ci->i_max_size);
			if (endoff > ci->i_requested_max_size) {
				*err = -EAGAIN;
				ret = 1;
			}
			goto out_unlock;
		}
		/*
		 * If a sync write is in progress, we must wait, so that we
		 * can get a final snapshot value for size+mtime.
		 */
		if (__ceph_have_pending_cap_snap(ci)) {
			dout("get_cap_refs %p cap_snap_pending\n", inode);
			goto out_unlock;
		}
	}

	if ((have & need) == need) {
		/*
		 * Look at (implemented & ~have & not) so that we keep waiting
		 * on transition from wanted -> needed caps. This is needed
		 * for WRBUFFER|WR -> WR to avoid a new WR sync write from
		 * going before a prior buffered writeback happens.
		 */
		int not = want & ~(have & need);
		int revoking = implemented & ~have;
		dout("get_cap_refs %p have %s but not %s (revoking %s)\n",
		     inode, ceph_cap_string(have), ceph_cap_string(not),
		     ceph_cap_string(revoking));
		if ((revoking & not) == 0) {
			/* first FILE_WR ref must hold snap_rwsem */
			if (!snap_rwsem_locked &&
			    !ci->i_head_snapc &&
			    (need & CEPH_CAP_FILE_WR)) {
				if (!down_read_trylock(&mdsc->snap_rwsem)) {
					/*
					 * we can not call down_read() when
					 * task isn't in TASK_RUNNING state
					 */
					if (nonblock) {
						*err = -EAGAIN;
						ret = 1;
						goto out_unlock;
					}

					spin_unlock(&ci->i_ceph_lock);
					down_read(&mdsc->snap_rwsem);
					snap_rwsem_locked = true;
					goto again;
				}
				snap_rwsem_locked = true;
			}
			*got = need | (have & want);
			if ((need & CEPH_CAP_FILE_RD) &&
			    !(*got & CEPH_CAP_FILE_CACHE))
				ceph_disable_fscache_readpage(ci);
			__take_cap_refs(ci, *got, true);
			ret = 1;
		}
	} else {
		int session_readonly = false;
		if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
			struct ceph_mds_session *s = ci->i_auth_cap->session;
			spin_lock(&s->s_cap_lock);
			session_readonly = s->s_readonly;
			spin_unlock(&s->s_cap_lock);
		}
		if (session_readonly) {
			dout("get_cap_refs %p needed %s but mds%d readonly\n",
			     inode, ceph_cap_string(need), ci->i_auth_cap->mds);
			*err = -EROFS;
			ret = 1;
			goto out_unlock;
		}

		if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) {
			int mds_wanted;
			if (READ_ONCE(mdsc->fsc->mount_state) ==
			    CEPH_MOUNT_SHUTDOWN) {
				dout("get_cap_refs %p forced umount\n", inode);
				*err = -EIO;
				ret = 1;
				goto out_unlock;
			}
			mds_wanted = __ceph_caps_mds_wanted(ci, false);
			if (need & ~(mds_wanted & need)) {
				dout("get_cap_refs %p caps were dropped"
				     " (session killed?)\n", inode);
				*err = -ESTALE;
				ret = 1;
				goto out_unlock;
			}
			if (!(file_wanted & ~mds_wanted))
				ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;
		}

		dout("get_cap_refs %p have %s needed %s\n", inode,
		     ceph_cap_string(have), ceph_cap_string(need));
	}
out_unlock:
	spin_unlock(&ci->i_ceph_lock);
	if (snap_rwsem_locked)
		up_read(&mdsc->snap_rwsem);

	dout("get_cap_refs %p ret %d got %s\n", inode,
	     ret, ceph_cap_string(*got));
	return ret;
}
2595
2596/*
2597 * Check the offset we are writing up to against our current
2598 * max_size. If necessary, tell the MDS we want to write to
2599 * a larger offset.
2600 */
2601static void check_max_size(struct inode *inode, loff_t endoff)
2602{
2603 struct ceph_inode_info *ci = ceph_inode(inode);
2604 int check = 0;
2605
2606 /* do we need to explicitly request a larger max_size? */
be655596 2607 spin_lock(&ci->i_ceph_lock);
3871cbb9 2608 if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) {
a8599bd8
SW
2609 dout("write %p at large endoff %llu, req max_size\n",
2610 inode, endoff);
2611 ci->i_wanted_max_size = endoff;
a8599bd8 2612 }
3871cbb9
YZ
2613 /* duplicate ceph_check_caps()'s logic */
2614 if (ci->i_auth_cap &&
2615 (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) &&
2616 ci->i_wanted_max_size > ci->i_max_size &&
2617 ci->i_wanted_max_size > ci->i_requested_max_size)
2618 check = 1;
be655596 2619 spin_unlock(&ci->i_ceph_lock);
a8599bd8
SW
2620 if (check)
2621 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
2622}
2623
2b1ac852
YZ
2624int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
2625{
2626 int ret, err = 0;
2627
2628 BUG_ON(need & ~CEPH_CAP_FILE_RD);
2629 BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
2630 ret = ceph_pool_perm_check(ci, need);
2631 if (ret < 0)
2632 return ret;
2633
2634 ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
2635 if (ret) {
2636 if (err == -EAGAIN) {
2637 ret = 0;
2638 } else if (err < 0) {
2639 ret = err;
2640 }
2641 }
2642 return ret;
2643}
2644
a8599bd8
SW
/*
 * Wait for caps, and take cap references. If we can't get a WR cap
 * due to a small max_size, make sure we check_max_size (and possibly
 * ask the mds) so we don't get hung up indefinitely.
 *
 * On success returns 0 with *got set; may set *pinned_page to a
 * referenced page 0 when serving inline data.  Returns a negative
 * errno on failure (e.g. -ERESTARTSYS on signal).
 */
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
		  loff_t endoff, int *got, struct page **pinned_page)
{
	int _got, ret, err = 0;

	ret = ceph_pool_perm_check(ci, need);
	if (ret < 0)
		return ret;

	while (true) {
		if (endoff > 0)
			check_max_size(&ci->vfs_inode, endoff);

		err = 0;
		_got = 0;
		/* first, a blocking-allowed attempt */
		ret = try_get_cap_refs(ci, need, want, endoff,
				       false, &_got, &err);
		if (ret) {
			if (err == -EAGAIN)
				continue;
			if (err < 0)
				ret = err;
		} else {
			/* caps unavailable: sleep on i_cap_wq and re-poll */
			DEFINE_WAIT_FUNC(wait, woken_wake_function);
			add_wait_queue(&ci->i_cap_wq, &wait);

			/* nonblock=true here: we must stay TASK_RUNNING-safe */
			while (!try_get_cap_refs(ci, need, want, endoff,
						 true, &_got, &err)) {
				if (signal_pending(current)) {
					ret = -ERESTARTSYS;
					break;
				}
				wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
			}

			remove_wait_queue(&ci->i_cap_wq, &wait);

			if (err == -EAGAIN)
				continue;
			if (err < 0)
				ret = err;
		}
		if (ret < 0) {
			if (err == -ESTALE) {
				/* session was killed, try renew caps */
				ret = ceph_renew_caps(&ci->vfs_inode);
				if (ret == 0)
					continue;
			}
			return ret;
		}

		/* inline data: hand back page 0 if it is already uptodate */
		if (ci->i_inline_version != CEPH_INLINE_NONE &&
		    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
		    i_size_read(&ci->vfs_inode) > 0) {
			struct page *page =
				find_get_page(ci->vfs_inode.i_mapping, 0);
			if (page) {
				if (PageUptodate(page)) {
					*pinned_page = page;
					break;
				}
				put_page(page);
			}
			/*
			 * drop cap refs first because getattr while
			 * holding * caps refs can cause deadlock.
			 */
			ceph_put_cap_refs(ci, _got);
			_got = 0;

			/*
			 * getattr request will bring inline data into
			 * page cache
			 */
			ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
						CEPH_STAT_CAP_INLINE_DATA,
						true);
			if (ret < 0)
				return ret;
			continue;
		}
		break;
	}

	if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
		ceph_fscache_revalidate_cookie(ci);

	*got = _got;
	return 0;
}
2741
/*
 * Take cap refs. Caller must already know we hold at least one ref
 * on the caps in question or we don't know this is safe.
 *
 * Takes i_ceph_lock; passes snap_rwsem_locked=false since no new
 * FILE_WR snap-context pinning is expected on this path.
 */
void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
{
	spin_lock(&ci->i_ceph_lock);
	__take_cap_refs(ci, caps, false);
	spin_unlock(&ci->i_ceph_lock);
}
2752
86056090
YZ
2753
2754/*
2755 * drop cap_snap that is not associated with any snapshot.
2756 * we don't need to send FLUSHSNAP message for it.
2757 */
70220ac8
YZ
2758static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
2759 struct ceph_cap_snap *capsnap)
86056090
YZ
2760{
2761 if (!capsnap->need_flush &&
2762 !capsnap->writing && !capsnap->dirty_pages) {
86056090
YZ
2763 dout("dropping cap_snap %p follows %llu\n",
2764 capsnap, capsnap->follows);
0e294387 2765 BUG_ON(capsnap->cap_flush.tid > 0);
86056090 2766 ceph_put_snap_context(capsnap->context);
70220ac8
YZ
2767 if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps))
2768 ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
2769
86056090 2770 list_del(&capsnap->ci_item);
86056090
YZ
2771 ceph_put_cap_snap(capsnap);
2772 return 1;
2773 }
2774 return 0;
2775}
2776
a8599bd8
SW
/*
 * Release cap refs.
 *
 * If we released the last ref on any given cap, call ceph_check_caps
 * to release (or schedule a release).
 *
 * If we are releasing a WR cap (from a sync write), finalize any affected
 * cap_snap, and wake up any waiters.
 */
void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
{
	struct inode *inode = &ci->vfs_inode;
	/* put counts deferred iput()s; they run after the lock is dropped */
	int last = 0, put = 0, flushsnaps = 0, wake = 0;

	spin_lock(&ci->i_ceph_lock);
	if (had & CEPH_CAP_PIN)
		--ci->i_pin_ref;
	if (had & CEPH_CAP_FILE_RD)
		if (--ci->i_rd_ref == 0)
			last++;
	if (had & CEPH_CAP_FILE_CACHE)
		if (--ci->i_rdcache_ref == 0)
			last++;
	if (had & CEPH_CAP_FILE_BUFFER) {
		if (--ci->i_wb_ref == 0) {
			/* drop the inode ref taken by __take_cap_refs */
			last++;
			put++;
		}
		dout("put_cap_refs %p wb %d -> %d (?)\n",
		     inode, ci->i_wb_ref+1, ci->i_wb_ref);
	}
	if (had & CEPH_CAP_FILE_WR)
		if (--ci->i_wr_ref == 0) {
			last++;
			/* a pending cap_snap was waiting on this writer */
			if (__ceph_have_pending_cap_snap(ci)) {
				struct ceph_cap_snap *capsnap =
					list_last_entry(&ci->i_cap_snaps,
							struct ceph_cap_snap,
							ci_item);
				capsnap->writing = 0;
				if (ceph_try_drop_cap_snap(ci, capsnap))
					put++;
				else if (__ceph_finish_cap_snap(ci, capsnap))
					flushsnaps = 1;
				wake = 1;
			}
			/* last writer and nothing dirty: unpin head snapc */
			if (ci->i_wrbuffer_ref_head == 0 &&
			    ci->i_dirty_caps == 0 &&
			    ci->i_flushing_caps == 0) {
				BUG_ON(!ci->i_head_snapc);
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
			/* see comment in __ceph_remove_cap() */
			if (!__ceph_is_any_caps(ci) && ci->i_snap_realm)
				drop_inode_snap_realm(ci);
		}
	spin_unlock(&ci->i_ceph_lock);

	dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
	     last ? " last" : "", put ? " put" : "");

	if (last && !flushsnaps)
		ceph_check_caps(ci, 0, NULL);
	else if (flushsnaps)
		ceph_flush_snaps(ci, NULL);
	if (wake)
		wake_up_all(&ci->i_cap_wq);
	while (put-- > 0)
		iput(inode);
}
2848
/*
 * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
 * context. Adjust per-snap dirty page accounting as appropriate.
 * Once all dirty data for a cap_snap is flushed, flush snapped file
 * metadata back to the MDS. If we dropped the last ref, call
 * ceph_check_caps.
 */
void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
				struct ceph_snap_context *snapc)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_cap_snap *capsnap = NULL;
	int put = 0;
	bool last = false;
	bool found = false;
	bool flush_snaps = false;
	bool complete_capsnap = false;

	spin_lock(&ci->i_ceph_lock);
	ci->i_wrbuffer_ref -= nr;
	if (ci->i_wrbuffer_ref == 0) {
		/* last wrbuffer ref: drop the inode pin after unlocking */
		last = true;
		put++;
	}

	if (ci->i_head_snapc == snapc) {
		/* refs belong to the live (head) snap context */
		ci->i_wrbuffer_ref_head -= nr;
		if (ci->i_wrbuffer_ref_head == 0 &&
		    ci->i_wr_ref == 0 &&
		    ci->i_dirty_caps == 0 &&
		    ci->i_flushing_caps == 0) {
			BUG_ON(!ci->i_head_snapc);
			ceph_put_snap_context(ci->i_head_snapc);
			ci->i_head_snapc = NULL;
		}
		dout("put_wrbuffer_cap_refs on %p head %d/%d -> %d/%d %s\n",
		     inode,
		     ci->i_wrbuffer_ref+nr, ci->i_wrbuffer_ref_head+nr,
		     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
		     last ? " LAST" : "");
	} else {
		/* refs belong to a snapped context; find its cap_snap */
		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->context == snapc) {
				found = true;
				break;
			}
		}
		BUG_ON(!found);
		capsnap->dirty_pages -= nr;
		if (capsnap->dirty_pages == 0) {
			/* all snapped data written; flush or drop the cap_snap */
			complete_capsnap = true;
			if (!capsnap->writing) {
				if (ceph_try_drop_cap_snap(ci, capsnap)) {
					put++;
				} else {
					ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
					flush_snaps = true;
				}
			}
		}
		dout("put_wrbuffer_cap_refs on %p cap_snap %p "
		     " snap %lld %d/%d -> %d/%d %s%s\n",
		     inode, capsnap, capsnap->context->seq,
		     ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
		     ci->i_wrbuffer_ref, capsnap->dirty_pages,
		     last ? " (wrbuffer last)" : "",
		     complete_capsnap ? " (complete capsnap)" : "");
	}

	spin_unlock(&ci->i_ceph_lock);

	if (last) {
		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
	} else if (flush_snaps) {
		ceph_flush_snaps(ci, NULL);
	}
	if (complete_capsnap)
		wake_up_all(&ci->i_cap_wq);
	while (put-- > 0)
		iput(inode);
}
2930
ca20c991
YZ
2931/*
2932 * Invalidate unlinked inode's aliases, so we can drop the inode ASAP.
2933 */
2934static void invalidate_aliases(struct inode *inode)
2935{
2936 struct dentry *dn, *prev = NULL;
2937
2938 dout("invalidate_aliases inode %p\n", inode);
2939 d_prune_aliases(inode);
2940 /*
2941 * For non-directory inode, d_find_alias() only returns
fc12c80a
BF
2942 * hashed dentry. After calling d_invalidate(), the
2943 * dentry becomes unhashed.
ca20c991 2944 *
a8d436f0 2945 * For directory inode, d_find_alias() can return
fc12c80a 2946 * unhashed dentry. But directory inode should have
ca20c991
YZ
2947 * one alias at most.
2948 */
2949 while ((dn = d_find_alias(inode))) {
2950 if (dn == prev) {
2951 dput(dn);
2952 break;
2953 }
a8d436f0 2954 d_invalidate(dn);
ca20c991
YZ
2955 if (prev)
2956 dput(prev);
2957 prev = dn;
2958 }
2959 if (prev)
2960 dput(prev);
2961}
2962
a8599bd8
SW
/*
 * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
 * actually be a revocation if it specifies a smaller cap set.)
 *
 * caller holds s_mutex and i_ceph_lock, we drop both.  For the IMPORT
 * case the caller also holds mdsc->snap_rwsem for read, which is
 * dropped here as well.
 */
static void handle_cap_grant(struct ceph_mds_client *mdsc,
			     struct inode *inode, struct ceph_mds_caps *grant,
			     struct ceph_string **pns, u64 inline_version,
			     void *inline_data, u32 inline_len,
			     struct ceph_buffer *xattr_buf,
			     struct ceph_mds_session *session,
			     struct ceph_cap *cap, int issued)
	__releases(ci->i_ceph_lock)
	__releases(mdsc->snap_rwsem)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int mds = session->s_mds;
	int seq = le32_to_cpu(grant->seq);
	int newcaps = le32_to_cpu(grant->caps);
	int used, wanted, dirty;
	u64 size = le64_to_cpu(grant->size);
	u64 max_size = le64_to_cpu(grant->max_size);
	struct timespec mtime, atime, ctime;
	int check_caps = 0;
	/* deferred actions, performed after i_ceph_lock is dropped */
	bool wake = false;
	bool writeback = false;
	bool queue_trunc = false;
	bool queue_invalidate = false;
	bool deleted_inode = false;
	bool fill_inline = false;

	dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
	     inode, cap, mds, seq, ceph_cap_string(newcaps));
	dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
	     inode->i_size);

	/*
	 * auth mds of the inode changed. we received the cap export message,
	 * but still haven't received the cap import message. handle_cap_export
	 * updated the new auth MDS' cap.
	 *
	 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
	 * that was sent before the cap import message. So don't remove caps.
	 */
	if (ceph_seq_cmp(seq, cap->seq) <= 0) {
		WARN_ON(cap != ci->i_auth_cap);
		WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
		seq = cap->seq;
		newcaps |= cap->issued;
	}

	/*
	 * If CACHE is being revoked, and we have no dirty buffers,
	 * try to invalidate (once). (If there are dirty buffers, we
	 * will invalidate _after_ writeback.)
	 */
	if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
	    ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
	    !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
		if (try_nonblocking_invalidate(inode)) {
			/* there were locked pages.. invalidate later
			   in a separate thread. */
			if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
				queue_invalidate = true;
				ci->i_rdcache_revoking = ci->i_rdcache_gen;
			}
		}
	}

	/* side effects now are allowed */
	cap->cap_gen = session->s_cap_gen;
	cap->seq = seq;

	__check_cap_issue(ci, cap, newcaps);

	/* update mode/uid/gid unless we hold AUTH_EXCL (our copy is newer) */
	if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
	    (issued & CEPH_CAP_AUTH_EXCL) == 0) {
		inode->i_mode = le32_to_cpu(grant->mode);
		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
		dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
		     from_kuid(&init_user_ns, inode->i_uid),
		     from_kgid(&init_user_ns, inode->i_gid));
	}

	if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
	    (issued & CEPH_CAP_LINK_EXCL) == 0) {
		set_nlink(inode, le32_to_cpu(grant->nlink));
		if (inode->i_nlink == 0 &&
		    (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
			deleted_inode = true;
	}

	/* accept newer xattr blob unless we hold XATTR_EXCL */
	if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
		int len = le32_to_cpu(grant->xattr_len);
		u64 version = le64_to_cpu(grant->xattr_version);

		if (version > ci->i_xattrs.version) {
			dout(" got new xattrs v%llu on %p len %d\n",
			     version, inode, len);
			if (ci->i_xattrs.blob)
				ceph_buffer_put(ci->i_xattrs.blob);
			ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
			ci->i_xattrs.version = version;
			ceph_forget_all_cached_acls(inode);
		}
	}

	if (newcaps & CEPH_CAP_ANY_RD) {
		/* ctime/mtime/atime? */
		ceph_decode_timespec(&mtime, &grant->mtime);
		ceph_decode_timespec(&atime, &grant->atime);
		ceph_decode_timespec(&ctime, &grant->ctime);
		ceph_fill_file_time(inode, issued,
				    le32_to_cpu(grant->time_warp_seq),
				    &ctime, &mtime, &atime);
	}

	if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
		/* file layout may have changed */
		s64 old_pool = ci->i_layout.pool_id;
		struct ceph_string *old_ns;

		ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
		old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
					lockdep_is_held(&ci->i_ceph_lock));
		rcu_assign_pointer(ci->i_layout.pool_ns, *pns);

		/* pool or namespace changed: cached write-permission
		 * check result is stale */
		if (ci->i_layout.pool_id != old_pool || *pns != old_ns)
			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;

		/* hand the old namespace ref back to the caller to put */
		*pns = old_ns;

		/* size/truncate_seq? */
		queue_trunc = ceph_fill_file_size(inode, issued,
					le32_to_cpu(grant->truncate_seq),
					le64_to_cpu(grant->truncate_size),
					size);
	}

	if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) {
		if (max_size != ci->i_max_size) {
			dout("max_size %lld -> %llu\n",
			     ci->i_max_size, max_size);
			ci->i_max_size = max_size;
			if (max_size >= ci->i_wanted_max_size) {
				ci->i_wanted_max_size = 0;  /* reset */
				ci->i_requested_max_size = 0;
			}
			wake = true;
		} else if (ci->i_wanted_max_size > ci->i_max_size &&
			   ci->i_wanted_max_size > ci->i_requested_max_size) {
			/* CEPH_CAP_OP_IMPORT */
			wake = true;
		}
	}

	/* check cap bits */
	wanted = __ceph_caps_wanted(ci);
	used = __ceph_caps_used(ci);
	dirty = __ceph_caps_dirty(ci);
	dout(" my wanted = %s, used = %s, dirty %s\n",
	     ceph_cap_string(wanted),
	     ceph_cap_string(used),
	     ceph_cap_string(dirty));
	if (wanted != le32_to_cpu(grant->wanted)) {
		dout("mds wanted %s -> %s\n",
		     ceph_cap_string(le32_to_cpu(grant->wanted)),
		     ceph_cap_string(wanted));
		/* imported cap may not have correct mds_wanted */
		if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT)
			check_caps = 1;
	}

	/* revocation, grant, or no-op? */
	if (cap->issued & ~newcaps) {
		int revoking = cap->issued & ~newcaps;

		dout("revocation: %s -> %s (revoking %s)\n",
		     ceph_cap_string(cap->issued),
		     ceph_cap_string(newcaps),
		     ceph_cap_string(revoking));
		if (revoking & used & CEPH_CAP_FILE_BUFFER)
			writeback = true;  /* initiate writeback; will delay ack */
		else if (revoking == CEPH_CAP_FILE_CACHE &&
			 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
			 queue_invalidate)
			;  /* do nothing yet, invalidation will be queued */
		else if (cap == ci->i_auth_cap)
			check_caps = 1;  /* check auth cap only */
		else
			check_caps = 2;  /* check all caps */
		cap->issued = newcaps;
		cap->implemented |= newcaps;
	} else if (cap->issued == newcaps) {
		dout("caps unchanged: %s -> %s\n",
		     ceph_cap_string(cap->issued), ceph_cap_string(newcaps));
	} else {
		dout("grant: %s -> %s\n", ceph_cap_string(cap->issued),
		     ceph_cap_string(newcaps));
		/* non-auth MDS is revoking the newly grant caps ? */
		if (cap == ci->i_auth_cap &&
		    __ceph_caps_revoking_other(ci, cap, newcaps))
			check_caps = 2;

		cap->issued = newcaps;
		cap->implemented |= newcaps; /* add bits only, to
					      * avoid stepping on a
					      * pending revocation */
		wake = true;
	}
	BUG_ON(cap->issued & ~cap->implemented);

	if (inline_version > 0 && inline_version >= ci->i_inline_version) {
		ci->i_inline_version = inline_version;
		if (ci->i_inline_version != CEPH_INLINE_NONE &&
		    (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
			fill_inline = true;
	}

	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
		if (newcaps & ~issued)
			wake = true;
		/* import path: caller holds snap_rwsem for read; the
		 * kick happens with i_ceph_lock still held */
		kick_flushing_inode_caps(mdsc, session, inode);
		up_read(&mdsc->snap_rwsem);
	} else {
		spin_unlock(&ci->i_ceph_lock);
	}

	/* deferred work, now that i_ceph_lock is dropped */
	if (fill_inline)
		ceph_fill_inline_data(inode, NULL, inline_data, inline_len);

	if (queue_trunc)
		ceph_queue_vmtruncate(inode);

	if (writeback)
		/*
		 * queue inode for writeback: we can't actually call
		 * filemap_write_and_wait, etc. from message handler
		 * context.
		 */
		ceph_queue_writeback(inode);
	if (queue_invalidate)
		ceph_queue_invalidate(inode);
	if (deleted_inode)
		invalidate_aliases(inode);
	if (wake)
		wake_up_all(&ci->i_cap_wq);

	if (check_caps == 1)
		ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
				session);
	else if (check_caps == 2)
		ceph_check_caps(ci, CHECK_CAPS_NODELAY, session);
	else
		mutex_unlock(&session->s_mutex);
}
3223
/*
 * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
 * MDS has been safely committed.
 */
static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
				 struct ceph_mds_caps *m,
				 struct ceph_mds_session *session,
				 struct ceph_cap *cap)
	__releases(ci->i_ceph_lock)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_cap_flush *cf, *tmp_cf;
	/* completed flush records, freed after dropping i_ceph_lock */
	LIST_HEAD(to_remove);
	unsigned seq = le32_to_cpu(m->seq);
	int dirty = le32_to_cpu(m->dirty);
	int cleaned = 0;
	bool drop = false;
	bool wake_ci = false;
	bool wake_mdsc = false;

	/* collect all flush records with tid <= flush_tid; bits still
	 * pending in later records are masked back out of 'cleaned' */
	list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
		if (cf->tid == flush_tid)
			cleaned = cf->caps;
		if (cf->caps == 0) /* capsnap */
			continue;
		if (cf->tid <= flush_tid) {
			if (__finish_cap_flush(NULL, ci, cf))
				wake_ci = true;
			list_add_tail(&cf->i_list, &to_remove);
		} else {
			cleaned &= ~cf->caps;
			if (!cleaned)
				break;
		}
	}

	dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s,"
	     " flushing %s -> %s\n",
	     inode, session->s_mds, seq, ceph_cap_string(dirty),
	     ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps),
	     ceph_cap_string(ci->i_flushing_caps & ~cleaned));

	if (list_empty(&to_remove) && !cleaned)
		goto out;

	ci->i_flushing_caps &= ~cleaned;

	spin_lock(&mdsc->cap_dirty_lock);

	list_for_each_entry(cf, &to_remove, i_list) {
		if (__finish_cap_flush(mdsc, NULL, cf))
			wake_mdsc = true;
	}

	if (ci->i_flushing_caps == 0) {
		if (list_empty(&ci->i_cap_flush_list)) {
			list_del_init(&ci->i_flushing_item);
			if (!list_empty(&session->s_cap_flushing)) {
				dout(" mds%d still flushing cap on %p\n",
				     session->s_mds,
				     &list_first_entry(&session->s_cap_flushing,
						struct ceph_inode_info,
						i_flushing_item)->vfs_inode);
			}
		}
		mdsc->num_cap_flushing--;
		dout(" inode %p now !flushing\n", inode);

		if (ci->i_dirty_caps == 0) {
			dout(" inode %p now clean\n", inode);
			BUG_ON(!list_empty(&ci->i_dirty_item));
			/* drop the inode ref held for dirty caps */
			drop = true;
			if (ci->i_wr_ref == 0 &&
			    ci->i_wrbuffer_ref_head == 0) {
				BUG_ON(!ci->i_head_snapc);
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
		} else {
			BUG_ON(list_empty(&ci->i_dirty_item));
		}
	}
	spin_unlock(&mdsc->cap_dirty_lock);

out:
	spin_unlock(&ci->i_ceph_lock);

	/* free completed flush records outside the lock */
	while (!list_empty(&to_remove)) {
		cf = list_first_entry(&to_remove,
				      struct ceph_cap_flush, i_list);
		list_del(&cf->i_list);
		ceph_free_cap_flush(cf);
	}

	if (wake_ci)
		wake_up_all(&ci->i_cap_wq);
	if (wake_mdsc)
		wake_up_all(&mdsc->cap_flushing_wq);
	if (drop)
		iput(inode);
}
3326
/*
 * Handle FLUSHSNAP_ACK.  MDS has flushed snap data to disk and we can
 * throw away our cap_snap.
 *
 * Caller holds s_mutex.
 */
static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
				     struct ceph_mds_caps *m,
				     struct ceph_mds_session *session)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	u64 follows = le64_to_cpu(m->snap_follows);
	struct ceph_cap_snap *capsnap;
	bool flushed = false;
	bool wake_ci = false;
	bool wake_mdsc = false;

	dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n",
	     inode, ci, session->s_mds, follows);

	spin_lock(&ci->i_ceph_lock);
	/* find the cap_snap matching both 'follows' and the flush tid */
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		if (capsnap->follows == follows) {
			if (capsnap->cap_flush.tid != flush_tid) {
				dout(" cap_snap %p follows %lld tid %lld !="
				     " %lld\n", capsnap, follows,
				     flush_tid, capsnap->cap_flush.tid);
				break;
			}
			flushed = true;
			break;
		} else {
			dout(" skipping cap_snap %p follows %lld\n",
			     capsnap, capsnap->follows);
		}
	}
	if (flushed) {
		WARN_ON(capsnap->dirty_pages || capsnap->writing);
		dout(" removing %p cap_snap %p follows %lld\n",
		     inode, capsnap, follows);
		list_del(&capsnap->ci_item);
		if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush))
			wake_ci = true;

		spin_lock(&mdsc->cap_dirty_lock);

		if (list_empty(&ci->i_cap_flush_list))
			list_del_init(&ci->i_flushing_item);

		if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush))
			wake_mdsc = true;

		spin_unlock(&mdsc->cap_dirty_lock);
	}
	spin_unlock(&ci->i_ceph_lock);
	if (flushed) {
		/* release refs and wake waiters outside i_ceph_lock */
		ceph_put_snap_context(capsnap->context);
		ceph_put_cap_snap(capsnap);
		if (wake_ci)
			wake_up_all(&ci->i_cap_wq);
		if (wake_mdsc)
			wake_up_all(&mdsc->cap_flushing_wq);
		iput(inode);
	}
}
3393
3394/*
3395 * Handle TRUNC from MDS, indicating file truncation.
3396 *
3397 * caller hold s_mutex.
3398 */
3399static void handle_cap_trunc(struct inode *inode,
3400 struct ceph_mds_caps *trunc,
3401 struct ceph_mds_session *session)
be655596 3402 __releases(ci->i_ceph_lock)
a8599bd8
SW
3403{
3404 struct ceph_inode_info *ci = ceph_inode(inode);
3405 int mds = session->s_mds;
3406 int seq = le32_to_cpu(trunc->seq);
3407 u32 truncate_seq = le32_to_cpu(trunc->truncate_seq);
3408 u64 truncate_size = le64_to_cpu(trunc->truncate_size);
3409 u64 size = le64_to_cpu(trunc->size);
3410 int implemented = 0;
3411 int dirty = __ceph_caps_dirty(ci);
3412 int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
3413 int queue_trunc = 0;
3414
3415 issued |= implemented | dirty;
3416
3417 dout("handle_cap_trunc inode %p mds%d seq %d to %lld seq %d\n",
3418 inode, mds, seq, truncate_size, truncate_seq);
3419 queue_trunc = ceph_fill_file_size(inode, issued,
3420 truncate_seq, truncate_size, size);
be655596 3421 spin_unlock(&ci->i_ceph_lock);
a8599bd8 3422
14649758 3423 if (queue_trunc)
3c6f6b79 3424 ceph_queue_vmtruncate(inode);
a8599bd8
SW
3425}
3426
/*
 * Handle EXPORT from MDS.  Cap is being migrated _from_ this mds to a
 * different one.  If we are the most recent migration we've seen (as
 * indicated by mseq), make note of the migrating cap bits for the
 * duration (until we see the corresponding IMPORT).
 *
 * caller holds s_mutex
 */
static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
			      struct ceph_mds_cap_peer *ph,
			      struct ceph_mds_session *session)
{
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	struct ceph_mds_session *tsession = NULL;
	struct ceph_cap *cap, *tcap, *new_cap = NULL;
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 t_cap_id;
	unsigned mseq = le32_to_cpu(ex->migrate_seq);
	unsigned t_seq, t_mseq;
	int target, issued;
	int mds = session->s_mds;

	/* peer info describes the target mds; absent for a plain drop */
	if (ph) {
		t_cap_id = le64_to_cpu(ph->cap_id);
		t_seq = le32_to_cpu(ph->seq);
		t_mseq = le32_to_cpu(ph->mseq);
		target = le32_to_cpu(ph->mds);
	} else {
		t_cap_id = t_seq = t_mseq = 0;
		target = -1;
	}

	dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n",
	     inode, ci, mds, mseq, target);
retry:
	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id))
		goto out_unlock;

	if (target < 0) {
		/* no migration target: just drop the cap */
		__ceph_remove_cap(cap, false);
		if (!ci->i_auth_cap)
			ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
		goto out_unlock;
	}

	/*
	 * now we know we haven't received the cap import message yet
	 * because the exported cap still exists.
	 */

	issued = cap->issued;
	if (issued != cap->implemented)
		pr_err_ratelimited("handle_cap_export: issued != implemented: "
				"ino (%llx.%llx) mds%d seq %d mseq %d "
				"issued %s implemented %s\n",
				ceph_vinop(inode), mds, cap->seq, cap->mseq,
				ceph_cap_string(issued),
				ceph_cap_string(cap->implemented));


	tcap = __get_cap_for_mds(ci, target);
	if (tcap) {
		/* already have caps from the target */
		if (tcap->cap_id == t_cap_id &&
		    ceph_seq_cmp(tcap->seq, t_seq) < 0) {
			dout(" updating import cap %p mds%d\n", tcap, target);
			tcap->cap_id = t_cap_id;
			tcap->seq = t_seq - 1;
			tcap->issue_seq = t_seq - 1;
			tcap->mseq = t_mseq;
			tcap->issued |= issued;
			tcap->implemented |= issued;
			if (cap == ci->i_auth_cap)
				ci->i_auth_cap = tcap;

			/* move in-flight flushes to the target session */
			if (!list_empty(&ci->i_cap_flush_list) &&
			    ci->i_auth_cap == tcap) {
				spin_lock(&mdsc->cap_dirty_lock);
				list_move_tail(&ci->i_flushing_item,
					       &tcap->session->s_cap_flushing);
				spin_unlock(&mdsc->cap_dirty_lock);
			}
		}
		__ceph_remove_cap(cap, false);
		goto out_unlock;
	} else if (tsession) {
		/* add placeholder for the export target */
		int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
		tcap = new_cap;
		ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
			     t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);

		if (!list_empty(&ci->i_cap_flush_list) &&
		    ci->i_auth_cap == tcap) {
			spin_lock(&mdsc->cap_dirty_lock);
			list_move_tail(&ci->i_flushing_item,
				       &tcap->session->s_cap_flushing);
			spin_unlock(&mdsc->cap_dirty_lock);
		}

		__ceph_remove_cap(cap, false);
		goto out_unlock;
	}

	/* need the target session: drop locks, open it, then retry */
	spin_unlock(&ci->i_ceph_lock);
	mutex_unlock(&session->s_mutex);

	/* open target session */
	tsession = ceph_mdsc_open_export_target_session(mdsc, target);
	if (!IS_ERR(tsession)) {
		/* always lock the lower-numbered mds session first to
		 * keep a consistent lock order between the two */
		if (mds > target) {
			mutex_lock(&session->s_mutex);
			mutex_lock_nested(&tsession->s_mutex,
					  SINGLE_DEPTH_NESTING);
		} else {
			mutex_lock(&tsession->s_mutex);
			mutex_lock_nested(&session->s_mutex,
					  SINGLE_DEPTH_NESTING);
		}
		new_cap = ceph_get_cap(mdsc, NULL);
	} else {
		WARN_ON(1);
		tsession = NULL;
		target = -1;
	}
	goto retry;

out_unlock:
	spin_unlock(&ci->i_ceph_lock);
	mutex_unlock(&session->s_mutex);
	if (tsession) {
		mutex_unlock(&tsession->s_mutex);
		ceph_put_mds_session(tsession);
	}
	if (new_cap)
		ceph_put_cap(mdsc, new_cap);
}
3566
/*
 * Handle cap IMPORT.
 *
 * caller holds s_mutex. acquires i_ceph_lock
 */
static void handle_cap_import(struct ceph_mds_client *mdsc,
			      struct inode *inode, struct ceph_mds_caps *im,
			      struct ceph_mds_cap_peer *ph,
			      struct ceph_mds_session *session,
			      struct ceph_cap **target_cap, int *old_issued)
	__acquires(ci->i_ceph_lock)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap, *ocap, *new_cap = NULL;
	int mds = session->s_mds;
	int issued;
	unsigned caps = le32_to_cpu(im->caps);
	unsigned wanted = le32_to_cpu(im->wanted);
	unsigned seq = le32_to_cpu(im->seq);
	unsigned mseq = le32_to_cpu(im->migrate_seq);
	u64 realmino = le64_to_cpu(im->realm);
	u64 cap_id = le64_to_cpu(im->cap_id);
	u64 p_cap_id;
	int peer;

	/* peer info identifies the exporting mds, if any */
	if (ph) {
		p_cap_id = le64_to_cpu(ph->cap_id);
		peer = le32_to_cpu(ph->mds);
	} else {
		p_cap_id = 0;
		peer = -1;
	}

	dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
	     inode, ci, mds, mseq, peer);

retry:
	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		/* allocate a cap outside the spinlock, then retry */
		if (!new_cap) {
			spin_unlock(&ci->i_ceph_lock);
			new_cap = ceph_get_cap(mdsc, NULL);
			goto retry;
		}
		cap = new_cap;
	} else {
		/* cap appeared while we were allocating; free ours */
		if (new_cap) {
			ceph_put_cap(mdsc, new_cap);
			new_cap = NULL;
		}
	}

	__ceph_caps_issued(ci, &issued);
	issued |= __ceph_caps_dirty(ci);

	ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq,
		     realmino, CEPH_CAP_FLAG_AUTH, &new_cap);

	/* remove the cap that the exporting (peer) mds held */
	ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
	if (ocap && ocap->cap_id == p_cap_id) {
		dout(" remove export cap %p mds%d flags %d\n",
		     ocap, peer, ph->flags);
		if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
		    (ocap->seq != le32_to_cpu(ph->seq) ||
		     ocap->mseq != le32_to_cpu(ph->mseq))) {
			pr_err_ratelimited("handle_cap_import: "
					"mismatched seq/mseq: ino (%llx.%llx) "
					"mds%d seq %d mseq %d importer mds%d "
					"has peer seq %d mseq %d\n",
					ceph_vinop(inode), peer, ocap->seq,
					ocap->mseq, mds, le32_to_cpu(ph->seq),
					le32_to_cpu(ph->mseq));
		}
		__ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
	}

	/* make sure we re-request max_size, if necessary */
	ci->i_requested_max_size = 0;

	/* return to caller with i_ceph_lock still held */
	*old_issued = issued;
	*target_cap = cap;
}
3650
/*
 * Handle a caps message from the MDS.
 *
 * Identify the appropriate session, inode, and call the right handler
 * based on the cap op.
 */
void ceph_handle_caps(struct ceph_mds_session *session,
		      struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct super_block *sb = mdsc->fsc->sb;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	struct ceph_mds_caps *h;
	struct ceph_mds_cap_peer *peer = NULL;
	struct ceph_snap_realm *realm = NULL;
	struct ceph_string *pool_ns = NULL;
	int mds = session->s_mds;
	int op, issued;
	u32 seq, mseq;
	struct ceph_vino vino;
	u64 tid;
	u64 inline_version = 0;
	void *inline_data = NULL;
	u32 inline_len = 0;
	void *snaptrace;
	size_t snaptrace_len;
	void *p, *end;

	dout("handle_caps from mds%d\n", mds);

	/* decode the fixed header */
	end = msg->front.iov_base + msg->front.iov_len;
	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	h = msg->front.iov_base;
	op = le32_to_cpu(h->op);
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	mseq = le32_to_cpu(h->migrate_seq);

	/* snap trace immediately follows the header */
	snaptrace = h + 1;
	snaptrace_len = le32_to_cpu(h->snap_trace_len);
	p = snaptrace + snaptrace_len;

	/* version-gated trailing fields; each bumps 'p' forward */
	if (le16_to_cpu(msg->hdr.version) >= 2) {
		u32 flock_len;
		ceph_decode_32_safe(&p, end, flock_len, bad);
		if (p + flock_len > end)
			goto bad;
		p += flock_len;
	}

	if (le16_to_cpu(msg->hdr.version) >= 3) {
		if (op == CEPH_CAP_OP_IMPORT) {
			if (p + sizeof(*peer) > end)
				goto bad;
			peer = p;
			p += sizeof(*peer);
		} else if (op == CEPH_CAP_OP_EXPORT) {
			/* recorded in unused fields */
			peer = (void *)&h->size;
		}
	}

	if (le16_to_cpu(msg->hdr.version) >= 4) {
		ceph_decode_64_safe(&p, end, inline_version, bad);
		ceph_decode_32_safe(&p, end, inline_len, bad);
		if (p + inline_len > end)
			goto bad;
		inline_data = p;
		p += inline_len;
	}

	if (le16_to_cpu(msg->hdr.version) >= 5) {
		struct ceph_osd_client	*osdc = &mdsc->fsc->client->osdc;
		u32			epoch_barrier;

		ceph_decode_32_safe(&p, end, epoch_barrier, bad);
		ceph_osdc_update_epoch_barrier(osdc, epoch_barrier);
	}

	if (le16_to_cpu(msg->hdr.version) >= 8) {
		u64 flush_tid;
		u32 caller_uid, caller_gid;
		u32 pool_ns_len;

		/* version >= 6 */
		ceph_decode_64_safe(&p, end, flush_tid, bad);
		/* version >= 7 */
		ceph_decode_32_safe(&p, end, caller_uid, bad);
		ceph_decode_32_safe(&p, end, caller_gid, bad);
		/* version >= 8 */
		ceph_decode_32_safe(&p, end, pool_ns_len, bad);
		if (pool_ns_len > 0) {
			ceph_decode_need(&p, end, pool_ns_len, bad);
			pool_ns = ceph_find_or_create_string(p, pool_ns_len);
			p += pool_ns_len;
		}
	}

	/* lookup ino */
	inode = ceph_find_inode(sb, vino);
	ci = ceph_inode(inode);
	dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
	     vino.snap, inode);

	mutex_lock(&session->s_mutex);
	session->s_seq++;
	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
	     (unsigned)seq);

	if (!inode) {
		dout(" i don't have ino %llx\n", vino.ino);

		/* queue a release so the mds stops thinking we hold it */
		if (op == CEPH_CAP_OP_IMPORT) {
			cap = ceph_get_cap(mdsc, NULL);
			cap->cap_ino = vino.ino;
			cap->queue_release = 1;
			cap->cap_id = le64_to_cpu(h->cap_id);
			cap->mseq = mseq;
			cap->seq = seq;
			cap->issue_seq = seq;
			spin_lock(&session->s_cap_lock);
			list_add_tail(&cap->session_caps,
					&session->s_cap_releases);
			session->s_num_cap_releases++;
			spin_unlock(&session->s_cap_lock);
		}
		goto flush_cap_releases;
	}

	/* these will work even if we don't have a cap yet */
	switch (op) {
	case CEPH_CAP_OP_FLUSHSNAP_ACK:
		handle_cap_flushsnap_ack(inode, tid, h, session);
		goto done;

	case CEPH_CAP_OP_EXPORT:
		handle_cap_export(inode, h, peer, session);
		goto done_unlocked;

	case CEPH_CAP_OP_IMPORT:
		realm = NULL;
		if (snaptrace_len) {
			down_write(&mdsc->snap_rwsem);
			ceph_update_snap_trace(mdsc, snaptrace,
					       snaptrace + snaptrace_len,
					       false, &realm);
			downgrade_write(&mdsc->snap_rwsem);
		} else {
			down_read(&mdsc->snap_rwsem);
		}
		/* handle_cap_import leaves i_ceph_lock held;
		 * handle_cap_grant drops it and snap_rwsem */
		handle_cap_import(mdsc, inode, h, peer, session,
				  &cap, &issued);
		handle_cap_grant(mdsc, inode, h, &pool_ns,
				 inline_version, inline_data, inline_len,
				 msg->middle, session, cap, issued);
		if (realm)
			ceph_put_snap_realm(mdsc, realm);
		goto done_unlocked;
	}

	/* the rest require a cap */
	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ceph_inode(inode), mds);
	if (!cap) {
		dout(" no cap on %p ino %llx.%llx from mds%d\n",
		     inode, ceph_ino(inode), ceph_snap(inode), mds);
		spin_unlock(&ci->i_ceph_lock);
		goto flush_cap_releases;
	}

	/* note that each of these drops i_ceph_lock for us */
	switch (op) {
	case CEPH_CAP_OP_REVOKE:
	case CEPH_CAP_OP_GRANT:
		__ceph_caps_issued(ci, &issued);
		issued |= __ceph_caps_dirty(ci);
		handle_cap_grant(mdsc, inode, h, &pool_ns,
				 inline_version, inline_data, inline_len,
				 msg->middle, session, cap, issued);
		goto done_unlocked;

	case CEPH_CAP_OP_FLUSH_ACK:
		handle_cap_flush_ack(inode, tid, h, session, cap);
		break;

	case CEPH_CAP_OP_TRUNC:
		handle_cap_trunc(inode, h, session);
		break;

	default:
		spin_unlock(&ci->i_ceph_lock);
		pr_err("ceph_handle_caps: unknown cap op %d %s\n", op,
		       ceph_cap_op_name(op));
	}

	goto done;

flush_cap_releases:
	/*
	 * send any cap release message to try to move things
	 * along for the mds (who clearly thinks we still have this
	 * cap).
	 */
	ceph_send_cap_releases(mdsc, session);

done:
	mutex_unlock(&session->s_mutex);
done_unlocked:
	iput(inode);
	ceph_put_string(pool_ns);
	return;

bad:
	pr_err("ceph_handle_caps: corrupt message\n");
	ceph_msg_dump(msg);
	return;
}
3874
/*
 * Delayed work handler to process end of delayed cap release LRU list.
 *
 * Walk mdsc->cap_delay_list from the front and run ceph_check_caps()
 * on each inode whose hold time has expired or that carries the
 * CEPH_I_FLUSH flag.  We stop at the first entry that is neither
 * flagged nor expired — presumably the list is kept ordered
 * oldest-first, so later entries cannot be due yet (TODO confirm
 * against __cap_delay_requeue()).
 */
void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	int flags = CHECK_CAPS_NODELAY;

	dout("check_delayed_caps\n");
	while (1) {
		/*
		 * cap_delay_lock is re-taken on every iteration; both
		 * break paths below leave it held, matching the single
		 * spin_unlock() after the loop.
		 */
		spin_lock(&mdsc->cap_delay_lock);
		if (list_empty(&mdsc->cap_delay_list))
			break;
		ci = list_first_entry(&mdsc->cap_delay_list,
				      struct ceph_inode_info,
				      i_cap_delay_list);
		if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
		    time_before(jiffies, ci->i_hold_caps_max))
			break;	/* head entry not yet due */
		list_del_init(&ci->i_cap_delay_list);

		/*
		 * Pin the inode before dropping the lock; igrab() may
		 * fail (return NULL) if the inode is being torn down,
		 * in which case we skip the cap check entirely.
		 */
		inode = igrab(&ci->vfs_inode);
		spin_unlock(&mdsc->cap_delay_lock);

		if (inode) {
			dout("check_delayed_caps on %p\n", inode);
			/* may block; must not hold cap_delay_lock here */
			ceph_check_caps(ci, flags, NULL);
			iput(inode);
		}
	}
	spin_unlock(&mdsc->cap_delay_lock);
}
3908
afcdaea3
SW
/*
 * Flush all dirty caps to the mds
 *
 * Repeatedly take the first entry off mdsc->cap_dirty and invoke
 * ceph_check_caps() with CHECK_CAPS_FLUSH on it.  The inode is pinned
 * with ihold() before cap_dirty_lock is dropped so it cannot go away
 * while we operate on it without the lock.
 */
void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
{
	struct ceph_inode_info *ci;
	struct inode *inode;

	dout("flush_dirty_caps\n");
	spin_lock(&mdsc->cap_dirty_lock);
	while (!list_empty(&mdsc->cap_dirty)) {
		ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info,
				      i_dirty_item);
		inode = &ci->vfs_inode;
		/* pin the inode before dropping cap_dirty_lock */
		ihold(inode);
		dout("flush_dirty_caps %p\n", inode);
		spin_unlock(&mdsc->cap_dirty_lock);
		/*
		 * NOTE(review): this loop terminates only if flushing
		 * eventually removes ci from cap_dirty — presumably
		 * guaranteed by ceph_check_caps(); confirm.
		 */
		ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL);
		iput(inode);
		spin_lock(&mdsc->cap_dirty_lock);
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	dout("flush_dirty_caps done\n");
}
3933
774a6a11
YZ
3934void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
3935{
3936 int i;
3937 int bits = (fmode << 1) | 1;
3938 for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
3939 if (bits & (1 << i))
3940 ci->i_nr_by_mode[i]++;
3941 }
3942}
3943
a8599bd8
SW
3944/*
3945 * Drop open file reference. If we were the last open file,
3946 * we may need to release capabilities to the MDS (or schedule
3947 * their delayed release).
3948 */
3949void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
3950{
774a6a11
YZ
3951 int i, last = 0;
3952 int bits = (fmode << 1) | 1;
be655596 3953 spin_lock(&ci->i_ceph_lock);
774a6a11
YZ
3954 for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
3955 if (bits & (1 << i)) {
3956 BUG_ON(ci->i_nr_by_mode[i] == 0);
3957 if (--ci->i_nr_by_mode[i] == 0)
3958 last++;
3959 }
3960 }
3961 dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n",
3962 &ci->vfs_inode, fmode,
3963 ci->i_nr_by_mode[0], ci->i_nr_by_mode[1],
3964 ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]);
be655596 3965 spin_unlock(&ci->i_ceph_lock);
a8599bd8
SW
3966
3967 if (last && ci->i_vino.snap == CEPH_NOSNAP)
3968 ceph_check_caps(ci, 0, NULL);
3969}
3970
6ef0bc6d
ZZ
3971/*
3972 * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps. If it
3973 * looks like the link count will hit 0, drop any other caps (other
3974 * than PIN) we don't specifically want (due to the file still being
3975 * open).
3976 */
3977int ceph_drop_caps_for_unlink(struct inode *inode)
3978{
3979 struct ceph_inode_info *ci = ceph_inode(inode);
3980 int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
3981
3982 spin_lock(&ci->i_ceph_lock);
3983 if (inode->i_nlink == 1) {
3984 drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
3985
3986 ci->i_ceph_flags |= CEPH_I_NODELAY;
3987 if (__ceph_caps_dirty(ci)) {
3988 struct ceph_mds_client *mdsc =
3989 ceph_inode_to_client(inode)->mdsc;
3990 __cap_delay_requeue_front(mdsc, ci);
3991 }
3992 }
3993 spin_unlock(&ci->i_ceph_lock);
3994 return drop;
3995}
3996
a8599bd8
SW
3997/*
3998 * Helpers for embedding cap and dentry lease releases into mds
3999 * requests.
4000 *
4001 * @force is used by dentry_release (below) to force inclusion of a
4002 * record for the directory inode, even when there aren't any caps to
4003 * drop.
4004 */
4005int ceph_encode_inode_release(void **p, struct inode *inode,
4006 int mds, int drop, int unless, int force)
4007{
4008 struct ceph_inode_info *ci = ceph_inode(inode);
4009 struct ceph_cap *cap;
4010 struct ceph_mds_request_release *rel = *p;
ec97f88b 4011 int used, dirty;
a8599bd8 4012 int ret = 0;
a8599bd8 4013
be655596 4014 spin_lock(&ci->i_ceph_lock);
916623da 4015 used = __ceph_caps_used(ci);
ec97f88b 4016 dirty = __ceph_caps_dirty(ci);
916623da 4017
ec97f88b
SW
4018 dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n",
4019 inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop),
916623da
SW
4020 ceph_cap_string(unless));
4021
ec97f88b
SW
4022 /* only drop unused, clean caps */
4023 drop &= ~(used | dirty);
916623da 4024
a8599bd8
SW
4025 cap = __get_cap_for_mds(ci, mds);
4026 if (cap && __cap_is_valid(cap)) {
222b7f90
YZ
4027 unless &= cap->issued;
4028 if (unless) {
4029 if (unless & CEPH_CAP_AUTH_EXCL)
4030 drop &= ~CEPH_CAP_AUTH_SHARED;
4031 if (unless & CEPH_CAP_LINK_EXCL)
4032 drop &= ~CEPH_CAP_LINK_SHARED;
4033 if (unless & CEPH_CAP_XATTR_EXCL)
4034 drop &= ~CEPH_CAP_XATTR_SHARED;
4035 if (unless & CEPH_CAP_FILE_EXCL)
4036 drop &= ~CEPH_CAP_FILE_SHARED;
4037 }
4038
4039 if (force || (cap->issued & drop)) {
4040 if (cap->issued & drop) {
bb137f84
YZ
4041 int wanted = __ceph_caps_wanted(ci);
4042 if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
4043 wanted |= cap->mds_wanted;
4044 dout("encode_inode_release %p cap %p "
4045 "%s -> %s, wanted %s -> %s\n", inode, cap,
a8599bd8 4046 ceph_cap_string(cap->issued),
bb137f84
YZ
4047 ceph_cap_string(cap->issued & ~drop),
4048 ceph_cap_string(cap->mds_wanted),
4049 ceph_cap_string(wanted));
4050
a8599bd8
SW
4051 cap->issued &= ~drop;
4052 cap->implemented &= ~drop;
bb137f84 4053 cap->mds_wanted = wanted;
a8599bd8
SW
4054 } else {
4055 dout("encode_inode_release %p cap %p %s"
4056 " (force)\n", inode, cap,
4057 ceph_cap_string(cap->issued));
4058 }
4059
4060 rel->ino = cpu_to_le64(ceph_ino(inode));
4061 rel->cap_id = cpu_to_le64(cap->cap_id);
4062 rel->seq = cpu_to_le32(cap->seq);
08a0f24e 4063 rel->issue_seq = cpu_to_le32(cap->issue_seq);
a8599bd8 4064 rel->mseq = cpu_to_le32(cap->mseq);
fd7b95cd 4065 rel->caps = cpu_to_le32(cap->implemented);
a8599bd8
SW
4066 rel->wanted = cpu_to_le32(cap->mds_wanted);
4067 rel->dname_len = 0;
4068 rel->dname_seq = 0;
4069 *p += sizeof(*rel);
4070 ret = 1;
4071 } else {
222b7f90 4072 dout("encode_inode_release %p cap %p %s (noop)\n",
a8599bd8
SW
4073 inode, cap, ceph_cap_string(cap->issued));
4074 }
4075 }
be655596 4076 spin_unlock(&ci->i_ceph_lock);
a8599bd8
SW
4077 return ret;
4078}
4079
/*
 * Embed a cap release for the parent directory, plus a dentry lease
 * release, into an mds request.  @dir may be NULL, in which case the
 * parent is taken from the dentry (with a temporary reference).
 *
 * Returns the result of ceph_encode_inode_release(): 1 if a record
 * was encoded (the dentry name and lease seq are appended to it),
 * 0 otherwise.
 */
int ceph_encode_dentry_release(void **p, struct dentry *dentry,
			       struct inode *dir,
			       int mds, int drop, int unless)
{
	struct dentry *parent = NULL;
	struct ceph_mds_request_release *rel = *p;
	struct ceph_dentry_info *di = ceph_dentry(dentry);
	int force = 0;
	int ret;

	/*
	 * force a record for the directory caps if we have a dentry lease.
	 * this is racy (can't take i_ceph_lock and d_lock together), but it
	 * doesn't have to be perfect; the mds will revoke anything we don't
	 * release.
	 */
	spin_lock(&dentry->d_lock);
	if (di->lease_session && di->lease_session->s_mds == mds)
		force = 1;
	if (!dir) {
		/* pin the parent so dir stays valid after dropping d_lock */
		parent = dget(dentry->d_parent);
		dir = d_inode(parent);
	}
	spin_unlock(&dentry->d_lock);

	ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force);
	dput(parent);	/* dput(NULL) is a no-op when dir was supplied */

	/* re-check the lease under d_lock; it may have changed meanwhile */
	spin_lock(&dentry->d_lock);
	if (ret && di->lease_session && di->lease_session->s_mds == mds) {
		dout("encode_dentry_release %p mds%d seq %d\n",
		     dentry, mds, (int)di->lease_seq);
		/* append the dentry name after the release record */
		rel->dname_len = cpu_to_le32(dentry->d_name.len);
		memcpy(*p, dentry->d_name.name, dentry->d_name.len);
		*p += dentry->d_name.len;
		rel->dname_seq = cpu_to_le32(di->lease_seq);
		__ceph_mdsc_drop_dentry_lease(dentry);
	}
	spin_unlock(&dentry->d_lock);
	return ret;
}