-
Notifications
You must be signed in to change notification settings - Fork 3
/
upnp.py
95 lines (80 loc) · 3.16 KB
/
upnp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python
# encoding: utf-8
import io
import xmltok
try:
import urequests as requests
except ImportError:
import requests
# Value of the SOAPACTION request header: the service URN followed by
# '#' and the action name.
soap_action_template = (
    'urn:schemas-upnp-org:service:'
    '{service_type}:{version}#{action}'
)

# SOAP envelope sent as the request body. ``{arguments}`` is replaced with
# the already-serialised <name>value</name> argument elements.
soap_body_template = ''.join((
    '<?xml version="1.0"?>',
    '<s:Envelope'
    ' xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"'
    ' s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">',
    '<s:Body>',
    '<u:{action}'
    ' xmlns:u="urn:schemas-upnp-org:service:{service_type}:{version}">',
    '{arguments}',
    '</u:{action}>',
    '</s:Body>',
    '</s:Envelope>',
))
def _unescape(value):
    """Return ``value`` with the five predefined XML entities unescaped.

    xmltok doesn't unescape any of the characters that are escaped inside
    the tokens. As Sonos regularly includes XML as text inside their UPnP
    responses, it's important we unescape it. This does the simplest thing
    possible (plain string replacement), as it seems to be enough; numeric
    character references are left untouched.

    Exactly one level of escaping is removed.
    """
    # '&amp;' must be handled last. Replacing it earlier would turn a doubly
    # escaped sequence such as '&amp;apos;' into '&apos;' and then into "'",
    # removing two levels of escaping instead of one.
    return (
        value
        .replace('&lt;', '<')
        .replace('&gt;', '>')
        .replace('&quot;', '"')
        .replace('&apos;', '\'')
        .replace('&amp;', '&')
    )
def parse_response(action, resp):
    """Extract the output arguments of a UPnP action response.

    Scans the token stream for the ``<u:{action}Response>`` element and
    collects each ``<{name}>{value}</{name}>`` child into a dict mapping
    ``name`` to the unescaped text. Rather than use a proper parser, this
    relies on the MicroPython XML tokenizer (``xmltok``).

    ``resp`` is whatever ``xmltok.tokenize`` accepts (the caller in this
    file passes a file-like object).

    This is deliberately simple and will give odd results on unusual input:
    an element with no text produces no entry, and a text token that is not
    directly preceded by a start tag is stored under the key ``None``.

    Raises ``Exception('Bad UPnP response')`` if the tokens run out before
    the response element has been opened and closed.
    """
    response_tag = ('u', action + 'Response')
    stream = xmltok.tokenize(resp)

    # Consume tokens up to and including the opening response tag.
    for item in stream:
        kind, payload, *_ = item
        if kind == xmltok.START_TAG and payload == response_tag:
            break
    else:
        raise Exception('Bad UPnP response')

    pairs = []
    current_name = None
    # Continue on the same iterator: everything until the matching end tag
    # belongs to the response element.
    for item in stream:
        kind, payload, *_ = item
        if kind == xmltok.END_TAG and payload == response_tag:
            return dict(pairs)
        if kind == xmltok.START_TAG:
            # Keep only the local name; the namespace prefix is dropped.
            _, current_name = payload
        elif kind == xmltok.TEXT:
            pairs.append((current_name, _unescape(payload)))
            current_name = None

    # Ran out of tokens before seeing the closing response tag.
    raise Exception('Bad UPnP response')
def send_command(url, service_type, version, action, arguments):
    """POST a UPnP SOAP action to ``url`` and return its parsed response.

    ``arguments`` is an iterable of ``(name, value)`` pairs; each becomes a
    ``<name>value</name>`` element inside the action element.

    NOTE: Does not deal with any escaping; names and values are inserted
    into the XML verbatim.

    Returns the dict produced by ``parse_response`` when the HTTP status is
    200. Raises ``Exception`` carrying the decoded response body for any
    other status.
    """
    template_fields = dict(
        service_type=service_type, version=version, action=action,
    )

    argument_elements = []
    for name, value in arguments:
        argument_elements.append(
            '<{name}>{value}</{name}>'.format(name=name, value=value)
        )

    payload = soap_body_template.format(
        arguments=''.join(argument_elements), **template_fields
    ).encode('utf-8')

    response = requests.post(
        url,
        headers={
            'Content-Type': 'text/xml; charset="utf-8"',
            'SOAPACTION': soap_action_template.format(**template_fields),
        },
        data=payload,
    )

    if response.status_code != 200:
        raise Exception(
            'UPnP command failed: %s' % response.content.decode('utf-8')
        )

    # The tokenizer is handed a file-like object, so wrap the response text.
    return parse_response(action, io.StringIO(response.text))