-
Notifications
You must be signed in to change notification settings - Fork 32
/
main.py
226 lines (200 loc) · 12.2 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import webbrowser, threading, requests, tls_client, ctypes, random, json, time, base64, sys, re, os
from prettytable import PrettyTable
from colorama import init, Fore
from urllib.parse import urlparse, unquote, quote
from string import ascii_letters, digits
class Zefoy:
    """Automation client for the zefoy.com web panel.

    The object owns one ``tls_client`` session. It restores or creates a
    PHPSESSID session by answering the site's captcha, discovers the services
    listed on the landing page and submits the configured video URL to the
    selected service. Runtime settings are read from, and written back to,
    ``config.json``.
    """

    def __init__(self):
        """Set up the HTTP session and the default runtime state."""
        self.base_url = 'https://zefoy.com/'
        self.headers = {'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'}
        self.session = tls_client.Session(client_identifier="chrome112", random_tls_extension_order=True)
        self.captcha_auto_solve = True
        self.captcha_1 = None      # name of the form field that carries the captcha answer
        self.captcha_ = {}         # hidden form fields posted together with the answer
        self.service = 'Views'
        self.comment_id = None
        self.video_key = None      # name of the "Enter Video URL" form field
        self.services = {}         # service title -> status text scraped from the page
        self.services_ids = {}     # service title -> form action (relative endpoint)
        self.services_status = {}  # service title -> True when its button is not disabled
        self.url = 'None'
        self.text = 'By @plowside [No url]'
        # Defined up front so change_config()/use_service() never hit an
        # AttributeError when they run before check_config()/find_video().
        self.proxy_ = None
        self.video_info = None

    @staticmethod
    def _decode_response(text):
        """Decode a service response: reverse the bytes, URL-unquote, base64-decode."""
        return base64.b64decode(unquote(text.encode()[::-1])).decode()

    def _post_to_service(self, files):
        """POST a multipart form to the endpoint of the currently selected service."""
        return self.session.post(
            f'{self.base_url}{self.services_ids[self.service]}',
            headers={'user-agent':self.headers['user-agent'], 'origin':'https://zefoy.com'},
            files=files,
        )

    def get_captcha(self):
        """Load the landing page and prepare either the session or a captcha.

        Returns:
            True when the page already shows the video form (a usable session
            exists; ``self.video_key`` is updated), False when a captcha image
            was downloaded to ``captcha.png`` and the form fields were stored in
            ``self.captcha_`` / ``self.captcha_1``.

        If the captcha form cannot be parsed the attempt is repeated every two
        seconds. The process exits when the Cloudflare interstitial is served.
        """
        # A loop instead of the former recursive retry: the recursive call's
        # result was discarded, so a retry always yielded None to the caller.
        while True:
            if os.path.exists('session'):
                with open('session', encoding='utf-8') as session_file:
                    self.session.cookies.set("PHPSESSID", session_file.read(), domain='zefoy.com')
            request = self.session.get(self.base_url, headers=self.headers)
            if 'Enter Video URL' in request.text:
                self.video_key = request.text.split('" placeholder="Enter Video URL"')[0].split('name="')[-1]
                return True
            if '<title>Just a moment...</title>' in request.text:
                # Wait for the user to acknowledge, then terminate.
                input('On zefoy.com cloudflare protection is enabled, try again later.')
                sys.exit()
            try:
                for field in re.findall(r'<input type="hidden" name="(.*)" value="(.*)">', request.text):
                    self.captcha_[field[0]] = field[1]
                self.captcha_1 = request.text.split('type="text" name="')[1].split('" oninput="this.value=this.value.toLowerCase()"')[0]
                captcha_url = request.text.split('<img src="')[1].split('" onerror="imgOnError()" class="')[0]
                image = self.session.get(f"{self.base_url}{captcha_url}",headers=self.headers)
                with open('captcha.png', 'wb') as image_file:
                    image_file.write(image.content)
                print('Solving captcha..')
                return False
            except Exception as e:
                print(f"Can't get captcha: {e}", type(e))
                time.sleep(2)

    def send_captcha(self, new_session = False):
        """Make sure a session exists, solving and submitting the captcha if needed.

        Args:
            new_session: when true, replace the HTTP session, delete the stored
                ``session`` file and pause two seconds before continuing.

        Returns:
            ``(True, 'The session already exists')`` when no captcha was needed,
            ``(True, answer)`` when the submitted answer was accepted, otherwise
            ``(False, answer)``.
        """
        if new_session:
            self.session = tls_client.Session(client_identifier="chrome112", random_tls_extension_order=True)
            try:
                os.remove('session')
            except OSError:
                pass  # nothing stored yet (or not removable): best effort
            time.sleep(2)
        if self.get_captcha():
            print('Connected to session')
            return (True, 'The session already exists')
        captcha_solve = self.solve_captcha('captcha.png')[1]
        self.captcha_[self.captcha_1] = captcha_solve
        request = self.session.post(self.base_url, headers=self.headers, data=self.captcha_)
        if 'Enter Video URL' in request.text:
            print('Session was created')
            session_id = self.session.cookies.get('PHPSESSID')
            # Only persist a real cookie value; writing None used to truncate
            # the file and then raise.
            if session_id:
                with open('session','w',encoding='utf-8') as session_file:
                    session_file.write(session_id)
            self.video_key = request.text.split('" placeholder="Enter Video URL"')[0].split('name="')[-1]
            return (True,captcha_solve)
        return (False,captcha_solve)

    def solve_captcha(self, path_to_file = None, b64 = None, delete_tag = ('\n','\r')):
        """Obtain the captcha text, automatically or from the user.

        Args:
            path_to_file: path of the captcha image; takes precedence over *b64*.
            b64: base64-encoded image, written to ``temp.png`` when no path is given.
            delete_tag: substrings removed from the answer before it is returned.

        Returns:
            ``(True, answer)``.

        Raises:
            ValueError: neither *path_to_file* nor *b64* was supplied.
        """
        if path_to_file:
            task = path_to_file
        elif b64 is not None:
            task = 'temp.png'
            with open(task,'wb') as image_file:
                image_file.write(base64.b64decode(b64))
        else:
            raise ValueError('solve_captcha needs path_to_file or b64')
        if self.captcha_auto_solve:
            with open(task,'rb') as image_file:
                response = requests.post("https://plowsidecaptcha.pythonanywhere.com/captcha", files={"file": ("captcha.png", image_file, "image/png")}).json()
            solved_text = response['captcha_text']
        else:
            os.system('start captcha.png')
            solved_text = input("Solve captcha(open captcha.png if the image didn't open itself): ")
        for tag in delete_tag:
            solved_text = solved_text.replace(tag,'')
        return (True, solved_text)

    def get_status_services(self):
        """Scrape the landing page for service names, endpoints and availability.

        Updates ``self.services``, ``self.services_ids`` and
        ``self.services_status`` in place (entries are added or overwritten,
        never removed).

        Returns:
            ``(self.services, self.services_status)``.
        """
        page = self.session.get(self.base_url, headers=self.headers).text
        for card in re.findall(r'<h5 class="card-title">.+</h5>\n.+\n.+', page):
            title = card.split('<h5 class="card-title">')[1].split('<')[0].strip()
            self.services[title] = card.split('d-sm-inline-block">')[1].split('</small>')[0].strip()
        for card in re.findall(r'<h5 class="card-title mb-3">.+</h5>\n<form action=".+">', page):
            title = card.split('title mb-3">')[1].split('<')[0].strip()
            self.services_ids[title] = card.split('<form action="')[1].split('">')[0].strip()
        for card in re.findall(r'<h5 class="card-title">.+</h5>\n.+<button .+', page):
            title = card.split('<h5 class="card-title">')[1].split('<')[0].strip()
            self.services_status[title] = 'disabled class' not in card
        return (self.services, self.services_status)

    def get_table(self, i = 1):
        """Print a table of the services and their status text.

        Retries (re-running ``send_captcha``) until more than one service has
        been scraped.

        Args:
            i: number shown in the ID column for the first row.
        """
        table = PrettyTable(field_names=["ID", "Services", "Status"], title="Status Services", header_style="upper",border=True)
        while len(self.get_status_services()[0]) <= 1:
            print('Cant get services, retrying...')
            self.send_captcha()
            time.sleep(2)
        for service in self.services:
            status = self.services[service]
            colour = Fore.GREEN if 'ago updated' in status else Fore.RED
            table.add_row([f"{Fore.CYAN}{i}{Fore.RESET}", service, f"{colour}{status}{Fore.RESET}"])
            i += 1
        online = sum(1 for is_online in self.services_status.values() if is_online)
        table.title = f"{Fore.YELLOW}Total Online Services: {online}{Fore.RESET}"
        print(table)

    def find_video(self):
        """Submit ``self.url`` to the selected service until a usable answer arrives.

        Returns:
            ``(False, message)`` when no service is selected, ``(False,)`` when
            the session expired (a re-login is triggered) or the timer value
            could not be parsed, ``(True, message)`` when the service reports it
            is not working, and ``(True, raw_response)`` on success. On success
            ``self.video_info`` holds ``[field_name, field_value]`` for the
            follow-up form; in every other case it holds the decoded response.
        """
        if self.service is None:
            return (False, "You didn't choose the service")
        while True:
            if self.service not in self.services_ids:
                self.get_status_services()
                time.sleep(1)
            request = self._post_to_service({self.video_key: (None, self.url)})
            try:
                self.video_info = self._decode_response(request.text)
            except Exception:
                # Undecodable answer: wait and ask again. Not a bare except, so
                # Ctrl+C and SystemExit still get through.
                time.sleep(3)
                continue
            #print(f'\n\n\n\n\n\n!-----------------------------------------------------!\nVIDEO INFO: {self.video_info}')
            if 'Session expired. Please re-login' in self.video_info:
                print('Session expired. Reloging...')
                self.send_captcha()
                return (False,)
            elif 'service is currently not working' in self.video_info:
                return (True,'Service is currently not working, try again later. | You can change it in config.')
            elif """onsubmit="showHideElements""" in self.video_info:
                self.video_info = [self.video_info.split('" name="')[1].split('"')[0],self.video_info.split('value="')[1].split('"')[0]]
                return (True, request.text)
            elif 'Checking Timer...' in self.video_info:
                try:
                    t = int(re.findall(r'ltm=(\d*);', self.video_info)[0])
                except (IndexError, ValueError):
                    return (False,)
                # The old code called find_video() recursively for t == 0 and
                # threw the result away; a zero timer now takes the short wait
                # below and retries in this loop.
                if t >= 1000:
                    print('Your IP was banned')
                print(f'Time to next use: {t}')
                deadline = time.time() + t + 2
                while time.time() < deadline:
                    time.sleep(1)
                continue
            elif 'Too many requests. Please slow' in self.video_info:
                time.sleep(3)
            else:
                print(self.video_info)

    def use_service(self):
        """Run one submission cycle for the selected service.

        Returns:
            False when ``find_video`` failed; the "not working" message when the
            service is down; ``""`` when a response could not be decoded, the
            comment form was not found, or the session expired; None otherwise
            (the outcome is printed).
        """
        found = self.find_video()
        if found[0] is False:
            return False
        if not isinstance(self.video_info, list):
            # find_video reported "service is currently not working": there is
            # no follow-up form to post, so hand its message to the caller.
            return found[1]
        self.token = "".join(random.choices(ascii_letters+digits, k=16))
        request = self._post_to_service({self.video_info[0]: (None, self.video_info[1])})
        try:
            res = self._decode_response(request.text)
        except Exception:
            time.sleep(3)
            return ""
        #print(f'\n\n\n\n\n\n!-----------------------------------------------------!\nRESPONSE: {res}')
        if self.service == 'Comments Hearts':
            v = re.search(r'<i class="text-red fa fa-heart"><\/i><\/div>\n<input type="hidden" name="([^"]+)".*\n<input type="hidden" name="([^"]+)"', res)
            if not v:
                time.sleep(3)
                return ""
            request = self._post_to_service({v.group(1): (None, self.video_info[1]), v.group(2): (None, self.comment_id)})
            try:
                res = self._decode_response(request.text)
            except Exception:
                time.sleep(3)
                return ""
            #print(f'\n\n\n\n\n\n!-----------------------------------------------------!\nRESPONSE: {res}')
        success_marker = "sans-serif;text-align:center;color:green;'>"
        if 'Session expired. Please re-login' in res:
            print('Session expired. Reloging...')
            self.send_captcha()
            return ""
        elif success_marker in res:
            print(res.split(success_marker)[1].split("</")[0].strip())
        elif 'Too many requests. Please slow' in res or 'Checking Timer' in res:
            time.sleep(3)
        elif 'service is currently not working' in res:
            return ('Service is currently not working, try again later. | You can change it in config.')
        elif 'Please try again later. Server too busy' in res:
            # Was tested against self.video_info (a list here), so never matched.
            print('Error on submit: Please try again later. Server too busy.')
        else:
            # Unknown answer. The old branch split on the success marker, which
            # is known to be absent here, and therefore always raised IndexError.
            print(res)
            time.sleep(3)

    def get_video_info(self):
        """Fetch view/like/comment/share counters for ``self.url``.

        Returns:
            The JSON object from tiktok.livecounts.io when it contains
            ``viewCount``, otherwise a dict with all four counters set to 0.
        """
        video_id = urlparse(self.url).path.rpartition("/")[2]
        stats = self.session.get(
            f'https://tiktok.livecounts.io/video/stats/{video_id}',
            headers={'authority':'tiktok.livecounts.io','origin':'https://livecounts.io','user-agent':self.headers['user-agent']},
        ).json()
        if 'viewCount' in stats:
            return stats
        return {'viewCount':0, 'likeCount':0,'commentCount':0,'shareCount':0}

    def get_video_id(self, url_ = None, set_url=True):
        """Normalise a video URL, resolving short links through api.tokcount.com.

        Args:
            url_: URL to resolve; defaults to ``self.url``.
            set_url: when true, store the resolved canonical URL in ``self.url``.

        Returns:
            The URL itself (one trailing slash removed) when its last path
            segment is numeric, in which case ``self.url`` is always set to it;
            the raw API response text when the link was resolved remotely; False
            when the URL is empty or the lookup reports it as invalid.
        """
        if url_ is None:
            url_ = self.url
        if not url_:
            # Empty/None used to crash on url_[-1]; report it like any other bad URL.
            print('Invalid URL | Replace in config')
            return False
        if url_.endswith('/'):
            url_ = url_[:-1]
        url = urlparse(url_).path.rpartition('/')[2]
        if url.isdigit():
            self.url = url_
            return url_
        request = self.session.get(
            f'https://api.tokcount.com/?type=videoID&username=https://vm.tiktok.com/{url}',
            headers={'origin': 'https://tokcount.com','authority': 'api.tokcount.com','user-agent': self.headers['user-agent']},
        )
        if request.text == '':
            print('Invalid URL | Replace in config')
            return False
        json_ = request.json()
        if 'author' not in json_:
            print(f'{self.url}| invalid URL | Replace in config')
            return False
        if set_url:
            self.url = f'https://www.tiktok.com/@{json_["author"]}/video/{json_["id"]}'
            print(f'Formated video url --> {self.url}')
        return request.text

    def check_config(self, once=False):
        """Reload ``config.json`` and apply it (URL, service, proxy, captcha mode).

        Args:
            once: when true, perform a single pass and return; otherwise repeat
                forever with a four-second pause between passes.

        Errors raised during a pass are printed and do not stop the loop. After
        each successful pass the effective settings are written back with
        ``change_config``.
        """
        while True:
            try:
                last_url = self.url
                with open('config.json',encoding='utf-8',errors='ignore') as config_file:
                    config = json.loads(config_file.read())
                self.url = config['url']
                self.service = config['service']
                self.comment_id = config['comment_id']
                self.captcha_auto_solve = config['captcha_auto_solve']
                self.proxy_ = config['proxy'] if config['proxy'] not in ('', ' ') else None
                try:
                    if self.proxy_:
                        # Accepted forms: host:port or host:port:user:password.
                        proxy = self.proxy_.split(':')
                        if len(proxy) == 2:
                            proxy_str = f"http://{proxy[0]}:{proxy[1]}"
                        else:
                            proxy_str = f"http://{proxy[2]}:{proxy[3]}@{proxy[0]}:{proxy[1]}"
                        wanted = {"http": proxy_str,"https": proxy_str}
                        if self.session.proxies != wanted:
                            print(proxy_str)
                            self.session.proxies = wanted
                            if self.session.get('http://ip.bablosoft.com').status_code != 200:
                                raise Exception('Invalid proxy')
                            print('Proxies are connected')
                    elif self.session.proxies != {}:
                        print('Proxies are disabled')
                        self.session.proxies = {}
                except Exception as e:
                    # Any proxy problem falls back to a direct connection.
                    print(f'Failed to change the proxy: {e}')
                    self.proxy_ = None
                    self.session.proxies = {}
                if last_url != self.url:
                    self.get_video_id()
                self.change_config()
            except Exception as e:
                print(e)
            if once:
                break
            time.sleep(4)

    def change_config(self):
        """Write the current settings to ``config.json``."""
        settings = {'url':self.url,'service':self.service,'comment_id':self.comment_id,'proxy':self.proxy_,'captcha_auto_solve':self.captcha_auto_solve}
        with open('config.json','w',encoding='utf-8',errors='ignore') as config_file:
            config_file.write(json.dumps(settings,indent=4))

    def update_name(self):
        """Refresh the console title with the video counters every five seconds.

        Runs forever. Failures (for example ``ctypes.windll`` being unavailable,
        or the statistics request failing) are ignored on purpose: the title is
        purely cosmetic.
        """
        while True:
            try:
                ctypes.windll.kernel32.SetConsoleTitleA(self.text.encode())
                video_info = self.get_video_info()
                self.text = f"By @plowside | Views: {video_info['viewCount']} | Likes: {video_info['likeCount']} | Comments: {video_info['commentCount']} | Shares: {video_info['shareCount']}"
            except Exception:
                pass
            time.sleep(5)
# First run: create a default configuration so check_config() has a file to read.
if not os.path.exists('config.json'):
    with open('config.json','w',encoding='utf-8',errors='ignore') as config_file:
        config_file.write(json.dumps({'url':'https://www.tiktok.com/t/ZTRToxYct','service':'Views','comment_id':None,'proxy':None,'captcha_auto_solve':False},indent=4))

Z = Zefoy()
Z.check_config(True)
# Daemon threads: both targets loop forever, so as non-daemon threads they kept
# the process alive after the main thread ended (Ctrl+C, or the exit in
# get_captcha when Cloudflare protection is detected).
threading.Thread(target=Z.check_config, daemon=True).start()
threading.Thread(target=Z.update_name, daemon=True).start()
Z.send_captcha()
Z.get_table()

# Main loop: one submission per iteration; back off when the service is down
# (5 s) or when an unexpected error escapes use_service (30 s).
while True:
    try:
        if 'Service is currently not working, try again later' in str(Z.use_service()):
            print('Service is currently not working, try again later. | You can change it in config.')
            time.sleep(5)
    except Exception as e:
        print(f'Critical ERROR | retrying in 30 seconds. ||| {e}')
        time.sleep(30)