From 6addcca08d22dcd1e68a074b74846a0e67c840b4 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 20 Feb 2026 16:04:48 +0300 Subject: [PATCH] private methods + push move --- consumer.go | 4 ++-- service.go | 31 ++----------------------------- 2 files changed, 4 insertions(+), 31 deletions(-) diff --git a/consumer.go b/consumer.go index b8d93d9..9aac126 100644 --- a/consumer.go +++ b/consumer.go @@ -13,7 +13,7 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) - deliveries, err := client.Consume() + deliveries, err := client.consume() if err != nil { log.Printf("Could not start consuming: %s\n", err) return @@ -34,7 +34,7 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { case amqErr := <-chClosedCh: log.Printf("AMQP Channel closed due to: %s\n", amqErr) - deliveries, err = client.Consume() + deliveries, err = client.consume() if err != nil { log.Println("Error trying to consume, will try again") continue diff --git a/service.go b/service.go index 889e8c7..c997307 100644 --- a/service.go +++ b/service.go @@ -2,7 +2,6 @@ package rabbit import ( "context" - "errors" amqp "github.com/rabbitmq/amqp091-go" "time" ) @@ -115,33 +114,7 @@ func (c *Client) changeChannel(channel *amqp.Channel) { c.Channel.NotifyPublish(c.notifyConfirm) } -func (c *Client) Push(data []byte) error { - c.mutex.Lock() - if !c.isReady { - c.mutex.Unlock() - return errors.New("failed to push: not connected") - } - c.mutex.Unlock() - for { - err := c.UnsafePush(data) - if err != nil { - c.logger.Println("Push failed. Retrying...") - select { - case <-c.done: - return errShutdown - case <-time.After(c.opts.resendDelay): - } - continue - } - confirm := <-c.notifyConfirm - if confirm.Ack { - c.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag) - return nil - } - } -} - -func (c *Client) UnsafePush(data []byte) error { +func (c *Client) unsafePush(data []byte) error { c.mutex.Lock() if !c.isReady { c.mutex.Unlock() @@ -165,7 +138,7 @@ func (c *Client) UnsafePush(data []byte) error { ) } -func (c *Client) Consume() (<-chan amqp.Delivery, error) { +func (c *Client) consume() (<-chan amqp.Delivery, error) { c.mutex.Lock() if !c.isReady { c.mutex.Unlock()