diff --git a/consumer.go b/consumer.go index 5c68bf6..6f125c7 100644 --- a/consumer.go +++ b/consumer.go @@ -28,15 +28,6 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) - reconnectSignal := make(chan struct{}, 1) - reconnectTrigger := func() { - select { - case reconnectSignal <- struct{}{}: - default: - } - } - - // initial consume deliveries, err := client.consume() if err != nil { client.logger.Printf("Could not start consuming: %s\n", err) @@ -46,7 +37,18 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { chClosedCh := make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) + reconnectTimer := time.NewTimer(0) + defer reconnectTimer.Stop() + <-reconnectTimer.C + for { + if !reconnectTimer.Stop() { + select { + case <-reconnectTimer.C: + default: + } + } + select { case <-runCtx.Done(): @@ -56,33 +58,25 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { } return - case <-reconnectSignal: - select { - case <-runCtx.Done(): - return - case <-time.After(client.opts.reconnectDelay): - } + case amqErr := <-chClosedCh: + client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + reconnectTimer.Reset(time.Second) + case <-reconnectTimer.C: deliveries, err = client.consume() if err != nil { - client.logger.Printf("Consume reconnect failed, retry: %s\n", err) - reconnectTrigger() + client.logger.Println("Error trying to consume, will try again. Retry in 5 seconds.") + reconnectTimer.Reset(time.Second * 5) continue } - // re-create closing chan chClosedCh = make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) - client.logger.Println("Consume reconnect success") - - case amqErr := <-chClosedCh: - client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) - reconnectTrigger() case delivery, ok := <-deliveries: if !ok { client.logger.Println("Deliveries channel closed unexpectedly") - reconnectTrigger() + reconnectTimer.Reset(time.Second) continue }