-
Notifications
You must be signed in to change notification settings - Fork 4
/
zoom_monitor.py
250 lines (215 loc) · 10.3 KB
/
zoom_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
from contextlib import contextmanager
import time
import urllib.parse
from selenium.common.exceptions import (ElementClickInterceptedException,
ElementNotInteractableException,
NoSuchElementException,
TimeoutException
)
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from viam.logging import getLogger, setLevel
import browser
# XPath expression, relative to whatever node it is evaluated against, that
# matches any descendant element whose class attribute contains
# "SvgParticipants" (the icon inside the participants button).
PARTICIPANTS_BTN = ".//*[contains(@class, 'SvgParticipants')]"
@contextmanager
def monitor_zoom(url, log_level):
    """
    Context manager that yields a ZoomMonitor built from `url` and
    `log_level`, and calls its `clean_up()` method when the `with` block
    exits, whether it exits normally or via an exception.
    """
    monitor = ZoomMonitor(url, log_level)
    try:
        yield monitor
    finally:
        monitor.clean_up()
class MeetingEndedException(Exception):
    """
    Raised when we detect that the host has ended the meeting.
    """
class ZoomMonitor():
    """
    Given a URL to a Zoom meeting, join the meeting using Selenium controlling
    a Chrome browser. We provide a way to count how many meeting participants
    currently have their hands raised.

    The constructor spawns the browser and joins the meeting. Call
    `clean_up()` when you are done so the browser is shut down.
    """

    def __init__(self, url, log_level):
        """
        Spawn a browser, navigate to the meeting described by `url`, and join
        it. `log_level` is passed to the logging module's `setLevel`.

        If anything goes wrong while connecting, the browser is shut down
        before the exception propagates.
        """
        self._logger = getLogger(__name__)
        self._meeting_ended = False
        setLevel(log_level)

        self._driver = browser.spawn_driver()
        try:
            raw_url = self._get_raw_url(url)
            self._logger.debug(f"parsed URL {url} to {raw_url}")
            self._driver.get(raw_url)
            self._join_meeting()
        except BaseException:
            # The caller never receives this object if we fail here, so it
            # has no way to call clean_up(). Quit the browser ourselves so
            # that it isn't leaked, then let the original error propagate.
            self._driver.quit()
            raise

    @staticmethod
    def _get_raw_url(url):
        """
        Remove any Google redirection or Zoom prompts to the Zoom meeting.
        Returns the URL needed to connect inside the Selenium browser.

        Raises a KeyError if `url` is a Google redirect that has no `q`
        parameter.
        """
        # Remove all backslashes since shells may automatically add them.
        url = url.replace("\\", "")

        # Google Calendar wraps its links in a redirect. In these links, the
        # "real" URL is stored in the `q` parameter in the CGI arguments.
        parsed_url = urllib.parse.urlparse(url)
        if "google.com" in parsed_url.netloc:
            cgi_params = urllib.parse.parse_qs(parsed_url.query)
            url = cgi_params["q"][0]

        # Many links that we receive from Zoom will prompt you to open the
        # Zoom app if it's available. Replace the domain name and first couple
        # directories in the path to skip that. Strip trailing slashes first
        # so that the last path component is the meeting ID (plus any query
        # string) rather than an empty string.
        meeting = url.rstrip("/").split("/")[-1]
        return f"https://app.zoom.us/wc/join/{meeting}"

    def _join_meeting(self):
        """
        Set our name and join the meeting.

        Raises a TimeoutException if either the name prompt or (after
        joining) the participants button fails to show up in time.
        """
        self._logger.debug("logging in...")
        self._wait_for_element(By.ID, "input-for-name")
        self._driver.find_element(By.ID, "input-for-name").send_keys(
            "Hand Raiser Bot")
        self._driver.find_element(By.CSS_SELECTOR, ".zm-btn").click()
        # Once the participants icon exists, we consider ourselves joined.
        self._wait_for_element(By.XPATH, PARTICIPANTS_BTN, timeout_s=30)
        self._logger.info("logged into Zoom successfully")

    def _wait_for_element(self, approach, value, timeout_s=5):
        """
        Wait until there is at least one element identified by the approach
        and value. If `timeout_s` seconds elapse without such an element
        appearing, we raise a TimeoutException.
        """
        WebDriverWait(self._driver, timeout_s).until(
            lambda _: len(self._driver.find_elements(approach, value)) != 0)

    def _check_if_meeting_ended(self):
        """
        Throw a MeetingEndedException if the meeting has been ended by the
        host, and otherwise do nothing.
        """
        try:
            modal_title = self._driver.find_element(
                By.CLASS_NAME, "zm-modal-body-title")
        except NoSuchElementException:
            return  # No modal dialog is showing.

        if modal_title.text == "This meeting has been ended by host":
            self._meeting_ended = True  # Don't try logging out later
            raise MeetingEndedException()

    def _ignore_recording(self):
        """
        If we are notified that someone is recording this meeting, click past
        so we can count hands some more. This notification can come either at
        the beginning if we joined when the recording was already in progress,
        or in the middle of the meeting if someone starts recording.
        """
        try:
            outer = self._driver.find_element(
                By.CLASS_NAME, "recording-disclaimer-dialog")
        except NoSuchElementException:
            return  # No one has started recording a video recently!

        # Click "Got it" to acknowledge that the meeting is being recorded.
        outer.find_element(By.CLASS_NAME, "zm-btn--primary").click()

    def _open_participants_list(self):
        """
        Wait until we can open the participants list, then open it, then wait
        until it's opened.

        Raises an ElementClickInterceptedException if the list still isn't
        open after several attempts.
        """
        # First, check if it's already opened, and if so return immediately.
        try:
            self._driver.find_element(
                By.CLASS_NAME, "participants-wrapper__inner")
            return  # Already opened!
        except NoSuchElementException:
            pass  # We need to open it.

        # Right when we join Zoom, the participants button is not clickable so
        # we have to wait. Attempt to click the button a few times.
        max_attempts = 5
        for _ in range(max_attempts):
            try:
                button = self._find_participants_button()
            except NoSuchElementException:
                self._logger.info("Could not find participants button.")
                time.sleep(1)
                continue  # Go to the next attempt

            # Sometimes, the button is hidden off the bottom of the window,
            # but moving the mouse to it will make it visible again. This
            # tends to happen after someone stops sharing their screen.
            ActionChains(self._driver).move_to_element(button).perform()

            try:
                button.click()
            except (ElementClickInterceptedException,
                    ElementNotInteractableException) as e:
                self._logger.info(f"DOM isn't set up ({e}); try again soon.")
                time.sleep(1)
                continue  # Go to the next attempt

            self._logger.debug("participants list clicked")
            try:
                # Now that we've clicked the participants list without raising
                # an exception, wait until it shows up. If it doesn't show up
                # yet, it might be that we've highlighted the button but
                # haven't properly clicked it, and the next iteration's attempt
                # will succeed.
                self._wait_for_element(
                    By.CLASS_NAME, "participants-wrapper__inner", timeout_s=1)
            except TimeoutException:
                # No extra sleep here: the wait above already took a second.
                self._logger.info("timed out waiting for participants list, "
                                  "will try clicking again soon.")
                continue  # Go to the next attempt

            self._logger.info("participants list opened")
            return  # Success!

        # If we get here, none of our attempts opened the participants list.
        raise ElementClickInterceptedException(
            f"Could not open participants list after {max_attempts} attempts")

    def _find_participants_button(self):
        """
        We want to click on an item with the participants icon. However, the
        icon itself is not clickable. A click would be intercepted by its
        grandparent element, a button with the class
        "footer-button-base__button". Since it's not obvious how to click an
        SVG element's grandparent, look through all footer buttons.
        Return the button that contains the participants icon.

        Raises a NoSuchElementException if no footer button contains the icon.
        """
        for outer in self._driver.find_elements(
                By.CLASS_NAME, "footer-button-base__button"):
            self._logger.debug(f"trying to find participants button in {outer}")
            try:
                # Check if this footer button contains the participants icon.
                outer.find_element(By.XPATH, PARTICIPANTS_BTN)
                return outer
            except NoSuchElementException:
                self._logger.debug("participants not present, next...")
                continue  # wrong footer element, try the next one
        raise NoSuchElementException("could not find participants button")

    def clean_up(self):
        """
        Leave the meeting and shut down the browser.

        The browser is shut down even if leaving the meeting fails, in which
        case the failure still propagates to the caller afterwards.
        """
        try:  # If anything goes wrong, close the browser anyway.
            if self._meeting_ended:
                return  # Just abandon the meeting without trying to leave it.

            # Find the "leave" button and click on it, then confirm.
            self._driver.find_element(
                By.CLASS_NAME, "footer__leave-btn").click()
            self._driver.find_element(
                By.CLASS_NAME, "leave-meeting-options__btn").click()
        finally:
            self._driver.quit()

    def count_hands(self):
        """
        Return the number of people in the participants list with raised hands

        Raises a MeetingEndedException if the host has ended the meeting.
        """
        self._check_if_meeting_ended()
        self._ignore_recording()

        # WARNING: there's a race condition right here. If someone starts
        # recording the meeting here, after _ignore_recording returns and
        # before _open_participants_list runs, we will time out opening the
        # list and crash. It's such an unlikely event that we haven't bothered
        # fixing it yet.

        # If someone else shares their screen, it closes the participants list.
        # So, try reopening it every time we want to count hands.
        self._open_participants_list()

        # We want to find an SVG element whose class is
        # "lazy-svg-icon__icon lazy-icon-nvf/270b". However,
        # `find_elements(By.CLASS_NAME, ...)` has problems when the class name
        # contains a slash. So, instead we use xpath to find class names that
        # contain "270b" (the hex value of the Unicode code point for the "hand
        # raised" emoji). Elements whose class contains "270b" show up in
        # several places, however, so we restrict it to only the ones that are
        # within the participants list.
        # NOTE(review): `@class='...'` is an exact match on the whole class
        # attribute, so this assumes the wrapper element has no other
        # classes -- confirm against the live DOM.
        return len(self._driver.find_elements(
            By.XPATH, "//*[@class='participants-wrapper__inner']"
                      "//*[contains(@class, '270b')]"))