import collections
import posixpath
import tarfile
import errno
import io
import stat
from pathlib_abc import PathBase, UnsupportedOperation
# Space-separated field names for TarStatResult: the usual os.stat_result
# fields followed by the tar-only owner/group *names*.
_tar_stat_fields = ' '.join((
    'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid',
    'st_size', 'st_atime', 'st_mtime', 'st_ctime', 'st_uname', 'st_gname',
))


class TarStatResult(collections.namedtuple('_TarStatResult', _tar_stat_fields)):
    """Stat record for a tar archive member, shaped like os.stat_result.

    Instances are what TarPath.stat() hands back. In addition to the
    familiar ``st_*`` fields the tuple carries ``st_uname`` and
    ``st_gname``, the owner and group names stored in the archive.
    """

    __slots__ = ()

    @classmethod
    def from_tarinfo(cls, tarobj, tarinfo):
        """Build a stat record for a member that is present in the archive.

        ``tarobj`` is the TarFile the member belongs to (its ``id()`` is used
        as ``st_dev``); ``tarinfo`` is the member's TarInfo. The member's
        data offset serves as ``st_ino``. ``st_nlink``, ``st_atime`` and
        ``st_ctime`` are always reported as 0.

        Raises ValueError (carrying the offending type) when the member
        type has no file-format equivalent here.
        """
        member_type = tarinfo.type
        # Each row pairs a group of tar member types with the stat file
        # format bits they map to. Hard links share the symlink format.
        format_table = (
            (tarfile.REGULAR_TYPES, stat.S_IFREG),
            ((tarfile.DIRTYPE,), stat.S_IFDIR),
            ((tarfile.SYMTYPE, tarfile.LNKTYPE), stat.S_IFLNK),
            ((tarfile.FIFOTYPE,), stat.S_IFIFO),
            ((tarfile.CHRTYPE,), stat.S_IFCHR),
            ((tarfile.BLKTYPE,), stat.S_IFBLK),
        )
        for type_group, file_format in format_table:
            if member_type in type_group:
                break
        else:
            raise ValueError(member_type)

        return cls(
            tarinfo.mode | file_format,  # st_mode
            tarinfo.offset_data,         # st_ino
            id(tarobj),                  # st_dev
            0,                           # st_nlink
            tarinfo.uid,                 # st_uid
            tarinfo.gid,                 # st_gid
            tarinfo.size,                # st_size
            0,                           # st_atime
            tarinfo.mtime,               # st_mtime
            0,                           # st_ctime
            tarinfo.uname,               # st_uname
            tarinfo.gname,               # st_gname
        )

    @classmethod
    def implied_directory(cls, tarobj, path):
        """Build a stat record for a directory that has no member of its own.

        Such a directory exists only because some other member's path runs
        through it. The record is a bare directory: ``hash(path)`` stands in
        for the inode, every numeric field is 0 and both names are None.
        """
        return cls(
            st_mode=stat.S_IFDIR,
            st_ino=hash(path),
            st_dev=id(tarobj),
            st_nlink=0,
            st_uid=0,
            st_gid=0,
            st_size=0,
            st_atime=0,
            st_mtime=0,
            st_ctime=0,
            st_uname=None,
            st_gname=None,
        )
class TarPathWriter(io.BytesIO):
    """File object that flushes its contents to a tar archive on close.

    Data is accumulated in memory; nothing reaches the archive until
    close() is called, at which point the whole buffer is added as a single
    member named ``path``.

    Returned by TarPath.open(mode="w").
    """

    def __init__(self, tarobj, path):
        """Remember the destination archive and member name.

        ``tarobj`` must provide ``addfile(tarinfo, fileobj)`` (as TarFile
        does); ``path`` is the member name given to the TarInfo.
        """
        super().__init__()
        self.tarobj = tarobj
        self.path = path

    def close(self):
        """Add the buffered bytes to the archive, then close the buffer.

        Calling close() again after the first call is a no-op, so the member
        is only ever written once. Any exception from ``addfile`` is
        propagated, but the buffer is still closed.
        """
        if self.closed:
            # Already flushed; io file objects allow repeated close().
            return
        try:
            info = tarfile.TarInfo(self.path)
            # Seeking to the end yields the full buffer length. The current
            # position would under-report it if the caller seeked backwards.
            info.size = self.seek(0, io.SEEK_END)
            self.seek(0)
            self.tarobj.addfile(info, self)
        finally:
            super().close()
