queue/notification/delivery refactor continued

This commit is contained in:
redmatrix 2015-12-16 22:42:33 -08:00
parent b909c878b1
commit 4cacfe59bd
5 changed files with 120 additions and 242 deletions

View file

@ -75,7 +75,7 @@ Some/many of these widgets have restrictions which may restrict the type of page
* suggestedchats - "interesting" chatrooms chosen for the current observer
* item - displays a single webpage item by mid
* args: mid - message_id of webpage to display
* args: mid - message_id of webpage to display (must be webpage, not a conversation item)
<br />&nbsp;<br />
* photo - display a single photo

View file

@ -16,7 +16,6 @@ function deliver_run($argv, $argc) {
logger('deliver: invoked: ' . print_r($argv,true), LOGGER_DATA);
for($x = 1; $x < $argc; $x ++) {
$dresult = null;
@ -25,76 +24,6 @@ function deliver_run($argv, $argc) {
);
if($r) {
/**
* Check to see if we have any recent communications with this hub (in the last month).
* If not, reduce the outq_priority.
*/
$base = '';
$h = parse_url($r[0]['outq_posturl']);
if($h) {
$base = $h['scheme'] . '://' . $h['host'] . (($h['port']) ? ':' . $h['port'] : '');
if($base !== z_root()) {
$y = q("select site_update, site_dead from site where site_url = '%s' ",
dbesc($base)
);
if($y) {
if(intval($y[0]['site_dead'])) {
remove_queue_by_posturl($r[0]['outq_posturl']);
logger('dead site ignored ' . $base);
continue;
}
if($y[0]['site_update'] < datetime_convert('UTC','UTC','now - 1 month')) {
update_queue_item($r[0]['outq_hash'],10);
logger('immediate delivery deferred for site ' . $base);
continue;
}
}
else {
// zot sites should all have a site record, unless they've been dead for as long as
// your site has existed. Since we don't know for sure what these sites are,
// call them unknown
q("insert into site (site_url, site_update, site_dead, site_type) values ('%s','%s',0,%d) ",
dbesc($base),
dbesc(datetime_convert()),
intval(($r[0]['outq_driver'] === 'post') ? SITE_TYPE_NOTZOT : SITE_TYPE_UNKNOWN)
);
}
}
}
// "post" queue driver - used for diaspora and friendica-over-diaspora communications.
if($r[0]['outq_driver'] === 'post') {
$result = z_post_url($r[0]['outq_posturl'],$r[0]['outq_msg']);
if($result['success'] && $result['return_code'] < 300) {
logger('deliver: queue post success to ' . $r[0]['outq_posturl'], LOGGER_DEBUG);
if($base) {
q("update site set site_update = '%s', site_dead = 0 where site_url = '%s' ",
dbesc(datetime_convert()),
dbesc($base)
);
}
q("update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s' limit 1",
dbesc('accepted for delivery'),
dbesc(datetime_convert()),
dbesc($argv[$x])
);
remove_queue_item($argv[$x]);
}
else {
logger('deliver: queue post returned ' . $result['return_code'] . ' from ' . $r[0]['outq_posturl'],LOGGER_DEBUG);
update_queue_item($argv[$x]);
}
continue;
}
$notify = json_decode($r[0]['outq_notify'],true);
// Messages without an outq_msg will need to go via the web, even if it's a
@ -118,7 +47,7 @@ function deliver_run($argv, $argc) {
$dresult = zot_import($msg,z_root());
}
remove_queue_item($argv[$x]);
remove_queue_item($r[0]['outq_hash']);
if($dresult && is_array($dresult)) {
foreach($dresult as $xx) {
@ -142,19 +71,11 @@ function deliver_run($argv, $argc) {
);
}
}
else {
logger('deliver: dest: ' . $r[0]['outq_posturl'], LOGGER_DEBUG);
$result = zot_zot($r[0]['outq_posturl'],$r[0]['outq_notify']);
if($result['success']) {
logger('deliver: remote zot delivery succeeded to ' . $r[0]['outq_posturl']);
zot_process_response($r[0]['outq_posturl'],$result, $r[0]);
}
else {
logger('deliver: remote zot delivery failed to ' . $r[0]['outq_posturl']);
logger('deliver: remote zot delivery fail data: ' . print_r($result,true), LOGGER_DATA);
update_queue_item($argv[$x],10);
}
}
// otherwise it's a remote delivery - call queue_deliver();
queue_deliver($r[0],true);
}
}
}

View file

