自定义博客皮肤VIP专享

*博客头图:

格式为PNG、JPG,宽度*高度大于1920*100像素,不超过2MB,主视觉建议放在右侧,请参照线上博客头图

请上传大于1920*100像素的图片!

博客底图:

图片格式为PNG、JPG,不超过1MB,可上下左右平铺至整个背景

栏目图:

图片格式为PNG、JPG,图片宽度*高度为300*38像素,不超过0.5MB

主标题颜色:

RGB颜色,例如:#AFAFAF

Hover:

RGB颜色,例如:#AFAFAF

副标题颜色:

RGB颜色,例如:#AFAFAF

自定义博客皮肤

-+
  • 博客(104)
  • 收藏
  • 关注

原创 浅尝辄止:数据库--数仓&大数据平台--数据中台

大数据思路,领导团队

2022-11-25 14:21:20 1244 1

原创 hive优化到底应该怎么优化(面试必备)

hive优化

2022-08-12 15:03:43 757

原创 Flink知识总结(汇总)

一、Flink cdc1.简介1.1 CDC种类FlinkCDC,简单了解下Change Data Capture(变更数据获取)的概念:核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、 更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费CDC的种类:主要分为基于查询和基于Binlog两种方式,区别:针对Binlog的CDC,有如下的区别分析:1.2 FlinkCDC 版本Flink自然也不甘示弱,FlinkCDC应运而生,通过

2022-04-20 18:31:43 2795

转载 数仓指标体系

什么是指标#数据团队出具的一个统计报表字段即为指标。比如最近七天的订单量,一个促销活动的购买转化率等等。一个指标具体到计算实施,主要有以下几部分组成指标加工逻辑,比如count ,sum, avg维度 比如按部门、地域进行指标统计,对应sql中的group by业务限定/修饰词 比如以不同的支付渠道来算对应的指标,微信支付的订单退款率,支付宝支付的订单退款率 。对应sql中的where除此之外,指标本身还可以衍生、派生出更多的指标,基于这些特点,可以将指标进行分类原子指标

2021-08-17 10:18:54 2653 2

原创 MongoDB与大数据平台的整合(Hive、Spark)

