Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement patch API for datasources #2273

Merged
merged 22 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.datasource;

import java.util.Map;
import java.util.Set;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand Down Expand Up @@ -56,12 +57,19 @@ public interface DataSourceService {
void createDataSource(DataSourceMetadata metadata);

/**
* Updates {@link DataSource} corresponding to dataSourceMetadata.
* Updates {@link DataSource} corresponding to dataSourceMetadata (all fields needed).
*
* @param dataSourceMetadata {@link DataSourceMetadata}.
*/
void updateDataSource(DataSourceMetadata dataSourceMetadata);

/**
* Patches {@link DataSource} corresponding to the given name (only fields to be changed needed).
*
* @param dataSourceData
*/
void patchDataSource(Map<String, Object> dataSourceData);

/**
* Deletes {@link DataSource} corresponding to the DataSource name.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.model.transport;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;

import java.io.IOException;
import java.util.Map;
import lombok.Getter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;

public class PatchDataSourceActionRequest extends ActionRequest {

@Getter private Map<String, Object> dataSourceData;

/** Constructor of UpdateDataSourceActionRequest from StreamInput. */
public PatchDataSourceActionRequest(StreamInput in) throws IOException {
super(in);
}

public PatchDataSourceActionRequest(Map<String, Object> dataSourceData) {
this.dataSourceData = dataSourceData;
}

@Override
public ActionRequestValidationException validate() {
if (this.dataSourceData.get(NAME_FIELD).equals(DEFAULT_DATASOURCE_NAME)) {
ActionRequestValidationException exception = new ActionRequestValidationException();
exception.addValidationError(
vamsimanohar marked this conversation as resolved.
Show resolved Hide resolved
"Not allowed to update datasource with name : " + DEFAULT_DATASOURCE_NAME);
return exception;
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.model.transport;

import java.io.IOException;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

@RequiredArgsConstructor
public class PatchDataSourceActionResponse extends ActionResponse {

@Getter private final String result;

public PatchDataSourceActionResponse(StreamInput in) throws IOException {
super(in);
result = in.readString();
}

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
streamOutput.writeString(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.NOT_FOUND;
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.rest.RestRequest.Method.DELETE;
import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.RestRequest.Method.POST;
import static org.opensearch.rest.RestRequest.Method.PUT;
import static org.opensearch.rest.RestRequest.Method.*;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
Expand All @@ -32,18 +30,8 @@
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.datasources.exceptions.ErrorMessage;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse;
import org.opensearch.sql.datasources.transport.TransportCreateDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportDeleteDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportGetDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportUpdateDataSourceAction;
import org.opensearch.sql.datasources.model.transport.*;
import org.opensearch.sql.datasources.transport.*;
import org.opensearch.sql.datasources.utils.Scheduler;
import org.opensearch.sql.datasources.utils.XContentParserUtils;

Expand Down Expand Up @@ -98,6 +86,17 @@ public List<Route> routes() {
*/
new Route(PUT, BASE_DATASOURCE_ACTION_URL),

/*
* PATCH datasources
* Request body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.PatchDataSourceActionRequest]
* Response body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.PatchDataSourceActionResponse]
*/
new Route(PATCH, BASE_DATASOURCE_ACTION_URL),

/*
* DELETE datasources
* Request body: Ref
Expand All @@ -122,6 +121,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
return executeUpdateRequest(restRequest, nodeClient);
case DELETE:
return executeDeleteRequest(restRequest, nodeClient);
case PATCH:
return executePatchRequest(restRequest, nodeClient);
default:
return restChannel ->
restChannel.sendResponse(
Expand Down Expand Up @@ -216,6 +217,34 @@ public void onFailure(Exception e) {
}));
}

private RestChannelConsumer executePatchRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
Map<String, Object> dataSourceData = XContentParserUtils.toMap(restRequest.contentParser());
return restChannel ->
Scheduler.schedule(
nodeClient,
() ->
nodeClient.execute(
TransportPatchDataSourceAction.ACTION_TYPE,
new PatchDataSourceActionRequest(dataSourceData),
new ActionListener<>() {
@Override
public void onResponse(
PatchDataSourceActionResponse patchDataSourceActionResponse) {
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.OK,
"application/json; charset=UTF-8",
patchDataSourceActionResponse.getResult()));
}

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
}
}));
}

private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) {

String dataSourceName = restRequest.param("dataSourceName");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.opensearch.sql.datasources.service;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand Down Expand Up @@ -47,6 +48,13 @@ public interface DataSourceMetadataStorage {
*/
void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata);

/**
* Patches {@link DataSourceMetadata} in underlying storage.
*
* @param dataSourceData
*/
void patchDataSourceMetadata(Map<String, Object> dataSourceData);

/**
* Deletes {@link DataSourceMetadata} corresponding to the datasourceName from underlying storage.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,11 @@
package org.opensearch.sql.datasources.service;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
Expand Down Expand Up @@ -98,6 +94,16 @@ public void updateDataSource(DataSourceMetadata dataSourceMetadata) {
}
}

@Override
public void patchDataSource(Map<String, Object> dataSourceData) {
if (!dataSourceData.get(NAME_FIELD).equals(DEFAULT_DATASOURCE_NAME)) {
this.dataSourceMetadataStorage.patchDataSourceMetadata(dataSourceData);
vamsimanohar marked this conversation as resolved.
Show resolved Hide resolved
derek-ho marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new UnsupportedOperationException(
"Not allowed to update default datasource :" + DEFAULT_DATASOURCE_NAME);
}
}

@Override
public void deleteDataSource(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

package org.opensearch.sql.datasources.storage;

import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.PROPERTIES_FIELD;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -161,6 +164,35 @@ public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) {
}
}

@Override
public void patchDataSourceMetadata(Map<String, Object> dataSourceData) {
encryptDecryptAuthenticationData(dataSourceData, true);
UpdateRequest updateRequest =
new UpdateRequest(DATASOURCE_INDEX_NAME, (String) dataSourceData.get(NAME_FIELD));
UpdateResponse updateResponse;
try (ThreadContext.StoredContext storedContext =
client.threadPool().getThreadContext().stashContext()) {
updateRequest.doc(XContentParserUtils.convertMapToXContent(dataSourceData));
derek-ho marked this conversation as resolved.
Show resolved Hide resolved
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<UpdateResponse> updateResponseActionFuture = client.update(updateRequest);
updateResponse = updateResponseActionFuture.actionGet();
} catch (DocumentMissingException exception) {
throw new DataSourceNotFoundException(
"Datasource with name: " + dataSourceData.get(NAME_FIELD) + " doesn't exist");
} catch (Exception e) {
throw new RuntimeException(e);
}

if (updateResponse.getResult().equals(DocWriteResponse.Result.UPDATED)
|| updateResponse.getResult().equals(DocWriteResponse.Result.NOOP)) {
LOG.debug("DatasourceMetadata : {} successfully updated", dataSourceData.get(NAME_FIELD));
} else {
throw new RuntimeException(
"Saving dataSource metadata information failed with result : "
+ updateResponse.getResult().getLowercase());
}
}

@Override
public void deleteDataSourceMetadata(String datasourceName) {
DeleteRequest deleteRequest = new DeleteRequest(DATASOURCE_INDEX_NAME);
Expand Down Expand Up @@ -263,6 +295,16 @@ private DataSourceMetadata encryptDecryptAuthenticationData(
return dataSourceMetadata;
}

// Encrypt and Decrypt irrespective of auth type.If properties name ends in username, password,
// secret_key and access_key.
private Map<String, Object> encryptDecryptAuthenticationData(
Map<String, Object> dataSourceData, Boolean isEncryption) {
Map<String, String> propertiesMap = (Map<String, String>) dataSourceData.get(PROPERTIES_FIELD);
handleBasicAuthPropertiesEncryptionDecryption(propertiesMap, isEncryption);
handleSigV4PropertiesEncryptionDecryption(propertiesMap, isEncryption);
return dataSourceData;
}

private void handleBasicAuthPropertiesEncryptionDecryption(
Map<String, String> propertiesMap, Boolean isEncryption) {
ArrayList<String> list = new ArrayList<>();
Expand Down
Loading