From 4c5b1c7030ecee1ea97fe67a4cf55a8dc06f9251 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 20 Feb 2026 16:20:09 +0300 Subject: [PATCH] consumer constructor --- consumer.go | 17 +++++++++++++++++ handler.go | 13 +++++-------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/consumer.go b/consumer.go index 9aac126..323cdb8 100644 --- a/consumer.go +++ b/consumer.go @@ -7,6 +7,23 @@ import ( "log" ) +type Consumer interface { + Start() chan []byte +} + +type consumeHandler struct { + ctx context.Context + client *Client + chanLen int +} + +func (c *consumeHandler) Start() chan []byte { + msgCh := make(chan []byte, c.chanLen) + go runConsumer(c.ctx, c.client, msgCh) + + return msgCh +} + func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { runCtx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/handler.go b/handler.go index a203b98..503d9c1 100644 --- a/handler.go +++ b/handler.go @@ -64,13 +64,10 @@ func NewClient(addr, queueName string, opts ...Option) (*Client, error) { return &client, nil } -func StartConsumer(ctx context.Context, client *Client, chanLen int) chan []byte { - msgCh := make(chan []byte, chanLen) - go runConsumer(ctx, client, msgCh) - - return msgCh -} - func NewPublisher(client *Client) Publisher { - return &PubHandler{client: client} + return &pubHandler{client: client} +} + +func NewConsumer(ctx context.Context, client *Client, chanLen int) Consumer { + return &consumeHandler{ctx: ctx, client: client, chanLen: chanLen} }