paginate sync backfill (#2163)

* paginate sync backfill

* add backfill timeout

* use gt instead of gte
This commit is contained in:
Brendan Allan 2024-03-06 15:13:44 +08:00 committed by GitHub
parent 3bd1622e93
commit 26d7a240db
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,7 +1,9 @@
use std::future::Future;
use sd_prisma::{
prisma::{
file_path, label, label_on_object, location, media_data, object, tag, tag_on_object,
PrismaClient,
PrismaClient, SortOrder,
},
prisma_sync,
};
@ -13,203 +15,262 @@ use crate::crdt_op_unchecked_db;
pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, instance_id: i32) {
db._transaction()
.with_timeout(9999999999)
.run(|db| async move {
println!("backfill started");
db.crdt_operation().delete_many(vec![]).exec().await?;
let locations = db.location().find_many(vec![]).exec().await?;
db.crdt_operation()
.create_many(
locations
.into_iter()
.flat_map(|l| {
use location::*;
sync.shared_create(
prisma_sync::location::SyncId { pub_id: l.pub_id },
chain_optional_iter(
[],
[
option_sync_entry!(l.name, name),
option_sync_entry!(l.path, path),
option_sync_entry!(l.total_capacity, total_capacity),
option_sync_entry!(
l.available_capacity,
available_capacity
paginate(
|cursor| {
db.location()
.find_many(vec![location::id::gt(cursor)])
.order_by(location::id::order(SortOrder::Asc))
.take(1000)
.exec()
},
|location| location.id,
|locations| {
db.crdt_operation()
.create_many(
locations
.into_iter()
.flat_map(|l| {
use location::*;
sync.shared_create(
prisma_sync::location::SyncId { pub_id: l.pub_id },
chain_optional_iter(
[],
[
option_sync_entry!(l.name, name),
option_sync_entry!(l.path, path),
option_sync_entry!(
l.total_capacity,
total_capacity
),
option_sync_entry!(
l.available_capacity,
available_capacity
),
option_sync_entry!(l.size_in_bytes, size_in_bytes),
option_sync_entry!(l.is_archived, is_archived),
option_sync_entry!(
l.generate_preview_media,
generate_preview_media
),
option_sync_entry!(
l.sync_preview_media,
sync_preview_media
),
option_sync_entry!(l.hidden, hidden),
option_sync_entry!(l.date_created, date_created),
],
),
option_sync_entry!(l.size_in_bytes, size_in_bytes),
option_sync_entry!(l.is_archived, is_archived),
option_sync_entry!(
l.generate_preview_media,
generate_preview_media
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
},
)
.await?;
paginate(
|cursor| {
db.object()
.find_many(vec![object::id::gt(cursor)])
.order_by(object::id::order(SortOrder::Asc))
.take(1000)
.exec()
},
|object| object.id,
|objects| {
db.crdt_operation()
.create_many(
objects
.into_iter()
.flat_map(|o| {
use object::*;
sync.shared_create(
prisma_sync::object::SyncId { pub_id: o.pub_id },
chain_optional_iter(
[],
[
option_sync_entry!(o.kind, kind),
option_sync_entry!(o.hidden, hidden),
option_sync_entry!(o.favorite, favorite),
option_sync_entry!(o.important, important),
option_sync_entry!(o.note, note),
option_sync_entry!(o.date_created, date_created),
option_sync_entry!(o.date_accessed, date_accessed),
],
),
option_sync_entry!(
l.sync_preview_media,
sync_preview_media
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
},
)
.await?;
paginate(
|cursor| {
db.media_data()
.find_many(vec![media_data::id::gt(cursor)])
.order_by(media_data::id::order(SortOrder::Asc))
.take(1000)
.include(media_data::include!({
object: select { pub_id }
}))
.exec()
},
|o| o.id,
|media_datas| {
db.crdt_operation()
.create_many(
media_datas
.into_iter()
.flat_map(|md| {
use media_data::*;
sync.shared_create(
prisma_sync::media_data::SyncId {
object: prisma_sync::object::SyncId {
pub_id: md.object.pub_id,
},
},
chain_optional_iter(
[],
[
option_sync_entry!(md.resolution, resolution),
option_sync_entry!(md.media_date, media_date),
option_sync_entry!(
md.media_location,
media_location
),
option_sync_entry!(md.camera_data, camera_data),
option_sync_entry!(md.artist, artist),
option_sync_entry!(md.description, description),
option_sync_entry!(md.copyright, copyright),
option_sync_entry!(md.exif_version, exif_version),
option_sync_entry!(md.epoch_time, epoch_time),
],
),
option_sync_entry!(l.hidden, hidden),
option_sync_entry!(l.date_created, date_created),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await?;
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
},
)
.await?;
let objects = db.object().find_many(vec![]).exec().await?;
db.crdt_operation()
.create_many(
objects
.into_iter()
.flat_map(|o| {
use object::*;
paginate(
|cursor| {
db.file_path()
.find_many(vec![file_path::id::gt(cursor)])
.order_by(file_path::id::order(SortOrder::Asc))
.include(file_path::include!({
location: select { pub_id }
object: select { pub_id }
}))
.exec()
},
|o| o.id,
|file_paths| {
db.crdt_operation()
.create_many(
file_paths
.into_iter()
.flat_map(|fp| {
use file_path::*;
sync.shared_create(
prisma_sync::object::SyncId { pub_id: o.pub_id },
chain_optional_iter(
[],
[
option_sync_entry!(o.kind, kind),
option_sync_entry!(o.hidden, hidden),
option_sync_entry!(o.favorite, favorite),
option_sync_entry!(o.important, important),
option_sync_entry!(o.note, note),
option_sync_entry!(o.date_created, date_created),
option_sync_entry!(o.date_accessed, date_accessed),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await?;
let media_datas = db
.media_data()
.find_many(vec![])
.include(media_data::include!({
object: select { pub_id }
}))
.exec()
.await?;
db.crdt_operation()
.create_many(
media_datas
.into_iter()
.flat_map(|md| {
use media_data::*;
sync.shared_create(
prisma_sync::media_data::SyncId {
object: prisma_sync::object::SyncId {
pub_id: md.object.pub_id,
},
},
chain_optional_iter(
[],
[
option_sync_entry!(md.resolution, resolution),
option_sync_entry!(md.media_date, media_date),
option_sync_entry!(md.media_location, media_location),
option_sync_entry!(md.camera_data, camera_data),
option_sync_entry!(md.artist, artist),
option_sync_entry!(md.description, description),
option_sync_entry!(md.copyright, copyright),
option_sync_entry!(md.exif_version, exif_version),
option_sync_entry!(md.epoch_time, epoch_time),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await?;
let file_paths = db
.file_path()
.find_many(vec![])
.include(file_path::include!({
location: select { pub_id }
object: select { pub_id }
}))
.exec()
.await?;
db.crdt_operation()
.create_many(
file_paths
.into_iter()
.flat_map(|fp| {
use file_path::*;
sync.shared_create(
prisma_sync::file_path::SyncId { pub_id: fp.pub_id },
chain_optional_iter(
[],
[
option_sync_entry!(fp.is_dir, is_dir),
option_sync_entry!(fp.cas_id, cas_id),
option_sync_entry!(
fp.integrity_checksum,
integrity_checksum
sync.shared_create(
prisma_sync::file_path::SyncId { pub_id: fp.pub_id },
chain_optional_iter(
[],
[
option_sync_entry!(fp.is_dir, is_dir),
option_sync_entry!(fp.cas_id, cas_id),
option_sync_entry!(
fp.integrity_checksum,
integrity_checksum
),
option_sync_entry!(
fp.location.map(|l| {
prisma_sync::location::SyncId {
pub_id: l.pub_id,
}
}),
location
),
option_sync_entry!(
fp.materialized_path,
materialized_path
),
option_sync_entry!(fp.name, name),
option_sync_entry!(fp.extension, extension),
option_sync_entry!(fp.hidden, hidden),
option_sync_entry!(
fp.size_in_bytes_bytes,
size_in_bytes_bytes
),
option_sync_entry!(fp.inode, inode),
option_sync_entry!(fp.date_created, date_created),
option_sync_entry!(fp.date_modified, date_modified),
option_sync_entry!(fp.date_indexed, date_indexed),
],
),
option_sync_entry!(
fp.location.map(|l| prisma_sync::location::SyncId {
pub_id: l.pub_id
}),
location
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
},
)
.await?;
paginate(
|cursor| {
db.tag()
.find_many(vec![tag::id::gt(cursor)])
.order_by(tag::id::order(SortOrder::Asc))
.exec()
},
|tag| tag.id,
|tags| {
db.crdt_operation()
.create_many(
tags.into_iter()
.flat_map(|t| {
sync.shared_create(
prisma_sync::tag::SyncId { pub_id: t.pub_id },
chain_optional_iter(
[],
[
t.name.map(|v| (tag::name::NAME, json!(v))),
t.color.map(|v| (tag::color::NAME, json!(v))),
t.date_created
.map(|v| (tag::date_created::NAME, json!(v))),
t.date_modified
.map(|v| (tag::date_modified::NAME, json!(v))),
],
),
option_sync_entry!(fp.materialized_path, materialized_path),
option_sync_entry!(fp.name, name),
option_sync_entry!(fp.extension, extension),
option_sync_entry!(fp.hidden, hidden),
option_sync_entry!(
fp.size_in_bytes_bytes,
size_in_bytes_bytes
),
option_sync_entry!(fp.inode, inode),
option_sync_entry!(fp.date_created, date_created),
option_sync_entry!(fp.date_modified, date_modified),
option_sync_entry!(fp.date_indexed, date_indexed),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await?;
let tags = db.tag().find_many(vec![]).exec().await?;
db.crdt_operation()
.create_many(
tags.into_iter()
.flat_map(|t| {
sync.shared_create(
prisma_sync::tag::SyncId { pub_id: t.pub_id },
chain_optional_iter(
[],
[
t.name.map(|v| (tag::name::NAME, json!(v))),
t.color.map(|v| (tag::color::NAME, json!(v))),
t.date_created.map(|v| (tag::date_created::NAME, json!(v))),
t.date_modified
.map(|v| (tag::date_modified::NAME, json!(v))),
],
),
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await?;
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
},
)
.await?;
let tag_on_objects = db
.tag_on_object()
@ -249,25 +310,35 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
.exec()
.await?;
let labels = db.label().find_many(vec![]).exec().await?;
db.crdt_operation()
.create_many(
labels
.into_iter()
.flat_map(|l| {
sync.shared_create(
prisma_sync::label::SyncId { name: l.name },
[
(label::date_created::NAME, json!(l.date_created)),
(label::date_modified::NAME, json!(l.date_modified)),
],
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
.await?;
paginate(
|cursor| {
db.label()
.find_many(vec![label::id::gt(cursor)])
.order_by(label::id::order(SortOrder::Asc))
.exec()
},
|label| label.id,
|labels| {
db.crdt_operation()
.create_many(
labels
.into_iter()
.flat_map(|l| {
sync.shared_create(
prisma_sync::label::SyncId { name: l.name },
[
(label::date_created::NAME, json!(l.date_created)),
(label::date_modified::NAME, json!(l.date_modified)),
],
)
})
.map(|o| crdt_op_unchecked_db(&o, instance_id))
.collect(),
)
.exec()
},
)
.await?;
let label_on_objects = db
.label_on_object()
@ -307,3 +378,27 @@ pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager, insta
.await
.unwrap();
}
async fn paginate<
T,
E: std::fmt::Debug,
TGetter: Future<Output = Result<Vec<T>, E>>,
TOperations: Future<Output = Result<i64, E>>,
>(
getter: impl Fn(i32) -> TGetter,
id: impl Fn(&T) -> i32,
operations: impl Fn(Vec<T>) -> TOperations,
) -> Result<(), E> {
let mut next_cursor = Some(0);
loop {
let Some(cursor) = next_cursor else {
break;
};
let items = getter(cursor).await?;
next_cursor = items.last().map(&id);
operations(items).await?;
}
Ok(())
}