Last active September 10, 2016 08:37
-
-
Save praveend/fe9a0c5eacd6b43ee210e88a374eb230 to your computer and use it in GitHub Desktop.
Demo notebook code for the Bangalore Spark Enthusiasts meetup on Spark machine learning
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells": [ | |
| { | |
| "cell_type": "code", | |
| "execution_count": 1, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "<pyspark.context.SparkContext at 0x10649a048>" | |
| ] | |
| }, | |
| "execution_count": 1, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "sc\n", | |
| "#Referenced from https://github.com/bradenrc/Spark_POT/blob/master/Modules/MachineLearning/Classification/Intro%20to%20Stats%20and%20Machine%20Learning.ipynb" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 2, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| " % Total % Received % Xferd Average Speed Time Time Time Current\n", | |
| " Dload Upload Total Spent Left Speed\n", | |
| "100 58625 100 58625 0 0 26045 0 0:00:02 0:00:02 --:--:-- 32088\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "!curl https://raw.githubusercontent.com/bradenrc/Spark_POT/master/Modules/MachineLearning/Classification/train.csv -o train.csv" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 3, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/html": [ | |
| "<div>\n", | |
| "<table border=\"1\" class=\"dataframe\">\n", | |
| " <thead>\n", | |
| " <tr style=\"text-align: right;\">\n", | |
| " <th></th>\n", | |
| " <th>age</th>\n", | |
| " <th>classRank</th>\n", | |
| " <th>fare</th>\n", | |
| " <th>parChi</th>\n", | |
| " <th>sibSpou</th>\n", | |
| " <th>survived</th>\n", | |
| " <th>cherbourg</th>\n", | |
| " <th>queenstown</th>\n", | |
| " <th>southampton</th>\n", | |
| " <th>male</th>\n", | |
| " <th>female</th>\n", | |
| " </tr>\n", | |
| " </thead>\n", | |
| " <tbody>\n", | |
| " <tr>\n", | |
| " <th>0</th>\n", | |
| " <td>22.0</td>\n", | |
| " <td>3</td>\n", | |
| " <td>7.2500</td>\n", | |
| " <td>0</td>\n", | |
| " <td>1</td>\n", | |
| " <td>0</td>\n", | |
| " <td>0</td>\n", | |
| " <td>0</td>\n", | |
| " <td>1</td>\n", | |
| " <td>1</td>\n", | |
| " <td>0</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>1</th>\n", | |
| " <td>38.0</td>\n", | |
| " <td>1</td>\n", | |
| " <td>71.2833</td>\n", | |
| " <td>0</td>\n", | |
| " <td>1</td>\n", | |
| " <td>1</td>\n", | |
| " <td>1</td>\n", | |
| " <td>0</td>\n", | |
| " <td>0</td>\n", | |
| " <td>0</td>\n", | |
| " <td>1</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>2</th>\n", | |
| " <td>26.0</td>\n", | |
| " <td>3</td>\n", | |
| " <td>7.9250</td>\n", | |
| " <td>0</td>\n", | |
| " <td>0</td>\n", | |
| " <td>1</td>\n", | |
| " <td>0</td>\n", | |
| " <td>0</td>\n", | |
| " <td>1</td>\n", | |
| " <td>0</td>\n", | |
| " <td>1</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>3</th>\n", | |
| " <td>35.0</td>\n", | |
| " <td>1</td>\n", | |
| " <td>53.1000</td>\n", | |
| " <td>0</td>\n", | |
| " <td>1</td>\n", | |
| " <td>1</td>\n", | |
| " <td>0</td>\n", | |
| " <td>0</td>\n", | |
| " <td>1</td>\n", | |
| " <td>0</td>\n", | |
| " <td>1</td>\n", | |
| " </tr>\n", | |
| " <tr>\n", | |
| " <th>4</th>\n", | |
| " <td>35.0</td>\n", | |
| " <td>3</td>\n", | |
| " <td>8.0500</td>\n", | |
| " <td>0</td>\n", | |
| " <td>0</td>\n", | |
| " <td>0</td>\n", | |
| " <td>0</td>\n", | |
| " <td>0</td>\n", | |
| " <td>1</td>\n", | |
| " <td>1</td>\n", | |
| " <td>0</td>\n", | |
| " </tr>\n", | |
| " </tbody>\n", | |
| "</table>\n", | |
| "</div>" | |
| ], | |
| "text/plain": [ | |
| " age classRank fare parChi sibSpou survived cherbourg queenstown \\\n", | |
| "0 22.0 3 7.2500 0 1 0 0 0 \n", | |
| "1 38.0 1 71.2833 0 1 1 1 0 \n", | |
| "2 26.0 3 7.9250 0 0 1 0 0 \n", | |
| "3 35.0 1 53.1000 0 1 1 0 0 \n", | |
| "4 35.0 3 8.0500 0 0 0 0 0 \n", | |
| "\n", | |
| " southampton male female \n", | |
| "0 1 1 0 \n", | |
| "1 0 0 1 \n", | |
| "2 1 0 1 \n", | |
| "3 1 0 1 \n", | |
| "4 1 1 0 " | |
| ] | |
| }, | |
| "execution_count": 3, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "from pyspark.sql import SQLContext,Row\n", | |
| "from pyspark.sql.functions import lit\n", | |
| "\n", | |
| "loadTitanicData = sc.textFile(\"train.csv\")\n", | |
| "header = loadTitanicData.first()\n", | |
| "loadTitanicData = loadTitanicData.filter(lambda l: l != header).\\\n", | |
| " map(lambda l: l.split(\",\")).\\\n", | |
| " map(lambda l: [l[1],l[2],l[4],l[5],l[6],l[7],l[9],l[11]]).\\\n", | |
| " filter(lambda l: len(l[3]) > 0 and len(l[7]) > 0).\\\n", | |
| " map(lambda l: Row(survived=int(l[0]),\\\n", | |
| " classRank=int(l[1]),\\\n", | |
| " sex=l[2],\\\n", | |
| " age=float(l[3]),\\\n", | |
| " sibSpou=int(l[4]),\\\n", | |
| " parChi=int(l[5]),\\\n", | |
| " fare=float(l[6]),\\\n", | |
| " embarked=l[7]))\n", | |
| "\n", | |
| "sqlContext = SQLContext(sc)\n", | |
| "titanicDf = sqlContext.createDataFrame(loadTitanicData)\n", | |
| "\n", | |
| "from pyspark.sql.functions import UserDefinedFunction\n", | |
| "from pyspark.sql.types import IntegerType\n", | |
| "isCherb = UserDefinedFunction(lambda x: 1 if x == 'C' else 0, IntegerType())\n", | |
| "isQueen = UserDefinedFunction(lambda x: 1 if x == 'Q' else 0, IntegerType())\n", | |
| "isSouth = UserDefinedFunction(lambda x: 1 if x == 'S' else 0, IntegerType())\n", | |
| "isMale = UserDefinedFunction(lambda x: 1 if x == 'male' else 0, IntegerType())\n", | |
| "isFemale = UserDefinedFunction(lambda x: 1 if x == 'female' else 0, IntegerType())\n", | |
| "titanicDf = titanicDf.withColumn(\"cherbourg\",isCherb(titanicDf.embarked)).\\\n", | |
| " withColumn(\"queenstown\",isQueen(titanicDf.embarked)).\\\n", | |
| " withColumn(\"southampton\",isSouth(titanicDf.embarked)).\\\n", | |
| " withColumn(\"male\",isMale(titanicDf.sex)).\\\n", | |
| " withColumn(\"female\",isFemale(titanicDf.sex))\n", | |
| "\n", | |
| "titanicDf = titanicDf.drop(\"sex\").drop(\"embarked\")\n", | |
| "titanicDf.toPandas().head()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 4, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "age -0.08244586804341371\n", | |
| "classRank -0.3564615884452382\n", | |
| "fare 0.26609960047658054\n", | |
| "parChi 0.09526529428685307\n", | |
| "sibSpou -0.015523023631749422\n", | |
| "survived 1.0\n", | |
| "cherbourg 0.1956727170209808\n", | |
| "queenstown -0.04896609370573839\n", | |
| "southampton -0.15901541067713143\n", | |
| "male -0.5367616233485033\n", | |
| "female 0.5367616233485033\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "for col in titanicDf.columns:\n", | |
| " print (col + \" \" + str(titanicDf.corr('survived',col)))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 5, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "classRank 0.0\n", | |
| "age 0.10209429637643497\n", | |
| "sibSpou 0.00042907488830223883\n", | |
| "parChi 6.681060065050204e-05\n", | |
| "fare 4.748476645222155e-08\n", | |
| "cherbourg 1.7776808480807205e-07\n", | |
| "queenstown 0.19135595497570845\n", | |
| "southampton 2.2049207911267743e-05\n", | |
| "male 0.0\n", | |
| "female 0.0\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "from pyspark.mllib.regression import LabeledPoint \n", | |
| "from pyspark.mllib.stat import Statistics\n", | |
| "\n", | |
| "labRDD = titanicDf.map(lambda l: LabeledPoint(l.survived, [l.classRank,l.age,l.sibSpou,l.parChi,l.fare,\\\n", | |
| " l.cherbourg,l.queenstown,l.southampton,l.male,l.female]))\n", | |
| "features = [\"classRank\",\"age\",\"sibSpou\",\"parChi\",\"fare\",\"cherbourg\",\"queenstown\",\"southampton\",\"male\",\"female\"]\n", | |
| "goodnessOfFitTestResult = Statistics.chiSqTest(labRDD)\n", | |
| "count = 0\n", | |
| "for result in goodnessOfFitTestResult:\n", | |
| " #print result.pValue\n", | |
| " print (features[count] + \" \" + str(result.pValue))\n", | |
| " count = count + 1" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 6, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "[Row(features=DenseVector([22.0, 3.0, 7.25, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0]), label=0.0), Row(features=DenseVector([26.0, 3.0, 7.925, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0]), label=1.0)]\n", | |
| "[Row(features=DenseVector([38.0, 1.0, 71.2833, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0]), label=1.0), Row(features=DenseVector([35.0, 1.0, 53.1, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0]), label=1.0)]\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "from pyspark.mllib.linalg import Vectors\n", | |
| "titanicDf = titanicDf.map(lambda l: Row(label=float(l.survived),features=\\\n", | |
| " Vectors.dense([l.age,float(l.classRank),l.fare,float(l.parChi),float(l.sibSpou),\\\n", | |
| " float(l.cherbourg),float(l.queenstown),float(l.southampton),\\\n", | |
| " float(l.male),float(l.female)]))).toDF()\n", | |
| "testDf, trainDf = titanicDf.randomSplit([.15,.85],1)\n", | |
| "\n", | |
| "print (testDf.take(2))\n", | |
| "print\n", | |
| "print (trainDf.take(2))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 7, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| "/Users/praveendevarao/praveen_work/STC/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/ml/classification.py:207: UserWarning: weights is deprecated. Use coefficients instead.\n" | |
| ] | |
| }, | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "Error:\n", | |
| "0.17117117117117123\n", | |
| "+--------------------+-----+--------------------+--------------------+----------+\n", | |
| "| features|label| rawPrediction| probability|prediction|\n", | |
| "+--------------------+-----+--------------------+--------------------+----------+\n", | |
| "|[22.0,3.0,7.25,0....| 0.0|[1.65332679048888...|[0.83934016889995...| 0.0|\n", | |
| "|[26.0,3.0,7.925,0...| 1.0|[-0.2060407128740...|[0.44867128064588...| 1.0|\n", | |
| "|[2.0,3.0,21.075,1...| 0.0|[1.56784594165630...|[0.82747631336755...| 0.0|\n", | |
| "|[14.0,2.0,30.0708...| 1.0|[-1.3763035321682...|[0.20160332485907...| 1.0|\n", | |
| "|[2.0,3.0,29.125,1...| 0.0|[1.89480332029065...|[0.86930223266283...| 0.0|\n", | |
| "|[8.0,3.0,21.075,1...| 0.0|[-0.1387490095984...|[0.46536828853557...| 1.0|\n", | |
| "|[38.0,3.0,31.3875...| 1.0|[0.17608253780195...|[0.54390724745976...| 0.0|\n", | |
| "|[66.0,2.0,10.5,0....| 0.0|[1.62961280406392...|[0.83611658991107...| 0.0|\n", | |
| "|[42.0,1.0,52.0,0....| 0.0|[0.67402430347840...|[0.66240368157782...| 0.0|\n", | |
| "|[14.0,3.0,11.2417...| 1.0|[-0.7375602742519...|[0.32353787482101...| 1.0|\n", | |
| "|[19.0,3.0,7.8792,...| 1.0|[-0.0887857590718...|[0.47781812979371...| 1.0|\n", | |
| "|[18.0,3.0,17.8,0....| 0.0|[-0.2378446839957...|[0.44081756264295...| 1.0|\n", | |
| "|[21.0,3.0,7.8,0.0...| 0.0|[1.51667074234126...|[0.82004770720354...| 0.0|\n", | |
| "|[11.0,3.0,46.9,2....| 0.0|[1.91040989782854...|[0.87106519073717...| 0.0|\n", | |
| "|[45.0,1.0,83.475,...| 0.0|[0.64219577735737...|[0.65524965027166...| 0.0|\n", | |
| "|[25.0,3.0,7.65,0....| 0.0|[1.58025568084292...|[0.82924072557201...| 0.0|\n", | |
| "|[0.83,2.0,29.0,2....| 1.0|[0.60607417847582...|[0.64704474985088...| 0.0|\n", | |
| "|[28.0,1.0,47.1,0....| 0.0|[0.34566405294421...|[0.58556572965705...| 0.0|\n", | |
| "|[29.0,3.0,8.05,0....| 0.0|[1.64245606923779...|[0.83786885817701...| 0.0|\n", | |
| "|[59.0,3.0,7.25,0....| 0.0|[2.11852496475534...|[0.89269071180358...| 0.0|\n", | |
| "+--------------------+-----+--------------------+--------------------+----------+\n", | |
| "only showing top 20 rows\n", | |
| "\n", | |
| "Coefficients:\n", | |
| "[-0.0158018334818,-0.591343562615,0.00251736383054,-0.026419046642,-0.119469664559,0.29231409134,-0.38845628511,-0.160703792199,-0.900702156103,0.900703796042]\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "from pyspark.ml.classification import LogisticRegression\n", | |
| "lr = LogisticRegression()\n", | |
| "lrModel = lr.fit(trainDf)\n", | |
| "\n", | |
| "lrPred = lrModel.transform(testDf)\n", | |
| "print (\"Error:\")\n", | |
| "print (lrPred.map(lambda line: (line.label - line.prediction)**2).mean())\n", | |
| "lrPred.show()\n", | |
| "print (\"Coefficients:\")\n", | |
| "print (lrModel.coefficients)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 8, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "Error:\n", | |
| "0.2072072072072072\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "from pyspark.ml import Pipeline\n", | |
| "from pyspark.ml.classification import DecisionTreeClassifier\n", | |
| "from pyspark.ml.feature import StringIndexer, VectorIndexer\n", | |
| "\n", | |
| "labelIndexer = StringIndexer(inputCol=\"label\", outputCol=\"indexedLabel\")\n", | |
| "featureIndexer = VectorIndexer(inputCol=\"features\", outputCol=\"indexedFeatures\", maxCategories=4)\n", | |
| "\n", | |
| "dtc = DecisionTreeClassifier(labelCol=\"indexedLabel\", featuresCol=\"indexedFeatures\")\n", | |
| "dtcPipeline = Pipeline(stages=[labelIndexer, featureIndexer, dtc])\n", | |
| "\n", | |
| "dtcModel = dtcPipeline.fit(trainDf)\n", | |
| "dtcPred = dtcModel.transform(testDf)\n", | |
| "\n", | |
| "print (\"Error:\")\n", | |
| "print (dtcPred.map(lambda line: (line.label - line.prediction)**2).mean())" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 9, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "Error:\n", | |
| "0.19819819819819823\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "from pyspark.ml.classification import RandomForestClassifier\n", | |
| "\n", | |
| "rfc = RandomForestClassifier(labelCol=\"indexedLabel\", featuresCol=\"indexedFeatures\")\n", | |
| "rfcPipeline = Pipeline(stages=[labelIndexer, featureIndexer, rfc])\n", | |
| "\n", | |
| "rfcModel = rfcPipeline.fit(trainDf)\n", | |
| "rfcPred = rfcModel.transform(testDf)\n", | |
| "\n", | |
| "print (\"Error:\")\n", | |
| "print (rfcPred.map(lambda line: (line.label - line.prediction)**2).mean())" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 10, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "rfcacc is : 0.8740026595744683\n", | |
| "dtcacc is : 0.6313164893617023\n", | |
| "lracc is : 0.8816489361702128\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n", | |
| "evaluator = BinaryClassificationEvaluator()\n", | |
| "rfcacc = evaluator.evaluate(rfcPred)\n", | |
| "lracc = evaluator.evaluate(lrPred)\n", | |
| "dtcacc = evaluator.evaluate(dtcPred)\n", | |
| "print (\"rfcacc is : \" + str(rfcacc))\n", | |
| "print (\"dtcacc is : \" + str(dtcacc))\n", | |
| "print (\"lracc is : \" + str(lracc))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 11, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "[Row(features=DenseVector([27.0, 3.0, 50.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0]), rawPrediction=DenseVector([1.6247, -1.6247]), probability=DenseVector([0.8354, 0.1646]), prediction=0.0)]" | |
| ] | |
| }, | |
| "execution_count": 11, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "userInput = sc.parallelize([Row(features=Vectors.dense([27.0, 3.0,50.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0]))]).toDF()\n", | |
| "lrModel.transform(userInput).take(1)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "outputs": [], | |
| "source": [] | |
| } | |
| ], | |
| "metadata": { | |
| "anaconda-cloud": {}, | |
| "kernelspec": { | |
| "display_name": "pySpark (Spark 1.6.0)", | |
| "language": "python", | |
| "name": "pyspark" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.5.2" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 0 | |
| } |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.