Skip to content

Commit

Permalink
Update database_manager.py
Browse files Browse the repository at this point in the history
Code Quality fixes
  • Loading branch information
j54j6 committed May 17, 2024
1 parent e930f46 commit 339d9d4
Showing 1 changed file with 45 additions and 41 deletions.
86 changes: 45 additions & 41 deletions database_manager.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
#!/usr/bin/env python

"""
#
# Project by j54j6
# This file provides a simple abstraction layer database files for most projects
#

"""
#Python modules
import logging
import logging
import os
import json

#temporarily removed sql alchemy. It is not possible to use a dynamic database scheme (JSON Based) scheme. HELP NEEDED :)
#temporarily removed sql alchemy.
#It is not possible to use a dynamic database scheme (JSON Based) scheme.
#HELP NEEDED :)
#Feel free to add it - so we can support both SQLite and MySQL
#from sqlalchemy import create_engine, Column, Integer, String, Engine, MetaData, StaticPool, text
#from sqlalchemy import create_engine, Column, Integer, String, engine, MetaData, StaticPool, text

#As replacement use sqlite python module
import sqlite3
Expand All @@ -25,16 +26,16 @@
#Variabvle to check if the db is already initialized
global db_init
db_init:bool = False
#Engine Object
global engine
engine = None
#ENGINE Object
global ENGINE
ENGINE = None

# init logger
logger = logging.getLogger(__name__)

def check_db():
"""This function is used to initialize the database. """
global engine
global ENGINE
global db_init
logger.info("Init database...")
logger.info("read config...")
Expand All @@ -45,7 +46,7 @@ def check_db():

db_driver = config.get("db", "db_driver")
try:
if(db_driver == "sqlite"):
if db_driver == "sqlite":
logger.info("Selected DB Driver is SQLite")
db_path = os.path.abspath(config.get("db", "db_path"))

Expand All @@ -55,14 +56,14 @@ def check_db():
# return False

#OLD SQL ALCHEMY CODE
#engine = create_engine(f"sqlite:///{db_path}")
#engine.connect()
#logger.info("Engine created")
#ENGINE = create_ENGINE(f"sqlite:///{db_path}")
#ENGINE.connect()
#logger.info("ENGINE created")
#db_init = True

#NEW SQLite Code
try:
engine = sqlite3.connect(db_path, check_same_thread=False)
ENGINE = sqlite3.connect(db_path, check_same_thread=False)
db_init = True
logger.debug("DB initializied!")
return True
Expand All @@ -71,35 +72,37 @@ def check_db():
logger.error("Error while conencting to SQLite DB! - Error: %s", e)
return False

elif(db_driver == "mysql"):
elif db_driver == "mysql":
logger.error("Currently MySQL is not supported :( - If you are able to use SQLAlchemy feel free to modify this file and create a PR <3)")
return False
# username = config.get("db", "db_user")
# password = config.get("db", "db_pass")
# hostname = config.get("db", "db_host")
# database_name = config.get("db", "db_name")
# engine = create_engine(f"mysql://{username}:{password}@{hostname}/{database_name}")
# engine.connect()
# logger.info("Engine created")
# ENGINE = create_ENGINE(f"mysql://{username}:{password}@{hostname}/{database_name}")
# ENGINE.connect()
# logger.info("ENGINE created")
# db_init = True
# return True
elif(db_driver == "memory"):
elif db_driver == "memory":
logger.info("Selected DB Driver is SQLite-Memory")

#OLD SQLALCHEMY Code
#engine = create_engine("sqlite://" ,
#ENGINE = create_ENGINE("sqlite://" ,
# connect_args={'check_same_thread':False},
# poolclass=StaticPool)
#engine.connect()
#logger.info("Engine created")
#ENGINE.connect()
#logger.info("ENGINE created")
#db_init = True
#return True
try:
engine = sqlite3.connect("file::memory:?cache=shared")
ENGINE = sqlite3.connect("file::memory:?cache=shared")
db_init = True
return True
except sqlite3.Error as e:
logger.error("Error while creating in memory %s Database! - SQL Error: %s", db_driver, e)
#Line Break because of PyLint best Practice (C0301)
logger.error("Error while creating in memory %s Database! - SQL Error: %s",
db_driver, e)
return False
else:
logger.error("Currently only SQLite and MySQL is supported :) - Please choose one ^^")
Expand All @@ -109,10 +112,11 @@ def check_db():
return False

