[ENG-1160] Opening a location while indexing causes a lot of jobs to be spawned (#1554)

Solved, but missing a frontend error message
This commit is contained in:
Ericson "Fogo" Soares 2023-10-12 23:47:13 -03:00 committed by GitHub
parent eb208a9e17
commit 996361f081
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 175 additions and 19 deletions

View file

@ -1,11 +1,15 @@
use crate::{
invalidate_query,
job::StatefulJob,
location::{
delete_location, find_location, indexer::rules::IndexerRuleCreateArgs, light_scan_location,
location_with_indexer_rules, non_indexed::NonIndexedPathItem, relink_location,
scan_location, scan_location_sub_path, LocationCreateArgs, LocationError,
delete_location, find_location,
indexer::{rules::IndexerRuleCreateArgs, IndexerJobInit},
light_scan_location, location_with_indexer_rules,
non_indexed::NonIndexedPathItem,
relink_location, scan_location, scan_location_sub_path, LocationCreateArgs, LocationError,
LocationUpdateArgs,
},
object::file_identifier::file_identifier_job::FileIdentifierJobInit,
p2p::PeerMetadata,
prisma::{file_path, indexer_rule, indexer_rules_in_location, location, object, SortOrder},
util::AbortOnDrop,
@ -307,24 +311,44 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
pub sub_path: String,
}
R.with2(library())
.subscription(|(node, library), args: LightScanArgs| async move {
let location = find_location(&library, args.location_id)
R.with2(library()).subscription(
|(node, library),
LightScanArgs {
location_id,
sub_path,
}: LightScanArgs| async move {
if node
.jobs
.has_job_running(|job_identity| {
job_identity.target_location == location_id
&& (job_identity.name == <IndexerJobInit as StatefulJob>::NAME
|| job_identity.name
== <FileIdentifierJobInit as StatefulJob>::NAME)
})
.await
{
return Err(rspc::Error::new(
ErrorCode::Conflict,
"We're still indexing this location, pleases wait a bit...".to_string(),
));
}
let location = find_location(&library, location_id)
.include(location_with_indexer_rules::include())
.exec()
.await?
.ok_or(LocationError::IdNotFound(args.location_id))?;
.ok_or(LocationError::IdNotFound(location_id))?;
let handle = tokio::spawn(async move {
if let Err(e) =
light_scan_location(node, library, location, args.sub_path).await
if let Err(e) = light_scan_location(node, library, location, sub_path).await
{
error!("light scan error: {e:#?}");
}
});
Ok(AbortOnDrop(handle))
})
},
)
})
.procedure(
"online",

View file

@ -26,9 +26,8 @@ use tokio::sync::{mpsc, oneshot, RwLock};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use super::{JobManagerError, JobReport, JobStatus, StatefulJob};
use super::{JobIdentity, JobManagerError, JobReport, JobStatus, StatefulJob};
// db is single threaded, nerd
const MAX_WORKERS: usize = 5;
pub enum JobManagerEvent {
@ -355,6 +354,15 @@ impl Jobs {
false
}
/// Reports whether any currently running worker's job matches `predicate`.
///
/// Asks each running worker for its `JobIdentity` and returns `true` as soon
/// as one identity satisfies the predicate. A worker that fails to answer
/// (`who_am_i` returns `None`) is treated as non-matching.
pub async fn has_job_running(&self, predicate: impl Fn(JobIdentity) -> bool) -> bool {
	let running_workers = self.running_workers.read().await;

	for worker in running_workers.values() {
		if let Some(identity) = worker.who_am_i().await {
			if predicate(identity) {
				return true;
			}
		}
	}

	false
}
}
#[macro_use]

View file

@ -9,6 +9,8 @@ use std::{
time::Instant,
};
use sd_prisma::prisma::location;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{select, sync::mpsc};
use tracing::{debug, info, trace, warn};
@ -27,6 +29,14 @@ pub use worker::*;
pub type JobResult = Result<JobMetadata, JobError>;
pub type JobMetadata = Option<serde_json::Value>;
/// Snapshot of a running job's identity, sent by a worker in reply to a
/// `WorkerCommand::IdentifyYourself` query (e.g. so callers can detect that
/// an indexer or file-identifier job is already acting on a location).
#[derive(Debug)]
pub struct JobIdentity {
	// Unique id of this job instance
	pub id: Uuid,
	// The job's `StatefulJob::NAME` constant
	pub name: &'static str,
	// Location id the job acts upon, from `StatefulJob::target_location`
	pub target_location: location::id::Type,
	// Job status at the moment the snapshot was taken (Running or Paused)
	pub status: JobStatus,
}
#[derive(Debug, Default)]
pub struct JobRunErrors(pub Vec<String>);
@ -84,6 +94,9 @@ pub trait StatefulJob:
data: &mut Option<Self::Data>,
) -> Result<JobInitOutput<Self::RunMetadata, Self::Step>, JobError>;
/// The location id where this job will act upon
fn target_location(&self) -> location::id::Type;
/// is called for each step in the job. These steps are created in the `Self::init` method.
async fn execute_step(
&self,
@ -182,7 +195,6 @@ pub struct Job<SJob: StatefulJob> {
hash: u64,
report: Option<JobReport>,
state: Option<JobState<SJob>>,
// stateful_job: Option<SJob>,
next_jobs: VecDeque<Box<dyn DynJob>>,
}
@ -462,6 +474,8 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
.take()
.expect("critical error: missing job state");
let target_location = init.target_location();
let stateful_job = Arc::new(init);
let ctx = Arc::new(ctx);
@ -469,6 +483,9 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
let mut job_should_run = true;
let job_time = Instant::now();
// Just for self identification purposes
let mut inner_status = JobStatus::Running;
// Checking if we have a brand new job, or if we are resuming an old one.
let working_data = if let Some(data) = data {
Some(data)
@ -496,16 +513,42 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
select! {
Some(command) = commands_rx.recv() => {
match command {
WorkerCommand::IdentifyYourself(tx) => {
if tx.send(
JobIdentity {
id: job_id,
name: job_name,
target_location,
status: inner_status
}
).is_err() {
warn!("Failed to send IdentifyYourself event reply");
}
}
WorkerCommand::Pause(when) => {
debug!(
"Pausing Job at init phase <id='{job_id}', name='{job_name}'> took {:?}",
when.elapsed()
);
inner_status = JobStatus::Paused;
// In case of a Pause command, we keep waiting for the next command
let paused_time = Instant::now();
while let Some(command) = commands_rx.recv().await {
match command {
WorkerCommand::IdentifyYourself(tx) => {
if tx.send(
JobIdentity {
id: job_id,
name: job_name,
target_location,
status: inner_status
}
).is_err() {
warn!("Failed to send IdentifyYourself event reply");
}
}
WorkerCommand::Resume(when) => {
debug!(
"Resuming Job at init phase <id='{job_id}', name='{job_name}'> took {:?}",
@ -515,6 +558,8 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
"Total paused time {:?} Job <id='{job_id}', name='{job_name}'>",
paused_time.elapsed()
);
inner_status = JobStatus::Running;
break;
}
// The job can also be shutdown or canceled while paused
@ -654,16 +699,42 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
// Here we have a channel that we use to receive commands from the worker
Some(command) = commands_rx.recv() => {
match command {
WorkerCommand::IdentifyYourself(tx) => {
if tx.send(
JobIdentity {
id: job_id,
name: job_name,
target_location,
status: inner_status
}
).is_err() {
warn!("Failed to send IdentifyYourself event reply");
}
}
WorkerCommand::Pause(when) => {
debug!(
"Pausing Job <id='{job_id}', name='{job_name}'> took {:?}",
when.elapsed()
);
inner_status = JobStatus::Paused;
// In case of a Pause command, we keep waiting for the next command
let paused_time = Instant::now();
while let Some(command) = commands_rx.recv().await {
match command {
WorkerCommand::IdentifyYourself(tx) => {
if tx.send(
JobIdentity {
id: job_id,
name: job_name,
target_location,
status: inner_status
}
).is_err() {
warn!("Failed to send IdentifyYourself event reply");
}
}
WorkerCommand::Resume(when) => {
debug!(
"Resuming Job <id='{job_id}', name='{job_name}'> took {:?}",
@ -673,6 +744,7 @@ impl<SJob: StatefulJob> DynJob for Job<SJob> {
"Total paused time {:?} Job <id='{job_id}', name='{job_name}'>",
paused_time.elapsed(),
);
inner_status = JobStatus::Running;
break;
}
// The job can also be shutdown or canceled while paused

View file

@ -22,7 +22,8 @@ use tracing::{debug, error, info, trace, warn};
use uuid::Uuid;
use super::{
DynJob, JobError, JobReport, JobReportUpdate, JobRunErrors, JobRunOutput, JobStatus, Jobs,
DynJob, JobError, JobIdentity, JobReport, JobReportUpdate, JobRunErrors, JobRunOutput,
JobStatus, Jobs,
};
#[derive(Debug, Clone, Serialize, Type)]
@ -47,6 +48,7 @@ pub enum WorkerEvent {
pub enum WorkerCommand {
Pause(Instant),
Resume(Instant),
IdentifyYourself(oneshot::Sender<JobIdentity>),
Cancel(Instant, oneshot::Sender<()>),
Shutdown(Instant, oneshot::Sender<()>),
}
@ -174,6 +176,23 @@ impl Worker {
}
}
/// Asks this worker's job to identify itself.
///
/// Sends an `IdentifyYourself` command over the worker's command channel and
/// awaits the reply on a oneshot channel. Returns `None` — after logging a
/// warning — if either the command cannot be delivered or the reply channel
/// is dropped before an answer arrives.
pub async fn who_am_i(&self) -> Option<JobIdentity> {
	let (tx, rx) = oneshot::channel();

	match self
		.commands_tx
		.send(WorkerCommand::IdentifyYourself(tx))
		.await
	{
		Ok(()) => rx
			.await
			.map_err(|_| warn!("Failed to receive identify yourself answer from a job worker"))
			.ok(),
		Err(_) => {
			warn!("Failed to send identify yourself command to a job worker");
			None
		}
	}
}
pub async fn resume(&self) {
if self.report_watch_rx.borrow().status == JobStatus::Paused {
self.paused.store(false, Ordering::Relaxed);

View file

@ -150,6 +150,10 @@ impl StatefulJob for IndexerJobInit {
const NAME: &'static str = "indexer";
const IS_BATCHED: bool = true;
// The location id where this indexer job will act upon (job self-identification).
fn target_location(&self) -> location::id::Type {
self.location.id
}
/// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`.
async fn init(
&self,

View file

@ -90,8 +90,6 @@ pub async fn shallow(
let to_remove_count = to_remove.len();
debug!("Walker at shallow indexer found {to_remove_count} file_paths to be removed");
node.thumbnailer
.remove_cas_ids(
to_remove
@ -167,12 +165,15 @@ pub async fn shallow(
})
.collect::<Vec<_>>();
debug!("Walker at shallow indexer found {to_update_count} file_paths to be updated");
for step in update_steps {
execute_indexer_update_step(&step, library).await?;
}
debug!(
"Walker at shallow indexer found: \
To create: {to_create_count}; To update: {to_update_count}; To remove: {to_remove_count};"
);
if to_create_count > 0 || to_update_count > 0 || to_remove_count > 0 {
if to_walk_path != location_path {
reverse_update_directories_sizes(to_walk_path, location_id, location_path, library)

View file

@ -77,6 +77,10 @@ impl StatefulJob for FileIdentifierJobInit {
const NAME: &'static str = "file_identifier";
const IS_BATCHED: bool = true;
// The location id where this file-identifier job will act upon (job self-identification).
fn target_location(&self) -> location::id::Type {
self.location.id
}
async fn init(
&self,
ctx: &WorkerContext,

View file

@ -54,6 +54,10 @@ impl StatefulJob for FileCopierJobInit {
const NAME: &'static str = "file_copier";
// The location id where this file-copier job will act upon: the copy
// destination, not the source (job self-identification).
fn target_location(&self) -> location::id::Type {
self.target_location_id
}
async fn init(
&self,
ctx: &WorkerContext,

View file

@ -42,6 +42,10 @@ impl StatefulJob for FileCutterJobInit {
const NAME: &'static str = "file_cutter";
// The location id where this file-cutter job will act upon: the move
// destination, not the source (job self-identification).
fn target_location(&self) -> location::id::Type {
self.target_location_id
}
async fn init(
&self,
ctx: &WorkerContext,

View file

@ -33,6 +33,10 @@ impl StatefulJob for FileDeleterJobInit {
const NAME: &'static str = "file_deleter";
// The location id where this file-deleter job will act upon (job self-identification).
fn target_location(&self) -> location::id::Type {
self.location_id
}
async fn init(
&self,
ctx: &WorkerContext,

View file

@ -62,6 +62,10 @@ impl StatefulJob for FileEraserJobInit {
const NAME: &'static str = "file_eraser";
// The location id where this file-eraser job will act upon (job self-identification).
fn target_location(&self) -> location::id::Type {
self.location_id
}
async fn init(
&self,
ctx: &WorkerContext,

View file

@ -67,6 +67,10 @@ impl StatefulJob for MediaProcessorJobInit {
const NAME: &'static str = "media_processor";
const IS_BATCHED: bool = true;
// The location id where this media-processor job will act upon (job self-identification).
fn target_location(&self) -> location::id::Type {
self.location.id
}
async fn init(
&self,
ctx: &WorkerContext,

View file

@ -59,7 +59,7 @@ enum DatabaseMessage {
// Thumbnails directory have the following structure:
// thumbnails/
// ├── version.txt
//└── <cas_id>[0..2]/ # sharding
// └── <cas_id>[0..2]/ # sharding
// └── <cas_id>.webp
pub struct Thumbnailer {
cas_ids_to_delete_tx: chan::Sender<Vec<String>>,

View file

@ -58,6 +58,10 @@ impl StatefulJob for ObjectValidatorJobInit {
const NAME: &'static str = "object_validator";
// The location id where this object-validator job will act upon (job self-identification).
fn target_location(&self) -> location::id::Type {
self.location.id
}
async fn init(
&self,
ctx: &WorkerContext,