@ -44,7 +44,6 @@ require_once('include/html2plain.php');
* expire (in items.php)
* like (in like.php, poke.php)
* mail (in message.php)
* suggest (in fsuggest.php)
* tag (in photos.php, poke.php, tagger.php)
* tgroup (in items.php)
* wall-new (in photos.php, item.php)
@ -52,6 +51,7 @@ require_once('include/html2plain.php');
* and ITEM_ID is the id of the item in the database that needs to be sent to others.
*
* ZOT
* permission_create abook_id
* permission_update abook_id
* refresh_all channel_id
* purge_all channel_id
@ -110,72 +110,8 @@ function notifier_run($argv, $argc){
}
if($cmd == 'permission_update' || $cmd == 'permission_create') {
// Get the recipient
$r = q("select * from abook left join xchan on abook_xchan = xchan_hash where abook_id = %d and abook_self = 0",
intval($item_id)
);
if($r) {
// Get the sender
$s = channelx_by_n($r[0]['abook_channel']);
if($s) {
$perm_update = array('sender' => $s, 'recipient' => $r[0], 'success' => false, 'deliveries' => '');
if($cmd == 'permission_create')
call_hooks('permissions_create',$perm_update);
else
call_hooks('permissions_update',$perm_update);
if($perm_update['success'] && $perm_update['deliveries'])
$deliveries[] = $perm_update['deliveries'];
if(! $perm_update['success']) {
// send a refresh message to each hub they have registered here
$h = q("select * from hubloc where hubloc_hash = '%s'
and hubloc_error = 0 and hubloc_deleted = 0",
dbesc($r[0]['hubloc_hash'])
);
if($h) {
foreach($h as $hh) {
if(in_array($hh['hubloc_url'],$dead_hubs)) {
logger('skipping dead hub: ' . $hh['hubloc_url'], LOGGER_DEBUG);
continue;
}
$data = zot_build_packet($s,'refresh',array(array(
'guid' => $hh['hubloc_guid'],
'guid_sig' => $hh['hubloc_guid_sig'],
'url' => $hh['hubloc_url'])
));
if($data) {
$hash = random_string();
queue_insert(array(
'hash' => $hash,
'account_id' => $s['channel_account_id'],
'channel_id' => $s['channel_id'],
'posturl' => $hh['hubloc_callback'],
'notify' => $data,
));
$deliveries[] = $hash;
}
}
}
}
if($deliveries)
do_delivery($deliveries);
}
}
return;
}
$expire = false;
$request = false;
$mail = false;
$fsuggest = false;
$top_level = false;
$location = false;
$recipients = array();
@ -224,51 +160,42 @@ function notifier_run($argv, $argc){
$packet_type = 'request';
$normal_mode = false;
}
elseif($cmd === 'expire') {
// FIXME
// This will require a special zot packet containing a list of item message_id's to be expired.
// This packet will be public, since we cannot selectively deliver here.
// We need the handling on this end to create the array, and the handling on the remote end
// to verify permissions (for each item) and process it. Until this is complete, the expire feature will be disabled.
return;
$normal_mode = false;
$expire = true;
$items = q("SELECT * FROM item WHERE uid = %d AND item_wall = 1
AND item_deleted = 1 AND `changed` > %s - INTERVAL %s",
intval($item_id),
db_utcnow(), db_quoteinterval('10 MINUTE')
);
$uid = $item_id;
$item_id = 0;
if(! $items)
return;
}
elseif($cmd === 'suggest') {
$normal_mode = false;
$fsuggest = true;
$suggest = q("SELECT * FROM `fsuggest` WHERE `id` = %d LIMIT 1",
elseif($cmd == 'permission_update' || $cmd == 'permission_create') {
// Get the (single) recipient
$r = q("select * from abook left join xchan on abook_xchan = xchan_hash where abook_id = %d and abook_self = 0",
intval($item_id)
);
if(! count($suggest))
return;
$uid = $suggest[0]['uid'];
$recipients[] = $suggest[0]['cid'];
$item = $suggest[0];
if($r) {
$uid = $r[0]['abook_channel'];
// Get the sender
$channel = channelx_by_n($uid);
if($channel) {
$perm_update = array('sender' => $channel, 'recipient' => $r[0], 'success' => false, 'deliveries' => '');
if($cmd == 'permission_create')
call_hooks('permissions_create',$perm_update);
else
call_hooks('permissions_update',$perm_update);
if($perm_update['success']) {
if($perm_update['deliveries']) {
$deliveries[] = $perm_update['deliveries'];
do_delivery($deliveries);
}
return;
}
else {
$recipients[] = $r[0]['abook_xchan'];
$private = false;
$packet_type = 'refresh';
}
}
}
}
elseif($cmd === 'refresh_all') {
logger('notifier: refresh_all: ' . $item_id);
$s = q("select * from channel where channel_id = %d limit 1",
intval($item_id)
);
if($s)
$channel = $s[0];
$uid = $item_id;
$recipients = array();
$channel = channelx_by_n($item_id);
$r = q("select abook_xchan from abook where abook_channel = %d",
intval($item_id)
);
@ -592,10 +519,8 @@ function notifier_run($argv, $argc){
'relay_to_owner' => $relay_to_owner,
'uplink' => $uplink,
'cmd' => $cmd,
'expire' => $expire,
'mail' => $mail,
'location' => $location,
'fsuggest' => $fsuggest,
'request' => $request,
'normal_mode' => $normal_mode,
'packet_type' => $packet_type,

View file

@ -18,11 +18,8 @@ function queue_run($argv, $argc){
else
$queue_id = 0;
$deadguys = array();
logger('queue: start');
// delete all queue items more than 3 days old
// but first mark these sites dead if we haven't heard from them in a month
@ -88,53 +85,7 @@ function queue_run($argv, $argc){
return;
foreach($r as $rr) {
$dresult = null;
if(in_array($rr['outq_posturl'],$deadguys))
continue;
$base = '';
$h = parse_url($rr['outq_posturl']);
if($h)
$base = $h['scheme'] . '://' . $h['host'] . (($h['port']) ? ':' . $h['port'] : '');
if($rr['outq_driver'] === 'post') {
$result = z_post_url($rr['outq_posturl'],$rr['outq_msg']);
if($result['success'] && $result['return_code'] < 300) {
logger('queue: queue post success to ' . $rr['outq_posturl'], LOGGER_DEBUG);
if($base) {
q("update site set site_update = '%s', site_dead = 0 where site_url = '%s' ",
dbesc(datetime_convert()),
dbesc($base)
);
}
q("update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s' limit 1",
dbesc('accepted for delivery'),
dbesc(datetime_convert()),
dbesc($rr['outq_hash'])
);
remove_queue_item($rr['outq_hash']);
}
else {
logger('queue: queue post returned ' . $result['return_code'] . ' from ' . $rr['outq_posturl'],LOGGER_DEBUG);
update_queue_item($rr['outq_hash'],10);
$deadguys[] = $rr['outq_posturl'];
}
continue;
}
$result = zot_zot($rr['outq_posturl'],$rr['outq_notify']);
if($result['success']) {
logger('queue: deliver zot success to ' . $rr['outq_posturl'], LOGGER_DEBUG);
zot_process_response($rr['outq_posturl'],$result, $rr);
}
else {
$deadguys[] = $rr['outq_posturl'];
logger('queue: deliver zot returned ' . $result['return_code'] . ' from ' . $rr['outq_posturl'],LOGGER_DEBUG);
update_queue_item($rr['outq_hash'],10);
}
queue_deliver($rr);
}
}

View file

@ -62,3 +62,84 @@ function queue_insert($arr) {
}
function queue_deliver($outq, $immediate = false) {
$base = null;
$h = parse_url($outq['outq_posturl']);
if($h)
$base = $h['scheme'] . '://' . $h['host'] . (($h['port']) ? ':' . $h['port'] : '');
if(($base) && ($base !== z_root()) && ($immediate)) {
$y = q("select site_update, site_dead from site where site_url = '%s' ",
dbesc($base)
);
if($y) {
if(intval($y[0]['site_dead'])) {
remove_queue_by_posturl($outq['outq_posturl']);
logger('dead site ignored ' . $base);
return;
}
if($y[0]['site_update'] < datetime_convert('UTC','UTC','now - 1 month')) {
update_queue_item($outq['outq_hash'],10);
logger('immediate delivery deferred for site ' . $base);
return;
}
}
else {
// zot sites should all have a site record, unless they've been dead for as long as
// your site has existed. Since we don't know for sure what these sites are,
// call them unknown
q("insert into site (site_url, site_update, site_dead, site_type) values ('%s','%s',0,%d) ",
dbesc($base),
dbesc(datetime_convert()),
intval(($outq['outq_driver'] === 'post') ? SITE_TYPE_NOTZOT : SITE_TYPE_UNKNOWN)
);
}
}
// "post" queue driver - used for diaspora and friendica-over-diaspora communications.
if($outq['outq_driver'] === 'post') {
$result = z_post_url($outq['outq_posturl'],$outq['outq_msg']);
if($result['success'] && $result['return_code'] < 300) {
logger('deliver: queue post success to ' . $outq['outq_posturl'], LOGGER_DEBUG);
if($base) {
q("update site set site_update = '%s', site_dead = 0 where site_url = '%s' ",
dbesc(datetime_convert()),
dbesc($base)
);
}
q("update dreport set dreport_result = '%s', dreport_time = '%s' where dreport_queue = '%s' limit 1",
dbesc('accepted for delivery'),
dbesc(datetime_convert()),
dbesc($outq['outq_hash'])
);
remove_queue_item($outq['outq_hash']);
}
else {
logger('deliver: queue post returned ' . $result['return_code']
. ' from ' . $outq['outq_posturl'],LOGGER_DEBUG);
update_queue_item($argv[$x]);
}
return;
}
// normal zot delivery
logger('deliver: dest: ' . $outq['outq_posturl'], LOGGER_DEBUG);
$result = zot_zot($outq['outq_posturl'],$outq['outq_notify']);
if($result['success']) {
logger('deliver: remote zot delivery succeeded to ' . $outq['outq_posturl']);
zot_process_response($outq['outq_posturl'],$result, $outq);
}
else {
logger('deliver: remote zot delivery failed to ' . $outq['outq_posturl']);
logger('deliver: remote zot delivery fail data: ' . print_r($result,true), LOGGER_DATA);
update_queue_item($outq['outq_hash'],10);
}
return;
}