Sync settings page (#2460)

sync settings page
This commit is contained in:
Brendan Allan 2024-05-07 15:39:22 +08:00 committed by GitHub
parent 44478207e7
commit 408499229b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 310 additions and 250 deletions

1
Cargo.lock generated
View file

@ -8365,6 +8365,7 @@ dependencies = [
"prisma-client-rust",
"rmp-serde",
"rmpv",
"sd-actors",
"sd-prisma",
"sd-sync",
"sd-utils",

View file

@ -11,6 +11,7 @@ default = []
sd-prisma = { path = "../../../crates/prisma" }
sd-sync = { path = "../../../crates/sync" }
sd-utils = { path = "../../../crates/utils" }
sd-actors = { path = "../../../crates/actors" }
prisma-client-rust = { workspace = true }
serde = { workspace = true }

View file

@ -1,4 +1,6 @@
use tokio::sync::mpsc;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
pub trait ActorTypes {
type Event;
@ -7,10 +9,19 @@ pub trait ActorTypes {
}
pub struct ActorIO<T: ActorTypes> {
pub event_rx: mpsc::Receiver<T::Event>,
pub event_rx: Arc<Mutex<mpsc::Receiver<T::Event>>>,
pub req_tx: mpsc::Sender<T::Request>,
}
impl<T: ActorTypes> Clone for ActorIO<T> {
fn clone(&self) -> Self {
Self {
event_rx: self.event_rx.clone(),
req_tx: self.req_tx.clone(),
}
}
}
impl<T: ActorTypes> ActorIO<T> {
pub async fn send(&self, value: T::Request) -> Result<(), mpsc::error::SendError<T::Request>> {
self.req_tx.send(value).await
@ -26,6 +37,8 @@ pub fn create_actor_io<T: ActorTypes>() -> (ActorIO<T>, HandlerIO<T>) {
let (req_tx, req_rx) = mpsc::channel(20);
let (event_tx, event_rx) = mpsc::channel(20);
let event_rx = Arc::new(Mutex::new(event_rx));
(ActorIO { event_rx, req_tx }, HandlerIO { event_tx, req_rx })
}

View file

@ -71,7 +71,7 @@ impl Actor {
self.shared.active.store(false, Ordering::Relaxed);
self.shared.active_notify.notify_waiters();
wait!(self.io.event_rx, Event::Notification);
wait!(self.io.event_rx.lock().await, Event::Notification);
self.shared.active.store(true, Ordering::Relaxed);
self.shared.active_notify.notify_waiters();
@ -94,10 +94,12 @@ impl Actor {
.await
.ok();
let mut event_rx = self.io.event_rx.lock().await;
loop {
tokio::select! {
biased;
res = self.io.event_rx.recv() => {
res = event_rx.recv() => {
if let Some(Event::Messages(event)) = res { break State::Ingesting(event) }
}
res = &mut rx => {
@ -144,23 +146,33 @@ impl Actor {
})
}
pub fn spawn(shared: Arc<SharedState>) -> Handler {
pub async fn declare(shared: Arc<SharedState>) -> Handler {
let (actor_io, handler_io) = create_actor_io::<Self>();
tokio::spawn(async move {
let mut this = Self {
state: Some(Default::default()),
io: actor_io,
shared,
};
shared
.actors
.declare(
"Sync Ingester",
{
let shared = shared.clone();
move || async move {
let mut this = Self {
state: Some(Default::default()),
io: actor_io,
shared,
};
loop {
this = match this.tick().await {
Some(this) => this,
None => break,
};
}
});
loop {
this = match this.tick().await {
Some(this) => this,
None => break,
};
}
}
},
true,
)
.await;
Handler {
event_tx: handler_io.event_tx,
@ -459,9 +471,10 @@ mod test {
emit_messages_flag: Arc::new(AtomicBool::new(true)),
active: Default::default(),
active_notify: Default::default(),
actors: Default::default(),
});
(Actor::spawn(shared.clone()), shared)
(Actor::declare(shared.clone()).await, shared)
}
/// If messages tx is dropped, actor should reset and assume no further messages

View file

@ -34,6 +34,7 @@ pub struct SharedState {
pub clock: uhlc::HLC,
pub active: AtomicBool,
pub active_notify: tokio::sync::Notify,
pub actors: Arc<sd_actors::Actors>,
}
#[must_use]

View file

@ -1,4 +1,6 @@
use crate::{crdt_op_db, db_operation::*, ingest, SharedState, SyncMessage, NTP64};
use crate::{
actor::ActorTypes, crdt_op_db, db_operation::*, ingest, SharedState, SyncMessage, NTP64,
};
use sd_prisma::prisma::{cloud_crdt_operation, crdt_operation, instance, PrismaClient, SortOrder};
use sd_sync::{CRDTOperation, OperationFactory};
@ -46,11 +48,12 @@ pub struct New {
impl Manager {
#[allow(clippy::new_ret_no_self)]
pub fn new(
pub async fn new(
db: &Arc<PrismaClient>,
instance: Uuid,
emit_messages_flag: &Arc<AtomicBool>,
timestamps: HashMap<Uuid, NTP64>,
actors: &Arc<sd_actors::Actors>,
) -> New {
let (tx, rx) = broadcast::channel(64);
@ -64,9 +67,10 @@ impl Manager {
emit_messages_flag: emit_messages_flag.clone(),
active: Default::default(),
active_notify: Default::default(),
actors: actors.clone(),
});
let ingest = ingest::Actor::spawn(shared.clone());
let ingest = ingest::Actor::declare(shared.clone()).await;
New {
manager: Self {

View file

@ -463,34 +463,41 @@ impl Libraries {
// let key_manager = Arc::new(KeyManager::new(vec![]).await?);
// seed_keymanager(&db, &key_manager).await?;
let sync = sync::Manager::new(&db, instance_id, &config.generate_sync_operations, {
db._batch(
instances
.iter()
.map(|i| {
db.crdt_operation()
.find_first(vec![crdt_operation::instance::is(vec![
instance::id::equals(i.id),
])])
.order_by(crdt_operation::timestamp::order(SortOrder::Desc))
})
.collect::<Vec<_>>(),
)
.await?
.into_iter()
.zip(&instances)
.map(|(op, i)| {
(
from_bytes_to_uuid(&i.pub_id),
sd_sync::NTP64(op.map(|o| o.timestamp).unwrap_or_default() as u64),
)
})
.collect()
});
let sync_manager = Arc::new(sync.manager);
let actors = Default::default();
let sync = sync::Manager::new(
&db,
instance_id,
&config.generate_sync_operations,
{
db._batch(
instances
.iter()
.map(|i| {
db.crdt_operation()
.find_first(vec![crdt_operation::instance::is(vec![
instance::id::equals(i.id),
])])
.order_by(crdt_operation::timestamp::order(SortOrder::Desc))
})
.collect::<Vec<_>>(),
)
.await?
.into_iter()
.zip(&instances)
.map(|(op, i)| {
(
from_bytes_to_uuid(&i.pub_id),
sd_sync::NTP64(op.map(|o| o.timestamp).unwrap_or_default() as u64),
)
})
.collect()
},
&actors,
)
.await;
let sync_manager = Arc::new(sync.manager);
let cloud = crate::cloud::start(node, &actors, id, instance_id, &sync_manager, &db).await;
let (tx, mut rx) = broadcast::channel(10);

View file

@ -86,10 +86,7 @@ export default () => {
onClick={() => {
// if debug telemetry sharing is about to be disabled, but telemetry logging is enabled
// then disable it
if (
!debugState.shareFullTelemetry === false &&
debugState.telemetryLogging
)
if (!debugState.shareFullTelemetry === false && debugState.telemetryLogging)
debugState.telemetryLogging = false;
debugState.shareFullTelemetry = !debugState.shareFullTelemetry;
}}
@ -105,10 +102,7 @@ export default () => {
onClick={() => {
// if telemetry logging is about to be enabled, but debug telemetry sharing is disabled
// then enable it
if (
!debugState.telemetryLogging &&
debugState.shareFullTelemetry === false
)
if (!debugState.telemetryLogging && debugState.shareFullTelemetry === false)
debugState.shareFullTelemetry = true;
debugState.telemetryLogging = !debugState.telemetryLogging;
}}
@ -125,8 +119,7 @@ export default () => {
size="sm"
variant="gray"
onClick={() => {
if (nodeState?.data?.data_path)
platform.openPath!(nodeState?.data?.data_path);
if (nodeState?.data?.data_path) platform.openPath!(nodeState?.data?.data_path);
}}
>
Open
@ -229,7 +222,7 @@ function FeatureFlagSelector() {
<span className="truncate">Feature Flags</span>
</Dropdown.Button>
}
className="mt-1 shadow-none data-[side=bottom]:slide-in-from-top-2 dark:divide-menu-selected/30 dark:border-sidebar-line dark:bg-sidebar-box"
className="z-[999] mt-1 shadow-none data-[side=bottom]:slide-in-from-top-2 dark:divide-menu-selected/30 dark:border-sidebar-line dark:bg-sidebar-box"
alignToTrigger
>
{[...features, ...backendFeatures].map((feat) => (
@ -239,11 +232,7 @@ function FeatureFlagSelector() {
iconProps={{ weight: 'bold', size: 16 }}
onClick={() => toggleFeatureFlag(feat)}
className="font-medium text-white"
icon={
featureFlags.find((f) => feat === f) !== undefined
? CheckSquare
: undefined
}
icon={featureFlags.find((f) => feat === f) !== undefined ? CheckSquare : undefined}
/>
))}
</DropdownMenu.Root>
@ -281,9 +270,7 @@ function CloudOriginSelect() {
}
value={origin.data}
>
<SelectOption value="https://app.spacedrive.com">
https://app.spacedrive.com
</SelectOption>
<SelectOption value="https://app.spacedrive.com">https://app.spacedrive.com</SelectOption>
<SelectOption value="http://localhost:3000">http://localhost:3000</SelectOption>
</Select>
)}
@ -295,10 +282,7 @@ function ExplorerBehaviorSelect() {
const { explorerOperatingSystem } = useExplorerOperatingSystem();
return (
<Select
value={explorerOperatingSystem}
onChange={(v) => (explorerOperatingSystemStore.os = v)}
>
<Select value={explorerOperatingSystem} onChange={(v) => (explorerOperatingSystemStore.os = v)}>
<SelectOption value="macOS">macOS</SelectOption>
<SelectOption value="windows">windows</SelectOption>
</Select>

View file

@ -25,10 +25,6 @@ export default function DebugSection() {
<Icon component={Database} />
Cache
</SidebarLink>
<SidebarLink to="debug/actors">
<Icon component={Factory} />
Actors
</SidebarLink>
<SidebarLink to="debug/p2p/overview">
<Icon component={ShareNetwork} />
P2P

View file

@ -2,7 +2,6 @@ import { RouteObject } from 'react-router';
export const debugRoutes = [
{ path: 'cloud', lazy: () => import('./cloud') },
{ path: 'sync', lazy: () => import('./sync') },
{ path: 'actors', lazy: () => import('./actors') },
{
path: 'p2p',

View file

@ -1,163 +0,0 @@
import { useEffect, useMemo } from 'react';
import {
CRDTOperation,
CRDTOperationData,
useLibraryMutation,
useLibraryQuery,
useLibrarySubscription,
useZodForm
} from '@sd/client';
import { Button, Dialog, dialogManager, useDialog, UseDialogProps, z } from '@sd/ui';
import { useRouteTitle } from '~/hooks/useRouteTitle';
type MessageGroup = {
model: number;
id: string;
messages: { data: CRDTOperationData; timestamp: number }[];
};
export const Component = () => {
useRouteTitle('Sync');
const syncEnabled = useLibraryQuery(['sync.enabled']);
const messages = useLibraryQuery(['sync.messages']);
const backfillSync = useLibraryMutation(['sync.backfill'], {
onSuccess: async () => {
await syncEnabled.refetch();
await messages.refetch();
}
});
useLibrarySubscription(['sync.newMessage'], {
onData: () => messages.refetch()
});
const groups = useMemo(
() => (messages.data && calculateGroups(messages.data)) || [],
[messages]
);
return (
<ul className="space-y-4 p-4">
{!syncEnabled.data && (
<Button
variant="accent"
onClick={() => {
dialogManager.create((dialogProps) => (
<SyncBackfillDialog {...dialogProps} />
));
}}
disabled={backfillSync.isLoading}
>
Enable sync messages
</Button>
)}
{groups?.map((group, index) => <OperationGroup key={index} group={group} />)}
</ul>
);
};
const OperationGroup = ({ group }: { group: MessageGroup }) => {
const [header, contents] = (() => {
const header = (
<div className="flex items-center space-x-2 p-2">
<span>{group.model}</span>
<span className="">{group.id}</span>
</div>
);
const contents = (
<ul className="flex flex-col space-y-2 p-2">
{group.messages.map((message, index) => (
<li key={index} className="flex flex-row justify-between px-2">
{typeof message.data === 'string' ? (
<p>Delete</p>
) : 'u' in message.data ? (
<p>Update - {message.data.u.field}</p>
) : (
<div>
<p>Create</p>
<ul>
{Object.entries(message.data.c).map(([key, value]) => (
<li className="pl-2" key={key}>
{key}: {JSON.stringify(value)}
</li>
))}
</ul>
</div>
)}
<p className="text-gray-400">{message.timestamp}</p>
</li>
))}
</ul>
);
return [header, contents];
})();
return (
<div className="divide-y divide-gray bg-app-darkBox">
{header}
{contents}
</div>
);
};
function calculateGroups(messages: CRDTOperation[]) {
return messages.reduce<MessageGroup[]>((acc, op) => {
const { data } = op;
const id = JSON.stringify(op.record_id);
const latest = (() => {
const latest = acc[acc.length - 1];
if (!latest || latest.model !== op.model || latest.id !== id) {
const group: MessageGroup = {
model: op.model,
id,
messages: []
};
acc.push(group);
return group;
} else return latest;
})();
latest.messages.push({
data,
timestamp: op.timestamp
});
return acc;
}, []);
}
function SyncBackfillDialog(props: UseDialogProps) {
const form = useZodForm({ schema: z.object({}) });
const dialog = useDialog(props);
const enableSync = useLibraryMutation(['sync.backfill'], {});
// dialog is in charge of enabling sync
useEffect(() => {
form.handleSubmit(
async () => {
await enableSync.mutateAsync(null).then(() => (dialog.state.open = false));
},
() => {}
)();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
return (
<Dialog
title="Backfilling Sync Operations"
description="Library is paused until backfill completes"
form={form}
dialog={dialog}
hideButtons
ignoreClickOutside
/>
);
}

View file

@ -1,11 +1,11 @@
import { Info } from '@phosphor-icons/react';
import clsx from 'clsx';
import { PropsWithChildren } from 'react';
import { PropsWithChildren, ReactNode } from 'react';
import { ErrorMessage, Tooltip } from '@sd/ui';
import { usePlatform } from '~/util/Platform';
interface Props {
title: string;
title: ReactNode;
registerName?: string;
description?: string | JSX.Element;
mini?: boolean;
@ -29,12 +29,7 @@ export default ({ mini, registerName, ...props }: PropsWithChildren<Props>) => {
<h3 className="text-sm font-medium text-ink">{props.title}</h3>
{props.toolTipLabel && (
<Tooltip label={props.toolTipLabel as string}>
<Info
onClick={() =>
props.infoUrl && platform.openLink(props.infoUrl)
}
size={15}
/>
<Info onClick={() => props.infoUrl && platform.openLink(props.infoUrl)} size={15} />
</Tooltip>
)}
</div>
@ -43,9 +38,7 @@ export default ({ mini, registerName, ...props }: PropsWithChildren<Props>) => {
</div>
{mini && props.children}
</div>
{registerName ? (
<ErrorMessage name={registerName} className="mt-1 w-full text-xs" />
) : null}
{registerName ? <ErrorMessage name={registerName} className="mt-1 w-full text-xs" /> : null}
</>
);
};

View file

@ -1,4 +1,5 @@
import {
ArrowsClockwise,
Books,
Cloud,
Database,
@ -116,6 +117,12 @@ export default () => {
<Icon component={MagnifyingGlass} />
Saved Searches
</SidebarLink> */}
{useFeatureFlag('cloudSync') && (
<SidebarLink to="library/sync">
<Icon component={ArrowsClockwise} />
{t('sync')}
</SidebarLink>
)}
<SidebarLink disabled to="library/clouds">
<Icon component={Cloud} />
{t('clouds')}

View file

@ -1,12 +1,216 @@
import { inferSubscriptionResult } from '@oscartbeaumont-sd/rspc-client';
import clsx from 'clsx';
import { useEffect, useState } from 'react';
import {
Procedures,
useLibraryMutation,
useLibraryQuery,
useLibrarySubscription,
useZodForm
} from '@sd/client';
import { Button, Dialog, dialogManager, useDialog, UseDialogProps, z } from '@sd/ui';
import { useLocale } from '~/hooks';
import { Heading } from '../Layout';
import Setting from '../Setting';
const ACTORS = {
Ingest: 'Sync Ingest',
CloudSend: 'Cloud Sync Sender',
CloudReceive: 'Cloud Sync Receiver',
CloudIngest: 'Cloud Sync Ingest'
};
export const Component = () => {
const { t } = useLocale();
const syncEnabled = useLibraryQuery(['sync.enabled']);
const backfillSync = useLibraryMutation(['sync.backfill'], {
onSuccess: async () => {
await syncEnabled.refetch();
}
});
const [data, setData] = useState<inferSubscriptionResult<Procedures, 'library.actors'>>({});
useLibrarySubscription(['library.actors'], { onData: setData });
return (
<>
<Heading title={t('sync')} description={t('sync_description')} />
{syncEnabled.data === false ? (
<Setting
mini
title="Enable Sync"
description="Generate sync operations for all the existing data in this library, and configure Spacedrive to generate sync operations when things happen in future."
>
<div>
<Button
className="text-nowrap"
variant="accent"
onClick={() => {
dialogManager.create((dialogProps) => <SyncBackfillDialog {...dialogProps} />);
}}
disabled={backfillSync.isLoading}
>
Enable sync
</Button>
</div>
</Setting>
) : (
<>
<Setting
mini
title={
<>
Ingester
<OnlineIndicator online={data[ACTORS.Ingest] ?? false} />
</>
}
description="This process takes sync operations from P2P connections and Spacedrive Cloud and applies them to the library."
>
<div>
{data[ACTORS.Ingest] ? (
<StopButton name={ACTORS.Ingest} />
) : (
<StartButton name={ACTORS.Ingest} />
)}
</div>
</Setting>
<CloudSync data={data} />
</>
)}
</>
);
};
function SyncBackfillDialog(props: UseDialogProps) {
const form = useZodForm({ schema: z.object({}) });
const dialog = useDialog(props);
const enableSync = useLibraryMutation(['sync.backfill'], {});
// dialog is in charge of enabling sync
useEffect(() => {
form.handleSubmit(
async () => {
await enableSync.mutateAsync(null).then(() => (dialog.state.open = false));
},
() => {}
)();
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);
return (
<Dialog
title="Backfilling Sync Operations"
description="Library is paused until backfill completes"
form={form}
dialog={dialog}
hideButtons
ignoreClickOutside
/>
);
}
function CloudSync({ data }: { data: inferSubscriptionResult<Procedures, 'library.actors'> }) {
return (
<>
<div>
<h1 className="mb-0.5 text-lg font-bold text-ink">Cloud Sync</h1>
<p className="text-sm text-ink-faint">
Manage the processes that sync your library with Spacedrive Cloud
</p>
</div>
<Setting
mini
title={
<>
Sender <OnlineIndicator online={data[ACTORS.CloudSend] ?? false} />
</>
}
description="This process sends sync operations to Spacedrive Cloud."
>
<div>
{data[ACTORS.CloudSend] ? (
<StopButton name={ACTORS.CloudSend} />
) : (
<StartButton name={ACTORS.CloudSend} />
)}
</div>
</Setting>
<Setting
mini
title={
<>
Receiver
<OnlineIndicator online={data[ACTORS.CloudReceive] ?? false} />
</>
}
description="This process receives and stores operations from Spacedrive Cloud."
>
<div>
{data[ACTORS.CloudReceive] ? (
<StopButton name={ACTORS.CloudReceive} />
) : (
<StartButton name={ACTORS.CloudReceive} />
)}
</div>
</Setting>
<Setting
mini
title={
<>
Ingester
<OnlineIndicator online={data[ACTORS.CloudIngest] ?? false} />
</>
}
description="This process takes received cloud operations and sends them to the main sync ingester."
>
<div>
{data[ACTORS.CloudIngest] ? (
<StopButton name={ACTORS.CloudIngest} />
) : (
<StartButton name={ACTORS.CloudIngest} />
)}
</div>
</Setting>
</>
);
}
function StartButton({ name }: { name: string }) {
const startActor = useLibraryMutation(['library.startActor']);
return (
<Button
variant="accent"
disabled={startActor.isLoading}
onClick={() => startActor.mutate(name)}
>
{startActor.isLoading ? 'Starting...' : 'Start'}
</Button>
);
}
function StopButton({ name }: { name: string }) {
const stopActor = useLibraryMutation(['library.stopActor']);
return (
<Button variant="accent" disabled={stopActor.isLoading} onClick={() => stopActor.mutate(name)}>
{stopActor.isLoading ? 'Stopping...' : 'Stop'}
</Button>
);
}
function OnlineIndicator({ online }: { online: boolean }) {
return (
<div
className={clsx(
'ml-1.5 inline-block size-2.5 rounded-full',
online ? 'bg-green-500' : 'bg-red-500'
)}
/>
);
}