Skip to content

Commit

Permalink
Fix data races
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong committed Nov 15, 2024
1 parent c55d13d commit e3d3faf
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions controller/api/destination/federated_service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type federatedServiceWatcher struct {
localEndpoints *watcher.EndpointsWatcher

log *logging.Entry

sync.Mutex
}

type remoteDiscoveryID struct {
Expand Down Expand Up @@ -109,10 +111,14 @@ func (fsw *federatedServiceWatcher) Subscribe(
endStream chan struct{},
) error {
id := watcher.ServiceID{Namespace: namespace, Name: service}
fsw.Lock()
if federatedService, ok := fsw.services[id]; ok {
fsw.Unlock()
fsw.log.Debugf("Subscribing to federated service %s/%s", namespace, service)
federatedService.subscribe(port, nodeName, instanceID, stream, endStream)
return nil
} else {
fsw.Unlock()
}
return fmt.Errorf("service %s/%s is not a federated service", namespace, service)
}
Expand All @@ -123,9 +129,13 @@ func (fsw *federatedServiceWatcher) Unsubscribe(
stream pb.Destination_GetServer,
) {
id := watcher.ServiceID{Namespace: namespace, Name: service}
fsw.Lock()
if federatedService, ok := fsw.services[id]; ok {
fsw.Unlock()
fsw.log.Debugf("Unsubscribing from federated service %s/%s", namespace, service)
federatedService.unsubscribe(stream)
} else {
fsw.Unlock()
}
}

Expand All @@ -137,20 +147,27 @@ func (fsw *federatedServiceWatcher) addService(obj interface{}) {
}

if isFederatedService(service) {
fsw.Lock()
if federatedService, ok := fsw.services[id]; ok {
fsw.Unlock()
fsw.log.Debugf("Updating federated service %s/%s", service.Namespace, service.Name)
federatedService.update(service)
} else {
fsw.log.Debugf("Adding federated service %s/%s", service.Namespace, service.Name)
federatedService = fsw.newFederatedService(service)
fsw.services[id] = federatedService
fsw.Unlock()
federatedService.update(service)
}
} else {
fsw.Lock()
if federatedService, ok := fsw.services[id]; ok {
delete(fsw.services, id)
fsw.Unlock()
fsw.log.Debugf("Service %s/%s is no longer a federated service", service.Namespace, service.Name)
federatedService.delete()
delete(fsw.services, id)
} else {
fsw.Unlock()
}
}
}
Expand Down Expand Up @@ -268,6 +285,9 @@ func (fs *federatedService) subscribe(
stream pb.Destination_GetServer,
endStream chan struct{},
) {
fs.Lock()
defer fs.Unlock()

syncStream := newSyncronizedGetStream(stream, fs.log)
syncStream.Start()

Expand All @@ -287,8 +307,6 @@ func (fs *federatedService) subscribe(
fs.localDiscoverySubscribe(&subscriber, fs.localDiscovery)
}

fs.Lock()
defer fs.Unlock()
fs.subscribers = append(fs.subscribers, subscriber)
}

Expand Down

0 comments on commit e3d3faf

Please sign in to comment.