diff --git a/config.yml b/config.yml index 4642600..14838dc 100644 --- a/config.yml +++ b/config.yml @@ -11,8 +11,6 @@ server: http-port: 9002 tcp-port: 9003 cors: true - # 数据上报 队列池 - queue-num: 1000 # 接口限流 rate: enable: true @@ -31,8 +29,9 @@ jwt: expire-time: 604800 #数据上报并发识别任务数量限制 queue: - enable: false - num: 3000 + queue-pool: 5 #消息队列池 + task-num: 100 #任务队列数 + ch-num: 3000 #并发执行数 redis: host: 127.0.0.1 diff --git a/go.mod b/go.mod index 5e89a87..c8e31e7 100644 --- a/go.mod +++ b/go.mod @@ -58,6 +58,8 @@ require ( github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/go-sql-driver/mysql v1.7.0 // indirect + github.com/goccy/go-json v0.9.7 // indirect + github.com/golang-queue/queue v0.1.3 // indirect github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect github.com/golang/mock v1.6.0 // indirect diff --git a/go.sum b/go.sum index 332f867..fce3d7c 100644 --- a/go.sum +++ b/go.sum @@ -127,8 +127,12 @@ github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM= +github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40= +github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= @@ -335,6 +339,7 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= @@ -391,6 +396,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -601,6 +607,7 @@ gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:a gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI= gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/iothub/client/updclient/udp.go b/iothub/client/updclient/udp.go index 748b1eb..2ff6e2d 100644 --- a/iothub/client/updclient/udp.go +++ b/iothub/client/updclient/udp.go @@ -12,7 +12,6 @@ type UdpClientT struct { Addr *net.UDPAddr } -// var UdpClient = make(map[string]*UdpClientT) var UdpClient sync.Map func Send(deviceId, msg string) error { diff --git a/iothub/hook_message_work/hook_service.go b/iothub/hook_message_work/hook_service.go index cd7c9d4..5be9ac1 100644 --- a/iothub/hook_message_work/hook_service.go +++ b/iothub/hook_message_work/hook_service.go @@ -1,6 +1,10 @@ package hook_message_work import ( + "context" + "encoding/json" + "github.com/golang-queue/queue" + "github.com/golang-queue/queue/core" "pandax/iothub/netbase" "pandax/pkg/global" "sync" @@ -8,19 +12,28 @@ import ( type HookService struct { Cache sync.Map - Wg sync.WaitGroup // 优雅关闭 + Queue *queue.Queue Ch chan struct{} // 并发限制 + Wg sync.WaitGroup // 优雅关闭 MessageCh chan *netbase.DeviceEventInfo } func NewHookService() *HookService { hs := &HookService{ Cache: sync.Map{}, - MessageCh: make(chan *netbase.DeviceEventInfo), - } - // 并发限制,代表服务器处理能力 - if global.Conf.Queue.Enable && global.Conf.Queue.Num > 0 { - hs.Ch = make(chan struct{}, global.Conf.Queue.Num) + Ch: make(chan struct{}, global.Conf.Queue.ChNum), + MessageCh: make(chan *netbase.DeviceEventInfo, global.Conf.Queue.TaskNum), } + pool := queue.NewPool(int(global.Conf.Queue.QueuePool), queue.WithFn(func(ctx context.Context, m core.QueuedMessage) error { + v, ok := m.(*netbase.DeviceEventInfo) + if !ok { + if err := json.Unmarshal(m.Bytes(), &v); err != nil { + return err + } + } + hs.MessageCh <- v + return nil + })) + hs.Queue = pool return hs } diff --git a/iothub/netbase/iothub_session.go b/iothub/netbase/iothub_session.go index d9ba545..2684b1e 100644 --- a/iothub/netbase/iothub_session.go +++ b/iothub/netbase/iothub_session.go @@ -1,6 +1,7 @@ package netbase import ( + "encoding/json" "pandax/pkg/global/model" ) @@ -11,3 +12,11 @@ type DeviceEventInfo struct { Type string `json:"type"` RequestId string `json:"requestId"` } + +func (j *DeviceEventInfo) Bytes() []byte { + b, err := json.Marshal(j) + if err != nil { + panic(err) + } + return b +} diff --git a/iothub/server/emqxserver/hook.go b/iothub/server/emqxserver/hook.go index 3362ffb..cdc0f88 100644 --- a/iothub/server/emqxserver/hook.go +++ b/iothub/server/emqxserver/hook.go @@ -281,9 +281,8 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess data.RequestId = id } - //TODO 如果设备消息;量过大,推荐采用NATS队列处理 - s.HookService.MessageCh <- data - + //将数据放到队列中 + s.HookService.Queue.Queue(data) res.Value = &exhook2.ValuedResponse_Message{Message: in.Message} return res, nil } diff --git a/pkg/config/queue.go b/pkg/config/queue.go index b0b0aaa..6103c66 100644 --- a/pkg/config/queue.go +++ b/pkg/config/queue.go @@ -1,6 +1,7 @@ package config type Queue struct { - Enable bool `yaml:" enable"` - Num int64 `yaml:" num"` //并发数 + QueuePool int64 `yaml:"queue-pool"` //消息队列池 + TaskNum int64 `yaml:"task-num"` //任务队列数 + ChNum int64 `yaml:"ch-num"` //并发数 }