自定义博客皮肤VIP专享

*博客头图:

格式为PNG、JPG,宽度*高度大于1920*100像素,不超过2MB,主视觉建议放在右侧,请参照线上博客头图

请上传大于1920*100像素的图片!

博客底图:

图片格式为PNG、JPG,不超过1MB,可上下左右平铺至整个背景

栏目图:

图片格式为PNG、JPG,图片宽度*高度为300*38像素,不超过0.5MB

主标题颜色:

RGB颜色,例如:#AFAFAF

Hover:

RGB颜色,例如:#AFAFAF

副标题颜色:

RGB颜色,例如:#AFAFAF

自定义博客皮肤

-+
  • 博客(190)
  • 收藏
  • 关注

原创 Flink SQL自定义jobName

Flink 版本1.14.0通过StreamTableEnvironment获取对应的Configuration 然后设置pipeline.name就可以自定义jobName了 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Configuration configuration = tableEnv.getConfig().getConfiguration(); .

2022-03-31 14:31:19 4666

原创 Flink idea测试checkPoint

在Flink StreamExecutionEnvironment对应的configuration中新增配置execution.savepoint.path就可以在启动Flink任务的时候默认从上一次的状态中恢复过来 Configuration configuration1 = new Configuration(); //flink parallelism=16 savepoint state// configuration1.setString("ex

2022-03-31 14:22:59 4081 3

原创 Mac搭建Flink集群

1.安装brew/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"2.安装Flinkbrew install apache-flinkflink安装完成后会有一个本地安装地址/usr/local/Cellar/apache-flink/1.14.43.启动Flink集群cd/usr/local/Cellar/apache-flink/1.14.4..

2022-03-15 14:27:50 2364

原创 数据治理闲谈

第一层:数据分层、分割数据域、数据准确性、数据产出的稳定性 第二层:数据血缘、打通数据孤岛(OneID)、统一数据服务(OneService)第三层:成本分摊

2022-03-04 11:34:22 232

原创 flink SQL 内置函数源码

flink 版本1.12org.apache.flink.table.planner.codegen.calls.StringCallGen这个类里面搜索函数名称就可以找到对应的实现类和对应的实现方法参考博客

2021-12-27 17:44:12 1447

原创 flink 1.12 SQL Demo

Flink 版本 1.12.3source是kafka 维表是MySQL source left join 维表public class FlinkTableDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ...

2021-12-24 11:15:23 1674

原创 Springboot整合flink过程思考

本人不是专业的后端开发,以下只代表个人看法,如有问题请指出。目标希望将flink的开发变成和Springboot注解开发一样,提高开发效率。具体实践通过nacos拉取配置消息,然后初始化连接池。然后在flink function中使用的对应的mapper文件,使用@Autowired注入然后调用对应的方法,但是在调用function类用的都是new此时会抛出NullPointerException。解决方案一 不使用@Autowired 而且在open中通过getBean()获取对应的实例,但是这

2021-12-15 18:13:23 3531

原创 Streaming Systems 笔记一

将对无界数据的处理分为4个方面:1.和时间无关 没有Event Time2.近似计算3.Windowing by Processing Time4.Windowing by Event Time处理4个方面的通用方式:3个W一个H1.What results are calculated? Answered via transformations 计算的结果是什么?答案...

2021-10-31 19:08:50 377

原创 Springboot遇到的异常

1.No tests found for given includesimport org.junit.Test 修改为 import org.junit.jupiter.api.Test

2021-08-10 14:24:43 286

原创 实时数仓的思考

目前主流的数仓构建思想都是以维度建模为主,以下都是在工作中的一些思考和想法。1.维表延迟问题主流join动态无时序维表时如何解决因为维表延迟而导致的数据不准问题?目前来看好像只能通过批处理来修复流数据。如果通过批处理来修复那还能说是kappa架构?2.历史数据的处理以及新增需求后的刷数这里说的历史数据处理和刷数的需求在kappa架构不是不能实现而且需要考虑到时间长度的问题。当数据量级上去后使用标准的流式架构处理大量的历史数据(ETL)是一个非常耗时的问题。有时一天都完成不了刷数任务而且由于是使

2021-08-03 14:32:35 340

原创 Json问题记录

1.在使用jackson时字段无法赋值举个栗子:private String aTest;此时json中的字段也是 aTest:"aaa"但是一直赋值失败,zu

2021-06-10 16:43:47 92

原创 Druid 笔记一

