WOAH job progress in realtime... in the UI! worked first time too but that was down to Brendan

Co-authored-by: Brendan Allan <brendonovich@outlook.com>
Jamie Pine 2022-03-27 00:11:02 -07:00
parent 80369e005c
commit 9fd8997fd1
3 changed files with 158 additions and 105 deletions

@@ -12,7 +12,7 @@ use std::{
 };
 use walkdir::{DirEntry, WalkDir};
 
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub struct IndexerJob {
 	pub path: String,
 }
@@ -51,6 +51,7 @@ pub async fn scan_path(
 ) -> Result<()> {
 	println!("Scanning directory: {}", &path);
 	let db = &ctx.database;
+	let path = path.to_string();
 
 	let location = create_location(&ctx, &path).await?;
@@ -62,7 +63,7 @@ pub async fn scan_path(
 		id: Option<i64>,
 	}
 	// grab the next id so we can increment in memory for batch inserting
-	let mut next_file_id = match db
+	let first_file_id = match db
 		._query_raw::<QueryRes>(r#"SELECT MAX(id) id FROM file_paths"#)
 		.await
 	{
@@ -70,59 +71,67 @@ pub async fn scan_path(
 		Err(e) => Err(anyhow!("Error querying for next file id: {}", e))?,
 	};
-	let mut get_id = || {
-		next_file_id += 1;
-		next_file_id
-	};
 	//check is path is a directory
-	if !PathBuf::from(path).is_dir() {
-		return Err(anyhow::anyhow!("{} is not a directory", path));
+	if !PathBuf::from(&path).is_dir() {
+		return Err(anyhow::anyhow!("{} is not a directory", &path));
 	}
-	// store every valid path discovered
-	let mut paths: Vec<(PathBuf, i64, Option<i64>)> = Vec::new();
-	// store a hashmap of directories to their file ids for fast lookup
-	let mut dirs: HashMap<String, i64> = HashMap::new();
-	// begin timer for logging purposes
-	let scan_start = Instant::now();
-	// walk through directory recursively
-	for entry in WalkDir::new(path).into_iter().filter_entry(|dir| {
-		let approved = !is_hidden(dir)
-			&& !is_app_bundle(dir)
-			&& !is_node_modules(dir)
-			&& !is_library(dir);
-		approved
-	}) {
-		// extract directory entry or log and continue if failed
-		let entry = match entry {
-			Ok(entry) => entry,
-			Err(e) => {
-				println!("Error reading file {}", e);
-				continue;
-			},
-		};
-		let path = entry.path();
-		let parent_path = path
-			.parent()
-			.unwrap_or(Path::new(""))
-			.to_str()
-			.unwrap_or("");
-		let parent_dir_id = dirs.get(&*parent_path);
-		// println!("Discovered: {:?}, {:?}", &path, &parent_dir_id);
-		let file_id = get_id();
-		paths.push((path.to_owned(), file_id, parent_dir_id.cloned()));
-		if entry.file_type().is_dir() {
-			let _path = match path.to_str() {
-				Some(path) => path.to_owned(),
-				None => continue,
-			};
-			dirs.insert(_path, file_id);
-		}
-	}
+	let dir_path = path.clone();
+	let (paths, scan_start) = tokio::task::spawn_blocking(move || {
+		// store every valid path discovered
+		let mut paths: Vec<(PathBuf, i64, Option<i64>)> = Vec::new();
+		// store a hashmap of directories to their file ids for fast lookup
+		let mut dirs: HashMap<String, i64> = HashMap::new();
+		// begin timer for logging purposes
+		let scan_start = Instant::now();
+		let mut next_file_id = first_file_id;
+		let mut get_id = || {
+			next_file_id += 1;
+			next_file_id
+		};
+		// walk through directory recursively
+		for entry in WalkDir::new(&dir_path).into_iter().filter_entry(|dir| {
+			let approved = !is_hidden(dir)
+				&& !is_app_bundle(dir)
+				&& !is_node_modules(dir)
+				&& !is_library(dir);
+			approved
+		}) {
+			// extract directory entry or log and continue if failed
+			let entry = match entry {
+				Ok(entry) => entry,
+				Err(e) => {
+					println!("Error reading file {}", e);
+					continue;
+				},
+			};
+			let path = entry.path();
+			let parent_path = path
+				.parent()
+				.unwrap_or(Path::new(""))
+				.to_str()
+				.unwrap_or("");
+			let parent_dir_id = dirs.get(&*parent_path);
+			// println!("Discovered: {:?}, {:?}", &path, &parent_dir_id);
+			let file_id = get_id();
+			paths.push((path.to_owned(), file_id, parent_dir_id.cloned()));
+			if entry.file_type().is_dir() {
+				let _path = match path.to_str() {
+					Some(path) => path.to_owned(),
+					None => continue,
+				};
+				dirs.insert(_path, file_id);
+			}
+		}
+
+		(paths, scan_start)
+	})
+	.await
+	.unwrap();
 
 	let db_write_start = Instant::now();
 	let scan_read_time = scan_start.elapsed();
@@ -166,7 +175,7 @@ pub async fn scan_path(
 	}
 	println!(
 		"scan of {:?} completed in {:?}. {:?} files found. db write completed in {:?}",
-		path,
+		&path,
 		scan_read_time,
 		paths.len(),
 		db_write_start.elapsed()
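
The interesting change in the indexer is that the recursive WalkDir traversal now runs inside tokio::task::spawn_blocking, so the blocking filesystem walk no longer stalls the async executor. That is also why `path` is converted to an owned String, why `dir_path` is cloned, and why `next_file_id` and the `get_id` closure moved inside the closure: spawn_blocking takes a 'static closure that must own everything it touches. A minimal sketch of the pattern, with `collect_paths` as a hypothetical stand-in for the WalkDir loop:

use std::path::PathBuf;
use std::time::{Duration, Instant};

// Hypothetical stand-in for the WalkDir traversal above; any blocking
// filesystem walk is handled the same way. (Not recursive here.)
fn collect_paths(dir: &str) -> Vec<PathBuf> {
	std::fs::read_dir(dir)
		.into_iter()
		.flatten()
		.filter_map(|entry| entry.ok().map(|e| e.path()))
		.collect()
}

// The walk is synchronous, so it is moved onto tokio's dedicated
// blocking thread pool; the closure owns `dir_path` (hence the clones
// in the commit) and hands its results back through the join handle.
async fn scan(dir_path: String) -> (Vec<PathBuf>, Duration) {
	let (paths, scan_start) = tokio::task::spawn_blocking(move || {
		let scan_start = Instant::now();
		let paths = collect_paths(&dir_path); // blocking filesystem work
		(paths, scan_start)
	})
	.await
	.expect("scan task panicked");
	(paths, scan_start.elapsed())
}

The join handle resolves to a Result, so a panic inside the walk surfaces as an Err; the commit simply unwraps it.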

@@ -1,16 +1,16 @@
 use super::worker::{Worker, WorkerContext};
 use crate::{prisma::JobData, CoreContext};
 use anyhow::Result;
-use dyn_clone::DynClone;
 use int_enum::IntEnum;
 use serde::{Deserialize, Serialize};
-use std::{collections::HashMap, fmt::Debug};
+use std::{collections::HashMap, fmt::Debug, sync::Arc};
+use tokio::sync::Mutex;
 use ts_rs::TS;
 
 const MAX_WORKERS: usize = 4;
 
 #[async_trait::async_trait]
-pub trait Job: Send + Sync + Debug + DynClone {
+pub trait Job: Send + Sync + Debug {
 	async fn run(&self, ctx: WorkerContext) -> Result<()>;
 }
@@ -18,7 +18,7 @@ pub trait Job: Send + Sync + Debug + DynClone {
 pub struct Jobs {
 	job_queue: Vec<Box<dyn Job>>,
 	// workers are spawned when jobs are picked off the queue
-	running_workers: HashMap<String, Worker>,
+	running_workers: HashMap<String, Arc<Mutex<Worker>>>,
 }
 
 impl Jobs {
@@ -30,19 +30,26 @@ impl Jobs {
 	}
 
 	pub async fn ingest(&mut self, ctx: &CoreContext, job: Box<dyn Job>) {
 		// create worker to process job
-		let mut worker = Worker::new(job);
+		let worker = Worker::new(job);
+		let id = worker.id();
 
 		if self.running_workers.len() < MAX_WORKERS {
-			worker.spawn(ctx).await;
-			self.running_workers.insert(worker.id(), worker);
+			let wrapped_worker = Arc::new(Mutex::new(worker));
+
+			Worker::spawn(wrapped_worker.clone(), ctx).await;
+
+			self.running_workers.insert(id, wrapped_worker);
 		}
 	}
 
 	pub async fn get_running(&self) -> Vec<JobReport> {
-		self.running_workers
-			.values()
-			.into_iter()
-			.map(|worker| worker.job_report.clone())
-			.collect()
+		let mut ret = vec![];
+
+		for worker in self.running_workers.values() {
+			let worker = worker.lock().await;
+			ret.push(worker.job_report.clone());
+		}
+
+		ret
 	}
 }
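
In the job manager (the second file), workers now live behind Arc<Mutex<Worker>> using tokio's async Mutex, so the manager's running_workers map and the detached progress task share one mutable worker. get_running becomes a plain for loop because `lock().await` cannot be awaited inside a synchronous iterator `map` closure. A runnable sketch of the sharing pattern, assuming the tokio crate and a simplified JobReport standing in for the real type:

use std::sync::Arc;
use tokio::sync::Mutex;

// Simplified stand-in for the real JobReport.
#[derive(Debug, Clone)]
struct JobReport {
	completed_task_count: usize,
}

#[tokio::main]
async fn main() {
	// One handle stays with the manager (as in running_workers), the
	// clone goes to a spawned task, mirroring Worker::spawn above.
	let report = Arc::new(Mutex::new(JobReport { completed_task_count: 0 }));
	let writer = report.clone();

	tokio::spawn(async move {
		for _ in 0..5 {
			// the async lock yields instead of blocking the executor
			writer.lock().await.completed_task_count += 1;
		}
	})
	.await
	.unwrap();

	// The manager reads the same state, like get_running() does.
	println!("{:?}", &*report.lock().await);
}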

@@ -1,6 +1,10 @@
+use std::sync::Arc;
+
 use crate::{ClientQuery, CoreContext, CoreEvent, Job};
-use dyn_clone::clone_box;
-use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
+use tokio::sync::{
+	mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
+	Mutex,
+};
 
 use super::jobs::{JobReport, JobReportUpdate, JobStatus};
@@ -11,6 +15,11 @@ pub enum WorkerEvent {
 	Failed,
 }
 
+enum WorkerState {
+	Pending(Box<dyn Job>, UnboundedReceiver<WorkerEvent>),
+	Running,
+}
+
 #[derive(Clone)]
 pub struct WorkerContext {
 	pub core_ctx: CoreContext,
@@ -20,31 +29,49 @@ pub struct WorkerContext {
 // a worker is a dedicated thread that runs a single job
 // once the job is complete the worker will exit
 pub struct Worker {
-	job: Box<dyn Job>,
 	pub job_report: JobReport,
-	worker_channel: (UnboundedSender<WorkerEvent>, UnboundedReceiver<WorkerEvent>),
+	state: WorkerState,
+	worker_sender: UnboundedSender<WorkerEvent>,
 }
 
 impl Worker {
 	pub fn new(job: Box<dyn Job>) -> Self {
+		let (worker_sender, worker_receiver) = unbounded_channel();
 		let uuid = uuid::Uuid::new_v4().to_string();
 		println!("worker uuid: {}", &uuid);
+
 		Self {
-			job,
+			state: WorkerState::Pending(job, worker_receiver),
 			job_report: JobReport::new(uuid),
-			worker_channel: unbounded_channel(),
+			worker_sender,
 		}
 	}
 	// spawns a thread and extracts channel sender to communicate with it
-	pub async fn spawn(&mut self, ctx: &CoreContext) {
+	pub async fn spawn(worker: Arc<Mutex<Self>>, ctx: &CoreContext) {
 		println!("spawning worker");
-		// we capture the worker receiver channel so state can be updated from inside the worker
-		let worker_sender = self.worker_channel.0.clone();
+		let mut worker_mut = worker.lock().await;
+
+		let (job, worker_receiver) =
+			match std::mem::replace(&mut worker_mut.state, WorkerState::Running) {
+				WorkerState::Pending(job, worker_receiver) => {
+					worker_mut.state = WorkerState::Running;
+					(job, worker_receiver)
+				},
+				WorkerState::Running => unreachable!(),
+			};
+
+		let worker_sender = worker_mut.worker_sender.clone();
 		let core_ctx = ctx.clone();
 
-		let job = clone_box(&*self.job);
+		worker_mut.job_report.status = JobStatus::Running;
 
-		self.track_progress(&ctx).await;
+		tokio::spawn(Worker::track_progress(
+			worker.clone(),
+			worker_receiver,
+			ctx.clone(),
+		));
 
 		tokio::spawn(async move {
 			println!("new worker thread spawned");
@@ -61,44 +88,54 @@ impl Worker {
 			}
 		});
 	}
 
 	pub fn id(&self) -> String {
 		self.job_report.id.to_owned()
 	}
 
-	async fn track_progress(&mut self, ctx: &CoreContext) {
+	async fn track_progress(
+		worker: Arc<Mutex<Self>>,
+		mut channel: UnboundedReceiver<WorkerEvent>,
+		ctx: CoreContext,
+	) {
 		println!("tracking progress");
-		self.job_report.status = JobStatus::Running;
-		loop {
-			tokio::select! {
-				Some(command) = self.worker_channel.1.recv() => {
-					match command {
-						WorkerEvent::Progressed(changes) => {
-							println!("worker event: progressed");
-							for change in changes {
-								match change {
-									JobReportUpdate::TaskCount(task_count) => {
-										self.job_report.task_count = task_count;
-									},
-									JobReportUpdate::CompletedTaskCount(completed_task_count) => {
-										self.job_report.completed_task_count = completed_task_count;
-									},
-									JobReportUpdate::Message(message) => {
-										self.job_report.message = message;
-									},
-								}
-							}
-							ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetRunning)).await;
-						},
-						WorkerEvent::Completed => {
-							self.job_report.status = JobStatus::Completed;
-							ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetRunning)).await;
-							ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetHistory)).await;
-						},
-						WorkerEvent::Failed => {
-							self.job_report.status = JobStatus::Failed;
-							ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetHistory)).await;
-						},
+		while let Some(command) = channel.recv().await {
+			let mut worker = worker.lock().await;
+
+			match command {
+				WorkerEvent::Progressed(changes) => {
+					println!("worker event: progressed");
+					for change in changes {
+						match change {
+							JobReportUpdate::TaskCount(task_count) => {
+								worker.job_report.task_count = task_count;
+							},
+							JobReportUpdate::CompletedTaskCount(completed_task_count) => {
+								worker.job_report.completed_task_count =
+									completed_task_count;
+							},
+							JobReportUpdate::Message(message) => {
+								worker.job_report.message = message;
+							},
+						}
+					}
+					ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetRunning))
+						.await;
+				},
+				WorkerEvent::Completed => {
+					worker.job_report.status = JobStatus::Completed;
+					ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetRunning))
+						.await;
+					ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetHistory))
+						.await;
+					break;
+				},
+				WorkerEvent::Failed => {
+					worker.job_report.status = JobStatus::Failed;
+					ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetHistory))
+						.await;
+					break;
+				},
+			}
+		}
 	}
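
The worker itself is now a small state machine: `new` stores the boxed job plus the event receiver in `WorkerState::Pending`, and `spawn` uses `std::mem::replace` to move them out while leaving `Running` behind. Taking ownership this way removes any need to clone the job, which is why the `DynClone` bound and the `clone_box` call could go. A minimal sketch of the hand-off, with a `String` payload standing in for the real `Box<dyn Job>` and `u32` for `WorkerEvent`:

use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

// Simplified stand-ins for the real trait object and event type.
enum WorkerState {
	Pending(String, UnboundedReceiver<u32>),
	Running,
}

// Swap `Running` in and take the queued job plus its receiver out,
// gaining ownership without a clone; mirrors the match in Worker::spawn.
fn take_pending(state: &mut WorkerState) -> (String, UnboundedReceiver<u32>) {
	match std::mem::replace(state, WorkerState::Running) {
		WorkerState::Pending(job, receiver) => (job, receiver),
		// spawn is only ever called once, on a Pending worker
		WorkerState::Running => unreachable!("worker already running"),
	}
}

fn main() {
	let (_tx, rx) = unbounded_channel::<u32>();
	let mut state = WorkerState::Pending("indexer".to_string(), rx);
	let (job, _receiver) = take_pending(&mut state);
	println!("running job: {job}");
}

With the receiver moved into the detached track_progress task and `break` added to the Completed and Failed arms, that task ends when the job does instead of looping forever, which is what lets the UI see live progress without holding the worker lock across the whole run.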