Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

我看作者实验的时候用的开源的DS2代码,有没有发现DS2代码每次算子的true_process_rates是累积的,拿来作为算子并行度对应处理能力的观测值,符合预期吗?感觉不太对,求解答一下~ #2

Open
why520it opened this issue Aug 15, 2024 · 10 comments

Comments

@why520it
Copy link

在parse.rs里:
fn update_rates(topology: &mut Topology, mut logs: HashMap<OperatorId,HashMap<OperatorInstanceId,Vec<Log>>>, epoch: Epoch) { for (op_id,instance_logs) in logs.iter_mut() { // Aggregate rates per logical operator let &(idx,_) = topology.dictionary.get(op_id).expect("Operator not found in dictionary."); match topology.logical_graph.node_weight_mut(idx) { Some(ref mut op_info) => { let op_logs = &mut op_info.rates; for (_,mut logs) in instance_logs.iter_mut() { // Make sure the given logs correspond to a single epoch, i.e. the given one assert_eq!(logs.len(),1); let mut ep = epoch; for &(_,true_proc_rate,true_out_rate,obs_proc_rate,obs_out_rate) in logs.iter() {// Add given rates to the existing ones (if any) let op_log = op_logs.entry(ep).or_insert_with(|| Rates::default()); op_log.0 += true_proc_rate; op_log.1 += true_out_rate; op_log.2 += obs_proc_rate; op_log.3 += obs_out_rate; ep += 1; } } } None => panic!("Operator not found in topology.") }; } }

@why520it
Copy link
Author

这样每个算子的并行度对应的处理能力,一直是累增的

@ljqcodelove
Copy link
Owner

Hi, 不好意思最近因为在整理vldb 2024 pre的时候才看到了github,首先这个函数里
op_logs 是rust 的 HashMap
op_logs.entry(ep) 意味着为当前的 ep 所对应的 rate(四元 tuples,分别对应 true_proc_rate, true_out_rate, obs_proc_rate, obs_out_rate)
那么此时注意看第一次操作这个 hashMap 一定会空,那么此时其实是对应的 Rates的 default
所以加上了 true_out_rate
那么接下来 ep 自行了自增操作,意味着下次一定会新取一个新的 key ,不会对老的key进行操作
那么在 scaling.rs 里可以看到 ds2 是这样操作的
let optimal_instances_per_epoch = input_rate/(true_processing_rate[epoch as usize].1 / op_instances as f64);
所以这里只是为了取最新的 epoch 的值,对应的其实并不是累加值
调优的过程中由于 https://github.com/strymon-system/ds2/blob/master/controller/src/bin/manager.rs 213 行
let epoch = &filepath[idx+1..];
所以 epoch 保证是递增的

@ljqcodelove
Copy link
Owner

另外在 https://github.com/strymon-system/ds2/blob/master/controller/src/bin/manager.rs
297 行中
let _ = Command::new("rm")
.arg("-r")
.arg(metrics_repo_path.to_str().unwrap())
.output()
.expect("Failed to remove log files.");

保证了每次删除 log,即以前的 log 文件不会存在,那么 epoch 就会保证从 0 开始了

@why520it
Copy link
Author

why520it commented Aug 27, 2024

@ljqcodelove :在进行Reconfiguring后,是会把metrics_repo_path里面的log删除,然后epoch又是从0开始的,但是在[https://github.com/strymon-system/ds2/blob/master/controller/src/bin/manager.rs]:221行的update_flink_rates函数,会不断的设置log里的rates进去,之前为0的epoch的rates已经设置到了topo中,在267行后Reconfiguring之后这些topo中的rates没有清0。 随后restart后然后epoch又是从0开始,然后又会调用. 199行的 update_flink_rates->update_rates函数,完成累加的操作,这样除了第一次收集的rates不是累积的,后面其实都是累积的好像。

@ljqcodelove
Copy link
Owner

@why520it 我当时因为在腾讯机器下有文件权限问题 rust 监听权限问题没有按照 ds2 的代码去跑
详见这个 issue strymon-system/ds2#7
所以自己魔改了一下 DS2
https://github.com/ljqcodelove/ds2/tree/44b2d530636d101fca6a3e86bfaed5763fd7570f
监听了他父亲所在的目录
在这个 repo 中
由于各中原因 我每次调用都是调用 https://github.com/ljqcodelove/ds2/blob/44b2d530636d101fca6a3e86bfaed5763fd7570f/controller/start.sh
然后要改的并发度我会在 https://github.com/ljqcodelove/ds2/blob/44b2d530636d101fca6a3e86bfaed5763fd7570f/controller/examples/topology/flink_query11_topology.csv
中用脚本修改
然后因为我start 每次都是调用 manager 新的,所以好像没有遇到你说的这个问题
实际上在开源 DS2 的代码中:
/// * No previously recorded rates are cleared; rates in logs are simply added to the existing ones (if any)
fn update_rates(topology: &mut Topology, mut logs: HashMap<OperatorId,HashMap<OperatorInstanceId,Vec>>, epoch: Epoch)
他明确说到确实会累加,不过我理解最新的版本可能就每次update的时候先调用一下 topology.rs 里的 clear 清空就好了
我有点不太清楚他这里为什么要进行简单累加,不过你按照我的实验方式的话(每次都会重新启动 manager.rs,所以我的 op_info 每次都是初始化的)当时就是我提给 ds2 的 issue,我因为按照他们的方式运行不成功,我当时的 ds2 只会运行到给出推荐并发度就不进行调优了,所以我就自己绕了过去,他们也没有fix我这个issue(后来可能是文件权限问题,当时不在腾讯内部机器而是在腾讯云租了机器以后,ds2就会跑通,但应该会遇到你的这个问题),不过你如果按照我的方式就不会遇到这个问题 hh.

@ljqcodelove
Copy link
Owner

@why520it 因为当时 ds2 我无法连续启动,所以我对于每个任务都手动写了脚本
辛酸泪都在 https://github.com/ljqcodelove/ds2/tree/44b2d530636d101fca6a3e86bfaed5763fd7570f/flink-scaling-scripts 这个文件夹里
所以我之前一直无法 get 到为什么会累加,因为我的实验方式保证了每次都是初始化一个新的 topo 以及 新的 log

@ljqcodelove
Copy link
Owner

当然类似 ds2.toml 里我会各种设置
activation_time = 100 其实这个是不合理的,但是当时是为了观测噪声 所以如果你用我的 repo 的话,ds2.toml 需要自己设置哈,
我看了确实,如果我按照他的调用方法是会累加的,但是我当时每次都会初始化一个 manager,所以没有遇到这个问题。

@ljqcodelove
Copy link
Owner

这个项目是 腾讯犀牛鸟基金项目,ContTune部署在了腾讯TEG Oceanus,每天调k级别的任务,因为bupt是国防老八的关系,所以我们的作者list并没有出现腾讯,落地在腾讯+Flink1.13,flink1.7,flink1.9(现在支持了最新版本的flink)的代码中,由于 Flink 主要是 Scala 和 Java,所以 DS2 那块是用 java 改写的,因此线上代码也不会遇到这个 issue。

@why520it
Copy link
Author

@ljqcodelove 十分感谢你提供的DS2,我看了一下 我这边也是为每个任务都手动写了脚本,确实很痛苦哈哈哈。
另外悄悄地问一下大佬,ContTune相关的代码是否方便还可以提供一些,想进行参考和实验对比😄。我看你这边开源提供的GP比较简单,要是能有更详细的就好了,包括给定process_rate计算并行度这块🙏

@ljqcodelove
Copy link
Owner

@why520it 其实 ContTune 真的很简单,当你能用一个 GP 拟合出并发度和处理能力的时候,对于一个输入 input rate,你只要 for 循环并发度从小到大找到第一个 GP 的mean 均值(非UCB)大于等于这个input rate 的并发度,就可以了

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants