Implement initial population of users who share rooms table

This commit is contained in:
Erik Johnston 2017-06-15 09:59:04 +01:00
parent ebcd55d641
commit 72613bc379
2 changed files with 193 additions and 9 deletions

View file

@ -14,12 +14,12 @@
# limitations under the License. # limitations under the License.
import logging import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.storage.roommember import ProfileInfo from synapse.storage.roommember import ProfileInfo
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.async import sleep
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -41,12 +41,15 @@ class UserDirectoyHandler(object):
one public room. one public room.
""" """
INITIAL_SLEEP_MS = 50
def __init__(self, hs): def __init__(self, hs):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self.server_name = hs.hostname self.server_name = hs.hostname
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.notifier.add_replication_callback(self.notify_new_event) self.notifier.add_replication_callback(self.notify_new_event)
@ -55,6 +58,9 @@ class UserDirectoyHandler(object):
self.initially_handled_users = set() self.initially_handled_users = set()
self.initially_handled_users_in_public = set() self.initially_handled_users_in_public = set()
self.initially_handled_users_share = set()
self.initially_handled_users_share_private_room = set()
# The current position in the current_state_delta stream # The current position in the current_state_delta stream
self.pos = None self.pos = None
@ -140,10 +146,14 @@ class UserDirectoyHandler(object):
logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids)) logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
yield self._handle_intial_room(room_id) yield self._handle_intial_room(room_id)
num_processed_rooms += 1 num_processed_rooms += 1
yield sleep(self.INITIAL_SLEEP_MS / 1000.)
logger.info("Processed all rooms.") logger.info("Processed all rooms.")
self.initially_handled_users = None self.initially_handled_users = None
self.initially_handled_users_in_public = None
self.initially_handled_users_share = None
self.initially_handled_users_share_private_room = None
yield self.store.update_user_directory_stream_pos(new_pos) yield self.store.update_user_directory_stream_pos(new_pos)
@ -158,7 +168,8 @@ class UserDirectoyHandler(object):
is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id) is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
users_with_profile = yield self.state.get_current_user_in_room(room_id) users_with_profile = yield self.state.get_current_user_in_room(room_id)
unhandled_users = set(users_with_profile) - self.initially_handled_users user_ids = set(users_with_profile)
unhandled_users = user_ids - self.initially_handled_users
yield self.store.add_profiles_to_user_dir( yield self.store.add_profiles_to_user_dir(
room_id, { room_id, {
@ -175,6 +186,69 @@ class UserDirectoyHandler(object):
) )
self.initially_handled_users_in_public != unhandled_users self.initially_handled_users_in_public != unhandled_users
# We now go and figure out the new users who share rooms with user entries
# We sleep aggressively here as otherwise it can starve resources.
# We also batch up inserts/updates, but try to avoid too many at once.
to_insert = set()
to_update = set()
count = 0
for user_id in user_ids:
if count % 100 == 0:
yield sleep(self.INITIAL_SLEEP_MS / 1000.)
if not self.is_mine_id(user_id):
count += 1
continue
for other_user_id in user_ids:
if user_id == other_user_id:
continue
if count % 100 == 0:
yield sleep(self.INITIAL_SLEEP_MS / 1000.)
count += 1
user_set = (user_id, other_user_id)
if user_set in self.initially_handled_users_share_private_room:
continue
if user_set in self.initially_handled_users_share:
if is_public:
continue
to_update.add(user_set)
else:
to_insert.add(user_set)
if is_public:
self.initially_handled_users_share.add(user_set)
else:
self.initially_handled_users_share_private_room.add(user_set)
if len(to_insert) > 100:
yield self.store.add_users_who_share_room(
room_id, not is_public, to_insert,
)
to_insert.clear()
if len(to_update) > 100:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update,
)
to_update.clear()
if to_insert:
yield self.store.add_users_who_share_room(
room_id, not is_public, to_insert,
)
to_insert.clear()
if to_update:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update,
)
to_update.clear()
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_deltas(self, deltas): def _handle_deltas(self, deltas):
"""Called with the state deltas to process """Called with the state deltas to process

View file

@ -16,16 +16,19 @@
from twisted.internet import defer from twisted.internet import defer
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.api.constants import EventTypes, JoinRules from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id, get_localpart_from_id from synapse.types import get_domain_from_id, get_localpart_from_id
import re import re
import logging
logger = logging.getLogger(__name__)
class UserDirectoryStore(SQLBaseStore): class UserDirectoryStore(SQLBaseStore):
@cachedInlineCallbacks(cache_context=True) @cachedInlineCallbacks(cache_context=True)
def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context): def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context):
"""Check if the room is either world_readable or publically joinable """Check if the room is either world_readable or publically joinable
@ -281,14 +284,118 @@ class UserDirectoryStore(SQLBaseStore):
desc="get_users_in_dir_due_to_room", desc="get_users_in_dir_due_to_room",
) )
@defer.inlineCallbacks
def get_all_rooms(self): def get_all_rooms(self):
"""Get all room_ids we've ever known about """Get all room_ids we've ever known about, in ascending order of "size"
""" """
return self._simple_select_onecol( sql = """
table="current_state_events", SELECT room_id FROM current_state_events
keyvalues={}, GROUP BY room_id
retcol="DISTINCT room_id", ORDER BY count(*) ASC
desc="get_all_rooms", """
rows = yield self._execute("get_all_rooms", None, sql)
defer.returnValue([room_id for room_id, in rows])
def add_users_who_share_room(self, room_id, share_private, user_id_tuples):
"""Insert entries into the users_who_share_rooms table. The first
user should be a local user.
Args:
room_id (str)
share_private (bool): Is the room private
user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
"""
def _add_users_who_share_room_txn(txn):
self._simple_insert_many_txn(
txn,
table="users_who_share_rooms",
values=[
{
"user_id": user_id,
"other_user_id": other_user_id,
"room_id": room_id,
"share_private": share_private,
}
for user_id, other_user_id in user_id_tuples
],
)
for user_id, other_user_id in user_id_tuples:
txn.call_after(
self.get_users_who_share_room_from_dir.invalidate,
(user_id,),
)
txn.call_after(
self.get_if_users_share_a_room.invalidate,
(user_id, other_user_id),
)
return self.runInteraction(
"add_users_who_share_room", _add_users_who_share_room_txn
)
def update_users_who_share_room(self, room_id, share_private, user_id_sets):
"""Updates entries in the users_who_share_rooms table. The first
user should be a local user.
Args:
room_id (str)
share_private (bool): Is the room private
user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
"""
def _update_users_who_share_room_txn(txn):
sql = """
UPDATE users_who_share_rooms
SET room_id = ?, share_private = ?
WHERE user_id = ? AND other_user_id = ?
"""
txn.executemany(
sql,
(
(room_id, share_private, uid, oid)
for uid, oid in user_id_sets
)
)
for user_id, other_user_id in user_id_sets:
txn.call_after(
self.get_users_who_share_room_from_dir.invalidate,
(user_id,),
)
txn.call_after(
self.get_if_users_share_a_room.invalidate,
(user_id, other_user_id),
)
return self.runInteraction(
"update_users_who_share_room", _update_users_who_share_room_txn
)
def remove_user_who_share_room(self, user_id, other_user_id):
"""Deletes entries in the users_who_share_rooms table. The first
user should be a local user.
Args:
room_id (str)
share_private (bool): Is the room private
user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
"""
def _remove_user_who_share_room_txn(txn):
self._simple_delete_txn(
txn,
table="users_who_share_rooms",
keyvalues={
"user_id": user_id,
"other_user_id": other_user_id,
},
)
txn.call_after(
self.get_users_who_share_room_from_dir.invalidate,
(user_id,),
)
txn.call_after(
self.get_if_users_share_a_room.invalidate,
(user_id, other_user_id),
)
return self.runInteraction(
"remove_user_who_share_room", _remove_user_who_share_room_txn
) )
def delete_all_from_user_dir(self): def delete_all_from_user_dir(self):
@ -298,8 +405,11 @@ class UserDirectoryStore(SQLBaseStore):
txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory")
txn.execute("DELETE FROM user_directory_search") txn.execute("DELETE FROM user_directory_search")
txn.execute("DELETE FROM users_in_pubic_room") txn.execute("DELETE FROM users_in_pubic_room")
txn.execute("DELETE FROM users_who_share_rooms")
txn.call_after(self.get_user_in_directory.invalidate_all) txn.call_after(self.get_user_in_directory.invalidate_all)
txn.call_after(self.get_user_in_public_room.invalidate_all) txn.call_after(self.get_user_in_public_room.invalidate_all)
txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all)
txn.call_after(self.get_if_users_share_a_room.invalidate_all)
return self.runInteraction( return self.runInteraction(
"delete_all_from_user_dir", _delete_all_from_user_dir_txn "delete_all_from_user_dir", _delete_all_from_user_dir_txn
) )