diff --git a/iothub/hook_message_work/hook_message_work.go b/iothub/hook_message_work/hook_message_work.go index 85e35b9..cc77015 100644 --- a/iothub/hook_message_work/hook_message_work.go +++ b/iothub/hook_message_work/hook_message_work.go @@ -34,9 +34,15 @@ func (s *HookService) handleOne(msg *netbase.DeviceEventInfo) { if s.Ch != nil { // 用于并发限制 s.Ch <- struct{}{} } + // 用于做优雅关闭, 主要作用是,程序关闭,将队列中的消息处理完成后在关闭, s.Wg.Add(1) go func() { - defer s.Wg.Done() + defer func() { + s.Wg.Done() + if s.Ch != nil { // 用于并发限制 + <-s.Ch + } + }() switch msg.Type { case message.RowMes, message.AttributesMes, message.TelemetryMes, message.RpcRequestMes: msgVals := make(map[string]interface{})