# CMU MSP 36602
# Solution for SparkHW.pdf
# Name: 
# April 2019
# Runstring: spark-submit sparkHW.py |& egrep -v "(INFO|WARN)"

# Setup spark  [No need to run inside interactive pyspark!]
from pyspark import SparkContext
from pyspark.sql import SparkSession

# For creating schemas manually:
from pyspark.sql.types import StructField, StructType, \
                              IntegerType, StringType, DoubleType
                              
# For running random forest models:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest

## Spark setup [Do not run inside interactive pyspark!]
# Local-mode SparkContext with app name "car imports"; no extra .py
# files are shipped to workers (pyFiles=[]).
sc = SparkContext("local", "car imports", pyFiles=[])
# Wrap the context in a SparkSession so we can use DataFrame reads
# (named 'sqlContext' to match the older SQLContext-style usage below).
sqlContext = SparkSession(sc)

## Schema for both datasets
#
# Columns x0-x3 are doubles, x4-x6 are single-letter strings,
# x7-x8 are integers, and the label 'y' is a double.  Build the
# StructType from a typed column list rather than spelling out
# each StructField by hand.
_col_types = [DoubleType()] * 4 + [StringType()] * 3 + [IntegerType()] * 2
mySchema = StructType([StructField('x%d' % i, ctype)
                       for i, ctype in enumerate(_col_types)] +
                      [StructField('y', DoubleType())])

## Read from csv
train = sqlContext.read.csv("train.csv", schema=mySchema,
                            header=True)
# Test code:
# train.show(5)
# train.printSchema()

# The test set has the same layout as the training set, so reuse
# the same schema and header handling.
test = sqlContext.read.csv("test.csv", schema=mySchema,
                           header=True)
# Test code:
# test.show(5)

## Convert 'train' and 'test' to RDDs as needed for RandomForest
## (mllib's RandomForest works on RDDs, not DataFrames).
train = train.rdd
test = test.rdd

## Add in categorical variables
#
# Both 'train' and 'test' need to end up as RDDs with numeric
# variables only.  Here we convert "A", "B", ... to 0, 1, ...
# and "T", "U", ... to 0, 1, ... by taking ord() of each single
# letter and subtracting the per-column minimum so each range
# starts at zero.
#
# Note: StringIndexer() is the general-purpose way to recode
#       strings as numerics for spark's random forest; see
#       https://stackoverflow.com/questions/36942233/
#       apply-stringindexer-to-several-columns-in-a-pyspark-dataframe
#       The ord() trick works here because the values are single
#       characters.

# Minimum and maximum ASCII codes for x4-x6, computed from the
# training data.  The max values are saved so we can later tell
# RandomForest how many categories each column has.
minx4 = train.map(lambda r: ord(r[4])).min()
minx5 = train.map(lambda r: ord(r[5])).min()
minx6 = train.map(lambda r: ord(r[6])).min()
maxx4 = train.map(lambda r: ord(r[4])).max()
maxx5 = train.map(lambda r: ord(r[5])).max()
maxx6 = train.map(lambda r: ord(r[6])).max()

# Recode x4-x6 to zero-based integers; all other columns pass through.
# NOTE(review): the test set is recoded with the *training* minima so
# both sets share one encoding (assumes test contains no letters below
# the training minimum — TODO confirm with the data).
train = train.map(lambda r: (r[0], r[1], r[2], r[3],
                             ord(r[4]) - minx4,
                             ord(r[5]) - minx5,
                             ord(r[6]) - minx6,
                             r[7], r[8], r[9]))

test = test.map(lambda r: (r[0], r[1], r[2], r[3],
                           ord(r[4]) - minx4,
                           ord(r[5]) - minx5,
                           ord(r[6]) - minx6,
                           r[7], r[8], r[9]))

                         
# As a final step for data prep, convert each row to a
# LabeledPoint(y, x) as required by RandomForest: 'y' (column 9) is
# the label and columns 0-8 are the features.
train = train.map(lambda r: LabeledPoint(r[9], r[0:9]))
test = test.map(lambda r: LabeledPoint(r[9], r[0:9]))

# FYI, a LabeledPoint has two attributes, "label" and "features":
# jnk = train.first()
# jnk.label  # this is a float
# jnk.features  # this is a DenseVector


# For the three categorical variables, RandomForest needs a dictionary
# where the key is the index of the variable in the "features" and the
# value is the number of different values.  x4-x6 sit at feature
# indices 4-6, and after the zero-based recoding each has
# (max - min + 1) distinct values.
sizes = {4: maxx4 - minx4 + 1,
         5: maxx5 - minx5 + 1,
         6: maxx6 - minx6 + 1}


# Fit the model, using .trainRegressor instead of trainClassifier
# because 'y' is continuous.
model = RandomForest.trainRegressor(train,
                                    categoricalFeaturesInfo=sizes,
                                    numTrees=30, featureSubsetStrategy="auto",
                                    impurity='variance',
                                    maxDepth=10, maxBins=32)

# Assemble labels and predictions, then compute and print the mean
# squared error on the 'test' set.
predictions = model.predict(test.map(lambda x: x.features))
labelsAndPreds = test.map(lambda lp: lp.label).zip(predictions)
# Each element of labelsAndPreds is a (label, prediction) pair;
# MSE is the average squared difference.
testErr = labelsAndPreds.map(lambda lp: (lp[0] - lp[1]) ** 2).mean()
print('Test Error = ' + str(testErr))
