diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index df9ba5f206146..e924bcef494b9 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -2583,11 +2583,6 @@ def test_illegal_names(self): # -- Old tests from 0.13.1 (before refactor using sqlalchemy) -def date_format(dt): - """Returns date in YYYYMMDD format.""" - return dt.strftime("%Y%m%d") - - _formatters = { datetime: "'{}'".format, str: "'{}'".format, @@ -2616,31 +2611,44 @@ def format_query(sql, *args): def tquery(query, con=None): """Replace removed sql.tquery function""" res = sql.execute(query, con=con).fetchall() - if res is None: - return None - else: - return list(res) + return None if res is None else list(res) -@pytest.mark.single -class TestXSQLite(SQLiteMixIn): - @pytest.fixture(autouse=True) - def setup_method(self, request, datapath): - self.method = request.function +class TestXSQLite: + def setup_method(self): self.conn = sqlite3.connect(":memory:") - # In some test cases we may close db connection - # Re-open conn here so we can perform cleanup in teardown - yield - self.method = request.function - self.conn = sqlite3.connect(":memory:") + def teardown_method(self): + self.conn.close() + + def drop_table(self, table_name): + cur = self.conn.cursor() + cur.execute(f"DROP TABLE IF EXISTS {sql._get_valid_sqlite_name(table_name)}") + self.conn.commit() def test_basic(self): frame = tm.makeTimeDataFrame() - self._check_roundtrip(frame) + sql.to_sql(frame, name="test_table", con=self.conn, index=False) + result = sql.read_sql("select * from test_table", self.conn) - def test_write_row_by_row(self): + # HACK! Change this once indexes are handled properly. + result.index = frame.index + expected = frame + tm.assert_frame_equal(result, frame) + + frame["txt"] = ["a"] * len(frame) + frame2 = frame.copy() + new_idx = Index(np.arange(len(frame2))) + 10 + frame2["Idx"] = new_idx.copy() + sql.to_sql(frame2, name="test_table2", con=self.conn, index=False) + result = sql.read_sql("select * from test_table2", self.conn, index_col="Idx") + expected = frame.copy() + expected.index = new_idx + expected.index.name = "Idx" + tm.assert_frame_equal(expected, result) + + def test_write_row_by_row(self): frame = tm.makeTimeDataFrame() frame.iloc[0, 0] = np.nan create_sql = sql.get_schema(frame, "test") @@ -2682,7 +2690,6 @@ def test_schema(self): if len(tokens) == 2 and tokens[0] == "A": assert tokens[1] == "DATETIME" - frame = tm.makeTimeDataFrame() create_sql = sql.get_schema(frame, "test", keys=["A", "B"]) lines = create_sql.splitlines() assert 'PRIMARY KEY ("A", "B")' in create_sql @@ -2727,30 +2734,6 @@ def test_execute_closed_connection(self): with tm.external_error_raised(sqlite3.ProgrammingError): tquery("select * from test", con=self.conn) - def test_na_roundtrip(self): - pass - - def _check_roundtrip(self, frame): - sql.to_sql(frame, name="test_table", con=self.conn, index=False) - result = sql.read_sql("select * from test_table", self.conn) - - # HACK! Change this once indexes are handled properly. - result.index = frame.index - - expected = frame - tm.assert_frame_equal(result, expected) - - frame["txt"] = ["a"] * len(frame) - frame2 = frame.copy() - new_idx = Index(np.arange(len(frame2))) + 10 - frame2["Idx"] = new_idx.copy() - sql.to_sql(frame2, name="test_table2", con=self.conn, index=False) - result = sql.read_sql("select * from test_table2", self.conn, index_col="Idx") - expected = frame.copy() - expected.index = new_idx - expected.index.name = "Idx" - tm.assert_frame_equal(expected, result) - def test_keyword_as_column_names(self): df = DataFrame({"From": np.ones(5)}) sql.to_sql(df, con=self.conn, name="testkeywords", index=False) @@ -2776,13 +2759,6 @@ def test_if_exists(self): table_name = "table_if_exists" sql_select = f"SELECT * FROM {table_name}" - def clean_up(test_table_to_drop): - """ - Drops tables created from individual tests - so no dependencies arise from sequential tests - """ - self.drop_table(test_table_to_drop) - msg = "'notvalidvalue' is not valid for if_exists" with pytest.raises(ValueError, match=msg): sql.to_sql( @@ -2791,7 +2767,7 @@ def clean_up(test_table_to_drop): name=table_name, if_exists="notvalidvalue", ) - clean_up(table_name) + self.drop_table(table_name) # test if_exists='fail' sql.to_sql( @@ -2819,301 +2795,7 @@ def clean_up(test_table_to_drop): index=False, ) assert tquery(sql_select, con=self.conn) == [(3, "C"), (4, "D"), (5, "E")] - clean_up(table_name) - - # test if_exists='append' - sql.to_sql( - frame=df_if_exists_1, - con=self.conn, - name=table_name, - if_exists="fail", - index=False, - ) - assert tquery(sql_select, con=self.conn) == [(1, "A"), (2, "B")] - sql.to_sql( - frame=df_if_exists_2, - con=self.conn, - name=table_name, - if_exists="append", - index=False, - ) - assert tquery(sql_select, con=self.conn) == [ - (1, "A"), - (2, "B"), - (3, "C"), - (4, "D"), - (5, "E"), - ] - clean_up(table_name) - - -@pytest.mark.single -@pytest.mark.db -@pytest.mark.skip( - reason="gh-13611: there is no support for MySQL if SQLAlchemy is not installed" -) -class TestXMySQL(MySQLMixIn): - @pytest.fixture(autouse=True, scope="class") - def setup_class(cls): - pymysql = pytest.importorskip("pymysql") - pymysql.connect(host="localhost", user="root", passwd="", db="pandas") - try: - pymysql.connect(read_default_group="pandas") - except pymysql.ProgrammingError as err: - raise RuntimeError( - "Create a group of connection parameters under the heading " - "[pandas] in your system's mysql default file, " - "typically located at ~/.my.cnf or /etc/.my.cnf." - ) from err - except pymysql.Error as err: - raise RuntimeError( - "Cannot connect to database. " - "Create a group of connection parameters under the heading " - "[pandas] in your system's mysql default file, " - "typically located at ~/.my.cnf or /etc/.my.cnf." - ) from err - - @pytest.fixture(autouse=True) - def setup_method(self, request, datapath): - pymysql = pytest.importorskip("pymysql") - pymysql.connect(host="localhost", user="root", passwd="", db="pandas") - try: - pymysql.connect(read_default_group="pandas") - except pymysql.ProgrammingError as err: - raise RuntimeError( - "Create a group of connection parameters under the heading " - "[pandas] in your system's mysql default file, " - "typically located at ~/.my.cnf or /etc/.my.cnf." - ) from err - except pymysql.Error as err: - raise RuntimeError( - "Cannot connect to database. " - "Create a group of connection parameters under the heading " - "[pandas] in your system's mysql default file, " - "typically located at ~/.my.cnf or /etc/.my.cnf." - ) from err - - self.method = request.function - - def test_basic(self): - frame = tm.makeTimeDataFrame() - self._check_roundtrip(frame) - - def test_write_row_by_row(self): - frame = tm.makeTimeDataFrame() - frame.iloc[0, 0] = np.nan - drop_sql = "DROP TABLE IF EXISTS test" - create_sql = sql.get_schema(frame, "test") - cur = self.conn.cursor() - cur.execute(drop_sql) - cur.execute(create_sql) - ins = "INSERT INTO test VALUES (%s, %s, %s, %s)" - for _, row in frame.iterrows(): - fmt_sql = format_query(ins, *row) - tquery(fmt_sql, con=self.conn) - - self.conn.commit() - - result = sql.read_sql("select * from test", con=self.conn) - result.index = frame.index - tm.assert_frame_equal(result, frame, rtol=1e-3) - # GH#32571 result comes back rounded to 6 digits in some builds; - # no obvious pattern - - def test_chunksize_read_type(self): - frame = tm.makeTimeDataFrame() - frame.index.name = "index" - drop_sql = "DROP TABLE IF EXISTS test" - cur = self.conn.cursor() - cur.execute(drop_sql) - sql.to_sql(frame, name="test", con=self.conn) - query = "select * from test" - chunksize = 5 - chunk_gen = read_sql_query( - sql=query, con=self.conn, chunksize=chunksize, index_col="index" - ) - chunk_df = next(chunk_gen) - tm.assert_frame_equal(frame[:chunksize], chunk_df) - - def test_execute(self): - frame = tm.makeTimeDataFrame() - drop_sql = "DROP TABLE IF EXISTS test" - create_sql = sql.get_schema(frame, "test") - cur = self.conn.cursor() - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", "Unknown table.*") - cur.execute(drop_sql) - cur.execute(create_sql) - ins = "INSERT INTO test VALUES (%s, %s, %s, %s)" - - row = frame.iloc[0].values.tolist() - sql.execute(ins, self.conn, params=tuple(row)) - self.conn.commit() - - result = sql.read_sql("select * from test", self.conn) - result.index = frame.index[:1] - tm.assert_frame_equal(result, frame[:1]) - - def test_schema(self): - frame = tm.makeTimeDataFrame() - create_sql = sql.get_schema(frame, "test") - lines = create_sql.splitlines() - for line in lines: - tokens = line.split(" ") - if len(tokens) == 2 and tokens[0] == "A": - assert tokens[1] == "DATETIME" - - frame = tm.makeTimeDataFrame() - drop_sql = "DROP TABLE IF EXISTS test" - create_sql = sql.get_schema(frame, "test", keys=["A", "B"]) - lines = create_sql.splitlines() - assert "PRIMARY KEY (`A`, `B`)" in create_sql - cur = self.conn.cursor() - cur.execute(drop_sql) - cur.execute(create_sql) - - def test_execute_fail(self): - drop_sql = "DROP TABLE IF EXISTS test" - create_sql = """ - CREATE TABLE test - ( - a TEXT, - b TEXT, - c REAL, - PRIMARY KEY (a(5), b(5)) - ); - """ - cur = self.conn.cursor() - cur.execute(drop_sql) - cur.execute(create_sql) - - sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn) - sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', self.conn) - - with pytest.raises(Exception, match=""): - sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', self.conn) - - def test_execute_closed_connection(self, request, datapath): - drop_sql = "DROP TABLE IF EXISTS test" - create_sql = """ - CREATE TABLE test - ( - a TEXT, - b TEXT, - c REAL, - PRIMARY KEY (a(5), b(5)) - ); - """ - cur = self.conn.cursor() - cur.execute(drop_sql) - cur.execute(create_sql) - - sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn) - self.conn.close() - - with pytest.raises(Exception, match=""): - tquery("select * from test", con=self.conn) - - # Initialize connection again (needed for tearDown) - self.setup_method(request, datapath) - - def test_na_roundtrip(self): - pass - - def _check_roundtrip(self, frame): - drop_sql = "DROP TABLE IF EXISTS test_table" - cur = self.conn.cursor() - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", "Unknown table.*") - cur.execute(drop_sql) - sql.to_sql(frame, name="test_table", con=self.conn, index=False) - result = sql.read_sql("select * from test_table", self.conn) - - # HACK! Change this once indexes are handled properly. - result.index = frame.index - result.index.name = frame.index.name - - expected = frame - tm.assert_frame_equal(result, expected) - - frame["txt"] = ["a"] * len(frame) - frame2 = frame.copy() - index = Index(np.arange(len(frame2))) + 10 - frame2["Idx"] = index - drop_sql = "DROP TABLE IF EXISTS test_table2" - cur = self.conn.cursor() - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", "Unknown table.*") - cur.execute(drop_sql) - sql.to_sql(frame2, name="test_table2", con=self.conn, index=False) - result = sql.read_sql("select * from test_table2", self.conn, index_col="Idx") - expected = frame.copy() - - # HACK! Change this once indexes are handled properly. - expected.index = index - expected.index.names = result.index.names - tm.assert_frame_equal(expected, result) - - def test_keyword_as_column_names(self): - df = DataFrame({"From": np.ones(5)}) - sql.to_sql( - df, con=self.conn, name="testkeywords", if_exists="replace", index=False - ) - - def test_if_exists(self): - df_if_exists_1 = DataFrame({"col1": [1, 2], "col2": ["A", "B"]}) - df_if_exists_2 = DataFrame({"col1": [3, 4, 5], "col2": ["C", "D", "E"]}) - table_name = "table_if_exists" - sql_select = f"SELECT * FROM {table_name}" - - def clean_up(test_table_to_drop): - """ - Drops tables created from individual tests - so no dependencies arise from sequential tests - """ - self.drop_table(test_table_to_drop) - - # test if invalid value for if_exists raises appropriate error - with pytest.raises(ValueError, match=""): - sql.to_sql( - frame=df_if_exists_1, - con=self.conn, - name=table_name, - if_exists="notvalidvalue", - ) - clean_up(table_name) - - # test if_exists='fail' - sql.to_sql( - frame=df_if_exists_1, - con=self.conn, - name=table_name, - if_exists="fail", - index=False, - ) - with pytest.raises(ValueError, match=""): - sql.to_sql( - frame=df_if_exists_1, con=self.conn, name=table_name, if_exists="fail" - ) - - # test if_exists='replace' - sql.to_sql( - frame=df_if_exists_1, - con=self.conn, - name=table_name, - if_exists="replace", - index=False, - ) - assert tquery(sql_select, con=self.conn) == [(1, "A"), (2, "B")] - sql.to_sql( - frame=df_if_exists_2, - con=self.conn, - name=table_name, - if_exists="replace", - index=False, - ) - assert tquery(sql_select, con=self.conn) == [(3, "C"), (4, "D"), (5, "E")] - clean_up(table_name) + self.drop_table(table_name) # test if_exists='append' sql.to_sql( @@ -3138,4 +2820,4 @@ def clean_up(test_table_to_drop): (4, "D"), (5, "E"), ] - clean_up(table_name) + self.drop_table(table_name)