Skip to content

Commit

Permalink
Merge pull request #54 from GreenScheduler/adjust_integration_window
Browse files Browse the repository at this point in the history
Account for job start/end time not exactly matching forecast data points
  • Loading branch information
tlestang authored Aug 1, 2023
2 parents 0b574d4 + 50319ee commit f625fa2
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 33 deletions.
115 changes: 96 additions & 19 deletions cats/forecast.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timedelta


@dataclass(order=True)
Expand Down Expand Up @@ -27,37 +27,114 @@ class CarbonIntensityAverageEstimate:

class WindowedForecast:

def __init__(self, data: list[CarbonIntensityPointEstimate], window_size: int):
self.times = [point.datetime for point in data]
self.intensities = [point.value for point in data]
# Integration window size in number of time intervals covered
# by the window.
self.window_size = window_size
def __init__(
self,
data: list[CarbonIntensityPointEstimate],
duration: int, # in minutes
start: datetime,
):
self.data_stepsize = data[1].datetime - data[0].datetime
self.start = start
# TODO: Expect duration as a timedelta directly
self.end = start + timedelta(minutes=duration)

# Restrict data points so that start time falls within the
# first data interval. In other we don't need any data prior
# the closest data preceding (on the left of) the job start
# time.
def bisect_right(data, t):
for i, d in enumerate(data):
if d.datetime > t:
return i - 1
# bisect_right(data, start) returns the index of the first
# data point with datetime value immediately preceding the job
# start time
self.data = data[bisect_right(data, start):]

# Find number of data points in a window, by finding the index
# of the closest data point past the job end time. Could be
# done with the bisect module in the stdlib for python 3.10+
# ('key' parameter was introduced in 3.10).
#
# bisect_left(data, self.end, key=lambda x: x.datetime)
#
def bisect_left(data, t):
for i, d in enumerate(data):
if d.datetime >= t:
return i
self.ndata = bisect_left(self.data, self.end) + 1

def __getitem__(self, index: int) -> CarbonIntensityAverageEstimate:
"""Return the average of timeseries data from index over the
window size. Data points are integrated using the trapeziodal
rule, that is assuming that forecast data points are joined
with a straight line.
Integral value between two points is the intensity value at
the midpoint times the duration between the two points. This
duration is assumed to be unity and the average is computed by
dividing the total integral value by the number of intervals.
"""
v = [ # If you think of a better name, pls help!
0.5 * (a + b)
for a, b in zip(
self.intensities[index: index + self.window_size],
self.intensities[index + 1 : index + self.window_size + 1]
)]

# Account for the fact that the start and end of each window
# might not fall exactly on data points. The starting
# intensity is interpolated between the first (index) and
# second data point (index + 1) in the window. The ending
# intensity value is interpolated between the last and
# penultimate data points in he window.
window_start = self.start + index * self.data_stepsize
window_end = self.end + index * self.data_stepsize
lbound = self.interp(
self.data[index],
self.data[index + 1],
when=window_start,
)
rbound = self.interp(
self.data[index + self.ndata - 2],
self.data[index + self.ndata - 1],
when=window_end,
)
# window_data <- [lbound] + [...bulk...] + [rbound] where
# lbound and rbound are interpolated intensity values.
window_data = (
[lbound] +
self.data[index + 1: index + self.ndata - 1] +
[rbound]
)
acc = [
0.5 * (a.value + b.value) * (b.datetime - a.datetime).total_seconds()
for a, b in zip(window_data[:-1], window_data[1:])
]
duration = window_data[-1].datetime - window_data[0].datetime
return CarbonIntensityAverageEstimate(
start=self.times[index],
# Note that `end` points to the _start_ of the last
# interval in the window.
end=self.times[index + self.window_size],
value=sum(v) / self.window_size,
start=window_start,
end=window_end,
value=sum(acc) / duration.total_seconds(),
)