1Apache Druid简介及架构 1.1 Druid概述1.1.1 什么是Druid数据分析的基础架构可以分为以下几类:使用Hadoop/Spark进行分析 将Hadoop/Spark的结果导入 RDBMS 中提供数据分析 将结果保存到容量更大的 NoSQL 数据库中,解决数据分析的存储瓶颈,例如:HBase 将数据源进行流式处理,对接流式计算框架(如Storm、Spark、Flink),结果保存到 RDBMS或NoSQL中 将数据源进行流式处理,对接分析数据库,例如:Dr...

2021-01-24 16:40:37 305 2

原创 Kylin 笔记一

1Kylin概述1.1 背景、历史及特点 数据源和模型:主要支持Hive、Kafka构建引擎:早期支持MapReduce计算引擎,新版本支持Spark、Flink计算引擎。除了全量构建外,对基于时间的分区特性,支持增量构建存储引擎:构建好的Cube以Key-Value的形式存储在HBase中,通过优化Rowkey加速查询。每一种维度的排列组合计算结果被保存为一个物化视图,叫Cuboid优化算法:Cube本身就是用空间换时间,也会根据算法,剪枝优化掉一些多余的Cuboid,寻求平衡 访问接口:支持标.

2021-01-20 17:49:04 360

原创 Kudu 笔记一

第一部分 概述基于HDFS的存储技术,比如Parquet,具有高吞吐量连续读取数据的能力;而HBase和Cassandra等技术适用于低延迟的随机读写场景,那么有没有一种技术可以同时具备这两种优点呢? Kudu提供了一种“happy medium”的选择:数据模型KUDU 的数据模型与传统的关系型数据库类似,一个 KUDU 集群由多个表组成,每个表由多个字段组成,一个表必须指定一个由若干个(>=1)字段组成的主键,如下图: Kudu更像关系型数据库,而不是像HBas.

2021-01-20 09:44:52 168

原创 spark 整合kudu注意事项

kudu引擎暂不支持sparkSQL中 <、>、or 这些谓词下推,支持like,但仅限于“201907%”这种形式,不支持“201907%02”这种形式 使用SparkSQL读取Kudu表时,必须使用between或者in来指定range分区字段的范围,否则会变成全表扫描,效率极低!!! 插入数据、更新数据和删除行的df必须包含所有主键,主键不可为空 删除kudu分区会同时删除分区内的数据 ...

2021-01-14 11:37:08 215

原创 SPI

大佬文章大佬文章中已经说的很清楚了,我这里基于自己的理解简单抽象一下。创建父类A,A对应的子类B和C。在META-INF/services下创建文件A,文件名称是A的全路径名称。A文件中编写B和C的全路径名称。然后通过SPI就可以自动将B和C加载到内存中。...

2020-12-25 11:21:32 84

原创 Flink 源码解析 Flink-Connector (Kafka)

flink版本:<flink.version>1.10.0</flink.version>FlinkKafkaConsumerBase:所有Flink Kafka Consumer数据源的基类。这个类实现了所有Kafka版本的公共行为open方法 @Override public void open(Configuration configuration) throws Exception { // determine the offset commit mode

2020-12-23 19:48:17 1266

原创 Flink 笔记二 Flink的State--状态原理及原理剖析

1.状态类型State按照是否有key划分为KeyedState和OperatorState Keyed State:KeyedStream流上的每一个Key都对应一个State(1)Keyed State表示和Key相关的一种State,只能用于KeydStream类型数据集对应的Functions和 Operators之上。Keyed State是Operator State的特例,区别在于 Keyed State 事先按照key对数据集进行了分区,每个 Key State 仅对应ー个O

2020-12-22 17:40:59 1626 4

原创 数仓笔记 二

1.事实表分类1)事务事实表事务事实表记录的事务层面的事实,保存的是最原子的数据,也称“原子事实表”。事务事实表中的数据在事务事件发生后产生,数据的粒度通常是每个事务一条记录。一旦事务被提交,事实表 数据被插入,数据就不再进行更改,其更新方式为增量更新。2)周期快照事实表周期快照事实表以具有规律性的、可预见的时间间隔来记录事实,时间间隔如每天、每月、每年等等。典型的例子如销售日快照表、库存日快照表等。它统计的是间隔周期内的度量统计,如历史至今、自然年至今、季度至今等等。 ...

2020-12-15 14:17:17 182

原创 impala 笔记一

1.Impala的诞⽣Impala抛弃了MapReduce使⽤了类似于传统的MPP数据库技术,⼤⼤提⾼了查询的速度。2.MPP是什么?MPP (Massively Parallel Processing),就是⼤规模并⾏处理,在MPP集群中,每个节点资源都是独⽴享有也就是有独⽴的磁盘和内存,每个节点通过⽹络互相连接,彼此协同计算,作为整体提供数据服务。简单来说,MPP是将任务并⾏的分散到多个服务器和节点上,在每个节点上计算完成后,将各⾃部分的结果汇总在⼀起得到最终的结果。对于MPP架构.

2020-12-15 10:21:26 174

原创 Hbase 笔记 一

Hbase是一种无模式(schema-less)的数据库。

2020-12-15 09:25:26 108

原创 Spark编码和参数优化

编码优化:① RDD复用 ② RDD持久化 ③ 巧用 filter ④ 选择高性能算子 ⑤ 设置合并的并行度 ⑥ 广播大变量 ⑦ Kryo序列化 ⑧ 多使用Spark SQL ⑨ 优化数据结构 ⑩ 使用高性能库参数优化:① Shuffle调优 ② 内存调优 ③ 资源分配 ④ 动态资源分配 ⑤ 调节本地等待时长 ⑥ 调节连接等待时长 1 编码的优化1、RDD复用避免创建重复的RDD。在开发过程中要注意:对于同一份数据,..

2020-12-11 19:07:20 482

原创 Spark 数据倾斜优化

1.基本概念什么是数据倾斜 Task之间数据分配的非常不均匀数据倾斜有哪些现象Executor lost、OOM、Shuffle过程出错、程序执行慢 单个Executor执行时间特别久,整体任务卡在某个阶段不能结束 正常运行的任务突然失败大多数 Task 运行正常,个别Task运行缓慢或发生OOM数据倾斜造成的危害有哪些个别任务耗时远高于其它任务,轻则造成系统资源的浪费,使整体应用耗时过大,不能充分发挥分布式系统并行计算的优势 个别Task发生OOM,导致整体作业运行

2020-12-11 18:47:23 148

原创 spark BlockManager

BlockManager是一个嵌入在 Spark 中的 key-value型分布式存储系统,也是Master-Slave 结构的,RDD-cache、 shuffle-output、broadcast 等的实现都是基于BlockManager来实现的:shuffle 的过程中使用 BlockManager 作为数据的中转站 将广播变量发送到 Executor 时, broadcast 底层使用的数据存储层 如果对一个 RDD 进行了cache,CacheManager也是把数据放在了BlockMa

2020-12-11 18:33:49 316

原创 spark 内存管理演进

1.堆内内存与堆外内存 作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。堆内内存受到 JVM 统一管理,堆外内存是直接向操作系统进行内存的申请和释放。1.1堆内内存堆内内存的大小,由 Spark 应用程序启动时的 executor-memory 或s.

2020-12-11 18:20:00 93

原创 spark shuffle演进

1.Shuffle定义在 Spark 或 MapReduce 分布式计算框架中,数据被分成一块一块的分区,分布在集群中各节点上,每个计算任务一次处理一个分区,当需要对具有某种共同特征的一类数据进行计算时,就需要将集群中的这类数据汇聚到同一节点。这个按照一定的规则对数据重新分区的过程就是Shuffle。2.Spark Shuffle的两个阶段SparkShuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的

2020-12-11 15:20:23 204 1

原创 spark yarn 提交流程

1.Client向RM提交请求,上传jar包到HDFS上2.RM在集群中选择一个NM,在其上启动AppMaster,在AppMaster中实例化SparkContext(Driver)3.AppMaster向RM注册应用程序并申请资源。RM监控AppMaster的状态直到AppMaster结束。4.AppMaster申请到资源后,与NM通信在Container中启动Executor进程5.Executor向driver反向注册,申请任务6.Driver对应用进行解析,最后将Task发送到Exe..

2020-12-09 11:14:16 1794

原创 spark源码分析一:spark join 操作何时是宽依赖,何时是窄依赖

