From 9fd8997fd1e1621a38e5c94f885607b59fdddd37 Mon Sep 17 00:00:00 2001 From: Jamie Pine <32987599+jamiepine@users.noreply.github.com> Date: Sun, 27 Mar 2022 00:11:02 -0700 Subject: [PATCH] WOAH job progress in realtime... in the UI! worked first time too but that was down to Brendan Co-authored-by: Brendan Allan --- packages/core/src/file/indexer.rs | 109 ++++++++++++++------------ packages/core/src/job/jobs.rs | 31 +++++--- packages/core/src/job/worker.rs | 123 +++++++++++++++++++----------- 3 files changed, 158 insertions(+), 105 deletions(-) diff --git a/packages/core/src/file/indexer.rs b/packages/core/src/file/indexer.rs index f528c3a75..92d575563 100644 --- a/packages/core/src/file/indexer.rs +++ b/packages/core/src/file/indexer.rs @@ -12,7 +12,7 @@ use std::{ }; use walkdir::{DirEntry, WalkDir}; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct IndexerJob { pub path: String, } @@ -51,6 +51,7 @@ pub async fn scan_path( ) -> Result<()> { println!("Scanning directory: {}", &path); let db = &ctx.database; + let path = path.to_string(); let location = create_location(&ctx, &path).await?; @@ -62,7 +63,7 @@ pub async fn scan_path( id: Option, } // grab the next id so we can increment in memory for batch inserting - let mut next_file_id = match db + let first_file_id = match db ._query_raw::(r#"SELECT MAX(id) id FROM file_paths"#) .await { @@ -70,59 +71,67 @@ pub async fn scan_path( Err(e) => Err(anyhow!("Error querying for next file id: {}", e))?, }; - let mut get_id = || { - next_file_id += 1; - next_file_id - }; - //check is path is a directory - if !PathBuf::from(path).is_dir() { - return Err(anyhow::anyhow!("{} is not a directory", path)); + if !PathBuf::from(&path).is_dir() { + return Err(anyhow::anyhow!("{} is not a directory", &path)); } + let dir_path = path.clone(); - // store every valid path discovered - let mut paths: Vec<(PathBuf, i64, Option)> = Vec::new(); - // store a hashmap of directories to their file ids for fast lookup - let mut dirs: HashMap = HashMap::new(); - // begin timer for logging purposes - let scan_start = Instant::now(); - // walk through directory recursively - for entry in WalkDir::new(path).into_iter().filter_entry(|dir| { - let approved = !is_hidden(dir) - && !is_app_bundle(dir) - && !is_node_modules(dir) - && !is_library(dir); - approved - }) { - // extract directory entry or log and continue if failed - let entry = match entry { - Ok(entry) => entry, - Err(e) => { - println!("Error reading file {}", e); - continue; - }, + let (paths, scan_start) = tokio::task::spawn_blocking(move || { + // store every valid path discovered + let mut paths: Vec<(PathBuf, i64, Option)> = Vec::new(); + // store a hashmap of directories to their file ids for fast lookup + let mut dirs: HashMap = HashMap::new(); + // begin timer for logging purposes + let scan_start = Instant::now(); + + let mut next_file_id = first_file_id; + let mut get_id = || { + next_file_id += 1; + next_file_id }; - let path = entry.path(); - - let parent_path = path - .parent() - .unwrap_or(Path::new("")) - .to_str() - .unwrap_or(""); - let parent_dir_id = dirs.get(&*parent_path); - // println!("Discovered: {:?}, {:?}", &path, &parent_dir_id); - - let file_id = get_id(); - paths.push((path.to_owned(), file_id, parent_dir_id.cloned())); - - if entry.file_type().is_dir() { - let _path = match path.to_str() { - Some(path) => path.to_owned(), - None => continue, + // walk through directory recursively + for entry in WalkDir::new(&dir_path).into_iter().filter_entry(|dir| { + let approved = !is_hidden(dir) + && !is_app_bundle(dir) + && !is_node_modules(dir) + && !is_library(dir); + approved + }) { + // extract directory entry or log and continue if failed + let entry = match entry { + Ok(entry) => entry, + Err(e) => { + println!("Error reading file {}", e); + continue; + }, }; - dirs.insert(_path, file_id); + let path = entry.path(); + + let parent_path = path + .parent() + .unwrap_or(Path::new("")) + .to_str() + .unwrap_or(""); + let parent_dir_id = dirs.get(&*parent_path); + // println!("Discovered: {:?}, {:?}", &path, &parent_dir_id); + + let file_id = get_id(); + paths.push((path.to_owned(), file_id, parent_dir_id.cloned())); + + if entry.file_type().is_dir() { + let _path = match path.to_str() { + Some(path) => path.to_owned(), + None => continue, + }; + dirs.insert(_path, file_id); + } } - } + (paths, scan_start) + }) + .await + .unwrap(); + let db_write_start = Instant::now(); let scan_read_time = scan_start.elapsed(); @@ -166,7 +175,7 @@ pub async fn scan_path( } println!( "scan of {:?} completed in {:?}. {:?} files found. db write completed in {:?}", - path, + &path, scan_read_time, paths.len(), db_write_start.elapsed() diff --git a/packages/core/src/job/jobs.rs b/packages/core/src/job/jobs.rs index d6786344b..f0d8bdcd6 100644 --- a/packages/core/src/job/jobs.rs +++ b/packages/core/src/job/jobs.rs @@ -1,16 +1,16 @@ use super::worker::{Worker, WorkerContext}; use crate::{prisma::JobData, CoreContext}; use anyhow::Result; -use dyn_clone::DynClone; use int_enum::IntEnum; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, fmt::Debug}; +use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use tokio::sync::Mutex; use ts_rs::TS; const MAX_WORKERS: usize = 4; #[async_trait::async_trait] -pub trait Job: Send + Sync + Debug + DynClone { +pub trait Job: Send + Sync + Debug { async fn run(&self, ctx: WorkerContext) -> Result<()>; } @@ -18,7 +18,7 @@ pub trait Job: Send + Sync + Debug + DynClone { pub struct Jobs { job_queue: Vec>, // workers are spawned when jobs are picked off the queue - running_workers: HashMap, + running_workers: HashMap>>, } impl Jobs { @@ -30,19 +30,26 @@ impl Jobs { } pub async fn ingest(&mut self, ctx: &CoreContext, job: Box) { // create worker to process job - let mut worker = Worker::new(job); + let worker = Worker::new(job); + let id = worker.id(); if self.running_workers.len() < MAX_WORKERS { - worker.spawn(ctx).await; - self.running_workers.insert(worker.id(), worker); + let wrapped_worker = Arc::new(Mutex::new(worker)); + + Worker::spawn(wrapped_worker.clone(), ctx).await; + + self.running_workers.insert(id, wrapped_worker); } } pub async fn get_running(&self) -> Vec { - self.running_workers - .values() - .into_iter() - .map(|worker| worker.job_report.clone()) - .collect() + let mut ret = vec![]; + + for worker in self.running_workers.values() { + let worker = worker.lock().await; + ret.push(worker.job_report.clone()); + } + + ret } } diff --git a/packages/core/src/job/worker.rs b/packages/core/src/job/worker.rs index 924496265..62cf424d0 100644 --- a/packages/core/src/job/worker.rs +++ b/packages/core/src/job/worker.rs @@ -1,6 +1,10 @@ +use std::sync::Arc; + use crate::{ClientQuery, CoreContext, CoreEvent, Job}; -use dyn_clone::clone_box; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + Mutex, +}; use super::jobs::{JobReport, JobReportUpdate, JobStatus}; @@ -11,6 +15,11 @@ pub enum WorkerEvent { Failed, } +enum WorkerState { + Pending(Box, UnboundedReceiver), + Running, +} + #[derive(Clone)] pub struct WorkerContext { pub core_ctx: CoreContext, @@ -20,31 +29,49 @@ pub struct WorkerContext { // a worker is a dedicated thread that runs a single job // once the job is complete the worker will exit pub struct Worker { - job: Box, pub job_report: JobReport, - worker_channel: (UnboundedSender, UnboundedReceiver), + state: WorkerState, + worker_sender: UnboundedSender, } impl Worker { pub fn new(job: Box) -> Self { + let (worker_sender, worker_receiver) = unbounded_channel(); let uuid = uuid::Uuid::new_v4().to_string(); + println!("worker uuid: {}", &uuid); + Self { - job, + state: WorkerState::Pending(job, worker_receiver), job_report: JobReport::new(uuid), - worker_channel: unbounded_channel(), + worker_sender, } } // spawns a thread and extracts channel sender to communicate with it - pub async fn spawn(&mut self, ctx: &CoreContext) { + pub async fn spawn(worker: Arc>, ctx: &CoreContext) { println!("spawning worker"); // we capture the worker receiver channel so state can be updated from inside the worker - let worker_sender = self.worker_channel.0.clone(); + let mut worker_mut = worker.lock().await; + + let (job, worker_receiver) = + match std::mem::replace(&mut worker_mut.state, WorkerState::Running) { + WorkerState::Pending(job, worker_receiver) => { + worker_mut.state = WorkerState::Running; + (job, worker_receiver) + }, + WorkerState::Running => unreachable!(), + }; + + let worker_sender = worker_mut.worker_sender.clone(); let core_ctx = ctx.clone(); - let job = clone_box(&*self.job); + worker_mut.job_report.status = JobStatus::Running; - self.track_progress(&ctx).await; + tokio::spawn(Worker::track_progress( + worker.clone(), + worker_receiver, + ctx.clone(), + )); tokio::spawn(async move { println!("new worker thread spawned"); @@ -61,44 +88,54 @@ impl Worker { } }); } + pub fn id(&self) -> String { self.job_report.id.to_owned() } - async fn track_progress(&mut self, ctx: &CoreContext) { + + async fn track_progress( + worker: Arc>, + mut channel: UnboundedReceiver, + ctx: CoreContext, + ) { println!("tracking progress"); - self.job_report.status = JobStatus::Running; - loop { - tokio::select! { - Some(command) = self.worker_channel.1.recv() => { - match command { - WorkerEvent::Progressed(changes) => { - println!("worker event: progressed"); - for change in changes { - match change { - JobReportUpdate::TaskCount(task_count) => { - self.job_report.task_count = task_count; - }, - JobReportUpdate::CompletedTaskCount(completed_task_count) => { - self.job_report.completed_task_count = completed_task_count; - }, - JobReportUpdate::Message(message) => { - self.job_report.message = message; - }, - } - } - ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetRunning)).await; - }, - WorkerEvent::Completed => { - self.job_report.status = JobStatus::Completed; - ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetRunning)).await; - ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetHistory)).await; - }, - WorkerEvent::Failed => { - self.job_report.status = JobStatus::Failed; - ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetHistory)).await; - }, + while let Some(command) = channel.recv().await { + let mut worker = worker.lock().await; + + match command { + WorkerEvent::Progressed(changes) => { + println!("worker event: progressed"); + for change in changes { + match change { + JobReportUpdate::TaskCount(task_count) => { + worker.job_report.task_count = task_count; + }, + JobReportUpdate::CompletedTaskCount(completed_task_count) => { + worker.job_report.completed_task_count = + completed_task_count; + }, + JobReportUpdate::Message(message) => { + worker.job_report.message = message; + }, + } } - } + ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetRunning)) + .await; + }, + WorkerEvent::Completed => { + worker.job_report.status = JobStatus::Completed; + ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetRunning)) + .await; + ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetHistory)) + .await; + break; + }, + WorkerEvent::Failed => { + worker.job_report.status = JobStatus::Failed; + ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetHistory)) + .await; + break; + }, } } }