Skip to content

Commit

Permalink
Add elastic documents purge job (#209)
Browse files Browse the repository at this point in the history
* Add ElasticDocPurge job to scheduler with deletion logic

This commit introduces a new `ElasticDocPurge` job type, enabling the purging of Elastic documents based on specific conditions. The corresponding logic for job validation, execution, and fact deletion has been implemented, including support for contextualized query construction and handling shard failures. Dependencies were updated to include the necessary SDK functionalities.

* Add ElasticDocPurge job to scheduler with deletion logic

This commit introduces a new `ElasticDocPurge` job type, enabling the purging of Elastic documents based on specific conditions. The corresponding logic for job validation, execution, and fact deletion has been implemented, including support for contextualized query construction and handling shard failures. Dependencies were updated to include the necessary SDK functionalities.

* Update myrtea-sdk to stable version 5.1.7

* Update internals/scheduler/elastic_doc_purge_job.go

Co-authored-by: Paul <[email protected]>

* Update internals/scheduler/elastic_doc_purge_job.go

Co-authored-by: Paul <[email protected]>

* Update internals/scheduler/elastic_doc_purge_job.go

Co-authored-by: Paul <[email protected]>

* Update internals/scheduler/elastic_doc_purge_job.go

Co-authored-by: Paul <[email protected]>

* Add continue statement to skip non-delete facts

---------

Co-authored-by: Paul <[email protected]>
  • Loading branch information
Ismail731404 and SchawnnDev authored Jan 7, 2025
1 parent a06e638 commit f4cf050
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 8 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/jmoiron/sqlx v1.4.0
github.com/json-iterator/go v1.1.12
github.com/lib/pq v1.10.9
github.com/myrteametrics/myrtea-sdk/v5 v5.1.6
github.com/myrteametrics/myrtea-sdk/v5 v5.1.7
github.com/prataprc/goparsec v0.0.0-20211219142520-daac0e635e7e
github.com/prometheus/client_golang v1.20.2
github.com/robfig/cron/v3 v3.0.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/myrteametrics/myrtea-sdk/v5 v5.1.6 h1:mRJ5ii2DbPUZLMiWZfTE7TSCkKPSh9FYzk4D/obpqpI=
github.com/myrteametrics/myrtea-sdk/v5 v5.1.6/go.mod h1:dXuc7MzW6V7t4kIupE0BFQGe8JzngDEkdbqzAeMy164=
github.com/myrteametrics/myrtea-sdk/v5 v5.1.7 h1:qOQcdWs1bpqZY7+xNgEm8ub4Oc5CawYgQrTGj4u8/Eg=
github.com/myrteametrics/myrtea-sdk/v5 v5.1.7/go.mod h1:dXuc7MzW6V7t4kIupE0BFQGe8JzngDEkdbqzAeMy164=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
Expand Down
59 changes: 59 additions & 0 deletions internals/fact/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package fact
import (
"context"
"errors"
"fmt"
"github.com/elastic/go-elasticsearch/v8/typedapi/core/deletebyquery"
"github.com/elastic/go-elasticsearch/v8/typedapi/types/enums/conflicts"
"github.com/myrteametrics/myrtea-sdk/v5/elasticsearch"
"strings"
"time"
Expand Down Expand Up @@ -62,6 +65,62 @@ func ExecuteFact(
return widgetData, nil
}

func ExecuteFactDeleteQuery(
ti time.Time,
f engine.Fact,
) (*deletebyquery.Response, error) {

parameters := make(map[string]string, 0)
f.ContextualizeDimensions(ti, parameters)
err := f.ContextualizeCondition(ti, parameters)
if err := f.ContextualizeCondition(ti, parameters); err != nil {
return nil, fmt.Errorf("failed to contextualize condition: %w", err)
}

searchRequest, err := elasticsearch.ConvertFactToSearchRequestV8(f, ti, parameters)
if err != nil {
zap.L().Error("ConvertFactToSearchRequestV8 failed", zap.Error(err))
return nil, fmt.Errorf("failed to convert fact to search request: %w", err)
}

indices := FindIndices(f, ti, false)
if len(indices) == 0 {
return nil, errors.New("no indices found for the fact")
}

zap.L().Debug("Preparing to execute DeleteByQuery",
zap.Strings("indices", indices),
zap.Any("request", searchRequest))

response, err := elasticsearch.C().DeleteByQuery(strings.Join(indices, ",")).
Query(searchRequest.Query).
Conflicts(conflicts.Proceed). // Ignore les conflits (insertion/suppression)
Do(context.Background())

if err != nil {
zap.L().Error("ES DeleteByQuery execution failed", zap.Error(err))
return nil, fmt.Errorf("failed to execute DeleteByQuery: %w", err)
}

// Check for failures in the response
if len(response.Failures) > 0 {
zap.L().Warn("DeleteByQuery encountered shard failures", zap.Any("failures", response.Failures))
return nil, errors.New("one or more shards failed during DeleteByQuery")
}

// Check if the request timed out
if response.TimedOut != nil && *response.TimedOut {
zap.L().Warn("DeleteByQuery timed out")
return nil, errors.New("delete by query operation timed out")
}

zap.L().Info("DeleteByQuery completed successfully",
zap.Int64p("deleted", response.Deleted),
zap.Int64p("version_conflicts", response.VersionConflicts))

return response, nil
}

func FindIndices(f engine.Fact, ti time.Time, update bool) []string {

var indices []string
Expand Down
81 changes: 81 additions & 0 deletions internals/scheduler/elastic_doc_purge_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package scheduler

import (
"errors"
"github.com/myrteametrics/myrtea-engine-api/v5/internals/fact"
"github.com/myrteametrics/myrtea-sdk/v5/engine"
"go.uber.org/zap"
"time"
)

type ElasticDocPurgeJob struct {
FactIds []int64 `json:"factIds"`
ScheduleID int64 `json:"-"`
}

func (job ElasticDocPurgeJob) IsValid() (bool, error) {
if job.FactIds == nil || len(job.FactIds) <= 0 {
return false, errors.New("missing FactIds")
}
return true, nil
}

func (job ElasticDocPurgeJob) Run() {

if S().ExistingRunningJob(job.ScheduleID) {
zap.L().Info("Skipping Elastic document purge job because last execution is still running", zap.Int64s("ids", job.FactIds))
return
}
S().AddRunningJob(job.ScheduleID)

zap.L().Info("Delete Elastic document job started", zap.Int64s("ids", job.FactIds))

t := time.Now().Truncate(1 * time.Second).UTC()

PurgeElasticDocs(t, job.FactIds)

zap.L().Info("Elastic document purge job ended", zap.Int64("id Schedule", job.ScheduleID))
S().RemoveRunningJob(job.ScheduleID)

}

func PurgeElasticDocs(t time.Time, factIds []int64) {
zap.L().Info("Starting Elastic document purge", zap.Time("timestamp", t), zap.Int("number_of_facts", len(factIds)))

for _, factId := range factIds {
zap.L().Debug("Processing fact", zap.Int64("factId", factId))

// Retrieve the Fact
f, found, err := fact.R().Get(factId)
if err != nil {
zap.L().Error("Error retrieving the fact; skipping deletion",
zap.Int64("factId", factId),
zap.Error(err))
continue
}

if !found {
zap.L().Warn("Fact does not exist; skipping deletion", zap.Int64("factId", factId))
continue
}

if f.Intent.Operator != engine.Delete {
zap.L().Warn("Fact is not a delete fact; skipping deletion", zap.Int64("factId", factId))
continue
}

// Execute deletion
_, err = fact.ExecuteFactDeleteQuery(t, f)
if err != nil {
zap.L().Error("Error during fact deletion",
zap.Int64("factId", f.ID),
zap.Any("fact", f),
zap.Error(err))
continue
}

zap.L().Info("Fact successfully deleted from Elastic", zap.Int64("factId", f.ID))
}

zap.L().Info("Elastic document purge completed", zap.Time("timestamp", t), zap.Int("processed_facts", len(factIds)))
}
16 changes: 11 additions & 5 deletions internals/scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@ type InternalJob interface {
}

var jobTypes = map[string]struct{}{
"fact": {},
"baseline": {},
"compact": {},
"purge": {},
"fact": {},
"baseline": {},
"compact": {},
"purge": {},
"elastic_doc_purge": {},
}

// InternalSchedule wrap a schedule
type InternalSchedule struct {
ID int64 `json:"id"`
Name string `json:"name"`
CronExpr string `json:"cronexpr" example:"0 */15 * * *"`
JobType string `json:"jobtype" enums:"fact,baseline,compact,purge"`
JobType string `json:"jobtype" enums:"fact,baseline,compact,purge,elastic_doc_purge"`
Job InternalJob `json:"job"`
Enabled bool `json:"enabled"`
}
Expand Down Expand Up @@ -103,6 +104,11 @@ func UnmarshalInternalJob(t string, b json.RawMessage, scheduleID int64) (Intern
err = json.Unmarshal(b, &tJob)
tJob.ScheduleID = scheduleID
job = tJob
case "elastic_doc_purge":
var tJob ElasticDocPurgeJob
err = json.Unmarshal(b, &tJob)
tJob.ScheduleID = scheduleID
job = tJob

default:
zap.L().Error("unknown internal job type", zap.String("type", t))
Expand Down

0 comments on commit f4cf050

Please sign in to comment.