package rabbit import ( "context" amqp "github.com/rabbitmq/amqp091-go" "golang.org/x/time/rate" "time" ) type Consumer interface { Start(ctx context.Context, chanLen uint) <-chan []byte } type consumeHandler struct { client *Client } func (c *consumeHandler) Start(ctx context.Context, chanLen uint) <-chan []byte { msgCh := make(chan []byte, chanLen) go runConsumer(ctx, c.client, msgCh) return msgCh } func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { runCtx, cancel := context.WithCancel(ctx) defer cancel() limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) reconnectSignal := make(chan struct{}, 1) reconnectTrigger := func() { select { case reconnectSignal <- struct{}{}: default: } } // initial consume deliveries, err := client.consume() if err != nil { client.logger.Printf("Could not start consuming: %s\n", err) return } chClosedCh := make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) for { select { case <-runCtx.Done(): err = client.Close() if err != nil { client.logger.Printf("Close failed: %s\n", err) } return case <-reconnectSignal: select { case <-runCtx.Done(): return case <-time.After(client.opts.reconnectDelay): } deliveries, err = client.consume() if err != nil { client.logger.Printf("Consume reconnect failed, retry: %s\n", err) reconnectTrigger() continue } // re-create closing chan chClosedCh = make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) client.logger.Println("Consume reconnect success") case amqErr := <-chClosedCh: client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) reconnectTrigger() case delivery, ok := <-deliveries: if !ok { client.logger.Println("Deliveries channel closed unexpectedly") reconnectTrigger() continue } if err = limiter.Wait(runCtx); err != nil { client.logger.Printf("Wait limiter failed: %s\n", err) } select { case <-runCtx.Done(): if err = delivery.Nack(false, true); err != nil { client.logger.Printf("Error nacking message: %s\n", err) } err = client.Close() if err != nil { client.logger.Printf("Close failed: %s\n", err) } return case msgCh <- delivery.Body: client.logger.Printf("Received message: %s\n", delivery.Body) if err = delivery.Ack(false); err != nil { client.logger.Printf("Error acknowledging message: %s\n", err) } } } } }