From eb839238497bbb3fd2bc1ab4014a4894c9d5d622 Mon Sep 17 00:00:00 2001 From: PandaX <18610165312@163.com> Date: Mon, 9 Oct 2023 11:15:23 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90fix=E3=80=91=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E5=B9=B6=E5=8F=91=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iothub/hook_message_work/hook_message_work.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/iothub/hook_message_work/hook_message_work.go b/iothub/hook_message_work/hook_message_work.go index 85e35b9..cc77015 100644 --- a/iothub/hook_message_work/hook_message_work.go +++ b/iothub/hook_message_work/hook_message_work.go @@ -34,9 +34,15 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) { if s.Ch != nil { // 用于并发限制 s.Ch <- struct{}{} } + // 用于做优雅关闭, 主要作用是,程序关闭,将队列中的消息处理完成后在关闭, s.Wg.Add(1) go func() { - defer s.Wg.Done() + defer func() { + s.Wg.Done() + if s.Ch != nil { // 用于并发限制 + <-s.Ch + } + }() switch msg.Type { case message.RowMes, message.AttributesMes, message.TelemetryMes, message.RpcRequestMes: msgVals := make(map[string]interface{})