[优化]设备数据上传并发处理,添加队列,以及并发数控制

This commit is contained in:
PandaX
2023-11-28 17:59:01 +08:00
parent 58341b0236
commit 287c8a1b05
8 changed files with 45 additions and 16 deletions

View File

@@ -11,8 +11,6 @@ server:
http-port: 9002
tcp-port: 9003
cors: true
# 数据上报 队列池
queue-num: 1000
# 接口限流
rate:
enable: true
@@ -31,8 +29,9 @@ jwt:
expire-time: 604800
#数据上报并发识别任务数量限制
queue:
enable: false
num: 3000
queue-pool: 5 #消息队列池
task-num: 100 #任务队列数
ch-num: 3000 #并发执行数
redis:
host: 127.0.0.1

2
go.mod
View File

@@ -58,6 +58,8 @@ require (
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/goccy/go-json v0.9.7 // indirect
github.com/golang-queue/queue v0.1.3 // indirect
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/golang/mock v1.6.0 // indirect

7
go.sum
View File

@@ -127,8 +127,12 @@ github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/golang-queue/queue v0.1.3 h1:FGIrn8e0fN8EmL3glP0rFEcYVtWUGMEeqX4h4nnzh40=
github.com/golang-queue/queue v0.1.3/go.mod h1:h/PhaoMwT5Jc4sQNus7APgWBUItm6QC9k6JtmwrsRos=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
@@ -335,6 +339,7 @@ github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
@@ -391,6 +396,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
@@ -601,6 +607,7 @@ gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:a
gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI=
gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=

View File

@@ -12,7 +12,6 @@ type UdpClientT struct {
Addr *net.UDPAddr
}
// var UdpClient = make(map[string]*UdpClientT)
var UdpClient sync.Map
func Send(deviceId, msg string) error {

View File

@@ -1,6 +1,10 @@
package hook_message_work
import (
"context"
"encoding/json"
"github.com/golang-queue/queue"
"github.com/golang-queue/queue/core"
"pandax/iothub/netbase"
"pandax/pkg/global"
"sync"
@@ -8,19 +12,28 @@ import (
type HookService struct {
Cache sync.Map
Wg sync.WaitGroup // 优雅关闭
Queue *queue.Queue
Ch chan struct{} // 并发限制
Wg sync.WaitGroup // 优雅关闭
MessageCh chan *netbase.DeviceEventInfo
}
func NewHookService() *HookService {
hs := &HookService{
Cache: sync.Map{},
MessageCh: make(chan *netbase.DeviceEventInfo),
}
// 并发限制,代表服务器处理能力
if global.Conf.Queue.Enable && global.Conf.Queue.Num > 0 {
hs.Ch = make(chan struct{}, global.Conf.Queue.Num)
Ch: make(chan struct{}, global.Conf.Queue.ChNum),
MessageCh: make(chan *netbase.DeviceEventInfo, global.Conf.Queue.TaskNum),
}
pool := queue.NewPool(int(global.Conf.Queue.QueuePool), queue.WithFn(func(ctx context.Context, m core.QueuedMessage) error {
v, ok := m.(*netbase.DeviceEventInfo)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
return err
}
}
hs.MessageCh <- v
return nil
}))
hs.Queue = pool
return hs
}

View File

@@ -1,6 +1,7 @@
package netbase
import (
"encoding/json"
"pandax/pkg/global/model"
)
@@ -11,3 +12,11 @@ type DeviceEventInfo struct {
Type string `json:"type"`
RequestId string `json:"requestId"`
}
func (j *DeviceEventInfo) Bytes() []byte {
b, err := json.Marshal(j)
if err != nil {
panic(err)
}
return b
}

View File

@@ -281,9 +281,8 @@ func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.Mess
data.RequestId = id
}
//TODO 如果设备消息量过大推荐采用NATS队列处理
s.HookService.MessageCh <- data
//将数据放到队列中
s.HookService.Queue.Queue(data)
res.Value = &exhook2.ValuedResponse_Message{Message: in.Message}
return res, nil
}

View File

@@ -1,6 +1,7 @@
package config
type Queue struct {
Enable bool `yaml:" enable"`
Num int64 `yaml:" num"` //并发
QueuePool int64 `yaml:"queue-pool"` //消息队列池
TaskNum int64 `yaml:"task-num"` //任务队列
ChNum int64 `yaml:"ch-num"` //并发数
}