Update the extremeties tables in a seperate txn

This commit is contained in:
Erik Johnston 2015-08-24 16:04:58 +01:00
parent 457970c724
commit 55ce9f9261
2 changed files with 74 additions and 61 deletions

View file

@ -303,74 +303,81 @@ class EventFederationStore(SQLBaseStore):
], ],
) )
@defer.inlineCallbacks
def _update_extremeties(self, events):
events_by_room = {} events_by_room = {}
for ev in events: for ev in events:
events_by_room.setdefault(ev.room_id, []).append(ev) events_by_room.setdefault(ev.room_id, []).append(ev)
for room_id, room_events in events_by_room.items(): def _update_forwards_txn(txn):
prevs = [ for room_id, room_events in events_by_room.items():
e_id for ev in room_events for e_id, _ in ev.prev_events prevs = [
e_id for ev in room_events for e_id, _ in ev.prev_events
if not ev.internal_metadata.is_outlier()
]
if prevs:
txn.execute(
"DELETE FROM event_forward_extremities"
" WHERE room_id = ?"
" AND event_id in (%s)" % (
",".join(["?"] * len(prevs)),
),
[room_id] + prevs,
)
query = (
"INSERT INTO event_forward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_edges WHERE prev_event_id = ?"
" )"
)
txn.executemany(
query,
[
(ev.event_id, ev.room_id, ev.event_id) for ev in events
if not ev.internal_metadata.is_outlier()
]
)
def _update_backwards_txn(txn):
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
" AND NOT EXISTS ("
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" )"
)
txn.executemany(query, [
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
for ev in events for e_id, _ in ev.prev_events
if not ev.internal_metadata.is_outlier() if not ev.internal_metadata.is_outlier()
] ])
if prevs:
txn.execute( query = (
"DELETE FROM event_forward_extremities" "DELETE FROM event_backward_extremities"
" WHERE room_id = ?" " WHERE event_id = ? AND room_id = ?"
" AND event_id in (%s)" % ( )
",".join(["?"] * len(prevs)), txn.executemany(
), query,
[room_id] + prevs, [
(ev.event_id, ev.room_id) for ev in events
if not ev.internal_metadata.is_outlier()
]
)
for room_id in events_by_room:
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, (room_id,)
) )
query = ( yield self.runInteraction("_update_forwards_txn", _update_forwards_txn)
"INSERT INTO event_forward_extremities (event_id, room_id)" yield self.runInteraction("_update_backwards_txn", _update_backwards_txn)
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_edges WHERE prev_event_id = ?"
" )"
)
txn.executemany(
query,
[
(ev.event_id, ev.room_id, ev.event_id) for ev in events
if not ev.internal_metadata.is_outlier()
]
)
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
" AND NOT EXISTS ("
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" )"
)
txn.executemany(query, [
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
for ev in events for e_id, _ in ev.prev_events
if not ev.internal_metadata.is_outlier()
])
query = (
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
)
txn.executemany(
query,
[
(ev.event_id, ev.room_id) for ev in events
if not ev.internal_metadata.is_outlier()
]
)
for room_id in events_by_room:
txn.call_after(
self.get_latest_event_ids_in_room.invalidate, (room_id,)
)
def get_backfill_events(self, room_id, event_list, limit): def get_backfill_events(self, room_id, event_list, limit):
"""Get a list of Events for a given topic that occurred before (and """Get a list of Events for a given topic that occurred before (and

View file

@ -86,6 +86,10 @@ class EventsStore(SQLBaseStore):
is_new_state=is_new_state, is_new_state=is_new_state,
) )
yield self._update_extremeties([
ev for ev, _ in chunk
])
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def persist_event(self, event, context, backfilled=False, def persist_event(self, event, context, backfilled=False,
@ -120,6 +124,8 @@ class EventsStore(SQLBaseStore):
except _RollbackButIsFineException: except _RollbackButIsFineException:
pass pass
yield self._update_extremeties([event])
max_persisted_id = yield self._stream_id_gen.get_max_token(self) max_persisted_id = yield self._stream_id_gen.get_max_token(self)
defer.returnValue((stream_ordering, max_persisted_id)) defer.returnValue((stream_ordering, max_persisted_id))