mirror of
https://github.com/spacedriveapp/spacedrive
synced 2024-07-04 13:23:28 +00:00
Basic sync operation backfill (#2101)
* basic sync operation backfill * no changes
This commit is contained in:
parent
2a283479e6
commit
9bc1a472a8
264
core/crates/sync/src/backfill.rs
Normal file
264
core/crates/sync/src/backfill.rs
Normal file
|
@ -0,0 +1,264 @@
|
||||||
|
use sd_prisma::{
|
||||||
|
prisma::{
|
||||||
|
file_path, label, label_on_object, location, object, tag, tag_on_object, PrismaClient,
|
||||||
|
},
|
||||||
|
prisma_sync,
|
||||||
|
};
|
||||||
|
use sd_sync::OperationFactory;
|
||||||
|
use sd_utils::chain_optional_iter;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use crate::crdt_op_unchecked_db;
|
||||||
|
|
||||||
|
pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, instance_id: i32) {
|
||||||
|
println!("backfill started");
|
||||||
|
db.crdt_operation()
|
||||||
|
.delete_many(vec![])
|
||||||
|
.exec()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let locations = db.location().find_many(vec![]).exec().await.unwrap();
|
||||||
|
db.crdt_operation()
|
||||||
|
.create_many(
|
||||||
|
locations
|
||||||
|
.into_iter()
|
||||||
|
.flat_map(|l| {
|
||||||
|
sync.shared_create(
|
||||||
|
prisma_sync::location::SyncId { pub_id: l.pub_id },
|
||||||
|
chain_optional_iter(
|
||||||
|
[],
|
||||||
|
[
|
||||||
|
l.name.map(|v| (location::name::NAME, json!(v))),
|
||||||
|
l.path.map(|v| (location::path::NAME, json!(v))),
|
||||||
|
l.total_capacity
|
||||||
|
.map(|v| (location::total_capacity::NAME, json!(v))),
|
||||||
|
l.available_capacity
|
||||||
|
.map(|v| (location::available_capacity::NAME, json!(v))),
|
||||||
|
l.size_in_bytes
|
||||||
|
.map(|v| (location::size_in_bytes::NAME, json!(v))),
|
||||||
|
l.is_archived
|
||||||
|
.map(|v| (location::is_archived::NAME, json!(v))),
|
||||||
|
l.generate_preview_media
|
||||||
|
.map(|v| (location::generate_preview_media::NAME, json!(v))),
|
||||||
|
l.sync_preview_media
|
||||||
|
.map(|v| (location::sync_preview_media::NAME, json!(v))),
|
||||||
|
l.hidden.map(|v| (location::hidden::NAME, json!(v))),
|
||||||
|
l.date_created
|
||||||
|
.map(|v| (location::date_created::NAME, json!(v))),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.exec()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let objects = db.object().find_many(vec![]).exec().await.unwrap();
|
||||||
|
db.crdt_operation()
|
||||||
|
.create_many(
|
||||||
|
objects
|
||||||
|
.into_iter()
|
||||||
|
.flat_map(|o| {
|
||||||
|
sync.shared_create(
|
||||||
|
prisma_sync::object::SyncId { pub_id: o.pub_id },
|
||||||
|
chain_optional_iter(
|
||||||
|
[],
|
||||||
|
[
|
||||||
|
o.kind.map(|v| (object::kind::NAME, json!(v))),
|
||||||
|
o.hidden.map(|v| (object::hidden::NAME, json!(v))),
|
||||||
|
o.favorite.map(|v| (object::favorite::NAME, json!(v))),
|
||||||
|
o.important.map(|v| (object::important::NAME, json!(v))),
|
||||||
|
o.note.map(|v| (object::note::NAME, json!(v))),
|
||||||
|
o.date_created
|
||||||
|
.map(|v| (object::date_created::NAME, json!(v))),
|
||||||
|
o.date_accessed
|
||||||
|
.map(|v| (object::date_accessed::NAME, json!(v))),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.exec()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let file_paths = db
|
||||||
|
.file_path()
|
||||||
|
.find_many(vec![])
|
||||||
|
.include(file_path::include!({
|
||||||
|
location: select { pub_id }
|
||||||
|
object: select { pub_id }
|
||||||
|
}))
|
||||||
|
.exec()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
db.crdt_operation()
|
||||||
|
.create_many(
|
||||||
|
file_paths
|
||||||
|
.into_iter()
|
||||||
|
.flat_map(|fp| {
|
||||||
|
sync.shared_create(
|
||||||
|
prisma_sync::file_path::SyncId { pub_id: fp.pub_id },
|
||||||
|
chain_optional_iter(
|
||||||
|
[],
|
||||||
|
[
|
||||||
|
fp.is_dir.map(|v| (file_path::is_dir::NAME, json!(v))),
|
||||||
|
fp.cas_id.map(|v| (file_path::cas_id::NAME, json!(v))),
|
||||||
|
fp.integrity_checksum
|
||||||
|
.map(|v| (file_path::integrity_checksum::NAME, json!(v))),
|
||||||
|
fp.location.map(|l| {
|
||||||
|
(
|
||||||
|
file_path::location::NAME,
|
||||||
|
json!(prisma_sync::location::SyncId { pub_id: l.pub_id }),
|
||||||
|
)
|
||||||
|
}),
|
||||||
|
fp.materialized_path
|
||||||
|
.map(|v| (file_path::materialized_path::NAME, json!(v))),
|
||||||
|
fp.name.map(|v| (file_path::name::NAME, json!(v))),
|
||||||
|
fp.extension.map(|v| (file_path::extension::NAME, json!(v))),
|
||||||
|
fp.hidden.map(|v| (file_path::hidden::NAME, json!(v))),
|
||||||
|
fp.size_in_bytes_bytes
|
||||||
|
.map(|v| (file_path::size_in_bytes_bytes::NAME, json!(v))),
|
||||||
|
fp.inode.map(|v| (file_path::inode::NAME, json!(v))),
|
||||||
|
fp.date_created
|
||||||
|
.map(|v| (file_path::date_created::NAME, json!(v))),
|
||||||
|
fp.date_modified
|
||||||
|
.map(|v| (file_path::date_modified::NAME, json!(v))),
|
||||||
|
fp.date_indexed
|
||||||
|
.map(|v| (file_path::date_indexed::NAME, json!(v))),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.exec()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let tags = db.tag().find_many(vec![]).exec().await.unwrap();
|
||||||
|
db.crdt_operation()
|
||||||
|
.create_many(
|
||||||
|
tags.into_iter()
|
||||||
|
.flat_map(|t| {
|
||||||
|
sync.shared_create(
|
||||||
|
prisma_sync::tag::SyncId { pub_id: t.pub_id },
|
||||||
|
chain_optional_iter(
|
||||||
|
[],
|
||||||
|
[
|
||||||
|
t.name.map(|v| (tag::name::NAME, json!(v))),
|
||||||
|
t.color.map(|v| (tag::color::NAME, json!(v))),
|
||||||
|
t.date_created.map(|v| (tag::date_created::NAME, json!(v))),
|
||||||
|
t.date_modified
|
||||||
|
.map(|v| (tag::date_modified::NAME, json!(v))),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.exec()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let tag_on_objects = db
|
||||||
|
.tag_on_object()
|
||||||
|
.find_many(vec![])
|
||||||
|
.include(tag_on_object::include!({
|
||||||
|
tag: select { pub_id }
|
||||||
|
object: select { pub_id }
|
||||||
|
}))
|
||||||
|
.exec()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
db.crdt_operation()
|
||||||
|
.create_many(
|
||||||
|
tag_on_objects
|
||||||
|
.into_iter()
|
||||||
|
.flat_map(|t_o| {
|
||||||
|
sync.relation_create(
|
||||||
|
prisma_sync::tag_on_object::SyncId {
|
||||||
|
tag: prisma_sync::tag::SyncId {
|
||||||
|
pub_id: t_o.tag.pub_id,
|
||||||
|
},
|
||||||
|
object: prisma_sync::object::SyncId {
|
||||||
|
pub_id: t_o.object.pub_id,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
chain_optional_iter(
|
||||||
|
[],
|
||||||
|
[t_o.date_created
|
||||||
|
.map(|v| (tag_on_object::date_created::NAME, json!(v)))],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.exec()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let labels = db.label().find_many(vec![]).exec().await.unwrap();
|
||||||
|
db.crdt_operation()
|
||||||
|
.create_many(
|
||||||
|
labels
|
||||||
|
.into_iter()
|
||||||
|
.flat_map(|l| {
|
||||||
|
sync.shared_create(
|
||||||
|
prisma_sync::label::SyncId { name: l.name },
|
||||||
|
[
|
||||||
|
(label::date_created::NAME, json!(l.date_created)),
|
||||||
|
(label::date_modified::NAME, json!(l.date_modified)),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.exec()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let label_on_objects = db
|
||||||
|
.label_on_object()
|
||||||
|
.find_many(vec![])
|
||||||
|
.select(label_on_object::select!({
|
||||||
|
object: select { pub_id }
|
||||||
|
label: select { name }
|
||||||
|
}))
|
||||||
|
.exec()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
db.crdt_operation()
|
||||||
|
.create_many(
|
||||||
|
label_on_objects
|
||||||
|
.into_iter()
|
||||||
|
.flat_map(|l_o| {
|
||||||
|
sync.relation_create(
|
||||||
|
prisma_sync::label_on_object::SyncId {
|
||||||
|
label: prisma_sync::label::SyncId {
|
||||||
|
name: l_o.label.name,
|
||||||
|
},
|
||||||
|
object: prisma_sync::object::SyncId {
|
||||||
|
pub_id: l_o.object.pub_id,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
[],
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||||
|
.collect(),
|
||||||
|
)
|
||||||
|
.exec()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
println!("backfill ended")
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Brendan remove this once you've got error handling here
|
#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Brendan remove this once you've got error handling here
|
||||||
|
|
||||||
mod actor;
|
mod actor;
|
||||||
|
pub mod backfill;
|
||||||
mod db_operation;
|
mod db_operation;
|
||||||
pub mod ingest;
|
pub mod ingest;
|
||||||
mod manager;
|
mod manager;
|
||||||
|
@ -46,3 +47,20 @@ pub fn crdt_op_db(op: &CRDTOperation) -> crdt_operation::Create {
|
||||||
_params: vec![],
|
_params: vec![],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn crdt_op_unchecked_db(
|
||||||
|
op: &CRDTOperation,
|
||||||
|
instance_id: i32,
|
||||||
|
) -> crdt_operation::CreateUnchecked {
|
||||||
|
crdt_operation::CreateUnchecked {
|
||||||
|
id: op.id.as_bytes().to_vec(),
|
||||||
|
timestamp: op.timestamp.0 as i64,
|
||||||
|
instance_id,
|
||||||
|
kind: op.kind().to_string(),
|
||||||
|
data: serde_json::to_vec(&op.data).unwrap(),
|
||||||
|
model: op.model.to_string(),
|
||||||
|
record_id: serde_json::to_vec(&op.record_id).unwrap(),
|
||||||
|
_params: vec![],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -32,4 +32,17 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
||||||
.await?)
|
.await?)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
.procedure("backfill", {
|
||||||
|
R.with2(library())
|
||||||
|
.mutation(|(_, library), _: ()| async move {
|
||||||
|
sd_core_sync::backfill::backfill_operations(
|
||||||
|
&library.db,
|
||||||
|
&library.sync,
|
||||||
|
library.config().await.instance_id,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,9 +3,11 @@ import { stringify } from 'uuid';
|
||||||
import {
|
import {
|
||||||
CRDTOperation,
|
CRDTOperation,
|
||||||
CRDTOperationData,
|
CRDTOperationData,
|
||||||
|
useLibraryMutation,
|
||||||
useLibraryQuery,
|
useLibraryQuery,
|
||||||
useLibrarySubscription
|
useLibrarySubscription
|
||||||
} from '@sd/client';
|
} from '@sd/client';
|
||||||
|
import { Button } from '@sd/ui';
|
||||||
import { useRouteTitle } from '~/hooks/useRouteTitle';
|
import { useRouteTitle } from '~/hooks/useRouteTitle';
|
||||||
|
|
||||||
type MessageGroup = {
|
type MessageGroup = {
|
||||||
|
@ -18,6 +20,9 @@ export const Component = () => {
|
||||||
useRouteTitle('Sync');
|
useRouteTitle('Sync');
|
||||||
|
|
||||||
const messages = useLibraryQuery(['sync.messages']);
|
const messages = useLibraryQuery(['sync.messages']);
|
||||||
|
const backfillSyncMessages = useLibraryMutation(['sync.backfill'], {
|
||||||
|
onSuccess: () => messages.refetch()
|
||||||
|
});
|
||||||
|
|
||||||
useLibrarySubscription(['sync.newMessage'], {
|
useLibrarySubscription(['sync.newMessage'], {
|
||||||
onData: () => messages.refetch()
|
onData: () => messages.refetch()
|
||||||
|
@ -30,6 +35,13 @@ export const Component = () => {
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<ul className="space-y-4 p-4">
|
<ul className="space-y-4 p-4">
|
||||||
|
<Button
|
||||||
|
variant="accent"
|
||||||
|
onClick={() => backfillSyncMessages.mutate(null)}
|
||||||
|
disabled={backfillSyncMessages.isLoading}
|
||||||
|
>
|
||||||
|
Backfill Sync Messages
|
||||||
|
</Button>
|
||||||
{groups?.map((group, index) => <OperationGroup key={index} group={group} />)}
|
{groups?.map((group, index) => <OperationGroup key={index} group={group} />)}
|
||||||
</ul>
|
</ul>
|
||||||
);
|
);
|
||||||
|
@ -72,7 +84,7 @@ function calculateGroups(messages: CRDTOperation[]) {
|
||||||
return messages.reduce<MessageGroup[]>((acc, op) => {
|
return messages.reduce<MessageGroup[]>((acc, op) => {
|
||||||
const { data } = op;
|
const { data } = op;
|
||||||
|
|
||||||
const id = stringify((op.record_id as any).pub_id);
|
const id = JSON.stringify(op.record_id);
|
||||||
|
|
||||||
const latest = (() => {
|
const latest = (() => {
|
||||||
const latest = acc[acc.length - 1];
|
const latest = acc[acc.length - 1];
|
||||||
|
|
|
@ -117,6 +117,7 @@ export type Procedures = {
|
||||||
{ key: "search.saved.create", input: LibraryArgs<{ name: string; search?: string | null; filters?: string | null; description?: string | null; icon?: string | null }>, result: null } |
|
{ key: "search.saved.create", input: LibraryArgs<{ name: string; search?: string | null; filters?: string | null; description?: string | null; icon?: string | null }>, result: null } |
|
||||||
{ key: "search.saved.delete", input: LibraryArgs<number>, result: null } |
|
{ key: "search.saved.delete", input: LibraryArgs<number>, result: null } |
|
||||||
{ key: "search.saved.update", input: LibraryArgs<[number, Args]>, result: null } |
|
{ key: "search.saved.update", input: LibraryArgs<[number, Args]>, result: null } |
|
||||||
|
{ key: "sync.backfill", input: LibraryArgs<null>, result: null } |
|
||||||
{ key: "tags.assign", input: LibraryArgs<{ targets: Target[]; tag_id: number; unassign: boolean }>, result: null } |
|
{ key: "tags.assign", input: LibraryArgs<{ targets: Target[]; tag_id: number; unassign: boolean }>, result: null } |
|
||||||
{ key: "tags.create", input: LibraryArgs<TagCreateArgs>, result: Tag } |
|
{ key: "tags.create", input: LibraryArgs<TagCreateArgs>, result: Tag } |
|
||||||
{ key: "tags.delete", input: LibraryArgs<number>, result: null } |
|
{ key: "tags.delete", input: LibraryArgs<number>, result: null } |
|
||||||
|
@ -246,7 +247,7 @@ export type FileDeleterJobInit = { location_id: number; file_path_ids: number[]
|
||||||
|
|
||||||
export type FileEraserJobInit = { location_id: number; file_path_ids: number[]; passes: string }
|
export type FileEraserJobInit = { location_id: number; file_path_ids: number[]; passes: string }
|
||||||
|
|
||||||
export type FilePath = { id: number; pub_id: number[]; is_dir: boolean | null; cas_id: string | null; integrity_checksum: string | null; location_id: number | null; materialized_path: string | null; name: string | null; extension: string | null; hidden: boolean | null; size_in_bytes: string | null; size_in_bytes_bytes: number[] | null; inode: number[] | null; object_id: number | null; key_id: number | null; date_created: string | null; date_modified: string | null; date_indexed: string | null }
|
export type FilePath = { id: number; pub_id: number[]; is_dir: boolean | null; cas_id: string | null; integrity_checksum: string | null; location_id: number | null; materialized_path: string | null; name: string | null; extension: string | null; hidden: boolean | null; size_in_bytes: string | null; size_in_bytes_bytes: number[] | null; inode: number[] | null; object_id: number | null; date_created: string | null; date_modified: string | null; date_indexed: string | null }
|
||||||
|
|
||||||
export type FilePathCursor = { isDir: boolean; variant: FilePathCursorVariant }
|
export type FilePathCursor = { isDir: boolean; variant: FilePathCursorVariant }
|
||||||
|
|
||||||
|
@ -260,7 +261,7 @@ export type FilePathOrder = { field: "name"; value: SortOrder } | { field: "size
|
||||||
|
|
||||||
export type FilePathSearchArgs = { take?: number | null; orderAndPagination?: OrderAndPagination<number, FilePathOrder, FilePathCursor> | null; filters?: SearchFilterArgs[]; groupDirectories?: boolean }
|
export type FilePathSearchArgs = { take?: number | null; orderAndPagination?: OrderAndPagination<number, FilePathOrder, FilePathCursor> | null; filters?: SearchFilterArgs[]; groupDirectories?: boolean }
|
||||||
|
|
||||||
export type FilePathWithObject = { id: number; pub_id: number[]; is_dir: boolean | null; cas_id: string | null; integrity_checksum: string | null; location_id: number | null; materialized_path: string | null; name: string | null; extension: string | null; hidden: boolean | null; size_in_bytes: string | null; size_in_bytes_bytes: number[] | null; inode: number[] | null; object_id: number | null; key_id: number | null; date_created: string | null; date_modified: string | null; date_indexed: string | null; object: { id: number; pub_id: number[]; kind: number | null; key_id: number | null; hidden: boolean | null; favorite: boolean | null; important: boolean | null; note: string | null; date_created: string | null; date_accessed: string | null } | null }
|
export type FilePathWithObject = { id: number; pub_id: number[]; is_dir: boolean | null; cas_id: string | null; integrity_checksum: string | null; location_id: number | null; materialized_path: string | null; name: string | null; extension: string | null; hidden: boolean | null; size_in_bytes: string | null; size_in_bytes_bytes: number[] | null; inode: number[] | null; object_id: number | null; date_created: string | null; date_modified: string | null; date_indexed: string | null; object: { id: number; pub_id: number[]; kind: number | null; key_id: number | null; hidden: boolean | null; favorite: boolean | null; important: boolean | null; note: string | null; date_created: string | null; date_accessed: string | null } | null }
|
||||||
|
|
||||||
export type Flash = {
|
export type Flash = {
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue