libobs/util: Add buffered file serializer

Adapted from 898256d416

Co-authored-by: Richard Stanway <r1ch@r1ch.net>
This commit is contained in:
derrod 2024-04-27 06:22:22 +02:00 committed by Lain
parent dc4cba7427
commit 89c7a9608b
4 changed files with 491 additions and 0 deletions

View file

@ -181,3 +181,44 @@ File Output Serializer Functions
.. function:: void file_output_serializer_free(struct serializer *s)
Frees the file output serializer and saves the file.
---------------------
Buffered File Output Serializer
===============================
Provides a buffered file serializer that writes data asynchronously to prevent stalls due to slow disk I/O.
Writes will only block when the buffer is full.
The buffer and chunk size are configurable with the defaults being 256 MiB and 1 MiB respectively.
.. versionadded:: 30.2
.. code:: cpp
#include <util/buffered-file-serializer.h>
Buffered File Output Serializer Functions
-----------------------------------------
.. function:: bool buffered_file_serializer_init_defaults(struct serializer *s, const char *path)
Initializes a buffered file output serializer with default buffer and chunk sizes.
:return: *true* if file created successfully, *false* otherwise
---------------------
.. function:: bool buffered_file_serializer_init(struct serializer *s, const char *path, size_t max_bufsize, size_t chunk_size)
Initialize buffered writer with specified buffer and chunk sizes. Setting either to `0` will use the default value.
:return: *true* if file created successfully, *false* otherwise
---------------------
.. function:: void buffered_file_serializer_free(struct serializer *s)
Frees the file output serializer and saves the file. Will block until I/O thread completes outstanding writes.

View file

