Skip to content
forked from gojuukaze/YTask

YTask is an asynchronous task queue for handling distributed jobs in golang(go异步任务队列)

License

Notifications You must be signed in to change notification settings

fengqirui/YTask

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

57 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

YTask

YTask is an asynchronous task queue for handling distributed jobs in golang
golang异步任务/队列 框架

  • 中文文档 (Chinese document has more detailed instructions. If you know Chinese, read Chinese document)
  • En Doc
  • Github

install

go get github.com/gojuukaze/YTask/v2

architecture diagram

architecture_diagram

todo

  • save result
  • task retry
  • run multi group
  • more option in TaskCtl
  • support more type

doc

Quick Start

server demo

package main

import (
	"context"
	"github.com/gojuukaze/YTask/v2"
	"os"
	"os/signal"
	"syscall"
)
type User struct {
	Id   int
	Name string
}

func add(a int,b int)int {
    return a+b
}

func appendUser(user User, ids []int, names []string) []User {
	var r = make([]User, 0)
	r = append(r, user)
	for i := range ids {
		r = append(r, User{ids[i],names[i],})
	}
	return r
}

func main() {
	// For the server, you do not need to set up the poolSize
	// Server端无需设置poolSize,
	broker := ytask.Broker.NewRedisBroker("127.0.0.1", "6379", "", 0, 0)
	backend := ytask.Backend.NewRedisBackend("127.0.0.1", "6379", "", 0, 0)

	ser := ytask.Server.NewServer(
		ytask.Config.Broker(&broker),
		ytask.Config.Backend(&backend),
		ytask.Config.Debug(true),
		ytask.Config.StatusExpires(60*5),
		ytask.Config.ResultExpires(60*5),
	)

	ser.Add("group1", "add", add)
	ser.Add("group1", "append_user", appendUser)

	ser.Run("group1", 3)

	quit := make(chan os.Signal, 1)

	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
	ser.Shutdown(context.Background())

}

client demo

package main

import (
	"fmt"
	"github.com/gojuukaze/YTask/v2"
	"github.com/gojuukaze/YTask/v2/server"
	"time"
)
type User struct {
	Id   int
	Name string
}
var client server.Client


func main() {
	// For the client, you need to set up the poolSize
	// 对于client你需要设置poolSize
	broker := ytask.Broker.NewRedisBroker("127.0.0.1", "6379", "", 0, 5)
	backend := ytask.Backend.NewRedisBackend("127.0.0.1", "6379", "", 0, 5)

	ser := ytask.Server.NewServer(
		ytask.Config.Broker(&broker),
		ytask.Config.Backend(&backend),
		ytask.Config.Debug(true),
		ytask.Config.StatusExpires(60*5),
		ytask.Config.ResultExpires(60*5),
	)

	client = ser.GetClient()

	// task add
	taskId, err := client.Send("group1", "add", 123, 44)
	_ = err
	result, err := client.GetResult(taskId, 2*time.Second, 300*time.Millisecond)
	_ = err

	if result.IsSuccess() {
		sum, err := result.GetInt64(0)
        // or
        var sum2 int
        err = result.Get(0, &sum2)
		if err != nil {
			fmt.Println(err)
		}
		fmt.Println("add(123,44) =", int(sum))
	} else {
		fmt.Println("result failure")
	}
    // task append user
	taskId, _ = client.Send("group1", "append_user", User{1, "aa"}, []int{322, 11}, []string{"bb", "cc"})
	_ = err
	result, _ = client.GetResult(taskId, 2*time.Second, 300*time.Millisecond)
	var users []User
    result.Get(0, &users)
    fmt.Println(users)

}

other example

Also take a look at example directory

cd example/v2
go run server/main.go 

go run send/main.go

usage

server

  • init
import "github.com/gojuukaze/YTask/v2"

ser := ytask.Server.NewServer(
		ytask.Config.Broker(&broker),
		ytask.Config.Backend(&backend),
		...
)

server config

Config require default code other
Broker * ytask.Config.Broker
Backend nil ytask.Config.Backend
Debug FALSE ytask.Config.Debug
StatusExpires 1day ytask.Config.StatusExpires "task status expires in ex seconds, -1:forever"
ResultExpires 1day ytask.Config.ResultExpires "task result expires in ex seconds, -1:forever"
  • StatusExpires, ResultExpires is not valid for Mongo backend, 0 means no storage, > 0 means permanent storage

add worker func

// group1 : group name is also the query name
// add : worker name 
// addFunc : worker func
ser.Add("group1","add",addFunc)

run and shutdown

// group1 : run group name
// 3 : number of worker goroutine
ser.Run("group1",3)

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
ser.Shutdown(context.Background())

You cannot run multiple groups with the same server.

ser:=ytask.Server.NewServer(...)
ser.Run("g1",1)
// panic
ser.Run("g2",1)

This feature is already under development

client

get client

import "github.com/gojuukaze/YTask/v2"

