Skip to content

Commit

Permalink
address Amit and Owais's comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Jun 13, 2023
1 parent 3843f4d commit 9816d74
Show file tree
Hide file tree
Showing 22 changed files with 173 additions and 96 deletions.
6 changes: 3 additions & 3 deletions src/main/java/org/opensearch/ad/caching/PriorityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private void addIntoInactiveCache(ModelState<EntityModel> removed) {
private void addEntity(List<Entity> destination, Entity entity, String detectorId) {
// It's possible our doorkeeper prevented the entity from entering inactive entities cache
if (entity != null) {
Optional<String> modelId = entity.getModelId(detectorId);
Optional<String> modelId = entity.getDetectorModelId(detectorId);
if (modelId.isPresent() && inActiveEntities.getIfPresent(modelId.get()) != null) {
destination.add(entity);
}
Expand Down Expand Up @@ -377,7 +377,7 @@ public Pair<List<Entity>, List<Entity>> selectUpdateCandidate(
// thread safe as each detector has one thread at one time and only the
// thread can access its buffer.
Entity entity = cacheMissEntitiesIter.next();
Optional<String> modelId = entity.getModelId(detectorId);
Optional<String> modelId = entity.getDetectorModelId(detectorId);

if (false == modelId.isPresent()) {
continue;
Expand Down Expand Up @@ -412,7 +412,7 @@ public Pair<List<Entity>, List<Entity>> selectUpdateCandidate(
// If two threads try to remove the same entity and add their own state, the 2nd remove
// returns null and only the first one succeeds.
Entity entity = cacheMissEntitiesIter.next();
Optional<String> modelId = entity.getModelId(detectorId);
Optional<String> modelId = entity.getDetectorModelId(detectorId);

if (false == modelId.isPresent()) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class ADCommonMessages {
public static String HISTORICAL_ANALYSIS_CANCELLED = "Historical analysis cancelled by user";
public static String HC_DETECTOR_TASK_IS_UPDATING = "HC detector task is updating";
public static String INVALID_TIME_CONFIGURATION_UNITS = "Time unit %s is not supported";
public static String DUPLICATE_FEATURE_AGGREGATION_NAMES = "Detector has duplicate feature aggregation query names: ";
public static String FAIL_TO_GET_DETECTOR = "Fail to get detector";
public static String FAIL_TO_GET_DETECTOR_INFO = "Fail to get detector info";
public static String FAIL_TO_CREATE_DETECTOR = "Fail to create detector";
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/model/ADTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ public Entity getEntity() {
}

public String getEntityModelId() {
return entity == null ? null : entity.getModelId(getId()).orElse(null);
return entity == null ? null : entity.getDetectorModelId(getId()).orElse(null);
}

public String getParentTaskId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ public Entity getEntity() {
}

public Optional<String> getModelId() {
return entity.getModelId(detectorId);
return entity.getDetectorModelId(detectorId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected void doExecute(Task task, EntityProfileRequest request, ActionListener

String adID = request.getAdID();
Entity entityValue = request.getEntityValue();
Optional<String> modelIdOptional = entityValue.getModelId(adID);
Optional<String> modelIdOptional = entityValue.getDetectorModelId(adID);
if (false == modelIdOptional.isPresent()) {
listener.onFailure(new TimeSeriesException(adID, NO_MODEL_ID_FOUND_MSG));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private ActionListener<Optional<AnomalyDetector>> onGetDetector(
.createSingleAttributeEntity(detector.getCategoryFields().get(0), attrValues.get(ADCommonName.EMPTY_FIELD));
}

Optional<String> modelIdOptional = categoricalValues.getModelId(detectorId);
Optional<String> modelIdOptional = categoricalValues.getDetectorModelId(detectorId);
if (false == modelIdOptional.isPresent()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static String getTooManyCategoricalFieldErr(int limit) {
public static final String FEATURE_WITH_INVALID_QUERY_MSG = FEATURE_INVALID_MSG_PREFIX + " causing a runtime exception: ";
public static final String UNKNOWN_SEARCH_QUERY_EXCEPTION_MSG =
"Feature has an unknown exception caught while executing the feature query: ";
public static String DUPLICATE_FEATURE_AGGREGATION_NAMES = "Config has duplicate feature aggregation query names: ";

// ======================================
// Index message
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/timeseries/model/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ public static Config parseConfig(Class<? extends Config> configClass, XContentPa
} else if (configClass == Forecaster.class) {
return Forecaster.parse(parser);
} else {
throw new IllegalArgumentException("Unsupported config type");
throw new IllegalArgumentException("Unsupported config type. Supported config types are [AnomalyDetector, Forecaster]");
}
}
}
32 changes: 20 additions & 12 deletions src/main/java/org/opensearch/timeseries/model/Entity.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private static String normalizedAttributes(SortedMap<String, String> attributes)
}

/**
* Create model Id out of detector Id and attribute name and value pairs
* Create model Id out of config Id and attribute name and value pairs
*
* HCAD v1 uses the categorical value as part of the model document Id, but
* OpenSearch’s document Id can be at most 512 bytes. Categorical values are
Expand All @@ -240,31 +240,33 @@ private static String normalizedAttributes(SortedMap<String, String> attributes)
* that git uses) and use the hash as part of the model document Id.
*
* We have choices to make regarding when to use the hash as part of a model
* document Id: for all HC detectors or a HC detector with multiple categorical
* document Id: for all HC detectors or an HC detector with multiple categorical
* fields. The challenge lies in providing backward compatibility of looking for
* a model checkpoint in the case of a HC detector with one categorical field.
* a model checkpoint in the case of an HC detector with one categorical field.
* If using hashes for all HC detectors, we need two get requests to ensure that
* a model checkpoint exists. One uses the document Id without a hash, while one
* uses the document Id with a hash. The dual get requests are ineffective. If
* limiting hashes to an HC detector with multiple categorical fields, there is
* no backward compatibility issue. However, the code will be branchy. One may
* wonder if backward compatibility can be ignored; indeed, the old checkpoints
* will be gone after a transition period during upgrading. During the transition
* period, HC detectors can experience unnecessary cold starts as if the
* period, HC detectors can experience unnecessary cold starts as if the
* detectors were just started. Checkpoint index size can double if every model
* has two model documents. The transition period can be as long as 3 days since
* our checkpoint retention period is 3 days. There is no perfect solution. We
* prefer limiting hashes to an HC detector with multiple categorical fields as
* its customer impact is none.
*
* @param detectorId Detector Id
* @param configId config Id
* @param attributes Attributes of an entity
* @param usedForAD Whether the model Id is used for AD. Only AD needs backward compatibility
* (bwc); forecasting will use hashes regardless of the number of categorical fields.
* @return the model Id
*/
public static Optional<String> getModelId(String detectorId, SortedMap<String, String> attributes) {
private static Optional<String> getModelId(String configId, SortedMap<String, String> attributes, boolean usedForAD) {
if (attributes.isEmpty()) {
return Optional.empty();
} else if (attributes.size() == 1) {
} else if (usedForAD && attributes.size() == 1) {
for (Map.Entry<String, String> categoryValuePair : attributes.entrySet()) {
// For OpenSearch, the limit of the document ID is 512 bytes.
// skip an entity if the entity's name is more than 256 characters
Expand All @@ -273,7 +275,7 @@ public static Optional<String> getModelId(String detectorId, SortedMap<String, S
if (categoricalValue.length() > AnomalyDetectorSettings.MAX_ENTITY_LENGTH) {
return Optional.empty();
}
return Optional.of(detectorId + MODEL_ID_INFIX + categoricalValue);
return Optional.of(configId + MODEL_ID_INFIX + categoricalValue);
}
return Optional.empty();
} else {
Expand All @@ -291,20 +293,22 @@ public static Optional<String> getModelId(String detectorId, SortedMap<String, S
System.arraycopy(Numbers.longToBytes(hashFunc.h1), 0, bytes, 0, 8);
System.arraycopy(Numbers.longToBytes(hashFunc.h2), 0, bytes, 8, 8);
// Some bytes like 10 in ascii is corrupted in some systems. Base64 ensures we use safe bytes: https://tinyurl.com/mxmrhmhf
return Optional.of(detectorId + MODEL_ID_INFIX + Base64.getUrlEncoder().withoutPadding().encodeToString(bytes));
return Optional.of(configId + MODEL_ID_INFIX + Base64.getUrlEncoder().withoutPadding().encodeToString(bytes));
}
}

/**
* Get the cached model Id if present. Or recompute one if missing.
*
* @param detectorId Detector Id. Used as part of model Id.
* @param configId Config Id. Used as part of model Id.
* @param isDetectorId Boolean value representing whether the id is a Detector Id. We need to pay attention to
* bwc if it is used for AD.
* @return Model Id. Can be missing (e.g., the field value is too long for single-category detector)
*/
public Optional<String> getModelId(String detectorId) {
public Optional<String> getModelId(String configId, boolean isDetectorId) {
if (modelId.get() == null) {
// computing model id is not cheap and the result is deterministic. We only do it once.
Optional<String> computedModelId = Entity.getModelId(detectorId, attributes);
Optional<String> computedModelId = Entity.getModelId(configId, attributes, isDetectorId);
if (computedModelId.isPresent()) {
this.modelId.set(computedModelId.get());
} else {
Expand All @@ -314,6 +318,10 @@ public Optional<String> getModelId(String detectorId) {
return Optional.ofNullable(modelId.get());
}

/**
 * Get the model Id of this entity for an anomaly detector.
 *
 * Convenience wrapper around {@code getModelId(String, boolean)} that always
 * passes {@code true} for the "is detector Id" flag.
 *
 * @param detectorId Detector Id. Used as part of model Id.
 * @return Model Id. Can be missing.
 */
public Optional<String> getDetectorModelId(String detectorId) {
    // The Id passed in is always a detector Id for this entry point.
    final boolean isDetectorId = true;
    return getModelId(detectorId, isDetectorId);
}

public Map<String, String> getAttributes() {
return attributes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
import org.opensearch.common.bytes.BytesReference;
Expand All @@ -46,6 +44,7 @@
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.Feature;

Expand Down Expand Up @@ -85,7 +84,7 @@ public final class RestHandlerUtils {
public static final ToXContent.MapParams XCONTENT_WITH_TYPE = new ToXContent.MapParams(ImmutableMap.of("with_type", "true"));

public static final String OPENSEARCH_DASHBOARDS_USER_AGENT = "OpenSearch Dashboards";
public static final String[] UI_METADATA_EXCLUDE = new String[] { AnomalyDetector.UI_METADATA_FIELD };
public static final String[] UI_METADATA_EXCLUDE = new String[] { Config.UI_METADATA_FIELD };

public static final String FORECASTER_ID = "forecasterID";
public static final String FORECASTER = "forecaster";
Expand Down Expand Up @@ -168,14 +167,14 @@ private static String validateFeaturesConfig(List<Feature> features) {

StringBuilder errorMsgBuilder = new StringBuilder();
if (duplicateFeatureNames.size() > 0) {
errorMsgBuilder.append("Detector has duplicate feature names: ");
errorMsgBuilder.append("There are duplicate feature names: ");
errorMsgBuilder.append(String.join(", ", duplicateFeatureNames));
}
if (errorMsgBuilder.length() != 0 && duplicateFeatureAggNames.size() > 0) {
errorMsgBuilder.append(". ");
}
if (duplicateFeatureAggNames.size() > 0) {
errorMsgBuilder.append(ADCommonMessages.DUPLICATE_FEATURE_AGGREGATION_NAMES);
errorMsgBuilder.append(CommonMessages.DUPLICATE_FEATURE_AGGREGATION_NAMES);
errorMsgBuilder.append(String.join(", ", duplicateFeatureAggNames));
}
return errorMsgBuilder.toString();
Expand All @@ -198,9 +197,9 @@ public static boolean isExceptionCausedByInvalidQuery(Exception ex) {

/**
* Wrap action listener to avoid returning a verbose error message and a wrong 500 error to the user.
* Suggestion for exception handling in AD:
* Suggestion for exception handling in timeseries analysis (e.g., AD and Forecast):
* 1. If the error is caused by wrong input, throw an IllegalArgumentException.
* 2. For other errors, please use AnomalyDetectionException or its subclass, or use
* 2. For other errors, please use TimeSeriesException or its subclass, or use
* OpenSearchStatusException.
*
* TODO: tune this function for wrapped exception, return root exception error message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void setUp() throws Exception {
}).when(client).get(any(), any());

entity = Entity.createSingleAttributeEntity(categoryField, entityValue);
modelId = entity.getModelId(detectorId).get();
modelId = entity.getDetectorModelId(detectorId).get();
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ public void setUp() throws Exception {
entity2 = Entity.createSingleAttributeEntity("attributeName1", "attributeVal2");
entity3 = Entity.createSingleAttributeEntity("attributeName1", "attributeVal3");
entity4 = Entity.createSingleAttributeEntity("attributeName1", "attributeVal4");
modelId1 = entity1.getModelId(detectorId).get();
modelId2 = entity2.getModelId(detectorId).get();
modelId3 = entity3.getModelId(detectorId).get();
modelId4 = entity4.getModelId(detectorId).get();
modelId1 = entity1.getDetectorModelId(detectorId).get();
modelId2 = entity2.getDetectorModelId(detectorId).get();
modelId3 = entity3.getDetectorModelId(detectorId).get();
modelId4 = entity4.getDetectorModelId(detectorId).get();

clock = mock(Clock.class);
when(clock.instant()).thenReturn(Instant.now());
Expand Down
Loading

0 comments on commit 9816d74

Please sign in to comment.