自定义博客皮肤VIP专享

*博客头图:

格式为PNG、JPG,宽度*高度大于1920*100像素,不超过2MB,主视觉建议放在右侧,请参照线上博客头图

请上传大于1920*100像素的图片!

博客底图:

图片格式为PNG、JPG,不超过1MB,可上下左右平铺至整个背景

栏目图:

图片格式为PNG、JPG,图片宽度*高度为300*38像素,不超过0.5MB

主标题颜色:

RGB颜色,例如:#AFAFAF

Hover:

RGB颜色,例如:#AFAFAF

副标题颜色:

RGB颜色,例如:#AFAFAF

自定义博客皮肤

-+

大白

  • 博客(74)
  • 收藏
  • 关注

原创 【Flink检查点/保存点提交】Flink检查点和保存点(何时生成|何时创建|何时启动|何时触发)

flink检查点在什么情况下触发,flink检查点如何重新运行,flink保存点在什么情况下触发flink如何在不取消当前应用时创建保存点 flink如何在去顶当前flink应用之前生成保存点flink如何从保存点处启动程序,flink启动时遇到的问题

2022-10-08 01:00:00 642

原创 【Flink检查点/保存点提交】Flink提交时的参数解释与内存占比计算

m:代表的是jobmanager-yn : 代表启动多少个taskManager-ys:代表每个taskManager启动多少个slot(默认一个slot一个core,因此当我们设置算子的并行度时,不要超过slot的总数)-yjm:设置jobManager的内存大小(单位MB)-ytm:设置每个taskManager的内存大小(单位MB)-ynm:提交任务时设置名称-c:指定main路径-d:简单点说就是开启后台执行。

2022-10-04 01:00:00 732 1

原创 【Flink检查点/保存点提交】Flink检查点与保存点的不同

保存点和检查点内部的生成算法是一致的,工作方式也一致,但保存点相比较检查点有什么不同呢?b) 保存点:由用户通过flink命令行或者web控制台进行手动触发。保存点相比较检查点来说存储了更为详细的一些元数据信息。a) 检查点:通过代码进行生成。

2022-09-30 01:00:00 187

原创 【Flink检查点/保存点提交】Flink提交报错与检查点生成多个问题

【Flink检查点/保存点提交】Flink提交报错与检查点生成多个问题。NoClassDefFoundError

2022-09-27 23:24:59 672

原创 【Flink读写外部系统】Flink自定义kafka分区并输出

