]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | /* | |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright 2023 IBM | |
8 | * | |
9 | * See file COPYING for licensing information. | |
10 | */ | |
11 | ||
12 | #pragma once | |
13 | ||
14 | #include <map> | |
15 | #include <mutex> | |
16 | #include <atomic> | |
17 | ||
18 | #include "include/common_fwd.h" | |
19 | #include "common/ceph_context.h" | |
20 | #include "common/Thread.h" | |
21 | #include "common/ceph_time.h" | |
22 | #include "rgw_frontend.h" | |
23 | #include "arrow/type.h" | |
24 | #include "arrow/flight/server.h" | |
25 | #include "arrow/util/string_view.h" | |
26 | ||
27 | #include "rgw_flight_frontend.h" | |
28 | ||
29 | ||
30 | #define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": " | |
31 | #define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": " | |
32 | #define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": " | |
33 | #define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": " | |
34 | ||
35 | #define INFO INFO_F(dp) | |
36 | #define STATUS STATUS_F(dp) | |
37 | #define WARN WARN_F(dp) | |
38 | #define ERROR ERROR_F(dp) | |
39 | ||
40 | ||
41 | namespace arw = arrow; | |
42 | namespace flt = arrow::flight; | |
43 | ||
44 | ||
45 | struct req_state; | |
46 | ||
47 | namespace rgw::flight { | |
48 | ||
49 | static const coarse_real_clock::duration lifespan = std::chrono::hours(1); | |
50 | ||
51 | struct FlightData { | |
52 | FlightKey key; | |
53 | // coarse_real_clock::time_point expires; | |
54 | std::string uri; | |
55 | std::string tenant_name; | |
56 | std::string bucket_name; | |
57 | rgw_obj_key object_key; | |
58 | // NB: what about object's namespace and instance? | |
59 | uint64_t num_records; | |
60 | uint64_t obj_size; | |
61 | std::shared_ptr<arw::Schema> schema; | |
62 | std::shared_ptr<const arw::KeyValueMetadata> kv_metadata; | |
63 | ||
64 | rgw_user user_id; // TODO: this should be removed when we do | |
65 | // proper flight authentication | |
66 | ||
67 | FlightData(const std::string& _uri, | |
68 | const std::string& _tenant_name, | |
69 | const std::string& _bucket_name, | |
70 | const rgw_obj_key& _object_key, | |
71 | uint64_t _num_records, | |
72 | uint64_t _obj_size, | |
73 | std::shared_ptr<arw::Schema>& _schema, | |
74 | std::shared_ptr<const arw::KeyValueMetadata>& _kv_metadata, | |
75 | rgw_user _user_id); | |
76 | }; | |
77 | ||
78 | // stores flights that have been created and helps expire them | |
79 | class FlightStore { | |
80 | ||
81 | protected: | |
82 | ||
83 | const DoutPrefix& dp; | |
84 | ||
85 | public: | |
86 | ||
87 | FlightStore(const DoutPrefix& dp); | |
88 | virtual ~FlightStore(); | |
89 | virtual FlightKey add_flight(FlightData&& flight) = 0; | |
90 | ||
91 | // TODO consider returning const shared pointers to FlightData in | |
92 | // the following two functions | |
93 | virtual arw::Result<FlightData> get_flight(const FlightKey& key) const = 0; | |
94 | virtual std::optional<FlightData> after_key(const FlightKey& key) const = 0; | |
95 | ||
96 | virtual int remove_flight(const FlightKey& key) = 0; | |
97 | virtual int expire_flights() = 0; | |
98 | }; | |
99 | ||
100 | class MemoryFlightStore : public FlightStore { | |
101 | std::map<FlightKey, FlightData> map; | |
102 | mutable std::mutex mtx; // for map | |
103 | ||
104 | public: | |
105 | ||
106 | MemoryFlightStore(const DoutPrefix& dp); | |
107 | virtual ~MemoryFlightStore(); | |
108 | FlightKey add_flight(FlightData&& flight) override; | |
109 | arw::Result<FlightData> get_flight(const FlightKey& key) const override; | |
110 | std::optional<FlightData> after_key(const FlightKey& key) const override; | |
111 | int remove_flight(const FlightKey& key) override; | |
112 | int expire_flights() override; | |
113 | }; | |
114 | ||
115 | class FlightServer : public flt::FlightServerBase { | |
116 | ||
117 | using Data1 = std::vector<std::shared_ptr<arw::RecordBatch>>; | |
118 | ||
119 | RGWProcessEnv& env; | |
120 | rgw::sal::Driver* driver; | |
121 | const DoutPrefix& dp; | |
122 | FlightStore* flight_store; | |
123 | ||
124 | std::map<std::string, Data1> data; | |
125 | ||
126 | public: | |
127 | ||
128 | static constexpr int default_port = 8077; | |
129 | ||
130 | FlightServer(RGWProcessEnv& env, | |
131 | FlightStore* flight_store, | |
132 | const DoutPrefix& dp); | |
133 | ~FlightServer() override; | |
134 | ||
135 | FlightStore* get_flight_store() { | |
136 | return flight_store; | |
137 | } | |
138 | ||
139 | arw::Status ListFlights(const flt::ServerCallContext& context, | |
140 | const flt::Criteria* criteria, | |
141 | std::unique_ptr<flt::FlightListing>* listings) override; | |
142 | ||
143 | arw::Status GetFlightInfo(const flt::ServerCallContext &context, | |
144 | const flt::FlightDescriptor &request, | |
145 | std::unique_ptr<flt::FlightInfo> *info) override; | |
146 | ||
147 | arw::Status GetSchema(const flt::ServerCallContext &context, | |
148 | const flt::FlightDescriptor &request, | |
149 | std::unique_ptr<flt::SchemaResult> *schema) override; | |
150 | ||
151 | arw::Status DoGet(const flt::ServerCallContext &context, | |
152 | const flt::Ticket &request, | |
153 | std::unique_ptr<flt::FlightDataStream> *stream) override; | |
154 | }; // class FlightServer | |
155 | ||
156 | class OwningStringView : public arw::util::string_view { | |
157 | ||
158 | uint8_t* buffer; | |
159 | int64_t capacity; | |
160 | int64_t consumed; | |
161 | ||
162 | OwningStringView(uint8_t* _buffer, int64_t _size) : | |
163 | arw::util::string_view((const char*) _buffer, _size), | |
164 | buffer(_buffer), | |
165 | capacity(_size), | |
166 | consumed(_size) | |
167 | { } | |
168 | ||
169 | OwningStringView(OwningStringView&& from, int64_t new_size) : | |
170 | buffer(nullptr), | |
171 | capacity(from.capacity), | |
172 | consumed(new_size) | |
173 | { | |
174 | // should be impossible due to static function check | |
175 | ceph_assertf(consumed <= capacity, "new size cannot exceed capacity"); | |
176 | ||
177 | std::swap(buffer, from.buffer); | |
178 | from.capacity = 0; | |
179 | from.consumed = 0; | |
180 | } | |
181 | ||
182 | public: | |
183 | ||
184 | OwningStringView(OwningStringView&&) = default; | |
185 | OwningStringView& operator=(OwningStringView&&) = default; | |
186 | ||
187 | uint8_t* writeable_data() { | |
188 | return buffer; | |
189 | } | |
190 | ||
191 | ~OwningStringView() { | |
192 | if (buffer) { | |
193 | delete[] buffer; | |
194 | } | |
195 | } | |
196 | ||
197 | static arw::Result<OwningStringView> make(int64_t size) { | |
198 | uint8_t* buffer = new uint8_t[size]; | |
199 | if (!buffer) { | |
200 | return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size); | |
201 | } | |
202 | return OwningStringView(buffer, size); | |
203 | } | |
204 | ||
205 | static arw::Result<OwningStringView> shrink(OwningStringView&& from, | |
206 | int64_t new_size) { | |
207 | if (new_size > from.capacity) { | |
208 | return arw::Status::Invalid("new size cannot exceed capacity"); | |
209 | } else { | |
210 | return OwningStringView(std::move(from), new_size); | |
211 | } | |
212 | } | |
213 | ||
214 | }; | |
215 | ||
216 | // GLOBAL | |
217 | ||
218 | flt::Ticket FlightKeyToTicket(const FlightKey& key); | |
219 | arw::Status TicketToFlightKey(const flt::Ticket& t, FlightKey& key); | |
220 | ||
221 | } // namespace rgw::flight |