This commit is contained in:
XM-GO
2023-08-22 15:17:14 +08:00
parent 85f4f328f4
commit 4344771547
143 changed files with 13004 additions and 6957 deletions

View File

@@ -0,0 +1,30 @@
package tdengine
import "time"
type TDEngineTablesList struct {
TableName string `json:"tableName" description:"表名"`
DbName string `json:"dbName" description:"数据库名"`
StableName string `json:"stableName" description:"超级表名"`
CreateTime *time.Time `json:"createTime" description:"创建时间"`
}
type TDEngineTableInfo struct {
Field string `json:"field" description:"字段名"`
Type string `json:"type" description:"类型"`
Length int `json:"length" description:"长度"`
Note string `json:"note" description:"note"`
}
type TableDataInfo struct {
Filed []string `json:"filed" description:"字段"`
Info []map[string]interface{} `json:"info" description:"数据"`
}
// 日志 TDengine
type TdLog struct {
Ts string `json:"ts" dc:"时间"`
Device string `json:"device" dc:"设备标识"`
Type string `json:"type" dc:"日志类型"`
Content string `json:"content" dc:"日志内容"`
}

203
pkg/tdengine/tdengine.go Normal file
View File

@@ -0,0 +1,203 @@
package tdengine
import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/v3/taosRestful"
"time"
)
const (
TIME_TYPE_PROP = "telemetry"
TIME_TYPE_ATRE = "attributes"
TIME_TYPE_LOGS = "logs"
TIME_TYPE_ALARM = "alarm"
TIME_TYPE_EVENT = "event"
)
type TdEngine struct {
db *sql.DB
dbName string
}
func NewTdengine(username, password, host, db string) (*TdEngine, error) {
dsn := fmt.Sprintf("%s:%s@%s(%s)/%s",
username, password, "http", host, db)
open, err := sql.Open("taosRestful", dsn)
return &TdEngine{
db: open,
dbName: db,
}, err
}
// GetTdEngineAllDb 获取所有数据库
func (s *TdEngine) GetTdEngineAllDb() (data []string, err error) {
rows, err := s.db.Query("show databases;")
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var name string
err = rows.Scan(&name)
data = append(data, name)
}
return
}
// GetListTableByDatabases 获取指定数据库下所有的表列表
func (s *TdEngine) GetListTableByDatabases() (data []*TDEngineTablesList, err error) {
rows, err := s.db.Query("SELECT table_name AS tableName, db_name AS dbName, create_time AS createTime, stable_name AS stableName FROM information_schema.ins_tables WHERE db_name = '" + s.dbName + "'")
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var tableName, db, stableName string
var createTime *time.Time
err = rows.Scan(&tableName, &db, &createTime, &stableName)
if err != nil {
return
}
var tDEngineTablesList = new(TDEngineTablesList)
tDEngineTablesList.TableName = tableName
tDEngineTablesList.DbName = db
tDEngineTablesList.StableName = stableName
tDEngineTablesList.CreateTime = createTime
data = append(data, tDEngineTablesList)
}
return
}
// GetTdEngineTableInfoByTable 获取指定数据表结构信息
func (s *TdEngine) GetTdEngineTableInfoByTable(tableName string) (data []*TDEngineTableInfo, err error) {
rows, err := s.db.Query("DESCRIBE " + s.dbName + "." + tableName + ";")
if err != nil {
return
}
defer rows.Close()
for rows.Next() {
var tDEngineTableInfo = new(TDEngineTableInfo)
err = rows.Scan(&tDEngineTableInfo.Field, &tDEngineTableInfo.Type, &tDEngineTableInfo.Length, &tDEngineTableInfo.Note)
if err != nil {
return
}
data = append(data, tDEngineTableInfo)
}
return
}
// GetTdEngineTableDataByTable 获取指定数据表数据信息
func (s *TdEngine) GetTdEngineTableDataByTable(tableName string) (data *TableDataInfo, err error) {
data = new(TableDataInfo)
rows, err := s.db.Query("SELECT * FROM " + tableName)
if err != nil {
return
}
defer rows.Close()
//获取查询结果字段
columns, _ := rows.Columns()
//字段数组
var filed []string
//封装scanArg
scanArgs := make([]any, len(columns))
for i := range columns {
filed = append(filed, columns[i])
scanArgs[i] = &columns[i]
}
data.Filed = append(data.Filed, filed...)
for rows.Next() {
err = rows.Scan(scanArgs...)
if err != nil {
return
}
//封装返回结果
var resultMap = make(map[string]interface{})
for i := range columns {
resultMap[filed[i]] = columns[i]
}
data.Info = append(data.Info, resultMap)
}
return
}
// GetOne 超级表查询,单条数据
func (s *TdEngine) GetOne(sql string, args ...any) (rs map[string]interface{}, err error) {
rows, err := s.db.Query(sql, args...)
if err != nil {
return nil, err
}
defer rows.Close()
columns, _ := rows.Columns()
values := make([]any, len(columns))
rs = make(map[string]interface{}, len(columns))
for i := range values {
values[i] = new(any)
}
for rows.Next() {
err = rows.Scan(values...)
if err != nil {
return nil, err
}
for i, c := range columns {
//rs[c] = s.Time(values[i])
rs[c] = values[i]
}
rows.Close()
}
return
}
// GetAll 超级表查询,多条数据
func (s *TdEngine) GetAll(sql string, args ...any) (rs []map[string]interface{}, err error) {
rows, err := s.db.Query(sql, args...)
if err != nil {
return nil, err
}
defer rows.Close()
columns, _ := rows.Columns()
for rows.Next() {
values := make([]any, len(columns))
for i := range values {
values[i] = new(any)
}
err = rows.Scan(values...)
if err != nil {
return nil, err
}
m := make(map[string]interface{}, len(columns))
for i, c := range columns {
//m[c] = s.Time(gvar.New(values[i]))
m[c] = values[i]
}
rs = append(rs, m)
}
return
}
// REST连接时区处理
func (s *TdEngine) Time(v string) (rs string) {
if t, err := time.Parse("2006-01-02 15:04:05 +0000 UTC", v); err == nil {
rs = t.Local().Format("2006-01-02 15:04:05")
return
}
rs = v
return
}

