Since we have alot of logic in the saga class introduced a couple of services each having to deal with one part of the logic, Everything is working as expected besides the orderTotalPrice variable in the ProductReservationService. When i go through the code with the debugger the order price is getting incremented in the handle ProductReservedEvent method but when i try to use this variable in the OrderCreationHandler to emit an update to the subscription query i get a 0 as a value of the orderTotalPrice. when i remove the productReservationService.resetOrderTotalPrice(); statement in the processOrder function i get the orderTotalPrice for all the orders made in the app (even when i restart the app)
This is the code i have:
import com.jchaaban.ordersservice.command.CancelOrderCommand;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.springframework.stereotype.Service;
@Service
public class OrderCancellationService {
private final transient CommandGateway commandGateway;
private final transient ProductReservationService productReservationService;
private final transient OrderSummaryEmitter orderSummaryEmitter;
public OrderCancellationService(CommandGateway commandGateway, ProductReservationService productReservationService, OrderSummaryEmitter orderSummaryEmitter) {
this.commandGateway = commandGateway;
this.productReservationService = productReservationService;
this.orderSummaryEmitter = orderSummaryEmitter;
}
public void cancelOrder(String orderId, String reason) {
commandGateway.send(new CancelOrderCommand(orderId, reason));
productReservationService.rollbackReservations();
orderSummaryEmitter.emitOrderSummary(orderId, reason);
}
}
import com.jchaaban.common.command.UpdatePaymentDetailsBalanceCommand;
import com.jchaaban.common.dto.ReadPaymentDetailsDto;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.springframework.stereotype.Service;
@Service
public class PaymentService {
private final transient CommandGateway commandGateway;
public PaymentService(CommandGateway commandGateway) {
this.commandGateway = commandGateway;
}
public void updatePaymentDetailsBalance(String paymentDetailsId, double amount) {
commandGateway.sendAndWait(new UpdatePaymentDetailsBalanceCommand(paymentDetailsId, amount));
}
}
import com.jchaaban.ordersservice.rest.controller.query.FetchOrderQuery;
import com.jchaaban.ordersservice.rest.dto.OrderSummary;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.springframework.stereotype.Service;
@Service
public class OrderSummaryEmitter {
private final transient QueryUpdateEmitter queryUpdateEmitter;
public OrderSummaryEmitter(QueryUpdateEmitter queryUpdateEmitter) {
this.queryUpdateEmitter = queryUpdateEmitter;
}
public void emitOrderSummary(String orderId, String message) {
OrderSummary summary = new OrderSummary(orderId, message);
queryUpdateEmitter.emit(FetchOrderQuery.class, query -> true, summary);
}
}
import com.jchaaban.common.dto.ReadPaymentDetailsDto;
import com.jchaaban.common.dto.ReadUserDto;
import com.jchaaban.common.query.FetchUserPaymentDetailsQuery;
import com.jchaaban.common.query.FetchUserQuery;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.QueryGateway;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Service
public class UserQueryService {
private final transient QueryGateway queryGateway;
public UserQueryService(QueryGateway queryGateway) {
this.queryGateway = queryGateway;
}
public Optional<ReadUserDto> fetchUserInformation(String userId) {
return Optional.ofNullable(
queryGateway.query(new FetchUserQuery(userId), ResponseTypes.instanceOf(ReadUserDto.class))
.exceptionally(ex -> null)
.join()
);
}
public Optional<ReadPaymentDetailsDto> fetchUserPaymentDetails(String userId) {
return Optional.ofNullable(
queryGateway.query(new FetchUserPaymentDetailsQuery(userId), ResponseTypes.instanceOf(ReadPaymentDetailsDto.class))
.exceptionally(ex -> null)
.join()
);
}
}
import com.jchaaban.common.command.ReserveProductCommand;
import com.jchaaban.common.command.UnreserveProductCommand;
import com.jchaaban.common.event.ProductReservedEvent;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.commandhandling.CommandExecutionException;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Slf4j
@Service
public class ProductReservationService {
private final transient CommandGateway commandGateway;
private final Map<String, Integer> successfullyReservedProducts = new HashMap<>();
@Getter
private double orderTotalPrice = 0;
@EventHandler
public void handle(ProductReservedEvent event) {
log.info("Handling ProductReservedEvent for productId: {}, productPrice: {}, quantity: {}",
event.getProductId(), event.getProductPrice(), event.getQuantity());
orderTotalPrice += event.getProductPrice().doubleValue() * event.getQuantity();;
log.info("Updated orderPrice: {}", orderTotalPrice);
}
public ProductReservationService(CommandGateway commandGateway) {
this.commandGateway = commandGateway;
}
public boolean reserveProducts(List<String> productIds) {
Map<String, Integer> productOccurrences = countProductOccurrences(productIds);
return productOccurrences.entrySet().stream().allMatch(entry -> reserveProduct(entry.getKey(), entry.getValue()));
}
public void rollbackReservations() {
log.info("Rolling back product reservations.");
successfullyReservedProducts.forEach((productId, quantity) ->
commandGateway.send(new UnreserveProductCommand(productId, quantity)));
successfullyReservedProducts.clear();
orderTotalPrice = 0;
}
public void resetOrderTotalPrice(){
orderTotalPrice = 0;
}
private Map<String, Integer> countProductOccurrences(List<String> productIds) {
return productIds.stream()
.collect(Collectors.groupingBy(productId -> productId, Collectors.summingInt(productId -> 1)));
}
private boolean reserveProduct(String productId, int quantity) {
try {
commandGateway.sendAndWait(new ReserveProductCommand(productId, quantity));
successfullyReservedProducts.put(productId, quantity);
} catch (CommandExecutionException exception) {
rollbackReservations();
return false;
}
return true;
}
}
import com.jchaaban.common.dto.ReadPaymentDetailsDto;
import com.jchaaban.ordersservice.saga.service.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
@Data
public class OrderCreationHandler {
private static final String USER_NOT_FOUND_TEMPLATE = "User with ID: %s not found.";
private static final String PAYMENT_DETAILS_NOT_FOUND_TEMPLATE = "Payment details for user with ID: %s not found.";
private static final String INSUFFICIENT_BALANCE = "Insufficient balance.";
private static final String ORDER_SUCCESS_TEMPLATE = "Order processed successfully. Total cost: %s.";
private final transient OrderSummaryEmitter orderSummaryEmitter;
private final transient ProductReservationService productReservationService;
private final transient UserQueryService userQueryService;
private final transient OrderCancellationService orderCancellationService;
private final transient PaymentService paymentService;
public OrderCreationHandler(
OrderSummaryEmitter orderSummaryEmitter,
ProductReservationService productReservationService,
UserQueryService userQueryService,
OrderCancellationService orderCancellationService,
PaymentService paymentService
) {
this.orderSummaryEmitter = orderSummaryEmitter;
this.productReservationService = productReservationService;
this.userQueryService = userQueryService;
this.orderCancellationService = orderCancellationService;
this.paymentService = paymentService;
}
public void processOrder(String orderId, String userId, List<String> productIds) {
userQueryService.fetchUserInformation(userId).ifPresentOrElse(
user -> processPaymentDetails(orderId, userId, productIds),
() -> orderSummaryEmitter.emitOrderSummary(orderId, String.format(USER_NOT_FOUND_TEMPLATE, userId))
);
}
private void processPaymentDetails(String orderId, String userId, List<String> productIds) {
userQueryService.fetchUserPaymentDetails(userId).ifPresentOrElse(
paymentDetails -> processOrder(orderId, productIds, paymentDetails),
() -> orderSummaryEmitter.emitOrderSummary(orderId, String.format(PAYMENT_DETAILS_NOT_FOUND_TEMPLATE, userId))
);
}
private void processOrder(String orderId, List<String> productIds, ReadPaymentDetailsDto paymentDetails) {
if (!productReservationService.reserveProducts(productIds)) {
orderCancellationService.cancelOrder(orderId, "Product reservation failed");
return;
}
double orderPrice = productReservationService.getOrderTotalPrice();
if (paymentDetails.getBalance() < orderPrice) {
orderCancellationService.cancelOrder(orderId, INSUFFICIENT_BALANCE);
return;
}
orderSummaryEmitter.emitOrderSummary(orderId, String.format(ORDER_SUCCESS_TEMPLATE, orderPrice));
paymentService.updatePaymentDetailsBalance(paymentDetails.getPaymentDetailsId(), orderPrice);
productReservationService.resetOrderTotalPrice();
}
}
import com.jchaaban.common.event.ProductReservedEvent;
import com.jchaaban.ordersservice.event.OrderCreatedEvent;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.modelling.saga.SagaEventHandler;
import org.axonframework.modelling.saga.StartSaga;
import org.axonframework.spring.stereotype.Saga;
import org.springframework.beans.factory.annotation.Autowired;
@Saga // is already a spring component
@Slf4j
@NoArgsConstructor
public class OrderSaga {
private String orderId;
@Autowired
private transient OrderCreationHandler orderCreationHandler;
@StartSaga
@SagaEventHandler(associationProperty = "orderId")
public void handleOrderCreated(OrderCreatedEvent event) {
log.info("Handling OrderCreatedEvent for orderId: {}", event.getOrderId());
this.orderId = event.getOrderId();
orderCreationHandler.processOrder(orderId, event.getUserId(), event.getProductIds());
}
}
Here are the logs:
2023-10-30T16:15:47.496+01:00 INFO 19832 — [agaProcessor]-0] c.jchaaban.ordersservice.saga.OrderSaga : Handling OrderCreatedEvent for orderId: 4ea2f30f-6a7a-4886-a99d-58558ef1f769
Hibernate: update token_entry set token=?,token_type=?,timestamp=? where owner=? and processor_name=? and segment=?
2023-10-30T16:15:47.550+01:00 INFO 19832 — [saga.service]-0] c.j.o.s.s.ProductReservationService : Handling ProductReservedEvent for productId: cb70a736-05fd-4091-8441-d83eaa117673, productPrice: 33, quantity: 2
2023-10-30T16:15:47.550+01:00 INFO 19832 — [saga.service]-0] c.j.o.s.s.ProductReservationService : Updated orderPrice: 66.0
Hibernate: select nextval(‘association_value_entry_seq’)
and the infos in the logs are correct the total price of this order is indeed 66, Do you know any solution for this problem?
Thank you