flow/flowcontext.go

60 lines
1.4 KiB
Go
Raw Permalink Normal View History

package flow
import (
"log"
"time"
)
// FlowContext is the context of a flow; it is used to pass parameters
// between the individual flow steps.
type FlowContext struct {
	// RootFlow's Name is stored with every operation that Write records.
	RootFlow *Flow
	// CurrentFlow's Path and Name form the flow path stored by Write.
	CurrentFlow *FlowNode
	// CurrentWrite holds the last buffer sent by Write (8 bytes).
	CurrentWrite []byte
	// CurrentRead holds the last buffer filled by Sensor (14 bytes).
	CurrentRead []byte
}
// Sensor reads from the port and returns the decoded sensor values; every
// call performs a fresh read, so the result is always the latest data.
//
// The read buffer is stored in CurrentRead whether or not the read succeeds.
// It returns nil when the read reports an error or when the number of bytes
// read is not exactly 14; both cases are reported through the standard logger.
func (cxt *FlowContext) Sensor() *Sensor {
	frame := make([]byte, 14)

	// Only the port read and the CurrentRead update happen under the reader lock.
	operator.portReaderLock.Lock()
	count, readErr := operator.port.Read(frame)
	cxt.CurrentRead = frame
	operator.portReaderLock.Unlock()

	switch {
	case readErr != nil:
		log.Println("read bufferr is error:", readErr)
		return nil
	case count != 14:
		// TODO: handle fragmented and coalesced packets.
		log.Println("读取长度不等于14, len = ", count)
		return nil
	}
	return NewSensor(frame)
}
// Write builds the command frame for flag with OperatorOption, sends it to
// the port and, when the frame is exactly 8 bytes long, inserts a row
// describing the operation into the operator log table.
//
// If the port write or flush reports an error, the error is logged and the
// call stops: CurrentWrite is left unchanged and nothing is recorded, so the
// log table never lists a frame that was not sent. A frame whose length is
// not 8 is still sent but only reported through the standard logger. A failed
// log insert is likewise only reported through the standard logger.
func (cxt *FlowContext) Write(flag OperatorFlag) {
	// The writer lock is scoped to a closure so that defer releases it even
	// if the port call panics, and so the database insert below runs without
	// holding it.
	// NOTE(review): assumes port.Write returns (int, error) and port.Flush
	// returns error — confirm against the port type.
	wbuf, sent := func() ([]byte, bool) {
		operator.portWriterLock.Lock()
		defer operator.portWriterLock.Unlock()

		frame := OperatorOption(flag)
		if _, err := operator.port.Write(frame); err != nil {
			log.Println("write to port failed:", err)
			return nil, false
		}
		if err := operator.port.Flush(); err != nil {
			log.Println("flush port failed:", err)
			return nil, false
		}
		cxt.CurrentWrite = frame
		return frame, true
	}()
	if !sent {
		return
	}

	if len(wbuf) != 8 {
		log.Println("write buffer len is not equal to 8, now is", len(wbuf), "buf: ", wbuf)
		return
	}

	// Flow path: the current node's Path and Name joined with ">".
	flowpath := cxt.CurrentFlow.Path + ">" + cxt.CurrentFlow.Name
	// Record the frame that was actually sent (wbuf) rather than re-reading
	// cxt.CurrentWrite, which is no longer protected by the lock here.
	_, err := operator.oplog.Exec("insert into operator(ts , rootflow, flowpath, writelog, readlog) values(?,?,?,?,?)", time.Now().Unix(), cxt.RootFlow.Name, flowpath, wbuf, cxt.CurrentRead)
	if err != nil {
		log.Println("日志写入错误: ", err)
	}
}