diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000000..f7ccb323c6 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,50 @@ +# Maven +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +.mvn/wrapper/maven-wrapper.jar + +# IDE +.idea/ +*.iml +*.iws +*.ipr +.vscode/ +.project +.classpath +.settings/ +.metadata/ + +# Git +.git/ +.gitignore +.gitattributes + +# Docker +Dockerfile +.dockerignore +docker-compose.yml + +# Documentation +*.md +!README.md + +# Logs +*.log + +# OS +.DS_Store +Thumbs.db + +# Build artifacts +*.jar +*.war +*.ear +*.class + diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000000..3b41682ac5 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +/mvnw text eol=lf +*.cmd text eol=crlf diff --git a/.gitignore b/.gitignore index 67045665db..38b8d9ab31 100644 --- a/.gitignore +++ b/.gitignore @@ -1,104 +1,37 @@ -# Logs -logs +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Docker ### *.log -npm-debug.log* -yarn-debug.log* -yarn-error.log* -lerna-debug.log* -# Diagnostic reports (https://nodejs.org/api/report.html) -report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json - -# Runtime data -pids -*.pid -*.seed -*.pid.lock - -# Directory for instrumented libs generated by jscoverage/JSCover -lib-cov - -# Coverage directory used by tools like istanbul -coverage -*.lcov - -# nyc test coverage -.nyc_output - -# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) -.grunt - -# Bower dependency directory (https://bower.io/) -bower_components - -# node-waf configuration -.lock-wscript - -# Compiled binary addons (https://nodejs.org/api/addons.html) -build/Release - -# Dependency directories -node_modules/ -jspm_packages/ - -# TypeScript v1 declaration files -typings/ - -# TypeScript cache -*.tsbuildinfo - -# Optional npm cache directory -.npm - -# Optional eslint cache -.eslintcache - -# Microbundle cache -.rpt2_cache/ -.rts2_cache_cjs/ -.rts2_cache_es/ -.rts2_cache_umd/ - -# Optional REPL history -.node_repl_history - -# Output of 'npm pack' -*.tgz - -# Yarn Integrity file -.yarn-integrity - -# dotenv environment variables file -.env -.env.test - -# parcel-bundler cache (https://parceljs.org/) -.cache - -# Next.js build output -.next - -# Nuxt.js build / generate output -.nuxt -dist - -# Gatsby files -.cache/ -# Comment in the public line in if your project uses Gatsby and *not* Next.js -# https://nextjs.org/blog/next-9-1#public-directory-support -# public - -# vuepress build output -.vuepress/dist - -# Serverless directories -.serverless/ - -# FuseBox cache -.fusebox/ - -# DynamoDB Local files -.dynamodb/ - -# TernJS port file -.tern-port diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000000..8dea6c227c --- /dev/null +++ b/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,3 @@ +wrapperVersion=3.3.4 +distributionType=only-script +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.12/apache-maven-3.9.12-bin.zip diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000..5fcef25b09 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,25 @@ +# Build stage +FROM maven:3.9.5-eclipse-temurin-21 AS build +WORKDIR /app + +# Copy parent POM and common module +COPY pom.xml . +COPY common common + +# Copy transaction-service +COPY transaction-service transaction-service + +# Build only transaction-service and its dependencies +RUN mvn clean package -DskipTests -pl transaction-service -am + +# Run stage +FROM eclipse-temurin:21-jre-alpine +WORKDIR /app + +COPY --from=build /app/transaction-service/target/*.jar app.jar + +EXPOSE 8080 + +ENTRYPOINT ["java", "-jar", "app.jar"] + + diff --git a/README.md b/README.md index b067a71026..fef131ef0c 100644 --- a/README.md +++ b/README.md @@ -1,82 +1,484 @@ -# Yape Code Challenge :rocket: +# Yape Challenge - Microservicios de Transacciones -Our code challenge will let you marvel us with your Jedi coding skills :smile:. +Sistema de microservicios para gestión de transacciones con validación anti-fraude, **optimizado para alto volumen**. -Don't forget that the proper way to submit your work is to fork the repo and create a PR :wink: ... have fun !! +## ✨ Características Principales -- [Problem](#problem) -- [Tech Stack](#tech_stack) -- [Send us your challenge](#send_us_your_challenge) +- ✅ **Arquitectura de Microservicios** con comunicación asíncrona vía Kafka +- ✅ **Event Sourcing** implementado para auditoría completa de transacciones +- ✅ **CQRS** (Command Query Responsibility Segregation) con Command Bus y Query Bus +- ✅ **Redis Cache Distribuido** para lecturas de alta velocidad (5-20ms) +- ✅ **Optimizado para Alto Volumen** (5K-10K lecturas/seg, 100-200 escrituras/seg) +- ✅ **API REST** con Spring Boot 3.2.0 y Java 21 +- ✅ **Validación Anti-Fraude** en tiempo real (rechaza transacciones > 1000) +- ✅ **PostgreSQL 16** con JSONB para Event Store +- ✅ **Apache Kafka** para mensajería asíncrona entre servicios +- ✅ **Docker y Docker Compose** para deployment simplificado +- ✅ **HikariCP** con connection pooling optimizado (50 conexiones) +- ✅ **Event Store API** para auditoría y debugging de eventos -# Problem +## 🏗️ Arquitectura -Every time a financial transaction is created it must be validated by our anti-fraud microservice and then the same service sends a message back to update the transaction status. -For now, we have only three transaction statuses: +Este proyecto implementa una arquitectura de microservicios con los siguientes componentes: -
Query type + * @paramResponse type + * @return Response from handler + */ + public R dispatch(Q query) { + log.debug("Dispatching query: {}", query.getClass().getSimpleName()); + + QueryHandlerhandler = findHandler(query); + + if (handler == null) { + throw new IllegalStateException("No handler found for query: " + query.getClass().getName()); + } + + return handler.handle(query); + } + + @SuppressWarnings("unchecked") + privateQueryHandlerfindHandler(Q query) { + // Get all QueryHandler beans + var handlers = applicationContext.getBeansOfType(QueryHandler.class); + + // Find the handler that can handle this query type + for (var handler : handlers.values()) { + // Check if this handler can handle the query by examining its generic types + ResolvableType handlerType = ResolvableType.forClass(handler.getClass()).as(QueryHandler.class); + ResolvableType[] generics = handlerType.getGenerics(); + + if (generics.length == 2) { + Class> queryType = generics[0].resolve(); + if (queryType != null && queryType.isAssignableFrom(query.getClass())) { + return handler; + } + } + } + + return null; + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/application/command/CreateTransactionCommand.java b/transaction-service/src/main/java/com/yape/challenge/transaction/application/command/CreateTransactionCommand.java new file mode 100644 index 0000000000..43c0092796 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/application/command/CreateTransactionCommand.java @@ -0,0 +1,19 @@ +package com.yape.challenge.transaction.application.command; + +import lombok.Builder; +import lombok.Data; + +import java.math.BigDecimal; +import java.util.UUID; + +/** + * Command to create a new transaction + */ +@Data +@Builder +public class CreateTransactionCommand { + private UUID accountExternalIdDebit; + private UUID accountExternalIdCredit; + private Integer tranferTypeId; + private BigDecimal value; +} diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/application/command/UpdateTransactionStatusCommand.java b/transaction-service/src/main/java/com/yape/challenge/transaction/application/command/UpdateTransactionStatusCommand.java new file mode 100644 index 0000000000..0682d6c4dd --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/application/command/UpdateTransactionStatusCommand.java @@ -0,0 +1,18 @@ +package com.yape.challenge.transaction.application.command; + +import com.yape.challenge.common.dto.TransactionStatus; +import lombok.Builder; +import lombok.Data; + +import java.util.UUID; + +/** + * Command to update transaction status + */ +@Data +@Builder +public class UpdateTransactionStatusCommand { + private UUID externalId; + private TransactionStatus status; +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/application/dto/request/CreateTransactionRequest.java b/transaction-service/src/main/java/com/yape/challenge/transaction/application/dto/request/CreateTransactionRequest.java new file mode 100644 index 0000000000..b1a18a9733 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/application/dto/request/CreateTransactionRequest.java @@ -0,0 +1,34 @@ +package com.yape.challenge.transaction.application.dto.request; + +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.Positive; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.math.BigDecimal; +import java.util.UUID; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class CreateTransactionRequest { + + @NotNull(message = "accountExternalIdDebit is required") + private UUID accountExternalIdDebit; + + @NotNull(message = "accountExternalIdCredit is required") + private UUID accountExternalIdCredit; + + @NotNull(message = "tranferTypeId is required") + @Positive(message = "tranferTypeId must be positive") + private Integer tranferTypeId; + + @NotNull(message = "value is required") + @Positive(message = "value must be positive") + private BigDecimal value; +} + + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/application/dto/response/TransactionResponse.java b/transaction-service/src/main/java/com/yape/challenge/transaction/application/dto/response/TransactionResponse.java new file mode 100644 index 0000000000..c023051eee --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/application/dto/response/TransactionResponse.java @@ -0,0 +1,44 @@ +package com.yape.challenge.transaction.application.dto.response; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.UUID; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TransactionResponse { + + private UUID transactionExternalId; + + private TransactionTypeDto transactionType; + + private TransactionStatusDto transactionStatus; + + private BigDecimal value; + + private LocalDateTime createdAt; + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class TransactionTypeDto { + private String name; + } + + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class TransactionStatusDto { + private String name; + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/CommandHandler.java b/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/CommandHandler.java new file mode 100644 index 0000000000..10c81d8093 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/CommandHandler.java @@ -0,0 +1,10 @@ +package com.yape.challenge.transaction.application.handler; + +/** + * Generic Command Handler interface + * @paramCommand type + * @param Response type + */ +public interface CommandHandler { + R handle(C command); +} diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/QueryHandler.java b/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/QueryHandler.java new file mode 100644 index 0000000000..845d9d2dfc --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/QueryHandler.java @@ -0,0 +1,11 @@ +package com.yape.challenge.transaction.application.handler; + +/** + * Generic Query Handler interface + * @param Query type + * @paramResponse type + */ +public interface QueryHandler { + R handle(Q query); +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/command/CreateTransactionCommandHandler.java b/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/command/CreateTransactionCommandHandler.java new file mode 100644 index 0000000000..50f61f1aad --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/command/CreateTransactionCommandHandler.java @@ -0,0 +1,97 @@ +package com.yape.challenge.transaction.application.handler.command; + +import com.yape.challenge.common.dto.TransactionCreatedEvent; +import com.yape.challenge.common.dto.TransactionStatus; +import com.yape.challenge.common.kafka.KafkaTopics; +import com.yape.challenge.transaction.application.command.CreateTransactionCommand; +import com.yape.challenge.transaction.application.dto.response.TransactionResponse; +import com.yape.challenge.transaction.application.handler.CommandHandler; +import com.yape.challenge.transaction.application.mapper.TransactionMapper; +import com.yape.challenge.transaction.domain.entity.Transaction; +import com.yape.challenge.transaction.domain.entity.TransactionType; +import com.yape.challenge.transaction.domain.event.TransactionCreatedDomainEvent; +import com.yape.challenge.transaction.infrastructure.eventstore.EventStore; +import com.yape.challenge.transaction.infrastructure.kafka.producer.KafkaProducerService; +import com.yape.challenge.transaction.infrastructure.repository.TransactionRepository; +import com.yape.challenge.transaction.infrastructure.repository.TransactionTypeRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.UUID; + +/** + * Handler for CreateTransactionCommand with Event Sourcing + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class CreateTransactionCommandHandler implements CommandHandler{ + + private final EventStore eventStore; + private final TransactionRepository transactionRepository; + private final TransactionTypeRepository transactionTypeRepository; + private final TransactionMapper transactionMapper; + private final KafkaProducerService kafkaProducerService; + + @Override + @Transactional + public TransactionResponse handle(CreateTransactionCommand command) { + log.info("Handling CreateTransactionCommand with Event Sourcing: {}", command); + + // 1. Generate unique transaction ID + UUID transactionId = UUID.randomUUID(); + + // 2. Create domain event + TransactionCreatedDomainEvent domainEvent = TransactionCreatedDomainEvent.builder() + .aggregateId(transactionId) + .accountExternalIdDebit(command.getAccountExternalIdDebit()) + .accountExternalIdCredit(command.getAccountExternalIdCredit()) + .transferTypeId(command.getTranferTypeId()) + .value(command.getValue()) + .occurredAt(LocalDateTime.now()) + .build(); + + // 3. Save event to Event Store (persistence) + eventStore.saveEvent(domainEvent); + log.info("Domain event persisted in Event Store for transaction: {}", transactionId); + + // 4. Apply event to create aggregate and save read model + Transaction transaction = applyEvent(domainEvent); + Transaction savedTransaction = transactionRepository.save(transaction); + log.info("Transaction read model saved with externalId: {}", savedTransaction.getExternalId()); + + // 5. Get transaction type + TransactionType transactionType = transactionTypeRepository.findById(command.getTranferTypeId()) + .orElseThrow(() -> new IllegalArgumentException("Transaction type not found")); + + // 6. Publish integration event to Kafka + TransactionCreatedEvent kafkaEvent = transactionMapper.toCreatedEvent(savedTransaction); + kafkaProducerService.sendTransactionCreatedEvent( + KafkaTopics.TRANSACTION_CREATED, + savedTransaction.getExternalId().toString(), + kafkaEvent + ); + log.info("Integration event published to Kafka for externalId: {}", savedTransaction.getExternalId()); + + return transactionMapper.toResponse(savedTransaction, transactionType); + } + + /** + * Apply domain event to create transaction aggregate + */ + private Transaction applyEvent(TransactionCreatedDomainEvent event) { + return Transaction.builder() + .externalId(event.getAggregateId()) + .accountExternalIdDebit(event.getAccountExternalIdDebit()) + .accountExternalIdCredit(event.getAccountExternalIdCredit()) + .transferTypeId(event.getTransferTypeId()) + .value(event.getValue()) + .status(TransactionStatus.PENDING) + .createdAt(event.getOccurredAt()) + .updatedAt(event.getOccurredAt()) + .build(); + } +} diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/command/UpdateTransactionStatusCommandHandler.java b/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/command/UpdateTransactionStatusCommandHandler.java new file mode 100644 index 0000000000..0832a99800 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/command/UpdateTransactionStatusCommandHandler.java @@ -0,0 +1,70 @@ +package com.yape.challenge.transaction.application.handler.command; + +import com.yape.challenge.transaction.application.command.UpdateTransactionStatusCommand; +import com.yape.challenge.transaction.application.handler.CommandHandler; +import com.yape.challenge.transaction.domain.entity.Transaction; +import com.yape.challenge.transaction.domain.event.TransactionStatusChangedDomainEvent; +import com.yape.challenge.transaction.infrastructure.eventstore.EventStore; +import com.yape.challenge.transaction.infrastructure.repository.TransactionRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; + +/** + * Handler for UpdateTransactionStatusCommand with Event Sourcing + * Implements cache invalidation for consistency + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class UpdateTransactionStatusCommandHandler implements CommandHandler { + + private final EventStore eventStore; + private final TransactionRepository transactionRepository; + + @Override + @Transactional + @CacheEvict(value = "transactions", key = "#command.externalId.toString()") + public Void handle(UpdateTransactionStatusCommand command) { + log.info("Handling UpdateTransactionStatusCommand with Event Sourcing: {}", command); + + // 1. Get current transaction state + Transaction transaction = transactionRepository.findByExternalId(command.getExternalId()) + .orElseThrow(() -> new IllegalArgumentException("Transaction not found: " + command.getExternalId())); + + // 2. Check if status actually changed + if (transaction.getStatus() == command.getStatus()) { + log.info("Transaction status unchanged: {}", command.getStatus()); + return null; + } + + // 3. Create domain event for status change + TransactionStatusChangedDomainEvent domainEvent = TransactionStatusChangedDomainEvent.builder() + .aggregateId(command.getExternalId()) + .oldStatus(transaction.getStatus()) + .newStatus(command.getStatus()) + .reason("Status updated via antifraud validation") + .occurredAt(LocalDateTime.now()) + .build(); + + // 4. Save event to Event Store (persistence) + eventStore.saveEvent(domainEvent); + log.info("Domain event persisted in Event Store for transaction: {} - Status change: {} -> {}", + command.getExternalId(), domainEvent.getOldStatus(), domainEvent.getNewStatus()); + + // 5. Apply event to update aggregate and save read model + transaction.setStatus(command.getStatus()); + transaction.setUpdatedAt(domainEvent.getOccurredAt()); + transactionRepository.save(transaction); + + log.info("Transaction status updated successfully and cache invalidated for externalId: {} - New status: {}", + command.getExternalId(), command.getStatus()); + + return null; + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/query/GetTransactionQueryHandler.java b/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/query/GetTransactionQueryHandler.java new file mode 100644 index 0000000000..956655a59e --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/application/handler/query/GetTransactionQueryHandler.java @@ -0,0 +1,46 @@ +package com.yape.challenge.transaction.application.handler.query; + +import com.yape.challenge.transaction.application.dto.response.TransactionResponse; +import com.yape.challenge.transaction.application.handler.QueryHandler; +import com.yape.challenge.transaction.application.mapper.TransactionMapper; +import com.yape.challenge.transaction.application.query.GetTransactionQuery; +import com.yape.challenge.transaction.domain.entity.Transaction; +import com.yape.challenge.transaction.domain.entity.TransactionType; +import com.yape.challenge.transaction.infrastructure.repository.TransactionRepository; +import com.yape.challenge.transaction.infrastructure.repository.TransactionTypeRepository; +import com.yape.challenge.transaction.presentation.exception.ResourceNotFoundException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +/** + * Handler for GetTransactionQuery + * Implements caching for high volume read optimization + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class GetTransactionQueryHandler implements QueryHandler { + + private final TransactionRepository transactionRepository; + private final TransactionTypeRepository transactionTypeRepository; + private final TransactionMapper transactionMapper; + + @Override + @Transactional(readOnly = true) + @Cacheable(value = "transactions", key = "#query.externalId.toString()", unless = "#result == null") + public TransactionResponse handle(GetTransactionQuery query) { + log.info("Handling GetTransactionQuery: {} (cache miss)", query); + + Transaction transaction = transactionRepository.findByExternalId(query.getExternalId()) + .orElseThrow(() -> new ResourceNotFoundException("Transaction not found")); + + TransactionType transactionType = transactionTypeRepository.findById(transaction.getTransferTypeId()) + .orElseThrow(() -> new IllegalArgumentException("Transaction type not found")); + + return transactionMapper.toResponse(transaction, transactionType); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/application/mapper/TransactionMapper.java b/transaction-service/src/main/java/com/yape/challenge/transaction/application/mapper/TransactionMapper.java new file mode 100644 index 0000000000..2f0d1a8357 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/application/mapper/TransactionMapper.java @@ -0,0 +1,54 @@ +package com.yape.challenge.transaction.application.mapper; + +import com.yape.challenge.common.dto.TransactionCreatedEvent; +import com.yape.challenge.transaction.application.dto.request.CreateTransactionRequest; +import com.yape.challenge.transaction.application.dto.response.TransactionResponse; +import com.yape.challenge.transaction.domain.entity.Transaction; +import com.yape.challenge.transaction.domain.entity.TransactionType; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Named; + +@Mapper(componentModel = "spring") +public interface TransactionMapper { + + @Mapping(target = "id", ignore = true) + @Mapping(target = "externalId", ignore = true) + @Mapping(target = "status", ignore = true) + @Mapping(target = "createdAt", ignore = true) + @Mapping(target = "updatedAt", ignore = true) + @Mapping(target = "transferTypeId", source = "tranferTypeId") + Transaction toEntity(CreateTransactionRequest request); + + @Mapping(target = "transactionExternalId", source = "externalId") + @Mapping(target = "status", source = "transaction.status") + TransactionCreatedEvent toCreatedEvent(Transaction transaction); + + @Mapping(target = "transactionExternalId", source = "transaction.externalId") + @Mapping(target = "transactionType", source = "transactionType", qualifiedByName = "mapTransactionType") + @Mapping(target = "transactionStatus", source = "transaction.status", qualifiedByName = "mapTransactionStatus") + @Mapping(target = "value", source = "transaction.value") + @Mapping(target = "createdAt", source = "transaction.createdAt") + TransactionResponse toResponse(Transaction transaction, TransactionType transactionType); + + @Named("mapTransactionType") + default TransactionResponse.TransactionTypeDto mapTransactionType(TransactionType transactionType) { + if (transactionType == null) { + return null; + } + return TransactionResponse.TransactionTypeDto.builder() + .name(transactionType.getName()) + .build(); + } + + @Named("mapTransactionStatus") + default TransactionResponse.TransactionStatusDto mapTransactionStatus(com.yape.challenge.common.dto.TransactionStatus status) { + if (status == null) { + return null; + } + return TransactionResponse.TransactionStatusDto.builder() + .name(status.name()) + .build(); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/application/query/GetTransactionQuery.java b/transaction-service/src/main/java/com/yape/challenge/transaction/application/query/GetTransactionQuery.java new file mode 100644 index 0000000000..2be6277a6e --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/application/query/GetTransactionQuery.java @@ -0,0 +1,16 @@ +package com.yape.challenge.transaction.application.query; + +import lombok.Builder; +import lombok.Data; + +import java.util.UUID; + +/** + * Query to get a transaction by external ID + */ +@Data +@Builder +public class GetTransactionQuery { + private UUID externalId; +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/domain/entity/DomainEvent.java b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/entity/DomainEvent.java new file mode 100644 index 0000000000..6a155aa13a --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/entity/DomainEvent.java @@ -0,0 +1,59 @@ +package com.yape.challenge.transaction.domain.entity; + +import jakarta.persistence.*; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.hibernate.annotations.JdbcTypeCode; +import org.hibernate.type.SqlTypes; + +import java.time.LocalDateTime; +import java.util.UUID; + +/** + * Entity representing a domain event in the Event Store + */ +@Entity +@Table(name = "domain_events", indexes = { + @Index(name = "idx_domain_events_aggregate_id", columnList = "aggregate_id"), + @Index(name = "idx_domain_events_event_type", columnList = "event_type"), + @Index(name = "idx_domain_events_occurred_at", columnList = "occurred_at"), + @Index(name = "idx_domain_events_aggregate_type", columnList = "aggregate_type") +}, uniqueConstraints = { + @UniqueConstraint(name = "uk_aggregate_version", columnNames = {"aggregate_id", "version"}) +}) +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DomainEvent { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(name = "aggregate_id", nullable = false) + private UUID aggregateId; + + @Column(name = "aggregate_type", nullable = false, length = 100) + private String aggregateType; + + @Column(name = "event_type", nullable = false, length = 100) + private String eventType; + + @JdbcTypeCode(SqlTypes.JSON) + @Column(name = "event_data", nullable = false, columnDefinition = "jsonb") + private String eventData; + + @JdbcTypeCode(SqlTypes.JSON) + @Column(name = "metadata", columnDefinition = "jsonb") + private String metadata; + + @Column(name = "version", nullable = false) + private Integer version; + + @Column(name = "occurred_at", nullable = false) + private LocalDateTime occurredAt; +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/domain/entity/Transaction.java b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/entity/Transaction.java new file mode 100644 index 0000000000..d3c275d866 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/entity/Transaction.java @@ -0,0 +1,68 @@ +package com.yape.challenge.transaction.domain.entity; + +import com.yape.challenge.common.dto.TransactionStatus; +import jakarta.persistence.*; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.hibernate.annotations.CreationTimestamp; +import org.hibernate.annotations.UpdateTimestamp; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.UUID; + +@Entity +@Table(name = "transactions", indexes = { + @Index(name = "idx_external_id", columnList = "external_id", unique = true), + @Index(name = "idx_status_created", columnList = "status, created_at") +}) +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class Transaction { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(name = "external_id", nullable = false, unique = true, updatable = false) + private UUID externalId; + + @Column(name = "account_external_id_debit", nullable = false) + private UUID accountExternalIdDebit; + + @Column(name = "account_external_id_credit", nullable = false) + private UUID accountExternalIdCredit; + + @Column(name = "transfer_type_id", nullable = false) + private Integer transferTypeId; + + @Column(name = "\"value\"", nullable = false, precision = 19, scale = 2) + private BigDecimal value; + + @Enumerated(EnumType.STRING) + @Column(nullable = false, length = 20) + private TransactionStatus status; + + @CreationTimestamp + @Column(name = "created_at", nullable = false, updatable = false) + private LocalDateTime createdAt; + + @UpdateTimestamp + @Column(name = "updated_at", nullable = false) + private LocalDateTime updatedAt; + + @PrePersist + protected void onCreate() { + if (externalId == null) { + externalId = UUID.randomUUID(); + } + if (status == null) { + status = TransactionStatus.PENDING; + } + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/domain/entity/TransactionType.java b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/entity/TransactionType.java new file mode 100644 index 0000000000..39d60d421d --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/entity/TransactionType.java @@ -0,0 +1,27 @@ +package com.yape.challenge.transaction.domain.entity; + +import jakarta.persistence.*; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Entity +@Table(name = "transaction_types") +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TransactionType { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Integer id; + + @Column(nullable = false, unique = true, length = 50) + private String name; + + @Column(length = 255) + private String description; +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/domain/event/TransactionCreatedDomainEvent.java b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/event/TransactionCreatedDomainEvent.java new file mode 100644 index 0000000000..06e9ac5bd5 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/event/TransactionCreatedDomainEvent.java @@ -0,0 +1,48 @@ +package com.yape.challenge.transaction.domain.event; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Builder; +import lombok.Data; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.UUID; + +/** + * Domain event fired when a transaction is created + */ +@Data +@Builder +@JsonIgnoreProperties(ignoreUnknown = true) +public class TransactionCreatedDomainEvent implements TransactionDomainEvent { + private final UUID aggregateId; + private final UUID accountExternalIdDebit; + private final UUID accountExternalIdCredit; + private final Integer transferTypeId; + private final BigDecimal value; + private final LocalDateTime occurredAt; + + @JsonCreator + public TransactionCreatedDomainEvent( + @JsonProperty("aggregateId") UUID aggregateId, + @JsonProperty("accountExternalIdDebit") UUID accountExternalIdDebit, + @JsonProperty("accountExternalIdCredit") UUID accountExternalIdCredit, + @JsonProperty("transferTypeId") Integer transferTypeId, + @JsonProperty("value") BigDecimal value, + @JsonProperty("occurredAt") LocalDateTime occurredAt) { + this.aggregateId = aggregateId; + this.accountExternalIdDebit = accountExternalIdDebit; + this.accountExternalIdCredit = accountExternalIdCredit; + this.transferTypeId = transferTypeId; + this.value = value; + this.occurredAt = occurredAt; + } + + @Override + public String getEventType() { + return "TransactionCreatedDomainEvent"; + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/domain/event/TransactionDomainEvent.java b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/event/TransactionDomainEvent.java new file mode 100644 index 0000000000..e9e99d4cb1 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/event/TransactionDomainEvent.java @@ -0,0 +1,15 @@ +package com.yape.challenge.transaction.domain.event; + +import java.time.LocalDateTime; +import java.util.UUID; + +/** + * Base interface for all transaction domain events + */ +public interface TransactionDomainEvent { + UUID getAggregateId(); + LocalDateTime getOccurredAt(); + String getEventType(); +} + + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/domain/event/TransactionStatusChangedDomainEvent.java b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/event/TransactionStatusChangedDomainEvent.java new file mode 100644 index 0000000000..7002a08055 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/event/TransactionStatusChangedDomainEvent.java @@ -0,0 +1,45 @@ +package com.yape.challenge.transaction.domain.event; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.yape.challenge.common.dto.TransactionStatus; +import lombok.Builder; +import lombok.Data; + +import java.time.LocalDateTime; +import java.util.UUID; + +/** + * Domain event fired when a transaction status changes + */ +@Data +@Builder +@JsonIgnoreProperties(ignoreUnknown = true) +public class TransactionStatusChangedDomainEvent implements TransactionDomainEvent { + private final UUID aggregateId; + private final TransactionStatus oldStatus; + private final TransactionStatus newStatus; + private final String reason; + private final LocalDateTime occurredAt; + + @JsonCreator + public TransactionStatusChangedDomainEvent( + @JsonProperty("aggregateId") UUID aggregateId, + @JsonProperty("oldStatus") TransactionStatus oldStatus, + @JsonProperty("newStatus") TransactionStatus newStatus, + @JsonProperty("reason") String reason, + @JsonProperty("occurredAt") LocalDateTime occurredAt) { + this.aggregateId = aggregateId; + this.oldStatus = oldStatus; + this.newStatus = newStatus; + this.reason = reason; + this.occurredAt = occurredAt; + } + + @Override + public String getEventType() { + return "TransactionStatusChangedDomainEvent"; + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/domain/service/TransactionAggregateService.java b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/service/TransactionAggregateService.java new file mode 100644 index 0000000000..b476993cf4 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/domain/service/TransactionAggregateService.java @@ -0,0 +1,90 @@ +package com.yape.challenge.transaction.domain.service; + +import com.yape.challenge.common.dto.TransactionStatus; +import com.yape.challenge.transaction.domain.entity.Transaction; +import com.yape.challenge.transaction.domain.event.TransactionCreatedDomainEvent; +import com.yape.challenge.transaction.domain.event.TransactionDomainEvent; +import com.yape.challenge.transaction.domain.event.TransactionStatusChangedDomainEvent; +import com.yape.challenge.transaction.infrastructure.eventstore.EventStore; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.UUID; + +/** + * Service to rebuild Transaction aggregates from domain events + */ +@Service +@RequiredArgsConstructor +@Slf4j +public class TransactionAggregateService { + + private final EventStore eventStore; + + /** + * Rebuild a transaction aggregate from its event history + */ + public Transaction rebuildFromEvents(UUID transactionId) { + log.info("Rebuilding transaction aggregate from events: {}", transactionId); + + List events = eventStore.getEvents(transactionId); + + if (events.isEmpty()) { + throw new IllegalArgumentException("No events found for transaction: " + transactionId); + } + + Transaction transaction = new Transaction(); + transaction.setExternalId(transactionId); + + // Apply each event in order + for (TransactionDomainEvent event : events) { + applyEvent(transaction, event); + } + + log.info("Transaction aggregate rebuilt successfully: {}", transactionId); + return transaction; + } + + /** + * Apply a single event to the transaction aggregate + */ + public void applyEvent(Transaction transaction, TransactionDomainEvent event) { + switch (event) { + case TransactionCreatedDomainEvent created -> { + transaction.setAccountExternalIdDebit(created.getAccountExternalIdDebit()); + transaction.setAccountExternalIdCredit(created.getAccountExternalIdCredit()); + transaction.setTransferTypeId(created.getTransferTypeId()); + transaction.setValue(created.getValue()); + transaction.setStatus(TransactionStatus.PENDING); + transaction.setCreatedAt(created.getOccurredAt()); + transaction.setUpdatedAt(created.getOccurredAt()); + log.debug("Applied TransactionCreatedDomainEvent to aggregate: {}", transaction.getExternalId()); + } + case TransactionStatusChangedDomainEvent statusChanged -> { + transaction.setStatus(statusChanged.getNewStatus()); + transaction.setUpdatedAt(statusChanged.getOccurredAt()); + log.debug("Applied TransactionStatusChangedDomainEvent to aggregate: {} - New status: {}", + transaction.getExternalId(), statusChanged.getNewStatus()); + } + default -> + log.warn("Unknown event type: {}", event.getClass().getName()); + } + } + + /** + * Check if a transaction exists in the event store + */ + public boolean transactionExists(UUID transactionId) { + return eventStore.aggregateExists(transactionId); + } + + /** + * Get the number of events for a transaction + */ + public long getEventCount(UUID transactionId) { + return eventStore.getEventCount(transactionId); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/config/CacheConfig.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/config/CacheConfig.java new file mode 100644 index 0000000000..cee5bca385 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/config/CacheConfig.java @@ -0,0 +1,83 @@ +package com.yape.challenge.transaction.infrastructure.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cache.CacheManager; +import org.springframework.cache.annotation.EnableCaching; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.cache.RedisCacheConfiguration; +import org.springframework.data.redis.cache.RedisCacheManager; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.RedisSerializationContext; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +import java.time.Duration; + +/** + * Redis cache configuration for high volume read optimization + * Implements distributed caching to reduce database load + */ +@Configuration +@EnableCaching +@ConditionalOnProperty(name = "spring.cache.type", havingValue = "redis", matchIfMissing = true) +public class CacheConfig { + + /** + * Configure ObjectMapper for Redis serialization with Java Time support + * Enables default typing to preserve type information during serialization/deserialization + */ + @Bean + public ObjectMapper redisCacheObjectMapper() { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + objectMapper.findAndRegisterModules(); + // Enable default typing to avoid ClassCastException when deserializing from Redis + objectMapper.activateDefaultTyping( + objectMapper.getPolymorphicTypeValidator(), + ObjectMapper.DefaultTyping.NON_FINAL + ); + return objectMapper; + } + + /** + * Configure Redis Cache Manager with custom TTLs for different cache regions + */ + @Bean + public CacheManager cacheManager(RedisConnectionFactory connectionFactory) { + // Default cache configuration + RedisCacheConfiguration defaultConfig = RedisCacheConfiguration.defaultCacheConfig() + .entryTtl(Duration.ofMinutes(5)) + .serializeKeysWith( + RedisSerializationContext.SerializationPair.fromSerializer( + new StringRedisSerializer() + ) + ) + .serializeValuesWith( + RedisSerializationContext.SerializationPair.fromSerializer( + new GenericJackson2JsonRedisSerializer(redisCacheObjectMapper()) + ) + ) + .disableCachingNullValues(); + + // Build cache manager with specific cache configurations + return RedisCacheManager.builder(connectionFactory) + .cacheDefaults(defaultConfig) + // Transaction cache: 10 minutes (frequent reads) + .withCacheConfiguration("transactions", + defaultConfig.entryTtl(Duration.ofMinutes(10))) + // Transaction types: 1 hour (rarely changes) + .withCacheConfiguration("transactionTypes", + defaultConfig.entryTtl(Duration.ofHours(1))) + // Transaction list: 2 minutes (changes frequently) + .withCacheConfiguration("transactionList", + defaultConfig.entryTtl(Duration.ofMinutes(2))) + .transactionAware() + .build(); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/config/ObjectMapperConfig.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/config/ObjectMapperConfig.java new file mode 100644 index 0000000000..5fe95b6dac --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/config/ObjectMapperConfig.java @@ -0,0 +1,30 @@ +package com.yape.challenge.transaction.infrastructure.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +/** + * Configuration for ObjectMapper with Java Time support + */ +@Configuration +public class ObjectMapperConfig { + + @Bean + @Primary + public ObjectMapper objectMapper() { + ObjectMapper objectMapper = new ObjectMapper(); + + // Register JavaTimeModule for LocalDateTime, LocalDate, etc. + objectMapper.registerModule(new JavaTimeModule()); + + // Disable writing dates as timestamps + objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + + return objectMapper; + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/config/Resilience4jConfig.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/config/Resilience4jConfig.java new file mode 100644 index 0000000000..7f143a3d5b --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/config/Resilience4jConfig.java @@ -0,0 +1,56 @@ +package com.yape.challenge.transaction.infrastructure.config; + +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; +import io.github.resilience4j.core.registry.EntryAddedEvent; +import io.github.resilience4j.core.registry.EntryRemovedEvent; +import io.github.resilience4j.core.registry.EntryReplacedEvent; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Configuration for Resilience4j Circuit Breaker + * Provides custom logging for Circuit Breaker events + */ +@Configuration +@Slf4j +public class Resilience4jConfig { + + @Bean + public CircuitBreakerRegistry circuitBreakerRegistry() { + CircuitBreakerRegistry registry = CircuitBreakerRegistry.ofDefaults(); + + // Register event listeners for Circuit Breaker lifecycle + registry.getEventPublisher() + .onEntryAdded(this::onCircuitBreakerAdded) + .onEntryRemoved(this::onCircuitBreakerRemoved) + .onEntryReplaced(this::onCircuitBreakerReplaced); + + return registry; + } + + private void onCircuitBreakerAdded(EntryAddedEvent event) { + log.info("Circuit Breaker '{}' added", event.getAddedEntry().getName()); + + // Register state transition listener + event.getAddedEntry().getEventPublisher() + .onStateTransition(e -> log.warn("Circuit Breaker '{}' state changed: {} -> {}", + event.getAddedEntry().getName(), + e.getStateTransition().getFromState(), + e.getStateTransition().getToState())) + .onError(e -> log.error("Circuit Breaker '{}' recorded error: {}", + event.getAddedEntry().getName(), + e.getThrowable().getMessage())) + .onSuccess(e -> log.debug("Circuit Breaker '{}' recorded success", + event.getAddedEntry().getName())); + } + + private void onCircuitBreakerRemoved(EntryRemovedEvent event) { + log.info("Circuit Breaker '{}' removed", event.getRemovedEntry().getName()); + } + + private void onCircuitBreakerReplaced(EntryReplacedEvent event) { + log.info("Circuit Breaker '{}' replaced", event.getNewEntry().getName()); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/eventstore/EventStore.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/eventstore/EventStore.java new file mode 100644 index 0000000000..1b86906aeb --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/eventstore/EventStore.java @@ -0,0 +1,174 @@ +package com.yape.challenge.transaction.infrastructure.eventstore; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yape.challenge.transaction.domain.entity.DomainEvent; +import com.yape.challenge.transaction.domain.event.TransactionCreatedDomainEvent; +import com.yape.challenge.transaction.domain.event.TransactionDomainEvent; +import com.yape.challenge.transaction.domain.event.TransactionStatusChangedDomainEvent; +import com.yape.challenge.transaction.infrastructure.repository.DomainEventRepository; +import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Event Store implementation for persisting and retrieving domain events + */ +@Component +@RequiredArgsConstructor +@Slf4j +public class EventStore { + + private final DomainEventRepository domainEventRepository; + private final ObjectMapper objectMapper; + + private static final String AGGREGATE_TYPE = "Transaction"; + + /** + * Save a domain event to the event store + */ + @Transactional + @CircuitBreaker(name = "database", fallbackMethod = "saveEventFallback") + public void saveEvent(TransactionDomainEvent event) { + try { + UUID aggregateId = event.getAggregateId(); + + // Get the last version for this aggregate + Integer lastVersion = domainEventRepository + .findLastVersionByAggregateId(aggregateId) + .orElse(0); + + // Serialize event data to JSON + String eventData = objectMapper.writeValueAsString(event); + + // Create metadata + Map metadata = new HashMap<>(); + metadata.put("timestamp", LocalDateTime.now()); + metadata.put("eventClass", event.getClass().getName()); + String metadataJson = objectMapper.writeValueAsString(metadata); + + // Create and save domain event + DomainEvent domainEvent = DomainEvent.builder() + .aggregateId(aggregateId) + .aggregateType(AGGREGATE_TYPE) + .eventType(event.getEventType()) + .eventData(eventData) + .metadata(metadataJson) + .version(lastVersion + 1) + .occurredAt(event.getOccurredAt()) + .build(); + + domainEventRepository.save(domainEvent); + log.info("Event saved: {} for aggregate: {}, version: {}", + event.getEventType(), aggregateId, lastVersion + 1); + + } catch (JsonProcessingException e) { + log.error("Error serializing event: {}", event, e); + throw new RuntimeException("Failed to serialize event", e); + } + } + + /** + * Fallback method for saveEvent when database is not available + */ + private void saveEventFallback(TransactionDomainEvent event, Exception e) { + log.error("Database circuit breaker is OPEN or error occurred. Event: {}, Error: {}", + event.getAggregateId(), e.getMessage()); + throw new RuntimeException("Database service is currently unavailable. Please try again later.", e); + } + + /** + * Get all events for a specific aggregate + */ + @Transactional(readOnly = true) + @CircuitBreaker(name = "database", fallbackMethod = "getEventsFallback") + public List getEvents(UUID aggregateId) { + List domainEvents = domainEventRepository + .findByAggregateIdOrderByVersionAsc(aggregateId); + + return domainEvents.stream() + .map(this::deserializeEvent) + .toList(); + } + + /** + * Fallback method for getEvents when database is not available + */ + private List getEventsFallback(UUID aggregateId, Exception e) { + log.error("Database circuit breaker is OPEN or error occurred. Aggregate: {}, Error: {}", + aggregateId, e.getMessage()); + throw new RuntimeException("Database service is currently unavailable. Please try again later.", e); + } + + /** + * Check if an aggregate exists in the event store + */ + @Transactional(readOnly = true) + public boolean aggregateExists(UUID aggregateId) { + return domainEventRepository.existsByAggregateId(aggregateId); + } + + /** + * Get event count for an aggregate + */ + @Transactional(readOnly = true) + public long getEventCount(UUID aggregateId) { + return domainEventRepository.countByAggregateId(aggregateId); + } + + /** + * Deserialize a domain event from JSON + */ + private TransactionDomainEvent deserializeEvent(DomainEvent domainEvent) { + try { + String eventType = domainEvent.getEventType(); + String eventData = domainEvent.getEventData(); + + return switch (eventType) { + case "TransactionCreatedDomainEvent" -> + objectMapper.readValue(eventData, TransactionCreatedDomainEvent.class); + case "TransactionStatusChangedDomainEvent" -> + objectMapper.readValue(eventData, TransactionStatusChangedDomainEvent.class); + default -> { + log.error("Unknown event type: {}", eventType); + throw new IllegalArgumentException("Unknown event type: " + eventType); + } + }; + } catch (JsonProcessingException e) { + log.error("Error deserializing event: {}", domainEvent, e); + throw new RuntimeException("Failed to deserialize event", e); + } + } + + /** + * Get all events by type + */ + @Transactional(readOnly = true) + public List getEventsByType(String eventType) { + return domainEventRepository.findByEventTypeOrderByOccurredAtDesc(eventType) + .stream() + .map(this::deserializeEvent) + .toList(); + } + + /** + * Get all events for the aggregate type + */ + @Transactional(readOnly = true) + public List getAllTransactionEvents() { + return domainEventRepository.findByAggregateTypeOrderByOccurredAtDesc(AGGREGATE_TYPE) + .stream() + .map(this::deserializeEvent) + .toList(); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/config/KafkaConsumerConfig.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/config/KafkaConsumerConfig.java new file mode 100644 index 0000000000..818861f717 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/config/KafkaConsumerConfig.java @@ -0,0 +1,50 @@ +package com.yape.challenge.transaction.infrastructure.kafka.config; + +import com.yape.challenge.common.dto.TransactionStatusEvent; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import java.util.HashMap; +import java.util.Map; + +@EnableKafka +@Configuration +public class KafkaConsumerConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.consumer.group-id}") + private String groupId; + + @Bean + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, TransactionStatusEvent.class.getName()); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setConcurrency(3); + return factory; + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/config/KafkaProducerConfig.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/config/KafkaProducerConfig.java new file mode 100644 index 0000000000..21a2bbb7b5 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/config/KafkaProducerConfig.java @@ -0,0 +1,41 @@ +package com.yape.challenge.transaction.infrastructure.kafka.config; + +import com.yape.challenge.common.dto.TransactionCreatedEvent; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaProducerConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + configProps.put(ProducerConfig.ACKS_CONFIG, "all"); + configProps.put(ProducerConfig.RETRIES_CONFIG, 3); + configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/config/KafkaTopicConfig.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/config/KafkaTopicConfig.java new file mode 100644 index 0000000000..7743a7fb3f --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/config/KafkaTopicConfig.java @@ -0,0 +1,44 @@ +package com.yape.challenge.transaction.infrastructure.kafka.config; + +import com.yape.challenge.common.kafka.KafkaTopics; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.KafkaAdmin; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaTopicConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + @Bean + public KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + return new KafkaAdmin(configs); + } + + @Bean + public NewTopic transactionCreatedTopic() { + return TopicBuilder.name(KafkaTopics.TRANSACTION_CREATED) + .partitions(3) + .replicas(1) + .build(); + } + + @Bean + public NewTopic transactionStatusUpdatedTopic() { + return TopicBuilder.name(KafkaTopics.TRANSACTION_STATUS_UPDATED) + .partitions(3) + .replicas(1) + .build(); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/consumer/TransactionStatusConsumer.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/consumer/TransactionStatusConsumer.java new file mode 100644 index 0000000000..f87afd9c39 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/consumer/TransactionStatusConsumer.java @@ -0,0 +1,62 @@ +package com.yape.challenge.transaction.infrastructure.kafka.consumer; + +import com.yape.challenge.common.dto.TransactionStatusEvent; +import com.yape.challenge.transaction.application.bus.CommandBus; +import com.yape.challenge.transaction.application.command.UpdateTransactionStatusCommand; +import com.yape.challenge.common.kafka.KafkaTopics; +import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker; +import io.github.resilience4j.retry.annotation.Retry; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +@Slf4j +public class TransactionStatusConsumer { + + private final CommandBus commandBus; + + @KafkaListener(topics = KafkaTopics.TRANSACTION_STATUS_UPDATED, groupId = "${spring.kafka.consumer.group-id}") + @CircuitBreaker(name = "database", fallbackMethod = "consumeTransactionStatusFallback") + @Retry(name = "kafkaProducer") + public void consumeTransactionStatus(TransactionStatusEvent event) { + log.info("Received transaction status event: {}", event); + + try { + // Create command to update transaction status + UpdateTransactionStatusCommand command = UpdateTransactionStatusCommand.builder() + .externalId(event.getTransactionExternalId()) + .status(event.getStatus()) + .build(); + + // Dispatch command through command bus + commandBus.dispatch(command); + + log.info("Transaction status updated successfully for externalId: {}", + event.getTransactionExternalId()); + } catch (Exception e) { + log.error("Error updating transaction status for externalId: {}", + event.getTransactionExternalId(), e); + throw e; + } + } + + /** + * Fallback method when Circuit Breaker is open for database operations + */ + private void consumeTransactionStatusFallback(TransactionStatusEvent event, Exception ex) { + log.error("Circuit Breaker OPEN or all retries failed for transaction status update. " + + "TransactionExternalId: {}, Status: {}. Reason: {}", + event.getTransactionExternalId(), event.getStatus(), ex.getMessage()); + + // TODO: Implement fallback strategy: + // 1. Save to dead letter topic + // 2. Store in a retry queue + // 3. Send alert for manual intervention + + // Don't throw exception to avoid message reprocessing loop + log.warn("Message will be acknowledged to avoid reprocessing loop"); + } +} diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/exception/KafkaProducerException.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/exception/KafkaProducerException.java new file mode 100644 index 0000000000..05f92bf7ee --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/exception/KafkaProducerException.java @@ -0,0 +1,16 @@ +package com.yape.challenge.transaction.infrastructure.kafka.exception; + +/** + * Custom exception for Kafka producer errors + */ +public class KafkaProducerException extends RuntimeException { + + public KafkaProducerException(String message) { + super(message); + } + + public KafkaProducerException(String message, Throwable cause) { + super(message, cause); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/producer/KafkaProducerService.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/producer/KafkaProducerService.java new file mode 100644 index 0000000000..5d28309495 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/kafka/producer/KafkaProducerService.java @@ -0,0 +1,90 @@ +package com.yape.challenge.transaction.infrastructure.kafka.producer; + +import com.yape.challenge.common.dto.TransactionCreatedEvent; +import com.yape.challenge.transaction.infrastructure.kafka.exception.KafkaProducerException; +import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker; +import io.github.resilience4j.retry.annotation.Retry; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Service; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Service for sending messages to Kafka with Circuit Breaker pattern + */ +@Service +@RequiredArgsConstructor +@Slf4j +public class KafkaProducerService { + + private final KafkaTemplate kafkaTemplate; + + /** + * Send transaction created event to Kafka with Circuit Breaker and Retry + * + * @param topic Topic name + * @param key Message key + * @param event Event to send + */ + @CircuitBreaker(name = "kafkaProducer", fallbackMethod = "sendEventFallback") + @Retry(name = "kafkaProducer") + public void sendTransactionCreatedEvent(String topic, String key, TransactionCreatedEvent event) { + log.info("Sending event to Kafka topic '{}' with key '{}': {}", topic, key, event); + + try { + CompletableFuture > future = + kafkaTemplate.send(topic, key, event); + + // Wait for the result with timeout to ensure Circuit Breaker catches exceptions + // Timeout of 5 seconds to fail fast if Kafka is unavailable + SendResult result = future.get(5, TimeUnit.SECONDS); + + if (result != null && result.getRecordMetadata() != null) { + log.info("Event sent successfully to topic '{}', partition: {}, offset: {}", + topic, + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } else { + log.info("Event sent successfully to topic '{}', but no record metadata was returned", topic); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Restore interrupt status + log.error("Thread interrupted while sending event to Kafka topic '{}': {}", topic, e.getMessage()); + throw new KafkaProducerException("Thread interrupted while sending event to Kafka", e); + } catch (Exception e) { + log.error("Failed to send event to Kafka topic '{}': {}", topic, e.getMessage()); + throw new KafkaProducerException("Error sending event to Kafka", e); + } + } + + /** + * Fallback method when Circuit Breaker is open or all retries fail + * This method logs the error and prevents cascading failures. + * In production, this could: + * - Save to a dead letter queue + * - Save to database for later retry + * - Send notification to monitoring system + */ + @SuppressWarnings("unused") // Used by Circuit Breaker via reflection + private void sendEventFallback(String topic, String key, TransactionCreatedEvent event, Exception ex) { + log.error("Circuit Breaker OPEN or all retries failed for Kafka producer. " + + "Topic: {}, Key: {}, Event: {}. Reason: {}", + topic, key, event, ex.getMessage()); + + // Strategy: Log and allow the application to continue + // The transaction is already created in the database (Event Sourcing) + // The antifraud service will not receive the notification immediately, + // but the system remains available for other operations + + log.warn("Transaction {} created but notification to antifraud service failed. " + + "Manual intervention or retry mechanism may be required.", event.getTransactionExternalId()); + + throw new KafkaProducerException("Kafka service is temporarily unavailable. Transaction created but notification failed.", ex); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/repository/DomainEventRepository.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/repository/DomainEventRepository.java new file mode 100644 index 0000000000..ec731fae2b --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/repository/DomainEventRepository.java @@ -0,0 +1,50 @@ +package com.yape.challenge.transaction.infrastructure.repository; + +import com.yape.challenge.transaction.domain.entity.DomainEvent; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; +import org.springframework.stereotype.Repository; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +/** + * Repository for DomainEvent persistence + */ +@Repository +public interface DomainEventRepository extends JpaRepository { + + /** + * Find all events for a specific aggregate, ordered by version + */ + List findByAggregateIdOrderByVersionAsc(UUID aggregateId); + + /** + * Find the last version number for an aggregate + */ + @Query("SELECT MAX(de.version) FROM DomainEvent de WHERE de.aggregateId = :aggregateId") + Optional findLastVersionByAggregateId(@Param("aggregateId") UUID aggregateId); + + /** + * Check if an aggregate has any events + */ + boolean existsByAggregateId(UUID aggregateId); + + /** + * Find events by aggregate type + */ + List findByAggregateTypeOrderByOccurredAtDesc(String aggregateType); + + /** + * Find events by event type + */ + List findByEventTypeOrderByOccurredAtDesc(String eventType); + + /** + * Count events for an aggregate + */ + long countByAggregateId(UUID aggregateId); +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/repository/TransactionRepository.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/repository/TransactionRepository.java new file mode 100644 index 0000000000..2a4f7bebbc --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/repository/TransactionRepository.java @@ -0,0 +1,17 @@ +package com.yape.challenge.transaction.infrastructure.repository; + +import com.yape.challenge.transaction.domain.entity.Transaction; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +import java.util.Optional; +import java.util.UUID; + +@Repository +public interface TransactionRepository extends JpaRepository { + + Optional findByExternalId(UUID externalId); + +} + + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/repository/TransactionTypeRepository.java b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/repository/TransactionTypeRepository.java new file mode 100644 index 0000000000..f10eed88ce --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/infrastructure/repository/TransactionTypeRepository.java @@ -0,0 +1,12 @@ +package com.yape.challenge.transaction.infrastructure.repository; + +import com.yape.challenge.transaction.domain.entity.TransactionType; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + + +@Repository +public interface TransactionTypeRepository extends JpaRepository { + +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/controller/EventStoreController.java b/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/controller/EventStoreController.java new file mode 100644 index 0000000000..0b3d8e8bb4 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/controller/EventStoreController.java @@ -0,0 +1,95 @@ +package com.yape.challenge.transaction.presentation.controller; + +import com.yape.challenge.transaction.domain.event.TransactionDomainEvent; +import com.yape.challenge.transaction.infrastructure.eventstore.EventStore; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.UUID; + +/** + * REST Controller for Event Store queries (for debugging and auditing) + */ +@RestController +@RequestMapping("/api/v1/events") +@RequiredArgsConstructor +@Slf4j +public class EventStoreController { + + private final EventStore eventStore; + + /** + * Get all events for a specific transaction + * GET /api/v1/events/transaction/{transactionId} + */ + @GetMapping("/transaction/{transactionId}") + public ResponseEntity > getTransactionEvents( + @PathVariable UUID transactionId) { + log.info("Getting events for transaction: {}", transactionId); + + List
events = eventStore.getEvents(transactionId); + + if (events.isEmpty()) { + return ResponseEntity.notFound().build(); + } + + return ResponseEntity.ok(events); + } + + /** + * Get event count for a specific transaction + * GET /api/v1/events/transaction/{transactionId}/count + */ + @GetMapping("/transaction/{transactionId}/count") + public ResponseEntity getTransactionEventCount(@PathVariable UUID transactionId) { + log.info("Getting event count for transaction: {}", transactionId); + + long count = eventStore.getEventCount(transactionId); + + return ResponseEntity.ok(count); + } + + /** + * Get all events by event type + * GET /api/v1/events/type/{eventType} + */ + @GetMapping("/type/{eventType}") + public ResponseEntity > getEventsByType( + @PathVariable String eventType) { + log.info("Getting events by type: {}", eventType); + + List
events = eventStore.getEventsByType(eventType); + + return ResponseEntity.ok(events); + } + + /** + * Get all transaction events + * GET /api/v1/events/all + */ + @GetMapping("/all") + public ResponseEntity > getAllEvents() { + log.info("Getting all transaction events"); + + List
events = eventStore.getAllTransactionEvents(); + + return ResponseEntity.ok(events); + } + + /** + * Check if transaction exists in event store + * GET /api/v1/events/transaction/{transactionId}/exists + */ + @GetMapping("/transaction/{transactionId}/exists") + public ResponseEntity transactionExists(@PathVariable UUID transactionId) { + log.info("Checking if transaction exists in event store: {}", transactionId); + + boolean exists = eventStore.aggregateExists(transactionId); + + return ResponseEntity.ok(exists); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/controller/TransactionController.java b/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/controller/TransactionController.java new file mode 100644 index 0000000000..a9f85d868e --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/controller/TransactionController.java @@ -0,0 +1,61 @@ +package com.yape.challenge.transaction.presentation.controller; + +import com.yape.challenge.transaction.application.bus.CommandBus; +import com.yape.challenge.transaction.application.bus.QueryBus; +import com.yape.challenge.transaction.application.command.CreateTransactionCommand; +import com.yape.challenge.transaction.application.dto.request.CreateTransactionRequest; +import com.yape.challenge.transaction.application.dto.response.TransactionResponse; +import com.yape.challenge.transaction.application.query.GetTransactionQuery; +import jakarta.validation.Valid; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.UUID; + +@RestController +@RequestMapping("/api/v1/transactions") +@RequiredArgsConstructor +@Slf4j +public class TransactionController { + + private final CommandBus commandBus; + private final QueryBus queryBus; + + @PostMapping + public ResponseEntity createTransaction( + @Valid @RequestBody CreateTransactionRequest request) { + log.info("POST /api/v1/transactions - Request: {}", request); + + // Create command from request + CreateTransactionCommand command = CreateTransactionCommand.builder() + .accountExternalIdDebit(request.getAccountExternalIdDebit()) + .accountExternalIdCredit(request.getAccountExternalIdCredit()) + .tranferTypeId(request.getTranferTypeId()) + .value(request.getValue()) + .build(); + + // Dispatch command through command bus + TransactionResponse response = commandBus.dispatch(command); + return ResponseEntity.status(HttpStatus.CREATED).body(response); + } + + @GetMapping("/{externalId}") + public ResponseEntity getTransaction( + @PathVariable UUID externalId) { + log.info("GET /api/v1/transactions/{}", externalId); + + // Create query + GetTransactionQuery query = GetTransactionQuery.builder() + .externalId(externalId) + .build(); + + // Dispatch query through query bus + TransactionResponse response = queryBus.dispatch(query); + return ResponseEntity.ok(response); + } +} + + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/exception/ErrorResponse.java b/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/exception/ErrorResponse.java new file mode 100644 index 0000000000..b1d5e6c01e --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/exception/ErrorResponse.java @@ -0,0 +1,23 @@ +package com.yape.challenge.transaction.presentation.exception; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; +import java.util.Map; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ErrorResponse { + + private LocalDateTime timestamp; + private int status; + private String error; + private String message; + private Map validationErrors; +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/exception/GlobalExceptionHandler.java b/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/exception/GlobalExceptionHandler.java new file mode 100644 index 0000000000..6cd27206a6 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/exception/GlobalExceptionHandler.java @@ -0,0 +1,75 @@ +package com.yape.challenge.transaction.presentation.exception; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.validation.FieldError; +import org.springframework.web.bind.MethodArgumentNotValidException; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.RestControllerAdvice; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; + +@RestControllerAdvice +@Slf4j +public class GlobalExceptionHandler { + + @ExceptionHandler(ResourceNotFoundException.class) + public ResponseEntity handleResourceNotFoundException(ResourceNotFoundException ex) { + log.error("ResourceNotFoundException: {}", ex.getMessage()); + ErrorResponse error = ErrorResponse.builder() + .timestamp(LocalDateTime.now()) + .status(HttpStatus.NOT_FOUND.value()) + .error(HttpStatus.NOT_FOUND.getReasonPhrase()) + .message(ex.getMessage()) + .build(); + return ResponseEntity.status(HttpStatus.NOT_FOUND).body(error); + } + + @ExceptionHandler(IllegalArgumentException.class) + public ResponseEntity handleIllegalArgumentException(IllegalArgumentException ex) { + log.error("IllegalArgumentException: {}", ex.getMessage()); + ErrorResponse error = ErrorResponse.builder() + .timestamp(LocalDateTime.now()) + .status(HttpStatus.BAD_REQUEST.value()) + .error(HttpStatus.BAD_REQUEST.getReasonPhrase()) + .message(ex.getMessage()) + .build(); + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(error); + } + + @ExceptionHandler(MethodArgumentNotValidException.class) + public ResponseEntity handleValidationException(MethodArgumentNotValidException ex) { + log.error("Validation error: {}", ex.getMessage()); + Map errors = new HashMap<>(); + ex.getBindingResult().getAllErrors().forEach(error -> { + String fieldName = ((FieldError) error).getField(); + String errorMessage = error.getDefaultMessage(); + errors.put(fieldName, errorMessage); + }); + + ErrorResponse error = ErrorResponse.builder() + .timestamp(LocalDateTime.now()) + .status(HttpStatus.BAD_REQUEST.value()) + .error(HttpStatus.BAD_REQUEST.getReasonPhrase()) + .message("Validation failed") + .validationErrors(errors) + .build(); + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(error); + } + + @ExceptionHandler(Exception.class) + public ResponseEntity handleGenericException(Exception ex) { + log.error("Unexpected error: ", ex); + ErrorResponse error = ErrorResponse.builder() + .timestamp(LocalDateTime.now()) + .status(HttpStatus.INTERNAL_SERVER_ERROR.value()) + .error(HttpStatus.INTERNAL_SERVER_ERROR.getReasonPhrase()) + .message("An unexpected error occurred") + .build(); + return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(error); + } +} + diff --git a/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/exception/ResourceNotFoundException.java b/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/exception/ResourceNotFoundException.java new file mode 100644 index 0000000000..721be85e97 --- /dev/null +++ b/transaction-service/src/main/java/com/yape/challenge/transaction/presentation/exception/ResourceNotFoundException.java @@ -0,0 +1,16 @@ +package com.yape.challenge.transaction.presentation.exception; + +/** + * Exception thrown when a requested resource is not found + */ +public class ResourceNotFoundException extends RuntimeException { + + public ResourceNotFoundException(String message) { + super(message); + } + + public ResourceNotFoundException(String message, Throwable cause) { + super(message, cause); + } +} + diff --git a/transaction-service/src/main/resources/application-docker.yml b/transaction-service/src/main/resources/application-docker.yml new file mode 100644 index 0000000000..353070184a --- /dev/null +++ b/transaction-service/src/main/resources/application-docker.yml @@ -0,0 +1,63 @@ +spring: + application: + name: transaction-service + + datasource: + url: ${SPRING_DATASOURCE_URL:jdbc:postgresql://localhost:5432/yape_transactions} + username: ${SPRING_DATASOURCE_USERNAME:yapeuser} + password: ${SPRING_DATASOURCE_PASSWORD:YapePass2026} + driver-class-name: org.postgresql.Driver + hikari: + maximum-pool-size: 20 + minimum-idle: 5 + connection-timeout: 3000 # Fallar rápido si la BD no está disponible + validation-timeout: 2000 + idle-timeout: 600000 + max-lifetime: 1800000 + connection-test-query: SELECT 1 + + sql: + init: + mode: always + data-locations: classpath:db/data.sql + continue-on-error: false + + jpa: + defer-datasource-initialization: true + hibernate: + ddl-auto: update + show-sql: false + properties: + hibernate: + dialect: org.hibernate.dialect.PostgreSQLDialect + format_sql: true + use_sql_comments: true + + kafka: + bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:9092} + consumer: + group-id: transaction-service-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + +server: + port: 8080 + +management: + endpoints: + web: + exposure: + include: health,info,metrics + endpoint: + health: + show-details: always + +logging: + level: + com.yape.challenge: INFO + org.springframework.kafka: WARN + diff --git a/transaction-service/src/main/resources/application.yml b/transaction-service/src/main/resources/application.yml new file mode 100644 index 0000000000..0e5fa365c8 --- /dev/null +++ b/transaction-service/src/main/resources/application.yml @@ -0,0 +1,173 @@ +# Application name +spring: + application: + name: transaction-service + + # Database configuration + datasource: + url: ${SPRING_DATASOURCE_URL:jdbc:postgresql://localhost:5432/yape_transactions} + username: ${SPRING_DATASOURCE_USERNAME:yapeuser} + password: ${SPRING_DATASOURCE_PASSWORD:YapePass2026} + driver-class-name: org.postgresql.Driver + + # HikariCP configuration (optimized for high volume) + hikari: + maximum-pool-size: 50 + minimum-idle: 10 + connection-timeout: 3000 # Fallar rápido si la BD no está disponible + validation-timeout: 2000 + idle-timeout: 300000 + max-lifetime: 1800000 + leak-detection-threshold: 60000 + auto-commit: false + connection-test-query: SELECT 1 + data-source-properties: + cachePrepStmts: true + prepStmtCacheSize: 250 + prepStmtCacheSqlLimit: 2048 + useServerPrepStmts: true + + # Redis configuration (for distributed cache) + data: + redis: + host: ${SPRING_REDIS_HOST:localhost} + port: ${SPRING_REDIS_PORT:6379} + timeout: 2000ms + lettuce: + pool: + max-active: 20 + max-idle: 10 + min-idle: 5 + max-wait: 2000ms + shutdown-timeout: 200ms + + # Cache configuration + cache: + type: redis + redis: + time-to-live: 300000 # 5 minutes + cache-null-values: false + use-key-prefix: true + key-prefix: "yape:txn:" + + # SQL initialization - Ejecuta después de que Hibernate cree las tablas + sql: + init: + mode: always + data-locations: classpath:db/data.sql + continue-on-error: false + + # JPA/Hibernate configuration + jpa: + defer-datasource-initialization: true # Espera a que Hibernate cree las tablas primero + hibernate: + ddl-auto: update + show-sql: false + properties: + hibernate: + dialect: org.hibernate.dialect.PostgreSQLDialect + format_sql: true + use_sql_comments: true + + # Kafka configuration + kafka: + bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:9092} + consumer: + group-id: transaction-service-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + properties: + max.block.ms: 5000 # Timeout máximo para bloqueos en send/partitionsFor + request.timeout.ms: 5000 # Timeout de solicitud + delivery.timeout.ms: 10000 # Timeout total de entrega + linger.ms: 0 # No esperar para enviar + retries: 2 # Reintentos a nivel de Kafka + +# Server configuration +server: + port: ${SERVER_PORT:8080} + +# Management/Actuator configuration +management: + endpoints: + web: + exposure: + include: health,info,metrics,circuitbreakers,circuitbreakerevents + endpoint: + health: + show-details: always + health: + circuitbreakers: + enabled: true + +# Resilience4j Circuit Breaker configuration +resilience4j: + circuitbreaker: + configs: + default: + registerHealthIndicator: true + slidingWindowSize: 10 + minimumNumberOfCalls: 5 + permittedNumberOfCallsInHalfOpenState: 3 + automaticTransitionFromOpenToHalfOpenEnabled: true + waitDurationInOpenState: 10s + failureRateThreshold: 50 + eventConsumerBufferSize: 10 + recordExceptions: + - org.springframework.kafka.KafkaException + - java.util.concurrent.TimeoutException + - org.springframework.dao.DataAccessException + - org.springframework.dao.DataAccessResourceFailureException + - org.hibernate.exception.JDBCConnectionException + - java.sql.SQLTransientConnectionException + - java.sql.SQLException + - java.util.concurrent.ExecutionException + - org.apache.kafka.common.errors.TimeoutException + - org.apache.kafka.common.errors.DisconnectException + - java.net.ConnectException + - java.net.UnknownHostException + - com.yape.challenge.transaction.infrastructure.kafka.exception.KafkaProducerException + instances: + kafkaProducer: + baseConfig: default + slidingWindowSize: 10 + minimumNumberOfCalls: 3 + waitDurationInOpenState: 30s + failureRateThreshold: 50 + slowCallRateThreshold: 80 + slowCallDurationThreshold: 5s + database: + baseConfig: default + slidingWindowSize: 10 + minimumNumberOfCalls: 3 + waitDurationInOpenState: 15s + failureRateThreshold: 60 + slowCallRateThreshold: 70 + slowCallDurationThreshold: 3s + + retry: + configs: + default: + maxAttempts: 3 + waitDuration: 1s + enableExponentialBackoff: true + exponentialBackoffMultiplier: 2 + instances: + kafkaProducer: + maxAttempts: 2 + waitDuration: 1s + retryExceptions: + - org.springframework.kafka.KafkaException + - java.util.concurrent.TimeoutException + - java.util.concurrent.ExecutionException + +# Logging configuration +logging: + level: + com.yape.challenge: ${LOGGING_LEVEL:INFO} + org.springframework.kafka: WARN + diff --git a/transaction-service/src/main/resources/db/data.sql b/transaction-service/src/main/resources/db/data.sql new file mode 100644 index 0000000000..1b7689b1c5 --- /dev/null +++ b/transaction-service/src/main/resources/db/data.sql @@ -0,0 +1,12 @@ +-- PostgreSQL data initialization +-- This script inserts initial data AFTER the schema is created + +-- Insert default transaction types (only if not exists) +INSERT INTO transaction_types (name, description) +VALUES + ('TRANSFER', 'Transfer between accounts'), + ('PAYMENT', 'Payment transaction'), + ('WITHDRAWAL', 'Cash withdrawal'), + ('DEPOSIT', 'Cash deposit') +ON CONFLICT (name) DO NOTHING; + diff --git a/transaction-service/src/test/java/com/yape/challenge/transaction/TransactionServiceApplicationTest.java b/transaction-service/src/test/java/com/yape/challenge/transaction/TransactionServiceApplicationTest.java new file mode 100644 index 0000000000..e794060440 --- /dev/null +++ b/transaction-service/src/test/java/com/yape/challenge/transaction/TransactionServiceApplicationTest.java @@ -0,0 +1,92 @@ +package com.yape.challenge.transaction; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.ApplicationContext; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; + +import static org.junit.jupiter.api.Assertions.*; + +@SpringBootTest( + properties = { + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.consumer.group-id=test-app-group", + "spring.datasource.url=jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;CASE_INSENSITIVE_IDENTIFIERS=TRUE;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE;DEFAULT_NULL_ORDERING=HIGH", + "spring.datasource.driver-class-name=org.h2.Driver", + "spring.sql.init.mode=never", + "spring.jpa.hibernate.ddl-auto=create-drop", + "spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.H2Dialect", + "spring.cache.type=none", + "spring.data.redis.repositories.enabled=false" + } +) +@EmbeddedKafka( + partitions = 1, + topics = {"transaction-created", "transaction-status-updated"} +) +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@DisplayName("Transaction Service Application Tests") +class TransactionServiceApplicationTest { + + @Autowired + private ApplicationContext applicationContext; + + @Test + @DisplayName("Should load application context") + void shouldLoadApplicationContext() { + assertNotNull(applicationContext); + } + + @Test + @DisplayName("Should have transaction repository bean") + void shouldHaveTransactionRepositoryBean() { + assertTrue(applicationContext.containsBean("transactionRepository")); + } + + @Test + @DisplayName("Should have event store bean") + void shouldHaveEventStoreBean() { + assertTrue(applicationContext.containsBean("eventStore")); + } + + @Test + @DisplayName("Should have transaction aggregate service bean") + void shouldHaveTransactionAggregateServiceBean() { + assertTrue(applicationContext.containsBean("transactionAggregateService")); + } + + @Test + @DisplayName("Should have kafka template bean") + void shouldHaveKafkaTemplateBean() { + assertNotNull(applicationContext.getBean(KafkaTemplate.class)); + } + + @Test + @DisplayName("Should have command bus bean") + void shouldHaveCommandBusBean() { + assertTrue(applicationContext.containsBean("commandBus")); + } + + @Test + @DisplayName("Should have query bus bean") + void shouldHaveQueryBusBean() { + assertTrue(applicationContext.containsBean("queryBus")); + } + + @Test + @DisplayName("Should have transaction status consumer bean") + void shouldHaveTransactionStatusConsumerBean() { + assertTrue(applicationContext.containsBean("transactionStatusConsumer")); + } + + @Test + @DisplayName("Should have kafka producer service bean") + void shouldHaveKafkaProducerServiceBean() { + assertTrue(applicationContext.containsBean("kafkaProducerService")); + } +} + diff --git a/transaction-service/src/test/java/com/yape/challenge/transaction/domain/entity/TransactionTest.java b/transaction-service/src/test/java/com/yape/challenge/transaction/domain/entity/TransactionTest.java new file mode 100644 index 0000000000..5f16f4463f --- /dev/null +++ b/transaction-service/src/test/java/com/yape/challenge/transaction/domain/entity/TransactionTest.java @@ -0,0 +1,113 @@ +package com.yape.challenge.transaction.domain.entity; + +import com.yape.challenge.common.dto.TransactionStatus; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; + +@DisplayName("Transaction Entity Tests") +class TransactionTest { + + private Transaction transaction; + + @BeforeEach + void setUp() { + transaction = new Transaction(); + } + + @Test + @DisplayName("Should generate UUID on pre-persist when external id is null") + void shouldGenerateUuidOnPrePersist() { + // When + transaction.onCreate(); + + // Then + assertNotNull(transaction.getExternalId()); + } + + @Test + @DisplayName("Should not override existing external id on pre-persist") + void shouldNotOverrideExistingExternalId() { + // Given + UUID existingId = UUID.randomUUID(); + transaction.setExternalId(existingId); + + // When + transaction.onCreate(); + + // Then + assertEquals(existingId, transaction.getExternalId()); + } + + @Test + @DisplayName("Should set default status to PENDING on pre-persist") + void shouldSetDefaultStatusToPending() { + // When + transaction.onCreate(); + + // Then + assertEquals(TransactionStatus.PENDING, transaction.getStatus()); + } + + @Test + @DisplayName("Should not override existing status on pre-persist") + void shouldNotOverrideExistingStatus() { + // Given + transaction.setStatus(TransactionStatus.APPROVED); + + // When + transaction.onCreate(); + + // Then + assertEquals(TransactionStatus.APPROVED, transaction.getStatus()); + } + + @Test + @DisplayName("Should create transaction with builder") + void shouldCreateTransactionWithBuilder() { + // Given + UUID externalId = UUID.randomUUID(); + UUID debitId = UUID.randomUUID(); + UUID creditId = UUID.randomUUID(); + BigDecimal value = new BigDecimal("100.00"); + + // When + Transaction transaction = Transaction.builder() + .externalId(externalId) + .accountExternalIdDebit(debitId) + .accountExternalIdCredit(creditId) + .transferTypeId(1) + .value(value) + .status(TransactionStatus.PENDING) + .build(); + + // Then + assertNotNull(transaction); + assertEquals(externalId, transaction.getExternalId()); + assertEquals(debitId, transaction.getAccountExternalIdDebit()); + assertEquals(creditId, transaction.getAccountExternalIdCredit()); + assertEquals(1, transaction.getTransferTypeId()); + assertEquals(value, transaction.getValue()); + assertEquals(TransactionStatus.PENDING, transaction.getStatus()); + } + + @Test + @DisplayName("Should support all transaction statuses") + void shouldSupportAllTransactionStatuses() { + // Test each status + transaction.setStatus(TransactionStatus.PENDING); + assertEquals(TransactionStatus.PENDING, transaction.getStatus()); + + transaction.setStatus(TransactionStatus.APPROVED); + assertEquals(TransactionStatus.APPROVED, transaction.getStatus()); + + transaction.setStatus(TransactionStatus.REJECTED); + assertEquals(TransactionStatus.REJECTED, transaction.getStatus()); + } +} + diff --git a/transaction-service/src/test/java/com/yape/challenge/transaction/domain/service/TransactionAggregateServiceTest.java b/transaction-service/src/test/java/com/yape/challenge/transaction/domain/service/TransactionAggregateServiceTest.java new file mode 100644 index 0000000000..f76bd1741f --- /dev/null +++ b/transaction-service/src/test/java/com/yape/challenge/transaction/domain/service/TransactionAggregateServiceTest.java @@ -0,0 +1,252 @@ +package com.yape.challenge.transaction.domain.service; + +import com.yape.challenge.common.dto.TransactionStatus; +import com.yape.challenge.transaction.domain.entity.Transaction; +import com.yape.challenge.transaction.domain.event.TransactionCreatedDomainEvent; +import com.yape.challenge.transaction.domain.event.TransactionDomainEvent; +import com.yape.challenge.transaction.domain.event.TransactionStatusChangedDomainEvent; +import com.yape.challenge.transaction.infrastructure.eventstore.EventStore; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +@DisplayName("Transaction Aggregate Service Tests") +class TransactionAggregateServiceTest { + + @Mock + private EventStore eventStore; + + @InjectMocks + private TransactionAggregateService transactionAggregateService; + + private UUID transactionId; + private UUID debitAccountId; + private UUID creditAccountId; + + @BeforeEach + void setUp() { + transactionId = UUID.randomUUID(); + debitAccountId = UUID.randomUUID(); + creditAccountId = UUID.randomUUID(); + } + + @Test + @DisplayName("Should rebuild transaction from creation event") + void shouldRebuildTransactionFromCreationEvent() { + // Given + TransactionCreatedDomainEvent createdEvent = TransactionCreatedDomainEvent.builder() + .aggregateId(transactionId) + .accountExternalIdDebit(debitAccountId) + .accountExternalIdCredit(creditAccountId) + .transferTypeId(1) + .value(new BigDecimal("500.00")) + .occurredAt(LocalDateTime.now()) + .build(); + + when(eventStore.getEvents(transactionId)) + .thenReturn(Collections.singletonList(createdEvent)); + + // When + Transaction transaction = transactionAggregateService.rebuildFromEvents(transactionId); + + // Then + assertNotNull(transaction); + assertEquals(transactionId, transaction.getExternalId()); + assertEquals(debitAccountId, transaction.getAccountExternalIdDebit()); + assertEquals(creditAccountId, transaction.getAccountExternalIdCredit()); + assertEquals(1, transaction.getTransferTypeId()); + assertEquals(new BigDecimal("500.00"), transaction.getValue()); + assertEquals(TransactionStatus.PENDING, transaction.getStatus()); + verify(eventStore).getEvents(transactionId); + } + + @Test + @DisplayName("Should rebuild transaction from multiple events") + void shouldRebuildTransactionFromMultipleEvents() { + // Given + LocalDateTime createdAt = LocalDateTime.now().minusMinutes(5); + LocalDateTime updatedAt = LocalDateTime.now(); + + TransactionCreatedDomainEvent createdEvent = TransactionCreatedDomainEvent.builder() + .aggregateId(transactionId) + .accountExternalIdDebit(debitAccountId) + .accountExternalIdCredit(creditAccountId) + .transferTypeId(1) + .value(new BigDecimal("500.00")) + .occurredAt(createdAt) + .build(); + + TransactionStatusChangedDomainEvent statusChangedEvent = TransactionStatusChangedDomainEvent.builder() + .aggregateId(transactionId) + .oldStatus(TransactionStatus.PENDING) + .newStatus(TransactionStatus.APPROVED) + .occurredAt(updatedAt) + .build(); + + List events = Arrays.asList(createdEvent, statusChangedEvent); + when(eventStore.getEvents(transactionId)).thenReturn(events); + + // When + Transaction transaction = transactionAggregateService.rebuildFromEvents(transactionId); + + // Then + assertNotNull(transaction); + assertEquals(transactionId, transaction.getExternalId()); + assertEquals(TransactionStatus.APPROVED, transaction.getStatus()); + assertEquals(updatedAt, transaction.getUpdatedAt()); + verify(eventStore).getEvents(transactionId); + } + + @Test + @DisplayName("Should throw exception when no events found") + void shouldThrowExceptionWhenNoEventsFound() { + // Given + when(eventStore.getEvents(transactionId)).thenReturn(Collections.emptyList()); + + // When & Then + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> + transactionAggregateService.rebuildFromEvents(transactionId) + ); + + assertTrue(exception.getMessage().contains("No events found")); + verify(eventStore).getEvents(transactionId); + } + + @Test + @DisplayName("Should apply creation event correctly") + void shouldApplyCreationEventCorrectly() { + // Given + Transaction transaction = new Transaction(); + transaction.setExternalId(transactionId); + + TransactionCreatedDomainEvent event = TransactionCreatedDomainEvent.builder() + .aggregateId(transactionId) + .accountExternalIdDebit(debitAccountId) + .accountExternalIdCredit(creditAccountId) + .transferTypeId(1) + .value(new BigDecimal("500.00")) + .occurredAt(LocalDateTime.now()) + .build(); + + // When + transactionAggregateService.applyEvent(transaction, event); + + // Then + assertEquals(debitAccountId, transaction.getAccountExternalIdDebit()); + assertEquals(creditAccountId, transaction.getAccountExternalIdCredit()); + assertEquals(TransactionStatus.PENDING, transaction.getStatus()); + } + + @Test + @DisplayName("Should apply status changed event correctly") + void shouldApplyStatusChangedEventCorrectly() { + // Given + Transaction transaction = new Transaction(); + transaction.setExternalId(transactionId); + transaction.setStatus(TransactionStatus.PENDING); + + LocalDateTime updatedAt = LocalDateTime.now(); + TransactionStatusChangedDomainEvent event = TransactionStatusChangedDomainEvent.builder() + .aggregateId(transactionId) + .oldStatus(TransactionStatus.PENDING) + .newStatus(TransactionStatus.APPROVED) + .occurredAt(updatedAt) + .build(); + + // When + transactionAggregateService.applyEvent(transaction, event); + + // Then + assertEquals(TransactionStatus.APPROVED, transaction.getStatus()); + assertEquals(updatedAt, transaction.getUpdatedAt()); + } + + @Test + @DisplayName("Should check if transaction exists") + void shouldCheckIfTransactionExists() { + // Given + when(eventStore.aggregateExists(transactionId)).thenReturn(true); + + // When + boolean exists = transactionAggregateService.transactionExists(transactionId); + + // Then + assertTrue(exists); + verify(eventStore).aggregateExists(transactionId); + } + + @Test + @DisplayName("Should return false when transaction does not exist") + void shouldReturnFalseWhenTransactionDoesNotExist() { + // Given + when(eventStore.aggregateExists(transactionId)).thenReturn(false); + + // When + boolean exists = transactionAggregateService.transactionExists(transactionId); + + // Then + assertFalse(exists); + verify(eventStore).aggregateExists(transactionId); + } + + @Test + @DisplayName("Should get event count") + void shouldGetEventCount() { + // Given + when(eventStore.getEventCount(transactionId)).thenReturn(3L); + + // When + long count = transactionAggregateService.getEventCount(transactionId); + + // Then + assertEquals(3L, count); + verify(eventStore).getEventCount(transactionId); + } + + @Test + @DisplayName("Should rebuild transaction with status changes from pending to rejected") + void shouldRebuildTransactionWithStatusChangesToRejected() { + // Given + TransactionCreatedDomainEvent createdEvent = TransactionCreatedDomainEvent.builder() + .aggregateId(transactionId) + .accountExternalIdDebit(debitAccountId) + .accountExternalIdCredit(creditAccountId) + .transferTypeId(1) + .value(new BigDecimal("2000.00")) + .occurredAt(LocalDateTime.now().minusMinutes(5)) + .build(); + + TransactionStatusChangedDomainEvent statusChangedEvent = TransactionStatusChangedDomainEvent.builder() + .aggregateId(transactionId) + .oldStatus(TransactionStatus.PENDING) + .newStatus(TransactionStatus.REJECTED) + .occurredAt(LocalDateTime.now()) + .build(); + + when(eventStore.getEvents(transactionId)) + .thenReturn(Arrays.asList(createdEvent, statusChangedEvent)); + + // When + Transaction transaction = transactionAggregateService.rebuildFromEvents(transactionId); + + // Then + assertEquals(TransactionStatus.REJECTED, transaction.getStatus()); + verify(eventStore).getEvents(transactionId); + } +} + diff --git a/transaction-service/src/test/java/com/yape/challenge/transaction/infrastructure/eventstore/EventStoreTest.java b/transaction-service/src/test/java/com/yape/challenge/transaction/infrastructure/eventstore/EventStoreTest.java new file mode 100644 index 0000000000..d14c888403 --- /dev/null +++ b/transaction-service/src/test/java/com/yape/challenge/transaction/infrastructure/eventstore/EventStoreTest.java @@ -0,0 +1,203 @@ +package com.yape.challenge.transaction.infrastructure.eventstore; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yape.challenge.transaction.domain.entity.DomainEvent; +import com.yape.challenge.transaction.domain.event.TransactionCreatedDomainEvent; +import com.yape.challenge.transaction.domain.event.TransactionStatusChangedDomainEvent; +import com.yape.challenge.transaction.infrastructure.repository.DomainEventRepository; +import com.yape.challenge.common.dto.TransactionStatus; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.Optional; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +@DisplayName("Event Store Tests") +class EventStoreTest { + + @Mock + private DomainEventRepository domainEventRepository; + + @Mock + private ObjectMapper objectMapper; + + @InjectMocks + private EventStore eventStore; + + private UUID aggregateId; + private TransactionCreatedDomainEvent createdEvent; + private TransactionStatusChangedDomainEvent statusChangedEvent; + + @BeforeEach + void setUp() { + aggregateId = UUID.randomUUID(); + + createdEvent = TransactionCreatedDomainEvent.builder() + .aggregateId(aggregateId) + .accountExternalIdDebit(UUID.randomUUID()) + .accountExternalIdCredit(UUID.randomUUID()) + .transferTypeId(1) + .value(new BigDecimal("500.00")) + .occurredAt(LocalDateTime.now()) + .build(); + + statusChangedEvent = TransactionStatusChangedDomainEvent.builder() + .aggregateId(aggregateId) + .oldStatus(TransactionStatus.PENDING) + .newStatus(TransactionStatus.APPROVED) + .occurredAt(LocalDateTime.now()) + .build(); + } + + @Test + @DisplayName("Should save transaction created event") + void shouldSaveTransactionCreatedEvent() throws Exception { + // Given + when(domainEventRepository.findLastVersionByAggregateId(aggregateId)) + .thenReturn(Optional.empty()); + when(objectMapper.writeValueAsString(any())).thenReturn("{}"); + when(domainEventRepository.save(any(DomainEvent.class))) + .thenAnswer(invocation -> invocation.getArgument(0)); + + // When + eventStore.saveEvent(createdEvent); + + // Then + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(DomainEvent.class); + verify(domainEventRepository).save(eventCaptor.capture()); + + DomainEvent savedEvent = eventCaptor.getValue(); + assertEquals(aggregateId, savedEvent.getAggregateId()); + assertEquals("Transaction", savedEvent.getAggregateType()); + assertEquals(1, savedEvent.getVersion()); + } + + @Test + @DisplayName("Should increment version when saving subsequent events") + void shouldIncrementVersionWhenSavingSubsequentEvents() throws Exception { + // Given + when(domainEventRepository.findLastVersionByAggregateId(aggregateId)) + .thenReturn(Optional.of(2)); + when(objectMapper.writeValueAsString(any())).thenReturn("{}"); + when(domainEventRepository.save(any(DomainEvent.class))) + .thenAnswer(invocation -> invocation.getArgument(0)); + + // When + eventStore.saveEvent(statusChangedEvent); + + // Then + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(DomainEvent.class); + verify(domainEventRepository).save(eventCaptor.capture()); + + DomainEvent savedEvent = eventCaptor.getValue(); + assertEquals(3, savedEvent.getVersion()); + } + + @Test + @DisplayName("Should throw exception when serialization fails") + void shouldThrowExceptionWhenSerializationFails() throws Exception { + // Given + when(domainEventRepository.findLastVersionByAggregateId(aggregateId)) + .thenReturn(Optional.empty()); + when(objectMapper.writeValueAsString(any())) + .thenThrow(new RuntimeException("Serialization error")); + + // When & Then + assertThrows(RuntimeException.class, () -> + eventStore.saveEvent(createdEvent) + ); + + verify(domainEventRepository, never()).save(any()); + } + + @Test + @DisplayName("Should save event with correct event type") + void shouldSaveEventWithCorrectEventType() throws Exception { + // Given + when(domainEventRepository.findLastVersionByAggregateId(aggregateId)) + .thenReturn(Optional.empty()); + when(objectMapper.writeValueAsString(any())).thenReturn("{}"); + when(domainEventRepository.save(any(DomainEvent.class))) + .thenAnswer(invocation -> invocation.getArgument(0)); + + // When + eventStore.saveEvent(createdEvent); + + // Then + ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(DomainEvent.class); + verify(domainEventRepository).save(eventCaptor.capture()); + + DomainEvent savedEvent = eventCaptor.getValue(); + assertEquals("TransactionCreatedDomainEvent", savedEvent.getEventType()); + } + + @Test + @DisplayName("Should check if aggregate exists") + void shouldCheckIfAggregateExists() { + // Given + when(domainEventRepository.existsByAggregateId(aggregateId)).thenReturn(true); + + // When + boolean exists = eventStore.aggregateExists(aggregateId); + + // Then + assertTrue(exists); + verify(domainEventRepository).existsByAggregateId(aggregateId); + } + + @Test + @DisplayName("Should return false when aggregate does not exist") + void shouldReturnFalseWhenAggregateDoesNotExist() { + // Given + when(domainEventRepository.existsByAggregateId(aggregateId)).thenReturn(false); + + // When + boolean exists = eventStore.aggregateExists(aggregateId); + + // Then + assertFalse(exists); + verify(domainEventRepository).existsByAggregateId(aggregateId); + } + + @Test + @DisplayName("Should get event count for aggregate") + void shouldGetEventCountForAggregate() { + // Given + when(domainEventRepository.countByAggregateId(aggregateId)).thenReturn(5L); + + // When + long count = eventStore.getEventCount(aggregateId); + + // Then + assertEquals(5L, count); + verify(domainEventRepository).countByAggregateId(aggregateId); + } + + @Test + @DisplayName("Should return zero when no events exist for aggregate") + void shouldReturnZeroWhenNoEventsExistForAggregate() { + // Given + when(domainEventRepository.countByAggregateId(aggregateId)).thenReturn(0L); + + // When + long count = eventStore.getEventCount(aggregateId); + + // Then + assertEquals(0L, count); + verify(domainEventRepository).countByAggregateId(aggregateId); + } +} + diff --git a/transaction-service/src/test/java/com/yape/challenge/transaction/infrastructure/kafka/consumer/TransactionStatusConsumerTest.java b/transaction-service/src/test/java/com/yape/challenge/transaction/infrastructure/kafka/consumer/TransactionStatusConsumerTest.java new file mode 100644 index 0000000000..3b61358d24 --- /dev/null +++ b/transaction-service/src/test/java/com/yape/challenge/transaction/infrastructure/kafka/consumer/TransactionStatusConsumerTest.java @@ -0,0 +1,144 @@ +package com.yape.challenge.transaction.infrastructure.kafka.consumer; + +import com.yape.challenge.common.dto.TransactionStatus; +import com.yape.challenge.common.dto.TransactionStatusEvent; +import com.yape.challenge.transaction.application.bus.CommandBus; +import com.yape.challenge.transaction.application.command.UpdateTransactionStatusCommand; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +@DisplayName("Transaction Status Consumer Tests") +class TransactionStatusConsumerTest { + + @Mock + private CommandBus commandBus; + + @InjectMocks + private TransactionStatusConsumer transactionStatusConsumer; + + private UUID transactionId; + private TransactionStatusEvent statusEvent; + + @BeforeEach + void setUp() { + transactionId = UUID.randomUUID(); + statusEvent = TransactionStatusEvent.builder() + .transactionExternalId(transactionId) + .status(TransactionStatus.APPROVED) + .build(); + } + + @Test + @DisplayName("Should consume and process transaction status event successfully") + void shouldConsumeAndProcessTransactionStatusEventSuccessfully() { + // Given + when(commandBus.dispatch(any(UpdateTransactionStatusCommand.class))).thenReturn(null); + + // When + transactionStatusConsumer.consumeTransactionStatus(statusEvent); + + // Then + verify(commandBus, times(1)).dispatch(any(UpdateTransactionStatusCommand.class)); + verify(commandBus).dispatch(argThat(command -> + command instanceof UpdateTransactionStatusCommand && + ((UpdateTransactionStatusCommand) command).getExternalId().equals(transactionId) && + ((UpdateTransactionStatusCommand) command).getStatus().equals(TransactionStatus.APPROVED) + )); + } + + @Test + @DisplayName("Should consume rejected transaction status") + void shouldConsumeRejectedTransactionStatus() { + // Given + TransactionStatusEvent rejectedEvent = TransactionStatusEvent.builder() + .transactionExternalId(transactionId) + .status(TransactionStatus.REJECTED) + .build(); + + when(commandBus.dispatch(any(UpdateTransactionStatusCommand.class))).thenReturn(null); + + // When + transactionStatusConsumer.consumeTransactionStatus(rejectedEvent); + + // Then + verify(commandBus, times(1)).dispatch(any(UpdateTransactionStatusCommand.class)); + verify(commandBus).dispatch(argThat(command -> + command instanceof UpdateTransactionStatusCommand && + ((UpdateTransactionStatusCommand) command).getStatus().equals(TransactionStatus.REJECTED) + )); + } + + @Test + @DisplayName("Should propagate exception when command bus fails") + void shouldPropagateExceptionWhenCommandBusFails() { + // Given + RuntimeException exception = new RuntimeException("Command bus failed"); + doThrow(exception).when(commandBus).dispatch(any(UpdateTransactionStatusCommand.class)); + + // When & Then + RuntimeException thrown = assertThrows(RuntimeException.class, () -> + transactionStatusConsumer.consumeTransactionStatus(statusEvent) + ); + + assertEquals("Command bus failed", thrown.getMessage()); + verify(commandBus, times(1)).dispatch(any(UpdateTransactionStatusCommand.class)); + } + + @Test + @DisplayName("Should process multiple consecutive events") + void shouldProcessMultipleConsecutiveEvents() { + // Given + TransactionStatusEvent event1 = TransactionStatusEvent.builder() + .transactionExternalId(UUID.randomUUID()) + .status(TransactionStatus.APPROVED) + .build(); + + TransactionStatusEvent event2 = TransactionStatusEvent.builder() + .transactionExternalId(UUID.randomUUID()) + .status(TransactionStatus.REJECTED) + .build(); + + when(commandBus.dispatch(any(UpdateTransactionStatusCommand.class))).thenReturn(null); + + // When + transactionStatusConsumer.consumeTransactionStatus(event1); + transactionStatusConsumer.consumeTransactionStatus(event2); + + // Then + verify(commandBus, times(2)).dispatch(any(UpdateTransactionStatusCommand.class)); + } + + @Test + @DisplayName("Should handle event with correct transaction id") + void shouldHandleEventWithCorrectTransactionId() { + // Given + UUID specificTransactionId = UUID.randomUUID(); + TransactionStatusEvent specificEvent = TransactionStatusEvent.builder() + .transactionExternalId(specificTransactionId) + .status(TransactionStatus.APPROVED) + .build(); + + when(commandBus.dispatch(any(UpdateTransactionStatusCommand.class))).thenReturn(null); + + // When + transactionStatusConsumer.consumeTransactionStatus(specificEvent); + + // Then + verify(commandBus).dispatch(argThat(command -> + ((UpdateTransactionStatusCommand) command).getExternalId().equals(specificTransactionId) + )); + } +} + diff --git a/transaction-service/src/test/java/com/yape/challenge/transaction/infrastructure/kafka/producer/KafkaProducerServiceTest.java b/transaction-service/src/test/java/com/yape/challenge/transaction/infrastructure/kafka/producer/KafkaProducerServiceTest.java new file mode 100644 index 0000000000..689ea3c2f5 --- /dev/null +++ b/transaction-service/src/test/java/com/yape/challenge/transaction/infrastructure/kafka/producer/KafkaProducerServiceTest.java @@ -0,0 +1,159 @@ +package com.yape.challenge.transaction.infrastructure.kafka.producer; + +import com.yape.challenge.common.dto.TransactionCreatedEvent; +import com.yape.challenge.common.kafka.KafkaTopics; +import com.yape.challenge.transaction.infrastructure.kafka.exception.KafkaProducerException; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; + +import java.math.BigDecimal; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +@DisplayName("Kafka Producer Service Tests") +class KafkaProducerServiceTest { + + @Mock + private KafkaTemplate kafkaTemplate; + + @InjectMocks + private KafkaProducerService kafkaProducerService; + + private UUID transactionId; + private TransactionCreatedEvent event; + private String topic; + private String key; + + @BeforeEach + void setUp() { + transactionId = UUID.randomUUID(); + topic = KafkaTopics.TRANSACTION_CREATED; + key = transactionId.toString(); + + event = TransactionCreatedEvent.builder() + .transactionExternalId(transactionId) + .accountExternalIdDebit(UUID.randomUUID()) + .accountExternalIdCredit(UUID.randomUUID()) + .transferTypeId(1) + .value(new BigDecimal("500.00")) + .build(); + } + + @Test + @DisplayName("Should send event successfully") + void shouldSendEventSuccessfully() throws Exception { + // Given + RecordMetadata metadata = new RecordMetadata(new TopicPartition(topic, 0), 0, 0, 0, 0, 0); + SendResult sendResult = mock(SendResult.class); + when(sendResult.getRecordMetadata()).thenReturn(metadata); + + CompletableFuture > future = + CompletableFuture.completedFuture(sendResult); + + when(kafkaTemplate.send(anyString(), anyString(), any(TransactionCreatedEvent.class))) + .thenReturn(future); + + // When + kafkaProducerService.sendTransactionCreatedEvent(topic, key, event); + + // Then + verify(kafkaTemplate, times(1)).send(topic, key, event); + } + + @Test + @DisplayName("Should throw KafkaProducerException when send fails") + void shouldThrowKafkaProducerExceptionWhenSendFails() { + // Given + CompletableFuture > future = + CompletableFuture.failedFuture(new RuntimeException("Kafka unavailable")); + + when(kafkaTemplate.send(anyString(), anyString(), any(TransactionCreatedEvent.class))) + .thenReturn(future); + + // When & Then + assertThrows(KafkaProducerException.class, () -> + kafkaProducerService.sendTransactionCreatedEvent(topic, key, event) + ); + + verify(kafkaTemplate, times(1)).send(topic, key, event); + } + + @Test + @DisplayName("Should handle interrupted exception") + void shouldHandleInterruptedException() { + // Given + CompletableFuture > future = new CompletableFuture<>(); + future.completeExceptionally(new InterruptedException("Thread interrupted")); + + when(kafkaTemplate.send(anyString(), anyString(), any(TransactionCreatedEvent.class))) + .thenReturn(future); + + // When & Then + assertThrows(KafkaProducerException.class, () -> + kafkaProducerService.sendTransactionCreatedEvent(topic, key, event) + ); + + verify(kafkaTemplate, times(1)).send(topic, key, event); + } + + @Test + @DisplayName("Should use correct topic and key") + void shouldUseCorrectTopicAndKey() throws Exception { + // Given + RecordMetadata metadata = new RecordMetadata(new TopicPartition(topic, 0), 0, 0, 0, 0, 0); + SendResult sendResult = mock(SendResult.class); + when(sendResult.getRecordMetadata()).thenReturn(metadata); + + CompletableFuture > future = + CompletableFuture.completedFuture(sendResult); + + when(kafkaTemplate.send(eq(topic), eq(key), any(TransactionCreatedEvent.class))) + .thenReturn(future); + + // When + kafkaProducerService.sendTransactionCreatedEvent(topic, key, event); + + // Then + verify(kafkaTemplate).send(eq(topic), eq(key), any(TransactionCreatedEvent.class)); + } + + @Test + @DisplayName("Should send event with correct transaction data") + void shouldSendEventWithCorrectTransactionData() throws Exception { + // Given + RecordMetadata metadata = new RecordMetadata(new TopicPartition(topic, 0), 0, 0, 0, 0, 0); + SendResult sendResult = mock(SendResult.class); + when(sendResult.getRecordMetadata()).thenReturn(metadata); + + CompletableFuture > future = + CompletableFuture.completedFuture(sendResult); + + when(kafkaTemplate.send(anyString(), anyString(), any(TransactionCreatedEvent.class))) + .thenReturn(future); + + // When + kafkaProducerService.sendTransactionCreatedEvent(topic, key, event); + + // Then + verify(kafkaTemplate).send(anyString(), anyString(), argThat(evt -> + evt.getTransactionExternalId().equals(transactionId) && + evt.getValue().equals(new BigDecimal("500.00")) && + evt.getTransferTypeId().equals(1) + )); + } +} + diff --git a/transaction-service/src/test/java/com/yape/challenge/transaction/integration/TransactionServiceIntegrationTest.java b/transaction-service/src/test/java/com/yape/challenge/transaction/integration/TransactionServiceIntegrationTest.java new file mode 100644 index 0000000000..389feb611d --- /dev/null +++ b/transaction-service/src/test/java/com/yape/challenge/transaction/integration/TransactionServiceIntegrationTest.java @@ -0,0 +1,278 @@ +package com.yape.challenge.transaction.integration; + +import com.yape.challenge.common.dto.TransactionCreatedEvent; +import com.yape.challenge.common.dto.TransactionStatus; +import com.yape.challenge.common.kafka.KafkaTopics; +import com.yape.challenge.transaction.application.dto.request.CreateTransactionRequest; +import com.yape.challenge.transaction.domain.entity.Transaction; +import com.yape.challenge.transaction.domain.entity.TransactionType; +import com.yape.challenge.transaction.infrastructure.repository.TransactionRepository; +import com.yape.challenge.transaction.infrastructure.repository.TransactionTypeRepository; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.KafkaMessageListenerContainer; +import org.springframework.kafka.listener.MessageListener; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.ContainerTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.MvcResult; + +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*; + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.datasource.url=jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;CASE_INSENSITIVE_IDENTIFIERS=TRUE;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE;DEFAULT_NULL_ORDERING=HIGH", + "spring.datasource.driver-class-name=org.h2.Driver", + "spring.sql.init.mode=never", + "spring.jpa.hibernate.ddl-auto=create-drop", + "spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.H2Dialect", + "spring.jpa.show-sql=true", + "spring.cache.type=none", + "spring.data.redis.repositories.enabled=false" + } +) +@AutoConfigureMockMvc +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +@EmbeddedKafka( + partitions = 3, + topics = {KafkaTopics.TRANSACTION_CREATED, KafkaTopics.TRANSACTION_STATUS_UPDATED} +) +@DisplayName("Transaction Service Integration Tests") +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class TransactionServiceIntegrationTest { + + @Autowired + private MockMvc mockMvc; + + @Autowired + private ObjectMapper objectMapper; + + @Autowired + private TransactionRepository transactionRepository; + + @Autowired + private TransactionTypeRepository transactionTypeRepository; + + private KafkaMessageListenerContainer container; + private BlockingQueue > records; + + @BeforeEach + void setUp() { + // Clean up database before each test + transactionRepository.deleteAll(); + + // Ensure transaction types exist + if (transactionTypeRepository.count() == 0) { + TransactionType type = new TransactionType(); + type.setName("Tipo A"); + transactionTypeRepository.save(type); + } + + // Setup Kafka consumer for integration tests + records = new LinkedBlockingQueue<>(); + + Map consumerProps = new HashMap<>(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("spring.embedded.kafka.brokers")); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-integration-group-" + System.currentTimeMillis()); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); + consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, TransactionCreatedEvent.class.getName()); + + DefaultKafkaConsumerFactory consumerFactory = + new DefaultKafkaConsumerFactory<>(consumerProps); + + ContainerProperties containerProperties = new ContainerProperties(KafkaTopics.TRANSACTION_CREATED); + container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); + container.setupMessageListener((MessageListener ) records::add); + container.start(); + + ContainerTestUtils.waitForAssignment(container, 3); + } + + @AfterEach + void tearDown() { + if (container != null) { + container.stop(); + } + } + + @Test + @Order(1) + @DisplayName("Should create transaction and publish to Kafka") + void shouldCreateTransactionAndPublishToKafka() throws Exception { + // Given + CreateTransactionRequest request = CreateTransactionRequest.builder() + .accountExternalIdDebit(UUID.randomUUID()) + .accountExternalIdCredit(UUID.randomUUID()) + .tranferTypeId(1) + .value(new BigDecimal("500.00")) + .build(); + + // When + MvcResult result = mockMvc.perform(post("/api/v1/transactions") + .contentType(MediaType.APPLICATION_JSON) + .content(objectMapper.writeValueAsString(request))) + .andExpect(status().isCreated()) + .andExpect(jsonPath("$.transactionExternalId").exists()) + .andExpect(jsonPath("$.transactionStatus.name").value("PENDING")) + .andExpect(jsonPath("$.value").value(500.00)) + .andReturn(); + + // Then - Verify database + String response = result.getResponse().getContentAsString(); + String transactionId = objectMapper.readTree(response).get("transactionExternalId").asText(); + + Transaction savedTransaction = transactionRepository + .findByExternalId(UUID.fromString(transactionId)) + .orElse(null); + + assertNotNull(savedTransaction); + assertEquals(TransactionStatus.PENDING, savedTransaction.getStatus()); + + // Then - Verify Kafka event + ConsumerRecord received = records.poll(10, TimeUnit.SECONDS); + assertNotNull(received, "Should receive a Kafka event"); + + TransactionCreatedEvent kafkaEvent = received.value(); + assertEquals(transactionId, kafkaEvent.getTransactionExternalId().toString()); + assertEquals(new BigDecimal("500.00"), kafkaEvent.getValue()); + } + + @Test + @Order(2) + @DisplayName("Should get transaction by external id") + void shouldGetTransactionByExternalId() throws Exception { + // Given - Create a transaction first + Transaction transaction = Transaction.builder() + .externalId(UUID.randomUUID()) + .accountExternalIdDebit(UUID.randomUUID()) + .accountExternalIdCredit(UUID.randomUUID()) + .transferTypeId(1) + .value(new BigDecimal("300.00")) + .status(TransactionStatus.PENDING) + .build(); + transaction = transactionRepository.save(transaction); + + // When & Then + mockMvc.perform(get("/api/v1/transactions/{externalId}", transaction.getExternalId())) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.transactionExternalId").value(transaction.getExternalId().toString())) + .andExpect(jsonPath("$.transactionStatus.name").value("PENDING")) + .andExpect(jsonPath("$.value").value(300.00)); + } + + @Test + @Order(3) + @DisplayName("Should return 404 when transaction not found") + void shouldReturn404WhenTransactionNotFound() throws Exception { + // Given + UUID nonExistentId = UUID.randomUUID(); + + // When & Then + mockMvc.perform(get("/api/v1/transactions/{externalId}", nonExistentId)) + .andExpect(status().isNotFound()); + } + + @Test + @Order(4) + @DisplayName("Should persist transaction in event store and read model") + void shouldPersistTransactionInEventStoreAndReadModel() throws Exception { + // Given + CreateTransactionRequest request = CreateTransactionRequest.builder() + .accountExternalIdDebit(UUID.randomUUID()) + .accountExternalIdCredit(UUID.randomUUID()) + .tranferTypeId(1) + .value(new BigDecimal("750.00")) + .build(); + + // When + MvcResult result = mockMvc.perform(post("/api/v1/transactions") + .contentType(MediaType.APPLICATION_JSON) + .content(objectMapper.writeValueAsString(request))) + .andExpect(status().isCreated()) + .andReturn(); + + // Then + String response = result.getResponse().getContentAsString(); + String transactionId = objectMapper.readTree(response).get("transactionExternalId").asText(); + + // Verify transaction exists in read model (database) + Transaction transaction = transactionRepository + .findByExternalId(UUID.fromString(transactionId)) + .orElse(null); + + assertNotNull(transaction); + assertEquals(new BigDecimal("750.00"), transaction.getValue()); + assertEquals(TransactionStatus.PENDING, transaction.getStatus()); + } + + @Test + @Order(5) + @DisplayName("Should create multiple transactions successfully") + void shouldCreateMultipleTransactionsSuccessfully() throws Exception { + // Given + long initialCount = transactionRepository.count(); + + CreateTransactionRequest request1 = CreateTransactionRequest.builder() + .accountExternalIdDebit(UUID.randomUUID()) + .accountExternalIdCredit(UUID.randomUUID()) + .tranferTypeId(1) + .value(new BigDecimal("100.00")) + .build(); + + CreateTransactionRequest request2 = CreateTransactionRequest.builder() + .accountExternalIdDebit(UUID.randomUUID()) + .accountExternalIdCredit(UUID.randomUUID()) + .tranferTypeId(1) + .value(new BigDecimal("200.00")) + .build(); + + // When + mockMvc.perform(post("/api/v1/transactions") + .contentType(MediaType.APPLICATION_JSON) + .content(objectMapper.writeValueAsString(request1))) + .andExpect(status().isCreated()); + + mockMvc.perform(post("/api/v1/transactions") + .contentType(MediaType.APPLICATION_JSON) + .content(objectMapper.writeValueAsString(request2))) + .andExpect(status().isCreated()); + + // Then + assertEquals(initialCount + 2, transactionRepository.count()); + + // Verify both Kafka events + ConsumerRecord event1 = records.poll(10, TimeUnit.SECONDS); + ConsumerRecord event2 = records.poll(10, TimeUnit.SECONDS); + + assertNotNull(event1); + assertNotNull(event2); + } +} + diff --git a/transaction-service/src/test/java/com/yape/challenge/transaction/presentation/controller/TransactionControllerTest.java b/transaction-service/src/test/java/com/yape/challenge/transaction/presentation/controller/TransactionControllerTest.java new file mode 100644 index 0000000000..446b489a7f --- /dev/null +++ b/transaction-service/src/test/java/com/yape/challenge/transaction/presentation/controller/TransactionControllerTest.java @@ -0,0 +1,221 @@ +package com.yape.challenge.transaction.presentation.controller; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yape.challenge.transaction.application.bus.CommandBus; +import com.yape.challenge.transaction.application.bus.QueryBus; +import com.yape.challenge.transaction.application.command.CreateTransactionCommand; +import com.yape.challenge.transaction.application.dto.request.CreateTransactionRequest; +import com.yape.challenge.transaction.application.dto.response.TransactionResponse; +import com.yape.challenge.transaction.application.query.GetTransactionQuery; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.http.MediaType; +import org.springframework.test.web.servlet.MockMvc; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.UUID; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*; + +@WebMvcTest(TransactionController.class) +@DisplayName("Transaction Controller Tests") +class TransactionControllerTest { + + @Autowired + private MockMvc mockMvc; + + @Autowired + private ObjectMapper objectMapper; + + @MockBean + private CommandBus commandBus; + + @MockBean + private QueryBus queryBus; + + private UUID transactionId; + private UUID debitAccountId; + private UUID creditAccountId; + private CreateTransactionRequest createRequest; + private TransactionResponse transactionResponse; + + @BeforeEach + void setUp() { + transactionId = UUID.randomUUID(); + debitAccountId = UUID.randomUUID(); + creditAccountId = UUID.randomUUID(); + + createRequest = CreateTransactionRequest.builder() + .accountExternalIdDebit(debitAccountId) + .accountExternalIdCredit(creditAccountId) + .tranferTypeId(1) + .value(new BigDecimal("500.00")) + .build(); + + transactionResponse = TransactionResponse.builder() + .transactionExternalId(transactionId) + .transactionType(TransactionResponse.TransactionTypeDto.builder() + .name("Tipo A") + .build()) + .transactionStatus(TransactionResponse.TransactionStatusDto.builder() + .name("PENDING") + .build()) + .value(new BigDecimal("500.00")) + .createdAt(LocalDateTime.now()) + .build(); + } + + @Test + @DisplayName("Should create transaction successfully") + void shouldCreateTransactionSuccessfully() throws Exception { + // Given + when(commandBus.dispatch(any(CreateTransactionCommand.class))) + .thenReturn(transactionResponse); + + // When & Then + mockMvc.perform(post("/api/v1/transactions") + .contentType(MediaType.APPLICATION_JSON) + .content(objectMapper.writeValueAsString(createRequest))) + .andExpect(status().isCreated()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andExpect(jsonPath("$.transactionExternalId").value(transactionId.toString())) + .andExpect(jsonPath("$.transactionStatus.name").value("PENDING")) + .andExpect(jsonPath("$.value").value(500.00)) + .andExpect(jsonPath("$.transactionType.name").value("Tipo A")); + + verify(commandBus, times(1)).dispatch(any(CreateTransactionCommand.class)); + } + + @Test + @DisplayName("Should return bad request when request is invalid") + void shouldReturnBadRequestWhenRequestIsInvalid() throws Exception { + // Given + CreateTransactionRequest invalidRequest = CreateTransactionRequest.builder() + .accountExternalIdDebit(null) // Required field missing + .accountExternalIdCredit(creditAccountId) + .tranferTypeId(1) + .value(new BigDecimal("500.00")) + .build(); + + // When & Then + mockMvc.perform(post("/api/v1/transactions") + .contentType(MediaType.APPLICATION_JSON) + .content(objectMapper.writeValueAsString(invalidRequest))) + .andExpect(status().isBadRequest()); + + verify(commandBus, never()).dispatch(any(CreateTransactionCommand.class)); + } + + @Test + @DisplayName("Should get transaction by external id") + void shouldGetTransactionByExternalId() throws Exception { + // Given + when(queryBus.dispatch(any(GetTransactionQuery.class))) + .thenReturn(transactionResponse); + + // When & Then + mockMvc.perform(get("/api/v1/transactions/{externalId}", transactionId)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andExpect(jsonPath("$.transactionExternalId").value(transactionId.toString())) + .andExpect(jsonPath("$.transactionStatus.name").value("PENDING")) + .andExpect(jsonPath("$.value").value(500.00)); + + verify(queryBus, times(1)).dispatch(any(GetTransactionQuery.class)); + } + + @Test + @DisplayName("Should handle valid UUID path variable") + void shouldHandleValidUuidPathVariable() throws Exception { + // Given + when(queryBus.dispatch(any(GetTransactionQuery.class))) + .thenReturn(transactionResponse); + + // When & Then + mockMvc.perform(get("/api/v1/transactions/{externalId}", transactionId.toString())) + .andExpect(status().isOk()); + + verify(queryBus, times(1)).dispatch(any(GetTransactionQuery.class)); + } + + @Test + @DisplayName("Should create transaction with minimum valid value") + void shouldCreateTransactionWithMinimumValidValue() throws Exception { + // Given + CreateTransactionRequest minValueRequest = CreateTransactionRequest.builder() + .accountExternalIdDebit(debitAccountId) + .accountExternalIdCredit(creditAccountId) + .tranferTypeId(1) + .value(new BigDecimal("0.01")) + .build(); + + TransactionResponse minValueResponse = TransactionResponse.builder() + .transactionExternalId(transactionId) + .transactionType(TransactionResponse.TransactionTypeDto.builder() + .name("Tipo A") + .build()) + .transactionStatus(TransactionResponse.TransactionStatusDto.builder() + .name("PENDING") + .build()) + .value(new BigDecimal("0.01")) + .createdAt(LocalDateTime.now()) + .build(); + + when(commandBus.dispatch(any(CreateTransactionCommand.class))) + .thenReturn(minValueResponse); + + // When & Then + mockMvc.perform(post("/api/v1/transactions") + .contentType(MediaType.APPLICATION_JSON) + .content(objectMapper.writeValueAsString(minValueRequest))) + .andExpect(status().isCreated()) + .andExpect(jsonPath("$.value").value(0.01)); + + verify(commandBus, times(1)).dispatch(any(CreateTransactionCommand.class)); + } + + @Test + @DisplayName("Should create transaction with large value") + void shouldCreateTransactionWithLargeValue() throws Exception { + // Given + CreateTransactionRequest largeValueRequest = CreateTransactionRequest.builder() + .accountExternalIdDebit(debitAccountId) + .accountExternalIdCredit(creditAccountId) + .tranferTypeId(1) + .value(new BigDecimal("99999.99")) + .build(); + + TransactionResponse largeValueResponse = TransactionResponse.builder() + .transactionExternalId(transactionId) + .transactionType(TransactionResponse.TransactionTypeDto.builder() + .name("Tipo A") + .build()) + .transactionStatus(TransactionResponse.TransactionStatusDto.builder() + .name("PENDING") + .build()) + .value(new BigDecimal("99999.99")) + .createdAt(LocalDateTime.now()) + .build(); + + when(commandBus.dispatch(any(CreateTransactionCommand.class))) + .thenReturn(largeValueResponse); + + // When & Then + mockMvc.perform(post("/api/v1/transactions") + .contentType(MediaType.APPLICATION_JSON) + .content(objectMapper.writeValueAsString(largeValueRequest))) + .andExpect(status().isCreated()) + .andExpect(jsonPath("$.value").value(99999.99)); + + verify(commandBus, times(1)).dispatch(any(CreateTransactionCommand.class)); + } +} diff --git a/transaction-service/src/test/resources/application.yml b/transaction-service/src/test/resources/application.yml new file mode 100644 index 0000000000..2ca6791b2a --- /dev/null +++ b/transaction-service/src/test/resources/application.yml @@ -0,0 +1,68 @@ +spring: + kafka: + bootstrap-servers: localhost:9092 + consumer: + group-id: transaction-test-group + auto-offset-reset: earliest + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring.json.trusted.packages: "*" + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + + datasource: + url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1;CASE_INSENSITIVE_IDENTIFIERS=TRUE;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE;DEFAULT_NULL_ORDERING=HIGH + driverClassName: org.h2.Driver + username: sa + password: + + jpa: + hibernate: + ddl-auto: create-drop + show-sql: true + properties: + hibernate: + dialect: org.hibernate.dialect.H2Dialect + format_sql: true + + h2: + console: + enabled: true + + cache: + type: none + +resilience4j: + circuitbreaker: + instances: + database: + register-health-indicator: true + sliding-window-size: 10 + minimum-number-of-calls: 5 + permitted-number-of-calls-in-half-open-state: 3 + wait-duration-in-open-state: 10s + failure-rate-threshold: 50 + kafkaProducer: + register-health-indicator: true + sliding-window-size: 10 + minimum-number-of-calls: 5 + permitted-number-of-calls-in-half-open-state: 3 + wait-duration-in-open-state: 5s + failure-rate-threshold: 50 + + retry: + instances: + kafkaProducer: + max-attempts: 3 + wait-duration: 1s + enable-exponential-backoff: true + exponential-backoff-multiplier: 2 + +logging: + level: + com.yape.challenge: DEBUG + org.springframework.kafka: DEBUG + org.hibernate.SQL: DEBUG +