Files
PandaX/pkg/rule_engine/message/node_debug_data.go
2023-10-30 09:26:01 +08:00

207 lines
4.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package message
import (
"sort"
"sync"
)
type RuleChainDebugData struct {
//Data 规则链ID->节点列表调试数据
Data map[string]*NodeDebugData
// MaxSize 每个节点允许的最大数量
MaxSize int
mu sync.RWMutex
}
// NewRuleChainDebugData 创建一个新的规则链调试数据列表数据
func NewRuleChainDebugData(maxSize int) *RuleChainDebugData {
if maxSize <= 0 {
maxSize = 60
}
return &RuleChainDebugData{
Data: make(map[string]*NodeDebugData),
MaxSize: maxSize,
}
}
func (d *RuleChainDebugData) Add(chainId string, nodeId string, data DebugData) {
d.mu.Lock()
ruleChainData, ok := d.Data[chainId]
if !ok {
ruleChainData = NewNodeDebugData(d.MaxSize)
d.Data[chainId] = ruleChainData
}
defer d.mu.Unlock()
ruleChainData.Add(nodeId, data)
}
// Get 获取指定规则链的节点调试数据列表
func (d *RuleChainDebugData) Get(chainId string, nodeId string) *FixedQueue {
d.mu.RLock()
ruleChainData, ok := d.Data[chainId]
defer d.mu.RUnlock()
if ok {
return ruleChainData.Get(nodeId)
} else {
return nil
}
}
func (d *RuleChainDebugData) GetToPage(chainId string, nodeId string) DebugDataPage {
list := d.Get(chainId, nodeId)
var page = DebugDataPage{}
if list != nil {
page.Total = list.Len()
//ts降序排序
sort.Slice(list.Items, func(i, j int) bool {
return list.Items[i].Ts > list.Items[j].Ts
})
page.Items = list.Items
}
return page
}
func (d *RuleChainDebugData) Clear(chainId string) {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.Data, chainId)
}
// NodeDebugData 节点调试数据
type NodeDebugData struct {
Data map[string]*FixedQueue
// MaxSize 每个节点允许的最大数量
MaxSize int
mu sync.RWMutex
}
// NewNodeDebugData 创建一个新的节点调试数据列表数据
func NewNodeDebugData(maxSize int) *NodeDebugData {
if maxSize <= 0 {
maxSize = 60
}
return &NodeDebugData{
Data: make(map[string]*FixedQueue),
MaxSize: maxSize,
}
}
func (d *NodeDebugData) Add(nodeId string, data DebugData) {
d.mu.Lock()
list, ok := d.Data[nodeId]
if !ok {
list = NewFixedQueue(d.MaxSize)
d.Data[nodeId] = list
}
defer d.mu.Unlock()
list.Push(data)
}
// Get 获取自定节点列表数据
func (d *NodeDebugData) Get(nodeId string) *FixedQueue {
d.mu.RLock()
defer d.mu.RUnlock()
if list, ok := d.Data[nodeId]; ok {
return list
} else {
return nil
}
}
func (d *NodeDebugData) Clear(nodeId string) {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.Data, nodeId)
}
// DebugData 调试数据
// OnDebug 回调函数提供的数据
type DebugData struct {
Ts string `json:"ts"`
NodeId string `json:"nodeId"`
MsgId string `json:"msgId"`
DebugType string `json:"debugType"` // In or Out
DeviceName string `json:"deviceName"`
MsgType string `json:"msgType"`
Msg Msg `json:"msg"`
Metadata Metadata `json:"metadata"`
Error string `json:"error"`
}
// DebugDataPage 分页返回数据
type DebugDataPage struct {
//每页多少条,默认读取所有
Size int `json:"Size"`
//当前第几页,默认读取所有
Current int `json:"current"`
//总数
Total int `json:"total"`
//记录
Items []DebugData `json:"items"`
}
// FixedQueue 固定大小的队列,如果超过会自动清除最旧的数据
type FixedQueue struct {
// Items 数据列表
Items []DebugData
// MaxSize 最大允许的条数
MaxSize int
mu sync.RWMutex
}
// NewFixedQueue 创建一个新的固定大小的队列
func NewFixedQueue(maxSize int) *FixedQueue {
return &FixedQueue{
Items: make([]DebugData, 0, maxSize),
MaxSize: maxSize,
}
}
// Push 向队列中添加一个元素,如果超过最大大小,会删除最旧的元素
func (q *FixedQueue) Push(item DebugData) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.Items) == q.MaxSize {
q.Items = q.Items[1:]
}
q.Items = append(q.Items, item)
}
// Pop 从队列中弹出一个元素如果队列为空返回false
func (q *FixedQueue) Pop() (DebugData, bool) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.Items) == 0 {
return DebugData{}, false
}
item := q.Items[0]
q.Items = q.Items[1:]
return item, true
}
// Len 返回队列中的元素个数
func (q *FixedQueue) Len() int {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.Items)
}
// Peek 返回队列中的第一个元素但不删除它如果队列为空返回false
func (q *FixedQueue) Peek() (DebugData, bool) {
q.mu.RLock()
defer q.mu.RUnlock()
if len(q.Items) == 0 {
return DebugData{}, false
}
return q.Items[0], true
}
// Clear 清空队列中的所有元素
func (q *FixedQueue) Clear() {
q.mu.Lock()
defer q.mu.Unlock()
q.Items = make([]DebugData, 0, q.MaxSize)
}