From 8b1bc57cd05e9cf565506db57a1542cbcca32cb2 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 20 Feb 2026 14:39:24 +0300 Subject: [PATCH 01/26] simple rate limiter --- consumer.go | 7 +++++-- go.mod | 2 ++ go.sum | 2 ++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/consumer.go b/consumer.go index 2127b65..8a17f4a 100644 --- a/consumer.go +++ b/consumer.go @@ -3,6 +3,7 @@ package rabbit import ( "context" amqp "github.com/rabbitmq/amqp091-go" + "golang.org/x/time/rate" "log" "time" ) @@ -11,6 +12,8 @@ func runConsumer(ctx context.Context, queue *Client, msgCh chan []byte) { runCtx, cancel := context.WithCancel(ctx) defer cancel() + limiter := rate.NewLimiter(rate.Every(time.Millisecond*500), 100) + deliveries, err := queue.Consume() if err != nil { log.Printf("Could not start consuming: %s\n", err) @@ -44,11 +47,11 @@ func runConsumer(ctx context.Context, queue *Client, msgCh chan []byte) { case delivery := <-deliveries: msgCh <- delivery.Body log.Printf("Received message: %s\n", delivery.Body) - + if err = delivery.Ack(false); err != nil { log.Printf("Error acknowledging message: %s\n", err) } - <-time.After(time.Second * 2) + limiter.Wait(runCtx) } } } diff --git a/go.mod b/go.mod index d18bf43..0a3c615 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,5 @@ module repo.nqws.ru/merch-tracker-v2/mt-rabbit go 1.25.0 require github.com/rabbitmq/amqp091-go v1.10.0 + +require golang.org/x/time v0.14.0 // indirect diff --git a/go.sum b/go.sum index 024eebe..64cd225 100644 --- a/go.sum +++ b/go.sum @@ -2,3 +2,5 @@ github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzuk github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= +golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= From 3f0401de1d462fc41af76e2a25f17a4a076bcf02 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 20 Feb 2026 15:31:13 +0300 Subject: [PATCH 02/26] options added --- consumer.go | 15 ++++++------ handler.go | 68 +++++++++++++++++++++++++++-------------------------- messages.go | 11 +++++++++ options.go | 35 +++++++++++++++++++++++++++ service.go | 6 ++--- 5 files changed, 91 insertions(+), 44 deletions(-) create mode 100644 messages.go create mode 100644 options.go diff --git a/consumer.go b/consumer.go index 8a17f4a..b8d93d9 100644 --- a/consumer.go +++ b/consumer.go @@ -5,28 +5,27 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "golang.org/x/time/rate" "log" - "time" ) -func runConsumer(ctx context.Context, queue *Client, msgCh chan []byte) { +func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { runCtx, cancel := context.WithCancel(ctx) defer cancel() - limiter := rate.NewLimiter(rate.Every(time.Millisecond*500), 100) + limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) - deliveries, err := queue.Consume() + deliveries, err := client.Consume() if err != nil { log.Printf("Could not start consuming: %s\n", err) return } chClosedCh := make(chan *amqp.Error, 1) - queue.Channel.NotifyClose(chClosedCh) + client.Channel.NotifyClose(chClosedCh) for { select { case <-runCtx.Done(): - err = queue.Close() + err = client.Close() if err != nil { log.Printf("Close failed: %s\n", err) } @@ -35,14 +34,14 @@ func runConsumer(ctx context.Context, queue *Client, msgCh chan []byte) { case amqErr := <-chClosedCh: log.Printf("AMQP Channel closed due to: %s\n", amqErr) - deliveries, err = queue.Consume() + deliveries, err = client.Consume() if err != nil { log.Println("Error trying to consume, will try again") continue } chClosedCh = make(chan *amqp.Error, 1) - queue.Channel.NotifyClose(chClosedCh) + client.Channel.NotifyClose(chClosedCh) case delivery := <-deliveries: msgCh <- delivery.Body diff --git a/handler.go b/handler.go index eacd93b..6109124 100644 --- a/handler.go +++ b/handler.go @@ -2,7 +2,6 @@ package rabbit import ( "context" - "errors" amqp "github.com/rabbitmq/amqp091-go" "log" "os" @@ -21,45 +20,48 @@ type Client struct { notifyChanClose chan *amqp.Error notifyConfirm chan amqp.Confirmation isReady bool - reconnectDelay time.Duration - reInitDelay time.Duration - resendDelay time.Duration + opts options } -//const ( -// reconnectDelay = 5 * time.Second -// reInitDelay = 2 * time.Second -// resendDelay = 5 * time.Second -//) - -type Deps struct { - ReconnectDelay time.Duration - ReInitDelay time.Duration - ResendDelay time.Duration - QueueName string - Addr string +type options struct { + reconnectDelay time.Duration + reInitDelay time.Duration + resendDelay time.Duration + consumerRateLimit time.Duration + consumerBurstSize int } -var ( - errNotConnected = errors.New("not connected to a server") - errAlreadyClosed = errors.New("already closed: not connected to the server") - errShutdown = errors.New("client is shutting down") -) - -func NewClient(deps Deps) *Client { - client := Client{ - mutex: &sync.Mutex{}, - logger: log.New(os.Stdout, "", log.LstdFlags), - queueName: deps.QueueName, - done: make(chan bool), - reconnectDelay: deps.ReconnectDelay, - reInitDelay: deps.ReInitDelay, - resendDelay: deps.ResendDelay, +func NewClient(addr, queueName string, opts ...Option) (*Client, error) { + if addr == "" { + return nil, errNoAddr } - go client.handleReconnect(deps.Addr) + if queueName == "" { + return nil, errNoQueue + } - return &client + client := Client{ + mutex: &sync.Mutex{}, + logger: log.New(os.Stdout, "", log.LstdFlags), + queueName: queueName, + done: make(chan bool), + } + + o := options{ + reconnectDelay: 5, + reInitDelay: 2, + resendDelay: 5, + consumerRateLimit: time.Millisecond * 500, + consumerBurstSize: 10, + } + + for _, opt := range opts { + opt(&o) + } + + go client.handleReconnect(addr) + + return &client, nil } func StartConsumer(ctx context.Context, client *Client, chanLen int) chan []byte { diff --git a/messages.go b/messages.go new file mode 100644 index 0000000..0f634d1 --- /dev/null +++ b/messages.go @@ -0,0 +1,11 @@ +package rabbit + +import "errors" + +var ( + errNoAddr = errors.New("no address") + errNoQueue = errors.New("no queue") + errNotConnected = errors.New("not connected to a server") + errAlreadyClosed = errors.New("already closed: not connected to the server") + errShutdown = errors.New("client is shutting down") +) diff --git a/options.go b/options.go new file mode 100644 index 0000000..aaa7828 --- /dev/null +++ b/options.go @@ -0,0 +1,35 @@ +package rabbit + +import "time" + +type Option func(*options) + +func WithReconnectDelay(t time.Duration) Option { + return func(op *options) { + op.reconnectDelay = t + } +} + +func WithReInitDelay(t time.Duration) Option { + return func(op *options) { + op.reInitDelay = t + } +} + +func WithResendDelay(t time.Duration) Option { + return func(op *options) { + op.resendDelay = t + } +} + +func WithConsumerRateLimit(t time.Duration) Option { + return func(op *options) { + op.consumerRateLimit = t + } +} + +func WithConsumerBurstSize(t int) Option { + return func(op *options) { + op.consumerBurstSize = t + } +} diff --git a/service.go b/service.go index 622e282..889e8c7 100644 --- a/service.go +++ b/service.go @@ -22,7 +22,7 @@ func (c *Client) handleReconnect(addr string) { select { case <-c.done: return - case <-time.After(c.reconnectDelay): + case <-time.After(c.opts.reconnectDelay): } continue } @@ -59,7 +59,7 @@ func (c *Client) handleReInit(conn *amqp.Connection) bool { case <-c.notifyConnClose: c.logger.Println("Connection closed. Reconnecting...") return false - case <-time.After(c.reInitDelay): + case <-time.After(c.opts.reInitDelay): } continue } @@ -129,7 +129,7 @@ func (c *Client) Push(data []byte) error { select { case <-c.done: return errShutdown - case <-time.After(c.resendDelay): + case <-time.After(c.opts.resendDelay): } continue } From 6addcca08d22dcd1e68a074b74846a0e67c840b4 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 20 Feb 2026 16:04:48 +0300 Subject: [PATCH 03/26] private methods + push move --- consumer.go | 4 ++-- service.go | 31 ++----------------------------- 2 files changed, 4 insertions(+), 31 deletions(-) diff --git a/consumer.go b/consumer.go index b8d93d9..9aac126 100644 --- a/consumer.go +++ b/consumer.go @@ -13,7 +13,7 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) - deliveries, err := client.Consume() + deliveries, err := client.consume() if err != nil { log.Printf("Could not start consuming: %s\n", err) return @@ -34,7 +34,7 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { case amqErr := <-chClosedCh: log.Printf("AMQP Channel closed due to: %s\n", amqErr) - deliveries, err = client.Consume() + deliveries, err = client.consume() if err != nil { log.Println("Error trying to consume, will try again") continue diff --git a/service.go b/service.go index 889e8c7..c997307 100644 --- a/service.go +++ b/service.go @@ -2,7 +2,6 @@ package rabbit import ( "context" - "errors" amqp "github.com/rabbitmq/amqp091-go" "time" ) @@ -115,33 +114,7 @@ func (c *Client) changeChannel(channel *amqp.Channel) { c.Channel.NotifyPublish(c.notifyConfirm) } -func (c *Client) Push(data []byte) error { - c.mutex.Lock() - if !c.isReady { - c.mutex.Unlock() - return errors.New("failed to push: not connected") - } - c.mutex.Unlock() - for { - err := c.UnsafePush(data) - if err != nil { - c.logger.Println("Push failed. Retrying...") - select { - case <-c.done: - return errShutdown - case <-time.After(c.opts.resendDelay): - } - continue - } - confirm := <-c.notifyConfirm - if confirm.Ack { - c.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag) - return nil - } - } -} - -func (c *Client) UnsafePush(data []byte) error { +func (c *Client) unsafePush(data []byte) error { c.mutex.Lock() if !c.isReady { c.mutex.Unlock() @@ -165,7 +138,7 @@ func (c *Client) UnsafePush(data []byte) error { ) } -func (c *Client) Consume() (<-chan amqp.Delivery, error) { +func (c *Client) consume() (<-chan amqp.Delivery, error) { c.mutex.Lock() if !c.isReady { c.mutex.Unlock() From 9366cbbf6d2dc5b257c581aa2ff1b36917758821 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 20 Feb 2026 16:05:12 +0300 Subject: [PATCH 04/26] publisher added --- handler.go | 4 ++++ publisher.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 publisher.go diff --git a/handler.go b/handler.go index 6109124..a203b98 100644 --- a/handler.go +++ b/handler.go @@ -70,3 +70,7 @@ func StartConsumer(ctx context.Context, client *Client, chanLen int) chan []byte return msgCh } + +func NewPublisher(client *Client) Publisher { + return &PubHandler{client: client} +} diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..69abeae --- /dev/null +++ b/publisher.go @@ -0,0 +1,40 @@ +package rabbit + +import ( + "errors" + "time" +) + +type Publisher interface { + Push(data []byte) error +} + +type PubHandler struct { + client *Client +} + +func (p *PubHandler) Push(data []byte) error { + p.client.mutex.Lock() + if !p.client.isReady { + p.client.mutex.Unlock() + return errors.New("failed to push: not connected") + } + p.client.mutex.Unlock() + for { + err := p.client.unsafePush(data) + if err != nil { + p.client.logger.Println("Push failed. Retrying...") + select { + case <-p.client.done: + return errShutdown + case <-time.After(p.client.opts.resendDelay): + } + continue + } + confirm := <-p.client.notifyConfirm + if confirm.Ack { + p.client.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag) + return nil + } + } +} From 4c5b1c7030ecee1ea97fe67a4cf55a8dc06f9251 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 20 Feb 2026 16:20:09 +0300 Subject: [PATCH 05/26] consumer constructor --- consumer.go | 17 +++++++++++++++++ handler.go | 13 +++++-------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/consumer.go b/consumer.go index 9aac126..323cdb8 100644 --- a/consumer.go +++ b/consumer.go @@ -7,6 +7,23 @@ import ( "log" ) +type Consumer interface { + Start() chan []byte +} + +type consumeHandler struct { + ctx context.Context + client *Client + chanLen int +} + +func (c *consumeHandler) Start() chan []byte { + msgCh := make(chan []byte, c.chanLen) + go runConsumer(c.ctx, c.client, msgCh) + + return msgCh +} + func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { runCtx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/handler.go b/handler.go index a203b98..503d9c1 100644 --- a/handler.go +++ b/handler.go @@ -64,13 +64,10 @@ func NewClient(addr, queueName string, opts ...Option) (*Client, error) { return &client, nil } -func StartConsumer(ctx context.Context, client *Client, chanLen int) chan []byte { - msgCh := make(chan []byte, chanLen) - go runConsumer(ctx, client, msgCh) - - return msgCh -} - func NewPublisher(client *Client) Publisher { - return &PubHandler{client: client} + return &pubHandler{client: client} +} + +func NewConsumer(ctx context.Context, client *Client, chanLen int) Consumer { + return &consumeHandler{ctx: ctx, client: client, chanLen: chanLen} } From 23b439f0aa120d11493cd8a0fe93e5040481726b Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 20 Feb 2026 16:20:19 +0300 Subject: [PATCH 06/26] privated --- publisher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/publisher.go b/publisher.go index 69abeae..45c3772 100644 --- a/publisher.go +++ b/publisher.go @@ -9,11 +9,11 @@ type Publisher interface { Push(data []byte) error } -type PubHandler struct { +type pubHandler struct { client *Client } -func (p *PubHandler) Push(data []byte) error { +func (p *pubHandler) Push(data []byte) error { p.client.mutex.Lock() if !p.client.isReady { p.client.mutex.Unlock() From 2846c1eda09727ba0aa6653cdc3d8316ebbde3c4 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 20 Feb 2026 16:33:15 +0300 Subject: [PATCH 07/26] panic on empty values --- handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/handler.go b/handler.go index 503d9c1..5f8c93e 100644 --- a/handler.go +++ b/handler.go @@ -31,13 +31,13 @@ type options struct { consumerBurstSize int } -func NewClient(addr, queueName string, opts ...Option) (*Client, error) { +func NewClient(addr, queueName string, opts ...Option) *Client { if addr == "" { - return nil, errNoAddr + log.Fatal(errNoAddr) } if queueName == "" { - return nil, errNoQueue + log.Fatal(errNoQueue) } client := Client{ From 13ee3230e3453ac53281b41fafeeece2e8dd46af Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 20 Feb 2026 18:29:00 +0300 Subject: [PATCH 08/26] fix --- handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/handler.go b/handler.go index 5f8c93e..7a91921 100644 --- a/handler.go +++ b/handler.go @@ -61,7 +61,7 @@ func NewClient(addr, queueName string, opts ...Option) *Client { go client.handleReconnect(addr) - return &client, nil + return &client } func NewPublisher(client *Client) Publisher { From 42646a6a08ba1cafb3c029ab01b4069f696bdd75 Mon Sep 17 00:00:00 2001 From: nquidox Date: Sat, 21 Feb 2026 15:03:43 +0300 Subject: [PATCH 09/26] reconnect timer + fixes --- consumer.go | 67 +++++++++++++++++++++++++++++++++++++++------------- handler.go | 5 ++-- publisher.go | 20 ++++++++++++++-- 3 files changed, 71 insertions(+), 21 deletions(-) diff --git a/consumer.go b/consumer.go index 323cdb8..20c1317 100644 --- a/consumer.go +++ b/consumer.go @@ -5,21 +5,20 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "golang.org/x/time/rate" "log" + "time" ) type Consumer interface { - Start() chan []byte + Start(ctx context.Context, chanLen uint) chan []byte } type consumeHandler struct { - ctx context.Context - client *Client - chanLen int + client *Client } -func (c *consumeHandler) Start() chan []byte { - msgCh := make(chan []byte, c.chanLen) - go runConsumer(c.ctx, c.client, msgCh) +func (c *consumeHandler) Start(ctx context.Context, chanLen uint) chan []byte { + msgCh := make(chan []byte, chanLen) + go runConsumer(ctx, c.client, msgCh) return msgCh } @@ -39,8 +38,20 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { chClosedCh := make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) + reconnectTimer := time.NewTimer(0) + defer reconnectTimer.Stop() + <-reconnectTimer.C + for { + if !reconnectTimer.Stop() { + select { + case <-reconnectTimer.C: + default: + } + } + select { + case <-runCtx.Done(): err = client.Close() if err != nil { @@ -49,25 +60,49 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { return case amqErr := <-chClosedCh: - log.Printf("AMQP Channel closed due to: %s\n", amqErr) + log.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + reconnectTimer.Reset(time.Second) + case <-reconnectTimer.C: deliveries, err = client.consume() if err != nil { - log.Println("Error trying to consume, will try again") + log.Println("Error trying to consume, will try again. Retry in 5 seconds.") + reconnectTimer.Reset(time.Second * 5) continue } chClosedCh = make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) - case delivery := <-deliveries: - msgCh <- delivery.Body - log.Printf("Received message: %s\n", delivery.Body) - - if err = delivery.Ack(false); err != nil { - log.Printf("Error acknowledging message: %s\n", err) + case delivery, ok := <-deliveries: + if !ok { + log.Println("Deliveries channel closed unexpectedly") + reconnectTimer.Reset(time.Second) + continue + } + + if err = limiter.Wait(runCtx); err != nil { + log.Printf("Wait limiter failed: %s\n", err) + } + + select { + case <-runCtx.Done(): + if err = delivery.Nack(false, true); err != nil { + log.Printf("Error nacking message: %s\n", err) + } + + err = client.Close() + if err != nil { + log.Printf("Close failed: %s\n", err) + } + return + + case msgCh <- delivery.Body: + log.Printf("Received message: %s\n", delivery.Body) + if err = delivery.Ack(false); err != nil { + log.Printf("Error acknowledging message: %s\n", err) + } } - limiter.Wait(runCtx) } } } diff --git a/handler.go b/handler.go index 7a91921..949887f 100644 --- a/handler.go +++ b/handler.go @@ -1,7 +1,6 @@ package rabbit import ( - "context" amqp "github.com/rabbitmq/amqp091-go" "log" "os" @@ -68,6 +67,6 @@ func NewPublisher(client *Client) Publisher { return &pubHandler{client: client} } -func NewConsumer(ctx context.Context, client *Client, chanLen int) Consumer { - return &consumeHandler{ctx: ctx, client: client, chanLen: chanLen} +func NewConsumer(client *Client) Consumer { + return &consumeHandler{client: client} } diff --git a/publisher.go b/publisher.go index 45c3772..e71a9b8 100644 --- a/publisher.go +++ b/publisher.go @@ -2,18 +2,34 @@ package rabbit import ( "errors" + "log" "time" ) type Publisher interface { - Push(data []byte) error + Start(chanLen uint) <-chan []byte } type pubHandler struct { client *Client } -func (p *pubHandler) Push(data []byte) error { +func (p *pubHandler) Start(chanLen uint) <-chan []byte { + ch := make(chan []byte, chanLen) + go func() { + for { + select { + case msg := <-ch: + if err := p.push(msg); err != nil { + log.Printf("Error publishing message: %s", err) + } + } + } + }() + return ch +} + +func (p *pubHandler) push(data []byte) error { p.client.mutex.Lock() if !p.client.isReady { p.client.mutex.Unlock() From 40d1d3dd298d54a7cb8584af939dedde505f3217 Mon Sep 17 00:00:00 2001 From: nquidox Date: Sat, 21 Feb 2026 15:54:50 +0300 Subject: [PATCH 10/26] ctx + write only chan --- publisher.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/publisher.go b/publisher.go index e71a9b8..0743889 100644 --- a/publisher.go +++ b/publisher.go @@ -1,24 +1,34 @@ package rabbit import ( + "context" "errors" "log" "time" ) type Publisher interface { - Start(chanLen uint) <-chan []byte + Start(chanLen uint) chan<- []byte } type pubHandler struct { client *Client } -func (p *pubHandler) Start(chanLen uint) <-chan []byte { +func (p *pubHandler) Start(ctx context.Context, chanLen uint) chan<- []byte { ch := make(chan []byte, chanLen) go func() { for { select { + case <-ctx.Done(): + for msg := range ch { + if err := p.push(msg); err != nil { + log.Printf("Error publishing message (shutdown): %s", err) + } + } + log.Println("Publisher stopped") + return + case msg := <-ch: if err := p.push(msg); err != nil { log.Printf("Error publishing message: %s", err) From c8a6a7f8fae6fbeec811b7f846578fa8864f887c Mon Sep 17 00:00:00 2001 From: nquidox Date: Sat, 21 Feb 2026 15:55:29 +0300 Subject: [PATCH 11/26] read only chan --- consumer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consumer.go b/consumer.go index 20c1317..22dde93 100644 --- a/consumer.go +++ b/consumer.go @@ -9,14 +9,14 @@ import ( ) type Consumer interface { - Start(ctx context.Context, chanLen uint) chan []byte + Start(ctx context.Context, chanLen uint) <-chan []byte } type consumeHandler struct { client *Client } -func (c *consumeHandler) Start(ctx context.Context, chanLen uint) chan []byte { +func (c *consumeHandler) Start(ctx context.Context, chanLen uint) <-chan []byte { msgCh := make(chan []byte, chanLen) go runConsumer(ctx, c.client, msgCh) From 46e07b7577f8d57fed2827f668866ad196119ce8 Mon Sep 17 00:00:00 2001 From: nquidox Date: Sat, 21 Feb 2026 16:07:19 +0300 Subject: [PATCH 12/26] bugfix --- publisher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/publisher.go b/publisher.go index 0743889..a65154a 100644 --- a/publisher.go +++ b/publisher.go @@ -8,7 +8,7 @@ import ( ) type Publisher interface { - Start(chanLen uint) chan<- []byte + Start(ctx context.Context, chanLen uint) chan<- []byte } type pubHandler struct { From 9d837385b1e9968864e11a31811cc41e285e5c77 Mon Sep 17 00:00:00 2001 From: nquidox Date: Thu, 2 Apr 2026 15:36:37 +0300 Subject: [PATCH 13/26] connection timeout --- handler.go | 26 ++++++++++++++++++++++++-- options.go | 6 ++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/handler.go b/handler.go index 949887f..9383673 100644 --- a/handler.go +++ b/handler.go @@ -1,8 +1,10 @@ package rabbit import ( + "fmt" amqp "github.com/rabbitmq/amqp091-go" "log" + "net" "os" "sync" "time" @@ -20,9 +22,11 @@ type Client struct { notifyConfirm chan amqp.Confirmation isReady bool opts options + connected chan struct{} } type options struct { + connectTimeout time.Duration reconnectDelay time.Duration reInitDelay time.Duration resendDelay time.Duration @@ -30,7 +34,7 @@ type options struct { consumerBurstSize int } -func NewClient(addr, queueName string, opts ...Option) *Client { +func NewClient(addr, queueName string, opts ...Option) (*Client, error) { if addr == "" { log.Fatal(errNoAddr) } @@ -47,6 +51,7 @@ func NewClient(addr, queueName string, opts ...Option) *Client { } o := options{ + connectTimeout: 15 * time.Second, reconnectDelay: 5, reInitDelay: 2, resendDelay: 5, @@ -58,9 +63,13 @@ func NewClient(addr, queueName string, opts ...Option) *Client { opt(&o) } + if err := client.connectAndSignal(addr, o.connectTimeout); err != nil { + return nil, fmt.Errorf("failed to connect: %w", err) + } + go client.handleReconnect(addr) - return &client + return &client, nil } func NewPublisher(client *Client) Publisher { @@ -70,3 +79,16 @@ func NewPublisher(client *Client) Publisher { func NewConsumer(client *Client) Consumer { return &consumeHandler{client: client} } + +func (c *Client) connectAndSignal(addr string, timeout time.Duration) error { + dialer := &net.Dialer{Timeout: timeout} + conn, err := amqp.DialConfig(addr, amqp.Config{ + Dial: dialer.Dial, + }) + if err != nil { + return err + } + c.connection = conn + close(c.connected) + return nil +} diff --git a/options.go b/options.go index aaa7828..37ad97c 100644 --- a/options.go +++ b/options.go @@ -4,6 +4,12 @@ import "time" type Option func(*options) +func WithConnectTimeout(t time.Duration) Option { + return func(op *options) { + op.connectTimeout = t + } +} + func WithReconnectDelay(t time.Duration) Option { return func(op *options) { op.reconnectDelay = t From d62d41b41a4dcdd307d3280d8a9482cf2dd9184a Mon Sep 17 00:00:00 2001 From: nquidox Date: Thu, 2 Apr 2026 15:46:21 +0300 Subject: [PATCH 14/26] chan fix --- handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/handler.go b/handler.go index 9383673..249221f 100644 --- a/handler.go +++ b/handler.go @@ -48,6 +48,7 @@ func NewClient(addr, queueName string, opts ...Option) (*Client, error) { logger: log.New(os.Stdout, "", log.LstdFlags), queueName: queueName, done: make(chan bool), + connected: make(chan struct{}), } o := options{ From ecda63761ccbc581fec4873e5f4c2b0012b21d77 Mon Sep 17 00:00:00 2001 From: nquidox Date: Thu, 2 Apr 2026 15:50:55 +0300 Subject: [PATCH 15/26] different type of check connection --- handler.go | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/handler.go b/handler.go index 249221f..e54146b 100644 --- a/handler.go +++ b/handler.go @@ -4,7 +4,6 @@ import ( "fmt" amqp "github.com/rabbitmq/amqp091-go" "log" - "net" "os" "sync" "time" @@ -82,14 +81,26 @@ func NewConsumer(client *Client) Consumer { } func (c *Client) connectAndSignal(addr string, timeout time.Duration) error { - dialer := &net.Dialer{Timeout: timeout} - conn, err := amqp.DialConfig(addr, amqp.Config{ - Dial: dialer.Dial, - }) - if err != nil { - return err + type result struct { + conn *amqp.Connection + err error + } + resCh := make(chan result, 1) + + go func() { + conn, err := amqp.Dial(addr) + resCh <- result{conn, err} + }() + + select { + case <-time.After(timeout): + return fmt.Errorf("connection timeout after %v", timeout) + case res := <-resCh: + if res.err != nil { + return res.err + } + c.connection = res.conn + close(c.connected) + return nil } - c.connection = conn - close(c.connected) - return nil } From 8f967361e88ba8947f641633710399aa4910667a Mon Sep 17 00:00:00 2001 From: nquidox Date: Thu, 2 Apr 2026 16:09:17 +0300 Subject: [PATCH 16/26] connection check --- handler.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/handler.go b/handler.go index e54146b..61dc5dc 100644 --- a/handler.go +++ b/handler.go @@ -99,7 +99,13 @@ func (c *Client) connectAndSignal(addr string, timeout time.Duration) error { if res.err != nil { return res.err } - c.connection = res.conn + + c.changeConnection(res.conn) + if err := c.init(res.conn); err != nil { + res.conn.Close() + return fmt.Errorf("init failed: %w", err) + } + close(c.connected) return nil } From 5e8661cfcd9a30c8febece21a757649ab5f01209 Mon Sep 17 00:00:00 2001 From: nquidox Date: Thu, 2 Apr 2026 16:15:36 +0300 Subject: [PATCH 17/26] isReady disabled --- service.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/service.go b/service.go index c997307..cf6ccf1 100644 --- a/service.go +++ b/service.go @@ -8,9 +8,9 @@ import ( func (c *Client) handleReconnect(addr string) { for { - c.mutex.Lock() - c.isReady = false - c.mutex.Unlock() + //c.mutex.Lock() + //c.isReady = false + //c.mutex.Unlock() c.logger.Println("Connecting to server...") @@ -45,9 +45,9 @@ func (c *Client) connect(addr string) (*amqp.Connection, error) { func (c *Client) handleReInit(conn *amqp.Connection) bool { for { - c.mutex.Lock() - c.isReady = false - c.mutex.Unlock() + //c.mutex.Lock() + //c.isReady = false + //c.mutex.Unlock() if err := c.init(conn); err != nil { c.logger.Printf("Failed to initialize connection: %s", err) From f23b3886890d8c38bf446ac377660176ca41e7df Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 10:27:01 +0300 Subject: [PATCH 18/26] mutex removed --- service.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/service.go b/service.go index cf6ccf1..6e3355d 100644 --- a/service.go +++ b/service.go @@ -8,10 +8,6 @@ import ( func (c *Client) handleReconnect(addr string) { for { - //c.mutex.Lock() - //c.isReady = false - //c.mutex.Unlock() - c.logger.Println("Connecting to server...") conn, err := c.connect(addr) @@ -45,10 +41,6 @@ func (c *Client) connect(addr string) (*amqp.Connection, error) { func (c *Client) handleReInit(conn *amqp.Connection) bool { for { - //c.mutex.Lock() - //c.isReady = false - //c.mutex.Unlock() - if err := c.init(conn); err != nil { c.logger.Printf("Failed to initialize connection: %s", err) From 4810a8666e72981f9fc646fa227e1b1030aaa4f8 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:20:53 +0300 Subject: [PATCH 19/26] msg change --- messages.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/messages.go b/messages.go index 0f634d1..fd422ab 100644 --- a/messages.go +++ b/messages.go @@ -3,7 +3,7 @@ package rabbit import "errors" var ( - errNoAddr = errors.New("no address") + errBadAddr = errors.New("bad address") errNoQueue = errors.New("no queue") errNotConnected = errors.New("not connected to a server") errAlreadyClosed = errors.New("already closed: not connected to the server") From e58b052251ad6b9c89aac62ada6d679b9a49bde1 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:21:13 +0300 Subject: [PATCH 20/26] logging options added --- options.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/options.go b/options.go index 37ad97c..3e84f95 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,11 @@ package rabbit -import "time" +import ( + "io" + "log" + "os" + "time" +) type Option func(*options) @@ -39,3 +44,17 @@ func WithConsumerBurstSize(t int) Option { op.consumerBurstSize = t } } + +func WithLogger(l *log.Logger) Option { + return func(op *options) { op.logger = l } +} + +func WithLogging(enabled bool) Option { + return func(op *options) { + if enabled { + op.logger = log.New(os.Stdout, "", log.LstdFlags) + } else { + op.logger = log.New(io.Discard, "", 0) + } + } +} From 44d99a4ba560caaaa94fa154c0dcdf65fba8b812 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:21:24 +0300 Subject: [PATCH 21/26] address maker --- address.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 address.go diff --git a/address.go b/address.go new file mode 100644 index 0000000..dadf5d3 --- /dev/null +++ b/address.go @@ -0,0 +1,34 @@ +package rabbit + +import ( + "errors" + "fmt" + "net/url" +) + +type Address struct { + Username string + Password string + Host string + Port uint16 + Vhost string +} + +func (a *Address) makeAddr() (string, error) { + if a.Host == "" { + return "", errors.New("no host provided") + } + + if a.Vhost == "" { + return "", errors.New("no vhost provided") + } + + //"amqp://username:password@host:port/vhost" + return fmt.Sprintf("amqp://%v:%v@%v:%v/%v", + url.QueryEscape(a.Username), + url.QueryEscape(a.Password), + a.Host, + a.Port, + url.PathEscape(a.Vhost), + ), nil +} From a2862350b37c2c6ae98c4a736be1f3c46f522f9c Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Apr 2026 11:22:13 +0300 Subject: [PATCH 22/26] added address, logging + time fixes --- handler.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/handler.go b/handler.go index 61dc5dc..63145de 100644 --- a/handler.go +++ b/handler.go @@ -1,10 +1,11 @@ package rabbit import ( + "errors" "fmt" amqp "github.com/rabbitmq/amqp091-go" + "io" "log" - "os" "sync" "time" ) @@ -31,11 +32,13 @@ type options struct { resendDelay time.Duration consumerRateLimit time.Duration consumerBurstSize int + logger *log.Logger } -func NewClient(addr, queueName string, opts ...Option) (*Client, error) { - if addr == "" { - log.Fatal(errNoAddr) +func NewClient(address Address, queueName string, opts ...Option) (*Client, error) { + addr, err := address.makeAddr() + if err != nil { + return nil, errors.Join(errBadAddr, err) } if queueName == "" { @@ -44,7 +47,6 @@ func NewClient(addr, queueName string, opts ...Option) (*Client, error) { client := Client{ mutex: &sync.Mutex{}, - logger: log.New(os.Stdout, "", log.LstdFlags), queueName: queueName, done: make(chan bool), connected: make(chan struct{}), @@ -52,18 +54,21 @@ func NewClient(addr, queueName string, opts ...Option) (*Client, error) { o := options{ connectTimeout: 15 * time.Second, - reconnectDelay: 5, - reInitDelay: 2, - resendDelay: 5, + reconnectDelay: 5 * time.Second, + reInitDelay: 2 * time.Second, + resendDelay: 5 * time.Second, consumerRateLimit: time.Millisecond * 500, consumerBurstSize: 10, + logger: log.New(io.Discard, "", 0), } for _, opt := range opts { opt(&o) } - if err := client.connectAndSignal(addr, o.connectTimeout); err != nil { + client.logger = o.logger + + if err = client.connectAndSignal(addr, o.connectTimeout); err != nil { return nil, fmt.Errorf("failed to connect: %w", err) } From 154320d288ad2efec9a6cc435ddf843610ac285c Mon Sep 17 00:00:00 2001 From: nquidox Date: Wed, 8 Apr 2026 12:05:39 +0300 Subject: [PATCH 23/26] logger fix --- consumer.go | 21 ++++++++++----------- handler.go | 6 +++++- publisher.go | 6 +++--- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/consumer.go b/consumer.go index 22dde93..6f125c7 100644 --- a/consumer.go +++ b/consumer.go @@ -4,7 +4,6 @@ import ( "context" amqp "github.com/rabbitmq/amqp091-go" "golang.org/x/time/rate" - "log" "time" ) @@ -31,7 +30,7 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { deliveries, err := client.consume() if err != nil { - log.Printf("Could not start consuming: %s\n", err) + client.logger.Printf("Could not start consuming: %s\n", err) return } @@ -55,18 +54,18 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { case <-runCtx.Done(): err = client.Close() if err != nil { - log.Printf("Close failed: %s\n", err) + client.logger.Printf("Close failed: %s\n", err) } return case amqErr := <-chClosedCh: - log.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) reconnectTimer.Reset(time.Second) case <-reconnectTimer.C: deliveries, err = client.consume() if err != nil { - log.Println("Error trying to consume, will try again. Retry in 5 seconds.") + client.logger.Println("Error trying to consume, will try again. Retry in 5 seconds.") reconnectTimer.Reset(time.Second * 5) continue } @@ -76,31 +75,31 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { case delivery, ok := <-deliveries: if !ok { - log.Println("Deliveries channel closed unexpectedly") + client.logger.Println("Deliveries channel closed unexpectedly") reconnectTimer.Reset(time.Second) continue } if err = limiter.Wait(runCtx); err != nil { - log.Printf("Wait limiter failed: %s\n", err) + client.logger.Printf("Wait limiter failed: %s\n", err) } select { case <-runCtx.Done(): if err = delivery.Nack(false, true); err != nil { - log.Printf("Error nacking message: %s\n", err) + client.logger.Printf("Error nacking message: %s\n", err) } err = client.Close() if err != nil { - log.Printf("Close failed: %s\n", err) + client.logger.Printf("Close failed: %s\n", err) } return case msgCh <- delivery.Body: - log.Printf("Received message: %s\n", delivery.Body) + client.logger.Printf("Received message: %s\n", delivery.Body) if err = delivery.Ack(false); err != nil { - log.Printf("Error acknowledging message: %s\n", err) + client.logger.Printf("Error acknowledging message: %s\n", err) } } } diff --git a/handler.go b/handler.go index 63145de..68f5815 100644 --- a/handler.go +++ b/handler.go @@ -6,6 +6,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "io" "log" + "os" "sync" "time" ) @@ -36,13 +37,15 @@ type options struct { } func NewClient(address Address, queueName string, opts ...Option) (*Client, error) { + l := log.New(os.Stdout, "", log.LstdFlags) + addr, err := address.makeAddr() if err != nil { return nil, errors.Join(errBadAddr, err) } if queueName == "" { - log.Fatal(errNoQueue) + l.Fatal(errNoQueue) } client := Client{ @@ -50,6 +53,7 @@ func NewClient(address Address, queueName string, opts ...Option) (*Client, erro queueName: queueName, done: make(chan bool), connected: make(chan struct{}), + logger: l, } o := options{ diff --git a/publisher.go b/publisher.go index a65154a..ccb415f 100644 --- a/publisher.go +++ b/publisher.go @@ -23,15 +23,15 @@ func (p *pubHandler) Start(ctx context.Context, chanLen uint) chan<- []byte { case <-ctx.Done(): for msg := range ch { if err := p.push(msg); err != nil { - log.Printf("Error publishing message (shutdown): %s", err) + p.client.logger.Printf("Error publishing message (shutdown): %s", err) } } - log.Println("Publisher stopped") + p.client.logger.Println("Publisher stopped") return case msg := <-ch: if err := p.push(msg); err != nil { - log.Printf("Error publishing message: %s", err) + p.client.logger.Printf("Error publishing message: %s", err) } } } From 9b9d8a306405910a6da1e7406ad0f8f505e64929 Mon Sep 17 00:00:00 2001 From: nquidox Date: Wed, 8 Apr 2026 12:21:19 +0300 Subject: [PATCH 24/26] removed unused import --- publisher.go | 1 - 1 file changed, 1 deletion(-) diff --git a/publisher.go b/publisher.go index ccb415f..c938eb5 100644 --- a/publisher.go +++ b/publisher.go @@ -3,7 +3,6 @@ package rabbit import ( "context" "errors" - "log" "time" ) From fd495892faae5b3880fba6656fbc60a8097c244e Mon Sep 17 00:00:00 2001 From: nquidox Date: Mon, 27 Apr 2026 00:06:46 +0300 Subject: [PATCH 25/26] queue opts --- handler.go | 25 +++++++++++++++++-------- service.go | 15 ++++++++++++--- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/handler.go b/handler.go index 68f5815..6a8dd3f 100644 --- a/handler.go +++ b/handler.go @@ -13,7 +13,7 @@ import ( type Client struct { mutex *sync.Mutex - queueName string + queueOptions *QueueOpts logger *log.Logger connection *amqp.Connection Channel *amqp.Channel @@ -36,7 +36,16 @@ type options struct { logger *log.Logger } -func NewClient(address Address, queueName string, opts ...Option) (*Client, error) { +type QueueOpts struct { + QueueName string + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Args amqp.Table +} + +func NewClient(address Address, queueOpts QueueOpts, opts ...Option) (*Client, error) { l := log.New(os.Stdout, "", log.LstdFlags) addr, err := address.makeAddr() @@ -44,16 +53,16 @@ func NewClient(address Address, queueName string, opts ...Option) (*Client, erro return nil, errors.Join(errBadAddr, err) } - if queueName == "" { + if queueOpts.QueueName == "" { l.Fatal(errNoQueue) } client := Client{ - mutex: &sync.Mutex{}, - queueName: queueName, - done: make(chan bool), - connected: make(chan struct{}), - logger: l, + mutex: &sync.Mutex{}, + queueOptions: &queueOpts, + done: make(chan bool), + connected: make(chan struct{}), + logger: l, } o := options{ diff --git a/service.go b/service.go index 6e3355d..c3f9ec8 100644 --- a/service.go +++ b/service.go @@ -78,7 +78,16 @@ func (c *Client) init(conn *amqp.Connection) error { return err } - _, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) + //_, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) + _, err = ch.QueueDeclare( + c.queueOptions.QueueName, + c.queueOptions.Durable, + c.queueOptions.AutoDelete, + c.queueOptions.Exclusive, + c.queueOptions.NoWait, + c.queueOptions.Args, + ) + if err != nil { return err } @@ -120,7 +129,7 @@ func (c *Client) unsafePush(data []byte) error { return c.Channel.PublishWithContext( ctx, "", - c.queueName, + c.queueOptions.QueueName, false, false, amqp.Publishing{ @@ -147,7 +156,7 @@ func (c *Client) consume() (<-chan amqp.Delivery, error) { } return c.Channel.Consume( - c.queueName, + c.queueOptions.QueueName, "", false, false, From 72f039e4f7819930e5e6ed3faa6aa10ba81392aa Mon Sep 17 00:00:00 2001 From: nquidox Date: Mon, 4 May 2026 11:50:59 +0300 Subject: [PATCH 26/26] reconnect bugfix --- consumer.go | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/consumer.go b/consumer.go index 6f125c7..5c68bf6 100644 --- a/consumer.go +++ b/consumer.go @@ -28,6 +28,15 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) + reconnectSignal := make(chan struct{}, 1) + reconnectTrigger := func() { + select { + case reconnectSignal <- struct{}{}: + default: + } + } + + // initial consume deliveries, err := client.consume() if err != nil { client.logger.Printf("Could not start consuming: %s\n", err) @@ -37,18 +46,7 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { chClosedCh := make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) - reconnectTimer := time.NewTimer(0) - defer reconnectTimer.Stop() - <-reconnectTimer.C - for { - if !reconnectTimer.Stop() { - select { - case <-reconnectTimer.C: - default: - } - } - select { case <-runCtx.Done(): @@ -58,25 +56,33 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { } return - case amqErr := <-chClosedCh: - client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) - reconnectTimer.Reset(time.Second) + case <-reconnectSignal: + select { + case <-runCtx.Done(): + return + case <-time.After(client.opts.reconnectDelay): + } - case <-reconnectTimer.C: deliveries, err = client.consume() if err != nil { - client.logger.Println("Error trying to consume, will try again. Retry in 5 seconds.") - reconnectTimer.Reset(time.Second * 5) + client.logger.Printf("Consume reconnect failed, retry: %s\n", err) + reconnectTrigger() continue } + // re-create closing chan chClosedCh = make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) + client.logger.Println("Consume reconnect success") + + case amqErr := <-chClosedCh: + client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + reconnectTrigger() case delivery, ok := <-deliveries: if !ok { client.logger.Println("Deliveries channel closed unexpectedly") - reconnectTimer.Reset(time.Second) + reconnectTrigger() continue }