Skip to content

Commit

Permalink
change bigqueue dep, add clear method, add tests (#67)
Browse files Browse the repository at this point in the history
* change bigqueue dep, add clear method, add tests

* update plugin build version

* readme update
  • Loading branch information
tamir-michaeli authored Jul 11, 2023
1 parent 677fd99 commit ad34458
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 183 deletions.
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ This appender uses [BigQueue](https://github.com/bulldog2011/bigqueue) implement
If you use Gradle, add the dependency to your project as follows:

```java
implementation 'io.logz.sender:logzio-java-sender:1.1.7'
implementation 'io.logz.sender:logzio-java-sender:2.0.0'
```

### Parameters
Expand Down Expand Up @@ -123,19 +123,21 @@ public class LogzioSenderExample {


### Release notes
- 2.0.0 - **THIS IS A SNAPSHOT RELEASE - SUPPORTED WITH JDK 11 AND ABOVE**
- Replaced `BigQueue` module:
- Fixes an issue where DiskQueue was not clearing disk space when using JDK 11 and above.
- Added `clear()` to `LogzioSender` - enables to clear the disk/in memory queue on demand.
- 1.1.8
- Fix an issue where log is not being truncated properly between size of 32.7k to 500k.
- 1.1.7
- replaced bigqueue dependency to a better maintained module


<details>
<summary markdown="span"> Expand to check old versions </summary>
- 1.1.7
- replaced bigqueue dependency to a better maintained module
- 1.1.5
- dependency bumps
- validation and handling for oversized logs with exceedMaxSizeAction parameter

<details>
<summary markdown="span"> Expand to check old versions </summary>

- 1.1.2
- LogsQueue interface is now public
- 1.1.1
Expand Down
2 changes: 1 addition & 1 deletion logzio-sender-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>logzio-java-sender</artifactId>
<groupId>io.logz.sender</groupId>
<version>1.1.8</version>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
28 changes: 15 additions & 13 deletions logzio-sender/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
<parent>
<artifactId>logzio-java-sender</artifactId>
<groupId>io.logz.sender</groupId>
<version>1.1.8</version>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>logzio-sender</artifactId>

<repositories>
<repository>
<id>ikasaneip-snapshots</id>
<name>Ikasan EIP Snapshot Repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
Expand All @@ -30,8 +36,7 @@
<createSourcesJar>true</createSourcesJar>
<artifactSet>
<includes>
<include>org.kairosdb:metrics4j</include>
<include>org.kairosdb:bigqueue</include>
<include>org.ikasan:bigqueue</include>
<include>com.google.guava:guava</include>
<include>com.google.code.gson:gson</include>
</includes>
Expand All @@ -47,12 +52,8 @@
<shadedPattern>io.logz.sender.com.google.gson</shadedPattern>
</relocation>
<relocation>
<pattern>org.kairosdb.bigqueue</pattern>
<shadedPattern>io.logz.sender.org.kairosdb.bigqueue</shadedPattern>
</relocation>
<relocation>
<pattern>org.kairosdb.metrics4j</pattern>
<shadedPattern>io.logz.sender.org.kairosdb.metrics4j</shadedPattern>
<pattern>org.ikasan.bigqueue</pattern>
<shadedPattern>io.logz.sender.org.ikasan.bigqueue</shadedPattern>
</relocation>
</relocations>
<filters>
Expand All @@ -69,17 +70,18 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.kairosdb</groupId>
<groupId>org.ikasan</groupId>
<artifactId>bigqueue</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down
6 changes: 5 additions & 1 deletion logzio-sender/src/main/java/io/logz/sender/DiskQueue.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.logz.sender;

import org.kairosdb.bigqueue.BigQueueImpl;
import org.ikasan.bigqueue.BigQueueImpl;
import io.logz.sender.exceptions.LogzioParameterErrorException;

import java.io.File;
Expand Down Expand Up @@ -104,6 +104,10 @@ private void gcBigQueue() {
}
}

public void clear() throws IOException {
logsQueue.removeAll();
}

@Override
public void close() {
gcBigQueue();
Expand Down
5 changes: 5 additions & 0 deletions logzio-sender/src/main/java/io/logz/sender/InMemoryQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public boolean isEmpty() {
return logsBuffer.isEmpty();
}

@Override
public void clear() {
logsBuffer.clear();
}

private boolean isEnoughSpace() {
if (!dontCheckEnoughMemorySpace && size >= capacityInBytes) {
reporter.warning(String.format("Logz.io: Dropping logs - we crossed the memory threshold of %d MB",
Expand Down
2 changes: 2 additions & 0 deletions logzio-sender/src/main/java/io/logz/sender/LogsQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@


import java.io.Closeable;
import java.io.IOException;

public interface LogsQueue extends Closeable {
void enqueue(byte[] log);
byte[] dequeue();
boolean isEmpty();
void clear() throws IOException;
}
4 changes: 4 additions & 0 deletions logzio-sender/src/main/java/io/logz/sender/LogzioSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ public void drainQueueAndSend() {
}
}

public void clearQueue() throws IOException {
this.logsQueue.clear();
}

public void send(JsonObject jsonMessage) {

// check for oversized message
Expand Down
44 changes: 31 additions & 13 deletions logzio-sender/src/test/java/io/logz/sender/DiskQueueTest.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package io.logz.sender;

import com.google.gson.JsonObject;
import io.logz.sender.LogzioSender.Builder;
import io.logz.sender.exceptions.LogzioParameterErrorException;
import io.logz.test.TestEnvironment;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand All @@ -23,21 +25,22 @@ public class DiskQueueTest extends LogzioSenderTest {

@Override
protected Builder getLogzioSenderBuilder(String token, String type, Integer drainTimeout,
Integer socketTimeout, Integer serverTimeout,
ScheduledExecutorService tasks,
boolean compressRequests) throws LogzioParameterErrorException {
Integer socketTimeout, Integer serverTimeout,
ScheduledExecutorService tasks,
boolean compressRequests) throws LogzioParameterErrorException {
Builder logzioSenderBuilder = super.getLogzioSenderBuilder(token, type, drainTimeout,
socketTimeout, serverTimeout, tasks, compressRequests);

if (queueDir == null) {
queueDir = TestEnvironment.createTempDirectory();
queueDir.deleteOnExit();
}

return logzioSenderBuilder
.withDiskQueue()
.setQueueDir(queueDir)
.setFsPercentThreshold(FS_PERCENT_THRESHOLD)
.setCheckDiskSpaceInterval(1000)
.setQueueDir(queueDir)
.setFsPercentThreshold(FS_PERCENT_THRESHOLD)
.setCheckDiskSpaceInterval(1000)
.endDiskQueue();
}

Expand All @@ -49,7 +52,7 @@ protected void setZeroThresholdQueue(LogzioSender.Builder logzioSenderBuilder) {
private void setFsPercentThreshold(LogzioSender.Builder logzioSenderBuilder, int fsPercentThreshold) {
logzioSenderBuilder
.withDiskQueue()
.setFsPercentThreshold(fsPercentThreshold)
.setFsPercentThreshold(fsPercentThreshold)
.endDiskQueue();
}

Expand All @@ -70,9 +73,10 @@ public void testSenderCantWriteToEmptyDirectory() {
10 * 1000, tasks, false);
LogzioSender testSender = createLogzioSender(testSenderBuilder);
throw new LogzioParameterErrorException("Should not reach here", "fail");
} catch(LogzioParameterErrorException | IOException e) {
} catch (LogzioParameterErrorException | IOException e) {
assertTrue(e.getMessage().contains(tempDirectory.getAbsolutePath()));
}

assertTrue(tempDirectory.exists());
tempDirectory.delete();
tasks.shutdownNow();
Expand All @@ -88,16 +92,30 @@ public void testSenderCreatesDirectoryWhichDoesNotExists() throws Exception {
File queueDir = new File(tempDirectory, "dirWhichDoesNotExists");
String message1 = "Just sending something - " + random(5);
ScheduledExecutorService tasks = Executors.newScheduledThreadPool(3);

assertFalse(queueDir.exists());
setQueueDir(queueDir);
Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout,
10 * 1000, 10 * 1000, tasks, false);
LogzioSender testSender = createLogzioSender(testSenderBuilder);

testSender.send( createJsonMessage(loggerName, message1));
testSender.send(createJsonMessage(loggerName, message1));
assertTrue(queueDir.exists());
tempDirectory.delete();
tasks.shutdownNow();
}

@Test
public void testFilesCleanedFromDisk() throws Exception {
Logger logger = LoggerFactory.getLogger(LogzioSenderTest.class);
ScheduledExecutorService tasks = Executors.newScheduledThreadPool(1);
LogzioTestStatusReporter logy = new LogzioTestStatusReporter(logger);
File tempDirectory = TestEnvironment.createTempDirectory();
File queueDir = new File(tempDirectory, "testFilesDeletion");
DiskQueue diskQueue = LogzioSender.builder().withDiskQueue().setGcPersistedQueueFilesIntervalSeconds(1).setQueueDir(queueDir).setDiskSpaceTasks(tasks).setReporter(logy).build();
JsonObject testMessage = createJsonMessage("testFilesDeleted", "testMessage");
diskQueue.enqueue(testMessage.toString().getBytes(StandardCharsets.UTF_8));
File dataFile = new File(queueDir.getAbsolutePath() + "/data/page-0.dat");
assertTrue(dataFile.length() > 0);
diskQueue.clear();
assertTrue(dataFile.length() == 0);
}
}
32 changes: 14 additions & 18 deletions logzio-sender/src/test/java/io/logz/sender/InMemoryQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
import com.google.gson.JsonObject;
import io.logz.sender.LogzioSender.Builder;
import io.logz.sender.exceptions.LogzioParameterErrorException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static io.logz.sender.LogzioTestSenderUtil.createJsonMessage;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class InMemoryQueueTest extends LogzioSenderTest {
private final static long defaultCapacityInBytes = 100 * 1024 * 1024;
Expand All @@ -22,10 +22,8 @@ protected Builder getLogzioSenderBuilder(String token, String type, Integer drai
Integer socketTimeout, Integer serverTimeout,
ScheduledExecutorService tasks, boolean compressRequests)
throws LogzioParameterErrorException {

Builder logzioSenderBuilder = super.getLogzioSenderBuilder(token, type, drainTimeout,
socketTimeout, serverTimeout, tasks, compressRequests);

setCapacityInBytes(logzioSenderBuilder, defaultCapacityInBytes);
return logzioSenderBuilder;
}
Expand Down Expand Up @@ -56,27 +54,21 @@ public void checkCapacityReachedToSizeBelowCapacity() throws LogzioParameterErro
String loggerName = "checkCrossCapacityInBytesName";
int drainTimeout = 2;
int successfulLogs = 3;

String message = "Log before drop - " + random(5);
JsonObject log = createJsonMessage(loggerName, message);

int logSize = log.toString().getBytes(StandardCharsets.UTF_8).length;
ScheduledExecutorService tasks = Executors.newScheduledThreadPool(3);

Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000,
10 * 1000, tasks, false);
setCapacityInBytes(testSenderBuilder, logSize * successfulLogs);

LogzioSender testSender = createLogzioSender(testSenderBuilder);

sleepSeconds(drainTimeout - 1);
for (int i = 0; i <= successfulLogs; i++) {
testSender.send(log);
}

sleepSeconds(2 * drainTimeout);
mockListener.assertNumberOfReceivedMsgs(successfulLogs);

sleepSeconds(2 * drainTimeout);
testSender.send(log);
sleepSeconds(2 * drainTimeout);
Expand All @@ -91,33 +83,37 @@ public void checkLogMessageCountLimitWithCapacityInBytes() throws LogzioParamete
String loggerName = "checkLogMessageCountLimitOnly";
int drainTimeout = 2;
int successfulLogs = 3;

String message = "Log before drop - " + random(5);
JsonObject log = createJsonMessage(loggerName, message);

ScheduledExecutorService tasks = Executors.newScheduledThreadPool(3);

Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000,
10 * 1000, tasks, false);
setLogsCountLimit(testSenderBuilder, successfulLogs);

LogzioSender testSender = createLogzioSender(testSenderBuilder);

sleepSeconds(drainTimeout - 1);
for (int i = 0; i <= successfulLogs; i++) {
testSender.send(log);
}

sleepSeconds(2 * drainTimeout);
mockListener.assertNumberOfReceivedMsgs(successfulLogs);

sleepSeconds(2 * drainTimeout);
testSender.send(log);
sleepSeconds(2 * drainTimeout);
mockListener.assertNumberOfReceivedMsgs(successfulLogs + 1);
tasks.shutdownNow();
}


@Test
public void testFilesCleanedFromQueue() {
Logger logger = LoggerFactory.getLogger(LogzioSenderTest.class);
LogzioTestStatusReporter logy = new LogzioTestStatusReporter(logger);
InMemoryQueue inMemoryQueue = LogzioSender.builder().withInMemoryQueue().setReporter(logy).build();
JsonObject testMessage = createJsonMessage("testFilesDeleted", "testMessage");
inMemoryQueue.enqueue(testMessage.toString().getBytes(StandardCharsets.UTF_8));
assertTrue(!inMemoryQueue.isEmpty());
inMemoryQueue.clear();
assertTrue(inMemoryQueue.isEmpty());
}
}

Loading

0 comments on commit ad34458

Please sign in to comment.