6 Jaming R

尚未进行身份认证

暂无相关简介

等级
TA的排名 15w+

flink 关于何时编写分配watermark

我的电脑是8线程,当我运行下面这段代码时,可以输出结果,但是当把并行度注释掉时,就不输出结果了,这是为什么呢?输入数据一样,watermark应该都是一样的啊输入数据为:1585721697000,xiao,81585721700000,xiao,101585721705000,xiao,41585721715000,xiao,9case class Line(id:Long,n...

2020-04-02 11:16:32

flink 在idea中编写的wordcount 重新启动后没有从检查点恢复之前的数据,为什么?

启用了检查点和状态后端,看hdfs目录确实有内容,然而重新启动后并没有从检查点恢复数据,这是为什么??object kafkatest { def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Pro...

2020-03-07 11:08:42

kafka消费者报错:Class org.apache.kafka.common.serialization.StringDeserializer could not be found.

org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringDeserializer for configuration key.deserializer: Class org.apache.kafka.common.serialization.S...

2019-04-03 09:27:44

spark2.0 读取本地json数据到Elasticsearch时,过滤非法json

package esimport net.minidev.json.parser.JSONParserimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.{Dataset, SparkSession}import scala.collection.Map...

2019-03-18 14:59:26

spark restful api 启动任务,运行完之后到historyserver

首先1、 spark-env.sh中添加export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://xiaoqi0:9000/sparkeventlog"2、spark-defaults.c...

2018-04-11 13:26:02

python 插入mysql数据库字符串中含有单引号或双引号报错

可以使用pymysql.escape_string(str_content)转换一下

2018-03-14 16:19:15

Hbase 查询语句记录

1.1.1 查询前几条数据scan'logs',{LIMIT=>1}1.1.2 3查询表的总条数count'logs'1.1.3 4删除表数据truncate't1'1.1.4 5查看过滤器 show_filters1.1.1 6行键过滤器scan'logs',{FILTER=>"RowFilter(=,'substring:2017-12-27')"}1.1.2 例子:  h...

2018-03-01 11:04:19

spring batch 并行执行任务

@RequestMapping("/startall") @ResponseBody String startall() throws ParseException { if (TimeFlag.getJobRun()) { System.out.println("Cron Start All Data Rsync Job. But Job Running."); return "Job...

2018-02-24 13:55:27

快学scala 记录

用scala的ArrayBuffer 调用java方法要导入隐式转换import scala.collection.JavaConversions.bufferAsJavaListimport scala.collection.mutable.ArrayBufferval command =ArrayBuffer("ls","-al","/home/cay")val pb = new Proces...

2018-01-31 16:19:33

sparkmllib交替最小二乘法

http://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html不需要用户和商品属性的信息,这类算法通常称为协同过滤算法例子:根据两个用户的年龄相同来判断他们可能有相似的偏好,这不叫协同过滤。相反,根据两个用户播放过许多相同歌曲来判断他们可能都喜欢某首歌,这才叫协同过滤。SparkMLlib 的ALS算法 要求用户和产品ID必须是...

2018-01-17 17:01:22

elasticsearch 内存溢出

ES在做聚合的时候,容易发生内存溢出https://www.elastic.co/guide/cn/elasticsearch/guide/current/_limiting_memory_usage.html

2017-12-22 15:19:39

elasticsearch 去重

ES字段折叠(collapse)可以去重,但是不hits里面的总数是不去重的,而聚合的cardinality方法去重,会有误差,默认30000之内的数据没有误差,参数precisionThresholdecisionThreshold最大可以调到40000.DSL语句GET /mtd_customers/mtd_customers_type/_search{  "query": {    "boo...

2017-12-16 12:28:03

sparksql 从oracle读取数据然后整合到elasticsearch

pom.xm<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="ht

2017-11-16 14:54:45

elasticsearch5安装

安装es:http://www.jianshu.com/p/a4d236ed17c3安装head:http://blog.csdn.net/u013066244/article/details/63279337最后将head/_site/app.js 中的localhost改成自己的ip就行了分片主分片副本分片索引(index)类型(type)id一旦文档被存储在Elasticsearch中,它就...

2017-07-20 14:21:52

flume到kafka,structuredStreaming从kafka消费

flume配置a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure r1a1.sources.r1.type = netcata1.sources.r1.bind = localhosta1.sources.r1.port = 44444# Describe k1#a1.sinks.k1.type = loggera1.s...

2017-07-14 17:34:33

spark2.2 structured Streaming

其实官方文档都说明了:http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.htmlpackage com.renjiaming.spark2T2import java.util.concurrent.TimeUnitimport org.apache.log4j.{Level, Logger}i...

2017-07-14 11:03:38

查看更多

勋章 我的勋章
  • 勤写标兵Lv1
    勤写标兵Lv1
    授予每个自然周发布1篇到3篇原创IT博文的用户。本勋章将于次周周三上午根据用户上周的博文发布情况由系统自动颁发。