自定义博客皮肤VIP专享

*博客头图:

格式为PNG、JPG,宽度*高度大于1920*100像素,不超过2MB,主视觉建议放在右侧,请参照线上博客头图

请上传大于1920*100像素的图片!

博客底图:

图片格式为PNG、JPG,不超过1MB,可上下左右平铺至整个背景

栏目图:

图片格式为PNG、JPG,图片宽度*高度为300*38像素,不超过0.5MB

主标题颜色:

RGB颜色,例如:#AFAFAF

Hover:

RGB颜色,例如:#AFAFAF

副标题颜色:

RGB颜色,例如:#AFAFAF

自定义博客皮肤

-+
  • 博客(112)
  • 资源 (3)
  • 收藏
  • 关注

原创 spark-streaming使用spark-ML模型实时预测

不得不说Spark是一款优秀的计算引擎,继承Spark-ML、Spark-Graphx机器学习和图计算框架,Spark-ML一般用于离线分析和挖掘,生成模型。如果我们把模型保存在HDFS,需要在实时计算里面使用提前训练好的模型,解决方案如下:1、通过转换序列化方式,把模型转换成可以被其他语言调用的方式,如:java、python2、在spark-streaming中使用具体读取kafak的配置信息和保证EOS的不在这里体现,主要体现如何使用Spark-ML训练好的模型,具体代码如下:val sp

2020-05-26 17:59:40 1733 1

原创 Spark-SQL处理小文件问题

一、小文件产生的原因1、在使用spark sql处理数据的过程中,如果有shuffle产生,依赖于spark.sql.shuffle.partitions配置信息,默认为200,当处理的数据量比较大时,通常会把该值调大,以避免单个分区处理的数据太大出现异常或者拖慢整个任务的执行时间。2、如果没有shuffle产生,文件的数量依赖于数据源的文件数量以及文件是否可切分等特性决定任务的并发度即tas...

2020-04-03 23:14:57 3535 2

原创 Spark-SQL adaptive 自适应框架

一、自适应框架能解决什么问题1、目前SparkSQL中reduce阶段的task个数取决于固定参数spark.sql.shuffle.partition(默认值200),一个作业一旦设置了该参数,它运行过程中的所有阶段的reduce个数都是同一个值。而对于不同的作业,以及同一个作业内的不同reduce阶段,实际的数据量大小可能相差很大,比如reduce阶段要处理的数据可能是10MB,也有可能是...

2020-04-03 22:56:06 5759 3

原创 spark批量写入redis

最近工作中,在融合数据的时候,需要将10亿+的记录push到redis中,运维的同学帮忙搭建好redis集群,100主 + 100 从 (单节点8G),最开始打算第一次批量写入使用spark去写入到redis,因为数据存放在Hive表。一、相关依赖的jar包compile group: 'com.redislabs', name: 'spark-redis', version: '2.3.0'...

2019-11-26 10:13:23 2954

原创 spark streaming读取kafka 零丢失(四)

在移动互联网时代,处处都存在着实时处理或者流处理,目前比较常用的框架包括spark-streaming + kafka 等;由于spark-streaming读取kafka维护元数据的方式有1、通过checkpoint保存2、Direct DStream API 可以通过设置commit.offset.auto=true 设置自动提交3、自己手动维护,自己实现方法将消费到的DStream中的...

2019-09-29 19:44:09 367 5

原创 spark mllib IDF源码解析

通常在文本向量化的过程中,通常用的比较多的就是tf-idf、word2vec、CountVectorizer的这几个方法,前面的博客有分析过HashingTF和CountVectorizer方法,今天来分析IDF,通常IDF和HashingTF一期联合使用。1、IDF计算公式:计算公式:idf = log((m + 1) / (d(t) + 1))m : 代表语料库文档数量 t :包含该词...

2019-08-23 10:19:25 329

原创 spark mllib CountVectorizer源码解析

CountVectorizer和CountVectorizerModel旨在通过计数来将一个文档转换为向量。当不存在先验字典时,Countvectorizer可作为Estimator来提取词汇,并生成一个CountVectorizerModel。该模型产生文档关于词语的稀疏表示,其表示可以传递给其他算法如LDA。 在fitting过程中,countvectorizer将根据语料库中的词频排序从高到...

2019-08-20 10:47:14 570

原创 spark mllib HashingTF解析

在处理文本数据,尤其是自然语言处理的场景中,hashingTF使用的比较多;Mllib使用hashing trick实现词频。元素的特征应用一个hash`函数映射到一个索引(即词),通过这个索引计算词频。这个方法避免计算全局的词-索引映射,因为全局的词-索引映射在大规模语料中花费较大。但是,它会出现哈希冲突,这是因为不同的元素特征可能得到相同的哈希值。为了减少碰撞冲突,我们可以增加目标特征的维...

2019-08-19 20:49:56 5255

原创 Hive统计每日新增及其二日和三十日回访比例

数据如下:一、求每日新增方法:每日新增即用户第一次访问,那么此时按照用户的id为key做分组,求他访问的最大时间和最小时间(天);如果最大时间等于最小时间,那么说明用户是第一次访问,否则不是;那么总的来说就是按照用户的最小时间统计即可select min_date,count(distinct user_id) as per_day_newfrom( select u...

2019-07-30 09:19:47 1716

原创 spark mongodb 踩坑 Caused by: java.io.InvalidClassException: com.mongodb.spark.rdd.MongoRDD

本人在工作中用到了部分spark-mongodb的整合,今天在测试环境测试的时候,提交任务到yarn出现问题,具体描述如:根据错误提示,搜索了很久,说class 序列号问题经过仔细排查后,发现,分析:client端类版本与server端不一致;线上环境的jar的版本和本次打jar包的环境的版本不一致导致的compile group: 'org.mongodb.spark', name: '...

2019-07-19 13:55:23 695

转载 spark sql 自适应 Adaptive Execution

一、Spark 目前现有的一些问题问题一:Shuffle partition数量没有达到最优在Spark SQL中,我们可以通过spark.sql.shuffle.partition来设置shuffle后的partition数量,默认值是200。shuffle partition的数量等同于下一Stage的Reduce Task的数量。因为shuffle的原因,这些Task处理的数据量残差不齐...

2019-07-17 21:43:23 502

原创 那些年用过的时间衰减函数

在我们日常的应用中时间衰减函数无处不在,比如避免出现犹太反应(强者愈强,弱者愈弱)各种排行榜;通常我们希望某些指标如文章热度、电影评分随着时间的漂移越来越低或者随着时间的衰减出现一个系数能拟合这一过程,比较出名的就是牛顿冷却定律。一、刚开始接触时间衰减,是在使用elasticsearch的过程中对function_score函数的使用function_score支持的衰减函数有三种,分别是 l...

2019-07-17 21:07:06 21376

原创 spark 排序实现原理 RangePartitioner

sprak Core中比较常用的排序方法sortBy和sortKByKey,这是一个shuffle类算法子,宽依赖,出发DAGSchedular划分Stage,那么他们排序的原理是啥呢?第一步Stage0:分区采样Sample,创建RangePartitioner,先对输入的数据的key做采样,来估算Key的分布,然后按照指定的排序切分range,尽量让每个partition对应的range里...

2019-07-17 18:23:38 1138 1

原创 spark core sortBy和sortByKey探索

感觉自己好久没有更新过博客了,本人最近有点儿迷失,特来写篇技术博客,以做自警不知道大家有没有注意到,大家在编写spark程序调用sortBy/sortByKey这两个算子的时候大家会不会有这样子的疑问,他们两个明明是transformation,为啥在执行的时候却触发了作业的执行呢?今天就和大家一起一探究竟? val wordCountRdd = spark.sparkContext.text...

2019-07-17 17:14:26 283

原创 spark streaming 应用程序 监控 邮件提醒

spark streaming应用程序,放到线上后,怎么监测spark streaming程序的阻塞状态, 虽然spark 提供了spark webUI去查看,但是作为开发人员总不能天天去看spark webUI页面吧, 去官网看,貌似可以通过请求spark 自带的jetty 服务器可以获取相关检测统计信息,http://host:8088/proxy/application_1517...

2018-03-05 15:15:31 3724

原创 Spark LDA

spark1.4版本的LDA原文比较简单,下面主要是以翻译官网为主。理论部分 LDA是一个主题模型,它能够推理出一个文本文档集合的主题。LDA可以认为是一个聚类算法,原因如下:主题对应聚类中心,文档对应数据集中的样本(数据行) 主题和文档都在一个特征空间中,其特征向量是词频向量 跟使用传统的距离来评估聚类不一样的是,LDA使用评估方式是一个函数,该函数基于文档如何生成的统计模型。 ...

2018-02-28 18:00:06 520

原创 Elasticsearch 优化

一、Elasticsearch查询过程Elasticsearch查询分两个阶段查询阶段和提取阶段 查询阶段 客户端向集群中的某个节点(假设节点1)发起查询请求,节点1会创建一个from+size大小的队列(from:偏移量,size:要取回的文档个数)。 节点1向集群中所有其他的分片(主或从)发起查询请求,每个分片也会创建一个from+size大小的队列,并将查询结果添加到队列中。 其...

2018-02-11 16:58:34 379

原创 spark 问题

spark on yarn 问题spark通过yarn提交任务,任务没跑就停止了,8088页面显示成功,点击history显示Shutdown hook called before final status was reported.查看了很久才发现,数据对接的数据格式改变了,我程序中有if(StringUtils.isBlank(difference)){ Sys

2018-01-31 15:38:30 1769

原创 文本自动摘要 -- textteaser

单文本摘要算法现在有很多 ,可以实现基于关键词的摘要方法,也有基于pagerank实现的textrank,也有textteaser 今天简单来介绍和实现textteaser摘要算法:统计指标:1)句子长度,长度为某个长度的句子为最理想的长度,依照距离这个长度的远近来打分。 2)句子位置,根据句子在全文中的位置,给出分数。(比如每段的第一句是核心句的比例大概是70%) 3)句子是否

2018-01-06 11:06:00 3539

原创 汉字转拼音 繁简转换 nlp-lang

项目中使用汉子转拼音,繁简体的相互转换繁简相互转换 String str = "點下面繁體字按鈕進行在線轉換" ; System.out.println(JianFan.f2j(str)); String str1 = "点下面繁体字按钮进行在线转换"; System.out.println(JianFan.j2f(str)```;结果:

2018-01-06 10:07:49 1408

原创 finger print 文本去重

任何一段信息文字,都可以对应一个不太长的随机数,作为区别它和其它信息的指纹(Fingerprint)。只要算法设计的好,任何两段信息的指纹都很难重复,就如同人类的指纹一样。信息指纹在加密、信息压缩和处理中有着广泛的应用。我们这里的做法是文章抽取特征词,压缩为md5指纹。利用这些指纹进行hash去重。广泛应用在。搜索结果推荐结果去重。String content = "卓尔防线继续伤筋动

2018-01-06 09:46:22 1230

转载 文本分类-fastText

一、FastText架构 1. fastText 原理fastText 方法包含三部分:模型架构、层次 Softmax 和 N-gram 特征。下面我们一一介绍。1.1 模型架构fastText 模型架构如下图所示。fastText 模型输入一个词的序列(一段文本或者一句话),输出这个词序列属于不同类别的概率。序列中的词和词组组成特征向量,特征向量通过线性变换映射到中间层,中间层再映射到标签。fa

2017-12-23 15:39:26 958

原创 java tf-idf提取关键字

最近在研究nlp,nlp第一步就是分词,目前开源的工具中,java的有中科院的分词工具nlpir、还有word分词器,ansj_seg等,python的比较火的jieba,ansj_seg5.x版本之后提供了提取关键字的方法,jieba也提供了提取关键字的方法。 提取关键字比较常用的算法有tf-idf、textrank。其中tf-idf是统计词频和逆文档词频,textrank是基于pagerank

2017-12-07 23:04:11 4143 2

原创 spark streaming 读取kafka 配置

使用SparkStreaming集成kafka时有几个比较重要的参数: (1)spark.streaming.stopGracefullyOnShutdown (true / false)默认fasle 确保在kill任务时,能够处理完最后一批数据,再关闭程序,不会发生强制kill导致数据处理中断,没处理完的数据丢失 (2)spark.streaming.backpressure.enable

2017-09-01 13:39:35 1331

原创 mongodb 分片集群 删除片键 code : 20 sharding already enabled for collection

mongodb分片集群指定片键删除了一个collection,再次为这个collection创建片键的时候,会报错{ "ok" : 0, "errmsg" : "sharding already enabled for collection test.test", "code" : 20}解决办法就是在config库里面删除这个collection相关的信息,主要涉及

2017-08-30 21:14:57 3432

原创 spark-sql like查询

当时遇到使用两个表,需要用到like的时候,建议使用map side join或者使用spark sql的broad cast joinsqlContext.sql( """ |select * from left A,right B where A.url like contact(B.url,'%') """.stripMargin) val importantBroad =

2017-08-27 14:53:47 8125

原创 spark-sql not in 优化

spark-sql支持not in val conf = new SparkConf().setAppName("spark_sql").setMaster("local[2]") val sc = SparkContext.getOrCreate(conf) val pathes = Array("","","") val sqlContext = new SQLCon

2017-08-27 14:23:33 3753

转载 java 获取kafka offsets(偏移量)