Flink数据发送到kafka,并自定义Kafka分区;注意这里是通过FlinkkafkaProducer将数据发送到kafka;跟下面的检查点是不一样的FlinkkafkaProducer下的消费保障总共分为3级别val kafkaSink = new FlinkKafkaProducer[ResultDt]("topicName", kafkaPro, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

2022-09-25 01:00:00 931

原创 【Flink读写外部系统】Flink读取Kakfa与水位线的生成

Flink kafka提取事件时间生成水位线;可以通过调用FlinkKakfkaConsumer. assignTimestampsAndWatermarks()方法.同时向kafka消费者提供一个AssignerWithPeriodicWatermarks对象来定义水位线。 为了利用Kafka各个分区的保序性特征,分配器会在每个分区上定义水位线,然后再对各个分区之间的水位线进行合并。

2022-09-22 01:00:00 433

原创 【Flink读写外部系统】Flink异步访问外部系统_mysql

Flink异步读取Mysql操作Flink中提供的异步IO的模式,不需要使用map函数阻塞式的加载数据,而是使用异步方法同时处理大量请求 2. 不过这就需要数据库支持异步请求,如果不支持异步请求也可以手动维护线程池调用,只不过效率上没有原生的异步client更高效。 3. 比如Mysql可以通过Vertx支持异步查询(但我翻阅技术博客后发现,有的人说Vertx只支持scala版本2.12及以上的,具体没验证过),HBase2.x也支持异步查询

2022-09-19 23:26:07 847

原创 【Flink读写外部系统】Flink自定义数据源函数_输出

读取数据源并输出到socket中,DataStream API为我们提供了一个专门的SinkFunction接口以及它对应的RichSinkFunction抽象类

2022-09-17 01:00:00 184

原创 【构建并发程序】8-并发队列之阻塞队列

在创建ArrayBlockingQueue队列时,我们需要设置它的容量,即该队列能够容纳的元素数量的最大值。如果生产者创建元素的速度比消费者处理元素的速度快,那么就应该使用有界队列。否则,不断增长的无界队列会消耗光该程序的所有可用内存。LinkedBlockungQueue无界队列的例子;Main线程中对并发队列进行入队操作,其它线程对并发队列进行出队操作;如果消费者处理元素的速度要比生产者创建元素的速度快时,就可以使用该无界队列。

2022-09-14 01:00:00 95

原创 【构建并发程序】7-如何理解并发队列?

Q: 首先什么是队列?A: 队列的一个特点是先进先出;后进先出Q: 什么是并发队列?A: 同时有多个队列先进先出;后进先出。

2022-09-12 01:00:00 138

原创 【构建并发程序】6-并发集合(可变集合与不可变集合)

可互加微信,方便交流(备注请求时注明CSDN来的朋友):a917655983不可变集合使用原子变量更加效率 (因为不可变集合还需要重新赋值)可变集合可变集合使用synchronzied更加效率(因为可变集合无需赋值)(本文章虽然采用的代码为scala代码,但java代码与Scala代码可以互相转换,且本质上两者所阐述的东西都是一致的)

2022-09-10 01:00:00 109

原创 【构建并发程序】5-惰性值lazy与循环依赖问题

可互加微信,方便交流(备注请求时注明CSDN来的朋友):a917655983 假如我们对一个变量使用了lazy 则这个变量只有再被使用时才会执行初始化。惰性值在多线程中会遇到的问题 由于惰性值在访问时才会进行初始话,可以缩短程序的执行事件進而提高效率;问题:但是在多线程-线程池中,如果多个线程都需要访问惰性值,此时惰性值就会频繁的进行初始话,从而影响效率。对于我来说,我只想让惰性值被线程池中的某一个线程初始话一次就够了,没必要多个线程,每访问一次都要初始化一次。这个问题已经通过Scala(java同

2022-09-08 01:00:00 103

原创 【构建并发程序】4-原子变量-CAS-ABA问题

简单点就是,3个线程对某个原子变量val inte = new AutoicInteger(1)进行操作A线程要做inte.compareAndSet(1,2)B线程要做inte.compareAndSet(1,3)C线程要做inte.compareAndSet(3,1)A线程没抢过B,C两个线程,且BC两个线程中,B线程先执行的,然后是C线程,当C线程执行完后inte的内存地址已发生过两次偏移,新的地址值是1。

2022-09-06 01:00:00 219

原创 【构建并发程序】3-原子变量

原子变量还定义了其他复杂的原子操作,例如getAndSet,decrementAndGet,addAndGet之类的方法。而这些原子操作都实现了基本的原子操作(compareAndSet比较并转换操作),CAS会接收原子变量的当前值和新值,并在当前值等于预期值的情况下以原子方式将当前值替换为新值;原子变量被存储在支持复杂的可线性化操作的内存区域中。可线性化操作是指可立刻执行的任何操作,例如Volatile变量的写入操作就是一种可线性化操作。一个复杂的线性化操作至少包含2个步骤:读取和写入;

2022-09-04 14:15:50 183

原创 【构建并发程序】2-线程池-的注意事项与缺点

线程池-的注意事项与缺点;不写shutdown为什么堵塞:因为是守护线程,main线程没退出,守护线程也不会“退出”, awaitTermination不会退出,所以就在这堵塞了shutdown放在其前面,awaitTermination等待期间,shutdown也在等待任务的结束。2. 因为ForkJoinPool(Executor)对象中的线程是守护线程,因此shutdown要与awaitTermination组合使用,要不然主线程很快结束,线程还没执行呢,也就结束了。

2022-09-03 07:59:26 754

原创 【构建并发程序】1-线程池-Executor-ExecutionContext

在每秒数千个请求的情况下,为每个请求都要创建一个新线程会拖慢系统的运行速度,因此同一个线程应该可以能够让许多个请求反复来进行使用;这些可重用的线程通常被称为线程池。Executor是一个简单的接口,它定义了一个名为execute的方法,该方法接受Runnable对象,并且最终会调用该对象中的run方法。通常Executor接口会以并发的方式将Runnable对象,处理为调用execute方 法的线程,然后将这些线程实现为 “线程池”;

2022-09-02 20:25:34 202

原创 【Flink】Flink中什么是富函数

目录在什么场景中使用富函数?如何使用富函数?富函数中的方法open()/close()如何通过代码实现一个富函数?在什么场景中使用富函数?  许多时候我们需要在函数处理第一条记录之前就进行一些初始话工作或是取得函数运行时相关的上下文信息。如何使用富函数?  我定义了一个map算子,现在我如果要在map算子在处理第一条记录之前,就想取得该函数运行时的相关信息,和进行一些初始话工作。那么代码就应当类似于如下方式:**.map(x=> 定义一个富函数)富函数中的方法open()/close()

2020-05-22 17:10:21 2090

原创 【Flink读写外部系统】Flink重启策略/配置重启策略

目录重启策略配置重启策略重启策略Flink何时才会重启?一个拥有检查点的应用如果出现问题,他会经过一些列步骤来进行重启。重启过程中可能会出现的问题?有些时候,应用可能会被相同的故障不断“杀死”举例:  当我处理数据的时候程序出现了Bug,导致处理算子出现异常,此时程序就会陷入一个循环中:  启动任务、恢复状态、继续处理。在继续处理的时候,由于这个Bug没有被修复,然后  启动任务、恢复状态、继续处理。类似于这样周而复始   … …Q:问题君来了,这里面的“启动任务”我是知道的.

2020-05-19 09:21:05 2319 2

原创 【Flink读写外部系统】Flink自定义数据源函数_读取mysql

  实现自定义的数据源函数读取,Flink为我们提供了两个方法:SourceFunction通过实现RichSourceFunction来定义非并行的数据源连接器。ParllelSourceFunction通过试下RichParalleSourceFunction来定义能并行的数据源连接器。SourceFunction和ParallSourceFunction中定义了有两个方法:分别是run()方法和cancel()方法Run():  Run只会被

2020-05-17 11:15:06 1876

原创 【Flink】Flink_Kakfa自定义输出分区

在我们将消息写入kafka的topic时,我们可以通过FlinkkafkaPartitioner指定写入topic的哪个分区。在不指定的情况下,默认的分区器会将每个数据任务映射到一个单独的kafka分区中,即单个任务的所有记录都会发往同一分区。如果任务数多余分区数,则每个分区可能会包含多个任务发来的记录。 而如果任务数小于分区数,则默认配置会导致有些分区收不到数据。若此时恰好有使用事件时间的Flink应用消费了该Topic,那么可能会导致问题;导致问题的原因Flink_Kafka为了利用Ka.

2020-05-16 19:58:53 794

原创 【Flink】Flink_Kakfa详解及描述

目录Topic(主题与分区)Flink_Kafka连接器(flink-connector-kafka_2.12)Flink_Kafka提取事件时间生成水位线Topic(主题与分区)Kafka将事件流,组织为不同的主题(Topic)。每个主题都是一个事件日志,其事件读取顺序和写入顺序完全相同。为了实现主体读写的伸缩性,Kafka允许用户在创建topic时定义分区,即将topic拆分为多个分区分散在集群上。同时对每个分区上的事件提供保序,但kafka不能保证不同分区之间的事件顺序,因此

2020-05-16 19:49:46 348

原创 【Flink】Flink读写文件

读写文件1 读取文件-readFile2 写入到文件-StreamingFileSink    2.1 在了解-StreamingFileSink之前你需要了解的知识点        2.1.1 结论    2.2 行编码        2.2.1 行编码自定义-BucketAssigner    2.3 批量编码        2.3.1 批量编码自定义-BucketAssigner1 读取文件-readFileQ:什么是文件数据源?A:Apache Flink提供了一个可重置的数据源连接器,支持将

2020-05-15 10:57:25 9159

原创 【Flink】Flink事件时间何时触发窗口计算

Flink使用事件时间时何时才会触发窗口计算1.1 浏览本博客前你需要了解的知识点    flink内部是划分窗口范围的?    触发的条件?    何时第一次触发?1.2 示例:触发窗口计算    第一次触发计算    何时第二次触发计算    何时触发多个窗口计算?思考:    什么时候才会触发窗口计算?    既然使用的是事件时间那么必然会涉及到水位线(water_mark),水位线在其中扮演的角色是什么?    此时我们带着疑问,一步一步的探究注意:1、本篇博客中的所有解释都是在滚动窗口

2020-05-12 15:55:41 4893 1

原创 【Flink】Flink时间概念与水位线

注意:本篇博客中的所有解释都是在滚动窗口的前提下目录1 时间概念类型1.1 事件生成时间(Event Time)1.2 事件接入时间(Ingestion Time)1.3 事件处理时间(Processing Time)1.4 指定时间概念2 water_mark(水位线)2.1 water_mark是如何生成的?2.2 最大允许乱序时间如何理解?2.3 Flink中的water_marker机制1 时间概念类型1.1 事件生成时间(Event Time)    在事件时间模式下,Flink流式应用处

2020-05-11 18:40:35 887

原创 5、算法-综合以上4步进行总和

数组实现栈package base.第一章.背包_队列_和栈.数组实现定容栈;import java.lang.Iterable;import java.util.Iterator;/** * Created by MK on 2018/7/31. * 现在我们就通过上面所学的迭代,来重新定义下压栈 * { * 这份泛型的可迭代的Stack API的实现是所有集合类(实现...

2018-08-08 17:41:49 223

原创 4、算法-定容栈数组调节长度

代码package base.第一章.背包_队列_和栈.数组实现定容栈.定容栈;/** * Created by MK on 2018/7/25. * 定容栈调节数组长度 * 选择用数组表示栈,意味者用例必须要预估栈的大小,同时还需要时刻检测栈是否满,防止push的时候溢出,这样很不好。 * <p> * 为此我们需要修改数组的实现,动态调整数组a[]的大小,使得它即...

2018-08-08 17:40:16 260

原创 3、算法-定容栈泛型

代码package base.第一章.背包_队列_和栈.数组实现定容栈.定容栈;/** * Created by MK on 2018/7/24. * public class FiexedCapacityStackOfItem<Item> * FiexedCapacityStackOfItem(int cap) * void push(Item item) * I...

2018-08-08 17:38:17 160

原创 2、算法-定容栈

代码package base.第一章.背包_队列_和栈.数组实现定容栈.定容栈;/** * Created by MK on 2018/7/23. * 定容栈: 只能处理String值,并且容量要指定一个固定的数,且不支持迭代 * <p> * public class FiexedCapacityStackOfStrings * FiexedCapacityStac...

2018-08-08 17:36:52 470

原创 1、算法-数组实现定容栈(前提需要了解的)

N++与++N假如我有一个定长的数组 a =[0,3,2,1] ,其中a.length = 4那么我如果N++的话,先输出4,再输出5,6,7那么如果是++N的话,先输出5,然后输出6,7,8  同理--N与N--与上面是一样的逻辑定容栈的实现是许多用例(集合数据类型--Bag,Stack,Queue )的基础模块定容栈:只能处理String值,并且容量要指定一...

2018-08-07 14:22:50 228

原创 2、算法-双栈之算术表达式

package base.第一章.背包_队列_和栈.双栈之算术表达式;import base.第一章.背包_队列_和栈.api.下压栈.Stack;import edu.princeton.cs.algs4.StdIn;import edu.princeton.cs.algs4.StdOut;/** * Created by MK on 2018/7/20. * 使用双栈,求得算...

2018-08-07 14:17:51 334

原创 1、算法-算式表达式原理

(1+ ( (2+3) * (4 * 5)))我们如何通过算法来计算上面的表达式呢?方法:用2个栈(一个用于保存运算符,一个用于保存操作数),其中的运算符包括:+,-,*,/,(,),表达式有括号、运算符、和操作数(数字)组成。我们根据以下4种情况从左到右逐个将这些实体送入栈处理将操作数压入操作数栈将运算符压入运算符栈忽略左括号在遇到右括号时,弹出一个运算符,弹出所需数...

2018-08-07 14:14:41 617

原创 1、算法-算术表达式

算术表达式求值(1+ ( (2+3) * (4 * 5)))我们如何通过算法来计算上面的表达式呢?方法:用2个栈(一个用于保存运算符,一个用于保存操作数),其中的运算符包括:+,-,*,/,(,),表达式有括号、运算符、和操作数(数字)组成。我们根据以下4种情况从左到右逐个将这些实体送入栈处理将操作数压入操作数栈将运算符压入运算符栈忽略左括号在遇到右括号时,弹出一个...

2018-08-07 14:11:26 649

原创 4、算法-队列

队列package base.第一章.背包_队列_和栈.api.队列;import java.util.Iterator;/** * Created by MK on 2018/7/18. * 先进先出(FIFO)队列,链表实现 */public class Queue<Item> implements Iterable<Item> { pr...

2018-08-07 14:04:25 157

原创 3、算法-背包(API)

背包package base.第一章.背包_队列_和栈.api.背包;import java.util.Iterator;import java.util.NoSuchElementException;/** * Created by MK on 2018/7/17. * 背包是一种不支持从中删除元素的的集合数据类型----它的目的就是帮助用例收集元素并迭代遍历所有收集到的元素...

2018-08-07 14:03:16 195

原创 2、算法-下压栈(API)

下压栈package base.第一章.背包_队列_和栈.api.下压栈;import java.util.Iterator;/** * Created by MK on 2018/7/19. * 创建栈(Stack)---以链表的方式实现 * public class Stack<Item> implements Iterable<Item> * S...

2018-08-07 14:02:02 190

原创 1、算法-背包、队列和栈(API)提前需要了解的

       照例,我们对集合型的抽象数据类型的讨论从定义他们的API开始,如表1.3.1所示。每份API都含有一个无参数的构造函数、一个向集合中添加单个元素的方法、一个测试集合是否为空的方法和一个返回集合大小的方法。Stack和Queue都含有一个能够删除集合中的特定元素的方法。  背包 Public class Bag<Item> implem...

2018-08-07 14:00:14 232

原创 算法-标准输入和输出

package base.第一章;import edu.princeton.cs.algs4.StdIn;import edu.princeton.cs.algs4.StdOut;import edu.princeton.cs.algs4.StdRandom;/** * Created by MK on 2018/5/22. * 标准输入和标准输出 */public clas...

2018-08-07 13:50:31 2377

原创 算法-标准绘图(StdDraw)

标准会图库很简单:我们可以将它想象为一个抽象的能够在二维画布上画出点和直线的绘图设备。这个设备能够调用StdDraw中的静态方法画出一些基本的几何图形,包括点、直线、圆、多边形等。使用标准绘图库的话,需要把algs4.jar放到我工程的依赖中,我存到了H盘package base.第一章;import edu.princeton.cs.algs4.StdDraw;i...

2018-08-07 13:48:54 12894 2

原创 算法-二维数组的各种运算

public class 数组_二维数组 { //找出数组中最大的元素 private static Double max(int[] a) { double max = a[0]; //如果after > before,则将大的这个值赋值给上一个数 for (int i = 1; i < a.length; i++) ...

2018-07-19 14:06:51 8168

原创 算法-2个返回函数的递归

多个函数的递归public class two个函数的递归 { private static int fibonacci(int n) { if (n == 0) { return 0; } else if (n == 1) { return 1; } else { ...

2018-07-19 14:04:12 708

空空如也

空空如也

TA创建的收藏夹 TA关注的收藏夹

TA关注的人

提示
确定要删除当前文章?
取消 删除