Commit 64a1dfa7 authored by chenfm

Initial commit
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.esv.flink</groupId>
<artifactId>my-flink-project</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.7.1</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
<alibaba-fastjson.version>1.2.62</alibaba-fastjson.version>
<mqtt-client.version>1.16</mqtt-client.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.fusesource.mqtt-client/mqtt-client -->
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>${mqtt-client.version}</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
<!-- Example:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.5</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${alibaba-fastjson.version}</version>
<scope>compile</scope>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.26</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.MF</exclude>
<exclude>log4j.properties</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.esv.flink.StreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.1.1,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
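A hedged usage note (paths assume the standard Maven layout): running 'mvn clean package' produces the shaded jar target/my-flink-project-0.1.jar with com.esv.flink.StreamingJob as its Main-Class, which can then be submitted to a cluster with the standard Flink CLI, e.g. 'flink run target/my-flink-project-0.1.jar'.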
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.esv.flink;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
* Skeleton for a Flink Batch Job.
*
* <p>For a tutorial on how to write a Flink batch application, check the
* tutorials and examples on the <a href="https://flink.apache.org/docs/stable/">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution,
* change the main class in the POM.xml file to this class (simply search for 'mainClass')
* and run 'mvn clean package' on the command line.
*/
public class BatchJob {
public static void main(String[] args) throws Exception {
// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
/*
* Here, you can start creating your execution plan for Flink.
*
* Start with getting some data from the environment, like
* env.readTextFile(textPath);
*
* then, transform the resulting DataSet<String> using operations
* like
* .filter()
* .flatMap()
* .join()
* .coGroup()
*
* and many more.
* Have a look at the programming guide for the Java API:
*
* https://flink.apache.org/docs/latest/apis/batch/index.html
*
* and the examples
*
* https://flink.apache.org/docs/latest/apis/batch/examples.html
*
*/
// execute program
env.execute("Flink Batch Java API Skeleton");
}
}
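As a hedged illustration of the plan described in the comment above (not part of this commit): a minimal DataSet word count wired up with the same kind of transformations; the inline elements stand in for env.readTextFile(textPath).

package com.esv.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Sketch only: tokenize lines, group by word, sum the counts.
public class BatchWordCountSketch {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.fromElements("to be or not to be"); // stand-in for readTextFile
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                for (String word : line.toLowerCase().split("\\s+")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }).groupBy(0).sum(1).print(); // print() triggers execution in the DataSet API
    }
}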
package com.esv.flink;
import lombok.extern.slf4j.Slf4j;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
/**
* @description:
* @project: emqdemo
* @name: com.esv.flink.EmqClient
* @author: chenfm
* @email: chenfengman@esvtek.com
* @createTime: 2020/7/30 15:52
* @version: 1.0
*/
@Slf4j
public class EmqClient {
private String broker;
private String subTopic = "$esv/iot/#";
private String clientId = "subscribe_emqx_flink";
public EmqClient(String broker) {
this.broker = broker;
}
public FutureConnection run() {
try {
MQTT mqtt = new MQTT();
mqtt.setHost(broker);
mqtt.setCleanSession(true);
mqtt.setReconnectAttemptsMax(6);
mqtt.setReconnectDelay(2000);
mqtt.setKeepAlive((short) 30);
mqtt.setSendBufferSize(64);
mqtt.setClientId(clientId);
Topic[] topics = {
new Topic(subTopic, QoS.AT_MOST_ONCE)
};
FutureConnection connection = mqtt.futureConnection();
// Wait for the connect handshake before subscribing, so the success log is accurate
connection.connect().await();
connection.subscribe(topics);
log.info("MQTT connection established");
return connection;
} catch (Exception me) {
log.error("Failed to connect to MQTT broker " + broker, me);
}
return null;
}
}
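A hedged usage sketch (not part of this commit) showing how the client above is consumed; the broker address is hypothetical, and the blocking receive()/await() pattern mirrors what StreamingJob.EmqSource does below.

package com.esv.flink;

import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.Message;

import java.nio.charset.StandardCharsets;

// Sketch only: connect, pull a single message, disconnect.
public class EmqClientUsageSketch {
    public static void main(String[] args) throws Exception {
        FutureConnection connection = new EmqClient("tcp://localhost:1883").run(); // hypothetical broker
        if (connection != null) {
            Message message = connection.receive().await(); // blocks until a message arrives
            System.out.println(message.getTopic() + " -> "
                    + new String(message.getPayload(), StandardCharsets.UTF_8));
            connection.disconnect().await();
        }
    }
}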
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.esv.flink;
import com.esv.flink.bean.EmqData;
import com.esv.flink.sink.AlarmRedisSinkFunction;
import com.esv.flink.sink.EmqDataRichSinkFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.util.Collector;
import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.Message;
import java.nio.charset.StandardCharsets;
/**
* Skeleton for a Flink Streaming Job.
*
* <p>For a tutorial on how to write a Flink streaming application, check the
* tutorials and examples on the <a href="https://flink.apache.org/docs/stable/">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution, run
* 'mvn clean package' on the command line.
*
* <p>If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for 'mainClass').
*/
@Slf4j
public class StreamingJob {
public static void main(String[] args) throws Exception {
log.info("start flink.");
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool params = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(params);
String brokerUrl = params.get("brokerUrl", "tcp://192.168.31.248:1883");
String postgresqlInfo = params.get("postgresqlInfo", "jdbc:postgresql://192.168.31.248:5432/iot$$iot$$123456");
String redisInfo = params.get("redisInfo", "192.168.31.248");
String mysqlInfo = params.get("mysqlInfo", "192.168.31.248:3306$$data_center$$123456");
log.info("load param brokerUrl: {}", brokerUrl);
log.info("load param postgresqlInfo: {}", postgresqlInfo);
log.info("load param redisInfo: {}", redisInfo);
log.info("load param mysqlInfo: {}", mysqlInfo);
env.setParallelism(1);
EmqSource emqSource = new EmqSource(brokerUrl);
DataStream<String> inStream = env.addSource(emqSource);
inStream.print();
SingleOutputStreamOperator<EmqData> emqDataStream = inStream.flatMap(new FlatMapFunction<String, EmqData>() {
@Override
public void flatMap(String s, Collector<EmqData> collector) throws Exception {
String[] tokens = s.toLowerCase().split("@@");
if (tokens.length > 1) {
String topic = tokens[0];
String content = tokens[1];
collector.collect(new EmqData(topic, content, 1));
}
}
});
RichSinkFunction<EmqData> emqDataRichSinkFunction = new EmqDataRichSinkFunction(postgresqlInfo);
emqDataStream.addSink(emqDataRichSinkFunction);
RichSinkFunction<EmqData> alarmRedisSinkFunction = new AlarmRedisSinkFunction(redisInfo, mysqlInfo);
emqDataStream.addSink(alarmRedisSinkFunction);
SingleOutputStreamOperator<Tuple2<String, Integer>> res = emqDataStream.map(new MapFunction<EmqData, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(EmqData emqData) throws Exception {
return new Tuple2<>(emqData.getTopic(), emqData.getCount());
}
}).keyBy(0).sum(1);
if (params.has("output")) {
res.writeAsText(params.get("output"));
} else {
log.info("Printing result to stdout. Use --output to specify output path.");
res.print();
}
// res.writeAsText("D://flink_result.txt");
// execute program
env.execute("Streaming WordCount");
}
public static class EmqSource implements ParallelSourceFunction<String> {
private static final long serialVersionUID = 1L;
private volatile boolean isRunning = true;
private String brokerUrl;
public EmqSource(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
EmqClient emqClient = new EmqClient(brokerUrl);
FutureConnection connection = emqClient.run();
if (connection == null) {
log.error("Could not connect to MQTT broker {}, source exiting.", brokerUrl);
return;
}
int num = 0;
while (isRunning) {
Future<Message> future = connection.receive();
Message message = future.await();
num++;
String topic = message.getTopic();
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
ctx.collect(topic + "@@" + payload);
log.info("Received message count: {}", num);
}
connection.disconnect();
}
@Override
public void cancel() {
isRunning = false;
}
}
}
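Two ad-hoc string conventions hold this job together: the source emits records as '<topic>@@<payload>', and connection settings arrive as '$$'-separated fields. A small sketch of both (sample values hypothetical):

package com.esv.flink;

// Sketch only: the two delimiter formats used by StreamingJob and the sinks.
public class WireFormatSketch {
    public static void main(String[] args) {
        // Source record: "<topic>@@<payload>", split again by the flatMap above
        String record = "$esv/iot/12/34@@{\"temp\":21.5}";
        String[] parts = record.split("@@");
        System.out.println("topic=" + parts[0] + ", payload=" + parts[1]);

        // Connection info: "<url>$$<user>$$<password>"; '$' is escaped
        // because String.split takes a regex
        String postgresqlInfo = "jdbc:postgresql://localhost:5432/iot$$iot$$secret";
        String[] info = postgresqlInfo.split("\\$\\$");
        System.out.println("url=" + info[0] + ", user=" + info[1]);
    }
}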
package com.esv.flink.bean;
import lombok.Data;
/**
* @description:
* @project: my-flink-project
* @name: com.esv.flink.bean.DataModelAlarmRule
* @author: chenfm
* @email: chenfengman@esvtek.com
* @createTime: 2020/8/6 19:27
* @version: 1.0
*/
@Data
public class DataModelAlarmRule {
/**
* Rule ID
**/
private Long id;
/**
* Data model ID
**/
private Long modelId;
/**
* Alarm rule expression (dictionary table)
**/
private Integer ruleExpression;
/**
* Alarm level (dictionary table)
**/
private Integer alarmLevel;
/**
* Threshold
**/
private String threshold;
/**
* Model property ID
**/
private Long propertyId;
/**
* Property code
**/
private String propertyCode;
/**
* Property default value
**/
private String propertyDefaultValue;
/**
* Property name
**/
private String propertyName;
/**
* Property type
**/
private Integer propertyType;
/**
* Property unit
**/
private String propertyUnit;
}
package com.esv.flink.bean;
import lombok.Data;
import java.sql.Timestamp;
/**
* @description:
* @project: my-flink-project
* @name: com.esv.flink.bean.DeviceDataAlarm
* @author: chenfm
* @email: chenfengman@esvtek.com
* @createTime: 2020/8/7 10:01
* @version: 1.0
*/
@Data
public class DeviceDataAlarm {
public DeviceDataAlarm() {
}
public DeviceDataAlarm(long deviceId, long alarmRuleId, Timestamp reportTime) {
this.deviceId = deviceId;
this.alarmRuleId = alarmRuleId;
this.reportTime = reportTime;
this.alarmTime = new Timestamp(System.currentTimeMillis());
}
/**
* description Device ID
* author chenfm
* createTime 2020/8/7 10:02
**/
private long deviceId;
/**
* description Alarm rule ID
* author chenfm
* createTime 2020/8/7 10:02
**/
private long alarmRuleId;
/**
* description Reported value
* author chenfm
* createTime 2020/8/7 15:22
**/
private String reportValue;
/**
* description Data report time
* author chenfm
* createTime 2020/8/7 10:02
**/
private Timestamp reportTime;
/**
* description Alarm time
* author chenfm
* createTime 2020/8/7 10:02
**/
private Timestamp alarmTime;
}
package com.esv.flink.bean;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
/**
* @description:
* @project: my-flink-project
* @name: com.esv.flink.bean.EmqData
* @author: chenfm
* @email: chenfengman@esvtek.com
* @createTime: 2020/8/3 15:28
* @version: 1.0
*/
@Data
public class EmqData {
private long modelId;
private long deviceId;
private String topic;
private long reportTime;
private JSONObject jsonObject;
private int count;
public EmqData() {
}
public EmqData(String topic, String content, int count) {
this.topic = topic;
this.count = count;
this.modelId = Long.parseLong(topic.split("/")[2]);
this.deviceId = Long.parseLong(topic.split("/")[3]);
this.jsonObject = JSON.parseObject(content);
long timestamp;
if (jsonObject.containsKey("time")) {
timestamp = Long.parseLong(jsonObject.get("time").toString());
jsonObject.remove("time");
} else {
timestamp = System.currentTimeMillis();
}
this.reportTime = timestamp;
}
@Override
public String toString() {
return "EmqData{" +
"topic='" + topic + '\'' +
", content='" + jsonObject.toString() + '\'' +
", count=" + count +
'}';
}
}
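The constructor above assumes topics shaped like $esv/iot/<modelId>/<deviceId> (matching the $esv/iot/# subscription in EmqClient), so split("/") indices 2 and 3 carry the IDs. A hedged sketch with hypothetical values:

package com.esv.flink.bean;

// Sketch only: how EmqData decodes a topic and payload.
public class EmqDataSketch {
    public static void main(String[] args) {
        EmqData data = new EmqData("$esv/iot/12/34", "{\"temp\":21.5,\"time\":1596700000000}", 1);
        System.out.println(data.getModelId());    // 12, from topic segment 2
        System.out.println(data.getDeviceId());   // 34, from topic segment 3
        System.out.println(data.getReportTime()); // 1596700000000, taken from (and removed with) the "time" key
    }
}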
package com.esv.flink.sink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.esv.flink.bean.DataModelAlarmRule;
import com.esv.flink.bean.DeviceDataAlarm;
import com.esv.flink.bean.EmqData;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
import java.math.BigDecimal;
import java.sql.*;
import java.util.List;
/**
* @description:
* @project: my-flink-project
* @name: com.esv.flink.sink.AlarmRedisSinkFunction
* @author: chenfm
* @email: chenfengman@esvtek.com
* @createTime: 2020/8/6 17:14
* @version: 1.0
*/
@Slf4j
public class AlarmRedisSinkFunction extends RichSinkFunction<EmqData> {
private String redisInfo;
private String mysqlInfo;
private transient Jedis jedis;
private transient Connection connection;
static {
// Register the MySQL JDBC driver with DriverManager
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
log.error(e.getMessage(), e);
}
}
public AlarmRedisSinkFunction(String redisInfo, String mysqlInfo) {
this.redisInfo = redisInfo;
this.mysqlInfo = mysqlInfo;
}
// Open the MySQL connection
private void getConnection() {
try {
String[] infoArray = this.mysqlInfo.split("\\$\\$");
// JDBC URL
String url = "jdbc:mysql://" + infoArray[0] + "/data_center?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false";
String user = infoArray[1];
String password = infoArray[2];
// Establish the connection
this.connection = DriverManager.getConnection(url, user, password);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
jedis = new Jedis(redisInfo);
this.getConnection();
}
@Override
public void close() throws Exception {
super.close();
jedis.close();
connection.close();
}
@Override
public void invoke(EmqData emqData, Context context) throws Exception {
long modelId = emqData.getModelId();
String value = jedis.get("datacenter-iot-service::data_model::alarm_rule::" + modelId);
log.info("redis data model alarm rule value: {}", value);
if (StringUtils.isBlank(value)) {
return;
}
JSONObject jsonObject = emqData.getJsonObject();
if (value.startsWith("\"") && value.endsWith("\"")) {
value = value.substring(1, value.length() - 1);
value = StringEscapeUtils.unescapeJava(value);
}
List<DataModelAlarmRule> ruleList = JSON.parseArray(value, DataModelAlarmRule.class);
for (DataModelAlarmRule dataModelAlarmRule : ruleList) {
Long ruleId = this.checkRule(dataModelAlarmRule, jsonObject);
if (ruleId != null) {
// Alarm triggered; persist it to MySQL
DeviceDataAlarm deviceDataAlarm =
new DeviceDataAlarm(emqData.getDeviceId(), ruleId, new Timestamp(emqData.getReportTime()));
deviceDataAlarm.setReportValue(jsonObject.getString(dataModelAlarmRule.getPropertyCode()));
this.saveAlarmToMysql(deviceDataAlarm);
}
}
}
/**
* description Save the alarm record to MySQL
* param [deviceDataAlarm]
* return void
* author chenfm
* createTime 2020/8/7 11:23
**/
private void saveAlarmToMysql(DeviceDataAlarm deviceDataAlarm) {
String sql = "insert into device_data_alarm(device_id, alarm_rule_id, report_value, report_time, alarm_time)" +
" values(?, ?, ?, ?, ?)";
log.info("保存告警信息sql: {}", sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.setLong(1, deviceDataAlarm.getDeviceId());
preparedStatement.setLong(2, deviceDataAlarm.getAlarmRuleId());
preparedStatement.setString(3, deviceDataAlarm.getReportValue());
preparedStatement.setTimestamp(4, deviceDataAlarm.getReportTime());
preparedStatement.setTimestamp(5, deviceDataAlarm.getAlarmTime());
preparedStatement.executeUpdate();
} catch (SQLException e) {
log.error(e.getMessage(), e);
}
}
/**
* description Check whether the alarm condition is met
* param [dataModelAlarmRule, jsonObject]
* return java.lang.Long
* author chenfm
* createTime 2020/8/7 11:22
**/
private Long checkRule(DataModelAlarmRule dataModelAlarmRule, JSONObject jsonObject) {
int propertyType = dataModelAlarmRule.getPropertyType();
if (propertyType > 4) {
// Not a numeric type, so magnitude comparison is impossible
return null;
}
String value = String.valueOf(jsonObject.get(dataModelAlarmRule.getPropertyCode()));
String threshold = dataModelAlarmRule.getThreshold();
switch (dataModelAlarmRule.getRuleExpression()) {
case 1: // greater than (>)
if ((propertyType != 1) && (stringValueCompare(value, threshold) > 0)) {
return dataModelAlarmRule.getId();
}
break;
case 2: // less than (<)
if ((propertyType != 1) && (stringValueCompare(value, threshold) < 0)) {
return dataModelAlarmRule.getId();
}
break;
case 3: // equal (=)
if ((propertyType == 1 && StringUtils.equals(threshold, value))
|| (propertyType != 1 && stringValueCompare(value, threshold) == 0)) {
return dataModelAlarmRule.getId();
}
break;
case 4: // not equal (!=)
if ((propertyType == 1 && !StringUtils.equals(threshold, value))
|| (propertyType != 1 && stringValueCompare(value, threshold) != 0)) {
return dataModelAlarmRule.getId();
}
break;
default:
return null;
}
return null;
}
private int stringValueCompare(String value, String threshold) {
return new BigDecimal(value).compareTo(new BigDecimal(threshold));
}
}
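To make the rule semantics concrete: ruleExpression 1-4 map to >, <, = and !=; propertyType 1 is compared as a plain string, everything else numerically through BigDecimal. A hedged sketch of the numeric branch with hypothetical values:

package com.esv.flink.sink;

import java.math.BigDecimal;

// Sketch only: the threshold comparison at the heart of checkRule.
public class RuleCheckSketch {
    public static void main(String[] args) {
        String reported = "21.50";
        String threshold = "21.5";
        // BigDecimal.compareTo ignores trailing zeros, so "21.50" equals "21.5"
        int cmp = new BigDecimal(reported).compareTo(new BigDecimal(threshold));
        System.out.println("rule 1 (>)  fires: " + (cmp > 0));  // false
        System.out.println("rule 2 (<)  fires: " + (cmp < 0));  // false
        System.out.println("rule 3 (=)  fires: " + (cmp == 0)); // true
        System.out.println("rule 4 (!=) fires: " + (cmp != 0)); // false
    }
}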
package com.esv.flink.sink;
import com.alibaba.fastjson.JSONObject;
import com.esv.flink.bean.EmqData;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* @description:
* @project: my-flink-project
* @name: com.esv.flink.MyRichSinkFunction
* @author: chenfm
* @email: chenfengman@esvtek.com
* @createTime: 2020/8/3 15:26
* @version: 1.0
*/
@Slf4j
public class EmqDataRichSinkFunction extends RichSinkFunction<EmqData> {
private String postgresqlInfo;
private Connection connection = null;
static {
// Register the PostgreSQL JDBC driver with DriverManager
try {
Class.forName("org.postgresql.Driver");
} catch (ClassNotFoundException e) {
log.error(e.getMessage(), e);
}
}
public EmqDataRichSinkFunction(String postgresqlInfo) {
this.postgresqlInfo = postgresqlInfo;
}
// Open the PostgreSQL connection
private void getConnection() {
try {
String[] infoArray = postgresqlInfo.split("\\$\\$");
// JDBC URL
String url = infoArray[0];
String user = infoArray[1];
String password = infoArray[2];
// Establish the connection
this.connection = DriverManager.getConnection(url, user, password);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
// Builds the INSERT for the per-model table: time and device_id are bound as
// placeholders, while the payload values are inlined as quoted literals.
private String makeInsertSql(String topic, JSONObject jsonObject) {
String tableName = getTableName(topic);
// Lists (not sets) so duplicate payload values cannot collapse and
// misalign the column list against the value list.
List<String> keys = new ArrayList<>(jsonObject.keySet());
List<String> values = new ArrayList<>(keys.size());
keys.forEach(k -> values.add(String.valueOf(jsonObject.get(k))));
return "insert into "
+ tableName
+ "(time, device_id"
+ getSqlParams(keys, false)
+ ") values (?,?"
+ getSqlParams(values, true)
+ ")";
}
private String getTableName(String topic) {
String modelId = topic.split("/")[2];
return "iot_data_model_" + modelId;
}
private String getSqlParams(Collection<String> collection, boolean isString) {
StringBuilder builder = new StringBuilder();
for (String item : collection) {
builder.append(",");
if (isString) {
// Quote values; column names are appended bare
builder.append("'");
}
builder.append(item);
if (isString) {
builder.append("'");
}
}
return builder.toString();
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
log.info("EmqDataRichSinkFunction open.");
this.getConnection();
}
@Override
public void close() throws Exception {
super.close();
log.info("EmqDataRichSinkFunction close.");
if (this.connection != null) {
this.connection.close();
}
}
@Override
public void invoke(EmqData emqData, Context context) throws Exception {
log.info("EmqDataRichSinkFunction invoke. topic={}", emqData.getTopic());
String deviceId = emqData.getTopic().split("/")[3];
String sql = this.makeInsertSql(emqData.getTopic(), emqData.getJsonObject());
log.info("insert sql: {}", sql);
// try-with-resources closes the statement even when executeUpdate fails
try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
pstmt.setTimestamp(1, new Timestamp(emqData.getReportTime()));
pstmt.setInt(2, Integer.parseInt(deviceId));
pstmt.executeUpdate();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
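For reference, a hedged sketch (hypothetical topic and payload) of the statement shape makeInsertSql produces; only time and device_id are bound as parameters in invoke(), the payload values travel as quoted literals:

package com.esv.flink.sink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Sketch only: the SQL shape EmqDataRichSinkFunction generates.
public class InsertSqlSketch {
    public static void main(String[] args) {
        JSONObject payload = JSON.parseObject("{\"temp\":21.5,\"humi\":60}");
        // For topic $esv/iot/12/34 the sink would emit, modulo key order:
        //   insert into iot_data_model_12(time, device_id,temp,humi) values (?,?,'21.5','60')
        System.out.println("columns seen: " + payload.keySet());
    }
}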
Manifest-Version: 1.0
Main-Class: com.esv.flink.StreamingJob
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
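With this pattern a console line looks roughly like (hypothetical example): 15:52:03,117 INFO  com.esv.flink.StreamingJob - start flink.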