diff --git a/consumer.go b/consumer.go index 5c68bf6..6f125c7 100644 --- a/consumer.go +++ b/consumer.go @@ -28,15 +28,6 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) - reconnectSignal := make(chan struct{}, 1) - reconnectTrigger := func() { - select { - case reconnectSignal <- struct{}{}: - default: - } - } - - // initial consume deliveries, err := client.consume() if err != nil { client.logger.Printf("Could not start consuming: %s\n", err) @@ -46,7 +37,18 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { chClosedCh := make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) + reconnectTimer := time.NewTimer(0) + defer reconnectTimer.Stop() + <-reconnectTimer.C + for { + if !reconnectTimer.Stop() { + select { + case <-reconnectTimer.C: + default: + } + } + select { case <-runCtx.Done(): @@ -56,33 +58,25 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { } return - case <-reconnectSignal: - select { - case <-runCtx.Done(): - return - case <-time.After(client.opts.reconnectDelay): - } + case amqErr := <-chClosedCh: + client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + reconnectTimer.Reset(time.Second) + case <-reconnectTimer.C: deliveries, err = client.consume() if err != nil { - client.logger.Printf("Consume reconnect failed, retry: %s\n", err) - reconnectTrigger() + client.logger.Println("Error trying to consume, will try again. Retry in 5 seconds.") + reconnectTimer.Reset(time.Second * 5) continue } - // re-create closing chan chClosedCh = make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) - client.logger.Println("Consume reconnect success") - - case amqErr := <-chClosedCh: - client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) - reconnectTrigger() case delivery, ok := <-deliveries: if !ok { client.logger.Println("Deliveries channel closed unexpectedly") - reconnectTrigger() + reconnectTimer.Reset(time.Second) continue } diff --git a/handler.go b/handler.go index 6a8dd3f..68f5815 100644 --- a/handler.go +++ b/handler.go @@ -13,7 +13,7 @@ import ( type Client struct { mutex *sync.Mutex - queueOptions *QueueOpts + queueName string logger *log.Logger connection *amqp.Connection Channel *amqp.Channel @@ -36,16 +36,7 @@ type options struct { logger *log.Logger } -type QueueOpts struct { - QueueName string - Durable bool - AutoDelete bool - Exclusive bool - NoWait bool - Args amqp.Table -} - -func NewClient(address Address, queueOpts QueueOpts, opts ...Option) (*Client, error) { +func NewClient(address Address, queueName string, opts ...Option) (*Client, error) { l := log.New(os.Stdout, "", log.LstdFlags) addr, err := address.makeAddr() @@ -53,16 +44,16 @@ func NewClient(address Address, queueOpts QueueOpts, opts ...Option) (*Client, e return nil, errors.Join(errBadAddr, err) } - if queueOpts.QueueName == "" { + if queueName == "" { l.Fatal(errNoQueue) } client := Client{ - mutex: &sync.Mutex{}, - queueOptions: &queueOpts, - done: make(chan bool), - connected: make(chan struct{}), - logger: l, + mutex: &sync.Mutex{}, + queueName: queueName, + done: make(chan bool), + connected: make(chan struct{}), + logger: l, } o := options{ diff --git a/service.go b/service.go index c3f9ec8..6e3355d 100644 --- a/service.go +++ b/service.go @@ -78,16 +78,7 @@ func (c *Client) init(conn *amqp.Connection) error { return err } - //_, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) - _, err = ch.QueueDeclare( - c.queueOptions.QueueName, - c.queueOptions.Durable, - c.queueOptions.AutoDelete, - c.queueOptions.Exclusive, - c.queueOptions.NoWait, - c.queueOptions.Args, - ) - + _, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) if err != nil { return err } @@ -129,7 +120,7 @@ func (c *Client) unsafePush(data []byte) error { return c.Channel.PublishWithContext( ctx, "", - c.queueOptions.QueueName, + c.queueName, false, false, amqp.Publishing{ @@ -156,7 +147,7 @@ func (c *Client) consume() (<-chan amqp.Delivery, error) { } return c.Channel.Consume( - c.queueOptions.QueueName, + c.queueName, "", false, false,