自定义博客皮肤VIP专享

*博客头图:

格式为PNG、JPG,宽度*高度大于1920*100像素,不超过2MB,主视觉建议放在右侧,请参照线上博客头图

请上传大于1920*100像素的图片!

博客底图:

图片格式为PNG、JPG,不超过1MB,可上下左右平铺至整个背景

栏目图:

图片格式为PNG、JPG,图片宽度*高度为300*38像素,不超过0.5MB

主标题颜色:

RGB颜色,例如:#AFAFAF

Hover:

RGB颜色,例如:#AFAFAF

副标题颜色:

RGB颜色,例如:#AFAFAF

自定义博客皮肤

-+
  • 博客(45)
  • 收藏
  • 关注

原创 hive求某天是当年第几周形如yyyyww

两者是有差异mysql是从周一作为这周起始时间,hive是从周日作为这周起始时间。mysql有date_format求当年第几周。为了同mysql保持一致需要当前时间减1天。同时hive也有相应的函数。

2022-08-10 14:14:27 741 1

原创 HIVE实现连续时间间隔时间序列

hive生成连续时间间隔时间序列。比如每隔10分钟生成连续时间序列

2022-06-07 14:44:02 2178

原创 HIVE生成数字全局唯一键ID

1.row_number() over() (生成全局递增唯一ID)selectrow_number() over() id,afrom test1此方法会产生一个reduce。数据量比较大会产生数据倾斜。2.生成全局唯一数字ID(非递增连续)先将原始数据随机分成101份(可以理解为步长)create table test2asselecta,cast(rand()*100 as bigint) numfrom test1再每份内排序后生成一个唯一IDselect

2021-06-17 20:44:17 2482 2

原创 HIVE列转行注意点

HIVE列转行注意分隔成数组字段不能为null当hive进行列转行需要注意分割成数组不能为null,不然这列就会漏掉。原始数据结果展示对于这样的情况只能用union all将两部分数据合起来selecta,bbfrom testlateral view explode(split(b,’:’)) num as bbwhere b is not nullunion allselecta,b bbfrom testwhere b is null或者用nvl将null转化

2021-06-09 20:55:48 167 2

原创 HIVE解析JSON数组