View File

@@ -0,0 +1,43 @@
package tdengine
import (
"fmt"
"github.com/kakuilan/kgo"
"strings"
)
type ConnectInfo struct {
Ts string `json:"ts"`
ClientID string `json:"clientId"`
Type string `json:"type"` // 连接类型
PeerHost string `json:"peerHost"`
SocketPort string `json:"sockPort"`
Protocol string `json:"protocol"`
DeviceId string `json:"deviceId"`
}
// CreateEventTable 创建设备连接事件表
func (s *TdEngine) CreateEventTable() (err error) {
sql := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.device_connect (ts TIMESTAMP,deviceId NCHAR(64),
type NCHAR(64),clientId NCHAR(64),peerHost NCHAR(64),sockPort NCHAR(64),protocol NCHAR(64))`, s.dbName)
_, err = s.db.Exec(sql)
return
}
func (s *TdEngine) InsertEvent(data map[string]any) (err error) {
if len(data) == 0 {
return
}
var (
field = []string{}
value = []string{}
)
for k, v := range data {
field = append(field, k)
value = append(value, "'"+kgo.KConv.ToStr(v)+"'")
}
sql := "INSERT INTO ? (?) VALUES (?)"
_, err = s.db.Exec(sql, "device_connect", strings.Join(field, ","), strings.Join(value, ","))
return err
}

View File

@@ -0,0 +1,56 @@
package tdengine
import "time"
// CreateLogStable 添加LOG超级表
func (s *TdEngine) CreateLogStable() (err error) {
var name string
err = s.db.QueryRow("SELECT stable_name FROM information_schema.ins_stables WHERE stable_name = 'device_log' LIMIT 1").Scan(&name)
if name != "" {
return
}
sql := "CREATE STABLE device_log (ts TIMESTAMP, type VARCHAR(20), content VARCHAR(1000)) TAGS (device VARCHAR(255))"
_, err = s.db.Exec(sql)
return
}
// InsertLog 写入数据
func (s *TdEngine) InsertLog(log *TdLog) (err error) {
sql := "INSERT INTO ? USING device_log TAGS ('?') VALUES ('?', '?', '?')"
_, err = s.db.Exec(sql, "log_"+log.Device, log.Device, log.Ts, log.Type, log.Content)
return
}
// ClearLog 清理过期数据
func (s *TdEngine) ClearLog() (err error) {
ts := time.Now().Add(-7 * 24 * time.Hour).Format("2006-01-02")
sql := "DELETE FROM device_log WHERE ts < '" + ts + "'"
_, err = s.db.Exec(sql)
return
}
// GetAllLog 超级表查询,多条数据
func (s *TdEngine) GetAllLog(sql string, args ...any) (list []TdLog, err error) {
rows, err := s.db.Query(sql, args...)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var log TdLog
err = rows.Scan(&log.Ts, &log.Type, &log.Content, &log.Device)
if err != nil {
return nil, err
}
log.Ts = s.Time(log.Ts)
list = append(list, log)
}
return
}

View File

@@ -0,0 +1,162 @@
package tdengine
import (
"errors"
"fmt"
"github.com/kakuilan/kgo"
"strconv"
"strings"
)
// RunSql 运行
func (s *TdEngine) RunSql(sql string) (err error) {
_, err = s.db.Exec(sql)
return
}
// InsertDevice 数据入库
func (s *TdEngine) InsertDevice(deviceKey string, data map[string]any) (err error) {
if len(data) == 0 {
return
}
var (
field = []string{}
value = []string{}
)
for k, v := range data {
field = append(field, k)
value = append(value, "'"+kgo.KConv.ToStr(v)+"'")
}
sql := "INSERT INTO ? (?) VALUES (?)"
_, err = s.db.Exec(sql, deviceKey, strings.Join(field, ","), strings.Join(value, ","))
return
}
// CreateStable 创建超级表 rowdata 源数据,在需要数据解析时使用
func (s *TdEngine) CreateStable(table string) (err error) {
columns := []string{"ts TIMESTAMP,rowdata NCHAR(255)"}
sql := fmt.Sprintf("CREATE STABLE IF NOT EXISTS %s.%s (%s) TAGS (device nchar(64))", s.dbName, table, strings.Join(columns, ","))
_, err = s.db.Exec(sql)
return
}
// CreateTable 添加子表
func (s *TdEngine) CreateTable(stable, table string) (err error) {
sql := fmt.Sprintf("CREATE TABLE %s USING %s TAGS ('%s')", table, stable, table)
_, err = s.db.Exec(sql)
return
}
func (s *TdEngine) column(dataType, key, name string, maxLength int) string {
column := ""
comment := ""
if name != "" {
comment = "COMMENT '" + name + "'"
}
tdType := ""
switch dataType {
case "int64":
tdType = "INT"
case "long":
tdType = "BIGINT"
case "float64":
tdType = "FLOAT"
case "double":
tdType = "DOUBLE"
case "string":
if maxLength == 0 {
maxLength = 255
}
tdType = "NCHAR(" + strconv.Itoa(maxLength) + ")"
case "boolean":
tdType = "BOOL"
case "date":
tdType = "TIMESTAMP"
default:
if maxLength == 0 {
maxLength = 255
}
tdType = "NCHAR(" + strconv.Itoa(maxLength) + ")"
}
column = fmt.Sprintf("%s %s %s", key, tdType, comment)
return column
}
// 删除超级表
func (s *TdEngine) DropStable(table string) (err error) {
sql := fmt.Sprintf("DROP STABLE IF EXISTS %s.%s", s.dbName, table)
_, err = s.db.Exec(sql)
return
}
// 删除子表
func (s *TdEngine) DropTable(table string) (err error) {
sql := fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", s.dbName, table)
_, err = s.db.Exec(sql)
return
}
// AddSTableField 添加数据库超级表字段
func (s *TdEngine) AddSTableField(tableName, fieldName string, dataType string, len int) (err error) {
sql := fmt.Sprintf("ALTER STABLE %s.%s ADD COLUMN %s", s.dbName, tableName, s.column(dataType, fieldName, "", len))
_, err = s.db.Exec(sql)
return
}
// AddTableField 添加数据库表字段
func (s *TdEngine) AddTableField(tableName, fieldName string, dataType string, len int) (err error) {
sql := fmt.Sprintf("ALTER TABLE %s.%s ADD COLUMN %s", s.dbName, tableName, s.column(dataType, fieldName, "", len))
_, err = s.db.Exec(sql)
return
}
// DelTableField 删除数据库表字段
func (s *TdEngine) DelTableField(tableName, fieldName string) (err error) {
sql := fmt.Sprintf("ALTER TABLE %s.%s DROP COLUMN %s", s.dbName, tableName, fieldName)
_, err = s.db.Exec(sql)
return
}
// DelSTableField 删除数据库超级表字段
func (s *TdEngine) DelSTableField(tableName, fieldName string) (err error) {
sql := fmt.Sprintf("ALTER STABLE %s.%s DROP COLUMN %s", s.dbName, tableName, fieldName)
_, err = s.db.Exec(sql)
return
}
// ModifyDatabaseField 修改数据库指定字段长度
func (s *TdEngine) ModifyDatabaseField(tableName, fieldName string, dataType string, len int) (err error) {
sql := fmt.Sprintf("ALTER STABLE %s.%s MODIFY COLUMN %s", s.dbName, tableName, s.column(dataType, fieldName, "", len))
_, err = s.db.Exec(sql)
if err != nil {
err = errors.New("设置字段长度失败,长度只能增大不能缩小")
}
return
}
// AddTag 添加标签
func (s *TdEngine) AddTag(tableName, tagName string, dataType string, len int) (err error) {
sql := fmt.Sprintf("ALTER STABLE %s.%s ADD TAG %s", s.dbName, tableName, s.column(dataType, tagName, "", len))
_, err = s.db.Exec(sql)
return
}
// DelTag 删除标签
func (s *TdEngine) DelTag(tableName, tagName string) (err error) {
sql := fmt.Sprintf("ALTER STABLE %s.%s DROP TAG %s", s.dbName, tableName, tagName)
_, err = s.db.Exec(sql)
return
}
// ModifyTag 修改标签
func (s *TdEngine) ModifyTag(tableName, tagName string, dataType string, len int) (err error) {
sql := fmt.Sprintf("ALTER STABLE %s.%s MODIFY TAG %s", s.dbName, tableName, s.column(dataType, tagName, "", len))
_, err = s.db.Exec(sql)
if err != nil {
err = errors.New("设置标签长度失败,长度只能增大不能缩小")
}
return
}