Files
PandaX/pkg/shadow/shadow.go
PandaX eec9f3f004 设备影子添加互斥锁
Signed-off-by: PandaX <18610165312@163.com>
2025-04-14 02:22:09 +00:00

229 lines
5.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package shadow
import (
"errors"
"fmt"
"sync"
"time"
)
var UnknownDeviceErr = errors.New("unknown device")
var DeviceRepeatErr = errors.New("device already exists")
var BindPointDataErr = errors.New("bind online point data can't be parsed")
type OnlineChangeCallback func(deviceName string, online bool) // 设备上/下线回调
// DeviceShadow 设备影子
type DeviceShadow interface {
AddDevice(device Device) (err error)
GetDevice(deviceName string) (device Device, err error)
HasDevice(deviceName string) bool
DeleteDevice(deviceName ...string) error
GetDeviceUpdateAt(deviceName string) (time.Time, error)
GetDeviceStatus(deviceName string) (online bool, err error)
RefreshDeviceStatus(deviceName string)
SetOnline(deviceName string) (err error)
SetOffline(deviceName string) (err error)
SetOnlineChangeCallback(handlerFunc OnlineChangeCallback)
// StopStatusListener 停止设备状态监听
StopStatusListener()
// SetDeviceTTL 设备影子过期时间, 过期设备将下线
SetDeviceTTL(ttl int)
}
type deviceShadow struct {
m *sync.Map
ticker *time.Ticker
mutex *sync.RWMutex // 添加互斥锁
handlerFunc OnlineChangeCallback //上下线执行的回调函数
ttl int
}
var DeviceShadowInstance DeviceShadow
func init() {
shadow := &deviceShadow{
m: &sync.Map{},
ticker: time.NewTicker(time.Second),
ttl: 3600, // 默认1小时
handlerFunc: deviceHandler,
mutex: &sync.RWMutex{},
}
go shadow.checkOnOff()
DeviceShadowInstance = shadow
}
func InitDeviceShadow(deviceName string) Device {
device, err := DeviceShadowInstance.GetDevice(deviceName)
if err == UnknownDeviceErr {
device = NewDevice(deviceName)
DeviceShadowInstance.AddDevice(device)
}
return device
}
func (d *deviceShadow) AddDevice(device Device) (err error) {
if _, ok := d.m.Load(device.Name); ok {
return DeviceRepeatErr
}
device.updatedAt = time.Now()
d.m.Store(device.Name, device)
return nil
}
// SetDeviceTTL 设备影子过期时间
func (d *deviceShadow) SetDeviceTTL(ttl int) {
d.ttl = ttl
}
func (d *deviceShadow) GetDevice(deviceName string) (device Device, err error) {
d.mutex.RLock()
defer d.mutex.RUnlock()
if deviceAny, ok := d.m.Load(deviceName); ok {
return deviceAny.(Device), nil
} else {
return Device{}, UnknownDeviceErr
}
}
func (d *deviceShadow) HasDevice(deviceName string) bool {
if _, ok := d.m.Load(deviceName); ok {
return ok
}
return false
}
func (d *deviceShadow) DeleteDevice(deviceName ...string) error {
if len(deviceName) == 0 {
return nil
}
for _, v := range deviceName {
d.m.Delete(v)
}
return nil
}
func (d *deviceShadow) GetDeviceUpdateAt(deviceName string) (time.Time, error) {
if deviceAny, ok := d.m.Load(deviceName); ok {
return deviceAny.(Device).updatedAt, nil
} else {
return time.Time{}, UnknownDeviceErr
}
}
func (d *deviceShadow) changeOnOff(deviceName string, online bool) (err error) {
deviceAny, ok := d.m.Load(deviceName)
if !ok {
return UnknownDeviceErr
}
device := deviceAny.(Device)
if device.online != online {
device.online = online
device.updatedAt = time.Now()
d.m.Store(deviceName, device)
d.handlerCallback(deviceName, online)
}
return
}
func (d *deviceShadow) GetDeviceStatus(deviceName string) (online bool, err error) {
if deviceAny, ok := d.m.Load(deviceName); ok {
device := deviceAny.(Device)
return device.online, nil
} else {
return false, UnknownDeviceErr
}
}
// RefreshDeviceStatus 刷新设备状态
func (d *deviceShadow) RefreshDeviceStatus(deviceName string) {
if deviceAny, ok := d.m.Load(deviceName); ok {
device := deviceAny.(Device)
if device.online {
return
}
device.online = true
device.updatedAt = time.Now()
d.m.Store(deviceName, device)
}
}
func (d *deviceShadow) SetOnline(deviceName string) (err error) {
return d.changeOnOff(deviceName, true)
}
func (d *deviceShadow) SetOffline(deviceName string) (err error) {
return d.changeOnOff(deviceName, false)
}
func (d *deviceShadow) SetOnlineChangeCallback(handlerFunc OnlineChangeCallback) {
d.handlerFunc = handlerFunc
}
func (d *deviceShadow) StopStatusListener() {
d.ticker.Stop()
}
// 定时检测设备是否离线
func (d *deviceShadow) checkOnOff() {
for range d.ticker.C {
d.m.Range(func(key, value interface{}) bool {
device, ok := value.(Device)
if !ok {
return true
}
if device.online && time.Since(device.updatedAt) > time.Duration(d.ttl)*time.Second {
_ = d.SetOffline(device.Name)
}
return true
})
}
}
func (d *deviceShadow) handlerCallback(deviceName string, online bool) {
if d.handlerFunc != nil {
go d.handlerFunc(deviceName, online)
}
}
// 解析在离线状态绑定点位值支持数据类型int、float、string、bool
func parseOnlineBindPV(pv interface{}) (online bool, err error) {
switch v := pv.(type) {
case string:
return parseStrOnlineBindPV(v)
case int8, int16, int32, int, int64, uint8, uint16, uint32, uint, uint64:
s := fmt.Sprintf("%d", v)
return parseStrOnlineBindPV(s)
case float32, float64:
s := fmt.Sprintf("%.0f", v)
return parseStrOnlineBindPV(s)
case bool:
if v {
return true, nil
}
return
default:
return false, BindPointDataErr
}
}
func parseStrOnlineBindPV(pv string) (online bool, err error) {
onlineList := []string{"on", "online", "1", "true", "yes"}
offlineList := []string{"off", "offline", "0", "false", "no"}
for _, v := range onlineList {
if pv == v {
return true, nil
}
}
for _, v := range offlineList {
if pv == v {
return false, nil
}
}
return false, BindPointDataErr
}