Commit fdc55079 authored by chenfm's avatar chenfm

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	src/main/java/com/esv/datacenter/iot/module/devicemodel/service/DeviceInstanceService.java
#	src/main/java/com/esv/datacenter/iot/module/devicemodel/service/impl/DeviceInstanceServiceImpl.java
parents b741784f 431f4572
package com.esv.datacenter.iot.common.component;
import lombok.Data;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
/**
* @description:
* @author: huangchaobin@esvtek.com
* @createTime: 2020/08/13 15:39
* @version:1.0
*/
@Data
public class MqttAcl {
/**
*
*/
private Integer id;
/**
* 0: deny, 1: allow
*/
private Integer allow;
/**
* IpAddress
*/
private String ipAddr;
/**
* Username
*/
private String username;
/**
* ClientId
*/
private String clientId;
/**
* 1: subscribe, 2: publish, 3: pubsub
*/
private Integer access;
/**
* Topic Filter
*/
private String topic;
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.JSON_STYLE);
}
}
package com.esv.datacenter.iot.common.component;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
/**
* @description:
* @author: huangchaobin@esvtek.com
* @createTime: 2020/08/13 15:36
* @version:1.0
*/
@Component
@RefreshScope
@Slf4j
public class MqttClientAuthComponent {
@Value("${emq.data-source.jdbc-url}")
private String jdbcUrl;
@Value("${emq.data-source.driver-class-name}")
private String driverClassName;
@Value("${emq.data-source.validation-query}")
private String validationQuery;
@Value("${emq.data-source.username}")
private String username;
@Value("${emq.data-source.password}")
private String password;
@Value("${emq.data-source.connection-timeout}")
private Long connectionTimeout;
@Value("${emq.data-source.minimum-idle}")
private Integer minimumIdle;
@Value("${emq.data-source.maximum-pool-size}")
private Integer maximumPoolSize;
@Value("${emq.data-source.max-lifetime}")
private Long maxLifetime;
@Value("${emq.client-auth.table-name}")
private String tableName;
@Value("${emq.client-auth.username}")
private String clientAuthUserName;
@Autowired
private DynamicDataSource dynamicDataSource;
/**
* @description 保存client Topic发布/订阅权限
* @param mqttAclList:
* @return void
* @author huangChaobin@esvtek.com
* @createTime 2020/08/13 16:06
**/
public void saveClientAcl(List<MqttAcl> mqttAclList) {
HikariDataSource dataSource = this.getHikariDataSource4Transaction();
String clientId = mqttAclList.get(0).getClientId();
try {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
// 删除clientId原有记录
StringBuffer sb = new StringBuffer();
sb.append("delete from ").append(tableName).append(" where clientid='").append(clientId).append("';");
String deleteSql = sb.toString();
log.info("删除clientId原有记录SQL:{}", deleteSql);
jdbcTemplate.execute(deleteSql);
// 新增clientId记录
for (MqttAcl mqttAcl : mqttAclList) {
sb = new StringBuffer();
sb.append("INSERT INTO ").append(tableName)
.append("(allow, username, clientid, access, topic) VALUES(")
.append(mqttAcl.getAllow())
.append(",'").append(clientAuthUserName).append("'")
.append(",'").append(mqttAcl.getClientId()).append("'")
.append(",").append(mqttAcl.getAccess())
.append(",'").append(mqttAcl.getTopic()).append("'")
.append(");");
String insertSql = sb.toString();
log.info("新增clientId记录SQL:{}", insertSql);
jdbcTemplate.execute(insertSql);
}
} catch (Exception e) {
log.error("保存[clientId={}]Topic发布/订阅权限失败", clientId);
log.error(e.getMessage(), e);
throw e;
} finally {
// 关闭数据源
if (Objects.nonNull(dataSource) && !dataSource.isClosed()) {
dataSource.close();
}
}
}
/**
* @description 删除clientId记录
* @param clientId:
* @return void
* @author huangChaobin@esvtek.com
* @createTime 2020/08/13 16:54
**/
public void deleteClientAcl(String clientId) {
HikariDataSource dataSource = this.getHikariDataSource4Transaction();
try {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
// 删除clientId原有记录
StringBuffer sb = new StringBuffer();
sb.append("delete from ").append(tableName).append(" where clientid='").append(clientId).append("';");
String deleteSql = sb.toString();
log.info("删除clientId记录SQL:{}", deleteSql);
jdbcTemplate.execute(deleteSql);
} catch (Exception e) {
log.error("删除[clientId={}]Topic发布/订阅权限失败", clientId);
log.error(e.getMessage(), e);
throw e;
} finally {
// 关闭数据源
if (Objects.nonNull(dataSource) && !dataSource.isClosed()) {
dataSource.close();
}
}
}
private HikariDataSource getHikariDataSource4Transaction() {
HikariDataSource dataSource = this.dynamicDataSource.getDynamicDataSource4Transaction(initDataSourceConfig());
return dataSource;
}
private DataSourceConfig initDataSourceConfig() {
DataSourceConfig dataSourceConfig = new DataSourceConfig(jdbcUrl, driverClassName, username, password,
validationQuery, connectionTimeout, minimumIdle, maximumPoolSize, maxLifetime);
return dataSourceConfig;
}
}
......@@ -81,8 +81,9 @@ public class TimescaleComponent {
String table = tablePrefix + modelId;
try {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
// 校验表名是否已存在
if (checkTableExits(dataSource, table)) {
if (checkTableExits(jdbcTemplate, table)) {
log.warn("创建表失败,表[{}]已存在", table);
return false;
}
......@@ -96,7 +97,6 @@ public class TimescaleComponent {
log.info("超表SQL:{}", hyperSql);
// 使用jdbcTemplate来执行sql
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
jdbcTemplate.execute(tableSql);
jdbcTemplate.execute(indexSql);
jdbcTemplate.execute(hyperSql);
......@@ -128,7 +128,8 @@ public class TimescaleComponent {
try {
// 校验表名是否已存在
if (!checkTableExits(dataSource, table)) {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
if (!checkTableExits(jdbcTemplate, table)) {
log.warn("删除表失败,表[{}]不存在", table);
return false;
}
......@@ -138,7 +139,6 @@ public class TimescaleComponent {
log.info("删除表SQL:{}", deleteTableSql);
// 使用jdbcTemplate来执行sql
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
jdbcTemplate.execute(deleteTableSql);
log.info("删除表[{}]成功", table);
} catch (Exception e) {
......@@ -228,7 +228,7 @@ public class TimescaleComponent {
return sb.toString();
}
private Boolean checkTableExits(HikariDataSource dataSource, String table) {
private Boolean checkTableExits(JdbcTemplate jdbcTemplate, String table) {
StringBuffer sb = new StringBuffer();
sb.append("select t1.tablename")
.append(" from pg_tables t1, pg_class t2")
......@@ -239,7 +239,6 @@ public class TimescaleComponent {
String sql = sb.toString();
// 使用jdbcTemplate来执行sql
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List<Map<String, Object>> resultMapList = jdbcTemplate.queryForList(sql);
if (null != resultMapList && 0 < resultMapList.size()) {
return true;
......
......@@ -41,10 +41,11 @@ public interface DeviceInstanceDao extends BaseMapper<DeviceInstanceEntity> {
/**
* @description 查询设备列表
* @param dto:
* @return java.util.List<com.esv.datacenter.iot.module.devicemodel.dto.DeviceInstanceDto>
* @author huangChaobin@esvtek.com
* @createTime 2020/08/12 15:33
* @createTime 2020/08/13 16:58
**/
List<DeviceInstanceDto> select4List();
List<DeviceInstanceDto> select4List(DeviceInstanceDto dto);
}
......@@ -14,6 +14,7 @@ import java.util.Date;
*/
@Data
public class DeviceInstanceDto {
/**
* 主键
*/
......@@ -55,7 +56,6 @@ public class DeviceInstanceDto {
**/
private Date onlineUpdateTime;
/**
* 创建者
*/
......
......@@ -115,7 +115,7 @@ public interface DeviceInstanceService extends IService<DeviceInstanceEntity> {
* @author huangChaobin@esvtek.com
* @createTime 2020/08/12 15:34
**/
List<DeviceInstanceDto> get4List();
List<DeviceInstanceDto> get4List(DeviceInstanceDto dto);
/**
* description 根据通信id更新设备在线状态
......@@ -135,5 +135,16 @@ public interface DeviceInstanceService extends IService<DeviceInstanceEntity> {
**/
DeviceInstanceVO deviceInstanceDetail(Long deviceInstanceId);
/**
* @description 保存client Mqtt Topic发布/订阅权限
* @param deviceTypeId:
* @param deviceInstanceId:
* @param clientId:
* @return void
* @author huangChaobin@esvtek.com
* @createTime 2020/08/13 16:44
**/
void saveInstanceMqttTopicAuth(Long deviceTypeId, Long deviceInstanceId, String clientId);
}
......@@ -2,14 +2,20 @@ package com.esv.datacenter.iot.module.devicemodel.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.esv.datacenter.iot.common.component.MqttAcl;
import com.esv.datacenter.iot.common.component.MqttClientAuthComponent;
import com.esv.datacenter.iot.module.datamodel.entity.DataModelEntity;
import com.esv.datacenter.iot.module.devicemodel.dao.DeviceDataMapDao;
import com.esv.datacenter.iot.module.devicemodel.dto.DeviceInstanceDto;
import com.esv.datacenter.iot.module.devicemodel.entity.DeviceDataMapEntity;
import com.esv.datacenter.iot.module.devicemodel.service.DeviceDataMapService;
import com.esv.datacenter.iot.module.devicemodel.service.DeviceInstanceService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
......@@ -17,6 +23,12 @@ import java.util.Objects;
@Service("deviceDataMapService")
public class DeviceDataMapServiceImpl extends ServiceImpl<DeviceDataMapDao, DeviceDataMapEntity> implements DeviceDataMapService {
@Autowired
MqttClientAuthComponent mqttClientAuthComponent;
@Autowired
DeviceInstanceService deviceInstanceService;
@Override
public int getCountByDataModelId(Long dataModelId) {
return this.getBaseMapper().selectCount(new LambdaQueryWrapper<DeviceDataMapEntity>()
......@@ -34,8 +46,9 @@ public class DeviceDataMapServiceImpl extends ServiceImpl<DeviceDataMapDao, Devi
public void saveDeviceDataMap(Long deviceTypeId, String dataModelIds) {
this.getBaseMapper().delete(new LambdaQueryWrapper<DeviceDataMapEntity>().eq(DeviceDataMapEntity::getDeviceTypeId, deviceTypeId));
String[] dataModelIdList = null;
if (Objects.nonNull(StringUtils.trimToNull(dataModelIds))) {
String[] dataModelIdList = dataModelIds.split(",");
dataModelIdList = dataModelIds.split(",");
for (String dataModelId : dataModelIdList) {
DeviceDataMapEntity entity = new DeviceDataMapEntity();
entity.setDeviceTypeId(deviceTypeId);
......@@ -43,6 +56,33 @@ public class DeviceDataMapServiceImpl extends ServiceImpl<DeviceDataMapDao, Devi
this.getBaseMapper().insert(entity);
}
}
// 更新设备实例的Mqtt Topic权限
DeviceInstanceDto deviceInstanceDto = new DeviceInstanceDto();
deviceInstanceDto.setDeviceTypeId(deviceTypeId);
List<DeviceInstanceDto> dtoList = deviceInstanceService.get4List(deviceInstanceDto);
String communicationId;
Long deviceInstanceId;
for (DeviceInstanceDto dto : dtoList) {
communicationId = dto.getCommunicationId();
mqttClientAuthComponent.deleteClientAcl(communicationId);
deviceInstanceId = dto.getId();
if (Objects.nonNull(dataModelIdList)) {
List<MqttAcl> mqttAclList = new ArrayList<>();
for (String dataModelId : dataModelIdList) {
MqttAcl mqttAcl = new MqttAcl();
mqttAcl.setAllow(1);
mqttAcl.setClientId(communicationId);
mqttAcl.setAccess(2);
StringBuffer sb = new StringBuffer();
sb.append("$esv/iot/").append(dataModelId).append(deviceTypeId).append(deviceInstanceId).append("/data/upload");
mqttAcl.setTopic(sb.toString());
mqttAclList.add(mqttAcl);
}
mqttClientAuthComponent.saveClientAcl(mqttAclList);
}
}
}
@Override
......@@ -51,4 +91,5 @@ public class DeviceDataMapServiceImpl extends ServiceImpl<DeviceDataMapDao, Devi
entity.setDeviceTypeId(deviceTypeId);
return this.getBaseMapper().selectDataModelByDeviceTypeId(entity);
}
}
\ No newline at end of file
......@@ -5,18 +5,24 @@ import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.esv.datacenter.iot.common.component.MqttAcl;
import com.esv.datacenter.iot.common.component.MqttClientAuthComponent;
import com.esv.datacenter.iot.common.exception.EException;
import com.esv.datacenter.iot.common.response.ECode;
import com.esv.datacenter.iot.common.vo.PageResultVO;
import com.esv.datacenter.iot.module.datamodel.entity.DataModelEntity;
import com.esv.datacenter.iot.module.devicemodel.dao.DeviceInstanceDao;
import com.esv.datacenter.iot.module.devicemodel.dto.DeviceInstanceDto;
import com.esv.datacenter.iot.module.devicemodel.entity.DeviceInstanceEntity;
import com.esv.datacenter.iot.module.devicemodel.form.DeviceInstanceForm;
import com.esv.datacenter.iot.module.devicemodel.service.DeviceDataMapService;
import com.esv.datacenter.iot.module.devicemodel.service.DeviceInstanceService;
import com.esv.datacenter.iot.module.devicemodel.vo.DeviceInstanceVO;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.ArrayList;
......@@ -28,6 +34,12 @@ import java.util.Objects;
@Service("deviceInstanceService")
public class DeviceInstanceServiceImpl extends ServiceImpl<DeviceInstanceDao, DeviceInstanceEntity> implements DeviceInstanceService {
@Autowired
MqttClientAuthComponent mqttClientAuthComponent;
@Autowired
DeviceDataMapService deviceDataMapService;
@Override
public int getInstanceCountByTypeId(Long deviceTypeId) {
return this.getBaseMapper().selectCount(new LambdaQueryWrapper<DeviceInstanceEntity>()
......@@ -35,6 +47,7 @@ public class DeviceInstanceServiceImpl extends ServiceImpl<DeviceInstanceDao, De
}
@Override
@Transactional(rollbackFor = Exception.class)
public Long insertDeviceInstance(DeviceInstanceForm form) {
int count = this.getBaseMapper().selectCount(new LambdaQueryWrapper<DeviceInstanceEntity>()
.eq(DeviceInstanceEntity::getName, form.getName()));
......@@ -44,15 +57,25 @@ public class DeviceInstanceServiceImpl extends ServiceImpl<DeviceInstanceDao, De
DeviceInstanceEntity entity = new DeviceInstanceEntity();
BeanUtils.copyProperties(form, entity);
entity.setCommunicationId(UUID.randomUUID().toString().replaceAll("-", ""));
String communicationId = UUID.randomUUID().toString().replaceAll("-", "");
entity.setCommunicationId(communicationId);
this.getBaseMapper().insert(entity);
Long deviceInstanceId = entity.getId();
return entity.getId();
// 保存client Mqtt Topic发布/订阅权限
this.saveInstanceMqttTopicAuth(form.getDeviceTypeId(), deviceInstanceId, communicationId);
return deviceInstanceId;
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deleteInstance(Long id) {
this.getBaseMapper().deleteById(id);
// 删除设备Mqtt Topic权限记录
DeviceInstanceEntity deviceInstanceEntity = this.getInstanceById(id);
mqttClientAuthComponent.deleteClientAcl(deviceInstanceEntity.getCommunicationId());
}
@Override
......@@ -145,8 +168,8 @@ public class DeviceInstanceServiceImpl extends ServiceImpl<DeviceInstanceDao, De
}
@Override
public List<DeviceInstanceDto> get4List() {
return this.getBaseMapper().select4List();
public List<DeviceInstanceDto> get4List(DeviceInstanceDto dto) {
return this.getBaseMapper().select4List(dto);
}
@Override
......@@ -182,4 +205,26 @@ public class DeviceInstanceServiceImpl extends ServiceImpl<DeviceInstanceDao, De
return deviceInstanceVO;
}
@Override
public void saveInstanceMqttTopicAuth(Long deviceTypeId, Long deviceInstanceId, String clientId) {
// 获取设备对应的数据模型
List<DataModelEntity> dataModelEntityList = deviceDataMapService.getDataModelByDeviceTypeId(deviceTypeId);
if (Objects.isNull(dataModelEntityList) || 0 == dataModelEntityList.size()) {
return;
}
List<MqttAcl> mqttAclList = new ArrayList<>();
for (DataModelEntity dataModelEntity : dataModelEntityList) {
MqttAcl mqttAcl = new MqttAcl();
mqttAcl.setAllow(1);
mqttAcl.setClientId(clientId);
mqttAcl.setAccess(2);
StringBuffer sb = new StringBuffer();
sb.append("$esv/iot/").append(dataModelEntity.getId()).append(deviceTypeId).append(deviceInstanceId).append("/data/upload");
mqttAcl.setTopic(sb.toString());
mqttAclList.add(mqttAcl);
}
mqttClientAuthComponent.saveClientAcl(mqttAclList);
}
}
\ No newline at end of file
......@@ -154,7 +154,7 @@ public class DeviceTypeServiceImpl extends ServiceImpl<DeviceTypeDao, DeviceType
@Override
public List<DeviceTypeStatisticsVO> getDeviceTypeStatistics() {
// 获取所有设备列表
List<DeviceInstanceDto> dtoList = deviceInstanceService.get4List();
List<DeviceInstanceDto> dtoList = deviceInstanceService.get4List(new DeviceInstanceDto());
// TODO,获取所有在线设备通信ID
......
......@@ -86,3 +86,20 @@ timescale:
table-field:
map: string-text,number-numeric,integer-int8,boolean-bit(1),date-date,time-int4,datetime-timestamptz
table-prefix: iot_data_model_
emq:
api:
url: 192.168.31.248:18083
auth: Basic YWRtaW46cHVibGlj
client-auth:
table-name: mqtt_acl
username: esv_mqtt_client
data-source:
jdbc-url: jdbc:mysql://192.168.31.248:3306/emq_mqtt
driver-class-name: com.mysql.cj.jdbc.Driver
validation-query: SELECT 1
username: emq_mqtt
password: 123456
connection-timeout: 10000
minimum-idle: 1
maximum-pool-size: 1
max-lifetime: 0
\ No newline at end of file
......@@ -90,3 +90,16 @@ emq:
api:
url: 192.168.31.248:18083
auth: Basic YWRtaW46cHVibGlj
client-auth:
table-name: mqtt_acl
username: esv_mqtt_client
data-source:
jdbc-url: jdbc:mysql://192.168.31.248:3306/emq_mqtt
driver-class-name: com.mysql.cj.jdbc.Driver
validation-query: SELECT 1
username: emq_mqtt
password: 123456
connection-timeout: 10000
minimum-idle: 1
maximum-pool-size: 1
max-lifetime: 0
\ No newline at end of file
......@@ -55,11 +55,15 @@
group by device_type_id
</select>
<select id="select4List" resultType="com.esv.datacenter.iot.module.devicemodel.dto.DeviceInstanceDto">
<select id="select4List" parameterType="com.esv.datacenter.iot.module.devicemodel.dto.DeviceInstanceDto"
resultType="com.esv.datacenter.iot.module.devicemodel.dto.DeviceInstanceDto">
select a.id, a.device_type_id,a.communication_id,
b.name as deviceTypeName
from device_instance a, device_type b
where a.device_type_id = b.id and a.deleted = false and b.deleted = false
<if test="deviceTypeId != null">
and a.device_type_id = #{deviceTypeId}
</if>
ORDER BY b.name ASC
</select>
......
package com.esv.datacenter.iot.common.component;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: huangchaobin@esvtek.com
* @createTime: 2020/08/13 16:07
* @version:1.0
*/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class MqttClientAuthComponentTest {
@Autowired
private MqttClientAuthComponent mqttClientAuthComponent;
@Test
public void saveClientAcl_test() {
List<MqttAcl> mqttAclList = new ArrayList<>();
MqttAcl mqttAcl = new MqttAcl();
mqttAcl.setAllow(1);
mqttAcl.setClientId(String.valueOf(System.currentTimeMillis()));
mqttAcl.setAccess(2);
mqttAcl.setTopic("$esv/iot/1/2/3/data/upload");
mqttAclList.add(mqttAcl);
mqttClientAuthComponent.saveClientAcl(mqttAclList);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment