The SageMaker PySpark SDK provides a PySpark interface to Amazon SageMaker, allowing customers to train using the Spark Estimator API, host their model on Amazon SageMaker, and make predictions with their model using the Spark Transformer API. You can find the latest documentation at Read the Docs.
sagemaker_pyspark works with python 2.7 and python 3.x. To install it, use pip:
$ pip install sagemaker_pyspark
You can also install sagemaker_pyspark from source:
$ git clone https://github.com/aws/sagemaker-spark.git
$ cd sagemaker-spark/sagemaker-pyspark-sdk
$ python setup.py install
Next, set up credentials (e.g. in ~/.aws/credentials):
[default]
aws_access_key_id = YOUR_KEY
aws_secret_access_key = YOUR_KEY
Then, set up a default region (e.g. in ~/.aws/config):
[default]
region=us-west-2
Then, to load the sagemaker jars programmatically:
import sagemaker_pyspark
from pyspark.sql import SparkSession
classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath).getOrCreate()
Alternatively, pass the jars to your pyspark job via the --jars flag:
$ spark-submit --jars `bin/sagemakerpyspark-jars`
If you want to play around in interactive mode, the pyspark shell can be used too:
$ pyspark --jars `bin/sagemakerpyspark-jars`
You can also use the --packages flag and pass in the Maven coordinates for SageMaker Spark:
$ pyspark --packages com.amazonaws:sagemaker-spark_2.11:spark_2.1.1-1.0
In PySpark, we recommend using "s3://" to access the EMR File System (EMRFS) on EMR and "s3a://" to access the S3A file system in other environments. Examples:
data_s3 = spark.read.format("libsvm").load("s3://some-bucket/some-prefix")
data_s3a = spark.read.format("libsvm").load("s3a://some-bucket/some-prefix")
A KMeansSageMakerEstimator runs a training job using the Amazon SageMaker KMeans algorithm upon invocation of fit(), returning a SageMakerModel.
from pyspark import SparkConf
from pyspark.sql import SparkSession
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import KMeansSageMakerEstimator

# Load the sagemaker_pyspark classpath. If you used --jars to submit your job
# there is no need to do this in code.
conf = (SparkConf()
        .set("spark.driver.extraClassPath", ":".join(classpath_jars())))
spark = SparkSession.builder.config(conf=conf).getOrCreate()

iam_role = "arn:aws:iam::0123456789012:role/MySageMakerRole"
region = "us-east-1"

training_data = (spark.read.format("libsvm").option("numFeatures", "784")
                 .load("s3a://sagemaker-sample-data-{}/spark/mnist/train/".format(region)))

test_data = (spark.read.format("libsvm").option("numFeatures", "784")
             .load("s3a://sagemaker-sample-data-{}/spark/mnist/test/".format(region)))
kmeans_estimator = KMeansSageMakerEstimator(
    trainingInstanceType="ml.m4.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m4.xlarge",
    endpointInitialInstanceCount=1,
    sagemakerRole=IAMRole(iam_role))
kmeans_estimator.setK(10)
kmeans_estimator.setFeatureDim(784)
kmeans_model = kmeans_estimator.fit(training_data)
transformed_data = kmeans_model.transform(test_data)
transformed_data.show()
The SageMakerEstimator expects an input DataFrame with a column named "features" that holds a Spark ML Vector. The estimator also serializes a "label" column of Doubles if present. Other columns are ignored. The dimension of this input vector should be equal to the feature dimension given as a hyperparameter.
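If your data is not already in this shape, you can assemble it with Spark ML's VectorAssembler. The sketch below is purely illustrative: the raw_df DataFrame and its column names (x1, x2, x3, target) are made up and not part of the example above.
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# A tiny, made-up DataFrame standing in for your raw data.
raw_df = spark.createDataFrame(
    [(1.0, 2.0, 3.0, 0.0), (4.0, 5.0, 6.0, 1.0)],
    ["x1", "x2", "x3", "target"])

# Pack the numeric columns into the single "features" Vector column the
# estimator expects, and cast the target into the optional Double "label" column.
assembler = VectorAssembler(inputCols=["x1", "x2", "x3"], outputCol="features")
prepared_df = (assembler.transform(raw_df)
               .withColumn("label", col("target").cast("double"))
               .select("features", "label"))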
The Amazon SageMaker KMeans algorithm accepts many parameters, but K (the number of clusters) and FeatureDim (the number of features per Row) are required.
You can set other hyperparameters. For details on them, run:
kmeans_estimator.explainParams()
After training is complete, an Amazon SageMaker Endpoint is created to host the model and serve predictions. Upon invocation of transform(), the SageMakerModel predicts against the hosted model. Like the SageMakerEstimator, the SageMakerModel expects an input DataFrame with a column named "features" that holds a Spark ML Vector equal in dimension to the value of the FeatureDim parameter.
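Note that the resources created on your behalf (the SageMaker model, endpoint configuration and endpoint) are not deleted automatically after transform() and continue to incur cost. A minimal cleanup sketch, assuming the SageMakerResourceCleanup helper and the sagemakerClient/getCreatedResources accessors exposed by sagemaker_pyspark; verify the exact names against the API documentation for your version:
from sagemaker_pyspark import SageMakerResourceCleanup

# Delete the endpoint, endpoint config and model created by fit()/transform().
resource_cleanup = SageMakerResourceCleanup(kmeans_model.sagemakerClient)
resource_cleanup.deleteResources(kmeans_model.getCreatedResources())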
You can view the PySpark API Documentation for SageMaker Spark here.
A XGBoostSageMakerEstimator runs a training job using the Amazon SageMaker XGBoost algorithm upon invocation of fit(), returning a SageMakerModel.
from pyspark import SparkConf
from pyspark.sql import SparkSession
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator

# Load the sagemaker_pyspark classpath. If you used --jars to submit your job
# there is no need to do this in code.
conf = (SparkConf()
        .set("spark.driver.extraClassPath", ":".join(classpath_jars())))
spark = SparkSession.builder.config(conf=conf).getOrCreate()

iam_role = "arn:aws:iam::0123456789012:role/MySageMakerRole"
region = "us-east-1"

training_data = (spark.read.format("libsvm").option("numFeatures", "784")
                 .load("s3a://sagemaker-sample-data-{}/spark/mnist/train/".format(region)))

test_data = (spark.read.format("libsvm").option("numFeatures", "784")
             .load("s3a://sagemaker-sample-data-{}/spark/mnist/test/".format(region)))
xgboost_estimator = XGBoostSageMakerEstimator(
    trainingInstanceType="ml.m4.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m4.xlarge",
    endpointInitialInstanceCount=1,
    sagemakerRole=IAMRole(iam_role))
xgboost_estimator.setObjective('multi:softmax')
xgboost_estimator.setNumRound(25)
xgboost_estimator.setNumClasses(10)
xgboost_model = xgboost_estimator.fit(training_data)
transformed_data = xgboost_model.transform(test_data)
transformed_data.show()
The SageMakerEstimator expects an input DataFrame with a column named "features" that holds a Spark ML Vector. The estimator also serializes a "label" column of Doubles if present. Other columns are ignored.
The Amazon SageMaker XGBoost algorithm accepts many parameters. Objective (the learning objective of your model, in this case multi-class classification) and NumRounds (the number of rounds to perform tree boosting on) are required. For multi-class classification NumClasses (the number of classes to classify the data into) is required as well.
You can set other hyperparameters. For details on them, run:
xgboost_estimator.explainParams()
After training is complete, an Amazon SageMaker Endpoint is created to host the model and serve predictions. Upon invocation of transform(), the SageMakerModel predicts against the hosted model.
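To inspect the results, select the prediction output next to the original labels. The "prediction" column name below is an assumption about the default XGBoost response schema; run transformed_data.printSchema() to confirm the column names in your version.
# Print the output schema, then compare predictions with the original labels.
transformed_data.printSchema()
transformed_data.select("label", "prediction").show(5)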
You can view the PySpark API Documentation for SageMaker Spark here.
sagemaker_pyspark comes pre-installed in the SageMaker Notebook Environment. There are two use cases that we support:
- running on local spark
- connecting to an EMR spark cluster
Create a notebook using the conda_python2 or conda_python3 kernels. Then you can initialize a spark context the same way it is described in the QuickStart section:
import sagemaker_pyspark
from pyspark.sql import SparkSession
classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath).getOrCreate()
Note: Make sure your SageMaker Notebook instance can talk to your EMR Cluster. This means:
- They are in the same VPC.
- The EMR Cluster Security Group allows ingress on TCP port 8998 from the SageMaker Notebook Security Group.
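If you prefer to script this rather than use the console, the ingress rule can be added with the AWS CLI. The two security group IDs below are placeholders for your EMR master and Notebook instance security groups:
$ aws ec2 authorize-security-group-ingress \
    --group-id sg-EMR-MASTER-ID \
    --protocol tcp --port 8998 \
    --source-group sg-NOTEBOOK-ID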
sagemaker_pyspark works with EMR 5.8.0 (which runs Spark 2.2). To install sagemaker_pyspark on EMR:
Create a bootstrap script to install sagemaker_pyspark in your new EMR cluster:
#!/bin/bash
sudo pip install sagemaker_pyspark
sudo /usr/bin/pip-3.4 install sagemaker_pyspark
Upload this script to an S3 bucket:
$ aws s3 cp bootstrap.sh s3://your-bucket/prefix/
In the AWS Console, launch a new EMR Spark cluster and set s3://your-bucket/prefix/bootstrap.sh as the bootstrap script. Make sure to:
- Run the Cluster in the same VPC as your SageMaker Notebook Instance.
- Provide an SSH Key that you have access to, as there will be some manual configuration required.
Once the cluster is launched, log in to the master node:
$ ssh -i /path/to/ssh-key.pem hadoop@your-emr-cluster-public-dns
Create a backup of the default spark configuration:
$ cd /usr/lib/spark/conf
$ sudo cp spark-defaults.conf spark-defaults.conf.bk
Grab the EMR classpath from the installed sagemaker_pyspark:
$ sagemakerpyspark-emr-jars
The output will be a ":"-separated list of jar files. Copy the output and append it to the spark.driver.extraClassPath and spark.executor.extraClassPath entries of spark-defaults.conf. Make sure that there is a ":" after the original classpath before you paste the sagemaker_pyspark classpath.
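The end result should look roughly like the lines below (illustrative only; the pre-existing classpath values differ between EMR releases):
spark.driver.extraClassPath    <existing driver classpath>:<output of sagemakerpyspark-emr-jars>
spark.executor.extraClassPath  <existing executor classpath>:<output of sagemakerpyspark-emr-jars>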
Before proceeding to configure your Notebook instance, open port 8998 to allow ingress from the security group of the Notebook instance.
Open a terminal session in your notebook: new->terminal
Copy the default sparkmagic config. You can download it in your terminal using:
$ wget https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/master/sparkmagic/example_config.json
In the kernel_python_credentials section, replace the url with http://your-cluster-private-dns-name:8998.
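For orientation, the relevant block of the downloaded file looks roughly like this (field names come from the sparkmagic example config and may differ between versions; your downloaded copy is authoritative):
"kernel_python_credentials" : {
  "username": "",
  "password": "",
  "url": "http://your-cluster-private-dns-name:8998"
}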
Override the default sparkmagic config:
$ cp example_config.json ~/.sparkmagic/config.json
Launch a notebook using either the pyspark2 or pyspark3 kernel. As soon as you try to run any code block, the notebook will connect to your spark cluster and get a SparkSession for you.
Since sagemaker_pyspark depends on the Scala spark modules, you need to be able to build those. Follow the instructions here.
For the python side, assuming that you have python and virtualenv installed, set up your environment and install the required dependencies like this instead of the pip install sagemaker_pyspark defined above:
$ git clone https://github.com/aws/sagemaker-spark.git
$ cd sagemaker-spark/sagemaker-pyspark-sdk/
$ virtualenv venv
....
$ . venv/bin/activate
$ pip install -r requirements.txt
$ pip install -e .
Our recommended way of running the tests is using pyenv + pyenv-virtualenv. This allows you to test on different python versions, and to test the installed distribution instead of your local files.
Install pyenv, pyenv-virtualenv and pyenv-virtualenvwrapper
You can do this on OSX using brew:
$ brew install pyenv pyenv-virtualenv pyenv-virtualenvwrapper
For Linux, follow the steps in each package's README, or install them from your distribution's package repositories if available.
Make sure to add the pyenv and pyenv-virtualenv init functions to your corresponding shell init file (.bashrc, .zshrc, etc.):
eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"
Start a new shell once you do that to pick up your changes.
Set up the python versions we need. At the moment we are testing with python 2.7, 3.5 and 3.6, so we need to install these versions:
$ pyenv install 2.7.10
$ pyenv install 3.5.2
$ pyenv install 3.6.2
Set them as global versions:
$ pyenv global 2.7.10 3.5.2 3.6.2
Verify they show up when you do:
$ pyenv versions
Restart your shell and run the command again to verify that it persists across shell sessions.
Now we just need to install tox to run our tests:
$ pip install tox
Run the tests with:
$ tox
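To run a single interpreter rather than the whole matrix, pass an environment name to tox. The py36 name below is an assumption about the environments defined in tox.ini:
$ tox -e py36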