Index: Doc/library/os.rst =================================================================== --- Doc/library/os.rst (revision 79899) +++ Doc/library/os.rst (working copy) @@ -890,7 +890,7 @@ Like :func:`stat`, but do not follow symbolic links. This is an alias for :func:`stat` on platforms that do not support symbolic links, such as - Windows. + Windows prior to 6.0 (Vista). .. function:: mkfifo(path[, mode]) @@ -1000,7 +1000,7 @@ and the call may raise an UnicodeDecodeError. If the *path* is a bytes object, the result will be a bytes object. - Availability: Unix. + Availability: Unix, Windows. .. function:: remove(path) @@ -1154,10 +1154,26 @@ .. function:: symlink(source, link_name) - Create a symbolic link pointing to *source* named *link_name*. Availability: - Unix. + Create a symbolic link pointing to *source* named *link_name*. On Windows, + symlink version takes an additional, optional parameter, target_is_directory, + which defaults to False + + symlink(source, link_name, target_is_directory=False) + On Windows, a symlink represents a file or a directory, and does not + morph to the target dynamically. For this reason, when creating a + symlink on Windows, if the target is not already present, the symlink + will default to being a file symlink. If target_is_directory is set to + True, the symlink will be created as a directory symlink. This + parameter is ignored if the target exists (and the symlink is created + with the same type as the target). + Symbolic link support was introduced in Windows 6.0. *symlink* will raise + a NotImplementedError on Windows versions earlier than 6.0. + + Availability: Unix, Windows. + + .. function:: unlink(path) Remove (delete) the file *path*. This is the same function as Index: Doc/library/os.path.rst =================================================================== --- Doc/library/os.path.rst (revision 79899) +++ Doc/library/os.path.rst (working copy) @@ -228,9 +228,12 @@ .. function:: samefile(path1, path2) - Return ``True`` if both pathname arguments refer to the same file or directory - (as indicated by device number and i-node number). Raise an exception if a - :func:`os.stat` call on either pathname fails. Availability: Unix. + Return ``True`` if both pathname arguments refer to the same file or directory. + On Unix, this is determined by the device number and i-node number and raises an + exception if a :func:`os.stat` call on either pathname fails. On Windows, two + files are the same if they resolve to the same final path name using the Windows + API call GetFinalPathNameByHandle and this function raises an exception if + handles cannot be obtained to either file. Availability: Windows, Unix. .. function:: sameopenfile(fp1, fp2) Index: Lib/ntpath.py =================================================================== --- Lib/ntpath.py (revision 79899) +++ Lib/ntpath.py (working copy) @@ -16,7 +16,8 @@ "getatime","getctime", "islink","exists","lexists","isdir","isfile", "ismount", "expanduser","expandvars","normpath","abspath", "splitunc","curdir","pardir","sep","pathsep","defpath","altsep", - "extsep","devnull","realpath","supports_unicode_filenames","relpath"] + "extsep","devnull","realpath","supports_unicode_filenames","relpath", + "samefile",] # strings representing various path-related bits and pieces # These are primarily for export; internally, they are hardcoded. @@ -306,17 +307,29 @@ return split(p)[0] # Is a path a symbolic link? -# This will always return false on systems where posix.lstat doesn't exist. +# This will always return false on systems where os.lstat doesn't exist. def islink(path): - """Test for symbolic link. - On WindowsNT/95 and OS/2 always returns false + """Test whether a path is a symbolic link. + This will always return false for Windows prior to 6.0 + and for OS/2. """ - return False + try: + st = os.lstat(path) + except (os.error, AttributeError): + return False + return stat.S_ISLNK(st.st_mode) -# alias exists to lexists -lexists = exists +# Being true for dangling symbolic links is also useful. +def lexists(path): + """Test whether a path exists. Returns True for broken symbolic links""" + try: + st = os.lstat(path) + except (os.error, WindowsError): + return False + return True + # Is a path a mount point? Either a root (with or without drive letter) # or an UNC path with at most a / or \ after the mount point. @@ -609,3 +622,17 @@ if not rel_list: return _get_dot(path) return join(*rel_list) + + +# determine if two files are in fact the same file +def samefile(f1, f2): + "Test whether two pathnames reference the same actual file" + try: + from nt import _getfinalpathname + return _getfinalpathname(f1) == _getfinalpathname(f2) + except (NotImplementedError, ImportError): + # On Windows XP and earlier, two files are the same if their + # absolute pathnames are the same. + # Also, on other operating systems, fake this method with a + # Windows-XP approximation. + return abspath(f1) == abspath(f2) Index: Lib/tarfile.py =================================================================== --- Lib/tarfile.py (revision 79899) +++ Lib/tarfile.py (working copy) @@ -2230,7 +2230,10 @@ else: # See extract(). os.link(tarinfo._link_target, targetpath) - except AttributeError: + except (AttributeError, NotImplementedError, WindowsError): + # AttributeError if no os.symlink + # NotImplementedError if on Windows XP + # WindowsError (1314) if the required privilege is not held by the client if tarinfo.issym(): linkpath = os.path.dirname(tarinfo.name) + "/" + \ tarinfo.linkname Index: Lib/test/test_sysconfig.py =================================================================== --- Lib/test/test_sysconfig.py (revision 79899) +++ Lib/test/test_sysconfig.py (working copy) @@ -11,7 +11,7 @@ import shutil from copy import copy, deepcopy -from test.support import run_unittest, TESTFN, unlink, get_attribute +from test.support import (run_unittest, TESTFN, run_command_via_symlink,) import sysconfig from sysconfig import (get_paths, get_platform, get_config_vars, @@ -238,23 +238,10 @@ 'posix_prefix', 'posix_user') self.assertEquals(get_scheme_names(), wanted) - def test_symlink(self): - # Issue 7880 - symlink = get_attribute(os, "symlink") - def get(python): - cmd = [python, '-c', - 'import sysconfig; print(sysconfig.get_platform())'] - p = subprocess.Popen(cmd, stdout=subprocess.PIPE) - return p.communicate() - real = os.path.realpath(sys.executable) - link = os.path.abspath(TESTFN) - symlink(real, link) - try: - self.assertEqual(get(real), get(link)) - finally: - unlink(link) + # issue7880 + test_symlink = run_command_via_symlink( + 'import sysconfig; print(sysconfig.get_platform())') - def test_main(): run_unittest(TestSysConfig) Index: Lib/test/test_shutil.py =================================================================== --- Lib/test/test_shutil.py (revision 79899) +++ Lib/test/test_shutil.py (working copy) @@ -266,47 +266,49 @@ shutil.rmtree(src_dir) shutil.rmtree(os.path.dirname(dst_dir)) - if hasattr(os, "symlink"): - def test_dont_copy_file_onto_link_to_itself(self): - # bug 851123. - os.mkdir(TESTFN) - src = os.path.join(TESTFN, 'cheese') - dst = os.path.join(TESTFN, 'shop') - try: - f = open(src, 'w') - f.write('cheddar') - f.close() + @support.skip_unless_can_symlink + def test_dont_copy_file_onto_link_to_itself(self): + # bug 851123. + os.mkdir(TESTFN) + src = os.path.join(TESTFN, 'cheese') + dst = os.path.join(TESTFN, 'shop') + try: + f = open(src, 'w') + f.write('cheddar') + f.close() + if hasattr(os, "link"): os.link(src, dst) self.assertRaises(shutil.Error, shutil.copyfile, src, dst) self.assertEqual(open(src,'r').read(), 'cheddar') os.remove(dst) - # Using `src` here would mean we end up with a symlink pointing - # to TESTFN/TESTFN/cheese, while it should point at - # TESTFN/cheese. - os.symlink('cheese', dst) - self.assertRaises(shutil.Error, shutil.copyfile, src, dst) - self.assertEqual(open(src,'r').read(), 'cheddar') - os.remove(dst) - finally: - try: - shutil.rmtree(TESTFN) - except OSError: - pass - - def test_rmtree_on_symlink(self): - # bug 1669. - os.mkdir(TESTFN) + # Using `src` here would mean we end up with a symlink pointing + # to TESTFN/TESTFN/cheese, while it should point at + # TESTFN/cheese. + os.symlink('cheese', dst) + self.assertRaises(shutil.Error, shutil.copyfile, src, dst) + self.assertEqual(open(src,'r').read(), 'cheddar') + os.remove(dst) + finally: try: - src = os.path.join(TESTFN, 'cheese') - dst = os.path.join(TESTFN, 'shop') - os.mkdir(src) - os.symlink(src, dst) - self.assertRaises(OSError, shutil.rmtree, dst) - finally: - shutil.rmtree(TESTFN, ignore_errors=True) + shutil.rmtree(TESTFN) + except OSError: + pass + @support.skip_unless_can_symlink + def test_rmtree_on_symlink(self): + # bug 1669. + os.mkdir(TESTFN) + try: + src = os.path.join(TESTFN, 'cheese') + dst = os.path.join(TESTFN, 'shop') + os.mkdir(src) + os.symlink(src, dst) + self.assertRaises(OSError, shutil.rmtree, dst) + finally: + shutil.rmtree(TESTFN, ignore_errors=True) + if hasattr(os, "mkfifo"): # Issue #3002: copyfile and copytree block indefinitely on named pipes def test_copyfile_named_pipe(self): Index: Lib/test/symlink_support.py =================================================================== --- Lib/test/symlink_support.py (revision 0) +++ Lib/test/symlink_support.py (revision 0) @@ -0,0 +1,203 @@ +""" +A module built to test if the current process has the privilege to +create symlinks on Windows. +""" + +# allow script to run natively under python 2.6+ +from __future__ import print_function + +import ctypes +from ctypes import wintypes + +GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess +GetCurrentProcess.restype = wintypes.HANDLE +OpenProcessToken = ctypes.windll.advapi32.OpenProcessToken +OpenProcessToken.argtypes = (wintypes.HANDLE, wintypes.DWORD, ctypes.POINTER(wintypes.HANDLE)) +OpenProcessToken.restype = wintypes.BOOL + +class LUID(ctypes.Structure): + _fields_ = [ + ('low_part', wintypes.DWORD), + ('high_part', wintypes.LONG), + ] + + def __eq__(self, other): + return ( + self.high_part == other.high_part and + self.low_part == other.low_part + ) + + def __ne__(self, other): + return not (self==other) + +LookupPrivilegeValue = ctypes.windll.advapi32.LookupPrivilegeValueW +LookupPrivilegeValue.argtypes = ( + wintypes.LPWSTR, # system name + wintypes.LPWSTR, # name + ctypes.POINTER(LUID), + ) +LookupPrivilegeValue.restype = wintypes.BOOL + +class TOKEN_INFORMATION_CLASS: + TokenUser = 1 + TokenGroups = 2 + TokenPrivileges = 3 + # ... see http://msdn.microsoft.com/en-us/library/aa379626%28VS.85%29.aspx + +SE_PRIVILEGE_ENABLED_BY_DEFAULT = (0x00000001) +SE_PRIVILEGE_ENABLED = (0x00000002) +SE_PRIVILEGE_REMOVED = (0x00000004) +SE_PRIVILEGE_USED_FOR_ACCESS = (0x80000000) + +class LUID_AND_ATTRIBUTES(ctypes.Structure): + _fields_ = [ + ('LUID', LUID), + ('attributes', wintypes.DWORD), + ] + + def is_enabled(self): + return bool(self.attributes & SE_PRIVILEGE_ENABLED) + + def enable(self): + self.attributes |= SE_PRIVILEGE_ENABLED + + def get_name(self): + size = wintypes.DWORD(10240) + buf = ctypes.create_unicode_buffer(size.value) + res = LookupPrivilegeName(None, self.LUID, buf, size) + if res == 0: + raise RuntimeError + return buf[:size.value] + + def __str__(self): + name = self.name + fmt = ['{name}', '{name} (enabled)'][self.is_enabled()] + return fmt.format(**vars()) + +LookupPrivilegeName = ctypes.windll.advapi32.LookupPrivilegeNameW +LookupPrivilegeName.argtypes = ( + wintypes.LPWSTR, # lpSystemName + ctypes.POINTER(LUID), # lpLuid + wintypes.LPWSTR, # lpName + ctypes.POINTER(wintypes.DWORD), #cchName + ) +LookupPrivilegeName.restype = wintypes.BOOL + +class TOKEN_PRIVILEGES(ctypes.Structure): + _fields_ = [ + ('count', wintypes.DWORD), + ('privileges', LUID_AND_ATTRIBUTES*0), + ] + + def get_array(self): + array_type = LUID_AND_ATTRIBUTES*self.count + privileges = ctypes.cast(self.privileges, ctypes.POINTER(array_type)).contents + return privileges + + def __iter__(self): + return iter(self.get_array()) + +PTOKEN_PRIVILEGES = ctypes.POINTER(TOKEN_PRIVILEGES) + +GetTokenInformation = ctypes.windll.advapi32.GetTokenInformation +GetTokenInformation.argtypes = [ + wintypes.HANDLE, # TokenHandle + ctypes.c_uint, # TOKEN_INFORMATION_CLASS value + ctypes.c_void_p, # TokenInformation + wintypes.DWORD, # TokenInformationLength + ctypes.POINTER(wintypes.DWORD), # ReturnLength + ] +GetTokenInformation.restype = wintypes.BOOL + +# http://msdn.microsoft.com/en-us/library/aa375202%28VS.85%29.aspx +AdjustTokenPrivileges = ctypes.windll.advapi32.AdjustTokenPrivileges +AdjustTokenPrivileges.restype = wintypes.BOOL +AdjustTokenPrivileges.argtypes = [ + wintypes.HANDLE, # TokenHandle + wintypes.BOOL, # DisableAllPrivileges + PTOKEN_PRIVILEGES, # NewState (optional) + wintypes.DWORD, # BufferLength of PreviousState + PTOKEN_PRIVILEGES, # PreviousState (out, optional) + ctypes.POINTER(wintypes.DWORD), # ReturnLength + ] + +def get_process_token(): + "Get the current process token" + token = wintypes.HANDLE() + TOKEN_ALL_ACCESS = 0xf01ff + res = OpenProcessToken(GetCurrentProcess(), TOKEN_ALL_ACCESS, token) + if not res > 0: + raise RuntimeError("Couldn't get process token") + return token + +def get_symlink_luid(): + "Get the LUID for the SeCreateSymbolicLinkPrivilege" + symlink_luid = LUID() + res = LookupPrivilegeValue(None, "SeCreateSymbolicLinkPrivilege", symlink_luid) + if not res > 0: + raise RuntimeError("Couldn't lookup privilege value") + return symlink_luid + +def get_privilege_information(): + "Get all privileges associated with the current process." + # first call with zero length to determine what size buffer we need + + return_length = wintypes.DWORD() + params = [ + get_process_token(), + TOKEN_INFORMATION_CLASS.TokenPrivileges, + None, + 0, + return_length, + ] + + res = GetTokenInformation(*params) + + # assume we now have the necessary length in return_length + + buffer = ctypes.create_string_buffer(return_length.value) + params[2] = buffer + params[3] = return_length.value + + res = GetTokenInformation(*params) + assert res > 0, "Error in second GetTokenInformation (%d)" % res + + privileges = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents + return privileges + +def report_privilege_information(): + "Report all privilege information assigned to the current process." + privileges = get_privilege_information() + print("found {0} privileges".format(privileges.count)) + tuple(map(print, privileges)) + +def enable_symlink_privilege(): + """ + Try to assign the symlink privilege to the current process token. + Return True if the assignment is successful. + """ + # create a space in memory for a TOKEN_PRIVILEGES structure + # with one element + size = ctypes.sizeof(TOKEN_PRIVILEGES) + size += ctypes.sizeof(LUID_AND_ATTRIBUTES) + buffer = ctypes.create_string_buffer(size) + tp = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents + tp.count = 1 + tp.get_array()[0].enable() + tp.get_array()[0].LUID = get_symlink_luid() + token = get_process_token() + res = AdjustTokenPrivileges(token, False, tp, 0, None, None) + if res == 0: + raise RuntimeError("Error in AdjustTokenPrivileges") + + ERROR_NOT_ALL_ASSIGNED = 1300 + return ctypes.windll.kernel32.GetLastError() != ERROR_NOT_ALL_ASSIGNED + +def main(): + assigned = enable_symlink_privilege() + msg = ['failure', 'success'][assigned] + + print("Symlink privilege assignment completed with {0}".format(msg)) + +if __name__ == '__main__': + main() Property changes on: Lib\test\symlink_support.py ___________________________________________________________________ Added: svn:keywords + Id Rev Author Date Index: Lib/test/test_os.py =================================================================== --- Lib/test/test_os.py (revision 79899) +++ Lib/test/test_os.py (working copy) @@ -464,7 +464,7 @@ f = open(path, "w") f.write("I'm " + path + " and proud of it. Blame test_os.\n") f.close() - if hasattr(os, "symlink"): + if support.can_symlink(): os.symlink(os.path.abspath(t2_path), link_path) sub2_tree = (sub2_path, ["link"], ["tmp3"]) else: @@ -508,7 +508,7 @@ self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"])) self.assertEqual(all[2 - 2 * flipped], sub2_tree) - if hasattr(os, "symlink"): + if support.can_symlink(): # Walk, following symlinks. for root, dirs, files in os.walk(walk_path, followlinks=True): if root == link_path: Index: Lib/test/test_tarfile.py =================================================================== --- Lib/test/test_tarfile.py (revision 79899) +++ Lib/test/test_tarfile.py (working copy) @@ -671,16 +671,16 @@ os.remove(target) os.remove(link) + @support.skip_unless_can_symlink def test_symlink_size(self): - if hasattr(os, "symlink"): - path = os.path.join(TEMPDIR, "symlink") - os.symlink("link_target", path) - try: - tar = tarfile.open(tmpname, self.mode) - tarinfo = tar.gettarinfo(path) - self.assertEqual(tarinfo.size, 0) - finally: - os.remove(path) + path = os.path.join(TEMPDIR, "symlink") + os.symlink("link_target", path) + try: + tar = tarfile.open(tmpname, self.mode) + tarinfo = tar.gettarinfo(path) + self.assertEqual(tarinfo.size, 0) + finally: + os.remove(path) def test_add_self(self): # Test for #1257255. Index: Lib/test/test_win_symlink.py =================================================================== --- Lib/test/test_win_symlink.py (revision 0) +++ Lib/test/test_win_symlink.py (revision 0) @@ -0,0 +1,85 @@ +import os +import sys +import stat +import platform + +import unittest + +def skipUnlessVistaOrGreater(test): + if hasattr(sys, 'getwindowsversion') and sys.getwindowsversion().major >= 6: + return test + return unittest.skip("Requires Windows Vista or later")(test) + +@unittest.skipUnless(platform.system() == 'Windows', "requires Windows") +@skipUnlessVistaOrGreater +class WindowsSymlinkTest(unittest.TestCase): + filelink = 'filelinktest' + filelink_target = os.path.abspath(__file__) + dirlink = 'dirlinktest' + dirlink_target = os.path.dirname(filelink_target) + missing_link = 'missing link' + + def setUp(self): + assert os.path.exists(self.dirlink_target) + assert os.path.exists(self.filelink_target) + assert not os.path.exists(self.dirlink) + assert not os.path.exists(self.filelink) + assert not os.path.exists(self.missing_link) + + def test_directory_link(self): + os.symlink(self.dirlink_target, self.dirlink) + self.assertTrue(os.path.exists(self.dirlink)) + self.assertTrue(os.path.isdir(self.dirlink)) + self.assertTrue(os.path.islink(self.dirlink)) + self.check_stat(self.dirlink, self.dirlink_target) + + def test_file_link(self): + os.symlink(self.filelink_target, self.filelink) + self.assertTrue(os.path.exists(self.filelink)) + self.assertTrue(os.path.isfile(self.filelink)) + self.assertTrue(os.path.islink(self.filelink)) + self.check_stat(self.filelink, self.filelink_target) + + def _create_missing_dir_link(self): + 'Create a "directory" link to a non-existent target' + linkname = self.missing_link + if os.path.lexists(linkname): + os.remove(linkname) + target = r'c:\\target does not exist.29r3c740' + assert not os.path.exists(target) + target_is_dir = True + os.symlink(target, linkname, target_is_dir) + + def test_remove_directory_link_to_missing_target(self): + self._create_missing_dir_link() + # For compatibility with Unix, os.remove will check the + # directory status and call RemoveDirectory if the symlink + # was created with target_is_dir==True. + os.remove(self.missing_link) + + @unittest.skip("currently fails; consider for improvement") + def test_isdir_on_directory_link_to_missing_target(self): + self._create_missing_dir_link() + # consider having isdir return true for directory links + self.assertTrue(os.path.isdir(self.missing_link)) + + @unittest.skip("currently fails; consider for improvement") + def test_rmdir_on_directory_link_to_missing_target(self): + self._create_missing_dir_link() + # consider allowing rmdir to remove directory links + os.rmdir(self.missing_link) + + def check_stat(self, link, target): + self.assertEqual(os.stat(link), os.stat(target)) + self.assertNotEqual(os.lstat(link), os.stat(link)) + + def tearDown(self): + if os.path.exists(self.filelink): + os.remove(self.filelink) + if os.path.exists(self.dirlink): + os.rmdir(self.dirlink) + if os.path.lexists(self.missing_link): + os.remove(self.missing_link) + +if __name__ == "__main__": + unittest.main() Property changes on: Lib\test\test_win_symlink.py ___________________________________________________________________ Added: svn:keywords + Id Rev Author Date Index: Lib/test/test_httpservers.py =================================================================== --- Lib/test/test_httpservers.py (revision 79899) +++ Lib/test/test_httpservers.py (working copy) @@ -300,7 +300,7 @@ # The shebang line should be pure ASCII: use symlink if possible. # See issue #7668. - if hasattr(os, 'symlink'): + if support.can_symlink(): self.pythonexe = os.path.join(self.parent_dir, 'python') os.symlink(sys.executable, self.pythonexe) else: Index: Lib/test/test_glob.py =================================================================== --- Lib/test/test_glob.py (revision 79899) +++ Lib/test/test_glob.py (working copy) @@ -1,5 +1,6 @@ import unittest from test.support import run_unittest, TESTFN +import test.support import glob import os import shutil @@ -25,7 +26,7 @@ self.mktemp('ZZZ') self.mktemp('a', 'bcd', 'EF') self.mktemp('a', 'bcd', 'efg', 'ha') - if hasattr(os, 'symlink'): + if test.support.can_symlink(): os.symlink(self.norm('broken'), self.norm('sym1')) os.symlink(self.norm('broken'), self.norm('sym2')) @@ -98,12 +99,12 @@ # either of these results are reasonable self.assertIn(res[0], [self.tempdir, self.tempdir + os.sep]) + @test.support.skip_unless_can_symlink def test_glob_broken_symlinks(self): - if hasattr(os, 'symlink'): - eq = self.assertSequencesEqual_noorder - eq(self.glob('sym*'), [self.norm('sym1'), self.norm('sym2')]) - eq(self.glob('sym1'), [self.norm('sym1')]) - eq(self.glob('sym2'), [self.norm('sym2')]) + eq = self.assertSequencesEqual_noorder + eq(self.glob('sym*'), [self.norm('sym1'), self.norm('sym2')]) + eq(self.glob('sym1'), [self.norm('sym1')]) + eq(self.glob('sym2'), [self.norm('sym2')]) def test_main(): Index: Lib/test/test_platform.py =================================================================== --- Lib/test/test_platform.py (revision 79899) +++ Lib/test/test_platform.py (working copy) @@ -6,24 +6,14 @@ from test import support + class PlatformTest(unittest.TestCase): def test_architecture(self): res = platform.architecture() - if hasattr(os, "symlink"): - def test_architecture_via_symlink(self): # issue3762 - def get(python): - cmd = [python, '-c', - 'import platform; print(platform.architecture())'] - p = subprocess.Popen(cmd, stdout=subprocess.PIPE) - return p.communicate() - real = os.path.realpath(sys.executable) - link = os.path.abspath(support.TESTFN) - os.symlink(real, link) - try: - self.assertEqual(get(real), get(link)) - finally: - os.remove(link) + # issue3762 + test_architecture_via_symlink = support.run_command_via_symlink( + 'import platform; print(platform.architecture())') def test_platform(self): for aliased in (False, True): Index: Lib/test/support.py =================================================================== --- Lib/test/support.py (revision 79899) +++ Lib/test/support.py (working copy) @@ -17,6 +17,7 @@ import importlib import collections import re +import subprocess __all__ = ["Error", "TestFailed", "ResourceDenied", "import_module", "verbose", "use_resources", "max_memuse", "record_original_stdout", @@ -31,7 +32,8 @@ "set_memlimit", "bigmemtest", "bigaddrspacetest", "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", "threading_cleanup", "reap_children", "cpython_only", "check_impl_detail", "get_attribute", - "swap_item", "swap_attr"] + "can_symlink", "skip_unless_can_symlink", "swap_item", "swap_attr", + "run_command_via_symlink",] class Error(Exception): @@ -1104,6 +1106,29 @@ except: break +try: + from symlink_support import enable_symlink_privilege as has_symlink_privilege +except: + has_symlink_privilege = lambda: True + +def can_symlink(): + """ + It's no longer sufficient to test for the presence of symlink + in the os module - on Windows XP and earlier, os.symlink exists + but a NotImplementedError is thrown. + """ + has_symlink_attr = hasattr(os, 'symlink') + is_windows = sys.platform.startswith('win') + is_old_windows = is_windows and sys.getwindowsversion().major < 6 + has_privilege = has_symlink_privilege() + return has_symlink_attr and (not is_old_windows) and has_privilege + +def skip_unless_can_symlink(test): + "skip decorator for tests that require functional symlink" + selector = can_symlink() + msg = "Cannot run without functional symlink implementation" + return [unittest.skip(msg)(test), test][selector] + @contextlib.contextmanager def swap_attr(obj, attr, new_val): """Temporary swap out an attribute with a new object. @@ -1157,3 +1182,36 @@ yield finally: del obj[item] + +def run_command_via_symlink(command): + """ + Return a test methad that runs the command in a separate Python + instance + using the canonical executable and also a symlink to that + executable and compares the results (which should be equal). + """ + @skip_unless_can_symlink + def test(self): + def alter_environment(): + "Python DLL needs to be in the path (see issue8342)" + env = dict(os.environ) + if sys.platform == 'win32': + dll_loc = os.path.dirname(sys.executable) + env['PATH'] = ';'.join([dll_loc, env['PATH']]) + return env + + def get(python): + cmd = [python, '-c', command] + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, + env=alter_environment(), + ) + return p.communicate() + real = os.path.realpath(sys.executable) + link = os.path.abspath(TESTFN) + os.symlink(real, link) + try: + self.assertEqual(get(real), get(link)) + finally: + os.remove(link) + return test + Index: Lib/test/test_posixpath.py =================================================================== --- Lib/test/test_posixpath.py (revision 79899) +++ Lib/test/test_posixpath.py (working copy) @@ -1,7 +1,9 @@ import unittest from test import support, test_genericpath -import posixpath, os +import posixpath +import os +import sys from posixpath import realpath, abspath, dirname, basename # An absolute path to a temporary filename for testing. We can't rely on TESTFN @@ -9,6 +11,16 @@ ABSTFN = abspath(support.TESTFN) +def skip_if_ABSTFN_contains_backslash(test): + """ + On Windows, posixpath.abspath still returns paths with backslashes + instead of posix forward slashes. If this is the case, several tests + fail, so skip them. + """ + found_backslash = '\\' in ABSTFN + msg = "ABSTFN is not a posix path - tests fail" + return [test, unittest.skip(msg)(test)][found_backslash] + def safe_rmdir(dirname): try: os.rmdir(dirname) @@ -143,7 +155,7 @@ f.write(b"foo") f.close() self.assertIs(posixpath.islink(support.TESTFN + "1"), False) - if hasattr(os, "symlink"): + if support.can_symlink(): os.symlink(support.TESTFN + "1", support.TESTFN + "2") self.assertIs(posixpath.islink(support.TESTFN + "2"), True) os.remove(support.TESTFN + "1") @@ -154,86 +166,60 @@ if not f.close(): f.close() + @staticmethod + def _create_file(filename): + with open(filename, 'wb') as f: + f.write(b'foo') + def test_samefile(self): - f = open(support.TESTFN + "1", "wb") - try: - f.write(b"foo") - f.close() - self.assertIs( - posixpath.samefile( - support.TESTFN + "1", - support.TESTFN + "1" - ), - True - ) - # If we don't have links, assume that os.stat doesn't return resonable - # inode information and thus, that samefile() doesn't work - if hasattr(os, "symlink"): - os.symlink( - support.TESTFN + "1", - support.TESTFN + "2" - ) - self.assertIs( - posixpath.samefile( - support.TESTFN + "1", - support.TESTFN + "2" - ), - True - ) - os.remove(support.TESTFN + "2") - f = open(support.TESTFN + "2", "wb") - f.write(b"bar") - f.close() - self.assertIs( - posixpath.samefile( - support.TESTFN + "1", - support.TESTFN + "2" - ), - False - ) - finally: - if not f.close(): - f.close() + test_fn = support.TESTFN + "1" + self._create_file(test_fn) + self.assertTrue(posixpath.samefile(test_fn, test_fn)) + self.assertRaises(TypeError, posixpath.samefile) + @unittest.skipIf( + sys.platform.startswith('win'), + "posixpath.samefile does not work on links in Windows") + @support.skip_unless_can_symlink + def test_samefile_on_links(self): + test_fn1 = support.TESTFN + "1" + test_fn2 = support.TESTFN + "2" + self._create_file(test_fn1) + + os.symlink(test_fn1, test_fn2) + self.assertTrue(posixpath.samefile(test_fn1, test_fn2)) + os.remove(test_fn2) + + self._create_file(test_fn2) + self.assertFalse(posixpath.samefile(test_fn1, test_fn2)) + + def test_samestat(self): - f = open(support.TESTFN + "1", "wb") - try: - f.write(b"foo") - f.close() - self.assertIs( - posixpath.samestat( - os.stat(support.TESTFN + "1"), - os.stat(support.TESTFN + "1") - ), - True - ) - # If we don't have links, assume that os.stat() doesn't return resonable - # inode information and thus, that samefile() doesn't work - if hasattr(os, "symlink"): - if hasattr(os, "symlink"): - os.symlink(support.TESTFN + "1", support.TESTFN + "2") - self.assertIs( - posixpath.samestat( - os.stat(support.TESTFN + "1"), - os.stat(support.TESTFN + "2") - ), - True - ) - os.remove(support.TESTFN + "2") - f = open(support.TESTFN + "2", "wb") - f.write(b"bar") - f.close() - self.assertIs( - posixpath.samestat( - os.stat(support.TESTFN + "1"), - os.stat(support.TESTFN + "2") - ), - False - ) - finally: - if not f.close(): - f.close() + test_fn = support.TESTFN + "1" + self._create_file(test_fn) + test_fns = [test_fn]*2 + stats = map(os.stat, test_fns) + self.assertTrue(posixpath.samestat(*stats)) + @unittest.skipIf( + sys.platform.startswith('win'), + "posixpath.samestat does not work on links in Windows") + @support.skip_unless_can_symlink + def test_samestat_on_links(self): + test_fn1 = support.TESTFN + "1" + test_fn2 = support.TESTFN + "2" + test_fns = (test_fn1, test_fn2) + os.symlink(*test_fns) + stats = map(os.stat, test_fns) + self.assertTrue(posixpath.samestat(*stats)) + os.remove(test_fn2) + + self._create_file(test_fn2) + stats = map(os.stat, test_fns) + self.assertFalse(posixpath.samestat(*stats)) + + self.assertRaises(TypeError, posixpath.samestat) + def test_ismount(self): self.assertIs(posixpath.ismount("/"), True) @@ -286,103 +272,112 @@ self.assertEqual(posixpath.normpath(b"///..//./foo/.//bar"), b"/foo/bar") - if hasattr(os, "symlink"): - def test_realpath_basic(self): - # Basic operation. - try: - os.symlink(ABSTFN+"1", ABSTFN) - self.assertEqual(realpath(ABSTFN), ABSTFN+"1") - finally: - support.unlink(ABSTFN) + @support.skip_unless_can_symlink + @skip_if_ABSTFN_contains_backslash + def test_realpath_basic(self): + # Basic operation. + try: + os.symlink(ABSTFN+"1", ABSTFN) + self.assertEqual(realpath(ABSTFN), ABSTFN+"1") + finally: + support.unlink(ABSTFN) - def test_realpath_symlink_loops(self): - # Bug #930024, return the path unchanged if we get into an infinite - # symlink loop. - try: - old_path = abspath('.') - os.symlink(ABSTFN, ABSTFN) - self.assertEqual(realpath(ABSTFN), ABSTFN) + @support.skip_unless_can_symlink + @skip_if_ABSTFN_contains_backslash + def test_realpath_symlink_loops(self): + # Bug #930024, return the path unchanged if we get into an infinite + # symlink loop. + try: + old_path = abspath('.') + os.symlink(ABSTFN, ABSTFN) + self.assertEqual(realpath(ABSTFN), ABSTFN) - os.symlink(ABSTFN+"1", ABSTFN+"2") - os.symlink(ABSTFN+"2", ABSTFN+"1") - self.assertEqual(realpath(ABSTFN+"1"), ABSTFN+"1") - self.assertEqual(realpath(ABSTFN+"2"), ABSTFN+"2") + os.symlink(ABSTFN+"1", ABSTFN+"2") + os.symlink(ABSTFN+"2", ABSTFN+"1") + self.assertEqual(realpath(ABSTFN+"1"), ABSTFN+"1") + self.assertEqual(realpath(ABSTFN+"2"), ABSTFN+"2") - # Test using relative path as well. - os.chdir(dirname(ABSTFN)) - self.assertEqual(realpath(basename(ABSTFN)), ABSTFN) - finally: - os.chdir(old_path) - support.unlink(ABSTFN) - support.unlink(ABSTFN+"1") - support.unlink(ABSTFN+"2") + # Test using relative path as well. + os.chdir(dirname(ABSTFN)) + self.assertEqual(realpath(basename(ABSTFN)), ABSTFN) + finally: + os.chdir(old_path) + support.unlink(ABSTFN) + support.unlink(ABSTFN+"1") + support.unlink(ABSTFN+"2") - def test_realpath_resolve_parents(self): - # We also need to resolve any symlinks in the parents of a relative - # path passed to realpath. E.g.: current working directory is - # /usr/doc with 'doc' being a symlink to /usr/share/doc. We call - # realpath("a"). This should return /usr/share/doc/a/. - try: - old_path = abspath('.') - os.mkdir(ABSTFN) - os.mkdir(ABSTFN + "/y") - os.symlink(ABSTFN + "/y", ABSTFN + "/k") + @support.skip_unless_can_symlink + @skip_if_ABSTFN_contains_backslash + def test_realpath_resolve_parents(self): + # We also need to resolve any symlinks in the parents of a relative + # path passed to realpath. E.g.: current working directory is + # /usr/doc with 'doc' being a symlink to /usr/share/doc. We call + # realpath("a"). This should return /usr/share/doc/a/. + try: + old_path = abspath('.') + os.mkdir(ABSTFN) + os.mkdir(ABSTFN + "/y") + os.symlink(ABSTFN + "/y", ABSTFN + "/k") - os.chdir(ABSTFN + "/k") - self.assertEqual(realpath("a"), ABSTFN + "/y/a") - finally: - os.chdir(old_path) - support.unlink(ABSTFN + "/k") - safe_rmdir(ABSTFN + "/y") - safe_rmdir(ABSTFN) + os.chdir(ABSTFN + "/k") + self.assertEqual(realpath("a"), ABSTFN + "/y/a") + finally: + os.chdir(old_path) + support.unlink(ABSTFN + "/k") + safe_rmdir(ABSTFN + "/y") + safe_rmdir(ABSTFN) - def test_realpath_resolve_before_normalizing(self): - # Bug #990669: Symbolic links should be resolved before we - # normalize the path. E.g.: if we have directories 'a', 'k' and 'y' - # in the following hierarchy: - # a/k/y - # - # and a symbolic link 'link-y' pointing to 'y' in directory 'a', - # then realpath("link-y/..") should return 'k', not 'a'. - try: - old_path = abspath('.') - os.mkdir(ABSTFN) - os.mkdir(ABSTFN + "/k") - os.mkdir(ABSTFN + "/k/y") - os.symlink(ABSTFN + "/k/y", ABSTFN + "/link-y") + @support.skip_unless_can_symlink + @skip_if_ABSTFN_contains_backslash + def test_realpath_resolve_before_normalizing(self): + # Bug #990669: Symbolic links should be resolved before we + # normalize the path. E.g.: if we have directories 'a', 'k' and 'y' + # in the following hierarchy: + # a/k/y + # + # and a symbolic link 'link-y' pointing to 'y' in directory 'a', + # then realpath("link-y/..") should return 'k', not 'a'. + try: + old_path = abspath('.') + os.mkdir(ABSTFN) + os.mkdir(ABSTFN + "/k") + os.mkdir(ABSTFN + "/k/y") + os.symlink(ABSTFN + "/k/y", ABSTFN + "/link-y") - # Absolute path. - self.assertEqual(realpath(ABSTFN + "/link-y/.."), ABSTFN + "/k") - # Relative path. - os.chdir(dirname(ABSTFN)) - self.assertEqual(realpath(basename(ABSTFN) + "/link-y/.."), - ABSTFN + "/k") - finally: - os.chdir(old_path) - support.unlink(ABSTFN + "/link-y") - safe_rmdir(ABSTFN + "/k/y") - safe_rmdir(ABSTFN + "/k") - safe_rmdir(ABSTFN) + # Absolute path. + self.assertEqual(realpath(ABSTFN + "/link-y/.."), ABSTFN + "/k") + # Relative path. + os.chdir(dirname(ABSTFN)) + self.assertEqual(realpath(basename(ABSTFN) + "/link-y/.."), + ABSTFN + "/k") + finally: + os.chdir(old_path) + support.unlink(ABSTFN + "/link-y") + safe_rmdir(ABSTFN + "/k/y") + safe_rmdir(ABSTFN + "/k") + safe_rmdir(ABSTFN) - def test_realpath_resolve_first(self): - # Bug #1213894: The first component of the path, if not absolute, - # must be resolved too. + @support.skip_unless_can_symlink + @skip_if_ABSTFN_contains_backslash + def test_realpath_resolve_first(self): + # Bug #1213894: The first component of the path, if not absolute, + # must be resolved too. - try: - old_path = abspath('.') - os.mkdir(ABSTFN) - os.mkdir(ABSTFN + "/k") - os.symlink(ABSTFN, ABSTFN + "link") - os.chdir(dirname(ABSTFN)) + try: + old_path = abspath('.') + os.mkdir(ABSTFN) + os.mkdir(ABSTFN + "/k") + os.symlink(ABSTFN, ABSTFN + "link") + os.chdir(dirname(ABSTFN)) - base = basename(ABSTFN) - self.assertEqual(realpath(base + "link"), ABSTFN) - self.assertEqual(realpath(base + "link/k"), ABSTFN + "/k") - finally: - os.chdir(old_path) - support.unlink(ABSTFN + "link") - safe_rmdir(ABSTFN + "/k") - safe_rmdir(ABSTFN) + base = basename(ABSTFN) + self.assertEqual(realpath(base + "link"), ABSTFN) + self.assertEqual(realpath(base + "link/k"), ABSTFN + "/k") + finally: + os.chdir(old_path) + support.unlink(ABSTFN + "link") + safe_rmdir(ABSTFN + "/k") + safe_rmdir(ABSTFN) def test_relpath(self): (real_getcwd, os.getcwd) = (os.getcwd, lambda: r"/home/user/bar") Index: Modules/posixmodule.c =================================================================== --- Modules/posixmodule.c (revision 79899) +++ Modules/posixmodule.c (working copy) @@ -601,7 +601,7 @@ #ifdef MS_WINDOWS static PyObject * -win32_error(char* function, char* filename) +win32_error(char* function, const char* filename) { /* XXX We should pass the function name along in the future. (winreg.c also wants to pass the function name.) @@ -1007,12 +1007,31 @@ return TRUE; } +/* +About the following functions: win32_lstat, win32_lstat_w, + win32_stat, win32_stat_w + + In Posix, stat automatically traverses symlinks and returns + the stat structure for the target. In Windows, the equivalent + GetFileAttributes by default does not traverse symlinks and + instead returns attributes for the symlink. + + Therefore, win32_lstat will get the attributes traditionally, + and win32_stat will first explicitly resolve the symlink target + and then will call win32_lstat on that result. + + The _w represent Unicode equivalents of the aformentioned ANSI + functions. +*/ + static int -win32_stat(const char* path, struct win32_stat *result) +win32_lstat(const char* path, struct win32_stat *result) { WIN32_FILE_ATTRIBUTE_DATA info; int code; char *dot; + WIN32_FIND_DATAA find_data; + HANDLE find_data_handle; if (!GetFileAttributesExA(path, GetFileExInfoStandard, &info)) { if (GetLastError() != ERROR_SHARING_VIOLATION) { /* Protocol violation: we explicitly clear errno, instead of @@ -1032,6 +1051,25 @@ code = attribute_data_to_stat(&info, result); if (code != 0) return code; + + /* Get WIN32_FIND_DATA structure for the path to determine if + it is a symlink */ + if(info.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) + { + find_data_handle = FindFirstFileA(path, &find_data); + if(find_data_handle != INVALID_HANDLE_VALUE) + { + if(find_data.dwReserved0 == IO_REPARSE_TAG_SYMLINK) + { + /* first clear the S_IFMT bits */ + result->st_mode ^= (result->st_mode & 0170000); + /* now set the bits that make this a symlink */ + result->st_mode |= 0120000; + } + FindClose(find_data_handle); + } + } + /* Set S_IFEXEC if it is an .exe, .bat, ... */ dot = strrchr(path, '.'); if (dot) { @@ -1045,11 +1083,13 @@ } static int -win32_wstat(const wchar_t* path, struct win32_stat *result) +win32_lstat_w(const wchar_t* path, struct win32_stat *result) { int code; const wchar_t *dot; WIN32_FILE_ATTRIBUTE_DATA info; + WIN32_FIND_DATAW find_data; + HANDLE find_data_handle; if (!GetFileAttributesExW(path, GetFileExInfoStandard, &info)) { if (GetLastError() != ERROR_SHARING_VIOLATION) { /* Protocol violation: we explicitly clear errno, instead of @@ -1069,6 +1109,25 @@ code = attribute_data_to_stat(&info, result); if (code < 0) return code; + + /* Get WIN32_FIND_DATA structure for the path to determine if + it is a symlink */ + if(info.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) + { + find_data_handle = FindFirstFileW(path, &find_data); + if(find_data_handle != INVALID_HANDLE_VALUE) + { + if(find_data.dwReserved0 == IO_REPARSE_TAG_SYMLINK) + { + /* first clear the S_IFMT bits */ + result->st_mode ^= (result->st_mode & 0170000); + /* now set the bits that make this a symlink */ + result->st_mode |= 0120000; + } + FindClose(find_data_handle); + } + } + /* Set IFEXEC if it is an .exe, .bat, ... */ dot = wcsrchr(path, '.'); if (dot) { @@ -1081,7 +1140,167 @@ return code; } +/* Grab GetFinalPathNameByHandle dynamically from kernel32 */ +static int has_GetFinalPathNameByHandle = 0; +static DWORD (CALLBACK *Py_GetFinalPathNameByHandleA)(HANDLE, LPSTR, DWORD, DWORD); +static DWORD (CALLBACK *Py_GetFinalPathNameByHandleW)(HANDLE, LPWSTR, DWORD, DWORD); static int +check_GetFinalPathNameByHandle() +{ + HINSTANCE hKernel32; + /* only recheck */ + if (!has_GetFinalPathNameByHandle) + { + hKernel32 = GetModuleHandle("KERNEL32"); + *(FARPROC*)&Py_GetFinalPathNameByHandleA = GetProcAddress(hKernel32, "GetFinalPathNameByHandleA"); + *(FARPROC*)&Py_GetFinalPathNameByHandleW = GetProcAddress(hKernel32, "GetFinalPathNameByHandleW"); + has_GetFinalPathNameByHandle = Py_GetFinalPathNameByHandleA && Py_GetFinalPathNameByHandleW; + } + return has_GetFinalPathNameByHandle; +} + +static int +win32_stat(const char* path, struct win32_stat *result) +{ + /* Traverse the symlink to the target using + GetFinalPathNameByHandle() + */ + int code; + HANDLE hFile; + int buf_size; + char *target_path; + int result_length; + WIN32_FILE_ATTRIBUTE_DATA info; + + if(!check_GetFinalPathNameByHandle()) + { + /* if the OS doesn't have GetFinalPathNameByHandle, it doesn't + have symlinks, so just fall back to the traditional behavior + found in lstat. */ + return win32_lstat(path, result); + } + + hFile = CreateFileA( + path, + 0, /* desired access */ + 0, /* share mode */ + NULL, /* security attributes */ + OPEN_EXISTING, + /* FILE_FLAG_BACKUP_SEMANTICS is required to open a directory */ + FILE_FLAG_BACKUP_SEMANTICS, + NULL); + + if(hFile == INVALID_HANDLE_VALUE) + { + /* Either the target doesn't exist, or we don't have access to + * get a handle to it. If the former, we need to return an error. + * If the latter, we can use attributes_from_dir. + */ + if (GetLastError() != ERROR_SHARING_VIOLATION) { + /* Protocol violation: we explicitly clear errno, instead of + setting it to a POSIX error. Callers should use GetLastError. */ + errno = 0; + return -1; + } else { + /* Could not get attributes on open file. Fall back to + reading the directory. */ + if (!attributes_from_dir(path, &info)) { + /* Very strange. This should not fail now */ + errno = 0; + return -1; + } + } + code = attribute_data_to_stat(&info, result); + } + + buf_size = Py_GetFinalPathNameByHandleA(hFile, 0, 0, VOLUME_NAME_DOS); + if(!buf_size) return -1; + target_path = (char *)malloc((buf_size+1)*sizeof(char)); + result_length = Py_GetFinalPathNameByHandleA(hFile, target_path, buf_size, VOLUME_NAME_DOS); + + if(!result_length) return -1; + if(!CloseHandle(hFile)) return -1; + target_path[result_length] = 0; + code = win32_lstat(target_path, result); + free(target_path); + + return code; +} + +static int +win32_stat_w(const wchar_t* path, struct win32_stat *result) +{ + /* Traverse the symlink to the target using + GetFinalPathNameByHandle() + */ + int code; + HANDLE hFile; + int buf_size; + wchar_t *target_path; + int result_length; + WIN32_FILE_ATTRIBUTE_DATA info; + + if(!check_GetFinalPathNameByHandle()) + { + /* if the OS doesn't have GetFinalPathNameByHandle, it doesn't + have symlinks, so just fall back to the traditional behavior + found in lstat. */ + return win32_lstat_w(path, result); + } + + hFile = CreateFileW( + path, + 0, /* desired access */ + 0, /* share mode */ + NULL, /* security attributes */ + OPEN_EXISTING, + /* FILE_FLAG_BACKUP_SEMANTICS is required to open a directory */ + FILE_ATTRIBUTE_NORMAL|FILE_FLAG_BACKUP_SEMANTICS, + NULL); + + if(hFile == INVALID_HANDLE_VALUE) + { + /* Either the target doesn't exist, or we don't have access to + * get a handle to it. If the former, we need to return an error. + * If the latter, we can use attributes_from_dir. + */ + if (GetLastError() != ERROR_SHARING_VIOLATION) { + /* Protocol violation: we explicitly clear errno, instead of + setting it to a POSIX error. Callers should use GetLastError. */ + errno = 0; + return -1; + } else { + /* Could not get attributes on open file. Fall back to + reading the directory. */ + if (!attributes_from_dir_w(path, &info)) { + /* Very strange. This should not fail now */ + errno = 0; + return -1; + } + } + code = attribute_data_to_stat(&info, result); + } + else /* hFile != INVALID_HANDLE_VALUE */ + { + /* We have a good handle to the target, use it to determine the + * target path name (then we'll call lstat on it). + */ + buf_size = Py_GetFinalPathNameByHandleW(hFile, 0, 0, VOLUME_NAME_DOS); + if(!buf_size) return -1; + target_path = (wchar_t *)malloc((buf_size+1)*sizeof(wchar_t)); + result_length = Py_GetFinalPathNameByHandleW(hFile, target_path, buf_size, VOLUME_NAME_DOS); + + if(!result_length) return -1; + if(!CloseHandle(hFile)) return -1; + target_path[result_length] = 0; + code = win32_lstat_w(target_path, result); + free(target_path); + } + + return code; +} + +static int win32_fstat(int file_number, struct win32_stat *result) { BY_HANDLE_FILE_INFORMATION info; @@ -2488,6 +2707,64 @@ } return PyBytes_FromString(outbuf); } /* end of posix__getfullpathname */ + +/* A helper function for samepath on windows */ +static PyObject * +posix__getfinalpathname(PyObject *self, PyObject *args) +{ + HANDLE hFile; + int buf_size; + wchar_t *target_path; + int result_length; + PyObject *result; + wchar_t *path; + + if (!PyArg_ParseTuple(args, "u|:_getfullpathname", &path)) { + return NULL; + } + + if(!check_GetFinalPathNameByHandle()) + { + /* if the OS doesn't have GetFinalPathNameByHandle, return a + NotImplementedError. */ + return PyErr_Format(PyExc_NotImplementedError, + "GetFinalPathNameByHandle not available on this platform"); + } + + hFile = CreateFileW( + path, + 0, /* desired access */ + 0, /* share mode */ + NULL, /* security attributes */ + OPEN_EXISTING, + /* FILE_FLAG_BACKUP_SEMANTICS is required to open a directory */ + FILE_FLAG_BACKUP_SEMANTICS, + NULL); + + if(hFile == INVALID_HANDLE_VALUE) + { + return win32_error_unicode("GetFinalPathNamyByHandle", path); + return PyErr_Format(PyExc_RuntimeError, "Could not get a handle to file."); + } + + /* + * We have a good handle to the target, use it to determine the + * target path name. + */ + buf_size = Py_GetFinalPathNameByHandleW(hFile, 0, 0, VOLUME_NAME_NT); + if(!buf_size) return win32_error_unicode("GetFinalPathNameByHandle", path); + target_path = (wchar_t *)malloc((buf_size+1)*sizeof(wchar_t)); + if(!target_path) return PyErr_NoMemory(); + result_length = Py_GetFinalPathNameByHandleW(hFile, target_path, buf_size, VOLUME_NAME_DOS); + + if(!result_length) return win32_error_unicode("GetFinalPathNamyByHandle", path); + if(!CloseHandle(hFile)) return win32_error_unicode("GetFinalPathNameByHandle", path); + target_path[result_length] = 0; + result = PyUnicode_FromUnicode(target_path, result_length); + free(target_path); + return result; + +} /* end of posix__getfinalpathname */ #endif /* MS_WINDOWS */ PyDoc_STRVAR(posix_mkdir__doc__, @@ -2668,7 +2945,7 @@ posix_stat(PyObject *self, PyObject *args) { #ifdef MS_WINDOWS - return posix_do_stat(self, args, "O&:stat", STAT, "U:stat", win32_wstat); + return posix_do_stat(self, args, "O&:stat", STAT, "U:stat", win32_stat_w); #else return posix_do_stat(self, args, "O&:stat", STAT, NULL, NULL); #endif @@ -2721,7 +2998,44 @@ return PyLong_FromLong((long)i); } +#ifdef MS_WINDOWS +/* override the default DeleteFileW behavior so that directory +symlinks can be removed with this function, the same as with +Unix symlinks */ +BOOL WINAPI Py_DeleteFileW(LPCWSTR lpFileName) +{ + WIN32_FILE_ATTRIBUTE_DATA info; + WIN32_FIND_DATAW find_data; + HANDLE find_data_handle; + int is_directory = 0; + int is_link = 0; + + if (GetFileAttributesExW(lpFileName, GetFileExInfoStandard, &info)) { + is_directory = info.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY; + + /* Get WIN32_FIND_DATA structure for the path to determine if + it is a symlink */ + if(is_directory && + info.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) + { + find_data_handle = FindFirstFileW(lpFileName, &find_data); + if(find_data_handle != INVALID_HANDLE_VALUE) + { + is_link = find_data.dwReserved0 == IO_REPARSE_TAG_SYMLINK; + FindClose(find_data_handle); + } + } + } + + if (is_directory && is_link) + { + return RemoveDirectoryW(lpFileName); + } + return DeleteFileW(lpFileName); +} +#endif /* MS_WINDOWS */ + PyDoc_STRVAR(posix_unlink__doc__, "unlink(path)\n\n\ Remove a file (same as remove(path))."); @@ -2734,7 +3048,7 @@ posix_unlink(PyObject *self, PyObject *args) { #ifdef MS_WINDOWS - return win32_1str(args, "remove", "y:remove", DeleteFileA, "U:remove", DeleteFileW); + return win32_1str(args, "remove", "y:remove", DeleteFileA, "U:remove", Py_DeleteFileW); #else return posix_1str(args, "O&:remove", unlink); #endif @@ -4614,7 +4928,7 @@ return posix_do_stat(self, args, "O&:lstat", lstat, NULL, NULL); #else /* !HAVE_LSTAT */ #ifdef MS_WINDOWS - return posix_do_stat(self, args, "O&:lstat", STAT, "U:lstat", win32_wstat); + return posix_do_stat(self, args, "O&:lstat", STAT, "U:lstat", win32_lstat_w); #else return posix_do_stat(self, args, "O&:lstat", STAT, NULL, NULL); #endif @@ -4691,7 +5005,193 @@ } #endif /* HAVE_SYMLINK */ +#if !defined(HAVE_READLINK) && defined(MS_WINDOWS) +PyDoc_STRVAR(win_readlink__doc__, +"readlink(path) -> path\n\n\ +Return a string representing the path to which the symbolic link points."); + +/* The following structure was copied from http://msdn.microsoft.com/en-us/library/ms791514.aspx + * as the required include doesn't seem to be present in the Windows SDK (at least as included + * with Visual Studio Express). + * #include "ntifs.h" + */ +typedef struct _REPARSE_DATA_BUFFER { + ULONG ReparseTag; + USHORT ReparseDataLength; + USHORT Reserved; + union { + struct { + USHORT SubstituteNameOffset; + USHORT SubstituteNameLength; + USHORT PrintNameOffset; + USHORT PrintNameLength; + ULONG Flags; + WCHAR PathBuffer[1]; + } SymbolicLinkReparseBuffer; + struct { + USHORT SubstituteNameOffset; + USHORT SubstituteNameLength; + USHORT PrintNameOffset; + USHORT PrintNameLength; + WCHAR PathBuffer[1]; + } MountPointReparseBuffer; + struct { + UCHAR DataBuffer[1]; + } GenericReparseBuffer; + }; +} REPARSE_DATA_BUFFER, *PREPARSE_DATA_BUFFER; + +#define REPARSE_DATA_BUFFER_HEADER_SIZE FIELD_OFFSET(REPARSE_DATA_BUFFER, GenericReparseBuffer) + +#define MAXIMUM_REPARSE_DATA_BUFFER_SIZE ( 16 * 1024 ) + +/* Windows readlink implementation */ +static PyObject * +win_readlink(PyObject *self, PyObject *args) +{ + wchar_t *path; + DWORD n_bytes_returned; + DWORD io_result; + PyObject *result; + HANDLE reparse_point_handle; + + char target_buffer[MAXIMUM_REPARSE_DATA_BUFFER_SIZE]; + REPARSE_DATA_BUFFER *rdb = (REPARSE_DATA_BUFFER *)target_buffer; + wchar_t *print_name; + + if (!PyArg_ParseTuple(args, + "u:readlink", + &path)) + return NULL; + + /* First get a handle to the reparse point */ + Py_BEGIN_ALLOW_THREADS + reparse_point_handle = CreateFileW( + path, + 0, + 0, + 0, + OPEN_EXISTING, + FILE_FLAG_OPEN_REPARSE_POINT|FILE_FLAG_BACKUP_SEMANTICS, + 0); + Py_END_ALLOW_THREADS + + if (reparse_point_handle==INVALID_HANDLE_VALUE) + { + return win32_error_unicode("readlink", path); + } + + Py_BEGIN_ALLOW_THREADS + /* New call DeviceIoControl to read the reparse point */ + io_result = DeviceIoControl( + reparse_point_handle, + FSCTL_GET_REPARSE_POINT, + 0, 0, /* in buffer */ + target_buffer, sizeof(target_buffer), + &n_bytes_returned, + 0 /* we're not using OVERLAPPED_IO */ + ); + CloseHandle(reparse_point_handle); + Py_END_ALLOW_THREADS + + if (io_result==0) + { + return win32_error_unicode("readlink", path); + } + + if (rdb->ReparseTag != IO_REPARSE_TAG_SYMLINK) + { + PyErr_SetString(PyExc_ValueError, + "not a symbolic link"); + return NULL; + } + print_name = rdb->SymbolicLinkReparseBuffer.PathBuffer + rdb->SymbolicLinkReparseBuffer.PrintNameOffset; + result = PyUnicode_FromWideChar(print_name, rdb->SymbolicLinkReparseBuffer.PrintNameLength/2); + return result; +} + +#endif /* !defined(HAVE_READLINK) && defined(MS_WINDOWS) */ + +#if !defined(HAVE_SYMLINK) && defined(MS_WINDOWS) + +/* Grab CreateSymbolicLinkW dynamically from kernel32 */ +static int has_CreateSymbolicLinkW = 0; +static DWORD (CALLBACK *Py_CreateSymbolicLinkW)(LPWSTR, LPWSTR, DWORD); +static int +check_CreateSymbolicLinkW() +{ + HINSTANCE hKernel32; + /* only recheck */ + if (has_CreateSymbolicLinkW) + return has_CreateSymbolicLinkW; + hKernel32 = GetModuleHandle("KERNEL32"); + *(FARPROC*)&Py_CreateSymbolicLinkW = GetProcAddress(hKernel32, "CreateSymbolicLinkW"); + if (Py_CreateSymbolicLinkW) + has_CreateSymbolicLinkW = 1; + return has_CreateSymbolicLinkW; +} + +PyDoc_STRVAR(win_symlink__doc__, +"symlink(src, dst, target_is_directory=False)\n\n\ +Create a symbolic link pointing to src named dst.\n\ +target_is_directory is required if the target is to be interpreted as\n\ +a directory.\n\ +This function requires Windows 6.0 or greater, and raises a\n\ +NotImplementedError otherwise."); + +static PyObject * +win_symlink(PyObject *self, PyObject *args, PyObject *kwargs) +{ + static char *kwlist[] = {"src", "dest", "target_is_directory", NULL}; + PyObject *src, *dest; + int target_is_directory = 0; + DWORD res; + WIN32_FILE_ATTRIBUTE_DATA src_info; + + if (!check_CreateSymbolicLinkW()) + { + /* raise NotImplementedError */ + return PyErr_Format(PyExc_NotImplementedError, + "CreateSymbolicLinkW not found"); + } + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|i:symlink", + kwlist, &src, &dest, &target_is_directory)) + return NULL; + if (!convert_to_unicode(&src)) { return NULL; } + if (!convert_to_unicode(&dest)) { + Py_DECREF(src); + return NULL; + } + + /* if src is a directory, ensure target_is_directory==1 */ + if( + GetFileAttributesExW( + PyUnicode_AsUnicode(src), GetFileExInfoStandard, &src_info + )) + { + target_is_directory = target_is_directory || + (src_info.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY); + } + + Py_BEGIN_ALLOW_THREADS + res = Py_CreateSymbolicLinkW( + PyUnicode_AsUnicode(dest), + PyUnicode_AsUnicode(src), + target_is_directory); + Py_END_ALLOW_THREADS + Py_DECREF(src); + Py_DECREF(dest); + if (!res) + { + return win32_error_unicode("symlink", PyUnicode_AsUnicode(src)); + } + + Py_INCREF(Py_None); + return Py_None; +} +#endif /* !defined(HAVE_SYMLINK) && defined(MS_WINDOWS) */ + #ifdef HAVE_TIMES #if defined(PYCC_VACPP) && defined(PYOS_OS2) static long @@ -7119,6 +7619,9 @@ #ifdef HAVE_READLINK {"readlink", posix_readlink, METH_VARARGS, posix_readlink__doc__}, #endif /* HAVE_READLINK */ +#if !defined(HAVE_READLINK) && defined(MS_WINDOWS) + {"readlink", win_readlink, METH_VARARGS, win_readlink__doc__}, +#endif /* !defined(HAVE_READLINK) && defined(MS_WINDOWS) */ {"rename", posix_rename, METH_VARARGS, posix_rename__doc__}, {"rmdir", posix_rmdir, METH_VARARGS, posix_rmdir__doc__}, {"stat", posix_stat, METH_VARARGS, posix_stat__doc__}, @@ -7126,6 +7629,9 @@ #ifdef HAVE_SYMLINK {"symlink", posix_symlink, METH_VARARGS, posix_symlink__doc__}, #endif /* HAVE_SYMLINK */ +#if !defined(HAVE_SYMLINK) && defined(MS_WINDOWS) + {"symlink", (PyCFunction)win_symlink, METH_VARARGS | METH_KEYWORDS, win_symlink__doc__}, +#endif /* !defined(HAVE_SYMLINK) && defined(MS_WINDOWS) */ #ifdef HAVE_SYSTEM {"system", posix_system, METH_VARARGS, posix_system__doc__}, #endif @@ -7349,6 +7855,7 @@ {"abort", posix_abort, METH_NOARGS, posix_abort__doc__}, #ifdef MS_WINDOWS {"_getfullpathname", posix__getfullpathname, METH_VARARGS, NULL}, + {"_getfinalpathname", posix__getfinalpathname, METH_VARARGS, NULL}, #endif #ifdef HAVE_GETLOADAVG {"getloadavg", posix_getloadavg, METH_NOARGS, posix_getloadavg__doc__},