Skip to content

jt808服务端 单机[2核4G]并发10w+ 100%测试覆盖率 有多种例子支持二次开发(每日保存亿+经纬度[2核4G] 平台下发数据获取回复 主动安全协议 JT1078流媒体播放等)

License

Notifications You must be signed in to change notification settings

cuteLittleDevil/go-jt808

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

go-jt808

  • 本项目已更好支持二次开发为目标 可通过各种自定义事件去完成相应功能 常见案例如下
  1. jt1078视频 详情
jt808服务端 jt1078服务端 模拟器在2核4G腾讯云服务器
平台下发0x9101等指令 模拟器开始推流等动作
流媒体服务 语言 描述 说明
RTVS 不开源可使用 在线测试页面 http://49.234.235.7:17001 详情点击
LAL go 在线播放地址 http://49.234.235.7:8080/live/295696659617_1.flv 详情点击
sky-java java 需要部署后 HTTP请求 10秒内拉流 参考格式如下
http://222.244.144.181:7777/video/1001-1-0-0.live.mp4
详情点击
  1. 存储经纬度 详情
jt808服务端 模拟器 消息队列 数据库都运行在2核4G腾讯云服务器
测试每秒保存5000条的情况 约5.5小时保存了近1亿的经纬度
  1. 平台下发指令给终端 获取参数 立即拍摄
主动下发给设备指令 获取应答的情况
  1. 协议交互详情 代码参考
使用自定义模拟器 可以轻松生成测试用的报文 有详情描述
  1. 自定义协议扩展 代码参考
自定义附加信息处理 获取想要的扩展内容

  • 看飞哥的单机TCP百万并发 好奇有数据情况的表现 因此国庆准备试一试有数据的情况
  • 性能测试 单机[2核4G机器]并发10w+ 每日保存4亿+经纬度 详情
  • 支持JT808(2011/2013/209) JT1078(需要其他流媒体服务)
  • 支持主动安全扩展(苏标) 支持分包和自动补传
特点 描述
安全可靠 核心协议部分测试覆盖率100% 纯原生go实现(不依赖任何库)
简洁优雅 核心代码不到1000行 不使用任何锁 仅使用channel完成
易于扩展 方便二次开发 有JT1078流媒体对接 保存经纬度等案例

快速开始

package main

import (
	"github.com/cuteLittleDevil/go-jt808/service"
	"log/slog"
	"os"
)

func init() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		AddSource:   true,
		Level:       slog.LevelDebug,
		ReplaceAttr: nil,
	}))
	slog.SetDefault(logger)
}

func main() {
	goJt808 := service.New(
		service.WithHostPorts("0.0.0.0:8080"),
		service.WithNetwork("tcp"),
	)
	goJt808.Run()
}

  • 目前(2024-10-01)前的go语言版本个人觉得都不好因此都不推荐参考 推荐参考资料如下

参考资料

项目名称 语言 日期 Star 数 链接
JT808 C# 2024-10-01 534 JT808 C#
jt808-server Java 2024-10-01 1.4k+ JT808 Java

性能测试

  • java模拟器(QQ群下载 373203450)
  • go模拟器 详情点击

连接数测试

详情点击

  • 2台云服务器各开5w+客户端 总计10w+
服务端版本 场景 并发数 服务器配置 服务器使用资源情况 描述
v0.3.0 连接数测试 10w+ 2核4G 120%+cpu 1.7G内存 10.0.16.5上开启服务端和模拟器
10.0.16.14机器上开启模拟器

模拟经纬度存储测试

详情点击

  • save进程丢失了部分数据 channel队列溢出抛弃 (测试channel队列为100)
  • 保存1亿丢失826条 保存4.32亿丢失1216条(分两次测试)
服务端版本 客户端 服务器配置 服务使用资源情况 描述
v0.3.0 1w go模拟器 2核4G 35%cpu 180.4MB内存 每秒5000 一共保存经纬度1亿
实际保存99999174 成功率99.999%
服务 cpu 内存 描述
server 35% 180.4MB 808服务端
client 23% 196MB 模拟客户端
save 18% 68.8MB 存储数据服务
nats-server 20% 14.8MB 消息队列
taosadapter 37% 124.3MB tdengine数据库适配
taosd 15% 124.7MB tdengine数据库

使用案例

1. 协议处理

1.1 协议解析

func main() {
	t := terminal.New(terminal.WithHeader(consts.JT808Protocol2013, "1")) // 自定义模拟器 2013版本
	data := t.CreateDefaultCommandData(consts.T0100Register) // 生成的预值注册指令 0x0100
	fmt.Println(fmt.Sprintf("模拟器生成的[%x]", data))

	jtMsg := jt808.NewJTMessage()
	_ = jtMsg.Decode(data) // 解析固定请求头

	var t0x0100 model.T0x0100 // 把body数据解析到结构体中
	_ = t0x0100.Parse(jtMsg)
	fmt.Println(jtMsg.Header.String())
	fmt.Println(t0x0100.String())
}

