@praveend
Last active September 10, 2016 08:37
Demo notebook from the Bangalore Spark Enthusiasts meetup: machine learning with Spark
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<pyspark.context.SparkContext at 0x10649a048>"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc\n",
"#Referenced from https://github.com/bradenrc/Spark_POT/blob/master/Modules/MachineLearning/Classification/Intro%20to%20Stats%20and%20Machine%20Learning.ipynb"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" % Total % Received % Xferd Average Speed Time Time Time Current\n",
" Dload Upload Total Spent Left Speed\n",
"100 58625 100 58625 0 0 26045 0 0:00:02 0:00:02 --:--:-- 32088\n"
]
}
],
"source": [
"!curl https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/MachineLearning/Classification/train.csv -o train.csv"
]
},
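{
"cell_type": "markdown",
"metadata": {},
"source": [
"A side note on the download step: `!curl` shells out, so it assumes curl is installed on the machine running the kernel. Where it is not, the same file should be fetchable with the standard library alone; the following is an untested sketch using the same URL."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# sketch: fetch train.csv with the standard library instead of curl\n",
"import urllib.request\n",
"url = \"https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/MachineLearning/Classification/train.csv\"\n",
"urllib.request.urlretrieve(url, \"train.csv\")"
]
},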
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>age</th>\n",
" <th>classRank</th>\n",
" <th>fare</th>\n",
" <th>parChi</th>\n",
" <th>sibSpou</th>\n",
" <th>survived</th>\n",
" <th>cherbourg</th>\n",
" <th>queenstown</th>\n",
" <th>southampton</th>\n",
" <th>male</th>\n",
" <th>female</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>22.0</td>\n",
" <td>3</td>\n",
" <td>7.2500</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>38.0</td>\n",
" <td>1</td>\n",
" <td>71.2833</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>26.0</td>\n",
" <td>3</td>\n",
" <td>7.9250</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>35.0</td>\n",
" <td>1</td>\n",
" <td>53.1000</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>35.0</td>\n",
" <td>3</td>\n",
" <td>8.0500</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>0</td>\n",
" <td>1</td>\n",
" <td>1</td>\n",
" <td>0</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" age classRank fare parChi sibSpou survived cherbourg queenstown \\\n",
"0 22.0 3 7.2500 0 1 0 0 0 \n",
"1 38.0 1 71.2833 0 1 1 1 0 \n",
"2 26.0 3 7.9250 0 0 1 0 0 \n",
"3 35.0 1 53.1000 0 1 1 0 0 \n",
"4 35.0 3 8.0500 0 0 0 0 0 \n",
"\n",
" southampton male female \n",
"0 1 1 0 \n",
"1 0 0 1 \n",
"2 1 0 1 \n",
"3 1 0 1 \n",
"4 1 1 0 "
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from pyspark.sql import SQLContext,Row\n",
"from pyspark.sql.functions import lit\n",
"\n",
"loadTitanicData = sc.textFile(\"train.csv\")\n",
"header = loadTitanicData.first()\n",
"loadTitanicData = loadTitanicData.filter(lambda l: l != header).\\\n",
" map(lambda l: l.split(\",\")).\\\n",
" map(lambda l: [l[1],l[2],l[4],l[5],l[6],l[7],l[9],l[11]]).\\\n",
" filter(lambda l: len(l[3]) > 0 and len(l[7]) > 0).\\\n",
" map(lambda l: Row(survived=int(l[0]),\\\n",
" classRank=int(l[1]),\\\n",
" sex=l[2],\\\n",
" age=float(l[3]),\\\n",
" sibSpou=int(l[4]),\\\n",
" parChi=int(l[5]),\\\n",
" fare=float(l[6]),\\\n",
" embarked=l[7]))\n",
"\n",
"sqlContext = SQLContext(sc)\n",
"titanicDf = sqlContext.createDataFrame(loadTitanicData)\n",
"\n",
"from pyspark.sql.functions import UserDefinedFunction\n",
"from pyspark.sql.types import IntegerType\n",
"isCherb = UserDefinedFunction(lambda x: 1 if x == 'C' else 0, IntegerType())\n",
"isQueen = UserDefinedFunction(lambda x: 1 if x == 'Q' else 0, IntegerType())\n",
"isSouth = UserDefinedFunction(lambda x: 1 if x == 'S' else 0, IntegerType())\n",
"isMale = UserDefinedFunction(lambda x: 1 if x == 'male' else 0, IntegerType())\n",
"isFemale = UserDefinedFunction(lambda x: 1 if x == 'female' else 0, IntegerType())\n",
"titanicDf = titanicDf.withColumn(\"cherbourg\",isCherb(titanicDf.embarked)).\\\n",
" withColumn(\"queenstown\",isQueen(titanicDf.embarked)).\\\n",
" withColumn(\"southampton\",isSouth(titanicDf.embarked)).\\\n",
" withColumn(\"male\",isMale(titanicDf.sex)).\\\n",
" withColumn(\"female\",isFemale(titanicDf.sex))\n",
"\n",
"titanicDf = titanicDf.drop(\"sex\").drop(\"embarked\")\n",
"titanicDf.toPandas().head()"
]
},
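{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `split(\",\")` parsing above assumes no field contains an embedded comma. The copy of train.csv used here evidently satisfies that, but the raw Kaggle Titanic file quotes passenger names that contain commas, and a plain split would mis-align every column after the name. A quote-aware sketch using Python's csv module, should the file layout ever need it:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# sketch: quote-aware parsing; csv.reader honours quoted fields that\n",
"# contain commas, which plain split(\",\") would break apart\n",
"import csv\n",
"\n",
"def parse_partition(lines):\n",
"    return csv.reader(lines)\n",
"\n",
"parsedRDD = sc.textFile(\"train.csv\").mapPartitions(parse_partition)"
]
},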
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"age -0.08244586804341371\n",
"classRank -0.3564615884452382\n",
"fare 0.26609960047658054\n",
"parChi 0.09526529428685307\n",
"sibSpou -0.015523023631749422\n",
"survived 1.0\n",
"cherbourg 0.1956727170209808\n",
"queenstown -0.04896609370573839\n",
"southampton -0.15901541067713143\n",
"male -0.5367616233485033\n",
"female 0.5367616233485033\n"
]
}
],
"source": [
"for col in titanicDf.columns:\n",
" print (col + \" \" + str(titanicDf.corr('survived',col)))"
]
},
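{
"cell_type": "markdown",
"metadata": {},
"source": [
"The loop above launches one Spark job per column. As a sketch, mllib's `Statistics.corr` computes the full Pearson correlation matrix in a single pass over an RDD of numeric rows (row/column order follows `titanicDf.columns`):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.mllib.stat import Statistics\n",
"\n",
"# sketch: full Pearson correlation matrix in one pass; each Row becomes\n",
"# a plain list of floats, ordered as in titanicDf.columns\n",
"vecRDD = titanicDf.map(lambda row: [float(x) for x in row])\n",
"corrMatrix = Statistics.corr(vecRDD, method=\"pearson\")\n",
"print (corrMatrix)"
]
},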
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"classRank 0.0\n",
"age 0.10209429637643497\n",
"sibSpou 0.00042907488830223883\n",
"parChi 6.681060065050204e-05\n",
"fare 4.748476645222155e-08\n",
"cherbourg 1.7776808480807205e-07\n",
"queenstown 0.19135595497570845\n",
"southampton 2.2049207911267743e-05\n",
"male 0.0\n",
"female 0.0\n"
]
}
],
"source": [
"from pyspark.mllib.regression import LabeledPoint \n",
"from pyspark.mllib.stat import Statistics\n",
"\n",
"labRDD = titanicDf.map(lambda l: LabeledPoint(l.survived, [l.classRank,l.age,l.sibSpou,l.parChi,l.fare,\\\n",
" l.cherbourg,l.queenstown,l.southampton,l.male,l.female]))\n",
"features = [\"classRank\",\"age\",\"sibSpou\",\"parChi\",\"fare\",\"cherbourg\",\"queenstown\",\"southampton\",\"male\",\"female\"]\n",
"goodnessOfFitTestResult = Statistics.chiSqTest(labRDD)\n",
"count = 0\n",
"for result in goodnessOfFitTestResult:\n",
" #print result.pValue\n",
" print (features[count] + \" \" + str(result.pValue))\n",
" count = count + 1"
]
},
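{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each entry returned by `chiSqTest` is a full `ChiSqTestResult`, so the test statistic and degrees of freedom are available alongside the p-value printed above; a small sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# sketch: print statistic and degrees of freedom as well as the p-value\n",
"for name, result in zip(features, goodnessOfFitTestResult):\n",
"    print (name, result.statistic, result.degreesOfFreedom, result.pValue)"
]
},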
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[Row(features=DenseVector([22.0, 3.0, 7.25, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0]), label=0.0), Row(features=DenseVector([26.0, 3.0, 7.925, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0]), label=1.0)]\n",
"[Row(features=DenseVector([38.0, 1.0, 71.2833, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0]), label=1.0), Row(features=DenseVector([35.0, 1.0, 53.1, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0]), label=1.0)]\n"
]
}
],
"source": [
"from pyspark.mllib.linalg import Vectors\n",
"titanicDf = titanicDf.map(lambda l: Row(label=float(l.survived),features=\\\n",
" Vectors.dense([l.age,float(l.classRank),l.fare,float(l.parChi),float(l.sibSpou),\\\n",
" float(l.cherbourg),float(l.queenstown),float(l.southampton),\\\n",
" float(l.male),float(l.female)]))).toDF()\n",
"testDf, trainDf = titanicDf.randomSplit([.15,.85],1)\n",
"\n",
"print (testDf.take(2))\n",
"print\n",
"print (trainDf.take(2))"
]
},
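{
"cell_type": "markdown",
"metadata": {},
"source": [
"The hand-rolled map into `Row(label, features)` works, but `pyspark.ml.feature.VectorAssembler` expresses the same assembly declaratively. The sketch below is written against a hypothetical `df` standing for the DataFrame as it stood before the map above (i.e. while the named columns still existed), since `titanicDf` has been overwritten at this point:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from pyspark.ml.feature import VectorAssembler\n",
"\n",
"# sketch: declarative feature assembly; df is a hypothetical name for\n",
"# the pre-map DataFrame with its original named columns\n",
"featureCols = [\"age\",\"classRank\",\"fare\",\"parChi\",\"sibSpou\",\n",
"               \"cherbourg\",\"queenstown\",\"southampton\",\"male\",\"female\"]\n",
"assembler = VectorAssembler(inputCols=featureCols, outputCol=\"features\")\n",
"# assembled = assembler.transform(df).withColumn(\"label\", df.survived.cast(\"double\"))"
]
},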
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/praveendevarao/praveen_work/STC/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/ml/classification.py:207: UserWarning: weights is deprecated. Use coefficients instead.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Error:\n",
"0.17117117117117123\n",
"+--------------------+-----+--------------------+--------------------+----------+\n",
"| features|label| rawPrediction| probability|prediction|\n",
"+--------------------+-----+--------------------+--------------------+----------+\n",
"|[22.0,3.0,7.25,0....| 0.0|[1.65332679048888...|[0.83934016889995...| 0.0|\n",
"|[26.0,3.0,7.925,0...| 1.0|[-0.2060407128740...|[0.44867128064588...| 1.0|\n",
"|[2.0,3.0,21.075,1...| 0.0|[1.56784594165630...|[0.82747631336755...| 0.0|\n",
"|[14.0,2.0,30.0708...| 1.0|[-1.3763035321682...|[0.20160332485907...| 1.0|\n",
"|[2.0,3.0,29.125,1...| 0.0|[1.89480332029065...|[0.86930223266283...| 0.0|\n",
"|[8.0,3.0,21.075,1...| 0.0|[-0.1387490095984...|[0.46536828853557...| 1.0|\n",
"|[38.0,3.0,31.3875...| 1.0|[0.17608253780195...|[0.54390724745976...| 0.0|\n",
"|[66.0,2.0,10.5,0....| 0.0|[1.62961280406392...|[0.83611658991107...| 0.0|\n",
"|[42.0,1.0,52.0,0....| 0.0|[0.67402430347840...|[0.66240368157782...| 0.0|\n",
"|[14.0,3.0,11.2417...| 1.0|[-0.7375602742519...|[0.32353787482101...| 1.0|\n",
"|[19.0,3.0,7.8792,...| 1.0|[-0.0887857590718...|[0.47781812979371...| 1.0|\n",
"|[18.0,3.0,17.8,0....| 0.0|[-0.2378446839957...|[0.44081756264295...| 1.0|\n",
"|[21.0,3.0,7.8,0.0...| 0.0|[1.51667074234126...|[0.82004770720354...| 0.0|\n",
"|[11.0,3.0,46.9,2....| 0.0|[1.91040989782854...|[0.87106519073717...| 0.0|\n",
"|[45.0,1.0,83.475,...| 0.0|[0.64219577735737...|[0.65524965027166...| 0.0|\n",
"|[25.0,3.0,7.65,0....| 0.0|[1.58025568084292...|[0.82924072557201...| 0.0|\n",
"|[0.83,2.0,29.0,2....| 1.0|[0.60607417847582...|[0.64704474985088...| 0.0|\n",
"|[28.0,1.0,47.1,0....| 0.0|[0.34566405294421...|[0.58556572965705...| 0.0|\n",
"|[29.0,3.0,8.05,0....| 0.0|[1.64245606923779...|[0.83786885817701...| 0.0|\n",
"|[59.0,3.0,7.25,0....| 0.0|[2.11852496475534...|[0.89269071180358...| 0.0|\n",
"+--------------------+-----+--------------------+--------------------+----------+\n",
"only showing top 20 rows\n",
"\n",
"Coefficients:\n",
"[-0.0158018334818,-0.591343562615,0.00251736383054,-0.026419046642,-0.119469664559,0.29231409134,-0.38845628511,-0.160703792199,-0.900702156103,0.900703796042]\n"
]
}
],
"source": [
"from pyspark.ml.classification import LogisticRegression\n",
"lr = LogisticRegression()\n",
"lrModel = lr.fit(trainDf)\n",
"\n",
"lrPred = lrModel.transform(testDf)\n",
"print (\"Error:\")\n",
"print (lrPred.map(lambda line: (line.label - line.prediction)**2).mean())\n",
"lrPred.show()\n",
"print (\"Coefficients:\")\n",
"print (lrModel.coefficients)"
]
},
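{
"cell_type": "markdown",
"metadata": {},
"source": [
"The model above runs with default hyperparameters; regularization strength, mixing, and iteration count are all tunable on `LogisticRegression`. A sketch with illustrative (untuned) values:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# sketch: explicit hyperparameters (values illustrative, not tuned)\n",
"lrTuned = LogisticRegression(maxIter=50, regParam=0.01, elasticNetParam=0.0)\n",
"lrTunedModel = lrTuned.fit(trainDf)\n",
"lrTunedPred = lrTunedModel.transform(testDf)"
]
},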
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Error:\n",
"0.2072072072072072\n"
]
}
],
"source": [
"from pyspark.ml import Pipeline\n",
"from pyspark.ml.classification import DecisionTreeClassifier\n",
"from pyspark.ml.feature import StringIndexer, VectorIndexer\n",
"\n",
"labelIndexer = StringIndexer(inputCol=\"label\", outputCol=\"indexedLabel\")\n",
"featureIndexer = VectorIndexer(inputCol=\"features\", outputCol=\"indexedFeatures\", maxCategories=4)\n",
"\n",
"dtc = DecisionTreeClassifier(labelCol=\"indexedLabel\", featuresCol=\"indexedFeatures\")\n",
"dtcPipeline = Pipeline(stages=[labelIndexer, featureIndexer, dtc])\n",
"\n",
"dtcModel = dtcPipeline.fit(trainDf)\n",
"dtcPred = dtcModel.transform(testDf)\n",
"\n",
"print (\"Error:\")\n",
"print (dtcPred.map(lambda line: (line.label - line.prediction)**2).mean())"
]
},
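{
"cell_type": "markdown",
"metadata": {},
"source": [
"Depth is the main capacity knob for a single decision tree: deeper trees fit the training split more closely but overfit sooner. A sketch of a shallower variant (the maxDepth value is illustrative):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# sketch: cap tree depth to trade training fit for generalization\n",
"dtcShallow = DecisionTreeClassifier(labelCol=\"indexedLabel\",\n",
"                                    featuresCol=\"indexedFeatures\", maxDepth=3)\n",
"shallowModel = Pipeline(stages=[labelIndexer, featureIndexer, dtcShallow]).fit(trainDf)"
]
},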
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Error:\n",
"0.19819819819819823\n"
]
}
],
"source": [
"from pyspark.ml.classification import RandomForestClassifier\n",
"\n",
"rfc = RandomForestClassifier(labelCol=\"indexedLabel\", featuresCol=\"indexedFeatures\")\n",
"rfcPipeline = Pipeline(stages=[labelIndexer, featureIndexer, rfc])\n",
"\n",
"rfcModel = rfcPipeline.fit(trainDf)\n",
"rfcPred = rfcModel.transform(testDf)\n",
"\n",
"print (\"Error:\")\n",
"print (rfcPred.map(lambda line: (line.label - line.prediction)**2).mean())"
]
},
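{
"cell_type": "markdown",
"metadata": {},
"source": [
"The forest likewise runs with default settings; growing more trees usually lowers variance at the cost of training time. A sketch with illustrative values:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# sketch: a larger, depth-capped forest (values illustrative, not tuned)\n",
"rfcBig = RandomForestClassifier(labelCol=\"indexedLabel\",\n",
"                                featuresCol=\"indexedFeatures\",\n",
"                                numTrees=100, maxDepth=5)\n",
"rfcBigModel = Pipeline(stages=[labelIndexer, featureIndexer, rfcBig]).fit(trainDf)"
]
},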
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"rfcacc is : 0.8740026595744683\n",
"dtcacc is : 0.6313164893617023\n",
"lracc is : 0.8816489361702128\n"
]
}
],
"source": [
"from pyspark.ml.evaluation import BinaryClassificationEvaluator\n",
"evaluator = BinaryClassificationEvaluator()\n",
"rfcacc = evaluator.evaluate(rfcPred)\n",
"lracc = evaluator.evaluate(lrPred)\n",
"dtcacc = evaluator.evaluate(dtcPred)\n",
"print (\"rfcacc is : \" + str(rfcacc))\n",
"print (\"dtcacc is : \" + str(dtcacc))\n",
"print (\"lracc is : \" + str(lracc))"
]
},
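{
"cell_type": "markdown",
"metadata": {},
"source": [
"A caveat on the `acc` names above: `BinaryClassificationEvaluator` scores area under the ROC curve by default, so these numbers are AUC rather than plain accuracy. The metric can be requested explicitly; a sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# the default metric is areaUnderROC; it (or areaUnderPR) can be set explicitly\n",
"rocEvaluator = BinaryClassificationEvaluator(metricName=\"areaUnderROC\")\n",
"prEvaluator = BinaryClassificationEvaluator(metricName=\"areaUnderPR\")\n",
"print (\"lr areaUnderPR: \" + str(prEvaluator.evaluate(lrPred)))"
]
},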
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[Row(features=DenseVector([27.0, 3.0, 50.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0]), rawPrediction=DenseVector([1.6247, -1.6247]), probability=DenseVector([0.8354, 0.1646]), prediction=0.0)]"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"userInput = sc.parallelize([Row(features=Vectors.dense([27.0, 3.0,50.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0]))]).toDF()\n",
"lrModel.transform(userInput).take(1)"
]
},
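{
"cell_type": "markdown",
"metadata": {},
"source": [
"To read the result: `probability` is ordered by label index, so element 1 is the model's estimate of P(survived) for this hypothetical 27-year-old male third-class passenger. A sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# sketch: extract P(survived) from the probability vector\n",
"row = lrModel.transform(userInput).first()\n",
"print (\"P(survived) = \" + str(row.probability[1]))"
]
},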
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "pySpark (Spark 1.6.0)",
"language": "python",
"name": "pyspark"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 0
}