]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_flight_frontend.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / rgw_flight_frontend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright 2023 IBM
8 *
9 * See file COPYING for licensing information.
10 */
11
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <exception>
#include <filesystem>
#include <memory>
#include <sstream>
#include <string>
#include <system_error>
#include <utility>

#include "arrow/type.h"
#include "arrow/flight/server.h"
#include "arrow/io/file.h"

#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/stream_reader.h"

#include "rgw_flight_frontend.h"
#include "rgw_flight.h"
26
27
28 // logging
29 constexpr unsigned dout_subsys = ceph_subsys_rgw_flight;
30 constexpr const char* dout_prefix_str = "rgw arrow_flight: ";
31
32
33 namespace rgw::flight {
34
35 const FlightKey null_flight_key = 0;
36
37 FlightFrontend::FlightFrontend(RGWProcessEnv& _env,
38 RGWFrontendConfig* _config,
39 int _port) :
40 env(_env),
41 config(_config),
42 port(_port),
43 dp(env.driver->ctx(), dout_subsys, dout_prefix_str)
44 {
45 env.flight_store = new MemoryFlightStore(dp);
46 env.flight_server = new FlightServer(env, env.flight_store, dp);
47 INFO << "flight server started" << dendl;
48 }
49
50 FlightFrontend::~FlightFrontend() {
51 delete env.flight_server;
52 env.flight_server = nullptr;
53
54 delete env.flight_store;
55 env.flight_store = nullptr;
56
57 INFO << "flight server shut down" << dendl;
58 }
59
60 int FlightFrontend::init() {
61 if (port <= 0) {
62 port = FlightServer::default_port;
63 }
64 const std::string url =
65 std::string("grpc+tcp://localhost:") + std::to_string(port);
66 flt::Location location;
67 arw::Status s = flt::Location::Parse(url, &location);
68 if (!s.ok()) {
69 ERROR << "couldn't parse url=" << url << ", status=" << s << dendl;
70 return -EINVAL;
71 }
72
73 flt::FlightServerOptions options(location);
74 options.verify_client = false;
75 s = env.flight_server->Init(options);
76 if (!s.ok()) {
77 ERROR << "couldn't init flight server; status=" << s << dendl;
78 return -EINVAL;
79 }
80
81 INFO << "FlightServer inited; will use port " << port << dendl;
82 return 0;
83 }
84
85 int FlightFrontend::run() {
86 try {
87 flight_thread = make_named_thread(server_thread_name,
88 &FlightServer::Serve,
89 env.flight_server);
90
91 INFO << "FlightServer thread started, id=" <<
92 flight_thread.get_id() <<
93 ", joinable=" << flight_thread.joinable() << dendl;
94 return 0;
95 } catch (std::system_error& e) {
96 ERROR << "FlightServer thread failed to start" << dendl;
97 return -e.code().value();
98 }
99 }
100
101 void FlightFrontend::stop() {
102 env.flight_server->Shutdown();
103 env.flight_server->Wait();
104 INFO << "FlightServer shut down" << dendl;
105 }
106
107 void FlightFrontend::join() {
108 flight_thread.join();
109 INFO << "FlightServer thread joined" << dendl;
110 }
111
112 void FlightFrontend::pause_for_new_config() {
113 // ignore since config changes won't alter flight_server
114 }
115
116 void FlightFrontend::unpause_with_new_config() {
117 // ignore since config changes won't alter flight_server
118 }
119
120 /* ************************************************************ */
121
122 FlightGetObj_Filter::FlightGetObj_Filter(const req_state* request,
123 RGWGetObj_Filter* next) :
124 RGWGetObj_Filter(next),
125 penv(request->penv),
126 dp(request->cct->get(), dout_subsys, dout_prefix_str),
127 current_offset(0),
128 expected_size(request->obj_size),
129 uri(request->decoded_uri),
130 tenant_name(request->bucket->get_tenant()),
131 bucket_name(request->bucket->get_name()),
132 object_key(request->object->get_key()),
133 // note: what about object namespace and instance?
134 schema_status(arrow::StatusCode::Cancelled,
135 "schema determination incomplete"),
136 user_id(request->user->get_id())
137 {
138 #warning "TODO: fix use of tmpnam"
139 char name[L_tmpnam];
140 const char* namep = std::tmpnam(name);
141 if (!namep) {
142 //
143 }
144 temp_file_name = namep;
145
146 temp_file.open(temp_file_name);
147 }
148
149 FlightGetObj_Filter::~FlightGetObj_Filter() {
150 if (temp_file.is_open()) {
151 temp_file.close();
152 }
153 std::error_code error;
154 std::filesystem::remove(temp_file_name, error);
155 if (error) {
156 ERROR << "FlightGetObj_Filter got error when removing temp file; "
157 "error=" << error.value() <<
158 ", temp_file_name=" << temp_file_name << dendl;
159 } else {
160 INFO << "parquet/arrow schema determination status: " <<
161 schema_status << dendl;
162 }
163 }
164
165 int FlightGetObj_Filter::handle_data(bufferlist& bl,
166 off_t bl_ofs, off_t bl_len) {
167 INFO << "flight handling data from offset " <<
168 current_offset << " (" << bl_ofs << ") of size " << bl_len << dendl;
169
170 current_offset += bl_len;
171
172 if (temp_file.is_open()) {
173 bl.write_stream(temp_file);
174
175 if (current_offset >= expected_size) {
176 INFO << "data read is completed, current_offset=" <<
177 current_offset << ", expected_size=" << expected_size << dendl;
178 temp_file.close();
179
180 std::shared_ptr<const arw::KeyValueMetadata> kv_metadata;
181 std::shared_ptr<arw::Schema> aw_schema;
182 int64_t num_rows = 0;
183
184 auto process_metadata = [&aw_schema, &num_rows, &kv_metadata, this]() -> arrow::Status {
185 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::ReadableFile> file,
186 arrow::io::ReadableFile::Open(temp_file_name));
187 const std::shared_ptr<parquet::FileMetaData> metadata = parquet::ReadMetaData(file);
188
189 file->Close();
190
191 num_rows = metadata->num_rows();
192 kv_metadata = metadata->key_value_metadata();
193 const parquet::SchemaDescriptor* pq_schema = metadata->schema();
194 ARROW_RETURN_NOT_OK(parquet::arrow::FromParquetSchema(pq_schema, &aw_schema));
195
196 return arrow::Status::OK();
197 };
198
199 schema_status = process_metadata();
200 if (!schema_status.ok()) {
201 ERROR << "reading metadata to access schema, error=" << schema_status << dendl;
202 } else {
203 // INFO << "arrow_schema=" << *aw_schema << dendl;
204 FlightStore* store = penv.flight_store;
205 auto key =
206 store->add_flight(FlightData(uri, tenant_name, bucket_name,
207 object_key, num_rows,
208 expected_size, aw_schema,
209 kv_metadata, user_id));
210 (void) key; // suppress unused variable warning
211 }
212 } // if last block
213 } // if file opened
214
215 // chain to next filter in stream
216 int ret = RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len);
217
218 return ret;
219 }
220
#if 0
// Disabled reference snippets showing how to inspect parquet file metadata
// (column descriptors, logical types, row groups). This code is never
// compiled. It refers to variables (md, schema1) that are not defined here.
void code_snippets() {
  INFO << "num_columns:" << md->num_columns() <<
    " num_schema_elements:" << md->num_schema_elements() <<
    " num_rows:" << md->num_rows() <<
    " num_row_groups:" << md->num_row_groups() << dendl;

  INFO << "file schema: name=" << schema1->name() <<
    ", ToString:" << schema1->ToString() <<
    ", num_columns=" << schema1->num_columns() << dendl;
  for (int col = 0; col < schema1->num_columns(); ++col) {
    const parquet::ColumnDescriptor* cd = schema1->Column(col);
    // const parquet::ConvertedType::type t = cd->converted_type;
    const std::shared_ptr<const parquet::LogicalType> lt = cd->logical_type();
    INFO << "column " << col << ": name=" << cd->name() <<
      ", ToString=" << cd->ToString() <<
      ", logical_type=" << lt->ToString() << dendl;
  }

  INFO << "There are " << md->num_rows() << " rows and " <<
    md->num_row_groups() << " row groups" << dendl;
  for (int rg = 0; rg < md->num_row_groups(); ++rg) {
    INFO << "Row Group " << rg << dendl;
    auto rg_md = md->RowGroup(rg);
    auto schema2 = rg_md->schema();
  }
}
#endif
245
246 } // namespace rgw::flight