Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
khanzadimahdi committed Jan 18, 2025
1 parent 6bb0156 commit d24f21f
Show file tree
Hide file tree
Showing 8 changed files with 458 additions and 25 deletions.
48 changes: 48 additions & 0 deletions domain/runner/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package runner

import (
"context"
)

type TasksDefinition struct {
Tasks []Task
}

type Task struct {
Name string
Image string
Command []string
Cleanup bool
Resources Resources
WaitToComplete bool
}

type Resources struct {
Limits ResourceLimits
Reservations ResourceReservations
}

type ResourceLimits struct {
CPUs string
Memory string
}

type ResourceReservations struct {
CPUs string
Memory string
}

type Runner interface {
Run(ctx context.Context, isDone chan<- bool)
}

type ContainerManager interface {
CreateContainer(ctx context.Context, task Task) (string, error)
StartContainer(ctx context.Context, id string) error
WaitForContainer(ctx context.Context, id string) (bool, error)
RemoveContainer(ctx context.Context, id string) error
}

type ImageManager interface {
PullImage(ctx context.Context, image string) error
}
45 changes: 37 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,70 @@ module github.com/khanzadimahdi/testproject
go 1.23

require (
github.com/docker/docker v27.5.0+incompatible
github.com/gofrs/uuid/v5 v5.2.0
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/google/go-cmp v0.5.9
github.com/google/go-cmp v0.6.0
github.com/minio/minio-go/v7 v7.0.70
github.com/nats-io/nats.go v1.37.0
github.com/pkg/errors v0.9.1
github.com/sethvargo/go-limiter v1.0.0
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
go.mongodb.org/mongo-driver v1.15.0
golang.org/x/crypto v0.28.0
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b
golang.org/x/crypto v0.31.0
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
)

require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/xid v1.5.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect
go.opentelemetry.io/otel v1.33.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.33.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.33.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.6.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.67.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.3.0 // indirect
)
137 changes: 120 additions & 17 deletions go.sum

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions infrastructure/docker/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package docker

import "github.com/docker/docker/client"

func NewClient() (*client.Client, error) {
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return nil, err
}

return cli, nil
}
71 changes: 71 additions & 0 deletions infrastructure/docker/container/container.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package container

import (
"context"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/khanzadimahdi/testproject/domain/runner"
)

type manager struct {
cli *client.Client
}

var _ runner.ContainerManager = &manager{}

func NewManager(cli *client.Client) *manager {
return &manager{
cli: cli,
}
}

// CreateContainer creates a new container and returns it ID.
func (m *manager) CreateContainer(ctx context.Context, task runner.Task) (string, error) {
config := &container.Config{
Image: task.Image,
Cmd: task.Command,
}

res, err := m.cli.ContainerCreate(ctx, config, &container.HostConfig{}, nil, nil, task.Name)
if err != nil {
return "", err
}

return res.ID, nil
}

// StartContainer starts the container created with given ID.
func (m *manager) StartContainer(ctx context.Context, id string) error {
return m.cli.ContainerStart(ctx, id, container.StartOptions{})
}

// WaitForContainer waits for the running container to finish.
func (m *manager) WaitForContainer(ctx context.Context, id string) (bool, error) {
// check if the container is in running state
if _, err := m.cli.ContainerInspect(ctx, id); err != nil {
return true, nil
}

// send API call to wait for the container completion
wait, errC := m.cli.ContainerWait(ctx, id, container.WaitConditionNotRunning)

// check if container exit code is 0, and return accordingly
select {
case status := <-wait:
if status.StatusCode == 0 {
return true, nil
}

return false, nil
case err := <-errC:
return false, err
case <-ctx.Done():
return false, ctx.Err()
}
}

// RemoveContainer removes the given container id
func (m *manager) RemoveContainer(ctx context.Context, id string) error {
return m.cli.ContainerRemove(ctx, id, container.RemoveOptions{})
}
70 changes: 70 additions & 0 deletions infrastructure/docker/image/image.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package image

import (
"context"
"encoding/json"
"fmt"
"io"

"github.com/docker/docker/api/types/image"
"github.com/docker/docker/client"
"github.com/khanzadimahdi/testproject/domain/runner"
"github.com/pkg/errors"
)

type ImagePullStatus struct {
Status string `json:"status"`
Error string `json:"error"`
Progress string `json:"progress"`
ProgressDetail struct {
Current int `json:"current"`
Total int `json:"total"`
} `json:"progressDetail"`
}

type manager struct {
cli *client.Client
}

var _ runner.ImageManager = &manager{}

func NewManager(cli *client.Client) *manager {
return &manager{
cli: cli,
}
}

