-
Notifications
You must be signed in to change notification settings - Fork 1
/
oncaller.py
173 lines (137 loc) · 4.77 KB
/
oncaller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
from sys import exit
from datetime import datetime, timedelta, date
import os.path
import pickle
import click
import json
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import csv_reader
from itertools import cycle, islice
from pprint import pprint
# OAuth scope list for the Google Calendar API.
scope = ["https://www.googleapis.com/auth/calendar"]
credentials = None
# default timezone
defaultTimeZone = "America/Los_Angeles"
# your public calendarID
# NOTE(review): this value looks like a redacted placeholder rather than a
# real calendar ID -- confirm and replace before running.
defaultcalendarId = "[email protected]"
# Lookup tables exposed by the local csv_reader module.
oncall_engineers = csv_reader.rotations
oncall_teams = csv_reader.team_details
oncall_managers = csv_reader.emails

# Load the cached OAuth token.
# NOTE(security): pickle.load executes arbitrary code embedded in the file;
# only ever point this at a token file this tool wrote itself.
try:
    with open("./credentials/token.pickle", "rb") as token:
        credentials = pickle.load(token)
except Exception:
    # Nothing usable was loaded, so there is nothing to refresh: report and
    # propagate the original error (missing file, corrupt pickle, ...).
    print("""Something wrong with creds""")
    raise

# The refresh check has to run after a *successful* load. It used to live
# inside the except handler, where credentials is still None, so an expired
# token was never refreshed.
if credentials and not credentials.valid:
    if credentials.expired and credentials.refresh_token:
        credentials.refresh(Request())
    else:
        # Keep going as before; the first API call will surface the actual
        # authentication error.
        print("""Something wrong with creds""")

service = build("calendar", "v3", credentials=credentials)
# Call G Calendar API
def get_cal_events(max_results=30):
    """Print and return the upcoming events on the default calendar.

    Queries ``defaultcalendarId`` through the module-level ``service`` for
    events starting from the current UTC time, ordered by start time, with
    recurring events expanded into single instances.

    Args:
        max_results: Maximum number of events to request (defaults to 30,
            the value that used to be hard-coded).

    Returns:
        The list stored under ``"items"`` in the API response, or an empty
        list when the response has no such key.
    """
    # utcnow() is naive, so the "Z" suffix is appended by hand to mark UTC.
    now = datetime.utcnow().isoformat() + "Z"
    events_result = (
        service.events()
        .list(
            calendarId=defaultcalendarId,
            timeMin=now,
            maxResults=max_results,
            singleEvents=True,
            orderBy="startTime",
        )
        .execute()
    )
    events = events_result.get("items", [])
    if not events:
        print("No upcoming oncall events!")
    for event in events:
        # Prefer "dateTime" and fall back to "date" when it is absent.
        start = event["start"].get("dateTime", event["start"].get("date"))
        # Events may lack a "summary"; .get() avoids a KeyError while listing.
        print(start, event.get("summary", "(no summary)"))
    # The former calendarList lookup was dropped: its result was never used,
    # it cost an extra API round-trip, and it raised when the list was empty.
    return events
def get_team(name):
    """Return the ``csv_reader.team_details`` entry stored under ``name``.

    The lookup is plain indexing, so a missing ``name`` raises whatever the
    underlying container raises.
    """
    return csv_reader.team_details[name]
def get_manager_info(name):
    """Return the ``csv_reader.emails`` entry stored under ``name``.

    The lookup is plain indexing, so a missing ``name`` raises whatever the
    underlying container raises.
    """
    return csv_reader.emails[name]
def get_techlead_info(name):
    """Return the ``csv_reader.emails`` entry stored under ``name``.

    Reads the same ``csv_reader.emails`` table that ``get_manager_info``
    reads. The lookup is plain indexing, so a missing ``name`` raises
    whatever the underlying container raises.
    """
    return csv_reader.emails[name]
