dapeng-kstream开发的初衷,由于kafka-kstream有一定的学习成本,为了让业务同事能够快速的定制自己的业务告警逻辑,通过dapeng-kstream的封装,业务同事只需要关注自己的消息处理逻辑,同时封装后的dapeng-kstream针对我们的dapeng部署中心会更加容易扩展,目前0.1-SNAPSHOT版本,还需要业务同事自己写一个业务相关的代码。
后续业务同事可以在部署中心页面定制自己的业务告警逻辑,而不需要再自己写代码,敬请期待.
前期在部署中心页面功能未完善之前,暂时需要运维帮忙手动操作一下,后续页面操作功能完善后,业务部门就只需要关心自己的告警逻辑。
- 用户自定义告警逻辑(例如:orderWarning.sc)保存为文件并上传到服务器指定目录(放在工作目录下的functions目录下,在kstream.properties中可以指定工作目录) (后面会说到具体如何定义自己的告警逻辑)
- 调用DapengKStream引擎(需要将kstream.properties和mailConfig.properties与jar包放在同一个目录下) =>
java -jar dapengKstream.jar
/**
* 提供一个返回布尔值的函数: true: 下一个节点继续处理该消息,false: 丢弃
* k: kafka流式消息的key, 生产者发送消息时指定的key 默认为null, 可以通过dapengMap, windowAlert函数转换为其他值
* v: kafka流式消息的value, value一般为业务接收的原始消息, 可以通过dapengMap,windowAlert函数转换值
* @param p the provided func
* @return DapengKStream[K,V]
*/
def dapengFilter(p: (K,V) ⇒ Boolean):DapengKStream[K,V]
/**
* 根据业务ServiceTag 过滤消息, 如: orderService
* @param serviceName the serviceName for filter
* @return DapengKStream[K,V]
*/
def serviceFilter(serviceName: String):DapengKStream[K,V]
// 根据日志级别过滤消息
/**
* 根据消息的日志级别过滤: 如: INFO, WARN, ERROR
* @param logLevel the logLevel to input
* @return DapengKStream[K,V]
*/
def logLevelFilter(logLevel: String)
/**
* 提供一个Key, Value转换的函数,该方法可以对消息的Key, Value 进行转换处理, 默认的key为Null
* @param mapper the function to input
* @tparam KR new transformed Key value
* @tparam VR new transformed Value
* @return
*/
def dapengMap[KR, VR](mapper: (K, V) => (KR, VR)): DapengKStream[KR, VR]
/**
*
* @param duration 定时任务触发间隔
* @param keyWord 统计的关键字
* @param countTimesToWarn 告警阈值
* @param warningType 发送告警类型: "mail": 发邮件, "dingding": 发钉钉, "all", 同时发邮件跟钉钉
* @param userTag 根据ServiceTag 获取发送的用户
* @param subject 邮件 或 钉钉的主题
* @return KStream[K,V]
*/
def windowAlert(duration: Duration,
keyWord: String,
countTimesToWarn: Int,
warningType:String,
userTag: String,
subject: String)
/**
* 该方式适用于有定时启动范围的需求: 如2点到6点内,统计每分钟的指定消息
* @param timeFrom 开始范围
* @param timeTo 结束范围
* @param duration 定时间隔
* @param keyWord 统计的关键消息
* @param countTimesToWarn 告警统计阈值
* @param warningType 发送告警类型: "mail": 发邮件, "dingding": 发钉钉, "all", 同时发邮件跟钉钉
* @param userTag 根据ServiceTag 获取发送的用户
* @param subject 邮件 或 钉钉的主题
* @return KStream[K,V]
*/
def timeRangeAlert(timeFrom: Int,
timeTo: Int,
duration: Duration,
keyWord: String,
countTimesToWarn: Int,
warningType: String,
userTag: String,
subject: String): KStream[K,V]
/**
* 根据ServiceTag获取用户组,并发送钉钉消息
* @param user 业务用户组, 如: orderService
* @param mapper 发送到钉钉的目标消息(_, context) = mapper(key, value)
* @return
*/
def sendDingding(userTag: String, mapper: (K,V) => (K, String))
/**
* 根据ServiceTag获取用户组,并根据设置的标题发送邮件
* @param user 业务用户组, 如:orderService
* @param subject 邮件标题
* @return KStream[K,V]
*/
def sendMail(user: String, subject: String)
注: Dapeng-KStream原生支持Kstream, 如果熟悉KStream Api的也可以使用原生的Kafka-KStreamApi无缝接入
我们以以下几个需求来看如果使用这套Api:
业务需要把如下代码复制到某一文件中,具体的升级流程,详见
### 1. 基本部署流程
-
1.线上的堆栈异常要及时通过钉钉通知开发人员,同时发送错误邮件。
topic("order_topic") .dapengFilter((_,v) => v.contains("ERROR") || v.contains("Exception")) .sendMail("orderGroup", "订单错误异常告警") .sendDingding("orderGrooup", (k,v: String) => { val msg = s" 订单错误异常告警,日志信息为: ${String.valueOf(v)}" (k, msg) })
上面的意思是: 订阅对应的消息主题,过滤消息包含错误或异常的消息,然后发邮件,钉钉
-
2.某jvm进程一分钟内FullGc次数超过2次告警。
topic("efk") .serviceFilter("orderService") .dapengFilter((_,v) => v.contains("FullGc")) .windowAlert(Duration.ofMinutes(1), "ERROR", 2, "all", //(all: 包含发邮件,钉钉告警, 详见2的APi接口参数) "orderGroup", "[订单错误统计告警]")
上面的意思是:订阅对应的消息主题,根据serviceTag:
orderService
过滤订单的消息,根据消息内容:FullGc
过滤后,每隔1分钟统计该消息,如果次数等于或超过2次后,发送 邮件,钉钉告警,同时主题是: "[订单错误统计告警]", 内容为原消息, 如果需要针对消息做进一步加工转换的话,可以在dapengFilter后 接一个dapengMap的函数, 如:topic("efk") .serviceFilter("orderService") .dapengFilter((_,v) => v.contains("FullGc")) .dapengMap((k,v) => s"FullGc高过预期: $v") .windowAlert(Duration.ofMinutes(1), "ERROR", 2, "all", //(all: 包含发邮件,钉钉告警, 详见2的APi接口参数) "orderGroup", "[订单错误统计告警]")
-
3.单个订单总金额超过10w或者子单个数超过1k的告警, 需业务埋点去提取关键信息
//如: 业务埋点: logger.info(s"@@@: createOrder orderNo: ${order.orderNo} totalAmount: ${order.orderActualAmount}, orderDetailSize: ${request.orderDetails.size}") val matcher = "@@@: (createOrder orderNo: )(\\w+)(, totalAmount: )(-?\\d+.?\\d*)(, orderDetailSize: )(\\d+)".r topic("test") .serviceFilter("orderService") .dapengFilter((_,v) => v.contains("@@@:")) .dapengMap((k,v) => { //业务自定义格式切割 val r = matcher findAllMatchIn v map( i => { i.group(4).toDouble}) val amount = if (r.hasNext) r.next() else 0D (k,String.valueOf(amount)) }) .filter((k,v) => v.toDouble > 1000) .sendMail("orderGroup", "订单异常")
-
4.在凌晨6点到第二天凌晨2点, 一分钟内没有订单产生的话告警
// 4. 在凌晨6点到第二天凌晨2点, 一分钟内没有订单产生的话告警 topic("test") .serviceFilter("orderService") .dapengFilter((_,v) => v.contains("createOrder")) .timeRangeAlert(2, 6, Duration.ofMinutes(1), "createOrder", 0, "all", "orderGroup", "[一分钟内没有订单创建,请查看]")
- 业务只需要修改自己的告警文件内容. 无需额外的操作。
我们有独立的进程去检测告警文件是否修改,如果有修改,会杀掉原有进程,并基于新的文件内容启动新的告警进程
- 将用户输入的代码, 结合DapengStream引擎,构造成一个完整的KafkaKStream流处理主程序(当前为字符串形态)
- 将上述构造好的代码字符串封装在一个类型为:
() => Unit
的函数中 (当前也是 字符串心态) - 将封装好的函数包装在Ammonite(感兴趣的关注
ammonite.io
)的主函数中,将字符串翻译成可执行的函数代码,并执行,如下述伪代码
val input = s"$userInput"
val header = s"$initializeCode"
val runner = s"startFuncCode"
val func = header + input + runner
val execFunction = wrapFunction(func)
val file = new File("doAction.sc")
val outputStream = new FileOutputStream(file)
outputStream.write(execFunction.getBytes())
outputStream.close()
val result: (Res[Any], Seq[(Path, Long)]) = ammonite.Main().runScript(Path(file.getAbsolutePath), Seq(("args", Option.empty)))
result._1 match {
case Success(x: (() => Unit)) =>
println("matched function. start to execute...")
x()
case Success(x: (Any => Any)) => x()
case Success(x: (Any => Unit)) => x()
case _ => throw new Exception(s"非法函数...${result._1}")
}