diff --git a/src/main/java/com/kakao/connector/kafka/KafkaSinkTask.java b/src/main/java/com/kakao/connector/kafka/KafkaSinkTask.java index b02f0ec..610d060 100644 --- a/src/main/java/com/kakao/connector/kafka/KafkaSinkTask.java +++ b/src/main/java/com/kakao/connector/kafka/KafkaSinkTask.java @@ -85,38 +85,32 @@ public void start(Map props) { @Override public void put(Collection records) { - if (samplingEnabled) { - for (SinkRecord record : records) { - if (record.value() != null) { - try { - String value = record.value().toString(); - boolean samplingCondition = Math.random() < samplingPercentage; - if (samplingCondition - && isFilteringMatch(value) - ) { + for (SinkRecord record : records) { + if (record.value() != null) { + try { + String value = record.value().toString(); + if (!samplingEnabled && isFilteringMatch(value)) { + sendRecord(value); + } else if (samplingEnabled) { + if (shouldSendRecord(value)) { sendRecord(value); } - } catch (Exception e) { - log.error(e.getMessage() + " / " + connectorName, e); - } - } - } - } else { - for (SinkRecord record : records) { - if (record.value() != null) { - try { - String value = record.value().toString(); - if (isFilteringMatch(value)) { - sendRecord(value); - } - } catch (Exception e) { - log.error(e.getMessage() + " / " + connectorName, e); } + } catch (Exception e) { + logError(e); } } } } + private boolean shouldSendRecord(String value) { + return Math.random() < samplingPercentage && isFilteringMatch(value); + } + + private void logError(Exception e) { + log.error(e.getMessage() + " / " + connectorName, e); + } + private void sendRecord(String value) { ProducerRecord sendRecord = getSinkRecord(value); producer.send(sendRecord, new ProducerCallback()); @@ -143,7 +137,7 @@ private long parsingTimestamp(String value) { LocalDateTime dateTime = LocalDateTime.parse(timestampFieldValue, formatter); return Timestamp.valueOf(dateTime).getTime(); } catch (Exception e) { - log.error(e.getMessage() + " / " + connectorName, e); + logError(e); return System.currentTimeMillis(); } } @@ -151,13 +145,13 @@ private long parsingTimestamp(String value) { private String parsingMessageKey(String value) { try { StringBuilder messageKey = new StringBuilder(); - List fields = Arrays.asList(keyParsingField.split(",")); + String[] fields = keyParsingField.split(","); for (String field : fields) { messageKey.append(JsonPath.read(value, field).toString()); } return messageKey.toString(); } catch (Exception e) { - log.error(e.getMessage() + " / " + connectorName, e); + logError(e); return null; } }