diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 15b7898a45..adc1eb1d0b 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -293,7 +293,7 @@ class ReplicationResource(Resource): push_rules, current_position, limit ) writer.write_header_and_rows("push_rules", rows, ( - "position", "stream_ordering", "user_id", "rule_id", "op", + "position", "event_stream_ordering", "user_id", "rule_id", "op", "priority_class", "priority", "conditions", "actions" )) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 57e1ca5509..9dbad2fd5f 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -106,24 +106,25 @@ class PushRuleStore(SQLBaseStore): ): conditions_json = json.dumps(conditions) actions_json = json.dumps(actions) - with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids if before or after: yield self.runInteraction( "_add_push_rule_relative_txn", self._add_push_rule_relative_txn, - stream_id, stream_ordering, user_id, rule_id, priority_class, + stream_id, event_stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json, before, after, ) else: yield self.runInteraction( "_add_push_rule_highest_priority_txn", self._add_push_rule_highest_priority_txn, - stream_id, stream_ordering, user_id, rule_id, priority_class, + stream_id, event_stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json, ) def _add_push_rule_relative_txn( - self, txn, stream_id, stream_ordering, user_id, rule_id, priority_class, + self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json, before, after ): # Lock the table since otherwise we'll have annoying races between the @@ -175,12 +176,12 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_id, priority_class, new_rule_priority)) self._upsert_push_rule_txn( - txn, stream_id, stream_ordering, user_id, rule_id, priority_class, + txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, new_rule_priority, conditions_json, actions_json, ) def _add_push_rule_highest_priority_txn( - self, txn, stream_id, stream_ordering, user_id, rule_id, priority_class, + self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, conditions_json, actions_json ): # Lock the table since otherwise we'll have annoying races between the @@ -202,12 +203,12 @@ class PushRuleStore(SQLBaseStore): self._upsert_push_rule_txn( txn, - stream_id, stream_ordering, user_id, rule_id, priority_class, new_prio, + stream_id, event_stream_ordering, user_id, rule_id, priority_class, new_prio, conditions_json, actions_json, ) def _upsert_push_rule_txn( - self, txn, stream_id, stream_ordering, user_id, rule_id, priority_class, + self, txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, priority, conditions_json, actions_json, update_stream=True ): """Specialised version of _simple_upsert_txn that picks a push_rule_id @@ -245,7 +246,7 @@ class PushRuleStore(SQLBaseStore): if update_stream: self._insert_push_rules_update_txn( - txn, stream_id, stream_ordering, user_id, rule_id, + txn, stream_id, event_stream_ordering, user_id, rule_id, op="ADD", data={ "priority_class": priority_class, @@ -266,7 +267,7 @@ class PushRuleStore(SQLBaseStore): user_id (str): The matrix ID of the push rule owner rule_id (str): The rule_id of the rule to be deleted """ - def delete_push_rule_txn(txn, stream_id, stream_ordering): + def delete_push_rule_txn(txn, stream_id, event_stream_ordering): self._simple_delete_one_txn( txn, "push_rules", @@ -274,26 +275,28 @@ class PushRuleStore(SQLBaseStore): ) self._insert_push_rules_update_txn( - txn, stream_id, stream_ordering, user_id, rule_id, + txn, stream_id, event_stream_ordering, user_id, rule_id, op="DELETE" ) - with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids yield self.runInteraction( - "delete_push_rule", delete_push_rule_txn, stream_id, stream_ordering + "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering ) @defer.inlineCallbacks def set_push_rule_enabled(self, user_id, rule_id, enabled): - with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids yield self.runInteraction( "_set_push_rule_enabled_txn", self._set_push_rule_enabled_txn, - stream_id, stream_ordering, user_id, rule_id, enabled + stream_id, event_stream_ordering, user_id, rule_id, enabled ) def _set_push_rule_enabled_txn( - self, txn, stream_id, stream_ordering, user_id, rule_id, enabled + self, txn, stream_id, event_stream_ordering, user_id, rule_id, enabled ): new_id = self._push_rules_enable_id_gen.get_next() self._simple_upsert_txn( @@ -305,7 +308,7 @@ class PushRuleStore(SQLBaseStore): ) self._insert_push_rules_update_txn( - txn, stream_id, stream_ordering, user_id, rule_id, + txn, stream_id, event_stream_ordering, user_id, rule_id, op="ENABLE" if enabled else "DISABLE" ) @@ -313,14 +316,14 @@ class PushRuleStore(SQLBaseStore): def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule): actions_json = json.dumps(actions) - def set_push_rule_actions_txn(txn, stream_id, stream_ordering): + def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering): if is_default_rule: # Add a dummy rule to the rules table with the user specified # actions. priority_class = -1 priority = 1 self._upsert_push_rule_txn( - txn, stream_id, stream_ordering, user_id, rule_id, + txn, stream_id, event_stream_ordering, user_id, rule_id, priority_class, priority, "[]", actions_json, update_stream=False ) @@ -333,22 +336,23 @@ class PushRuleStore(SQLBaseStore): ) self._insert_push_rules_update_txn( - txn, stream_id, stream_ordering, user_id, rule_id, + txn, stream_id, event_stream_ordering, user_id, rule_id, op="ACTIONS", data={"actions": actions_json} ) - with self._push_rules_stream_id_gen.get_next() as (stream_id, stream_ordering): + with self._push_rules_stream_id_gen.get_next() as ids: + stream_id, event_stream_ordering = ids yield self.runInteraction( "set_push_rule_actions", set_push_rule_actions_txn, - stream_id, stream_ordering + stream_id, event_stream_ordering ) def _insert_push_rules_update_txn( - self, txn, stream_id, stream_ordering, user_id, rule_id, op, data=None + self, txn, stream_id, event_stream_ordering, user_id, rule_id, op, data=None ): values = { "stream_id": stream_id, - "stream_ordering": stream_ordering, + "event_stream_ordering": event_stream_ordering, "user_id": user_id, "rule_id": rule_id, "op": op, @@ -372,7 +376,7 @@ class PushRuleStore(SQLBaseStore): """Get all the push rules changes that have happend on the server""" def get_all_push_rule_updates_txn(txn): sql = ( - "SELECT stream_id, stream_ordering, user_id, rule_id," + "SELECT stream_id, event_stream_ordering, user_id, rule_id," " op, priority_class, priority, conditions, actions" " FROM push_rules_stream" " WHERE ? < stream_id AND stream_id <= ?" diff --git a/synapse/storage/schema/delta/30/push_rule_stream.sql b/synapse/storage/schema/delta/30/push_rule_stream.sql index e8418bb35f..735aa8d5f6 100644 --- a/synapse/storage/schema/delta/30/push_rule_stream.sql +++ b/synapse/storage/schema/delta/30/push_rule_stream.sql @@ -17,7 +17,7 @@ CREATE TABLE push_rules_stream( stream_id BIGINT NOT NULL, - stream_ordering BIGINT NOT NULL, + event_stream_ordering BIGINT NOT NULL, user_id TEXT NOT NULL, rule_id TEXT NOT NULL, op TEXT NOT NULL, -- One of "ENABLE", "DISABLE", "ACTIONS", "ADD", "DELETE"