git.proxmox.com Git - mirror_ubuntu-bionic-kernel.git/blobdiff - fs/ceph/dir.c
ceph: define 'end/complete' in readdir reply as bit flags
[mirror_ubuntu-bionic-kernel.git] / fs / ceph / dir.c
index 4fb2bbc2a2722af6e9ccf84cf9fc80ce7f3edbeb..ebcbd1c946b4c387e9fd59e43bd4943efcaf9ba1 100644 (file)
@@ -110,6 +110,50 @@ static int note_last_dentry(struct ceph_file_info *fi, const char *name,
        return 0;
 }
 
+/* Fetch entry @idx of the readdir cache for @parent, taking a dentry ref. */
+static struct dentry *
+__dcache_find_get_entry(struct dentry *parent, u64 idx,
+                       struct ceph_readdir_cache_control *cache_ctl)
+{
+       struct inode *dir = d_inode(parent);
+       struct dentry *dentry;
+       unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
+       loff_t ptr_pos = idx * sizeof(struct dentry *); /* byte offset of slot */
+       pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;      /* page holding it */
+
+       if (ptr_pos >= i_size_read(dir))
+               return NULL;    /* slot lies at or beyond i_size */
+
+       if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
+               ceph_readdir_cache_release(cache_ctl); /* presumably drops old page */
+               cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
+               if (!cache_ctl->page) {
+                       dout(" page %lu not found\n", ptr_pgoff);
+                       return ERR_PTR(-EAGAIN);
+               }
+               /* reading and filling the cache are serialized by
+                * i_mutex, so the page lock can be dropped right away */
+               unlock_page(cache_ctl->page);
+               cache_ctl->dentries = kmap(cache_ctl->page);
+       }
+
+       cache_ctl->index = idx & idx_mask;      /* slot index within the page */
+
+       rcu_read_lock();
+       spin_lock(&parent->d_lock);
+       /* re-check i_size here, under d_lock, because an empty directory
+        * can be marked as complete while not holding the i_mutex. */
+       if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
+               dentry = cache_ctl->dentries[cache_ctl->index];
+       else
+               dentry = NULL;
+       spin_unlock(&parent->d_lock);
+       if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
+               dentry = NULL;  /* no reference obtained */
+       rcu_read_unlock();
+       return dentry ? : ERR_PTR(-EAGAIN);     /* nothing usable: -EAGAIN */
+}
+
 /*
  * When possible, we try to satisfy a readdir by peeking at the
  * dcache.  We make this work by carefully ordering dentries on
@@ -129,68 +173,61 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
        struct inode *dir = d_inode(parent);
        struct dentry *dentry, *last = NULL;
        struct ceph_dentry_info *di;
-       unsigned nsize = PAGE_SIZE / sizeof(struct dentry *);
-       int err = 0;
-       loff_t ptr_pos = 0;
        struct ceph_readdir_cache_control cache_ctl = {};
+       u64 idx = 0;
+       int err = 0;
 
        dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
 
-       /* we can calculate cache index for the first dirfrag */
-       if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) {
-               cache_ctl.index = fpos_off(ctx->pos) - 2;
-               BUG_ON(cache_ctl.index < 0);
-               ptr_pos = cache_ctl.index * sizeof(struct dentry *);
+       /* search start position */
+       if (ctx->pos > 2) {
+               u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
+               while (count > 0) {
+                       u64 step = count >> 1;
+                       dentry = __dcache_find_get_entry(parent, idx + step,
+                                                        &cache_ctl);
+                       if (!dentry) {
+                               /* use linear search */
+                               idx = 0;
+                               break;
+                       }
+                       if (IS_ERR(dentry)) {
+                               err = PTR_ERR(dentry);
+                               goto out;
+                       }
+                       di = ceph_dentry(dentry);
+                       spin_lock(&dentry->d_lock);
+                       if (fpos_cmp(di->offset, ctx->pos) < 0) {
+                               idx += step + 1;
+                               count -= step + 1;
+                       } else {
+                               count = step;
+                       }
+                       spin_unlock(&dentry->d_lock);
+                       dput(dentry);
+               }
+
+               dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
        }
 
-       while (true) {
-               pgoff_t pgoff;
-               bool emit_dentry;
 
-               if (ptr_pos >= i_size_read(dir)) {
+       for (;;) {
+               bool emit_dentry = false;
+               dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
+               if (!dentry) {
                        fi->flags |= CEPH_F_ATEND;
                        err = 0;
                        break;
                }
-
-               err = -EAGAIN;
-               pgoff = ptr_pos >> PAGE_SHIFT;
-               if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) {
-                       ceph_readdir_cache_release(&cache_ctl);
-                       cache_ctl.page = find_lock_page(&dir->i_data, pgoff);
-                       if (!cache_ctl.page) {
-                               dout(" page %lu not found\n", pgoff);
-                               break;
-                       }
-                       /* reading/filling the cache are serialized by
-                        * i_mutex, no need to use page lock */
-                       unlock_page(cache_ctl.page);
-                       cache_ctl.dentries = kmap(cache_ctl.page);
+               if (IS_ERR(dentry)) {
+                       err = PTR_ERR(dentry);
+                       goto out;
                }
 
-               rcu_read_lock();
-               spin_lock(&parent->d_lock);
-               /* check i_size again here, because empty directory can be
-                * marked as complete while not holding the i_mutex. */
-               if (ceph_dir_is_complete_ordered(dir) &&
-                   ptr_pos < i_size_read(dir))
-                       dentry = cache_ctl.dentries[cache_ctl.index % nsize];
-               else
-                       dentry = NULL;
-               spin_unlock(&parent->d_lock);
-               if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
-                       dentry = NULL;
-               rcu_read_unlock();
-               if (!dentry)
-                       break;
-
-               emit_dentry = false;
                di = ceph_dentry(dentry);
                spin_lock(&dentry->d_lock);
                if (di->lease_shared_gen == shared_gen &&
                    d_really_is_positive(dentry) &&
-                   ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR &&
-                   ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH &&
                    fpos_cmp(ctx->pos, di->offset) <= 0) {
                        emit_dentry = true;
                }
@@ -217,10 +254,8 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
                } else {
                        dput(dentry);
                }
-
-               cache_ctl.index++;
-               ptr_pos += sizeof(struct dentry *);
        }
+out:
        ceph_readdir_cache_release(&cache_ctl);
        if (last) {
                int ret;
@@ -330,6 +365,8 @@ more:
                req->r_readdir_cache_idx = fi->readdir_cache_idx;
                req->r_readdir_offset = fi->next_offset;
                req->r_args.readdir.frag = cpu_to_le32(frag);
+               req->r_args.readdir.flags =
+                               cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);
 
                req->r_inode = inode;
                ihold(inode);
@@ -379,14 +416,11 @@ more:
                if (req->r_reply_info.dir_end) {
                        kfree(fi->last_name);
                        fi->last_name = NULL;
-                       if (ceph_frag_is_rightmost(frag))
-                               fi->next_offset = 2;
-                       else
-                               fi->next_offset = 0;
+                       fi->next_offset = 2;
                } else {
-                       err = note_last_dentry(fi,
-                                      rinfo->dir_dname[rinfo->dir_nr-1],
-                                      rinfo->dir_dname_len[rinfo->dir_nr-1],
+                       struct ceph_mds_reply_dir_entry *rde =
+                                       rinfo->dir_entries + (rinfo->dir_nr-1);
+                       err = note_last_dentry(fi, rde->name, rde->name_len,
                                       fi->next_offset + rinfo->dir_nr);
                        if (err)
                                return err;
@@ -399,24 +433,21 @@ more:
 
        ctx->pos = ceph_make_fpos(frag, off);
        while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) {
-               struct ceph_mds_reply_inode *in =
-                       rinfo->dir_in[off - fi->offset].in;
+               struct ceph_mds_reply_dir_entry *rde =
+                       rinfo->dir_entries + (off - fi->offset);
                struct ceph_vino vino;
                ino_t ino;
 
                dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n",
                     off, off - fi->offset, rinfo->dir_nr, ctx->pos,
-                    rinfo->dir_dname_len[off - fi->offset],
-                    rinfo->dir_dname[off - fi->offset], in);
-               BUG_ON(!in);
-               ftype = le32_to_cpu(in->mode) >> 12;
-               vino.ino = le64_to_cpu(in->ino);
-               vino.snap = le64_to_cpu(in->snapid);
+                    rde->name_len, rde->name, &rde->inode.in);
+               BUG_ON(!rde->inode.in);
+               ftype = le32_to_cpu(rde->inode.in->mode) >> 12;
+               vino.ino = le64_to_cpu(rde->inode.in->ino);
+               vino.snap = le64_to_cpu(rde->inode.in->snapid);
                ino = ceph_vino_to_ino(vino);
-               if (!dir_emit(ctx,
-                           rinfo->dir_dname[off - fi->offset],
-                           rinfo->dir_dname_len[off - fi->offset],
-                           ceph_translate_ino(inode->i_sb, ino), ftype)) {
+               if (!dir_emit(ctx, rde->name, rde->name_len,
+                             ceph_translate_ino(inode->i_sb, ino), ftype)) {
                        dout("filldir stopping us...\n");
                        return 0;
                }
@@ -433,7 +464,7 @@ more:
        /* more frags? */
        if (!ceph_frag_is_rightmost(frag)) {
                frag = ceph_frag_next(frag);
-               off = 0;
+               off = 2;
                ctx->pos = ceph_make_fpos(frag, off);
                dout("readdir next frag is %x\n", frag);
                goto more;
@@ -476,10 +507,7 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
        fi->last_name = NULL;
        fi->dir_release_count = 0;
        fi->readdir_cache_idx = -1;
-       if (ceph_frag_is_leftmost(frag))
-               fi->next_offset = 2;  /* compensate for . and .. */
-       else
-               fi->next_offset = 0;
+       fi->next_offset = 2;  /* compensate for . and .. */
        fi->flags &= ~CEPH_F_ATEND;
 }