Skip to content

Commit

Permalink
- Cleanup NThreads/Threads variable naming
Browse files Browse the repository at this point in the history
- Rework JobStatus into SortedSet
  • Loading branch information
awildturtok committed Jul 1, 2020
1 parent 0be8f1b commit 6761856
Show file tree
Hide file tree
Showing 12 changed files with 24 additions and 67 deletions.
1 change: 0 additions & 1 deletion autodoc/src/main/java/com/bakdata/conquery/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import com.bakdata.conquery.models.config.QueryConfig;
import com.bakdata.conquery.models.config.StandaloneConfig;
import com.bakdata.conquery.models.config.StorageConfig;
import com.bakdata.conquery.models.config.ThreadPoolDefinition;
import com.bakdata.conquery.models.config.XodusConfig;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void configure(Subparser subparser) {
protected void run(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception {
verbose = Boolean.TRUE.equals(namespace.getBoolean("-verbose"));

ExecutorService pool = Executors.newFixedThreadPool(config.getPreprocessor().getThreads());
ExecutorService pool = Executors.newFixedThreadPool(config.getPreprocessor().getNThreads());

Collection<EntityExtractor> jobs = findPreprocessedJobs(config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void configure(Subparser subparser) {
@Override
protected void run(Environment environment, Namespace namespace, ConqueryConfig config) throws Exception {
if (pool == null) {
pool = Executors.newFixedThreadPool(config.getPreprocessor().getThreads());
pool = Executors.newFixedThreadPool(config.getPreprocessor().getNThreads());
}

final Collection<TableImportDescriptor> descriptors = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.io.File;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -90,7 +89,7 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig
log.warn("Had to create Storage Dir at `{}`", config.getStorage().getDirectory());
}

workers = new Workers(new RoundRobinQueue<>(config.getQueries().getRoundRobinQueueCapacity()), config.getQueries().getNThreads(), config.getStorage().getThreads());
workers = new Workers(new RoundRobinQueue<>(config.getQueries().getRoundRobinQueueCapacity()), config.getQueries().getNThreads(), config.getStorage().getNThreads());

File storageDir = config.getStorage().getDirectory();
for(File directory : storageDir.listFiles((file, name) -> name.startsWith("worker_"))) {
Expand Down Expand Up @@ -258,8 +257,6 @@ private void reportJobManagerStatus() {
jobManagerStatus.getJobs().addAll(worker.getJobManager().reportStatus().getJobs());
}

jobManagerStatus.getJobs().sort(Comparator.comparingDouble(js -> js.getProgressReporter().getProgress()));

try {
context.trySend(new UpdateJobManagerStatus(jobManagerStatus));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ public class ClusterConfig extends Configuration {
private MinaConfig mina = new MinaConfig();
@Min(1)
private int entityBucketSize = 1000;



}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class PreprocessingConfig {
@Valid
private PreprocessingDirectories[] directories;
@Min(1)
private int threads = Runtime.getRuntime().availableProcessors();
private int nThreads = Runtime.getRuntime().availableProcessors();
@Min(0)
private int maximumPrintedErrors = 10;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
@Getter @Setter @ToString
public class QueryConfig {

@Deprecated
private ThreadPoolDefinition executionPool = new ThreadPoolDefinition();

private int roundRobinQueueCapacity = 10;
private int nThreads = Runtime.getRuntime().availableProcessors();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import com.bakdata.conquery.models.exceptions.validators.ExistingFile;
import io.dropwizard.util.Duration;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -25,6 +24,6 @@ public class StorageConfig {
@NotNull
private Duration weakCacheDuration = Duration.hours(48);

@Min(0)
private int threads = Runtime.getRuntime().availableProcessors();
@Min(1)
private int nThreads = Runtime.getRuntime().availableProcessors();
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public boolean cancelJob(UUID jobId) {
}

public List<Job> getJobs() {
List<Job> jobs = new ArrayList<>(this.jobs.size()+1);
List<Job> jobs = new ArrayList<>(this.jobs.size() + 1);
Job current = currentJob.get();
if(current!=null) {
if (current != null) {
jobs.add(current);
}
jobs.addAll(this.jobs);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.bakdata.conquery.models.jobs;

import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -43,7 +42,6 @@ public void stop() throws Exception {

public JobManagerStatus reportStatus() {
return new JobManagerStatus(
LocalDateTime.now(),
getSlowJobs()
.stream()
.map(job -> new JobStatus(job.getJobId(), job.getProgressReporter(), job.getLabel(), job.isCancelled()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,29 @@

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Collection;
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

import javax.validation.constraints.NotNull;

import com.fasterxml.jackson.annotation.JsonIgnore;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Data;
import lombok.NonNull;
import lombok.Setter;
import lombok.ToString;

@NoArgsConstructor @AllArgsConstructor @Getter @Setter @ToString

@Data
public class JobManagerStatus {
@NonNull @NotNull
private LocalDateTime timestamp;
private final LocalDateTime timestamp = LocalDateTime.now();
@NotNull
private List<JobStatus> jobs = Collections.emptyList();

private final SortedSet<JobStatus> jobs = new TreeSet<>(Comparator.comparingDouble(job -> job.getProgressReporter().getStartTime()));

public JobManagerStatus(Collection<? extends JobStatus> jobs){
this.jobs.addAll(jobs);
}

public int size() {
return jobs.size();
}
Expand Down

0 comments on commit 6761856

Please sign in to comment.