1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
9 // http://www.apache.org/licenses/LICENSE-2.0
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
18 #include "arrow/flight/types.h"
24 #include "arrow/buffer.h"
25 #include "arrow/flight/serialization_internal.h"
26 #include "arrow/io/memory.h"
27 #include "arrow/ipc/dictionary.h"
28 #include "arrow/ipc/reader.h"
29 #include "arrow/status.h"
30 #include "arrow/table.h"
31 #include "arrow/util/uri.h"
// URI scheme names used by Flight locations.
const char* kSchemeGrpc = "grpc";
const char* kSchemeGrpcTcp = "grpc+tcp";
const char* kSchemeGrpcUnix = "grpc+unix";
const char* kSchemeGrpcTls = "grpc+tls";

// Type identifier reported by FlightStatusDetail::type_id().
const char* kErrorDetailTypeId = "flight::FlightStatusDetail";
43 const char* FlightStatusDetail::type_id() const { return kErrorDetailTypeId
; }
45 std::string
FlightStatusDetail::ToString() const { return CodeAsString(); }
47 FlightStatusCode
FlightStatusDetail::code() const { return code_
; }
49 std::string
FlightStatusDetail::extra_info() const { return extra_info_
; }
51 void FlightStatusDetail::set_extra_info(std::string extra_info
) {
52 extra_info_
= std::move(extra_info
);
// Maps this detail's FlightStatusCode to its textual name; for example
// FlightStatusCode::Unauthenticated yields "Unauthenticated" and
// FlightStatusCode::Unauthorized yields "Unauthorized".
55 std::string
FlightStatusDetail::CodeAsString() const {
57 case FlightStatusCode::Internal
:
59 case FlightStatusCode::TimedOut
:
61 case FlightStatusCode::Cancelled
:
63 case FlightStatusCode::Unauthenticated
:
64 return "Unauthenticated";
65 case FlightStatusCode::Unauthorized
:
66 return "Unauthorized";
67 case FlightStatusCode::Unavailable
:
// Extracts the FlightStatusDetail attached to `status`.
//
// The guard requires that the status has a detail and that the detail's
// type_id() is kErrorDetailTypeId. Both sides are `const char*`, so this is
// a pointer comparison, not a comparison of string contents. When the guard
// does not fire, the detail is downcast with dynamic_pointer_cast.
74 std::shared_ptr
<FlightStatusDetail
> FlightStatusDetail::UnwrapStatus(
75 const arrow::Status
& status
) {
76 if (!status
.detail() || status
.detail()->type_id() != kErrorDetailTypeId
) {
79 return std::dynamic_pointer_cast
<FlightStatusDetail
>(status
.detail());
82 Status
MakeFlightError(FlightStatusCode code
, std::string message
,
83 std::string extra_info
) {
84 StatusCode arrow_code
= arrow::StatusCode::IOError
;
85 return arrow::Status(arrow_code
, std::move(message
),
86 std::make_shared
<FlightStatusDetail
>(code
, std::move(extra_info
)));
// Compares two descriptors. The `type` fields are checked first; the result
// then comes from comparing either `path` or `cmd`.
// NOTE(review): presumably the descriptor type selects which of the two
// fields is compared - confirm against the full definition.
89 bool FlightDescriptor::Equals(const FlightDescriptor
& other
) const {
90 if (type
!= other
.type
) {
95 return path
== other
.path
;
97 return cmd
== other
.cmd
;
// Renders the descriptor as text beginning with "FlightDescriptor<". The
// output is assembled in a stringstream from the elements of `path` and/or
// the `cmd` value (written as cmd = '<value>').
103 std::string
FlightDescriptor::ToString() const {
104 std::stringstream ss
;
105 ss
<< "FlightDescriptor<";
110 for (const auto& p
: path
) {
121 ss
<< "cmd = '" << cmd
<< "'";
// Checks that each part of the payload fits in a signed 32-bit length.
//
// Returns CapacityError when `descriptor` or `app_metadata` is present and
// larger than INT32_MAX, and Invalid when the IPC message body length
// exceeds INT32_MAX.
130 Status
FlightPayload::Validate() const {
131 static constexpr int64_t kInt32Max
= std::numeric_limits
<int32_t>::max();
// The descriptor and app_metadata are optional: a null pointer skips the check.
132 if (descriptor
&& descriptor
->size() > kInt32Max
) {
133 return Status::CapacityError("Descriptor size overflow (>= 2**31)");
135 if (app_metadata
&& app_metadata
->size() > kInt32Max
) {
136 return Status::CapacityError("app_metadata size overflow (>= 2**31)");
138 if (ipc_message
.body_length
> kInt32Max
) {
139 return Status::Invalid("Cannot send record batches exceeding 2GiB yet");
144 Status
SchemaResult::GetSchema(ipc::DictionaryMemo
* dictionary_memo
,
145 std::shared_ptr
<Schema
>* out
) const {
146 io::BufferReader
schema_reader(raw_schema_
);
147 return ipc::ReadSchema(&schema_reader
, dictionary_memo
).Value(out
);
// Serializes this descriptor to its Protobuf wire format in `*out`.
//
// Conversion errors from internal::ToProto are propagated. If Protobuf
// serialization fails, IOError is returned.
150 Status
FlightDescriptor::SerializeToString(std::string
* out
) const {
151 pb::FlightDescriptor pb_descriptor
;
152 RETURN_NOT_OK(internal::ToProto(*this, &pb_descriptor
));
154 if (!pb_descriptor
.SerializeToString(out
)) {
155 return Status::IOError("Serialized descriptor exceeded 2 GiB limit");
160 Status
FlightDescriptor::Deserialize(const std::string
& serialized
,
161 FlightDescriptor
* out
) {
162 pb::FlightDescriptor pb_descriptor
;
163 if (!pb_descriptor
.ParseFromString(serialized
)) {
164 return Status::Invalid("Not a valid descriptor");
166 return internal::FromProto(pb_descriptor
, out
);
169 bool Ticket::Equals(const Ticket
& other
) const { return ticket
== other
.ticket
; }
// Serializes this ticket to its Protobuf wire format in `*out`.
// Returns IOError if Protobuf serialization fails.
171 Status
Ticket::SerializeToString(std::string
* out
) const {
172 pb::Ticket pb_ticket
;
// NOTE(review): unlike the FlightDescriptor and FlightInfo serializers, the
// result of ToProto is not checked here - presumably this overload returns
// void; confirm.
173 internal::ToProto(*this, &pb_ticket
);
175 if (!pb_ticket
.SerializeToString(out
)) {
176 return Status::IOError("Serialized ticket exceeded 2 GiB limit");
181 Status
Ticket::Deserialize(const std::string
& serialized
, Ticket
* out
) {
182 pb::Ticket pb_ticket
;
183 if (!pb_ticket
.ParseFromString(serialized
)) {
184 return Status::Invalid("Not a valid ticket");
186 return internal::FromProto(pb_ticket
, out
);
189 arrow::Result
<FlightInfo
> FlightInfo::Make(const Schema
& schema
,
190 const FlightDescriptor
& descriptor
,
191 const std::vector
<FlightEndpoint
>& endpoints
,
192 int64_t total_records
, int64_t total_bytes
) {
193 FlightInfo::Data data
;
194 data
.descriptor
= descriptor
;
195 data
.endpoints
= endpoints
;
196 data
.total_records
= total_records
;
197 data
.total_bytes
= total_bytes
;
198 RETURN_NOT_OK(internal::SchemaToString(schema
, &data
.schema
));
199 return FlightInfo(data
);
// Returns the schema of this FlightInfo, decoding it on demand.
//
// The reconstructed_schema_ flag is tested first. Otherwise the serialized
// schema in data_.schema is read into schema_ and the flag is set, so the
// decode is not repeated. A read failure is propagated before the flag is set.
// NOTE(review): this is a const method that assigns schema_ and
// reconstructed_schema_, so they are presumably declared mutable; no
// synchronization is visible here.
202 Status
FlightInfo::GetSchema(ipc::DictionaryMemo
* dictionary_memo
,
203 std::shared_ptr
<Schema
>* out
) const {
204 if (reconstructed_schema_
) {
208 io::BufferReader
schema_reader(data_
.schema
);
209 RETURN_NOT_OK(ipc::ReadSchema(&schema_reader
, dictionary_memo
).Value(&schema_
));
210 reconstructed_schema_
= true;
// Serializes this FlightInfo to its Protobuf wire format in `*out`.
//
// Conversion errors from internal::ToProto are propagated. If Protobuf
// serialization fails, IOError is returned.
215 Status
FlightInfo::SerializeToString(std::string
* out
) const {
216 pb::FlightInfo pb_info
;
217 RETURN_NOT_OK(internal::ToProto(*this, &pb_info
));
219 if (!pb_info
.SerializeToString(out
)) {
220 return Status::IOError("Serialized FlightInfo exceeded 2 GiB limit");
// Parses a Protobuf-serialized FlightInfo.
//
// Returns Invalid if `serialized` does not parse as a pb::FlightInfo, and
// propagates errors from internal::FromProto. On success `*out` is reset to
// a newly allocated FlightInfo built from the converted data.
225 Status
FlightInfo::Deserialize(const std::string
& serialized
,
226 std::unique_ptr
<FlightInfo
>* out
) {
227 pb::FlightInfo pb_info
;
228 if (!pb_info
.ParseFromString(serialized
)) {
229 return Status::Invalid("Not a valid FlightInfo");
231 FlightInfo::Data data
;
232 RETURN_NOT_OK(internal::FromProto(pb_info
, &data
));
233 out
->reset(new FlightInfo(data
));
237 Location::Location() { uri_
= std::make_shared
<arrow::internal::Uri
>(); }
239 Status
Location::Parse(const std::string
& uri_string
, Location
* location
) {
240 return location
->uri_
->Parse(uri_string
);
243 Status
Location::ForGrpcTcp(const std::string
& host
, const int port
, Location
* location
) {
244 std::stringstream uri_string
;
245 uri_string
<< "grpc+tcp://" << host
<< ':' << port
;
246 return Location::Parse(uri_string
.str(), location
);
249 Status
Location::ForGrpcTls(const std::string
& host
, const int port
, Location
* location
) {
250 std::stringstream uri_string
;
251 uri_string
<< "grpc+tls://" << host
<< ':' << port
;
252 return Location::Parse(uri_string
.str(), location
);
255 Status
Location::ForGrpcUnix(const std::string
& path
, Location
* location
) {
256 std::stringstream uri_string
;
257 uri_string
<< "grpc+unix://" << path
;
258 return Location::Parse(uri_string
.str(), location
);
261 std::string
Location::ToString() const { return uri_
->ToString(); }
// Returns the URI scheme of this location. An empty scheme is special-cased;
// per the existing comment it falls back to grpc+tcp.
262 std::string
Location::scheme() const {
263 std::string scheme
= uri_
->scheme();
264 if (scheme
.empty()) {
265 // Default to grpc+tcp
271 bool Location::Equals(const Location
& other
) const {
272 return ToString() == other
.ToString();
275 bool FlightEndpoint::Equals(const FlightEndpoint
& other
) const {
276 return ticket
== other
.ticket
&& locations
== other
.locations
;
// Drains the stream into `*batches`.
//
// Chunks are pulled with Next(); an error from Next() is returned to the
// caller. A chunk without data ends the read, and each chunk's data is moved
// onto the end of `*batches`. The chunk's app_metadata is not used here.
279 Status
MetadataRecordBatchReader::ReadAll(
280 std::vector
<std::shared_ptr
<RecordBatch
>>* batches
) {
281 FlightStreamChunk chunk
;
284 RETURN_NOT_OK(Next(&chunk
));
// A chunk with no record batch marks the end of the stream.
285 if (!chunk
.data
) break;
286 batches
->emplace_back(std::move(chunk
.data
));
291 Status
MetadataRecordBatchReader::ReadAll(std::shared_ptr
<Table
>* table
) {
292 std::vector
<std::shared_ptr
<RecordBatch
>> batches
;
293 RETURN_NOT_OK(ReadAll(&batches
));
294 ARROW_ASSIGN_OR_RAISE(auto schema
, GetSchema());
295 return Table::FromRecordBatches(schema
, std::move(batches
)).Value(table
);
298 Status
MetadataRecordBatchWriter::Begin(const std::shared_ptr
<Schema
>& schema
) {
299 return Begin(schema
, ipc::IpcWriteOptions::Defaults());
// Adapts a MetadataRecordBatchReader to the RecordBatchReader interface.
//
// The adapter holds the schema and the wrapped reader. ReadNext() pulls
// chunks from the delegate and hands back only the record batch; application
// metadata is not exposed through this interface.
303 class MetadataRecordBatchReaderAdapter
: public RecordBatchReader
{
305 explicit MetadataRecordBatchReaderAdapter(
306 std::shared_ptr
<Schema
> schema
, std::shared_ptr
<MetadataRecordBatchReader
> delegate
)
307 : schema_(std::move(schema
)), delegate_(std::move(delegate
)) {}
// Returns the schema supplied at construction.
308 std::shared_ptr
<Schema
> schema() const override
{ return schema_
; }
// Reads the next chunk from the delegate, propagating its errors. Three
// cases are distinguished: no data and no metadata, data present (moved into
// *batch), and metadata without data.
309 Status
ReadNext(std::shared_ptr
<RecordBatch
>* batch
) override
{
310 FlightStreamChunk next
;
312 RETURN_NOT_OK(delegate_
->Next(&next
));
313 if (!next
.data
&& !next
.app_metadata
) {
317 } else if (next
.data
) {
318 *batch
= std::move(next
.data
);
321 // Got metadata, but no data (which is valid) - read the next message
326 std::shared_ptr
<Schema
> schema_
;
327 std::shared_ptr
<MetadataRecordBatchReader
> delegate_
;
// Wraps a MetadataRecordBatchReader in a RecordBatchReader.
//
// The schema is fetched from `reader` up front, so a GetSchema() failure is
// returned before the MetadataRecordBatchReaderAdapter is constructed.
331 arrow::Result
<std::shared_ptr
<RecordBatchReader
>> MakeRecordBatchReader(
332 std::shared_ptr
<MetadataRecordBatchReader
> reader
) {
333 ARROW_ASSIGN_OR_RAISE(auto schema
, reader
->GetSchema());
334 return std::make_shared
<MetadataRecordBatchReaderAdapter
>(std::move(schema
),
338 SimpleFlightListing::SimpleFlightListing(const std::vector
<FlightInfo
>& flights
)
339 : position_(0), flights_(flights
) {}
341 SimpleFlightListing::SimpleFlightListing(std::vector
<FlightInfo
>&& flights
)
342 : position_(0), flights_(std::move(flights
)) {}
// Hands out the next FlightInfo in the listing.
//
// A position at or past the end of flights_ is handled separately. Otherwise
// the entry at position_ is moved into a newly allocated FlightInfo stored in
// `*info`, and position_ is advanced. Because entries are moved out, each
// one can be handed out only once.
344 Status
SimpleFlightListing::Next(std::unique_ptr
<FlightInfo
>* info
) {
// The size is cast to int to match position_ in the comparison.
345 if (position_
>= static_cast<int>(flights_
.size())) {
349 *info
= std::unique_ptr
<FlightInfo
>(new FlightInfo(std::move(flights_
[position_
++])));
353 SimpleResultStream::SimpleResultStream(std::vector
<Result
>&& results
)
354 : results_(std::move(results
)), position_(0) {}
// Hands out the next Result in the stream.
//
// A position at or past the end of results_ is handled separately. Otherwise
// the entry at position_ is moved into a newly allocated Result stored in
// `*result`, and position_ is advanced. Because entries are moved out, each
// one can be handed out only once.
356 Status
SimpleResultStream::Next(std::unique_ptr
<Result
>* result
) {
357 if (position_
>= results_
.size()) {
361 *result
= std::unique_ptr
<Result
>(new Result(std::move(results_
[position_
++])));
365 Status
BasicAuth::Deserialize(const std::string
& serialized
, BasicAuth
* out
) {
366 pb::BasicAuth pb_result
;
367 pb_result
.ParseFromString(serialized
);
368 return internal::FromProto(pb_result
, out
);
// Serializes `basic_auth` to its Protobuf wire format in `*out`.
//
// Conversion errors from internal::ToProto are propagated. The bytes come
// from SerializeAsString(), so there is no separate serialization failure
// check here, unlike the SerializeToString methods in this file.
371 Status
BasicAuth::Serialize(const BasicAuth
& basic_auth
, std::string
* out
) {
372 pb::BasicAuth pb_result
;
373 RETURN_NOT_OK(internal::ToProto(basic_auth
, &pb_result
));
374 *out
= pb_result
.SerializeAsString();
377 } // namespace flight