Skip to content

Commit

Permalink
Hot Reload: Subscription (dapr#7583)
Browse files Browse the repository at this point in the history
* Hot Reload: Subscription

Adds support for Subscription Hot Reloading.

If the Hot Reloading preview feature is enabled, create, update and
delete events to declarative Subscriptions will be reflected in the
Subscriptions runtime without needing to restart the Dapr runtime. Works
in both self-hosted and Kubernetes modes.

Note that, because topic routes and not indexed by Subscription
resources, all Subscriptions will be cancelled and topic routes re-built
and re-subscribed when a Subscription event occurs. A possible
improvement would to only cancel all topic route subscriptions which are
related to the Subscription pubsub or include an index of the
Subscription which a topic is routed for, however this is out of scope
of this change. Support for per topic route cancellation will be needed
for on-demand bi-directional subscriptions.

In-flight messages are not effected by Subscription hot reloading.

Branched from dapr#7582

Closes dapr#7139

Signed-off-by: joshvanl <[email protected]>

* Load all Subscriptions before reloading on boot

Signed-off-by: joshvanl <[email protected]>

* Adds unit tests for subscription client side scope filtering

Signed-off-by: joshvanl <[email protected]>

* Linting

Signed-off-by: joshvanl <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
Co-authored-by: Dapr Bot <[email protected]>
  • Loading branch information
2 people authored and cicoyle committed May 24, 2024
1 parent a110feb commit bfeaa90
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 27 deletions.
23 changes: 13 additions & 10 deletions pkg/operator/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ const (
var log = logger.NewLogger("dapr.operator.api")

type Options struct {
Client client.Client
Cache cache.Cache
Security security.Provider
Port int
Client client.Client
Cache cache.Cache
Security security.Provider
Port int
ListenAddress string
}

// Server runs the Dapr API server for components and configurations.
Expand All @@ -62,9 +63,10 @@ type Server interface {

type apiServer struct {
operatorv1pb.UnimplementedOperatorServer
Client client.Client
sec security.Provider
port string
Client client.Client
sec security.Provider
port string
listenAddress string

compInformer informer.Interface[componentsapi.Component]

Expand All @@ -85,6 +87,7 @@ func NewAPIServer(opts Options) Server {
}),
sec: opts.Security,
port: strconv.Itoa(opts.Port),
listenAddress: opts.ListenAddress,
allEndpointsUpdateChan: make(map[string]chan *httpendpointsapi.HTTPEndpoint),
allSubscriptionUpdateChan: make(map[string]chan *SubscriptionUpdateEvent),
readyCh: make(chan struct{}),
Expand All @@ -97,7 +100,7 @@ func (a *apiServer) Run(ctx context.Context) error {
return errors.New("api server already running")
}

log.Infof("Starting gRPC server on port %s", a.port)
log.Infof("Starting gRPC server on %s:%s", a.listenAddress, a.port)

sec, err := a.sec.Handler(ctx)
if err != nil {
Expand All @@ -107,7 +110,7 @@ func (a *apiServer) Run(ctx context.Context) error {
s := grpc.NewServer(sec.GRPCServerOptionMTLS())
operatorv1pb.RegisterOperatorServer(s, a)

lis, err := net.Listen("tcp", ":"+a.port)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%s", a.listenAddress, a.port))
if err != nil {
return fmt.Errorf("error starting tcp listener: %w", err)
}
Expand Down Expand Up @@ -143,4 +146,4 @@ func (a *apiServer) Ready(ctx context.Context) error {
case <-ctx.Done():
return errors.New("timeout waiting for api server to be ready")
}
}
}
2 changes: 1 addition & 1 deletion pkg/operator/api/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,4 @@ func getSecret(ctx context.Context, name, namespace string, ref commonapi.Secret
}

return commonapi.DynamicValue{}, nil
}
}
2 changes: 1 addition & 1 deletion pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (o *operator) Run(ctx context.Context) error {
log.Info("Dapr Operator is starting")
healthzServer := health.NewServer(health.Options{
Log: log,
Targets: ptr.Of(5),
Targets: ptr.Of(6),
})

/*
Expand Down
61 changes: 46 additions & 15 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,22 +184,22 @@ func newDaprRuntime(ctx context.Context,
})

processor := processor.New(processor.Options{
ID: runtimeConfig.id,
Namespace: namespace,
IsHTTP: runtimeConfig.appConnectionConfig.Protocol.IsHTTP(),
ActorsEnabled: len(runtimeConfig.actorsService) > 0,
ID: runtimeConfig.id,
Namespace: namespace,
IsHTTP: runtimeConfig.appConnectionConfig.Protocol.IsHTTP(),
ActorsEnabled: len(runtimeConfig.actorsService) > 0,
SchedulerEnabled: runtimeConfig.schedulerAddress != "",
Registry: runtimeConfig.registry,
ComponentStore: compStore,
Meta: meta,
GlobalConfig: globalConfig,
Resiliency: resiliencyProvider,
Mode: runtimeConfig.mode,
PodName: podName,
OperatorClient: operatorClient,
GRPC: grpc,
Channels: channels,
MiddlewareHTTP: httpMiddleware,
Registry: runtimeConfig.registry,
ComponentStore: compStore,
Meta: meta,
GlobalConfig: globalConfig,
Resiliency: resiliencyProvider,
Mode: runtimeConfig.mode,
PodName: podName,
OperatorClient: operatorClient,
GRPC: grpc,
Channels: channels,
MiddlewareHTTP: httpMiddleware,
})

var reloader *hotreload.Reloader
Expand Down Expand Up @@ -1096,6 +1096,37 @@ func (a *DaprRuntime) loadDeclarativeSubscriptions(ctx context.Context) error {
return nil
}

func (a *DaprRuntime) loadDeclarativeSubscriptions(ctx context.Context) error {
var loader loader.Loader[subapi.Subscription]

switch a.runtimeConfig.mode {
case modes.KubernetesMode:
loader = kubernetes.NewSubscriptions(kubernetes.Options{
Client: a.operatorClient,
Namespace: a.namespace,
PodName: a.podName,
})
case modes.StandaloneMode:
loader = disk.NewSubscriptions(a.runtimeConfig.standalone.ResourcesPath...)
default:
return nil
}

log.Info("Loading Declarative Subscriptions…")
subs, err := loader.Load(ctx)
if err != nil {
return err
}

for _, s := range subs {
log.Infof("Found Subscription: %s", s.Name)
}

a.processor.AddPendingSubscription(ctx, subs...)

return nil
}

func (a *DaprRuntime) flushOutstandingHTTPEndpoints(ctx context.Context) {
log.Info("Waiting for all outstanding http endpoints to be processed…")
// We flush by sending a no-op http endpoint. Since the processHTTPEndpoints goroutine only reads one http endpoint at a time,
Expand Down

0 comments on commit bfeaa90

Please sign in to comment.