@staticmethod
def interp(
p1: CarbonIntensityPointEstimate,
p2: CarbonIntensityPointEstimate,
when: datetime
) -> CarbonIntensityPointEstimate:
"""Return carbon intensity pt estimate at a time between data
points, assuming points are joined by a straight line (linear
interpolation).
"""
timestep = (p2.datetime - p1.datetime).total_seconds()

slope = (p2.value - p1.value) / timestep
offset = (when - p1.datetime).total_seconds()

return CarbonIntensityPointEstimate(
value=p1.value + slope * offset,
datetime=when,
)

def __iter__(self):
for index in range(self.__len__()):
yield self.__getitem__(index)

def __len__(self):
return len(self.times) - self.window_size - 1
return len(self.data) - self.ndata
12 changes: 2 additions & 10 deletions cats/optimise_starttime.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from math import ceil
from datetime import datetime
from .forecast import WindowedForecast


Expand All @@ -16,13 +16,5 @@ def get_avg_estimates(data, duration=None):
# raise ValueError(
# "Windowed method timespan cannot be greater than the cached timespan"
# )

# get length of interval between timestamps
interval = (
data[1].datetime - data[0].datetime
).total_seconds() / 60
wf = WindowedForecast(
data=data,
window_size=ceil(duration / interval)
)
wf = WindowedForecast(data, duration, start=datetime.now())
return wf[0], min(wf)
131 changes: 127 additions & 4 deletions tests/test_windowed_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

def test_has_right_length():
window_size = 160 # In number of time intervals
wf = WindowedForecast(DATA, window_size)
wf = WindowedForecast(DATA, window_size, start=DATA[0].datetime)

# Expecting (200 - 160 - 1) (39) data points in the time
# integrated timeseries.
Expand All @@ -39,7 +39,7 @@ def test_values():
# a step size `step` small compared the the integration window

window_size = 160
wf = WindowedForecast(DATA, window_size)
wf = WindowedForecast(DATA, window_size, start=DATA[0].datetime)
expected = [

math.cos((i + window_size) * step) - math.cos(i * step)
Expand Down Expand Up @@ -68,7 +68,9 @@ def test_minimise_average():
]

window_size = 6
result = min(WindowedForecast(data, window_size))
# Data points separated by 30 minutes intervals
duration = window_size * 30
result = min(WindowedForecast(data, duration, start=data[0].datetime))

# Intensity point estimates over best runtime period
v = [10, 8, 7, 7, 5, 8, 8]
Expand All @@ -95,7 +97,9 @@ def test_average_intensity_now():
]

window_size = 11
result = WindowedForecast(data, window_size)[0]
# Data points separated by 30 minutes intervals
duration = window_size * 30
result = WindowedForecast(data, duration, start=data[0].datetime)[0]

# Intensity point estimates over best runtime period
v = [p.value for p in data[:window_size + 1]]
Expand All @@ -107,3 +111,122 @@ def test_average_intensity_now():
) / window_size
)
assert (result == expected)


def test_average_intensity_with_offset():
    # Job start and end times do not coincide with carbon intensity
    # data points, so cats must interpolate the intensity value at the
    # boundaries of each candidate job window.
    forecast = [
        CarbonIntensityPointEstimate(26, datetime(2023,1,1,8,30)),
        CarbonIntensityPointEstimate(40, datetime(2023,1,1,9,0)),
        CarbonIntensityPointEstimate(50, datetime(2023,1,1,9,30)),
        CarbonIntensityPointEstimate(60, datetime(2023,1,1,10,0)),
        CarbonIntensityPointEstimate(25, datetime(2023,1,1,10,30)),
    ]
    duration = 70  # in minutes
    # The first available data point is for 08:00 but the job starts
    # 15 minutes later.
    job_start = datetime.fromisoformat("2023-01-01T08:45")
    result = WindowedForecast(forecast, duration, start=job_start)[1]

    # Boundary intensities, interpolated at 09:15 and 10:25.
    start_intensity = 40 + 15 * (50 - 40) / 30
    end_intensity = 60 + 25 * (25 - 60) / 30
    expected = CarbonIntensityAverageEstimate(
        start=datetime(2023,1,1,9,15),
        end=datetime(2023,1,1,10,25),
        value=(
            0.5 * (start_intensity + 50) * 15 +
            0.5 * (50 + 60) * 30 +
            0.5 * (60 + end_intensity) * 25
        ) / duration
    )
    assert result == expected

    # Check that WindowedForecast copes with a job start beyond the
    # first data interval. Not technically required when working with
    # carbonintensity.org.uk, but a useful generalisation nonetheless.
    #
    # With a 09:15 start, the WindowedForecast's 'data' attribute
    # should discard the first data point at 08:30, so window 0 here
    # matches window 1 above.
    job_start = datetime.fromisoformat("2023-01-01T09:15")
    result = WindowedForecast(forecast, duration, start=job_start)[0]
    assert result == expected

