-
Notifications
You must be signed in to change notification settings - Fork 9
/
zarr_from_esgf.py
107 lines (82 loc) · 2.65 KB
/
zarr_from_esgf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import sys
import logging
import tempfile
from fsspec.implementations.local import LocalFileSystem
from pangeo_forge_recipes.storage import FSSpecTarget, CacheFSSpecTarget, MetadataTarget
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe
from mysearch import (
esgf_search,
) # We probably want to strip this out later, left as is for now.
# Facet names in the order they appear in a dot-separated CMIP6 dataset id, e.g.
# CMIP6.CMIP.CCCma.CanESM5.historical.r1i1p1f1.Omon.so.gn.v20190429
facet_labels = (
    "mip_era",
    "activity_id",
    "institution_id",
    "source_id",
    "experiment_id",
    "member_id",
    "table_id",
    "variable_id",
    "grid_label",
    "version",
)

# ESGF search endpoints, keyed by a short node name.
node_dict = {
    "llnl": "https://esgf-node.llnl.gov/esg-search/search",
    "ipsl": "https://esgf-node.ipsl.upmc.fr/esg-search/search",
    "ceda": "https://esgf-index1.ceda.ac.uk/esg-search/search",
    "dkrz": "https://esgf-data.dkrz.de/esg-search/search",
}

# Facets forwarded to the ESGF search: every label except "mip_era" and
# "version".
# TODO: searching on "version" is reported as not working yet. Stripping the
# leading "v" from the version string was tried and did not help.
keep_facets = (
    "activity_id",
    "institution_id",
    "source_id",
    "experiment_id",
    "member_id",
    "table_id",
    "variable_id",
    "grid_label",
)

# Node that is queried.
# TODO: We might have to be more clever here and search through different
# nodes. For later.
search_node = "llnl"


def parse_dataset_id(dataset_id):
    """Split a dot-separated CMIP6 dataset id into a facet dictionary.

    Args:
        dataset_id: String with exactly ``len(facet_labels)`` dot-separated
            fields, in the order given by ``facet_labels``.

    Returns:
        A dict mapping each name in ``facet_labels`` to the corresponding
        field of ``dataset_id``. Values are returned unmodified (the version
        keeps any leading "v").

    Raises:
        ValueError: If the number of fields is wrong, or if the ``mip_era``
            field is not ``"CMIP6"``.
    """
    facet_vals = dataset_id.split(".")
    if len(facet_vals) != len(facet_labels):
        raise ValueError(
            "Please specify a query of the form {"
            + ("}.{".join(facet_labels).upper())
            + "}"
        )
    facets = dict(zip(facet_labels, facet_vals))
    if facets["mip_era"] != "CMIP6":
        raise ValueError("Only CMIP6 mip_era supported")
    return facets


def sort_urls_by_end_date(urls):
    """Return a new list of ``urls`` in ascending order of their end-date token.

    The token is the text after the last ``"-"`` in the url with ``".nc"``
    removed; it is compared as a plain string. Urls with equal tokens are
    ordered by the url string itself.

    Args:
        urls: Iterable of url strings.

    Returns:
        A new sorted list; the input is not modified.
    """

    def end_date(url):
        return url.split("-")[-1].replace(".nc", "")

    # Ascending order, so the files can be passed directly to the
    # pangeo-forge recipe as a time sequence.
    return sorted(urls, key=lambda url: (end_date(url), url))


def main(dataset_id):
    """Search ESGF for ``dataset_id``, build a Zarr recipe from it and run it.

    Args:
        dataset_id: Dot-separated CMIP6 dataset id (see ``parse_dataset_id``).

    Returns:
        The executed ``XarrayZarrRecipe``.

    Raises:
        ValueError: If ``dataset_id`` is malformed or not CMIP6.
        RuntimeError: If the search yields no file urls.
    """
    facets = parse_dataset_id(dataset_id)
    search_facets = {f: facets[f] for f in keep_facets}
    esgf_site = node_dict[search_node]

    # NOTE(review): the original author suspected esgf_search may modify the
    # dict it is given; search_facets is not reused afterwards.
    # Assumes the result is a table with a "url" column -- TODO confirm.
    df = esgf_search(search_facets, server=esgf_site)

    urls = sort_urls_by_end_date(df["url"].tolist())
    if not urls:
        # Fail here with a clear message instead of inside the recipe.
        raise RuntimeError(
            "No files found for " + dataset_id + " on " + esgf_site
        )
    # TODO Check that there are no gaps or duplicates.

    pattern = pattern_from_file_sequence(urls, "time")
    # NOTE(review): no storage target is configured here, so this relies on
    # the library's defaults -- confirm for the installed version.
    recipe = XarrayZarrRecipe(
        pattern,
        target_chunks={"time": 3},
        xarray_concat_kwargs={"join": "exact"},
    )

    # Show the recipe's progress messages on stdout while it runs.
    logging.basicConfig(
        format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
        level=logging.INFO,
        datefmt="%Y-%m-%d %H:%M:%S",
        stream=sys.stdout,
    )
    logger = logging.getLogger("pangeo_forge_recipes")
    logger.setLevel(logging.INFO)

    # Compile the recipe into a single callable and execute it immediately.
    recipe.to_function()()
    print(recipe.target)
    return recipe


if __name__ == "__main__":
    if len(sys.argv) < 2:
        # Exit with a usage hint instead of a bare IndexError traceback.
        sys.exit(
            "usage: python zarr_from_esgf.py {"
            + "}.{".join(facet_labels).upper()
            + "}"
        )
    main(sys.argv[1])