Examples

Public API

import norfs.helpers

Filesystem Clients

local_fs_client = norfs.helpers.local()
memory_fs_client = norfs.helpers.memory()

import boto3
s3_fs_client = norfs.helpers.s3(s3_client=boto3.client('s3'))

Files and Directories

cwd = local_fs_client.dir('.')
list(cwd.list())
[Directory(fs=LocalFileSystem(), path=/home/jovyan/work/.ipynb_checkpoints),
 File(fs=LocalFileSystem(), path=/home/jovyan/work/Dockerfile),
 File(fs=LocalFileSystem(), path=/home/jovyan/work/Public API.ipynb),
 File(fs=LocalFileSystem(), path=/home/jovyan/work/Untitled.ipynb)]
local_file = cwd.file('demo.txt')
local_file.write(b'Hello norfs!')
local_file.read()
b'Hello norfs!'
s3_fs_client.file('myBucket/hello-world.txt').read()
b'Hello World!'
s3_dir = s3_fs_client.dir('myBucket/norfs-demo/')
list(s3_dir.list())
[]
type(local_file)
norfs.filesystem.File
type(s3_dir.file('a_file.txt'))
norfs.filesystem.File
type(cwd)
norfs.filesystem.Directory
type(s3_dir)
norfs.filesystem.Directory

Copying

copy_client = norfs.helpers.get_copy_client(local_fs_client, s3_fs_client, memory_fs_client)
copy_client.copy(local_file, s3_dir)
list(s3_dir.list())
[File(fs=S3FileSystem(s3_client=<botocore.client.S3 object at 0x7f5b32227e48>, uri_protocol=s3, separator=/), path=myBucket/norfs-demo/demo.txt)]
s3_dir.file(local_file.name).read()
b'Hello norfs!'

Key Store

import norfs.helpers

local_fs_client = norfs.helpers.local()

import boto3
s3_fs_client = norfs.helpers.s3(s3_client=boto3.client('s3'))
class Store:

    def __init__(self, store_file):
        self._file = store_file
        self._store = set()

    def __enter__(self):
        if self._file.exists():
            self._store = set(x.decode('utf-8') for x in self._file.read().split(b'\n') if x)

        return self._store

    def __exit__(self, *args):
        self._file.write(b'\n'.join(x.encode('utf-8') for x in self._store if x))
store_file = local_fs_client.file('store')

with Store(store_file) as store:
    for i in range(10):
        store.add(str(i))

store_file.read()
b'0\n7\n8\n6\n3\n2\n1\n9\n5\n4'
s3_store_file = s3_fs_client.file('myBucket/norfs-demo/store')

with Store(s3_store_file) as store:
    for i in range(10):
        store.add(str(i))

s3_store_file.read()
b'0\n7\n8\n6\n3\n2\n1\n9\n5\n4'

From config using URI

class FileFactory:

    def __init__(self, mapping):
        self._mapping = mapping

    def file_from_uri(self, file_uri):
        protocol, path = file_uri.split('://')
        return self._mapping[protocol.lower()].file(path)
file_factory = FileFactory({'file': local_fs_client, 's3': s3_fs_client})
def main(store_uri):
    store_file = file_factory.file_from_uri(store_uri)

    with Store(store_file) as store:
        for i in range(5, 20):
            store.add(str(i))

    print(store_file.read())
main('file://./store')
b'15\n0\n8\n7\n17\n13\n10\n6\n16\n19\n11\n3\n2\n12\n14\n18\n1\n9\n5\n4'
main('s3://myBucket/norfs-demo/store')
b'15\n0\n8\n7\n17\n13\n10\n6\n16\n19\n11\n3\n2\n12\n14\n18\n1\n9\n5\n4'

PySpark

import norfs.helpers

local_fs_client = norfs.helpers.local()

import boto3
s3_fs_client = norfs.helpers.s3(s3_client=boto3.client('s3'))
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
def spark_count_lines(input_file):
    rdd = sc.textFile(input_file.uri)
    return rdd.count()
def init_input_file(input_file):
    input_file.write(b'\n'.join(str(x).encode('utf-8') for x in range(123)))
local_file = local_fs_client.file('input_file')
s3_file = s3_fs_client.file('myBucket/norfs-demo/input_file')
init_input_file(local_file)
spark_count_lines(local_file)
123
init_input_file(s3_file)
spark_count_lines(s3_file)
123