for (auto stack : spawned.entries) {
stack->put();
}
-
- if (preallocated_stack) {
- preallocated_stack->put();
- }
}
int RGWCoroutinesStack::operate(const DoutPrefixProvider *dpp, RGWCoroutinesEnv *_env)
rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);
- RGWCoroutinesStack *stack = preallocated_stack;
- if (!stack) {
- stack = env->manager->allocate_stack();
- }
- preallocated_stack = nullptr;
-
+ RGWCoroutinesStack *stack = env->manager->allocate_stack();
s->add_pending(stack);
stack->parent = this;
return spawn(NULL, op, wait);
}
-RGWCoroutinesStack *RGWCoroutinesStack::prealloc_stack()
-{
- if (!preallocated_stack) {
- preallocated_stack = env->manager->allocate_stack();
- }
- return preallocated_stack;
-}
-
int RGWCoroutinesStack::wait(const utime_t& interval)
{
RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
- RGWCompletionManager::io_completion& io, int *blocked_count)
+ RGWCompletionManager::io_completion& io, int *blocked_count, int *interval_wait_count)
{
ceph_assert(ceph_mutex_is_wlocked(lock));
RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info);
if (stack->is_io_blocked()) {
--(*blocked_count);
stack->set_io_blocked(false);
+ if (stack->is_interval_waiting()) {
+ --(*interval_wait_count);
+ }
}
stack->set_interval_wait(false);
if (!stack->is_done()) {
}
while (completion_mgr->try_get_next(&io)) {
- handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
+ handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count, &interval_wait_count);
}
/*
if (ret < 0) {
ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
}
- handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
+ handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count, &interval_wait_count);
}
next:
canceled = true;
break;
}
- handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
+ handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count, &interval_wait_count);
iter = scheduled_stacks.begin();
}
if (canceled) {
return stack->spawn(this, op, wait);
}
-RGWCoroutinesStack *RGWCoroutine::prealloc_stack()
-{
- return stack->prealloc_stack();
-}
-
-uint64_t RGWCoroutine::prealloc_stack_id()
-{
- return prealloc_stack()->get_id();
-}
-
bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
{
return stack->collect(this, ret, skip_stack, stack_id);