ser := ytask.Server.NewServer(
		ytask.Config.Broker(&broker),
		ytask.Config.Backend(&backend),
		...
)

client = ser.GetClient()

send msg

// group1 : group name
// add : worker name
// 12,33 ... : func args
// return :
//   - taskId : taskId
//   - err : error
taskId,err:=client.Send("group1","add",12,33)

// set retry count
taskId,err=client.SetTaskCtl(client.RetryCount, 5).Send("group1","add",12,33)

get result

// taskId :
// 3*time.Second : timeout
// 300*time.Millisecond : sleep time
result, _ := client.GetResult(taskId, 3*time.Second, 300*time.Millisecond)

// get worker func return
if result.IsSuccess(){
    // get worker func return
    a,err:=result.GetInt64(0)
    b,err:=result.GetBool(1)
    
    // or
    var a int
    var b bool
    err:=result.Get(0, &a)
    err:=result.Get(1, &b)

    // or
    var a int
    var b bool
    err:=result.Gets(&a, &b)
}

Warning!!!
Although YTask provides the ability to get results, don't rely on transitions.
If the backend error causes the result to not be saved, YTask will not retry again. Keep retrying will cause the task to fail to start or end.
If you need task results in particular, it is recommended that you save them yourself in the task function.

retry

default retry count is 3

there are 2 way to trigger retry

  • use panic
func add(a, b int){
    panic("xx")
}
  • use TaskCtl
func add(ctl *controller.TaskCtl,a, b int){
    ctl.Retry(errors.New("xx"))
    return
}

set retry count

  • in client
client.SetTaskCtl(client.RetryCount, 5).Send("group1", "retry", 123, 44)

disable retry

  • in server
func add(ctl *controller.TaskCtl,a, b int){
    ctl.SetRetryCount(0)
    return
}
  • in client
client.SetTaskCtl(client.RetryCount, 0).Send("group1", "retry", 123, 44)

broker

redisBroker

import "github.com/gojuukaze/YTask/v2"

// 127.0.0.1 : host
// 6379 : port
// "" : password
// 0 : db
// 10 : connection pool size. 
//      For server, if poolSize is 0, the pool size will be set automatically.
//      For client, you need to set up the poolSize by yourself
ytask.Broker.NewRedisBroker("127.0.0.1", "6379", "", 0, 10)

rabbitMqBroker

import "github.com/gojuukaze/YTask/v2"
// 127.0.0.1 : host
// 5672 : port
// guest : username
// guest : password

ytask.Broker.NewRabbitMqBroker("127.0.0.1", "5672", "guest", "guest")

custom broker

type BrokerInterface interface {
    // get task
	Next(queryName string) (message.Message, error)
    // send task
	Send(queryName string, msg message.Message) error
	// Activate connection
	Activate()
	SetPoolSize(int)
	GetPoolSize()int
}

backend

redisBackend

import "github.com/gojuukaze/YTask/v2"

// 127.0.0.1 : host
// 6379 : port
// "" : password
// 0 : db
// 10 : connection pool size. 
//      For server, if poolSize is 0, the pool size will be set automatically.
//      For client, you need to set up the poolSize by yourself

ytask.Backend.NewRedisBackend("127.0.0.1", "6379", "", 0, 10)

memCacheBackend

import "github.com/gojuukaze/YTask/v2"

// 127.0.0.1 : host
// 11211 : port
// 10 : connection pool size. 

ytask.Backend.NewMemCacheBackend("127.0.0.1", "11211", 10)

mongoBackend

import "github.com/gojuukaze/YTask/v2"

// 127.0.0.1 : host
// 27017 : port
// "" : username
// "" : password
// "task": db
// "taks": collection

ytask.Backend.NewMongoBackend("127.0.0.1", "27017", "", "", "task", "task")

custom backend

type BackendInterface interface {
	SetResult(result message.Result, exTime int) error
	GetResult(key string) (message.Result, error)
	// Activate connection
	Activate()
	SetPoolSize(int)
	GetPoolSize() int
}

support type

Support all types what can be serialized to JSON

log

import (
"github.com/gojuukaze/YTask/v2/log"
"github.com/gojuukaze/go-watch-file")

// write to file
file,err:=watchFile.OpenWatchFile("xx.log")
if err != nil {
	panic(err)
}
log.YTaskLog.SetOutput(file)

// set level
log.YTaskLog.SetLevel(logrus.InfoLevel)

error

error type

const (
	ErrTypeEmptyQuery      = 1
	ErrTypeUnsupportedType = 2
	ErrTypeOutOfRange      = 3
	ErrTypeNilResult       = 4
	ErrTypeTimeOut         = 5
)

compare err

import 	"github.com/gojuukaze/YTask/v2/yerrors"
yerrors.IsEqual(err, yerrors.ErrTypeNilResult)

About

YTask is an asynchronous task queue for handling distributed jobs in golang(go异步任务队列)

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Go 100.0%