From 2e06fdaf848d3207425fa56e777102ddd5433a64 Mon Sep 17 00:00:00 2001 From: Ruoyu Ying Date: Tue, 19 Nov 2024 01:21:27 +0800 Subject: [PATCH] gmc: include ui as part of pipeline (#435) * add ui handler in GMC router logic * add sample config file for chatqna with conversation ui Signed-off-by: Ruoyu Ying Co-authored-by: Iris --- .../api/v1alpha3/validating_webhook.go | 1 + microservices-connector/cmd/router/main.go | 195 +++++++++++++++++- .../chatQnA_dataprep_xeon_with_ui.yaml | 82 ++++++++ .../controller/gmconnector_controller.go | 2 + 4 files changed, 274 insertions(+), 6 deletions(-) create mode 100644 microservices-connector/config/samples/ChatQnA/chatQnA_dataprep_xeon_with_ui.yaml diff --git a/microservices-connector/api/v1alpha3/validating_webhook.go b/microservices-connector/api/v1alpha3/validating_webhook.go index 9b936fdd5..173e203d9 100644 --- a/microservices-connector/api/v1alpha3/validating_webhook.go +++ b/microservices-connector/api/v1alpha3/validating_webhook.go @@ -46,6 +46,7 @@ var ( "Whisper", "WhisperGaudi", "DataPrep", + "UI", } ) diff --git a/microservices-connector/cmd/router/main.go b/microservices-connector/cmd/router/main.go index eacf84db0..843c6fcb9 100644 --- a/microservices-connector/cmd/router/main.go +++ b/microservices-connector/cmd/router/main.go @@ -20,6 +20,8 @@ import ( "io" "mime/multipart" "net/http" + "net/http/httputil" + "net/url" "os" // "regexp" @@ -39,12 +41,15 @@ import ( ) const ( - BufferSize = 1024 - MaxGoroutines = 1024 - ServiceURL = "serviceUrl" - ServiceNode = "node" - DataPrep = "DataPrep" - Parameters = "parameters" + BufferSize = 1024 + MaxGoroutines = 1024 + ServiceURL = "serviceUrl" + ServiceNode = "node" + DataPrep = "DataPrep" + Parameters = "parameters" + UI = "UI" + LLMKeyword = "query" + EmbeddingKeyword = "text" ) var ( @@ -64,6 +69,15 @@ var ( Transport: transport, Timeout: 30 * time.Second, } + UnknownErr = errors.New("Unknown format") + defaultLlmParams = map[string]interface{}{ + "max_tokens": 1024, + "top_k": 10, + "top_p": 0.95, + "temperature": 0.01, + "repetition_penalty": 1.03, + "streaming": true, + } ) type EnsembleStepOutput struct { @@ -727,10 +741,179 @@ func handleMultipartError(writer *multipart.Writer, err error) { log.Error(err, "Error during multipart creation") } +// create a handler to handle traffic to /ui +// if the payload is empty, redirect to ui service +// if there's payload, format the payload and redirect to backend service +func mcUiHandler(w http.ResponseWriter, req *http.Request) { + // redirect traffic to ui pod if payload is empty + // redirect traffic to mcGraphHandler if payload is not empty + var finishProcessing bool + defaultNode := mcGraph.Spec.Nodes[defaultNodeName] + for i := range defaultNode.Steps { + step := &defaultNode.Steps[i] + if UI == step.StepName { + body, err := io.ReadAll(req.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // if no payload included in the request, redirect request to UI + if len(body) == 0 { + serviceURL := getServiceURLByStepTarget(step, mcGraph.Namespace) + targetURL, err := url.Parse(serviceURL) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + proxy := httputil.NewSingleHostReverseProxy(targetURL) + proxy.ServeHTTP(w, req) + finishProcessing = true + } else { + // if payload exists, format payload and redirect to backend services + var data map[string]interface{} + parsedData := map[string]interface{}{} + + // find the first hop for the pipeline + var nextHop *mcv1alpha3.Step + for i := range defaultNode.Steps { + nextHop = &defaultNode.Steps[i] + if nextHop.InternalService.IsDownstreamService { + // skip downstream service + continue + } + break + } + + // set the corresponding keyword for input data + var key string + switch nextHop.StepName { + case "Embedding": + key = EmbeddingKeyword + case "Llm": + key = LLMKeyword + default: + log.Info("Unsupported step. Failed to find the corresponding keyword. Using default one...") + key = LLMKeyword + } + + // resolve the data input + err = json.Unmarshal(body, &data) + if err != nil { + log.Error(err, "Failed to parse data.") + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // check if the pattern "messages" exists + if val, ok := data["messages"]; ok { + switch ms := val.(type) { + // value is in the format of string + case string: + parsedData[key] = ms + // value is in the format of array + case []interface{}: + for _, item := range ms { + // check if item is in the format of map + if m, ok := item.(map[string]interface{}); ok { + // find the key "role" + if v, ok := m["role"]; ok { + strRole := fmt.Sprintf("%v", v) + content := "" + // find the key "content" + // currently consume everything as string + if c, ok := m["content"]; ok { + content = fmt.Sprintf("%v", c) + } + switch strRole { + // concatenate system prompt + case "system": + parsedData[key] = content + "/n" + // concatenate others + default: + parsedData[key] = fmt.Sprintf("%v", parsedData[key]) + strRole + ":" + content + "/n" + } + } + } + } + // others + default: + log.Error(UnknownErr, "Unknown format in payload messages.") + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } + + // attach the default llm parameters if the llm is the first hop + if key == LLMKeyword { + for k, v := range defaultLlmParams { + parsedData[k] = v + } + } + + // append rest of the data + // treat everything as string + for k, v := range data { + if k == "messages" { + continue + } + parsedData[k] = v + } + + marshalData, err := json.Marshal(parsedData) + if err != nil { + log.Error(err, "Failed to marshal prompt to json") + http.Error(w, err.Error(), http.StatusInternalServerError) + } + + // create a new http request with the formatted data + proxyReq, err := http.NewRequest(req.Method, req.URL.String(), bytes.NewReader(marshalData)) + if err != nil { + log.Error(err, "Failed to generate new http request with formatted payload.") + http.Error(w, err.Error(), http.StatusInternalServerError) + } + mcGraphHandler(w, proxyReq) + finishProcessing = true + } + + } + } + // check if the request has been processed + if !finishProcessing { + log.Info("UI step not found in the graph or other errors happened. Please check the logs.") + } +} + +// accessing UI result in access to assets under /assets uri +// create a handler to redirect that request to ui endpoint +func mcAssetHandler(w http.ResponseWriter, req *http.Request) { + // Determine the asset type based on the URL path + defaultNode := mcGraph.Spec.Nodes[defaultNodeName] + found := false + for i := range defaultNode.Steps { + step := &defaultNode.Steps[i] + if UI == step.StepName { + serviceURL := getServiceURLByStepTarget(step, mcGraph.Namespace) + targetURL, err := url.Parse(serviceURL) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + proxy := httputil.NewSingleHostReverseProxy(targetURL) + proxy.ServeHTTP(w, req) + return + } + } + if !found { + log.Info("UI step not found in the graph.") + } +} + func initializeRoutes() *http.ServeMux { mux := http.NewServeMux() mux.HandleFunc("/", mcGraphHandler) mux.HandleFunc("/dataprep", mcDataHandler) + mux.HandleFunc("/assets/", mcAssetHandler) + mux.HandleFunc("/ui", mcUiHandler) return mux } diff --git a/microservices-connector/config/samples/ChatQnA/chatQnA_dataprep_xeon_with_ui.yaml b/microservices-connector/config/samples/ChatQnA/chatQnA_dataprep_xeon_with_ui.yaml new file mode 100644 index 000000000..04ada4341 --- /dev/null +++ b/microservices-connector/config/samples/ChatQnA/chatQnA_dataprep_xeon_with_ui.yaml @@ -0,0 +1,82 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +apiVersion: gmc.opea.io/v1alpha3 +kind: GMConnector +metadata: + labels: + app.kubernetes.io/name: gmconnector + app.kubernetes.io/managed-by: kustomize + gmc/platform: xeon + name: chatqa + namespace: chatqa +spec: + routerConfig: + name: router + serviceName: router-service + nodes: + root: + routerType: Sequence + steps: + - name: Embedding + internalService: + serviceName: embedding-svc + config: + endpoint: /v1/embeddings + TEI_EMBEDDING_ENDPOINT: tei-embedding-svc + - name: TeiEmbedding + internalService: + serviceName: tei-embedding-svc + isDownstreamService: true + - name: Retriever + data: $response + internalService: + serviceName: retriever-svc + config: + endpoint: /v1/retrieval + REDIS_URL: redis-vector-db + TEI_EMBEDDING_ENDPOINT: tei-embedding-svc + - name: VectorDB + internalService: + serviceName: redis-vector-db + isDownstreamService: true + - name: Reranking + data: $response + internalService: + serviceName: reranking-svc + config: + endpoint: /v1/reranking + TEI_RERANKING_ENDPOINT: tei-reranking-svc + - name: TeiReranking + internalService: + serviceName: tei-reranking-svc + config: + endpoint: /rerank + isDownstreamService: true + - name: Llm + data: $response + internalService: + serviceName: llm-svc + config: + endpoint: /v1/chat/completions + TGI_LLM_ENDPOINT: tgi-service-m + - name: Tgi + internalService: + serviceName: tgi-service-m + config: + endpoint: /generate + isDownstreamService: true + - name: DataPrep + internalService: + serviceName: data-prep-svc + config: + endpoint: /v1/dataprep + REDIS_URL: redis-vector-db + TEI_ENDPOINT: tei-embedding-svc + isDownstreamService: true + - name: UI + internalService: + serviceName: ui-svc + config: + endpoint: / + isDownstreamService: true diff --git a/microservices-connector/internal/controller/gmconnector_controller.go b/microservices-connector/internal/controller/gmconnector_controller.go index 51e405d4c..c5f0801cc 100644 --- a/microservices-connector/internal/controller/gmconnector_controller.go +++ b/microservices-connector/internal/controller/gmconnector_controller.go @@ -71,6 +71,7 @@ const ( SpeechT5Gaudi = "SpeechT5Gaudi" Whisper = "Whisper" WhisperGaudi = "WhisperGaudi" + UI = "UI" ) var yamlDict = map[string]string{ @@ -95,6 +96,7 @@ var yamlDict = map[string]string{ Whisper: yaml_dir + "whisper.yaml", WhisperGaudi: yaml_dir + "whisper_gaudi.yaml", DataPrep: yaml_dir + "data-prep.yaml", + UI: yaml_dir + "ui.yaml", } var (