-
Notifications
You must be signed in to change notification settings - Fork 0
/
payment.py
109 lines (88 loc) · 4.15 KB
/
payment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# oldnum
# telegram: oldnum
# github: https://github.com/oldnum
# LiqPayAPI: https://www.liqpay.ua/doc/api
import asyncio, aiohttp
import base64, os, json
from typing import Union
from liqpay.liqpay import LiqPay
from datetime import datetime, timedelta, timezone
public_key = os.environ["public_key"]
private_key = os.environ["private_key"]
class LiqPayApi:
def __init__(self):
self.liqpay = LiqPay(public_key, private_key)
self.data_const = {
"version": "3",
"public_key": public_key
}
async def generate_url_for_pay(self, id_user: int, client: str, product_category: str, product_name: str, amount: float, id_pay: str) -> Union[str, bool]:
data = {k: v for k, v in self.data_const.items()}
time_start = datetime.now().strftime("%Y-%m-%d %H:%M")
expired_date = (datetime.now(timezone.utc) + timedelta(minutes=5)).strftime('%Y-%m-%d %H:%M:%S')
order_data = f"Payment from the user: '{id_user}', at the price: '{amount}', ID-ORDER: '{id_pay}'"
info_user = {
"_id": id_user,
"client": client
}
json_string = json.dumps(info_user)
data_dae = base64.b64encode(json_string.encode('utf-8')).decode('utf-8')
data['action'] = "pay"
data["amount"] = amount
data["currency"] = "UAH"
data["language"] = "uk"
data["product_category"] = product_category
data["product_name"] = product_name
data["dae"] = data_dae
data["description"] = f"Payment '{product_name} ({product_category})' from the user '{client}' ({id_user}), ID-ORDER: '{id_pay}'"
data["order_id"] = order_data
data["expired_date"] = expired_date
data["result_url"] = f"https://t.me/oldnum"
data["server_url"] = None
data_to_sign = self.liqpay.data_to_sign(data)
params = {'data': data_to_sign,
'signature': self.liqpay.cnb_signature(data)}
try:
url = 'https://www.liqpay.ua/api/3/checkout/'
async with aiohttp.ClientSession() as session:
async with session.post(url, data=params) as res:
if res.status == 200:
return res.url
return False
except:
return False
async def get_order_status(self, id_user: int, amount: float, id_pay: str) -> Union[dict]:
data = {k: v for k, v in self.data_const.items()}
order_data = f"Payment from the user: '{id_user}', at the price: '{amount}', ID-ORDER: '{id_pay}'"
data["action"] = "status"
data["order_id"] = order_data
result = self.liqpay.api("request", data)
if result.get("action") == "pay":
if result.get('public_key') == public_key:
if result.get('status') == "success":
return {
"status": True,
"text": "🟢 - Successfully paid !",
"data": result
}
elif result.get('status') in ["error","failure"]:
return {"status": False,
"text": "🔴 - Unsuccessful payment ..."}
elif result.get('code') == "wait_info":
return {"status": False,
"text": "🟡 - Payment confirmation is awaited ..."}
elif result.get('status') in ["try_again", "reversed"]:
return {"status": False,
"text": "🟡 - Payment declined, please try again ..."}
elif result.get('code') == "expired":
return {"status": False,
"text": "🔴 - The time for payment has already passed ..."}
return {"status": False,
"text": "🟡 - This order has not yet been paid for ..."}
async def main():
payment = LiqPayApi()
url_pay = await payment.generate_url_for_pay(17011706, "Benjamin Franklin", "Money", "Dollar", 42.0, "#BFMD1872")
print(url_pay)
status_pay = await payment.get_order_status(17011706, 42.0, "#BFMD1872")
print(status_pay)
asyncio.run(main())