Fix broken cache for getting retry times. This meant we retried remote destinations way more frequently than we should

This commit is contained in:
Erik Johnston 2015-11-03 16:23:35 +00:00
parent 3eb62873f6
commit 7ce264ce5f
2 changed files with 40 additions and 55 deletions

View file

@ -202,19 +202,6 @@ class TransactionQueue(object):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def _attempt_new_transaction(self, destination): def _attempt_new_transaction(self, destination):
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
# request at which point pending_pdus_by_dest just keeps growing.
# we need application-layer timeouts of some flavour of these
# requests
logger.debug(
"TX [%s] Transaction already in progress",
destination
)
return
logger.debug("TX [%s] _attempt_new_transaction", destination)
# list of (pending_pdu, deferred, order) # list of (pending_pdu, deferred, order)
pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, [])
@ -228,20 +215,34 @@ class TransactionQueue(object):
logger.debug("TX [%s] Nothing to send", destination) logger.debug("TX [%s] Nothing to send", destination)
return return
# Sort based on the order field if destination in self.pending_transactions:
pending_pdus.sort(key=lambda t: t[2]) # XXX: pending_transactions can get stuck on by a never-ending
# request at which point pending_pdus_by_dest just keeps growing.
pdus = [x[0] for x in pending_pdus] # we need application-layer timeouts of some flavour of these
edus = [x[0] for x in pending_edus] # requests
failures = [x[0].get_dict() for x in pending_failures] logger.debug(
deferreds = [ "TX [%s] Transaction already in progress",
x[1] destination
for x in pending_pdus + pending_edus + pending_failures )
] return
# NOTE: Nothing should be between the above check and the insertion below
try: try:
self.pending_transactions[destination] = 1 self.pending_transactions[destination] = 1
logger.debug("TX [%s] _attempt_new_transaction", destination)
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[2])
pdus = [x[0] for x in pending_pdus]
edus = [x[0] for x in pending_edus]
failures = [x[0].get_dict() for x in pending_failures]
deferreds = [
x[1]
for x in pending_pdus + pending_edus + pending_failures
]
txn_id = str(self._next_txn_id) txn_id = str(self._next_txn_id)
limiter = yield get_retry_limiter( limiter = yield get_retry_limiter(

View file

@ -253,16 +253,6 @@ class TransactionStore(SQLBaseStore):
retry_interval (int) - how long until next retry in ms retry_interval (int) - how long until next retry in ms
""" """
# As this is the new value, we might as well prefill the cache
self.get_destination_retry_timings.prefill(
destination,
{
"destination": destination,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval
},
)
# XXX: we could chose to not bother persisting this if our cache thinks # XXX: we could chose to not bother persisting this if our cache thinks
# this is a NOOP # this is a NOOP
return self.runInteraction( return self.runInteraction(
@ -275,31 +265,25 @@ class TransactionStore(SQLBaseStore):
def _set_destination_retry_timings(self, txn, destination, def _set_destination_retry_timings(self, txn, destination,
retry_last_ts, retry_interval): retry_last_ts, retry_interval):
query = ( txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
"UPDATE destinations"
" SET retry_last_ts = ?, retry_interval = ?"
" WHERE destination = ?"
)
txn.execute( self._simple_upsert_txn(
query, txn,
( "destinations",
retry_last_ts, retry_interval, destination, keyvalues={
) "destination": destination,
},
values={
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
},
insertion_values={
"destination": destination,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
}
) )
if txn.rowcount == 0:
# destination wasn't already in table. Insert it.
self._simple_insert_txn(
txn,
table="destinations",
values={
"destination": destination,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
}
)
def get_destinations_needing_retry(self): def get_destinations_needing_retry(self):
"""Get all destinations which are due a retry for sending a transaction. """Get all destinations which are due a retry for sending a transaction.