package com.hs.admin.job; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import com.hs.admin.base.CodeMessageEnum; import com.hs.admin.base.HsRuntimeException; import com.hs.admin.base.RabbitMqConstant; import com.hs.admin.base.Result; import com.hs.admin.bean.*; import com.hs.admin.dao.AuthTokenDao; import com.hs.admin.dao.JcbDao; import com.hs.admin.dao.SysTaskDao; import com.hs.admin.dao.TableLogDao; import com.hs.admin.scheduler.TokenUtil; import com.hs.admin.util.*; import com.hs.admin.vo.ResponseVO; import com.hs.admin.vo.ResultVO; import org.quartz.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.BeanPropertyRowMapper; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.ObjectUtils; import javax.annotation.Resource; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; /** * @author xieheng */ @DisallowConcurrentExecution @Component public class Select implements Job { @Resource RabbitTemplate rabbitTemplate; @Resource JcbDao jcbDao; @Resource SysTaskDao sysTaskDao; @Resource AuthTokenDao authTokenDao; @Resource Scheduler scheduler; @Resource TokenUtil tokenUtil; @Resource TableLogDao tableLogDao; @Value("${hs_cloud_last_time}") private String lastTime; @Value("${hs_cloud_last_time_name}") private String lastTimeKey; @Value("${ocid_client_id}") private String clientId; @Value("${ocid_client_secret}") private String clientSecret; @Value("${ocid_username}") private String username; @Value("${ocid_scope}") private String scope; @Value("${ocid_password}") private String password; @Value("${ocid_upload_url}") private String uploadUrl; @Override @Transactional(rollbackFor = Exception.class) public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { SysTask sysTask = null; ResponseVO vo = new ResponseVO(); JobDataMap jobDataMap = jobExecutionContext.getTrigger().getJobDataMap(); String triggerTask = jobDataMap.get("triggerTask").toString(); sysTask = JsonUtils.jsonToBean(triggerTask, SysTask.class); LogUtil.info(">>>>>>>>客户端开始执行定时任务,参数:" + sysTask); if (sysTask == null) { return; } String jcbId = sysTask.getJcbId(); //操作类型 1查询,2上传,3下发,4执行 String ctype = sysTask.getCtype(); MqData mqData = JsonUtils.jsonToBean(sysTask.getContent(), MqData.class); if (ObjectUtils.isEmpty(mqData.getSql())) { JSONObject jsonObject = JSONObject.parseObject(sysTask.getContent()); String content = jsonObject.getString("content"); mqData = JsonUtils.jsonToBean(content, MqData.class); } //根据jcbId选择db操作todo DataBase dbInfo = jcbDao.getDbInfo(jcbId); sysTask.setState(0); JdbcTemplate jdbcTemplate = DbUtil.getJdbcTemplate(dbInfo); if ("1".equals(ctype)) { //查询操作 vo = selectOperate(mqData, jdbcTemplate, vo); } else if ("2".equals(ctype)) { //上传操作 vo = uploadOperate(jcbId, mqData, jdbcTemplate, vo); } else if ("3".equals(ctype)) { //下发操作 vo = sendOperate(mqData, jdbcTemplate, vo); } else { return; } //参数 vo.setParams(sysTask); SysTask task = sysTaskDao.findTaskByJcbIdAndCid(sysTask.getJcbId(), sysTask.getCid()); //查询每次只更新 if (task != null && "1".equals(sysTask.getCtype())) { task.setState(sysTask.getState()); task.setContent(JsonUtils.beanToJson(vo)); task.setCreateTime(new Date()); sysTaskDao.updateByPrimaryKey(task); } else { sysTask.setContent(JsonUtils.beanToJson(vo)); sysTask.setCreateTime(new Date()); sysTaskDao.insert(sysTask.init()); //保存完置空,服务器不需要内容 ResultVO resultVO = vo.getResultVO(); resultVO.setResult(""); vo.setResultVO(resultVO); } //更新租户最新活跃时间 jcbDao.updateModifyTime(sysTask.getJcbId(), System.currentTimeMillis()); //将db返回结果发送消息队列 //重新组装参数返回服务端 HashMap map = new HashMap<>(); map.put("jcbId", jcbId); map.put("taskId", sysTask.getId()); vo.setParams(map); rabbitTemplate.convertAndSend(RabbitMqConstant.MSG_QUEUE, JsonUtils.beanToJson(vo)); LogUtil.info(">>>>>>>客户端任务执行成功,发送消息到服务端:task=" + JsonUtils.beanToJson(vo)); } /** * @description: 下发操作 * @author: XieHeng * @param: mqData * @param: jdbcTemplate * @return: Result * @date: 2021/5/30 12:40 下午 */ private ResponseVO sendOperate(MqData mqData, JdbcTemplate jdbcTemplate, ResponseVO vo) { String sql = mqData.getSql(); List> mapList = null; if (sql.toLowerCase().startsWith("select")) { BeanPropertyRowMapper rowMapper = new BeanPropertyRowMapper<>(Map.class); mapList = jdbcTemplate.queryForList(sql); } else { try { int i = jdbcTemplate.update(sql); } catch (DataAccessException e) { vo.setResultVO(new ResultVO(mapList, false, "false")); return vo; } } vo.setResultVO(new ResultVO(mapList, true, "success")); return vo; } /** * @description: 查询操作 * @author: XieHeng * @param: mqData * @param: jcbId * @return: String * @date: 2021/5/22 10:11 上午 */ private ResponseVO selectOperate(MqData mqData, JdbcTemplate jdbcTemplate, ResponseVO vo) { String sql = mqData.getSql(); //查询操作 List mapList = jdbcTemplate.queryForList(sql); String resultData = JsonUtils.beanToJson(mapList); LogUtil.info(">>>>>>>>>>>客户端处理结果集:" + resultData); vo.setResultVO(new ResultVO(resultData, true, "success")); return vo; } /** * @description: 上传操作 * @author: XieHeng * @param: mqData * @param: jcbId * @return: boolean * @date: 2021/5/22 9:39 上午 */ private ResponseVO uploadOperate(String jcbId, MqData mqData, JdbcTemplate jdbcTemplate, ResponseVO vo) { String sql = mqData.getSql(); String tableName = mqData.getTableName(); String url = mqData.getUrl(); if (ObjectUtils.isEmpty(url)) { url = uploadUrl; } //获取token String token = getToken(jcbId); //根据tableName查询最后一次执行时间 TableLog tableLog = null; try { tableLog = jdbcTemplate.queryForObject("select * from table_log where table_name=?", new BeanPropertyRowMapper<>(TableLog.class), tableName); } catch (DataAccessException e) { LogUtil.info("?>>>>>>>>查询table表为空"); } LogUtil.info("?>>>>>>>>上传操作获取上一次执行时间:" + JsonUtils.beanToJson(tableLog)); String hsCloudLastTime; if (ObjectUtils.isEmpty(tableLog)) { hsCloudLastTime = lastTime; } else { hsCloudLastTime = DateUtil.format(tableLog.getHsCloudLastTime(), "yyyy-MM-dd HH:mm:ss"); } String regex = "\\{([^}])*\\}"; Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(sql); while (matcher.find()) { String group = matcher.group(); sql = sql.replaceAll("\\{" + lastTimeKey + "\\}", "'" + hsCloudLastTime + "'"); } sql = sql.replaceAll("[\\t\\n\\r]", " "); LogUtil.info("***************:sql=" + sql); String billId = UUID.randomUUID().toString(); List> mapList = jdbcTemplate.queryForList(sql); if (ObjectUtils.isEmpty(mapList)) { LogUtil.info(">>>>>>>查询结果集为空"); ResultVO resultVO = new ResultVO("查询结果集为空", true, "success"); vo.setResultVO(resultVO); return vo; } String dataStr = JSON.toJSONString(mapList, SerializerFeature.DisableCircularReferenceDetect, SerializerFeature.WriteDateUseDateFormat); List list = JsonUtils.jsonToList(dataStr, Map.class); //获取最大返回时间 String dateTime = mapList.stream().max(Comparator.comparing(map -> map.get(lastTimeKey).toString())).get().get(lastTimeKey).toString(); Date date = DateUtil.toDateMillisecond(dateTime); HashMap map = new HashMap<>(); map.put("bill_type_code", tableName); map.put("bill_id", billId); map.put("bill_data", list); LogUtil.info(">>>>>>>>..upload:url=" + url + ",参数:" + JsonUtils.beanToJson(map) + ",token=" + token); String data = HttpUtil.postWithToken(url, map, token); LogUtil.info(">>>>>>>>..upload结果:" + data); if (!ObjectUtils.isEmpty(data)) { Map resultMap = JsonUtils.jsonToMap(data); boolean b = Boolean.parseBoolean(resultMap.get("success").toString()); //如果上传接口返回成功,处理本地记录 if (b) { int i; if (tableLog != null) { String tSql = "update table_log set hs_cloud_last_time=? where id=?"; jdbcTemplate.update(tSql, date, tableLog.getId()); } else { String tSql = "insert into table_log(table_name,hs_cloud_last_time)values (?,?)"; jdbcTemplate.update(tSql, tableName, dateTime); } vo.setParams(list); } vo.setResultVO(new ResultVO(data, true, "success")); } else { throw new HsRuntimeException(CodeMessageEnum.REQUEST_ERROR.getCode(), "上传接口错误,无返回值"); } return vo; } private String getToken(String jcbId) { String token; try { AuthToken authToken = authTokenDao.findToken(jcbId); //如果token为空,通过接口重新获取 //token 24小时有效期,23小时(82800000)刷新token Date date = new Date(System.currentTimeMillis() - 82800000); if (ObjectUtils.isEmpty(authToken)) { token = tokenUtil.getToken(jcbId); authTokenDao.insert(new AuthToken(jcbId, token, date)); } else { Date lasttime = authToken.getLasttime(); if (lasttime.before(new Date())) { token = tokenUtil.getToken(jcbId); authTokenDao.updateToken(jcbId, token, date); } else { token = authToken.getToken(); } } } catch (Exception e) { token = tokenUtil.getToken(jcbId); } return token; } }