]> git.proxmox.com Git - mirror_ubuntu-bionic-kernel.git/blobdiff - fs/ceph/dir.c
ceph: using hash value to compose dentry offset
[mirror_ubuntu-bionic-kernel.git] / fs / ceph / dir.c
index ebcbd1c946b4c387e9fd59e43bd4943efcaf9ba1..4850c3624a873c024b7ce11aa5ab0a89711b51da 100644 (file)
@@ -69,16 +69,42 @@ out_unlock:
 }
 
 /*
- * for readdir, we encode the directory frag and offset within that
- * frag into f_pos.
+ * f_pos layout for readdir:
+ * - hash order:
+ *     (0xff << 52) | ((24 bits hash) << 28) |
+ *     (the nth entry among those with a colliding hash);
+ * - frag+name order:
+ *     ((frag value) << 28) | (the nth entry in frag);
  */
+/* number of low f_pos bits that hold the per-frag / per-hash entry index */
+#define OFFSET_BITS    28
+/* mask selecting those low OFFSET_BITS bits */
+#define OFFSET_MASK    ((1 << OFFSET_BITS) - 1)
+/* 0xff marker placed above the offset bits and the 24-bit hash field */
+#define HASH_ORDER     (0xffull << (OFFSET_BITS + 24))
+
+/*
+ * Compose an f_pos value: @high is shifted above the low OFFSET_BITS bits,
+ * @off is OR-ed into the low bits, and the HASH_ORDER marker is set when
+ * @hash_order is true.
+ */
+loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
+{
+       /* use OFFSET_BITS, not a literal, so this stays in step with
+        * fpos_frag()/fpos_off() */
+       loff_t fpos = ((loff_t)high << OFFSET_BITS) | (loff_t)off;
+
+       if (hash_order)
+               fpos |= HASH_ORDER;
+       return fpos;
+}
+
+/* True when every bit of the HASH_ORDER marker is set in @p. */
+static bool is_hash_order(loff_t p)
+{
+       unsigned long long marker = p & HASH_ORDER;
+
+       return marker == HASH_ORDER;
+}
+
 static unsigned fpos_frag(loff_t p)
 {
-       return p >> 32;
+       return p >> OFFSET_BITS; /* drop the low OFFSET_BITS (entry index) bits */
+}
+
+/* Hash part of @p: ceph_frag_value() applied to the frag part of f_pos. */
+static unsigned fpos_hash(loff_t p)
+{
+       unsigned frag = fpos_frag(p);
+
+       return ceph_frag_value(frag);
 }
+
 static unsigned fpos_off(loff_t p)
 {
-       return p & 0xffffffff;
+       return p & OFFSET_MASK; /* keep only the low OFFSET_BITS bits */
 }
 
 static int fpos_cmp(loff_t l, loff_t r)
@@ -177,7 +203,7 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
        u64 idx = 0;
        int err = 0;
 
-       dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
+       dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
 
        /* search start position */
        if (ctx->pos > 2) {
@@ -234,7 +260,7 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
                spin_unlock(&dentry->d_lock);
 
                if (emit_dentry) {
-                       dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
+                       dout(" %llx dentry %p %pd %p\n", di->offset,
                             dentry, dentry, d_inode(dentry));
                        ctx->pos = di->offset;
                        if (!dir_emit(ctx, dentry->d_name.name,
@@ -269,6 +295,16 @@ out:
        return err;
 }
 
+/*
+ * Tell whether a new readdir request is needed to serve @pos: true when
+ * no reply is buffered in @fi, or when the buffered frag does not match
+ * (frag+name order) or does not contain (hash order) the position.
+ */
+static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
+{
+       if (fi->last_readdir == NULL)
+               return true;
+
+       if (!is_hash_order(pos))
+               return fpos_frag(pos) != fi->frag;
+
+       /* hash order: the buffered frag must contain the hash of @pos */
+       return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
+}
+
 static int ceph_readdir(struct file *file, struct dir_context *ctx)
 {
        struct ceph_file_info *fi = file->private_data;
@@ -276,13 +312,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_mds_client *mdsc = fsc->mdsc;
-       unsigned frag = fpos_frag(ctx->pos);
-       int off = fpos_off(ctx->pos);
+       int i;
        int err;
        u32 ftype;
        struct ceph_mds_reply_info_parsed *rinfo;
 
-       dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off);
+       dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
        if (fi->flags & CEPH_F_ATEND)
                return 0;
 
@@ -294,7 +329,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                            inode->i_mode >> 12))
                        return 0;
                ctx->pos = 1;
-               off = 1;
        }
        if (ctx->pos == 1) {
                ino_t ino = parent_ino(file->f_path.dentry);
@@ -304,7 +338,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                            inode->i_mode >> 12))
                        return 0;
                ctx->pos = 2;
-               off = 2;
        }
 
        /* can we use the dcache? */
@@ -319,8 +352,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                err = __dcache_readdir(file, ctx, shared_gen);
                if (err != -EAGAIN)
                        return err;
