Учебное пособие по PySpark для начинающих: учитесь на ПРИМЕРАХ

by moiseevrus

Прежде чем изучать PySpark, давайте разберемся:

Что такое Apache Spark?

Spark — это решение для работы с большими данными, которое оказалось проще и быстрее, чем Hadoop MapReduce. Spark — это программное обеспечение с открытым исходным кодом, разработанное лабораторией RAD Калифорнийского университета в Беркли в 2009 году. С момента своего публичного выпуска в 2010 году Spark стал популярнее и используется в отрасли в беспрецедентных масштабах.

В эпоху больших данных специалистам- практикам как никогда нужны быстрые и надежные инструменты для обработки потоковых данных. Ранние инструменты, такие как MapReduce, были любимыми, но медленными. Чтобы решить эту проблему, Spark предлагает быстрое и универсальное решение. Основное различие между Spark и MapReduce заключается в том, что Spark выполняет вычисления в памяти, а затем на жестком диске. Это обеспечивает высокоскоростной доступ и обработку данных, сокращая время с часов до минут.

Что такое ПиСпарк?

PySpark — это инструмент, созданный сообществом Apache Spark для использования Python со Spark. Это позволяет работать с RDD (Resilient Distributed Dataset) в Python. Он также предлагает PySpark Shell для связывания API Python с ядром Spark для запуска контекста Spark. Spark — это механизм имен для реализации кластерных вычислений, а PySpark — это библиотека Python для использования Spark.

Как работает искра?

Spark основан на вычислительном движке, что означает, что он заботится о планировании, распределении и мониторинге приложений. Каждая задача выполняется на различных рабочих машинах, называемых вычислительным кластером. Вычислительный кластер относится к разделению задач. Одна машина выполняет одну задачу, а другие вносят свой вклад в конечный результат посредством другой задачи. В конце концов, все задачи объединяются для получения результата. Администратор Spark предоставляет 360-градусный обзор различных заданий Spark.

 

Как работает искра

Как работает искра

Spark предназначен для работы с

  • питон
  • Ява
  • Скала
  • SQL

Важной особенностью Spark является огромное количество встроенной библиотеки, в том числе MLlib для машинного обучения . Spark также предназначен для работы с кластерами Hadoop и может читать широкий спектр файлов, включая данные Hive, CSV, JSON, данные Casandra и другие.

Зачем использовать Спарк?

Как будущий специалист по работе с данными, вы должны быть знакомы со знаменитыми библиотеками Python: Pandas и scikit-learn. Эти две библиотеки отлично подходят для изучения набора данных до среднего размера. Обычные проекты машинного обучения строятся на основе следующей методологии:

  • Загрузить данные на диск
  • Импорт данных в память машины
  • Обработка/анализ данных
  • Создайте модель машинного обучения
  • Сохраните прогноз обратно на диск

Проблема возникает, если специалист по данным хочет обрабатывать данные, которые слишком велики для одного компьютера. В ранние дни науки о данных специалисты-практики брали образцы, поскольку обучение работе с огромными наборами данных не всегда требовалось. Специалист по данным найдет хорошую статистическую выборку, проведет дополнительную проверку надежности и предложит отличную модель.

Однако с этим есть некоторые проблемы:

  • Отражает ли набор данных реальный мир?
  • Включают ли данные конкретный пример?
  • Подходит ли модель для выборки?

Возьмем, к примеру, рекомендацию пользователей. Рекомендатели полагаются на сравнение пользователей с другими пользователями при оценке их предпочтений. Если специалист по работе с данными возьмет только часть данных, не будет когорты пользователей, очень похожих друг на друга. Рекомендателям нужно работать с полным набором данных или вообще не использовать.

Каково решение?

Решение было очевидным в течение долгого времени, разделить проблему на несколько компьютеров. Параллельные вычисления также сопряжены с множеством проблем. У разработчиков часто возникают проблемы с написанием параллельного кода, и в конечном итоге им приходится решать кучу сложных проблем, связанных с самой многопроцессорностью.

