mirror of
https://gitee.com/XM-GO/PandaX.git
synced 2026-04-23 10:58:35 +08:00
207 lines
4.6 KiB
Go
207 lines
4.6 KiB
Go
package message
|
||
|
||
import (
|
||
"sort"
|
||
"sync"
|
||
)
|
||
|
||
type RuleChainDebugData struct {
|
||
//Data 规则链ID->节点列表调试数据
|
||
Data map[string]*NodeDebugData
|
||
// MaxSize 每个节点允许的最大数量
|
||
MaxSize int
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
// NewRuleChainDebugData 创建一个新的规则链调试数据列表数据
|
||
func NewRuleChainDebugData(maxSize int) *RuleChainDebugData {
|
||
if maxSize <= 0 {
|
||
maxSize = 60
|
||
}
|
||
return &RuleChainDebugData{
|
||
Data: make(map[string]*NodeDebugData),
|
||
MaxSize: maxSize,
|
||
}
|
||
}
|
||
|
||
func (d *RuleChainDebugData) Add(chainId string, nodeId string, data DebugData) {
|
||
d.mu.Lock()
|
||
ruleChainData, ok := d.Data[chainId]
|
||
if !ok {
|
||
ruleChainData = NewNodeDebugData(d.MaxSize)
|
||
d.Data[chainId] = ruleChainData
|
||
}
|
||
defer d.mu.Unlock()
|
||
|
||
ruleChainData.Add(nodeId, data)
|
||
}
|
||
|
||
// Get 获取指定规则链的节点调试数据列表
|
||
func (d *RuleChainDebugData) Get(chainId string, nodeId string) *FixedQueue {
|
||
d.mu.RLock()
|
||
ruleChainData, ok := d.Data[chainId]
|
||
defer d.mu.RUnlock()
|
||
if ok {
|
||
return ruleChainData.Get(nodeId)
|
||
} else {
|
||
return nil
|
||
}
|
||
}
|
||
func (d *RuleChainDebugData) GetToPage(chainId string, nodeId string) DebugDataPage {
|
||
list := d.Get(chainId, nodeId)
|
||
var page = DebugDataPage{}
|
||
if list != nil {
|
||
page.Total = list.Len()
|
||
//ts降序排序
|
||
sort.Slice(list.Items, func(i, j int) bool {
|
||
return list.Items[i].Ts > list.Items[j].Ts
|
||
})
|
||
page.Items = list.Items
|
||
}
|
||
return page
|
||
}
|
||
func (d *RuleChainDebugData) Clear(chainId string) {
|
||
d.mu.Lock()
|
||
defer d.mu.Unlock()
|
||
delete(d.Data, chainId)
|
||
}
|
||
|
||
// NodeDebugData 节点调试数据
|
||
type NodeDebugData struct {
|
||
Data map[string]*FixedQueue
|
||
// MaxSize 每个节点允许的最大数量
|
||
MaxSize int
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
// NewNodeDebugData 创建一个新的节点调试数据列表数据
|
||
func NewNodeDebugData(maxSize int) *NodeDebugData {
|
||
if maxSize <= 0 {
|
||
maxSize = 60
|
||
}
|
||
return &NodeDebugData{
|
||
Data: make(map[string]*FixedQueue),
|
||
MaxSize: maxSize,
|
||
}
|
||
}
|
||
|
||
func (d *NodeDebugData) Add(nodeId string, data DebugData) {
|
||
d.mu.Lock()
|
||
list, ok := d.Data[nodeId]
|
||
if !ok {
|
||
list = NewFixedQueue(d.MaxSize)
|
||
d.Data[nodeId] = list
|
||
}
|
||
defer d.mu.Unlock()
|
||
|
||
list.Push(data)
|
||
}
|
||
|
||
// Get 获取自定节点列表数据
|
||
func (d *NodeDebugData) Get(nodeId string) *FixedQueue {
|
||
d.mu.RLock()
|
||
defer d.mu.RUnlock()
|
||
|
||
if list, ok := d.Data[nodeId]; ok {
|
||
return list
|
||
} else {
|
||
return nil
|
||
}
|
||
|
||
}
|
||
|
||
func (d *NodeDebugData) Clear(nodeId string) {
|
||
d.mu.Lock()
|
||
defer d.mu.Unlock()
|
||
delete(d.Data, nodeId)
|
||
}
|
||
|
||
// DebugData 调试数据
|
||
// OnDebug 回调函数提供的数据
|
||
type DebugData struct {
|
||
Ts string `json:"ts"`
|
||
NodeId string `json:"nodeId"`
|
||
MsgId string `json:"msgId"`
|
||
DebugType string `json:"debugType"` // In or Out
|
||
DeviceName string `json:"deviceName"`
|
||
MsgType string `json:"msgType"`
|
||
Msg Msg `json:"msg"`
|
||
Metadata Metadata `json:"metadata"`
|
||
Error string `json:"error"`
|
||
}
|
||
|
||
// DebugDataPage 分页返回数据
|
||
type DebugDataPage struct {
|
||
//每页多少条,默认读取所有
|
||
Size int `json:"Size"`
|
||
//当前第几页,默认读取所有
|
||
Current int `json:"current"`
|
||
//总数
|
||
Total int `json:"total"`
|
||
//记录
|
||
Items []DebugData `json:"items"`
|
||
}
|
||
|
||
// FixedQueue 固定大小的队列,如果超过会自动清除最旧的数据
|
||
type FixedQueue struct {
|
||
// Items 数据列表
|
||
Items []DebugData
|
||
// MaxSize 最大允许的条数
|
||
MaxSize int
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
// NewFixedQueue 创建一个新的固定大小的队列
|
||
func NewFixedQueue(maxSize int) *FixedQueue {
|
||
return &FixedQueue{
|
||
Items: make([]DebugData, 0, maxSize),
|
||
MaxSize: maxSize,
|
||
}
|
||
}
|
||
|
||
// Push 向队列中添加一个元素,如果超过最大大小,会删除最旧的元素
|
||
func (q *FixedQueue) Push(item DebugData) {
|
||
q.mu.Lock()
|
||
defer q.mu.Unlock()
|
||
if len(q.Items) == q.MaxSize {
|
||
q.Items = q.Items[1:]
|
||
}
|
||
q.Items = append(q.Items, item)
|
||
}
|
||
|
||
// Pop 从队列中弹出一个元素,如果队列为空,返回false
|
||
func (q *FixedQueue) Pop() (DebugData, bool) {
|
||
q.mu.Lock()
|
||
defer q.mu.Unlock()
|
||
if len(q.Items) == 0 {
|
||
return DebugData{}, false
|
||
}
|
||
item := q.Items[0]
|
||
q.Items = q.Items[1:]
|
||
return item, true
|
||
}
|
||
|
||
// Len 返回队列中的元素个数
|
||
func (q *FixedQueue) Len() int {
|
||
q.mu.RLock()
|
||
defer q.mu.RUnlock()
|
||
return len(q.Items)
|
||
}
|
||
|
||
// Peek 返回队列中的第一个元素,但不删除它,如果队列为空,返回false
|
||
func (q *FixedQueue) Peek() (DebugData, bool) {
|
||
q.mu.RLock()
|
||
defer q.mu.RUnlock()
|
||
if len(q.Items) == 0 {
|
||
return DebugData{}, false
|
||
}
|
||
return q.Items[0], true
|
||
}
|
||
|
||
// Clear 清空队列中的所有元素
|
||
func (q *FixedQueue) Clear() {
|
||
q.mu.Lock()
|
||
defer q.mu.Unlock()
|
||
q.Items = make([]DebugData, 0, q.MaxSize)
|
||
}
|