Skip to content

Commit

Permalink
Tuning the Client Ids
Browse files Browse the repository at this point in the history
  • Loading branch information
yatharthranjan committed Nov 7, 2017
1 parent 6add22c commit 69b652c
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 29 deletions.
1 change: 0 additions & 1 deletion .idea/.name

This file was deleted.

23 changes: 1 addition & 22 deletions .idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions src/main/java/org/radarcns/monitor/AbstractKafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ public AbstractKafkaMonitor(RadarPropertyHandler radar, Collection<String> topic

properties = new Properties();
String deserializer = KafkaAvroDeserializer.class.getName();
String monitorClientId = getClass().getName() + "-" + clientId;
properties.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
properties.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
properties.setProperty(GROUP_ID_CONFIG, groupId);
properties.setProperty(CLIENT_ID_CONFIG, clientId);
properties.setProperty(CLIENT_ID_CONFIG, monitorClientId);
properties.setProperty(ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.setProperty(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1001");
properties.setProperty(SESSION_TIMEOUT_MS_CONFIG, "15101");
Expand All @@ -105,7 +106,7 @@ public AbstractKafkaMonitor(RadarPropertyHandler radar, Collection<String> topic
this.topics = topics;
this.pollTimeout = new AtomicLong(Long.MAX_VALUE);
this.done = false;
this.clientId = clientId;
this.clientId = monitorClientId;
this.groupId = groupId;

PersistentStateStore localStateStore;
Expand All @@ -121,7 +122,7 @@ public AbstractKafkaMonitor(RadarPropertyHandler radar, Collection<String> topic
S localState = stateDefault;
if (stateStore != null && stateDefault != null) {
try {
localState = stateStore.retrieveState(groupId, clientId, stateDefault);
localState = stateStore.retrieveState(groupId, monitorClientId, stateDefault);
logger.info("Using existing {} from persistence store.",
stateDefault.getClass().getName());
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class BatteryLevelMonitor extends
*/
public BatteryLevelMonitor(RadarPropertyHandler radar, Collection<String> topics,
EmailSender sender, Status minLevel, long logInterval) {
super(radar, topics, "battery_monitors", "2", new BatteryLevelState());
super(radar, topics, "battery_monitors", "1", new BatteryLevelState());

Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public void createBatteryMonitor() throws Exception {
assertEquals(BatteryLevelMonitor.class, monitor.getClass());
BatteryLevelMonitor batteryMonitor = (BatteryLevelMonitor) monitor;
batteryMonitor.evaluateRecords(new ConsumerRecords<>(Collections.emptyMap()));
assertTrue(new File(config.getPersistencePath(), "battery_monitors_2.yml").isFile());
assertTrue(new File(config.getPersistencePath(), "battery_monitors_" +
BatteryLevelMonitor.class.getName() + "-1.yml").isFile());
}

@Test(expected = IOException.class)
Expand All @@ -79,7 +80,8 @@ public void createDisconnectMonitor() throws Exception {
assertEquals(DisconnectMonitor.class, monitor.getClass());
DisconnectMonitor disconnectMonitor = (DisconnectMonitor) monitor;
disconnectMonitor.evaluateRecords(new ConsumerRecords<>(Collections.emptyMap()));
assertTrue(new File(config.getPersistencePath(), "disconnect_monitor_1.yml").isFile());
assertTrue(new File(config.getPersistencePath(), "disconnect_monitor_" +
DisconnectMonitor.class.getName() + "-1.yml").isFile());
}

@Test
Expand Down

0 comments on commit 69b652c

Please sign in to comment.