[ENG-906] Initial library sync (#1095)

* ffs

* typo

* yeet library data over p2p

* fix a bunch of edge cases

* report complete status on responder

* better log

* fix types

* mobile debug screen

* mobile + P2P is a mess

* feature flag mobile p2p pairing

* wrong one

---------

Co-authored-by: Brendan Allan <brendonovich@outlook.com>
This commit is contained in:
Oscar Beaumont 2023-07-17 19:53:25 +08:00 committed by GitHub
parent 20eae57e75
commit 53ab3178c2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 788 additions and 94 deletions

View file

@ -32,6 +32,7 @@
"@tanstack/react-query": "^4.29.1",
"class-variance-authority": "^0.5.3",
"dayjs": "^1.11.8",
"event-target-polyfill": "^0.0.3",
"expo": "~48.0.19",
"expo-linking": "~4.0.1",
"expo-media-library": "~15.2.3",

View file

@ -19,6 +19,8 @@ import { useSnapshot } from 'valtio';
import {
ClientContextProvider,
LibraryContextProvider,
NotificationContextProvider,
P2PContextProvider,
RspcProvider,
initPlausible,
useClientContext,
@ -30,6 +32,7 @@ import { useTheme } from './hooks/useTheme';
import { changeTwTheme, tw } from './lib/tailwind';
import RootNavigator from './navigation';
import OnboardingNavigator from './navigation/OnboardingNavigator';
import { P2P } from './screens/p2p';
import { currentLibraryStore } from './utils/nav';
dayjs.extend(advancedFormat);
@ -111,7 +114,12 @@ function AppContainer() {
<BottomSheetModalProvider>
<StatusBar style="light" />
<ClientContextProvider currentLibraryId={id}>
<AppNavigation />
<P2PContextProvider>
<P2P />
<NotificationContextProvider>
<AppNavigation />
</NotificationContextProvider>
</P2PContextProvider>
</ClientContextProvider>
</BottomSheetModalProvider>
</MenuProvider>

View file

@ -1,4 +1,5 @@
import AsyncStorage from '@react-native-async-storage/async-storage';
import 'event-target-polyfill';
import * as SplashScreen from 'expo-splash-screen';
import { Suspense, lazy } from 'react';
import { Platform } from 'react-native';
@ -7,6 +8,17 @@ import { reactNativeLink } from './lib/rspcReactNativeTransport';
// Enable the splash screen
SplashScreen.preventAutoHideAsync();
// The worlds worse pollyfill for "CustomEvent". I tried "custom-event-pollyfill" from npm but it uses `document` :(
if (typeof globalThis.CustomEvent !== 'function') {
// @ts-expect-error
globalThis.CustomEvent = (event, params) => {
const evt = new Event(event, params);
// @ts-expect-error
evt.detail = params.detail;
return evt;
};
}
const _localStorage = new Map<string, string>();
// We patch stuff onto `globalThis` so that `@sd/client` can use it. This is super hacky but as far as I can tell, there's no better way to do this.

View file

@ -7,6 +7,7 @@ import GeneralSettingsScreen from '~/screens/settings/client/GeneralSettings';
import LibrarySettingsScreen from '~/screens/settings/client/LibrarySettings';
import PrivacySettingsScreen from '~/screens/settings/client/PrivacySettings';
import AboutScreen from '~/screens/settings/info/About';
import DebugScreen from '~/screens/settings/info/Debug';
import SupportScreen from '~/screens/settings/info/Support';
import EditLocationSettingsScreen from '~/screens/settings/library/EditLocationSettings';
// import KeysSettingsScreen from '~/screens/settings/library/KeysSettings';
@ -104,6 +105,11 @@ export default function SettingsNavigator() {
component={SupportScreen}
options={{ headerTitle: 'Support' }}
/>
<SettingsStack.Screen
name="Debug"
component={DebugScreen}
options={{ headerTitle: 'Debug' }}
/>
</SettingsStack.Navigator>
);
}
@ -130,6 +136,7 @@ export type SettingsStackParamList = {
// Info
About: undefined;
Support: undefined;
Debug: undefined;
};
export type SettingsStackScreenProps<Screen extends keyof SettingsStackParamList> =

View file

@ -0,0 +1,26 @@
import { useBridgeMutation, useFeatureFlag, useLibraryContext, useP2PEvents } from '@sd/client';
export function P2P() {
// const pairingResponse = useBridgeMutation('p2p.pairingResponse');
// const activeLibrary = useLibraryContext();
const pairingEnabled = useFeatureFlag('p2pPairing');
useP2PEvents((data) => {
if (data.type === 'PairingRequest' && pairingEnabled) {
console.log('Pairing incoming from', data.name);
// TODO: open pairing screen and guide user through the process. For now we auto-accept
// pairingResponse.mutate([
// data.id,
// { decision: 'accept', libraryId: activeLibrary.library.uuid }
// ]);
}
// TODO: For now until UI is implemented
if (data.type === 'PairingProgress') {
console.log('Pairing progress', data);
}
});
return null;
}

View file

@ -1,6 +1,7 @@
import {
Books,
FlyingSaucer,
Gear,
GearSix,
HardDrive,
Heart,
@ -12,7 +13,8 @@ import {
TagSimple
} from 'phosphor-react-native';
import React from 'react';
import { SectionList, Text, View } from 'react-native';
import { SectionList, Text, TouchableWithoutFeedback, View } from 'react-native';
import { DebugState, useDebugState, useDebugStateEnabler } from '@sd/client';
import { SettingsItem, SettingsItemDivider } from '~/components/settings/SettingsItem';
import { tw, twStyle } from '~/lib/tailwind';
import { SettingsStackParamList, SettingsStackScreenProps } from '~/navigation/SettingsNavigator';
@ -26,7 +28,7 @@ type SectionType = {
}[];
};
const sections: SectionType[] = [
const sections: (debugState: DebugState) => SectionType[] = (debugState) => [
{
title: 'Client',
data: [
@ -99,7 +101,16 @@ const sections: SectionType[] = [
icon: Heart,
navigateTo: 'Support',
title: 'Support'
}
},
...(debugState.enabled
? ([
{
icon: Gear,
navigateTo: 'Debug',
title: 'Debug'
}
] as const)
: [])
]
}
];
@ -118,10 +129,12 @@ function renderSectionHeader({ section }: { section: { title: string } }) {
}
export default function SettingsScreen({ navigation }: SettingsStackScreenProps<'Home'>) {
const debugState = useDebugState();
return (
<View style={tw`flex-1`}>
<SectionList
sections={sections}
sections={sections(debugState)}
contentContainerStyle={tw`py-4`}
ItemSeparatorComponent={SettingsItemDivider}
renderItem={({ item }) => (
@ -132,13 +145,7 @@ export default function SettingsScreen({ navigation }: SettingsStackScreenProps<
/>
)}
renderSectionHeader={renderSectionHeader}
ListFooterComponent={
<View style={tw`mb-4 mt-6 items-center`}>
<Text style={tw`text-base font-bold text-ink`}>Spacedrive</Text>
{/* TODO: Get this automatically (expo-device have this?) */}
<Text style={tw`mt-0.5 text-xs font-medium text-ink-faint`}>v0.1.0</Text>
</View>
}
ListFooterComponent={<FooterComponent />}
showsVerticalScrollIndicator={false}
stickySectionHeadersEnabled={false}
initialNumToRender={50}
@ -146,3 +153,17 @@ export default function SettingsScreen({ navigation }: SettingsStackScreenProps<
</View>
);
}
function FooterComponent() {
const onClick = useDebugStateEnabler();
return (
<View style={tw`mb-4 mt-6 items-center`}>
<TouchableWithoutFeedback onPress={onClick}>
<Text style={tw`text-base font-bold text-ink`}>Spacedrive</Text>
</TouchableWithoutFeedback>
{/* TODO: Get this automatically (expo-device have this?) */}
<Text style={tw`mt-0.5 text-xs font-medium text-ink-faint`}>v0.1.0</Text>
</View>
);
}

View file

@ -0,0 +1,35 @@
import React from 'react';
import { Text, View } from 'react-native';
import { getDebugState, toggleFeatureFlag, useDebugState, useFeatureFlags } from '@sd/client';
import { Button } from '~/components/primitive/Button';
import { tw } from '~/lib/tailwind';
import { SettingsStackScreenProps } from '~/navigation/SettingsNavigator';
const DebugScreen = ({ navigation }: SettingsStackScreenProps<'Debug'>) => {
const debugState = useDebugState();
const featureFlags = useFeatureFlags();
return (
<View>
<Text style={tw`text-ink`}>Debug</Text>
<Button onPress={() => toggleFeatureFlag(['p2pPairing', 'spacedrop'])}>
<Text style={tw`text-ink`}>Toggle P2P</Text>
</Button>
<Button onPress={() => (getDebugState().rspcLogger = !getDebugState().rspcLogger)}>
<Text style={tw`text-ink`}>Toggle rspc logger</Text>
</Button>
<Text style={tw`text-ink`}>{JSON.stringify(featureFlags)}</Text>
<Text style={tw`text-ink`}>{JSON.stringify(debugState)}</Text>
<Button
onPress={() => {
navigation.popToTop();
navigation.replace('Home');
getDebugState().enabled = false;
}}
>
<Text style={tw`text-ink`}>Disable Debug Mode</Text>
</Button>
</View>
);
};
export default DebugScreen;

View file

@ -1,12 +1,46 @@
import React from 'react';
import { Text, View } from 'react-native';
import { isEnabled, useBridgeMutation, useDiscoveredPeers } from '@sd/client';
import { Button } from '~/components/primitive/Button';
import { tw } from '~/lib/tailwind';
import { SettingsStackScreenProps } from '~/navigation/SettingsNavigator';
const NodesSettingsScreen = ({ navigation }: SettingsStackScreenProps<'NodesSettings'>) => {
const onlineNodes = useDiscoveredPeers();
const p2pPair = useBridgeMutation('p2p.pair', {
onSuccess(data) {
console.log(data);
}
});
return (
<View>
<Text style={tw`text-ink`}>TODO</Text>
<Text style={tw`text-ink`}>Pairing</Text>
{[...onlineNodes.entries()].map(([id, node]) => (
<View key={id} style={tw`flex`}>
<Text style={tw`text-ink`}>{node.name}</Text>
<Button
onPress={() => {
if (!isEnabled('p2pPairing')) {
alert('P2P Pairing is not enabled!');
}
// TODO: This is not great
p2pPair.mutateAsync(id).then((id) => {
// TODO: Show UI lmao
// startPairing(id, {
// name: node.name,
// os: node.operating_system
// });
});
}}
>
<Text>Pair</Text>
</Button>
</View>
))}
</View>
);
};

View file

@ -35,7 +35,7 @@ pub struct LibraryConfig {
#[async_trait::async_trait]
impl Migrate for LibraryConfig {
const CURRENT_VERSION: u32 = 7;
const CURRENT_VERSION: u32 = 8;
type Ctx = (NodeConfig, Arc<PrismaClient>);
@ -200,7 +200,7 @@ impl Migrate for LibraryConfig {
if instances.len() > 1 {
return Err(MigratorError::Custom(
"7 - More than one node found in the DB... This can't be automatically reconciled!"
"7 - More than one instance found in the DB... This can't be automatically reconciled!"
.into(),
));
}
@ -210,6 +210,9 @@ impl Migrate for LibraryConfig {
));
};
config.remove("instance_id");
config.insert("instance_id".into(), Value::Number(instance.id.into()));
// We are relinking all locations to the current instance.
// If you have more than one node in your database and your not @Oscar, something went horribly wrong so this is fine.
db.location()
@ -217,6 +220,18 @@ impl Migrate for LibraryConfig {
.exec()
.await?;
}
8 => {
let instances = db.instance().find_many(vec![]).exec().await?;
let Some(instance) = instances.first() else {
return Err(MigratorError::Custom(
"8 - No nodes found... How did you even get this far?!".into(),
));
};
// This should be in 7 but it's added to ensure to hell it runs.
config.remove("instance_id");
config.insert("instance_id".into(), Value::Number(instance.id.into()));
}
v => unreachable!("Missing migration for library version {}", v),
}

View file

@ -169,6 +169,7 @@ impl LibraryManager {
node_context.clone(),
&subscribers,
None,
true,
)
.await?,
);
@ -202,7 +203,7 @@ impl LibraryManager {
description: Option<String>,
node_cfg: NodeConfig,
) -> Result<LibraryConfigWrapped, LibraryManagerError> {
self.create_with_uuid(Uuid::new_v4(), name, description, node_cfg)
self.create_with_uuid(Uuid::new_v4(), name, description, node_cfg, true)
.await
}
@ -212,6 +213,7 @@ impl LibraryManager {
name: LibraryName,
description: Option<String>,
node_cfg: NodeConfig,
should_seed: bool,
) -> Result<LibraryConfigWrapped, LibraryManagerError> {
if name.as_ref().is_empty() || name.as_ref().chars().all(|x| x.is_whitespace()) {
return Err(LibraryManagerError::InvalidConfig(
@ -251,16 +253,17 @@ impl LibraryManager {
date_created: now,
_params: vec![instance::id::set(config.instance_id)],
}),
should_seed,
)
.await?;
debug!("Loaded library '{id:?}'");
// Run seeders
tag::seed::new_library(&library).await?;
indexer::rules::seed::new_or_existing_library(&library).await?;
debug!("Seeded library '{id:?}'");
if should_seed {
tag::seed::new_library(&library).await?;
indexer::rules::seed::new_or_existing_library(&library).await?;
debug!("Seeded library '{id:?}'");
}
invalidate_query!(library, "library.list");
@ -398,6 +401,7 @@ impl LibraryManager {
node_context: NodeContext,
subscribers: &RwLock<Vec<Box<dyn SubscriberFn>>>,
create: Option<instance::Create>,
should_seed: bool,
) -> Result<Library, LibraryManagerError> {
let db_path = db_path.as_ref();
let db_url = format!(
@ -476,7 +480,10 @@ impl LibraryManager {
identity,
};
indexer::rules::seed::new_or_existing_library(&library).await?;
if should_seed {
library.orphan_remover.invoke().await;
indexer::rules::seed::new_or_existing_library(&library).await?;
}
for location in library
.db

View file

@ -14,50 +14,44 @@ impl OrphanRemoverActor {
pub fn spawn(db: Arc<PrismaClient>) -> Self {
let (tx, mut rx) = channel(4);
tokio::spawn({
let tx = tx.clone();
async move {
tx.send(()).await.ok();
tokio::spawn(async move {
while let Some(()) = rx.recv().await {
// prevents timeouts
tokio::time::sleep(Duration::from_millis(10)).await;
while let Some(()) = rx.recv().await {
// prevents timeouts
tokio::time::sleep(Duration::from_millis(10)).await;
loop {
let objs = match db
.object()
.find_many(vec![object::file_paths::none(vec![])])
.take(512)
.select(object::select!({ id pub_id }))
.exec()
.await
{
Ok(objs) => objs,
Err(e) => {
error!("Failed to fetch orphaned objects: {e}");
break;
}
};
if objs.is_empty() {
loop {
let objs = match db
.object()
.find_many(vec![object::file_paths::none(vec![])])
.take(512)
.select(object::select!({ id pub_id }))
.exec()
.await
{
Ok(objs) => objs,
Err(e) => {
error!("Failed to fetch orphaned objects: {e}");
break;
}
};
debug!("Removing {} orphaned objects", objs.len());
if objs.is_empty() {
break;
}
let ids: Vec<_> = objs.iter().map(|o| o.id).collect();
debug!("Removing {} orphaned objects", objs.len());
if let Err(e) = db
._batch((
db.tag_on_object().delete_many(vec![
tag_on_object::object_id::in_vec(ids.clone()),
]),
db.object().delete_many(vec![object::id::in_vec(ids)]),
))
.await
{
error!("Failed to remove orphaned objects: {e}");
}
let ids: Vec<_> = objs.iter().map(|o| o.id).collect();
if let Err(e) = db
._batch((
db.tag_on_object()
.delete_many(vec![tag_on_object::object_id::in_vec(ids.clone())]),
db.object().delete_many(vec![object::id::in_vec(ids)]),
))
.await
{
error!("Failed to remove orphaned objects: {e}");
}
}
}

View file

@ -0,0 +1,368 @@
use sd_prisma::prisma::*;
// TODO: Turn this entire file into a Prisma generator cause it could be way more maintainable
// Pairing will fail if the two clients aren't on versions with identical DB models so it's safe to send them and ignore migrations.
const ITEMS_PER_BATCH: i64 = 1000;
macro_rules! impl_for_models {
($($variant:ident($model:ident)),* $(,)+) => {
/// Represents any DB model to be ingested into the database as part of the initial sync
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum ModelData {
$(
$variant(Vec<$model::Data>),
)*
}
impl ModelData {
/// Length of data
pub fn len(&self) -> usize {
match self {
$(
Self::$variant(data) => data.len(),
)*
}
}
/// Get count of all of the rows in the database
pub async fn total_count(db: &PrismaClient) -> Result<i64, prisma_client_rust::QueryError> {
let mut total_count = 0;
let ($( $model ),*) = tokio::join!(
$(
db.$model().count(vec![]).exec(),
)*
);
$(total_count += $model?;)*
Ok(total_count)
}
/// Insert the data into the database
pub async fn insert(self, db: &PrismaClient) -> Result<(), prisma_client_rust::QueryError> {
match self {
$(
Self::$variant(data) => {
// TODO: Prisma Client Rust is broke
// db.$model().create_many(data.into_iter().map(|v| FromData(v).into()).collect()).exec().await?;
for i in data {
$model::CreateUnchecked::from(FromData(i)).to_query(db).exec().await?;
}
}
)*
}
Ok(())
}
}
/// This exists to determine the next model to sync.
/// It emulates `.window()` functionality but for a `macro_rules`
// TODO: When replacing with a generator this can be removed and done at compile time
#[derive(Debug)]
enum ModelSyncCursorIterator {
Done = 0,
$(
$variant,
)*
}
impl<'a> From<&'a ModelSyncCursor> for ModelSyncCursorIterator {
fn from(cursor: &'a ModelSyncCursor) -> Self {
match cursor {
$(
ModelSyncCursor::$variant(_) => Self::$variant,
)*
ModelSyncCursor::Done => Self::Done,
}
}
}
impl ModelSyncCursorIterator {
pub fn next(self) -> ModelSyncCursor {
let i = self as i32;
match i + 1 {
$(
v if v == Self::$variant as i32 => ModelSyncCursor::$variant(0),
)*
_ => ModelSyncCursor::Done,
}
}
}
/// Represent where we ar eup to with the sync
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum ModelSyncCursor {
$(
$variant(i64),
)*
Done,
}
impl ModelSyncCursor {
pub fn new() -> Self {
new_impl!($( $variant ),*)
}
pub async fn next(&mut self, db: &PrismaClient) -> Option<Result<ModelData, prisma_client_rust::QueryError>> {
match self {
$(
Self::$variant(cursor) => {
match db.$model()
.find_many(vec![])
.skip(*cursor)
.take(ITEMS_PER_BATCH + 1)
.exec()
.await {
Ok(data) => {
if data.len() <= ITEMS_PER_BATCH as usize {
*self = ModelSyncCursorIterator::from(&*self).next();
} else {
*self = Self::$variant(*cursor + ITEMS_PER_BATCH);
}
Some(Ok(ModelData::$variant(data)))
},
Err(e) => return Some(Err(e)),
}
},
)*
Self::Done => None
}
}
}
};
}
macro_rules! new_impl {
($x:ident, $($y:ident),+) => {
Self::$x(0)
};
}
impl PartialEq for ModelData {
// Crude EQ impl based only on ID's not struct content.
// It's super annoying PCR does have this impl but it kinda makes sense with relation fetching.
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(ModelData::SharedOperation(a), ModelData::SharedOperation(b)) => a
.iter()
.map(|x| x.id.clone())
.eq(b.iter().map(|x| x.id.clone())),
(ModelData::Volume(a), ModelData::Volume(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::Location(a), ModelData::Location(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::FilePath(a), ModelData::FilePath(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::Object(a), ModelData::Object(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::Tag(a), ModelData::Tag(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::TagOnObject(a), ModelData::TagOnObject(b)) => a
.iter()
.map(|x| (x.tag_id, x.object_id))
.eq(b.iter().map(|x| (x.tag_id, x.object_id))),
(ModelData::IndexerRule(a), ModelData::IndexerRule(b)) => {
a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id))
}
(ModelData::IndexerRulesInLocation(a), ModelData::IndexerRulesInLocation(b)) => a
.iter()
.map(|x| (x.location_id, x.indexer_rule_id))
.eq(b.iter().map(|x| (x.location_id, x.indexer_rule_id))),
(ModelData::Preference(a), ModelData::Preference(b)) => a
.iter()
.map(|x| (x.key.clone(), x.value.clone()))
.eq(b.iter().map(|x| (x.key.clone(), x.value.clone()))),
_ => false,
}
}
}
/// Meaningless wrapper to avoid Rust's orphan rule
struct FromData<T>(T);
impl From<FromData<shared_operation::Data>> for shared_operation::CreateUnchecked {
fn from(FromData(data): FromData<shared_operation::Data>) -> Self {
Self {
id: data.id,
timestamp: data.timestamp,
model: data.model,
record_id: data.record_id,
kind: data.kind,
data: data.data,
instance_id: data.instance_id,
_params: vec![],
}
}
}
impl From<FromData<volume::Data>> for volume::CreateUnchecked {
fn from(FromData(data): FromData<volume::Data>) -> Self {
Self {
name: data.name,
mount_point: data.mount_point,
_params: vec![
volume::id::set(data.id),
volume::total_bytes_capacity::set(data.total_bytes_capacity),
volume::total_bytes_available::set(data.total_bytes_available),
volume::disk_type::set(data.disk_type),
volume::filesystem::set(data.filesystem),
volume::is_system::set(data.is_system),
volume::date_modified::set(data.date_modified),
],
}
}
}
impl From<FromData<location::Data>> for location::CreateUnchecked {
fn from(FromData(data): FromData<location::Data>) -> Self {
Self {
pub_id: data.pub_id,
_params: vec![
location::id::set(data.id),
location::name::set(data.name),
location::path::set(data.path),
location::total_capacity::set(data.total_capacity),
location::available_capacity::set(data.available_capacity),
location::is_archived::set(data.is_archived),
location::generate_preview_media::set(data.generate_preview_media),
location::sync_preview_media::set(data.sync_preview_media),
location::hidden::set(data.hidden),
location::date_created::set(data.date_created),
location::instance_id::set(data.instance_id),
],
}
}
}
impl From<FromData<file_path::Data>> for file_path::CreateUnchecked {
fn from(FromData(data): FromData<file_path::Data>) -> Self {
Self {
pub_id: data.pub_id,
_params: vec![
file_path::id::set(data.id),
file_path::is_dir::set(data.is_dir),
file_path::cas_id::set(data.cas_id),
file_path::integrity_checksum::set(data.integrity_checksum),
file_path::location_id::set(data.location_id),
file_path::materialized_path::set(data.materialized_path),
file_path::name::set(data.name),
file_path::extension::set(data.extension),
file_path::size_in_bytes::set(data.size_in_bytes),
file_path::size_in_bytes_bytes::set(data.size_in_bytes_bytes),
file_path::inode::set(data.inode),
file_path::device::set(data.device),
file_path::object_id::set(data.object_id),
file_path::key_id::set(data.key_id),
file_path::date_created::set(data.date_created),
file_path::date_modified::set(data.date_modified),
file_path::date_indexed::set(data.date_indexed),
],
}
}
}
impl From<FromData<object::Data>> for object::CreateUnchecked {
fn from(FromData(data): FromData<object::Data>) -> Self {
Self {
pub_id: data.pub_id,
_params: vec![
object::id::set(data.id),
object::kind::set(data.kind),
object::key_id::set(data.key_id),
object::hidden::set(data.hidden),
object::favorite::set(data.favorite),
object::important::set(data.important),
object::note::set(data.note),
object::date_created::set(data.date_created),
object::date_accessed::set(data.date_accessed),
],
}
}
}
impl From<FromData<tag::Data>> for tag::CreateUnchecked {
fn from(FromData(data): FromData<tag::Data>) -> Self {
Self {
pub_id: data.pub_id,
_params: vec![
tag::id::set(data.id),
tag::name::set(data.name),
tag::color::set(data.color),
tag::redundancy_goal::set(data.redundancy_goal),
tag::date_created::set(data.date_created),
tag::date_modified::set(data.date_modified),
],
}
}
}
impl From<FromData<tag_on_object::Data>> for tag_on_object::CreateUnchecked {
fn from(FromData(data): FromData<tag_on_object::Data>) -> Self {
Self {
tag_id: data.tag_id,
object_id: data.object_id,
_params: vec![],
}
}
}
impl From<FromData<indexer_rule::Data>> for indexer_rule::CreateUnchecked {
fn from(FromData(data): FromData<indexer_rule::Data>) -> Self {
Self {
pub_id: data.pub_id,
_params: vec![
indexer_rule::id::set(data.id),
indexer_rule::name::set(data.name),
indexer_rule::default::set(data.default),
indexer_rule::rules_per_kind::set(data.rules_per_kind),
indexer_rule::date_created::set(data.date_created),
indexer_rule::date_modified::set(data.date_modified),
],
}
}
}
impl From<FromData<indexer_rules_in_location::Data>>
for indexer_rules_in_location::CreateUnchecked
{
fn from(FromData(data): FromData<indexer_rules_in_location::Data>) -> Self {
Self {
location_id: data.location_id,
indexer_rule_id: data.indexer_rule_id,
_params: vec![],
}
}
}
impl From<FromData<preference::Data>> for preference::CreateUnchecked {
fn from(FromData(data): FromData<preference::Data>) -> Self {
Self {
key: data.key,
_params: vec![preference::value::set(data.value)],
}
}
}
// Ensure you order the models to Foreign Keys are created before the models that reference them.
impl_for_models! {
Object(object),
SharedOperation(shared_operation),
Volume(volume),
Location(location),
FilePath(file_path),
Tag(tag),
TagOnObject(tag_on_object),
IndexerRule(indexer_rule),
IndexerRulesInLocation(indexer_rules_in_location),
Preference(preference),
}

View file

@ -16,11 +16,13 @@ use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
sync::broadcast,
};
use tracing::info;
use tracing::{debug, info};
use uuid::Uuid;
mod initial_sync;
mod proto;
pub use initial_sync::*;
use proto::*;
use crate::{
@ -117,6 +119,21 @@ impl PairingManager {
// TODO: Future - Library in pairing state
// TODO: Create library
if self
.library_manager
.get_all_libraries()
.await
.into_iter()
.find(|i| i.id == library_id)
.is_some()
{
self.emit_progress(pairing_id, PairingStatus::LibraryAlreadyExists);
// TODO: Properly handle this at a protocol level so the error is on both sides
return;
}
let library_config = self
.library_manager
.create_with_uuid(
@ -124,6 +141,7 @@ impl PairingManager {
LibraryName::new(library_name).unwrap(),
library_description,
node_config,
false, // We will sync everything which will conflict with the seeded stuff
)
.await
.unwrap();
@ -144,12 +162,36 @@ impl PairingManager {
// 3.
// TODO: Either rollback or update library out of pairing state
// TODO: Fake initial sync
// TODO: This should timeout if taking too long so it can't be used as a DOS style thing???
let mut total = 0;
let mut synced = 0;
loop {
match SyncData::from_stream(&mut stream).await.unwrap() {
SyncData::Data { total_models, data } => {
if let Some(total_models) = total_models {
total = total_models;
}
synced += data.len();
data.insert(&library.db).await.unwrap();
// Prevent divide by zero
if total != 0 {
self.emit_progress(
pairing_id,
PairingStatus::InitialSyncProgress(
((synced as f32 / total as f32) * 100.0) as u8,
),
);
}
}
SyncData::Finished => break,
}
}
// TODO: Done message to frontend
self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id));
tokio::time::sleep(std::time::Duration::from_secs(30)).await; // TODO
stream.flush().await.unwrap();
}
PairingResponse::Rejected => {
info!("Pairing '{pairing_id}' rejected by remote");
@ -232,12 +274,41 @@ impl PairingManager {
// TODO: Pairing confirmation + rollback
let total = ModelData::total_count(&library.db).await.unwrap();
let mut synced = 0;
info!("Starting sync of {} rows", total);
let mut cursor = ModelSyncCursor::new();
while let Some(data) = cursor.next(&library.db).await {
let data = data.unwrap();
let total_models = match synced {
0 => Some(total),
_ => None,
};
synced += data.len();
self.emit_progress(
pairing_id,
PairingStatus::InitialSyncProgress((synced as f32 / total as f32 * 100.0) as u8), // SAFETY: It's a percentage
);
debug!(
"Initial library sync cursor={:?} items={}",
cursor,
data.len()
);
stream
.write_all(&SyncData::Data { total_models, data }.to_bytes().unwrap())
.await
.unwrap();
}
stream
.write_all(&SyncData::Finished.to_bytes().unwrap())
.await
.unwrap();
self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id));
stream.flush().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(30)).await; // TODO
// };
// inner().await.unwrap();
}
}
@ -253,6 +324,7 @@ pub enum PairingDecision {
pub enum PairingStatus {
EstablishingConnection,
PairingRequested,
LibraryAlreadyExists,
PairingDecisionRequest,
PairingInProgress {
library_name: String,

View file

@ -5,12 +5,14 @@ use sd_p2p::{
proto::{decode, encode},
spacetunnel::Identity,
};
use sd_prisma::prisma::instance;
use sd_prisma::prisma::*;
use tokio::io::{AsyncRead, AsyncReadExt};
use uuid::Uuid;
use crate::node::Platform;
use super::ModelData;
/// Terminology:
/// Instance - DB model which represents a single `.db` file.
/// Originator - begins the pairing process and is asking to join a library that will be selected by the responder.
@ -76,6 +78,19 @@ pub enum PairingConfirmation {
Error,
}
/// 4. Sync the data in the database with the originator.
/// Sent `Responder` -> `Originator`.
#[derive(Debug, PartialEq)]
pub enum SyncData {
Data {
/// Only included in first request and is an **estimate** of how many models will be sent.
/// It will likely be wrong so should be constrained to being used for UI purposes only.
total_models: Option<i64>,
data: ModelData,
},
Finished,
}
impl Instance {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
@ -211,9 +226,7 @@ impl PairingConfirmation {
match stream.read_u8().await.unwrap() {
0 => Ok(Self::Ok),
1 => Ok(Self::Error),
_ => {
todo!();
}
_ => todo!(), // TODO: Error handling
}
}
@ -225,6 +238,52 @@ impl PairingConfirmation {
}
}
impl SyncData {
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> Result<Self, (&'static str, decode::Error)> {
let discriminator = stream
.read_u8()
.await
.map_err(|e| ("discriminator", e.into()))?;
match discriminator {
0 => Ok(Self::Data {
total_models: match stream
.read_i64_le()
.await
.map_err(|e| ("total_models", e.into()))?
{
0 => None,
n => Some(n),
},
data: rmp_serde::from_slice(
&decode::buf(stream).await.map_err(|e| ("data", e.into()))?,
)
.unwrap(), // TODO: Error handling
}),
1 => Ok(Self::Finished),
_ => todo!(), // TODO: Error handling
}
}
pub fn to_bytes(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
let mut buf = Vec::new();
match self {
Self::Data { total_models, data } => {
buf.push(0);
buf.extend((total_models.unwrap_or(0) as i64).to_le_bytes());
encode::buf(&mut buf, &rmp_serde::to_vec_named(data)?);
}
Self::Finished => {
buf.push(1);
}
}
Ok(buf)
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -298,5 +357,24 @@ mod tests {
let result = PairingConfirmation::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = SyncData::Data {
total_models: Some(123),
data: ModelData::Location(vec![]),
};
let mut cursor = std::io::Cursor::new(original.to_bytes().unwrap());
let result = SyncData::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
{
let original = SyncData::Finished;
let mut cursor = std::io::Cursor::new(original.to_bytes().unwrap());
let result = SyncData::from_stream(&mut cursor).await.unwrap();
assert_eq!(original, result);
}
}
}

View file

@ -114,7 +114,7 @@ impl InitConfig {
Some(lib) => lib,
None => {
let library = library_manager
.create_with_uuid(lib.id, lib.name, lib.description, node_cfg.clone())
.create_with_uuid(lib.id, lib.name, lib.description, node_cfg.clone(), true)
.await?;
match library_manager.get_library(library.uuid).await {

View file

@ -1,4 +1,4 @@
import { useBridgeMutation, useDiscoveredPeers, useFeatureFlag } from '@sd/client';
import { isEnabled, useBridgeMutation, useDiscoveredPeers, useFeatureFlag } from '@sd/client';
import { Button } from '@sd/ui';
import { startPairing } from '~/app/p2p/pairing';
import { Heading } from '../Layout';

View file

@ -1,8 +1,7 @@
import { AppLogo } from '@sd/assets/images';
import { Discord, Github } from '@sd/assets/svgs/brands';
import { Globe } from 'phosphor-react';
import { useEffect, useState } from 'react';
import { getDebugState, useBridgeQuery } from '@sd/client';
import { useBridgeQuery, useDebugStateEnabler } from '@sd/client';
import { Button, Divider } from '@sd/ui';
import { useOperatingSystem } from '~/hooks/useOperatingSystem';
import { usePlatform } from '~/util/Platform';
@ -13,18 +12,7 @@ export const Component = () => {
const os = useOperatingSystem();
const currentPlatformNiceName =
os === 'browser' ? 'Web' : os == 'macOS' ? os : os.charAt(0).toUpperCase() + os.slice(1);
const [clicked, setClicked] = useState(0);
useEffect(() => {
if (clicked >= 5) {
getDebugState().enabled = true;
}
const timeout = setTimeout(() => setClicked(0), 1000);
return () => clearTimeout(timeout);
}, [clicked]);
const onClick = useDebugStateEnabler();
return (
<div className="h-auto">
@ -33,7 +21,7 @@ export const Component = () => {
src={AppLogo}
className="mr-8 h-[88px] w-[88px]"
draggable="false"
onClick={() => setClicked((clicked) => clicked + 1)}
onClick={onClick}
/>
<div className="flex flex-col">
<h1 className="text-2xl font-bold">

View file

@ -1,4 +1,4 @@
import { useOnFeatureFlagsChange, useP2PEvents, withFeatureFlag } from '@sd/client';
import { useFeatureFlag, useP2PEvents, withFeatureFlag } from '@sd/client';
import { SpacedropUI } from './Spacedrop';
import { startPairing } from './pairing';
@ -6,8 +6,9 @@ export const SpacedropUI2 = withFeatureFlag('spacedrop', SpacedropUI);
// Entrypoint of P2P UI
export function P2P() {
const pairingEnabled = useFeatureFlag('p2pPairing');
useP2PEvents((data) => {
if (data.type === 'PairingRequest') {
if (data.type === 'PairingRequest' && pairingEnabled) {
startPairing(data.id, {
name: data.name,
os: data.os

View file

@ -60,6 +60,9 @@ function OriginatorDialog({
.with({ type: 'PairingRequested' }, () => (
<PairingLoading msg="Requesting to pair..." />
))
.with({ type: 'LibraryAlreadyExists' }, () => (
<PairingLoading msg={`Pairing failed due to library already existing!`} />
))
.with({ type: 'PairingDecisionRequest' }, () => (
<PairingResponder pairingId={pairingId} />
))

View file

@ -257,7 +257,7 @@ export type P2PEvent = { type: "DiscoveredPeer"; peer_id: PeerId; metadata: Peer
export type PairingDecision = { decision: "accept"; libraryId: string } | { decision: "reject" }
export type PairingStatus = { type: "EstablishingConnection" } | { type: "PairingRequested" } | { type: "PairingDecisionRequest" } | { type: "PairingInProgress"; data: { library_name: string; library_description: string | null } } | { type: "InitialSyncProgress"; data: number } | { type: "PairingComplete"; data: string } | { type: "PairingRejected" }
export type PairingStatus = { type: "EstablishingConnection" } | { type: "PairingRequested" } | { type: "LibraryAlreadyExists" } | { type: "PairingDecisionRequest" } | { type: "PairingInProgress"; data: { library_name: string; library_description: string | null } } | { type: "InitialSyncProgress"; data: number } | { type: "PairingComplete"; data: string } | { type: "PairingRejected" }
export type PeerId = string

View file

@ -1,3 +1,4 @@
import { useEffect, useState } from 'react';
import { useSnapshot } from 'valtio';
import { valtioPersist } from '../lib/valito';
@ -24,3 +25,19 @@ export function useDebugState() {
export function getDebugState() {
return debugState;
}
export function useDebugStateEnabler(): () => void {
const [clicked, setClicked] = useState(0);
useEffect(() => {
if (clicked >= 5) {
getDebugState().enabled = true;
}
const timeout = setTimeout(() => setClicked(0), 1000);
return () => clearTimeout(timeout);
}, [clicked]);
return () => setClicked((c) => c + 1);
}

View file

@ -25,7 +25,7 @@ export function P2PContextProvider({ children }: PropsWithChildren) {
useBridgeSubscription(['p2p.events'], {
onData(data) {
events.current.dispatchEvent(new CustomEvent<P2PEvent>('p2p-event', { detail: data }));
events.current.dispatchEvent(new CustomEvent('p2p-event', { detail: data }));
if (data.type === 'DiscoveredPeer') {
setDiscoveredPeer([discoveredPeers.set(data.peer_id, data.metadata)]);

View file

@ -328,6 +328,9 @@ importers:
dayjs:
specifier: ^1.11.8
version: 1.11.8
event-target-polyfill:
specifier: ^0.0.3
version: 0.0.3
expo:
specifier: ~48.0.19
version: 48.0.19(@babel/core@7.22.1)
@ -14239,6 +14242,10 @@ packages:
es5-ext: 0.10.62
dev: true
/event-target-polyfill@0.0.3:
resolution: {integrity: sha512-ZMc6UuvmbinrCk4RzGyVmRyIsAyxMRlp4CqSrcQRO8Dy0A9ldbiRy5kdtBj4OtP7EClGdqGfIqo9JmOClMsGLQ==}
dev: false
/event-target-shim@5.0.1:
resolution: {integrity: sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==}
engines: {node: '>=6'}