Merge pull request #568 from matrix-org/erikj/unread_notif

Atomically persit push actions when we persist the event
This commit is contained in:
Erik Johnston 2016-02-10 11:25:32 +00:00
commit f7ef5c1d57
7 changed files with 66 additions and 89 deletions

View file

@ -20,3 +20,4 @@ class EventContext(object):
self.current_state = current_state self.current_state = current_state
self.state_group = None self.state_group = None
self.rejected = False self.rejected = False
self.push_actions = []

View file

@ -53,25 +53,10 @@ class BaseHandler(object):
self.event_builder_factory = hs.get_event_builder_factory() self.event_builder_factory = hs.get_event_builder_factory()
@defer.inlineCallbacks @defer.inlineCallbacks
def _filter_events_for_clients(self, user_tuples, events): def _filter_events_for_clients(self, user_tuples, events, event_id_to_state):
""" Returns dict of user_id -> list of events that user is allowed to """ Returns dict of user_id -> list of events that user is allowed to
see. see.
""" """
# If there is only one user, just get the state for that one user,
# otherwise just get all the state.
if len(user_tuples) == 1:
types = (
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, user_tuples[0][0]),
)
else:
types = None
event_id_to_state = yield self.store.get_state_for_events(
frozenset(e.event_id for e in events),
types=types
)
forgotten = yield defer.gatherResults([ forgotten = yield defer.gatherResults([
self.store.who_forgot_in_room( self.store.who_forgot_in_room(
room_id, room_id,
@ -135,7 +120,17 @@ class BaseHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _filter_events_for_client(self, user_id, events, is_peeking=False): def _filter_events_for_client(self, user_id, events, is_peeking=False):
# Assumes that user has at some point joined the room if not is_guest. # Assumes that user has at some point joined the room if not is_guest.
res = yield self._filter_events_for_clients([(user_id, is_peeking)], events) types = (
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, user_id),
)
event_id_to_state = yield self.store.get_state_for_events(
frozenset(e.event_id for e in events),
types=types
)
res = yield self._filter_events_for_clients(
[(user_id, is_peeking)], events, event_id_to_state
)
defer.returnValue(res.get(user_id, [])) defer.returnValue(res.get(user_id, []))
def ratelimit(self, user_id): def ratelimit(self, user_id):
@ -269,13 +264,13 @@ class BaseHandler(object):
"You don't have permission to redact events" "You don't have permission to redact events"
) )
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)
action_generator = ActionGenerator(self.hs) action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event( yield action_generator.handle_push_actions_for_event(
event, self event, context, self
)
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
) )
destinations = set() destinations = set()

View file

@ -236,12 +236,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key) user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id) yield user_joined_room(self.distributor, user, event.room_id)
if not backfilled and not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
event, self
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events): def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events( event_to_state = yield self.store.get_state_for_events(
@ -1073,6 +1067,12 @@ class FederationHandler(BaseHandler):
auth_events=auth_events, auth_events=auth_events,
) )
if not backfilled and not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
event, context, self
)
event_stream_id, max_stream_id = yield self.store.persist_event( event_stream_id, max_stream_id = yield self.store.persist_event(
event, event,
context=context, context=context,

View file

@ -19,8 +19,6 @@ import bulk_push_rule_evaluator
import logging import logging
from synapse.api.constants import EventTypes
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -36,21 +34,15 @@ class ActionGenerator:
# tag (ie. we just need all the users). # tag (ie. we just need all the users).
@defer.inlineCallbacks @defer.inlineCallbacks
def handle_push_actions_for_event(self, event, handler): def handle_push_actions_for_event(self, event, context, handler):
if event.type == EventTypes.Redaction and event.redacts is not None:
yield self.store.remove_push_actions_for_event_id(
event.room_id, event.redacts
)
bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
event.room_id, self.hs, self.store event.room_id, self.hs, self.store
) )
actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler) actions_by_user = yield bulk_evaluator.action_for_event_by_user(
event, handler, context.current_state
yield self.store.set_push_actions_for_event_and_users(
event,
[
(uid, None, actions) for uid, actions in actions_by_user.items()
]
) )
context.push_actions = [
(uid, None, actions) for uid, actions in actions_by_user.items()
]

View file

