mirror of
https://github.com/spacedriveapp/spacedrive
synced 2024-07-02 11:13:29 +00:00
Automatic job progress (#717)
Remove time tracker task + sender level debouncing
This commit is contained in:
parent
7ebc467184
commit
8ea6a1c329
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
Warnings:
|
||||
|
||||
- You are about to drop the column `date_modified` on the `job` table. All the data in the column will be lost.
|
||||
- You are about to drop the column `seconds_elapsed` on the `job` table. All the data in the column will be lost.
|
||||
|
||||
*/
|
||||
-- RedefineTables
|
||||
PRAGMA foreign_keys=OFF;
|
||||
CREATE TABLE "new_job" (
|
||||
"id" BLOB NOT NULL PRIMARY KEY,
|
||||
"name" TEXT NOT NULL,
|
||||
"node_id" INTEGER NOT NULL,
|
||||
"action" INTEGER NOT NULL,
|
||||
"status" INTEGER NOT NULL DEFAULT 0,
|
||||
"data" BLOB,
|
||||
"metadata" BLOB,
|
||||
"parent_id" BLOB,
|
||||
"task_count" INTEGER NOT NULL DEFAULT 1,
|
||||
"completed_task_count" INTEGER NOT NULL DEFAULT 0,
|
||||
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"date_started" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
"date_completed" DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
CONSTRAINT "job_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "node" ("id") ON DELETE CASCADE ON UPDATE CASCADE,
|
||||
CONSTRAINT "job_parent_id_fkey" FOREIGN KEY ("parent_id") REFERENCES "job" ("id") ON DELETE CASCADE ON UPDATE CASCADE
|
||||
);
|
||||
INSERT INTO "new_job" ("action", "completed_task_count", "data", "date_created", "id", "metadata", "name", "node_id", "parent_id", "status", "task_count") SELECT "action", "completed_task_count", "data", "date_created", "id", "metadata", "name", "node_id", "parent_id", "status", "task_count" FROM "job";
|
||||
DROP TABLE "job";
|
||||
ALTER TABLE "new_job" RENAME TO "job";
|
||||
PRAGMA foreign_key_check;
|
||||
PRAGMA foreign_keys=ON;
|
|
@ -367,8 +367,8 @@ model Job {
|
|||
task_count Int @default(1)
|
||||
completed_task_count Int @default(0)
|
||||
date_created DateTime @default(now())
|
||||
date_modified DateTime @default(now())
|
||||
seconds_elapsed Int @default(0)
|
||||
date_started DateTime? @default(now()) // Started execution
|
||||
date_completed DateTime? @default(now()) // Finished execution
|
||||
|
||||
nodes Node @relation(fields: [node_id], references: [id], onDelete: Cascade, onUpdate: Cascade)
|
||||
|
||||
|
|
|
@ -359,7 +359,6 @@ pub enum JobReportUpdate {
|
|||
TaskCount(usize),
|
||||
CompletedTaskCount(usize),
|
||||
Message(String),
|
||||
SecondsElapsed(u64),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Type, Clone)]
|
||||
|
@ -369,7 +368,8 @@ pub struct JobReport {
|
|||
pub data: Option<Vec<u8>>,
|
||||
pub metadata: Option<serde_json::Value>,
|
||||
pub created_at: Option<DateTime<Utc>>,
|
||||
pub updated_at: Option<DateTime<Utc>>,
|
||||
pub started_at: Option<DateTime<Utc>>,
|
||||
pub completed_at: Option<DateTime<Utc>>,
|
||||
|
||||
pub parent_id: Option<Uuid>,
|
||||
|
||||
|
@ -379,8 +379,6 @@ pub struct JobReport {
|
|||
|
||||
pub message: String,
|
||||
// pub percentage_complete: f64,
|
||||
// #[ts(type = "string")] // TODO: Make this work with specta
|
||||
pub seconds_elapsed: i32,
|
||||
}
|
||||
|
||||
impl Display for JobReport {
|
||||
|
@ -403,7 +401,8 @@ impl From<job::Data> for JobReport {
|
|||
task_count: data.task_count,
|
||||
completed_task_count: data.completed_task_count,
|
||||
created_at: Some(data.date_created.into()),
|
||||
updated_at: Some(data.date_modified.into()),
|
||||
started_at: data.date_started.map(|d| d.into()),
|
||||
completed_at: data.date_completed.map(|d| d.into()),
|
||||
data: data.data,
|
||||
metadata: data.metadata.and_then(|m| {
|
||||
serde_json::from_slice(&m).unwrap_or_else(|e| -> Option<serde_json::Value> {
|
||||
|
@ -412,7 +411,6 @@ impl From<job::Data> for JobReport {
|
|||
})
|
||||
}),
|
||||
message: String::new(),
|
||||
seconds_elapsed: data.seconds_elapsed,
|
||||
// SAFETY: We created this uuid before
|
||||
parent_id: data.parent_id.map(|id| Uuid::from_slice(&id).unwrap()),
|
||||
}
|
||||
|
@ -425,7 +423,8 @@ impl JobReport {
|
|||
id: uuid,
|
||||
name,
|
||||
created_at: None,
|
||||
updated_at: None,
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
status: JobStatus::Queued,
|
||||
task_count: 0,
|
||||
data: None,
|
||||
|
@ -433,7 +432,6 @@ impl JobReport {
|
|||
parent_id: None,
|
||||
completed_task_count: 0,
|
||||
message: String::new(),
|
||||
seconds_elapsed: 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -446,7 +444,6 @@ impl JobReport {
|
|||
pub async fn create(&mut self, library: &Library) -> Result<(), JobError> {
|
||||
let now = Utc::now();
|
||||
self.created_at = Some(now);
|
||||
self.updated_at = Some(now);
|
||||
|
||||
library
|
||||
.db
|
||||
|
@ -460,7 +457,7 @@ impl JobReport {
|
|||
[
|
||||
job::data::set(self.data.clone()),
|
||||
job::date_created::set(now.into()),
|
||||
job::date_modified::set(now.into()),
|
||||
job::date_started::set(self.started_at.map(|d| d.into())),
|
||||
],
|
||||
[self
|
||||
.parent_id
|
||||
|
@ -471,9 +468,8 @@ impl JobReport {
|
|||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update(&mut self, library: &Library) -> Result<(), JobError> {
|
||||
let now = Utc::now();
|
||||
self.updated_at = Some(now);
|
||||
library
|
||||
.db
|
||||
.job()
|
||||
|
@ -485,8 +481,8 @@ impl JobReport {
|
|||
job::metadata::set(serde_json::to_vec(&self.metadata).ok()),
|
||||
job::task_count::set(self.task_count),
|
||||
job::completed_task_count::set(self.completed_task_count),
|
||||
job::date_modified::set(now.into()),
|
||||
job::seconds_elapsed::set(self.seconds_elapsed),
|
||||
job::date_started::set(self.started_at.map(|v| v.into())),
|
||||
job::date_completed::set(self.completed_at.map(|v| v.into())),
|
||||
],
|
||||
)
|
||||
.exec()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use crate::invalidate_query;
|
||||
use crate::job::{DynJob, JobError, JobManager, JobReportUpdate, JobStatus};
|
||||
use crate::library::Library;
|
||||
use chrono::Utc;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::{
|
||||
|
@ -9,19 +10,18 @@ use tokio::{
|
|||
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
|
||||
Mutex,
|
||||
},
|
||||
time::{interval_at, Instant},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use super::{JobMetadata, JobReport};
|
||||
|
||||
const JOB_REPORT_UPDATE_INTERVAL: Duration = Duration::from_millis(1000 / 60);
|
||||
|
||||
// used to update the worker state from inside the worker thread
|
||||
#[derive(Debug)]
|
||||
pub enum WorkerEvent {
|
||||
Progressed {
|
||||
updates: Vec<JobReportUpdate>,
|
||||
debounce: bool,
|
||||
},
|
||||
Progressed(Vec<JobReportUpdate>),
|
||||
Completed(oneshot::Sender<()>, JobMetadata),
|
||||
Failed(oneshot::Sender<()>),
|
||||
Paused(Vec<u8>, oneshot::Sender<()>),
|
||||
|
@ -32,24 +32,26 @@ pub struct WorkerContext {
|
|||
pub library: Library,
|
||||
events_tx: UnboundedSender<WorkerEvent>,
|
||||
shutdown_tx: Arc<broadcast::Sender<()>>,
|
||||
// Used for debouncing
|
||||
last_event: Instant,
|
||||
}
|
||||
|
||||
impl WorkerContext {
|
||||
pub fn progress(&self, updates: Vec<JobReportUpdate>) {
|
||||
self.events_tx
|
||||
.send(WorkerEvent::Progressed {
|
||||
updates,
|
||||
debounce: false,
|
||||
})
|
||||
.send(WorkerEvent::Progressed(updates))
|
||||
.expect("critical error: failed to send worker worker progress event updates");
|
||||
}
|
||||
pub fn progress_debounced(&self, updates: Vec<JobReportUpdate>) {
|
||||
self.events_tx
|
||||
.send(WorkerEvent::Progressed {
|
||||
updates,
|
||||
debounce: true,
|
||||
})
|
||||
.expect("critical error: failed to send worker worker progress event updates");
|
||||
|
||||
pub fn progress_debounced(&mut self, updates: Vec<JobReportUpdate>) {
|
||||
let now = Instant::now();
|
||||
if self.last_event.duration_since(now) > JOB_REPORT_UPDATE_INTERVAL {
|
||||
self.last_event = now;
|
||||
|
||||
self.events_tx
|
||||
.send(WorkerEvent::Progressed(updates))
|
||||
.expect("critical error: failed to send worker worker progress event updates");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn shutdown_rx(&self) -> broadcast::Receiver<()> {
|
||||
|
@ -104,6 +106,9 @@ impl Worker {
|
|||
let job_id = worker.report.id;
|
||||
|
||||
worker.report.status = JobStatus::Running;
|
||||
if worker.report.started_at.is_none() {
|
||||
worker.report.started_at = Some(Utc::now());
|
||||
}
|
||||
|
||||
// If the report doesn't have a created_at date, it's a new report
|
||||
if worker.report.created_at.is_none() {
|
||||
|
@ -131,29 +136,10 @@ impl Worker {
|
|||
library: library.clone(),
|
||||
events_tx: worker_events_tx,
|
||||
shutdown_tx: job_manager.shutdown_tx(),
|
||||
last_event: (Instant::now()
|
||||
- (JOB_REPORT_UPDATE_INTERVAL + Duration::from_secs(1))), // So we don't miss the first event
|
||||
};
|
||||
|
||||
// track time
|
||||
let events_tx = worker_ctx.events_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut interval = interval_at(
|
||||
Instant::now() + Duration::from_millis(1000),
|
||||
Duration::from_millis(1000),
|
||||
);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if events_tx
|
||||
.send(WorkerEvent::Progressed {
|
||||
updates: vec![JobReportUpdate::SecondsElapsed(1)],
|
||||
debounce: false,
|
||||
})
|
||||
.is_err() && events_tx.is_closed()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let (done_tx, done_rx) = oneshot::channel();
|
||||
|
||||
match job.run(job_manager.clone(), worker_ctx.clone()).await {
|
||||
|
@ -202,21 +188,11 @@ impl Worker {
|
|||
mut worker_events_rx: UnboundedReceiver<WorkerEvent>,
|
||||
library: Library,
|
||||
) {
|
||||
let mut last = Instant::now();
|
||||
|
||||
while let Some(command) = worker_events_rx.recv().await {
|
||||
let mut worker = worker.lock().await;
|
||||
|
||||
match command {
|
||||
WorkerEvent::Progressed { updates, debounce } => {
|
||||
if debounce {
|
||||
let current = Instant::now();
|
||||
if current.duration_since(last) > Duration::from_millis(1000 / 60) {
|
||||
last = current
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
WorkerEvent::Progressed(updates) => {
|
||||
// protect against updates if job is not running
|
||||
if worker.report.status != JobStatus::Running {
|
||||
continue;
|
||||
|
@ -232,9 +208,6 @@ impl Worker {
|
|||
JobReportUpdate::Message(message) => {
|
||||
worker.report.message = message;
|
||||
}
|
||||
JobReportUpdate::SecondsElapsed(seconds) => {
|
||||
worker.report.seconds_elapsed += seconds as i32;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -244,6 +217,7 @@ impl Worker {
|
|||
worker.report.status = JobStatus::Completed;
|
||||
worker.report.data = None;
|
||||
worker.report.metadata = metadata;
|
||||
worker.report.completed_at = Some(Utc::now());
|
||||
if let Err(e) = worker.report.update(&library).await {
|
||||
error!("failed to update job report: {:#?}", e);
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
use crate::{
|
||||
job::{JobError, JobInitData, JobResult, JobState, StatefulJob, WorkerContext},
|
||||
library::Library,
|
||||
location::file_path_helper::{
|
||||
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
|
||||
file_path_just_id_materialized_path, filter_existing_file_path_params,
|
||||
|
@ -10,7 +9,10 @@ use crate::{
|
|||
prisma::location,
|
||||
};
|
||||
|
||||
use std::{collections::HashMap, path::Path};
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use chrono::Utc;
|
||||
use itertools::Itertools;
|
||||
|
@ -50,13 +52,11 @@ impl StatefulJob for IndexerJob {
|
|||
}
|
||||
|
||||
/// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`.
|
||||
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
|
||||
let Library {
|
||||
last_file_path_id_manager,
|
||||
db,
|
||||
..
|
||||
} = &ctx.library;
|
||||
|
||||
async fn init(
|
||||
&self,
|
||||
mut ctx: WorkerContext,
|
||||
state: &mut JobState<Self>,
|
||||
) -> Result<(), JobError> {
|
||||
let location_id = state.init.location.id;
|
||||
let location_path = Path::new(&state.init.location.path);
|
||||
|
||||
|
@ -82,7 +82,9 @@ impl StatefulJob for IndexerJob {
|
|||
.await
|
||||
.map_err(IndexerError::from)?;
|
||||
|
||||
let sub_path_file_path = db
|
||||
let sub_path_file_path = ctx
|
||||
.library
|
||||
.db
|
||||
.file_path()
|
||||
.find_first(filter_existing_file_path_params(
|
||||
&MaterializedPath::new(location_id, location_path, &full_path, true)
|
||||
|
@ -103,29 +105,33 @@ impl StatefulJob for IndexerJob {
|
|||
};
|
||||
|
||||
let scan_start = Instant::now();
|
||||
|
||||
let found_paths = walk(
|
||||
to_walk_path,
|
||||
&indexer_rules_by_kind,
|
||||
|path, total_entries| {
|
||||
IndexerJobData::on_scan_progress(
|
||||
&ctx,
|
||||
vec![
|
||||
ScanProgress::Message(format!("Scanning {}", path.display())),
|
||||
ScanProgress::ChunkCount(total_entries / BATCH_SIZE),
|
||||
],
|
||||
);
|
||||
},
|
||||
// if we're not using a sub_path, then its a full indexing and we must include root dir
|
||||
state.init.sub_path.is_none(),
|
||||
)
|
||||
.await?;
|
||||
let found_paths = {
|
||||
let ctx = &mut ctx; // Borrow outside of closure so it's not moved
|
||||
walk(
|
||||
to_walk_path,
|
||||
&indexer_rules_by_kind,
|
||||
|path, total_entries| {
|
||||
IndexerJobData::on_scan_progress(
|
||||
ctx,
|
||||
vec![
|
||||
ScanProgress::Message(format!("Scanning {}", path.display())),
|
||||
ScanProgress::ChunkCount(total_entries / BATCH_SIZE),
|
||||
],
|
||||
);
|
||||
},
|
||||
// if we're not using a sub_path, then its a full indexing and we must include root dir
|
||||
state.init.sub_path.is_none(),
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
// NOTE:
|
||||
// As we're passing the list of currently existing file paths to the `find_many_file_paths_by_full_path` query,
|
||||
// it means that `dirs_ids` contains just paths that still exists on the filesystem.
|
||||
dirs_ids.extend(
|
||||
db.file_path()
|
||||
ctx.library
|
||||
.db
|
||||
.file_path()
|
||||
.find_many(
|
||||
filter_file_paths_by_many_full_path_params(
|
||||
&location::Data::from(&state.init.location),
|
||||
|
@ -157,14 +163,15 @@ impl StatefulJob for IndexerJob {
|
|||
location_id,
|
||||
dirs_ids.values().copied().collect(),
|
||||
maybe_parent_file_path,
|
||||
db,
|
||||
&ctx.library.db,
|
||||
)
|
||||
.await
|
||||
.map_err(IndexerError::from)?;
|
||||
|
||||
// Syncing the last file path id manager, as we potentially just removed a bunch of ids
|
||||
last_file_path_id_manager
|
||||
.sync(location_id, db)
|
||||
ctx.library
|
||||
.last_file_path_id_manager
|
||||
.sync(location_id, &ctx.library.db)
|
||||
.await
|
||||
.map_err(IndexerError::from)?;
|
||||
|
||||
|
@ -206,8 +213,10 @@ impl StatefulJob for IndexerJob {
|
|||
let total_paths = new_paths.len();
|
||||
|
||||
// grab the next id so we can increment in memory for batch inserting
|
||||
let first_file_id = last_file_path_id_manager
|
||||
.increment(location_id, total_paths as i32, db)
|
||||
let first_file_id = ctx
|
||||
.library
|
||||
.last_file_path_id_manager
|
||||
.increment(location_id, total_paths as i32, &ctx.library.db)
|
||||
.await
|
||||
.map_err(IndexerError::from)?;
|
||||
|
||||
|
@ -235,27 +244,28 @@ impl StatefulJob for IndexerJob {
|
|||
removed_paths,
|
||||
});
|
||||
|
||||
state.steps = new_paths
|
||||
state.steps = VecDeque::with_capacity(new_paths.len() / BATCH_SIZE);
|
||||
for (i, chunk) in new_paths
|
||||
.into_iter()
|
||||
.chunks(BATCH_SIZE)
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, chunk)| {
|
||||
let chunk_steps = chunk.collect::<Vec<_>>();
|
||||
IndexerJobData::on_scan_progress(
|
||||
&ctx,
|
||||
vec![
|
||||
ScanProgress::SavedChunks(i),
|
||||
ScanProgress::Message(format!(
|
||||
"Writing {} of {} to db",
|
||||
i * chunk_steps.len(),
|
||||
total_paths,
|
||||
)),
|
||||
],
|
||||
);
|
||||
chunk_steps
|
||||
})
|
||||
.collect();
|
||||
{
|
||||
let chunk_steps = chunk.collect::<Vec<_>>();
|
||||
IndexerJobData::on_scan_progress(
|
||||
&mut ctx,
|
||||
vec![
|
||||
ScanProgress::SavedChunks(i),
|
||||
ScanProgress::Message(format!(
|
||||
"Writing {} of {} to db",
|
||||
i * chunk_steps.len(),
|
||||
total_paths,
|
||||
)),
|
||||
],
|
||||
);
|
||||
|
||||
state.steps.push_back(chunk_steps);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ pub struct IndexerJobStepEntry {
|
|||
}
|
||||
|
||||
impl IndexerJobData {
|
||||
fn on_scan_progress(ctx: &WorkerContext, progress: Vec<ScanProgress>) {
|
||||
fn on_scan_progress(ctx: &mut WorkerContext, progress: Vec<ScanProgress>) {
|
||||
ctx.progress_debounced(
|
||||
progress
|
||||
.iter()
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
use crate::{
|
||||
job::{JobError, JobInitData, JobResult, JobState, StatefulJob, WorkerContext},
|
||||
library::Library,
|
||||
location::file_path_helper::{
|
||||
ensure_sub_path_is_directory, ensure_sub_path_is_in_location,
|
||||
file_path_just_id_materialized_path, filter_existing_file_path_params,
|
||||
|
@ -70,13 +69,11 @@ impl StatefulJob for ShallowIndexerJob {
|
|||
}
|
||||
|
||||
/// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`.
|
||||
async fn init(&self, ctx: WorkerContext, state: &mut JobState<Self>) -> Result<(), JobError> {
|
||||
let Library {
|
||||
last_file_path_id_manager,
|
||||
db,
|
||||
..
|
||||
} = &ctx.library;
|
||||
|
||||
async fn init(
|
||||
&self,
|
||||
mut ctx: WorkerContext,
|
||||
state: &mut JobState<Self>,
|
||||
) -> Result<(), JobError> {
|
||||
let location_id = state.init.location.id;
|
||||
let location_path = Path::new(&state.init.location.path);
|
||||
|
||||
|
@ -105,7 +102,9 @@ impl StatefulJob for ShallowIndexerJob {
|
|||
|
||||
(
|
||||
full_path,
|
||||
db.file_path()
|
||||
ctx.library
|
||||
.db
|
||||
.file_path()
|
||||
.find_first(filter_existing_file_path_params(&materialized_path))
|
||||
.select(file_path_just_id_materialized_path::select())
|
||||
.exec()
|
||||
|
@ -116,7 +115,9 @@ impl StatefulJob for ShallowIndexerJob {
|
|||
} else {
|
||||
(
|
||||
location_path.to_path_buf(),
|
||||
db.file_path()
|
||||
ctx.library
|
||||
.db
|
||||
.file_path()
|
||||
.find_first(filter_existing_file_path_params(
|
||||
&MaterializedPath::new(location_id, location_path, location_path, true)
|
||||
.map_err(IndexerError::from)?,
|
||||
|
@ -130,22 +131,27 @@ impl StatefulJob for ShallowIndexerJob {
|
|||
};
|
||||
|
||||
let scan_start = Instant::now();
|
||||
let found_paths = walk_single_dir(
|
||||
to_walk_path,
|
||||
&indexer_rules_by_kind,
|
||||
|path, total_entries| {
|
||||
IndexerJobData::on_scan_progress(
|
||||
&ctx,
|
||||
vec![
|
||||
ScanProgress::Message(format!("Scanning {}", path.display())),
|
||||
ScanProgress::ChunkCount(total_entries / BATCH_SIZE),
|
||||
],
|
||||
);
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let found_paths = {
|
||||
let ctx = &mut ctx; // Borrow outside of closure so it's not moved
|
||||
walk_single_dir(
|
||||
to_walk_path,
|
||||
&indexer_rules_by_kind,
|
||||
|path, total_entries| {
|
||||
IndexerJobData::on_scan_progress(
|
||||
ctx,
|
||||
vec![
|
||||
ScanProgress::Message(format!("Scanning {}", path.display())),
|
||||
ScanProgress::ChunkCount(total_entries / BATCH_SIZE),
|
||||
],
|
||||
);
|
||||
},
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
||||
let (already_existing_file_paths, mut to_retain) = db
|
||||
let (already_existing_file_paths, mut to_retain) = ctx
|
||||
.library
|
||||
.db
|
||||
.file_path()
|
||||
.find_many(
|
||||
filter_file_paths_by_many_full_path_params(
|
||||
|
@ -171,14 +177,19 @@ impl StatefulJob for ShallowIndexerJob {
|
|||
to_retain.push(parent_id);
|
||||
|
||||
// Removing all other file paths that are not in the filesystem anymore
|
||||
let removed_paths =
|
||||
retain_file_paths_in_location(location_id, to_retain, Some(parent_file_path), db)
|
||||
.await
|
||||
.map_err(IndexerError::from)?;
|
||||
let removed_paths = retain_file_paths_in_location(
|
||||
location_id,
|
||||
to_retain,
|
||||
Some(parent_file_path),
|
||||
&ctx.library.db,
|
||||
)
|
||||
.await
|
||||
.map_err(IndexerError::from)?;
|
||||
|
||||
// Syncing the last file path id manager, as we potentially just removed a bunch of ids
|
||||
last_file_path_id_manager
|
||||
.sync(location_id, db)
|
||||
ctx.library
|
||||
.last_file_path_id_manager
|
||||
.sync(location_id, &ctx.library.db)
|
||||
.await
|
||||
.map_err(IndexerError::from)?;
|
||||
|
||||
|
@ -212,8 +223,10 @@ impl StatefulJob for ShallowIndexerJob {
|
|||
let total_paths = new_paths.len();
|
||||
|
||||
// grab the next id so we can increment in memory for batch inserting
|
||||
let first_file_id = last_file_path_id_manager
|
||||
.increment(location_id, total_paths as i32, db)
|
||||
let first_file_id = ctx
|
||||
.library
|
||||
.last_file_path_id_manager
|
||||
.increment(location_id, total_paths as i32, &ctx.library.db)
|
||||
.await
|
||||
.map_err(IndexerError::from)?;
|
||||
|
||||
|
@ -242,7 +255,7 @@ impl StatefulJob for ShallowIndexerJob {
|
|||
.map(|(i, chunk)| {
|
||||
let chunk_steps = chunk.collect::<Vec<_>>();
|
||||
IndexerJobData::on_scan_progress(
|
||||
&ctx,
|
||||
&mut ctx,
|
||||
vec![
|
||||
ScanProgress::SavedChunks(i),
|
||||
ScanProgress::Message(format!(
|
||||
|
|
|
@ -64,7 +64,7 @@ type ToWalkEntry = (PathBuf, Option<bool>);
|
|||
pub(super) async fn walk(
|
||||
root: impl AsRef<Path>,
|
||||
rules_per_kind: &HashMap<RuleKind, Vec<IndexerRule>>,
|
||||
update_notifier: impl Fn(&Path, usize),
|
||||
mut update_notifier: impl FnMut(&Path, usize) + '_,
|
||||
include_root: bool,
|
||||
) -> Result<Vec<WalkEntry>, IndexerError> {
|
||||
let root = root.as_ref().to_path_buf();
|
||||
|
@ -91,7 +91,7 @@ pub(super) async fn walk(
|
|||
(current_path, parent_dir_accepted_by_its_children),
|
||||
&mut read_dir,
|
||||
rules_per_kind,
|
||||
&update_notifier,
|
||||
&mut update_notifier,
|
||||
&mut indexed_paths,
|
||||
Some(&mut to_walk),
|
||||
)
|
||||
|
@ -106,7 +106,7 @@ async fn inner_walk_single_dir(
|
|||
(current_path, parent_dir_accepted_by_its_children): ToWalkEntry,
|
||||
read_dir: &mut fs::ReadDir,
|
||||
rules_per_kind: &HashMap<RuleKind, Vec<IndexerRule>>,
|
||||
update_notifier: &impl Fn(&Path, usize),
|
||||
update_notifier: &mut impl FnMut(&Path, usize),
|
||||
indexed_paths: &mut HashMap<PathBuf, WalkEntry>,
|
||||
mut maybe_to_walk: Option<&mut VecDeque<(PathBuf, Option<bool>)>>,
|
||||
) -> Result<(), IndexerError> {
|
||||
|
@ -387,7 +387,7 @@ async fn prepared_indexed_paths(
|
|||
pub(super) async fn walk_single_dir(
|
||||
root: impl AsRef<Path>,
|
||||
rules_per_kind: &HashMap<RuleKind, Vec<IndexerRule>>,
|
||||
update_notifier: impl Fn(&Path, usize),
|
||||
mut update_notifier: impl FnMut(&Path, usize) + '_,
|
||||
) -> Result<Vec<WalkEntry>, IndexerError> {
|
||||
let root = root.as_ref().to_path_buf();
|
||||
|
||||
|
@ -399,7 +399,7 @@ pub(super) async fn walk_single_dir(
|
|||
(root.clone(), None),
|
||||
&mut read_dir,
|
||||
rules_per_kind,
|
||||
&update_notifier,
|
||||
&mut update_notifier,
|
||||
&mut indexed_paths,
|
||||
None,
|
||||
)
|
||||
|
|
|
@ -16,6 +16,7 @@ import {
|
|||
TrashSimple,
|
||||
X
|
||||
} from 'phosphor-react';
|
||||
import { useEffect, useState } from 'react';
|
||||
import { JobReport, useLibraryMutation, useLibraryQuery } from '@sd/client';
|
||||
import { Button, CategoryHeading, PopoverClose, ProgressBar, Tooltip } from '@sd/ui';
|
||||
|
||||
|
@ -137,12 +138,37 @@ export function JobsManager() {
|
|||
);
|
||||
}
|
||||
|
||||
/**
 * Renders the duration text for a job.
 *
 * - Running: "Elapsed <time since started_at>", refreshed once per second.
 * - Finished with a completion date: "Took <started_at to completed_at>".
 * - Otherwise: "Took <time since started_at>".
 */
function JobTimeText({ job }: { job: JobReport }) {
	// The counter value is never read; bumping it only forces a rerender so dayjs
	// recomputes the relative time against the current clock.
	const [_, forceRerender] = useState(0);

	const running = job.status === 'Running';
	const started = dayjs(job.started_at);

	const text: string = running
		? `Elapsed ${started.fromNow(true)}`
		: job.completed_at
		? `Took ${started.from(job.completed_at, true)}`
		: `Took ${started.fromNow(true)}`;

	useEffect(() => {
		// Only running jobs need a ticking display.
		if (job.status !== 'Running') return;

		const timer = setInterval(() => {
			forceRerender((tick) => tick + 1);
		}, 1000);

		// Stop ticking when the status changes or the component unmounts.
		return () => clearInterval(timer);
	}, [job.status]);

	return <>{text}</>;
}
|
||||
|
||||
function Job({ job }: { job: JobReport }) {
|
||||
const niceData = getNiceData(job)[job.name] || {
|
||||
name: job.name,
|
||||
icon: Question
|
||||
};
|
||||
const isRunning = job.status === 'Running';
|
||||
|
||||
return (
|
||||
// Do we actually need bg-opacity-60 here? Where is the bg?
|
||||
// eslint-disable-next-line tailwindcss/migration-from-tailwind-2
|
||||
|
@ -161,19 +187,10 @@ function Job({ job }: { job: JobReport }) {
|
|||
)}
|
||||
<div className="flex items-center truncate text-ink-faint">
|
||||
<span className="text-xs">
|
||||
{isRunning ? 'Elapsed' : job.status === 'Failed' ? 'Failed after' : 'Took'}{' '}
|
||||
{job.seconds_elapsed
|
||||
? dayjs.duration({ seconds: job.seconds_elapsed }).humanize()
|
||||
: 'less than a second'}
|
||||
<JobTimeText job={job} />
|
||||
</span>
|
||||
<span className="mx-1 opacity-50">•</span>
|
||||
{
|
||||
<span className="text-xs">
|
||||
{isRunning
|
||||
? 'Unknown time remaining'
|
||||
: dayjs(job.created_at).toNow(true) + ' ago'}
|
||||
</span>
|
||||
}
|
||||
{<span className="text-xs">{dayjs(job.created_at).fromNow()}</span>}
|
||||
</div>
|
||||
{/* <span className="mt-0.5 opacity-50 text-tiny text-ink-faint">{job.id}</span> */}
|
||||
</div>
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
"client": "pnpm --filter @sd/client -- ",
|
||||
"prisma": "cd core && cargo prisma",
|
||||
"codegen": "cargo test -p sd-core api::tests::test_and_export_rspc_bindings -- --exact",
|
||||
"typecheck": "turbo run typecheck",
|
||||
"typecheck": "pnpm -r typecheck",
|
||||
"lint": "turbo run lint",
|
||||
"lint:fix": "turbo run lint -- --fix",
|
||||
"clean": "rimraf -g \"node_modules/\" \"**/node_modules/\" \"target/\" \"**/.build/\" \"**/.next/\" \"**/dist/!(.gitignore)**\""
|
||||
|
|
|
@ -163,7 +163,7 @@ export type IndexerRuleCreateArgs = { kind: RuleKind, name: string, parameters:
|
|||
|
||||
export type InvalidateOperationEvent = { key: string, arg: any, result: any | null }
|
||||
|
||||
export type JobReport = { id: string, name: string, data: number[] | null, metadata: any | null, created_at: string | null, updated_at: string | null, parent_id: string | null, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: number }
|
||||
export type JobReport = { id: string, name: string, data: number[] | null, metadata: any | null, created_at: string | null, started_at: string | null, completed_at: string | null, parent_id: string | null, status: JobStatus, task_count: number, completed_task_count: number, message: string }
|
||||
|
||||
export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused"
|
||||
|
||||
|
|
Loading…
Reference in a new issue