OPSI.web2.stream module

The stream module provides a simple abstraction of streaming data. While Twisted already has some provisions for handling this in its Producer/Consumer model, the rather complex interactions between producer and consumer make it difficult to implement something like the CompoundStream object. Thus, this API.

The IStream interface is very simple. It consists of two methods: read and close. The read method should return some data, None if there is no data left to read, or a Deferred. The close method frees up any underlying resources and causes read to return None forevermore.
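The read/close contract can be illustrated with a minimal sketch. ListStream and drain are hypothetical names, not part of OPSI.web2.stream, and the sketch assumes read never returns a Deferred, which a real consumer would also have to handle.

```python
class ListStream(object):
    """Hypothetical stand-in for an IStream provider (illustration only)."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    def read(self):
        # Return the next chunk, or None once the data is exhausted.
        if self._chunks:
            return self._chunks.pop(0)
        return None

    def close(self):
        # Drop the underlying data; read() returns None from now on.
        self._chunks = []


def drain(stream):
    """Collect chunks until read() returns None (synchronous streams only)."""
    collected = []
    while True:
        chunk = stream.read()
        if chunk is None:
            return collected
        collected.append(chunk)


s = ListStream([b"abc", b"def"])
first = s.read()          # some data
s.close()
after_close = s.read()    # None, because the stream was closed
all_chunks = drain(ListStream([b"abc", b"def"]))
```

A reader written against the real interface would additionally check whether the value returned by read is a Deferred and wait for it before continuing.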

IByteStream adds a bit more to the API: 1) read is required to return objects conforming to the buffer interface; 2) .length, which may be either an integer number of bytes remaining or None if unknown; 3) .split(position), which takes a position and splits the stream into two pieces, returning the two new streams. Using the original stream after calling split is not allowed.
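As a rough sketch of the .length and .split(position) additions, the toy class below keeps its data in memory. ToyByteStream is a hypothetical name used only for illustration; it is not how MemoryStream or any other class in this module is implemented, and it always knows its length, whereas a real stream may report None.

```python
class ToyByteStream(object):
    """Hypothetical in-memory stream illustrating .length and .split()."""

    def __init__(self, data):
        self._data = data
        self.length = len(data)  # bytes remaining

    def read(self):
        if not self._data:
            return None
        data, self._data = self._data, b""
        self.length = 0
        return data

    def close(self):
        self._data = b""
        self.length = 0

    def split(self, position):
        # Hand the two halves to new streams and invalidate the original,
        # since the original must not be used after split.
        head = ToyByteStream(self._data[:position])
        tail = ToyByteStream(self._data[position:])
        self.close()
        return head, tail


original = ToyByteStream(b"headerbody")
head, tail = original.split(6)
head_length = head.length
head_data = head.read()
tail_data = tail.read()
```

Here head yields the first six bytes and tail the remainder, while the original stream is closed as part of the split.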

There are two built-in source stream classes: FileStream and MemoryStream. The first produces data from a file object, the second from a buffer in memory. Any number of these can be combined into one stream with the CompoundStream object. Then, to interface with other parts of Twisted, there are two transceivers: StreamProducer and ProducerStream. The first takes a stream and turns it into an IPushProducer, which will write to a consumer. The second is a consumer which is a stream, so that other producers can write to it.
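The idea of combining several streams into one can be sketched as follows. Chunk and ToyCompound are hypothetical names; the sketch only mirrors the addStream/read/close/length surface listed for CompoundStream and assumes every substream knows its length and never returns a Deferred.

```python
class Chunk(object):
    """Hypothetical one-shot stream holding a single bytes object."""

    def __init__(self, data):
        self.data = data
        self.length = len(data)

    def read(self):
        data, self.data = self.data, None
        self.length = 0
        return data

    def close(self):
        self.data = None
        self.length = 0


class ToyCompound(object):
    """Reads its substreams one after another, in the order they were added."""

    def __init__(self, buckets=()):
        self.buckets = list(buckets)
        self.length = sum(b.length for b in self.buckets)

    def addStream(self, bucket):
        self.buckets.append(bucket)
        self.length += bucket.length

    def read(self):
        while self.buckets:
            data = self.buckets[0].read()
            if data is not None:
                self.length -= len(data)
                return data
            self.buckets.pop(0)  # substream exhausted, move to the next one
        return None

    def close(self):
        for bucket in self.buckets:
            bucket.close()
        self.buckets = []
        self.length = 0


combined = ToyCompound([Chunk(b"GET "), Chunk(b"/ ")])
combined.addStream(Chunk(b"HTTP/1.1"))
initial_length = combined.length
parts = []
while True:
    piece = combined.read()
    if piece is None:
        break
    parts.append(piece)
```

The reader sees a single stream and does not need to know how many substreams contributed to it.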

class OPSI.web2.stream.FileStream(f, start=0, length=None, useMMap=True)

Bases: OPSI.web2.stream.SimpleStream

CHUNK_SIZE = 65504
close()
f = None
read(sendfile=False)

class OPSI.web2.stream.MemoryStream(mem, start=0, length=None)

Bases: OPSI.web2.stream.SimpleStream

A stream that reads data from a buffer object.

close()
read()

class OPSI.web2.stream.CompoundStream(buckets=())

Bases: object

A stream which is composed of many other streams.

Call addStream to add substreams.

addStream(bucket)

Add a stream to the output.

close()
deferred = None
length = 0
read(sendfile=False)
split(point)

OPSI.web2.stream.readAndDiscard(stream)

Read all the data from the given stream, and throw it out.

Returns a Deferred which will be triggered when the stream has been fully read.

OPSI.web2.stream.fallbackSplit(stream, point)

class OPSI.web2.stream.ProducerStream(length=None)

Bases: object

Turns producers into an IByteStream. It therefore implements both IConsumer and IByteStream.

bufferSize = 5
close()

Called by the reader of the stream when it is done reading.

closed = False
deferred = None
failed = False
finish(failure=None)

Called by the producer when it is done.

If the optional failure argument is passed a Failure instance, the stream will deliver it as an errback on the next Deferred.

length = None
producer = None
producerPaused = False
read()
registerProducer(producer, streaming)
split(point)
unregisterProducer()
write(data)

class OPSI.web2.stream.StreamProducer(stream, enforceStr=True)

Bases: object

A push producer which gets its data by reading a stream.

beginProducing(consumer)
consumer = None
deferred = None
finishedCallback = None
pauseProducing()
paused = False
resumeProducing()
stopProducing(failure=ConnectionLost())

class OPSI.web2.stream.BufferedStream(stream)

Bases: object

A stream which buffers its data to provide operations like readline and readExactly.

data = ''
length
pushback(pushed)

Push data back into the buffer.

read()
readExactly(size=None)

Read exactly size bytes of data, or, if size is None, read the entire stream into a string.

readline(delimiter='\r\n', size=None)

Read a line of data from the stream, bounded by delimiter. The delimiter is included in the return value.

If size is specified, read and return at most that many bytes, even if the delimiter has not yet been reached. If the size limit falls within a delimiter, the rest of the delimiter and the next line will be returned together.
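The delimiter and size rules described here can be sketched on an already-buffered byte string. toy_readline is a hypothetical helper written for illustration; unlike BufferedStream.readline it does not read more data from an underlying stream, and it returns the remaining buffer explicitly instead of keeping it as state.

```python
def toy_readline(buf, delimiter=b"\r\n", size=None):
    """Return (line, rest) for data that is already fully buffered."""
    end = buf.find(delimiter)
    if end != -1:
        end += len(delimiter)  # the delimiter is part of the returned line
    if size is not None and (end == -1 or end > size):
        end = size             # size limit reached before a whole delimiter
    if end == -1:
        end = len(buf)         # no delimiter at all: return what is buffered
    return buf[:end], buf[end:]


# Plain case: the line ends with its delimiter.
line, rest = toy_readline(b"ab\r\ncd\r\n")

# The size limit falls inside the delimiter, so the cut happens mid-delimiter.
cut, rest_after_cut = toy_readline(b"ab\r\ncd\r\n", size=3)

# The next call returns the rest of the delimiter together with the next line.
joined, remaining = toy_readline(rest_after_cut)
```

The last call shows the edge case from the description: the leftover half of the delimiter is not recognized as a line ending on its own, so it is returned in front of the following line.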

split(offset)

OPSI.web2.stream.readStream(stream, gotDataCallback)

Pass a stream’s data to a callback.

Returns a Deferred which will be triggered when the stream has been fully read. Errors in reading the stream or in processing it will be reported via this Deferred.

class OPSI.web2.stream.ProcessStreamer(inputStream, program, args, env={})

Bases: object

Runs a process hooked up to streams.

Requires an input stream. outStream and errStream are public attributes providing streams for the stdout and stderr of the process.

run()

Run the process.

Returns a Deferred which will eventually errback with ProcessTerminated on a non-clean exit (exit code > 0), or call back with None on exit code 0.

OPSI.web2.stream.readIntoFile(stream, outFile)

Read a stream and write it into a file.

Returns a Deferred which will be triggered when the stream has been fully written to the file.

OPSI.web2.stream.generatorToStream(fun)

Converts a generator function into a stream.

The function should take an iterator as its first argument, which will be converted from a stream by this wrapper, and yield items which are turned into the results from the stream’s ‘read’ call.

One important point: before every call to input.next(), you MUST do a “yield input.wait” first. Yielding this magic value ensures that the next item from the input is actual data and not a Deferred by the time you see it.

>>> from OPSI.web2 import stream
>>> from string import maketrans
>>> alphabet = 'abcdefghijklmnopqrstuvwxyz'
>>>
>>> def encrypt(input, key):
...     code = alphabet[key:] + alphabet[:key]
...     translator = maketrans(alphabet+alphabet.upper(), code+code.upper())
...     yield input.wait
...     for s in input:
...         yield str(s).translate(translator)
...         yield input.wait
...
>>> encrypt = stream.generatorToStream(encrypt)
>>>
>>> plaintextStream = stream.MemoryStream('SampleSampleSample')
>>> encryptedStream = encrypt(plaintextStream, 13)
>>> encryptedStream.read()
'FnzcyrFnzcyrFnzcyr'
>>>
>>> plaintextStream = stream.MemoryStream('SampleSampleSample')
>>> encryptedStream = encrypt(plaintextStream, 13)
>>> evenMoreEncryptedStream = encrypt(encryptedStream, 13)
>>> evenMoreEncryptedStream.read()
'SampleSampleSample'