]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/os/alienstore/alien_store.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / os / alienstore / alien_store.cc
index cb55532545b47cdc90bb992cae5b962514dbd0cd..904540a4c44ae512e1cd7e0f4733f587268a8307 100644 (file)
 #include "common/ceph_context.h"
 #include "global/global_context.h"
 #include "include/Context.h"
-#include "os/bluestore/BlueStore.h"
 #include "os/ObjectStore.h"
 #include "os/Transaction.h"
 
+#include "crimson/common/config_proxy.h"
 #include "crimson/common/log.h"
 #include "crimson/os/futurized_store.h"
 
+using std::map;
+using std::set;
+using std::string;
+
 namespace {
-  seastar::logger& logger()
-  {
-    return crimson::get_logger(ceph_subsys_filestore);
-  }
+
+seastar::logger& logger()
+{
+  return crimson::get_logger(ceph_subsys_alienstore);
+}
 
 class OnCommit final: public Context
 {
-  int cpuid;
+  const int cpuid;
   Context *oncommit;
+  seastar::alien::instance &alien;
   seastar::promise<> &alien_done;
 public:
   OnCommit(
     int id,
     seastar::promise<> &done,
     Context *oncommit,
+    seastar::alien::instance &alien,
     ceph::os::Transaction& txn)
-    : cpuid(id), oncommit(oncommit),
-      alien_done(done) {}
+    : cpuid(id),
+      oncommit(oncommit),
+      alien(alien),
+      alien_done(done) {
+  }
 
   void finish(int) final {
-    return seastar::alien::submit_to(cpuid, [this] {
-      if (oncommit) oncommit->complete(0);
+    return seastar::alien::submit_to(alien, cpuid, [this] {
+      if (oncommit) {
+        oncommit->complete(0);
+      }
       alien_done.set_value();
       return seastar::make_ready_future<>();
     }).wait();
@@ -56,34 +68,56 @@ public:
 
 namespace crimson::os {
 
-AlienStore::AlienStore(const std::string& path, const ConfigValues& values)
-  : path{path}
+using crimson::common::get_conf;
+
+AlienStore::AlienStore(const std::string& type,
+                       const std::string& path,
+                       const ConfigValues& values)
+  : type(type),
+    path{path}
 {
   cct = std::make_unique<CephContext>(CEPH_ENTITY_TYPE_OSD);
   g_ceph_context = cct.get();
   cct->_conf.set_config_values(values);
-  store = std::make_unique<BlueStore>(cct.get(), path);
-
-  long cpu_id = 0;
-  if (long nr_cpus = sysconf(_SC_NPROCESSORS_ONLN); nr_cpus != -1) {
-    cpu_id = nr_cpus - 1;
-  } else {
-    logger().error("{}: unable to get nproc: {}", __func__, errno);
-    cpu_id = -1;
-  }
-  tp = std::make_unique<crimson::os::ThreadPool>(1, 128, cpu_id);
 }
 
 seastar::future<> AlienStore::start()
 {
+  store = ObjectStore::create(cct.get(), type, path);
+  if (!store) {
+    ceph_abort_msgf("unsupported objectstore type: %s", type.c_str());
+  }
+  std::vector<uint64_t> cpu_cores = _parse_cpu_cores();
+  // cores except the first "N_CORES_FOR_SEASTAR" ones will
+  // be used for alien threads scheduling:
+  //   [0, N_CORES_FOR_SEASTAR) are reserved for seastar reactors
+  //   [N_CORES_FOR_SEASTAR, ..] are assigned to alien threads.
+  if (cpu_cores.empty()) {
+    if (long nr_cpus = sysconf(_SC_NPROCESSORS_ONLN);
+       nr_cpus > N_CORES_FOR_SEASTAR ) {
+      for (int i = N_CORES_FOR_SEASTAR; i < nr_cpus; i++) {
+        cpu_cores.push_back(i);
+      }
+    } else {
+      logger().error("{}: unable to get nproc: {}", __func__, errno);
+    }
+  }
+  const auto num_threads =
+    get_conf<uint64_t>("crimson_alien_op_num_threads");
+  tp = std::make_unique<crimson::os::ThreadPool>(num_threads, 128, cpu_cores);
   return tp->start();
 }
 
 seastar::future<> AlienStore::stop()
 {
+  if (!tp) {
+    // not really started yet
+    return seastar::now();
+  }
   return tp->submit([this] {
-    for (auto [cid, ch]: coll_map)
+    for (auto [cid, ch]: coll_map) {
       static_cast<AlienCollection*>(ch.get())->collection.reset();
+    }
     store.reset();
   }).then([this] {
     return tp->stop();
@@ -92,21 +126,30 @@ seastar::future<> AlienStore::stop()
 
 AlienStore::~AlienStore() = default;
 
-seastar::future<> AlienStore::mount()
+AlienStore::mount_ertr::future<> AlienStore::mount()
 {
   logger().debug("{}", __func__);
+  assert(tp);
   return tp->submit([this] {
     return store->mount();
-  }).then([] (int r) {
-    assert(r == 0);
-    return seastar::now();
+  }).then([] (const int r) -> mount_ertr::future<> {
+    if (r != 0) {
+      return crimson::stateful_ec{
+        std::error_code(-r, std::generic_category()) };
+    } else {
+      return mount_ertr::now();
+    }
   });
 }
 
 seastar::future<> AlienStore::umount()
 {
   logger().info("{}", __func__);
-  return transaction_gate.close().then([this] {
+  if (!tp) {
+    // not really started yet
+    return seastar::now();
+  }
+  return op_gate.close().then([this] {
     return tp->submit([this] {
       return store->umount();
     });
@@ -116,15 +159,20 @@ seastar::future<> AlienStore::umount()
   });
 }
 
-seastar::future<> AlienStore::mkfs(uuid_d osd_fsid)
+AlienStore::mkfs_ertr::future<> AlienStore::mkfs(uuid_d osd_fsid)
 {
   logger().debug("{}", __func__);
   store->set_fsid(osd_fsid);
+  assert(tp);
   return tp->submit([this] {
     return store->mkfs();
-  }).then([] (int r) {
-    assert(r == 0);
-    return seastar::now();
+  }).then([] (int r) -> mkfs_ertr::future<> {
+    if (r != 0) {
+      return crimson::stateful_ec{
+        std::error_code(-r, std::generic_category()) };
+    } else {
+      return mkfs_ertr::now();
+    }
   });
 }
 
@@ -135,18 +183,21 @@ AlienStore::list_objects(CollectionRef ch,
                         uint64_t limit) const
 {
   logger().debug("{}", __func__);
-  return seastar::do_with(std::vector<ghobject_t>(), ghobject_t(),
-                          [=] (auto &objects, auto &next) {
+  assert(tp);
+  return do_with_op_gate(std::vector<ghobject_t>(), ghobject_t(),
+                         [=] (auto &objects, auto &next) {
     objects.reserve(limit);
-    return tp->submit([=, &objects, &next] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+      [=, &objects, &next] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->collection_list(c->collection, start, end,
                                     store->get_ideal_list_max(),
                                     &objects, &next);
     }).then([&objects, &next] (int r) {
       assert(r == 0);
-      return seastar::make_ready_future<std::tuple<std::vector<ghobject_t>, ghobject_t>>(
-       std::make_tuple(std::move(objects), std::move(next)));
+      return seastar::make_ready_future<
+       std::tuple<std::vector<ghobject_t>, ghobject_t>>(
+         std::move(objects), std::move(next));
     });
   });
 }
@@ -154,6 +205,7 @@ AlienStore::list_objects(CollectionRef ch,
 seastar::future<CollectionRef> AlienStore::create_new_collection(const coll_t& cid)
 {
   logger().debug("{}", __func__);
+  assert(tp);
   return tp->submit([this, cid] {
     return store->create_new_collection(cid);
   }).then([this, cid] (ObjectStore::CollectionHandle c) {
@@ -177,9 +229,13 @@ seastar::future<CollectionRef> AlienStore::create_new_collection(const coll_t& c
 seastar::future<CollectionRef> AlienStore::open_collection(const coll_t& cid)
 {
   logger().debug("{}", __func__);
+  assert(tp);
   return tp->submit([this, cid] {
     return store->open_collection(cid);
   }).then([this] (ObjectStore::CollectionHandle c) {
+    if (!c) {
+      return seastar::make_ready_future<CollectionRef>();
+    }
     CollectionRef ch;
     auto cp = coll_map.find(c->cid);
     if (cp == coll_map.end()){
@@ -199,8 +255,9 @@ seastar::future<CollectionRef> AlienStore::open_collection(const coll_t& cid)
 seastar::future<std::vector<coll_t>> AlienStore::list_collections()
 {
   logger().debug("{}", __func__);
+  assert(tp);
 
-  return seastar::do_with(std::vector<coll_t>{}, [=] (auto &ls) {
+  return do_with_op_gate(std::vector<coll_t>{}, [=] (auto &ls) {
     return tp->submit([this, &ls] {
       return store->list_collections(ls);
     }).then([&ls] (int r) {
@@ -218,8 +275,9 @@ AlienStore::read(CollectionRef ch,
                  uint32_t op_flags)
 {
   logger().debug("{}", __func__);
-  return seastar::do_with(ceph::bufferlist{}, [=] (auto &bl) {
-    return tp->submit([=, &bl] {
+  assert(tp);
+  return do_with_op_gate(ceph::bufferlist{}, [=] (auto &bl) {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &bl] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->read(c->collection, oid, offset, len, bl, op_flags);
     }).then([&bl] (int r) -> read_errorator::future<ceph::bufferlist> {
@@ -228,7 +286,8 @@ AlienStore::read(CollectionRef ch,
       } else if (r == -EIO) {
         return crimson::ct_error::input_output_error::make();
       } else {
-        return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
+        return read_errorator::make_ready_future<ceph::bufferlist>(
+          std::move(bl));
       }
     });
   });
@@ -241,9 +300,11 @@ AlienStore::readv(CollectionRef ch,
                  uint32_t op_flags)
 {
   logger().debug("{}", __func__);
-  return seastar::do_with(ceph::bufferlist{},
+  assert(tp);
+  return do_with_op_gate(ceph::bufferlist{},
     [this, ch, oid, &m, op_flags](auto& bl) {
-    return tp->submit([this, ch, oid, &m, op_flags, &bl] {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+      [this, ch, oid, &m, op_flags, &bl] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->readv(c->collection, oid, m, bl, op_flags);
     }).then([&bl](int r) -> read_errorator::future<ceph::bufferlist> {
@@ -252,30 +313,36 @@ AlienStore::readv(CollectionRef ch,
       } else if (r == -EIO) {
         return crimson::ct_error::input_output_error::make();
       } else {
-        return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
+        return read_errorator::make_ready_future<ceph::bufferlist>(
+         std::move(bl));
       }
     });
   });
 }
 
-AlienStore::get_attr_errorator::future<ceph::bufferptr>
+AlienStore::get_attr_errorator::future<ceph::bufferlist>
 AlienStore::get_attr(CollectionRef ch,
                      const ghobject_t& oid,
                      std::string_view name) const
 {
   logger().debug("{}", __func__);
-  return seastar::do_with(ceph::bufferptr{}, [=] (auto &value) {
-    return tp->submit([=, &value] {
-      auto c =static_cast<AlienCollection*>(ch.get());
-      return store->getattr(c->collection, oid,
-                           static_cast<std::string>(name).c_str(), value);
-    }).then([oid, &value] (int r) -> get_attr_errorator::future<ceph::bufferptr> {
+  assert(tp);
+  return do_with_op_gate(ceph::bufferlist{}, std::string{name},
+                         [=] (auto &value, const auto& name) {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &value, &name] {
+      // XXX: `name` isn't a `std::string_view` anymore! it had to be converted
+      // to `std::string` for the sake of extending life-time not only of
+      // a _ptr-to-data_ but _data_ as well. Otherwise we would run into a use-
+      // after-free issue.
+      auto c = static_cast<AlienCollection*>(ch.get());
+      return store->getattr(c->collection, oid, name.c_str(), value);
+    }).then([oid, &value] (int r) -> get_attr_errorator::future<ceph::bufferlist> {
       if (r == -ENOENT) {
         return crimson::ct_error::enoent::make();
       } else if (r == -ENODATA) {
         return crimson::ct_error::enodata::make();
       } else {
-        return get_attr_errorator::make_ready_future<ceph::bufferptr>(
+        return get_attr_errorator::make_ready_future<ceph::bufferlist>(
           std::move(value));
       }
     });
@@ -287,11 +354,12 @@ AlienStore::get_attrs(CollectionRef ch,
                       const ghobject_t& oid)
 {
   logger().debug("{}", __func__);
-  return seastar::do_with(attrs_t{}, [=] (auto &aset) {
-    return tp->submit([=, &aset] {
+  assert(tp);
+  return do_with_op_gate(attrs_t{}, [=] (auto &aset) {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &aset] {
       auto c = static_cast<AlienCollection*>(ch.get());
-      return store->getattrs(c->collection, oid,
-                            reinterpret_cast<map<string,bufferptr>&>(aset));
+      const auto r = store->getattrs(c->collection, oid, aset);
+      return r;
     }).then([&aset] (int r) -> get_attrs_ertr::future<attrs_t> {
       if (r == -ENOENT) {
         return crimson::ct_error::enoent::make();
@@ -308,8 +376,9 @@ auto AlienStore::omap_get_values(CollectionRef ch,
   -> read_errorator::future<omap_values_t>
 {
   logger().debug("{}", __func__);
-  return seastar::do_with(omap_values_t{}, [=] (auto &values) {
-    return tp->submit([=, &values] {
+  assert(tp);
+  return do_with_op_gate(omap_values_t{}, [=] (auto &values) {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &values] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->omap_get_values(c->collection, oid, keys,
                                    reinterpret_cast<map<string, bufferlist>*>(&values));
@@ -318,7 +387,8 @@ auto AlienStore::omap_get_values(CollectionRef ch,
         return crimson::ct_error::enoent::make();
       } else {
         assert(r == 0);
-        return read_errorator::make_ready_future<omap_values_t>(std::move(values));
+        return read_errorator::make_ready_future<omap_values_t>(
+         std::move(values));
       }
     });
   });
@@ -330,8 +400,9 @@ auto AlienStore::omap_get_values(CollectionRef ch,
   -> read_errorator::future<std::tuple<bool, omap_values_t>>
 {
   logger().debug("{} with_start", __func__);
-  return seastar::do_with(omap_values_t{}, [=] (auto &values) {
-    return tp->submit([=, &values] {
+  assert(tp);
+  return do_with_op_gate(omap_values_t{}, [=] (auto &values) {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &values] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->omap_get_values(c->collection, oid, start,
                                    reinterpret_cast<map<string, bufferlist>*>(&values));
@@ -344,7 +415,7 @@ auto AlienStore::omap_get_values(CollectionRef ch,
         return crimson::ct_error::input_output_error::make();
       } else {
         return read_errorator::make_ready_future<std::tuple<bool, omap_values_t>>(
-          std::make_tuple(true, std::move(values)));
+          true, std::move(values));
       }
     });
   });
@@ -356,37 +427,64 @@ seastar::future<> AlienStore::do_transaction(CollectionRef ch,
   logger().debug("{}", __func__);
   auto id = seastar::this_shard_id();
   auto done = seastar::promise<>();
-  return seastar::do_with(
+  return do_with_op_gate(
     std::move(txn),
     std::move(done),
     [this, ch, id] (auto &txn, auto &done) {
-      return seastar::with_gate(transaction_gate, [this, ch, id, &txn, &done] {
-       return tp_mutex.lock().then ([this, ch, id, &txn, &done] {
+       AlienCollection* alien_coll = static_cast<AlienCollection*>(ch.get());
+        // moving the `ch` is crucial for buildability on newer S* versions.
+       return alien_coll->with_lock([this, ch=std::move(ch), id, &txn, &done] {
          Context *crimson_wrapper =
            ceph::os::Transaction::collect_all_contexts(txn);
-         return tp->submit([this, ch, id, crimson_wrapper, &txn, &done] {
-           txn.register_on_commit(new OnCommit(id, done, crimson_wrapper, txn));
+         assert(tp);
+         return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+           [this, ch, id, crimson_wrapper, &txn, &done, &alien=seastar::engine().alien()] {
+           txn.register_on_commit(new OnCommit(id, done, crimson_wrapper,
+                                               alien, txn));
            auto c = static_cast<AlienCollection*>(ch.get());
            return store->queue_transaction(c->collection, std::move(txn));
          });
-       }).then([this, &done] (int r) {
+       }).then([&done] (int r) {
          assert(r == 0);
-         tp_mutex.unlock();
          return done.get_future();
        });
-      });
     });
 }
 
+seastar::future<> AlienStore::inject_data_error(const ghobject_t& o)
+{
+  logger().debug("{}", __func__);
+  assert(tp);
+  return seastar::with_gate(op_gate, [=] {
+    return tp->submit([=] {
+      return store->inject_data_error(o);
+    });
+  });
+}
+
+seastar::future<> AlienStore::inject_mdata_error(const ghobject_t& o)
+{
+  logger().debug("{}", __func__);
+  assert(tp);
+  return seastar::with_gate(op_gate, [=] {
+    return tp->submit([=] {
+      return store->inject_mdata_error(o);
+    });
+  });
+}
+
 seastar::future<> AlienStore::write_meta(const std::string& key,
                                          const std::string& value)
 {
   logger().debug("{}", __func__);
-  return tp->submit([=] {
-    return store->write_meta(key, value);
-  }).then([] (int r) {
-    assert(r == 0);
-    return seastar::make_ready_future<>();
+  assert(tp);
+  return seastar::with_gate(op_gate, [=] {
+    return tp->submit([=] {
+      return store->write_meta(key, value);
+    }).then([] (int r) {
+      assert(r == 0);
+      return seastar::make_ready_future<>();
+    });
   });
 }
 
@@ -394,20 +492,23 @@ seastar::future<std::tuple<int, std::string>>
 AlienStore::read_meta(const std::string& key)
 {
   logger().debug("{}", __func__);
-  return tp->submit([this, key] {
-    std::string value;
-    int r = store->read_meta(key, &value);
-    if (r > 0) {
-      value.resize(r);
-      boost::algorithm::trim_right_if(value,
-        [] (unsigned char c) {return isspace(c);});
-    } else {
-      value.clear();
-    }
-    return std::make_pair(r, value);
-  }).then([] (auto entry) {
-    return seastar::make_ready_future<std::tuple<int, std::string>>(
-      std::move(entry));
+  assert(tp);
+  return seastar::with_gate(op_gate, [this, key] {
+    return tp->submit([this, key] {
+      std::string value;
+      int r = store->read_meta(key, &value);
+      if (r > 0) {
+        value.resize(r);
+        boost::algorithm::trim_right_if(value,
+          [] (unsigned char c) {return isspace(c);});
+      } else {
+        value.clear();
+      }
+      return std::make_pair(r, value);
+    }).then([] (auto entry) {
+      return seastar::make_ready_future<std::tuple<int, std::string>>(
+        std::move(entry));
+    });
   });
 }
 
@@ -420,7 +521,8 @@ uuid_d AlienStore::get_fsid() const
 seastar::future<store_statfs_t> AlienStore::stat() const
 {
   logger().info("{}", __func__);
-  return seastar::do_with(store_statfs_t{}, [this] (store_statfs_t &st) {
+  assert(tp);
+  return do_with_op_gate(store_statfs_t{}, [this] (store_statfs_t &st) {
     return tp->submit([this, &st] {
       return store->statfs(&st, nullptr);
     }).then([&st] (int r) {
@@ -440,8 +542,9 @@ seastar::future<struct stat> AlienStore::stat(
   CollectionRef ch,
   const ghobject_t& oid)
 {
-  return seastar::do_with((struct stat){}, [this, ch, oid](auto& st) {
-    return tp->submit([this, ch, oid, &st] {
+  assert(tp);
+  return do_with_op_gate((struct stat){}, [this, ch, oid](auto& st) {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [this, ch, oid, &st] {
       auto c = static_cast<AlienCollection*>(ch.get());
       store->stat(c->collection, oid, &st);
       return st;
@@ -453,8 +556,9 @@ auto AlienStore::omap_get_header(CollectionRef ch,
                                  const ghobject_t& oid)
   -> read_errorator::future<ceph::bufferlist>
 {
-  return seastar::do_with(ceph::bufferlist(), [=](auto& bl) {
-    return tp->submit([=, &bl] {
+  assert(tp);
+  return do_with_op_gate(ceph::bufferlist(), [=](auto& bl) {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &bl] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->omap_get_header(c->collection, oid, &bl);
     }).then([&bl] (int r) -> read_errorator::future<ceph::bufferlist> {
@@ -464,7 +568,8 @@ auto AlienStore::omap_get_header(CollectionRef ch,
         logger().error("omap_get_header: {}", r);
         return crimson::ct_error::input_output_error::make();
       } else {
-        return read_errorator::make_ready_future<ceph::bufferlist>(std::move(bl));
+        return read_errorator::make_ready_future<ceph::bufferlist>(
+         std::move(bl));
       }
     });
   });
@@ -476,14 +581,14 @@ seastar::future<std::map<uint64_t, uint64_t>> AlienStore::fiemap(
   uint64_t off,
   uint64_t len)
 {
-  return seastar::do_with(std::map<uint64_t, uint64_t>(), [=](auto& destmap) {
-    return tp->submit([=, &destmap] {
+  assert(tp);
+  return do_with_op_gate(std::map<uint64_t, uint64_t>(), [=](auto& destmap) {
+    return tp->submit(ch->get_cid().hash_to_shard(tp->size()), [=, &destmap] {
       auto c = static_cast<AlienCollection*>(ch.get());
       return store->fiemap(c->collection, oid, off, len, destmap);
     }).then([&destmap] (int i) {
-      return seastar::make_ready_future
-         <std::map<uint64_t, uint64_t>>
-         (std::move(destmap));
+      return seastar::make_ready_future<std::map<uint64_t, uint64_t>>(
+       std::move(destmap));
     });
   });
 }
@@ -492,12 +597,15 @@ seastar::future<FuturizedStore::OmapIteratorRef> AlienStore::get_omap_iterator(
   CollectionRef ch,
   const ghobject_t& oid)
 {
-  return tp->submit([=] {
+  assert(tp);
+  return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
+    [this, ch, oid] {
     auto c = static_cast<AlienCollection*>(ch.get());
     auto iter = store->get_omap_iterator(c->collection, oid);
     return FuturizedStore::OmapIteratorRef(
              new AlienStore::AlienOmapIterator(iter,
-                                               this));
+                                               this,
+                                               ch));
   });
 }
 
@@ -505,7 +613,9 @@ seastar::future<FuturizedStore::OmapIteratorRef> AlienStore::get_omap_iterator(
 //      needs further optimization.
 seastar::future<> AlienStore::AlienOmapIterator::seek_to_first()
 {
-  return store->tp->submit([=] {
+  assert(store->tp);
+  return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+    [this] {
     return iter->seek_to_first();
   }).then([] (int r) {
     assert(r == 0);
@@ -516,7 +626,9 @@ seastar::future<> AlienStore::AlienOmapIterator::seek_to_first()
 seastar::future<> AlienStore::AlienOmapIterator::upper_bound(
   const std::string& after)
 {
-  return store->tp->submit([this, after] {
+  assert(store->tp);
+  return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+    [this, after] {
     return iter->upper_bound(after);
   }).then([] (int r) {
     assert(r == 0);
@@ -527,7 +639,9 @@ seastar::future<> AlienStore::AlienOmapIterator::upper_bound(
 seastar::future<> AlienStore::AlienOmapIterator::lower_bound(
   const std::string& to)
 {
-  return store->tp->submit([this, to] {
+  assert(store->tp);
+  return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+    [this, to] {
     return iter->lower_bound(to);
   }).then([] (int r) {
     assert(r == 0);
@@ -537,7 +651,9 @@ seastar::future<> AlienStore::AlienOmapIterator::lower_bound(
 
 seastar::future<> AlienStore::AlienOmapIterator::next()
 {
-  return store->tp->submit([this] {
+  assert(store->tp);
+  return store->tp->submit(ch->get_cid().hash_to_shard(store->tp->size()),
+    [this] {
     return iter->next();
   }).then([] (int r) {
     assert(r == 0);
@@ -555,13 +671,6 @@ std::string AlienStore::AlienOmapIterator::key()
   return iter->key();
 }
 
-seastar::future<std::string> AlienStore::AlienOmapIterator::tail_key()
-{
-  return store->tp->submit([this] {
-    return iter->tail_key();
-  });
-}
-
 ceph::buffer::list AlienStore::AlienOmapIterator::value()
 {
   return iter->value();
@@ -572,4 +681,26 @@ int AlienStore::AlienOmapIterator::status() const
   return iter->status();
 }
 
+std::vector<uint64_t> AlienStore::_parse_cpu_cores()
+{
+  std::vector<uint64_t> cpu_cores;
+  auto cpu_string =
+    get_conf<std::string>("crimson_alien_thread_cpu_cores");
+
+  std::string token;
+  std::istringstream token_stream(cpu_string);
+  while (std::getline(token_stream, token, ',')) {
+    std::istringstream cpu_stream(token);
+    std::string cpu;
+    std::getline(cpu_stream, cpu, '-');
+    uint64_t start_cpu = std::stoull(cpu);
+    std::getline(cpu_stream, cpu, '-');
+    uint64_t end_cpu = std::stoull(cpu);
+    for (uint64_t i = start_cpu; i < end_cpu; i++) {
+      cpu_cores.push_back(i);
+    }
+  }
+  return cpu_cores;
+}
+
 }