
[CYB-208] OCSF Support for Flink Indexing + JSON TableAPI (#88)
* json tableapi support

* OCSF Support for Flink Indexing
stas-panasiuk authored Dec 20, 2024
1 parent 8748f24 commit b973766
Showing 13 changed files with 439 additions and 119 deletions.
MappingColumnDto.java
@@ -2,6 +2,10 @@
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -28,23 +32,35 @@ public class MappingColumnDto {
     private Boolean isMap;
 
     @JsonIgnore
-    public String getKafkaName() {
+    public List<String> getKafkaNameList() {
         final String properName = kafkaName == null ? name : kafkaName;
         if (getIsMap()) {
-            return String.format("['%s']", properName);
-        } else {
-            if (getPath().equals("..")) {
-                return String.format("%s", properName);
-            }
-            return String.format(".%s", properName);
+            return Collections.singletonList(String.format("['%s']", properName));
         }
+
+        String[] kafkaNamesSplit = properName.split(",");
+
+        return Arrays.stream(kafkaNamesSplit)
+                .map(singleKafkaName -> {
+                    if (getPath().equals("..")) {
+                        return String.format("%s", singleKafkaName);
+                    }
+                    return String.format(".%s", singleKafkaName);
+                })
+                .collect(Collectors.toList());
     }
 
     @JsonIgnore
-    public String getRawKafkaName(){
+    public String getRawKafkaName() {
         return kafkaName;
     }
 
+    @JsonProperty("path")
+    public String getRawPath() {
+        return this.path;
+    }
+
     @JsonIgnore
     public String getPath() {
         if (StringUtils.isEmpty(path)) {
             return "extensions";
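
The effect of the new getKafkaNameList() is easiest to see on a comma-separated kafka_name. A minimal sketch, assuming MappingColumnDto is on the classpath; the field names are hypothetical, and the constructor argument order (name, kafkaName, path, transformation, isMap) is taken from the tests further down:

import com.cloudera.cyber.indexing.MappingColumnDto;

class KafkaNameListSketch {
    public static void main(String[] args) {
        // One logical column backed by two Kafka fields (hypothetical names).
        MappingColumnDto column = new MappingColumnDto("column1", "fieldA,fieldB", null, null, false);
        // path is null, so getPath() falls back to "extensions" and each split
        // name is rendered as a dot accessor:
        System.out.println(column.getKafkaNameList());     // [.fieldA, .fieldB]

        // A map column short-circuits before the split and yields one bracket accessor:
        MappingColumnDto mapColumn = new MappingColumnDto("labels", "labels", null, null, true);
        System.out.println(mapColumn.getKafkaNameList());  // [['labels']]
    }
}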
TableApiAbstractJob.java
@@ -10,16 +10,31 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.Streams;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.*;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.FormatDescriptor;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableDescriptor;
 import org.apache.flink.table.api.bridge.java.StreamStatementSet;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.Column;
@@ -355,30 +370,42 @@ protected String getInsertColumns(MappingDto mappingDto) {
     }
 
     private String getFromColumns(MappingDto mappingDto, ResolvedSchema tableSchema) {
         System.out.println("Building from columns");
         return mappingDto.getColumnMapping().stream()
                 .map(mappingColumnDto -> {
-                    final String kafkaName = mappingColumnDto.getKafkaName();
+                    final List<String> kafkaNameList = mappingColumnDto.getKafkaNameList();
                     final String path = mappingColumnDto.getPath();
 
-                    String fullPath;
-                    if (path.startsWith("..")) {
-                        fullPath = path.substring(2);
-                    } else {
-                        fullPath = String.format("message.%s", path);
-                    }
-                    if (StringUtils.hasText(fullPath)) {
-                        fullPath = String.join(".", fullPath.split("\\."));
-                    }
-
-                    fullPath = fullPath + kafkaName;
+                    List<String> fullPathList = kafkaNameList.stream()
+                            .map(kafkaName -> {
+                                String fullPath;
+                                if (path.startsWith("..")) {
+                                    fullPath = path.substring(2);
+                                } else {
+                                    fullPath = String.format("message.%s", path);
+                                }
+                                if (StringUtils.hasText(fullPath)) {
+                                    fullPath = String.join(".", fullPath.split("\\."));
+                                }
+
+                                return "(" + fullPath + kafkaName + ")";
+                            }).collect(Collectors.toList());
 
                     Optional<Column> column = tableSchema.getColumn(mappingColumnDto.getName());
                     final String transformation = column.map(value -> getTransformation(value.getDataType(), mappingColumnDto)).orElse("");
 
+                    if (!CollectionUtils.isEmpty(mappingDto.getIgnoreFields())) {
+                        String fieldsToIgnore = mappingDto.getIgnoreFields().stream()
+                                .filter(StringUtils::hasText)
+                                .collect(Collectors.joining("','", "'", "'"));
+                        if (StringUtils.hasText(transformation)) {
+                            fullPathList.add(fieldsToIgnore);
+                        }
+                    }
+
                     return StringUtils.hasText(transformation)
-                            ? String.format(transformation, "(" + fullPath + ")", mappingDto.getIgnoreFields().stream()
-                                    .collect(Collectors.joining("','", "'", "'")))
-                            : fullPath;
+                            ? String.format(transformation, fullPathList.toArray(new Object[0]))
+                            : String.join(", ", fullPathList);
                 })
                 .collect(Collectors.joining(", ", " ", " "));
    }
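
Concretely, the per-name expansion means a transformation such as ROW(%s, %s) now receives one formatted path per Kafka name. A worked sketch, using the values from the shouldGenerateInsertSql test data further down:

import java.util.Arrays;
import java.util.List;

class FromColumnsSketch {
    public static void main(String[] args) {
        // kafka_name "column1,column2" under the default "extensions" path
        // expands to one parenthesized accessor per name:
        List<String> fullPathList = Arrays.asList(
                "(message.extensions.column1)",
                "(message.extensions.column2)");
        // transformation = "ROW(%s, %s)" is applied across the whole list:
        System.out.println(String.format("ROW(%s, %s)", fullPathList.toArray(new Object[0])));
        // prints: ROW((message.extensions.column1), (message.extensions.column2))
    }
}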
TableApiJobFactory.java
@@ -1,14 +1,14 @@
 package com.cloudera.cyber.indexing.hive.tableapi;
 
+import com.cloudera.cyber.indexing.hive.tableapi.impl.TableApiFilesystemJob;
 import com.cloudera.cyber.indexing.hive.tableapi.impl.TableApiHiveJob;
 import com.cloudera.cyber.indexing.hive.tableapi.impl.TableApiKafkaJob;
 import com.cloudera.cyber.scoring.ScoredMessage;
+import java.io.IOException;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.io.IOException;
 
 public class TableApiJobFactory {
 
     public static TableApiAbstractJob getJobByConnectorName(String typeName, ParameterTool params, StreamExecutionEnvironment env, DataStream<ScoredMessage> source) throws IOException {
@@ -20,6 +20,8 @@ public static TableApiAbstractJob getJobByConnectorName(String typeName, ParameterTool params, StreamExecutionEnvironment env, DataStream<ScoredMessage> source) throws IOException {
                 return new TableApiHiveJob(params, env, source);
             case "kafka":
                 return new TableApiKafkaJob(params, env, source);
+            case "filesystem":
+                return new TableApiFilesystemJob(params, env, source);
             default:
                 throw new RuntimeException(String.format("Unknown job type name [%s] provided while the Flink writer is selected as TableAPI", typeName));
         }
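
With the new case in place, selecting the filesystem sink through the factory is a one-liner. A minimal sketch, assuming params, env, and source are already built by the surrounding job setup; "filesystem" is the connector name matched by the switch above:

// Returns a TableApiFilesystemJob; any unrecognized name still falls through
// to the default branch and throws.
TableApiAbstractJob job = TableApiJobFactory.getJobByConnectorName("filesystem", params, env, source);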
TableApiFilesystemJob.java
@@ -0,0 +1,54 @@
+package com.cloudera.cyber.indexing.hive.tableapi.impl;
+
+import com.cloudera.cyber.indexing.hive.tableapi.TableApiAbstractJob;
+import com.cloudera.cyber.scoring.ScoredMessage;
+import java.io.IOException;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.FormatDescriptor;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+
+public class TableApiFilesystemJob extends TableApiAbstractJob {
+
+    private static final String BASE_TABLE_JSON = "base-hive-table.json";
+    private final String format;
+    private final String path;
+
+    public TableApiFilesystemJob(ParameterTool params, StreamExecutionEnvironment env, DataStream<ScoredMessage> source)
+            throws IOException {
+        super(params, env, source, "Filesystem", BASE_TABLE_JSON);
+        format = params.get("flink.files.format", "json");
+        path = params.getRequired("flink.files.path");
+    }
+
+    @Override
+    protected StreamExecutionEnvironment jobReturnValue() {
+        return null;
+    }
+
+    @Override
+    protected String getTableConnector() {
+        return "filesystem";
+    }
+
+    @Override
+    protected FormatDescriptor getFormatDescriptor() {
+        return FormatDescriptor.forFormat(format).build();
+    }
+
+    @Override
+    protected void registerCatalog(StreamTableEnvironment tableEnv) {
+    }
+
+    @Override
+    protected TableDescriptor.Builder fillTableOptions(TableDescriptor.Builder builder) {
+        return super.fillTableOptions(builder)
+                .option("path", path)
+                .option("format", format)
+                .option("sink.partition-commit.policy.kind", "success-file");
+    }
+
+}
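
The job reads exactly two properties. A minimal sketch of supplying them through Flink's ParameterTool; the path value is hypothetical, and flink.files.format falls back to "json" when omitted:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.utils.ParameterTool;

class FilesystemJobConfigSketch {
    public static void main(String[] args) {
        // Hypothetical configuration for the new filesystem sink.
        Map<String, String> props = new HashMap<>();
        props.put("flink.files.path", "/data/cyber/events"); // required; path is hypothetical
        props.put("flink.files.format", "json");             // optional; "json" is the default
        ParameterTool params = ParameterTool.fromMap(props);
        System.out.println(params.get("flink.files.format", "json"));
    }
}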
@@ -1,6 +1,8 @@
 package com.cloudera.cyber.indexing.hive.tableapi;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import com.cloudera.cyber.indexing.MappingColumnDto;
 import com.cloudera.cyber.indexing.MappingDto;
 import com.cloudera.cyber.indexing.hive.tableapi.impl.TableApiKafkaJob;
@@ -51,7 +53,40 @@ public static Stream<Arguments> mappingsData() {
                         new MappingColumnDto("column6", null, null, null, false),
                         new MappingColumnDto("column7", null, null, null, false),
                         new MappingColumnDto("column8", null, null, null, false),
-                        new MappingColumnDto("column9", null, null, null, false))))));
+                        new MappingColumnDto("column9", null, null, null, false))))),
+
+                Arguments.of(Collections.singletonMap(GIVEN_TABLE_NAME, ResolvedSchema.of(
+                        Column.physical("column1", DataTypes.STRING()),
+                        Column.physical("column2", DataTypes.STRING()),
+                        Column.physical("column3", DataTypes.STRING()))),
+                        Collections.singletonMap(GIVEN_SOURCE,
+                                new MappingDto(GIVEN_TABLE_NAME, new ArrayList<>(), Arrays.asList(
+                                        new MappingColumnDto("column1", "column1,column2", null, null, false),
+                                        new MappingColumnDto("column2", "column3", null, null, false))))));
     }
 
+    public static Stream<Arguments> insertSqlData() {
+        return Stream.of(
+                Arguments.of("topic",
+                        new MappingDto("tableName", Collections.emptyList(), Collections.emptyList()),
+                        ResolvedSchema.of(),
+                        "CREATE TEMPORARY VIEW topic_tmpview( ) AS \n" +
+                                " SELECT \n" +
+                                " from KafkaTempView\n" +
+                                " where `source`='topic'"),
+                Arguments.of("topic",
+                        new MappingDto("tableName", Collections.emptyList(), Arrays.asList(
+                                new MappingColumnDto("column1", "column1,column2", null, "ROW(%s, %s)", false),
+                                new MappingColumnDto("column2", "column3", null, null, false))),
+                        ResolvedSchema.of(
+                                Column.physical("column1", DataTypes.STRING()),
+                                Column.physical("column2", DataTypes.STRING()),
+                                Column.physical("column3", DataTypes.STRING())),
+                        "CREATE TEMPORARY VIEW topic_tmpview( column1, column2 ) AS \n" +
+                                " SELECT ROW((message.extensions.column1), (message.extensions.column2)), (message.extensions.column3) \n" +
+                                " from KafkaTempView\n" +
+                                " where `source`='topic'")
+        );
+    }
+
     public static Stream<Arguments> mappingsExceptionData() {
Expand Down Expand Up @@ -92,13 +127,21 @@ public static Stream<Arguments> mappingsExceptionData() {
"Found column mappings of non-string type without transformations for source [%s]: %s",
GIVEN_SOURCE, "[column1]")));
}

@ParameterizedTest
@MethodSource("mappingsData")
void shouldValidateMappings(Map<String, ResolvedSchema> givenTableSchemaMap,
Map<String, MappingDto> givenTopicMapping) {
job.validateMappings(givenTableSchemaMap, givenTopicMapping);
}

@ParameterizedTest
@MethodSource("insertSqlData")
void shouldGenerateInsertSql(String topic, MappingDto mappingDto, ResolvedSchema tableSchema, String expectedSql) {
String actualSql = job.buildInsertSql(topic, mappingDto, tableSchema);
assertThat(actualSql).isEqualTo(expectedSql);
}

@ParameterizedTest
@MethodSource("mappingsExceptionData")
void shouldThrowExceptionWhenValidateMappings(Map<String, ResolvedSchema> givenTableSchemaMap,
Expand Down
@@ -66,6 +66,12 @@ export interface IndexTableMapping {
   column_mapping: IndexingColumnMapping[]
 }
 
+export interface TableColumnDto {
+  name: string;
+  type: string;
+  nullable: boolean;
+}
+
 export interface IndexingColumnMapping {
   name: string;
   kafka_name?: string;
@@ -63,13 +63,23 @@
             nzText="Import/Export"
             nzOrientation="center">
 </nz-divider>
+<nz-form-item>
+  <nz-form-label>Table Config path</nz-form-label>
+  <nz-form-control>
+    <nz-input-group nzPrefixIcon="file">
+      <input formControlName="_tableFilePath"
+             nz-input
+             placeholder="/path/to/tableConfig.json"/>
+    </nz-input-group>
+  </nz-form-control>
+</nz-form-item>
 <nz-form-item>
   <nz-form-label>Mapping path</nz-form-label>
   <nz-form-control>
     <nz-input-group nzPrefixIcon="file">
-      <input formControlName="_filePath"
+      <input formControlName="_mappingFilePath"
              nz-input
-             placeholder="/path/to/file.json"/>
+             placeholder="/path/to/mappingConfig.json"/>
     </nz-input-group>
   </nz-form-control>
 </nz-form-item>