Skip to content

Commit

Permalink
Support forwarding events to another Slipstream (#2)
Browse files Browse the repository at this point in the history
This works by using two plugins;
 1) TransmitterPlugin will serialize most* incoming events and
    send them via TCP/IP to a receiver, as configured in settings
 2) ReceiverPlugin deserializes the incoming data to events
    and sends them on its own eventbus

*most events are sent: "Internal" events such as Plugin management
(register/unregister/enable/disable) are ignoed as we wouldnt be able to
use different plugins on the receiver side

Filemonitoring events are also ignored. The files are not shared
between, so these events doesnt make much sense on the receiver.

Utility events (writing to console, saying text, playing audio) are also
ignored.

If you look at the event, ExcludeFromTxrx is used to determine if
a event is transferred to the receiver or not.

The protocol between Transmitter and Receiver is json based. Each event
is converted to json and each json string gets a linefeed (\n) as
delimiter. json strings are assumed not to contain linefeeds!
  • Loading branch information
dennis authored Dec 30, 2020
1 parent e137ca4 commit 683cfc9
Show file tree
Hide file tree
Showing 51 changed files with 954 additions and 355 deletions.
3 changes: 3 additions & 0 deletions App.config
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
<setting name="TwitchChannel" serializeAs="String">
<value />
</setting>
<setting name="TxrxIpPort" serializeAs="String">
<value />
</setting>
</Slipstream.Properties.Settings>
</userSettings>
</configuration>
33 changes: 31 additions & 2 deletions Backend/Engine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public Engine(IEventBus eventBus, IStateService stateService, IApplicationConfig
RegisterPlugin(new Shared.Events.Internal.CommandPluginRegister() { Id = "AudioPlugin", PluginName = "AudioPlugin", Settings = ApplicationConfiguration.GetAudioSettingsEvent() });
RegisterPlugin(new Shared.Events.Internal.CommandPluginRegister() { Id = "IRacingPlugin", PluginName = "IRacingPlugin" });
RegisterPlugin(new Shared.Events.Internal.CommandPluginRegister() { Id = "TwitchPlugin", PluginName = "TwitchPlugin", Settings = ApplicationConfiguration.GetTwitchSettingsEvent() });
RegisterPlugin(new Shared.Events.Internal.CommandPluginRegister() { Id = "TransmitterPlugin", PluginName = "TransmitterPlugin", Settings = ApplicationConfiguration.GetTxrxSettingsEvent() }, false);
RegisterPlugin(new Shared.Events.Internal.CommandPluginRegister() { Id = "ReceiverPlugin", PluginName = "ReceiverPlugin", Settings = ApplicationConfiguration.GetTxrxSettingsEvent() }, false);

// Tell Plugins that we're live - this will make eventbus distribute events
EventBus.Enabled = true;
Expand All @@ -57,10 +59,13 @@ private void OnCommandPluginStates(CommandPluginStates _)
}));
}

private void RegisterPlugin(CommandPluginRegister e)
private void RegisterPlugin(CommandPluginRegister e, bool enable = true)
{
OnCommandPluginRegister(e);
EventBus.PublishEvent(new Shared.Events.Internal.CommandPluginEnable() { Id = e.Id });
if (enable)
{
EventBus.PublishEvent(new Shared.Events.Internal.CommandPluginEnable() { Id = e.Id });
}
}

public IEventBusSubscription RegisterListener()
Expand Down Expand Up @@ -136,6 +141,30 @@ private void OnCommandPluginRegister(Shared.Events.Internal.CommandPluginRegiste
}
}
break;
case "TransmitterPlugin":
{
if (!(ev.Settings is TxrxSettings settings))
{
throw new Exception("Unexpected settings for TransmitterPlugin");
}
else
{
PluginManager.RegisterPlugin(new TransmitterPlugin(ev.Id, EventBus, settings));
}
}
break;
case "ReceiverPlugin":
{
if (!(ev.Settings is TxrxSettings settings))
{
throw new Exception("Unexpected settings for ReceiverPlugin");
}
else
{
PluginManager.RegisterPlugin(new ReceiverPlugin(ev.Id, EventBus, settings));
}
}
break;
default:
throw new Exception($"Unknown plugin '{ev.PluginName}'");
}
Expand Down
135 changes: 135 additions & 0 deletions Backend/Plugins/ReceiverPlugin.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
using Slipstream.Backend.Services;
using Slipstream.Shared;
using Slipstream.Shared.Events.Setting;
using Slipstream.Shared.Events.Utility;
using System;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using EventHandler = Slipstream.Shared.EventHandler;

#nullable enable

namespace Slipstream.Backend.Plugins
{
class ReceiverPlugin : BasePlugin
{
private readonly IEventBus EventBus;
private string Ip = "";
private Int32 Port = 42424;
private TcpListener? Listener;
private readonly TxrxService TxrxService = new TxrxService();
private Socket? Client;
private const int READ_BUFFER_SIZE = 1024 * 16;
readonly byte[] ReadBuffer = new byte[READ_BUFFER_SIZE];

public ReceiverPlugin(string id, IEventBus eventBus, TxrxSettings settings) : base(id, "ReceiverPlugin", "ReceiverPlugin", "ReceiverPlugin")
{
EventBus = eventBus;

OnSetting(settings);

EventHandler.OnSettingTxrxSettings += (s, e) => OnSetting(e.Event);
}

public override void OnEnable()
{
// To avoid that we get an endless loop, we will Unregister the "other" end in this instance
EventBus.PublishEvent(new Shared.Events.Internal.CommandPluginUnregister { Id = "TransmitterPlugin" });
}

public override void OnDisable()
{
Client?.Dispose();
Client = null;
}

private void OnSetting(TxrxSettings e)
{
var input = e.TxrxIpPort.Split(':');

if (input.Length == 2)
{
Ip = input[0];
if (!Int32.TryParse(input[1], out Port))
{
EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"ReceiverPlugin: Invalid port in TxrxHost provided: '{e.TxrxIpPort}'" });
}
}
else
{
EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"ReceiverPlugin: Invalid TxrxHost provided: '{e.TxrxIpPort}'" });
}
}

