Introducing Indexer rules (#363)

* Introducing indexer rules for the indexer job (a usage sketch follows below)
* Solving a minor race condition in `Worker::spawn`
* Various refactors to database access code using Prisma
* Documenting some features of indexer rules

* Updating to a new Prisma rev

* Updating photo

* Some documentation tweaks

* Fixing note editing

* Implementing some requested changes
* Fixing unit tests for the walk function
* Using `rspc::Error::with_cause` instead of `new`

* Introducing a compound unique key on `node_id` and `local_path` in the `locations` table

Co-authored-by: Oscar Beaumont <oscar@otbeaumont.me>
Authored by Ericson "Fogo" Soares on 2022-09-01 20:38:26 -03:00; committed by GitHub
parent cc6e921ca0
commit 25c111a8bf
44 changed files with 2408 additions and 1007 deletions
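
The new rule kinds are glob-driven: `RuleKind` gains `AcceptFilesByGlob` / `RejectFilesByGlob` (plus two children-directory variants), the core picks up a `globset` dependency, and each rule's parameters are stored as a serialized blob. Below is a minimal sketch of how a glob rule of this shape could be checked against a path during the indexer walk; the struct, field names, and matching logic are illustrative, not the exact implementation in this commit.

use globset::Glob;
use std::path::Path;

// Mirrors the RuleKind variants exported in the TS bindings further down.
enum RuleKind {
    AcceptFilesByGlob,
    RejectFilesByGlob,
    AcceptIfChildrenDirectoriesArePresent,
    RejectIfChildrenDirectoriesArePresent,
}

// Illustrative rule shape: the real `indexer_rules.parameters` column stores
// the glob (and other parameters) as serialized bytes.
struct GlobRule {
    kind: RuleKind,
    glob: Glob,
}

// Decide whether a walked file path is accepted under a single rule.
// Returns None when the rule kind needs directory listings instead.
fn accepts(rule: &GlobRule, path: &Path) -> Option<bool> {
    let matches = rule.glob.compile_matcher().is_match(path);
    match rule.kind {
        RuleKind::AcceptFilesByGlob => Some(matches),
        RuleKind::RejectFilesByGlob => Some(!matches),
        _ => None,
    }
}

fn main() {
    let rule = GlobRule {
        kind: RuleKind::RejectFilesByGlob,
        glob: Glob::new("**/node_modules/**").unwrap(),
    };
    assert_eq!(accepts(&rule, Path::new("src/main.rs")), Some(true));
    assert_eq!(accepts(&rule, Path::new("app/node_modules/dep.js")), Some(false));
}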

Cargo.lock (generated)

@ -1866,6 +1866,7 @@ dependencies = [
"fnv",
"log",
"regex",
"serde",
]
[[package]]
@ -4896,10 +4897,12 @@ dependencies = [
"ffmpeg-next",
"fs_extra",
"futures",
"globset",
"hostname 0.3.1",
"image",
"include_dir",
"int-enum",
"itertools",
"once_cell",
"prisma-client-rust",
"ring 0.17.0-alpha.11",
@ -4909,12 +4912,13 @@ dependencies = [
"serde",
"serde_json",
"sysinfo",
"tempfile",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
"tracing-test",
"uuid 1.1.2",
"walkdir",
"webp",
]
@ -6272,6 +6276,29 @@ dependencies = [
"tracing-log",
]
[[package]]
name = "tracing-test"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e3d272c44878d2bbc9f4a20ad463724f03e19dbc667c6e84ac433ab7ffcc70b"
dependencies = [
"lazy_static",
"tracing-core",
"tracing-subscriber",
"tracing-test-macro",
]
[[package]]
name = "tracing-test-macro"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "744324b12d69a9fc1edea4b38b7b1311295b662d161ad5deac17bb1358224a08"
dependencies = [
"lazy_static",
"quote",
"syn",
]
[[package]]
name = "treediff"
version = "3.0.2"

Binary image file changed (not rendered): 35 KiB before, 391 KiB after.


@ -2,116 +2,130 @@
export type Operations = {
queries:
{ key: ["library.getStatistics", LibraryArgs<null>], result: Statistics } |
{ key: ["jobs.getRunning", LibraryArgs<null>], result: Array<JobReport> } |
{ key: ["version"], result: string } |
{ key: ["files.readMetadata", LibraryArgs<number>], result: null } |
{ key: ["locations.getExplorerDir", LibraryArgs<GetExplorerDirArgs>], result: DirectoryWithContents } |
{ key: ["jobs.getHistory", LibraryArgs<null>], result: Array<JobReport> } |
{ key: ["library.get"], result: Array<LibraryConfigWrapped> } |
{ key: ["volumes.get"], result: Array<Volume> } |
{ key: ["locations.indexer_ruleslist", LibraryArgs<null>], result: Array<IndexerRule> } |
{ key: ["files.readMetadata", LibraryArgs<number>], result: null } |
{ key: ["tags.getFilesForTag", LibraryArgs<number>], result: Tag | null } |
{ key: ["library.getStatistics", LibraryArgs<null>], result: Statistics } |
{ key: ["locations.get", LibraryArgs<null>], result: Array<Location> } |
{ key: ["library.get"], result: Array<LibraryConfigWrapped> } |
{ key: ["locations.indexer_rulesget", LibraryArgs<number>], result: IndexerRule } |
{ key: ["locations.getById", LibraryArgs<number>], result: Location | null } |
{ key: ["jobs.getRunning", LibraryArgs<null>], result: Array<JobReport> } |
{ key: ["volumes.get"], result: Array<Volume> } |
{ key: ["locations.getExplorerDir", LibraryArgs<GetExplorerDirArgs>], result: DirectoryWithContents } |
{ key: ["tags.get", LibraryArgs<null>], result: Array<Tag> } |
{ key: ["getNode"], result: NodeState },
mutations:
{ key: ["tags.create", LibraryArgs<TagCreateArgs>], result: Tag } |
{ key: ["files.setFavorite", LibraryArgs<SetFavoriteArgs>], result: null } |
{ key: ["locations.indexer_rulesdelete", LibraryArgs<number>], result: null } |
{ key: ["jobs.identifyUniqueFiles", LibraryArgs<IdentifyUniqueFilesArgs>], result: null } |
{ key: ["files.delete", LibraryArgs<number>], result: null } |
{ key: ["library.edit", EditLibraryArgs], result: null } |
{ key: ["library.delete", string], result: null } |
{ key: ["jobs.generateThumbsForLocation", LibraryArgs<GenerateThumbsForLocationArgs>], result: null } |
{ key: ["files.setNote", LibraryArgs<SetNoteArgs>], result: null } |
{ key: ["library.create", string], result: null } |
{ key: ["locations.quickRescan", LibraryArgs<null>], result: null } |
{ key: ["library.edit", EditLibraryArgs], result: null } |
{ key: ["jobs.generateThumbsForLocation", LibraryArgs<GenerateThumbsForLocationArgs>], result: null } |
{ key: ["files.delete", LibraryArgs<number>], result: null } |
{ key: ["locations.update", LibraryArgs<LocationUpdateArgs>], result: null } |
{ key: ["tags.create", LibraryArgs<TagCreateArgs>], result: Tag } |
{ key: ["locations.delete", LibraryArgs<number>], result: null } |
{ key: ["locations.create", LibraryArgs<LocationCreateArgs>], result: Location } |
{ key: ["files.setNote", LibraryArgs<SetNoteArgs>], result: null } |
{ key: ["tags.update", LibraryArgs<TagUpdateArgs>], result: null } |
{ key: ["tags.assign", LibraryArgs<TagAssignArgs>], result: null } |
{ key: ["locations.create", LibraryArgs<string>], result: Location } |
{ key: ["locations.update", LibraryArgs<LocationUpdateArgs>], result: null } |
{ key: ["locations.indexer_rulescreate", LibraryArgs<IndexerRuleCreateArgs>], result: IndexerRule } |
{ key: ["locations.fullRescan", LibraryArgs<number>], result: null } |
{ key: ["tags.delete", LibraryArgs<number>], result: null },
{ key: ["tags.delete", LibraryArgs<number>], result: null } |
{ key: ["locations.quickRescan", LibraryArgs<null>], result: null } |
{ key: ["files.setFavorite", LibraryArgs<SetFavoriteArgs>], result: null },
subscriptions:
{ key: ["jobs.newThumbnail", LibraryArgs<null>], result: string } |
{ key: ["invalidateQuery"], result: InvalidateOperationEvent }
{ key: ["invalidateQuery"], result: InvalidateOperationEvent } |
{ key: ["jobs.newThumbnail", LibraryArgs<null>], result: string }
};
export interface TagCreateArgs { name: string, color: string }
export interface Location { id: number, pub_id: Array<number>, node_id: number | null, name: string | null, local_path: string | null, total_capacity: number | null, available_capacity: number | null, filesystem: string | null, disk_type: number | null, is_removable: boolean | null, is_online: boolean, date_created: string, node: Node | null | null, file_paths: Array<FilePath> | null }
export interface File { id: number, cas_id: string, integrity_checksum: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string, tags: Array<TagOnFile> | null, labels: Array<LabelOnFile> | null, albums: Array<FileInAlbum> | null, spaces: Array<FileInSpace> | null, paths: Array<FilePath> | null, comments: Array<Comment> | null, media_data: MediaData | null | null, key: Key | null | null }
export interface EditLibraryArgs { id: string, name: string | null, description: string | null }
export interface LibraryArgs<T> { library_id: string, arg: T }
export interface NodeConfig { version: string | null, id: string, name: string, p2p_port: number | null }
export interface MediaData { id: number, pixel_width: number | null, pixel_height: number | null, longitude: number | null, latitude: number | null, fps: number | null, capture_device_make: string | null, capture_device_model: string | null, capture_device_software: string | null, duration_seconds: number | null, codecs: string | null, streams: number | null, files: File | null | null }
export interface Space { id: number, pub_id: Array<number>, name: string | null, description: string | null, date_created: string, date_modified: string, files: Array<FileInSpace> | null }
export interface Album { id: number, pub_id: Array<number>, name: string, is_hidden: boolean, date_created: string, date_modified: string, files: Array<FileInAlbum> | null }
export interface LabelOnFile { date_created: string, label_id: number, label: Label | null, file_id: number, file: File | null }
export interface JobReport { id: string, name: string, data: Array<number> | null, date_created: string, date_modified: string, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: number }
export interface FilePath { id: number, is_dir: boolean, location_id: number | null, materialized_path: string, name: string, extension: string | null, file_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, file: File | null | null, location: Location | null | null, key: Key | null | null }
export interface LocationUpdateArgs { id: number, name: string | null }
export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused"
export interface Node { id: number, pub_id: Array<number>, name: string, platform: number, version: string | null, last_seen: string, timezone: string | null, date_created: string, sync_events: Array<SyncEvent> | null, jobs: Array<Job> | null, Location: Array<Location> | null }
export interface Key { id: number, checksum: string, name: string | null, date_created: string | null, algorithm: number | null, files: Array<File> | null, file_paths: Array<FilePath> | null }
export interface SyncEvent { id: number, node_id: number, timestamp: string, record_id: Array<number>, kind: number, column: string | null, value: string, node: Node | null }
export interface SetFavoriteArgs { id: number, favorite: boolean }
export interface InvalidateOperationEvent { key: string, arg: any }
export interface NodeState { version: string | null, id: string, name: string, p2p_port: number | null, data_path: string }
export interface ConfigMetadata { version: string | null }
export interface LocationUpdateArgs { id: number, name: string | null, indexer_rules_ids: Array<number> }
export interface Volume { name: string, mount_point: string, total_capacity: bigint, available_capacity: bigint, is_removable: boolean, disk_type: string | null, file_system: string | null, is_root_filesystem: boolean }
export interface FileInAlbum { date_created: string, album_id: number, album: Album | null, file_id: number, file: File | null }
export interface Comment { id: number, pub_id: Array<number>, content: string, date_created: string, date_modified: string, file_id: number | null, file: File | null | null }
export interface LibraryConfig { version: string | null, name: string, description: string }
export interface TagUpdateArgs { id: number, name: string | null, color: string | null }
export interface TagAssignArgs { file_id: number, tag_id: number }
export interface Statistics { id: number, date_captured: string, total_file_count: number, library_db_size: string, total_bytes_used: string, total_bytes_capacity: string, total_unique_bytes: string, total_bytes_free: string, preview_media_bytes: string }
export interface GenerateThumbsForLocationArgs { id: number, path: string }
export interface SetNoteArgs { id: number, note: string | null }
export interface Job { id: Array<number>, name: string, node_id: number, action: number, status: number, data: Array<number> | null, task_count: number, completed_task_count: number, date_created: string, date_modified: string, seconds_elapsed: number, nodes: Node | null }
export interface GetExplorerDirArgs { location_id: number, path: string, limit: number }
export interface Label { id: number, pub_id: Array<number>, name: string | null, date_created: string, date_modified: string, label_files: Array<LabelOnFile> | null }
export interface LibraryConfigWrapped { uuid: string, config: LibraryConfig }
export interface TagOnFile { date_created: string, tag_id: number, tag: Tag | null, file_id: number, file: File | null }
export interface Tag { id: number, pub_id: Array<number>, name: string | null, color: string | null, total_files: number | null, redundancy_goal: number | null, date_created: string, date_modified: string, tag_files: Array<TagOnFile> | null }
export interface SetFavoriteArgs { id: number, favorite: boolean }
export interface IdentifyUniqueFilesArgs { id: number, path: string }
export interface GenerateThumbsForLocationArgs { id: number, path: string }
export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused"
export interface ConfigMetadata { version: string | null }
export interface File { id: number, cas_id: string, integrity_checksum: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string, tags: Array<TagOnFile> | null, labels: Array<LabelOnFile> | null, albums: Array<FileInAlbum> | null, spaces: Array<FileInSpace> | null, paths: Array<FilePath> | null, comments: Array<Comment> | null, media_data: MediaData | null | null, key: Key | null | null }
export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent"
export interface Label { id: number, pub_id: Array<number>, name: string | null, date_created: string, date_modified: string, label_files: Array<LabelOnFile> | null }
export interface FilePath { id: number, is_dir: boolean, location_id: number, materialized_path: string, name: string, extension: string | null, file_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, file: File | null | null, location: Location | null | null, key: Key | null | null }
export interface IndexerRuleCreateArgs { kind: RuleKind, name: string, parameters: Array<number> }
export interface SyncEvent { id: number, node_id: number, timestamp: string, record_id: Array<number>, kind: number, column: string | null, value: string, node: Node | null }
export interface Statistics { id: number, date_captured: string, total_file_count: number, library_db_size: string, total_bytes_used: string, total_bytes_capacity: string, total_unique_bytes: string, total_bytes_free: string, preview_media_bytes: string }
export interface NodeConfig { version: string | null, id: string, name: string, p2p_port: number | null }
export interface DirectoryWithContents { directory: FilePath, contents: Array<FilePath> }
export interface SetNoteArgs { id: number, note: string | null }
export interface IndexerRulesInLocation { date_created: string, location_id: number, location: Location | null, indexer_rule_id: number, indexer_rule: IndexerRule | null }
export interface JobReport { id: string, name: string, data: Array<number> | null, date_created: string, date_modified: string, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: number }
export interface TagUpdateArgs { id: number, name: string | null, color: string | null }
export interface FileInSpace { date_created: string, space_id: number, space: Space | null, file_id: number, file: File | null }
export interface LabelOnFile { date_created: string, label_id: number, label: Label | null, file_id: number, file: File | null }
export interface LibraryArgs<T> { library_id: string, arg: T }
export interface Location { id: number, pub_id: Array<number>, node_id: number | null, name: string | null, local_path: string | null, total_capacity: number | null, available_capacity: number | null, filesystem: string | null, disk_type: number | null, is_removable: boolean | null, is_online: boolean, date_created: string, node: Node | null | null, file_paths: Array<FilePath> | null, indexer_rules: Array<IndexerRulesInLocation> | null }
export interface Node { id: number, pub_id: Array<number>, name: string, platform: number, version: string | null, last_seen: string, timezone: string | null, date_created: string, sync_events: Array<SyncEvent> | null, jobs: Array<Job> | null, Location: Array<Location> | null }
export interface LocationCreateArgs { path: string, indexer_rules_ids: Array<number> }
export interface InvalidateOperationEvent { key: string, arg: any }
export interface LibraryConfigWrapped { uuid: string, config: LibraryConfig }
export interface Tag { id: number, pub_id: Array<number>, name: string | null, color: string | null, total_files: number | null, redundancy_goal: number | null, date_created: string, date_modified: string, tag_files: Array<TagOnFile> | null }
export interface Job { id: Array<number>, name: string, node_id: number, action: number, status: number, data: Array<number> | null, task_count: number, completed_task_count: number, date_created: string, date_modified: string, seconds_elapsed: number, nodes: Node | null }
export interface TagOnFile { date_created: string, tag_id: number, tag: Tag | null, file_id: number, file: File | null }
export interface Key { id: number, checksum: string, name: string | null, date_created: string | null, algorithm: number | null, files: Array<File> | null, file_paths: Array<FilePath> | null }
export interface FileInAlbum { date_created: string, album_id: number, album: Album | null, file_id: number, file: File | null }
export interface EditLibraryArgs { id: string, name: string | null, description: string | null }
export interface TagCreateArgs { name: string, color: string }
export interface NodeState { version: string | null, id: string, name: string, p2p_port: number | null, data_path: string }
export interface Comment { id: number, pub_id: Array<number>, content: string, date_created: string, date_modified: string, file_id: number | null, file: File | null | null }
export interface IndexerRule { id: number, kind: number, name: string, parameters: Array<number>, date_created: string, date_modified: string, locations: Array<IndexerRulesInLocation> | null }
export interface TagAssignArgs { file_id: number, tag_id: number }
export interface GetExplorerDirArgs { location_id: number, path: string, limit: number }
export interface LibraryConfig { version: string | null, name: string, description: string }
export interface Space { id: number, pub_id: Array<number>, name: string | null, description: string | null, date_created: string, date_modified: string, files: Array<FileInSpace> | null }
export interface MediaData { id: number, pixel_width: number | null, pixel_height: number | null, longitude: number | null, latitude: number | null, fps: number | null, capture_device_make: string | null, capture_device_model: string | null, capture_device_software: string | null, duration_seconds: number | null, codecs: string | null, streams: number | null, files: File | null | null }
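
On the Rust side these bindings correspond to the new argument structs consumed by the router; for example, `locations.create` now takes a path plus the ids of indexer rules to attach. A hedged sketch of that payload follows, with field types inferred from the bindings above (the actual core struct may use a `PathBuf` for the path) and illustrative values.

// Shape taken from the LocationCreateArgs binding above; values are illustrative.
struct LocationCreateArgs {
    path: String,
    indexer_rules_ids: Vec<i32>,
}

fn main() {
    let create_args = LocationCreateArgs {
        path: "/home/user/Pictures".to_string(),
        indexer_rules_ids: vec![1, 2], // ids of previously created indexer rules
    };
    // The locations.create mutation then runs create_args.create(&library)
    // followed by scan_location(&library, &location), as in the router diff below.
    println!("{:?} rules for {}", create_args.indexer_rules_ids, create_args.path);
}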


@ -39,7 +39,6 @@ rspc = { version = "0.0.4", features = [
"chrono",
"tracing",
] }
walkdir = "^2.3.2"
uuid = { version = "1.1.2", features = ["v4", "serde"] }
sysinfo = "0.23.9"
thiserror = "1.0.30"
@ -56,3 +55,9 @@ tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
async-stream = "0.3.3"
once_cell = "1.13.0"
ctor = "0.1.22"
globset = { version = "^0.4.9", features = ["serde1"] }
itertools = "^0.10.3"
[dev-dependencies]
tempfile = "^3.3.0"
tracing-test = "^0.2.3"


@ -2,116 +2,130 @@
export type Operations = {
queries:
{ key: ["library.getStatistics", LibraryArgs<null>], result: Statistics } |
{ key: ["jobs.getRunning", LibraryArgs<null>], result: Array<JobReport> } |
{ key: ["version"], result: string } |
{ key: ["files.readMetadata", LibraryArgs<number>], result: null } |
{ key: ["locations.getExplorerDir", LibraryArgs<GetExplorerDirArgs>], result: DirectoryWithContents } |
{ key: ["jobs.getHistory", LibraryArgs<null>], result: Array<JobReport> } |
{ key: ["library.get"], result: Array<LibraryConfigWrapped> } |
{ key: ["volumes.get"], result: Array<Volume> } |
{ key: ["locations.indexer_ruleslist", LibraryArgs<null>], result: Array<IndexerRule> } |
{ key: ["files.readMetadata", LibraryArgs<number>], result: null } |
{ key: ["tags.getFilesForTag", LibraryArgs<number>], result: Tag | null } |
{ key: ["library.getStatistics", LibraryArgs<null>], result: Statistics } |
{ key: ["locations.get", LibraryArgs<null>], result: Array<Location> } |
{ key: ["library.get"], result: Array<LibraryConfigWrapped> } |
{ key: ["locations.indexer_rulesget", LibraryArgs<number>], result: IndexerRule } |
{ key: ["locations.getById", LibraryArgs<number>], result: Location | null } |
{ key: ["jobs.getRunning", LibraryArgs<null>], result: Array<JobReport> } |
{ key: ["volumes.get"], result: Array<Volume> } |
{ key: ["locations.getExplorerDir", LibraryArgs<GetExplorerDirArgs>], result: DirectoryWithContents } |
{ key: ["tags.get", LibraryArgs<null>], result: Array<Tag> } |
{ key: ["getNode"], result: NodeState },
mutations:
{ key: ["tags.create", LibraryArgs<TagCreateArgs>], result: Tag } |
{ key: ["files.setFavorite", LibraryArgs<SetFavoriteArgs>], result: null } |
{ key: ["locations.indexer_rulesdelete", LibraryArgs<number>], result: null } |
{ key: ["jobs.identifyUniqueFiles", LibraryArgs<IdentifyUniqueFilesArgs>], result: null } |
{ key: ["files.delete", LibraryArgs<number>], result: null } |
{ key: ["library.edit", EditLibraryArgs], result: null } |
{ key: ["library.delete", string], result: null } |
{ key: ["jobs.generateThumbsForLocation", LibraryArgs<GenerateThumbsForLocationArgs>], result: null } |
{ key: ["files.setNote", LibraryArgs<SetNoteArgs>], result: null } |
{ key: ["library.create", string], result: null } |
{ key: ["locations.quickRescan", LibraryArgs<null>], result: null } |
{ key: ["library.edit", EditLibraryArgs], result: null } |
{ key: ["jobs.generateThumbsForLocation", LibraryArgs<GenerateThumbsForLocationArgs>], result: null } |
{ key: ["files.delete", LibraryArgs<number>], result: null } |
{ key: ["locations.update", LibraryArgs<LocationUpdateArgs>], result: null } |
{ key: ["tags.create", LibraryArgs<TagCreateArgs>], result: Tag } |
{ key: ["locations.delete", LibraryArgs<number>], result: null } |
{ key: ["locations.create", LibraryArgs<LocationCreateArgs>], result: Location } |
{ key: ["files.setNote", LibraryArgs<SetNoteArgs>], result: null } |
{ key: ["tags.update", LibraryArgs<TagUpdateArgs>], result: null } |
{ key: ["tags.assign", LibraryArgs<TagAssignArgs>], result: null } |
{ key: ["locations.create", LibraryArgs<string>], result: Location } |
{ key: ["locations.update", LibraryArgs<LocationUpdateArgs>], result: null } |
{ key: ["locations.indexer_rulescreate", LibraryArgs<IndexerRuleCreateArgs>], result: IndexerRule } |
{ key: ["locations.fullRescan", LibraryArgs<number>], result: null } |
{ key: ["tags.delete", LibraryArgs<number>], result: null },
{ key: ["tags.delete", LibraryArgs<number>], result: null } |
{ key: ["locations.quickRescan", LibraryArgs<null>], result: null } |
{ key: ["files.setFavorite", LibraryArgs<SetFavoriteArgs>], result: null },
subscriptions:
{ key: ["jobs.newThumbnail", LibraryArgs<null>], result: string } |
{ key: ["invalidateQuery"], result: InvalidateOperationEvent }
{ key: ["invalidateQuery"], result: InvalidateOperationEvent } |
{ key: ["jobs.newThumbnail", LibraryArgs<null>], result: string }
};
export interface TagCreateArgs { name: string, color: string }
export interface Location { id: number, pub_id: Array<number>, node_id: number | null, name: string | null, local_path: string | null, total_capacity: number | null, available_capacity: number | null, filesystem: string | null, disk_type: number | null, is_removable: boolean | null, is_online: boolean, date_created: string, node: Node | null | null, file_paths: Array<FilePath> | null }
export interface File { id: number, cas_id: string, integrity_checksum: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string, tags: Array<TagOnFile> | null, labels: Array<LabelOnFile> | null, albums: Array<FileInAlbum> | null, spaces: Array<FileInSpace> | null, paths: Array<FilePath> | null, comments: Array<Comment> | null, media_data: MediaData | null | null, key: Key | null | null }
export interface EditLibraryArgs { id: string, name: string | null, description: string | null }
export interface LibraryArgs<T> { library_id: string, arg: T }
export interface NodeConfig { version: string | null, id: string, name: string, p2p_port: number | null }
export interface MediaData { id: number, pixel_width: number | null, pixel_height: number | null, longitude: number | null, latitude: number | null, fps: number | null, capture_device_make: string | null, capture_device_model: string | null, capture_device_software: string | null, duration_seconds: number | null, codecs: string | null, streams: number | null, files: File | null | null }
export interface Space { id: number, pub_id: Array<number>, name: string | null, description: string | null, date_created: string, date_modified: string, files: Array<FileInSpace> | null }
export interface Album { id: number, pub_id: Array<number>, name: string, is_hidden: boolean, date_created: string, date_modified: string, files: Array<FileInAlbum> | null }
export interface LabelOnFile { date_created: string, label_id: number, label: Label | null, file_id: number, file: File | null }
export interface JobReport { id: string, name: string, data: Array<number> | null, date_created: string, date_modified: string, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: number }
export interface FilePath { id: number, is_dir: boolean, location_id: number | null, materialized_path: string, name: string, extension: string | null, file_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, file: File | null | null, location: Location | null | null, key: Key | null | null }
export interface LocationUpdateArgs { id: number, name: string | null }
export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused"
export interface Node { id: number, pub_id: Array<number>, name: string, platform: number, version: string | null, last_seen: string, timezone: string | null, date_created: string, sync_events: Array<SyncEvent> | null, jobs: Array<Job> | null, Location: Array<Location> | null }
export interface Key { id: number, checksum: string, name: string | null, date_created: string | null, algorithm: number | null, files: Array<File> | null, file_paths: Array<FilePath> | null }
export interface SyncEvent { id: number, node_id: number, timestamp: string, record_id: Array<number>, kind: number, column: string | null, value: string, node: Node | null }
export interface SetFavoriteArgs { id: number, favorite: boolean }
export interface InvalidateOperationEvent { key: string, arg: any }
export interface NodeState { version: string | null, id: string, name: string, p2p_port: number | null, data_path: string }
export interface ConfigMetadata { version: string | null }
export interface LocationUpdateArgs { id: number, name: string | null, indexer_rules_ids: Array<number> }
export interface Volume { name: string, mount_point: string, total_capacity: bigint, available_capacity: bigint, is_removable: boolean, disk_type: string | null, file_system: string | null, is_root_filesystem: boolean }
export interface FileInAlbum { date_created: string, album_id: number, album: Album | null, file_id: number, file: File | null }
export interface Comment { id: number, pub_id: Array<number>, content: string, date_created: string, date_modified: string, file_id: number | null, file: File | null | null }
export interface LibraryConfig { version: string | null, name: string, description: string }
export interface TagUpdateArgs { id: number, name: string | null, color: string | null }
export interface TagAssignArgs { file_id: number, tag_id: number }
export interface Statistics { id: number, date_captured: string, total_file_count: number, library_db_size: string, total_bytes_used: string, total_bytes_capacity: string, total_unique_bytes: string, total_bytes_free: string, preview_media_bytes: string }
export interface GenerateThumbsForLocationArgs { id: number, path: string }
export interface SetNoteArgs { id: number, note: string | null }
export interface Job { id: Array<number>, name: string, node_id: number, action: number, status: number, data: Array<number> | null, task_count: number, completed_task_count: number, date_created: string, date_modified: string, seconds_elapsed: number, nodes: Node | null }
export interface GetExplorerDirArgs { location_id: number, path: string, limit: number }
export interface Label { id: number, pub_id: Array<number>, name: string | null, date_created: string, date_modified: string, label_files: Array<LabelOnFile> | null }
export interface LibraryConfigWrapped { uuid: string, config: LibraryConfig }
export interface TagOnFile { date_created: string, tag_id: number, tag: Tag | null, file_id: number, file: File | null }
export interface Tag { id: number, pub_id: Array<number>, name: string | null, color: string | null, total_files: number | null, redundancy_goal: number | null, date_created: string, date_modified: string, tag_files: Array<TagOnFile> | null }
export interface SetFavoriteArgs { id: number, favorite: boolean }
export interface IdentifyUniqueFilesArgs { id: number, path: string }
export interface GenerateThumbsForLocationArgs { id: number, path: string }
export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused"
export interface ConfigMetadata { version: string | null }
export interface File { id: number, cas_id: string, integrity_checksum: string | null, kind: number, size_in_bytes: string, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, date_modified: string, date_indexed: string, tags: Array<TagOnFile> | null, labels: Array<LabelOnFile> | null, albums: Array<FileInAlbum> | null, spaces: Array<FileInSpace> | null, paths: Array<FilePath> | null, comments: Array<Comment> | null, media_data: MediaData | null | null, key: Key | null | null }
export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent"
export interface Label { id: number, pub_id: Array<number>, name: string | null, date_created: string, date_modified: string, label_files: Array<LabelOnFile> | null }
export interface FilePath { id: number, is_dir: boolean, location_id: number, materialized_path: string, name: string, extension: string | null, file_id: number | null, parent_id: number | null, key_id: number | null, date_created: string, date_modified: string, date_indexed: string, file: File | null | null, location: Location | null | null, key: Key | null | null }
export interface IndexerRuleCreateArgs { kind: RuleKind, name: string, parameters: Array<number> }
export interface SyncEvent { id: number, node_id: number, timestamp: string, record_id: Array<number>, kind: number, column: string | null, value: string, node: Node | null }
export interface Statistics { id: number, date_captured: string, total_file_count: number, library_db_size: string, total_bytes_used: string, total_bytes_capacity: string, total_unique_bytes: string, total_bytes_free: string, preview_media_bytes: string }
export interface NodeConfig { version: string | null, id: string, name: string, p2p_port: number | null }
export interface DirectoryWithContents { directory: FilePath, contents: Array<FilePath> }
export interface SetNoteArgs { id: number, note: string | null }
export interface IndexerRulesInLocation { date_created: string, location_id: number, location: Location | null, indexer_rule_id: number, indexer_rule: IndexerRule | null }
export interface JobReport { id: string, name: string, data: Array<number> | null, date_created: string, date_modified: string, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: number }
export interface TagUpdateArgs { id: number, name: string | null, color: string | null }
export interface FileInSpace { date_created: string, space_id: number, space: Space | null, file_id: number, file: File | null }
export interface LabelOnFile { date_created: string, label_id: number, label: Label | null, file_id: number, file: File | null }
export interface LibraryArgs<T> { library_id: string, arg: T }
export interface Location { id: number, pub_id: Array<number>, node_id: number | null, name: string | null, local_path: string | null, total_capacity: number | null, available_capacity: number | null, filesystem: string | null, disk_type: number | null, is_removable: boolean | null, is_online: boolean, date_created: string, node: Node | null | null, file_paths: Array<FilePath> | null, indexer_rules: Array<IndexerRulesInLocation> | null }
export interface Node { id: number, pub_id: Array<number>, name: string, platform: number, version: string | null, last_seen: string, timezone: string | null, date_created: string, sync_events: Array<SyncEvent> | null, jobs: Array<Job> | null, Location: Array<Location> | null }
export interface LocationCreateArgs { path: string, indexer_rules_ids: Array<number> }
export interface InvalidateOperationEvent { key: string, arg: any }
export interface LibraryConfigWrapped { uuid: string, config: LibraryConfig }
export interface Tag { id: number, pub_id: Array<number>, name: string | null, color: string | null, total_files: number | null, redundancy_goal: number | null, date_created: string, date_modified: string, tag_files: Array<TagOnFile> | null }
export interface Job { id: Array<number>, name: string, node_id: number, action: number, status: number, data: Array<number> | null, task_count: number, completed_task_count: number, date_created: string, date_modified: string, seconds_elapsed: number, nodes: Node | null }
export interface TagOnFile { date_created: string, tag_id: number, tag: Tag | null, file_id: number, file: File | null }
export interface Key { id: number, checksum: string, name: string | null, date_created: string | null, algorithm: number | null, files: Array<File> | null, file_paths: Array<FilePath> | null }
export interface FileInAlbum { date_created: string, album_id: number, album: Album | null, file_id: number, file: File | null }
export interface EditLibraryArgs { id: string, name: string | null, description: string | null }
export interface TagCreateArgs { name: string, color: string }
export interface NodeState { version: string | null, id: string, name: string, p2p_port: number | null, data_path: string }
export interface Comment { id: number, pub_id: Array<number>, content: string, date_created: string, date_modified: string, file_id: number | null, file: File | null | null }
export interface IndexerRule { id: number, kind: number, name: string, parameters: Array<number>, date_created: string, date_modified: string, locations: Array<IndexerRulesInLocation> | null }
export interface TagAssignArgs { file_id: number, tag_id: number }
export interface GetExplorerDirArgs { location_id: number, path: string, limit: number }
export interface LibraryConfig { version: string | null, name: string, description: string }
export interface Space { id: number, pub_id: Array<number>, name: string | null, description: string | null, date_created: string, date_modified: string, files: Array<FileInSpace> | null }
export interface MediaData { id: number, pixel_width: number | null, pixel_height: number | null, longitude: number | null, latitude: number | null, fps: number | null, capture_device_make: string | null, capture_device_model: string | null, capture_device_software: string | null, duration_seconds: number | null, codecs: string | null, streams: number | null, files: File | null | null }


@ -0,0 +1,29 @@
/*
Warnings:
- A unique constraint covering the columns `[local_path]` on the table `locations` will be added. If there are existing duplicate values, this will fail.
*/
-- CreateTable
CREATE TABLE "indexer_rules" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"kind" INTEGER NOT NULL,
"name" TEXT NOT NULL,
"parameters" BLOB NOT NULL,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- CreateTable
CREATE TABLE "indexer_rules_in_location" (
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"location_id" INTEGER NOT NULL,
"indexer_rule_id" INTEGER NOT NULL,
PRIMARY KEY ("location_id", "indexer_rule_id"),
CONSTRAINT "indexer_rules_in_location_location_id_fkey" FOREIGN KEY ("location_id") REFERENCES "locations" ("id") ON DELETE NO ACTION ON UPDATE NO ACTION,
CONSTRAINT "indexer_rules_in_location_indexer_rule_id_fkey" FOREIGN KEY ("indexer_rule_id") REFERENCES "indexer_rules" ("id") ON DELETE NO ACTION ON UPDATE NO ACTION
);
-- CreateIndex
CREATE UNIQUE INDEX "locations_local_path_key" ON "locations"("local_path");


@ -0,0 +1,35 @@
/*
Warnings:
- The primary key for the `file_paths` table will be changed. If it partially fails, the table could be left without primary key constraint.
- Made the column `location_id` on table `file_paths` required. This step will fail if there are existing NULL values in that column.
*/
-- RedefineTables
PRAGMA foreign_keys=OFF;
CREATE TABLE "new_file_paths" (
"id" INTEGER NOT NULL,
"is_dir" BOOLEAN NOT NULL DEFAULT false,
"location_id" INTEGER NOT NULL,
"materialized_path" TEXT NOT NULL,
"name" TEXT NOT NULL,
"extension" TEXT,
"file_id" INTEGER,
"parent_id" INTEGER,
"key_id" INTEGER,
"date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"date_indexed" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY ("location_id", "id"),
CONSTRAINT "file_paths_file_id_fkey" FOREIGN KEY ("file_id") REFERENCES "files" ("id") ON DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT "file_paths_location_id_fkey" FOREIGN KEY ("location_id") REFERENCES "locations" ("id") ON DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT "file_paths_key_id_fkey" FOREIGN KEY ("key_id") REFERENCES "keys" ("id") ON DELETE SET NULL ON UPDATE CASCADE
);
INSERT INTO "new_file_paths" ("date_created", "date_indexed", "date_modified", "extension", "file_id", "id", "is_dir", "key_id", "location_id", "materialized_path", "name", "parent_id") SELECT "date_created", "date_indexed", "date_modified", "extension", "file_id", "id", "is_dir", "key_id", "location_id", "materialized_path", "name", "parent_id" FROM "file_paths";
DROP TABLE "file_paths";
ALTER TABLE "new_file_paths" RENAME TO "file_paths";
CREATE INDEX "file_paths_location_id_idx" ON "file_paths"("location_id");
CREATE UNIQUE INDEX "file_paths_location_id_materialized_path_name_extension_key" ON "file_paths"("location_id", "materialized_path", "name", "extension");
PRAGMA foreign_key_check;
PRAGMA foreign_keys=ON;
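
Since `file_paths` is now keyed by the compound primary key `(location_id, id)`, point updates in the Rust code switch from `file_path::id::equals` to the generated compound accessor. The fragment below sketches the pattern, mirroring the file identifier job further down; it assumes `library`, `location_id`, `file_path_id`, and `file_id` are already in scope.

// Fragment only: link a file_path row to its file record using the new
// @@id([location_id, id]) compound key accessor generated by Prisma.
library
    .db
    .file_path()
    .update(
        file_path::location_id_id(location_id, file_path_id),
        vec![file_path::file_id::set(Some(file_id))],
    )
    .exec()
    .await?;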


@ -0,0 +1,11 @@
/*
Warnings:
- A unique constraint covering the columns `[node_id,local_path]` on the table `locations` will be added. If there are existing duplicate values, this will fail.
*/
-- DropIndex
DROP INDEX "locations_local_path_key";
-- CreateIndex
CREATE UNIQUE INDEX "locations_node_id_local_path_key" ON "locations"("node_id", "local_path");


@ -84,7 +84,7 @@ model Volume {
model Location {
id Int @id @default(autoincrement())
pub_id Bytes @unique
pub_id Bytes @unique
node_id Int?
name String?
local_path String?
@ -98,6 +98,9 @@ model Location {
node Node? @relation(fields: [node_id], references: [id])
file_paths FilePath[]
indexer_rules IndexerRulesInLocation[]
@@unique([node_id, local_path])
@@map("locations")
}
@ -144,10 +147,10 @@ model File {
}
model FilePath {
id Int @id @default(autoincrement())
id Int
is_dir Boolean @default(false)
// location that owns this path
location_id Int?
location_id Int
// a path generated from local file_path ids eg: "34/45/67/890"
materialized_path String
// the name and extension
@ -174,6 +177,7 @@ model FilePath {
key Key? @relation(fields: [key_id], references: [id])
@@id([location_id, id])
@@unique([location_id, materialized_path, name, extension])
@@index([location_id])
@@map("file_paths")
@ -356,3 +360,29 @@ model Comment {
@@map("comments")
}
model IndexerRule {
id Int @id @default(autoincrement())
kind Int
name String
parameters Bytes
date_created DateTime @default(now())
date_modified DateTime @default(now())
locations IndexerRulesInLocation[]
@@map("indexer_rules")
}
model IndexerRulesInLocation {
date_created DateTime @default(now())
location_id Int
location Location @relation(fields: [location_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
indexer_rule_id Int
indexer_rule IndexerRule @relation(fields: [indexer_rule_id], references: [id], onDelete: NoAction, onUpdate: NoAction)
@@id([location_id, indexer_rule_id])
@@map("indexer_rules_in_location")
}
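
The `IndexerRulesInLocation` join model is what lets a scan load a location together with its attached rules. Below is a sketch based on the `fullRescan` handler later in this commit, where `fetch_location` and `with_indexer_rules` are helpers added by this change; it assumes `library` and `location_id` are in scope and the error handling shown is illustrative.

// Fragment only: load the location with its indexer_rules relation eagerly,
// then hand it to the scanner so the walk can apply the rules.
let location = fetch_location(&library, location_id)
    .with(with_indexer_rules(location_id))
    .exec()
    .await?
    .ok_or(LocationError::IdNotFound(location_id))?;

scan_location(&library, &location).await?;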


@ -1,8 +1,8 @@
use crate::{api::locations::GetExplorerDirArgs, invalidate_query, prisma::file};
use rspc::Type;
use serde::Deserialize;
use crate::{api::locations::GetExplorerDirArgs, invalidate_query, prisma::file};
use super::{LibraryArgs, RouterBuilder};
#[derive(Type, Deserialize)]


@ -1,14 +1,15 @@
use std::path::PathBuf;
use rspc::Type;
use serde::Deserialize;
use crate::{
encode::{ThumbnailJob, ThumbnailJobInit},
file::cas::{FileIdentifierJob, FileIdentifierJobInit},
job::{Job, JobManager},
location::{fetch_location, LocationError},
prisma::location,
};
use rspc::Type;
use serde::Deserialize;
use std::path::PathBuf;
use super::{CoreEvent, LibraryArgs, RouterBuilder};
#[derive(Type, Deserialize)]
@ -40,6 +41,16 @@ pub(crate) fn mount() -> RouterBuilder {
|ctx, arg: LibraryArgs<GenerateThumbsForLocationArgs>| async move {
let (args, library) = arg.get_library(&ctx).await?;
if library
.db
.location()
.count(vec![location::id::equals(args.id)])
.exec()
.await? == 0
{
return Err(LocationError::IdNotFound(args.id).into());
}
library
.spawn_job(Job::new(
ThumbnailJobInit {
@ -62,8 +73,10 @@ pub(crate) fn mount() -> RouterBuilder {
library
.spawn_job(Job::new(
FileIdentifierJobInit {
location_id: args.id,
path: args.path,
location: fetch_location(&library, args.id)
.exec()
.await?
.ok_or(LocationError::IdNotFound(args.id))?,
},
Box::new(FileIdentifierJob {}),
))


@ -1,16 +1,16 @@
use crate::{
library::LibraryConfig,
prisma::statistics,
volume::{get_volumes, save_volume},
};
use chrono::Utc;
use fs_extra::dir::get_size;
use fs_extra::dir::get_size; // TODO: Remove this dependency as it is sync instead of async
use rspc::Type;
use serde::Deserialize;
use tokio::fs;
use uuid::Uuid;
use crate::{
library::LibraryConfig,
prisma::statistics,
sys::{get_volumes, save_volume},
};
use super::{LibraryArgs, RouterBuilder};
#[derive(Type, Deserialize)]


@ -1,16 +1,17 @@
use std::path::PathBuf;
use rspc::{Error, ErrorCode, Type};
use serde::{Deserialize, Serialize};
use tracing::info;
use crate::{
encode::THUMBNAIL_CACHE_DIR_NAME,
invalidate_query,
prisma::{file_path, location},
sys::{self, create_location, scan_location},
location::{
fetch_location, indexer::indexer_rules::IndexerRuleCreateArgs, scan_location,
with_indexer_rules, LocationCreateArgs, LocationError, LocationUpdateArgs,
},
prisma::{file_path, indexer_rule, indexer_rules_in_location, location},
};
use rspc::{self, ErrorCode, Type};
use serde::{Deserialize, Serialize};
use tracing::info;
use super::{LibraryArgs, RouterBuilder};
#[derive(Serialize, Deserialize, Type, Debug)]
@ -19,13 +20,7 @@ pub struct DirectoryWithContents {
pub contents: Vec<file_path::Data>,
}
#[derive(Type, Deserialize)]
pub struct LocationUpdateArgs {
pub id: i32,
pub name: Option<String>,
}
#[derive(Clone, Serialize, Deserialize, Type)]
#[derive(Clone, Serialize, Deserialize, Type, Debug)]
pub struct GetExplorerDirArgs {
pub location_id: i32,
pub path: String,
@ -74,19 +69,21 @@ pub(crate) fn mount() -> RouterBuilder {
.db
.file_path()
.find_first(vec![
file_path::location_id::equals(Some(location.id)),
file_path::location_id::equals(location.id),
file_path::materialized_path::equals(args.path),
file_path::is_dir::equals(true),
])
.exec()
.await?
.ok_or_else(|| Error::new(ErrorCode::NotFound, "Directory not found".into()))?;
.ok_or_else(|| {
rspc::Error::new(ErrorCode::NotFound, "Directory not found".into())
})?;
let file_paths = library
.db
.file_path()
.find_many(vec![
file_path::location_id::equals(Some(location.id)),
file_path::location_id::equals(location.id),
file_path::parent_id::equals(Some(directory.id)),
])
.with(file_path::file::fetch())
@ -119,28 +116,22 @@ pub(crate) fn mount() -> RouterBuilder {
})
},
)
.mutation("create", |ctx, arg: LibraryArgs<PathBuf>| async move {
let (path, library) = arg.get_library(&ctx).await?;
let location = create_location(&library, &path).await?;
scan_location(&library, location.id, path).await;
Ok(location)
})
.mutation(
"create",
|ctx, arg: LibraryArgs<LocationCreateArgs>| async move {
let (create_args, library) = arg.get_library(&ctx).await?;
let location = create_args.create(&library).await?;
scan_location(&library, &location).await?;
Ok(location)
},
)
.mutation(
"update",
|ctx, arg: LibraryArgs<LocationUpdateArgs>| async move {
let (args, library) = arg.get_library(&ctx).await?;
library
.db
.location()
.update(
location::id::equals(args.id),
vec![location::name::set(args.name)],
)
.exec()
.await?;
Ok(())
let (update_args, library) = arg.get_library(&ctx).await?;
update_args.update(&library).await.map_err(Into::into)
},
)
.mutation("delete", |ctx, arg: LibraryArgs<i32>| async move {
@ -149,7 +140,16 @@ pub(crate) fn mount() -> RouterBuilder {
library
.db
.file_path()
.delete_many(vec![file_path::location_id::equals(Some(location_id))])
.delete_many(vec![file_path::location_id::equals(location_id)])
.exec()
.await?;
library
.db
.indexer_rules_in_location()
.delete_many(vec![indexer_rules_in_location::location_id::equals(
location_id,
)])
.exec()
.await?;
@ -163,10 +163,7 @@ pub(crate) fn mount() -> RouterBuilder {
invalidate_query!(
library,
"locations.get": LibraryArgs<()>,
LibraryArgs {
library_id: library.id,
arg: ()
}
LibraryArgs::new(library.id, ())
);
info!("Location {} deleted", location_id);
@ -174,11 +171,77 @@ pub(crate) fn mount() -> RouterBuilder {
Ok(())
})
.mutation("fullRescan", |ctx, arg: LibraryArgs<i32>| async move {
let (id, library) = arg.get_library(&ctx).await?;
let (location_id, library) = arg.get_library(&ctx).await?;
sys::scan_location(&library, id, String::new()).await;
scan_location(
&library,
&fetch_location(&library, location_id)
.with(with_indexer_rules(location_id))
.exec()
.await?
.ok_or(LocationError::IdNotFound(location_id))?,
)
.await
.map_err(Into::into)
})
.mutation("quickRescan", |_, _: LibraryArgs<()>| todo!())
.merge("indexer_rules", mount_indexer_rule_routes())
}
fn mount_indexer_rule_routes() -> RouterBuilder {
<RouterBuilder>::new()
.mutation(
"create",
|ctx, arg: LibraryArgs<IndexerRuleCreateArgs>| async move {
let (create_args, library) = arg.get_library(&ctx).await?;
create_args.create(&library).await.map_err(Into::into)
},
)
.mutation("delete", |ctx, arg: LibraryArgs<i32>| async move {
let (indexer_rule_id, library) = arg.get_library(&ctx).await?;
library
.db
.indexer_rules_in_location()
.delete_many(vec![indexer_rules_in_location::indexer_rule_id::equals(
indexer_rule_id,
)])
.exec()
.await?;
library
.db
.indexer_rule()
.delete(indexer_rule::id::equals(indexer_rule_id))
.exec()
.await?;
Ok(())
})
.mutation("quickRescan", |_, _: LibraryArgs<()>| todo!())
.query("get", |ctx, arg: LibraryArgs<i32>| async move {
let (indexer_rule_id, library) = arg.get_library(&ctx).await?;
library
.db
.indexer_rule()
.find_unique(indexer_rule::id::equals(indexer_rule_id))
.exec()
.await?
.ok_or_else(|| {
rspc::Error::new(
ErrorCode::NotFound,
format!("Indexer rule <id={indexer_rule_id}> not found"),
)
})
})
.query("list", |ctx, arg: LibraryArgs<()>| async move {
let (_, library) = arg.get_library(&ctx).await?;
library
.db
.indexer_rule()
.find_many(vec![])
.exec()
.await
.map_err(Into::into)
})
}


@ -1,19 +1,18 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};
use rspc::{Config, ErrorCode, Type};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use uuid::Uuid;
use crate::{
job::JobManager,
library::{LibraryContext, LibraryManager},
node::{NodeConfig, NodeConfigManager},
};
use rspc::{Config, ErrorCode, Type};
use serde::{Deserialize, Serialize};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::broadcast;
use uuid::Uuid;
use utils::{InvalidRequests, InvalidateOperationEvent};
pub type Router = rspc::Router<Ctx>;
@ -38,12 +37,15 @@ pub struct Ctx {
/// Can wrap a query argument to require it to contain a `library_id` and provide helpers for working with libraries.
#[derive(Clone, Serialize, Deserialize, Type)]
pub struct LibraryArgs<T> {
// If you want to make these public, your doing it wrong.
pub library_id: Uuid,
pub arg: T,
library_id: Uuid,
arg: T,
}
impl<T> LibraryArgs<T> {
pub fn new(library_id: Uuid, arg: T) -> Self {
Self { library_id, arg }
}
pub async fn get_library(self, ctx: &Ctx) -> Result<(T, LibraryContext), rspc::Error> {
match ctx.library_manager.get_ctx(self.library_id).await {
Some(library) => Ok((self.arg, library)),
@ -55,13 +57,13 @@ impl<T> LibraryArgs<T> {
}
}
mod files;
mod jobs;
mod libraries;
mod locations;
mod tags;
pub mod files;
pub mod jobs;
pub mod libraries;
pub mod locations;
pub mod tags;
pub mod utils;
mod volumes;
pub mod volumes;
pub use files::*;
pub use jobs::*;


@ -1,12 +1,12 @@
use rspc::Type;
use serde::Deserialize;
use uuid::Uuid;
use crate::{
invalidate_query,
prisma::{file, tag},
};
use rspc::Type;
use serde::Deserialize;
use uuid::Uuid;
use super::{LibraryArgs, RouterBuilder};
#[derive(Type, Deserialize)]


@ -1,19 +1,17 @@
use crate::api::Router;
use rspc::{internal::specta::DataType, Type};
use serde::Serialize;
use serde_json::Value;
use std::sync::Arc;
#[cfg(debug_assertions)]
use std::sync::Mutex;
/// holds information about all invalidation queries done with the [`invalidate_query!`] macro so we can check they are valid when building the router.
#[cfg(debug_assertions)]
use once_cell::sync::OnceCell;
use rspc::{internal::specta::DataType, Type};
use serde::Serialize;
use serde_json::Value;
use crate::api::Router;
/// holds information about all invalidation queries done with the [invalidate_query!] macro so we can check they are valid when building the router.
#[cfg(debug_assertions)]
pub(crate) static INVALIDATION_REQUESTS: OnceCell<Mutex<InvalidRequests>> = OnceCell::new();
pub(crate) static INVALIDATION_REQUESTS: Mutex<InvalidRequests> =
Mutex::new(InvalidRequests::new());
#[derive(Debug, Clone, Serialize, Type)]
pub struct InvalidateOperationEvent {
@ -46,14 +44,17 @@ pub(crate) struct InvalidRequests {
}
impl InvalidRequests {
const fn new() -> Self {
Self {
queries: Vec::new(),
}
}
#[allow(unused_variables)]
pub(crate) fn validate(r: Arc<Router>) {
#[cfg(debug_assertions)]
{
let invalidate_requests = crate::api::utils::INVALIDATION_REQUESTS
.get_or_init(Default::default)
.lock()
.unwrap();
let invalidate_requests = INVALIDATION_REQUESTS.lock().unwrap();
let queries = r.queries();
for req in &invalidate_requests.queries {
@ -61,9 +62,9 @@ impl InvalidRequests {
if let Some(arg) = &req.arg_ty {
if &query_ty.ty.arg_ty != arg {
panic!(
"Error at '{}': Attempted to invalid query '{}' but the argument type does not match the type defined on the router.",
req.macro_src, req.key
);
"Error at '{}': Attempted to invalid query '{}' but the argument type does not match the type defined on the router.",
req.macro_src, req.key
);
}
}
} else {
@ -77,7 +78,7 @@ impl InvalidRequests {
}
}
/// invalidate_query is a macro which stores a list of all of it's invocations so it can ensure all of the queries match the queries attached to the router.
/// `invalidate_query` is a macro which stores a list of all of it's invocations so it can ensure all of the queries match the queries attached to the router.
/// This allows invalidate to be type-safe even when the router keys are stringly typed.
/// ```ignore
/// invalidate_query!(
@ -97,7 +98,6 @@ macro_rules! invalidate_query {
#[ctor::ctor]
fn invalidate() {
crate::api::utils::INVALIDATION_REQUESTS
.get_or_init(|| Default::default())
.lock()
.unwrap()
.queries
@ -112,7 +112,7 @@ macro_rules! invalidate_query {
// The error are ignored here because they aren't mission critical. If they fail the UI might be outdated for a bit.
crate::api::utils::InvalidateOperationEvent::dangerously_create($key, serde_json::Value::Null)
}};
($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr) => {{
($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr $(,)?) => {{
let _: $arg_ty = $arg; // Assert the type the user provided is correct
let ctx: &crate::library::LibraryContext = &$ctx; // Assert the context is the correct type
@ -121,7 +121,6 @@ macro_rules! invalidate_query {
#[ctor::ctor]
fn invalidate() {
crate::api::utils::INVALIDATION_REQUESTS
.get_or_init(|| Default::default())
.lock()
.unwrap()
.queries


@ -1,4 +1,4 @@
use crate::sys::get_volumes;
use crate::volume::get_volumes;
use super::{Router, RouterBuilder};


@ -1,9 +1,11 @@
use crate::{
api::CoreEvent,
api::{locations::GetExplorerDirArgs, CoreEvent, LibraryArgs},
invalidate_query,
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
library::LibraryContext,
prisma::{file_path, location},
};
use image::{self, imageops, DynamicImage, GenericImageView};
use serde::{Deserialize, Serialize};
use std::{
@ -66,8 +68,9 @@ impl StatefulJob for ThumbnailJob {
.unwrap();
info!(
"Searching for images in location {} at path {:#?}",
location.id, state.init.path
"Searching for images in location {} at path {}",
location.id,
state.init.path.display()
);
// create all necessary directories if they don't exist
@ -150,6 +153,21 @@ impl StatefulJob for ThumbnailJob {
info!("Thumb exists, skipping... {}", output_path.display());
}
// With this invalidate query, we update the user interface to show each new thumbnail
let library_ctx = ctx.library_ctx();
invalidate_query!(
library_ctx,
"locations.getExplorerDir": LibraryArgs<GetExplorerDirArgs>,
LibraryArgs::new(
library_ctx.id,
GetExplorerDirArgs {
location_id: state.init.location_id,
path: "".to_string(),
limit: 100
}
)
);
ctx.progress(vec![JobReportUpdate::CompletedTaskCount(
state.step_number + 1,
)]);
@ -211,9 +229,9 @@ pub async fn get_images(
ctx: &LibraryContext,
location_id: i32,
path: impl AsRef<Path>,
) -> Result<Vec<file_path::Data>, std::io::Error> {
) -> Result<Vec<file_path::Data>, JobError> {
let mut params = vec![
file_path::location_id::equals(Some(location_id)),
file_path::location_id::equals(location_id),
file_path::extension::in_vec(vec![
"png".to_string(),
"jpeg".to_string(),
@ -223,20 +241,17 @@ pub async fn get_images(
]),
];
let path_str = path.as_ref().as_os_str().to_str().unwrap().to_string();
let path_str = path.as_ref().to_string_lossy().to_string();
if !path_str.is_empty() {
params.push(file_path::materialized_path::starts_with(path_str))
params.push(file_path::materialized_path::starts_with(path_str));
}
let image_files = ctx
.db
ctx.db
.file_path()
.find_many(params)
.with(file_path::file::fetch())
.exec()
.await
.unwrap();
Ok(image_files)
.map_err(Into::into)
}


@ -1,5 +1,4 @@
use data_encoding::HEXLOWER;
use ring::digest::{Context, SHA256};
use std::path::PathBuf;
use tokio::{


@ -1,10 +1,9 @@
use super::checksum::generate_cas_id;
use crate::{
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
library::LibraryContext,
prisma::{file, file_path, location},
};
use chrono::{DateTime, FixedOffset};
use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction};
use serde::{Deserialize, Serialize};
@ -15,6 +14,8 @@ use std::{
use tokio::{fs, io};
use tracing::{error, info};
use super::checksum::generate_cas_id;
// we break this job into chunks of 100 to improve performance
static CHUNK_SIZE: usize = 100;
pub const IDENTIFIER_JOB_NAME: &str = "file_identifier";
@ -26,17 +27,27 @@ pub struct FileIdentifierJob {}
// finally: creating unique file records, and linking them to their file_paths
#[derive(Serialize, Deserialize, Clone)]
pub struct FileIdentifierJobInit {
pub location_id: i32,
pub path: PathBuf,
pub location: location::Data,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct FilePathIdAndLocationIdCursor {
file_path_id: i32,
location_id: i32,
}
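// Converts the cursor into the compound unique key (`location_id`, `id`) on `file_paths`,
// so chunked queries can resume from the last processed file path within a location.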
impl From<&FilePathIdAndLocationIdCursor> for file_path::UniqueWhereParam {
fn from(cursor: &FilePathIdAndLocationIdCursor) -> Self {
file_path::location_id_id(cursor.location_id, cursor.file_path_id)
}
}
#[derive(Serialize, Deserialize)]
pub struct FileIdentifierJobState {
total_count: usize,
task_count: usize,
location: location::Data,
location_path: PathBuf,
cursor: i32,
cursor: FilePathIdAndLocationIdCursor,
}
#[async_trait::async_trait]
@ -58,21 +69,15 @@ impl StatefulJob for FileIdentifierJob {
let library = ctx.library_ctx();
let location = library
.db
.location()
.find_unique(location::id::equals(state.init.location_id))
.exec()
.await?
.unwrap();
let location_path = location
let location_path = state
.init
.location
.local_path
.as_ref()
.map(PathBuf::from)
.unwrap_or_default();
let total_count = count_orphan_file_paths(&library, location.id.into()).await?;
let total_count = count_orphan_file_paths(&library, state.init.location.id.into()).await?;
info!("Found {} orphan file paths", total_count);
let task_count = (total_count as f64 / CHUNK_SIZE as f64).ceil() as usize;
@ -84,9 +89,11 @@ impl StatefulJob for FileIdentifierJob {
state.data = Some(FileIdentifierJobState {
total_count,
task_count,
location,
location_path,
cursor: 1,
cursor: FilePathIdAndLocationIdCursor {
file_path_id: 1,
location_id: state.init.location.id,
},
});
state.steps = (0..task_count).map(|_| ()).collect();
@ -109,7 +116,7 @@ impl StatefulJob for FileIdentifierJob {
.expect("critical error: missing data on job state");
// get chunk of orphans to process
let file_paths = match get_orphan_file_paths(&ctx.library_ctx(), data.cursor).await {
let file_paths = match get_orphan_file_paths(&ctx.library_ctx(), &data.cursor).await {
Ok(file_paths) => file_paths,
Err(e) => {
info!("Error getting orphan file paths: {:#?}", e);
@ -161,7 +168,10 @@ impl StatefulJob for FileIdentifierJob {
.db
.file_path()
.update(
file_path::id::equals(*cas_lookup.get(&existing_file.cas_id).unwrap()),
file_path::location_id_id(
state.init.location.id,
*cas_lookup.get(&existing_file.cas_id).unwrap(),
),
vec![file_path::file_id::set(Some(existing_file.id))],
)
.exec()
@ -221,7 +231,10 @@ impl StatefulJob for FileIdentifierJob {
.db
.file_path()
.update(
file_path::id::equals(*cas_lookup.get(&created_file.cas_id).unwrap()),
file_path::location_id_id(
state.init.location.id,
*cas_lookup.get(&created_file.cas_id).unwrap(),
),
vec![file_path::file_id::set(Some(created_file.id))],
)
.exec()
@ -233,7 +246,7 @@ impl StatefulJob for FileIdentifierJob {
// handle last step
if let Some(last_row) = file_paths.last() {
data.cursor = last_row.id;
data.cursor.file_path_id = last_row.id;
} else {
return Ok(());
}
@ -275,7 +288,7 @@ struct CountRes {
count: Option<usize>,
}
pub async fn count_orphan_file_paths(
async fn count_orphan_file_paths(
ctx: &LibraryContext,
location_id: i64,
) -> Result<usize, prisma_client_rust::QueryError> {
@ -284,14 +297,14 @@ pub async fn count_orphan_file_paths(
"SELECT COUNT(*) AS count FROM file_paths WHERE file_id IS NULL AND is_dir IS FALSE AND location_id = {}",
PrismaValue::Int(location_id)
))
.exec()
.exec()
.await?;
Ok(files_count[0].count.unwrap_or(0))
}
pub async fn get_orphan_file_paths(
async fn get_orphan_file_paths(
ctx: &LibraryContext,
cursor: i32,
cursor: &FilePathIdAndLocationIdCursor,
) -> Result<Vec<file_path::Data>, prisma_client_rust::QueryError> {
info!(
"discovering {} orphan file paths at cursor: {:?}",
@ -304,26 +317,26 @@ pub async fn get_orphan_file_paths(
file_path::is_dir::equals(false),
])
.order_by(file_path::id::order(Direction::Asc))
.cursor(file_path::id::equals(cursor))
.cursor(cursor.into())
.take(CHUNK_SIZE as i64)
.exec()
.await
}
#[derive(Deserialize, Serialize, Debug)]
pub struct CreateFile {
struct CreateFile {
pub cas_id: String,
pub size_in_bytes: i64,
pub date_created: DateTime<FixedOffset>,
}
#[derive(Deserialize, Serialize, Debug)]
pub struct FileCreated {
struct FileCreated {
pub id: i32,
pub cas_id: String,
}
pub async fn prepare_file(
async fn prepare_file(
location_path: impl AsRef<Path>,
file_path: &file_path::Data,
) -> Result<CreateFile, io::Error> {

View file

@ -1,386 +0,0 @@
use crate::{
job::{JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
prisma::location,
sys::create_location,
};
use chrono::{DateTime, Utc};
use prisma_client_rust::{raw, raw::Raw, PrismaValue};
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
ffi::OsStr,
path::{Path, PathBuf},
time::Duration,
};
use tokio::{fs, time::Instant};
use tracing::{error, info};
use walkdir::{DirEntry, WalkDir};
static BATCH_SIZE: usize = 100;
pub const INDEXER_JOB_NAME: &str = "indexer";
#[derive(Clone)]
pub enum ScanProgress {
ChunkCount(usize),
SavedChunks(usize),
Message(String),
}
pub struct IndexerJob {}
#[derive(Serialize, Deserialize, Clone)]
pub struct IndexerJobInit {
pub path: PathBuf,
}
#[derive(Serialize, Deserialize)]
pub struct IndexerJobData {
location: location::Data,
db_write_start: DateTime<Utc>,
scan_read_time: Duration,
total_paths: usize,
}
pub(crate) type IndexerJobStep = Vec<(PathBuf, i32, Option<i32>, bool)>;
impl IndexerJobData {
fn on_scan_progress(ctx: WorkerContext, progress: Vec<ScanProgress>) {
ctx.progress(
progress
.iter()
.map(|p| match p.clone() {
ScanProgress::ChunkCount(c) => JobReportUpdate::TaskCount(c),
ScanProgress::SavedChunks(p) => JobReportUpdate::CompletedTaskCount(p),
ScanProgress::Message(m) => JobReportUpdate::Message(m),
})
.collect(),
)
}
}
#[async_trait::async_trait]
impl StatefulJob for IndexerJob {
type Init = IndexerJobInit;
type Data = IndexerJobData;
type Step = IndexerJobStep;
fn name(&self) -> &'static str {
INDEXER_JOB_NAME
}
// creates a vector of valid path buffers from a directory
async fn init(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
let location = create_location(&ctx.library_ctx(), &state.init.path).await?;
// query db to highers id, so we can increment it for the new files indexed
#[derive(Deserialize, Serialize, Debug)]
struct QueryRes {
id: Option<i32>,
}
// grab the next id so we can increment in memory for batch inserting
let first_file_id = match ctx
.library_ctx()
.db
._query_raw::<QueryRes>(raw!("SELECT MAX(id) id FROM file_paths"))
.exec()
.await
{
Ok(rows) => rows[0].id.unwrap_or(0),
Err(e) => panic!("Error querying for next file id: {:#?}", e),
};
//check is path is a directory
if !state.init.path.is_dir() {
// return Err(anyhow::anyhow!("{} is not a directory", &path));
panic!("{:#?} is not a directory", state.init.path);
}
// spawn a dedicated thread to scan the directory for performance
let path = state.init.path.clone();
let inner_ctx = ctx.clone();
let (paths, scan_start) = tokio::task::spawn_blocking(move || {
// store every valid path discovered
let mut paths: Vec<(PathBuf, i32, Option<i32>, bool)> = Vec::new();
// store a hashmap of directories to their file ids for fast lookup
let mut dirs = HashMap::new();
// begin timer for logging purposes
let scan_start = Instant::now();
let mut next_file_id = first_file_id;
let mut get_id = || {
next_file_id += 1;
next_file_id
};
// walk through directory recursively
for entry in WalkDir::new(&path).into_iter().filter_entry(|dir| {
// check if entry is approved
!is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir)
}) {
// extract directory entry or log and continue if failed
let entry = match entry {
Ok(entry) => entry,
Err(e) => {
error!("Error reading file {}", e);
continue;
}
};
let path = entry.path();
info!("Found filesystem path: {:?}", path);
let parent_path = path
.parent()
.unwrap_or_else(|| Path::new(""))
.to_str()
.unwrap_or("");
let parent_dir_id = dirs.get(parent_path);
let path_str = match path.as_os_str().to_str() {
Some(path_str) => path_str,
None => {
error!("Error reading file {}", &path.display());
continue;
}
};
IndexerJobData::on_scan_progress(
inner_ctx.clone(),
vec![
ScanProgress::Message(format!("Scanning {}", path_str)),
ScanProgress::ChunkCount(paths.len() / BATCH_SIZE),
],
);
let file_id = get_id();
let file_type = entry.file_type();
let is_dir = file_type.is_dir();
if is_dir || file_type.is_file() {
paths.push((path.to_owned(), file_id, parent_dir_id.cloned(), is_dir));
}
if is_dir {
let _path = match path.to_str() {
Some(path) => path.to_owned(),
None => continue,
};
dirs.insert(_path, file_id);
}
}
(paths, scan_start)
})
.await?;
state.data = Some(IndexerJobData {
location,
db_write_start: Utc::now(),
scan_read_time: scan_start.elapsed(),
total_paths: paths.len(),
});
state.steps = paths
.chunks(BATCH_SIZE)
.enumerate()
.map(|(i, chunk)| {
IndexerJobData::on_scan_progress(
ctx.clone(),
vec![
ScanProgress::SavedChunks(i as usize),
ScanProgress::Message(format!(
"Writing {} of {} to db",
i * chunk.len(),
paths.len(),
)),
],
);
chunk.to_vec()
})
.collect();
Ok(())
}
async fn execute_step(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
// vector to store active models
let mut files = Vec::new();
let step = &state.steps[0];
let data = state
.data
.as_ref()
.expect("critical error: missing data on job state");
for (file_path, file_id, parent_dir_id, is_dir) in step {
files.extend(
match prepare_values(file_path, *file_id, &data.location, parent_dir_id, *is_dir)
.await
{
Ok(values) => values.to_vec(),
Err(e) => {
error!("Error creating file model from path {:?}: {}", file_path, e);
continue;
}
},
);
}
let raw = Raw::new(
&format!("
INSERT INTO file_paths (id, is_dir, location_id, materialized_path, name, extension, parent_id, date_created)
VALUES {}
",
vec!["({}, {}, {}, {}, {}, {}, {}, {})"; step.len()].join(", ")
),
files
);
let count = ctx.library_ctx().db._execute_raw(raw).exec().await;
info!("Inserted {:?} records", count);
Ok(())
}
async fn finalize(
&self,
_ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
let data = state
.data
.as_ref()
.expect("critical error: missing data on job state");
info!(
"scan of {:?} completed in {:?}. {:?} files found. db write completed in {:?}",
state.init.path,
data.scan_read_time,
data.total_paths,
Utc::now() - data.db_write_start,
);
Ok(())
}
}
// // PathContext provides the indexer with instruction to handle particular directory structures and identify rich context.
// pub struct PathContext {
// // an app specific key "com.github.repo"
// pub key: String,
// pub name: String,
// pub is_dir: bool,
// // possible file extensions for this path
// pub extensions: Vec<String>,
// // sub-paths that must be found
// pub must_contain_sub_paths: Vec<String>,
// // sub-paths that are ignored
// pub always_ignored_sub_paths: Option<String>,
// }
// reads a file at a path and creates an ActiveModel with metadata
async fn prepare_values(
file_path: impl AsRef<Path>,
id: i32,
location: &location::Data,
parent_id: &Option<i32>,
is_dir: bool,
) -> Result<[PrismaValue; 8], std::io::Error> {
let file_path = file_path.as_ref();
let metadata = fs::metadata(file_path).await?;
let location_path = location.local_path.as_ref().map(PathBuf::from).unwrap();
// let size = metadata.len();
let name;
let extension;
let date_created: DateTime<Utc> = metadata.created().unwrap().into();
// if the 'file_path' is not a directory, then get the extension and name.
// if 'file_path' is a directory, set extension to an empty string to avoid periods in folder names
// - being interpreted as file extensions
if is_dir {
extension = "".to_string();
name = extract_name(file_path.file_name());
} else {
extension = extract_name(file_path.extension());
name = extract_name(file_path.file_stem());
}
let materialized_path = file_path.strip_prefix(location_path).unwrap();
let materialized_path_as_string = materialized_path.to_str().unwrap_or("").to_owned();
let values = [
PrismaValue::Int(id as i64),
PrismaValue::Boolean(metadata.is_dir()),
PrismaValue::Int(location.id as i64),
PrismaValue::String(materialized_path_as_string),
PrismaValue::String(name),
PrismaValue::String(extension.to_lowercase()),
parent_id
.map(|id| PrismaValue::Int(id as i64))
.unwrap_or(PrismaValue::Null),
PrismaValue::DateTime(date_created.into()),
];
Ok(values)
}
// extract name from OsStr returned by PathBuff
fn extract_name(os_string: Option<&OsStr>) -> String {
os_string
.unwrap_or_default()
.to_str()
.unwrap_or_default()
.to_owned()
}
fn is_hidden(entry: &DirEntry) -> bool {
entry
.file_name()
.to_str()
.map(|s| s.starts_with('.'))
.unwrap_or(false)
}
fn is_library(entry: &DirEntry) -> bool {
entry
.path()
.to_str()
// make better this is shit
.map(|s| s.contains("/Library/"))
.unwrap_or(false)
}
fn is_node_modules(entry: &DirEntry) -> bool {
entry
.file_name()
.to_str()
.map(|s| s.contains("node_modules"))
.unwrap_or(false)
}
fn is_app_bundle(entry: &DirEntry) -> bool {
let is_dir = entry.metadata().unwrap().is_dir();
let contains_dot = entry
.file_name()
.to_str()
.map(|s| s.contains(".app") | s.contains(".bundle"))
.unwrap_or(false);
// let is_app_bundle = is_dir && contains_dot;
// if is_app_bundle {
// let path_buff = entry.path();
// let path = path_buff.to_str().unwrap();
// self::path(&path, );
// }
is_dir && contains_dot
}

View file

@ -1,2 +1 @@
pub mod cas;
pub mod indexer;

View file

@ -1,13 +1,12 @@
use crate::{
encode::{ThumbnailJob, THUMBNAIL_JOB_NAME},
file::{
cas::{FileIdentifierJob, IDENTIFIER_JOB_NAME},
indexer::{IndexerJob, INDEXER_JOB_NAME},
},
file::cas::{FileIdentifierJob, IDENTIFIER_JOB_NAME},
job::{worker::Worker, DynJob, Job, JobError},
library::LibraryContext,
location::indexer::indexer_job::{IndexerJob, INDEXER_JOB_NAME},
prisma::{job, node},
};
use int_enum::IntEnum;
use rspc::Type;
use serde::{Deserialize, Serialize};
@ -80,9 +79,13 @@ impl JobManager {
let wrapped_worker = Arc::new(Mutex::new(worker));
Worker::spawn(Arc::clone(&self), Arc::clone(&wrapped_worker), ctx.clone()).await;
running_workers.insert(job_id, wrapped_worker);
if let Err(e) =
Worker::spawn(Arc::clone(&self), Arc::clone(&wrapped_worker), ctx.clone()).await
{
error!("Error spawning worker: {:?}", e);
} else {
running_workers.insert(job_id, wrapped_worker);
}
} else {
self.job_queue.write().await.push_back(job);
}
@ -271,20 +274,14 @@ impl JobReport {
}
pub async fn create(&self, ctx: &LibraryContext) -> Result<(), JobError> {
let mut params = Vec::new();
if self.data.is_some() {
params.push(job::data::set(self.data.clone()))
}
ctx.db
.job()
.create(
self.id.as_bytes().to_vec(),
self.name.clone(),
1,
JobStatus::Running as i32,
node::id::equals(ctx.node_local_id),
params,
vec![job::data::set(self.data.clone())],
)
.exec()
.await?;

View file

@ -1,4 +1,5 @@
use crate::sys::LocationError;
use crate::location::{indexer::IndexerError, LocationError};
use rmp_serde::{decode::Error as DecodeError, encode::Error as EncodeError};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{collections::VecDeque, fmt::Debug};
@ -31,6 +32,8 @@ pub enum JobError {
"Tried to resume a job that doesn't have saved state data: job <name='{1}', uuid='{0}'>"
)]
MissingJobDataState(Uuid, String),
#[error("Indexer error: {0}")]
IndexerError(#[from] IndexerError),
#[error("Job paused")]
Paused(Vec<u8>),
}

View file

@ -2,6 +2,7 @@ use crate::job::{DynJob, JobError, JobManager, JobReportUpdate, JobStatus};
use crate::library::LibraryContext;
use crate::{api::LibraryArgs, invalidate_query};
use std::{sync::Arc, time::Duration};
use tokio::sync::oneshot;
use tokio::{
sync::{
broadcast,
@ -18,9 +19,9 @@ use super::JobReport;
#[derive(Debug)]
pub enum WorkerEvent {
Progressed(Vec<JobReportUpdate>),
Completed,
Failed,
Paused(Vec<u8>),
Completed(oneshot::Sender<()>),
Failed(oneshot::Sender<()>),
Paused(Vec<u8>, oneshot::Sender<()>),
}
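// Each terminal event now carries a `oneshot::Sender` so the job task can wait for the event
// loop to persist the final report before it calls `job_manager.complete`. A minimal,
// self-contained sketch of this acknowledgment pattern (illustrative only; `ack_sketch` is a
// hypothetical helper, not part of this file):
#[allow(dead_code)]
async fn ack_sketch() {
	let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
	tokio::spawn(async move {
		// the event-loop side: persist the job report, then acknowledge
		let _ = done_tx.send(());
	});
	// the job-task side: proceed only once the acknowledgment arrives
	done_rx.await.expect("event loop dropped the acknowledgment");
}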
#[derive(Clone)]
@ -75,7 +76,7 @@ impl Worker {
job_manager: Arc<JobManager>,
worker_mutex: Arc<Mutex<Self>>,
ctx: LibraryContext,
) {
) -> Result<(), JobError> {
let mut worker = worker_mutex.lock().await;
// we capture the worker receiver channel so state can be updated from inside the worker
let worker_events_tx = worker.worker_events_tx.clone();
@ -93,7 +94,7 @@ impl Worker {
let old_status = worker.report.status;
worker.report.status = JobStatus::Running;
if matches!(old_status, JobStatus::Queued) {
worker.report.create(&ctx).await.unwrap();
worker.report.create(&ctx).await?;
}
drop(worker);
@ -133,29 +134,36 @@ impl Worker {
}
});
let (done_tx, done_rx) = oneshot::channel();
if let Err(e) = job.run(worker_ctx.clone()).await {
if let JobError::Paused(state) = e {
worker_ctx
.events_tx
.send(WorkerEvent::Paused(state))
.send(WorkerEvent::Paused(state, done_tx))
.expect("critical error: failed to send worker pause event");
} else {
error!("job '{}' failed with error: {:#?}", job_id, e);
worker_ctx
.events_tx
.send(WorkerEvent::Failed)
.send(WorkerEvent::Failed(done_tx))
.expect("critical error: failed to send worker fail event");
}
} else {
// handle completion
worker_ctx
.events_tx
.send(WorkerEvent::Completed)
.send(WorkerEvent::Completed(done_tx))
.expect("critical error: failed to send worker complete event");
}
if let Err(e) = done_rx.await {
error!("failed to wait for worker completion: {:#?}", e);
}
job_manager.complete(&ctx, job_id).await;
});
Ok(())
}
async fn track_progress(
@ -192,77 +200,72 @@ impl Worker {
invalidate_query!(
library,
"jobs.getRunning": LibraryArgs<()>,
LibraryArgs {
library_id: library.id,
arg: ()
}
LibraryArgs::new(library.id, ())
);
}
WorkerEvent::Completed => {
WorkerEvent::Completed(done_tx) => {
worker.report.status = JobStatus::Completed;
worker.report.data = None;
worker
.report
.update(&library)
.await
.expect("critical error: failed to update job report");
if let Err(e) = worker.report.update(&library).await {
error!("failed to update job report: {:#?}", e);
}
invalidate_query!(
library,
"jobs.getRunning": LibraryArgs<()>,
LibraryArgs {
library_id: library.id,
arg: ()
}
LibraryArgs::new(library.id, ())
);
invalidate_query!(
library,
"jobs.getHistory": LibraryArgs<()>,
LibraryArgs {
library_id: library.id,
arg: ()
}
LibraryArgs::new(library.id, ())
);
info!("{}", worker.report);
done_tx
.send(())
.expect("critical error: failed to send worker completion");
break;
}
WorkerEvent::Failed => {
WorkerEvent::Failed(done_tx) => {
worker.report.status = JobStatus::Failed;
worker.report.data = None;
worker
.report
.update(&library)
.await
.expect("critical error: failed to update job report");
if let Err(e) = worker.report.update(&library).await {
error!("failed to update job report: {:#?}", e);
}
invalidate_query!(library, "library.get": (), ());
warn!("{}", worker.report);
done_tx
.send(())
.expect("critical error: failed to send worker completion");
break;
}
WorkerEvent::Paused(state) => {
WorkerEvent::Paused(state, done_tx) => {
worker.report.status = JobStatus::Paused;
worker.report.data = Some(state);
worker
.report
.update(&library)
.await
.expect("critical error: failed to update job report");
if let Err(e) = worker.report.update(&library).await {
error!("failed to update job report: {:#?}", e);
}
info!("{}", worker.report);
invalidate_query!(
library,
"jobs.getHistory": LibraryArgs<()>,
LibraryArgs {
library_id: library.id,
arg: ()
}
LibraryArgs::new(library.id, ()),
);
done_tx
.send(())
.expect("critical error: failed to send worker completion");
break;
}
}

View file

@ -14,10 +14,11 @@ pub(crate) mod encode;
pub(crate) mod file;
pub(crate) mod job;
pub(crate) mod library;
pub(crate) mod location;
pub(crate) mod node;
pub(crate) mod prisma;
pub(crate) mod sys;
pub(crate) mod util;
pub(crate) mod volume;
#[derive(Clone)]
pub struct NodeContext {

View file

@ -1,18 +1,24 @@
use crate::{
invalidate_query,
node::Platform,
prisma::node,
util::{
db::load_and_migrate,
seeder::{indexer_rules_seeder, SeederError},
},
NodeContext,
};
use std::{
env, fs, io,
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
};
use thiserror::Error;
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::{
invalidate_query, node::Platform, prisma::node, util::db::load_and_migrate, NodeContext,
};
use super::{LibraryConfig, LibraryConfigWrapped, LibraryContext};
/// LibraryManager is a singleton that manages all libraries for a node.
@ -41,11 +47,17 @@ pub enum LibraryManagerError {
Uuid(#[from] uuid::Error),
#[error("error opening database as the path contains non-UTF-8 characters")]
InvalidDatabasePath(PathBuf),
#[error("Failed to run seeder: {0}")]
Seeder(#[from] SeederError),
}
impl From<LibraryManagerError> for rspc::Error {
fn from(error: LibraryManagerError) -> Self {
rspc::Error::new(rspc::ErrorCode::InternalServerError, error.to_string())
rspc::Error::with_cause(
rspc::ErrorCode::InternalServerError,
error.to_string(),
error,
)
}
}
@ -254,6 +266,9 @@ impl LibraryManager {
.exec()
.await?;
// Run seeders
indexer_rules_seeder(&db).await?;
Ok(LibraryContext {
id,
config,

View file

@ -0,0 +1,61 @@
use rspc::{self, ErrorCode};
use std::path::PathBuf;
use thiserror::Error;
use tokio::io;
use uuid::Uuid;
/// Error type for location related errors
#[derive(Error, Debug)]
pub enum LocationError {
// Not Found errors
#[error("Location not found (path: {0:?})")]
PathNotFound(PathBuf),
#[error("Location not found (uuid: {0})")]
UuidNotFound(Uuid),
#[error("Location not found (id: {0})")]
IdNotFound(i32),
// User errors
#[error("Location not a directory (path: {0:?})")]
NotDirectory(PathBuf),
#[error("Missing local_path (id: {0})")]
MissingLocalPath(i32),
// Internal Errors
#[error("Failed to create location (uuid {uuid:?})")]
CreateFailure { uuid: Uuid },
#[error("Failed to read location dotfile (path: {1:?}); (error: {0:?})")]
DotfileReadFailure(io::Error, PathBuf),
#[error("Failed to serialize dotfile for location (at path: {1:?}); (error: {0:?})")]
DotfileSerializeFailure(serde_json::Error, PathBuf),
#[error("Dotfile location is read only (at path: {0:?})")]
ReadonlyDotFileLocationFailure(PathBuf),
#[error("Failed to write dotfile (path: {1:?}); (error: {0:?})")]
DotfileWriteFailure(io::Error, PathBuf),
#[error("Failed to open file from local os (error: {0:?})")]
FileReadError(io::Error),
#[error("Failed to read mounted volumes from local os (error: {0:?})")]
VolumeReadError(String),
#[error("Failed to connect to database (error: {0:?})")]
IOError(io::Error),
#[error("Database error (error: {0:?})")]
DatabaseError(#[from] prisma_client_rust::QueryError),
}
impl From<LocationError> for rspc::Error {
fn from(err: LocationError) -> Self {
match err {
LocationError::PathNotFound(_)
| LocationError::UuidNotFound(_)
| LocationError::IdNotFound(_) => {
rspc::Error::with_cause(ErrorCode::NotFound, err.to_string(), err)
}
LocationError::NotDirectory(_) | LocationError::MissingLocalPath(_) => {
rspc::Error::with_cause(ErrorCode::BadRequest, err.to_string(), err)
}
_ => rspc::Error::with_cause(ErrorCode::InternalServerError, err.to_string(), err),
}
}
}

View file

@ -0,0 +1,319 @@
use crate::{
job::{JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
prisma::{file_path, location},
};
use chrono::{DateTime, Utc};
use itertools::Itertools;
use prisma_client_rust::Direction;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, ffi::OsStr, path::PathBuf, time::Duration};
use tokio::time::Instant;
use tracing::info;
use super::{
indexer_rules::IndexerRule,
walk::{walk, WalkEntry},
};
/// BATCH_SIZE is the number of files to index at each step, writing the chunk of files metadata in the database.
const BATCH_SIZE: usize = 1000;
pub const INDEXER_JOB_NAME: &str = "indexer";
#[derive(Clone)]
pub enum ScanProgress {
ChunkCount(usize),
SavedChunks(usize),
Message(String),
}
/// An `IndexerJob` is a stateful job that walks a directory and indexes all files.
/// First it walks the directory and generates a list of files to index, chunked into
/// batches of [`BATCH_SIZE`]. Then, for each chunk, it writes the file metadata to the database.
pub struct IndexerJob;
/// `IndexerJobInit` receives a `location::Data` object to be indexed
#[derive(Serialize, Deserialize, Clone)]
pub struct IndexerJobInit {
pub location: location::Data,
}
/// `IndexerJobData` contains the state of the indexer job, which includes a `location_path` that
/// is cached and cast to a `PathBuf` from the `local_path` column in the `location` table. It also
/// contains some metadata for logging purposes.
#[derive(Serialize, Deserialize)]
pub struct IndexerJobData {
location_path: PathBuf,
db_write_start: DateTime<Utc>,
scan_read_time: Duration,
total_paths: usize,
}
/// `IndexerJobStep` is a type alias, specifying that each step of the [`IndexerJob`] is a vector of
/// `IndexerJobStepEntry`. The size of this vector is given by the [`BATCH_SIZE`] constant.
pub type IndexerJobStep = Vec<IndexerJobStepEntry>;
/// `IndexerJobStepEntry` represents a single file to be indexed, along with the metadata to be written
/// to the `file_path` table in the database
#[derive(Serialize, Deserialize)]
pub struct IndexerJobStepEntry {
path: PathBuf,
created_at: DateTime<Utc>,
file_id: i32,
parent_id: Option<i32>,
is_dir: bool,
}
impl IndexerJobData {
fn on_scan_progress(ctx: WorkerContext, progress: Vec<ScanProgress>) {
ctx.progress(
progress
.iter()
.map(|p| match p.clone() {
ScanProgress::ChunkCount(c) => JobReportUpdate::TaskCount(c),
ScanProgress::SavedChunks(p) => JobReportUpdate::CompletedTaskCount(p),
ScanProgress::Message(m) => JobReportUpdate::Message(m),
})
.collect(),
)
}
}
#[async_trait::async_trait]
impl StatefulJob for IndexerJob {
type Init = IndexerJobInit;
type Data = IndexerJobData;
type Step = IndexerJobStep;
fn name(&self) -> &'static str {
INDEXER_JOB_NAME
}
/// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`.
async fn init(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
let location_path = state
.init
.location
.local_path
.as_ref()
.map(PathBuf::from)
.unwrap();
// query db for the highest file_path id, so we can increment it for the new files indexed
#[derive(Deserialize, Serialize, Debug)]
struct QueryRes {
id: Option<i32>,
}
// TODO: use a select to fetch only the id instead of entire record when prisma supports it
// grab the next id so we can increment in memory for batch inserting
let first_file_id = ctx
.library_ctx()
.db
.file_path()
.find_first(vec![])
.order_by(file_path::id::order(Direction::Desc))
.exec()
.await?
.map(|r| r.id)
.unwrap_or(0);
let mut indexer_rules_by_kind = HashMap::new();
for indexer_rule_in_location in state
.init
.location
.indexer_rules
.as_ref()
.expect("critical error: indexer job init received a location object without indexes_rules_in_location being fetched")
{
let indexer_rule_data = indexer_rule_in_location.indexer_rule.as_ref()
.expect("critical error: indexer job init received a indexes_rules_in_location object without indexes_rules being fetched");
let indexer_rule = IndexerRule::try_from(indexer_rule_data.as_ref())?;
indexer_rules_by_kind.entry(indexer_rule.kind).or_insert(vec![]).push(indexer_rule);
}
let scan_start = Instant::now();
let inner_ctx = ctx.clone();
let paths = walk(
location_path.clone(),
&indexer_rules_by_kind,
move |path, total_entries| {
IndexerJobData::on_scan_progress(
inner_ctx.clone(),
vec![
ScanProgress::Message(format!("Scanning {}", path.display())),
ScanProgress::ChunkCount(total_entries / BATCH_SIZE),
],
);
},
)
.await?;
let total_paths = paths.len();
let mut dirs_ids = HashMap::new();
let paths_entries = paths
.into_iter()
.zip(first_file_id..(first_file_id + total_paths as i32))
.map(
|(
WalkEntry {
path,
is_dir,
created_at,
},
file_id,
)| {
let parent_id = if let Some(parent_dir) = path.parent() {
dirs_ids.get(parent_dir).copied()
} else {
None
};
dirs_ids.insert(path.clone(), file_id);
IndexerJobStepEntry {
path,
created_at,
file_id,
parent_id,
is_dir,
}
},
)
.collect::<Vec<_>>();
let total_entries = paths_entries.len();
state.data = Some(IndexerJobData {
location_path,
db_write_start: Utc::now(),
scan_read_time: scan_start.elapsed(),
total_paths: total_entries,
});
state.steps = paths_entries
.into_iter()
.chunks(BATCH_SIZE)
.into_iter()
.enumerate()
.map(|(i, chunk)| {
let chunk_steps = chunk.collect::<Vec<_>>();
IndexerJobData::on_scan_progress(
ctx.clone(),
vec![
ScanProgress::SavedChunks(i as usize),
ScanProgress::Message(format!(
"Writing {} of {} to db",
i * chunk_steps.len(),
total_entries,
)),
],
);
chunk_steps
})
.collect();
Ok(())
}
/// Process each chunk of entries in the indexer job, writing to the `file_path` table
async fn execute_step(
&self,
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
let location_path = &state
.data
.as_ref()
.expect("critical error: missing data on job state")
.location_path;
let count = ctx
.library_ctx()
.db
.file_path()
.create_many(
state.steps[0]
.iter()
.map(|entry| {
let name;
let extension;
// if 'entry.path' is a directory, set extension to an empty string to
// avoid periods in folder names being interpreted as file extensions
if entry.is_dir {
extension = "".to_string();
name = extract_name(entry.path.file_name());
} else {
// if the 'entry.path' is not a directory, then get the extension and name.
extension = extract_name(entry.path.extension());
name = extract_name(entry.path.file_stem());
}
let materialized_path = entry
.path
.strip_prefix(location_path)
.unwrap()
.to_string_lossy()
.to_string();
file_path::create(
entry.file_id,
materialized_path,
name,
vec![
file_path::is_dir::set(entry.is_dir),
file_path::extension::set(Some(extension)),
file_path::location_id::set(state.init.location.id),
file_path::parent_id::set(entry.parent_id),
file_path::date_created::set(entry.created_at.into()),
],
)
})
.collect(),
)
.exec()
.await?;
info!("Inserted {count} records");
Ok(())
}
/// Logs some metadata about the indexer job
async fn finalize(
&self,
_ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
let data = state
.data
.as_ref()
.expect("critical error: missing data on job state");
info!(
"scan of {} completed in {:?}. {:?} files found. db write completed in {:?}",
state.init.location.local_path.as_ref().unwrap(),
data.scan_read_time,
data.total_paths,
(Utc::now() - data.db_write_start)
.to_std()
.expect("critical error: non-negative duration"),
);
Ok(())
}
}
/// Extract name from the OsStr returned by PathBuf
fn extract_name(os_string: Option<&OsStr>) -> String {
os_string
.unwrap_or_default()
.to_str()
.unwrap_or_default()
.to_owned()
}

View file

@ -0,0 +1,382 @@
use crate::{
library::LibraryContext,
location::indexer::IndexerError,
prisma::{indexer_rule, PrismaClient},
};
use chrono::{DateTime, Utc};
use globset::Glob;
use int_enum::IntEnum;
use rmp_serde;
use rspc::Type;
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, path::Path};
use tokio::fs;
/// `IndexerRuleCreateArgs` is the argument received from the client using rspc to create a new indexer rule.
/// Note that `parameters` field **MUST** be a JSON object serialized to bytes.
///
/// In case of `RuleKind::AcceptFilesByGlob` or `RuleKind::RejectFilesByGlob`, it will be a
/// single string containing a glob pattern.
///
/// In case of `RuleKind::AcceptIfChildrenDirectoriesArePresent` or `RuleKind::RejectIfChildrenDirectoriesArePresent` the
/// `parameters` field must be a vector of strings containing the names of the directories.
#[derive(Type, Deserialize)]
pub struct IndexerRuleCreateArgs {
pub kind: RuleKind,
pub name: String,
pub parameters: Vec<u8>,
}
impl IndexerRuleCreateArgs {
pub async fn create(self, ctx: &LibraryContext) -> Result<indexer_rule::Data, IndexerError> {
let parameters = match self.kind {
RuleKind::AcceptFilesByGlob | RuleKind::RejectFilesByGlob => rmp_serde::to_vec(
&Glob::new(&serde_json::from_slice::<String>(&self.parameters)?)?,
)?,
RuleKind::AcceptIfChildrenDirectoriesArePresent
| RuleKind::RejectIfChildrenDirectoriesArePresent => {
rmp_serde::to_vec(&serde_json::from_slice::<Vec<String>>(&self.parameters)?)?
}
};
ctx.db
.indexer_rule()
.create(self.kind as i32, self.name, parameters, vec![])
.exec()
.await
.map_err(Into::into)
}
}
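// A minimal sketch of how a caller could build the JSON-serialized `parameters` described
// above (illustrative only; the rule names and patterns here are made up, not shipped defaults):
#[allow(dead_code)]
fn example_create_args() -> Result<(IndexerRuleCreateArgs, IndexerRuleCreateArgs), serde_json::Error> {
	// Glob kinds expect a single JSON string containing the glob pattern
	let reject_hidden = IndexerRuleCreateArgs {
		kind: RuleKind::RejectFilesByGlob,
		name: "ignore hidden files".to_string(),
		parameters: serde_json::to_vec("**/.*")?,
	};
	// Children kinds expect a JSON array of directory names
	let git_projects = IndexerRuleCreateArgs {
		kind: RuleKind::AcceptIfChildrenDirectoriesArePresent,
		name: "git projects".to_string(),
		parameters: serde_json::to_vec(&[".git".to_string()])?,
	};
	Ok((reject_hidden, git_projects))
}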
#[repr(i32)]
#[non_exhaustive]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Type, Eq, PartialEq, IntEnum, Hash)]
pub enum RuleKind {
AcceptFilesByGlob = 0,
RejectFilesByGlob = 1,
AcceptIfChildrenDirectoriesArePresent = 2,
RejectIfChildrenDirectoriesArePresent = 3,
}
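// `RuleKind` is persisted as its `i32` discriminant (`kind as i32`) and read back with
// `RuleKind::from_int`, which surfaces unknown values as `IndexerError::InvalidRuleKindInt`.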
/// `ParametersPerKind` is a mapping from `RuleKind` to the parameters required for each kind of rule.
/// In case of doubt about globs, consult <https://docs.rs/globset/latest/globset/#syntax>
///
/// We store globs directly in the database, serialized using rmp_serde.
///
/// In case of `ParametersPerKind::AcceptIfChildrenDirectoriesArePresent` or `ParametersPerKind::RejectIfChildrenDirectoriesArePresent`
/// we first convert the set of directory names to a vector and then serialize it.
#[derive(Debug)]
pub enum ParametersPerKind {
AcceptFilesByGlob(Glob),
RejectFilesByGlob(Glob),
AcceptIfChildrenDirectoriesArePresent(HashSet<String>),
RejectIfChildrenDirectoriesArePresent(HashSet<String>),
}
impl ParametersPerKind {
async fn apply(&self, source: impl AsRef<Path>) -> Result<bool, IndexerError> {
match self {
ParametersPerKind::AcceptIfChildrenDirectoriesArePresent(children) => {
accept_dir_for_its_children(source, children).await
}
ParametersPerKind::RejectIfChildrenDirectoriesArePresent(children) => {
reject_dir_for_its_children(source, children).await
}
ParametersPerKind::AcceptFilesByGlob(glob) => accept_by_glob(source, glob),
ParametersPerKind::RejectFilesByGlob(glob) => reject_by_glob(source, glob),
}
}
fn serialize(self) -> Result<Vec<u8>, IndexerError> {
match self {
Self::AcceptFilesByGlob(glob) | Self::RejectFilesByGlob(glob) => {
rmp_serde::to_vec_named(&glob).map_err(Into::into)
}
Self::AcceptIfChildrenDirectoriesArePresent(children)
| Self::RejectIfChildrenDirectoriesArePresent(children) => {
rmp_serde::to_vec(&children.into_iter().collect::<Vec<_>>()).map_err(Into::into)
}
}
}
}
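// A minimal sketch of the glob round-trip described above: the pattern is stored with
// `rmp_serde::to_vec_named` and read back with `rmp_serde::from_slice` before matching
// (illustrative only; the pattern and path below are made up):
#[allow(dead_code)]
fn example_glob_round_trip() -> Result<bool, IndexerError> {
	let glob = Glob::new("**/*.rs")?;
	let stored: Vec<u8> = rmp_serde::to_vec_named(&glob)?;
	let restored: Glob = rmp_serde::from_slice(&stored)?;
	Ok(restored.compile_matcher().is_match("src/main.rs"))
}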
#[derive(Debug)]
pub struct IndexerRule {
pub id: Option<i32>,
pub kind: RuleKind,
pub name: String,
pub parameters: ParametersPerKind,
pub date_created: DateTime<Utc>,
pub date_modified: DateTime<Utc>,
}
impl IndexerRule {
pub fn new(kind: RuleKind, name: String, parameters: ParametersPerKind) -> Self {
Self {
id: None,
kind,
name,
parameters,
date_created: Utc::now(),
date_modified: Utc::now(),
}
}
pub async fn apply(&self, source: impl AsRef<Path>) -> Result<bool, IndexerError> {
self.parameters.apply(source).await
}
pub async fn save(self, client: &PrismaClient) -> Result<(), IndexerError> {
if let Some(id) = self.id {
client
.indexer_rule()
.upsert(
indexer_rule::id::equals(id),
(
self.kind as i32,
self.name,
self.parameters.serialize()?,
vec![],
),
vec![indexer_rule::date_modified::set(Utc::now().into())],
)
.exec()
.await?;
} else {
client
.indexer_rule()
.create(
self.kind as i32,
self.name,
self.parameters.serialize()?,
vec![],
)
.exec()
.await?;
}
Ok(())
}
}
impl TryFrom<&indexer_rule::Data> for IndexerRule {
type Error = IndexerError;
fn try_from(data: &indexer_rule::Data) -> Result<Self, Self::Error> {
let kind = RuleKind::from_int(data.kind)?;
Ok(Self {
id: Some(data.id),
kind,
name: data.name.clone(),
parameters: match kind {
RuleKind::AcceptFilesByGlob | RuleKind::RejectFilesByGlob => {
let glob_str = rmp_serde::from_slice(&data.parameters)?;
if matches!(kind, RuleKind::AcceptFilesByGlob) {
ParametersPerKind::AcceptFilesByGlob(glob_str)
} else {
ParametersPerKind::RejectFilesByGlob(glob_str)
}
}
RuleKind::AcceptIfChildrenDirectoriesArePresent
| RuleKind::RejectIfChildrenDirectoriesArePresent => {
let childrens = rmp_serde::from_slice::<Vec<String>>(&data.parameters)?
.into_iter()
.collect();
if matches!(kind, RuleKind::AcceptIfChildrenDirectoriesArePresent) {
ParametersPerKind::AcceptIfChildrenDirectoriesArePresent(childrens)
} else {
ParametersPerKind::RejectIfChildrenDirectoriesArePresent(childrens)
}
}
},
date_created: data.date_created.into(),
date_modified: data.date_modified.into(),
})
}
}
impl TryFrom<indexer_rule::Data> for IndexerRule {
type Error = IndexerError;
fn try_from(data: indexer_rule::Data) -> Result<Self, Self::Error> {
Self::try_from(&data)
}
}
fn accept_by_glob(source: impl AsRef<Path>, glob: &Glob) -> Result<bool, IndexerError> {
Ok(glob.compile_matcher().is_match(source.as_ref()))
}
fn reject_by_glob(source: impl AsRef<Path>, reject_glob: &Glob) -> Result<bool, IndexerError> {
Ok(!reject_glob.compile_matcher().is_match(source.as_ref()))
}
async fn accept_dir_for_its_children(
source: impl AsRef<Path>,
children: &HashSet<String>,
) -> Result<bool, IndexerError> {
let source = source.as_ref();
let mut read_dir = fs::read_dir(source).await?;
while let Some(entry) = read_dir.next_entry().await? {
if entry.metadata().await?.is_dir()
&& children.contains(entry.file_name().to_string_lossy().as_ref())
{
return Ok(true);
}
}
Ok(false)
}
async fn reject_dir_for_its_children(
source: impl AsRef<Path>,
children: &HashSet<String>,
) -> Result<bool, IndexerError> {
let source = source.as_ref();
let mut read_dir = fs::read_dir(source).await?;
while let Some(entry) = read_dir.next_entry().await? {
if entry.metadata().await?.is_dir()
&& children.contains(entry.file_name().to_string_lossy().as_ref())
{
return Ok(false);
}
}
Ok(true)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
use tokio::fs;
#[tokio::test]
async fn test_reject_hidden_file() {
let hidden = Path::new(".hidden.txt");
let normal = Path::new("normal.txt");
let hidden_inner_dir = Path::new("/test/.hidden/");
let hidden_inner_file = Path::new("/test/.hidden/file.txt");
let normal_inner_dir = Path::new("/test/normal/");
let normal_inner_file = Path::new("/test/normal/inner.txt");
let rule = IndexerRule::new(
RuleKind::RejectFilesByGlob,
"ignore hidden files".to_string(),
ParametersPerKind::RejectFilesByGlob(Glob::new("**/.*").unwrap()),
);
assert!(!rule.apply(hidden).await.unwrap());
assert!(rule.apply(normal).await.unwrap());
assert!(!rule.apply(hidden_inner_dir).await.unwrap());
assert!(!rule.apply(hidden_inner_file).await.unwrap());
assert!(rule.apply(normal_inner_dir).await.unwrap());
assert!(rule.apply(normal_inner_file).await.unwrap());
}
#[tokio::test]
async fn test_reject_specific_dir() {
let project_file = Path::new("/test/project/src/main.rs");
let project_build_dir = Path::new("/test/project/target");
let project_build_dir_inner = Path::new("/test/project/target/debug/");
let rule = IndexerRule::new(
RuleKind::RejectFilesByGlob,
"ignore build directory".to_string(),
ParametersPerKind::RejectFilesByGlob(Glob::new("{**/target/*,**/target}").unwrap()),
);
assert!(rule.apply(project_file).await.unwrap());
assert!(!rule.apply(project_build_dir).await.unwrap());
assert!(!rule.apply(project_build_dir_inner).await.unwrap());
}
#[tokio::test]
async fn test_only_photos() {
let text = Path::new("file.txt");
let png = Path::new("photo1.png");
let jpg = Path::new("photo2.jpg");
let jpeg = Path::new("photo3.jpeg");
let inner_text = Path::new("/test/file.txt");
let inner_png = Path::new("/test/photo1.png");
let inner_jpg = Path::new("/test/photo2.jpg");
let inner_jpeg = Path::new("/test/photo3.jpeg");
let many_inner_dirs_text = Path::new("/test/1/2/3/4/4/5/6/file.txt");
let many_inner_dirs_png = Path::new("/test/1/2/3/4/4/5/6/photo1.png");
let rule = IndexerRule::new(
RuleKind::AcceptFilesByGlob,
"only photos".to_string(),
ParametersPerKind::AcceptFilesByGlob(Glob::new("*.{jpg,png,jpeg}").unwrap()),
);
assert!(!rule.apply(text).await.unwrap());
assert!(rule.apply(png).await.unwrap());
assert!(rule.apply(jpg).await.unwrap());
assert!(rule.apply(jpeg).await.unwrap());
assert!(!rule.apply(inner_text).await.unwrap());
assert!(rule.apply(inner_png).await.unwrap());
assert!(rule.apply(inner_jpg).await.unwrap());
assert!(rule.apply(inner_jpeg).await.unwrap());
assert!(!rule.apply(many_inner_dirs_text).await.unwrap());
assert!(rule.apply(many_inner_dirs_png).await.unwrap());
}
#[tokio::test]
async fn test_directory_has_children() {
let root = tempdir().unwrap();
let project1 = root.path().join("project1");
let project2 = root.path().join("project2");
let not_project = root.path().join("not_project");
fs::create_dir(&project1).await.unwrap();
fs::create_dir(&project2).await.unwrap();
fs::create_dir(&not_project).await.unwrap();
fs::create_dir(project1.join(".git")).await.unwrap();
fs::create_dir(project2.join(".git")).await.unwrap();
fs::create_dir(project2.join("books")).await.unwrap();
let childrens = [".git".to_string()].into_iter().collect::<HashSet<_>>();
let rule = IndexerRule::new(
RuleKind::AcceptIfChildrenDirectoriesArePresent,
"git projects".to_string(),
ParametersPerKind::AcceptIfChildrenDirectoriesArePresent(childrens),
);
assert!(rule.apply(project1).await.unwrap());
assert!(rule.apply(project2).await.unwrap());
assert!(!rule.apply(not_project).await.unwrap());
}
#[tokio::test]
async fn test_reject_directory_by_its_children() {
let root = tempdir().unwrap();
let project1 = root.path().join("project1");
let project2 = root.path().join("project2");
let not_project = root.path().join("not_project");
fs::create_dir(&project1).await.unwrap();
fs::create_dir(&project2).await.unwrap();
fs::create_dir(&not_project).await.unwrap();
fs::create_dir(project1.join(".git")).await.unwrap();
fs::create_dir(project2.join(".git")).await.unwrap();
fs::create_dir(project2.join("books")).await.unwrap();
let childrens = [".git".to_string()].into_iter().collect::<HashSet<_>>();
let rule = IndexerRule::new(
RuleKind::RejectIfChildrenDirectoriesArePresent,
"git projects".to_string(),
ParametersPerKind::RejectIfChildrenDirectoriesArePresent(childrens),
);
assert!(!rule.apply(project1).await.unwrap());
assert!(!rule.apply(project2).await.unwrap());
assert!(rule.apply(not_project).await.unwrap());
}
}

View file

@ -0,0 +1,54 @@
pub mod indexer_job;
pub mod indexer_rules;
mod walk;
use globset::Error;
use indexer_rules::RuleKind;
use int_enum::IntEnumError;
use rmp_serde::{decode::Error as RMPDecodeError, encode::Error as RMPEncodeError};
use rspc::ErrorCode;
use serde_json::Error as SerdeJsonError;
use std::io;
use thiserror::Error;
/// Error type for the indexer module
#[derive(Error, Debug)]
pub enum IndexerError {
// Not Found errors
#[error("Indexer rule not found: <id={0}>")]
IndexerRuleNotFound(i32),
// User errors
#[error("Invalid indexer rule kind integer: {0}")]
InvalidRuleKindInt(#[from] IntEnumError<RuleKind>),
#[error("Glob builder error: {0}")]
GlobBuilderError(#[from] Error),
// Internal Errors
#[error("Database error: {0}")]
DatabaseError(#[from] prisma_client_rust::QueryError),
#[error("I/O error: {0}")]
IOError(#[from] io::Error),
#[error("Indexer rule parameters json serialization error: {0}")]
RuleParametersSerdeJson(#[from] SerdeJsonError),
#[error("Indexer rule parameters encode error: {0}")]
RuleParametersRMPEncode(#[from] RMPEncodeError),
#[error("Indexer rule parameters decode error: {0}")]
RuleParametersRMPDecode(#[from] RMPDecodeError),
}
impl From<IndexerError> for rspc::Error {
fn from(err: IndexerError) -> Self {
match err {
IndexerError::IndexerRuleNotFound(_) => {
rspc::Error::with_cause(ErrorCode::NotFound, err.to_string(), err)
}
IndexerError::InvalidRuleKindInt(_) | IndexerError::GlobBuilderError(_) => {
rspc::Error::with_cause(ErrorCode::BadRequest, err.to_string(), err)
}
_ => rspc::Error::with_cause(ErrorCode::InternalServerError, err.to_string(), err),
}
}
}

View file

@ -0,0 +1,552 @@
use chrono::{DateTime, Utc};
use std::{
cmp::Ordering,
collections::{HashMap, VecDeque},
hash::{Hash, Hasher},
path::{Path, PathBuf},
};
use tokio::fs;
use tracing::{debug, error};
use super::{
indexer_rules::{IndexerRule, RuleKind},
IndexerError,
};
/// `WalkEntry` represents a single path in the filesystem. For comparison purposes, we only
/// consider the path itself, not the metadata.
#[derive(Clone, Debug)]
pub(super) struct WalkEntry {
pub(super) path: PathBuf,
pub(super) is_dir: bool,
pub(super) created_at: DateTime<Utc>,
}
impl PartialEq for WalkEntry {
fn eq(&self, other: &Self) -> bool {
self.path == other.path
}
}
impl Eq for WalkEntry {}
impl Hash for WalkEntry {
fn hash<H: Hasher>(&self, state: &mut H) {
self.path.hash(state);
}
}
impl PartialOrd for WalkEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
self.path.partial_cmp(&other.path)
}
}
impl Ord for WalkEntry {
fn cmp(&self, other: &Self) -> Ordering {
self.path.cmp(&other.path)
}
}
/// This function walks through the filesystem, applying the rules to each entry and then returning
/// a list of accepted entries. The implementation contains inline comments explaining the less
/// obvious decisions.
pub(super) async fn walk(
root: PathBuf,
rules_per_kind: &HashMap<RuleKind, Vec<IndexerRule>>,
update_notifier: impl Fn(&Path, usize),
) -> Result<Vec<WalkEntry>, IndexerError> {
let mut to_walk = VecDeque::with_capacity(1);
to_walk.push_back((root.clone(), None));
let mut indexed_paths = HashMap::new();
while let Some((current_path, parent_dir_accepted_by_its_children)) = to_walk.pop_front() {
let mut read_dir = match fs::read_dir(&current_path).await {
Ok(read_dir) => read_dir,
Err(e) => {
error!(
"Error reading directory {}: {:#?}",
current_path.display(),
e
);
continue;
}
};
// Marking with a loop label here so that, on rejection or errors, we can continue with the next entry
'entries: loop {
let entry = match read_dir.next_entry().await {
Ok(Some(entry)) => entry,
Ok(None) => break,
Err(e) => {
error!(
"Error reading entry in {}: {:#?}",
current_path.display(),
e
);
continue;
}
};
// Accept by children has three states:
// None if we don't know yet or if this check doesn't apply
// Some(true) if this check applies and it passes
// Some(false) if this check applies and it was rejected
// and we pass the current parent state to its children
let mut accept_by_children_dir = parent_dir_accepted_by_its_children;
let current_path = entry.path();
update_notifier(&current_path, indexed_paths.len());
debug!(
"Current filesystem path: {}, accept_by_children_dir: {:#?}",
current_path.display(),
accept_by_children_dir
);
if let Some(reject_rules) = rules_per_kind.get(&RuleKind::RejectFilesByGlob) {
for reject_rule in reject_rules {
// It's ok to unwrap here, reject rules are infallible
if !reject_rule.apply(&current_path).await.unwrap() {
debug!(
"Path {} rejected by rule {}",
current_path.display(),
reject_rule.name
);
continue 'entries;
}
}
}
let metadata = entry.metadata().await?;
// TODO: Hard ignoring symlinks for now, but this should be configurable
if metadata.is_symlink() {
continue 'entries;
}
let is_dir = metadata.is_dir();
if is_dir {
// If it is a directory, first we check if we must reject it and its children entirely
if let Some(reject_by_children_rules) =
rules_per_kind.get(&RuleKind::RejectIfChildrenDirectoriesArePresent)
{
for reject_by_children_rule in reject_by_children_rules {
match reject_by_children_rule.apply(&current_path).await {
Ok(false) => {
debug!(
"Path {} rejected by rule {}",
current_path.display(),
reject_by_children_rule.name
);
continue 'entries;
}
Ok(true) => {}
Err(e) => {
error!(
"Error applying rule {} to path {}: {:#?}",
reject_by_children_rule.name,
current_path.display(),
e
);
continue 'entries;
}
}
}
}
// Then we check if we must accept it and its children
if let Some(accept_by_children_rules) =
rules_per_kind.get(&RuleKind::AcceptIfChildrenDirectoriesArePresent)
{
for accept_by_children_rule in accept_by_children_rules {
match accept_by_children_rule.apply(&current_path).await {
Ok(true) => {
accept_by_children_dir = Some(true);
break;
}
Ok(false) => {}
Err(e) => {
error!(
"Error applying rule {} to path {}: {:#?}",
accept_by_children_rule.name,
current_path.display(),
e
);
continue 'entries;
}
}
}
// If it wasn't accepted then we mark as rejected
if accept_by_children_dir.is_none() {
debug!(
"Path {} rejected because it didn't passed in any AcceptIfChildrenDirectoriesArePresent rule",
current_path.display()
);
accept_by_children_dir = Some(false);
}
}
// Then we also mark this directory to be walked into
to_walk.push_back((entry.path(), accept_by_children_dir));
}
let mut accept_by_glob = false;
if let Some(accept_rules) = rules_per_kind.get(&RuleKind::AcceptFilesByGlob) {
for accept_rule in accept_rules {
// It's ok to unwrap here, accept rules are infallible
if accept_rule.apply(&current_path).await.unwrap() {
debug!(
"Path {} accepted by rule {}",
current_path.display(),
accept_rule.name
);
accept_by_glob = true;
break;
}
}
if !accept_by_glob {
debug!(
"Path {} reject because it didn't passed in any AcceptFilesByGlob rules",
current_path.display()
);
continue 'entries;
}
} else {
// If there are no accept rules, then accept all paths
accept_by_glob = true;
}
if accept_by_glob
&& (accept_by_children_dir.is_none() || accept_by_children_dir.unwrap())
{
indexed_paths.insert(
current_path.clone(),
WalkEntry {
path: current_path.clone(),
is_dir,
created_at: metadata.created()?.into(),
},
);
// If the ancestor directories weren't indexed before, we index them now
for ancestor in current_path
.ancestors()
.skip(1) // Skip the current directory as it was already indexed
.take_while(|&ancestor| ancestor != root)
{
debug!("Indexing ancestor {}", ancestor.display());
if !indexed_paths.contains_key(ancestor) {
indexed_paths.insert(
ancestor.to_path_buf(),
WalkEntry {
path: ancestor.to_path_buf(),
is_dir: true,
created_at: fs::metadata(ancestor).await?.created()?.into(),
},
);
} else {
// If indexed_paths contains the current ancestor, then it will also contain
// all of its ancestors, so we can stop here
break;
}
}
}
}
}
let mut indexed_paths = indexed_paths.into_values().collect::<Vec<_>>();
// Also adding the root location path
let root_created_at = fs::metadata(&root).await?.created()?.into();
indexed_paths.push(WalkEntry {
path: root,
is_dir: true,
created_at: root_created_at,
});
// Sorting so we can assign each path an increasing id that follows the filesystem hierarchy
indexed_paths.sort();
Ok(indexed_paths)
}
#[cfg(test)]
mod tests {
use super::super::indexer_rules::ParametersPerKind;
use super::*;
use chrono::Utc;
use globset::Glob;
use std::collections::BTreeSet;
use tempfile::{tempdir, TempDir};
use tokio::fs;
use tracing_test::traced_test;
async fn prepare_location() -> TempDir {
let root = tempdir().unwrap();
let root_path = root.path();
let rust_project = root_path.join("rust_project");
let inner_project = root_path.join("inner");
let node_project = inner_project.join("node_project");
let photos = root_path.join("photos");
fs::create_dir(&rust_project).await.unwrap();
fs::create_dir(&inner_project).await.unwrap();
fs::create_dir(&node_project).await.unwrap();
fs::create_dir(&photos).await.unwrap();
// Making the rust and node projects git repositories
fs::create_dir(rust_project.join(".git")).await.unwrap();
fs::create_dir(node_project.join(".git")).await.unwrap();
// Populating rust project
fs::File::create(rust_project.join("Cargo.toml"))
.await
.unwrap();
let rust_src_dir = rust_project.join("src");
fs::create_dir(&rust_src_dir).await.unwrap();
fs::File::create(rust_src_dir.join("main.rs"))
.await
.unwrap();
let rust_target_dir = rust_project.join("target");
fs::create_dir(&rust_target_dir).await.unwrap();
let rust_build_dir = rust_target_dir.join("debug");
fs::create_dir(&rust_build_dir).await.unwrap();
fs::File::create(rust_build_dir.join("main")).await.unwrap();
// Populating node project
fs::File::create(node_project.join("package.json"))
.await
.unwrap();
let node_src_dir = node_project.join("src");
fs::create_dir(&node_src_dir).await.unwrap();
fs::File::create(node_src_dir.join("App.tsx"))
.await
.unwrap();
let node_modules = node_project.join("node_modules");
fs::create_dir(&node_modules).await.unwrap();
let node_modules_dep = node_modules.join("react");
fs::create_dir(&node_modules_dep).await.unwrap();
fs::File::create(node_modules_dep.join("package.json"))
.await
.unwrap();
// Photos directory
for photo in ["photo1.png", "photo2.jpg", "photo3.jpeg", "text.txt"].iter() {
fs::File::create(photos.join(photo)).await.unwrap();
}
root
}
#[tokio::test]
async fn test_walk_without_rules() {
let root = prepare_location().await;
let root_path = root.path();
let any_datetime = Utc::now();
#[rustfmt::skip]
let expected = [
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/target"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/target/debug"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/target/debug/main"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/node_modules"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react/package.json"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("photos"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("photos/photo1.png"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("photos/photo2.jpg"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("photos/photo3.jpeg"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("photos/text.txt"), is_dir: false, created_at: any_datetime },
]
.into_iter()
.collect::<BTreeSet<_>>();
let actual = walk(root_path.to_path_buf(), &HashMap::new(), |_, _| {})
.await
.unwrap()
.into_iter()
.collect::<BTreeSet<_>>();
assert_eq!(actual, expected);
}
#[tokio::test]
#[traced_test]
async fn test_only_photos() {
let root = prepare_location().await;
let root_path = root.path();
let any_datetime = Utc::now();
#[rustfmt::skip]
let expected = [
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("photos"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("photos/photo1.png"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("photos/photo2.jpg"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("photos/photo3.jpeg"), is_dir: false, created_at: any_datetime },
]
.into_iter()
.collect::<BTreeSet<_>>();
let only_photos_rule = [(
RuleKind::AcceptFilesByGlob,
vec![IndexerRule::new(
RuleKind::AcceptFilesByGlob,
"only photos".to_string(),
ParametersPerKind::AcceptFilesByGlob(Glob::new("{*.png,*.jpg,*.jpeg}").unwrap()),
)],
)]
.into_iter()
.collect::<HashMap<_, _>>();
let actual = walk(root_path.to_path_buf(), &only_photos_rule, |_, _| {})
.await
.unwrap()
.into_iter()
.collect::<BTreeSet<_>>();
assert_eq!(actual, expected);
}
#[tokio::test]
#[traced_test]
async fn test_git_repos() {
let root = prepare_location().await;
let root_path = root.path();
let any_datetime = Utc::now();
#[rustfmt::skip]
let expected = [
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/target"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/target/debug"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/target/debug/main"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/node_modules"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/node_modules/react/package.json"), is_dir: false, created_at: any_datetime },
]
.into_iter()
.collect::<BTreeSet<_>>();
let git_repos = [(
RuleKind::AcceptIfChildrenDirectoriesArePresent,
vec![IndexerRule::new(
RuleKind::AcceptIfChildrenDirectoriesArePresent,
"git repos".to_string(),
ParametersPerKind::AcceptIfChildrenDirectoriesArePresent(
[".git".to_string()].into_iter().collect(),
),
)],
)]
.into_iter()
.collect::<HashMap<_, _>>();
let actual = walk(root_path.to_path_buf(), &git_repos, |_, _| {})
.await
.unwrap()
.into_iter()
.collect::<BTreeSet<_>>();
assert_eq!(actual, expected);
}
#[tokio::test]
#[traced_test]
async fn git_repos_without_deps_or_build_dirs() {
let root = prepare_location().await;
let root_path = root.path();
let any_datetime = Utc::now();
#[rustfmt::skip]
let expected = [
WalkEntry { path: root_path.to_path_buf(), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/.git"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/Cargo.toml"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/src"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("rust_project/src/main.rs"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/.git"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/package.json"), is_dir: false, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/src"), is_dir: true, created_at: any_datetime.clone() },
WalkEntry { path: root_path.join("inner/node_project/src/App.tsx"), is_dir: false, created_at: any_datetime },
]
.into_iter()
.collect::<BTreeSet<_>>();
let git_repos_no_deps_no_build_dirs = [
(
RuleKind::AcceptIfChildrenDirectoriesArePresent,
vec![IndexerRule::new(
RuleKind::AcceptIfChildrenDirectoriesArePresent,
"git repos".to_string(),
ParametersPerKind::AcceptIfChildrenDirectoriesArePresent(
[".git".to_string()].into_iter().collect(),
),
)],
),
(
RuleKind::RejectFilesByGlob,
vec![
IndexerRule::new(
RuleKind::RejectFilesByGlob,
"reject node_modules".to_string(),
ParametersPerKind::RejectFilesByGlob(
Glob::new("{**/node_modules/*,**/node_modules}").unwrap(),
),
),
IndexerRule::new(
RuleKind::RejectFilesByGlob,
"reject rust build dir".to_string(),
ParametersPerKind::RejectFilesByGlob(
Glob::new("{**/target/*,**/target}").unwrap(),
),
),
],
),
]
.into_iter()
.collect::<HashMap<_, _>>();
let actual = walk(
root_path.to_path_buf(),
&git_repos_no_deps_no_build_dirs,
|_, _| {},
)
.await
.unwrap()
.into_iter()
.collect::<BTreeSet<_>>();
assert_eq!(actual, expected);
}
}

core/src/location/mod.rs (new file, 282 lines)

@ -0,0 +1,282 @@
use crate::{
api::LibraryArgs,
encode::{ThumbnailJob, ThumbnailJobInit},
file::cas::{FileIdentifierJob, FileIdentifierJobInit},
invalidate_query,
job::Job,
library::LibraryContext,
prisma::{indexer_rule, indexer_rules_in_location, location},
};
use rspc::Type;
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, path::PathBuf};
use tokio::{
fs::{metadata, File},
io::AsyncWriteExt,
};
use tracing::{debug, info};
use uuid::Uuid;
mod error;
pub mod indexer;
pub use error::LocationError;
use indexer::indexer_job::{IndexerJob, IndexerJobInit};
static DOTFILE_NAME: &str = ".spacedrive";
/// `LocationCreateArgs` is the argument received from the client via `rspc` to create a new location.
/// It holds the location's path and a vector of indexer rule ids, used to create the many-to-many
/// relationships between the location and its indexer rules.
#[derive(Type, Deserialize)]
pub struct LocationCreateArgs {
pub path: PathBuf,
pub indexer_rules_ids: Vec<i32>,
}
impl LocationCreateArgs {
pub async fn create(self, ctx: &LibraryContext) -> Result<location::Data, LocationError> {
// check if we have access to this location
if !self.path.exists() {
return Err(LocationError::PathNotFound(self.path));
}
let path_metadata = metadata(&self.path)
.await
.map_err(|e| LocationError::DotfileReadFailure(e, self.path.clone()))?;
if path_metadata.permissions().readonly() {
return Err(LocationError::ReadonlyDotFileLocationFailure(self.path));
}
if !path_metadata.is_dir() {
return Err(LocationError::NotDirectory(self.path));
}
debug!(
"Trying to create new location for '{}'",
self.path.display()
);
let uuid = Uuid::new_v4();
let mut location = ctx
.db
.location()
.create(
uuid.as_bytes().to_vec(),
vec![
location::name::set(Some(
self.path.file_name().unwrap().to_str().unwrap().to_string(),
)),
location::is_online::set(true),
location::local_path::set(Some(self.path.to_string_lossy().to_string())),
location::node_id::set(Some(ctx.node_local_id)),
],
)
.exec()
.await?;
info!("Created location: {:?}", location);
if !self.indexer_rules_ids.is_empty() {
link_location_and_indexer_rules(ctx, location.id, &self.indexer_rules_ids).await?;
}
// Updating our location variable to include information about the indexer rules
location = fetch_location(ctx, location.id)
.with(with_indexer_rules(location.id))
.exec()
.await?
.ok_or(LocationError::IdNotFound(location.id))?;
// write a file called .spacedrive to path containing the location id in JSON format
let mut dotfile = File::create(self.path.with_file_name(DOTFILE_NAME))
.await
.map_err(|e| LocationError::DotfileWriteFailure(e, self.path.clone()))?;
let json_bytes = serde_json::to_vec(&DotSpacedrive {
location_uuid: uuid,
library_uuid: ctx.id,
})
.map_err(|e| LocationError::DotfileSerializeFailure(e, self.path.clone()))?;
dotfile
.write_all(&json_bytes)
.await
.map_err(|e| LocationError::DotfileWriteFailure(e, self.path))?;
invalidate_query!(
ctx,
"locations.get": LibraryArgs<()>,
LibraryArgs::new(ctx.id, ())
);
Ok(location)
}
}
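// Illustrative sketch only (not part of this file): how an rspc mutation handler might drive
// `LocationCreateArgs::create`. The in-scope `ctx: LibraryContext`, the path and the rule ids
// below are assumptions for illustration.
//
// let location = LocationCreateArgs {
//     path: PathBuf::from("/home/alice/Pictures"),
//     indexer_rules_ids: vec![1, 2],
// }
// .create(&ctx)
// .await?;
// // `location` now carries its linked indexer rules, and a `.spacedrive` dotfile was written.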
/// `LocationUpdateArgs` is the argument received from the client via `rspc` to update a location.
/// It contains the id of the location to be updated, optionally a new name for the location,
/// and a vector of indexer rule ids to link to or unlink from the location.
///
/// Note that only the indexer rule ids in this vector will be kept from now on;
/// previously linked rules that aren't in this vector will be purged.
#[derive(Type, Deserialize)]
pub struct LocationUpdateArgs {
pub id: i32,
pub name: Option<String>,
pub indexer_rules_ids: Vec<i32>,
}
impl LocationUpdateArgs {
pub async fn update(self, ctx: &LibraryContext) -> Result<(), LocationError> {
let location = fetch_location(ctx, self.id)
.with(with_indexer_rules(self.id))
.exec()
.await?
.ok_or(LocationError::IdNotFound(self.id))?;
if location.name != self.name {
ctx.db
.location()
.update(
location::id::equals(self.id),
vec![location::name::set(self.name)],
)
.exec()
.await?;
}
let current_rules_ids = location
.indexer_rules
.unwrap()
.iter()
.map(|r| r.indexer_rule_id)
.collect::<HashSet<_>>();
let new_rules_ids = self.indexer_rules_ids.into_iter().collect::<HashSet<_>>();
if current_rules_ids != new_rules_ids {
let rule_ids_to_add = new_rules_ids
.difference(&current_rules_ids)
.copied()
.collect::<Vec<_>>();
let rule_ids_to_remove = current_rules_ids
.difference(&new_rules_ids)
.copied()
.collect::<Vec<_>>();
if !rule_ids_to_remove.is_empty() {
ctx.db
.indexer_rules_in_location()
.delete_many(vec![
indexer_rules_in_location::location_id::equals(self.id),
indexer_rules_in_location::indexer_rule_id::in_vec(rule_ids_to_remove),
])
.exec()
.await?;
}
if !rule_ids_to_add.is_empty() {
link_location_and_indexer_rules(ctx, self.id, &rule_ids_to_add).await?;
}
}
Ok(())
}
}
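// Illustrative sketch only: updating a location replaces its rule set, so callers pass the full
// list of rule ids they want to keep. `ctx` and the ids below are assumptions for illustration.
//
// LocationUpdateArgs {
//     id: location_id,
//     name: Some("Pictures".to_string()),
//     indexer_rules_ids: vec![1, 3], // a previously linked rule 2 would be unlinked here
// }
// .update(&ctx)
// .await?;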
#[derive(Serialize, Deserialize, Default)]
pub struct DotSpacedrive {
pub location_uuid: Uuid,
pub library_uuid: Uuid,
}
// checks to see if a location:
// - is accessible from the local filesystem
// - already exists in the database
// pub async fn check_location(path: &str) -> Result<DotSpacedrive, LocationError> {
// let dotfile: DotSpacedrive = match fs::File::open(format!("{}/{}", path.clone(), DOTFILE_NAME))
// {
// Ok(file) => serde_json::from_reader(file).unwrap_or(DotSpacedrive::default()),
// Err(e) => return Err(LocationError::DotfileReadFailure(e)),
// };
// Ok(dotfile)
// }
pub fn fetch_location(ctx: &LibraryContext, location_id: i32) -> location::FindUnique {
ctx.db
.location()
.find_unique(location::id::equals(location_id))
}
pub fn with_indexer_rules(location_id: i32) -> location::indexer_rules::Fetch {
location::indexer_rules::fetch(vec![indexer_rules_in_location::location_id::equals(
location_id,
)])
.with(indexer_rules_in_location::indexer_rule::fetch())
}
async fn link_location_and_indexer_rules(
ctx: &LibraryContext,
location_id: i32,
rules_ids: &[i32],
) -> Result<(), LocationError> {
ctx.db
.indexer_rules_in_location()
.create_many(
rules_ids
.iter()
.map(|id| {
indexer_rules_in_location::create(
location::id::equals(location_id),
indexer_rule::id::equals(*id),
vec![],
)
})
.collect(),
)
.exec()
.await?;
Ok(())
}
pub async fn scan_location(
ctx: &LibraryContext,
location: &location::Data,
) -> Result<(), LocationError> {
if location.local_path.is_none() {
return Err(LocationError::MissingLocalPath(location.id));
};
let location_id = location.id;
ctx.spawn_job(Job::new(
IndexerJobInit {
location: location.clone(),
},
Box::new(IndexerJob {}),
))
.await;
ctx.queue_job(Job::new(
FileIdentifierJobInit {
location: location.clone(),
},
Box::new(FileIdentifierJob {}),
))
.await;
ctx.queue_job(Job::new(
ThumbnailJobInit {
location_id,
path: PathBuf::new(),
background: true,
},
Box::new(ThumbnailJob {}),
))
.await;
Ok(())
}
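// Illustrative sketch only: the expected call order when a new location is added. `ctx` is an
// assumed in-scope `LibraryContext` and `create_args` an assumed `LocationCreateArgs` from the client.
//
// let location = create_args.create(&ctx).await?;
// scan_location(&ctx, &location).await?; // spawns the IndexerJob, then queues FileIdentifier and Thumbnail jobs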


@ -8,7 +8,6 @@ use std::{
};
use thiserror::Error;
use tokio::sync::{RwLock, RwLockWriteGuard};
use uuid::Uuid;
/// NODE_STATE_CONFIG_NAME is the name of the file which stores the NodeState


@ -1,3 +1,5 @@
use crate::prisma::node;
use chrono::{DateTime, Utc};
use int_enum::IntEnum;
use rspc::Type;
@ -5,7 +7,7 @@ use serde::{Deserialize, Serialize};
use uuid::Uuid;
mod config;
use crate::prisma::node;
pub use config::*;
#[derive(Debug, Clone, Serialize, Deserialize, Type)]


@ -1,205 +0,0 @@
use crate::{
api::LibraryArgs,
encode::{ThumbnailJob, ThumbnailJobInit},
file::{
cas::{FileIdentifierJob, FileIdentifierJobInit},
indexer::{IndexerJob, IndexerJobInit},
},
invalidate_query,
job::Job,
library::LibraryContext,
prisma::location,
};
use rspc::ErrorCode;
use serde::{Deserialize, Serialize};
use std::{
fmt::Debug,
path::{Path, PathBuf},
};
use thiserror::Error;
use tokio::{
fs::{metadata, File},
io::{self, AsyncWriteExt},
};
use tracing::info;
use uuid::Uuid;
#[derive(Serialize, Deserialize, Default)]
pub struct DotSpacedrive {
pub location_uuid: Uuid,
pub library_uuid: Uuid,
}
static DOTFILE_NAME: &str = ".spacedrive";
// checks to see if a location is:
// - accessible on from the local filesystem
// - already exists in the database
// pub async fn check_location(path: &str) -> Result<DotSpacedrive, LocationError> {
// let dotfile: DotSpacedrive = match fs::File::open(format!("{}/{}", path.clone(), DOTFILE_NAME))
// {
// Ok(file) => serde_json::from_reader(file).unwrap_or(DotSpacedrive::default()),
// Err(e) => return Err(LocationError::DotfileReadFailure(e)),
// };
// Ok(dotfile)
// }
pub async fn scan_location(ctx: &LibraryContext, location_id: i32, path: impl AsRef<Path>) {
let path_buf = path.as_ref().to_path_buf();
ctx.spawn_job(Job::new(
IndexerJobInit {
path: path_buf.clone(),
},
Box::new(IndexerJob {}),
))
.await;
ctx.queue_job(Job::new(
FileIdentifierJobInit {
location_id,
path: path_buf.clone(),
},
Box::new(FileIdentifierJob {}),
))
.await;
ctx.queue_job(Job::new(
ThumbnailJobInit {
location_id,
path: path_buf,
background: true,
},
Box::new(ThumbnailJob {}),
))
.await;
}
pub async fn create_location(
library: &LibraryContext,
path: impl AsRef<Path> + Debug,
) -> Result<location::Data, LocationError> {
let path = path.as_ref();
// check if we have access to this location
if !path.exists() {
return Err(LocationError::PathNotFound(path.to_owned()));
}
if metadata(path)
.await
.map_err(|e| LocationError::DotfileReadFailure(e, path.to_owned()))?
.permissions()
.readonly()
{
return Err(LocationError::ReadonlyDotFileLocationFailure(
path.to_owned(),
));
}
let path_string = path.to_str().unwrap().to_string();
// check if location already exists
let location_resource = if let Some(location) = library
.db
.location()
.find_first(vec![location::local_path::equals(Some(
path_string.clone(),
))])
.exec()
.await?
{
location
} else {
info!(
"Location does not exist, creating new location for '{}'",
path_string
);
let uuid = Uuid::new_v4();
let location = library
.db
.location()
.create(
uuid.as_bytes().to_vec(),
vec![
location::name::set(Some(
path.file_name().unwrap().to_str().unwrap().to_string(),
)),
location::is_online::set(true),
location::local_path::set(Some(path_string)),
location::node_id::set(Some(library.node_local_id)),
],
)
.exec()
.await?;
info!("Created location: {:?}", location);
// write a file called .spacedrive to path containing the location id in JSON format
let mut dotfile = File::create(path.with_file_name(DOTFILE_NAME))
.await
.map_err(|e| LocationError::DotfileWriteFailure(e, path.to_owned()))?;
let data = DotSpacedrive {
location_uuid: uuid,
library_uuid: library.id,
};
let json_bytes = serde_json::to_vec(&data)
.map_err(|e| LocationError::DotfileSerializeFailure(e, path.to_owned()))?;
dotfile
.write_all(&json_bytes)
.await
.map_err(|e| LocationError::DotfileWriteFailure(e, path.to_owned()))?;
invalidate_query!(
library,
"locations.get": LibraryArgs<()>,
LibraryArgs {
library_id: library.id,
arg: ()
}
);
location
};
Ok(location_resource)
}
#[derive(Error, Debug)]
pub enum LocationError {
#[error("Failed to create location (uuid {uuid:?})")]
CreateFailure { uuid: Uuid },
#[error("Failed to read location dotfile (path: {1:?})")]
DotfileReadFailure(io::Error, PathBuf),
#[error("Failed to serialize dotfile for location (at path: {1:?})")]
DotfileSerializeFailure(serde_json::Error, PathBuf),
#[error("Dotfile location is read only (at path: {0:?})")]
ReadonlyDotFileLocationFailure(PathBuf),
#[error("Failed to write dotfile (path: {1:?})")]
DotfileWriteFailure(io::Error, PathBuf),
#[error("Location not found (path: {0:?})")]
PathNotFound(PathBuf),
#[error("Location not found (uuid: {0})")]
UuidNotFound(Uuid),
#[error("Location not found (id: {0})")]
IdNotFound(i32),
#[error("Failed to open file from local os")]
FileReadError(io::Error),
#[error("Failed to read mounted volumes from local os")]
VolumeReadError(String),
#[error("Failed to connect to database (error: {0:?})")]
IOError(io::Error),
#[error("Database error")]
DatabaseError(#[from] prisma_client_rust::QueryError),
}
impl From<LocationError> for rspc::Error {
fn from(err: LocationError) -> Self {
rspc::Error::new(ErrorCode::InternalServerError, err.to_string())
}
}


@ -1,5 +0,0 @@
mod locations;
mod volumes;
pub use locations::*;
pub use volumes::*;


@ -11,9 +11,9 @@ static MIGRATIONS_DIR: Dir = include_dir!("$CARGO_MANIFEST_DIR/prisma/migrations
/// MigrationError represents an error that occurred while initialising the database connection or running migrations on the database.
#[derive(Error, Debug)]
pub enum MigrationError {
#[error("An error occurred while initialising a new database connection")]
#[error("An error occurred while initialising a new database connection: {0}")]
DatabaseInitialization(#[from] NewClientError),
#[error("An error occurred with the database while applying migrations")]
#[error("An error occurred with the database while applying migrations: {0}")]
DatabaseError(#[from] prisma_client_rust::QueryError),
#[error("An error occurred reading the embedded migration files. {0}. Please report to Spacedrive developers!")]
InvalidEmbeddedMigration(&'static str),


@ -1 +1,2 @@
pub mod db;
pub mod seeder;

core/src/util/seeder.rs (new file, 50 lines)

@ -0,0 +1,50 @@
use crate::{
location::indexer::{
indexer_rules::{IndexerRule, ParametersPerKind, RuleKind},
IndexerError,
},
prisma::PrismaClient,
};
use globset::Glob;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum SeederError {
#[error("Failed to run indexer rules seeder: {0}")]
IndexerRules(#[from] IndexerError),
#[error("An error occurred with the database while applying migrations: {0}")]
DatabaseError(#[from] prisma_client_rust::QueryError),
}
pub async fn indexer_rules_seeder(client: &PrismaClient) -> Result<(), SeederError> {
if client.indexer_rule().count(vec![]).exec().await? == 0 {
for rule in [
IndexerRule::new(
RuleKind::RejectFilesByGlob,
"Reject Hidden Files".to_string(),
ParametersPerKind::RejectFilesByGlob(
Glob::new("**/.*").map_err(IndexerError::GlobBuilderError)?,
),
),
IndexerRule::new(
RuleKind::AcceptIfChildrenDirectoriesArePresent,
"Git Repositories".into(),
ParametersPerKind::AcceptIfChildrenDirectoriesArePresent(
[".git".to_string()].into_iter().collect(),
),
),
IndexerRule::new(
RuleKind::AcceptFilesByGlob,
"Only Images".to_string(),
ParametersPerKind::AcceptFilesByGlob(
Glob::new("*.{jpg,png,jpeg,gif,webp}")
.map_err(IndexerError::GlobBuilderError)?,
),
),
] {
rule.save(client).await?;
}
}
Ok(())
}
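// Illustrative sketch only: the seeder is meant to run once the database has been migrated,
// e.g. while a library is being created or loaded. `client` is an assumed `&PrismaClient` and
// the `error!` handling is just one possible way to surface a failure.
//
// if let Err(e) = indexer_rules_seeder(client).await {
//     error!("Failed to seed default indexer rules: {e}");
// }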


@ -1,10 +1,10 @@
use crate::{library::LibraryContext, prisma::volume::*};
use rspc::Type;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use std::process::Command;
use sysinfo::{DiskExt, System, SystemExt};
use thiserror::Error;
#[derive(Serialize, Deserialize, Debug, Default, Clone, Type)]
pub struct Volume {
@ -28,7 +28,7 @@ pub enum VolumeError {
impl From<VolumeError> for rspc::Error {
fn from(e: VolumeError) -> Self {
rspc::Error::new(rspc::ErrorCode::InternalServerError, e.to_string())
rspc::Error::with_cause(rspc::ErrorCode::InternalServerError, e.to_string(), e)
}
}


@ -4,7 +4,7 @@ import { FilePath, Location } from '@sd/core';
import { Button, TextArea } from '@sd/ui';
import moment from 'moment';
import { Heart, Link } from 'phosphor-react';
import React, { useEffect, useState } from 'react';
import React, { useCallback, useEffect, useState } from 'react';
import types from '../../constants/file-types.json';
import { Tooltip } from '../tooltip/Tooltip';
@ -30,6 +30,14 @@ const MetaItem = (props: MetaItemProps) => {
const Divider = () => <div className="w-full my-1 h-[1px] bg-gray-100 dark:bg-gray-550" />;
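// Plain debounce helper: each call clears the previous timer and schedules `fn` again, so it
// only runs after `delay` ms without a newer call; used below to batch note edits before
// sending them to the core.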
function debounce<T>(fn: (args: T) => void, delay: number): (args: T) => void {
let timerId: number | undefined;
return (...args) => {
clearTimeout(timerId);
timerId = setTimeout(() => fn(...args), delay);
};
}
export const Inspector = (props: {
locationId: number;
location?: Location | null;
@ -47,24 +55,19 @@ export const Inspector = (props: {
);
const { mutate: fileSetNote } = useLibraryMutation('files.setNote');
// notes are cached in a store by their file id
// this is so we can ensure every note has been sent to Rust even
// when quickly navigating between files, which would otherwise cancel the pending update
const [note, setNote] = useState(props.location?.local_path || '');
const [note, setNote] = useState(props.selectedFile?.file?.note || '');
useEffect(() => {
// Update debounced value after delay
const handler = setTimeout(() => {
setNote(props.selectedFile?.file?.note || '');
}, [props.selectedFile?.file?.note]);
const debouncedNote = useCallback(
debounce((note: string) => {
fileSetNote({
id: file_id,
note
});
}, 500);
return () => {
clearTimeout(handler);
};
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [note]);
}, 2000),
[file_id]
);
const toggleFavorite = () => {
if (!isFavoriteLoading) {
@ -81,6 +84,7 @@ export const Inspector = (props: {
function handleNoteUpdate(e: React.ChangeEvent<HTMLTextAreaElement>) {
if (e.target.value !== note) {
setNote(e.target.value);
debouncedNote(e.target.value);
}
}


@ -12,6 +12,7 @@ import clsx from 'clsx';
import { CirclesFour, Planet } from 'phosphor-react';
import React, { useContext, useEffect } from 'react';
import { NavLink, NavLinkProps, useNavigate } from 'react-router-dom';
import { LocationCreateArgs } from '@sd/core';
import { Folder } from '../icons/Folder';
import RunningJobsWidget from '../jobs/RunningJobsWidget';
@ -222,7 +223,8 @@ export const Sidebar: React.FC<SidebarProps> = (props) => {
onClick={() => {
appProps?.openDialog({ directory: true }).then((result) => {
console.log(result);
if (result) createLocation(result as string);
// TODO: Pass indexer rules ids to create location
if (result) createLocation({ path: result as string, indexer_rules_ids: [] } as LocationCreateArgs);
});
}}
className={clsx(


@ -2,6 +2,7 @@ import { useLibraryMutation, useLibraryQuery } from '@sd/client';
import { AppPropsContext } from '@sd/client';
import { Button } from '@sd/ui';
import React, { useContext } from 'react';
import { LocationCreateArgs } from '@sd/core';
import LocationListItem from '../../../components/location/LocationListItem';
import { InputContainer } from '../../../components/primitive/InputContainer';
@ -28,7 +29,8 @@ export default function LocationSettings() {
size="sm"
onClick={() => {
appProps?.openDialog({ directory: true }).then((result) => {
if (result) createLocation(result as string);
// TODO: Pass indexer rules ids to create location
if (result) createLocation({ path: result as string, indexer_rules_ids: [] } as LocationCreateArgs);
});
}}
>