一、 mongodb与hive整合1.拷贝jar包到hive的lib目录2.在hive中创建表,关联mongodbcreate table individuals(id int,name string,age int,work STRUCT<title:string,hours:int>)stored by "com.mongodb.hadoop.hive.MongoStorageHandler"with serdeproperties('mongo.columns.map

2021-03-16 15:02:38 730

原创 MongoDB的安装与操作使用

一、MongoDB1.MongoDB是一个面向文档的基于分布式的NoSQL数据库存储格式是类似于Json格式(BSON)2.特点高性能易部署易使用存储数据非常方便3.数据模型单个集合中的文档不必具有相同的字段,集合的不同文档的字段的数据类型可能有所不同模式验证:可以指定Json模式,插入或更新数据的时候,会按该模式进行验证上限集合:可以指定MongoDB的集合大小,比如指定1G,当数据达到1G,集合大小不会再增加二、安装1.配置yum源cd /etc/yum.repos.

2021-03-16 14:33:21 187

原创 IDEA+MAVEN 打jar包

目录一、分类二、胖包三、瘦包一、分类jar包是分为胖包和瘦包,何为胖包,何为瘦包首先胖包指的是带依赖的jar包瘦包就是没有依赖的jar包二、胖包1.在pom.xml添加如下Maven插件:<build> <plugins> <!--java打包插件--> <plugin> <groupId>org.apache.maven.plugins&

2021-03-16 14:01:43 372

原创 结业考试总结

几个小技巧1.collect_set(Hive中collect相关的函数有collect_list和collect_set。它们都是将分组中的某列转为一个数组返回,不同的是collect_list不去重而collect_set去重。)select student_id,score,concat_ws(",",collect_set(question_id))from ex_exam_record group by student_id,score,question_id==>001

2021-01-21 22:26:41 253

原创 hbase与hive的集成(2)

前言:数据写入到hbase后,可以在利用hbase和hive的集成,把数据同步到hive中进行相关的操作,hive查询更加方便。代码如下://动态分区hive> set hive.exec.dynamic.partition=true; hive> set hive.exec.dynamic.partition.mode=nostrict;hive> set hive.auto.convert.join=false;//创建数据库create database eve.

2021-01-12 19:19:03 83

原创 设计模式(附上详细的案列分析)

1.上篇博客有讲述从flume把数据写到Kafka的内容,但是当我们有多次重复代码要使用的时候,这个时候可以引入一种设计模式,把相同的代码通过接口的方式来封装起来,这样以后修改的时候,只需调用该接口实现我们需要的逻辑方法,即可2.这里我们引用上次博客的代码来做案列分析实现从Kafka导入到hbase中:public class userAttendTohb { public static void main(String[] args) { //Kafka消费端属性 第一部分

2021-01-12 19:06:48 117

原创 Hive与HBase集成

目录1、Hive与HBase集成使用场景将HBase作为Hive数据源,让HBase支持类SQL操作 - 将Hive ETL数据存入HBase,便于快速查询 - 构建低延时的数据仓库,利用HBase快速读写能力,实现数据实时查询 2、Hive与HBase集成原理通过两者本身对外的API接口互相通信完成,由Hive的lib目录中hive-hbase-handler-*.jar工具类实现- HBaseStorageHandler:实现Hive与HBase集成的类,对HiveStorage

2021-01-12 18:37:27 230

原创 实现数据从flume到Kafka,在从Kafka读取到hbase中的案列分析

1.添加依赖 <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apac

2021-01-06 20:10:57 281

原创 Flink之窗口函数以及watermark详解

1 Time与Window1.1 Flink中涉及的时间- **Event Time**:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每⼀条⽇志都会记录⾃己的生成时间,Flink通过时间戳分配器访问事件时间戳。- **Ingestion Time**:是数据进入Flink的时间。- **Processing Time**:是每⼀个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。 ## 1.2 Window1.2.1 Win

2021-01-05 19:36:46 705

原创 Flink DataStream常用算子

Flink中的算子是将一个或多个DataStream转换为新的DataStream,可以将多个转换组合成复杂的数据流拓扑。在Flink中,有多种不同的DataStream类型,他们之间是使用各种算子进行的。如下图所示: 以下列举下常用的算子,用到的代码例子都是Flink监听9000端口做为数据源。以下方法可以启动一个9000的socket端口服务。Linux平台上可以使用bashnc -lk 9000如果是 Windows 平台,可以通过 安装 ncat 然后运行:bashncat -

2020-12-31 17:16:12 182

原创 spark streaming 之window窗口操作

窗口函数,就是在DStream流上,以一个可配置的长度为窗口,以一个可配置的速率向前移动窗口,根据窗口函数的具体内容,分别对当前窗口中的这一波数据采取某个对应的操作算子。需要注意的是窗口长度,和窗口移动速率需要是batch time的整数倍。1.window(windowLength, slideInterval)该操作由一个DStream对象调用,传入一个窗口长度参数,一个窗口移动速率参数,然后将当前时刻当前长度窗口中的元素取出形成一个新的DStream。object SparkWindowDe

2020-12-23 19:04:29 328 1

原创 spark streaming(实时流词频统计)

首先在idea里导入maven依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId&

2020-12-21 20:07:54 1636

原创 Scala之Array的方法

#Scala之Array的方法Scala:Array(集合、序列)数组是一种可变的、可索引的数据集合。在Scala中用Array[T]的形式来表示Java中的数组形式 T[]。val numbers = Array(1, 2, 3, 4) //声明一个数组对象val first = numbers(0) // 读取第一个元素numbers(3) = 100 // 替换第四个元素为100val biggerNumbers = numbers.map(_ * 2) // 所有元素乘2Scala提

2020-12-21 18:55:55 425

原创 flume+kafka+kafka stream 整合

1.使用背景先说一下,为什么要使用 Flume + Kafka?以实时流处理项目为例,由于采集的数据量可能存在峰值和峰谷,假设是一个电商项目,那么峰值通常出现在秒杀时,这时如果直接将 Flume 聚合后的数据输入到 Storm 等分布式计算框架中,可能就会超过集群的处理能力,这时采用 Kafka 就可以起到削峰的作用。Kafka 天生为大数据场景而设计,具有高吞吐的特性,能很好地抗住峰值数据的冲击。2.整合流程启动Zookeeper和Kafka这里启动一个单节点的 Kafka 作为测试:#

2020-12-16 19:48:55 149

原创 Kafka Stream 总结与使用

一. KafKa Streams简介功能强大高拓展性,弹性,容错有状态和无状态处理基于事件时间的Window,Join,Aggergations轻量级无需专门的集群没有外部以来一个库,而不是框架完全集成100%的Kafka 0.10.0版本兼容易于集成到现有的应用程序程序部署无需手工处理(这个指的应该是Kafka多分区机制对Kafka Streams多实例的自动匹配)实时性毫秒级延迟并非微批处理窗口允许乱序数据允许迟到数据虽然Kafka Streams只是

2020-12-15 19:57:45 2282 2

原创 hive的高级查询练习

一.每个店铺top3(分组求topN)题目:有50w个京东店铺,每个顾客访问任何一个店铺的任何一个商品时,都会产生一条访问日志,访问日志存储的表名为visit,访客用户id为user_id,被访问的店铺名称为shop。数据部分内容如下:u1 au2 bu1 bu1 au3 cu4 bu1 au2 cu5 bu4 bu6 cu2 cu1 au2 au2 au3 au5 a需求:1.每个店铺UV(访客数)去重select shop, count( dsitinct u

2020-12-14 16:36:11 364

原创 hive的四种排序以及窗口函数的使用

1.order by:对数据进行全局排序,因此只有一个Reducer(多个Reducer无法保证全局有序),然而只有一个Reducer,会导致当输入规模较大时,消耗较长的计算时间2.sort by:局部排序,一般要设置下reduces个数,默认是-1,当个数为1,相当于order by,一般设置为大于1,保证每个reducer的输出有序,并不保证全局有序。如果运行过慢,可以设置为本地模式:set hive.exec.mode.local.auto=true;但是需要设置reduces个数为

2020-12-11 15:31:02 1097

原创 hadoop中数据的导入导出整理 --方便自己使用整理

本篇文章绝对干货,都是自己使用后整理出来的,图有点丑,但很利于大家的记忆,话不多说,上图1.本地文件上传到hdfshdfs dfs -put '本地路径下的文件名' 'hdfs文件存放路径'2.从hdfs里下载文件hdfs dfs -get 'hdfs下的文件名' '本地文件路径'3.本地文件导入(加载)到hive中local:本地路径,不会移动数据into:追加数据overwrite:覆盖数据load data local inpath '本地文件路径' into table hi

2020-12-10 14:25:34 2352

原创 hive中分桶、视图、侧视图的用法

目录1.分桶2.视图3.侧视图(lateral view)1.分桶首先明确一点:分区对应的是文件夹分桶对应的是文件分桶对应于HDFS中的文件更高的查询处理效率使抽样(sampling)更高效根据“桶列”的哈希函数将数据进行分桶1.创建一个分桶表,关键字是clusteredcreate table if not exists hr_bucket(name string,id int,num string,time2 string )clustered by (id) int

2020-12-10 14:03:22 401

原创 hive中创建内外部表以及如何分区使用(超详细)

目录一.hive表的分类1.内部表2.外部表3.内外部表的区别二hive表的分区使用1.静态分区1.单级分区2.多级分区2.动态分区3.静态分区和动态分区的区别首先,我们有一个路径为/opt/software/employee_id.txt的文档,内容如下:Michael|100|Montreal,Toronto|Male,30|DB:80|Product:DeveloperLeadWill|101|Montreal|Male,35|Perl:85|Product:Lead,Test:LeadSte

2020-12-09 18:47:36 2415

原创 三分钟教会你如何使用IDEA操作Kafka创建生产者消费者(有详细案列)

目录一、Kafka生产者详解1.生产者发送消息的过程2.创建生产者二.消费者详解1.消费者和消费者群组2.创建消费者一、Kafka生产者详解1.生产者发送消息的过程首先介绍一下 Kafka 生产者发送消息的过程:Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。接下来,数据被传

2020-12-07 17:40:27 4482

原创 消息中间件以及Kafka的安装和使用

目录一.消息中间件1.概念2.为什么使用3.主要消息中间件的对比二.kafka1.安装2.名词解释:3.操作一.消息中间件1.概念关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。2.为什么使用业务解耦,代码解耦:例如,订单业务需要发送短信、推送app通知信息、扣除会员积分、发送email等,如果把这些全部写在订单业务代码中,订单代码将会变得十分臃肿,不利于修改维护,事物管理十分麻烦,使用中间件就不会有上述问题同步变异步,加快业务响应时间,相对于RPC来说,异步通信使得生产

2020-12-02 19:09:55 277

原创 flume日志采集总结+实战分析+自定义拦截器讲解(全)

1.安装至官方网站下载flume的安装包,然后上传到moba解压后,修改conf下的文件名:mv flume-env.sh.template flume-env.sh打开文件,修改如下配置:export JAVA_HOME=/opt/jdk1.8.0_221export JAVA_OPTS="-Xms2048m -Xmx2048m -Dcom.sun.management.jmxremote"安装其他插件yum install -y nc //瑞士军刀称呼yum list telnet*

2020-12-01 19:36:23 2083 1

原创 基于Spark GraphX 的图形数据分析

目录为什么需要图计算一.图(Graph)的基本概念二.Spark GraphX 简介三.GraphX API1.属性图应用示例-12.属性图应用示例-23.查看图信息4.图的算子5.GraphX API 应用6.练习一:谁是网络红人?7.PageRank in GraphX8.练习2:PageRank应用9.Pregel概述10.练习3:使用Pregel计算单源最短路径为什么需要图计算许多大数据以大规模图或网络的形式呈现- 许多非图结构的大数据,常会被转换为图模型进行分析- 图数据结构很好地表达了数据

2020-11-25 18:39:27 288

原创 spark高级操作之json文件的分析处理

步骤一,上传一篇json文件到hdfs,文件内容可以用百度搜素:json文件在线解析 如下:hdfs dfs -put /opt/kb09file/op.log /kb09file步骤二:,分析json文件,按照“|”切割,分成两部分步骤三:对jsonStrRDD的第二部分进行截取(substring)操作val jsonRDD=jsonStrRDD.map(x=>{var jsonStr=x._2;jsonStr=jsonStr.substring(0,jsonStr.length-

2020-11-23 14:09:54 856

原创 Spark高级操作之json复杂和嵌套数据结构的操作

一,基本介绍本文主要讲spark2.0版本以后存在的Sparksql的一些实用的函数,帮助解决复杂嵌套的json数据格式,比如,map和嵌套结构。Spark2.1在spark 的Structured Streaming也可以使用这些功能函数。下面几个是本文重点要讲的方法。A),get_json_object()B),from_json()C),to_json()D),explode()E),selectExpr()二,准备阶段首先,创建一个没有任何嵌套的JSon Schemaimport

2020-11-19 13:53:22 674

原创 Spark数据分析及处理(实战分析)

项目需求:使用Spark完成下列日志分析项目需求:1.日志数据清洗2.用户留存分析1.数据清洗读入日志文件并转化为RDD[Row]类型按照Tab切割数据过滤掉字段数量少于8个的对数据进行清洗按照第一列和第二列对数据进行去重过滤掉状态码非200过滤掉event_time为空的数据将url按照”&”以及”=”切割保存数据将数据写入mysql表中代码分析如下:日志如下图片,分析日志,处理需求 //spark操作外部数据 val spark = Sp

2020-11-18 19:23:34 14086 2

原创 Spark SQL如何读取外部数据源

目录1.hive表2.MySQL表3.spark的内置函数4.spark的UDF函数5.Spark SQL CLI1.hive表案列分析: //spark连接hive val spark = SparkSession.builder() .master("local[*]") .config("hive.metastore.uris","thrift://192.168.195.20:9083") .enableHiveSupport()

2020-11-16 19:36:14 424

原创 基于RDD的Spark应用程序开发案列讲解(词频统计)

步骤一:在电脑D盘上创建一个文件a.txt,内容如下:hello javahello sparkhell0 scalahello rqmspark hi步骤二:在IDEA里创建Scala工程,并做好词频统计,输出val rdd=sc.textFile("D:\\a.txt)rdd.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_)步骤三:先提前写好路径,在resource创建文件夹info.properties,写好

2020-11-12 17:39:10 651

原创 用spark装载文件数据源以及使用spark sql操作数据

1、装载CSV数据源文件预览使用sparksession方法 val conf = new SparkConf().setMaster("local[2]").setAppName("Demo")val session = SparkSession.builder().config(conf).getOrCreate() //固定格式,写死的方法 val df = session.read.format("csv").option("header", "true"). load

2020-11-12 17:09:59 124

原创 Apache Spark分布式计算原理(超详细)

Spark WordCount运行原理一、RDD的依赖关系1.窄依赖窄依赖就是指父RDD的每个分区只被一个子RDD分区使用,子RDD分区通常只对应常数个父RDD分区,如下图所示【其中每个小方块代表一个RDD Partition】窄依赖有分为两种:一种是一对一的依赖,即OneToOneDependency还有一个是范围的依赖,即RangeDependency,它仅仅被org.apache.spark.rdd.UnionRDD使用。UnionRDD是把多个RDD合成一个RDD,这些RDD是被拼接

2020-11-10 19:26:40 904

原创 RDD分区与算子详解

目录一、RDD分区二、RDD的操作1.转换算子2.动作算子一、RDD分区分区是RDD被拆分并发送到节点的不同块之一我们拥有的分区越多,得到的并行性就越强每个分区都是被分发到不同Worker Node的候选者每个分区对应一个Task二、RDD的操作分为lazy与non-lazy两种Transformation(lazy):也称转换操作、转换算子Actions(non-lazy):立即执行,也称动作操作、动作算子1.转换算子对于转换操作,RDD的所有转换都不会直接计算结果仅记

2020-11-04 16:00:37 493

原创 在IDEA中添加Scala工程以及spark-RDD详解

使用maven来创建一个spark工程,使用的开发工具为IDEA1.创建好一个maven工程,导入依赖包<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency> &

2020-11-03 15:05:48 1950 1

原创 Maven工程使用大全(很详细)

什么是Maven?如今我们构建一个项目需要用到很多第三方的类库,如写一个使用Spring的Web项目就需要引入大量的jar包。一个项目Jar包的数量之多往往让我们瞠目结舌,并且Jar包之间的关系错综复杂,一个Jar包往往又会引用其他Jar包,缺少任何一个Jar包都会导致项目编译失败。以往开发项目时,程序员往往需要花较多的精力在引用Jar包搭建项目环境上,而这一项工作尤为艰难,少一个Jar包、多一个Jar包往往会报一些让人摸不着头脑的异常。而Maven就是一款帮助程序员构建项目的工具,我们只需要告诉Ma

2020-11-03 14:15:44 585

原创 spark配置安装教程

一、spark介绍1.为什么使用Spark1.MapReduce编程模型的局限性繁杂只有Map和Reduce两个操作,复杂的逻辑需要大量的样板代码处理效率低:Map中间结果写磁盘,Reduce写HDFS,多个Map通过HDFS交换数据任务调度与启动开销大不适合迭代处理、交互式处理和流式处理2.Spark是类Hadoop MapReduce的通用并行框架Job中间输出结果可以保存在内存,不再需要读写HDFS比MapReduce平均快10倍以上2.spark简介诞生于

2020-10-30 14:29:10 415 1

原创 Scala的高阶函数(2)

1.collect:通过执行一个并行计算(偏函数),得到一个新的数组对象val chars = Array('a','b','c') val newchars = chars.collect(fun) println("newchars:"+newchars.mkString(",")) //我们通过下面的偏函数,把chars数组的小写a转换为大写的A val fun:PartialFunction[Char,Char] = { case 'a' => 'A' c

2020-10-27 15:30:25 358

空空如也

空空如也

TA创建的收藏夹 TA关注的收藏夹

TA关注的人

提示
确定要删除当前文章?
取消 删除