]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/PurgeQueue.h
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / mds / PurgeQueue.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2015 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef PURGE_QUEUE_H_
16 #define PURGE_QUEUE_H_
17
18 #include "include/compact_set.h"
19 #include "mds/MDSMap.h"
20 #include "osdc/Journaler.h"
21
22
23 /**
24 * Descriptor of the work associated with purging a file. We record
25 * the minimal amount of information from the inode such as the size
26 * and layout: all other un-needed inode metadata (times, permissions, etc)
27 * has been discarded.
28 */
29 class PurgeItem
30 {
31 public:
32 enum Action : uint8_t {
33 NONE = 0,
34 PURGE_FILE = 1,
35 TRUNCATE_FILE,
36 PURGE_DIR
37 };
38
39 utime_t stamp;
40 //None PurgeItem serves as NoOp for splicing out journal entries;
41 //so there has to be a "pad_size" to specify the size of journal
42 //space to be spliced.
43 uint32_t pad_size;
44 Action action;
45 inodeno_t ino;
46 uint64_t size;
47 file_layout_t layout;
48 compact_set<int64_t> old_pools;
49 SnapContext snapc;
50 fragtree_t fragtree;
51
52 PurgeItem()
53 : pad_size(0), action(NONE), ino(0), size(0)
54 {}
55
56 void encode(bufferlist &bl) const;
57 void decode(bufferlist::const_iterator &p);
58
59 static Action str_to_type(std::string_view str) {
60 return PurgeItem::actions.at(std::string(str));
61 }
62
63 void dump(Formatter *f) const
64 {
65 f->dump_int("action", action);
66 f->dump_int("ino", ino);
67 f->dump_int("size", size);
68 f->open_object_section("layout");
69 layout.dump(f);
70 f->close_section();
71 f->open_object_section("SnapContext");
72 snapc.dump(f);
73 f->close_section();
74 f->open_object_section("fragtree");
75 fragtree.dump(f);
76 f->close_section();
77 }
78
79 std::string_view get_type_str() const;
80 private:
81 static const std::map<std::string, PurgeItem::Action> actions;
82 };
83 WRITE_CLASS_ENCODER(PurgeItem)
84
85 enum {
86 l_pq_first = 3500,
87
88 // How many items have been finished by PurgeQueue
89 l_pq_executing_ops,
90 l_pq_executing_ops_high_water,
91 l_pq_executing,
92 l_pq_executing_high_water,
93 l_pq_executed,
94 l_pq_last
95 };
96
97 /**
98 * A persistent queue of PurgeItems. This class both writes and reads
99 * to the queue. There is one of these per MDS rank.
100 *
101 * Note that this class does not take a reference to MDSRank: we are
102 * independent of all the metadata structures and do not need to
103 * take mds_lock for anything.
104 */
105 class PurgeQueue
106 {
107 private:
108 CephContext *cct;
109 const mds_rank_t rank;
110 Mutex lock;
111 bool readonly = false;
112
113 int64_t metadata_pool;
114
115 // Don't use the MDSDaemon's Finisher and Timer, because this class
116 // operates outside of MDSDaemon::mds_lock
117 Finisher finisher;
118 SafeTimer timer;
119 Filer filer;
120 Objecter *objecter;
121 std::unique_ptr<PerfCounters> logger;
122
123 Journaler journaler;
124
125 Context *on_error;
126
127 // Map of Journaler offset to PurgeItem
128 std::map<uint64_t, PurgeItem> in_flight;
129
130 std::set<uint64_t> pending_expire;
131
132 // Throttled allowances
133 uint64_t ops_in_flight;
134
135 // Dynamic op limit per MDS based on PG count
136 uint64_t max_purge_ops;
137
138 uint32_t _calculate_ops(const PurgeItem &item) const;
139
140 bool _can_consume();
141
142 // How many bytes were remaining when drain() was first called,
143 // used for indicating progress.
144 uint64_t drain_initial;
145
146 // Has drain() ever been called on this instance?
147 bool draining;
148
149 // recover the journal write_pos (drop any partial written entry)
150 void _recover();
151
152 /**
153 * @return true if we were in a position to try and consume something:
154 * does not mean we necessarily did.
155 */
156 bool _consume();
157
158 // Do we currently have a flush timer event waiting?
159 Context *delayed_flush;
160
161 void _execute_item(
162 const PurgeItem &item,
163 uint64_t expire_to);
164 void _execute_item_complete(
165 uint64_t expire_to);
166
167 bool recovered;
168 std::list<Context*> waiting_for_recovery;
169
170 void _go_readonly(int r);
171
172 uint64_t ops_high_water = 0;
173 uint64_t files_high_water = 0;
174
175 public:
176 void init();
177 void activate();
178 void shutdown();
179
180 void create_logger();
181
182 // Write an empty queue, use this during MDS rank creation
183 void create(Context *completion);
184
185 // Read the Journaler header for an existing queue and start consuming
186 void open(Context *completion);
187
188 void wait_for_recovery(Context *c);
189
190 // Submit one entry to the work queue. Call back when it is persisted
191 // to the queue (there is no callback for when it is executed)
192 void push(const PurgeItem &pi, Context *completion);
193
194 // If the on-disk queue is empty and we are not currently processing
195 // anything.
196 bool is_idle() const;
197
198 /**
199 * Signal to the PurgeQueue that you would like it to hurry up and
200 * finish consuming everything in the queue. Provides progress
201 * feedback.
202 *
203 * @param progress: bytes consumed since we started draining
204 * @param progress_total: max bytes that were outstanding during purge
205 * @param in_flight_count: number of file purges currently in flight
206 *
207 * @returns true if drain is complete
208 */
209 bool drain(
210 uint64_t *progress,
211 uint64_t *progress_total,
212 size_t *in_flight_count);
213
214 void update_op_limit(const MDSMap &mds_map);
215
216 void handle_conf_change(const std::set<std::string>& changed, const MDSMap& mds_map);
217
218 PurgeQueue(
219 CephContext *cct_,
220 mds_rank_t rank_,
221 const int64_t metadata_pool_,
222 Objecter *objecter_,
223 Context *on_error);
224 ~PurgeQueue();
225 };
226
227 #endif
228