【调整】暗色主题样式

This commit is contained in:
cai
2025-12-12 17:39:52 +08:00
parent 13669666e4
commit d01b42139c
1199 changed files with 203816 additions and 4592 deletions

View File

@@ -0,0 +1,33 @@
package workflow
import "ALLinSSL/backend/public"
func NewExecutionContext(RunID string) *ExecutionContext {
Logger, _ := public.NewLogger(public.GetSettingIgnoreError("workflow_log_path") + RunID + ".log")
return &ExecutionContext{
Data: make(map[string]any),
Status: make(map[string]ExecutionStatus),
RunID: RunID,
Logger: Logger,
}
}
func (ctx *ExecutionContext) SetOutput(nodeID string, output any, status ExecutionStatus) {
ctx.mu.Lock()
defer ctx.mu.Unlock()
ctx.Data[nodeID] = output
ctx.Status[nodeID] = status
}
func (ctx *ExecutionContext) GetOutput(nodeID string) (any, bool) {
ctx.mu.RLock()
defer ctx.mu.RUnlock()
out, ok := ctx.Data[nodeID]
return out, ok
}
func (ctx *ExecutionContext) GetStatus(nodeID string) ExecutionStatus {
ctx.mu.RLock()
defer ctx.mu.RUnlock()
return ctx.Status[nodeID]
}

View File

