Поскольку за каждым приобретением стоит небольшая потеря, процесс накопления знаний иногда может привести к утечкам. Несмотря на нашу тенденцию воспринимать себя как сверхлюдей, правда в том, что мы подвержены ошибкам и обладаем ограниченным объемом памяти. Может быть удручающе осознавать, что мы можем забыть ценные знания, которые мы усердно приобретали в течение многих дней и месяцев усилий. Следовательно, я взял на себя личное обязательство писать заметки и создавать руководства по различным темам, предоставляя себе ценный ресурс для периодического просмотра. Кроме того, я намерен поделиться этими ресурсами с другими, кто хочет освежить свои основные знания.

Сегодняшняя статья посвящена Spark: Apache Spark — это невероятно мощная распределенная вычислительная система с открытым исходным кодом, которая революционизирует обработку и аналитику больших данных. Он предоставляет унифицированную и масштабируемую структуру, позволяющую эффективно обрабатывать массивные наборы данных на кластерах компьютеров. Spark устраняет ограничения традиционных систем обработки данных, используя вычисления в памяти и параллельную обработку.

Spark написан на Scala и работает на виртуальной машине Java JVM. Spark имеет встроенные компоненты для обработки потоковых данных, машинного обучения, обработки графов и даже взаимодействия с данными через SQL.

Благодаря своей скорости, простоте использования, масштабируемости, отказоустойчивости и универсальности Apache Spark стал популярным в экосистеме больших данных. Он широко используется для различных вариантов использования, включая исследование данных, подготовку, машинное обучение, аналитику в реальном времени и приложения для потоковой передачи данных. Его влияние на обработку больших данных и аналитику невозможно переоценить.

Если Spark был написан на Scala, как его использовать на python? Это супер просто PySpark! который представляет собой слой Python для достижения искры scala.

import pyspark

# Initialize a SparkContext
sc = pyspark.SparkContext('local[*]')

# Read the text file and create an RDD
txt = sc.textFile('file.txt')

# Count the total number of lines in the text file
print(txt.count())  # Prints the line count to the console

# Filter out lines that contain the word 'python' (case-insensitive)
python_lines = txt.filter(lambda line: 'python' in line.lower())

# Count the number of lines that contain the word 'python'
print(python_lines.count())  # Prints the count to the console

В этом фрагменте кода:

  • Мы импортируем модуль pyspark, который предоставляет функции и классы для работы со Spark в Python.

SparkContext — это точка входа для функциональности Spark. Здесь мы создаем экземпляр с именем scдля подключения к кластеру Sparkr и выполнения распределенных операций. В этом случае мы запускаем Spark в локальном режиме, используя все доступные ядра на машине.

  • Далее читаем текстовый файл методом sc.textFile(). Здесь мы читаем файл, расположенный по адресу 'file.txt'.
  • Чтобы определить общее количество строк в текстовом файле, мы используем txt.count(). Это подсчитывает элементы в RDD txt, что соответствует количеству строк. Затем результат выводится на консоль.
  • Мы создаем новый RDD с именем python_lines, применяя операцию фильтрации к txt RDD. Лямбда-функция lambda line: 'python' in line.lower() проверяет, присутствует ли слово «python» (в нижнем регистре) в каждой строке. Это помогает нам отфильтровать строки, не содержащие слова «python».
  • Наконец, мы подсчитываем количество строк, содержащих слово «python», используя python_lines.count(). Результат выводится на консоль.

Искровые фреймы данных:

Чтобы начать работу с кадрами данных Spark, сначала необходимо создать объект SparkSession из файла SparkContext. Вы можете рассматривать SparkContext как соединение с кластером, а SparkSession как интерфейс с этим соединением. Но что, если вы не уверены, что он уже существует? Создание нескольких SparkSession и SparkContext может вызвать проблемы, поэтому рекомендуется использовать метод SparkSession.builder.getOrCreate(). Это возвращает существующий SparkSession, если он уже есть в среде, или создает новый, если необходимо!

import pyspark
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("My_car_data").getOrCreate()

# Read a CSV file and create a DataFrame
data_df = spark.read.csv("car_data.csv", header=True, inferSchema=True)

# Show the data in the DataFrame
data_df.show()

#To list all the data inside the cluster: spark.catalog.listTables()
tables = spark.catalog.listTables()

print("Tables in the Spark session:")
for table in tables:
    print(table.name)

Одним из преимуществ интерфейса DataFrame является то, что вы можете выполнять SQL-запросы к таблицам в вашем кластере Spark, скрывать их во фрейм данных pandas и преобразовывать фрейм данных pandas в фрейм данных spark, как показано в следующем примере.

# Apply an SQL query to count the number of rows
new_table= spark.sql("SELECT * FROM my_table")

# Convert the results to a pandas DataFrame
pd_table = new_table.toPandas()
# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_table)

# Add spark_temp to the catalog which necessary
# step to see the table in caltalog list

spark_temp.createOrReplaceTempView("temp")

# Examine the tables in the catalog again
print(spark.catalog.listTables())

Вы можете создать DataFrame из файла .csv так же, как и с обычными pandas DataFrames. SparkSession имеет атрибут .read, который имеет несколько методов для чтения различных источников данных в Spark DataFrames.

# Don't change this file path
file_path = "cars.csv"

# Read in the airports data
cars_spark= spark.read.csv(file_path, header=True)

# Show the data
cars_spark.show()

мы рассмотрим функциональные возможности, предоставляемые классом Spark DataFrame для выполнения общих операций с данными.

withColumn() принимает два аргумента. Первый аргумент — это строка, представляющая имя нового столбца, а второй аргумент — сам новый столбец.

Чтобы создать новый столбец, вам необходимо предоставить объект класса Column. Создать такой объект просто, извлекая столбец из вашего DataFrame, используя df.colName.

Важно отметить, что обновление Spark DataFrame отличается от работы с pandas, потому что Spark DataFrame неизменяемы. Это означает, что они не могут быть изменены напрямую, и, следовательно, столбцы не могут быть обновлены на месте. Вместо этого эти методы возвращают новый DataFrame. Чтобы перезаписать исходный DataFrame, вам необходимо переназначить возвращенный DataFrame с помощью соответствующего метода.

df = df.withColumn("newCol", 2* df.oldCol) 

Метод .cast() в сочетании с методом .withColumn(). Важно отметить, что .cast() работает со столбцами, а .withColumn() работает с DataFrames. Единственный аргумент, который вам нужно передать .cast(), — это тип значения, которое вы хотите создать, в строковой форме. Например, чтобы создать целые числа, вы передадите аргумент "integer", а для десятичных чисел вы будете использовать "double".

model_data = model_data.withColumn("newCol", 2* df.oldCol.cast("double"))

filter() принимает либо выражение, которое следует за предложением WHERE выражения SQL, в виде строки, либо столбец Spark с логическими (True/False) значениями.

Например, следующие два выражения дадут один и тот же результат:

flights.filter("air_time > 120").show()
flights.filter(flights.air_time > 120).show() 
flights.filter("air_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL").show()

groupBy(): Группировать строки на основе одного или нескольких столбцов.

agg(): выполнять функции агрегирования (например, суммировать, подсчитывать, усреднять) для сгруппированных данных.

# Import necessary libraries
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create the DataFrame
salesData = spark.createDataFrame([
    ("Product A", "Region 1", 10),
    ("Product B", "Region 1", 15),
    ("Product A", "Region 2", 5),
    ("Product C", "Region 2", 8),
    ("Product B", "Region 1", 20)
], ["product", "region", "quantity_sold"])

# Group by product and region, and sum the quantity_sold
groupedData = salesData.groupBy("product", "region").sum("quantity_sold")

# Show the result
groupedData.show()

Дополнительные функции для очистки и подготовки наборов данных:

orderBy(): Сортировка DataFrame по одному или нескольким столбцам.

join(): объединение двух фреймов данных на основе общего столбца.

distinct(): Получить отдельные строки из DataFrame.

drop(): удалить один или несколько столбцов из DataFrame.

describe(): Вычислить сводную статистику числовых столбцов.

pivot(): Поверните DataFrame на основе значения столбца.

sample(): Произвольная выборка части строк из DataFrame.

union(): Объединить два кадра данных по вертикали.

cache(): Кэшируйте DataFrame в памяти для более быстрого доступа.

Машинное обучение Spark:

Машинное обучение PySpark содержит оценщики и преобразователи, и оба они создают конвейер машинного обучения.

