Skip to content

Commit

Permalink
ARROW-6094: [FlightRPC] Add Flight RPC method getFlightSchema
Browse files Browse the repository at this point in the history
coupled with proposal in https://docs.google.com/document/d/1zLdFYikk3owbKpHvJrARLMlmYpi-Ef6OJy7H90MqViA/edit?usp=sharing

can be merged once that proposal is approved

Closes apache#4980 from rymurr/ARROW-6094 and squashes the following commits:

f1660b9 <Ryan Murray> address code review
4e13616 <Antoine Pitrou> Fix lint
0eb712d <Ryan Murray> changes as per code review:
f61779b <Ryan Murray> address code review
28eac3c <Ryan Murray> changes as per mailing list suggestions
6a4741e <Ryan Murray> ARROW-6094:  add flight rpc method getFlightSchema

Lead-authored-by: Ryan Murray <[email protected]>
Co-authored-by: Ryan Murray <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Micah Kornfield <[email protected]>
  • Loading branch information
3 people authored and emkornfield committed Aug 31, 2019
1 parent 157b179 commit 2f3ea96
Show file tree
Hide file tree
Showing 22 changed files with 404 additions and 1 deletion.
25 changes: 25 additions & 0 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,25 @@ class FlightClient::FlightClientImpl {
return Status::OK();
}

/// \brief Perform the GetSchema RPC for a descriptor.
///
/// The descriptor is converted to its protobuf form, the call is issued
/// through the gRPC stub, and the schema bytes of the response are wrapped
/// in a newly allocated SchemaResult.
/// \param[in] options Per-RPC options
/// \param[in] descriptor the flight whose schema is requested
/// \param[out] schema_result receives the SchemaResult on success
/// \return Status the first failure encountered, otherwise OK
Status GetSchema(const FlightCallOptions& options, const FlightDescriptor& descriptor,
                 std::unique_ptr<SchemaResult>* schema_result) {
  // Request side: descriptor -> protobuf
  pb::FlightDescriptor pb_descriptor;
  RETURN_NOT_OK(internal::ToProto(descriptor, &pb_descriptor));

  ClientRpc rpc(options);
  RETURN_NOT_OK(rpc.SetToken(auth_handler_.get()));

  // Issue the call and translate the gRPC status into an Arrow Status
  pb::SchemaResult pb_response;
  Status rpc_status = internal::FromGrpcStatus(
      stub_->GetSchema(&rpc.context, pb_descriptor, &pb_response));
  RETURN_NOT_OK(rpc_status);

  // Response side: protobuf -> serialized schema string
  std::string serialized_schema;
  RETURN_NOT_OK(internal::FromProto(pb_response, &serialized_schema));
  schema_result->reset(new SchemaResult(serialized_schema));
  return Status::OK();
}