@@ -0,0 +1,259 @@
package workflow
import (
"ALLinSSL/backend/internal/cert"
certApply "ALLinSSL/backend/internal/cert/apply"
certDeploy "ALLinSSL/backend/internal/cert/deploy"
"ALLinSSL/backend/internal/private_ca"
"ALLinSSL/backend/internal/report"
"ALLinSSL/backend/public"
"errors"
"fmt"
"strconv"
)
// var executors map[string]func(map[string]any) (any, error)
//
// func RegistExector(executorName string, executor func(map[string]any) (any, error)) {
// executors[executorName] = executor
// }
func Executors(exec string, params map[string]any) (any, error) {
switch exec {
case "apply":
return apply(params)
case "deploy":
return deploy(params)
case "upload":
return upload(params)
case "notify":
return notify(params)
case "private_ca":
return privateCa(params)
default:
return nil, nil
}
}
func privateCa(params map[string]any) (any, error) {
logger := params["logger"].(*public.Logger)
logger.Info("=============私有CA签发证书=============")
certificate, err := private_ca.WorkflowCreateLeafCert(params, logger)
if err != nil {
logger.Error(err.Error())
logger.Info("=============签发失败=============")
return nil, err
}
logger.Info("=============签发成功=============")
return certificate, nil
}
func apply(params map[string]any) (any, error) {
logger := params["logger"].(*public.Logger)
logger.Info("=============申请证书=============")
certificate, err := certApply.Apply(params, logger)
if err != nil {
logger.Error(err.Error())
logger.Info("=============申请失败=============")
return nil, err
}
logger.Info("=============申请成功=============")
return certificate, nil
}
func deploy(params map[string]any) (any, error) {
logger := params["logger"].(*public.Logger)
logger.Info("=============部署证书=============")
certificate := params["certificate"]
if certificate == nil {
logger.Error("证书不存在")
logger.Info("=============部署失败=============")
return nil, errors.New("证书不存在")
}
certificateMap, ok := params["certificate"].(map[string]any)
if !ok {
logger.Error("证书不存在")
logger.Info("=============部署失败=============")
return nil, errors.New("证书不存在")
}
certStr, ok := certificateMap["cert"].(string)
if !ok {
logger.Error("证书格式错误")
logger.Info("=============部署失败=============")
return nil, errors.New("证书格式错误")
}
nowSha256, err := public.GetSHA256(certStr)
if err != nil {
logger.Error("解析证书sha256失败" + err.Error())
logger.Info("=============部署失败=============")
return nil, err
}
s, err := public.NewSqlite("data/data.db", "")
if err != nil {
logger.Error("新建数据库连接失败" + err.Error())
logger.Info("=============部署失败=============")
return nil, err
}
defer s.Close()
s.TableName = "workflow_history"
historyData, err := s.Where("id=?", []any{params["_runId"]}).Find()
if err != nil {
logger.Error("查询表workflow_history失败" + err.Error())
logger.Info("=============部署失败=============")
return nil, err
}
workflowId := historyData["workflow_id"]
s.TableName = "workflow_deploy"
deployData, err := s.Where("workflow_id=? and id=?", []any{workflowId, params["NodeId"]}).Select()
if err != nil {
logger.Error("查询表workflow_deploy失败" + err.Error())
logger.Info("=============部署失败=============")
return nil, err
}
if params["skip"] != nil {
var skip int
switch v := params["skip"].(type) {
case int:
skip = v
case float64:
skip = int(v)
case string:
skip, _ = strconv.Atoi(v)
}
if skip == 1 {
if len(deployData) > 0 {
beSha256, ok := deployData[0]["cert_hash"].(string)
if !ok {
logger.Error("证书hash格式错误")
logger.Info("=============部署失败=============")
return nil, errors.New("证书hash格式错误")
}
if beSha256 == nowSha256 && deployData[0]["status"].(string) == "success" {
logger.Info("与上次部署的证书sha256相同且上次部署成功跳过重复部署")
logger.Info("=============部署成功=============")
return map[string]any{
"skip": true,
}, nil
}
}
}
}
err = certDeploy.Deploy(params, logger)
var status string
if err != nil {
status = "fail"
logger.Error(err.Error())
logger.Info("=============部署失败=============")
} else {
status = "success"
logger.Info("=============部署成功=============")
}
if len(deployData) > 0 {
s.Where("workflow_id=? and id=?", []any{workflowId, params["NodeId"]}).Update(map[string]interface{}{"cert_hash": nowSha256, "status": status})
} else {
s.Insert(map[string]interface{}{"cert_hash": nowSha256, "workflow_id": workflowId, "id": params["NodeId"], "status": status})
}
return nil, err
}
func upload(params map[string]any) (any, error) {
logger := params["logger"].(*public.Logger)
logger.Info("=============上传证书=============")
// 判断证书id走本地还是走旧上传应在之后的迭代中移除旧代码
if params["cert_id"] == nil {
keyStr, ok := params["key"].(string)
if !ok {
logger.Error("上传的密钥有误")
logger.Info("=============上传失败=============")
return nil, errors.New("上传的密钥有误")
}
certStr, ok := params["cert"].(string)
if !ok {
logger.Error("上传的证书有误")
logger.Info("=============上传失败=============")
return nil, errors.New("上传的证书有误")
}
_, err := cert.UploadCert(keyStr, certStr)
if err != nil {
logger.Error(err.Error())
logger.Info("=============上传失败=============")
return nil, err
}
logger.Info("=============上传成功=============")
return params, nil
} else {
certId := ""
switch v := params["cert_id"].(type) {
case float64:
certId = strconv.Itoa(int(v))
case string:
certId = v
default:
logger.Info("=============上传证书获取失败=============")
return nil, errors.New("证书 ID 类型错误")
}
result := map[string]any{}
certObj, err := cert.GetCert(certId)
if err != nil {
logger.Error(err.Error())
logger.Info("=============上传证书获取失败=============")
return nil, err
}
if certObj == nil {
logger.Error("证书不存在")
logger.Info("=============上传证书获取失败=============")
return nil, errors.New("证书不存在")
}
logger.Debug(fmt.Sprintf("证书 ID: %s", certId))
result["cert"] = certObj["cert"]
result["key"] = certObj["key"]
return result, nil
}
}
func notify(params map[string]any) (any, error) {
// fmt.Println("通知:", params)
logger := params["logger"].(*public.Logger)
logger.Info("=============发送通知=============")
if fromNodeData, ok := params["fromNodeData"].(map[string]any); ok && fromNodeData != nil {
if v, ok := fromNodeData["skip"].(bool); ok && v {
// 如果 skip 是 true则跳过通知
var skip bool
switch v := params["skip"].(type) {
case int:
skip = v == 1
case float64:
skip = v == 1
case string:
skip = v == "1" || v == "true"
case bool:
skip = v
default:
skip = false
}
if skip {
logger.Debug("上个节点已跳过操作,跳过通知")
logger.Info("=============发送执行完成=============")
return map[string]any{
"skip": true,
}, nil
}
}
}
logger.Debug(fmt.Sprintf("发送通知:%s", params["subject"].(string)))
err := report.Notify(params)
if err != nil {
logger.Error(err.Error())
logger.Info("=============发送失败=============")
return nil, err
}
logger.Info("=============发送成功=============")
return fmt.Sprintf("通知到: %s", params["message"]), nil
}

View File

