From a0cc4602bd3352effc10baa95eecafa1d1fee1c3 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Sat, 4 Nov 2023 23:34:50 -0700 Subject: [PATCH] Replace fs with fsspec (#523) * Replace fs with fsspec * update * update * update * update * update * update * update * update * update docs * update --- .devcontainer/devcontainer.json | 6 +- RELEASE.md | 4 + fugue/__init__.py | 1 - fugue/_utils/io.py | 173 +++++++++--------- fugue/dataframe/utils.py | 37 ++-- fugue/execution/execution_engine.py | 7 - fugue/execution/native_execution_engine.py | 16 +- fugue/workflow/_checkpoint.py | 18 +- fugue_dask/_io.py | 51 +++--- fugue_dask/execution_engine.py | 17 +- fugue_duckdb/_io.py | 58 +++--- fugue_duckdb/execution_engine.py | 9 +- fugue_ibis/execution_engine.py | 6 +- fugue_ray/_utils/io.py | 32 ++-- fugue_spark/_utils/io.py | 8 +- fugue_spark/execution_engine.py | 9 +- fugue_test/builtin_suite.py | 24 +-- fugue_test/execution_suite.py | 31 ++-- fugue_test/plugins/misc/__init__.py | 2 + fugue_test/plugins/misc/fixtures.py | 18 ++ requirements.txt | 2 + setup.py | 3 +- tests/fugue/dataframe/test_utils.py | 6 +- tests/fugue/utils/test_io.py | 140 +++++++------- .../workflow/test_workflow_determinism.py | 2 +- tests/fugue_dask/test_io.py | 8 +- tests/fugue_ray/test_execution_engine.py | 12 +- tests/fugue_spark/utils/test_io.py | 29 ++- 28 files changed, 341 insertions(+), 388 deletions(-) create mode 100644 fugue_test/plugins/misc/__init__.py create mode 100644 fugue_test/plugins/misc/fixtures.py diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 06732abf..55677d0a 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,6 +1,5 @@ { "name": "Fugue Development Environment", - "_image": "fugueproject/devenv:0.7.7", "image": "mcr.microsoft.com/vscode/devcontainers/python:3.10", "customizations": { "vscode": { @@ -8,6 +7,7 @@ "terminal.integrated.shell.linux": "/bin/bash", "python.pythonPath": "/usr/local/bin/python", "python.defaultInterpreterPath": "/usr/local/bin/python", + "editor.defaultFormatter": "ms-python.black-formatter", "isort.interpreter": [ "/usr/local/bin/python" ], @@ -16,6 +16,9 @@ ], "pylint.interpreter": [ "/usr/local/bin/python" + ], + "black-formatter.interpreter": [ + "/usr/local/bin/python" ] }, "extensions": [ @@ -24,6 +27,7 @@ "ms-python.flake8", "ms-python.pylint", "ms-python.mypy", + "ms-python.black-formatter", "GitHub.copilot", "njpwerner.autodocstring" ] diff --git a/RELEASE.md b/RELEASE.md index 2bc60c1d..5519aded 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -2,6 +2,10 @@ ## 0.8.7 +- [488](https://github.com/fugue-project/fugue/issues/488) Migrate from fs to fsspec +- [521](https://github.com/fugue-project/fugue/issues/521) Add `as_dicts` to Fugue API +- [516](https://github.com/fugue-project/fugue/issues/516) Use `_collect_as_arrow` for Spark `as_arrow`` +- [520](https://github.com/fugue-project/fugue/pull/520) Add Python 3.10 to Windows Tests - [506](https://github.com/fugue-project/fugue/issues/506) Adopt pandas `ExtensionDType` - [504](https://github.com/fugue-project/fugue/issues/504) Create Fugue pytest fixtures - [503](https://github.com/fugue-project/fugue/issues/503) Deprecate python 3.7 support diff --git a/fugue/__init__.py b/fugue/__init__.py index 3de97585..5824aeda 100644 --- a/fugue/__init__.py +++ b/fugue/__init__.py @@ -1,6 +1,5 @@ # flake8: noqa from triad.collections import Schema -from triad.collections.fs import FileSystem from fugue.api import out_transform, transform from fugue.bag.array_bag 
import ArrayBag diff --git a/fugue/_utils/io.py b/fugue/_utils/io.py index d3d89eb7..82cf4484 100644 --- a/fugue/_utils/io.py +++ b/fugue/_utils/io.py @@ -1,14 +1,14 @@ import os import pathlib from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union -from urllib.parse import urlparse -import fs as pfs import pandas as pd +from fsspec import AbstractFileSystem +from fsspec.implementations.local import LocalFileSystem from triad.collections.dict import ParamDict -from triad.collections.fs import FileSystem from triad.collections.schema import Schema from triad.utils.assertion import assert_or_throw +from triad.utils.io import join, url_to_fs from triad.utils.pandas_like import PD_UTILS from fugue.dataframe import LocalBoundedDataFrame, LocalDataFrame, PandasDataFrame @@ -16,23 +16,14 @@ class FileParser(object): def __init__(self, path: str, format_hint: Optional[str] = None): - last = len(path) - has_glob = False self._orig_format_hint = format_hint - for i in range(len(path)): - if path[i] in ["/", "\\"]: - last = i - if path[i] in ["*", "?"]: - has_glob = True - break - if not has_glob: - self._uri = urlparse(path) - self._glob_pattern = "" - self._path = self._uri.path + self._has_glob = "*" in path or "?" in path + self._raw_path = path + self._fs, self._fs_path = url_to_fs(path) + if not self.is_local: + self._path = self._fs.unstrip_protocol(self._fs_path) else: - self._uri = urlparse(path[:last]) - self._glob_pattern = path[last + 1 :] - self._path = pfs.path.combine(self._uri.path, self._glob_pattern) + self._path = os.path.abspath(self._fs._strip_protocol(path)) if format_hint is None or format_hint == "": for k, v in _FORMAT_MAP.items(): @@ -48,56 +39,64 @@ def __init__(self, path: str, format_hint: Optional[str] = None): self._format = format_hint def assert_no_glob(self) -> "FileParser": - assert_or_throw(self.glob_pattern == "", f"{self.path} has glob pattern") + assert_or_throw(not self.has_glob, f"{self.raw_path} has glob pattern") return self - def with_glob(self, glob: str, format_hint: Optional[str] = None) -> "FileParser": - uri = self.uri - if glob != "": - uri = pfs.path.combine(uri, glob) - return FileParser(uri, format_hint or self._orig_format_hint) - @property - def glob_pattern(self) -> str: - return self._glob_pattern + def has_glob(self): + return self._has_glob @property - def uri(self) -> str: - return self._uri.geturl() + def is_local(self): + return isinstance(self._fs, LocalFileSystem) - @property - def uri_with_glob(self) -> str: - if self.glob_pattern == "": - return self.uri - return pfs.path.combine(self.uri, self.glob_pattern) + def join(self, path: str, format_hint: Optional[str] = None) -> "FileParser": + if not self.has_glob: + _path = join(self.path, path) + else: + _path = join(self.parent, path) + return FileParser(_path, format_hint or self._orig_format_hint) @property def parent(self) -> str: - dn = os.path.dirname(self.uri) - return dn if dn != "" else "." 
- - @property - def scheme(self) -> str: - return self._uri.scheme + return self._fs.unstrip_protocol(self._fs._parent(self._fs_path)) @property def path(self) -> str: return self._path + @property + def raw_path(self) -> str: + return self._raw_path + @property def suffix(self) -> str: - return "".join(pathlib.Path(self.path.lower()).suffixes) + return "".join(pathlib.Path(self.raw_path.lower()).suffixes) @property def file_format(self) -> str: return self._format + def make_parent_dirs(self) -> None: + self._fs.makedirs(self._fs._parent(self._fs_path), exist_ok=True) + + def find_all(self) -> Iterable["FileParser"]: + if self.has_glob: + for x in self._fs.glob(self._fs_path): + yield FileParser(self._fs.unstrip_protocol(x)) + else: + yield self + + def open(self, *args: Any, **kwargs: Any) -> Any: + self.assert_no_glob() + return self._fs.open(self._fs_path, *args, **kwargs) + def load_df( uri: Union[str, List[str]], format_hint: Optional[str] = None, columns: Any = None, - fs: Optional[FileSystem] = None, + fs: Optional[AbstractFileSystem] = None, **kwargs: Any, ) -> LocalBoundedDataFrame: if isinstance(uri, str): @@ -117,7 +116,7 @@ def save_df( uri: str, format_hint: Optional[str] = None, mode: str = "overwrite", - fs: Optional[FileSystem] = None, + fs: Optional[AbstractFileSystem] = None, **kwargs: Any, ) -> None: assert_or_throw( @@ -125,40 +124,28 @@ def save_df( ) p = FileParser(uri, format_hint).assert_no_glob() if fs is None: - fs = FileSystem() + fs, _ = url_to_fs(uri) if fs.exists(uri): assert_or_throw(mode == "overwrite", FileExistsError(uri)) try: - fs.remove(uri) - except Exception: - try: - fs.removetree(uri) - except Exception: # pragma: no cover - pass + fs.rm(uri, recursive=True) + except Exception: # pragma: no cover + pass _FORMAT_SAVE[p.file_format](df, p, **kwargs) def _get_single_files( - fp: Iterable[FileParser], fs: Optional[FileSystem] + fp: Iterable[FileParser], fs: Optional[AbstractFileSystem] ) -> Iterable[FileParser]: - if fs is None: - fs = FileSystem() for f in fp: - if f.glob_pattern != "": - files = [ - FileParser(pfs.path.combine(f.uri, pfs.path.basename(x.path))) - for x in fs.opendir(f.uri).glob(f.glob_pattern) - ] - yield from _get_single_files(files, fs) - else: - yield f + yield from f.find_all() def _save_parquet(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None: PD_UTILS.to_parquet_friendly( df.as_pandas(), partition_cols=kwargs.get("partition_cols", []) ).to_parquet( - p.uri, + p.assert_no_glob().path, **{ "engine": "pyarrow", "schema": df.schema.pa_schema, @@ -171,34 +158,36 @@ def _load_parquet( p: FileParser, columns: Any = None, **kwargs: Any ) -> Tuple[pd.DataFrame, Any]: if columns is None: - pdf = pd.read_parquet(p.uri, **{"engine": "pyarrow", **kwargs}) + pdf = pd.read_parquet(p.path, **{"engine": "pyarrow", **kwargs}) return pdf, None if isinstance(columns, list): # column names - pdf = pd.read_parquet(p.uri, columns=columns, **{"engine": "pyarrow", **kwargs}) + pdf = pd.read_parquet( + p.path, columns=columns, **{"engine": "pyarrow", **kwargs} + ) return pdf, None schema = Schema(columns) pdf = pd.read_parquet( - p.uri, columns=schema.names, **{"engine": "pyarrow", **kwargs} + p.path, columns=schema.names, **{"engine": "pyarrow", **kwargs} ) return pdf, schema def _save_csv(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None: - df.as_pandas().to_csv(p.uri, **{"index": False, "header": False, **kwargs}) + with p.open("w") as f: + df.as_pandas().to_csv(f, **{"index": False, "header": False, **kwargs}) -def 
_safe_load_csv(path: str, **kwargs: Any) -> pd.DataFrame: +def _safe_load_csv(p: FileParser, **kwargs: Any) -> pd.DataFrame: def load_dir() -> pd.DataFrame: - fs = FileSystem() - return pd.concat( - [ - pd.read_csv(pfs.path.combine(path, pfs.path.basename(x.path)), **kwargs) - for x in fs.opendir(path).glob("*.csv") - ] - ) + dfs: List[pd.DataFrame] = [] + for _p in p.join("*.csv").find_all(): # type: ignore + with _p.open("r") as f: + dfs.append(pd.read_csv(f, **kwargs)) + return pd.concat(dfs) try: - return pd.read_csv(path, **kwargs) + with p.open("r") as f: + return pd.read_csv(f, **kwargs) except IsADirectoryError: return load_dir() except pd.errors.ParserError: # pragma: no cover @@ -224,7 +213,7 @@ def _load_csv( # noqa: C901 header = kw["header"] del kw["header"] if str(header) in ["True", "0"]: - pdf = _safe_load_csv(p.uri, **{"index_col": False, "header": 0, **kw}) + pdf = _safe_load_csv(p, **{"index_col": False, "header": 0, **kw}) if columns is None: return pdf, None if isinstance(columns, list): # column names @@ -236,12 +225,14 @@ def _load_csv( # noqa: C901 raise ValueError("columns must be set if without header") if isinstance(columns, list): # column names pdf = _safe_load_csv( - p.uri, **{"index_col": False, "header": None, "names": columns, **kw} + p, + **{"index_col": False, "header": None, "names": columns, **kw}, ) return pdf, None schema = Schema(columns) pdf = _safe_load_csv( - p.uri, **{"index_col": False, "header": None, "names": schema.names, **kw} + p, + **{"index_col": False, "header": None, "names": schema.names, **kw}, ) return pdf, schema else: @@ -249,27 +240,31 @@ def _load_csv( # noqa: C901 def _save_json(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None: - df.as_pandas().to_json(p.uri, **{"orient": "records", "lines": True, **kwargs}) + with p.open("w") as f: + df.as_pandas().to_json(f, **{"orient": "records", "lines": True, **kwargs}) -def _safe_load_json(path: str, **kwargs: Any) -> pd.DataFrame: +def _safe_load_json(p: FileParser, **kwargs: Any) -> pd.DataFrame: kw = {"orient": "records", "lines": True, **kwargs} + + def load_dir() -> pd.DataFrame: + dfs: List[pd.DataFrame] = [] + for _p in p.join("*.json").find_all(): # type: ignore + with _p.open("r") as f: + dfs.append(pd.read_json(f, **kw)) + return pd.concat(dfs) + try: - return pd.read_json(path, **kw) + with p.open("r") as f: + return pd.read_json(f, **kw) except (IsADirectoryError, PermissionError): - fs = FileSystem() - return pd.concat( - [ - pd.read_json(pfs.path.combine(path, pfs.path.basename(x.path)), **kw) - for x in fs.opendir(path).glob("*.json") - ] - ) + return load_dir() def _load_json( p: FileParser, columns: Any = None, **kwargs: Any ) -> Tuple[pd.DataFrame, Any]: - pdf = _safe_load_json(p.uri, **kwargs).reset_index(drop=True) + pdf = _safe_load_json(p, **kwargs).reset_index(drop=True) if columns is None: return pdf, None if isinstance(columns, list): # column names diff --git a/fugue/dataframe/utils.py b/fugue/dataframe/utils.py index ffd7c3f8..2afe32d2 100644 --- a/fugue/dataframe/utils.py +++ b/fugue/dataframe/utils.py @@ -1,15 +1,15 @@ -import os import pickle -from typing import Any, Iterable, Optional, Tuple, List, Dict +from typing import Any, Dict, Iterable, List, Optional, Tuple import pandas as pd import pyarrow as pa -from fs import open_fs -from triad import FileSystem, Schema, assert_or_throw +from fsspec import AbstractFileSystem +from triad import Schema, assert_or_throw from triad.collections.schema import SchemaError from triad.exceptions import 
InvalidOperationError from triad.utils.assertion import assert_arg_not_none from triad.utils.assertion import assert_or_throw as aot +from triad.utils.io import url_to_fs from triad.utils.pyarrow import pa_batch_to_dicts from .api import as_fugue_df, get_column_names, normalize_column_names, rename @@ -112,7 +112,6 @@ def serialize_df( df: Optional[DataFrame], threshold: int = -1, file_path: Optional[str] = None, - fs: Optional[FileSystem] = None, ) -> Optional[bytes]: """Serialize input dataframe to base64 string or to file if it's larger than threshold @@ -121,15 +120,8 @@ def serialize_df( :param threshold: file byte size threshold, defaults to -1 :param file_path: file path to store the data (used only if the serialized data is larger than ``threshold``), defaults to None - :param fs: :class:`~triad:triad.collections.fs.FileSystem`, defaults to None :raises InvalidOperationError: if file is large but ``file_path`` is not provided :return: a pickled blob either containing the data or the file path - - .. note:: - - If fs is not provided but it needs to write to disk, then it will use - :meth:`~fs:fs.opener.registry.Registry.open_fs` to try to open the file to - write. """ if df is None: return None @@ -140,24 +132,20 @@ def serialize_df( else: if file_path is None: raise InvalidOperationError("file_path is not provided") - if fs is None: - with open_fs( - os.path.dirname(file_path), writeable=True, create=False - ) as _fs: - _fs.writebytes(os.path.basename(file_path), data) - else: - fs.writebytes(file_path, data) + fs, path = url_to_fs(file_path) + with fs.open(path, "wb") as f: + f.write(data) return pickle.dumps(file_path) def deserialize_df( - data: Optional[bytes], fs: Optional[FileSystem] = None + data: Optional[bytes], fs: Optional[AbstractFileSystem] = None ) -> Optional[LocalBoundedDataFrame]: """Deserialize json string to :class:`~fugue.dataframe.dataframe.LocalBoundedDataFrame` :param json_str: json string containing the base64 data or a file path - :param fs: :class:`~triad:triad.collections.fs.FileSystem`, defaults to None + :param fs: the file system to use, defaults to None :raises ValueError: if the json string is invalid, not generated from :func:`~.serialize_df` :return: :class:`~fugue.dataframe.dataframe.LocalBoundedDataFrame` if ``json_str`` @@ -169,10 +157,9 @@ def deserialize_df( if isinstance(obj, LocalBoundedDataFrame): return obj elif isinstance(obj, str): - if fs is None: - with open_fs(os.path.dirname(obj), create=False) as _fs: - return pickle.loads(_fs.readbytes(os.path.basename(obj))) - return pickle.loads(fs.readbytes(obj)) + fs, path = url_to_fs(obj) + with fs.open(path, "rb") as f: + return pickle.load(f) raise ValueError("data is invalid") diff --git a/fugue/execution/execution_engine.py b/fugue/execution/execution_engine.py index 0f6ccd3a..468ab097 100644 --- a/fugue/execution/execution_engine.py +++ b/fugue/execution/execution_engine.py @@ -18,7 +18,6 @@ from uuid import uuid4 from triad import ParamDict, Schema, SerializableRLock, assert_or_throw, to_uuid -from triad.collections.fs import FileSystem from triad.collections.function_wrapper import AnnotatedParam from triad.exceptions import InvalidOperationError from triad.utils.convert import to_size @@ -471,12 +470,6 @@ def set_sql_engine(self, engine: SQLEngine) -> None: """ self._sql_engine = engine - @property - @abstractmethod - def fs(self) -> FileSystem: # pragma: no cover - """File system of this engine instance""" - raise NotImplementedError - @abstractmethod def 
create_default_map_engine(self) -> MapEngine: # pragma: no cover """Default MapEngine if user doesn't specify""" diff --git a/fugue/execution/native_execution_engine.py b/fugue/execution/native_execution_engine.py index 6cae5f10..c6d91d38 100644 --- a/fugue/execution/native_execution_engine.py +++ b/fugue/execution/native_execution_engine.py @@ -1,12 +1,13 @@ import logging import os from typing import Any, Callable, Dict, List, Optional, Type, Union + import numpy as np import pandas as pd from triad import Schema from triad.collections.dict import IndexedOrderedDict -from triad.collections.fs import FileSystem from triad.utils.assertion import assert_or_throw +from triad.utils.io import makedirs from triad.utils.pandas_like import PandasUtils from fugue._utils.io import load_df, save_df @@ -179,7 +180,6 @@ class NativeExecutionEngine(ExecutionEngine): def __init__(self, conf: Any = None): super().__init__(conf) - self._fs = FileSystem() self._log = logging.getLogger() def __repr__(self) -> str: @@ -189,10 +189,6 @@ def __repr__(self) -> str: def log(self) -> logging.Logger: return self._log - @property - def fs(self) -> FileSystem: - return self._fs - @property def is_distributed(self) -> bool: return False @@ -395,9 +391,7 @@ def load_df( **kwargs: Any, ) -> LocalBoundedDataFrame: return self.to_df( - load_df( - path, format_hint=format_hint, columns=columns, fs=self.fs, **kwargs - ) + load_df(path, format_hint=format_hint, columns=columns, **kwargs) ) def save_df( @@ -413,9 +407,9 @@ def save_df( partition_spec = partition_spec or PartitionSpec() if not force_single and not partition_spec.empty: kwargs["partition_cols"] = partition_spec.partition_by - self.fs.makedirs(os.path.dirname(path), recreate=True) + makedirs(os.path.dirname(path), exist_ok=True) df = self.to_df(df) - save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) + save_df(df, path, format_hint=format_hint, mode=mode, **kwargs) @fugue_annotated_param(NativeExecutionEngine) diff --git a/fugue/workflow/_checkpoint.py b/fugue/workflow/_checkpoint.py index 2cc25039..382244cc 100644 --- a/fugue/workflow/_checkpoint.py +++ b/fugue/workflow/_checkpoint.py @@ -1,14 +1,15 @@ from typing import Any -import fs as pfs +from triad.utils.assertion import assert_or_throw +from triad.utils.hash import to_uuid +from triad.utils.io import exists, join, makedirs, rm + from fugue.collections.partition import PartitionSpec from fugue.collections.yielded import PhysicalYielded from fugue.constants import FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH from fugue.dataframe import DataFrame from fugue.exceptions import FugueWorkflowCompileError, FugueWorkflowRuntimeError from fugue.execution.execution_engine import ExecutionEngine -from triad.utils.assertion import assert_or_throw -from triad.utils.hash import to_uuid class Checkpoint(object): @@ -130,7 +131,6 @@ def is_null(self) -> bool: class CheckpointPath(object): def __init__(self, engine: ExecutionEngine): self._engine = engine - self._fs = engine.fs self._log = engine.log self._path = engine.conf.get(FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH, "").strip() self._temp_path = "" @@ -143,14 +143,14 @@ def init_temp_path(self, execution_id: str) -> str: if self._path == "": self._temp_path = "" return "" - self._temp_path = pfs.path.combine(self._path, execution_id) - self._fs.makedirs(self._temp_path, recreate=True) + self._temp_path = join(self._path, execution_id) + makedirs(self._temp_path, exist_ok=True) return self._temp_path def remove_temp_path(self): if self._temp_path != "": 
try: - self._fs.removetree(self._temp_path) + rm(self._temp_path, recursive=True) except Exception as e: # pragma: no cover self._log.info("Unable to remove " + self._temp_path, e) @@ -162,7 +162,7 @@ def get_temp_file(self, obj_id: str, permanent: bool) -> str: f"{FUGUE_CONF_WORKFLOW_CHECKPOINT_PATH} is not set" ), ) - return pfs.path.combine(path, obj_id + ".parquet") + return join(path, obj_id + ".parquet") def get_table_name(self, obj_id: str, permanent: bool) -> str: path = self._path if permanent else self._temp_path @@ -170,6 +170,6 @@ def get_table_name(self, obj_id: str, permanent: bool) -> str: def temp_file_exists(self, path: str) -> bool: try: - return self._fs.exists(path) + return exists(path) except Exception: # pragma: no cover return False diff --git a/fugue_dask/_io.py b/fugue_dask/_io.py index e4d0001b..1e320a80 100644 --- a/fugue_dask/_io.py +++ b/fugue_dask/_io.py @@ -1,13 +1,12 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Union -import fsspec -import fs as pfs import pandas as pd from dask import dataframe as dd +from fsspec import AbstractFileSystem from triad.collections.dict import ParamDict -from triad.collections.fs import FileSystem from triad.collections.schema import Schema from triad.utils.assertion import assert_or_throw +from triad.utils.io import join, makedirs, url_to_fs from fugue._utils.io import FileParser, _get_single_files from fugue_dask.dataframe import DaskDataFrame @@ -19,7 +18,7 @@ def load_df( uri: Union[str, List[str]], format_hint: Optional[str] = None, columns: Any = None, - fs: Optional[FileSystem] = None, + fs: Optional[AbstractFileSystem] = None, **kwargs: Any, ) -> DaskDataFrame: if isinstance(uri, str): @@ -39,7 +38,7 @@ def save_df( uri: str, format_hint: Optional[str] = None, mode: str = "overwrite", - fs: Optional[FileSystem] = None, + fs: Optional[AbstractFileSystem] = None, **kwargs: Any, ) -> None: assert_or_throw( @@ -48,16 +47,13 @@ def save_df( ) p = FileParser(uri, format_hint).assert_no_glob() if fs is None: - fs = FileSystem() + fs, _ = url_to_fs(uri) if fs.exists(uri): assert_or_throw(mode == "overwrite", FileExistsError(uri)) try: - fs.remove(uri) - except Exception: - try: - fs.removetree(uri) - except Exception: # pragma: no cover - pass + fs.rm(uri, recursive=True) + except Exception: # pragma: no cover + pass _FORMAT_SAVE[p.file_format](df, p, **kwargs) @@ -67,7 +63,7 @@ def _save_parquet(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None: "write_index": False, **kwargs, } - DASK_UTILS.to_parquet_friendly(df.native).to_parquet(p.uri, **params) + DASK_UTILS.to_parquet_friendly(df.native).to_parquet(p.path, **params) def _load_parquet( @@ -80,27 +76,26 @@ def _load_parquet( if pd.__version__ >= "1.5": dtype_backend = "pyarrow" if columns is None: - pdf = dd.read_parquet(p.uri, dtype_backend=dtype_backend, **params) + pdf = dd.read_parquet(p.path, dtype_backend=dtype_backend, **params) schema = Schema(pdf.head(1)) return pdf, schema if isinstance(columns, list): # column names pdf = dd.read_parquet( - p.uri, columns=columns, dtype_backend=dtype_backend, **params + p.path, columns=columns, dtype_backend=dtype_backend, **params ) schema = Schema(pdf.head(1)) return pdf, schema schema = Schema(columns) pdf = dd.read_parquet( - p.uri, columns=schema.names, dtype_backend=dtype_backend, **params + p.path, columns=schema.names, dtype_backend=dtype_backend, **params ) return pdf, schema def _save_csv(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None: - fs, path = fsspec.core.url_to_fs(p.uri) - 
fs.makedirs(path, exist_ok=True) + makedirs(p.path, exist_ok=True) df.native.to_csv( - pfs.path.combine(p.uri, "*.csv"), **{"index": False, "header": False, **kwargs} + p.join("*.csv").path, **{"index": False, "header": False, **kwargs} ) @@ -108,7 +103,7 @@ def _safe_load_csv(path: str, **kwargs: Any) -> dd.DataFrame: try: return dd.read_csv(path, **kwargs) except (IsADirectoryError, PermissionError): - return dd.read_csv(pfs.path.combine(path, "*.csv"), **kwargs) + return dd.read_csv(join(path, "*.csv"), **kwargs) def _load_csv( # noqa: C901 @@ -127,7 +122,7 @@ def _load_csv( # noqa: C901 header = kw["header"] del kw["header"] if str(header) in ["True", "0"]: - pdf = _safe_load_csv(p.uri, **{"header": 0, **kw}) + pdf = _safe_load_csv(p.path, **{"header": 0, **kw}) if columns is None: return pdf, None if isinstance(columns, list): # column names @@ -138,34 +133,32 @@ def _load_csv( # noqa: C901 if columns is None: raise ValueError("columns must be set if without header") if isinstance(columns, list): # column names - pdf = _safe_load_csv(p.uri, **{"header": None, "names": columns, **kw}) + pdf = _safe_load_csv(p.path, **{"header": None, "names": columns, **kw}) return pdf, None schema = Schema(columns) - pdf = _safe_load_csv(p.uri, **{"header": None, "names": schema.names, **kw}) + pdf = _safe_load_csv(p.path, **{"header": None, "names": schema.names, **kw}) return pdf, schema else: raise NotImplementedError(f"{header} is not supported") def _save_json(df: DaskDataFrame, p: FileParser, **kwargs: Any) -> None: - fs, path = fsspec.core.url_to_fs(p.uri) - fs.makedirs(path, exist_ok=True) - df.native.to_json(pfs.path.combine(p.uri, "*.json"), **kwargs) + makedirs(p.path, exist_ok=True) + df.native.to_json(p.join("*.json").path, **kwargs) def _safe_load_json(path: str, **kwargs: Any) -> dd.DataFrame: try: return dd.read_json(path, **kwargs) except (IsADirectoryError, PermissionError): - x = dd.read_json(pfs.path.combine(path, "*.json"), **kwargs) - print(x.compute()) + x = dd.read_json(join(path, "*.json"), **kwargs) return x def _load_json( p: FileParser, columns: Any = None, **kwargs: Any ) -> Tuple[dd.DataFrame, Any]: - pdf = _safe_load_json(p.uri, **kwargs).reset_index(drop=True) + pdf = _safe_load_json(p.path, **kwargs).reset_index(drop=True) if columns is None: return pdf, None if isinstance(columns, list): # column names diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py index abc07ea5..7dd0bae0 100644 --- a/fugue_dask/execution_engine.py +++ b/fugue_dask/execution_engine.py @@ -7,18 +7,17 @@ from distributed import Client from triad.collections import Schema from triad.collections.dict import IndexedOrderedDict, ParamDict -from triad.collections.fs import FileSystem from triad.utils.assertion import assert_or_throw from triad.utils.hash import to_uuid from triad.utils.pandas_like import PandasUtils from triad.utils.threading import RunOnce +from triad.utils.io import makedirs from fugue import StructuredRawSQL from fugue.collections.partition import ( PartitionCursor, PartitionSpec, parse_presort_exp, ) -from fugue.exceptions import FugueBug from fugue.constants import KEYWORD_PARALLELISM, KEYWORD_ROWCOUNT from fugue.dataframe import ( AnyDataFrame, @@ -28,6 +27,7 @@ PandasDataFrame, ) from fugue.dataframe.utils import get_join_schemas +from fugue.exceptions import FugueBug from fugue.execution.execution_engine import ExecutionEngine, MapEngine, SQLEngine from fugue.execution.native_execution_engine import NativeExecutionEngine from fugue_dask._constants 
import FUGUE_DASK_DEFAULT_CONF @@ -206,7 +206,6 @@ def __init__(self, dask_client: Optional[Client] = None, conf: Any = None): p = ParamDict(FUGUE_DASK_DEFAULT_CONF) p.update(ParamDict(conf)) super().__init__(p) - self._fs = FileSystem() self._log = logging.getLogger() self._client = DASK_UTILS.get_or_create_client(dask_client) self._native = NativeExecutionEngine(conf=conf) @@ -227,10 +226,6 @@ def dask_client(self) -> Client: def log(self) -> logging.Logger: return self._log - @property - def fs(self) -> FileSystem: - return self._fs - def create_default_sql_engine(self) -> SQLEngine: return DaskSQLEngine(self) @@ -527,9 +522,7 @@ def load_df( **kwargs: Any, ) -> DaskDataFrame: return self.to_df( - load_df( - path, format_hint=format_hint, columns=columns, fs=self.fs, **kwargs - ) + load_df(path, format_hint=format_hint, columns=columns, **kwargs) ) def save_df( @@ -556,9 +549,9 @@ def save_df( else: if not partition_spec.empty: kwargs["partition_on"] = partition_spec.partition_by - self.fs.makedirs(os.path.dirname(path), recreate=True) + makedirs(os.path.dirname(path), exist_ok=True) df = self.to_df(df) - save_df(df, path, format_hint=format_hint, mode=mode, fs=self.fs, **kwargs) + save_df(df, path, format_hint=format_hint, mode=mode, **kwargs) def to_dask_engine_df(df: Any, schema: Any = None) -> DaskDataFrame: diff --git a/fugue_duckdb/_io.py b/fugue_duckdb/_io.py index 7d444fe5..56d21373 100644 --- a/fugue_duckdb/_io.py +++ b/fugue_duckdb/_io.py @@ -3,9 +3,9 @@ from duckdb import DuckDBPyConnection from triad import ParamDict, Schema -from triad.collections.fs import FileSystem -from triad.utils.assertion import assert_or_throw +from triad.utils.assertion import assert_or_throw +from triad.utils.io import isdir, makedirs, rm, exists from fugue._utils.io import FileParser, load_df, save_df from fugue.collections.sql import TempTableName from fugue.dataframe import ArrowDataFrame, LocalBoundedDataFrame @@ -18,26 +18,17 @@ from fugue_duckdb.dataframe import DuckDataFrame -def _get_single_files( - fp: Iterable[FileParser], fs: FileSystem, fmt: str -) -> Iterable[FileParser]: - def _isdir(d: str) -> bool: - try: - return fs.isdir(d) - except Exception: # pragma: no cover - return False - +def _get_files(fp: Iterable[FileParser], fmt: str) -> Iterable[FileParser]: for f in fp: - if f.glob_pattern == "" and _isdir(f.uri): - yield f.with_glob("*." + fmt, fmt) + if not f.has_glob and isdir(f.path): + yield from f.join("*." 
+ fmt, fmt).find_all() else: yield f class DuckDBIO: - def __init__(self, fs: FileSystem, con: DuckDBPyConnection) -> None: + def __init__(self, con: DuckDBPyConnection) -> None: self._con = con - self._fs = fs self._format_load = {"csv": self._load_csv, "parquet": self._load_parquet} self._format_save = {"csv": self._save_csv, "parquet": self._save_parquet} @@ -55,11 +46,9 @@ def load_df( else: fp = [FileParser(u, format_hint) for u in uri] if fp[0].file_format not in self._format_load: - return load_df( - uri, format_hint=format_hint, columns=columns, fs=self._fs, **kwargs - ) + return load_df(uri, format_hint=format_hint, columns=columns, **kwargs) dfs: List[DuckDataFrame] = [] - for f in _get_single_files(fp, self._fs, fp[0].file_format): + for f in _get_files(fp, fp[0].file_format): df = self._format_load[f.file_format](f, columns, **kwargs) dfs.append(df) rel = dfs[0].native @@ -83,26 +72,20 @@ def save_df( ) p = FileParser(uri, format_hint).assert_no_glob() if (p.file_format not in self._format_save) or ("partition_cols" in kwargs): - self._fs.makedirs(os.path.dirname(uri), recreate=True) + makedirs(os.path.dirname(uri), exist_ok=True) ldf = ArrowDataFrame(df.as_arrow()) - return save_df( - ldf, uri=uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs - ) - fs = self._fs - if fs.exists(uri): + return save_df(ldf, uri=uri, format_hint=format_hint, mode=mode, **kwargs) + if exists(uri): assert_or_throw(mode == "overwrite", FileExistsError(uri)) try: - fs.remove(uri) - except Exception: - try: - fs.removetree(uri) - except Exception: # pragma: no cover - pass - if not fs.exists(p.parent): - fs.makedirs(p.parent, recreate=True) + rm(uri, recursive=True) + except Exception: # pragma: no cover + pass + p.make_parent_dirs() self._format_save[p.file_format](df, p, **kwargs) def _save_csv(self, df: DuckDataFrame, p: FileParser, **kwargs: Any): + p.assert_no_glob() dn = TempTableName() df.native.create_view(dn.key) kw = ParamDict({k.lower(): v for k, v in kwargs.items()}) @@ -111,7 +94,7 @@ def _save_csv(self, df: DuckDataFrame, p: FileParser, **kwargs: Any): for k, v in kw.items(): params.append(f"{k.upper()} " + encode_value_to_expr(v)) pm = ", ".join(params) - query = f"COPY {dn.key} TO {encode_value_to_expr(p.uri)} WITH ({pm})" + query = f"COPY {dn.key} TO {encode_value_to_expr(p.path)} WITH ({pm})" self._con.execute(query) def _load_csv( # noqa: C901 @@ -125,7 +108,7 @@ def _load_csv( # noqa: C901 ValueError("when csv has no header, columns must be specified"), ) kw.pop("auto_detect", None) - params: List[str] = [encode_value_to_expr(p.uri_with_glob)] + params: List[str] = [encode_value_to_expr(p.path)] kw["header"] = 1 if header else 0 kw["auto_detect"] = 1 if infer_schema else 0 if infer_schema: @@ -188,6 +171,7 @@ def _load_csv( # noqa: C901 return DuckDataFrame(self._con.from_query(query)) def _save_parquet(self, df: DuckDataFrame, p: FileParser, **kwargs: Any): + p.assert_no_glob() dn = TempTableName() df.native.create_view(dn.key) kw = ParamDict({k.lower(): v for k, v in kwargs.items()}) @@ -196,7 +180,7 @@ def _save_parquet(self, df: DuckDataFrame, p: FileParser, **kwargs: Any): for k, v in kw.items(): params.append(f"{k.upper()} " + encode_value_to_expr(v)) pm = ", ".join(params) - query = f"COPY {dn.key} TO {encode_value_to_expr(p.uri)}" + query = f"COPY {dn.key} TO {encode_value_to_expr(p.path)}" if len(params) > 0: query += f" WITH ({pm})" self._con.execute(query) @@ -205,7 +189,7 @@ def _load_parquet( self, p: FileParser, columns: Any = None, **kwargs: Any ) -> 
DuckDataFrame: kw = ParamDict({k.lower(): v for k, v in kwargs.items()}) - params: List[str] = [encode_value_to_expr(p.uri_with_glob)] + params: List[str] = [encode_value_to_expr(p.path)] if isinstance(columns, list): cols = ", ".join(encode_column_names(columns)) else: diff --git a/fugue_duckdb/execution_engine.py b/fugue_duckdb/execution_engine.py index ca67b43f..5d6070b3 100644 --- a/fugue_duckdb/execution_engine.py +++ b/fugue_duckdb/execution_engine.py @@ -4,7 +4,6 @@ import duckdb from duckdb import DuckDBPyConnection, DuckDBPyRelation from triad import SerializableRLock -from triad.collections.fs import FileSystem from triad.utils.assertion import assert_or_throw from triad.utils.schema import quote_name @@ -195,10 +194,6 @@ def connection(self) -> DuckDBPyConnection: def log(self) -> logging.Logger: return self._native_engine.log - @property - def fs(self) -> FileSystem: - return self._native_engine.fs - def create_default_sql_engine(self) -> SQLEngine: return DuckDBEngine(self) @@ -488,7 +483,7 @@ def load_df( columns: Any = None, **kwargs: Any, ) -> LocalBoundedDataFrame: - dio = DuckDBIO(self.fs, self.connection) + dio = DuckDBIO(self.connection) return dio.load_df(path, format_hint, columns, **kwargs) def save_df( @@ -504,7 +499,7 @@ def save_df( partition_spec = partition_spec or PartitionSpec() if not partition_spec.empty and not force_single: kwargs["partition_cols"] = partition_spec.partition_by - dio = DuckDBIO(self.fs, self.connection) + dio = DuckDBIO(self.connection) dio.save_df(_to_duck_df(self, df), path, format_hint, mode, **kwargs) def convert_yield_dataframe(self, df: DataFrame, as_local: bool) -> DataFrame: diff --git a/fugue_ibis/execution_engine.py b/fugue_ibis/execution_engine.py index cdc532b0..1c8b3c4c 100644 --- a/fugue_ibis/execution_engine.py +++ b/fugue_ibis/execution_engine.py @@ -5,7 +5,7 @@ import ibis from ibis import BaseBackend -from triad import FileSystem, assert_or_throw +from triad import assert_or_throw from fugue import StructuredRawSQL from fugue.bag import Bag, LocalBag @@ -375,10 +375,6 @@ def create_default_map_engine(self) -> MapEngine: def log(self) -> logging.Logger: return self.non_ibis_engine.log - @property - def fs(self) -> FileSystem: - return self.non_ibis_engine.fs - def get_current_parallelism(self) -> int: return self.non_ibis_engine.get_current_parallelism() diff --git a/fugue_ray/_utils/io.py b/fugue_ray/_utils/io.py index 6198b81c..5e53e095 100644 --- a/fugue_ray/_utils/io.py +++ b/fugue_ray/_utils/io.py @@ -4,23 +4,24 @@ import pyarrow as pa import ray.data as rd -from fugue import ExecutionEngine -from fugue._utils.io import FileParser, save_df -from fugue.collections.partition import PartitionSpec -from fugue.dataframe import DataFrame -from fugue_ray.dataframe import RayDataFrame from pyarrow import csv as pacsv from pyarrow import json as pajson from ray.data.datasource import FileExtensionFilter from triad.collections import Schema from triad.collections.dict import ParamDict from triad.utils.assertion import assert_or_throw +from triad.utils.io import exists, makedirs, rm + +from fugue import ExecutionEngine +from fugue._utils.io import FileParser, save_df +from fugue.collections.partition import PartitionSpec +from fugue.dataframe import DataFrame +from fugue_ray.dataframe import RayDataFrame class RayIO(object): def __init__(self, engine: ExecutionEngine): self._engine = engine - self._fs = engine.fs self._logger = engine.log self._loads: Dict[str, Callable[..., DataFrame]] = { "csv": self._load_csv, @@ -49,7 +50,7 
@@ def load_df( len(fmts) == 1, NotImplementedError("can't support multiple formats") ) fmt = fmts[0] - files = [f.uri for f in fp] + files = [f.path for f in fp] return self._loads[fmt](files, columns, **kwargs) def save_df( @@ -63,24 +64,21 @@ def save_df( **kwargs: Any, ) -> None: partition_spec = partition_spec or PartitionSpec() - if self._fs.exists(uri): + if exists(uri): assert_or_throw(mode == "overwrite", FileExistsError(uri)) try: - self._fs.remove(uri) - except Exception: - try: - self._fs.removetree(uri) - except Exception: # pragma: no cover - pass + rm(uri, recursive=True) + except Exception: # pragma: no cover + pass p = FileParser(uri, format_hint) if not force_single: df = self._prepartition(df, partition_spec=partition_spec) - self._saves[p.file_format](df=df, uri=p.uri, **kwargs) + self._saves[p.file_format](df=df, uri=p.path, **kwargs) else: ldf = df.as_local() - self._fs.makedirs(os.path.dirname(uri), recreate=True) - save_df(ldf, uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs) + makedirs(os.path.dirname(uri), exist_ok=True) + save_df(ldf, uri, format_hint=format_hint, mode=mode, **kwargs) def _save_parquet( self, diff --git a/fugue_spark/_utils/io.py b/fugue_spark/_utils/io.py index c36a8856..7a18b67d 100644 --- a/fugue_spark/_utils/io.py +++ b/fugue_spark/_utils/io.py @@ -4,7 +4,6 @@ from pyspark.sql import SparkSession from triad.collections import Schema from triad.collections.dict import ParamDict -from triad.collections.fs import FileSystem from triad.utils.assertion import assert_or_throw from fugue._utils.io import FileParser, save_df @@ -16,9 +15,8 @@ class SparkIO(object): - def __init__(self, spark_session: SparkSession, fs: FileSystem): + def __init__(self, spark_session: SparkSession): self._session = spark_session - self._fs = fs self._loads: Dict[str, Callable[..., DataFrame]] = { "csv": self._load_csv, "parquet": self._load_parquet, @@ -41,7 +39,7 @@ def load_df( len(fmts) == 1, NotImplementedError("can't support multiple formats") ) fmt = fmts[0] - files = [f.uri for f in fp] + files = [f.path for f in fp] return self._loads[fmt](files, columns, **kwargs) def save_df( @@ -64,7 +62,7 @@ def save_df( ldf = df.as_local() if isinstance(ldf, PandasDataFrame) and hasattr(ldf.native, "attrs"): ldf.native.attrs = {} # pragma: no cover - save_df(ldf, uri, format_hint=format_hint, mode=mode, fs=self._fs, **kwargs) + save_df(ldf, uri, format_hint=format_hint, mode=mode, **kwargs) def _get_writer( self, sdf: ps.DataFrame, partition_spec: PartitionSpec diff --git a/fugue_spark/execution_engine.py b/fugue_spark/execution_engine.py index 4a3d917b..fa527b14 100644 --- a/fugue_spark/execution_engine.py +++ b/fugue_spark/execution_engine.py @@ -11,7 +11,7 @@ from pyspark.sql import SparkSession from pyspark.sql.functions import broadcast, col, lit, row_number from pyspark.sql.window import Window -from triad import FileSystem, IndexedOrderedDict, ParamDict, Schema, SerializableRLock +from triad import IndexedOrderedDict, ParamDict, Schema, SerializableRLock from triad.utils.assertion import assert_arg_not_none, assert_or_throw from triad.utils.hash import to_uuid from triad.utils.iter import EmptyAwareIterable @@ -360,13 +360,12 @@ def __init__(self, spark_session: Optional[SparkSession] = None, conf: Any = Non cf.update(ParamDict(conf)) super().__init__(cf) self._lock = SerializableRLock() - self._fs = FileSystem() self._log = logging.getLogger() self._broadcast_func = RunOnce( self._broadcast, lambda *args, **kwargs: id(args[0]) ) self._persist_func = 
RunOnce(self._persist, lambda *args, **kwargs: id(args[0])) - self._io = SparkIO(self.spark_session, self.fs) + self._io = SparkIO(self.spark_session) self._registered_dfs: Dict[str, SparkDataFrame] = {} def __repr__(self) -> str: @@ -395,10 +394,6 @@ def is_distributed(self) -> bool: def log(self) -> logging.Logger: return self._log - @property - def fs(self) -> FileSystem: - return self._fs - def create_default_sql_engine(self) -> SQLEngine: return SparkSQLEngine(self) diff --git a/fugue_test/builtin_suite.py b/fugue_test/builtin_suite.py index aa3af323..53b70bdc 100644 --- a/fugue_test/builtin_suite.py +++ b/fugue_test/builtin_suite.py @@ -12,11 +12,12 @@ from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional from unittest import TestCase from uuid import uuid4 - +from triad.utils.io import write_text, join import numpy as np import pandas as pd import pyarrow as pa import pytest +from fsspec.implementations.local import LocalFileSystem from pytest import raises from triad import SerializableRLock @@ -28,7 +29,6 @@ DataFrame, DataFrames, ExecutionEngine, - FileSystem, FugueWorkflow, LocalDataFrame, OutputCoTransformer, @@ -65,6 +65,8 @@ FugueWorkflowRuntimeValidationError, ) +_LOCAL_FS = LocalFileSystem(auto_mkdir=True) + class BuiltInTests(object): """Workflow level general test suite. It is a more general end to end @@ -633,9 +635,8 @@ def test_out_transform(self): # noqa: C901 tmpdir = str(self.tmpdir) def incr(): - fs = FileSystem(auto_close=False).makedirs(tmpdir, recreate=True) - fs.writetext(str(uuid4()) + ".txt", "") - return fs.glob("*.txt").count().files + write_text(join(tmpdir, str(uuid4()) + ".txt"), "") + return len(_LOCAL_FS.glob(join(tmpdir, "*.txt"))) def t1(df: Iterable[Dict[str, Any]]) -> Iterable[Dict[str, Any]]: for row in df: @@ -717,9 +718,8 @@ def test_out_cotransform(self): # noqa: C901 tmpdir = str(self.tmpdir) def incr(): - fs = FileSystem(auto_close=False).makedirs(tmpdir, recreate=True) - fs.writetext(str(uuid4()) + ".txt", "") - return fs.glob("*.tx" "t").count().files + write_text(join(tmpdir, str(uuid4()) + ".txt"), "") + return len(_LOCAL_FS.glob(join(tmpdir, "*.txt"))) def t1( df: Iterable[Dict[str, Any]], df2: pd.DataFrame @@ -1348,7 +1348,7 @@ def test_io(self): b.partition(num=3).save(path, fmt="parquet", single=True) b.save(path2, header=True) dag.run(self.engine) - assert FileSystem().isfile(path) + assert _LOCAL_FS.isfile(path) with FugueWorkflow() as dag: a = dag.load(path, fmt="parquet", columns=["a", "c"]) a.assert_eq(dag.df([[1, 6], [7, 2]], "a:long,c:int")) @@ -1359,9 +1359,9 @@ def test_io(self): b = dag.df([[6, 1], [2, 7]], "c:int,a:long") b.partition(by="c").save(path3, fmt="parquet", single=False) dag.run(self.engine) - assert FileSystem().isdir(path3) - assert FileSystem().isdir(os.path.join(path3, "c=6")) - assert FileSystem().isdir(os.path.join(path3, "c=2")) + assert _LOCAL_FS.isdir(path3) + assert _LOCAL_FS.isdir(os.path.join(path3, "c=6")) + assert _LOCAL_FS.isdir(os.path.join(path3, "c=2")) # TODO: in test below, once issue #288 is fixed, use dag.load # instead of pd.read_parquet pdf = pd.read_parquet(path3).sort_values("a").reset_index(drop=True) diff --git a/fugue_test/execution_suite.py b/fugue_test/execution_suite.py index bf3c6148..ec1c8aa6 100644 --- a/fugue_test/execution_suite.py +++ b/fugue_test/execution_suite.py @@ -15,8 +15,8 @@ import pandas as pd import pytest from pytest import raises -from triad.collections.fs import FileSystem from triad.exceptions import InvalidOperationError +from 
triad.utils.io import isfile, makedirs, touch import fugue.api as fa import fugue.column.functions as ff @@ -62,7 +62,6 @@ def make_engine(self) -> ExecutionEngine: # pragma: no cover def test_init(self): print(self.engine) assert self.engine.log is not None - assert self.engine.fs is not None assert copy.copy(self.engine) is self.engine assert copy.deepcopy(self.engine) is self.engine @@ -985,17 +984,16 @@ def on_init(partition_no, dfs): df_eq(res, [[1, "z1"]], "a:int,v:str", throw=True) @pytest.fixture(autouse=True) - def init_tmpdir(self, tmpdir): + def init_tmpdir(self, tmpdir, tmp_mem_dir): self.tmpdir = tmpdir def test_save_single_and_load_parquet(self): - e = self.engine b = ArrayDataFrame([[6, 1], [2, 7]], "c:int,a:long") path = os.path.join(self.tmpdir, "a", "b") - e.fs.makedirs(path, recreate=True) + makedirs(path, exist_ok=True) # over write folder with single file fa.save(b, path, format_hint="parquet", force_single=True) - assert e.fs.isfile(path) + assert isfile(path) c = fa.load(path, format_hint="parquet", columns=["a", "c"], as_fugue=True) df_eq(c, [[1, 6], [7, 2]], "a:long,c:int", throw=True) @@ -1019,7 +1017,7 @@ def test_load_parquet_folder(self): path = os.path.join(self.tmpdir, "a", "b") fa.save(a, os.path.join(path, "a.parquet"), engine=native) fa.save(b, os.path.join(path, "b.parquet"), engine=native) - FileSystem().touch(os.path.join(path, "_SUCCESS")) + touch(os.path.join(path, "_SUCCESS")) c = fa.load(path, format_hint="parquet", columns=["a", "c"], as_fugue=True) df_eq(c, [[1, 6], [7, 2], [8, 4]], "a:long,c:int", throw=True) @@ -1038,13 +1036,12 @@ def test_load_parquet_files(self): df_eq(c, [[1, 6], [7, 2], [8, 4]], "a:long,c:int", throw=True) def test_save_single_and_load_csv(self): - e = self.engine b = ArrayDataFrame([[6.1, 1.1], [2.1, 7.1]], "c:double,a:double") path = os.path.join(self.tmpdir, "a", "b") - e.fs.makedirs(path, recreate=True) + makedirs(path, exist_ok=True) # over write folder with single file fa.save(b, path, format_hint="csv", header=True, force_single=True) - assert e.fs.isfile(path) + assert isfile(path) c = fa.load( path, format_hint="csv", header=True, infer_schema=False, as_fugue=True ) @@ -1099,13 +1096,12 @@ def test_save_single_and_load_csv(self): df_eq(c, [["1.1", "60.1"], ["7.1", "20.1"]], "a:str,c:str", throw=True) def test_save_single_and_load_csv_no_header(self): - e = self.engine b = ArrayDataFrame([[6.1, 1.1], [2.1, 7.1]], "c:double,a:double") path = os.path.join(self.tmpdir, "a", "b") - e.fs.makedirs(path, recreate=True) + makedirs(path, exist_ok=True) # over write folder with single file fa.save(b, path, format_hint="csv", header=False, force_single=True) - assert e.fs.isfile(path) + assert isfile(path) with raises(ValueError): c = fa.load( @@ -1190,7 +1186,7 @@ def test_load_csv_folder(self): header=True, engine=native, ) - FileSystem().touch(os.path.join(path, "_SUCCESS")) + touch(os.path.join(path, "_SUCCESS")) c = fa.load( path, format_hint="csv", @@ -1204,13 +1200,12 @@ def test_load_csv_folder(self): ) def test_save_single_and_load_json(self): - e = self.engine b = ArrayDataFrame([[6, 1], [2, 7]], "c:int,a:long") path = os.path.join(self.tmpdir, "a", "b") - e.fs.makedirs(path, recreate=True) + makedirs(path, exist_ok=True) # over write folder with single file fa.save(b, path, format_hint="json", force_single=True) - assert e.fs.isfile(path) + assert isfile(path) c = fa.load(path, format_hint="json", columns=["a", "c"], as_fugue=True) df_eq(c, [[1, 6], [7, 2]], "a:long,c:long", throw=True) @@ -1241,7 +1236,7 @@ def 
test_load_json_folder(self): path = os.path.join(self.tmpdir, "a", "b") fa.save(a, os.path.join(path, "a.json"), format_hint="json", engine=native) fa.save(b, os.path.join(path, "b.json"), format_hint="json", engine=native) - FileSystem().touch(os.path.join(path, "_SUCCESS")) + touch(os.path.join(path, "_SUCCESS")) c = fa.load(path, format_hint="json", columns=["a", "c"], as_fugue=True) df_eq(c, [[1, 6], [7, 2], [8, 4], [4, 3]], "a:long,c:long", throw=True) diff --git a/fugue_test/plugins/misc/__init__.py b/fugue_test/plugins/misc/__init__.py new file mode 100644 index 00000000..9407d26a --- /dev/null +++ b/fugue_test/plugins/misc/__init__.py @@ -0,0 +1,2 @@ +# flake8: noqa +from .fixtures import tmp_mem_dir diff --git a/fugue_test/plugins/misc/fixtures.py b/fugue_test/plugins/misc/fixtures.py new file mode 100644 index 00000000..6ca80012 --- /dev/null +++ b/fugue_test/plugins/misc/fixtures.py @@ -0,0 +1,18 @@ +import uuid + +import pytest +from triad.utils.io import makedirs, rm + + +@pytest.fixture +def tmp_mem_dir(): + uuid_str = str(uuid.uuid4())[:5] + path = "memory://test_" + uuid_str + makedirs(path) + try: + yield path + finally: + try: + rm(path, recursive=True) + except Exception: # pragma: no cover + pass diff --git a/requirements.txt b/requirements.txt index ff04ff56..b3eaef95 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,6 +25,8 @@ seaborn notebook<7 jupyter_contrib_nbextensions +s3fs + pyspark[connect] duckdb-engine>=0.6.4 sqlalchemy==2.0.10 # 2.0.11 has a bug diff --git a/setup.py b/setup.py index a4d81db0..1e5425df 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ def get_version() -> str: keywords="distributed spark dask ray sql dsl domain specific language", url="http://github.com/fugue-project/fugue", install_requires=[ - "triad==0.9.2.dev5", + "triad==0.9.2.dev8", "adagio>=0.2.4", # sql dependencies "qpd>=0.4.4", @@ -109,6 +109,7 @@ def get_version() -> str: "fugue_test_dask = fugue_test.plugins.dask[dask]", "fugue_test_ray = fugue_test.plugins.ray[ray]", "fugue_test_duckdb = fugue_test.plugins.duckdb[duckdb]", + "fugue_test_misc = fugue_test.plugins.misc", ], }, ) diff --git a/tests/fugue/dataframe/test_utils.py b/tests/fugue/dataframe/test_utils.py index a23f32e6..00c36d1e 100644 --- a/tests/fugue/dataframe/test_utils.py +++ b/tests/fugue/dataframe/test_utils.py @@ -5,7 +5,7 @@ import pandas as pd import pyarrow as pa from pytest import raises -from triad import FileSystem, Schema +from triad import Schema from triad.collections.schema import SchemaError from triad.exceptions import InvalidOperationError, NoneArgumentError @@ -112,7 +112,6 @@ def assert_eq(df, df_expected=None, raw=False): else: df_eq(df_expected, df_actual, throw=True) - fs = FileSystem() assert deserialize_df(serialize_df(None)) is None assert_eq(ArrayDataFrame([], "a:int,b:int")) assert_eq(ArrayDataFrame([[None, None]], "a:int,b:int")) @@ -136,8 +135,7 @@ def assert_eq(df, df_expected=None, raw=False): path = os.path.join(tmpdir, "1.pkl") df = ArrayDataFrame([[None, None]], "a:int,b:int") - s = serialize_df(df, 0, path, fs) - df_eq(df, deserialize_df(s, fs), throw=True) + s = serialize_df(df, 0, path) df_eq(df, deserialize_df(s), throw=True) s = serialize_df(df, 0, path) diff --git a/tests/fugue/utils/test_io.py b/tests/fugue/utils/test_io.py index e8d157dd..a99a9aff 100644 --- a/tests/fugue/utils/test_io.py +++ b/tests/fugue/utils/test_io.py @@ -1,64 +1,87 @@ import os +import sys +import pytest +from pytest import raises +from triad.utils.io import makedirs, read_text, touch, 
exists + +from fugue._utils.io import _FORMAT_MAP, FileParser, load_df, save_df from fugue.dataframe.array_dataframe import ArrayDataFrame from fugue.dataframe.pandas_dataframe import PandasDataFrame from fugue.dataframe.utils import _df_eq as df_eq -from fugue._utils.io import FileParser, load_df, save_df, _FORMAT_MAP -from fugue.exceptions import FugueDataFrameOperationError -from pytest import raises -from triad.collections.fs import FileSystem -from triad.exceptions import InvalidOperationError -def test_file_parser(): - f = FileParser("c.parquet") - assert "c.parquet" == f.uri - assert "c.parquet" == f.uri_with_glob - assert "" == f.scheme - assert "c.parquet" == f.path +@pytest.mark.skipif(sys.platform.startswith("win"), reason="not a test for windows") +def test_file_parser_linux(): + f = FileParser("/a/b/c.parquet") + assert "/a/b/c.parquet" == f.path + assert not f.has_glob assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "" == f.glob_pattern - assert "." == f.parent + assert "file:///a/b" == f.parent - f = FileParser("/a/b/c.parquet") - assert "/a/b/c.parquet" == f.uri - assert "/a/b/c.parquet" == f.uri_with_glob - assert "" == f.scheme - assert "/a/b/c.parquet" == f.path + +@pytest.mark.skipif( + not sys.platform.startswith("win"), reason="a test only for windows" +) +def test_file_parser_win(): + f = FileParser("c:\\a\\c.parquet") + assert "c:\\a\\c.parquet" == f.path + assert ".parquet" == f.suffix + assert "parquet" == f.file_format + assert not f.has_glob + assert "file://c:/a" == f.parent + + f = FileParser("c:\\a\\*.parquet") + assert "c:\\a\\*.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "" == f.glob_pattern - assert "/a/b" == f.parent + assert f.has_glob + assert "file://c:/a" == f.parent + + +def test_file_parser(tmpdir): + f = FileParser("c.parquet") + assert "c.parquet" == f.raw_path + assert ".parquet" == f.suffix + assert "parquet" == f.file_format + # assert "." 
== f.parent + + tp = os.path.join(str(tmpdir), "a", "b") + f = FileParser(os.path.join(tp, "c.parquet")) + assert not exists(tp) + f.make_parent_dirs() + assert exists(tp) + f.make_parent_dirs() + assert exists(tp) + + f = FileParser("memory://c.parquet") + assert "memory://c.parquet" == f.raw_path + assert "memory:///c.parquet" == f.path + assert ".parquet" == f.suffix + assert "parquet" == f.file_format + assert "memory:///" == f.parent for k, v in _FORMAT_MAP.items(): f = FileParser(f"s3://a/b/c{k}") - assert f"s3://a/b/c{k}" == f.uri - assert "s3" == f.scheme - assert f"/b/c{k}" == f.path + assert f"s3://a/b/c{k}" == f.raw_path assert k == f.suffix assert v == f.file_format assert "s3://a/b" == f.parent f = FileParser("s3://a/b/c.test.parquet") - assert "s3://a/b/c.test.parquet" == f.uri - assert "s3" == f.scheme - assert "/b/c.test.parquet" == f.path + assert "s3://a/b/c.test.parquet" == f.raw_path assert ".test.parquet" == f.suffix assert "parquet" == f.file_format assert "s3://a/b" == f.parent f = FileParser("s3://a/b/c.ppp.gz", "csv") - assert "s3://a/b/c.ppp.gz" == f.uri - assert "s3" == f.scheme - assert "/b/c.ppp.gz" == f.path + assert "s3://a/b/c.ppp.gz" == f.raw_path assert ".ppp.gz" == f.suffix assert "csv" == f.file_format f = FileParser("s3://a/b/c", "csv") - assert "s3://a/b/c" == f.uri - assert "s3" == f.scheme - assert "/b/c" == f.path + assert "s3://a/b/c" == f.raw_path assert "" == f.suffix assert "csv" == f.file_format @@ -67,48 +90,42 @@ def test_file_parser(): raises(NotImplementedError, lambda: FileParser("s3://a/b/c")) -def test_file_parser_glob(): +@pytest.mark.skipif(sys.platform.startswith("win"), reason="not a test for windows") +def test_file_parser_glob_linux(): f = FileParser("/a/b/*.parquet") - assert "/a/b" == f.uri - assert "" == f.scheme assert "/a/b/*.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "*.parquet" == f.glob_pattern - assert "/a/b/*.parquet" == f.uri_with_glob + assert f.has_glob f = FileParser("/a/b/*123.parquet") - assert "/a/b" == f.uri - assert "" == f.scheme assert "/a/b/*123.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "*123.parquet" == f.glob_pattern - assert "/a/b/*123.parquet" == f.uri_with_glob + assert f.has_glob + +def test_file_parser_glob(): f = FileParser("s3://a/b/*.parquet") - assert "s3://a/b" == f.uri - assert "s3" == f.scheme - assert "/b/*.parquet" == f.path + assert "s3://a/b/*.parquet" == f.path assert ".parquet" == f.suffix assert "parquet" == f.file_format - assert "*.parquet" == f.glob_pattern - assert "s3://a/b/*.parquet" == f.uri_with_glob + assert f.has_glob - ff = FileParser("s3://a/b", "parquet").with_glob("*.csv", "csv") - assert "s3://a/b/*.csv" == ff.uri_with_glob + ff = FileParser("s3://a/b", "parquet").join("*.csv", "csv") + assert "s3://a/b/*.csv" == ff.path assert "csv" == ff.file_format - ff = FileParser("s3://a/b/", "csv").with_glob("*.csv") - assert "s3://a/b/*.csv" == ff.uri_with_glob + ff = FileParser("s3://a/b/", "csv").join("*.csv") + assert "s3://a/b/*.csv" == ff.path assert "csv" == ff.file_format - ff = FileParser("s3://a/b/*.parquet").with_glob("*.csv") - assert "s3://a/b/*.csv" == ff.uri_with_glob + ff = FileParser("s3://a/b/*.parquet").join("*.csv") + assert "s3://a/b/*.csv" == ff.path assert "csv" == ff.file_format - ff = FileParser("s3://a/b/*.parquet", "parquet").with_glob("*.csv") - assert "s3://a/b/*.csv" == ff.uri_with_glob + ff = FileParser("s3://a/b/*.parquet", "parquet").join("*.csv") + 
assert "s3://a/b/*.csv" == ff.path assert "parquet" == ff.file_format - ff = FileParser("s3://a/b/*.parquet", "parquet").with_glob("*.csv", "csv") - assert "s3://a/b/*.csv" == ff.uri_with_glob + ff = FileParser("s3://a/b/*.parquet", "parquet").join("*.csv", "csv") + assert "s3://a/b/*.csv" == ff.path assert "csv" == ff.file_format @@ -132,14 +149,13 @@ def test_parquet_io(tmpdir): raises(Exception, lambda: load_df(path, columns="bb:str,a:int")) # load directory - fs = FileSystem() for name in ["folder.parquet", "folder"]: folder = os.path.join(tmpdir, name) - fs.makedirs(folder) + makedirs(folder) f0 = os.path.join(folder, "_SUCCESS") f1 = os.path.join(folder, "1.parquet") f2 = os.path.join(folder, "3.parquet") - fs.touch(f0) + touch(f0) save_df(df1, f1) save_df(df1, f2) @@ -178,12 +194,11 @@ def test_parquet_io(tmpdir): def test_csv_io(tmpdir): - fs = FileSystem() df1 = PandasDataFrame([["1", 2, 3]], "a:str,b:int,c:long") path = os.path.join(tmpdir, "a.csv") # without header save_df(df1, path) - assert fs.readtext(path).startswith("1,2,3") + assert read_text(path).startswith("1,2,3") raises(ValueError, lambda: load_df(path, header=False)) actual = load_df(path, columns=["a", "b", "c"], header=False, infer_schema=True) assert [[1, 2, 3]] == actual.as_array() @@ -193,7 +208,7 @@ def test_csv_io(tmpdir): assert actual.schema == "a:double,b:str,c:str" # with header save_df(df1, path, header=True) - assert fs.readtext(path).startswith("a,b,c") + assert read_text(path).startswith("a,b,c") actual = load_df(path, header=True) assert [["1", "2", "3"]] == actual.as_array() actual = load_df(path, header=True, infer_schema=True) @@ -210,7 +225,6 @@ def test_csv_io(tmpdir): def test_json(tmpdir): - fs = FileSystem() df1 = PandasDataFrame([["1", 2, 3]], "a:str,b:int,c:long") path = os.path.join(tmpdir, "a.json") save_df(df1, path) diff --git a/tests/fugue/workflow/test_workflow_determinism.py b/tests/fugue/workflow/test_workflow_determinism.py index cc3f0483..a2645b32 100644 --- a/tests/fugue/workflow/test_workflow_determinism.py +++ b/tests/fugue/workflow/test_workflow_determinism.py @@ -1,6 +1,6 @@ from typing import Any, Dict, Iterable, List -from fugue import FileSystem, FugueWorkflow, Schema +from fugue import FugueWorkflow, Schema from fugue.execution.native_execution_engine import NativeExecutionEngine from triad import to_uuid diff --git a/tests/fugue_dask/test_io.py b/tests/fugue_dask/test_io.py index 7cca8c78..3e063bbc 100644 --- a/tests/fugue_dask/test_io.py +++ b/tests/fugue_dask/test_io.py @@ -1,8 +1,7 @@ import os from pytest import mark, raises -from triad.collections.fs import FileSystem -from triad.exceptions import InvalidOperationError +from triad.utils.io import makedirs, touch from fugue._utils.io import save_df as pd_save_df from fugue.dataframe.utils import _df_eq as df_eq @@ -30,14 +29,13 @@ def test_parquet_io(tmpdir, fugue_dask_client): raises(Exception, lambda: load_df(path, columns="bb:str,a:int")) # load directory - fs = FileSystem() for name in ["folder.parquet", "folder"]: folder = os.path.join(tmpdir, name) - fs.makedirs(folder) + makedirs(folder) f0 = os.path.join(folder, "_SUCCESS") f1 = os.path.join(folder, "1.parquet") f2 = os.path.join(folder, "3.parquet") - fs.touch(f0) + touch(f0) pd_save_df(df1, f1) pd_save_df(df1, f2) diff --git a/tests/fugue_ray/test_execution_engine.py b/tests/fugue_ray/test_execution_engine.py index ba55cdde..9f45f135 100644 --- a/tests/fugue_ray/test_execution_engine.py +++ b/tests/fugue_ray/test_execution_engine.py @@ -4,7 +4,7 @@ import 
pandas as pd import pytest import ray.data as rd -from triad import FileSystem +from triad.utils.io import isfile import fugue.api as fa from fugue import ArrayDataFrame, DataFrame, FugueWorkflow, transform @@ -189,7 +189,7 @@ def make_engine(self): connection=self._con, ) return e - + def test_datetime_in_workflow(self): pass @@ -223,7 +223,7 @@ def test_io(self): b.partition(num=3).save(path, fmt="parquet", single=True) b.save(path2, header=True) dag.run(self.engine) - assert FileSystem().isfile(path) + assert isfile(path) with FugueWorkflow() as dag: a = dag.load(path, fmt="parquet", columns=["a", "c"]) a.assert_eq(dag.df([[1, 6], [7, 2]], "a:long,c:int")) @@ -236,9 +236,9 @@ def test_io(self): # with FugueWorkflow() as dag: # b = dag.df([[6, 1], [2, 7]], "c:int,a:long") # b.partition(by="c").save(path3, fmt="parquet", single=False) - # assert FileSystem().isdir(path3) - # assert FileSystem().isdir(os.path.join(path3, "c=6")) - # assert FileSystem().isdir(os.path.join(path3, "c=2")) + # assert isdir(path3) + # assert isdir(os.path.join(path3, "c=6")) + # assert isdir(os.path.join(path3, "c=2")) # # TODO: in test below, once issue #288 is fixed, use dag.load # # instead of pd.read_parquet # pd.testing.assert_frame_equal( diff --git a/tests/fugue_spark/utils/test_io.py b/tests/fugue_spark/utils/test_io.py index 2c6356e0..c0532139 100644 --- a/tests/fugue_spark/utils/test_io.py +++ b/tests/fugue_spark/utils/test_io.py @@ -1,20 +1,20 @@ import os +from pyspark.sql import SparkSession +from pyspark.sql.utils import AnalysisException +from pytest import raises +from triad.utils.io import isfile, makedirs, touch + from fugue.collections.partition import PartitionSpec from fugue.dataframe.pandas_dataframe import PandasDataFrame from fugue.dataframe.utils import _df_eq as df_eq -from fugue_spark.dataframe import SparkDataFrame from fugue_spark._utils.convert import to_schema, to_spark_schema from fugue_spark._utils.io import SparkIO -from pyspark.sql import SparkSession -from pyspark.sql.utils import AnalysisException -from pytest import raises -from triad.collections.fs import FileSystem -from triad.exceptions import InvalidOperationError +from fugue_spark.dataframe import SparkDataFrame def test_parquet_io(tmpdir, spark_session): - si = SparkIO(spark_session, FileSystem()) + si = SparkIO(spark_session) df1 = _df([["1", 2, 3]], "a:str,b:int,c:long") df2 = _df([[[1, 2]]], "a:[int]") # {a:int} will become {a:long} because pyarrow lib has issue @@ -33,16 +33,15 @@ def test_parquet_io(tmpdir, spark_session): raises(Exception, lambda: si.load_df(path, columns="bb:str,a:int")) # load directory - fs = FileSystem() folder = os.path.join(tmpdir, "folder") - fs.makedirs(folder) + makedirs(folder) f0 = os.path.join(folder, "_SUCCESS") f1 = os.path.join(folder, "1.parquet") f2 = os.path.join(folder, "3.parquet") - fs.touch(f0) + touch(f0) si.save_df(df1, f1, force_single=True) si.save_df(df1, f2, force_single=True) - assert fs.isfile(f1) + assert isfile(f1) actual = si.load_df(folder, "parquet") df_eq(actual, [["1", 2, 3], ["1", 2, 3]], "a:str,b:int,c:long") @@ -61,8 +60,7 @@ def test_parquet_io(tmpdir, spark_session): def test_csv_io(tmpdir, spark_session): - fs = FileSystem() - si = SparkIO(spark_session, fs) + si = SparkIO(spark_session) df1 = _df([["1", 2, 3]], "a:str,b:int,c:long") path = os.path.join(tmpdir, "a.csv") # without header @@ -91,8 +89,7 @@ def test_csv_io(tmpdir, spark_session): def test_json_io(tmpdir, spark_session): - fs = FileSystem() - si = SparkIO(spark_session, fs) + si = 
SparkIO(spark_session) df1 = _df([["1", 2, 3]], "a:str,b:int,c:long") path = os.path.join(tmpdir, "a.json") si.save_df(df1, path) @@ -106,7 +103,7 @@ def test_json_io(tmpdir, spark_session): def test_save_with_partition(tmpdir, spark_session): - si = SparkIO(spark_session, FileSystem()) + si = SparkIO(spark_session) df1 = _df([["1", 2, 3]], "a:str,b:int,c:long") path = os.path.join(tmpdir, "a.parquet") si.save_df(df1, path, partition_spec=PartitionSpec(num=2))
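
The test changes above all apply the same substitution: the `triad.collections.fs.FileSystem` object is gone, the suites call the `triad.utils.io` helpers (`makedirs`, `touch`, `read_text`, `isfile`, `exists`, `join`) as plain functions, and `FileParser` exposes `raw_path`, `path`, `has_glob` and `join` in place of `uri`, `scheme`, `glob_pattern` and `with_glob`. A minimal sketch of the new usage, assuming only the helper signatures exercised in these tests and an environment like the test setup (e.g. s3fs available for the s3 URLs); the scratch directory is illustrative only:

import os
import tempfile

from triad.utils.io import exists, isfile, makedirs, touch

from fugue._utils.io import FileParser

# Filesystem helpers are now free functions instead of FileSystem() methods.
base = tempfile.mkdtemp()                      # illustrative scratch dir only
folder = os.path.join(base, "folder.parquet")
makedirs(folder)                               # was: FileSystem().makedirs(folder)
touch(os.path.join(folder, "_SUCCESS"))        # was: FileSystem().touch(f0)
assert exists(folder)
assert isfile(os.path.join(folder, "_SUCCESS"))

# FileParser: raw_path / path / has_glob / join replace uri / scheme /
# glob_pattern / with_glob, matching the assertions in the tests above.
fp = FileParser("s3://a/b/*.parquet")
assert fp.has_glob
assert fp.file_format == "parquet"
assert fp.join("*.csv", "csv").path == "s3://a/b/*.csv"

The same simplification shows up in the Spark suite: `SparkIO(spark_session)` no longer takes a `FileSystem` argument.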