Skip to content

Commit

Permalink
Refactor: Reorganize code for Forecasting and AnomalyDetector packages
Browse files Browse the repository at this point in the history
This PR includes several refactorings to increase code reuse and improve readability:

1. Entity, FeatureData, and RestHandlerUtils have been moved from the 'ad' package to the 'timeseries' package, enabling their use within the Forecasting code.
2. A new class, DataByFeatureId, has been created to hold shared code previously located in FeatureData. FeatureData now inherits from DataByFeatureId. The standalone usage of DataByFeatureId will be detailed in subsequent PRs.
3. Renamed checkAnomalyDetectorFeaturesSyntax to checkFeaturesSyntax in RestHandlerUtils to make it more generic and usable for Forecasting.
4. Constants AGG_NAME_MAX_TIME, AGG_NAME_MIN_TIME, DATE_HISTOGRAM, and FEATURE_AGGS have been moved from ADCommonName to CommonName to make them accessible for Forecasting.
5. A new method, parseConfig, has been added to the Config class. This method can parse configuration for either AnomalyDetector or Forecaster based on input.

Testing:
The changes have been validated with a successful gradle build.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Jun 12, 2023
1 parent 02c120f commit 3843f4d
Show file tree
Hide file tree
Showing 114 changed files with 409 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.ad.AnomalyDetectorPlugin.AD_THREAD_POOL_NAME;
import static org.opensearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE;

import java.io.IOException;
import java.time.Instant;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import org.opensearch.ad.ml.ThresholdingResult;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.EntityAnomalyResult;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.Feature;
import org.opensearch.timeseries.model.FeatureData;

/**
* Runner to trigger an anomaly detector.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.EntityProfile;
import org.opensearch.ad.model.EntityProfileName;
import org.opensearch.ad.model.EntityState;
Expand All @@ -56,6 +55,7 @@
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.util.ParseUtils;

Expand Down Expand Up @@ -460,7 +460,7 @@ private SearchRequest createLastSampleTimeRequest(String detectorId, long enable

SearchSourceBuilder source = new SearchSourceBuilder()
.query(boolQueryBuilder)
.aggregation(AggregationBuilders.max(ADCommonName.AGG_NAME_MAX_TIME).field(CommonName.EXECUTION_END_TIME_FIELD))
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(CommonName.EXECUTION_END_TIME_FIELD))
.trackTotalHits(false)
.size(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultResponse;
Expand All @@ -49,6 +48,7 @@
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;

public class ExecuteADResultResponseRecorder {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/caching/EntityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.opensearch.ad.ml.EntityModel;
import org.opensearch.ad.ml.ModelState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.ModelProfile;
import org.opensearch.timeseries.model.Entity;

public interface EntityCache extends MaintenanceState, CleanState, DetectorModelSize {
/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/caching/PriorityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.opensearch.ad.ml.ModelManager.ModelType;
import org.opensearch.ad.ml.ModelState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.ModelProfile;
import org.opensearch.ad.ratelimit.CheckpointMaintainWorker;
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
Expand All @@ -62,6 +61,7 @@
import org.opensearch.timeseries.common.exception.LimitExceededException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.settings.TimeSeriesSettings;

import com.google.common.cache.Cache;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import static org.opensearch.ad.model.ADTask.TASK_TYPE_FIELD;
import static org.opensearch.ad.model.ADTaskType.taskTypeToString;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_DETECTOR_UPPER_LIMIT;
import static org.opensearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
import static org.opensearch.ad.util.RestHandlerUtils.createXContentParserFromRegistry;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

import java.io.IOException;
import java.time.Instant;
Expand Down
12 changes: 0 additions & 12 deletions src/main/java/org/opensearch/ad/constant/ADCommonName.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,6 @@ public class ADCommonName {
public static final String AD_TASK_REMOTE = "ad_task_remote";
public static final String CANCEL_TASK = "cancel_task";

// ======================================
// Query
// ======================================
// Used in finding the max timestamp
public static final String AGG_NAME_MAX_TIME = "max_timefield";
// Used in finding the min timestamp
public static final String AGG_NAME_MIN_TIME = "min_timefield";
// date histogram aggregation name
public static final String DATE_HISTOGRAM = "date_histogram";
// feature aggregation name
public static final String FEATURE_AGGS = "feature_aggs";

// ======================================
// Used in stats API
// ======================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
Expand All @@ -47,6 +46,7 @@
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.Feature;
import org.opensearch.timeseries.util.ParseUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.ad.CleanState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.dataprocessor.Imputer;
import org.opensearch.timeseries.model.Entity;

/**
* A facade managing feature data operations and buffers.
Expand Down
9 changes: 4 additions & 5 deletions src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
package org.opensearch.ad.feature;

import static org.apache.commons.math3.linear.MatrixUtils.createRealMatrix;
import static org.opensearch.ad.constant.ADCommonName.DATE_HISTOGRAM;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_ENTITIES_FOR_PREVIEW;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.PAGE_SIZE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.PREVIEW_TIMEOUT_IN_MILLIS;
Expand Down Expand Up @@ -40,9 +39,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -69,7 +66,9 @@
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.dataprocessor.Imputer;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.util.ParseUtils;

Expand Down Expand Up @@ -169,7 +168,7 @@ public SearchFeatureDao(
*/
public void getLatestDataTime(AnomalyDetector detector, ActionListener<Optional<Long>> listener) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.aggregation(AggregationBuilders.max(ADCommonName.AGG_NAME_MAX_TIME).field(detector.getTimeField()))
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(detector.getTimeField()))
.size(0);
SearchRequest searchRequest = new SearchRequest().indices(detector.getIndices().toArray(new String[0])).source(searchSourceBuilder);
final ActionListener<SearchResponse> searchResponseListener = ActionListener
Expand Down Expand Up @@ -569,7 +568,7 @@ private Map<Long, Optional<double[]>> parseBucketAggregationResponse(SearchRespo
List<InternalComposite.InternalBucket> buckets = ((InternalComposite) agg).getBuckets();
buckets.forEach(bucket -> {
Optional<double[]> featureData = parseAggregations(Optional.ofNullable(bucket.getAggregations()), featureIds);
dataPoints.put((Long) bucket.getKey().get(DATE_HISTOGRAM), featureData);
dataPoints.put((Long) bucket.getKey().get(CommonName.DATE_HISTOGRAM), featureData);
});
}
return dataPoints;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/ml/CheckpointDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.client.Client;
import org.opensearch.index.IndexNotFoundException;
Expand All @@ -69,6 +68,7 @@
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;

