Alter the queue so that each queue item stores the scheduled time of the next delivery. This keeps the query for

queued items simple. We no longer group by posturl; as the queue update function will only keep one item per destination
scheduled for shorter term processing. Others (multiple queued items for a single destination) will be scheduled for
delivery far into the future and only delivered if the hub responds to the "active" or short term queue item.
This commit is contained in:
zotlabs 2017-01-29 14:45:25 -08:00
parent 5aa0017e91
commit d5d67708ac
6 changed files with 70 additions and 26 deletions

View file

@ -61,30 +61,15 @@ class Queue {
// the site is permanently down, there's no reason to attempt delivery at all, or at most not more than once
// or twice a day.
// FIXME: can we sort postgres on outq_priority and maintain the 'distinct' ?
// The order by max(outq_priority) might be a dodgy query because of the group by.
// The desired result is to return a sequence in the order most likely to be delivered in this run.
// If a hub has already been sitting in the queue for a few days, they should be delivered last;
// hence every failure should drop them further down the priority list.
if(ACTIVE_DBTYPE == DBTYPE_POSTGRES) {
$prefix = 'DISTINCT ON (outq_posturl)';
$suffix = 'ORDER BY outq_posturl';
} else {
$prefix = '';
$suffix = 'GROUP BY outq_posturl ORDER BY max(outq_priority)';
}
$r = q("SELECT $prefix * FROM outq WHERE outq_delivered = 0 and (( outq_created > %s - INTERVAL %s and outq_updated < %s - INTERVAL %s ) OR ( outq_updated < %s - INTERVAL %s )) $suffix",
db_utcnow(), db_quoteinterval('12 HOUR'),
db_utcnow(), db_quoteinterval('15 MINUTE'),
db_utcnow(), db_quoteinterval('1 HOUR')
$r = q("SELECT * FROM outq WHERE outq_delivered = 0 and outq_scheduled < '%s' ",
db_utcnow()
);
}
if(! $r)
return;
foreach($r as $rr) {
queue_deliver($rr);
foreach($r as $rv) {
queue_deliver($rv);
}
}
}

View file

@ -52,7 +52,7 @@ define ( 'PLATFORM_NAME', 'hubzilla' );
define ( 'STD_VERSION', '2.1' );
define ( 'ZOT_REVISION', '1.2' );
define ( 'DB_UPDATE_VERSION', 1187 );
define ( 'DB_UPDATE_VERSION', 1188 );
define ( 'PROJECT_BASE', __DIR__ );

View file

