diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 05fc1ee8..e0182aa2 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -35,6 +35,10 @@ jobs:
       with:
         name: "config.json"
         json: ${{ secrets.CONFIG_JSON }}
+    - name: Add Key
+      run: |
+        echo "${{ secrets.AUTONOMIO_DEV_PEM }}" > autonomio-dev.pem
+        chmod 0400 autonomio-dev.pem
     - name: Tests
       run: |
         pip install tensorflow>=2.0
diff --git a/docs/DistributedScan.md b/docs/DistributedScan.md
new file mode 100644
index 00000000..adb099ae
--- /dev/null
+++ b/docs/DistributedScan.md
@@ -0,0 +1,316 @@
+# DistributedScan
+
+The experiment is configured and started through the `DistributedScan()` command. All of the options affecting the experiment, other than the hyperparameters themselves, are configured through the Scan arguments. The most common use-case is one where ~10 arguments are invoked.
+
+## Minimal Example
+
+```python
+jako.DistributedScan(x=x, y=y, params=p, model=input_model, config='config.json')
+```
+
+## DistributedScan Arguments
+
+`x`, `y`, `params`, `model` and `config` are the only required arguments to start the experiment; all others are optional.
+
+Argument | Input | Description
+--------- | ------- | -----------
+`x` | array or list of arrays | prediction features
+`y` | array or list of arrays | prediction outcome variable
+`params` | dict or ParamSpace object | the parameter dictionary or the ParamSpace object after splitting
+`model` | function | the Keras model as a function
+`experiment_name` | str | Used for creating the experiment logging folder
+`x_val` | array or list of arrays | validation data for x
+`y_val` | array or list of arrays | validation data for y
+`val_split` | float | validation data split ratio
+`random_method` | str | the random method to be used
+`seed` | float | Seed for random states
+`performance_target` | list | A result at which point to end the experiment
+`fraction_limit` | float | The fraction of permutations to be processed
+`round_limit` | int | Maximum number of permutations in the experiment
+`time_limit` | datetime | Time limit for the experiment in format `%Y-%m-%d %H:%M`
+`boolean_limit` | function | Limit permutations based on a lambda function
+`reduction_method` | str | Type of reduction optimizer to be used
+`reduction_interval` | int | Number of permutations after which reduction is applied
+`reduction_window` | int | the lookback window for the reduction process
+`reduction_threshold` | float | The threshold at which reduction is applied
+`reduction_metric` | str | The metric to be used for reduction
+`minimize_loss` | bool | Set to True when `reduction_metric` is a loss
+`disable_progress_bar` | bool | Disable live updating progress bar
+`print_params` | bool | Print the hyperparameters of each permutation
+`clear_session` | bool | Clear backend session between permutations
+`save_weights` | bool | Keep model weights (increases memory pressure for large models)
+`config` | str or dict | Configuration containing information about the machines to distribute to and the database to upload the data to.
+
+
+## DistributedScan Object Properties
+
+Once the `DistributedScan()` procedures are completed, an object with several useful properties is returned. The namespace is kept strictly clean, so all the properties consist of meaningful contents.
+
+If we had conducted the following experiment, we could access the properties through `distributed_scan_object`, which is a Python class object.
+ +```python +distributed_scan_object = jako.DistributedScan(x, y, model=iris_model, params=p, fraction_limit=0.1, config='config.json') +``` +
+ +**`best_model`** picks the best model based on a given metric and returns the index number for the model. + +```python +distributed_scan_object.best_model(metric='f1score', asc=False) +``` +NOTE: `metric` has to be one of the metrics used in the experiment, and `asc` has to be True for the case where the metric is something to be minimized. + +
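+
+For example, a metric that should be minimized is picked with `asc=True`. A minimal sketch, assuming `val_loss` was among the metrics logged in the experiment:
+
+```python
+# index of the permutation with the lowest validation loss ('val_loss' is an assumed metric name)
+best_index = distributed_scan_object.best_model(metric='val_loss', asc=True)
+```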
+ +**`data`** returns a pandas DataFrame with the results for the experiment together with the hyperparameter permutation details. + +```python +distributed_scan_object.data +``` + +
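+
+Since `data` is a regular pandas DataFrame, the usual pandas operations apply. A minimal sketch, assuming a `val_acc` column was logged in the experiment:
+
+```python
+# show the ten best permutations by validation accuracy (column name is an assumption)
+distributed_scan_object.data.sort_values('val_acc', ascending=False).head(10)
+```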
+ +**`details`** returns a pandas Series with various meta-information about the experiment. + +```python +distributed_scan_object.details +``` + +
+
+**`evaluate_models`** creates a new column in `distributed_scan_object.data` with the result from k-fold cross-evaluation.
+
+```python
+distributed_scan_object.evaluate_models(x_val=x_val,
+                                        y_val=y_val,
+                                        n_models=10,
+                                        metric='f1score',
+                                        folds=5,
+                                        shuffle=True,
+                                        average='binary',
+                                        asc=False)
+```
+
+Argument | Description
+-------- | -----------
+`distributed_scan_object` | The class object returned by DistributedScan() upon completion of the experiment.
+`x_val` | Input data (features) in the same format as used in DistributedScan(), but it should not be the same data (or it will not be much of a validation).
+`y_val` | Input data (labels) in the same format as used in DistributedScan(), but it should not be the same data (or it will not be much of a validation).
+`n_models` | The number of models to be evaluated. If set to 10, the 10 models with the highest metric value are evaluated (see `metric` and `asc` below).
+`metric` | The metric to be used for picking the models to be evaluated.
+`folds` | The number of folds to be used in the evaluation.
+`shuffle` | Whether the data is to be shuffled or not. Always set to False for timeseries, but keep in mind that you might get periodical/seasonal bias.
+`average` | One of the supported averaging methods: 'binary', 'micro', or 'macro'.
+`asc` | Set to True if the metric is to be minimized.
+`saved` | (bool) If a model saved on the local machine should be used.
+`custom_objects` | (dict) If the model has a custom object, pass it here.
+
+
+**`learning_entropy`** returns a pandas DataFrame with an entropy measure describing, for each permutation, how much the results vary from epoch to epoch.
+
+```python
+distributed_scan_object.learning_entropy
+```
+
+ +**`params`** returns a dictionary with the original input parameter ranges for the experiment. + +```python +distributed_scan_object.params +``` + +
+ +**`round_times`** returns a pandas DataFrame with the time when each permutation started, ended, and how many seconds it took. + +```python +distributed_scan_object.round_times +``` + +
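+
+Because `round_times` is also a DataFrame, aggregate timings are easy to compute. A minimal sketch, assuming the per-permutation seconds are stored in a column named `duration`:
+
+```python
+# total wall-clock seconds spent across all permutations (column name is an assumption)
+total_seconds = distributed_scan_object.round_times['duration'].sum()
+```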
+ +
+ +**`round_history`** returns epoch-by-epoch data for each model in a dictionary. + +```python +distributed_scan_object.round_history +``` + +
+ +**`saved_models`** returns the JSON (dictionary) for each model. + +```python +distributed_scan_object.saved_models +``` + +
+ +**`saved_weights`** returns the weights for each model. + +```python +distributed_scan_object.saved_weights +``` + +
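+
+Together with `saved_models`, the stored weights can be used to rebuild a trained model outside the experiment. A minimal sketch, where `model_json` and `model_weights` are assumed to hold one model's entry from `saved_models` and the matching entry from `saved_weights`:
+
+```python
+from tensorflow.keras.models import model_from_json
+
+# rebuild the architecture from its JSON description, then load the trained weights
+model = model_from_json(model_json)
+model.set_weights(model_weights)
+```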
+ +**`x`** returns the input data (features). + +```python +distributed_scan_object.x +``` + +
+
+**`y`** returns the input data (labels).
+
+```python
+distributed_scan_object.y
+```
+
+## Input Model
+
+The input model is any Keras or tf.keras model. It's the model that Jako will use as the basis for the hyperparameter experiment.
+
+#### A minimal example
+
+```python
+from tensorflow.keras.models import Sequential
+from tensorflow.keras.layers import Dense
+
+def input_model(x_train, y_train, x_val, y_val, params):
+
+    model = Sequential()
+    model.add(Dense(12, input_dim=8, activation=params['activation']))
+    model.add(Dense(1, activation='sigmoid'))
+    model.compile(loss='binary_crossentropy', optimizer=params['optimizer'])
+    out = model.fit(x=x_train,
+                    y=y_train,
+                    validation_data=[x_val, y_val])
+
+    return out, model
+```
+See specific details about defining the model [here](Examples_Typical?id=defining-the-model).
+
+#### Models with multiple inputs or outputs (list of arrays)
+
+For both cases, `DistributedScan(... x_val, y_val ...)` must be set explicitly, i.e. you split the data yourself before passing it into Jako. The snippets below use the above minimal example as a reference.
+
+For **multi-input**, change `model.fit()` as highlighted below:
+
+```python
+out = model.fit(x=[x_train_a, x_train_b],
+                y=y_train,
+                validation_data=[[x_val_a, x_val_b], y_val])
+```
+
+For **multi-output**, the same structure is expected, but instead of changing the `x` argument values, change `y`:
+
+```python
+out = model.fit(x=x_train,
+                y=[y_train_a, y_train_b],
+                validation_data=[x_val, [y_val_a, y_val_b]])
+```
+
+For the case where it's both **multi-input** and **multi-output**, both the `x` and `y` argument values follow the same structure:
+
+```python
+out = model.fit(x=[x_train_a, x_train_b],
+                y=[y_train_a, y_train_b],
+                validation_data=[[x_val_a, x_val_b], [y_val_a, y_val_b]])
+```
+
+
+## Parameter Dictionary
+
+The first step in an experiment is to decide the hyperparameters you want to use in the optimization process.
+
+#### A minimal example
+
+```python
+p = {
+    'first_neuron': [12, 24, 48],
+    'activation': ['relu', 'elu'],
+    'batch_size': [10, 20, 30]
+}
+```
+
+#### Supported Input Formats
+
+Parameters may be input either as a list or as a tuple.
+
+As a set of discrete values in a list:
+
+```python
+p = {'first_neuron': [12, 24, 48]}
+```
+
+As a range of values `(min, max, steps)`:
+
+```python
+p = {'first_neuron': (12, 48, 2)}
+```
+
+For the case where a static value is preferred, but it's still useful to include it in the parameters dictionary, use a single-value list:
+
+```python
+p = {'first_neuron': [48]}
+```
+
+## DistributedScan Config file
+
+A config file holds all the information needed to connect to the remote machines. It also designates one of the remote machines as the central datastore.
A sample config file will look like the following:
+
+```
+{
+  "run_central_node": true,
+  "machines": [
+    {
+      "machine_id": 1,
+      "JAKO_IP_ADDRESS": "machine_1_ip_address",
+      "JAKO_PORT": machine_1_port,
+      "JAKO_USER": "machine_1_username",
+      "JAKO_PASSWORD": "machine_1_password"
+    },
+    {
+      "machine_id": 2,
+      "JAKO_IP_ADDRESS": "machine_2_ip_address",
+      "JAKO_PORT": machine_2_port,
+      "JAKO_USER": "machine_2_username",
+      "JAKO_KEY_FILENAME": "machine_2_key_file_path"
+    }
+  ],
+  "database": {
+    "DB_HOST_MACHINE_ID": 1,
+    "DB_USERNAME": "database_username",
+    "DB_PASSWORD": "database_password",
+    "DB_TYPE": "database_type",
+    "DATABASE_NAME": "database_name",
+    "DB_PORT": database_port,
+    "DB_ENCODING": "LATIN1",
+    "DB_UPDATE_INTERVAL": 5
+  }
+}
+```
+
+Here `DB_HOST_MACHINE_ID` is the id of the machine that acts as the datastore, and `machine_1_port`, `machine_2_port` and `database_port` are placeholders for integer port numbers.
+
+### DistributedScan config arguments
+
+Argument | Input | Description
+--------- | ------- | -----------
+`run_central_node` | bool | if set to true, the central machine where the script runs will also be included in the distributed run
+`machines` | list of dict | list of machine configurations
+`machine_id` | int | id for each machine, in ascending order
+`JAKO_IP_ADDRESS` | str | ip address of the remote machine
+`JAKO_PORT` | int | port number of the remote machine
+`JAKO_USER` | str | username for the remote machine
+`JAKO_PASSWORD` | str | password for the remote machine
+`JAKO_KEY_FILENAME` | str | if a password is not available, the path to the machine's RSA private key can be supplied instead
+`database` | dict | configuration parameters for the central datastore
+`DB_HOST_MACHINE_ID` | int | the machine id of the remote machine that hosts the database
+`DB_USERNAME` | str | database username
+`DB_PASSWORD` | str | database password
+`DB_TYPE` | str | database type. Defaults to `postgres`. The available options are `postgres`, `mysql` and `sqlite`
+`DATABASE_NAME` | str | database name
+`DB_PORT` | int | database port
+`DB_ENCODING` | str | defaults to `LATIN1`
+`DB_UPDATE_INTERVAL` | int | how often the database is updated, in seconds. Defaults to `5`
+
+
diff --git a/docs/Install_Options.md b/docs/Install_Options.md
new file mode 100644
index 00000000..ae056a73
--- /dev/null
+++ b/docs/Install_Options.md
@@ -0,0 +1,57 @@
+# Install Options
+
+Before installing jako, it is recommended to first set up and start the following:
+* A python or conda environment.
+* A PostgreSQL database set up on one of the machines. This will be used as the central datastore; a quick connection check is sketched below.
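+
+One way to confirm that the central datastore is reachable before starting an experiment is to open a connection with SQLAlchemy. This is only a sketch: the user, password, host, port and database name are placeholders for your own values, and `psycopg2-binary` must be installed for PostgreSQL:
+
+```python
+from sqlalchemy import create_engine, text
+
+# placeholder credentials; replace with the values from your config file
+engine = create_engine('postgresql://db_user:db_password@db_host:5432/db_name')
+
+with engine.connect() as conn:
+    # a trivial query that succeeds only if the database is reachable
+    print(conn.execute(text('SELECT 1')).fetchall())
+```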
+ + +## Installing Jako + +#### Creating a python virtual environment +```python +virtualenv -p python3 jako_env +source jako_env/bin/activate +``` + +#### Creating a conda virtual environment +```python +conda create --name jako_env +conda activate jako_env +``` + +#### Install latest from PyPi +```python +pip install jako +``` + +#### Install a specific version from PyPi +```python +pip install jako==0.1 +``` + +#### Upgrade installation from PyPi +```python +pip install -U --no-deps jako +``` + +#### Install from monthly +```python +pip install --upgrade --no-deps --force-reinstall git+https://github.com/autonomio/jako +``` + +#### Install from weekly +```python +pip install --upgrade --no-deps --force-reinstall git+https://github.com/autonomio/jako@dev +``` + +#### Install from daily +```python +pip install --upgrade --no-deps --force-reinstall git+https://github.com/autonomio/jako@daily-dev +``` + +## Installing a postgres database + +To enable postgres in your central datastore, follow the steps in this +* Postgres for ubuntu machine: [link](https://blog.logrocket.com/setting-up-a-remote-postgres-database-server-on-ubuntu-18-04/) +* Postgres for Mac Machines: [link](https://www.sqlshack.com/setting-up-a-postgresql-database-on-mac/) + diff --git a/jako/database/database.py b/jako/database/database.py index f51bd3e5..9c46df1b 100644 --- a/jako/database/database.py +++ b/jako/database/database.py @@ -1,6 +1,5 @@ from sqlalchemy import create_engine -from sqlalchemy_utils import database_exists, create_database, drop_database -from sqlalchemy.exc import DatabaseError +from sqlalchemy_utils import database_exists, drop_database from sqlalchemy.schema import DropTable @@ -15,7 +14,6 @@ def __init__(self, database_name='EXPERIMENT_LOG', table_name='experiment_log', encoding='LATIN1'): - ''' Parameters @@ -43,42 +41,36 @@ def __init__(self, elif db_type == 'mysql': if port is None: port = 3306 - DB_URL = ( - 'mysql+pymysql://' - + username - + ':' - + password - + '@' - + host - + ':' - + str(port) - + '/' - + database_name - ) + + url = '''mysql+pymysql://{}:{}@{}:{}/{}'''.format(username, + password, + host, + str(port), + database_name) + + DB_URL = (url) + elif db_type == 'postgres': if port is None: port = 5432 - DB_URL = ( - 'postgresql://' - + username - + ':' - + password - + '@' - + host - + ':' - + str(port) - + '/' - + database_name - ) + + url = 'postgresql://{}:{}@{}:{}/{}'.format(username, + password, + host, + str(port), + database_name) + + DB_URL = (url) + self.DB_URL = DB_URL def create_db(self): - ''' Create database if it doesn't exists. 
''' - - engine = create_engine(self.DB_URL, echo=False, isolation_level='AUTOCOMMIT') + + engine = create_engine(self.DB_URL, + echo=False, isolation_level='AUTOCOMMIT') if not database_exists(engine.url): @@ -99,7 +91,7 @@ def create_db(self): ) except Exception as e: - pass + print(e) return engine @@ -129,7 +121,8 @@ def write_to_db(self, data_frame): ''' engine = self.create_db() - data_frame.to_sql(self.table_name, con=engine, if_exists='append', index=False) + data_frame.to_sql(self.table_name, con=engine, + if_exists='append', index=False) def query_table(self, query): ''' @@ -150,10 +143,8 @@ def query_table(self, query): def show_table_content(self): ''' - - - Returns - ------- + Returns + ------- res |`list` of `tuples` | Query output from the database ''' @@ -162,8 +153,6 @@ def show_table_content(self): def return_table_df(self): ''' - - Returns ------- data | Pandas DataFrame object | returns the database as a dataframe @@ -177,12 +166,55 @@ def return_table_df(self): def return_existing_experiment_ids(self): ''' - Returns ------- ids | Pandas Series object | returns the experiment id of the table ''' - table = self.return_table_df() - ids = table.iloc[:, -1] - return ids + + query_str = 'SELECT experiment_id from {}'.format(self.table_name) + res = self.query_table(query_str) + res = [val[0] for val in res] + + return res + + def return_columns(self): + ''' + Returns + ------- + cols | list| returns the columns of the table + ''' + + query_string = """select COLUMN_NAME from information_schema.columns + where table_name='{}'""" + query_string = query_string.format(self.table_name) + cols = self.query_table(query_string) + cols = [col[0] for col in cols] + + return cols + + def add_new_columns(self, columns): + ''' Add a new column to the Database''' + + query_str = 'ALTER TABLE {}'.format(self.table_name) + col_query_str = ' ADD COLUMN {} varchar,' + + for col in columns: + query_str = query_str + col_query_str.format(col) + + query_str = query_str.rstrip(',') + ';' + + try: + self.query_table(query_str) + except Exception as e: + exception_str1 = ''' + This result object does not return rows. + ''' + exception_str2 = '(psycopg2.errors.DuplicateColumn)' + exception_str3 = '(psycopg2.errors.UndefinedTable)' + exceptions = [exception_str1, exception_str2, exception_str3] + e = str(e) + if any(ex in e for ex in exceptions): + pass + else: + raise Exception(e) diff --git a/jako/database/experiment_database.py b/jako/database/experiment_database.py deleted file mode 100644 index e2363d79..00000000 --- a/jako/database/experiment_database.py +++ /dev/null @@ -1,165 +0,0 @@ -from sqlalchemy import create_engine -from sqlalchemy_utils import database_exists, drop_database -from sqlalchemy.exc import DatabaseError -from sqlalchemy.schema import DropTable - - -class ExperimentDatabase: - - def __init__(self, - username=None, - password=None, - host=None, - port=None, - db_type='sqlite', - database_name='EXPERIMENT_LOG', - table_name='experiment_log', - encoding='LATIN1'): - - '''For creating and managing the experiment database - - Arguments - --------- - username | str | The default is None. - password | str | The default is None. - host | str , optional| The default is None. - port | str , optional| The default is None. - db_type | str | The default is 'sqlite'. - database_name | str | The default is 'EXPERIMENT_LOG'. - table_name | str | The default is 'experiment_log'. 
- - Returns - ------- - None - ''' - - self.db_type = db_type - self.database_name = database_name - self.table_name = table_name - self.encoding = encoding - DB_URL = '' - if db_type == 'sqlite': - DB_URL = 'sqlite:///' + database_name + '.db' - elif db_type == 'mysql': - if port is None: - port = 3306 - DB_URL = ( - 'mysql+pymysql://' - + username - + ':' - + password - + '@' - + host - + ':' - + str(port) - + '/' - + database_name - ) - elif db_type == 'postgres': - if port is None: - port = 5432 - DB_URL = ( - 'postgresql://' - + username - + ':' - + password - + '@' - + host - + ':' - + str(port) - + '/' - + database_name - ) - self.DB_URL = DB_URL - - def _create_db(self): - - '''Create database if it doesn't exists. - ''' - - engine = create_engine(self.DB_URL, echo=False, isolation_level='AUTOCOMMIT') - - if not database_exists(engine.url): - - new_engine = create_engine( - self.DB_URL.replace(self.database_name, ''), - echo=False, - isolation_level='AUTOCOMMIT', - ) - conn = new_engine.connect() - - try: - conn.execute( - ''' - CREATE DATABASE {} ENCODING '{}' - '''.format( - self.database_name, self.encoding - ) - ) - - except Exception as e: - pass - - return engine - - def _query_table(self, query): - - '''Makes query in the database - - Arguments - --------- - query | `str`| Database query for the respective sql engine - - Returns - ------- - res | `list` of `tuples` | Query output from the database - - ''' - - engine = self._create_db() - res = engine.execute(query).fetchall() - - return res - - def _show_table_content(self): - - '''Returns the values from the database - - Returns - ------- - res |`list` of `tuples` | Query output from the database - ''' - - res = self._query_table('SELECT * FROM {}'.format(self.table_name)) - - return res - - def _return_table_df(self): - - '''Returns the whole table from the database - - Returns - ------- - data | Pandas DataFrame object | returns the database as a dataframe - ''' - - import pandas as pd - - table = self._show_table_content() - data = pd.DataFrame(table) - - return data - - def _return_existing_experiment_ids(self): - - '''Returns those experiment_id already in the db - - Returns - ------- - ids | Pandas Series object | returns the experiment id of the table - ''' - - table = self._return_table_df() - ids = table.iloc[:, -1] - - return ids diff --git a/jako/distribute/DistributedScan.py b/jako/distribute/DistributedScan.py index 2619a6ba..1021f370 100644 --- a/jako/distribute/DistributedScan.py +++ b/jako/distribute/DistributedScan.py @@ -30,7 +30,6 @@ def __init__(self, clear_session=True, save_weights=True, config='config.json'): - '''Distributed version of talos.Scan() for the local machine. 
Parameters @@ -91,9 +90,10 @@ def __init__(self, arguments_dict = self.__dict__ remove_parameters = ["x", "y", "model"] - arguments_dict = {k: v for k, v in arguments_dict.items() if k not in remove_parameters} + arguments_dict = {k: v for k, v in arguments_dict.items() + if k not in remove_parameters} - self.file_path = 'tmp/scanfile_remote.py' + self.file_path = '/tmp/scanfile_remote.py' self.save_timestamp = time.strftime('%D%H%M%S').replace('/', '') @@ -111,38 +111,53 @@ def __init__(self, else: TypeError('`config` must be dict or filename string.') + # write database name as same as experiment name + self.config_data['database']['DB_TABLE_NAME'] = experiment_name + + if isinstance(config, str): + config_path = config + + else: + config_path = 'config.json' + + with open(config_path, 'w') as f: + json.dump(self.config_data, f, indent=2) + if 'finished_scan_run' in self.config_data.keys(): del self.config_data['finished_scan_run'] # handles location for params,data and model - if not os.path.exists("tmp/"): - os.mkdir("tmp") + if not os.path.exists("/tmp/"): + os.mkdir("/tmp/") - self.dest_dir = "./tmp" + self.dest_dir = "/tmp/" # save data in numpy format - np.save("tmp/x_data_remote.npy", x) - np.save("tmp/y_data_remote.npy", y) + np.save("/tmp/x_data_remote.npy", x) + np.save("/tmp/y_data_remote.npy", y) # get model function as a string model_func = inspect.getsource(model).lstrip() self.model_func = model_func self.model_name = model.__name__ - + from .distribute_database import get_db_object from .distribute_utils import get_experiment_stage - - db=get_db_object(self) - self.stage=get_experiment_stage(self, db) - + + db = get_db_object(self) + self.stage = get_experiment_stage(self, db) + if not self.stage: - self.stage=0 + self.stage = 0 arguments_dict["stage"] = self.stage - - with open('tmp/arguments_remote.json', 'w') as outfile: + + with open('/tmp/arguments_remote.json', 'w') as outfile: json.dump(arguments_dict, outfile, indent=2) + with open('/tmp/remote_config.json', 'w') as outfile: + json.dump(self.config, outfile, indent=2) + from .distribute_run import distribute_run distribute_run(self) diff --git a/jako/distribute/RemoteScan.py b/jako/distribute/RemoteScan.py index 13f90dd0..ddd3b273 100644 --- a/jako/distribute/RemoteScan.py +++ b/jako/distribute/RemoteScan.py @@ -2,41 +2,19 @@ class RemoteScan(Scan): - def __init__(self, x, y, params, model, experiment_name, - x_val=None, - y_val=None, - val_split=0.3, - random_method='uniform_mersenne', - seed=None, - performance_target=None, - fraction_limit=None, - round_limit=None, - time_limit=None, - boolean_limit=None, - reduction_method=None, - reduction_interval=50, - reduction_window=20, - reduction_threshold=0.2, - reduction_metric='val_acc', - minimize_loss=False, - disable_progress_bar=False, - print_params=False, - clear_session=True, - save_weights=True, - config='tmp/remote_config.json'): - + **kwargs): '''Distributed version of talos.Scan() for the remote machines. - + Parameters ---------- params | `dict` | Hyperparameters for distribution. - config | str or dict | The default is 'tmp/remote_config.json'. + config | str or dict | The default is '/tmp/remote_config.json'. 
Returns ------- @@ -52,64 +30,59 @@ def __init__(self, self.params = params self.model = model self.experiment_name = experiment_name - self.x_val = x_val - self.y_val = y_val - self.val_split = val_split + self.x_val = kwargs['x_val'] + self.y_val = kwargs['y_val'] + self.val_split = kwargs['val_split'] # randomness - self.random_method = random_method - self.seed = seed + self.random_method = kwargs['random_method'] + self.seed = kwargs['seed'] # limiters - self.performance_target = performance_target - self.fraction_limit = fraction_limit - self.round_limit = round_limit - self.time_limit = time_limit - self.boolean_limit = boolean_limit + self.performance_target = kwargs['performance_target'] + self.fraction_limit = kwargs['fraction_limit'] + self.round_limit = kwargs['round_limit'] + self.time_limit = kwargs['time_limit'] + self.boolean_limit = kwargs['boolean_limit'] # optimization - self.reduction_method = reduction_method - self.reduction_interval = reduction_interval - self.reduction_window = reduction_window - self.reduction_threshold = reduction_threshold - self.reduction_metric = reduction_metric - self.minimize_loss = minimize_loss + self.reduction_method = kwargs['reduction_method'] + self.reduction_interval = kwargs['reduction_interval'] + self.reduction_window = kwargs['reduction_window'] + self.reduction_threshold = kwargs['reduction_threshold'] + self.reduction_metric = kwargs['reduction_metric'] + self.minimize_loss = kwargs['minimize_loss'] # display - self.disable_progress_bar = disable_progress_bar - self.print_params = print_params + self.disable_progress_bar = kwargs['disable_progress_bar'] + self.print_params = kwargs['print_params'] # performance - self.clear_session = clear_session - self.save_weights = save_weights + self.clear_session = kwargs['clear_session'] + self.save_weights = kwargs['save_weights'] # distributed configurations - self.config = config + self.config = kwargs['config'] self.save_timestamp = time.strftime('%D%H%M%S').replace('/', '') - if isinstance(config, str): - with open(config, 'r') as f: + if isinstance(self.config, str): + with open(self.config, 'r') as f: self.config_data = json.load(f) - elif isinstance(config, dict): - self.config_data = config - with open('tmp/remote_config.json', 'w') as outfile: - json.dump(self.config_data, outfile, indent=2) - else: - TypeError('`config` must be dict or filename string.') + TypeError('Pass the correct `config` path') if 'finished_scan_run' in self.config_data.keys(): del self.config_data['finished_scan_run'] config = self.config_data - - with open("tmp/arguments_remote.json" ,"r") as f: - arguments_dict=json.load(f) - - self.stage=arguments_dict["stage"] - + + with open("/tmp/arguments_remote.json", "r") as f: + arguments_dict = json.load(f) + + self.stage = arguments_dict["stage"] + if 'run_central_node' in config.keys(): run_central_node = config['run_central_node'] else: @@ -134,9 +107,8 @@ def __init__(self, # create the threadpool threads = [] - args = ([self, update_db_n_seconds, current_machine_id,self.stage]) + args = ([self, update_db_n_seconds, current_machine_id, self.stage]) thread = threading.Thread(target=update_db, args=args) - thread.start() threads.append(thread) diff --git a/jako/distribute/distribute_database.py b/jako/distribute/distribute_database.py index e8e08f5c..746e49e3 100644 --- a/jako/distribute/distribute_database.py +++ b/jako/distribute/distribute_database.py @@ -1,6 +1,9 @@ import time -import pandas as pd -from .distribute_utils import read_config, write_config, 
add_experiment_id, fetch_latest_file +from .distribute_utils import read_config, write_config +from .distribute_utils import add_experiment_id, add_timestamp +from .distribute_utils import fetch_latest_file +import sys + def get_db_object(self): config = self.config_data @@ -15,7 +18,7 @@ def get_db_object(self): for machine in machine_config: if int(machine['machine_id']) == host_machine_id: - host = machine['TALOS_IP_ADDRESS'] + host = machine['JAKO_IP_ADDRESS'] break port = db_config['DB_PORT'] @@ -34,9 +37,9 @@ def get_db_object(self): encoding=encoding) return db - -def update_db(self, update_db_n_seconds, current_machine_id,stage): - + + +def update_db(self, update_db_n_seconds, current_machine_id, stage): '''Make changes to the datastore based on a time interval Parameters @@ -45,22 +48,38 @@ def update_db(self, update_db_n_seconds, current_machine_id,stage): Returns ------- - db | Database object | Database object with engine + db | Database object | Database object with engine ''' # update the database every n seconds db = get_db_object(self) config = self.config_data + def __start_upload(results_data): + if len(results_data) > 0: + db_cols = db.return_columns() + df_cols = results_data.columns + missing_columns = [col for col in db_cols if col not in df_cols] + new_columns = [col for col in df_cols if col not in db_cols] + + if len(missing_columns) > 0: + exception_str = '''You have to change the experiment_name or + add at least value for {} + into the input parameter'''.format(missing_columns) + raise Exception(exception_str) + + if len(new_columns) > 0: + db.add_new_columns(new_columns) + db.write_to_db(results_data) return db start_time = int(self.save_timestamp) start_row = 0 - end_row = 0 + end_row = 0 while True: @@ -79,50 +98,44 @@ def __start_upload(results_data): continue if len(results_data) > 0: - start_row=end_row - end_row=len(results_data) - - if start_row!=end_row and end_row>start_row: - - results_data = add_experiment_id(self, - results_data, + start_row = end_row + end_row = len(results_data) + + if start_row != end_row and end_row > start_row: + results_data = add_timestamp(self, results_data) + results_data = add_experiment_id(self, + results_data, current_machine_id, start_row, end_row, db, stage) - + __start_upload(results_data) - - if int(current_machine_id) == 0: - remote = False - else: - remote = True - - new_config = read_config(self,remote) + + new_config = read_config(self) if 'finished_scan_run' in new_config.keys(): results_data = fetch_latest_file(self) - + start_row = end_row end_row = len(results_data) - if start_row != end_row and end_row>start_row: - + if start_row != end_row and end_row > start_row: + results_data = add_timestamp(self, results_data) results_data = add_experiment_id(self, - results_data, + results_data, current_machine_id, start_row, end_row, db, stage) - + __start_upload(results_data) write_config(self, new_config) - print('Scan Run Finished in machine id : ' + current_machine_id) - exit() + sys.exit() else: @@ -131,5 +144,3 @@ def __start_upload(results_data): else: print('Database credentials not given.') - - diff --git a/jako/distribute/distribute_finish.py b/jako/distribute/distribute_finish.py new file mode 100644 index 00000000..1c06072d --- /dev/null +++ b/jako/distribute/distribute_finish.py @@ -0,0 +1,70 @@ +def distribute_finish(self): + import pandas as pd + import os + import json + import numpy as np + + attrs_final = ['data', 'x', 'y', 'learning_entropy', 'round_times', + 'params', 'saved_models', 
'saved_weights', 'round_history', + 'details'] + + keys = list(self.__dict__.keys()) + for key in keys: + if key not in attrs_final: + delattr(self, key) + + from talos.scan.scan_addon import func_best_model, func_evaluate + self.best_model = func_best_model.__get__(self) + self.evaluate_models = func_evaluate.__get__(self) + + all_filenames = ['/tmp/' + file for file in os.listdir('/tmp/')] + + scan_data_list = [] + scan_details_list = [] + scan_learning_entropy_list = [] + scan_round_times_list = [] + scan_saved_models_dict = {} + scan_round_history_dict = {} + scan_saved_weights_dict = {} + + for file in all_filenames: + if file.endswith('scan_data.csv'): + df = pd.read_csv(file) + scan_data_list.append(df) + + if file.endswith('scan_details.csv'): + df = pd.read_csv(file) + scan_details_list.append(df) + + if file.endswith('scan_learning_entropy.csv'): + df = pd.read_csv(file) + scan_learning_entropy_list.append(df) + + if file.endswith('scan_round_times.csv'): + df = pd.read_csv(file) + scan_round_times_list.append(df) + + if file.endswith('scan_saved_models.json'): + with open(file, 'r') as f: + scan_saved_model = json.load(f) + scan_saved_models_dict.update(scan_saved_model) + + if file.endswith('scan_round_history.npy'): + scan_round_history = np.load(file, allow_pickle=True) + keyname = os.path.basename(file).replace(".npy", '') + scan_round_history_dict[keyname] = scan_round_history + + if file.endswith('scan_saved_weights.npy'): + scan_saved_weights = np.load(file, allow_pickle=True) + keyname = os.path.basename(file).replace(".npy", '') + scan_saved_weights_dict[keyname] = scan_saved_weights + + self.data = pd.concat(scan_data_list) + self.details = pd.concat(scan_details_list) + self.learning_entropy = pd.concat(scan_learning_entropy_list) + self.round_times = pd.concat(scan_round_times_list) + self.saved_models = scan_saved_models_dict + self.round_history = scan_round_history_dict + self.saved_weights = scan_saved_weights_dict + + return self diff --git a/jako/distribute/distribute_params.py b/jako/distribute/distribute_params.py index a5044399..19f84d80 100644 --- a/jako/distribute/distribute_params.py +++ b/jako/distribute/distribute_params.py @@ -1,9 +1,8 @@ from talos import Scan -from .distribute_utils import read_config, write_config +from .distribute_utils import read_config, write_config, write_scan_namespace def create_param_space(self, n_splits): - '''Creates the parameter space for each machine Parameters @@ -12,7 +11,7 @@ def create_param_space(self, n_splits): Returns ------- - param_grid | `list` of `dict` | Split parameter spaces for each machine to run. + param_grid | `list` of `dict` | Split parameter spaces for each machine. ''' @@ -40,22 +39,22 @@ def create_param_space(self, n_splits): def run_scan(self, machines, run_central_node, machine_id): - '''Run `talos.Scan()` on each machine. Parameters ---------- machines | int | - run_central_node | bool | + run_central_node | bool | Returns ------- None. 
''' - + # runs Scan in a machine after param split machine_id = int(machine_id) + current_machine_id = machine_id if not run_central_node: if machine_id != 0: @@ -64,6 +63,8 @@ def run_scan(self, machines, run_central_node, machine_id): split_params = create_param_space(self, n_splits=machines) split_params = split_params.param_spaces[machine_id] + print('Started experiment in machine id : {}'.format(current_machine_id)) + scan_object = Scan(x=self.x, y=self.y, params=split_params, @@ -90,14 +91,13 @@ def run_scan(self, machines, run_central_node, machine_id): clear_session=self.clear_session, save_weights=self.save_weights) + print('Completed experiment in machine id : {}'.format(current_machine_id)) + new_config = read_config(self) new_config['finished_scan_run'] = True - - if machine_id == 0: + + if int(current_machine_id) == 0: new_config['current_machine_id'] = 0 - remote=False - - else: - remote=True - write_config(self, new_config,remote) + write_config(self, new_config) + write_scan_namespace(self, scan_object, current_machine_id) diff --git a/jako/distribute/distribute_run.py b/jako/distribute/distribute_run.py index ccb062ac..70b46283 100644 --- a/jako/distribute/distribute_run.py +++ b/jako/distribute/distribute_run.py @@ -1,13 +1,13 @@ import json import threading +import os from .distribute_params import run_scan from .distribute_utils import return_current_machine_id, ssh_connect -from .distribute_utils import ssh_file_transfer, ssh_run +from .distribute_utils import ssh_file_transfer, ssh_run, ssh_get_files from .distribute_database import update_db def run_central_machine(self, n_splits, run_central_node): - '''Runs `talos.Scan()` in the central machine. Parameters @@ -21,12 +21,11 @@ def run_central_machine(self, n_splits, run_central_node): ''' machine_id = 0 - + run_scan(self, n_splits, run_central_node, machine_id) def distribute_run(self): - ''' Parameters @@ -64,15 +63,15 @@ def distribute_run(self): clients = ssh_connect(self) for machine_id, client in clients.items(): - + new_config = config new_config['current_machine_id'] = machine_id - - with open('tmp/remote_config.json', 'w') as outfile: + + with open('/tmp/remote_config.json', 'w') as outfile: json.dump(new_config, outfile) - + ssh_file_transfer(self, client, machine_id) - + # create the threads threads = [] @@ -83,13 +82,13 @@ def distribute_run(self): thread.start() threads.append(thread) - args = ([self, update_db_n_seconds, current_machine_id,self.stage]) + args = ([self, update_db_n_seconds, current_machine_id, self.stage]) thread = threading.Thread(target=update_db, args=args) thread.start() threads.append(thread) for machine_id, client in clients.items(): - + args = (self, client, machine_id) thread = threading.Thread(target=ssh_run, args=args) thread.start() @@ -97,3 +96,13 @@ def distribute_run(self): for t in threads: t.join() + + for file in os.listdir('/tmp/'): + if file.startswith('machine_id'): + os.remove('/tmp/' + file) + + for machine_id, client in clients.items(): + ssh_get_files(self, client, machine_id) + + from .distribute_finish import distribute_finish + self = distribute_finish(self) diff --git a/jako/distribute/distribute_utils.py b/jako/distribute/distribute_utils.py index 1a245c4e..841ec642 100644 --- a/jako/distribute/distribute_utils.py +++ b/jako/distribute/distribute_utils.py @@ -2,6 +2,7 @@ import paramiko import os import pandas as pd +import datetime def create_temp_file(self): @@ -11,14 +12,14 @@ def create_temp_file(self): import json import pickle 
-x=np.load('tmp/x_data_remote.npy') -y=np.load('tmp/y_data_remote.npy') - +x=np.load('/tmp/x_data_remote.npy') +y=np.load('/tmp/y_data_remote.npy') + {} -with open('tmp/arguments_remote.json','r') as f: +with open('/tmp/arguments_remote.json','r') as f: arguments_dict=json.load(f) - + t=RemoteScan(x=x, y=y, params=arguments_dict['params'], @@ -44,16 +45,16 @@ def create_temp_file(self): print_params=arguments_dict['print_params'], clear_session=arguments_dict['clear_session'], save_weights=arguments_dict['save_weights'], - config='tmp/remote_config.json' + config='/tmp/remote_config.json' ) '''.format(self.model_func, self.model_name) - with open("tmp/scanfile_remote.py", "w") as f: + with open("/tmp/scanfile_remote.py", "w") as f: f.write(filestr) def return_current_machine_id(self,): - ''' return machine id after checking the ip from config''' + ''' Return machine id after checking the ip from config''' current_machine_id = 0 if 'current_machine_id' in self.config_data.keys(): @@ -63,7 +64,7 @@ def return_current_machine_id(self,): def return_central_machine_id(self): - ''' return central machine id as mentioned in config''' + ''' Return central machine id as mentioned in config''' central_id = 0 config_data = self.config_data if 'database' in config_data.keys(): @@ -71,93 +72,85 @@ def return_central_machine_id(self): return central_id -def read_config(self,remote=False): - +def read_config(self): '''Read config from file''' - - if remote: - config_path="tmp/remote_config.json" - - else: - config_path="config.json" - + + config_path = "/tmp/remote_config.json" + with open(config_path, 'r') as f: config_data = json.load(f) - + return config_data -def write_config(self, new_config,remote=False): - - ''' write config to file''' - - if remote: - config_path="tmp/remote_config.json" - - else: - config_path="config.json" - +def write_config(self, new_config): + ''' Write config to file''' + + config_path = "/tmp/remote_config.json" + with open(config_path, 'w') as outfile: json.dump(new_config, outfile, indent=2) def ssh_connect(self): - ''' Returns ------- clients | `list` | List of client objects of machines after connection. 
''' - + configs = self.config_data['machines'] clients = {} - + for config in configs: client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - host = config['TALOS_IP_ADDRESS'] - port = config['TALOS_PORT'] - username = config['TALOS_USER'] - - if 'TALOS_PASSWORD' in config.keys(): - password = config['TALOS_PASSWORD'] + host = config['JAKO_IP_ADDRESS'] + port = config['JAKO_PORT'] + username = config['JAKO_USER'] + + if 'JAKO_PASSWORD' in config.keys(): + password = config['JAKO_PASSWORD'] client.connect(host, port, username, password) - - elif 'TALOS_KEY_FILENAME' in config.keys(): - client.connect(host, port, username, key_filename=config['TALOS_KEY_FILENAME']) + + elif 'JAKO_KEY_FILENAME' in config.keys(): + client.connect(host, port, username, + key_filename=config['JAKO_KEY_FILENAME']) clients[config['machine_id']] = client - + return clients def ssh_file_transfer(self, client, machine_id): - - '''transfer the current talos script to the remote machines''' - + '''Transfer the current talos script to the remote machines''' + create_temp_file(self) sftp = client.open_sftp() try: sftp.chdir(self.dest_dir) # Test if dest dir exists - + except IOError: sftp.mkdir(self.dest_dir) # Create dest dir sftp.chdir(self.dest_dir) - for file in os.listdir("tmp"): - sftp.put("tmp/"+file, file) + data_files = ['y_data_remote.npy', 'x_data_remote.npy'] + scan_script_files = ['scanfile_remote.py'] + additional_scan_files = ['remote_config.json', 'arguments_remote.json'] + scan_filenames = data_files + scan_script_files + additional_scan_files + for file in os.listdir("/tmp/"): + if file in scan_filenames: + sftp.put("/tmp/" + file, file) - sftp.put('tmp/remote_config.json', 'remote_config.json') sftp.close() def ssh_run(self, client, machine_id): - '''Run the transmitted script remotely without args and show its output. - + Parameters ---------- client | `Object` | paramiko ssh client object @@ -169,27 +162,44 @@ def ssh_run(self, client, machine_id): None. 
''' + execute_str = 'python3 /tmp/scanfile_remote.py' + stdin, stdout, stderr = client.exec_command(execute_str) - stdin, stdout, stderr = client.exec_command('python3 tmp/scanfile_remote.py') - if stderr: for line in stderr: try: # Process each error line in the remote output print(line) - except: - print('Cannot Output error') + except Exception as e: + print(e) for line in stdout: try: # Process each line in the remote output print(line) - except: - print('Cannot Output error') + except Exception as e: + print(e) -def fetch_latest_file(self): +def ssh_get_files(self, client, machine_id): + '''Get files via ssh from a machine''' + sftp = client.open_sftp() + + scan_object_filenames = ('scan_details.csv', 'scan_learning_entropy.csv', + 'scan_round_history.npy', 'scan_round_times.csv', + 'scan_saved_models.json', 'scan_saved_weights.npy', + 'scan_data.csv') + + sftp.chdir(self.dest_dir) + for file in sftp.listdir(self.dest_dir): + if file.endswith(scan_object_filenames): + sftp.get(self.dest_dir + file, '/tmp/' + file) + + sftp.close() + + +def fetch_latest_file(self): '''Fetch the latest csv for an experiment''' experiment_name = self.experiment_name @@ -198,52 +208,139 @@ def fetch_latest_file(self): if not os.path.exists(experiment_name): return [] - filelist = [ - os.path.join(experiment_name, i) - for i in os.listdir(experiment_name) - if i.endswith('.csv') and int(i.replace('.csv', '')) >= int(save_timestamp) - ] + filelist = [] + for file in os.listdir(experiment_name): + if file.endswith('.csv'): + if int(file.replace('.csv', '')) >= int(save_timestamp): + latest_file = os.path.join(experiment_name, file) + filelist.append(latest_file) if filelist: latest_filepath = max(filelist, key=os.path.getmtime) + try: results_data = pd.read_csv(latest_filepath) return results_data except Exception as e: - return [] + e = str(e) + allowed_exception = 'No columns to parse from file' + + if allowed_exception in e: + return [] + else: + raise Exception(e) else: return [] - -def get_experiment_stage(self,db): - + + +def add_timestamp(self, results_data): + '''Adds timestamp to the DataFrame''' + + ct = datetime.datetime.now() + hour = ct.hour + minute = ct.minute + day = ct.day + month = ct.month + year = ct.year + + if minute < 10: + minute = '0' + str(minute) + + if hour < 10: + hour = '0' + str(hour) + + timestamp = "{}:{}/{}-{}-{}".format(hour, minute, day, month, year) + results_data["timestamp"] = [timestamp] * len(results_data) + + return results_data + + +def get_experiment_stage(self, db): + '''Get the current number of times of experiment run''' + try: ids = db.return_existing_experiment_ids() - stage=int(list(ids)[-1].split("-")[0])+1 - + stage = int(list(ids)[-1].split("-")[0]) + 1 + except Exception as e: - stage=0 - + allowed_exception = '(psycopg2.errors.UndefinedTable)' + e = str(e) + if allowed_exception in e: + pass + else: + raise Exception(e) + + stage = 0 + return stage - -def add_experiment_id(self, results_data, machine_id,start_row,end_row,db,stage): +def add_experiment_id(self, results_data, machine_id, start_row, + end_row, db, stage): '''Generate experiment id from model id and row number''' try: ids = db.return_existing_experiment_ids() if "experiment_id" in results_data.columns: - results_data = results_data[ - ~results_data['experiment_id'].isin(ids) - ] - + results_data = results_data[~results_data['experiment_id'].isin(ids + )] + except Exception as e: - pass - - results_data=results_data.iloc[start_row:end_row] - results_data['experiment_id'] = [ - str(stage)+"-"+ 
str(machine_id)+"-"+str(i) - for i in range(start_row,end_row) - ] - + allowed_exception = '(psycopg2.errors.UndefinedTable)' + e = str(e) + if allowed_exception in e: + pass + else: + raise Exception(e) + + results_data = results_data.iloc[start_row:end_row] + experiment_ids = [] + for i in range(start_row, end_row): + experiment_id = str(stage) + "-" + str(machine_id) + "-" + str(i) + experiment_ids.append(experiment_id) + results_data["experiment_id"] = experiment_ids + return results_data + + +def write_scan_namespace(self, scan_object, machine_id): + ''' + + Parameters + ---------- + scan_object | talos.Scan object | Scan object after Scan run + + Returns + ------- + None. + + ''' + import pandas as pd + import json + import numpy as np + import os + + write_path = os.path.join('/tmp/', 'machine_id_' + str(machine_id) + '_') + scan_details = scan_object.details + scan_data = scan_object.data + scan_learning_entropy = scan_object.learning_entropy + scan_round_history = scan_object.round_history + scan_round_times = scan_object.round_times + scan_saved_models = scan_object.saved_models + scan_saved_weights = scan_object.saved_weights + + details_df = pd.DataFrame({'scan_details': scan_details}) + details_df.to_csv(write_path + 'scan_details.csv') + + scan_data.to_csv(write_path + 'scan_data.csv') + scan_learning_entropy.to_csv(write_path + 'scan_learning_entropy.csv') + scan_round_times.to_csv(write_path + 'scan_round_times.csv') + + np.save(write_path + 'scan_round_history.npy', scan_round_history) + + with open(write_path + 'scan_saved_models.json', 'w') as f: + scan_saved_models = {'saved_models_machine_id_' + str(machine_id): + scan_saved_models} + json.dump(scan_saved_models, f, indent=2) + + np.save(write_path + 'scan_saved_weights.npy', scan_saved_weights) diff --git a/jako/distribute/script_string.py b/jako/distribute/script_string.py index 5fa47091..c688ebe6 100644 --- a/jako/distribute/script_string.py +++ b/jako/distribute/script_string.py @@ -1,15 +1,15 @@ def script_script(): - + out = ''' from jako import RemoteScan import numpy as np import json import pickle -x=np.load('tmp/x_data_remote.npy') -y=np.load('tmp/y_data_remote.npy') +x=np.load('/tmp/x_data_remote.npy') +y=np.load('/tmp/y_data_remote.npy') {} -with open('tmp/arguments_remote.json','r') as f: +with open('/tmp/arguments_remote.json','r') as f: arguments_dict=json.load(f) t=RemoteScan(x=x, @@ -37,7 +37,7 @@ def script_script(): print_params=arguments_dict['print_params'], clear_session=arguments_dict['clear_session'], save_weights=arguments_dict['save_weights'], - config='tmp/remote_config.json' + config='/tmp/remote_config.json' )''' - + return out diff --git a/requirements.txt b/requirements.txt index f742ae9d..d7a28bb0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,6 @@ talos numpy pandas paramiko +psycopg2-binary sqlalchemy sqlalchemy_utils diff --git a/setup.py b/setup.py index 567f0870..6a3a7f84 100755 --- a/setup.py +++ b/setup.py @@ -4,9 +4,9 @@ DESCRIPTION = 'Distributed Hyperparameter Experiments with Talos' LONG_DESCRIPTION = '''\ -Jako makes it straightforward to distribute Talos -experiments across one or more remote machines -without asking you to change anything in the way you are +Jako makes it straightforward to distribute Talos +experiments across one or more remote machines +without asking you to change anything in the way you are already working with Talos. 
''' @@ -16,7 +16,7 @@ URL = 'http://autonom.io' LICENSE = 'MIT' DOWNLOAD_URL = 'https://github.com/autonomio/jako/' -VERSION = '0.1' +VERSION = '0.1.1' try: @@ -30,6 +30,7 @@ 'numpy', 'pandas', 'paramiko', + 'psycopg2-binary', 'sqlalchemy', 'sqlalchemy_utils']