@@ -0,0 +1,49 @@
package workflow
import (
"ALLinSSL/backend/public"
"sync"
)
type ExecutionStatus string
const (
StatusSuccess ExecutionStatus = "success"
StatusFailed ExecutionStatus = "fail"
)
type WorkflowNodeParams struct {
Name string `json:"name"`
FromNodeID string `json:"fromNodeId,omitempty"`
}
type WorkflowNode struct {
Id string `json:"id"`
Type string `json:"type"`
Name string `json:"name"`
Config map[string]any `json:"config"`
Inputs []WorkflowNodeParams `json:"inputs"`
// Outputs []WorkflowNodeParams `json:"outputs"`
ChildNode *WorkflowNode `json:"childNode,omitempty"`
ConditionNodes []*WorkflowNode `json:"conditionNodes,omitempty"`
Validated bool `json:"validated"`
}
type ExecutionContext struct {
Data map[string]any
Status map[string]ExecutionStatus
mu sync.RWMutex
RunID string
Logger *public.Logger
}
type ExecTime struct {
Type string `json:"type"`
Month int `json:"month,omitempty"`
Week int `json:"week,omitempty"`
Hour int `json:"hour"`
Minute int `json:"minute"`
}

View File

@@ -0,0 +1,326 @@
package workflow
import (
"ALLinSSL/backend/public"
"encoding/json"
"fmt"
"sync"
"time"
)
func GetSqlite() (*public.Sqlite, error) {
s, err := public.NewSqlite("data/data.db", "")
if err != nil {
return nil, err
}
s.TableName = "workflow"
return s, nil
}
func GetList(search string, p, limit int64) ([]map[string]any, int, error) {
var data []map[string]any
var count int64
s, err := GetSqlite()
if err != nil {
return data, 0, err
}
defer s.Close()
var limits []int64
if p >= 0 && limit >= 0 {
limits = []int64{0, limit}
if p > 1 {
limits[0] = (p - 1) * limit
limits[1] = limit
}
}
if search != "" {
count, err = s.Where("name like ?", []interface{}{"%" + search + "%"}).Count()
data, err = s.Where("name like ?", []interface{}{"%" + search + "%"}).Order("update_time", "desc").Limit(limits).Select()
} else {
count, err = s.Count()
data, err = s.Order("update_time", "desc").Limit(limits).Select()
}
if err != nil {
return data, 0, err
}
return data, int(count), nil
}
func AddWorkflow(name, content, execType, active, execTime string) error {
var node WorkflowNode
err := json.Unmarshal([]byte(content), &node)
if err != nil {
return fmt.Errorf("检测到工作流配置有问题:%v", err)
}
s, err := GetSqlite()
if err != nil {
return err
}
defer s.Close()
now := time.Now().Format("2006-01-02 15:04:05")
_, err = s.Insert(map[string]interface{}{
"name": name,
"content": content,
"exec_type": execType,
"active": active,
"exec_time": execTime,
"create_time": now,
"update_time": now,
})
if err != nil {
return err
}
return nil
}
func DelWorkflow(id string) error {
s, err := GetSqlite()
if err != nil {
return err
}
defer s.Close()
_, err = s.Where("id=?", []interface{}{id}).Delete()
if err != nil {
return err
}
// 清理工作流历史记录
err = CleanWorkflowHistory()
if err != nil {
return fmt.Errorf("清理工作流历史记录失败: %v", err)
}
return nil
}
func UpdDb(id string, data map[string]any) error {
s, err := GetSqlite()
if err != nil {
return err
}
defer s.Close()
data["update_time"] = time.Now().Format("2006-01-02 15:04:05")
_, err = s.Where("id=?", []interface{}{id}).Update(data)
if err != nil {
return err
}
return nil
}
func UpdWorkflow(id, name, content, execType, active, execTime string) error {
var node WorkflowNode
err := json.Unmarshal([]byte(content), &node)
if err != nil {
return fmt.Errorf("检测到工作流配置有问题:%v", err)
}
err = UpdDb(id, map[string]interface{}{
"name": name,
"content": content,
"exec_type": execType,
"active": active,
"exec_time": execTime,
})
if err != nil {
return err
}
return nil
}
func UpdExecType(id, execType string) error {
err := UpdDb(id, map[string]interface{}{
"exec_type": execType,
})
if err != nil {
return err
}
return nil
}
func UpdActive(id, active string) error {
err := UpdDb(id, map[string]interface{}{
"active": active,
})
if err != nil {
return err
}
return nil
}
func ExecuteWorkflow(id string) error {
s, err := GetSqlite()
if err != nil {
return err
}
defer s.Close()
data, err := s.Where("id=?", []interface{}{id}).Select()
if err != nil {
return err
}
if len(data) == 0 {
return fmt.Errorf("workflow not found")
}
if data[0]["last_run_status"] != nil && data[0]["last_run_status"].(string) == "running" {
return fmt.Errorf("工作流正在执行中")
}
content := data[0]["content"].(string)
go func(id, c string) {
// defer wg.Done()
// WorkflowID := strconv.FormatInt(id, 10)
RunID, err := AddWorkflowHistory(id, "manual")
if err != nil {
return
}
ctx := NewExecutionContext(RunID)
defer ctx.Logger.Close()
err = RunWorkflow(c, ctx)
if err != nil {
fmt.Println("执行工作流失败:", err)
SetWorkflowStatus(id, RunID, "fail")
} else {
SetWorkflowStatus(id, RunID, "success")
}
}(id, content)
return nil
}
func SetWorkflowStatus(id, RunID, status string) {
_ = UpdateWorkflowHistory(RunID, status)
_ = UpdDb(id, map[string]interface{}{"last_run_status": status})
}
func resolveInputs(inputs []WorkflowNodeParams, ctx *ExecutionContext) map[string]any {
resolved := make(map[string]any)
for _, input := range inputs {
if input.FromNodeID != "" {
if val, ok := ctx.GetOutput(input.FromNodeID); ok {
// 暂时没有新的类型可以先写死
// switch strings.Split(strings.TrimPrefix(input.FromNodeID, "-"), "-")[0] {
// case "apply":
// input.Name = "certificate"
// case "upload":
// input.Name = "certificate"
// }
// resolved[input.Name] = val
resolved["certificate"] = val
}
}
}
return resolved
}
func RunNode(node *WorkflowNode, ctx *ExecutionContext) error {
// 获取上下文
inputs := resolveInputs(node.Inputs, ctx)
// 组装参数
if node.Config == nil {
node.Config = make(map[string]any)
}
for k, v := range inputs {
node.Config[k] = v
}
node.Config["_runId"] = ctx.RunID
node.Config["logger"] = ctx.Logger
node.Config["NodeId"] = node.Id
// 执行当前节点
result, err := Executors(node.Type, node.Config)
var status ExecutionStatus
if err != nil {
status = StatusFailed
if node.ChildNode == nil || node.ChildNode.Type != "execute_result_branch" {
return err
}
} else {
status = StatusSuccess
}
ctx.SetOutput(node.Id, result, status)
// 普通的并行
if node.Type == "branch" {
if len(node.ConditionNodes) > 0 {
var wg sync.WaitGroup
errChan := make(chan error, len(node.ConditionNodes))
for _, branch := range node.ConditionNodes {
if branch.ChildNode != nil {
if branch.ChildNode.Config == nil {
branch.ChildNode.Config = make(map[string]any)
}
branch.ChildNode.Config["fromNodeData"] = node.Config["fromNodeData"]
}
wg.Add(1)
go func(node *WorkflowNode) {
defer wg.Done()
if err = RunNode(node, ctx); err != nil {
errChan <- err
}
}(branch)
}
wg.Wait()
close(errChan)
for err := range errChan {
if err != nil {
return err
}
}
}
}
// 条件分支
if node.Type == "execute_result_branch" {
//
if len(node.ConditionNodes) > 0 {
lastStatus := ctx.GetStatus(node.Config["fromNodeId"].(string))
for _, branch := range node.ConditionNodes {
if branch.Config["type"] == string(lastStatus) {
if branch.ChildNode != nil {
if branch.ChildNode.Config == nil {
branch.ChildNode.Config = make(map[string]any)
}
fromNodeData, ok := ctx.GetOutput(node.Config["fromNodeId"].(string))
if !ok {
fromNodeData = nil
}
branch.ChildNode.Config["fromNodeData"] = fromNodeData
}
err := RunNode(branch, ctx)
if err != nil {
return fmt.Errorf("执行条件分支失败: %v", err)
}
}
}
}
}
if node.ChildNode != nil {
if node.ChildNode.Config == nil {
node.ChildNode.Config = make(map[string]any)
}
fromNodeData, ok := ctx.GetOutput(node.Id)
if ok && fromNodeData != nil && node.ChildNode.Config["fromNodeData"] == nil {
node.ChildNode.Config["fromNodeData"] = fromNodeData
}
return RunNode(node.ChildNode, ctx)
}
return nil
}
func RunWorkflow(content string, ctx *ExecutionContext) error {
var node WorkflowNode
err := json.Unmarshal([]byte(content), &node)
if err != nil {
return err
} else {
ctx.Logger.Info("=============开始执行=============")
err = RunNode(&node, ctx)
// fmt.Println(err)
if err != nil {
ctx.Logger.Info("=============执行失败=============")
return err
}
ctx.Logger.Info("=============执行完成=============")
return nil
}
}

