diff --git a/src/main/java/org/opensearch/ad/constant/ADResourceScope.java b/src/main/java/org/opensearch/ad/constant/ADResourceScope.java new file mode 100644 index 000000000..3344b79f4 --- /dev/null +++ b/src/main/java/org/opensearch/ad/constant/ADResourceScope.java @@ -0,0 +1,18 @@ +package org.opensearch.ad.constant; + +import org.opensearch.accesscontrol.resources.ResourceAccessScope; + +public enum ADResourceScope implements ResourceAccessScope { + AD_READ_ACCESS("ad_read_access"), + AD_FULL_ACCESS("ad_full_access"); + + private final String scopeName; + + ADResourceScope(String scopeName) { + this.scopeName = scopeName; + } + + public String getScopeName() { + return scopeName; + } +} diff --git a/src/main/java/org/opensearch/ad/transport/IndexAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/IndexAnomalyDetectorTransportAction.java index 5d9b69910..aea7d47ae 100644 --- a/src/main/java/org/opensearch/ad/transport/IndexAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/IndexAnomalyDetectorTransportAction.java @@ -14,8 +14,7 @@ import static org.opensearch.ad.constant.ADCommonMessages.FAIL_TO_CREATE_DETECTOR; import static org.opensearch.ad.constant.ADCommonMessages.FAIL_TO_UPDATE_DETECTOR; import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_FILTER_BY_BACKEND_ROLES; -import static org.opensearch.timeseries.util.ParseUtils.checkFilterByBackendRoles; -import static org.opensearch.timeseries.util.ParseUtils.getConfig; +import static org.opensearch.timeseries.util.ParseUtils.*; import static org.opensearch.timeseries.util.RestHandlerUtils.wrapRestActionListener; import java.util.List; @@ -45,7 +44,6 @@ import org.opensearch.rest.RestRequest; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.tasks.Task; -import org.opensearch.timeseries.common.exception.TimeSeriesException; import org.opensearch.timeseries.feature.SearchFeatureDao; import 
org.opensearch.timeseries.function.ExecutorFunction; import org.opensearch.timeseries.util.ParseUtils; @@ -100,7 +98,7 @@ protected void doExecute(Task task, IndexAnomalyDetectorRequest request, ActionL String errorMessage = method == RestRequest.Method.PUT ? FAIL_TO_UPDATE_DETECTOR : FAIL_TO_CREATE_DETECTOR; ActionListener listener = wrapRestActionListener(actionListener, errorMessage); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - resolveUserAndExecute(user, detectorId, method, listener, (detector) -> adExecute(request, user, detector, context, listener)); + resolveUserAndExecute(detectorId, method, listener, (detector) -> adExecute(request, user, detector, context, listener)); } catch (Exception e) { LOG.error(e); listener.onFailure(e); @@ -108,40 +106,17 @@ protected void doExecute(Task task, IndexAnomalyDetectorRequest request, ActionL } private void resolveUserAndExecute( - User requestedUser, String detectorId, RestRequest.Method method, ActionListener listener, Consumer function ) { try { - // Check if user has backend roles - // When filter by is enabled, block users creating/updating detectors who do not have backend roles. - if (filterByEnabled) { - String error = checkFilterByBackendRoles(requestedUser); - if (error != null) { - listener.onFailure(new TimeSeriesException(error)); - return; - } - } + if (method == RestRequest.Method.PUT) { - // requestedUser == null means security is disabled or user is superadmin. In this case we don't need to - // check if request user have access to the detector or not. But we still need to get current detector for - // this case, so we can keep current detector's user data. - boolean filterByBackendRole = requestedUser == null ? 
false : filterByEnabled; // Update detector request, check if user has permissions to update the detector // Get detector and verify backend roles - getConfig( - requestedUser, - detectorId, - listener, - function, - client, - clusterService, - xContentRegistry, - filterByBackendRole, - AnomalyDetector.class - ); + getConfig(detectorId, listener, function, client, clusterService, xContentRegistry, AnomalyDetector.class); } else { // Create Detector. No need to get current detector. function.accept(null); @@ -175,6 +150,8 @@ protected void adExecute( checkIndicesAndExecute(detector.getIndices(), () -> { // Don't replace detector's user when update detector // Github issue: https://github.com/opensearch-project/anomaly-detection/issues/124 + // TODO this and similar code should be updated to remove reference to a user + User detectorUser = currentDetector == null ? user : currentDetector.getUser(); IndexAnomalyDetectorActionHandler indexAnomalyDetectorActionHandler = new IndexAnomalyDetectorActionHandler( clusterService, @@ -201,6 +178,11 @@ protected void adExecute( ); indexAnomalyDetectorActionHandler.start(listener); }, listener); + + // This call was added to ensure that existing functionality of sharing the resource via backend_role exists + // TODO 3.0 and later the following must be removed and a new REST API where user must explicitly share the detector should be + // exposed + shareResourceWithBackendRoles(detectorId, user, listener); } private void checkIndicesAndExecute( diff --git a/src/main/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportAction.java index ef82c43b2..909cc51f4 100644 --- a/src/main/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportAction.java @@ -43,7 +43,6 @@ import org.opensearch.common.inject.Inject; import 
org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.commons.authuser.User; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -55,7 +54,6 @@ import org.opensearch.timeseries.common.exception.TimeSeriesException; import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.constant.CommonName; -import org.opensearch.timeseries.util.ParseUtils; import org.opensearch.timeseries.util.RestHandlerUtils; import org.opensearch.transport.TransportService; @@ -103,13 +101,10 @@ protected void doExecute( ActionListener actionListener ) { String detectorId = request.getId(); - User user = ParseUtils.getUserContext(client); ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_PREVIEW_DETECTOR); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { resolveUserAndExecute( - user, detectorId, - filterByEnabled, listener, (anomalyDetector) -> previewExecute(request, context, listener), client, diff --git a/src/main/java/org/opensearch/forecast/transport/ForecastRunOnceTransportAction.java b/src/main/java/org/opensearch/forecast/transport/ForecastRunOnceTransportAction.java index f157c4d6f..e7f4f4e9d 100644 --- a/src/main/java/org/opensearch/forecast/transport/ForecastRunOnceTransportAction.java +++ b/src/main/java/org/opensearch/forecast/transport/ForecastRunOnceTransportAction.java @@ -32,7 +32,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.commons.authuser.User; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.forecast.constant.ForecastCommonMessages; @@ -71,7 +70,6 @@ import 
org.opensearch.timeseries.stats.StatNames; import org.opensearch.timeseries.task.TaskCacheManager; import org.opensearch.timeseries.transport.ResultProcessor; -import org.opensearch.timeseries.util.ParseUtils; import org.opensearch.timeseries.util.SecurityClientUtil; import org.opensearch.transport.TransportService; @@ -154,13 +152,10 @@ public ForecastRunOnceTransportAction( @Override protected void doExecute(Task task, ForecastResultRequest request, ActionListener listener) { String forecastID = request.getConfigId(); - User user = ParseUtils.getUserContext(client); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { resolveUserAndExecute( - user, forecastID, - filterByEnabled, listener, (forecaster) -> executeRunOnce(forecastID, request, listener), client, diff --git a/src/main/java/org/opensearch/forecast/transport/IndexForecasterTransportAction.java b/src/main/java/org/opensearch/forecast/transport/IndexForecasterTransportAction.java index c9bc28b72..ff5c1610b 100644 --- a/src/main/java/org/opensearch/forecast/transport/IndexForecasterTransportAction.java +++ b/src/main/java/org/opensearch/forecast/transport/IndexForecasterTransportAction.java @@ -14,9 +14,7 @@ import static org.opensearch.forecast.constant.ForecastCommonMessages.FAIL_TO_CREATE_FORECASTER; import static org.opensearch.forecast.constant.ForecastCommonMessages.FAIL_TO_UPDATE_FORECASTER; import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_FILTER_BY_BACKEND_ROLES; -import static org.opensearch.timeseries.util.ParseUtils.checkFilterByBackendRoles; -import static org.opensearch.timeseries.util.ParseUtils.getConfig; -import static org.opensearch.timeseries.util.ParseUtils.getUserContext; +import static org.opensearch.timeseries.util.ParseUtils.*; import static org.opensearch.timeseries.util.RestHandlerUtils.wrapRestActionListener; import java.util.List; @@ -100,7 +98,6 @@ protected void doExecute(Task task, IndexForecasterRequest 
request, ActionListen ActionListener listener = wrapRestActionListener(actionListener, errorMessage); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { resolveUserAndExecute( - user, forecasterId, method, listener, @@ -113,41 +110,16 @@ protected void doExecute(Task task, IndexForecasterRequest request, ActionListen } private void resolveUserAndExecute( - User requestedUser, String forecasterId, RestRequest.Method method, ActionListener listener, Consumer function ) { try { - // requestedUser == null means security is disabled or user is superadmin. In this case we don't need to - // check if request user have access to the forecaster or not. But we still need to get current forecaster for - // this case, so we can keep current forecaster's user data. - boolean filterByBackendRole = requestedUser == null ? false : filterByEnabled; - - // Check if user has backend roles - // When filter by is enabled, block users creating/updating detectors who do not have backend roles. - if (filterByEnabled) { - String error = checkFilterByBackendRoles(requestedUser); - if (error != null) { - listener.onFailure(new IllegalArgumentException(error)); - return; - } - } if (method == RestRequest.Method.PUT) { // Update forecaster request, check if user has permissions to update the forecaster // Get forecaster and verify backend roles - getConfig( - requestedUser, - forecasterId, - listener, - function, - client, - clusterService, - xContentRegistry, - filterByBackendRole, - Forecaster.class - ); + getConfig(forecasterId, listener, function, client, clusterService, xContentRegistry, Forecaster.class); } else { // Create Detector. No need to get current detector. 
function.accept(null); diff --git a/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java b/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java index 719ec88ac..a870c4375 100644 --- a/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java +++ b/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java @@ -42,6 +42,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.SpecialPermission; +import org.opensearch.accesscontrol.resources.ResourceService; import org.opensearch.action.ActionRequest; import org.opensearch.ad.ADJobProcessor; import org.opensearch.ad.ADTaskProfileRunner; @@ -161,6 +162,10 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.Lifecycle; +import org.opensearch.common.lifecycle.LifecycleComponent; +import org.opensearch.common.lifecycle.LifecycleListener; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; @@ -267,10 +272,7 @@ import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.monitor.jvm.JvmInfo; import org.opensearch.monitor.jvm.JvmService; -import org.opensearch.plugins.ActionPlugin; -import org.opensearch.plugins.Plugin; -import org.opensearch.plugins.ScriptPlugin; -import org.opensearch.plugins.SystemIndexPlugin; +import org.opensearch.plugins.*; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; @@ -291,6 +293,7 @@ import org.opensearch.timeseries.function.ThrowingSupplierWrapper; import org.opensearch.timeseries.model.Job; import 
org.opensearch.timeseries.ratelimit.CheckPointMaintainRequestAdapter; +import org.opensearch.timeseries.rest.RestShareConfigAction; import org.opensearch.timeseries.settings.TimeSeriesEnabledSetting; import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.stats.StatNames; @@ -300,6 +303,8 @@ import org.opensearch.timeseries.stats.suppliers.SettableSupplier; import org.opensearch.timeseries.task.TaskCacheManager; import org.opensearch.timeseries.transport.CronTransportAction; +import org.opensearch.timeseries.transport.ShareConfigAction; +import org.opensearch.timeseries.transport.ShareConfigTransportAction; import org.opensearch.timeseries.transport.handler.ResultBulkIndexingHandler; import org.opensearch.timeseries.util.ClientUtil; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; @@ -327,7 +332,13 @@ /** * Entry point of time series analytics plugin. */ -public class TimeSeriesAnalyticsPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension { +public class TimeSeriesAnalyticsPlugin extends Plugin + implements + ActionPlugin, + ScriptPlugin, + SystemIndexPlugin, + JobSchedulerExtension, + ResourcePlugin { private static final Logger LOG = LogManager.getLogger(TimeSeriesAnalyticsPlugin.class); @@ -431,6 +442,9 @@ public List getRestHandlers( RestValidateForecasterAction validateForecasterAction = new RestValidateForecasterAction(settings, clusterService); RestForecasterSuggestAction suggestForecasterParamAction = new RestForecasterSuggestAction(settings, clusterService); + // Config sharing and access control + RestShareConfigAction restShareConfigAction = new RestShareConfigAction(); + ForecastJobProcessor forecastJobRunner = ForecastJobProcessor.getInstance(); forecastJobRunner.setClient(client); forecastJobRunner.setThreadPool(threadPool); @@ -470,7 +484,9 @@ public List getRestHandlers( statsForecasterAction, runOnceForecasterAction, validateForecasterAction, - 
suggestForecasterParamAction + suggestForecasterParamAction, + // Config sharing and access control + restShareConfigAction ); } @@ -1703,7 +1719,8 @@ public List getNamedXContent() { new ActionHandler<>(ForecastRunOnceAction.INSTANCE, ForecastRunOnceTransportAction.class), new ActionHandler<>(ForecastRunOnceProfileAction.INSTANCE, ForecastRunOnceProfileTransportAction.class), new ActionHandler<>(ValidateForecasterAction.INSTANCE, ValidateForecasterTransportAction.class), - new ActionHandler<>(SuggestForecasterParamAction.INSTANCE, SuggestForecasterParamTransportAction.class) + new ActionHandler<>(SuggestForecasterParamAction.INSTANCE, SuggestForecasterParamTransportAction.class), + new ActionHandler<>(ShareConfigAction.INSTANCE, ShareConfigTransportAction.class) ); } @@ -1758,4 +1775,56 @@ public void close() { } } } + + @Override + public String getResourceType() { + return "detectors"; + } + + @Override + public String getResourceIndex() { + return CommonName.CONFIG_INDEX; + } + + @Override + public Collection> getGuiceServiceClasses() { + final List> services = new ArrayList<>(1); + services.add(GuiceHolder.class); + return services; + } + + public static class GuiceHolder implements LifecycleComponent { + + private static ResourceService resourceService; + + @Inject + public GuiceHolder(final ResourceService resourceService) { + GuiceHolder.resourceService = resourceService; + } + + public static ResourceService getResourceService() { + return resourceService; + } + + @Override + public void close() {} + + @Override + public Lifecycle.State lifecycleState() { + return null; + } + + @Override + public void addLifecycleListener(LifecycleListener listener) {} + + @Override + public void removeLifecycleListener(LifecycleListener listener) {} + + @Override + public void start() {} + + @Override + public void stop() {} + + } } diff --git a/src/main/java/org/opensearch/timeseries/constant/CommonValue.java 
b/src/main/java/org/opensearch/timeseries/constant/CommonValue.java index 6f05f59d0..d2648bc83 100644 --- a/src/main/java/org/opensearch/timeseries/constant/CommonValue.java +++ b/src/main/java/org/opensearch/timeseries/constant/CommonValue.java @@ -9,4 +9,7 @@ public class CommonValue { // unknown or no schema version public static Integer NO_SCHEMA_VERSION = 0; + // config access control + public static String CONFIG_ACCESS_CONTROL_BASE_ACTION = "cluster:admin/timeseries/config/access"; + public static String CONFIG_ACCESS_CONTROL_BASE_URI = "/_plugins/_timeseries/config/access"; } diff --git a/src/main/java/org/opensearch/timeseries/rest/RestShareConfigAction.java b/src/main/java/org/opensearch/timeseries/rest/RestShareConfigAction.java new file mode 100644 index 000000000..3072aaa88 --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/rest/RestShareConfigAction.java @@ -0,0 +1,89 @@ +package org.opensearch.timeseries.rest; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.POST; +import static org.opensearch.timeseries.constant.CommonValue.CONFIG_ACCESS_CONTROL_BASE_URI; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.opensearch.accesscontrol.resources.ShareWith; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; +import org.opensearch.timeseries.transport.ShareConfigAction; +import org.opensearch.timeseries.transport.ShareConfigRequest; + +/** + * Registers REST API to handle detector/forecaster sharing. 
+ * Here is an example request: + * POST /_plugins/_timeseries/config/access/share + * { + * "config_id" : , + * "share_with" : { + * "AD_FULL_ACCESS": { + * "users": ["x"], + * "roles": ["y"], + * "backend_roles": ["z"] + * } + * } + * } + * example response: + * { + * "message": "Resource shared successfully with " + * } + */ +public class RestShareConfigAction extends BaseRestHandler { + public RestShareConfigAction() {} + + @Override + public List routes() { + return singletonList(new Route(POST, CONFIG_ACCESS_CONTROL_BASE_URI + "/share")); + } + + @Override + public String getName() { + return "share_timeseries_config"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + Map source; + try (XContentParser parser = request.contentParser()) { + source = parser.map(); + } + + String resourceId = (String) source.get("config_id"); + + ShareWith shareWith = parseShareWith(source); + final ShareConfigRequest shareResourceRequest = new ShareConfigRequest(resourceId, shareWith); + return channel -> client.executeLocally(ShareConfigAction.INSTANCE, shareResourceRequest, new RestToXContentListener<>(channel)); + } + + private ShareWith parseShareWith(Map source) throws IOException { + @SuppressWarnings("unchecked") + Map shareWithMap = (Map) source.get("share_with"); + if (shareWithMap == null || shareWithMap.isEmpty()) { + throw new IllegalArgumentException("share_with is required and cannot be empty"); + } + + String jsonString = XContentFactory.jsonBuilder().map(shareWithMap).toString(); + + try ( + XContentParser parser = XContentType.JSON + .xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, jsonString) + ) { + return ShareWith.fromXContent(parser); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid share_with structure: " + e.getMessage(), e); + } + } +} diff --git 
a/src/main/java/org/opensearch/timeseries/transport/BaseDeleteConfigTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/BaseDeleteConfigTransportAction.java index 6245464a9..b97f17ffa 100644 --- a/src/main/java/org/opensearch/timeseries/transport/BaseDeleteConfigTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/BaseDeleteConfigTransportAction.java @@ -30,7 +30,6 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.commons.authuser.User; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -49,7 +48,6 @@ import org.opensearch.timeseries.model.TimeSeriesTask; import org.opensearch.timeseries.task.TaskCacheManager; import org.opensearch.timeseries.task.TaskManager; -import org.opensearch.timeseries.util.ParseUtils; import org.opensearch.timeseries.util.RestHandlerUtils; import org.opensearch.transport.TransportService; @@ -106,44 +104,33 @@ public BaseDeleteConfigTransportAction( protected void doExecute(Task task, DeleteConfigRequest request, ActionListener actionListener) { String configId = request.getConfigID(); LOG.info("Delete job {}", configId); - User user = ParseUtils.getUserContext(client); ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_DELETE_CONFIG); // By the time request reaches here, the user permissions are validated by Security plugin. try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { - resolveUserAndExecute( - user, - configId, - filterByEnabled, - listener, - (input) -> nodeStateManager.getConfig(configId, analysisType, config -> { - if (config.isEmpty()) { - // In a mixed cluster, if delete detector request routes to node running AD1.0, then it will - // not delete detector tasks. 
User can re-delete these deleted detector after cluster upgraded, - // in that case, the detector is not present. - LOG.info("Can't find config {}", configId); - taskManager.deleteTasks(configId, () -> deleteJobDoc(configId, listener), listener); - return; - } - // Check if there is realtime job or batch analysis task running. If none of these running, we - // can delete the config. - getJob(configId, listener, () -> { - taskManager.getAndExecuteOnLatestConfigLevelTask(configId, batchTaskTypes, configTask -> { - if (configTask.isPresent() && !configTask.get().isDone()) { - String batchTaskName = configTask.get() instanceof ADTask ? "Historical" : "Run once"; - listener.onFailure(new OpenSearchStatusException(batchTaskName + " is running", RestStatus.BAD_REQUEST)); - } else { - taskManager.deleteTasks(configId, () -> deleteJobDoc(configId, listener), listener); - } - // false means don't reset task state as inactive/stopped state. We are checking if task has finished or not. - // So no need to reset task state. - }, transportService, false, listener); - }); - }, listener), - client, - clusterService, - xContentRegistry, - configTypeClass - ); + resolveUserAndExecute(configId, listener, (input) -> nodeStateManager.getConfig(configId, analysisType, config -> { + if (config.isEmpty()) { + // In a mixed cluster, if delete detector request routes to node running AD1.0, then it will + // not delete detector tasks. User can re-delete these deleted detector after cluster upgraded, + // in that case, the detector is not present. + LOG.info("Can't find config {}", configId); + taskManager.deleteTasks(configId, () -> deleteJobDoc(configId, listener), listener); + return; + } + // Check if there is realtime job or batch analysis task running. If none of these running, we + // can delete the config. 
+ getJob(configId, listener, () -> { + taskManager.getAndExecuteOnLatestConfigLevelTask(configId, batchTaskTypes, configTask -> { + if (configTask.isPresent() && !configTask.get().isDone()) { + String batchTaskName = configTask.get() instanceof ADTask ? "Historical" : "Run once"; + listener.onFailure(new OpenSearchStatusException(batchTaskName + " is running", RestStatus.BAD_REQUEST)); + } else { + taskManager.deleteTasks(configId, () -> deleteJobDoc(configId, listener), listener); + } + // false means don't reset task state as inactive/stopped state. We are checking if task has finished or not. + // So no need to reset task state. + }, transportService, false, listener); + }); + }, listener), client, clusterService, xContentRegistry, configTypeClass); } catch (Exception e) { LOG.error(e); listener.onFailure(e); diff --git a/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java index b803a4851..29ceffd95 100644 --- a/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/BaseGetConfigTransportAction.java @@ -37,7 +37,6 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.commons.authuser.User; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.Strings; @@ -67,7 +66,6 @@ import org.opensearch.timeseries.task.TaskCacheManager; import org.opensearch.timeseries.task.TaskManager; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; -import org.opensearch.timeseries.util.ParseUtils; import org.opensearch.timeseries.util.RestHandlerUtils; import org.opensearch.timeseries.util.SecurityClientUtil; import org.opensearch.transport.TransportService; @@ -160,13 
+158,10 @@ public BaseGetConfigTransportAction( public void doExecute(Task task, ActionRequest request, ActionListener actionListener) { GetConfigRequest getConfigRequest = GetConfigRequest.fromActionRequest(request); String configID = getConfigRequest.getConfigID(); - User user = ParseUtils.getUserContext(client); ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_GET_CONFIG_MSG); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { resolveUserAndExecute( - user, configID, - filterByEnabled, listener, (config) -> getExecute(getConfigRequest, listener), client, diff --git a/src/main/java/org/opensearch/timeseries/transport/BaseJobTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/BaseJobTransportAction.java index 99f4a69b3..5d504cffb 100644 --- a/src/main/java/org/opensearch/timeseries/transport/BaseJobTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/BaseJobTransportAction.java @@ -98,9 +98,7 @@ protected void doExecute(Task task, JobRequest request, ActionListener executeConfig(listener, configId, dateRange, historical, rawPath, requestTimeout, user, context), client, diff --git a/src/main/java/org/opensearch/timeseries/transport/ShareConfigAction.java b/src/main/java/org/opensearch/timeseries/transport/ShareConfigAction.java new file mode 100644 index 000000000..35f5a9baa --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/transport/ShareConfigAction.java @@ -0,0 +1,21 @@ +package org.opensearch.timeseries.transport; + +import static org.opensearch.timeseries.constant.CommonValue.CONFIG_ACCESS_CONTROL_BASE_ACTION; + +import org.opensearch.action.ActionType; + +public class ShareConfigAction extends ActionType { + + /** + * Share config action instance. 
+ */ + public static final ShareConfigAction INSTANCE = new ShareConfigAction(); + /** + * Share config action name + */ + public static final String NAME = CONFIG_ACCESS_CONTROL_BASE_ACTION + "/share"; + + private ShareConfigAction() { + super(NAME, ShareConfigResponse::new); + } +} diff --git a/src/main/java/org/opensearch/timeseries/transport/ShareConfigRequest.java b/src/main/java/org/opensearch/timeseries/transport/ShareConfigRequest.java new file mode 100644 index 000000000..8c7d8c19c --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/transport/ShareConfigRequest.java @@ -0,0 +1,48 @@ +package org.opensearch.timeseries.transport; + +import java.io.IOException; +import java.util.stream.Collectors; + +import org.opensearch.accesscontrol.resources.ShareWith; +import org.opensearch.accesscontrol.resources.SharedWithScope; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.timeseries.util.ValidationUtil; + +public class ShareConfigRequest extends ActionRequest { + private final String configId; + private final ShareWith shareWith; + + public ShareConfigRequest(String configId, ShareWith shareWith) { + this.configId = configId; + this.shareWith = shareWith; + } + + public ShareConfigRequest(StreamInput in) throws IOException { + this.configId = in.readString(); + this.shareWith = in.readNamedWriteable(ShareWith.class); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeString(configId); + out.writeNamedWriteable(shareWith); + } + + @Override + public ActionRequestValidationException validate() { + + return ValidationUtil + .validateScopes(shareWith.getSharedWithScopes().stream().map(SharedWithScope::getScope).collect(Collectors.toSet())); + } + + public String getConfigId() { + return configId; + } + + public 
ShareWith getShareWith() { + return shareWith; + } +} diff --git a/src/main/java/org/opensearch/timeseries/transport/ShareConfigResponse.java b/src/main/java/org/opensearch/timeseries/transport/ShareConfigResponse.java new file mode 100644 index 000000000..06adf17b6 --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/transport/ShareConfigResponse.java @@ -0,0 +1,35 @@ +package org.opensearch.timeseries.transport; + +import java.io.IOException; + +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +public class ShareConfigResponse extends ActionResponse implements ToXContentObject { + private final String message; + + public ShareConfigResponse(String message) { + this.message = message; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(message); + } + + public ShareConfigResponse(final StreamInput in) throws IOException { + message = in.readString(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field("message", message); + builder.endObject(); + return builder; + } +} diff --git a/src/main/java/org/opensearch/timeseries/transport/ShareConfigTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/ShareConfigTransportAction.java new file mode 100644 index 000000000..f7aaeeeaf --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/transport/ShareConfigTransportAction.java @@ -0,0 +1,52 @@ +package org.opensearch.timeseries.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.accesscontrol.resources.ResourceService; +import 
org.opensearch.accesscontrol.resources.ResourceSharing; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.tasks.Task; +import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin; +import org.opensearch.timeseries.constant.CommonName; +import org.opensearch.transport.TransportService; + +public class ShareConfigTransportAction extends HandledTransportAction { + + private static final Logger log = LogManager.getLogger(ShareConfigTransportAction.class); + + @Inject + public ShareConfigTransportAction(TransportService transportService, ActionFilters actionFilters) { + super(ShareConfigAction.NAME, transportService, actionFilters, ShareConfigRequest::new); + } + + @Override + protected void doExecute(Task task, ShareConfigRequest request, ActionListener listener) { + ResourceSharing sharing; + try { + sharing = shareConfig(request); + if (sharing == null) { + log.error("Unable to share resource {}. Check whether security plugin is enabled.", request.getConfigId()); + listener + .onResponse( + new ShareConfigResponse( + "Unable to share resource " + request.getConfigId() + ". Check whether security plugin is enabled." 
+ ) + ); + return; + } + log.info("Shared resource {} with {}", request.getConfigId(), sharing.toString()); + listener.onResponse(new ShareConfigResponse("Resource " + request.getConfigId() + " shared successfully with " + sharing)); + } catch (Exception e) { + log.error("Something went wrong trying to share resource {} with {}", request.getConfigId(), request.getShareWith(), e); + listener.onFailure(e); + } + } + + private ResourceSharing shareConfig(ShareConfigRequest request) { + ResourceService rs = TimeSeriesAnalyticsPlugin.GuiceHolder.getResourceService(); + return rs.getResourceAccessControlPlugin().shareWith(request.getConfigId(), CommonName.CONFIG_INDEX, request.getShareWith()); + } +} diff --git a/src/main/java/org/opensearch/timeseries/util/ParseUtils.java b/src/main/java/org/opensearch/timeseries/util/ParseUtils.java index 7d7ac7889..00d40550b 100644 --- a/src/main/java/org/opensearch/timeseries/util/ParseUtils.java +++ b/src/main/java/org/opensearch/timeseries/util/ParseUtils.java @@ -36,9 +36,14 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.search.join.ScoreMode; import org.opensearch.OpenSearchStatusException; +import org.opensearch.accesscontrol.resources.RecipientType; +import org.opensearch.accesscontrol.resources.ShareWith; +import org.opensearch.accesscontrol.resources.SharedWithScope; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.search.SearchResponse; +import org.opensearch.ad.constant.ADResourceScope; +import org.opensearch.ad.transport.IndexAnomalyDetectorResponse; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.xcontent.LoggingDeprecationHandler; @@ -70,6 +75,7 @@ import org.opensearch.search.aggregations.bucket.range.DateRangeAggregationBuilder; import org.opensearch.search.aggregations.metrics.Max; import org.opensearch.search.builder.SearchSourceBuilder; +import
org.opensearch.timeseries.TimeSeriesAnalyticsPlugin; import org.opensearch.timeseries.common.exception.TimeSeriesException; import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.constant.CommonName; @@ -484,9 +490,7 @@ public static User getUserContext(Client client) { /** * run the given function based on given user * @param Config response type. Can be either GetAnomalyDetectorResponse or GetForecasterResponse - * @param requestedUser requested user * @param configId config Id - * @param filterByEnabled filter by backend is enabled * @param listener listener. We didn't provide the generic type of listener and therefore can return anything using the listener. * @param function Function to execute * @param client Client to OS. @@ -495,9 +499,7 @@ public static User getUserContext(Client client) { * @param configTypeClass the class of the ConfigType, used by the ConfigFactory to parse the correct type of Config */ public static void resolveUserAndExecute( - User requestedUser, String configId, - boolean filterByEnabled, ActionListener listener, Consumer function, Client client, @@ -506,22 +508,11 @@ public static configTypeClass ) { try { - if (requestedUser == null || configId == null) { - // requestedUser == null means security is disabled or user is superadmin. In this case we don't need to - // check if request user have access to the detector or not. + if (configId == null) { + // configId == null indicates this is a new config creation request. 
We do not check for resource permission on new creation function.accept(null); } else { - getConfig( - requestedUser, - configId, - listener, - function, - client, - clusterService, - xContentRegistry, - filterByEnabled, - configTypeClass - ); + getConfig(configId, listener, function, client, clusterService, xContentRegistry, configTypeClass); } } catch (Exception e) { listener.onFailure(e); @@ -529,46 +520,35 @@ public static void getConfig( - User requestUser, String configId, ActionListener listener, Consumer function, Client client, ClusterService clusterService, NamedXContentRegistry xContentRegistry, - boolean filterByBackendRole, Class configTypeClass ) { if (clusterService.state().metadata().indices().containsKey(CommonName.CONFIG_INDEX)) { + // Check whether current user has access to update the detector; bail out so the listener is not notified twice + if (!validatePermissions(configId, listener)) { + return; + } + GetRequest request = new GetRequest(CommonName.CONFIG_INDEX).id(configId); client .get( request, ActionListener .wrap( - response -> onGetConfigResponse( - response, - requestUser, - configId, - listener, - function, - xContentRegistry, - filterByBackendRole, - configTypeClass - ), + response -> onGetConfigResponse(response, configId, listener, function, xContentRegistry, configTypeClass), exception -> { logger.error("Failed to get config: " + configId, exception); listener.onFailure(exception); @@ -594,22 +574,18 @@ public static The type of Config to be processed in this method, which extends from the Config base type. * @param The type of ActionResponse to be used, which extends from the ActionResponse base type. * @param response The GetResponse from the getConfig request. This contains the information about the config that is to be processed. - * @param requestUser The User from the request. This user's permissions will be checked to ensure they have access to the config. * @param configId The ID of the config. This is used for logging and error messages. * @param listener The ActionListener to call if an error occurs. Any errors that occur during the processing of the config will be passed to this listener. * @param function The Consumer function to apply to the ConfigType. If the user has permission to access the config, this function will be applied. * @param xContentRegistry The XContentRegistry used to create the XContentParser. This is used to parse the response into a ConfigType. - * @param filterByBackendRole A boolean indicating whether to filter by backend role. If true, the user's backend roles will be checked to ensure they have access to the config. * @param configTypeClass The class of the ConfigType, used by the ConfigFactory to parse the correct type of Config. */ public static void onGetConfigResponse( GetResponse response, - User requestUser, String configId, ActionListener listener, Consumer function, NamedXContentRegistry xContentRegistry, - boolean filterByBackendRole, Class configTypeClass ) { if (response.isExists()) { @@ -619,17 +595,11 @@ public static getFieldNamesForFeature(Feature feature, NamedXConten return ParseUtils.parseAggregationRequest(parser); } + /** Returns true when the current user may access the config; otherwise notifies the listener with 403 and returns false. */ + public static boolean validatePermissions(String detectorId, ActionListener listener) { + // TODO the scope supplied here needs to be dynamically applicable to each function call type + // i.e. it should be different for adExecute(), forecastExecute(), etc. + boolean hasPermission = TimeSeriesAnalyticsPlugin.GuiceHolder + .getResourceService() + .getResourceAccessControlPlugin() + .hasPermission(detectorId, CommonName.CONFIG_INDEX, ADResourceScope.AD_FULL_ACCESS.getScopeName()); + + if (!hasPermission) { + logger.debug("Current user does not have permissions to access detector: " + detectorId); + listener + .onFailure(new OpenSearchStatusException(CommonMessages.NO_PERMISSION_TO_ACCESS_CONFIG + detectorId, RestStatus.FORBIDDEN)); + return false; + } + return true; + } + + public static void shareResourceWithBackendRoles(String detectorId, User user, ActionListener listener) { + SharedWithScope.ScopeRecipients recipients = new SharedWithScope.ScopeRecipients( + Map.of(new RecipientType("backend_roles"), Set.copyOf(user.getBackendRoles())) + ); + ShareWith shareWith = new ShareWith(Set.of(new SharedWithScope(ADResourceScope.AD_FULL_ACCESS.getScopeName(), recipients))); + + TimeSeriesAnalyticsPlugin.GuiceHolder + .getResourceService() + .getResourceAccessControlPlugin() + .shareWith(detectorId, CommonName.CONFIG_INDEX, shareWith); + + logger + .info( + "Detector {} shared with backend roles of user {} for scope {}", + detectorId, + user.getName(), + ADResourceScope.AD_FULL_ACCESS.getScopeName() + ); + } + } diff --git a/src/main/java/org/opensearch/timeseries/util/ValidationUtil.java b/src/main/java/org/opensearch/timeseries/util/ValidationUtil.java new file mode 100644 index 000000000..61f145178 --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/util/ValidationUtil.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.timeseries.util; + +import java.util.HashSet; +import java.util.Set; + +import org.opensearch.accesscontrol.resources.ResourceAccessScope; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.ad.constant.ADResourceScope; + +public class ValidationUtil { + public static ActionRequestValidationException validateScopes(Set scopes) { + Set validScopes = new HashSet<>(); + for (ADResourceScope scope : ADResourceScope.values()) { + // accept both the enum constant name and the wire-level scope name ("ad_full_access"), + // since hasPermission/shareWith match on getScopeName() + validScopes.add(scope.name()); + validScopes.add(scope.getScopeName()); + } + validScopes.add(ResourceAccessScope.READ_ONLY); + validScopes.add(ResourceAccessScope.READ_WRITE); + + for (String s : scopes) { + if (!validScopes.contains(s)) { + ActionRequestValidationException exception = new ActionRequestValidationException(); + exception.addValidationError("Invalid scope: " + s + ". Scope must be one of: " + validScopes); + return exception; + } + } + return null; + } +} diff --git a/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java b/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java index 3adeead1c..3f93a8d33 100644 --- a/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java +++ b/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java @@ -99,9 +99,7 @@ protected void doExecute(Task task, JobRequest request, ActionListener executeDetector(listener, detectorId, rawPath, requestTimeout, user, detectionDateRange, historical), client,