@ -2,9 +2,42 @@
function update_queue_item($id, $add_priority = 0) {
logger('queue: requeue item ' . $id,LOGGER_DEBUG);
q("UPDATE outq SET outq_updated = '%s', outq_priority = outq_priority + %d WHERE outq_hash = '%s'",
$x = q("select outq_created, outq_posturl from outq where outq_hash = '%s' limit 1",
dbesc($id)
);
if(! $x)
return;
// Set all other records for this destination way into the future.
// The queue delivers by destination. We'll keep one queue item for
// this destination (this one) with a shorter delivery. If we succeed
// once, we'll try to deliver everything for that destination.
// The delivery will be set to at most once per hour, and if the
// queue item is less than 12 hours old, we'll schedule for fifteen
// minutes.
$r = q("UPDATE outq SET outq_scheduled = '%s' WHERE outq_posturl = '%s'",
dbesc(datetime_convert('UTC','UTC','now + 5 days')),
dbesc($x[0]['outq_posturl'])
);
$since = datetime_convert('UTC','UTC',$x[0]['outq_created']);
if($since < datetime_convert('UTC','UTC','now - 12 hour')) {
$next = datetime_convert('UTC','UTC','now + 1 hour');
}
else {
$next = datetime_convert('UTC','UTC','now + 15 minutes');
}
q("UPDATE outq SET outq_updated = '%s',
outq_priority = outq_priority + %d,
outq_scheduled = '%s'
WHERE outq_hash = '%s'",
dbesc(datetime_convert()),
intval($add_priority),
dbesc($next),
dbesc($id)
);
}
@ -33,8 +66,12 @@ function queue_set_delivered($id,$channel = 0) {
logger('queue: set delivered ' . $id,LOGGER_DEBUG);
$sql_extra = (($channel_id) ? " and outq_channel = " . intval($channel_id) . " " : '');
q("update outq set outq_delivered = 1, outq_updated = '%s' where outq_hash = '%s' $sql_extra ",
// Set the next scheduled run date so far in the future that it will be expired
// long before it ever makes it back into the delivery chain.
q("update outq set outq_delivered = 1, outq_updated = '%s', outq_scheduled = '%s' where outq_hash = '%s' $sql_extra ",
dbesc(datetime_convert()),
dbesc(datetime_convert('UTC','UTC','now + 5 days')),
dbesc($id)
);
}
@ -44,8 +81,8 @@ function queue_set_delivered($id,$channel = 0) {
function queue_insert($arr) {
$x = q("insert into outq ( outq_hash, outq_account, outq_channel, outq_driver, outq_posturl, outq_async, outq_priority,
outq_created, outq_updated, outq_notify, outq_msg )
values ( '%s', %d, %d, '%s', '%s', %d, %d, '%s', '%s', '%s', '%s' )",
outq_created, outq_updated, outq_scheduled, outq_notify, outq_msg )
values ( '%s', %d, %d, '%s', '%s', %d, %d, '%s', '%s', '%s', '%s', '%s' )",
dbesc($arr['hash']),
intval($arr['account_id']),
intval($arr['channel_id']),
@ -55,6 +92,7 @@ function queue_insert($arr) {
intval(($arr['priority']) ? $arr['priority'] : 0),
dbesc(datetime_convert()),
dbesc(datetime_convert()),
dbesc(datetime_convert()),
dbesc($arr['notify']),
dbesc(($arr['msg']) ? $arr['msg'] : '')
);

View file

@ -898,6 +898,7 @@ CREATE TABLE IF NOT EXISTS `outq` (
`outq_delivered` tinyint(1) NOT NULL DEFAULT '0',
`outq_created` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
`outq_updated` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
`outq_scheduled` datetime NOT NULL DEFAULT '0001-01-01 00:00:00',
`outq_notify` mediumtext NOT NULL,
`outq_msg` mediumtext NOT NULL,
`outq_priority` smallint(6) NOT NULL DEFAULT '0',
@ -907,6 +908,7 @@ CREATE TABLE IF NOT EXISTS `outq` (
KEY `outq_hub` (`outq_posturl`),
KEY `outq_created` (`outq_created`),
KEY `outq_updated` (`outq_updated`),
KEY `outq_scheduled` (`outq_scheduled`),
KEY `outq_async` (`outq_async`),
KEY `outq_delivered` (`outq_delivered`),
KEY `outq_priority` (`outq_priority`)

View file

@ -884,6 +884,7 @@ CREATE TABLE "outq" (
"outq_delivered" numeric(1) NOT NULL DEFAULT '0',
"outq_created" timestamp NOT NULL DEFAULT '0001-01-01 00:00:00',
"outq_updated" timestamp NOT NULL DEFAULT '0001-01-01 00:00:00',
"outq_scheduled" timestamp NOT NULL DEFAULT '0001-01-01 00:00:00',
"outq_notify" text NOT NULL,
"outq_msg" text NOT NULL,
"outq_priority" smallint NOT NULL DEFAULT '0',
@ -894,6 +895,7 @@ create index "outq_channel" on outq ("outq_channel");
create index "outq_hub" on outq ("outq_posturl");
create index "outq_created" on outq ("outq_created");
create index "outq_updated" on outq ("outq_updated");
create index "outq_scheduled" on outq ("outq_scheduled");
create index "outq_async" on outq ("outq_async");
create index "outq_delivered" on outq ("outq_delivered");
create index "outq_priority" on outq ("outq_priority");

View file

@ -1,6 +1,6 @@
<?php
define( 'UPDATE_VERSION' , 1187 );
define( 'UPDATE_VERSION' , 1188 );
/**
*
@ -2484,11 +2484,28 @@ function update_r1185() {
function update_r1186() {
$r1 = q("alter table profile add profile_vcard text not null default '' ");
$r1 = q("alter table profile add profile_vcard text not null");
if($r1)
return UPDATE_SUCCESS;
return UPDATE_FAILED;
}
function update_r1187() {
if(ACTIVE_DBTYPE == DBTYPE_POSTGRES) {
$r1 = q("alter table outq add outq_scheduled timestamp not null default '0001-01-01 00:00:00' ");
}
else {
$r1 = q("alter table outq add outq_scheduled datetime not null default '0001-01-01 00:00:00' ");
}
$r2 = q("create index outq_scheduled_idx on outq (outq_scheduled)");
if($r1 && $r2)
return UPDATE_SUCCESS;
return UPDATE_FAILED;
}