Make push sort of work

Mark Haines 2016-04-14 13:30:57 +01:00
parent 1209d3174e
commit f41b1a8723
4 changed files with 96 additions and 8 deletions

View file

@@ -24,7 +24,9 @@ from synapse.config.logger import LoggingConfig
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.event_push_actions import SlavedPushActionsStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.storage.engines import create_engine
+from synapse.storage import DataStore
 from synapse.util.async import sleep
 from synapse.util.logcontext import (LoggingContext, preserve_fn)
@@ -40,7 +42,7 @@ class SlaveConfig(DatabaseConfig):
     def read_config(self, config):
         self.replication_url = config["replication_url"]
         self.server_name = config["server_name"]
-        self.use_insecure_ssl_client_just_for_testing_do_not_use = False
+        self.use_insecure_ssl_client_just_for_testing_do_not_use = True
         self.user_agent_suffix = None
         self.start_pushers = True
@@ -58,9 +60,13 @@ class PusherSlaveConfig(SlaveConfig, LoggingConfig):
 class PusherSlaveStore(
-    SlavedEventStore, SlavedPusherStore, SlavedPushActionsStore
+    SlavedPushActionsStore,
+    SlavedEventStore, SlavedPusherStore,
+    SlavedReceiptsStore
 ):
-    pass
+    update_pusher_last_stream_ordering_and_success = (
+        DataStore.update_pusher_last_stream_ordering_and_success.__func__
+    )


 class PusherServer(HomeServer):
@@ -135,7 +141,6 @@ class PusherServer(HomeServer):
                 args = store.stream_positions()
                 args["timeout"] = 30000
                 result = yield http_client.get_json(replication_url, args=args)
-                logger.error("FNARG %r", result)
                 yield store.process_replication(result)
                 poke_pushers(result)
             except:
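For context, here is a minimal standalone sketch (not part of the commit) of the long-polling replication loop that the hunk above edits. The names store, http_client, replication_url and poke_pushers stand in for the objects the surrounding PusherServer code sets up, and the Twisted inlineCallbacks style matches the rest of the file.

import logging

from twisted.internet import defer, reactor, task

logger = logging.getLogger(__name__)


@defer.inlineCallbacks
def replication_loop(store, http_client, replication_url, poke_pushers):
    # Hypothetical standalone version of the loop edited above.
    while True:
        try:
            # Each slaved store reports the stream position it has caught up to.
            args = store.stream_positions()
            args["timeout"] = 30000  # long-poll the master for up to 30s

            # Returns once the master has rows past our positions (or times out).
            result = yield http_client.get_json(replication_url, args=args)

            # Advance the slaved id trackers and invalidate caches for the new
            # rows, then wake the pushers so they can act on new events/receipts.
            yield store.process_replication(result)
            poke_pushers(result)
        except Exception:
            logger.exception("Error replicating from %r", replication_url)
            yield task.deferLater(reactor, 30, lambda: None)  # back off, retry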

View file

@@ -14,12 +14,17 @@
 # limitations under the License.

 from ._base import BaseSlavedStore
+from .events import SlavedEventStore
+from .receipts import SlavedReceiptsStore

 from synapse.storage import DataStore
 from synapse.storage.event_push_actions import EventPushActionsStore


-class SlavedPushActionsStore(BaseSlavedStore):
+class SlavedPushActionsStore(SlavedEventStore, SlavedReceiptsStore):
+    get_unread_event_push_actions_by_room_for_user = (
+        EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
+    )
     get_unread_push_actions_for_user_in_range = (
         DataStore.get_unread_push_actions_for_user_in_range.__func__
@@ -28,3 +33,19 @@ class SlavedPushActionsStore(BaseSlavedStore):
     get_push_action_users_in_range = (
         DataStore.get_push_action_users_in_range.__func__
     )
+
+    def invalidate_caches_for_event(self, event, backfilled, reset_state):
+        self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
+            (event.room_id,)
+        )
+        super(SlavedPushActionsStore, self).invalidate_caches_for_event(
+            event, backfilled, reset_state
+        )
+
+    def invalidate_caches_for_receipt(self, user_id, room_id):
+        self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
+            (room_id,)
+        )
+        super(SlavedPushActionsStore, self).invalidate_caches_for_receipt(
+            user_id, room_id
+        )
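The two overrides added here show the cache-invalidation pattern the slaved stores rely on: each class drops the caches it owns and then calls super() so that every store in the MRO gets a turn. A toy, self-contained sketch of that pattern (illustrative only, with simplified stand-ins for the real classes):

class BaseSlavedStore(object):
    def invalidate_caches_for_event(self, event, backfilled, reset_state):
        # Nothing to do at the base; exists so the super() chain terminates.
        pass


class SlavedEventStore(BaseSlavedStore):
    def invalidate_caches_for_event(self, event, backfilled, reset_state):
        print("event store: invalidating state/room caches for %s" % event)
        super(SlavedEventStore, self).invalidate_caches_for_event(
            event, backfilled, reset_state
        )


class SlavedPushActionsStore(SlavedEventStore):
    def invalidate_caches_for_event(self, event, backfilled, reset_state):
        print("push actions store: invalidating unread-count cache for %s" % event)
        super(SlavedPushActionsStore, self).invalidate_caches_for_event(
            event, backfilled, reset_state
        )


# Calling the subclass invalidates its own cache, then each parent's in turn.
SlavedPushActionsStore().invalidate_caches_for_event("$event_id", False, False)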

View file

@@ -151,11 +151,11 @@ class SlavedEventStore(BaseSlavedStore):
             internal = json.loads(row[1])
             event_json = json.loads(row[2])
             event = FrozenEvent(event_json, internal_metadata_dict=internal)
-            self._invalidate_caches_for_event(
+            self.invalidate_caches_for_event(
                 event, backfilled, reset_state=position in state_resets
             )

-    def _invalidate_caches_for_event(self, event, backfilled, reset_state):
+    def invalidate_caches_for_event(self, event, backfilled, reset_state):
         if reset_state:
             self._get_current_state_for_key.invalidate_all()
             self.get_rooms_for_user.invalidate_all()

View file

@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from ._slaved_id_tracker import SlavedIdTracker
+
+from synapse.storage import DataStore
+from synapse.storage.receipts import ReceiptsStore
+
+# So, um, we want to borrow a load of functions intended for reading from
+# a DataStore, but we don't want to take functions that either write to the
+# DataStore or are cached and don't have cache invalidation logic.
+#
+# Rather than write duplicate versions of those functions, or lift them to
+# a common base class, we're going to grab the underlying __func__ object from
+# the method descriptor on the DataStore and chuck them into our class.
+
+
+class SlavedReceiptsStore(BaseSlavedStore):
+
+    def __init__(self, db_conn, hs):
+        super(SlavedReceiptsStore, self).__init__(db_conn, hs)
+        self._receipts_id_gen = SlavedIdTracker(
+            db_conn, "receipts_linearized", "stream_id"
+        )
+
+    get_receipts_for_user = ReceiptsStore.__dict__["get_receipts_for_user"]
+
+    get_max_receipt_stream_id = DataStore.get_max_receipt_stream_id.__func__
+
+    def stream_positions(self):
+        result = super(SlavedReceiptsStore, self).stream_positions()
+        result["receipts"] = self._receipts_id_gen.get_current_token()
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("receipts")
+        if stream:
+            self._receipts_id_gen.advance(stream["position"])
+            for row in stream["rows"]:
+                room_id = row[1]
+                user_id = row[3]
+                self.invalidate_caches_for_receipt(user_id, room_id)
+                self.get_receipts_for_user.invalidate((user_id,))
+
+        return super(SlavedReceiptsStore, self).process_replication(result)
+
+    def invalidate_caches_for_receipt(self, user_id, room_id):
+        self.get_receipts_for_user.invalidate((user_id,))
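The comment near the top of this new file describes the method-borrowing trick the slaved stores use instead of inheriting from DataStore. A toy sketch of the idea (illustrative only; MasterStore and SlaveStore are made-up names):

class MasterStore(object):
    def get_thing(self, key):
        # Pretend this does a database read on the master process.
        return "thing-%s" % (key,)


class SlaveStore(object):
    # Reuse MasterStore.get_thing without inheriting the rest of the class.
    # Pulling it out of __dict__ gives the raw attribute as stored on the
    # class: a plain function here, or, for @cached methods, the cache
    # descriptor itself, so helpers like .invalidate() stay available.
    # The DataStore.method.__func__ spelling in the diff is the Python 2
    # equivalent for ordinary (unbound) methods.
    get_thing = MasterStore.__dict__["get_thing"]


print(SlaveStore().get_thing("abc"))  # prints "thing-abc"

Borrowing individual functions keeps the replica's surface small: only read paths that are safe on a slave are exposed, and the cached ones come with their invalidation handles, which is exactly what process_replication above needs.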