HIVE解析JSON数组数据示例:[{“payAmount”:“375000”,“payChannelCode”:“BOC”},{“payAmount”:“376000”,“payChannelCode”:“AOC”}]1.get_json_object函数提取json数组里面特定字段值get_json_object可以提取json数组指标位置的值(跟数组一样)selectget_json_object('[{"payAmount":"375000","payChannelCode":"BOC

2021-06-07 20:22:10 4090

原创 hive正则表达式反向引用

反向引用捕获会返回一个捕获组,这个分组是保存在内存中的,不仅可以在正则表达式外部通过程序进行引用,也可以在正则表达式内部进行引用,这种引用方式就是反向引用。根据捕获组的命名规则,反向引用可分为:1.数字编号组反向引用:\k或\number2.命名编号组反向引用:\k或\‘name’捕获组是匹配子表达式的内容按序号或者命名保存起来以便使用,主要是用来查找一些重复的内容或者替换指定字符。J...

2020-01-22 10:21:31 1276

原创 SQL16号统计1~15号数据,1号统计上月15~月底数据

由于我司提供的脚本周期只有天,周这种。于是想用周期为天来执行脚本。逻辑如下:selectday,count(*)from testwhere( ‘20191215’ = ‘date’ and day>=‘20191201’ and day<=‘20191215’)or(‘20200101’ = ‘date’ and day>=‘20191215’ and day...

2020-01-19 09:41:10 726

原创 hive存储复杂的json格式

1.hive复合数据类型 ARRAY < data_type > MAP < primitive_type, data_type > STRUCT < col_name : data_type [COMMENT col_comment], ...> 2.json建构于两种结构 “名称/值”对的集合(A collection of name...

2020-01-13 11:59:06 1610

原创 使用max函数实现唯一性

原始数据type line amount 1 a 111 b 222 a 1112 b 2221.列转行select type, max(case when line='a' then amount else 0 end ) a, max(case when lin...

2020-01-10 11:50:15 315

原创 hive使用size函数求分割后字符串个数注意点

1.split的字段串是空串(a='')size(split(a,','))=12.split的字段串是null(a is null)size(split(a,','))=-1

2020-01-02 15:48:42 3009

原创 hive大量小文件处理方法总结

大量的小文件对namenode产生很大压力,加上最近平台要求清理小文件。下面总结一下工作中用到的小文件方法。1.参数法(普遍方法,适用于大量map端小文件和reduce后数据量还比较大。比如:日志文件从text格式insert到orc格式)1.map端参数set mapred.max.split.size=256000000;//每个Map最大输入大小set mapred.min.spli...

2019-12-30 16:46:43 2417

原创 hive时间戳转化格式化

1.bigint转化成时间戳 1.1 bigint为13(毫秒级) 1.date_format函数 select date_format(cast(1577379134405 as timestamp),'yyyyMMddHHmmss') 2.from_unixtime函数 select from_unixtime(cast(substr(15773791...

2019-12-27 17:37:13 2722

原创 hive实现like模糊配对

testa表(字段a) aaa bba cccddd dddaaatestb表(字段b) a b 1. 使用like+concat模糊配对select a.afrom testa a ,testb bwhere a like concat('%',b.b,'%')group by a.a2. 使用locate函数select a.a...

2019-12-26 19:57:43 5104

原创 hive字符串截取

1.根据分割符截取1.1 split函数selectsplit(‘a????️c’,’:’)[0],split(‘a????️c’,’:’)[2]1.2 regexp_extract函数selectregexp_extract(‘a????️c’,’(.)????.)????.)’,1),regexp_extract(‘a????️c’,’(.)????.)????.)’,3)selectregexp_extract(‘a????...

2019-12-26 09:48:41 17288

原创 hive除重的几种方法

1.除重不做统计操作1.1使用distinctselectdistncta,bfrom test1.2使用group byselecta,bfrom testgroup bya,b1.3使用分组函数selecta,bfrom(selecta,brow_number() over(partition by a,b) as row_idfrom tes...

2019-12-24 19:40:40 647

原创 Flink数据流类型转换

Flink为流处理和批处理分别提供了DataStream API和DataSet API。在开发工作中都是利用这些API进行程序编程。不同DataStream之间转换从而形成stream图。网上Flink1.4流程转换图有点旧,不过也能说明之间的转换关系。从上面的转换关系可以看出各种不同的流通过operator最后都转换为DataStream。可以从官网查看各个流的转换关系https://ci....

2019-08-11 20:14:53 4418

原创 Flink使用BucketingSink自定义多目录写入

由于平台的不稳定性,小时解析日志老是出错需要人为干涉。最近在想能不能通过flink实时解析日志入库。查了一下网上的资料可以使用BucketingSink来将数据写入到HDFS上。于是想根据自定义文件目录来实行多目录写入。添加pom依赖` <dependency> <groupId>org.apache.flink</groupId> ...

2019-08-03 16:20:58 3752 1

原创 IDEA开启异步调试

由于现在很多程序都是异步执行,于是debug的时候需要开启异步调试,刚好IDEA有这个功能。

2019-07-28 11:11:45 8157

原创 kill(杀死)hive进程(job)的2种方法

先去yarn界面查找你要杀死的程序,在最右边ApplicationMaster点进入下面界面方法1.yarn application -kill application_Id方法2.hadoop job -kill job_id

2019-07-26 17:16:04 8127 1

原创 Flink的KeyedStateStore的5中state

KeyedStateStore有ValueState,ListState,ReducingState,AggregatingState,MapState5中状态。下面实现这5种状态示例。1.KeyedStateStore接口的所有get方法2.ValueState DataStream<MyType> stream = ...; KeyedStream<MyType...

2019-07-23 22:00:01 701

原创 hive字段类型为bigint但在where条件却用了字符串a!=''导致执行计划和结果有问题

今天写了一段SQL,代码逻辑没有问题,但是结果老是跟实际不符合。于是用explain查了一下执行计划也没有什么问题。于是只能一步一步临时表的查问题。最后定位到where a!=’’。下面是hive支持的隐式类型转换图表hive测试结果:...

2019-07-22 16:16:26 1660

原创 Flink的StreamExecutionEnvironment和ExecutionEnvironment一点思考

Flink程序首先就是创建运行环境StreamExecutionEnvironment和ExecutionEnvironment,查看了一下源码这两个类是抽象类,但是getExecutionEnvironment是静态方法。也就是说抽象类里面定义了静态方法同时根据不同的子类类型来new。这样做的好处是提供统一访问接口,具体实现方法根据具体的子类来。这个跟Java模板模式一样。...

2019-07-21 14:52:45 10991 2

原创 Flink本地DEBUG日志输出到控制台

最近在读Flink源码,于是在本地DEBUG自带的例子源码,但是程序的日志输出级别是INFO,其中很多DEBUG级别的日志是不输出到控制台。怎样来修改日志级别让其输出到控制台方便阅读源码。于是在网上查看了一下Flink使用的log4j日志框架,设置log4j日志级别必须要log4j的配置文件。在编译好的源码中竟然有这个配置文件,那只要修改这个文件就可以在本地控制台看到DEBUG级别的日志了。l...

2019-07-21 14:16:49 9480

原创 sparkSQL实现对hive动态分区

1.开始hive动态分区参数val spark = SparkSession.builder().appName(“test”).enableHiveSupport().config(“spark.sql.adaptive.enabled”,true).config(“spark.sql.adaptive.shuffle.targetPostShuffleInputSize”,1342...

2019-07-19 18:00:28 3275

原创 spark读取多个目录下多个文件

最近项目要用准时数据,于是改用sparkSQL还对数据进行统计。kafka落到HDFS上是按照天分区,小时文件。HDFS上的目录结构如下:A/20190101/2019010100 A/20190101/2019010101B/20190101/2019010100 B/20190101/2019010101方法1:使用sparkContext.read.text(paths=[p...

2019-07-19 17:50:28 8616 2

原创 hive实现树形结构展开

最近遇到了树形结构存储,比如地市,节目目录这种结构。比如:节目目录展开drop table aps.tmp_q1;create table aps.tmp_q1 as select epg_cate_id alternatekey, epg_cata_ttl chinesetitle, first_py englishtitle, bppc_id bppcid, par...

2019-07-01 10:55:01 5782 1

原创 Flink状态计算与状态恢复(checkpoint)

通过扩展FlatMapFunction函数来对状态进行累计,通过实现CheckpointedFunction接口实现checkpoint和在初始化。 class CheckpointCount extends FlatMapFunction[(Int,Long),(Int,Long,Long)] with CheckpointedFunction{ /** * The core ...

2019-06-29 23:02:09 1214

原创 Flink自定义状态实现wordcoun通过扩展RichFlatMapFunction

def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection(List( (1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), ...

2019-06-26 22:46:30 3222

原创 Flink的mapWithState示例及源码分析

Flink的状态函数mapWithState使用,代码如下: def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection(List( (1L, 3L), (1L, 5L), ...

2019-06-26 22:29:29 2238

原创 Flink自定义Assigning Timestamps和Watermarks

为了让event time工作,Flink需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这个通常是通过抽取或者访问事件中某些字段的时间戳来获取的。时间戳的分配伴随着水印的生成,告诉系统事件时间中的进度。下面介绍几种自定义事件时间戳方法1.在数据流源中定义可以看Flink静态Session Windows这边文章里面有2.使用DataStream API中的assignA...

2019-06-23 11:42:31 7462

原创 Flink静态Session Windows

1.Session Windows会话窗口分配器按活动会话对元素进行分组。与翻滚窗口和滑动窗口相比,会话窗口不重叠并且没有固定的开始和结束时间。当会话窗口在一段时间内没有接收到元素时,即当发生不活动的间隙时,会话窗口关闭。会话窗口分配器可以设置静态会话间隙和动态会话间隙(其中会话间隙为session gap)。如下图:2.使用Flink自带的示例:package org.apache.fl...

2019-06-22 13:37:27 190

原创 Flink自定义source

有些维度表是存在于Redis或者MySQL中需要读取这些数据,但是现在官网只有Sink。于是可以通过自定义一个Source来实现。自定义的Source需要实现(SourceFunction、ParallelSourceFunction)或者扩展(RichSourceFunction,RichParallelSourceFunction)其中的任意一个。下面的代码实现一个读出本地文件的一个Sour...

2019-06-18 23:27:46 1178

原创 IDE安装protobuf插件

1.在IDE上面按照protobuf插件2.在POM文件填加下面插件和jar包 <properties> <scala.version>2.11.12</scala.version> <spark.version>2.1.0.5</spark.version> <grpc.ver...

2019-06-18 20:09:34 2560 1

原创 flink的分区key指定方法

flink窗口函数和group by,join,coGroup算子都需要指定key进行转换。下面的介绍一些key几种方式1.根据字段位置。主要对tuple类型,pojo类会出错。注意:这个是相对于最外元素而言,同时不能指定嵌套里面的位置指定嵌套里面的字段为key,从结果看出是按照嵌套的第二字段分区的但是以第一个进入为第一个字段了。2.根据字段名称,主要是pojo类。但是对sum这种聚合函数都...

2019-06-15 21:21:15 2869

原创 Windows环境下idea编译flink1.7.2源码

最近看了了flink很多知识,于是决定编译一下源码,方便以后的学习。1.官网下载flink1.7.2源码,解压2.将源码导入idea3.查看一下idea有没有安装代码检查插件和相应的设置。可以参照官网。不然会报错。我这里没有设置IDE的配置而是在解压的当前目录下执行了maven checkstyle plugin命令。网上我看很多人都是将顶级pom文件这个插件给注释掉了。执行命令或者注...

2019-06-15 18:55:01 1716

原创 hive某一特殊值较多出现数据倾斜

最近业务说数据出现异常个别端的数据量级超大,于是查看了一下代码,看到最近程序出现了数据倾斜的情况。于是查看了一下数据分布情况,看到两张表join后空值类特别大。注意:这里的空值为’’,不是null于是将右表的空值剔除掉key is not null and trim(key) !=’’.实际业务中还有很多默认的特殊值比如:-1这种。1.特殊值特殊处理来解决2.在on条件on case wh...

2019-06-14 09:55:22 471

原创 hive数值超出了int大小出现负数

今天统计SQL结果出现负数,代码查了一遍,重新跑了代码片段都是正的,找了好久看了结果都是10亿多了,然后看到定义字段为int类型

2019-06-12 16:40:31 2879 1

原创 hive报metadata.HiveException: Hive Runtime Error while processing row (tag=0)错误

今天跑一条统计的SQL出现Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {“key”:错误报出的异常是说group by数字格式异常,后来核查了一下SQL发现group by后面出现了重复。个人觉得报出这样的异常是由于s...

2019-06-11 19:54:05 7261 1

原创 hive多次join未进行谓词下推

最近执行了一段多表join出现数据倾斜的情况,下面说一下处理过程:1.首先去yarn界面查看到job出现个别任务reduce特别的慢2.检查一下on条件里面的字段类型是否一致,比如on a.id=b.id(a.id为bigint,b.id为string),结果都是一致。3.查看一下主表的数据分布情况,确实有个别的类比较大,此时将count(distinct id)改为sum() group ...

2019-06-09 23:11:26 465

原创 window环境下使用IDEA开发第一个Fink程序Wordcount

1.下载netcat并解压,在当前目录下执行nc.exe -l -p 9000注意:有些杀毒软件会阻拦2.在IDEA编写代码(注:一定要引入防止出现隐式转换出错 org.apache.flink.streaming.api.scala.)import org.apache.flink.streaming.api.scala.import org.apache.flink.streami...

2019-06-07 08:46:49 269

空空如也

空空如也

TA创建的收藏夹 TA关注的收藏夹

TA关注的人

提示
确定要删除当前文章?
取消 删除