Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to using a single worker #497

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ env_variables:
APP_ID: 119816
KEY_SECRET: "gcpsecretmanager://projects/allstar-ossf/secrets/allstar-private-key?decoder=bytes"
DO_NOTHING_ON_OPT_OUT: true
ALLSTAR_NUM_WORKERS: 1
1 change: 1 addition & 0 deletions app-staging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ resources:
env_variables:
APP_ID: 166485
KEY_SECRET: "gcpsecretmanager://projects/allstar-ossf/secrets/allstar-staging-private-key?decoder=bytes"
ALLSTAR_NUM_WORKERS: 1
6 changes: 2 additions & 4 deletions cmd/allstar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ func main() {
specificPolicyArg := flag.String("policy", "", fmt.Sprintf("Run a specific policy check. Supported policies: %s", supportedPoliciesMsg))
specificRepoArg := flag.String("repo", "", "Run on a specific \"owner/repo\". For example \"ossf/allstar\"")

numWorkersArg := flag.Int("workers", 5, "maximum number of active goroutines for Allstar scans")

flag.Parse()

if *specificPolicyArg != "" {
Expand All @@ -83,7 +81,7 @@ func main() {
}

if runOnce {
_, err := enforce.EnforceAll(ctx, ghc, *specificPolicyArg, *specificRepoArg, *numWorkersArg)
_, err := enforce.EnforceAll(ctx, ghc, *specificPolicyArg, *specificRepoArg)
if err != nil {
log.Fatal().
Err(err).
Expand All @@ -96,7 +94,7 @@ func main() {
go func() {
defer wg.Done()
log.Info().
Err(enforce.EnforceJob(ctx, ghc, (5 * time.Minute), *specificPolicyArg, *specificRepoArg, *numWorkersArg)).
Err(enforce.EnforceJob(ctx, ghc, (5 * time.Minute), *specificPolicyArg, *specificRepoArg)).
Msg("Enforce job shutting down.")
}()
sigs := make(chan os.Signal, 1)
Expand Down
14 changes: 14 additions & 0 deletions pkg/config/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ const setNoticePingDurationHrs = (24 * time.Hour)

var NoticePingDuration time.Duration

// NumWorkers is the number of concurrent orginazations/installations the
// Allstar binary will scan concurrently.
const setNumWorkers = 5

var NumWorkers int

var osGetenv func(string) string

func init() {
Expand Down Expand Up @@ -147,4 +153,12 @@ func setVars() {

allowedOrgs := osGetenv("GITHUB_ALLOWED_ORGS")
AllowedOrganizations = strings.Split(allowedOrgs, ",")

nws := osGetenv("ALLSTAR_NUM_WORKERS")
nw, err := strconv.Atoi(nws)
if err == nil {
NumWorkers = nw
} else {
NumWorkers = setNumWorkers
}
}
11 changes: 7 additions & 4 deletions pkg/enforce/enforce.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func init() {
//
// TBD: determine if this should remain exported, or if it will only be called
// from EnforceJob.
func EnforceAll(ctx context.Context, ghc ghclients.GhClientsInterface, specificPolicyArg string, specificRepoArg string, numWorkersArg int) (EnforceAllResults, error) {
func EnforceAll(ctx context.Context, ghc ghclients.GhClientsInterface, specificPolicyArg string, specificRepoArg string) (EnforceAllResults, error) {
var repoCount int
var enforceAllResults = make(EnforceAllResults)
ac, err := ghc.Get(0)
Expand All @@ -85,10 +85,13 @@ func EnforceAll(ctx context.Context, ghc ghclients.GhClientsInterface, specificP
Msg("Enforcing policies on installations.")

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(numWorkersArg)
g.SetLimit(operator.NumWorkers)
var mu sync.Mutex

for _, i := range insts {
if ctx.Err() != nil {
break
}
if i.SuspendedAt != nil {
log.Info().
Str("area", "bot").
Expand Down Expand Up @@ -302,9 +305,9 @@ func getAppInstallationReposReal(ctx context.Context, ic *github.Client) ([]*git

// EnforceJob is a reconciliation job that enforces policies on all repos every
// d duration. It runs forever until the context is done.
func EnforceJob(ctx context.Context, ghc *ghclients.GHClients, d time.Duration, specificPolicyArg string, specificRepoArg string, numWorkersArg int) error {
func EnforceJob(ctx context.Context, ghc *ghclients.GHClients, d time.Duration, specificPolicyArg string, specificRepoArg string) error {
for {
_, err := EnforceAll(ctx, ghc, specificPolicyArg, specificRepoArg, numWorkersArg)
_, err := EnforceAll(ctx, ghc, specificPolicyArg, specificRepoArg)
if err != nil {
log.Error().
Err(err).
Expand Down
8 changes: 3 additions & 5 deletions pkg/enforce/enforce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,7 @@ func TestEnforceAll(t *testing.T) {
policy1Results = test.Policy1Results
policy2Results = test.Policy2Results

numWorkers := 1
enforceAllResults, err := EnforceAll(context.Background(), mockGhc, "", "", numWorkers)
enforceAllResults, err := EnforceAll(context.Background(), mockGhc, "", "")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -582,16 +581,15 @@ func TestSuspendedEnforce(t *testing.T) {
}
suspended = false
gaicalled = false
numWorkers := 1
if _, err := EnforceAll(context.Background(), &MockGhClients{}, "", "", numWorkers); err != nil {
if _, err := EnforceAll(context.Background(), &MockGhClients{}, "", ""); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !gaicalled {
t.Errorf("Expected getAppInstallationRepos() to be called, but wasn't")
}
suspended = true
gaicalled = false
if _, err := EnforceAll(context.Background(), &MockGhClients{}, "", "", numWorkers); err != nil {
if _, err := EnforceAll(context.Background(), &MockGhClients{}, "", ""); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if gaicalled {
Expand Down
Loading