Remove code which updates application_services_state.last_txn (#12680)

This column is unused as of #12209, so let's stop writing to it.
This commit is contained in:
Richard van der Hoff 2022-05-17 12:07:18 +02:00 committed by GitHub
parent a34a41f135
commit 24b590de32
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 35 additions and 55 deletions

1
changelog.d/12680.misc Normal file
View file

@ -0,0 +1 @@
Remove code which updates unused database column `application_services_state.last_txn`.

View file

@ -203,19 +203,29 @@ class ApplicationServiceTransactionWorkerStore(
"""Get the application service state. """Get the application service state.
Args: Args:
service: The service whose state to set. service: The service whose state to get.
Returns: Returns:
An ApplicationServiceState or none. An ApplicationServiceState, or None if we have yet to attempt any
transactions to the AS.
""" """
result = await self.db_pool.simple_select_one( # if we have created transactions for this AS but not yet attempted to send
# them, we will have a row in the table with state=NULL (recording the stream
# positions we have processed up to).
#
# On the other hand, if we have yet to create any transactions for this AS at
# all, then there will be no row for the AS.
#
# In either case, we return None to indicate "we don't yet know the state of
# this AS".
result = await self.db_pool.simple_select_one_onecol(
"application_services_state", "application_services_state",
{"as_id": service.id}, {"as_id": service.id},
["state"], retcol="state",
allow_none=True, allow_none=True,
desc="get_appservice_state", desc="get_appservice_state",
) )
if result: if result:
return ApplicationServiceState(result.get("state")) return ApplicationServiceState(result)
return None return None
async def set_appservice_state( async def set_appservice_state(
@ -296,14 +306,6 @@ class ApplicationServiceTransactionWorkerStore(
""" """
def _complete_appservice_txn(txn: LoggingTransaction) -> None: def _complete_appservice_txn(txn: LoggingTransaction) -> None:
# Set current txn_id for AS to 'txn_id'
self.db_pool.simple_upsert_txn(
txn,
"application_services_state",
{"as_id": service.id},
{"last_txn": txn_id},
)
# Delete txn # Delete txn
self.db_pool.simple_delete_txn( self.db_pool.simple_delete_txn(
txn, txn,
@ -452,16 +454,15 @@ class ApplicationServiceTransactionWorkerStore(
% (stream_type,) % (stream_type,)
) )
def set_appservice_stream_type_pos_txn(txn: LoggingTransaction) -> None: # this may be the first time that we're recording any state for this AS, so
stream_id_type = "%s_stream_id" % stream_type # we don't yet know if a row for it exists; hence we have to upsert here.
txn.execute( await self.db_pool.simple_upsert(
"UPDATE application_services_state SET %s = ? WHERE as_id=?" table="application_services_state",
% stream_id_type, keyvalues={"as_id": service.id},
(pos, service.id), values={f"{stream_type}_stream_id": pos},
) # no need to lock when emulating upsert: as_id is a unique key
lock=False,
await self.db_pool.runInteraction( desc="set_appservice_stream_type_pos",
"set_appservice_stream_type_pos", set_appservice_stream_type_pos_txn
) )

View file

@ -61,7 +61,9 @@ Changes in SCHEMA_VERSION = 68:
Changes in SCHEMA_VERSION = 69: Changes in SCHEMA_VERSION = 69:
- We now write to `device_lists_changes_in_room` table. - We now write to `device_lists_changes_in_room` table.
- Use sequence to generate future `application_services_txns.txn_id`s - We now use a PostgreSQL sequence to generate future txn_ids for
`application_services_txns`. `application_services_state.last_txn` is no longer
updated.
Changes in SCHEMA_VERSION = 70: Changes in SCHEMA_VERSION = 70:
- event_reference_hashes is no longer written to. - event_reference_hashes is no longer written to.
@ -71,6 +73,7 @@ Changes in SCHEMA_VERSION = 70:
SCHEMA_COMPAT_VERSION = ( SCHEMA_COMPAT_VERSION = (
# We now assume that `device_lists_changes_in_room` has been filled out for # We now assume that `device_lists_changes_in_room` has been filled out for
# recent device_list_updates. # recent device_list_updates.
# ... and that `application_services_state.last_txn` is not used.
69 69
) )
"""Limit on how far the synapse codebase can be rolled back without breaking db compat """Limit on how far the synapse codebase can be rolled back without breaking db compat

View file

@ -434,16 +434,6 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
}, },
) )
# "Complete" a transaction.
# All this really does for us is make an entry in the application_services_state
# database table, which tracks the current stream_token per stream ID per AS.
self.get_success(
self.hs.get_datastores().main.complete_appservice_txn(
0,
interested_appservice,
)
)
# Now, pretend that we receive a large burst of read receipts (300 total) that # Now, pretend that we receive a large burst of read receipts (300 total) that
# all come in at once. # all come in at once.
for i in range(300): for i in range(300):

View file

@ -14,7 +14,7 @@
import json import json
import os import os
import tempfile import tempfile
from typing import List, Optional, cast from typing import List, cast
from unittest.mock import Mock from unittest.mock import Mock
import yaml import yaml
@ -149,15 +149,12 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
outfile.write(yaml.dump(as_yaml)) outfile.write(yaml.dump(as_yaml))
self.as_yaml_files.append(as_token) self.as_yaml_files.append(as_token)
def _set_state( def _set_state(self, id: str, state: ApplicationServiceState):
self, id: str, state: ApplicationServiceState, txn: Optional[int] = None
):
return self.db_pool.runOperation( return self.db_pool.runOperation(
self.engine.convert_param_style( self.engine.convert_param_style(
"INSERT INTO application_services_state(as_id, state, last_txn) " "INSERT INTO application_services_state(as_id, state) VALUES(?,?)"
"VALUES(?,?,?)"
), ),
(id, state.value, txn), (id, state.value),
) )
def _insert_txn(self, as_id, txn_id, events): def _insert_txn(self, as_id, txn_id, events):
@ -280,17 +277,6 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
self.store.complete_appservice_txn(txn_id=txn_id, service=service) self.store.complete_appservice_txn(txn_id=txn_id, service=service)
) )
res = self.get_success(
self.db_pool.runQuery(
self.engine.convert_param_style(
"SELECT last_txn FROM application_services_state WHERE as_id=?"
),
(service.id,),
)
)
self.assertEqual(1, len(res))
self.assertEqual(txn_id, res[0][0])
res = self.get_success( res = self.get_success(
self.db_pool.runQuery( self.db_pool.runQuery(
self.engine.convert_param_style( self.engine.convert_param_style(
@ -316,14 +302,13 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
res = self.get_success( res = self.get_success(
self.db_pool.runQuery( self.db_pool.runQuery(
self.engine.convert_param_style( self.engine.convert_param_style(
"SELECT last_txn, state FROM application_services_state WHERE as_id=?" "SELECT state FROM application_services_state WHERE as_id=?"
), ),
(service.id,), (service.id,),
) )
) )
self.assertEqual(1, len(res)) self.assertEqual(1, len(res))
self.assertEqual(txn_id, res[0][0]) self.assertEqual(ApplicationServiceState.UP.value, res[0][0])
self.assertEqual(ApplicationServiceState.UP.value, res[0][1])
res = self.get_success( res = self.get_success(
self.db_pool.runQuery( self.db_pool.runQuery(