Skip to content

Commit

Permalink
Merge branch 'main' into maint/github-action-for-e2e-secrets
Browse files Browse the repository at this point in the history
Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 committed Nov 27, 2023
2 parents 25e8686 + c88c27f commit 9e737df
Show file tree
Hide file tree
Showing 15 changed files with 205 additions and 31 deletions.
16 changes: 14 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ subprojects {
testImplementation testLibs.hamcrest
testImplementation testLibs.awaitility
constraints {
implementation('org.apache.avro:avro') {
version {
require '1.11.3'
}
because 'Fixes CVE-2023-39410.'
}
implementation('org.apache.httpcomponents:httpclient') {
version {
require '4.5.14'
Expand All @@ -116,6 +122,12 @@ subprojects {
}
because 'the build fails if the Log4j API is not update along with log4j-core'
}
implementation('org.apache.zookeeper:zookeeper') {
version {
require '3.7.2'
}
because 'Fixes CVE-2023-44981'
}
implementation('com.google.code.gson:gson') {
version {
require '2.8.9'
Expand Down Expand Up @@ -196,9 +208,9 @@ subprojects {
}
implementation('org.json:json') {
version {
require '20230618'
require '20231013'
}
because 'CVE from transitive dependencies'
because 'CVE-2023-5072, CVE from transitive dependencies'
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,20 @@ private void addAcknowledgedOffsets(final TopicPartition topicPartition, final R

if (Objects.isNull(commitTracker) && errLogRateLimiter.isAllowed(System.currentTimeMillis())) {
LOG.error("Commit tracker not found for TopicPartition: {}", topicPartition);
return;
}

final OffsetAndMetadata offsetAndMetadata =
partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange);
final OffsetAndMetadata offsetAndMetadata = commitTracker.addCompletedOffsets(offsetRange);
updateOffsetsToCommit(topicPartition, offsetAndMetadata);
}

private void resetOffsets() {
// resetting offsets is similar to committing acknowledged offsets. Throttle the frequency of resets by
// checking current time with last commit time. Same "lastCommitTime" and commit interval are used in both cases
long currentTimeMillis = System.currentTimeMillis();
if ((currentTimeMillis - lastCommitTime) < topicConfig.getCommitInterval().toMillis()) {
return;
}
if (partitionsToReset.size() > 0) {
partitionsToReset.forEach(partition -> {
try {
Expand All @@ -244,6 +250,8 @@ private void resetOffsets() {
consumer.seek(partition, offsetAndMetadata);
}
partitionCommitTrackerMap.remove(partition.partition());
final long epoch = getCurrentTimeNanos();
ownedPartitionsEpoch.put(partition, epoch);
} catch (Exception e) {
LOG.error("Failed to seek to last committed offset upon negative acknowledgement {}", partition, e);
}
Expand Down Expand Up @@ -493,7 +501,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
continue;
}
LOG.info("Assigned partition {}", topicPartition);
partitionCommitTrackerMap.remove(topicPartition.partition());
ownedPartitionsEpoch.put(topicPartition, epoch);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ private void createRole(final String role, final String indexPattern, final Stri
final String createRoleJson = Strings.toString(
XContentFactory.jsonBuilder()
.startObject()
.array("cluster_permissions", "cluster:monitor/main")
.startArray("index_permissions")
.startObject()
.array("index_patterns", new String[]{indexPattern})
.array("allowed_actions", allowedActions)
.endObject()
.startObject()
.array("index_patterns", new String[]{"*"})
.array("allowed_actions", "indices:admin/aliases/get")
.endObject()
.endArray()
.endObject()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.opensearch.client.Request;
Expand Down Expand Up @@ -99,6 +100,9 @@
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.wipeAllTemplates;

public class OpenSearchSinkIT {
private static final int LUCENE_CHAR_LENGTH_LIMIT = 32_766;
private static final String TEST_STRING_WITH_SPECIAL_CHARS = "Hello! Data-Prepper? #Example123";
private static final String TEST_STRING_WITH_NON_LATIN_CHARS = "Привет,Γειά σας,こんにちは,你好";
private static final String PLUGIN_NAME = "opensearch";
private static final String PIPELINE_NAME = "integTestPipeline";
private static final String TEST_CUSTOM_INDEX_POLICY_FILE = "test-custom-index-policy-file.json";
Expand Down Expand Up @@ -969,6 +973,33 @@ public void testEventOutput() throws IOException, InterruptedException {
sink.shutdown();
}

@ParameterizedTest
@MethodSource("getAttributeTestSpecialAndExtremeValues")
public void testEventOutputWithSpecialAndExtremeValues(final Object testValue) throws IOException, InterruptedException {
final String testIndexAlias = "test-alias";
final String testField = "value";
final Map<String, Object> data = new HashMap<>();
data.put(testField, testValue);
final Event testEvent = JacksonEvent.builder()
.withData(data)
.withEventType("event")
.build();

final List<Record<Event>> testRecords = Collections.singletonList(new Record<>(testEvent));

final PluginSetting pluginSetting = generatePluginSetting(null, testIndexAlias, null);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);

final List<Map<String, Object>> retSources = getSearchResponseDocSources(testIndexAlias);
final Map<String, Object> expectedContent = new HashMap<>();
expectedContent.put(testField, testValue);

assertThat(retSources.size(), equalTo(1));
assertThat(retSources.get(0), equalTo(expectedContent));
sink.shutdown();
}

@ParameterizedTest
@ValueSource(strings = {"info/ids/id", "id"})
public void testOpenSearchDocumentId(final String testDocumentIdField) throws IOException, InterruptedException {
Expand Down Expand Up @@ -1436,4 +1467,25 @@ private void createV1IndexTemplate(final String templateName, final String index
private static boolean isES6() {
return DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(OpenSearchIntegrationHelper.getVersion()) >= 0;
}

private static Stream<Object> getAttributeTestSpecialAndExtremeValues() {
return Stream.of(
null,
Arguments.of(Long.MAX_VALUE),
Arguments.of(Long.MIN_VALUE),
Arguments.of(Integer.MAX_VALUE),
Arguments.of(Integer.MIN_VALUE),
Arguments.of(RandomStringUtils.randomAlphabetic(LUCENE_CHAR_LENGTH_LIMIT)),
Arguments.of(TEST_STRING_WITH_SPECIAL_CHARS),
Arguments.of(TEST_STRING_WITH_NON_LATIN_CHARS),
Arguments.of(Double.MIN_VALUE),
Arguments.of(-Double.MIN_VALUE),
Arguments.of((double) Float.MAX_VALUE),
Arguments.of((double) Float.MIN_VALUE),
Arguments.of((double) -Float.MAX_VALUE),
Arguments.of((double) -Float.MIN_VALUE),
Arguments.of(Boolean.TRUE),
Arguments.of(Boolean.FALSE)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,18 @@ private void doInitializeInternal() throws IOException {
}
indexManager.setupIndex();

final Boolean requireAlias = indexManager.isIndexAlias(configuredIndexAlias);
final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression();
final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled();
if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) {
final int maxLocalCompressionsForEstimation = openSearchSinkConfig.getIndexConfiguration().getMaxLocalCompressionsForEstimation();
bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder(), bulkSize, maxLocalCompressionsForEstimation);
bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias), bulkSize, maxLocalCompressionsForEstimation);
} else if (isEstimateBulkSizeUsingCompression) {
LOG.warn("Estimate bulk request size using compression was enabled but request compression is disabled. " +
"Estimating bulk request size without compression.");
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias));
} else {
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder().requireAlias(requireAlias));
}

final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.opensearch.client.opensearch.cluster.GetClusterSettingsRequest;
import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsAliasRequest;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -50,6 +52,8 @@ public abstract class AbstractIndexManager implements IndexManager {
protected IsmPolicyManagementStrategy ismPolicyManagementStrategy;
private final TemplateStrategy templateStrategy;
protected String indexPrefix;
private Boolean isIndexAlias;
private boolean isIndexAliasChecked;

private static final Logger LOG = LoggerFactory.getLogger(AbstractIndexManager.class);

Expand Down Expand Up @@ -112,6 +116,10 @@ public static String getIndexAliasWithDate(final String indexAlias) {
return indexAlias.replaceAll(TIME_PATTERN_REGULAR_EXPRESSION, "") + suffix;
}

private void initalizeIsIndexAlias(final String indexAlias) {

}

private void initializeIndexPrefixAndSuffix(final String indexAlias){
final DateTimeFormatter dateFormatter = getDatePatternFormatter(indexAlias);
if (dateFormatter != null) {
Expand Down Expand Up @@ -176,6 +184,26 @@ public static ZonedDateTime getCurrentUtcTime() {
return LocalDateTime.now().atZone(ZoneId.systemDefault()).withZoneSameInstant(UTC_ZONE_ID);
}

@Override
public Boolean isIndexAlias(final String dynamicIndexAlias) throws IOException {
if (isIndexAliasChecked == false) {
try {
// Try to get the OpenSearch version. This fails on older OpenDistro versions, that do not support
// `require_alias` as a bulk API parameter. All OpenSearch versions do, as this was introduced in
// ES 7.10.
openSearchClient.info();
ExistsAliasRequest request = new ExistsAliasRequest.Builder().name(dynamicIndexAlias).build();
BooleanResponse response = openSearchClient.indices().existsAlias(request);
isIndexAlias = response.value() && checkISMEnabled();
} catch (RuntimeException ex) {
isIndexAlias = null;
} finally {
isIndexAliasChecked = true;
}
}
return isIndexAlias;
}

final boolean checkISMEnabled() throws IOException {
final GetClusterSettingsRequest request = new GetClusterSettingsRequest.Builder()
.includeDefaults(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.io.IOException;

public interface IndexManager{

void setupIndex() throws IOException;
String getIndexName(final String indexAlias) throws IOException;
Boolean isIndexAlias(final String indexAlias) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.client.opensearch.cluster.GetClusterSettingsResponse;
import org.opensearch.client.opensearch.cluster.OpenSearchClusterClient;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.ExistsAliasRequest;
import org.opensearch.client.opensearch.indices.ExistsRequest;
import org.opensearch.client.opensearch.indices.GetTemplateRequest;
import org.opensearch.client.opensearch.indices.GetTemplateResponse;
Expand Down Expand Up @@ -313,6 +314,51 @@ void constructor_NullConfiguration() {
verify(indexConfiguration).getIsmPolicyFile();
}

@Test
void isIndexAlias_True() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true));
when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("true");
assertEquals(true, defaultIndexManager.isIndexAlias(INDEX_ALIAS));
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
verify(openSearchClient).cluster();
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@Test
void isIndexAlias_False_NoAlias() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(false));
assertEquals(false, defaultIndexManager.isIndexAlias(INDEX_ALIAS));
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
}

@Test
void isIndexAlias_False_NoISM() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy);
when(openSearchIndicesClient.existsAlias(any(ExistsAliasRequest.class))).thenReturn(new BooleanResponse(true));
when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), anyString())).thenReturn("false");
assertEquals(false, defaultIndexManager.isIndexAlias(INDEX_ALIAS));
verify(openSearchSinkConfiguration, times(2)).getIndexConfiguration();
verify(indexConfiguration).getIsmPolicyFile();
verify(indexConfiguration).getIndexAlias();
verify(openSearchClient).indices();
verify(openSearchIndicesClient).existsAlias(any(ExistsAliasRequest.class));
verify(openSearchClient).cluster();
verify(openSearchClusterClient).getSettings(any(GetClusterSettingsRequest.class));
}

@Test
void checkISMEnabled_True() throws IOException {
defaultIndexManager = indexManagerFactory.getIndexManager(
Expand Down
Loading

0 comments on commit 9e737df

Please sign in to comment.