Skip to content

Commit

Permalink
Merge pull request #101 from gen-mind/patch/message-vote-cascadedelete
Browse files Browse the repository at this point in the history
Patch/message vote cascadedelete
  • Loading branch information
apaladiychuk authored May 15, 2024
2 parents c48ea17 + 6b310ce commit 5b7e24c
Show file tree
Hide file tree
Showing 45 changed files with 1,502 additions and 1,426 deletions.
1 change: 0 additions & 1 deletion backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ swag-api:
swag init --dir ./api --output ./api/docs --parseDependency --parseInternal

gen-proto:
rm -f core/proto/*.pb.go
protoc -I=. -I=vendor -I=${GOPATH}/src core/proto/*.proto --go_out=. --go-grpc_out=.
8 changes: 7 additions & 1 deletion backend/api-service-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ spec:
- name: MINIO_BUCKET_NAME
value: "data"
- name: MINIO_REGION
value: "CH"
value: "CH"
- name: NATS_URL
value: "nats.nats:4222"
- name: MILVUS_URL
value: "my-release-milvus.milvus2.svc.cluster.local:19530"
- name: EMBEDDING_GRPC_URL
value: "embeddingservice:50051"
ports:
- containerPort: 8080
volumeMounts:
Expand Down
2 changes: 2 additions & 0 deletions backend/api/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var Module = fx.Options(
bll.BLLModule,
storage.MinioModule,
messaging.NatsModule,
ai.EmbeddingModule,
storage.MilvusModule,
fx.Provide(ReadConfig,
NewRouter,
newGoogleOauthProvider,
Expand Down
80 changes: 18 additions & 62 deletions backend/connector/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ func (e *Executor) run(ctx context.Context, topic, subscriptionName string, task

func (e *Executor) runEmbedding(ctx context.Context, msg *proto.Message) error {
ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.MapCarrier(msg.Header))
payload := msg.GetBody().GetEmbedding()
payload := msg.GetBody().GetChunking()
if payload == nil {
zap.S().Errorf("Failed to get embedding payload")
return nil
}
zap.S().Infof("process embedding %d == > %50s ", payload.GetDocumentId(), payload.Content)

return nil
}

Expand All @@ -53,85 +53,46 @@ func (e *Executor) runConnector(ctx context.Context, msg *proto.Message) error {
if err != nil {
return err
}
// todo move to connector
if err = e.connectorRepo.InvalidateConnector(ctx, connectorModel); err != nil {
return err
}
connectorWF, err := connector.New(connectorModel)
if err != nil {
return err
}
embedding := ai.NewEmbeddingParser(&model.EmbeddingModel{ModelID: "text-embedding-ada-002"})
resultCh := connectorWF.Execute(ctx, trigger.Params)

if err = e.milvusClinet.CreateSchema(ctx, connectorWF.CollectionName()); err != nil {
if err = e.milvusClinet.CreateSchema(ctx, connectorModel.CollectionName()); err != nil {
return fmt.Errorf("error creating schema: %v", err)
}

for result := range resultCh {
var loopErr error
doc := e.handleResult(ctx, connectorModel, result)
if !doc.IsUpdated {
continue
}

if doc.ID.IntPart() != 0 {
loopErr = e.docRepo.Update(ctx, doc)
} else {
loopErr = e.docRepo.Create(ctx, doc)
}

if loopErr != nil {
err = loopErr
doc.Status = model.StatusFailed
zap.S().Errorf("Failed to update document: %v", loopErr)
continue
}
chunks, loopErr := e.chunking.Split(ctx, result.Content)
if loopErr != nil {
err = loopErr
doc.Status = model.StatusFailed
zap.S().Errorf("Failed to update document: %v", loopErr)
continue
}
embeddingResponse, loopErr := embedding.Parse(ctx, &proto.EmbeddingRequest{
DocumentId: doc.ID.IntPart(),
Key: doc.DocumentID,
Content: chunks,
})
if loopErr != nil {
err = loopErr
doc.Status = model.StatusFailed
zap.S().Errorf("Failed to update document: %v", loopErr)
continue
}
milvusPayload := make([]*storage.MilvusPayload, 0, len(embeddingResponse.Payload))
for _, payload := range embeddingResponse.Payload {
milvusPayload = append(milvusPayload, &storage.MilvusPayload{
DocumentID: embeddingResponse.GetDocumentId(),
Chunk: payload.GetChunk(),
Content: payload.GetContent(),
Vector: payload.GetVector(),
})
}
if loopErr = e.milvusClinet.Save(ctx, connectorWF.CollectionName(), milvusPayload...); loopErr != nil {
result.DocumentId = doc.ID.IntPart()

if loopErr = e.msgClient.Publish(ctx, model.TopicChunking, &proto.Body{
Payload: &proto.Body_Chunking{Chunking: result},
}); loopErr != nil {
err = loopErr
doc.Status = model.StatusFailed
zap.S().Errorf("Failed to update document: %v", loopErr)
continue
}

//todo for production
//if loopErr = e.msgClient.Publish(ctx, model.TopicEmbedding,
// &proto.Body{MilvusPayload: &proto.Body_Embedding{Embedding: &proto.EmbeddingRequest{
// DocumentId: doc.ID.IntPart(),
// Key: doc.DocumentID,
// Content: chunks,
// }}},
//); err != nil {
// err = loopErr
// zap.S().Errorf("Failed to update document: %v", err)
// doc.Status = model.StatusFailed
// doc.Signature = ""
// continue
//}
}

if err != nil {
Expand All @@ -145,26 +106,21 @@ func (e *Executor) runConnector(ctx context.Context, msg *proto.Message) error {
return nil
}

func (e *Executor) handleResult(ctx context.Context, connectorModel *model.Connector, result *proto.TriggerResponse) *model.Document {
doc, ok := connectorModel.DocsMap[result.GetDocumentId()]
func (e *Executor) handleResult(ctx context.Context, connectorModel *model.Connector, result *proto.ChunkingData) *model.Document {
doc, ok := connectorModel.DocsMap[result.GetUrl()]
if !ok {
doc = &model.Document{
DocumentID: result.GetDocumentId(),
DocumentID: result.GetUrl(),
ConnectorID: connectorModel.ID,
Link: result.GetUrl(),
Signature: result.GetSignature(),
CreatedDate: time.Now().UTC(),
Status: model.StatusInProgress,
IsUpdated: true,
}
connectorModel.DocsMap[result.GetDocumentId()] = doc
} else {
if doc.Signature != result.GetSignature() {
doc.Signature = result.GetSignature()
doc.Status = model.StatusInProgress
doc.IsUpdated = true
}
connectorModel.DocsMap[result.GetUrl()] = doc
}

doc.Status = model.StatusInProgress

return doc
}

Expand Down
2 changes: 1 addition & 1 deletion backend/connector/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var Module = fx.Options(

func RunServer(lc fx.Lifecycle, executor *Executor) error {

go executor.run(context.Background(), model.TopicEmbedding, model.SubscriptionEmbedding, executor.runEmbedding)
//go executor.run(context.Background(), model.TopicEmbedding, model.SubscriptionEmbedding, executor.runEmbedding)
go executor.run(context.Background(), model.TopicExecutor, model.SubscriptionExecutor, executor.runConnector)
return nil

Expand Down
21 changes: 20 additions & 1 deletion backend/connector/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,26 @@ collection name is fixed by a rule like tennant_{ID}. user_{ID}
pass opentelemetry context to nats for distributed tracing


### file compatibility
### file compatibility

- Office all
- doc, docx
- xls, xlsx
- rtf
- ppt, pptx
- pdf
- google docs all
- txt
- md

#### optional
- ODT
- ODS
- ODP
- ODG


### connectors
teams chats
teams teams
sharepoint
Expand Down
59 changes: 0 additions & 59 deletions backend/core/ai/embedding-openai.go

This file was deleted.

17 changes: 17 additions & 0 deletions backend/core/ai/embeddingi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ai

import (
_ "github.com/deluan/flowllm/llms/openai"
validation "github.com/go-ozzo/ozzo-validation/v4"
)

type (
EmbeddingConfig struct {
EmbeddingURL string `env:"EMBEDDING_GRPC_URL"`
}
)

func (v EmbeddingConfig) Validate() error {
return validation.ValidateStruct(&v,
validation.Field(&v.EmbeddingURL, validation.Required))
}
5 changes: 0 additions & 5 deletions backend/core/ai/model.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ai

import (
"cognix.ch/api/v2/core/proto"
"context"
validation "github.com/go-ozzo/ozzo-validation/v4"
)
Expand All @@ -22,10 +21,6 @@ type (
Chunking interface {
Split(ctx context.Context, text string) ([]string, error)
}

EmbeddingParser interface {
Parse(ctx context.Context, payload *proto.EmbeddingRequest) (*proto.EmbeddingResponse, error)
}
)

func (v ChunkingConfig) Validate() error {
Expand Down
27 changes: 27 additions & 0 deletions backend/core/ai/module.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package ai

import (
"cognix.ch/api/v2/core/proto"
"cognix.ch/api/v2/core/utils"
"go.uber.org/fx"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var ChunkingModule = fx.Options(
Expand All @@ -26,3 +29,27 @@ func newChunking(cfg *ChunkingConfig) Chunking {
}
return NewStaticChunking(cfg)
}

var EmbeddingModule = fx.Options(
fx.Provide(func() (*EmbeddingConfig, error) {
cfg := EmbeddingConfig{}
if err := utils.ReadConfig(&cfg); err != nil {
return nil, err
}
if err := cfg.Validate(); err != nil {
return nil, err
}
return &cfg, nil
},
newEmbeddingGRPCClient),
)

func newEmbeddingGRPCClient(cfg *EmbeddingConfig) (proto.EmbeddServiceClient, error) {
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}

conn, err := grpc.Dial(cfg.EmbeddingURL, dialOptions...)
if err != nil {
return nil, err
}
return proto.NewEmbeddServiceClient(conn), nil
}
7 changes: 0 additions & 7 deletions backend/core/ai/open-ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ func (o *openAIClient) Request(ctx context.Context, message string) (*Response,
Messages: []openai.ChatCompletionMessage{userMessage},
},
)
o.client.CreateEmbeddings(ctx, &openai.EmbeddingRequest{
Input: nil,
Model: "",
User: "",
EncodingFormat: "",
Dimensions: 0,
})
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 5b7e24c

Please sign in to comment.