自定义博客皮肤VIP专享

*博客头图:

格式为PNG、JPG,宽度*高度大于1920*100像素,不超过2MB,主视觉建议放在右侧,请参照线上博客头图

请上传大于1920*100像素的图片!

博客底图:

图片格式为PNG、JPG,不超过1MB,可上下左右平铺至整个背景

栏目图:

图片格式为PNG、JPG,图片宽度*高度为300*38像素,不超过0.5MB

主标题颜色:

RGB颜色,例如:#AFAFAF

Hover:

RGB颜色,例如:#AFAFAF

副标题颜色:

RGB颜色,例如:#AFAFAF

自定义博客皮肤

-+
  • 博客(23)
  • 收藏
  • 关注

原创 读取kafka的偏移量的工具类

读取kafka的偏移量的工具类// An highlighted blockimport java.util.Propertiesimport org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.TopicPartitionimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.sp

2021-10-11 18:50:08 220

原创 Maxwell 的使用

Maxwell 的使用1.Maxwell介绍Maxwell介绍Maxwell是由美国zendesk开源,用java编写的Mysql实时抓取软件,其抓取的原理也是基于binlog。2.Maxwell 和canal工具对比➢Maxwell没有canal那种server+client模式,只有一个server把数据发送到消息队列或redis。如果需要多个实例,通过指定不同配置文件启动多个进程。➢Maxwell有一个亮点功能,就是canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell

2021-10-11 18:27:00 851

原创 variflight 多线程爬虫获取所有航班信息,绕过封锁ip

先登录这个网址获取航班列表http://www.variflight.com/sitemap.html?AE71649A58c77=然后随便点击一个获取其Cookie,由于这个网址封ip 和验证码比较厉害记得验证完以后获得其Request Header 的全部内容放到 代码的request 里面的headers 里,如果有代理ip最好用代理ip获取数据,如果没有可以用自己的手机热点进行代理,在程序被中断要打开手机飞行模型,然后再让电脑链接热点,这个时候我们就又切换了一个ip,这个重复进行这个网址数

2021-07-06 17:26:35 5087 1

原创 大数据组件启动顺序

2021-07-06 17:00:31 234

原创 flink 读取kafka数据

下面展示一些 内联代码片。package io.github.interestinglab.waterdrop.flink.testimport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.

2021-06-08 16:43:21 240

原创 flink 将数据写入到kafka

下面展示一些 内联代码片。package io.github.flink.test/** * Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。虽有对外的 * 输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。 * stream.addSink(new MySink(xxxx)) */import org.apache.flink.api.common.serialization.SimpleStringSchemai

2021-06-08 16:34:35 670

原创 多线程爬取携程网酒店数据,星级数据和经纬度

下面展示一些 内联代码片。多线程爬取携程网酒店星级数据和经纬度import threadingfrom queue import Queueimport configimport urllibimport reimport pandas as pdnum_of_threads=10def write_fun(line): with open('酒店0502.csv','a',encoding='utf-8') as f: f.write(line)

2021-06-08 16:22:39 981

原创 flink source 写入mysql或者jdbc

// A code blockvar foo = 'bar';package io.github.flink.testimport java.util.Randomimport org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}import org.apache.flink.streaming.api.functions.source.SourceFunctionimport org.

2021-06-08 15:45:40 236

原创 flink 链接source 将数据写入Elasticsearch

maven 依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>1.9.2</version> </dependency&gt

2021-06-08 11:47:08 227

原创 flink KeyedProcessFunction 源码分析

