diff --git a/examples/breakout_impala_binding_test.yaml b/examples/breakout_impala_binding_test.yaml new file mode 100644 index 0000000..cdd4acf --- /dev/null +++ b/examples/breakout_impala_binding_test.yaml @@ -0,0 +1,37 @@ +alg_para: + alg_name: IMPALAOpt + alg_config: + train_per_checkpoint: 1 + prepare_times_per_train: 1 + BATCH_SIZE: 512 + + +env_para: + env_name: VectorAtariEnv + env_info: + name: BreakoutNoFrameskip-v4 + vision: False + vector_env_size: 1 + +agent_para: + agent_name: AtariImpalaOpt + agent_num : 1 + agent_config: + max_steps: 128 + complete_step: 10020000 + +model_para: + actor: + model_name: ImpalaCnnOpt + state_dim: [84,84,4] + input_dtype: uint8 + state_mean: 0.0 + state_std: 255.0 + action_dim: 4 + model_config: + LR: 0.0005 + sample_batch_step: 128 + grad_norm_clip: 40.0 + +env_num: 32 +speedup: 2 diff --git a/xt/framework/broker.py b/xt/framework/broker.py index 9fa1605..2a205df 100644 --- a/xt/framework/broker.py +++ b/xt/framework/broker.py @@ -496,10 +496,44 @@ def create_explorer(self, config_info): p = Process(target=explorer.start) p.start() - cpu_count = psutil.cpu_count() - if speedup and cpu_count > (env_num + start_core): + # updated balanced core binding + """ + Speed up training by setting 'speedup' through config.yaml + 0: System scheduling without core binding + 1: [default] Binding sequentially according to the core id and explore id + 2: Balanced core binding. + 3: Pipeline (to be updated) + """ + # cpu_count = psutil.cpu_count() + # if speedup and cpu_count > (env_num + start_core): + # _p = psutil.Process(p.pid) + # _p.cpu_affinity([start_core + env_id]) + + socket = 0 + with open('/proc/cpuinfo', 'r') as f: + for line in f: # number of sockets = maximum physical cpu id + if line.lower().startswith('physical'): # physical id : xxx + socket = max(socket, int(line.split()[-1])) + socket = socket + 1 + logical, physical = psutil.cpu_count(), psutil.cpu_count(logical=False) + + if speedup == 3: # pipeline: to be updated + pass + elif speedup == 2 and logical >= env_num: # balanced core binding + base = ((env_id // physical) % (logical // physical)) * physical + env = env_id % physical + cpu = env // socket + (env % socket) * (physical // socket) + base + _p = psutil.Process(p.pid) + _p.cpu_affinity([cpu]) + logging.info("speedup({:d}): binding env {:d} to CPU {:d}".format(speedup, env_id, cpu)) + elif speedup == 1 and logical > (env_num + start_core): # sequential core binding _p = psutil.Process(p.pid) _p.cpu_affinity([start_core + env_id]) + logging.info("speedup({:d}): binding env {:d} to CPU {:d}" + .format(speedup, env_id, start_core + env_id)) + else: # scheduled by os + logging.info("speedup({:d}): no binding".format(speedup)) + self.send_explorer_q.update({env_id: send_explorer}) self.explore_process.update({env_id: p})