diff --git a/address.go b/address.go new file mode 100644 index 0000000..dadf5d3 --- /dev/null +++ b/address.go @@ -0,0 +1,34 @@ +package rabbit + +import ( + "errors" + "fmt" + "net/url" +) + +type Address struct { + Username string + Password string + Host string + Port uint16 + Vhost string +} + +func (a *Address) makeAddr() (string, error) { + if a.Host == "" { + return "", errors.New("no host provided") + } + + if a.Vhost == "" { + return "", errors.New("no vhost provided") + } + + //"amqp://username:password@host:port/vhost" + return fmt.Sprintf("amqp://%v:%v@%v:%v/%v", + url.QueryEscape(a.Username), + url.QueryEscape(a.Password), + a.Host, + a.Port, + url.PathEscape(a.Vhost), + ), nil +} diff --git a/consumer.go b/consumer.go index 8a17f4a..5c68bf6 100644 --- a/consumer.go +++ b/consumer.go @@ -4,54 +4,110 @@ import ( "context" amqp "github.com/rabbitmq/amqp091-go" "golang.org/x/time/rate" - "log" "time" ) -func runConsumer(ctx context.Context, queue *Client, msgCh chan []byte) { +type Consumer interface { + Start(ctx context.Context, chanLen uint) <-chan []byte +} + +type consumeHandler struct { + client *Client +} + +func (c *consumeHandler) Start(ctx context.Context, chanLen uint) <-chan []byte { + msgCh := make(chan []byte, chanLen) + go runConsumer(ctx, c.client, msgCh) + + return msgCh +} + +func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { runCtx, cancel := context.WithCancel(ctx) defer cancel() - limiter := rate.NewLimiter(rate.Every(time.Millisecond*500), 100) + limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) - deliveries, err := queue.Consume() + reconnectSignal := make(chan struct{}, 1) + reconnectTrigger := func() { + select { + case reconnectSignal <- struct{}{}: + default: + } + } + + // initial consume + deliveries, err := client.consume() if err != nil { - log.Printf("Could not start consuming: %s\n", err) + client.logger.Printf("Could not start consuming: %s\n", err) return } chClosedCh := make(chan *amqp.Error, 1) - queue.Channel.NotifyClose(chClosedCh) + client.Channel.NotifyClose(chClosedCh) for { select { + case <-runCtx.Done(): - err = queue.Close() + err = client.Close() if err != nil { - log.Printf("Close failed: %s\n", err) + client.logger.Printf("Close failed: %s\n", err) } return - case amqErr := <-chClosedCh: - log.Printf("AMQP Channel closed due to: %s\n", amqErr) + case <-reconnectSignal: + select { + case <-runCtx.Done(): + return + case <-time.After(client.opts.reconnectDelay): + } - deliveries, err = queue.Consume() + deliveries, err = client.consume() if err != nil { - log.Println("Error trying to consume, will try again") + client.logger.Printf("Consume reconnect failed, retry: %s\n", err) + reconnectTrigger() continue } + // re-create closing chan chClosedCh = make(chan *amqp.Error, 1) - queue.Channel.NotifyClose(chClosedCh) + client.Channel.NotifyClose(chClosedCh) + client.logger.Println("Consume reconnect success") - case delivery := <-deliveries: - msgCh <- delivery.Body - log.Printf("Received message: %s\n", delivery.Body) + case amqErr := <-chClosedCh: + client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + reconnectTrigger() - if err = delivery.Ack(false); err != nil { - log.Printf("Error acknowledging message: %s\n", err) + case delivery, ok := <-deliveries: + if !ok { + client.logger.Println("Deliveries channel closed unexpectedly") + reconnectTrigger() + continue + } + + if err = limiter.Wait(runCtx); err != nil { + client.logger.Printf("Wait limiter failed: %s\n", err) + } + + select { + case <-runCtx.Done(): + if err = delivery.Nack(false, true); err != nil { + client.logger.Printf("Error nacking message: %s\n", err) + } + + err = client.Close() + if err != nil { + client.logger.Printf("Close failed: %s\n", err) + } + return + + case msgCh <- delivery.Body: + client.logger.Printf("Received message: %s\n", delivery.Body) + if err = delivery.Ack(false); err != nil { + client.logger.Printf("Error acknowledging message: %s\n", err) + } } - limiter.Wait(runCtx) } } } diff --git a/handler.go b/handler.go index eacd93b..6a8dd3f 100644 --- a/handler.go +++ b/handler.go @@ -1,9 +1,10 @@ package rabbit import ( - "context" "errors" + "fmt" amqp "github.com/rabbitmq/amqp091-go" + "io" "log" "os" "sync" @@ -12,7 +13,7 @@ import ( type Client struct { mutex *sync.Mutex - queueName string + queueOptions *QueueOpts logger *log.Logger connection *amqp.Connection Channel *amqp.Channel @@ -21,50 +22,109 @@ type Client struct { notifyChanClose chan *amqp.Error notifyConfirm chan amqp.Confirmation isReady bool - reconnectDelay time.Duration - reInitDelay time.Duration - resendDelay time.Duration + opts options + connected chan struct{} } -//const ( -// reconnectDelay = 5 * time.Second -// reInitDelay = 2 * time.Second -// resendDelay = 5 * time.Second -//) - -type Deps struct { - ReconnectDelay time.Duration - ReInitDelay time.Duration - ResendDelay time.Duration - QueueName string - Addr string +type options struct { + connectTimeout time.Duration + reconnectDelay time.Duration + reInitDelay time.Duration + resendDelay time.Duration + consumerRateLimit time.Duration + consumerBurstSize int + logger *log.Logger } -var ( - errNotConnected = errors.New("not connected to a server") - errAlreadyClosed = errors.New("already closed: not connected to the server") - errShutdown = errors.New("client is shutting down") -) +type QueueOpts struct { + QueueName string + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Args amqp.Table +} -func NewClient(deps Deps) *Client { - client := Client{ - mutex: &sync.Mutex{}, - logger: log.New(os.Stdout, "", log.LstdFlags), - queueName: deps.QueueName, - done: make(chan bool), - reconnectDelay: deps.ReconnectDelay, - reInitDelay: deps.ReInitDelay, - resendDelay: deps.ResendDelay, +func NewClient(address Address, queueOpts QueueOpts, opts ...Option) (*Client, error) { + l := log.New(os.Stdout, "", log.LstdFlags) + + addr, err := address.makeAddr() + if err != nil { + return nil, errors.Join(errBadAddr, err) } - go client.handleReconnect(deps.Addr) + if queueOpts.QueueName == "" { + l.Fatal(errNoQueue) + } - return &client + client := Client{ + mutex: &sync.Mutex{}, + queueOptions: &queueOpts, + done: make(chan bool), + connected: make(chan struct{}), + logger: l, + } + + o := options{ + connectTimeout: 15 * time.Second, + reconnectDelay: 5 * time.Second, + reInitDelay: 2 * time.Second, + resendDelay: 5 * time.Second, + consumerRateLimit: time.Millisecond * 500, + consumerBurstSize: 10, + logger: log.New(io.Discard, "", 0), + } + + for _, opt := range opts { + opt(&o) + } + + client.logger = o.logger + + if err = client.connectAndSignal(addr, o.connectTimeout); err != nil { + return nil, fmt.Errorf("failed to connect: %w", err) + } + + go client.handleReconnect(addr) + + return &client, nil } -func StartConsumer(ctx context.Context, client *Client, chanLen int) chan []byte { - msgCh := make(chan []byte, chanLen) - go runConsumer(ctx, client, msgCh) - - return msgCh +func NewPublisher(client *Client) Publisher { + return &pubHandler{client: client} +} + +func NewConsumer(client *Client) Consumer { + return &consumeHandler{client: client} +} + +func (c *Client) connectAndSignal(addr string, timeout time.Duration) error { + type result struct { + conn *amqp.Connection + err error + } + resCh := make(chan result, 1) + + go func() { + conn, err := amqp.Dial(addr) + resCh <- result{conn, err} + }() + + select { + case <-time.After(timeout): + return fmt.Errorf("connection timeout after %v", timeout) + case res := <-resCh: + if res.err != nil { + return res.err + } + + c.changeConnection(res.conn) + if err := c.init(res.conn); err != nil { + res.conn.Close() + return fmt.Errorf("init failed: %w", err) + } + + close(c.connected) + return nil + } } diff --git a/messages.go b/messages.go new file mode 100644 index 0000000..fd422ab --- /dev/null +++ b/messages.go @@ -0,0 +1,11 @@ +package rabbit + +import "errors" + +var ( + errBadAddr = errors.New("bad address") + errNoQueue = errors.New("no queue") + errNotConnected = errors.New("not connected to a server") + errAlreadyClosed = errors.New("already closed: not connected to the server") + errShutdown = errors.New("client is shutting down") +) diff --git a/options.go b/options.go new file mode 100644 index 0000000..3e84f95 --- /dev/null +++ b/options.go @@ -0,0 +1,60 @@ +package rabbit + +import ( + "io" + "log" + "os" + "time" +) + +type Option func(*options) + +func WithConnectTimeout(t time.Duration) Option { + return func(op *options) { + op.connectTimeout = t + } +} + +func WithReconnectDelay(t time.Duration) Option { + return func(op *options) { + op.reconnectDelay = t + } +} + +func WithReInitDelay(t time.Duration) Option { + return func(op *options) { + op.reInitDelay = t + } +} + +func WithResendDelay(t time.Duration) Option { + return func(op *options) { + op.resendDelay = t + } +} + +func WithConsumerRateLimit(t time.Duration) Option { + return func(op *options) { + op.consumerRateLimit = t + } +} + +func WithConsumerBurstSize(t int) Option { + return func(op *options) { + op.consumerBurstSize = t + } +} + +func WithLogger(l *log.Logger) Option { + return func(op *options) { op.logger = l } +} + +func WithLogging(enabled bool) Option { + return func(op *options) { + if enabled { + op.logger = log.New(os.Stdout, "", log.LstdFlags) + } else { + op.logger = log.New(io.Discard, "", 0) + } + } +} diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..c938eb5 --- /dev/null +++ b/publisher.go @@ -0,0 +1,65 @@ +package rabbit + +import ( + "context" + "errors" + "time" +) + +type Publisher interface { + Start(ctx context.Context, chanLen uint) chan<- []byte +} + +type pubHandler struct { + client *Client +} + +func (p *pubHandler) Start(ctx context.Context, chanLen uint) chan<- []byte { + ch := make(chan []byte, chanLen) + go func() { + for { + select { + case <-ctx.Done(): + for msg := range ch { + if err := p.push(msg); err != nil { + p.client.logger.Printf("Error publishing message (shutdown): %s", err) + } + } + p.client.logger.Println("Publisher stopped") + return + + case msg := <-ch: + if err := p.push(msg); err != nil { + p.client.logger.Printf("Error publishing message: %s", err) + } + } + } + }() + return ch +} + +func (p *pubHandler) push(data []byte) error { + p.client.mutex.Lock() + if !p.client.isReady { + p.client.mutex.Unlock() + return errors.New("failed to push: not connected") + } + p.client.mutex.Unlock() + for { + err := p.client.unsafePush(data) + if err != nil { + p.client.logger.Println("Push failed. Retrying...") + select { + case <-p.client.done: + return errShutdown + case <-time.After(p.client.opts.resendDelay): + } + continue + } + confirm := <-p.client.notifyConfirm + if confirm.Ack { + p.client.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag) + return nil + } + } +} diff --git a/service.go b/service.go index 622e282..c3f9ec8 100644 --- a/service.go +++ b/service.go @@ -2,17 +2,12 @@ package rabbit import ( "context" - "errors" amqp "github.com/rabbitmq/amqp091-go" "time" ) func (c *Client) handleReconnect(addr string) { for { - c.mutex.Lock() - c.isReady = false - c.mutex.Unlock() - c.logger.Println("Connecting to server...") conn, err := c.connect(addr) @@ -22,7 +17,7 @@ func (c *Client) handleReconnect(addr string) { select { case <-c.done: return - case <-time.After(c.reconnectDelay): + case <-time.After(c.opts.reconnectDelay): } continue } @@ -46,10 +41,6 @@ func (c *Client) connect(addr string) (*amqp.Connection, error) { func (c *Client) handleReInit(conn *amqp.Connection) bool { for { - c.mutex.Lock() - c.isReady = false - c.mutex.Unlock() - if err := c.init(conn); err != nil { c.logger.Printf("Failed to initialize connection: %s", err) @@ -59,7 +50,7 @@ func (c *Client) handleReInit(conn *amqp.Connection) bool { case <-c.notifyConnClose: c.logger.Println("Connection closed. Reconnecting...") return false - case <-time.After(c.reInitDelay): + case <-time.After(c.opts.reInitDelay): } continue } @@ -87,7 +78,16 @@ func (c *Client) init(conn *amqp.Connection) error { return err } - _, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) + //_, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) + _, err = ch.QueueDeclare( + c.queueOptions.QueueName, + c.queueOptions.Durable, + c.queueOptions.AutoDelete, + c.queueOptions.Exclusive, + c.queueOptions.NoWait, + c.queueOptions.Args, + ) + if err != nil { return err } @@ -115,33 +115,7 @@ func (c *Client) changeChannel(channel *amqp.Channel) { c.Channel.NotifyPublish(c.notifyConfirm) } -func (c *Client) Push(data []byte) error { - c.mutex.Lock() - if !c.isReady { - c.mutex.Unlock() - return errors.New("failed to push: not connected") - } - c.mutex.Unlock() - for { - err := c.UnsafePush(data) - if err != nil { - c.logger.Println("Push failed. Retrying...") - select { - case <-c.done: - return errShutdown - case <-time.After(c.resendDelay): - } - continue - } - confirm := <-c.notifyConfirm - if confirm.Ack { - c.logger.Printf("Push confirmed [%d]!", confirm.DeliveryTag) - return nil - } - } -} - -func (c *Client) UnsafePush(data []byte) error { +func (c *Client) unsafePush(data []byte) error { c.mutex.Lock() if !c.isReady { c.mutex.Unlock() @@ -155,7 +129,7 @@ func (c *Client) UnsafePush(data []byte) error { return c.Channel.PublishWithContext( ctx, "", - c.queueName, + c.queueOptions.QueueName, false, false, amqp.Publishing{ @@ -165,7 +139,7 @@ func (c *Client) UnsafePush(data []byte) error { ) } -func (c *Client) Consume() (<-chan amqp.Delivery, error) { +func (c *Client) consume() (<-chan amqp.Delivery, error) { c.mutex.Lock() if !c.isReady { c.mutex.Unlock() @@ -182,7 +156,7 @@ func (c *Client) Consume() (<-chan amqp.Delivery, error) { } return c.Channel.Consume( - c.queueName, + c.queueOptions.QueueName, "", false, false,