diff --git a/consumer.go b/consumer.go index 22dde93..6f125c7 100644 --- a/consumer.go +++ b/consumer.go @@ -4,7 +4,6 @@ import ( "context" amqp "github.com/rabbitmq/amqp091-go" "golang.org/x/time/rate" - "log" "time" ) @@ -31,7 +30,7 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { deliveries, err := client.consume() if err != nil { - log.Printf("Could not start consuming: %s\n", err) + client.logger.Printf("Could not start consuming: %s\n", err) return } @@ -55,18 +54,18 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { case <-runCtx.Done(): err = client.Close() if err != nil { - log.Printf("Close failed: %s\n", err) + client.logger.Printf("Close failed: %s\n", err) } return case amqErr := <-chClosedCh: - log.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) reconnectTimer.Reset(time.Second) case <-reconnectTimer.C: deliveries, err = client.consume() if err != nil { - log.Println("Error trying to consume, will try again. Retry in 5 seconds.") + client.logger.Println("Error trying to consume, will try again. Retry in 5 seconds.") reconnectTimer.Reset(time.Second * 5) continue } @@ -76,31 +75,31 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { case delivery, ok := <-deliveries: if !ok { - log.Println("Deliveries channel closed unexpectedly") + client.logger.Println("Deliveries channel closed unexpectedly") reconnectTimer.Reset(time.Second) continue } if err = limiter.Wait(runCtx); err != nil { - log.Printf("Wait limiter failed: %s\n", err) + client.logger.Printf("Wait limiter failed: %s\n", err) } select { case <-runCtx.Done(): if err = delivery.Nack(false, true); err != nil { - log.Printf("Error nacking message: %s\n", err) + client.logger.Printf("Error nacking message: %s\n", err) } err = client.Close() if err != nil { - log.Printf("Close failed: %s\n", err) + client.logger.Printf("Close failed: %s\n", err) } return case msgCh <- delivery.Body: - log.Printf("Received message: %s\n", delivery.Body) + client.logger.Printf("Received message: %s\n", delivery.Body) if err = delivery.Ack(false); err != nil { - log.Printf("Error acknowledging message: %s\n", err) + client.logger.Printf("Error acknowledging message: %s\n", err) } } } diff --git a/handler.go b/handler.go index 63145de..68f5815 100644 --- a/handler.go +++ b/handler.go @@ -6,6 +6,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "io" "log" + "os" "sync" "time" ) @@ -36,13 +37,15 @@ type options struct { } func NewClient(address Address, queueName string, opts ...Option) (*Client, error) { + l := log.New(os.Stdout, "", log.LstdFlags) + addr, err := address.makeAddr() if err != nil { return nil, errors.Join(errBadAddr, err) } if queueName == "" { - log.Fatal(errNoQueue) + l.Fatal(errNoQueue) } client := Client{ @@ -50,6 +53,7 @@ func NewClient(address Address, queueName string, opts ...Option) (*Client, erro queueName: queueName, done: make(chan bool), connected: make(chan struct{}), + logger: l, } o := options{ diff --git a/publisher.go b/publisher.go index a65154a..ccb415f 100644 --- a/publisher.go +++ b/publisher.go @@ -23,15 +23,15 @@ func (p *pubHandler) Start(ctx context.Context, chanLen uint) chan<- []byte { case <-ctx.Done(): for msg := range ch { if err := p.push(msg); err != nil { - log.Printf("Error publishing message (shutdown): %s", err) + p.client.logger.Printf("Error publishing message (shutdown): %s", err) } } - log.Println("Publisher stopped") + p.client.logger.Println("Publisher stopped") return case msg := <-ch: if err := p.push(msg); err != nil { - log.Printf("Error publishing message: %s", err) + p.client.logger.Printf("Error publishing message: %s", err) } } }