Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(34)

Unified Diff: Lib/test/test_socket.py

Issue 27744: Add AF_ALG (Linux Kernel crypto) to socket module
Patch Set: Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
index 7fce13e175d1655650934084b0ba05d55bc88606..f4cd20052a916d51c006722fd29911cc0eb42bfc 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -21,6 +21,7 @@ import pickle
import struct
import random
import string
+from binascii import hexlify, unhexlify
try:
import multiprocessing
except ImportError:
@@ -5325,6 +5326,169 @@ class SendfileUsingSendfileTest(SendfileUsingSendTest):
def meth_from_sock(self, sock):
return getattr(sock, "_sendfile_use_sendfile")
+@unittest.skipUnless(hasattr(socket, "AF_ALG"), 'AF_ALG required')
+class LinuxKernelCryptoAPI(unittest.TestCase):
+ # tests for AF_ALG
+ def create_alg(self, typ, name):
+ sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
+ sock.bind((typ, name))
+ return sock
+
+ def test_sha256(self):
+ expected = b"ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
+ with self.create_alg('hash', 'sha256') as algo:
+ op, _ = algo.accept()
+ with op:
+ op.sendall(b"abc")
+ self.assertEqual(hexlify(op.recv(512)), expected)
Martin Panter 2016/09/04 16:01:03 Python 3.5 has a bytes.hex() method. Perhaps you c
christian.heimes 2016/09/04 16:58:44 Nice, I still have to write polyglot code and wasn
+
+ op, _ = algo.accept()
+ with op:
+ op.send(b'a', socket.MSG_MORE)
+ op.send(b'b', socket.MSG_MORE)
+ op.send(b'c', socket.MSG_MORE)
+ op.send(b'')
+ self.assertEqual(hexlify(op.recv(512)), expected)
+
+ def test_hmac_sha1(self):
+ expected = b"effcdf6ae5eb2fa2d27416d5f184df9c259a7c79"
+ with self.create_alg('hash', 'hmac(sha1)') as algo:
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
+ op, _ = algo.accept()
+ with op:
+ op.sendall(b"what do ya want for nothing?")
+ self.assertEqual(hexlify(op.recv(512)), expected)
+
+ def test_aes_cbc(self):
+ key = unhexlify('06a9214036b8a15b512e03d534120006')
Martin Panter 2016/09/04 16:01:03 Use bytes.fromhex(); it saves an import :)
+ iv = unhexlify('3dafba429d9eb430b422da802c9fac41')
+ msg = b"Single block msg"
+ ciphertext = unhexlify('e353779c1079aeb82708942dbe77181a')
+ msglen = len(msg)
+ with self.create_alg('skcipher', 'cbc(aes)') as algo:
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
+ flags=socket.MSG_MORE)
+ op.sendall(msg)
+ self.assertEqual(op.recv(msglen), ciphertext)
+
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg([ciphertext],
+ op=socket.ALG_OP_DECRYPT, iv=iv)
+ self.assertEqual(op.recv(msglen), msg)
+
+ # long message
+ multiplier = 1024
+ longmsg = [msg] * multiplier
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg(longmsg,
+ op=socket.ALG_OP_ENCRYPT, iv=iv)
+ enc = op.recv(msglen * multiplier)
+ self.assertEqual(len(enc), msglen * multiplier)
+ self.assertTrue(enc[:msglen], ciphertext)
+
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg([enc],
+ op=socket.ALG_OP_DECRYPT, iv=iv)
+ dec = op.recv(msglen * multiplier)
+ self.assertEqual(len(dec), msglen * multiplier)
+ self.assertEqual(dec, msg * multiplier)
+
+ @support.requires_linux_version(3, 19)
+ def test_aead_aes_gcm(self):
+ key = unhexlify('c939cc13397c1d37de6ae0e1cb7c423c')
+ iv = unhexlify('b3d8cc017cbb89b39e0f67e2')
+ plain = unhexlify('c3b3c41f113a31b73d9a5cd432103069')
+ assoc = unhexlify('24825602bd12a984e0092d3e448eda5f')
+ expected_ct = unhexlify('93fe7d9e9bfd10348a5606e5cafa7354')
+ expected_tag = unhexlify('0032a1dc85f1c9786925a2e71d8272dd')
+
+ taglen = len(expected_tag)
+ assoclen = len(assoc)
+
+ with self.create_alg('aead', 'gcm(aes)') as algo:
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
+ None, taglen)
+
+ # send assoc, plain and tag buffer in separate steps
+ op, _ = algo.accept()
+ with op:
+ op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
+ assoclen=assoclen, flags=socket.MSG_MORE)
+ op.sendall(assoc, socket.MSG_MORE)
+ op.sendall(plain, socket.MSG_MORE)
+ op.sendall(b'\x00' * taglen)
+ res = op.recv(assoclen + len(plain) + taglen)
+ self.assertEqual(expected_ct, res[assoclen:-taglen])
+ self.assertEqual(expected_tag, res[-taglen:])
+
+ # now with msg
+ op, _ = algo.accept()
+ with op:
+ msg = assoc + plain + b'\x00' * taglen
+ op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
+ assoclen=assoclen)
+ res = op.recv(assoclen + len(plain) + taglen)
+ self.assertEqual(expected_ct, res[assoclen:-taglen])
+ self.assertEqual(expected_tag, res[-taglen:])
+
+ # create anc data manually
+ pack_uint32 = struct.Struct('I').pack
+ op, _ = algo.accept()
+ with op:
+ msg = assoc + plain + b'\x00' * taglen
+ op.sendmsg(
+ [msg],
+ ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
+ [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
+ [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
+ )
+ )
+ res = op.recv(len(msg))
+ self.assertEqual(expected_ct, res[assoclen:-taglen])
+ self.assertEqual(expected_tag, res[-taglen:])
+
+ # decrypt and verify
+ op, _ = algo.accept()
+ with op:
+ msg = assoc + expected_ct + expected_tag
+ op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
+ assoclen=assoclen)
+ res = op.recv(len(msg))
+ self.assertEqual(plain, res[assoclen:-taglen])
+
+ def test_drbg_pr_sha256(self):
+ # deterministic random bit generator, prediction resistance, sha256
+ with self.create_alg('rng', 'drbg_pr_sha256') as algo:
+ extra_seed = os.urandom(32)
+ algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
+ op, _ = algo.accept()
+ with op:
+ rn = op.recv(32)
+ self.assertEqual(len(rn), 32)
+
+ def test_sendmsg_afalg_args(self):
+ sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg()
+
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(op=None)
+
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(1)
+
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)
+
+ with self.assertRaises(TypeError):
+ sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)
def test_main():
tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
@@ -5352,6 +5516,7 @@ def test_main():
tests.extend([TIPCTest, TIPCThreadableTest])
tests.extend([BasicCANTest, CANTest])
tests.extend([BasicRDSTest, RDSTest])
+ tests.append(LinuxKernelCryptoAPI)
tests.extend([
CmsgMacroTests,
SendmsgUDPTest,

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+