// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/writeback.h>	/* generic_writepages */
#include <linux/slab.h>
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>
#include <linux/signal.h>
#include <linux/iversion.h>
#include <linux/ktime.h>
#include <linux/netfs.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "metric.h"
#include <linux/ceph/osd_client.h>
#include <linux/ceph/striper.h>

/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped) and are writing the most recently dirtied
 * pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */

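/*
 * Illustrative example (editorial note, not from the original source):
 * suppose an inode has capsnaps whose snap contexts carry seq 3 and
 * seq 7, both with dirty pages, plus newly dirtied "head" pages.
 * Writeback must first flush every page tagged with the seq-3 context,
 * then the seq-7 pages, and only then the head pages; pages dirtied
 * under a newer context are skipped (or redirtied) until the older
 * contexts drain.
 */
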
#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))

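/*
 * Worked example (illustrative): with congestion_kb = 8192 and 4K pages
 * (PAGE_SHIFT == 12), CONGESTION_ON_THRESH(8192) = 8192 >> 2 = 2048
 * pages, and CONGESTION_OFF_THRESH = 2048 - (2048 >> 2) = 1536 pages.
 * Congestion is flagged once in-flight writeback exceeds 2048 pages and
 * is cleared only after it drops 25% below that, which avoids flapping
 * right at the threshold.
 */
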
static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct page *page, void **_fsdata);

static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
	if (PagePrivate(page))
		return (void *)page->private;
	return NULL;
}

/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
static int ceph_set_page_dirty(struct page *page)
{
	struct address_space *mapping = page->mapping;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;

	if (PageDirty(page)) {
		dout("%p set_page_dirty %p idx %lu -- already dirty\n",
		     mapping->host, page, page->index);
		BUG_ON(!PagePrivate(page));
		return 0;
	}

	inode = mapping->host;
	ci = ceph_inode(inode);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
	if (__ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
				list_last_entry(&ci->i_cap_snaps,
						struct ceph_cap_snap,
						ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	} else {
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		++ci->i_wrbuffer_ref_head;
	}
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
	dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
	     "snapc %p seq %lld (%d snaps)\n",
	     mapping->host, page, page->index,
	     ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	     snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * Reference snap context in page->private.  Also set
	 * PagePrivate so that we get invalidatepage callback.
	 */
	BUG_ON(PagePrivate(page));
	attach_page_private(page, snapc);

	return __set_page_dirty_nobuffers(page);
}

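/*
 * Note (editorial): the snap context reference taken by
 * attach_page_private() above is dropped again on exactly one of three
 * paths -- ceph_invalidatepage(), writepage_nounlock(), or
 * writepages_finish() -- each of which detaches page->private and calls
 * ceph_put_snap_context() once the writeback/invalidate accounting for
 * the page is settled.
 */
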
/*
 * If we are truncating the full page (i.e. offset == 0), adjust the
 * dirty page counters appropriately.  Only called if there is private
 * data on the page.
 */
static void ceph_invalidatepage(struct page *page, unsigned int offset,
				unsigned int length)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;

	wait_on_page_fscache(page);

	inode = page->mapping->host;
	ci = ceph_inode(inode);

	if (offset != 0 || length != thp_size(page)) {
		dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
		     inode, page, page->index, offset, length);
		return;
	}

	WARN_ON(!PageLocked(page));
	if (!PagePrivate(page))
		return;

	dout("%p invalidatepage %p idx %lu full dirty page\n",
	     inode, page, page->index);

	snapc = detach_page_private(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);
}

static int ceph_releasepage(struct page *page, gfp_t gfp)
{
	dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host,
	     page, page->index, PageDirty(page) ? "" : "not ");

	if (PageFsCache(page)) {
		if (!(gfp & __GFP_DIRECT_RECLAIM) || !(gfp & __GFP_FS))
			return 0;
		wait_on_page_fscache(page);
	}
	return !PagePrivate(page);
}

static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq)
{
	struct inode *inode = rreq->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_layout *lo = &ci->i_layout;
	u32 blockoff;
	u64 blockno;

	/* Expand the start downward */
	blockno = div_u64_rem(rreq->start, lo->stripe_unit, &blockoff);
	rreq->start = blockno * lo->stripe_unit;
	rreq->len += blockoff;

	/* Now, round up the length to the next block */
	rreq->len = roundup(rreq->len, lo->stripe_unit);
}

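/*
 * Worked example (illustrative): with a 4 MiB stripe_unit, a readahead
 * request for 1 MiB starting at offset 5 MiB yields blockno = 1 and
 * blockoff = 1 MiB, so rreq->start is pulled back to 4 MiB, rreq->len
 * grows to 2 MiB, and the final roundup() extends it to 4 MiB -- the
 * request is widened to exactly cover the stripe unit it touches.
 */
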
static bool ceph_netfs_clamp_length(struct netfs_read_subrequest *subreq)
{
	struct inode *inode = subreq->rreq->mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 objno, objoff;
	u32 xlen;

	/* Truncate the extent at the end of the current block */
	ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len,
				      &objno, &objoff, &xlen);
	subreq->len = min(xlen, fsc->mount_options->rsize);
	return true;
}

static void finish_netfs_read(struct ceph_osd_request *req)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(req->r_inode);
	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
	struct netfs_read_subrequest *subreq = req->r_priv;
	int num_pages;
	int err = req->r_result;

	ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency,
				 req->r_end_latency, osd_data->length, err);

	dout("%s: result %d subreq->len=%zu i_size=%lld\n", __func__, req->r_result,
	     subreq->len, i_size_read(req->r_inode));

	/* no object means success but no data */
	if (err == -ENOENT)
		err = 0;
	else if (err == -EBLOCKLISTED)
		fsc->blocklisted = true;

	if (err >= 0 && err < subreq->len)
		__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);

	netfs_subreq_terminated(subreq, err, true);

	num_pages = calc_pages_for(osd_data->alignment, osd_data->length);
	ceph_put_page_vector(osd_data->pages, num_pages, false);
	iput(req->r_inode);
}

static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
{
	struct netfs_read_request *rreq = subreq->rreq;
	struct inode *inode = rreq->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req;
	struct ceph_vino vino = ceph_vino(inode);
	struct iov_iter iter;
	struct page **pages;
	size_t page_off;
	int err = 0;
	u64 len = subreq->len;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len,
			0, 1, CEPH_OSD_OP_READ,
			CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica,
			NULL, ci->i_truncate_seq, ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		req = NULL;
		goto out;
	}

	dout("%s: pos=%llu orig_len=%zu len=%llu\n", __func__, subreq->start, subreq->len, len);
	iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len);
	err = iov_iter_get_pages_alloc(&iter, &pages, len, &page_off);
	if (err < 0) {
		dout("%s: iov_iter_get_pages_alloc returned %d\n", __func__, err);
		goto out;
	}

	/* should always give us a page-aligned read */
	WARN_ON_ONCE(page_off);
	len = err;

	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
	req->r_callback = finish_netfs_read;
	req->r_priv = subreq;
	req->r_inode = inode;
	ihold(inode);

	err = ceph_osdc_start_request(req->r_osdc, req, false);
	if (err)
		iput(inode);
out:
	ceph_osdc_put_request(req);
	if (err)
		netfs_subreq_terminated(subreq, err, false);
	dout("%s: result %d\n", __func__, err);
}

static void ceph_init_rreq(struct netfs_read_request *rreq, struct file *file)
{
}

static void ceph_readahead_cleanup(struct address_space *mapping, void *priv)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int got = (uintptr_t)priv;

	if (got)
		ceph_put_cap_refs(ci, got);
}

static const struct netfs_read_request_ops ceph_netfs_read_ops = {
	.init_rreq		= ceph_init_rreq,
	.is_cache_enabled	= ceph_is_cache_enabled,
	.begin_cache_operation	= ceph_begin_cache_operation,
	.issue_op		= ceph_netfs_issue_op,
	.expand_readahead	= ceph_netfs_expand_readahead,
	.clamp_length		= ceph_netfs_clamp_length,
	.check_write_begin	= ceph_netfs_check_write_begin,
	.cleanup		= ceph_readahead_cleanup,
};

/* read a single page, without unlocking it. */
static int ceph_readpage(struct file *file, struct page *page)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_vino vino = ceph_vino(inode);
	u64 off = page_offset(page);
	u64 len = thp_size(page);

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		/*
		 * Uptodate inline data should have been added
		 * into page cache while getting Fcr caps.
		 */
		if (off == 0) {
			unlock_page(page);
			return -EINVAL;
		}
		zero_user_segment(page, 0, thp_size(page));
		SetPageUptodate(page);
		unlock_page(page);
		return 0;
	}

	dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
	     vino.ino, vino.snap, file, off, len, page, page->index);

	return netfs_readpage(file, page, &ceph_netfs_read_ops, NULL);
}

static void ceph_readahead(struct readahead_control *ractl)
{
	struct inode *inode = file_inode(ractl->file);
	struct ceph_file_info *fi = ractl->file->private_data;
	struct ceph_rw_context *rw_ctx;
	int got = 0;
	int ret = 0;

	if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
		return;

	rw_ctx = ceph_find_rw_context(fi);
	if (!rw_ctx) {
		/*
		 * readahead callers do not necessarily hold Fcb caps
		 * (e.g. fadvise, madvise).
		 */
		int want = CEPH_CAP_FILE_CACHE;

		ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got);
		if (ret < 0)
			dout("start_read %p, error getting cap\n", inode);
		else if (!(got & want))
			dout("start_read %p, no cache cap\n", inode);

		if (ret <= 0)
			return;
	}
	netfs_readahead(ractl, &ceph_netfs_read_ops, (void *)(uintptr_t)got);
}

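/*
 * Note (editorial): any Fc cap reference taken in ceph_readahead() is
 * passed through netfs_readahead()'s priv pointer and released in
 * ceph_readahead_cleanup() above, so the reference is held for the
 * whole lifetime of the read request rather than just this function.
 */
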
struct ceph_writeback_ctl
{
	loff_t i_size;
	u64 truncate_size;
	u32 truncate_seq;
	bool size_stable;
	bool head_snapc;
};

/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
		   struct ceph_snap_context *page_snapc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
		if (!capsnap->dirty_pages)
			continue;

		/* get i_size, truncate_{seq,size} for page_snapc? */
		if (snapc && capsnap->context != page_snapc)
			continue;

		if (ctl) {
			if (capsnap->writing) {
				ctl->i_size = i_size_read(inode);
				ctl->size_stable = false;
			} else {
				ctl->i_size = capsnap->size;
				ctl->size_stable = true;
			}
			ctl->truncate_size = capsnap->truncate_size;
			ctl->truncate_seq = capsnap->truncate_seq;
			ctl->head_snapc = false;
		}

		if (snapc)
			break;

		snapc = ceph_get_snap_context(capsnap->context);
		if (!page_snapc ||
		    page_snapc == snapc ||
		    page_snapc->seq > snapc->seq)
			break;
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
		if (ctl) {
			ctl->i_size = i_size_read(inode);
			ctl->truncate_size = ci->i_truncate_size;
			ctl->truncate_seq = ci->i_truncate_seq;
			ctl->size_stable = false;
			ctl->head_snapc = true;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}

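/*
 * Example (illustrative): with capsnaps carrying snapc seq 3 and seq 7,
 * both holding dirty pages, plus a dirty head, get_oldest_context(inode,
 * ctl, NULL) returns the seq-3 context.  Once those pages are flushed
 * and that capsnap's dirty_pages count drops to zero, the next call
 * returns seq 7, and finally the head context -- matching the
 * in-snap-order rule described at the top of this file.
 */
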
static u64 get_writepages_data_length(struct inode *inode,
				      struct page *page, u64 start)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = page_snap_context(page);
	struct ceph_cap_snap *capsnap = NULL;
	u64 end = i_size_read(inode);

	if (snapc != ci->i_head_snapc) {
		bool found = false;
		spin_lock(&ci->i_ceph_lock);
		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->context == snapc) {
				if (!capsnap->writing)
					end = capsnap->size;
				found = true;
				break;
			}
		}
		spin_unlock(&ci->i_ceph_lock);
		WARN_ON(!found);
	}
	if (end > page_offset(page) + thp_size(page))
		end = page_offset(page) + thp_size(page);
	return end > start ? end - start : 0;
}

/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, mark the mapping for error, but still adjust the
 * dirty page accounting (i.e., page is no longer dirty).
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct inode *inode = page->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	int err;
	loff_t len = thp_size(page);
	struct ceph_writeback_ctl ceph_wbc;
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;

	dout("writepage %p idx %lu\n", page, page->index);

	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	if (!snapc) {
		dout("writepage %p page %p not dirty?\n", inode, page);
		return 0;
	}
	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
	if (snapc->seq > oldest->seq) {
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		     inode, page, snapc);
		/* we should only noop if called by kswapd */
		WARN_ON(!(current->flags & PF_MEMALLOC));
		ceph_put_snap_context(oldest);
		redirty_page_for_writepage(wbc, page);
		return 0;
	}
	ceph_put_snap_context(oldest);

	/* is this a partial page at end of file? */
	if (page_off >= ceph_wbc.i_size) {
		dout("%p page eof %llu\n", page, ceph_wbc.i_size);
		page->mapping->a_ops->invalidatepage(page, 0, thp_size(page));
		return 0;
	}

	if (ceph_wbc.i_size < page_off + len)
		len = ceph_wbc.i_size - page_off;

	dout("writepage %p page %p index %lu on %llu~%llu snapc %p seq %lld\n",
	     inode, page, page->index, page_off, len, snapc, snapc->seq);

	if (atomic_long_inc_return(&fsc->writeback_count) >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);

	set_page_writeback(page);
	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), page_off, &len, 0, 1,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, snapc,
				    ceph_wbc.truncate_seq, ceph_wbc.truncate_size,
				    true);
	if (IS_ERR(req)) {
		redirty_page_for_writepage(wbc, page);
		end_page_writeback(page);
		return PTR_ERR(req);
	}

	/* it may be a short write due to an object boundary */
	WARN_ON_ONCE(len > thp_size(page));
	osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
	dout("writepage %llu~%llu (%llu bytes)\n", page_off, len, len);

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(osdc, req, true);
	if (!err)
		err = ceph_osdc_wait_request(osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, err);

	ceph_osdc_put_request(req);
	if (err == 0)
		err = len;

	if (err < 0) {
		struct writeback_control tmp_wbc;
		if (!wbc)
			wbc = &tmp_wbc;
		if (err == -ERESTARTSYS) {
			/* killed by SIGKILL */
			dout("writepage interrupted page %p\n", page);
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
			return err;
		}
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		dout("writepage setting page/mapping error %d %p\n",
		     err, page);
		mapping_set_error(&inode->i_data, err);
		wbc->pages_skipped++;
	} else {
		dout("writepage cleaned page %p\n", page);
		err = 0;  /* vfs expects us to return 0 */
	}
	oldest = detach_page_private(page);
	WARN_ON_ONCE(oldest != snapc);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);	/* page's reference */

	if (atomic_long_dec_return(&fsc->writeback_count) <
	    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
		clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);

	return err;
}

static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
	int err;
	struct inode *inode = page->mapping->host;
	BUG_ON(!inode);
	ihold(inode);
	err = writepage_nounlock(page, wbc);
	if (err == -ERESTARTSYS) {
		/* direct memory reclaimer was killed by SIGKILL. return 0
		 * to prevent caller from setting mapping/page error */
		err = 0;
	}
	unlock_page(page);
	iput(inode);
	return err;
}

/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 */
static void writepages_finish(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_data *osd_data;
	struct page *page;
	int num_pages, total_pages = 0;
	int i, j;
	int rc = req->r_result;
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	unsigned int len = 0;
	bool remove_page;

	dout("writepages_finish %p rc %d\n", inode, rc);
	if (rc < 0) {
		mapping_set_error(mapping, rc);
		ceph_set_error_write(ci);
		if (rc == -EBLOCKLISTED)
			fsc->blocklisted = true;
	} else {
		ceph_clear_error_write(ci);
	}

	/*
	 * We lost the cache cap, need to truncate the page before
	 * it is unlocked, otherwise we'd truncate it later in the
	 * page truncation thread, possibly losing some data that
	 * raced its way in
	 */
	remove_page = !(ceph_caps_issued(ci) &
			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));

	/* clean all pages */
	for (i = 0; i < req->r_num_ops; i++) {
		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
			break;

		osd_data = osd_req_op_extent_osd_data(req, i);
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
		len += osd_data->length;
		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		total_pages += num_pages;
		for (j = 0; j < num_pages; j++) {
			page = osd_data->pages[j];
			BUG_ON(!page);
			WARN_ON(!PageUptodate(page));

			if (atomic_long_dec_return(&fsc->writeback_count) <
			     CONGESTION_OFF_THRESH(
					fsc->mount_options->congestion_kb))
				clear_bdi_congested(inode_to_bdi(inode),
						    BLK_RW_ASYNC);

			ceph_put_snap_context(detach_page_private(page));
			end_page_writeback(page);
			dout("unlocking %p\n", page);

			if (remove_page)
				generic_error_remove_page(inode->i_mapping,
							  page);

			unlock_page(page);
		}
		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
		     inode, osd_data->length, rc >= 0 ? num_pages : 0);

		release_pages(osd_data->pages, num_pages);
	}

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, rc);

	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

	osd_data = osd_req_op_extent_osd_data(req, 0);
	if (osd_data->pages_from_pool)
		mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
	else
		kfree(osd_data->pages);
	ceph_osdc_put_request(req);
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino = ceph_vino(inode);
	pgoff_t index, start_index, end = -1;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct pagevec pvec;
	int rc = 0;
	unsigned int wsize = i_blocksize(inode);
	struct ceph_osd_request *req = NULL;
	struct ceph_writeback_ctl ceph_wbc;
	bool should_loop, range_whole = false;
	bool done = false;

	dout("writepages_start %p (mode=%s)\n", inode,
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited(
				"writepage_start %p %lld forced umount\n",
				inode, ceph_ino(inode));
		}
		mapping_set_error(mapping, -EIO);
		return -EIO; /* we're in a forced umount, don't write! */
	}
	if (fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;

	pagevec_init(&pvec);

	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
	index = start_index;

retry:
	/* find oldest snap context with dirty data */
	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
	if (!snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);

	should_loop = false;
	if (ceph_wbc.head_snapc && snapc != last_snapc) {
		/* where to start/end? */
		if (wbc->range_cyclic) {
			index = start_index;
			end = -1;
			if (index > 0)
				should_loop = true;
			dout(" cyclic, start at %lu\n", index);
		} else {
			index = wbc->range_start >> PAGE_SHIFT;
			end = wbc->range_end >> PAGE_SHIFT;
			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
				range_whole = true;
			dout(" not cyclic, %lu to %lu\n", index, end);
		}
	} else if (!ceph_wbc.head_snapc) {
		/* Do not respect wbc->range_{start,end}.  Dirty pages
		 * in that range can be associated with a newer snapc.
		 * They are not writeable until all dirty pages
		 * associated with 'snapc' have been written. */
		if (index > 0)
			should_loop = true;
		dout(" non-head snapc, range whole\n");
	}

	ceph_put_snap_context(last_snapc);
	last_snapc = snapc;

	while (!done && index <= end) {
		int num_ops = 0, op_idx;
		unsigned i, pvec_pages, max_pages, locked_pages = 0;
		struct page **pages = NULL, **data_pages;
		struct page *page;
		pgoff_t strip_unit_end = 0;
		u64 offset = 0, len = 0;
		bool from_pool = false;

		max_pages = wsize >> PAGE_SHIFT;

get_more_pages:
		pvec_pages = pagevec_lookup_range_tag(&pvec, mapping, &index,
						      end, PAGECACHE_TAG_DIRTY);
		dout("pagevec_lookup_range_tag got %d\n", pvec_pages);
		if (!pvec_pages && !locked_pages)
			break;
		for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
			page = pvec.pages[i];
			dout("? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);  /* first page */
			else if (!trylock_page(page))
				break;

			/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
				continue;
			}
			/* only if matching snap context */
			pgsnapc = page_snap_context(page);
			if (pgsnapc != snapc) {
				dout("page snapc %p %lld != oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				if (!should_loop &&
				    !ceph_wbc.head_snapc &&
				    wbc->sync_mode != WB_SYNC_NONE)
					should_loop = true;
				unlock_page(page);
				continue;
			}
			if (page_offset(page) >= ceph_wbc.i_size) {
				dout("%p page eof %llu\n",
				     page, ceph_wbc.i_size);
				if ((ceph_wbc.size_stable ||
				    page_offset(page) >= i_size_read(inode)) &&
				    clear_page_dirty_for_io(page))
					mapping->a_ops->invalidatepage(page,
								0, thp_size(page));
				unlock_page(page);
				continue;
			}
			if (strip_unit_end && (page->index > strip_unit_end)) {
				dout("end of strip unit %p\n", page);
				unlock_page(page);
				break;
			}
			if (PageWriteback(page)) {
				if (wbc->sync_mode == WB_SYNC_NONE) {
					dout("%p under writeback\n", page);
					unlock_page(page);
					continue;
				}
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
			}

			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
				continue;
			}

			/*
			 * We have something to write.  If this is
			 * the first locked page this time through,
			 * calculate max possible write size and
			 * allocate a page array
			 */
			if (locked_pages == 0) {
				u64 objnum;
				u64 objoff;
				u32 xlen;

				/* prepare async write request */
				offset = (u64)page_offset(page);
				ceph_calc_file_object_mapping(&ci->i_layout,
							      offset, wsize,
							      &objnum, &objoff,
							      &xlen);
				len = xlen;

				num_ops = 1;
				strip_unit_end = page->index +
					((len - 1) >> PAGE_SHIFT);

				BUG_ON(pages);
				max_pages = calc_pages_for(0, (u64)len);
				pages = kmalloc_array(max_pages,
						      sizeof(*pages),
						      GFP_NOFS);
				if (!pages) {
					from_pool = true;
					pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
					BUG_ON(!pages);
				}

				len = 0;
			} else if (page->index !=
				   (offset + len) >> PAGE_SHIFT) {
				if (num_ops >= (from_pool ?  CEPH_OSD_SLAB_OPS :
						CEPH_OSD_MAX_OPS)) {
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}

				num_ops++;
				offset = (u64)page_offset(page);
				len = 0;
			}

			/* note position of first page in pvec */
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);

			if (atomic_long_inc_return(&fsc->writeback_count) >
			    CONGESTION_ON_THRESH(
				    fsc->mount_options->congestion_kb)) {
				set_bdi_congested(inode_to_bdi(inode),
						  BLK_RW_ASYNC);
			}


			pages[locked_pages++] = page;
			pvec.pages[i] = NULL;

			len += thp_size(page);
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_pvec_pages;
		if (i) {
			unsigned j, n = 0;
			/* shift unused page to beginning of pvec */
			for (j = 0; j < pvec_pages; j++) {
				if (!pvec.pages[j])
					continue;
				if (n < j)
					pvec.pages[n] = pvec.pages[j];
				n++;
			}
			pvec.nr = n;

			if (pvec_pages && i == pvec_pages &&
			    locked_pages < max_pages) {
				dout("reached end pvec, trying for more\n");
				pagevec_release(&pvec);
				goto get_more_pages;
			}
		}

new_request:
		offset = page_offset(pages[0]);
		len = wsize;

		req = ceph_osdc_new_request(&fsc->client->osdc,
					&ci->i_layout, vino,
					offset, &len, 0, num_ops,
					CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
					snapc, ceph_wbc.truncate_seq,
					ceph_wbc.truncate_size, false);
		if (IS_ERR(req)) {
			req = ceph_osdc_new_request(&fsc->client->osdc,
						&ci->i_layout, vino,
						offset, &len, 0,
						min(num_ops,
						    CEPH_OSD_SLAB_OPS),
						CEPH_OSD_OP_WRITE,
						CEPH_OSD_FLAG_WRITE,
						snapc, ceph_wbc.truncate_seq,
						ceph_wbc.truncate_size, true);
			BUG_ON(IS_ERR(req));
		}
		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
			     thp_size(page) - offset);

		req->r_callback = writepages_finish;
		req->r_inode = inode;

		/* Format the osd request message and submit the write */
		len = 0;
		data_pages = pages;
		op_idx = 0;
		for (i = 0; i < locked_pages; i++) {
			u64 cur_offset = page_offset(pages[i]);
			if (offset + len != cur_offset) {
				if (op_idx + 1 == req->r_num_ops)
					break;
				osd_req_op_extent_dup_last(req, op_idx,
							   cur_offset - offset);
				dout("writepages got pages at %llu~%llu\n",
				     offset, len);
				osd_req_op_extent_osd_data_pages(req, op_idx,
							data_pages, len, 0,
							from_pool, false);
				osd_req_op_extent_update(req, op_idx, len);

				len = 0;
				offset = cur_offset;
				data_pages = pages + i;
				op_idx++;
			}

			set_page_writeback(pages[i]);
			len += thp_size(page);
		}

		if (ceph_wbc.size_stable) {
			len = min(len, ceph_wbc.i_size - offset);
		} else if (i == locked_pages) {
			/* writepages_finish() clears writeback pages
			 * according to the data length, so make sure
			 * data length covers all locked pages */
			u64 min_len = len + 1 - thp_size(page);
			len = get_writepages_data_length(inode, pages[i - 1],
							 offset);
			len = max(len, min_len);
		}
		dout("writepages got pages at %llu~%llu\n", offset, len);

		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
						 0, from_pool, false);
		osd_req_op_extent_update(req, op_idx, len);

		BUG_ON(op_idx + 1 != req->r_num_ops);

		from_pool = false;
		if (i < locked_pages) {
			BUG_ON(num_ops <= req->r_num_ops);
			num_ops -= req->r_num_ops;
			locked_pages -= i;

			/* allocate new pages array for next request */
			data_pages = pages;
			pages = kmalloc_array(locked_pages, sizeof(*pages),
					      GFP_NOFS);
			if (!pages) {
				from_pool = true;
				pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
				BUG_ON(!pages);
			}
			memcpy(pages, data_pages + i,
			       locked_pages * sizeof(*pages));
			memset(data_pages + i, 0,
			       locked_pages * sizeof(*pages));
		} else {
			BUG_ON(num_ops != req->r_num_ops);
			index = pages[i - 1]->index + 1;
			/* request message now owns the pages array */
			pages = NULL;
		}

		req->r_mtime = inode->i_mtime;
		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
		BUG_ON(rc);
		req = NULL;

		wbc->nr_to_write -= i;
		if (pages)
			goto new_request;

		/*
		 * We stop writing back only if we are not doing
		 * integrity sync. In case of integrity sync we have to
		 * keep going until we have written all the pages
		 * we tagged for writeback prior to entering this loop.
		 */
		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
			done = true;

release_pvec_pages:
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		     pvec.nr ? pvec.pages[0] : NULL);
		pagevec_release(&pvec);
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
		end = start_index - 1; /* OK even when start_index == 0 */

		/* to write dirty pages associated with next snapc,
		 * we need to wait until current writes complete */
		if (wbc->sync_mode != WB_SYNC_NONE &&
		    start_index == 0 && /* all dirty pages were checked */
		    !ceph_wbc.head_snapc) {
			struct page *page;
			unsigned i, nr;
			index = 0;
			while ((index <= end) &&
			       (nr = pagevec_lookup_tag(&pvec, mapping, &index,
							PAGECACHE_TAG_WRITEBACK))) {
				for (i = 0; i < nr; i++) {
					page = pvec.pages[i];
					if (page_snap_context(page) != snapc)
						continue;
					wait_on_page_writeback(page);
				}
				pagevec_release(&pvec);
				cond_resched();
			}
		}

		start_index = 0;
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
	ceph_osdc_put_request(req);
	ceph_put_snap_context(last_snapc);
	dout("writepages done, rc = %d\n", rc);
	return rc;
}



/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
	int ret = !oldest || snapc->seq <= oldest->seq;

	ceph_put_snap_context(oldest);
	return ret;
}

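/*
 * Example (illustrative): if the oldest flushable context has seq 5, a
 * page dirtied under a context with seq 4 has already been (or can be)
 * written, while one dirtied under seq 6 must wait -- this is the test
 * that ceph_netfs_check_write_begin() and ceph_page_mkwrite() sleep on
 * below.
 */
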
/**
 * ceph_find_incompatible - find an incompatible context and return it
 * @page: page being dirtied
 *
 * We are only allowed to write into/dirty a page if the page is
 * clean, or already dirty within the same snap context. Returns a
 * conflicting context if there is one, NULL if there isn't, or a
 * negative error code on other errors.
 *
 * Must be called with page lock held.
 */
static struct ceph_snap_context *
ceph_find_incompatible(struct page *page)
{
	struct inode *inode = page->mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);

	if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
		dout(" page %p forced umount\n", page);
		return ERR_PTR(-EIO);
	}

	for (;;) {
		struct ceph_snap_context *snapc, *oldest;

		wait_on_page_writeback(page);

		snapc = page_snap_context(page);
		if (!snapc || snapc == ci->i_head_snapc)
			break;

		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL, NULL);
		if (snapc->seq > oldest->seq) {
			/* not writeable -- return it for the caller to deal with */
			ceph_put_snap_context(oldest);
			dout(" page %p snapc %p not current or oldest\n", page, snapc);
			return ceph_get_snap_context(snapc);
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping page lock) */
		dout(" page %p snapc %p not current, but oldest\n", page, snapc);
		if (clear_page_dirty_for_io(page)) {
			int r = writepage_nounlock(page, NULL);
			if (r < 0)
				return ERR_PTR(r);
		}
	}
	return NULL;
}

static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct page *page, void **_fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;

	snapc = ceph_find_incompatible(page);
	if (snapc) {
		int r;

		unlock_page(page);
		put_page(page);
		if (IS_ERR(snapc))
			return PTR_ERR(snapc);

		ceph_queue_writeback(inode);
		r = wait_event_killable(ci->i_cap_wq,
					context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
		return r == 0 ? -EAGAIN : r;
	}
	return 0;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(struct file *file, struct address_space *mapping,
			    loff_t pos, unsigned len, unsigned flags,
			    struct page **pagep, void **fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct page *page = NULL;
	pgoff_t index = pos >> PAGE_SHIFT;
	int r;

	/*
	 * Uninlining should have already been done and everything updated, EXCEPT
	 * for inline_version sent to the MDS.
	 */
	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		page = grab_cache_page_write_begin(mapping, index, flags);
		if (!page)
			return -ENOMEM;

		/*
		 * The inline_version on a new inode is set to 1. If that's the
		 * case, then the page is brand new and isn't yet Uptodate.
		 */
		r = 0;
		if (index == 0 && ci->i_inline_version != 1) {
			if (!PageUptodate(page)) {
				WARN_ONCE(1, "ceph: write_begin called on still-inlined inode (inline_version %llu)!\n",
					  ci->i_inline_version);
				r = -EINVAL;
			}
			goto out;
		}
		zero_user_segment(page, 0, thp_size(page));
		SetPageUptodate(page);
		goto out;
	}

	r = netfs_write_begin(file, inode->i_mapping, pos, len, 0, &page, NULL,
			      &ceph_netfs_read_ops, NULL);
out:
	if (r == 0)
		wait_on_page_fscache(page);
	if (r < 0) {
		if (page)
			put_page(page);
	} else {
		WARN_ON_ONCE(!PageLocked(page));
		*pagep = page;
	}
	return r;
}

/*
 * we don't do anything in here that simple_write_end doesn't do
 * except adjust dirty page accounting
 */
static int ceph_write_end(struct file *file, struct address_space *mapping,
			  loff_t pos, unsigned len, unsigned copied,
			  struct page *page, void *fsdata)
{
	struct inode *inode = file_inode(file);
	bool check_cap = false;

	dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
	     inode, page, (int)pos, (int)copied, (int)len);

	if (!PageUptodate(page)) {
		/* just return that nothing was copied on a short copy */
		if (copied < len) {
			copied = 0;
			goto out;
		}
		SetPageUptodate(page);
	}

	/* did file size increase? */
	if (pos+copied > i_size_read(inode))
		check_cap = ceph_inode_set_size(inode, pos+copied);

	set_page_dirty(page);

out:
	unlock_page(page);
	put_page(page);

	if (check_cap)
		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);

	return copied;
}

/*
 * we set .direct_IO to indicate direct io is supported, but since we
 * intercept O_DIRECT reads and writes early, this function should
 * never get called.
 */
static ssize_t ceph_direct_io(struct kiocb *iocb, struct iov_iter *iter)
{
	WARN_ON(1);
	return -EINVAL;
}

const struct address_space_operations ceph_aops = {
	.readpage = ceph_readpage,
	.readahead = ceph_readahead,
	.writepage = ceph_writepage,
	.writepages = ceph_writepages_start,
	.write_begin = ceph_write_begin,
	.write_end = ceph_write_end,
	.set_page_dirty = ceph_set_page_dirty,
	.invalidatepage = ceph_invalidatepage,
	.releasepage = ceph_releasepage,
	.direct_IO = ceph_direct_io,
};

static void ceph_block_sigs(sigset_t *oldset)
{
	sigset_t mask;
	siginitsetinv(&mask, sigmask(SIGKILL));
	sigprocmask(SIG_BLOCK, &mask, oldset);
}

static void ceph_restore_sigs(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}

/*
 * vm ops
 */
static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	ceph_block_sigs(&oldset);

	dout("filemap_fault %p %llx.%llx %llu trying to get caps\n",
	     inode, ceph_vinop(inode), off);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_CACHE;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1, &got);
	if (err < 0)
		goto out_restore;

	dout("filemap_fault %p %llu got cap refs on %s\n",
	     inode, off, ceph_cap_string(got));

	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
	    ci->i_inline_version == CEPH_INLINE_NONE) {
		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
		ceph_add_rw_context(fi, &rw_ctx);
		ret = filemap_fault(vmf);
		ceph_del_rw_context(fi, &rw_ctx);
		dout("filemap_fault %p %llu drop cap refs %s ret %x\n",
		     inode, off, ceph_cap_string(got), ret);
	} else
		err = -EAGAIN;

	ceph_put_cap_refs(ci, got);

	if (err != -EAGAIN)
		goto out_restore;

	/* read inline data */
	if (off >= PAGE_SIZE) {
		/* does not support inline data > PAGE_SIZE */
		ret = VM_FAULT_SIGBUS;
	} else {
		struct address_space *mapping = inode->i_mapping;
		struct page *page;

		filemap_invalidate_lock_shared(mapping);
		page = find_or_create_page(mapping, 0,
				mapping_gfp_constraint(mapping, ~__GFP_FS));
		if (!page) {
			ret = VM_FAULT_OOM;
			goto out_inline;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0 || off >= i_size_read(inode)) {
			unlock_page(page);
			put_page(page);
			ret = vmf_error(err);
			goto out_inline;
		}
		if (err < PAGE_SIZE)
			zero_user_segment(page, err, PAGE_SIZE);
		else
			flush_dcache_page(page);
		SetPageUptodate(page);
		vmf->page = page;
		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
out_inline:
		filemap_invalidate_unlock_shared(mapping);
		dout("filemap_fault %p %llu read inline data ret %x\n",
		     inode, off, ret);
	}
out_restore:
	ceph_restore_sigs(&oldset);
	if (err < 0)
		ret = vmf_error(err);

	return ret;
}

static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct ceph_cap_flush *prealloc_cf;
	struct page *page = vmf->page;
	loff_t off = page_offset(page);
	loff_t size = i_size_read(inode);
	size_t len;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return VM_FAULT_OOM;

	sb_start_pagefault(inode->i_sb);
	ceph_block_sigs(&oldset);

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		struct page *locked_page = NULL;
		if (off == 0) {
			lock_page(page);
			locked_page = page;
		}
		err = ceph_uninline_data(vma->vm_file, locked_page);
		if (locked_page)
			unlock_page(locked_page);
		if (err < 0)
			goto out_free;
	}

	if (off + thp_size(page) <= size)
		len = thp_size(page);
	else
		len = offset_in_thp(page, size);

	dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
	     inode, ceph_vinop(inode), off, len, size);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len, &got);
	if (err < 0)
		goto out_free;

	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
	     inode, off, len, ceph_cap_string(got));

	/* Update time before taking page lock */
	file_update_time(vma->vm_file);
	inode_inc_iversion_raw(inode);

	do {
		struct ceph_snap_context *snapc;

		lock_page(page);

		if (page_mkwrite_check_truncate(page, inode) < 0) {
			unlock_page(page);
			ret = VM_FAULT_NOPAGE;
			break;
		}

		snapc = ceph_find_incompatible(page);
		if (!snapc) {
			/* success.  we'll keep the page locked. */
			set_page_dirty(page);
			ret = VM_FAULT_LOCKED;
			break;
		}

		unlock_page(page);

		if (IS_ERR(snapc)) {
			ret = VM_FAULT_SIGBUS;
			break;
		}

		ceph_queue_writeback(inode);
		err = wait_event_killable(ci->i_cap_wq,
				context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
	} while (err == 0);

	if (ret == VM_FAULT_LOCKED ||
	    ci->i_inline_version != CEPH_INLINE_NONE) {
		int dirty;
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
	     inode, off, len, ceph_cap_string(got), ret);
	ceph_put_cap_refs_async(ci, got);
out_free:
	ceph_restore_sigs(&oldset);
	sb_end_pagefault(inode->i_sb);
	ceph_free_cap_flush(prealloc_cf);
	if (err < 0)
		ret = vmf_error(err);
	return ret;
}

void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
			   char *data, size_t len)
{
	struct address_space *mapping = inode->i_mapping;
	struct page *page;

	if (locked_page) {
		page = locked_page;
	} else {
		if (i_size_read(inode) == 0)
			return;
		page = find_or_create_page(mapping, 0,
					   mapping_gfp_constraint(mapping,
					   ~__GFP_FS));
		if (!page)
			return;
		if (PageUptodate(page)) {
			unlock_page(page);
			put_page(page);
			return;
		}
	}

	dout("fill_inline_data %p %llx.%llx len %zu locked_page %p\n",
	     inode, ceph_vinop(inode), len, locked_page);

	if (len > 0) {
		void *kaddr = kmap_atomic(page);
		memcpy(kaddr, data, len);
		kunmap_atomic(kaddr);
	}

	if (page != locked_page) {
		if (len < PAGE_SIZE)
			zero_user_segment(page, len, PAGE_SIZE);
		else
			flush_dcache_page(page);

		SetPageUptodate(page);
		unlock_page(page);
		put_page(page);
	}
}

int ceph_uninline_data(struct file *filp, struct page *locked_page)
{
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req;
	struct page *page = NULL;
	u64 len, inline_version;
	int err = 0;
	bool from_pagecache = false;

	spin_lock(&ci->i_ceph_lock);
	inline_version = ci->i_inline_version;
	spin_unlock(&ci->i_ceph_lock);

	dout("uninline_data %p %llx.%llx inline_version %llu\n",
	     inode, ceph_vinop(inode), inline_version);

	if (inline_version == 1 || /* initial version, no data */
	    inline_version == CEPH_INLINE_NONE)
		goto out;

	if (locked_page) {
		page = locked_page;
		WARN_ON(!PageUptodate(page));
	} else if (ceph_caps_issued(ci) &
		   (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
		page = find_get_page(inode->i_mapping, 0);
		if (page) {
			if (PageUptodate(page)) {
				from_pagecache = true;
				lock_page(page);
			} else {
				put_page(page);
				page = NULL;
			}
		}
	}

	if (page) {
		len = i_size_read(inode);
		if (len > PAGE_SIZE)
			len = PAGE_SIZE;
	} else {
		page = __page_cache_alloc(GFP_NOFS);
		if (!page) {
			err = -ENOMEM;
			goto out;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0) {
			/* no inline data */
			if (err == -ENODATA)
				err = 0;
			goto out;
		}
		len = err;
	}

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 0, 1,
				    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
				    NULL, 0, 0, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
		goto out;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    NULL, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);

ec137c10
YZ
1683 {
1684 __le64 xattr_buf = cpu_to_le64(inline_version);
1685 err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
1686 "inline_version", &xattr_buf,
1687 sizeof(xattr_buf),
1688 CEPH_OSD_CMPXATTR_OP_GT,
1689 CEPH_OSD_CMPXATTR_MODE_U64);
1690 if (err)
1691 goto out_put;
1692 }
1693
1694 {
1695 char xattr_buf[32];
1696 int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
1697 "%llu", inline_version);
1698 err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
1699 "inline_version",
1700 xattr_buf, xattr_len, 0, 0);
1701 if (err)
1702 goto out_put;
1703 }
28127bdd 1704
fac02ddf 1705 req->r_mtime = inode->i_mtime;
28127bdd
YZ
1706 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1707 if (!err)
1708 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
97e27aaa 1709
8ae99ae2 1710 ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
903f4fec 1711 req->r_end_latency, len, err);
97e27aaa 1712
28127bdd
YZ
1713out_put:
1714 ceph_osdc_put_request(req);
1715 if (err == -ECANCELED)
1716 err = 0;
1717out:
1718 if (page && page != locked_page) {
1719 if (from_pagecache) {
1720 unlock_page(page);
09cbfeaf 1721 put_page(page);
28127bdd
YZ
1722 } else
1723 __free_pages(page, 0);
1724 }
1725
1726 dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
1727 inode, ceph_vinop(inode), inline_version, err);
1728 return err;
1729}
1730
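/*
 * mmap support: the real work happens later, in the .fault and
 * .page_mkwrite handlers; ceph_mmap() itself only installs the vm ops,
 * and refuses (-ENOEXEC) mappings that could never fault pages in
 * because the address space has no ->readpage.
 */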
static const struct vm_operations_struct ceph_vmops = {
	.fault		= ceph_filemap_fault,
	.page_mkwrite	= ceph_page_mkwrite,
};

int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
	struct address_space *mapping = file->f_mapping;

	if (!mapping->a_ops->readpage)
		return -ENOEXEC;
	file_accessed(file);
	vma->vm_ops = &ceph_vmops;
	return 0;
}

enum {
	POOL_READ = 1,
	POOL_WRITE = 2,
};

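/*
 * Look up (and, on a miss, probe and cache) the client's read/write
 * permission for a data pool + namespace. Results live in
 * mdsc->pool_perm_tree, keyed by pool id and namespace string, so the
 * OSD round trips below are paid only once per pool a mount touches.
 */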
static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
		     pool, (int)pool_ns->len, pool_ns->str);
	else
		dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);

	down_write(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	if (*p) {
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

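	/*
	 * Nothing cached: probe the pool directly. Issue a dummy STAT
	 * read and an exclusive CREATE write against the inode's first
	 * object, in parallel, and infer the permission bits from how
	 * the OSD answers each request.
	 */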
	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	if (pool_ns)
		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);

	wr_req->r_mtime = ci->vfs_inode.i_mtime;
	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);

	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	if (!err2)
		err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

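	/*
	 * Interpret the answers: -ENOENT from the STAT still proves read
	 * access (the object simply doesn't exist), and -EEXIST from the
	 * exclusive CREATE still proves write access (the object was
	 * already there). -EPERM just leaves the bit clear; anything
	 * else (e.g. -EBLOCKLISTED) aborts without caching a result.
	 */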
	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM) {
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		goto out_unlock;
	}

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		if (err2 == -EBLOCKLISTED)
			fsc->blocklisted = true;
		err = err2;
		goto out_unlock;
	}

	pool_ns_len = pool_ns ? pool_ns->len : 0;
	perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	perm->pool_ns_len = pool_ns_len;
	if (pool_ns_len > 0)
		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
	perm->pool_ns[pool_ns_len] = 0;

	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
		     pool, (int)pool_ns->len, pool_ns->str, err);
	else
		dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
	return err;
}

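/*
 * Fast-path permission check used on the read/write paths: consult the
 * CEPH_I_POOL_* flags cached in the inode and only fall back to
 * __ceph_pool_perm_get() on a miss. Since the file layout may change
 * while we sleep in the slow path, the result is only accepted if the
 * inode still points at the same pool and namespace; otherwise we loop
 * and re-check against the new layout.
 */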
int ceph_pool_perm_check(struct inode *inode, int need)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_string *pool_ns;
	s64 pool;
	int ret, flags;

	/* Only need to do this for regular files */
	if (!S_ISREG(inode->i_mode))
		return 0;

	if (ci->i_vino.snap != CEPH_NOSNAP) {
		/*
		 * The pool permission check needs to write to the first
		 * object. But for a snapshot, the head of the first object
		 * may have already been deleted. Skip the check to avoid
		 * creating an orphan object.
		 */
		return 0;
	}

	if (ceph_test_mount_opt(ceph_inode_to_client(inode),
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
	pool = ci->i_layout.pool_id;
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
			dout("ceph_pool_perm_check pool %lld no read perm\n",
			     pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
			dout("ceph_pool_perm_check pool %lld no write perm\n",
			     pool);
			return -EPERM;
		}
		return 0;
	}

	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
	ceph_put_string(pool_ns);
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
	if (pool == ci->i_layout.pool_id &&
	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
		ci->i_ceph_flags |= flags;
	} else {
		pool = ci->i_layout.pool_id;
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}

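/* Empty the pool permission cache, freeing every cached entry. */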
void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}