From 8431f47863f3d2f630504e386f1d67d1903c08e8 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 23 Jan 2024 12:51:31 -0800 Subject: [PATCH 1/5] Streaming log for agent Signed-off-by: Kevin Su --- .../go/admin/mocks/AsyncAgentServiceClient.go | 12 +- .../go/admin/mocks/AsyncAgentServiceServer.go | 27 ++-- .../pb-cpp/flyteidl/service/agent.grpc.pb.cc | 38 ++---- .../pb-cpp/flyteidl/service/agent.grpc.pb.h | 116 +++++++--------- .../gen/pb-cpp/flyteidl/service/agent.pb.cc | 24 ++-- .../gen/pb-go/flyteidl/service/agent.pb.go | 129 +++++++++++------- .../pb-go/flyteidl/service/agent.swagger.json | 54 ++++++++ .../gen/pb-java/flyteidl/service/Agent.java | 22 +-- .../pb_python/flyteidl/service/agent_pb2.py | 8 +- .../flyteidl/service/agent_pb2_grpc.py | 6 +- flyteidl/go.mod | 2 - flyteidl/protos/flyteidl/service/agent.proto | 2 +- 12 files changed, 246 insertions(+), 194 deletions(-) diff --git a/flyteidl/clients/go/admin/mocks/AsyncAgentServiceClient.go b/flyteidl/clients/go/admin/mocks/AsyncAgentServiceClient.go index f11ef1adfe..0103e3f293 100644 --- a/flyteidl/clients/go/admin/mocks/AsyncAgentServiceClient.go +++ b/flyteidl/clients/go/admin/mocks/AsyncAgentServiceClient.go @@ -10,6 +10,8 @@ import ( grpc "google.golang.org/grpc" mock "github.com/stretchr/testify/mock" + + service "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" ) // AsyncAgentServiceClient is an autogenerated mock type for the AsyncAgentServiceClient type @@ -165,7 +167,7 @@ type AsyncAgentServiceClient_GetTaskLogs struct { *mock.Call } -func (_m AsyncAgentServiceClient_GetTaskLogs) Return(_a0 *admin.GetTaskLogsResponse, _a1 error) *AsyncAgentServiceClient_GetTaskLogs { +func (_m AsyncAgentServiceClient_GetTaskLogs) Return(_a0 service.AsyncAgentService_GetTaskLogsClient, _a1 error) *AsyncAgentServiceClient_GetTaskLogs { return &AsyncAgentServiceClient_GetTaskLogs{Call: _m.Call.Return(_a0, _a1)} } @@ -180,7 +182,7 @@ func (_m *AsyncAgentServiceClient) OnGetTaskLogsMatch(matchers ...interface{}) * } // GetTaskLogs provides a mock function with given fields: ctx, in, opts -func (_m *AsyncAgentServiceClient) GetTaskLogs(ctx context.Context, in *admin.GetTaskLogsRequest, opts ...grpc.CallOption) (*admin.GetTaskLogsResponse, error) { +func (_m *AsyncAgentServiceClient) GetTaskLogs(ctx context.Context, in *admin.GetTaskLogsRequest, opts ...grpc.CallOption) (service.AsyncAgentService_GetTaskLogsClient, error) { _va := make([]interface{}, len(opts)) for _i := range opts { _va[_i] = opts[_i] @@ -190,12 +192,12 @@ func (_m *AsyncAgentServiceClient) GetTaskLogs(ctx context.Context, in *admin.Ge _ca = append(_ca, _va...) ret := _m.Called(_ca...) - var r0 *admin.GetTaskLogsResponse - if rf, ok := ret.Get(0).(func(context.Context, *admin.GetTaskLogsRequest, ...grpc.CallOption) *admin.GetTaskLogsResponse); ok { + var r0 service.AsyncAgentService_GetTaskLogsClient + if rf, ok := ret.Get(0).(func(context.Context, *admin.GetTaskLogsRequest, ...grpc.CallOption) service.AsyncAgentService_GetTaskLogsClient); ok { r0 = rf(ctx, in, opts...) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*admin.GetTaskLogsResponse) + r0 = ret.Get(0).(service.AsyncAgentService_GetTaskLogsClient) } } diff --git a/flyteidl/clients/go/admin/mocks/AsyncAgentServiceServer.go b/flyteidl/clients/go/admin/mocks/AsyncAgentServiceServer.go index 1803e286eb..76b618f791 100644 --- a/flyteidl/clients/go/admin/mocks/AsyncAgentServiceServer.go +++ b/flyteidl/clients/go/admin/mocks/AsyncAgentServiceServer.go @@ -8,6 +8,8 @@ import ( admin "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" mock "github.com/stretchr/testify/mock" + + service "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" ) // AsyncAgentServiceServer is an autogenerated mock type for the AsyncAgentServiceServer type @@ -142,11 +144,11 @@ type AsyncAgentServiceServer_GetTaskLogs struct { *mock.Call } -func (_m AsyncAgentServiceServer_GetTaskLogs) Return(_a0 *admin.GetTaskLogsResponse, _a1 error) *AsyncAgentServiceServer_GetTaskLogs { - return &AsyncAgentServiceServer_GetTaskLogs{Call: _m.Call.Return(_a0, _a1)} +func (_m AsyncAgentServiceServer_GetTaskLogs) Return(_a0 error) *AsyncAgentServiceServer_GetTaskLogs { + return &AsyncAgentServiceServer_GetTaskLogs{Call: _m.Call.Return(_a0)} } -func (_m *AsyncAgentServiceServer) OnGetTaskLogs(_a0 context.Context, _a1 *admin.GetTaskLogsRequest) *AsyncAgentServiceServer_GetTaskLogs { +func (_m *AsyncAgentServiceServer) OnGetTaskLogs(_a0 *admin.GetTaskLogsRequest, _a1 service.AsyncAgentService_GetTaskLogsServer) *AsyncAgentServiceServer_GetTaskLogs { c_call := _m.On("GetTaskLogs", _a0, _a1) return &AsyncAgentServiceServer_GetTaskLogs{Call: c_call} } @@ -157,26 +159,17 @@ func (_m *AsyncAgentServiceServer) OnGetTaskLogsMatch(matchers ...interface{}) * } // GetTaskLogs provides a mock function with given fields: _a0, _a1 -func (_m *AsyncAgentServiceServer) GetTaskLogs(_a0 context.Context, _a1 *admin.GetTaskLogsRequest) (*admin.GetTaskLogsResponse, error) { +func (_m *AsyncAgentServiceServer) GetTaskLogs(_a0 *admin.GetTaskLogsRequest, _a1 service.AsyncAgentService_GetTaskLogsServer) error { ret := _m.Called(_a0, _a1) - var r0 *admin.GetTaskLogsResponse - if rf, ok := ret.Get(0).(func(context.Context, *admin.GetTaskLogsRequest) *admin.GetTaskLogsResponse); ok { + var r0 error + if rf, ok := ret.Get(0).(func(*admin.GetTaskLogsRequest, service.AsyncAgentService_GetTaskLogsServer) error); ok { r0 = rf(_a0, _a1) } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*admin.GetTaskLogsResponse) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, *admin.GetTaskLogsRequest) error); ok { - r1 = rf(_a0, _a1) - } else { - r1 = ret.Error(1) + r0 = ret.Error(0) } - return r0, r1 + return r0 } type AsyncAgentServiceServer_GetTaskMetrics struct { diff --git a/flyteidl/gen/pb-cpp/flyteidl/service/agent.grpc.pb.cc b/flyteidl/gen/pb-cpp/flyteidl/service/agent.grpc.pb.cc index a167711437..a81f0834be 100644 --- a/flyteidl/gen/pb-cpp/flyteidl/service/agent.grpc.pb.cc +++ b/flyteidl/gen/pb-cpp/flyteidl/service/agent.grpc.pb.cc @@ -38,7 +38,7 @@ AsyncAgentService::Stub::Stub(const std::shared_ptr< ::grpc::ChannelInterface>& , rpcmethod_GetTask_(AsyncAgentService_method_names[1], ::grpc::internal::RpcMethod::NORMAL_RPC, channel) , rpcmethod_DeleteTask_(AsyncAgentService_method_names[2], ::grpc::internal::RpcMethod::NORMAL_RPC, channel) , rpcmethod_GetTaskMetrics_(AsyncAgentService_method_names[3], ::grpc::internal::RpcMethod::NORMAL_RPC, channel) - , rpcmethod_GetTaskLogs_(AsyncAgentService_method_names[4], ::grpc::internal::RpcMethod::NORMAL_RPC, channel) + , rpcmethod_GetTaskLogs_(AsyncAgentService_method_names[4], ::grpc::internal::RpcMethod::SERVER_STREAMING, channel) {} ::grpc::Status AsyncAgentService::Stub::CreateTask(::grpc::ClientContext* context, const ::flyteidl::admin::CreateTaskRequest& request, ::flyteidl::admin::CreateTaskResponse* response) { @@ -153,32 +153,20 @@ ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskMetricsResponse>* A return ::grpc::internal::ClientAsyncResponseReaderFactory< ::flyteidl::admin::GetTaskMetricsResponse>::Create(channel_.get(), cq, rpcmethod_GetTaskMetrics_, context, request, false); } -::grpc::Status AsyncAgentService::Stub::GetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::flyteidl::admin::GetTaskLogsResponse* response) { - return ::grpc::internal::BlockingUnaryCall(channel_.get(), rpcmethod_GetTaskLogs_, context, request, response); +::grpc::ClientReader< ::flyteidl::admin::GetTaskLogsResponse>* AsyncAgentService::Stub::GetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request) { + return ::grpc::internal::ClientReaderFactory< ::flyteidl::admin::GetTaskLogsResponse>::Create(channel_.get(), rpcmethod_GetTaskLogs_, context, request); } -void AsyncAgentService::Stub::experimental_async::GetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response, std::function f) { - ::grpc::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_GetTaskLogs_, context, request, response, std::move(f)); +void AsyncAgentService::Stub::experimental_async::GetTaskLogs(::grpc::ClientContext* context, ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::experimental::ClientReadReactor< ::flyteidl::admin::GetTaskLogsResponse>* reactor) { + ::grpc::internal::ClientCallbackReaderFactory< ::flyteidl::admin::GetTaskLogsResponse>::Create(stub_->channel_.get(), stub_->rpcmethod_GetTaskLogs_, context, request, reactor); } -void AsyncAgentService::Stub::experimental_async::GetTaskLogs(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::flyteidl::admin::GetTaskLogsResponse* response, std::function f) { - ::grpc::internal::CallbackUnaryCall(stub_->channel_.get(), stub_->rpcmethod_GetTaskLogs_, context, request, response, std::move(f)); +::grpc::ClientAsyncReader< ::flyteidl::admin::GetTaskLogsResponse>* AsyncAgentService::Stub::AsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq, void* tag) { + return ::grpc::internal::ClientAsyncReaderFactory< ::flyteidl::admin::GetTaskLogsResponse>::Create(channel_.get(), cq, rpcmethod_GetTaskLogs_, context, request, true, tag); } -void AsyncAgentService::Stub::experimental_async::GetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) { - ::grpc::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_GetTaskLogs_, context, request, response, reactor); -} - -void AsyncAgentService::Stub::experimental_async::GetTaskLogs(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::flyteidl::admin::GetTaskLogsResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) { - ::grpc::internal::ClientCallbackUnaryFactory::Create(stub_->channel_.get(), stub_->rpcmethod_GetTaskLogs_, context, request, response, reactor); -} - -::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskLogsResponse>* AsyncAgentService::Stub::AsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) { - return ::grpc::internal::ClientAsyncResponseReaderFactory< ::flyteidl::admin::GetTaskLogsResponse>::Create(channel_.get(), cq, rpcmethod_GetTaskLogs_, context, request, true); -} - -::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskLogsResponse>* AsyncAgentService::Stub::PrepareAsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) { - return ::grpc::internal::ClientAsyncResponseReaderFactory< ::flyteidl::admin::GetTaskLogsResponse>::Create(channel_.get(), cq, rpcmethod_GetTaskLogs_, context, request, false); +::grpc::ClientAsyncReader< ::flyteidl::admin::GetTaskLogsResponse>* AsyncAgentService::Stub::PrepareAsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) { + return ::grpc::internal::ClientAsyncReaderFactory< ::flyteidl::admin::GetTaskLogsResponse>::Create(channel_.get(), cq, rpcmethod_GetTaskLogs_, context, request, false, nullptr); } AsyncAgentService::Service::Service() { @@ -204,8 +192,8 @@ AsyncAgentService::Service::Service() { std::mem_fn(&AsyncAgentService::Service::GetTaskMetrics), this))); AddMethod(new ::grpc::internal::RpcServiceMethod( AsyncAgentService_method_names[4], - ::grpc::internal::RpcMethod::NORMAL_RPC, - new ::grpc::internal::RpcMethodHandler< AsyncAgentService::Service, ::flyteidl::admin::GetTaskLogsRequest, ::flyteidl::admin::GetTaskLogsResponse>( + ::grpc::internal::RpcMethod::SERVER_STREAMING, + new ::grpc::internal::ServerStreamingHandler< AsyncAgentService::Service, ::flyteidl::admin::GetTaskLogsRequest, ::flyteidl::admin::GetTaskLogsResponse>( std::mem_fn(&AsyncAgentService::Service::GetTaskLogs), this))); } @@ -240,10 +228,10 @@ ::grpc::Status AsyncAgentService::Service::GetTaskMetrics(::grpc::ServerContext* return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } -::grpc::Status AsyncAgentService::Service::GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response) { +::grpc::Status AsyncAgentService::Service::GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::ServerWriter< ::flyteidl::admin::GetTaskLogsResponse>* writer) { (void) context; (void) request; - (void) response; + (void) writer; return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } diff --git a/flyteidl/gen/pb-cpp/flyteidl/service/agent.grpc.pb.h b/flyteidl/gen/pb-cpp/flyteidl/service/agent.grpc.pb.h index e1ab9cd6e5..789618195f 100644 --- a/flyteidl/gen/pb-cpp/flyteidl/service/agent.grpc.pb.h +++ b/flyteidl/gen/pb-cpp/flyteidl/service/agent.grpc.pb.h @@ -86,12 +86,14 @@ class AsyncAgentService final { return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::flyteidl::admin::GetTaskMetricsResponse>>(PrepareAsyncGetTaskMetricsRaw(context, request, cq)); } // GetTaskLogs returns task execution logs, if available. - virtual ::grpc::Status GetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::flyteidl::admin::GetTaskLogsResponse* response) = 0; - std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>> AsyncGetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) { - return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>>(AsyncGetTaskLogsRaw(context, request, cq)); + std::unique_ptr< ::grpc::ClientReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>> GetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request) { + return std::unique_ptr< ::grpc::ClientReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>>(GetTaskLogsRaw(context, request)); } - std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>> PrepareAsyncGetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) { - return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>>(PrepareAsyncGetTaskLogsRaw(context, request, cq)); + std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>> AsyncGetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq, void* tag) { + return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>>(AsyncGetTaskLogsRaw(context, request, cq, tag)); + } + std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>> PrepareAsyncGetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>>(PrepareAsyncGetTaskLogsRaw(context, request, cq)); } class experimental_async_interface { public: @@ -121,10 +123,7 @@ class AsyncAgentService final { virtual void GetTaskMetrics(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskMetricsRequest* request, ::flyteidl::admin::GetTaskMetricsResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0; virtual void GetTaskMetrics(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::flyteidl::admin::GetTaskMetricsResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0; // GetTaskLogs returns task execution logs, if available. - virtual void GetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response, std::function) = 0; - virtual void GetTaskLogs(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::flyteidl::admin::GetTaskLogsResponse* response, std::function) = 0; - virtual void GetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0; - virtual void GetTaskLogs(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::flyteidl::admin::GetTaskLogsResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) = 0; + virtual void GetTaskLogs(::grpc::ClientContext* context, ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::experimental::ClientReadReactor< ::flyteidl::admin::GetTaskLogsResponse>* reactor) = 0; }; virtual class experimental_async_interface* experimental_async() { return nullptr; } private: @@ -136,8 +135,9 @@ class AsyncAgentService final { virtual ::grpc::ClientAsyncResponseReaderInterface< ::flyteidl::admin::DeleteTaskResponse>* PrepareAsyncDeleteTaskRaw(::grpc::ClientContext* context, const ::flyteidl::admin::DeleteTaskRequest& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientAsyncResponseReaderInterface< ::flyteidl::admin::GetTaskMetricsResponse>* AsyncGetTaskMetricsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskMetricsRequest& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientAsyncResponseReaderInterface< ::flyteidl::admin::GetTaskMetricsResponse>* PrepareAsyncGetTaskMetricsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskMetricsRequest& request, ::grpc::CompletionQueue* cq) = 0; - virtual ::grpc::ClientAsyncResponseReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>* AsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) = 0; - virtual ::grpc::ClientAsyncResponseReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>* PrepareAsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) = 0; + virtual ::grpc::ClientReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>* GetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request) = 0; + virtual ::grpc::ClientAsyncReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>* AsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq, void* tag) = 0; + virtual ::grpc::ClientAsyncReaderInterface< ::flyteidl::admin::GetTaskLogsResponse>* PrepareAsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) = 0; }; class Stub final : public StubInterface { public: @@ -170,12 +170,14 @@ class AsyncAgentService final { std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskMetricsResponse>> PrepareAsyncGetTaskMetrics(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskMetricsRequest& request, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskMetricsResponse>>(PrepareAsyncGetTaskMetricsRaw(context, request, cq)); } - ::grpc::Status GetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::flyteidl::admin::GetTaskLogsResponse* response) override; - std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskLogsResponse>> AsyncGetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) { - return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskLogsResponse>>(AsyncGetTaskLogsRaw(context, request, cq)); + std::unique_ptr< ::grpc::ClientReader< ::flyteidl::admin::GetTaskLogsResponse>> GetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request) { + return std::unique_ptr< ::grpc::ClientReader< ::flyteidl::admin::GetTaskLogsResponse>>(GetTaskLogsRaw(context, request)); + } + std::unique_ptr< ::grpc::ClientAsyncReader< ::flyteidl::admin::GetTaskLogsResponse>> AsyncGetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq, void* tag) { + return std::unique_ptr< ::grpc::ClientAsyncReader< ::flyteidl::admin::GetTaskLogsResponse>>(AsyncGetTaskLogsRaw(context, request, cq, tag)); } - std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskLogsResponse>> PrepareAsyncGetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) { - return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskLogsResponse>>(PrepareAsyncGetTaskLogsRaw(context, request, cq)); + std::unique_ptr< ::grpc::ClientAsyncReader< ::flyteidl::admin::GetTaskLogsResponse>> PrepareAsyncGetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) { + return std::unique_ptr< ::grpc::ClientAsyncReader< ::flyteidl::admin::GetTaskLogsResponse>>(PrepareAsyncGetTaskLogsRaw(context, request, cq)); } class experimental_async final : public StubInterface::experimental_async_interface { @@ -196,10 +198,7 @@ class AsyncAgentService final { void GetTaskMetrics(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::flyteidl::admin::GetTaskMetricsResponse* response, std::function) override; void GetTaskMetrics(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskMetricsRequest* request, ::flyteidl::admin::GetTaskMetricsResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) override; void GetTaskMetrics(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::flyteidl::admin::GetTaskMetricsResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) override; - void GetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response, std::function) override; - void GetTaskLogs(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::flyteidl::admin::GetTaskLogsResponse* response, std::function) override; - void GetTaskLogs(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) override; - void GetTaskLogs(::grpc::ClientContext* context, const ::grpc::ByteBuffer* request, ::flyteidl::admin::GetTaskLogsResponse* response, ::grpc::experimental::ClientUnaryReactor* reactor) override; + void GetTaskLogs(::grpc::ClientContext* context, ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::experimental::ClientReadReactor< ::flyteidl::admin::GetTaskLogsResponse>* reactor) override; private: friend class Stub; explicit experimental_async(Stub* stub): stub_(stub) { } @@ -219,8 +218,9 @@ class AsyncAgentService final { ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::DeleteTaskResponse>* PrepareAsyncDeleteTaskRaw(::grpc::ClientContext* context, const ::flyteidl::admin::DeleteTaskRequest& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskMetricsResponse>* AsyncGetTaskMetricsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskMetricsRequest& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskMetricsResponse>* PrepareAsyncGetTaskMetricsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskMetricsRequest& request, ::grpc::CompletionQueue* cq) override; - ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskLogsResponse>* AsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) override; - ::grpc::ClientAsyncResponseReader< ::flyteidl::admin::GetTaskLogsResponse>* PrepareAsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) override; + ::grpc::ClientReader< ::flyteidl::admin::GetTaskLogsResponse>* GetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request) override; + ::grpc::ClientAsyncReader< ::flyteidl::admin::GetTaskLogsResponse>* AsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq, void* tag) override; + ::grpc::ClientAsyncReader< ::flyteidl::admin::GetTaskLogsResponse>* PrepareAsyncGetTaskLogsRaw(::grpc::ClientContext* context, const ::flyteidl::admin::GetTaskLogsRequest& request, ::grpc::CompletionQueue* cq) override; const ::grpc::internal::RpcMethod rpcmethod_CreateTask_; const ::grpc::internal::RpcMethod rpcmethod_GetTask_; const ::grpc::internal::RpcMethod rpcmethod_DeleteTask_; @@ -246,7 +246,7 @@ class AsyncAgentService final { // * various other errors virtual ::grpc::Status GetTaskMetrics(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskMetricsRequest* request, ::flyteidl::admin::GetTaskMetricsResponse* response); // GetTaskLogs returns task execution logs, if available. - virtual ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response); + virtual ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::ServerWriter< ::flyteidl::admin::GetTaskLogsResponse>* writer); }; template class WithAsyncMethod_CreateTask : public BaseClass { @@ -340,12 +340,12 @@ class AsyncAgentService final { BaseClassMustBeDerivedFromService(this); } // disable synchronous version of this method - ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response) override { + ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::ServerWriter< ::flyteidl::admin::GetTaskLogsResponse>* writer) override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } - void RequestGetTaskLogs(::grpc::ServerContext* context, ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::ServerAsyncResponseWriter< ::flyteidl::admin::GetTaskLogsResponse>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) { - ::grpc::Service::RequestAsyncUnary(4, context, request, response, new_call_cq, notification_cq, tag); + void RequestGetTaskLogs(::grpc::ServerContext* context, ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::ServerAsyncWriter< ::flyteidl::admin::GetTaskLogsResponse>* writer, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) { + ::grpc::Service::RequestAsyncServerStreaming(4, context, request, writer, new_call_cq, notification_cq, tag); } }; typedef WithAsyncMethod_CreateTask > > > > AsyncService; @@ -480,29 +480,20 @@ class AsyncAgentService final { public: ExperimentalWithCallbackMethod_GetTaskLogs() { ::grpc::Service::experimental().MarkMethodCallback(4, - new ::grpc::internal::CallbackUnaryHandler< ::flyteidl::admin::GetTaskLogsRequest, ::flyteidl::admin::GetTaskLogsResponse>( - [this](::grpc::ServerContext* context, - const ::flyteidl::admin::GetTaskLogsRequest* request, - ::flyteidl::admin::GetTaskLogsResponse* response, - ::grpc::experimental::ServerCallbackRpcController* controller) { - return this->GetTaskLogs(context, request, response, controller); - })); - } - void SetMessageAllocatorFor_GetTaskLogs( - ::grpc::experimental::MessageAllocator< ::flyteidl::admin::GetTaskLogsRequest, ::flyteidl::admin::GetTaskLogsResponse>* allocator) { - static_cast<::grpc::internal::CallbackUnaryHandler< ::flyteidl::admin::GetTaskLogsRequest, ::flyteidl::admin::GetTaskLogsResponse>*>( - ::grpc::Service::experimental().GetHandler(4)) - ->SetMessageAllocator(allocator); + new ::grpc::internal::CallbackServerStreamingHandler< ::flyteidl::admin::GetTaskLogsRequest, ::flyteidl::admin::GetTaskLogsResponse>( + [this] { return this->GetTaskLogs(); })); } ~ExperimentalWithCallbackMethod_GetTaskLogs() override { BaseClassMustBeDerivedFromService(this); } // disable synchronous version of this method - ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response) override { + ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::ServerWriter< ::flyteidl::admin::GetTaskLogsResponse>* writer) override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } - virtual void GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); } + virtual ::grpc::experimental::ServerWriteReactor< ::flyteidl::admin::GetTaskLogsRequest, ::flyteidl::admin::GetTaskLogsResponse>* GetTaskLogs() { + return new ::grpc::internal::UnimplementedWriteReactor< + ::flyteidl::admin::GetTaskLogsRequest, ::flyteidl::admin::GetTaskLogsResponse>;} }; typedef ExperimentalWithCallbackMethod_CreateTask > > > > ExperimentalCallbackService; template @@ -585,7 +576,7 @@ class AsyncAgentService final { BaseClassMustBeDerivedFromService(this); } // disable synchronous version of this method - ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response) override { + ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::ServerWriter< ::flyteidl::admin::GetTaskLogsResponse>* writer) override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } @@ -682,12 +673,12 @@ class AsyncAgentService final { BaseClassMustBeDerivedFromService(this); } // disable synchronous version of this method - ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response) override { + ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::ServerWriter< ::flyteidl::admin::GetTaskLogsResponse>* writer) override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } - void RequestGetTaskLogs(::grpc::ServerContext* context, ::grpc::ByteBuffer* request, ::grpc::ServerAsyncResponseWriter< ::grpc::ByteBuffer>* response, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) { - ::grpc::Service::RequestAsyncUnary(4, context, request, response, new_call_cq, notification_cq, tag); + void RequestGetTaskLogs(::grpc::ServerContext* context, ::grpc::ByteBuffer* request, ::grpc::ServerAsyncWriter< ::grpc::ByteBuffer>* writer, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) { + ::grpc::Service::RequestAsyncServerStreaming(4, context, request, writer, new_call_cq, notification_cq, tag); } }; template @@ -797,23 +788,20 @@ class AsyncAgentService final { public: ExperimentalWithRawCallbackMethod_GetTaskLogs() { ::grpc::Service::experimental().MarkMethodRawCallback(4, - new ::grpc::internal::CallbackUnaryHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>( - [this](::grpc::ServerContext* context, - const ::grpc::ByteBuffer* request, - ::grpc::ByteBuffer* response, - ::grpc::experimental::ServerCallbackRpcController* controller) { - this->GetTaskLogs(context, request, response, controller); - })); + new ::grpc::internal::CallbackServerStreamingHandler< ::grpc::ByteBuffer, ::grpc::ByteBuffer>( + [this] { return this->GetTaskLogs(); })); } ~ExperimentalWithRawCallbackMethod_GetTaskLogs() override { BaseClassMustBeDerivedFromService(this); } // disable synchronous version of this method - ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response) override { + ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::ServerWriter< ::flyteidl::admin::GetTaskLogsResponse>* writer) override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } - virtual void GetTaskLogs(::grpc::ServerContext* context, const ::grpc::ByteBuffer* request, ::grpc::ByteBuffer* response, ::grpc::experimental::ServerCallbackRpcController* controller) { controller->Finish(::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); } + virtual ::grpc::experimental::ServerWriteReactor< ::grpc::ByteBuffer, ::grpc::ByteBuffer>* GetTaskLogs() { + return new ::grpc::internal::UnimplementedWriteReactor< + ::grpc::ByteBuffer, ::grpc::ByteBuffer>;} }; template class WithStreamedUnaryMethod_CreateTask : public BaseClass { @@ -895,29 +883,29 @@ class AsyncAgentService final { // replace default version of method with streamed unary virtual ::grpc::Status StreamedGetTaskMetrics(::grpc::ServerContext* context, ::grpc::ServerUnaryStreamer< ::flyteidl::admin::GetTaskMetricsRequest,::flyteidl::admin::GetTaskMetricsResponse>* server_unary_streamer) = 0; }; + typedef WithStreamedUnaryMethod_CreateTask > > > StreamedUnaryService; template - class WithStreamedUnaryMethod_GetTaskLogs : public BaseClass { + class WithSplitStreamingMethod_GetTaskLogs : public BaseClass { private: void BaseClassMustBeDerivedFromService(const Service *service) {} public: - WithStreamedUnaryMethod_GetTaskLogs() { + WithSplitStreamingMethod_GetTaskLogs() { ::grpc::Service::MarkMethodStreamed(4, - new ::grpc::internal::StreamedUnaryHandler< ::flyteidl::admin::GetTaskLogsRequest, ::flyteidl::admin::GetTaskLogsResponse>(std::bind(&WithStreamedUnaryMethod_GetTaskLogs::StreamedGetTaskLogs, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::SplitServerStreamingHandler< ::flyteidl::admin::GetTaskLogsRequest, ::flyteidl::admin::GetTaskLogsResponse>(std::bind(&WithSplitStreamingMethod_GetTaskLogs::StreamedGetTaskLogs, this, std::placeholders::_1, std::placeholders::_2))); } - ~WithStreamedUnaryMethod_GetTaskLogs() override { + ~WithSplitStreamingMethod_GetTaskLogs() override { BaseClassMustBeDerivedFromService(this); } // disable regular version of this method - ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::flyteidl::admin::GetTaskLogsResponse* response) override { + ::grpc::Status GetTaskLogs(::grpc::ServerContext* context, const ::flyteidl::admin::GetTaskLogsRequest* request, ::grpc::ServerWriter< ::flyteidl::admin::GetTaskLogsResponse>* writer) override { abort(); return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""); } - // replace default version of method with streamed unary - virtual ::grpc::Status StreamedGetTaskLogs(::grpc::ServerContext* context, ::grpc::ServerUnaryStreamer< ::flyteidl::admin::GetTaskLogsRequest,::flyteidl::admin::GetTaskLogsResponse>* server_unary_streamer) = 0; + // replace default version of method with split streamed + virtual ::grpc::Status StreamedGetTaskLogs(::grpc::ServerContext* context, ::grpc::ServerSplitStreamer< ::flyteidl::admin::GetTaskLogsRequest,::flyteidl::admin::GetTaskLogsResponse>* server_split_streamer) = 0; }; - typedef WithStreamedUnaryMethod_CreateTask > > > > StreamedUnaryService; - typedef Service SplitStreamedService; - typedef WithStreamedUnaryMethod_CreateTask > > > > StreamedService; + typedef WithSplitStreamingMethod_GetTaskLogs SplitStreamedService; + typedef WithStreamedUnaryMethod_CreateTask > > > > StreamedService; }; // AgentMetadataService defines an RPC service that is also served over HTTP via grpc-gateway. diff --git a/flyteidl/gen/pb-cpp/flyteidl/service/agent.pb.cc b/flyteidl/gen/pb-cpp/flyteidl/service/agent.pb.cc index 3d6379889b..affa0bc795 100644 --- a/flyteidl/gen/pb-cpp/flyteidl/service/agent.pb.cc +++ b/flyteidl/gen/pb-cpp/flyteidl/service/agent.pb.cc @@ -39,7 +39,7 @@ ::google::protobuf::internal::AssignDescriptorsTable assign_descriptors_table_fl const char descriptor_table_protodef_flyteidl_2fservice_2fagent_2eproto[] = "\n\034flyteidl/service/agent.proto\022\020flyteidl" ".service\032\034google/api/annotations.proto\032\032" - "flyteidl/admin/agent.proto2\314\003\n\021AsyncAgen" + "flyteidl/admin/agent.proto2\316\003\n\021AsyncAgen" "tService\022U\n\nCreateTask\022!.flyteidl.admin." "CreateTaskRequest\032\".flyteidl.admin.Creat" "eTaskResponse\"\000\022L\n\007GetTask\022\036.flyteidl.ad" @@ -49,22 +49,22 @@ const char descriptor_table_protodef_flyteidl_2fservice_2fagent_2eproto[] = "DeleteTaskResponse\"\000\022a\n\016GetTaskMetrics\022%" ".flyteidl.admin.GetTaskMetricsRequest\032&." "flyteidl.admin.GetTaskMetricsResponse\"\000\022" - "X\n\013GetTaskLogs\022\".flyteidl.admin.GetTaskL" + "Z\n\013GetTaskLogs\022\".flyteidl.admin.GetTaskL" "ogsRequest\032#.flyteidl.admin.GetTaskLogsR" - "esponse\"\0002\360\001\n\024AgentMetadataService\022k\n\010Ge" - "tAgent\022\037.flyteidl.admin.GetAgentRequest\032" - " .flyteidl.admin.GetAgentResponse\"\034\202\323\344\223\002" - "\026\022\024/api/v1/agent/{name}\022k\n\nListAgents\022!." - "flyteidl.admin.ListAgentsRequest\032\".flyte" - "idl.admin.ListAgentsResponse\"\026\202\323\344\223\002\020\022\016/a" - "pi/v1/agentsB\?Z=github.com/flyteorg/flyt" - "e/flyteidl/gen/pb-go/flyteidl/serviceb\006p" - "roto3" + "esponse\"\0000\0012\360\001\n\024AgentMetadataService\022k\n\010" + "GetAgent\022\037.flyteidl.admin.GetAgentReques" + "t\032 .flyteidl.admin.GetAgentResponse\"\034\202\323\344" + "\223\002\026\022\024/api/v1/agent/{name}\022k\n\nListAgents\022" + "!.flyteidl.admin.ListAgentsRequest\032\".fly" + "teidl.admin.ListAgentsResponse\"\026\202\323\344\223\002\020\022\016" + "/api/v1/agentsB\?Z=github.com/flyteorg/fl" + "yte/flyteidl/gen/pb-go/flyteidl/serviceb" + "\006proto3" ; ::google::protobuf::internal::DescriptorTable descriptor_table_flyteidl_2fservice_2fagent_2eproto = { false, InitDefaults_flyteidl_2fservice_2fagent_2eproto, descriptor_table_protodef_flyteidl_2fservice_2fagent_2eproto, - "flyteidl/service/agent.proto", &assign_descriptors_table_flyteidl_2fservice_2fagent_2eproto, 885, + "flyteidl/service/agent.proto", &assign_descriptors_table_flyteidl_2fservice_2fagent_2eproto, 887, }; void AddDescriptors_flyteidl_2fservice_2fagent_2eproto() { diff --git a/flyteidl/gen/pb-go/flyteidl/service/agent.pb.go b/flyteidl/gen/pb-go/flyteidl/service/agent.pb.go index 478c62819c..56782f31a7 100644 --- a/flyteidl/gen/pb-go/flyteidl/service/agent.pb.go +++ b/flyteidl/gen/pb-go/flyteidl/service/agent.pb.go @@ -29,30 +29,31 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package func init() { proto.RegisterFile("flyteidl/service/agent.proto", fileDescriptor_f7d1dfd1fb77d2ef) } var fileDescriptor_f7d1dfd1fb77d2ef = []byte{ - // 367 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x93, 0xcd, 0x4a, 0x33, 0x31, - 0x14, 0x86, 0xbf, 0x4f, 0x41, 0x25, 0x42, 0xa9, 0xa1, 0x74, 0x31, 0x14, 0x7f, 0x2a, 0xba, 0x73, - 0x82, 0x75, 0x2d, 0x52, 0x15, 0xba, 0x69, 0x37, 0x6a, 0x41, 0xdc, 0xa5, 0xd3, 0x63, 0x0c, 0x9d, - 0x26, 0xe3, 0xe4, 0xb4, 0x50, 0xc4, 0x8d, 0xb7, 0xe0, 0x6d, 0xb9, 0xf3, 0x16, 0xbc, 0x00, 0x2f, - 0x41, 0x9a, 0x99, 0x4c, 0x6b, 0x35, 0x75, 0x57, 0xfa, 0xbc, 0x79, 0xde, 0x30, 0xe7, 0x84, 0xd4, - 0xee, 0xe3, 0x09, 0x82, 0xec, 0xc7, 0xcc, 0x40, 0x3a, 0x96, 0x11, 0x30, 0x2e, 0x40, 0x61, 0x98, - 0xa4, 0x1a, 0x35, 0x2d, 0x3b, 0x1a, 0xe6, 0x34, 0xa8, 0x09, 0xad, 0x45, 0x0c, 0x8c, 0x27, 0x92, - 0x71, 0xa5, 0x34, 0x72, 0x94, 0x5a, 0x99, 0x2c, 0x1f, 0x04, 0x85, 0x8d, 0xf7, 0x87, 0x52, 0xcd, - 0xbb, 0x1a, 0x6f, 0xab, 0x64, 0xab, 0x69, 0x26, 0x2a, 0x6a, 0x4e, 0xff, 0xbc, 0xce, 0x7c, 0xb4, - 0x4b, 0xc8, 0x45, 0x0a, 0x1c, 0xe1, 0x86, 0x9b, 0x01, 0xdd, 0x0b, 0x8b, 0x42, 0x2b, 0x08, 0x67, - 0xec, 0x0a, 0x1e, 0x47, 0x60, 0x30, 0xa8, 0x2f, 0x8b, 0x98, 0x44, 0x2b, 0x03, 0xf5, 0x7f, 0xb4, - 0x4d, 0xd6, 0x5b, 0x80, 0xd6, 0xb9, 0xbd, 0x78, 0x20, 0x07, 0x4e, 0xb8, 0xe3, 0xe5, 0x85, 0xad, - 0x4b, 0xc8, 0x25, 0xc4, 0xe0, 0xbb, 0xe4, 0x8c, 0x79, 0x2f, 0x39, 0x1f, 0x29, 0xb4, 0x9c, 0x94, - 0xf2, 0xae, 0x0e, 0x60, 0x2a, 0x23, 0x43, 0x0f, 0x3c, 0x77, 0xc9, 0xb9, 0xd3, 0x1f, 0xfe, 0x15, - 0x2b, 0x2a, 0x6e, 0xc9, 0x66, 0xce, 0xda, 0x5a, 0x18, 0x5a, 0xf7, 0x1c, 0x9c, 0x42, 0x27, 0xdf, - 0x5f, 0x9a, 0x71, 0xe6, 0xc6, 0xe7, 0x7f, 0x52, 0xb1, 0x93, 0xec, 0x00, 0xf2, 0x3e, 0x47, 0xee, - 0x26, 0x3a, 0x20, 0x1b, 0x2d, 0x40, 0x8b, 0xe8, 0x6f, 0xdf, 0xd6, 0x12, 0x57, 0xb6, 0xeb, 0x0f, - 0xe4, 0x4d, 0xb5, 0x97, 0xf7, 0x8f, 0xd7, 0x95, 0x2a, 0xad, 0xd8, 0x95, 0x1b, 0x1f, 0x67, 0x3b, - 0xc5, 0x9e, 0x14, 0x1f, 0xc2, 0x33, 0x1d, 0x10, 0xd2, 0x96, 0x26, 0x3b, 0x62, 0x7e, 0x4e, 0x66, - 0xc6, 0xbc, 0x93, 0x99, 0x8f, 0xe4, 0x95, 0x55, 0x5b, 0x59, 0xa6, 0xa5, 0x6f, 0x95, 0xe6, 0xfc, - 0xec, 0xee, 0x54, 0x48, 0x7c, 0x18, 0xf5, 0xc2, 0x48, 0x0f, 0x99, 0xf5, 0xe8, 0x54, 0x64, 0x3f, - 0x58, 0xb1, 0xf9, 0x02, 0x14, 0x4b, 0x7a, 0x47, 0x42, 0xb3, 0xc5, 0xa7, 0xd5, 0x5b, 0xb3, 0x2f, - 0xe1, 0xe4, 0x2b, 0x00, 0x00, 0xff, 0xff, 0x64, 0x33, 0x1f, 0x2d, 0x75, 0x03, 0x00, 0x00, + // 369 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x93, 0xcd, 0x4e, 0x32, 0x31, + 0x14, 0x86, 0x3f, 0x3e, 0x13, 0x35, 0x35, 0x21, 0xd8, 0x10, 0x16, 0x13, 0xe2, 0x0f, 0x46, 0x77, + 0x4e, 0x15, 0xd7, 0xc6, 0xa0, 0x26, 0x6c, 0x60, 0xa3, 0xb2, 0x61, 0x57, 0x86, 0x63, 0x6d, 0x18, + 0xda, 0x71, 0x5a, 0x48, 0x88, 0x71, 0xe3, 0x2d, 0x78, 0x5f, 0x6e, 0xbc, 0x05, 0x2f, 0xc0, 0x4b, + 0x30, 0x74, 0xda, 0x61, 0x44, 0x8b, 0x3b, 0xc2, 0xf3, 0xf6, 0x79, 0x9b, 0x39, 0xa7, 0xa8, 0x7e, + 0x1f, 0xcf, 0x34, 0xf0, 0x61, 0x4c, 0x14, 0xa4, 0x53, 0x1e, 0x01, 0xa1, 0x0c, 0x84, 0x0e, 0x93, + 0x54, 0x6a, 0x89, 0x2b, 0x8e, 0x86, 0x96, 0x06, 0x75, 0x26, 0x25, 0x8b, 0x81, 0xd0, 0x84, 0x13, + 0x2a, 0x84, 0xd4, 0x54, 0x73, 0x29, 0x54, 0x96, 0x0f, 0x82, 0xdc, 0x46, 0x87, 0x63, 0x2e, 0x8a, + 0xae, 0xe6, 0xdb, 0x1a, 0xda, 0x6e, 0xa9, 0x99, 0x88, 0x5a, 0xf3, 0x3f, 0x6f, 0x33, 0x1f, 0xee, + 0x21, 0x74, 0x95, 0x02, 0xd5, 0x70, 0x47, 0xd5, 0x08, 0xef, 0x87, 0x79, 0xa1, 0x11, 0x84, 0x0b, + 0x76, 0x03, 0x8f, 0x13, 0x50, 0x3a, 0x68, 0xac, 0x8a, 0xa8, 0x44, 0x0a, 0x05, 0x8d, 0x7f, 0xb8, + 0x83, 0x36, 0xda, 0xa0, 0x8d, 0x73, 0x67, 0xf9, 0x80, 0x05, 0x4e, 0xb8, 0xeb, 0xe5, 0xb9, 0xad, + 0x87, 0xd0, 0x35, 0xc4, 0xe0, 0xbb, 0xe4, 0x82, 0x79, 0x2f, 0x59, 0x8c, 0xe4, 0x5a, 0x8a, 0xca, + 0xb6, 0xab, 0x0b, 0x3a, 0xe5, 0x91, 0xc2, 0x87, 0x9e, 0xbb, 0x58, 0xee, 0xf4, 0x47, 0x7f, 0xc5, + 0xf2, 0x8a, 0x3e, 0xda, 0xb2, 0xac, 0x23, 0x99, 0xc2, 0x0d, 0xcf, 0xc1, 0x39, 0x74, 0xf2, 0x83, + 0x95, 0x19, 0x67, 0x3e, 0x29, 0x35, 0x3f, 0x4b, 0xa8, 0x6a, 0x66, 0xd9, 0x05, 0x4d, 0x87, 0x54, + 0x53, 0x37, 0xd3, 0x11, 0xda, 0x6c, 0x83, 0x36, 0x08, 0xff, 0xf6, 0x75, 0x0d, 0x71, 0x75, 0x7b, + 0xfe, 0x80, 0xed, 0xaa, 0xbf, 0xbc, 0x7f, 0xbc, 0xfe, 0xaf, 0xe1, 0xaa, 0x59, 0xba, 0xe9, 0x69, + 0xb6, 0x55, 0xe4, 0x49, 0xd0, 0x31, 0x3c, 0xe3, 0x11, 0x42, 0x1d, 0xae, 0xb2, 0x23, 0xea, 0xe7, + 0x6c, 0x16, 0xcc, 0x3b, 0x9b, 0x62, 0xc4, 0x56, 0xd6, 0x4c, 0x65, 0x05, 0x97, 0xbf, 0x55, 0xaa, + 0xcb, 0x8b, 0xfe, 0x39, 0xe3, 0xfa, 0x61, 0x32, 0x08, 0x23, 0x39, 0x26, 0xc6, 0x23, 0x53, 0x96, + 0xfd, 0x20, 0xf9, 0xee, 0x33, 0x10, 0x24, 0x19, 0x1c, 0x33, 0x49, 0x96, 0x1f, 0xd7, 0x60, 0xdd, + 0xbc, 0x85, 0xb3, 0xaf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x4c, 0x53, 0x1f, 0xba, 0x77, 0x03, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -80,7 +81,7 @@ type AsyncAgentServiceClient interface { // * various other errors GetTaskMetrics(ctx context.Context, in *admin.GetTaskMetricsRequest, opts ...grpc.CallOption) (*admin.GetTaskMetricsResponse, error) // GetTaskLogs returns task execution logs, if available. - GetTaskLogs(ctx context.Context, in *admin.GetTaskLogsRequest, opts ...grpc.CallOption) (*admin.GetTaskLogsResponse, error) + GetTaskLogs(ctx context.Context, in *admin.GetTaskLogsRequest, opts ...grpc.CallOption) (AsyncAgentService_GetTaskLogsClient, error) } type asyncAgentServiceClient struct { @@ -127,13 +128,36 @@ func (c *asyncAgentServiceClient) GetTaskMetrics(ctx context.Context, in *admin. return out, nil } -func (c *asyncAgentServiceClient) GetTaskLogs(ctx context.Context, in *admin.GetTaskLogsRequest, opts ...grpc.CallOption) (*admin.GetTaskLogsResponse, error) { - out := new(admin.GetTaskLogsResponse) - err := c.cc.Invoke(ctx, "/flyteidl.service.AsyncAgentService/GetTaskLogs", in, out, opts...) +func (c *asyncAgentServiceClient) GetTaskLogs(ctx context.Context, in *admin.GetTaskLogsRequest, opts ...grpc.CallOption) (AsyncAgentService_GetTaskLogsClient, error) { + stream, err := c.cc.NewStream(ctx, &_AsyncAgentService_serviceDesc.Streams[0], "/flyteidl.service.AsyncAgentService/GetTaskLogs", opts...) if err != nil { return nil, err } - return out, nil + x := &asyncAgentServiceGetTaskLogsClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type AsyncAgentService_GetTaskLogsClient interface { + Recv() (*admin.GetTaskLogsResponse, error) + grpc.ClientStream +} + +type asyncAgentServiceGetTaskLogsClient struct { + grpc.ClientStream +} + +func (x *asyncAgentServiceGetTaskLogsClient) Recv() (*admin.GetTaskLogsResponse, error) { + m := new(admin.GetTaskLogsResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } // AsyncAgentServiceServer is the server API for AsyncAgentService service. @@ -151,7 +175,7 @@ type AsyncAgentServiceServer interface { // * various other errors GetTaskMetrics(context.Context, *admin.GetTaskMetricsRequest) (*admin.GetTaskMetricsResponse, error) // GetTaskLogs returns task execution logs, if available. - GetTaskLogs(context.Context, *admin.GetTaskLogsRequest) (*admin.GetTaskLogsResponse, error) + GetTaskLogs(*admin.GetTaskLogsRequest, AsyncAgentService_GetTaskLogsServer) error } // UnimplementedAsyncAgentServiceServer can be embedded to have forward compatible implementations. @@ -170,8 +194,8 @@ func (*UnimplementedAsyncAgentServiceServer) DeleteTask(ctx context.Context, req func (*UnimplementedAsyncAgentServiceServer) GetTaskMetrics(ctx context.Context, req *admin.GetTaskMetricsRequest) (*admin.GetTaskMetricsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetTaskMetrics not implemented") } -func (*UnimplementedAsyncAgentServiceServer) GetTaskLogs(ctx context.Context, req *admin.GetTaskLogsRequest) (*admin.GetTaskLogsResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetTaskLogs not implemented") +func (*UnimplementedAsyncAgentServiceServer) GetTaskLogs(req *admin.GetTaskLogsRequest, srv AsyncAgentService_GetTaskLogsServer) error { + return status.Errorf(codes.Unimplemented, "method GetTaskLogs not implemented") } func RegisterAsyncAgentServiceServer(s *grpc.Server, srv AsyncAgentServiceServer) { @@ -250,22 +274,25 @@ func _AsyncAgentService_GetTaskMetrics_Handler(srv interface{}, ctx context.Cont return interceptor(ctx, in, info, handler) } -func _AsyncAgentService_GetTaskLogs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(admin.GetTaskLogsRequest) - if err := dec(in); err != nil { - return nil, err +func _AsyncAgentService_GetTaskLogs_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(admin.GetTaskLogsRequest) + if err := stream.RecvMsg(m); err != nil { + return err } - if interceptor == nil { - return srv.(AsyncAgentServiceServer).GetTaskLogs(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/flyteidl.service.AsyncAgentService/GetTaskLogs", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(AsyncAgentServiceServer).GetTaskLogs(ctx, req.(*admin.GetTaskLogsRequest)) - } - return interceptor(ctx, in, info, handler) + return srv.(AsyncAgentServiceServer).GetTaskLogs(m, &asyncAgentServiceGetTaskLogsServer{stream}) +} + +type AsyncAgentService_GetTaskLogsServer interface { + Send(*admin.GetTaskLogsResponse) error + grpc.ServerStream +} + +type asyncAgentServiceGetTaskLogsServer struct { + grpc.ServerStream +} + +func (x *asyncAgentServiceGetTaskLogsServer) Send(m *admin.GetTaskLogsResponse) error { + return x.ServerStream.SendMsg(m) } var _AsyncAgentService_serviceDesc = grpc.ServiceDesc{ @@ -288,12 +315,14 @@ var _AsyncAgentService_serviceDesc = grpc.ServiceDesc{ MethodName: "GetTaskMetrics", Handler: _AsyncAgentService_GetTaskMetrics_Handler, }, + }, + Streams: []grpc.StreamDesc{ { - MethodName: "GetTaskLogs", - Handler: _AsyncAgentService_GetTaskLogs_Handler, + StreamName: "GetTaskLogs", + Handler: _AsyncAgentService_GetTaskLogs_Handler, + ServerStreams: true, }, }, - Streams: []grpc.StreamDesc{}, Metadata: "flyteidl/service/agent.proto", } diff --git a/flyteidl/gen/pb-go/flyteidl/service/agent.swagger.json b/flyteidl/gen/pb-go/flyteidl/service/agent.swagger.json index 33eb5cf445..44392e1b6d 100644 --- a/flyteidl/gen/pb-go/flyteidl/service/agent.swagger.json +++ b/flyteidl/gen/pb-go/flyteidl/service/agent.swagger.json @@ -1514,6 +1514,21 @@ }, "description": "Represents a subset of runtime task execution metadata that are relevant to external plugins." }, + "protobufAny": { + "type": "object", + "properties": { + "type_url": { + "type": "string", + "description": "A URL/resource name that uniquely identifies the type of the serialized\nprotocol buffer message. This string must contain at least\none \"/\" character. The last segment of the URL's path must represent\nthe fully qualified name of the type (as in\n`path/google.protobuf.Duration`). The name should be in a canonical form\n(e.g., leading \".\" is not accepted).\n\nIn practice, teams usually precompile into the binary all types that they\nexpect it to use in the context of Any. However, for URLs which use the\nscheme `http`, `https`, or no scheme, one can optionally set up a type\nserver that maps type URLs to message definitions as follows:\n\n* If no scheme is provided, `https` is assumed.\n* An HTTP GET on the URL must yield a [google.protobuf.Type][]\n value in binary format, or produce an error.\n* Applications are allowed to cache lookup results based on the\n URL, or have them precompiled into a binary to avoid any\n lookup. Therefore, binary compatibility needs to be preserved\n on changes to types. (Use versioned type names to manage\n breaking changes.)\n\nNote: this functionality is not currently available in the official\nprotobuf release, and it is not used for type URLs beginning with\ntype.googleapis.com.\n\nSchemes other than `http`, `https` (or the empty scheme) might be\nused with implementation specific semantics." + }, + "value": { + "type": "string", + "format": "byte", + "description": "Must be a valid serialized protocol buffer of the above specified type." + } + }, + "description": "`Any` contains an arbitrary serialized protocol buffer message along with a\nURL that describes the type of the serialized message.\n\nProtobuf library provides support to pack/unpack Any values in the form\nof utility functions or additional generated methods of the Any type.\n\nExample 1: Pack and unpack a message in C++.\n\n Foo foo = ...;\n Any any;\n any.PackFrom(foo);\n ...\n if (any.UnpackTo(\u0026foo)) {\n ...\n }\n\nExample 2: Pack and unpack a message in Java.\n\n Foo foo = ...;\n Any any = Any.pack(foo);\n ...\n if (any.is(Foo.class)) {\n foo = any.unpack(Foo.class);\n }\n\n Example 3: Pack and unpack a message in Python.\n\n foo = Foo(...)\n any = Any()\n any.Pack(foo)\n ...\n if any.Is(Foo.DESCRIPTOR):\n any.Unpack(foo)\n ...\n\n Example 4: Pack and unpack a message in Go\n\n foo := \u0026pb.Foo{...}\n any, err := ptypes.MarshalAny(foo)\n ...\n foo := \u0026pb.Foo{}\n if err := ptypes.UnmarshalAny(any, foo); err != nil {\n ...\n }\n\nThe pack methods provided by protobuf library will by default use\n'type.googleapis.com/full.type.name' as the type URL and the unpack\nmethods only use the fully qualified type name after the last '/'\nin the type URL, for example \"foo.bar.com/x/y.z\" will yield type\nname \"y.z\".\n\n\nJSON\n====\nThe JSON representation of an `Any` value uses the regular\nrepresentation of the deserialized, embedded message, with an\nadditional field `@type` which contains the type URL. Example:\n\n package google.profile;\n message Person {\n string first_name = 1;\n string last_name = 2;\n }\n\n {\n \"@type\": \"type.googleapis.com/google.profile.Person\",\n \"firstName\": \u003cstring\u003e,\n \"lastName\": \u003cstring\u003e\n }\n\nIf the embedded message type is well-known and has a custom JSON\nrepresentation, that representation will be embedded adding a field\n`value` which holds the custom JSON in addition to the `@type`\nfield. Example (for message [google.protobuf.Duration][]):\n\n {\n \"@type\": \"type.googleapis.com/google.protobuf.Duration\",\n \"value\": \"1.212s\"\n }" + }, "protobufListValue": { "type": "object", "properties": { @@ -1579,6 +1594,45 @@ } }, "description": "`Value` represents a dynamically typed value which can be either\nnull, a number, a string, a boolean, a recursive struct value, or a\nlist of values. A producer of value is expected to set one of that\nvariants, absence of any variant indicates an error.\n\nThe JSON representation for `Value` is JSON value." + }, + "runtimeStreamError": { + "type": "object", + "properties": { + "grpc_code": { + "type": "integer", + "format": "int32" + }, + "http_code": { + "type": "integer", + "format": "int32" + }, + "message": { + "type": "string" + }, + "http_status": { + "type": "string" + }, + "details": { + "type": "array", + "items": { + "$ref": "#/definitions/protobufAny" + } + } + } + } + }, + "x-stream-definitions": { + "adminGetTaskLogsResponse": { + "type": "object", + "properties": { + "result": { + "$ref": "#/definitions/adminGetTaskLogsResponse" + }, + "error": { + "$ref": "#/definitions/runtimeStreamError" + } + }, + "title": "Stream result of adminGetTaskLogsResponse" } } } diff --git a/flyteidl/gen/pb-java/flyteidl/service/Agent.java b/flyteidl/gen/pb-java/flyteidl/service/Agent.java index 7242f5ba36..5ca2a4262f 100644 --- a/flyteidl/gen/pb-java/flyteidl/service/Agent.java +++ b/flyteidl/gen/pb-java/flyteidl/service/Agent.java @@ -25,7 +25,7 @@ public static void registerAllExtensions( java.lang.String[] descriptorData = { "\n\034flyteidl/service/agent.proto\022\020flyteidl" + ".service\032\034google/api/annotations.proto\032\032" + - "flyteidl/admin/agent.proto2\314\003\n\021AsyncAgen" + + "flyteidl/admin/agent.proto2\316\003\n\021AsyncAgen" + "tService\022U\n\nCreateTask\022!.flyteidl.admin." + "CreateTaskRequest\032\".flyteidl.admin.Creat" + "eTaskResponse\"\000\022L\n\007GetTask\022\036.flyteidl.ad" + @@ -35,17 +35,17 @@ public static void registerAllExtensions( "DeleteTaskResponse\"\000\022a\n\016GetTaskMetrics\022%" + ".flyteidl.admin.GetTaskMetricsRequest\032&." + "flyteidl.admin.GetTaskMetricsResponse\"\000\022" + - "X\n\013GetTaskLogs\022\".flyteidl.admin.GetTaskL" + + "Z\n\013GetTaskLogs\022\".flyteidl.admin.GetTaskL" + "ogsRequest\032#.flyteidl.admin.GetTaskLogsR" + - "esponse\"\0002\360\001\n\024AgentMetadataService\022k\n\010Ge" + - "tAgent\022\037.flyteidl.admin.GetAgentRequest\032" + - " .flyteidl.admin.GetAgentResponse\"\034\202\323\344\223\002" + - "\026\022\024/api/v1/agent/{name}\022k\n\nListAgents\022!." + - "flyteidl.admin.ListAgentsRequest\032\".flyte" + - "idl.admin.ListAgentsResponse\"\026\202\323\344\223\002\020\022\016/a" + - "pi/v1/agentsB?Z=github.com/flyteorg/flyt" + - "e/flyteidl/gen/pb-go/flyteidl/serviceb\006p" + - "roto3" + "esponse\"\0000\0012\360\001\n\024AgentMetadataService\022k\n\010" + + "GetAgent\022\037.flyteidl.admin.GetAgentReques" + + "t\032 .flyteidl.admin.GetAgentResponse\"\034\202\323\344" + + "\223\002\026\022\024/api/v1/agent/{name}\022k\n\nListAgents\022" + + "!.flyteidl.admin.ListAgentsRequest\032\".fly" + + "teidl.admin.ListAgentsResponse\"\026\202\323\344\223\002\020\022\016" + + "/api/v1/agentsB?Z=github.com/flyteorg/fl" + + "yte/flyteidl/gen/pb-go/flyteidl/serviceb" + + "\006proto3" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { diff --git a/flyteidl/gen/pb_python/flyteidl/service/agent_pb2.py b/flyteidl/gen/pb_python/flyteidl/service/agent_pb2.py index 4a1a3d8f70..916bbde9ec 100644 --- a/flyteidl/gen/pb_python/flyteidl/service/agent_pb2.py +++ b/flyteidl/gen/pb_python/flyteidl/service/agent_pb2.py @@ -15,7 +15,7 @@ from flyteidl.admin import agent_pb2 as flyteidl_dot_admin_dot_agent__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lyteidl/service/agent.proto\x12\x10\x66lyteidl.service\x1a\x1cgoogle/api/annotations.proto\x1a\x1a\x66lyteidl/admin/agent.proto2\xcc\x03\n\x11\x41syncAgentService\x12U\n\nCreateTask\x12!.flyteidl.admin.CreateTaskRequest\x1a\".flyteidl.admin.CreateTaskResponse\"\x00\x12L\n\x07GetTask\x12\x1e.flyteidl.admin.GetTaskRequest\x1a\x1f.flyteidl.admin.GetTaskResponse\"\x00\x12U\n\nDeleteTask\x12!.flyteidl.admin.DeleteTaskRequest\x1a\".flyteidl.admin.DeleteTaskResponse\"\x00\x12\x61\n\x0eGetTaskMetrics\x12%.flyteidl.admin.GetTaskMetricsRequest\x1a&.flyteidl.admin.GetTaskMetricsResponse\"\x00\x12X\n\x0bGetTaskLogs\x12\".flyteidl.admin.GetTaskLogsRequest\x1a#.flyteidl.admin.GetTaskLogsResponse\"\x00\x32\xf0\x01\n\x14\x41gentMetadataService\x12k\n\x08GetAgent\x12\x1f.flyteidl.admin.GetAgentRequest\x1a .flyteidl.admin.GetAgentResponse\"\x1c\x82\xd3\xe4\x93\x02\x16\x12\x14/api/v1/agent/{name}\x12k\n\nListAgents\x12!.flyteidl.admin.ListAgentsRequest\x1a\".flyteidl.admin.ListAgentsResponse\"\x16\x82\xd3\xe4\x93\x02\x10\x12\x0e/api/v1/agentsB\xc2\x01\n\x14\x63om.flyteidl.serviceB\nAgentProtoP\x01Z=github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service\xa2\x02\x03\x46SX\xaa\x02\x10\x46lyteidl.Service\xca\x02\x10\x46lyteidl\\Service\xe2\x02\x1c\x46lyteidl\\Service\\GPBMetadata\xea\x02\x11\x46lyteidl::Serviceb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lyteidl/service/agent.proto\x12\x10\x66lyteidl.service\x1a\x1cgoogle/api/annotations.proto\x1a\x1a\x66lyteidl/admin/agent.proto2\xce\x03\n\x11\x41syncAgentService\x12U\n\nCreateTask\x12!.flyteidl.admin.CreateTaskRequest\x1a\".flyteidl.admin.CreateTaskResponse\"\x00\x12L\n\x07GetTask\x12\x1e.flyteidl.admin.GetTaskRequest\x1a\x1f.flyteidl.admin.GetTaskResponse\"\x00\x12U\n\nDeleteTask\x12!.flyteidl.admin.DeleteTaskRequest\x1a\".flyteidl.admin.DeleteTaskResponse\"\x00\x12\x61\n\x0eGetTaskMetrics\x12%.flyteidl.admin.GetTaskMetricsRequest\x1a&.flyteidl.admin.GetTaskMetricsResponse\"\x00\x12Z\n\x0bGetTaskLogs\x12\".flyteidl.admin.GetTaskLogsRequest\x1a#.flyteidl.admin.GetTaskLogsResponse\"\x00\x30\x01\x32\xf0\x01\n\x14\x41gentMetadataService\x12k\n\x08GetAgent\x12\x1f.flyteidl.admin.GetAgentRequest\x1a .flyteidl.admin.GetAgentResponse\"\x1c\x82\xd3\xe4\x93\x02\x16\x12\x14/api/v1/agent/{name}\x12k\n\nListAgents\x12!.flyteidl.admin.ListAgentsRequest\x1a\".flyteidl.admin.ListAgentsResponse\"\x16\x82\xd3\xe4\x93\x02\x10\x12\x0e/api/v1/agentsB\xc2\x01\n\x14\x63om.flyteidl.serviceB\nAgentProtoP\x01Z=github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service\xa2\x02\x03\x46SX\xaa\x02\x10\x46lyteidl.Service\xca\x02\x10\x46lyteidl\\Service\xe2\x02\x1c\x46lyteidl\\Service\\GPBMetadata\xea\x02\x11\x46lyteidl::Serviceb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -29,7 +29,7 @@ _AGENTMETADATASERVICE.methods_by_name['ListAgents']._options = None _AGENTMETADATASERVICE.methods_by_name['ListAgents']._serialized_options = b'\202\323\344\223\002\020\022\016/api/v1/agents' _globals['_ASYNCAGENTSERVICE']._serialized_start=109 - _globals['_ASYNCAGENTSERVICE']._serialized_end=569 - _globals['_AGENTMETADATASERVICE']._serialized_start=572 - _globals['_AGENTMETADATASERVICE']._serialized_end=812 + _globals['_ASYNCAGENTSERVICE']._serialized_end=571 + _globals['_AGENTMETADATASERVICE']._serialized_start=574 + _globals['_AGENTMETADATASERVICE']._serialized_end=814 # @@protoc_insertion_point(module_scope) diff --git a/flyteidl/gen/pb_python/flyteidl/service/agent_pb2_grpc.py b/flyteidl/gen/pb_python/flyteidl/service/agent_pb2_grpc.py index 7d04b0cd33..57efa48a6a 100644 --- a/flyteidl/gen/pb_python/flyteidl/service/agent_pb2_grpc.py +++ b/flyteidl/gen/pb_python/flyteidl/service/agent_pb2_grpc.py @@ -35,7 +35,7 @@ def __init__(self, channel): request_serializer=flyteidl_dot_admin_dot_agent__pb2.GetTaskMetricsRequest.SerializeToString, response_deserializer=flyteidl_dot_admin_dot_agent__pb2.GetTaskMetricsResponse.FromString, ) - self.GetTaskLogs = channel.unary_unary( + self.GetTaskLogs = channel.unary_stream( '/flyteidl.service.AsyncAgentService/GetTaskLogs', request_serializer=flyteidl_dot_admin_dot_agent__pb2.GetTaskLogsRequest.SerializeToString, response_deserializer=flyteidl_dot_admin_dot_agent__pb2.GetTaskLogsResponse.FromString, @@ -108,7 +108,7 @@ def add_AsyncAgentServiceServicer_to_server(servicer, server): request_deserializer=flyteidl_dot_admin_dot_agent__pb2.GetTaskMetricsRequest.FromString, response_serializer=flyteidl_dot_admin_dot_agent__pb2.GetTaskMetricsResponse.SerializeToString, ), - 'GetTaskLogs': grpc.unary_unary_rpc_method_handler( + 'GetTaskLogs': grpc.unary_stream_rpc_method_handler( servicer.GetTaskLogs, request_deserializer=flyteidl_dot_admin_dot_agent__pb2.GetTaskLogsRequest.FromString, response_serializer=flyteidl_dot_admin_dot_agent__pb2.GetTaskLogsResponse.SerializeToString, @@ -203,7 +203,7 @@ def GetTaskLogs(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary(request, target, '/flyteidl.service.AsyncAgentService/GetTaskLogs', + return grpc.experimental.unary_stream(request, target, '/flyteidl.service.AsyncAgentService/GetTaskLogs', flyteidl_dot_admin_dot_agent__pb2.GetTaskLogsRequest.SerializeToString, flyteidl_dot_admin_dot_agent__pb2.GetTaskLogsResponse.FromString, options, channel_credentials, diff --git a/flyteidl/go.mod b/flyteidl/go.mod index 34466ad69e..f410e79d6b 100644 --- a/flyteidl/go.mod +++ b/flyteidl/go.mod @@ -2,8 +2,6 @@ module github.com/flyteorg/flyte/flyteidl go 1.21 -toolchain go1.21.3 - require ( github.com/antihax/optional v1.0.0 github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000 diff --git a/flyteidl/protos/flyteidl/service/agent.proto b/flyteidl/protos/flyteidl/service/agent.proto index 7538c0b819..9c2b457a12 100644 --- a/flyteidl/protos/flyteidl/service/agent.proto +++ b/flyteidl/protos/flyteidl/service/agent.proto @@ -23,7 +23,7 @@ service AsyncAgentService { rpc GetTaskMetrics(flyteidl.admin.GetTaskMetricsRequest) returns (flyteidl.admin.GetTaskMetricsResponse){}; // GetTaskLogs returns task execution logs, if available. - rpc GetTaskLogs(flyteidl.admin.GetTaskLogsRequest) returns (flyteidl.admin.GetTaskLogsResponse){}; + rpc GetTaskLogs(flyteidl.admin.GetTaskLogsRequest) returns (stream flyteidl.admin.GetTaskLogsResponse){}; } // AgentMetadataService defines an RPC service that is also served over HTTP via grpc-gateway. From ddd25cdb425ac3110d7812fb7f3e727e6a788096 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 23 Jan 2024 13:49:01 -0800 Subject: [PATCH 2/5] nit Signed-off-by: Kevin Su --- flyteidl/go.mod | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flyteidl/go.mod b/flyteidl/go.mod index f410e79d6b..34466ad69e 100644 --- a/flyteidl/go.mod +++ b/flyteidl/go.mod @@ -2,6 +2,8 @@ module github.com/flyteorg/flyte/flyteidl go 1.21 +toolchain go1.21.3 + require ( github.com/antihax/optional v1.0.0 github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000 From d95e73332e98cfa6ff4f63677d7a897105682fe9 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 23 Jan 2024 14:30:38 -0800 Subject: [PATCH 3/5] update tests Signed-off-by: Kevin Su --- .../tasks/plugins/webapi/agent/integration_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/integration_test.go b/flyteplugins/go/tasks/plugins/webapi/agent/integration_test.go index a2b2135591..0b5872da1d 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/integration_test.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/integration_test.go @@ -40,22 +40,22 @@ type MockPlugin struct { type MockAsyncTask struct { } -func (m *MockAsyncTask) GetTaskMetrics(ctx context.Context, in *admin.GetTaskMetricsRequest, opts ...grpc.CallOption) (*admin.GetTaskMetricsResponse, error) { - panic("not implemented") +func (m *MockAsyncTask) GetTaskLogs(ctx context.Context, in *admin.GetTaskLogsRequest, opts ...grpc.CallOption) (service.AsyncAgentService_GetTaskLogsClient, error) { + panic("implement me") } -func (m *MockAsyncTask) GetTaskLogs(ctx context.Context, in *admin.GetTaskLogsRequest, opts ...grpc.CallOption) (*admin.GetTaskLogsResponse, error) { +func (m *MockAsyncTask) GetTaskMetrics(ctx context.Context, in *admin.GetTaskMetricsRequest, opts ...grpc.CallOption) (*admin.GetTaskMetricsResponse, error) { panic("not implemented") } type MockSyncTask struct { } -func (m *MockSyncTask) GetTaskMetrics(ctx context.Context, in *admin.GetTaskMetricsRequest, opts ...grpc.CallOption) (*admin.GetTaskMetricsResponse, error) { - panic("not implemented") +func (m *MockSyncTask) GetTaskLogs(ctx context.Context, in *admin.GetTaskLogsRequest, opts ...grpc.CallOption) (service.AsyncAgentService_GetTaskLogsClient, error) { + panic("implement me") } -func (m *MockSyncTask) GetTaskLogs(ctx context.Context, in *admin.GetTaskLogsRequest, opts ...grpc.CallOption) (*admin.GetTaskLogsResponse, error) { +func (m *MockSyncTask) GetTaskMetrics(ctx context.Context, in *admin.GetTaskMetricsRequest, opts ...grpc.CallOption) (*admin.GetTaskMetricsResponse, error) { panic("not implemented") } From bf2fe55f2cc5da667a40fb657437fe9c4781a8b7 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 24 Jan 2024 11:35:20 -0800 Subject: [PATCH 4/5] remove token Signed-off-by: Kevin Su --- flyteidl/protos/flyteidl/admin/agent.proto | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/flyteidl/protos/flyteidl/admin/agent.proto b/flyteidl/protos/flyteidl/admin/agent.proto index 601e8b61e6..4163a68da2 100644 --- a/flyteidl/protos/flyteidl/admin/agent.proto +++ b/flyteidl/protos/flyteidl/admin/agent.proto @@ -158,18 +158,10 @@ message GetTaskLogsRequest { string task_type = 1; // Metadata is created by the agent. It could be a string (jobId) or a dict (more complex metadata). bytes resource_meta = 2; - // Number of lines to return. - uint64 lines = 3; - // In the case of multiple pages of results, the server-provided token can be used to fetch the next page - // in a query. If there are no more results, this value will be empty. - string token = 4; } // A response containing the logs for a task execution. message GetTaskLogsResponse { // The execution log results. - repeated string results = 1; - // In the case of multiple pages of results, the server-provided token can be used to fetch the next page - // in a query. If there are no more results, this value will be empty. - string token = 2; + string results = 1; } \ No newline at end of file From 29fc426068c1d2b8c4b61bf53451aca4da1f606d Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Wed, 24 Jan 2024 14:01:48 -0800 Subject: [PATCH 5/5] Streaming response Signed-off-by: Haytham Abuelfutuh --- flyteidl/protos/flyteidl/admin/agent.proto | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/flyteidl/protos/flyteidl/admin/agent.proto b/flyteidl/protos/flyteidl/admin/agent.proto index 4163a68da2..086e77c4ae 100644 --- a/flyteidl/protos/flyteidl/admin/agent.proto +++ b/flyteidl/protos/flyteidl/admin/agent.proto @@ -160,8 +160,18 @@ message GetTaskLogsRequest { bytes resource_meta = 2; } +message GetTaskLogsResponseHeader { + string token = 1; +} + +message GetTaskLogsResponseBody { + repeated string results = 1; +} + // A response containing the logs for a task execution. message GetTaskLogsResponse { - // The execution log results. - string results = 1; + oneof part { + GetTaskLogsResponseHeader header = 1; + GetTaskLogsResponseBody body = 2; + } } \ No newline at end of file