diff --git a/address.go b/address.go deleted file mode 100644 index dadf5d3..0000000 --- a/address.go +++ /dev/null @@ -1,34 +0,0 @@ -package rabbit - -import ( - "errors" - "fmt" - "net/url" -) - -type Address struct { - Username string - Password string - Host string - Port uint16 - Vhost string -} - -func (a *Address) makeAddr() (string, error) { - if a.Host == "" { - return "", errors.New("no host provided") - } - - if a.Vhost == "" { - return "", errors.New("no vhost provided") - } - - //"amqp://username:password@host:port/vhost" - return fmt.Sprintf("amqp://%v:%v@%v:%v/%v", - url.QueryEscape(a.Username), - url.QueryEscape(a.Password), - a.Host, - a.Port, - url.PathEscape(a.Vhost), - ), nil -} diff --git a/consumer.go b/consumer.go index 5c68bf6..22dde93 100644 --- a/consumer.go +++ b/consumer.go @@ -4,6 +4,7 @@ import ( "context" amqp "github.com/rabbitmq/amqp091-go" "golang.org/x/time/rate" + "log" "time" ) @@ -28,84 +29,78 @@ func runConsumer(ctx context.Context, client *Client, msgCh chan []byte) { limiter := rate.NewLimiter(rate.Every(client.opts.consumerRateLimit), client.opts.consumerBurstSize) - reconnectSignal := make(chan struct{}, 1) - reconnectTrigger := func() { - select { - case reconnectSignal <- struct{}{}: - default: - } - } - - // initial consume deliveries, err := client.consume() if err != nil { - client.logger.Printf("Could not start consuming: %s\n", err) + log.Printf("Could not start consuming: %s\n", err) return } chClosedCh := make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) + reconnectTimer := time.NewTimer(0) + defer reconnectTimer.Stop() + <-reconnectTimer.C + for { + if !reconnectTimer.Stop() { + select { + case <-reconnectTimer.C: + default: + } + } + select { case <-runCtx.Done(): err = client.Close() if err != nil { - client.logger.Printf("Close failed: %s\n", err) + log.Printf("Close failed: %s\n", err) } return - case <-reconnectSignal: - select { - case <-runCtx.Done(): - return - case <-time.After(client.opts.reconnectDelay): - } + case amqErr := <-chClosedCh: + log.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) + reconnectTimer.Reset(time.Second) + case <-reconnectTimer.C: deliveries, err = client.consume() if err != nil { - client.logger.Printf("Consume reconnect failed, retry: %s\n", err) - reconnectTrigger() + log.Println("Error trying to consume, will try again. Retry in 5 seconds.") + reconnectTimer.Reset(time.Second * 5) continue } - // re-create closing chan chClosedCh = make(chan *amqp.Error, 1) client.Channel.NotifyClose(chClosedCh) - client.logger.Println("Consume reconnect success") - - case amqErr := <-chClosedCh: - client.logger.Printf("AMQP Channel closed due to: %s Reconnecting...\n", amqErr) - reconnectTrigger() case delivery, ok := <-deliveries: if !ok { - client.logger.Println("Deliveries channel closed unexpectedly") - reconnectTrigger() + log.Println("Deliveries channel closed unexpectedly") + reconnectTimer.Reset(time.Second) continue } if err = limiter.Wait(runCtx); err != nil { - client.logger.Printf("Wait limiter failed: %s\n", err) + log.Printf("Wait limiter failed: %s\n", err) } select { case <-runCtx.Done(): if err = delivery.Nack(false, true); err != nil { - client.logger.Printf("Error nacking message: %s\n", err) + log.Printf("Error nacking message: %s\n", err) } err = client.Close() if err != nil { - client.logger.Printf("Close failed: %s\n", err) + log.Printf("Close failed: %s\n", err) } return case msgCh <- delivery.Body: - client.logger.Printf("Received message: %s\n", delivery.Body) + log.Printf("Received message: %s\n", delivery.Body) if err = delivery.Ack(false); err != nil { - client.logger.Printf("Error acknowledging message: %s\n", err) + log.Printf("Error acknowledging message: %s\n", err) } } } diff --git a/handler.go b/handler.go index 6a8dd3f..61dc5dc 100644 --- a/handler.go +++ b/handler.go @@ -1,10 +1,8 @@ package rabbit import ( - "errors" "fmt" amqp "github.com/rabbitmq/amqp091-go" - "io" "log" "os" "sync" @@ -13,7 +11,7 @@ import ( type Client struct { mutex *sync.Mutex - queueOptions *QueueOpts + queueName string logger *log.Logger connection *amqp.Connection Channel *amqp.Channel @@ -33,55 +31,39 @@ type options struct { resendDelay time.Duration consumerRateLimit time.Duration consumerBurstSize int - logger *log.Logger } -type QueueOpts struct { - QueueName string - Durable bool - AutoDelete bool - Exclusive bool - NoWait bool - Args amqp.Table -} - -func NewClient(address Address, queueOpts QueueOpts, opts ...Option) (*Client, error) { - l := log.New(os.Stdout, "", log.LstdFlags) - - addr, err := address.makeAddr() - if err != nil { - return nil, errors.Join(errBadAddr, err) +func NewClient(addr, queueName string, opts ...Option) (*Client, error) { + if addr == "" { + log.Fatal(errNoAddr) } - if queueOpts.QueueName == "" { - l.Fatal(errNoQueue) + if queueName == "" { + log.Fatal(errNoQueue) } client := Client{ - mutex: &sync.Mutex{}, - queueOptions: &queueOpts, - done: make(chan bool), - connected: make(chan struct{}), - logger: l, + mutex: &sync.Mutex{}, + logger: log.New(os.Stdout, "", log.LstdFlags), + queueName: queueName, + done: make(chan bool), + connected: make(chan struct{}), } o := options{ connectTimeout: 15 * time.Second, - reconnectDelay: 5 * time.Second, - reInitDelay: 2 * time.Second, - resendDelay: 5 * time.Second, + reconnectDelay: 5, + reInitDelay: 2, + resendDelay: 5, consumerRateLimit: time.Millisecond * 500, consumerBurstSize: 10, - logger: log.New(io.Discard, "", 0), } for _, opt := range opts { opt(&o) } - client.logger = o.logger - - if err = client.connectAndSignal(addr, o.connectTimeout); err != nil { + if err := client.connectAndSignal(addr, o.connectTimeout); err != nil { return nil, fmt.Errorf("failed to connect: %w", err) } diff --git a/messages.go b/messages.go index fd422ab..0f634d1 100644 --- a/messages.go +++ b/messages.go @@ -3,7 +3,7 @@ package rabbit import "errors" var ( - errBadAddr = errors.New("bad address") + errNoAddr = errors.New("no address") errNoQueue = errors.New("no queue") errNotConnected = errors.New("not connected to a server") errAlreadyClosed = errors.New("already closed: not connected to the server") diff --git a/options.go b/options.go index 3e84f95..37ad97c 100644 --- a/options.go +++ b/options.go @@ -1,11 +1,6 @@ package rabbit -import ( - "io" - "log" - "os" - "time" -) +import "time" type Option func(*options) @@ -44,17 +39,3 @@ func WithConsumerBurstSize(t int) Option { op.consumerBurstSize = t } } - -func WithLogger(l *log.Logger) Option { - return func(op *options) { op.logger = l } -} - -func WithLogging(enabled bool) Option { - return func(op *options) { - if enabled { - op.logger = log.New(os.Stdout, "", log.LstdFlags) - } else { - op.logger = log.New(io.Discard, "", 0) - } - } -} diff --git a/publisher.go b/publisher.go index c938eb5..a65154a 100644 --- a/publisher.go +++ b/publisher.go @@ -3,6 +3,7 @@ package rabbit import ( "context" "errors" + "log" "time" ) @@ -22,15 +23,15 @@ func (p *pubHandler) Start(ctx context.Context, chanLen uint) chan<- []byte { case <-ctx.Done(): for msg := range ch { if err := p.push(msg); err != nil { - p.client.logger.Printf("Error publishing message (shutdown): %s", err) + log.Printf("Error publishing message (shutdown): %s", err) } } - p.client.logger.Println("Publisher stopped") + log.Println("Publisher stopped") return case msg := <-ch: if err := p.push(msg); err != nil { - p.client.logger.Printf("Error publishing message: %s", err) + log.Printf("Error publishing message: %s", err) } } } diff --git a/service.go b/service.go index c3f9ec8..cf6ccf1 100644 --- a/service.go +++ b/service.go @@ -8,6 +8,10 @@ import ( func (c *Client) handleReconnect(addr string) { for { + //c.mutex.Lock() + //c.isReady = false + //c.mutex.Unlock() + c.logger.Println("Connecting to server...") conn, err := c.connect(addr) @@ -41,6 +45,10 @@ func (c *Client) connect(addr string) (*amqp.Connection, error) { func (c *Client) handleReInit(conn *amqp.Connection) bool { for { + //c.mutex.Lock() + //c.isReady = false + //c.mutex.Unlock() + if err := c.init(conn); err != nil { c.logger.Printf("Failed to initialize connection: %s", err) @@ -78,16 +86,7 @@ func (c *Client) init(conn *amqp.Connection) error { return err } - //_, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) - _, err = ch.QueueDeclare( - c.queueOptions.QueueName, - c.queueOptions.Durable, - c.queueOptions.AutoDelete, - c.queueOptions.Exclusive, - c.queueOptions.NoWait, - c.queueOptions.Args, - ) - + _, err = ch.QueueDeclare(c.queueName, false, false, false, false, nil) if err != nil { return err } @@ -129,7 +128,7 @@ func (c *Client) unsafePush(data []byte) error { return c.Channel.PublishWithContext( ctx, "", - c.queueOptions.QueueName, + c.queueName, false, false, amqp.Publishing{ @@ -156,7 +155,7 @@ func (c *Client) consume() (<-chan amqp.Delivery, error) { } return c.Channel.Consume( - c.queueOptions.QueueName, + c.queueName, "", false, false,