From f9d8fb29958177a5e2472514234f8b1c5faa27be Mon Sep 17 00:00:00 2001 From: PandaX <18610165312@163.com> Date: Mon, 30 Oct 2023 08:10:23 +0800 Subject: [PATCH] =?UTF-8?q?[feat]=E6=B7=BB=E5=8A=A0=E8=A7=84=E5=88=99?= =?UTF-8?q?=E5=BC=95=E6=93=8Edebug=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/rule_engine/message/node_debug_data.go | 206 +++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 pkg/rule_engine/message/node_debug_data.go diff --git a/pkg/rule_engine/message/node_debug_data.go b/pkg/rule_engine/message/node_debug_data.go new file mode 100644 index 0000000..de3ac2b --- /dev/null +++ b/pkg/rule_engine/message/node_debug_data.go @@ -0,0 +1,206 @@ +package message + +import ( + "sort" + "sync" +) + +type RuleChainDebugData struct { + //Data 规则链ID->节点列表调试数据 + Data map[string]*NodeDebugData + // MaxSize 每个节点允许的最大数量 + MaxSize int + mu sync.RWMutex +} + +// NewRuleChainDebugData 创建一个新的规则链调试数据列表数据 +func NewRuleChainDebugData(maxSize int) *RuleChainDebugData { + if maxSize <= 0 { + maxSize = 60 + } + return &RuleChainDebugData{ + Data: make(map[string]*NodeDebugData), + MaxSize: maxSize, + } +} + +func (d *RuleChainDebugData) Add(chainId string, nodeId string, data DebugData) { + d.mu.Lock() + ruleChainData, ok := d.Data[chainId] + if !ok { + ruleChainData = NewNodeDebugData(d.MaxSize) + d.Data[chainId] = ruleChainData + } + defer d.mu.Unlock() + + ruleChainData.Add(nodeId, data) +} + +// Get 获取指定规则链的节点调试数据列表 +func (d *RuleChainDebugData) Get(chainId string, nodeId string) *FixedQueue { + d.mu.RLock() + ruleChainData, ok := d.Data[chainId] + defer d.mu.RUnlock() + if ok { + return ruleChainData.Get(nodeId) + } else { + return nil + } +} +func (d *RuleChainDebugData) GetToPage(chainId string, nodeId string) DebugDataPage { + list := d.Get(chainId, nodeId) + var page = DebugDataPage{} + if list != nil { + page.Total = list.Len() + //ts降序排序 + sort.Slice(list.Items, func(i, j int) bool { + return list.Items[i].Ts > list.Items[j].Ts + }) + page.Items = list.Items + } + return page +} +func (d *RuleChainDebugData) Clear(chainId string) { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.Data, chainId) +} + +// NodeDebugData 节点调试数据 +type NodeDebugData struct { + Data map[string]*FixedQueue + // MaxSize 每个节点允许的最大数量 + MaxSize int + mu sync.RWMutex +} + +// NewNodeDebugData 创建一个新的节点调试数据列表数据 +func NewNodeDebugData(maxSize int) *NodeDebugData { + if maxSize <= 0 { + maxSize = 60 + } + return &NodeDebugData{ + Data: make(map[string]*FixedQueue), + MaxSize: maxSize, + } +} + +func (d *NodeDebugData) Add(nodeId string, data DebugData) { + d.mu.Lock() + list, ok := d.Data[nodeId] + if !ok { + list = NewFixedQueue(d.MaxSize) + d.Data[nodeId] = list + } + defer d.mu.Unlock() + + list.Push(data) +} + +// Get 获取自定节点列表数据 +func (d *NodeDebugData) Get(nodeId string) *FixedQueue { + d.mu.RLock() + defer d.mu.RUnlock() + + if list, ok := d.Data[nodeId]; ok { + return list + } else { + return nil + } + +} + +func (d *NodeDebugData) Clear(nodeId string) { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.Data, nodeId) +} + +// DebugData 调试数据 +// OnDebug 回调函数提供的数据 +type DebugData struct { + Ts string + NodeId string `json:"nodeId"` + MsgId string `json:"msgId"` + DebugType string `json:"debugType"` // In or Out + DeviceName string `json:"deviceName"` + MsgType string `json:"msgType"` + Msg Msg `json:"msg"` + Metadata Metadata `json:"metadata"` + Error string `json:"error"` +} + +// DebugDataPage 分页返回数据 +type DebugDataPage struct { + //每页多少条,默认读取所有 + Size int `json:"Size"` + //当前第几页,默认读取所有 + Current int `json:"current"` + //总数 + Total int `json:"total"` + //记录 + Items []DebugData `json:"items"` +} + +// FixedQueue 固定大小的队列,如果超过会自动清除最旧的数据 +type FixedQueue struct { + // Items 数据列表 + Items []DebugData + // MaxSize 最大允许的条数 + MaxSize int + mu sync.RWMutex +} + +// NewFixedQueue 创建一个新的固定大小的队列 +func NewFixedQueue(maxSize int) *FixedQueue { + return &FixedQueue{ + Items: make([]DebugData, 0, maxSize), + MaxSize: maxSize, + } +} + +// Push 向队列中添加一个元素,如果超过最大大小,会删除最旧的元素 +func (q *FixedQueue) Push(item DebugData) { + q.mu.Lock() + defer q.mu.Unlock() + if len(q.Items) == q.MaxSize { + q.Items = q.Items[1:] + } + q.Items = append(q.Items, item) +} + +// Pop 从队列中弹出一个元素,如果队列为空,返回false +func (q *FixedQueue) Pop() (DebugData, bool) { + q.mu.Lock() + defer q.mu.Unlock() + if len(q.Items) == 0 { + return DebugData{}, false + } + item := q.Items[0] + q.Items = q.Items[1:] + return item, true +} + +// Len 返回队列中的元素个数 +func (q *FixedQueue) Len() int { + q.mu.RLock() + defer q.mu.RUnlock() + return len(q.Items) +} + +// Peek 返回队列中的第一个元素,但不删除它,如果队列为空,返回false +func (q *FixedQueue) Peek() (DebugData, bool) { + q.mu.RLock() + defer q.mu.RUnlock() + if len(q.Items) == 0 { + return DebugData{}, false + } + return q.Items[0], true +} + +// Clear 清空队列中的所有元素 +func (q *FixedQueue) Clear() { + q.mu.Lock() + defer q.mu.Unlock() + q.Items = make([]DebugData, 0, q.MaxSize) +}