flow/flow.go
2019-11-07 11:16:48 +08:00

104 lines
2.1 KiB
Go

package flow
import (
"log"
"time"
)
// FlowContext 流程的上下文, 用于传递每个流程之间的参数
type FlowContext struct {
RootFlow *Flow
CurrentFlow *FlowNode
CurrentWrite []byte // 8 byte
CurrentRead []byte // 14 byte
}
// FlowNode 流程节点
type FlowNode struct {
Name string
Path string
prev *FlowNode
next *FlowNode
Task func(cxt *FlowContext) int
}
// Write 该写入函数包含了 自动封装日志格式
func (cxt *FlowContext) Write(flag OperatorFlag) {
operator.portWriterLock.Lock()
wbuf := OperatorOption(flag)
operator.port.Write(wbuf)
operator.port.Flush()
cxt.CurrentWrite = wbuf
operator.portWriterLock.Unlock()
if len(wbuf) == 8 {
flowpath := cxt.CurrentFlow.Path + ">" + cxt.CurrentFlow.Name // 路径
_, err := operator.oplog.Exec("insert into operator(ts , rootflow, flowpath, writelog, readlog) values(?,?,?,?,?)", time.Now().Unix(), cxt.RootFlow.Name, flowpath, cxt.CurrentWrite, cxt.CurrentRead)
if err != nil {
log.Println("日志写入错误: ", err)
}
} else {
log.Println("write buffer len is not equal to 8, now is", len(wbuf), "buf: ", wbuf)
}
}
// Sensor 返回传感器数值, 每次调用都是最新的.
func (cxt *FlowContext) Sensor() *Sensor {
buf := make([]byte, 14)
operator.portReaderLock.Lock()
n, err := operator.port.Read(buf)
cxt.CurrentRead = buf
operator.portReaderLock.Unlock()
if err != nil {
log.Println("read bufferr is error:", err)
} else {
if n == 14 {
return NewSensor(buf)
}
//TODO: 断包, 沾包 处理
log.Println("读取长度不等于14, len = ", n)
}
return nil
}
// Flow 流程
type Flow struct {
Name string
Context *FlowContext // 执行流程时候的上下文
Head *FlowNode
Tail *FlowNode
}
// Add 添加
func (flow *Flow) Add(name string, task func(cxt *FlowContext) int) {
node := &FlowNode{Name: name, Task: task}
if flow.Head == nil {
flow.Head = node
flow.Tail = node
node.Path = node.Name
return
}
node.Path = flow.Tail.Path + ">" + flow.Tail.Name
flow.Tail.next = node
flow.Tail = node
if node.next != nil {
panic("tail next is not nil")
}
}