diff --git a/common/configdb.h b/common/configdb.h index 5acba65e..dc5eea52 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -105,8 +105,11 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native try: (table, row) = key.split(self.TABLE_NAME_SEPARATOR, 1) if table in self.handlers: - client = self.get_redis_client(self.db_name) - data = self.raw_to_typed(client.hgetall(key)) + if item['data'] == 'del': + data = None + else: + client = self.get_redis_client(self.db_name) + data = self.raw_to_typed(client.hgetall(key)) if table in init_data and row in init_data[table]: cache_hit = init_data[table][row] == data del init_data[table][row] diff --git a/tests/test_redis_ut.py b/tests/test_redis_ut.py index 17322b57..2a8de621 100644 --- a/tests/test_redis_ut.py +++ b/tests/test_redis_ut.py @@ -634,46 +634,98 @@ def thread_coming_entry(): def test_ConfigDBInit(): table_name_1 = 'TEST_TABLE_1' table_name_2 = 'TEST_TABLE_2' + table_name_3 = 'TEST_TABLE_3' test_key = 'key1' - test_data = {'field1': 'value1'} - test_data_update = {'field1': 'value2'} + test_data = {'field1': 'value1', 'field2': 'value2'} + + queue = multiprocessing.Queue() manager = multiprocessing.Manager() ret_data = manager.dict() - def test_handler(table, key, data, ret): - ret[table] = {key: data} - - def test_init_handler(data, ret): + def test_handler(table, key, data, ret, q=None): + if table not in ret: + ret[table] = {} + if data is None: + ret[table] = {k: v for k, v in ret[table].items() if k != key} + if q: + q.put(ret[table]) + elif key not in ret[table] or ret[table][key] != data: + ret[table] = {key: data} + if q: + q.put(ret[table]) + + def test_init_handler(data, ret, queue): ret.update(data) + queue.put(ret) - def thread_listen(ret): + def thread_listen(ret, queue): config_db = ConfigDBConnector() config_db.connect(wait_for_init=False) - config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret), + config_db.subscribe(table_name_1, lambda table, key, data: test_handler(table, key, data, ret, queue), fire_init_data=False) - config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret), + config_db.subscribe(table_name_2, lambda table, key, data: test_handler(table, key, data, ret, queue), fire_init_data=True) + config_db.subscribe(table_name_3, lambda table, key, data: test_handler(table, key, data, ret, queue), + fire_init_data=False) - config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret)) + config_db.listen(init_data_handler=lambda data: test_init_handler(data, ret, queue)) config_db = ConfigDBConnector() config_db.connect(wait_for_init=False) client = config_db.get_redis_client(config_db.CONFIG_DB) client.flushdb() - # Init table data - config_db.set_entry(table_name_1, test_key, test_data) - config_db.set_entry(table_name_2, test_key, test_data) + # Prepare unique data per each table to track if correct data are received in the update + table_1_data = {f'{table_name_1}_{k}': v for k, v in test_data.items()} + config_db.set_entry(table_name_1, test_key, table_1_data) + table_2_data = {f'{table_name_2}_{k}': v for k, v in test_data.items()} + config_db.set_entry(table_name_2, test_key, table_2_data) + config_db.set_entry(table_name_3, test_key, {}) - thread = multiprocessing.Process(target=thread_listen, args=(ret_data,)) - thread.start() - time.sleep(5) - thread.terminate() + # Run the listener in a separate process. It is not possible to stop a listener when it is running as a thread. + # When it runs in a separate process we can terminate it with a signal. + listener = multiprocessing.Process(target=thread_listen, args=(ret_data, queue)) + listener.start() - assert ret_data[table_name_1] == {test_key: test_data} - assert ret_data[table_name_2] == {test_key: test_data} + try: + # During the subscription to table 2 'fire_init_data=True' is passed. The callback should be called before the listener. + # Verify that callback is fired before listener initialization. + data = queue.get(timeout=5) + assert data == {test_key: table_2_data} + + # Wait for init data + init_data = queue.get(timeout=5) + + # Verify that all tables initialized correctly + assert init_data[table_name_1] == {test_key: table_1_data} + assert init_data[table_name_2] == {test_key: table_2_data} + assert init_data[table_name_3] == {test_key: {}} + + # Remove one key-value pair from the data. Verify that the entry is updated correctly + table_1_data.popitem() + config_db.set_entry(table_name_1, test_key, table_1_data) + data = queue.get(timeout=5) + assert data == {test_key: table_1_data} + + # Remove all key-value pairs. Verify that the table still contains key + config_db.set_entry(table_name_1, test_key, {}) + data = queue.get(timeout=5) + assert data == {test_key: {}} + + # Remove the key + config_db.set_entry(table_name_1, test_key, None) + data = queue.get(timeout=5) + assert test_key not in data + + # Remove the entry (with no attributes) from the table. + # Verify that the update is received and a callback is called + config_db.set_entry(table_name_3, test_key, None) + table_3_data = queue.get(timeout=5) + assert test_key not in table_3_data + finally: + listener.terminate() def test_DBConnectFailure():