def create_event(
    on_call_rotation_index,
    rotation,
    engineer_info,
    team_info,
    manager_info,
    techlead_info,
    next_monday,
    number_of_eng,
):
    """Insert one on-call event into the default calendar and print the result.

    The event starts ``on_call_rotation_index`` weeks after ``next_monday``
    and ends six days after its start.

    Args:
        on_call_rotation_index: Week offset added to ``next_monday``.
        rotation: Rotation label placed at the front of the event summary.
        engineer_info: Mapping read for ``"name"`` and ``"email"``.
        team_info: Mapping read for ``"emoji"`` and ``"name"``.
        manager_info: Mapping read for ``"email"``.
        techlead_info: Mapping read for ``"email"``.
        next_monday: ``datetime`` the week offsets are counted from.
        number_of_eng: Value written as the ``INTERVAL`` of the weekly RRULE.

    Returns:
        None. The API response and its ``htmlLink`` are printed.
    """
    # Seconds are %S. The previous format string repeated %M, which wrote the
    # minute value into the seconds field of both timestamps.
    timestamp_format = "%Y-%m-%dT%H:%M:%S"

    start = next_monday + timedelta(weeks=on_call_rotation_index)
    end = start + timedelta(days=6)

    event = {
        "summary": f"{rotation} Rotation: {engineer_info['name']} ({team_info['emoji']} {team_info['name']} ) ",
        "start": {
            "dateTime": start.strftime(timestamp_format),
            "timeZone": defaultTimeZone,
        },
        "end": {
            "dateTime": end.strftime(timestamp_format),
            "timeZone": defaultTimeZone,
        },
        "recurrence": [
            # INTERVAL is the number of engineers in the rotation.
            # For recurring X-week basis set COUNT=X.
            f"RRULE:FREQ=WEEKLY;INTERVAL={number_of_eng};COUNT=1"
        ],
        "attendees": [
            {"email": f"{person['email']}"}
            for person in (engineer_info, manager_info, techlead_info)
        ],
        "reminders": {
            "useDefault": False,
            "overrides": [
                {"method": "email", "minutes": 24 * 60},
                {"method": "popup", "minutes": 10},
            ],
        },
    }

    # NOTE(review): the Calendar API docs list sendNotifications as deprecated
    # in favour of sendUpdates -- confirm before switching.
    created = (
        service.events()
        .insert(calendarId=defaultcalendarId, body=event, sendNotifications=True)
        .execute()
    )
    print(created)
    print(f"Event created: {created.get('htmlLink')}")
if __name__ == "__main__":
    # Step 1 (defaults to "no"): list what is already on the calendar.
    print("Checking next 14 oncall events on calendar...")
    wants_listing = click.confirm("Do you want to continue?", default=False)
    if wants_listing:
        get_cal_events()

    # Step 2 (defaults to "yes"): create one event per rotation entry.
    print("Sheduling oncalls rotations on calendar...")
    wants_scheduling = click.confirm("Do you want to continue?!", default=True)
    if wants_scheduling:
        current = datetime.today()
        # Days remaining until the Monday of next week; the time of day of
        # `current` is carried over unchanged.
        next_monday = current + timedelta(days=7 - current.weekday())

        for rotation in oncall_engineers:
            roster = oncall_engineers[rotation]
            number_of_eng = len(roster)
            for on_call_rotation_index in roster:
                engineer_info = roster[on_call_rotation_index]
                team_info = get_team(engineer_info["team"])
                manager_info = get_manager_info(team_info["manager"])
                # The tech lead is resolved through the same lookup helper
                # as the manager.
                techlead_info = get_manager_info(team_info["tech lead"])
                outcome = create_event(
                    on_call_rotation_index=on_call_rotation_index,
                    rotation=rotation,
                    engineer_info=engineer_info,
                    team_info=team_info,
                    manager_info=manager_info,
                    techlead_info=techlead_info,
                    next_monday=next_monday,
                    number_of_eng=number_of_eng,
                )
                print(outcome)