From 6761856a33e003c06ba787e2fbc400a5c4faf8cc Mon Sep 17 00:00:00 2001 From: Fabian Kovacs <1553491+awildturtok@users.noreply.github.com> Date: Wed, 1 Jul 2020 15:29:39 +0200 Subject: [PATCH] - Cleanup NThreads/Threads variable naming - Rework JobStatus into SortedSet --- .../java/com/bakdata/conquery/Constants.java | 1 - .../commands/CollectEntitiesCommand.java | 2 +- .../commands/PreprocessorCommand.java | 2 +- .../conquery/commands/SlaveCommand.java | 5 +-- .../conquery/models/config/ClusterConfig.java | 3 +- .../models/config/PreprocessingConfig.java | 2 +- .../conquery/models/config/QueryConfig.java | 3 -- .../conquery/models/config/StorageConfig.java | 5 ++- .../models/config/ThreadPoolDefinition.java | 36 ------------------- .../conquery/models/jobs/JobExecutor.java | 4 +-- .../conquery/models/jobs/JobManager.java | 2 -- .../models/jobs/JobManagerStatus.java | 26 +++++++------- 12 files changed, 24 insertions(+), 67 deletions(-) delete mode 100644 backend/src/main/java/com/bakdata/conquery/models/config/ThreadPoolDefinition.java diff --git a/autodoc/src/main/java/com/bakdata/conquery/Constants.java b/autodoc/src/main/java/com/bakdata/conquery/Constants.java index 31de726676..c87d89e492 100644 --- a/autodoc/src/main/java/com/bakdata/conquery/Constants.java +++ b/autodoc/src/main/java/com/bakdata/conquery/Constants.java @@ -54,7 +54,6 @@ import com.bakdata.conquery.models.config.QueryConfig; import com.bakdata.conquery.models.config.StandaloneConfig; import com.bakdata.conquery.models.config.StorageConfig; -import com.bakdata.conquery.models.config.ThreadPoolDefinition; import com.bakdata.conquery.models.config.XodusConfig; import com.bakdata.conquery.models.datasets.Column; import com.bakdata.conquery.models.datasets.Table; diff --git a/backend/src/main/java/com/bakdata/conquery/commands/CollectEntitiesCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/CollectEntitiesCommand.java index 6812ac693a..1e5cb8ee9a 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/CollectEntitiesCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/CollectEntitiesCommand.java @@ -58,7 +58,7 @@ public void configure(Subparser subparser) { protected void run(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception { verbose = Boolean.TRUE.equals(namespace.getBoolean("-verbose")); - ExecutorService pool = Executors.newFixedThreadPool(config.getPreprocessor().getThreads()); + ExecutorService pool = Executors.newFixedThreadPool(config.getPreprocessor().getNThreads()); Collection jobs = findPreprocessedJobs(config); diff --git a/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java index e286731430..98d3db1cee 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java @@ -85,7 +85,7 @@ public void configure(Subparser subparser) { @Override protected void run(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception { if (pool == null) { - pool = Executors.newFixedThreadPool(config.getPreprocessor().getThreads()); + pool = Executors.newFixedThreadPool(config.getPreprocessor().getNThreads()); } final Collection descriptors = new ArrayList<>(); diff --git a/backend/src/main/java/com/bakdata/conquery/commands/SlaveCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/SlaveCommand.java index 737cab77ce..d4c9b37444 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/SlaveCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/SlaveCommand.java @@ -3,7 +3,6 @@ import java.io.File; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Comparator; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -90,7 +89,7 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig log.warn("Had to create Storage Dir at `{}`", config.getStorage().getDirectory()); } - workers = new Workers(new RoundRobinQueue<>(config.getQueries().getRoundRobinQueueCapacity()), config.getQueries().getNThreads(), config.getStorage().getThreads()); + workers = new Workers(new RoundRobinQueue<>(config.getQueries().getRoundRobinQueueCapacity()), config.getQueries().getNThreads(), config.getStorage().getNThreads()); File storageDir = config.getStorage().getDirectory(); for(File directory : storageDir.listFiles((file, name) -> name.startsWith("worker_"))) { @@ -258,8 +257,6 @@ private void reportJobManagerStatus() { jobManagerStatus.getJobs().addAll(worker.getJobManager().reportStatus().getJobs()); } - jobManagerStatus.getJobs().sort(Comparator.comparingDouble(js -> js.getProgressReporter().getProgress())); - try { context.trySend(new UpdateJobManagerStatus(jobManagerStatus)); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java index 44277ad2ba..17957cd4c1 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/ClusterConfig.java @@ -21,5 +21,6 @@ public class ClusterConfig extends Configuration { private MinaConfig mina = new MinaConfig(); @Min(1) private int entityBucketSize = 1000; - + + } diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/PreprocessingConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/PreprocessingConfig.java index f245389381..16a4135c4a 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/PreprocessingConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/PreprocessingConfig.java @@ -13,7 +13,7 @@ public class PreprocessingConfig { @Valid private PreprocessingDirectories[] directories; @Min(1) - private int threads = Runtime.getRuntime().availableProcessors(); + private int nThreads = Runtime.getRuntime().availableProcessors(); @Min(0) private int maximumPrintedErrors = 10; diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/QueryConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/QueryConfig.java index f562618ee8..764b1d0af7 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/QueryConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/QueryConfig.java @@ -8,9 +8,6 @@ @Getter @Setter @ToString public class QueryConfig { - @Deprecated - private ThreadPoolDefinition executionPool = new ThreadPoolDefinition(); - private int roundRobinQueueCapacity = 10; private int nThreads = Runtime.getRuntime().availableProcessors(); diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/StorageConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/StorageConfig.java index b7b2cfe7a9..56c9171432 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/StorageConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/StorageConfig.java @@ -6,7 +6,6 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; -import com.bakdata.conquery.models.exceptions.validators.ExistingFile; import io.dropwizard.util.Duration; import lombok.Getter; import lombok.Setter; @@ -25,6 +24,6 @@ public class StorageConfig { @NotNull private Duration weakCacheDuration = Duration.hours(48); - @Min(0) - private int threads = Runtime.getRuntime().availableProcessors(); + @Min(1) + private int nThreads = Runtime.getRuntime().availableProcessors(); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/ThreadPoolDefinition.java b/backend/src/main/java/com/bakdata/conquery/models/config/ThreadPoolDefinition.java deleted file mode 100644 index 4765ac846c..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/models/config/ThreadPoolDefinition.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.bakdata.conquery.models.config; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; - -import javax.validation.constraints.Min; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.dropwizard.util.Duration; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; - -@Getter @Setter @ToString -public class ThreadPoolDefinition { - @Min(0) - private int minThreads = Runtime.getRuntime().availableProcessors(); - @Min(1) - private int maxThreads = Runtime.getRuntime().availableProcessors(); - private boolean allowCoreThreadTimeOut = false; - private Duration keepAliveTime = Duration.seconds(60); - private Duration shutdownTime = Duration.hours(1); - - public ThreadPoolExecutor createService(String nameFormat) { - final ThreadPoolExecutor executor = new ThreadPoolExecutor( - minThreads, - maxThreads, - keepAliveTime.getQuantity(), - keepAliveTime.getUnit(), - new LinkedBlockingQueue<>(), - new ThreadFactoryBuilder().setNameFormat(nameFormat).build() - ); - executor.allowCoreThreadTimeOut(allowCoreThreadTimeOut); - return executor; - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/JobExecutor.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/JobExecutor.java index 27d56a995f..5df9232976 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/JobExecutor.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/JobExecutor.java @@ -55,9 +55,9 @@ public boolean cancelJob(UUID jobId) { } public List getJobs() { - List jobs = new ArrayList<>(this.jobs.size()+1); + List jobs = new ArrayList<>(this.jobs.size() + 1); Job current = currentJob.get(); - if(current!=null) { + if (current != null) { jobs.add(current); } jobs.addAll(this.jobs); diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/JobManager.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/JobManager.java index 5845f6e70b..8461dc51a6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/JobManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/JobManager.java @@ -1,6 +1,5 @@ package com.bakdata.conquery.models.jobs; -import java.time.LocalDateTime; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; @@ -43,7 +42,6 @@ public void stop() throws Exception { public JobManagerStatus reportStatus() { return new JobManagerStatus( - LocalDateTime.now(), getSlowJobs() .stream() .map(job -> new JobStatus(job.getJobId(), job.getProgressReporter(), job.getLabel(), job.isCancelled())) diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/JobManagerStatus.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/JobManagerStatus.java index 1d59121af2..4e57cdddb5 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/JobManagerStatus.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/JobManagerStatus.java @@ -2,27 +2,29 @@ import java.time.Duration; import java.time.LocalDateTime; -import java.util.Collections; -import java.util.List; +import java.util.Collection; +import java.util.Comparator; +import java.util.SortedSet; +import java.util.TreeSet; import javax.validation.constraints.NotNull; import com.fasterxml.jackson.annotation.JsonIgnore; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; +import lombok.Data; import lombok.NonNull; -import lombok.Setter; -import lombok.ToString; -@NoArgsConstructor @AllArgsConstructor @Getter @Setter @ToString + +@Data public class JobManagerStatus { @NonNull @NotNull - private LocalDateTime timestamp; + private final LocalDateTime timestamp = LocalDateTime.now(); @NotNull - private List jobs = Collections.emptyList(); - + private final SortedSet jobs = new TreeSet<>(Comparator.comparingDouble(job -> job.getProgressReporter().getStartTime())); + + public JobManagerStatus(Collection jobs){ + this.jobs.addAll(jobs); + } + public int size() { return jobs.size(); }