自定义博客皮肤VIP专享

*博客头图:

格式为PNG、JPG,宽度*高度大于1920*100像素,不超过2MB,主视觉建议放在右侧,请参照线上博客头图

请上传大于1920*100像素的图片!

博客底图:

图片格式为PNG、JPG,不超过1MB,可上下左右平铺至整个背景

栏目图:

图片格式为PNG、JPG,图片宽度*高度为300*38像素,不超过0.5MB

主标题颜色:

RGB颜色,例如:#AFAFAF

Hover:

RGB颜色,例如:#AFAFAF

副标题颜色:

RGB颜色,例如:#AFAFAF

自定义博客皮肤

-+
  • 博客(54)
  • 收藏
  • 关注

原创 Flink重分区算子解析 - StreamPartitioner

Flink分区器

2022-07-28 13:47:07 947 1

原创 FlinkSQL - 级联窗口计算并Sink2Hbase

FlinkSQL 级联窗口计算并Sink2Hbase

2022-07-25 14:54:44 522 1

原创 FlinkCDC 2.0使用实践体验

一、背景说明所谓CDC:全称是 Change Data Capture ,在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。目前实时链路对于数据的处理是大多数使用的方案是通过工具,对业务数据日志的监控(如canal/maxwell),并连接到kafka,实现对业务数据的实时获取,在实时数仓架构上,ods层一般也会设计在kafka(数据入湖另外说),参考下面图1。而通过FlinkCDC则可以在确保数据一

2021-09-13 17:37:51 1099

转载 Flink-分组窗口 | Over Windows | SQL 中的 Group Windows | SQL 中的 Over Windows

窗口(Windows)时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口、根据时间段做计算了。下面我们就来看看Table API和SQL中,怎么利用时间字段做窗口操作。在Table API和SQL中,主要有两种窗口:Group Windows和Over Windows分组窗口(Group Windows)Group Windows 是使用 window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在 grou

2021-07-07 17:10:50 653

原创 Flink从Kafka取数WordCount后TableApi写入ES

一、背景说明需求为从Kafka消费对应主题数据,通过TableApi对数据进行WordCount后,基于DDL写法将数据写入ES。二、代码部分说明:代码中关于Kafka及ES的连接部分可以抽象到单独的工具类使用,这里只是一个演示的小demo,后续操作均可自行扩展,如Kakfa一般处理为json格式数据,引入fastjson等工具使用富函数进行格式处理即可。package com.flinksql.test;import org.apache.flink.api.common.functions

2021-06-20 10:33:55 201

原创 FlinkSQL写入Kafka/ES/MySQL示例-JAVA

一、背景说明Flink的API做了4层的封装,上两层TableAPI、SQL语法相对简单便于编写,面对小需求可以快速上手解决,本文参考官网及部分线上教程编写source端、sink端代码,分别读取socket、kafka及文本作为source,并将流数据输出写入Kafka、ES及MySQL,方便后续查看使用。二、代码部分说明:这里使用connect及DDL两种写法,connect满足Flink1.10及以前版本使用,目前官方文档均是以DDL写法作为介绍,建议1.10以后的版本使用DDL写法操作,通

2021-06-18 16:53:34 1559

原创 Flink使用connect实现双流join全外连接

一、背景说明在Flink中可以使用Window join或者Interval Join实现双流join,不过使用join只能实现内连接,如果要实现左右连接或者外连接,则可以通过connect算子来实现。现有订单数据及支付数据如下方说明,基于数据时间实现订单及支付数据的关联,超时或者缺失则由侧输出流输出//OrderLog.csv 订单数据,首列为订单id,付款成功则类型为pay(第二列),且生成支付id(第三列),最后列为时间34729,create,,155843084234730,create,

2021-06-08 17:57:27 1148 1

原创 Flink使用二次聚合实现TopN计算-乱序数据

一、背景说明:在上篇文章实现了TopN计算,但是碰到迟到数据则会无法在当前窗口计算,需要对其中的键控状态优化Flink使用二次聚合实现TopN计算本次需求是对数据进行统计,要求每隔5秒,输出最近10分钟内访问量最多的前N个URL,数据流预览如下(每次一条从端口传入):208.115.111.72 - - 17/05/2015:10:25:49 +0000 GET /?N=A&page=21 //15:50-25:50窗口数据208.115.111.72 - - 17/05/2015:1

2021-05-26 17:07:30 348 3

原创 Flink使用二次聚合实现TopN计算

