Skip to content

Commit

Permalink
feat: define $9 subfield definitions for MARC bibliographic records (f…
Browse files Browse the repository at this point in the history
…olio-org#331)

Closes: MODELINKS-255
  • Loading branch information
psmagin authored Oct 7, 2024
1 parent 55e4c5b commit 3161f7d
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 83 deletions.
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* Requires `instance-storage v10.0 or v11.0`

### Features
* Description ([ISSUE](https://folio-org.atlassian.net//browse/ISSUE))
* Define $9 subfield definitions for MARC bibliographic records ([MODELINKS-255](https://folio-org.atlassian.net/browse/MODELINKS-255))

### Bug fixes
* Do not delete kafka topics if tenant collection topic feature is enabled ([MODELINKS-233](https://folio-org.atlassian.net/browse/MODELINKS-233))
Expand Down
4 changes: 4 additions & 0 deletions descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
{
"id": "consortia",
"version": "1.0"
},
{
"id": "specification-storage",
"version": "1.0"
}
],
"provides": [
Expand Down
16 changes: 13 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<folio-spring-support.version>8.2.0-SNAPSHOT</folio-spring-support.version>
<folio-service-tools.version>4.1.0-SNAPSHOT</folio-service-tools.version>
<folio-s3-client.version>2.2.0-SNAPSHOT</folio-s3-client.version>
<mod-record-specifications-dto.version>1.0.0-SNAPSHOT</mod-record-specifications-dto.version>
<aws-sdk-java.version>2.28.7</aws-sdk-java.version>
<mapstruct.version>1.6.2</mapstruct.version>
<lombok.mapstruct-binding.version>0.2.0</lombok.mapstruct-binding.version>
Expand All @@ -39,6 +40,9 @@
<maven-copy-rename-plugin.version>1.0.1</maven-copy-rename-plugin.version>
<maven-checkstyle-plugin.version>3.5.0</maven-checkstyle-plugin.version>
<maven-surefire-plugin.version>3.5.0</maven-surefire-plugin.version>
<build-helper-maven-plugin.version>3.6.0</build-helper-maven-plugin.version>
<maven-failsafe-plugin.version>3.5.0</maven-failsafe-plugin.version>
<maven-release-plugin.version>3.1.1</maven-release-plugin.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -79,6 +83,12 @@
<version>${folio-s3-client.version}</version>
</dependency>

<dependency>
<groupId>org.folio</groupId>
<artifactId>mod-record-specifications-dto</artifactId>
<version>${mod-record-specifications-dto.version}</version>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
Expand Down Expand Up @@ -257,7 +267,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.6.0</version>
<version>${build-helper-maven-plugin.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
Expand Down Expand Up @@ -424,7 +434,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.5.0</version>
<version>${maven-failsafe-plugin.version}</version>
<executions>
<execution>
<goals>
Expand All @@ -441,7 +451,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>3.1.1</version>
<version>${maven-release-plugin.version}</version>
<configuration>
<preparationGoals>clean verify</preparationGoals>
<tagNameFormat>v@{project.version}</tagNameFormat>
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/org/folio/entlinks/config/KafkaConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.folio.entlinks.integration.dto.event.DomainEvent;
import org.folio.entlinks.integration.kafka.AuthorityChangeFilterStrategy;
import org.folio.entlinks.integration.kafka.EventProducer;
import org.folio.rspec.domain.dto.UpdateRequestEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -150,7 +151,7 @@ public EventProducer<LinkUpdateReport> linkUpdateReportMessageProducerService(
}

@Bean
public <T> ProducerFactory<String, DomainEvent<T>> domainProducerFactory(KafkaProperties kafkaProperties) {
public <T> ProducerFactory<String, T> domainProducerFactory(KafkaProperties kafkaProperties) {
return getProducerConfigProps(kafkaProperties);
}

Expand All @@ -160,6 +161,12 @@ public <T> KafkaTemplate<String, DomainEvent<T>> domainKafkaTemplate(
return new KafkaTemplate<>(domainProducerFactory);
}

/**
 * Kafka template used to publish MARC specification update request events
 * ({@link UpdateRequestEvent}) to the specification-storage module.
 *
 * @param domainProducerFactory shared producer factory configured by {@code domainProducerFactory(KafkaProperties)}
 * @return template for sending {@link UpdateRequestEvent} messages keyed by {@code String}
 */
@Bean
public KafkaTemplate<String, UpdateRequestEvent> specificationRequestKafkaTemplate(
    ProducerFactory<String, UpdateRequestEvent> domainProducerFactory) {
  var kafkaTemplate = new KafkaTemplate<>(domainProducerFactory);
  return kafkaTemplate;
}

@Bean("authorityDomainMessageProducer")
public <T> EventProducer<DomainEvent<T>> authorityDomainMessageProducerService(
KafkaTemplate<String, DomainEvent<T>> template) {
Expand All @@ -172,6 +179,12 @@ public <T> EventProducer<DomainEvent<T>> authoritySourceFileDomainMessageProduce
return new EventProducer<>(template, "authority.authority-source-file");
}

/**
 * Event producer that publishes {@link UpdateRequestEvent} messages to the
 * {@code specification-storage.specification.update} topic.
 *
 * @param template Kafka template for {@link UpdateRequestEvent} payloads
 * @return producer bound to the specification update topic
 */
@Bean("subfieldUpdateRequestEventMessageProducer")
public EventProducer<UpdateRequestEvent> specificationRequestEventMessageProducerService(
    KafkaTemplate<String, UpdateRequestEvent> template) {
  var topicName = "specification-storage.specification.update";
  return new EventProducer<>(template, topicName);
}

private <T> ConcurrentKafkaListenerContainerFactory<String, T> listenerFactory(
ConsumerFactory<String, T> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, T>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.folio.entlinks.integration.kafka;

import static org.folio.spring.tools.kafka.KafkaUtils.getTenantTopicName;
import static org.folio.spring.tools.kafka.KafkaUtils.toKafkaHeaders;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -11,14 +14,13 @@
import org.codehaus.plexus.util.StringUtils;
import org.folio.entlinks.integration.dto.event.BaseEvent;
import org.folio.entlinks.utils.DateUtils;
import org.folio.entlinks.utils.KafkaUtils;
import org.folio.spring.FolioExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;

@Log4j2
@RequiredArgsConstructor
public class EventProducer<T extends BaseEvent> {
public class EventProducer<T> {

private final KafkaTemplate<String, T> template;
private final String topicName;
Expand All @@ -31,7 +33,7 @@ public void sendMessage(String key, T msgBody, Object... headers) {
log.debug("Sending event to Kafka [topic: {}, body: {}]", topicName, msgBody);
if (headers.length % 2 != 0) {
throw new IllegalArgumentException(
String.format("Wrong number of %s header key and value pairs are provided", headers.length));
String.format("Wrong number of %s header key and value pairs are provided", headers.length));
}
var headersMap = new HashMap<String, Collection<String>>();
for (int i = 0; i < headers.length; i += 2) {
Expand All @@ -42,17 +44,26 @@ public void sendMessage(String key, T msgBody, Object... headers) {
}

public void sendMessages(List<T> msgBodies) {
log.info("Sending events to Kafka [topic: {}, number: {}]", topicName, msgBodies.size());
log.trace("Sending events to Kafka [topic: {}, bodies: {}]", topicName, msgBodies);
if (log.isTraceEnabled()) {
log.trace("Sending events to Kafka [topic: {}, bodies: {}]", topicName, msgBodies);
} else {
log.info("Sending events to Kafka [topic: {}, number: {}]", topicName, msgBodies.size());
}
msgBodies.stream()
.map(this::toProducerRecord)
.forEach(template::send);
}

// Keyless convenience overload: delegates with a null key and no extra headers,
// so the record is created without a key (see the blank-key branch in the
// three-argument overload) and only the Okapi context headers are attached.
private ProducerRecord<String, T> toProducerRecord(T msgBody) {
return toProducerRecord(null, msgBody, Collections.emptyMap());
}

private ProducerRecord<String, T> toProducerRecord(String key, T msgBody,
Map<String, Collection<String>> headersMap) {
msgBody.setTenant(context.getTenantId());
msgBody.setTs(DateUtils.currentTsInString());
if (msgBody instanceof BaseEvent baseEvent) {
baseEvent.setTenant(context.getTenantId());
baseEvent.setTs(DateUtils.currentTsInString());
}

ProducerRecord<String, T> producerRecord;
if (StringUtils.isBlank(key)) {
Expand All @@ -61,20 +72,16 @@ private ProducerRecord<String, T> toProducerRecord(String key, T msgBody,
producerRecord = new ProducerRecord<>(topicName(), key, msgBody);
}

KafkaUtils.toKafkaHeaders(context.getOkapiHeaders())
toKafkaHeaders(context.getOkapiHeaders())
.forEach(header -> producerRecord.headers().add(header));

KafkaUtils.toKafkaHeaders(headersMap)
toKafkaHeaders(headersMap)
.forEach(header -> producerRecord.headers().add(header));

return producerRecord;
}

private ProducerRecord<String, T> toProducerRecord(T msgBody) {
return toProducerRecord(null, msgBody, Collections.emptyMap());
}

private String topicName() {
return org.folio.spring.tools.kafka.KafkaUtils.getTenantTopicName(topicName, context.getTenantId());
return getTenantTopicName(topicName, context.getTenantId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ public class ExtendedTenantService extends TenantService {
private final FolioExecutionContext folioExecutionContext;
private final KafkaAdminService kafkaAdminService;
private final ReferenceDataLoader referenceDataLoader;
private final MarcSpecificationUpdateService specificationUpdateService;

public ExtendedTenantService(JdbcTemplate jdbcTemplate,
FolioExecutionContext context,
KafkaAdminService kafkaAdminService,
FolioSpringLiquibase folioSpringLiquibase,
FolioExecutionContext folioExecutionContext,
PrepareSystemUserService folioPrepareSystemUserService,
ReferenceDataLoader referenceDataLoader) {
ReferenceDataLoader referenceDataLoader,
MarcSpecificationUpdateService specificationUpdateService) {
super(jdbcTemplate, context, folioSpringLiquibase);
this.folioPrepareSystemUserService = folioPrepareSystemUserService;
this.folioExecutionContext = folioExecutionContext;
this.kafkaAdminService = kafkaAdminService;
this.referenceDataLoader = referenceDataLoader;
this.specificationUpdateService = specificationUpdateService;
}

@Override
Expand All @@ -42,6 +45,11 @@ protected void afterTenantUpdate(TenantAttributes tenantAttributes) {
kafkaAdminService.createTopics(folioExecutionContext.getTenantId());
kafkaAdminService.restartEventListeners();
folioPrepareSystemUserService.setupSystemUser();
try {
specificationUpdateService.sendSpecificationRequests();
} catch (Exception e) {
log.warn("Failed to send MARC specification updates requests", e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.folio.entlinks.service.tenant;

import lombok.RequiredArgsConstructor;
import org.folio.entlinks.domain.entity.InstanceAuthorityLinkingRule;
import org.folio.entlinks.integration.kafka.EventProducer;
import org.folio.entlinks.service.links.InstanceAuthorityLinkingRulesService;
import org.folio.rspec.domain.dto.DefinitionType;
import org.folio.rspec.domain.dto.Family;
import org.folio.rspec.domain.dto.FamilyProfile;
import org.folio.rspec.domain.dto.Scope;
import org.folio.rspec.domain.dto.SubfieldDto;
import org.folio.rspec.domain.dto.SubfieldUpdateRequestEvent;
import org.folio.rspec.domain.dto.UpdateRequestEvent;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;

/**
 * Publishes MARC bibliographic specification update requests that register the
 * {@code $9} subfield (linked authority UUID) on every bib field that has an
 * instance-authority linking rule.
 */
@Service
@RequiredArgsConstructor
public class MarcSpecificationUpdateService {

  private static final String SUBFIELD_9_CODE = "9";
  private static final String SUBFIELD_9_LABEL = "Linked authority UUID";

  private final InstanceAuthorityLinkingRulesService linkingRulesService;
  private final EventProducer<UpdateRequestEvent> eventProducer;

  /**
   * Builds one $9 subfield creation request per distinct bib field tag found in
   * the linking rules and sends them all to the specification update topic.
   * Retried up to 10 times with a 5s delay because the target topic/consumer may
   * not be ready yet (e.g. during tenant initialization).
   */
  @Retryable(maxAttempts = 10, backoff = @Backoff(delay = 5000))
  public void sendSpecificationRequests() {
    var distinctBibTags = linkingRulesService.getLinkingRules().stream()
      .map(InstanceAuthorityLinkingRule::getBibField)
      .distinct();

    eventProducer.sendMessages(distinctBibTags
      .map(this::buildSubfield9CreationEvent)
      .toList());
  }

  // Assembles the request event describing the $9 subfield definition for a
  // single MARC bibliographic field tag.
  private UpdateRequestEvent buildSubfield9CreationEvent(String tag) {
    var event = new SubfieldUpdateRequestEvent();
    event.setDefinitionType(DefinitionType.SUBFIELD);
    event.setFamily(Family.MARC);
    event.setProfile(FamilyProfile.BIBLIOGRAPHIC);
    event.setTargetFieldTag(tag);
    event.setSubfield(new SubfieldDto()
      .scope(Scope.SYSTEM)
      .code(SUBFIELD_9_CODE)
      .label(SUBFIELD_9_LABEL)
      .repeatable(false)
      .required(false)
      .deprecated(false)
    );
    return event;
  }
}
28 changes: 0 additions & 28 deletions src/main/java/org/folio/entlinks/utils/KafkaUtils.java

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ spring:
fail-on-unknown-properties: false
accept-single-value-as-array: true
kafka:
bootstrap-servers: ${KAFKA_HOST:kafka}:${KAFKA_PORT:9092}
bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:29092}
consumer:
max-poll-records: ${KAFKA_CONSUMER_MAX_POLL_RECORDS:50}
security:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,23 @@ class ExtendedTenantServiceTest {
private KafkaAdminService kafkaAdminService;
@Mock
private PrepareSystemUserService prepareSystemUserService;
@Mock
private MarcSpecificationUpdateService specificationUpdateService;

@Test
void initializeTenant_positive() {
when(context.getTenantId()).thenReturn(TENANT_ID);
doNothing().when(prepareSystemUserService).setupSystemUser();
doNothing().when(kafkaAdminService).createTopics(TENANT_ID);
doNothing().when(kafkaAdminService).restartEventListeners();
doNothing().when(specificationUpdateService).sendSpecificationRequests();

tenantService.afterTenantUpdate(tenantAttributes());

verify(prepareSystemUserService).setupSystemUser();
verify(kafkaAdminService).createTopics(TENANT_ID);
verify(kafkaAdminService).restartEventListeners();
verify(specificationUpdateService).sendSpecificationRequests();
}

@Test
Expand Down
Loading

0 comments on commit 3161f7d

Please sign in to comment.