Skip to content

Commit

Permalink
Replace fs with fsspec (#523)
Browse files Browse the repository at this point in the history
* Replace fs with fsspec

* update

* update

* update

* update

* update

* update

* update

* update

* update docs

* update
  • Loading branch information
goodwanghan authored Nov 5, 2023
1 parent 83f2b2b commit a0cc460
Show file tree
Hide file tree
Showing 28 changed files with 341 additions and 388 deletions.
6 changes: 5 additions & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{
"name": "Fugue Development Environment",
"_image": "fugueproject/devenv:0.7.7",
"image": "mcr.microsoft.com/vscode/devcontainers/python:3.10",
"customizations": {
"vscode": {
"settings": {
"terminal.integrated.shell.linux": "/bin/bash",
"python.pythonPath": "/usr/local/bin/python",
"python.defaultInterpreterPath": "/usr/local/bin/python",
"editor.defaultFormatter": "ms-python.black-formatter",
"isort.interpreter": [
"/usr/local/bin/python"
],
Expand All @@ -16,6 +16,9 @@
],
"pylint.interpreter": [
"/usr/local/bin/python"
],
"black-formatter.interpreter": [
"/usr/local/bin/python"
]
},
"extensions": [
Expand All @@ -24,6 +27,7 @@
"ms-python.flake8",
"ms-python.pylint",
"ms-python.mypy",
"ms-python.black-formatter",
"GitHub.copilot",
"njpwerner.autodocstring"
]
Expand Down
4 changes: 4 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## 0.8.7

- [488](https://github.com/fugue-project/fugue/issues/488) Migrate from fs to fsspec
- [521](https://github.com/fugue-project/fugue/issues/521) Add `as_dicts` to Fugue API
- [516](https://github.com/fugue-project/fugue/issues/516) Use `_collect_as_arrow` for Spark `as_arrow`
- [520](https://github.com/fugue-project/fugue/pull/520) Add Python 3.10 to Windows Tests
- [506](https://github.com/fugue-project/fugue/issues/506) Adopt pandas `ExtensionDtype`
- [504](https://github.com/fugue-project/fugue/issues/504) Create Fugue pytest fixtures
- [503](https://github.com/fugue-project/fugue/issues/503) Deprecate Python 3.7 support
Expand Down
1 change: 0 additions & 1 deletion fugue/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# flake8: noqa
from triad.collections import Schema
from triad.collections.fs import FileSystem

from fugue.api import out_transform, transform
from fugue.bag.array_bag import ArrayBag
Expand Down
173 changes: 84 additions & 89 deletions fugue/_utils/io.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,29 @@
import os
import pathlib
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from urllib.parse import urlparse

import fs as pfs
import pandas as pd
from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem
from triad.collections.dict import ParamDict
from triad.collections.fs import FileSystem
from triad.collections.schema import Schema
from triad.utils.assertion import assert_or_throw
from triad.utils.io import join, url_to_fs
from triad.utils.pandas_like import PD_UTILS

from fugue.dataframe import LocalBoundedDataFrame, LocalDataFrame, PandasDataFrame


class FileParser(object):
def __init__(self, path: str, format_hint: Optional[str] = None):
last = len(path)
has_glob = False
self._orig_format_hint = format_hint
for i in range(len(path)):
if path[i] in ["/", "\\"]:
last = i
if path[i] in ["*", "?"]:
has_glob = True
break
if not has_glob:
self._uri = urlparse(path)
self._glob_pattern = ""
self._path = self._uri.path
self._has_glob = "*" in path or "?" in path
self._raw_path = path
self._fs, self._fs_path = url_to_fs(path)
if not self.is_local:
self._path = self._fs.unstrip_protocol(self._fs_path)
else:
self._uri = urlparse(path[:last])
self._glob_pattern = path[last + 1 :]
self._path = pfs.path.combine(self._uri.path, self._glob_pattern)
self._path = os.path.abspath(self._fs._strip_protocol(path))

if format_hint is None or format_hint == "":
for k, v in _FORMAT_MAP.items():
Expand All @@ -48,56 +39,64 @@ def __init__(self, path: str, format_hint: Optional[str] = None):
self._format = format_hint

def assert_no_glob(self) -> "FileParser":
assert_or_throw(self.glob_pattern == "", f"{self.path} has glob pattern")
assert_or_throw(not self.has_glob, f"{self.raw_path} has glob pattern")
return self

def with_glob(self, glob: str, format_hint: Optional[str] = None) -> "FileParser":
uri = self.uri
if glob != "":
uri = pfs.path.combine(uri, glob)
return FileParser(uri, format_hint or self._orig_format_hint)

@property
def glob_pattern(self) -> str:
return self._glob_pattern
def has_glob(self):
return self._has_glob

@property
def uri(self) -> str:
return self._uri.geturl()
def is_local(self):
return isinstance(self._fs, LocalFileSystem)

@property
def uri_with_glob(self) -> str:
if self.glob_pattern == "":
return self.uri
return pfs.path.combine(self.uri, self.glob_pattern)
def join(self, path: str, format_hint: Optional[str] = None) -> "FileParser":
if not self.has_glob:
_path = join(self.path, path)
else:
_path = join(self.parent, path)
return FileParser(_path, format_hint or self._orig_format_hint)

@property
def parent(self) -> str:
dn = os.path.dirname(self.uri)
return dn if dn != "" else "."

@property
def scheme(self) -> str:
return self._uri.scheme
return self._fs.unstrip_protocol(self._fs._parent(self._fs_path))

# Path string this parser hands to readers/writers; simply returns the `_path`
# attribute assigned in `__init__`.
# NOTE(review): in the `__init__` lines visible here, `_path` appears to be the
# protocol-qualified path for non-local filesystems and an absolute OS path for
# local ones. This view mixes removed and added diff lines, so confirm against
# the checked-in file.
@property
def path(self) -> str:
return self._path

# The path string as it was passed to `__init__`, which stores it unmodified
# in `_raw_path`.
@property
def raw_path(self) -> str:
return self._raw_path

@property
def suffix(self) -> str:
return "".join(pathlib.Path(self.path.lower()).suffixes)
return "".join(pathlib.Path(self.raw_path.lower()).suffixes)

# File format name stored as `_format` by `__init__`.
# NOTE(review): a non-empty `format_hint` appears to be used as-is; otherwise
# the format is presumably inferred via `_FORMAT_MAP`. That part of `__init__`
# is truncated in this view, so confirm against the full file.
@property
def file_format(self) -> str:
return self._format

def make_parent_dirs(self) -> None:
    """Create the directory that contains this path on its filesystem.

    The parent location is derived from the filesystem-relative path, and
    ``exist_ok=True`` is passed so an already existing directory is not
    treated as an error.
    """
    filesystem = self._fs
    parent_dir = filesystem._parent(self._fs_path)
    filesystem.makedirs(parent_dir, exist_ok=True)

def find_all(self) -> Iterable["FileParser"]:
    """Expand this parser into parsers for concrete, glob-free paths.

    When the path has no glob pattern, this parser itself is the only item
    yielded. Otherwise each match returned by the filesystem's ``glob`` is
    wrapped in a new ``FileParser``.

    The original format hint is forwarded to the parsers built for the
    matches, the same way ``join`` forwards it. Without this, an explicitly
    requested format is dropped for matched files, and files whose
    extension does not identify the format cannot be parsed.

    :return: a lazily generated iterable of parsers
    """
    if not self.has_glob:
        yield self
        return
    filesystem = self._fs
    for matched in filesystem.glob(self._fs_path):
        # glob returns protocol-stripped paths; restore the protocol so the
        # new parser resolves to the same filesystem.
        yield FileParser(
            filesystem.unstrip_protocol(matched), self._orig_format_hint
        )

def open(self, *args: Any, **kwargs: Any) -> Any:
    """Open the single file this parser points to on its filesystem.

    ``assert_no_glob`` is called first, so a path containing a glob pattern
    is rejected before any file is opened. All positional and keyword
    arguments are forwarded unchanged to the filesystem's ``open``.

    :return: whatever the filesystem's ``open`` returns
    """
    self.assert_no_glob()
    filesystem, target = self._fs, self._fs_path
    return filesystem.open(target, *args, **kwargs)


def load_df(
uri: Union[str, List[str]],
format_hint: Optional[str] = None,
columns: Any = None,
fs: Optional[FileSystem] = None,
fs: Optional[AbstractFileSystem] = None,
**kwargs: Any,
) -> LocalBoundedDataFrame:
if isinstance(uri, str):
Expand All @@ -117,48 +116,36 @@ def save_df(
uri: str,
format_hint: Optional[str] = None,
mode: str = "overwrite",
fs: Optional[FileSystem] = None,
fs: Optional[AbstractFileSystem] = None,
**kwargs: Any,
) -> None:
assert_or_throw(
mode in ["overwrite", "error"], NotImplementedError(f"{mode} is not supported")
)
p = FileParser(uri, format_hint).assert_no_glob()
if fs is None:
fs = FileSystem()
fs, _ = url_to_fs(uri)
if fs.exists(uri):
assert_or_throw(mode == "overwrite", FileExistsError(uri))
try:
fs.remove(uri)
except Exception:
try:
fs.removetree(uri)
except Exception: # pragma: no cover
pass
fs.rm(uri, recursive=True)
except Exception: # pragma: no cover
pass
_FORMAT_SAVE[p.file_format](df, p, **kwargs)


def _get_single_files(
fp: Iterable[FileParser], fs: Optional[FileSystem]
fp: Iterable[FileParser], fs: Optional[AbstractFileSystem]
) -> Iterable[FileParser]:
if fs is None:
fs = FileSystem()
for f in fp:
if f.glob_pattern != "":
files = [
FileParser(pfs.path.combine(f.uri, pfs.path.basename(x.path)))
for x in fs.opendir(f.uri).glob(f.glob_pattern)
]
yield from _get_single_files(files, fs)
else:
yield f
yield from f.find_all()


def _save_parquet(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None:
PD_UTILS.to_parquet_friendly(
df.as_pandas(), partition_cols=kwargs.get("partition_cols", [])
).to_parquet(
p.uri,
p.assert_no_glob().path,
**{
"engine": "pyarrow",
"schema": df.schema.pa_schema,
Expand All @@ -171,34 +158,36 @@ def _load_parquet(
p: FileParser, columns: Any = None, **kwargs: Any
) -> Tuple[pd.DataFrame, Any]:
if columns is None:
pdf = pd.read_parquet(p.uri, **{"engine": "pyarrow", **kwargs})
pdf = pd.read_parquet(p.path, **{"engine": "pyarrow", **kwargs})
return pdf, None
if isinstance(columns, list): # column names
pdf = pd.read_parquet(p.uri, columns=columns, **{"engine": "pyarrow", **kwargs})
pdf = pd.read_parquet(
p.path, columns=columns, **{"engine": "pyarrow", **kwargs}
)
return pdf, None
schema = Schema(columns)
pdf = pd.read_parquet(
p.uri, columns=schema.names, **{"engine": "pyarrow", **kwargs}
p.path, columns=schema.names, **{"engine": "pyarrow", **kwargs}
)
return pdf, schema


def _save_csv(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None:
df.as_pandas().to_csv(p.uri, **{"index": False, "header": False, **kwargs})
with p.open("w") as f:
df.as_pandas().to_csv(f, **{"index": False, "header": False, **kwargs})


def _safe_load_csv(path: str, **kwargs: Any) -> pd.DataFrame:
def _safe_load_csv(p: FileParser, **kwargs: Any) -> pd.DataFrame:
def load_dir() -> pd.DataFrame:
fs = FileSystem()
return pd.concat(
[
pd.read_csv(pfs.path.combine(path, pfs.path.basename(x.path)), **kwargs)
for x in fs.opendir(path).glob("*.csv")
]
)
dfs: List[pd.DataFrame] = []
for _p in p.join("*.csv").find_all(): # type: ignore
with _p.open("r") as f:
dfs.append(pd.read_csv(f, **kwargs))
return pd.concat(dfs)

try:
return pd.read_csv(path, **kwargs)
with p.open("r") as f:
return pd.read_csv(f, **kwargs)
except IsADirectoryError:
return load_dir()
except pd.errors.ParserError: # pragma: no cover
Expand All @@ -224,7 +213,7 @@ def _load_csv( # noqa: C901
header = kw["header"]
del kw["header"]
if str(header) in ["True", "0"]:
pdf = _safe_load_csv(p.uri, **{"index_col": False, "header": 0, **kw})
pdf = _safe_load_csv(p, **{"index_col": False, "header": 0, **kw})
if columns is None:
return pdf, None
if isinstance(columns, list): # column names
Expand All @@ -236,40 +225,46 @@ def _load_csv( # noqa: C901
raise ValueError("columns must be set if without header")
if isinstance(columns, list): # column names
pdf = _safe_load_csv(
p.uri, **{"index_col": False, "header": None, "names": columns, **kw}
p,
**{"index_col": False, "header": None, "names": columns, **kw},
)
return pdf, None
schema = Schema(columns)
pdf = _safe_load_csv(
p.uri, **{"index_col": False, "header": None, "names": schema.names, **kw}
p,
**{"index_col": False, "header": None, "names": schema.names, **kw},
)
return pdf, schema
else:
raise NotImplementedError(f"{header} is not supported")


def _save_json(df: LocalDataFrame, p: FileParser, **kwargs: Any) -> None:
df.as_pandas().to_json(p.uri, **{"orient": "records", "lines": True, **kwargs})
with p.open("w") as f:
df.as_pandas().to_json(f, **{"orient": "records", "lines": True, **kwargs})


def _safe_load_json(path: str, **kwargs: Any) -> pd.DataFrame:
def _safe_load_json(p: FileParser, **kwargs: Any) -> pd.DataFrame:
kw = {"orient": "records", "lines": True, **kwargs}

def load_dir() -> pd.DataFrame:
dfs: List[pd.DataFrame] = []
for _p in p.join("*.json").find_all(): # type: ignore
with _p.open("r") as f:
dfs.append(pd.read_json(f, **kw))
return pd.concat(dfs)

try:
return pd.read_json(path, **kw)
with p.open("r") as f:
return pd.read_json(f, **kw)
except (IsADirectoryError, PermissionError):
fs = FileSystem()
return pd.concat(
[
pd.read_json(pfs.path.combine(path, pfs.path.basename(x.path)), **kw)
for x in fs.opendir(path).glob("*.json")
]
)
return load_dir()


def _load_json(
p: FileParser, columns: Any = None, **kwargs: Any
) -> Tuple[pd.DataFrame, Any]:
pdf = _safe_load_json(p.uri, **kwargs).reset_index(drop=True)
pdf = _safe_load_json(p, **kwargs).reset_index(drop=True)
if columns is None:
return pdf, None
if isinstance(columns, list): # column names
Expand Down
Loading

0 comments on commit a0cc460

Please sign in to comment.