diff --git a/pkg/rule_engine/nodes/action_save_timeseries_node.go b/pkg/rule_engine/nodes/action_save_timeseries_node.go index 448d8e9..5362673 100644 --- a/pkg/rule_engine/nodes/action_save_timeseries_node.go +++ b/pkg/rule_engine/nodes/action_save_timeseries_node.go @@ -40,7 +40,27 @@ func (n *saveTimeSeriesNode) Handle(msg *message.Message) error { } else { return errors.New("元数据中为获取到设备ID") } - err := global.TdDb.InsertDevice(deviceName+"_telemetry", msg.Msg) + msgData := make(map[string]any) + // 去掉多余的参数 + if pid, ok := msg.Metadata.GetValue("productId").(string); ok { + pts, err := services.ProductTemplateModelDao.FindList(entity.ProductTemplate{Pid: pid, Classify: strings.ToLower(message.TelemetrgMes)}) + if err != nil { + return errors.New("为获取到设备物模型信息") + } + for _, pt := range *pts { + value := msg.Msg.GetValue(pt.key) + msgData[pt.key] = value + } + } else { + return errors.New("元素组中为获取到设备ID") + } + ts := msg.Msg.GetValue("ts") + if ts == nil { + msgData["ts"] = time.Now().Local().Format("2006-01-02 15:04:05.000") + } else { + msgData["ts"] = ts + } + err := global.TdDb.InsertDevice(deviceName+"_telemetry", msgData) if err != nil { n.Debug(msg, message.DEBUGOUT, err.Error()) if failureLabelNode != nil {