diff -r 4aab4e0cd759 Lib/test/support/windows_helper.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Lib/test/support/windows_helper.py Sun Apr 19 23:21:25 2015 +0300 @@ -0,0 +1,208 @@ +# Various Windows helpers for tests + +import contextlib +import ctypes +from ctypes import wintypes +import functools +import types +import unittest + + +__all__ = ( + 'skip_unless_privilege', + 'adjust_privilege', + 'acquired_privilege', +) + + +class TOKEN_INFORMATION_CLASS: + TokenUser = 1 + TokenGroups = 2 + TokenPrivileges = 3 + + +class LUID(ctypes.Structure): + _fields_ = [ + ('low_part', wintypes.DWORD), + ('high_part', wintypes.LONG), + ] + + +class LUID_AND_ATTRIBUTES(ctypes.Structure): + _fields_ = [ + ('LUID', LUID), + ('attributes', wintypes.DWORD), + ] + + def enable(self): + self.attributes |= SE_PRIVILEGE_ENABLED + + +class TOKEN_PRIVILEGES(ctypes.Structure): + _fields_ = [ + ('count', wintypes.DWORD), + ('privileges', LUID_AND_ATTRIBUTES * 0), + ] + + def get_array(self): + array_type = LUID_AND_ATTRIBUTES * self.count + privileges = ctypes.cast(self.privileges, + ctypes.POINTER(array_type)).contents + return privileges + + +SE_PRIVILEGE_ENABLED_BY_DEFAULT = 0x00000001 +SE_PRIVILEGE_ENABLED = 0x00000002 +SE_PRIVILEGE_REMOVED = 0x00000004 +SE_PRIVILEGE_USED_FOR_ACCESS = 0x80000000 +TOKEN_ADJUST_PRIVILEGES = 0x20 +TOKEN_QUERY = 8 + +PTOKEN_PRIVILEGES = ctypes.POINTER(TOKEN_PRIVILEGES) +windll = ctypes.LibraryLoader(ctypes.WinDLL) + +LookupPrivilegeName = windll.advapi32.LookupPrivilegeNameW +LookupPrivilegeName.argtypes = ( + wintypes.LPWSTR, + ctypes.POINTER(LUID), + wintypes.LPWSTR, + ctypes.POINTER(wintypes.DWORD), +) +LookupPrivilegeName.restype = wintypes.BOOL + +LookupPrivilegeValue = windll.advapi32.LookupPrivilegeValueW +LookupPrivilegeValue.argtypes = ( + wintypes.LPWSTR, + wintypes.LPWSTR, + ctypes.POINTER(LUID), +) +LookupPrivilegeValue.restype = wintypes.BOOL + +GetTokenInformation = windll.advapi32.GetTokenInformation +GetTokenInformation.argtypes = ( + wintypes.HANDLE, + ctypes.c_uint, + ctypes.c_void_p, + wintypes.DWORD, + ctypes.POINTER(wintypes.DWORD), +) +GetTokenInformation.restype = wintypes.BOOL + +AdjustTokenPrivileges = windll.advapi32.AdjustTokenPrivileges +AdjustTokenPrivileges.restype = wintypes.BOOL +AdjustTokenPrivileges.argtypes = ( + wintypes.HANDLE, + wintypes.BOOL, + PTOKEN_PRIVILEGES, + wintypes.DWORD, + PTOKEN_PRIVILEGES, + ctypes.POINTER(wintypes.DWORD), +) + +OpenProcessToken = windll.advapi32.OpenProcessToken +OpenProcessToken.argtypes = ( + wintypes.HANDLE, + wintypes.DWORD, + ctypes.POINTER(wintypes.HANDLE)) +OpenProcessToken.restype = wintypes.BOOL + +GetCurrentProcess = windll.kernel32.GetCurrentProcess +GetCurrentProcess.restype = wintypes.HANDLE + + +def get_current_process_token(): + """Get a token to the current process.""" + token = wintypes.HANDLE() + res = OpenProcessToken( + GetCurrentProcess(), + TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY, token) + if not res > 0: + raise RuntimeError("Couldn't get process token") + return token + + +def get_luid(privilege): + luid = LUID() + + # system name, privilege name and the LUID structure + if not LookupPrivilegeValue(None, privilege, luid) > 0: + raise RuntimeError("Couldn't lookup privilege value") + return luid + + +def adjust_privilege(privilege, old_token=None): + """Enable the given Windows privilege. + + The privileges can be found in + http://msdn.microsoft.com/en-us/library/windows/desktop/bb530716%28v=vs.85%29.aspx. + The functions uses the text representation of the privileges, + e.g. SeBackupPrivilege, SeShutdownPrivilege etc. + Returns a namespace with attributes: + - success: True if the privilege was acquired + - token_privileges: The token privileges used. They can be reused + to restore the old privileges the current process had. + """ + size = ctypes.sizeof(TOKEN_PRIVILEGES) + size += ctypes.sizeof(LUID_AND_ATTRIBUTES) + buffer = ctypes.create_string_buffer(size) + tp = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents + tp.count = 1 + + tp.get_array()[0].enable() + tp.get_array()[0].LUID = get_luid(privilege) + token = get_current_process_token() + + buffer_length = size if old_token else 0 + # current token handle, disable all privileges, new state, + # buffer length, previous state and return length + AdjustTokenPrivileges(token, False, tp, + buffer_length, old_token, + ctypes.c_ulong(buffer_length)) + return types.SimpleNamespace(token_privileges=tp, + success=not windll.kernel32.GetLastError()) + + +def skip_unless_privilege(privilege, restore_early=True): + """Skip if the given privilege can't be acquired. + + The privileges supported are the same privileges + supported by `adjust_privilege`. + """ + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kw): + result = adjust_privilege(privilege) + if not result.success: + raise unittest.SkipTest( + "Privilege {} wasn't acquired.".format(privilege)) + + # disable the privilege. + if restore_early: + adjust_privilege(privilege, old_token=result.token_privileges) + try: + return func(*args, **kw) + finally: + if not restore_early: + adjust_privilege(privilege, + old_token=result.token_privileges) + return wrapper + return decorator + + +@contextlib.contextmanager +def acquired_privilege(privilege): + """Try to acquire the given privilege for the current process. + + If the privilege can't be acquired, a RuntimeError will be + raised. + """ + result = adjust_privilege(privilege) + if not result.success: + raise RuntimeError("Could not acquire privilege {!r}" + .format(privilege)) + + try: + yield + finally: + adjust_privilege(privilege, old_token=result.token_privileges)