def test_average_intensity_with_offset_long_job():
    # Job start and end times do not coincide with carbon intensity
    # data points, so cats must interpolate the intensity value at the
    # boundaries of each candidate job window.
    with open(TEST_DATA, "r") as f:
        reader = csv.reader(f, delimiter=",")
        next(reader)  # Skip header line
        ci_data = []
        for datestr, _, _, intensity_value in reader:
            ci_data.append(
                CarbonIntensityPointEstimate(
                    datetime=datetime.fromisoformat(datestr[:-1]),
                    value=float(intensity_value),
                )
            )

    duration = 194  # in minutes
    # The first available data point is for 12:30 but the job starts
    # 18 minutes later.
    job_start = datetime.fromisoformat("2023-05-04T12:48")
    result = WindowedForecast(ci_data, duration, start=job_start)[2]

    # First and last elements are interpolated intensity values,
    # e.g. 15 + 18min * (18 - 15) / 30min = 16.8
    v = [16.8, 18, 19, 17, 16, 11, 11, 11, 11]
    data_timestep = ci_data[1].datetime - ci_data[0].datetime  # 30 minutes
    expected = CarbonIntensityAverageEstimate(
        start=job_start + 2 * data_timestep,
        end=job_start + 2 * data_timestep + timedelta(minutes=duration),
        value=(
            0.5 * (v[0] + v[1]) * 12 +
            sum([0.5 * (a + b) * 30 for a, b in zip(v[1:-2], v[2:-1])]) +
            0.5 * (v[7] + v[8]) * 2
        ) / duration
    )
    assert (result == expected)

def test_average_intensity_with_offset_short_job():
    # Short job: both start and end times fall between the same two
    # consecutive data points.
    with open(TEST_DATA, "r") as f:
        reader = csv.reader(f, delimiter=",")
        next(reader)  # Skip header line
        ci_data = []
        for datestr, _, _, intensity_value in reader:
            ci_data.append(
                CarbonIntensityPointEstimate(
                    datetime=datetime.fromisoformat(datestr[:-1]),
                    value=float(intensity_value),
                )
            )

    duration = 6  # in minutes
    # The first available data point is for 12:30 but the job starts
    # 6 minutes later.
    job_start = datetime.fromisoformat("2023-05-04T12:48")
    result = WindowedForecast(ci_data, duration, start=job_start)[2]

    # Job starts at 12:48 and ends at 12:54. For each candidate
    # running window, both start and end times fall between two
    # consecutive data points (e.g. 13:30 and 14:00 for the third
    # window).
    #
    # Both elements of v are interpolated intensity values,
    # e.g. v[0] = 15 + 18min * (18 - 15) / 30min = 16.8
    # and  v[1] = 15 + 24min * (18 - 15) / 30min = 17.4
    v = [16.8, 17.4]
    data_timestep = ci_data[1].datetime - ci_data[0].datetime
    expected = CarbonIntensityAverageEstimate(
        start=job_start + 2 * data_timestep,
        end=job_start + 2 * data_timestep + timedelta(minutes=duration),
        value=sum(
            [0.5 * (a + b) for a, b in zip(v[:-1], v[1:])]
        ) / (len(v) - 1)
    )
    assert (result == expected)

0 comments on commit f625fa2

Please sign in to comment.