diff --git a/flow.go b/flow.go index a20a0b6..b2ec5b0 100644 --- a/flow.go +++ b/flow.go @@ -1,109 +1,195 @@ package flow -import ( - "log" - "time" +type FlowType int + +const ( + TNode FlowType = 1 + TRoot FlowType = 2 ) -// FlowContext 流程的上下文, 用于传递每个流程之间的参数 -type FlowContext struct { - RootFlow *Flow - CurrentFlow *FlowNode +type ExecuteStatus int - CurrentWrite []byte // 8 byte - CurrentRead []byte // 14 byte +const ( + SFailure ExecuteStatus = 0 + SSuccess ExecuteStatus = 1 + SAgain ExecuteStatus = 2 +) + +type IFlow interface { + GetName() string + + GetRootFlow() IFlow + SetRootFlow(IFlow) + + SetContext(*FlowContext) + GetContext() *FlowContext + + GetPath() string + SetPath(path string) + + GetNext() IFlow + SetNext(IFlow) + + GetPrev() IFlow + SetPrev(IFlow) + + Execute() ExecuteStatus } // FlowNode 流程节点 type FlowNode struct { + IFlow + Name string Path string - prev *FlowNode - next *FlowNode + RootFlow IFlow - Task func(cxt *FlowContext) int + prev IFlow + next IFlow + + Task func(cxt *FlowContext) ExecuteStatus } -// Write 该写入函数包含了 自动封装日志格式 -func (cxt *FlowContext) Write(flag OperatorFlag) { - - operator.portWriterLock.Lock() - wbuf := OperatorOption(flag) - operator.port.Write(wbuf) - operator.port.Flush() - cxt.CurrentWrite = wbuf - operator.portWriterLock.Unlock() - - if len(wbuf) == 8 { - flowpath := cxt.CurrentFlow.Path + ">" + cxt.CurrentFlow.Name // 路径 - _, err := operator.oplog.Exec("insert into operator(ts , rootflow, flowpath, writelog, readlog) values(?,?,?,?,?)", time.Now().Unix(), cxt.RootFlow.Name, flowpath, cxt.CurrentWrite, cxt.CurrentRead) - if err != nil { - log.Println("日志写入错误: ", err) - } - } else { - log.Println("write buffer len is not equal to 8, now is", len(wbuf), "buf: ", wbuf) - } - +func (fn *FlowNode) GetRootFlow() IFlow { + return fn.RootFlow +} +func (fn *FlowNode) SetRootFlow(fl IFlow) { + fn.RootFlow = fl } -// Sensor 返回传感器数值, 每次调用都是最新的. -func (cxt *FlowContext) Sensor() *Sensor { - buf := make([]byte, 14) +// GetPath Get return Path string +func (fn *FlowNode) GetPath() string { + return fn.Path +} - operator.portReaderLock.Lock() - n, err := operator.port.Read(buf) - cxt.CurrentRead = buf - operator.portReaderLock.Unlock() - if err != nil { - log.Println("read bufferr is error:", err) - } else { +// SetPath Set Path string +func (fn *FlowNode) SetPath(Path string) { + fn.Path = Path +} - if n == 14 { - return NewSensor(buf) - } - //TODO: 断包, 沾包 处理 - log.Println("读取长度不等于14, len = ", n) - } +// GetName Get return Name string +func (fn *FlowNode) GetName() string { + return fn.Name +} - return nil +func (fn *FlowNode) GetPrev() IFlow { + return fn.prev +} + +func (fn *FlowNode) SetPrev(prev IFlow) { + fn.prev = prev +} + +func (fn *FlowNode) GetNext() IFlow { + return fn.next +} + +func (fn *FlowNode) SetNext(next IFlow) { + fn.next = next +} + +// GetContext Get return Context *FlowContext +func (fn *FlowNode) GetContext() *FlowContext { + return fn.GetRootFlow().GetContext() +} + +// SetContext Set Context *FlowContext +func (fn *FlowNode) SetContext(Context *FlowContext) { + fn.GetRootFlow().SetContext(Context) +} + +// Execute Get return Name string +func (fn *FlowNode) Execute() ExecuteStatus { + return fn.Task(fn.GetContext()) } // Flow 流程 type Flow struct { - Name string + IFlow + RootFlow IFlow + + Name string Context *FlowContext // 执行流程时候的上下文 - Head *FlowNode - Tail *FlowNode + Head IFlow + Tail IFlow } +func (flow *Flow) GetRootFlow() IFlow { + return flow.RootFlow +} +func (flow *Flow) SetRootFlow(fl IFlow) { + flow.RootFlow = fl +} + +// GetContext Get return Context *FlowContext +func (flow *Flow) GetContext() *FlowContext { + return flow.Context +} + +// SetContext Set Context *FlowContext +func (flow *Flow) SetContext(Context *FlowContext) { + flow.Context = Context +} + +// GetName Get return Name string +func (flow *Flow) GetName() string { + return flow.Name +} + +// Execute Get return Name string +// func (flow *Flow) Execute() { +// flow.Context. +// } + // New 创建一个流程, 相当于例子 `干洗` func New(name string) *Flow { f := &Flow{Name: name, Context: &FlowContext{}} return f } -// Add 添加 -func (flow *Flow) Add(name string, task func(cxt *FlowContext) int) { +func (flow *Flow) GetPrev() IFlow { + return flow.Head.GetPrev() +} - node := &FlowNode{Name: name, Task: task} +func (flow *Flow) SetPrev(prev IFlow) { + // panic("flow can't call SetPrev") + flow.Head.SetPrev(prev) +} + +func (flow *Flow) GetNext() IFlow { + return flow.Tail.GetNext() +} + +func (flow *Flow) SetNext(next IFlow) { + // panic("flow can't call SetNext") + flow.Tail.SetNext(next) +} + +// Add 添加 +func (flow *Flow) Add(fl IFlow) { if flow.Head == nil { - flow.Head = node - flow.Tail = node - - node.Path = node.Name + flow.Head = fl + flow.Tail = fl + fl.SetRootFlow(flow) + fl.SetContext(flow.GetContext()) + fl.SetPath(fl.GetName()) return } - node.Path = flow.Tail.Path + ">" + flow.Tail.Name + fl.SetPath(flow.Tail.GetPath() + ">" + flow.Tail.GetName()) + fl.SetRootFlow(flow) + fl.SetContext(flow.GetContext()) - flow.Tail.next = node - flow.Tail = node + fl.SetPrev(flow.Tail) + flow.Tail.SetNext(fl) + flow.Tail = fl - if node.next != nil { + if fl.GetNext() != nil { panic("tail next is not nil") } + } diff --git a/flow_test.go b/flow_test.go new file mode 100644 index 0000000..e407e19 --- /dev/null +++ b/flow_test.go @@ -0,0 +1,17 @@ +package flow + +import ( + "testing" + + "github.com/davecgh/go-spew/spew" +) + +func TestFlowAdd(t *testing.T) { + flow := New("干洗") + + flow.Add(&FlowNode{Name: "干洗01"}) + flow.Add(&FlowNode{Name: "干洗02"}) + flow.Add(&FlowNode{Name: "干洗03"}) + + t.Error(spew.Sdump(flow)) +} diff --git a/flowcontext.go b/flowcontext.go new file mode 100644 index 0000000..4555303 --- /dev/null +++ b/flowcontext.go @@ -0,0 +1,59 @@ +package flow + +import ( + "log" + "time" +) + +// FlowContext 流程的上下文, 用于传递每个流程之间的参数 +type FlowContext struct { + RootFlow *Flow + CurrentFlow *FlowNode + + CurrentWrite []byte // 8 byte + CurrentRead []byte // 14 byte +} + +// Sensor 返回传感器数值, 每次调用都是最新的. +func (cxt *FlowContext) Sensor() *Sensor { + buf := make([]byte, 14) + + operator.portReaderLock.Lock() + n, err := operator.port.Read(buf) + cxt.CurrentRead = buf + operator.portReaderLock.Unlock() + if err != nil { + log.Println("read bufferr is error:", err) + } else { + + if n == 14 { + return NewSensor(buf) + } + //TODO: 断包, 沾包 处理 + log.Println("读取长度不等于14, len = ", n) + } + + return nil +} + +// Write 该写入函数包含了 自动封装日志格式 +func (cxt *FlowContext) Write(flag OperatorFlag) { + + operator.portWriterLock.Lock() + wbuf := OperatorOption(flag) + operator.port.Write(wbuf) + operator.port.Flush() + cxt.CurrentWrite = wbuf + operator.portWriterLock.Unlock() + + if len(wbuf) == 8 { + flowpath := cxt.CurrentFlow.Path + ">" + cxt.CurrentFlow.Name // 路径 + _, err := operator.oplog.Exec("insert into operator(ts , rootflow, flowpath, writelog, readlog) values(?,?,?,?,?)", time.Now().Unix(), cxt.RootFlow.Name, flowpath, cxt.CurrentWrite, cxt.CurrentRead) + if err != nil { + log.Println("日志写入错误: ", err) + } + } else { + log.Println("write buffer len is not equal to 8, now is", len(wbuf), "buf: ", wbuf) + } + +} diff --git a/go.mod b/go.mod index 67a9432..32b29d2 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module 474420502.top/test/flow go 1.13 require ( + github.com/davecgh/go-spew v1.1.1 github.com/mattn/go-sqlite3 v1.11.0 github.com/smartystreets/goconvey v1.6.4 // indirect github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07 diff --git a/go.sum b/go.sum index 192565b..7e5aa9f 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= diff --git a/my.cfg b/my.cfg index af338fa..28882bb 100644 --- a/my.cfg +++ b/my.cfg @@ -1,4 +1,4 @@ [config] -portid = /dev/pts3 +portid = /dev/pts/14 baud = 9600 readtimeout = 5 \ No newline at end of file diff --git a/operator.go b/operator.go index 71323d7..d5da1a2 100644 --- a/operator.go +++ b/operator.go @@ -8,6 +8,7 @@ import ( "sync" "time" + _ "github.com/mattn/go-sqlite3" serial "github.com/tarm/serial" "gopkg.in/ini.v1" ) @@ -91,16 +92,16 @@ func init() { var rtimeout time.Duration var cfg *ini.File - cfg, err := ini.Load("my_test.cfg") + cfg, err := ini.Load("my.cfg") if err != nil { log.Println(err) log.Println("加载配置my.cfg失败, 将使用默认值") - f, err := os.OpenFile("./my_test.cfg", os.O_CREATE|os.O_WRONLY, 0666) + f, err := os.OpenFile("./my.cfg", os.O_CREATE|os.O_WRONLY, 0666) if err != nil { panic(err) } f.WriteString("[config]\nportid = COM1\nbaud = 9600\nreadtimeout = 5") - cfg, err = ini.Load("my_test.cfg") + cfg, err = ini.Load("my.cfg") if err != nil { panic(err) }