@ -98,25 +98,21 @@ class BulkPushRuleEvaluator:
self.store = store self.store = store
@defer.inlineCallbacks @defer.inlineCallbacks
def action_for_event_by_user(self, event, handler): def action_for_event_by_user(self, event, handler, current_state):
actions_by_user = {} actions_by_user = {}
users_dict = yield self.store.are_guests(self.rules_by_user.keys()) users_dict = yield self.store.are_guests(self.rules_by_user.keys())
filtered_by_user = yield handler._filter_events_for_clients( filtered_by_user = yield handler._filter_events_for_clients(
users_dict.items(), [event] users_dict.items(), [event], {event.event_id: current_state}
) )
evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room)) evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
condition_cache = {} condition_cache = {}
member_state = yield self.store.get_state_for_event(
event.event_id,
)
display_names = {} display_names = {}
for ev in member_state.values(): for ev in current_state.values():
nm = ev.content.get("displayname", None) nm = ev.content.get("displayname", None)
if nm and ev.type == EventTypes.Member: if nm and ev.type == EventTypes.Member:
display_names[ev.state_key] = nm display_names[ev.state_key] = nm

View file

@ -24,8 +24,7 @@ logger = logging.getLogger(__name__)
class EventPushActionsStore(SQLBaseStore): class EventPushActionsStore(SQLBaseStore):
@defer.inlineCallbacks def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
def set_push_actions_for_event_and_users(self, event, tuples):
""" """
:param event: the event set actions for :param event: the event set actions for
:param tuples: list of tuples of (user_id, profile_tag, actions) :param tuples: list of tuples of (user_id, profile_tag, actions)
@ -44,18 +43,12 @@ class EventPushActionsStore(SQLBaseStore):
'highlight': 1 if _action_has_highlight(actions) else 0, 'highlight': 1 if _action_has_highlight(actions) else 0,
}) })
def f(txn): for uid, _, __ in tuples:
for uid, _, __ in tuples: txn.call_after(
txn.call_after( self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
self.get_unread_event_push_actions_by_room_for_user.invalidate_many, (event.room_id, uid)
(event.room_id, uid) )
) self._simple_insert_many_txn(txn, "event_push_actions", values)
return self._simple_insert_many_txn(txn, "event_push_actions", values)
yield self.runInteraction(
"set_actions_for_event_and_users",
f,
)
@cachedInlineCallbacks(num_args=3, lru=True, tree=True) @cachedInlineCallbacks(num_args=3, lru=True, tree=True)
def get_unread_event_push_actions_by_room_for_user( def get_unread_event_push_actions_by_room_for_user(
@ -107,21 +100,15 @@ class EventPushActionsStore(SQLBaseStore):
) )
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
def remove_push_actions_for_event_id(self, room_id, event_id): # Sad that we have to blow away the cache for the whole room here
def f(txn): txn.call_after(
# Sad that we have to blow away the cache for the whole room here self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
txn.call_after( (room_id,)
self.get_unread_event_push_actions_by_room_for_user.invalidate_many, )
(room_id,) txn.execute(
) "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
txn.execute( (room_id, event_id)
"DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
(room_id, event_id)
)
yield self.runInteraction(
"remove_push_actions_for_event_id",
f
) )

View file

@ -205,23 +205,29 @@ class EventsStore(SQLBaseStore):
@log_function @log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled, def _persist_events_txn(self, txn, events_and_contexts, backfilled,
is_new_state=True): is_new_state=True):
depth_updates = {}
# Remove the any existing cache entries for the event_ids for event, context in events_and_contexts:
for event, _ in events_and_contexts: # Remove the any existing cache entries for the event_ids
txn.call_after(self._invalidate_get_event_cache, event.event_id) txn.call_after(self._invalidate_get_event_cache, event.event_id)
if not backfilled: if not backfilled:
txn.call_after( txn.call_after(
self._events_stream_cache.entity_has_changed, self._events_stream_cache.entity_has_changed,
event.room_id, event.internal_metadata.stream_ordering, event.room_id, event.internal_metadata.stream_ordering,
) )
depth_updates = {} if not event.internal_metadata.is_outlier():
for event, _ in events_and_contexts: depth_updates[event.room_id] = max(
if event.internal_metadata.is_outlier(): event.depth, depth_updates.get(event.room_id, event.depth)
continue )
depth_updates[event.room_id] = max(
event.depth, depth_updates.get(event.room_id, event.depth) if context.push_actions:
self._set_push_actions_for_event_and_users_txn(
txn, event, context.push_actions
)
if event.type == EventTypes.Redaction and event.redacts is not None:
self._remove_push_actions_for_event_id_txn(
txn, event.room_id, event.redacts
) )
for room_id, depth in depth_updates.items(): for room_id, depth in depth_updates.items():