private void SetupListener()
{
IPAddress localAddr = IPAddress.Parse(Ip);
Listener = new TcpListener(localAddr, Port);
Listener.Start();

EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"ReceiverPlugin listening on {Listener.LocalEndpoint}" });
}

private void AcceptClient()
{
Debug.Assert(Client == null);
Debug.Assert(Listener != null);

Client = Listener!.AcceptSocket();

EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"ReceiverPlugin got a connection from {Client.RemoteEndPoint}" });
}

private void ReadData()
{
Debug.Assert(Client != null);

try
{
if (Client!.Available == 0)
return;

int length = Client.Receive(ReadBuffer);

string json = System.Text.Encoding.Unicode.GetString(ReadBuffer, 0, length);

TxrxService.Parse(json, (@event) => EventBus.PublishEvent(@event));

// Check if disconnceted
if (Client!.Poll(200, SelectMode.SelectRead) && Client!.Available == 0)
{
EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"ReceiverPlugin disconnected {Client.RemoteEndPoint}" });

Client.Dispose();
Client = null;
}
}
catch (SocketException e)
{
EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"ReceiverPlugin: Cant receieve data: {e.Message}" });
}
}

public override void Loop()
{
if (!Enabled || Ip.Length == 0)
return;

if (Listener == null)
{
SetupListener();
}

if (Listener != null && Client == null)
{
AcceptClient();
}

if (Client != null)
{
ReadData();
}
}
}
}
149 changes: 149 additions & 0 deletions Backend/Plugins/TransmitterPlugin.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
using Slipstream.Backend.Services;
using Slipstream.Shared;
using Slipstream.Shared.Events.Setting;
using Slipstream.Shared.Events.Utility;
using System;
using System.Diagnostics;
using System.Net.Sockets;
using System.Threading;
using EventHandler = Slipstream.Shared.EventHandler;

#nullable enable

namespace Slipstream.Backend.Plugins
{
class TransmitterPlugin : BasePlugin
{
private readonly IEventBus EventBus;
private string Ip = "";
private Int32 Port = 42424;
private TcpClient? Client = null;
private readonly TxrxService TxrxService = new TxrxService();

public TransmitterPlugin(string id, IEventBus eventBus, TxrxSettings settings) : base(id, "TransmitterPlugin", "TransmitterPlugin", "TransmitterPlugin")
{
EventBus = eventBus;

OnSetting(settings);

EventHandler.OnSettingTxrxSettings += (s, e) => OnSetting(e.Event);
EventHandler.OnDefault += (s, e) => OnEvent(e.Event);
}

private void OnEvent(IEvent @event)
{
if (!Enabled || Client == null || !Client.Connected || @event.ExcludeFromTxrx)
return;

try
{
string json = TxrxService.Serialize(@event);

Debug.WriteLine($"Sending '{json}'");

byte[] data = System.Text.Encoding.Unicode.GetBytes(json);

Client.GetStream().Write(data, 0, data.Length);
}
catch (SocketException e)
{
EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"TransmitterPlugin: Cant send {@event.EventType}: {e.Message}" });
Reset();
}
catch (System.IO.IOException e)
{
EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"TransmitterPlugin: Cant send {@event.EventType}: {e.Message}" });
Reset();
}
}

private void OnSetting(TxrxSettings e)
{
var input = e.TxrxIpPort.Split(':');

if (input.Length == 2)
{
Ip = input[0];
if (!Int32.TryParse(input[1], out Port))
{
EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"TransmitterPlugin: Invalid port in TxrxIpPort provided: '{e.TxrxIpPort}'" });
}
else
{
Reset();
}
}
else
{
EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"TransmitterPlugin: Invalid TxrxIpPort provided: '{e.TxrxIpPort}'" });
}
}

private void Reset()
{
Client?.Close();
Client?.Dispose();
Client = null;
}

private void Connect()
{
Debug.Assert(Client == null);

try
{
Client = new TcpClient
{
SendTimeout = 500,
ExclusiveAddressUse = true,
};

var result = Client.BeginConnect(Ip, Port, null, null);
var success = result.AsyncWaitHandle.WaitOne(TimeSpan.FromMilliseconds(1000));
Client.EndConnect(result);

if (success)
{
EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"TransmitterPlugin: Connected to '{Ip}:{Port}'" });
return;
}
else
{
EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"TransmitterPlugin: Can't connect to '{Ip}:{Port}'" });
Client?.Dispose();
Client = null;
}
}
catch (Exception e)
{
EventBus.PublishEvent(new CommandWriteToConsole() { Message = $"TransmitterPlugin: Error connecting to '{Ip}:{Port}': {e.Message}" });
}

Client = null;

Thread.Sleep(1000);
}

public override void OnEnable()
{
// To avoid that we get an endless loop, we will Unregister the "other" end in this instance
EventBus.PublishEvent(new Shared.Events.Internal.CommandPluginUnregister { Id = "ReceiverPlugin" });
}

public override void OnDisable()
{
Reset();
}

public override void Loop()
{
if (!Enabled || Ip.Length == 0)
return;

if (Client == null)
{
Connect();
}
}
}
}
Loading

0 comments on commit 683cfc9

Please sign in to comment.