代码同步

This commit is contained in:
zhangchenhao
2025-05-10 16:45:46 +08:00
parent ad6b3cfa64
commit 8a9d766b50
103 changed files with 967 additions and 762 deletions

View File

@@ -8,6 +8,7 @@ import (
"ALLinSSL/backend/public"
"errors"
"fmt"
"strconv"
)
// var executors map[string]func(map[string]any) (any, error)
@@ -33,7 +34,7 @@ func Executors(exec string, params map[string]any) (any, error) {
func apply(params map[string]any) (any, error) {
logger := params["logger"].(*public.Logger)
logger.Info("=============申请证书=============")
certificate, err := certApply.Apply(params, logger)
if err != nil {
@@ -67,28 +68,57 @@ func deploy(params map[string]any) (any, error) {
func upload(params map[string]any) (any, error) {
logger := params["logger"].(*public.Logger)
logger.Info("=============上传证书=============")
keyStr, ok := params["key"].(string)
if !ok {
logger.Error("上传的密钥有误")
logger.Info("=============上传失败=============")
return nil, errors.New("上传的密钥有误")
// 判断证书id走本地还是走旧上传应在之后的迭代中移除旧代码
if params["cert_id"] == nil {
keyStr, ok := params["key"].(string)
if !ok {
logger.Error("上传的密钥有误")
logger.Info("=============上传失败=============")
return nil, errors.New("上传的密钥有误")
}
certStr, ok := params["cert"].(string)
if !ok {
logger.Error("上传的证书有误")
logger.Info("=============上传失败=============")
return nil, errors.New("上传的证书有误")
}
_, err := cert.UploadCert(keyStr, certStr)
if err != nil {
logger.Error(err.Error())
logger.Info("=============上传失败=============")
return nil, err
}
logger.Info("=============上传成功=============")
return params, nil
} else {
certId := ""
switch v := params["cert_id"].(type) {
case float64:
certId = strconv.Itoa(int(v))
case string:
certId = v
default:
logger.Info("=============上传证书获取失败=============")
return nil, errors.New("证书 ID 类型错误")
}
result := map[string]any{}
certObj, err := cert.GetCert(certId)
if err != nil {
logger.Error(err.Error())
logger.Info("=============上传证书获取失败=============")
return nil, err
}
if certObj == nil {
logger.Error("证书不存在")
logger.Info("=============上传证书获取失败=============")
return nil, errors.New("证书不存在")
}
logger.Debug(fmt.Sprintf("证书 ID: %s", certId))
result["cert"] = certObj["cert"]
result["key"] = certObj["key"]
return result, nil
}
certStr, ok := params["cert"].(string)
if !ok {
logger.Error("上传的证书有误")
logger.Info("=============上传失败=============")
return nil, errors.New("上传的证书有误")
}
err := cert.UploadCert(keyStr, certStr)
if err != nil {
logger.Error(err.Error())
logger.Info("=============上传失败=============")
return nil, err
}
logger.Info("=============上传成功=============")
return params, nil
}
func notify(params map[string]any) (any, error) {

View File

@@ -4,7 +4,6 @@ import (
"ALLinSSL/backend/public"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
)
@@ -27,7 +26,7 @@ func GetList(search string, p, limit int64) ([]map[string]any, int, error) {
return data, 0, err
}
defer s.Close()
var limits []int64
if p >= 0 && limit >= 0 {
limits = []int64{0, limit}
@@ -36,7 +35,7 @@ func GetList(search string, p, limit int64) ([]map[string]any, int, error) {
limits[1] = p * limit
}
}
if search != "" {
count, err = s.Where("name like ?", []interface{}{"%" + search + "%"}).Count()
data, err = s.Where("name like ?", []interface{}{"%" + search + "%"}).Order("update_time", "desc").Limit(limits).Select()
@@ -56,7 +55,7 @@ func AddWorkflow(name, content, execType, active, execTime string) error {
if err != nil {
return fmt.Errorf("检测到工作流配置有问题:%v", err)
}
s, err := GetSqlite()
if err != nil {
return err
@@ -161,7 +160,7 @@ func ExecuteWorkflow(id string) error {
return fmt.Errorf("工作流正在执行中")
}
content := data[0]["content"].(string)
go func(id, c string) {
// defer wg.Done()
// WorkflowID := strconv.FormatInt(id, 10)
@@ -192,13 +191,15 @@ func resolveInputs(inputs []WorkflowNodeParams, ctx *ExecutionContext) map[strin
for _, input := range inputs {
if input.FromNodeID != "" {
if val, ok := ctx.GetOutput(input.FromNodeID); ok {
switch strings.Split(strings.TrimPrefix(input.FromNodeID, "-"), "-")[0] {
case "apply":
input.Name = "certificate"
case "upload":
input.Name = "certificate"
}
resolved[input.Name] = val
// 暂时没有新的类型可以先写死
// switch strings.Split(strings.TrimPrefix(input.FromNodeID, "-"), "-")[0] {
// case "apply":
// input.Name = "certificate"
// case "upload":
// input.Name = "certificate"
// }
// resolved[input.Name] = val
resolved["certificate"] = val
}
}
}
@@ -217,10 +218,10 @@ func RunNode(node *WorkflowNode, ctx *ExecutionContext) error {
}
node.Config["_runId"] = ctx.RunID
node.Config["logger"] = ctx.Logger
// 执行当前节点
result, err := Executors(node.Type, node.Config)
var status ExecutionStatus
if err != nil {
status = StatusFailed
@@ -230,9 +231,9 @@ func RunNode(node *WorkflowNode, ctx *ExecutionContext) error {
} else {
status = StatusSuccess
}
ctx.SetOutput(node.Id, result, status)
// 普通的并行
if node.Type == "branch" {
if len(node.ConditionNodes) > 0 {
@@ -268,7 +269,7 @@ func RunNode(node *WorkflowNode, ctx *ExecutionContext) error {
}
}
}
if node.ChildNode != nil {
return RunNode(node.ChildNode, ctx)
}

View File

@@ -1,117 +1,117 @@
package workflow
import (
"ALLinSSL/backend/public"
"os"
"path/filepath"
"time"
)
// GetSqliteObjWH 工作流执行历史记录表对象
func GetSqliteObjWH() (*public.Sqlite, error) {
s, err := public.NewSqlite("data/data.db", "")
if err != nil {
return nil, err
}
s.Connect()
s.TableName = "workflow_history"
return s, nil
}
// GetListWH 获取工作流执行历史记录列表
func GetListWH(id string, p, limit int64) ([]map[string]any, int, error) {
var data []map[string]any
var count int64
s, err := GetSqliteObjWH()
if err != nil {
return data, 0, err
}
defer s.Close()
var limits []int64
if p >= 0 && limit >= 0 {
limits = []int64{0, limit}
if p > 1 {
limits[0] = (p - 1) * limit
limits[1] = p * limit
}
}
if id == "" {
count, err = s.Count()
data, err = s.Limit(limits).Order("create_time", "desc").Select()
} else {
count, err = s.Where("workflow_id=?", []interface{}{id}).Count()
data, err = s.Where("workflow_id=?", []interface{}{id}).Limit(limits).Order("create_time", "desc").Select()
}
if err != nil {
return data, 0, err
}
return data, int(count), nil
}
// 添加工作流执行历史记录
func AddWorkflowHistory(workflowID, execType string) (string, error) {
s, err := GetSqliteObjWH()
if err != nil {
return "", err
}
defer s.Close()
now := time.Now().Format("2006-01-02 15:04:05")
ID := public.GenerateUUID()
_, err = s.Insert(map[string]interface{}{
"id": ID,
"workflow_id": workflowID,
"status": "running",
"exec_type": execType,
"create_time": now,
})
if err != nil {
return "", err
}
_ = UpdDb(workflowID, map[string]interface{}{"last_run_status": "running", "last_run_time": now})
return ID, nil
}
// 工作流执行结束
func UpdateWorkflowHistory(id, status string) error {
s, err := GetSqliteObjWH()
if err != nil {
return err
}
defer s.Close()
now := time.Now().Format("2006-01-02 15:04:05")
_, err = s.Where("id=?", []interface{}{id}).Update(map[string]interface{}{
"status": status,
"end_time": now,
})
if err != nil {
return err
}
return nil
}
func StopWorkflow(id string) error {
s, err := GetSqliteObjWH()
if err != nil {
return err
}
defer s.Close()
data, err := s.Where("id=?", []interface{}{id}).Select()
if err != nil {
return err
}
if len(data) == 0 {
return nil
}
SetWorkflowStatus(data[0]["workflow_id"].(string), id, "fail")
return nil
}
func GetExecLog(id string) (string, error) {
log, err := os.ReadFile(filepath.Join(public.GetSettingIgnoreError("workflow_log_path"), id+".log"))
if err != nil {
return "", err
}
return string(log), nil
}
package workflow
import (
"ALLinSSL/backend/public"
"os"
"path/filepath"
"time"
)
// GetSqliteObjWH 工作流执行历史记录表对象
func GetSqliteObjWH() (*public.Sqlite, error) {
s, err := public.NewSqlite("data/data.db", "")
if err != nil {
return nil, err
}
s.Connect()
s.TableName = "workflow_history"
return s, nil
}
// GetListWH 获取工作流执行历史记录列表
func GetListWH(id string, p, limit int64) ([]map[string]any, int, error) {
var data []map[string]any
var count int64
s, err := GetSqliteObjWH()
if err != nil {
return data, 0, err
}
defer s.Close()
var limits []int64
if p >= 0 && limit >= 0 {
limits = []int64{0, limit}
if p > 1 {
limits[0] = (p - 1) * limit
limits[1] = p * limit
}
}
if id == "" {
count, err = s.Count()
data, err = s.Limit(limits).Order("create_time", "desc").Select()
} else {
count, err = s.Where("workflow_id=?", []interface{}{id}).Count()
data, err = s.Where("workflow_id=?", []interface{}{id}).Limit(limits).Order("create_time", "desc").Select()
}
if err != nil {
return data, 0, err
}
return data, int(count), nil
}
// 添加工作流执行历史记录
func AddWorkflowHistory(workflowID, execType string) (string, error) {
s, err := GetSqliteObjWH()
if err != nil {
return "", err
}
defer s.Close()
now := time.Now().Format("2006-01-02 15:04:05")
ID := public.GenerateUUID()
_, err = s.Insert(map[string]interface{}{
"id": ID,
"workflow_id": workflowID,
"status": "running",
"exec_type": execType,
"create_time": now,
})
if err != nil {
return "", err
}
_ = UpdDb(workflowID, map[string]interface{}{"last_run_status": "running", "last_run_time": now})
return ID, nil
}
// 工作流执行结束
func UpdateWorkflowHistory(id, status string) error {
s, err := GetSqliteObjWH()
if err != nil {
return err
}
defer s.Close()
now := time.Now().Format("2006-01-02 15:04:05")
_, err = s.Where("id=?", []interface{}{id}).Update(map[string]interface{}{
"status": status,
"end_time": now,
})
if err != nil {
return err
}
return nil
}
func StopWorkflow(id string) error {
s, err := GetSqliteObjWH()
if err != nil {
return err
}
defer s.Close()
data, err := s.Where("id=?", []interface{}{id}).Select()
if err != nil {
return err
}
if len(data) == 0 {
return nil
}
SetWorkflowStatus(data[0]["workflow_id"].(string), id, "fail")
return nil
}
func GetExecLog(id string) (string, error) {
log, err := os.ReadFile(filepath.Join(public.GetSettingIgnoreError("workflow_log_path"), id+".log"))
if err != nil {
return "", err
}
return string(log), nil
}