]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab ft=cpp | |
3 | ||
4 | /* | |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright 2023 IBM | |
8 | * | |
9 | * See file COPYING for licensing information. | |
10 | */ | |
11 | ||
#include <unistd.h>

#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <exception>
#include <filesystem>
#include <sstream>

#include "arrow/type.h"
#include "arrow/flight/server.h"
#include "arrow/io/file.h"

#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/stream_reader.h"

#include "rgw_flight_frontend.h"
#include "rgw_flight.h"
26 | ||
27 | ||
// logging
// Subsystem id and message prefix handed to the `dp` prefix objects that
// FlightFrontend and FlightGetObj_Filter construct in this file.
constexpr unsigned dout_subsys = ceph_subsys_rgw_flight;
constexpr const char* dout_prefix_str = "rgw arrow_flight: ";
31 | ||
32 | ||
33 | namespace rgw::flight { | |
34 | ||
// Flight key with value 0.
// NOTE(review): presumably the "no flight" sentinel -- confirm against the
// FlightStore implementation, which is not visible here.
const FlightKey null_flight_key = 0;
36 | ||
37 | FlightFrontend::FlightFrontend(RGWProcessEnv& _env, | |
38 | RGWFrontendConfig* _config, | |
39 | int _port) : | |
40 | env(_env), | |
41 | config(_config), | |
42 | port(_port), | |
43 | dp(env.driver->ctx(), dout_subsys, dout_prefix_str) | |
44 | { | |
45 | env.flight_store = new MemoryFlightStore(dp); | |
46 | env.flight_server = new FlightServer(env, env.flight_store, dp); | |
47 | INFO << "flight server started" << dendl; | |
48 | } | |
49 | ||
50 | FlightFrontend::~FlightFrontend() { | |
51 | delete env.flight_server; | |
52 | env.flight_server = nullptr; | |
53 | ||
54 | delete env.flight_store; | |
55 | env.flight_store = nullptr; | |
56 | ||
57 | INFO << "flight server shut down" << dendl; | |
58 | } | |
59 | ||
60 | int FlightFrontend::init() { | |
61 | if (port <= 0) { | |
62 | port = FlightServer::default_port; | |
63 | } | |
64 | const std::string url = | |
65 | std::string("grpc+tcp://localhost:") + std::to_string(port); | |
66 | flt::Location location; | |
67 | arw::Status s = flt::Location::Parse(url, &location); | |
68 | if (!s.ok()) { | |
69 | ERROR << "couldn't parse url=" << url << ", status=" << s << dendl; | |
70 | return -EINVAL; | |
71 | } | |
72 | ||
73 | flt::FlightServerOptions options(location); | |
74 | options.verify_client = false; | |
75 | s = env.flight_server->Init(options); | |
76 | if (!s.ok()) { | |
77 | ERROR << "couldn't init flight server; status=" << s << dendl; | |
78 | return -EINVAL; | |
79 | } | |
80 | ||
81 | INFO << "FlightServer inited; will use port " << port << dendl; | |
82 | return 0; | |
83 | } | |
84 | ||
85 | int FlightFrontend::run() { | |
86 | try { | |
87 | flight_thread = make_named_thread(server_thread_name, | |
88 | &FlightServer::Serve, | |
89 | env.flight_server); | |
90 | ||
91 | INFO << "FlightServer thread started, id=" << | |
92 | flight_thread.get_id() << | |
93 | ", joinable=" << flight_thread.joinable() << dendl; | |
94 | return 0; | |
95 | } catch (std::system_error& e) { | |
96 | ERROR << "FlightServer thread failed to start" << dendl; | |
97 | return -e.code().value(); | |
98 | } | |
99 | } | |
100 | ||
101 | void FlightFrontend::stop() { | |
102 | env.flight_server->Shutdown(); | |
103 | env.flight_server->Wait(); | |
104 | INFO << "FlightServer shut down" << dendl; | |
105 | } | |
106 | ||
107 | void FlightFrontend::join() { | |
108 | flight_thread.join(); | |
109 | INFO << "FlightServer thread joined" << dendl; | |
110 | } | |
111 | ||
112 | void FlightFrontend::pause_for_new_config() { | |
113 | // ignore since config changes won't alter flight_server | |
114 | } | |
115 | ||
116 | void FlightFrontend::unpause_with_new_config() { | |
117 | // ignore since config changes won't alter flight_server | |
118 | } | |
119 | ||
120 | /* ************************************************************ */ | |
121 | ||
122 | FlightGetObj_Filter::FlightGetObj_Filter(const req_state* request, | |
123 | RGWGetObj_Filter* next) : | |
124 | RGWGetObj_Filter(next), | |
125 | penv(request->penv), | |
126 | dp(request->cct->get(), dout_subsys, dout_prefix_str), | |
127 | current_offset(0), | |
128 | expected_size(request->obj_size), | |
129 | uri(request->decoded_uri), | |
130 | tenant_name(request->bucket->get_tenant()), | |
131 | bucket_name(request->bucket->get_name()), | |
132 | object_key(request->object->get_key()), | |
133 | // note: what about object namespace and instance? | |
134 | schema_status(arrow::StatusCode::Cancelled, | |
135 | "schema determination incomplete"), | |
136 | user_id(request->user->get_id()) | |
137 | { | |
138 | #warning "TODO: fix use of tmpnam" | |
139 | char name[L_tmpnam]; | |
140 | const char* namep = std::tmpnam(name); | |
141 | if (!namep) { | |
142 | // | |
143 | } | |
144 | temp_file_name = namep; | |
145 | ||
146 | temp_file.open(temp_file_name); | |
147 | } | |
148 | ||
149 | FlightGetObj_Filter::~FlightGetObj_Filter() { | |
150 | if (temp_file.is_open()) { | |
151 | temp_file.close(); | |
152 | } | |
153 | std::error_code error; | |
154 | std::filesystem::remove(temp_file_name, error); | |
155 | if (error) { | |
156 | ERROR << "FlightGetObj_Filter got error when removing temp file; " | |
157 | "error=" << error.value() << | |
158 | ", temp_file_name=" << temp_file_name << dendl; | |
159 | } else { | |
160 | INFO << "parquet/arrow schema determination status: " << | |
161 | schema_status << dendl; | |
162 | } | |
163 | } | |
164 | ||
165 | int FlightGetObj_Filter::handle_data(bufferlist& bl, | |
166 | off_t bl_ofs, off_t bl_len) { | |
167 | INFO << "flight handling data from offset " << | |
168 | current_offset << " (" << bl_ofs << ") of size " << bl_len << dendl; | |
169 | ||
170 | current_offset += bl_len; | |
171 | ||
172 | if (temp_file.is_open()) { | |
173 | bl.write_stream(temp_file); | |
174 | ||
175 | if (current_offset >= expected_size) { | |
176 | INFO << "data read is completed, current_offset=" << | |
177 | current_offset << ", expected_size=" << expected_size << dendl; | |
178 | temp_file.close(); | |
179 | ||
180 | std::shared_ptr<const arw::KeyValueMetadata> kv_metadata; | |
181 | std::shared_ptr<arw::Schema> aw_schema; | |
182 | int64_t num_rows = 0; | |
183 | ||
184 | auto process_metadata = [&aw_schema, &num_rows, &kv_metadata, this]() -> arrow::Status { | |
185 | ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::ReadableFile> file, | |
186 | arrow::io::ReadableFile::Open(temp_file_name)); | |
187 | const std::shared_ptr<parquet::FileMetaData> metadata = parquet::ReadMetaData(file); | |
188 | ||
189 | file->Close(); | |
190 | ||
191 | num_rows = metadata->num_rows(); | |
192 | kv_metadata = metadata->key_value_metadata(); | |
193 | const parquet::SchemaDescriptor* pq_schema = metadata->schema(); | |
194 | ARROW_RETURN_NOT_OK(parquet::arrow::FromParquetSchema(pq_schema, &aw_schema)); | |
195 | ||
196 | return arrow::Status::OK(); | |
197 | }; | |
198 | ||
199 | schema_status = process_metadata(); | |
200 | if (!schema_status.ok()) { | |
201 | ERROR << "reading metadata to access schema, error=" << schema_status << dendl; | |
202 | } else { | |
203 | // INFO << "arrow_schema=" << *aw_schema << dendl; | |
204 | FlightStore* store = penv.flight_store; | |
205 | auto key = | |
206 | store->add_flight(FlightData(uri, tenant_name, bucket_name, | |
207 | object_key, num_rows, | |
208 | expected_size, aw_schema, | |
209 | kv_metadata, user_id)); | |
210 | (void) key; // suppress unused variable warning | |
211 | } | |
212 | } // if last block | |
213 | } // if file opened | |
214 | ||
215 | // chain to next filter in stream | |
216 | int ret = RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len); | |
217 | ||
218 | return ret; | |
219 | } | |
220 | ||
#if 0
// Disabled scratch code kept for reference; it is never compiled.
// It shows how parquet file metadata, column descriptors and row groups can
// be dumped to the log. `md` and `schema1` are not declared in this snippet,
// so it would need them supplied before it could be enabled.
void code_snippets() {
  INFO << "num_columns:" << md->num_columns() <<
    " num_schema_elements:" << md->num_schema_elements() <<
    " num_rows:" << md->num_rows() <<
    " num_row_groups:" << md->num_row_groups() << dendl;


  INFO << "file schema: name=" << schema1->name() << ", ToString:" << schema1->ToString() << ", num_columns=" << schema1->num_columns() << dendl;
  for (int c = 0; c < schema1->num_columns(); ++c) {
    const parquet::ColumnDescriptor* cd = schema1->Column(c);
    // const parquet::ConvertedType::type t = cd->converted_type;
    const std::shared_ptr<const parquet::LogicalType> lt = cd->logical_type();
    INFO << "column " << c << ": name=" << cd->name() << ", ToString=" << cd->ToString() << ", logical_type=" << lt->ToString() << dendl;
  }

  INFO << "There are " << md->num_rows() << " rows and " << md->num_row_groups() << " row groups" << dendl;
  for (int rg = 0; rg < md->num_row_groups(); ++rg) {
    INFO << "Row Group " << rg << dendl;
    auto rg_md = md->RowGroup(rg);
    auto schema2 = rg_md->schema();
  }
}
#endif
245 | ||
246 | } // namespace rgw::flight |