Оценщики: Оценщики относятся к алгоритмам или моделям, которые можно обучить на данных для создания преобразователей. Их основная функция состоит в том, чтобы учиться на предоставленных данных и соответствующим образом подгонять модель. Оценщики используют метод fit(), который принимает DataFrame в качестве входных данных и обучает модель, используя заданные данные. Примеры оценщиков в PySpark включают LinearRegression, DecisionTreeRegressor, LogisticRegression и RandomForestClassifier.

Преобразователи. Преобразователи, с другой стороны, представляют собой модели, которые могут преобразовывать кадры данных, применяя ряд операций к входным данным. Они берут входной DataFrame и создают новый DataFrame с дополнительными столбцами, полученными в результате преобразований. Преобразователи используют метод transform() для применения указанных операций к DataFrame. PySpark предоставляет различные преобразователи, такие как StringIndexer, OneHotEncoder и StandardScaler.

В конвейере машинного обучения PySpark оценщики и преобразователи объединяются для создания последовательности преобразований и обучения модели. Этот конвейер обеспечивает эффективную и масштабируемую обработку данных и построение моделей.

Типичный рабочий процесс включает в себя обучение Estimator на обучающих данных для получения обученной модели, которая затем используется в качестве Transformer для создания прогнозов или преобразования функций на новых данных с использованием метода transform().

Используя Estimators и Transformers в конвейере, вы можете обеспечить согласованные этапы предварительной обработки и надежно применять их как к обучающим, так и к тестовым данным, обеспечивая воспроизводимость и масштабируемость рабочего процесса машинного обучения.

Подготовьте свои данные, импортировав набор данных и разделив его на обучение и тестирование.

data = spark.read.csv("data.csv", header=True, inferSchema=True)
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

Обработка категориальных признаков:

  • StringIndexer: этот преобразователь присваивает уникальный числовой индекс каждой уникальной категории в категориальном столбце.
  • OneHotEncoder: этот преобразователь преобразует категориальные признаки в двоичные векторы, где каждый вектор представляет наличие или отсутствие категории.
  • VectorIndexer: этот преобразователь автоматически идентифицирует категориальные признаки и индексирует их.
# Import library
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier",outputCol="carrier_index")
# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index",outputCol="carrier_fact")

Обработка числовых функций:

  • StandardScaler: этот преобразователь стандартизирует числовые характеристики, вычитая среднее значение и масштабируя его до единичной дисперсии.
  • MinMaxScaler: этот преобразователь масштабирует числовые характеристики до указанного диапазона, часто от 0 до 1.
  • Bucketizer: этот преобразователь дискретизирует непрерывные числовые признаки в ячейки или ведра.

Комбинация функций:

  • VectorAssembler: этот преобразователь объединяет несколько столбцов функций в один векторный столбец. Он обычно используется для сборки всех соответствующих функций перед обучением модели.
# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

Estimator это LogisticRegression

# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()

Производительность модели оценивается с помощью BinaryClassificationEvaluator. Общая метрика для алгоритмов бинарной классификации называется AUC или площадью под кривой. чем ближе AUC к единице, тем лучше модель.

import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

Трубопровод

Pipeline — это класс в модуле pyspark.ml, который объединяет все Estimators и Transformers, которые вы уже создали. Это позволяет многократно использовать один и тот же процесс моделирования, заключая его в один простой объект.

# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler,lr])

Соответствие конвейеру: вызовите метод fit() для объекта конвейера, передав данные обучения. Это будет последовательно обучать и подгонять каждую стадию конвейера. Преобразуйте данные: после подгонки конвейера вызовите метод transform() для подогнанного объекта конвейера, передав нужный набор данных. Это применяет преобразования, изученные во время обучения, к новому набору данных. Получите доступ к преобразованным данным: выходом метода transform() будет новый кадр данных, содержащий преобразованные функции или прогнозы, в зависимости от этапов конвейера. После преобразования данных с помощью подобранного конвейера мы можем оценить результаты, используя оценщик, который мы определили ранее.

# training the pipline
model = flights_pipe.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model performance
evaluation_result = evaluator.evaluate(predictions)
print("Evaluation Result:", evaluation_result)

Спасибо, что нашли время прочитать эту статью. В ней я сделал общий обзор PySpark с примерами. Это быстрый и эффективный способ освежить ваше понимание основ PySpark.