]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_flight.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_flight.h
CommitLineData
1e59de90
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab ft=cpp
3
4/*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright 2023 IBM
8 *
9 * See file COPYING for licensing information.
10 */
11
12#pragma once
13
14#include <map>
15#include <mutex>
16#include <atomic>
17
18#include "include/common_fwd.h"
19#include "common/ceph_context.h"
20#include "common/Thread.h"
21#include "common/ceph_time.h"
22#include "rgw_frontend.h"
23#include "arrow/type.h"
24#include "arrow/flight/server.h"
25#include "arrow/util/string_view.h"
26
27#include "rgw_flight_frontend.h"
28
29
30#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": "
31#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": "
32#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": "
33#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": "
34
35#define INFO INFO_F(dp)
36#define STATUS STATUS_F(dp)
37#define WARN WARN_F(dp)
38#define ERROR ERROR_F(dp)
39
40
41namespace arw = arrow;
42namespace flt = arrow::flight;
43
44
45struct req_state;
46
47namespace rgw::flight {
48
49static const coarse_real_clock::duration lifespan = std::chrono::hours(1);
50
51struct FlightData {
52 FlightKey key;
53 // coarse_real_clock::time_point expires;
54 std::string uri;
55 std::string tenant_name;
56 std::string bucket_name;
57 rgw_obj_key object_key;
58 // NB: what about object's namespace and instance?
59 uint64_t num_records;
60 uint64_t obj_size;
61 std::shared_ptr<arw::Schema> schema;
62 std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;
63
64 rgw_user user_id; // TODO: this should be removed when we do
65 // proper flight authentication
66
67 FlightData(const std::string& _uri,
68 const std::string& _tenant_name,
69 const std::string& _bucket_name,
70 const rgw_obj_key& _object_key,
71 uint64_t _num_records,
72 uint64_t _obj_size,
73 std::shared_ptr<arw::Schema>& _schema,
74 std::shared_ptr<const arw::KeyValueMetadata>& _kv_metadata,
75 rgw_user _user_id);
76};
77
78// stores flights that have been created and helps expire them
79class FlightStore {
80
81protected:
82
83 const DoutPrefix& dp;
84
85public:
86
87 FlightStore(const DoutPrefix& dp);
88 virtual ~FlightStore();
89 virtual FlightKey add_flight(FlightData&& flight) = 0;
90
91 // TODO consider returning const shared pointers to FlightData in
92 // the following two functions
93 virtual arw::Result<FlightData> get_flight(const FlightKey& key) const = 0;
94 virtual std::optional<FlightData> after_key(const FlightKey& key) const = 0;
95
96 virtual int remove_flight(const FlightKey& key) = 0;
97 virtual int expire_flights() = 0;
98};
99
100class MemoryFlightStore : public FlightStore {
101 std::map<FlightKey, FlightData> map;
102 mutable std::mutex mtx; // for map
103
104public:
105
106 MemoryFlightStore(const DoutPrefix& dp);
107 virtual ~MemoryFlightStore();
108 FlightKey add_flight(FlightData&& flight) override;
109 arw::Result<FlightData> get_flight(const FlightKey& key) const override;
110 std::optional<FlightData> after_key(const FlightKey& key) const override;
111 int remove_flight(const FlightKey& key) override;
112 int expire_flights() override;
113};
114
115class FlightServer : public flt::FlightServerBase {
116
117 using Data1 = std::vector<std::shared_ptr<arw::RecordBatch>>;
118
119 RGWProcessEnv& env;
120 rgw::sal::Driver* driver;
121 const DoutPrefix& dp;
122 FlightStore* flight_store;
123
124 std::map<std::string, Data1> data;
125
126public:
127
128 static constexpr int default_port = 8077;
129
130 FlightServer(RGWProcessEnv& env,
131 FlightStore* flight_store,
132 const DoutPrefix& dp);
133 ~FlightServer() override;
134
135 FlightStore* get_flight_store() {
136 return flight_store;
137 }
138
139 arw::Status ListFlights(const flt::ServerCallContext& context,
140 const flt::Criteria* criteria,
141 std::unique_ptr<flt::FlightListing>* listings) override;
142
143 arw::Status GetFlightInfo(const flt::ServerCallContext &context,
144 const flt::FlightDescriptor &request,
145 std::unique_ptr<flt::FlightInfo> *info) override;
146
147 arw::Status GetSchema(const flt::ServerCallContext &context,
148 const flt::FlightDescriptor &request,
149 std::unique_ptr<flt::SchemaResult> *schema) override;
150
151 arw::Status DoGet(const flt::ServerCallContext &context,
152 const flt::Ticket &request,
153 std::unique_ptr<flt::FlightDataStream> *stream) override;
154}; // class FlightServer
155
156class OwningStringView : public arw::util::string_view {
157
158 uint8_t* buffer;
159 int64_t capacity;
160 int64_t consumed;
161
162 OwningStringView(uint8_t* _buffer, int64_t _size) :
163 arw::util::string_view((const char*) _buffer, _size),
164 buffer(_buffer),
165 capacity(_size),
166 consumed(_size)
167 { }
168
169 OwningStringView(OwningStringView&& from, int64_t new_size) :
170 buffer(nullptr),
171 capacity(from.capacity),
172 consumed(new_size)
173 {
174 // should be impossible due to static function check
175 ceph_assertf(consumed <= capacity, "new size cannot exceed capacity");
176
177 std::swap(buffer, from.buffer);
178 from.capacity = 0;
179 from.consumed = 0;
180 }
181
182public:
183
184 OwningStringView(OwningStringView&&) = default;
185 OwningStringView& operator=(OwningStringView&&) = default;
186
187 uint8_t* writeable_data() {
188 return buffer;
189 }
190
191 ~OwningStringView() {
192 if (buffer) {
193 delete[] buffer;
194 }
195 }
196
197 static arw::Result<OwningStringView> make(int64_t size) {
198 uint8_t* buffer = new uint8_t[size];
199 if (!buffer) {
200 return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size);
201 }
202 return OwningStringView(buffer, size);
203 }
204
205 static arw::Result<OwningStringView> shrink(OwningStringView&& from,
206 int64_t new_size) {
207 if (new_size > from.capacity) {
208 return arw::Status::Invalid("new size cannot exceed capacity");
209 } else {
210 return OwningStringView(std::move(from), new_size);
211 }
212 }
213
214};
215
216// GLOBAL
217
218flt::Ticket FlightKeyToTicket(const FlightKey& key);
219arw::Status TicketToFlightKey(const flt::Ticket& t, FlightKey& key);
220
221} // namespace rgw::flight