From 1d341c14bacffe63e927284e9b55b65803327a2f Mon Sep 17 00:00:00 2001 From: Frey Alfredsson Date: Sun, 6 Jun 2021 22:43:36 +0200 Subject: [PATCH] Added the HPFQ scheduler and reference scheduling This commit adds the HierarchicalPacket Fair Queueing (HPFQ) scheduler from the paper "Programmable packet scheduling at line rate" by Sivaraman, Anirudh, et al. This commit also changes the framework to make it easy to schedule references to flows, queues, or schedulers. It does so by adding a reference with every enqueue operation. By adding the reference with the packet or flow, we can create a rank from packet or flow and queue the reference. This commit adds the HierarchicalPacket Fair Queueing (HPFQ) scheduler from the paper "Programmable packet scheduling at line rate" by Sivaraman, Anirudh, et al. This commit also changes the framework to make it easy to schedule references to flows, queues, or schedulres. Signed-off-by: Frey Alfredsson --- queue-exp/pifo_fifo.py | 17 ++--- queue-exp/pifo_hpfq.py | 87 +++++++++++++++++++++++++ queue-exp/pifo_srpt.py | 24 +++---- queue-exp/pifo_stfq.py | 19 +++--- queue-exp/pifo_wfq.py | 21 +++--- queue-exp/{pifo_lib.py => sched_lib.py} | 30 +++++---- 6 files changed, 149 insertions(+), 49 deletions(-) create mode 100755 queue-exp/pifo_hpfq.py rename queue-exp/{pifo_lib.py => sched_lib.py} (89%) diff --git a/queue-exp/pifo_fifo.py b/queue-exp/pifo_fifo.py index 8d91cc2..ce7b69e 100755 --- a/queue-exp/pifo_fifo.py +++ b/queue-exp/pifo_fifo.py @@ -32,23 +32,24 @@ along with this program. If not, see . """ -from pifo_lib import Packet, Runner, Pifo -from pifo_lib import SchedulingAlgorithm +from sched_lib import Packet, Runner, Pifo, SchedulingAlgorithm class Fifo(SchedulingAlgorithm): """First in, first out (FIFO)""" - def __init__(self): + def __init__(self, name=None): + super().__init__(name) self._pifo = Pifo() - def enqueue(self, item): - rank = self.get_rank(item) - self._pifo.enqueue(item, rank) - - def get_rank(self, item): + def get_rank(self, _): + """Rank the items in FIFO order.""" return self._pifo.qlen + def enqueue(self, ref, item): + rank = self.get_rank(item) + self._pifo.enqueue(ref, rank) + def dequeue(self): return self._pifo.dequeue() diff --git a/queue-exp/pifo_hpfq.py b/queue-exp/pifo_hpfq.py new file mode 100755 index 0000000..92eb5d8 --- /dev/null +++ b/queue-exp/pifo_hpfq.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +# coding: utf-8 -*- +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# pifo-hpfq.py + +"""HierarchicalPacket Fair Queueing (HPFQ) + +This scheduling algorithm is mentioned in the paper "Programmable packet +scheduling at line rate" by Sivaraman, Anirudh, et al. It creates a hierarchy of +WFQ schedulers. The central scheduler is called root and contains references to +other WFQ schedulers. Those two WFQ schedulers are called left and right. We +chose that packets with flow ids lower than ten go into the left scheduler in +our implementation. In contrast, the others go into the right scheduler.""" + +__copyright__ = """ +Copyright (c) 2021, Toke Høiland-Jørgensen +Copyright (c) 2021, Frey Alfredsson +""" + +__license__ = """ +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + +from sched_lib import Packet, Runner, SchedulingAlgorithm +from pifo_wfq import Wfq + + +class Hpfq(SchedulingAlgorithm): + """HierarchicalPacket Fair Queueing (HPFQ)""" + + def __init__(self, name=None): + super().__init__(name) + self._root = Wfq("root") + self._left = Wfq("Left") + self._right = Wfq("Right") + + def enqueue(self, ref, item): + queue = None + if item.flow < 10: + self._left.enqueue(ref, item) + queue = self._left + else: + self._right.enqueue(ref, item) + queue = self._right + + self._root.enqueue(queue, item) + + def dequeue(self): + queue = self._root.dequeue() + return queue.dequeue() if queue is not None else None + + def dump(self): + print(" Root:") + self._root.dump() + print(" Left:") + self._left.dump() + print(" Right:") + self._right.dump() + + +if __name__ == "__main__": + pkts = [ + Packet(flow=1, idn=1, length=200), + Packet(flow=1, idn=2, length=200), + Packet(flow=10, idn=1, length=200), + Packet(flow=10, idn=2, length=200), + Packet(flow=2, idn=1, length=100), + Packet(flow=2, idn=2, length=100), + Packet(flow=2, idn=3, length=100), + Packet(flow=20, idn=1, length=100), + Packet(flow=20, idn=2, length=100), + Packet(flow=20, idn=3, length=100), + ] + Runner(pkts, Hpfq()).run() diff --git a/queue-exp/pifo_srpt.py b/queue-exp/pifo_srpt.py index 82304d5..014d406 100755 --- a/queue-exp/pifo_srpt.py +++ b/queue-exp/pifo_srpt.py @@ -36,15 +36,15 @@ along with this program. If not, see . """ -from pifo_lib import Packet, Runner, Pifo -from pifo_lib import SchedulingAlgorithm -from pifo_lib import FlowTracker +from sched_lib import Packet, Runner, Pifo, SchedulingAlgorithm +from sched_lib import FlowTracker class Srpt(SchedulingAlgorithm): """Shortest Remaining Processing Time""" - def __init__(self): + def __init__(self, name=None): + super().__init__(name) self._pifo = Pifo() self._flow_tracker = FlowTracker() @@ -57,22 +57,24 @@ def __init__(self): else: self._remains[pkt.flow] = pkt.length - def get_rank(self, pkt): - rank = self._remains[pkt.flow] - self._remains[pkt.flow] -= pkt.length + def get_rank(self, item): + """Rank the items by their remaining total flow length.""" + + rank = self._remains[item.flow] + self._remains[item.flow] -= item.length return rank - def enqueue(self, item): + def enqueue(self, ref, item): flow = self._flow_tracker.enqueue(item) rank = self.get_rank(item) self._pifo.enqueue(flow, rank) def dequeue(self): flow = self._pifo.dequeue() - pkt = None + item = None if flow is not None: - pkt = flow.dequeue() - return pkt + item = flow.dequeue() + return item def dump(self): self._pifo.dump() diff --git a/queue-exp/pifo_stfq.py b/queue-exp/pifo_stfq.py index bc72b21..be91af3 100755 --- a/queue-exp/pifo_stfq.py +++ b/queue-exp/pifo_stfq.py @@ -34,31 +34,34 @@ along with this program. If not, see . """ -from pifo_lib import Packet, Runner, Pifo -from pifo_lib import SchedulingAlgorithm +from sched_lib import Packet, Runner, Pifo, SchedulingAlgorithm class Stfq(SchedulingAlgorithm): """Start-Time Fair Queuing (STFQ)""" - def __init__(self): + def __init__(self, name=None): + super().__init__(name) self._pifo = Pifo() self._last_finish = {} self._virt_time = 0 - def get_rank(self, pkt): - flow_id = pkt.flow + def get_rank(self, item): + """Rank the items by their start time, which we calculate from the + finish time of the last packet. + """ + flow_id = item.flow if flow_id in self._last_finish: rank = max(self._virt_time, self._last_finish[flow_id]) else: rank = self._virt_time - self._last_finish[flow_id] = rank + pkt.length + self._last_finish[flow_id] = rank + item.length return rank - def enqueue(self, item): + def enqueue(self, ref, item): rank = self.get_rank(item) - self._pifo.enqueue(item, rank) + self._pifo.enqueue(ref, rank) def dequeue(self): return self._pifo.dequeue() diff --git a/queue-exp/pifo_wfq.py b/queue-exp/pifo_wfq.py index 2a76858..85e1b00 100755 --- a/queue-exp/pifo_wfq.py +++ b/queue-exp/pifo_wfq.py @@ -34,31 +34,36 @@ along with this program. If not, see . """ -from pifo_lib import Packet, Runner, Pifo -from pifo_lib import SchedulingAlgorithm +from sched_lib import Packet, Runner, Pifo, SchedulingAlgorithm class Wfq(SchedulingAlgorithm): """Weighted Fair Queueing (WFQ)""" - def __init__(self): + def __init__(self, name=None): + super().__init__(name) self._pifo = Pifo() self._last_finish = {} self._virt_time = 0 - def get_rank(self, pkt): - flow = pkt.flow + def get_rank(self, item): + """Rank the items by their start time, which we calculate from the + finish time of the last packet. However, we divide the packet's length + with weights to prioritize them. We determine the weights by splitting + the flow-ids into odd and even numbers. + """ + flow = item.flow weight = 50 if flow % 2 == 1 else 100 if flow in self._last_finish: rank = max(self._virt_time, self._last_finish[flow]) else: rank = self._virt_time - self._last_finish[flow] = rank + pkt.length / weight + self._last_finish[flow] = rank + item.length / weight return rank - def enqueue(self, item): + def enqueue(self, ref, item): rank = self.get_rank(item) - self._pifo.enqueue(item, rank) + self._pifo.enqueue(ref, rank) def dequeue(self): return self._pifo.dequeue() diff --git a/queue-exp/pifo_lib.py b/queue-exp/sched_lib.py similarity index 89% rename from queue-exp/pifo_lib.py rename to queue-exp/sched_lib.py index 19c6e40..40eb4d3 100644 --- a/queue-exp/pifo_lib.py +++ b/queue-exp/sched_lib.py @@ -48,7 +48,7 @@ def run(self): print(" Inserting packets into scheduler:") pprint(self.input_pkts, indent=4) for p in self.input_pkts: - self.scheduler.enqueue(p) + self.scheduler.enqueue(p, p) print(" Scheduler state:") self.scheduler.dump() output = [] @@ -70,10 +70,10 @@ class SchedulingAlgorithm(): Please look at the pifo_fifo.py to see how you implement a FIFO. """ - def __init__(self): - raise NotImplementedError(self.__class__.__name__ + ' missing implementation') + def __init__(self, name=None): + self._name = name - def enqueue(self, item): + def enqueue(self, ref, item): raise NotImplementedError(self.__class__.__name__ + ' missing implementation') def dequeue(self): @@ -83,16 +83,19 @@ def dump(self): raise NotImplementedError(self.__class__.__name__ + ' missing implementation') def __next__(self): - item = self.dequeue() - if item is None: + pkt = self.dequeue() + if pkt is None: raise StopIteration - return item + return pkt def __iter__(self): return self def __repr__(self): - return f"{self.__class__.__name__} - {self.__class__.__doc__}" + result = f"{self.__class__.__name__} - {self.__class__.__doc__}" + if self._name is not None: + result = f"{self._name}: {result}" + return result class Queue: @@ -100,8 +103,8 @@ def __init__(self, idx=None): self._list = [] self.idx = idx - def enqueue(self, item): - self._list.append(item) + def enqueue(self, ref, rank=None): + self._list.append(ref) def peek(self): try: @@ -139,11 +142,11 @@ def dump(self): class Pifo(Queue): - def enqueue(self, item, rank): + def enqueue(self, ref, rank): if rank is None: raise ValueError("Rank can't be of value 'None'.") - super().enqueue((rank, item)) + super().enqueue((rank, ref)) self.sort() def sort(self): @@ -160,8 +163,7 @@ def peek(self): class Flow(Queue): def __init__(self, idx): - super().__init__() - self.idx = idx + super().__init__(idx) def __repr__(self): return f"F(I:{self.idx}, Q:{self.qlen}, L:{self.length})"