Status DoGet(const FlightCallOptions& options, const Ticket& ticket,
std::unique_ptr<FlightStreamReader>* out) {
pb::Ticket pb_ticket;
Expand Down Expand Up @@ -595,6 +614,12 @@ Status FlightClient::GetFlightInfo(const FlightCallOptions& options,
return impl_->GetFlightInfo(options, descriptor, info);
}

// Public entry point for the GetSchema RPC; all work is delegated to the
// implementation object.
Status FlightClient::GetSchema(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
std::unique_ptr<SchemaResult>* schema_result) {
return impl_->GetSchema(options, descriptor, schema_result);
}

Status FlightClient::ListFlights(std::unique_ptr<FlightListing>* listing) {
return ListFlights({}, {}, listing);
}
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/flight/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ class ARROW_FLIGHT_EXPORT FlightClient {
return GetFlightInfo({}, descriptor, info);
}

/// \brief Request schema for a single flight, which may be an existing
/// dataset or a command to be executed
/// \param[in] options Per-RPC options
/// \param[in] descriptor the dataset request, whether a named dataset or
/// command
/// \param[out] schema_result the SchemaResult describing the dataset schema
/// \return Status
Status GetSchema(const FlightCallOptions& options, const FlightDescriptor& descriptor,
std::unique_ptr<SchemaResult>* schema_result);
/// \brief Request schema for a single flight using default-constructed
/// call options
/// \param[in] descriptor the dataset request, whether a named dataset or
/// command
/// \param[out] schema_result the SchemaResult describing the dataset schema
/// \return Status
Status GetSchema(const FlightDescriptor& descriptor,
std::unique_ptr<SchemaResult>* schema_result) {
return GetSchema({}, descriptor, schema_result);
}

/// \brief List all available flights known to the server
/// \param[out] listing an iterator that returns a FlightInfo for each flight
/// \return Status
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/flight/flight_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,17 @@ TEST_F(TestFlightClient, GetFlightInfo) {
AssertEqual(flights[0], *info);
}

// GetSchema on a known example flight must succeed, yield a non-null
// SchemaResult, and that result must decode into a Schema.
TEST_F(TestFlightClient, GetSchema) {
  auto descriptor = FlightDescriptor::Path({"examples", "ints"});

  // Fetch the serialized schema from the server
  std::unique_ptr<SchemaResult> schema_result;
  ASSERT_OK(client_->GetSchema(descriptor, &schema_result));
  ASSERT_NE(schema_result, nullptr);

  // Decode it back into a Schema object
  ipc::DictionaryMemo memo;
  std::shared_ptr<Schema> decoded_schema;
  ASSERT_OK(schema_result->GetSchema(&memo, &decoded_schema));
}

TEST_F(TestFlightClient, GetFlightInfoNotFound) {
auto descr = FlightDescriptor::Path({"examples", "things"});
std::unique_ptr<FlightInfo> info;
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/flight/internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info) {
return Status::OK();
}

// Copy the schema bytes out of a protobuf SchemaResult into `result`.
// Always returns OK.
Status FromProto(const pb::SchemaResult& pb_result, std::string* result) {
*result = pb_result.schema();
return Status::OK();
}

Status SchemaToString(const Schema& schema, std::string* out) {
// TODO(wesm): Do we care about better memory efficiency here?
std::shared_ptr<Buffer> serialized_schema;
Expand Down Expand Up @@ -344,6 +349,11 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) {
return Status::OK();
}

// Store the serialized schema held by `result` in the protobuf message's
// schema field. Always returns OK.
Status ToProto(const SchemaResult& result, pb::SchemaResult* pb_result) {
pb_result->set_schema(result.serialized_schema());
return Status::OK();
}

} // namespace internal
} // namespace flight
} // namespace arrow
2 changes: 2 additions & 0 deletions cpp/src/arrow/flight/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,14 @@ Status FromProto(const pb::FlightData& pb_data, FlightDescriptor* descriptor,
Status FromProto(const pb::FlightDescriptor& pb_descr, FlightDescriptor* descr);
Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint);
Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info);
// Extracts the schema bytes of a protobuf SchemaResult into a string.
Status FromProto(const pb::SchemaResult& pb_result, std::string* result);

Status ToProto(const FlightDescriptor& descr, pb::FlightDescriptor* pb_descr);
Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info);
Status ToProto(const ActionType& type, pb::ActionType* pb_type);
Status ToProto(const Action& action, pb::Action* pb_action);
Status ToProto(const Result& result, pb::Result* pb_result);
// Writes the serialized schema of a SchemaResult into its protobuf message.
Status ToProto(const SchemaResult& result, pb::SchemaResult* pb_result);
void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket);

} // namespace internal
Expand Down
27 changes: 27 additions & 0 deletions cpp/src/arrow/flight/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,27 @@ class FlightServiceImpl : public FlightService::Service {
return grpc::Status::OK;
}

// gRPC handler for GetSchema: checks authentication, decodes the descriptor,
// asks the server implementation for the schema and encodes it into the
// response message.
grpc::Status GetSchema(ServerContext* context, const pb::FlightDescriptor* request,
pb::SchemaResult* response) {
CHECK_ARG_NOT_NULL(request, "FlightDescriptor cannot be null");
GrpcServerCallContext flight_context;
GRPC_RETURN_NOT_GRPC_OK(CheckAuth(context, flight_context));

FlightDescriptor descr;
GRPC_RETURN_NOT_OK(internal::FromProto(*request, &descr));

std::unique_ptr<SchemaResult> result;
GRPC_RETURN_NOT_OK(server_->GetSchema(flight_context, descr, &result));

if (!result) {
// The implementation succeeded without producing a SchemaResult:
// report the flight as not found
return grpc::Status(grpc::StatusCode::NOT_FOUND, "Flight not found");
}

GRPC_RETURN_NOT_OK(internal::ToProto(*result, response));
return grpc::Status::OK;
}

grpc::Status DoGet(ServerContext* context, const pb::Ticket* request,
ServerWriter<pb::FlightData>* writer) {
CHECK_ARG_NOT_NULL(request, "ticket cannot be null");
Expand Down Expand Up @@ -627,6 +648,12 @@ Status FlightServerBase::ListActions(const ServerCallContext& context,
return Status::NotImplemented("NYI");
}

// Base-class implementation of GetSchema: unconditionally reports
// NotImplemented and leaves `schema` untouched.
Status FlightServerBase::GetSchema(const ServerCallContext& context,
const FlightDescriptor& request,
std::unique_ptr<SchemaResult>* schema) {
return Status::NotImplemented("NYI");
}

// ----------------------------------------------------------------------
// Implement RecordBatchStream

Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/flight/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ class ARROW_FLIGHT_EXPORT FlightServerBase {
const FlightDescriptor& request,
std::unique_ptr<FlightInfo>* info);

/// \brief Retrieve the schema for the indicated descriptor
/// \param[in] context The call context.
/// \param[in] request the descriptor identifying the flight whose schema
/// is requested
/// \param[out] schema the returned SchemaResult holding the serialized
/// schema
/// \return Status
virtual Status GetSchema(const ServerCallContext& context,
const FlightDescriptor& request,
std::unique_ptr<SchemaResult>* schema);

/// \brief Get a stream of IPC payloads to put on the wire
/// \param[in] context The call context.
/// \param[in] request an opaque ticket
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/flight/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,20 @@ class FlightTestServer : public FlightServerBase {
*out = std::move(actions);
return Status::OK();
}

// Serve GetSchema from the example flights: the first flight whose
// descriptor equals the request supplies the serialized schema; if none
// matches, an Invalid status naming the descriptor is returned.
Status GetSchema(const ServerCallContext& context, const FlightDescriptor& request,
                 std::unique_ptr<SchemaResult>* schema) override {
  const std::vector<FlightInfo> known_flights = ExampleFlightInfo();

  for (const FlightInfo& candidate : known_flights) {
    if (!candidate.descriptor().Equals(request)) {
      continue;
    }
    schema->reset(new SchemaResult(candidate.serialized_schema()));
    return Status::OK();
  }
  return Status::Invalid("Flight not found: ", request.ToString());
}
};

