OPSI.web2.test.test_stream module¶
-
class
OPSI.web2.test.test_stream.
AdapterTestCase
(methodName='runTest')¶ Bases:
twisted.trial._asynctest.TestCase
-
test_adapt
()¶
-
-
class
OPSI.web2.test.test_stream.
CompoundStreamTest
¶ CompoundStream lets you combine many streams into one continuous stream. For example, let’s make a stream: >>> s = CompoundStream()
Then, add a couple streams: >>> s.addStream(MemoryStream(“Stream1”)) >>> s.addStream(MemoryStream(“Stream2”))
The length is the sum of all the streams: >>> s.length 14
We can read data from the stream: >>> str(s.read()) ‘Stream1’
After having read some data, length is now smaller, as you might expect: >>> s.length 7
So, continue reading... >>> str(s.read()) ‘Stream2’
Now that the stream is exhausted: >>> s.read() is None True >>> s.length 0
We can also create CompoundStream more easily like so: >>> s = CompoundStream([‘hello’, MemoryStream(‘ world’)]) >>> str(s.read()) ‘hello’ >>> str(s.read()) ‘ world’
For a more complicated example, let’s try reading from a file: >>> s = CompoundStream() >>> s.addStream(FileStream(open(sibpath(__file__, “stream_data.txt”)))) >>> s.addStream(“================”) >>> s.addStream(FileStream(open(sibpath(__file__, “stream_data.txt”))))
Again, the length is the sum: >>> int(s.length) 58
>>> str(s.read()) "We've got some text!\n" >>> str(s.read()) '================'
What if you close the stream? >>> s.close() >>> s.read() is None True >>> s.length 0
Error handling works using Deferreds: >>> m = MemoryStream(“after”) >>> s = CompoundStream([TestStreamer([defer.fail(ZeroDivisionError())]), m]) >>> l = []; x = s.read().addErrback(lambda _: l.append(1)) >>> l [1] >>> s.length 0 >>> m.length # streams after the failed one got closed 0
-
class
OPSI.web2.test.test_stream.
FallbackSplitTest
(methodName='runTest')¶ Bases:
twisted.trial._asynctest.TestCase
-
test_closeboth
()¶
-
test_closeboth_rev
()¶
-
test_closeleft
()¶
-
test_closeright
()¶
-
test_split
()¶
-
test_split2
()¶
-
test_splitsplit
()¶
-
-
class
OPSI.web2.test.test_stream.
FileStreamTest
(methodName='runTest')¶ Bases:
OPSI.web2.test.test_stream.SimpleStreamTests
,twisted.trial._asynctest.TestCase
-
makeStream
(*args, **kw)¶
-
setUpClass
()¶
-
test_close
()¶
-
test_read2
()¶
-
-
class
OPSI.web2.test.test_stream.
MMapFileStreamTest
(methodName='runTest')¶ Bases:
OPSI.web2.test.test_stream.SimpleStreamTests
,twisted.trial._asynctest.TestCase
-
makeStream
(*args, **kw)¶
-
setUpClass
()¶
-
test_mmapwrapper
()¶
-
-
class
OPSI.web2.test.test_stream.
MemoryStreamTest
(methodName='runTest')¶ Bases:
OPSI.web2.test.test_stream.SimpleStreamTests
,twisted.trial._asynctest.TestCase
-
makeStream
(*args, **kw)¶
-
test_close
()¶
-
test_read2
()¶
-
-
class
OPSI.web2.test.test_stream.
ProcessStreamerTest
(methodName='runTest')¶ Bases:
twisted.trial._asynctest.TestCase
-
runCode
(code, inputStream=None)¶
-
test_badexit
()¶
-
test_errouput
()¶
-
test_input
()¶
-
test_inputerror
()¶
-
test_output
()¶
-
test_processclosedinput
()¶
-
-
class
OPSI.web2.test.test_stream.
ProducerStreamTestCase
(methodName='runTest')¶ Bases:
twisted.trial._asynctest.TestCase
-
test_failfinish
()¶
-
-
class
OPSI.web2.test.test_stream.
ReadStreamTestCase
(methodName='runTest')¶ Bases:
twisted.trial._asynctest.TestCase
-
test_processingException
()¶
-
test_pull
()¶
-
test_pullException
()¶
-
test_pullFailure
()¶
-
-
class
OPSI.web2.test.test_stream.
TestBufferedStream
(methodName='runTest')¶ Bases:
twisted.trial._asynctest.TestCase
-
setUp
()¶
-
test_read
()¶ Make sure read() also functions. (note that this test uses an implementation detail of this particular stream. s.read() isn’t guaranteed to return self.data on all streams.)
-
test_readExactly
()¶ Test readExactly with a number.
-
test_readExactlyBig
()¶ Test readExactly with a number larger than the size of the datastream.
-
test_readline
()¶ Test that readline reads a line.
-
test_readlineFinished
()¶ Test readline on a finished stream.
-
test_readlineNegSize
()¶ Ensure that readline with a negative size raises an exception.
-
test_readlineSizeInDelimiter
()¶ Test behavior of readline when size falls inside the delimiter.
-
test_readlineWithBigSize
()¶ Test the size argument when it’s bigger than the length of the line.
-
test_readlineWithSize
()¶ Test the size argument to readline
-
test_readlineWithZero
()¶ Test readline with size = 0.
-
-
class
OPSI.web2.test.test_stream.
TestStreamer
(list)¶ -
close
()¶
-
closeCalled
= 0¶
-
length
= None¶
-
read
()¶
-
readCalled
= 0¶
-
-
class
OPSI.web2.test.test_stream.
TestSubstream
(methodName='runTest')¶ Bases:
twisted.trial._asynctest.TestCase
-
setUp
()¶
-
suckTheMarrow
(s)¶
-
testEmptySubstream
()¶
-
testEnd
()¶
-
testNotStart
()¶
-
testPastEnd
()¶
-
testReverseStartEnd
()¶
-
testStart
()¶
-
-
OPSI.web2.test.test_stream.
bufstr
(data)¶