diff --git a/src/Core/Worker/Cron.php b/src/Core/Worker/Cron.php index 21f67c030b..de6a808aca 100644 --- a/src/Core/Worker/Cron.php +++ b/src/Core/Worker/Cron.php @@ -204,32 +204,31 @@ class Cron */ private static function deliverPosts() { - $deliveries = DBA::p("SELECT `gsid`, MAX(`failed`) AS `failed` FROM `delivery-queue` GROUP BY `gsid` ORDER BY RAND()"); - while ($delivery = DBA::fetch($deliveries)) { - if ($delivery['failed'] > 0) { - Logger::info('Removing failed deliveries', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed']]); - Delivery::removeFailedQueue($delivery['gsid']); + foreach(DI::deliveryQueueItemRepo()->selectAggregateByServerId() as $delivery) { + if ($delivery->failed > 0) { + Logger::info('Removing failed deliveries', ['gsid' => $delivery->targetServerId, 'failed' => $delivery->failed]); + DI::deliveryQueueItemRepo()->removeFailedByServerId($delivery->targetServerId, DI::config()->get('system', 'worker_defer_limit')); } - if (($delivery['failed'] < 3) || GServer::isReachableById($delivery['gsid'])) { + if (($delivery->failed < 3) || GServer::isReachableById($delivery->targetServerId)) { $priority = Worker::PRIORITY_HIGH; - } elseif ($delivery['failed'] < 6) { + } elseif ($delivery->failed < 6) { $priority = Worker::PRIORITY_MEDIUM; - } elseif ($delivery['failed'] < 8) { + } elseif ($delivery->failed < 8) { $priority = Worker::PRIORITY_LOW; } else { $priority = Worker::PRIORITY_NEGLIGIBLE; } - if (Worker::add(['priority' => $priority, 'force_priority' => true], 'BulkDelivery', $delivery['gsid'])) { - Logger::info('Priority for BulkDelivery worker adjusted', ['gsid' => $delivery['gsid'], 'failed' => $delivery['failed'], 'priority' => $priority]); + if (Worker::add(['priority' => $priority, 'force_priority' => true], 'BulkDelivery', $delivery->targetServerId)) { + Logger::info('Priority for BulkDelivery worker adjusted', ['gsid' => $delivery->targetServerId, 'failed' => $delivery->failed, 'priority' => $priority]); } } // Optimizing this table only last seconds if (DI::config()->get('system', 'optimize_tables')) { Logger::info('Optimize start'); - DBA::e("OPTIMIZE TABLE `delivery-queue`"); + DI::deliveryQueueItemRepo()->optimizeStorage(); Logger::info('Optimize end'); } } diff --git a/src/DI.php b/src/DI.php index 261455385a..0ef18a1aa3 100644 --- a/src/DI.php +++ b/src/DI.php @@ -602,6 +602,20 @@ abstract class DI return self::$dice->create(Navigation\Notifications\Factory\FormattedNavNotification::class); } + // + // "Federation" namespace instances + // + + public static function deliveryQueueItemFactory(): Federation\Factory\DeliveryQueueItem + { + return self::$dice->create(Federation\Factory\DeliveryQueueItem::class); + } + + public static function deliveryQueueItemRepo(): Federation\Repository\DeliveryQueueItem + { + return self::$dice->create(Federation\Repository\DeliveryQueueItem::class); + } + // // "Protocol" namespace instances // diff --git a/src/Database/DBA.php b/src/Database/DBA.php index 2e4fb37671..ffeecfaa71 100644 --- a/src/Database/DBA.php +++ b/src/Database/DBA.php @@ -517,7 +517,7 @@ class DBA * Build the table query substring from one or more tables, with or without a schema. * * Expected formats: - * - table + * - [table] * - [table1, table2, ...] * - [schema1 => table1, schema2 => table2, table3, ...] * diff --git a/src/Federation/Collection/DeliveryQueueAggregates.php b/src/Federation/Collection/DeliveryQueueAggregates.php new file mode 100644 index 0000000000..60f07675af --- /dev/null +++ b/src/Federation/Collection/DeliveryQueueAggregates.php @@ -0,0 +1,44 @@ +. + * + */ + +namespace Friendica\Federation\Collection; + +use Friendica\Federation\Entity; + +final class DeliveryQueueAggregates extends \Friendica\BaseCollection +{ + /** + * @param Entity\DeliveryQueueAggregate[] $entities + * @param int|null $totalCount + */ + public function __construct(array $entities = [], int $totalCount = null) + { + parent::__construct($entities, $totalCount); + } + + /** + * @return Entity\DeliveryQueueAggregate + */ + public function current(): Entity\DeliveryQueueAggregate + { + return parent::current(); + } +} diff --git a/src/Federation/Collection/DeliveryQueueItems.php b/src/Federation/Collection/DeliveryQueueItems.php new file mode 100644 index 0000000000..a41f7b07cc --- /dev/null +++ b/src/Federation/Collection/DeliveryQueueItems.php @@ -0,0 +1,44 @@ +. + * + */ + +namespace Friendica\Federation\Collection; + +use Friendica\Federation\Entity; + +final class DeliveryQueueItems extends \Friendica\BaseCollection +{ + /** + * @param Entity\DeliveryQueueItem[] $entities + * @param int|null $totalCount + */ + public function __construct(array $entities = [], int $totalCount = null) + { + parent::__construct($entities, $totalCount); + } + + /** + * @return Entity\DeliveryQueueItem + */ + public function current(): Entity\DeliveryQueueItem + { + return parent::current(); + } +} diff --git a/src/Federation/Entity/DeliveryQueueAggregate.php b/src/Federation/Entity/DeliveryQueueAggregate.php new file mode 100644 index 0000000000..9be4b0e739 --- /dev/null +++ b/src/Federation/Entity/DeliveryQueueAggregate.php @@ -0,0 +1,40 @@ +. + * + */ + +namespace Friendica\Federation\Entity; + +/** + * @property-read int $targetServerId + * @property-read int $failed Maximum number of delivery failures among the delivery queue items targeting the server + */ +final class DeliveryQueueAggregate extends \Friendica\BaseEntity +{ + /** @var int */ + protected $targetServerId; + /** @var int */ + protected $failed; + + public function __construct(int $targetServerId, int $failed) + { + $this->targetServerId = $targetServerId; + $this->failed = $failed; + } +} diff --git a/src/Federation/Entity/DeliveryQueueItem.php b/src/Federation/Entity/DeliveryQueueItem.php new file mode 100644 index 0000000000..aa9cd5b831 --- /dev/null +++ b/src/Federation/Entity/DeliveryQueueItem.php @@ -0,0 +1,62 @@ +. + * + */ + +namespace Friendica\Federation\Entity; + +use DateTimeImmutable; + +/** + * @property-read int $targetServerId + * @property-read int $postUriId + * @property-read DateTimeImmutable $created + * @property-read string $command One of the Protocol\Delivery command constant values + * @property-read int $targetContactId + * @property-read int $senderUserId + * @property-read int $failed Number of delivery failures for this post and target server + */ +final class DeliveryQueueItem extends \Friendica\BaseEntity +{ + /** @var int */ + protected $targetServerId; + /** @var int */ + protected $postUriId; + /** @var DateTimeImmutable */ + protected $created; + /** @var string */ + protected $command; + /** @var int */ + protected $targetContactId; + /** @var int */ + protected $senderUserId; + /** @var int */ + protected $failed; + + public function __construct(int $targetServerId, int $postUriId, DateTimeImmutable $created, string $command, int $targetContactId, int $senderUserId, int $failed = 0) + { + $this->targetServerId = $targetServerId; + $this->postUriId = $postUriId; + $this->created = $created; + $this->command = $command; + $this->targetContactId = $targetContactId; + $this->senderUserId = $senderUserId; + $this->failed = $failed; + } +} diff --git a/src/Federation/Factory/DeliveryQueueItem.php b/src/Federation/Factory/DeliveryQueueItem.php new file mode 100644 index 0000000000..2559e81650 --- /dev/null +++ b/src/Federation/Factory/DeliveryQueueItem.php @@ -0,0 +1,55 @@ +. + * + */ + +namespace Friendica\Federation\Factory; + +use Friendica\Federation\Entity; + +final class DeliveryQueueItem extends \Friendica\BaseFactory implements \Friendica\Capabilities\ICanCreateFromTableRow +{ + /** + * @inheritDoc + */ + public function createFromTableRow(array $row): Entity\DeliveryQueueItem + { + return new Entity\DeliveryQueueItem( + $row['gsid'], + $row['uri-id'], + new \DateTimeImmutable($row['created']), + $row['command'], + $row['cid'], + $row['uid'], + $row['failed'] + ); + } + + public function createFromDelivery(string $cmd, int $uri_id, \DateTimeImmutable $created, int $cid, int $gsid, int $uid): Entity\DeliveryQueueItem + { + return new Entity\DeliveryQueueItem( + $gsid, + $uri_id, + $created, + $cmd, + $cid, + $uid + ); + } +} diff --git a/src/Federation/Repository/DeliveryQueueItem.php b/src/Federation/Repository/DeliveryQueueItem.php new file mode 100644 index 0000000000..815cf89b50 --- /dev/null +++ b/src/Federation/Repository/DeliveryQueueItem.php @@ -0,0 +1,113 @@ +. + * + */ + +namespace Friendica\Federation\Repository; + +use Friendica\Database\Database; +use Friendica\Database\DBA; +use Friendica\Federation\Collection; +use Friendica\Federation\Entity; +use Friendica\Federation\Factory; +use Friendica\Util\DateTimeFormat; +use Psr\Log\LoggerInterface; + +final class DeliveryQueueItem extends \Friendica\BaseRepository +{ + protected static $table_name = 'delivery-queue'; + + public function __construct(Database $database, LoggerInterface $logger, Factory\DeliveryQueueItem $factory) + { + parent::__construct($database, $logger, $factory); + } + + public function selectByServerId(int $gsid, int $maxFailedCount): Collection\DeliveryQueueItems + { + $Entities = new Collection\DeliveryQueueItems(); + + $deliveryQueueItems = $this->db->select( + self::$table_name, + [], + ["`gsid` = ? AND `failed` < ?", $gsid, $maxFailedCount], + ['order' => ['created']] + ); + while ($deliveryQueueItem = $this->db->fetch($deliveryQueueItems)) { + $Entities[] = $this->factory->createFromTableRow($deliveryQueueItem); + } + + $this->db->close($deliveryQueueItems); + + return $Entities; + } + + public function selectAggregateByServerId(): Collection\DeliveryQueueAggregates + { + $Entities = new Collection\DeliveryQueueAggregates(); + + $deliveryQueueAggregates = $this->db->p("SELECT `gsid`, MAX(`failed`) AS `failed` FROM " . DBA::buildTableString([self::$table_name]) . " GROUP BY `gsid` ORDER BY RAND()"); + while ($deliveryQueueAggregate = $this->db->fetch($deliveryQueueAggregates)) { + $Entities[] = new Entity\DeliveryQueueAggregate($deliveryQueueAggregate['gsid'], $deliveryQueueAggregate['failed']); + } + + $this->db->close($deliveryQueueAggregates); + + return $Entities; + } + + public function save(Entity\DeliveryQueueItem $deliveryQueueItem) + { + $fields = [ + 'gsid' => $deliveryQueueItem->targetServerId, + 'uri-id' => $deliveryQueueItem->postUriId, + 'created' => $deliveryQueueItem->created->format(DateTimeFormat::MYSQL), + 'command' => $deliveryQueueItem->command, + 'cid' => $deliveryQueueItem->targetContactId, + 'uid' => $deliveryQueueItem->senderUserId, + 'failed' => $deliveryQueueItem->failed, + ]; + + $this->db->insert(self::$table_name, $fields, Database::INSERT_UPDATE); + } + + public function remove(Entity\DeliveryQueueItem $deliveryQueueItem): bool + { + return $this->db->delete(self::$table_name, ['uri-id' => $deliveryQueueItem->postUriId, 'gsid' => $deliveryQueueItem->targetServerId]); + } + + public function removeFailedByServerId(int $gsid, int $failedThreshold): bool + { + return $this->db->delete(self::$table_name, ["`gsid` = ? AND `failed` >= ?", $gsid, $failedThreshold]); + } + + public function incrementFailed(Entity\DeliveryQueueItem $deliveryQueueItem): bool + { + return $this->db->e(" + UPDATE " . DBA::buildTableString([self::$table_name]) . " + SET `failed` = `failed` + 1 + WHERE `uri-id` = ? AND `gsid` = ?", + $deliveryQueueItem->postUriId, $deliveryQueueItem->targetServerId + ); + } + + public function optimizeStorage(): bool + { + return $this->db->e("OPTIMIZE TABLE " . DBA::buildTableString([self::$table_name])); + } +} diff --git a/src/Protocol/Delivery.php b/src/Protocol/Delivery.php index e35433b0c9..a4fe943da5 100644 --- a/src/Protocol/Delivery.php +++ b/src/Protocol/Delivery.php @@ -606,68 +606,4 @@ class Delivery } return $success; } - - /** - * Add post for a server - * - * @param string $cmd - * @param integer $uri_id - * @param string $created - * @param integer $cid - * @param integer $gsid - * @param integer $uid - * @return bool - */ - public static function addQueue(string $cmd, int $uri_id, string $created, int $cid, int $gsid, int $uid): bool - { - $fields = ['uri-id' => $uri_id, 'uid' => $uid, 'cid' => $cid, 'gsid' => $gsid, 'created' => $created, 'command' => $cmd]; - - return DBA::insert('delivery-queue', $fields, Database::INSERT_IGNORE); - } - - /** - * Remove post by a server after delivery - * - * @param integer $uri_id - * @param integer $gsid - * @return bool - */ - public static function removeQueue(int $uri_id, int $gsid): bool - { - return DBA::delete('delivery-queue', ['uri-id' => $uri_id, 'gsid' => $gsid]); - } - - /** - * Remove failed posts for the given server - * - * @param integer $gsid - * @return bool - */ - public static function removeFailedQueue(int $gsid): bool - { - return DBA::delete('delivery-queue', ["`gsid` = ? AND `failed` >= ?", $gsid, DI::config()->get('system', 'worker_defer_limit')]); - } - - /** - * Increment "failed" counter for the given server and post - * - * @param integer $uri_id - * @param integer $gsid - * @return bool - */ - public static function incrementFailedQueue(int $uri_id, int $gsid): bool - { - return DBA::e('UPDATE `delivery-queue` SET `failed` = `failed` + 1 WHERE `uri-id` = ? AND `gsid` = ?', $uri_id, $gsid); - } - - /** - * Select queue entries for the given server - * - * @param integer $gsid - * @return array - */ - public static function selectQueueForServer(int $gsid): array - { - return DBA::selectToArray('delivery-queue', [], ["`gsid` = ? AND `failed` < ?", $gsid, DI::config()->get('system', 'worker_defer_limit')], ['order' => ['created']]); - } } diff --git a/src/Worker/BulkDelivery.php b/src/Worker/BulkDelivery.php index 5e335f7205..4ac77b2137 100644 --- a/src/Worker/BulkDelivery.php +++ b/src/Worker/BulkDelivery.php @@ -23,6 +23,7 @@ namespace Friendica\Worker; use Friendica\Core\Logger; use Friendica\Core\Worker; +use Friendica\DI; use Friendica\Model\GServer; use Friendica\Protocol\Delivery as ProtocolDelivery; @@ -33,19 +34,19 @@ class BulkDelivery $server_failure = false; $delivery_failure = false; - $posts = ProtocolDelivery::selectQueueForServer($gsid); - foreach ($posts as $post) { - if (!$server_failure && ProtocolDelivery::deliver($post['command'], $post['uri-id'], $post['cid'], $post['uid'])) { - ProtocolDelivery::removeQueue($post['uri-id'], $post['gsid']); - Logger::debug('Delivery successful', $post); + $deliveryQueueItems = DI::deliveryQueueItemRepo()->selectByServerId($gsid, DI::config()->get('system', 'worker_defer_limit')); + foreach ($deliveryQueueItems as $deliveryQueueItem) { + if (!$server_failure && ProtocolDelivery::deliver($deliveryQueueItem->command, $deliveryQueueItem->postUriId, $deliveryQueueItem->targetContactId, $deliveryQueueItem->senderUserId)) { + DI::deliveryQueueItemRepo()->remove($deliveryQueueItem); + Logger::debug('Delivery successful', $deliveryQueueItem->toArray()); } else { - ProtocolDelivery::incrementFailedQueue($post['uri-id'], $post['gsid']); + DI::deliveryQueueItemRepo()->incrementFailed($deliveryQueueItem); $delivery_failure = true; if (!$server_failure) { $server_failure = !GServer::isReachableById($gsid); } - Logger::debug('Delivery failed', ['server_failure' => $server_failure, 'post' => $post]); + Logger::debug('Delivery failed', ['server_failure' => $server_failure, 'post' => $deliveryQueueItem]); } } @@ -54,7 +55,7 @@ class BulkDelivery } if ($delivery_failure) { - ProtocolDelivery::removeFailedQueue($gsid); + DI::deliveryQueueItemRepo()->removeFailedByServerId($gsid, DI::config()->get('system', 'worker_defer_limit')); } } } diff --git a/src/Worker/Notifier.php b/src/Worker/Notifier.php index 0b7460c53d..2d21a0937c 100644 --- a/src/Worker/Notifier.php +++ b/src/Worker/Notifier.php @@ -592,7 +592,8 @@ class Notifier if (!empty($contact['gsid']) && DI::config()->get('system', 'bulk_delivery')) { $delivery_queue_count++; - Delivery::addQueue($cmd, $post_uriid, $target_item['created'], $contact['id'], $contact['gsid'], $sender_uid); + $deliveryQueueItem = DI::deliveryQueueItemFactory()->createFromDelivery($cmd, $post_uriid, new \DateTimeImmutable($target_item['created']), $contact['id'], $contact['gsid'], $sender_uid); + DI::deliveryQueueItemRepo()->save($deliveryQueueItem); Worker::add(['priority' => Worker::PRIORITY_HIGH, 'dont_fork' => true], 'BulkDelivery', $contact['gsid']); } else { if (Worker::add($deliver_options, 'Delivery', $cmd, $post_uriid, (int)$contact['id'], $sender_uid)) {