def check_table_exist(table_name:str):
""" This function checks if the passed table name exists in the database"""
#OLD SQLALCHEMY CODE
#sql_meta = MetaData()
#try:
# sql_meta.reflect(bind=engine)#
# sql_meta.reflect(bind=ENGINE)#

# if table_name in sql_meta.tables:
# return True
Expand All @@ -128,21 +132,21 @@ def check_table_exist(table_name:str):
if not init:
return False

cursor = engine.cursor()
cursor = ENGINE.cursor()

try:
table_exist = cursor.execute("""SELECT name FROM sqlite_master WHERE type='table'
AND name=?; """, [table_name]).fetchall()

if table_exist == []:
return False
else:
return True
return True
except sqlite3.Error as e:
logger.error("Error while checking for table! - Error: %s",e)
return False

def prepare_sql_create_statement(name, scheme):
""" This function is used to create tables based on a defined json scheme. Check documentation for help """
query:str = f"CREATE TABLE {name} ("
primary_key_defined = False
#Iterate over all defined columns. Check for different optionas and add them to the query.
Expand All @@ -161,19 +165,19 @@ def prepare_sql_create_statement(name, scheme):
return False
c_query += " " + options["type"]

if "not_null" in options and options["not_null"] == True:
if "not_null" in options and options["not_null"] is True:
c_query += " NOT NULL"

if "primary_key" in options and options["primary_key"] == True and not primary_key_defined:
if "primary_key" in options and options["primary_key"] is True and not primary_key_defined:
c_query += " PRIMARY KEY"
primary_key_defined = True
elif "primary_key" in options and options["primary_key"] == True and primary_key_defined == True:
elif "primary_key" in options and options["primary_key"] is True and primary_key_defined is True:
logger.warning("There are at least 2 primary keys defined! - Please check config. Ignore Primary Key %s", column_name)

if "auto_increment" in options and options["auto_increment"] == True:
if "auto_increment" in options and options["auto_increment"] is True:
c_query += " AUTOINCREMENT"

if "unique" in options and options["unique"] == True:
if "unique" in options and options["unique"] is True:
c_query += " UNIQUE"

if "default" in options:
Expand Down Expand Up @@ -212,7 +216,7 @@ def create_table(name:str, scheme:json):

#OLD SQLALCHEMY CODE
#try:
# with engine.connect() as conn:
# with ENGINE.connect() as conn:
# conn.execute(text(query))
# conn.commit()
# return True
Expand All @@ -221,9 +225,9 @@ def create_table(name:str, scheme:json):
# return False

try:
cursor = engine.cursor()
cursor = ENGINE.cursor()
cursor.execute(query)
engine.commit()
ENGINE.commit()

table_exist = check_table_exist(name)

Expand Down Expand Up @@ -262,7 +266,7 @@ def fetch_value(table:str, row_name:str, value:str, data_filter:list = None, is_
#logger.debug(f"Prepared Query: {query}")

#try:
# with engine.connect() as conn:
# with ENGINE.connect() as conn:
# logger.debug(f"Prepared query: {query}")
# data = conn.execute(text(query))
# if not is_unique:
Expand All @@ -272,7 +276,7 @@ def fetch_value(table:str, row_name:str, value:str, data_filter:list = None, is_
#except Exception as e:
# logger.error(f"Error while executing Insert Statement! - Error: {e}")
# return False
cursor = engine.cursor()
cursor = ENGINE.cursor()
try:
query = f"SELECT {query_filter} from {table} WHERE {row_name} = ?;"
data = cursor.execute(query, [value])
Expand Down Expand Up @@ -366,7 +370,7 @@ def insert_value(table:str, data:json):
#query = f"Insert into {table} ({keys}) VALUES ({values});"
#logger.debug(f"Prepared Query: {query}")
#try:
# with engine.connect() as conn:
# with ENGINE.connect() as conn:
# conn.execute(text(query))
# conn.commit()
# return True
Expand All @@ -375,15 +379,15 @@ def insert_value(table:str, data:json):
# return False

try:
cursor = engine.cursor()
cursor = ENGINE.cursor()
len_data = len(data)
value_placeholder = ""
for _ in range(len_data):
value_placeholder += "?,"
value_placeholder = value_placeholder[:-1]
query = f"Insert into {table} ({keys}) VALUES ({value_placeholder})"
cursor.execute(query, values)
engine.commit()
ENGINE.commit()

#Maybe a check if all data are inserted will be added in the future by adding a select statement (call fetch function)
return True
Expand Down

0 comments on commit 339d9d4

Please sign in to comment.