Add support for multilabel-classification datasets #11 #30

Merged · 4 commits · Jan 13, 2021
16 changes: 8 additions & 8 deletions README.md
@@ -57,7 +57,7 @@ generator:

```python
import gbif_dl
-url_generator = gbif_dl.api.generate_urls(
+data_generator = gbif_dl.api.generate_urls(
queries=queries,
label="speciesKey",
)
@@ -69,7 +69,7 @@ necessarily have to be part of the query attributes. The `label` is later used t
Iterating over the generator now yields the media data, returning a few thousand URLs.

```python
-for i in url_generator:
+for i in data_generator:
print(i)
```
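
For orientation, each yielded item is a small dict with the media URL, a hashed basename and the label (see the generator code further down in this PR). The values below are purely illustrative:

```python
# Illustrative sketch only: field names follow the generators in this PR,
# the actual url, basename hash and label depend on the query.
item = {
    "url": "https://example.org/occurrence_image.jpg",  # media identifier from GBIF
    "basename": "1b2a9c...",                            # hash of the url, used as the filename
    "label": "5231190",                                 # value of the requested key, e.g. speciesKey
}
```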

@@ -103,7 +103,7 @@ Very often users won't be using all media downloads from a given query since thi
In the following example, we will receive a balanced dataset assembled from `3 species * 2 datasets = 6 streams` and only get the minimum number of total samples from all 6 streams:

```python
-url_generator = gbif_dl.api.generate_urls(
+data_generator = gbif_dl.api.generate_urls(
queries=queries,
label="speciesKey",
nb_samples=-1,
@@ -119,7 +119,7 @@ For other, more advanced, use-cases users can add more constraints:
The following dataset consists of exactly 1000 samples, for which the distribution of `speciesKey` is maintained from the full query of all samples. Furthermore, we only allow a maximum of 800 samples per species.

```python
-url_generator = gbifmediads.api.generate_urls(
+data_generator = gbifmediads.api.generate_urls(
queries=queries,
label="speciesKey",
nb_samples=1000,
@@ -135,10 +135,10 @@ A url generator can also be created from a GBIF download link given a registered

* `dwca_root_path`: Sets the root path where the DWCA zip files are stored. Defaults to None, which results in the creation of a temporary directory. If the path and DWCA archive already exist, it will not be downloaded again.

-The following example creates a url_generator with the the same output class label as in the example above.
+The following example creates a data_generator with the same output class label as in the example above.

```python
-url_generator = gbif_dl.dwca.generate_urls(
+data_generator = gbif_dl.dwca.generate_urls(
"10.15468/dl.vnm42s", dwca_root_path="dwcas", label="speciesKey"
)
```
@@ -147,7 +147,7 @@ url_generator = gbif_dl.dwca.generate_urls(
Downloading from a URL generator can simply be done by running:

```python
-gbif_dl.io.download(url_generator, root="my_dataset")
+gbif_dl.io.download(data_generator, root="my_dataset")
```

The downloader provides very fast download speeds by using an async queue. Some fail-safe functionality is provided by setting the number of `retries`, which defaults to 3.
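As a rough sketch, the fail-safe and concurrency settings can be tuned as keyword arguments. The parameter names below are taken from the `download_from_asyncgen` signature in this PR; whether the synchronous `download` wrapper forwards all of them is an assumption, so treat the call as illustrative:

```python
import gbif_dl

# Illustrative values; retries, nb_workers, tcp_connections and batch_size
# mirror the download_from_asyncgen() arguments in this PR's io.py.
gbif_dl.io.download(
    data_generator,
    root="my_dataset",
    retries=5,            # re-attempt failed downloads up to 5 times
    nb_workers=64,        # concurrent download workers
    tcp_connections=64,   # cap on simultaneous TCP connections
    batch_size=16,        # items pulled from the generator per batch
)
```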
@@ -160,7 +160,7 @@ The downloader provides very fast download speeds by using an async queue. Some

```python
from gbif_dl.dataloaders.torch import GBIFImageDataset
-dataset = GBIFImageDataset(root='my_dataset', generator=url_generator, download=True)
+dataset = GBIFImageDataset(root='my_dataset', generator=data_generator, download=True)
```

> ⚠️ Note that we do not provide train/validation/test splits of the dataset, as these are best designed specifically for the downstream task.
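If a split is needed, one option, shown here only as a sketch built on standard PyTorch utilities rather than anything provided by this package, is to split the assembled dataset afterwards:

```python
import torch
from gbif_dl.dataloaders.torch import GBIFImageDataset

# Sketch: an 80/20 random split on top of the downloaded dataset using
# plain torch.utils.data; adapt the ratio and strategy to the downstream task.
dataset = GBIFImageDataset(root="my_dataset", generator=data_generator, download=True)
n_train = int(0.8 * len(dataset))
train_set, val_set = torch.utils.data.random_split(dataset, [n_train, len(dataset) - n_train])
train_loader = torch.utils.data.DataLoader(train_set, batch_size=32, shuffle=True)
```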
2 changes: 1 addition & 1 deletion gbif_dl/dataloaders/torch.py
@@ -10,7 +10,7 @@


class GBIFImageDataset(torchvision.datasets.ImageFolder):
"""GBIF Image Dataset
"""GBIF Image Dataset for multi-class classification

Args:
root (str):
19 changes: 13 additions & 6 deletions gbif_dl/generators/api.py
@@ -18,15 +18,17 @@
def gbif_query_generator(
page_limit: int = 300,
mediatype: str = 'StillImage',
-label: str = 'speciesKey',
+label: Optional[str] = None,
*args, **kwargs
) -> MediaData:
"""Performs media queries GBIF yielding url and label

Args:
page_limit (int, optional): GBIF api uses paging which can be modified. Defaults to 300.
mediatype (str, optional): Sets GBIF mediatype. Defaults to 'StillImage'.
-label (str, optional): Sets label. Defaults to 'speciesKey'.
+label (str, optional): Output label name.
+    Defaults to `None` which yields all metadata.


Returns:
str: [description]
@@ -57,10 +59,15 @@ def gbif_query_generator(
media['identifier'].encode('utf-8')
).hexdigest()

+if label is not None:
+    output_label = str(metadata.get(label))
+else:
+    output_label = metadata

yield {
"url": media['identifier'],
"basename": hashed_url,
"label": str(metadata.get(label))
"label": output_label,
}

if resp['endOfRecords']:
@@ -97,7 +104,7 @@ def dproduct(dicts):

def generate_urls(
queries: Dict,
label: str = "speciesKey",
label: Optional[str] = None,
split_streams_by: Optional[Union[str, List]] = None,
nb_samples_per_stream: Optional[int] = None,
nb_samples: Optional[int] = None,
@@ -111,8 +118,8 @@ def generate_urls(
Args:
queries (Dict):
dictionary of queries supported by the GBIF api
-label (str, optional): label identfier, according to query api.
-    Defaults to "speciesKey".
+label (str, optional): Output label name.
+    Defaults to `None` which yields all metadata.
nb_samples (int):
Limit the total number of samples retrieved from the API.
When set to -1 and `split_streams_by` is not `None`,
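To make the behavioural change concrete: with the new default of `label=None`, each item's `label` field is the full metadata record rather than a single key. A minimal sketch (the query values are placeholders, not taken from this PR):

```python
import gbif_dl

# Placeholder query; any speciesKey/datasetKey values supported by the GBIF API work here.
queries = {"speciesKey": [1234567]}

# label is omitted, so it defaults to None and the full metadata dict is yielded.
data_generator = gbif_dl.api.generate_urls(queries=queries)
for item in data_generator:
    print(item["url"], type(item["label"]))  # label is a dict of metadata, not a string
    break
```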
14 changes: 10 additions & 4 deletions gbif_dl/generators/dwca.py
@@ -58,10 +58,15 @@ def dwca_generator(
url.encode('utf-8')
).hexdigest()

+if label is not None:
+    output_label = str(row.data.get(gbifqualname + label))
+else:
+    output_label = row.data

yield {
"url": url,
"basename": hashed_url,
"label": str(row.data.get(gbifqualname + label))
"label": output_label,
}

if delete:
@@ -109,8 +114,8 @@ def _is_doi(identifier: str) -> bool:
def generate_urls(
identifier: str,
dwca_root_path=None,
label: Optional[str] = "speciesKey",
mediatype: Optional[str] = "StillImage",
label: Optional[str] = None,
mediatype: Optional[str] = "StillImage"
delete: Optional[bool] = False
):
"""Generate GBIF items from DOI or GBIF download key
@@ -120,7 +125,8 @@ def generate_urls(
dwca_root_path (str, optional): Set root path where to store
Darwin Core zip files. Defaults to None, which results in
the creation of temporary directories
-label (str): output label
+label (str, optional): Output label name.
+    Defaults to `None` which yields all metadata.
mediatype (str, optional): Sets GBIF mediatype. Defaults to 'StillImage'.
the creation of temporary directories.
delete (bool, optional): Delete darwin core archive when finished.
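The DWCA generator gets the same default; a short sketch reusing the DOI from the README example, with `delete` as documented in the docstring above:

```python
import gbif_dl

# Sketch: without `label`, the full Darwin Core row data is yielded as the label;
# delete=True removes the downloaded archive once the generator is exhausted.
data_generator = gbif_dl.dwca.generate_urls(
    "10.15468/dl.vnm42s",
    dwca_root_path="dwcas",
    delete=True,
)
```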
62 changes: 19 additions & 43 deletions gbif_dl/io.py
@@ -4,6 +4,11 @@
from pathlib import Path
from typing import AsyncGenerator, Callable, Generator, Union, Optional
import sys
+import json
+import functools
+
+from attr import dataclass


if sys.version_info >= (3, 8):
from typing import TypedDict # pylint: disable=no-name-in-module
@@ -18,7 +23,7 @@
import aiostream
from aiohttp_retry import RetryClient, ExponentialRetry
from tqdm.asyncio import tqdm

+from .utils import watchdog, run_async

class MediaData(TypedDict):
""" Media dict representation received from api or dwca generators"""
@@ -54,8 +59,13 @@ async def download_single(
"""
url = item['url']

-# check for path
-label_path = Path(root, item['label'])
+# create subfolder when label is a single str
+if isinstance(item['label'], str):
+    label_path = Path(root, item['label'])
+# otherwise make it a flat file hierarchy
+else:
+    label_path = Path(root)

label_path.mkdir(parents=True, exist_ok=True)

check_files_with_same_basename = label_path.glob(item['basename'] + "*")
@@ -91,6 +101,10 @@ async def download_single(
async with aiofiles.open(file_path, "+wb") as f:
await f.write(content)

+if isinstance(item['label'], dict):
+    json_path = (label_path / item['basename']).with_suffix('.json')
+    async with aiofiles.open(json_path, mode='+w') as fp:
+        await fp.write(json.dumps(item['label']))

async def download_queue(
queue: asyncio.Queue,
@@ -133,8 +147,8 @@ async def download_queue(
async def download_from_asyncgen(
items: AsyncGenerator,
root: str = "data",
-tcp_connections: int = 256,
-nb_workers: int = 256,
+tcp_connections: int = 64,
+nb_workers: int = 64,
batch_size: int = 16,
retries: int = 3,
verbose: bool = False,
@@ -213,44 +227,6 @@ async def download_from_asyncgen(
w.cancel()


-def get_or_create_eventloop():
-    try:
-        return asyncio.get_event_loop()
-    except RuntimeError as ex:
-        if "There is no current event loop in thread" in str(ex):
-            loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(loop)
-            return asyncio.get_event_loop()
-
-class RunThread(threading.Thread):
-    def __init__(self, func, args, kwargs):
-        self.func = func
-        self.args = args
-        self.kwargs = kwargs
-        super().__init__()
-
-    def run(self):
-        self.result = asyncio.run(self.func(*self.args, **self.kwargs))
-
-
-def run_async(func, *args, **kwargs):
-    """async wrapper to detect if asyncio loop is already running
-
-    This is useful when already running in async thread.
-    """
-    try:
-        loop = get_or_create_eventloop()
-    except RuntimeError:
-        loop = None
-    if loop and loop.is_running():
-        thread = RunThread(func, args, kwargs)
-        thread.start()
-        thread.join()
-        return thread.result
-    else:
-        return asyncio.run(func(*args, **kwargs))


def download(
items: Union[Generator, AsyncGenerator, Iterable],
root: str = "data",
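To summarise the new `download_single` behaviour shown in the diff above: a string label still produces one subfolder per class, whereas a dict label (the multilabel/metadata case) results in a flat layout with a JSON sidecar per file. A sketch of how such a flat dataset could be consumed (the file naming is inferred from the diff, so treat it as an assumption):

```python
import json
from pathlib import Path

# Sketch: in the flat layout each media file <basename>.<ext> sits next to a
# <basename>.json file holding the label dict written by download_single().
root = Path("my_dataset")
for meta_file in root.glob("*.json"):
    label = json.loads(meta_file.read_text())
    media_files = [p for p in root.glob(meta_file.stem + ".*") if p.suffix != ".json"]
    print(meta_file.stem, media_files, list(label)[:5])
```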
55 changes: 55 additions & 0 deletions gbif_dl/utils.py
@@ -0,0 +1,55 @@
import asyncio
import functools
import threading

def watchdog(afunc):
    """Stops all tasks if there is an error"""
    @functools.wraps(afunc)
    async def run(*args, **kwargs):
        try:
            await afunc(*args, **kwargs)
        except asyncio.CancelledError:
            return
        except Exception as err:
            print(f'exception {err}')
            asyncio.get_event_loop().stop()
    return run


def get_or_create_eventloop():
    try:
        return asyncio.get_event_loop()
    except RuntimeError as ex:
        if "There is no current event loop in thread" in str(ex):
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return asyncio.get_event_loop()


class RunThread(threading.Thread):
    def __init__(self, func, args, kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        super().__init__()

    def run(self):
        self.result = asyncio.run(self.func(*self.args, **self.kwargs))


def run_async(func, *args, **kwargs):
    """async wrapper to detect if asyncio loop is already running

    This is useful when already running in async thread.
    """
    try:
        loop = get_or_create_eventloop()
    except RuntimeError:
        loop = None
    if loop and loop.is_running():
        thread = RunThread(func, args, kwargs)
        thread.start()
        thread.join()
        return thread.result
    else:
        return asyncio.run(func(*args, **kwargs))
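
For context, a minimal sketch of how `run_async` is meant to be used, e.g. from a synchronous entry point such as `gbif_dl.io.download` or inside a notebook where an event loop is already running; the coroutine below is purely hypothetical:

```python
import asyncio
from gbif_dl.utils import run_async

# Hypothetical coroutine, only for illustration.
async def fetch(url: str) -> str:
    await asyncio.sleep(0.1)
    return f"fetched {url}"

# Works from plain synchronous code (falls through to asyncio.run) and from code
# that already runs inside an event loop (dispatches to a helper thread instead).
result = run_async(fetch, "https://example.org")
print(result)
```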