[bugfix]: incr record add support for int32 indices; refactor embedding name to ids mapping #324

docs/source/feature/odl_sample.md
```sql
select aa as request_id, bb as user_id, cc as item_id, 'expose' as event_type,
ee as event_time, ff as scene
from user_expose_log;
end;
```

- project: the DataHub project
- topic: the DataHub topic
- subId: the DataHub subscription id; each Flink SQL job needs its own subscription id, otherwise jobs will conflict
- scene: extra information attached to the event; optional field
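
For reference, here is a minimal sketch of the DataHub source table that these parameters describe — a hypothetical example in which the endpoint, ids, keys, and column types are placeholders, with the column list taken from the query above:

```sql
create temporary table user_expose_log (
  `aa` STRING,  -- request_id
  `bb` STRING,  -- user_id
  `cc` STRING,  -- item_id
  `ee` BIGINT,  -- event_time
  `ff` STRING   -- scene
) WITH (
  'connector' = 'datahub',
  'endPoint' = 'http://dh-cn-hangzhou-int-vpc.aliyuncs.com/',
  'project' = 'odl_sample_preprocess',
  'topic' = 'user_expose_log',
  'subId' = '167xxxxxxxxx',   -- create a separate subscription id for each Flink SQL job
  'accessId' = 'LTAIxxx',
  'accessKey' = 'xxxxxx'
);
```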

1. Sample event aggregation (OnlineSampleAggr):

- Upload the resource package: [rec-realtime-1.0-SNAPSHOT.jar](http://easyrec.oss-cn-beijing.aliyuncs.com/deploy/rec-realtime-1.0-SNAPSHOT.jar)
![image.png](../../images/odl_events_aggr.png)

- Entry class: com.aliyun.rec.feature.job.OnlineSampleEventsAggr

- Flink configuration:

```sql
datahub.inputTopic: user_behavior_log
datahub.sinkTopic: odl_sample_aggr
datahub.projectName: odl_sample_preprocess
datahub.startInSecs: -900

input.userid: user_id
input.itemid: item_id
Expand All @@ -152,9 +153,11 @@
input.event-duration: play_time
input.event-ts: ts
input.expose-event: expose
input.event-extra: scene
input.wait-positive-secs: 900
input.subId: 167XXXXXXX
state.ttl: 86400
debug-mode: false
```

- DataHub parameter settings
  - inputTopic: the DataHub topic to read from
  - sinkTopic: the DataHub topic to write to
  - startInSecs: the position (in seconds) from which to start reading
- input: schema settings for the DataHub input topic
  - userid: name of the user id field
  - itemid: name of the item id field
  - request-id: name of the request_id field
  - event-duration: duration of the event; optional
    - can be left unset if no duration label needs to be built
  - event-type: field holding the event type
  - event-ts: field holding the time the event occurred (seconds)
  - expose-event: the exposure event type
    - late exposure events are not delivered again
    - other late events are delivered as supplements
  - event-extra: other event-related fields; separate multiple fields with ","
  - wait-positive-secs: how long to wait for positive samples, in seconds
    - longer waits give more accurate labels, but consume more memory and delay sample delivery
- state:
  - ttl: how long events are kept in Flink state; one day usually gives accurate results, and it can be reduced if memory is tight
  - state is stored as MapState\<String, EventData>, where the key is the event_type and the value is the event data
  - internally there are two auxiliary states:
    - timer state: records whether the timer has fired
    - event number state: counts the events seen under a given request_id, item_id pair
- debug-mode: defaults to false; when true, detailed event info and timer registration/firing info are printed
- DataHub topic schema:
  - inputTopic: user_behavior_log
  - events data format (a parsing sketch follows the example):
```json
[
{"duration":6493,"eventTime":1659667790,"eventType":"play","scene":"main"},
{"duration":6259,"eventTime":1659667796,"eventType":"play","scene":"main"}
]
```
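
Downstream Flink SQL can unpack this events column with the built-in JSON functions. A minimal, hypothetical sketch — it assumes the aggregated topic is exposed as a table whose `events` column carries the JSON array above as a STRING, and all connection values are placeholders:

```sql
create temporary table odl_sample_aggr (
  `request_id` STRING,
  `user_id`    STRING,
  `item_id`    STRING,
  `events`     STRING  -- JSON array in the format shown above
) WITH (
  'connector' = 'datahub',
  'endPoint' = 'http://dh-cn-hangzhou-int-vpc.aliyuncs.com/',
  'project' = 'odl_sample_preprocess',
  'topic' = 'odl_sample_aggr',
  'subId' = '167xxxxxxxxx',
  'accessId' = 'LTAIxxx',
  'accessKey' = 'xxxxxx'
);

-- pull the first event out of the aggregated array
select request_id, item_id,
       JSON_VALUE(events, '$[0].eventType') as first_event_type,
       CAST(JSON_VALUE(events, '$[0].duration') AS BIGINT) as first_duration
  from odl_sample_aggr;
```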

1. Join labels with features:

- Upload the resource package: [rec-realtime-1.0-SNAPSHOT.jar](http://easyrec.oss-cn-beijing.aliyuncs.com/deploy/rec-realtime-1.0-SNAPSHOT.jar)
![image.png](../../images/odl_events_aggr.png)

- Entry class: com.aliyun.rec.feature.job.SampleFeatureJoinJob

- Flink configuration:

```sql
state.backend.gemini.kv.separate.mode: GLOBAL_ENABLE
state.backend.gemini.kv.separate.value.size.threshold: '500'
datahub.endpoint: 'http://dh-cn-hangzhou-int-vpc.aliyuncs.com/'
datahub.accessId: LTAIxxxxxxxxx
datahub.accessKey: Jdqtxxxxxxxx
datahub.projectName: xxx_proj
callback.table_name: pairec_callback_log
callback.request_id: request_id
callback.user_id: user_id
callback.item_id: item_id
callback.user_fea: user_features
callback.generate_fea: generate_features
callback.request_time: request_time
callback.subId: 1671540xxxxxxxxxxx
sample.table_name: odl_sample_with_lbl
sample.request_id: request_id
sample.user_id: user_id
sample.item_id: style_id
sample.event_ts: min_ts
sample.labels: 'ln_play_time:double,is_valid_play:bigint'
sample.subId: 16715xxxxxxxxxxxxx
output.table_name: odl_sample_with_feature_and_lbl
datahub.startInSecs: '-600'
timer.max_wait_ts: '300'
state.ttl: '2400'
state.user_fea_ttl: '7200'
eas.end_point: 13010xxxxxxxxxxx.vpc.cn-beijing.pai-eas.aliyuncs.com
eas.model_name: rank_callback_public
eas.token: YTcwZGU1ZTxxxxxxx
taskmanager.memory.network.max: 64mb
```

- DataHub parameter settings
  - accessId: authentication id
  - accessKey: authentication secret
  - projectName: project name
  - endpoint: use the VPC endpoint
  - startInSecs: the position (in seconds) from which to start reading
  - sinkTopic: output table settings
- callback topic settings: the feature-callback log written at serving time
  - table_name: the feature-callback topic
  - request_id: request id, string type
  - user_id: user id, string type
  - item_id: item id, string type
  - user_fea: user features
  - generate_fea: the features generated after fg
  - request_time: request time, bigint type, used to set the watermark
  - subId: subscription id; make sure it does not collide with other jobs
- sample topic settings
  - table_name: the sample topic
  - request_id: request id, string type
  - user_id: user id, string type
  - item_id: item id, string type
  - event_ts: sample time, used to set the watermark
  - labels: sample labels
    - formatted as label_name:label_type; separate multiple labels with ","
    - columns that should also be written into the joined table, such as comments, can be listed here too
  - subId: DataHub subscription id
- output topic settings (a consumer sketch follows this list):
  - table_name: the output topic
  - request_id: request id; optional, defaults to sample.request_id
  - user_id: user id; optional, defaults to sample.user_id
  - item_id: item id; optional, defaults to sample.item_id
  - features: sample features; optional, default: "features"
  - subId: DataHub subscription id
- timer.max_wait_ts: after a sample arrives, how long to wait for its features to arrive
  - longer waits are more robust, but consume more memory
- state.ttl: in seconds
  - how long generate_feature is kept in state
  - should generally be larger than timer.max_wait_ts, otherwise the join success rate suffers
  - larger values raise the join success rate, but consume more memory
- state.user_fea_ttl: in seconds
  - if memory is tight and callback_eas is configured, user_fea can be kept longer than generate_feature
  - after generate_feature expires, the full sample features can still be recovered by calling EAS with user_fea
- callback_eas: whether delayed samples should be re-sent to EAS to fetch their features; defaults to true
- eas: EAS parameters, required when callback_eas is true
  - end_point: request URL
  - model_name: model name
  - token: authentication token
- debug-mode: optional, defaults to false
  - set it to true to print debug information when troubleshooting join problems
  - once the job is stable, turn it off to avoid hurting performance
- taskmanager.memory.network.max: Flink system setting that reduces network memory consumption
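
To sanity-check what the job writes, the output topic can be read back with a small Flink SQL consumer. This is a sketch under assumptions: the column names follow the defaults described above (item_id falls back to sample.item_id, i.e. style_id; features defaults to "features"), the label columns match sample.labels, and all connection values are placeholders:

```sql
create temporary table odl_sample_with_feature_and_lbl (
  `request_id`    STRING,
  `user_id`       STRING,
  `style_id`      STRING,  -- defaults to sample.item_id
  `features`      STRING,  -- default output feature column
  `ln_play_time`  DOUBLE,
  `is_valid_play` BIGINT
) WITH (
  'connector' = 'datahub',
  'endPoint' = 'http://dh-cn-hangzhou-int-vpc.aliyuncs.com/',
  'project' = 'xxx_proj',
  'topic' = 'odl_sample_with_feature_and_lbl',
  'subId' = '167xxxxxxxxx',
  'accessId' = 'LTAIxxx',
  'accessKey' = 'xxxxxx'
);

-- rough coverage check on the joined samples
select count(*) as sample_cnt, count(distinct request_id) as request_cnt
  from odl_sample_with_feature_and_lbl;
```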

- Alternative: the label-feature join can also be implemented in Flink SQL (a rough sketch follows)

  - [reference doc](./odl_sample_join_sql.md)
  - pros: more flexible
  - cons: lower performance and higher memory consumption
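
The rough shape of that SQL variant, for orientation only — the linked doc has the real version, and the table and column names here are illustrative. It is an interval join that matches each labeled sample with the callback features logged around its request time:

```sql
-- hypothetical interval join of labeled samples against callback features
select s.request_id, s.user_id, s.item_id,
       s.ln_play_time, s.is_valid_play, c.generate_features
  from odl_sample_with_lbl s
  join odl_callback_log c
    on s.request_id = c.request_id and s.item_id = c.item_id
 where s.ts between c.ts - INTERVAL '30' SECOND and c.ts + INTERVAL '30' MINUTE;
```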

### Data diagnosis
