自定义博客皮肤VIP专享

*博客头图:

格式为PNG、JPG,宽度*高度大于1920*100像素,不超过2MB,主视觉建议放在右侧,请参照线上博客头图

请上传大于1920*100像素的图片!

博客底图:

图片格式为PNG、JPG,不超过1MB,可上下左右平铺至整个背景

栏目图:

图片格式为PNG、JPG,图片宽度*高度为300*38像素,不超过0.5MB

主标题颜色:

RGB颜色,例如:#AFAFAF

Hover:

RGB颜色,例如:#AFAFAF

副标题颜色:

RGB颜色,例如:#AFAFAF

自定义博客皮肤

-+
  • 博客(45)
  • 资源 (4)
  • 收藏
  • 关注

原创 Scala闭包和Java闭包

Scala 闭包  函数在变量不处于其有效作用域时,还能对变量进行访问。下面举例说明:// 定义函数,它的返回值是一个匿名函数def getFunc(msg : String) = (name : String) => println(msg + "," + name)// 定义两个函数变量val fuc1 = getFunc("hello")val fuc2 = getFu...

2019-11-06 15:55:00 288 1

原创 图 - DFS深度优先搜索和BFS广度优先搜索

图的概念  图是一种非线性表数据结构;图中的元素叫顶点(vertex),图中一个顶点可以与任意其他顶点建立连接关系,我们把这种建立的关系叫做边(edge),和顶点相连的边的条数叫度(degree);在有向图中又分为入度和出度,入度表示多少条边指向这个顶点,出度表示有多少条边是以这个顶点为起点指向其他顶点。图的存储方法1. 邻接矩阵(adjacency matrix)  邻接矩阵底层依...

2019-10-27 15:23:26 203

原创 SparkStreaming — 数据接收BlockGenerator源码分析

数据接收源码分析  上一篇博客中分析到,Receiver数据接收主要是通过BlockGenerator来进行接收和存储的,下面我们就源码来对照之前的流程进行分析。  首先是创建BlockGenerator的时候初始化的一些重要组件,如下所示: // blockInterval是有一个默认值的,默认是200ms,将数据封装成block的时间间隔 private val blockIn...

2019-09-17 21:05:12 300 1

原创 SparkStreaming — 数据接收原理

SparkStreaming的数据接收原理  Spark Streaming数据接收主要是发生在Receiver启动之后,启动的一个组件BlockGenerator,通过这个组件来进行数据的接收和存储。具体的流程如下:  如上图所示,假设Receiver接收的数据源来自Kafka。Receiver启动的时候会先启动BlockGenerator,调用它的start方法启动BlockGene...

2019-09-17 19:58:21 294

原创 Spark Streaming — StreamingCongtext初始化及Receiver启动

StreamingContext初始化  StreamingContext在初始化的时候,会创建两个重要的组件DStreamGraph和JobScheduler,如下所示: // 这里初始化的一个重要的组件DStreamGraph, // 它里面保存了Spark Streaming Application中一系列的DStream的依赖关系,以及互相之间的算子的应用 private[...

2019-09-17 17:25:54 275

原创 SparkStreaming — 架构原理分析

Spark Streaming运行原理  首先我们看一下Spark Streaming的运行流程,和之前Spark Core一样都是先创建Context,这里创建的是StreamingContext。Spark Streaming的启动和Spark Core不一样,它是调用StreamingContext.start()方法来启动,但是触发执行RDD算子也是一样的需要action操作,下面就先...

2019-09-15 22:48:45 190

原创 SparkCore — checkpoint机制

checkpoint的原理及其使用场景  checkpoint是Spark提供的一个高级的功能,对于特别复杂的Spark程序而言,从一开始的RDD开始到最后结束,有很多transformation操作,并且执行时间很长,假如这时候,可能出现某个反复使用的RDD,因为节点故障的缘故,导致数据丢失(虽然之前持久化,但依然可能导致丢失)。这时候,由于没有容错机制的存在,当后面transformati...

