custom/plugins/PickwareErpStarter/src/Stock/ProductReservedStockUpdater.php line 115

Open in your IDE?
  1. <?php
  2. /*
  3.  * Copyright (c) Pickware GmbH. All rights reserved.
  4.  * This file is part of software that is released under a proprietary license.
  5.  * You must not copy, modify, distribute, make publicly available, or execute
  6.  * its contents or parts thereof without express permission by the copyright
  7.  * holder, unless otherwise permitted by law.
  8.  */
  9. declare(strict_types=1);
  10. namespace Pickware\PickwareErpStarter\Stock;
  11. use DateTime;
  12. use Doctrine\DBAL\Connection;
  13. use Pickware\DalBundle\DatabaseBulkInsertService;
  14. use Pickware\DalBundle\EntityPreWriteValidationEvent;
  15. use Pickware\DalBundle\EntityPreWriteValidationEventDispatcher;
  16. use Pickware\DalBundle\RetryableTransaction;
  17. use Pickware\PickwareErpStarter\Product\PickwareProductInitializer;
  18. use Shopware\Core\Checkout\Cart\LineItem\LineItem;
  19. use Shopware\Core\Checkout\Order\Aggregate\OrderDelivery\OrderDeliveryDefinition;
  20. use Shopware\Core\Checkout\Order\Aggregate\OrderDelivery\OrderDeliveryStates;
  21. use Shopware\Core\Checkout\Order\Aggregate\OrderLineItem\OrderLineItemDefinition;
  22. use Shopware\Core\Checkout\Order\OrderDefinition;
  23. use Shopware\Core\Checkout\Order\OrderEvents;
  24. use Shopware\Core\Checkout\Order\OrderStates;
  25. use Shopware\Core\Content\Product\ProductEvents;
  26. use Shopware\Core\Defaults;
  27. use Shopware\Core\Framework\DataAbstractionLayer\EntityWriteResult;
  28. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenEvent;
  29. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\ChangeSetAware;
  30. use Shopware\Core\Framework\DataAbstractionLayer\Write\Command\DeleteCommand;
  31. use Shopware\Core\Framework\DataAbstractionLayer\Write\Validation\PreWriteValidationEvent;
  32. use Shopware\Core\Framework\Uuid\Uuid;
  33. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  34. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  35. class ProductReservedStockUpdater implements EventSubscriberInterface
  36. {
  37.     private Connection $db;
  38.     private EventDispatcherInterface $eventDispatcher;
  39.     private ?DatabaseBulkInsertService $bulkInsertWithUpdate;
  40.     private ?PickwareProductInitializer $pickwareProductInitializer;
  41.     /** @deprecated next major, $bulkInsertWithUpdate and $pickwareProductInitializer will be non-optional */
  42.     public function __construct(
  43.         Connection $db,
  44.         EventDispatcherInterface $eventDispatcher,
  45.         ?DatabaseBulkInsertService $bulkInsertWithUpdate null,
  46.         ?PickwareProductInitializer $pickwareProductInitializer null
  47.     ) {
  48.         $this->db $db;
  49.         $this->eventDispatcher $eventDispatcher;
  50.         $this->bulkInsertWithUpdate $bulkInsertWithUpdate;
  51.         $this->pickwareProductInitializer $pickwareProductInitializer;
  52.     }
  53.     public static function getSubscribedEvents(): array
  54.     {
  55.         return [
  56.             EntityPreWriteValidationEventDispatcher::getEventName(OrderLineItemDefinition::ENTITY_NAME) => 'triggerOrderLineItemChangeSet',
  57.             EntityPreWriteValidationEventDispatcher::getEventName(OrderDefinition::ENTITY_NAME) => 'triggerOrderChangeSet',
  58.             EntityPreWriteValidationEventDispatcher::getEventName(OrderDeliveryDefinition::ENTITY_NAME) => 'triggerOrderDeliveryChangeSet',
  59.             StockUpdatedForStockMovementsEvent::class => 'stockUpdatedForStockMovements',
  60.             ProductEvents::PRODUCT_WRITTEN_EVENT => 'productWritten',
  61.             OrderEvents::ORDER_WRITTEN_EVENT => 'orderWritten',
  62.             OrderEvents::ORDER_DELETED_EVENT => 'orderWritten',
  63.             OrderEvents::ORDER_DELIVERY_WRITTEN_EVENT => 'orderDeliveryWritten',
  64.             OrderEvents::ORDER_LINE_ITEM_WRITTEN_EVENT => 'orderLineItemWritten',
  65.             OrderEvents::ORDER_LINE_ITEM_DELETED_EVENT => 'orderLineItemWritten',
  66.         ];
  67.     }
  68.     /**
  69.      * If this subscriber instantiated in its old version (with the Shopware PreWriteValidationEvent subscribed with
  70.      * function triggerChangeSet) during plugin update, we need to keep the old (unused) subscriber function to not
  71.      * crash the container. The function is unused during update, so we can keep this a noop.
  72.      * See also: https://github.com/pickware/shopware-plugins/commit/d4cd9e725df724388fa31cc24461ff62ee0585eb#diff-d298c50af83392dc452a387c04823b8b63b7d333f250e02fbed95aa490ae3914L60
  73.      */
  74.     public function triggerChangeSet(PreWriteValidationEvent $event): void
  75.     {
  76.     }
  77.     public function triggerOrderLineItemChangeSet(EntityPreWriteValidationEvent $event): void
  78.     {
  79.         foreach ($event->getCommands() as $command) {
  80.             if ($command instanceof ChangeSetAware && (
  81.                 $command instanceof DeleteCommand
  82.                     || $command->hasField('product_id')
  83.                     || $command->hasField('product_version_id')
  84.                     || $command->hasField('version_id')
  85.                     || $command->hasField('type')
  86.                     || $command->hasField('quantity')
  87.             )
  88.             ) {
  89.                 $command->requestChangeSet();
  90.             }
  91.         }
  92.     }
  93.     public function triggerOrderChangeSet(EntityPreWriteValidationEvent $event): void
  94.     {
  95.         foreach ($event->getCommands() as $command) {
  96.             if ($command instanceof ChangeSetAware && (
  97.                 $command instanceof DeleteCommand
  98.                     || $command->hasField('order_line_item_id')
  99.             )
  100.             ) {
  101.                 $command->requestChangeSet();
  102.             }
  103.         }
  104.     }
  105.     public function triggerOrderDeliveryChangeSet(EntityPreWriteValidationEvent $event): void
  106.     {
  107.         foreach ($event->getCommands() as $command) {
  108.             if ($command instanceof ChangeSetAware && $command->hasField('order_id')) {
  109.                 $command->requestChangeSet();
  110.             }
  111.         }
  112.     }
  113.     public function stockUpdatedForStockMovements(StockUpdatedForStockMovementsEvent $event): void
  114.     {
  115.         $productIds = [];
  116.         foreach ($event->getStockMovements() as $stockMovement) {
  117.             if ($stockMovement['sourceOrderId'] || $stockMovement['destinationOrderId']) {
  118.                 $productIds[] = $stockMovement['productId'];
  119.             }
  120.         }
  121.         $this->recalculateProductReservedStock($productIds);
  122.     }
  123.     public function orderWritten(EntityWrittenEvent $entityWrittenEvent): void
  124.     {
  125.         if ($entityWrittenEvent->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  126.             return;
  127.         }
  128.         $orderIds = [];
  129.         foreach ($entityWrittenEvent->getWriteResults() as $writeResult) {
  130.             $payload $writeResult->getPayload();
  131.             if (isset($payload['versionId'])
  132.                 || isset($payload['stateId'])
  133.             ) {
  134.                 $orderIds[] = $writeResult->getPrimaryKey();
  135.             }
  136.         }
  137.         $products $this->db->fetchAllAssociative(
  138.             'SELECT LOWER(HEX(`order_line_item`.`product_id`)) AS `id`
  139.             FROM `order_line_item`
  140.             WHERE `order_line_item`.`order_id` IN (:orderIds)
  141.                 AND `order_line_item`.`version_id` = :liveVersionId
  142.                 AND `order_line_item`.`order_version_id` = :liveVersionId
  143.                 AND `order_line_item`.`product_version_id` = :liveVersionId
  144.                 AND `order_line_item`.`product_id` IS NOT NULL',
  145.             [
  146.                 'orderIds' => array_map('hex2bin'$orderIds),
  147.                 'liveVersionId' => hex2bin(Defaults::LIVE_VERSION),
  148.             ],
  149.             [
  150.                 'orderIds' => Connection::PARAM_STR_ARRAY,
  151.             ],
  152.         );
  153.         $productIds array_column($products'id');
  154.         $this->recalculateProductReservedStock($productIds);
  155.     }
  156.     public function orderDeliveryWritten(EntityWrittenEvent $entityWrittenEvent): void
  157.     {
  158.         if ($entityWrittenEvent->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  159.             return;
  160.         }
  161.         $orderDeliveryIds = [];
  162.         $orderIds = [];
  163.         foreach ($entityWrittenEvent->getWriteResults() as $writeResult) {
  164.             $payload $writeResult->getPayload();
  165.             if (isset($payload['stateId'])
  166.                 || isset($payload['versionId'])
  167.                 || isset($payload['orderVersionId'])
  168.             ) {
  169.                 $orderDeliveryIds[] = $payload['id'];
  170.             }
  171.             $changeSet $writeResult->getChangeSet();
  172.             if ($changeSet && $changeSet->hasChanged('order_id') && !empty($changeSet->getBefore('order_id'))) {
  173.                 $orderIds[] = bin2hex($changeSet->getBefore('order_id'));
  174.                 $orderIdAfter $changeSet->getAfter('order_id');
  175.                 if ($orderIdAfter) {
  176.                     // $orderIdAfter === null, when product_id was not changed
  177.                     $orderIds[] = bin2hex($orderIdAfter);
  178.                 }
  179.             }
  180.         }
  181.         $productIds = [];
  182.         if (count($orderDeliveryIds) > 0) {
  183.             $orderDeliveries $this->db->fetchAllAssociative(
  184.                 'SELECT
  185.                     LOWER(HEX(`order_line_item`.`product_id`)) AS `productId`
  186.                 FROM `order_delivery`
  187.                 INNER JOIN `order`
  188.                     ON `order`.`id` = `order_delivery`.`order_id`
  189.                     AND `order`.`version_id` = `order_delivery`.`order_version_id`
  190.                 INNER JOIN `order_line_item`
  191.                     ON `order`.`id` = `order_line_item`.`order_id`
  192.                     AND `order`.`version_id` = `order_line_item`.`order_version_id`
  193.                 WHERE `order_delivery`.`id` IN (:orderDeliveryIds)
  194.                     AND `order_line_item`.`product_id` IS NOT NULL
  195.                     AND `order_line_item`.`product_version_id` = :liveVersionId',
  196.                 [
  197.                     'orderDeliveryIds' => array_map('hex2bin'$orderDeliveryIds),
  198.                     'liveVersionId' => hex2bin(Defaults::LIVE_VERSION),
  199.                 ],
  200.                 [
  201.                     'orderDeliveryIds' => Connection::PARAM_STR_ARRAY,
  202.                 ],
  203.             );
  204.             $productIds array_merge($productIdsarray_column($orderDeliveries'productId'));
  205.         }
  206.         if (count($orderIds) > 0) {
  207.             $orders $this->db->fetchAllAssociative(
  208.                 'SELECT
  209.                     LOWER(HEX(`order_line_item`.`product_id`)) AS `productId`
  210.                 FROM `order`
  211.                 INNER JOIN `order_line_item`
  212.                     ON `order`.`id` = `order_line_item`.`order_id`
  213.                     AND `order`.`version_id` = `order_line_item`.`order_version_id`
  214.                 WHERE `order`.`id` IN (:orderIds)
  215.                     AND `order_line_item`.`product_id` IS NOT NULL
  216.                     AND `order_line_item`.`product_version_id` = :liveVersionId',
  217.                 [
  218.                     'orderIds' => array_map('hex2bin'$orderIds),
  219.                     'liveVersionId' => hex2bin(Defaults::LIVE_VERSION),
  220.                 ],
  221.                 [
  222.                     'orderIds' => Connection::PARAM_STR_ARRAY,
  223.                 ],
  224.             );
  225.             $productIds array_merge($productIdsarray_column($orders'productId'));
  226.         }
  227.         $productIds array_values(array_unique($productIds));
  228.         $this->recalculateProductReservedStock($productIds);
  229.     }
  230.     public function productWritten(EntityWrittenEvent $entityWrittenEvent): void
  231.     {
  232.         if ($entityWrittenEvent->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  233.             return;
  234.         }
  235.         $productIds = [];
  236.         foreach ($entityWrittenEvent->getWriteResults() as $writeResult) {
  237.             $payload $writeResult->getPayload();
  238.             if (isset($payload['versionId'])
  239.                 || isset($payload['availableStock'])
  240.             ) {
  241.                 $productIds[] = $payload['id'];
  242.             }
  243.         }
  244.         $this->recalculateProductReservedStock($productIds);
  245.     }
  246.     /**
  247.      * Updates the old and the new product, if the product of an order line item is changed.
  248.      */
  249.     public function orderLineItemWritten(EntityWrittenEvent $entityWrittenEvent): void
  250.     {
  251.         if ($entityWrittenEvent->getContext()->getVersionId() !== Defaults::LIVE_VERSION) {
  252.             return;
  253.         }
  254.         $productIds = [];
  255.         foreach ($entityWrittenEvent->getWriteResults() as $writeResult) {
  256.             // $writeResult->getExistence() can be null, but we have no idea why and also not what this means.
  257.             $existence $writeResult->getExistence();
  258.             $isNewOrderLineItem = (
  259.                 $existence === null
  260.                 && $writeResult->getOperation() === EntityWriteResult::OPERATION_INSERT
  261.             ) || (
  262.                 $existence !== null && !$existence->exists()
  263.             );
  264.             if ($isNewOrderLineItem && array_key_exists('productId'$writeResult->getPayload())) {
  265.                 // This is a newly-created order line item
  266.                 $productIds[] = $writeResult->getPayload()['productId'];
  267.                 continue;
  268.             }
  269.             $changeSet $writeResult->getChangeSet();
  270.             if ($changeSet) {
  271.                 if ($changeSet->hasChanged('product_id')
  272.                     || $changeSet->hasChanged('product_version_id')
  273.                     || $changeSet->hasChanged('type')
  274.                     || $changeSet->hasChanged('version_id')
  275.                     || $changeSet->hasChanged('quantity')
  276.                 ) {
  277.                     $productIdBefore $changeSet->getBefore('product_id');
  278.                     if ($productIdBefore) {
  279.                         $productIds[] = bin2hex($productIdBefore);
  280.                     }
  281.                     $productIdAfter $changeSet->getAfter('product_id');
  282.                     if ($productIdAfter) {
  283.                         // $productIdAfter === null, when product_id was not changed
  284.                         $productIds[] = bin2hex($productIdAfter);
  285.                     }
  286.                 }
  287.             }
  288.         }
  289.         $productIds array_values(array_filter(array_unique($productIds)));
  290.         $this->recalculateProductReservedStock($productIds);
  291.     }
  292.     /**
  293.      * @param string[] $productIds
  294.      */
  295.     public function recalculateProductReservedStock(array $productIds): void
  296.     {
  297.         if (!$this->bulkInsertWithUpdate) {
  298.             // The property was made optional for backwards compatibility in the constructor. Should not happen
  299.             // during an actual request. Return early.
  300.             return;
  301.         }
  302.         if (count($productIds) === 0) {
  303.             return;
  304.         }
  305.         // By splitting the SELECT and the UPDATE query we work around a performance problem. If the queries were
  306.         // executed in one UPDATE ... JOIN query the query time would rise unexpectedly.
  307.         RetryableTransaction::retryable($this->db, function () use ($productIds): void {
  308.             $existingProductIds $this->db->fetchFirstColumn(
  309.                 'SELECT `id` FROM `product` WHERE `id` IN (:productIds) FOR UPDATE',
  310.                 ['productIds' => array_map('hex2bin'$productIds)],
  311.                 ['productIds' => Connection::PARAM_STR_ARRAY],
  312.             );
  313.             $this->pickwareProductInitializer->ensurePickwareProductsExist($productIds);
  314.             $pickwareProductReservedStocks $this->db->fetchAllAssociative(
  315.                 'SELECT
  316.                     `pickware_product`.`id` AS `id`,
  317.                     `product`.`id` AS `product_id`,
  318.                     `product`.`version_id` AS `product_version_id`,
  319.                     SUM(
  320.                         GREATEST(0, IFNULL(`order_line_item`.`quantity`, 0) - IFNULL(`stock`.`quantity`, 0))
  321.                     ) AS `reserved_stock`,
  322.                     NOW(3) as `updated_at`,
  323.                     NOW(3) as `created_at`
  324.                 FROM `product`
  325.                 INNER JOIN `order_line_item`
  326.                     ON `order_line_item`.`product_id` = `product`.`id`
  327.                     AND `order_line_item`.`product_version_id` = `product`.`version_id`
  328.                     AND `order_line_item`.`version_id` = :liveVersionId
  329.                     AND `order_line_item`.`type` = :orderLineItemTypeProduct
  330.                 INNER JOIN `order`
  331.                     ON `order`.`id` = `order_line_item`.`order_id`
  332.                     AND `order`.`version_id` = `order_line_item`.`order_version_id`
  333.                     AND `order`.`version_id` = :liveVersionId
  334.                 INNER JOIN `state_machine_state` AS `order_state`
  335.                     ON `order`.`state_id` = `order_state`.`id`
  336.                 LEFT JOIN `pickware_erp_stock` AS `stock`
  337.                     ON `product`.`id` = `stock`.`product_id`
  338.                     AND `product`.`version_id` = `stock`.`product_version_id`
  339.                     AND `order`.`id` = `stock`.`order_id`
  340.                     AND `order`.`version_id` = `stock`.`order_version_id`
  341.                 LEFT JOIN `pickware_erp_pickware_product` AS `pickware_product`
  342.                     ON `product`.`id` = `pickware_product`.`product_id`
  343.                     AND `product`.`version_id` = `pickware_product`.`product_version_id`
  344.                 -- Select a single order delivery with the highest shippingCosts.unitPrice as the primary order
  345.                 -- delivery for the order. This selection strategy is adapted from how order deliveries are selected
  346.                 -- in the administration. See /administration/src/module/sw-order/view/sw-order-detail-base/index.js
  347.                 LEFT JOIN (
  348.                     SELECT
  349.                         `order_id`,
  350.                         `order_version_id`,
  351.                         MAX(
  352.                             CAST(JSON_UNQUOTE(
  353.                                 JSON_EXTRACT(`order_delivery`.`shipping_costs`, "$.unitPrice")
  354.                             ) AS DECIMAL)
  355.                         ) AS `unitPrice`
  356.                     FROM `order_delivery`
  357.                     INNER JOIN `order`
  358.                         ON `order_delivery`.`order_id` = `order`.`id`
  359.                         AND `order_delivery`.`order_version_id` = `order`.`version_id`
  360.                     INNER JOIN `state_machine_state` AS `order_state`
  361.                         ON `order`.`state_id` = `order_state`.`id`
  362.                     WHERE `order_state`.`technical_name` IN (:orderStates)
  363.                     GROUP BY `order_id`, `order_version_id`
  364.                 ) `primary_order_delivery_shipping_cost`
  365.                     ON `primary_order_delivery_shipping_cost`.`order_id` = `order`.`id`
  366.                     AND `primary_order_delivery_shipping_cost`.`order_version_id` = `order`.`version_id`
  367.                 LEFT JOIN `order_delivery` as `primary_order_delivery`
  368.                     ON `primary_order_delivery`.`order_version_id` = `order`.`version_id`
  369.                     AND `primary_order_delivery`.`id` = (
  370.                         SELECT `id`
  371.                         FROM `order_delivery`
  372.                         WHERE `order_delivery`.`order_id` = `order`.`id`
  373.                         AND `order_delivery`.`order_version_id` = `order`.`version_id`
  374.                         AND CAST(JSON_UNQUOTE(JSON_EXTRACT(`order_delivery`.`shipping_costs`, "$.unitPrice")) AS DECIMAL) = `primary_order_delivery_shipping_cost`.`unitPrice`
  375.                         -- Add LIMIT 1 here because this join would join multiple deliveries if they are tied for the
  376.                         -- primary order delivery (i.e. multiple order delivery have the same highest shipping cost).
  377.                         LIMIT 1
  378.                     )
  379.                 LEFT JOIN `state_machine_state` AS `order_delivery_state`
  380.                     ON `order_delivery_state`.`id` = `primary_order_delivery`.`state_id`
  381.                 WHERE
  382.                     -- The following two conditions are performance optimizations
  383.                     `product`.`id` IN (:productIds)
  384.                     AND `product`.`version_id` = :liveVersionId
  385.                     AND `order_state`.`technical_name` IN (:orderStates)
  386.                     -- Order deliveries do not have to exist starting with SW6.4.19.0 when digital products were introduced.
  387.                     -- In such a case only the order state should determine if the order reserves stock or not.
  388.                     AND (`order_delivery_state`.`technical_name` IS NULL OR `order_delivery_state`.`technical_name` IN (:orderDeliveryStates))
  389.                 GROUP BY
  390.                      `product`.`id`,
  391.                      `product`.`version_id`',
  392.                 [
  393.                     'orderStates' => [
  394.                         OrderStates::STATE_OPEN,
  395.                         OrderStates::STATE_IN_PROGRESS,
  396.                     ],
  397.                     'orderDeliveryStates' => [
  398.                         OrderDeliveryStates::STATE_OPEN,
  399.                         OrderDeliveryStates::STATE_PARTIALLY_SHIPPED,
  400.                     ],
  401.                     'liveVersionId' => hex2bin(Defaults::LIVE_VERSION),
  402.                     'productIds' => array_map('hex2bin'$productIds),
  403.                     'orderLineItemTypeProduct' => LineItem::PRODUCT_LINE_ITEM_TYPE,
  404.                 ],
  405.                 [
  406.                     'orderStates' => Connection::PARAM_STR_ARRAY,
  407.                     'orderDeliveryStates' => Connection::PARAM_STR_ARRAY,
  408.                     'productIds' => Connection::PARAM_STR_ARRAY,
  409.                 ],
  410.             );
  411.             // Set existing but not updated products to 0 reserved stock
  412.             $updatedProductIds array_unique(array_map(
  413.                 fn ($row) => $row['product_id'],
  414.                 $pickwareProductReservedStocks,
  415.             ));
  416.             $nonUpdatedProductIds array_diff($existingProductIds$updatedProductIds);
  417.             $this->bulkInsertWithUpdate->insertOnDuplicateKeyUpdate(
  418.                 'pickware_erp_pickware_product',
  419.                 array_map(
  420.                     fn ($nonUpdatedProductId) => [
  421.                         'id' => Uuid::randomBytes(),
  422.                         'product_id' => $nonUpdatedProductId,
  423.                         'product_version_id' => hex2bin(Defaults::LIVE_VERSION),
  424.                         'reserved_stock' => 0,
  425.                         'updated_at' => (new DateTime())->format('Y-m-d H:i:s'),
  426.                         'created_at' => (new DateTime())->format('Y-m-d H:i:s'),
  427.                     ],
  428.                     $nonUpdatedProductIds,
  429.                 ),
  430.                 [],
  431.                 ['reserved_stock'],
  432.             );
  433.             // While testing optimizations on a larger shop system we saw that 5000 is a batch size which has great
  434.             // performance while also having a size large enough that smaller shops can update everything in one go to
  435.             // not waste performance on those systems.
  436.             // Further references: https://github.com/pickware/shopware-plugins/issues/3324 and linked tickets
  437.             $batches array_chunk($pickwareProductReservedStocks5000);
  438.             foreach ($batches as $batch) {
  439.                 $this->bulkInsertWithUpdate->insertOnDuplicateKeyUpdate(
  440.                     'pickware_erp_pickware_product',
  441.                     $batch,
  442.                     [],
  443.                     ['reserved_stock'],
  444.                 );
  445.             }
  446.             $this->eventDispatcher->dispatch(new ProductReservedStockUpdatedEvent($productIds));
  447.         });
  448.     }
  449. }