1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 * Ceph - scalable distributed file system
9 * See file COPYING for licensing information.
16 #include "arrow/type.h"
17 #include "arrow/flight/server.h"
18 #include "arrow/io/file.h"
20 #include "parquet/arrow/reader.h"
21 #include "parquet/arrow/schema.h"
22 #include "parquet/stream_reader.h"
24 #include "rgw_flight_frontend.h"
25 #include "rgw_flight.h"
// Logging configuration for this translation unit: the debug subsystem id and
// the message prefix. Both are passed to the `dp` members constructed by
// FlightFrontend and FlightGetObj_Filter further down.
29 constexpr unsigned dout_subsys
= ceph_subsys_rgw_flight
;
30 constexpr const char* dout_prefix_str
= "rgw arrow_flight: ";
33 namespace rgw::flight
{
// Flight key with the value 0.
// NOTE(review): going by the name this is presumably the "no flight" sentinel;
// confirm against the users of FlightKey.
35 const FlightKey null_flight_key
= 0;
// Construct the Flight frontend.
//
// Sets up the logging prefix object `dp`, then allocates a MemoryFlightStore
// and a FlightServer and publishes both through `env` (env.flight_store and
// env.flight_server).
//
// NOTE(review): any further parameters and the other member initializers are
// not visible in this excerpt.
37 FlightFrontend::FlightFrontend(RGWProcessEnv
& _env
,
38 RGWFrontendConfig
* _config
,
43 dp(env
.driver
->ctx(), dout_subsys
, dout_prefix_str
)
// Both objects are created with raw `new` and stored as plain pointers in
// env; ~FlightFrontend() is what deletes them again.
45 env
.flight_store
= new MemoryFlightStore(dp
);
// The server is handed the store pointer created just above.
46 env
.flight_server
= new FlightServer(env
, env
.flight_store
, dp
);
// Logged at construction time; the server's Init() is only called later in
// init(), and its thread is only created in run().
47 INFO
<< "flight server started" << dendl
;
// Destroy the objects allocated by the constructor and clear the pointers
// published in `env`.
//
// The server is deleted before the store because the server was constructed
// with a pointer to the store.
50 FlightFrontend::~FlightFrontend() {
51 delete env
.flight_server
;
// Null the pointer so env no longer refers to the deleted server.
52 env
.flight_server
= nullptr;
54 delete env
.flight_store
;
// Same for the store.
55 env
.flight_store
= nullptr;
57 INFO
<< "flight server shut down" << dendl
;
// Initialise the flight server.
//
// Builds a "grpc+tcp://localhost:<port>" URL, parses it into a Flight
// Location, and calls Init() on env.flight_server with client verification
// disabled. An error is logged if parsing the URL or Init() does not succeed.
//
// NOTE(review): the conditionals and return statements of this function are
// not visible in this excerpt, so the exact return values cannot be
// documented from here.
60 int FlightFrontend::init() {
// Use the server's default port (any condition guarding this assignment is
// outside this excerpt).
62 port
= FlightServer::default_port
;
// The host part is hard-coded to localhost; only the port varies.
64 const std::string url
=
65 std::string("grpc+tcp://localhost:") + std::to_string(port
);
66 flt::Location location
;
// Parse() reports success/failure through the returned status and fills in
// `location` via the out-parameter.
67 arw::Status s
= flt::Location::Parse(url
, &location
);
69 ERROR
<< "couldn't parse url=" << url
<< ", status=" << s
<< dendl
;
73 flt::FlightServerOptions
options(location
);
// Client verification is switched off for this server.
74 options
.verify_client
= false;
// `s` is reused for the result of Init().
75 s
= env
.flight_server
->Init(options
);
77 ERROR
<< "couldn't init flight server; status=" << s
<< dendl
;
81 INFO
<< "FlightServer inited; will use port " << port
<< dendl
;
// Start the flight server thread.
//
// Creates `flight_thread` through make_named_thread() using
// `server_thread_name`, then logs the new thread's id and joinable state.
// If thread creation throws std::system_error, the failure is logged and the
// negated error-code value is returned.
//
// NOTE(review): the `try` opener, the remaining make_named_thread() arguments
// and the success-path return are not visible in this excerpt.
85 int FlightFrontend::run() {
87 flight_thread
= make_named_thread(server_thread_name
,
91 INFO
<< "FlightServer thread started, id=" <<
92 flight_thread
.get_id() <<
93 ", joinable=" << flight_thread
.joinable() << dendl
;
// std::system_error is what thread creation raises when it fails.
95 } catch (std::system_error
& e
) {
96 ERROR
<< "FlightServer thread failed to start" << dendl
;
// Report the underlying error code as a negative value.
97 return -e
.code().value();
// Stop the flight server: call Shutdown() on it, then Wait() on it, and log
// once both calls have returned.
// NOTE(review): the return values of Shutdown() and Wait() are not examined
// in the visible lines.
101 void FlightFrontend::stop() {
102 env
.flight_server
->Shutdown();
103 env
.flight_server
->Wait();
104 INFO
<< "FlightServer shut down" << dendl
;
// Join `flight_thread` (the thread created in run()) and log once the join
// has completed.
107 void FlightFrontend::join() {
108 flight_thread
.join();
109 INFO
<< "FlightServer thread joined" << dendl
;
// Hook invoked around a configuration change; no work is visible in this
// excerpt beyond the explanatory comment below.
112 void FlightFrontend::pause_for_new_config() {
113 // ignore since config changes won't alter flight_server
// Counterpart of pause_for_new_config(); likewise no work is visible in this
// excerpt beyond the explanatory comment below.
116 void FlightFrontend::unpause_with_new_config() {
117 // ignore since config changes won't alter flight_server
120 /* ************************************************************ */
// Construct a GetObj filter that captures an object's data for Arrow Flight.
//
// Copies the request's metadata into members (object size, decoded URI,
// tenant, bucket name, object key, user id), chains to `next` through the
// RGWGetObj_Filter base, and opens a temporary file; handle_data() writes the
// incoming data to that file.
//
// request: the request being served; dereferenced for cct, bucket, object and
//          user, so those are assumed to be non-null here.
// next:    the next filter in the chain, forwarded to the base class.
//
// NOTE(review): some initializers and the declaration of `name` are not
// visible in this excerpt.
122 FlightGetObj_Filter::FlightGetObj_Filter(const req_state
* request
,
123 RGWGetObj_Filter
* next
) :
124 RGWGetObj_Filter(next
),
126 dp(request
->cct
->get(), dout_subsys
, dout_prefix_str
),
// Total object size; handle_data() compares its running offset against this
// to detect that all data has been seen.
128 expected_size(request
->obj_size
),
129 uri(request
->decoded_uri
),
130 tenant_name(request
->bucket
->get_tenant()),
131 bucket_name(request
->bucket
->get_name()),
132 object_key(request
->object
->get_key()),
133 // note: what about object namespace and instance?
// Starts out as a non-OK status; it is overwritten in handle_data() once the
// schema has been read from the temp file.
134 schema_status(arrow::StatusCode::Cancelled
,
135 "schema determination incomplete"),
136 user_id(request
->user
->get_id())
// std::tmpnam only produces a file name; it does not create the file, so
// another process can create a file under that name before the open() below.
// That race is what this TODO is about.
138 #warning "TODO: fix use of tmpnam"
140 const char* namep
= std::tmpnam(name
);
144 temp_file_name
= namep
;
146 temp_file
.open(temp_file_name
);
// Clean up the temporary file and log the final schema status.
//
// Checks whether temp_file is still open, removes the file named
// temp_file_name, logs an error that includes the error-code value and the
// file name, and finally logs schema_status.
//
// NOTE(review): the statements inside the is_open() branch and the condition
// guarding the ERROR log are not visible in this excerpt.
149 FlightGetObj_Filter::~FlightGetObj_Filter() {
150 if (temp_file
.is_open()) {
// The error_code overload of remove() reports failure through `error`
// instead of throwing, which keeps this destructor from throwing.
153 std::error_code error
;
154 std::filesystem::remove(temp_file_name
, error
);
156 ERROR
<< "FlightGetObj_Filter got error when removing temp file; "
157 "error=" << error
.value() <<
158 ", temp_file_name=" << temp_file_name
<< dendl
;
160 INFO
<< "parquet/arrow schema determination status: " <<
161 schema_status
<< dendl
;
// Handle one chunk of object data passing through the filter.
//
// Logs the chunk, advances current_offset by bl_len and, if the temp file is
// open, writes the buffer list to it. Once current_offset has reached
// expected_size, the Parquet metadata is read back from the temp file, the
// schema is converted to an Arrow schema, and a FlightData record describing
// the object is added to the flight store. The chunk is then passed on to the
// next filter via RGWGetObj_Filter::handle_data().
//
// bl:     buffer list holding the data.
// bl_ofs: offset passed through to the next filter (only logged here).
// bl_len: length used to advance current_offset.
//
// NOTE(review): block nesting, closing braces and the return statement are
// not visible in this excerpt.
165 int FlightGetObj_Filter::handle_data(bufferlist
& bl
,
166 off_t bl_ofs
, off_t bl_len
) {
167 INFO
<< "flight handling data from offset " <<
168 current_offset
<< " (" << bl_ofs
<< ") of size " << bl_len
<< dendl
;
// Progress is tracked by accumulating bl_len, not by looking at bl_ofs.
170 current_offset
+= bl_len
;
172 if (temp_file
.is_open()) {
173 bl
.write_stream(temp_file
);
// All expected bytes have been seen: the temp file can now be inspected.
175 if (current_offset
>= expected_size
) {
176 INFO
<< "data read is completed, current_offset=" <<
177 current_offset
<< ", expected_size=" << expected_size
<< dendl
;
// Outputs filled in by the process_metadata lambda below.
180 std::shared_ptr
<const arw::KeyValueMetadata
> kv_metadata
;
181 std::shared_ptr
<arw::Schema
> aw_schema
;
182 int64_t num_rows
= 0;
// Helper returning an arrow::Status so that the Arrow *_OR_RAISE /
// RETURN_NOT_OK macros can be used for early exit on failure.
184 auto process_metadata
= [&aw_schema
, &num_rows
, &kv_metadata
, this]() -> arrow::Status
{
// Re-open the temp file by name as an Arrow readable file.
185 ARROW_ASSIGN_OR_RAISE(std::shared_ptr
<arrow::io::ReadableFile
> file
,
186 arrow::io::ReadableFile::Open(temp_file_name
));
187 const std::shared_ptr
<parquet::FileMetaData
> metadata
= parquet::ReadMetaData(file
);
191 num_rows
= metadata
->num_rows();
192 kv_metadata
= metadata
->key_value_metadata();
// Convert the Parquet schema descriptor into an Arrow schema.
193 const parquet::SchemaDescriptor
* pq_schema
= metadata
->schema();
194 ARROW_RETURN_NOT_OK(parquet::arrow::FromParquetSchema(pq_schema
, &aw_schema
));
196 return arrow::Status::OK();
// Replaces the initial "schema determination incomplete" status.
199 schema_status
= process_metadata();
200 if (!schema_status
.ok()) {
201 ERROR
<< "reading metadata to access schema, error=" << schema_status
<< dendl
;
203 // INFO << "arrow_schema=" << *aw_schema << dendl;
// Register the object with the flight store.
// NOTE(review): `penv` and the declaration of `key` are not visible in this
// excerpt.
204 FlightStore
* store
= penv
.flight_store
;
206 store
->add_flight(FlightData(uri
, tenant_name
, bucket_name
,
207 object_key
, num_rows
,
208 expected_size
, aw_schema
,
209 kv_metadata
, user_id
));
210 (void) key
; // suppress unused variable warning
215 // chain to next filter in stream
216 int ret
= RGWGetObj_Filter::handle_data(bl
, bl_ofs
, bl_len
);
// Collection of logging snippets that dump Parquet file metadata: overall
// counts, the file schema with one line per column, and a loop over the row
// groups.
//
// NOTE(review): the declarations of `md` and `schema1` are not visible in
// this excerpt, and no caller of this function appears in the visible code;
// it looks like scratch/reference code - confirm whether it is compiled at
// all.
222 void code_snippets() {
// File-level counts taken from the metadata object `md`.
223 INFO
<< "num_columns:" << md
->num_columns() <<
224 " num_schema_elements:" << md
->num_schema_elements() <<
225 " num_rows:" << md
->num_rows() <<
226 " num_row_groups:" << md
->num_row_groups() << dendl
;
// Schema summary, followed by one log line per column.
229 INFO
<< "file schema: name=" << schema1
->name() << ", ToString:" << schema1
->ToString() << ", num_columns=" << schema1
->num_columns() << dendl
;
230 for (int c
= 0; c
< schema1
->num_columns(); ++c
) {
231 const parquet::ColumnDescriptor
* cd
= schema1
->Column(c
);
232 // const parquet::ConvertedType::type t = cd->converted_type;
233 const std::shared_ptr
<const parquet::LogicalType
> lt
= cd
->logical_type();
234 INFO
<< "column " << c
<< ": name=" << cd
->name() << ", ToString=" << cd
->ToString() << ", logical_type=" << lt
->ToString() << dendl
;
// Walk the row groups; each one exposes its own metadata and schema.
237 INFO
<< "There are " << md
->num_rows() << " rows and " << md
->num_row_groups() << " row groups" << dendl
;
238 for (int rg
= 0; rg
< md
->num_row_groups(); ++rg
) {
239 INFO
<< "Row Group " << rg
<< dendl
;
240 auto rg_md
= md
->RowGroup(rg
);
241 auto schema2
= rg_md
->schema();
246 } // namespace rgw::flight