Pyspark предоставляет специалисту по данным API, который можно использовать для решения проблем с параллельными данными. Pyspark справляется со сложностями многопроцессорной обработки, такими как распределение данных, распространение кода и сбор выходных данных от рабочих процессов на кластере машин.

Spark может работать автономно, но чаще всего работает поверх инфраструктуры кластерных вычислений, такой как Hadoop. Однако при тестировании и разработке специалист по данным может эффективно запускать Spark на своих устройствах разработки или ноутбуках без кластера.

• Одним из основных преимуществ Spark является создание архитектуры, которая включает в себя управление потоковой передачей данных, беспрепятственные запросы данных, прогнозирование с помощью машинного обучения и доступ в реальном времени к различным анализам.

• Spark тесно сотрудничает с языком SQL, т. е. со структурированными данными. Это позволяет запрашивать данные в режиме реального времени.

• Основная работа специалиста по обработке и анализу данных заключается в анализе и построении прогностических моделей. Короче говоря, специалист по данным должен знать, как запрашивать данные с помощью SQL , создавать статистические отчеты и использовать машинное обучение для получения прогнозов. Специалист по данным тратит значительную часть своего времени на очистку, преобразование и анализ данных. Когда набор данных или рабочий процесс данных готов, специалист по данным использует различные методы для обнаружения идей и скрытых закономерностей. Манипуляции с данными должны быть надежными и такими же простыми в использовании. Spark — правильный инструмент благодаря своей скорости и богатому API.

В этом руководстве по PySpark вы узнаете, как создать классификатор на примерах PySpark.

Как установить PySpark с AWS

Команда Jupyter создает образ Docker для эффективного запуска Spark. Ниже приведены шаги, которые вы можете выполнить, чтобы установить экземпляр PySpark в AWS.

Обратитесь к нашему руководству по AWS и TensorFlow.

Шаг 1: Создайте экземпляр

Прежде всего, вам нужно создать экземпляр. Перейдите в свою учетную запись AWS и запустите экземпляр. Вы можете увеличить хранилище до 15 г и использовать ту же группу безопасности, что и в руководстве по TensorFlow.

Шаг 2: Откройте соединение

Откройте соединение и установите контейнер Docker. Подробнее см. в туториале по TensorFlow с Docker . Обратите внимание, что вы должны находиться в правильном рабочем каталоге.

Просто запустите эти коды, чтобы установить Docker:

sudo yum update -y
sudo yum install -y docker
sudo service docker start
sudo user-mod -a -G docker ec2-user
exit

Шаг 3. Повторно откройте соединение и установите Spark.

После повторного открытия соединения вы можете установить образ, содержащий PySpark.

## Spark
docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook

## Allow preserving Jupyter notebook
sudo chown 1000 ~/work

## Install tree to see our working directory next
sudo yum install -y tree

Шаг 4: Откройте Юпитер

Проверьте контейнер и его имя

docker ps

Запустите докер с журналами докеров, за которыми следует имя докера. Например, докер регистрирует zealous_goldwasser

Перейдите в свой браузер и запустите Jupyter. Адрес http://localhost:8888/. Вставьте пароль, предоставленный терминалом.

Примечание . Если вы хотите загрузить/загрузить файл на свой компьютер AWS, вы можете использовать программное обеспечение Cyberduck, https://cyberduck.io/ .

Как установить PySpark на Windows/Mac с помощью Conda

Ниже приведен подробный процесс установки PySpark на Windows/Mac с помощью Anaconda:

Чтобы установить Spark на свой локальный компьютер, рекомендуется создать новую среду conda. Эта новая среда установит Python 3.6, Spark и все зависимости.

Пользователь Mac

cd anaconda3
touch hello-spark.yml
vi hello-spark.yml

Пользователь Windows

cd C:\Users\Admin\Anaconda3
echo.>hello-spark.yml
notepad hello-spark.yml

Вы можете редактировать файл .yml. Будьте осторожны с отступом. Два пробела необходимы перед –

name: hello-spark 
    dependencies:
    
    - python=3.6
    - jupyter
    - ipython
    - numpy
    - numpy-base
    - pandas
    - py4j
    - pyspark
    - pytz

Сохраните его и создайте среду. Это занимает некоторое время

conda env create -f hello-spark.yml

Для получения более подробной информации о местоположении, пожалуйста, ознакомьтесь с руководством по установке TensorFlow.

Вы можете проверить всю среду, установленную на вашем компьютере

conda env list
Activate hello-spark

Пользователь Mac

source activate hello-spark

Пользователь Windows

activate hello-spark

Примечание. Вы уже создали специальную среду TensorFlow для запуска руководств по TensorFlow. Удобнее создать новое окружение, отличное от hello-tf. Нет смысла перегружать hello-tf Spark или любыми другими библиотеками машинного обучения.

Представьте, что большая часть вашего проекта использует TensorFlow, но вам нужно использовать Spark для одного конкретного проекта. Вы можете настроить среду TensorFlow для всего своего проекта и создать отдельную среду для Spark. Вы можете добавить столько библиотек в среду Spark, сколько хотите, не мешая среде TensorFlow. Как только вы закончите с проектом Spark, вы можете стереть его, не затрагивая среду TensorFlow.

Юпитер

Откройте Jupyter Notebook и проверьте, работает ли PySpark. В новую записную книжку вставьте следующий пример кода PySpark:

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Если отображается ошибка, вероятно, на вашем компьютере не установлена ​​Java. В Mac откройте терминал и напишите java -версия, если есть версия java, убедитесь, что она 1.8. В Windows перейдите в Приложение и проверьте, есть ли папка Java. Если есть папка Java, убедитесь, что установлена ​​Java 1.8. На момент написания этой статьи PySpark несовместим с Java 9 и выше.

Если вам нужно установить Java, вы должны подумать ссылку и скачать jdk-8u181-windows-x64.exe

Пользователям Mac рекомендуется использовать brew.

brew tap caskroom/versions
brew cask install java8

Обратитесь к этому пошаговому руководству по установке Java

Примечание . Используйте команду «Удалить», чтобы полностью стереть среду.

 conda env remove -n hello-spark -y

Контекст искры

SparkContext — это внутренний механизм, который позволяет подключаться к кластерам. Если вы хотите запустить операцию, вам нужен SparkContext.

Создайте SparkContext

Прежде всего, вам нужно инициировать SparkContext.

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Теперь, когда SparkContext готов, вы можете создать набор данных под названием RDD, Resilient Distributed Dataset. Вычисления в RDD автоматически распараллеливаются в кластере.

nums= sc.parallelize([1,2,3,4])
Вы можете получить доступ к первой строке с помощью take
nums.take(1)
[1]

Вы можете применить преобразование к данным с помощью лямбда-функции. В приведенном ниже примере PySpark вы возвращаете квадрат чисел. Это преобразование карты

squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print('%i ' % (num))
1 
4 
9 
16

SQLContext

Более удобный способ — использовать DataFrame. SparkContext уже установлен, вы можете использовать его для создания dataFrame. Вам также необходимо объявить SQLContext

SQLContext позволяет подключать движок к различным источникам данных. Он используется для запуска функций Spark SQL.

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

Теперь в этом учебнике Spark по Python давайте создадим список кортежей. Каждый кортеж будет содержать имена людей и их возраст. Требуется четыре шага:

Шаг 1) Создайте список кортежей с информацией

[('John',19),('Smith',29),('Adam',35),('Henry',50)]

Шаг 2) Создайте RDD

rdd = sc.parallelize(list_p)

Шаг 3) Преобразование кортежей

rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

Шаг 4) Создайте контекст DataFrame

sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

Если вы хотите получить доступ к типу каждой функции, вы можете использовать printSchema()

DF_ppl.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

Пример машинного обучения с PySpark

Теперь, когда у вас есть краткое представление о Spark и SQLContext, вы готовы создать свою первую программу машинного обучения.

Ниже приведены шаги по созданию программы машинного обучения с помощью PySpark:

  • Шаг 1) Базовая работа с PySpark
  • Шаг 2) Предварительная обработка данных
  • Шаг 3) Создайте конвейер обработки данных
  • Шаг 4) Создайте классификатор: логистика
  • Шаг 5) Обучите и оцените модель
  • Шаг 6) Настройте гиперпараметр

В этом руководстве по машинному обучению PySpark мы будем использовать набор данных для взрослых. Цель этого руководства — научиться использовать Pyspark. Дополнительные сведения о наборе данных см. в этом руководстве.

Обратите внимание, что набор данных не имеет значения, и вы можете подумать, что вычисления занимают много времени. Spark предназначен для обработки значительного объема данных. Производительность Spark увеличивается по сравнению с другими библиотеками машинного обучения по мере увеличения обрабатываемого набора данных.

Шаг 1) Базовая работа с PySpark

Прежде всего, вам нужно инициализировать SQLContext, который еще не запущен.

#from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc.addFile(url)
sqlContext = SQLContext(sc)

затем вы можете прочитать файл cvs с помощью sqlContext.read.csv. Вы используете inferSchema со значением True, чтобы указать Spark автоматически угадывать тип данных. По умолчанию установлено значение False.

df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=True)

Давайте посмотрим на тип данных

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
Вы можете увидеть данные с show
df.show(5, truncate = False)
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|age|workclass       |fnlwgt|education|education_num|marital           |occupation       |relationship |race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
|39 |State-gov       |77516 |Bachelors|13           |Never-married     |Adm-clerical     |Not-in-family|White|Male  |2174        |0           |40        |United-States |<=50K|
|50 |Self-emp-not-inc|83311 |Bachelors|13           |Married-civ-spouse|Exec-managerial  |Husband      |White|Male  |0           |0           |13        |United-States |<=50K|
|38 |Private         |215646|HS-grad  |9            |Divorced          |Handlers-cleaners|Not-in-family|White|Male  |0           |0           |40        |United-States |<=50K|
|53 |Private         |234721|11th     |7            |Married-civ-spouse|Handlers-cleaners|Husband      |Black|Male  |0           |0           |40        |United-States |<=50K|
|28 |Private         |338409|Bachelors|13           |Married-civ-spouse|Prof-specialty   |Wife         |Black|Female|0           |0           |40        |Cuba          |<=50K|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+----------+--------------+-----+
only showing top 5 rows

Если вы не установили для inderShema значение True, вот что происходит с типом. Есть все в строке.

df_string = sqlContext.read.csv(SparkFiles.get("adult.csv"), header=True, inferSchema=  False)
df_string.printSchema()
root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

Чтобы преобразовать непрерывную переменную в правильный формат, вы можете использовать преобразование столбцов. Вы можете использовать withColumn, чтобы указать Spark, в каком столбце выполнять преобразование.
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 
# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()
root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)

from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

Выберите столбцы

Вы можете выбрать и показать строки с выбором и именами функций. Ниже выбираются age и fnlwgt.

df.select('age','fnlwgt').show(5)
+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows

Считать по группам

Если вы хотите подсчитать количество вхождений по группам, вы можете связать:

  • группа по()
  • считать()

вместе. В приведенном ниже примере PySpark вы подсчитываете количество строк по уровню образования.

df.groupBy("education").count().sort("count",ascending=True).show()
+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   51|
|     1st-4th|  168|
|     5th-6th|  333|
|   Doctorate|  413|
|        12th|  433|
|         9th|  514|
| Prof-school|  576|
|     7th-8th|  646|
|        10th|  933|
|  Assoc-acdm| 1067|
|        11th| 1175|
|   Assoc-voc| 1382|
|     Masters| 1723|
|   Bachelors| 5355|
|Some-college| 7291|
|     HS-grad|10501|
+------------+-----+