KeyedProcessFunction 继承 AbstractRichFunction 函数Function为基础函数接口,为纯接口里面没有方法RichFunction 继承基础接口Function里面封装了到了运行环境函数,open和close 方法还有得到迭代运行时间内容public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction { private static fi.

2021-06-04 17:11:02 201 1

原创 flink BoundedOutOfOrdernessTimestampExtractor

BoundedOutOfOrdernessTimestampExtractor 实现了AssignerWithPeriodicWatermarks (注册周期生成Watermarks)源码如下第一次注册会生成当前最大时间戳currentMaxTimestamp =lastEmittedWatermark - 时间间隔(maxOutOfOrderness)public abstract class BoundedOutOfOrdernessTimestampExtractor<T>

2021-06-03 17:57:44 1262 1

原创 Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.DocWriteRequest.parent()Ljava/lang/

// 创建index request,准备发送数据val indexRequest = Requests.indexRequest().index(“wtte”).type(“data”).source(json)flink 写入elasticsearch 报错 Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.DocWriteRequest.parent()Ljava/lang/解决 添加maven 依赖 &l

2021-06-02 15:15:58 369

原创 ES常用命令

#查看集群健康Get /_cat/health/?vGet /_cat/health?v#查看节点情况GET /_cat/nodes/?v#查询各个索引状态GET /_cat/indices?v#创建一个索引PUT /movie_indexPUT /sensor#删除一个索引DELETE /movie_index#查看某一个索引的分片情况GET /_cat/shards/movie_index?vGET /_cat/shards/sensor?v#创建文.

2021-06-02 14:38:32 260

原创 Caused by: redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required

flink 链接redis报错Caused by: redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required原因是没有权限(NOAUTH Authentication required),解决办法就是加上val conf = new FlinkJedisPoolConfig.Builder().setHost(“127.0.0.1”).setPort(6379).setPassword(’’

2021-06-02 11:42:35 830

Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.typeinfo.TypeInformation 错误

Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeinfo/TypeInformationat io.github.interestinglab.waterdrop.flink.test.RedisSinkTest.main(RedisSinkTest.scala)Caused by: java.lang.ClassNotFoundException: org.apache

2021-06-02 11:22:56 700

原创 Flink入门参数设置(二)

// An highlighted blockpublic class ConfigKeyName { public final static String TIME_CHARACTERISTIC = "execution.time-characteristic"; public final static String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; public final static String PAR

2021-05-27 14:26:55 299 3

原创 Flink 入门和环境创建以及各种参数含义(一)

// An highlighted blockimport org.apache.flink.api.common.time.Time;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;import org.apache.flink.runtime.state.StateBackend;import o..

2021-05-27 11:43:35 165

原创 获取马蜂窝景点数据和景点评论数据

CREATE TABLE `poi` ( `poi_id` int NOT NULL, `name` varchar(128) DEFAULT NULL, `image` varchar(512) DEFAULT NULL, `link` varchar(512) DEFAULT NULL, `lat` float DEFAULT NULL, `lng` float DEFAULT NULL, `type` int DEFAULT NULL, `is_cnmain` int.

2021-05-25 13:24:40 1102 2

原创 获取搜狗指数数据

// An highlighted blockimport requests, re ,jsonfrom bs4 import BeautifulSoupimport randomimport jsonimport timeuser_agent_list = [ 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Saf

2021-05-25 11:39:16 171

原创 获取www.maigoo.com 景点爬虫,包括1A到5A级景点名称。景点地址,景点描述和景点经纬度

下面展示一些 内联代码片。// An highlighted blockimport requests, re ,jsonfrom bs4 import BeautifulSoupimport randomimport jsonimport timeuser_agent_list = [ 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.

2021-05-25 09:51:37 855

原创 Spark read.json 用法

// An highlighted block val otherPeopleDataset = ssc.createDataset( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = ssc.read.json(otherPeopleDataset) otherPeople.show() val schema = ssc.re

2021-05-25 09:34:10 1330

原创 spark解析json用到的算子

// A code block// An highlighted block import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ //定义字段类型 val jsonSchema = new StructType().add("battery_level", LongType).add("c02_level",LongType).add("cca3",StringType).add

2021-05-24 18:25:42 133

原创 spark 报错 No implicits found for parameter evidence$4: Encoder

spark 报错 No implicits found for parameter evidence$4: Encoder解决添加 import ssc.implicits._// An highlighted block//No implicits found for parameter evidence$4: Encoderimport scala.collection.JavaConversions._import org.apache.spark.sql.SparkSessionimpo

2021-05-24 15:06:41 530 5

空空如也

空空如也

TA创建的收藏夹 TA关注的收藏夹

TA关注的人

提示
确定要删除当前文章?
取消 删除