// PullImage outputs to stdout the contents of the runner image.
func (m *manager) PullImage(ctx context.Context, imageName string) error {
out, err := m.cli.ImagePull(ctx, imageName, image.PullOptions{})
if err != nil {
return errors.New("DOCKER PULL")
}

defer func() {
if err := out.Close(); err != nil {
fmt.Println(err)
}
}()

fd := json.NewDecoder(out)
var status *ImagePullStatus
for {
if err := fd.Decode(&status); err != nil {
if errors.Is(err, io.EOF) {
break
}

return errors.Wrap(err, "DOCKER PULL")
}

if status.Error != "" {
return errors.Wrap(errors.New(status.Error), "DOCKER PULL")
}

// uncomment to log image pull status
// fmt.Println(status)
}

return nil
}
29 changes: 29 additions & 0 deletions infrastructure/runner/models/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package models

type TasksDefinition struct {
Tasks []Task `json:"tasks,omitempty"`
}

type Task struct {
Name string `json:"name,omitempty"`
Image string `json:"runner,omitempty"`
Command []string `json:"command,omitempty"`
Cleanup bool `json:"cleanup,omitempty"`
Resources Resources `json:"resources,omitempty"`
WaitToComplete bool `json:"wait_to_complete,omitempty"`
}

type Resources struct {
Limits ResourceLimits `json:"limits,omitempty"`
Reservations ResourceReservations `json:"reservations,omitempty"`
}

type ResourceLimits struct {
CPUs string `json:"cpus,omitempty"`
Memory string `json:"memory,omitempty"`
}

type ResourceReservations struct {
CPUs string `json:"cpus,omitempty"`
Memory string `json:"memory,omitempty"`
}
71 changes: 71 additions & 0 deletions infrastructure/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package runner

import (
"context"
"fmt"

runnerDomain "github.com/khanzadimahdi/testproject/domain/runner"
)

type runner struct {
containerManager runnerDomain.ContainerManager
imageManager runnerDomain.ImageManager
}

var _ runnerDomain.Runner = &runner{}

Check failure on line 15 in infrastructure/runner/runner.go

View workflow job for this annotation

GitHub Actions / ci

cannot use &runner{} (value of type *runner) as "github.com/khanzadimahdi/testproject/domain/runner".Runner value in variable declaration: *runner does not implement "github.com/khanzadimahdi/testproject/domain/runner".Runner (wrong type for method Run)

Check failure on line 15 in infrastructure/runner/runner.go

View workflow job for this annotation

GitHub Actions / ci

cannot use &runner{} (value of type *runner) as "github.com/khanzadimahdi/testproject/domain/runner".Runner value in variable declaration: *runner does not implement "github.com/khanzadimahdi/testproject/domain/runner".Runner (wrong type for method Run)

func NewRunner(
containerManager runnerDomain.ContainerManager,
imageManager runnerDomain.ImageManager,
) *runner {
return &runner{
containerManager: containerManager,
imageManager: imageManager,
}
}

func (r *runner) Run(ctx context.Context, tasksDefinition runnerDomain.TasksDefinition) error {
for _, task := range tasksDefinition.Tasks {
if err != r.run(ctx, task); err != nil {

Check failure on line 29 in infrastructure/runner/runner.go

View workflow job for this annotation

GitHub Actions / ci

undefined: err

Check failure on line 29 in infrastructure/runner/runner.go

View workflow job for this annotation

GitHub Actions / ci

undefined: err
return err

Check failure on line 30 in infrastructure/runner/runner.go

View workflow job for this annotation

GitHub Actions / ci

undefined: err

Check failure on line 30 in infrastructure/runner/runner.go

View workflow job for this annotation

GitHub Actions / ci

undefined: err
}
}
}

Check failure on line 33 in infrastructure/runner/runner.go

View workflow job for this annotation

GitHub Actions / ci

missing return

Check failure on line 33 in infrastructure/runner/runner.go

View workflow job for this annotation

GitHub Actions / ci

missing return

func (r *runner) run(ctx context.Context, task runnerDomain.Task) error {
fmt.Println("preparing task - ", task.Name)
if err := r.imageManager.PullImage(ctx, task.Image); err != nil {
return err
}

id, err := r.containerManager.CreateContainer(ctx, task)
if err != nil {
return err
}

fmt.Println("starting task - ", task.Name)
err = r.containerManager.StartContainer(ctx, id)
if err != nil {
return err
}

statusSuccess, err := r.containerManager.WaitForContainer(ctx, id)
if err != nil {
return err
}

if statusSuccess {
fmt.Println("completed task - ", task.Name)

// cleanup by removing the task container
if task.Cleanup {
fmt.Println("cleanup task - ", task.Name)
err = r.containerManager.RemoveContainer(ctx, id)
if err != nil {
fmt.Println(err)
}
}
} else {
fmt.Println("failed task - ", task.Name)
}
}

Check failure on line 71 in infrastructure/runner/runner.go

View workflow job for this annotation

GitHub Actions / ci

missing return

Check failure on line 71 in infrastructure/runner/runner.go

View workflow job for this annotation

GitHub Actions / ci

missing return

0 comments on commit d24f21f

Please sign in to comment.