First pass at new command line API #63

Merged
merged 10 commits into from
Mar 7, 2024
17 changes: 4 additions & 13 deletions README.md
@@ -8,8 +8,6 @@ The Climate-Aware Task Scheduler is a lightweight Python package designed to sch

*Currently CATS only works in the UK. If you are aware of APIs for realtime grid carbon intensity data in other countries please open an issue and let us know.*

***

## Features

- Estimates the carbon intensity of the electricity grid in real-time
@@ -18,8 +16,6 @@ The Climate-Aware Task Scheduler is a lightweight Python package designed to sch
- Lightweight and easy to integrate into existing workflows
- Supports Python 3.9+

***

## Installation

Install via `pip` as follows:
@@ -28,8 +24,6 @@ Install via `pip` as follows:
pip install git+https://github.com/GreenScheduler/cats
```

***

## Documentation

Full documentation is available at [greenscheduler.github.io/cats/](https://greenscheduler.github.io/cats/). The below sections
@@ -41,23 +35,24 @@ the documentation for more details.
You can run `cats` with:

```bash
python -m cats -d <job_duration> --loc <postcode>
cats -d <job_duration> --loc <postcode>
```

The postcode is optional, and can be pulled from the `config.yml` file or, if that is not present, inferred using the server IP address. Job duration is in minutes, specified as an integer.
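
The lookup order described above (command line first, then `config.yml`, then an IP-based guess) can be sketched roughly as follows. This is an illustrative outline rather than the project's own code: `resolve_location`, `lookup_by_ip`, and `fake_lookup` are hypothetical names, and the geolocation step is stubbed so the example runs offline.

```python
from typing import Callable, Mapping, Optional


def resolve_location(
    cli_location: Optional[str],
    config: Mapping[str, str],
    lookup_by_ip: Callable[[], str],
) -> str:
    """Pick a location using the precedence: CLI flag, config file, IP lookup."""
    if cli_location:
        return cli_location
    if "location" in config:
        return config["location"]
    # Last resort: ask an IP geolocation service (injected so it can be stubbed).
    return lookup_by_ip()


def fake_lookup() -> str:
    # Hypothetical stand-in for a real geolocation request; the value is made up.
    return "M15"


print(resolve_location("OX1", {"location": "RG1"}, fake_lookup))
print(resolve_location(None, {"location": "RG1"}, fake_lookup))
print(resolve_location(None, {}, fake_lookup))
```

Passing the lookup in as a callable keeps the network call out of the precedence logic, which makes the fallback order easy to test.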

The scheduler then calls a function that estimates the best time to start the job given predicted carbon intensity over the next 48 hours. The workflow is the same as for other popular schedulers. Switching to `cats` should be transparent to cluster users.
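
One plausible way to pick a start time from a forecast is a sliding-window average: compute the mean intensity over every window long enough to cover the job and keep the lowest. The sketch below assumes evenly spaced forecast points (30 minutes apart here, which is an assumption) and uses a hypothetical `best_start` function. It is not the project's `get_avg_estimates` implementation.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def best_start(
    forecast: List[Tuple[datetime, float]],
    duration_minutes: int,
    step_minutes: int = 30,
) -> Tuple[datetime, float]:
    """Return (start, mean intensity) of the lowest-mean window covering the job."""
    # Number of forecast points the job spans, rounded up.
    window = -(-duration_minutes // step_minutes)
    if window < 1 or window > len(forecast):
        raise ValueError("forecast too short for requested duration")
    best_index, best_mean = 0, float("inf")
    for i in range(len(forecast) - window + 1):
        mean = sum(value for _, value in forecast[i:i + window]) / window
        if mean < best_mean:
            best_index, best_mean = i, mean
    return forecast[best_index][0], best_mean


# Made-up forecast values in gCO2/kWh, one point every 30 minutes.
t0 = datetime(2024, 3, 7, 12, 0)
values = [200.0, 180.0, 90.0, 100.0, 210.0]
forecast = [(t0 + timedelta(minutes=30 * i), v) for i, v in enumerate(values)]

start, mean = best_start(forecast, duration_minutes=60)
print(start.isoformat(), mean)  # 2024-03-07T13:00:00 95.0
```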

It will display the time to start the job on standard out and optionally some information about the carbon intensity on standard error.
By default, the optimal time to start the job is shown in a human-readable format. Pass `--format=json` to get machine-readable output instead. The date format used in the machine-readable output can be controlled with `--dateformat`, which accepts a [strftime(3)](https://manpages.debian.org/stable/manpages-dev/strftime.3.en.html) format string.
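
As a rough illustration of how the date format affects the JSON output, the sketch below serialises a small dataclass with either ISO 8601 timestamps or a caller-supplied `strftime` pattern. The `Estimate` class and `estimate_to_json` function are hypothetical stand-ins that mirror the shape of the `to_json` method in this pull request. They are not part of the package.

```python
import dataclasses
import json
from datetime import datetime


@dataclasses.dataclass
class Estimate:
    # Hypothetical stand-in for the forecast estimate type used by cats.
    value: float
    start: datetime
    end: datetime


def estimate_to_json(estimate: Estimate, dateformat: str = "") -> str:
    """Serialise to JSON, using ISO 8601 unless a strftime format is given."""
    data = dataclasses.asdict(estimate)
    for key in ("start", "end"):
        if dateformat:
            data[key] = data[key].strftime(dateformat)
        else:
            data[key] = data[key].isoformat()
    return json.dumps(data, sort_keys=True)


est = Estimate(95.0, datetime(2024, 3, 7, 13, 0), datetime(2024, 3, 7, 14, 0))
print(estimate_to_json(est))
print(estimate_to_json(est, "%Y%m%d%H%M"))
```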


#### Use with schedulers

You can use CATS with, for example, the ``at`` job scheduler by running:

```bash
ls | at -t `python -m cats -d 5 --loc OX1`
cats -d 5 --loc OX1 --scheduler at --command 'ls'
```
This schedules a command (`ls`) with an expected runtime of less than 5 minutes using the `at` scheduler.
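
To show what is handed to `at`, the sketch below formats a start time with the `%Y%m%d%H%M` pattern that this pull request stores in `SCHEDULER_DATE_FORMAT["at"]` and builds the matching `at -t` argument list. The `at_command` helper and the `AT_DATE_FORMAT` constant are hypothetical names used only for illustration, and nothing is actually scheduled.

```python
from datetime import datetime
from typing import List

# Same pattern as SCHEDULER_DATE_FORMAT["at"] in the diff; the constant name is illustrative.
AT_DATE_FORMAT = "%Y%m%d%H%M"


def at_command(start: datetime) -> List[str]:
    """Build the argument list for at(1) with a -t timestamp."""
    return ["at", "-t", start.strftime(AT_DATE_FORMAT)]


print(at_command(datetime(2024, 3, 7, 13, 0)))  # ['at', '-t', '202403071300']
```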

#### Console demonstration

@@ -101,14 +96,10 @@ cats -d 120 --config .config/config.yml \
--jobinfo cpus=2,gpus=0,memory=8,partition=CPU_partition
```

***

## Contributing

We welcome contributions from the community! If you find a bug or have an idea for a new feature, please open an issue on our GitHub repository or submit a pull request.

***

## License

[MIT License](https://github.com/GreenScheduler/cats/blob/main/LICENSE)
Binary file modified cats.gif
166 changes: 138 additions & 28 deletions cats/__init__.py
@@ -1,21 +1,73 @@
from argparse import ArgumentParser
from typing import Optional
from datetime import datetime, timedelta
from typing import Optional
import subprocess
import dataclasses
import requests
import logging
import yaml
import sys
import subprocess
import json

from .check_clean_arguments import validate_jobinfo, validate_duration
from .optimise_starttime import get_avg_estimates # noqa: F401
from .CI_api_interface import API_interfaces, InvalidLocationError
from .CI_api_query import get_CI_forecast # noqa: F401
from .carbonFootprint import greenAlgorithmsCalculator
from .carbonFootprint import greenAlgorithmsCalculator, Estimates
from .forecast import CarbonIntensityAverageEstimate

# To add a scheduler, add a date format here
# and create a scheduler_<new>(...) function
SCHEDULER_DATE_FORMAT = {"at": "%Y%m%d%H%M"}

def parse_arguments():
"""
Parse command line arguments
:return: [dict] parsed arguments
"""
parser = ArgumentParser(prog="cats", description="A climate aware job scheduler")
description_text = """
The Climate-Aware Task Scheduler (cats) command line program helps you run
your calculations in a way that minimises their impact on the climate by
delaying computation until a time when the amount of CO2 produced to
generate the power you will use is predicted to be minimised.

By default, the command simply returns information about when the
calculation should be undertaken and compares the carbon intensity
(gCO2/kWh) of running the calculation now with the carbon intensity at that
time in the future. To undertake this calculation, cats needs to know the
predicted duration of the calculation (which you must supply, see `-d`) and
your location (which can be inferred from your IP address, but see `-l`). If
additional information about the power consumption of your computer is
available (see `--jobinfo`) the predicted CO2 usage will be reported.

To make use of this information, you will need to couple cats with a task
scheduler of some kind. The command to schedule is specified with the `-c`
or `--command` parameter, and the scheduler can be selected using the
`--scheduler` option.

Example:
cats -d 1 --loc RG1 --scheduler=at --command='ls'
"""

example_text = """
Examples\n
********\n

Cats can be used to report information on the best time to run a calculation and the amount
of CO2. Information about a 90 minute calculation in central Oxford can be found by running:

cats -d 90 --loc OX1 --jobinfo="cpus=2,gpus=0,memory=8,partition=CPU_partition"

The `at` scheduler is available from the command line on most Linux and MacOS computers,
and can be the easiest way to use cats to minimise the carbon intensity of calculations on
smaller computers. For example, the above calculation can be scheduled by running:

cats -d 90 --loc OX1 -s at -c 'mycommand'
"""

parser = ArgumentParser(prog="cats", description=description_text, epilog=example_text)

### Required

@@ -24,12 +76,23 @@ def parse_arguments():

### Optional

parser.add_argument(
"-s", "--scheduler", type=str,
help="Pass command using `-c` to scheduler. Currently, the only supported scheduler is at",
choices=["at"]
)
parser.add_argument(
"-a", "--api", type=str,
help="API to use to obtain carbon intensity forecasts. Overrides `config.yml`. "
"For now, only choice is `carbonintensity.org.uk` (hence UK only forecasts). "
"Default: `carbonintensity.org.uk`."
)
parser.add_argument(
"-c", "--command", help="Command to schedule, requires --scheduler to be set"
)
parser.add_argument(
"--dateformat", help="Output date format in strftime(3) format or one of the supported schedulers ('at')."
)
parser.add_argument(
"-l", "--location", type=str,
help="Location of the computing facility. For the UK, first half of a postcode (e.g. `M15`), "
@@ -51,12 +114,61 @@ def parse_arguments():
"Default: if absent, the total carbon footprint is not estimated."
)

parser.add_argument("--format", type=str, help="Format to output optimal start time and carbon emission "
"estimate savings in. Currently only JSON is supported.", choices=["json"])

return parser

@dataclasses.dataclass
class CATSOutput:
    """Carbon Aware Task Scheduler output"""

    carbonIntensityAPI: str
    carbonIntensityNow: CarbonIntensityAverageEstimate
    carbonIntensityOptimal: CarbonIntensityAverageEstimate
    location: str
    countryISO3: str
    emmissionEstimate: Optional[Estimates] = None

    def __str__(self) -> str:
        out = f"Best job start time: {self.carbonIntensityOptimal.start}"

        if self.emmissionEstimate:
            # Start on a new line so the estimates do not run into the start time
            out += (
                f"\nEstimated emissions for running job now: {self.emmissionEstimate.now}\n"
                f"Estimated emissions for running delayed job: {self.emmissionEstimate.best}\n"
                f" (- {self.emmissionEstimate.savings})"
            )
        return out

    def to_json(self, dateformat: str = "", **kwargs) -> str:
        # Datetimes are written as ISO 8601 unless a strftime format is supplied
        data = dataclasses.asdict(self)
        for ci in ["carbonIntensityNow", "carbonIntensityOptimal"]:
            if dateformat == "":
                data[ci]["start"] = data[ci]["start"].isoformat()
                data[ci]["end"] = data[ci]["end"].isoformat()
            else:
                data[ci]["start"] = data[ci]["start"].strftime(dateformat)
                data[ci]["end"] = data[ci]["end"].strftime(dateformat)

        return json.dumps(data, **kwargs)


def schedule_at(output: CATSOutput, args: list[str]) -> None:
    "Schedule job with optimal start time using at(1)"
    # The command's standard output is piped to at(1) as its standard input
    proc = subprocess.Popen(args, stdout=subprocess.PIPE)
    # The result is discarded, so the `output` parameter is no longer shadowed
    subprocess.check_output(
        ("at", "-t", output.carbonIntensityOptimal.start.strftime(SCHEDULER_DATE_FORMAT["at"])),
        stdin=proc.stdout,
    )


def main(arguments=None):
parser = parse_arguments()
args = parser.parse_args(arguments)
if args.command and not args.scheduler:
print("cats: To run a command with the -c or --command option, you must\n"
" specify the scheduler with the -s or --scheduler option")
sys.exit(1)

##################################
## Validate and clean arguments ##
@@ -67,16 +179,16 @@ def main(arguments=None):
# if path to config file provided, it is used
with open(args.config, "r") as f:
config = yaml.safe_load(f)
sys.stderr.write(f"Using provided config file: {args.config}\n")
logging.info(f"Using provided config file: {args.config}\n")
else:
# if no path provided, look for `config.yml` in current directory
try:
with open("config.yml", "r") as f:
config = yaml.safe_load(f)
sys.stderr.write("Using config.yml found in current directory\n")
logging.info("Using config.yml found in current directory\n")
except FileNotFoundError:
config = {}
sys.stderr.write("WARNING: config file not found\n")
logging.warning("config file not found")

## CI API choice
list_CI_APIs = ['carbonintensity.org.uk']
@@ -89,20 +201,20 @@ def main(arguments=None):

if choice_CI_API not in list_CI_APIs:
raise ValueError(f"{choice_CI_API} is not a valid API choice, it needs to be one of {list_CI_APIs}.")
sys.stderr.write(f"Using {choice_CI_API} for carbon intensity forecasts\n")
logging.info(f"Using {choice_CI_API} for carbon intensity forecasts\n")

## Location
if args.location:
location = args.location
sys.stderr.write(f"Using location provided: {location}\n")
logging.info(f"Using location provided: {location}")
elif "location" in config.keys():
location = config["location"]
sys.stderr.write(f"Using location from config file: {location}\n")
logging.info(f"Using location from config file: {location}")
else:
r = requests.get("https://ipapi.co/json").json()
postcode = r["postal"]
location = postcode
sys.stderr.write(f"WARNING: location not provided. Estimating location from IP address: {location}.\n")
logging.warning(f"location not provided. Estimating location from IP address: {location}.")

## Duration
duration = validate_duration(args.duration)
@@ -115,8 +227,8 @@ def main(arguments=None):
try:
CI_forecast = get_CI_forecast(location, CI_API_interface)
except InvalidLocationError:
sys.stderr.write(f"Error: unknown location {location}\n")
sys.stderr.write(
logging.error(f"Error: unknown location {location}\n")
logging.error(
"Location should be specified as the outward code,\n"
"for example 'SW7' for postcode 'SW7 EAZ'.\n"
)
@@ -131,38 +243,36 @@ def main(arguments=None):
now_avg, best_avg = get_avg_estimates(
CI_forecast, duration=duration
)
sys.stderr.write(str(best_avg) + "\n")

sys.stderr.write(f"Best job start time: {best_avg.start}\n")
print(f"{best_avg.start:%Y%m%d%H%M}") # for POSIX compatibility with at -t
output = CATSOutput(choice_CI_API, now_avg, best_avg, location, "GBR")

################################
## Calculate carbon footprint ##
################################

error_message = "Not enough information to estimate total carbon footprint, both --jobinfo and config files are needed.\n"

if args.jobinfo:
jobinfo = validate_jobinfo(args.jobinfo, expected_partition_names=config['partitions'].keys())

if not (jobinfo and config):
sys.stderr.write(error_message)
logging.warning("Not enough information to estimate total carbon footprint, "
"both --jobinfo and config files are needed.\n")
else:
estim = greenAlgorithmsCalculator(
output.emmissionEstimate = greenAlgorithmsCalculator(
config=config,
runtime=timedelta(minutes=args.duration),
averageBest_carbonIntensity=best_avg.value, # TODO replace with real carbon intensity
averageNow_carbonIntensity=now_avg.value,
**jobinfo,
).get_footprint()

sys.stderr.write(f"Estimated emmissions for running job now: {estim.now}\n")
msg = (
f"Estimated emmissions for running delayed job: {estim.best})\n"
f" (- {estim.savings})"
)
sys.stderr.write(msg)

if args.format == "json":
if isinstance(args.dateformat, str) and "%" not in args.dateformat:
dateformat = SCHEDULER_DATE_FORMAT.get(args.dateformat, "")
else:
dateformat = args.dateformat or ""
print(output.to_json(dateformat, sort_keys=True, indent=2))
else:
print(output)
if args.command and args.scheduler == "at":
schedule_at(output, args.command.split())

if __name__ == "__main__":
main()
main()
4 changes: 2 additions & 2 deletions docs/source/use-with-schedulers.rst
@@ -10,14 +10,14 @@ You can use CATS with the ``at`` job scheduler by running:

.. code-block:: console

$ <command> | at -t `python -m cats -d <job_duration> --loc <postcode>`
$ cats -d <job_duration> --loc <postcode> --scheduler at --command '<command>'

As an example, if you want to schedule a run of ``ls`` with a 5 minute
duration, in the 'OX1' postcode that would look like:

.. code-block:: console

$ ls | at -t `python -m cats -d 5 --loc OX1`
$ cats -d 5 --loc OX1 --scheduler at --command 'ls'


Demonstration