diff -r 27adb952813b Lib/aifc.py --- a/Lib/aifc.py Sun Nov 13 04:11:37 2011 +0100 +++ b/Lib/aifc.py Sun Nov 20 18:20:12 2011 +0400 @@ -162,6 +162,12 @@ except struct.error: raise EOFError +def _read_ushort(file): + try: + return struct.unpack('>H', file.read(2))[0] + except struct.error: + raise EOFError + def _read_string(file): length = ord(file.read(1)) if length == 0: @@ -194,13 +200,19 @@ def _write_short(f, x): f.write(struct.pack('>h', x)) +def _write_ushort(f, x): + f.write(struct.pack('>H', x)) + def _write_long(f, x): + f.write(struct.pack('>l', x)) + +def _write_ulong(f, x): f.write(struct.pack('>L', x)) def _write_string(f, s): if len(s) > 255: raise ValueError("string exceeds maximum pstring length") - f.write(struct.pack('b', len(s))) + f.write(struct.pack('B', len(s))) f.write(s) if len(s) & 1 == 0: f.write(b'\x00') @@ -218,7 +230,7 @@ lomant = 0 else: fmant, expon = math.frexp(x) - if expon > 16384 or fmant >= 1: # Infinity or NaN + if expon > 16384 or fmant >= 1 or fmant != fmant: # Infinity or NaN expon = sign|0x7FFF himant = 0 lomant = 0 @@ -234,9 +246,9 @@ fmant = math.ldexp(fmant - fsmant, 32) fsmant = math.floor(fmant) lomant = int(fsmant) - _write_short(f, expon) - _write_long(f, himant) - _write_long(f, lomant) + _write_ushort(f, expon) + _write_ulong(f, himant) + _write_ulong(f, lomant) from chunk import Chunk @@ -274,7 +286,8 @@ # _ssnd_seek_needed -- 1 iff positioned correctly in audio # file for readframes() # _ssnd_chunk -- instantiation of a chunk class for the SSND chunk - # _framesize -- size of one frame in the file + # _framesize -- size in bits of one frame in the file: G722 uses + # 4 bits per sample, which can result in single frame size < 1 byte def initfp(self, file): self._version = 0 @@ -384,13 +397,13 @@ if self._ssnd_seek_needed: self._ssnd_chunk.seek(0) dummy = self._ssnd_chunk.read(8) - pos = self._soundpos * self._framesize + pos = (self._soundpos * self._framesize) // 8 if pos: self._ssnd_chunk.seek(pos + 8) self._ssnd_seek_needed = 0 if nframes == 0: return b'' - data = self._ssnd_chunk.read(nframes * self._framesize) + data = self._ssnd_chunk.read((nframes * self._framesize) // 8) if self._convert and data: data = self._convert(data) self._soundpos = self._soundpos + len(data) // (self._nchannels @@ -422,7 +435,7 @@ self._nframes = _read_long(chunk) self._sampwidth = (_read_short(chunk) + 7) // 8 self._framerate = int(_read_float(chunk)) - self._framesize = self._nchannels * self._sampwidth + self._framesize = self._nchannels * self._sampwidth * 8 if self._aifc: #DEBUG: SGI's soundeditor produces a bad size :-( kludge = 0 @@ -539,8 +552,7 @@ self._aifc = 1 # AIFF-C is default def __del__(self): - if self._file: - self.close() + self.close() # # User visible methods. @@ -643,8 +655,8 @@ raise Error('marker ID must be > 0') if pos < 0: raise Error('marker position must be >= 0') - if not isinstance(name, str): - raise Error('marker name must be a string') + if not isinstance(name, bytes): + raise Error('marker name must be bytes') for i in range(len(self._markers)): if id == self._markers[i][0]: self._markers[i] = id, pos, name @@ -681,19 +693,21 @@ self._patchheader() def close(self): - self._ensure_header_written(0) - if self._datawritten & 1: - # quick pad to even size - self._file.write(b'\x00') - self._datawritten = self._datawritten + 1 - self._writemarkers() - if self._nframeswritten != self._nframes or \ - self._datalength != self._datawritten or \ - self._marklength: - self._patchheader() - # Prevent ref cycles - self._convert = None - self._file.close() + if self._file: + self._ensure_header_written(0) + if self._datawritten & 1: + # quick pad to even size + self._file.write(b'\x00') + self._datawritten = self._datawritten + 1 + self._writemarkers() + if self._nframeswritten != self._nframes or \ + self._datalength != self._datawritten or \ + self._marklength: + self._patchheader() + # Prevent ref cycles + self._convert = None + self._file.close() + self._file = None # # Internal methods. @@ -716,18 +730,12 @@ def _ensure_header_written(self, datasize): if not self._nframeswritten: - if self._comptype in (b'ULAW', b'ALAW'): + if self._comptype in (b'ULAW', b'ulaw', b'ALAW', b'alaw', b'G722'): if not self._sampwidth: self._sampwidth = 2 if self._sampwidth != 2: raise Error('sample width must be 2 when compressing ' - 'with ulaw/ULAW or alaw/ALAW') - if self._comptype == b'G722': - if not self._sampwidth: - self._sampwidth = 2 - if self._sampwidth != 2: - raise Error('sample width must be 2 when compressing ' - 'with G7.22 (ADPCM)') + 'with ulaw/ULAW, alaw/ALAW or G7.22 (ADPCM)') if not self._nchannels: raise Error('# channels not specified') if not self._sampwidth: @@ -743,8 +751,6 @@ self._convert = self._lin2ulaw elif self._comptype in (b'alaw', b'ALAW'): self._convert = self._lin2alaw - else: - raise Error('unsupported compression type') def _write_header(self, initlength): if self._aifc and self._comptype != b'NONE': @@ -769,15 +775,15 @@ if self._aifc: self._file.write(b'AIFC') self._file.write(b'FVER') - _write_long(self._file, 4) - _write_long(self._file, self._version) + _write_ulong(self._file, 4) + _write_ulong(self._file, self._version) else: self._file.write(b'AIFF') self._file.write(b'COMM') - _write_long(self._file, commlength) + _write_ulong(self._file, commlength) _write_short(self._file, self._nchannels) self._nframes_pos = self._file.tell() - _write_long(self._file, self._nframes) + _write_ulong(self._file, self._nframes) _write_short(self._file, self._sampwidth * 8) _write_float(self._file, self._framerate) if self._aifc: @@ -785,9 +791,9 @@ _write_string(self._file, self._compname) self._file.write(b'SSND') self._ssnd_length_pos = self._file.tell() - _write_long(self._file, self._datalength + 8) - _write_long(self._file, 0) - _write_long(self._file, 0) + _write_ulong(self._file, self._datalength + 8) + _write_ulong(self._file, 0) + _write_ulong(self._file, 0) def _write_form_length(self, datalength): if self._aifc: @@ -798,8 +804,8 @@ else: commlength = 18 verslength = 0 - _write_long(self._file, 4 + verslength + self._marklength + \ - 8 + commlength + 16 + datalength) + _write_ulong(self._file, 4 + verslength + self._marklength + \ + 8 + commlength + 16 + datalength) return commlength def _patchheader(self): @@ -817,9 +823,9 @@ self._file.seek(self._form_length_pos, 0) dummy = self._write_form_length(datalength) self._file.seek(self._nframes_pos, 0) - _write_long(self._file, self._nframeswritten) + _write_ulong(self._file, self._nframeswritten) self._file.seek(self._ssnd_length_pos, 0) - _write_long(self._file, datalength + 8) + _write_ulong(self._file, datalength + 8) self._file.seek(curpos, 0) self._nframes = self._nframeswritten self._datalength = datalength @@ -834,13 +840,13 @@ length = length + len(name) + 1 + 6 if len(name) & 1 == 0: length = length + 1 - _write_long(self._file, length) + _write_ulong(self._file, length) self._marklength = length + 8 _write_short(self._file, len(self._markers)) for marker in self._markers: id, pos, name = marker _write_short(self._file, id) - _write_long(self._file, pos) + _write_ulong(self._file, pos) _write_string(self._file, name) def open(f, mode=None): diff -r 27adb952813b Lib/test/test_aifc.py --- a/Lib/test/test_aifc.py Sun Nov 13 04:11:37 2011 +0100 +++ b/Lib/test/test_aifc.py Sun Nov 20 18:20:12 2011 +0400 @@ -1,6 +1,8 @@ from test.support import findfile, run_unittest, TESTFN import unittest import os +import io +import struct import aifc @@ -21,6 +23,7 @@ pass try: os.remove(TESTFN) + os.remove(TESTFN + '.aiff') except OSError: pass @@ -31,6 +34,7 @@ def test_params(self): f = self.f = aifc.open(self.sndfilepath) + self.assertEqual(f.getfp().name, self.sndfilepath) self.assertEqual(f.getnchannels(), 2) self.assertEqual(f.getsampwidth(), 2) self.assertEqual(f.getframerate(), 48000) @@ -44,6 +48,7 @@ def test_read(self): f = self.f = aifc.open(self.sndfilepath) + self.assertEqual(f.readframes(0), b'') self.assertEqual(f.tell(), 0) self.assertEqual(f.readframes(2), b'\x00\x00\x00\x00\x0b\xd4\x0b\xd4') f.rewind() @@ -57,6 +62,10 @@ self.assertEqual(f.readframes(2), b'\x17t\x17t"\xad"\xad') f.setpos(pos0) self.assertEqual(f.readframes(2), b'\x00\x00\x00\x00\x0b\xd4\x0b\xd4') + with self.assertRaises(aifc.Error): + f.setpos(-1) + with self.assertRaises(aifc.Error): + f.setpos(f.getnframes() + 1) def test_write(self): f = self.f = aifc.open(self.sndfilepath) @@ -91,8 +100,8 @@ self.assertEqual(f.getparams()[0:3], fout.getparams()[0:3]) self.assertEqual(fout.getcomptype(), b'ULAW') self.assertEqual(fout.getcompname(), b'foo') - # XXX: this test fails, not sure if it should succeed or not - # self.assertEqual(f.readframes(5), fout.readframes(5)) + # ULAW is lossy compression, so frames may not match + self.assertNotEqual(f.readframes(5), fout.readframes(5)) def test_close(self): class Wrapfile(object): @@ -109,9 +118,230 @@ f.close() self.assertEqual(testfile.closed, True) + def test_wrong_open_mode(self): + with self.assertRaises(aifc.Error): + aifc.open(TESTFN, 'wrong_mode') + + def test_read_wrong_form(self): + b1 = io.BytesIO(b'WRNG' + struct.pack('>L', 0)) + b2 = io.BytesIO(b'FORM' + struct.pack('>L', 4) + b'WRNG') + self.assertRaises(aifc.Error, aifc.open, b1) + self.assertRaises(aifc.Error, aifc.open, b2) + + def test_read_no_comm_chunk(self): + b = io.BytesIO(b'FORM' + struct.pack('>L', 4) + b'AIFF') + self.assertRaises(aifc.Error, aifc.open, b) + + def test_read_wrong_compression_type(self): + b = b'FORM' + struct.pack('>L', 4) + b'AIFC' + b += b'COMM' + struct.pack('>LhlhhLL', 23, 0, 0, 0, 0, 0, 0) + b += b'WRNG' + struct.pack('B', 0) + self.assertRaises(aifc.Error, aifc.open, io.BytesIO(b)) + + def test_read_wrong_marks(self): + b = b'FORM' + struct.pack('>L', 4) + b'AIFF' + b += b'COMM' + struct.pack('>LhlhhLL', 18, 0, 0, 0, 0, 0, 0) + b += b'SSND' + struct.pack('>L', 8) + b'\x00' * 8 + b += b'MARK' + struct.pack('>LhB', 3, 1, 1) + f = self.f = aifc.open (io.BytesIO(b)) + self.assertEqual(f.getmarkers(), None) + + def test_read_comm_kludge_compname_even(self): + b = b'FORM' + struct.pack('>L', 4) + b'AIFC' + b += b'COMM' + struct.pack('>LhlhhLL', 18, 0, 0, 0, 0, 0, 0) + b += b'NONE' + struct.pack('B', 4) + b'even' + b'\x00' + b += b'SSND' + struct.pack('>L', 8) + b'\x00' * 8 + f = self.f = aifc.open (io.BytesIO(b)) + self.assertEqual(f.getcompname(), b'even') + + def test_read_comm_kludge_compname_odd(self): + b = b'FORM' + struct.pack('>L', 4) + b'AIFC' + b += b'COMM' + struct.pack('>LhlhhLL', 18, 0, 0, 0, 0, 0, 0) + b += b'NONE' + struct.pack('B', 3) + b'odd' + b += b'SSND' + struct.pack('>L', 8) + b'\x00' * 8 + f = self.f = aifc.open (io.BytesIO(b)) + self.assertEqual(f.getcompname(), b'odd') + + def test_write_params_raises(self): + fout = self.fout = aifc.open(io.BytesIO(), 'wb') + wrong_params = (0, 0, 0, 0, b'WRNG', '') + self.assertRaises(aifc.Error, fout.setparams, wrong_params) + self.assertRaises(aifc.Error, fout.getparams) + self.assertRaises(aifc.Error, fout.setnchannels, 0) + self.assertRaises(aifc.Error, fout.getnchannels) + self.assertRaises(aifc.Error, fout.setsampwidth, 0) + self.assertRaises(aifc.Error, fout.getsampwidth) + self.assertRaises(aifc.Error, fout.setframerate, 0) + self.assertRaises(aifc.Error, fout.getframerate) + self.assertRaises(aifc.Error, fout.setcomptype, b'WRNG', '') + fout.aiff() + fout.setnchannels(1) + fout.setsampwidth(1) + fout.setframerate(1) + fout.setnframes(1) + fout.writeframes(b'\x00') + self.assertRaises(aifc.Error, fout.setparams, (1, 1, 1, 1, 1, 1)) + self.assertRaises(aifc.Error, fout.setnchannels, 1) + self.assertRaises(aifc.Error, fout.setsampwidth, 1) + self.assertRaises(aifc.Error, fout.setframerate, 1) + self.assertRaises(aifc.Error, fout.setnframes, 1) + self.assertRaises(aifc.Error, fout.setcomptype, b'NONE', '') + self.assertRaises(aifc.Error, fout.aiff) + self.assertRaises(aifc.Error, fout.aifc) + + def test_write_params_singles(self): + fout = self.fout = aifc.open(io.BytesIO(), 'wb') + fout.aifc() + fout.setnchannels(1) + fout.setsampwidth(2) + fout.setframerate(3) + fout.setnframes(4) + fout.setcomptype(b'NONE', b'name') + self.assertEqual(fout.getnchannels(), 1) + self.assertEqual(fout.getsampwidth(), 2) + self.assertEqual(fout.getframerate(), 3) + self.assertEqual(fout.getnframes(), 0) + self.assertEqual(fout.tell(), 0) + self.assertEqual(fout.getcomptype(), b'NONE') + self.assertEqual(fout.getcompname(), b'name') + fout.writeframes(b'\x00' * 4 * fout.getsampwidth() * fout.getnchannels()) + self.assertEqual(fout.getnframes(), 4) + self.assertEqual(fout.tell(), 4) + + def test_write_params_bunch(self): + fout = self.fout = aifc.open(io.BytesIO(), 'wb') + fout.aifc() + p = (1, 2, 3, 4, b'NONE', b'name') + fout.setparams(p) + self.assertEqual(fout.getparams(), p) + fout.initfp(None) + + def test_write_header_raises(self): + fout = self.fout = aifc.open(io.BytesIO(), 'wb') + self.assertRaises(aifc.Error, fout.close) + fout.setnchannels(1) + self.assertRaises(aifc.Error, fout.close) + fout.setsampwidth(1) + self.assertRaises(aifc.Error, fout.close) + fout.initfp(None) + + def test_write_header_comptype_raises(self): + for comptype in (b'ULAW', b'ulaw', b'ALAW', b'alaw', b'G722'): + fout = self.fout = aifc.open(io.BytesIO(), 'wb') + fout.setsampwidth(1) + fout.setcomptype(comptype, b'') + self.assertRaises(aifc.Error, fout.close) + fout.initfp(None) + + def test_write_header_comptype_sampwidth(self): + for comptype in (b'ULAW', b'ulaw', b'ALAW', b'alaw', b'G722'): + fout = self.fout = aifc.open(io.BytesIO(), 'wb') + fout.setnchannels(1) + fout.setframerate(1) + fout.setcomptype(comptype, b'') + fout.close() + self.assertEqual(fout.getsampwidth(), 2) + fout.initfp(None) + + def test_write_markers_raises(self): + fout = self.fout = aifc.open(io.BytesIO(), 'wb') + self.assertRaises(aifc.Error, fout.setmark, 0, 0, b'') + self.assertRaises(aifc.Error, fout.setmark, 1, -1, b'') + self.assertRaises(aifc.Error, fout.setmark, 1, 0, None) + self.assertRaises(aifc.Error, fout.getmark, 1) + fout.initfp(None) + + def test_write_markers_values(self): + fout = self.fout = aifc.open(io.BytesIO(), 'wb') + self.assertEqual(fout.getmarkers(), None) + fout.setmark(1, 0, b'foo1') + fout.setmark(1, 1, b'foo2') + self.assertEqual(fout.getmark(1), (1, 1, b'foo2')) + self.assertEqual(fout.getmarkers(), [(1, 1, b'foo2')]) + fout.initfp(None) + + def test_read_markers(self): + fout = self.fout = aifc.open(TESTFN, 'wb') + fout.aiff() + fout.setparams((1, 1, 1, 1, b'NONE', b'')) + fout.setmark(1, 0, b'odd') + fout.setmark(2, 0, b'even') + fout.writeframes(b'\x00') + fout.close() + f = self.f = aifc.open(TESTFN, 'rb') + self.assertEqual(f.getmarkers(), [(1, 0, b'odd'), (2, 0, b'even')]) + self.assertEqual(f.getmark(1), (1, 0, b'odd')) + self.assertEqual(f.getmark(2), (2, 0, b'even')) + self.assertRaises(aifc.Error, f.getmark, 3) + + def test_read_compressed_frames(self): + def check_frames(comptype, nframes, sampwidth): + fout = self.fout = aifc.open(TESTFN, 'wb') + fout.aifc() + fout.setparams((1, sampwidth, 1, nframes, comptype, b'')) + frames = b'\x00' * nframes * fout.getnchannels() * sampwidth + fout.writeframes(frames) + fout.close() + f = self.f = aifc.open(TESTFN, 'rb') + self.assertEqual(len(f.readframes(nframes)), len(frames)) + f.close() + for comptype in (b'ULAW', b'ulaw', b'ALAW', b'alaw'): + for nframes in (1, 2): + check_frames(comptype, nframes, 2) + for nframes in (2, 4): + check_frames(b'G722', nframes, 2) + + def test_write_aiff_by_extension(self): + sampwidth = 2 + fout = self.fout = aifc.open(TESTFN + '.aiff', 'wb') + fout.setparams((1, sampwidth, 1, 1, b'ULAW', b'')) + frames = b'\x00' * fout.getnchannels() * sampwidth + fout.writeframes(frames) + fout.close() + f = self.f = aifc.open(TESTFN + '.aiff', 'rb') + self.assertEqual(f.getcomptype(), b'NONE') + f.close() + + +class AIFCLowLevelTest(unittest.TestCase): + + def test_read_written(self): + def read_written(self, what): + f = io.BytesIO() + getattr(aifc, '_write_' + what)(f, x) + f.seek(0) + return getattr(aifc, '_read_' + what)(f) + for x in (-1, 0, 0.1, 1): + self.assertEqual(read_written(x, 'float'), x) + for x in (float('NaN'), float('Inf')): + self.assertEqual(read_written(x, 'float'), aifc._HUGE_VAL) + for x in (b'', b'foo'): + self.assertEqual(read_written(x, 'string'), x) + for x in (-0x7FFFFFFF, -1, 0, 1, 0x7FFFFFFF): + self.assertEqual(read_written(x, 'long'), x) + for x in (0, 1, 0xFFFFFFFF): + self.assertEqual(read_written(x, 'ulong'), x) + for x in (-0x7FFF, -1, 0, 1, 0x7FFF): + self.assertEqual(read_written(x, 'short'), x) + for x in (0, 1, 0xFFFF): + self.assertEqual(read_written(x, 'ushort'), x) + + def test_read_raises(self): + f = io.BytesIO(b'\x00') + self.assertRaises(EOFError, aifc._read_ulong, f) + self.assertRaises(EOFError, aifc._read_long, f) + self.assertRaises(EOFError, aifc._read_ushort, f) + self.assertRaises(EOFError, aifc._read_short, f) + + def test_write_long_string_raises(self): + f = io.BytesIO() + with self.assertRaises(ValueError): + aifc._write_string(f, b'too long' * 255) + def test_main(): run_unittest(AIFCTest) + run_unittest(AIFCLowLevelTest) if __name__ == "__main__": diff -r 27adb952813b Lib/test/test_audioop.py --- a/Lib/test/test_audioop.py Sun Nov 13 04:11:37 2011 +0100 +++ b/Lib/test/test_audioop.py Sun Nov 20 18:20:12 2011 +0400 @@ -195,10 +195,15 @@ self.assertRaises(audioop.error, audioop.lin2lin, data, size, size2) self.assertRaises(audioop.error, audioop.ratecv, data, size, 1, 1, 1, state) self.assertRaises(audioop.error, audioop.lin2ulaw, data, size) + self.assertRaises(audioop.error, audioop.lin2alaw, data, size) + self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state) + + def test_wrongsize(self): + data = 'abc' + state = None + for size in (-1, 3, 5): self.assertRaises(audioop.error, audioop.ulaw2lin, data, size) - self.assertRaises(audioop.error, audioop.lin2alaw, data, size) self.assertRaises(audioop.error, audioop.alaw2lin, data, size) - self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state) self.assertRaises(audioop.error, audioop.adpcm2lin, data, size, state) def test_main(): diff -r 27adb952813b Misc/ACKS --- a/Misc/ACKS Sun Nov 13 04:11:37 2011 +0100 +++ b/Misc/ACKS Sun Nov 20 18:20:12 2011 +0400 @@ -762,6 +762,7 @@ Michael Piotrowski Antoine Pitrou Jean-François Piéronne +Oleg Plakhotnyuk Remi Pointel Guilherme Polo Michael Pomraning diff -r 27adb952813b Modules/audioop.c --- a/Modules/audioop.c Sun Nov 13 04:11:37 2011 +0100 +++ b/Modules/audioop.c Sun Nov 20 18:20:12 2011 +0400 @@ -1311,7 +1311,7 @@ &cp, &len, &size) ) return 0; - if (!audioop_check_parameters(len, size)) + if (!audioop_check_parameters(len*size, size)) return NULL; if (len > PY_SSIZE_T_MAX/size) { @@ -1380,7 +1380,7 @@ &cp, &len, &size) ) return 0; - if (!audioop_check_parameters(len, size)) + if (!audioop_check_parameters(len*size, size)) return NULL; if (len > PY_SSIZE_T_MAX/size) { @@ -1524,7 +1524,7 @@ &cp, &len, &size, &state) ) return 0; - if (!audioop_check_parameters(len, size)) + if (!audioop_check_parameters(len*size, size)) return NULL; /* Decode state, should have (value, step) */