class TarPath(PathBase):
    """A pathlib-compatible interface for tar files.

    Each instance pairs a POSIX-style path with the TarFile object it refers
    into. Queries (stat, iterdir, readlink) scan ``tarobj.getmembers()`` on
    every call; writes (open for writing, mkdir, symlink_to, hardlink_to)
    append a new member through ``tarobj.addfile()``.
    """

    __slots__ = ('tarobj',)
    parser = posixpath

    def __init__(self, *pathsegments, tarobj):
        """Create a path from *pathsegments* bound to the archive *tarobj*."""
        super().__init__(*pathsegments)
        self.tarobj = tarobj

    def __repr__(self):
        return f"{type(self).__name__}({str(self)!r}, tarobj={self.tarobj!r})"

    def __hash__(self):
        # Hash on archive identity plus path text, mirroring __eq__.
        return hash((id(self.tarobj), str(self)))

    def __eq__(self, other):
        """Paths are equal when they share the same TarFile object (by
        identity) and have the same string form.
        """
        if not isinstance(other, TarPath):
            return NotImplemented
        return self.tarobj is other.tarobj and str(self) == str(other)

    def with_segments(self, *pathsegments):
        """Construct a new TarPath object with the same underlying TarFile
        object from any number of path-like objects.
        """
        return type(self)(*pathsegments, tarobj=self.tarobj)

    def stat(self, *, follow_symlinks=True):
        """Return the path's status as a TarStatResult, similar to os.stat().

        With ``follow_symlinks=False`` only the parent is resolved, so the
        final path component itself is examined. A path that has no member
        of its own but is a parent of some member is reported as an implied
        directory. Raises FileNotFoundError when neither applies.
        """
        if follow_symlinks:
            resolved = self.resolve()
        else:
            resolved = self.parent.resolve() / self.name
        implied_directory = False
        # Scan in reverse so that, when a name occurs more than once, the
        # entry listed last by getmembers() is the one reported.
        for info in reversed(self.tarobj.getmembers()):
            path = self.with_segments(info.name)
            if path == resolved:
                return TarStatResult.from_tarinfo(self.tarobj, info)
            elif resolved in path.parents:
                # Keep scanning: an explicit member for the path may still
                # appear and takes precedence over the implied directory.
                implied_directory = True
        if implied_directory:
            return TarStatResult.implied_directory(self.tarobj, str(resolved))
        else:
            raise FileNotFoundError(errno.ENOENT, "Not found", str(self))

    def owner(self, *, follow_symlinks=True):
        """Return the user name of the path owner.

        Raises UnsupportedOperation when the stat result carries no user
        name (as is the case for implied directories).
        """
        name = self.stat(follow_symlinks=follow_symlinks).st_uname
        if name is not None:
            return name
        raise UnsupportedOperation()

    def group(self, *, follow_symlinks=True):
        """Return the group name of the path owner.

        Raises UnsupportedOperation when the stat result carries no group
        name (as is the case for implied directories).
        """
        name = self.stat(follow_symlinks=follow_symlinks).st_gname
        if name is not None:
            return name
        raise UnsupportedOperation()

    def open(self, mode='r', buffering=-1, encoding=None, errors=None, newline=None):
        """Open the archive member pointed by this path and return a file
        object, similar to the built-in open() function.

        Only reading ('r') and writing ('w') are supported, each in text or
        binary flavour. Any other action, or a *buffering* value other than
        -1, raises UnsupportedOperation. Writing returns a TarPathWriter
        whose contents are added to the archive when it is closed.
        """
        if buffering != -1:
            raise UnsupportedOperation()
        # Strip the flavour flags so only the action letter(s) remain.
        action = ''.join(c for c in mode if c not in 'btU')
        if action == 'r':
            # NOTE(review): per the tarfile documentation, extractfile()
            # raises KeyError for a missing member and returns None for
            # members that are neither files nor links; neither case is
            # translated here -- confirm callers expect that.
            fileobj = self.tarobj.extractfile(str(self.resolve()))
        elif action == 'w':
            fileobj = TarPathWriter(self.tarobj, str(self.resolve()))
        else:
            raise UnsupportedOperation()
        if 'b' not in mode:
            fileobj = io.TextIOWrapper(fileobj, encoding, errors, newline)
        return fileobj

    def iterdir(self):
        """Yield path objects of the directory contents. The children are
        yielded in arbitrary order.

        Children are derived from member names, so directories that are only
        implied by deeper members are yielded too. Because this is a
        generator, errors surface during iteration: NotADirectoryError if
        the path is a non-directory member, FileNotFoundError if the path
        is neither a directory member nor a parent of any member.
        """
        resolved = self.resolve()
        seen = set()
        # Set when the archive holds an explicit directory member for this
        # path, so that an empty directory is not mistaken for a missing one.
        is_directory = False
        for info in self.tarobj.getmembers():
            path = self.with_segments(info.name)
            if path == resolved:
                if info.type != tarfile.DIRTYPE:
                    raise NotADirectoryError(errno.ENOTDIR, "Not a directory", str(self))
                is_directory = True
            # Walk up from the member towards the root, looking for the
            # ancestor (or the member itself) that sits directly under us.
            while True:
                parent = path.parent
                if parent == path:
                    # Reached the top without passing through this path.
                    break
                elif parent == resolved:
                    path_str = str(path)
                    if path_str not in seen:
                        seen.add(path_str)
                        yield self / path.name
                    break
                path = parent
        if not seen and not is_directory:
            raise FileNotFoundError(errno.ENOENT, "File not found", str(self))

    def readlink(self):
        """Return the path to which the symbolic link points.

        Raises OSError(EINVAL) when the path exists (as a member or as a
        parent of a member) but is not a symlink, and FileNotFoundError when
        it does not exist at all.
        """
        # Reverse scan: the entry listed last for this name decides.
        for info in reversed(self.tarobj.getmembers()):
            path = self.with_segments(info.name)
            if path == self:
                if info.issym():
                    return self.with_segments(info.linkname)
                else:
                    raise OSError(errno.EINVAL, "Not a symlink", str(self))
            elif self in path.parents:
                # The path is a directory on the way to another member.
                raise OSError(errno.EINVAL, "Not a symlink", str(self))
        raise FileNotFoundError(errno.ENOENT, "File not found", str(self))

    def mkdir(self, mode=0o777, parents=False, exist_ok=False):
        """Create a new directory at this given path.

        A directory member with permission bits *mode* is appended to the
        archive. *parents* and *exist_ok* are accepted but not consulted by
        this implementation.
        """
        info = tarfile.TarInfo(str(self))
        info.type = tarfile.DIRTYPE
        info.mode = mode
        self.tarobj.addfile(info)

    def symlink_to(self, target, target_is_directory=False):
        """Make this path a symlink pointing to the target path.

        A symlink member is appended to the archive. *target_is_directory*
        is accepted but not consulted by this implementation.
        """
        info = tarfile.TarInfo(str(self))
        info.type = tarfile.SYMTYPE
        info.linkname = str(self.with_segments(target))
        self.tarobj.addfile(info)

    def hardlink_to(self, target):
        """Make this path a hard link pointing to the target path.

        A hard-link member is appended to the archive.
        """
        info = tarfile.TarInfo(str(self))
        info.type = tarfile.LNKTYPE
        info.linkname = str(self.with_segments(target))
        self.tarobj.addfile(info)