Опишите данные

Чтобы получить сводную статистику данных, вы можете использовать описать(). Он вычислит:

  • считать
  • иметь в виду
  • стандартное отклонение
  • мин
  • Максимум
df.describe().show()
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|    education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|    capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+
|  count|             32561|      32561|             32561|       32561|            32561|   32561|           32561|       32561|             32561| 32561|             32561|           32561|             32561|         32561|32561|
|   mean| 38.58164675532078|       null|189778.36651208502|        null| 10.0806793403151|    null|            null|        null|              null|  null|1077.6488437087312| 87.303829734959|40.437455852092995|          null| null|
| stddev|13.640432553581356|       null|105549.97769702227|        null|2.572720332067397|    null|            null|        null|              null|  null| 7385.292084840354|402.960218649002|12.347428681731838|          null| null|
|    min|                17|          ?|             12285|        10th|                1|Divorced|               ?|     Husband|Amer-Indian-Eskimo|Female|                 0|               0|                 1|             ?|<=50K|
|    max|                90|Without-pay|           1484705|Some-college|               16| Widowed|Transport-moving|        Wife|             White|  Male|             99999|            4356|                99|    Yugoslavia| >50K|
+-------+------------------+-----------+------------------+------------+-----------------+--------+----------------+------------+------------------+------+------------------+----------------+------------------+--------------+-----+

Если вы хотите получить сводную статистику только по одному столбцу, добавьте имя столбца внутри description().

df.describe('capital_gain').show()
+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840354|
|    min|                 0|
|    max|             99999|
+-------+------------------+

Вычисление кросс-таблицы

В некоторых случаях может быть интересно увидеть описательную статистику между двумя парными столбцами. Например, вы можете подсчитать количество людей с доходом ниже или выше 50 тысяч по уровню образования. Эта операция называется перекрестной таблицей.

df.crosstab('возраст', 'метка').sort("age_label").show()
+---------+-----+----+
|age_label|<=50K|>50K|
+---------+-----+----+
|       17|  395|   0|
|       18|  550|   0|
|       19|  710|   2|
|       20|  753|   0|
|       21|  717|   3|
|       22|  752|  13|
|       23|  865|  12|
|       24|  767|  31|
|       25|  788|  53|
|       26|  722|  63|
|       27|  754|  81|
|       28|  748| 119|
|       29|  679| 134|
|       30|  690| 171|
|       31|  705| 183|
|       32|  639| 189|
|       33|  684| 191|
|       34|  643| 243|
|       35|  659| 217|
|       36|  635| 263|
+---------+-----+----+
only showing top 20 rows

Вы можете видеть, что ни у кого нет дохода выше 50 тысяч, когда они молоды.

Удалить столбец

Есть два интуитивно понятных API для удаления столбцов:

  • drop(): удалить столбец
  • dropna(): удалить NA

Ниже вы опускаете столбец education_num

df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

Данные фильтра

Вы можете использовать filter() для применения описательной статистики к подмножеству данных. Например, вы можете подсчитать количество людей старше 40 лет.

df.filter(df.age > 40).count()

13443

Описательная статистика по группам

Наконец, вы можете группировать данные по группам и выполнять статистические операции, такие как среднее значение.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()
+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 535.5687804878049|
|       Never-married|376.58831788823363|
|Married-spouse-ab...| 653.9832535885167|
|            Divorced| 728.4148098131893|
|             Widowed| 571.0715005035247|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
+--------------------+------------------+

Шаг 2) Предварительная обработка данных

Обработка данных является важным шагом в машинном обучении. После удаления мусорных данных вы получите важные сведения.

Например, вы знаете, что возраст не является линейной функцией дохода. Когда люди молоды, их доход обычно ниже, чем у людей среднего возраста. После выхода на пенсию домохозяйство использует свои сбережения, что означает уменьшение дохода. Чтобы зафиксировать эту закономерность, вы можете добавить квадрат к признаку возраста.

Добавить возрастной квадрат

Чтобы добавить новую функцию, вам необходимо:

  1. Выберите столбец
  2. Примените преобразование и добавьте его в DataFrame.
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)

Вы можете видеть, что age_square был успешно добавлен во фрейм данных. Вы можете изменить порядок переменных с помощью select. Ниже вы вводите age_square сразу после возраста.

COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()
Row(age=39, age_square=1521.0, workclass='State-gov', fnlwgt=77516, education='Bachelors', education_num=13, marital='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country='United-States', label='<=50K')

ROC-метрики

Модуль BinaryClassificationEvaluator включает показатели ROC. Кривая рабочих характеристик приемника — еще один распространенный инструмент, используемый при бинарной классификации. Она очень похожа на кривую точность/отзыв, но вместо того, чтобы строить график зависимости точности от полноты, кривая ROC показывает истинно положительный уровень (т.е. отзыв) по сравнению с уровнем ложноположительных результатов. Частота ложноположительных результатов — это доля отрицательных случаев, которые ошибочно классифицируются как положительные. Он равен единице минус истинная отрицательная ставка. Истинный отрицательный показатель также называется специфичностью. Следовательно, кривая ROC отображает чувствительность (отзыв) по сравнению с 1 – специфичностью.

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8940481662695192areaUnderROC

print(evaluator.evaluate(predictions))

0.8940481662695192

Шаг 6) Настройте гиперпараметр

И последнее, но не менее важное: вы можете настроить гиперпараметры. Как и в scikit, вы создаете сетку параметров и добавляете параметры, которые хотите настроить.

Чтобы сократить время вычислений, вы настраиваете параметр регуляризации только с двумя значениями.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Наконец, вы оцениваете модель с помощью метода перекрестной проверки с 5-кратным повторением. Тренировка занимает около 16 минут.
from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Время обучения модели: 978,807 секунды

Лучший гиперпараметр регуляризации — 0,01 с точностью 85,316%.

accuracy_m(model = cvModel)
Model accuracy: 85.316%

Вы можете извлечь рекомендуемый параметр, объединив cvModel.bestModel с extractParamMap().

bestModel = cvModel.bestModel
bestModel.extractParamMap()
{Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='maxIter', doc='maximum number of iterations (>= 0)'): 10,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities'): 'probability',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name'): 'rawPrediction',
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='regParam', doc='regularization parameter (>= 0)'): 0.01,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='standardization', doc='whether to standardize the training features before fitting the model'): True,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='threshold', doc='threshold in binary classification prediction, in range [0, 1]'): 0.5,
 Param(parent='LogisticRegression_4d8f8ce4d6a02d8c29a0', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0)'): 1e-06}

Резюме

Spark — это фундаментальный инструмент для специалиста по данным. Это позволяет практикующему специалисту подключать приложение к различным источникам данных, беспрепятственно выполнять анализ данных или добавлять прогностическую модель.

Чтобы начать работу со Spark, вам необходимо инициировать контекст Spark с помощью:

`SparkContext()“

и контекст SQL для подключения к источнику данных:

`SQLContext()“

В этом руководстве вы узнаете, как обучить логистическую регрессию:

  1. Преобразуйте набор данных в Dataframe с помощью:
rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
sqlContext.createDataFrame(input_data, ["label", "features"])

Обратите внимание, что имя столбца метки — newlabel, и все функции собраны в функции. Измените эти значения, если они отличаются в вашем наборе данных.

  1. Создайте набор поездов/тестов
randomSplit([.8,.2],seed=1234)
  1. Обучите модель
LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0,3)
lr.fit()
  1. Сделать прогноз
linearModel.transform()

You may also like

Leave a Comment