]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/PrimaryLogPG.cc
update sources to v12.1.0
[ceph.git] / ceph / src / osd / PrimaryLogPG.cc
index 8c4f08c7090dd738ee3fcbbe61684e8593814e77..b50624963a5c30fe6b3f297d4c2d8dc7da5e4008 100644 (file)
@@ -1877,15 +1877,15 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     }
   }
 
-  if (op->includes_pg_op()) {
-    return do_pg_op(op);
-  }
-
   if (!op_has_sufficient_caps(op)) {
     osd->reply_op_error(op, -EPERM);
     return;
   }
 
+  if (op->includes_pg_op()) {
+    return do_pg_op(op);
+  }
+
   // object name too long?
   if (m->get_oid().name.size() > cct->_conf->osd_max_object_name_len) {
     dout(4) << "do_op name is longer than "
@@ -2150,6 +2150,13 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
       return;
   }
 
+  if (obc.get() && obc->obs.exists && obc->obs.oi.has_manifest()) {
+    if (maybe_handle_manifest(op,
+                              write_ordered,
+                              obc))
+    return;
+  }
+
   if (maybe_handle_cache(op,
                         write_ordered,
                         obc,
@@ -2168,7 +2175,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     }
     dout(20) << __func__ << "find_object_context got error " << r << dendl;
     if (op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       record_write_error(op, oid, nullptr, r);
     } else {
       osd->reply_op_error(op, r);
@@ -2245,7 +2252,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     dout(20) << __func__ << " returned an error: " << r << dendl;
     close_op_ctx(ctx);
     if (op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       record_write_error(op, oid, nullptr, r);
     } else {
       osd->reply_op_error(op, r);
@@ -2296,6 +2303,53 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
   // force recovery of the oldest missing object if too many logs
   maybe_force_recovery();
 }
+PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
+  OpRequestRef op,
+  bool write_ordered,
+  ObjectContextRef obc)
+{
+  if (static_cast<const MOSDOp *>(op->get_req())->get_flags() &
+      CEPH_OSD_FLAG_IGNORE_REDIRECT) {
+    dout(20) << __func__ << ": ignoring redirect due to flag" << dendl;
+    return cache_result_t::NOOP;
+  }
+
+  if (obc)
+    dout(10) << __func__ << " " << obc->obs.oi << " "
+       << (obc->obs.exists ? "exists" : "DNE")
+       << dendl;
+
+  // if it is write-ordered and blocked, stop now
+  if (obc.get() && obc->is_blocked() && write_ordered) {
+    // we're already doing something with this object
+    dout(20) << __func__ << " blocked on " << obc->obs.oi.soid << dendl;
+    return cache_result_t::NOOP;
+  }
+
+  vector<OSDOp> ops = static_cast<const MOSDOp*>(op->get_req())->ops;
+  for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p) {
+    OSDOp& osd_op = *p;
+    ceph_osd_op& op = osd_op.op;
+    if (op.op == CEPH_OSD_OP_SET_REDIRECT) {
+      return cache_result_t::NOOP;
+    }
+  }
+
+  switch (obc->obs.oi.manifest.type) {
+  case object_manifest_t::TYPE_REDIRECT:
+    if (op->may_write() || write_ordered) {
+      do_proxy_write(op, obc->obs.oi.soid, obc);
+    } else {
+      do_proxy_read(op, obc);
+    }
+    return cache_result_t::HANDLED_PROXY;
+  case object_manifest_t::TYPE_CHUNKED:
+  default:
+    assert(0 == "unrecognized manifest type");
+  }
+
+  return cache_result_t::NOOP;
+}
 
 void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
                                      MOSDOpReply *orig_reply, int r)
@@ -2304,7 +2358,7 @@ void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
   assert(op->may_write());
   const osd_reqid_t &reqid = static_cast<const MOSDOp*>(op->get_req())->get_reqid();
   ObjectContextRef obc;
