mirror of
https://gitee.com/XM-GO/PandaX.git
synced 2026-04-23 02:48:34 +08:00
@@ -6,12 +6,7 @@ const logTableName = "device_log"
|
||||
|
||||
// CreateLogStable 添加LOG超级表
|
||||
func (s *TdEngine) CreateLogStable() (err error) {
|
||||
var name string
|
||||
err = s.db.QueryRow("SELECT stable_name FROM information_schema.ins_stables WHERE stable_name = 'device_log' LIMIT 1").Scan(&name)
|
||||
if name != "" {
|
||||
return
|
||||
}
|
||||
sql := "CREATE STABLE device_log (ts TIMESTAMP, type VARCHAR(20), content VARCHAR(1000)) TAGS (device VARCHAR(255))"
|
||||
sql := "CREATE STABLE IF NOT EXISTS device_log (ts TIMESTAMP, type VARCHAR(20), content VARCHAR(1000)) TAGS (device VARCHAR(255))"
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
}
|
||||
@@ -28,8 +23,8 @@ func (s *TdEngine) InsertLog(log *TdLog) (err error) {
|
||||
func (s *TdEngine) ClearLog() (err error) {
|
||||
ts := time.Now().Add(-7 * 24 * time.Hour).Format("2006-01-02")
|
||||
|
||||
sql := "DELETE FROM device_log WHERE ts < '" + ts + "'"
|
||||
_, err = s.db.Exec(sql)
|
||||
sql := "DELETE FROM device_log WHERE ts < ?"
|
||||
_, err = s.db.Exec(sql, ts)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -3,27 +3,26 @@ package tdengine
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/kakuilan/kgo"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// RunSql 运行
|
||||
func (s *TdEngine) RunSql(sql string) (err error) {
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
func (s *TdEngine) RunSql(sql string) error {
|
||||
_, err := s.db.Exec(sql)
|
||||
return err
|
||||
}
|
||||
|
||||
// InsertDevice 数据入库
|
||||
func (s *TdEngine) InsertDevice(deviceKey string, data map[string]interface{}) (err error) {
|
||||
func (s *TdEngine) InsertDevice(deviceKey string, data map[string]interface{}) error {
|
||||
if len(data) == 0 {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
field = []string{}
|
||||
value = []interface{}{}
|
||||
placeholders = []string{}
|
||||
field []string
|
||||
value []interface{}
|
||||
placeholders []string
|
||||
)
|
||||
|
||||
for k, v := range data {
|
||||
@@ -33,24 +32,24 @@ func (s *TdEngine) InsertDevice(deviceKey string, data map[string]interface{}) (
|
||||
}
|
||||
|
||||
sql := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", strings.ToLower(deviceKey), strings.Join(field, ","), strings.Join(placeholders, ","))
|
||||
_, err = s.db.Exec(sql, value...)
|
||||
_, err := s.db.Exec(sql, value...)
|
||||
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// CreateStable 创建超级表 rowdata 源数据,在需要数据解析时使用
|
||||
func (s *TdEngine) CreateStable(table string) (err error) {
|
||||
func (s *TdEngine) CreateStable(table string) error {
|
||||
columns := []string{"ts TIMESTAMP,rowdata NCHAR(255)"}
|
||||
sql := fmt.Sprintf("CREATE STABLE IF NOT EXISTS %s.%s (%s) TAGS (device nchar(64))", s.dbName, table, strings.Join(columns, ","))
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
_, err := s.db.Exec(sql)
|
||||
return err
|
||||
}
|
||||
|
||||
// CreateTable 添加子表
|
||||
func (s *TdEngine) CreateTable(stable, table string) (err error) {
|
||||
func (s *TdEngine) CreateTable(stable, table string) error {
|
||||
sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s USING %s TAGS ('%s')", table, stable, table)
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
_, err := s.db.Exec(sql)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *TdEngine) column(dataType, key, name string, maxLength int) string {
|
||||
@@ -89,77 +88,77 @@ func (s *TdEngine) column(dataType, key, name string, maxLength int) string {
|
||||
}
|
||||
|
||||
// 删除超级表
|
||||
func (s *TdEngine) DropStable(table string) (err error) {
|
||||
func (s *TdEngine) DropStable(table string) error {
|
||||
sql := fmt.Sprintf("DROP STABLE IF EXISTS %s.%s", s.dbName, strings.ToLower(table))
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
_, err := s.db.Exec(sql)
|
||||
return err
|
||||
}
|
||||
|
||||
// 删除子表
|
||||
func (s *TdEngine) DropTable(table string) (err error) {
|
||||
func (s *TdEngine) DropTable(table string) error {
|
||||
sql := fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", s.dbName, strings.ToLower(table))
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
_, err := s.db.Exec(sql)
|
||||
return err
|
||||
}
|
||||
|
||||
// AddSTableField 添加数据库超级表字段
|
||||
func (s *TdEngine) AddSTableField(tableName, fieldName string, dataType string, len int) (err error) {
|
||||
func (s *TdEngine) AddSTableField(tableName, fieldName string, dataType string, len int) error {
|
||||
sql := fmt.Sprintf("ALTER STABLE %s.%s ADD COLUMN %s", s.dbName, strings.ToLower(tableName), s.column(dataType, fieldName, "", len))
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
_, err := s.db.Exec(sql)
|
||||
return err
|
||||
}
|
||||
|
||||
// AddTableField 添加数据库表字段
|
||||
func (s *TdEngine) AddTableField(tableName, fieldName string, dataType string, len int) (err error) {
|
||||
func (s *TdEngine) AddTableField(tableName, fieldName string, dataType string, len int) error {
|
||||
sql := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN %s", s.dbName, strings.ToLower(tableName), s.column(dataType, fieldName, "", len))
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
_, err := s.db.Exec(sql)
|
||||
return err
|
||||
}
|
||||
|
||||
// DelTableField 删除数据库表字段
|
||||
func (s *TdEngine) DelTableField(tableName, fieldName string) (err error) {
|
||||
func (s *TdEngine) DelTableField(tableName, fieldName string) error {
|
||||
sql := fmt.Sprintf("ALTER TABLE %s.%s DROP COLUMN %s", s.dbName, strings.ToLower(tableName), fieldName)
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
_, err := s.db.Exec(sql)
|
||||
return err
|
||||
}
|
||||
|
||||
// DelSTableField 删除数据库超级表字段
|
||||
func (s *TdEngine) DelSTableField(tableName, fieldName string) (err error) {
|
||||
func (s *TdEngine) DelSTableField(tableName, fieldName string) error {
|
||||
sql := fmt.Sprintf("ALTER STABLE %s.%s DROP COLUMN %s", s.dbName, strings.ToLower(tableName), fieldName)
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
_, err := s.db.Exec(sql)
|
||||
return err
|
||||
}
|
||||
|
||||
// ModifyDatabaseField 修改数据库指定字段长度
|
||||
func (s *TdEngine) ModifyDatabaseField(tableName, fieldName string, dataType string, len int) (err error) {
|
||||
func (s *TdEngine) ModifyDatabaseField(tableName, fieldName string, dataType string, len int) error {
|
||||
sql := fmt.Sprintf("ALTER STABLE %s.%s MODIFY COLUMN %s", s.dbName, strings.ToLower(tableName), s.column(dataType, fieldName, "", len))
|
||||
_, err = s.db.Exec(sql)
|
||||
_, err := s.db.Exec(sql)
|
||||
if err != nil {
|
||||
err = errors.New("设置字段长度失败,长度只能增大不能缩小")
|
||||
return errors.New("设置字段长度失败,长度只能增大不能缩小")
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddTag 添加标签
|
||||
func (s *TdEngine) AddTag(tableName, tagName string, dataType string, len int) (err error) {
|
||||
func (s *TdEngine) AddTag(tableName, tagName string, dataType string, len int) error {
|
||||
sql := fmt.Sprintf("ALTER STABLE %s.%s ADD TAG %s", s.dbName, tableName, s.column(dataType, tagName, "", len))
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
_, err := s.db.Exec(sql)
|
||||
return err
|
||||
}
|
||||
|
||||
// DelTag 删除标签
|
||||
func (s *TdEngine) DelTag(tableName, tagName string) (err error) {
|
||||
func (s *TdEngine) DelTag(tableName, tagName string) error {
|
||||
sql := fmt.Sprintf("ALTER STABLE %s.%s DROP TAG %s", s.dbName, tableName, tagName)
|
||||
_, err = s.db.Exec(sql)
|
||||
return
|
||||
_, err := s.db.Exec(sql)
|
||||
return err
|
||||
}
|
||||
|
||||
// ModifyTag 修改标签
|
||||
func (s *TdEngine) ModifyTag(tableName, tagName string, dataType string, len int) (err error) {
|
||||
func (s *TdEngine) ModifyTag(tableName, tagName string, dataType string, len int) error {
|
||||
sql := fmt.Sprintf("ALTER STABLE %s.%s MODIFY TAG %s", s.dbName, tableName, s.column(dataType, tagName, "", len))
|
||||
_, err = s.db.Exec(sql)
|
||||
_, err := s.db.Exec(sql)
|
||||
if err != nil {
|
||||
err = errors.New("设置标签长度失败,长度只能增大不能缩小")
|
||||
return errors.New("设置标签长度失败,长度只能增大不能缩小")
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user