问题来源:join 操作何时是宽依赖,何时是窄依赖测试代码:object JoinDemo { def main(args: Array[String]): Unit = { System.setProperty("hadoop.home.dir", "D:\\hadoop-2.9.2") val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster(

2020-12-09 10:11:28 585

原创 spark livy session使用一

python调用livy session restfulAPI demo:import jsonimport reimport sysimport textwrapimport timeimport requestsdef extract_table_name_from_sql(sql_str): # remove the /* */ comments q = re.sub(r"/\*[^*]*\*+(?:[^*/][^*]*\*+)*/", "", sql_str)

2020-12-03 12:57:07 1213

原创 spark livy batch使用一

用python调用livy restfulAPI demo:import json, pprint, requests, textwrapimport time, syshost = 'http://localhost:8998'headers = {'Content-Type': 'application/json'}batchs_url = host + '/batches'print(batchs_url)data = {'file': '自定义.jar' , 'cla

2020-12-03 11:53:25 827

原创 数据结构与算法 笔记一 数据结构与算法总览

数据结构主要分为三大块:一维: 基础: 数组 Array(String)、链表 Linked List 高级:栈 stack、队列 queue 、双端队列 deque、集合 set、映射 map(hash or map)、etc 二维: 基础:树tree、图 graph 高级:二叉搜索树 binary search tree(red-black tree、AVL)、堆 heap、并查集 disjoint set、字典树 trie、etc 特殊: 位运算bitwise、...

2020-11-20 16:25:05 95

原创 scala 问题笔记

1.scala string转java map 使用的是fastjsonJSON.parseObject(args(0), classOf[util.Map[_, _]])

2020-11-11 09:39:44 103

原创 spark 作业笔记一

1.累加器不支持自定义类型2.SparkSQL和Spark的计算性能差异来自于DataFrame3.df.select(rand())的作用是生成均为正数的浮点随机数

2020-11-11 09:15:54 128

原创 python 问题记录和解决方式

1.在settings.py中定义了常量,如何在别的python中使用导包from flask import current_appcurrent_app.config.get("XXX")

2020-11-10 16:03:16 152

原创 spark 问题笔记

1.在idea中运行报错:scala.Product.$init$(Lscala/Product;)V检查pom文件中scala的版本和idea中scala的版本是否一致 idea中scala版本为2.11解决办法:<artifactId>spark-core_2.12</artifactId> -> <artifactId>spark-core_2.11</artifactId>2.idea中java.io.IOException: ...

2020-10-22 12:48:21 361

原创 zeepelin 遇到的问题

1.zeepelin启动成功后web页面访问失败解决:修改conf/zeppelin-site.xml 后访问成功<property> <name>zeppelin.server.addr</name> <value>0.0.0.0</value> <description>Server binding address</description></property>...

2020-09-28 11:30:07 338

原创 CAP+BASE

CAP 三指标1.一致性(Consistency)2.可用性(Availability)3.分区容错性(Partition Tolerance)1.一致性说的是客户端的每次读操作,不管访问哪个节点,要么读到的都是同一份最新写入的数据,要么读取失败。2.可用性说的是任何来自客户端的请求,不管访问哪个非故障节点,都能得到响应数据,但不保证是同一份最新数据。你也可以把可用性看作是分布式系统对访问本系统的客户端的另外一种承诺:我尽力给你返回数据,不会不响应你,但是我不保证每个节点给你的数据都是最新的。这

2020-09-25 17:11:10 111

原创 离线数仓拉链表

1、创建表加载数据(准备工作)-- 用户信息 DROP TABLE IF EXISTS test.userinfo; CREATE TABLE test.userinfo( userid STRING COMMENT '用户编号', mobile STRING COMMENT '手机号码', regdate STRING COMMENT '注册日期') COMMENT '用户信息' PARTITIONED BY (dt string) row format delimite

2020-09-20 14:48:18 303

原创 kafka 笔记十 kafka 延时队列

延时队列两个follower副本都已经拉取到了leader副本的最新位置,此时又向leader副本发送拉取请求,而leader副本并没有新的消息写入,那么此时leader副本该如何处理呢?可以直接返回空的拉取结果给follower副本,不过在leader副本一直没有新消息写入的情况下,follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。 Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes,由参数fetch.min.bytes配置,

2020-08-26 18:32:16 6079

原创 kafka 笔记九 kafka 消息重复

消息重复的场景及解决方案 消息重复和丢失是kafka中很常见的问题,主要发生在以下三个阶段: 1. 生产者阶段 2. broke阶段 3. 消费者阶段1 生产者阶段重复场景生产发送的消息没有收到正确的broke响应,导致生产者重试。生产者发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复。 说明:1. new KafkaProducer()后创建一个后台线程KafkaThre

2020-08-26 17:32:34 1203

空空如也

空空如也

TA创建的收藏夹 TA关注的收藏夹

TA关注的人

提示
确定要删除当前文章?
取消 删除