This commit is contained in:
Erik Johnston 2024-06-17 18:07:39 +01:00
parent 1a3a6b63ef
commit cdbe676bc4

View file

@ -400,6 +400,10 @@ class ReadableFileWrapper:
@implementer(interfaces.IConsumer)
@implementer(interfaces.IPushProducer)
class MultipartFileConsumer:
"""Wraps a given consumer so that any data that gets written to it gets
converted to a multipart format.
"""
def __init__(
self,
clock: Clock,
@ -415,11 +419,16 @@ class MultipartFileConsumer:
self.file_content_type = file_content_type
self.boundary = uuid4().hex.encode("ascii")
# The producer that registered with us, and if its a push or pull
# producer.
self.producer: Optional["interfaces.IProducer"] = None
self.streaming = Optional[None]
# Whether the wrapped consumer has asked us to pause.
self.paused = False
### IConsumer APIs ###
def registerProducer(
self, producer: "interfaces.IProducer", streaming: bool
) -> None:
@ -450,8 +459,6 @@ class MultipartFileConsumer:
self.wrapped_consumer.registerProducer(self, True)
run_in_background(self._resumeProducingRepeatedly)
def unregisterProducer(self) -> None:
"""
Stop consuming data from a producer, without disconnecting.
@ -491,6 +498,8 @@ class MultipartFileConsumer:
self.wrapped_consumer.write(data)
### IPushProducer APIs ###
def stopProducing(self) -> None:
"""
Stop producing data.
@ -530,10 +539,13 @@ class MultipartFileConsumer:
if self.streaming:
cast("interfaces.IPushProducer", self.producer).resumeProducing()
return
else:
# If the producer is not a streaming producer we need to start
# repeatedly calling `resumeProducing` in a loop.
run_in_background(self._resumeProducingRepeatedly)
### Internal APIs. ###
async def _resumeProducingRepeatedly(self) -> None:
assert self.producer is not None
assert not self.streaming