public KafkaInfoClient(){ init(); } public Map<Integer,Long> getEarliestOffset(String topic) { //kafka.api.OffsetRequest.EarliestTime() = -2 return getTopicOffset(topic,k

2017-08-27 14:08:05 10320 2

原创 spark streaming读取kafka 零丢失(三)

方式二: 方法二就是每次streaming 消费了kafka的数据后,将消费的kafka offsets更新到zookeeper。当你的程序挂掉或者升级的时候,就可以接着上次的读取,实现数据的令丢失和 at most once。而且使用checkpoint的方式可能会导致数据重复消费,spark streaming维护的offset和zookeeper维护的偏移量不同步导致数据丢失或者重复消费等。

2017-08-27 14:01:26 465

原创 spark streaming读取kafka数据令丢失(二)

方式二: 方法二就是每次streaming 消费了kafka的数据后,将消费的kafka offsets更新到zookeeper。当你的程序挂掉或者升级的时候,就可以接着上次的读取,实现数据的令丢失和 at most once。而且使用checkpoint的方式可能会导致数据重复消费,spark streaming维护的offset和zookeeper维护的偏移量不同步导致数据丢失或者重复消费等。

2017-08-27 13:58:28 1315

原创 spark-streaming 读取kafka数据不丢失(一)

spark streaming 读取kafka的时候,数据丢失是一个很大的问题,streaming 通过direct方式读取kafka,提供了checkpoint方式去自己维护读取kafka的offset,将数据放到hdfs。 方式: def main(args: Array[String]) { def func(): StreamingContext ={ val conf

2017-08-27 13:56:31 776

原创 spark-streaming 读取kafka的方式

Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据了。 一、基于Receiver的方式这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。recei

2017-08-27 13:19:48 370

原创 java 正则去除中文标点符号

在作文本分析,尤其是分词的时候,我们需要把不需要的标点符号去除,防止在词转向量的时候,把中文符号添加进去。"[\\pP+~$`^=|<>~`$^+=|<>¥×]""[\\p{P}+~$`^=|<>~`$^+=|<>¥×]"我使用的是ansj分词器,其5.x版本有提供Recognition接口,用于在分词的时候剔除不需要的,比如剔除停用词、标点符号,根据词性剔除一批词,也支持正则表达式,很强大。

2017-08-20 11:03:38 3648

原创 kafka各版本差异

kafka-0.8.2 新特性   producer不再区分同步(sync)和异步方式(async),所有的请求以异步方式发送,这样提升了客户端效率。producer请求会返回一个应答对象,包括偏移量或者错误信。这种异步方地批量的发送消息到kafka broker节点,因而可以减少server端资源的开销。新的producer和所有的服务器网络通信都是异步地,在ack=-1模式下需要等待所有的re

2017-08-04 15:42:41 3921

原创 kafka 安装配置

一、Zookeeper的安装与配置1.安装解压安装文件到zookeepertar -zxf zookeeper-3.4.10.tar.gz 重命名sudo mv zookeeper-3.4.10 /usr/local/ 2. 配置创建数据目录tmp/datamkdir -p /var/data/zookeeper/datasudo chmod 777 /var/data/zookeeper/

2017-07-25 10:35:40 371

原创 Json字符串转Java Bean

最近在项目里面需要从kafka推送的数据读取数据,spark streaming处理,由于推过来的是Json字符串,需要转换成dataFrame做进一步处理,但是Json字符串字段很多,而且还不固定;我想转换代码如下: val NewsDF = sqlContext.createDataFrame(NewsRdd,classOf[News])但是怎么吧json字符串转换成JavaBean呢

2017-07-25 10:30:57 1046

原创 spark java.lang.OutOfMemory, unable to create new native thread

如果出现如下错误: java.lang.OutOfMemory, unable to create new native thread Caused by: java.lang.OutOfMemoryError: unable to create new native thread at java.lang.Thread.start0(Native Method) at java.lang.

2017-06-22 14:11:31 1193

转载 spark内存管理模块

Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。理解 Spark 内存管理的基本原理,有助于更好地开发 Spark 应用程序和进行性能调优。本文旨在梳理出 Spark 内存管理的脉络,抛砖引玉,引出读者对这个话题的深入探讨。本文中阐述的原理基于 Spark 2.1 版本,阅读本文需要读者有一定的 Spark 和 Java 基础,了解 RDD、Shuffl

2017-05-05 13:05:38 691

原创 spark on yarn

1、spark-default export SPARK_LOCAL_DIRS=/home/hadoop/spark/tmp export SPARK_HOME=/usr/install/spark2、spark-env//This requires spark.shuffle.service.enabled to be set. The following //configurations a

2017-05-04 08:13:57 420

原创 hadoop安装配置

这个是很早之前写的一个配置文档了,最近总有人管我要,现在发出来,有些地方可能需要修改。仅供参考。 1.配置Hadoop环境变量 1.1展开hadoop安装包 在master解压主目录中的hadoop压缩包 sudo tar -zxf hadoop-2.6.5.tar.gz sudo mv hadoop-2.6.5 hadoop sudo chmod -R +x ~/hadoop 2.

2017-05-04 08:11:39 280

基于C++的定时关机的实现

基于C++的定时关机的实现

2015-03-30

C++实现的简单投票系统

基于C++实现的简单的投票系统 基于C++实现的简单的投票系统 代码简单易懂 希望能够给大家提供学习的资料

2015-03-30

VC显示行号

VC++6.0 编译器  显示行号的插件

2015-03-30

空空如也

TA创建的收藏夹 TA关注的收藏夹

TA关注的人

提示
确定要删除当前文章?
取消 删除