OPSI.web2.test.test_stream module¶
-
class OPSI.web2.test.test_stream.AdapterTestCase(methodName='runTest')¶
Bases: twisted.trial._asynctest.TestCase
-
test_adapt()¶
-
-
class OPSI.web2.test.test_stream.CompoundStreamTest¶
CompoundStream lets you combine many streams into one continuous stream. For example, let’s make a stream:

>>> s = CompoundStream()

Then, add a couple streams:

>>> s.addStream(MemoryStream("Stream1"))
>>> s.addStream(MemoryStream("Stream2"))

The length is the sum of all the streams:

>>> s.length
14

We can read data from the stream:

>>> str(s.read())
'Stream1'

After having read some data, length is now smaller, as you might expect:

>>> s.length
7

So, continue reading...

>>> str(s.read())
'Stream2'

Now that the stream is exhausted:

>>> s.read() is None
True
>>> s.length
0

We can also create CompoundStream more easily like so:

>>> s = CompoundStream(['hello', MemoryStream(' world')])
>>> str(s.read())
'hello'
>>> str(s.read())
' world'

For a more complicated example, let’s try reading from a file:

>>> s = CompoundStream()
>>> s.addStream(FileStream(open(sibpath(__file__, "stream_data.txt"))))
>>> s.addStream("================")
>>> s.addStream(FileStream(open(sibpath(__file__, "stream_data.txt"))))

Again, the length is the sum:

>>> int(s.length)
58

>>> str(s.read())
"We've got some text!\n"
>>> str(s.read())
'================'

What if you close the stream?

>>> s.close()
>>> s.read() is None
True
>>> s.length
0

Error handling works using Deferreds:

>>> m = MemoryStream("after")
>>> s = CompoundStream([TestStreamer([defer.fail(ZeroDivisionError())]), m])
>>> l = []; x = s.read().addErrback(lambda _: l.append(1))
>>> l
[1]
>>> s.length
0
>>> m.length # streams after the failed one got closed
0
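
The chaining behaviour described in this docstring can be modelled without the OPSI or Twisted packages. The following is only a minimal sketch under stated assumptions: ToyMemoryStream and ToyCompoundStream are hypothetical stand-ins written for this illustration, not the real MemoryStream and CompoundStream classes, and the Deferred-based error handling is left out.

```python
from collections import deque


class ToyMemoryStream:
    """Hypothetical stand-in for MemoryStream: hands out its data in one read."""

    def __init__(self, data):
        self._data = data
        self.length = len(data)

    def read(self):
        if self._data is None:
            return None  # already exhausted or closed
        data, self._data = self._data, None
        self.length = 0
        return data

    def close(self):
        self._data = None
        self.length = 0


class ToyCompoundStream:
    """Hypothetical model of a compound stream: reads sub-streams in order."""

    def __init__(self, buckets=()):
        self._buckets = deque()
        self.length = 0
        for bucket in buckets:
            self.addStream(bucket)

    def addStream(self, bucket):
        # Plain strings are wrapped, mirroring the list shorthand in the docstring.
        if isinstance(bucket, str):
            bucket = ToyMemoryStream(bucket)
        self._buckets.append(bucket)
        self.length += bucket.length

    def read(self):
        while self._buckets:
            data = self._buckets[0].read()
            if data is not None:
                self.length -= len(data)
                return data
            self._buckets.popleft()  # current sub-stream is exhausted
        return None

    def close(self):
        # Closing the compound stream closes every remaining sub-stream.
        for bucket in self._buckets:
            bucket.close()
        self._buckets.clear()
        self.length = 0


s = ToyCompoundStream()
s.addStream(ToyMemoryStream("Stream1"))
s.addStream(ToyMemoryStream("Stream2"))
lengths = [s.length]
first = s.read()
lengths.append(s.length)
second = s.read()
lengths.append(s.length)
print(first, second, lengths)  # Stream1 Stream2 [14, 7, 0]
```

Keeping a running length that shrinks on every read is what lets the toy class report 14, then 7, then 0, matching the sequence in the docstring.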
-
class OPSI.web2.test.test_stream.FallbackSplitTest(methodName='runTest')¶
Bases: twisted.trial._asynctest.TestCase
-
test_closeboth()¶
-
test_closeboth_rev()¶
-
test_closeleft()¶
-
test_closeright()¶
-
test_split()¶
-
test_split2()¶
-
test_splitsplit()¶
-
-
class OPSI.web2.test.test_stream.FileStreamTest(methodName='runTest')¶
Bases: OPSI.web2.test.test_stream.SimpleStreamTests, twisted.trial._asynctest.TestCase
-
makeStream(*args, **kw)¶
-
setUpClass()¶
-
test_close()¶
-
test_read2()¶
-
-
class OPSI.web2.test.test_stream.MMapFileStreamTest(methodName='runTest')¶
Bases: OPSI.web2.test.test_stream.SimpleStreamTests, twisted.trial._asynctest.TestCase
-
makeStream(*args, **kw)¶
-
setUpClass()¶
-
test_mmapwrapper()¶
-
-
class OPSI.web2.test.test_stream.MemoryStreamTest(methodName='runTest')¶
Bases: OPSI.web2.test.test_stream.SimpleStreamTests, twisted.trial._asynctest.TestCase
-
makeStream(*args, **kw)¶
-
test_close()¶
-
test_read2()¶
-
-
class OPSI.web2.test.test_stream.ProcessStreamerTest(methodName='runTest')¶
Bases: twisted.trial._asynctest.TestCase
-
runCode(code, inputStream=None)¶
-
test_badexit()¶
-
test_errouput()¶
-
test_input()¶
-
test_inputerror()¶
-
test_output()¶
-
test_processclosedinput()¶
-
-
class OPSI.web2.test.test_stream.ProducerStreamTestCase(methodName='runTest')¶
Bases: twisted.trial._asynctest.TestCase
-
test_failfinish()¶
-
-
class OPSI.web2.test.test_stream.ReadStreamTestCase(methodName='runTest')¶
Bases: twisted.trial._asynctest.TestCase
-
test_processingException()¶
-
test_pull()¶
-
test_pullException()¶
-
test_pullFailure()¶
-
-
class OPSI.web2.test.test_stream.TestBufferedStream(methodName='runTest')¶
Bases: twisted.trial._asynctest.TestCase
-
setUp()¶
-
test_read()¶ Make sure read() also functions. (Note that this test uses an implementation detail of this particular stream: s.read() isn’t guaranteed to return self.data on all streams.)
-
test_readExactly()¶ Test readExactly with a number.
-
test_readExactlyBig()¶ Test readExactly with a number larger than the size of the datastream.
-
test_readline()¶ Test that readline reads a line.
-
test_readlineFinished()¶ Test readline on a finished stream.
-
test_readlineNegSize()¶ Ensure that readline with a negative size raises an exception.
-
test_readlineSizeInDelimiter()¶ Test behavior of readline when size falls inside the delimiter.
-
test_readlineWithBigSize()¶ Test the size argument when it’s bigger than the length of the line.
-
test_readlineWithSize()¶ Test the size argument to readline.
-
test_readlineWithZero()¶ Test readline with size = 0.
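
The readExactly and readline cases listed above can be pictured with a small in-memory reader. This is a hedged sketch only: ToyBufferedReader, its method names, the "\r\n" default delimiter, and the use of ValueError are assumptions made for illustration, not the API of the stream class under test. In particular, what the sketch does when size falls inside the delimiter (it returns the first size characters, partial delimiter included) is a choice made here; the docstrings do not state the real behaviour.

```python
class ToyBufferedReader:
    """Hypothetical in-memory reader; not the stream class under test."""

    def __init__(self, data):
        self._data = data

    def read_exactly(self, size):
        # Assumption: asking for more than remains returns whatever is left.
        chunk, self._data = self._data[:size], self._data[size:]
        return chunk

    def readline(self, delimiter="\r\n", size=None):
        if size is not None and size < 0:
            # The docstring only says "raises an exception"; ValueError is assumed.
            raise ValueError("size cannot be negative")
        if size == 0 or not self._data:
            return ""  # nothing requested, or the data is finished
        limit = len(self._data) if size is None else size
        # Only a delimiter lying wholly inside the first `limit` characters counts.
        pos = self._data.find(delimiter, 0, limit)
        if pos != -1:
            cut = pos + len(delimiter)
        else:
            cut = min(limit, len(self._data))
        line, self._data = self._data[:cut], self._data[cut:]
        return line


reader = ToyBufferedReader("first line\r\nsecond\r\n")
whole_line = reader.readline()             # up to and including the delimiter
short_read = reader.readline(size=3)       # size smaller than the line
rest_of_line = reader.readline(size=100)   # size larger than the line
finished = reader.readline()               # no data left
print(repr(whole_line), repr(short_read), repr(rest_of_line), repr(finished))
```

Searching for the delimiter only within the first size characters keeps a single code path for the small-size, large-size, and size-inside-delimiter cases.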
-
-
class OPSI.web2.test.test_stream.TestStreamer(list)¶
-
close()¶
-
closeCalled= 0¶
-
length= None¶
-
read()¶
-
readCalled= 0¶
-
-
class OPSI.web2.test.test_stream.TestSubstream(methodName='runTest')¶
Bases: twisted.trial._asynctest.TestCase
-
setUp()¶
-
suckTheMarrow(s)¶
-
testEmptySubstream()¶
-
testEnd()¶
-
testNotStart()¶
-
testPastEnd()¶
-
testReverseStartEnd()¶
-
testStart()¶
-
-
OPSI.web2.test.test_stream.bufstr(data)¶