Skip to content

Commit

Permalink
Handle stream errors and timeouts.
Browse files Browse the repository at this point in the history
  • Loading branch information
timdorr committed Oct 11, 2019
1 parent e1f025d commit 0ea61a0
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 32 deletions.
4 changes: 4 additions & 0 deletions lib/tesla_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
require 'eventmachine'
require 'faye/websocket'

require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'

require 'tesla_api/version'
require 'tesla_api/client'
require 'tesla_api/stream'
Expand Down
70 changes: 38 additions & 32 deletions lib/tesla_api/stream.rb
Original file line number Diff line number Diff line change
@@ -1,47 +1,53 @@
module TeslaApi
module Stream
def stream(&receiver)
EventMachine.run do
socket = create_streaming_socket
Async do |task|
Async::WebSocket::Client.connect(endpoint) do |connection|
on_timeout = ->(subtask) do
subtask.sleep TIMEOUT
task.stop
end

socket.on(:open) do |event|
socket.send(JSON.generate(stream_connect_message))
end
connection.write(stream_connect_message)
timeout = task.async(&on_timeout)

socket.on(:message) do |event|
data = JSON.parse(event.data.pack('c*'))

if data['msg_type'] == 'data:update'
attributes = data['value'].split(',')

receiver.call({
time: DateTime.strptime((attributes[0].to_i/1000).to_s, '%s'),
speed: attributes[1].to_f,
odometer: attributes[2].to_f,
soc: attributes[3].to_f,
elevation: attributes[4].to_f,
est_heading: attributes[5].to_f,
est_lat: attributes[6].to_f,
est_lng: attributes[7].to_f,
power: attributes[8].to_f,
shift_state: attributes[9].to_s,
range: attributes[10].to_f,
est_range: attributes[11].to_f,
heading: attributes[12].to_f
})
end
end
while message = connection.read
timeout.stop
timeout = task.async(&on_timeout)

socket.on(:close) do |event|
EventMachine.stop
case message[:msg_type]
when 'data:update'
attributes = message[:value].split(',')

receiver.call({
time: DateTime.strptime((attributes[0].to_i/1000).to_s, '%s'),
speed: attributes[1].to_f,
odometer: attributes[2].to_f,
soc: attributes[3].to_f,
elevation: attributes[4].to_f,
est_heading: attributes[5].to_f,
est_lat: attributes[6].to_f,
est_lng: attributes[7].to_f,
power: attributes[8].to_f,
shift_state: attributes[9].to_s,
range: attributes[10].to_f,
est_range: attributes[11].to_f,
heading: attributes[12].to_f
})
when 'data:error'
task.stop
end
end
end
end
end

private

def create_streaming_socket
Faye::WebSocket::Client.new(streaming_endpoint)
TIMEOUT = 30

def endpoint
Async::HTTP::Endpoint.parse(streaming_endpoint)
end

def streaming_endpoint
Expand Down
1 change: 1 addition & 0 deletions tesla_api.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Gem::Specification.new do |spec|
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ['lib']

spec.add_dependency 'async-websocket'
spec.add_dependency 'faraday'
spec.add_dependency 'faraday_middleware'
spec.add_dependency 'faye-websocket'
Expand Down

0 comments on commit 0ea61a0

Please sign in to comment.