# 前置条件
接口:Condition
/**
* 逻辑执行条件
* @author 无量
* @date 2021/9/3 17:01
*/
public interface Condition {
/**
* 验证条件
* @param globalParams
* @return
*/
boolean preCondition(GlobalParams globalParams);
}
为减少大家的编码复杂度,这里我们无需实现此接口,只需要编写preCondition的函数实现即可,代码如下:
globalParams.getModelMap().get("order").getInteger("total")!=1
# 参数说明
GlobalParams 对象:
public class GlobalParams implements Serializable {
/**
* 参数列表,maps包含全局参数、流水线参数、系统变量、api传入参数
*/
private Map<String, Object> maps;
/**
* 模型列表,流水线历史的任务生成的对象模型
*/
private Map<String, JSONObject> modelMap;
}
maps
解释:包含全局参数、流水线参数、系统变量、api传入参数
举例:
在变量维护中新增参数appKey,值为3766213
这里前置条件为appKey不为空,则编辑代码如下
!globalParams.getMaps().get("appKey").toString().equals("")
modelMap
解释:模型列表,流水线历史的任务生成的对象模型
举例:
前一个任务输出了订单模型
这里前置条件为校验订单数量大于0,则编辑代码如下
globalParams.getModelMap().get("order").getInteger("total")>0
# 模型循环计划
接口:LoopPlan
/**
*
* 循环执行计划
* @author 无量
* @date 2021/9/17 22:04
*/
public interface LoopPlan extends Plan{
/**
* 获取遍历数据
* @param globalParams
* @return
*/
public <T> List<T> getCollection(GlobalParams globalParams);
}
参数说明: /help/imgs
举例:
如下:自定义循环对象,添加循环对象
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.w2n0.interconnection.domain.common.entity.GlobalParams;
import cn.w2n0.interconnection.domain.node.entity.LoopPlan;
import java.util.ArrayList;
import java.util.List;
public class GroovyPlanTest implements LoopPlan {
@Override
public List<JSONObject> getCollection(GlobalParams globalParams) {
List<JSONObject> jsonObjectList=new ArrayList<>();
JSONObject jsonObject=new JSONObject();
jsonObject.put("orderNo","000001");
jsonObjectList.add(jsonObject);
JSONObject jsonObject1=new JSONObject();
jsonObject1.put("orderNo","000002");
jsonObjectList.add(jsonObject1);
return jsonObjectList;
}
}
# 递归循环计划
说明:
递归计划是根据输入参数校验的,如果校验成功则允许递归否则不允许递归,递归执行结束后任务标记为完成状态。
接口:RecursionPlan
/**
*
* 递归
* 业务得执行反映在全局参数中,根据全局参数校验当前任务是否递归执行
* @author 无量
* @date 2021/9/17 22:04
*/
public interface RecursionPlan extends Plan{
/**
* 是否执行递归
* @param globalParams 全局执行
* @return true:允许嘀咕,false不允许递归
*/
public boolean permit(GlobalParams globalParams);
}
参数说明: 参见
举例:
import cn.w2n0.interconnection.domain.common.entity.GlobalParams;
import cn.w2n0.interconnection.domain.node.entity.RecursionPlan;
public class DemoRecursion implements RecursionPlan {
@Override
public boolean permit(GlobalParams globalParams) {
int startRow=Integer.parseInt(globalParams.getMaps().get("K3StartRow").toString());
int limit=Integer.parseInt(globalParams.getMaps().get("k3Limit").toString());
if(startRow==0&&!globalParams.getModelMap().containsKey("item"))
{
return true;
}
else
{
int count=globalParams.getModelMap().get("item").getInteger("total");
startRow=startRow+limit;
if(startRow <= count)
{
globalParams.getMaps().put("K3StartRow",startRow);
return true;
}
}
return false;
}
}
# 循环执行条件
接口:LoopCondition
/**
* 循环执行条件
* @author 无量
* @date 2021/9/18 4:30
*/
public interface LoopCondition {
/**
* 验证条件
* @param object
* @return
*/
boolean preCondition(JSONObject object);
}
为减少大家的编码复杂度,这里我们无需实现此接口,只需要编写preCondition的函数实现即可,代码如下:
(Boolean)object.get("deliver")==false
# 参数说明
object 对象类型为JSONObject,在根据数组模型循环object 代表当前数组模型的子项
# 请求参数
接口:ParamGenerator
/**
* 参数生成器接口
*
* @author 无量
* @date 2021/9/3 20:36
*/
public interface ParamGenerator {
/**
* 参数解析方法
* @param currentParam 待解析的参数
* @param headerParams 请求头参数
* @param requestParams 请求参数
* @param body 请求体
* @param currentObject 当前对象
* @param globalParams 全局参数
* @return
*/
public Object resolve(RequestParam currentParam, MultiValueMap<String, String> headerParams, MultiValueMap<String, String> requestParams, Object body, JSONObject currentObject, GlobalParams globalParams);
}
举例:
如下:某平台的签名参数,参数名sign,签名规则:
import com.alibaba.fastjson.JSONObject;
import cn.w2n0.interconnection.domain.common.entity.GlobalParams;
import cn.w2n0.interconnection.domain.node.entity.ParamGenerator;
import cn.w2n0.interconnection.domain.node.entity.RequestParam;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.MultiValueMap;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author 无量
* @date 2021/9/4 6:15
*/参见
public class GroovyTest implements ParamGenerator {
public final String SIGN_METHOD_MD5 = "md5";
public final String SIGN_METHOD_HMAC = "hmac";
private final String CHARSET_UTF8 = "utf-8";
@Override
public Object resolve(RequestParam currentParam, MultiValueMap<String, String> headerParams, MultiValueMap<String, String> requestParams, Object body, JSONObject currentObject, GlobalParams globalParams) {
try {
String secret=globalParams.getMaps().get("tp_secret").toString();
return signTopRequest(requestParams, secret, SIGN_METHOD_HMAC);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 对TOP请求进行签名。
*/
private String signTopRequest(MultiValueMap<String, String> params, String secret, String signMethod) throws IOException {
// 第一步:检查参数是否已经排序
String[] keys = params.keySet().toArray(new String[0]);
Arrays.sort(keys);
// 第二步:把所有参数名和参数值串在一起
StringBuilder query = new StringBuilder();
if (SIGN_METHOD_MD5.equals(signMethod)) {
query.append(secret);
}
for (String key : keys) {
String value = params.get(key).get(0).toString();
if (StringUtils.isNotEmpty(key) && StringUtils.isNotEmpty(value)) {
query.append(key).append(value);
}
}
// 第三步:使用MD5/HMAC加密
byte[] bytes;
if (SIGN_METHOD_HMAC.equals(signMethod)) {
bytes = encryptHMAC(query.toString(), secret);
} else {
query.append(secret);
bytes = encryptMD5(query.toString());
}
// 第四步:把二进制转化为大写的十六进制
return byte2hex(bytes);
}
/**
* 对字节流进行HMAC_MD5摘要。
*/
private byte[] encryptHMAC(String data, String secret) throws IOException {
byte[] bytes = null;
try {
SecretKey secretKey = new SecretKeySpec(secret.getBytes(CHARSET_UTF8), "HmacMD5");
Mac mac = Mac.getInstance(secretKey.getAlgorithm());
mac.init(secretKey);
bytes = mac.doFinal(data.getBytes(CHARSET_UTF8));
} catch (GeneralSecurityException gse) {
throw new IOException(gse.toString());
}
return bytes;
}
/**
* 对字符串采用UTF-8编码后,用MD5进行摘要。
*/
private byte[] encryptMD5(String data) throws IOException {
return encryptMD5(data.getBytes(CHARSET_UTF8));
}
/**
* 对字节流进行MD5摘要。
*/
private byte[] encryptMD5(byte[] data) throws IOException {
byte[] bytes = null;
try {
MessageDigest md = MessageDigest.getInstance("MD5");
bytes = md.digest(data);
} catch (GeneralSecurityException gse) {
throw new IOException(gse.toString());
}
return bytes;
}
/**
* 把字节流转换为十六进制表示方式。
*/
private String byte2hex(byte[] bytes) {
StringBuilder sign = new StringBuilder();
for (int i = 0; i < bytes.length; i++) {
String hex = Integer.toHexString(bytes[i] & 0xFF);
if (hex.length() == 1) {
sign.append("0");
}
sign.append(hex.toUpperCase());
}
return sign.toString();
}
}
# 错误消息解析器
接口:ErrorMessageParser
/**
* 错误消息解析器
* @author 无量
* @date 2021/9/4 2:21
*/
public interface ErrorMessageParser<T> {
/**
* 校验,并解析节点返回的消息是否为错误消息
* @param body 节点返回信息
* @return
*/
public ErrorMessageResult parser(T body);
}
# 参数说明
ErrorMessageResult对象:
public class ErrorMessageResult {
/**
* 是否错误
* true:没有错误
* false:有错误
*/
private boolean success;
/**
* 错误类型
*/
private ErrorLogicType logicType;
/**
* 错误消息
*/
private String message;
}
ErrorLogicType对象:
public enum ErrorLogicType {
/**
* 退出流水线
*/
eixtFlow,
/**
* 退出当前节点
* 会执行流水线的其他节点
*/
exitFlowNode,
/**
* 重试,仍然失败退出流水线
*/
retryEixtFlow,
/**
* 重试,仍然失败退出当前节点
*/
retryExitFlowNode,
/**
* 正常返回,
*/
success;
}
举例:
import cn.w2n0.interconnection.domain.node.entity.ErrorMessageParser;
import cn.w2n0.interconnection.domain.node.entity.ErrorMessageResult;
import cn.w2n0.interconnection.domain.node.entity.error.ErrorLogicType;
/**
* @author 无量
* @date 2021/12/30 15:25
*/
public class GroovyTest implements ErrorMessageParser<String> {
@Override
public ErrorMessageResult parser(String body) {
ErrorMessageResult errorMessageResult=new ErrorMessageResult();
errorMessageResult.setLogicType(ErrorLogicType.eixtFlow);
errorMessageResult.setMessage("系统错误退出流水线");
errorMessageResult.setSuccess(false);
return errorMessageResult;
}
}
# 格式化为模型
接口:MessageModelParser
/**
* 消息解析成模型接口
* @author 无量
* @date 2021/9/4 2:21
*/
public interface MessageModelParser<T> extends MessageParser {
/**
* 把节点的返回消息解析成模型对象
* @param body 解析消息
* @param object 当前模型
* @param modelMap 流水线模型
*/
public void parse(T body, JSONObject object, Map<String, JSONObject> modelMap);
}
举例:
添加商品模型item,并设置模型属性total(商品数量)
import cn.w2n0.interconnection.domain.node.entity.MessageModelParser;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.Map;
public class K3ItemCount implements MessageModelParser<String> {
@Override
public void parse(String body, JSONObject object, Map<String, JSONObject> modelMap) {
JSONArray jsonArray= JSON.parseArray(body);
int size = jsonArray.size();
JSONObject jsonObject=new JSONObject();
jsonObject.put("total",size);
modelMap.put("item",jsonObject);
}
}
# 参数说明
body:任务返回的原始信息
object:当前对象,执行计划为循环执行时本参数有效
modelMap:全局模型
# 脚本格式化消息并返回
接口:MessageModelParser
/**
* 消息格式化接口
* @author 无量
* @date 2021/9/4 2:21
*/
public interface MessageFormat<T> extends MessageParser {
/**
* 把节点的返回消息格式化成模型对象
* @param body 待格式化对象
* @return 模型对象对应的字符串
*/
public String parse(T body);
}
举例:
import cn.w2n0.interconnection.domain.common.core.exception.FlowRuntimeException;
import cn.w2n0.interconnection.domain.flow.support.ResultError;
import cn.w2n0.interconnection.domain.node.entity.MessageFormat;
import groovy.lang.GroovyClassLoader;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import static com.google.common.base.Preconditions.checkArgument;
/**
* groovy 格式化
* @author 无量
* @date 2021/9/5 19:06
*/
public class GroovyTest implements MessageFormat<String>{
@Override
public String parse(String body) {
return body;
}
}