Skip to content

Commit

Permalink
#1268 Step3 Customer Execute Payment
Browse files Browse the repository at this point in the history
  • Loading branch information
tuannguyenh1 committed Nov 19, 2024
1 parent 169b288 commit fa18c3c
Show file tree
Hide file tree
Showing 57 changed files with 1,098 additions and 370 deletions.
6 changes: 4 additions & 2 deletions order/src/it/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ spring.jpa.open-in-view=true
cors.allowed-origins=*

cdc.event.checkout.status.topic-name=dbcheckout-status.public.checkout
cdc.event.checkout.status.group-id=checkout-status
cdc.event.checkout.fulfillment.group-id=checkout-fulfillment
cdc.event.checkout.confirmed.status.group-id=checkout-confirmed-status

cdc.event.payment.topic-name=dbpayment.public.payment
cdc.event.payment.update.group-id=payment-update
cdc.event.payment.order.id.update.group-id=payment-order-id-update
cdc.event.payment.status.update.group-id=payment-status-update

kafka.version=7.0.9

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public ResponseEntity<CheckoutVm> createCheckout(@Valid @RequestBody CheckoutPos
return ResponseEntity.ok(checkoutService.createCheckout(checkoutPostVm));
}

@PostMapping("/storefront/checkouts/{id}/process-payment")
public ResponseEntity<Void> processPayment(@PathVariable String id) {
checkoutService.processPayment(id);
return ResponseEntity.ok().build();
}