-               frag = fpos_frag(ctx->pos);
-               off = fpos_off(ctx->pos);
        } else {
                spin_unlock(&ci->i_ceph_lock);
        }
@@ -328,8 +359,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        /* proceed with a normal readdir */
 more:
        /* do we have the correct frag content buffered? */
-       if (fi->frag != frag || fi->last_readdir == NULL) {
+       if (need_send_readdir(fi, ctx->pos)) {
                struct ceph_mds_request *req;
+               unsigned frag;
                int op = ceph_snap(inode) == CEPH_SNAPDIR ?
                        CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 
@@ -339,6 +371,13 @@ more:
                        fi->last_readdir = NULL;
                }
 
+               if (is_hash_order(ctx->pos)) {
+                       frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
+                                               NULL, NULL);
+               } else {
+                       frag = fpos_frag(ctx->pos);
+               }
+
                dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
                     ceph_vinop(inode), frag, fi->last_name);
                req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
@@ -376,22 +415,26 @@ more:
                        ceph_mdsc_put_request(req);
                        return err;
                }
-               dout("readdir got and parsed readdir result=%d"
-                    " on frag %x, end=%d, complete=%d\n", err, frag,
+               dout("readdir got and parsed readdir result=%d on "
+                    "frag %x, end=%d, complete=%d, hash_order=%d\n",
+                    err, frag,
                     (int)req->r_reply_info.dir_end,
-                    (int)req->r_reply_info.dir_complete);
-
+                    (int)req->r_reply_info.dir_complete,
+                    (int)req->r_reply_info.hash_order);
 
-               /* note next offset and last dentry name */
                rinfo = &req->r_reply_info;
                if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                        frag = le32_to_cpu(rinfo->dir_dir->frag);
-                       off = req->r_readdir_offset;
-                       fi->next_offset = off;
+                       if (!rinfo->hash_order) {
+                               fi->next_offset = req->r_readdir_offset;
+                               /* adjust ctx->pos to beginning of frag */
+                               ctx->pos = ceph_make_fpos(frag,
+                                                         fi->next_offset,
+                                                         false);
+                       }
                }
 
                fi->frag = frag;
-               fi->offset = fi->next_offset;
                fi->last_readdir = req;
 
                if (req->r_did_prepopulate) {
@@ -399,7 +442,8 @@ more:
                        if (fi->readdir_cache_idx < 0) {
                                /* preclude from marking dir ordered */
                                fi->dir_ordered_count = 0;
-                       } else if (ceph_frag_is_leftmost(frag) && off == 2) {
+                       } else if (ceph_frag_is_leftmost(frag) &&
+                                  fi->next_offset == 2) {
                                /* note dir version at start of readdir so
                                 * we can tell if any dentries get dropped */
                                fi->dir_release_count = req->r_dir_release_cnt;
@@ -413,59 +457,87 @@ more:
                        fi->dir_release_count = 0;
                }
 
-               if (req->r_reply_info.dir_end) {
-                       kfree(fi->last_name);
-                       fi->last_name = NULL;
-                       fi->next_offset = 2;
-               } else {
+               /* note next offset and last dentry name */
+               if (rinfo->dir_nr > 0) {
                        struct ceph_mds_reply_dir_entry *rde =
                                        rinfo->dir_entries + (rinfo->dir_nr-1);
+                       unsigned next_offset = req->r_reply_info.dir_end ?
+                                       2 : (fpos_off(rde->offset) + 1);
                        err = note_last_dentry(fi, rde->name, rde->name_len,
-                                      fi->next_offset + rinfo->dir_nr);
+                                              next_offset);
                        if (err)
                                return err;
+               } else if (req->r_reply_info.dir_end) {
+                       fi->next_offset = 2;
+                       /* keep last name */
                }
        }
 
        rinfo = &fi->last_readdir->r_reply_info;