import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.config.Precision;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/ml/EntityColdStarter.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.feature.SearchFeatureDao;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.ratelimit.RequestPriority;
import org.opensearch.ad.settings.ADEnabledSetting;
Expand All @@ -55,6 +54,7 @@
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.dataprocessor.Imputer;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.settings.TimeSeriesSettings;

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/ml/EntityModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.Optional;
import java.util.Queue;

import org.opensearch.ad.model.Entity;
import org.opensearch.timeseries.model.Entity;

import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/ml/ModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.DateUtils;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.settings.TimeSeriesSettings;

import com.amazon.randomcutforest.RandomCutForest;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/ml/ThresholdingResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.apache.commons.lang.builder.ToStringBuilder;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.FeatureData;

/**
* Data object containing thresholding results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.timeseries.model.Entity;

/**
* HC detector's entity task profile.
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/opensearch/ad/model/ADTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.timeseries.annotation.Generated;
import org.opensearch.timeseries.model.DateRange;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.util.ParseUtils;

import com.google.common.base.Objects;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/opensearch/ad/model/AnomalyResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.timeseries.annotation.Generated;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.util.ParseUtils;

import com.google.common.base.Objects;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/opensearch/ad/model/ModelProfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;

/**
* Used to show model information in profile API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.ad.ml.ThresholdingResult;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.stats.ADStats;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -56,6 +55,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.timeseries.util.ParseUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.opensearch.ad.ratelimit;

import org.opensearch.ad.model.Entity;
import org.opensearch.timeseries.model.Entity;

public class EntityFeatureRequest extends EntityRequest {
private final double[] currentFeature;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import java.util.Optional;

import org.opensearch.ad.model.Entity;
import org.opensearch.timeseries.model.Entity;

public class EntityRequest extends QueuedRequest {
private final Entity entity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

package org.opensearch.ad.rest;

import static org.opensearch.ad.util.RestHandlerUtils.getSourceContext;
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
import static org.opensearch.timeseries.util.RestHandlerUtils.getSourceContext;

import java.io.IOException;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
package org.opensearch.ad.rest;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;
import static org.opensearch.ad.util.RestHandlerUtils.DETECTOR_ID;
import static org.opensearch.ad.util.RestHandlerUtils.IF_PRIMARY_TERM;
import static org.opensearch.ad.util.RestHandlerUtils.IF_SEQ_NO;
import static org.opensearch.ad.util.RestHandlerUtils.START_JOB;
import static org.opensearch.ad.util.RestHandlerUtils.STOP_JOB;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.DETECTOR_ID;
import static org.opensearch.timeseries.util.RestHandlerUtils.IF_PRIMARY_TERM;
import static org.opensearch.timeseries.util.RestHandlerUtils.IF_SEQ_NO;
import static org.opensearch.timeseries.util.RestHandlerUtils.START_JOB;
import static org.opensearch.timeseries.util.RestHandlerUtils.STOP_JOB;

import java.io.IOException;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.opensearch.ad.rest;

import static org.opensearch.ad.util.RestHandlerUtils.DETECTOR_ID;
import static org.opensearch.timeseries.util.RestHandlerUtils.DETECTOR_ID;

import java.io.IOException;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
package org.opensearch.ad.rest;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;
import static org.opensearch.ad.util.RestHandlerUtils.DETECTOR_ID;
import static org.opensearch.ad.util.RestHandlerUtils.RUN;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.DETECTOR_ID;
import static org.opensearch.timeseries.util.RestHandlerUtils.RUN;

import java.io.IOException;
import java.util.List;
Expand Down
Loading

0 comments on commit 3843f4d

Please sign in to comment.