Source code for norfs.fs.local

import os
import shutil
import stat
import traceback

from typing import (
    Dict,
    Iterable,
    List,
    Optional,
)


from norfs.fs.base import (
    BaseFileSystem,
    FSObjectPath,
    FSObjectType,
    FileSystemOperationError,
    Path,
)
from norfs.permissions import Policy, Perm, Scope


# Lookup table from a (scope, permission) pair to the POSIX mode bit that
# grants that permission to that scope. Each scope lists its bits in
# read / write / execute order so they pair up with the Perm tuple below.
_local_fs_perms = {
    (scope, perm): mode_bit
    for scope, mode_bits in (
        (Scope.OWNER, (stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR)),
        (Scope.GROUP, (stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP)),
        (Scope.OTHERS, (stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH)),
    )
    for perm, mode_bit in zip((Perm.READ, Perm.WRITE, Perm.EXECUTE), mode_bits)
}


class LocalFileSystem(BaseFileSystem):
    """File system implementation backed by the local disk.

    Paths are handled as :class:`Path` objects made of a drive and a tail of
    components. The file and directory operations turn them into an OS path
    string with :meth:`path_to_string` before touching the disk.
    """

    # General operations

    def parse_path(self, path: str) -> Path:
        """Build a :class:`Path` from a path string.

        The string is normalised and made absolute, then split into its drive
        and its ``os.sep``-separated components.
        """
        abs_path: str = os.path.abspath(os.path.normpath(path))
        drive: str
        tail_str: str
        drive, tail_str = os.path.splitdrive(abs_path)
        tail: List[str] = tail_str.split(os.sep)
        return Path(drive, *tail)

    def path_exists(self, path: Path) -> bool:
        """Return whether ``path`` exists on disk."""
        return os.path.exists(self.path_to_string(path))

    def path_to_string(self, path: Path) -> str:
        """Return the OS path string for ``path`` (drive joined with tail)."""
        return os.path.join(path.drive, os.sep.join(path.tail))

    def path_to_uri(self, path: Path) -> str:
        """Return a ``file:///`` URI built from the drive and the joined tail.

        NOTE(review): the tail is joined with ``os.path.join``, so the
        separator inside the URI is the platform one -- confirm this is the
        intended form on platforms where ``os.sep`` is not ``/``.
        """
        return "file:///{}/{}".format(path.drive, os.path.join(*path.tail))

    # File operations

    def file_read(self, path: Path) -> bytes:
        """Return the whole content of the file at ``path`` as bytes.

        Raises :class:`FileSystemOperationError`, carrying the formatted
        traceback as its message, if the file cannot be opened or read.
        """
        try:
            with open(self.path_to_string(path), "rb") as f:
                return f.read()
        except Exception as err:
            # Chain explicitly so the original error is kept as __cause__.
            raise FileSystemOperationError(traceback.format_exc()) from err

    def file_write(self, path: Path, content: bytes) -> None:
        """Write ``content`` to the file at ``path``, replacing its content.

        Missing parent directories are created first.

        Raises :class:`FileSystemOperationError`, carrying the formatted
        traceback as its message, if the directories or the file cannot be
        written.
        """
        try:
            # exist_ok=True instead of an exists() check followed by
            # makedirs(): another writer may create the same directory in
            # between, which would make the unconditional makedirs() fail.
            os.makedirs(self.path_to_string(path.parent), exist_ok=True)
            with open(self.path_to_string(path), "wb") as f:
                f.write(content)
        except Exception as err:
            raise FileSystemOperationError(traceback.format_exc()) from err

    def file_remove(self, path: Path) -> None:
        """Delete the file at ``path``.

        Raises :class:`FileSystemOperationError`, carrying the formatted
        traceback as its message, if the file cannot be removed.
        """
        try:
            os.remove(self.path_to_string(path))
        except Exception as err:
            raise FileSystemOperationError(traceback.format_exc()) from err

    def file_set_perms(self, path: Path, policies: List[Policy]) -> None:
        """ Set permissions for a file.

        This works as expected on a unix file system.

        `Perm.WRITE_PERMS` and `Perm.READ_PERMS` are ignored.

        The resulting mode is the union of the mode bits of every
        (scope, permission) pair found in ``policies``; an empty list yields
        mode ``0``. Errors raised by ``os.chmod`` are not wrapped and
        propagate as they are.
        """
        mode = 0
        for policy in policies:
            for perm in policy.perms:
                # Pairs without a local equivalent contribute no bits.
                mode |= _local_fs_perms.get((policy.scope, perm), 0)
        os.chmod(self.path_to_string(path), mode)

    def file_set_properties(self, path: Path,
                            content_type: Optional[str] = None,
                            tags: Optional[Dict[str, str]] = None,
                            metadata: Optional[Dict[str, str]] = None) -> None:
        """ Has no effect. """
        ...

    # Directory operations

    def dir_list(self, path: Path) -> Iterable[FSObjectPath]:
        """Yield one :class:`FSObjectPath` per entry of the directory.

        Each entry is tagged as ``FILE``, ``DIR`` or ``OTHER``. Nothing is
        yielded when the directory does not exist.
        """
        path_str: str = self.path_to_string(path)
        items: List[str]
        try:
            items = os.listdir(path_str)
        except FileNotFoundError:
            # A missing directory is treated as an empty listing.
            return

        for item in items:
            full_path: str = os.path.join(path_str, item)
            item_path: Path = path.child(item)
            if os.path.isfile(full_path):
                yield FSObjectPath(FSObjectType.FILE, item_path)
            elif os.path.isdir(full_path):
                yield FSObjectPath(FSObjectType.DIR, item_path)
            else:
                yield FSObjectPath(FSObjectType.OTHER, item_path)

    def dir_remove(self, path: Path) -> None:
        """Delete the directory at ``path`` together with all its content.

        Raises :class:`FileSystemOperationError`, carrying the formatted
        traceback as its message, if the tree cannot be removed.
        """
        try:
            shutil.rmtree(self.path_to_string(path))
        except Exception as err:
            raise FileSystemOperationError(traceback.format_exc()) from err