@gresrun: Is #90 really fixed? I have seen a similar exception in my code, and I have written a small, admittedly ugly demonstration test showing that this exception can still be thrown.
The test class is below. The code is not pretty, but it reproduces this exception:
Exception in thread "Worker-2 Jesque-2.1.1: STOPPING" java.lang.ClassCastException: [B cannot be cast to java.lang.Long
    at redis.clients.jedis.Connection.getIntegerReply(Connection.java:222)
    at redis.clients.jedis.Jedis.srem(Jedis.java:1091)
    at net.greghaines.jesque.worker.WorkerImpl.run(WorkerImpl.java:198)
    at java.lang.Thread.run(Thread.java:745)
Please note that the timeout is set to 20. If I increase the timeout to e.g. 5000 (the default), the ClassCastException is not thrown (or perhaps the test would simply have to run longer than before).
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.ConfigBuilder;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.client.ClientPoolImpl;
import net.greghaines.jesque.worker.JobFactory;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerEvent;
import net.greghaines.jesque.worker.WorkerImplFactory;
import net.greghaines.jesque.worker.WorkerListener;
import net.greghaines.jesque.worker.WorkerPool;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPool;

public class JesqueWorkerPoolTest {

    private static final Logger LOG = LoggerFactory.getLogger(JesqueWorkerPoolTest.class);
    private static final Random RANDOM = new Random();
    private static final Object[] args = new Object[] {};
    private static final Map<String, ? extends Object> vars = new HashMap<String, Object>();
    private static final String className = JesqueWorkerPoolTest.class.getName();
    private static final String Q_NAME = "JustTESTQ";
    private static final List<String> queues = new ArrayList<String>();

    static {
        queues.add(Q_NAME);
    }

    // Timestamp of the most recent job execution; written by worker threads.
    private static volatile long lastRun = System.currentTimeMillis();
    // Number of WORKER_STOP events seen so far.
    private static final AtomicLong workerStopCounter = new AtomicLong();

    public static void main(String[] mainargs) {
        final AtomicLong jobCounter = new AtomicLong();
        Config jesqueConfig = getConfig();

        // Every job materializes to a trivial Runnable that only records that it ran.
        JobFactory jobFactory = new JobFactory() {
            @Override
            public Object materializeJob(Job job) throws Exception {
                return new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("Run JOB:" + jobCounter.getAndIncrement());
                        lastRun = System.currentTimeMillis();
                    }
                };
            }
        };

        ClientPoolImpl jesqueClientPool = new ClientPoolImpl(jesqueConfig, new JedisPool(jesqueConfig.getHost()));
        WorkerImplFactory workerFactory = new WorkerImplFactory(jesqueConfig, queues, jobFactory);
        WorkerPool workerPool = new WorkerPool(workerFactory, 4);

        // Log every worker event and count how many workers have stopped.
        workerPool.getWorkerEventEmitter().addListener(new WorkerListener() {
            @Override
            public void onEvent(
                    WorkerEvent event,
                    Worker worker,
                    String queue,
                    Job job,
                    Object runner,
                    Object result,
                    Throwable t) {
                ZonedDateTime lastWorkerProblemEventTime = ZonedDateTime.now();
                String lastWorkerProblemMsg = String.format(
                        "Event '%s' @ %s:%s@%s. JobArgs: %s; JobVars:%s. EventTime:%s",
                        event == null ? "<EventIsNull>" : event.toString(),
                        worker == null ? "<WorkerIsNull>" : worker.getName(),
                        queue,
                        runner == null ? "<RunnerIsNull>" : runner.getClass().getSimpleName(),
                        job != null ? job.getArgs() : "null",
                        job != null ? job.getVars() : "null",
                        lastWorkerProblemEventTime.toString());
                if (t == null) {
                    LOG.info(lastWorkerProblemMsg);
                } else {
                    LOG.warn(lastWorkerProblemMsg, t);
                }
                // Identity comparison is null-safe, unlike event.compareTo(...).
                if (event == WorkerEvent.WORKER_STOP) {
                    workerStopCounter.incrementAndGet();
                }
            }
        });

        long startWorkerPool = System.currentTimeMillis();
        workerPool.run();

        // Keep enqueueing jobs until all 4 workers have stopped
        // or no job has run for 100 seconds.
        while (workerStopCounter.get() < 4 && 100000 > (System.currentTimeMillis() - lastRun)) {
            jesqueClientPool.enqueue(Q_NAME, new Job(className, args, vars));
            LOG.info("workerStopCounter:" + workerStopCounter.get());
            LOG.info("lastRun:" + lastRun);
            LOG.info("System.currentTimeMillis() - lastRun:" + (System.currentTimeMillis() - lastRun));
            try {
                Thread.sleep(RANDOM.nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long endWorkerPool = System.currentTimeMillis();
        LOG.info("jobCounter:" + jobCounter.get());
        LOG.info("workerStopCounter:" + workerStopCounter.get());
        long workerDurationMS = endWorkerPool - startWorkerPool;
        LOG.info("WorkerPoolWorkTime:" + new Duration(workerDurationMS));
        LOG.info("workerPool.isShutdown():" + workerPool.isShutdown());
        workerPool.end(true);
    }

    private static Config getConfig() {
        ConfigBuilder configBuilder = new ConfigBuilder();
        configBuilder.withHost("myredishost");
        // A very short timeout makes the ClassCastException appear quickly.
        configBuilder.withTimeout(20);
        configBuilder.withPort(6379);
        return configBuilder.build();
    }
}

Perhaps the problem of workers stopping because of certain known exceptions could be mitigated (as a workaround) if the pool spawned a new worker whenever a WorkerEvent.WORKER_STOP event arrives with this type of exception? As it stands, I have to monitor the WorkerPool and start my own workers, or I end up stuck because all workers are down.
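
To make the suggestion more concrete, here is a rough sketch of the respawn idea using only plain Java threads. It is not Jesque code: `RespawningPool`, `maxRespawns`, and `runDemo` are names I made up for illustration, and the failing workers are simulated with an `IllegalStateException`. In Jesque the same decision would presumably be made from a listener on `WorkerEvent.WORKER_STOP`, but I have not tried that.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical supervisor that replaces workers which die from an exception. */
class RespawningPool {
    private final Supplier<Runnable> workerFactory;
    private final int maxRespawns; // upper bound, so a permanent fault cannot respawn forever
    private final AtomicInteger respawns = new AtomicInteger();

    RespawningPool(Supplier<Runnable> workerFactory, int maxRespawns) {
        this.workerFactory = workerFactory;
        this.maxRespawns = maxRespawns;
    }

    /** Starts the initial set of workers, one thread each. */
    void start(int size) {
        for (int i = 0; i < size; i++) {
            spawn();
        }
    }

    int respawnCount() {
        return respawns.get();
    }

    private void spawn() {
        Runnable worker = workerFactory.get();
        new Thread(() -> {
            try {
                worker.run();
            } catch (RuntimeException e) {
                // The worker died: start a replacement while the budget lasts.
                if (tryConsumeRespawn()) {
                    spawn();
                }
            }
        }).start();
    }

    /** Atomically takes one unit from the respawn budget, if any is left. */
    private boolean tryConsumeRespawn() {
        while (true) {
            int used = respawns.get();
            if (used >= maxRespawns) {
                return false;
            }
            if (respawns.compareAndSet(used, used + 1)) {
                return true;
            }
        }
    }

    /** Demo: the first two workers fail, their replacements run normally. */
    static int runDemo() {
        AtomicInteger created = new AtomicInteger();
        CountDownLatch healthy = new CountDownLatch(2);
        RespawningPool pool = new RespawningPool(() -> {
            int id = created.incrementAndGet();
            return () -> {
                if (id <= 2) {
                    throw new IllegalStateException("simulated worker failure " + id);
                }
                healthy.countDown();
            };
        }, 10);
        pool.start(2);
        try {
            if (!healthy.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("replacement workers did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return pool.respawnCount();
    }

    public static void main(String[] args) {
        System.out.println("respawned workers: " + runDemo());
    }
}
```

The respawn budget is there on purpose: if the underlying fault is permanent (e.g. Redis is unreachable), unbounded respawning would only produce a tight loop of dying workers.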
I'm using Jesque v2.1.1.
Some information about my Redis DB: