Python Environments for PySpark, Part 1: Using Condas

Are you a data scientist, engineer, or researcher just getting into distributed processing with PySpark? Chances are you’ll want to run some of the popular Python libraries everybody is talking about, like Matplotlib.

If so, you may have noticed that it's not as simple as installing those libraries on your local machine and submitting jobs to the cluster. For the Spark executors to access these libraries, they have to live on each of the Spark worker nodes.

You could go through and manually install each of these libraries on every node using pip, but what if you also want the ability to use multiple versions of Python, or of libraries like pandas? And what if you want to allow colleagues to specify their own environments and combinations?

If this is the case, then you should be looking at Conda environments, which provide specialized and personalized Python configurations that your Python programs can use. Conda is a tool for managing Conda packages (tarballs containing Python or other libraries) and for maintaining the dependencies between packages and the platform.

Continuum Analytics provides an installer for Conda called Miniconda, which contains only Conda and its dependencies, and this installer is what we’ll be using today.

In this post, we’ll focus on submitting jobs with spark-submit. In a follow-up post, we’ll cover how to use these environments from a notebook such as Apache Zeppelin.

Installing Miniconda and Python Libraries to All Nodes

If you have a larger cluster, I recommend using a tool like pssh (parallel SSH) to automate these steps across all nodes.

To begin, we’ll download and run the Miniconda installer for Linux (64-bit) on each node where Apache Spark executors will run. Before beginning the install, make sure the bzip2 package is installed on all hosts:

wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -P /tmp/
bash /tmp/Miniconda3-latest-Linux-x86_64.sh

I recommend choosing /opt/miniconda3/ as the install directory. When the install completes, close and reopen your terminal session so that the changes take effect.

If your install is successful, you should be able to run ‘conda list’ and see the following packages:

conda list
# packages in environment at /opt/miniconda3:
#
asn1crypto                0.22.0                   py36_0  
cffi                      1.10.0                   py36_0  
conda                     4.3.21                   py36_0  
conda-env                 2.6.0                         0  
cryptography              1.8.1                    py36_0  
idna                      2.5                      py36_0  
libffi                    3.2.1                         1  
openssl                   1.0.2l                        0  
packaging                 16.8                     py36_0  
pip                       9.0.1                    py36_1  
pycosat                   0.6.2                    py36_0  
pycparser                 2.17                     py36_0  
pyopenssl                 17.0.0                   py36_0  
pyparsing                 2.1.4                    py36_0  
python                    3.6.1                         2  
readline                  6.2                           2  
requests                  2.14.2                   py36_0  
ruamel_yaml               0.11.14                  py36_1  
setuptools                27.2.0                   py36_0  
six                       1.10.0                   py36_0  
sqlite                    3.13.0                        0  
tk                        8.5.18                        0  
wheel                     0.29.0                   py36_0  
xz                        5.2.2                         1  
yaml                      0.1.6                         0  
zlib                      1.2.8                         3  

Miniconda installs a default root Conda environment running Python 3.6.1. To make sure the installation worked, check the version:

python -V
Python 3.6.1 :: Continuum Analytics, Inc.

To explain what’s going on here: we haven’t removed the previous default version of Python; it can still be found at its usual path, /bin/python. We’ve simply added new, self-contained Python installations that we can point to when submitting jobs, without disrupting our cluster environment. See:

/bin/python -V
Python 2.7.5

Now, let’s go ahead and create a test environment with access to Python 3.5 and the NumPy library.

First, we create the environment and specify the Python version (do this as your cluster user):

conda create --name mapr_numpy python=3.5

Next, let’s go ahead and install NumPy into this environment:

conda install --name mapr_numpy numpy

Then, let’s activate this environment, and check the Python version:

source activate mapr_numpy
(mapr_numpy) [centos]# python -V
Python 3.5.3 :: Continuum Analytics, Inc.
conda info --envs
# conda environments:
#
mapr_numpy           *  /opt/miniconda3/envs/mapr_numpy
root                     /opt/miniconda3

Please complete these steps for all nodes that will run PySpark code.

Using Spark-Submit with Conda

Let’s begin with something very simple: referencing the environments and checking the Python version to make sure it’s being set correctly. Here, I’ve made a tiny script, spark_test.py, that prints the Python version.
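
The original post doesn’t show the script itself, but a minimal sketch of what spark_test.py might contain is just:

import sys

# Print the version of the Python interpreter this job runs under
print(sys.version)

Submitting the script with different values of PYSPARK_PYTHON shows which interpreter the job picks up: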

$SPARK_HOME/bin/spark-submit spark_test.py
3.6.1 |Continuum Analytics, Inc.| (default, May 11 2017, 13:09:58)

PYSPARK_PYTHON=/bin/python $SPARK_HOME/bin/spark-submit spark_test.py
2.7.5 (default, Nov  6 2016, 00:28:07)

PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_numpy/bin/python $SPARK_HOME/bin/spark-submit spark_test.py  
3.5.3 |Continuum Analytics, Inc.| (default, Mar  6 2017, 11:58:13)

Testing NumPy

Now, let’s make sure this worked!

I’ve created a little test script called spark_numpy_test.py, containing the following:

import numpy
a1 = numpy.array([1,2,3,4,5])
a1sum = a1.sum()
print(a1sum)

If I were to run this script without activating or pointing to the Conda environment that has NumPy installed, I would see this error:

[mapr]$ $SPARK_HOME/bin/spark-submit --master yarn spark_numpy_test.py
Traceback (most recent call last):
  File "/mapr/my.cluster.com/user/mapr/spark_numpy_test.py", line 1, in <module>
    import numpy
ModuleNotFoundError: No module named 'numpy'

In order to get around this error, specify the Python environment in the submit statement:

[mapr]$ PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_numpy/bin/python $SPARK_HOME/bin/spark-submit --master yarn spark_numpy_test.py
15
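
Note that spark_numpy_test.py only exercises NumPy on the driver. To confirm that the executors pick up the environment as well, a sketch along these lines (the file name spark_numpy_executors.py is hypothetical) pushes the NumPy work into a map over a parallelized RDD:

from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
sc = SparkContext(conf=conf)

def array_sum(n):
    # Imported inside the function so the import happens on the executors
    import numpy
    return int(numpy.arange(1, n + 1).sum())

# Each element is summed on a worker; with NumPy on every node this prints [15, 15, 15, 15]
print(sc.parallelize([5, 5, 5, 5], 4).map(array_sum).collect())

Submit it the same way, with PYSPARK_PYTHON pointing at the mapr_numpy environment.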

Now for Something a Little More Advanced...

This PySpark example uses the NLTK library for natural language processing and has been adapted from Continuum Analytics.

We’re going to run through a quick example of word tokenization to demonstrate the use of Python environments with Spark on YARN.

First, we’ll create a new Conda environment and install the NLTK library into it, on all cluster nodes:

conda create --name mapr_nltk nltk python=3.5
source activate mapr_nltk

Note that some builds of PySpark are not compatible with Python 3.6, so we’ve specified an older version.

Next, we have to download the demo data from the NLTK repository:

(mapr_nltk) [mapr]# python -m nltk.downloader -d /mapr/my.cluster.com/user/mapr/nltk all-corpora
[nltk_data] Downloading collection 'all'
[nltk_data]    |
[nltk_data]    | Downloading package abc to
[nltk_data]    |     /mapr/my.cluster.com/user/mapr/nltk...
[nltk_data]    |   Unzipping corpora/abc.zip.
[nltk_data]    | Downloading package alpino to
[nltk_data]    |     /mapr/my.cluster.com/user/mapr/nltk...
[nltk_data]    |   Unzipping corpora/alpino.zip.
[nltk_data]    | Downloading package biocreative_ppi to
[...]

This step will download all of the data to the directory that you specify; in this case, the default MapR-FS directory for the cluster user, accessible by all nodes in the cluster.

Next, create the following Python script: nltk_test.py

from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
sc = SparkContext(conf=conf)

# Read the speech text from the cluster filesystem
data = sc.textFile('/user/mapr/nltk/corpora/state_union/1972-Nixon.txt')

def word_tokenize(x):
    # Import on the executor and point NLTK at the corpora we downloaded to MapR-FS
    import nltk
    nltk.data.path.append("/mapr/my.cluster.com/user/mapr/nltk/")
    return nltk.word_tokenize(x)

def pos_tag(x):
    import nltk
    nltk.data.path.append("/mapr/my.cluster.com/user/mapr/nltk/")
    return nltk.pos_tag([x])

# Tokenize each line into words, then tag each word with its part of speech
words = data.flatMap(word_tokenize)
print(words.take(10))

pos_word = words.map(pos_tag)
print(pos_word.take(5))

Then, run the following as the cluster user to test:

[mapr]$  PYSPARK_PYTHON=/opt/miniconda3/envs/mapr_nltk/bin/python $SPARK_HOME/bin/spark-submit --master yarn nltk_test.py
['Address', 'on', 'the', 'State', 'of', 'the', 'Union', 'Delivered', 'Before', 'a']
[[('Address', 'NN')], [('on', 'IN')], [('the', 'DT')], [('State', 'NN')], [('of', 'IN')]]
