Don't send the full event json over replication

This commit is contained in:
Erik Johnston 2017-03-17 15:47:51 +00:00
parent 2abe85d50e
commit 61f471f779
4 changed files with 38 additions and 50 deletions

View file

@ -410,11 +410,16 @@ class SynchrotronServer(HomeServer):
stream = result.get("events") stream = result.get("events")
if stream: if stream:
max_position = stream["position"] max_position = stream["position"]
event_map = yield store.get_events([row[1] for row in stream["rows"]])
for row in stream["rows"]: for row in stream["rows"]:
position = row[0] position = row[0]
internal = json.loads(row[1]) event_id = row[1]
event_json = json.loads(row[2]) event = event_map.get(event_id, None)
event = FrozenEvent(event_json, internal_metadata_dict=internal) if not event:
continue
extra_users = () extra_users = ()
if event.type == EventTypes.Member: if event.type == EventTypes.Member:
extra_users = (event.state_key,) extra_users = (event.state_key,)

View file

@ -283,12 +283,12 @@ class ReplicationResource(Resource):
if request_events != upto_events_token: if request_events != upto_events_token:
writer.write_header_and_rows("events", res.new_forward_events, ( writer.write_header_and_rows("events", res.new_forward_events, (
"position", "internal", "json", "state_group" "position", "event_id", "room_id", "type", "state_key",
), position=upto_events_token) ), position=upto_events_token)
if request_backfill != upto_backfill_token: if request_backfill != upto_backfill_token:
writer.write_header_and_rows("backfill", res.new_backfill_events, ( writer.write_header_and_rows("backfill", res.new_backfill_events, (
"position", "internal", "json", "state_group", "position", "event_id", "room_id", "type", "state_key", "redacts",
), position=upto_backfill_token) ), position=upto_backfill_token)
writer.write_header_and_rows( writer.write_header_and_rows(

View file

@ -242,46 +242,32 @@ class SlavedEventStore(BaseSlavedStore):
return super(SlavedEventStore, self).process_replication(result) return super(SlavedEventStore, self).process_replication(result)
def _process_replication_row(self, row, backfilled): def _process_replication_row(self, row, backfilled):
internal = json.loads(row[1]) stream_ordering = row[0] if not backfilled else -row[0]
event_json = json.loads(row[2])
event = FrozenEvent(event_json, internal_metadata_dict=internal)
self.invalidate_caches_for_event( self.invalidate_caches_for_event(
event, backfilled, stream_ordering, row[1], row[2], row[3], row[4], row[5],
backfilled=backfilled,
) )
def invalidate_caches_for_event(self, event, backfilled): def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
self._invalidate_get_event_cache(event.event_id) etype, state_key, redacts, backfilled):
self._invalidate_get_event_cache(event_id)
self.get_latest_event_ids_in_room.invalidate((event.room_id,)) self.get_latest_event_ids_in_room.invalidate((room_id,))
self.get_unread_event_push_actions_by_room_for_user.invalidate_many( self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
(event.room_id,) (room_id,)
) )
if not backfilled: if not backfilled:
self._events_stream_cache.entity_has_changed( self._events_stream_cache.entity_has_changed(
event.room_id, event.internal_metadata.stream_ordering room_id, stream_ordering
) )
# self.get_unread_event_push_actions_by_room_for_user.invalidate_many( if redacts:
# (event.room_id,) self._invalidate_get_event_cache(redacts)
# )
if event.type == EventTypes.Redaction: if etype == EventTypes.Member:
self._invalidate_get_event_cache(event.redacts)
if event.type == EventTypes.Member:
self._membership_stream_cache.entity_has_changed( self._membership_stream_cache.entity_has_changed(
event.state_key, event.internal_metadata.stream_ordering state_key, stream_ordering
) )
self.get_invited_rooms_for_user.invalidate((event.state_key,)) self.get_invited_rooms_for_user.invalidate((state_key,))
if not event.is_state():
return
if backfilled:
return
if (not event.internal_metadata.is_invite_from_remote()
and event.internal_metadata.is_outlier()):
return

View file

@ -1620,14 +1620,13 @@ class EventsStore(SQLBaseStore):
def get_all_new_events_txn(txn): def get_all_new_events_txn(txn):
sql = ( sql = (
"SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group" "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
" FROM events as e" " state_key, redacts"
" JOIN event_json as ej" " FROM events AS e"
" ON e.event_id = ej.event_id AND e.room_id = ej.room_id" " LEFT JOIN redactions USING (event_id)"
" LEFT JOIN event_to_state_groups as eg" " LEFT JOIN state_events USING (event_id)"
" ON e.event_id = eg.event_id" " WHERE ? < stream_ordering AND stream_ordering <= ?"
" WHERE ? < e.stream_ordering AND e.stream_ordering <= ?" " ORDER BY stream_ordering ASC"
" ORDER BY e.stream_ordering ASC"
" LIMIT ?" " LIMIT ?"
) )
if have_forward_events: if have_forward_events:
@ -1653,15 +1652,13 @@ class EventsStore(SQLBaseStore):
forward_ex_outliers = [] forward_ex_outliers = []
sql = ( sql = (
"SELECT -e.stream_ordering, ej.internal_metadata, ej.json," "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
" eg.state_group" " state_key, redacts"
" FROM events as e" " FROM events AS e"
" JOIN event_json as ej" " LEFT JOIN redactions USING (event_id)"
" ON e.event_id = ej.event_id AND e.room_id = ej.room_id" " LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_to_state_groups as eg" " WHERE ? > stream_ordering AND stream_ordering >= ?"
" ON e.event_id = eg.event_id" " ORDER BY stream_ordering DESC"
" WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
" ORDER BY e.stream_ordering DESC"
" LIMIT ?" " LIMIT ?"
) )
if have_backfill_events: if have_backfill_events: