自定义博客皮肤VIP专享

*博客头图:

格式为PNG、JPG,宽度*高度大于1920*100像素,不超过2MB,主视觉建议放在右侧,请参照线上博客头图

请上传大于1920*100像素的图片!

博客底图:

图片格式为PNG、JPG,不超过1MB,可上下左右平铺至整个背景

栏目图:

图片格式为PNG、JPG,图片宽度*高度为300*38像素,不超过0.5MB

主标题颜色:

RGB颜色,例如:#AFAFAF

Hover:

RGB颜色,例如:#AFAFAF

副标题颜色:

RGB颜色,例如:#AFAFAF

自定义博客皮肤

-+
  • 博客(150)
  • 收藏
  • 关注

原创 qlik为app添加定时调度

qlik给app设置定时调度

2023-11-22 11:17:29 115

原创 java和sql生成时间维度数据

【代码】java和sql生成时间维度数据。

2023-08-24 14:42:00 979

原创 Scala反射调用object

【代码】Scala反射调用object。

2023-08-23 13:03:27 597

原创 Git生成ssh密钥,配置gitLab

【代码】Git生成ssh密钥,配置gitLab。

2023-08-16 15:51:24 109

原创 MySQL查询当前执行的sql,kill当前执行的sql

查询当前执行的sql:kill当前执行的sql:kill id。

2023-04-13 16:00:32 177

原创 hive创建映射hbase外表

hbase

2023-02-23 19:38:05 589

原创 JDBC工具类

jdbc工具类

2022-12-26 15:12:24 132

原创 git初始化本地仓库

git远程仓库初始化

2022-12-22 13:59:14 492

原创 Spark读取mysql(jdbc)数据分区两种API

Spark读取mysql(jdbc)数据分区两种API

2022-09-25 15:15:06 691

原创 Mysql分组TopN

Mysq分组TopN。

2022-07-29 11:02:25 265

原创 hive SQL struct类型和Array<struct>类型转json字符串

数据格式:default_address是struct类型,address是Array<struct>类型:例如:array<struct<id:string,customer_id:string>>目标格式:[ { "id": "1111", "customer_id": "2222" }, { "id": "3333", "customer_id": "444

2022-05-28 00:55:04 2917

原创 MySQL8递归查询

表:人员表:t_sys_user组织机构表:t_sys_organization组织机构人员关系表:t_sys_user_organization需求:传入一个组织机构id查询该机构下面所有的人员SQL:with recursive t as( SELECT * FROM t_sys_organization WHERE organization_id='150030595752464384' UNION ALL SELECT c.* FROM t_sys_o

2022-04-21 12:36:24 647

原创 MapReduce为什么要排序

MapReduce在reduce阶段需要分组,将key相同的放在一起进行规约,为了达到该目的,有两种算法:hashmap和sorthashmap算法太耗内存,而sort通过外排可对任意数据量分组,只要磁盘够大就行。map端排序是为了减轻reduce端排序的压力。注:在spark中,除了sort的方法,也提供hashmap,用户可配置,毕竟sort开销太大了。...

2022-03-26 10:46:16 3135

原创 ntile函数

适用sparkSQL和hiveSQL作用: ntile函数可以对序号进行分组处理。这就相当于将查询出来的记录集放到指定长度的数组中,每一个数组中存放一定数量的记录。ntile函数为每条记录生的序号就是这条记录所在数组的索引(从1开台)。也可以将每一个分配录的数组元素称为"桶"。ntile函数有一个参数,用来指定桶数。下面的SQL语句使用ntile函数对t_table表进行了装桶处理:SELECT ntile(4) over...

2021-12-29 16:32:08 2120

原创 cdh6.x命令行启动与关闭

关闭所有cloudera-scm-agent节点# 使用root用户# 在所有节点上执行service cloudera-scm-agent stop# 查看是否关闭service cloudera-scm-agent status关闭cloudera-scm-server节点# 使用root用户# 在cloudera-scm-server节点上执行service cloudera-scm-server stop# 查看是否关闭service cloudera-sc.

2021-12-01 17:38:29 1627

原创 linux安装cdh

环境准备:linux版本:阿里云ecs,Red Hat 4.8.5,四台机器(一台主节点,三台从节点)cdh版本:cdh-6.1.0-install-soft.tar.gz,离线安装java版本: jdk1.8.0_181-cloudera.tar.gz关闭防火墙systemctl stop firewalld 关闭防火墙systemctl disable firewalld 禁止防火墙开机自启vi /etc/selinux/config —> SELINUX=disab..

2021-12-01 17:06:16 1842

原创 sublime安装json格式化插件

安装pretty json插件:1,快捷键crtl+shift+p输入pic:2,输入

2021-05-29 14:17:45 3663

原创 Spark Streaming连接kafka消费多个topic只消费其中一个topic一个分区的问题

代码:import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.log4j.{Level, Logger}import org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{Seconds, StreamingContext

2021-05-27 10:35:36 1010

原创 Spark中K-Means算法计算RFM

环境与定义K-Means:org.apache.spark.ml.clustering.KMeans,版本2.11RFM:根据业务需求的不同,RFM也会存在不同的定义,此处的定义:R:最近一次购买时间距今的天数(0 - 365/366) F:有购买行为的天数(一天多次购买算一次) M:购买的总金额(注意0或者超大金额是否需要过滤掉)只计算一年内的订单数据结算结果(标签值):将用户共分为8种类型实现:1,加载订单数据转换为DataFrame:SELECT

2021-04-29 16:17:51 2645

原创 spark报错ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from

spark任务报错(不过计算后的结果是对的):21/04/29 14:38:38 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from /172.16.4.156:37528 is closed21/04/29 14:38:39 ERROR cluster.YarnScheduler: Lost executor 3 on cdh-slave1.test.com: C

2021-04-29 14:53:45 5239 6

原创 记一次Spark内存溢出与性能优化

场景执行一个spark任务后,发现控制台一直报错如下:21/04/21 10:32:29 ERROR cluster.YarnScheduler: Lost executor 3 on cdh-slave1.test.com: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryO

2021-04-21 16:06:41 675

原创 记一次生产kafka重复消费事故

生产环境消费者处理一条消息耗费时间过长,导致重复消费原配置:public ConcurrentKafkaListenerContainerFactory<String, String> msgContainerFactory() { //System.out.println(groupId); Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOO

2021-04-18 21:44:34 532

原创 Kafka消费者一些重要参数

消费者重复消费场景:假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样 并不能避免重复消费的发送,而且也会使位移提交更加频繁消费者丢失消息场景:按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下 会发生的呢?我们来看下图中的情形:拉取线程不...

2021-01-31 00:08:08 1057

原创 Kafka生产者原理和重要参数

生产者原理一个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender线程。 主线程:中由 kafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator, 也称为消息收集器)中。 RecordAccumulator:主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。缓存的大小可以通过生产者客户端参数 buffer.memory配置,默认值为 335544..

2021-01-30 23:24:41 493

原创 Kafka基础架构

1)Producer :消息生产者,就是向kafka broker发消息的客户端;2)Consumer :消息消费者,向kafka broker取消息的客户端;3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内 消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。4)Broker :一台kafka服务器就是一个broker。一个集群由多个br...

2021-01-30 22:34:22 146 1

原创 多线程消费Kafka数据

import com.base.JavaKafkaConfigurer;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.Kafka.

2021-01-13 15:03:34 584 2

原创 Kafka消费者重复消费(Scala版本)

def main(args: Array[String]): Unit = { val props = new Properties() props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092") props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeseria...

2020-11-27 15:25:51 193

原创 IDEA中git撤回上一次push(指定回到某个版本)

场景:最新的push是有问题的,现在需要撤回这次push,回退到5512e这个版本操作:1,git reset --hard xxxxx (回退到指定的版本,版本是前五位)2,检查下稳一手3,git push origin 分支名 --force 将当前版本push上去注意:这次操作会删除上一次提交记录,而不是重新提交一次,所以如果需要保存文件就先备份下4,查看提交记录后发现上一次push已经被删除了...

2020-11-14 09:53:13 2049

原创 Flink版本的WordCount

环境准备:监听8888端口号的ncpom文件:<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 htt

2020-10-09 22:08:12 228

原创 Flink安装启动

安装1,下载flink安装包,下载地址:https://flink.apache.org/downloads.html2,上传flink安装包到Linux服务器上并解压tar -xvf flink-1.11.1-bin-scala_2.12.tgz -C ./3,修改conf目录下的flink-conf.yaml配置文件#指定jobmanager的地址jobmanager.rpc.address: linux01#指定taskmanager的可用槽位的数量taskmanager

2020-10-09 22:02:07 1420

原创 SQL漏斗分析

漏斗分析

2020-09-10 12:37:25 1490

原创 SQL新用户留存分析

用户留存分析

2020-09-10 12:31:23 888

原创 spark程序连接hive后导致hive的shell连接不上

出现原因:本地写spark代码操作hive,配置文件中直接连接的mysql元数据服务,连接后导致hive的shell端连接不上。排查后发现hive的元数据中VERSION表被spark连接改动了,重新修改后才能在shell端连接hiveserver2 -hiveconfig hive.root.logger=DEBUG,console解决:hive-stite文件加上这个配置,通过metastore去连接元数据服务,是因为spark内置的hive版本和元数据服务的hive版本不一致导

2020-09-10 11:39:05 438

原创 Hive高阶函数之CUBE,GROUPING SETS,ROLLUP函数

应用场景将所有的维度排列组合在一块WITH CUBE会组合所有维度,比如有n个维度,一共排列出2^n种组合查询缺点:会产生大量无意义的组合SELECT province, city, devicetype, count(distinct guid) as uv_amtFROM tmpGROUP BY province,city,devicetypeWITH CUBE;GROUPING SETS(...)可以自定义组合维度,()代表最粗粒度的组合SELECT prov

2020-09-08 20:45:50 660

原创 Linux编写执行程序脚本案例

1,执行hive语句案例:hive导入数据#!/bin/bashHIVE_HOME=/opt/apps/hive-3.1.2/DT=`date -d'-1 day' +%Y-%m-%d`if [ $1 ]thenDT=$1fi${HIVE_HOME}/bin/hive -e "load data inpath '/eventlog/app/${DT}' into table ods.event_app_log partition(dt='${DT}')"if [ $

2020-09-08 20:16:49 309

原创 项目问题:hive中读取parquet文件的问题

问题:hive中查询一段语句:select guid, sessionid, properties, first_value(properties['pageid']) over(partition by guid,sessionid order by ts) as in_page, last_value(properties['pageid']) over(partition by guid,sessionid order by ts rows betwe

2020-09-08 12:44:15 2087

原创 Hive常用命令

查看所有数据库:show databases;创建数据库:create database db_name ;查看当前正在使用的数据库:select current_database();切换数据库:use db_name ;删除表:drop table table_name;查看表类型:desc formatted table_name;查看执行引擎:set hive.execution.engine;设置执行引擎为spark:set hive.execut...

2020-09-07 21:11:16 255

原创 hive时间转换

时间戳转换原始数据1,转换时间from_unixtime(cast(t.ts/1000 as bigint)) 2,转换小时/天/年(hour/day/year)hour(from_unixtime(cast(t.ts/1000 as bigint)))

2020-09-07 20:49:31 2616

原创 yarn资源调优设置的一些参数

CPU参数目前的CPU被划分为虚拟CPU(CPU virtual Core),这里的虚拟CPU是yarn自己引入的概念,因为每个服务器的CPU计算能力不一样,有的机器可能是其他机器计算能力的两倍,然后可以通过多配置几个虚拟CPU弥补差异。在yarn中,CPU的相关配置如下:1.yarn.nodemanager.resource.cpu-vcores表示该节点服务器上yarn可以使用的虚拟CPU个数,默认是8,推荐将值配置与物理核心个数相同,如果节点CPU核心不足8个,要调小这个值,yarn不会智能

2020-09-07 20:28:07 1299

原创 项目问题:布隆过滤器与kyro序列化

https://zhuanlan.zhihu.com/p/72378274

2020-09-06 08:58:30 350

空空如也

空空如也

TA创建的收藏夹 TA关注的收藏夹

TA关注的人

提示
确定要删除当前文章?
取消 删除