/ PYTHON, MACHINELEARNING

pyspark使用总结-第二篇

一. 常见的OOM

1.1 常用的解决方案

我们在使用spark的时候,经常在save数据的时候,都会遇到内存溢出的问题,这通常是由于数据量过大导致的。以下是一些可能的解决方案:

  1. 增加分区数:如果数据集非常大,可以尝试增加分区数。可以使用repartition()coalesce()函数来增加分区数。增加分区数可以将数据均匀地分布在更多的节点上,从而减少每个节点上的内存压力。
  2. 压缩数据:如果数据集包含大量重复的值,可以考虑使用压缩算法来减少内存使用。Pyspark提供了多种压缩算法,如Snappy、Gzip等。可以使用option("compression", "snappy")来设置压缩算法。
  3. 增加集群资源:可以考虑增加集群资源。可以增加集群的节点数或增加每个节点的内存。可以通过调整spark.driver.memoryspark.executor.memory参数来增加内存分配,特别对于driver而言,最好把内存设置大一些。

1.2 代码方面的优化

如果以上常用的解决方案依旧无法解决OOM的问题,那么我们可能需要考虑是否需要优化pyspark的代码了

  • UDF过于复杂:尽可能将结果拆分不同的列,然后再用简单的udf来组合这些列进行计算。
  • 多用filter算子:提前将大量数据剔除
  • 多用select算子:只保留需要的列,减少内存的使用
  • 尽量少用collect、count算子:像这些action算子基本都会把executor的数据全部加载回driver上,导致driver的内存吃紧。

二. 在集群上使用自建的python环境

2.1 构建python环境

由于集群是centos,那么我们构建python环境的时候最好选择也是centos。

  • conda构建python3.8:conda create -n yy_env python=3.8

  • 安装相关包:pip intall -r requirements.txt

  • 进入miniconda目录下:cd /root/miniconda3/envs

  • 压缩python环境:tar zcvf yy_env.tar.gz yy_env/

2.2 从本地传到指定文件目录

  • 如果需要推送到mdfs上,需要用mdfs和hdfs之间的映射关系

  • 利用hadoop脚本上传至hdfs上

sh hadoop.sh fs -Dipc.client.fallback-to-simple-auth-allowed=true -put file:///yy_env.tar.gz hdfs://env/

2.3 编写spark_conf

spark.yarn.dist.archives=mdfs:///env/yy_env.tar.gz;
spark.yarn.appMasterEnv.PYSPARK_PYTHON=./yy_env.tar.gz/yy_env/bin/python3;
spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./yy_env.tar.gz/yy_env/bin/python3;
spark.executorEnv.PYSPARK_PYTHON=./yy_env.tar.gz/yy_env/bin/python3;
spark.sql.broadcastTimeout=800;
spark.sql.broadcastMaxRetries=3;
spark.executor.heartbeatInterval=100000

三. pandas和pyspark的dataframe转换

两者之间的转换,代码很简单,但是实际中就会发现,当海量数据需要进行转换的时候,消耗时间成本是多么大!

3.1 转换的代码

# pandas ==> pyspark
pyspark_df = spark.createDataFrame(pandas_df)

# pyspark ==> pandas
pandas_df = pyspark_df.toPandas() 

3.2 爬坑

在转化代码中有哪些坑呢?

  1. pandas转pyspark的时候,如果你的pandas的版本过低,就会报错,这里你可以选择以下2个方案解决:

    • 升级pandas

    • 在代码中添加:pd.DataFrame.iteritems = pd.DataFrame.items

  2. 耗时过长,这里也有以下方案能缩减耗时:

    • 减少df的列和行 ==> 减少数据

    • 利用pyArrow加速:pip install pyarrow

      spark = SparkSession.builder.config("spark.sql.execution.arrow.pyspark.enabled", "true") # 加速转pandasdf的速度
      

四. 常用脚本

4.1 加载数据

# 加载mdfs文件,无表头
schema = StructType([
            StructField("link_id", StringType(), True)
        ])
df = spark.read.csv(file_path, header=True,schema=schema)

# 加载mdfs文件,有表头
df = spark.read.csv(file_path, header=True)

# 加载hive表
df = spark.sql("select a,b from save_tabel where ")
df = spark.read.format("iceberg").load(hive_tabel).where("a = a").select("a","b")

4.2 保存数据

# 保存mdfs
df.repartition(partition_num).write.option("header", "true").format("csv").mode("overwrite").save(save_path)


# 保存hive表: append是添加,overwrite是覆盖
df.write.format("iceberg").option("mergeSchema", "true").mode("append").save(save_tabel)

4.3 常用代码

# 1. 根据某一或者几列去重
df = df.dropDuplicates(subset=['a','b'])


# 2. df上下拼接(保证两个df的列名和顺序一致)
df = df_1.union(df_2)

# 3. df横向拼接
df = df_1.join(df_2, on='a', how= 'inner')

# 4. 构建常数列
df = df.withColumn('a', F.lit('0'))

# 5. groupby多列,其他的列聚合成list
df = df.groupby('a','b').agg(F.collect_list('c'),F.collect_list('d'),F.collect_list('e'))
df = df.withColumn('c', F.col('collect_list(c)').cast(StringType()))

# 6. filter过滤多个条件
df = df.filter((F.col('a') == 1) & (F.col('b') == 1)) # 且

# 7. 两个df按照某一列进行计算差集
diff_df = df_1.select("a").subtract(df_2.select("a")).distinct()

# 8. explode with split
df = df.withColumn("aa", F.explode(F.split(F.col("a"), ';'))) 

4.4 udf使用

可能在一个处理的过程中往往会使用多个自定义的udf函数,但是当项目非常大的时候,最好还是把归属于这个处理类的udf集成到类中:

class A:

    @staticmethod
    @F.udf(returnType=IntegerType())
    def is_a_equal0(a):
        if a == 0:
            return 1
        else:
            return 0

需要返回多列

    def aaa(var_list):

        @F.udf(returnType=StringType())
        def bbb(value):
            # 在这里可以对每个值进行自定义的处理操作
            rs = ''
            value_js = json.loads(value)
            for v in var_list:
                if rs:
                    rs += (';' + str(value_js[v]))
                else:
                    rs += str(value_js[v])
            return rs

        return bbb

    need_vars = ['a','b','c']
    df = df.withColumn("need_data", aaa(need_vars)(F.col("data")))
    df = df.withColumn("s", F.split(df['data'], ";"))
    for i, v in enumerate(need_vars):
        df = df.withColumn(v, df['s'].getItem(i))