diff --git a/consumer.go b/consumer.go index 8a17f4a..b8d93d9 100644 --- a/consumer.go +++ b/consumer.go @@ -5,28 +5,27 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "golang.org/x/time/rate" "log" - "time" ) -func runConsumer(ctx context.Context, queue *Client, msgCh chan []byte) { +func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { runCtx, cancel := context.WithCancel(ctx) defer cancel() - limiter := rate.NewLimiter(rate.Every(time.Millisecond*500), 100) + limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) - deliveries, err := queue.Consume() + deliveries, err := client.Consume() if err != nil { log.Printf("Could not start consuming: %s\n", err) return } chClosedCh := make(chan *amqp.Error, 1) - queue.Channel.NotifyClose(chClosedCh) + client.Channel.NotifyClose(chClosedCh) for { select { case <-runCtx.Done(): - err = queue.Close() + err = client.Close() if err != nil { log.Printf("Close failed: %s\n", err) } @@ -35,14 +34,14 @@ func runConsumer(ctx context.Context, queue *Client, msgCh chan []byte) { case amqErr := <-chClosedCh: log.Printf("AMQP Channel closed due to: %s\n", amqErr) - deliveries, err = queue.Consume() + deliveries, err = client.Consume() if err != nil { log.Println("Error trying to consume, will try again") continue } chClosedCh = make(chan *amqp.Error, 1) - queue.Channel.NotifyClose(chClosedCh) + client.Channel.NotifyClose(chClosedCh) case delivery := <-deliveries: msgCh <- delivery.Body diff --git a/handler.go b/handler.go index eacd93b..6109124 100644 --- a/handler.go +++ b/handler.go @@ -2,7 +2,6 @@ package rabbit import ( "context" - "errors" amqp "github.com/rabbitmq/amqp091-go" "log" "os" @@ -21,45 +20,48 @@ type Client struct { notifyChanClose chan *amqp.Error notifyConfirm chan amqp.Confirmation isReady bool - reconnectDelay time.Duration - reInitDelay time.Duration - resendDelay time.Duration + opts options } -//const ( -// reconnectDelay = 5 * time.Second -// reInitDelay = 2 * time.Second -// resendDelay = 5 * time.Second -//) - -type Deps struct { - ReconnectDelay time.Duration - ReInitDelay time.Duration - ResendDelay time.Duration - QueueName string - Addr string +type options struct { + reconnectDelay time.Duration + reInitDelay time.Duration + resendDelay time.Duration + consumerRateLimit time.Duration + consumerBurstSize int } -var ( - errNotConnected = errors.New("not connected to a server") - errAlreadyClosed = errors.New("already closed: not connected to the server") - errShutdown = errors.New("client is shutting down") -) - -func NewClient(deps Deps) *Client { - client := Client{ - mutex: &sync.Mutex{}, - logger: log.New(os.Stdout, "", log.LstdFlags), - queueName: deps.QueueName, - done: make(chan bool), - reconnectDelay: deps.ReconnectDelay, - reInitDelay: deps.ReInitDelay, - resendDelay: deps.ResendDelay, +func NewClient(addr, queueName string, opts ...Option) (*Client, error) { + if addr == "" { + return nil, errNoAddr } - go client.handleReconnect(deps.Addr) + if queueName == "" { + return nil, errNoQueue + } - return &client + client := Client{ + mutex: &sync.Mutex{}, + logger: log.New(os.Stdout, "", log.LstdFlags), + queueName: queueName, + done: make(chan bool), + } + + o := options{ + reconnectDelay: 5, + reInitDelay: 2, + resendDelay: 5, + consumerRateLimit: time.Millisecond * 500, + consumerBurstSize: 10, + } + + for _, opt := range opts { + opt(&o) + } + + go client.handleReconnect(addr) + + return &client, nil } func StartConsumer(ctx context.Context, client *Client, chanLen int) chan []byte { diff --git a/messages.go b/messages.go new file mode 100644 index 0000000..0f634d1 --- /dev/null +++ b/messages.go @@ -0,0 +1,11 @@ +package rabbit + +import "errors" + +var ( + errNoAddr = errors.New("no address") + errNoQueue = errors.New("no queue") + errNotConnected = errors.New("not connected to a server") + errAlreadyClosed = errors.New("already closed: not connected to the server") + errShutdown = errors.New("client is shutting down") +) diff --git a/options.go b/options.go new file mode 100644 index 0000000..aaa7828 --- /dev/null +++ b/options.go @@ -0,0 +1,35 @@ +package rabbit + +import "time" + +type Option func(*options) + +func WithReconnectDelay(t time.Duration) Option { + return func(op *options) { + op.reconnectDelay = t + } +} + +func WithReInitDelay(t time.Duration) Option { + return func(op *options) { + op.reInitDelay = t + } +} + +func WithResendDelay(t time.Duration) Option { + return func(op *options) { + op.resendDelay = t + } +} + +func WithConsumerRateLimit(t time.Duration) Option { + return func(op *options) { + op.consumerRateLimit = t + } +} + +func WithConsumerBurstSize(t int) Option { + return func(op *options) { + op.consumerBurstSize = t + } +} diff --git a/service.go b/service.go index 622e282..889e8c7 100644 --- a/service.go +++ b/service.go @@ -22,7 +22,7 @@ func (c *Client) handleReconnect(addr string) { select { case <-c.done: return - case <-time.After(c.reconnectDelay): + case <-time.After(c.opts.reconnectDelay): } continue } @@ -59,7 +59,7 @@ func (c *Client) handleReInit(conn *amqp.Connection) bool { case <-c.notifyConnClose: c.logger.Println("Connection closed. Reconnecting...") return false - case <-time.After(c.reInitDelay): + case <-time.After(c.opts.reInitDelay): } continue } @@ -129,7 +129,7 @@ func (c *Client) Push(data []byte) error { select { case <-c.done: return errShutdown - case <-time.After(c.resendDelay): + case <-time.After(c.opts.resendDelay): } continue }