Skip to content

Commit

Permalink
Implement patch API for datasources (#2273)
Browse files Browse the repository at this point in the history
* Implement patch API for datasources

Signed-off-by: Derek Ho <[email protected]>

* Change patch implementation to Map

Signed-off-by: Derek Ho <[email protected]>

* Fix up, everything complete except unit test

Signed-off-by: Derek Ho <[email protected]>

* Revise PR to use existing functions

Signed-off-by: Derek Ho <[email protected]>

* Remove unused utility function

Signed-off-by: Derek Ho <[email protected]>

* Add tests

Signed-off-by: Derek Ho <[email protected]>

* Add back line

Signed-off-by: Derek Ho <[email protected]>

* fix build issue

Signed-off-by: Derek Ho <[email protected]>

* Fix tests and add in rst

Signed-off-by: Derek Ho <[email protected]>

* Register patch

Signed-off-by: Derek Ho <[email protected]>

* Add imports

Signed-off-by: Derek Ho <[email protected]>

* Patch

Signed-off-by: Derek Ho <[email protected]>

* Fix integration test

Signed-off-by: Derek Ho <[email protected]>

* Update IT

Signed-off-by: Derek Ho <[email protected]>

* Add tests

Signed-off-by: Derek Ho <[email protected]>

* Fix test

Signed-off-by: Derek Ho <[email protected]>

* Fix tests and increase code cov

Signed-off-by: Derek Ho <[email protected]>

* Add more coverage to impl

Signed-off-by: Derek Ho <[email protected]>

* Fix test and jacoco passing

Signed-off-by: Derek Ho <[email protected]>

* Test fix

Signed-off-by: Derek Ho <[email protected]>

* Add docs

Signed-off-by: Derek Ho <[email protected]>

---------

Signed-off-by: Derek Ho <[email protected]>
(cherry picked from commit f835112)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Oct 18, 2023
1 parent 3d1a376 commit 70ecae8
Show file tree
Hide file tree
Showing 15 changed files with 580 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.datasource;

import java.util.Map;
import java.util.Set;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand Down Expand Up @@ -56,12 +57,19 @@ public interface DataSourceService {
void createDataSource(DataSourceMetadata metadata);

/**
* Updates {@link DataSource} corresponding to dataSourceMetadata.
* Updates {@link DataSource} corresponding to dataSourceMetadata (all fields needed).
*
* @param dataSourceMetadata {@link DataSourceMetadata}.
*/
void updateDataSource(DataSourceMetadata dataSourceMetadata);

/**
* Patches {@link DataSource} corresponding to the given name (only fields to be changed needed).
*
* @param dataSourceData
*/
void patchDataSource(Map<String, Object> dataSourceData);

/**
* Deletes {@link DataSource} corresponding to the DataSource name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ public DataSource getDataSource(String dataSourceName) {
@Override
public void updateDataSource(DataSourceMetadata dataSourceMetadata) {}

@Override
public void patchDataSource(Map<String, Object> dataSourceData) {}

@Override
public void deleteDataSource(String dataSourceName) {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.model.transport;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.CONNECTOR_FIELD;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;

import java.io.IOException;
import java.util.Map;
import lombok.Getter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;

public class PatchDataSourceActionRequest extends ActionRequest {

@Getter private Map<String, Object> dataSourceData;

/** Constructor of UpdateDataSourceActionRequest from StreamInput. */
public PatchDataSourceActionRequest(StreamInput in) throws IOException {
super(in);
}

Check warning on line 28 in datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java#L27-L28

Added lines #L27 - L28 were not covered by tests

public PatchDataSourceActionRequest(Map<String, Object> dataSourceData) {
this.dataSourceData = dataSourceData;
}

@Override
public ActionRequestValidationException validate() {
if (this.dataSourceData.get(NAME_FIELD).equals(DEFAULT_DATASOURCE_NAME)) {
ActionRequestValidationException exception = new ActionRequestValidationException();
exception.addValidationError(

Check warning on line 38 in datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java#L37-L38

Added lines #L37 - L38 were not covered by tests
"Not allowed to update datasource with name : " + DEFAULT_DATASOURCE_NAME);
return exception;

Check warning on line 40 in datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java#L40

Added line #L40 was not covered by tests
} else if (this.dataSourceData.get(CONNECTOR_FIELD) != null) {
ActionRequestValidationException exception = new ActionRequestValidationException();
exception.addValidationError("Not allowed to update connector for datasource");
return exception;

Check warning on line 44 in datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java#L42-L44

Added lines #L42 - L44 were not covered by tests
} else {
return null;

Check warning on line 46 in datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionRequest.java#L46

Added line #L46 was not covered by tests
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.model.transport;

import java.io.IOException;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

@RequiredArgsConstructor
public class PatchDataSourceActionResponse extends ActionResponse {

@Getter private final String result;

public PatchDataSourceActionResponse(StreamInput in) throws IOException {
super(in);
result = in.readString();
}

Check warning on line 25 in datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionResponse.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionResponse.java#L23-L25

Added lines #L23 - L25 were not covered by tests

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
streamOutput.writeString(result);
}

Check warning on line 30 in datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionResponse.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/model/transport/PatchDataSourceActionResponse.java#L29-L30

Added lines #L29 - L30 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.NOT_FOUND;
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.rest.RestRequest.Method.DELETE;
import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.RestRequest.Method.POST;
import static org.opensearch.rest.RestRequest.Method.PUT;
import static org.opensearch.rest.RestRequest.Method.*;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
Expand All @@ -32,18 +30,8 @@
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.datasources.exceptions.ErrorMessage;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse;
import org.opensearch.sql.datasources.transport.TransportCreateDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportDeleteDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportGetDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportUpdateDataSourceAction;
import org.opensearch.sql.datasources.model.transport.*;
import org.opensearch.sql.datasources.transport.*;
import org.opensearch.sql.datasources.utils.Scheduler;
import org.opensearch.sql.datasources.utils.XContentParserUtils;

Expand Down Expand Up @@ -98,6 +86,17 @@ public List<Route> routes() {
*/
new Route(PUT, BASE_DATASOURCE_ACTION_URL),

/*
* PATCH datasources
* Request body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.PatchDataSourceActionRequest]
* Response body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.PatchDataSourceActionResponse]
*/
new Route(PATCH, BASE_DATASOURCE_ACTION_URL),

/*
* DELETE datasources
* Request body: Ref
Expand All @@ -122,6 +121,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
return executeUpdateRequest(restRequest, nodeClient);
case DELETE:
return executeDeleteRequest(restRequest, nodeClient);
case PATCH:
return executePatchRequest(restRequest, nodeClient);

Check warning on line 125 in datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java#L125

Added line #L125 was not covered by tests
default:
return restChannel ->
restChannel.sendResponse(
Expand Down Expand Up @@ -216,6 +217,34 @@ public void onFailure(Exception e) {
}));
}

private RestChannelConsumer executePatchRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
Map<String, Object> dataSourceData = XContentParserUtils.toMap(restRequest.contentParser());
return restChannel ->
Scheduler.schedule(

Check warning on line 224 in datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java#L222-L224

Added lines #L222 - L224 were not covered by tests
nodeClient,
() ->
nodeClient.execute(

Check warning on line 227 in datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java#L227

Added line #L227 was not covered by tests
TransportPatchDataSourceAction.ACTION_TYPE,
new PatchDataSourceActionRequest(dataSourceData),
new ActionListener<>() {

Check warning on line 230 in datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java#L230

Added line #L230 was not covered by tests
@Override
public void onResponse(
PatchDataSourceActionResponse patchDataSourceActionResponse) {
restChannel.sendResponse(

Check warning on line 234 in datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java#L234

Added line #L234 was not covered by tests
new BytesRestResponse(
RestStatus.OK,
"application/json; charset=UTF-8",
patchDataSourceActionResponse.getResult()));
}

Check warning on line 239 in datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java#L238-L239

Added lines #L238 - L239 were not covered by tests

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
}

Check warning on line 244 in datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java

View check run for this annotation

Codecov / codecov/patch

datasources/src/main/java/org/opensearch/sql/datasources/rest/RestDataSourceQueryAction.java#L243-L244

Added lines #L243 - L244 were not covered by tests
}));
}

private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) {

String dataSourceName = restRequest.param("dataSourceName");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,11 @@
package org.opensearch.sql.datasources.service;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.*;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
Expand Down Expand Up @@ -100,6 +96,19 @@ public void updateDataSource(DataSourceMetadata dataSourceMetadata) {
}
}

@Override
public void patchDataSource(Map<String, Object> dataSourceData) {
if (!dataSourceData.get(NAME_FIELD).equals(DEFAULT_DATASOURCE_NAME)) {
DataSourceMetadata dataSourceMetadata =
getRawDataSourceMetadata((String) dataSourceData.get(NAME_FIELD));
replaceOldDatasourceMetadata(dataSourceData, dataSourceMetadata);
updateDataSource(dataSourceMetadata);
} else {
throw new UnsupportedOperationException(
"Not allowed to update default datasource :" + DEFAULT_DATASOURCE_NAME);
}
}

@Override
public void deleteDataSource(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
Expand Down Expand Up @@ -136,6 +145,35 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) {
+ " Properties are required parameters.");
}

/**
* Replaces the fields in the map of the given metadata.
*
* @param dataSourceData
* @param metadata {@link DataSourceMetadata}.
*/
private void replaceOldDatasourceMetadata(
Map<String, Object> dataSourceData, DataSourceMetadata metadata) {

for (String key : dataSourceData.keySet()) {
switch (key) {
// Name and connector should not be modified
case DESCRIPTION_FIELD:
metadata.setDescription((String) dataSourceData.get(DESCRIPTION_FIELD));
break;
case ALLOWED_ROLES_FIELD:
metadata.setAllowedRoles((List<String>) dataSourceData.get(ALLOWED_ROLES_FIELD));
break;
case PROPERTIES_FIELD:
Map<String, String> properties = new HashMap<>(metadata.getProperties());
properties.putAll(((Map<String, String>) dataSourceData.get(PROPERTIES_FIELD)));
break;
case NAME_FIELD:
case CONNECTOR_FIELD:
break;
}
}
}

@Override
public DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.transport;

import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;
import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

public class TransportPatchDataSourceAction
extends HandledTransportAction<PatchDataSourceActionRequest, PatchDataSourceActionResponse> {

public static final String NAME = "cluster:admin/opensearch/ql/datasources/patch";
public static final ActionType<PatchDataSourceActionResponse> ACTION_TYPE =
new ActionType<>(NAME, PatchDataSourceActionResponse::new);

private DataSourceService dataSourceService;

/**
* TransportPatchDataSourceAction action for updating datasource.
*
* @param transportService transportService.
* @param actionFilters actionFilters.
* @param dataSourceService dataSourceService.
*/
@Inject
public TransportPatchDataSourceAction(
TransportService transportService,
ActionFilters actionFilters,
DataSourceServiceImpl dataSourceService) {
super(
TransportPatchDataSourceAction.NAME,
transportService,
actionFilters,
PatchDataSourceActionRequest::new);
this.dataSourceService = dataSourceService;
}

@Override
protected void doExecute(
Task task,
PatchDataSourceActionRequest request,
ActionListener<PatchDataSourceActionResponse> actionListener) {
try {
dataSourceService.patchDataSource(request.getDataSourceData());
String responseContent =
new JsonResponseFormatter<String>(PRETTY) {
@Override
protected Object buildJsonObject(String response) {
return response;
}
}.format("Updated DataSource with name " + request.getDataSourceData().get(NAME_FIELD));
actionListener.onResponse(new PatchDataSourceActionResponse(responseContent));
} catch (Exception e) {
actionListener.onFailure(e);
}
}
}
Loading

0 comments on commit 70ecae8

Please sign in to comment.