Commit 6aca7e80 authored by chenfm's avatar chenfm

连接MQTT增加账号信息

parent f5d2f496
...@@ -6,6 +6,8 @@ import org.fusesource.mqtt.client.MQTT; ...@@ -6,6 +6,8 @@ import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.client.Topic;
import java.util.Properties;
/** /**
* @description: * @description:
* @project: emqdemo * @project: emqdemo
...@@ -19,11 +21,17 @@ import org.fusesource.mqtt.client.Topic; ...@@ -19,11 +21,17 @@ import org.fusesource.mqtt.client.Topic;
public class EmqClient { public class EmqClient {
private String broker; private String broker;
private String subTopic = "$esv/iot/#"; private String subTopic;
private String clientId = "subscribe_emqx_flink"; private String clientId;
private String username;
private String password;
public EmqClient(String broker) { public EmqClient(Properties properties) {
this.broker = broker; this.broker = properties.getProperty("mqtt.host");
this.subTopic = properties.getProperty("mqtt.subscribe.topic");
this.clientId = properties.getProperty("mqtt.client.id");
this.username = properties.getProperty("mqtt.username");
this.password = properties.getProperty("mqtt.password");
} }
public FutureConnection run () { public FutureConnection run () {
...@@ -36,6 +44,8 @@ public class EmqClient { ...@@ -36,6 +44,8 @@ public class EmqClient {
mqtt.setKeepAlive((short) 30); mqtt.setKeepAlive((short) 30);
mqtt.setSendBufferSize(64); mqtt.setSendBufferSize(64);
mqtt.setClientId(clientId); mqtt.setClientId(clientId);
mqtt.setUserName(username);
mqtt.setPassword(password);
Topic[] topics = { Topic[] topics = {
new Topic(subTopic, QoS.AT_MOST_ONCE) new Topic(subTopic, QoS.AT_MOST_ONCE)
...@@ -44,13 +54,11 @@ public class EmqClient { ...@@ -44,13 +54,11 @@ public class EmqClient {
FutureConnection connection = mqtt.futureConnection(); FutureConnection connection = mqtt.futureConnection();
connection.connect(); connection.connect();
connection.subscribe(topics); connection.subscribe(topics);
log.info("mqtt连接成功: {}", broker);
log.info("mqtt连接成功");
return connection; return connection;
} catch (Exception e) {
} catch (Exception me) { log.error(e.getMessage(), e);
me.printStackTrace();
} }
return null; return null;
} }
......
...@@ -77,7 +77,7 @@ public class StreamingJob { ...@@ -77,7 +77,7 @@ public class StreamingJob {
String brokerUrl = properties.getProperty("broker.url", "tcp://192.168.0.122:1883"); String brokerUrl = properties.getProperty("broker.url", "tcp://192.168.0.122:1883");
env.setParallelism(1); env.setParallelism(1);
EmqSource emqSource = new EmqSource(brokerUrl); EmqSource emqSource = new EmqSource(properties);
DataStream<String> inStream = env.addSource(emqSource); DataStream<String> inStream = env.addSource(emqSource);
inStream.print(); inStream.print();
...@@ -113,20 +113,20 @@ public class StreamingJob { ...@@ -113,20 +113,20 @@ public class StreamingJob {
// res.writeAsText("D://flink_result.txt"); // res.writeAsText("D://flink_result.txt");
// execute program // execute program
env.execute("Streaming WordCount"); env.execute("Streaming DataCenter IOT.");
} }
public static class EmqSource implements ParallelSourceFunction<String> { public static class EmqSource implements ParallelSourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private volatile boolean isRunning = true; private volatile boolean isRunning = true;
private String brokerUrl; private Properties properties;
public EmqSource(String brokerUrl) { public EmqSource(Properties properties) {
this.brokerUrl = brokerUrl; this.properties = properties;
} }
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
EmqClient emqClient = new EmqClient(brokerUrl); EmqClient emqClient = new EmqClient(properties);
FutureConnection connection = emqClient.run(); FutureConnection connection = emqClient.run();
int num = 0; int num = 0;
......
broker.url= tcp://192.168.31.248:1883 mqtt.host=tcp://192.168.31.248:1883
mqtt.username=esv_mqtt_server
mqtt.password=123456
mqtt.subscribe.topic=$esv/iot/#
mqtt.client.id=subscribe_emqx_flink
postgres.url=jdbc:postgresql://192.168.31.248:5432/iot postgres.url=jdbc:postgresql://192.168.31.248:5432/iot
postgres.user=iot postgres.user=iot
postgres.pwd=123456 postgres.pwd=123456
......
broker.url= tcp://192.168.0.122:1883 mqtt.host=tcp://192.168.0.122:1883
mqtt.username=esv_mqtt_server
mqtt.password=123456
mqtt.subscribe.topic=$esv/iot/#
mqtt.client.id=subscribe_emqx_flink
postgres.url=jdbc:postgresql://192.168.0.17:54321/iot postgres.url=jdbc:postgresql://192.168.0.17:54321/iot
postgres.user=iot postgres.user=iot
postgres.pwd=123456 postgres.pwd=123456
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment