Skip to content

Commit

Permalink
Merge pull request #175 from jackchenjc/bootstrap-issue-468
Browse files Browse the repository at this point in the history
feat!: Passing callback function into WatchForChanges method
  • Loading branch information
cloudxxx8 authored Jan 22, 2025
2 parents f8d6f91 + 9a5a38b commit 3550557
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 15 deletions.
4 changes: 2 additions & 2 deletions configuration/interface.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//
// Copyright (c) 2019 Intel Corporation
// Copyright (C) 2024 IOTech Ltd
// Copyright (C) 2024-2025 IOTech Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,7 +41,7 @@ type Client interface {
// WatchForChanges sets up a keeper watch for the target key and send back updates on the update channel.
// Passed in struct is only a reference for Configuration service, empty struct is ok
// Sends the configuration in the target struct as interface{} on updateChannel, which caller must cast
WatchForChanges(updateChannel chan<- interface{}, errorChannel chan<- error, configuration interface{}, waitKey string, msgClient messaging.MessageClient)
WatchForChanges(updateChannel chan<- interface{}, errorChannel chan<- error, configuration interface{}, waitKey string, getMsgClientCb func() messaging.MessageClient)

// StopWatching causes all WatchForChanges processing to stop and waits until they have stopped.
StopWatching()
Expand Down
58 changes: 51 additions & 7 deletions configuration/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions internal/pkg/keeper/client.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2024 IOTech Ltd
// Copyright (C) 2024-2025 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand Down Expand Up @@ -167,8 +167,9 @@ func (k *keeperClient) GetConfiguration(configStruct interface{}) (interface{},
return configStruct, nil
}

func (k *keeperClient) WatchForChanges(updateChannel chan<- interface{}, errorChannel chan<- error, configuration interface{}, waitKey string, messageBus messaging.MessageClient) {
if messageBus == nil {
func (k *keeperClient) WatchForChanges(updateChannel chan<- interface{}, errorChannel chan<- error, configuration interface{}, waitKey string, getMsgClientCb func() messaging.MessageClient) {
messageClient := getMsgClientCb()
if messageClient == nil {
configErr := errors.New("unable to use MessageClient to watch for configuration changes")
errorChannel <- configErr
return
Expand All @@ -184,16 +185,16 @@ func (k *keeperClient) WatchForChanges(updateChannel chan<- interface{}, errorCh
}

watchErrors := make(chan error)
err := messageBus.Subscribe(topics, watchErrors)
err := messageClient.Subscribe(topics, watchErrors)
if err != nil {
_ = messageBus.Disconnect()
_ = messageClient.Disconnect()
errorChannel <- err
return
}

go func() {
defer func() {
_ = messageBus.Disconnect()
_ = messageClient.Disconnect()
}()

// send a nil value to updateChannel once the watcher connection is established
Expand Down

0 comments on commit 3550557

Please sign in to comment.