Make Job table all optional (#958)

optional time
This commit is contained in:
Brendan Allan 2023-06-16 11:20:53 +02:00 committed by GitHub
parent af23ef69d3
commit 019459194d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 70 additions and 44 deletions

View file

@ -367,13 +367,14 @@ model ObjectInSpace {
//// Job ////
model Job {
id Bytes @id
name String
node_id Int
id Bytes @id
name String?
node_id Int?
action String? // Will be composed of "{action_description}(-{children_order})*"
// Enum: sd_core::job::job_manager:JobStatus
status Int @default(0) // 0 = Queued
status Int? // 0 = Queued
// List of errors, separated by "\n\n" in case of failed jobs or completed with errors
errors_text String?
@ -383,15 +384,15 @@ model Job {
parent_id Bytes?
task_count Int @default(1)
completed_task_count Int @default(0)
task_count Int?
completed_task_count Int?
date_estimated_completion DateTime? // Estimated timestamp that the job will be complete at
date_created DateTime @default(now())
date_started DateTime? @default(now()) // Started execution
date_created DateTime?
date_started DateTime? // Started execution
date_completed DateTime? // Finished execution
node Node @relation(fields: [node_id], references: [id], onDelete: Cascade, onUpdate: Cascade)
node Node? @relation(fields: [node_id], references: [id], onDelete: Cascade, onUpdate: Cascade)
parent Job? @relation("jobs_dependency", fields: [parent_id], references: [id], onDelete: Cascade, onUpdate: Cascade)
children Job[] @relation("jobs_dependency")

View file

@ -95,7 +95,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.exec()
.await?
.into_iter()
.map(JobReport::from)
.flat_map(JobReport::try_from)
.collect();
let active_reports = ctx.jobs.get_active_reports().await;

View file

@ -96,6 +96,9 @@ pub enum JobManagerError {
#[error("job not found: {0}")]
NotFound(Uuid),
#[error("missing-field: {0}")]
MissingField(#[from] MissingFieldError),
}
impl From<JobManagerError> for rspc::Error {
@ -116,6 +119,11 @@ impl From<JobManagerError> for rspc::Error {
"Job not found".to_string(),
value,
),
JobManagerError::MissingField(_) => Self::with_cause(
rspc::ErrorCode::InternalServerError,
"Missing field".to_string(),
value,
),
}
}
}

View file

@ -229,9 +229,9 @@ impl JobManager {
pub async fn cold_resume(self: Arc<Self>, library: &Library) -> Result<(), JobManagerError> {
// Include the Queued status in the initial find condition
let find_condition = vec![or(vec![
job::status::equals(JobStatus::Paused as i32),
job::status::equals(JobStatus::Running as i32),
job::status::equals(JobStatus::Queued as i32),
job::status::equals(Some(JobStatus::Paused as i32)),
job::status::equals(Some(JobStatus::Running as i32)),
job::status::equals(Some(JobStatus::Queued as i32)),
])];
let all_jobs = library
@ -241,9 +241,11 @@ impl JobManager {
.exec()
.await?
.into_iter()
.map(JobReport::from);
.map(JobReport::try_from);
for job in all_jobs {
let job = job?;
match initialize_resumable_job(job.clone(), None) {
Ok(resumable_job) => {
info!("Resuming job: {} with uuid {}", job.name, job.id);
@ -260,7 +262,7 @@ impl JobManager {
.job()
.update(
job::id::equals(job.id.as_bytes().to_vec()),
vec![job::status::set(JobStatus::Canceled as i32)],
vec![job::status::set(Some(JobStatus::Canceled as i32))],
)
.exec()
.await?;

View file

@ -1,7 +1,10 @@
use crate::{
library::Library,
prisma::{job, node},
util,
util::{
self,
db::{maybe_missing, MissingFieldError},
},
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
@ -59,12 +62,14 @@ impl Display for JobReport {
}
// convert database struct into a resource struct
impl From<job::Data> for JobReport {
fn from(data: job::Data) -> Self {
Self {
impl TryFrom<job::Data> for JobReport {
type Error = MissingFieldError;
fn try_from(data: job::Data) -> Result<Self, Self::Error> {
Ok(Self {
id: Uuid::from_slice(&data.id).expect("corrupted database"),
is_background: false, // deprecated
name: data.name,
name: maybe_missing(data.name, "job.name")?,
action: data.action,
data: data.data,
metadata: data.metadata.and_then(|m| {
@ -77,32 +82,38 @@ impl From<job::Data> for JobReport {
.errors_text
.map(|errors_str| errors_str.split("\n\n").map(str::to_string).collect())
.unwrap_or_default(),
created_at: Some(data.date_created.into()),
created_at: data.date_created.map(DateTime::into),
started_at: data.date_started.map(DateTime::into),
completed_at: data.date_completed.map(DateTime::into),
parent_id: data
.parent_id
.map(|id| Uuid::from_slice(&id).expect("corrupted database")),
status: JobStatus::try_from(data.status).expect("corrupted database"),
task_count: data.task_count,
completed_task_count: data.completed_task_count,
status: JobStatus::try_from(maybe_missing(data.status, "job.status")?)
.expect("corrupted database"),
task_count: maybe_missing(data.task_count, "job.task_count")?,
completed_task_count: maybe_missing(
data.completed_task_count,
"job.completed_task_count",
)?,
message: String::new(),
estimated_completion: data
.date_estimated_completion
.map_or(Utc::now(), DateTime::into),
}
})
}
}
// I despise having to write this twice, but it seems to be the only way to
// remove the data field from the struct
// would love to get this DRY'd up
impl From<job_without_data::Data> for JobReport {
fn from(data: job_without_data::Data) -> Self {
Self {
impl TryFrom<job_without_data::Data> for JobReport {
type Error = MissingFieldError;
fn try_from(data: job_without_data::Data) -> Result<Self, Self::Error> {
Ok(Self {
id: Uuid::from_slice(&data.id).expect("corrupted database"),
is_background: false, // deprecated
name: data.name,
name: maybe_missing(data.name, "job.name")?,
action: data.action,
data: None,
metadata: data.metadata.and_then(|m| {
@ -115,20 +126,24 @@ impl From<job_without_data::Data> for JobReport {
.errors_text
.map(|errors_str| errors_str.split("\n\n").map(str::to_string).collect())
.unwrap_or_default(),
created_at: Some(data.date_created.into()),
created_at: data.date_created.map(DateTime::into),
started_at: data.date_started.map(DateTime::into),
completed_at: data.date_completed.map(DateTime::into),
parent_id: data
.parent_id
.map(|id| Uuid::from_slice(&id).expect("corrupted database")),
status: JobStatus::try_from(data.status).expect("corrupted database"),
task_count: data.task_count,
completed_task_count: data.completed_task_count,
status: JobStatus::try_from(maybe_missing(data.status, "job.status")?)
.expect("corrupted database"),
task_count: maybe_missing(data.task_count, "job.task_count")?,
completed_task_count: maybe_missing(
data.completed_task_count,
"job.completed_task_count",
)?,
message: String::new(),
estimated_completion: data
.date_estimated_completion
.map_or(Utc::now(), DateTime::into),
}
})
}
}
@ -197,14 +212,14 @@ impl JobReport {
.job()
.create(
self.id.as_bytes().to_vec(),
self.name.clone(),
node::id::equals(library.node_local_id),
util::db::chain_optional_iter(
[
job::node::connect(node::id::equals(library.node_local_id)),
job::name::set(Some(self.name.clone())),
job::action::set(self.action.clone()),
job::data::set(self.data.clone()),
job::date_created::set(now.into()),
job::status::set(self.status as i32),
job::date_created::set(Some(now.into())),
job::status::set(Some(self.status as i32)),
job::date_started::set(self.started_at.map(|d| d.into())),
],
[self
@ -224,16 +239,16 @@ impl JobReport {
.update(
job::id::equals(self.id.as_bytes().to_vec()),
vec![
job::status::set(self.status as i32),
job::status::set(Some(self.status as i32)),
job::errors_text::set(
(!self.errors_text.is_empty()).then(|| self.errors_text.join("\n\n")),
),
job::data::set(self.data.clone()),
job::metadata::set(serde_json::to_vec(&self.metadata).ok()),
job::task_count::set(self.task_count),
job::completed_task_count::set(self.completed_task_count),
job::date_started::set(self.started_at.map(|v| v.into())),
job::date_completed::set(self.completed_at.map(|v| v.into())),
job::task_count::set(Some(self.task_count)),
job::completed_task_count::set(Some(self.completed_task_count)),
job::date_started::set(self.started_at.map(Into::into)),
job::date_completed::set(self.completed_at.map(Into::into)),
],
)
.exec()

View file

@ -184,7 +184,7 @@ export type MaybeNot<T> = T | { not: T }
export type MediaData = { id: number; pixel_width: number | null; pixel_height: number | null; longitude: number | null; latitude: number | null; fps: number | null; capture_device_make: string | null; capture_device_model: string | null; capture_device_software: string | null; duration_seconds: number | null; codecs: string | null; streams: number | null }
export type Node = { id: number; pub_id: number[]; name: string; platform: number; date_created: string }
export type Node = { id: number; pub_id: number[]; name: string; platform: number; version: string | null; last_seen: string; timezone: string | null; date_created: string }
/**
* NodeConfig is the configuration for a node. This is shared between all libraries and is stored in a JSON file on disk.