自定义博客皮肤VIP专享

*博客头图:

格式为PNG、JPG,宽度*高度大于1920*100像素,不超过2MB,主视觉建议放在右侧,请参照线上博客头图

请上传大于1920*100像素的图片!

博客底图:

图片格式为PNG、JPG,不超过1MB,可上下左右平铺至整个背景

栏目图:

图片格式为PNG、JPG,图片宽度*高度为300*38像素,不超过0.5MB

主标题颜色:

RGB颜色,例如:#AFAFAF

Hover:

RGB颜色,例如:#AFAFAF

副标题颜色:

RGB颜色,例如:#AFAFAF

自定义博客皮肤

-+
  • 博客(282)
  • 收藏
  • 关注

原创 Scala可变参数如何传递数组?

对于java中的可变参数来说,可以直接传递数组类型的数据。但是scala中不支持最直接传递数组类型的数据。可以通过数组名:_*的方式使用传递数组类型的数据。

2023-12-28 10:02:23 395

原创 Scala函数作为参数传递的理解

在上面的例子中,函数f1的参数列表是f: (Int, Int) => Int,表示参数是一个函数类型,该函数有两个Int类型的参数,返回值是一个Int类型的值。而我们定义的add函数,恰好满足该条件。f1(add _)表示将add函数当作整体传递到f1函数中,即将add函数当作参数传递。很明显,add函数在调用的时候必须传两个Int类型的参数,所以f1(add)可以推断出并不是要调用add函数,可以省略下划线。在上面的例子中,f2的返回值类型是f3的函数类型,即 () => int。

2022-09-04 22:00:14 1234 1

原创 scala中函数名 _(下划线)如何理解?

Scala中函数名 _表示的将函数整体赋值给一个变量,即用变量表示一个函数。如果变量明确类型,那么可以省略函数名后面的_需要注意的是,通过变量调用该函数时,需要在变量名后面加()。

2022-09-04 21:07:38 537

原创 Flink使用KeyedProcessFunction求topN

***Authorpanghu*Date2022-04-19自定义数据源,SourceFunction接口实现的数据源并行度只能设置为1//向下游发送数据Randomrandom=newRandom();id=2"};//每隔一秒生成一次数据Thread.sleep(1000L);//通过标识位控制退出循环,中断数据源flag=false;}}...

2022-07-25 15:48:51 366

原创 centos7安装python3

2.创建一个目录存放python3的安装文件,比如mkdir/opt/module/python3。如果出现这样的界面就代表python3安装成功了,输入exit()退出页面。可以使用wget方式下载,如果wget下载很慢,可以在这个地址(如果出现这样的界面就代表pip3可以正常使用,这个警告可以忽略。)手动下载tar.xz格式的压缩包,然后上传到服务器。1.先进入解压后的Python-3.8.6文件夹。...

2022-07-23 10:20:27 307

原创 血泪史:centos7误删了/usr/bin/python2,如何恢复?

有个不懂python的朋友在我这借了一个python工具类想要简化工作,因为公司服务器没有python3环境,就想自己安装一下环境,但是没想到被某些博客误导,博客中提到删除python2的软连接,朋友是python小白,不小心误删了/usr/bin/目录下的python2文件!找了几篇博客,大致都是需要先完全卸载服务器的python2残留文件,再去重新安装对应版本,具体怎么操作不做过多描述,但是问题又来了,朋友没有权限删除残留文件!,查看服务器内核版本,我的是7.4.1708版本,注意你的版本号。...

2022-07-22 18:42:00 1798

原创 Flink处理迟到数据的几种方式

由于大部分乱序数据已经被水位线的延迟等到了,所以往往迟到的数据不会太多。这样,我们会在水位线到达窗口结束时间时,先快速地输出一个近似正确的计算结果;然后保持窗口继续等到延迟数据,每来一条数据,窗口就会再次计算,并将更新后的结果输出。那就要用到最后一招了用窗口的侧输出流来收集关窗以后的迟到数据。因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。即使我们有了前面的双重保证,可窗口不能一直等下去,最后总要真正关闭。窗口一旦关闭,后续的数据就都要被丢弃了。那如果真的还有漏网之鱼又该怎么办呢?...

2022-07-21 15:39:25 2220

原创 Flink自定义Sink写入到HBase

import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.RichS

2022-05-29 20:41:33 972

原创 Flink将数据写入到mysql

import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.connector.jdbc.JdbcConnectionOptions;import org.apache.flink.connector.jdbc.JdbcExecutionOptions;import org.apache.flink.connector.jdbc.JdbcSink;import org.apache.flink.st

2022-05-29 20:40:56 996

原创 Flink将数据写入到redis

import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.Stre

2022-05-29 20:40:08 1000

原创 Flink将数据写入到Kafka

import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connecto

2022-05-29 20:39:18 1330

原创 Flink将数据写入文件

import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringEncoder;import org.apache.flink.core.fs.Path;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink

2022-05-29 20:38:32 1432

原创 Flink物理分区的几种方式

随机分区(shuffle)最简单的重分区方式就是shuffle,通过调用DataStream的.shuffle()方法,将上游数据随机分配到下游的并行任务中。轮询分区(Round-Robin)轮询也是常见的重分区方式,通过调用DataStream的.rebalance()方法,将上游的数据平均分配到下游所有的并行任务中。重缩放分区(rescale)重缩放分区和轮询分区非常类型,当调用resacle()方法时,底层调用的其实就是Round-Robin算法进行轮询。不同的是,rescale重缩放

2022-05-28 22:06:56 1198

原创 Flink自定义UDF

准备POJO类package com.wanshun.bigdata.chapter05;/** * Author:panghu * Date:2022-04-19 * Description:定义一个Flink POJO数据类型的类,方便数据的解析和序列化 * POJO类型的类需要满足以下几点: * 1.类是公共的和独立的(没有非静态内部类) * 2.有一个公共的无参构造器 * 3.所有的属性都是公共的且非final的,或者有公共的getter和setter方法 * 4.所有属性的类

2022-05-28 19:21:27 285

原创 Flink自定义数据源

准备POJO类package com.wanshun.bigdata.chapter05;/** * Author:panghu * Date:2022-04-19 * Description:定义一个Flink POJO数据类型的类,方便数据的解析和序列化 * POJO类型的类需要满足以下几点: * 1.类是公共的和独立的(没有非静态内部类) * 2.有一个公共的无参构造器 * 3.所有的属性都是公共的且非final的,或者有公共的getter和setter方法 * 4.所有属性的类

2022-05-28 11:05:12 473

原创 Flink从集合、元素、文件、端口、Kafka读取数据

定义POJO类POJO类型的类需要满足以下几点:1.类是公共的和独立的(没有非静态内部类)2.有一个公共的无参构造器3.所有的属性都是公共的且非final的,或者有公共的getter和setter方法4.所有属性的类型都是可序列化的public class Event { public String user; public String url; public Long timeStamp; public Event() { } publ

2022-05-28 11:00:29 360

原创 Flink的并行度概念和并行度的设置

并行度概念并行度可以认为同时处理数据的子任务数,在大数据场景下,我们都是依赖分布式框架做并行计算,从而提高数据的吞吐量。Flink中实现任务并行的方法就是将一个算子操作复制到多个节点(或者线程),当数据到来时,就可以到其中任何一个节点上执行。像这样将一个任务拆分到多个并行的子任务,分发到不同节点,就真正实现了并行计算。包含并行子任务的数据流就是并行数据流,它需要多个分区来分配并行任务。一般情况下,一个流程序的并行度=所有算子中最大的并行度。一个程序中,不同的算子可能会有不同的并行度。就比如下图中,出了s

2022-05-05 16:33:57 6653

原创 Flink作业提交流程(Yarn集群模式)

会话模式1.客户端通过REST接口,将作业提交给分发器。2.分发器启动JobMaster,并将Job资源提交到JobMaster。3.JobMaster向Flink的资源管理器请求资源(slots)。4.Flink的资源管理器向Yarn资源管理器请求Container容器。5.Yarn资源管理器启动新的TaskManager容器。6.TaskManager向Flink的资源管理器注册自己可用的slots。7.Flink的资源管理器通知TaskManager,向新的作业提供slots。8.T

2022-04-18 14:19:44 3671

原创 Flink集群部署模式

会话模式会话模式需要先启动一个Flink集群,保持一个会话,通过客户端向集群提交作业。这样做的好处就是:集群的生命周期不受作业影响,作业结束了就释放资源,集群仍然能够正常使用。缺点就是:资源共享,不同的作业会抢占资源,一旦资源不够,提交新的作业就会失败,另外同一个TaskManager上会运行很多的作业,一旦其中一个作业失败导致TaskManager宕机,那么所有的作业都会受影响。会话模式比较适合于单规模小、执行时间短的大量作业。单作业模式但作业模式就是一对一模式,一个作业对应一个集群。客户端运行

2022-04-15 11:47:49 2810

原创 python中如何使用正则表达式,看这一篇文章就够了

python正则表达式的使用

2022-04-02 17:41:55 3562

原创 Hive使用动态分区向表插入数据报错java heap space或者GC overhead limit exceeded

问题描述今天使用hive动台分区向表插入数据时,一直报错java heap space或者GC overhead limit exceeded。排查了好久,发现以下问题。1.表使用的是parquet存储格式(或者ORC)。2.虽然数据量很小,但是分区比较多。hive在进行多分区操作时,只启动map端任务,不启动reduce,再加上parquet格式的表会为每一个分区创建一个FileWriter,分区太多就会导致内存溢出。解决方案1.首先确定开启了动态分区和非严格模式。set hive.exec

2021-11-23 19:12:38 2667

原创 python引入自己创建的工具类时报错no module named xxx

首先要注意目录层级关系,如果两个包是同层级的,如下图的情况。如果我想在dwd包下的read_test_csv.py文件中引用utils包下的rd.py工具类。如果直接写from utils import rd会报错,这是因为运行dwd包中的程序时,默认会在dwd层开始找要导入的module,dwd层没有utils,所以会报错。解决方法在read_test_csv.py中先导入sys,使用sys.path.append指定需要导入的包的路径,可以是绝对路径,也可以是相对路径。注意要先append,才能

2021-10-12 13:16:04 1852

原创 tensorflow占位符的使用

定义方法:a = tf.placeholder(dtype)a = tf.placeholder(dtype=tf.float32)b = tf.placeholder(dtype=tf.float32)out = tf.multiply(a, b)填充数据时,使用run()方法的feed_dict参数指定张量对应的值即可,数据格式和字典类似。with tf.Session() as sess: # 填充占位符,填充形式类字典 res = sess.run(out, feed_d

2021-09-06 14:24:27 446

原创 tensorflow tfrecords格式的数据保存与加载

import tensorflow as tfimport osos.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'class tfrecords_file(object): def __init__(self, file_list): self.file_list = file_list # 定义属性 self.height = 32 self.width = 32 self.c

2021-08-04 15:40:28 243

原创 tensorflow加载二进制文件

步骤1.找到文件,创建文件列表2.创建文件列表队列3.创建文件阅读器,加载数据4.解码,处理数据。5.批处理import tensorflow as tfimport osos.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'class bin_file(object): """加载二进制文件 """ def __init__(self, file_list): self.file_list = file_list

2021-08-04 15:33:15 216

原创 tensorflow加载图片格式的数据

步骤1.找到文件,指定文件列表2.创建文件列表队列3.创建阅读器,读取文件。4.解码,统一图片大小(统一特征),固定形状5.批处理import tensorflow as tfimport osos.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'def read_images(file_list): # 2.创建文件列表队列 file_queue = tf.train.string_input_producer(file_list)

2021-08-04 15:28:23 428

原创 tensorflow读取csv文件

读取步骤1.找到文件,指定文件列表。路径+文件名2.创建文件列表队列。3.创建文件阅读器,读取文件,返回的是k-v类型,分别是:文件名-文件内容。默认只读取一行。4.解码,转为tensor张量,指定每一列的数据类型和默认值。5.批处理。import tensorflow as tfimport osos.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'def read_csv(file_list): # 2.创建文件队列 file_queue

2021-08-04 15:25:01 999

原创 tensorboard增加变量显示

tensorboard增加变量显示,可以观察模型的参数、损失值的变化。import tensorflow as tf# 收集tensortf.summary.scalar("losses", loss) # 收集标量tf.summary.histogram("weight", w) # 收集高维变量# 合并tensormerged = tf.summary.merge_all()with tf.Session() as sess: for i in range(500):

2021-07-28 18:02:34 289 1

原创 tensorflow创建变量作用域

import tensorflow as tfwith tf.variable_scope("data"): x = tf.random_normal([100, 1], mean=1.70, stddev=0.50, name='x_data') y = tf.matmul(x, [[0.7]]) + 0.9作用使得tensorboard观察模型代码时更加清晰。

2021-07-28 17:50:17 273

原创 tensorflow自定义命令行参数

import tensorflow as tf# 自定义命令行参数tf.app.flags.DEFINE_integer(flag_name='step', default_value=200, docstring='训练次数')FLAGS = tf.app.flags.FLAGSwith tf.Session() as sess: # 获取命令行参数 for i in range(FLAGS.step): print(i)flag_name:参数名字。default_

2021-07-28 17:46:19 120

原创 tensorflow训练模型保存和还原

import tensorflow as tf# 保存训练模型saver = tf.train.Saver(var_list=[w,b],max_to_keep=5)with tf.Session() as sess: # 运行初始化op sess.run(init_op) # 还原变量 saver.restore(sess, '/home/sun/py_code/py36/deep_learning/day01/tmp/ckpt/m

2021-07-28 17:07:10 358

原创 tensorflow创建变量

# 定义变量,值是随机生成的var = tf.Variable(tf.random_normal(shape=[2, 3], mean=0, stddev=1.0), name='var')# 变量必须要进行显示初始化opinit_op = tf.global_variables_initializer()with tf.Session() as sess: # 必须运行初始化op sess.run(init_op) print(sess.run(var))注意1.变

2021-07-28 11:55:35 249

原创 sklearn常用库合集及使用示例

文本类型特征值化DictVectorizer、CountVectorizer、TfidfVectorizer。字典类型的数据进行特征值化from sklearn.feature_extraction import DictVectorizerdictV = DictVectorizer(sparse=False)res = dictV.fit_transform(X)sparse:是否产生稀疏矩阵,默认为True。X:字典类型的列表文本类型的数据进行特征值化from sklearn.f

2021-07-26 15:15:58 764

原创 sklearn特征选择:VarianceThreshold和PCA

PCA是一种分析、简化数据的技术。其目的是数据维度压缩,尽可能降低原始数据的维度,损失少量信息。作用: 削减线性回归和聚类分析中的特征数量。API使用from sklearn.decomposition import PCApca = PCA(n_components = 0.9)data = pca.fit_transform(X)其中n_components可以用小数和整数表示,用小数表示代表降维后的主成分方差和最小所占比例的阈值,一般设置为90%~95%;用证书表示代表降维后的维度数量

2021-07-21 14:45:06 1347

原创 sklearn缺失值处理

处理方式1.删除:如果某一行或者某一列缺失值所占比重过大,那么可以删除此行或者此列。2.插补:通过每行或者每列的平均值、中位数,对空值进行填充。通常我们使用插补法,按照每列特征的平均值或者中位数进行填充。代码实现import numpy as npfrom sklearn.impute import SimpleImputer# 老版本的sklearn需要引用如下API# import sklearn.preprocessing.Imputerdef im(): """

2021-07-21 10:36:45 1102

原创 特征处理:归一化和标准化的区别及使用

归一化定义把原始的特征数据映射到[0,1]范围内。这个[0,1]范围是默认的,可以手动指定。计算公式x’ = (x - min) / (max - min)x’’ = x’ * (mx - mi) + mi需要注意的是:x’’ 才是最终的计算结果。其中:x表示当前值,min表示每一列特征的最小值,max表示每一列特征的最大值;mx表示[0,1]这个指定范围内的最大值,mi表示[0,1]这个指定范围内的最小值。代码实现from sklearn.preprocessing import Mi

2021-07-21 10:09:37 795

原创 特征工程:对非数值类型的数据进行特征值化

DictVectorizer:对字典类型的非数值数据进行特征值化。CountVectorizer:对文本类型的数据进行特征值化,得到的是每个词出现的次数。TfidfVectorizer:对文本类型的数据进行特征值化,结果 = log(总文档数/该词出现的文档数),表示该词的重要程度。DictVectorizerfrom sklearn.feature_extraction import DictVectorizerdef dictVec(): """ 字典特征提取:对字典类型.

2021-07-20 18:04:52 514 1

原创 Hive求截止到当前日期累计消费金额或使用次数

说明经常遇到这种类型的问题:求截止到当前日期(或在此日期之前),某用户的累计消费金额或者累计使用次数。只要遇到截止到当前日期为止,就要想到窗口函数,因为最后求的是累积消费金额,就要使用sum(金额字段) over(partition by 用户 order by 日期字段)。场景题下面是力扣的一道SQL题,以此题为例:activity 表:编写一个 SQL 查询,同时报告每组玩家和日期,以及玩家到目前为止玩了多少游戏。也就是说,在此日期之前玩家所玩的游戏总数。最终结果如下:SQLSELE

2021-07-09 09:53:28 2311

原创 大数据场景下使用SQL求UV和PV的问题

表数据现有user_age表如下:user_app表如下:需求求:0-10岁的用户、11-20岁的用户、21-30岁的用户、30岁以上的用户使用app的人数(uv)和次数(pv)。分析需求乍一看不难,但是如何实现同时求出uv和pv?有的同学可能最先想到的就是uv = count(distinct uid),pv = count(*)。在MySQL中这样写固然可以得到结果,但是在大数据场景中呢?如果是Hive中这样写发生数据倾斜了怎么处理?所以我们需要换一种思路。首先可以在user_app

2021-07-08 10:07:35 3411 1

原创 Spark的shuffle过程

Spark的shuffle分为老版本的HashShuffle(现在已经弃用)和新版本的SortShuffle。Shuffle过程发生在宽依赖切分Stage的过程中,前一个Stage称作ShuffleMap Stage,后一个Stage称作Result Stage。HashShuffle原理未经优化的HashShuffle1.Map Task将数据写入buffer缓冲区,待缓冲区达到阈值时开始溢写文件,文件数量取决于(等于)Reduce Task的数量。Reduce Task的数量取决于(等于)上一个

2021-07-06 17:16:40 1052 1

空空如也

空空如也

TA创建的收藏夹 TA关注的收藏夹

TA关注的人

提示
确定要删除当前文章?
取消 删除