Skip to content

Commit

Permalink
fix(evpn-bridge): fix system behaviour for pending objects
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitrios Markou <[email protected]>
  • Loading branch information
mardim91 committed Jul 14, 2024
1 parent 708f948 commit 85bde59
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 20 deletions.
17 changes: 14 additions & 3 deletions pkg/infradb/subscriberframework/eventbus/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package eventbus

import (
"fmt"
"log"
"sort"
"sync"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (e *EventBus) Subscribe(moduleName, eventType string, priority int, eventHa

subscriber := &Subscriber{
Name: moduleName,
Ch: make(chan interface{}, 1),
Ch: make(chan interface{}),
Quit: make(chan bool),
Priority: priority,
}
Expand Down Expand Up @@ -128,10 +129,20 @@ func (e *EventBus) subscriberExist(eventType string, moduleName string) bool {
}

// Publish api notifies the subscribers with certain eventType
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) {
func (e *EventBus) Publish(objectData *ObjectData, subscriber *Subscriber) error {

Check warning on line 132 in pkg/infradb/subscriberframework/eventbus/eventbus.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/subscriberframework/eventbus/eventbus.go#L132

Added line #L132 was not covered by tests
e.publishL.RLock()
defer e.publishL.RUnlock()
subscriber.Ch <- objectData
var err error
// We need the default case here because, if the subscriber is busy, we will not be able to send to the subscriber channel
// and the Publish function will get stuck. The default case serves exactly this purpose.
select {
case subscriber.Ch <- objectData:
log.Printf("Publish(): Notification is sent to subscriber %s\n", subscriber.Name)
default:
log.Printf("Publish(): Channel for subsriber %s is busy. Notification not sent", subscriber.Name)
err = fmt.Errorf("channel is busy")

Check warning on line 143 in pkg/infradb/subscriberframework/eventbus/eventbus.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/subscriberframework/eventbus/eventbus.go#L135-L143

Added lines #L135 - L143 were not covered by tests
}
return err

Check warning on line 145 in pkg/infradb/subscriberframework/eventbus/eventbus.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/subscriberframework/eventbus/eventbus.go#L145

Added line #L145 was not covered by tests
}

// Unsubscribe the subscriber, which deletes the subscriber (all of its resources will be cleaned up)
Expand Down
66 changes: 49 additions & 17 deletions pkg/infradb/taskmanager/taskmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type Task struct {
objectType string
resourceVersion string
subIndex int
retryTimer time.Duration
subs []*eventbus.Subscriber
// systemTimer is used only when we want to retry a Task due to unavailability of the Subscriber or not receiving a TaskStatus
systemTimer time.Duration
subs []*eventbus.Subscriber
}

// TaskStatus holds info related to the status that has been received
Expand Down Expand Up @@ -60,6 +61,7 @@ func newTask(name, objectType, resourceVersion string, subs []*eventbus.Subscrib
objectType: objectType,
resourceVersion: resourceVersion,
subIndex: 0,
systemTimer: 1 * time.Second,
subs: subs,
}
}
Expand Down Expand Up @@ -94,13 +96,18 @@ func (t *TaskManager) CreateTask(name, objectType, resourceVersion string, subs
// StatusUpdated creates a task status and sends it for handling
func (t *TaskManager) StatusUpdated(name, objectType, resourceVersion, notificationID string, dropTask bool, component *common.Component) {
taskStatus := newTaskStatus(name, objectType, resourceVersion, notificationID, dropTask, component)

// Do we need to make this call happen in a goroutine in order to not make the
// StatusUpdated function stuck in case that nobody reads what is written on the channel ?
// Is there any case where this can happen
// (nobody reads what is written on the channel and the StatusUpdated gets stuck) ?
t.taskStatusChan <- taskStatus
log.Printf("StatusUpdated(): New Task Status has been created and sent to channel: %+v\n", taskStatus)
log.Printf("StatusUpdated(): New Task Status has been created: %+v\n", taskStatus)

// We need a default case here so that the call does not get stuck if we send to the channel while nobody is reading from it.
// E.g. a subscriber got stuck and doesn't reply. The task will be requeued after the timer expires. In the meantime
// the subscriber replies and a taskStatus is sent to the channel, but the call gets stuck there because the previous task has not been requeued yet:
// the timer has not expired and the queue is empty (we assume that there is only one task in the queue).
select {
case t.taskStatusChan <- taskStatus:
log.Printf("StatusUpdated(): Task Status has been sent to channel: %+v\n", taskStatus)
default:
log.Printf("StatusUpdated(): Task Status has not been sent to channel. Channel not available: %+v\n", taskStatus)

Check warning on line 109 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L99-L109

Added lines #L99 - L109 were not covered by tests
}
}

// processTasks processes the task
Expand All @@ -123,7 +130,18 @@ func (t *TaskManager) processTasks() {
// (e.g. Maybe you have a timeout on the subscribers and you got the notification after the timeout has passed)
NotificationID: uuid.NewString(),
}
eventbus.EBus.Publish(objectData, sub)
if err := eventbus.EBus.Publish(objectData, sub); err != nil {
log.Printf("processTasks(): Notification not sent to subscriber %+v with data %+v. Subscriber is busy. The Task %+v will be requeued.\n", sub, objectData, task)
// We keep this subIndex in order to know from which subscriber to start iterating after the Task is requeued,
// so that we start again from the subscriber that returned an error or was unavailable for any reason.
task.subIndex += i
task.systemTimer *= 2
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
time.AfterFunc(task.systemTimer, func() {
t.taskQueue.Enqueue(task)
})
break loopTwo

Check warning on line 143 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L133-L143

Added lines #L133 - L143 were not covered by tests
}
log.Printf("processTasks(): Notification has been sent to subscriber %+v with data %+v\n", sub, objectData)

loopThree:
Expand All @@ -143,11 +161,17 @@ func (t *TaskManager) processTasks() {
log.Printf("processTasks(): received notification id %+v doesn't equal the sent notification id %+v\n", taskStatus.notificationID, objectData.NotificationID)

// We need a timeout in case that the subscriber doesn't update the status at all for whatever reason.
// If that occurs then we just take a note which subscriber need to revisit and we requeue the task without any timer
// If that occurs then we just requeue the task with a timer
case <-time.After(30 * time.Second):
log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued immediately Task Status %+v\n", sub, task, taskStatus)
log.Printf("processTasks(): No task status has been received in the channel from subscriber %+v. The task %+v will be requeued. Task Status %+v\n", sub, task, taskStatus)
// We keep this subIndex in order to know from which subscriber to start iterating after the Task is requeued,
// so that we start again from the subscriber that returned an error or was unavailable for any reason.

Check warning on line 168 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L166-L168

Added lines #L166 - L168 were not covered by tests
task.subIndex += i
go t.taskQueue.Enqueue(task)
task.systemTimer *= 2
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.systemTimer)
time.AfterFunc(task.systemTimer, func() {
t.taskQueue.Enqueue(task)
})

Check warning on line 174 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L170-L174

Added lines #L170 - L174 were not covered by tests
break loopThree
}
}
Expand All @@ -159,19 +183,27 @@ func (t *TaskManager) processTasks() {
break loopTwo
}

