OPSI.web2.test.test_stream module

class OPSI.web2.test.test_stream.AdapterTestCase(methodName='runTest')

Bases: twisted.trial._asynctest.TestCase

test_adapt()
class OPSI.web2.test.test_stream.CompoundStreamTest

CompoundStream lets you combine many streams into one continuous stream. For example, let’s make a stream: >>> s = CompoundStream()

Then, add a couple streams: >>> s.addStream(MemoryStream(“Stream1”)) >>> s.addStream(MemoryStream(“Stream2”))

The length is the sum of all the streams: >>> s.length 14

We can read data from the stream: >>> str(s.read()) ‘Stream1’

After having read some data, length is now smaller, as you might expect: >>> s.length 7

So, continue reading... >>> str(s.read()) ‘Stream2’

Now that the stream is exhausted: >>> s.read() is None True >>> s.length 0

We can also create CompoundStream more easily like so: >>> s = CompoundStream([‘hello’, MemoryStream(‘ world’)]) >>> str(s.read()) ‘hello’ >>> str(s.read()) ‘ world’

For a more complicated example, let’s try reading from a file: >>> s = CompoundStream() >>> s.addStream(FileStream(open(sibpath(__file__, “stream_data.txt”)))) >>> s.addStream(“================”) >>> s.addStream(FileStream(open(sibpath(__file__, “stream_data.txt”))))

Again, the length is the sum: >>> int(s.length) 58

>>> str(s.read())
"We've got some text!\n"
>>> str(s.read())
'================'

What if you close the stream? >>> s.close() >>> s.read() is None True >>> s.length 0

Error handling works using Deferreds: >>> m = MemoryStream(“after”) >>> s = CompoundStream([TestStreamer([defer.fail(ZeroDivisionError())]), m]) >>> l = []; x = s.read().addErrback(lambda _: l.append(1)) >>> l [1] >>> s.length 0 >>> m.length # streams after the failed one got closed 0

class OPSI.web2.test.test_stream.FallbackSplitTest(methodName='runTest')

Bases: twisted.trial._asynctest.TestCase

test_closeboth()
test_closeboth_rev()
test_closeleft()
test_closeright()
test_split()
test_split2()
test_splitsplit()
class OPSI.web2.test.test_stream.FileStreamTest(methodName='runTest')

Bases: OPSI.web2.test.test_stream.SimpleStreamTests, twisted.trial._asynctest.TestCase

makeStream(*args, **kw)
setUpClass()
test_close()
test_read2()
class OPSI.web2.test.test_stream.MMapFileStreamTest(methodName='runTest')

Bases: OPSI.web2.test.test_stream.SimpleStreamTests, twisted.trial._asynctest.TestCase

makeStream(*args, **kw)
setUpClass()
test_mmapwrapper()
class OPSI.web2.test.test_stream.MemoryStreamTest(methodName='runTest')

Bases: OPSI.web2.test.test_stream.SimpleStreamTests, twisted.trial._asynctest.TestCase

makeStream(*args, **kw)
test_close()
test_read2()
class OPSI.web2.test.test_stream.ProcessStreamerTest(methodName='runTest')

Bases: twisted.trial._asynctest.TestCase

runCode(code, inputStream=None)
test_badexit()
test_errouput()
test_input()
test_inputerror()
test_output()
test_processclosedinput()
class OPSI.web2.test.test_stream.ProducerStreamTestCase(methodName='runTest')

Bases: twisted.trial._asynctest.TestCase

test_failfinish()
class OPSI.web2.test.test_stream.ReadStreamTestCase(methodName='runTest')

Bases: twisted.trial._asynctest.TestCase

test_processingException()
test_pull()
test_pullException()
test_pullFailure()
class OPSI.web2.test.test_stream.SimpleStreamTests
test_read()
test_split()
text = '1234567890'
class OPSI.web2.test.test_stream.TestBufferedStream(methodName='runTest')

Bases: twisted.trial._asynctest.TestCase

setUp()
test_read()

Make sure read() also functions. (note that this test uses an implementation detail of this particular stream. s.read() isn’t guaranteed to return self.data on all streams.)

test_readExactly()

Test readExactly with a number.

test_readExactlyBig()

Test readExactly with a number larger than the size of the datastream.

test_readline()

Test that readline reads a line.

test_readlineFinished()

Test readline on a finished stream.

test_readlineNegSize()

Ensure that readline with a negative size raises an exception.

test_readlineSizeInDelimiter()

Test behavior of readline when size falls inside the delimiter.

test_readlineWithBigSize()

Test the size argument when it’s bigger than the length of the line.

test_readlineWithSize()

Test the size argument to readline

test_readlineWithZero()

Test readline with size = 0.

class OPSI.web2.test.test_stream.TestStreamer(list)
close()
closeCalled = 0
length = None
read()
readCalled = 0
class OPSI.web2.test.test_stream.TestSubstream(methodName='runTest')

Bases: twisted.trial._asynctest.TestCase

setUp()
suckTheMarrow(s)
testEmptySubstream()
testEnd()
testNotStart()
testPastEnd()
testReverseStartEnd()
testStart()
OPSI.web2.test.test_stream.bufstr(data)