kiq().with_labels(delay=X) in documents not a valid invocation? #279

Open
pahrohfit opened this issue Jan 25, 2024 · 5 comments
Comments

@pahrohfit

The documentation for Kicker shows .with_labels(delay=1):

async def main():
    # This task was initially assigned to broker,
    # but this time it is going to be sent using
    # the second broker with additional label `delay=1`.
    task = await my_async_task.kicker().with_broker(second_broker).with_labels(delay=1).kiq()
    print(await task.get_result())

But this doesn't appear to be a valid invocation of taskiq, just an example with an arbitrary label? With delay=600, the Worker executes this code immediately rather than after the delay.

Is the intention to use the Scheduler to achieve this functionality? If so, the docs should probably replace delay with a more generic label, since delay seems to be used only in cli.scheduler.run.delayed_send(). Better yet, maybe delay should become a standard part of the kiq() invocation. That would remove the dependency on a single running Scheduler for lightweight use cases that need nothing crontab-based, and it would be a cleaner resolution to #187.

@reuben-dutton

I can't speak to the intent with regards to Scheduler, but the delay label has a use in the taskiq-aio-pika plugin library here, which is only relevant if you intend on using RabbitMQ as a message broker.

@pahrohfit
Author

> I can't speak to the intent with regards to Scheduler, but the delay label has a use in the taskiq-aio-pika plugin library here, which is only relevant if you intend on using RabbitMQ as a message broker.

Ahhhh ... just read through it and see that it's expected to be implemented at the broker layer.

@s3rius
Member

s3rius commented Jan 30, 2024

If you want delayed tasks, I'd suggest using the scheduler and scheduling the task by time. The scheduler is compatible with any broker, but it has to run as a separate service.

Here's the discussion on how you can use it: https://github.com/orgs/taskiq-python/discussions/275 I will update the docs later.

@Arutemu64

Do I understand correctly that the scheduler polls sources once a minute, so if I schedule a new task with a delay of a few seconds using RedisScheduleSource, it can take up to a minute for the scheduler to send it to workers? That doesn't sound very good, I hope I'm wrong.

@s3rius
Member

s3rius commented Feb 12, 2024

No, you're correct. It can take up to a minute for the default scheduler to send a task. If you want a delay of a few seconds, I'd suggest something like this:

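import asyncio
from typing import Any, Coroutine


async def _sub_delay(seconds: float, coro: Coroutine[Any, Any, Any]) -> Any:
    # Wait on the client side, then await the coroutine (e.g. my_task.kiq(...)).
    await asyncio.sleep(seconds)
    return await coro


def delayed_await(seconds: float, coro: Coroutine[Any, Any, Any]) -> "asyncio.Task[Any]":
    # Must be called while an event loop is running. Keep a reference to the
    # returned task, since the loop itself holds only weak references to tasks.
    loop = asyncio.get_running_loop()
    return loop.create_task(_sub_delay(seconds, coro))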

This lets you add a small delay on the client side before sending a task, without awaiting it. Be aware that this technique is only suitable for small delays: the pending send exists only in the client process and is lost if that process exits first.

The usage is like this:

delayed_await(3, my_task.kiq("arg1", 2))
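For anyone who wants to try the helper without a broker, here is a minimal self-contained sketch. It assumes a stand-in coroutine, `fake_kiq` (hypothetical, used in place of `my_task.kiq`), and a module-level `_pending` set (also an addition, not part of the snippet above) that keeps a strong reference to each task until it finishes.

```python
import asyncio
import time
from typing import Any, Coroutine, List, Set, Tuple

# Records (name, seconds elapsed since start) for every "sent" task.
sent: List[Tuple[str, float]] = []
# Strong references to in-flight tasks, so they are not garbage collected.
_pending: Set["asyncio.Task[Any]"] = set()


async def fake_kiq(name: str, started: float) -> str:
    # Hypothetical stand-in for `my_task.kiq(...)`: just notes when it ran.
    sent.append((name, time.monotonic() - started))
    return name


async def _sub_delay(seconds: float, coro: Coroutine[Any, Any, Any]) -> Any:
    await asyncio.sleep(seconds)
    return await coro


def delayed_await(seconds: float, coro: Coroutine[Any, Any, Any]) -> "asyncio.Task[Any]":
    task = asyncio.get_running_loop().create_task(_sub_delay(seconds, coro))
    _pending.add(task)
    task.add_done_callback(_pending.discard)
    return task


async def main() -> str:
    started = time.monotonic()
    task = delayed_await(0.2, fake_kiq("arg1", started))
    # The caller is not blocked: nothing has been sent at this point.
    assert sent == []
    # Awaiting the task here is only for the demo; real callers can move on.
    return await task


result = asyncio.run(main())
```

Note that the delayed send only happens if the event loop is still running when the delay expires, which is another reason to keep the delays short.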