std::unique_ptr<FlightServerBase> ExampleTestServer() {
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ std::string FlightDescriptor::ToString() const {
return ss.str();
}

// Decode the stored serialized schema into a Schema object, recording
// dictionary information in the supplied memo.
Status SchemaResult::GetSchema(ipc::DictionaryMemo* dictionary_memo,
                               std::shared_ptr<Schema>* out) const {
  // Expose the raw bytes as a readable stream for the IPC reader
  io::BufferReader serialized_reader(raw_schema_);
  RETURN_NOT_OK(ipc::ReadSchema(&serialized_reader, dictionary_memo, out));
  return Status::OK();
}

Status FlightDescriptor::SerializeToString(std::string* out) const {
pb::FlightDescriptor pb_descriptor;
RETURN_NOT_OK(internal::ToProto(*this, &pb_descriptor));
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,24 @@ struct ARROW_FLIGHT_EXPORT FlightPayload {
ipc::internal::IpcPayload ipc_message;
};

/// \brief Schema result returned after a schema request RPC
///
/// Holds the schema in its serialized form; GetSchema() decodes it on
/// demand.
struct ARROW_FLIGHT_EXPORT SchemaResult {
public:
/// \brief Construct from an already-serialized schema
/// \param[in] schema the serialized schema bytes, taken by value and
/// moved into the object
explicit SchemaResult(std::string schema) : raw_schema_(std::move(schema)) {}

/// \brief Reconstruct the Schema from its serialized form
/// \param[in,out] dictionary_memo for dictionary bookkeeping, will
/// be modified
/// \param[out] out the reconstructed Schema
/// \return Status
Status GetSchema(ipc::DictionaryMemo* dictionary_memo,
std::shared_ptr<Schema>* out) const;

/// \brief The serialized schema exactly as passed to the constructor
const std::string& serialized_schema() const { return raw_schema_; }

private:
// Serialized schema bytes
std::string raw_schema_;
};

/// \brief The access coordinates for retrieval of a dataset, returned by
/// GetFlightInfo
class ARROW_FLIGHT_EXPORT FlightInfo {
Expand Down
19 changes: 19 additions & 0 deletions cpp/src/arrow/python/flight.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ Status PyFlightServer::GetFlightInfo(const arrow::flight::ServerCallContext& con
});
}

/// \brief Dispatch a GetSchema request to the Python-side callback.
/// \param[in] context The call context, forwarded to the callback.
/// \param[in] request The descriptor, forwarded to the callback.
/// \param[out] result Filled in by the callback.
/// \return The error from a pending Python exception if there is one,
/// otherwise the Status returned by the callback itself.
Status PyFlightServer::GetSchema(const arrow::flight::ServerCallContext& context,
                                 const arrow::flight::FlightDescriptor& request,
                                 std::unique_ptr<arrow::flight::SchemaResult>* result) {
  return SafeCallIntoPython([&] {
    // The callback returns a Status; keep it rather than discarding it so
    // that a non-OK result reaches the caller.
    const Status status = vtable_.get_schema(server_.obj(), context, request, result);
    // Still check for a Python error unconditionally, as before; it takes
    // precedence over the callback's own status.
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

Status PyFlightServer::DoGet(const arrow::flight::ServerCallContext& context,
const arrow::flight::Ticket& request,
std::unique_ptr<arrow::flight::FlightDataStream>* stream) {
Expand Down Expand Up @@ -245,6 +254,16 @@ Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
return Status::OK();
}

/// \brief Create a SchemaResult holding the serialized form of a schema.
/// \param[in] schema The schema to serialize; it is dereferenced
/// unconditionally, so it must not be null.
/// \param[out] out Receives the newly allocated SchemaResult on success.
/// \return The serialization error if any, otherwise OK.
Status CreateSchemaResult(const std::shared_ptr<arrow::Schema>& schema,
                          std::unique_ptr<arrow::flight::SchemaResult>* out) {
  std::string serialized_schema;
  RETURN_NOT_OK(arrow::flight::internal::SchemaToString(*schema, &serialized_schema));
  // Build the result directly on the heap; the previous intermediate stack
  // SchemaResult only added an extra copy of the serialized string.
  out->reset(new arrow::flight::SchemaResult(serialized_schema));
  return Status::OK();
}

} // namespace flight
} // namespace py
} // namespace arrow
12 changes: 12 additions & 0 deletions cpp/src/arrow/python/flight.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class ARROW_PYTHON_EXPORT PyFlightServerVtable {
const arrow::flight::FlightDescriptor&,
std::unique_ptr<arrow::flight::FlightInfo>*)>
get_flight_info;
// Callback invoked by PyFlightServer::GetSchema with the server object, the
// call context, the requested descriptor and the output SchemaResult slot.
std::function<Status(PyObject*, const arrow::flight::ServerCallContext&,
const arrow::flight::FlightDescriptor&,
std::unique_ptr<arrow::flight::SchemaResult>*)>
get_schema;
std::function<Status(PyObject*, const arrow::flight::ServerCallContext&,
const arrow::flight::Ticket&,
std::unique_ptr<arrow::flight::FlightDataStream>*)>
Expand Down Expand Up @@ -120,6 +124,9 @@ class ARROW_PYTHON_EXPORT PyFlightServer : public arrow::flight::FlightServerBas
Status GetFlightInfo(const arrow::flight::ServerCallContext& context,
const arrow::flight::FlightDescriptor& request,
std::unique_ptr<arrow::flight::FlightInfo>* info) override;
// Overrides FlightServerBase::GetSchema.
Status GetSchema(const arrow::flight::ServerCallContext& context,
const arrow::flight::FlightDescriptor& request,
std::unique_ptr<arrow::flight::SchemaResult>* result) override;
Status DoGet(const arrow::flight::ServerCallContext& context,
const arrow::flight::Ticket& request,
std::unique_ptr<arrow::flight::FlightDataStream>* stream) override;
Expand Down Expand Up @@ -205,6 +212,11 @@ Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
int64_t total_records, int64_t total_bytes,
std::unique_ptr<arrow::flight::FlightInfo>* out);

/// \brief Create a SchemaResult from schema.
/// \param[in] schema the schema to serialize into the result
/// \param[out] out the newly created SchemaResult
/// \return Status
ARROW_PYTHON_EXPORT
Status CreateSchemaResult(const std::shared_ptr<arrow::Schema>& schema,
std::unique_ptr<arrow::flight::SchemaResult>* out);

} // namespace flight
} // namespace py
} // namespace arrow
Expand Down
16 changes: 16 additions & 0 deletions format/Flight.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ service FlightService {
*/
rpc GetFlightInfo(FlightDescriptor) returns (FlightInfo) {}

/*
* For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema.
* This is used when a consumer needs the Schema of a flight stream. Similar to
* GetFlightInfo, this interface may generate a new flight that was not previously
* available in ListFlights.
*/
rpc GetSchema(FlightDescriptor) returns (SchemaResult) {}

/*
* Retrieve a single stream associated with a particular descriptor
* associated with the referenced ticket. A Flight can be composed of one or
Expand Down Expand Up @@ -169,6 +177,14 @@ message Result {
bytes body = 1;
}

/*
* Wrap the result of a GetSchema call
*/
message SchemaResult {
// Schema of the dataset, as described in Schema.fbs::Schema.
bytes schema = 1;
}

/*
* The name or tag for a Flight. May be used as a way to retrieve or generate
* a flight or be used to expose a set of previously defined flights.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,15 @@ public FlightInfo getInfo(FlightDescriptor descriptor, CallOption... options) {
}
}

/**
* Get schema for a stream.
* @param descriptor The descriptor for the stream.
* @param options RPC-layer hints for this call.
* @return The SchemaResult converted from the server's protocol response.
*/
public SchemaResult getSchema(FlightDescriptor descriptor, CallOption... options) {
return SchemaResult.fromProtocol(CallOptions.wrapStub(blockingStub, options).getSchema(descriptor.toProtocol()));
}

/**
* Retrieve a stream from the server.
* @param ticket The ticket granting access to the data stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ void listFlights(CallContext context, Criteria criteria,
*/
FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor);

/**
 * Get schema for a particular data stream.
 *
 * <p>The default implementation obtains the FlightInfo for the descriptor via
 * {@code getFlightInfo} and wraps that info's schema in a SchemaResult.
 *
 * @param context Per-call context.
 * @param descriptor The descriptor identifying the data stream.
 * @return Schema for the stream.
 */
default SchemaResult getSchema(CallContext context, FlightDescriptor descriptor) {
  return new SchemaResult(getFlightInfo(context, descriptor).getSchema());
}


/**
* Accept uploaded data for a particular stream.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,19 @@ public void getFlightInfo(Flight.FlightDescriptor request, StreamObserver<Flight
}
}

/**
* Handle a getSchema call: ask the producer for the schema of the requested
* descriptor, send it to the observer as the single response and complete
* the call. Any exception is converted with StatusUtils and reported through
* {@code onError}.
*
* @param request The protocol-level descriptor sent by the client.
* @param responseObserver The observer receiving the result or the error.
*/
@Override
public void getSchema(Flight.FlightDescriptor request, StreamObserver<Flight.SchemaResult> responseObserver) {
try {
SchemaResult result = producer
.getSchema(makeContext((ServerCallStreamObserver<?>) responseObserver),
new FlightDescriptor(request));
responseObserver.onNext(result.toProtocol());
responseObserver.onCompleted();
} catch (Exception ex) {
responseObserver.onError(StatusUtils.toGrpcException(ex));
}
}

/**
* Call context for the service.
*/
Expand Down
Loading

0 comments on commit 2f3ea96

Please sign in to comment.