# ctypes helpers (Windows only) for impersonating the caller's own security
# context on the current thread and for changing the integrity level of the
# resulting impersonation token.

import enum
import ctypes
import contextlib
from ctypes import wintypes

# use_last_error=True makes ctypes capture GetLastError() after every call so
# it can be read back with ctypes.get_last_error().
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
advapi32 = ctypes.WinDLL('advapi32', use_last_error=True)

ERROR_NO_TOKEN = 0x03F0

# Generic / System Rights
GENERIC_READ = 0x80000000
GENERIC_WRITE = 0x40000000
GENERIC_EXECUTE = 0x20000000
GENERIC_ALL = 0x10000000
MAXIMUM_ALLOWED = 0x02000000
ACCESS_SYSTEM_SECURITY = 0x01000000

# Standard Rights
DELETE = 0x00010000
READ_CONTROL = 0x00020000
WRITE_DAC = 0x00040000
WRITE_OWNER = 0x00080000
SYNCHRONIZE = 0x00100000
STANDARD_RIGHTS_READ = READ_CONTROL
STANDARD_RIGHTS_WRITE = READ_CONTROL
STANDARD_RIGHTS_EXECUTE = READ_CONTROL
STANDARD_RIGHTS_REQUIRED = DELETE | READ_CONTROL | WRITE_DAC | WRITE_OWNER
STANDARD_RIGHTS_ALL = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE

# Token Rights
TOKEN_ASSIGN_PRIMARY = 0x0001
TOKEN_DUPLICATE = 0x0002
TOKEN_IMPERSONATE = 0x0004
TOKEN_QUERY = 0x0008
TOKEN_QUERY_SOURCE = 0x0010
TOKEN_ADJUST_PRIVILEGES = 0x0020
TOKEN_ADJUST_GROUPS = 0x0040
TOKEN_ADJUST_DEFAULT = 0x0080
TOKEN_ADJUST_SESSIONID = 0x0100
TOKEN_ALL_ACCESS = (
    STANDARD_RIGHTS_REQUIRED
    | TOKEN_ASSIGN_PRIMARY
    | TOKEN_DUPLICATE
    | TOKEN_IMPERSONATE
    | TOKEN_QUERY
    | TOKEN_QUERY_SOURCE
    | TOKEN_ADJUST_PRIVILEGES
    | TOKEN_ADJUST_GROUPS
    | TOKEN_ADJUST_DEFAULT
    | TOKEN_ADJUST_SESSIONID
)

# Group Attributes
SE_GROUP_MANDATORY = 0x00000001
SE_GROUP_ENABLED_BY_DEFAULT = 0x00000002
SE_GROUP_ENABLED = 0x00000004
SE_GROUP_OWNER = 0x00000008
SE_GROUP_USE_FOR_DENY_ONLY = 0x00000010
SE_GROUP_INTEGRITY = 0x00000020
SE_GROUP_INTEGRITY_ENABLED = 0x00000040
SE_GROUP_RESOURCE = 0x20000000
SE_GROUP_LOGON_ID = 0xC0000000


class c_enum(enum.IntEnum):
    """IntEnum base class that implements the ctypes ``from_param`` hook."""

    @classmethod
    def from_param(cls, obj):
        """Look *obj* up as a member of *cls* and wrap it in a ``c_int``.

        ``cls(obj)`` raises ValueError when *obj* is not a member value.
        """
        return ctypes.c_int(cls(obj))


class SECURITY_IMPERSONATION_LEVEL(c_enum):
    SecurityAnonymous = 0
    SecurityIdentification = 1
    SecurityImpersonation = 2
    SecurityDelegation = 3


# Expose every member as a module-level name (e.g. ``SecurityImpersonation``).
globals().update(SECURITY_IMPERSONATION_LEVEL.__members__)


class TOKEN_INFORMATION_CLASS(c_enum):
    TokenUser = 1
    TokenGroups = 2
    TokenPrivileges = 3
    TokenOwner = 4
    TokenPrimaryGroup = 5
    TokenDefaultDacl = 6
    TokenSource = 7
    TokenType = 8
    TokenImpersonationLevel = 9
    TokenSessionId = 12
    TokenElevationType = 18
    TokenLinkedToken = 19
    TokenElevation = 20
    TokenIntegrityLevel = 25
    TokenUIAccess = 26
    TokenMandatoryPolicy = 27
    TokenLogonSid = 28