@ -99,6 +99,8 @@ target_sources(
util/bitstream.h
util/bmem.c
util/bmem.h
util/buffered-file-serializer.c
util/buffered-file-serializer.h
util/c99defs.h
util/cf-lexer.c
util/cf-lexer.h

View file

@ -0,0 +1,414 @@
/*
* Copyright (c) 2024 Dennis Sädtler <dennis@obsproject.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "buffered-file-serializer.h"
#include <inttypes.h>
#include "platform.h"
#include "threading.h"
#include "deque.h"
#include "dstr.h"
static const size_t DEFAULT_BUF_SIZE = 256ULL * 1048576ULL; // 256 MiB
static const size_t DEFAULT_CHUNK_SIZE = 1048576; // 1 MiB
/* ========================================================================== */
/* Buffered writer based on ffmpeg-mux implementation */
struct io_header {
uint64_t seek_offset;
uint64_t data_length;
};
struct io_buffer {
bool active;
bool shutdown_requested;
bool output_error;
os_event_t *buffer_space_available_event;
os_event_t *new_data_available_event;
pthread_t io_thread;
pthread_mutex_t data_mutex;
FILE *output_file;
struct deque data;
uint64_t next_pos;
size_t buffer_size;
size_t chunk_size;
};
struct file_output_data {
struct dstr filename;
struct io_buffer io;
};
static void *io_thread(void *opaque)
{
struct file_output_data *out = opaque;
os_set_thread_name("buffered writer i/o thread");
// Chunk collects the writes into a larger batch
size_t chunk_used = 0;
size_t chunk_size = out->io.chunk_size;
unsigned char *chunk = bmalloc(chunk_size);
if (!chunk) {
os_atomic_set_bool(&out->io.output_error, true);
fprintf(stderr, "Error allocating memory for output\n");
goto error;
}
bool shutting_down;
bool want_seek = false;
bool force_flush_chunk = false;
// current_seek_position is a virtual position updated as we read from
// the buffer, if it becomes discontinuous due to a seek request we
// flush the chunk. next_seek_position is the actual offset we should
// seek to when we write the chunk.
uint64_t current_seek_position = 0;
uint64_t next_seek_position;
for (;;) {
// Wait for data to be written to the buffer
os_event_wait(out->io.new_data_available_event);
// Loop to write in chunk_size chunks
for (;;) {
pthread_mutex_lock(&out->io.data_mutex);
shutting_down = os_atomic_load_bool(
&out->io.shutdown_requested);
// Fetch as many writes as possible from the deque
// and fill up our local chunk. This may involve
// seeking, so take care of that as well.
for (;;) {
size_t available = out->io.data.size;
// Buffer is empty (now) or was already empty (we got
// woken up to exit)
if (!available)
break;
// Get seek offset and data size
struct io_header header;
deque_peek_front(&out->io.data, &header,
sizeof(header));
// Do we need to seek?
if (header.seek_offset !=
current_seek_position) {
// If there's already part of a chunk pending,
// flush it at the current offset. Similarly,
// if we already plan to seek, then seek.
if (chunk_used || want_seek) {
force_flush_chunk = true;
break;
}
// Mark that we need to seek and where to
want_seek = true;
next_seek_position = header.seek_offset;
// Update our virtual position
current_seek_position =
header.seek_offset;
}
// Make sure there's enough room for the data, if
// not then force a flush
if (header.data_length + chunk_used >
chunk_size) {
force_flush_chunk = true;
break;
}
// Remove header that we already read
deque_pop_front(&out->io.data, NULL,
sizeof(header));
// Copy from the buffer to our local chunk
deque_pop_front(&out->io.data,
chunk + chunk_used,
header.data_length);
// Update offsets
chunk_used += header.data_length;
current_seek_position += header.data_length;
}
// Signal that there is more room in the buffer
os_event_signal(out->io.buffer_space_available_event);
// Try to avoid lots of small writes unless this was the final
// data left in the buffer. The buffer might be entirely empty
// if we were woken up to exit.
if (!force_flush_chunk &&
(!chunk_used ||
(chunk_used < 65536 && !shutting_down))) {
os_event_reset(
out->io.new_data_available_event);
pthread_mutex_unlock(&out->io.data_mutex);
break;
}
pthread_mutex_unlock(&out->io.data_mutex);
// Seek if we need to
if (want_seek) {
os_fseeki64(out->io.output_file,
next_seek_position, SEEK_SET);
// Update the next virtual position, making sure to take
// into account the size of the chunk we're about to write.
current_seek_position =
next_seek_position + chunk_used;
want_seek = false;
// If we did a seek but do not have any data left to write
// return to the start of the loop.
if (!chunk_used) {
force_flush_chunk = false;
continue;
}
}
// Write the current chunk to the output file
size_t bytes_written = fwrite(chunk, 1, chunk_used,
out->io.output_file);
if (bytes_written != chunk_used) {
blog(LOG_ERROR,
"Error writing to '%s': %s (%zu != %zu)\n",
out->filename.array, strerror(errno),
bytes_written, chunk_used);
os_atomic_set_bool(&out->io.output_error, true);
goto error;
}
chunk_used = 0;
force_flush_chunk = false;
}
// If this was the last chunk, time to exit
if (shutting_down)
break;
}
error:
if (chunk)
bfree(chunk);
fclose(out->io.output_file);
return NULL;
}
/* ========================================================================== */
/* Serializer Implementation */
static int64_t file_output_seek(void *opaque, int64_t offset,
enum serialize_seek_type seek_type)
{
struct file_output_data *out = opaque;
// If the output thread failed, signal that back up the stack
if (os_atomic_load_bool(&out->io.output_error))
return -1;
// Update where the next write should go
pthread_mutex_lock(&out->io.data_mutex);
switch (seek_type) {
case SERIALIZE_SEEK_START:
out->io.next_pos = offset;
break;
case SERIALIZE_SEEK_CURRENT:
out->io.next_pos += offset;
break;
case SERIALIZE_SEEK_END:
out->io.next_pos -= offset;
break;
}
pthread_mutex_unlock(&out->io.data_mutex);
return (int64_t)out->io.next_pos;
}
#ifndef _WIN32
static inline size_t max(size_t a, size_t b)
{
return a > b ? a : b;
}
static inline size_t min(size_t a, size_t b)
{
return a < b ? a : b;
}
#endif
static size_t file_output_write(void *opaque, const void *buf, size_t buf_size)
{
struct file_output_data *out = opaque;
if (!buf_size)
return 0;
// Split writes into at chunks that are at most chunk_size bytes
uintptr_t ptr = (uintptr_t)buf;
size_t remaining = buf_size;
while (remaining) {
if (os_atomic_load_bool(&out->io.output_error))
return 0;
pthread_mutex_lock(&out->io.data_mutex);
size_t next_chunk_size = min(remaining, out->io.chunk_size);
// Avoid unbounded growth of the deque, cap to buffer_size
size_t cap = max(out->io.data.capacity, out->io.buffer_size);
size_t free_space = cap - out->io.data.size;
if (free_space < next_chunk_size + sizeof(struct io_header)) {
blog(LOG_DEBUG, "Waiting for I/O thread...");
// No space, wait for the I/O thread to make space
os_event_reset(out->io.buffer_space_available_event);
pthread_mutex_unlock(&out->io.data_mutex);
os_event_wait(out->io.buffer_space_available_event);
continue;
}
// Calculate how many chunks we can fit into the buffer
size_t num_chunks = free_space / (next_chunk_size +
sizeof(struct io_header));
while (remaining && num_chunks--) {
struct io_header header = {
.data_length = next_chunk_size,
.seek_offset = out->io.next_pos,
};
// Copy the data into the buffer
deque_push_back(&out->io.data, &header, sizeof(header));
deque_push_back(&out->io.data, (const void *)ptr,
next_chunk_size);
// Advance the next write position
out->io.next_pos += next_chunk_size;
// Update remainder and advance data pointer
remaining -= next_chunk_size;
ptr += next_chunk_size;
next_chunk_size = min(remaining, out->io.chunk_size);
}
// Tell the I/O thread that there's new data to be written
os_event_signal(out->io.new_data_available_event);
pthread_mutex_unlock(&out->io.data_mutex);
}
return buf_size - remaining;
}
static int64_t file_output_get_pos(void *opaque)
{
struct file_output_data *out = opaque;
// If thread failed return -1
if (os_atomic_load_bool(&out->io.output_error))
return -1;
return (int64_t)out->io.next_pos;
}
bool buffered_file_serializer_init_defaults(struct serializer *s,
const char *path)
{
return buffered_file_serializer_init(s, path, 0, 0);
}
bool buffered_file_serializer_init(struct serializer *s, const char *path,
size_t max_bufsize, size_t chunk_size)
{
struct file_output_data *out;
out = bzalloc(sizeof(*out));
dstr_init_copy(&out->filename, path);
out->io.output_file = os_fopen(path, "wb");
if (!out->io.output_file)
return false;
out->io.buffer_size = max_bufsize ? max_bufsize : DEFAULT_BUF_SIZE;
out->io.chunk_size = chunk_size ? chunk_size : DEFAULT_CHUNK_SIZE;
// Start at 1MB, this can grow up to max_bufsize depending
// on how fast data is going in and out.
deque_reserve(&out->io.data, 1048576);
pthread_mutex_init(&out->io.data_mutex, NULL);
os_event_init(&out->io.buffer_space_available_event,
OS_EVENT_TYPE_AUTO);
os_event_init(&out->io.new_data_available_event, OS_EVENT_TYPE_AUTO);
pthread_create(&out->io.io_thread, NULL, io_thread, out);
out->io.active = true;
s->data = out;
s->read = NULL;
s->write = file_output_write;
s->seek = file_output_seek;
s->get_pos = file_output_get_pos;
return true;
}
void buffered_file_serializer_free(struct serializer *s)
{
struct file_output_data *out = s->data;
if (!out)
return;
if (out->io.active) {
os_atomic_set_bool(&out->io.shutdown_requested, true);
// Wakes up the I/O thread and waits for it to finish
pthread_mutex_lock(&out->io.data_mutex);
os_event_signal(out->io.new_data_available_event);
pthread_mutex_unlock(&out->io.data_mutex);
pthread_join(out->io.io_thread, NULL);
os_event_destroy(out->io.new_data_available_event);
os_event_destroy(out->io.buffer_space_available_event);
pthread_mutex_destroy(&out->io.data_mutex);
blog(LOG_DEBUG, "Final buffer capacity: %zu KiB",
out->io.data.capacity / 1024);
deque_free(&out->io.data);
}
dstr_free(&out->filename);
bfree(out);
}

View file

@ -0,0 +1,34 @@
/*
* Copyright (c) 2024 Dennis Sädtler <dennis@obsproject.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#pragma once
#include "serializer.h"
#ifdef __cplusplus
extern "C" {
#endif
EXPORT bool buffered_file_serializer_init_defaults(struct serializer *s,
const char *path);
EXPORT bool buffered_file_serializer_init(struct serializer *s,
const char *path, size_t max_bufsize,
size_t chunk_size);
EXPORT void buffered_file_serializer_free(struct serializer *s);
#ifdef __cplusplus
}
#endif