Fix typing being reset causing infinite syncs (#4127)

This commit is contained in:
Amber Brown 2018-11-03 00:19:23 +11:00 committed by GitHub
parent efb9343c8c
commit cb7a6b2379
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 155 additions and 5 deletions

1
changelog.d/4127.bugfix Normal file
View file

@ -0,0 +1 @@
If the typing stream ID goes backwards (as on a worker when the master restarts), the worker's typing handler will no longer erroneously report rooms containing new typing events.

View file

@ -226,7 +226,15 @@ class SynchrotronPresence(object):
class SynchrotronTyping(object): class SynchrotronTyping(object):
def __init__(self, hs): def __init__(self, hs):
self._latest_room_serial = 0 self._latest_room_serial = 0
self._reset()
def _reset(self):
"""
Reset the typing handler's data caches.
"""
# map room IDs to serial numbers
self._room_serials = {} self._room_serials = {}
# map room IDs to sets of users currently typing
self._room_typing = {} self._room_typing = {}
def stream_positions(self): def stream_positions(self):
@ -236,6 +244,12 @@ class SynchrotronTyping(object):
return {"typing": self._latest_room_serial} return {"typing": self._latest_room_serial}
def process_replication_rows(self, token, rows): def process_replication_rows(self, token, rows):
if self._latest_room_serial > token:
# The master has gone backwards. To prevent inconsistent data, just
# clear everything.
self._reset()
# Set the latest serial token to whatever the server gave us.
self._latest_room_serial = token self._latest_room_serial = token
for row in rows: for row in rows:

View file

@ -63,11 +63,8 @@ class TypingHandler(object):
self._member_typing_until = {} # clock time we expect to stop self._member_typing_until = {} # clock time we expect to stop
self._member_last_federation_poke = {} self._member_last_federation_poke = {}
# map room IDs to serial numbers
self._room_serials = {}
self._latest_room_serial = 0 self._latest_room_serial = 0
# map room IDs to sets of users currently typing self._reset()
self._room_typing = {}
# caches which room_ids changed at which serials # caches which room_ids changed at which serials
self._typing_stream_change_cache = StreamChangeCache( self._typing_stream_change_cache = StreamChangeCache(
@ -79,6 +76,15 @@ class TypingHandler(object):
5000, 5000,
) )
def _reset(self):
"""
Reset the typing handler's data caches.
"""
# map room IDs to serial numbers
self._room_serials = {}
# map room IDs to sets of users currently typing
self._room_typing = {}
def _handle_timeouts(self): def _handle_timeouts(self):
logger.info("Checking for typing timeouts") logger.info("Checking for typing timeouts")

View file

@ -15,9 +15,11 @@
from mock import Mock from mock import Mock
from synapse.rest.client.v1 import admin, login, room
from synapse.rest.client.v2_alpha import sync from synapse.rest.client.v2_alpha import sync
from tests import unittest from tests import unittest
from tests.server import TimedOutException
class FilterTestCase(unittest.HomeserverTestCase): class FilterTestCase(unittest.HomeserverTestCase):
@ -65,3 +67,124 @@ class FilterTestCase(unittest.HomeserverTestCase):
["next_batch", "rooms", "account_data", "to_device", "device_lists"] ["next_batch", "rooms", "account_data", "to_device", "device_lists"]
).issubset(set(channel.json_body.keys())) ).issubset(set(channel.json_body.keys()))
) )
class SyncTypingTests(unittest.HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
sync.register_servlets,
]
user_id = True
hijack_auth = False
def test_sync_backwards_typing(self):
"""
If the typing serial goes backwards and the typing handler is then reset
(such as when the master restarts and sets the typing serial to 0), we
do not incorrectly return typing information that had a serial greater
than the now-reset serial.
"""
typing_url = "/rooms/%s/typing/%s?access_token=%s"
sync_url = "/sync?timeout=3000000&access_token=%s&since=%s"
# Register the user who gets notified
user_id = self.register_user("user", "pass")
access_token = self.login("user", "pass")
# Register the user who sends the message
other_user_id = self.register_user("otheruser", "pass")
other_access_token = self.login("otheruser", "pass")
# Create a room
room = self.helper.create_room_as(user_id, tok=access_token)
# Invite the other person
self.helper.invite(room=room, src=user_id, tok=access_token, targ=other_user_id)
# The other user joins
self.helper.join(room=room, user=other_user_id, tok=other_access_token)
# The other user sends some messages
self.helper.send(room, body="Hi!", tok=other_access_token)
self.helper.send(room, body="There!", tok=other_access_token)
# Start typing.
request, channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": true, "timeout": 30000}',
)
self.render(request)
self.assertEquals(200, channel.code)
request, channel = self.make_request(
"GET", "/sync?access_token=%s" % (access_token,)
)
self.render(request)
self.assertEquals(200, channel.code)
next_batch = channel.json_body["next_batch"]
# Stop typing.
request, channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": false}',
)
self.render(request)
self.assertEquals(200, channel.code)
# Start typing.
request, channel = self.make_request(
"PUT",
typing_url % (room, other_user_id, other_access_token),
b'{"typing": true, "timeout": 30000}',
)
self.render(request)
self.assertEquals(200, channel.code)
# Should return immediately
request, channel = self.make_request(
"GET", sync_url % (access_token, next_batch)
)
self.render(request)
self.assertEquals(200, channel.code)
next_batch = channel.json_body["next_batch"]
# Reset typing serial back to 0, as if the master had.
typing = self.hs.get_typing_handler()
typing._latest_room_serial = 0
# Since it checks the state token, we need some state to update to
# invalidate the stream token.
self.helper.send(room, body="There!", tok=other_access_token)
request, channel = self.make_request(
"GET", sync_url % (access_token, next_batch)
)
self.render(request)
self.assertEquals(200, channel.code)
next_batch = channel.json_body["next_batch"]
# This should time out! But it does not, because our stream token is
# ahead, and therefore it's saying the typing (that we've actually
# already seen) is new, since it's got a token above our new, now-reset
# stream token.
request, channel = self.make_request(
"GET", sync_url % (access_token, next_batch)
)
self.render(request)
self.assertEquals(200, channel.code)
next_batch = channel.json_body["next_batch"]
# Clear the typing information, so that it doesn't think everything is
# in the future.
typing._reset()
# Now it SHOULD fail as it never completes!
request, channel = self.make_request(
"GET", sync_url % (access_token, next_batch)
)
self.assertRaises(TimedOutException, self.render, request)

View file

@ -21,6 +21,12 @@ from synapse.util import Clock
from tests.utils import setup_test_homeserver as _sth from tests.utils import setup_test_homeserver as _sth
class TimedOutException(Exception):
"""
A web query timed out.
"""
@attr.s @attr.s
class FakeChannel(object): class FakeChannel(object):
""" """
@ -153,7 +159,7 @@ def wait_until_result(clock, request, timeout=100):
x += 1 x += 1
if x > timeout: if x > timeout:
raise Exception("Timed out waiting for request to finish.") raise TimedOutException("Timed out waiting for request to finish.")
clock.advance(0.1) clock.advance(0.1)