]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / src / thrift / concurrency / TimerManager.cpp
diff --git a/ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp b/ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp
new file mode 100644 (file)
index 0000000..703c19e
--- /dev/null
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/concurrency/TimerManager.h>
+#include <thrift/concurrency/Exception.h>
+
+#include <assert.h>
+#include <iostream>
+#include <memory>
+#include <set>
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+
+using std::shared_ptr;
+using std::weak_ptr;
+
+/**
+ * TimerManager class
+ *
+ * @version $Id:$
+ */
+class TimerManager::Task : public Runnable {
+
+public:
+  enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
+
+  Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {}
+
+  ~Task() override = default;
+
+  void run() override {
+    if (state_ == EXECUTING) {
+      runnable_->run();
+      state_ = COMPLETE;
+    }
+  }
+
+  bool operator==(const shared_ptr<Runnable> & runnable) const { return runnable_ == runnable; }
+
+  task_iterator it_;
+
+private:
+  shared_ptr<Runnable> runnable_;
+  friend class TimerManager::Dispatcher;
+  STATE state_;
+};
+
+class TimerManager::Dispatcher : public Runnable {
+
+public:
+  Dispatcher(TimerManager* manager) : manager_(manager) {}
+
+  ~Dispatcher() override = default;
+
+  /**
+   * Dispatcher entry point
+   *
+   * As long as dispatcher thread is running, pull tasks off the task taskMap_
+   * and execute.
+   */
+  void run() override {
+    {
+      Synchronized s(manager_->monitor_);
+      if (manager_->state_ == TimerManager::STARTING) {
+        manager_->state_ = TimerManager::STARTED;
+        manager_->monitor_.notifyAll();
+      }
+    }
+
+    do {
+      std::set<shared_ptr<TimerManager::Task> > expiredTasks;
+      {
+        Synchronized s(manager_->monitor_);
+        task_iterator expiredTaskEnd;
+        auto now = std::chrono::steady_clock::now();
+        while (manager_->state_ == TimerManager::STARTED
+               && (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
+                  == manager_->taskMap_.begin()) {
+          std::chrono::milliseconds timeout(0);
+          if (!manager_->taskMap_.empty()) {
+            timeout = std::chrono::duration_cast<std::chrono::milliseconds>(manager_->taskMap_.begin()->first - now);
+            //because the unit of steady_clock is smaller than millisecond,timeout may be 0.
+            if (timeout.count() == 0) {
+              timeout = std::chrono::milliseconds(1);
+            }
+            manager_->monitor_.waitForTimeRelative(timeout);
+          } else {
+            manager_->monitor_.waitForTimeRelative(0);
+          }
+          now = std::chrono::steady_clock::now();
+        }
+
+        if (manager_->state_ == TimerManager::STARTED) {
+          for (auto ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
+            shared_ptr<TimerManager::Task> task = ix->second;
+            expiredTasks.insert(task);
+            task->it_ = manager_->taskMap_.end();
+            if (task->state_ == TimerManager::Task::WAITING) {
+              task->state_ = TimerManager::Task::EXECUTING;
+            }
+            manager_->taskCount_--;
+          }
+          manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
+        }
+      }
+
+      for (const auto & expiredTask : expiredTasks) {
+        expiredTask->run();
+      }
+
+    } while (manager_->state_ == TimerManager::STARTED);
+
+    {
+      Synchronized s(manager_->monitor_);
+      if (manager_->state_ == TimerManager::STOPPING) {
+        manager_->state_ = TimerManager::STOPPED;
+        manager_->monitor_.notifyAll();
+      }
+    }
+    return;
+  }
+
+private:
+  TimerManager* manager_;
+  friend class TimerManager;
+};
+
+#if defined(_MSC_VER)
+#pragma warning(push)
+#pragma warning(disable : 4355) // 'this' used in base member initializer list
+#endif
+
+TimerManager::TimerManager()
+  : taskCount_(0),
+    state_(TimerManager::UNINITIALIZED),
+    dispatcher_(std::make_shared<Dispatcher>(this)) {
+}
+
+#if defined(_MSC_VER)
+#pragma warning(pop)
+#endif
+
+TimerManager::~TimerManager() {
+
+  // If we haven't been explicitly stopped, do so now.  We don't need to grab
+  // the monitor here, since stop already takes care of reentrancy.
+
+  if (state_ != STOPPED) {
+    try {
+      stop();
+    } catch (...) {
+      // We're really hosed.
+    }
+  }
+}
+
+void TimerManager::start() {
+  bool doStart = false;
+  {
+    Synchronized s(monitor_);
+    if (!threadFactory_) {
+      throw InvalidArgumentException();
+    }
+    if (state_ == TimerManager::UNINITIALIZED) {
+      state_ = TimerManager::STARTING;
+      doStart = true;
+    }
+  }
+
+  if (doStart) {
+    dispatcherThread_ = threadFactory_->newThread(dispatcher_);
+    dispatcherThread_->start();
+  }
+
+  {
+    Synchronized s(monitor_);
+    while (state_ == TimerManager::STARTING) {
+      monitor_.wait();
+    }
+    assert(state_ != TimerManager::STARTING);
+  }
+}
+
+void TimerManager::stop() {
+  bool doStop = false;
+  {
+    Synchronized s(monitor_);
+    if (state_ == TimerManager::UNINITIALIZED) {
+      state_ = TimerManager::STOPPED;
+    } else if (state_ != STOPPING && state_ != STOPPED) {
+      doStop = true;
+      state_ = STOPPING;
+      monitor_.notifyAll();
+    }
+    while (state_ != STOPPED) {
+      monitor_.wait();
+    }
+  }
+
+  if (doStop) {
+    // Clean up any outstanding tasks
+    taskMap_.clear();
+
+    // Remove dispatcher's reference to us.
+    dispatcher_->manager_ = nullptr;
+  }
+}
+
+shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
+  Synchronized s(monitor_);
+  return threadFactory_;
+}
+
+void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
+  Synchronized s(monitor_);
+  threadFactory_ = value;
+}
+
+size_t TimerManager::taskCount() const {
+  return taskCount_;
+}
+
+TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, const std::chrono::milliseconds &timeout) {
+  return add(task, std::chrono::steady_clock::now() + timeout);
+}
+
+TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
+    const std::chrono::time_point<std::chrono::steady_clock>& abstime) {
+  auto now = std::chrono::steady_clock::now();
+
+  if (abstime < now) {
+    throw InvalidArgumentException();
+  }
+  Synchronized s(monitor_);
+  if (state_ != TimerManager::STARTED) {
+    throw IllegalStateException();
+  }
+
+  // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
+  // if the expiration time is shorter than the current value. Need to test before we insert,
+  // because the new task might insert at the front.
+  bool notifyRequired = (taskCount_ == 0) ? true : abstime < taskMap_.begin()->first;
+
+  shared_ptr<Task> timer(new Task(task));
+  taskCount_++;
+  timer->it_ = taskMap_.emplace(abstime, timer);
+
+  // If the task map was empty, or if we have an expiration that is earlier
+  // than any previously seen, kick the dispatcher so it can update its
+  // timeout
+  if (notifyRequired) {
+    monitor_.notify();
+  }
+
+  return timer;
+}
+
+void TimerManager::remove(shared_ptr<Runnable> task) {
+  Synchronized s(monitor_);
+  if (state_ != TimerManager::STARTED) {
+    throw IllegalStateException();
+  }
+  bool found = false;
+  for (auto ix = taskMap_.begin(); ix != taskMap_.end();) {
+    if (*ix->second == task) {
+      found = true;
+      taskCount_--;
+      taskMap_.erase(ix++);
+    } else {
+      ++ix;
+    }
+  }
+  if (!found) {
+    throw NoSuchTaskException();
+  }
+}
+
+void TimerManager::remove(Timer handle) {
+  Synchronized s(monitor_);
+  if (state_ != TimerManager::STARTED) {
+    throw IllegalStateException();
+  }
+
+  shared_ptr<Task> task = handle.lock();
+  if (!task) {
+    throw NoSuchTaskException();
+  }
+
+  if (task->it_ == taskMap_.end()) {
+    // Task is being executed
+    throw UncancellableTaskException();
+  }
+
+  taskMap_.erase(task->it_);
+  taskCount_--;
+}
+
+TimerManager::STATE TimerManager::state() const {
+  return state_;
+}
+}
+}
+} // apache::thrift::concurrency