Source code for norfs.copy.s3

import os
import traceback

from typing import (
    Any,
    Dict,
    List,
)

from norfs.copy.base import (
    CopyDirectory,
    CopyFile,
    GenericCopyStrategy,
)
from norfs.fs.base import (
    FileSystemOperationError,
    Path,
)


class S3ToS3CopyStrategy(GenericCopyStrategy):
    """Copy strategy used when both the source and the destination are on S3.

    Every transfer is delegated to the ``copy`` method of the injected client,
    and directory contents are discovered with ``list_objects_v2``.
    """

    def __init__(self, s3_client: Any) -> None:
        """Keep a reference to the client used for listing and copying.

        :param s3_client: object exposing ``list_objects_v2`` and ``copy``
            (presumably a boto3 S3 client -- confirm against the caller).
        """
        self._s3_client = s3_client

    @staticmethod
    def _key_of(path_str: str) -> str:
        """Return what follows the first ``/`` in ``path_str``.

        The leading segment is treated as the drive (bucket) and dropped.
        When ``path_str`` contains no ``/`` the whole string is returned,
        because ``str.find`` yields ``-1`` and the slice then starts at 0.
        """
        return path_str[path_str.find("/") + 1:]

    def _list_dir(self, dir_drive: str, dir_tail: str) -> List[str]:
        """Collect the keys of every object under a prefix, following pagination.

        :param dir_drive: value sent as ``Bucket``.
        :param dir_tail: value sent as ``Prefix``.
        :returns: the ``Key`` of each entry under ``Contents``, in the order
            the pages were returned.
        :raises FileSystemOperationError: if any ``list_objects_v2`` call
            raises; the message is the formatted traceback and the original
            exception is attached as ``__cause__``.
        """
        keys: List[str] = []
        request: Dict[str, str] = {"Bucket": dir_drive, "Prefix": dir_tail}
        response: Dict[str, Any]

        while True:
            try:
                response = self._s3_client.list_objects_v2(**request)
            except Exception as err:
                raise FileSystemOperationError(traceback.format_exc()) from err

            keys.extend(item["Key"] for item in response.get("Contents", []))

            if not response.get("IsTruncated", False):
                return keys

            # Ask for the next page; the bucket and prefix stay the same.
            request["ContinuationToken"] = response.get("NextContinuationToken", "")

    def copy_dir_to_dir(self, src: CopyDirectory, dst: CopyDirectory) -> None:
        """Copy every object listed under ``src`` to the matching key under ``dst``.

        Keys ending in ``/`` are copied directly with the client; all other
        keys are re-parsed into paths and routed through
        :meth:`copy_file_to_file`.

        :param src: source directory (filesystem plus path).
        :param dst: destination directory (filesystem plus path).
        :raises FileSystemOperationError: if listing the source fails.
        """
        src_prefix: str = self._key_of(src.fs.path_to_string(src.path))
        # Use the destination's own filesystem to render its path, the same
        # way copy_file_to_file does.
        dst_prefix: str = self._key_of(dst.fs.path_to_string(dst.path))

        for src_key in self._list_dir(src.path.drive, src_prefix):
            # The listing is filtered by Prefix=src_prefix, so only that
            # leading prefix is swapped. A plain str.replace would also
            # rewrite later occurrences inside the key, and would insert the
            # destination prefix between every character when src_prefix is
            # empty.
            dst_key: str = dst_prefix + src_key[len(src_prefix):]

            if src_key.endswith("/"):
                copy_source = {
                    'Bucket': src.path.drive,
                    'Key': src_key
                }
                self._s3_client.copy(copy_source, dst.path.drive, dst_key)
            else:
                src_file: CopyFile = CopyFile(
                    src.fs, src.fs.parse_path(f'{src.path.drive}/{src_key}'))
                dst_file: CopyFile = CopyFile(
                    dst.fs, dst.fs.parse_path(f'{dst.path.drive}/{dst_key}'))
                self.copy_file_to_file(src_file, dst_file)

    def copy_file_to_file(self, src: CopyFile, dst: CopyFile) -> None:
        """Copy a single object from ``src`` to ``dst`` with the client's ``copy``.

        The bucket of each side is its path's ``drive``; the key is the path
        string with the leading drive segment removed.

        :param src: source file (filesystem plus path).
        :param dst: destination file (filesystem plus path).
        """
        copy_source = {
            'Bucket': src.path.drive,
            'Key': self._key_of(src.fs.path_to_string(src.path))
        }
        dst_key: str = self._key_of(dst.fs.path_to_string(dst.path))
        self._s3_client.copy(copy_source, dst.path.drive, dst_key)
class S3ToLocalCopyStrategy(GenericCopyStrategy):
    """Copy strategy that downloads a file from S3 to the destination filesystem.

    The download is delegated to the ``download_file`` method of the injected
    client.
    """

    def __init__(self, s3_client: Any) -> None:
        """Keep a reference to the client used for downloads.

        :param s3_client: object exposing ``download_file`` (presumably a
            boto3 S3 client -- confirm against the caller).
        """
        self._s3_client = s3_client

    def copy_file_to_file(self, src: CopyFile, dst: CopyFile) -> None:
        """Download the object at ``src`` into the path described by ``dst``.

        The destination's parent directory is created first when the
        destination filesystem reports that it does not exist.

        :param src: source file; ``src.path.drive`` is used as the bucket and
            the rest of the path string (after the first ``/``) as the key.
        :param dst: destination file (filesystem plus path).
        """
        parent_dir: Path = dst.path.parent
        if not dst.fs.path_exists(parent_dir):
            # exist_ok closes the window between the existence check and the
            # creation: another worker may create the directory in between,
            # and that must not fail this copy.
            os.makedirs(dst.fs.path_to_string(parent_dir), exist_ok=True)

        src_path_str: str = src.fs.path_to_string(src.path)
        # Drop the leading drive segment; what remains is the object key.
        src_key: str = src_path_str[src_path_str.find("/") + 1:]

        self._s3_client.download_file(
            src.path.drive, src_key, dst.fs.path_to_string(dst.path))