-
Notifications
You must be signed in to change notification settings - Fork 3
/
mediawiki.py
executable file
·214 lines (174 loc) · 7.02 KB
/
mediawiki.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/python2.7
"""Methods for interacting with a mediawiki instance, like wikipedia."""
import requests
import time
# Rate-limit aggressively. These can be increased when running against a test
# wiki, but should stay cautious when used for wikipedia.
# Both values are handed to rate_limited(), which spaces successive calls at
# least (1 / value) seconds apart.
# Applied to create_page(): 0.1 per second, i.e. one edit every 10 seconds.
EDIT_PAGES_PER_SECOND = 0.1
# Applied to does_page_exist(): one query per second.
QUERY_PAGES_PER_SECOND = 1
# TODO: Set a user agent.
def rate_limited(max_per_second):
    """Rate limiting decorator-with-args.

    Successive calls to the decorated function are spaced at least
    1 / max_per_second seconds apart; a call that arrives too early sleeps
    for the remainder of the interval before running.

    Args:
      max_per_second: (float) How many times per second to do the thing.

    Returns:
      (func): A decorator
    """
    # Imported here so this function does not depend on the file's import
    # block being changed.
    import functools

    interval = 1.0 / float(max_per_second)

    def decorator(func):
        """A rate-limiting decorator.

        Args:
          func: (func) The thing to wrap.

        Returns:
          (func): The rate-limiting function to apply to the thing to wrap.
        """
        # A one-element list so the inner function can update the timestamp
        # without `nonlocal`, which python2.7 does not have. The timestamp is
        # shared by every call made through this particular decorated function.
        last_called = [0.0]

        @functools.wraps(func)  # keep func's __name__ and docstring
        def rate_limited_function(*args, **kwargs):
            """The actual rate limiting logic."""
            wait = interval - (time.time() - last_called[0])
            if wait > 0:
                time.sleep(wait)
            try:
                return func(*args, **kwargs)
            finally:
                # Record the attempt even when func raises, so a failing call
                # cannot be retried immediately and bypass the limit.
                last_called[0] = time.time()

        return rate_limited_function

    return decorator
class WikiException(Exception):
    """Raised when talking to the mediawiki API fails.

    Used for login failures, non-OK HTTP responses, responses that cannot be
    parsed, and page creation errors.
    """
class Wiki(object):
    """Login credentials and methods for interacting with a mediawiki instance."""

    def __init__(self, url, username, password, draft_prefix="Draft:"):
        """Log in to the wiki.

        Args:
          url: (str) url of the wiki. 'api.php' is appended to it as-is, so it
              needs to end with a '/'.
          username: login for the wiki
          password: password for the wiki
          draft_prefix: (str) How draft pages are named.

        Raises:
          WikiException: login failed.
        """
        self.url = url
        self.login_cookies = self.get_login_cookies(username, password)
        self.draft_prefix = draft_prefix

    def get_login_cookies(self, username, password):
        """Return login cookies.

        Fetches a login token, then posts the credentials together with that
        token and the cookies from the token request.

        Args:
          username: (str)
          password: (str)

        Returns:
          (requests.cookies.RequestsCookieJar): delicious cookies

        Raises:
          WikiException: login failed.
        """
        if not username or not password:
            raise WikiException("Empty username or password")

        api_url = self.url + 'api.php'

        payload = {'action': 'query', 'format': 'json', 'utf8': '',
                   'meta': 'tokens', 'type': 'login'}
        req1 = requests.post(api_url, data=payload)
        if not req1.ok:
            raise WikiException("query: Got status code %s from %s: %s" % (
                req1.status_code, req1.url, req1.reason))
        try:
            login_token = req1.json()['query']['tokens']['logintoken']
        except (ValueError, KeyError, TypeError) as ex:
            # ValueError: body was not JSON. KeyError/TypeError: it was JSON
            # but not the expected shape (e.g. an API error object).
            raise WikiException(
                "Couldn't parse JSON from login token: %s" % ex)

        payload = {'action': 'login', 'format': 'json', 'utf8': '',
                   'lgname': username, 'lgpassword': password,
                   'lgtoken': login_token}
        req2 = requests.post(api_url, data=payload, cookies=req1.cookies)
        if not req2.ok:
            raise WikiException("login: Got status code %s from %s: %s" % (
                req2.status_code, req2.url, req2.reason))

        # The API answers 200 even for a wrong password; the real outcome is
        # in the 'result' field of the response body.
        try:
            result = req2.json()['login']['result']
        except (ValueError, KeyError, TypeError) as ex:
            raise WikiException(
                "Couldn't parse JSON from login response: %s" % ex)
        if result != 'Success':
            raise WikiException("login: wiki reported result %r" % (result,))

        if len(req2.cookies) == 0:
            raise WikiException("Didn't get any login cookies.")
        return req2.cookies

    @rate_limited(QUERY_PAGES_PER_SECOND)
    def does_page_exist(self, page_to_query):
        """Checks whether a page already exists.

        Args:
          page_to_query: (string) What to look up.

        Returns:
          (str) the page url if it exists; None otherwise.

        Raises:
          WikiException: Bad data from the wiki.
        """
        # Let requests build the query string so that titles containing
        # '&', '#', '+', '=' etc. are URL-encoded instead of corrupting it.
        params = {'format': 'json', 'action': 'query',
                  'titles': page_to_query, 'prop': 'info', 'inprop': 'url'}
        req = requests.get(self.url + 'api.php', params=params)
        if not req.ok:
            raise WikiException("Got status code %s from %s: %s" % (
                req.status_code, req.url, req.reason))
        try:
            pages = req.json()['query']['pages']
            for page_id, info in pages.items():  # there should only be one
                # Pages that don't exist are keyed by a negative placeholder
                # id ("-1"); anything else is a live page.
                if not page_id.startswith('-'):
                    return info['fullurl']
        except (ValueError, KeyError, TypeError, AttributeError) as ex:
            raise WikiException("Couldn't parse JSON: %s" % ex)
        return None

    def does_draft_exist(self, page_to_query):
        """Checks whether a draft page exists.

        Args:
          page_to_query: (string) What to look up. The page name will be
              prepended with |self.draft_prefix|.

        Returns:
          (str) the page url if it exists; None otherwise.

        Raises:
          WikiException: Bad data from the wiki.
        """
        draft_to_query = "%s%s" % (self.draft_prefix, page_to_query)
        return self.does_page_exist(draft_to_query)

    @rate_limited(EDIT_PAGES_PER_SECOND)
    def create_page(self, person, create_draft=False):
        """Create a page if it doesn't exist. If it already exists, just
        silently does nothing.

        Args:
          person: (candidate.Candidate) data about one candidate. Its name()
              gives the page title and its wikipedia_content() the page text.
          create_draft: (bool) Whether to create a draft page using the
              |self.draft_prefix| variable.

        Returns:
          (str) Url of page, whether newly created or already existing.

        Raises:
          WikiException: Couldn't create the page. The page already existing
              does not raise an exception.
        """
        if create_draft:
            page_to_edit = "%s%s" % (self.draft_prefix, person.name())
        else:
            page_to_edit = person.name()

        api_url = self.url + 'api.php'

        # Step 1: fetch an edit (csrf) token using the login session.
        token_params = {'format': 'json', 'action': 'query',
                        'meta': 'tokens', 'continue': ''}
        req = requests.get(api_url, params=token_params,
                           cookies=self.login_cookies)
        if not req.ok:
            raise WikiException("Got status code %s from %s: %s" % (
                req.status_code, req.url, req.reason))
        try:
            edit_token = req.json()['query']['tokens']['csrftoken']
        except (ValueError, KeyError, TypeError) as ex:
            raise WikiException(
                "Couldn't parse edit token from JSON: %s" % ex)

        # Work on a copy so the stored login cookies are not mutated by
        # whatever cookies the token request handed back.
        edit_cookie = self.login_cookies.copy()
        edit_cookie.update(req.cookies)

        # Step 2: create the page. 'createonly' asks the wiki not to
        # overwrite a page that already exists.
        content_to_write = person.wikipedia_content()
        payload = {'action': 'edit', 'assert': 'user', 'format': 'json',
                   'utf8': '', 'text': content_to_write,
                   'summary': 'candidatebot did this',
                   'title': page_to_edit, 'token': edit_token,
                   'createonly': True}
        req = requests.post(api_url, data=payload, cookies=edit_cookie)
        if not req.ok:
            raise WikiException("Got status code %s from %s: %s" % (
                req.status_code, req.url, req.reason))
        # I can't find this information anywhere except in the text /o\
        if '"result":"Failure"' in req.text:
            raise WikiException(
                "Saw error in creation response: %s" % req.text)

        # Step 3: confirm the page is really there now.
        created_page = self.does_page_exist(page_to_edit)
        if not created_page:
            raise WikiException("Wiki page wasn't created at %s, but there "
                                "were no errors. " % page_to_edit)

        # Step 4: add to the list of stubs we've created. This is best
        # effort: the response is not inspected, and it runs whether the page
        # was newly created or already existed.
        link = "[[%s]]<br>" % page_to_edit
        list_page = "%s%s" % (self.draft_prefix, "CandidatebotListOfPages")
        payload = {'action': 'edit', 'assert': 'user', 'format': 'json',
                   'utf8': '', 'appendtext': link,
                   'summary': 'candidatebot did this', 'title': list_page,
                   'token': edit_token}
        requests.post(api_url, data=payload, cookies=edit_cookie)
        return created_page