forked from Jxck-S/plane-notify
-
Notifications
You must be signed in to change notification settings - Fork 0
/
defADSBX.py
91 lines (88 loc) · 3.46 KB
/
defADSBX.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import requests
import json
import configparser
from datetime import datetime
from http.client import IncompleteRead
import http.client as http
import urllib3
import socket
main_config = configparser.ConfigParser()
main_config.read('./configs/mainconf.ini')
api_version = main_config.get('ADSBX', 'API_VERSION')
def pull(url, headers):
try:
response = requests.get(url, headers = headers, timeout=30)
print ("HTTP Status Code:", response.status_code)
response.raise_for_status()
except (requests.HTTPError, ConnectionError, requests.Timeout, urllib3.exceptions.ConnectionError) as error_message:
print("Basic Connection Error")
print(error_message)
response = None
except (requests.RequestException, IncompleteRead, ValueError, socket.timeout, socket.gaierror) as error_message:
print("Connection Error")
print(error_message)
response = None
except Exception as error_message:
print("Connection Error uncaught, basic exception for all")
print(error_message)
response = None
return response
def pull_adsbx(planes):
api_version = int(main_config.get('ADSBX', 'API_VERSION'))
if api_version not in [1, 2]:
raise ValueError("Bad ADSBX API Version")
if main_config.getboolean('ADSBX', 'ENABLE_PROXY') is False:
if api_version == 1:
if len(planes) > 1:
url = "https://adsbexchange.com/api/aircraft/json/"
elif len(planes) == 1:
url = "https://adsbexchange.com/api/aircraft/icao/" + str(list(planes.keys())[0]) + "/"
elif api_version == 2:
url = "https://adsbexchange.com/api/aircraft/v2/all"
else:
if main_config.has_option('ADSBX', 'PROXY_HOST'):
if api_version == 1:
url = main_config.get('ADSBX', 'PROXY_HOST') + "/api/aircraft/json/all"
if api_version == 2:
url = main_config.get('ADSBX', 'PROXY_HOST') + "/api/aircraft/v2/all"
else:
raise ValueError("Proxy enabled but no host")
headers = {
'api-auth': main_config.get('ADSBX', 'API_KEY'),
'Accept-Encoding': 'gzip'
}
response = pull(url, headers)
if response is not None:
try:
data = json.loads(response.text)
except (json.decoder.JSONDecodeError, ValueError) as error_message:
print("Error with JSON")
print(error_message)
data = None
except TypeError as error_message:
print("Type Error", error_message)
data = None
else:
if "msg" in data.keys() and data['msg'] != "No error":
raise ValueError("Error from ADSBX: msg = ", data['msg'])
if "ctime" in data.keys():
data_ctime = float(data['ctime']) / 1000.0
print("Data ctime:",datetime.utcfromtimestamp(data_ctime))
if "now" in data.keys():
data_now = float(data['now']) / 1000.0
print("Data now time:",datetime.utcfromtimestamp(data_now))
print("Current UTC:", datetime.utcnow())
else:
data = None
return data
def pull_date_ras(date):
url = f"https://globe.adsbexchange.com/globe_history/{date}/acas/acas.json"
headers = {
'Accept-Encoding': 'gzip'
}
response = pull(url, headers)
if response is not None:
data = response.text.splitlines()
else:
data = None
return data