# Expose every member as a module-level name (e.g. ``TokenIntegrityLevel``).
globals().update(TOKEN_INFORMATION_CLASS.__members__)


class TOKEN_TYPE(c_enum):
    TokenPrimary = 1
    TokenImpersonation = 2


# Expose every member as a module-level name (e.g. ``TokenImpersonation``).
globals().update(TOKEN_TYPE.__members__)


class WELL_KNOWN_SID_TYPE(c_enum):
    WinCreatorOwnerSid = 3
    WinAnonymousSid = 13
    WinAuthenticatedUserSid = 17
    WinLocalSystemSid = 22
    WinLocalServiceSid = 23
    WinNetworkServiceSid = 24
    WinBuiltinAdministratorsSid = 26
    WinBuiltinUsersSid = 27
    WinBuiltinGuestsSid = 28
    WinUntrustedLabelSid = 65
    WinLowLabelSid = 66
    WinMediumLabelSid = 67
    WinHighLabelSid = 68
    WinSystemLabelSid = 69
    WinCreatorOwnerRightsSid = 71
    WinMediumPlusLabelSid = 79


# Expose every member as a module-level name (e.g. ``WinLowLabelSid``).
globals().update(WELL_KNOWN_SID_TYPE.__members__)

# SIDs are handled as raw byte buffers, so a PSID is a pointer to char.
PSID = ctypes.POINTER(ctypes.c_char)


class SID_AND_ATTRIBUTES(ctypes.Structure):
    """A SID pointer paired with a DWORD of attribute flags."""

    _fields_ = (
        ('Sid', PSID),
        ('Attributes', wintypes.DWORD),
    )


class TOKEN_MANDATORY_LABEL(ctypes.Structure):
    """Structure passed to SetTokenInformation for TokenIntegrityLevel."""

    _fields_ = (
        ('Label', SID_AND_ATTRIBUTES),
    )


class HANDLE(ctypes.c_void_p):
    """Pointer-sized handle value that remembers whether it was closed."""

    # Class-level default; close() shadows it with an instance attribute.
    closed = False

    def close(self):
        """Call CloseHandle on this handle once; later calls do nothing.

        The return value of CloseHandle is not checked.
        """
        if self.closed:
            return
        kernel32.CloseHandle(self)
        self.closed = True

    # NOTE(review): defining __eq__ without __hash__ leaves instances
    # unhashable on Python 3.
    def __eq__(self, other):
        # Accept another ctypes object (compared through its .value) or a
        # plain Python value.
        own_value = self.value
        other_value = getattr(other, 'value', other)
        return own_value == other_value

    def __repr__(self):
        return '%s(%s)' % (type(self).__name__, self.value)


# Have these two calls hand back HANDLE instances instead of plain ints.
kernel32.GetCurrentProcess.restype = HANDLE
kernel32.GetCurrentThread.restype = HANDLE


def create_well_known_sid(well_known_sid_type, domain_sid=None):
    """Build a well-known SID and return the buffer that contains it.

    Args:
        well_known_sid_type: value identifying the SID to create, e.g. a
            WELL_KNOWN_SID_TYPE member.
        domain_sid: optional domain SID forwarded unchanged to
            CreateWellKnownSid; None is passed as NULL.

    Returns:
        A ctypes ``c_char`` array holding the SID bytes.

    Raises:
        OSError: built by ctypes.WinError from the saved last-error value
            when no size was reported or the SID could not be created.
    """
    size = wintypes.DWORD()
    # Size probe: no output buffer is supplied, and the result of this call
    # is deliberately ignored; only the byte count written to `size` is used.
    advapi32.CreateWellKnownSid(
        well_known_sid_type, domain_sid, None, ctypes.byref(size))
    if size.value:
        sid_buffer = (ctypes.c_char * size.value)()
        created = advapi32.CreateWellKnownSid(
            well_known_sid_type, domain_sid, sid_buffer, ctypes.byref(size))
        if created:
            return sid_buffer
    raise ctypes.WinError(ctypes.get_last_error())


