Beginning of sync stuff (#502)

Brendan Allan 2023-01-04 23:11:55 -08:00 committed by GitHub
parent 2673a233c2
commit 676e512c4a
62 changed files with 1572 additions and 1918 deletions


@ -1,5 +1,6 @@
[alias]
prisma = "run -p prisma-cli --"
prisma = "run -p prisma-cli --bin prisma --"
prisma-sync = "run -p prisma-cli --bin sync --"
[target.x86_64-apple-darwin]
rustflags = [


@ -7,11 +7,11 @@ runs:
id: cache-prisma
uses: actions/cache@v3
with:
path: ./core/src/prisma.rs
key: prisma-${{ runner.os }}-${{ hashFiles('./core/prisma/*', './Cargo.toml') }}
path: ./core/src/prisma*.rs
key: prisma-0-${{ runner.os }}-${{ hashFiles('./core/prisma/*', './Cargo.toml') }}
- name: Generate Prisma client
working-directory: core
if: steps.cache-prisma.outputs.cache-hit != 'true'
shell: bash
run: cargo run -p prisma-cli --release -- generate
run: cargo run -p prisma-cli --bin prisma --release -- generate

.gitignore (vendored, 2 changes)

@ -61,7 +61,7 @@ yalc.lock
todos.md
examples/*/*.lock
/target
/core/src/prisma.rs
prisma*.rs
/sdserver_data
.spacedrive

Cargo.lock (generated, 612 changes)

File diff suppressed because it is too large


@ -5,7 +5,6 @@ members = [
"crates/*",
# "crates/p2p/tunnel",
# "crates/p2p/tunnel/utils",
"crates/sync/example/api",
"apps/cli",
"apps/desktop/src-tauri",
"apps/mobile/rust",
@ -13,18 +12,21 @@ members = [
]
[workspace.dependencies]
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust.git", tag = "0.6.3", features = [
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust.git", rev = "6581dc56abadddada4a076ede88acc7ce038042c", features = [
"rspc",
"sqlite-create-many",
"migrations",
"sqlite",
], default-features = false }
prisma-client-rust-cli = { git = "https://github.com/Brendonovich/prisma-client-rust.git", tag = "0.6.3", features = [
prisma-client-rust-cli = { git = "https://github.com/Brendonovich/prisma-client-rust.git", rev = "6581dc56abadddada4a076ede88acc7ce038042c", features = [
"rspc",
"sqlite-create-many",
"migrations",
"sqlite",
], default-features = false }
prisma-client-rust-sdk = { git = "https://github.com/Brendonovich/prisma-client-rust.git", rev = "6581dc56abadddada4a076ede88acc7ce038042c", features = [
"sqlite",
], default-features = false }
rspc = { version = "0.1.2" }
normi = { version = "0.0.1" }


@ -59,6 +59,7 @@ ffmpeg-next = { version = "5.1.1", optional = true, features = [] }
sd-ffmpeg = { path = "../crates/ffmpeg", optional = true }
sd-crypto = { path = "../crates/crypto", features = ["rspc", "serde"] }
sd-file-ext = { path = "../crates/file-ext"}
sd-sync = { path = "../crates/sync" }
fs_extra = "1.2.0"
tracing = "0.1.36"
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
@ -70,6 +71,7 @@ globset = { version = "^0.4.9", features = ["serde1"] }
itertools = "^0.10.5"
enumflags2 = "0.7.5"
notify = { version = "5.0.0", default-features = false, features = ["macos_kqueue"], optional = true }
uhlc = "0.5.1"
[dev-dependencies]
tempfile = "^3.3.0"


@ -0,0 +1,53 @@
-- CreateTable
CREATE TABLE "owned_operation" (
"id" BLOB NOT NULL PRIMARY KEY,
"timestamp" BIGINT NOT NULL,
"data" BLOB NOT NULL,
"model" TEXT NOT NULL,
"node_id" INTEGER NOT NULL,
CONSTRAINT "owned_operation_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "node" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
);
-- CreateTable
CREATE TABLE "shared_operation" (
"id" BLOB NOT NULL PRIMARY KEY,
"timestamp" BIGINT NOT NULL,
"model" TEXT NOT NULL,
"record_id" BLOB NOT NULL,
"kind" TEXT NOT NULL,
"data" BLOB NOT NULL,
"node_id" INTEGER NOT NULL,
CONSTRAINT "shared_operation_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "node" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
);
-- RedefineTables
PRAGMA foreign_keys=OFF;
CREATE TABLE "new_object" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"cas_id" TEXT NOT NULL,
"integrity_checksum" TEXT,
"name" TEXT,
"extension" TEXT,
"kind" INTEGER NOT NULL DEFAULT 0,
"size_in_bytes" TEXT NOT NULL DEFAULT '0',
"key_id" INTEGER,
"hidden" BOOLEAN NOT NULL DEFAULT false,
"favorite" BOOLEAN NOT NULL DEFAULT false,
"important" BOOLEAN NOT NULL DEFAULT false,
"has_thumbnail" BOOLEAN NOT NULL DEFAULT false,
"has_thumbstrip" BOOLEAN NOT NULL DEFAULT false,
"has_video_preview" BOOLEAN NOT NULL DEFAULT false,
"ipfs_id" TEXT,
"note" TEXT,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_indexed" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "object_key_id_fkey" FOREIGN KEY ("key_id") REFERENCES "key" ("id") ON DELETE SET NULL ON UPDATE CASCADE
);
INSERT INTO "new_object" ("cas_id", "date_created", "date_indexed", "date_modified", "extension", "favorite", "has_thumbnail", "has_thumbstrip", "has_video_preview", "hidden", "id", "important", "integrity_checksum", "ipfs_id", "key_id", "kind", "name", "note", "size_in_bytes") SELECT "cas_id", "date_created", "date_indexed", "date_modified", "extension", "favorite", "has_thumbnail", "has_thumbstrip", "has_video_preview", "hidden", "id", "important", "integrity_checksum", "ipfs_id", "key_id", "kind", "name", "note", "size_in_bytes" FROM "object";
DROP TABLE "object";
ALTER TABLE "new_object" RENAME TO "object";
CREATE UNIQUE INDEX "object_cas_id_key" ON "object"("cas_id");
CREATE UNIQUE INDEX "object_integrity_checksum_key" ON "object"("integrity_checksum");
PRAGMA foreign_key_check;
PRAGMA foreign_keys=ON;


@ -8,6 +8,38 @@ generator client {
output = "../src/prisma.rs"
}
generator sync {
provider = "cargo prisma-sync"
output = "../src/prisma_sync.rs"
}
model OwnedOperation {
id Bytes @id
timestamp BigInt
data Bytes
model String
node_id Int
node Node @relation(fields: [node_id], references: [id])
@@map("owned_operation")
}
model SharedOperation {
id Bytes @id
timestamp BigInt
model String
record_id Bytes
kind String
data Bytes
node_id Int
node Node @relation(fields: [node_id], references: [id])
@@map("shared_operation")
}
model SyncEvent {
id Int @id @default(autoincrement())
node_id Int
@ -53,7 +85,9 @@ model Node {
sync_events SyncEvent[]
jobs Job[]
Location Location[]
Location Location[]
OwnedOperation OwnedOperation[]
SharedOperation SharedOperation[]
@@map("node")
}
@ -106,7 +140,7 @@ model Object {
name String?
extension String?
kind Int @default(0)
size_in_bytes String
size_in_bytes String @default("0")
key_id Int?
// handy ways to mark an object
hidden Boolean @default(false)


@ -20,10 +20,12 @@ pub(crate) mod library;
pub(crate) mod location;
pub(crate) mod node;
pub(crate) mod object;
pub(crate) mod sync;
pub(crate) mod util;
pub(crate) mod volume;
pub(crate) mod prisma;
pub(crate) mod prisma_sync;
#[derive(Clone)]
pub struct NodeContext {


@ -1,6 +1,6 @@
use crate::{
api::CoreEvent, job::DynJob, location::LocationManager, node::NodeConfigManager,
prisma::PrismaClient, NodeContext,
prisma::PrismaClient, sync::SyncManager, NodeContext,
};
use std::{
@ -25,6 +25,7 @@ pub struct LibraryContext {
pub config: LibraryConfig,
/// db holds the database client for the current library.
pub db: Arc<PrismaClient>,
pub sync: Arc<SyncManager>,
/// key manager that provides encryption keys to functions that require them
pub key_manager: Arc<KeyManager>,
/// node_local_id holds the local ID of the node which is running the library.


@ -2,6 +2,7 @@ use crate::{
invalidate_query,
node::Platform,
prisma::{node, PrismaClient},
sync::SyncManager,
util::{
db::{load_and_migrate, write_storedkey_to_db},
seeder::{indexer_rules_seeder, SeederError},
@ -342,11 +343,14 @@ impl LibraryManager {
.exec()
.await?;
let (sync_manager, _) = SyncManager::new(db.clone(), id);
Ok(LibraryContext {
id,
local_id: node_data.id,
config,
key_manager: create_keymanager(&db).await?,
sync: Arc::new(sync_manager),
db,
node_local_id: node_data.id,
node_context,


@ -2,7 +2,6 @@ use crate::{library::LibraryContext, prisma::file_path};
use std::sync::atomic::{AtomicI32, Ordering};
use chrono::{DateTime, Utc};
use prisma_client_rust::{Direction, QueryError};
static LAST_FILE_PATH_ID: AtomicI32 = AtomicI32::new(0);
@ -81,61 +80,3 @@ pub async fn create_file_path(
Ok(created_path)
}
pub struct FilePathBatchCreateEntry {
pub id: i32,
pub location_id: i32,
pub materialized_path: String,
pub name: String,
pub extension: Option<String>,
pub parent_id: Option<i32>,
pub is_dir: bool,
pub created_at: DateTime<Utc>,
}
pub async fn create_many_file_paths(
library_ctx: &LibraryContext,
entries: Vec<FilePathBatchCreateEntry>,
) -> Result<i64, QueryError> {
library_ctx
.db
.file_path()
.create_many(
entries
.into_iter()
.map(
|FilePathBatchCreateEntry {
id,
location_id,
mut materialized_path,
name,
extension,
parent_id,
is_dir,
created_at,
}| {
// If this new file_path is a directory, materialized_path must end with "/"
if is_dir && !materialized_path.ends_with('/') {
materialized_path += "/";
}
file_path::create_unchecked(
id,
location_id,
materialized_path,
name,
vec![
file_path::is_dir::set(is_dir),
file_path::parent_id::set(parent_id),
file_path::extension::set(extension),
file_path::date_created::set(created_at.into()),
],
)
},
)
.collect(),
)
.skip_duplicates()
.exec()
.await
}


@ -15,14 +15,12 @@ use std::{
use chrono::{DateTime, Utc};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::Instant;
use tracing::info;
use super::{
super::file_path_helper::{
create_many_file_paths, get_max_file_path_id, set_max_file_path_id,
FilePathBatchCreateEntry,
},
super::file_path_helper::{get_max_file_path_id, set_max_file_path_id},
rules::IndexerRule,
walk::{walk, WalkEntry},
};
@ -233,11 +231,12 @@ impl StatefulJob for IndexerJob {
.data
.as_ref()
.expect("critical error: missing data on job state");
let db = &ctx.library_ctx.db;
let location_path = &data.location_path;
let location_id = state.init.location.id;
let entries = state.steps[0]
let (sync_stuff, paths): (Vec<_>, Vec<_>) = state.steps[0]
.iter()
.map(|entry| {
let name;
@ -253,7 +252,7 @@ impl StatefulJob for IndexerJob {
extension = Some(extract_name(entry.path.extension()).to_lowercase());
name = extract_name(entry.path.file_stem());
}
let materialized_path = entry
let mut materialized_path = entry
.path
.strip_prefix(location_path)
.unwrap()
@ -261,20 +260,54 @@ impl StatefulJob for IndexerJob {
.expect("Found non-UTF-8 path")
.to_string();
FilePathBatchCreateEntry {
id: entry.file_id,
location_id,
materialized_path,
name,
extension,
parent_id: entry.parent_id,
is_dir: entry.is_dir,
created_at: entry.created_at,
if entry.is_dir && !materialized_path.ends_with('/') {
materialized_path += "/";
}
})
.collect();
let count = create_many_file_paths(&ctx.library_ctx, entries).await?;
use file_path::*;
(
(
json!({
"id": entry.file_id,
"location_id": state.init.location.pub_id,
}),
[
("materialized_path", json!(materialized_path.clone())),
("name", json!(name.clone())),
("is_dir", json!(entry.is_dir)),
("extension", json!(extension.clone())),
("parent_id", json!(entry.parent_id)),
("date_created", json!(entry.created_at)),
],
),
file_path::create_unchecked(
entry.file_id,
location_id,
materialized_path,
name,
vec![
is_dir::set(entry.is_dir),
extension::set(extension),
parent_id::set(entry.parent_id),
date_created::set(entry.created_at.into()),
],
),
)
})
.unzip();
let count = ctx
.library_ctx
.sync
.write_op(
db,
ctx.library_ctx
.sync
.owned_create_many("FilePath", sync_stuff, true),
db.file_path().create_many(paths).skip_duplicates(),
)
.await?;
info!("Inserted {count} records");


@ -171,12 +171,12 @@ async fn inner_create_file(
.object()
.upsert(
object::cas_id::equals(cas_id.clone()),
(
object::create_unchecked(
cas_id.clone(),
size_str.clone(),
vec![
object::date_created::set(date_created),
object::kind::set(kind.int_value()),
object::size_in_bytes::set(size_str.clone()),
],
),
vec![


@ -9,14 +9,15 @@ use crate::{
prisma::{file_path, indexer_rules_in_location, location, node, object},
};
use rspc::Type;
use serde::Deserialize;
use serde_json::json;
use std::{
collections::HashSet,
path::{Path, PathBuf},
};
use prisma_client_rust::QueryError;
use rspc::Type;
use serde::Deserialize;
use tokio::{fs, io};
use tracing::{debug, info};
use uuid::Uuid;
@ -317,6 +318,8 @@ async fn create_location(
location_path: impl AsRef<Path>,
indexer_rules_ids: &[i32],
) -> Result<indexer_job_location::Data, LocationError> {
let db = &ctx.db;
let location_name = location_path
.as_ref()
.file_name()
@ -325,26 +328,38 @@ async fn create_location(
.unwrap()
.to_string();
let mut location = ctx
.db
.location()
.create(
location_pub_id.as_bytes().to_vec(),
node::id::equals(ctx.node_local_id),
vec![
location::name::set(Some(location_name.clone())),
location::is_online::set(true),
location::local_path::set(Some(
location_path
.as_ref()
.to_str()
.expect("Found non-UTF-8 path")
.to_string(),
)),
],
let local_path = location_path
.as_ref()
.to_str()
.expect("Found non-UTF-8 path")
.to_string();
let location = ctx
.sync
.write_op(
db,
ctx.sync.owned_create(
"Location",
json!({ "id": location_pub_id.as_bytes() }),
[
("node", json!({ "pub_id": ctx.id.as_bytes() })),
("name", json!(location_name)),
("is_online", json!(true)),
("local_path", json!(&local_path)),
],
),
db.location()
.create(
location_pub_id.as_bytes().to_vec(),
node::id::equals(ctx.node_local_id),
vec![
location::name::set(Some(location_name.clone())),
location::is_online::set(true),
location::local_path::set(Some(local_path)),
],
)
.include(indexer_job_location::include()),
)
.include(indexer_job_location::include())
.exec()
.await?;
if !indexer_rules_ids.is_empty() {
@ -352,7 +367,7 @@ async fn create_location(
}
// Updating our location variable to include information about the indexer rules
location = fetch_location(ctx, location.id)
let location = fetch_location(ctx, location.id)
.include(indexer_job_location::include())
.exec()
.await?


@ -69,10 +69,9 @@ impl StatefulJob for FullFileIdentifierJob {
info!("Identifying orphan File Paths...");
let location_id = state.init.location_id;
let db = &ctx.library_ctx.db;
let location = ctx
.library_ctx
.db
let location = db
.location()
.find_unique(location::id::equals(location_id))
.exec()
@ -97,9 +96,7 @@ impl StatefulJob for FullFileIdentifierJob {
// update job with total task count based on orphan file_paths count
ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]);
let first_path_id = ctx
.library_ctx
.db
let first_path_id = db
.file_path()
.find_first(orphan_path_filters(location_id, None))
.exec()


@ -5,6 +5,7 @@ use crate::{
prisma::{file_path, object},
};
use chrono::{DateTime, FixedOffset};
use serde_json::json;
use std::{
collections::{HashMap, HashSet},
path::{Path, PathBuf},
@ -90,36 +91,49 @@ async fn batch_update_file_paths(
objects: &[object::Data],
cas_id_lookup: &HashMap<String, Vec<i32>>,
) -> Result<Vec<file_path::Data>, QueryError> {
let mut file_path_updates = Vec::new();
let (sync, updates): (Vec<_>, Vec<_>) = objects
.iter()
.flat_map(|object| {
let file_path_ids = cas_id_lookup.get(&object.cas_id).unwrap();
let sync = &library.sync;
objects.iter().for_each(|object| {
let file_path_ids = cas_id_lookup.get(&object.cas_id).unwrap();
file_path_ids.iter().map(|file_path_id| {
info!(
"Linking: <file_path_id = '{}', object_id = '{}'>",
file_path_id, object.id
);
file_path_updates.extend(file_path_ids.iter().map(|file_path_id| {
info!(
"Linking: <file_path_id = '{}', object_id = '{}'>",
file_path_id, object.id
);
library.db.file_path().update(
file_path::location_id_id(location_id, *file_path_id),
vec![file_path::object_id::set(Some(object.id))],
)
}));
});
(
sync.owned_update(
"FilePath",
json!({
"id": file_path_id,
"location_id": location_id
}),
[("object", json!({ "cas_id": object.cas_id }))],
),
library.db.file_path().update(
file_path::location_id_id(location_id, *file_path_id),
vec![file_path::object::connect(object::id::equals(object.id))],
),
)
})
})
.unzip();
info!(
"Updating {} file paths for {} objects",
file_path_updates.len(),
updates.len(),
objects.len()
);
library.db._batch(file_path_updates).await
library.sync.write_ops(&library.db, sync, updates).await
}
async fn generate_provisional_objects(
location_path: impl AsRef<Path>,
file_paths: &[file_path::Data],
) -> HashMap<i32, (String, String, Vec<object::SetParam>)> {
) -> HashMap<i32, (String, Vec<object::SetParam>)> {
let mut provisional_objects = HashMap::with_capacity(file_paths.len());
// analyze each file_path
@ -145,10 +159,10 @@ async fn generate_provisional_objects(
file_path_id,
object::create_unchecked(
cas_id,
size_str,
vec![
object::date_created::set(date_created),
object::kind::set(kind.int_value()),
object::size_in_bytes::set(size_str),
],
),
);
@ -159,6 +173,7 @@ async fn generate_provisional_objects(
}
};
}
provisional_objects
}
@ -175,7 +190,7 @@ async fn identifier_job_step(
let unique_cas_ids = provisional_objects
.values()
.map(|(cas_id, _, _)| cas_id.clone())
.map(|(cas_id, _)| cas_id.clone())
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
@ -184,7 +199,7 @@ async fn identifier_job_step(
let mut cas_id_lookup: HashMap<String, Vec<i32>> = HashMap::with_capacity(unique_cas_ids.len());
// populate cas_id_lookup with file_path_ids
for (file_path_id, (cas_id, _, _)) in provisional_objects.iter() {
for (file_path_id, (cas_id, _)) in provisional_objects.iter() {
cas_id_lookup
.entry(cas_id.clone())
.or_insert_with(Vec::new)
@ -223,12 +238,12 @@ async fn identifier_job_step(
// extract objects that don't already exist in the database
let new_objects = provisional_objects
.into_iter()
.filter(|(_, (cas_id, _, _))| !existing_object_cas_ids.contains(cas_id))
.filter(|(_, (cas_id, _))| !existing_object_cas_ids.contains(cas_id))
.collect::<Vec<_>>();
let new_objects_cas_ids = new_objects
.iter()
.map(|(_, (cas_id, _, _))| cas_id.clone())
.map(|(_, (cas_id, _))| cas_id.clone())
.collect::<Vec<_>>();
info!(
@ -243,12 +258,7 @@ async fn identifier_job_step(
let total_created_files = library
.db
.object()
.create_many(
new_objects
.into_iter()
.map(|(_, (cas_id, size, params))| (cas_id, size, params))
.collect(),
)
.create_many(new_objects.into_iter().map(|(_, p)| p).collect())
.skip_duplicates()
.exec()
.await

core/src/sync/mod.rs (new file, 519 lines)

@ -0,0 +1,519 @@
use futures::future::join_all;
use sd_sync::*;
use serde::Deserialize;
use serde_json::{from_slice, from_value, to_vec, Value};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::sync::mpsc::{self, Receiver, Sender};
use uhlc::{HLCBuilder, HLC, NTP64};
use uuid::Uuid;
use crate::prisma::{
file_path, location, node, object, owned_operation, shared_operation, PrismaClient,
};
pub struct SyncManager {
db: Arc<PrismaClient>,
node: Uuid,
_clocks: HashMap<Uuid, NTP64>,
clock: HLC,
tx: Sender<CRDTOperation>,
}
impl SyncManager {
pub fn new(db: Arc<PrismaClient>, node: Uuid) -> (Self, Receiver<CRDTOperation>) {
let (tx, rx) = mpsc::channel(64);
(
Self {
db,
node,
clock: HLCBuilder::new().with_id(node.into()).build(),
_clocks: Default::default(),
tx,
},
rx,
)
}
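/// Persists a batch of CRDT operations to the owned/shared operation tables
/// in the same database batch as the queries that apply them, then broadcasts
/// each operation on the internal channel.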
pub async fn write_ops<'item, Q: prisma_client_rust::BatchItem<'item>>(
&self,
tx: &PrismaClient,
ops: Vec<CRDTOperation>,
queries: Q,
) -> prisma_client_rust::Result<<Q as prisma_client_rust::BatchItemParent>::ReturnValue> {
let owned = ops
.iter()
.filter_map(|op| match &op.typ {
CRDTOperationType::Owned(owned_op) => Some(tx.owned_operation().create(
op.id.as_bytes().to_vec(),
op.timestamp.0 as i64,
to_vec(&owned_op.items).unwrap(),
owned_op.model.clone(),
node::pub_id::equals(op.node.as_bytes().to_vec()),
vec![],
)),
_ => None,
})
.collect::<Vec<_>>();
let shared = ops
.iter()
.filter_map(|op| match &op.typ {
CRDTOperationType::Shared(shared_op) => {
let kind = match &shared_op.data {
SharedOperationData::Create(_) => "c",
SharedOperationData::Update { .. } => "u",
SharedOperationData::Delete => "d",
};
Some(tx.shared_operation().create(
op.id.as_bytes().to_vec(),
op.timestamp.0 as i64,
shared_op.model.to_string(),
to_vec(&shared_op.record_id).unwrap(),
kind.to_string(),
to_vec(&shared_op.data).unwrap(),
node::pub_id::equals(op.node.as_bytes().to_vec()),
vec![],
))
}
_ => None,
})
.collect::<Vec<_>>();
let (res, _) = tx._batch((queries, (owned, shared))).await?;
for op in ops {
self.tx.send(op).await.ok();
}
Ok(res)
}
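/// Like `write_ops`, but pairs a single operation with a single query.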
pub async fn write_op<'query, Q: prisma_client_rust::Query<'query>>(
&self,
tx: &PrismaClient,
op: CRDTOperation,
query: Q,
) -> prisma_client_rust::Result<Q::ReturnValue> {
let ret = match &op.typ {
CRDTOperationType::Owned(owned_op) => {
tx._batch((
tx.owned_operation().create(
op.id.as_bytes().to_vec(),
op.timestamp.0 as i64,
to_vec(&owned_op.items).unwrap(),
owned_op.model.clone(),
node::pub_id::equals(op.node.as_bytes().to_vec()),
vec![],
),
query,
))
.await?
.1
}
CRDTOperationType::Shared(shared_op) => {
let kind = match &shared_op.data {
SharedOperationData::Create(_) => "c",
SharedOperationData::Update { .. } => "u",
SharedOperationData::Delete => "d",
};
tx._batch((
tx.shared_operation().create(
op.id.as_bytes().to_vec(),
op.timestamp.0 as i64,
shared_op.model.to_string(),
to_vec(&shared_op.record_id).unwrap(),
kind.to_string(),
to_vec(&shared_op.data).unwrap(),
node::pub_id::equals(op.node.as_bytes().to_vec()),
vec![],
),
query,
))
.await?
.1
}
_ => todo!(),
};
self.tx.send(op).await.ok();
Ok(ret)
}
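/// Loads every owned and shared operation from the database, sorted by timestamp.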
pub async fn get_ops(&self) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
owned_operation::include!(owned_op_with_node { node });
impl TryInto<CRDTOperation> for owned_op_with_node::Data {
type Error = ();
fn try_into(self) -> Result<CRDTOperation, Self::Error> {
let id = Uuid::from_slice(&self.id).map_err(|_| ())?;
let node = Uuid::from_slice(&self.node.pub_id).map_err(|_| ())?;
Ok(CRDTOperation {
id,
node,
timestamp: NTP64(self.timestamp as u64),
typ: CRDTOperationType::Owned(OwnedOperation {
model: self.model,
items: serde_json::from_slice(&self.data).unwrap(),
}),
})
}
}
shared_operation::include!(shared_op_with_node { node });
impl TryInto<CRDTOperation> for shared_op_with_node::Data {
type Error = ();
fn try_into(self) -> Result<CRDTOperation, Self::Error> {
let id = Uuid::from_slice(&self.id).map_err(|_| ())?;
let node = Uuid::from_slice(&self.node.pub_id).map_err(|_| ())?;
Ok(CRDTOperation {
id,
node,
timestamp: NTP64(self.timestamp as u64),
typ: CRDTOperationType::Shared(SharedOperation {
record_id: serde_json::from_slice(&self.record_id).unwrap(),
model: self.model,
data: from_slice(&self.data).unwrap(),
}),
})
}
}
let owned = self
.db
.owned_operation()
.find_many(vec![])
.include(owned_op_with_node::include())
.exec()
.await?
.into_iter()
.map(TryInto::try_into);
let shared = self
.db
.shared_operation()
.find_many(vec![])
.include(shared_op_with_node::include())
.exec()
.await?
.into_iter()
.map(TryInto::try_into);
let mut result: Vec<CRDTOperation> = owned.chain(shared).flatten().collect();
result.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
Ok(result)
}
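/// Applies an operation received from another node to the local database,
/// resolving sync IDs (eg. a location's `pub_id`) back to local database IDs.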
pub async fn ingest_op(&self, op: CRDTOperation) -> prisma_client_rust::Result<()> {
match op.typ {
CRDTOperationType::Owned(owned_op) => match owned_op.model.as_str() {
"FilePath" => {
#[derive(Deserialize)]
struct FilePathId {
location_id: Vec<u8>,
id: i32,
}
for item in owned_op.items {
let id: FilePathId = serde_json::from_value(item.id).unwrap();
let location = self
.db
.location()
.find_unique(location::pub_id::equals(id.location_id))
.select(location::select!({ id }))
.exec()
.await?
.unwrap();
match item.data {
OwnedOperationData::Create(mut data) => {
self.db
.file_path()
.create(
id.id,
location::id::equals(location.id),
serde_json::from_value(
data.remove("materialized_path").unwrap(),
)
.unwrap(),
serde_json::from_value(data.remove("name").unwrap())
.unwrap(),
data.into_iter()
.flat_map(|(k, v)| {
file_path::SetParam::deserialize(&k, v)
})
.collect(),
)
.exec()
.await?;
}
OwnedOperationData::CreateMany {
values,
skip_duplicates,
} => {
let location_ids = values
.iter()
.map(|(id, _)| {
serde_json::from_value::<FilePathId>(id.clone())
.unwrap()
.location_id
})
.collect::<HashSet<_>>();
let location_id_mappings =
join_all(location_ids.iter().map(|id| async move {
self.db
.location()
.find_unique(location::pub_id::equals(id.clone()))
.exec()
.await
.map(|o| o.map(|v| (id, v.id)))
}))
.await
.into_iter()
.flatten()
.flatten()
.collect::<HashMap<_, _>>();
let mut q = self.db.file_path().create_many(
values
.into_iter()
.map(|(id, mut data)| {
let id: FilePathId =
serde_json::from_value(id).unwrap();
file_path::create_unchecked(
id.id,
*location_id_mappings.get(&id.location_id).unwrap(),
serde_json::from_value(
data.remove("materialized_path").unwrap(),
)
.unwrap(),
serde_json::from_value(
data.remove("name").unwrap(),
)
.unwrap(),
data.into_iter()
.flat_map(|(k, v)| {
file_path::SetParam::deserialize(&k, v)
})
.collect(),
)
})
.collect(),
);
if skip_duplicates {
q = q.skip_duplicates()
}
q.exec().await?;
}
OwnedOperationData::Update(data) => {
self.db
.file_path()
.update(
file_path::location_id_id(location.id, id.id),
data.into_iter()
.flat_map(|(k, v)| {
file_path::SetParam::deserialize(&k, v)
})
.collect(),
)
.exec()
.await?;
}
_ => todo!(),
}
}
}
"Location" => {
#[derive(Deserialize)]
struct LocationId {
id: Vec<u8>,
}
for item in owned_op.items {
let id: LocationId = serde_json::from_value(item.id).unwrap();
match item.data {
OwnedOperationData::Create(mut data) => {
self.db
.location()
.create(
id.id,
{
let val: std::collections::HashMap<String, Value> =
from_value(data.remove("node").unwrap()).unwrap();
let val = val.into_iter().next().unwrap();
node::UniqueWhereParam::deserialize(&val.0, val.1)
.unwrap()
},
data.into_iter()
.flat_map(|(k, v)| {
location::SetParam::deserialize(&k, v)
})
.collect(),
)
.exec()
.await?;
}
_ => todo!(),
}
}
}
_ => {}
},
CRDTOperationType::Shared(shared_op) => match shared_op.model.as_str() {
"Object" => {
let cas_id: String = from_value(shared_op.record_id).unwrap();
match shared_op.data {
SharedOperationData::Create(_) => {
self.db
.object()
.upsert(
object::cas_id::equals(cas_id.clone()),
(cas_id, vec![]),
vec![],
)
.exec()
.await
.ok();
}
SharedOperationData::Update { field, value } => {
self.db
.object()
.update(
object::cas_id::equals(cas_id),
vec![object::SetParam::deserialize(&field, value).unwrap()],
)
.exec()
.await?;
}
_ => todo!(),
}
}
_ => todo!(),
},
_ => {}
}
Ok(())
}
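/// Wraps an operation payload in a `CRDTOperation` stamped with this node's
/// ID, a fresh UUID, and a new HLC timestamp.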
fn new_op(&self, typ: CRDTOperationType) -> CRDTOperation {
let timestamp = self.clock.new_timestamp();
CRDTOperation {
node: self.node,
timestamp: *timestamp.get_time(),
id: Uuid::new_v4(),
typ,
}
}
pub fn owned_create<const SIZE: usize>(
&self,
model: &str,
id: Value,
values: [(&'static str, Value); SIZE],
) -> CRDTOperation {
self.new_op(CRDTOperationType::Owned(OwnedOperation {
model: model.to_string(),
items: [(id, values)]
.into_iter()
.map(|(id, data)| OwnedOperationItem {
id,
data: OwnedOperationData::Create(
data.into_iter().map(|(k, v)| (k.to_string(), v)).collect(),
),
})
.collect(),
}))
}
pub fn owned_create_many<const SIZE: usize>(
&self,
model: &str,
data: impl IntoIterator<Item = (Value, [(&'static str, Value); SIZE])>,
skip_duplicates: bool,
) -> CRDTOperation {
self.new_op(CRDTOperationType::Owned(OwnedOperation {
model: model.to_string(),
items: vec![OwnedOperationItem {
id: Value::Null,
data: OwnedOperationData::CreateMany {
values: data
.into_iter()
.map(|(id, data)| {
(
id,
data.into_iter().map(|(k, v)| (k.to_string(), v)).collect(),
)
})
.collect(),
skip_duplicates,
},
}],
}))
}
pub fn owned_update<const SIZE: usize>(
&self,
model: &str,
id: Value,
values: [(&'static str, Value); SIZE],
) -> CRDTOperation {
self.new_op(CRDTOperationType::Owned(OwnedOperation {
model: model.to_string(),
items: [(id, values)]
.into_iter()
.map(|(id, data)| OwnedOperationItem {
id,
data: OwnedOperationData::Update(
data.into_iter().map(|(k, v)| (k.to_string(), v)).collect(),
),
})
.collect(),
}))
}
pub fn shared_create(&self, model: &str, record_id: Value) -> CRDTOperation {
self.new_op(CRDTOperationType::Shared(SharedOperation {
model: model.to_string(),
record_id,
data: SharedOperationData::Create(SharedOperationCreateData::Atomic),
}))
}
pub fn shared_update(
&self,
model: &str,
record_id: Value,
field: &str,
value: Value,
) -> CRDTOperation {
self.new_op(CRDTOperationType::Shared(SharedOperation {
model: model.to_string(),
record_id,
data: SharedOperationData::Update {
field: field.to_string(),
value,
},
}))
}
}


@ -143,8 +143,8 @@ impl HashingAlgorithm {
impl Display for HashingAlgorithm {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
Self::Argon2id(p) => write!(f, "Argon2id ({})", p),
Self::BalloonBlake3(p) => write!(f, "BLAKE3-Balloon ({})", p),
Self::Argon2id(p) => write!(f, "Argon2id ({p})"),
Self::BalloonBlake3(p) => write!(f, "BLAKE3-Balloon ({p})"),
}
}
}


@ -5,3 +5,4 @@ edition = "2021"
[dependencies]
prisma-client-rust-cli = { workspace = true }
sd-sync-generator = { path = "../sync-generator" }


@ -0,0 +1,3 @@
fn main() {
sd_sync_generator::run();
}


@ -0,0 +1,15 @@
[package]
name = "sd-sync-generator"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
nom = "7.1.1"
once_cell = "1.16.0"
prisma-client-rust-sdk = { workspace = true }
proc-macro2 = "1.0.47"
quote = "1.0.21"
serde = { version = "1.0.147", features = ["derive"] }
thiserror = "1.0.37"


@ -0,0 +1,38 @@
use crate::prelude::*;
pub fn r#struct(datamodel: &dml::Datamodel) -> TokenStream {
let model_action_fns = datamodel.models.iter().map(|model| {
let model_name_snake = snake_ident(&model.name);
let model_actions_struct = quote!(super::#model_name_snake::Actions);
quote! {
pub fn #model_name_snake(&self) -> #model_actions_struct {
#model_actions_struct { client: self }
}
}
});
quote! {
pub struct PrismaCRDTClient {
pub(super) client: #PRISMA::PrismaClient,
pub node_id: Vec<u8>,
operation_sender: #MPSC::Sender<#SYNC::CRDTOperation>
}
impl PrismaCRDTClient {
pub(super) fn _new(
client: #PRISMA::PrismaClient,
node_id: Vec<u8>,
operation_sender: #MPSC::Sender<#SYNC::CRDTOperation>
) -> Self {
Self {
client,
node_id,
operation_sender,
}
}
#(#model_action_fns)*
}
}
}


@ -0,0 +1,114 @@
pub use prisma_client_rust_sdk::prelude::*;
#[derive(Debug, serde::Serialize, thiserror::Error)]
enum Error {}
#[derive(serde::Deserialize)]
struct SDSyncGenerator {}
impl PrismaGenerator for SDSyncGenerator {
const NAME: &'static str = "SD Sync Generator";
const DEFAULT_OUTPUT: &'static str = "prisma-sync.rs";
type Error = Error;
fn generate(self, args: GenerateArgs) -> Result<String, Self::Error> {
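// Generate `SetParam::deserialize(field, value)` impls for each model so
// sync ingestion can turn (field name, JSON value) pairs back into typed
// Prisma set params; relation fields are deserialized into `connect`s.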
let set_param_impls = args.dml.models().map(|model| {
let model_name_snake = snake_ident(&model.name);
let field_matches = model.fields().filter_map(|field| {
let field_name_snake = snake_ident(field.name());
let field_name_snake_str = field_name_snake.to_string();
match field {
dml::Field::ScalarField(_) => {
Some(quote! {
#field_name_snake_str => crate::prisma::#model_name_snake::#field_name_snake::set(::serde_json::from_value(val).unwrap()),
})
},
dml::Field::RelationField(relation_field) => {
let relation_model_name_snake = snake_ident(&relation_field.relation_info.referenced_model);
match &relation_field.relation_info.references[..] {
[_] => {
Some(quote! {
#field_name_snake_str => {
let val: std::collections::HashMap<String, ::serde_json::Value> = ::serde_json::from_value(val).unwrap();
let val = val.into_iter().next().unwrap();
crate::prisma::#model_name_snake::#field_name_snake::connect(
crate::prisma::#relation_model_name_snake::UniqueWhereParam::deserialize(&val.0, val.1).unwrap()
)
},
})
},
_ => None
}
},
_ => None
}
});
match field_matches.clone().count() {
0 => quote!(),
_ => quote! {
impl crate::prisma::#model_name_snake::SetParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None
})
}
}
}
}
});
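// Likewise, generate `UniqueWhereParam::deserialize` impls for models with a
// single-field unique criterion, used when resolving record references.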
let unique_where_param_impls = args.dml.models().map(|model| {
let model_name_snake = snake_ident(&model.name);
let field_matches = model
.loose_unique_criterias()
.iter()
.flat_map(|criteria| match &criteria.fields[..] {
[field] => {
let unique_field_name_str = &field.name;
let unique_field_name_snake = snake_ident(unique_field_name_str);
Some(quote!(#unique_field_name_str =>
crate::prisma::#model_name_snake::#unique_field_name_snake::equals(
::serde_json::from_value(val).unwrap()
),
))
}
_ => None,
})
.collect::<Vec<_>>();
match field_matches.len() {
0 => quote!(),
_ => quote! {
impl crate::prisma::#model_name_snake::UniqueWhereParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None
})
}
}
},
}
});
Ok(quote! {
#(#set_param_impls)*
#(#unique_where_param_impls)*
}
.to_string())
}
}
pub fn run() {
SDSyncGenerator::run();
}


@ -0,0 +1,96 @@
use crate::{attribute::Attribute, prelude::*};
pub enum ModelType {
Owned,
SharedUnique,
SharedAtomic,
Relation,
}
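// Reads a model's sync type from its `///` doc-comment attribute
// (eg. `/// @owned`), as parsed by the `attribute` module.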
pub fn model_sync_type(model: &dml::Model, dml: &dml::Datamodel) -> ModelType {
let type_attribute = model
.documentation
.as_ref()
.map(Attribute::parse)
.unwrap()
.unwrap();
match type_attribute.name {
"owned" => ModelType::Owned,
"shared" => ModelType::SharedUnique, // TODO: fix
"relation" => ModelType::Relation,
_ => unreachable!(),
}
}
pub fn module(model: &dml::Model, dml: &dml::Datamodel) -> TokenStream {
let model_name_snake = snake_ident(&model.name);
let set_params_enum = set_params_enum(model, dml);
let actions_struct = actions_struct(model, dml);
quote! {
pub mod #model_name_snake {
#set_params_enum
#actions_struct
}
}
}
pub fn set_params_enum(model: &dml::Model, dml: &dml::Datamodel) -> TokenStream {
quote! {
pub enum SetParam {}
}
}
pub fn create_fn(model: &dml::Model, dml: &dml::Datamodel) -> TokenStream {
let required_scalar_fields = model.required_scalar_fields();
let args = required_scalar_fields.iter().map(|field| {
let name = snake_ident(field.name());
let typ = field.type_tokens(quote!(crate::prisma::));
quote!(#name: #typ)
});
match model_sync_type(model, dml) {
ModelType::Owned => {
quote! {
pub fn create(&self, #(#args),*, _params: Vec<SetParam>) {
}
}
}
ModelType::SharedUnique => {
quote! {
pub fn create(&self, #(#args),*, _params: Vec<SetParam>) {}
}
}
ModelType::SharedAtomic => {
quote! {
pub fn create(&self, _params: Vec<SetParam>) {}
}
}
ModelType::Relation => {
quote! {
pub fn create(&self, _params: Vec<SetParam>) {}
}
}
_ => todo!(),
}
}
pub fn actions_struct(model: &dml::Model, dml: &dml::Datamodel) -> TokenStream {
let create_fn = create_fn(model, dml);
quote! {
pub struct Actions<'a> {
pub(super) client: &'a super::#CRDT_CLIENT
}
impl<'a> Actions<'a> {
#create_fn
}
}
}


@ -11,3 +11,5 @@ serde = "1.0.145"
serde_json = "1.0.85"
uhlc = "0.5.1"
uuid = { version = "1.1.2", features = ["serde", "v4"] }
prisma-client-rust = { workspace = true }
serde-value = "0.7.0"


@ -1,73 +0,0 @@
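Notes for reference: `update_with_timestamp` from the `uhlc` crate (Rust), followed by the equivalent `Timestamp.recv` logic from a JavaScript HLC implementation.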
```rust
pub fn update_with_timestamp(&self, timestamp: &Timestamp) -> Result<(), String> {
let mut now = (self.clock)();
now.0 &= LMASK;
let msg_time = timestamp.get_time();
if *msg_time > now && *msg_time - now > self.delta {
let err_msg = format!(
"incoming timestamp from {} exceeding delta {}ms is rejected: {} vs. now: {}",
timestamp.get_id(),
self.delta.to_duration().as_millis(),
msg_time,
now
);
warn!("{}", err_msg);
Err(err_msg)
} else {
let mut last_time = lock!(self.last_time);
let max_time = cmp::max(cmp::max(now, *msg_time), *last_time);
if max_time == now {
*last_time = now;
} else if max_time == *msg_time {
*last_time = *msg_time + 1;
} else {
*last_time += 1;
}
Ok(())
}
}
```
```javascript
Timestamp.recv = function (msg) {
if (!clock) {
return null;
}
var now = Date.now();
var msg_millis = msg.millis();
var msg_counter = msg.counter();
if (msg_millis - now > config.maxDrift) {
throw new Timestamp.ClockDriftError();
}
var last_millis = clock.timestamp.millis();
var last_counter = clock.timestamp.counter();
var max_millis = Math.max(Math.max(last_millis, now), msg_millis);
var new_counter =
max_millis === last_millis && max_millis === msg_millis
? Math.max(last_counter, msg_counter) + 1
: max_millis === last_millis
? last_counter + 1
: max_millis === msg_millis
? msg_counter + 1
: 0;
if (max_millis - now > config.maxDrift) {
throw new Timestamp.ClockDriftError();
}
if (new_counter > MAX_COUNTER) {
throw new Timestamp.OverflowError();
}
clock.timestamp.setMillis(max_millis);
clock.timestamp.setCounter(new_counter);
return new Timestamp(clock.timestamp.millis(), clock.timestamp.counter(), clock.timestamp.node());
};
```


@ -1,115 +0,0 @@
# Owned Records
The node which owns the record is the sole source of truth for that record's state.
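For illustration, an owned create might look like the following (a sketch following the `OwnedOperation` shape in `crates/sync`; the model, IDs, and field values are hypothetical):
```json
{
	"type": "CREATE",
	"model": "FilePath",
	"items": [
		{
			"id": { "location_id": "{uuid}", "id": 1 },
			"data": {
				"materialized_path": "photos/",
				"name": "photos",
				"is_dir": true
			}
		}
	],
	"node": "{uuid}",
	"timestamp": {
		"hybrid": "logical clock"
	}
}
```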
# Shared Records
This includes Shared Record 1-M relations, since the foreign key is stored on the many-side record.
## Create
```json
{
"type": "CREATE",
"recordId": "{uuid}",
"model": "{model}",
"data": {
"key": "value"
},
"node": "{uuid}",
"timestamp": {
"hybrid": "logical clock"
}
}
```
## Update
```json
{
"type": "UPDATE",
"recordId": "{uuid}",
"field": "{field}",
"value": "{value}",
"node": "{uuid}",
"timestamp": {
"hybrid": "logical clock"
}
}
```
## Delete
```json
{
"type": "DELETE",
"recordId": "{uuid}",
"node": "{uuid}",
"timestamp": {
"hybrid": "logical clock"
}
}
```
# Shared Record M-M Relations
x-M relations usually signify an item belonging to a group. 1-M relations are handled normally by Shared Records, since the ID of the record is just the ID of the M record. M-M relations require custom handling since they are identified by the two records they join, so a create operation must reference both records.
UNANSWERED: M-M relations that _can_ be duplicated. In this case, a single ID for the relation would suffice, in the same way that it does for 1-M relations.
NOTE: Ordering is very important when syncing relations. If a target of a relation doesn't exist, what should happen? This presents two options:
1. Don't use a foreign key, just join/fetch separately on a possibly non-existent foreign id. This is pretty cringe since Prisma only affords the niceties of relations if foreign keys are actually used.
2. Require that all operations are synced in order, independent of which node they were created on. This is nicer since it means that in order for a node to create a relation in the first place it must possess a message indicating creation of the relation target, but it is much more difficult to coordinate deletion of messages this way. Probably still doable though.
Option 2 is probably the best way to go, since having to do annoying joins and losing database ergonomics is not great. Additionally, option 2 would result in the ability to sync shared data between any two nodes, even if the node being synced from didn't create the operation in the first place.
## Create
```json
{
"type": "CREATE",
// Record that is being assigned to a group eg. a file
"relationItem": "{uuid}",
// Group that the record is being assigned to eg. a photo album
"relationGroup": "{uuid}",
// Name of the model which represents the relation
"relation": "model",
"node": "{uuid}",
"timestamp": {
"hybrid": "logical clock"
}
}
```
## Update
```json
{
"type": "UPDATE",
"relationItem": "{uuid}",
"relationGroup": "{uuid}",
"relation": "model",
"field": "field",
"value": "{value}",
"node": "{uuid}",
"timestamp": {
"hybrid": "logical clock"
}
}
```
## Delete
```json
{
"type": "DELETE",
"relationItem": "{uuid}",
"relationGroup": "{uuid}",
"relation": "relation",
"node": "{uuid}",
"timestamp": {
"hybrid": "logical clock"
}
}
```


@ -1,52 +0,0 @@
# Prisma Schema
`prisma-crdt` introduces new attributes that must be applied to fields and models via triple slash documentation comments.
_Sync ID_: As well as having a primary key - denoted in Prisma with the `@id` field attribute - `prisma-crdt` introduces another ID - the _Sync ID_.
A model's Sync ID defaults to its regular ID, and is what identifies a record inside a sync operation.
Regular IDs are often not suitable for use inside a sync operation, as they may not be unique when sent to other nodes - eg. autoincrementing IDs - so something more unique can be used, like a UUID.
## Model Attributes
#### `@local`
Model that is entirely disconnected from sync.
_Arguments_
- `id` (optional): Scalar field to override the default Sync ID.
#### `@owned`
Model that is synced via replicating from its owner to all other nodes, with the other nodes treating the model's owner as its single source of truth.
_Arguments_
- `owner`: Field that identifies the owner of this model. If a scalar, will directly use that value in sync operations. If a relation, the Sync ID of the related model will be resolved for sync operations.
- `id` (optional): Scalar field to override the default Sync ID.
#### `@shared`
Model that is synced via updates on a per-field basis.
_Arguments_
- `id` (optional): Scalar field to override the default Sync ID.
- `create` (optional): How the model should be created.
- `Unique` (default): Model can be created with many required arguments, but the ID provided _must_ be unique across all nodes. Useful for Tags since their IDs are non-deterministic.
- `Atomic`: Require the model to have no required arguments apart from ID and apply all create arguments as atomic updates. Necessary for models with the same ID that can be created on multiple nodes. Useful for Files since their ID is dependent on their content, and could be the same across nodes.
#### `@relation`
Similar to `@shared`, but identified by the two records that it relates. Sync ID is always the combination of `item` and `group`.
_Arguments_
- `item`: Field that identifies the item that the relation links to. Operates like the `owner` argument of `@owned`.
- `group`: Field that identifies the group that the item should be related to. Operates like the `owner` argument of `@owned`.
## Field Attributes
#### `@node`
A relation whose value is automatically set to the current node. This could be done manually, but `@node` allows `node_id` fields to not be stored in `OwnedOperationData`, but rather be resolved from the `node_id` field of a `CRDTOperation`, saving on bandwidth.
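As a sketch, a schema fragment combining these attributes might look like the following (model and field names are illustrative, not taken from the real schema):
```prisma
/// @owned(owner: node, id: pub_id)
model Location {
	id     Int   @id @default(autoincrement())
	pub_id Bytes @unique

	/// @node
	node    Node @relation(fields: [node_id], references: [id])
	node_id Int
}

/// @shared(id: cas_id, create: Atomic)
model Object {
	id     Int    @id @default(autoincrement())
	cas_id String @unique
}
```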


@ -1,6 +1,5 @@
[package]
name = "example-2"
default-run = "example-2"
name = "sd-sync-example"
version = "0.1.0"
edition = "2021"
rust-version = "1.64"
@ -12,9 +11,9 @@ serde = { version = "1.0.145", features = ["derive"] }
axum = "0.5.16"
rspc = { workspace = true, features = ["axum"] }
tokio = { version = "1.21.2", features = ["full"] }
# prisma-client-rust = { workspace = true }
prisma-client-rust = { workspace = true }
dotenv = "0.15.0"
tower-http = { version = "0.3.4", features = ["cors"] }
sd-sync = { path = "../.." }
sd-sync = { path = ".." }
uuid = { version = "1.1.2", features = ["v4"] }
http = "0.2.8"

File diff suppressed because one or more lines are too long


@ -1,12 +0,0 @@
[package]
name = "prisma-cli"
version = "0.1.0"
edition = "2021"
[dependencies]
prisma-client-rust-cli = { workspace = true, features = [
"migrations",
] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.13.0", features = ["full"] }


@ -8,11 +8,27 @@ datasource db {
generator client {
provider = "cargo prisma"
output = "../api/src/prisma.rs"
output = "../src/prisma.rs"
}
model User {
id Int @id @default(autoincrement())
name String
email String @unique
generator sync {
provider = "cargo run -p prisma-cli --bin sync --"
output = "../src/prisma_sync.rs"
}
/// @owned
model FilePath {
id Bytes @id
path String
object Object? @relation(fields: [object_id], references: [id])
object_id Bytes?
}
/// @shared
model Object {
id Bytes @id
name String
paths FilePath[] @relation()
}


@ -1,3 +0,0 @@
fn main() {
prisma_client_rust_cli::run();
}


@ -8,9 +8,11 @@ use std::path::PathBuf;
use tokio::sync::Mutex;
use uuid::Uuid;
#[derive(Default)]
use crate::prisma::{file_path, PrismaClient};
pub struct Ctx {
pub dbs: HashMap<Uuid, Db>,
pub prisma: PrismaClient,
}
type Router = rspc::Router<Arc<Mutex<Ctx>>>;
@ -25,8 +27,24 @@ fn to_map(v: &impl serde::Serialize) -> serde_json::Map<String, Value> {
pub(crate) fn new() -> RouterBuilder<Arc<Mutex<Ctx>>> {
Router::new()
.config(Config::new().export_ts_bindings(
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../web/src/utils/bindings.ts"),
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("web/src/utils/bindings.ts"),
))
.mutation("testCreate", |r| {
r(|ctx, _: String| async move {
let prisma = &ctx.lock().await.prisma;
let res = prisma
.file_path()
.create(vec![], String::new(), vec![])
.exec_raw()
.await
.unwrap();
file_path::Create::operation_from_data(&res);
Ok(())
})
})
.mutation("createDatabase", |r| {
r(|ctx, _: String| async move {
let dbs = &mut ctx.lock().await.dbs;
@ -99,7 +117,10 @@ pub(crate) fn new() -> RouterBuilder<Arc<Mutex<Ctx>>> {
model: "FilePath".to_string(),
items: vec![OwnedOperationItem {
id: serde_json::to_value(id).unwrap(),
data: OwnedOperationData::Create(to_map(&file_path)),
data: OwnedOperationData::Create(
serde_json::from_value(serde_json::to_value(&file_path).unwrap())
.unwrap(),
),
}],
}));


@ -1,3 +1,4 @@
use api::Ctx;
use axum::{
http::{HeaderValue, Method},
routing::get,
@ -7,13 +8,17 @@ use tokio::sync::Mutex;
use tower_http::cors::CorsLayer;
mod api;
// mod prisma;
mod prisma;
mod prisma_sync;
mod utils;
fn router() -> axum::Router {
async fn router() -> axum::Router {
let router = api::new().build().arced();
let ctx = Arc::new(Mutex::new(Default::default()));
let ctx = Arc::new(Mutex::new(Ctx {
dbs: Default::default(),
prisma: prisma::new_client().await.unwrap(),
}));
axum::Router::new()
.route("/", get(|| async { "Hello 'rspc'!" }))
@ -33,7 +38,7 @@ async fn main() {
let addr = "[::]:9000".parse::<SocketAddr>().unwrap(); // This listens on IPv6 and IPv4
println!("{} listening on http://{}", env!("CARGO_CRATE_NAME"), addr);
axum::Server::bind(&addr)
.serve(router().into_make_service())
.serve(router().await.into_make_service())
.with_graceful_shutdown(utils::axum_shutdown_signal())
.await
.expect("Error with HTTP server!");


@ -5,144 +5,154 @@ import { tests } from './test';
import { CRDTOperationType, rspc } from './utils/rspc';
export function App() {
const dbs = rspc.useQuery(['dbs', 'cringe']);
const dbs = rspc.useQuery(['dbs', 'cringe']);
const operations = rspc.useQuery(['operations', 'cringe']);
const operations = rspc.useQuery(['operations', 'cringe']);
const createDb = rspc.useMutation('createDatabase');
const removeDbs = rspc.useMutation('removeDatabases');
const createDb = rspc.useMutation('createDatabase');
const removeDbs = rspc.useMutation('removeDatabases');
const testCreate = rspc.useMutation('testCreate');
return (
<div className="w-screen h-screen flex flex-row divide-x divide-gray-300">
<div className="p-2 space-y-2 flex flex-col">
<div className="space-x-2">
<button className={ButtonStyles} onClick={() => createDb.mutate('pullOperations')}>
Add Database
</button>
<button className={ButtonStyles} onClick={() => removeDbs.mutate('pullOperations')}>
Remove Databases
</button>
</div>
<ul className="w-full">
{Object.entries(tests).map(([key, test]) => (
<li key={key}>
<button className="p-2 bg-green-300" onClick={() => test.run()}>
{test.name}
</button>
</li>
))}
</ul>
</div>
<div className="flex-1">
<ul className="p-2 gap-2 flex flex-row flex-wrap">
{dbs.data?.map((id) => (
<Suspense fallback={null} key={id}>
<DatabaseView id={id} />
</Suspense>
))}
</ul>
</div>
<div className="w-96 p-2 flex flex-col items-stretch">
<h1 className="text-center font-bold text-2xl">All Operations</h1>
<ul className="space-y-2">
{operations.data?.map((op) => (
<li key={op.id} className="bg-indigo-200 rounded-md p-2">
<p className="truncate">ID: {op.id}</p>
<p className="truncate">Timestamp: {op.timestamp.toString()}</p>
<p className="truncate">Node: {op.node}</p>
</li>
))}
</ul>
</div>
</div>
);
return (
<div className="w-screen h-screen flex flex-row divide-x divide-gray-300">
<div className="p-2 space-y-2 flex flex-col">
<div className="space-x-2">
<button className={ButtonStyles} onClick={() => createDb.mutate('pullOperations')}>
Add Database
</button>
<button className={ButtonStyles} onClick={() => removeDbs.mutate('pullOperations')}>
Remove Databases
</button>
<button className={ButtonStyles} onClick={() => testCreate.mutate('testCreate')}>
Test Create
</button>
</div>
<ul className="w-full">
{Object.entries(tests).map(([key, test]) => (
<li key={key}>
<button className="p-2 bg-green-300" onClick={() => test.run()}>
{test.name}
</button>
</li>
))}
</ul>
</div>
<div className="flex-1">
<ul className="p-2 gap-2 flex flex-row flex-wrap">
{dbs.data?.map((id) => (
<Suspense fallback={null} key={id}>
<DatabaseView id={id} />
</Suspense>
))}
</ul>
</div>
<div className="w-96 p-2 flex flex-col items-stretch">
<h1 className="text-center font-bold text-2xl">All Operations</h1>
<ul className="space-y-2">
{operations.data?.map((op) => (
<li key={op.id} className="bg-indigo-200 rounded-md p-2">
<p className="truncate">ID: {op.id}</p>
<p className="truncate">Timestamp: {op.timestamp.toString()}</p>
<p className="truncate">Node: {op.node}</p>
</li>
))}
</ul>
</div>
</div>
);
}
interface DatabaseViewProps {
id: string;
id: string;
}
const TABS = ['File Paths', 'Objects', 'Tags', 'Operations'];
function DatabaseView(props: DatabaseViewProps) {
const [currentTab, setCurrentTab] = useState<typeof TABS[number]>('Operations');
const [currentTab, setCurrentTab] = useState<typeof TABS[number]>('Operations');
const pullOperations = rspc.useMutation('pullOperations');
const pullOperations = rspc.useMutation('pullOperations');
return (
<div className="bg-indigo-300 rounded-md min-w-[32rem] flex-1 overflow-hidden">
<div className="flex flex-row justify-between items-center mx-2">
<h1 className="p-2 text-xl font-medium">{props.id}</h1>
<button className={ButtonStyles} onClick={() => pullOperations.mutate(props.id)}>
Pull Operations
</button>
</div>
<div>
<nav className="space-x-2">
{TABS.map((tab) => (
<button
key={tab}
className={clsx('px-2 py-1', tab === currentTab && 'bg-indigo-400')}
onClick={() => setCurrentTab(tab)}
>
{tab}
</button>
))}
</nav>
<Suspense>
{currentTab === 'File Paths' && <FilePathList db={props.id} />}
{currentTab === 'Operations' && <OperationList db={props.id} />}
</Suspense>
</div>
</div>
);
return (
<div className="bg-indigo-300 rounded-md min-w-[32rem] flex-1 overflow-hidden">
<div className="flex flex-row justify-between items-center mx-2">
<h1 className="p-2 text-xl font-medium">{props.id}</h1>
<button className={ButtonStyles} onClick={() => pullOperations.mutate(props.id)}>
Pull Operations
</button>
</div>
<div>
<nav className="space-x-2">
{TABS.map((tab) => (
<button
key={tab}
className={clsx('px-2 py-1', tab === currentTab && 'bg-indigo-400')}
onClick={() => setCurrentTab(tab)}
>
{tab}
</button>
))}
</nav>
<Suspense>
{currentTab === 'File Paths' && <FilePathList db={props.id} />}
{currentTab === 'Operations' && <OperationList db={props.id} />}
</Suspense>
</div>
</div>
);
}
function FilePathList(props: { db: string }) {
const createFilePath = rspc.useMutation('file_path.create');
const filePaths = rspc.useQuery(['file_path.list', props.db]);
const createFilePath = rspc.useMutation('file_path.create');
const filePaths = rspc.useQuery(['file_path.list', props.db]);
return (
<div >
{filePaths.data && (
<ul className="font-mono">
{filePaths.data.sort((a, b) => a.id.localeCompare(b.id)).map((path) => (
<li key={path.id}>{JSON.stringify(path)}</li>
))}
</ul>
)}
<button className="text-center" onClick={() => createFilePath.mutate(props.db)}>
Create
</button>
</div>
);
return (
<div>
{filePaths.data && (
<ul className="font-mono">
{filePaths.data
.sort((a, b) => a.id.localeCompare(b.id))
.map((path) => (
<li key={path.id}>{JSON.stringify(path)}</li>
))}
</ul>
)}
<button className="text-center" onClick={() => createFilePath.mutate(props.db)}>
Create
</button>
</div>
);
}
function messageType(msg: CRDTOperationType) {
if ('items' in msg) {
return 'Owned';
} else if ('record_id' in msg) {
return 'Shared';
}
if ('items' in msg) {
return 'Owned';
} else if ('record_id' in msg) {
return 'Shared';
}
}
function OperationList(props: { db: string }) {
const messages = rspc.useQuery(['message.list', props.db]);
const messages = rspc.useQuery(['message.list', props.db]);
return (
<div>
{messages.data && (
<table className="font-mono border-spacing-x-4 border-separate">
{messages.data.sort((a, b) => Number(a.timestamp - b.timestamp)).map((message) => (
<tr key={message.id}>
<td className="border border-transparent">{message.id}</td>
<td className="border border-transparent">{new Date(Number(message.timestamp) / 10000000).toLocaleTimeString()}</td>
<td className="border border-transparent">{messageType(message.typ)}</td>
</tr>
))}
</table>
)}
</div>
);
return (
<div>
{messages.data && (
<table className="font-mono border-spacing-x-4 border-separate">
{messages.data
.sort((a, b) => Number(a.timestamp - b.timestamp))
.map((message) => (
<tr key={message.id}>
<td className="border border-transparent">{message.id}</td>
<td className="border border-transparent">
{new Date(Number(message.timestamp) / 10000000).toLocaleTimeString()}
</td>
<td className="border border-transparent">{messageType(message.typ)}</td>
</tr>
))}
</table>
)}
</div>
);
}
const ButtonStyles = 'bg-blue-500 text-white px-2 py-1 rounded-md';


@ -11,7 +11,8 @@ export type Procedures = {
{ key: "createDatabase", input: string, result: string } |
{ key: "file_path.create", input: string, result: FilePath } |
{ key: "pullOperations", input: string, result: null } |
{ key: "removeDatabases", input: string, result: null },
{ key: "removeDatabases", input: string, result: null } |
{ key: "testCreate", input: string, result: null },
subscriptions: never
};


@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{collections::BTreeMap, fmt::Debug};
use rspc::Type;
use serde::{Deserialize, Serialize};
@ -23,11 +23,14 @@ pub struct RelationOperation {
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
pub enum SharedOperationCreateData {
#[serde(rename = "u")]
Unique(Map<String, Value>),
#[serde(rename = "a")]
Atomic,
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
#[serde(untagged)]
pub enum SharedOperationData {
Create(SharedOperationCreateData),
Update { field: String, value: Value },
@ -36,15 +39,19 @@ pub enum SharedOperationData {
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
pub struct SharedOperation {
pub record_id: Uuid,
pub record_id: Value,
pub model: String,
pub data: SharedOperationData,
}
#[derive(Serialize, Deserialize, Clone, Debug, Type)]
pub enum OwnedOperationData {
Create(Map<String, Value>),
Update(Map<String, Value>),
Create(BTreeMap<String, Value>),
CreateMany {
values: Vec<(Value, BTreeMap<String, Value>)>,
skip_duplicates: bool,
},
Update(BTreeMap<String, Value>),
Delete,
}


@ -192,8 +192,11 @@ impl Db {
match item.data {
OwnedOperationData::Create(data) => {
self.file_paths
.insert(id, from_value(Value::Object(data)).unwrap());
self.file_paths.insert(
id,
from_value(Value::Object(data.into_iter().collect()))
.unwrap(),
);
}
OwnedOperationData::Update(data) => {
let obj = self.file_paths.get_mut(&id).unwrap();
@ -252,11 +255,13 @@ impl Db {
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use super::*;
fn to_map(v: &impl serde::Serialize) -> serde_json::Map<String, Value> {
fn to_map(v: &impl serde::Serialize) -> BTreeMap<String, Value> {
match to_value(&v).unwrap() {
Value::Object(m) => m,
Value::Object(m) => m.into_iter().collect(),
_ => unreachable!(),
}
}


@ -1,5 +1,58 @@
mod crdt;
mod db;
// mod db;
pub use crdt::*;
pub use db::*;
// pub use db::*;
use prisma_client_rust::ModelActions;
use serde_value::Value;
use std::collections::BTreeMap;
pub trait CreateCRDTMutation<T: ModelActions> {
fn operation_from_data(
d: &BTreeMap<String, Value>,
typ: CreateOperationType,
) -> CRDTOperationType;
}
pub enum CreateOperationType {
Owned,
SharedUnique,
SharedAtomic,
Relation,
}
impl<T: ModelActions> CreateCRDTMutation<T> for prisma_client_rust::Create<'_, T> {
fn operation_from_data(
_: &BTreeMap<String, Value>,
typ: CreateOperationType,
) -> CRDTOperationType {
match typ {
CreateOperationType::Owned => {
todo!()
// let id = serde_json::to_value(
// d.iter()
// .filter(|(field, _)| T::id_fields().iter().any(|f| f == field))
// .collect::<BTreeMap<_, _>>(),
// )
// .unwrap();
// CRDTOperationType::Owned(OwnedOperation {
// model: T::MODEL.to_string(),
// items: [OwnedOperationItem {
// id,
// data: OwnedOperationData::Create(
// d.clone()
// .into_iter()
// .filter(|(field, _)| T::id_fields().iter().all(|f| f != field))
// .map(|(k, v)| (k, serde_json::to_value(v).unwrap()))
// .collect(),
// ),
// }]
// .to_vec(),
// })
}
_ => todo!(),
}
}
}


@ -93,7 +93,7 @@ export type ExplorerContext = { type: "Location" } & Location | { type: "Tag" }
export interface ExplorerData { context: ExplorerContext, items: Array<ExplorerItem> }
export type ExplorerItem = { type: "Path" } & { id: number, is_dir: boolean, location_id: number, materialized_path: string, name: string, extension: string | null, object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, object: Object | null } | { type: "Object" } & { id: number, cas_id: string, integrity_checksum: string | null, name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string, file_paths: Array<FilePath> }
export type ExplorerItem = { type: "Path" } & FilePathWithObject | { type: "Object" } & ObjectWithFilePaths
export interface FileDecryptorJobInit { location_id: number, object_id: number, output_path: string | null, password: string | null, save_to_library: boolean | null }
@ -188,3 +188,7 @@ export interface TagCreateArgs { name: string, color: string }
export interface TagUpdateArgs { id: number, name: string | null, color: string | null }
export interface Volume { name: string, mount_point: string, total_capacity: bigint, available_capacity: bigint, is_removable: boolean, disk_type: string | null, file_system: string | null, is_root_filesystem: boolean }
export interface FilePathWithObject { id: number, is_dir: boolean, location_id: number, materialized_path: string, name: string, extension: string | null, object_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, object: Object | null }
export interface ObjectWithFilePaths { id: number, cas_id: string, integrity_checksum: string | null, name: string | null, extension: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string, file_paths: Array<FilePath> }