server transport via websocket

Co-authored-by: Oscar Beaumont <oscar@otbeaumont.me>
Co-authored-by: Brendan Allan <brendonovich@outlook.com>
This commit is contained in:
Jamie Pine 2022-04-23 06:50:17 -07:00
parent 5a5afe783a
commit 93d80233d7
11 changed files with 579 additions and 36 deletions

View file

@ -1,5 +1,6 @@
{
"cSpell.words": [
"actix",
"bpfrpt",
"creationdate",
"ipfs",

328
Cargo.lock generated
View file

@ -8,6 +8,237 @@ version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
[[package]]
name = "actix"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f728064aca1c318585bf4bb04ffcfac9e75e508ab4e8b1bd9ba5dfe04e2cbed5"
dependencies = [
"actix-rt",
"actix_derive",
"bitflags",
"bytes",
"crossbeam-channel",
"futures-core",
"futures-sink",
"futures-task",
"futures-util",
"log",
"once_cell",
"parking_lot 0.12.0",
"pin-project-lite 0.2.8",
"smallvec",
"tokio",
"tokio-util 0.7.1",
]
[[package]]
name = "actix-codec"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57a7559404a7f3573127aab53c08ce37a6c6a315c374a31070f3c91cd1b4a7fe"
dependencies = [
"bitflags",
"bytes",
"futures-core",
"futures-sink",
"log",
"memchr",
"pin-project-lite 0.2.8",
"tokio",
"tokio-util 0.7.1",
]
[[package]]
name = "actix-http"
version = "3.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5885cb81a0d4d0d322864bea1bb6c2a8144626b4fdc625d4c51eba197e7797a"
dependencies = [
"actix-codec",
"actix-rt",
"actix-service",
"actix-utils",
"ahash",
"base64 0.13.0",
"bitflags",
"brotli",
"bytes",
"bytestring",
"derive_more",
"encoding_rs",
"flate2",
"futures-core",
"h2",
"http",
"httparse",
"httpdate",
"itoa 1.0.1",
"language-tags",
"local-channel",
"log",
"mime",
"percent-encoding",
"pin-project-lite 0.2.8",
"rand 0.8.5",
"sha-1 0.10.0",
"smallvec",
"zstd",
]
[[package]]
name = "actix-macros"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "465a6172cf69b960917811022d8f29bc0b7fa1398bc4f78b3c466673db1213b6"
dependencies = [
"quote",
"syn",
]
[[package]]
name = "actix-router"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb60846b52c118f2f04a56cc90880a274271c489b2498623d58176f8ca21fa80"
dependencies = [
"bytestring",
"firestorm",
"http",
"log",
"regex",
"serde",
]
[[package]]
name = "actix-rt"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ea16c295198e958ef31930a6ef37d0fb64e9ca3b6116e6b93a8bdae96ee1000"
dependencies = [
"futures-core",
"tokio",
]
[[package]]
name = "actix-server"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0da34f8e659ea1b077bb4637948b815cd3768ad5a188fdcd74ff4d84240cd824"
dependencies = [
"actix-rt",
"actix-service",
"actix-utils",
"futures-core",
"futures-util",
"mio 0.8.2",
"num_cpus",
"socket2 0.4.4",
"tokio",
"tracing",
]
[[package]]
name = "actix-service"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a"
dependencies = [
"futures-core",
"paste",
"pin-project-lite 0.2.8",
]
[[package]]
name = "actix-utils"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e491cbaac2e7fc788dfff99ff48ef317e23b3cf63dbaf7aaab6418f40f92aa94"
dependencies = [
"local-waker",
"pin-project-lite 0.2.8",
]
[[package]]
name = "actix-web"
version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4e5ebffd51d50df56a3ae0de0e59487340ca456f05dd0b90c0a7a6dd6a74d31"
dependencies = [
"actix-codec",
"actix-http",
"actix-macros",
"actix-router",
"actix-rt",
"actix-server",
"actix-service",
"actix-utils",
"actix-web-codegen",
"ahash",
"bytes",
"bytestring",
"cfg-if 1.0.0",
"cookie",
"derive_more",
"encoding_rs",
"futures-core",
"futures-util",
"itoa 1.0.1",
"language-tags",
"log",
"mime",
"once_cell",
"pin-project-lite 0.2.8",
"regex",
"serde",
"serde_json",
"serde_urlencoded",
"smallvec",
"socket2 0.4.4",
"time 0.3.9",
"url",
]
[[package]]
name = "actix-web-actors"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31efe7896f3933ce03dd4710be560254272334bb321a18fd8ff62b1a557d9d19"
dependencies = [
"actix",
"actix-codec",
"actix-http",
"actix-web",
"bytes",
"bytestring",
"futures-core",
"pin-project-lite 0.2.8",
"tokio",
]
[[package]]
name = "actix-web-codegen"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7525bedf54704abb1d469e88d7e7e9226df73778798a69cea5022d53b2ae91bc"
dependencies = [
"actix-router",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "actix_derive"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d44b8fee1ced9671ba043476deddef739dd0959bf77030b26b738cc591737a7"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "addr2line"
version = "0.17.0"
@ -706,6 +937,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "bytestring"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90706ba19e97b90786e19dc0d5e2abd80008d99d4c0c5d1ad0b5e72cec7c494d"
dependencies = [
"bytes",
]
[[package]]
name = "cache-padded"
version = "1.2.0"
@ -1005,6 +1245,17 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb4a24b1aaf0fd0ce8b45161144d6f42cd91677fd5940fd431183eb023b3a2b8"
[[package]]
name = "cookie"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94d4706de1b0fa5b132270cddffa8585166037822e260a944fe161acd137ca05"
dependencies = [
"percent-encoding",
"time 0.3.9",
"version_check",
]
[[package]]
name = "core-derive"
version = "0.1.0"
@ -1868,6 +2119,12 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "firestorm"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d3d6188b8804df28032815ea256b6955c9625c24da7525f387a7af02fbb8f01"
[[package]]
name = "fixedbitset"
version = "0.1.9"
@ -3207,6 +3464,12 @@ dependencies = [
"log",
]
[[package]]
name = "language-tags"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388"
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -3909,6 +4172,24 @@ version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "local-channel"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6246c68cf195087205a0512559c97e15eaf95198bf0e206d662092cdcb03fe9f"
dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"local-waker",
]
[[package]]
name = "local-waker"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "902eb695eb0591864543cbfbf6d742510642a605a61fc5e97fe6ceb5a30ac4fb"
[[package]]
name = "lock_api"
version = "0.3.4"
@ -6623,7 +6904,12 @@ dependencies = [
name = "server"
version = "0.1.0"
dependencies = [
"actix",
"actix-web",
"actix-web-actors",
"sdcore",
"serde",
"serde_json",
"tokio",
]
@ -7553,10 +7839,18 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
dependencies = [
"itoa 1.0.1",
"libc",
"num_threads",
"time-macros",
]
[[package]]
name = "time-macros"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792"
[[package]]
name = "tinyvec"
version = "1.5.1"
@ -7583,7 +7877,10 @@ dependencies = [
"memchr",
"mio 0.8.2",
"num_cpus",
"once_cell",
"parking_lot 0.12.0",
"pin-project-lite 0.2.8",
"signal-hook-registry",
"socket2 0.4.4",
"tokio-macros",
"winapi 0.3.9",
@ -7914,7 +8211,7 @@ version = "1.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ee73e6e4924fe940354b8d4d98cad5231175d615cd855b758adc658c0aac6a0"
dependencies = [
"cfg-if 0.1.10",
"cfg-if 1.0.0",
"rand 0.8.5",
"static_assertions",
]
@ -9018,6 +9315,35 @@ dependencies = [
"synstructure",
]
[[package]]
name = "zstd"
version = "0.10.0+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b1365becbe415f3f0fcd024e2f7b45bacfb5bdd055f0dc113571394114e7bdd"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "4.1.4+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f7cd17c9af1a4d6c24beb1cc54b17e2ef7b593dc92f19e9d9acad8b182bbaee"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "1.6.3+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc49afa5c8d634e75761feda8c592051e7eeb4683ba827211eb0d731d3402ea8"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "zvariant"
version = "3.1.2"

View file

@ -6,5 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix = "0.13.0"
actix-web = "4.0.1"
actix-web-actors = "4.1.0"
sdcore = { path = "../../core" }
tokio = "1.17.0"
serde = "1.0.136"
serde_json = "1.0.79"
tokio = { version = "1.17.0", features = ["sync", "rt"] }

7
apps/server/Dockerfile Normal file
View file

@ -0,0 +1,7 @@
FROM gcr.io/distroless/cc
COPY ./server /sdserver
EXPOSE 8080
ENTRYPOINT [ "/sdserver" ]

View file

@ -1,8 +1,166 @@
use sdcore::Core;
use sdcore::{ClientCommand, ClientQuery, Core, CoreController, CoreEvent, CoreResponse};
use std::{env, path::Path};
#[tokio::main]
async fn main() {
use actix::{
Actor, AsyncContext, ContextFutureSpawner, Handler, Message, StreamHandler,
WrapFuture,
};
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
/// Define HTTP actor
struct Socket {
event_receiver: web::Data<mpsc::Receiver<CoreEvent>>,
core: web::Data<CoreController>,
}
impl Actor for Socket {
type Context = ws::WebsocketContext<Self>;
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type", content = "data")]
enum SocketMessagePayload {
Command(ClientCommand),
Query(ClientQuery),
}
#[derive(Serialize, Deserialize, Message)]
#[rtype(result = "()")]
#[serde(rename_all = "camelCase")]
struct SocketMessage {
id: String,
payload: SocketMessagePayload,
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Socket {
fn handle(
&mut self,
msg: Result<ws::Message, ws::ProtocolError>,
ctx: &mut Self::Context,
) {
// TODO: Add heartbeat and reconnect logic in the future. We can refer to https://github.com/actix/examples/blob/master/websockets/chat/src/session.rs for the heartbeat stuff.
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => {
let msg: SocketMessage = serde_json::from_str(&text).unwrap();
ctx.notify(msg);
},
_ => (),
}
}
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase", tag = "type", content = "data")]
pub enum SocketResponsePayload {
Query(CoreResponse),
}
#[derive(Message, Serialize)]
#[rtype(result = "()")]
struct SocketResponse {
id: String,
payload: SocketResponsePayload,
}
impl Handler<SocketResponse> for Socket {
type Result = ();
fn handle(&mut self, msg: SocketResponse, ctx: &mut Self::Context) {
let string = serde_json::to_string(&msg).unwrap();
println!("sending response: {string}");
ctx.text(string);
}
}
impl Handler<SocketMessage> for Socket {
type Result = ();
fn handle(&mut self, msg: SocketMessage, ctx: &mut Self::Context) -> Self::Result {
let core = self.core.clone();
let recipient = ctx.address().recipient();
let fut = async move {
match msg.payload {
SocketMessagePayload::Query(query) => {
match core.query(query).await {
Ok(response) => recipient.do_send(SocketResponse {
id: msg.id.clone(),
payload: SocketResponsePayload::Query(response),
}),
Err(err) => {
// println!("query error: {:?}", err);
// Err(err.to_string())
},
};
},
SocketMessagePayload::Command(command) => {
match core.command(command).await {
Ok(response) => recipient.do_send(SocketResponse {
id: msg.id.clone(),
payload: SocketResponsePayload::Query(response),
}),
Err(err) => {
// println!("command error: {:?}", err);
// Err(err.to_string())
},
};
},
_ => {},
}
};
fut.into_actor(self).spawn(ctx);
()
}
}
async fn index(
req: HttpRequest,
stream: web::Payload,
event_receiver: web::Data<mpsc::Receiver<CoreEvent>>,
controller: web::Data<CoreController>,
) -> Result<HttpResponse, Error> {
let resp = ws::start(
Socket {
event_receiver,
core: controller,
},
&req,
stream,
);
println!("{:?}", resp);
resp
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let (event_receiver, controller) = setup().await;
println!("Listening http://localhost:8080");
HttpServer::new(move || {
App::new()
.app_data(event_receiver.clone())
.app_data(controller.clone())
.route("/ws", web::get().to(index))
})
.bind(("0.0.0.0", 8080))?
.run()
.await
}
async fn setup() -> (
web::Data<mpsc::Receiver<CoreEvent>>,
web::Data<CoreController>,
) {
let data_dir_var = "DATA_DIR";
let data_dir = match env::var(data_dir_var) {
Ok(path) => path,
@ -11,7 +169,7 @@ async fn main() {
let data_dir_path = Path::new(&data_dir);
let (mut core, mut event_receiver) = Core::new(data_dir_path.to_path_buf()).await;
let (mut core, event_receiver) = Core::new(data_dir_path.to_path_buf()).await;
core.initializer().await;
@ -19,7 +177,7 @@ async fn main() {
tokio::spawn(async move {
core.start().await;
})
.await
.unwrap();
});
(web::Data::new(event_receiver), web::Data::new(controller))
}

View file

@ -1,16 +1,65 @@
import React from 'react';
import SpacedriveInterface from '@sd/interface';
import { ClientCommand, ClientQuery } from '@sd/core';
import { ClientCommand, ClientQuery, CoreEvent } from '@sd/core';
import { BaseTransport } from '@sd/client';
const websocket = new WebSocket('ws://localhost:8080/ws');
const randomId = () => Math.random().toString(36).slice(2);
// bind state to core via Tauri
class Transport extends BaseTransport {
async query(query: ClientQuery) {
// return await invoke('client_query_transport', { data: query });
requestMap = new Map<string, (data: any) => void>();
constructor() {
super();
websocket.addEventListener('message', (event) => {
if (!event.data) return;
const { id, payload } = JSON.parse(event.data);
const { type, data } = payload;
if (type === 'event') {
this.emit('core_event', data);
} else if (type === 'query' || type === 'command') {
if (this.requestMap.has(id)) {
this.requestMap.get(id)?.(data);
this.requestMap.delete(id);
}
}
});
}
async command(query: ClientCommand) {
// return await invoke('client_command_transport', { data: query });
async query(query: ClientQuery) {
const id = randomId();
let resolve: (data: any) => void;
const promise = new Promise((res) => {
resolve = res;
});
// @ts-ignore
this.requestMap.set(id, resolve);
websocket.send(JSON.stringify({ id, payload: { type: 'query', data: query } }));
return await promise;
}
async command(command: ClientCommand) {
const id = randomId();
let resolve: (data: any) => void;
const promise = new Promise((res) => {
resolve = res;
});
// @ts-ignore
this.requestMap.set(id, resolve);
websocket.send(JSON.stringify({ id, payload: { type: 'command', data: command } }));
return await promise;
}
}
@ -20,9 +69,6 @@ function App() {
{/* <header className="App-header"></header> */}
<SpacedriveInterface
transport={new Transport()}
onCoreEvent={function (event: any): void {
return;
}}
platform={'browser'}
convertFileSrc={function (url: string): string {
return url;

View file

@ -161,6 +161,12 @@ model FilePath {
@@map("file_paths")
}
// if there is a conflicting cas_id, the conficting file should be updated to have a larger cas_id as the feild is unique, however this record is kept to tell the indexer (upon discovering this CAS) that there is alternate versions of the file and to check by a full integrity hash to define for which to associate with.
model FileConflict {
original_file_id Int @unique
detactched_file_id Int @unique
}
model MediaData {
id Int @id
pixel_width Int?

View file

@ -66,7 +66,7 @@ impl Job for FileIdentifierJob {
println!("UPDATING PATH: {}", update_file_path);
block_on(db._execute_raw(&update_file_path)).unwrap();
}
completed += 1;
println!("completed: {}", completed);
ctx.progress(vec![JobReportUpdate::CompletedTaskCount(completed)]);

View file

@ -9,6 +9,7 @@ import create from 'zustand';
import { useKey, useWindowSize } from 'rooks';
import { useSearchParams } from 'react-router-dom';
import { AppPropsContext } from '../../App';
import FileThumb from './FileThumb';
type ExplorerState = {
selectedRowIndex: number;
@ -226,19 +227,7 @@ const RenderCell: React.FC<{ colKey?: ColumnKey; dirId?: number; file?: FilePath
return (
<div className="flex flex-row items-center overflow-hidden">
<div className="w-6 h-6 mr-3">
{row.is_dir ? (
<img className="mt-0.5 pointer-events-none z-90" src="/svg/folder.svg" />
) : (
hasThumbnail &&
location?.data_path && (
<img
className="mt-0.5 pointer-events-none z-90"
src={appPropsContext?.convertFileSrc(
`${location.data_path}/thumbnails/${location.location_id}/${row.temp_cas_id}.webp`
)}
/>
)
)}
<FileThumb file={row} locationId={location.location_id} />
</div>
{/* {colKey == 'name' &&
(() => {

View file

@ -3,7 +3,7 @@ import { FilePath } from '@sd/core';
import clsx from 'clsx';
import React, { useContext } from 'react';
import { AppPropsContext } from '../../App';
import icons from '../../assets/icons';
import { ReactComponent as Folder } from '../../assets/svg/folder.svg';
export default function FileThumb(props: {
@ -15,7 +15,7 @@ export default function FileThumb(props: {
const { data: client } = useBridgeQuery('ClientGetState');
if (props.file.is_dir) {
return <Folder className="" />;
return <Folder className="max-w-[170px]" />;
}
if (props.file.has_local_thumbnail && client?.data_path) {
@ -29,5 +29,9 @@ export default function FileThumb(props: {
);
}
if (icons[props.file.extension as keyof typeof icons]) {
let Icon = icons[props.file.extension as keyof typeof icons];
return <Icon className={clsx('max-w-[170px] w-full h-full', props.className)} />;
}
return <div></div>;
}

View file

@ -58,8 +58,9 @@ export function MacOSTrafficLights() {
export const Sidebar: React.FC<SidebarProps> = (props) => {
const appPropsContext = useContext(AppPropsContext);
const { data: locations } = useBridgeQuery('SysGetLocations');
const { data: locations } = useBridgeQuery('SysGetLocations', undefined, {});
const { mutate: createLocation } = useBridgeCommand('LocCreate');
const { data: clientState } = useBridgeQuery('ClientGetState', undefined, {});
const tags = [
{ id: 1, name: 'Keepsafe', color: '#FF6788' },
@ -92,9 +93,9 @@ export const Sidebar: React.FC<SidebarProps> = (props) => {
variant: 'gray'
}}
// buttonIcon={<Book weight="bold" className="w-4 h-4 mt-0.5 mr-1" />}
buttonText="Jeff's Library"
buttonText={clientState?.client_name || 'Loading...'}
items={[
[{ name: `Jeff's Library`, selected: true }, { name: 'Private Library' }],
[{ name: clientState?.client_name || '', selected: true }, { name: 'Private Library' }],
[
{ name: 'Library Settings', icon: CogIcon },
{ name: 'Add Library', icon: PlusIcon },