diff --git a/consumer.go b/consumer.go index 22dde93..5c68bf6 100644 --- a/consumer.go +++ b/consumer.go @@ -4,7 +4,6 @@ import ( "context" amqp "github.com/rabbitmq/amqp091-go" "golang.org/x/time/rate" - "log" "time" ) @@ -29,78 +28,84 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) + reconnectSignal := make(chan struct{}, 1) + reconnectTrigger := func() { + select { + case reconnectSignal <- struct{}{}: + default: + } + } + + // initial consume deliveries, err := client.consume() if err != nil { - log.Printf("Could not start consuming: %s\n", err) + client.logger.Printf("Could not start consuming: %s\n", err) return } chClosedCh := make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) - reconnectTimer := time.NewTimer(0) - defer reconnectTimer.Stop() - <-reconnectTimer.C - for { - if !reconnectTimer.Stop() { - select { - case <-reconnectTimer.C: - default: - } - } - select { case <-runCtx.Done(): err = client.Close() if err != nil { - log.Printf("Close failed: %s\n", err) + client.logger.Printf("Close failed: %s\n", err) } return - case amqErr := <-chClosedCh: - log.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) - reconnectTimer.Reset(time.Second) + case <-reconnectSignal: + select { + case <-runCtx.Done(): + return + case <-time.After(client.opts.reconnectDelay): + } - case <-reconnectTimer.C: deliveries, err = client.consume() if err != nil { - log.Println("Error trying to consume, will try again. Retry in 5 seconds.") - reconnectTimer.Reset(time.Second * 5) + client.logger.Printf("Consume reconnect failed, retry: %s\n", err) + reconnectTrigger() continue } + // re-create closing chan chClosedCh = make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) + client.logger.Println("Consume reconnect success") + + case amqErr := <-chClosedCh: + client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + reconnectTrigger() case delivery, ok := <-deliveries: if !ok { - log.Println("Deliveries channel closed unexpectedly") - reconnectTimer.Reset(time.Second) + client.logger.Println("Deliveries channel closed unexpectedly") + reconnectTrigger() continue } if err = limiter.Wait(runCtx); err != nil { - log.Printf("Wait limiter failed: %s\n", err) + client.logger.Printf("Wait limiter failed: %s\n", err) } select { case <-runCtx.Done(): if err = delivery.Nack(false, true); err != nil { - log.Printf("Error nacking message: %s\n", err) + client.logger.Printf("Error nacking message: %s\n", err) } err = client.Close() if err != nil { - log.Printf("Close failed: %s\n", err) + client.logger.Printf("Close failed: %s\n", err) } return case msgCh <- delivery.Body: - log.Printf("Received message: %s\n", delivery.Body) + client.logger.Printf("Received message: %s\n", delivery.Body) if err = delivery.Ack(false); err != nil { - log.Printf("Error acknowledging message: %s\n", err) + client.logger.Printf("Error acknowledging message: %s\n", err) } } } diff --git a/handler.go b/handler.go index 63145de..6a8dd3f 100644 --- a/handler.go +++ b/handler.go @@ -6,13 +6,14 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "io" "log" + "os" "sync" "time" ) type Client struct { mutex *sync.Mutex - queueName string + queueOptions *QueueOpts logger *log.Logger connection *amqp.Connection Channel *amqp.Channel @@ -35,21 +36,33 @@ type options struct { logger *log.Logger } -func NewClient(address Address, queueName string, opts ...Option) (*Client, error) { +type QueueOpts struct { + QueueName string + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Args amqp.Table +} + +func NewClient(address Address, queueOpts QueueOpts, opts ...Option) (*Client, error) { + l := log.New(os.Stdout, "", log.LstdFlags) + addr, err := address.makeAddr() if err != nil { return nil, errors.Join(errBadAddr, err) } - if queueName == "" { - log.Fatal(errNoQueue) + if queueOpts.QueueName == "" { + l.Fatal(errNoQueue) } client := Client{ - mutex: &sync.Mutex{}, - queueName: queueName, - done: make(chan bool), - connected: make(chan struct{}), + mutex: &sync.Mutex{}, + queueOptions: &queueOpts, + done: make(chan bool), + connected: make(chan struct{}), + logger: l, } o := options{ diff --git a/publisher.go b/publisher.go index a65154a..c938eb5 100644 --- a/publisher.go +++ b/publisher.go @@ -3,7 +3,6 @@ package rabbit import ( "context" "errors" - "log" "time" ) @@ -23,15 +22,15 @@ func (p *pubHandler) Start(ctx context.Context, chanLen uint) chan<- []byte { case <-ctx.Done(): for msg := range ch { if err := p.push(msg); err != nil { - log.Printf("Error publishing message (shutdown): %s", err) + p.client.logger.Printf("Error publishing message (shutdown): %s", err) } } - log.Println("Publisher stopped") + p.client.logger.Println("Publisher stopped") return case msg := <-ch: if err := p.push(msg); err != nil { - log.Printf("Error publishing message: %s", err) + p.client.logger.Printf("Error publishing message: %s", err) } } } diff --git a/service.go b/service.go index 6e3355d..c3f9ec8 100644 --- a/service.go +++ b/service.go @@ -78,7 +78,16 @@ func (c *Client) init(conn *amqp.Connection) error { return err } - _, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) + //_, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) + _, err = ch.QueueDeclare( + c.queueOptions.QueueName, + c.queueOptions.Durable, + c.queueOptions.AutoDelete, + c.queueOptions.Exclusive, + c.queueOptions.NoWait, + c.queueOptions.Args, + ) + if err != nil { return err } @@ -120,7 +129,7 @@ func (c *Client) unsafePush(data []byte) error { return c.Channel.PublishWithContext( ctx, "", - c.queueName, + c.queueOptions.QueueName, false, false, amqp.Publishing{ @@ -147,7 +156,7 @@ func (c *Client) consume() (<-chan amqp.Delivery, error) { } return c.Channel.Consume( - c.queueName, + c.queueOptions.QueueName, "", false, false,