自定义博客皮肤VIP专享

*博客头图:

格式为PNG、JPG,宽度*高度大于1920*100像素,不超过2MB,主视觉建议放在右侧,请参照线上博客头图

请上传大于1920*100像素的图片!

博客底图:

图片格式为PNG、JPG,不超过1MB,可上下左右平铺至整个背景

栏目图:

图片格式为PNG、JPG,图片宽度*高度为300*38像素,不超过0.5MB

主标题颜色:

RGB颜色,例如:#AFAFAF

Hover:

RGB颜色,例如:#AFAFAF

副标题颜色:

RGB颜色,例如:#AFAFAF

自定义博客皮肤

-+
  • 博客(103)
  • 收藏
  • 关注

原创 spark 读 http 的 API 接口,将读出的JSON字符串,直接转成 DataFrame

【代码】spark 读 http 的 API 接口,将读出的JSON字符串,直接转成 DataFrame。

2023-08-11 10:04:24 345

原创 实测解决 flink cdc mysql 时间字段差8小时/差13小时问题

实测解决 flink cdc mysql 时间字段差8小时/差13小时问题

2022-10-18 14:08:39 4110 15

原创 gp 索引列表查询,表字段类型,字段注释查询

--所有索引列表SELECT n.nspname as "Schema", c.relname as "Name", c2.relname as "Table"FROM pg_catalog.pg_class c JOIN pg_catalog.pg_roles r ON r.oid = c.relowner LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace LEFT JOIN

2022-03-09 11:40:15 1764

原创 Mysql binlog日志读出来的时间差8小时问题解决方案

//timeStamp = 2022-02-16T12:27:20.828524Z String zonedDateTime = LocalDateTime.parse(timeStamp.replace("Z", "")) .atZone(ZoneId.of("UTC"))//设置老时区 .withZoneSameInstant(ZoneId.of("Asia/Shanghai"))//设置新时区

2022-02-16 14:49:37 1449

原创 flink 自定义带超时时间的计数窗口触发器

自定义带超时时间的计数窗口触发器import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.common.state.ReducingState;import org.apache.flink.api.common.state.ReducingStateDescriptor;import org.apache.flink.api.common.typeutils.base.LongSe

2022-02-11 15:40:07 1778

原创 yarn的WEB页面上不能看log日志

要是yarn的WEB页面上不能看。 就直接 yarn logs -applicationId applicationid_XXX_XX > XXX_XX.txt

2021-12-29 16:52:19 1034

原创 greenplum主键存在就不插入的SQL

该sql当id=1的数据已存在时,就不会插入数据,如果不存在就正常插入INSERT INTO ods.ods_iot_project(id, project_id, status) SELECT 1,456,456 FROM (SELECT 1) tmp WHERE NOT EXISTS ( SELECT 1 FROM ods.ods_iot_project WHERE id=1 );...

2021-12-22 10:51:36 398

原创 flink 通过事务和checkpoint的共同提交,保证EXACTLY_ONCE