// We re-initialize the systemTimer every time that we get a taskStatus. That means that the subscriber is available and has responded
task.systemTimer = 1 * time.Second

Check warning on line 188 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L187-L188

Added lines #L187 - L188 were not covered by tests
switch taskStatus.component.CompStatus {
case common.ComponentStatusSuccess:
log.Printf("processTasks(): Subscriber %+v has processed the task %+v successfully\n", sub, task)
continue loopTwo
default:
case common.ComponentStatusError:

Check warning on line 193 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L193

Added line #L193 was not covered by tests
log.Printf("processTasks(): Subscriber %+v has not processed the task %+v successfully\n", sub, task)
log.Printf("processTasks(): The Task will be requeued after %+v\n", taskStatus.component.Timer)
// We keep this subIndex in order to know from which subscriber to start iterating after the Task is requeued,
// so that we start again from the subscriber that returned an error or was unavailable for any reason.

Check warning on line 197 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L195-L197

Added lines #L195 - L197 were not covered by tests
task.subIndex += i
task.retryTimer = taskStatus.component.Timer
log.Printf("processTasks(): The Task will be requeued after %+v\n", task.retryTimer)
time.AfterFunc(task.retryTimer, func() {
time.AfterFunc(taskStatus.component.Timer, func() {

Check warning on line 199 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L199

Added line #L199 was not covered by tests
t.taskQueue.Enqueue(task)
})
break loopTwo
default:
log.Printf("processTasks(): Subscriber %+v has not provided designated status for the task %+v\n", sub, task)
log.Printf("processTasks(): The task %+v will be dropped\n", task)
break loopTwo

Check warning on line 206 in pkg/infradb/taskmanager/taskmanager.go

View check run for this annotation

Codecov / codecov/patch

pkg/infradb/taskmanager/taskmanager.go#L203-L206

Added lines #L203 - L206 were not covered by tests
}
}
}
Expand Down

0 comments on commit 85bde59

Please sign in to comment.