import collections.abc
import operator
import os.path
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from contextlib import asynccontextmanager
from functools import reduce
from io import BytesIO, StringIO
from pathlib import PurePath
from typing import Any, Mapping, Sequence, Tuple, Union
[docs]class BaseProtocol(metaclass=ABCMeta):
def __init__(self, *args, **kwargs):
super().__init__()
self.path_sep = os.path.sep
[docs] @staticmethod
@abstractmethod
async def ls(path: str, pattern: str = None, *args, **kwargs) -> Sequence:
pass
[docs] @abstractmethod
async def open(self, path, *args, **kwargs):
pass
[docs] @staticmethod
@abstractmethod
async def exists(path: str) -> bool:
pass
[docs] @staticmethod
@abstractmethod
async def cp(src_path: str, dst_path: str):
pass
[docs] @abstractmethod
async def mkdir(self, path: str):
pass
[docs] @abstractmethod
async def mkdirs(self, path: str):
pass
[docs] @abstractmethod
async def mv(self, src_path: str, dst_path: str):
pass
[docs] @abstractmethod
async def rm(self, path: str):
pass
[docs] @staticmethod
@abstractmethod
async def is_dir(path: str) -> bool:
pass
[docs] @abstractmethod
async def glob(self, pattern: str) -> Tuple:
pass
[docs]def nested_defaultdict():
return defaultdict(nested_defaultdict)
[docs]class ContextualStringIO(StringIO):
async def __aenter__(self):
return self
async def __aexit__(self, *args):
self.close()
return False
[docs]class ContextualBytesIO(BytesIO):
async def __aenter__(self):
return self
async def __aexit__(self, *args):
self.close()
return False
[docs]class MemProtocol(BaseProtocol):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.path_sep = os.path.sep
self.tree = {}
@staticmethod
def _split_path(path: Union[str, PurePath]):
try:
path_parts = path.parts
except AttributeError:
path_parts = PurePath(path).parts
return path_parts
@classmethod
def _remove_tree_item(cls, tree: Mapping, path: Union[str, PurePath]):
path_parts = cls._split_path(path)
try:
del reduce(operator.getitem, path_parts[:-1], tree)[path_parts[-1]]
except KeyError:
pass
@classmethod
def _get_tree_item(cls, tree: Mapping, path: Union[str, PurePath]):
path_parts = cls._split_path(path)
try:
return reduce(operator.getitem, path_parts, tree)
except KeyError:
raise FileNotFoundError
@classmethod
def _set_tree_item(cls, tree: Mapping, path: Union[str, PurePath], value: Any):
path_parts = cls._split_path(path)
current_node = tree
for path_part in path_parts[:-1]:
try:
current_node = current_node[path_part]
if not isinstance(current_node, collections.abc.Mapping):
raise FileNotFoundError(f'Node already exists: {current_node}')
except KeyError:
new_node = {}
current_node[path_part] = new_node
current_node = new_node
try:
if type(current_node[path_parts[-1]]) != type(value):
raise FileNotFoundError('Node already exists')
current_node[path_parts[-1]] = value
except KeyError:
current_node[path_parts[-1]] = value
@classmethod
def _set_parent_tree_item(cls, tree: Mapping, path: Union[str, PurePath], value: Any):
try:
parent_item_path = path.parent
except AttributeError:
parent_item_path = PurePath(path).parent
cls._set_tree_item(tree, parent_item_path, value)
[docs] async def ls(self, path: Union[str, PurePath], pattern: str = None, *args, **kwargs) -> Sequence:
item = self._get_tree_item(self.tree, path)
return tuple(item)
[docs] @asynccontextmanager
async def open(self, path: Union[str, PurePath], *args, **kwargs):
mode = kwargs.pop('mode', args[0] if len(args) else 'r')
try:
encoding = kwargs.get('encoding', 'utf-8')
item = self._get_tree_item(self.tree, path)
if 'b' in mode:
f = ContextualBytesIO(item)
else:
f = ContextualStringIO(item.decode(encoding))
yield f
if 'w' in mode:
if 'b' in mode:
item = f.getvalue()
else:
item = f.getvalue().encode(encoding)
self._set_tree_item(self.tree, path, item)
except FileNotFoundError:
if 'w' in mode or 'a' in mode:
f = ContextualStringIO(None, *args, **kwargs)
yield f
self._set_tree_item(self.tree, path, f)
else:
raise
[docs] async def exists(self, path: Union[str, PurePath]) -> bool:
try:
self._get_tree_item(self.tree, path)
except FileNotFoundError:
return False
return True
[docs] async def cp(self, src_path: Union[str, PurePath], dst_path: Union[str, PurePath]):
src_path = PurePath(src_path)
dst_path_is_dir = isinstance(dst_path, str) and (dst_path.endswith('/') or dst_path.endswith('\\'))
dst_path = PurePath(dst_path)
src_item = self._get_tree_item(self.tree, src_path)
src_is_dir = isinstance(src_item, collections.abc.Mapping)
try:
dst_item = self._get_tree_item(self.tree, dst_path)
if isinstance(dst_item, collections.abc.Mapping):
if src_is_dir:
pass
else:
self._set_tree_item(self.tree, dst_path.joinpath(src_path.name), src_item)
else:
if dst_path_is_dir:
raise ValueError(f'Unable to copy {src_path} to directory path. it is a file')
if src_is_dir:
raise ValueError(f'Unable to copy directory {src_path} to file {dst_path}')
else:
self._set_tree_item(self.tree, dst_path, src_item)
self._set_tree_item(self.tree, dst_path, src_item)
except FileNotFoundError:
if src_is_dir:
pass
else:
if dst_path_is_dir:
self._set_tree_item(self.tree, dst_path.joinpath(src_path.name), src_item)
else:
self._set_tree_item(self.tree, dst_path, src_item)
[docs] async def mkdir(self, path: Union[str, PurePath]):
await self.mkdirs(path)
[docs] async def mkdirs(self, path: Union[str, PurePath]):
self._set_tree_item(self.tree, path, {})
[docs] async def mv(self, src_path: Union[str, PurePath], dst_path: Union[str, PurePath]):
await self.cp(src_path, dst_path)
await self.rm(src_path)
[docs] async def rm(self, path: Union[str, PurePath]):
self._remove_tree_item(self.tree, path)
[docs] async def is_dir(self, path: Union[str, PurePath]) -> bool:
return isinstance(self._get_tree_item(self.tree, path), collections.abc.Mapping)
[docs] async def glob(self, pattern: str) -> Tuple:
pass