[ENG-778] Location clean up job (#1123)

* Introducing thumbnail remover actor
Also tweaking the orphan remover actor

* Rust fmt
This commit is contained in:
Ericson "Fogo" Soares 2023-07-20 12:17:54 -03:00 committed by GitHub
parent 8d6a060343
commit 03e71e98a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 316 additions and 71 deletions

View file

@ -8,7 +8,10 @@ use crate::{
LocationManager,
},
node::NodeConfigManager,
object::{orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path},
object::{
orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path,
thumbnail_remover::ThumbnailRemoverActor,
},
prisma::{file_path, location, PrismaClient},
sync::SyncManager,
util::{db::maybe_missing, error::FileIOError},
@ -48,6 +51,7 @@ pub struct Library {
/// p2p identity
pub identity: Arc<Identity>,
pub orphan_remover: OrphanRemoverActor,
pub thumbnail_remover: ThumbnailRemoverActor,
}
impl Debug for Library {

View file

@ -2,7 +2,10 @@ use crate::{
invalidate_query,
location::{indexer, LocationManagerError},
node::{NodeConfig, Platform},
object::{orphan_remover::OrphanRemoverActor, tag},
object::{
orphan_remover::OrphanRemoverActor, preview::THUMBNAIL_CACHE_DIR_NAME, tag,
thumbnail_remover::ThumbnailRemoverActor,
},
prisma::location,
sync::{SyncManager, SyncMessage},
util::{
@ -475,6 +478,13 @@ impl LibraryManager {
// key_manager,
sync: Arc::new(sync_manager),
orphan_remover: OrphanRemoverActor::spawn(db.clone()),
thumbnail_remover: ThumbnailRemoverActor::spawn(
db.clone(),
node_context
.config
.data_directory()
.join(THUMBNAIL_CACHE_DIR_NAME),
),
db,
node_context,
identity,

View file

@ -284,20 +284,32 @@ pub fn filter_existing_file_path_params(
/// the materialized path
#[allow(unused)]
pub fn loose_find_existing_file_path_params(
IsolatedFilePathData {
materialized_path,
location_id,
name,
extension,
..
}: &IsolatedFilePathData,
) -> Vec<file_path::WhereParam> {
vec![
file_path::location_id::equals(Some(*location_id)),
file_path::materialized_path::equals(Some(materialized_path.to_string())),
file_path::name::equals(Some(name.to_string())),
file_path::extension::equals(Some(extension.to_string())),
]
location_id: location::id::Type,
location_path: impl AsRef<Path>,
full_path: impl AsRef<Path>,
) -> Result<Vec<file_path::WhereParam>, FilePathError> {
let location_path = location_path.as_ref();
let full_path = full_path.as_ref();
let file_iso_file_path =
IsolatedFilePathData::new(location_id, location_path, full_path, false)?;
let dir_iso_file_path = IsolatedFilePathData::new(location_id, location_path, full_path, true)?;
Ok(vec![
file_path::location_id::equals(Some(location_id)),
file_path::materialized_path::equals(Some(
file_iso_file_path.materialized_path.to_string(),
)),
file_path::name::in_vec(vec![
file_iso_file_path.name.to_string(),
dir_iso_file_path.name.to_string(),
]),
file_path::extension::in_vec(vec![
file_iso_file_path.extension.to_string(),
dir_iso_file_path.extension.to_string(),
]),
])
}
pub async fn ensure_sub_path_is_in_location(

View file

@ -451,6 +451,7 @@ impl StatefulJob for IndexerJobInit {
if run_metadata.total_updated_paths > 0 {
// Invoking orphan remover here as we probably have some orphan objects due to updates
ctx.library.orphan_remover.invoke().await;
ctx.library.thumbnail_remover.invoke().await;
}
Ok(Some(json!({"init: ": init, "run_metadata": run_metadata})))

View file

@ -116,6 +116,7 @@ pub async fn shallow(
invalidate_query!(library, "search.paths");
library.orphan_remover.invoke().await;
library.thumbnail_remover.invoke().await;
Ok(())
}

View file

@ -35,6 +35,7 @@ use crate::location::file_path_helper::get_inode_and_device_from_path;
use std::{
collections::HashSet,
ffi::OsStr,
fs::Metadata,
path::{Path, PathBuf},
str::FromStr,
@ -43,7 +44,7 @@ use std::{
use sd_file_ext::extensions::ImageExtension;
use chrono::{DateTime, Local, Utc};
use notify::{Event, EventKind};
use notify::Event;
use prisma_client_rust::{raw, PrismaValue};
use serde_json::json;
use tokio::{fs, io::ErrorKind};
@ -55,10 +56,9 @@ use super::INodeAndDevice;
pub(super) fn check_event(event: &Event, ignore_paths: &HashSet<PathBuf>) -> bool {
// if path includes .DS_Store, .spacedrive file creation or is in the `ignore_paths` set, we ignore
!event.paths.iter().any(|p| {
let path_str = p.to_str().expect("Found non-UTF-8 path");
path_str.contains(".DS_Store")
|| (path_str.contains(".spacedrive") && matches!(event.kind, EventKind::Create(_)))
p.file_name()
.and_then(OsStr::to_str)
.map_or(false, |name| name == ".DS_Store" || name == ".spacedrive")
|| ignore_paths.contains(p)
})
}
@ -600,8 +600,10 @@ pub(super) async fn rename(
if let Some(file_path) = db
.file_path()
.find_first(loose_find_existing_file_path_params(
&IsolatedFilePathData::new(location_id, &location_path, old_path, false)?,
))
location_id,
&location_path,
old_path,
)?)
.exec()
.await?
{
@ -664,8 +666,8 @@ pub(super) async fn remove(
let Some(file_path) = library.db
.file_path()
.find_first(loose_find_existing_file_path_params(
&IsolatedFilePathData::new(location_id, &location_path, full_path, false)?,
))
location_id, &location_path, full_path,
)?)
.exec()
.await? else {
return Ok(());
@ -715,8 +717,6 @@ pub(super) async fn remove_by_file_path(
.await?;
}
}
library.orphan_remover.invoke().await;
}
Err(e) => return Err(FileIOError::from((path, e)).into()),
}
@ -791,8 +791,10 @@ pub(super) async fn extract_inode_and_device_from_path(
.db
.file_path()
.find_first(loose_find_existing_file_path_params(
&IsolatedFilePathData::new(location_id, location_path, path, false)?,
))
location_id,
location_path,
path,
)?)
.select(file_path::select!({ inode device }))
.exec()
.await?

View file

@ -706,11 +706,10 @@ pub async fn delete_directory(
})],
);
for params in children_params.chunks(512) {
db.file_path().delete_many(params.to_vec()).exec().await?;
}
db.file_path().delete_many(children_params).exec().await?;
library.orphan_remover.invoke().await;
library.thumbnail_remover.invoke().await;
invalidate_query!(library, "search.paths");
Ok(())

View file

@ -98,6 +98,9 @@ impl StatefulJob for FileDeleterJobInit {
let init = self;
invalidate_query!(ctx.library, "search.paths");
ctx.library.orphan_remover.invoke().await;
ctx.library.thumbnail_remover.invoke().await;
Ok(Some(json!({ "init": init })))
}
}

View file

@ -9,6 +9,7 @@ pub mod fs;
pub mod orphan_remover;
pub mod preview;
pub mod tag;
pub mod thumbnail_remover;
pub mod validation;
// Objects are primarily created by the identifier from Paths

View file

@ -1,58 +1,48 @@
use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc::*;
use tracing::{debug, error};
use crate::prisma::{object, tag_on_object, PrismaClient};
use crate::prisma::*;
use std::{sync::Arc, time::Duration};
use tokio::{
select,
sync::mpsc,
time::{interval_at, Instant, MissedTickBehavior},
};
use tracing::{error, trace};
const TEN_SECONDS: Duration = Duration::from_secs(10);
const ONE_MINUTE: Duration = Duration::from_secs(60);
// Actor that can be invoked to find and delete objects with no matching file paths
#[derive(Clone)]
pub struct OrphanRemoverActor {
tx: Sender<()>,
tx: mpsc::Sender<()>,
}
impl OrphanRemoverActor {
pub fn spawn(db: Arc<PrismaClient>) -> Self {
let (tx, mut rx) = channel(4);
let (tx, mut rx) = mpsc::channel(4);
tokio::spawn(async move {
while let Some(()) = rx.recv().await {
// prevents timeouts
tokio::time::sleep(Duration::from_millis(10)).await;
let mut last_checked = Instant::now();
loop {
let objs = match db
.object()
.find_many(vec![object::file_paths::none(vec![])])
.take(512)
.select(object::select!({ id pub_id }))
.exec()
.await
{
Ok(objs) => objs,
Err(e) => {
error!("Failed to fetch orphaned objects: {e}");
let mut check_interval = interval_at(Instant::now() + ONE_MINUTE, ONE_MINUTE);
check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
// Here we wait for a signal or for the tick interval to be reached
select! {
_ = check_interval.tick() => {}
signal = rx.recv() => {
if signal.is_none() {
break;
}
};
if objs.is_empty() {
break;
}
}
debug!("Removing {} orphaned objects", objs.len());
let ids: Vec<_> = objs.iter().map(|o| o.id).collect();
if let Err(e) = db
._batch((
db.tag_on_object()
.delete_many(vec![tag_on_object::object_id::in_vec(ids.clone())]),
db.object().delete_many(vec![object::id::in_vec(ids)]),
))
.await
{
error!("Failed to remove orphaned objects: {e}");
}
// In either case, we only process a clean up if enough time has passed since the last one
if last_checked.elapsed() > TEN_SECONDS {
Self::process_clean_up(&db).await;
last_checked = Instant::now();
}
}
});
@ -63,4 +53,42 @@ impl OrphanRemoverActor {
pub async fn invoke(&self) {
self.tx.send(()).await.ok();
}
async fn process_clean_up(db: &PrismaClient) {
loop {
let Ok(objects_ids) = db
.object()
.find_many(vec![object::file_paths::none(vec![])])
.take(512)
.select(object::select!({ id }))
.exec()
.await
.map(|objects| objects.into_iter()
.map(|object| object.id)
.collect::<Vec<_>>()
)
.map_err(|e| error!("Failed to fetch orphaned objects: {e:#?}"))
else {
break;
};
if objects_ids.is_empty() {
break;
}
trace!("Removing {} orphaned objects", objects_ids.len());
if let Err(e) = db
._batch((
db.tag_on_object()
.delete_many(vec![tag_on_object::object_id::in_vec(objects_ids.clone())]),
db.object()
.delete_many(vec![object::id::in_vec(objects_ids)]),
))
.await
{
error!("Failed to remove orphaned objects: {e:#?}");
}
}
}
}

View file

@ -0,0 +1,184 @@
use crate::{
prisma::{file_path, PrismaClient},
util::error::{FileIOError, NonUtf8PathError},
};
use std::{collections::HashSet, ffi::OsStr, path::Path, sync::Arc, time::Duration};
use futures::future::try_join_all;
use thiserror::Error;
use tokio::{
fs, select,
sync::mpsc,
time::{interval_at, Instant, MissedTickBehavior},
};
use tracing::error;
// Minimum time that must have elapsed since the previous clean up before another one is run
const TEN_SECONDS: Duration = Duration::from_secs(10);
// Period of the timer that wakes the actor even when nobody invokes it
const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60);
/// Errors that can interrupt a thumbnail clean up run.
#[derive(Error, Debug)]
enum ThumbnailRemoverActorError {
/// A query against the database failed.
#[error("database error")]
Database(#[from] prisma_client_rust::QueryError),
/// A path inside the thumbnails directory has no final component to read a name from.
#[error("missing file name: {}", .0.display())]
MissingFileName(Box<Path>),
/// A file system operation failed; the wrapped error is displayed as-is.
#[error(transparent)]
FileIO(#[from] FileIOError),
/// A path could not be converted to a UTF-8 string; the wrapped error is displayed as-is.
#[error(transparent)]
NonUtf8Path(#[from] NonUtf8PathError),
}
/// Handle to the background task that removes thumbnails with no matching `file_path` row.
///
/// The handle only holds the sending half of the signal channel, so cloning it is cheap and
/// every clone talks to the same task.
#[derive(Clone)]
pub struct ThumbnailRemoverActor {
// Sending half of the wake-up channel; each `()` asks the task to consider a clean up
tx: mpsc::Sender<()>,
}
impl ThumbnailRemoverActor {
    /// Spawns the background clean up task and returns a handle to it.
    ///
    /// The task wakes up either when [`Self::invoke`] sends a signal or when the
    /// `FIVE_MINUTES` timer ticks. On wake up it only runs a clean up if more than
    /// `TEN_SECONDS` have elapsed since the previous one (or since the task started), so
    /// bursts of signals collapse into a single run. The task ends once `rx.recv()` yields
    /// `None`, i.e. the channel has been closed.
    ///
    /// * `db` - database client used to look up which `cas_id`s are still referenced.
    /// * `thumbnails_directory` - root directory holding the thumbnail sub-directories.
    pub fn spawn(db: Arc<PrismaClient>, thumbnails_directory: impl AsRef<Path>) -> Self {
        let (tx, mut rx) = mpsc::channel(4);
        let thumbnails_directory = thumbnails_directory.as_ref().to_path_buf();

        tokio::spawn(async move {
            let mut last_checked = Instant::now();

            // First tick happens one full period from now, not immediately
            let mut check_interval = interval_at(Instant::now() + FIVE_MINUTES, FIVE_MINUTES);
            check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

            loop {
                // Here we wait for a signal or for the tick interval to be reached
                select! {
                    _ = check_interval.tick() => {}
                    signal = rx.recv() => {
                        if signal.is_none() {
                            break;
                        }
                    }
                }

                // In either case, we only process a clean up if enough time has passed
                // since the last one
                if last_checked.elapsed() > TEN_SECONDS {
                    if let Err(e) = Self::process_clean_up(&db, &thumbnails_directory).await {
                        error!("Got an error when trying to clean stale thumbnails: {e:#?}");
                    }
                    last_checked = Instant::now();
                }
            }
        });

        Self { tx }
    }

    /// Signals the background task that a clean up may be needed.
    ///
    /// A failed send (the receiving task is gone) is deliberately ignored.
    pub async fn invoke(&self) {
        self.tx.send(()).await.ok();
    }

    /// Walks every sub-directory of `thumbnails_directory` and deletes the `.webp` files
    /// whose `cas_id` is not referenced by any `file_path` row.
    ///
    /// The `cas_id` of a thumbnail is rebuilt by concatenating the name of its parent
    /// sub-directory with the file stem. Sub-directories with no entries at all are removed;
    /// sub-directories that only hold non-thumbnail entries are left untouched.
    ///
    /// # Errors
    ///
    /// Returns the first file system, path decoding or database error encountered. A
    /// thumbnail that has already disappeared by the time it is deleted is not an error.
    async fn process_clean_up(
        db: &PrismaClient,
        thumbnails_directory: &Path,
    ) -> Result<(), ThumbnailRemoverActorError> {
        let mut read_dir = fs::read_dir(thumbnails_directory)
            .await
            .map_err(|e| FileIOError::from((thumbnails_directory, e)))?;

        while let Some(entry) = read_dir
            .next_entry()
            .await
            .map_err(|e| FileIOError::from((thumbnails_directory, e)))?
        {
            let entry_path = entry.path();

            // Only sub-directories hold thumbnails; report failures against the entry itself
            if !entry
                .metadata()
                .await
                .map_err(|e| FileIOError::from((&entry_path, e)))?
                .is_dir()
            {
                continue;
            }

            let entry_path_name = entry_path
                .file_name()
                .ok_or_else(|| {
                    ThumbnailRemoverActorError::MissingFileName(
                        entry_path.clone().into_boxed_path(),
                    )
                })?
                .to_str()
                .ok_or_else(|| NonUtf8PathError(entry_path.clone().into_boxed_path()))?;

            let mut thumbnails_paths_by_cas_id = Vec::new();
            // Tracks entries that aren't thumbnails, so we never try to remove a directory
            // that merely has no `.webp` files but still isn't empty
            let mut has_non_thumbnail_entries = false;

            let mut entry_read_dir = fs::read_dir(&entry_path)
                .await
                .map_err(|e| FileIOError::from((&entry_path, e)))?;

            while let Some(thumb_entry) = entry_read_dir
                .next_entry()
                .await
                .map_err(|e| FileIOError::from((&entry_path, e)))?
            {
                let thumb_path = thumb_entry.path();

                if thumb_path
                    .extension()
                    .and_then(OsStr::to_str)
                    .map_or(true, |ext| ext != "webp")
                {
                    has_non_thumbnail_entries = true;
                    continue;
                }

                let thumbnail_name = thumb_path
                    .file_stem()
                    .ok_or_else(|| {
                        ThumbnailRemoverActorError::MissingFileName(
                            thumb_path.clone().into_boxed_path(),
                        )
                    })?
                    .to_str()
                    .ok_or_else(|| NonUtf8PathError(thumb_path.clone().into_boxed_path()))?;

                // cas_id = <sub-directory name><file stem>
                let cas_id = format!("{}{}", entry_path_name, thumbnail_name);

                thumbnails_paths_by_cas_id.push((cas_id, thumb_path));
            }

            if thumbnails_paths_by_cas_id.is_empty() {
                // `remove_dir` only succeeds on a truly empty directory, so skip it when
                // unrelated entries are still present instead of aborting the whole run
                if !has_non_thumbnail_entries {
                    fs::remove_dir(&entry_path)
                        .await
                        .map_err(|e| FileIOError::from((&entry_path, e)))?;
                }

                continue;
            }

            let thumbs_in_db = db
                .file_path()
                .find_many(vec![file_path::cas_id::in_vec(
                    thumbnails_paths_by_cas_id
                        .iter()
                        .map(|(cas_id, _)| cas_id)
                        .cloned()
                        .collect(),
                )])
                .select(file_path::select!({ cas_id }))
                .exec()
                .await?
                .into_iter()
                // Only file paths with a cas_id were queried, so nothing is dropped here
                .filter_map(|file_path| file_path.cas_id)
                .collect::<HashSet<_>>();

            try_join_all(
                thumbnails_paths_by_cas_id
                    .into_iter()
                    .filter_map(|(cas_id, path)| {
                        (!thumbs_in_db.contains(&cas_id)).then_some(async move {
                            match fs::remove_file(&path).await {
                                Ok(()) => Ok(()),
                                // Someone else already removed it, which is what we wanted
                                Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
                                Err(e) => Err(FileIOError::from((path, e))),
                            }
                        })
                    }),
            )
            .await?;
        }

        Ok(())
    }
}