Skip to content

Commit

Permalink
git mv edge/logs.go edge/scheduler/logs.go - jobID in logs == schedul…
Browse files Browse the repository at this point in the history
…e.ID

Signed-off-by: Sven Dowideit <[email protected]>
  • Loading branch information
SvenDowideit committed Jan 25, 2022
1 parent 5701949 commit be12a1c
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
7 changes: 4 additions & 3 deletions edge/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package edge
import (
"errors"
"fmt"
"github.com/portainer/agent/edge/scheduler"
"log"
"time"

Expand All @@ -18,7 +19,7 @@ type (
clusterService agent.ClusterService
dockerInfoService agent.DockerInfoService
key *edgeKey
logsManager *logsManager
logsManager *scheduler.LogsManager
pollService *PollService
pollServiceConfig *pollServiceConfig
stackManager *StackManager
Expand Down Expand Up @@ -74,8 +75,8 @@ func (manager *Manager) Start() error {
}
manager.stackManager = stackManager

manager.logsManager = newLogsManager(manager.key.PortainerInstanceURL, manager.key.EndpointID, manager.agentOptions.EdgeID, pollServiceConfig.InsecurePoll)
manager.logsManager.start()
manager.logsManager = scheduler.NewLogsManager(manager.key.PortainerInstanceURL, manager.key.EndpointID, manager.agentOptions.EdgeID, pollServiceConfig.InsecurePoll)
manager.logsManager.Start()

pollService, err := newPollService(manager.stackManager, manager.logsManager, pollServiceConfig)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions edge/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type PollService struct {
endpointID string
tunnelServerAddr string
tunnelServerFingerprint string
logsManager *logsManager
logsManager *scheduler.LogsManager
containerPlatform agent.ContainerPlatform
}

Expand All @@ -56,7 +56,7 @@ type pollServiceConfig struct {
}

// newPollService returns a pointer to a new instance of PollService
func newPollService(edgeStackManager *StackManager, logsManager *logsManager, config *pollServiceConfig) (*PollService, error) {
func newPollService(edgeStackManager *StackManager, logsManager *scheduler.LogsManager, config *pollServiceConfig) (*PollService, error) {
pollFrequency, err := time.ParseDuration(config.PollFrequency)
if err != nil {
return nil, err
Expand Down Expand Up @@ -293,7 +293,7 @@ func (service *PollService) poll() error {
}
}

service.logsManager.handleReceivedLogsRequests(logsToCollect)
service.logsManager.HandleReceivedLogsRequests(logsToCollect)

if responseData.CheckinInterval != service.pollIntervalInSeconds {
log.Printf("[DEBUG] [internal,edge,poll] [old_interval: %f] [new_interval: %f] [message: updating poll interval]", service.pollIntervalInSeconds, responseData.CheckinInterval)
Expand Down
16 changes: 8 additions & 8 deletions edge/logs.go → edge/scheduler/logs.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package edge
package scheduler

import (
"fmt"
Expand All @@ -10,7 +10,7 @@ import (
"github.com/portainer/agent/http/client"
)

type logsManager struct {
type LogsManager struct {
httpClient *client.PortainerClient
stopSignal chan struct{}
jobs map[int]logStatus
Expand All @@ -25,17 +25,17 @@ const (
logFailed
)

func newLogsManager(portainerURL, endpointID, edgeID string, insecurePoll bool) *logsManager {
func NewLogsManager(portainerURL, endpointID, edgeID string, insecurePoll bool) *LogsManager {
cli := client.NewPortainerClient(portainerURL, endpointID, edgeID, insecurePoll)

return &logsManager{
return &LogsManager{
httpClient: cli,
stopSignal: nil,
jobs: map[int]logStatus{},
}
}

func (manager *logsManager) start() error {
func (manager *LogsManager) Start() error {
if manager.stopSignal != nil {
return nil
}
Expand Down Expand Up @@ -100,15 +100,15 @@ func (manager *logsManager) start() error {
return nil
}

func (manager *logsManager) stop() {
func (manager *LogsManager) stop() {
if manager.stopSignal != nil {
log.Printf("[DEBUG] [internal,edge,logs] [message: logs manager stopped]")
close(manager.stopSignal)
manager.stopSignal = nil
}
}

func (manager *logsManager) handleReceivedLogsRequests(jobs []int) {
func (manager *LogsManager) HandleReceivedLogsRequests(jobs []int) {
for _, jobID := range jobs {
if _, ok := manager.jobs[jobID]; !ok {
log.Printf("[DEBUG] [internal,edge,logs] [job_identifier: %d] [message: added job to queue]", jobID)
Expand All @@ -117,7 +117,7 @@ func (manager *logsManager) handleReceivedLogsRequests(jobs []int) {
}
}

func (manager *logsManager) next() int {
func (manager *LogsManager) next() int {
for jobID, status := range manager.jobs {
if status == logPending {
return jobID
Expand Down

0 comments on commit be12a1c

Please sign in to comment.