From b31f933acabb320ac1580141e333e4a8762bcfd7 Mon Sep 17 00:00:00 2001 From: eson <9673575+githubcontent@user.noreply.gitee.com> Date: Mon, 31 Jul 2023 17:09:23 +0800 Subject: [PATCH] finish --- go.mod | 13 +------ go.sum | 35 ++++------------- main.go | 61 ++++++++++++++++++++++++++++- priority_queue.go | 12 ++++++ sm.go | 71 +++++++++++++++++++++++++--------- start_test.go | 97 ++++++++++++++++++++++++++--------------------- 6 files changed, 189 insertions(+), 100 deletions(-) diff --git a/go.mod b/go.mod index 650bc67..4495b03 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,16 @@ module fusenrender go 1.20 require ( - github.com/dgraph-io/badger/v3 v3.2103.5 github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be + github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 ) require ( + github.com/474420502/batchexecute v0.0.2 // indirect github.com/DataDog/zstd v1.4.5 // indirect github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/VictoriaMetrics/metrics v1.18.1 // indirect github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect - github.com/cespare/xxhash v1.1.0 // indirect github.com/cockroachdb/errors v1.9.0 // indirect github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect github.com/cockroachdb/pebble v0.0.0-20221207173255-0f086d933dac // indirect @@ -31,7 +31,6 @@ require ( github.com/hashicorp/memberlist v0.3.1 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/kr/text v0.2.0 // indirect - github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 // indirect github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376 // indirect github.com/miekg/dns v1.1.26 // indirect github.com/pierrec/lz4/v4 v4.1.14 // indirect @@ -43,22 +42,14 @@ require ( github.com/valyala/histogram v1.2.0 // indirect golang.org/x/crypto v0.11.0 // indirect golang.org/x/exp v0.0.0-20200513190911-00229845015e // indirect - google.golang.org/protobuf v1.26.0 // indirect ) require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/dgraph-io/ristretto v0.1.1 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect - github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect - github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/flatbuffers v1.12.1 // indirect github.com/klauspost/compress v1.12.3 // indirect github.com/pkg/errors v0.9.1 // indirect - go.opencensus.io v0.22.5 // indirect golang.org/x/net v0.12.0 // indirect golang.org/x/sys v0.10.0 // indirect gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index d28f554..b2cc84a 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/474420502/batchexecute v0.0.2 h1:gTMyUh2x6HpV7/+nOLj4qFRO/l5/B/eg7dXsHxLJpbg= +github.com/474420502/batchexecute v0.0.2/go.mod h1:IWazO1QTaB5LyWwMxSqIX/6g/UXwwpnqk0AVM5j24J0= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -13,8 +15,6 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM= -github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= -github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/VictoriaMetrics/metrics v1.18.1 h1:OZ0+kTTto8oPfHnVAnTOoyl0XlRhRkoQrD2n2cOuRw0= github.com/VictoriaMetrics/metrics v1.18.1/go.mod h1:ArjwVz7WpgpegX/JpB0zpNF2h2232kErkEnzH1sxMmA= @@ -25,8 +25,6 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= -github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -61,14 +59,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= -github.com/dgraph-io/badger/v3 v3.2103.5/go.mod h1:4MPiseMeDQ3FNCYwRbbcBOGJLf5jsE0PPFzRiKjtcdw= -github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= -github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -107,10 +99,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= -github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -124,7 +113,6 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -133,8 +121,6 @@ github.com/gomodule/redigo v1.7.1-0.20190724094224-574c33c3df38/go.mod h1:B4C85q github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= -github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -162,6 +148,7 @@ github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uP github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= +github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= @@ -254,6 +241,7 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -262,6 +250,7 @@ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108 github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1lskyM0= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c h1:Lgl0gzECD8GnQ5QCWA8o6BtfL6mDH5rQgM4/fX3avOs= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE= @@ -290,9 +279,6 @@ github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= -github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= -github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= @@ -332,8 +318,6 @@ github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= -go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -396,6 +380,7 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -404,7 +389,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -429,7 +413,6 @@ golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -464,6 +447,7 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= +gonum.org/v1/gonum v0.8.2 h1:CCXrcPKiGGotvnN6jfUsKk4rRqm7q09/YbKb5xCEvtM= gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= @@ -471,13 +455,11 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180518175338-11a468237815/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24= google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= @@ -493,12 +475,11 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= diff --git a/main.go b/main.go index 2a8352c..3d0d793 100644 --- a/main.go +++ b/main.go @@ -1,14 +1,18 @@ package fusenrender import ( + "context" "flag" "fmt" + "log" "os" "os/signal" "path/filepath" "runtime" "syscall" + "time" + "github.com/474420502/batchexecute" "github.com/lni/dragonboat/v4" "github.com/lni/dragonboat/v4/config" "github.com/lni/dragonboat/v4/logger" @@ -18,6 +22,59 @@ func main() { } +func BatchFunc(nh *dragonboat.NodeHost) { + + BatchQueueExecute = batchexecute.NewBatchExecute[bool](func(params *bool) { + + cs := nh.GetNoOPSession(128) + for { + + cmd := Command{ + Name: "dequeue", + Group: "test", + } + + data, err := cmd.Encode() + if err != nil { + log.Println(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + result, err := nh.SyncPropose(ctx, cs, data) + cancel() + if err != nil { + log.Println(err) + break + } else { + if len(result.Data) == 0 { + log.Println("wait 10 second") + var m runtime.MemStats + + runtime.ReadMemStats(&m) + + allocMB := float64(m.Alloc) / 1024 / 1024 + totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024 + sysMB := float64(m.Sys) / 1024 / 1024 + + fmt.Printf("Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", allocMB, totalAllocMB, sysMB) + time.Sleep(time.Second * 10) + break + } else { + var item QueueItem + err := item.Decode(result.Data) + if err != nil { + log.Println(err) + } + log.Println(item) + + } + } + } + + }) + +} + var addresses []string = []string{ "localhost:5500", "localhost:5501", @@ -77,7 +134,7 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat HeartbeatRTT: 1, CheckQuorum: true, - SnapshotEntries: 10, + SnapshotEntries: 100, CompactionOverhead: 5, } @@ -109,6 +166,8 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat os.Exit(1) } + BatchFunc(nh) + return nh } diff --git a/priority_queue.go b/priority_queue.go index 8b6e5c6..2754a2a 100644 --- a/priority_queue.go +++ b/priority_queue.go @@ -19,6 +19,18 @@ type PriorityQueue[T any] struct { mu sync.Mutex } +func (pq *PriorityQueue[T]) Empty() bool { + pq.mu.Lock() + defer pq.mu.Unlock() + return len(pq.Queue) == 0 +} + +func (pq *PriorityQueue[T]) Size() int64 { + pq.mu.Lock() + defer pq.mu.Unlock() + return int64(pq.Queue.Len()) +} + func (pq *PriorityQueue[T]) Push(x *Slice[T]) { pq.mu.Lock() defer pq.mu.Unlock() diff --git a/sm.go b/sm.go index 465ba57..62a45da 100644 --- a/sm.go +++ b/sm.go @@ -9,14 +9,24 @@ import ( "sync" "time" + "github.com/474420502/batchexecute" sm "github.com/lni/dragonboat/v4/statemachine" ) +var BatchQueueExecute *batchexecute.BatchExecute[bool] + +// SMQueue实现了StateMachine接口,用于管理基于PriorityQueue的消息队列 type SMQueue struct { - shardID uint64 + // 所属的Shard ID + shardID uint64 + // Replica ID replicaID uint64 - mu sync.Mutex + counter *batchexecute.BatchExecute[int64] + + // 互斥锁,保护队列Map的并发访问 + mu *sync.Mutex + // 组名到队列的映射 queues map[string]*PriorityQueue[QueueItem] } @@ -35,11 +45,16 @@ func (cmd *Command) Encode() ([]byte, error) { return val, nil } +// QueueItem表示队列中的一个项 type QueueItem struct { - Group string `json:"group"` // 组名 - Priority uint32 `json:"priority"` // 处理的优先级 + // 队列组名 + Group string `json:"group"` // 组名 + // 优先级 + Priority uint32 `json:"priority"` // 处理的优先级 + // 创建时间 CreateAt time.Time `json:"create_at"` // 创建时间 统一utc - Data any `json:"data"` // 操作的数据结构 + // 要排队的数据 + Data any `json:"data"` // 操作的数据结构 } func (item *QueueItem) Encode() ([]byte, error) { @@ -50,17 +65,29 @@ func (item *QueueItem) Encode() ([]byte, error) { return val, nil } +func (item *QueueItem) Decode(buf []byte) error { + err := json.Unmarshal(buf, item) + if err != nil { + return err + } + return nil +} + func (item *QueueItem) GetKey() []byte { return []byte(fmt.Sprintf("%s_%d_%d", item.Group, -item.Priority, item.CreateAt.UTC().Unix())) } // // NewSMQueue creates and return a new ExampleStateMachine object. func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine { - + mu := &sync.Mutex{} return &SMQueue{ shardID: shardID, replicaID: replicaID, - + counter: batchexecute.NewBatchExecute[int64](func(params *int64) { + log.Printf("queue remain: %d\n", *params) + time.Sleep(time.Second * 5) + }), + mu: mu, queues: make(map[string]*PriorityQueue[QueueItem]), } } @@ -73,6 +100,7 @@ func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) { return item, nil } +// Update处理Entry中的更新命令 // Update updates the object using the specified committed raft entry. func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { @@ -98,31 +126,36 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { queue = NewPriorityQueue[QueueItem]() s.queues[cmd.Group] = queue } - defer s.mu.Unlock() + s.mu.Unlock() queue.Push(&Slice[QueueItem]{ Key: key, Value: cmd.Item, }) result.Value = uint64(len(d)) - - log.Printf("%#v, %d, %#v", s.queues, len(queue.Queue), queue.Queue) - + go BatchQueueExecute.Execute(nil) // 通知可以执行update return result, err case "dequeue": - - // prefix := []byte(fmt.Sprintf("%s_", cmd.Group)) - s.mu.Lock() - var queue *PriorityQueue[QueueItem] var ok bool + + s.mu.Lock() if queue, ok = s.queues[cmd.Group]; !ok { queue = NewPriorityQueue[QueueItem]() s.queues[cmd.Group] = queue } - defer s.mu.Unlock() + s.mu.Unlock() + + if queue.Empty() { + return result, nil + } item := queue.Pop() + if item == nil { + result.Value = 0 + return result, nil + } + if item != nil { d, err := item.Encode() if err != nil { @@ -130,8 +163,12 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { } e.Result.Data = d result.Data = d + queue.Empty() + size := queue.Size() + go s.counter.Execute(&size) + // log.Println("queue remain:", queue.Size()) } - log.Printf("%#v", s.queues) + return result, err default: return result, fmt.Errorf("unknonw cmd type: %s", cmd.Name) diff --git a/start_test.go b/start_test.go index 7d87816..ea90179 100644 --- a/start_test.go +++ b/start_test.go @@ -4,9 +4,11 @@ import ( "context" "fusenrender" "log" + "testing" "time" + "github.com/google/uuid" "github.com/lni/goutils/syncutil" ) @@ -20,14 +22,7 @@ func TestStartNodeA(t *testing.T) { panic(err) } - item := &fusenrender.QueueItem{ - Group: "test", - Priority: uint32(1), - CreateAt: time.Now(), - Data: "a", - } - - a := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) raftStopper := syncutil.NewStopper() @@ -36,12 +31,19 @@ func TestStartNodeA(t *testing.T) { raftStopper.RunWorker(func() { // this goroutine makes a linearizable read every 10 second. it returns the // Count value maintained in IStateMachine. see datastore.go for details. - cs := a.GetNoOPSession(128) - ticker := time.NewTicker(5 * time.Second) + cs := nh.GetNoOPSession(128) + ticker := time.NewTicker(100 * time.Millisecond) for { select { case <-ticker.C: - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + + item := &fusenrender.QueueItem{ + Group: "test", + Priority: uint32(2), + CreateAt: time.Now(), + Data: uuid.New().String(), + } + cmd := fusenrender.Command{ Name: "enqueue", Group: "test", @@ -53,12 +55,13 @@ func TestStartNodeA(t *testing.T) { log.Println(err) } - result, err := a.SyncPropose(ctx, cs, data) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + _, err = nh.SyncPropose(ctx, cs, data) if err != nil { log.Println(err) } - log.Println(len(result.Data)) + // log.Println("enqueue", len(result.Data)) cancel() case <-raftStopper.ShouldStop(): @@ -77,13 +80,6 @@ func TestStartNodeB(t *testing.T) { panic(err) } - item := &fusenrender.QueueItem{ - Group: "test", - Priority: uint32(1), - CreateAt: time.Now(), - Data: "b", - } - nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) raftStopper := syncutil.NewStopper() @@ -94,11 +90,17 @@ func TestStartNodeB(t *testing.T) { // this goroutine makes a linearizable read every 10 second. it returns the // Count value maintained in IStateMachine. see datastore.go for details. cs := nh.GetNoOPSession(128) - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(100 * time.Millisecond) for { select { case <-ticker.C: - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + + item := &fusenrender.QueueItem{ + Group: "test", + Priority: uint32(1), + CreateAt: time.Now(), + Data: uuid.New().String(), + } cmd := fusenrender.Command{ Name: "enqueue", Group: "test", @@ -110,12 +112,14 @@ func TestStartNodeB(t *testing.T) { log.Println(err) } - result, err := nh.SyncPropose(ctx, cs, data) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + _, err = nh.SyncPropose(ctx, cs, data) if err != nil { log.Println(err) + } else { + // log.Println("enqueue", len(result.Data)) } - log.Println(result) cancel() case <-raftStopper.ShouldStop(): @@ -136,37 +140,42 @@ func TestStartNodeC(t *testing.T) { } nh := fusenrender.StartNode(svc.ServerID, 128, svc.Address()) + log.Println(nh) raftStopper := syncutil.NewStopper() - - // ch := make(chan string, 16) - raftStopper.RunWorker(func() { // this goroutine makes a linearizable read every 10 second. it returns the // Count value maintained in IStateMachine. see datastore.go for details. - cs := nh.GetNoOPSession(128) - ticker := time.NewTicker(1 * time.Second) + // cs := nh.GetNoOPSession(128) + ticker := time.NewTicker(5 * time.Second) for { select { case <-ticker.C: - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - cmd := fusenrender.Command{ - Name: "dequeue", - Group: "test", - } - data, err := cmd.Encode() - if err != nil { - log.Println(err) - } + // for { - result, err := nh.SyncPropose(ctx, cs, data) - if err != nil { - log.Println(err) - } + // ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + // cmd := fusenrender.Command{ + // Name: "dequeue", + // Group: "test", + // } - log.Println(string(result.Data)) - cancel() + // data, err := cmd.Encode() + // if err != nil { + // log.Println(err) + // } + + // result, err := nh.SyncPropose(ctx, cs, data) + // if err != nil { + // log.Println(err) + // } + + // log.Println(len(result.Data), string(result.Data)) + // cancel() + // if len(result.Data) == 0 { + // break + // } + // } case <-raftStopper.ShouldStop(): return