#include "msg/Messenger.h"
#include "osdc/Journaler.h"
#include "common/errno.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#include "common/Finisher.h"
#define dout_subsys ceph_subsys_journaler
#define dout_prefix *_dout << objecter->messenger->get_myname() \
<< ".journaler." << name << (readonly ? "(ro) ":"(rw) ")
+using namespace std;
using std::chrono::seconds;
class Journaler::C_DelayFlush : public Context {
Journaler *journaler;
public:
- C_DelayFlush(Journaler *j) : journaler(j) {}
+ explicit C_DelayFlush(Journaler *j) : journaler(j) {}
void finish(int r) override {
journaler->_do_delayed_flush();
}
{
lock_guard lk(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
state = STATE_ACTIVE;
stream_format = sf;
{
layout = *l;
- assert(layout.pool_id == pg_pool);
+ if (layout.pool_id != pg_pool) {
+ // user can reset pool id through cephfs-journal-tool
+ lderr(cct) << "may got older pool id from header layout" << dendl;
+ ceph_abort();
+ }
last_written.layout = layout;
last_committed.layout = layout;
// prefetch intelligently.
// (watch out, this is big if you use big objects or weird striping)
- uint64_t periods = cct->_conf->journaler_prefetch_periods;
- if (periods < 2)
- periods = 2; // we need at least 2 periods to make progress.
+ uint64_t periods = cct->_conf.get_val<uint64_t>("journaler_prefetch_periods");
fetch_len = layout.get_period() * periods;
}
void Journaler::recover(Context *onread)
{
lock_guard l(lock);
- if (stopping) {
+ if (is_stopping()) {
onread->complete(-EAGAIN);
return;
}
ldout(cct, 1) << "recover start" << dendl;
- assert(state != STATE_ACTIVE);
- assert(readonly);
+ ceph_assert(state != STATE_ACTIVE);
+ ceph_assert(readonly);
if (onread)
waitfor_recover.push_back(wrap_finisher(onread));
void Journaler::_read_head(Context *on_finish, bufferlist *bl)
{
// lock is locked
- assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
+ ceph_assert(state == STATE_READHEAD || state == STATE_REREADHEAD);
object_t oid = file_object_t(ino, 0);
object_locator_t oloc(pg_pool);
void Journaler::_reread_head(Context *onfinish)
{
ldout(cct, 10) << "reread_head" << dendl;
- assert(state == STATE_ACTIVE);
+ ceph_assert(state == STATE_ACTIVE);
state = STATE_REREADHEAD;
C_RereadHead *fin = new C_RereadHead(this, onfinish);
void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
{
lock_guard l(lock);
+ if (is_stopping()) {
+ finish->complete(-EAGAIN);
+ return;
+ }
//read on-disk header into
- assert(bl.length() || r < 0 );
+ ceph_assert(bl.length() || r < 0 );
// unpack header
if (r == 0) {
Header h;
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
try {
- ::decode(h, p);
+ decode(h, p);
} catch (const buffer::error &e) {
finish->complete(-EINVAL);
return;
void Journaler::_finish_read_head(int r, bufferlist& bl)
{
lock_guard l(lock);
+ if (is_stopping())
+ return;
- assert(state == STATE_READHEAD);
+ ceph_assert(state == STATE_READHEAD);
if (r!=0) {
ldout(cct, 0) << "error getting journal off disk" << dendl;
// unpack header
bool corrupt = false;
Header h;
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
try {
- ::decode(h, p);
+ decode(h, p);
if (h.magic != magic) {
ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '"
{
// lock is locked
ldout(cct, 1) << "probing for end of the log" << dendl;
- assert(state == STATE_PROBING || state == STATE_REPROBING);
+ ceph_assert(state == STATE_PROBING || state == STATE_REPROBING);
// probe the log
filer.probe(ino, &layout, CEPH_NOSNAP,
write_pos, end, true, 0, wrap_finisher(finish));
void Journaler::_reprobe(C_OnFinisher *finish)
{
ldout(cct, 10) << "reprobe" << dendl;
- assert(state == STATE_ACTIVE);
+ ceph_assert(state == STATE_ACTIVE);
state = STATE_REPROBING;
C_ReProbe *fin = new C_ReProbe(this, finish);
C_OnFinisher *onfinish)
{
lock_guard l(lock);
+ if (is_stopping()) {
+ onfinish->complete(-EAGAIN);
+ return;
+ }
- assert(new_end >= write_pos || r < 0);
+ ceph_assert(new_end >= write_pos || r < 0);
ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
<< " (header had " << write_pos << ")."
<< dendl;
void Journaler::_finish_probe_end(int r, uint64_t end)
{
lock_guard l(lock);
+ if (is_stopping())
+ return;
- assert(state == STATE_PROBING);
+ ceph_assert(state == STATE_PROBING);
if (r < 0) { // error in probing
goto out;
}
<< write_pos << "). log was empty. recovered." << dendl;
ceph_abort(); // hrm.
} else {
- assert(end >= write_pos);
+ ceph_assert(end >= write_pos);
ldout(cct, 1) << "_finish_probe_end write_pos = " << end
<< " (header had " << write_pos << "). recovered."
<< dendl;
{
lock_guard l(lock);
- assert(state == STATE_ACTIVE);
+ ceph_assert(state == STATE_ACTIVE);
_reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish)));
}
{
// Expect to be called back from finish_reread_head, which already takes lock
// lock is locked
+ if (is_stopping()) {
+ onfinish->complete(-EAGAIN);
+ return;
+ }
- assert(!r); //if we get an error, we're boned
+ // Let the caller know that the operation has failed or was intentionally
+ // failed since the caller has been blocklisted.
+ if (r == -EBLOCKLISTED) {
+ onfinish->complete(r);
+ return;
+ }
+
+ ceph_assert(!r); //if we get an error, we're boned
_reprobe(onfinish);
}
void Journaler::_write_head(Context *oncommit)
{
- assert(!readonly);
- assert(state == STATE_ACTIVE);
+ ceph_assert(!readonly);
+ ceph_assert(state == STATE_ACTIVE);
last_written.trimmed_pos = trimmed_pos;
last_written.expire_pos = expire_pos;
last_written.unused_field = expire_pos;
ldout(cct, 10) << "write_head " << last_written << dendl;
// Avoid persisting bad pointers in case of bugs
- assert(last_written.write_pos >= last_written.expire_pos);
- assert(last_written.expire_pos >= last_written.trimmed_pos);
+ ceph_assert(last_written.write_pos >= last_written.expire_pos);
+ ceph_assert(last_written.expire_pos >= last_written.trimmed_pos);
last_wrote_head = ceph::real_clock::now();
bufferlist bl;
- ::encode(last_written, bl);
+ encode(last_written, bl);
SnapContext snapc;
object_t oid = file_object_t(ino, 0);
handle_write_error(r);
return;
}
- assert(!readonly);
+ ceph_assert(!readonly);
ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
last_committed = wrote;
if (oncommit) {
void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp)
{
lock_guard l(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
if (r < 0) {
lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
return;
}
- assert(start < flush_pos);
+ ceph_assert(start < flush_pos);
// calc latency?
if (logger) {
// adjust safe_pos
auto it = pending_safe.find(start);
- assert(it != pending_safe.end());
+ ceph_assert(it != pending_safe.end());
+ uint64_t min_next_safe_pos = pending_safe.begin()->second;
pending_safe.erase(it);
if (pending_safe.empty())
safe_pos = next_safe_pos;
else
- safe_pos = pending_safe.begin()->second;
+ safe_pos = min_next_safe_pos;
ldout(cct, 10) << "_finish_flush safe from " << start
<< ", pending_safe " << pending_safe
<< dendl;
// kick waiters <= safe_pos
- while (!waitfor_safe.empty()) {
- if (waitfor_safe.begin()->first > safe_pos)
- break;
- finish_contexts(cct, waitfor_safe.begin()->second);
- waitfor_safe.erase(waitfor_safe.begin());
+ if (!waitfor_safe.empty()) {
+ list<Context*> ls;
+ while (!waitfor_safe.empty()) {
+ auto it = waitfor_safe.begin();
+ if (it->first > safe_pos)
+ break;
+ ls.splice(ls.end(), it->second);
+ waitfor_safe.erase(it);
+ }
+ finish_contexts(cct, ls);
}
}
{
unique_lock l(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
uint32_t s = bl.length();
// append
// flush previous object?
uint64_t su = get_layout_period();
- assert(su > 0);
+ ceph_assert(su > 0);
uint64_t write_off = write_pos % su;
uint64_t write_obj = write_pos / su;
uint64_t flush_obj = flush_pos / su;
ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro "
<< write_obj << " flo " << flush_obj << ")" << dendl;
_do_flush(write_buf.length() - write_off);
- if (write_off) {
- // current entry isn't being flushed, set next_safe_pos to the end of previous entry
+
+ // if _do_flush() skips flushing some data, it does do a best effort to
+ // update next_safe_pos.
+ if (write_buf.length() > 0 &&
+ write_buf.length() <= wrote) { // the unflushed data are within this entry
+ // set next_safe_pos to end of previous entry
next_safe_pos = write_pos - wrote;
}
}
void Journaler::_do_flush(unsigned amount)
{
+ if (is_stopping())
+ return;
if (write_pos == flush_pos)
return;
- assert(write_pos > flush_pos);
- assert(!readonly);
+ ceph_assert(write_pos > flush_pos);
+ ceph_assert(!readonly);
// flush
uint64_t len = write_pos - flush_pos;
- assert(len == write_buf.length());
+ ceph_assert(len == write_buf.length());
if (amount && amount < len)
len = amount;
ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
<< " already too close to prezero_pos " << prezero_pos
<< ", zeroing first" << dendl;
- waiting_for_zero = true;
+ waiting_for_zero_pos = flush_pos + len;
return;
}
if (static_cast<uint64_t>(newlen) < len) {
ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
<< " but hit prezero_pos " << prezero_pos
<< ", will do " << flush_pos << "~" << newlen << dendl;
+ waiting_for_zero_pos = flush_pos + len;
len = newlen;
- } else {
- waiting_for_zero = false;
}
- } else {
- waiting_for_zero = false;
}
ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
next_safe_pos = write_pos;
} else {
write_buf.splice(0, len, &write_bl);
+ // Keys of waitfor_safe map are journal entry boundaries.
+ // Try finding a journal entry that we are actually flushing
+ // and set next_safe_pos to end of it. This is best effort.
+ // The one we found may not be the lastest flushing entry.
+ auto p = waitfor_safe.lower_bound(flush_pos + len);
+ if (p != waitfor_safe.end()) {
+ if (p->first > flush_pos + len && p != waitfor_safe.begin())
+ --p;
+ if (p->first <= flush_pos + len && p->first > next_safe_pos)
+ next_safe_pos = p->first;
+ }
}
filer.write(ino, &layout, snapc,
wrap_finisher(onsafe), write_iohint);
flush_pos += len;
- assert(write_buf.length() == write_pos - flush_pos);
+ ceph_assert(write_buf.length() == write_pos - flush_pos);
write_buf_throttle.put(len);
ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl;
void Journaler::wait_for_flush(Context *onsafe)
{
lock_guard l(lock);
- if (stopping) {
- onsafe->complete(-EAGAIN);
+ if (is_stopping()) {
+ if (onsafe)
+ onsafe->complete(-EAGAIN);
return;
}
_wait_for_flush(onsafe);
void Journaler::_wait_for_flush(Context *onsafe)
{
- assert(!readonly);
+ ceph_assert(!readonly);
// all flushed and safe?
if (write_pos == safe_pos) {
- assert(write_buf.length() == 0);
+ ceph_assert(write_buf.length() == 0);
ldout(cct, 10)
<< "flush nothing to flush, (prezeroing/prezero)/write/flush/safe "
"pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/"
void Journaler::flush(Context *onsafe)
{
lock_guard l(lock);
+ if (is_stopping()) {
+ if (onsafe)
+ onsafe->complete(-EAGAIN);
+ return;
+ }
_flush(wrap_finisher(onsafe));
}
void Journaler::_flush(C_OnFinisher *onsafe)
{
- assert(!readonly);
+ ceph_assert(!readonly);
if (write_pos == flush_pos) {
- assert(write_buf.length() == 0);
+ ceph_assert(write_buf.length() == 0);
ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/"
"flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos
<< ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
bool Journaler::_write_head_needed()
{
- return last_wrote_head + seconds(cct->_conf->journaler_write_head_interval)
+ return last_wrote_head + seconds(cct->_conf.get_val<int64_t>("journaler_write_head_interval"))
< ceph::real_clock::now();
}
void Journaler::_issue_prezero()
{
- assert(prezeroing_pos >= flush_pos);
-
- // we need to zero at least two periods, minimum, to ensure that we
- // have a full empty object/period in front of us.
- uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods);
+ ceph_assert(prezeroing_pos >= flush_pos);
+ uint64_t num_periods = cct->_conf.get_val<uint64_t>("journaler_prezero_periods");
/*
* issue zero requests based on write_pos, even though the invariant
* is that we zero ahead of flush_pos.
return;
}
- assert(r == 0 || r == -ENOENT);
+ ceph_assert(r == 0 || r == -ENOENT);
if (start == prezero_pos) {
prezero_pos += len;
pending_zero.erase(b);
}
- if (waiting_for_zero) {
- _do_flush();
+ if (waiting_for_zero_pos > flush_pos) {
+ _do_flush(waiting_for_zero_pos - flush_pos);
+ }
+
+ if (prezero_pos == prezeroing_pos &&
+ !waitfor_prezero.empty()) {
+ list<Context*> ls;
+ ls.swap(waitfor_prezero);
+ finish_contexts(cct, ls, 0);
}
} else {
pending_zero.insert(start, len);
<< dendl;
}
+void Journaler::wait_for_prezero(Context *onfinish)
+{
+ ceph_assert(onfinish);
+ lock_guard l(lock);
+
+ if (prezero_pos == prezeroing_pos) {
+ finisher->queue(onfinish, 0);
+ return;
+ }
+ waitfor_prezero.push_back(wrap_finisher(onfinish));
+}
/***************** READING *******************/
<< p->second.length() << dendl;
received_pos += p->second.length();
read_buf.claim_append(p->second);
- assert(received_pos <= requested_pos);
+ ceph_assert(received_pos <= requested_pos);
prefetch_buf.erase(p);
got_any = true;
}
if (got_any) {
ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~"
- << read_buf.length() << ", read pointers " << read_pos
- << "/" << received_pos << "/" << requested_pos
+ << read_buf.length() << ", read pointers read_pos=" << read_pos
+ << " received_pos=" << received_pos << " requested_pos=" << requested_pos
<< dendl;
// Update readability (this will also hit any decode errors resulting
{
// stuck at safe_pos? (this is needed if we are reading the tail of
// a journal we are also writing to)
- assert(requested_pos <= safe_pos);
+ ceph_assert(requested_pos <= safe_pos);
if (requested_pos == safe_pos) {
ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos
<< ", waiting" << dendl;
- assert(write_pos > requested_pos);
+ ceph_assert(write_pos > requested_pos);
if (pending_safe.empty()) {
_flush(NULL);
}
- waitfor_safe[flush_pos].push_back(new C_RetryRead(this));
+
+ // Make sure keys of waitfor_safe map are journal entry boundaries.
+ // The key we used here is either next_safe_pos or old value of
+ // next_safe_pos. next_safe_pos is always set to journal entry
+ // boundary.
+ auto p = pending_safe.rbegin();
+ if (p != pending_safe.rend())
+ waitfor_safe[p->second].push_back(new C_RetryRead(this));
+ else
+ waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this));
return;
}
// go.
ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len
- << ", read pointers " << read_pos << "/" << received_pos
- << "/" << (requested_pos+len) << dendl;
+ << ", read pointers read_pos=" << read_pos << " received_pos=" << received_pos
+ << " requested_pos+len=" << (requested_pos+len) << dendl;
// step by period (object). _don't_ do a single big filer.read()
// here because it will wait for all object reads to complete before
void Journaler::_prefetch()
{
+ if (is_stopping())
+ return;
+
ldout(cct, 10) << "_prefetch" << dendl;
// prefetch
uint64_t pf;
// adjust write_pos
prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos;
- assert(write_buf.length() == 0);
+ ceph_assert(write_buf.length() == 0);
+ ceph_assert(waitfor_safe.empty());
// reset read state
requested_pos = received_pos = read_pos;
void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
{
lock_guard l(lock);
+ if (is_stopping()) {
+ completion->complete(-EAGAIN);
+ return;
+ }
if (data_result == 0) {
// Async delete the journal header
try {
consumed = journal_stream.read(read_buf, &bl, &start_ptr);
if (stream_format >= JOURNAL_FORMAT_RESILIENT) {
- assert(start_ptr == read_pos);
+ ceph_assert(start_ptr == read_pos);
}
} catch (const buffer::error &e) {
lderr(cct) << __func__ << ": decode error from journal_stream" << dendl;
// prefetch?
_prefetch();
+
+ // If bufferlist consists of discontiguous memory, decoding types whose
+ // denc_traits needs contiguous memory is inefficient. The bufferlist may
+ // get copied to temporary memory multiple times (copy_shallow() in
+ // src/include/denc.h actually does deep copy)
+ if (bl.get_num_buffers() > 1)
+ bl.rebuild();
return true;
}
void Journaler::wait_for_readable(Context *onreadable)
{
lock_guard l(lock);
- if (stopping) {
- onreadable->complete(-EAGAIN);
+ if (is_stopping()) {
+ finisher->queue(onreadable, -EAGAIN);
return;
}
- assert(on_readable == 0);
+ ceph_assert(on_readable == 0);
if (!readable) {
ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable "
<< onreadable << dendl;
void Journaler::_trim()
{
- assert(!readonly);
+ if (is_stopping())
+ return;
+
+ ceph_assert(!readonly);
uint64_t period = get_layout_period();
uint64_t trim_to = last_committed.expire_pos;
trim_to -= trim_to % period;
}
// trim
- assert(trim_to <= write_pos);
- assert(trim_to <= expire_pos);
- assert(trim_to > trimming_pos);
+ ceph_assert(trim_to <= write_pos);
+ ceph_assert(trim_to <= expire_pos);
+ ceph_assert(trim_to > trimming_pos);
ldout(cct, 10) << "trim trimming to " << trim_to
<< ", trimmed/trimming/expire are "
<< trimmed_pos << "/" << trimming_pos << "/" << expire_pos
{
lock_guard l(lock);
- assert(!readonly);
+ ceph_assert(!readonly);
ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos
<< ", trimmed/trimming/expire now "
<< to << "/" << trimming_pos << "/" << expire_pos
return;
}
- assert(r >= 0 || r == -ENOENT);
+ ceph_assert(r >= 0 || r == -ENOENT);
- assert(to <= trimming_pos);
- assert(to > trimmed_pos);
+ ceph_assert(to <= trimming_pos);
+ ceph_assert(to > trimmed_pos);
trimmed_pos = to;
}
lderr(cct) << __func__ << ": multiple write errors, handler already called"
<< dendl;
} else {
- assert(0 == "unhandled write error");
+ ceph_abort_msg("unhandled write error");
}
}
*/
bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const
{
- assert(need != NULL);
+ ceph_assert(need != NULL);
uint32_t entry_size = 0;
uint64_t entry_sentinel = 0;
- bufferlist::iterator p = read_buf.begin();
+ auto p = read_buf.cbegin();
// Do we have enough data to decode an entry prefix?
if (format >= JOURNAL_FORMAT_RESILIENT) {
}
if (read_buf.length() >= *need) {
if (format >= JOURNAL_FORMAT_RESILIENT) {
- ::decode(entry_sentinel, p);
+ decode(entry_sentinel, p);
if (entry_sentinel != sentinel) {
throw buffer::malformed_input("Invalid sentinel");
}
}
- ::decode(entry_size, p);
+ decode(entry_size, p);
} else {
return false;
}
size_t JournalStream::read(bufferlist &from, bufferlist *entry,
uint64_t *start_ptr)
{
- assert(start_ptr != NULL);
- assert(entry != NULL);
- assert(entry->length() == 0);
+ ceph_assert(start_ptr != NULL);
+ ceph_assert(entry != NULL);
+ ceph_assert(entry->length() == 0);
uint32_t entry_size = 0;
// Consume envelope prefix: entry_size and entry_sentinel
- bufferlist::iterator from_ptr = from.begin();
+ auto from_ptr = from.cbegin();
if (format >= JOURNAL_FORMAT_RESILIENT) {
uint64_t entry_sentinel = 0;
- ::decode(entry_sentinel, from_ptr);
+ decode(entry_sentinel, from_ptr);
// Assertion instead of clean check because of precondition of this
// fn is that readable() already passed
- assert(entry_sentinel == sentinel);
+ ceph_assert(entry_sentinel == sentinel);
}
- ::decode(entry_size, from_ptr);
+ decode(entry_size, from_ptr);
// Read out the payload
from_ptr.copy(entry_size, *entry);
// Consume the envelope suffix (start_ptr)
if (format >= JOURNAL_FORMAT_RESILIENT) {
- ::decode(*start_ptr, from_ptr);
+ decode(*start_ptr, from_ptr);
} else {
*start_ptr = 0;
}
size_t JournalStream::write(bufferlist &entry, bufferlist *to,
uint64_t const &start_ptr)
{
- assert(to != NULL);
+ ceph_assert(to != NULL);
uint32_t const entry_size = entry.length();
if (format >= JOURNAL_FORMAT_RESILIENT) {
- ::encode(sentinel, *to);
+ encode(sentinel, *to);
}
- ::encode(entry_size, *to);
+ encode(entry_size, *to);
to->claim_append(entry);
if (format >= JOURNAL_FORMAT_RESILIENT) {
- ::encode(start_ptr, *to);
+ encode(start_ptr, *to);
}
if (format >= JOURNAL_FORMAT_RESILIENT) {
*/
void Journaler::set_write_error_handler(Context *c) {
lock_guard l(lock);
- assert(!on_write_error);
+ ceph_assert(!on_write_error);
on_write_error = wrap_finisher(c);
called_write_error = false;
}
ldout(cct, 1) << __func__ << dendl;
+ state = STATE_STOPPING;
readable = false;
- stopping = true;
// Kick out anyone reading from journal
error = -EAGAIN;
f->complete(-EAGAIN);
}
- finish_contexts(cct, waitfor_recover, -ESHUTDOWN);
+ list<Context*> ls;
+ ls.swap(waitfor_recover);
+ finish_contexts(cct, ls, -ESHUTDOWN);
std::map<uint64_t, std::list<Context*> >::iterator i;
for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {