
Commit 05cc92d
Merge pull request #26 from CDOT-CV/Feature/Add-Unit-Tests
Add Unit Tests
drewjj authored Jul 16, 2024
2 parents 089318f + 44979a5 commit 05cc92d
Showing 14 changed files with 610 additions and 67 deletions.
15 changes: 15 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,15 @@
{
"java.test.config": {
"env": {
"TEST_VARIABLE": "testValue",
"TEST_VARIABLE_EMPTY": "",
"AWS_ACCESS_KEY_ID": "testAccessKey",
"AWS_SECRET_ACCESS_KEY": "testSecretKey",
"AWS_EXPIRATION": "2020-01-01 00:00:00",
"AWS_SESSION_TOKEN": "testSessionToken",
"API_ENDPOINT": "testApiEndpoint",
"CONFLUENT_KEY": "testConfluentKey",
"CONFLUENT_SECRET": "testConfluentSecret",
}
},
}
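
These stub values let the VS Code Java test runner supply the same environment that the Surefire configuration added to pom.xml below provides, so unit tests never need real AWS or Confluent credentials. As a minimal sketch (the class name and assertions are illustrative, not part of this commit, and it assumes getEnvironmentVariable falls back to the supplied default when a variable is unset or empty), a JUnit 5 test could lean on them like this:

package us.dot.its.jpo.ode.aws.depositor;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

// Illustrative test, not from this commit. It must live in the same package as
// AwsDepositor because getEnvironmentVariable is package-private.
class EnvironmentVariableSketchTest {

    @Test
    void returnsValueWhenVariableIsSet() {
        // TEST_VARIABLE is set to "testValue" by the test-runner configuration above
        assertEquals("testValue", AwsDepositor.getEnvironmentVariable("TEST_VARIABLE", "fallback"));
    }

    @Test
    void fallsBackWhenVariableIsEmpty() {
        // TEST_VARIABLE_EMPTY is "", so the default is expected
        // (assumption about the fallback behavior, based on the method's intent)
        assertEquals("fallback", AwsDepositor.getEnvironmentVariable("TEST_VARIABLE_EMPTY", "fallback"));
    }
}
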
57 changes: 56 additions & 1 deletion pom.xml
@@ -10,16 +10,41 @@
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<jmockit.version>1.49</jmockit.version>
<!-- Allow override of github organization when publishing artifacts to github -->
<github_organization>usdot-jpo-ode</github_organization>
<argLine>
-javaagent:${user.home}/.m2/repository/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar
</argLine>
<jacoco.version>0.8.11</jacoco.version>
</properties>
<dependencies>
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
<version>${jmockit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.9.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.3.3</version>
<scope>test</scope>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
@@ -101,6 +126,36 @@
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.5</version>
<configuration>
<argLine>-javaagent:${user.home}/.m2/repository/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar -Xshare:off</argLine>
<systemPropertyVariables>
<loader.path>${loader.path}</loader.path>
<buildDirectory>${project.build.directory}</buildDirectory>
</systemPropertyVariables>
<environmentVariables>
<TEST_VARIABLE>testValue</TEST_VARIABLE>
<TEST_VARIABLE_EMPTY></TEST_VARIABLE_EMPTY>
<AWS_ACCESS_KEY_ID>testAccessKey</AWS_ACCESS_KEY_ID>
<AWS_SECRET_ACCESS_KEY>testSecretKey</AWS_SECRET_ACCESS_KEY>
<AWS_SESSION_TOKEN>testSessionToken</AWS_SESSION_TOKEN>
<AWS_EXPIRATION>2020-01-01 00:00:00</AWS_EXPIRATION>
<API_ENDPOINT>testApiEndpoint</API_ENDPOINT>
<CONFLUENT_KEY>testConfluentKey</CONFLUENT_KEY>
<CONFLUENT_SECRET>testConfluentSecret</CONFLUENT_SECRET>
</environmentVariables>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.5</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
<distributionManagement>
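
Two details in this Surefire block are worth noting: the <argLine> attaches JMockit as a -javaagent (JMockit 1.42 and later will not initialize without the agent, which is why the jar path appears both in the global properties and in the plugin configuration, with -Xshare:off quieting class-data-sharing warnings when an agent is attached), and <environmentVariables> injects the same stub credentials as .vscode/settings.json so tests behave identically under Maven and the IDE. A rough sketch of the kind of fake the agent enables (illustrative only, not a test from this commit):

package us.dot.its.jpo.ode.aws.depositor;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

import mockit.Mock;
import mockit.MockUp;

// Illustrative only: a JMockit MockUp fakes the static env lookup so a test never
// depends on the real environment. This only works when the jmockit jar from the
// <argLine> above is attached as a -javaagent.
class EnvironmentFakeSketchTest {

    @Test
    void fakedLookupReturnsCannedValue() {
        new MockUp<AwsDepositor>() {
            @Mock
            String getEnvironmentVariable(String variableName, String defaultValue) {
                return "fakedValue"; // canned value regardless of the real environment
            }
        };

        assertEquals("fakedValue", AwsDepositor.getEnvironmentVariable("ANY_NAME", "default"));
    }
}

Mockito, also added above, covers the cases where a plain mock or spy is enough; the agent line exists for JMockit-style fakes like this one.
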
161 changes: 95 additions & 66 deletions src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
@@ -26,10 +26,24 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
@@ -53,19 +67,6 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AwsDepositor {
private final Logger logger = LoggerFactory.getLogger(AwsDepositor.class);
private final long CONSUMER_POLL_TIMEOUT_MS = 60000;
@@ -79,6 +80,8 @@ public class AwsDepositor {
private String keyName;
private boolean waitOpt;

private boolean runDepositor = true;

private String K_AWS_ACCESS_KEY_ID;
private String K_AWS_SECRET_ACCESS_KEY;
private String K_AWS_SESSION_TOKEN;
@@ -98,45 +101,12 @@ public class AwsDepositor {

public static void main(String[] args) throws Exception {
AwsDepositor awsDepositor = new AwsDepositor();
awsDepositor.run(args);
awsDepositor.run();
}

public void run(String[] args) throws Exception {
endpoint = getEnvironmentVariable("BOOTSTRAP_SERVER", "");
topic = getEnvironmentVariable("DEPOSIT_TOPIC", "");
group = getEnvironmentVariable("DEPOSIT_GROUP", "");
destination = getEnvironmentVariable("DESTINATION", "firehose");
if (System.getenv("WAIT") != null && System.getenv("WAIT") != "")
{ waitOpt = true; }
else
{ waitOpt = false; }

// S3 properties
bucketName = getEnvironmentVariable("DEPOSIT_BUCKET_NAME", "");
awsRegion = getEnvironmentVariable("REGION", "us-east-1");
keyName = getEnvironmentVariable("DEPOSIT_KEY_NAME", "");

K_AWS_ACCESS_KEY_ID = getEnvironmentVariable("AWS_ACCESS_KEY_ID", "AccessKeyId");
K_AWS_SECRET_ACCESS_KEY = getEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "SecretAccessKey");
K_AWS_SESSION_TOKEN = getEnvironmentVariable("AWS_SESSION_TOKEN", "SessionToken");
K_AWS_EXPIRATION = getEnvironmentVariable("AWS_EXPIRATION", "Expiration");
API_ENDPOINT = getEnvironmentVariable("API_ENDPOINT", "");
HEADER_Accept = getEnvironmentVariable("HEADER_ACCEPT", "application/json");
HEADER_X_API_KEY = getEnvironmentVariable("HEADER_X_API_KEY", "");

logger.debug("Bucket name: {}", bucketName);
logger.debug("AWS Region: {}", awsRegion);
logger.debug("Key name: {}", keyName);
logger.debug("Kafka topic: {}", topic);
logger.debug("Destination: {}", destination);
logger.debug("Wait: {}", waitOpt);
logger.debug("AWS_ACCESS_KEY_ID: {}", K_AWS_ACCESS_KEY_ID);
logger.debug("AWS_SECRET_ACCESS_KEY: {}", K_AWS_SECRET_ACCESS_KEY);
logger.debug("AWS_SESSION_TOKEN: {}", K_AWS_SESSION_TOKEN);
logger.debug("AWS_EXPIRATION: {}", K_AWS_EXPIRATION);
logger.debug("API_ENDPOINT: {}", API_ENDPOINT);
logger.debug("HEADER_Accept: {}", HEADER_Accept);
logger.debug("HEADER_X_API_KEY: {}", HEADER_X_API_KEY);
public void run() throws Exception {
// Pull in environment variables
depositorSetup();

if (API_ENDPOINT.length() > 0) {
JSONObject profile = generateAWSProfile();
@@ -187,16 +157,16 @@ public void run(String[] args) throws Exception {
}


while (true) {
KafkaConsumer<String, String> stringConsumer = new KafkaConsumer<String, String>(props);
while (getRunDepositor()) {
KafkaConsumer<String, String> stringConsumer = getKafkaConsumer(props);

logger.debug("Subscribing to topic " + topic);
stringConsumer.subscribe(Arrays.asList(topic));

try {
boolean gotMessages = false;

while (true) {
while (getRunDepositor()) {
ConsumerRecords<String, String> records = stringConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS));
if (records != null && !records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
@@ -234,7 +204,7 @@ public void run(String[] args) throws Exception {
}
}

private static void addConfluentProperties(Properties props) {
static void addConfluentProperties(Properties props) {
props.put("ssl.endpoint.identification.algorithm", "https");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
@@ -250,7 +220,7 @@ private static void addConfluentProperties(Properties props) {
}
}

private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerRecord<String, String> record)
void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerRecord<String, String> record)
throws InterruptedException, ExecutionException, IOException {
try {
// IMPORTANT!!!
@@ -261,9 +231,8 @@ private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerReco
ByteBuffer data = convertStringToByteBuffer(msg, Charset.defaultCharset());

// Check the expiration time for the profile credentials
LocalDateTime current_datetime = LocalDateTime.now();
LocalDateTime expiration_datetime = LocalDateTime.parse(AWS_EXPIRATION,
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
LocalDateTime current_datetime = getLocalDateTime();
LocalDateTime expiration_datetime = getExpirationDateTime();
System.out.println();
if (expiration_datetime.isBefore(current_datetime) && API_ENDPOINT.length() > 0) {
// If credential is expired, generate aws credentials
@@ -307,7 +276,7 @@ private void depositToFirehose(AmazonKinesisFirehoseAsync firehose, ConsumerReco
}
}

private void depositToS3(AmazonS3 s3, ConsumerRecord<String, String> record) throws IOException {
void depositToS3(AmazonS3 s3, ConsumerRecord<String, String> record) throws IOException {
try {
long time = System.currentTimeMillis();
String timeStamp = Long.toString(time);
@@ -346,7 +315,7 @@ private void depositToS3(AmazonS3 s3, ConsumerRecord<String, String> record) thr
}
}

private void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerRecord<String, String> record) {
void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerRecord<String, String> record) {
String recordValue = record.value();
Bucket bucket = gcsStorage.get(depositBucket);
byte[] bytes = recordValue.getBytes(Charset.defaultCharset());
@@ -362,7 +331,7 @@ private void depositToGCS(Storage gcsStorage, String depositBucket, ConsumerReco
}
}

private AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
// Default is to deposit to Kinesis/Firehose, override via .env
// variables if S3 deposit desired
logger.debug("=============================");
@@ -372,7 +341,7 @@ private AmazonKinesisFirehoseAsync buildFirehoseClient(String awsRegion) {
return AmazonKinesisFirehoseAsyncClientBuilder.standard().withRegion(awsRegion).build();
}

private AmazonS3 createS3Client(String awsRegion) {
AmazonS3 createS3Client(String awsRegion) {
logger.debug("============== ========");
logger.debug("Connecting to Amazon S3");
logger.debug("=======================");
@@ -397,7 +366,7 @@ public ByteBuffer convertStringToByteBuffer(String msg, Charset charset) {
return ByteBuffer.wrap(msg.getBytes(charset));
}

private File createSampleFile(String json) throws IOException {
File createSampleFile(String json) throws IOException {
File file = File.createTempFile("aws-java-sdk-", ".json");
file.deleteOnExit();

@@ -408,8 +377,8 @@ private File createSampleFile(String json) throws IOException {
return file;
}

private JSONObject generateAWSProfile() throws IOException {
CloseableHttpClient client = HttpClients.createDefault();
JSONObject generateAWSProfile() throws IOException {
CloseableHttpClient client = getHttpClient();
HttpPost httpPost = new HttpPost(API_ENDPOINT);
JSONObject jsonResult = new JSONObject();
String json = "{}";
@@ -435,7 +404,9 @@ private JSONObject generateAWSProfile() throws IOException {
return jsonResult;
}

private static String getEnvironmentVariable(String variableName, String defaultValue) {
static String getEnvironmentVariable(String variableName, String defaultValue) {
// get all environment variables
Map<String, String> env = System.getenv();
String value = System.getenv(variableName);
if (value == null || value.equals("")) {
System.out.println("Something went wrong retrieving the environment variable " + variableName);
@@ -445,4 +416,62 @@ private static String getEnvironmentVariable(String variableName, String default
return value;
}

CloseableHttpClient getHttpClient() {
return HttpClients.createDefault();
}

LocalDateTime getLocalDateTime() {
return LocalDateTime.now();
}

LocalDateTime getExpirationDateTime() {
return LocalDateTime.parse(K_AWS_EXPIRATION,
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}

void depositorSetup() {
endpoint = getEnvironmentVariable("BOOTSTRAP_SERVER", "");
topic = getEnvironmentVariable("DEPOSIT_TOPIC", "");
group = getEnvironmentVariable("DEPOSIT_GROUP", "");
destination = getEnvironmentVariable("DESTINATION", "firehose");
if (System.getenv("WAIT") != null && System.getenv("WAIT") != "")
{ waitOpt = true; }
else
{ waitOpt = false; }

// S3 properties
bucketName = getEnvironmentVariable("DEPOSIT_BUCKET_NAME", "");
awsRegion = getEnvironmentVariable("REGION", "us-east-1");
keyName = getEnvironmentVariable("DEPOSIT_KEY_NAME", "");

K_AWS_ACCESS_KEY_ID = getEnvironmentVariable("AWS_ACCESS_KEY_ID", "AccessKeyId");
K_AWS_SECRET_ACCESS_KEY = getEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "SecretAccessKey");
K_AWS_SESSION_TOKEN = getEnvironmentVariable("AWS_SESSION_TOKEN", "SessionToken");
K_AWS_EXPIRATION = getEnvironmentVariable("AWS_EXPIRATION", "Expiration");
API_ENDPOINT = getEnvironmentVariable("API_ENDPOINT", "");
HEADER_Accept = getEnvironmentVariable("HEADER_ACCEPT", "application/json");
HEADER_X_API_KEY = getEnvironmentVariable("HEADER_X_API_KEY", "");

logger.debug("Bucket name: {}", bucketName);
logger.debug("AWS Region: {}", awsRegion);
logger.debug("Key name: {}", keyName);
logger.debug("Kafka topic: {}", topic);
logger.debug("Destination: {}", destination);
logger.debug("Wait: {}", waitOpt);
logger.debug("AWS_ACCESS_KEY_ID: {}", K_AWS_ACCESS_KEY_ID);
logger.debug("AWS_SECRET_ACCESS_KEY: {}", K_AWS_SECRET_ACCESS_KEY);
logger.debug("AWS_SESSION_TOKEN: {}", K_AWS_SESSION_TOKEN);
logger.debug("AWS_EXPIRATION: {}", K_AWS_EXPIRATION);
logger.debug("API_ENDPOINT: {}", API_ENDPOINT);
logger.debug("HEADER_Accept: {}", HEADER_Accept);
logger.debug("HEADER_X_API_KEY: {}", HEADER_X_API_KEY);
}

boolean getRunDepositor() {
return runDepositor;
}

KafkaConsumer<String, String> getKafkaConsumer(Properties props) {
return new KafkaConsumer<>(props);
}
}
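
Taken together, the changes to this class are a refactor for testability rather than a behavior change: environment parsing moves into depositorSetup(), the consumer loops poll getRunDepositor() instead of while (true), and collaborators are obtained through new package-private seams (getKafkaConsumer, getHttpClient, getLocalDateTime, getExpirationDateTime) that a test can override. A rough sketch of how a Mockito spy might use those seams (the test class and stubbing are illustrative, not code from this commit):

package us.dot.its.jpo.ode.aws.depositor;

import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;

// Illustrative only. Because the factory methods are package-private, this class
// must sit in us.dot.its.jpo.ode.aws.depositor alongside AwsDepositor.
class DepositorSeamsSketchTest {

    @Test
    @SuppressWarnings("unchecked")
    void kafkaConsumerAndRunFlagCanBeStubbed() {
        AwsDepositor depositor = spy(new AwsDepositor());
        KafkaConsumer<String, String> fakeConsumer = mock(KafkaConsumer.class);

        // run() now asks getKafkaConsumer(props) for its consumer, so a spy can hand
        // back a mock instead of connecting to a real broker...
        doReturn(fakeConsumer).when(depositor).getKafkaConsumer(any(Properties.class));
        // ...and getRunDepositor() gates both loops, so returning false stops run() cleanly.
        doReturn(false).when(depositor).getRunDepositor();

        assertSame(fakeConsumer, depositor.getKafkaConsumer(new Properties()));
    }
}

Keeping the seams package-private rather than public limits them to the test package while leaving the public surface of the class unchanged.
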
