Index: Lib/test/test_sysconfig.py =================================================================== --- Lib/test/test_sysconfig.py (revision 83553) +++ Lib/test/test_sysconfig.py (working copy) @@ -12,7 +12,7 @@ from copy import copy, deepcopy from test.support import (run_unittest, TESTFN, unlink, get_attribute, - captured_stdout, skip_unless_symlink) + captured_stdout) import sysconfig from sysconfig import (get_paths, get_platform, get_config_vars, @@ -239,7 +239,8 @@ 'posix_home', 'posix_prefix', 'posix_user') self.assertEquals(get_scheme_names(), wanted) - @skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") def test_symlink(self): # On Windows, the EXE needs to know where pythonXY.dll is at so we have # to add the directory to the path. Index: Lib/test/test_shutil.py =================================================================== --- Lib/test/test_shutil.py (revision 83553) +++ Lib/test/test_shutil.py (working copy) @@ -271,7 +271,8 @@ shutil.rmtree(src_dir) shutil.rmtree(os.path.dirname(dst_dir)) - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") def test_dont_copy_file_onto_link_to_itself(self): # bug 851123. os.mkdir(TESTFN) @@ -301,7 +302,8 @@ except OSError: pass - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") def test_rmtree_on_symlink(self): # bug 1669. os.mkdir(TESTFN) @@ -326,26 +328,27 @@ finally: os.remove(TESTFN) - @unittest.skipUnless(hasattr(os, 'mkfifo'), 'requires os.mkfifo') - def test_copytree_named_pipe(self): - os.mkdir(TESTFN) - try: - subdir = os.path.join(TESTFN, "subdir") - os.mkdir(subdir) - pipe = os.path.join(subdir, "mypipe") - os.mkfifo(pipe) + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") + def test_copytree_named_pipe(self): + os.mkdir(TESTFN) try: - shutil.copytree(TESTFN, TESTFN2) - except shutil.Error as e: - errors = e.args[0] - self.assertEqual(len(errors), 1) - src, dst, error_msg = errors[0] - self.assertEqual("`%s` is a named pipe" % pipe, error_msg) - else: - self.fail("shutil.Error should have been raised") - finally: - shutil.rmtree(TESTFN, ignore_errors=True) - shutil.rmtree(TESTFN2, ignore_errors=True) + subdir = os.path.join(TESTFN, "subdir") + os.mkdir(subdir) + pipe = os.path.join(subdir, "mypipe") + os.mkfifo(pipe) + try: + shutil.copytree(TESTFN, TESTFN2) + except shutil.Error as e: + errors = e.args[0] + self.assertEqual(len(errors), 1) + src, dst, error_msg = errors[0] + self.assertEqual("`%s` is a named pipe" % pipe, error_msg) + else: + self.fail("shutil.Error should have been raised") + finally: + shutil.rmtree(TESTFN, ignore_errors=True) + shutil.rmtree(TESTFN2, ignore_errors=True) def test_copytree_special_func(self): @@ -362,7 +365,8 @@ shutil.copytree(src_dir, dst_dir, copy_function=_copy) self.assertEquals(len(copied), 2) - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") def test_copytree_dangling_symlinks(self): # a dangling symlink raises an error at the end Index: Lib/test/symlink_support.py =================================================================== --- Lib/test/symlink_support.py (revision 83553) +++ Lib/test/symlink_support.py (working copy) @@ -1,206 +0,0 @@ -""" -A module built to test if the current process has the privilege to -create symlinks on Windows. -""" - -# allow script to run natively under python 2.6+ -from __future__ import print_function - -import ctypes -from ctypes import wintypes - -GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess -GetCurrentProcess.restype = wintypes.HANDLE -OpenProcessToken = ctypes.windll.advapi32.OpenProcessToken -OpenProcessToken.argtypes = (wintypes.HANDLE, wintypes.DWORD, - ctypes.POINTER(wintypes.HANDLE)) -OpenProcessToken.restype = wintypes.BOOL - -class LUID(ctypes.Structure): - _fields_ = [ - ('low_part', wintypes.DWORD), - ('high_part', wintypes.LONG), - ] - - def __eq__(self, other): - return ( - self.high_part == other.high_part and - self.low_part == other.low_part - ) - - def __ne__(self, other): - return not (self==other) - -LookupPrivilegeValue = ctypes.windll.advapi32.LookupPrivilegeValueW -LookupPrivilegeValue.argtypes = ( - wintypes.LPWSTR, # system name - wintypes.LPWSTR, # name - ctypes.POINTER(LUID), - ) -LookupPrivilegeValue.restype = wintypes.BOOL - -class TOKEN_INFORMATION_CLASS: - TokenUser = 1 - TokenGroups = 2 - TokenPrivileges = 3 - # ... see http://msdn.microsoft.com/en-us/library/aa379626%28VS.85%29.aspx - -SE_PRIVILEGE_ENABLED_BY_DEFAULT = (0x00000001) -SE_PRIVILEGE_ENABLED = (0x00000002) -SE_PRIVILEGE_REMOVED = (0x00000004) -SE_PRIVILEGE_USED_FOR_ACCESS = (0x80000000) - -class LUID_AND_ATTRIBUTES(ctypes.Structure): - _fields_ = [ - ('LUID', LUID), - ('attributes', wintypes.DWORD), - ] - - def is_enabled(self): - return bool(self.attributes & SE_PRIVILEGE_ENABLED) - - def enable(self): - self.attributes |= SE_PRIVILEGE_ENABLED - - def get_name(self): - size = wintypes.DWORD(10240) - buf = ctypes.create_unicode_buffer(size.value) - res = LookupPrivilegeName(None, self.LUID, buf, size) - if res == 0: - raise RuntimeError - return buf[:size.value] - - def __str__(self): - name = self.name - fmt = ['{name}', '{name} (enabled)'][self.is_enabled()] - return fmt.format(**vars()) - -LookupPrivilegeName = ctypes.windll.advapi32.LookupPrivilegeNameW -LookupPrivilegeName.argtypes = ( - wintypes.LPWSTR, # lpSystemName - ctypes.POINTER(LUID), # lpLuid - wintypes.LPWSTR, # lpName - ctypes.POINTER(wintypes.DWORD), #cchName - ) -LookupPrivilegeName.restype = wintypes.BOOL - -class TOKEN_PRIVILEGES(ctypes.Structure): - _fields_ = [ - ('count', wintypes.DWORD), - ('privileges', LUID_AND_ATTRIBUTES*0), - ] - - def get_array(self): - array_type = LUID_AND_ATTRIBUTES*self.count - privileges = ctypes.cast(self.privileges, - ctypes.POINTER(array_type)).contents - return privileges - - def __iter__(self): - return iter(self.get_array()) - -PTOKEN_PRIVILEGES = ctypes.POINTER(TOKEN_PRIVILEGES) - -GetTokenInformation = ctypes.windll.advapi32.GetTokenInformation -GetTokenInformation.argtypes = [ - wintypes.HANDLE, # TokenHandle - ctypes.c_uint, # TOKEN_INFORMATION_CLASS value - ctypes.c_void_p, # TokenInformation - wintypes.DWORD, # TokenInformationLength - ctypes.POINTER(wintypes.DWORD), # ReturnLength - ] -GetTokenInformation.restype = wintypes.BOOL - -# http://msdn.microsoft.com/en-us/library/aa375202%28VS.85%29.aspx -AdjustTokenPrivileges = ctypes.windll.advapi32.AdjustTokenPrivileges -AdjustTokenPrivileges.restype = wintypes.BOOL -AdjustTokenPrivileges.argtypes = [ - wintypes.HANDLE, # TokenHandle - wintypes.BOOL, # DisableAllPrivileges - PTOKEN_PRIVILEGES, # NewState (optional) - wintypes.DWORD, # BufferLength of PreviousState - PTOKEN_PRIVILEGES, # PreviousState (out, optional) - ctypes.POINTER(wintypes.DWORD), # ReturnLength - ] - -def get_process_token(): - "Get the current process token" - token = wintypes.HANDLE() - TOKEN_ALL_ACCESS = 0xf01ff - res = OpenProcessToken(GetCurrentProcess(), TOKEN_ALL_ACCESS, token) - if not res > 0: - raise RuntimeError("Couldn't get process token") - return token - -def get_symlink_luid(): - "Get the LUID for the SeCreateSymbolicLinkPrivilege" - symlink_luid = LUID() - res = LookupPrivilegeValue(None, "SeCreateSymbolicLinkPrivilege", - symlink_luid) - if not res > 0: - raise RuntimeError("Couldn't lookup privilege value") - return symlink_luid - -def get_privilege_information(): - "Get all privileges associated with the current process." - # first call with zero length to determine what size buffer we need - - return_length = wintypes.DWORD() - params = [ - get_process_token(), - TOKEN_INFORMATION_CLASS.TokenPrivileges, - None, - 0, - return_length, - ] - - res = GetTokenInformation(*params) - - # assume we now have the necessary length in return_length - - buffer = ctypes.create_string_buffer(return_length.value) - params[2] = buffer - params[3] = return_length.value - - res = GetTokenInformation(*params) - assert res > 0, "Error in second GetTokenInformation (%d)" % res - - privileges = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents - return privileges - -def report_privilege_information(): - "Report all privilege information assigned to the current process." - privileges = get_privilege_information() - print("found {0} privileges".format(privileges.count)) - tuple(map(print, privileges)) - -def enable_symlink_privilege(): - """ - Try to assign the symlink privilege to the current process token. - Return True if the assignment is successful. - """ - # create a space in memory for a TOKEN_PRIVILEGES structure - # with one element - size = ctypes.sizeof(TOKEN_PRIVILEGES) - size += ctypes.sizeof(LUID_AND_ATTRIBUTES) - buffer = ctypes.create_string_buffer(size) - tp = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents - tp.count = 1 - tp.get_array()[0].enable() - tp.get_array()[0].LUID = get_symlink_luid() - token = get_process_token() - res = AdjustTokenPrivileges(token, False, tp, 0, None, None) - if res == 0: - raise RuntimeError("Error in AdjustTokenPrivileges") - - ERROR_NOT_ALL_ASSIGNED = 1300 - return ctypes.windll.kernel32.GetLastError() != ERROR_NOT_ALL_ASSIGNED - -def main(): - assigned = enable_symlink_privilege() - msg = ['failure', 'success'][assigned] - - print("Symlink privilege assignment completed with {0}".format(msg)) - -if __name__ == '__main__': - main() Index: Lib/test/test_os.py =================================================================== --- Lib/test/test_os.py (revision 83553) +++ Lib/test/test_os.py (working copy) @@ -522,7 +522,7 @@ f = open(path, "w") f.write("I'm " + path + " and proud of it. Blame test_os.\n") f.close() - if support.can_symlink(): + if hasattr(os, "symlink"): os.symlink(os.path.abspath(t2_path), link_path) sub2_tree = (sub2_path, ["link"], ["tmp3"]) else: @@ -566,7 +566,7 @@ self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"])) self.assertEqual(all[2 - 2 * flipped], sub2_tree) - if support.can_symlink(): + if hasattr(os, "symlink"): # Walk, following symlinks. for root, dirs, files in os.walk(walk_path, followlinks=True): if root == link_path: @@ -1035,14 +1035,8 @@ self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") -def skipUnlessWindows6(test): - if (hasattr(sys, 'getwindowsversion') - and sys.getwindowsversion().major >= 6): - return test - return unittest.skip("Requires Windows Vista or later")(test) - @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") -@support.skip_unless_symlink +@unittest.skipUnless(hasattr(os, "symlink"), "Requires symlink implementation") class Win32SymlinkTests(unittest.TestCase): filelink = 'filelinktest' filelink_target = os.path.abspath(__file__) Index: Lib/test/test_tarfile.py =================================================================== --- Lib/test/test_tarfile.py (revision 83553) +++ Lib/test/test_tarfile.py (working copy) @@ -293,7 +293,8 @@ @unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") def test_extract_hardlink(self): # Test hardlink extraction (e.g. bug #857297). tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") @@ -698,7 +699,8 @@ os.remove(target) os.remove(link) - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") def test_symlink_size(self): path = os.path.join(TEMPDIR, "symlink") os.symlink("link_target", path) Index: Lib/test/test_httpservers.py =================================================================== --- Lib/test/test_httpservers.py (revision 83553) +++ Lib/test/test_httpservers.py (working copy) @@ -299,7 +299,7 @@ # The shebang line should be pure ASCII: use symlink if possible. # See issue #7668. - if support.can_symlink(): + if hasattr(os, "symlink"): self.pythonexe = os.path.join(self.parent_dir, 'python') os.symlink(sys.executable, self.pythonexe) else: Index: Lib/test/test_glob.py =================================================================== --- Lib/test/test_glob.py (revision 83553) +++ Lib/test/test_glob.py (working copy) @@ -1,5 +1,5 @@ import unittest -from test.support import run_unittest, TESTFN, skip_unless_symlink, can_symlink +from test.support import run_unittest, TESTFN import glob import os import shutil @@ -25,7 +25,7 @@ self.mktemp('ZZZ') self.mktemp('a', 'bcd', 'EF') self.mktemp('a', 'bcd', 'efg', 'ha') - if can_symlink(): + if hasattr(os, "symlink"): os.symlink(self.norm('broken'), self.norm('sym1')) os.symlink(self.norm('broken'), self.norm('sym2')) @@ -98,7 +98,8 @@ # either of these results are reasonable self.assertIn(res[0], [self.tempdir, self.tempdir + os.sep]) - @skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") def test_glob_broken_symlinks(self): eq = self.assertSequencesEqual_noorder eq(self.glob('sym*'), [self.norm('sym1'), self.norm('sym2')]) Index: Lib/test/test_platform.py =================================================================== --- Lib/test/test_platform.py (revision 83553) +++ Lib/test/test_platform.py (working copy) @@ -10,7 +10,8 @@ def test_architecture(self): res = platform.architecture() - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") def test_architecture_via_symlink(self): # issue3762 # On Windows, the EXE needs to know where pythonXY.dll is at so we have # to add the directory to the path. Index: Lib/test/support.py =================================================================== --- Lib/test/support.py (revision 83553) +++ Lib/test/support.py (working copy) @@ -39,7 +39,7 @@ "set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail", "get_attribute", - "swap_item", "swap_attr", "can_symlink", "skip_unless_symlink"] + "swap_item", "swap_attr"] class Error(Exception): @@ -1169,27 +1169,6 @@ except: break -try: - from .symlink_support import enable_symlink_privilege -except: - enable_symlink_privilege = lambda: True - -def can_symlink(): - """It's no longer sufficient to test for the presence of symlink in the - os module - on Windows XP and earlier, os.symlink exists but a - NotImplementedError is thrown. - """ - has_symlink = hasattr(os, 'symlink') - is_old_windows = sys.platform == "win32" and sys.getwindowsversion().major < 6 - has_privilege = False if is_old_windows else enable_symlink_privilege() - return has_symlink and (not is_old_windows) and has_privilege - -def skip_unless_symlink(test): - """Skip decorator for tests that require functional symlink""" - selector = can_symlink() - msg = "Requires functional symlink implementation" - return [unittest.skip(msg)(test), test][selector] - @contextlib.contextmanager def swap_attr(obj, attr, new_val): """Temporary swap out an attribute with a new object. Index: Lib/test/test_posixpath.py =================================================================== --- Lib/test/test_posixpath.py (revision 83553) +++ Lib/test/test_posixpath.py (working copy) @@ -155,7 +155,7 @@ f.write(b"foo") f.close() self.assertIs(posixpath.islink(support.TESTFN + "1"), False) - if support.can_symlink(): + if hasattr(os, "symlink"): os.symlink(support.TESTFN + "1", support.TESTFN + "2") self.assertIs(posixpath.islink(support.TESTFN + "2"), True) os.remove(support.TESTFN + "1") @@ -180,7 +180,8 @@ @unittest.skipIf( sys.platform.startswith('win'), "posixpath.samefile does not work on links in Windows") - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") def test_samefile_on_links(self): test_fn1 = support.TESTFN + "1" test_fn2 = support.TESTFN + "2" @@ -204,7 +205,8 @@ @unittest.skipIf( sys.platform.startswith('win'), "posixpath.samestat does not work on links in Windows") - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") def test_samestat_on_links(self): test_fn1 = support.TESTFN + "1" test_fn2 = support.TESTFN + "2" @@ -273,7 +275,8 @@ self.assertEqual(posixpath.normpath(b"///..//./foo/.//bar"), b"/foo/bar") - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash def test_realpath_basic(self): # Basic operation. @@ -283,7 +286,8 @@ finally: support.unlink(ABSTFN) - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash def test_realpath_symlink_loops(self): # Bug #930024, return the path unchanged if we get into an infinite @@ -307,7 +311,8 @@ support.unlink(ABSTFN+"1") support.unlink(ABSTFN+"2") - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash def test_realpath_resolve_parents(self): # We also need to resolve any symlinks in the parents of a relative @@ -328,7 +333,8 @@ safe_rmdir(ABSTFN + "/y") safe_rmdir(ABSTFN) - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash def test_realpath_resolve_before_normalizing(self): # Bug #990669: Symbolic links should be resolved before we @@ -358,7 +364,8 @@ safe_rmdir(ABSTFN + "/k") safe_rmdir(ABSTFN) - @support.skip_unless_symlink + @unittest.skipUnless(hasattr(os, "symlink"), + "Missing symlink implementation") @skip_if_ABSTFN_contains_backslash def test_realpath_resolve_first(self): # Bug #1213894: The first component of the path, if not absolute, Index: Modules/posixmodule.c =================================================================== --- Modules/posixmodule.c (revision 83553) +++ Modules/posixmodule.c (working copy) @@ -7653,8 +7653,8 @@ {"symlink", posix_symlink, METH_VARARGS, posix_symlink__doc__}, #endif /* HAVE_SYMLINK */ #if !defined(HAVE_SYMLINK) && defined(MS_WINDOWS) - {"symlink", (PyCFunction)win_symlink, METH_VARARGS | METH_KEYWORDS, - win_symlink__doc__}, + {"_symlink", (PyCFunction)win_symlink, METH_VARARGS | METH_KEYWORDS, + win_symlink__doc__}, #endif /* !defined(HAVE_SYMLINK) && defined(MS_WINDOWS) */ #ifdef HAVE_SYSTEM {"system", posix_system, METH_VARARGS, posix_system__doc__}, @@ -7967,6 +7967,47 @@ } #endif +#if !defined(HAVE_SYMLINK) && defined(MS_WINDOWS) +void +enable_symlink() +{ + const char *priv = "SeCreateSymbolicLinkPrivilege"; + HANDLE tok; + TOKEN_PRIVILEGES tok_priv; + LUID luid; + int meth_idx = 0; + + if (!OpenProcessToken(GetCurrentProcess(), TOKEN_ALL_ACCESS, &tok)) + return; + + if (!LookupPrivilegeValue(NULL, priv, &luid)) + return; + + tok_priv.PrivilegeCount = 1; + tok_priv.Privileges[0].Luid = luid; + tok_priv.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED; + + if (!AdjustTokenPrivileges(tok, FALSE, &tok_priv, + sizeof(TOKEN_PRIVILEGES), + (PTOKEN_PRIVILEGES) NULL, (PDWORD) NULL)) + return; + + if(GetLastError() == ERROR_NOT_ALL_ASSIGNED) { + /* We couldn't acquire the necessary privilege, so leave the + method hidden for this user. */ + return; + } else { + /* We've successfully acquired the symlink privilege so rename + the method to it's proper "os.symlink" name. */ + while(posix_methods[meth_idx].ml_meth != (PyCFunction)win_symlink) + meth_idx++; + posix_methods[meth_idx].ml_name = "symlink"; + + return; + } +} +#endif /* !defined(HAVE_SYMLINK) && defined(MS_WINDOWS) */ + static int all_ins(PyObject *d) { @@ -8220,6 +8261,10 @@ { PyObject *m, *v; +#if !defined(HAVE_SYMLINK) && defined(MS_WINDOWS) + enable_symlink(); +#endif + m = PyModule_Create(&posixmodule); if (m == NULL) return NULL;