Commit f5d2f496 authored by chenfm's avatar chenfm

从配置文件读取环境信息

parent dfe49283
......@@ -22,6 +22,7 @@ import com.esv.flink.bean.EmqData;
import com.esv.flink.sink.AlarmRedisSinkFunction;
import com.esv.flink.sink.EmqDataRichSinkFunction;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -36,7 +37,9 @@ import org.fusesource.mqtt.client.Future;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.Message;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
/**
* Skeleton for a Flink Streaming Job.
......@@ -57,17 +60,21 @@ public class StreamingJob {
log.info("start flink.");
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool params = ParameterTool.fromArgs(new String[0]);
env.getConfig().setGlobalJobParameters(params);
String brokerUrl = params.get("brokerUrl", "tcp://192.168.0.122:1883");
String postgresqlInfo = params.get("postgresqlInfo", "jdbc:postgresql://192.168.0.17:54321/iot$$iot$$123456");
String redisInfo = params.get("redisInfo", "192.168.0.17");
String mysqlInfo = params.get("mysqlInfo", "192.168.0.17:3306/iot$$iot$$123456");
log.info("load param brokerUrl: {}", brokerUrl);
log.info("load param postgresqlInfo: {}", postgresqlInfo);
log.info("load param redisInfo: {}", redisInfo);
log.info("load param mysqlInfo: {}", mysqlInfo);
ParameterTool parameterTool = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(parameterTool);
String envParam = parameterTool.get("env");
log.info("run config env: {}", envParam);
if (StringUtils.isBlank(envParam)) {
throw new Exception("Parameter [env] is null, please input [--env] parameter.");
}
String fileName = "application-" + envParam + ".properties";
InputStream configStream = StreamingJob.class.getClassLoader().getResourceAsStream(fileName);
Properties properties = new Properties();
properties.load(configStream);
String brokerUrl = properties.getProperty("broker.url", "tcp://192.168.0.122:1883");
env.setParallelism(1);
EmqSource emqSource = new EmqSource(brokerUrl);
......@@ -85,9 +92,9 @@ public class StreamingJob {
}
}
});
RichSinkFunction<EmqData> emqDataRichSinkFunction = new EmqDataRichSinkFunction(postgresqlInfo);
RichSinkFunction<EmqData> emqDataRichSinkFunction = new EmqDataRichSinkFunction(properties);
emqDataStream.addSink(emqDataRichSinkFunction);
RichSinkFunction<EmqData> alarmRedisSinkFunction = new AlarmRedisSinkFunction(redisInfo, mysqlInfo);
RichSinkFunction<EmqData> alarmRedisSinkFunction = new AlarmRedisSinkFunction(properties);
emqDataStream.addSink(alarmRedisSinkFunction);
SingleOutputStreamOperator<Tuple2<String, Integer>> res = emqDataStream.map(new MapFunction<EmqData, Tuple2<String, Integer>>() {
......@@ -97,8 +104,8 @@ public class StreamingJob {
}
}).keyBy(0).sum(1);
if (params.has("output")) {
res.writeAsText(params.get("output"));
if (parameterTool.has("output")) {
res.writeAsText(parameterTool.get("output"));
} else {
log.info("Printing result to stdout. Use --output to specify output path.");
res.print();
......@@ -121,16 +128,17 @@ public class StreamingJob {
public void run(SourceContext<String> ctx) throws Exception {
EmqClient emqClient = new EmqClient(brokerUrl);
FutureConnection connection = emqClient.run();
int num = 0;
while (isRunning) {
Future<Message> future = connection.receive();
Message message = future.await();
num ++;
String topic = message.getTopic();
String context = new String(message.getPayload(), StandardCharsets.UTF_8);
ctx.collect(topic + "@@" + context);
log.info("接收数据条数num:" + num);
System.out.println("接收数据条数num:" + num);
if ((num ++) % 50 == 0) {
log.info("接收数据条数num: {}, topuc: {}, context: {}", num, topic, context);
}
}
connection.disconnect();
}
......
......@@ -15,6 +15,7 @@ import redis.clients.jedis.Jedis;
import java.math.BigDecimal;
import java.sql.*;
import java.util.List;
import java.util.Properties;
/**
* @description:
......@@ -28,8 +29,10 @@ import java.util.List;
@Slf4j
public class AlarmRedisSinkFunction extends RichSinkFunction<EmqData> {
private String redisInfo;
private String mysqlInfo;
private String redisUrl;
private String mysqlUrl;
private String mysqlUser;
private String mysqlPwd;
private transient Jedis jedis;
private transient Connection connection;
......@@ -43,33 +46,35 @@ public class AlarmRedisSinkFunction extends RichSinkFunction<EmqData> {
}
}
public AlarmRedisSinkFunction(String redisInfo, String mysqlInfo) {
this.redisInfo = redisInfo;
this.mysqlInfo = mysqlInfo;
public AlarmRedisSinkFunction(Properties properties) {
this.redisUrl = properties.getProperty("redis.url", "192.168.31.248");
this.mysqlUrl = properties.getProperty("mysql.url", "jdbc:mysql://192.168.31.248:3306/data_center?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false");
this.mysqlUser = properties.getProperty("mysql.user", "data-center");
this.mysqlPwd = properties.getProperty("mysql.pwd", "123456");
}
//获取数据库连接信息
private void getConnection() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
try {
jedis = new Jedis(redisUrl);
log.info("redis connect success: {}", redisUrl);
} catch (Exception e) {
log.error("redis connect failed.");
log.error(e.getMessage(), e);
throw(e);
}
try {
String[] infoArray = this.mysqlInfo.split("\\$\\$");
//数据库链接
String url = "jdbc:mysql://" + infoArray[0] + "?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false";
String user = infoArray[1];
String password = infoArray[2];
//数据库连接信息
this.connection = DriverManager.getConnection(url, user, password);
this.connection = DriverManager.getConnection(mysqlUrl, mysqlUser, mysqlPwd);
log.info("mysql connect success: {}", mysqlUrl);
} catch (Exception e) {
log.error("mysql connect failed.");
log.error(e.getMessage(), e);
throw(e);
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
jedis = new Jedis(redisInfo);
this.getConnection();
}
@Override
public void close() throws Exception {
super.close();
......@@ -81,7 +86,7 @@ public class AlarmRedisSinkFunction extends RichSinkFunction<EmqData> {
public void invoke(EmqData emqData, Context context) throws Exception {
long modelId = emqData.getModelId();
String value = jedis.get("datacenter-iot-service::data_model::alarm_rule::" + modelId);
log.info("redis data model alarm rule value: {}", value);
log.debug("redis data model alarm rule value: {}", value);
if (StringUtils.isBlank(value)) {
return;
}
......
......@@ -13,6 +13,7 @@ import java.sql.Timestamp;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Properties;
/**
* @description:
......@@ -26,8 +27,11 @@ import java.util.LinkedHashSet;
@Slf4j
public class EmqDataRichSinkFunction extends RichSinkFunction<EmqData> {
private String postgresqlInfo;
private Connection connection = null;
private String postgresUrl;
private String postgresUser;
private String postgresPwd;
private transient Connection connection = null;
static {
//将postgresql驱动注册到DriverManager中去
......@@ -38,23 +42,10 @@ public class EmqDataRichSinkFunction extends RichSinkFunction<EmqData> {
}
}
public EmqDataRichSinkFunction(String postgresqlInfo) {
this.postgresqlInfo = postgresqlInfo;
}
//获取数据库连接信息
private void getConnection() {
try {
String[] infoArray = postgresqlInfo.split("\\$\\$");
//数据库链接
String url = infoArray[0];
String user = infoArray[1];
String password = infoArray[2];
//数据库连接信息
this.connection = DriverManager.getConnection(url, user, password);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
public EmqDataRichSinkFunction(Properties properties) {
this.postgresUrl = properties.getProperty("postgres.url", "jdbc:postgresql://192.168.31.248:54321/iot");
this.postgresUser = properties.getProperty("postgres.user", "iot");
this.postgresPwd = properties.getProperty("postgres.pwd", "123456");
}
private String makeInsertSql(String topic, JSONObject jsonObject) {
......@@ -96,7 +87,15 @@ public class EmqDataRichSinkFunction extends RichSinkFunction<EmqData> {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
log.info("EmqDataRichSinkFunction open.");
this.getConnection();
try {
//数据库链接
this.connection = DriverManager.getConnection(postgresUrl, postgresUser, postgresPwd);
log.info("postgres connect success: {}", postgresUrl);
} catch (Exception e) {
log.error("postgres connect failed.");
log.error(e.getMessage(), e);
throw(e);
}
}
@Override
......@@ -111,11 +110,11 @@ public class EmqDataRichSinkFunction extends RichSinkFunction<EmqData> {
@Override
public void invoke(EmqData emqData, Context context) throws Exception {
log.info("EmqDataRichSinkFunction invoke. topic=" + emqData.getTopic());
log.debug("EmqDataRichSinkFunction invoke. topic=" + emqData.getTopic());
String deviceId = emqData.getTopic().split("/")[3];
String sql = this.makeInsertSql(emqData.getTopic(), emqData.getJsonObject());
log.info("insert sql: {}", sql);
log.debug("insert sql: {}", sql);
PreparedStatement pstmt = null;
try {
pstmt = connection.prepareStatement(sql);
......
broker.url= tcp://192.168.31.248:1883
postgres.url=jdbc:postgresql://192.168.31.248:5432/iot
postgres.user=iot
postgres.pwd=123456
redis.url=192.168.31.248
mysql.url=jdbc:mysql://192.168.31.248:3306/data_center?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
mysql.user=data_center
mysql.pwd=123456
\ No newline at end of file
broker.url= tcp://192.168.0.122:1883
postgres.url=jdbc:postgresql://192.168.0.17:54321/iot
postgres.user=iot
postgres.pwd=123456
redis.url=192.168.0.17
mysql.url=jdbc:mysql://192.168.0.17:3306/iot?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
mysql.user=iot
mysql.pwd=123456
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment