3 Lestat.Z.

尚未进行身份认证

我要认证

lestatzhang.com

等级
TA的排名 1w+

spark动态资源分配参数一览

属性 默认值 含义 自版本 spark.dynamicAllocation.enabled false 是否使用动态资源分配,该资源分配将根据工作负载上下扩展在此应用程序中注册的执行程序的数量。 这需要设置spark.shuffle.service.enabled或spark.dynamicAllocation.shuffleTracking.enabled。 以下配置也相关:spark.dynamicAllocation.minExecutors,spa.

2020-07-14 16:03:07

Spark Dynamic Resource Allocation动态资源分配

动态资源分配Spark提供了一种机制,可以根据工作负载动态调整应用程序占用的资源。 这意味着,如果不再使用资源,应用程序可以将资源返还给群集,并在以后有需求时再次请求它们。 如果多个应用程序共享您的Spark集群中的资源,则此功能特别有用。默认情况下,此功能是禁用的,并且在各种类型的集群管理器中都可使用用。资源分配政策从较高的层次上讲,Spark应该在不再使用executor时将其放弃,并在需要它们时获取他们。由于没有确定的方法可以预测即将被删除的执行程序是否将在不久的将来运行任务,或者即将被添加的

2020-07-14 15:46:09

shell同时后台运行多个进程并等待

代码如下job_list='job1 job2 job3'pids=''for j in `echo $job_list`;do run_job $j & pids="$pids $!"donefor pid in $pids;do wait $pid || let "RES=1"doneif [[ "$RES" -ne 0 ]]; then exit $RESfi

2020-06-15 12:08:05

Spark中对dataframe内重复列求和

前言在处理dataframe中的字段名是,发现有些字段名在处理后是重复,于是新定义了策略,对这些相同列名的字段进行求和合并summing the duplicated columns代码实现#Hanle Duplicatesfrom operator import addfrom functools import reduceimport collectionsoriginal_columns = [...]columns_after_cleanup=[...]counter=colle

2020-06-08 11:19:55

spark中递归join一系列dataframe

代码如下:def join_dfs(df_list,key=['id']): if len(df_list) == 1: retirm df_list[0] def join_df(df1,df2,key=['id']): return df1.join(df2, key) return reduce(join_df, df_list)def join_df_recursive(df_list, key=['id']): len_df = len(df_list) if len_df

2020-05-21 13:14:12

注意pyspark中替换nan值的问题

在PySpark中,NaN与Null不同。 两者都与空字符串“”不同,因此,可能需要在所有数据集特定填充值的基础上检查每个字符。本来想使用df.replace(float('nan'),None)将dataframe中的 nan 值全部转化为空,但是测试下来发现,这个函数在spark中除了会将 nan替换掉,还会将 0 也替换成空(此处是int(0),0.0不受影响),因此,要注意。最后,还是建议用isnan来判断然后替换:import pyspark.sql.functions as Fdf .

2020-05-21 09:42:18

pyspark中获取dataframe的大小

代码如下:from pyspark.serializers import PickleSerializer, AutoBatchedSerializerdef _to_java_obj_rdd(df): rdd=df.rdd._reserialise(AutoBatchedSerializer(PickleSerializer())) return rdd.ctx._jvm.org.apache.spark.mllib.api.python.Serde.pythonToJava(rdd._jrdd

2020-05-19 19:40:42

Hive中使用parquet

如何创建parquet外部表CREATE EXTERNAL TABLE IF NOT EXISTS XXXX (...)STORED AS PARQUETLOCATION '/data/hive/xxxx'TBLPROPERTIES ('parquet.compression'='SNAPPY')Parquet字段定义与Hive表中字段的关系如果hive表定义的字段包含且大于parquet文件字段,则parquet文件不存在的字段被查询为NULLparquet文件可以包含hive表定义

2020-05-19 14:27:16

python计算server时区时间差

记录一段python中关于timezone的小代码,将另一个时区服务器的时间转为utc+8import timefrom bson import ObjectIdfrom datetime import datetime, timedelta, timezonefrom dateutil.parser import parsedt1 = parse('2020-03-20')now_d...

2020-03-20 11:10:35

hive中匹配反斜杠\

今天在做查询时候需要匹配某个字段中的反斜杠\,测试了一下,可以使用以下方法select name from my_table where name != regexp_replace(name,'\\\\','\\\\\\\\');本来想使用like 加 %来匹配的,但是一直无效,猜测是\转义的优先级比较高所以换一个思路,将每个字段中的’\‘的数量翻倍,这样,不含’\'的字段值不变。...

2019-12-09 21:42:50

python链表反转(递归)

代码如下:class ListNode(): def __init__(self,x): self.val =x self.next = None def printNode(head): curr=head while curr is not None: print(curr.val,end=' -&g...

2019-12-03 12:00:26

python 实现链表反转操作

代码如下:class ListNode(): def __init__(self,x): self.val =x self.next = None def printNode(head): curr=head while curr is not None: print(curr.val,end=' -&g...

2019-11-30 17:51:58

python实现堆排序

代码如下:def heap_adjust(arr, i, l): temp = arr[i] j = i*2 +1 while j< l: if j+1< l and arr[j]<arr[j+1]: j += 1 if arr[j]<temp: break ...

2019-11-30 17:08:46

python实现树的前序/中序/后序遍历以及深度/广度遍历等

代码如下:class Node(): def __init__(self, x): self.val = x self.left = self.right = None class Tree(): def __init__(self): self.root = None def add(sel...

2019-11-30 16:48:56

leetcode算法练习 —— 求两个排序排序数组的中位数

问题描述给定两个排序过的数组,求这两个排序数组的中位数, 在O(log(min(n, m)))内。解题思路将所有元素重组成有序的两部分,中位数就在段点处def find_median(a, b): #if len(a) > len(b): # a, b = b, a imin, imax = 0, len(a) n, m = len(a), len(b) while imi...

2019-10-15 21:06:13

pyspark OneHotEncoder用法实例

def encode_columns(df, col_list): indexers = [ StringIndexer(inputCol=c, outputCol=f'{c}_indexed').setHandleInvalid("keep") for c in col_list ] encoder = OneHotEncoderEstimator( inputCols = [in...

2019-10-10 21:29:43

pyspark target mean encoding入门版

写了一个简单版本的target mean encoding, 代码如下:from pyspark.sql.functions import create_mapfrom itertolls import chainagg = df.select([f,target]).groupnBy(f).agg(avg_(target).alias('mean'), count_(target).ali...

2019-10-03 20:24:56

Sparkexception: Values to assemble cannot be null

使用VectorAssembler的时候出现Sparkexception: Values to assemble cannot be null的错误,原因是因为Vector不能包含null值。解决方法如下:assembler.transform(df.na.fill(999999999))...

2019-09-30 14:42:19

'requirement failed: Cannot have an empty string for name. '

今天测试pyspark重的encode出现了pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Cannot have an empty string for name. '这个问题的主要原因是OneHotEncoder不能处理空字符串。所以我们需要将数据集中的空字符串提前处理一下。一般可以使用 dropna 或...

2019-09-30 13:43:41

python中的quicksort 和mergesort

def quicksort(alist): return quicksorthelper(alist, 0 , len(alist)-1) def quicksorthelper(alist, first, last): if first < last: splitpartition = partition(alist, first, last) ...

2019-09-24 11:54:59

查看更多

勋章 我的勋章
  • GitHub
    GitHub
    绑定GitHub第三方账户获取
  • 签到新秀
    签到新秀
    累计签到获取,不积跬步,无以至千里,继续坚持!
  • 技术圈认证
    技术圈认证
    用户完成年度认证,即可获得
  • 专栏达人
    专栏达人
    授予成功创建个人博客专栏的用户。专栏中添加五篇以上博文即可点亮!撰写博客专栏浓缩技术精华,专栏达人就是你!
  • 持之以恒
    持之以恒
    授予每个自然月内发布4篇或4篇以上原创或翻译IT博文的用户。不积跬步无以至千里,不积小流无以成江海,程序人生的精彩需要坚持不懈地积累!
  • 勤写标兵Lv4
    勤写标兵Lv4
    授予每个自然周发布9篇以上(包括9篇)原创IT博文的用户。本勋章将于次周周三上午根据用户上周的博文发布情况由系统自动颁发。