部分输出 输出详情

模拟器生成的[7e010000300000000000010001001f006e63643132337777772e3830382e636f6d0000000000000000003736353433323101b2e2413132333435363738797e]
[0100] 消息ID:[256] [终端-注册]
消息体属性对象: {
        [0000000000110000] 消息体属性对象:[48]
        版本号:[JT2013]
        [bit15] [0]
        [bit14] 协议版本标识:[0]
        [bit13] 是否分包:[false]
        [bit10-12] 加密标识:[0] 0-不加密 1-RSA
        [bit0-bit9] 消息体长度:[48]
}
[000000000001] 终端手机号:[1]
[0001] 消息流水号:[1]
数据体对象:{
        终端-注册:[001f006e63643132337777772e3830382e636f6d0000000000000000003736353433323101b2e2413132333435363738]
        [001f] 省域ID:[31]
        [006e] 市县域ID:[110]
        [6364313233] 制造商ID(5):[cd123]
        [7777772e3830382e636f6d000000000000000000] 终端型号(20):[www.808.com]
        [37363534333231] 终端ID(7):[7654321]
        [01] 车牌颜色:[1]
        [b2e2413132333435363738] 车牌号:[测A12345678]
}

1.2 自定义协议扩展 (附加信息)

自定义解析扩展 0x33为例 关键代码如下

func (l *Location) Parse(jtMsg *jt808.JTMessage) error {
	l.T0x0200AdditionDetails.CustomAdditionContentFunc = func(id uint8, content []byte) (model.AdditionContent, bool) {
		if id == uint8(consts.A0x01Mile) {
			l.customMile = 100
		}
		if id == 0x33 {
			value := content[0]
			l.customValue = value
			return model.AdditionContent{
				Data:        content,
				CustomValue: value,
			}, true
		}
		return model.AdditionContent{}, false
	}
	return l.T0x0200.Parse(jtMsg)
}

func (l *Location) OnReadExecutionEvent(message *service.Message) {
	if v, ok := tmp.Additions[consts.A0x01Mile]; ok {
		fmt.Println(fmt.Sprintf("里程[%d] 自定义辅助里程[%d]", v.Content.Mile, tmp.customMile))
	}
	id := consts.JT808LocationAdditionType(0x33)
	if v, ok := tmp.Additions[id]; ok {
		fmt.Println("自定义未知信息扩展", v.Content.CustomValue, tmp.customValue)
	}
}

部分输出 输出详情

里程[11] 自定义辅助里程[100]
自定义未知信息扩展 32 32

1.3 平台下发给终端参数 (8104查询终端参数)

关键代码如下

	replyMsg := goJt808.SendActiveMessage(&service.ActiveMessage{
		Key:              phone,                           // 默认使用手机号作为唯一key 根据key找到对应终端的TCP链接
		Command:          consts.P8104QueryTerminalParams, // 下发的指令
		Body:             nil,                             // 下发的body数据 8104为空
		OverTimeDuration: 3 * time.Second,                 // 超时时间 设备这段时间没有回复则失败
	})
	var t0x0104 model.T0x0104
	if err := t0x0104.Parse(replyMsg.JTMessage); err != nil {
		panic(err)
	}
	fmt.Println(t0x0104.String())

部分输出 输出详情

数据体对象:{
        [0003] 应答消息流水号:[3]
        [5b] 应答参数个数:[91]
        终端-查询参数:

        {
                [0001]终端参数ID:1 终端心跳发送间隔,单位为秒(s)
                参数长度[4] 是否存在[true]
                [0000000a]参数值:[10]
        }
		...
        {
                [0110]终端参数ID:272 CAN总线ID单独采集设置:
                参数长度[8] 是否存在[true]
                [0000000000000101]参数值:[[0 0 0 0 0 0 1 1]]
        }
        未知终端参数id:[33 117 118 119 121 122 123 124]
}

2. 自定义保存经纬度

详情点击

  • 自定义实现0x0200的消息处理 把数据发送到nats 关键代码如下
type T0x0200 struct {
	model.T0x0200
}

func (t *T0x0200) OnReadExecutionEvent(message *service.Message) {
	var t0x0200 model.T0x0200
	if err := t0x0200.Parse(message.JTMessage); err != nil {
		fmt.Println(err)
		return
	}
	location := shared.NewLocation(message.Header.TerminalPhoneNo, t0x0200.Latitude, t0x0200.Longitude)
	if err := mq.Default().Pub(shared.SubLocation, location.Encode()); err != nil {
		fmt.Println(err)
		return
	}
}

