Source code for norfs.copy.s3
import os
import traceback
from typing import (
    Any,
    Dict,
    List,
)
from norfs.copy.base import (
    CopyDirectory,
    CopyFile,
    GenericCopyStrategy,
)
from norfs.fs.base import (
    FileSystemOperationError,
    Path,
)
[docs]class S3ToS3CopyStrategy(GenericCopyStrategy):
    def __init__(self, s3_client: Any) -> None:
        self._s3_client = s3_client
    def _list_dir(self, dir_drive: str, dir_tail: str) -> List[str]:
        response: Dict[str, List[Dict[str, str]]]
        try:
            response = self._s3_client.list_objects_v2(
                Bucket=dir_drive,
                Prefix=dir_tail
            )
        except Exception:
            raise FileSystemOperationError(traceback.format_exc())
        items: List[str] = []
        for item in response.get("Contents", []):
            items.append(item["Key"])
        while response.get("IsTruncated", False):
            try:
                response = self._s3_client.list_objects_v2(
                    Bucket=dir_drive,
                    Prefix=dir_tail,
                    ContinuationToken=response.get("NextContinuationToken", "")
                )
            except Exception:
                raise FileSystemOperationError(traceback.format_exc())
            for item in response.get("Contents", []):
                items.append(item["Key"])
        return items
[docs]    def copy_dir_to_dir(self, src: CopyDirectory, dst: CopyDirectory) -> None:
        dir_path_str: str = src.fs.path_to_string(src.path)
        src_dir_tail: str = dir_path_str[dir_path_str.find("/") + 1:]
        dir_path_str = src.fs.path_to_string(dst.path)
        dst_dir_tail: str = dir_path_str[dir_path_str.find("/") + 1:]
        for src_obj_path in self._list_dir(src.path.drive, src_dir_tail):
            dst_obj_path: str = src_obj_path.replace(src_dir_tail, dst_dir_tail)
            if src_obj_path.endswith("/"):
                copy_source = {
                    'Bucket': src.path.drive,
                    'Key': src_obj_path
                }
                self._s3_client.copy(copy_source, dst.path.drive, dst_obj_path)
            else:
                src_file: CopyFile = CopyFile(src.fs, src.fs.parse_path(f'{src.path.drive}/{src_obj_path}'))
                dst_file: CopyFile = CopyFile(dst.fs, dst.fs.parse_path(f'{dst.path.drive}/{dst_obj_path}'))
                self.copy_file_to_file(src_file, dst_file) 
[docs]    def copy_file_to_file(self, src: CopyFile, dst: CopyFile) -> None:
        src_path_str: str = src.fs.path_to_string(src.path)
        src_tail: str = src_path_str[src_path_str.find("/") + 1:]
        copy_source = {
            'Bucket': src.path.drive,
            'Key': src_tail
        }
        dst_path_str: str = dst.fs.path_to_string(dst.path)
        dst_tail: str = dst_path_str[dst_path_str.find("/") + 1:]
        self._s3_client.copy(copy_source, dst.path.drive, dst_tail)  
[docs]class S3ToLocalCopyStrategy(GenericCopyStrategy):
    def __init__(self, s3_client: Any) -> None:
        self._s3_client = s3_client
[docs]    def copy_file_to_file(self, src: CopyFile, dst: CopyFile) -> None:
        parent_dir: Path = dst.path.parent
        if not dst.fs.path_exists(parent_dir):
            os.makedirs(dst.fs.path_to_string(parent_dir))
        src_path_str: str = src.fs.path_to_string(src.path)
        src_tail: str = src_path_str[src_path_str.find("/") + 1:]
        self._s3_client.download_file(src.path.drive, src_tail, dst.fs.path_to_string(dst.path))