From 8f967361e88ba8947f641633710399aa4910667a Mon Sep 17 00:00:00 2001 From: nquidox Date: Thu, 2 Apr 2026 16:09:17 +0300 Subject: [PATCH 01/11] connection check --- handler.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/handler.go b/handler.go index e54146b..61dc5dc 100644 --- a/handler.go +++ b/handler.go @@ -99,7 +99,13 @@ func (c *Client) connectAndSignal(addr string, timeout time.Duration) error { if res.err != nil { return res.err } - c.connection = res.conn + + c.changeConnection(res.conn) + if err := c.init(res.conn); err != nil { + res.conn.Close() + return fmt.Errorf("init failed: %w", err) + } + close(c.connected) return nil } From 5e8661cfcd9a30c8febece21a757649ab5f01209 Mon Sep 17 00:00:00 2001 From: nquidox Date: Thu, 2 Apr 2026 16:15:36 +0300 Subject: [PATCH 02/11] isReady disabled --- service.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/service.go b/service.go index c997307..cf6ccf1 100644 --- a/service.go +++ b/service.go @@ -8,9 +8,9 @@ import ( func (c *Client) handleReconnect(addr string) { for { - c.mutex.Lock() - c.isReady = false - c.mutex.Unlock() + //c.mutex.Lock() + //c.isReady = false + //c.mutex.Unlock() c.logger.Println("Connecting to server...") @@ -45,9 +45,9 @@ func (c *Client) connect(addr string) (*amqp.Connection, error) { func (c *Client) handleReInit(conn *amqp.Connection) bool { for { - c.mutex.Lock() - c.isReady = false - c.mutex.Unlock() + //c.mutex.Lock() + //c.isReady = false + //c.mutex.Unlock() if err := c.init(conn); err != nil { c.logger.Printf("Failed to initialize connection: %s", err) From f23b3886890d8c38bf446ac377660176ca41e7df Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 10:27:01 +0300 Subject: [PATCH 03/11] mutex removed --- service.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/service.go b/service.go index cf6ccf1..6e3355d 100644 --- a/service.go +++ b/service.go @@ -8,10 +8,6 @@ import ( func (c *Client) handleReconnect(addr string) { for { - //c.mutex.Lock() - //c.isReady = false - //c.mutex.Unlock() - c.logger.Println("Connecting to server...") conn, err := c.connect(addr) @@ -45,10 +41,6 @@ func (c *Client) connect(addr string) (*amqp.Connection, error) { func (c *Client) handleReInit(conn *amqp.Connection) bool { for { - //c.mutex.Lock() - //c.isReady = false - //c.mutex.Unlock() - if err := c.init(conn); err != nil { c.logger.Printf("Failed to initialize connection: %s", err) From 4810a8666e72981f9fc646fa227e1b1030aaa4f8 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:20:53 +0300 Subject: [PATCH 04/11] msg change --- messages.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/messages.go b/messages.go index 0f634d1..fd422ab 100644 --- a/messages.go +++ b/messages.go @@ -3,7 +3,7 @@ package rabbit import "errors" var ( - errNoAddr = errors.New("no address") + errBadAddr = errors.New("bad address") errNoQueue = errors.New("no queue") errNotConnected = errors.New("not connected to a server") errAlreadyClosed = errors.New("already closed: not connected to the server") From e58b052251ad6b9c89aac62ada6d679b9a49bde1 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:21:13 +0300 Subject: [PATCH 05/11] logging options added --- options.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/options.go b/options.go index 37ad97c..3e84f95 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,11 @@ package rabbit -import "time" +import ( + "io" + "log" + "os" + "time" +) type Option func(*options) @@ -39,3 +44,17 @@ func WithConsumerBurstSize(t int) Option { op.consumerBurstSize = t } } + +func WithLogger(l *log.Logger) Option { + return func(op *options) { op.logger = l } +} + +func WithLogging(enabled bool) Option { + return func(op *options) { + if enabled { + op.logger = log.New(os.Stdout, "", log.LstdFlags) + } else { + op.logger = log.New(io.Discard, "", 0) + } + } +} From 44d99a4ba560caaaa94fa154c0dcdf65fba8b812 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:21:24 +0300 Subject: [PATCH 06/11] address maker --- address.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 address.go diff --git a/address.go b/address.go new file mode 100644 index 0000000..dadf5d3 --- /dev/null +++ b/address.go @@ -0,0 +1,34 @@ +package rabbit + +import ( + "errors" + "fmt" + "net/url" +) + +type Address struct { + Username string + Password string + Host string + Port uint16 + Vhost string +} + +func (a *Address) makeAddr() (string, error) { + if a.Host == "" { + return "", errors.New("no host provided") + } + + if a.Vhost == "" { + return "", errors.New("no vhost provided") + } + + //"amqp://username:password@host:port/vhost" + return fmt.Sprintf("amqp://%v:%v@%v:%v/%v", + url.QueryEscape(a.Username), + url.QueryEscape(a.Password), + a.Host, + a.Port, + url.PathEscape(a.Vhost), + ), nil +} From a2862350b37c2c6ae98c4a736be1f3c46f522f9c Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:22:13 +0300 Subject: [PATCH 07/11] added address, logging + time fixes --- handler.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/handler.go b/handler.go index 61dc5dc..63145de 100644 --- a/handler.go +++ b/handler.go @@ -1,10 +1,11 @@ package rabbit import ( + "errors" "fmt" amqp "github.com/rabbitmq/amqp091-go" + "io" "log" - "os" "sync" "time" ) @@ -31,11 +32,13 @@ type options struct { resendDelay time.Duration consumerRateLimit time.Duration consumerBurstSize int + logger *log.Logger } -func NewClient(addr, queueName string, opts ...Option) (*Client, error) { - if addr == "" { - log.Fatal(errNoAddr) +func NewClient(address Address, queueName string, opts ...Option) (*Client, error) { + addr, err := address.makeAddr() + if err != nil { + return nil, errors.Join(errBadAddr, err) } if queueName == "" { @@ -44,7 +47,6 @@ func NewClient(addr, queueName string, opts ...Option) (*Client, error) { client := Client{ mutex: &sync.Mutex{}, - logger: log.New(os.Stdout, "", log.LstdFlags), queueName: queueName, done: make(chan bool), connected: make(chan struct{}), @@ -52,18 +54,21 @@ func NewClient(addr, queueName string, opts ...Option) (*Client, error) { o := options{ connectTimeout: 15 * time.Second, - reconnectDelay: 5, - reInitDelay: 2, - resendDelay: 5, + reconnectDelay: 5 * time.Second, + reInitDelay: 2 * time.Second, + resendDelay: 5 * time.Second, consumerRateLimit: time.Millisecond * 500, consumerBurstSize: 10, + logger: log.New(io.Discard, "", 0), } for _, opt := range opts { opt(&o) } - if err := client.connectAndSignal(addr, o.connectTimeout); err != nil { + client.logger = o.logger + + if err = client.connectAndSignal(addr, o.connectTimeout); err != nil { return nil, fmt.Errorf("failed to connect: %w", err) } From 154320d288ad2efec9a6cc435ddf843610ac285c Mon Sep 17 00:00:00 2001 From: nquidox Date: Wed, 8 Apr 2026 12:05:39 +0300 Subject: [PATCH 08/11] logger fix --- consumer.go | 21 ++++++++++----------- handler.go | 6 +++++- publisher.go | 6 +++--- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/consumer.go b/consumer.go index 22dde93..6f125c7 100644 --- a/consumer.go +++ b/consumer.go @@ -4,7 +4,6 @@ import ( "context" amqp "github.com/rabbitmq/amqp091-go" "golang.org/x/time/rate" - "log" "time" ) @@ -31,7 +30,7 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { deliveries, err := client.consume() if err != nil { - log.Printf("Could not start consuming: %s\n", err) + client.logger.Printf("Could not start consuming: %s\n", err) return } @@ -55,18 +54,18 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { case <-runCtx.Done(): err = client.Close() if err != nil { - log.Printf("Close failed: %s\n", err) + client.logger.Printf("Close failed: %s\n", err) } return case amqErr := <-chClosedCh: - log.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) reconnectTimer.Reset(time.Second) case <-reconnectTimer.C: deliveries, err = client.consume() if err != nil { - log.Println("Error trying to consume, will try again. Retry in 5 seconds.") + client.logger.Println("Error trying to consume, will try again. Retry in 5 seconds.") reconnectTimer.Reset(time.Second * 5) continue } @@ -76,31 +75,31 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { case delivery, ok := <-deliveries: if !ok { - log.Println("Deliveries channel closed unexpectedly") + client.logger.Println("Deliveries channel closed unexpectedly") reconnectTimer.Reset(time.Second) continue } if err = limiter.Wait(runCtx); err != nil { - log.Printf("Wait limiter failed: %s\n", err) + client.logger.Printf("Wait limiter failed: %s\n", err) } select { case <-runCtx.Done(): if err = delivery.Nack(false, true); err != nil { - log.Printf("Error nacking message: %s\n", err) + client.logger.Printf("Error nacking message: %s\n", err) } err = client.Close() if err != nil { - log.Printf("Close failed: %s\n", err) + client.logger.Printf("Close failed: %s\n", err) } return case msgCh <- delivery.Body: - log.Printf("Received message: %s\n", delivery.Body) + client.logger.Printf("Received message: %s\n", delivery.Body) if err = delivery.Ack(false); err != nil { - log.Printf("Error acknowledging message: %s\n", err) + client.logger.Printf("Error acknowledging message: %s\n", err) } } } diff --git a/handler.go b/handler.go index 63145de..68f5815 100644 --- a/handler.go +++ b/handler.go @@ -6,6 +6,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "io" "log" + "os" "sync" "time" ) @@ -36,13 +37,15 @@ type options struct { } func NewClient(address Address, queueName string, opts ...Option) (*Client, error) { + l := log.New(os.Stdout, "", log.LstdFlags) + addr, err := address.makeAddr() if err != nil { return nil, errors.Join(errBadAddr, err) } if queueName == "" { - log.Fatal(errNoQueue) + l.Fatal(errNoQueue) } client := Client{ @@ -50,6 +53,7 @@ func NewClient(address Address, queueName string, opts ...Option) (*Client, erro queueName: queueName, done: make(chan bool), connected: make(chan struct{}), + logger: l, } o := options{ diff --git a/publisher.go b/publisher.go index a65154a..ccb415f 100644 --- a/publisher.go +++ b/publisher.go @@ -23,15 +23,15 @@ func (p *pubHandler) Start(ctx context.Context, chanLen uint) chan<- []byte { case <-ctx.Done(): for msg := range ch { if err := p.push(msg); err != nil { - log.Printf("Error publishing message (shutdown): %s", err) + p.client.logger.Printf("Error publishing message (shutdown): %s", err) } } - log.Println("Publisher stopped") + p.client.logger.Println("Publisher stopped") return case msg := <-ch: if err := p.push(msg); err != nil { - log.Printf("Error publishing message: %s", err) + p.client.logger.Printf("Error publishing message: %s", err) } } } From 9b9d8a306405910a6da1e7406ad0f8f505e64929 Mon Sep 17 00:00:00 2001 From: nquidox Date: Wed, 8 Apr 2026 12:21:19 +0300 Subject: [PATCH 09/11] removed unused import --- publisher.go | 1 - 1 file changed, 1 deletion(-) diff --git a/publisher.go b/publisher.go index ccb415f..c938eb5 100644 --- a/publisher.go +++ b/publisher.go @@ -3,7 +3,6 @@ package rabbit import ( "context" "errors" - "log" "time" ) From fd495892faae5b3880fba6656fbc60a8097c244e Mon Sep 17 00:00:00 2001 From: nquidox Date: Mon, 27 Apr 2026 00:06:46 +0300 Subject: [PATCH 10/11] queue opts --- handler.go | 25 +++++++++++++++++-------- service.go | 15 ++++++++++++--- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/handler.go b/handler.go index 68f5815..6a8dd3f 100644 --- a/handler.go +++ b/handler.go @@ -13,7 +13,7 @@ import ( type Client struct { mutex *sync.Mutex - queueName string + queueOptions *QueueOpts logger *log.Logger connection *amqp.Connection Channel *amqp.Channel @@ -36,7 +36,16 @@ type options struct { logger *log.Logger } -func NewClient(address Address, queueName string, opts ...Option) (*Client, error) { +type QueueOpts struct { + QueueName string + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Args amqp.Table +} + +func NewClient(address Address, queueOpts QueueOpts, opts ...Option) (*Client, error) { l := log.New(os.Stdout, "", log.LstdFlags) addr, err := address.makeAddr() @@ -44,16 +53,16 @@ func NewClient(address Address, queueName string, opts ...Option) (*Client, erro return nil, errors.Join(errBadAddr, err) } - if queueName == "" { + if queueOpts.QueueName == "" { l.Fatal(errNoQueue) } client := Client{ - mutex: &sync.Mutex{}, - queueName: queueName, - done: make(chan bool), - connected: make(chan struct{}), - logger: l, + mutex: &sync.Mutex{}, + queueOptions: &queueOpts, + done: make(chan bool), + connected: make(chan struct{}), + logger: l, } o := options{ diff --git a/service.go b/service.go index 6e3355d..c3f9ec8 100644 --- a/service.go +++ b/service.go @@ -78,7 +78,16 @@ func (c *Client) init(conn *amqp.Connection) error { return err } - _, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) + //_, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) + _, err = ch.QueueDeclare( + c.queueOptions.QueueName, + c.queueOptions.Durable, + c.queueOptions.AutoDelete, + c.queueOptions.Exclusive, + c.queueOptions.NoWait, + c.queueOptions.Args, + ) + if err != nil { return err } @@ -120,7 +129,7 @@ func (c *Client) unsafePush(data []byte) error { return c.Channel.PublishWithContext( ctx, "", - c.queueName, + c.queueOptions.QueueName, false, false, amqp.Publishing{ @@ -147,7 +156,7 @@ func (c *Client) consume() (<-chan amqp.Delivery, error) { } return c.Channel.Consume( - c.queueName, + c.queueOptions.QueueName, "", false, false, From 72f039e4f7819930e5e6ed3faa6aa10ba81392aa Mon Sep 17 00:00:00 2001 From: nquidox Date: Mon, 4 May 2026 11:50:59 +0300 Subject: [PATCH 11/11] reconnect bugfix --- consumer.go | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/consumer.go b/consumer.go index 6f125c7..5c68bf6 100644 --- a/consumer.go +++ b/consumer.go @@ -28,6 +28,15 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) + reconnectSignal := make(chan struct{}, 1) + reconnectTrigger := func() { + select { + case reconnectSignal <- struct{}{}: + default: + } + } + + // initial consume deliveries, err := client.consume() if err != nil { client.logger.Printf("Could not start consuming: %s\n", err) @@ -37,18 +46,7 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { chClosedCh := make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) - reconnectTimer := time.NewTimer(0) - defer reconnectTimer.Stop() - <-reconnectTimer.C - for { - if !reconnectTimer.Stop() { - select { - case <-reconnectTimer.C: - default: - } - } - select { case <-runCtx.Done(): @@ -58,25 +56,33 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { } return - case amqErr := <-chClosedCh: - client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) - reconnectTimer.Reset(time.Second) + case <-reconnectSignal: + select { + case <-runCtx.Done(): + return + case <-time.After(client.opts.reconnectDelay): + } - case <-reconnectTimer.C: deliveries, err = client.consume() if err != nil { - client.logger.Println("Error trying to consume, will try again. Retry in 5 seconds.") - reconnectTimer.Reset(time.Second * 5) + client.logger.Printf("Consume reconnect failed, retry: %s\n", err) + reconnectTrigger() continue } + // re-create closing chan chClosedCh = make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) + client.logger.Println("Consume reconnect success") + + case amqErr := <-chClosedCh: + client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + reconnectTrigger() case delivery, ok := <-deliveries: if !ok { client.logger.Println("Deliveries channel closed unexpectedly") - reconnectTimer.Reset(time.Second) + reconnectTrigger() continue }