From 59d356960c5523f183131e774653f99b8a09aa5a Mon Sep 17 00:00:00 2001 From: Yuriy Didukh Date: Fri, 6 Sep 2024 17:00:44 +0300 Subject: [PATCH] Fix of the issue when removal of the last event in the InMemory table led to a broken references chain --- .../holder/SnapshotableStreamEventQueue.java | 3 + .../query/table/DeleteFromTableTestCase.java | 56 +++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/event/stream/holder/SnapshotableStreamEventQueue.java b/modules/siddhi-core/src/main/java/io/siddhi/core/event/stream/holder/SnapshotableStreamEventQueue.java index ff8e922568..e7ac5f0645 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/event/stream/holder/SnapshotableStreamEventQueue.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/event/stream/holder/SnapshotableStreamEventQueue.java @@ -99,6 +99,9 @@ public void remove() { } if (previousToLastReturned != null) { previousToLastReturned.setNext(lastReturned.getNext()); + if (lastReturned.getNext() == null) { + last = previousToLastReturned; + } } else { first = lastReturned.getNext(); if (first == null) { diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java index 0e6df8284c..0cc8933f7f 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/query/table/DeleteFromTableTestCase.java @@ -334,5 +334,61 @@ public void receive(Event[] events) { } + @Test + public void deleteFromTableTest6() throws InterruptedException { + log.info("deleteFromTableTest6"); + + SiddhiManager siddhiManager = new SiddhiManager(); + + String streams = "" + + "define stream StockStream (symbol string, price float, vol long); " + + "define stream DeleteStockStream (symbol string, price float, vol long); " + + "define stream CountStockStream (symbol string); " + + "define table StockTable (symbol string, price float, volume long); "; + String query = "" + + "@info(name = 'query1') " + + "from StockStream " + + "select symbol, price, vol as volume " + + "insert into StockTable ;" + + "" + + "@info(name = 'query2') " + + "from DeleteStockStream[vol>=100] " + + "delete StockTable " + + " on StockTable.symbol==symbol ;" + + "" + + "@info(name = 'query3') " + + "from CountStockStream#window.length(0) join StockTable" + + " on CountStockStream.symbol==StockTable.symbol " + + "select CountStockStream.symbol as symbol " + + "insert into CountResultsStream ;"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + + InputHandler stockStream = siddhiAppRuntime.getInputHandler("StockStream"); + InputHandler deleteStockStream = siddhiAppRuntime.getInputHandler("DeleteStockStream"); + InputHandler countStockStream = siddhiAppRuntime.getInputHandler("CountStockStream"); + + siddhiAppRuntime.addCallback("CountResultsStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + inEventCount += events.length; + } + }); + siddhiAppRuntime.start(); + + stockStream.send(new Object[]{"WSO2", 55.6f, 100L}); + stockStream.send(new Object[]{"IBM", 75.6f, 100L}); + // Remove last event in the StockTable + deleteStockStream.send(new Object[]{"IBM", 57.6f, 100L}); + + stockStream.send(new Object[]{"WSO2", 57.6f, 100L}); + countStockStream.send(new Object[]{"WSO2"}); + + Thread.sleep(500); + AssertJUnit.assertEquals(2, inEventCount); + siddhiAppRuntime.shutdown(); + + } }