@PutMapping("/storefront/checkouts/status")
public ResponseEntity<Long> updateCheckoutStatus(@Valid @RequestBody CheckoutStatusPutVm checkoutStatusPutVm) {
return ResponseEntity.ok(checkoutService.updateCheckoutStatus(checkoutStatusPutVm));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package com.yas.order.kafka.consumer;

import static com.yas.order.kafka.helper.ConsumerHelper.processForEventUpdate;
import static com.yas.order.utils.JsonUtils.getJsonValueOrNull;
import static com.yas.order.utils.JsonUtils.getJsonValueOrThrow;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yas.commonlibrary.exception.BadRequestException;
import com.yas.order.model.Checkout;
import com.yas.order.model.CheckoutItem;
import com.yas.order.model.Order;
import com.yas.order.model.OrderItem;
import com.yas.order.model.enumeration.CheckoutState;
import com.yas.order.model.enumeration.DeliveryStatus;
import com.yas.order.model.enumeration.OrderStatus;
import com.yas.order.service.CheckoutItemService;
import com.yas.order.service.CheckoutService;
import com.yas.order.service.OrderAddressService;
import com.yas.order.service.OrderItemService;
import com.yas.order.service.OrderService;
import com.yas.order.utils.Constants;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
* After the Checkout Status is set to PAYMENT_CONFIRMED, an order will be created.
*/
@Service
@Transactional
@RequiredArgsConstructor
public class CheckoutConfirmedStatusConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(CheckoutConfirmedStatusConsumer.class);
private final OrderService orderService;
private final OrderItemService orderItemService;
private final CheckoutService checkoutService;
private final CheckoutItemService checkoutItemService;
private final OrderAddressService orderAddressService;
private final ObjectMapper objectMapper;

@KafkaListener(
topics = "${cdc.event.checkout.status.topic-name}",
groupId = "${cdc.event.checkout.confirmed.status.group-id}"
)
@RetryableTopic
public void listen(ConsumerRecord<?, ?> consumerRecord) {
processForEventUpdate(
consumerRecord,
this::handleJsonForUpdateCheckout,
objectMapper,
LOGGER
);
}

private void handleJsonForUpdateCheckout(JsonNode valueObject) {

JsonNode before = valueObject.get("before");
JsonNode after = valueObject.get("after");

String id = getJsonValueOrThrow(after, Constants.Column.ID_COLUMN, Constants.ErrorCode.ID_NOT_EXISTED);
String beforeStatus = getJsonValueOrNull(before, Constants.Column.STATUS_COLUMN);
String afterStatus = getJsonValueOrNull(after, Constants.Column.STATUS_COLUMN);

if (Objects.isNull(afterStatus)
|| afterStatus.equals(beforeStatus)
|| !CheckoutState.PAYMENT_CONFIRMED.name().equals(afterStatus)
) {
LOGGER.info("It's not an event to create Order with Checkout Id {}", id);
return;
}

LOGGER.info("Checkout record with ID {} has the status 'PAYMENT_CONFIRMED'", id);

Checkout checkout = checkoutService.findCheckoutById(id);
List<CheckoutItem> checkoutItemList = checkoutItemService.getAllByCheckoutId(checkout.getId());

Order order = createOrder(checkout, checkoutItemList);
createOrderItems(order, checkoutItemList);
updateCheckoutStatus(checkout);
}

private Order createOrder(Checkout checkout, List<CheckoutItem> checkoutItemList) {

Order order = Order.builder()
.email(checkout.getEmail())
.numberItem(checkoutItemList.size())
.note(checkout.getNote())
.tax(checkout.getTotalTax())
.discount(checkout.getTotalDiscountAmount())
.totalPrice(checkout.getTotalAmount())
.couponCode(checkout.getCouponCode())
.orderStatus(OrderStatus.PAYMENT_CONFIRMED)
.deliveryFee(checkout.getTotalShipmentFee())
.deliveryMethod(checkout.getShipmentMethodId())
.deliveryStatus(DeliveryStatus.PREPARING)
.totalShipmentTax(checkout.getTotalShipmentTax())
.customerId(checkout.getCustomerId())
.shippingAddressId(
Optional.ofNullable(checkout.getShippingAddressId())
.map(orderAddressService::findOrderAddressById)
.orElseThrow(() -> new BadRequestException("Shipping Address Id is not existed: {}",
checkout.getShippingAddressId()))
)
.billingAddressId(
Optional.ofNullable(checkout.getBillingAddressId())
.map(orderAddressService::findOrderAddressById)
.orElseThrow(() -> new BadRequestException("Billing Address Id is not existed: {}",
checkout.getBillingAddressId()))
)
.checkoutId(checkout.getId())
.build();

return orderService.updateOrder(order);

}

private void createOrderItems(Order order, List<CheckoutItem> checkoutItemList) {

List<OrderItem> orderItems = checkoutItemList.stream()
.map(item -> OrderItem.builder()
.productId(item.getProductId())
.productName(item.getProductName())
.quantity(item.getQuantity())
.productPrice(item.getProductPrice())
.note(item.getNote())
.orderId(order.getId())
.build())
.toList();

orderItemService.saveAll(orderItems);
}

private void updateCheckoutStatus(Checkout checkout) {
checkout.setCheckoutState(CheckoutState.FULFILLED);
checkoutService.updateCheckout(checkout);
}

}
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
package com.yas.order.kafka.consumer;

import static com.yas.order.kafka.helper.ConsumerHelper.processForEventUpdate;
import static com.yas.order.utils.JsonUtils.convertObjectToString;
import static com.yas.order.utils.JsonUtils.createJsonErrorObject;
import static com.yas.order.utils.JsonUtils.getAttributesNode;
import static com.yas.order.utils.JsonUtils.getJsonValueOrThrow;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.yas.commonlibrary.exception.BadRequestException;
import com.yas.commonlibrary.exception.NotFoundException;
import com.yas.order.model.Checkout;
import com.yas.order.model.enumeration.CheckoutProgress;
import com.yas.order.model.enumeration.CheckoutState;
import com.yas.order.repository.CheckoutRepository;
import com.yas.order.service.CheckoutService;
import com.yas.order.service.PaymentService;
import com.yas.order.utils.Constants;
import com.yas.order.viewmodel.payment.CheckoutPaymentVm;
import java.util.Objects;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
Expand All @@ -28,50 +25,41 @@
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.stereotype.Service;

/**
* After fulfillment, process payment and create order in PayPal.
*/
@Service
@RequiredArgsConstructor
public class OrderStatusConsumer {
public class CheckoutFulfillmentConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(OrderStatusConsumer.class);
private static final Logger LOGGER = LoggerFactory.getLogger(CheckoutFulfillmentConsumer.class);
private final PaymentService paymentService;
private final CheckoutRepository checkoutRepository;
private final CheckoutService checkoutService;
private final ObjectMapper objectMapper;
private final Gson gson;

@KafkaListener(
topics = "${cdc.event.checkout.status.topic-name}",
groupId = "${cdc.event.checkout.status.group-id}"
groupId = "${cdc.event.checkout.fulfillment.group-id}"
)
@RetryableTopic(
attempts = "1"
)
public void listen(ConsumerRecord<?, ?> consumerRecord) {

if (Objects.isNull(consumerRecord)) {
LOGGER.info("ConsumerRecord is null");
return;
}
JsonObject valueObject = gson.fromJson((String) consumerRecord.value(), JsonObject.class);
processCheckoutEvent(valueObject);

}

private void processCheckoutEvent(JsonObject valueObject) {
Optional.ofNullable(valueObject)
.filter(
value -> value.has("op") && "u".equals(value.get("op").getAsString())
)
.map(value -> value.getAsJsonObject("after"))
.ifPresent(this::handleAfterJson);
processForEventUpdate(
consumerRecord,
this::handleAfterJson,
objectMapper,
LOGGER
);
}

private void handleAfterJson(JsonObject after) {

String id = getJsonValueOrThrow(after, Constants.Column.ID_COLUMN,
private void handleAfterJson(JsonNode valueObject) {
JsonNode afterObject = valueObject.get("after");
String id = getJsonValueOrThrow(afterObject, Constants.Column.ID_COLUMN,
Constants.ErrorCode.ID_NOT_EXISTED);
String status = getJsonValueOrThrow(after, Constants.Column.STATUS_COLUMN,
String status = getJsonValueOrThrow(afterObject, Constants.Column.STATUS_COLUMN,
Constants.ErrorCode.STATUS_NOT_EXISTED, id);
String progress = getJsonValueOrThrow(after, Constants.Column.CHECKOUT_PROGRESS_COLUMN,
String progress = getJsonValueOrThrow(afterObject, Constants.Column.CHECKOUT_PROGRESS_COLUMN,
Constants.ErrorCode.PROGRESS_NOT_EXISTED, id);

if (!isPaymentProcessing(status, progress)) {
Expand All @@ -83,9 +71,7 @@ private void handleAfterJson(JsonObject after) {
LOGGER.info("Checkout record with ID {} has the status 'PAYMENT_PROCESSING' and the process 'STOCK_LOCKED'",
id);

Checkout checkout = checkoutRepository
.findById(id)
.orElseThrow(() -> new NotFoundException(Constants.ErrorCode.CHECKOUT_NOT_FOUND, id));
Checkout checkout = checkoutService.findCheckoutById(id);

processPaymentAndUpdateCheckout(checkout);
}
Expand Down Expand Up @@ -117,7 +103,7 @@ private void processPaymentAndUpdateCheckout(Checkout checkout) {
throw new BadRequestException(Constants.ErrorCode.PROCESS_CHECKOUT_FAILED, checkout.getId());

} finally {
checkoutRepository.save(checkout);
checkoutService.updateCheckout(checkout);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package com.yas.order.kafka.consumer;

import static com.yas.order.kafka.helper.ConsumerHelper.processForEventUpdate;
import static com.yas.order.utils.JsonUtils.convertObjectToString;
import static com.yas.order.utils.JsonUtils.getAttributesNode;
import static com.yas.order.utils.JsonUtils.getJsonValueOrNull;
import static com.yas.order.utils.JsonUtils.getJsonValueOrThrow;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.yas.order.model.Checkout;
import com.yas.order.model.enumeration.CheckoutProgress;
import com.yas.order.model.enumeration.CheckoutState;
import com.yas.order.service.CheckoutService;
import com.yas.order.utils.Constants;
import java.util.Objects;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
Expand All @@ -24,52 +23,44 @@
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.stereotype.Service;

/**
* After the PayPal Order id is updated in payment_provider_checkout_id column.
* Update Checkout state is PAYMENT_PROCESSING, progress is PAYMENT_CREATED
*/
@Service
@RequiredArgsConstructor
public class PaymentUpdateConsumer {
public class PaymentPaypalOrderIdUpdateConsumer {

private static final Logger LOGGER = LoggerFactory.getLogger(PaymentUpdateConsumer.class);
private static final Logger LOGGER = LoggerFactory.getLogger(PaymentPaypalOrderIdUpdateConsumer.class);
private final CheckoutService checkoutService;
private final ObjectMapper objectMapper;
private final Gson gson;

@KafkaListener(
topics = "${cdc.event.payment.topic-name}",
groupId = "${cdc.event.payment.update.group-id}"
groupId = "${cdc.event.payment.order.id.update.group-id}"
)
@RetryableTopic
public void listen(ConsumerRecord<?, ?> consumerRecord) {

if (Objects.isNull(consumerRecord)) {
LOGGER.info("Consumer Record is null");
return;
}
JsonObject valueObject = gson.fromJson((String) consumerRecord.value(), JsonObject.class);
processPaymentEvent(valueObject);

}

private void processPaymentEvent(JsonObject valueObject) {
Optional.ofNullable(valueObject)
.filter(
value -> value.has("op") && "u".equals(value.get("op").getAsString())
)
.filter(value -> value.has("before") && value.has("after"))
.ifPresent(this::handleJsonForUpdateCheckout);
processForEventUpdate(
consumerRecord,
this::handleJsonForUpdateCheckout,
objectMapper,
LOGGER
);
}

private void handleJsonForUpdateCheckout(JsonObject valueObject) {
private void handleJsonForUpdateCheckout(JsonNode valueObject) {

JsonObject before = valueObject.getAsJsonObject("before");
JsonObject after = valueObject.getAsJsonObject("after");
JsonNode before = valueObject.get("before");
JsonNode after = valueObject.get("after");

String id = getJsonValueOrThrow(after, Constants.Column.ID_COLUMN,
Constants.ErrorCode.ID_NOT_EXISTED);

String beforePaypalOrderId = getJsonValueOrNull(before,
Constants.Column.CHECKOUT_ATTRIBUTES_PAYMENT_PROVIDER_CHECKOUT_ID_FIELD);
Constants.Column.PAYMENT_ATTRIBUTES_PAYMENT_PROVIDER_CHECKOUT_ID_FIELD);
String afterPaypalOrderId = getJsonValueOrNull(after,
Constants.Column.CHECKOUT_ATTRIBUTES_PAYMENT_PROVIDER_CHECKOUT_ID_FIELD);
Constants.Column.PAYMENT_ATTRIBUTES_PAYMENT_PROVIDER_CHECKOUT_ID_FIELD);

if (!Objects.isNull(afterPaypalOrderId) && !afterPaypalOrderId.equals(beforePaypalOrderId)) {

Expand All @@ -90,7 +81,7 @@ private void updateCheckOut(String checkoutId, String paymentProviderCheckoutId)
checkout.setProgress(CheckoutProgress.PAYMENT_CREATED);

ObjectNode attributesNode = getAttributesNode(objectMapper, checkout.getAttributes());
attributesNode.put(Constants.Column.CHECKOUT_ATTRIBUTES_PAYMENT_PROVIDER_CHECKOUT_ID_FIELD,
attributesNode.put(Constants.Column.PAYMENT_ATTRIBUTES_PAYMENT_PROVIDER_CHECKOUT_ID_FIELD,
paymentProviderCheckoutId);
checkout.setAttributes(convertObjectToString(objectMapper, attributesNode));

Expand Down
Loading

0 comments on commit fa18c3c

Please sign in to comment.