[PATCH 4/7] rtemstoolkit/path: Merge RSB changes.
chrisj at rtems.org
chrisj at rtems.org
Sun Jun 9 00:24:59 UTC 2019
From: Chris Johns <chrisj at rtems.org>
---
rtemstoolkit/path.py | 294 ++++++++++++++++++++++++++++++++++---------
1 file changed, 233 insertions(+), 61 deletions(-)
diff --git a/rtemstoolkit/path.py b/rtemstoolkit/path.py
index 845dfe8..9401e99 100644
--- a/rtemstoolkit/path.py
+++ b/rtemstoolkit/path.py
@@ -40,37 +40,60 @@ import glob
import os
import shutil
import string
+import sys
from rtemstoolkit import error
from rtemstoolkit import log
-
+windows_posix = sys.platform == 'msys'
windows = os.name == 'nt'
+win_maxpath = 254
+
def host(path):
if path is not None:
while '//' in path:
path = path.replace('//', '/')
- if windows and len(path) > 2:
- if path[0] == '/' and path[2] == '/' and \
- (path[1] in string.ascii_lowercase or \
- path[1] in string.ascii_uppercase):
- path = ('%s:%s' % (path[1], path[2:])).replace('/', '\\')
+ if windows:
+ if len(path) > 2 and \
+ path[0] == '/' and path[2] == '/' and \
+ (path[1] in string.ascii_lowercase or \
+ path[1] in string.ascii_uppercase):
+ path = '%s:%s' % (path[1], path[2:])
+ path = path.replace('/', '\\')
+ if len(path) > win_maxpath:
+ if path.startswith('\\\\?\\'):
+ path = path[4:]
+ path = u'\\'.join([u'\\\\?', path])
return path
def shell(path):
+ if isinstance(path, bytes):
+ path = path.decode('ascii')
if path is not None:
- if windows and len(path) > 1 and path[1] == ':':
- path = ('/%s%s' % (path[0], path[2:])).replace('\\', '/')
+ if windows or windows_posix:
+ path = path.encode('ascii', 'ignore').decode('ascii')
+ if path.startswith('\\\\?\\'):
+ path = path[4:]
+ if len(path) > 1 and path[1] == ':':
+ path = '/%s%s' % (path[0].lower(), path[2:])
+ path = path.replace('\\', '/')
while '//' in path:
path = path.replace('//', '/')
return path
def basename(path):
+ path = shell(path)
return shell(os.path.basename(path))
def dirname(path):
+ path = shell(path)
return shell(os.path.dirname(path))
+def is_abspath(path):
+ if path is not None and len(path) > 0:
+ return '/' == path[0]
+ return False
+
def join(path, *args):
path = shell(path)
for arg in args:
@@ -81,36 +104,91 @@ def join(path, *args):
return shell(path)
def abspath(path):
+ path = shell(path)
return shell(os.path.abspath(host(path)))
+def relpath(path, start = None):
+ path = shell(path)
+ if start is None:
+ path = os.path.relpath(host(path))
+ else:
+ path = os.path.relpath(host(path), start)
+ return shell(path)
+
def splitext(path):
+ path = shell(path)
root, ext = os.path.splitext(host(path))
return shell(root), ext
+def listdir(path, error = True):
+ path = host(path)
+ files = []
+ if not os.path.exists(path):
+ if error:
+ raise error.general('path does not exist : %s' % (path))
+ elif not isdir(path):
+ if error:
+ raise error.general('path is not a directory: %s' % (path))
+ else:
+ if windows:
+ try:
+ files = os.listdir(host(path))
+ except IOError:
+ raise error.general('Could not list files: %s' % (path))
+ except OSError as e:
+ raise error.general('Could not list files: %s: %s' % (path, str(e)))
+ except WindowsError as e:
+ raise error.general('Could not list files: %s: %s' % (path, str(e)))
+ else:
+ try:
+ files = os.listdir(host(path))
+ except IOError:
+ raise error.general('Could not list files: %s' % (path))
+ except OSError as e:
+ raise error.general('Could not list files: %s: %s' % (path, str(e)))
+ return files
+
def exists(paths):
+ def _exists(p):
+ if not is_abspath(p):
+ p = shell(join(os.getcwd(), host(p)))
+ return basename(p) in ['.'] + listdir(dirname(p), error = False)
+
if type(paths) == list:
results = []
for p in paths:
- results += [os.path.exists(host(p))]
+ results += [_exists(shell(p))]
return results
- return os.path.exists(host(paths))
+ return _exists(shell(paths))
def isdir(path):
+ path = shell(path)
return os.path.isdir(host(path))
def isfile(path):
+ path = shell(path)
return os.path.isfile(host(path))
def isabspath(path):
+ path = shell(path)
return path[0] == '/'
+def isreadable(path):
+ path = shell(path)
+ return os.access(host(path), os.R_OK)
+
def iswritable(path):
+ path = shell(path)
return os.access(host(path), os.W_OK)
+def isreadwritable(path):
+ path = shell(path)
+ return isreadable(path) and iswritable(path)
+
def ispathwritable(path):
path = host(path)
while len(path) != 0:
- if os.path.exists(path):
+ if exists(path):
return iswritable(path)
path = os.path.dirname(path)
return False
@@ -138,18 +216,55 @@ def mkdir(path):
except OSError as e:
raise error.general('cannot make directory: %s: %s' % (path, str(e)))
+def chdir(path):
+ path = shell(path)
+ os.chdir(host(path))
+
def removeall(path):
+ #
+ # Perform the removal of the directory tree manually so we can
+ # make sure on Windows the files are correctly encoded to avoid
+ # the file name size limit. On Windows the os.walk fails once we
+ # get to the max path length.
+ #
+ def _isdir(path):
+ hpath = host(path)
+ return os.path.isdir(hpath) and not os.path.islink(hpath)
+
+ def _remove_node(path):
+ hpath = host(path)
+ if not os.path.islink(hpath) and not os.access(hpath, os.W_OK):
+ os.chmod(hpath, stat.S_IWUSR)
+ if _isdir(path):
+ os.rmdir(hpath)
+ else:
+ os.unlink(hpath)
+
+ def _remove(path):
+ dirs = []
+ for name in listdir(path):
+ path_ = join(path, name)
+ hname = host(path_)
+ if _isdir(path_):
+ dirs += [name]
+ else:
+ _remove_node(path_)
+ for name in dirs:
+ dir = join(path, name)
+ _remove(dir)
+ _remove_node(dir)
- def _onerror(function, path, excinfo):
- print('removeall error: (%s) %s' % (excinfo, path))
+ path = shell(path)
+ hpath = host(path)
- path = host(path)
- shutil.rmtree(path, onerror = _onerror)
+ if os.path.exists(hpath):
+ _remove(path)
+ _remove_node(path)
def expand(name, paths):
l = []
for p in paths:
- l += [join(p, name)]
+ l += [join(shell(p), name)]
return l
def expanduser(path):
@@ -157,33 +272,6 @@ def expanduser(path):
path = os.path.expanduser(path)
return shell(path)
-def listdir(path):
- path = host(path)
- files = []
- if not exists(path):
- raise error.general('path does not exist : %s' % (path))
- elif not isdir(path):
- raise error.general('path is not a directory: %s' % (path))
- else:
- if windows:
- try:
- files = os.listdir(host(path))
- except IOError:
- raise error.general('Could not list files: %s' % (path))
- except OSError as e:
- raise error.general('Could not list files: %s: %s' % (path, str(e)))
- except WindowsError as e:
- raise error.general('Could not list files: %s: %s' % (path, str(e)))
- else:
- try:
- files = os.listdir(host(path))
- except IOError:
- raise error.general('Could not list files: %s' % (path))
- except OSError as e:
- raise error.general('Could not list files: %s: %s' % (path, str(e)))
-
- return files
-
def collect_files(path_):
#
# Convert to shell paths and return shell paths.
@@ -206,30 +294,59 @@ def collect_files(path_):
files = [host(path_)]
return sorted(files)
+def copy(src, dst):
+ src = shell(src)
+ dst = shell(dst)
+ hsrc = host(src)
+ hdst = host(dst)
+ try:
+ shutil.copy(hsrc, hdst)
+ except OSError as why:
+ if windows:
+ if WindowsError is not None and isinstance(why, WindowsError):
+ pass
+ else:
+ raise error.general('copying tree (1): %s -> %s: %s' % (hsrc, hdst, str(why)))
+
def copy_tree(src, dst):
+ trace = False
+
hsrc = host(src)
hdst = host(dst)
- if os.path.exists(src) and os.path.isdir(src):
+ if exists(src):
names = listdir(src)
else:
- names = [basename(src)]
- src = dirname(src)
-
- if not os.path.isdir(dst):
- os.makedirs(dst)
+ names = []
+
+ if trace:
+ print('path.copy_tree:')
+ print(' src: "%s"' % (src))
+ print(' hsrc: "%s"' % (hsrc))
+ print(' dst: "%s"' % (dst))
+ print(' hdst: "%s"' % (hdst))
+ print(' names: %r' % (names))
+
+ if not os.path.isdir(hdst):
+ if trace:
+ print(' mkdir: %s' % (hdst))
+ try:
+ os.makedirs(hdst)
+ except OSError as why:
+ raise error.general('copying tree: cannot create target directory %s: %s' % \
+ (hdst, str(why)))
for name in names:
- srcname = os.path.join(src, name)
- dstname = os.path.join(dst, name)
+ srcname = host(os.path.join(hsrc, name))
+ dstname = host(os.path.join(hdst, name))
try:
if os.path.islink(srcname):
linkto = os.readlink(srcname)
- if os.path.exists(dstname):
+ if exists(shell(dstname)):
if os.path.islink(dstname):
dstlinkto = os.readlink(dstname)
if linkto != dstlinkto:
- log.warning('copying tree: update of link does not match: %s -> %s' % \
+ log.warning('copying tree: link does not match: %s -> %s' % \
(dstname, dstlinkto))
os.remove(dstname)
else:
@@ -241,20 +358,74 @@ def copy_tree(src, dst):
elif os.path.isdir(srcname):
copy_tree(srcname, dstname)
else:
- shutil.copy2(srcname, dstname)
+ shutil.copyfile(host(srcname), host(dstname))
+ shutil.copystat(host(srcname), host(dstname))
except shutil.Error as err:
- raise error.general('copying tree: %s -> %s: %s' % (src, dst, str(err)))
+ raise error.general('copying tree (2): %s -> %s: %s' % \
+ (hsrc, hdst, str(err)))
except EnvironmentError as why:
- raise error.general('copying tree: %s -> %s: %s' % (srcname, dstname, str(why)))
+ raise error.general('copying tree (3): %s -> %s: %s' % \
+ (srcname, dstname, str(why)))
try:
- shutil.copystat(src, dst)
+ shutil.copystat(hsrc, hdst)
except OSError as why:
- ok = False
if windows:
if WindowsError is not None and isinstance(why, WindowsError):
- ok = True
- if not ok:
- raise error.general('copying tree: %s -> %s: %s' % (src, dst, str(why)))
+ pass
+ else:
+ raise error.general('copying tree (4): %s -> %s: %s' % (hsrc, hdst, str(why)))
+
+def get_size(path, depth = -1):
+ #
+ # Get the size of the directory tree manually to the required depth.
+ # This makes sure on Windows the files are correctly encoded to avoid
+ # the file name size limit. On Windows the os.walk fails once we
+ # get to the max path length.
+ #
+ def _isdir(path):
+ hpath = host(path)
+ return os.path.isdir(hpath) and not os.path.islink(hpath)
+
+ def _node_size(path):
+ hpath = host(path)
+ size = 0
+ if not os.path.islink(hpath):
+ size = os.path.getsize(hpath)
+ return size
+
+ def _get_size(path, depth, level = 0):
+ level += 1
+ dirs = []
+ size = 0
+ for name in listdir(path):
+ path_ = join(path, shell(name))
+ hname = host(path_)
+ if _isdir(path_):
+ dirs += [shell(name)]
+ else:
+ size += _node_size(path_)
+ if depth < 0 or level < depth:
+ for name in dirs:
+ dir = join(path, name)
+ size += _get_size(dir, depth, level)
+ return size
+
+ path = shell(path)
+ hpath = host(path)
+ size = 0
+
+ if os.path.exists(hpath):
+ size = _get_size(path, depth)
+
+ return size
+
+def get_humanize_size(path, depth = -1):
+ size = get_size(path, depth)
+ for unit in ['','K','M','G','T','P','E','Z']:
+ if abs(size) < 1024.0:
+ return "%5.3f%sB" % (size, unit)
+ size /= 1024.0
+ return "%.3f%sB" % (size, 'Y')
if __name__ == '__main__':
print(host('/a/b/c/d-e-f'))
@@ -271,3 +442,4 @@ if __name__ == '__main__':
print(basename('x:/sd/df/fg/me.txt'))
print(dirname('x:/sd/df/fg/me.txt'))
print(join('s:/d/', '/g', '/tyty/fgfg'))
+ print(join('s:/d/e\\f/g', '/h', '/tyty/zxzx', '\\mm\\nn/p'))
--
2.20.1 (Apple Git-117)
More information about the devel mailing list