Merge pull request #1126 from matrix-org/erikj/public_room_cache

Add very basic filter API to /publicRooms
This commit is contained in:
Erik Johnston 2016-09-16 13:17:29 +01:00 committed by GitHub
commit 9e1283c824
7 changed files with 158 additions and 24 deletions

View file

@ -718,11 +718,14 @@ class FederationClient(FederationBase):
raise RuntimeError("Failed to send to any server.")
def get_public_rooms(self, destination, limit=None, since_token=None):
def get_public_rooms(self, destination, limit=None, since_token=None,
search_filter=None):
if destination == self.server_name:
return
return self.transport_layer.get_public_rooms(destination, limit, since_token)
return self.transport_layer.get_public_rooms(
destination, limit, since_token, search_filter
)
@defer.inlineCallbacks
def query_auth(self, destination, room_id, event_id, local_auth):

View file

@ -248,7 +248,8 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def get_public_rooms(self, remote_server, limit, since_token):
def get_public_rooms(self, remote_server, limit, since_token,
search_filter=None):
path = PREFIX + "/publicRooms"
args = {}
@ -257,6 +258,8 @@ class TransportLayerClient(object):
if since_token:
args["since"] = [since_token]
# TODO(erikj): Actually send the search_filter across federation.
response = yield self.client.get_json(
destination=remote_server,
path=path,

View file

@ -38,8 +38,14 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs)
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None):
if search_filter:
# We explicitly don't bother caching searches.
return self._get_public_room_list(limit, since_token, search_filter)
def get_local_public_room_list(self, limit=None, since_token=None):
result = self.response_cache.get((limit, since_token))
if not result:
result = self.response_cache.set(
@ -49,7 +55,8 @@ class RoomListHandler(BaseHandler):
return result
@defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None):
def _get_public_room_list(self, limit=None, since_token=None,
search_filter=None):
if since_token and since_token != "END":
since_token = RoomListNextBatch.from_token(since_token)
else:
@ -115,22 +122,18 @@ class RoomListHandler(BaseHandler):
sorted_rooms = sorted_rooms[:since_token.current_limit]
sorted_rooms.reverse()
new_limit = None
if limit:
if sorted_rooms[limit:]:
new_limit = limit
if since_token:
if since_token.direction_is_forward:
new_limit += since_token.current_limit
else:
new_limit = since_token.current_limit - new_limit
new_limit = max(0, new_limit)
sorted_rooms = sorted_rooms[:limit]
rooms_to_scan = sorted_rooms
if limit and not search_filter:
rooms_to_scan = sorted_rooms[:limit + 1]
chunk = []
@defer.inlineCallbacks
def handle_room(room_id):
if limit and len(chunk) > limit + 1:
# We've already got enough, so lets just drop it.
return
num_joined_users = rooms_to_num_joined[room_id]
if num_joined_users == 0:
return
@ -210,12 +213,37 @@ class RoomListHandler(BaseHandler):
if avatar_url:
result["avatar_url"] = avatar_url
chunk.append(result)
if _matches_room_entry(result, search_filter):
chunk.append(result)
yield concurrently_execute(handle_room, sorted_rooms, 10)
yield concurrently_execute(handle_room, rooms_to_scan, 10)
chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
# Work out the new limit of the batch for pagination, or None if we
# know there are no more results that would be returned.
new_limit = None
if chunk and (not limit or len(chunk) > limit):
if limit:
chunk = chunk[:limit]
addition = 1
if since_token:
addition += since_token.current_limit
if not since_token or since_token.direction_is_forward:
last_room_id = chunk[-1]["room_id"]
else:
last_room_id = chunk[0]["room_id"]
addition *= -1
try:
new_limit = sorted_rooms.index(last_room_id) + addition
if new_limit >= len(sorted_rooms):
new_limit = None
except ValueError:
pass
results = {
"chunk": chunk,
}
@ -253,13 +281,48 @@ class RoomListHandler(BaseHandler):
defer.returnValue(results)
@defer.inlineCallbacks
def get_remote_public_room_list(self, server_name, limit=None, since_token=None):
res = yield self.hs.get_replication_layer().get_public_rooms(
def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
search_filter=None):
if search_filter:
# We currently don't support searching across federation, so we have
# to do it manually without pagination
limit = None
since_token = None
res = yield self._get_remote_list_cached(
server_name, limit=limit, since_token=since_token,
)
if search_filter:
res = {"chunk": [
entry
for entry in list(res.get("chunk", []))
if _matches_room_entry(entry, search_filter)
]}
defer.returnValue(res)
def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
search_filter=None):
repl_layer = self.hs.get_replication_layer()
if search_filter:
# We can't cache when asking for search
return repl_layer.get_public_rooms(
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
)
result = self.remote_response_cache.get((server_name, limit, since_token))
if not result:
result = self.remote_response_cache.set(
(server_name, limit, since_token),
repl_layer.get_public_rooms(
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
)
)
return result
class RoomListNextBatch(namedtuple("RoomListNextBatch", (
"stream_ordering", # stream_ordering of the first public room list
@ -294,3 +357,18 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", (
return self._replace(
**kwds
)
def _matches_room_entry(room_entry, search_filter):
if search_filter and search_filter.get("generic_search_term", None):
generic_search_term = search_filter["generic_search_term"]
if generic_search_term in room_entry.get("name", ""):
return True
elif generic_search_term in room_entry.get("topic", ""):
return True
elif generic_search_term in room_entry.get("canonical_alias", ""):
return True
else:
return True
return False

View file

@ -62,6 +62,7 @@ class SlavedEventStore(BaseSlavedStore):
)
self.stream_ordering_month_ago = 0
self._stream_order_on_start = self.get_room_max_stream_ordering()
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.

View file

@ -337,6 +337,34 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
defer.returnValue((200, data))
@defer.inlineCallbacks
def on_POST(self, request):
yield self.auth.get_user_by_req(request)
server = parse_string(request, "server", default=None)
content = parse_json_object_from_request(request)
limit = int(content.get("limit", 100))
since_token = content.get("since", None)
search_filter = content.get("filter", None)
handler = self.hs.get_room_list_handler()
if server:
data = yield handler.get_remote_public_room_list(
server,
limit=limit,
since_token=since_token,
search_filter=search_filter,
)
else:
data = yield handler.get_local_public_room_list(
limit=limit,
since_token=since_token,
search_filter=search_filter,
)
defer.returnValue((200, data))
# TODO: Needs unit testing
class RoomMemberListRestServlet(ClientV1RestServlet):

View file

@ -222,6 +222,8 @@ class DataStore(RoomMemberStore, RoomStore,
self._find_stream_orderings_for_times, 60 * 60 * 1000
)
self._stream_order_on_start = self.get_room_max_stream_ordering()
super(DataStore, self).__init__(hs)
def take_presence_startup_info(self):

View file

@ -348,7 +348,14 @@ class EventFederationStore(SQLBaseStore):
# We want to make the cache more effective, so we clamp to the last
# change before the given ordering.
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
stream_ordering = min(last_change, stream_ordering)
# We don't always have a full stream_to_exterm_id table, e.g. after
# the upgrade that introduced it, so we make sure we never ask for a
# try and pin to a stream_ordering from before a restart
last_change = max(self._stream_order_on_start, last_change)
if last_change > self.stream_ordering_month_ago:
stream_ordering = min(last_change, stream_ordering)
return self._get_forward_extremeties_for_room(room_id, stream_ordering)
@ -369,7 +376,7 @@ class EventFederationStore(SQLBaseStore):
INNER JOIN (
SELECT room_id, MAX(stream_ordering) AS stream_ordering
FROM stream_ordering_to_exterm
WHERE stream_ordering < ? GROUP BY room_id
WHERE stream_ordering <= ? GROUP BY room_id
) AS rms USING (room_id, stream_ordering)
WHERE room_id = ?
""")
@ -386,9 +393,21 @@ class EventFederationStore(SQLBaseStore):
def _delete_old_forward_extrem_cache(self):
def _delete_old_forward_extrem_cache_txn(txn):
# Delete entries older than a month, while making sure we don't delete
# the only entries for a room.
sql = ("""
DELETE FROM stream_ordering_to_exterm
WHERE
(
SELECT max(stream_ordering) AS stream_ordering
FROM stream_ordering_to_exterm
WHERE room_id = stream_ordering_to_exterm.room_id
) > ?
AND stream_ordering < ?
""")
txn.execute(
"DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?",
(self.stream_ordering_month_ago,)
sql,
(self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
)
return self.runInteraction(
"_delete_old_forward_extrem_cache",