About running the job only one time on restart #111

Open
sohakes opened this issue Jul 11, 2016 · 3 comments

sohakes commented Jul 11, 2016

If I stop my application and restart it after some time (using the same queue names), it executes the jobs as many times as they would have run had the application never stopped. Is there a way to make each job execute only once?

From what I understood of the Lua script, it adds the frequency to the job's score when popping it, so I guess I could achieve what I want by setting all existing job scores to the current time when the application starts. But I wanted to know whether there is already a solution for this, or some other idea, because this looks hackish.

Delayed jobs, if I understand correctly, don't fit well, since I would need to re-add the job after it runs, and I guess I could lose a job if my application stopped in the middle of that process.
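
To make the behaviour described above concrete, here is a minimal sketch in plain Java with no Redis involved. It assumes the scheduling rule is as described in this comment (a job is due when its score is at or before the current time, and each pop adds the frequency to the score). The class and method names are hypothetical and are not part of Jesque.

```java
public final class CatchUpDemo {

    /**
     * Counts how many times a job runs if every pop advances its score by one
     * period (score += frequency) until the score passes the current time.
     * Assumes frequency > 0; otherwise the loop would never end.
     */
    public static int runsWhenAdding(long score, long frequency, long now) {
        int runs = 0;
        while (score <= now) {
            runs++;
            score += frequency;
        }
        return runs;
    }

    public static void main(String[] args) {
        long frequency = 60_000L;          // one run per minute
        long staleScore = 0L;              // next due time saved before shutdown
        long now = 5 * frequency + 1;      // restart a bit over five minutes later

        // Stale score: the job catches up on every period missed during downtime.
        System.out.println(runsWhenAdding(staleScore, frequency, now)); // prints 6

        // Score reset to the current time at startup: the job runs only once.
        System.out.println(runsWhenAdding(now, frequency, now));        // prints 1
    }
}
```

Under these assumptions, resetting the score at startup is what collapses the backlog into a single run.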

argvk (Contributor) commented Nov 3, 2016

There may be cases where a delay is acceptable.
IMHO, one way to do it would be to send the job to the worker along with its score, allowing the worker to either no-op it or handle it however it sees fit.
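
A rough sketch of what the worker-side check could look like, assuming the score is the job's scheduled time in epoch milliseconds and is delivered to the worker together with the job. Jesque does not pass the score to workers as far as this thread indicates, so `StaleRunFilter` and its parameters are purely hypothetical.

```java
public final class StaleRunFilter {

    private final long toleranceMillis;

    /** @param toleranceMillis how late a scheduled run may be and still execute */
    public StaleRunFilter(long toleranceMillis) {
        this.toleranceMillis = toleranceMillis;
    }

    /**
     * Returns true if the run scheduled at scheduledAtMillis is recent enough
     * to execute at nowMillis; otherwise the worker would treat it as a no-op.
     */
    public boolean shouldRun(long scheduledAtMillis, long nowMillis) {
        return nowMillis - scheduledAtMillis <= toleranceMillis;
    }
}
```

A worker could call `shouldRun(score, System.currentTimeMillis())` before doing any work, so that runs left over from downtime are skipped while the most recent one still executes.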

sohakes (Author) commented Nov 22, 2016

I ended up doing it in a weird way a long time ago. There are probably better solutions (if I recall correctly, Jesque works by adding the waitTime to each job's score until it is greater than the current time, so another solution to this problem is to simply set the score to now + waitTime instead of adding). What I did was create a method that is called when the application starts and uses a Lua script to reset all scores. Class:

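import static net.greghaines.jesque.utils.ResqueConstants.QUEUE;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import net.greghaines.jesque.utils.JesqueUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;

@Service
public class ResetQueueScores {
    private final String resetQueueScriptHash;
    private final Jedis jedis;

    /** Reads a classpath resource, dropping blank lines and trimming each line. */
    public static String readScript(final String resourceName) throws IOException {
        final StringBuilder buf = new StringBuilder();
        final Resource resource = new ClassPathResource(resourceName);
        // getInputStream() throws if the resource is missing, so no null check is needed.
        try (final InputStream inputStream = resource.getInputStream();
             final BufferedReader reader = new BufferedReader(
                     new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
            String prefix = "";
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.trim();
                if (!line.isEmpty()) {
                    buf.append(prefix).append(line);
                    prefix = "\n";
                }
            }
        }
        return buf.toString();
    }

    @Autowired
    public ResetQueueScores(Jedis jedis) {
        this.jedis = jedis;
        try {
            // Load the script once and keep its SHA1 for later EVALSHA calls.
            resetQueueScriptHash = jedis.scriptLoad(readScript("reset_scores.lua"));
        } catch (IOException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new RuntimeException("Could not load reset_scores.lua into Redis", e);
        }
    }

    /** Sets the score of every job in the given queue to now + waitTime (milliseconds). */
    public void resetQueueScore(String queue, Long waitTime) {
        final String key = JesqueUtils.createKey("resque", QUEUE, queue);
        jedis.evalsha(resetQueueScriptHash, 1, key, Long.toString(System.currentTimeMillis() + waitTime));
    }
}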

The script:

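-- KEYS[1]: the queue key
-- ARGV[1]: the new score for every member (the caller passes now + waitTime)
local queueKey = KEYS[1]
local now = ARGV[1]

-- TYPE returns a status reply, which Lua sees as a table like { ok = 'zset' }
local ok, queueType = next(redis.call('TYPE', queueKey))
if queueType == 'zset' then
    local zsetMembers = redis.call('ZRANGE', queueKey, 0, -1)
    for _, member in ipairs(zsetMembers) do
        redis.call('ZADD', queueKey, now, member)
    end
end

return nil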

Putting it here in case it helps someone.

cyao2q commented Jan 23, 2021

The reset script needs to check whether the job is a recurring job and whether its score is less than now.

// KEYS[1] is the queue key, KEYS[2] is the hash holding recurring-job frequencies.
// Two fixes: in Redis Lua a missing hash field comes back as false (not nil), and
// ZSCORE and ARGV values are strings, so they are converted before comparing.
String script =
        "local queueKey = KEYS[1]\n"
      + "local freqKey = KEYS[2]\n"
      + "local now = tonumber(ARGV[1])\n"
      + "local ok, queueType = next(redis.call('TYPE', queueKey))\n"
      + "if queueType == 'zset' then\n"
      + "\tlocal zsetMembers = redis.call('ZRANGE', queueKey, 0, -1)\n"
      + "\tfor _, member in ipairs(zsetMembers) do\n"
      + "\t\tlocal score = tonumber(redis.call('ZSCORE', queueKey, member))\n"
      + "\t\tlocal frequency = redis.call('HGET', freqKey, member)\n"
      + "\t\tif frequency and score < now then\n"
      + "\t\t\tredis.call('ZADD', queueKey, now, member)\n"
      + "\t\tend\n"
      + "\tend\n"
      + "end\n"
      + "return nil";
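
For checking the rule above without a Redis instance, the same decision can be modelled in memory. This is only a sketch under the assumption that a job counts as recurring when it has an entry in the frequency hash; `ConditionalResetModel` and its parameters are hypothetical names, not Jesque API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public final class ConditionalResetModel {

    /**
     * Returns a copy of scores in which every recurring job whose score is
     * earlier than now has been moved to now. Non-recurring jobs and jobs
     * scheduled in the future keep their original score.
     */
    public static Map<String, Long> reset(Map<String, Long> scores,
                                          Set<String> recurringJobs,
                                          long now) {
        Map<String, Long> result = new HashMap<>(scores);
        for (Map.Entry<String, Long> entry : result.entrySet()) {
            boolean recurring = recurringJobs.contains(entry.getKey());
            if (recurring && entry.getValue() < now) {
                entry.setValue(now); // overdue recurring job: reschedule to now
            }
        }
        return result;
    }
}
```

With scores `{a=5, b=5, c=50}`, recurring jobs `{a, c}` and `now = 10`, only `a` is moved: `b` is not recurring and `c` is not overdue.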
