-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection.py
54 lines (43 loc) · 1.52 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from psycopg2 import connect, ProgrammingError
class DatabaseConnection:
def __init__(self, connection_config: dict):
self.connection = connect(**connection_config)
def execute_query(self, query: str, data=None) -> tuple:
result = None
cursor = self.connection.cursor()
if data:
cursor.execute(query, data)
else:
cursor.execute(query)
try:
result = cursor.fetchall()
except ProgrammingError:
result = []
self.connection.commit()
cursor.close()
return result
def close(self):
self.connection.close()
class QueriesManager:
def __init__(self):
self._queries = {}
def add_query(self, path: str, name: str):
with open(path, "r", encoding="utf-8") as file:
contents = file.read()
self._queries[name] = contents
def __getitem__(self, query_name: str) -> str:
return self._queries[query_name]
def __setitem__(self, query_name: str, path: str) -> str:
return self.add_query(path, query_name)
def items(self):
for name, query in self._queries.items():
yield name, query
def __str__(self) -> str:
result = []
result.append("QueriesManager")
for query_name, query in self._queries.items():
shortened_query = query.split()[0]
buffer = f"\t{query_name}: {shortened_query}...;"
result.append(buffer)
result = "\n".join(result)
return result