/ PYTHON, MACHINELEARNING

pyspark使用心得

一. 安装

要使用PySpark,本地要有Java开发环境。

  • Java 8 : brew install --cask homebrew/cask-versions/adoptopenjdk8

  • pyspark安装:pip install pyspark

二. 和pandas之间的代码使用

2.1 读取csv

spark在读取csv上优势就很明显了,能直接快速读取几个G的大文件

pandas读取大的csv,只能将其拆分为多个chunk进行读取,假如我们直接读取csv,可能会直接报内存不够导致进程被干掉。

import pandas as pd
df = pd.read_csv(path, index_col=False, iterator=True, chunksize=100000)
for df_i in df:
    print(df_i)

pyspark读取csv,快速高效

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('learn').master("local").getOrCreate()
print(spark)
df = spark.read.csv(path,header=True)

2.2 写csv

pandas写入csv

df.to_csv('test.csv',index=False)

pyspark写入csv时,指定某个目录,这里推荐使用repartition(1),让所有分区文件合并成一个,不然得话存储为多个分片文件

spark_df.repartition(1).write.csv("data/", encoding="utf-8", header=True,mode='overwrite')

2.3 构建Dataframe

pandas构建dataframe

df = pd.DataFrame([['Sport', 1, 1], ['Flow', 2, 9], ['Sport', 2, 2],['Hear', 1, 6]],
                      columns=['type', 'lenth', 'score'])

pyspark构建dataframe

spark_df = spark.createDataFrame([['Sport', 1, 1], ['Flow', 2, 9], ['Sport', 2, 2],['Hear', 1, 6]], 
                                ['type', 'lenth', 'score'])

pandas的dataframe 转 pyspark的dataframe

spark_df = spark.createDataFrame(df)
spark_df.show()

2.4 自定义函数

在处理同一批数据(130w条样本测试)时,使用pyspark(local模式)处理需要0.05s,而pandas的apply函数则需要15s,快了300倍!

pandas的自定义函数apply

def is_sport(x):
    if x == 'sport':
        return 1
    else:
        return 0
df['is_sport'] = df['type'].apply(is_sport)

pyspark的自定义函数udf

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
type_formater = F.udf(is_sport,IntegerType())
new_type = type_formater(F.col('type')).alias('new_type')
spark_df.select(['type','lenth',new_type]).show()

2.5 查询函数

pandas查询函数query

df = df.query('score == 1')

pyspark查询函数filter

spark_df.filter("score == 1").show()

2.6 分组聚合函数

pandas分组函数groupby

df.groupby('type').sum()

pyspark分组函数groupBy

spark_df.groupBy('type').sum().show()

三. 机器学习

3.1 构建特征

VectorAssembler是一个Transformer,用来将数据集中多个属性按次序组合成一个类型为向量vector的属性。

from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=["lenth","score"],outputCol="Features")
output=featureassembler.transform(spark_df)
output.show()

3.2 构建label

使用StringIndexer来构建数据集的label,默认的index是从0开始

indexer=StringIndexer(inputCol="type",outputCol="label")
output=indexer.fit(output).transform(output)
output.show()

3.3 训练模型

选择需要的特征后,将数据集拆分,进行训练,这里使用的随机森林模型

finalized_data=output.select("Features","label")

train_data,test_data=finalized_data.randomSplit([0.9,0.1])
rf=RandomForestClassificationModel(labelCol='label',featuresCol='Features',numTrees=20,maxBins=122)
rf=rf.fit(train_data)
rf.save('./model')