diff -r 253c31930b32 Lib/test/support/__init__.py --- a/Lib/test/support/__init__.py Sat Jul 26 00:58:55 2014 +0200 +++ b/Lib/test/support/__init__.py Wed Aug 06 17:25:12 2014 +0300 @@ -2071,6 +2071,10 @@ def skip_unless_symlink(test): """Skip decorator for tests that require functional symlink""" + if sys.platform.startswith("win"): + from .windows_helper import skip_unless_privilege + return skip_unless_privilege("SeCreateSymbolicLinkPrivilege", + restore_early=False)(test) ok = can_symlink() msg = "Requires functional symlink implementation" return test if ok else unittest.skip(msg)(test) diff -r 253c31930b32 Lib/test/support/windows_helper.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Lib/test/support/windows_helper.py Wed Aug 06 17:25:12 2014 +0300 @@ -0,0 +1,206 @@ +# Various Windows helpers for tests + +import ctypes +import unittest +from functools import wraps +from ctypes import wintypes +from contextlib import contextmanager +from types import SimpleNamespace + +__all__ = ( + 'skip_unless_privilege', + 'adjust_privilege', + 'acquired_privilege', +) + +class TOKEN_INFORMATION_CLASS: + TokenUser = 1 + TokenGroups = 2 + TokenPrivileges = 3 + +class LUID(ctypes.Structure): + _fields_ = [ + ('low_part', wintypes.DWORD), + ('high_part', wintypes.LONG), + ] + +class LUID_AND_ATTRIBUTES(ctypes.Structure): + _fields_ = [ + ('LUID', LUID), + ('attributes', wintypes.DWORD), + ] + + def enable(self): + self.attributes |= SE_PRIVILEGE_ENABLED + +class TOKEN_PRIVILEGES(ctypes.Structure): + _fields_ = [ + ('count', wintypes.DWORD), + ('privileges', LUID_AND_ATTRIBUTES * 0), + ] + + def get_array(self): + array_type = LUID_AND_ATTRIBUTES * self.count + privileges = ctypes.cast(self.privileges, + ctypes.POINTER(array_type)).contents + return privileges + +SE_PRIVILEGE_ENABLED_BY_DEFAULT = 0x00000001 +SE_PRIVILEGE_ENABLED = 0x00000002 +SE_PRIVILEGE_REMOVED = 0x00000004 +SE_PRIVILEGE_USED_FOR_ACCESS = 0x80000000 +TOKEN_ADJUST_PRIVILEGES = 0x20 +TOKEN_QUERY = 8 + +PTOKEN_PRIVILEGES = ctypes.POINTER(TOKEN_PRIVILEGES) +windll = ctypes.LibraryLoader(ctypes.WinDLL) + +LookupPrivilegeName = windll.advapi32.LookupPrivilegeNameW +LookupPrivilegeName.argtypes = ( + wintypes.LPWSTR, + ctypes.POINTER(LUID), + wintypes.LPWSTR, + ctypes.POINTER(wintypes.DWORD), +) +LookupPrivilegeName.restype = wintypes.BOOL + +LookupPrivilegeValue = windll.advapi32.LookupPrivilegeValueW +LookupPrivilegeValue.argtypes = ( + wintypes.LPWSTR, + wintypes.LPWSTR, + ctypes.POINTER(LUID), +) +LookupPrivilegeValue.restype = wintypes.BOOL + +GetTokenInformation = windll.advapi32.GetTokenInformation +GetTokenInformation.argtypes = ( + wintypes.HANDLE, + ctypes.c_uint, + ctypes.c_void_p, + wintypes.DWORD, + ctypes.POINTER(wintypes.DWORD), +) +GetTokenInformation.restype = wintypes.BOOL + +AdjustTokenPrivileges = windll.advapi32.AdjustTokenPrivileges +AdjustTokenPrivileges.restype = wintypes.BOOL +AdjustTokenPrivileges.argtypes = ( + wintypes.HANDLE, + wintypes.BOOL, + PTOKEN_PRIVILEGES, + wintypes.DWORD, + PTOKEN_PRIVILEGES, + ctypes.POINTER(wintypes.DWORD), +) + +OpenProcessToken = windll.advapi32.OpenProcessToken +OpenProcessToken.argtypes = ( + wintypes.HANDLE, wintypes.DWORD, + ctypes.POINTER(wintypes.HANDLE)) +OpenProcessToken.restype = wintypes.BOOL + +GetCurrentProcess = windll.kernel32.GetCurrentProcess +GetCurrentProcess.restype = wintypes.HANDLE + +def get_current_process_token(): + """ Get a token to the current process. """ + token = wintypes.HANDLE() + res = OpenProcessToken( + GetCurrentProcess(), + TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY, token) + if not res > 0: + raise RuntimeError("Couldn't get process token") + return token + +def get_luid(privilege): + luid = LUID() + + # system name, privilege name and the LUID structure + if not LookupPrivilegeValue(None, privilege, luid) > 0: + raise RuntimeError("Couldn't lookup privilege value") + return luid + +def adjust_privilege(privilege, old_token=None): + """ Enable the given Windows privilege. + + The privileges can be found in + http://msdn.microsoft.com/en-us/library/windows/desktop/bb530716%28v=vs.85%29.aspx. + The functions uses the text representation of the privileges, + e.g. SeBackupPrivilege, SeShutdownPrivilege etc. + Returns a namespace with attributes: + - result: the result of AdjustTokenPrivileges + - success: True if the privilege was acquired + - token_privileges: The token privileges used. They can be reused + to restore the old privileges the current process had. + """ + size = ctypes.sizeof(TOKEN_PRIVILEGES) + size += ctypes.sizeof(LUID_AND_ATTRIBUTES) + buffer = ctypes.create_string_buffer(size) + tp = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents + tp.count = 1 + + tp.get_array()[0].enable() + tp.get_array()[0].LUID = get_luid(privilege) + token = get_current_process_token() + + buffer_length = size if old_token else 0 + # current token handle, disable all privileges, new state, + # buffer length, previous state and return length + res = AdjustTokenPrivileges(token, False, tp, + buffer_length, old_token, + ctypes.c_ulong(buffer_length)) + return SimpleNamespace(result=res, token_privileges=tp, + success=not windll.kernel32.GetLastError()) + +def skip_unless_privilege(privilege, restore_early=True): + """ Raise an unittest.SkipTest exception if the given + privilege can't be acquired. The privileges supported + are the same privileges supported by `adjust_privilege`. + + :param privilege: The privilege which should be acquired. + :param restore_early: + Restore the privilege before running the function. + Otherwise, the privlege will be restored after. + """ + + def decorator(func): + @wraps(func) + def wrapper(*args, **kw): + result = adjust_privilege(privilege) + if not result.success: + raise unittest.SkipTest( + "Privilege {} wasn't acquired.".format(privilege)) + + # disable the privilege. + if restore_early: + adjust_privilege(privilege, old_token=result.token_privileges) + try: + return func(*args, **kw) + finally: + if not restore_early: + adjust_privilege(privilege, + old_token=result.token_privileges) + return wrapper + return decorator + +@contextmanager +def acquired_privilege(privilege): + """ Context manager for acquiring and releasing + a privilege for the current process using the `with` statement. + If the privilege can't be acquired, a RuntimeError will be + raised. + + :: + with acquired_privilege('SeBackupPrivilege'): + winreg.SaveKey(key, "file") + # no longer has this privilege + """ + result = adjust_privilege(privilege) + if not result.success: + raise RuntimeError("Could not acquire privilege {!r}" + .format(privilege)) + + try: + yield + finally: + adjust_privilege(privilege, old_token=result.token_privileges)