Are you a data scientist, engineer, or researcher just getting into distributed processing with PySpark? Chances are you’ll want to run some of the popular Python libraries that everybody is talking about, like Matplotlib.
If so, you may have noticed that it's not as simple as installing it on your local machine and submitting jobs to the cluster. In order for the Spark executors to access these libraries, they have to live on each of the Spark worker nodes.
You could go through and manually install each of these environments using pip, but maybe you also want the ability to use multiple versions of Python or other libraries like pandas? Maybe you also want to allow other colleagues to specify their own environments and combinations?
If this is the case, then you should be looking toward Conda environments to provide specialized and personalized Python configurations that are accessible to your Python programs. Conda is a package and environment manager: it keeps track of Conda packages (tarballs containing Python or other libraries) and maintains the dependencies between packages and the platform.
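A quick way to see which environment a Python program is actually running in is to inspect sys.executable and sys.prefix, a minimal sketch (the /opt/miniconda3 path shown in the comment is the install location used later in this post):

```python
import sys

# From an activated Conda environment, sys.executable points inside that
# environment's directory, e.g. /opt/miniconda3/envs/<env-name>/bin/python,
# and sys.prefix points at the environment root.
print(sys.executable)
print(sys.prefix)
```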
For this blog, we’ll focus on submitting jobs from spark-submit. In a later iteration of this blog, we’ll cover how to use these environments from a notebook like Apache Zeppelin.
If you have a larger cluster, I recommend using a tool like pssh (parallel SSH) to automate these steps across all nodes.
To begin, we’ll download and run the Miniconda installer for Linux (64-bit) on each node where Apache Spark executors will run. Please make sure, before beginning the install, that you have the bzip2 library installed on all hosts:
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -P /tmp/
bash /tmp/Miniconda3-latest-Linux-x86_64.sh
I recommend choosing /opt/miniconda3/ as the install directory, and, when the install completes, you’ll need to close and reopen your terminal session.
If your install is successful, you should be able to run ‘conda list’ and see the following packages:
conda list
# packages in environment at /opt/miniconda3:
#
asn1crypto    0.22.0    py36_0
cffi          1.10.0    py36_0
conda         4.3.21    py36_0
conda-env     2.6.0     0
cryptography  1.8.1     py36_0
idna          2.5       py36_0
libffi        3.2.1     1
openssl       1.0.2l    0
packaging     16.8      py36_0
pip           9.0.1     py36_1
pycosat       0.6.2     py36_0
pycparser     2.17      py36_0
pyopenssl     17.0.0    py36_0
pyparsing     2.1.4     py36_0
python        3.6.1     2
readline      6.2       2
requests      2.14.2    py36_0
ruamel_yaml   0.11.14   py36_1
setuptools    27.2.0    py36_0
six           1.10.0    py36_0
sqlite        3.13.0    0
tk            8.5.18    0
wheel         0.29.0    py36_0
xz            5.2.2     1
yaml          0.1.6     0
zlib          1.2.8     3
Miniconda installs an initial default environment running Python 3.6.1. To make sure the installation worked, run a version command:
python -V
Python 3.6.1 :: Continuum Analytics, Inc.
To explain what’s going on here: we haven’t removed the previous default version of Python, which can still be found at its default path, /bin/python. We’ve simply added new Python installations, much like Java alternatives, that we can point to while submitting jobs, without disrupting our cluster environment. See:
/bin/python -V
Python 2.7.5
Now, let’s go ahead and create a test environment with access to Python 3.5 and NumPy libraries.
First, we create the Conda environment and specify the Python version (do this as your cluster user):
conda create --name mapr_numpy python=3.5
Next, let’s go ahead and install NumPy to this environment:
conda install --name mapr_numpy numpy
Then, let’s activate this environment, and check the Python version:
source activate mapr_numpy
(mapr_numpy) [centos]# python -V
Python 3.5.3 :: Continuum Analytics, Inc.

conda info --envs
# conda environments:
#
mapr_numpy  *  /opt/miniconda3/envs/mapr_numpy
root           /opt/miniconda3
Please complete these steps for all nodes that will run PySpark code.
Let’s begin with something very simple referencing environments and checking the Python version to make sure it’s being set correctly. Here, I’ve made a tiny script that prints the Python version:
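The script itself is not reproduced in this post; a minimal spark_test.py consistent with the output shown below might be nothing more than:

```python
import sys

# spark_test.py: print the version string of the Python interpreter
# that spark-submit ends up running this script with
print(sys.version)
```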
$SPARK_HOME/bin/spark-submit spark_test.py
3.6.1 |Continuum Analytics, Inc.| (default, May 11 2017, 13:09:58)

PYSPARK_PYTHON=/bin/python $SPARK_HOME/bin/spark-submit spark_test.py
2.7.5 (default, Nov 6 2016, 00:28:07)

PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_numpy/bin/python $SPARK_HOME/bin/spark-submit spark_test.py
3.5.3 |Continuum Analytics, Inc.| (default, Mar 6 2017, 11:58:13)
Now, let’s make sure this worked!
I’ve created a little test script called spark_numpy_test.py, containing the following:
import numpy

a1 = numpy.array([1, 2, 3, 4, 5])
a1sum = a1.sum()
print(a1sum)
If I were to run this script without activating or pointing to my Conda environment with NumPy installed, I would see this error:
[mapr]$ $SPARK_HOME/bin/spark-submit --master yarn spark_numpy_test.py
Traceback (most recent call last):
  File "/mapr/my.cluster.com/user/mapr/spark_numpy_test.py", line 1, in <module>
    import numpy
ModuleNotFoundError: No module named 'numpy'
In order to get around this error, specify the Python environment in the submit statement:
[mapr]$ PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_numpy/bin/python $SPARK_HOME/bin/spark-submit --master yarn spark_numpy_test.py
15
We’re going to run through a quick example of word tokenization to demonstrate the use of Python environments with Spark on YARN.
First, we’ll create a new Conda environment, and install the NLTK library to it on all cluster nodes:
conda create --name mapr_nltk nltk python=3.5
source activate mapr_nltk
Note that some builds of PySpark are not compatible with Python 3.6, so we’ve specified an older version.
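If you want to catch this mismatch early rather than debug a failed job, a hedged startup guard along these lines can be added to a script (the 3.5 cutoff simply mirrors the environment created above):

```python
import sys

# Hedged guard: Spark builds prior to 2.1.1 are known to break on
# Python 3.6, so warn if the active interpreter is newer than 3.5
if sys.version_info[:2] > (3, 5):
    print("Warning: this interpreter may be too new for older PySpark builds")
else:
    print("Python %d.%d should be safe" % sys.version_info[:2])
```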
Next, we have to download the demo data from the NLTK repository:
(mapr_nltk) [mapr]# python -m nltk.downloader -d /mapr/my.cluster.com/user/mapr/nltk all-corpora
[nltk_data] Downloading collection 'all'
[nltk_data]    |
[nltk_data]    | Downloading package abc to
[nltk_data]    |     /mapr/my.cluster.com/user/mapr/nltk...
[nltk_data]    |   Unzipping corpora/abc.zip.
[nltk_data]    | Downloading package alpino to
[nltk_data]    |     /mapr/my.cluster.com/user/mapr/nltk...
[nltk_data]    |   Unzipping corpora/alpino.zip.
[nltk_data]    | Downloading package biocreative_ppi to
[...]
This step downloads all of the data to the directory that you specify; in this case, the default MapR-FS directory for the cluster user, which is accessible by all nodes in the cluster.
Next, create the following Python script: nltk_test.py
from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
sc = SparkContext(conf=conf)

data = sc.textFile('/user/mapr/nltk/corpora/state_union/1972-Nixon.txt')

def word_tokenize(x):
    import nltk
    nltk.data.path.append("/mapr/my.cluster.com/user/mapr/nltk/")
    return nltk.word_tokenize(x)

def pos_tag(x):
    import nltk
    nltk.data.path.append("/mapr/my.cluster.com/user/mapr/nltk/")
    return nltk.pos_tag([x])

words = data.flatMap(word_tokenize)
print(words.take(10))

pos_word = words.map(pos_tag)
print(pos_word.take(5))
Then, run the following as the cluster user to test:
[mapr]$ PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_nltk/bin/python $SPARK_HOME/bin/spark-submit --master yarn nltk_test.py
['Address', 'on', 'the', 'State', 'of', 'the', 'Union', 'Delivered', 'Before', 'a']
[[('Address', 'NN')], [('on', 'IN')], [('the', 'DT')], [('State', 'NN')], [('of', 'IN')]]
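To see why the script uses flatMap for tokenization but map for tagging, here is a plain-Python analogue of that pipeline. A naive whitespace split stands in for nltk.word_tokenize, and 'TAG' is a placeholder for a real part-of-speech tag; both are assumptions purely for illustration:

```python
lines = ["Address on the State", "of the Union"]

def word_tokenize(x):
    # stand-in for nltk.word_tokenize: naive whitespace split
    return x.split()

# flatMap: tokenize each line, then flatten all tokens into one stream
words = [w for line in lines for w in word_tokenize(line)]
print(words)

# map: one result per token; 'TAG' is a placeholder for the tag that
# nltk.pos_tag would attach
pos_word = [(w, 'TAG') for w in words]
print(pos_word[:3])
```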