View File

@@ -0,0 +1,165 @@
package workflow
import (
"ALLinSSL/backend/public"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)
// GetSqliteObjWH 工作流执行历史记录表对象
func GetSqliteObjWH() (*public.Sqlite, error) {
s, err := public.NewSqlite("data/data.db", "")
if err != nil {
return nil, err
}
s.TableName = "workflow_history"
return s, nil
}
// GetListWH 获取工作流执行历史记录列表
func GetListWH(id string, p, limit int64) ([]map[string]any, int, error) {
var data []map[string]any
var count int64
s, err := GetSqliteObjWH()
if err != nil {
return data, 0, err
}
defer s.Close()
var limits []int64
if p >= 0 && limit >= 0 {
limits = []int64{0, limit}
if p > 1 {
limits[0] = (p - 1) * limit
limits[1] = limit
}
}
if id == "" {
count, err = s.Count()
data, err = s.Limit(limits).Order("create_time", "desc").Select()
} else {
count, err = s.Where("workflow_id=?", []interface{}{id}).Count()
data, err = s.Where("workflow_id=?", []interface{}{id}).Limit(limits).Order("create_time", "desc").Select()
}
if err != nil {
return data, 0, err
}
return data, int(count), nil
}
// 添加工作流执行历史记录
func AddWorkflowHistory(workflowID, execType string) (string, error) {
s, err := GetSqliteObjWH()
if err != nil {
return "", err
}
defer s.Close()
now := time.Now().Format("2006-01-02 15:04:05")
ID := public.GenerateUUID()
_, err = s.Insert(map[string]interface{}{
"id": ID,
"workflow_id": workflowID,
"status": "running",
"exec_type": execType,
"create_time": now,
})
if err != nil {
return "", err
}
_ = UpdDb(workflowID, map[string]interface{}{"last_run_status": "running", "last_run_time": now})
return ID, nil
}
// 工作流执行结束
func UpdateWorkflowHistory(id, status string) error {
s, err := GetSqliteObjWH()
if err != nil {
return err
}
defer s.Close()
now := time.Now().Format("2006-01-02 15:04:05")
_, err = s.Where("id=?", []interface{}{id}).Update(map[string]interface{}{
"status": status,
"end_time": now,
})
if err != nil {
return err
}
return nil
}
func StopWorkflow(id string) error {
s, err := GetSqliteObjWH()
if err != nil {
return err
}
defer s.Close()
data, err := s.Where("id=?", []interface{}{id}).Select()
if err != nil {
return err
}
if len(data) == 0 {
return nil
}
SetWorkflowStatus(data[0]["workflow_id"].(string), id, "fail")
return nil
}
func GetExecLog(id string) (string, error) {
log, err := os.ReadFile(filepath.Join(public.GetSettingIgnoreError("workflow_log_path"), id+".log"))
if err != nil {
return "", err
}
return string(log), nil
}
func CleanWorkflowHistory() error {
s, err := GetSqlite()
if err != nil {
return err
}
defer s.Close()
// 获取所有工作流ID
data, err := s.Select()
if err != nil {
return err
}
var workflowIds []string
for _, v := range data {
if workflowId, ok := v["id"].(int64); ok {
workflowIds = append(workflowIds, strconv.FormatInt(workflowId, 10))
}
}
workflowIdsStr := strings.Join(workflowIds, ",")
s.TableName = "workflow_history"
// 获取无意义的工作流记录id
data, err = s.Where("workflow_id NOT IN ("+workflowIdsStr+")", nil).Select()
if err != nil {
return err
}
// 删除无意义的工作流记录
_, err = s.Where("workflow_id NOT IN ("+workflowIdsStr+")", nil).Delete()
if err != nil {
return err
}
// 删除工作流执行日志
logPath := public.GetSettingIgnoreError("workflow_log_path")
if logPath == "" {
logPath = "logs/workflow"
}
for _, v := range data {
if id, ok := v["id"].(string); ok && id != "" {
logFile := filepath.Join(logPath, id+".log")
if _, err := os.Stat(logFile); err == nil {
if err := os.Remove(logFile); err != nil {
return err
}
}
}
}
return nil
}