pyspark使用总结-第二篇
一. 常见的OOM
1.1 常用的解决方案
我们在使用spark的时候,经常在save数据的时候,都会遇到内存溢出的问题,这通常是由于数据量过大导致的。以下是一些可能的解决方案:
- 增加分区数:如果数据集非常大,可以尝试增加分区数。可以使用
repartition()
或coalesce()
函数来增加分区数。增加分区数可以将数据均匀地分布在更多的节点上,从而减少每个节点上的内存压力。 - 压缩数据:如果数据集包含大量重复的值,可以考虑使用压缩算法来减少内存使用。Pyspark提供了多种压缩算法,如Snappy、Gzip等。可以使用
option("compression", "snappy")
来设置压缩算法。 - 增加集群资源:可以考虑增加集群资源。可以增加集群的节点数或增加每个节点的内存。可以通过调整
spark.driver.memory
和spark.executor.memory
参数来增加内存分配,特别对于driver而言,最好把内存设置大一些。
1.2 代码方面的优化
如果以上常用的解决方案依旧无法解决OOM的问题,那么我们可能需要考虑是否需要优化pyspark的代码了
- UDF过于复杂:尽可能将结果拆分不同的列,然后再用简单的udf来组合这些列进行计算。
- 多用filter算子:提前将大量数据剔除
- 多用select算子:只保留需要的列,减少内存的使用
- 尽量少用collect、count算子:像这些action算子基本都会把executor的数据全部加载回driver上,导致driver的内存吃紧。
二. 在集群上使用自建的python环境
2.1 构建python环境
由于集群是centos,那么我们构建python环境的时候最好选择也是centos。
-
conda构建python3.8:
conda create -n yy_env python=3.8
-
安装相关包:
pip intall -r requirements.txt
-
进入miniconda目录下:
cd /root/miniconda3/envs
-
压缩python环境:
tar zcvf yy_env.tar.gz yy_env/
2.2 从本地传到指定文件目录
-
如果需要推送到mdfs上,需要用mdfs和hdfs之间的映射关系
-
利用hadoop脚本上传至hdfs上
sh hadoop.sh fs -Dipc.client.fallback-to-simple-auth-allowed=true -put file:///yy_env.tar.gz hdfs://env/
2.3 编写spark_conf
spark.yarn.dist.archives=mdfs:///env/yy_env.tar.gz;
spark.yarn.appMasterEnv.PYSPARK_PYTHON=./yy_env.tar.gz/yy_env/bin/python3;
spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./yy_env.tar.gz/yy_env/bin/python3;
spark.executorEnv.PYSPARK_PYTHON=./yy_env.tar.gz/yy_env/bin/python3;
spark.sql.broadcastTimeout=800;
spark.sql.broadcastMaxRetries=3;
spark.executor.heartbeatInterval=100000
三. pandas和pyspark的dataframe转换
两者之间的转换,代码很简单,但是实际中就会发现,当海量数据需要进行转换的时候,消耗时间成本是多么大!
3.1 转换的代码
# pandas ==> pyspark
pyspark_df = spark.createDataFrame(pandas_df)
# pyspark ==> pandas
pandas_df = pyspark_df.toPandas()
3.2 爬坑
在转化代码中有哪些坑呢?
-
pandas转pyspark的时候,如果你的pandas的版本过低,就会报错,这里你可以选择以下2个方案解决:
-
升级pandas
-
在代码中添加:
pd.DataFrame.iteritems = pd.DataFrame.items
-
-
耗时过长,这里也有以下方案能缩减耗时:
-
减少df的列和行 ==> 减少数据
-
利用pyArrow加速:
pip install pyarrow
spark = SparkSession.builder.config("spark.sql.execution.arrow.pyspark.enabled", "true") # 加速转pandasdf的速度
-
四. 常用脚本
4.1 加载数据
# 加载mdfs文件,无表头
schema = StructType([
StructField("link_id", StringType(), True)
])
df = spark.read.csv(file_path, header=True,schema=schema)
# 加载mdfs文件,有表头
df = spark.read.csv(file_path, header=True)
# 加载hive表
df = spark.sql("select a,b from save_tabel where ")
df = spark.read.format("iceberg").load(hive_tabel).where("a = a").select("a","b")
4.2 保存数据
# 保存mdfs
df.repartition(partition_num).write.option("header", "true").format("csv").mode("overwrite").save(save_path)
# 保存hive表: append是添加,overwrite是覆盖
df.write.format("iceberg").option("mergeSchema", "true").mode("append").save(save_tabel)
4.3 常用代码
# 1. 根据某一或者几列去重
df = df.dropDuplicates(subset=['a','b'])
# 2. df上下拼接(保证两个df的列名和顺序一致)
df = df_1.union(df_2)
# 3. df横向拼接
df = df_1.join(df_2, on='a', how= 'inner')
# 4. 构建常数列
df = df.withColumn('a', F.lit('0'))
# 5. groupby多列,其他的列聚合成list
df = df.groupby('a','b').agg(F.collect_list('c'),F.collect_list('d'),F.collect_list('e'))
df = df.withColumn('c', F.col('collect_list(c)').cast(StringType()))
# 6. filter过滤多个条件
df = df.filter((F.col('a') == 1) & (F.col('b') == 1)) # 且
# 7. 两个df按照某一列进行计算差集
diff_df = df_1.select("a").subtract(df_2.select("a")).distinct()
# 8. explode with split
df = df.withColumn("aa", F.explode(F.split(F.col("a"), ';')))
4.4 udf使用
可能在一个处理的过程中往往会使用多个自定义的udf函数,但是当项目非常大的时候,最好还是把归属于这个处理类的udf集成到类中:
class A:
@staticmethod
@F.udf(returnType=IntegerType())
def is_a_equal0(a):
if a == 0:
return 1
else:
return 0
需要返回多列
def aaa(var_list):
@F.udf(returnType=StringType())
def bbb(value):
# 在这里可以对每个值进行自定义的处理操作
rs = ''
value_js = json.loads(value)
for v in var_list:
if rs:
rs += (';' + str(value_js[v]))
else:
rs += str(value_js[v])
return rs
return bbb
need_vars = ['a','b','c']
df = df.withColumn("need_data", aaa(need_vars)(F.col("data")))
df = df.withColumn("s", F.split(df['data'], ";"))
for i, v in enumerate(need_vars):
df = df.withColumn(v, df['s'].getItem(i))