func (t *T0x0200) OnWriteExecutionEvent(_ service.Message) {}
  • 使用自定义0x0200消息处理启动
	goJt808 := service.New(
		service.WithHostPorts("0.0.0.0:8080"),
		service.WithNetwork("tcp"),
		service.WithCustomHandleFunc(func() map[consts.JT808CommandType]service.Handler {
			return map[consts.JT808CommandType]service.Handler{
				consts.T0200LocationReport:      &T0x0200{},
			}
		}),
	)
	goJt808.Run()

3. jt808附件上传

4. jt1078相关

4.1 流媒体服务使用lal

详情点击

  • 把1078格式流转对应格式 放入lal服务中 核心代码参考
func (j *jt1078) createStream(name string) chan<- *Packet {
	...
	ch := make(chan *Packet, 100)
	go func(session logic.ICustomizePubSessionContext, ch <-chan *Packet) {
		for v := range ch {
				...
				switch v.Flag.PT {
				case PTG711A:
					tmp.PayloadType = base.AvPacketPtG711A
				case PTG711U:
					tmp.PayloadType = base.AvPacketPtG711U
				case PTH264:
				case PTH265:
					tmp.PayloadType = base.AvPacketPtHevc
				default:
					slog.Warn("未知类型",
						slog.Any("pt", v.Flag.PT))
				}
				if err := session.FeedAvPacket(tmp); err != nil {
					slog.Warn("session.FeedAvPacket",
						slog.Any("err", err))
				}
			}
		}
	}(session, ch)
}

4.2 流媒体服务使用rtvs

详情点击

	type Handler interface {
		Protocol() consts.JT808CommandType
		Parse(jtMsg *jt808.JTMessage) error
		Encode() []byte
	}
	var (
		handler  Handler
		overTime time.Duration
	)
	overTime = 3 * time.Second
	switch content[:4] {
	case "9101":
		handler = &model.P0x9101{}
	case "9102":
		handler = &model.P0x9102{}
	case "9201":
		handler = &model.P0x9201{}
		overTime = 5 * time.Second
	case "9202":
		handler = &model.P0x9202{}
	case "9205":
		handler = &model.P0x9205{}
		overTime = 10 * time.Second
	case "9206":
		handler = &model.P0x9206{}
	}
	if handler == nil {
		return nil, fmt.Errorf("unknown command: %s", content[:4])
	}
	if err := handler.Parse(jtMsg); err != nil {
		return nil, err
	}
	return &service.ActiveMessage{
		Key:              jtMsg.Header.TerminalPhoneNo,
		Command:          handler.Protocol(),
		Body:             handler.Encode(),
		OverTimeDuration: overTime,
	}, nil

协议对接完成情况

JT808 终端通讯协议消息对照表

序号 消息 ID 完成情况 测试情况 消息体名称 2019 版本 2011 版本
1 0x0001 终端通用应答
2 0x8001 平台-通用应答
3 0x0002 终端心跳
5 0x0100 终端注册 修改 被修改
4 0x8003 补传分包请求 被新增
6 0x8100 平台-注册应答
8 0x0102 终端鉴权 修改
9 0x8103 设置终端参数 修改且增加 被修改
10 0x8104 平台-查询终端参数
11 0x0104 查询终端参数应答
18 0x0200 位置信息汇报 增加附加信息 被修改
49 0x0704 定位数据批量上传 修改 被新增
51 0x0800 多媒体事件信息上传 被修改
52 0x0801 多媒体数据上传 修改 被修改
53 0x8800 平台-多媒体数据上传应答 被修改
54 0x8801 平台-摄像头立即拍摄命令 修改
55 0x0805 摄像头立即拍摄命令应答 修改 被新增

JT1078 扩展

序号 消息 ID 完成情况 测试情况 消息体名称
13 0x1003 终端上传音视频属性
14 0x1005 终端上传乘客流量
15 0x1205 终端上传音视频资源列表
16 0x1206 文件上传完成通知
17 0x9003 平台-查询终端音视频属性
18 0x9101 平台-实时音视频传输请求
19 0x9102 平台-音视频实时传输控制
20 0x9105 平台-实时音视频传输状态通知
21 0x9201 平台-下发远程录像回放请求
22 0x9202 平台-下发远程录像回放控制
23 0x9205 平台-查询资源列表
24 0x9206 平台-文件上传指令
25 0x9207 平台-文件上传控制

主动安全(苏标)扩展

序号 消息 ID 完成情况 测试情况 消息体名称
1 0x1210 报警附件信息消息
2 0x1211 文件信息上传
3 0x1212 文件上传完成消息
4 0x9208 报警附件上传指令
5 0x9212 文件上传完成消息应答

About

jt808服务端 单机[2核4G]并发10w+ 100%测试覆盖率 有多种例子支持二次开发(每日保存亿+经纬度[2核4G] 平台下发数据获取回复 主动安全协议 JT1078流媒体播放等)

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published