-  mempool::osd::list<pg_log_entry_t> entries;
+  mempool::osd_pglog::list<pg_log_entry_t> entries;
   entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR, soid,
                                   get_next_version(), eversion_t(), 0,
                                   reqid, utime_t(), r));
@@ -2657,15 +2711,30 @@ struct C_ProxyRead : public Context {
   }
 };
 
-void PrimaryLogPG::do_proxy_read(OpRequestRef op)
+void PrimaryLogPG::do_proxy_read(OpRequestRef op, ObjectContextRef obc)
 {
   // NOTE: non-const here because the ProxyReadOp needs mutable refs to
   // stash the result in the request's OSDOp vector
   MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
-  object_locator_t oloc(m->get_object_locator());
-  oloc.pool = pool.info.tier_of;
-
-  const hobject_t& soid = m->get_hobj();
+  object_locator_t oloc;
+  hobject_t soid;
+  /* extensible tier */
+  if (obc && obc->obs.exists && obc->obs.oi.has_manifest()) {
+    switch (obc->obs.oi.manifest.type) {
+      case object_manifest_t::TYPE_REDIRECT:
+         oloc = object_locator_t(obc->obs.oi.manifest.redirect_target);
+         soid = obc->obs.oi.manifest.redirect_target;  
+         break;
+      case object_manifest_t::TYPE_CHUNKED:
+      default:
+       assert(0 == "unrecognized manifest type");
+    }
+  } else {
+  /* proxy */
+    soid = m->get_hobj();
+    oloc = object_locator_t(m->get_object_locator());
+    oloc.pool = pool.info.tier_of;
+  }
   unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
 
   // pass through some original flags that make sense.
@@ -2849,16 +2918,35 @@ struct C_ProxyWrite_Commit : public Context {
   }
 };
 
-void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid)
+void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc)
 {
   // NOTE: non-const because ProxyWriteOp takes a mutable ref
   MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
-  object_locator_t oloc(m->get_object_locator());
-  oloc.pool = pool.info.tier_of;
+  object_locator_t oloc;
   SnapContext snapc(m->get_snap_seq(), m->get_snaps());
+  hobject_t soid;
+  /* extensible tier */
+  if (obc && obc->obs.exists && obc->obs.oi.has_manifest()) {
+    switch (obc->obs.oi.manifest.type) {
+      case object_manifest_t::TYPE_REDIRECT:
+         oloc = object_locator_t(obc->obs.oi.manifest.redirect_target);
+         soid = obc->obs.oi.manifest.redirect_target;  
+         break;
+      case object_manifest_t::TYPE_CHUNKED:
+      default:
+       assert(0 == "unrecognized manifest type");
+    }
+  } else {
+  /* proxy */
+    soid = m->get_hobj();
+    oloc = object_locator_t(m->get_object_locator());
+    oloc.pool = pool.info.tier_of;
+  }
 
-  const hobject_t& soid = m->get_hobj();
   unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
+  if (!(op->may_write() || op->may_cache())) {
+    flags |= CEPH_OSD_FLAG_RWORDERED;
+  }
   dout(10) << __func__ << " Start proxy write for " << *m << dendl;
 
   ProxyWriteOpRef pwop(std::make_shared<ProxyWriteOp>(op, soid, m->ops, m->get_reqid()));
@@ -3546,7 +3634,7 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
   SnapSet& snapset = obc->ssc->snapset;
 
   bool legacy = snapset.is_legacy() ||
-    !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+    get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
 
   object_info_t &coi = obc->obs.oi;
   set<snapid_t> old_snaps;
@@ -3679,6 +3767,9 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
       coid,
       old_snaps,
       new_snaps);
