diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 61d2667..c35eb8b 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -533,6 +533,9 @@ func (bl *FxBlockchain) serve(w http.ResponseWriter, r *http.Request) { actionFetchContainerLogs: func(from peer.ID, w http.ResponseWriter, r *http.Request) { bl.handleFetchContainerLogs(r.Context(), from, w, r) }, + actionChatWithAI: func(from peer.ID, w http.ResponseWriter, r *http.Request) { + bl.handleChatWithAI(r.Context(), from, w, r) + }, actionFindBestAndTargetInLogs: func(from peer.ID, w http.ResponseWriter, r *http.Request) { bl.handleFindBestAndTargetInLogs(r.Context(), from, w, r) }, @@ -981,7 +984,7 @@ func (bl *FxBlockchain) authorized(pid peer.ID, action string) bool { switch action { case actionReplicateInPool: return (bl.authorizer == bl.h.ID() || bl.authorizer == "") - case actionBloxFreeSpace, actionAccountFund, actionManifestBatchUpload, actionAssetsBalance, actionGetDatastoreSize, actionGetFolderSize, actionFindBestAndTargetInLogs, actionFetchContainerLogs, actionEraseBlData, actionWifiRemoveall, actionReboot, actionPartition, actionDeleteWifi, actionDisconnectWifi, actionDeleteFulaConfig, actionGetAccount, actionSeeded, actionAccountExists, actionPoolCreate, actionPoolJoin, actionPoolCancelJoin, actionPoolRequests, actionPoolList, actionPoolVote, actionPoolLeave, actionManifestUpload, actionManifestStore, actionManifestAvailable, actionManifestRemove, actionManifestRemoveStorer, actionManifestRemoveStored, actionTransferToMumbai, actionListPlugins, actionListActivePlugins, actionInstallPlugin, actionUninstallPlugin, actionGetInstallStatus, actionGetInstallOutput, actionUpdatePlugin: + case actionBloxFreeSpace, actionAccountFund, actionManifestBatchUpload, actionAssetsBalance, actionGetDatastoreSize, actionGetFolderSize, actionFindBestAndTargetInLogs, actionFetchContainerLogs, actionChatWithAI, actionEraseBlData, actionWifiRemoveall, actionReboot, actionPartition, actionDeleteWifi, actionDisconnectWifi, actionDeleteFulaConfig, actionGetAccount, actionSeeded, actionAccountExists, actionPoolCreate, actionPoolJoin, actionPoolCancelJoin, actionPoolRequests, actionPoolList, actionPoolVote, actionPoolLeave, actionManifestUpload, actionManifestStore, actionManifestAvailable, actionManifestRemove, actionManifestRemoveStorer, actionManifestRemoveStored, actionTransferToMumbai, actionListPlugins, actionListActivePlugins, actionInstallPlugin, actionUninstallPlugin, actionGetInstallStatus, actionGetInstallOutput, actionUpdatePlugin: bl.authorizedPeersLock.RLock() _, ok := bl.authorizedPeers[pid] bl.authorizedPeersLock.RUnlock() diff --git a/blockchain/blox.go b/blockchain/blox.go index 9c53b89..9a8f9e1 100644 --- a/blockchain/blox.go +++ b/blockchain/blox.go @@ -1,12 +1,14 @@ package blockchain import ( + "bufio" "bytes" "context" "encoding/json" "fmt" "io" "net/http" + "sync" "github.com/functionland/go-fula/wap/pkg/wifi" "github.com/libp2p/go-libp2p/core/network" @@ -20,6 +22,49 @@ const ( GB = 1024 * MB ) +type StreamBuffer struct { + mu sync.Mutex + chunks []string + closed bool + err error +} + +func NewStreamBuffer() *StreamBuffer { + return &StreamBuffer{ + chunks: make([]string, 0), + } +} + +func (b *StreamBuffer) AddChunk(chunk string) { + b.mu.Lock() + defer b.mu.Unlock() + if b.closed { + return + } + b.chunks = append(b.chunks, chunk) +} + +func (b *StreamBuffer) Close(err error) { + b.mu.Lock() + defer b.mu.Unlock() + b.closed = true + b.err = err +} + +func (b *StreamBuffer) GetChunk() (string, error) { + b.mu.Lock() + defer b.mu.Unlock() + if len(b.chunks) > 0 { + chunk := b.chunks[0] + b.chunks = b.chunks[1:] + return chunk, nil + } + if b.closed { + return "", b.err + } + return "", nil // No chunk available yet +} + func (bl *FxBlockchain) BloxFreeSpace(ctx context.Context, to peer.ID) ([]byte, error) { if bl.allowTransientConnection { ctx = network.WithUseTransient(ctx, "fx.blockchain") @@ -245,6 +290,55 @@ func (bl *FxBlockchain) FetchContainerLogs(ctx context.Context, to peer.ID, r wi } } +func (bl *FxBlockchain) ChatWithAI(ctx context.Context, to peer.ID, r wifi.ChatWithAIRequest) (*StreamBuffer, error) { + if bl.allowTransientConnection { + ctx = network.WithUseTransient(ctx, "fx.blockchain") + } + + // Encode the request into JSON + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(r); err != nil { + return nil, fmt.Errorf("failed to encode request: %v", err) + } + + // Create the HTTP request + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+to.String()+".invalid/"+actionChatWithAI, &buf) + if err != nil { + return nil, fmt.Errorf("failed to create request: %v", err) + } + + resp, err := bl.c.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %v", err) + } + if resp.StatusCode != http.StatusOK { + defer resp.Body.Close() + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("unexpected response: %d; body: %s", resp.StatusCode, string(bodyBytes)) + } + + buffer := NewStreamBuffer() // Create a new StreamBuffer + + go func() { + defer resp.Body.Close() + reader := bufio.NewReader(resp.Body) + for { + line, err := reader.ReadString('\n') // Read each chunk line by line + if err != nil { + if err == io.EOF { + buffer.Close(nil) // Close buffer with no error + } else { + buffer.Close(fmt.Errorf("error reading response stream: %v", err)) + } + break + } + buffer.AddChunk(line) // Add each chunk to the buffer + } + }() + + return buffer, nil // Return the StreamBuffer +} + func (bl *FxBlockchain) FindBestAndTargetInLogs(ctx context.Context, to peer.ID, r wifi.FindBestAndTargetInLogsRequest) ([]byte, error) { if bl.allowTransientConnection { @@ -468,6 +562,64 @@ func (bl *FxBlockchain) handleFetchContainerLogs(ctx context.Context, from peer. } +func (bl *FxBlockchain) handleChatWithAI(ctx context.Context, from peer.ID, w http.ResponseWriter, r *http.Request) { + log := log.With("action", actionChatWithAI, "from", from) + + var req wifi.ChatWithAIRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + log.Error("failed to decode request: %v", err) + http.Error(w, "failed to decode request", http.StatusBadRequest) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusAccepted) // Use StatusAccepted for consistency + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming not supported", http.StatusInternalServerError) + return + } + + chunks, err := wifi.FetchAIResponse(ctx, req.AIModel, req.UserMessage) + if err != nil { + log.Error("error in fetchAIResponse: %v", err) + http.Error(w, fmt.Sprintf("Error fetching AI response: %v", err), http.StatusInternalServerError) + return + } + + log.Debugw("Streaming AI response started", "ai_model", req.AIModel) + defer log.Debugw("Streaming AI response ended", "ai_model", req.AIModel) + + for { + select { + case <-ctx.Done(): // Handle client disconnect or cancellation + log.Warn("client disconnected") + return + case chunk, ok := <-chunks: + if !ok { + return // Channel closed + } + response := wifi.ChatWithAIResponse{ + Status: true, + Msg: chunk, + } + + if err := json.NewEncoder(w).Encode(response); err != nil { + log.Error("failed to write response: %v", err) + errorResponse := wifi.ChatWithAIResponse{ + Status: false, + Msg: fmt.Sprintf("Error writing response: %v", err), + } + json.NewEncoder(w).Encode(errorResponse) // Send error as part of stream + flusher.Flush() + return + } + flusher.Flush() + } + } +} + func (bl *FxBlockchain) handleFindBestAndTargetInLogs(ctx context.Context, from peer.ID, w http.ResponseWriter, r *http.Request) { log := log.With("action", actionFindBestAndTargetInLogs, "from", from) diff --git a/blockchain/interface.go b/blockchain/interface.go index 8bebc01..93a955f 100644 --- a/blockchain/interface.go +++ b/blockchain/interface.go @@ -64,6 +64,9 @@ const ( actionGetInstallOutput = "get-install-output" actionGetInstallStatus = "get-install-status" actionUpdatePlugin = "update-plugin" + + // AI + actionChatWithAI = "chat-ai" ) type ReplicateRequest struct { @@ -520,6 +523,9 @@ type Blockchain interface { GetInstallOutput(context.Context, peer.ID, string, string) ([]byte, error) GetInstallStatus(context.Context, peer.ID, string) ([]byte, error) UpdatePlugin(context.Context, peer.ID, string) ([]byte, error) + + // AI + ChatWithAI(context.Context, peer.ID, wifi.ChatWithAIRequest) (*StreamBuffer, error) } var requestTypes = map[string]reflect.Type{ @@ -574,6 +580,9 @@ var requestTypes = map[string]reflect.Type{ actionGetInstallOutput: reflect.TypeOf(GetInstallOutputRequest{}), actionGetInstallStatus: reflect.TypeOf(GetInstallStatusRequest{}), actionUpdatePlugin: reflect.TypeOf(UpdatePluginRequest{}), + + // AI + actionChatWithAI: reflect.TypeOf(wifi.ChatWithAIRequest{}), } var responseTypes = map[string]reflect.Type{ @@ -628,4 +637,7 @@ var responseTypes = map[string]reflect.Type{ actionGetInstallOutput: reflect.TypeOf(GetInstallOutputResponse{}), actionGetInstallStatus: reflect.TypeOf(GetInstallStatusResponse{}), actionUpdatePlugin: reflect.TypeOf(UpdatePluginResponse{}), + + // AI + actionChatWithAI: reflect.TypeOf(wifi.ChatWithAIResponse{}), } diff --git a/go.mod b/go.mod index f6a35d4..25c01d5 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,12 @@ module github.com/functionland/go-fula -go 1.21 +go 1.22.0 + +toolchain go1.22.1 require ( github.com/docker/docker v24.0.7+incompatible + github.com/google/uuid v1.6.0 github.com/grandcat/zeroconf v1.0.0 github.com/ipfs-cluster/ipfs-cluster v1.0.8 github.com/ipfs/boxo v0.17.0 @@ -28,14 +31,16 @@ require ( github.com/mdp/qrterminal v1.0.1 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.12.2 + github.com/multiformats/go-multibase v0.2.0 github.com/multiformats/go-multicodec v0.9.0 github.com/multiformats/go-multihash v0.2.3 github.com/multiformats/go-varint v0.0.7 + github.com/sony/gobreaker v0.5.0 github.com/tyler-smith/go-bip39 v1.1.0 github.com/urfave/cli/v2 v2.27.1 go.uber.org/ratelimit v0.3.0 - golang.org/x/crypto v0.21.0 - golang.org/x/sync v0.6.0 + golang.org/x/crypto v0.32.0 + golang.org/x/sync v0.10.0 gopkg.in/ini.v1 v1.67.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -49,7 +54,7 @@ require ( github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect - golang.org/x/sys v0.18.0 // indirect + golang.org/x/sys v0.29.0 // indirect gotest.tools/v3 v3.4.0 // indirect ) @@ -110,7 +115,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/websocket v1.5.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect @@ -193,7 +197,6 @@ require ( github.com/multiformats/go-base36 v0.2.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect - github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multistream v0.5.0 // indirect github.com/onsi/ginkgo/v2 v2.13.2 // indirect github.com/opencontainers/runtime-spec v1.1.0 // indirect @@ -233,7 +236,6 @@ require ( github.com/rs/cors v1.10.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/samber/lo v1.39.0 // indirect - github.com/sony/gobreaker v0.5.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/testify v1.8.4 // indirect github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect @@ -268,12 +270,12 @@ require ( go.uber.org/zap v1.26.0 // indirect go4.org v0.0.0-20230225012048-214862532bf5 // indirect golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc // indirect - golang.org/x/mobile v0.0.0-20240320162201-c76e57eead38 // indirect - golang.org/x/mod v0.16.0 // indirect - golang.org/x/net v0.22.0 // indirect + golang.org/x/mobile v0.0.0-20250106192035-c31d5b91ecc3 // indirect + golang.org/x/mod v0.22.0 // indirect + golang.org/x/net v0.34.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect - golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.19.0 // indirect + golang.org/x/text v0.21.0 // indirect + golang.org/x/tools v0.29.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect gonum.org/v1/gonum v0.14.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/go.sum b/go.sum index 5e60f56..1c1e1a0 100644 --- a/go.sum +++ b/go.sum @@ -1152,10 +1152,10 @@ golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45 golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= -golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -1183,8 +1183,8 @@ golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPI golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= -golang.org/x/mobile v0.0.0-20240320162201-c76e57eead38 h1:Pje51YXhl8Griu25k5zJBD48T4C0qA7YUd5myQfnFLo= -golang.org/x/mobile v0.0.0-20240320162201-c76e57eead38/go.mod h1:DN+F2TpepQEh5goqWnM3gopfFakSWM8OmHiz0rPRjT4= +golang.org/x/mobile v0.0.0-20250106192035-c31d5b91ecc3 h1:8LrYkH99trX3onYF3dT9frUSRDXokkceG+9tHBaDAFQ= +golang.org/x/mobile v0.0.0-20250106192035-c31d5b91ecc3/go.mod h1:sY92m3V/rTEa4JCJ1FkKHK978K6wxOSX1PStMYo+6wI= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= @@ -1194,10 +1194,10 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.15.0 h1:SernR4v+D55NyBH2QiEQrlBAnj1ECL6AGrA5+dPaMY8= -golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4= +golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1254,10 +1254,10 @@ golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1286,6 +1286,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1368,10 +1370,10 @@ golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1400,6 +1402,7 @@ golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1461,10 +1464,10 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.18.0 h1:k8NLag8AGHnn+PHbl7g43CtqZAwG60vZkLqgyZgIHgQ= -golang.org/x/tools v0.18.0/go.mod h1:GL7B4CwcLLeo59yx/9UWWuNOW1n3VZ4f5axWfML7Lcg= golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= +golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= +golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/mobile/blockchain.go b/mobile/blockchain.go index dae94a3..ab4a33d 100644 --- a/mobile/blockchain.go +++ b/mobile/blockchain.go @@ -10,6 +10,7 @@ import ( "github.com/functionland/go-fula/blockchain" wifi "github.com/functionland/go-fula/wap/pkg/wifi" + "github.com/google/uuid" ) type PluginParam struct { @@ -256,6 +257,58 @@ func (c *Client) FetchContainerLogs(ContainerName string, TailCount string) ([]b return c.bl.FetchContainerLogs(ctx, c.bloxPid, wifi.FetchContainerLogsRequest{ContainerName: ContainerName, TailCount: TailCount}) } +func (c *Client) ChatWithAI(aiModel string, userMessage string) ([]byte, error) { + ctx := context.TODO() + + buffer, err := c.bl.ChatWithAI(ctx, c.bloxPid, wifi.ChatWithAIRequest{ + AIModel: aiModel, + UserMessage: userMessage, + }) + if err != nil { + return nil, fmt.Errorf("error starting ChatWithAI: %v", err) + } + + streamID := uuid.New().String() // Generate a unique stream ID + c.mu.Lock() + c.streams[streamID] = buffer // Store the StreamBuffer in the map + c.mu.Unlock() + + return []byte(streamID), nil // Return the stream ID as a byte slice +} + +func (c *Client) GetChatChunk(streamID string) (string, error) { + c.mu.Lock() + buffer, ok := c.streams[streamID] + c.mu.Unlock() + + if !ok { + return "", fmt.Errorf("invalid stream ID") + } + + chunk, err := buffer.GetChunk() + if chunk == "" && err == nil { + return "", nil // No chunk available yet + } + if err != nil { // Stream closed or errored out + c.mu.Lock() + delete(c.streams, streamID) + c.mu.Unlock() + } + return chunk, err +} + +func (c *Client) GetStreamIterator(streamID string) (*StreamIterator, error) { + c.mu.Lock() + buffer, ok := c.streams[streamID] + c.mu.Unlock() + + if !ok { + return nil, fmt.Errorf("invalid stream ID") + } + + return &StreamIterator{buffer: buffer}, nil +} + // GetAccount requests blox at Config.BloxAddr to get the balance of the account. // the addr must be a valid multiaddr that includes peer ID. func (c *Client) FindBestAndTargetInLogs(NodeContainerName string, TailCount string) ([]byte, error) { diff --git a/mobile/client.go b/mobile/client.go index c7a7727..5525db6 100644 --- a/mobile/client.go +++ b/mobile/client.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "sync" "github.com/functionland/go-fula/blockchain" "github.com/functionland/go-fula/exchange" @@ -51,6 +52,9 @@ type Client struct { bl blockchain.Blockchain bloxPid peer.ID relays []string + + streams map[string]*blockchain.StreamBuffer // Map of active streams + mu sync.Mutex // Mutex for thread-safe access } func NewClient(cfg *Config) (*Client, error) { @@ -58,6 +62,10 @@ func NewClient(cfg *Config) (*Client, error) { if err := cfg.init(&mc); err != nil { return nil, err } + + // Initialize the streams map for managing active streaming sessions + mc.streams = make(map[string]*blockchain.StreamBuffer) + return &mc, nil } diff --git a/mobile/stream_iter.go b/mobile/stream_iter.go new file mode 100644 index 0000000..383aaca --- /dev/null +++ b/mobile/stream_iter.go @@ -0,0 +1,18 @@ +package fulamobile + +import ( + "github.com/functionland/go-fula/blockchain" +) + +type StreamIterator struct { + buffer *blockchain.StreamBuffer +} + +func (i *StreamIterator) HasNext() bool { + chunk, _ := i.buffer.GetChunk() + return chunk != "" +} + +func (i *StreamIterator) Next() (string, error) { + return i.buffer.GetChunk() +} diff --git a/wap/pkg/wifi/properties.go b/wap/pkg/wifi/properties.go index 4d9b776..7366092 100644 --- a/wap/pkg/wifi/properties.go +++ b/wap/pkg/wifi/properties.go @@ -1,6 +1,7 @@ package wifi import ( + "bufio" "bytes" "context" "crypto/rand" @@ -9,6 +10,7 @@ import ( "encoding/json" "fmt" "io" + "net/http" "os/exec" "regexp" "strconv" @@ -103,6 +105,16 @@ type SyncInfo struct { Speed string } +type ChatWithAIRequest struct { + AIModel string `json:"ai_model"` + UserMessage string `json:"user_message"` +} + +type ChatWithAIResponse struct { + Status bool `json:"status"` + Msg string `json:"msg"` +} + const ( B = 1 KB = 1024 * B @@ -337,6 +349,59 @@ func FetchContainerLogs(ctx context.Context, req FetchContainerLogsRequest) (str } } +func FetchAIResponse(ctx context.Context, aiModel string, userMessage string) (<-chan string, error) { + url := "http://127.0.0.1:8080/get_RKLLM_output" + + payload := map[string]string{ + "ai_model": aiModel, + "user_message": userMessage, + } + jsonPayload, err := json.Marshal(payload) + if err != nil { + return nil, fmt.Errorf("failed to marshal payload: %v", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonPayload)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request: %v", err) + } + + if resp.StatusCode != http.StatusOK { + defer resp.Body.Close() + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("received non-OK status code: %d; body: %s", resp.StatusCode, bodyBytes) + } + + responseChannel := make(chan string) + + go func() { + defer close(responseChannel) + defer resp.Body.Close() + + reader := bufio.NewReader(resp.Body) + for { + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + break + } + fmt.Printf("Error reading response stream: %v\n", err) + break + } + responseChannel <- line // Send each chunk of data as it arrives + } + }() + + return responseChannel, nil +} + func fetchLogsFromDocker(ctx context.Context, containerName string, tailCount string) (string, error) { cli, err := client.NewClientWithOpts(client.WithAPIVersionNegotiation(), client.WithHost("unix:///var/run/docker.sock")) if err != nil {