获得gp的数据库连接池,使用连接池,不要使用单连接package com.ysservice.dataStreamApi.utils;import com.alibaba.druid.pool.DruidDataSourceFactory;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;import java.util.Properties;/** * @Descripti

2021-11-17 15:37:13 1189

原创 flink使用countWindow,往gp中批量写数据,并自定义超时触发器

设置超时触发时间类型为ProcessingTime env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);主流程代码dataStream .keyBy(r -> r.f1)//按表名进行keyBy,避免同一张表的数据传入出现先后问题 .timeWindow(Time.of(10, TimeUnit.SECONDS))//超时时间10秒 .trigger(new CountTri

2021-11-15 15:48:03 1929 2

原创 flink jdbc写gp,以多value的形式,快速写入

package com.ysservice.dataStreamApi.sink;import com.ysservice.dataStreamApi.utils.GreenplumUtil;import com.ysservice.dataStreamApi.utils.RegexUtils;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.configuration.Configuration;impo

2021-11-12 19:53:06 2035

原创 flink通过checkpoint的完成信号,提交jdbc sql,保证数据一致性

package com.ysservice.dataStreamApi.sink;import com.ysservice.dataStreamApi.utils.GreenplumUtil;import com.ysservice.dataStreamApi.utils.RegexUtils;import org.apache.flink.api.common.state.CheckpointListener;import org.apache.flink.api.java.tuple.Tupl

2021-11-11 15:27:19 1847

原创 flink cdc 2.0.0 sql 开发模板,及踩坑记录

flink cdc sql 开发模板flink cdc sql 读mysql的binlog日志,实时同步到mysql开发模板使用flink cdc前提条件:读取目标库的用户必须开启binlog权限package com.ysservice; import com.typesafe.config.Config;import com.typesafe.config.ConfigFactory;import com.ysservice.utils.MyCheckpoint;import com.

2021-10-29 15:56:02 3317 6

原创 flink cdc 将binlog日志反序列化成sql语句

package com.ysservice.streaming.source;import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;import com.ysservice.streaming.utils.MysqlUtil;import com.ysservice.streaming.utils.SystemConstants;import io.debezium.data.Envelope;import

2021-10-18 17:13:37 723 3

原创 flink table api 有界无界窗口的使用

Table aggVin = inputTable.window(Over.partitionBy("vin").orderBy("rt").preceding("1.rows").as("w")) .select("vin,rt,carTime,min(carTime) over w as min_carTime") .select("vin,rt,carTime,min_carTime,(carTime - min_carT...

2021-08-21 00:17:06 325

原创 搜索ES内的数据

GET /charge_slice_computer/_search{ "sort": [ { "charge_start_time": { "order": "desc" } } ]}

2020-12-02 11:33:02 129

原创 配置文件获取方法

private val config = ConfigFactory.load() lazy val cassConfig = config.getConfig("mysql") lazy val mysqlUrl = cassConfig.getString("mysql.url") lazy val username = cassConfig.getString("mysql.username") lazy val password = cassConfig.getString("m.

2020-11-30 14:05:51 7611

原创 统一列的字段类型为:StringType

bigDF.select(bigDF.columns.map(column => { bigDF.col(column).cast(StringType) }): _*).na.fill("")

2020-11-30 13:53:33 347

原创 Spark on Yarn详解

Spark on Yarn详解Spark 可以跑在很多集群上,比如跑在local上,跑在Standalone上,跑在Apache Mesos上,跑在Hadoop YARN上等等。不管你Spark跑在什么上面,它的代码都是一样的,区别只是–master的时候不一样。其中Spark on YARN是工作中或生产上用的非常多的一种运行模式。今天主要对Spark on Yarn 这种方式做讲解。yarn模式两种提交任务方式Spark可以和Yarn整合,将Application提交到Yarn上运行,Yarn有两

2020-11-03 21:18:01 4861

原创 spark-streaming_实时写入hbase

当处理实时数据是聚合类的运算是,可以写入到mysql中,因为数据量不大,但如果是非聚合类的数据,mysql中存放不下,此时可以使用支持覆盖写入或事务的大型数据库,例如:hbase,ES,clickhousehbase在写入数据时如果行键相同的数据写进来就会覆盖原始数据,所以当我们在运算时将每条数据赋予唯一的行键(例如:订单号,或者设备号加时间戳),即使一批数据写入到一半时中断了,重新写入时会覆盖之前的值,保证了数据的幂等性在写入数据时,记偏移量,可以将偏移量和数据记在同一行不同的列族中(hbase支持行

2020-11-02 21:13:05 696

原创 redis介绍

RedisNosql技术1redis介绍1.1什么是NoSqlNoSql是为了解决高并发、高可扩展、高可用以及高写入而产生的数据库解决方案。NoSql就是Not Only sql。Nosql是非关系型数据库,它是关系型数据库的良好补充,而不能替代关系型数据库。1.2Nosql数据库分类(了解)键值(Key-Value)存储数据库相关产品: Tokyo Cabinet/Tyrant、Redis、Voldemort、Berkeley DB典型应用: 内容缓存,主要用于处理大量数据的高访问负载

2020-11-02 20:35:12 312

原创 sparkstreaming对接kafka

sparkstreaming注意要点sparkstreaming是一个微对比准实时的计算框架flink与sparkstreaming不同的是,sparkstreaming中的task执行完之后就会被释放掉,而flink不会,进去之后就不会释放,可以重复使用,spark的task是临时工,flink的是合同工sparkstreaming执行程序时构建完rdd需要开启开启任务并挂起任务才会开始执行程序和一直在后台挂起ssc.startssc.awaitTermination()sparkstream

2020-11-01 00:59:44 325

原创 Spark Streaming简介及原理

Spark Streaming简介及原理简介:SparkStreaming是一套框架。SparkStreaming是Spark核心API的一个扩展,可以实现高吞吐量的,具备容错机制的实时流数据处理。支持多种数据源获取数据:Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,进行处理后,处理结构保存在HDFS、DataBase等各种地方。Dashboards:图形监控界面,Spark Streaming可以输出到前端的监控页面上。*使用的最多的是kafk

2020-11-01 00:15:48 686

原创 kafka快速入门

Kafka快速⼊⻔1.Kafka的特点1.解耦:  允许你独⽴的扩展或修改两边的处理过程,只要确保它们遵守同样的接⼝约束。2.冗余:  消息队列把数据进⾏持久化直到它们已经被完全处理,通过这⼀⽅式规避了数据丢失⻛险。许多消息队列所采⽤的"插⼊-获取-删除"范式中,在把⼀个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从⽽确保你的数据被安全的保存直到你使⽤完毕。3.扩展性:  因为消息队列解耦了你的处理过程,所以增⼤消息⼊队和处理的频率是很容易的,只要另外增加处理过

2020-10-30 21:31:15 1151

原创 Hive 的 TRANSFORM 自定义函数

Hive 的 TRANSFORM 关键字提供了在 SQL 中调用自写脚本的功能。适合实现 Hive 中没有的 功能 又不想写 UDF 的情况,脚本一般都是python写的。Json 数据: {"movie":"1193","rate":"5","timeStamp":"978300760","uid":"1"} 需求:把json的字段timeStamp转换为日期编号。 1、先加载 rating.json 文件到 hive 的一个原始表 rate_json create table rate_js

2020-10-28 23:29:25 202

转载 hive使用动态分区插入数据详解

往hive分区表中插入数据时,如果需要创建的分区很多,比如以表中某个字段进行分区存储,则需要复制粘贴修改很多sql去执行,效率低。因为hive是批处理系统,所以hive提供了一个动态分区功能,其可以基于查询参数的位置去推断分区的名称,从而建立分区。1.创建一个单一字段分区表hive> create table dpartition(id int ,name string ) partitioned by(ct string );2.往表里装载数据,并且动态建立分区,以city建立

2020-10-23 18:20:36 1571 1

原创 大数据之spark_spark整合hive

spark整合hive1.安装MySQL并创建一个普通用户,并且授权set global validate_password_policy=0;set global validate_password_length=1; -- 这个两个设置以后 密码很简单不会报错CREATE USER 'hive'@'%' IDENTIFIED BY '123456'; GRANT ALL PRIVILEGES ON hivedb.* TO 'hive'@'%' IDENTIFIED BY '123456' W

2020-10-05 11:59:38 231

原创 大数据之spark_spark sql的自定义函数

用户自定义函数UDF:输入一行返回一行UDTF:输入一行返回多行,类似于flatmapUDAF:输入多行返回一行,类似于聚合函数用户自定义UDF函数UDF案例11.在sql语句中根据ip查询省市地址package com.doit.spark.day11import com.doit.spark.day11.Utils.ip2Longimport org.apache.spark.broadcast.Broadcastimport org.apache.spark.rdd.RDDim

2020-10-04 23:56:32 655

原创 大数据之spark_spark SQL案例

连续登录三天的用户案例:数据uid,dtguid01,2018-02-28guid01,2018-03-01guid01,2018-03-01guid01,2018-03-05guid01,2018-03-02guid01,2018-03-04guid01,2018-03-06guid01,2018-03-07guid02,2018-03-01guid02,2018-03-03guid02,2018-03-02guid02,2018-03-06SQL代码实现package c

2020-10-04 00:12:31 277

原创 大数据之spark_spark SQL的建表语句

1.通过RDD+case class创建DataFramepackage com.doit.spark.day10import org.apache.spark.rdd.RDDimport org.apache.spark.sql.{DataFrame, SparkSession}object CaseClassCreateDataFrame { def main(args: Array[String]): Unit = { //构建SparkSession val spar

2020-10-03 22:56:53 3170

原创 大数据之spark_spark SQL的简介

Spark SQL概述Spark SQL定义Spark SQL是Spark用来处理结构化数据的一个模块什么是DataFrames与RDD类似,DataFrame也是一个分布式数据容器[抽象的]。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上 看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加

2020-09-29 00:03:12 209

原创 大数据之spark_spark的深入理解

Spark执行流程提交任务spark-submit --master spark://node-1.51doit.cn:7070 --executor-memory 1g --total-executor-cores 4 --class cn.51doit.spark.WordCount /root/wc.jar hdfs://node-1.51doit.cn:9000/wc hdfs://node-1.51doit.cn:9000/out0spark任务执行模式Client模式(默认)Dirv

2020-09-28 22:55:45 450

原创 大数据之spark_累加器

累加器用来统计数据条数,可以统计总条数,也可以统计指定条件筛选后的数据条数,例如:处理日志数据时,出现的脏数据条数,如果我们想返回所有的脏数据并返回有用的数据,那么我们需要触发两次Job才能做到,如果用了累加器则不用触发两次Job累加器它是先在每个Task中进行累加,返回Driver端时再整体累加所有Task中的累加器,累加器在每个Task上是独立的(class 修饰的),这样可以避免线程安全问题运用累加器时,实际上就是一个闭包,它定义在Driver端,通过Task发送到Executor端,运用在每个

2020-09-28 22:27:07 346

原创 大数据之spark_spark中的线程安全问题

Task多线程问题问题原因一个Executor是一个进程,一个进程中可以同时运行多个线程Task,如果多个Task使用了共享的成员变量,就会出现线程不安全的问题案例需求使用spark将日期字符串转换成long类型时间戳样例数据2019-11-06 15:59:502019-11-06 15:59:512019-11-06 15:59:522019-11-06 15:59:532019-11-06 15:59:542019-11-06 15:59:552019-11-06 15:59

2020-09-27 23:53:31 878

原创 大数据之spark_spark中的序列化问题详解

Task序列化问题问题现象本质原因1.封装数据的bean没有实现序列化(报错图1)封装数据的bean需要写入本地磁盘和在网络中传输,没有实现序列化出出错2.rdd的算子中传入的函数用到了外部引用类型(闭包),且没有实现序列化(报错图2)本质原因的Task是在Driver端生成的,函数也是在Driver端定义的,但是真正执行是需要将Task和Task中引用都序列化,如果有没法序列化的,就会出现Task没有序列化异常Task在底层有实现序列化接口的,所以Task可以被传输到Executor端,

2020-09-27 21:59:42 841 2

原创 大数据之spark_自定义排序

自定义排序使用rdd的sortBy或sortByKey方法进行排序,实现根据需求实现灵活的排序规则,在给定的数据中,有三个字段,分别是:姓名,年龄,颜值。先按照颜值的从高到低进行排序,如果颜值相等,再安装年龄的升序排序,如果不指定排序规则,直接对Bean排序:sortBy(x => x)会报错数据name,age,fvtaoge,30,99.99xingge,18,9999.99hangge,29,99.99错误示范class BoyBean (val name :String,var

2020-09-27 21:12:57 471

原创 大数据之spark_spark的广播变量及ip地址案例分析

广播变量广播变量通常是为了实现mapside join,可以将Driver端的数据广播到属于该application的Executor,然后通过Driver广播变量返回的引用,获取事先广播到Executor的数据广播变量是通过BT的方式广播的(TorrentBroadcast),多个Executor可以相互传递数据,可以提高效率在Driver端使用sc.broadcast这个方法进行广播,并且该方法是阻塞的(同步的)广播变量一但广播出去就不能改变,为了以后可以定期的改变要关联的数据,可以定义一个o

2020-09-25 21:50:09 2285

原创 大数据之spark_spark中的特殊算子cache、persist、checkpoint

cache、persist将数据缓存到内存,第一次触发Action,才会将数据放入内存,以后在触发Action,可以复用前面内存中缓存的数据,可以提升技术效率cache和persist的使用场景:一个application多次触发Action,为了复用前面RDD的数据,避免反复读取HDFS(数据源)中的数据和重复计算,persist可以将数据缓存到内存或磁盘【executor所在的内存或磁盘】,第一次触发action才放入到内存或磁盘,以后会缓存的RDD进行操作可以复用缓存的数据。一个RDD多次触发A

2020-09-25 20:53:42 477

原创 大数据之spark_spark运行结构剖析

基本概念RDD 概念:分布式、弹性、可容错的抽象数据集 特点 1.有多个分区,分区数量决定任务并行数 从HDFS中读取 如果是从HDFS中读取数据,分区的数量由hdfs中数据的输入切片数量决定 sc.textFile可以指定rdd的分区数量(textFile会生成两个RDD,一个是HadoopRDD负责读取数据,读取到 的数据是k,v类型的,然后内部又调用了一个MapPartitionsRDD,将k的偏移量去掉,只保留v数据, saveAsTextFile中也

2020-09-24 23:14:32 266

原创 大数据之spark_流量统计案例

数据用户id,开始时间,结束时间,所用流量1,2020-02-18 14:20:30,2020-02-18 14:46:30,201,2020-02-18 14:47:20,2020-02-18 15:20:30,301,2020-02-18 15:37:23,2020-02-18 16:05:26,401,2020-02-18 16:06:27,2020-02-18 17:20:49,501,2020-02-18 17:21:50,2020-02-18 18:03:27,602,2020-0

2020-09-24 22:42:15 482

原创 大数据之spark_连续三天用户登录案例

数据guid01,2018-02-28guid01,2018-03-01guid01,2018-03-05guid01,2018-03-02guid01,2018-03-04guid01,2018-03-06guid01,2018-03-07guid02,2018-03-01guid02,2018-03-03guid02,2018-03-02guid02,2018-03-06要求求每个用户登录记录中连续三天以上有登录记录的用户,并显示出他每个阶段的开始登录时间和结束登录时间实现方

2020-09-24 22:08:12 262

空空如也

空空如也

TA创建的收藏夹 TA关注的收藏夹

TA关注的人

提示
确定要删除当前文章?
取消 删除