diff --git a/fatal.log b/fatal.log index 2b1707b..34fad41 100644 --- a/fatal.log +++ b/fatal.log @@ -921,3 +921,170 @@ 2023-09-25 11:13:46.835 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/alarm/panel] : 获取面板告警分组 ->8ms 2023-09-25 11:13:46.867 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/device/panel] [uid=1] [uname=panda] : 获取DevicePanel ->28ms 2023-09-25 11:37:46.272 [ERROR] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:41] : error http serve: http: Server closed + +2023-09-25 11:37:55-------------------------------- +2023-09-25 11:37:55.768 [INFO] [D:/workspace/go/pkg/mod/github.com/!panda!x!g!o/!panda!kit@v0.0.0-20230914055936-853aa978dda1/starter/gorm.go:36] : 连接mysql [root:!MyEMS1@tcp(127.0.0.1:3306)/pandax_iot?charset=utf8&loc=Local&parseTime=true] +2023-09-25 11:37:55.773 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:46] : mysql连接成功 +2023-09-25 11:37:55.775 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:51] : Redis连接成功 +2023-09-25 11:37:55.776 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:58] : Tdengine连接成功 +2023-09-25 11:37:55.783 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:76] : 路由初始化完成 +2023-09-25 11:37:55.783 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:38] : HTTP Server listen: :7788 +2023-09-25 11:37:55.784 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/hook.go:37] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: :9001 +2023-09-25 11:37:55.789 [INFO] [D:/workspace/go/project/PandaX/PandaX/apps/job/jobs/jobbase.go:87] : 2023-09-25 11:37:55 [INFO] JobCore start success. +2023-09-25 11:37:58.333 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/hook.go:124] : pandax断开连接 +2023-09-25 11:37:58.335 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/hook.go:153] : 账号pandax,密码pandax,开始认证 +2023-09-25 11:37:58.336 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/hook.go:96] : Client emqx@172.17.0.4 Connected +2023-09-25 11:38:20.016 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/alarm/panel] : 获取面板告警分组 ->5ms +2023-09-25 11:38:20.053 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/panel] : 获取DevicePanel ->30ms +2023-09-25 11:41:02.136 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/device/panel] [uid=1] : 获取DevicePanel ->0ms +2023-09-25 11:41:02.310 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/device/alarm/panel] [uid=1] [uname=panda] : 获取面板告警分组 ->31ms +2023-09-25 11:41:10.679 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/alarm/panel] : 获取面板告警分组 ->7ms +2023-09-25 11:41:10.687 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/panel] : 获取DevicePanel ->0ms +2023-09-25 11:42:32.648 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/device/alarm/panel] [uid=1] [uname=panda] : 获取面板告警分组 ->41ms +2023-09-25 11:42:32.666 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/device/panel] [uid=1] [uname=panda] : 获取DevicePanel ->0ms +2023-09-25 11:42:37.581 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/alarm/panel] : 获取面板告警分组 ->8ms +2023-09-25 11:42:37.589 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/device/panel] [uid=1] [uname=panda] : 获取DevicePanel ->0ms +2023-09-25 11:42:58.537 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/panel] : 获取DevicePanel ->0ms +2023-09-25 11:42:58.619 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/alarm/panel] : 获取面板告警分组 ->17ms +2023-09-25 11:44:51.053 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/system/notice/list] : 获取通知分页列表 ->15ms +2023-09-25 11:44:54.834 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/system/notice/list] : 获取通知分页列表 ->13ms +2023-09-25 11:46:37.350 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/panel] : 获取DevicePanel ->0ms +2023-09-25 11:46:37.411 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/alarm/panel] : 获取面板告警分组 ->9ms +2023-09-25 11:47:14.400 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/device/panel] [uid=1] : 获取DevicePanel ->0ms +2023-09-25 11:47:14.460 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/device/alarm/panel] [uid=1] : 获取面板告警分组 ->14ms +2023-09-25 13:41:30.542 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/device/panel] [uid=1] [uname=panda] : 获取DevicePanel ->0ms +2023-09-25 13:41:30.570 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/device/alarm/panel] [uid=1] [uname=panda] : 获取面板告警分组 ->14ms +2023-09-25 13:42:12.677 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/device/panel] [uid=1] [uname=panda] : 获取DevicePanel ->0ms +2023-09-25 13:42:12.740 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/alarm/panel] : 获取面板告警分组 ->18ms +2023-09-25 13:42:16.789 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/system/notice/list] [uid=1] [uname=panda] : 获取通知分页列表 ->10ms +2023-09-25 13:42:17.592 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/system/notice/list] [uid=1] : 获取通知分页列表 ->12ms +2023-09-25 13:43:34.172 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/system/user/list] : 得到用户分页列表 ->11ms +2023-09-25 13:43:34.172 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/system/organization/organizationTree] : 获取所有组织树 ->4ms +2023-09-25 13:43:34.186 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/system/dict/data/type] [uid=1] [uname=panda] : 获取字典数据列表通过字典类型 ->6ms +2023-09-25 13:43:34.202 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/system/dict/data/type] : 获取字典数据列表通过字典类型 ->9ms +2023-09-25 13:43:36.730 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/device/panel] [uid=1] [uname=panda] : 获取DevicePanel ->0ms +2023-09-25 13:43:36.744 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/alarm/panel] : 获取面板告警分组 ->5ms +2023-09-25 13:43:40.308 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/system/dict/data/type] [uid=1] [uname=panda] : 获取字典数据列表通过字典类型 ->6ms +2023-09-25 13:43:40.318 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/system/role/list] [uid=1] : 获取角色分页列表 ->8ms +2023-09-25 13:43:43.689 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/system/dict/data/type] : 获取字典数据列表通过字典类型 ->6ms +2023-09-25 13:43:43.717 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/product/category/list] : 获取ProductCategory列表 ->25ms +2023-09-25 13:43:45.943 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/device/product/category/list/tree/label] [uid=1] : 获取ProductCategory树 ->6ms +2023-09-25 13:43:45.947 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/device/product/list] [uid=1] [uname=panda] : 获取Product分页列表 ->21ms +2023-09-25 13:43:45.957 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/system/dict/data/type] : 获取字典数据列表通过字典类型 ->6ms +2023-09-25 13:43:46.004 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/upload/get/d7432e83df771fad85aa4e74235b0591_20230819092625.png] : 获取图片 ->0ms +2023-09-25 13:43:54.529 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/template/list] : 获取Template分页列表 ->103ms +2023-09-25 13:44:41.553 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/system/notice/list] [uid=1] [uname=panda] : 获取通知分页列表 ->14ms + +2023-09-25 17:54:02-------------------------------- +2023-09-25 17:54:02.922 [INFO] [D:/workspace/go/pkg/mod/github.com/!panda!x!g!o/!panda!kit@v0.0.0-20230914055936-853aa978dda1/starter/gorm.go:36] : 连接mysql [root:!MyEMS1@tcp(127.0.0.1:3306)/pandax_iot?charset=utf8&loc=Local&parseTime=true] +2023-09-25 17:54:02.927 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:45] : mysql连接成功 +2023-09-25 17:54:02.929 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:50] : Redis连接成功 +2023-09-25 17:54:02.929 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:57] : Tdengine连接成功 +2023-09-25 17:54:02.937 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:75] : 路由初始化完成 +2023-09-25 17:54:02.937 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:38] : HTTP Server listen: :7788 +2023-09-25 17:54:02.937 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:46] : HTTP Server listen: :7788 +2023-09-25 17:54:02.937 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:35] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: +2023-09-25 17:54:02.942 [INFO] [D:/workspace/go/project/PandaX/PandaX/apps/job/jobs/jobbase.go:87] : 2023-09-25 17:54:02 [INFO] JobCore start success. +2023-09-25 17:54:03.045 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:114] : pandax断开连接 +2023-09-25 17:54:03.049 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:143] : 账号pandax,密码pandax,开始认证 +2023-09-25 17:54:03.052 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:86] : Client emqx@172.17.0.4 Connected +2023-09-25 17:54:30.252 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:143] : 账号9GOIPOI6GQ,密码YWRlMTA0MmYtMzc2MS0zZTljLThjNjAtMzNhMzg4ZjdkOGQ3,开始认证 +2023-09-25 17:54:30.256 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:86] : Client emqx@172.17.0.4 Connected +2023-09-25 17:54:31.066 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:168] : 9GOIPOI6GQ订阅了[name:"$dz/events/device/s7-device/data/update"] +2023-09-25 17:54:31.378 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:168] : 9GOIPOI6GQ订阅了[name:"v1/devices/me/telemetry"] +2023-09-25 17:54:31.665 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:168] : 9GOIPOI6GQ订阅了[name:"$hw/events/device/s7-device/twin/update"] +2023-09-25 17:54:31.964 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:168] : 9GOIPOI6GQ订阅了[name:"v1/devices/me/rpc/request/+"] +2023-09-25 17:55:09.205 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [POST=/system/user/login] : 登录 ->74ms +2023-09-25 17:55:09.233 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/system/user/auth] : 认证信息 ->19ms +2023-09-25 17:55:09.772 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/alarm/panel] : 获取面板告警分组 ->4ms +2023-09-25 17:55:09.784 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/panel] : 获取DevicePanel ->26ms +2023-09-25 17:55:14.901 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/system/dict/data/type] : 获取字典数据列表通过字典类型 ->9ms +2023-09-25 17:55:14.917 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/product/list/all] : 获取Product分页列表 ->9ms +2023-09-25 17:55:15.003 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/group/list/tree/label] : 获取DeviceGroup树 ->127ms +2023-09-25 17:55:15.007 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/list] : 获取Device分页列表 ->122ms +2023-09-25 17:55:22.700 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uid=1] [uname=panda] [GET=/device/9GOIPOI6GQ/status] : 获取Device状态信息 ->11ms +2023-09-25 17:55:24.055 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [GET=/device/9GOIPOI6GQ/status] [uid=1] [uname=panda] : 获取Device状态信息 ->9ms +2023-09-25 17:55:28.001 [ERROR] [D:/workspace/go/project/PandaX/PandaX/iothub/hook_message_work/hook_message_work.go:157] : +2023-09-25 17:55:35.327 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/device/9GOIPOI6GQ/status] [uid=1] : 获取Device状态信息 ->7ms +2023-09-25 17:55:36.418 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/middleware/log.go:34] [uname=panda] [GET=/device/9GOIPOI6GQ/status] [uid=1] : 获取Device状态信息 ->8ms +2023-09-25 18:00:22.020 [ERROR] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:48] : error http serve: http: Server closed + +2023-09-25 18:00:41-------------------------------- +2023-09-25 18:00:42.003 [INFO] [D:/workspace/go/pkg/mod/github.com/!panda!x!g!o/!panda!kit@v0.0.0-20230914055936-853aa978dda1/starter/gorm.go:36] : 连接mysql [root:!MyEMS1@tcp(127.0.0.1:3306)/pandax_iot?charset=utf8&loc=Local&parseTime=true] +2023-09-25 18:00:42.008 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:45] : mysql连接成功 +2023-09-25 18:00:42.011 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:50] : Redis连接成功 +2023-09-25 18:00:42.011 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:57] : Tdengine连接成功 +2023-09-25 18:00:42.019 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:75] : 路由初始化完成 +2023-09-25 18:00:42.019 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:38] : HTTP Server listen: :7788 +2023-09-25 18:00:42.019 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:46] : HTTP Server listen: :7788 +2023-09-25 18:00:42.019 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:35] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: +2023-09-25 18:00:42.024 [INFO] [D:/workspace/go/project/PandaX/PandaX/apps/job/jobs/jobbase.go:87] : 2023-09-25 18:00:42 [INFO] JobCore start success. +2023-09-25 18:00:42.159 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:143] : 账号pandax,密码pandax,开始认证 +2023-09-25 18:00:42.160 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:86] : Client emqx@172.17.0.4 Connected +2023-09-25 18:00:47.166 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/httpserver/http_server.go:47] : HTTP Server listen: +2023-09-25 18:00:47.166 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/httpserver/hook.go:29] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: + +2023-09-25 18:01:54-------------------------------- +2023-09-25 18:01:54.586 [INFO] [D:/workspace/go/pkg/mod/github.com/!panda!x!g!o/!panda!kit@v0.0.0-20230914055936-853aa978dda1/starter/gorm.go:36] : 连接mysql [root:!MyEMS1@tcp(127.0.0.1:3306)/pandax_iot?charset=utf8&loc=Local&parseTime=true] +2023-09-25 18:01:54.592 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:45] : mysql连接成功 +2023-09-25 18:01:54.594 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:50] : Redis连接成功 +2023-09-25 18:01:54.594 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:57] : Tdengine连接成功 +2023-09-25 18:01:54.602 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:75] : 路由初始化完成 +2023-09-25 18:01:54.602 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:38] : HTTP Server listen: :7788 +2023-09-25 18:01:54.602 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:46] : HTTP Server listen: :7788 +2023-09-25 18:01:54.602 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:35] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: +2023-09-25 18:01:54.607 [INFO] [D:/workspace/go/project/PandaX/PandaX/apps/job/jobs/jobbase.go:87] : 2023-09-25 18:01:54 [INFO] JobCore start success. +2023-09-25 18:01:57.966 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:114] : pandax断开连接 +2023-09-25 18:01:57.968 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:143] : 账号pandax,密码pandax,开始认证 +2023-09-25 18:01:57.971 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:86] : Client emqx@172.17.0.4 Connected +2023-09-25 18:02:02.976 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/httpserver/hook.go:29] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: +2023-09-25 18:02:02.976 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/httpserver/http_server.go:50] : HTTP Server listen: :9002 + +2023-09-25 18:03:34-------------------------------- +2023-09-25 18:03:34.131 [INFO] [D:/workspace/go/pkg/mod/github.com/!panda!x!g!o/!panda!kit@v0.0.0-20230914055936-853aa978dda1/starter/gorm.go:36] : 连接mysql [root:!MyEMS1@tcp(127.0.0.1:3306)/pandax_iot?charset=utf8&loc=Local&parseTime=true] +2023-09-25 18:03:34.137 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:45] : mysql连接成功 +2023-09-25 18:03:34.139 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:50] : Redis连接成功 +2023-09-25 18:03:34.139 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:57] : Tdengine连接成功 +2023-09-25 18:03:34.146 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:75] : 路由初始化完成 +2023-09-25 18:03:34.146 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:38] : HTTP Server listen: :7788 +2023-09-25 18:03:34.146 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:46] : HTTP Server listen: :7788 +2023-09-25 18:03:34.147 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:35] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: +2023-09-25 18:03:34.152 [INFO] [D:/workspace/go/project/PandaX/PandaX/apps/job/jobs/jobbase.go:87] : 2023-09-25 18:03:34 [INFO] JobCore start success. +2023-09-25 18:03:38.746 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:114] : pandax断开连接 +2023-09-25 18:03:38.747 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:143] : 账号pandax,密码pandax,开始认证 +2023-09-25 18:03:38.750 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:86] : Client emqx@172.17.0.4 Connected +2023-09-25 18:03:43.755 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/httpserver/http_server.go:50] : HTTP Server listen: :9002 +2023-09-25 18:03:43.755 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/httpserver/hook.go:29] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: +2023-09-25 18:06:02.612 [ERROR] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:48] : error http serve: http: Server closed + +2023-09-25 18:06:10-------------------------------- +2023-09-25 18:06:10.712 [INFO] [D:/workspace/go/pkg/mod/github.com/!panda!x!g!o/!panda!kit@v0.0.0-20230914055936-853aa978dda1/starter/gorm.go:36] : 连接mysql [root:!MyEMS1@tcp(127.0.0.1:3306)/pandax_iot?charset=utf8&loc=Local&parseTime=true] +2023-09-25 18:06:10.718 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:45] : mysql连接成功 +2023-09-25 18:06:10.720 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:50] : Redis连接成功 +2023-09-25 18:06:10.720 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:57] : Tdengine连接成功 +2023-09-25 18:06:10.730 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:75] : 路由初始化完成 +2023-09-25 18:06:10.730 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:38] : HTTP Server listen: :7788 +2023-09-25 18:06:10.730 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:46] : HTTP Server listen: :7788 +2023-09-25 18:06:10.731 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:35] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: +2023-09-25 18:06:10.736 [INFO] [D:/workspace/go/project/PandaX/PandaX/apps/job/jobs/jobbase.go:87] : 2023-09-25 18:06:10 [INFO] JobCore start success. +2023-09-25 18:06:14.668 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:114] : pandax断开连接 +2023-09-25 18:06:14.669 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:143] : 账号pandax,密码pandax,开始认证 +2023-09-25 18:06:14.674 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:86] : Client emqx@172.17.0.4 Connected +2023-09-25 18:06:19.679 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/httpserver/hook.go:29] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: +2023-09-25 18:06:19.680 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/httpserver/http_server.go:50] : HTTP Server listen: :9002 + +2023-09-25 18:08:10-------------------------------- +2023-09-25 18:08:10.162 [INFO] [D:/workspace/go/pkg/mod/github.com/!panda!x!g!o/!panda!kit@v0.0.0-20230914055936-853aa978dda1/starter/gorm.go:36] : 连接mysql [root:!MyEMS1@tcp(127.0.0.1:3306)/pandax_iot?charset=utf8&loc=Local&parseTime=true] +2023-09-25 18:08:10.168 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:45] : mysql连接成功 +2023-09-25 18:08:10.171 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:50] : Redis连接成功 +2023-09-25 18:08:10.171 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:57] : Tdengine连接成功 +2023-09-25 18:08:10.189 [INFO] [D:/workspace/go/project/PandaX/PandaX/main.go:75] : 路由初始化完成 +2023-09-25 18:08:10.189 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:38] : HTTP Server listen: :7788 +2023-09-25 18:08:10.190 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:35] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: +2023-09-25 18:08:10.190 [INFO] [D:/workspace/go/project/PandaX/PandaX/pkg/transport/http_server.go:46] : HTTP Server listen: :7788 +2023-09-25 18:08:10.199 [INFO] [D:/workspace/go/project/PandaX/PandaX/apps/job/jobs/jobbase.go:87] : 2023-09-25 18:08:10 [INFO] JobCore start success. +2023-09-25 18:08:13.720 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:114] : pandax断开连接 +2023-09-25 18:08:13.721 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:143] : 账号pandax,密码pandax,开始认证 +2023-09-25 18:08:13.724 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/emqxserver/hook.go:86] : Client emqx@172.17.0.4 Connected +2023-09-25 18:08:18.727 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/httpserver/http_server.go:50] : HTTP Server listen: :9002 +2023-09-25 18:08:18.727 [INFO] [D:/workspace/go/project/PandaX/PandaX/iothub/server/httpserver/hook.go:29] : IOTHUB HOOK Start SUCCESS,Grpc Server listen: diff --git a/iothub/hook_message_work.go b/iothub/hook_message_work/hook_message_work.go similarity index 95% rename from iothub/hook_message_work.go rename to iothub/hook_message_work/hook_message_work.go index 35fadd4..35313ec 100644 --- a/iothub/hook_message_work.go +++ b/iothub/hook_message_work/hook_message_work.go @@ -1,4 +1,4 @@ -package iothub +package hook_message_work import ( "context" @@ -9,6 +9,7 @@ import ( "pandax/apps/device/services" ruleEntity "pandax/apps/rule/entity" ruleService "pandax/apps/rule/services" + "pandax/iothub/netbase" "pandax/pkg/global" "pandax/pkg/rule_engine" "pandax/pkg/rule_engine/message" @@ -22,19 +23,19 @@ import ( func (s *HookService) MessageWork() { for { select { - case msg := <-s.messageCh: + case msg := <-s.MessageCh: s.handleOne(msg) // 处理消息 } } } -func (s *HookService) handleOne(msg *DeviceEventInfo) { - if s.ch != nil { // 用于并发限制 - s.ch <- struct{}{} +func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) { + if s.Ch != nil { // 用于并发限制 + s.Ch <- struct{}{} } - s.wg.Add(1) + s.Wg.Add(1) go func() { - defer s.wg.Done() + defer s.Wg.Done() etoken := &tool.DeviceAuth{} err := global.RedisDb.Get(msg.DeviceId, etoken) if err != nil { diff --git a/iothub/hook_message_work/hook_service.go b/iothub/hook_message_work/hook_service.go new file mode 100644 index 0000000..cd7c9d4 --- /dev/null +++ b/iothub/hook_message_work/hook_service.go @@ -0,0 +1,26 @@ +package hook_message_work + +import ( + "pandax/iothub/netbase" + "pandax/pkg/global" + "sync" +) + +type HookService struct { + Cache sync.Map + Wg sync.WaitGroup // 优雅关闭 + Ch chan struct{} // 并发限制 + MessageCh chan *netbase.DeviceEventInfo +} + +func NewHookService() *HookService { + hs := &HookService{ + Cache: sync.Map{}, + MessageCh: make(chan *netbase.DeviceEventInfo), + } + // 并发限制,代表服务器处理能力 + if global.Conf.Queue.Enable && global.Conf.Queue.Num > 0 { + hs.Ch = make(chan struct{}, global.Conf.Queue.Num) + } + return hs +} diff --git a/iothub/iothub.go b/iothub/iothub.go new file mode 100644 index 0000000..667feff --- /dev/null +++ b/iothub/iothub.go @@ -0,0 +1,19 @@ +package iothub + +import ( + "pandax/iothub/hook_message_work" + "pandax/iothub/server/emqxserver" + "pandax/iothub/server/httpserver" +) + +func InitIothub() { + service := hook_message_work.NewHookService() + // 初始化EMQX + emqxserver.InitEmqxHook("", service) + // 初始化HTTP + httpserver.InitHttpHook("", service) + //初始化TCP + + // 开启线程处理消息 + go service.MessageWork() +} diff --git a/iothub/hook_base.go b/iothub/netbase/hook_base.go similarity index 91% rename from iothub/hook_base.go rename to iothub/netbase/hook_base.go index 93bfadc..b849ff7 100644 --- a/iothub/hook_base.go +++ b/iothub/netbase/hook_base.go @@ -1,9 +1,9 @@ -package iothub +package netbase import ( "encoding/json" "pandax/apps/device/services" - exhook "pandax/iothub/protobuf" + "pandax/iothub/server/emqxserver/protobuf" "pandax/pkg/global" "pandax/pkg/tool" "regexp" @@ -11,7 +11,7 @@ import ( "time" ) -func (s *HookService) auth(username, password string) bool { +func Auth(username, password string) bool { // 根据token,去查设备Id以及设备类型 if username == "pandax" && password == "pandax" { return true @@ -50,7 +50,7 @@ func (s *HookService) auth(username, password string) bool { } // 解析遥测数据类型 返回标准带时间戳格式 -func updateDeviceTelemetryData(data string) map[string]interface{} { +func UpdateDeviceTelemetryData(data string) map[string]interface{} { tel := make(map[string]interface{}) err := json.Unmarshal([]byte(data), &tel) if err != nil { @@ -74,7 +74,7 @@ func updateDeviceTelemetryData(data string) map[string]interface{} { return tel } -func updateDeviceAttributesData(data string) map[string]interface{} { +func UpdateDeviceAttributesData(data string) map[string]interface{} { tel := make(map[string]interface{}) err := json.Unmarshal([]byte(data), &tel) if err != nil { @@ -127,7 +127,7 @@ func EncodeData(jsonData interface{}) ([]byte, error) { return byteData, nil } -func getRequestIdFromTopic(reg, topic string) (requestId string) { +func GetRequestIdFromTopic(reg, topic string) (requestId string) { re := regexp.MustCompile(reg) res := re.FindStringSubmatch(topic) if len(res) > 1 { diff --git a/iothub/iothub_session.go b/iothub/netbase/iothub_session.go similarity index 91% rename from iothub/iothub_session.go rename to iothub/netbase/iothub_session.go index 67c8e30..67f8348 100644 --- a/iothub/iothub_session.go +++ b/iothub/netbase/iothub_session.go @@ -1,4 +1,4 @@ -package iothub +package netbase type DeviceEventInfo struct { DeviceId string `json:"deviceId"` diff --git a/iothub/grpc_server.go b/iothub/server/emqxserver/grpc_server.go similarity index 97% rename from iothub/grpc_server.go rename to iothub/server/emqxserver/grpc_server.go index eed0bf8..2fe2106 100644 --- a/iothub/grpc_server.go +++ b/iothub/server/emqxserver/grpc_server.go @@ -1,4 +1,4 @@ -package iothub +package emqxserver import ( "context" diff --git a/iothub/hook.go b/iothub/server/emqxserver/hook.go similarity index 51% rename from iothub/hook.go rename to iothub/server/emqxserver/hook.go index 566f49d..ebe9a78 100644 --- a/iothub/hook.go +++ b/iothub/server/emqxserver/hook.go @@ -1,62 +1,52 @@ -package iothub +package emqxserver import ( "context" "encoding/json" "fmt" - exhook "pandax/iothub/protobuf" + "pandax/iothub/hook_message_work" + "pandax/iothub/netbase" + exhook2 "pandax/iothub/server/emqxserver/protobuf" "pandax/pkg/global" "pandax/pkg/mqtt" "pandax/pkg/rule_engine/message" "pandax/pkg/tdengine" "pandax/pkg/tool" "strconv" - "sync" "time" ) // 创建设备 设备需要一个账号密码 账号使用 namespace.devicename // 需要创建一个应用事件表? 例如边缘kuiper掉线事件记录或设备事件 type 事件分类 -type HookService struct { - exhook.UnimplementedHookProviderServer - cache sync.Map - wg sync.WaitGroup // 优雅关闭 - ch chan struct{} // 并发限制 - messageCh chan *DeviceEventInfo +type HookGrpcService struct { + exhook2.UnimplementedHookProviderServer + HookService *hook_message_work.HookService } -func InitEmqxHook(addr string) *HookService { +func InitEmqxHook(addr string, hs *hook_message_work.HookService) { s := NewServer(addr) - service := NewHookService() - exhook.RegisterHookProviderServer(s.GetServe(), service) + hgs := NewHookGrpcService(hs) + exhook2.RegisterHookProviderServer(s.GetServe(), hgs) err := s.Start(context.TODO()) if err != nil { global.Log.Panic("grpc服务启动错误", err) } else { global.Log.Infof("IOTHUB HOOK Start SUCCESS,Grpc Server listen: %s", addr) } - // 开启线程处理消息 - go service.MessageWork() // 初始化 MQTT客户端 global.MqttClient = mqtt.InitMqtt(global.Conf.Mqtt.Broker, global.Conf.Mqtt.Username, global.Conf.Mqtt.Password) - return service } -func NewHookService() *HookService { - hs := &HookService{ - cache: sync.Map{}, - messageCh: make(chan *DeviceEventInfo), +func NewHookGrpcService(hs *hook_message_work.HookService) *HookGrpcService { + hgs := &HookGrpcService{ + HookService: hs, } - // 并发限制,代表服务器处理能力 - if global.Conf.Queue.Enable && global.Conf.Queue.Num > 0 { - hs.ch = make(chan struct{}, global.Conf.Queue.Num) - } - return hs + return hgs } -func (s *HookService) OnProviderLoaded(ctx context.Context, in *exhook.ProviderLoadedRequest) (*exhook.LoadedResponse, error) { - hooks := []*exhook.HookSpec{ +func (s *HookGrpcService) OnProviderLoaded(ctx context.Context, in *exhook2.ProviderLoadedRequest) (*exhook2.LoadedResponse, error) { + hooks := []*exhook2.HookSpec{ {Name: "client.connect"}, {Name: "client.connack"}, {Name: "client.connected"}, @@ -77,25 +67,25 @@ func (s *HookService) OnProviderLoaded(ctx context.Context, in *exhook.ProviderL {Name: "message.acked"}, {Name: "message.dropped"}, } - return &exhook.LoadedResponse{Hooks: hooks}, nil + return &exhook2.LoadedResponse{Hooks: hooks}, nil } -func (s *HookService) OnProviderUnloaded(ctx context.Context, in *exhook.ProviderUnloadedRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnProviderUnloaded(ctx context.Context, in *exhook2.ProviderUnloadedRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnClientConnect(ctx context.Context, in *exhook.ClientConnectRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnClientConnect(ctx context.Context, in *exhook2.ClientConnectRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnClientConnack(ctx context.Context, in *exhook.ClientConnackRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnClientConnack(ctx context.Context, in *exhook2.ClientConnackRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnClientConnected(ctx context.Context, in *exhook.ClientConnectedRequest) (*exhook.EmptySuccess, error) { +func (s *HookGrpcService) OnClientConnected(ctx context.Context, in *exhook2.ClientConnectedRequest) (*exhook2.EmptySuccess, error) { global.Log.Info(fmt.Sprintf("Client %s Connected ", in.Clientinfo.GetNode())) ts := time.Now().Format("2006-01-02 15:04:05.000") - username := GetUserName(in.Clientinfo) + username := netbase.GetUserName(in.Clientinfo) ci := &tdengine.ConnectInfo{ ClientID: in.Clientinfo.Clientid, DeviceId: username, @@ -105,24 +95,24 @@ func (s *HookService) OnClientConnected(ctx context.Context, in *exhook.ClientCo Type: message.ConnectMes, Ts: ts, } - v, err := EncodeData(*ci) + v, err := netbase.EncodeData(*ci) if err != nil { return nil, err } // 添加设备上线记录 - data := &DeviceEventInfo{ + data := &netbase.DeviceEventInfo{ DeviceId: username, Datas: string(v), Type: message.ConnectMes, } - s.messageCh <- data + s.HookService.MessageCh <- data - return &exhook.EmptySuccess{}, nil + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnClientDisconnected(ctx context.Context, in *exhook.ClientDisconnectedRequest) (*exhook.EmptySuccess, error) { +func (s *HookGrpcService) OnClientDisconnected(ctx context.Context, in *exhook2.ClientDisconnectedRequest) (*exhook2.EmptySuccess, error) { global.Log.Info(fmt.Sprintf("%s断开连接", in.Clientinfo.Username)) - devicename := GetUserName(in.Clientinfo) + devicename := netbase.GetUserName(in.Clientinfo) ts := time.Now().Format("2006-01-02 15:04:05.000") ci := &tdengine.ConnectInfo{ @@ -134,99 +124,99 @@ func (s *HookService) OnClientDisconnected(ctx context.Context, in *exhook.Clien Type: message.DisConnectMes, Ts: ts, } - v, err := EncodeData(*ci) + v, err := netbase.EncodeData(*ci) if err != nil { return nil, err } // 添加设备下线记录 - data := &DeviceEventInfo{ + data := &netbase.DeviceEventInfo{ DeviceId: devicename, Datas: string(v), Type: message.DisConnectMes, } - s.messageCh <- data - return &exhook.EmptySuccess{}, nil + s.HookService.MessageCh <- data + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnClientAuthenticate(ctx context.Context, in *exhook.ClientAuthenticateRequest) (*exhook.ValuedResponse, error) { +func (s *HookGrpcService) OnClientAuthenticate(ctx context.Context, in *exhook2.ClientAuthenticateRequest) (*exhook2.ValuedResponse, error) { global.Log.Info(fmt.Sprintf("账号%s,密码%s,开始认证", in.Clientinfo.Username, in.Clientinfo.Password)) - res := &exhook.ValuedResponse{} - res.Type = exhook.ValuedResponse_STOP_AND_RETURN - res.Value = &exhook.ValuedResponse_BoolResult{BoolResult: false} + res := &exhook2.ValuedResponse{} + res.Type = exhook2.ValuedResponse_STOP_AND_RETURN + res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: false} - username := GetUserName(in.Clientinfo) - pw := GetPassword(in.Clientinfo) + username := netbase.GetUserName(in.Clientinfo) + pw := netbase.GetPassword(in.Clientinfo) if username == "" || pw == "" { global.Log.Warn(fmt.Sprintf("invalid username %s or password %s", username, pw)) return res, nil } - authRes := s.auth(username, pw) - res.Value = &exhook.ValuedResponse_BoolResult{BoolResult: authRes} + authRes := netbase.Auth(username, pw) + res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: authRes} return res, nil } -func (s *HookService) OnClientAuthorize(ctx context.Context, in *exhook.ClientAuthorizeRequest) (*exhook.ValuedResponse, error) { - reply := &exhook.ValuedResponse{} - reply.Type = exhook.ValuedResponse_STOP_AND_RETURN - reply.Value = &exhook.ValuedResponse_BoolResult{BoolResult: true} +func (s *HookGrpcService) OnClientAuthorize(ctx context.Context, in *exhook2.ClientAuthorizeRequest) (*exhook2.ValuedResponse, error) { + reply := &exhook2.ValuedResponse{} + reply.Type = exhook2.ValuedResponse_STOP_AND_RETURN + reply.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: true} return reply, nil } -func (s *HookService) OnClientSubscribe(ctx context.Context, in *exhook.ClientSubscribeRequest) (*exhook.EmptySuccess, error) { +func (s *HookGrpcService) OnClientSubscribe(ctx context.Context, in *exhook2.ClientSubscribeRequest) (*exhook2.EmptySuccess, error) { global.Log.Info(fmt.Sprintf("%s订阅了%v", in.Clientinfo.Username, in.TopicFilters)) // 验证topic 是否是规定的topic,可做topic白名单 - return &exhook.EmptySuccess{}, nil + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnClientUnsubscribe(ctx context.Context, in *exhook.ClientUnsubscribeRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnClientUnsubscribe(ctx context.Context, in *exhook2.ClientUnsubscribeRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnSessionCreated(ctx context.Context, in *exhook.SessionCreatedRequest) (*exhook.EmptySuccess, error) { +func (s *HookGrpcService) OnSessionCreated(ctx context.Context, in *exhook2.SessionCreatedRequest) (*exhook2.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnSessionSubscribed(ctx context.Context, in *exhook.SessionSubscribedRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnSessionSubscribed(ctx context.Context, in *exhook2.SessionSubscribedRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnSessionUnsubscribed(ctx context.Context, in *exhook.SessionUnsubscribedRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnSessionUnsubscribed(ctx context.Context, in *exhook2.SessionUnsubscribedRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnSessionResumed(ctx context.Context, in *exhook.SessionResumedRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnSessionResumed(ctx context.Context, in *exhook2.SessionResumedRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnSessionDiscarded(ctx context.Context, in *exhook.SessionDiscardedRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnSessionDiscarded(ctx context.Context, in *exhook2.SessionDiscardedRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnSessionTakenover(ctx context.Context, in *exhook.SessionTakenoverRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnSessionTakenover(ctx context.Context, in *exhook2.SessionTakenoverRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnSessionTerminated(ctx context.Context, in *exhook.SessionTerminatedRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnSessionTerminated(ctx context.Context, in *exhook2.SessionTerminatedRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnMessagePublish(ctx context.Context, in *exhook.MessagePublishRequest) (*exhook.ValuedResponse, error) { - res := &exhook.ValuedResponse{} - res.Type = exhook.ValuedResponse_STOP_AND_RETURN - res.Value = &exhook.ValuedResponse_BoolResult{BoolResult: false} +func (s *HookGrpcService) OnMessagePublish(ctx context.Context, in *exhook2.MessagePublishRequest) (*exhook2.ValuedResponse, error) { + res := &exhook2.ValuedResponse{} + res.Type = exhook2.ValuedResponse_STOP_AND_RETURN + res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: false} if in.Message.From == mqtt.DefaultDownStreamClientId { - res.Value = &exhook.ValuedResponse_BoolResult{BoolResult: true} + res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: true} return res, nil } // 获取topic类型 ts := time.Now().Format("2006-01-02 15:04:05.000") eventType := IotHubTopic.GetMessageType(in.Message.Topic) datas := string(in.GetMessage().GetPayload()) - data := &DeviceEventInfo{ + data := &netbase.DeviceEventInfo{ Type: eventType, Datas: datas, DeviceId: in.Message.Headers["username"], @@ -237,7 +227,7 @@ func (s *HookService) OnMessagePublish(ctx context.Context, in *exhook.MessagePu err := json.Unmarshal(in.GetMessage().GetPayload(), &subData) if err != nil { global.Log.Warn(fmt.Sprintf("子网关上报数据格式错误")) - res.Value = &exhook.ValuedResponse_BoolResult{BoolResult: false} + res.Value = &exhook2.ValuedResponse_BoolResult{BoolResult: false} return res, nil } // key就是deviceId @@ -252,14 +242,14 @@ func (s *HookService) OnMessagePublish(ctx context.Context, in *exhook.MessagePu if in.Message.Topic == AttributesGatewayTopic { data.Type = message.AttributesMes marshal, _ := json.Marshal(value) - attributesData := updateDeviceAttributesData(string(marshal)) + attributesData := netbase.UpdateDeviceAttributesData(string(marshal)) if attributesData == nil { continue } bytes, _ := json.Marshal(attributesData) data.Datas = string(bytes) // 子设备发送到队列里 - s.messageCh <- data + s.HookService.MessageCh <- data } if in.Message.Topic == TelemetryGatewayTopic { data.Type = message.TelemetryMes @@ -274,14 +264,14 @@ func (s *HookService) OnMessagePublish(ctx context.Context, in *exhook.MessagePu } for _, da := range telData { td, _ := json.Marshal(da) - telemetryData := updateDeviceTelemetryData(string(td)) + telemetryData := netbase.UpdateDeviceTelemetryData(string(td)) if telemetryData == nil { continue } bytes, _ := json.Marshal(telemetryData) data.Datas = string(bytes) // 子设备发送到队列里 - s.messageCh <- data + s.HookService.MessageCh <- data } } if in.Message.Topic == ConnectGatewayTopic { @@ -294,10 +284,10 @@ func (s *HookService) OnMessagePublish(ctx context.Context, in *exhook.MessagePu Type: message.ConnectMes, Ts: ts, } - v, _ := EncodeData(*ci) + v, _ := netbase.EncodeData(*ci) data.Datas = string(v) // 子设备发送到队列里 - s.messageCh <- data + s.HookService.MessageCh <- data } if in.Message.Topic == DisconnectGatewayTopic { data.Type = message.DisConnectMes @@ -309,13 +299,13 @@ func (s *HookService) OnMessagePublish(ctx context.Context, in *exhook.MessagePu Type: message.DisConnectMes, Ts: ts, } - v, _ := EncodeData(*ci) + v, _ := netbase.EncodeData(*ci) data.Datas = string(v) // 子设备发送到队列里 - s.messageCh <- data + s.HookService.MessageCh <- data } } - res.Value = &exhook.ValuedResponse_Message{Message: in.Message} + res.Value = &exhook2.ValuedResponse_Message{Message: in.Message} return res, nil } @@ -323,7 +313,7 @@ func (s *HookService) OnMessagePublish(ctx context.Context, in *exhook.MessagePu case message.RowMes: data.Datas = fmt.Sprintf(`{"ts": "%s","rowdata": "%s"}`, ts, data.Datas) case message.AttributesMes: - attributesData := updateDeviceAttributesData(data.Datas) + attributesData := netbase.UpdateDeviceAttributesData(data.Datas) if attributesData == nil { return res, nil } @@ -331,7 +321,7 @@ func (s *HookService) OnMessagePublish(ctx context.Context, in *exhook.MessagePu data.Datas = string(bytes) case message.TelemetryMes: // 数据处理 如果上传的数据没有时间戳 添加时间戳更改格式化 - telemetryData := updateDeviceTelemetryData(data.Datas) + telemetryData := netbase.UpdateDeviceTelemetryData(data.Datas) if telemetryData == nil { return res, nil } @@ -339,28 +329,25 @@ func (s *HookService) OnMessagePublish(ctx context.Context, in *exhook.MessagePu data.Datas = string(bytes) case message.RpcRequestMes: // 获取请求id - id := getRequestIdFromTopic(RpcReqReg, in.Message.Topic) + id := netbase.GetRequestIdFromTopic(RpcReqReg, in.Message.Topic) data.RequestId = id } //TODO 如果设备消息;量过大,推荐采用NATS队列处理 - s.messageCh <- data + s.HookService.MessageCh <- data - res.Value = &exhook.ValuedResponse_Message{Message: in.Message} + res.Value = &exhook2.ValuedResponse_Message{Message: in.Message} return res, nil } -func (s *HookService) OnMessageDelivered(ctx context.Context, in *exhook.MessageDeliveredRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnMessageDelivered(ctx context.Context, in *exhook2.MessageDeliveredRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnMessageDropped(ctx context.Context, in *exhook.MessageDroppedRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil +func (s *HookGrpcService) OnMessageDropped(ctx context.Context, in *exhook2.MessageDroppedRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } -func (s *HookService) OnMessageAcked(ctx context.Context, in *exhook.MessageAckedRequest) (*exhook.EmptySuccess, error) { - return &exhook.EmptySuccess{}, nil -} -func (s *HookService) Stop() { - s.wg.Wait() +func (s *HookGrpcService) OnMessageAcked(ctx context.Context, in *exhook2.MessageAckedRequest) (*exhook2.EmptySuccess, error) { + return &exhook2.EmptySuccess{}, nil } diff --git a/iothub/protobuf/exhook.pb.go b/iothub/server/emqxserver/protobuf/exhook.pb.go similarity index 100% rename from iothub/protobuf/exhook.pb.go rename to iothub/server/emqxserver/protobuf/exhook.pb.go diff --git a/iothub/protobuf/exhook.proto b/iothub/server/emqxserver/protobuf/exhook.proto similarity index 100% rename from iothub/protobuf/exhook.proto rename to iothub/server/emqxserver/protobuf/exhook.proto diff --git a/iothub/protobuf/exhook_grpc.pb.go b/iothub/server/emqxserver/protobuf/exhook_grpc.pb.go similarity index 100% rename from iothub/protobuf/exhook_grpc.pb.go rename to iothub/server/emqxserver/protobuf/exhook_grpc.pb.go diff --git a/iothub/topic.go b/iothub/server/emqxserver/topic.go similarity index 98% rename from iothub/topic.go rename to iothub/server/emqxserver/topic.go index fa0ac23..76cdd17 100644 --- a/iothub/topic.go +++ b/iothub/server/emqxserver/topic.go @@ -1,4 +1,4 @@ -package iothub +package emqxserver import ( "pandax/pkg/rule_engine/message" diff --git a/iothub/server/httpserver/hook.go b/iothub/server/httpserver/hook.go new file mode 100644 index 0000000..3c1c2a3 --- /dev/null +++ b/iothub/server/httpserver/hook.go @@ -0,0 +1,66 @@ +package httpserver + +import ( + "context" + "github.com/emicklei/go-restful/v3" + "io" + "log" + "net" + "net/http" + "pandax/iothub/hook_message_work" + "pandax/pkg/global" + "strings" +) + +type HookHttpService struct { + HookService *hook_message_work.HookService +} + +func InitHttpHook(addr string, hs *hook_message_work.HookService) { + server := NewHttpServer(addr) + service := NewHookHttpService(hs) + container := server.Container + ws := new(restful.WebService) + ws.Path("/api/v1").Produces(restful.MIME_JSON) + ws.Route(ws.POST("/{token}/{pathType}").Filter(basicAuthenticate).To(service.hook)) + container.Add(ws) + + server.srv.ConnState = func(conn net.Conn, state http.ConnState) { + switch state { + case http.StateNew: + log.Println("New connection", conn.RemoteAddr()) + case http.StateActive: + log.Println("Connection active", conn.RemoteAddr()) + case http.StateIdle: + log.Println("Connection idle", conn.RemoteAddr()) + case http.StateHijacked, http.StateClosed: + log.Println("Connection closed", conn.RemoteAddr()) + } + } + err := server.Start(context.TODO()) + if err != nil { + global.Log.Error("IOTHUB HTTP服务启动错误", err) + } else { + global.Log.Infof("IOTHUB HOOK Start SUCCESS,Grpc Server listen: %s", addr) + } +} + +func NewHookHttpService(hs *hook_message_work.HookService) *HookHttpService { + hhs := &HookHttpService{ + HookService: hs, + } + return hhs +} + +// 获取token进行认证 +func basicAuthenticate(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) { + path := req.Request.URL.Path + log.Println(path) + split := strings.Split(path, "/") + log.Println(split) + chain.ProcessFilter(req, resp) +} + +func (hhs *HookHttpService) hook(req *restful.Request, resp *restful.Response) { + io.WriteString(resp, "42") +} diff --git a/iothub/server/httpserver/http_server.go b/iothub/server/httpserver/http_server.go new file mode 100644 index 0000000..baa802d --- /dev/null +++ b/iothub/server/httpserver/http_server.go @@ -0,0 +1,62 @@ +package httpserver + +import ( + "context" + "github.com/emicklei/go-restful/v3" + "net/http" + "pandax/pkg/global" +) + +const DefaultPort = ":9002" + +type HttpServer struct { + Addr string + srv *http.Server + Container *restful.Container +} + +func NewHttpServer(addr string) *HttpServer { + if addr == "" { + addr = DefaultPort + } + c := restful.NewContainer() + c.EnableContentEncoding(true) + return &HttpServer{ + Addr: addr, + Container: c, + srv: &http.Server{ + Addr: addr, + Handler: c, + }, + } +} + +func (s *HttpServer) GetServe() *http.Server { + return s.srv +} + +func (s *HttpServer) Type() string { + return "HTTP" +} + +func (s *HttpServer) Start(ctx context.Context) error { + go func() { + if global.Conf.Server.Tls.Enable { + global.Log.Infof("HTTPS Server listen: %s", s.Addr) + if err := s.srv.ListenAndServeTLS(global.Conf.Server.Tls.CertFile, global.Conf.Server.Tls.KeyFile); err != nil { + global.Log.Errorf("error http serve: %s", err) + } + } else { + global.Log.Infof("HTTP Server listen: %s", s.Addr) + if err := s.srv.ListenAndServe(); err != nil { + global.Log.Errorf("error http serve: %s", err) + } + } + }() + return nil +} + +func (s *HttpServer) Stop(ctx context.Context) error { + s.srv.Shutdown(ctx) + return nil +} diff --git a/main.go b/main.go index 2d4ba37..8a540d3 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "fmt" "github.com/PandaXGO/PandaKit/logger" "github.com/PandaXGO/PandaKit/rediscli" "github.com/PandaXGO/PandaKit/restfulx" @@ -76,13 +75,13 @@ var rootCmd = &cobra.Command{ global.Log.Info("路由初始化完成") app.Start(context.TODO()) //开启IOTHUB - hs := iothub.InitEmqxHook(fmt.Sprintf(":%d", global.Conf.Server.GrpcPort)) + iothub.InitIothub() + //emqxserver.InitEmqxHook(fmt.Sprintf(":%d", global.Conf.Server.GrpcPort)) //开启视频服务 go engine.Run(context.Background(), "config.yml") stop := make(chan os.Signal, 1) signal.Notify(stop, syscall.SIGTERM, os.Interrupt) <-stop - hs.Stop() if err := app.Stop(context.TODO()); err != nil { log.Fatalf("fatal app stop: %s", err) os.Exit(-3) diff --git a/pkg/events/event.go b/pkg/events/event_bus.go similarity index 100% rename from pkg/events/event.go rename to pkg/events/event_bus.go diff --git a/pkg/transport/http_server.go b/pkg/transport/http_server.go index e5bda20..6f7d4c2 100644 --- a/pkg/transport/http_server.go +++ b/pkg/transport/http_server.go @@ -37,8 +37,16 @@ func (s *HttpServer) Type() Type { func (s *HttpServer) Start(ctx context.Context) error { global.Log.Infof("HTTP Server listen: %s", s.Addr) go func() { - if err := s.srv.ListenAndServe(); err != nil { - global.Log.Errorf("error http serve: %s", err) + if global.Conf.Server.Tls.Enable { + global.Log.Infof("HTTPS Server listen: %s", s.Addr) + if err := s.srv.ListenAndServeTLS(global.Conf.Server.Tls.CertFile, global.Conf.Server.Tls.KeyFile); err != nil { + global.Log.Errorf("error http serve: %s", err) + } + } else { + global.Log.Infof("HTTP Server listen: %s", s.Addr) + if err := s.srv.ListenAndServe(); err != nil { + global.Log.Errorf("error http serve: %s", err) + } } }() return nil diff --git a/shutdown.bat b/shutdown.bat index a296908..6109f2d 100644 --- a/shutdown.bat +++ b/shutdown.bat @@ -1 +1 @@ -taskkill /pid 16528 -t -f \ No newline at end of file +taskkill /pid 28656 -t -f \ No newline at end of file