2019-09-11 11:04:11 187

原创 SparkCore — CacheManager源码分析

CacheManager源码分析  CacheManager主要发生在利用RDD的数据执行算子的时候,也就是在Task端ShuffleWriter的write方法写数据的时候,它传入了RDD的iterator方法作为参数。RDD的iterator()会读取或计算RDD的数据。我们分析一下iterator()方法:RDD.iterator()方法 final def iterator(sp...

2019-09-10 23:02:48 197

原创 SparkCore — CacheManager持久化原理

CacheManager持久化原理  CacheManager的主要功能就是在进行RDD计算的时候,会先从持久化的RDD中读取当前计算RDD需要的数据。  下面是RDD在进行计算的时候,读取数据的流程:  RDD在计算的时候,首先会判断是否被持久化过,如果没有持久化,那么会用父RDD的数据来执行我们定义的算子计算新的RDD的partition的数据;假设持久化了,那么使用CacheM...

2019-09-10 21:58:49 147

原创 SparkCore — BlockManager写数据,以及缓存淘汰机制

BlockManager写数据  上一篇博客,分析了BlockManager读取数据,主要分为本地拉取和远程拉取。现在我们分析一下写数据,主要是调用doPut()方法:doPut() private def doPut( blockId: BlockId, data: BlockValues, level: StorageLevel, tel...

2019-09-10 00:36:04 243

原创 SparkCore — BlockManager读取数据

BlockManager的读取数据操作  BlockManager读取数据分为从本地读取和从远程节点上拉取,我们这里分别分析一下这两种方式。首先分析从本地拉取数据的方法doGetLocal(),在进行分析这个方法之前,我们先看看BlockManager的初始化方法initialize():BlockManager初始化initializedef initialize(appId: Stri...

2019-09-09 21:52:12 239

原创 SparkCore — BlockManagerMaster信息注册

BlockManager  上一篇博客简单讲解了BlockManager的运行机制,BlockManager在创建的时候首先会向BlockManagerMaster进行注册,下面我们来看源码中,是如何注册的:register注册函数/** * 注册BlockManager */ private def register(id: BlockManagerId, ma...

2019-09-09 20:44:19 139

原创 SparkCore — Executor底层数据管理组件BlockManager

BlockManager的运行过程  为了方便的讲解运行过程,如下图所示:  上图中将BlockManager运行过程中比较重要的组件画出来了,其中每个BlockManager中有比较重要的四个组件,DiskStore负责对磁盘上的数据进行读写;MemoryStore负责对内存中的数据进行读写;BlockTransferService负责对远程其他节点的数据读写;NettyBlockTr...

2019-09-09 19:53:01 217

原创 SparkCore — ShuffleReader过程

Shuffle Reader  在之前的博客中,分析了shuffle map端的操作,map最终会将输出文件信息封装为一个MapStatus发送给Driver,然后ResultTask或ShuffleMapTask在拉取数据的时候,会先去Driver上拉取自己要读取数据的信息,比如在哪个节点上,以及在文件中的位置。下面我们来分析一下ShuffleReader,首先Map操作结束之后产生的RDD...

2019-09-08 17:12:11 255

原创 SparkCore — SortShuffle源码分析下

SortShuffle 源码分析下  接着上一篇博客,我们分析到了maybeSpill()它会分析是否需要进行溢写到磁盘操作,假如需要溢写那么会调用 spill()方法,这个方法是在Spillable中定义的,它是一个trait,因此我们找它的子类,是ExternalSorter,因此到这里看这个方法的源码:override protected[this] def spill(collect...

2019-09-07 16:44:47 167

原创 SparkCore — SortShuffle源码分析上

SortShuffle源码分析  在上一篇博客中,我们从源码的角度分析了HashShuffle的两种机制的区别,对应之前的理论分析,现在我们分析SortShuffle的两种机制的区别。  同理我们首先看SortShuffleWriter的write()方法:...

2019-09-06 16:48:04 197

原创 SparkCore — HashShuffle源码分析

HashShuffle源码分析  之前分析了两种Shuffle的区别,现在我们通过源码来进行分析,首先看HashShuffle,回顾之前流程,Executor在接收到LaunchTask的消息后,调用executor的launchTask()方法,将Task封装为一个TaskRunner(线程),然后放入线程池中执行,在执行的时候最终会调用Task.run()方法,这里面调用了runTask(...

2019-09-05 18:04:05 128

原创 SparkCore — Shuffle操作

Spark的shuffle操作  Spark的shuffle操作分为HashShuffle和SortShuffle,两种shuffle。其中在Spark1.2版本之前默认的都是HashShuffle操作,HashShuffle有一个非常严重的弊端就是在没有开启consolidation机制的情况下,会产生大量的中间磁盘文件,这对性能有很大的影响;而在Spark1.2版本之后,默认的Shuffl...

2019-09-05 16:29:04 219

原创 SparkCore —Task运行状态StatusUpdate消息

Task运行状态管理  TaskRunner.run()线程在执行的时候,会对Task运行状态进行管理,主要是一开始启动的时候,产生一个StatusUpdate(),主要是将Task的状态设置为RUNNING,在Task运行结束的时候,又产生了一个消息Task.FINISHED,假设task运行过程中出错,那么会产生Task.FAILED的消息,这些消息会通过execBackend.statu...

2019-09-03 18:08:42 917

原创 SparkCore — Task执行源码分析之Task.run()源码分析

Task.run()源码分析  上一篇博客分析了TaskRunner.run()源码,它里面有两个比较重要的方法一个是Task.run() — task的执行,还是就是task执行结束后将执行结果发送给Driver的StatusUpdate(),这里我们来分析Task.run()方法:final def run( taskAttemptId: Long, attemptNum...

2019-09-03 17:32:17 365

原创 SparkCore — Task执行源码分析之TaskRunner.run()

####Task原理与源码分析  在Executor注册完成之后,接收到Driver发送的LaunchTask消息之后,会调用executor执行句柄的launchTask()方法,里面封装了TaskRunner线程,然后将其放入线程池中运行,下面看一下TaskRunner的run()方法。  由于run方法代码比较长,我把它分为三个部分来说,一个是准备工作,一个是task的执行,一个是t...

2019-09-03 16:40:19 523

原创 SparkCore — Executor注册

Executor注册  注册的机制流程如下,CoarseGrainedExecutorBackend进程启动之后,会立即向Driver发送消息注册executor消息,Driver注册成功之后,会返回RegisteredExecutor消息。然后创建管理启动Task的句柄,将Task封装在TaskRunner线程中,并将其放入线程池运行。  直接看源码,如下所示:override de...

2019-09-02 20:26:44 180

原创 SparkCore — Task分配算法

Task分配算法  接着上一篇的Task最佳位置,我们分析了submitMissingTasks()方法,其中里面比较重要的:一个是task的最佳位置计算,另一个就是提交TaskSet给TaskScheduler。下面分析提交到TaskScheduler后的TaskSet中的task是如何被分配到Executor上去的。  默认情况下,standalone模式,是使用的TaskSchedu...

2019-09-02 19:41:00 729

原创 SparkCore — Task最佳位置

Task最佳位置  上一篇博客中stage划分完之后,就对stage进行提交,使用的是submitMissingTasks()这个方法对每个stage创建一个TaskSet,然后将其提交到对应的worker的executor上运行。下面分析一下具体的流程: private def submitMissingTasks(stage: Stage, jobId: Int) { // ......

2019-09-02 17:41:15 252

原创 SparkCore — stage划分算法源码分析

DAGscheduler中的stage划分源码分析  在之前的文章中,已经分析了stage的划分算法,这里我们到源码里面去看划分算法是怎么实现的。  首先找到提交Job的入口(从action操作开始,找到action操作的runJob -> dagScheduler.runJob -> submitJob -> eventProcessLoop.JobSubmitted -&...

2019-09-02 13:49:05 308

原创 SparkCore — Master资源调度,启动Executor

 上一篇文章讲解了Master的资源调度算法,对每个可用worker分配完资源之后,下面就需要在每个worker上启动相应的executor了,下面对源码进行分析:// 给每个worker分配完资源给application之后// 遍历每个worker节点for (pos <- 0 until usableWorkers.length if assignedCores(pos) &gt...

2019-09-01 17:23:56 131

原创 SparkCore — Master资源调度机制 schedule()分析

Driver向Master进行Application注册的时候,Master注册完之后,会调用schedule()方法,进行资源调度。下面我们对schedule()源码进行分析,schedule()源码如下: private def schedule(): Unit = { // 首先判断master状态不是alive的话,直接返回,也就是说standby是不会进行资源调度的 ...

2019-09-01 16:01:00 270

原创 SparkCore — Master主备切换

Master实际上是可以配置两个的,Standalone模式下也支持主备切换,也就是说当Active Master节点挂掉的时候,standby Master就会切换为Active Master。Spark Master主备切换一般常用的有两种机制:一个是基于文件系统的;一个是基于Zookeeper。基于文件系统的主备切换闷在主Master节点挂掉之后,需要手动切换到Standby节点上;而基...

2019-08-30 15:49:59 498

原创 Spark编程入口 — AppClient接收Master注册信息补充

在之前SparkContext初始化流程图最后,AppClient通过ClientEndPoint发送注册消息给Master,Master在接收到消息之后,首先调用registerApplication,提取Application的相关信息并保存在Master节点上,接着将Application的信息使用持久化引擎进行持久化(持久化引擎常用的有两种一个是FileSystemPersistenceE...

2019-08-30 14:21:21 204

原创 Spark编程入口 — SparkContext及其初始化

在之前我们分析了Spark的内核架构运行机制,其中有一个很重要的组件SparkContext,这里我们就分析SparkContext的相关源码。我们知道所有的Spark程序,在运行之前都需要创建一个非常重要的组件,那就是SparkContext,它负责读取SparkConf中的相关配置信息,并且初始化一些Spark Application运行时需要用到的两个重要的组件DAGScheduler和...

2019-08-30 11:56:51 719

原创 Spark内核架构

为了更加清晰的知道Spark作业是怎么运行的,有必要知道Spark的内核运行机制,我们以Standalone模式为例,剖析Spark的内核运行架构,内核架构图如下所示:我们首先在一台机器上提交Spark Application,通过Spark-submit脚本进行提交,提交Spark Application的这台机器会启动一个Driver进程,Driver进程启动之后,会执行我们的Sp...

2019-08-30 10:42:56 144

原创 Spark源码:Task最佳位置计算方法

在之前的文章中《SparkCore — stage划分算法源码分析​​​​​​​》,创建完Stages之后,就开始提交Stages,在DAGScheduler.scala的submitStage方法中,使用submitMissingTasks,提交第一个Stage0,并且剩余的Stage加入等待队列,waitingStages,剩余的Stage使用submitWaitingStages()方法提交...

2019-07-25 14:23:30 342

原创 Spark — stage划分算法原理分析,加上图形展示,详细的说明了stage的划分

DAGScheduler的stage分配算法原理分析为了直观的描述DAGScheduler的划分算法,我们使用上一篇分析Job触发原理的wordcount程序示例来进行分析,具体代码请看上一篇博客:Spark源码分析之 — Job触发流程原理与源码分析如下图所示,清晰的演示了stage是如何划分的:如上所示,DAGScheduler的stage划分算法,会从触发action操作的那...

2019-06-05 16:25:35 692

原创 Spark源码分析之 — Job触发流程原理与源码分析

为了能够很好的掌握Job是如何触发的,我们以一个WordCount的程序示例来进行分析,简单的Scala的代码如下(这里可以对着最新的Spark源码来看):// 初始化SparkConf,在本地运行val conf = new SparkConf() (1) .setAppName("WordCount") .s...

2019-06-02 11:24:08 345 3

原创 数据结构之跳表

(1)、定义跳表实际上是一种增加了前向指针的链表,是一种随机化的数据结构,实质上是可以进行二分查找的有序链表;跳表在原来的有序链表上加上了多级索引,通过索引来快速查找;可以支持快速的删除、插入和查找操作。(2)、跳表的理解对于一个单链表来讲,即使链表中存储的数据是有序的,如果我们想要在其中查找某个数据,也只能从头开到尾的遍历,查询效率低,时间复杂度是O(n)。如下图所示如何提交...

2019-05-27 00:23:20 6633 3

原创 线性排序 -- 计数排序

1、定义计数排序是桶排序的一种特殊情况,当要排序的n个数据,所处的范围并不大的时候,比如最大值是K,我们就可以把数据划分为k个桶,每个桶内的数值都是相同的,省去了桶内排序的时间。2、举例说明高考查分系统:比如50万考生,满分900,最低0分,最高900分,那么可以对应901个桶,对应分数从0~900,根据考生的成绩划分到这901个桶里,桶内都是分数相同的考生,不需要排序。因此只需要依次...

2019-05-11 00:37:12 146

原创 动态规划-走台阶和国王与金矿问题

(1)、走台阶有一座N级的台阶,从下往上走,每次走一个或两个台阶,那么走上N阶总共需有多少种方法?考虑:假设台阶一共10级,那么只差最后一步就走到10级台阶,这时有几种情况?有两种:第一种,从第9级到第10级;第二种从第8级到第10级。那么现在想想,不管之前0级-9级或者0级-8级走台阶的过程,最后一步肯定是从第9级或者第8级台阶走,假设从0级-8级台阶的走法有X种,0级-9级台阶的走...

2019-04-21 17:16:56 694

原创 二分查找的几种变体

1、查找第一个值等于给定值的元素假设数据是按照从小到大进行排序的,并且存在重复数据,从有序的数组中,查找第一个值等于给定的数据。public static int bsearch1(int[] arr, int value){ if(arr.length == 0) return -1; int low = 0; int high = a...

2019-04-08 10:31:47 381

原创 二分查找并实现求数的平方根

1、二分查找对一个有序的数据集合,查找的思想类似分治思想,每次通过跟区间的中间元素对比,将待查找的区间缩小为之前的一半,直到找到要查找的元素,或者区间缩小为0。/** * 注意在实现的时候: * 1、循环终止条件: low <= high * 2、mid的取值,假如low和high比较大的话,取 (low + high) / 2, * ...

2019-04-02 17:24:02 4209 2

原创 leetcode 第三题 无重复字符的最长子串

描述:给定一个字符串,请你找出其中不含有重复字符的最长子串的长度1、解决方法:建立一个HashMap,建立每个字符和其最后出现位置之间的映射;然后再定义两个变量res和left ,其中res用来记录最长无重复子串的长度,left指向该无重复子串左边起始位置的前一个,初始化为-1;遍历整个字符串,如果该字符已经在HashMap中存在,并且如果映射值大于left的话,那么更新lef...

2019-03-29 12:58:28 154

MapReduce WordCount Java API代码实现,包括pom.xml的配置

代码实现了 MapReduce的 WordCount,演示了MapReduce的基本编程模型,并且pom.xml也已经配置好,eclispse和IDEA皆可使用,JDK版本1.7即可。

2019-03-14

Hadoop HA 部署步骤

这是Hadoop HA的高可用性环境配置步骤,需要提前配置好Hadoop和zookeeper,按照上面的步骤就可以配置HA。

2019-03-14

UNIX操作系统

UNIX操作系统英文版,讲解详细,适合初学者

2015-10-27

空空如也

TA创建的收藏夹 TA关注的收藏夹

TA关注的人

提示
确定要删除当前文章?
取消 删除