pyspark使用心得
一. 安装
要使用PySpark,本地要有Java开发环境。
-
Java 8 :
brew install --cask homebrew/cask-versions/adoptopenjdk8
-
pyspark安装:
pip install pyspark
二. 和pandas之间的代码使用
2.1 读取csv
spark在读取csv上优势就很明显了,能直接快速读取几个G的大文件
pandas读取大的csv,只能将其拆分为多个chunk进行读取,假如我们直接读取csv,可能会直接报内存不够导致进程被干掉。
import pandas as pd
df = pd.read_csv(path, index_col=False, iterator=True, chunksize=100000)
for df_i in df:
print(df_i)
pyspark读取csv,快速高效
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('learn').master("local").getOrCreate()
print(spark)
df = spark.read.csv(path,header=True)
2.2 写csv
pandas写入csv
df.to_csv('test.csv',index=False)
pyspark写入csv时,指定某个目录,这里推荐使用repartition(1),让所有分区文件合并成一个,不然得话存储为多个分片文件
spark_df.repartition(1).write.csv("data/", encoding="utf-8", header=True,mode='overwrite')
2.3 构建Dataframe
pandas构建dataframe
df = pd.DataFrame([['Sport', 1, 1], ['Flow', 2, 9], ['Sport', 2, 2],['Hear', 1, 6]],
columns=['type', 'lenth', 'score'])
pyspark构建dataframe
spark_df = spark.createDataFrame([['Sport', 1, 1], ['Flow', 2, 9], ['Sport', 2, 2],['Hear', 1, 6]],
['type', 'lenth', 'score'])
pandas的dataframe 转 pyspark的dataframe
spark_df = spark.createDataFrame(df)
spark_df.show()
2.4 自定义函数
在处理同一批数据(130w条样本测试)时,使用pyspark(local模式)处理需要0.05s,而pandas的apply函数则需要15s,快了300倍!
pandas的自定义函数apply
def is_sport(x):
if x == 'sport':
return 1
else:
return 0
df['is_sport'] = df['type'].apply(is_sport)
pyspark的自定义函数udf
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
type_formater = F.udf(is_sport,IntegerType())
new_type = type_formater(F.col('type')).alias('new_type')
spark_df.select(['type','lenth',new_type]).show()
2.5 查询函数
pandas查询函数query
df = df.query('score == 1')
pyspark查询函数filter
spark_df.filter("score == 1").show()
2.6 分组聚合函数
pandas分组函数groupby
df.groupby('type').sum()
pyspark分组函数groupBy
spark_df.groupBy('type').sum().show()
三. 机器学习
3.1 构建特征
VectorAssembler是一个Transformer,用来将数据集中多个属性按次序组合成一个类型为向量vector的属性。
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=["lenth","score"],outputCol="Features")
output=featureassembler.transform(spark_df)
output.show()
3.2 构建label
使用StringIndexer来构建数据集的label,默认的index是从0开始
indexer=StringIndexer(inputCol="type",outputCol="label")
output=indexer.fit(output).transform(output)
output.show()
3.3 训练模型
选择需要的特征后,将数据集拆分,进行训练,这里使用的随机森林模型
finalized_data=output.select("Features","label")
train_data,test_data=finalized_data.randomSplit([0.9,0.1])
rf=RandomForestClassificationModel(labelCol='label',featuresCol='Features',numTrees=20,maxBins=122)
rf=rf.fit(train_data)
rf.save('./model')