Skip to content

Commit

Permalink
feat: Adding TasksAPI.CreateTaskByFlux
Browse files Browse the repository at this point in the history
  • Loading branch information
vlastahajek committed Apr 29, 2022
1 parent f50acad commit 982fa2d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
## 2.9.0 unreleased
### Features
- [#323](https://github.com/influxdata/influxdb-client-go/pull/323) Added `TasksAPI.CreateTaskByFlux` to allow full control of task script.

## 2.8.2 [2022-04-19]
### Bug fixes
- [#319](https://github.com/influxdata/influxdb-client-go/issues/319) Synchronize `WriteAPIImpl.Close` to prevent panic when closing client by multiple go-routines.
- [#319](https://github.com/influxdata/influxdb-client-go/pull/319) Synchronize `WriteAPIImpl.Close` to prevent panic when closing client by multiple go-routines.

## 2.8.1 [2022-03-21]
### Bug fixes
Expand Down
29 changes: 20 additions & 9 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,18 @@ type TasksAPI interface {
GetTask(ctx context.Context, task *domain.Task) (*domain.Task, error)
// GetTaskByID retrieves a task found using taskID.
GetTaskByID(ctx context.Context, taskID string) (*domain.Task, error)
// CreateTask creates a new task according the the task object.
// CreateTask creates a new task according the task object.
// It copies OrgId, Name, Description, Flux, Status and Every or Cron properties. Every and Cron are mutually exclusive.
// Every has higher priority.
CreateTask(ctx context.Context, task *domain.Task) (*domain.Task, error)
// CreateTaskWithEvery creates a new task with with the name, flux script and every repetition setting, in the org orgID.
// CreateTaskWithEvery creates a new task with the name, flux script and every repetition setting, in the org orgID.
// Every holds duration values.
CreateTaskWithEvery(ctx context.Context, name, flux, every, orgID string) (*domain.Task, error)
// CreateTaskWithCron creates a new task with with the name, flux script and cron repetition setting, in the org orgID
// CreateTaskWithCron creates a new task with the name, flux script and cron repetition setting, in the org orgID
// Cron holds cron-like setting, e.g. once an hour at beginning of the hour "0 * * * *".
CreateTaskWithCron(ctx context.Context, name, flux, cron, orgID string) (*domain.Task, error)
// CreateTaskByFlux creates a new task with complete definition in flux script, in the org orgID
CreateTaskByFlux(ctx context.Context, flux, orgID string) (*domain.Task, error)
// UpdateTask updates a task.
// It copies Description, Flux, Status, Offset and Every or Cron properties. Every and Cron are mutually exclusive.
// Every has higher priority.
Expand Down Expand Up @@ -217,37 +219,46 @@ func (t *tasksAPI) createTask(ctx context.Context, taskReq *domain.TaskCreateReq
return response.JSON201, nil
}

func createTaskReq(name, flux string, every, cron *string, orgID string) *domain.TaskCreateRequest {
func createTaskReqDetailed(name, flux string, every, cron *string, orgID string) *domain.TaskCreateRequest {
repetition := ""
if every != nil {
repetition = fmt.Sprintf("every: %s", *every)
} else if cron != nil {
repetition = fmt.Sprintf(`cron: "%s"`, *cron)
}
fullFlux := fmt.Sprintf(`option task = { name: "%s", %s } %s`, name, repetition, flux)
return createTaskReq(fullFlux, orgID)
}
func createTaskReq(flux string, orgID string) *domain.TaskCreateRequest {

status := domain.TaskStatusTypeActive
taskReq := &domain.TaskCreateRequest{
Flux: fmt.Sprintf(`option task = { name: "%s", %s }
%s`, name, repetition, flux),
Flux: flux,
Status: &status,
OrgID: &orgID,
}
return taskReq
}

func (t *tasksAPI) CreateTask(ctx context.Context, task *domain.Task) (*domain.Task, error) {
taskReq := createTaskReq(task.Name, task.Flux, task.Every, task.Cron, task.OrgID)
taskReq := createTaskReqDetailed(task.Name, task.Flux, task.Every, task.Cron, task.OrgID)
taskReq.Description = task.Description
taskReq.Status = task.Status
return t.createTask(ctx, taskReq)
}

func (t *tasksAPI) CreateTaskWithEvery(ctx context.Context, name, flux, every, orgID string) (*domain.Task, error) {
taskReq := createTaskReq(name, flux, &every, nil, orgID)
taskReq := createTaskReqDetailed(name, flux, &every, nil, orgID)
return t.createTask(ctx, taskReq)
}

func (t *tasksAPI) CreateTaskWithCron(ctx context.Context, name, flux, cron, orgID string) (*domain.Task, error) {
taskReq := createTaskReq(name, flux, nil, &cron, orgID)
taskReq := createTaskReqDetailed(name, flux, nil, &cron, orgID)
return t.createTask(ctx, taskReq)
}

func (t *tasksAPI) CreateTaskByFlux(ctx context.Context, flux, orgID string) (*domain.Task, error) {
taskReq := createTaskReq(flux, orgID)
return t.createTask(ctx, taskReq)
}

Expand Down
28 changes: 27 additions & 1 deletion api/tasks_e2e_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build e2e
// +build e2e

// Copyright 2020-2021 InfluxData, Inc. All rights reserved.
Expand Down Expand Up @@ -112,7 +113,29 @@ func TestTasksAPI_CRUDTask(t *testing.T) {
if assert.NotNil(t, task3.Status) {
assert.Equal(t, taskStatus, *task3.Status, *task3.Status)
}
assert.Equal(t, *org.Id, task3.OrgID, task3.OrgID)

flux := `import "types"
option task = {
name: "task 04",
every: 1h,
}
from(bucket: "my-bucket")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")`
task4, err := tasksAPI.CreateTaskByFlux(ctx, flux, *org.Id)
require.Nil(t, err)
require.NotNil(t, task4)

assert.Equal(t, "task 04", task4.Name, task4.Name)
assert.Nil(t, task4.Description)
if assert.NotNil(t, task4.Every) {
assert.Equal(t, "1h", *task4.Every, *task4.Every)
}
if assert.NotNil(t, task4.Status) {
assert.Equal(t, domain.TaskStatusTypeActive, *task4.Status, *task4.Status)
}
assert.Equal(t, *org.Id, task4.OrgID, task4.OrgID)

err = tasksAPI.DeleteTask(ctx, task1)
assert.Nil(t, err)
Expand All @@ -123,6 +146,9 @@ func TestTasksAPI_CRUDTask(t *testing.T) {
err = tasksAPI.DeleteTask(ctx, task3)
assert.Nil(t, err)

err = tasksAPI.DeleteTask(ctx, task4)
assert.Nil(t, err)

tasks, err = tasksAPI.FindTasks(ctx, nil)
require.Nil(t, err)
assert.Len(t, tasks, 0)
Expand Down

0 comments on commit 982fa2d

Please sign in to comment.