Skip to content

Commit

Permalink
alpha2
Browse files Browse the repository at this point in the history
  • Loading branch information
andreabattaglia committed Aug 9, 2021
1 parent eb09a60 commit 1efccd9
Show file tree
Hide file tree
Showing 24 changed files with 451 additions and 216 deletions.
7 changes: 4 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-universe-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id>
<quarkus.platform.version>2.0.3.Final</quarkus.platform.version>
<quarkus.platform.version>2.1.0.Final</quarkus.platform.version>
<surefire-plugin.version>3.0.0-M5</surefire-plugin.version>
</properties>
<dependencyManagement>
Expand All @@ -27,7 +27,7 @@
<!-- <version>${quarkus.platform.version}</version> -->
<groupId>io.quarkus</groupId>
<artifactId>quarkus-universe-bom</artifactId>
<version>2.0.3.Final</version>
<version>2.1.0.Final</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand All @@ -40,8 +40,9 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-jackson</artifactId>
<artifactId>quarkus-jackson</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-artemis-jms</artifactId>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.qiot.manufacturing.edge.machinery.domain.event.chain;

import io.qiot.manufacturing.commons.domain.productionvalidation.AbstractProductionChainEvent;
import io.qiot.manufacturing.commons.domain.productionvalidation.AbstractProductionChainEventDTO;
import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class ValidationFailedEvent extends AbstractProductionChainEvent {
public class ValidationFailedEvent extends AbstractProductionChainEventDTO {
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.qiot.manufacturing.edge.machinery.domain.event.chain;

import io.qiot.manufacturing.commons.domain.productionvalidation.AbstractProductionChainEvent;
import io.qiot.manufacturing.commons.domain.productionvalidation.AbstractProductionChainEventDTO;
import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class ValidationSuccessfullEvent extends AbstractProductionChainEvent {
public class ValidationSuccessfullEvent extends AbstractProductionChainEventDTO {
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.qiot.manufacturing.edge.machinery.domain.event;
package io.qiot.manufacturing.edge.machinery.domain.event.productline;

import io.qiot.manufacturing.commons.domain.productline.ProductLineDTO;
import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class ProductLineChangedEvent {
public class ProductLineChangedEventDTO {
public ProductLineDTO productLine;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.qiot.manufacturing.edge.machinery.domain.event.productline;

import java.util.UUID;

/**
* @author andreabattaglia
*
*/
public class RequestLatestProductLineEventDTO {
public UUID productLineId;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package io.qiot.manufacturing.edge.machinery.domain.event.productline;

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.jms.ConnectionFactory;

import org.slf4j.Logger;

import io.qiot.manufacturing.edge.machinery.domain.event.BootstrapCompletedEvent;
import io.qiot.manufacturing.commons.domain.event.BootstrapCompletedEventDTO;
import io.qiot.manufacturing.edge.machinery.service.machinery.MachineryService;
import io.qiot.manufacturing.edge.machinery.service.production.ProductionChainService;
import io.qiot.manufacturing.edge.machinery.util.exception.DataValidationException;
Expand Down Expand Up @@ -38,16 +39,17 @@ public class CoreServiceImpl implements CoreService {
ProductionChainService productionChainService;

@Inject
Event<BootstrapCompletedEvent> event;

// private StationDataBean stationData;
ConnectionFactory connectionFactory;

@Inject
Event<BootstrapCompletedEventDTO> event;

void onStart(@Observes StartupEvent ev) throws DataValidationException {
LOGGER.info("The application is starting...{}");
// stationData =
scheduler.pause();
machineryService.checkRegistration();
event.fire(new BootstrapCompletedEvent());
event.fire(new BootstrapCompletedEventDTO());
scheduler.resume();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ public void produce() {
ProductLineDTO productLineDTO = productLineService
.getCurrentProductLine();

int itemId = countersService
.recordNewItem(productLineDTO.id);
int itemId = countersService.recordNewItem(productLineDTO.id);

// setup new item
conveyorBeltService.createNewItem(productLineDTO.id, itemId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package io.qiot.manufacturing.edge.machinery.service.productline;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.jms.ConnectionFactory;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.slf4j.Logger;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.qiot.manufacturing.commons.domain.event.BootstrapCompletedEventDTO;
import io.qiot.manufacturing.commons.domain.productionvalidation.ValidationResponseDTO;
import io.qiot.manufacturing.commons.domain.productline.ProductLineDTO;
import io.qiot.manufacturing.commons.util.producer.ProductLineReplyToQueueNameProducer;
import io.qiot.manufacturing.commons.util.producer.ValidationReplyToQueueNameProducer;
import io.qiot.manufacturing.edge.machinery.domain.event.chain.ValidationFailedEvent;
import io.qiot.manufacturing.edge.machinery.domain.event.chain.ValidationSuccessfullEvent;
import io.qiot.manufacturing.edge.machinery.domain.event.productline.ProductLineChangedEventDTO;
import io.qiot.manufacturing.edge.machinery.service.machinery.MachineryService;

@ApplicationScoped
public class LatestProductLineMessageConsumer implements Runnable {

@Inject
Logger LOGGER;

@Inject
ObjectMapper MAPPER;

@Inject
ConnectionFactory connectionFactory;

@Inject
MachineryService machineryService;

@Inject
ProductLineReplyToQueueNameProducer productLineReplyToQueueNameProducer;

@Inject
Event<ProductLineChangedEventDTO> prodictLineChangedEvent;

private JMSContext context;

private JMSConsumer consumer;

private String replyToQueueName;

private Queue replyToQueue;

private final ExecutorService scheduler = Executors
.newSingleThreadExecutor();

void init(@Observes BootstrapCompletedEventDTO event) {
LOGGER.info("Bootstrapping new product line durable subscriber...");
initSubscriber();

scheduler.submit(this);
LOGGER.debug("Bootstrap completed");
}

private void initSubscriber() {
if (Objects.nonNull(context))
context.close();
context = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE);

replyToQueueName = productLineReplyToQueueNameProducer
.getReplyToQueueName(machineryService.getMachineryId());

replyToQueue = context.createQueue(replyToQueueName);

consumer = context.createConsumer(replyToQueue);
}

@PreDestroy
void destroy() {
scheduler.shutdown();
context.close();
}

@Override
public void run() {
// while (true) {
try {
Message message = consumer.receive();
String messagePayload = message.getBody(String.class);
ProductLineDTO productLine = MAPPER.readValue(messagePayload,
ProductLineDTO.class);
LOGGER.info("Received latest PRODUCTLINE available from the Factory Controller: \n {}", productLine);
ProductLineChangedEventDTO eventDTO = new ProductLineChangedEventDTO();
eventDTO.productLine = productLine;
prodictLineChangedEvent.fire(eventDTO);
} catch (JMSException e) {
LOGGER.error(
"The messaging client returned an error: {} and will be restarted.",
e);
initSubscriber();
} catch (JsonProcessingException e) {
LOGGER.error(
"The message payload is malformed and the validation request will not be sent: {}",
e);
} catch (Exception e) {
LOGGER.error("GENERIC ERROR", e);
}
// }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
*
*/
package io.qiot.manufacturing.edge.machinery.service.productline;

import java.util.Objects;
import java.util.UUID;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.jms.ConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.JMSProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* @author andreabattaglia
*
*/
@ApplicationScoped
public class LatestProductLineRequestMessageProducer {

@Inject
Logger LOGGER;

@Inject
ConnectionFactory connectionFactory;

@Inject
ObjectMapper MAPPER;

@ConfigProperty(name = "qiot.productline.request.queue-prefix")
String latestProductLineRequestQueueName;

private JMSContext context;

private JMSProducer producer;

private Queue queue;

@PostConstruct
void init() {
LOGGER.info(
"Bootstrapping latest product line request event producer...");
doInit();

LOGGER.info("Bootstrap completed");

}

private void doInit() {
if (Objects.nonNull(context))
context.close();
context = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE);

producer = context.createProducer();

queue = context.createQueue(latestProductLineRequestQueueName);
}

void requestLatestProductLine(String machineryId) {
LOGGER.info(
"Sending out a request for the latest product line available");
try {
String messagePayload = machineryId;

producer.send(queue, messagePayload);
} catch (Exception e) {
LOGGER.error("GENERIC ERROR", e);
}

}
}
Loading

0 comments on commit 1efccd9

Please sign in to comment.