From ad344588654357a9d67520194768d06bdd876849 Mon Sep 17 00:00:00 2001 From: tamirmich Date: Tue, 11 Jul 2023 16:09:31 +0300 Subject: [PATCH] change bigqueue dep, add clear method, add tests (#67) * change bigqueue dep, add clear method, add tests * update plugin build version * readme update --- README.md | 16 +- logzio-sender-test/pom.xml | 2 +- logzio-sender/pom.xml | 28 +-- .../main/java/io/logz/sender/DiskQueue.java | 6 +- .../java/io/logz/sender/InMemoryQueue.java | 5 + .../main/java/io/logz/sender/LogsQueue.java | 2 + .../java/io/logz/sender/LogzioSender.java | 4 + .../java/io/logz/sender/DiskQueueTest.java | 44 +++-- .../io/logz/sender/InMemoryQueueTest.java | 32 ++-- .../java/io/logz/sender/LogzioSenderTest.java | 164 +++++------------- .../io/logz/sender/LogzioTestSenderUtil.java | 1 - pom.xml | 21 ++- 12 files changed, 142 insertions(+), 183 deletions(-) diff --git a/README.md b/README.md index 6a90f14..b0e3439 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ This appender uses [BigQueue](https://github.com/bulldog2011/bigqueue) implement If you use Gradle, add the dependency to your project as follows: ```java -implementation 'io.logz.sender:logzio-java-sender:1.1.7' +implementation 'io.logz.sender:logzio-java-sender:2.0.0' ``` ### Parameters @@ -123,19 +123,21 @@ public class LogzioSenderExample { ### Release notes + - 2.0.0 - **THIS IS A SNAPSHOT RELEASE - SUPPORTED WITH JDK 11 AND ABOVE** + - Replaced `BigQueue` module: + - Fixes an issue where DiskQueue was not clearing disk space when using JDK 11 and above. + - Added `clear()` to `LogzioSender` - enables to clear the disk/in memory queue on demand. - 1.1.8 - Fix an issue where log is not being truncated properly between size of 32.7k to 500k. - - 1.1.7 - - replaced bigqueue dependency to a better maintained module + + +
+ Expand to check old versions - 1.1.7 - replaced bigqueue dependency to a better maintained module - 1.1.5 - dependency bumps - validation and handling for oversized logs with exceedMaxSizeAction parameter - -
- Expand to check old versions - - 1.1.2 - LogsQueue interface is now public - 1.1.1 diff --git a/logzio-sender-test/pom.xml b/logzio-sender-test/pom.xml index 229d1a4..ef0b60a 100644 --- a/logzio-sender-test/pom.xml +++ b/logzio-sender-test/pom.xml @@ -5,7 +5,7 @@ logzio-java-sender io.logz.sender - 1.1.8 + 2.0.0 4.0.0 diff --git a/logzio-sender/pom.xml b/logzio-sender/pom.xml index 207b234..33e1d50 100644 --- a/logzio-sender/pom.xml +++ b/logzio-sender/pom.xml @@ -5,12 +5,18 @@ logzio-java-sender io.logz.sender - 1.1.8 + 2.0.0 4.0.0 logzio-sender - + + + ikasaneip-snapshots + Ikasan EIP Snapshot Repository + https://oss.sonatype.org/content/repositories/snapshots/ + + @@ -30,8 +36,7 @@ true - org.kairosdb:metrics4j - org.kairosdb:bigqueue + org.ikasan:bigqueue com.google.guava:guava com.google.code.gson:gson @@ -47,12 +52,8 @@ io.logz.sender.com.google.gson - org.kairosdb.bigqueue - io.logz.sender.org.kairosdb.bigqueue - - - org.kairosdb.metrics4j - io.logz.sender.org.kairosdb.metrics4j + org.ikasan.bigqueue + io.logz.sender.org.ikasan.bigqueue @@ -69,8 +70,8 @@ org.apache.maven.plugins maven-compiler-plugin - 8 - 8 + 11 + 11 @@ -78,8 +79,9 @@ - org.kairosdb + org.ikasan bigqueue + 1.0.0-SNAPSHOT org.junit.jupiter diff --git a/logzio-sender/src/main/java/io/logz/sender/DiskQueue.java b/logzio-sender/src/main/java/io/logz/sender/DiskQueue.java index b47f9ef..6ed4489 100644 --- a/logzio-sender/src/main/java/io/logz/sender/DiskQueue.java +++ b/logzio-sender/src/main/java/io/logz/sender/DiskQueue.java @@ -1,6 +1,6 @@ package io.logz.sender; -import org.kairosdb.bigqueue.BigQueueImpl; +import org.ikasan.bigqueue.BigQueueImpl; import io.logz.sender.exceptions.LogzioParameterErrorException; import java.io.File; @@ -104,6 +104,10 @@ private void gcBigQueue() { } } + public void clear() throws IOException { + logsQueue.removeAll(); + } + @Override public void close() { gcBigQueue(); diff --git a/logzio-sender/src/main/java/io/logz/sender/InMemoryQueue.java b/logzio-sender/src/main/java/io/logz/sender/InMemoryQueue.java index 3597b8b..5826aad 100644 --- a/logzio-sender/src/main/java/io/logz/sender/InMemoryQueue.java +++ b/logzio-sender/src/main/java/io/logz/sender/InMemoryQueue.java @@ -63,6 +63,11 @@ public boolean isEmpty() { return logsBuffer.isEmpty(); } + @Override + public void clear() { + logsBuffer.clear(); + } + private boolean isEnoughSpace() { if (!dontCheckEnoughMemorySpace && size >= capacityInBytes) { reporter.warning(String.format("Logz.io: Dropping logs - we crossed the memory threshold of %d MB", diff --git a/logzio-sender/src/main/java/io/logz/sender/LogsQueue.java b/logzio-sender/src/main/java/io/logz/sender/LogsQueue.java index c718296..303dac8 100644 --- a/logzio-sender/src/main/java/io/logz/sender/LogsQueue.java +++ b/logzio-sender/src/main/java/io/logz/sender/LogsQueue.java @@ -2,9 +2,11 @@ import java.io.Closeable; +import java.io.IOException; public interface LogsQueue extends Closeable { void enqueue(byte[] log); byte[] dequeue(); boolean isEmpty(); + void clear() throws IOException; } \ No newline at end of file diff --git a/logzio-sender/src/main/java/io/logz/sender/LogzioSender.java b/logzio-sender/src/main/java/io/logz/sender/LogzioSender.java index 844b43a..504ea8c 100644 --- a/logzio-sender/src/main/java/io/logz/sender/LogzioSender.java +++ b/logzio-sender/src/main/java/io/logz/sender/LogzioSender.java @@ -140,6 +140,10 @@ public void drainQueueAndSend() { } } + public void clearQueue() throws IOException { + this.logsQueue.clear(); + } + public void send(JsonObject jsonMessage) { // check for oversized message diff --git a/logzio-sender/src/test/java/io/logz/sender/DiskQueueTest.java b/logzio-sender/src/test/java/io/logz/sender/DiskQueueTest.java index 4d6a85c..d0b2f79 100644 --- a/logzio-sender/src/test/java/io/logz/sender/DiskQueueTest.java +++ b/logzio-sender/src/test/java/io/logz/sender/DiskQueueTest.java @@ -1,14 +1,16 @@ package io.logz.sender; +import com.google.gson.JsonObject; import io.logz.sender.LogzioSender.Builder; import io.logz.sender.exceptions.LogzioParameterErrorException; import io.logz.test.TestEnvironment; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -23,9 +25,9 @@ public class DiskQueueTest extends LogzioSenderTest { @Override protected Builder getLogzioSenderBuilder(String token, String type, Integer drainTimeout, - Integer socketTimeout, Integer serverTimeout, - ScheduledExecutorService tasks, - boolean compressRequests) throws LogzioParameterErrorException { + Integer socketTimeout, Integer serverTimeout, + ScheduledExecutorService tasks, + boolean compressRequests) throws LogzioParameterErrorException { Builder logzioSenderBuilder = super.getLogzioSenderBuilder(token, type, drainTimeout, socketTimeout, serverTimeout, tasks, compressRequests); @@ -33,11 +35,12 @@ protected Builder getLogzioSenderBuilder(String token, String type, Integer drai queueDir = TestEnvironment.createTempDirectory(); queueDir.deleteOnExit(); } + return logzioSenderBuilder .withDiskQueue() - .setQueueDir(queueDir) - .setFsPercentThreshold(FS_PERCENT_THRESHOLD) - .setCheckDiskSpaceInterval(1000) + .setQueueDir(queueDir) + .setFsPercentThreshold(FS_PERCENT_THRESHOLD) + .setCheckDiskSpaceInterval(1000) .endDiskQueue(); } @@ -49,7 +52,7 @@ protected void setZeroThresholdQueue(LogzioSender.Builder logzioSenderBuilder) { private void setFsPercentThreshold(LogzioSender.Builder logzioSenderBuilder, int fsPercentThreshold) { logzioSenderBuilder .withDiskQueue() - .setFsPercentThreshold(fsPercentThreshold) + .setFsPercentThreshold(fsPercentThreshold) .endDiskQueue(); } @@ -70,9 +73,10 @@ public void testSenderCantWriteToEmptyDirectory() { 10 * 1000, tasks, false); LogzioSender testSender = createLogzioSender(testSenderBuilder); throw new LogzioParameterErrorException("Should not reach here", "fail"); - } catch(LogzioParameterErrorException | IOException e) { + } catch (LogzioParameterErrorException | IOException e) { assertTrue(e.getMessage().contains(tempDirectory.getAbsolutePath())); } + assertTrue(tempDirectory.exists()); tempDirectory.delete(); tasks.shutdownNow(); @@ -88,16 +92,30 @@ public void testSenderCreatesDirectoryWhichDoesNotExists() throws Exception { File queueDir = new File(tempDirectory, "dirWhichDoesNotExists"); String message1 = "Just sending something - " + random(5); ScheduledExecutorService tasks = Executors.newScheduledThreadPool(3); - assertFalse(queueDir.exists()); setQueueDir(queueDir); Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000, 10 * 1000, tasks, false); LogzioSender testSender = createLogzioSender(testSenderBuilder); - - testSender.send( createJsonMessage(loggerName, message1)); + testSender.send(createJsonMessage(loggerName, message1)); assertTrue(queueDir.exists()); tempDirectory.delete(); tasks.shutdownNow(); } + + @Test + public void testFilesCleanedFromDisk() throws Exception { + Logger logger = LoggerFactory.getLogger(LogzioSenderTest.class); + ScheduledExecutorService tasks = Executors.newScheduledThreadPool(1); + LogzioTestStatusReporter logy = new LogzioTestStatusReporter(logger); + File tempDirectory = TestEnvironment.createTempDirectory(); + File queueDir = new File(tempDirectory, "testFilesDeletion"); + DiskQueue diskQueue = LogzioSender.builder().withDiskQueue().setGcPersistedQueueFilesIntervalSeconds(1).setQueueDir(queueDir).setDiskSpaceTasks(tasks).setReporter(logy).build(); + JsonObject testMessage = createJsonMessage("testFilesDeleted", "testMessage"); + diskQueue.enqueue(testMessage.toString().getBytes(StandardCharsets.UTF_8)); + File dataFile = new File(queueDir.getAbsolutePath() + "/data/page-0.dat"); + assertTrue(dataFile.length() > 0); + diskQueue.clear(); + assertTrue(dataFile.length() == 0); + } } diff --git a/logzio-sender/src/test/java/io/logz/sender/InMemoryQueueTest.java b/logzio-sender/src/test/java/io/logz/sender/InMemoryQueueTest.java index c76ae75..67369d2 100644 --- a/logzio-sender/src/test/java/io/logz/sender/InMemoryQueueTest.java +++ b/logzio-sender/src/test/java/io/logz/sender/InMemoryQueueTest.java @@ -3,16 +3,16 @@ import com.google.gson.JsonObject; import io.logz.sender.LogzioSender.Builder; import io.logz.sender.exceptions.LogzioParameterErrorException; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import static io.logz.sender.LogzioTestSenderUtil.createJsonMessage; +import static org.junit.jupiter.api.Assertions.assertTrue; public class InMemoryQueueTest extends LogzioSenderTest { private final static long defaultCapacityInBytes = 100 * 1024 * 1024; @@ -22,10 +22,8 @@ protected Builder getLogzioSenderBuilder(String token, String type, Integer drai Integer socketTimeout, Integer serverTimeout, ScheduledExecutorService tasks, boolean compressRequests) throws LogzioParameterErrorException { - Builder logzioSenderBuilder = super.getLogzioSenderBuilder(token, type, drainTimeout, socketTimeout, serverTimeout, tasks, compressRequests); - setCapacityInBytes(logzioSenderBuilder, defaultCapacityInBytes); return logzioSenderBuilder; } @@ -56,19 +54,14 @@ public void checkCapacityReachedToSizeBelowCapacity() throws LogzioParameterErro String loggerName = "checkCrossCapacityInBytesName"; int drainTimeout = 2; int successfulLogs = 3; - String message = "Log before drop - " + random(5); JsonObject log = createJsonMessage(loggerName, message); - int logSize = log.toString().getBytes(StandardCharsets.UTF_8).length; ScheduledExecutorService tasks = Executors.newScheduledThreadPool(3); - Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000, 10 * 1000, tasks, false); setCapacityInBytes(testSenderBuilder, logSize * successfulLogs); - LogzioSender testSender = createLogzioSender(testSenderBuilder); - sleepSeconds(drainTimeout - 1); for (int i = 0; i <= successfulLogs; i++) { testSender.send(log); @@ -76,7 +69,6 @@ public void checkCapacityReachedToSizeBelowCapacity() throws LogzioParameterErro sleepSeconds(2 * drainTimeout); mockListener.assertNumberOfReceivedMsgs(successfulLogs); - sleepSeconds(2 * drainTimeout); testSender.send(log); sleepSeconds(2 * drainTimeout); @@ -91,18 +83,13 @@ public void checkLogMessageCountLimitWithCapacityInBytes() throws LogzioParamete String loggerName = "checkLogMessageCountLimitOnly"; int drainTimeout = 2; int successfulLogs = 3; - String message = "Log before drop - " + random(5); JsonObject log = createJsonMessage(loggerName, message); - ScheduledExecutorService tasks = Executors.newScheduledThreadPool(3); - Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000, 10 * 1000, tasks, false); setLogsCountLimit(testSenderBuilder, successfulLogs); - LogzioSender testSender = createLogzioSender(testSenderBuilder); - sleepSeconds(drainTimeout - 1); for (int i = 0; i <= successfulLogs; i++) { testSender.send(log); @@ -110,7 +97,6 @@ public void checkLogMessageCountLimitWithCapacityInBytes() throws LogzioParamete sleepSeconds(2 * drainTimeout); mockListener.assertNumberOfReceivedMsgs(successfulLogs); - sleepSeconds(2 * drainTimeout); testSender.send(log); sleepSeconds(2 * drainTimeout); @@ -118,6 +104,16 @@ public void checkLogMessageCountLimitWithCapacityInBytes() throws LogzioParamete tasks.shutdownNow(); } - + @Test + public void testFilesCleanedFromQueue() { + Logger logger = LoggerFactory.getLogger(LogzioSenderTest.class); + LogzioTestStatusReporter logy = new LogzioTestStatusReporter(logger); + InMemoryQueue inMemoryQueue = LogzioSender.builder().withInMemoryQueue().setReporter(logy).build(); + JsonObject testMessage = createJsonMessage("testFilesDeleted", "testMessage"); + inMemoryQueue.enqueue(testMessage.toString().getBytes(StandardCharsets.UTF_8)); + assertTrue(!inMemoryQueue.isEmpty()); + inMemoryQueue.clear(); + assertTrue(inMemoryQueue.isEmpty()); + } } diff --git a/logzio-sender/src/test/java/io/logz/sender/LogzioSenderTest.java b/logzio-sender/src/test/java/io/logz/sender/LogzioSenderTest.java index 2265fa4..71bf237 100644 --- a/logzio-sender/src/test/java/io/logz/sender/LogzioSenderTest.java +++ b/logzio-sender/src/test/java/io/logz/sender/LogzioSenderTest.java @@ -41,8 +41,8 @@ public void startListenerAndExecutors() throws Exception { @AfterEach public void stopListenerAndExecutors() { if (mockListener != null) - mockListener.stop(); - if (tasks != null){ + mockListener.stop(); + if (tasks != null) { tasks.shutdownNow(); } } @@ -60,25 +60,24 @@ protected LogzioSender.Builder getLogzioSenderBuilder(String token, String type, Integer socketTimeout, Integer serverTimeout, ScheduledExecutorService tasks, boolean compressRequests) - throws LogzioParameterErrorException{ - LogzioTestStatusReporter logy = new LogzioTestStatusReporter(logger); - HttpsRequestConfiguration httpsRequestConfiguration = HttpsRequestConfiguration - .builder() - .setCompressRequests(compressRequests) - .setConnectTimeout(serverTimeout) - .setSocketTimeout(socketTimeout) - .setLogzioToken(token) - .setLogzioType(type) - .setLogzioListenerUrl("http://" + mockListener.getHost() + ":" + mockListener.getPort()) - .build(); - - return LogzioSender - .builder() - .setDebug(false) - .setTasksExecutor(tasks) - .setDrainTimeoutSec(drainTimeout) - .setReporter(logy) - .setHttpsRequestConfiguration(httpsRequestConfiguration); + throws LogzioParameterErrorException { + LogzioTestStatusReporter logy = new LogzioTestStatusReporter(logger); + HttpsRequestConfiguration httpsRequestConfiguration = HttpsRequestConfiguration + .builder() + .setCompressRequests(compressRequests) + .setConnectTimeout(serverTimeout) + .setSocketTimeout(socketTimeout) + .setLogzioToken(token) + .setLogzioType(type) + .setLogzioListenerUrl("http://" + mockListener.getHost() + ":" + mockListener.getPort()) + .build(); + return LogzioSender + .builder() + .setDebug(false) + .setTasksExecutor(tasks) + .setDrainTimeoutSec(drainTimeout) + .setReporter(logy) + .setHttpsRequestConfiguration(httpsRequestConfiguration); } protected LogzioSender createLogzioSender(LogzioSender.Builder logzioSenderBuilder) throws LogzioParameterErrorException, IOException { @@ -90,7 +89,7 @@ protected LogzioSender createLogzioSender(LogzioSender.Builder logzioSenderBuild protected abstract void setZeroThresholdQueue(LogzioSender.Builder logzioSenderBuilder); protected String random(int numberOfChars) { - return UUID.randomUUID().toString().substring(0, numberOfChars-1); + return UUID.randomUUID().toString().substring(0, numberOfChars - 1); } @Test @@ -99,18 +98,14 @@ public void simpleAppending() throws Exception { String type = random(8); String loggerName = "simpleAppending"; int drainTimeout = 2; - String message1 = "Testing.." + random(5); String message2 = "Warning test.." + random(5); - LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, - 10 * 1000, 10 * 1000, tasks,false); + 10 * 1000, 10 * 1000, tasks, false); LogzioSender testSender = createLogzioSender(testSenderBuilder); - - testSender.send( createJsonMessage(loggerName, message1)); - testSender.send( createJsonMessage(loggerName, message2)); - sleepSeconds(drainTimeout *3); - + testSender.send(createJsonMessage(loggerName, message1)); + testSender.send(createJsonMessage(loggerName, message2)); + sleepSeconds(drainTimeout * 3); mockListener.assertNumberOfReceivedMsgs(2); mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL); mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL); @@ -122,19 +117,15 @@ public void malformedBulk() throws Exception { String type = random(8); String loggerName = "malformedBulk"; int drainTimeout = 1; - String message1 = "Testing.." + random(5); String message2 = "Warning test.." + random(5); - LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, - 10 * 1000, 10 * 1000, tasks,false); + 10 * 1000, 10 * 1000, tasks, false); LogzioSender testSender = createLogzioSender(testSenderBuilder); - testSender.send(createJsonMessage(loggerName, message1)); testSender.send(createJsonMessage(loggerName, message2)); testSender.send("bug".getBytes(StandardCharsets.UTF_8)); sleepSeconds(drainTimeout * 5); - mockListener.assertNumberOfReceivedMsgs(2); mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL); mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL); @@ -147,18 +138,14 @@ public void simpleByteArrayAppending() throws Exception { String type = random(8); String loggerName = "simpleByteArrayAppending"; int drainTimeout = 2; - String message1 = "Testing.." + random(5); String message2 = "Warning test.." + random(5); - LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, - 10 * 1000, 10 * 1000, tasks,false); + 10 * 1000, 10 * 1000, tasks, false); LogzioSender testSender = createLogzioSender(testSenderBuilder); - - testSender.send( createJsonMessage(loggerName, message1).toString().getBytes(StandardCharsets.UTF_8)); - testSender.send( createJsonMessage(loggerName, message2).toString().getBytes(StandardCharsets.UTF_8)); - sleepSeconds(drainTimeout *3); - + testSender.send(createJsonMessage(loggerName, message1).toString().getBytes(StandardCharsets.UTF_8)); + testSender.send(createJsonMessage(loggerName, message2).toString().getBytes(StandardCharsets.UTF_8)); + sleepSeconds(drainTimeout * 3); mockListener.assertNumberOfReceivedMsgs(2); mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL); mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL); @@ -170,18 +157,14 @@ public void simpleGzipAppending() throws Exception { String type = random(8); String loggerName = "simpleGzipAppending"; int drainTimeout = 2; - String message1 = "Testing.." + random(5); String message2 = "Warning test.." + random(5); - LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000, - 10 * 1000, tasks, true); + 10 * 1000, tasks, true); LogzioSender testSender = createLogzioSender(testSenderBuilder); - - testSender.send( createJsonMessage(loggerName, message1)); - testSender.send( createJsonMessage(loggerName, message2)); - sleepSeconds(drainTimeout *3); - + testSender.send(createJsonMessage(loggerName, message1)); + testSender.send(createJsonMessage(loggerName, message2)); + sleepSeconds(drainTimeout * 3); mockListener.assertNumberOfReceivedMsgs(2); mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL); mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL); @@ -193,23 +176,17 @@ public void multipleQueueDrains() throws Exception { String type = random(8); String loggerName = "multipleQueueDrains"; int drainTimeout = 2; - String message1 = "Testing first drain - " + random(5); String message2 = "And the second drain" + random(5); - LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000, 10 * 1000, tasks, false); LogzioSender testSender = createLogzioSender(testSenderBuilder); - - testSender.send(createJsonMessage( loggerName, message1)); + testSender.send(createJsonMessage(loggerName, message1)); sleepSeconds(2 * drainTimeout); - mockListener.assertNumberOfReceivedMsgs(1); mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL); - testSender.send(createJsonMessage(loggerName, message2)); sleepSeconds(2 * drainTimeout); - mockListener.assertNumberOfReceivedMsgs(2); mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL); } @@ -220,20 +197,15 @@ public void longDrainTimeout() throws Exception { String type = random(8); String loggerName = "longDrainTimeout"; int drainTimeout = 10; - String message1 = "Sending one log - " + random(5); String message2 = "And one more important one - " + random(5); - LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000, 10 * 1000, tasks, false); LogzioSender testSender = createLogzioSender(testSenderBuilder); - testSender.send(createJsonMessage(loggerName, message1)); testSender.send(createJsonMessage(loggerName, message2)); - mockListener.assertNumberOfReceivedMsgs(0); sleepSeconds(drainTimeout + 1); - mockListener.assertNumberOfReceivedMsgs(2); mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL); mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL); @@ -249,12 +221,10 @@ public void fsPercentDrop() throws Exception { tempDirectoryThatWillBeInTheSameFsAsTheQueue.deleteOnExit(); String message1 = "First log that will be dropped - " + random(5); String message2 = "And a second drop - " + random(5); - LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeoutSec, 10 * 1000, 10 * 1000, tasks, false); setZeroThresholdQueue(testSenderBuilder); LogzioSender testSender = createLogzioSender(testSenderBuilder); - // verify the thread that checks for space made at least one check sleepSeconds(2 * drainTimeoutSec); testSender.send(createJsonMessage(loggerName, message1)); @@ -270,35 +240,23 @@ public void serverCrash() throws Exception { String type = random(8); String loggerName = "serverCrash"; int drainTimeout = 1; - String message1 = "Log before drop - " + random(5); String message2 = "Log during drop - " + random(5); String message3 = "Log after drop - " + random(5); - - LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000, + LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000, 10 * 1000, tasks, false); LogzioSender testSender = createLogzioSender(testSenderBuilder); - testSender.send(createJsonMessage(loggerName, message1)); - sleepSeconds(2 * drainTimeout); - mockListener.assertNumberOfReceivedMsgs(1); mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL); - mockListener.stop(); - testSender.send(createJsonMessage(loggerName, message2)); sleepSeconds(2 * drainTimeout); - mockListener.assertNumberOfReceivedMsgs(1); // haven't changed - still 1 - mockListener.start(); - testSender.send(createJsonMessage(loggerName, message3)); - sleepSeconds(2 * drainTimeout); - mockListener.assertNumberOfReceivedMsgs(3); mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL); mockListener.assertLogReceivedIs(message3, token, type, loggerName, LOGLEVEL); @@ -311,38 +269,24 @@ public void getTimeoutFromServer() throws Exception { String loggerName = "getTimeoutFromServer"; int drainTimeout = 1; int serverTimeout = 2000; - String message1 = "Log that will be sent - " + random(5); String message2 = "Log that would timeout and then being re-sent - " + random(5); - int socketTimeout = serverTimeout / 2; - LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, socketTimeout, serverTimeout, tasks, false); LogzioSender testSender = createLogzioSender(testSenderBuilder); - testSender.send(createJsonMessage(loggerName, message1)); - sleepSeconds(2 * drainTimeout); - mockListener.assertNumberOfReceivedMsgs(1); mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL); - mockListener.setTimeoutMillis(serverTimeout); mockListener.setServerTimeoutMode(true); - testSender.send(createJsonMessage(loggerName, message2)); - sleepSeconds((socketTimeout / 1000) * MAX_RETRIES_ATTEMPTS + retryTotalDelay()); - mockListener.assertNumberOfReceivedMsgs(1); // Stays the same - mockListener.setServerTimeoutMode(false); - sleepSeconds(2 * drainTimeout); - mockListener.assertNumberOfReceivedMsgs(2); - mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL); } @@ -354,7 +298,6 @@ private int retryTotalDelay() { sleepBetweenRetry *= 2; } return totalSleepTime; - } @Test @@ -363,29 +306,21 @@ public void getExceptionFromServer() throws Exception { String type = random(8); String loggerName = "getExceptionFromServer"; int drainTimeout = 1; - - String message1 = "Log that will be sent - " + random(5); + String message1 = "Log that will be sent - " + random(5); String message2 = "Log that would get exception and be sent again - " + random(5); - LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000, 10 * 1000, tasks, false); LogzioSender testSender = createLogzioSender(testSenderBuilder); - testSender.send(createJsonMessage(loggerName, message1)); sleepSeconds(2 * drainTimeout); - mockListener.assertNumberOfReceivedMsgs(1); mockListener.assertLogReceivedIs(message1, token, type, loggerName, LOGLEVEL); mockListener.setFailWithServerError(true); - testSender.send(createJsonMessage(loggerName, message2)); sleepSeconds(2 * drainTimeout); - mockListener.assertNumberOfReceivedMsgs(1); // Haven't changed mockListener.setFailWithServerError(false); - Thread.sleep(drainTimeout * 1000 * 2); - mockListener.assertNumberOfReceivedMsgs(2); mockListener.assertLogReceivedIs(message2, token, type, loggerName, LOGLEVEL); } @@ -396,14 +331,11 @@ public void checkExceedingMaxSizeJsonLogWithCut() throws LogzioParameterErrorExc String type = random(8); String loggerName = "checkExceedingMaxSizeJsonLogWithCutName"; int drainTimeout = 2; - String message = new String(Files.readAllBytes(Paths.get(EXCEEDING_LOG_FILE_PATH)), StandardCharsets.UTF_8); JsonObject log = createJsonMessage(loggerName, message); - int logSize = log.toString().getBytes(StandardCharsets.UTF_8).length; ScheduledExecutorService tasks = Executors.newScheduledThreadPool(1); - LogzioSender testSender = getLogzioSenderWithAndExceedMaxSizeAction(token, type, drainTimeout, logSize, tasks,"cut"); - + LogzioSender testSender = getLogzioSenderWithAndExceedMaxSizeAction(token, type, drainTimeout, logSize, tasks, "cut"); testSender.send(log); sleepSeconds(2 * drainTimeout); mockListener.assertLogReceivedByMessage(message.substring(0, MAX_LOG_LINE_SIZE_IN_BYTES - TRUNCATED_MESSAGE_SUFFIX.length()) + TRUNCATED_MESSAGE_SUFFIX); @@ -416,14 +348,11 @@ public void checkExceedingMaxSizeJsonMessagFieldeWithCut() throws LogzioParamete String type = random(8); String loggerName = "checkExceedingMaxSizeJsonLogWithCutName"; int drainTimeout = 2; - String message = new String(Files.readAllBytes(Paths.get(EXCEEDING_MESSAGE_FIELD_FILE_PATH)), StandardCharsets.UTF_8); JsonObject log = createJsonMessage(loggerName, message); - int logSize = log.toString().getBytes(StandardCharsets.UTF_8).length; ScheduledExecutorService tasks = Executors.newScheduledThreadPool(1); - LogzioSender testSender = getLogzioSenderWithAndExceedMaxSizeAction(token, type, drainTimeout, logSize, tasks,"cut"); - + LogzioSender testSender = getLogzioSenderWithAndExceedMaxSizeAction(token, type, drainTimeout, logSize, tasks, "cut"); testSender.send(log); sleepSeconds(2 * drainTimeout); mockListener.assertLogReceivedByMessage(message.substring(0, MAX_LOG_LINE_SIZE_IN_BYTES - TRUNCATED_MESSAGE_SUFFIX.length()) + TRUNCATED_MESSAGE_SUFFIX); @@ -436,34 +365,28 @@ public void checkExceedingMaxSizeBytesLogWithCut() throws LogzioParameterErrorEx String type = random(8); String loggerName = "checkExceedingMaxSizeBytesLogWithCutName"; int drainTimeout = 2; - String message = new String(Files.readAllBytes(Paths.get(EXCEEDING_LOG_FILE_PATH)), StandardCharsets.UTF_8); JsonObject log = createJsonMessage(loggerName, message); - int logSize = log.toString().getBytes(StandardCharsets.UTF_8).length; ScheduledExecutorService tasks = Executors.newScheduledThreadPool(1); - - LogzioSender testSender = getLogzioSenderWithAndExceedMaxSizeAction(token, type, drainTimeout, logSize, tasks,"cut"); + LogzioSender testSender = getLogzioSenderWithAndExceedMaxSizeAction(token, type, drainTimeout, logSize, tasks, "cut"); testSender.send(log.toString().getBytes(StandardCharsets.UTF_8)); sleepSeconds(2 * drainTimeout); mockListener.assertLogReceivedByMessage(message.substring(0, MAX_LOG_LINE_SIZE_IN_BYTES - TRUNCATED_MESSAGE_SUFFIX.length()) + TRUNCATED_MESSAGE_SUFFIX); tasks.shutdownNow(); } - @Test public void checkExceedingMaxSizeJsonLogWithDrop() throws LogzioParameterErrorException, IOException { String token = "checkExceedingMaxSizeJsonLogWithDrop"; String type = random(8); String loggerName = "checkExceedingMaxSizeJsonLogWithDropName"; int drainTimeout = 2; - String message = new String(Files.readAllBytes(Paths.get(EXCEEDING_LOG_FILE_PATH)), StandardCharsets.UTF_8); JsonObject log = createJsonMessage(loggerName, message); - int logSize = log.toString().getBytes(StandardCharsets.UTF_8).length; ScheduledExecutorService tasks = Executors.newScheduledThreadPool(1); - LogzioSender testSender = getLogzioSenderWithAndExceedMaxSizeAction(token, type, drainTimeout, logSize, tasks,"drop"); + LogzioSender testSender = getLogzioSenderWithAndExceedMaxSizeAction(token, type, drainTimeout, logSize, tasks, "drop"); testSender.send(log); sleepSeconds(2 * drainTimeout); mockListener.assertNumberOfReceivedMsgs(0); @@ -476,20 +399,17 @@ public void checkExceedingMaxSizeBytesLogWithDrop() throws LogzioParameterErrorE String type = random(8); String loggerName = "checkExceedingMaxSizeBytesLogWithDropName"; int drainTimeout = 2; - String message = new String(Files.readAllBytes(Paths.get(EXCEEDING_LOG_FILE_PATH)), StandardCharsets.UTF_8); JsonObject log = createJsonMessage(loggerName, message); - int logSize = log.toString().getBytes(StandardCharsets.UTF_8).length; ScheduledExecutorService tasks = Executors.newScheduledThreadPool(1); - LogzioSender testSender = getLogzioSenderWithAndExceedMaxSizeAction(token, type, drainTimeout, logSize, tasks,"drop"); + LogzioSender testSender = getLogzioSenderWithAndExceedMaxSizeAction(token, type, drainTimeout, logSize, tasks, "drop"); testSender.send(log.toString().getBytes(StandardCharsets.UTF_8)); sleepSeconds(2 * drainTimeout); mockListener.assertNumberOfReceivedMsgs(0); tasks.shutdownNow(); } - private LogzioSender getLogzioSenderWithAndExceedMaxSizeAction(String token, String type, int drainTimeout, int logSize, ScheduledExecutorService tasks, String exceedMaxSizeAction) throws LogzioParameterErrorException, IOException { LogzioSender.Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000, diff --git a/logzio-sender/src/test/java/io/logz/sender/LogzioTestSenderUtil.java b/logzio-sender/src/test/java/io/logz/sender/LogzioTestSenderUtil.java index 91b851c..bcfa78b 100644 --- a/logzio-sender/src/test/java/io/logz/sender/LogzioTestSenderUtil.java +++ b/logzio-sender/src/test/java/io/logz/sender/LogzioTestSenderUtil.java @@ -14,5 +14,4 @@ public static JsonObject createJsonMessage(String loggerName, String message){ obj.addProperty("logger", loggerName); return obj; } - } diff --git a/pom.xml b/pom.xml index 8504705..7388c21 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ io.logz.sender logzio-java-sender pom - 1.1.8 + 2.0.0 Logz.io Logs Sender Send your log messages to your logz.io account in an encrypted, non-blocking manner. @@ -22,8 +22,8 @@ - Roi Rav-Hon - roi@logz.io + Tamir Michaeli + tamir.michaeli@logz.io Logz.io http://logz.io @@ -35,6 +35,13 @@ https://github.com/logzio/logzio-java-sender + + + ikasaneip-snapshots + Ikasan EIP Snapshot Repository + https://oss.sonatype.org/content/repositories/snapshots/ + + logzio-sender-test logzio-sender @@ -47,8 +54,8 @@ maven-compiler-plugin 3.5.1 - 1.8 - 1.8 + 11 + 11 @@ -130,9 +137,9 @@ 1.6.4 - org.kairosdb + org.ikasan bigqueue - 1.0.3 + 1.0.0-SNAPSHOT