]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_worker.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_worker.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2019 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16
17 #pragma once
18
19 #include <atomic>
20
21 #include "common/Thread.h"
22 #include "common/ceph_mutex.h"
23 #include "include/common_fwd.h"
24
25 class RGWRados;
26
27 class RGWRadosThread {
28 class Worker : public Thread, public DoutPrefixProvider {
29 CephContext *cct;
30 RGWRadosThread *processor;
31 ceph::mutex lock = ceph::make_mutex("RGWRadosThread::Worker");
32 ceph::condition_variable cond;
33
34 void wait() {
35 std::unique_lock l{lock};
36 cond.wait(l);
37 };
38
39 void wait_interval(const ceph::real_clock::duration& wait_time) {
40 std::unique_lock l{lock};
41 cond.wait_for(l, wait_time);
42 }
43
44 public:
45 Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p) {}
46 void *entry() override;
47 void signal() {
48 std::lock_guard l{lock};
49 cond.notify_all();
50 }
51
52 CephContext *get_cct() const { return cct; }
53 unsigned get_subsys() const { return ceph_subsys_rgw; }
54 std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw rados thread: "; }
55
56 };
57
58 Worker *worker;
59
60 protected:
61 CephContext *cct;
62 RGWRados *store;
63
64 std::atomic<bool> down_flag = { false };
65
66 std::string thread_name;
67
68 virtual uint64_t interval_msec() = 0;
69 virtual void stop_process() {}
70 public:
71 RGWRadosThread(RGWRados *_store, const std::string& thread_name = "radosgw")
72 : worker(NULL), cct(_store->ctx()), store(_store), thread_name(thread_name) {}
73 virtual ~RGWRadosThread() {
74 stop();
75 }
76
77 virtual int init(const DoutPrefixProvider *dpp) { return 0; }
78 virtual int process(const DoutPrefixProvider *dpp) = 0;
79
80 bool going_down() { return down_flag; }
81
82 void start();
83 void stop();
84
85 void signal() {
86 if (worker) {
87 worker->signal();
88 }
89 }
90 };
91