[CYB-193][UI] Landing page for pipelines. (#81)
vpavlenko-cv authored Nov 4, 2024
1 parent dcf2ea3 commit 71cf8d1
Showing 119 changed files with 2,995 additions and 2,284 deletions.
6 changes: 6 additions & 0 deletions flink-cyber/cyber-jobs/pom.xml
@@ -91,6 +91,12 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.cloudera.cyber</groupId>
<artifactId>cyber-worker-service</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
7 changes: 7 additions & 0 deletions flink-cyber/cyber-jobs/src/main/assemblies/cloudera.xml
@@ -87,6 +87,13 @@
<fileMode>0644</fileMode>
</file>

<file>
<source>../cyber-services/cyber-worker-service/target/cyber-worker-service-${project.version}.jar</source>
<outputDirectory>jobs/</outputDirectory>
<destName>cyber-worker-service-${cybersec.full.version}.jar</destName>
<fileMode>0644</fileMode>
</file>

<file>
<source>../flink-commands/scoring-commands/target/scoring-commands-${project.version}.jar</source>
<outputDirectory>tools/</outputDirectory>
5 changes: 5 additions & 0 deletions flink-cyber/cyber-services/cyber-service-common/pom.xml
@@ -48,5 +48,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
…/com/cloudera/service/common/request/RequestBody.java
@@ -5,14 +5,18 @@
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RequestBody {
private String clusterServiceId;
private String jobIdHex;
private String pipelineDir;
private String pipelineName;
private String branch;
private String profileName;
private List<String> jobs;
private byte[] payload;
}
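For orientation, a minimal usage sketch (not part of this commit) of how a caller might populate the new pipeline-related fields when issuing a START_ARCHIVE_PIPELINE request; all values are illustrative:

import com.cloudera.service.common.request.RequestBody;

import java.util.Arrays;

// Hypothetical usage sketch -- not part of this commit. Values are
// illustrative; job names must match the Job.JobType names the worker expects.
public class RequestBodyExample {

    public static RequestBody startArchiveRequest(byte[] pipelineTarGz) {
        return RequestBody.builder()
                .clusterServiceId("cluster-1")            // assumed id format
                .pipelineName("demo")
                .branch("main")
                .profileName("main")
                .jobs(Arrays.asList("parser", "profile")) // assumed job list
                .payload(pipelineTarGz)                   // tar.gz of the pipeline dir
                .build();
    }
}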
…/com/cloudera/service/common/request/RequestType.java
@@ -1,5 +1,5 @@
package com.cloudera.service.common.request;

public enum RequestType {
GET_ALL_CLUSTERS_SERVICE_REQUEST, GET_CLUSTER_SERVICE_REQUEST, START_JOB_REQUEST, RESTART_JOB_REQUEST, STOP_JOB_REQUEST, GET_JOB_CONFIG_REQUEST, UPDATE_JOB_CONFIG_REQUEST
GET_ALL_CLUSTERS_SERVICE_REQUEST, GET_CLUSTER_SERVICE_REQUEST, START_JOB_REQUEST, RESTART_JOB_REQUEST, STOP_JOB_REQUEST, GET_JOB_CONFIG_REQUEST, CREATE_EMPTY_PIPELINE, START_ARCHIVE_PIPELINE, UPDATE_JOB_CONFIG_REQUEST
}
…/com/cloudera/service/common/response/Job.java
@@ -31,6 +31,8 @@ public class Job {

private JobType jobType;

private String user;

@AllArgsConstructor
@Getter
public enum JobType {
@@ -46,7 +48,6 @@ public enum JobType {

public String[] getScript(Job job) {
switch (this) {
case GENERATOR:
case PROFILE:
case PARSER:
return new String[]{scriptName, job.getJobBranch(), job.getJobPipeline(), job.getJobName()};
…/com/cloudera/service/common/response/Pipeline.java
@@ -0,0 +1,18 @@
package com.cloudera.service.common.response;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Pipeline {
String id;
String name;
String clusterName;
String date;
String userName;
}
…/com/cloudera/service/common/response/ResponseType.java
@@ -1,5 +1,5 @@
package com.cloudera.service.common.response;

public enum ResponseType {
GET_ALL_CLUSTERS_SERVICE_RESPONSE, GET_CLUSTER_SERVICE_RESPONSE, START_JOB_RESPONSE, RESTART_JOB_RESPONSE, STOP_JOB_RESPONSE, GET_JOB_CONFIG_RESPONSE, UPDATE_JOB_CONFIG_RESPONSE, ERROR_RESPONSE
GET_ALL_CLUSTERS_SERVICE_RESPONSE, GET_CLUSTER_SERVICE_RESPONSE, START_JOB_RESPONSE, RESTART_JOB_RESPONSE, STOP_JOB_RESPONSE, GET_JOB_CONFIG_RESPONSE, UPDATE_JOB_CONFIG_RESPONSE, CREATE_EMPTY_PIPELINE_RESPONSE, START_ARCHIVE_PIPELINE_RESPONSE, ERROR_RESPONSE
}
…/com/cloudera/cyber/restcli/configuration/AppWorkerConfig.java
@@ -0,0 +1,18 @@
package com.cloudera.cyber.restcli.configuration;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "cluster")
@Getter
@Setter
public class AppWorkerConfig {
private String name;
private String id;
private String status;
private String version;
private String pipelineDir;
}
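Worth noting: the old @Value("${cluster.pipeline.dir}") lookup tolerated the dotted key, but @ConfigurationProperties relaxed binding maps pipelineDir only from forms such as cluster.pipelineDir or cluster.pipeline-dir, which is why the test application.properties later in this diff renames the key. A minimal sketch of the accepted forms (illustrative values):

# Either relaxed-binding form populates AppWorkerConfig.pipelineDir:
cluster.pipelineDir=/tmp/test
# cluster.pipeline-dir=/tmp/test   (kebab-case equivalent)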
…/com/cloudera/cyber/restcli/controller/KafkaListenerController.java
@@ -1,6 +1,8 @@
package com.cloudera.cyber.restcli.controller;

import com.cloudera.cyber.restcli.configuration.AppWorkerConfig;
import com.cloudera.cyber.restcli.service.JobService;
import com.cloudera.cyber.restcli.service.FilePipelineService;
import com.cloudera.service.common.Utils;
import com.cloudera.service.common.request.RequestBody;
import com.cloudera.service.common.request.RequestType;
@@ -29,18 +31,11 @@
@RequiredArgsConstructor
@Slf4j
public class KafkaListenerController {

@Value("${cluster.name}")
private String clusterName;
@Value("${cluster.id}")
private String clusterId;
@Value("${cluster.status}")
private String clusterStatus;
@Value("${cluster.version}")
private String clusterVersion;

private final JobService jobService;
private final FilePipelineService pipelineService;
private final AppWorkerConfig config;

//TODO: Rewrite to Spring events. Probably split the events into separate types, such as cluster event, job event, pipeline event, etc.
@KafkaListener(topics = "#{kafkaProperties.getRequestTopic()}", containerFactory = "kafkaListenerContainerFactory")
@SendTo({"#{kafkaProperties.getReplyTopic()}"})
public Message<ResponseBody> handleMessage(RequestBody requestBody, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
@@ -85,6 +80,26 @@ public Message<ResponseBody> handleMessage(RequestBody requestBody, @Header(Kafk
} catch (IOException e) {
return handleErrorResponse(e, replyTo, correlationId);
}
case CREATE_EMPTY_PIPELINE:
try {
pipelineService.createEmptyPipeline(requestBody.getPipelineName(), requestBody.getBranch());
final ResponseBody responseBody = ResponseBody.builder().build();
return buildResponseMessage(responseBody, ResponseType.CREATE_EMPTY_PIPELINE_RESPONSE, replyTo, correlationId);
} catch (Exception e) {
return handleErrorResponse(e, replyTo, correlationId);
}
case START_ARCHIVE_PIPELINE:
try {
pipelineService.extractPipeline(requestBody.getPayload(), requestBody.getPipelineName(), requestBody.getBranch());
pipelineService.startPipelineJob(requestBody.getPipelineName(), requestBody.getBranch(), requestBody.getProfileName(), requestBody.getJobs());
final ResponseBody responseBody = ResponseBody.builder().build();
return buildResponseMessage(responseBody, ResponseType.START_ARCHIVE_PIPELINE_RESPONSE, replyTo, correlationId);
} catch (Exception e) {
log.error("Exception while processing the START_ARCHIVE_PIPELINE request: {}", e.getMessage());
return handleErrorResponse(e, replyTo, correlationId);
}
}
return null;
}
@@ -95,10 +110,10 @@ private Message<ResponseBody> getResponseBodyMessage(byte[] replyTo, byte[] corr
ResponseBody responseBody = ResponseBody.builder()
.jobs(jobs)
.clusterMeta(ClusterMeta.builder()
.name(clusterName)
.clusterId(clusterId)
.clusterStatus(clusterStatus)
.version(clusterVersion)
.name(config.getName())
.clusterId(config.getId())
.clusterStatus(config.getStatus())
.version(config.getVersion())
.build())
.build();
return buildResponseMessage(responseBody, responseType, replyTo, correlationId);
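To make the request/reply flow above concrete, a hypothetical caller-side sketch (not part of this commit) using Spring Kafka's ReplyingKafkaTemplate; the topic name, the key-carries-RequestType convention, and the timeout are assumptions:

import com.cloudera.service.common.request.RequestBody;
import com.cloudera.service.common.request.RequestType;
import com.cloudera.service.common.response.ResponseBody;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;

import java.util.concurrent.TimeUnit;

// Hypothetical manager-side sketch: issues CREATE_EMPTY_PIPELINE and waits for
// the reply that handleMessage() publishes via @SendTo on the reply topic.
public class PipelineRequestSender {

    private final ReplyingKafkaTemplate<String, RequestBody, ResponseBody> template;

    public PipelineRequestSender(ReplyingKafkaTemplate<String, RequestBody, ResponseBody> template) {
        this.template = template;
    }

    public ResponseBody createEmptyPipeline(String requestTopic) throws Exception {
        RequestBody body = RequestBody.builder()
                .pipelineName("demo")   // illustrative values
                .branch("main")
                .build();
        // Assumption: the listener derives the RequestType from the record key.
        ProducerRecord<String, RequestBody> record =
                new ProducerRecord<>(requestTopic, RequestType.CREATE_EMPTY_PIPELINE.name(), body);
        RequestReplyFuture<String, RequestBody, ResponseBody> future = template.sendAndReceive(record);
        return future.get(30, TimeUnit.SECONDS).value();
    }
}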
…/com/cloudera/cyber/restcli/service/FilePipelineService.java
@@ -0,0 +1,78 @@
package com.cloudera.cyber.restcli.service;

import com.cloudera.cyber.restcli.configuration.AppWorkerConfig;
import com.cloudera.service.common.Utils;
import com.cloudera.service.common.response.Job;
import com.cloudera.service.common.utils.ArchiveUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

@Slf4j
@Service
@RequiredArgsConstructor
public class FilePipelineService {
private final AppWorkerConfig config;


public void createEmptyPipeline(String pipelineName, String branchName) {
String fullPath = this.config.getPipelineDir().endsWith("/")
? this.config.getPipelineDir() + pipelineName + "/" + branchName
: this.config.getPipelineDir() + "/" + pipelineName + "/" + branchName;
File directory = new File(fullPath);
if (directory.mkdirs()) {
log.info("Created pipeline directory {}", fullPath);
}
try {
ProcessBuilder processBuilder = new ProcessBuilder("cs-create-pipeline", pipelineName);
processBuilder.directory(directory);
Process process = processBuilder.start();
process.waitFor();
} catch (IOException ioe) {
log.error("Caught IOException: {}", ioe.getMessage());
} catch (InterruptedException e) {
log.error("Caught InterruptedException: {}", e.getMessage());
// Restore the interrupt flag so callers can observe the interruption.
Thread.currentThread().interrupt();
}
}

public void extractPipeline(byte[] payload, String pipelineName, String branch) throws IOException {
String fullPipelinePath = this.config.getPipelineDir().endsWith("/")
? this.config.getPipelineDir() + pipelineName + "/" + branch
: this.config.getPipelineDir() + "/" + pipelineName + "/" + branch;
ArchiveUtil.decompressFromTarGzInMemory(payload, fullPipelinePath, true);
}

public void startPipelineJob(String pipelineName, String branch, String profileName, List<String> jobsNames) throws IOException {
String fullPipelinePath = this.config.getPipelineDir().endsWith("/")
? this.config.getPipelineDir() + pipelineName + "/" + branch
: this.config.getPipelineDir() + "/" + pipelineName + "/" + branch;

List<Job> jobs = jobsNames.stream().map(jobName -> Job.builder()
.jobPipeline(pipelineName)
.jobType(Utils.getEnumFromString(jobName, Job.JobType.class, Job.JobType::getName))
.jobBranch(branch)
.jobName(StringUtils.defaultString(profileName, "main"))
.build()).collect(Collectors.toList());
for (Job job : jobs) {
ProcessBuilder processBuilder = new ProcessBuilder(job.getJobType().getScript(job));
processBuilder.directory(new File(fullPipelinePath));
Process process = processBuilder.start();
// Drain stdout/stderr on a daemon thread so the started job cannot block on full pipes.
Thread clt = new Thread(() -> {
try {
log.info(IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8));
log.error(IOUtils.toString(process.getErrorStream(), StandardCharsets.UTF_8));
} catch (IOException e) {
log.error("Error reading process output streams: {}", e.getMessage());
}
});
clt.setDaemon(true);
clt.start();
}
}

}
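The pipeline-directory path is assembled with the same ternary in all three methods above; a shared helper along these lines (a hypothetical sketch for FilePipelineService, not in this commit) would remove the duplication:

import java.nio.file.Paths;

// Hypothetical helper -- not in this commit. Paths.get() inserts separators
// as needed, so the endsWith("/") check becomes unnecessary.
private String pipelinePath(String pipelineName, String branch) {
    return Paths.get(config.getPipelineDir(), pipelineName, branch).toString();
}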
…/com/cloudera/cyber/restcli/service/JobService.java
@@ -1,14 +1,15 @@
package com.cloudera.cyber.restcli.service;

import com.cloudera.cyber.restcli.configuration.AppWorkerConfig;
import com.cloudera.service.common.Utils;
import com.cloudera.service.common.response.Job;
import com.cloudera.service.common.utils.ArchiveUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.BufferedReader;
@@ -27,12 +28,13 @@

@Slf4j
@Service
@RequiredArgsConstructor
public class JobService {
@Value("${cluster.pipeline.dir}")
private String pipelineDir;
public static final String LOG_CLI_JOB_INFO = "Successfully read jobs from cli with exit code {}. job count '{}' jobs data '[{}]'";
private final Pattern pattern = Pattern.compile("^(?<date>[\\d.:\\s]+)\\s:\\s(?<jobId>[a-fA-F0-9]+)\\s:\\s(?<jobFullName>[\\w.-]+)\\s\\((?<jobStatus>\\w+)\\)$");

private final AppWorkerConfig config;


public List<Job> getJobs() throws IOException {
List<Job> jobs = new ArrayList<>();
@@ -56,8 +58,8 @@ public Job restartJob(String id) throws IOException {
log.info("Script command = '{}'", Arrays.toString(job.getJobType().getScript(job)));
try {
ProcessBuilder processBuilder = new ProcessBuilder(job.getJobType().getScript(job));
if (pipelineDir != null) {
processBuilder.directory(new File(pipelineDir));
if (config.getPipelineDir() != null) {
processBuilder.directory(new File(config.getPipelineDir()));
}
Process process = processBuilder.start();
log.debug("Command input stream '{}' \n Command error stream '{}'", IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8), IOUtils.toString(process.getErrorStream(), StandardCharsets.UTF_8));
@@ -155,12 +157,12 @@ private void setJobParameters(Job job, String fullJobName) {
String[] jobParameters = fullJobName.split("\\.");
job.setJobBranch(jobParameters[0]);
job.setJobPipeline(jobParameters[1]);
if (job.getJobType() == Job.JobType.PROFILE || job.getJobType() == Job.JobType.GENERATOR || job.getJobType() == Job.JobType.PARSER) {
if (job.getJobType() == Job.JobType.PROFILE || job.getJobType() == Job.JobType.PARSER) {
job.setJobName(jobParameters[jobParameters.length - 1]);
}
}

public void updateConfig(byte[] payload) throws IOException {
ArchiveUtil.decompressFromTarGzInMemory(payload, pipelineDir, true);
ArchiveUtil.decompressFromTarGzInMemory(payload, config.getPipelineDir(), true);
}
}
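The regular expression above appears to target the job lines printed by the Flink CLI (flink list). A standalone sketch showing how the named groups come apart (the sample line is illustrative, not captured output):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Demo of the JobService pattern against an illustrative `flink list` line.
public class FlinkListLineDemo {

    private static final Pattern PATTERN = Pattern.compile(
            "^(?<date>[\\d.:\\s]+)\\s:\\s(?<jobId>[a-fA-F0-9]+)\\s:\\s(?<jobFullName>[\\w.-]+)\\s\\((?<jobStatus>\\w+)\\)$");

    public static void main(String[] args) {
        String line = "04.11.2024 12:00:00 : a1b2c3d4e5f60718293a4b5c6d7e8f90 : main.demo.parser.main (RUNNING)";
        Matcher m = PATTERN.matcher(line);
        if (m.matches()) {
            System.out.println(m.group("jobId"));       // a1b2c3d4e5f60718293a4b5c6d7e8f90
            System.out.println(m.group("jobFullName")); // main.demo.parser.main -> branch.pipeline.type.name
            System.out.println(m.group("jobStatus"));   // RUNNING
        }
    }
}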
…/application.properties
@@ -2,7 +2,7 @@ cluster.name=testName
cluster.id=1
cluster.status=online
cluster.version=1.0.0
cluster.pipeline.dir=/tmp/test
cluster.pipelineDir=/tmp/test
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=testGroup
spring.kafka.consumer.auto-offset-reset=earliest
11 changes: 11 additions & 0 deletions flink-cyber/flink-commands/json-commands/pom.xml
@@ -58,6 +58,17 @@
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
1 change: 1 addition & 0 deletions flink-cyber/flink-commands/scoring-commands/pom.xml
@@ -28,6 +28,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
5 changes: 5 additions & 0 deletions flink-cyber/flink-profiler/pom.xml
@@ -57,6 +57,11 @@
<artifactId>flink-avro</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
…/.eslintrc
@@ -67,6 +67,11 @@
"simple-import-sort/imports": "off",
"array-bracket-spacing": "off",
"no-underscore-dangle": "off",
"@typescript-eslint/member-ordering": [
"warn",
{
"default": ["static-field", "instance-field", "static-method", "instance-method"]
}],
"@typescript-eslint/no-unused-vars": [
"warn",
{