Skip to content

Commit

Permalink
refactor(pipeline): 增加系统事件的消息解析
Browse files Browse the repository at this point in the history
  • Loading branch information
san-all committed Jun 25, 2024
1 parent abab1c0 commit 196b9a9
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions internal/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/agile-edgex/app-functions-sdk-go/v3/internal/bootstrap/container"
"net/http"
"reflect"
"runtime"
Expand Down Expand Up @@ -234,21 +235,24 @@ func (fpr *FunctionsPipelineRuntime) ProcessMessage(appContext *appfunction.Cont

// DecodeMessage decode the message wrapped in the MessageEnvelope and return the data to be processed.
func (fpr *FunctionsPipelineRuntime) DecodeMessage(appContext *appfunction.Context, envelope types.MessageEnvelope) (interface{}, *MessageError, bool) {
// Default Target Type for the function pipeline is an Event DTO.
// The Event DTO can be wrapped in an AddEventRequest DTO or just be the un-wrapped Event DTO,
// which is handled dynamically below.
if fpr.TargetType == nil {
fpr.TargetType = &dtos.Event{}
config := container.ConfigurationFrom(fpr.dic.Get)

systemEventTopicPrefix := common.BuildTopic(config.MessageBus.GetBaseTopicPrefix(), common.SystemEventPublishTopic)
var targetType any = nil
if strings.HasPrefix(envelope.ReceivedTopic, systemEventTopicPrefix) {
targetType = &dtos.SystemEvent{}
} else {
targetType = &dtos.Event{}
}

if reflect.TypeOf(fpr.TargetType).Kind() != reflect.Ptr {
if reflect.TypeOf(targetType).Kind() != reflect.Ptr {
err := errors.New("TargetType must be a pointer, not a value of the target type")
fpr.logError(err, envelope.CorrelationID)
return nil, &MessageError{Err: err, ErrorCode: http.StatusInternalServerError}, false
}

// Must make a copy of the type so that data isn't retained between calls for custom types
target := reflect.New(reflect.ValueOf(fpr.TargetType).Elem().Type()).Interface()
target := reflect.New(reflect.ValueOf(targetType).Elem().Type()).Interface()

switch target.(type) {
case *[]byte:
Expand Down

0 comments on commit 196b9a9

Please sign in to comment.