-       dout("readdir frag %x num %d off %d chunkoff %d\n", frag,
-            rinfo->dir_nr, off, fi->offset);
+       dout("readdir frag %x num %d pos %llx chunk first %llx\n",
+            fi->frag, rinfo->dir_nr, ctx->pos,
+            rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
 
-       ctx->pos = ceph_make_fpos(frag, off);
-       while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) {
-               struct ceph_mds_reply_dir_entry *rde =
-                       rinfo->dir_entries + (off - fi->offset);
+       i = 0;
+       /* search start position */
+       if (rinfo->dir_nr > 0) {
+               int step, nr = rinfo->dir_nr;
+               while (nr > 0) {
+                       step = nr >> 1;
+                       if (rinfo->dir_entries[i + step].offset < ctx->pos) {
+                               i +=  step + 1;
+                               nr -= step + 1;
+                       } else {
+                               nr = step;
+                       }
+               }
+       }
+       for (; i < rinfo->dir_nr; i++) {
+               struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
                struct ceph_vino vino;
                ino_t ino;
 
-               dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n",
-                    off, off - fi->offset, rinfo->dir_nr, ctx->pos,
+               BUG_ON(rde->offset < ctx->pos);
+
+               ctx->pos = rde->offset;
+               dout("readdir (%d/%d) -> %llx '%.*s' %p\n",
+                    i, rinfo->dir_nr, ctx->pos,
                     rde->name_len, rde->name, &rde->inode.in);
+
                BUG_ON(!rde->inode.in);
                ftype = le32_to_cpu(rde->inode.in->mode) >> 12;
                vino.ino = le64_to_cpu(rde->inode.in->ino);
                vino.snap = le64_to_cpu(rde->inode.in->snapid);
                ino = ceph_vino_to_ino(vino);
+
                if (!dir_emit(ctx, rde->name, rde->name_len,
                              ceph_translate_ino(inode->i_sb, ino), ftype)) {
                        dout("filldir stopping us...\n");
                        return 0;
                }
-               off++;
                ctx->pos++;
        }
 
-       if (fi->last_name) {
+       if (fi->next_offset > 2) {
                ceph_mdsc_put_request(fi->last_readdir);
                fi->last_readdir = NULL;
                goto more;
        }
 
        /* more frags? */
-       if (!ceph_frag_is_rightmost(frag)) {
-               frag = ceph_frag_next(frag);
-               off = 2;
-               ctx->pos = ceph_make_fpos(frag, off);
+       if (!ceph_frag_is_rightmost(fi->frag)) {
+               unsigned frag = ceph_frag_next(fi->frag);
+               if (is_hash_order(ctx->pos)) {
+                       loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
+                                                       fi->next_offset, true);
+                       if (new_pos > ctx->pos)
+                               ctx->pos = new_pos;
+                       /* keep last_name */
+               } else {
+                       ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
+                       kfree(fi->last_name);
+                       fi->last_name = NULL;
+               }
                dout("readdir next frag is %x\n", frag);
                goto more;
        }
@@ -497,7 +569,7 @@ more:
        return 0;
 }
 
-static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
+static void reset_readdir(struct ceph_file_info *fi)
 {
        if (fi->last_readdir) {
                ceph_mdsc_put_request(fi->last_readdir);
@@ -511,11 +583,34 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
        fi->flags &= ~CEPH_F_ATEND;
 }
 
+/*
+ * Decide whether a seek to @new_pos must discard the buffered readdir
+ * content.  Returns true on seekdir(0), on a seek to a different frag
+ * (frag+name order only), when no non-empty reply is buffered, on a seek
+ * prior to the current chunk, or when @new_pos and the buffered chunk use
+ * different offset encodings (hash order vs. frag+name order).
+ *
+ * This is a pure predicate: @fi is only read, never modified.
+ */
+static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
+{
+       struct ceph_mds_reply_info_parsed *rinfo;
+       loff_t chunk_offset;
+
+       if (new_pos == 0)
+               return true;
+       if (is_hash_order(new_pos)) {
+               /* no need to reset last_name for a forward seek when
+                * dentries are sorted in hash order */
+       } else if (fi->frag != fpos_frag(new_pos)) {
+               /* plain comparison: fi->frag must not be modified here */
+               return true;
+       }
+       rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
+       if (!rinfo || !rinfo->dir_nr)
+               return true;
+       /* offset of the first entry in the buffered chunk */
+       chunk_offset = rinfo->dir_entries[0].offset;
+       return new_pos < chunk_offset ||
+              is_hash_order(new_pos) != is_hash_order(chunk_offset);
+}
+
 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 {
        struct ceph_file_info *fi = file->private_data;
        struct inode *inode = file->f_mapping->host;
-       loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);
        loff_t retval;
 
        inode_lock(inode);
@@ -532,25 +627,22 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
        }
 
        if (offset >= 0) {
+               if (need_reset_readdir(fi, offset)) {
+                       dout("dir_llseek dropping %p content\n", file);
+                       reset_readdir(fi);
+               } else if (is_hash_order(offset) && offset > file->f_pos) {
+                       /* for hash offset, we don't know if a forward seek
+                        * is within same frag */
+                       fi->dir_release_count = 0;
+                       fi->readdir_cache_idx = -1;
+               }
+
                if (offset != file->f_pos) {
                        file->f_pos = offset;
                        file->f_version = 0;
                        fi->flags &= ~CEPH_F_ATEND;
                }
                retval = offset;
-
-               if (offset == 0 ||
-                   fpos_frag(offset) != fi->frag ||
-                   fpos_off(offset) < fi->offset) {
-                       /* discard buffered readdir content on seekdir(0), or
-                        * seek to new frag, or seek prior to current chunk */
-                       dout("dir_llseek dropping %p content\n", file);
-                       reset_readdir(fi, fpos_frag(offset));
-               } else if (fpos_cmp(offset, old_offset) > 0) {
-                       /* reset dir_release_count if we did a forward seek */
-                       fi->dir_release_count = 0;
-                       fi->readdir_cache_idx = -1;
-               }
        }
 out:
        inode_unlock(inode);