Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(261493)

Delta Between Two Patch Sets: Lib/test/test_telnetlib.py

Issue 16510: Using appropriate checks in tests
Left Patch Set: Created 5 years, 11 months ago
Right Patch Set: Created 5 years, 6 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/test/test_sys.py ('k') | Lib/test/test_tempfile.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 import socket 1 import socket
2 import select 2 import selectors
3 import telnetlib 3 import telnetlib
4 import time 4 import time
5 import contextlib 5 import contextlib
6 6
7 import unittest
8 from unittest import TestCase 7 from unittest import TestCase
9 from test import support 8 from test import support
10 threading = support.import_module('threading') 9 threading = support.import_module('threading')
11 10
12 HOST = support.HOST 11 HOST = support.HOST
13 12
14 def server(evt, serv): 13 def server(evt, serv):
15 serv.listen(5) 14 serv.listen(5)
16 evt.set() 15 evt.set()
17 try: 16 try:
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
66 65
67 def testTimeoutValue(self): 66 def testTimeoutValue(self):
68 telnet = telnetlib.Telnet(HOST, self.port, timeout=30) 67 telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
69 self.assertEqual(telnet.sock.gettimeout(), 30) 68 self.assertEqual(telnet.sock.gettimeout(), 30)
70 telnet.sock.close() 69 telnet.sock.close()
71 70
72 def testTimeoutOpen(self): 71 def testTimeoutOpen(self):
73 telnet = telnetlib.Telnet() 72 telnet = telnetlib.Telnet()
74 telnet.open(HOST, self.port, timeout=30) 73 telnet.open(HOST, self.port, timeout=30)
75 self.assertEqual(telnet.sock.gettimeout(), 30) 74 self.assertEqual(telnet.sock.gettimeout(), 30)
75 telnet.sock.close()
76
77 def testGetters(self):
78 # Test telnet getter methods
79 telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
80 t_sock = telnet.sock
81 self.assertEqual(telnet.get_socket(), t_sock)
82 self.assertEqual(telnet.fileno(), t_sock.fileno())
76 telnet.sock.close() 83 telnet.sock.close()
77 84
78 class SocketStub(object): 85 class SocketStub(object):
79 ''' a socket proxy that re-defines sendall() ''' 86 ''' a socket proxy that re-defines sendall() '''
80 def __init__(self, reads=()): 87 def __init__(self, reads=()):
81 self.reads = list(reads) # Intentionally make a copy. 88 self.reads = list(reads) # Intentionally make a copy.
82 self.writes = [] 89 self.writes = []
83 self.block = False 90 self.block = False
84 def sendall(self, data): 91 def sendall(self, data):
85 self.writes.append(data) 92 self.writes.append(data)
(...skipping 11 matching lines...) Expand all
97 raise NotImplementedError() 104 raise NotImplementedError()
98 def close(self): pass 105 def close(self): pass
99 def sock_avail(self): 106 def sock_avail(self):
100 return (not self.sock.block) 107 return (not self.sock.block)
101 def msg(self, msg, *args): 108 def msg(self, msg, *args):
102 with support.captured_stdout() as out: 109 with support.captured_stdout() as out:
103 telnetlib.Telnet.msg(self, msg, *args) 110 telnetlib.Telnet.msg(self, msg, *args)
104 self._messages += out.getvalue() 111 self._messages += out.getvalue()
105 return 112 return
106 113
107 def mock_select(*s_args): 114 class MockSelector(selectors.BaseSelector):
108 block = False
109 for l in s_args:
110 for fob in l:
111 if isinstance(fob, TelnetAlike):
112 block = fob.sock.block
113 if block:
114 return [[], [], []]
115 else:
116 return s_args
117
118 class MockPoller(object):
119 test_case = None # Set during TestCase setUp.
120 115
121 def __init__(self): 116 def __init__(self):
122 self._file_objs = [] 117 self.keys = {}
123 118
124 def register(self, fd, eventmask): 119 @property
125 self.test_case.assertTrue(hasattr(fd, 'fileno'), fd) 120 def resolution(self):
126 self.test_case.assertEqual(eventmask, select.POLLIN|select.POLLPRI) 121 return 1e-3
127 self._file_objs.append(fd) 122
128 123 def register(self, fileobj, events, data=None):
129 def poll(self, timeout=None): 124 key = selectors.SelectorKey(fileobj, 0, events, data)
125 self.keys[fileobj] = key
126 return key
127
128 def unregister(self, fileobj):
129 return self.keys.pop(fileobj)
130
131 def select(self, timeout=None):
130 block = False 132 block = False
131 for fob in self._file_objs: 133 for fileobj in self.keys:
132 if isinstance(fob, TelnetAlike): 134 if isinstance(fileobj, TelnetAlike):
133 block = fob.sock.block 135 block = fileobj.sock.block
136 break
134 if block: 137 if block:
135 return [] 138 return []
136 else: 139 else:
137 return zip(self._file_objs, [select.POLLIN]*len(self._file_objs)) 140 return [(key, key.events) for key in self.keys.values()]
138 141
139 def unregister(self, fd): 142 def get_map(self):
140 self._file_objs.remove(fd) 143 return self.keys
144
141 145
142 @contextlib.contextmanager 146 @contextlib.contextmanager
143 def test_socket(reads): 147 def test_socket(reads):
144 def new_conn(*ignored): 148 def new_conn(*ignored):
145 return SocketStub(reads) 149 return SocketStub(reads)
146 try: 150 try:
147 old_conn = socket.create_connection 151 old_conn = socket.create_connection
148 socket.create_connection = new_conn 152 socket.create_connection = new_conn
149 yield None 153 yield None
150 finally: 154 finally:
151 socket.create_connection = old_conn 155 socket.create_connection = old_conn
152 return 156 return
153 157
154 def test_telnet(reads=(), cls=TelnetAlike, use_poll=None): 158 def test_telnet(reads=(), cls=TelnetAlike):
155 ''' return a telnetlib.Telnet object that uses a SocketStub with 159 ''' return a telnetlib.Telnet object that uses a SocketStub with
156 reads queued up to be read ''' 160 reads queued up to be read '''
157 for x in reads: 161 for x in reads:
158 assert type(x) is bytes, x 162 assert type(x) is bytes, x
159 with test_socket(reads): 163 with test_socket(reads):
160 telnet = cls('dummy', 0) 164 telnet = cls('dummy', 0)
161 telnet._messages = '' # debuglevel output 165 telnet._messages = '' # debuglevel output
162 if use_poll is not None:
163 if use_poll and not telnet._has_poll:
164 raise unittest.SkipTest('select.poll() required.')
165 telnet._has_poll = use_poll
166 return telnet 166 return telnet
167
168 167
169 class ExpectAndReadTestCase(TestCase): 168 class ExpectAndReadTestCase(TestCase):
170 def setUp(self): 169 def setUp(self):
171 self.old_select = select.select 170 self.old_selector = telnetlib._TelnetSelector
172 select.select = mock_select 171 telnetlib._TelnetSelector = MockSelector
173 self.old_poll = False
174 if hasattr(select, 'poll'):
175 self.old_poll = select.poll
176 select.poll = MockPoller
177 MockPoller.test_case = self
178
179 def tearDown(self): 172 def tearDown(self):
180 if self.old_poll: 173 telnetlib._TelnetSelector = self.old_selector
181 MockPoller.test_case = None
182 select.poll = self.old_poll
183 select.select = self.old_select
184
185 174
186 class ReadTests(ExpectAndReadTestCase): 175 class ReadTests(ExpectAndReadTestCase):
187 def test_read_until(self): 176 def test_read_until(self):
188 """ 177 """
189 read_until(expected, timeout=None) 178 read_until(expected, timeout=None)
190 test the blocking version of read_util 179 test the blocking version of read_util
191 """ 180 """
192 want = [b'xxxmatchyyy'] 181 want = [b'xxxmatchyyy']
193 telnet = test_telnet(want) 182 telnet = test_telnet(want)
194 data = telnet.read_until(b'match') 183 data = telnet.read_until(b'match')
195 self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, te lnet.sock.reads)) 184 self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, te lnet.sock.reads))
196 185
197 reads = [b'x' * 50, b'match', b'y' * 50] 186 reads = [b'x' * 50, b'match', b'y' * 50]
198 expect = b''.join(reads[:-1]) 187 expect = b''.join(reads[:-1])
199 telnet = test_telnet(reads) 188 telnet = test_telnet(reads)
200 data = telnet.read_until(b'match') 189 data = telnet.read_until(b'match')
201 self.assertEqual(data, expect) 190 self.assertEqual(data, expect)
202 191
203 def test_read_until_with_poll(self):
204 """Use select.poll() to implement telnet.read_until()."""
205 want = [b'x' * 10, b'match', b'y' * 10]
206 telnet = test_telnet(want, use_poll=True)
207 select.select = lambda *_: self.fail('unexpected select() call.')
208 data = telnet.read_until(b'match')
209 self.assertEqual(data, b''.join(want[:-1]))
210
211 def test_read_until_with_select(self):
212 """Use select.select() to implement telnet.read_until()."""
213 want = [b'x' * 10, b'match', b'y' * 10]
214 telnet = test_telnet(want, use_poll=False)
215 if self.old_poll:
216 select.poll = lambda *_: self.fail('unexpected poll() call.')
217 data = telnet.read_until(b'match')
218 self.assertEqual(data, b''.join(want[:-1]))
219 192
220 def test_read_all(self): 193 def test_read_all(self):
221 """ 194 """
222 read_all() 195 read_all()
223 Read all data until EOF; may block. 196 Read all data until EOF; may block.
224 """ 197 """
225 reads = [b'x' * 500, b'y' * 500, b'z' * 500] 198 reads = [b'x' * 500, b'y' * 500, b'z' * 500]
226 expect = b''.join(reads) 199 expect = b''.join(reads)
227 telnet = test_telnet(reads) 200 telnet = test_telnet(reads)
228 data = telnet.read_all() 201 data = telnet.read_all()
(...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after
412 """ 385 """
413 expect(expected, [timeout]) 386 expect(expected, [timeout])
414 Read until the expected string has been seen, or a timeout is 387 Read until the expected string has been seen, or a timeout is
415 hit (default is no timeout); may block. 388 hit (default is no timeout); may block.
416 """ 389 """
417 want = [b'x' * 10, b'match', b'y' * 10] 390 want = [b'x' * 10, b'match', b'y' * 10]
418 telnet = test_telnet(want) 391 telnet = test_telnet(want)
419 (_,_,data) = telnet.expect([b'match']) 392 (_,_,data) = telnet.expect([b'match'])
420 self.assertEqual(data, b''.join(want[:-1])) 393 self.assertEqual(data, b''.join(want[:-1]))
421 394
422 def test_expect_with_poll(self):
423 """Use select.poll() to implement telnet.expect()."""
424 want = [b'x' * 10, b'match', b'y' * 10]
425 telnet = test_telnet(want, use_poll=True)
426 select.select = lambda *_: self.fail('unexpected select() call.')
427 (_,_,data) = telnet.expect([b'match'])
428 self.assertEqual(data, b''.join(want[:-1]))
429
430 def test_expect_with_select(self):
431 """Use select.select() to implement telnet.expect()."""
432 want = [b'x' * 10, b'match', b'y' * 10]
433 telnet = test_telnet(want, use_poll=False)
434 if self.old_poll:
435 select.poll = lambda *_: self.fail('unexpected poll() call.')
436 (_,_,data) = telnet.expect([b'match'])
437 self.assertEqual(data, b''.join(want[:-1]))
438
439 395
440 def test_main(verbose=None): 396 def test_main(verbose=None):
441 support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests, 397 support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests,
442 ExpectTests) 398 ExpectTests)
443 399
444 if __name__ == '__main__': 400 if __name__ == '__main__':
445 test_main() 401 test_main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+