package rabbit

import (
	"context"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// runConsumer pumps message bodies from queue into msgCh until ctx is
// cancelled.
//
// It starts consuming via queue.Consume and registers for close notifications
// on queue.Channel. For every delivery it forwards the body to msgCh, logs it,
// acknowledges it (single-message ack) and then pauses for two seconds before
// taking the next one. When the AMQP channel reports that it closed, it calls
// queue.Consume again and re-registers the close notification, retrying with a
// delay for as long as that keeps failing.
//
// If the very first queue.Consume call fails the error is logged and the
// function returns without closing queue. On cancellation of ctx it calls
// queue.Close, logs any error from it, and returns. Cancellation is honoured
// while blocked sending on msgCh and while pausing, so shutdown is not held up
// by a slow reader or by the throttle delay. A delivery whose send was
// interrupted by cancellation is not acknowledged.
func runConsumer(ctx context.Context, queue *Client, msgCh chan []byte) {
	const (
		// postDeliveryPause throttles consumption to one message per interval.
		postDeliveryPause = 2 * time.Second
		// retryDelay spaces out re-consume attempts so a persistent failure
		// does not turn into a hot loop.
		retryDelay = 2 * time.Second
	)

	deliveries, err := queue.Consume()
	if err != nil {
		log.Printf("Could not start consuming: %s\n", err)
		return
	}

	chClosedCh := make(chan *amqp.Error, 1)
	queue.Channel.NotifyClose(chClosedCh)

	// shutdown closes the client and logs (rather than returns) any failure.
	shutdown := func() {
		if closeErr := queue.Close(); closeErr != nil {
			log.Printf("Close failed: %s\n", closeErr)
		}
	}

	// pause waits for d and reports true, or reports false as soon as ctx is
	// cancelled. The timer is stopped so it does not linger after an early exit.
	pause := func(d time.Duration) bool {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return false
		case <-timer.C:
			return true
		}
	}

	for {
		select {
		case <-ctx.Done():
			shutdown()
			return

		case amqErr := <-chClosedCh:
			log.Printf("AMQP Channel closed due to: %s\n", amqErr)

			deliveries, err = queue.Consume()
			if err != nil {
				log.Println("Error trying to consume, will try again")
				// Make sure the delivery case stays dormant until a consume
				// succeeds; a nil channel is never ready in a select.
				deliveries = nil
				// chClosedCh is deliberately left untouched so this case fires
				// again on the next iteration.
				// NOTE(review): this relies on the amqp library closing the
				// notification channel after reporting the error — confirm.
				if !pause(retryDelay) {
					shutdown()
					return
				}
				continue
			}

			// The previous notification channel is spent; register a new one.
			chClosedCh = make(chan *amqp.Error, 1)
			queue.Channel.NotifyClose(chClosedCh)

		case delivery, ok := <-deliveries:
			if !ok {
				// The delivery channel was closed. Without this check the
				// receive would keep yielding zero-value deliveries, pushing
				// nil bodies to msgCh and acking uninitialized deliveries.
				// Park the case; the close notification above drives recovery.
				deliveries = nil
				continue
			}

			select {
			case msgCh <- delivery.Body:
			case <-ctx.Done():
				// Not forwarded, so intentionally not acknowledged.
				shutdown()
				return
			}

			log.Printf("Received message: %s\n", delivery.Body)
			if err = delivery.Ack(false); err != nil {
				log.Printf("Error acknowledging message: %s\n", err)
			}

			if !pause(postDeliveryPause) {
				shutdown()
				return
			}
		}
	}
}