一、背景说明:有需求需要对数据进行统计,要求每隔5分钟输出最近1小时内点击量最多的前N个商品,数据格式预览如下:543462,1715,1464116,pv,1511658000662867,2244074,1575622,pv,1511658000561558,3611281,965809,pv,1511658000894923,3076029,1879194,pv,1511658000834377,4541270,3738615,pv,1511658000315321,942195,433

2021-05-24 15:38:26 766

原创 Flink去重统计-基于自定义布隆过滤器

一、背景说明在Flink中对流数据进行去重计算是常有操作,如流量域对独立访客之类的统计,去重思路一般有三个:基于Hashset来实现去重数据存在内存,容量小,服务重启会丢失。使用状态编程ValueState/MapState实现去重常用方式,可以使用内存/文件系统/RocksDB作为状态后端存储。结合Redis使用布隆过滤器实现去重适用对上亿数据量进行去重实现,占用资源少效率高,有小概率误判。这里以自定义布隆过滤器的方式,实现Flink窗口计算中独立访客的统计,数据集样例如下: 二、

2021-05-19 20:05:05 890 4

原创 Flink键控状态AggregatingState开发实例

一、键控状态说明参考官网说明,几个键控状态介绍如下:ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还

2021-05-08 13:44:54 718

原创 Flink实现连续数据异常预警-定时器

通过一个小需求的开发来简明Flink定时器的应用。一、需求说明:对水库的数据进行实时监测,如果10秒内水位线没有下降,则做预警输出。数据使用socket进行模拟,数据包含 传感器编号/时间戳/水位 三个字段。实现逻辑说明如下图:数据1实时进来,第一条数据水位线为1米,建立定时器,第四条数据在10秒内进来,并且水位降低,则删除定时器,随即后续无任何再操作。数据2实时进来,第一条数据水位线为1米,建立定时器,第四条数据在第10秒进来,10秒内水位无下降,出发定时器的规则,在侧输出流输出预警提.

2021-05-07 14:30:41 1469

原创 Flink使用Watermark处理延迟数据-侧输出流

一、关于Flink的Watermark1.12版本之后默认时间语义为Event time(事件时间),并且实际使用也是以事件时间为主,故这边背景均以基于时间事件的来说明。支持event time的流式处理框架需要一种能够测量event time 进度的方式;比如, 一个窗口算子创建了一个长度为1小时的窗口,那么这个算子需要知道事件时间已经到达了这个窗口的关闭时间, 从而在程序中去关闭这个窗口。事件时间可以不依赖处理时间来表示时间的进度.例如,在程序中,即使处理时间和事件时间有相同的速度, 事件时间

2021-05-06 13:48:21 990

原创 FlinkSQL使用自定义UDTF函数行转列-IK分词器

一、背景说明本文基于IK分词器,自定义一个UDTF(Table Functions),实现类似Hive的explode行转列的效果,以此来简明开发过程。如下图Flink三层API接口中,Table API的接口位于最顶层也是最易用的一层,可以套用SQL语法进行代码编写,对于有SQL基础的能很快上手,但是不足之处在于灵活度有限,自有函数不能满足使用的时候,需要通过自定义函数实现,类似Hive的UDF/UDTF/UDAF自定义函数,在Flink也可以称之为Scalar Functions/Table Fun

2021-05-04 23:08:54 632 1

原创 Flink实时维表查询优化-旁路缓存

一、背景说明:在目前实时数仓中,由于维表具有主键唯一性的特点,Hbase/Redis通常作为维表存放选择Hbase:数据存于磁盘具有持久性但是查询效率慢。Redis:数据存于内存查询效率高,但维表多数据量大时候占用资源多。基于旁路缓存思想,对维表存储的优化的思路为:维表数据存储在Hbase,使用Redis作为缓存,但查询维表时有限查询Redis,如果没有该维表则去Hbase查询后并将维表数据放入Redis,并按一定时间保存,超过时间Redis自动清理(可使不常用维表无需常驻内存,缺点是首次查询较

2021-05-02 16:21:09 1957 1

原创 使用Flink-CEP标记网页跳出用户代码开发

一、需求说明:对页面日志数据进行ETL,对跳出用户进行标记后输出到Kafka。跳出用户定义:条件1:不是从其他页面跳转过来的页面,是一个首次访问页面。日志数据表现为不存在last_page_id字段。条件2:距离首次访问结束后10秒内,没有对其他的页面再进行访问。ps:该需求一般为实时项目中对kafka日志数据进行消费后处理,后续输出到kafka计算页面跳出率用于运营分析使用,该文重点在于代码部分的处理,因此测试数据简化输入及输出。测试数据说明://mid对应设备,page_id对应当

2021-04-18 15:25:17 342

原创 MapReduce之数据分片思想

尽管MR由于计算效率问题,已经不适用于大多数业务场景,Hive3开始计算引擎改为TEZ,但MR的经典思路在Hadoop生态各组件都有体现,重温后对各组件原理的理解还有使用都有帮助,如Spark的RDD分区里面的源码就能看到MR分片思想的影子。这里仅拿输入Map前分片(Split)这个动作的源码做窥探,深入挖掘下分片(split)的思想。一、简单总结进行MR前,对要计算的各文件按分片逻辑切分,多少个分片则对应产生多少个Map计算。分片逻辑以某个值(splitSize)为基准,超过其1.1倍时,则再做分片

2021-03-16 20:02:45 444

原创 HDFS名称节点工作过程

一、名称节点(NameNode)1.什么是名称节点在HDFS中,名称节点负责管理分布式文件系统的命名空间(Namespace),保存了两个核心的数据结构-FsImage和EditLog。FsImage:用于维护文件系统树以及文件树中所有的文件和文件夹的元数据。EditLog:中记录所有针对文件创建、删除、重命名等操作的日志文件。名称节点记录了每个文件中各个块所在的数据节点的位置信息,但并不持久化存储这些信息,而是在系统每次启动时扫描所有数据节点重构得到这些信息。2.名称节点工作过程名

2021-02-15 15:24:32 5086 1

原创 HDFS读写过程-从类调用角度

一、HDFS相关类说明FileSystem:通用文件系统的抽象基类,可以被分布式文件系统继承,所有可能使用Hadoop文件系统的代码都要使用到这个类。DistributedFileSystem:Hadoop为FileSystem这个抽象类提供了多种具体的实现,DistributedFileSystem就是FileSystem在HDFS文件系统中的实现。FSDataInputStream:FileSystem的open()方法返回的是一个输入流FSDataInputStream对象,在HDSF文件系统中

2021-02-14 22:33:45 369

原创 Spark+Kafka实时监控Oracle数据预警

目标: 监控Oracle某张记录表,有新增数据则获取表数据,并推送到微信企业。流程: Kafka实时监控Oracle指定表,获取该表操作信息(日志),使用Spark Structured Streaming消费Kafka,获取数据后清洗后存入指定目录,Python实时监控该目录,提取文本里面数据并推送到微信。(Oracle一台服务器,Kafka及Spark在另外一台服务器)架构: Oracle+Kafka+Spark Structured Streaming+Pythoncentos7oracle

2021-02-03 19:07:15 408

原创 面向对象-基于Python代码例子简谈

最近同事聊起所谓面向对象,这里使用Python实现一个搜索引擎的小功能,来作为对面向对象的理解。相关概念网上很多,这里不再多复述,先从代码走起,再代入概念。代码功能:根目录下有1.txt 2.txt 3.txt 三个文件,实现输入文件名,返回是否有该文件及该文件的文件大小代码要实现的功能很简单,这里引用类的写法是为了作为解释面向对象的例子,简单功能可以无需这样写,如若是工程开发,这样写的好处在于通过面向对象的设计,定义好了各功能接口,模块化开发,实现高内聚低耦合。#SearchEngineBase

2021-01-29 19:38:11 335

原创 sqoop处理换行符--包含clob字段情况

一、sqoop同步oracle数据非clob字段:要处理换行符,在sqoop脚本中,参数使用下面任意一个即可#替换为空 --hive-delims-replacement "" #删除--hive-drop-import-delims 二、sqoop同步oracle数据clob字段clob字段一般包含大量文本,必然包含换行符,使用上面参数没有作用,需要使用下面参数:#指定clob列为String类型--map-column-java CLOB列=String 例子如下:orac

2021-01-26 18:20:13 1345 3

原创 Structured Streaming编程练习-日志分析

Syslog日志内一般包括产生日志的时间、主机名、程序模块、进程名、进程ID、严重性和日志内容。日志一般会通过Kafka等有容错保障的源发送,本实验为了简化,直接将Syslog通过Socket源发送。新建一个终端,执行如下命令:$tail -n+1 -f /var/log/syslog | nc -lk 9988“tail -n+1 -f /var/log/syslog”表示从第一行开始打印文件syslog的内容。“-f”表示如果文件有增加则持续输出最新的内容。然后,通过管道把文件内容发送到nc程序(nc.

2021-01-25 17:25:53 1761 4

原创 Spark Structured Streaming延迟数据处理实例

一、Spark中水印作用Spark内部引擎的实现是保留内部状态的,以便让基于事件时间的窗口聚合可以更新旧的数据,但是如果一个查询持续运行多天,那么系统绑定中间状态累积的数量也会随之增加,为了释放资源,用户可以通过自定义水印来告知系统可以丢弃哪些在内存中旧状态的数据。自定义水印可以使用withWatermark()方法。二、延迟数据处理实例(基于pyspark)通过一个实例说明,Spark如何处理迟到数据以及水印在迟到数据处理的作用。该实例中,首先建立一个基于CSV文件的输入源,模拟实时写入CSV文件

2021-01-22 20:04:43 1352

转载 数据仓库之拉链表(原理、设计以及在Hive中的实现)

本文将会谈一谈在数据仓库中拉链表相关的内容,包括它的原理、设计、以及在我们大数据场景下的实现方式。全文由下面几个部分组成:1.先分享一下拉链表的用途、什么是拉链表。2.通过一些小的使用场景来对拉链表做近一步的阐释,以及拉链表和常用的切片表的区别。3.举一个具体的应用场景,来设计并实现一份拉链表,最后并通过一些例子说明如何使用我们设计的这张表(因为现在Hive的大规模使用,我们会以Hive场景下的设计为例)。4.分析一下拉链表的优缺点,并对前面的提到的一些内容进行补充说明,比如说拉链表和流水表的区别

2021-01-21 21:23:19 737

原创 hive.groupby.skewindata及数据倾斜优化

一、hive.groupby.skewindataset hive.groupby.skewindata=true;数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可

2021-01-20 20:01:06 4223

原创 浅谈Hive中Map Join原理及场景

Hive中的Join可分为Common Join(Reduce阶段完成join)和Map Join(Map阶段完成join)一、Map Join作用及原理作用简单来说,在Map阶段进行join,而不是Common Join那样在Reduce阶段按照join列进行分发后在每个Reduce节点上进行join,一来省去Shuffle这个代价昂贵的阶段,二来不需要分发也就没有倾斜的问题。ps:关于Shuffle的过程可读上篇文章:https://blog.csdn.net/rango_lhl/article

2021-01-19 17:47:10 592

原创 简要MR与Spark在Shuffle区别

一、区别①本质上相同,都是把Map端数据分类处理后交由Reduce的过程。②数据流有所区别,MR按map, spill, merge, shuffle, sort, reduce等各阶段逐一实现。Spark基于DAG数据流,可实现更复杂数据流操作(根据宽/窄依赖实现)③实现功能上有所区别,MR在map中做了排序操作,而Spark假定大多数应用场景Shuffle数据的排序操作不是必须的,而是采用Aggregator机制(Hashmap每个元素<K,V>形式)实现。(下面有较详细说明)p

2021-01-18 14:44:13 2037

原创 基于Python开发数据宽表实例

搭建宽表作用,就是为了让业务部门的数据分析人员,在日常工作可以直接提取所需指标,快速做出对应专题的数据分析。在实际工作中,数据量及数据源繁多,如果每个数据分析人员都从计算加工到出报告,除了工作效率巨慢也会导致服务器资源紧张。因此建设数据集市层,包含了该宽表层并在非工作时间做自动生成。本文引用CDNow网站的一份用户购买CD明细数据,梳理业务需求,搭建一套数据宽表。该CD数据包括用户ID,购买日期,购买数量,购买金额四个字段(此项目中用userid,datatime,products,amounts字段

2021-01-14 11:31:32 527

原创 Spark Streaming处理Flume数据练习

把Flume Source(netcat类型),从终端上不断给FlumeSource发送消息,Flume把消息汇集到Sink(avro类型),由Sink把消息推送给Spark Streaming并处理后输出版本信息:spark2.4.0 Flume 1.7.0(基于pyspark)一、Flume安装①、文件导入# 将apache-flume-1.7.0-bin.tar.gz解压到/usr/local目录下sudo tar -zxvf apache-flume-1.7.0-bin.tar.g.

2021-01-12 16:22:51 344

原创 for update语句锁机制问题

数据库小知识学习系列问题:MySQL InnoDB中,select where xxx=123 for update;该xxx没有索引,是使用表锁还是全部数据加行锁?答:InnoDB引擎(默认RR隔离级别)中的行级锁是依赖索引实现的,问题条件xxx为非索引字段,所以使用的是表锁。如若隔离级别为RC,则通过 where 条件走非索引列过滤之后(where xxx=123),不符合条件的记录上的行锁,会释放掉。也就是先表锁后释放非过滤条件的行锁。ps:如果是Oracle,由于默认使用RC隔离,效

2021-01-11 14:00:21 293

转载 K-means最优K值计算(利用SSE)

 参考代码def distEclud(vecA, vecB): """ 计算两个向量的欧式距离的平方,并返回 """ return np.sum(np.power(vecA - vecB, 2)) def test_Kmeans_nclusters(data_train): """ 计算不同的k值时,SSE的大小变化 """

2018-12-19 16:52:04 8587

转载 Python绘制雷达图(俗称六芒星)

原文链接:https://blog.csdn.net/Just_youHG/article/details/83904618 背景《Python数据分析与挖掘实战》 案例2–航空公司客户价值分析在该案例中的雷达图缺少相应的代码,查看相关文档之后,实现的代码如下。数据用于作图的数据对象名为data_cluster,数据展示如下:注:其中(ZL,ZR,ZF,ZM,ZC)...

2018-12-19 15:59:45 4664

转载 Pandas数据分析练手题(十题)

CSDN广告多,以后都是在博客园写,这边转载: https://www.cnblogs.com/rango-lhl/p/9729334.html------------------------------------------------------------------------------------------------------------------------------...

2018-10-04 23:01:28 13855 2

转载 Kaggle泰坦尼克-Python(建模完整流程,小白学习用)

CSDN广告多,以后都是在博客园写,这边转载:https://www.cnblogs.com/rango-lhl/p/9686195.html————————————————————————————————————————————————————————参考Kernels里面评论较高的一篇文章,整理作者解决整个问题的过程,梳理该篇是用以了解到整个完整的建模过程,如何思考问题,处理问题,...

2018-09-30 11:20:54 1482

原创 模型稳定度指标PSI

 群体稳定性指标PSI(Population Stability Index)是衡量模型的预测值与实际值偏差大小的指标。PSI = sum((实际占比-预期占比)* ln(实际占比/预期占比))举例:比如训练一个logistic回归模型,预测时候会有个概率输出p。测试集上的输出设定为p1吧,将它从小到大排序后10等分,如0-0.1,0.1-0.2,......。现在用这个模型去...

2018-08-03 15:20:06 18496

翻译 使用 NumPy 和 Pandas 进行 Python 式数据清理

原文地址:Pythonic Data Cleaning With NumPy and Pandas原文作者:Malay Agarwal译文出自:掘金翻译计划本文永久链接:github.com/xitu/gold-m…译者:bambooom校对者:luochen1992,Hopsken使用 NumPy 和 Pandas 进行 Python 式数据清理数据科学家花费大量时间清理数据集,将它们清理为可以...

2018-07-09 15:22:38 3602

原创 Oracle异常日期处理(ORA-01839:指定月份的日期无效)

跑数过程日期提示如下错误:在条件中现在了日期格式like '____-__-__' 依旧报错,格式没错误仍旧报错,一时没有头绪,后考虑用更高效的方式找出异常值,参考编写了如下函数:create or replace function fun_date_YorN(i_dt varchar2) return number is v_dt date;begin v_dt := to_date(...

2018-05-24 10:59:29 24725

转载 决策树(decision tree)——连续值处理

转载至决策树(decision tree)(三)——连续值处理| 连续值处理    因为连续属性的可取值数目不再有限,因此不能像前面处理离散属性枚举离散属性取值来对结点进行划分。因此需要连续属性离散化,常用的离散化策略是二分法,这个技术也是C4.5中采用的策略。下面来具体介绍下,如何采用二分法对连续属性离散化:    下面举个具体的例子,来看看到底是怎样划分的。给定数据集如下(数据集来自周志华《机...

2018-05-03 17:05:23 2072

原创 Python3中文词云制作

本文需要的两个Python类库 jieba:中文分词分词工具 wordcloud:Python下的词云生成工具文章来源为原博文从韩寒新浪博客爬取的309篇博文 Python3爬虫韩寒新浪博客文章 Python + worldcloud + jieba 十分钟学会用任意中文文本生成词云本次并未制定逻辑规则用代码剔除部分高频词汇,后续考虑在生成词云前新增一步词频排序,把超过一定次数的次列入过

2018-04-09 15:58:18 2029

空空如也

空空如也

TA创建的收藏夹 TA关注的收藏夹

TA关注的人

提示
确定要删除当前文章?
取消 删除