#include <sys/stat.h>
#include <signal.h>
#include <time.h>
-#include <boost/scoped_ptr.hpp>
#include <boost/range/adaptor/reversed.hpp>
#ifdef HAVE_SYS_PARAM_H
#endif
#include "osd/PG.h"
-#include "osd/scrub_machine.h"
-#include "osd/pg_scrubber.h"
+#include "osd/scrubber/scrub_machine.h"
+#include "osd/scrubber/pg_scrubber.h"
#include "include/types.h"
#include "include/compat.h"
#include "include/random.h"
+#include "include/scope_guard.h"
#include "OSD.h"
#include "OSDMap.h"
#include "messages/MMonGetOSDMap.h"
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGNotify2.h"
-#include "messages/MOSDPGQuery.h"
#include "messages/MOSDPGQuery2.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGRemove.h"
#include "messages/MOSDScrub.h"
#include "messages/MOSDScrub2.h"
-#include "messages/MOSDRepScrub.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
#include "messages/MPGStats.h"
-#include "messages/MWatchNotify.h"
-#include "messages/MOSDPGPush.h"
-#include "messages/MOSDPGPushReply.h"
-#include "messages/MOSDPGPull.h"
-
#include "messages/MMonGetPurgedSnaps.h"
#include "messages/MMonGetPurgedSnapsReply.h"
#else
#define tracepoint(...)
#endif
-#ifdef HAVE_JAEGER
-#include "common/tracer.h"
-#endif
+
+#include "osd_tracer.h"
+
#define dout_context cct
#define dout_subsys ceph_subsys_osd
using namespace ceph::osd::scheduler;
using TOPNSPC::common::cmd_getval;
+using TOPNSPC::common::cmd_getval_or;
static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) {
return *_dout << "osd." << whoami << " " << epoch << " ";
}
+
//Initial features in new superblock.
//Features here are also automatically upgraded
CompatSet OSD::get_osd_initial_compat_set() {
OSDService::OSDService(OSD *osd, ceph::async::io_context_pool& poolctx) :
osd(osd),
cct(osd->cct),
- whoami(osd->whoami), store(osd->store),
+ whoami(osd->whoami), store(osd->store.get()),
log_client(osd->log_client), clog(osd->clog),
pg_recovery_stats(osd->pg_recovery_stats),
cluster_messenger(osd->cluster_messenger),
publish_lock{ceph::make_mutex("OSDService::publish_lock")},
pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")},
max_oldest_map(0),
- scrubs_local(0),
- scrubs_remote(0),
+ m_scrub_queue{cct, *this},
agent_valid_iterator(false),
agent_ops(0),
flush_mode_high_count(0),
// --------------------------------------
// dispatch
-bool OSDService::can_inc_scrubs()
-{
- bool can_inc = false;
- std::lock_guard l(sched_scrub_lock);
-
- if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
- dout(20) << __func__ << " == true " << scrubs_local << " local + " << scrubs_remote
- << " remote < max " << cct->_conf->osd_max_scrubs << dendl;
- can_inc = true;
- } else {
- dout(20) << __func__ << " == false " << scrubs_local << " local + " << scrubs_remote
- << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
- }
-
- return can_inc;
-}
-
-bool OSDService::inc_scrubs_local()
-{
- bool result = false;
- std::lock_guard l{sched_scrub_lock};
- if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
- dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local+1)
- << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")" << dendl;
- result = true;
- ++scrubs_local;
- } else {
- dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
- }
- return result;
-}
-
-void OSDService::dec_scrubs_local()
-{
- std::lock_guard l{sched_scrub_lock};
- dout(20) << __func__ << " " << scrubs_local << " -> " << (scrubs_local-1)
- << " (max " << cct->_conf->osd_max_scrubs << ", remote " << scrubs_remote << ")" << dendl;
- --scrubs_local;
- ceph_assert(scrubs_local >= 0);
-}
-
-bool OSDService::inc_scrubs_remote()
-{
- bool result = false;
- std::lock_guard l{sched_scrub_lock};
- if (scrubs_local + scrubs_remote < cct->_conf->osd_max_scrubs) {
- dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote+1)
- << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl;
- result = true;
- ++scrubs_remote;
- } else {
- dout(20) << __func__ << " " << scrubs_local << " local + " << scrubs_remote << " remote >= max " << cct->_conf->osd_max_scrubs << dendl;
- }
- return result;
-}
-
-void OSDService::dec_scrubs_remote()
-{
- std::lock_guard l{sched_scrub_lock};
- dout(20) << __func__ << " " << scrubs_remote << " -> " << (scrubs_remote-1)
- << " (max " << cct->_conf->osd_max_scrubs << ", local " << scrubs_local << ")" << dendl;
- --scrubs_remote;
- ceph_assert(scrubs_remote >= 0);
-}
-
-void OSDService::dump_scrub_reservations(Formatter *f)
-{
- std::lock_guard l{sched_scrub_lock};
- f->dump_int("scrubs_local", scrubs_local);
- f->dump_int("scrubs_remote", scrubs_remote);
- f->dump_int("osd_max_scrubs", cct->_conf->osd_max_scrubs);
-}
-
void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
epoch_t *_bind_epoch) const
{
OSDMapRef osdmap = get_osdmap();
if (osdmap && osdmap->is_up(whoami)) {
- dout(0) << __func__ << " telling mon we are shutting down" << dendl;
+ dout(0) << __func__ << " telling mon we are shutting down and dead " << dendl;
set_state(PREPARING_TO_STOP);
monc->send_mon_message(
new MOSDMarkMeDown(
whoami,
osdmap->get_addrs(whoami),
osdmap->get_epoch(),
- true // request ack
+ true, // request ack
+ true // mark as down and dead
));
const auto timeout = ceph::make_timespan(cct->_conf->osd_mon_shutdown_timeout);
is_stopping_cond.wait_for(l, timeout,
[this] { return get_state() == STOPPING; });
}
+
dout(0) << __func__ << " starting shutdown" << dendl;
set_state(STOPPING);
return true;
template <class MSG_TYPE>
void OSDService::queue_scrub_event_msg(PG* pg,
Scrub::scrub_prio_t with_priority,
- unsigned int qu_priority)
+ unsigned int qu_priority,
+ Scrub::act_token_t act_token)
{
const auto epoch = pg->get_osdmap_epoch();
- auto msg = new MSG_TYPE(pg->get_pgid(), epoch);
- dout(15) << "queue a scrub event (" << *msg << ") for " << *pg << ". Epoch: " << epoch << dendl;
+ auto msg = new MSG_TYPE(pg->get_pgid(), epoch, act_token);
+ dout(15) << "queue a scrub event (" << *msg << ") for " << *pg
+ << ". Epoch: " << epoch << " token: " << act_token << dendl;
enqueue_back(OpSchedulerItem(
unique_ptr<OpSchedulerItem::OpQueueable>(msg), cct->_conf->osd_scrub_cost,
}
template <class MSG_TYPE>
-void OSDService::queue_scrub_event_msg(PG* pg, Scrub::scrub_prio_t with_priority)
+void OSDService::queue_scrub_event_msg(PG* pg,
+ Scrub::scrub_prio_t with_priority)
{
const auto epoch = pg->get_osdmap_epoch();
auto msg = new MSG_TYPE(pg->get_pgid(), epoch);
void OSDService::queue_for_rep_scrub(PG* pg,
Scrub::scrub_prio_t with_priority,
- unsigned int qu_priority)
+ unsigned int qu_priority,
+ Scrub::act_token_t act_token)
{
- queue_scrub_event_msg<PGRepScrub>(pg, with_priority, qu_priority);
+ queue_scrub_event_msg<PGRepScrub>(pg, with_priority, qu_priority, act_token);
}
void OSDService::queue_for_rep_scrub_resched(PG* pg,
Scrub::scrub_prio_t with_priority,
- unsigned int qu_priority)
+ unsigned int qu_priority,
+ Scrub::act_token_t act_token)
{
// Resulting scrub event: 'SchedReplica'
- queue_scrub_event_msg<PGRepScrubResched>(pg, with_priority, qu_priority);
+ queue_scrub_event_msg<PGRepScrubResched>(pg, with_priority, qu_priority,
+ act_token);
}
void OSDService::queue_for_scrub_granted(PG* pg, Scrub::scrub_prio_t with_priority)
queue_scrub_event_msg<PGScrubPushesUpdate>(pg, with_priority);
}
+void OSDService::queue_scrub_chunk_free(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'SelectedChunkFree'
+ queue_scrub_event_msg<PGScrubChunkIsFree>(pg, with_priority);
+}
+
+void OSDService::queue_scrub_chunk_busy(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'ChunkIsBusy'
+ queue_scrub_event_msg<PGScrubChunkIsBusy>(pg, with_priority);
+}
+
void OSDService::queue_scrub_applied_update(PG* pg, Scrub::scrub_prio_t with_priority)
{
queue_scrub_event_msg<PGScrubAppliedUpdate>(pg, with_priority);
queue_scrub_event_msg<PGScrubDigestUpdate>(pg, with_priority);
}
+void OSDService::queue_scrub_got_local_map(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'IntLocalMapDone'
+ queue_scrub_event_msg<PGScrubGotLocalMap>(pg, with_priority);
+}
+
void OSDService::queue_scrub_got_repl_maps(PG* pg, Scrub::scrub_prio_t with_priority)
{
// Resulting scrub event: 'GotReplicas'
queue_scrub_event_msg<PGScrubGotReplMaps>(pg, with_priority);
}
+void OSDService::queue_scrub_maps_compared(PG* pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'MapsCompared'
+ queue_scrub_event_msg<PGScrubMapsCompared>(pg, with_priority);
+}
+
void OSDService::queue_scrub_replica_pushes(PG *pg, Scrub::scrub_prio_t with_priority)
{
// Resulting scrub event: 'ReplicaPushesUpd'
queue_scrub_event_msg<PGScrubReplicaPushes>(pg, with_priority);
}
+void OSDService::queue_scrub_is_finished(PG *pg)
+{
+ // Resulting scrub event: 'ScrubFinished'
+ queue_scrub_event_msg<PGScrubScrubFinished>(pg, Scrub::scrub_prio_t::high_priority);
+}
+
+void OSDService::queue_scrub_next_chunk(PG *pg, Scrub::scrub_prio_t with_priority)
+{
+ // Resulting scrub event: 'NextChunk'
+ queue_scrub_event_msg<PGScrubGetNextChunk>(pg, with_priority);
+}
+
void OSDService::queue_for_pg_delete(spg_t pgid, epoch_t e)
{
dout(10) << __func__ << " on " << pgid << " e " << e << dendl;
} // namespace ceph::osd_cmds
-int OSD::mkfs(CephContext *cct, ObjectStore *store, uuid_d fsid, int whoami, string osdspec_affinity)
+int OSD::mkfs(CephContext *cct,
+ std::unique_ptr<ObjectStore> store,
+ uuid_d fsid,
+ int whoami,
+ string osdspec_affinity)
{
int ret;
OSDSuperblock sb;
bufferlist sbbl;
- ObjectStore::CollectionHandle ch;
-
// if we are fed a uuid for this osd, use it.
store->set_fsid(cct->_conf->osd_uuid);
if (ret) {
derr << "OSD::mkfs: ObjectStore::mkfs failed with error "
<< cpp_strerror(ret) << dendl;
- goto free_store;
+ return ret;
}
store->set_cache_shards(1); // doesn't matter for mkfs!
if (ret) {
derr << "OSD::mkfs: couldn't mount ObjectStore: error "
<< cpp_strerror(ret) << dendl;
- goto free_store;
+ return ret;
}
- ch = store->open_collection(coll_t::meta());
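+  // once the store is mounted, make sure it gets unmounted on every
+  // return path below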
+ auto umount_store = make_scope_guard([&] {
+ store->umount();
+ });
+
+ ObjectStore::CollectionHandle ch =
+ store->open_collection(coll_t::meta());
if (ch) {
ret = store->read(ch, OSD_SUPERBLOCK_GOBJECT, 0, 0, sbbl);
if (ret < 0) {
derr << "OSD::mkfs: have meta collection but no superblock" << dendl;
- goto free_store;
+ return ret;
}
/* if we already have superblock, check content of superblock */
dout(0) << " have superblock" << dendl;
if (whoami != sb.whoami) {
derr << "provided osd id " << whoami << " != superblock's " << sb.whoami
<< dendl;
- ret = -EINVAL;
- goto umount_store;
+ return -EINVAL;
}
if (fsid != sb.cluster_fsid) {
derr << "provided cluster fsid " << fsid
<< " != superblock's " << sb.cluster_fsid << dendl;
- ret = -EINVAL;
- goto umount_store;
+ return -EINVAL;
}
} else {
// create superblock
if (ret) {
derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_GOBJECT: "
<< "queue_transaction returned " << cpp_strerror(ret) << dendl;
- goto umount_store;
+ return ret;
}
+ ch->flush();
}
- ret = write_meta(cct, store, sb.cluster_fsid, sb.osd_fsid, whoami, osdspec_affinity);
+ ret = write_meta(cct, store.get(), sb.cluster_fsid, sb.osd_fsid, whoami, osdspec_affinity);
if (ret) {
derr << "OSD::mkfs: failed to write fsid file: error "
<< cpp_strerror(ret) << dendl;
- goto umount_store;
- }
-
-umount_store:
- if (ch) {
- ch.reset();
}
- store->umount();
-free_store:
- delete store;
return ret;
}
// cons/des
-OSD::OSD(CephContext *cct_, ObjectStore *store_,
+OSD::OSD(CephContext *cct_,
+ std::unique_ptr<ObjectStore> store_,
int id,
Messenger *internal_messenger,
Messenger *external_messenger,
mgrc(cct_, client_messenger, &mc->monmap),
logger(create_logger()),
recoverystate_perf(create_recoverystate_perf()),
- store(store_),
+ store(std::move(store_)),
log_client(cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS),
clog(log_client.create_channel()),
whoami(id),
this);
shards.push_back(one_shard);
}
-
- // override some config options if mclock is enabled on all the shards
- maybe_override_options_for_qos();
}
OSD::~OSD()
cct->get_perfcounters_collection()->remove(logger);
delete recoverystate_perf;
delete logger;
- delete store;
}
double OSD::get_tick_interval() const
return pools;
}
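+// Resolve the pgid named in an asok command into a locked PG (or an error).
+// On success the PG is returned locked; the caller must unlock it.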
+OSD::PGRefOrError OSD::locate_asok_target(const cmdmap_t& cmdmap,
+ stringstream& ss,
+ bool only_primary)
+{
+ string pgidstr;
+ if (!cmd_getval(cmdmap, "pgid", pgidstr)) {
+ ss << "no pgid specified";
+ return OSD::PGRefOrError{std::nullopt, -EINVAL};
+ }
+
+ pg_t pgid;
+ if (!pgid.parse(pgidstr.c_str())) {
+ ss << "couldn't parse pgid '" << pgidstr << "'";
+ return OSD::PGRefOrError{std::nullopt, -EINVAL};
+ }
+
+ spg_t pcand;
+ PGRef pg;
+ if (get_osdmap()->get_primary_shard(pgid, &pcand) && (pg = _lookup_lock_pg(pcand))) {
+ if (pg->is_primary() || !only_primary) {
+ return OSD::PGRefOrError{pg, 0};
+ }
+
+ ss << "not primary for pgid " << pgid;
+ pg->unlock();
+ return OSD::PGRefOrError{std::nullopt, -EAGAIN};
+ } else {
+ ss << "i don't have pgid " << pgid;
+ return OSD::PGRefOrError{std::nullopt, -ENOENT};
+ }
+}
+
+// note that the cmdmap is explicitly copied into asok_route_to_pg()
+int OSD::asok_route_to_pg(
+ bool only_primary,
+ std::string_view prefix,
+ cmdmap_t cmdmap,
+ Formatter* f,
+ stringstream& ss,
+ const bufferlist& inbl,
+ bufferlist& outbl,
+ std::function<void(int, const std::string&, bufferlist&)> on_finish)
+{
+ auto [target_pg, ret] = locate_asok_target(cmdmap, ss, only_primary);
+
+ if (!target_pg.has_value()) {
+ // 'ss' and 'ret' already contain the error information
+ on_finish(ret, ss.str(), outbl);
+ return ret;
+ }
+
+ // the PG was locked by locate_asok_target()
+ try {
+ (*target_pg)->do_command(prefix, cmdmap, inbl, on_finish);
+ (*target_pg)->unlock();
+ return 0; // the pg handler calls on_finish directly
+ } catch (const TOPNSPC::common::bad_cmd_get& e) {
+ (*target_pg)->unlock();
+ ss << e.what();
+ on_finish(ret, ss.str(), outbl);
+ return -EINVAL;
+ }
+}
+
void OSD::asok_command(
std::string_view prefix, const cmdmap_t& cmdmap,
Formatter *f,
}
}
+ // --- PG commands that will be answered even if !primary ---
+
+ else if (prefix == "scrubdebug") {
+ asok_route_to_pg(false, prefix, cmdmap, f, ss, inbl, outbl, on_finish);
+ return;
+ }
+
// --- OSD commands follow ---
else if (prefix == "status") {
f->close_section();
} else if (prefix == "dump_scrub_reservations") {
f->open_object_section("scrub_reservations");
- service.dump_scrub_reservations(f);
+ service.get_scrub_services().dump_scrub_reservations(f);
f->close_section();
} else if (prefix == "get_latest_osdmap") {
get_latest_osdmap();
} else if (prefix == "dump_objectstore_kv_stats") {
store->get_db_statistics(f);
} else if (prefix == "dump_scrubs") {
- service.dumps_scrub(f);
+ service.get_scrub_services().dump_scrubs(f);
} else if (prefix == "calc_objectstore_db_histogram") {
store->generate_db_histogram(f);
} else if (prefix == "flush_store_cache") {
}
else if (prefix == "bench") {
- int64_t count;
- int64_t bsize;
- int64_t osize, onum;
// default count 1G, size 4MB
- cmd_getval(cmdmap, "count", count, (int64_t)1 << 30);
- cmd_getval(cmdmap, "size", bsize, (int64_t)4 << 20);
- cmd_getval(cmdmap, "object_size", osize, (int64_t)0);
- cmd_getval(cmdmap, "object_num", onum, (int64_t)0);
-
- uint32_t duration = cct->_conf->osd_bench_duration;
-
- if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) {
- // let us limit the block size because the next checks rely on it
- // having a sane value. If we allow any block size to be set things
- // can still go sideways.
- ss << "block 'size' values are capped at "
- << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use"
- << " a higher value, please adjust 'osd_bench_max_block_size'";
- ret = -EINVAL;
- goto out;
- } else if (bsize < (int64_t) (1 << 20)) {
- // entering the realm of small block sizes.
- // limit the count to a sane value, assuming a configurable amount of
- // IOPS and duration, so that the OSD doesn't get hung up on this,
- // preventing timeouts from going off
- int64_t max_count =
- bsize * duration * cct->_conf->osd_bench_small_size_max_iops;
- if (count > max_count) {
- ss << "'count' values greater than " << max_count
- << " for a block size of " << byte_u_t(bsize) << ", assuming "
- << cct->_conf->osd_bench_small_size_max_iops << " IOPS,"
- << " for " << duration << " seconds,"
- << " can cause ill effects on osd. "
- << " Please adjust 'osd_bench_small_size_max_iops' with a higher"
- << " value if you wish to use a higher 'count'.";
- ret = -EINVAL;
- goto out;
- }
- } else {
- // 1MB block sizes are big enough so that we get more stuff done.
- // However, to avoid the osd from getting hung on this and having
- // timers being triggered, we are going to limit the count assuming
- // a configurable throughput and duration.
- // NOTE: max_count is the total amount of bytes that we believe we
- // will be able to write during 'duration' for the given
- // throughput. The block size hardly impacts this unless it's
- // way too big. Given we already check how big the block size
- // is, it's safe to assume everything will check out.
- int64_t max_count =
- cct->_conf->osd_bench_large_size_max_throughput * duration;
- if (count > max_count) {
- ss << "'count' values greater than " << max_count
- << " for a block size of " << byte_u_t(bsize) << ", assuming "
- << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s,"
- << " for " << duration << " seconds,"
- << " can cause ill effects on osd. "
- << " Please adjust 'osd_bench_large_size_max_throughput'"
- << " with a higher value if you wish to use a higher 'count'.";
- ret = -EINVAL;
- goto out;
- }
- }
-
- if (osize && bsize > osize)
- bsize = osize;
-
- dout(1) << " bench count " << count
- << " bsize " << byte_u_t(bsize) << dendl;
-
- ObjectStore::Transaction cleanupt;
-
- if (osize && onum) {
- bufferlist bl;
- bufferptr bp(osize);
- bp.zero();
- bl.push_back(std::move(bp));
- bl.rebuild_page_aligned();
- for (int i=0; i<onum; ++i) {
- char nm[30];
- snprintf(nm, sizeof(nm), "disk_bw_test_%d", i);
- object_t oid(nm);
- hobject_t soid(sobject_t(oid, 0));
- ObjectStore::Transaction t;
- t.write(coll_t(), ghobject_t(soid), 0, osize, bl);
- store->queue_transaction(service.meta_ch, std::move(t), NULL);
- cleanupt.remove(coll_t(), ghobject_t(soid));
- }
- }
-
- bufferlist bl;
- bufferptr bp(bsize);
- bp.zero();
- bl.push_back(std::move(bp));
- bl.rebuild_page_aligned();
-
- {
- C_SaferCond waiter;
- if (!service.meta_ch->flush_commit(&waiter)) {
- waiter.wait();
- }
- }
-
- utime_t start = ceph_clock_now();
- for (int64_t pos = 0; pos < count; pos += bsize) {
- char nm[30];
- unsigned offset = 0;
- if (onum && osize) {
- snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum));
- offset = rand() % (osize / bsize) * bsize;
- } else {
- snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
- }
- object_t oid(nm);
- hobject_t soid(sobject_t(oid, 0));
- ObjectStore::Transaction t;
- t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl);
- store->queue_transaction(service.meta_ch, std::move(t), NULL);
- if (!onum || !osize)
- cleanupt.remove(coll_t::meta(), ghobject_t(soid));
- }
+ int64_t count = cmd_getval_or<int64_t>(cmdmap, "count", 1LL << 30);
+ int64_t bsize = cmd_getval_or<int64_t>(cmdmap, "size", 4LL << 20);
+ int64_t osize = cmd_getval_or<int64_t>(cmdmap, "object_size", 0);
+ int64_t onum = cmd_getval_or<int64_t>(cmdmap, "object_num", 0);
+ double elapsed = 0.0;
- {
- C_SaferCond waiter;
- if (!service.meta_ch->flush_commit(&waiter)) {
- waiter.wait();
- }
- }
- utime_t end = ceph_clock_now();
-
- // clean up
- store->queue_transaction(service.meta_ch, std::move(cleanupt), NULL);
- {
- C_SaferCond waiter;
- if (!service.meta_ch->flush_commit(&waiter)) {
- waiter.wait();
- }
+ ret = run_osd_bench_test(count, bsize, osize, onum, &elapsed, ss);
+ if (ret != 0) {
+ goto out;
}
- double elapsed = end - start;
double rate = count / elapsed;
double iops = rate / bsize;
f->open_object_section("osd_bench_results");
}
f->close_section(); // entries
f->close_section(); // network_ping_times
+ } else if (prefix == "dump_pool_statfs") {
+ lock_guard l(osd_lock);
+
+ int64_t p = 0;
+ if (!(cmd_getval(cmdmap, "poolid", p))) {
+ ss << "Error dumping pool statfs: no poolid provided";
+ ret = -EINVAL;
+ goto out;
+ }
+
+ store_statfs_t st;
+ bool per_pool_omap_stats = false;
+
+ ret = store->pool_statfs(p, &st, &per_pool_omap_stats);
+ if (ret < 0) {
+ ss << "Error dumping pool statfs: " << cpp_strerror(ret);
+ goto out;
+ } else {
+ ss << "dumping pool statfs...";
+ f->open_object_section("pool_statfs");
+ f->dump_int("poolid", p);
+ st.dump(f);
+ f->close_section();
+ }
} else {
ceph_abort_msg("broken asok registration");
}
on_finish(ret, ss.str(), outbl);
}
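+// run a write workload against the object store: 'count' bytes are written in
+// 'bsize' blocks (optionally spread over 'onum' objects of 'osize' bytes), and
+// the elapsed wall-clock time is returned through 'elapsed'. Returns 0 on
+// success, or -EINVAL if the parameters exceed the configured bench limits.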
+int OSD::run_osd_bench_test(
+ int64_t count,
+ int64_t bsize,
+ int64_t osize,
+ int64_t onum,
+ double *elapsed,
+ ostream &ss)
+{
+ int ret = 0;
+ uint32_t duration = cct->_conf->osd_bench_duration;
+
+ if (bsize > (int64_t) cct->_conf->osd_bench_max_block_size) {
+ // let us limit the block size because the next checks rely on it
+ // having a sane value. If we allow any block size to be set things
+ // can still go sideways.
+ ss << "block 'size' values are capped at "
+ << byte_u_t(cct->_conf->osd_bench_max_block_size) << ". If you wish to use"
+ << " a higher value, please adjust 'osd_bench_max_block_size'";
+ ret = -EINVAL;
+ return ret;
+ } else if (bsize < (int64_t) (1 << 20)) {
+ // entering the realm of small block sizes.
+ // limit the count to a sane value, assuming a configurable amount of
+ // IOPS and duration, so that the OSD doesn't get hung up on this,
+ // preventing timeouts from going off
+ int64_t max_count =
+ bsize * duration * cct->_conf->osd_bench_small_size_max_iops;
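+    // e.g. a 4 KiB block size, a 30 second duration and a 100 IOPS cap give
+    // max_count = 4096 * 30 * 100 = 12288000 bytes (~11.7 MiB)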
+ if (count > max_count) {
+ ss << "'count' values greater than " << max_count
+ << " for a block size of " << byte_u_t(bsize) << ", assuming "
+ << cct->_conf->osd_bench_small_size_max_iops << " IOPS,"
+ << " for " << duration << " seconds,"
+ << " can cause ill effects on osd. "
+ << " Please adjust 'osd_bench_small_size_max_iops' with a higher"
+ << " value if you wish to use a higher 'count'.";
+ ret = -EINVAL;
+ return ret;
+ }
+ } else {
+ // 1MB block sizes are big enough so that we get more stuff done.
+ // However, to avoid the osd from getting hung on this and having
+ // timers being triggered, we are going to limit the count assuming
+ // a configurable throughput and duration.
+ // NOTE: max_count is the total amount of bytes that we believe we
+ // will be able to write during 'duration' for the given
+ // throughput. The block size hardly impacts this unless it's
+ // way too big. Given we already check how big the block size
+ // is, it's safe to assume everything will check out.
+ int64_t max_count =
+ cct->_conf->osd_bench_large_size_max_throughput * duration;
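+    // e.g. a 100 MiB/s throughput cap over a 30 second duration allows
+    // roughly 3 GiB in total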
+ if (count > max_count) {
+ ss << "'count' values greater than " << max_count
+ << " for a block size of " << byte_u_t(bsize) << ", assuming "
+ << byte_u_t(cct->_conf->osd_bench_large_size_max_throughput) << "/s,"
+ << " for " << duration << " seconds,"
+ << " can cause ill effects on osd. "
+ << " Please adjust 'osd_bench_large_size_max_throughput'"
+ << " with a higher value if you wish to use a higher 'count'.";
+ ret = -EINVAL;
+ return ret;
+ }
+ }
+
+ if (osize && bsize > osize) {
+ bsize = osize;
+ }
+
+ dout(1) << " bench count " << count
+ << " bsize " << byte_u_t(bsize) << dendl;
+
+ ObjectStore::Transaction cleanupt;
+
+ if (osize && onum) {
+ bufferlist bl;
+ bufferptr bp(osize);
+ memset(bp.c_str(), 'a', bp.length());
+ bl.push_back(std::move(bp));
+ bl.rebuild_page_aligned();
+ for (int i=0; i<onum; ++i) {
+ char nm[30];
+ snprintf(nm, sizeof(nm), "disk_bw_test_%d", i);
+ object_t oid(nm);
+ hobject_t soid(sobject_t(oid, 0));
+ ObjectStore::Transaction t;
+ t.write(coll_t(), ghobject_t(soid), 0, osize, bl);
+ store->queue_transaction(service.meta_ch, std::move(t), nullptr);
+ cleanupt.remove(coll_t(), ghobject_t(soid));
+ }
+ }
+
+ bufferlist bl;
+ bufferptr bp(bsize);
+ memset(bp.c_str(), 'a', bp.length());
+ bl.push_back(std::move(bp));
+ bl.rebuild_page_aligned();
+
+ {
+ C_SaferCond waiter;
+ if (!service.meta_ch->flush_commit(&waiter)) {
+ waiter.wait();
+ }
+ }
+
+ utime_t start = ceph_clock_now();
+ for (int64_t pos = 0; pos < count; pos += bsize) {
+ char nm[30];
+ unsigned offset = 0;
+ if (onum && osize) {
+ snprintf(nm, sizeof(nm), "disk_bw_test_%d", (int)(rand() % onum));
+ offset = rand() % (osize / bsize) * bsize;
+ } else {
+ snprintf(nm, sizeof(nm), "disk_bw_test_%lld", (long long)pos);
+ }
+ object_t oid(nm);
+ hobject_t soid(sobject_t(oid, 0));
+ ObjectStore::Transaction t;
+ t.write(coll_t::meta(), ghobject_t(soid), offset, bsize, bl);
+ store->queue_transaction(service.meta_ch, std::move(t), nullptr);
+ if (!onum || !osize) {
+ cleanupt.remove(coll_t::meta(), ghobject_t(soid));
+ }
+ }
+
+ {
+ C_SaferCond waiter;
+ if (!service.meta_ch->flush_commit(&waiter)) {
+ waiter.wait();
+ }
+ }
+ utime_t end = ceph_clock_now();
+ *elapsed = end - start;
+
+ // clean up
+ store->queue_transaction(service.meta_ch, std::move(cleanupt), nullptr);
+ {
+ C_SaferCond waiter;
+ if (!service.meta_ch->flush_commit(&waiter)) {
+ waiter.wait();
+ }
+ }
+
+ return ret;
+}
+
class TestOpsSocketHook : public AdminSocketHook {
OSDService *service;
ObjectStore *store;
<< cpp_strerror(r) << dendl;
return r;
}
- fuse_store = new FuseStore(store, mntpath);
+ fuse_store = new FuseStore(store.get(), mntpath);
r = fuse_store->start();
if (r < 0) {
derr << __func__ << " unable to start fuse: " << cpp_strerror(r) << dendl;
std::lock_guard lock(osd_lock);
if (is_stopping())
return 0;
-
+ tracing::osd::tracer.init("osd");
tick_timer.init();
tick_timer_without_osd_lock.init();
service.recovery_request_timer.init();
store->set_cache_shards(get_num_cache_shards());
+ int rotating_auth_attempts = 0;
+ auto rotating_auth_timeout =
+ g_conf().get_val<int64_t>("rotating_keys_bootstrap_timeout");
+
int r = store->mount();
if (r < 0) {
derr << "OSD:init: unable to mount object store" << dendl;
dout(2) << "boot" << dendl;
service.meta_ch = store->open_collection(coll_t::meta());
-
+ if (!service.meta_ch) {
+ derr << "OSD:init: unable to open meta collection"
+ << dendl;
+ r = -ENOENT;
+ goto out;
+ }
// initialize the daily loadavg with current 15min loadavg
double loadavgs[3];
if (getloadavg(loadavgs, 3) == 3) {
daily_loadavg = 1.0;
}
- int rotating_auth_attempts = 0;
- auto rotating_auth_timeout =
- g_conf().get_val<int64_t>("rotating_keys_bootstrap_timeout");
-
// sanity check long object name handling
{
hobject_t l;
auto ch = service.meta_ch;
auto hoid = make_snapmapper_oid();
unsigned max = cct->_conf->osd_target_transaction_size;
- r = SnapMapper::convert_legacy(cct, store, ch, hoid, max);
+ r = SnapMapper::convert_legacy(cct, store.get(), ch, hoid, max);
if (r < 0)
goto out;
}
start_boot();
+ // Override a few options if mclock scheduler is enabled.
+ maybe_override_max_osd_capacity_for_qos();
+ maybe_override_options_for_qos();
+
return 0;
out:
enable_disable_fuse(true);
store->umount();
- delete store;
- store = NULL;
+ store.reset();
return r;
}
ceph_assert(r == 0);
r = admin_socket->register_command("dump_op_pq_state",
asok_hook,
- "dump op priority queue state");
+ "dump op queue state");
ceph_assert(r == 0);
r = admin_socket->register_command("dump_blocklist",
asok_hook,
"Dump osd heartbeat network ping times");
ceph_assert(r == 0);
- test_ops_hook = new TestOpsSocketHook(&(this->service), this->store);
+ r = admin_socket->register_command(
+ "dump_pool_statfs name=poolid,type=CephInt,req=true", asok_hook,
+ "Dump store's statistics for the given pool");
+ ceph_assert(r == 0);
+
+ test_ops_hook = new TestOpsSocketHook(&(this->service), this->store.get());
// Note: pools are CephString instead of CephPoolname because
// these commands traditionally support both pool names and numbers
r = admin_socket->register_command(
asok_hook,
"Scrub purged_snaps vs snapmapper index");
ceph_assert(r == 0);
+ r = admin_socket->register_command(
+ "scrubdebug " \
+ "name=pgid,type=CephPgid " \
+ "name=cmd,type=CephChoices,strings=block|unblock|set|unset " \
+ "name=value,type=CephString,req=false",
+ asok_hook,
+ "debug the scrubber");
+ ceph_assert(r == 0);
// -- pg commands --
// old form: ceph pg <pgid> command ...
int OSD::shutdown()
{
+ // vstart overwrites osd_fast_shutdown value in the conf file -> force the value here!
+ //cct->_conf->osd_fast_shutdown = true;
+
+ dout(0) << "Fast Shutdown: - cct->_conf->osd_fast_shutdown = "
+ << cct->_conf->osd_fast_shutdown
+ << ", null-fm = " << store->has_null_manager() << dendl;
+
+ utime_t start_time_func = ceph_clock_now();
+
if (cct->_conf->osd_fast_shutdown) {
derr << "*** Immediate shutdown (osd_fast_shutdown=true) ***" << dendl;
if (cct->_conf->osd_fast_shutdown_notify_mon)
service.prepare_to_stop();
- cct->_log->flush();
- _exit(0);
- }
- if (!service.prepare_to_stop())
+    // There is no state we need to keep when running in NULL-FM mode
+ if (!store->has_null_manager()) {
+ cct->_log->flush();
+ _exit(0);
+ }
+ } else if (!service.prepare_to_stop()) {
return 0; // already shutting down
+ }
+
osd_lock.lock();
if (is_stopping()) {
osd_lock.unlock();
return 0;
}
- dout(0) << "shutdown" << dendl;
+ if (!cct->_conf->osd_fast_shutdown) {
+ dout(0) << "shutdown" << dendl;
+ }
+
+  // don't accept new tasks for this OSD
set_state(STATE_STOPPING);
- // Debugging
- if (cct->_conf.get_val<bool>("osd_debug_shutdown")) {
+ // Disabled debugging during fast-shutdown
+ if (!cct->_conf->osd_fast_shutdown && cct->_conf.get_val<bool>("osd_debug_shutdown")) {
cct->_conf.set_val("debug_osd", "100");
cct->_conf.set_val("debug_journal", "100");
cct->_conf.set_val("debug_filestore", "100");
cct->_conf.apply_changes(nullptr);
}
+ if (cct->_conf->osd_fast_shutdown) {
+    // first, stop new tasks from being taken from op_shardedwq
+ // and clear all pending tasks
+ op_shardedwq.stop_for_fast_shutdown();
+
+ utime_t start_time_timer = ceph_clock_now();
+ tick_timer.shutdown();
+ {
+ std::lock_guard l(tick_timer_lock);
+ tick_timer_without_osd_lock.shutdown();
+ }
+
+ osd_lock.unlock();
+ utime_t start_time_osd_drain = ceph_clock_now();
+
+ // then, wait on osd_op_tp to drain (TBD: should probably add a timeout)
+ osd_op_tp.drain();
+ osd_op_tp.stop();
+
+ utime_t start_time_umount = ceph_clock_now();
+ store->prepare_for_fast_shutdown();
+ std::lock_guard lock(osd_lock);
+    // TBD: assert in allocator that nothing is being added
+ store->umount();
+
+ utime_t end_time = ceph_clock_now();
+ if (cct->_conf->osd_fast_shutdown_timeout) {
+ ceph_assert(end_time - start_time_func < cct->_conf->osd_fast_shutdown_timeout);
+ }
+ dout(0) <<"Fast Shutdown duration total :" << end_time - start_time_func << " seconds" << dendl;
+ dout(0) <<"Fast Shutdown duration osd_drain :" << start_time_umount - start_time_osd_drain << " seconds" << dendl;
+ dout(0) <<"Fast Shutdown duration umount :" << end_time - start_time_umount << " seconds" << dendl;
+ dout(0) <<"Fast Shutdown duration timer :" << start_time_osd_drain - start_time_timer << " seconds" << dendl;
+ cct->_log->flush();
+
+ // now it is safe to exit
+ _exit(0);
+ }
+
// stop MgrClient earlier as it's more like an internal consumer of OSD
mgrc.shutdown();
std::lock_guard lock(osd_lock);
store->umount();
- delete store;
- store = nullptr;
+ store.reset();
dout(10) << "Store synced" << dendl;
op_tracker.on_shutdown();
hb_front_server_messenger->shutdown();
hb_back_server_messenger->shutdown();
+ utime_t duration = ceph_clock_now() - start_time_func;
+ dout(0) <<"Slow Shutdown duration:" << duration << " seconds" << dendl;
+
+ tracing::osd::tracer.shutdown();
+
return r;
}
++it) {
spg_t pgid;
if (it->is_temp(&pgid) ||
- (it->is_pg(&pgid) && PG::_has_removal_flag(store, pgid))) {
+ (it->is_pg(&pgid) && PG::_has_removal_flag(store.get(), pgid))) {
dout(10) << "load_pgs " << *it
<< " removing, legacy or flagged for removal pg" << dendl;
- recursive_remove_collection(cct, store, pgid, *it);
+ recursive_remove_collection(cct, store.get(), pgid, *it);
continue;
}
dout(10) << "pgid " << pgid << " coll " << coll_t(pgid) << dendl;
epoch_t map_epoch = 0;
- int r = PG::peek_map_epoch(store, pgid, &map_epoch);
+ int r = PG::peek_map_epoch(store.get(), pgid, &map_epoch);
if (r < 0) {
derr << __func__ << " unable to peek at " << pgid << " metadata, skipping"
<< dendl;
pg = _make_pg(get_osdmap(), pgid);
}
if (!pg) {
- recursive_remove_collection(cct, store, pgid, *it);
+ recursive_remove_collection(cct, store.get(), pgid, *it);
continue;
}
pg->ch = store->open_collection(pg->coll);
// read pg state, log
- pg->read_state(store);
+ pg->read_state(store.get());
if (pg->dne()) {
dout(10) << "load_pgs " << *it << " deleting dne" << dendl;
pg->ch = nullptr;
pg->unlock();
- recursive_remove_collection(cct, store, pgid, *it);
+ recursive_remove_collection(cct, store.get(), pgid, *it);
continue;
}
{
store->set_collection_commit_queue(pg->coll, &(shards[shard_index]->context_queue));
}
- pg->reg_next_scrub();
-
dout(10) << __func__ << " loaded " << *pg << dendl;
pg->unlock();
return nullptr;
}
- PeeringCtx rctx = create_context();
-
OSDMapRef startmap = get_map(info->epoch);
if (info->by_mon) {
<< "the pool allows ec overwrites but is not stored in "
<< "bluestore, so deep scrubbing will not detect bitrot";
}
+ PeeringCtx rctx;
create_pg_collection(
rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
init_pg_ondisk(rctx.transaction, pgid, pp);
acting_primary,
info->history,
info->past_intervals,
- false,
rctx.transaction);
pg->init_collection_pool_opts();
ceph_assert(ceph_mutex_is_locked_by_me(heartbeat_lock));
dout(30) << "heartbeat" << dendl;
- // get CPU load avg
- double loadavgs[1];
- int hb_interval = cct->_conf->osd_heartbeat_interval;
- int n_samples = 86400;
- if (hb_interval > 1) {
- n_samples /= hb_interval;
- if (n_samples < 1)
- n_samples = 1;
- }
-
- if (getloadavg(loadavgs, 1) == 1) {
- logger->set(l_osd_loadavg, 100 * loadavgs[0]);
- daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavgs[0]) / n_samples;
- dout(30) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
+ auto load_for_logger = service.get_scrub_services().update_load_average();
+ if (load_for_logger) {
+ logger->set(l_osd_loadavg, load_for_logger.value());
}
-
dout(30) << "heartbeat checking stats" << dendl;
// refresh peer list and osd stats
// use a seed that is stable for each scrub interval, but varies
// by OSD to avoid any herds.
rng.seed(whoami + superblock.last_purged_snaps_scrub.sec());
- double r = (rng() % 1024) / 1024;
+ double r = (rng() % 1024) / 1024.0;
next +=
cct->_conf->osd_scrub_min_interval *
cct->_conf->osd_scrub_interval_randomize_ratio * r;
return;
}
- int64_t shardid;
- cmd_getval(cmdmap, "shardid", shardid, int64_t(shard_id_t::NO_SHARD));
+ int64_t shardid = cmd_getval_or<int64_t>(cmdmap, "shardid", shard_id_t::NO_SHARD);
hobject_t obj(object_t(objname), string(""), CEPH_NOSNAP, rawpg.ps(), pool, nspace);
ghobject_t gobj(obj, ghobject_t::NO_GEN, shard_id_t(uint8_t(shardid)));
spg_t pgid(curmap->raw_pg_to_pg(rawpg), shard_id_t(shardid));
return;
}
if (command == "set_recovery_delay") {
- int64_t delay;
- cmd_getval(cmdmap, "utime", delay, (int64_t)0);
+ int64_t delay = cmd_getval_or<int64_t>(cmdmap, "utime", 0);
ostringstream oss;
oss << delay;
int r = service->cct->_conf.set_val("osd_recovery_delay_start",
return;
}
if (command == "injectfull") {
- int64_t count;
- string type;
+ int64_t count = cmd_getval_or<int64_t>(cmdmap, "count", -1);
+ string type = cmd_getval_or<string>(cmdmap, "type", "full");
OSDService::s_names state;
- cmd_getval(cmdmap, "type", type, string("full"));
- cmd_getval(cmdmap, "count", count, (int64_t)-1);
+
if (type == "none" || count == 0) {
type = "none";
count = 0;
m->last < superblock.purged_snaps_last) {
goto out;
}
- SnapMapper::record_purged_snaps(cct, store, service.meta_ch,
+ SnapMapper::record_purged_snaps(cct, store.get(), service.meta_ch,
make_purged_snaps_oid(), &t,
m->purged_snaps);
superblock.purged_snaps_last = m->last;
{
dout(10) << __func__ << dendl;
ceph_assert(ceph_mutex_is_locked(osd_lock));
- SnapMapper::Scrubber s(cct, store, service.meta_ch,
+ SnapMapper::Scrubber s(cct, store.get(), service.meta_ch,
make_snapmapper_oid(),
make_purged_snaps_oid());
clog->debug() << "purged_snaps scrub starts";
void OSD::ms_fast_dispatch(Message *m)
{
-
-#ifdef HAVE_JAEGER
- jaeger_tracing::init_tracer("osd-services-reinit");
- dout(10) << "jaeger tracer after " << opentracing::Tracer::Global() << dendl;
- auto dispatch_span = jaeger_tracing::new_span(__func__);
-#endif
+ auto dispatch_span = tracing::osd::tracer.start_trace(__func__);
FUNCTRACE(cct);
if (service.is_stopping()) {
m->put();
return;
}
-
// peering event?
switch (m->get_type()) {
case CEPH_MSG_PING:
case MSG_OSD_SCRUB2:
handle_fast_scrub(static_cast<MOSDScrub2*>(m));
return;
-
case MSG_OSD_PG_CREATE2:
return handle_fast_pg_create(static_cast<MOSDPGCreate2*>(m));
- case MSG_OSD_PG_QUERY:
- return handle_fast_pg_query(static_cast<MOSDPGQuery*>(m));
case MSG_OSD_PG_NOTIFY:
return handle_fast_pg_notify(static_cast<MOSDPGNotify*>(m));
case MSG_OSD_PG_INFO:
return handle_fast_pg_info(static_cast<MOSDPGInfo*>(m));
case MSG_OSD_PG_REMOVE:
return handle_fast_pg_remove(static_cast<MOSDPGRemove*>(m));
-
// these are single-pg messages that handle themselves
case MSG_OSD_PG_LOG:
case MSG_OSD_PG_TRIM:
tracepoint(osd, ms_fast_dispatch, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
}
-#ifdef HAVE_JAEGER
- op->set_osd_parent_span(dispatch_span);
- if (op->osd_parent_span) {
- auto op_req_span = jaeger_tracing::child_span("op-request-created", op->osd_parent_span);
- op->set_osd_parent_span(op_req_span);
- }
-#endif
+ op->osd_parent_span = tracing::osd::tracer.add_span("op-request-created", dispatch_span);
+
if (m->trace)
op->osd_trace.init("osd op", &trace_endpoint, &m->trace);
return false;
}
-OSDService::ScrubJob::ScrubJob(CephContext* cct,
- const spg_t& pg, const utime_t& timestamp,
- double pool_scrub_min_interval,
- double pool_scrub_max_interval, bool must)
- : cct(cct),
- pgid(pg),
- sched_time(timestamp),
- deadline(timestamp)
-{
- // if not explicitly requested, postpone the scrub with a random delay
- if (!must) {
- double scrub_min_interval = pool_scrub_min_interval > 0 ?
- pool_scrub_min_interval : cct->_conf->osd_scrub_min_interval;
- double scrub_max_interval = pool_scrub_max_interval > 0 ?
- pool_scrub_max_interval : cct->_conf->osd_scrub_max_interval;
-
- sched_time += scrub_min_interval;
- double r = rand() / (double)RAND_MAX;
- sched_time +=
- scrub_min_interval * cct->_conf->osd_scrub_interval_randomize_ratio * r;
- if (scrub_max_interval == 0) {
- deadline = utime_t();
- } else {
- deadline += scrub_max_interval;
- }
-
- }
-}
-
-bool OSDService::ScrubJob::ScrubJob::operator<(const OSDService::ScrubJob& rhs) const {
- if (sched_time < rhs.sched_time)
- return true;
- if (sched_time > rhs.sched_time)
- return false;
- return pgid < rhs.pgid;
-}
-void OSDService::dumps_scrub(ceph::Formatter *f)
+void OSD::sched_scrub()
{
- ceph_assert(f != nullptr);
- std::lock_guard l(sched_scrub_lock);
+ auto& scrub_scheduler = service.get_scrub_services();
- f->open_array_section("scrubs");
- for (const auto &i: sched_scrub_pg) {
- f->open_object_section("scrub");
- f->dump_stream("pgid") << i.pgid;
- f->dump_stream("sched_time") << i.sched_time;
- f->dump_stream("deadline") << i.deadline;
- f->dump_bool("forced", i.sched_time == PgScrubber::scrub_must_stamp());
- f->close_section();
+ // fail fast if no resources are available
+ if (!scrub_scheduler.can_inc_scrubs()) {
+ dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
+ return;
}
- f->close_section();
-}
-double OSD::scrub_sleep_time(bool must_scrub)
-{
- if (must_scrub) {
- return cct->_conf->osd_scrub_sleep;
- }
- utime_t now = ceph_clock_now();
- if (scrub_time_permit(now)) {
- return cct->_conf->osd_scrub_sleep;
+ // if there is a PG that is just now trying to reserve scrub replica resources -
+ // we should wait and not initiate a new scrub
+ if (scrub_scheduler.is_reserving_now()) {
+ dout(20) << __func__ << ": scrub resources reservation in progress" << dendl;
+ return;
}
- double normal_sleep = cct->_conf->osd_scrub_sleep;
- double extended_sleep = cct->_conf->osd_scrub_extended_sleep;
- return std::max(extended_sleep, normal_sleep);
-}
-bool OSD::scrub_time_permit(utime_t now)
-{
- struct tm bdt;
- time_t tt = now.sec();
- localtime_r(&tt, &bdt);
+ Scrub::ScrubPreconds env_conditions;
- bool day_permit = false;
- if (cct->_conf->osd_scrub_begin_week_day < cct->_conf->osd_scrub_end_week_day) {
- if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day && bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
- day_permit = true;
- }
- } else {
- if (bdt.tm_wday >= cct->_conf->osd_scrub_begin_week_day || bdt.tm_wday < cct->_conf->osd_scrub_end_week_day) {
- day_permit = true;
+ if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
+ if (!cct->_conf->osd_repair_during_recovery) {
+ dout(15) << __func__ << ": not scheduling scrubs due to active recovery"
+ << dendl;
+ return;
}
+ dout(10) << __func__
+ << " will only schedule explicitly requested repair due to active recovery"
+ << dendl;
+ env_conditions.allow_requested_repair_only = true;
}
- if (!day_permit) {
- dout(20) << __func__ << " should run between week day " << cct->_conf->osd_scrub_begin_week_day
- << " - " << cct->_conf->osd_scrub_end_week_day
- << " now " << bdt.tm_wday << " = no" << dendl;
- return false;
- }
-
- bool time_permit = false;
- if (cct->_conf->osd_scrub_begin_hour < cct->_conf->osd_scrub_end_hour) {
- if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour && bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
- time_permit = true;
- }
- } else {
- if (bdt.tm_hour >= cct->_conf->osd_scrub_begin_hour || bdt.tm_hour < cct->_conf->osd_scrub_end_hour) {
- time_permit = true;
+ if (g_conf()->subsys.should_gather<ceph_subsys_osd, 20>()) {
+ dout(20) << __func__ << " sched_scrub starts" << dendl;
+ auto all_jobs = scrub_scheduler.list_registered_jobs();
+ for (const auto& sj : all_jobs) {
+ dout(20) << "sched_scrub scrub-queue jobs: " << *sj << dendl;
}
}
- if (time_permit) {
- dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
- << " - " << cct->_conf->osd_scrub_end_hour
- << " now " << bdt.tm_hour << " = yes" << dendl;
- } else {
- dout(20) << __func__ << " should run between " << cct->_conf->osd_scrub_begin_hour
- << " - " << cct->_conf->osd_scrub_end_hour
- << " now " << bdt.tm_hour << " = no" << dendl;
- }
- return time_permit;
+
+ auto was_started = scrub_scheduler.select_pg_and_scrub(env_conditions);
+ dout(20) << "sched_scrub done (" << ScrubQueue::attempt_res_text(was_started)
+ << ")" << dendl;
}
-bool OSD::scrub_load_below_threshold()
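+// try to initiate a scrub on one candidate PG; the returned value encodes
+// whether the scrub started, or the reason it did not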
+Scrub::schedule_result_t OSDService::initiate_a_scrub(spg_t pgid,
+ bool allow_requested_repair_only)
{
- double loadavgs[3];
- if (getloadavg(loadavgs, 3) != 3) {
- dout(10) << __func__ << " couldn't read loadavgs\n" << dendl;
- return false;
- }
+ dout(20) << __func__ << " trying " << pgid << dendl;
- // allow scrub if below configured threshold
- long cpus = sysconf(_SC_NPROCESSORS_ONLN);
- double loadavg_per_cpu = cpus > 0 ? loadavgs[0] / cpus : loadavgs[0];
- if (loadavg_per_cpu < cct->_conf->osd_scrub_load_threshold) {
- dout(20) << __func__ << " loadavg per cpu " << loadavg_per_cpu
- << " < max " << cct->_conf->osd_scrub_load_threshold
- << " = yes" << dendl;
- return true;
- }
+ // we have a candidate to scrub. We need some PG information to know if scrubbing is
+ // allowed
- // allow scrub if below daily avg and currently decreasing
- if (loadavgs[0] < daily_loadavg && loadavgs[0] < loadavgs[2]) {
- dout(20) << __func__ << " loadavg " << loadavgs[0]
- << " < daily_loadavg " << daily_loadavg
- << " and < 15m avg " << loadavgs[2]
- << " = yes" << dendl;
- return true;
+ PGRef pg = osd->lookup_lock_pg(pgid);
+ if (!pg) {
+ // the PG was dequeued in the short timespan between creating the candidates list
+ // (collect_ripe_jobs()) and here
+ dout(5) << __func__ << " pg " << pgid << " not found" << dendl;
+ return Scrub::schedule_result_t::no_such_pg;
}
- dout(20) << __func__ << " loadavg " << loadavgs[0]
- << " >= max " << cct->_conf->osd_scrub_load_threshold
- << " and ( >= daily_loadavg " << daily_loadavg
- << " or >= 15m avg " << loadavgs[2]
- << ") = no" << dendl;
- return false;
-}
-
-void OSD::sched_scrub()
-{
- dout(20) << __func__ << " sched_scrub starts" << dendl;
-
- // if not permitted, fail fast
- if (!service.can_inc_scrubs()) {
- dout(20) << __func__ << ": OSD cannot inc scrubs" << dendl;
- return;
+ // This has already started, so go on to the next scrub job
+ if (pg->is_scrub_queued_or_active()) {
+ pg->unlock();
+ dout(20) << __func__ << ": already in progress pgid " << pgid << dendl;
+ return Scrub::schedule_result_t::already_started;
}
- bool allow_requested_repair_only = false;
- if (service.is_recovery_active() && !cct->_conf->osd_scrub_during_recovery) {
- if (!cct->_conf->osd_repair_during_recovery) {
- dout(15) << __func__ << ": not scheduling scrubs due to active recovery" << dendl;
- return;
- }
- dout(10) << __func__
- << " will only schedule explicitly requested repair due to active recovery"
- << dendl;
- allow_requested_repair_only = true;
+ // Skip other kinds of scrubbing if only explicitly requested repairing is allowed
+ if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) {
+ pg->unlock();
+ dout(10) << __func__ << " skip " << pgid
+ << " because repairing is not explicitly requested on it" << dendl;
+ return Scrub::schedule_result_t::preconditions;
}
- utime_t now = ceph_clock_now();
- bool time_permit = scrub_time_permit(now);
- bool load_is_low = scrub_load_below_threshold();
- dout(20) << "sched_scrub load_is_low=" << (int)load_is_low << dendl;
-
- OSDService::ScrubJob scrub_job;
- if (service.first_scrub_stamp(&scrub_job)) {
- do {
- dout(30) << "sched_scrub examine " << scrub_job.pgid << " at " << scrub_job.sched_time << dendl;
-
- if (scrub_job.sched_time > now) {
- // save ourselves some effort
- dout(20) << "sched_scrub " << scrub_job.pgid << " scheduled at " << scrub_job.sched_time
- << " > " << now << dendl;
- break;
- }
-
- if ((scrub_job.deadline.is_zero() || scrub_job.deadline >= now) && !(time_permit && load_is_low)) {
- dout(15) << __func__ << " not scheduling scrub for " << scrub_job.pgid << " due to "
- << (!time_permit ? "time not permit" : "high load") << dendl;
- continue;
- }
-
- PGRef pg = _lookup_lock_pg(scrub_job.pgid);
- if (!pg) {
- dout(20) << __func__ << " pg " << scrub_job.pgid << " not found" << dendl;
- continue;
- }
-
- // This has already started, so go on to the next scrub job
- if (pg->is_scrub_active()) {
- pg->unlock();
- dout(20) << __func__ << ": already in progress pgid " << scrub_job.pgid << dendl;
- continue;
- }
- // Skip other kinds of scrubbing if only explicitly requested repairing is allowed
- if (allow_requested_repair_only && !pg->m_planned_scrub.must_repair) {
- pg->unlock();
- dout(10) << __func__ << " skip " << scrub_job.pgid
- << " because repairing is not explicitly requested on it"
- << dendl;
- continue;
- }
-
- // If it is reserving, let it resolve before going to the next scrub job
- if (pg->m_scrubber->is_reserving()) {
- pg->unlock();
- dout(10) << __func__ << ": reserve in progress pgid " << scrub_job.pgid << dendl;
- break;
- }
- dout(15) << "sched_scrub scrubbing " << scrub_job.pgid << " at " << scrub_job.sched_time
- << (pg->get_must_scrub() ? ", explicitly requested" :
- (load_is_low ? ", load_is_low" : " deadline < now"))
- << dendl;
- if (pg->sched_scrub()) {
- pg->unlock();
- dout(10) << __func__ << " scheduled a scrub!" << " (~" << scrub_job.pgid << "~)" << dendl;
- break;
- }
- pg->unlock();
- } while (service.next_scrub_stamp(scrub_job, &scrub_job));
- }
- dout(20) << "sched_scrub done" << dendl;
+ auto scrub_attempt = pg->sched_scrub();
+ pg->unlock();
+ return scrub_attempt;
}
void OSD::resched_all_scrubs()
{
dout(10) << __func__ << ": start" << dendl;
- const vector<spg_t> pgs = [this] {
- vector<spg_t> pgs;
- OSDService::ScrubJob job;
- if (service.first_scrub_stamp(&job)) {
- do {
- pgs.push_back(job.pgid);
- } while (service.next_scrub_stamp(job, &job));
- }
- return pgs;
- }();
- for (auto& pgid : pgs) {
- dout(20) << __func__ << ": examine " << pgid << dendl;
- PGRef pg = _lookup_lock_pg(pgid);
- if (!pg)
- continue;
- if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) {
- dout(15) << __func__ << ": reschedule " << pgid << dendl;
- pg->on_info_history_change();
- }
- pg->unlock();
+ auto all_jobs = service.get_scrub_services().list_registered_jobs();
+ for (auto& e : all_jobs) {
+
+ auto& job = *e;
+ dout(20) << __func__ << ": examine " << job.pgid << dendl;
+
+ PGRef pg = _lookup_lock_pg(job.pgid);
+ if (!pg)
+ continue;
+
+ if (!pg->m_planned_scrub.must_scrub && !pg->m_planned_scrub.need_auto) {
+ dout(15) << __func__ << ": reschedule " << job.pgid << dendl;
+ pg->reschedule_scrub();
+ }
+ pg->unlock();
}
dout(10) << __func__ << ": done" << dendl;
}
MPGStats* OSD::collect_pg_stats()
{
+ dout(15) << __func__ << dendl;
// This implementation unconditionally sends every is_primary PG's
// stats every time we're called. This has equivalent cost to the
// previous implementation's worst case where all PGs are busy and
if (!pg->is_primary()) {
continue;
}
- pg->get_pg_stats([&](const pg_stat_t& s, epoch_t lec) {
+ pg->with_pg_stats([&](const pg_stat_t& s, epoch_t lec) {
m->pg_stat[pg->pg_id.pgid] = s;
min_last_epoch_clean = std::min(min_last_epoch_clean, lec);
min_last_epoch_clean_pgs.push_back(pg->pg_id.pgid);
too_old -= cct->_conf.get_val<double>("osd_op_complaint_time");
int slow = 0;
TrackedOpRef oldest_op;
+ OSDMapRef osdmap = get_osdmap();
+ // map of slow op counts by slow op event type for an aggregated logging to
+ // the cluster log.
+ map<uint8_t, int> slow_op_types;
+ // map of slow op counts by pool for reporting a pool name with highest
+ // slow ops.
+ map<uint64_t, int> slow_op_pools;
+ bool log_aggregated_slow_op =
+ cct->_conf.get_val<bool>("osd_aggregated_slow_ops_logging");
auto count_slow_ops = [&](TrackedOp& op) {
if (op.get_initiated() < too_old) {
stringstream ss;
<< " currently "
<< op.state_string();
lgeneric_subdout(cct,osd,20) << ss.str() << dendl;
- clog->warn() << ss.str();
+ if (log_aggregated_slow_op) {
+ if (const OpRequest *req = dynamic_cast<const OpRequest *>(&op)) {
+ uint8_t op_type = req->state_flag();
+ auto m = req->get_req<MOSDFastDispatchOp>();
+ uint64_t poolid = m->get_spg().pgid.m_pool;
+ slow_op_types[op_type]++;
+ if (poolid > 0 && poolid <= (uint64_t) osdmap->get_pool_max()) {
+ slow_op_pools[poolid]++;
+ }
+ }
+ } else {
+ clog->warn() << ss.str();
+ }
slow++;
if (!oldest_op || op.get_initiated() < oldest_op->get_initiated()) {
oldest_op = &op;
if (slow) {
derr << __func__ << " reporting " << slow << " slow ops, oldest is "
<< oldest_op->get_desc() << dendl;
+ if (log_aggregated_slow_op &&
+ slow_op_types.size() > 0) {
+ stringstream ss;
+ ss << slow << " slow requests (by type [ ";
+ for (const auto& [op_type, count] : slow_op_types) {
+ ss << "'" << OpRequest::get_state_string(op_type)
+ << "' : " << count
+ << " ";
+ }
+ auto slow_pool_it = std::max_element(slow_op_pools.begin(), slow_op_pools.end(),
+ [](std::pair<uint64_t, int> p1, std::pair<uint64_t, int> p2) {
+ return p1.second < p2.second;
+ });
+    // slow_op_pools may be empty (e.g. if no slow op carried a valid pool id)
+    if (slow_pool_it != slow_op_pools.end() &&
+        osdmap->get_pools().find(slow_pool_it->first) != osdmap->get_pools().end()) {
+ string pool_name = osdmap->get_pool_name(slow_pool_it->first);
+ ss << "] most affected pool [ '"
+ << pool_name
+ << "' : "
+ << slow_pool_it->second
+ << " ])";
+ } else {
+ ss << "])";
+ }
+ lgeneric_subdout(cct,osd,20) << ss.str() << dendl;
+ clog->warn() << ss.str();
+ }
}
metrics.emplace_back(daemon_metric::SLOW_OPS, slow, oldest_secs);
} else {
// record new purged_snaps
if (superblock.purged_snaps_last == start - 1) {
- SnapMapper::record_purged_snaps(cct, store, service.meta_ch,
+ SnapMapper::record_purged_snaps(cct, store.get(), service.meta_ch,
make_purged_snaps_oid(), &t,
purged_snaps);
superblock.purged_snaps_last = last;
reset_heartbeat_peers(true);
}
}
+ } else if (osdmap->get_epoch() > 0 && osdmap->is_stop(whoami)) {
+ derr << "map says i am stopped by admin. shutting down." << dendl;
+ do_shutdown = true;
}
map_lock.unlock();
++i) {
PG *pg = i->get();
- PeeringCtx rctx = create_context();
+ PeeringCtx rctx;
pg->lock();
dout(10) << __func__ << " " << *pg << dendl;
epoch_t e = pg->get_osdmap_epoch();
// ----------------------------------------
// peering and recovery
-PeeringCtx OSD::create_context()
-{
- return PeeringCtx(get_osdmap()->require_osd_release);
-}
-
void OSD::dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
ThreadPool::TPHandle *handle)
{
m->put();
}
-void OSD::handle_fast_pg_query(MOSDPGQuery *m)
-{
- dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
- if (!require_osd_peer(m)) {
- m->put();
- return;
- }
- int from = m->get_source().num();
- for (auto& p : m->pg_list) {
- enqueue_peering_evt(
- p.first,
- PGPeeringEventRef(
- std::make_shared<PGPeeringEvent>(
- p.second.epoch_sent, p.second.epoch_sent,
- MQuery(
- p.first,
- pg_shard_t(from, p.second.from),
- p.second,
- p.second.epoch_sent),
- false))
- );
- }
- m->put();
-}
-
void OSD::handle_fast_pg_notify(MOSDPGNotify* m)
{
dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
enqueue_peering_evt(
spg_t(p.info.pgid.pgid, p.to),
PGPeeringEventRef(
- std::make_shared<PGPeeringEvent>(
- p.epoch_sent, p.query_epoch,
- MInfoRec(
- pg_shard_t(from, p.from),
- p.info,
- p.epoch_sent)))
+ std::make_shared<PGPeeringEvent>(
+ p.epoch_sent, p.query_epoch,
+ MInfoRec(
+ pg_shard_t(from, p.from),
+ p.info,
+ p.epoch_sent)))
);
}
m->put();
osdmap->get_epoch(), empty,
q.query.epoch_sent);
} else {
- vector<pg_notify_t> ls;
- ls.push_back(
- pg_notify_t(
- q.query.from, q.query.to,
- q.query.epoch_sent,
- osdmap->get_epoch(),
- empty,
- PastIntervals()));
- m = new MOSDPGNotify(osdmap->get_epoch(), std::move(ls));
+ pg_notify_t notify{q.query.from, q.query.to,
+ q.query.epoch_sent,
+ osdmap->get_epoch(),
+ empty,
+ PastIntervals()};
+ m = new MOSDPGNotify2(spg_t{pgid.pgid, q.query.from},
+ std::move(notify));
}
service.maybe_share_map(con.get(), osdmap);
con->send_message(m);
<< " on " << *pg << dendl;
if (do_unfound) {
- PeeringCtx rctx = create_context();
+ PeeringCtx rctx;
rctx.handle = &handle;
pg->find_unfound(queued, rctx);
dispatch_context(rctx, pg, pg->get_osdmap());
op->osd_trace.event("enqueue op");
op->osd_trace.keyval("priority", priority);
op->osd_trace.keyval("cost", cost);
-#ifdef HAVE_JAEGER
- if (op->osd_parent_span) {
- auto enqueue_span = jaeger_tracing::child_span(__func__, op->osd_parent_span);
- enqueue_span->Log({
- {"priority", priority},
- {"cost", cost},
- {"epoch", epoch},
- {"owner", owner},
- {"type", type}
- });
- }
-#endif
+
+ auto enqueue_span = tracing::osd::tracer.add_span(__func__, op->osd_parent_span);
+ enqueue_span->AddEvent(__func__, {
+ {"priority", priority},
+ {"cost", cost},
+ {"epoch", epoch},
+ {"owner", owner},
+ {"type", type}
+ });
+
op->mark_queued_for_pg();
logger->tinc(l_osd_op_before_queue_op_lat, latency);
if (type == MSG_OSD_PG_PUSH ||
PGPeeringEventRef evt,
ThreadPool::TPHandle& handle)
{
- PeeringCtx rctx = create_context();
auto curmap = sdata->get_osdmap();
bool need_up_thru = false;
epoch_t same_interval_since = 0;
derr << __func__ << " unrecognized pg-less event " << evt->get_desc() << dendl;
ceph_abort();
}
- } else if (advance_pg(curmap->get_epoch(), pg, handle, rctx)) {
+ } else if (PeeringCtx rctx;
+ advance_pg(curmap->get_epoch(), pg, handle, rctx)) {
pg->do_peering_event(evt, rctx);
if (pg->is_deleted()) {
pg->unlock();
"osd_snap_trim_sleep",
"osd_snap_trim_sleep_hdd",
"osd_snap_trim_sleep_ssd",
- "osd_snap_trim_sleep_hybrid"
+ "osd_snap_trim_sleep_hybrid",
"osd_scrub_sleep",
"osd_recovery_max_active",
"osd_recovery_max_active_hdd",
if (changed.count("osd_client_message_cap")) {
uint64_t newval = cct->_conf->osd_client_message_cap;
Messenger::Policy pol = client_messenger->get_policy(entity_name_t::TYPE_CLIENT);
- if (pol.throttler_messages && newval > 0) {
+ if (pol.throttler_messages) {
pol.throttler_messages->reset_max(newval);
}
}
if (changed.count("osd_client_message_size_cap")) {
uint64_t newval = cct->_conf->osd_client_message_size_cap;
Messenger::Policy pol = client_messenger->get_policy(entity_name_t::TYPE_CLIENT);
- if (pol.throttler_bytes && newval > 0) {
+ if (pol.throttler_bytes) {
pol.throttler_bytes->reset_max(newval);
}
}
}
}
+void OSD::maybe_override_max_osd_capacity_for_qos()
+{
+  // If the enabled scheduler is mclock, override the default
+  // osd capacity with the value obtained from running the
+  // osd bench test. The result is later used to set up mclock.
+ if ((cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler") &&
+ (cct->_conf.get_val<bool>("osd_mclock_skip_benchmark") == false) &&
+ (!unsupported_objstore_for_qos())) {
+ std::string max_capacity_iops_config;
+ bool force_run_benchmark =
+ cct->_conf.get_val<bool>("osd_mclock_force_run_benchmark_on_init");
+
+ if (store_is_rotational) {
+ max_capacity_iops_config = "osd_mclock_max_capacity_iops_hdd";
+ } else {
+ max_capacity_iops_config = "osd_mclock_max_capacity_iops_ssd";
+ }
+
+ if (!force_run_benchmark) {
+ double default_iops = 0.0;
+
+ // Get the current osd iops capacity
+ double cur_iops = cct->_conf.get_val<double>(max_capacity_iops_config);
+
+ // Get the default max iops capacity
+ auto val = cct->_conf.get_val_default(max_capacity_iops_config);
+ if (!val.has_value()) {
+ derr << __func__ << " Unable to determine default value of "
+ << max_capacity_iops_config << dendl;
+ // Cannot determine default iops. Force a run of the OSD benchmark.
+ force_run_benchmark = true;
+ } else {
+ // Default iops
+ default_iops = std::stod(val.value());
+ }
+
+ // Determine if we really need to run the osd benchmark
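+      // a capacity that differs from the built-in default was already set,
+      // either by an admin or by a previous benchmark run, so the test can
+      // be skipped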
+ if (!force_run_benchmark && (default_iops != cur_iops)) {
+ dout(1) << __func__ << std::fixed << std::setprecision(2)
+ << " default_iops: " << default_iops
+ << " cur_iops: " << cur_iops
+ << ". Skip OSD benchmark test." << dendl;
+ return;
+ }
+ }
+
+    // Run osd bench: write 12288000 bytes in 4 KiB blocks (3000 writes),
+    // spread across 100 4 MiB objects
+ int64_t count = 12288000; // Count of bytes to write
+ int64_t bsize = 4096; // Block size
+ int64_t osize = 4194304; // Object size
+ int64_t onum = 100; // Count of objects to write
+ double elapsed = 0.0; // Time taken to complete the test
+ double iops = 0.0;
+ stringstream ss;
+ int ret = run_osd_bench_test(count, bsize, osize, onum, &elapsed, ss);
+ if (ret != 0) {
+ derr << __func__
+ << " osd bench err: " << ret
+ << " osd bench errstr: " << ss.str()
+ << dendl;
+ return;
+ }
+
+ double rate = count / elapsed;
+ iops = rate / bsize;
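+    // illustrative numbers only: with elapsed = 0.5 s this yields
+    // rate = 12288000 / 0.5 = 24576000 B/s (~23.4 MiB/s) and
+    // iops = 24576000 / 4096 = 6000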
+ dout(1) << __func__
+ << " osd bench result -"
+ << std::fixed << std::setprecision(3)
+ << " bandwidth (MiB/sec): " << rate / (1024 * 1024)
+ << " iops: " << iops
+ << " elapsed_sec: " << elapsed
+ << dendl;
+
+ // Persist iops to the MON store
+ ret = mon_cmd_set_config(max_capacity_iops_config, std::to_string(iops));
+ if (ret < 0) {
+      // Fall back to setting the config within the in-memory "values" map.
+ cct->_conf.set_val(max_capacity_iops_config, std::to_string(iops));
+ }
+
+ // Override the max osd capacity for all shards
+ for (auto& shard : shards) {
+ shard->update_scheduler_config();
+ }
+ }
+}
+
bool OSD::maybe_override_options_for_qos()
{
// If the scheduler enabled is mclock, override the recovery, backfill
// and sleep options so that mclock can meet the QoS goals.
- if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler") {
+ if (cct->_conf.get_val<std::string>("osd_op_queue") == "mclock_scheduler" &&
+ !unsupported_objstore_for_qos()) {
dout(1) << __func__
<< ": Changing recovery/backfill/sleep settings for QoS" << dendl;
return false;
}
+int OSD::mon_cmd_set_config(const std::string &key, const std::string &val)
+{
+ std::string cmd =
+ "{"
+ "\"prefix\": \"config set\", "
+ "\"who\": \"osd." + std::to_string(whoami) + "\", "
+ "\"name\": \"" + key + "\", "
+ "\"value\": \"" + val + "\""
+ "}";
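+  // e.g. with whoami == 3 (illustrative values only), the assembled command is:
+  //   {"prefix": "config set", "who": "osd.3",
+  //    "name": "osd_mclock_max_capacity_iops_ssd", "value": "21500.000000"}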
+
+ vector<std::string> vcmd{cmd};
+ bufferlist inbl;
+ std::string outs;
+ C_SaferCond cond;
+ monc->start_mon_command(vcmd, inbl, nullptr, &outs, &cond);
+ int r = cond.wait();
+ if (r < 0) {
+ derr << __func__ << " Failed to set config key " << key
+ << " err: " << cpp_strerror(r)
+ << " errstr: " << outs << dendl;
+ return r;
+ }
+
+ return 0;
+}
+
+bool OSD::unsupported_objstore_for_qos()
+{
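+  // mclock QoS is only ruled out for FileStore; any other object store is
+  // treated as supported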
+ static const std::vector<std::string> unsupported_objstores = { "filestore" };
+ return std::find(unsupported_objstores.begin(),
+ unsupported_objstores.end(),
+ store->get_type()) != unsupported_objstores.end();
+}
+
void OSD::update_log_config()
{
- map<string,string> log_to_monitors;
- map<string,string> log_to_syslog;
- map<string,string> log_channel;
- map<string,string> log_prio;
- map<string,string> log_to_graylog;
- map<string,string> log_to_graylog_host;
- map<string,string> log_to_graylog_port;
- uuid_d fsid;
- string host;
-
- if (parse_log_client_options(cct, log_to_monitors, log_to_syslog,
- log_channel, log_prio, log_to_graylog,
- log_to_graylog_host, log_to_graylog_port,
- fsid, host) == 0)
- clog->update_config(log_to_monitors, log_to_syslog,
- log_channel, log_prio, log_to_graylog,
- log_to_graylog_host, log_to_graylog_port,
- fsid, host);
- derr << "log_to_monitors " << log_to_monitors << dendl;
+ auto parsed_options = clog->parse_client_options(cct);
+ derr << "log_to_monitors " << parsed_options.log_to_monitors << dendl;
}
void OSD::check_config()
dout(10) << new_osdmap->get_epoch()
<< " (was " << (old_osdmap ? old_osdmap->get_epoch() : 0) << ")"
<< dendl;
- bool queued = false;
+ int queued = 0;
// check slots
auto p = pg_slots.begin();
dout(20) << __func__ << " " << pgid
<< " pending_peering first epoch " << first
<< " <= " << new_osdmap->get_epoch() << ", requeueing" << dendl;
- _wake_pg_slot(pgid, slot);
- queued = true;
+ queued += _wake_pg_slot(pgid, slot);
}
++p;
continue;
}
if (queued) {
std::lock_guard l{sdata_wait_lock};
- sdata_cond.notify_one();
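+    // a single requeued item needs only one worker; for more, wake them all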
+ if (queued == 1)
+ sdata_cond.notify_one();
+ else
+ sdata_cond.notify_all();
}
}
-void OSDShard::_wake_pg_slot(
+int OSDShard::_wake_pg_slot(
spg_t pgid,
OSDShardPGSlot *slot)
{
+ int count = 0;
dout(20) << __func__ << " " << pgid
<< " to_process " << slot->to_process
<< " waiting " << slot->waiting
i != slot->to_process.rend();
++i) {
scheduler->enqueue_front(std::move(*i));
+ count++;
}
slot->to_process.clear();
for (auto i = slot->waiting.rbegin();
i != slot->waiting.rend();
++i) {
scheduler->enqueue_front(std::move(*i));
+ count++;
}
slot->waiting.clear();
for (auto i = slot->waiting_peering.rbegin();
// someday, if we decide this inefficiency matters
for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) {
scheduler->enqueue_front(std::move(*j));
+ count++;
}
}
slot->waiting_peering.clear();
++slot->requeue_seq;
+ return count;
}
void OSDShard::identify_splits_and_merges(
void OSDShard::register_and_wake_split_child(PG *pg)
{
+ dout(15) << __func__ << ": " << pg << " #:" << pg_slots.size() << dendl;
epoch_t epoch;
{
std::lock_guard l(shard_lock);
- dout(10) << pg->pg_id << " " << pg << dendl;
+ dout(10) << __func__ << ": " << pg->pg_id << " " << pg << dendl;
auto p = pg_slots.find(pg->pg_id);
ceph_assert(p != pg_slots.end());
auto *slot = p->second.get();
- dout(20) << pg->pg_id << " waiting_for_split " << slot->waiting_for_split
- << dendl;
+ dout(20) << __func__ << ": " << pg->pg_id << " waiting_for_split "
+ << slot->waiting_for_split << dendl;
ceph_assert(!slot->pg);
ceph_assert(!slot->waiting_for_split.empty());
_attach_pg(slot, pg);
}
}
+void OSDShard::update_scheduler_config()
+{
+ std::lock_guard l(shard_lock);
+ scheduler->update_configuration();
+}
+
+std::string OSDShard::get_scheduler_type()
+{
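+  // streaming the scheduler yields its name (e.g. "mClockScheduler")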
+ std::ostringstream scheduler_type;
+ scheduler_type << *scheduler;
+ return scheduler_type.str();
+}
+
OSDShard::OSDShard(
int id,
CephContext *cct,
shard_lock_name(shard_name + "::shard_lock"),
shard_lock{make_mutex(shard_lock_name)},
scheduler(ceph::osd::scheduler::make_scheduler(
- cct, osd->num_shards, osd->store->is_rotational())),
+ cct, osd->num_shards, osd->store->is_rotational(),
+ osd->store->get_type())),
context_queue(sdata_wait_lock, sdata_cond)
{
dout(0) << "using op scheduler " << *scheduler << dendl;
std::unique_lock wait_lock{sdata->sdata_wait_lock};
auto future_time = ceph::real_clock::from_double(*when_ready);
dout(10) << __func__ << " dequeue future request at " << future_time << dendl;
+ // Disable heartbeat timeout until we find a non-future work item to process.
+ osd->cct->get_heartbeat_map()->clear_timeout(hb);
sdata->shard_lock.unlock();
++sdata->waiting_threads;
sdata->sdata_cond.wait_until(wait_lock, future_time);
--sdata->waiting_threads;
wait_lock.unlock();
sdata->shard_lock.lock();
+ // Reapply default wq timeouts
+ osd->cct->get_heartbeat_map()->reset_timeout(hb,
+ timeout_interval, suicide_interval);
}
} // while
}
void OSD::ShardedOpWQ::_enqueue(OpSchedulerItem&& item) {
+  if (unlikely(m_fast_shutdown)) {
+    // stop enqueuing when we are in the middle of a fast shutdown
+    return;
+ }
+
uint32_t shard_index =
item.get_ordering_token().hash_to_shard(osd->shards.size());
- dout(20) << __func__ << " " << item << dendl;
-
OSDShard* sdata = osd->shards[shard_index];
assert (NULL != sdata);
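+  // tag the item for QoS classification only when this shard uses mclock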
+ if (sdata->get_scheduler_type() == "mClockScheduler") {
+ item.maybe_set_is_qos_item();
+ }
+
+ dout(20) << __func__ << " " << item << dendl;
bool empty = true;
{
void OSD::ShardedOpWQ::_enqueue_front(OpSchedulerItem&& item)
{
+  if (unlikely(m_fast_shutdown)) {
+    // stop enqueuing when we are in the middle of a fast shutdown
+    return;
+ }
+
auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size());
auto& sdata = osd->shards[shard_index];
ceph_assert(sdata);
sdata->sdata_cond.notify_one();
}
+void OSD::ShardedOpWQ::stop_for_fast_shutdown()
+{
+ uint32_t shard_index = 0;
+ m_fast_shutdown = true;
+
+ for (; shard_index < osd->num_shards; shard_index++) {
+ auto& sdata = osd->shards[shard_index];
+ ceph_assert(sdata);
+ sdata->shard_lock.lock();
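+    // drain (and discard) anything still queued on this shard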
+ int work_count = 0;
+    while (!sdata->scheduler->empty()) {
+ auto work_item = sdata->scheduler->dequeue();
+ work_count++;
+ }
+ sdata->shard_lock.unlock();
+ }
+}
+
namespace ceph::osd_cmds {
int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f,