[ENG-1722] Numeric sync model IDs (#2298)

* numeric sync model ids

* migration

* fix test compilation
This commit is contained in:
Brendan Allan 2024-04-11 11:46:30 +08:00 committed by GitHub
parent 73a9b41c2a
commit 40fa3380e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 435 additions and 377 deletions

View file

@ -26,7 +26,7 @@ impl crdt_include::Data {
instance: self.instance(),
timestamp: self.timestamp(),
record_id: rmp_serde::from_slice(&self.record_id).unwrap(),
model: self.model,
model: self.model as u16,
data: rmp_serde::from_slice(&self.data).unwrap(),
}
}
@ -46,7 +46,7 @@ impl cloud_crdt_include::Data {
instance: self.instance(),
timestamp: self.timestamp(),
record_id: rmp_serde::from_slice(&self.record_id).unwrap(),
model: self.model,
model: self.model as u16,
data: serde_json::from_slice(&self.data).unwrap(),
}
}
@ -67,7 +67,7 @@ fn crdt_op_db(op: &CRDTOperation) -> crdt_operation::Create {
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
kind: op.kind().to_string(),
data: to_vec(&op.data).unwrap(),
model: op.model.to_string(),
model: op.model as i32,
record_id: rmp_serde::to_vec(&op.record_id).unwrap(),
_params: vec![],
}

View file

@ -219,7 +219,7 @@ impl Actor {
.crdt_operation()
.find_first(vec![
crdt_operation::timestamp::gte(op.timestamp.as_u64() as i64),
crdt_operation::model::equals(op.model.to_string()),
crdt_operation::model::equals(op.model as i32),
crdt_operation::record_id::equals(serde_json::to_vec(&op.record_id).unwrap()),
crdt_operation::kind::equals(op.kind().to_string()),
])

View file

@ -43,7 +43,7 @@ pub fn crdt_op_db(op: &CRDTOperation) -> crdt_operation::Create {
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
kind: op.kind().to_string(),
data: rmp_serde::to_vec(&op.data).unwrap(),
model: op.model.to_string(),
model: op.model as i32,
record_id: rmp_serde::to_vec(&op.record_id).unwrap(),
_params: vec![],
}
@ -59,7 +59,7 @@ pub fn crdt_op_unchecked_db(
instance_id,
kind: op.kind().to_string(),
data: rmp_serde::to_vec(&op.data).unwrap(),
model: op.model.to_string(),
model: op.model as i32,
record_id: rmp_serde::to_vec(&op.record_id).unwrap(),
_params: vec![],
}

View file

@ -55,7 +55,7 @@ async fn writes_operations_and_rows_together() -> Result<(), Box<dyn std::error:
// 1 create, 2 update
assert_eq!(operations.len(), 3);
assert_eq!(operations[0].model, prisma::location::NAME);
assert_eq!(operations[0].model, prisma_sync::location::MODEL_ID as i32);
let locations = instance.db.location().find_many(vec![]).exec().await?;

View file

@ -0,0 +1,37 @@
/*
Warnings:
- You are about to alter the column `model` on the `cloud_crdt_operation` table. The data in that column could be lost. The data in that column will be cast from `String` to `Int`.
- You are about to alter the column `model` on the `crdt_operation` table. The data in that column could be lost. The data in that column will be cast from `String` to `Int`.
*/
-- RedefineTables
PRAGMA foreign_keys=OFF;
CREATE TABLE "new_cloud_crdt_operation" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"timestamp" BIGINT NOT NULL,
"model" INTEGER NOT NULL,
"record_id" BLOB NOT NULL,
"kind" TEXT NOT NULL,
"data" BLOB NOT NULL,
"instance_id" INTEGER NOT NULL,
CONSTRAINT "cloud_crdt_operation_instance_id_fkey" FOREIGN KEY ("instance_id") REFERENCES "instance" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
);
INSERT INTO "new_cloud_crdt_operation" ("data", "id", "instance_id", "kind", "model", "record_id", "timestamp") SELECT "data", "id", "instance_id", "kind", "model", "record_id", "timestamp" FROM "cloud_crdt_operation";
DROP TABLE "cloud_crdt_operation";
ALTER TABLE "new_cloud_crdt_operation" RENAME TO "cloud_crdt_operation";
CREATE TABLE "new_crdt_operation" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"timestamp" BIGINT NOT NULL,
"model" INTEGER NOT NULL,
"record_id" BLOB NOT NULL,
"kind" TEXT NOT NULL,
"data" BLOB NOT NULL,
"instance_id" INTEGER NOT NULL,
CONSTRAINT "crdt_operation_instance_id_fkey" FOREIGN KEY ("instance_id") REFERENCES "instance" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
);
INSERT INTO "new_crdt_operation" ("data", "id", "instance_id", "kind", "model", "record_id", "timestamp") SELECT "data", "id", "instance_id", "kind", "model", "record_id", "timestamp" FROM "crdt_operation";
DROP TABLE "crdt_operation";
ALTER TABLE "new_crdt_operation" RENAME TO "crdt_operation";
PRAGMA foreign_key_check;
PRAGMA foreign_keys=ON;

