job reporting improvements

This commit is contained in:
Jamie Pine 2022-03-27 01:32:18 -07:00
parent 9fd8997fd1
commit 38357af534
4 changed files with 83 additions and 36 deletions

View file

@ -21,14 +21,13 @@ pub struct IndexerJob {
impl Job for IndexerJob {
async fn run(&self, ctx: WorkerContext) -> Result<()> {
// invoke scan_path function
scan_path(&ctx.core_ctx, self.path.as_str(), |progress| {
let core_ctx = ctx.core_ctx.clone();
scan_path(&core_ctx, self.path.as_str(), move |progress| {
// update job progress
ctx.sender
.send(WorkerEvent::Progressed(vec![
JobReportUpdate::TaskCount(progress.chunk_count),
JobReportUpdate::CompletedTaskCount(progress.saved_chunks),
JobReportUpdate::Message(progress.message),
]))
.send(WorkerEvent::Progressed(
progress.iter().map(|p| p.clone().into()).collect(),
))
.unwrap_or(());
})
.await?;
@ -36,18 +35,18 @@ impl Job for IndexerJob {
}
}
pub struct ScanProgress {
pub total_paths: i64,
pub chunk_count: i64,
pub saved_chunks: i64,
pub message: String,
#[derive(Clone)]
pub enum ScanProgress {
ChunkCount(i64),
SavedChunks(i64),
Message(String),
}
// creates a vector of valid path buffers from a directory
pub async fn scan_path(
ctx: &CoreContext,
path: &str,
on_progress: impl Fn(ScanProgress),
on_progress: impl Fn(Vec<ScanProgress>) + Send + Sync + 'static,
) -> Result<()> {
println!("Scanning directory: {}", &path);
let db = &ctx.database;
@ -77,7 +76,7 @@ pub async fn scan_path(
}
let dir_path = path.clone();
let (paths, scan_start) = tokio::task::spawn_blocking(move || {
let (paths, scan_start, on_progress) = tokio::task::spawn_blocking(move || {
// store every valid path discovered
let mut paths: Vec<(PathBuf, i64, Option<i64>)> = Vec::new();
// store a hashmap of directories to their file ids for fast lookup
@ -115,6 +114,7 @@ pub async fn scan_path(
.unwrap_or("");
let parent_dir_id = dirs.get(&*parent_path);
// println!("Discovered: {:?}, {:?}", &path, &parent_dir_id);
on_progress(vec![ScanProgress::Message(format!("Found: {:?}", &path))]);
let file_id = get_id();
paths.push((path.to_owned(), file_id, parent_dir_id.cloned()));
@ -127,7 +127,7 @@ pub async fn scan_path(
dirs.insert(_path, file_id);
}
}
(paths, scan_start)
(paths, scan_start, on_progress)
})
.await
.unwrap();
@ -136,12 +136,16 @@ pub async fn scan_path(
let scan_read_time = scan_start.elapsed();
for (i, chunk) in paths.chunks(100).enumerate() {
on_progress(ScanProgress {
total_paths: paths.len() as i64,
chunk_count: i as i64,
saved_chunks: i as i64,
message: format!("Writing {} files to db at chunk {}", chunk.len(), i),
});
on_progress(vec![
ScanProgress::ChunkCount(i as i64),
ScanProgress::SavedChunks(i as i64),
ScanProgress::Message(format!(
"Writing {} files to db at chunk {}",
chunk.len(),
i
)),
]);
info!(
"{}",
format!("Writing {} files to db at chunk {}", chunk.len(), i)
@ -318,3 +322,15 @@ fn is_app_bundle(entry: &DirEntry) -> bool {
// }
// Ok(())
// }
/// Converts a single scan progress notification into the job report update
/// that carries the same payload.
impl From<ScanProgress> for JobReportUpdate {
    fn from(progress: ScanProgress) -> Self {
        match progress {
            // chunk count is reported as the job's task count
            ScanProgress::ChunkCount(total) => Self::TaskCount(total),
            // saved chunks are reported as the job's completed task count
            ScanProgress::SavedChunks(done) => Self::CompletedTaskCount(done),
            // messages are forwarded unchanged
            ScanProgress::Message(text) => Self::Message(text),
        }
    }
}

View file

@ -41,6 +41,10 @@ impl Jobs {
self.running_workers.insert(id, wrapped_worker);
}
}
/// Marks a job as finished by removing its worker from the running workers.
///
/// Any value stored under `job_id` is discarded; calling this with an id
/// that is not present has no effect.
pub fn complete(&mut self, job_id: String) {
    // the removed entry (if any) is not needed, so drop it right away
    drop(self.running_workers.remove(&job_id));
}
pub async fn get_running(&self) -> Vec<JobReport> {
let mut ret = vec![];

View file

@ -1,13 +1,11 @@
use super::jobs::{JobReport, JobReportUpdate, JobStatus};
use crate::{ClientQuery, CoreContext, CoreEvent, InternalEvent, Job};
use std::sync::Arc;
use crate::{ClientQuery, CoreContext, CoreEvent, Job};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex,
};
use super::jobs::{JobReport, JobReportUpdate, JobStatus};
// used to update the worker state from inside the worker thread
pub enum WorkerEvent {
Progressed(Vec<JobReportUpdate>),
@ -22,6 +20,7 @@ enum WorkerState {
/// Context handed to a job's `run` function; the job uses it to issue
/// updates back to its worker.
#[derive(Clone)]
pub struct WorkerContext {
/// Identifier of the job this worker runs (taken from the job report id).
pub uuid: String,
/// Handle to the core context; jobs pass it on to the work they invoke,
/// and the worker uses its internal sender to report job completion.
pub core_ctx: CoreContext,
/// Channel on which `WorkerEvent`s (progress, completion, failure) are sent.
pub sender: UnboundedSender<WorkerEvent>,
}
@ -73,16 +72,27 @@ impl Worker {
ctx.clone(),
));
let uuid = worker_mut.job_report.id.clone();
tokio::spawn(async move {
println!("new worker thread spawned");
// this is provided to the job function and used to issue updates
let worker_ctx = WorkerContext {
uuid,
core_ctx,
sender: worker_sender,
};
let result = job.run(worker_ctx.clone()).await;
worker_ctx.sender.send(WorkerEvent::Completed).unwrap_or(());
worker_ctx
.core_ctx
.internal_sender
.send(InternalEvent::JobComplete(worker_ctx.uuid.clone()))
.unwrap_or(());
if let Err(_) = result {
worker_ctx.sender.send(WorkerEvent::Failed).unwrap_or(());
}

View file

@ -68,18 +68,26 @@ impl CoreController {
}
}
/// Events sent over the core's internal channel and handled in the core's
/// event loop.
#[derive(Debug)]
pub enum InternalEvent {
/// A new job to hand to the job manager for ingestion.
JobIngest(Box<dyn Job>),
/// A job has finished running; carries the id of that job so its worker
/// can be removed from the running set.
JobComplete(String),
}
#[derive(Clone)]
pub struct CoreContext {
pub database: Arc<PrismaClient>,
pub event_sender: mpsc::Sender<CoreEvent>,
pub job_ingest_sender: UnboundedSender<Box<dyn Job>>,
pub internal_sender: UnboundedSender<InternalEvent>,
}
impl CoreContext {
pub fn spawn_job(&self, job: Box<dyn Job>) {
self.job_ingest_sender.send(job).unwrap_or_else(|e| {
error!("Failed to spawn job. {:?}", e);
});
self.internal_sender
.send(InternalEvent::JobIngest(job))
.unwrap_or_else(|e| {
error!("Failed to spawn job. {:?}", e);
});
}
pub async fn emit(&self, event: CoreEvent) {
self.event_sender.send(event).await.unwrap_or_else(|e| {
@ -105,9 +113,11 @@ pub struct Core {
UnboundedReceiver<ReturnableMessage<ClientCommand>>,
),
event_sender: mpsc::Sender<CoreEvent>,
job_ingest_channel: (
UnboundedSender<Box<dyn Job>>,
UnboundedReceiver<Box<dyn Job>>,
// a channel for child threads to send events back to the core
internal_channel: (
UnboundedSender<InternalEvent>,
UnboundedReceiver<InternalEvent>,
),
}
@ -134,7 +144,7 @@ impl Core {
let database = Arc::new(db::create_connection().await.unwrap());
let job_ingest_channel = unbounded_channel::<Box<dyn Job>>();
let internal_channel = unbounded_channel::<InternalEvent>();
let core = Core {
state,
@ -143,7 +153,7 @@ impl Core {
jobs: Jobs::new(),
event_sender,
database,
job_ingest_channel,
internal_channel,
};
(core, event_recv)
@ -153,7 +163,7 @@ impl Core {
CoreContext {
database: self.database.clone(),
event_sender: self.event_sender.clone(),
job_ingest_sender: self.job_ingest_channel.0.clone(),
internal_sender: self.internal_channel.0.clone(),
}
}
@ -177,8 +187,15 @@ impl Core {
let res = self.exec_command(msg.data).await;
msg.return_sender.send(res).unwrap_or(());
}
Some(job) = self.job_ingest_channel.1.recv() => {
self.jobs.ingest(&ctx, job).await;
Some(event) = self.internal_channel.1.recv() => {
match event {
InternalEvent::JobIngest(job) => {
self.jobs.ingest(&ctx, job).await;
},
InternalEvent::JobComplete(id) => {
self.jobs.complete(id);
},
}
}
}
}