diff --git a/CHANGES.rst b/CHANGES.rst index 8c180750ad..e77b31b583 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,89 @@ +Changes in synapse v0.16.0-rc1 (2016-06-03) +=========================================== + +Version 0.15 was not released. See v0.15.0-rc1 below for additional changes. + +Features: + +* Add email notifications for missed messages (PR #759, #786, #799, #810, #815, + #821) +* Add a ``url_preview_ip_range_whitelist`` config param (PR #760) +* Add /report endpoint (PR #762) +* Add basic ignore user API (PR #763) +* Add an openidish mechanism for proving that you own a given user_id (PR #765) +* Allow clients to specify a server_name to avoid 'No known servers' (PR #794) +* Add secondary_directory_servers option to fetch room list from other servers + (PR #808, #813) + +Changes: + +* Report per request metrics for all of the things using request_handler (PR + #756) +* Correctly handle ``NULL`` password hashes from the database (PR #775) +* Allow receipts for events we haven't seen in the db (PR #784) +* Make synctl read a cache factor from config file (PR #785) +* Increment badge count per missed convo, not per msg (PR #793) +* Special case m.room.third_party_invite event auth to match invites (PR #814) + + +Bug fixes: + +* Fix typo in event_auth servlet path (PR #757) +* Fix password reset (PR #758) + + +Performance improvements: + +* Reduce database inserts when sending transactions (PR #767) +* Queue events by room for persistence (PR #768) +* Add cache to ``get_user_by_id`` (PR #772) +* Add and use ``get_domain_from_id`` (PR #773) +* Use tree cache for ``get_linearized_receipts_for_room`` (PR #779) +* Remove unused indices (PR #782) +* Add caches to ``bulk_get_push_rules*`` (PR #804) +* Cache ``get_event_reference_hashes`` (PR #806) +* Add ``get_users_with_read_receipts_in_room`` cache (PR #809) +* Use state to calculate ``get_users_in_room`` (PR #811) +* Load push rules in storage layer so that they get cached (PR #825) +* Make ``get_joined_hosts_for_room`` use get_users_in_room (PR #828) +* Poke notifier on next reactor tick (PR #829) +* Change CacheMetrics to be quicker (PR #830) + + +Changes in synapse v0.15.0-rc1 (2016-04-26) +=========================================== + +Features: + +* Add login support for Javascript Web Tokens, thanks to Niklas Riekenbrauck + (PR #671,#687) +* Add URL previewing support (PR #688) +* Add login support for LDAP, thanks to Christoph Witzany (PR #701) +* Add GET endpoint for pushers (PR #716) + +Changes: + +* Never notify for member events (PR #667) +* Deduplicate identical ``/sync`` requests (PR #668) +* Require user to have left room to forget room (PR #673) +* Use DNS cache if within TTL (PR #677) +* Let users see their own leave events (PR #699) +* Deduplicate membership changes (PR #700) +* Increase performance of pusher code (PR #705) +* Respond with error status 504 if failed to talk to remote server (PR #731) +* Increase search performance on postgres (PR #745) + +Bug fixes: + +* Fix bug where disabling all notifications still resulted in push (PR #678) +* Fix bug where users couldn't reject remote invites if remote refused (PR #691) +* Fix bug where synapse attempted to backfill from itself (PR #693) +* Fix bug where profile information was not correctly added when joining remote + rooms (PR #703) +* Fix bug where register API required incorrect key name for AS registration + (PR #727) + + Changes in synapse v0.14.0 (2016-03-30) ======================================= @@ -511,7 +597,7 @@ Configuration: * Add support for changing the bind host of the metrics listener via the ``metrics_bind_host`` option. - + Changes in synapse v0.9.0-r5 (2015-05-21) ========================================= @@ -853,7 +939,7 @@ See UPGRADE for information about changes to the client server API, including breaking backwards compatibility with VoIP calls and registration API. Homeserver: - * When a user changes their displayname or avatar the server will now update + * When a user changes their displayname or avatar the server will now update all their join states to reflect this. * The server now adds "age" key to events to indicate how old they are. This is clock independent, so at no point does any server or webclient have to @@ -911,7 +997,7 @@ Changes in synapse 0.2.2 (2014-09-06) ===================================== Homeserver: - * When the server returns state events it now also includes the previous + * When the server returns state events it now also includes the previous content. * Add support for inviting people when creating a new room. * Make the homeserver inform the room via `m.room.aliases` when a new alias @@ -923,7 +1009,7 @@ Webclient: * Handle `m.room.aliases` events. * Asynchronously send messages and show a local echo. * Inform the UI when a message failed to send. - * Only autoscroll on receiving a new message if the user was already at the + * Only autoscroll on receiving a new message if the user was already at the bottom of the screen. * Add support for ban/kick reasons. diff --git a/synapse/__init__.py b/synapse/__init__.py index 7de51fbe8d..3b290db79f 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.14.0" +__version__ = "0.16.0-rc1" diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py new file mode 100644 index 0000000000..f4b416f777 --- /dev/null +++ b/synapse/app/synchrotron.py @@ -0,0 +1,510 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.api.constants import EventTypes, PresenceState +from synapse.config._base import ConfigError +from synapse.config.database import DatabaseConfig +from synapse.config.logger import LoggingConfig +from synapse.config.appservice import AppServiceConfig +from synapse.events import FrozenEvent +from synapse.handlers.presence import PresenceHandler +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.rest.client.v2_alpha import sync +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.server import HomeServer +from synapse.storage.client_ips import ClientIpStore +from synapse.storage.engines import create_engine +from synapse.storage.presence import UserPresenceState +from synapse.storage.roommember import RoomMemberStore +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.stringutils import random_string +from synapse.util.versionstring import get_version_string + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import contextlib +import ujson as json + +logger = logging.getLogger("synapse.app.synchrotron") + + +class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): + def read_config(self, config): + self.replication_url = config["replication_url"] + self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") + self.daemonize = config.get("daemonize") + self.pid_file = self.abspath(config.get("pid_file")) + self.macaroon_secret_key = config["macaroon_secret_key"] + self.expire_access_token = config.get("expire_access_token", False) + + def default_config(self, server_name, **kwargs): + pid_file = self.abspath("synchroton.pid") + return """\ + # Slave configuration + + # The replication listener on the synapse to talk to. + #replication_url: https://localhost:{replication_port}/_synapse/replication + + server_name: "%(server_name)s" + + listeners: + # Enable a /sync listener on the synchrontron + #- type: http + # port: {http_port} + # bind_address: "" + # Enable a ssh manhole listener on the synchrotron + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Enable a metric listener on the synchrotron + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"] + # compress: False + + report_stats: False + + daemonize: False + + pid_file: %(pid_file)s + """ % locals() + + +class SynchrotronSlavedStore( + SlavedPushRuleStore, + SlavedEventStore, + SlavedReceiptsStore, + SlavedAccountDataStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + SlavedFilteringStore, + SlavedPresenceStore, + BaseSlavedStore, + ClientIpStore, # After BaseSlavedStre because the constructor is different +): + def get_presence_list_accepted(self, user_localpart): + return () + + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + +UPDATE_SYNCING_USERS_MS = 10 * 1000 + + +class SynchrotronPresence(object): + def __init__(self, hs): + self.http_client = hs.get_simple_http_client() + self.store = hs.get_datastore() + self.user_to_num_current_syncs = {} + self.syncing_users_url = hs.config.replication_url + "/syncing_users" + self.clock = hs.get_clock() + + active_presence = self.store.take_presence_startup_info() + self.user_to_current_state = { + state.user_id: state + for state in active_presence + } + + self.process_id = random_string(16) + logger.info("Presence process_id is %r", self.process_id) + + self._sending_sync = False + self._need_to_send_sync = False + self.clock.looping_call( + self._send_syncing_users_regularly, + UPDATE_SYNCING_USERS_MS, + ) + + reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown) + + def set_state(self, user, state): + # TODO Hows this supposed to work? + pass + + get_states = PresenceHandler.get_states.__func__ + current_state_for_users = PresenceHandler.current_state_for_users.__func__ + + @defer.inlineCallbacks + def user_syncing(self, user_id, affect_presence): + if affect_presence: + curr_sync = self.user_to_num_current_syncs.get(user_id, 0) + self.user_to_num_current_syncs[user_id] = curr_sync + 1 + prev_states = yield self.current_state_for_users([user_id]) + if prev_states[user_id].state == PresenceState.OFFLINE: + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users_now() + + def _end(): + if affect_presence: + self.user_to_num_current_syncs[user_id] -= 1 + + @contextlib.contextmanager + def _user_syncing(): + try: + yield + finally: + _end() + + defer.returnValue(_user_syncing()) + + @defer.inlineCallbacks + def _on_shutdown(self): + # When the synchrotron is shutdown tell the master to clear the in + # progress syncs for this process + self.user_to_num_current_syncs.clear() + yield self._send_syncing_users_now() + + def _send_syncing_users_regularly(self): + # Only send an update if we aren't in the middle of sending one. + if not self._sending_sync: + preserve_fn(self._send_syncing_users_now)() + + @defer.inlineCallbacks + def _send_syncing_users_now(self): + if self._sending_sync: + # We don't want to race with sending another update. + # Instead we wait for that update to finish and send another + # update afterwards. + self._need_to_send_sync = True + return + + # Flag that we are sending an update. + self._sending_sync = True + + yield self.http_client.post_json_get_json(self.syncing_users_url, { + "process_id": self.process_id, + "syncing_users": [ + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count > 0 + ], + }) + + # Unset the flag as we are no longer sending an update. + self._sending_sync = False + if self._need_to_send_sync: + # If something happened while we were sending the update then + # we might need to send another update. + # TODO: Check if the update that was sent matches the current state + # as we only need to send an update if they are different. + self._need_to_send_sync = False + yield self._send_syncing_users_now() + + def process_replication(self, result): + stream = result.get("presence", {"rows": []}) + for row in stream["rows"]: + ( + position, user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) = row + self.user_to_current_state[user_id] = UserPresenceState( + user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) + + +class SynchrotronTyping(object): + def __init__(self, hs): + self._latest_room_serial = 0 + self._room_serials = {} + self._room_typing = {} + + def stream_positions(self): + return {"typing": self._latest_room_serial} + + def process_replication(self, result): + stream = result.get("typing") + if stream: + self._latest_room_serial = int(stream["position"]) + + for row in stream["rows"]: + position, room_id, typing_json = row + typing = json.loads(typing_json) + self._room_serials[room_id] = position + self._room_typing[room_id] = typing + + +class SynchrotronApplicationService(object): + def notify_interested_services(self, event): + pass + + +class SynchrotronServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + sync.register_servlets(self, resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse synchrotron now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + clock = self.get_clock() + notifier = self.get_notifier() + presence_handler = self.get_presence_handler() + typing_handler = self.get_typing_handler() + + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + def notify_from_stream( + result, stream_name, stream_key, room=None, user=None + ): + stream = result.get(stream_name) + if stream: + position_index = stream["field_names"].index("position") + if room: + room_index = stream["field_names"].index(room) + if user: + user_index = stream["field_names"].index(user) + + users = () + rooms = () + for row in stream["rows"]: + position = row[position_index] + + if user: + users = (row[user_index],) + + if room: + rooms = (row[room_index],) + + notifier.on_new_event( + stream_key, position, users=users, rooms=rooms + ) + + def notify(result): + stream = result.get("events") + if stream: + max_position = stream["position"] + for row in stream["rows"]: + position = row[0] + internal = json.loads(row[1]) + event_json = json.loads(row[2]) + event = FrozenEvent(event_json, internal_metadata_dict=internal) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + notifier.on_new_room_event( + event, position, max_position, extra_users + ) + + notify_from_stream( + result, "push_rules", "push_rules_key", user="user_id" + ) + notify_from_stream( + result, "user_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "room_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "tag_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "receipts", "receipt_key", room="room_id" + ) + notify_from_stream( + result, "typing", "typing_key", room="room_id" + ) + + next_expire_broken_caches_ms = 0 + while True: + try: + args = store.stream_positions() + args.update(typing_handler.stream_positions()) + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) + yield store.process_replication(result) + typing_handler.process_replication(result) + presence_handler.process_replication(result) + notify(result) + except: + logger.exception("Error replicating from %r", replication_url) + sleep(5) + + def build_presence_handler(self): + return SynchrotronPresence(self) + + def build_typing_handler(self): + return SynchrotronTyping(self) + + +def setup(config_options): + try: + config = SynchrotronConfig.load_config( + "Synapse synchrotron", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + if not config: + sys.exit(0) + + config.setup_logging() + + database_engine = create_engine(config.database_config) + + ss = SynchrotronServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + application_service_handler=SynchrotronApplicationService(), + ) + + ss.setup() + ss.start_listening() + + change_resource_limit(ss.config.soft_file_limit) + + def start(): + ss.get_datastore().start_profiling() + ss.replicate() + + reactor.callWhenRunning(start) + + return ss + + +if __name__ == '__main__': + with LoggingContext("main"): + ps = setup(sys.argv[1:]) + + if ps.config.daemonize: + def run(): + with LoggingContext("run"): + change_resource_limit(ps.config.soft_file_limit) + reactor.run() + + daemon = Daemonize( + app="synapse-pusher", + pid=ps.config.pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + + daemon.start() + else: + reactor.run() diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 88402e42a6..e5c3929cd7 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -186,7 +186,7 @@ class Mailer(object): multipart_msg = MIMEMultipart('alternative') multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text) - multipart_msg['From'] = self.hs.config.email_notif_from + multipart_msg['From'] = from_string multipart_msg['To'] = email_address multipart_msg['Date'] = email.utils.formatdate() multipart_msg['Message-ID'] = email.utils.make_msgid() diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6928a213e8..e93c3de66c 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) -from ._base import Cache, LoggingTransaction +from ._base import LoggingTransaction from .directory import DirectoryStore from .events import EventsStore from .presence import PresenceStore, UserPresenceState @@ -45,6 +45,7 @@ from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore from .openid import OpenIdStore +from .client_ips import ClientIpStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator @@ -58,12 +59,6 @@ import logging logger = logging.getLogger(__name__) -# Number of msec of granularity to store the user IP 'last seen' time. Smaller -# times give more inserts into the database even for readonly API hits -# 120 seconds == 2 minutes -LAST_SEEN_GRANULARITY = 120 * 1000 - - class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, PresenceStore, TransactionStore, @@ -84,6 +79,7 @@ class DataStore(RoomMemberStore, RoomStore, AccountDataStore, EventPushActionsStore, OpenIdStore, + ClientIpStore, ): def __init__(self, db_conn, hs): @@ -91,11 +87,6 @@ class DataStore(RoomMemberStore, RoomStore, self._clock = hs.get_clock() self.database_engine = hs.database_engine - self.client_ip_last_seen = Cache( - name="client_ip_last_seen", - keylen=4, - ) - self._stream_id_gen = StreamIdGenerator( db_conn, "events", "stream_ordering", extra_tables=[("local_invites", "stream_id")] @@ -216,39 +207,6 @@ class DataStore(RoomMemberStore, RoomStore, return [UserPresenceState(**row) for row in rows] - @defer.inlineCallbacks - def insert_client_ip(self, user, access_token, ip, user_agent): - now = int(self._clock.time_msec()) - key = (user.to_string(), access_token, ip) - - try: - last_seen = self.client_ip_last_seen.get(key) - except KeyError: - last_seen = None - - # Rate-limited inserts - if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: - defer.returnValue(None) - - self.client_ip_last_seen.prefill(key, now) - - # It's safe not to lock here: a) no unique constraint, - # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely - yield self._simple_upsert( - "user_ips", - keyvalues={ - "user_id": user.to_string(), - "access_token": access_token, - "ip": ip, - "user_agent": user_agent, - }, - values={ - "last_seen": now, - }, - desc="insert_client_ip", - lock=False, - ) - @defer.inlineCallbacks def count_daily_users(self): """ diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py new file mode 100644 index 0000000000..a90990e006 --- /dev/null +++ b/synapse/storage/client_ips.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore, Cache + +from twisted.internet import defer + + +# Number of msec of granularity to store the user IP 'last seen' time. Smaller +# times give more inserts into the database even for readonly API hits +# 120 seconds == 2 minutes +LAST_SEEN_GRANULARITY = 120 * 1000 + + +class ClientIpStore(SQLBaseStore): + + def __init__(self, hs): + self.client_ip_last_seen = Cache( + name="client_ip_last_seen", + keylen=4, + ) + + super(ClientIpStore, self).__init__(hs) + + @defer.inlineCallbacks + def insert_client_ip(self, user, access_token, ip, user_agent): + now = int(self._clock.time_msec()) + key = (user.to_string(), access_token, ip) + + try: + last_seen = self.client_ip_last_seen.get(key) + except KeyError: + last_seen = None + + # Rate-limited inserts + if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: + defer.returnValue(None) + + self.client_ip_last_seen.prefill(key, now) + + # It's safe not to lock here: a) no unique constraint, + # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely + yield self._simple_upsert( + "user_ips", + keyvalues={ + "user_id": user.to_string(), + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + }, + values={ + "last_seen": now, + }, + desc="insert_client_ip", + lock=False, + )