From 42646a6a08ba1cafb3c029ab01b4069f696bdd75 Mon Sep 17 00:00:00 2001 From: nquidox Date: Sat, 21 Feb 2026 15:03:43 +0300 Subject: [PATCH] reconnect timer + fixes --- consumer.go | 67 +++++++++++++++++++++++++++++++++++++++------------- handler.go | 5 ++-- publisher.go | 20 ++++++++++++++-- 3 files changed, 71 insertions(+), 21 deletions(-) diff --git a/consumer.go b/consumer.go index 323cdb8..20c1317 100644 --- a/consumer.go +++ b/consumer.go @@ -5,21 +5,20 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "golang.org/x/time/rate" "log" + "time" ) type Consumer interface { - Start() chan []byte + Start(ctx context.Context, chanLen uint) chan []byte } type consumeHandler struct { - ctx context.Context - client *Client - chanLen int + client *Client } -func (c *consumeHandler) Start() chan []byte { - msgCh := make(chan []byte, c.chanLen) - go runConsumer(c.ctx, c.client, msgCh) +func (c *consumeHandler) Start(ctx context.Context, chanLen uint) chan []byte { + msgCh := make(chan []byte, chanLen) + go runConsumer(ctx, c.client, msgCh) return msgCh } @@ -39,8 +38,20 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { chClosedCh := make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) + reconnectTimer := time.NewTimer(0) + defer reconnectTimer.Stop() + <-reconnectTimer.C + for { + if !reconnectTimer.Stop() { + select { + case <-reconnectTimer.C: + default: + } + } + select { + case <-runCtx.Done(): err = client.Close() if err != nil { @@ -49,25 +60,49 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { return case amqErr := <-chClosedCh: - log.Printf("AMQP Channel closed due to: %s\n", amqErr) + log.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + reconnectTimer.Reset(time.Second) + case <-reconnectTimer.C: deliveries, err = client.consume() if err != nil { - log.Println("Error trying to consume, will try again") + log.Println("Error trying to consume, will try again. Retry in 5 seconds.") + reconnectTimer.Reset(time.Second * 5) continue } chClosedCh = make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) - case delivery := <-deliveries: - msgCh <- delivery.Body - log.Printf("Received message: %s\n", delivery.Body) - - if err = delivery.Ack(false); err != nil { - log.Printf("Error acknowledging message: %s\n", err) + case delivery, ok := <-deliveries: + if !ok { + log.Println("Deliveries channel closed unexpectedly") + reconnectTimer.Reset(time.Second) + continue + } + + if err = limiter.Wait(runCtx); err != nil { + log.Printf("Wait limiter failed: %s\n", err) + } + + select { + case <-runCtx.Done(): + if err = delivery.Nack(false, true); err != nil { + log.Printf("Error nacking message: %s\n", err) + } + + err = client.Close() + if err != nil { + log.Printf("Close failed: %s\n", err) + } + return + + case msgCh <- delivery.Body: + log.Printf("Received message: %s\n", delivery.Body) + if err = delivery.Ack(false); err != nil { + log.Printf("Error acknowledging message: %s\n", err) + } } - limiter.Wait(runCtx) } } } diff --git a/handler.go b/handler.go index 7a91921..949887f 100644 --- a/handler.go +++ b/handler.go @@ -1,7 +1,6 @@ package rabbit import ( - "context" amqp "github.com/rabbitmq/amqp091-go" "log" "os" @@ -68,6 +67,6 @@ func NewPublisher(client *Client) Publisher { return &pubHandler{client: client} } -func NewConsumer(ctx context.Context, client *Client, chanLen int) Consumer { - return &consumeHandler{ctx: ctx, client: client, chanLen: chanLen} +func NewConsumer(client *Client) Consumer { + return &consumeHandler{client: client} } diff --git a/publisher.go b/publisher.go index 45c3772..e71a9b8 100644 --- a/publisher.go +++ b/publisher.go @@ -2,18 +2,34 @@ package rabbit import ( "errors" + "log" "time" ) type Publisher interface { - Push(data []byte) error + Start(chanLen uint) <-chan []byte } type pubHandler struct { client *Client } -func (p *pubHandler) Push(data []byte) error { +func (p *pubHandler) Start(chanLen uint) <-chan []byte { + ch := make(chan []byte, chanLen) + go func() { + for { + select { + case msg := <-ch: + if err := p.push(msg); err != nil { + log.Printf("Error publishing message: %s", err) + } + } + } + }() + return ch +} + +func (p *pubHandler) push(data []byte) error { p.client.mutex.Lock() if !p.client.isReady { p.client.mutex.Unlock()