Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Refactor FedEx & Multi-obj optimizer & scripts #518

Open
wants to merge 41 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
6f2c104
add yaml for fedex in PubMed
rayrayraykk Jan 4, 2023
894c7c3
init commit for hpo scripts
rayrayraykk Jan 6, 2023
eaf93cc
add run bash
rayrayraykk Jan 6, 2023
8bbc209
add nohup
rayrayraykk Jan 6, 2023
f6724e2
add scripts for learning from scratch
rayrayraykk Jan 6, 2023
17bf4b1
add scripts for learning from scratch
rayrayraykk Jan 6, 2023
256912c
add scripts for run alpha
rayrayraykk Jan 9, 2023
ae3c388
fix minor bugs
rayrayraykk Jan 9, 2023
c6a19db
enable client_cfg
rayrayraykk Jan 9, 2023
f4bfe3c
add run for phpo
rayrayraykk Jan 9, 2023
37d8142
add TODO
rayrayraykk Jan 9, 2023
11796e8
add attack for fedex
rayrayraykk Jan 11, 2023
7800fff
fix minor bugs
rayrayraykk Jan 11, 2023
6513551
rm print
rayrayraykk Jan 11, 2023
be46716
rm print
rayrayraykk Jan 11, 2023
d5f1901
add draw scripts
rayrayraykk Jan 12, 2023
cf17681
remove extra line
rayrayraykk Jan 12, 2023
1b899cc
add suffix
rayrayraykk Jan 12, 2023
a434410
fix scripts
rayrayraykk Jan 12, 2023
700c2b9
add space
rayrayraykk Jan 12, 2023
56d171d
modify device num
rayrayraykk Jan 12, 2023
f09fc97
Merge branch 'alibaba:master' into refactor_fedex
rayrayraykk Jan 13, 2023
55a26f5
refactor fedex
rayrayraykk Jan 13, 2023
5a16d57
update interface
rayrayraykk Jan 13, 2023
a08cef9
fix minor bugs
rayrayraykk Jan 13, 2023
0c67427
add config_id
rayrayraykk Jan 13, 2023
2db5c64
add info to show
rayrayraykk Jan 13, 2023
1f62e01
Merge branch 'alibaba:master' into refactor_fedex
rayrayraykk Jan 16, 2023
3cc47b7
fix yaml for hpo
rayrayraykk Jan 16, 2023
4b0b52d
roll back
rayrayraykk Jan 16, 2023
308b0b5
add analysis
rayrayraykk Jan 16, 2023
6e827e1
add analysis
rayrayraykk Jan 16, 2023
d0bf037
add learn_from_scratch
rayrayraykk Jan 16, 2023
6cc10d8
add scripts
rayrayraykk Jan 16, 2023
58d3af4
update scripts
rayrayraykk Jan 17, 2023
81482d0
add run
rayrayraykk Jan 17, 2023
1499e0b
add run_from_scatch
rayrayraykk Jan 18, 2023
f2a88b9
remove empty file
rayrayraykk Jan 18, 2023
1a7862d
add multi-obj
rayrayraykk Jan 19, 2023
b7f7415
Merge branch 'alibaba:master' into dev_multi
rayrayraykk Feb 10, 2023
86a6e84
fix yaml
rayrayraykk Feb 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ def objective(trial, benchmark, valid_budgets, configspace):
cfg.benchmark.algo,
device=cfg.benchmark.device)
sampler = TPESampler(seed=cfg.optimizer.seed)
study = optuna.create_study(direction='minimize', sampler=sampler)
if cfg.optimizer.type == 'tpe_md':
pruner = MedianPruner()
sh_iters = precompute_sh_iters(cfg.optimizer.min_budget,
Expand All @@ -124,7 +123,9 @@ def objective(trial, benchmark, valid_budgets, configspace):
]
else:
raise NotImplementedError

