Skip to content

Commit

Permalink
feat(authority-source-files): produce authority source file update ev…
Browse files Browse the repository at this point in the history
…ent (folio-org#271)

* feat(authority-source-files): produce domain event

- Add Kafka topic for authority source files updating
- Add publisher for new topic

Closes: MODELINKS-202
  • Loading branch information
viacheslavpoliakov authored Mar 28, 2024
1 parent 949fe95 commit f51aa0c
Show file tree
Hide file tree
Showing 17 changed files with 321 additions and 51 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* Implement endpoint for bulk authorities upsert from external file ([MODELINKS-173](https://issues.folio.org/browse/MODELINKS-173))
* Add possibility to filter Authority records by (un)defined fields in Cql query ([MODELINKS-214](https://issues.folio.org/browse/MODELINKS-214))
* Set auto_linking_enabled in instance_authority_linking_rule for 6xx fields ([MODELINKS-220](https://folio-org.atlassian.net/browse/MODELINKS-220))
* Add Authority source file Kafka topic and publisher for update event ([MODELINKS-202](https://folio-org.atlassian.net/browse/MODELINKS-202))

### Bug fixes
* Fix secure setup of system users by default ([MODELINKS-135](https://issues.folio.org/browse/MODELINKS-135))
Expand Down
10 changes: 10 additions & 0 deletions descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,16 @@
"value": "50",
"description": "Maximum number of records returned in a single call to poll()."
},
{
"name": "KAFKA_AUTHORITY_SOURCE_FILE_TOPIC_PARTITIONS",
"value": "1",
"description": "Amount of partitions for `authority-authority-source-file` topic."
},
{
"name": "KAFKA_AUTHORITY_SOURCE_FILE_TOPIC_REPLICATION_FACTOR",
"value": "",
"description": "Replication factor for `authority-authority-source-file` topic."
},
{
"name": "KAFKA_INSTANCE_AUTHORITY_TOPIC_PARTITIONS",
"value": "10",
Expand Down
2 changes: 2 additions & 0 deletions doc/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ docker run -t -i -p 8081:8081 mod-entities-links
| KAFKA_SSL_TRUSTSTORE_LOCATION | - | The location of the Kafka trust store file. |
| KAFKA_SSL_TRUSTSTORE_PASSWORD | - | The password for the Kafka trust store file. If a password is not set, trust store file configured will still be used, but integrity checking is disabled. |
| KAFKA_CONSUMER_MAX_POLL_RECORDS | 50 | Maximum number of records returned in a single call to poll(). |
| KAFKA_AUTHORITY_SOURCE_FILE_TOPIC_PARTITIONS | 1 | Amount of partitions for `authority.authority-source-file` topic. |
| KAFKA_AUTHORITY_SOURCE_FILE_TOPIC_REPLICATION_FACTOR | - | Replication factor for `authority.authority-source-file` topic. |
| KAFKA_INSTANCE_AUTHORITY_TOPIC_PARTITIONS | 10 | Amount of partitions for `links.instance-authority` topic. |
| KAFKA_INSTANCE_AUTHORITY_TOPIC_REPLICATION_FACTOR | - | Replication factor for `links.instance-authority` topic. |
| KAFKA_INSTANCE_AUTHORITY_STATS_TOPIC_PARTITIONS | 10 | Amount of partitions for `links.instance-authority-stats` topic. |
Expand Down
18 changes: 12 additions & 6 deletions src/main/java/org/folio/entlinks/config/KafkaConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,28 @@ public EventProducer<LinkUpdateReport> linkUpdateReportMessageProducerService(
}

@Bean
public ProducerFactory<String, DomainEvent<?>> domainProducerFactory(KafkaProperties kafkaProperties) {
public <T> ProducerFactory<String, DomainEvent<T>> domainProducerFactory(KafkaProperties kafkaProperties) {
return getProducerConfigProps(kafkaProperties);
}

@Bean
public KafkaTemplate<String, DomainEvent<?>> domainKafkaTemplate(
ProducerFactory<String, DomainEvent<?>> domainProducerFactory) {
public <T> KafkaTemplate<String, DomainEvent<T>> domainKafkaTemplate(
ProducerFactory<String, DomainEvent<T>> domainProducerFactory) {
return new KafkaTemplate<>(domainProducerFactory);
}

@Bean
public EventProducer<DomainEvent<?>> authorityDomainMessageProducerService(
KafkaTemplate<String, DomainEvent<?>> template) {
@Bean("authorityDomainMessageProducer")
public <T> EventProducer<DomainEvent<T>> authorityDomainMessageProducerService(
KafkaTemplate<String, DomainEvent<T>> template) {
return new EventProducer<>(template, "authorities.authority");
}

@Bean("authoritySourceFileDomainMessageProducer")
public <T> EventProducer<DomainEvent<T>> authoritySourceFileDomainMessageProducerService(
KafkaTemplate<String, DomainEvent<T>> template) {
return new EventProducer<>(template, "authority.authority-source-file");
}

private <T> ConcurrentKafkaListenerContainerFactory<String, T> listenerFactory(
ConsumerFactory<String, T> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, T>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -21,13 +22,16 @@
import org.folio.entlinks.domain.entity.AuthoritySourceFile;
import org.folio.entlinks.exception.RequestBodyValidationException;
import org.folio.entlinks.integration.dto.event.DomainEventType;
import org.folio.entlinks.service.authority.AuthoritySourceFileDomainEventPublisher;
import org.folio.entlinks.service.authority.AuthoritySourceFileService;
import org.folio.entlinks.service.consortium.ConsortiumTenantsService;
import org.folio.entlinks.service.consortium.UserTenantsService;
import org.folio.entlinks.service.consortium.propagation.ConsortiumPropagationService;
import org.folio.entlinks.service.consortium.propagation.ConsortiumAuthoritySourceFilePropagationService;
import org.folio.entlinks.service.consortium.propagation.model.AuthoritySourceFilePropagationData;
import org.folio.spring.FolioExecutionContext;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.folio.tenant.domain.dto.Parameter;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Service;

@Log4j2
Expand All @@ -40,7 +44,8 @@ public class AuthoritySourceFileServiceDelegate {
private final AuthoritySourceFileService service;
private final AuthoritySourceFileMapper mapper;
private final UserTenantsService tenantsService;
private final ConsortiumPropagationService<AuthoritySourceFile> propagationService;
private final AuthoritySourceFileDomainEventPublisher eventPublisher;
private final ConsortiumAuthoritySourceFilePropagationService propagationService;
private final FolioExecutionContext context;
private final SystemUserScopedExecutionService executionService;
private final ConsortiumTenantsService consortiumTenantsService;
Expand All @@ -64,22 +69,26 @@ public AuthoritySourceFileDto createAuthoritySourceFile(AuthoritySourceFilePostD

service.createSequence(created.getSequenceName(), created.getHridStartNumber());

propagationService.propagate(entity, CREATE, context.getTenantId());
propagationService.propagate(getPropagationData(entity, null), CREATE, context.getTenantId());
return mapper.toDto(created);
}

public void patchAuthoritySourceFile(UUID id, AuthoritySourceFilePatchDto partiallyModifiedDto) {
log.debug("patch:: Attempting to patch AuthoritySourceFile [id: {}, patchDto: {}]", id, partiallyModifiedDto);
var existingEntity = service.getById(id);
validateActionRightsForTenant(DomainEventType.UPDATE);
validatePatchRequest(partiallyModifiedDto, existingEntity);
var hasAuthorityReferences = anyAuthoritiesExistForSourceFile(existingEntity);
validatePatchRequest(partiallyModifiedDto, existingEntity, hasAuthorityReferences);

var partialEntityUpdate = new AuthoritySourceFile(existingEntity);
partialEntityUpdate = mapper.partialUpdate(partiallyModifiedDto, partialEntityUpdate);
normalizeBaseUrl(partialEntityUpdate);
var patched = service.update(id, partialEntityUpdate);

var publishConsumer = publishRequired(hasAuthorityReferences, partiallyModifiedDto, existingEntity)
? getUpdatePublishConsumer() : null;
var patched = service.update(id, partialEntityUpdate, publishConsumer);
log.debug("patch:: Authority Source File partially updated: {}", patched);
propagationService.propagate(patched, UPDATE, context.getTenantId());
propagationService.propagate(getPropagationData(patched, publishConsumer), UPDATE, context.getTenantId());
}

public void deleteAuthoritySourceFileById(UUID id) {
Expand All @@ -88,14 +97,14 @@ public void deleteAuthoritySourceFileById(UUID id) {

if (anyAuthoritiesExistForSourceFile(entity)) {
throw new RequestBodyValidationException(
"Unable to delete. Authority source file has referenced authorities", Collections.emptyList());
"Unable to delete. Authority source file has referenced authorities", Collections.emptyList());
}

if (entity.getSequenceName() != null) {
service.deleteSequence(entity.getSequenceName());
}
service.deleteById(id);
propagationService.propagate(entity, DELETE, context.getTenantId());
propagationService.propagate(getPropagationData(entity, null), DELETE, context.getTenantId());
}

public AuthoritySourceFileHridDto getAuthoritySourceFileNextHrid(UUID id) {
Expand Down Expand Up @@ -126,9 +135,9 @@ private void validateActionRightsForTenant(DomainEventType action) {
}
}

private void validatePatchRequest(AuthoritySourceFilePatchDto patchDto, AuthoritySourceFile existing) {
private void validatePatchRequest(AuthoritySourceFilePatchDto patchDto, AuthoritySourceFile existing,
boolean hasAuthorityReferences) {
var errorParameters = new LinkedList<Parameter>();
var hasAuthorityReferences = anyAuthoritiesExistForSourceFile(existing);

if (!(existing.getSource().equals(FOLIO) || hasAuthorityReferences)) {
return;
Expand Down Expand Up @@ -167,4 +176,18 @@ public boolean anyAuthoritiesExistForSourceFile(AuthoritySourceFile sourceFile)

return false;
}

private AuthoritySourceFilePropagationData<AuthoritySourceFile> getPropagationData(
AuthoritySourceFile authoritySourceFile, BiConsumer<AuthoritySourceFile, AuthoritySourceFile> publishConsumer) {
return new AuthoritySourceFilePropagationData<>(authoritySourceFile, publishConsumer);
}

@NotNull
private BiConsumer<AuthoritySourceFile, AuthoritySourceFile> getUpdatePublishConsumer() {
return (newAsf, oldAsf) -> eventPublisher.publishUpdateEvent(mapper.toDto(newAsf), mapper.toDto(oldAsf));
}

private boolean publishRequired(boolean hasRefs, AuthoritySourceFilePatchDto modified, AuthoritySourceFile existed) {
return hasRefs && !modified.getBaseUrl().equals(existed.getBaseUrl());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.folio.entlinks.integration.kafka.EventProducer;
import org.folio.entlinks.service.reindex.ReindexContext;
import org.folio.spring.FolioExecutionContext;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
Expand All @@ -19,6 +20,7 @@ public class AuthorityDomainEventPublisher {
private static final String DOMAIN_EVENT_TYPE_HEADER = "domain-event-type";
private static final String REINDEX_JOB_ID_HEADER = "reindex-job-id";

@Qualifier("authorityDomainMessageProducer")
private final EventProducer<DomainEvent<?>> eventProducer;
private final FolioExecutionContext folioExecutionContext;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.folio.entlinks.service.authority;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.entlinks.domain.dto.AuthoritySourceFileDto;
import org.folio.entlinks.integration.dto.event.DomainEvent;
import org.folio.entlinks.integration.dto.event.DomainEventType;
import org.folio.entlinks.integration.kafka.EventProducer;
import org.folio.spring.FolioExecutionContext;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
@Log4j2
public class AuthoritySourceFileDomainEventPublisher {
private static final String DOMAIN_EVENT_TYPE_HEADER = "domain-event-type";

@Qualifier("authoritySourceFileDomainMessageProducer")
private final EventProducer<DomainEvent<?>> eventProducer;
private final FolioExecutionContext folioExecutionContext;

public void publishUpdateEvent(AuthoritySourceFileDto oldAsfDto, AuthoritySourceFileDto updatedAsfDto) {
var id = updatedAsfDto.getId();
if (id == null || oldAsfDto.getId() == null) {
log.warn("Old/New Authority Source File cannot have null id: updated.id - {}, old.id: {}",
id, oldAsfDto.getId());
return;
}

log.debug("publishUpdated::process authority source file id={}", id);

var domainEvent = DomainEvent.updateEvent(id, oldAsfDto, updatedAsfDto,
folioExecutionContext.getTenantId());
eventProducer.sendMessage(id.toString(), domainEvent, DOMAIN_EVENT_TYPE_HEADER, DomainEventType.UPDATE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiConsumer;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -120,20 +121,17 @@ public AuthoritySourceFile create(AuthoritySourceFile entity) {
maxAttempts = 2,
backoff = @Backoff(delay = 500))
public AuthoritySourceFile update(UUID id, AuthoritySourceFile modified) {
log.debug("update:: Attempting to update AuthoritySourceFile [id: {}]", id);

validateOnUpdate(id, modified);

var existingEntity = repository.findById(id).orElseThrow(() -> new AuthoritySourceFileNotFoundException(id));
if (modified.getVersion() < existingEntity.getVersion()) {
throw OptimisticLockingException.optimisticLockingOnUpdate(
id, existingEntity.getVersion(), modified.getVersion());
}

updateSequenceStartNumber(existingEntity, modified);
return updateInner(id, modified, null);
}

copyModifiableFields(existingEntity, modified);
return repository.save(existingEntity);
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Retryable(
retryFor = OptimisticLockingException.class,
maxAttempts = 2,
backoff = @Backoff(delay = 500))
public AuthoritySourceFile update(UUID id, AuthoritySourceFile modified,
BiConsumer<AuthoritySourceFile, AuthoritySourceFile> publishConsumer) {
return updateInner(id, modified, publishConsumer);
}

public void deleteById(UUID id) {
Expand Down Expand Up @@ -182,6 +180,30 @@ private void validateOnCreate(AuthoritySourceFile entity) {
entity.getAuthoritySourceFileCodes().forEach(this::validateSourceFileCode);
}

private AuthoritySourceFile updateInner(UUID id, AuthoritySourceFile modified,
BiConsumer<AuthoritySourceFile, AuthoritySourceFile> publishConsumer) {
log.debug("update:: Attempting to update AuthoritySourceFile [id: {}]", id);

validateOnUpdate(id, modified);

var existingEntity = repository.findById(id).orElseThrow(() -> new AuthoritySourceFileNotFoundException(id));
var detachedExisting = new AuthoritySourceFile(existingEntity);
if (modified.getVersion() < existingEntity.getVersion()) {
throw OptimisticLockingException.optimisticLockingOnUpdate(
id, existingEntity.getVersion(), modified.getVersion());
}

updateSequenceStartNumber(existingEntity, modified);

copyModifiableFields(existingEntity, modified);

AuthoritySourceFile saved = repository.saveAndFlush(existingEntity);
if (publishConsumer != null) {
publishConsumer.accept(saved, detachedExisting);
}
return saved;
}

private void validateOnUpdate(UUID id, AuthoritySourceFile entity) {
if (!Objects.equals(id, entity.getId())) {
throw new RequestBodyValidationException("Request should have id = " + id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import org.folio.entlinks.domain.entity.AuthoritySourceFile;
import org.folio.entlinks.service.authority.AuthoritySourceFileService;
import org.folio.entlinks.service.consortium.ConsortiumTenantsService;
import org.folio.entlinks.service.consortium.propagation.model.AuthoritySourceFilePropagationData;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.springframework.stereotype.Service;

@Log4j2
@Service
public class ConsortiumAuthoritySourceFilePropagationService extends ConsortiumPropagationService<AuthoritySourceFile> {
public class ConsortiumAuthoritySourceFilePropagationService
extends ConsortiumPropagationService<AuthoritySourceFilePropagationData<AuthoritySourceFile>> {

private final AuthoritySourceFileService sourceFileService;

Expand All @@ -20,13 +22,14 @@ public ConsortiumAuthoritySourceFilePropagationService(AuthoritySourceFileServic
this.sourceFileService = sourceFileService;
}

protected void doPropagation(AuthoritySourceFile sourceFile, PropagationType propagationType) {
protected void doPropagation(AuthoritySourceFilePropagationData<AuthoritySourceFile> data,
PropagationType propagationType) {
var sourceFile = data.authoritySourceFile();
switch (propagationType) {
case CREATE -> sourceFileService.create(sourceFile);
case UPDATE -> sourceFileService.update(sourceFile.getId(), sourceFile);
case UPDATE -> sourceFileService.update(sourceFile.getId(), sourceFile, data.publishConsumer());
case DELETE -> sourceFileService.deleteById(sourceFile.getId());
default -> throw new IllegalStateException("Unexpected value: " + propagationType);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public void propagate(T entity, PropagationType propagationType,
} catch (FolioIntegrationException e) {
log.warn("Skip propagation. Exception: ", e);
}

}

protected abstract void doPropagation(T entity,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.folio.entlinks.service.consortium.propagation.model;

import java.util.function.BiConsumer;
import org.folio.entlinks.domain.entity.AuthoritySourceFile;

public record AuthoritySourceFilePropagationData<T>(AuthoritySourceFile authoritySourceFile,
BiConsumer<T, T> publishConsumer) {
}
3 changes: 3 additions & 0 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ folio:
permissionsFilePath: permissions/mod-entities-links-permissions.csv
kafka:
topics:
- name: authority.authority-source-file
numPartitions: ${KAFKA_AUTHORITY_SOURCE_FILE_TOPIC_PARTITIONS:1}
replicationFactor: ${KAFKA_AUTHORITY_SOURCE_FILE_TOPIC_REPLICATION_FACTOR:}
- name: authorities.authority
numPartitions: ${KAFKA_INSTANCE_AUTHORITY_TOPIC_PARTITIONS:50}
replicationFactor: ${KAFKA_INSTANCE_AUTHORITY_TOPIC_REPLICATION_FACTOR:}
Expand Down
Loading

0 comments on commit f51aa0c

Please sign in to comment.