Skip to content

Commit

Permalink
Use monotonic clocks for TSpace.
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshilliard committed May 27, 2021
1 parent 6c05df8 commit d9bc510
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 53 deletions.
62 changes: 33 additions & 29 deletions jpos/src/main/java/org/jpos/space/TSpace.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

package org.jpos.space;
import org.jpos.util.Loggeable;

import java.io.PrintStream;
import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;

Expand All @@ -41,7 +41,7 @@ public class TSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable {
private static final long NRD_RESOLUTION = 500L;
private static final int MAX_ENTRIES_IN_DUMP = 1000;
private final Set[] expirables;
private long lastLongGC = Instant.now().toEpochMilli();
private Duration lastLongGC = Duration.ofNanos(System.nanoTime());

public TSpace () {
super();
Expand Down Expand Up @@ -70,7 +70,7 @@ public void out (K key, V value, long timeout) {
throw new NullPointerException ("key=" + key + ", value=" + value);
Object v = value;
if (timeout > 0) {
v = new Expirable (value, Instant.now().toEpochMilli() + timeout);
v = new Expirable (value, Duration.ofNanos(System.nanoTime()).plus(Duration.ofMillis(timeout)));
}
synchronized (this) {
List l = getList(key);
Expand Down Expand Up @@ -113,13 +113,14 @@ public synchronized V in (Object key) {
@Override
public synchronized V in (Object key, long timeout) {
Object obj;
Instant now = Instant.now();
long duration;
Duration to = Duration.ofMillis(timeout);
Duration now = Duration.ofNanos(System.nanoTime());
Duration duration;
while ((obj = inp (key)) == null &&
(duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
to.compareTo(duration = Duration.ofNanos(System.nanoTime()).minus(now)) > 0)
{
try {
this.wait (timeout - duration);
this.wait (Math.max(to.minus(duration).toMillis(), 1L));
} catch (InterruptedException e) { }
}
return (V) obj;
Expand All @@ -139,13 +140,14 @@ public synchronized V rd (Object key) {
@Override
public synchronized V rd (Object key, long timeout) {
Object obj;
Instant now = Instant.now();
long duration;
Duration to = Duration.ofMillis(timeout);
Duration now = Duration.ofNanos(System.nanoTime());
Duration duration;
while ((obj = rdp (key)) == null &&
(duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
to.compareTo(duration = Duration.ofNanos(System.nanoTime()).minus(now)) > 0)
{
try {
this.wait (timeout - duration);
this.wait (Math.max(to.minus(duration).toMillis(), 1L));
} catch (InterruptedException e) { }
}
return (V) obj;
Expand All @@ -163,13 +165,14 @@ public synchronized void nrd (Object key) {
@Override
public synchronized V nrd (Object key, long timeout) {
Object obj;
Instant now = Instant.now();
long duration;
Duration to = Duration.ofMillis(timeout);
Duration now = Duration.ofNanos(System.nanoTime());
Duration duration;
while ((obj = rdp (key)) != null &&
(duration = Duration.between(now, Instant.now()).toMillis()) < timeout)
to.compareTo(duration = Duration.ofNanos(System.nanoTime()).minus(now)) > 0)
{
try {
this.wait (Math.min(NRD_RESOLUTION, timeout - duration));
this.wait (Math.min(NRD_RESOLUTION, Math.max(to.minus(duration).toMillis(), 1L)));
} catch (InterruptedException ignored) { }
}
return (V) obj;
Expand All @@ -186,9 +189,9 @@ public void run () {

public void gc () {
gc(0);
if (Instant.now().toEpochMilli() - lastLongGC > GCLONG) {
if (Duration.ofMillis(GCLONG).compareTo(Duration.ofNanos(System.nanoTime()).minus(lastLongGC)) > 0) {
gc(1);
lastLongGC = Instant.now().toEpochMilli();
lastLongGC = Duration.ofNanos(System.nanoTime());
}
}

Expand Down Expand Up @@ -336,7 +339,7 @@ public void push (K key, V value, long timeout) {
throw new NullPointerException ("key=" + key + ", value=" + value);
Object v = value;
if (timeout > 0) {
v = new Expirable (value, Instant.now().toEpochMilli() + timeout);
v = new Expirable (value, Duration.ofNanos(System.nanoTime()).plus(Duration.ofMillis(timeout)));
}
synchronized (this) {
List l = getList(key);
Expand Down Expand Up @@ -373,7 +376,7 @@ public void put (K key, V value, long timeout) {
throw new NullPointerException ("key=" + key + ", value=" + value);
Object v = value;
if (timeout > 0) {
v = new Expirable (value, Instant.now().toEpochMilli() + timeout);
v = new Expirable (value, Duration.ofNanos(System.nanoTime()).plus(Duration.ofMillis(timeout)));
}
synchronized (this) {
List l = new LinkedList();
Expand All @@ -399,14 +402,15 @@ public boolean existAny (K[] keys) {

@Override
public boolean existAny (K[] keys, long timeout) {
Instant now = Instant.now();
long duration;
while ((duration = Duration.between(now, Instant.now()).toMillis()) < timeout) {
Duration to = Duration.ofMillis(timeout);
Duration now = Duration.ofNanos(System.nanoTime());
Duration duration;
while (to.compareTo(duration = Duration.ofNanos(System.nanoTime()).minus(now)) > 0) {
if (existAny (keys))
return true;
synchronized (this) {
try {
wait (timeout - duration);
wait (Math.max(to.minus(duration).toMillis(), 1L));
} catch (InterruptedException e) { }
}
}
Expand Down Expand Up @@ -520,16 +524,16 @@ static class Expirable implements Comparable, Serializable {
static final long serialVersionUID = 0xA7F22BF5;

Object value;
long expires;
Duration expires;

public Expirable (Object value, long expires) {
public Expirable (Object value, Duration expires) {
super();
this.value = value;
this.expires = expires;
}

public boolean isExpired () {
return expires < Instant.now().toEpochMilli();
return expires.compareTo(Duration.ofNanos(System.nanoTime())) < 0;
}

@Override
Expand All @@ -547,10 +551,10 @@ public Object getValue() {
@Override
public int compareTo (Object obj) {
Expirable other = (Expirable) obj;
long otherExpires = other.expires;
if (otherExpires == expires)
Duration otherExpires = other.expires;
if (otherExpires.equals(expires))
return 0;
else if (expires < otherExpires)
else if (expires.compareTo(otherExpires) < 0)
return -1;
else
return 1;
Expand Down
5 changes: 2 additions & 3 deletions jpos/src/test/java/org/jpos/space/TSpacePerformanceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.jpos.space;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -184,9 +183,9 @@ public void testDeadLockWithNotify() throws Throwable {
for (int i=0; i<size; i++)
es.execute(new WriteSpaceWithNotifyTask("WriteTask2-"+i,sp2,sp1));

Instant stamp = Instant.now();
Duration stamp = Duration.ofNanos(System.nanoTime());
while (((ThreadPoolExecutor)es).getActiveCount() > 0) {
if (Duration.between(stamp, Instant.now()).toMillis() < 10000){
if (Duration.ofNanos(System.nanoTime()).minus(stamp).toMillis() < 10000){
ISOUtil.sleep(100);
continue;
}
Expand Down
29 changes: 15 additions & 14 deletions jpos/src/test/java/org/jpos/space/TSpaceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.time.Duration;
import java.util.AbstractSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -62,27 +63,27 @@ public void testDump() throws Throwable {

@Test
public void testExpirableCompareTo() throws Throwable {
int result = new TSpace.Expirable(Integer.valueOf(0), 1L).compareTo(new TSpace.Expirable(new Object(), 0L));
int result = new TSpace.Expirable(Integer.valueOf(0), Duration.ofMillis(1L)).compareTo(new TSpace.Expirable(new Object(), Duration.ofMillis(0L)));
assertEquals(1, result, "result");
}

@Test
public void testExpirableCompareTo1() throws Throwable {
TSpace.Expirable obj = new TSpace.Expirable(new Object(), 0L);
int result = new TSpace.Expirable(new Object(), 0L).compareTo(obj);
TSpace.Expirable obj = new TSpace.Expirable(new Object(), Duration.ofMillis(0L));
int result = new TSpace.Expirable(new Object(), Duration.ofMillis(0L)).compareTo(obj);
assertEquals(0, result, "result");
}

@Test
public void testExpirableCompareTo2() throws Throwable {
int result = new TSpace.Expirable(null, 0L).compareTo(new TSpace.Expirable(new Object(), 1L));
int result = new TSpace.Expirable(null, Duration.ofMillis(0L)).compareTo(new TSpace.Expirable(new Object(), Duration.ofMillis(1L)));
assertEquals(-1, result, "result");
}

@Test
public void testExpirableCompareToThrowsNullPointerException() throws Throwable {
try {
new TSpace.Expirable(new Object(), 100L).compareTo(null);
new TSpace.Expirable(new Object(), Duration.ofMillis(100L)).compareTo(null);
fail("Expected NullPointerException to be thrown");
} catch (NullPointerException ex) {
if (isJavaVersionAtMost(JAVA_14)) {
Expand All @@ -96,51 +97,51 @@ public void testExpirableCompareToThrowsNullPointerException() throws Throwable
@Test
public void testExpirableConstructor() throws Throwable {
Object value = new Object();
TSpace.Expirable expirable = new TSpace.Expirable(value, 100L);
assertEquals(100L, expirable.expires, "expirable.expires");
TSpace.Expirable expirable = new TSpace.Expirable(value, Duration.ofMillis(100L));
assertEquals(Duration.ofMillis(100L), expirable.expires, "expirable.expires");
assertSame(value, expirable.value, "expirable.value");
}

@Test
public void testExpirableGetValue() throws Throwable {
String result = (String) new TSpace.Expirable("", 9184833384926L).getValue();
String result = (String) new TSpace.Expirable("", Duration.ofNanos(System.nanoTime()).plus(Duration.ofMillis(9184833384926L))).getValue();
assertEquals("", result, "result");
}

@Test
public void testExpirableGetValue1() throws Throwable {
Object result = new TSpace.Expirable(null, 9184833384926L).getValue();
Object result = new TSpace.Expirable(null, Duration.ofMillis(9184833384926L)).getValue();
assertNull(result, "result");
}

@Test
public void testExpirableGetValue2() throws Throwable {
Object result = new TSpace.Expirable(new Object(), 100L).getValue();
Object result = new TSpace.Expirable(new Object(), Duration.ofNanos(System.nanoTime()).minus(Duration.ofMillis(100L))).getValue();
assertNull(result, "result");
}

@Test
public void testExpirableIsExpired() throws Throwable {
boolean result = new TSpace.Expirable("", 9184833384926L).isExpired();
boolean result = new TSpace.Expirable("", Duration.ofNanos(System.nanoTime()).plus(Duration.ofMillis(9184833384926L))).isExpired();
assertFalse(result, "result");
}

@Test
public void testExpirableIsExpired1() throws Throwable {
boolean result = new TSpace.Expirable(new Object(), 100L).isExpired();
boolean result = new TSpace.Expirable(new Object(), Duration.ofNanos(System.nanoTime()).minus(Duration.ofMillis(100L))).isExpired();
assertTrue(result, "result");
}

@Test
public void testExpirableToString() throws Throwable {
new TSpace.Expirable(";\"i", 100L).toString();
new TSpace.Expirable(";\"i", Duration.ofMillis(100L)).toString();
assertTrue(true, "Test completed without Exception");
}

@Test
public void testExpirableToStringThrowsNullPointerException() throws Throwable {
try {
new TSpace.Expirable(null, 100L).toString();
new TSpace.Expirable(null, Duration.ofMillis(100L)).toString();
fail("Expected NullPointerException to be thrown");
} catch (NullPointerException ex) {
if (isJavaVersionAtMost(JAVA_14)) {
Expand Down
13 changes: 6 additions & 7 deletions jpos/src/test/java/org/jpos/space/TSpaceTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.jpos.space;

import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;

Expand Down Expand Up @@ -289,28 +288,28 @@ public void run() {
sp.out("KA", Boolean.TRUE);
}
}.start();
Instant now = Instant.now();
Duration now = Duration.ofNanos(System.nanoTime());
assertTrue(sp.existAny(new String[] { "KA", "KB" }, 2000L), "existAnyWithTimeout ([KA,KB], delay)");
long elapsed = Duration.between(now, Instant.now()).toMillis();
long elapsed = Duration.ofNanos(System.nanoTime()).minus(now).toMillis();
assertTrue(elapsed > 900L, "delay was > 1000");
}

@Test
public void testNRD() {
Instant now = Instant.now();
Duration now = Duration.ofNanos(System.nanoTime());
sp.out("NRD", "NRDTEST", 1000L);
sp.nrd("NRD");
long elapsed = Duration.between(now, Instant.now()).toMillis();
long elapsed = Duration.ofNanos(System.nanoTime()).minus(now).toMillis();
assertTrue(elapsed >= 1000L, "Invalid elapsed time " + elapsed);
}
@Test
public void testNRDWithDelay() {
Instant now = Instant.now();
Duration now = Duration.ofNanos(System.nanoTime());
sp.out("NRD", "NRDTEST", 1000L);
Object obj = sp.nrd("NRD", 500L);
assertNotNull(obj, "Object should not be null");
obj = sp.nrd("NRD", 5000L);
long elapsed = Duration.between(now, Instant.now()).toMillis();
long elapsed = Duration.ofNanos(System.nanoTime()).minus(now).toMillis();
assertTrue(elapsed >= 1000L && elapsed <= 2000L, "Invalid elapsed time " + elapsed);
assertNull(obj, "Object should be null");
}
Expand Down

0 comments on commit d9bc510

Please sign in to comment.