study = optuna.create_study(direction='minimize',
sampler=sampler,
pruner=pruner)
study.optimize(func=partial(
objective,
benchmark=benchmark,
Expand Down
2 changes: 1 addition & 1 deletion benchmark/FedHPOBench/scripts/cnn/cifar10.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ data:
splits: [0.8,0.2,0.0]
batch_size: 32
num_workers: 0
transform: [['ToTensor'], ['Normalize', {'mean': [0.1307], 'std': [0.3081]}]]
transform: [['ToTensor'], ['Normalize', {'mean': [0.4914, 0.4822, 0.4465], 'std': [0.2470, 0.2435, 0.2616]}]]
args: [{'download': True}]
splitter: 'lda'
splitter_args: [{'alpha': 0.5}]
Expand Down
2 changes: 1 addition & 1 deletion benchmark/FedHPOBench/scripts/cnn/femnist.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ data:
splits: [0.6,0.2,0.2]
batch_size: 16
subsample: 0.05
transform: [['ToTensor'], ['Normalize', {'mean': [0.1307], 'std': [0.3081]}]]
transform: [['ToTensor'], ['Normalize', {'mean': [0.9637], 'std': [0.1592]}]]
num_workers: 0
model:
type: convnet2
Expand Down
2 changes: 1 addition & 1 deletion benchmark/FedHPOBench/scripts/cnn/femnist_dp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ data:
splits: [0.6,0.2,0.2]
batch_size: 16
subsample: 0.05
transform: [['ToTensor'], ['Normalize', {'mean': [0.1307], 'std': [0.3081]}]]
transform: [['ToTensor'], ['Normalize', {'mean': [0.9637], 'std': [0.1592]}]]
num_workers: 0
model:
type: convnet2
Expand Down
205 changes: 117 additions & 88 deletions federatedscope/autotune/algos.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
from federatedscope.core.auxiliaries.worker_builder import get_client_cls, \
get_server_cls
from federatedscope.core.auxiliaries.runner_builder import get_runner
from federatedscope.core.configs.yacs_config import CfgNode
from federatedscope.autotune.utils import parse_search_space, \
config2cmdargs, config2str, summarize_hpo_results, log2wandb
config2cmdargs, config2str, summarize_hpo_results, log2wandb, \
flatten2nestdict

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,14 +72,17 @@ def get_scheduler(init_cfg, client_cfgs=None):
client_cfgs: client-specific configuration
"""

if init_cfg.hpo.scheduler in [
'sha', 'rs', 'bo_kde', 'bohb', 'hb', 'bo_gp', 'bo_rf'
]:
scheduler = SuccessiveHalvingAlgo(init_cfg, client_cfgs)
# TODO: fix wrap_sha
support_optimizer = [
'sha', 'rs', 'bo_kde', 'hb', 'bohb', 'wrap_rs', 'wrap_bo_kde',
'wrap_hb', 'wrap_bohb', 'bo_gp', 'bo_rf', 'wrap_bo_gp', 'wrap_bo_rf',
'multi'
]
assert init_cfg.hpo.scheduler in support_optimizer, \
f'`init_cfg.hpo.scheduler` must be one of {support_optimizer}.'
scheduler = SuccessiveHalvingAlgo(init_cfg, client_cfgs)
# elif init_cfg.hpo.scheduler == 'pbt':
# scheduler = PBT(init_cfg)
elif init_cfg.hpo.scheduler.startswith('wrap', client_cfgs):
scheduler = SHAWrapFedex(init_cfg)
return scheduler


Expand All @@ -100,6 +105,16 @@ def __init__(self, cfg, client_cfgs=None):
os.makedirs(self._cfg.hpo.working_folder, exist_ok=True)
self._search_space = parse_search_space(self._cfg.hpo.ss)

# Convert to client_cfg
if self._cfg.hpo.personalized_ss:
# Do not support wrap_scheduler
client_num = self._cfg.federate.client_num
ss_client = CS.ConfigurationSpace()
for i in range(1, client_num + 1):
ss_client.add_configuration_space(f'client_{i}',
self._search_space,
delimiter='.')
self._search_space = ss_client
self._init_configs = self._setup()

logger.info(self._init_configs)
Expand Down Expand Up @@ -159,7 +174,14 @@ def _evaluate(self, configs):
thread_results[available_worker].clear()

trial_cfg = self._cfg.clone()
trial_cfg.merge_from_list(config2cmdargs(config))
if self._cfg.hpo.personalized_ss:
if isinstance(self._client_cfgs, CS.Configuration):
self._client_cfgs.merge_from_list(
config2cmdargs(config))
else:
self._client_cfgs = CfgNode(flatten2nestdict(config))
else:
trial_cfg.merge_from_list(config2cmdargs(config))
flags[available_worker].clear()
trial = TrialExecutor(i, flags[available_worker],
thread_results[available_worker],
Expand All @@ -185,7 +207,14 @@ def _evaluate(self, configs):
perfs = [None] * len(configs)
for i, config in enumerate(configs):
trial_cfg = self._cfg.clone()
trial_cfg.merge_from_list(config2cmdargs(config))
if self._cfg.hpo.personalized_ss:
if isinstance(self._client_cfgs, CS.Configuration):
self._client_cfgs.merge_from_list(
config2cmdargs(config))
else:
self._client_cfgs = CfgNode(flatten2nestdict(config))
else:
trial_cfg.merge_from_list(config2cmdargs(config))
results = make_trial(trial_cfg, self._client_cfgs)
key1, key2 = trial_cfg.hpo.metric.split('.')
perfs[i] = results[key1][key2]
Expand Down Expand Up @@ -327,85 +356,85 @@ def _generate_next_population(self, configs, perfs):
return next_population


class SHAWrapFedex(SuccessiveHalvingAlgo):
"""This SHA is customized as a wrapper for FedEx algorithm."""
def _make_local_perturbation(self, config):
neighbor = dict()
for k in config:
if 'fedex' in k or 'fedopt' in k or k in [
'federate.save_to', 'federate.total_round_num', 'eval.freq'
]:
# a workaround
continue
hyper = self._search_space.get(k)
if isinstance(hyper, CS.UniformFloatHyperparameter):
lb, ub = hyper.lower, hyper.upper
diameter = self._cfg.hpo.table.eps * (ub - lb)
new_val = (config[k] -
0.5 * diameter) + np.random.uniform() * diameter
neighbor[k] = float(np.clip(new_val, lb, ub))
elif isinstance(hyper, CS.UniformIntegerHyperparameter):
lb, ub = hyper.lower, hyper.upper
diameter = self._cfg.hpo.table.eps * (ub - lb)
new_val = round(
float((config[k] - 0.5 * diameter) +
np.random.uniform() * diameter))
neighbor[k] = int(np.clip(new_val, lb, ub))
elif isinstance(hyper, CS.CategoricalHyperparameter):
if len(hyper.choices) == 1:
neighbor[k] = config[k]
else:
threshold = self._cfg.hpo.table.eps * len(
hyper.choices) / (len(hyper.choices) - 1)
rn = np.random.uniform()
new_val = np.random.choice(
hyper.choices) if rn <= threshold else config[k]
if type(new_val) in [np.int32, np.int64]:
neighbor[k] = int(new_val)
elif type(new_val) in [np.float32, np.float64]:
neighbor[k] = float(new_val)
else:
neighbor[k] = str(new_val)
else:
raise TypeError("Value of {} has an invalid type {}".format(
k, type(config[k])))

return neighbor

def _setup(self):
# self._cache_yaml()
init_configs = super(SHAWrapFedex, self)._setup()
new_init_configs = []
for idx, trial_cfg in enumerate(init_configs):
arms = dict(("arm{}".format(1 + j),
self._make_local_perturbation(trial_cfg))
for j in range(self._cfg.hpo.table.num - 1))
arms['arm0'] = dict(
(k, v) for k, v in trial_cfg.items() if k in arms['arm1'])
with open(
os.path.join(self._cfg.hpo.working_folder,
f'{idx}_tmp_grid_search_space.yaml'),
'w') as f:
yaml.dump(arms, f)
new_trial_cfg = dict()
for k in trial_cfg:
if k not in arms['arm0']:
new_trial_cfg[k] = trial_cfg[k]
new_trial_cfg['hpo.table.idx'] = idx
new_trial_cfg['hpo.fedex.ss'] = os.path.join(
self._cfg.hpo.working_folder,
f"{new_trial_cfg['hpo.table.idx']}_tmp_grid_search_space.yaml")
new_trial_cfg['federate.save_to'] = os.path.join(
self._cfg.hpo.working_folder, "idx_{}.pth".format(idx))
new_init_configs.append(new_trial_cfg)

self._search_space.add_hyperparameter(
CS.CategoricalHyperparameter("hpo.table.idx",
choices=list(
range(len(new_init_configs)))))

return new_init_configs

# class SHAWrapFedex(SuccessiveHalvingAlgo):
# """This SHA is customized as a wrapper for FedEx algorithm."""
# def _make_local_perturbation(self, config):
# neighbor = dict()
# for k in config:
# if 'fedex' in k or 'fedopt' in k or k in [
# 'federate.save_to', 'federate.total_round_num',
# 'eval.freq'
# ]:
# # a workaround
# continue
# hyper = self._search_space.get(k)
# if isinstance(hyper, CS.UniformFloatHyperparameter):
# lb, ub = hyper.lower, hyper.upper
# diameter = self._cfg.fedex.wrapper.eps * (ub - lb)
# new_val = (config[k] -
# 0.5 * diameter) + np.random.uniform() * diameter
# neighbor[k] = float(np.clip(new_val, lb, ub))
# elif isinstance(hyper, CS.UniformIntegerHyperparameter):
# lb, ub = hyper.lower, hyper.upper
# diameter = self._cfg.fedex.wrapper.eps * (ub - lb)
# new_val = round(
# float((config[k] - 0.5 * diameter) +
# np.random.uniform() * diameter))
# neighbor[k] = int(np.clip(new_val, lb, ub))
# elif isinstance(hyper, CS.CategoricalHyperparameter):
# if len(hyper.choices) == 1:
# neighbor[k] = config[k]
# else:
# threshold = self._cfg.fedex.wrapper.eps * len(
# hyper.choices) / (len(hyper.choices) - 1)
# rn = np.random.uniform()
# new_val = np.random.choice(
# hyper.choices) if rn <= threshold else config[k]
# if type(new_val) in [np.int32, np.int64]:
# neighbor[k] = int(new_val)
# elif type(new_val) in [np.float32, np.float64]:
# neighbor[k] = float(new_val)
# else:
# neighbor[k] = str(new_val)
# else:
# raise TypeError("Value of {} has an invalid type {}".format(
# k, type(config[k])))
#
# return neighbor
#
# def _setup(self):
# # self._cache_yaml()
# init_configs = super(SHAWrapFedex, self)._setup()
# new_init_configs = []
# for idx, trial_cfg in enumerate(init_configs):
# arms = dict(("arm{}".format(1 + j),
# self._make_local_perturbation(trial_cfg))
# for j in range(self._cfg.hpo.fedex.wrapper.arm - 1))
# arms['arm0'] = dict(
# (k, v) for k, v in trial_cfg.items() if k in arms['arm1'])
# with open(
# os.path.join(self._cfg.hpo.working_folder,
# f'{idx}_tmp_grid_search_space.yaml'),
# 'w') as f:
# yaml.dump(arms, f)
# new_trial_cfg = dict()
# for k in trial_cfg:
# if k not in arms['arm0']:
# new_trial_cfg[k] = trial_cfg[k]
# new_trial_cfg['hpo.table.idx'] = idx
# new_trial_cfg['hpo.fedex.ss'] = os.path.join(
# self._cfg.hpo.working_folder,
# f"{new_trial_cfg['hpo.table.idx']}_tmp_grid_search_space.yaml")
# new_trial_cfg['federate.save_to'] = os.path.join(
# self._cfg.hpo.working_folder, "idx_{}.pth".format(idx))
# new_init_configs.append(new_trial_cfg)
#
# self._search_space.add_hyperparameter(
# CS.CategoricalHyperparameter("hpo.table.idx",
# choices=list(
# range(len(new_init_configs)))))
#
# return new_init_configs

# TODO: refactor PBT to enable async parallel
# class PBT(IterativeScheduler):
Expand Down
5 changes: 5 additions & 0 deletions federatedscope/autotune/fedex/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import json
import copy
import numpy as np

from federatedscope.core.message import Message
from federatedscope.core.workers import Client
Expand Down Expand Up @@ -56,6 +57,10 @@ def callback_funcs_for_model_para(self, message: Message):
rnd=self.state,
role='Client #{}'.format(self.ID),
return_raw=True))
# Inject
if self.ID in self._cfg.hpo.fedex.attack.id:
results['val_avg_loss_after'] += \
self._cfg.hpo.fedex.attack.sigma * np.random.randn()

results['arms'] = arms
results['client_id'] = self.ID - 1
Expand Down
Loading