Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Mar 29, 2024
1 parent 3034784 commit 7306330
Show file tree
Hide file tree
Showing 605 changed files with 36,950 additions and 18,228 deletions.
46 changes: 46 additions & 0 deletions src/main/java/org/opensearch/ad/ADEntityProfileRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.ad;

import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.settings.ADNumericSetting;
import org.opensearch.ad.transport.ADEntityProfileAction;
import org.opensearch.client.Client;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.EntityProfileRunner;
import org.opensearch.timeseries.util.SecurityClientUtil;

public class ADEntityProfileRunner extends EntityProfileRunner<ADEntityProfileAction> {

public ADEntityProfileRunner(
Client client,
SecurityClientUtil clientUtil,
NamedXContentRegistry xContentRegistry,
long requiredSamples
) {
super(
client,
clientUtil,
xContentRegistry,
requiredSamples,
AnomalyDetector::parse,
ADNumericSetting.maxCategoricalFields(),
AnalysisType.AD,
ADEntityProfileAction.INSTANCE,
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
AnomalyResult.DETECTOR_ID_FIELD
);
}
}
98 changes: 98 additions & 0 deletions src/main/java/org/opensearch/ad/ADJobProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad;

import java.time.Instant;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.rest.handler.ADIndexJobActionHandler;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.ADProfileAction;
import org.opensearch.ad.transport.AnomalyResultAction;
import org.opensearch.ad.transport.AnomalyResultRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.jobscheduler.spi.LockModel;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.JobProcessor;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.transport.ResultRequest;

public class ADJobProcessor extends
JobProcessor<ADIndex, ADIndexManagement, ADTaskCacheManager, ADTaskType, ADTask, ADTaskManager, AnomalyResult, ADProfileAction, ExecuteADResultResponseRecorder, ADIndexJobActionHandler> {

private static final Logger log = LogManager.getLogger(ADJobProcessor.class);

private static ADJobProcessor INSTANCE;

public static ADJobProcessor getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (ADJobProcessor.class) {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new ADJobProcessor();
return INSTANCE;
}
}

private ADJobProcessor() {
// Singleton class, use getJobRunnerInstance method instead of constructor
super(AnalysisType.AD, TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME, AnomalyResultAction.INSTANCE);
}

public void registerSettings(Settings settings) {
super.registerSettings(settings, AnomalyDetectorSettings.AD_MAX_RETRY_FOR_END_RUN_EXCEPTION);
}

@Override
protected ResultRequest createResultRequest(String configId, long start, long end) {
return new AnomalyResultRequest(configId, start, end);
}

@Override
protected void validateResultIndexAndRunJob(
Job jobParameter,
LockService lockService,
LockModel lock,
Instant executionStartTime,
Instant executionEndTime,
String configId,
String user,
List<String> roles,
ExecuteADResultResponseRecorder recorder,
Config detector
) {
String resultIndex = jobParameter.getCustomResultIndex();
if (resultIndex == null) {
runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector);
return;
}
ActionListener<Boolean> listener = ActionListener.wrap(r -> { log.debug("Custom index is valid"); }, e -> {
Exception exception = new EndRunException(configId, e.getMessage(), false);
handleException(jobParameter, lockService, lock, executionStartTime, executionEndTime, exception, recorder, detector);
});
indexManagement.validateCustomIndexForBackendJob(resultIndex, configId, user, roles, () -> {
listener.onResponse(true);
runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector);
}, listener);
}
}
85 changes: 85 additions & 0 deletions src/main/java/org/opensearch/ad/ADTaskProfileRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad;

import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskProfile;
import org.opensearch.ad.transport.ADTaskProfileAction;
import org.opensearch.ad.transport.ADTaskProfileNodeResponse;
import org.opensearch.ad.transport.ADTaskProfileRequest;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.timeseries.TaskProfileRunner;
import org.opensearch.timeseries.cluster.HashRing;
import org.opensearch.timeseries.model.EntityTaskProfile;

public class ADTaskProfileRunner implements TaskProfileRunner<ADTask, ADTaskProfile> {
public final Logger logger = LogManager.getLogger(ADTaskProfileRunner.class);

private final HashRing hashRing;
private final Client client;

public ADTaskProfileRunner(HashRing hashRing, Client client) {
this.hashRing = hashRing;
this.client = client;
}

@Override
public void getTaskProfile(ADTask configLevelTask, ActionListener<ADTaskProfile> listener) {
String detectorId = configLevelTask.getConfigId();

hashRing.getAllEligibleDataNodesWithKnownVersion(dataNodes -> {
ADTaskProfileRequest adTaskProfileRequest = new ADTaskProfileRequest(detectorId, dataNodes);
client.execute(ADTaskProfileAction.INSTANCE, adTaskProfileRequest, ActionListener.wrap(response -> {
if (response.hasFailures()) {
listener.onFailure(response.failures().get(0));
return;
}

List<EntityTaskProfile> adEntityTaskProfiles = new ArrayList<>();
ADTaskProfile detectorTaskProfile = new ADTaskProfile(configLevelTask);
for (ADTaskProfileNodeResponse node : response.getNodes()) {
ADTaskProfile taskProfile = node.getAdTaskProfile();
if (taskProfile != null) {
if (taskProfile.getNodeId() != null) {
// HC detector: task profile from coordinating node
// Single entity detector: task profile from worker node
detectorTaskProfile.setTaskId(taskProfile.getTaskId());
detectorTaskProfile.setRcfTotalUpdates(taskProfile.getRcfTotalUpdates());
detectorTaskProfile.setThresholdModelTrained(taskProfile.getThresholdModelTrained());
detectorTaskProfile.setThresholdModelTrainingDataSize(taskProfile.getThresholdModelTrainingDataSize());
detectorTaskProfile.setModelSizeInBytes(taskProfile.getModelSizeInBytes());
detectorTaskProfile.setNodeId(taskProfile.getNodeId());
detectorTaskProfile.setTotalEntitiesCount(taskProfile.getTotalEntitiesCount());
detectorTaskProfile.setDetectorTaskSlots(taskProfile.getDetectorTaskSlots());
detectorTaskProfile.setPendingEntitiesCount(taskProfile.getPendingEntitiesCount());
detectorTaskProfile.setRunningEntitiesCount(taskProfile.getRunningEntitiesCount());
detectorTaskProfile.setRunningEntities(taskProfile.getRunningEntities());
detectorTaskProfile.setTaskType(taskProfile.getTaskType());
}
if (taskProfile.getEntityTaskProfiles() != null) {
adEntityTaskProfiles.addAll(taskProfile.getEntityTaskProfiles());
}
}
}
if (adEntityTaskProfiles != null && adEntityTaskProfiles.size() > 0) {
detectorTaskProfile.setEntityTaskProfiles(adEntityTaskProfiles);
}
listener.onResponse(detectorTaskProfile);
}, e -> {
logger.error("Failed to get task profile for task " + configLevelTask.getTaskId(), e);
listener.onFailure(e);
}));
}, listener);

}

}
Loading

0 comments on commit 7306330

Please sign in to comment.