View file

@ -20,7 +20,7 @@ model CRDTOperation {
id Int @id @default(autoincrement())
timestamp BigInt
model String
model Int
record_id Bytes
// Enum: ??
@ -33,6 +33,23 @@ model CRDTOperation {
@@map("crdt_operation")
}
model CloudCRDTOperation {
id Int @id @default(autoincrement())
timestamp BigInt
model Int
record_id Bytes
// Enum: ??
kind String
data Bytes
instance_id Int
instance Instance @relation(fields: [instance_id], references: [id])
@@map("cloud_crdt_operation")
}
/// @deprecated: This model has to exist solely for backwards compatibility.
model Node {
id Int @id @default(autoincrement())
@ -46,7 +63,6 @@ model Node {
@@map("node")
}
/// @local(id: pub_id)
// represents a single `.db` file (SQLite DB) that is paired to the current library.
// A `LibraryInstance` is always owned by a single `Node` but it's possible for that node to change (or two to be owned by a single node).
model Instance {
@ -103,7 +119,7 @@ model Volume {
@@map("volume")
}
/// @shared(id: pub_id)
/// @shared(id: pub_id, modelId: 1)
model Location {
id Int @id @default(autoincrement())
pub_id Bytes @unique
@ -132,7 +148,7 @@ model Location {
@@map("location")
}
/// @shared(id: pub_id)
/// @shared(id: pub_id, modelId: 2)
model FilePath {
id Int @id @default(autoincrement())
pub_id Bytes @unique
@ -181,7 +197,7 @@ model FilePath {
@@map("file_path")
}
/// @shared(id: pub_id)
/// @shared(id: pub_id, modelId: 3)
model Object {
id Int @id @default(autoincrement())
pub_id Bytes @unique
@ -276,7 +292,7 @@ model Object {
// @@map("key")
// }
/// @shared(id: object)
/// @shared(id: object, modelId: 4)
model MediaData {
id Int @id @default(autoincrement())
@ -309,7 +325,7 @@ model MediaData {
//// Tag ////
/// @shared(id: pub_id)
/// @shared(id: pub_id, modelId: 5)
model Tag {
id Int @id @default(autoincrement())
pub_id Bytes @unique
@ -326,7 +342,7 @@ model Tag {
@@map("tag")
}
/// @relation(item: object, group: tag)
/// @relation(item: object, group: tag, modelId: 6)
model TagOnObject {
object_id Int
object Object @relation(fields: [object_id], references: [id], onDelete: Restrict)
@ -342,7 +358,7 @@ model TagOnObject {
//// Label ////
/// @shared(id: name)
/// @shared(id: name, modelId: 7)
model Label {
id Int @id @default(autoincrement())
name String @unique
@ -354,7 +370,7 @@ model Label {
@@map("label")
}
/// @relation(item: object, group: label)
/// @relation(item: object, group: label, modelId: 8)
model LabelOnObject {
date_created DateTime @default(now())
@ -405,7 +421,6 @@ model Job {
// Enum: sd_core::job::job_manager:JobStatus
status Int? // 0 = Queued
// List of errors, separated by "\n\n" in case of failed jobs or completed with errors
errors_text String? // Deprecated, use `critical_error` or `non_critical_errors` instead
critical_error String? // Serialized error field with info about the failed job after completion
@ -500,7 +515,7 @@ model IndexerRulesInLocation {
@@map("indexer_rule_in_location")
}
/// @shared(id: key)
/// @shared(id: key, modelId: 9)
model Preference {
key String @id
value Bytes?
@ -518,7 +533,7 @@ model Notification {
@@map("notification")
}
/// @shared(id: pub_id)
/// @shared(id: pub_id, modelId: 10)
model SavedSearch {
id Int @id @default(autoincrement())
pub_id Bytes @unique
@ -538,21 +553,3 @@ model SavedSearch {
@@map("saved_search")
}
/// @local(id: id)
model CloudCRDTOperation {
id Int @id @default(autoincrement())
timestamp BigInt
model String
record_id Bytes
// Enum: ??
kind String
data Bytes
instance_id Int
instance Instance @relation(fields: [instance_id], references: [id])
@@map("cloud_crdt_operation")
}

View file

@ -232,7 +232,7 @@ fn crdt_op_db(op: &CRDTOperation) -> cloud_crdt_operation::Create {
instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()),
kind: op.data.as_kind().to_string(),
data: to_vec(&op.data).expect("unable to serialize data"),
model: op.model.to_string(),
model: op.model as i32,
record_id: rmp_serde::to_vec(&op.record_id).expect("unable to serialize record id"),
_params: vec![],
}

View file

@ -747,12 +747,12 @@ async fn create_location(
(location::name::NAME, msgpack!(&name)),
(location::path::NAME, msgpack!(&path)),
(location::date_created::NAME, msgpack!(date_created)),
(
location::instance::NAME,
msgpack!(prisma_sync::instance::SyncId {
pub_id: uuid_to_bytes(sync.instance)
}),
),
// (
// location::instance::NAME,
// msgpack!(prisma_sync::instance::SyncId {
// pub_id: uuid_to_bytes(sync.instance)
// }),
// ),
],
),
db.location()

View file

@ -69,7 +69,7 @@ mod originator {
instance: Uuid::new_v4(),
timestamp: sync::NTP64(0),
record_id: rmpv::Value::Nil,
model: "name".to_string(),
model: 0,
data: sd_sync::CRDTOperationData::Create,
}]);

View file

@ -28,10 +28,13 @@ pub enum ModelSyncType<'a> {
// },
Shared {
id: FieldWalker<'a>,
// model ids help reduce storage cost of sync messages
model_id: u16,
},
Relation {
group: RelationFieldWalker<'a>,
item: RelationFieldWalker<'a>,
model_id: u16,
},
}
@ -49,7 +52,13 @@ impl<'a> ModelSyncType<'a> {
match attr.name {
"local" => Self::Local { id },
"shared" => Self::Shared { id },
"shared" => Self::Shared {
id,
model_id: attr
.field("modelId")
.and_then(|a| a.as_single())
.and_then(|s| s.parse().ok())?,
},
_ => return None,
}
}
@ -77,6 +86,10 @@ impl<'a> ModelSyncType<'a> {
Self::Relation {
item: get_field("item"),
group: get_field("group"),
model_id: attr
.field("modelId")
.and_then(|a| a.as_single())
.and_then(|s| s.parse().ok())?,
}
}
// "owned" => Self::Owned { id },
@ -87,9 +100,9 @@ impl<'a> ModelSyncType<'a> {
fn sync_id(&self) -> Vec<FieldWalker> {
match self {
// Self::Owned { id } => id.clone(),
Self::Local { id } => vec![*id],
Self::Shared { id } => vec![*id],
Self::Relation { group, item } => vec![(*group).into(), (*item).into()],
Self::Local { id, .. } => vec![*id],
Self::Shared { id, .. } => vec![*id],
Self::Relation { group, item, .. } => vec![(*group).into(), (*item).into()],
}
}
}

View file

@ -22,7 +22,11 @@ pub fn module((model, sync_type): ModelWithSyncType) -> Module {
});
let model_stuff = match sync_type {
ModelSyncType::Relation { item, group } => {
ModelSyncType::Relation {
item,
group,
model_id,
} => {
let item_name_snake = snake_ident(item.name());
let item_model_name_snake = snake_ident(item.related_model().name());
@ -42,12 +46,24 @@ pub fn module((model, sync_type): ModelWithSyncType) -> Module {
}
}
pub const MODEL_ID: u16 = #model_id;
impl sd_sync::SyncModel for #model_name_snake::Types {
const MODEL_ID: u16 = MODEL_ID;
}
impl sd_sync::RelationSyncModel for #model_name_snake::Types {
type SyncId = SyncId;
}
})
}
ModelSyncType::Shared { .. } => Some(quote! {
ModelSyncType::Shared { model_id, .. } => Some(quote! {
pub const MODEL_ID: u16 = #model_id;
impl sd_sync::SyncModel for #model_name_snake::Types {
const MODEL_ID: u16 = MODEL_ID;
}
impl sd_sync::SharedSyncModel for #model_name_snake::Types {
type SyncId = SyncId;
}

View file

@ -24,7 +24,7 @@ pub fn r#enum(models: Vec<ModelWithSyncType>) -> TokenStream {
(
quote!(#model_name_pascal(#model_name_snake::SyncId, sd_sync::CRDTOperationData)),
quote! {
prisma::#model_name_snake::NAME =>
#model_name_snake::MODEL_ID =>
Self::#model_name_pascal(rmpv::ext::from_value(op.record_id).ok()?, op.data)
},
)
@ -37,7 +37,7 @@ pub fn r#enum(models: Vec<ModelWithSyncType>) -> TokenStream {
let model_name_snake = snake_ident(model.name());
let match_arms = match sync_type.as_ref()? {
ModelSyncType::Shared { id } => {
ModelSyncType::Shared { id, .. } => {
let (get_id, equals_value, id_name_snake, create_id) = match id.refine() {
RefinedFieldWalker::Relation(rel) => {
let scalar_field = rel.referenced_fields().unwrap().next().unwrap();
@ -110,7 +110,7 @@ pub fn r#enum(models: Vec<ModelWithSyncType>) -> TokenStream {
}
}
}
ModelSyncType::Relation { item, group } => {
ModelSyncType::Relation { item, group, .. } => {
let compound_id = format_ident!(
"{}",
group
@ -228,7 +228,7 @@ pub fn r#enum(models: Vec<ModelWithSyncType>) -> TokenStream {
impl ModelSyncData {
pub fn from_op(op: sd_sync::CRDTOperation) -> Option<Self> {
Some(match op.model.as_str() {
Some(match op.model {
#(#matches),*,
_ => return None
})

View file

@ -9,7 +9,7 @@ pub type CompressedCRDTOperationsForModel = Vec<(rmpv::Value, Vec<CompressedCRDT
/// Stores a bunch of CRDTOperations in a more memory-efficient form for sending to the cloud.
#[derive(Serialize, Deserialize)]
pub struct CompressedCRDTOperations(
pub(self) Vec<(Uuid, Vec<(String, CompressedCRDTOperationsForModel)>)>,
pub(self) Vec<(Uuid, Vec<(u16, CompressedCRDTOperationsForModel)>)>,
);
impl CompressedCRDTOperations {
@ -121,49 +121,49 @@ mod test {
CRDTOperation {
instance,
timestamp: NTP64(0),
model: "FilePath".to_string(),
model: 0,
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
},
CRDTOperation {
instance,
timestamp: NTP64(0),
model: "FilePath".to_string(),
model: 0,
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
},
CRDTOperation {
instance,
timestamp: NTP64(0),
model: "FilePath".to_string(),
model: 0,
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
},
CRDTOperation {
instance,
timestamp: NTP64(0),
model: "Object".to_string(),
model: 1,
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
},
CRDTOperation {
instance,
timestamp: NTP64(0),
model: "Object".to_string(),
model: 1,
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
},
CRDTOperation {
instance,
timestamp: NTP64(0),
model: "FilePath".to_string(),
model: 0,
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
},
CRDTOperation {
instance,
timestamp: NTP64(0),
model: "FilePath".to_string(),
model: 0,
record_id: rmpv::Value::Nil,
data: CRDTOperationData::Create,
},
@ -171,9 +171,9 @@ mod test {
let CompressedCRDTOperations(compressed) = CompressedCRDTOperations::new(uncompressed);
assert_eq!(&compressed[0].1[0].0, "FilePath");
assert_eq!(&compressed[0].1[1].0, "Object");
assert_eq!(&compressed[0].1[2].0, "FilePath");
assert_eq!(compressed[0].1[0].0, 0);
assert_eq!(compressed[0].1[1].0, 1);
assert_eq!(compressed[0].1[2].0, 0);
assert_eq!(compressed[0].1[0].1[0].1.len(), 3);
assert_eq!(compressed[0].1[1].1[0].1.len(), 2);
@ -186,7 +186,7 @@ mod test {
Uuid::new_v4(),
vec![
(
"FilePath".to_string(),
0,
vec![(
rmpv::Value::Nil,
vec![
@ -206,7 +206,7 @@ mod test {
)],
),
(
"Object".to_string(),
1,
vec![(
rmpv::Value::Nil,
vec![
@ -222,7 +222,7 @@ mod test {
)],
),
(
"FilePath".to_string(),
0,
vec![(
rmpv::Value::Nil,
vec![
@ -243,8 +243,8 @@ mod test {
let uncompressed = compressed.into_ops();
assert_eq!(uncompressed.len(), 7);
assert_eq!(uncompressed[2].model, "FilePath");
assert_eq!(uncompressed[4].model, "Object");
assert_eq!(uncompressed[6].model, "FilePath");
assert_eq!(uncompressed[2].model, 0);
assert_eq!(uncompressed[4].model, 1);
assert_eq!(uncompressed[6].model, 0);
}
}

View file

@ -50,7 +50,7 @@ pub struct CRDTOperation {
pub instance: Uuid,
#[specta(type = u32)]
pub timestamp: NTP64,
pub model: String,
pub model: u16,
#[specta(type = serde_json::Value)]
pub record_id: rmpv::Value,
pub data: CRDTOperationData,

View file

@ -1,4 +1,3 @@
use prisma_client_rust::ModelTypes;
use uhlc::HLC;
use uuid::Uuid;
@ -22,17 +21,13 @@ pub trait OperationFactory {
fn get_clock(&self) -> &HLC;
fn get_instance(&self) -> Uuid;
fn new_op<TSyncId: SyncId<Model = TModel>, TModel: ModelTypes>(
&self,
id: &TSyncId,
data: CRDTOperationData,
) -> CRDTOperation {
fn new_op<TSyncId: SyncId>(&self, id: &TSyncId, data: CRDTOperationData) -> CRDTOperation {
let timestamp = self.get_clock().new_timestamp();
CRDTOperation {
instance: self.get_instance(),
timestamp: *timestamp.get_time(),
model: TModel::MODEL.to_string(),
model: <TSyncId::Model as crate::SyncModel>::MODEL_ID,
record_id: msgpack!(id),
data,
}

View file

@ -2,14 +2,14 @@ use prisma_client_rust::ModelTypes;
use serde::{de::DeserializeOwned, Serialize};
pub trait SyncId: Serialize + DeserializeOwned {
type Model: ModelTypes;
type Model: SyncModel;
}
pub trait LocalSyncModel: ModelTypes {
type SyncId: SyncId;
pub trait SyncModel: ModelTypes {
const MODEL_ID: u16;
}
pub trait SharedSyncModel: ModelTypes {
pub trait SharedSyncModel: SyncModel {
type SyncId: SyncId;
}
@ -20,6 +20,6 @@ pub trait RelationSyncId: SyncId {
fn split(&self) -> (&Self::ItemSyncId, &Self::GroupSyncId);
}
pub trait RelationSyncModel: ModelTypes {
pub trait RelationSyncModel: SyncModel {
type SyncId: RelationSyncId;
}