custom/plugins/ZeobvBundleProducts/src/Core/Subscriber/BundleProductStockUpdater.php line 94

Open in your IDE?
  1. <?php
  2. declare(strict_types=1);
  3. namespace Zeobv\BundleProducts\Core\Subscriber;
  4. use Doctrine\DBAL\Connection;
  5. use Doctrine\DBAL\Driver\ResultStatement;
  6. use ReflectionMethod;
  7. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  8. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  9. use Symfony\Component\Messenger\MessageBusInterface;
  10. use Shopware\Core\Checkout\Cart\Event\CheckoutOrderPlacedEvent;
  11. use Shopware\Core\Checkout\Cart\LineItem\LineItem;
  12. use Shopware\Core\Checkout\Order\Aggregate\OrderLineItem\OrderLineItemDefinition;
  13. use Shopware\Core\Checkout\Order\OrderEvents;
  14. use Shopware\Core\Checkout\Order\OrderStates;
  15. use Shopware\Core\Content\Product\Events\ProductNoLongerAvailableEvent;
  16. use Shopware\Core\Defaults;
  17. use Shopware\Core\Framework\Context;
  18. use Shopware\Core\Framework\DataAbstractionLayer\Doctrine\RetryableQuery;
  19. use Shopware\Core\Framework\DataAbstractionLayer\EntityWriteResult;
  20. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenEvent;
  21. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\ChangeSetAware;
  22. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\DeleteCommand;
  23. use Shopware\Core\Framework\DataAbstractionLayer\Write\Validation\PreWriteValidationEvent;
  24. use Shopware\Core\Framework\Uuid\Uuid;
  25. use Shopware\Core\System\StateMachine\Event\StateMachineTransitionEvent;
  26. use Zeobv\BundleProducts\Service\ConfigService;
  27. use Zeobv\BundleProducts\Struct\BundleProduct\LineItem as BundleProductLineItem;
  28. use Zeobv\BundleProducts\MessageBus\Message\BundleStockUpdateMessage;
  29. class BundleProductStockUpdater implements EventSubscriberInterface
  30. {
  31.     private Connection $connection;
  32.     private EventDispatcherInterface $dispatcher;
  33.     private ConfigService $configService;
  34.     private MessageBusInterface $messageBus;
  35.     public function __construct(
  36.         Connection $connection,
  37.         EventDispatcherInterface $dispatcher,
  38.         ConfigService $configService,
  39.         MessageBusInterface $messageBus
  40.     ) {
  41.         $this->connection $connection;
  42.         $this->dispatcher $dispatcher;
  43.         $this->configService $configService;
  44.         $this->messageBus $messageBus;
  45.     }
  46.     /**
  47.      * Returns a list of custom business events to listen where the product maybe changed
  48.      */
  49.     public static function getSubscribedEvents(): array
  50.     {
  51.         return [
  52.             CheckoutOrderPlacedEvent::class => [
  53.                 ['orderPlaced', -1]
  54.             ],
  55.             StateMachineTransitionEvent::class => [
  56.                 ['stateChanged', -1]
  57.             ],
  58.             PreWriteValidationEvent::class => [
  59.                 ['triggerChangeSet', -1]
  60.             ],
  61.             OrderEvents::ORDER_LINE_ITEM_WRITTEN_EVENT => [
  62.                 ['lineItemWritten', -1]
  63.             ],
  64.             OrderEvents::ORDER_LINE_ITEM_DELETED_EVENT => [
  65.                 ['lineItemWritten', -1]
  66.             ],
  67.             'pickware_erp.stock.stock_updated_for_stock_movements' => 'onPickwareStockUpdatedFromStockMovement',
  68.         ];
  69.     }
  70.     public function onPickwareStockUpdatedFromStockMovement($stockUpdatedForStockMovementsEvent): void
  71.     {
  72.         if (!method_exists($stockUpdatedForStockMovementsEvent'getStockMovements')) {
  73.             return;
  74.         }
  75.         $productIds = [];
  76.         foreach ($stockUpdatedForStockMovementsEvent->getStockMovements() as $stockMovement) {
  77.             $productIds[] = $stockMovement['productId'];
  78.         }
  79.         $this->dispatchBundleProductStockUpdateEvent($productIds);
  80.     }
  81.     public function triggerChangeSet(PreWriteValidationEvent $event): void
  82.     {
  83.         if ($event->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  84.             return;
  85.         }
  86.         foreach ($event->getCommands() as $command) {
  87.             if (!$command instanceof ChangeSetAware) {
  88.                 continue;
  89.             }
  90.             if ($command->getDefinition()->getEntityName() !== OrderLineItemDefinition::ENTITY_NAME) {
  91.                 continue;
  92.             }
  93.             if ($command instanceof DeleteCommand) {
  94.                 $command->requestChangeSet();
  95.                 continue;
  96.             }
  97.             if ($command->hasField('referenced_id') || $command->hasField('product_id') || $command->hasField('quantity')) {
  98.                 $command->requestChangeSet();
  99.                 continue;
  100.             }
  101.         }
  102.     }
  103.     /**
  104.      * If the product of an order item changed, the stocks of the old product and the new product must be updated.
  105.      */
  106.     public function lineItemWritten(EntityWrittenEvent $event): void
  107.     {
  108.         $ids = [];
  109.         foreach ($event->getWriteResults() as $result) {
  110.             if ($result->hasPayload('referencedId') && ($result->getProperty('type') === LineItem::PRODUCT_LINE_ITEM_TYPE ||
  111.                 $result->getProperty('type') === BundleProductLineItem::BUNDLE_PRODUCT_LINE_ITEM_TYPE
  112.             )) {
  113.                 $ids[] = $result->getProperty('referencedId');
  114.             }
  115.             if ($result->getOperation() === EntityWriteResult::OPERATION_INSERT) {
  116.                 continue;
  117.             }
  118.             $changeSet $result->getChangeSet();
  119.             if (!$changeSet) {
  120.                 continue;
  121.             }
  122.             $type $changeSet->getBefore('type');
  123.             if ($type !== BundleProductLineItem::BUNDLE_PRODUCT_LINE_ITEM_TYPE) {
  124.                 continue;
  125.             }
  126.             if (!$changeSet->hasChanged('referenced_id') && !$changeSet->hasChanged('quantity')) {
  127.                 continue;
  128.             }
  129.             $ids[] = $changeSet->getBefore('referenced_id');
  130.             $ids[] = $changeSet->getAfter('referenced_id');
  131.         }
  132.         $ids array_filter(array_unique($ids));
  133.         if (empty($ids)) {
  134.             return;
  135.         }
  136.         $this->update($ids$event->getContext());
  137.     }
  138.     public function stateChanged(StateMachineTransitionEvent $event): void
  139.     {
  140.         if ($event->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  141.             return;
  142.         }
  143.         if ($event->getEntityName() !== 'order') {
  144.             return;
  145.         }
  146.         if ($event->getToPlace()->getTechnicalName() === OrderStates::STATE_COMPLETED) {
  147.             $this->decreaseStock($event);
  148.             return;
  149.         }
  150.         if ($event->getFromPlace()->getTechnicalName() === OrderStates::STATE_COMPLETED) {
  151.             $this->increaseStock($event);
  152.             return;
  153.         }
  154.         if ($event->getToPlace()->getTechnicalName() === OrderStates::STATE_CANCELLED || $event->getFromPlace()->getTechnicalName() === OrderStates::STATE_CANCELLED) {
  155.             $products $this->getProductsOfOrder($event->getEntityId());
  156.             $ids array_column($products'referenced_id');
  157.             $this->updateAvailableStockAndSales($ids$event->getContext());
  158.             $this->updateAvailableFlag($ids$event->getContext());
  159.             return;
  160.         }
  161.     }
  162.     public function update(array $idsContext $context): void
  163.     {
  164.         if ($context->getVersionId() !== Defaults::LIVE_VERSION) {
  165.             return;
  166.         }
  167.         $this->updateAvailableStockAndSales($ids$context);
  168.         $this->updateAvailableFlag($ids$context);
  169.     }
  170.     public function orderPlaced(CheckoutOrderPlacedEvent $event): void
  171.     {
  172.         $ids = [];
  173.         foreach ($event->getOrder()->getLineItems() as $lineItem) {
  174.             if (
  175.                 $lineItem->getType() !== BundleProductLineItem::BUNDLE_PRODUCT_LINE_ITEM_TYPE
  176.                 && $lineItem->getType() !== LineItem::PRODUCT_LINE_ITEM_TYPE
  177.             ) {
  178.                 continue;
  179.             }
  180.             $ids[] = $lineItem->getReferencedId();
  181.         }
  182.         $this->update($ids$event->getContext());
  183.     }
  184.     private function increaseStock(StateMachineTransitionEvent $event): void
  185.     {
  186.         $products $this->getProductsOfOrder($event->getEntityId());
  187.         $ids array_column($products'referenced_id');
  188.         $this->updateStock($products, +1);
  189.         $this->updateAvailableStockAndSales($ids$event->getContext());
  190.         $this->updateAvailableFlag($ids$event->getContext());
  191.     }
  192.     private function decreaseStock(StateMachineTransitionEvent $event): void
  193.     {
  194.         $products $this->getProductsOfOrder($event->getEntityId());
  195.         $ids array_column($products'referenced_id');
  196.         $this->updateStock($products, -1);
  197.         $this->updateAvailableStockAndSales($ids$event->getContext());
  198.         $this->updateAvailableFlag($ids$event->getContext());
  199.     }
  200.     private function updateAvailableStockAndSales(array $idsContext $context): void
  201.     {
  202.         $this->dispatchBundleProductStockUpdateEvent($ids);
  203.         if (!$this->configService->enableBundleProductStockManagement()) {
  204.             return;
  205.         }
  206.         $ids array_filter(array_keys(array_flip($ids)));
  207.         if (empty($ids)) {
  208.             return;
  209.         }
  210.         $sql '
  211. SELECT LOWER(order_line_item.referenced_id) as product_id,
  212.     IFNULL(
  213.         SUM(IF(state_machine_state.technical_name = :completed_state, 0, order_line_item.quantity)),
  214.         0
  215.     ) as open_quantity,
  216.     IFNULL(
  217.         SUM(IF(state_machine_state.technical_name = :completed_state, order_line_item.quantity, 0)),
  218.         0
  219.     ) as sales_quantity
  220. FROM order_line_item
  221.     INNER JOIN `order`
  222.         ON `order`.id = order_line_item.order_id
  223.         AND `order`.version_id = order_line_item.order_version_id
  224.     INNER JOIN state_machine_state
  225.         ON state_machine_state.id = `order`.state_id
  226.         AND state_machine_state.technical_name <> :cancelled_state
  227. WHERE (
  228.         (
  229.             order_line_item.referenced_id IN (:referenceIds)
  230.             AND order_line_item.type = :bundleType
  231.         ) OR
  232.         (
  233.             order_line_item.product_id IN (:productIds)
  234.             AND order_line_item.type = :productType
  235.         )
  236.     )
  237.     AND order_line_item.version_id = :version
  238.     AND order_line_item.referenced_id IS NOT NULL
  239. GROUP BY referenced_id;
  240.         ';
  241.         $rows $this->connection->fetchAllAssociative(
  242.             $sql,
  243.             [
  244.                 'bundleType' => BundleProductLineItem::BUNDLE_PRODUCT_LINE_ITEM_TYPE,
  245.                 'productType' => LineItem::PRODUCT_LINE_ITEM_TYPE,
  246.                 'version' => Uuid::fromHexToBytes($context->getVersionId()),
  247.                 'completed_state' => OrderStates::STATE_COMPLETED,
  248.                 'cancelled_state' => OrderStates::STATE_CANCELLED,
  249.                 'productIds' => Uuid::fromHexToBytesList($ids),
  250.                 'referenceIds' => $ids,
  251.             ],
  252.             [
  253.                 'productIds' => Connection::PARAM_STR_ARRAY,
  254.                 'referenceIds' => Connection::PARAM_STR_ARRAY,
  255.             ]
  256.         );
  257.         $fallback array_column($rows'product_id');
  258.         $fallback array_diff($ids$fallback);
  259.         if ($this->shouldApplyFallbackFor641()) {
  260.             $update = new RetryableQuery(
  261.                 $this->connection->prepare('UPDATE product SET available_stock = stock - :open_quantity, sales = :sales_quantity, updated_at = :now WHERE id = :id')
  262.             );
  263.         } else {
  264.             $update = new RetryableQuery(
  265.                 $this->connection,
  266.                 $this->connection->prepare('UPDATE product SET available_stock = stock - :open_quantity, sales = :sales_quantity, updated_at = :now WHERE id = :id')
  267.             );
  268.         }
  269.         foreach ($fallback as $id) {
  270.             $update->execute([
  271.                 'id' => Uuid::fromHexToBytes((string) $id),
  272.                 'open_quantity' => 0,
  273.                 'sales_quantity' => 0,
  274.                 'now' => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
  275.             ]);
  276.         }
  277.         foreach ($rows as $row) {
  278.             $update->execute([
  279.                 'id' => Uuid::fromHexToBytes($row['product_id']),
  280.                 'open_quantity' => $row['open_quantity'],
  281.                 'sales_quantity' => $row['sales_quantity'],
  282.                 'now' => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
  283.             ]);
  284.         }
  285.     }
  286.     private function updateAvailableFlag(array $idsContext $context): void
  287.     {
  288.         $ids array_filter(array_unique($ids));
  289.         if (empty($ids)) {
  290.             return;
  291.         }
  292.         $bytes Uuid::fromHexToBytesList($ids);
  293.         $sql '
  294.             UPDATE product
  295.             LEFT JOIN product parent
  296.                 ON parent.id = product.parent_id
  297.                 AND parent.version_id = product.version_id
  298.             SET product.available = IFNULL((
  299.                 IFNULL(product.is_closeout, parent.is_closeout) * product.available_stock
  300.                 >=
  301.                 IFNULL(product.is_closeout, parent.is_closeout) * IFNULL(product.min_purchase, parent.min_purchase)
  302.             ), 0)
  303.             WHERE product.id IN (:ids)
  304.             AND product.version_id = :version
  305.         ';
  306.         if ($this->shouldApplyFallbackFor641()) {
  307.             RetryableQuery::retryable(function () use ($sql$context$bytes): void {
  308.                 $this->connection->executeUpdate(
  309.                     $sql,
  310.                     ['ids' => $bytes'version' => Uuid::fromHexToBytes($context->getVersionId())],
  311.                     ['ids' => Connection::PARAM_STR_ARRAY]
  312.                 );
  313.             });
  314.         } else {
  315.             RetryableQuery::retryable($this->connection, function () use ($sql$context$bytes): void {
  316.                 $this->connection->executeUpdate(
  317.                     $sql,
  318.                     ['ids' => $bytes'version' => Uuid::fromHexToBytes($context->getVersionId())],
  319.                     ['ids' => Connection::PARAM_STR_ARRAY]
  320.                 );
  321.             });
  322.         }
  323.         $updated $this->connection->fetchFirstColumn(
  324.             'SELECT LOWER(HEX(id)) FROM product WHERE available = 0 AND id IN (:ids) AND product.version_id = :version',
  325.             ['ids' => $bytes'version' => Uuid::fromHexToBytes($context->getVersionId())],
  326.             ['ids' => Connection::PARAM_STR_ARRAY]
  327.         );
  328.         if (!empty($updated)) {
  329.             $this->dispatcher->dispatch(new ProductNoLongerAvailableEvent($updated$context));
  330.         }
  331.     }
  332.     private function updateStock(array $productsint $multiplier): void
  333.     {
  334.         if ($this->shouldApplyFallbackFor641()) {
  335.             $query = new RetryableQuery(
  336.                 $this->connection->prepare('UPDATE product SET stock = stock + :quantity WHERE id = :id AND version_id = :version')
  337.             );
  338.         } else {
  339.             $query = new RetryableQuery(
  340.                 $this->connection,
  341.                 $this->connection->prepare('UPDATE product SET stock = stock + :quantity WHERE id = :id AND version_id = :version')
  342.             );
  343.         }
  344.         foreach ($products as $product) {
  345.             $query->execute([
  346.                 'quantity' => (int) $product['quantity'] * $multiplier,
  347.                 'id' => Uuid::fromHexToBytes($product['referenced_id']),
  348.                 'version' => Uuid::fromHexToBytes(Defaults::LIVE_VERSION),
  349.             ]);
  350.         }
  351.     }
  352.     private function getProductsOfOrder(string $orderId): array
  353.     {
  354.         $query $this->connection->createQueryBuilder();
  355.         $query->select(['referenced_id''quantity']);
  356.         $query->from('order_line_item');
  357.         $query->andWhere('type = :type');
  358.         $query->andWhere('order_id = :id');
  359.         $query->andWhere('version_id = :version');
  360.         $query->setParameter('id'Uuid::fromHexToBytes($orderId));
  361.         $query->setParameter('version'Uuid::fromHexToBytes(Defaults::LIVE_VERSION));
  362.         $query->setParameter('type'BundleProductLineItem::BUNDLE_PRODUCT_LINE_ITEM_TYPE);
  363.         /** @var ResultStatement|int $resultStatement */
  364.         $resultStatement $query->execute();
  365.         if (!$resultStatement instanceof ResultStatement) {
  366.             return [];
  367.         }
  368.         return $resultStatement->fetchAll(\PDO::FETCH_ASSOC);
  369.     }
  370.     private function dispatchBundleProductStockUpdateEvent(array $ids): void
  371.     {
  372.         $this->messageBus->dispatch(new BundleStockUpdateMessage($ids));
  373.     }
  374.     private function shouldApplyFallbackFor641(): bool
  375.     {
  376.         $method = new ReflectionMethod(RetryableQuery::class, 'retryable');
  377.         return $method->getNumberOfParameters() < 2;
  378.     }
  379. }