Skip to content

Commit

Permalink
Add bandwith limiting to MarsToxic
Browse files Browse the repository at this point in the history
  • Loading branch information
burke committed Nov 11, 2024
1 parent 2721212 commit 1a0c2f3
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 9 deletions.
100 changes: 92 additions & 8 deletions toxics/mars.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package toxics

import (
"fmt"
"math"
"time"

"github.com/rs/zerolog/log"
"github.com/Shopify/toxiproxy/v2/stream"
)

// The MarsToxic simulates the communication delay to Mars based on current orbital positions.
Expand All @@ -11,13 +15,19 @@ import (
// Further possibilities here:
// * drop packets entirely during solar conjunction
// * corrupt frames in the liminal period before/after conjunction
// * buffering through the disk (maybe a FIFO, idk) would model data in flight better
//
// We could to the hard block but we're kind of at the wrong layer to do corruption.
type MarsToxic struct {
// Optional additional latency in milliseconds
ExtraLatency int64 `json:"extra_latency"`
// Rate in KB/s (0 means unlimited)
Rate int64 `json:"rate"`
// Reference time for testing, if zero current time is used
ReferenceTime time.Time `json:"-"`
// Speed of light in km/s (defaults to 299792.458 if 0) It's (probably?)
// obvious you won't want to change this. It's useful for testing.
SpeedOfLight float64 `json:"speed_of_light"`
}

// Since we're buffering for several minutes, we need a large buffer.
Expand Down Expand Up @@ -56,8 +66,11 @@ func (t *MarsToxic) Delay() time.Duration {
// Calculate current distance in kilometers
distanceKm := meanDistance - amplitude*math.Cos(phase)

// Speed of light is exactly 299,792.458 km/s
speedOfLight := 299792.458 // km/s
// Speed of light is exactly 299,792.458 km/s by default
speedOfLight := t.SpeedOfLight
if speedOfLight <= 0 {
speedOfLight = 299792.458 // km/s
}

// One-way time = distance / speed of light
// Convert to milliseconds
Expand All @@ -70,23 +83,94 @@ func (t *MarsToxic) Delay() time.Duration {
}

func (t *MarsToxic) Pipe(stub *ToxicStub) {
logger := log.With().
Str("component", "MarsToxic").
Str("method", "Pipe").
Str("toxic_type", "mars").
Str("addr", fmt.Sprintf("%p", t)).
Logger()

var sleep time.Duration = 0
for {
select {
case <-stub.Interrupt:
logger.Trace().Msg("MarsToxic was interrupted")
return
case c := <-stub.Input:
if c == nil {
stub.Close()
return
}
sleep := t.Delay() - time.Since(c.Timestamp)

// Set timestamp when we receive the chunk
if c.Timestamp.IsZero() {
c.Timestamp = time.Now()
}

// Calculate Mars delay once for this chunk
marsDelay := t.Delay()

// Calculate bandwidth delay if rate is set
if t.Rate > 0 {
bytesPerSecond := t.Rate * 1024

// If chunk is too large, split it
if int64(len(c.Data)) > bytesPerSecond/10 { // 100ms worth of data
bytesPerInterval := bytesPerSecond/10 // bytes per 100ms
remainingData := c.Data
chunkStart := c.Timestamp

// First, wait for Mars delay
select {
case <-time.After(marsDelay):
case <-stub.Interrupt:
return
}

for len(remainingData) > 0 {
chunkSize := int(bytesPerInterval)
if chunkSize > len(remainingData) {
chunkSize = len(remainingData)
}

chunk := &stream.StreamChunk{
Data: remainingData[:chunkSize],
Timestamp: chunkStart,
}

select {
case <-time.After(100 * time.Millisecond):
chunkStart = chunkStart.Add(100 * time.Millisecond)
stub.Output <- chunk
remainingData = remainingData[chunkSize:]
case <-stub.Interrupt:
logger.Trace().Msg("MarsToxic was interrupted during writing data")
return
}
}
continue
}

// For small chunks, calculate bandwidth delay
sleep = time.Duration(float64(len(c.Data)) / float64(bytesPerSecond) * float64(time.Second))
}

// Apply both Mars delay and bandwidth delay
totalDelay := marsDelay
if sleep > 0 {
totalDelay += sleep
}

select {
case <-time.After(sleep):
c.Timestamp = c.Timestamp.Add(sleep)
stub.Output <- c
case <-time.After(totalDelay):
c.Timestamp = c.Timestamp.Add(totalDelay)
stub.Output <- c
case <-stub.Interrupt:
// Exit fast without applying latency.
stub.Output <- c // Don't drop any data on the floor
logger.Trace().Msg("MarsToxic was interrupted during writing data")
err := stub.WriteOutput(c, 5*time.Second)
if err != nil {
logger.Warn().Err(err).Msg("Could not write last packets after interrupt")
}
return
}
}
Expand Down
107 changes: 106 additions & 1 deletion toxics/mars_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/Shopify/toxiproxy/v2/toxics"
"github.com/Shopify/toxiproxy/v2/stream"
)

func TestMarsDelayCalculation(t *testing.T) {
Expand Down Expand Up @@ -54,7 +55,111 @@ func TestMarsExtraLatencyCalculation(t *testing.T) {
expected := 242 * time.Second // ~4 minutes (3 min base + 1 min extra)
delay := marsToxic.Delay()

tolerance := time.Duration(float64(expected) * 0.03) // 3% tolerance
tolerance := time.Duration(float64(expected) * 0.04) // 4% tolerance
if diff := delay - expected; diff < -tolerance || diff > tolerance {
t.Errorf("Expected delay of %v (±%v), got %v (%.1f%% difference)",
expected,
tolerance,
delay,
float64(diff) / float64(expected) * 100,
)
}
}

func TestMarsBandwidth(t *testing.T) {
marsToxic := &toxics.MarsToxic{
ReferenceTime: time.Date(2018, 7, 27, 0, 0, 0, 0, time.UTC), // At opposition
Rate: 100, // 100 KB/s
SpeedOfLight: 299792.458 * 1000, // 1000x normal speed for faster testing
}

input := make(chan *stream.StreamChunk)
output := make(chan *stream.StreamChunk)
stub := toxics.NewToxicStub(input, output)
done := make(chan bool)

go func() {
marsToxic.Pipe(stub)
done <- true
}()

// Send 50KB of data
dataSize := 50 * 1024 // 50KB

// At 100 KB/s, 50KB should take exactly 0.5 seconds
// Expected timing:
// - Bandwidth delay: 500ms (50KB at 100KB/s)
// - Mars delay: ~182ms (at opposition, with 1000x speed of light)
expectedDelay := 500*time.Millisecond + time.Duration(float64(182*time.Second)/1000)

start := time.Now()

testData := make([]byte, dataSize)
for i := range testData {
testData[i] = byte(i % 256) // Fill with recognizable pattern
}

select {
case input <- &stream.StreamChunk{
Data: testData,
}:
case <-time.After(5 * time.Second):
t.Fatal("Timeout while sending data")
}

// Collect all chunks
var receivedData []byte
timeout := time.After(5 * time.Second)

for len(receivedData) < dataSize {
select {
case chunk := <-output:
receivedData = append(receivedData, chunk.Data...)
case <-timeout:
t.Fatalf("Timeout while receiving data. Got %d of %d bytes", len(receivedData), dataSize)
}
}

elapsed := time.Since(start)

// Should take at least 0.5 seconds (50KB at 100KB/s) plus reduced Mars delay
tolerance := time.Duration(float64(expectedDelay) * 0.04) // 4% tolerance for timing

if elapsed < expectedDelay-tolerance || elapsed > expectedDelay+tolerance {
t.Errorf("Expected total delay of %v (±%v), got %v", expectedDelay, tolerance, elapsed)
}

if len(receivedData) != dataSize {
t.Errorf("Expected %d bytes, got %d", dataSize, len(receivedData))
}

// Verify data integrity
for i := range receivedData {
if receivedData[i] != byte(i%256) {
t.Errorf("Data corruption at byte %d: expected %d, got %d", i, byte(i%256), receivedData[i])
break
}
}

close(input)
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("Timeout waiting for toxic to finish")
}
}

func TestMarsSpeedOfLight(t *testing.T) {
// Test with 1000x speed of light to reduce delays
marsToxic := &toxics.MarsToxic{
ReferenceTime: time.Date(2018, 7, 27, 0, 0, 0, 0, time.UTC), // At opposition
SpeedOfLight: 299792.458 * 1000, // 1000x normal speed
}

delay := marsToxic.Delay()
expected := time.Duration(float64(182*time.Second) / 1000) // ~182ms (normal 182s / 1000)

tolerance := time.Duration(float64(expected) * 0.04) // 4% tolerance
if diff := delay - expected; diff < -tolerance || diff > tolerance {
t.Errorf("Expected delay of %v (±%v), got %v (%.1f%% difference)",
expected,
Expand Down

0 comments on commit 1a0c2f3

Please sign in to comment.