@contextlib.contextmanager
def impersonate_self(impersonation_level):
    """Context manager: impersonate a duplicate of the current token.

    The thread's token is opened; if the thread has none (ERROR_NO_TOKEN)
    the process token is opened instead. That token is duplicated as an
    impersonation token at *impersonation_level* and assigned to the current
    thread. On exit the thread gets its previous token back (or none at all
    when the process token was the source) and both handles are closed.

    Args:
        impersonation_level: level passed to DuplicateTokenEx, e.g. a
            SECURITY_IMPERSONATION_LEVEL member.

    Yields:
        HANDLE of the duplicated token now assigned to the thread.

    Raises:
        OSError: built by ctypes.WinError when a token cannot be opened,
            duplicated or assigned, or when restoring the previous token
            fails on exit.
    """
    source_token = HANDLE()
    thread_token = HANDLE()
    impersonating = False
    revert_to_process = False
    try:
        opened = advapi32.OpenThreadToken(
            kernel32.GetCurrentThread(), TOKEN_ALL_ACCESS, True,
            ctypes.byref(source_token))
        if not opened and ctypes.get_last_error() == ERROR_NO_TOKEN:
            # The thread is not impersonating: start from the process token
            # and remember to clear the thread token on the way out.
            revert_to_process = True
            advapi32.OpenProcessToken(
                kernel32.GetCurrentProcess(), TOKEN_ALL_ACCESS,
                ctypes.byref(source_token))

        # Each step raises immediately so the saved last-error value still
        # belongs to the call that failed.
        if not source_token:
            raise ctypes.WinError(ctypes.get_last_error())
        if not advapi32.DuplicateTokenEx(
                source_token, TOKEN_ALL_ACCESS, None, impersonation_level,
                TokenImpersonation, ctypes.byref(thread_token)):
            raise ctypes.WinError(ctypes.get_last_error())
        if not advapi32.SetThreadToken(None, thread_token):
            raise ctypes.WinError(ctypes.get_last_error())

        impersonating = True
        yield thread_token
    finally:
        try:
            if impersonating:
                # None removes the thread token; otherwise restore the token
                # the thread had before.
                previous_token = None if revert_to_process else source_token
                if not advapi32.SetThreadToken(None, previous_token):
                    raise ctypes.WinError(ctypes.get_last_error())
        finally:
            # Close whichever handles were actually opened, new token first.
            for handle in (thread_token, source_token):
                if handle:
                    handle.close()


# Integrity-level name -> SID buffer, created once when the module is loaded.
integrity_level_sid = {
    'untrusted': create_well_known_sid(WinUntrustedLabelSid),
    'low': create_well_known_sid(WinLowLabelSid),
    'medium': create_well_known_sid(WinMediumLabelSid),
    'high': create_well_known_sid(WinHighLabelSid),
    'system': create_well_known_sid(WinSystemLabelSid),
}


@contextlib.contextmanager
def token_integrity_level(label):
    """Context manager: impersonate self with the given integrity level.

    Args:
        label: key of ``integrity_level_sid`` -- 'untrusted', 'low',
            'medium', 'high' or 'system'.

    Yields:
        None. The body runs while the current thread carries the modified
        impersonation token; leaving the block undoes the impersonation.

    Raises:
        KeyError: *label* is not a key of ``integrity_level_sid``.
        OSError: built by ctypes.WinError when impersonation or
            SetTokenInformation fails.
    """
    with impersonate_self(SecurityImpersonation) as token:
        mandatory_label = TOKEN_MANDATORY_LABEL()
        mandatory_label.Label.Attributes = SE_GROUP_INTEGRITY
        mandatory_label.Label.Sid = integrity_level_sid[label]
        applied = advapi32.SetTokenInformation(
            token, TokenIntegrityLevel,
            ctypes.byref(mandatory_label), ctypes.sizeof(mandatory_label))
        if not applied:
            raise ctypes.WinError(ctypes.get_last_error())
        yield