+
+    coi = object_info_t(coid);
+
     ctx->at_version.version++;
   } else {
     // save adjusted snaps for this object
@@ -3747,20 +3838,19 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
       ctx->delta_stats.num_objects--;
       if (oi.is_dirty()) {
        ctx->delta_stats.num_objects_dirty--;
-       oi.clear_flag(object_info_t::FLAG_DIRTY);
       }
       if (oi.is_omap())
        ctx->delta_stats.num_objects_omap--;
       if (oi.is_whiteout()) {
        dout(20) << __func__ << " trimming whiteout on " << oi.soid << dendl;
        ctx->delta_stats.num_whiteouts--;
-       oi.clear_flag(object_info_t::FLAG_WHITEOUT);
       }
-      if (oi.is_cache_pinned())
+      if (oi.is_cache_pinned()) {
        ctx->delta_stats.num_objects_pinned--;
+      }
     }
     ctx->snapset_obc->obs.exists = false;
-    
+    ctx->snapset_obc->obs.oi = object_info_t(snapoid);
     t->remove(snapoid);
   } else {
     dout(10) << coid << " filtering snapset on " << snapoid << dendl;
@@ -4576,6 +4666,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
     case CEPH_OSD_OP_COPY_FROM:  // we handle user_version update explicitly
     case CEPH_OSD_OP_CACHE_PIN:
     case CEPH_OSD_OP_CACHE_UNPIN:
+    case CEPH_OSD_OP_SET_REDIRECT:
       break;
     default:
       if (op.op & CEPH_OSD_OP_MODE_WR)
@@ -5831,6 +5922,75 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
 
+    case CEPH_OSD_OP_SET_REDIRECT:
+      ++ctx->num_write;
+      {
+       if (pool.info.is_tier()) {
+         result = -EINVAL;
+         break;
+       }
+       if (!obs.exists) {
+         result = -ENOENT;
+         break;
+       }
+       if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+         result = -EOPNOTSUPP;
+         break;
+       }
+
+       object_t target_name;
+       object_locator_t target_oloc;
+       snapid_t target_snapid = (uint64_t)op.copy_from.snapid;
+       version_t target_version = op.copy_from.src_version;
+       try {
+         ::decode(target_name, bp);
+         ::decode(target_oloc, bp);
+       }
+       catch (buffer::error& e) {
+         result = -EINVAL;
+         goto fail;
+       }
+       pg_t raw_pg;
+       get_osdmap()->object_locator_to_pg(target_name, target_oloc, raw_pg);
+       hobject_t target(target_name, target_oloc.key, target_snapid,
+               raw_pg.ps(), raw_pg.pool(),
+               target_oloc.nspace);
+       if (target == soid) {
+         dout(20) << " set-redirect self is invalid" << dendl;
+         result = -EINVAL;
+         break;
+       }
+        oi.set_flag(object_info_t::FLAG_MANIFEST);
+       oi.manifest.redirect_target = target;
+       oi.manifest.type = object_manifest_t::TYPE_REDIRECT;
+       t->truncate(soid, 0);
+       if (oi.is_omap() && pool.info.supports_omap()) {
+         t->omap_clear(soid);
+         obs.oi.clear_omap_digest();
+         obs.oi.clear_flag(object_info_t::FLAG_OMAP);
+       }
+       ctx->delta_stats.num_bytes -= oi.size;
+       oi.size = 0;
+       oi.new_object();
+       oi.user_version = target_version;
+       ctx->user_at_version = target_version;
+       /* rm_attrs */
+       map<string,bufferlist> rmattrs;
+       result = getattrs_maybe_cache(ctx->obc,
+                   &rmattrs,
+                   true);
+       if (result < 0) {
+         return result;
+       }
+       map<string, bufferlist>::iterator iter;
+       for (iter = rmattrs.begin(); iter != rmattrs.end(); ++iter) {
+         const string& name = iter->first;
+         t->rmattr(soid, name);
+       }
+       dout(10) << "set-redirect oid:" << oi.soid << " user_version: " << oi.user_version << dendl;
+      }
+
+      break;
 
       // -- object attrs --
       
@@ -6476,7 +6636,7 @@ inline int PrimaryLogPG::_delete_oid(
     whiteout = true;
   }
   bool legacy;
-  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
     legacy = false;
     // in luminous or later, we can't delete the head if there are
     // clones. we trust the caller passing no_whiteout has already
@@ -6586,16 +6746,26 @@ int PrimaryLogPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
   }
   {
     ObjectContextRef promote_obc;
-    switch (
-      maybe_handle_cache_detail(
-       ctx->op,
-       true,
-       rollback_to,
-       ret,
-       missing_oid,
-       true,
-       false,
-       &promote_obc)) {
+    cache_result_t tier_mode_result;
+    if (obs.exists && obs.oi.has_manifest()) { 
+      tier_mode_result = 
+       maybe_handle_manifest_detail(
+         ctx->op,
+         true,
+         rollback_to);
+    } else {
+      tier_mode_result = 
+       maybe_handle_cache_detail(
+         ctx->op,
+         true,
+         rollback_to,
+         ret,
+         missing_oid,
+         true,
+         false,
+         &promote_obc);
+    }
+    switch (tier_mode_result) {
     case cache_result_t::NOOP:
       break;
     case cache_result_t::BLOCKED_PROMOTE:
@@ -6807,7 +6977,7 @@ void PrimaryLogPG::make_writeable(OpContext *ctx)
     snap_oi->copy_user_bits(ctx->obs->oi);
 
     bool legacy = ctx->new_snapset.is_legacy() ||
-      !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+      get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
     if (legacy) {
       snap_oi->legacy_snaps = snaps;
     }
@@ -6868,7 +7038,7 @@ void PrimaryLogPG::make_writeable(OpContext *ctx)
   // update snapset with latest snap context
   ctx->new_snapset.seq = snapc.seq;
   ctx->new_snapset.snaps = snapc.snaps;
-  if (!get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+  if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
     // pessimistic assumption that this is a net-new legacy SnapSet
     ctx->delta_stats.num_legacy_snapsets++;
     ctx->new_snapset.head_exists = ctx->new_obs.exists;
@@ -7056,7 +7226,7 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
   int result = do_osd_ops(ctx, ctx->ops);
   if (result < 0) {
     if (ctx->op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       // need to save the error code in the pg log, to detect dup ops,
       // but do nothing else
       ctx->update_log_only = true;
@@ -7068,7 +7238,7 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
   if (ctx->op_t->empty() && !ctx->modify) {
     unstable_stats.add(ctx->delta_stats);
     if (ctx->op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       ctx->update_log_only = true;
     }
     return result;
@@ -7147,7 +7317,7 @@ void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
                        info.pgid.pool(), soid.get_namespace());
       dout(10) << " final snapset " << ctx->new_snapset
               << " in " << snapoid << dendl;
-      assert(!get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS));
+      assert(get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS);
       ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, snapoid,
                                        ctx->at_version,
                                        eversion_t(),
@@ -7235,7 +7405,7 @@ void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
   }
 
   bool legacy_snapset = ctx->new_snapset.is_legacy() ||
-    !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+    get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
 
   // append to log
   ctx->log.push_back(pg_log_entry_t(log_op_type, soid, ctx->at_version,
@@ -8147,7 +8317,7 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
   tctx->extra_reqids = results->reqids;
 
   bool legacy_snapset = tctx->new_snapset.is_legacy() ||
-    !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+    get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
 
   if (whiteout) {
     // create a whiteout
@@ -8200,7 +8370,7 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
     assert(tctx->new_obs.oi.soid.snap == CEPH_NOSNAP);
     tctx->new_snapset.from_snap_set(
       results->snapset,
-      !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS));
+      get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS);
   }
   tctx->new_snapset.head_exists = true;
   dout(20) << __func__ << " new_snapset " << tctx->new_snapset << dendl;
@@ -8785,7 +8955,12 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
   if (is_primary()) {
     if (scrubber.active) {
       if (last_update_applied == scrubber.subset_last_update) {
-        requeue_scrub();
+        if (ops_blocked_by_scrub()) {
+          requeue_scrub(true);
+        } else {
+          requeue_scrub(false);
+        }
+
       }
     } else {
       assert(scrubber.start == scrubber.end);
@@ -9045,7 +9220,7 @@ void PrimaryLogPG::simple_opc_submit(OpContextUPtr ctx)
 
 
 void PrimaryLogPG::submit_log_entries(
-  const mempool::osd::list<pg_log_entry_t> &entries,
+  const mempool::osd_pglog::list<pg_log_entry_t> &entries,
   ObcLockManager &&manager,
   boost::optional<std::function<void(void)> > &&_on_complete,
   OpRequestRef op,
@@ -9062,7 +9237,7 @@ void PrimaryLogPG::submit_log_entries(
 
   boost::intrusive_ptr<RepGather> repop;
   boost::optional<std::function<void(void)> > on_complete;
-  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
     repop = new_repop(
       version,
       r,
@@ -9088,7 +9263,7 @@ void PrimaryLogPG::submit_log_entries(
        if (peer == pg_whoami) continue;
        assert(peer_missing.count(peer));
        assert(peer_info.count(peer));
-       if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+       if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
          assert(repop);
          MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
            entries,
@@ -9112,7 +9287,7 @@ void PrimaryLogPG::submit_log_entries(
            peer.osd, m, get_osdmap()->get_epoch());
        }
       }
-      if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+      if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
        ceph_tid_t rep_tid = repop->rep_tid;
        waiting_on.insert(pg_whoami);
        log_entry_update_waiting_on.insert(
@@ -10016,7 +10191,11 @@ void PrimaryLogPG::_applied_recovered_object(ObjectContextRef obc)
   // requeue an active chunky scrub waiting on recovery ops
   if (!deleting && active_pushes == 0
       && scrubber.is_chunky_scrub_active()) {
-    requeue_scrub();
+    if (ops_blocked_by_scrub()) {
+      requeue_scrub(true);
+    } else {
+      requeue_scrub(false);
+    }
   }
 
   unlock();
@@ -10149,7 +10328,7 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
       unlock();
     });
 
-  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
     t.register_on_commit(complete);
   } else {
     /* Hack to work around the fact that ReplicatedBackend sends
@@ -10215,7 +10394,7 @@ void PrimaryLogPG::mark_all_unfound_lost(
   pg_log.get_log().print(*_dout);
   *_dout << dendl;
 
-  mempool::osd::list<pg_log_entry_t> log_entries;
+  mempool::osd_pglog::list<pg_log_entry_t> log_entries;
 
   utime_t mtime = ceph_clock_now();
   map<hobject_t, pg_missing_item>::const_iterator m =
@@ -10265,7 +10444,7 @@ void PrimaryLogPG::mark_all_unfound_lost(
       {
        pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, v, m->second.need,
                         0, osd_reqid_t(), mtime, 0);
-       if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+       if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
          if (pool.info.require_rollback()) {
            e.mod_desc.try_rmobject(v.version);
          } else {
@@ -10444,6 +10623,10 @@ void PrimaryLogPG::on_shutdown()
   cancel_proxy_ops(false);
   apply_and_flush_repops(false);
   cancel_log_updates();
+  // we must remove PGRefs, so do this this prior to release_backoffs() callers
+  clear_backoffs(); 
+  // clean up snap trim references
+  snap_trimmer_machine.process_event(Reset());
 
   pgbackend->on_change();
 
@@ -13329,7 +13512,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
          head_error.set_head_mismatch();
        }
 
-       if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+       if (get_osdmap()->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
          if (soid.is_snapdir()) {
            dout(10) << " will move snapset to head from " << soid << dendl;
            snapset_to_repair[soid.get_head()] = *snapset;