diff --git a/handler.go b/handler.go index 949887f..9383673 100644 --- a/handler.go +++ b/handler.go @@ -1,8 +1,10 @@ package rabbit import ( + "fmt" amqp "github.com/rabbitmq/amqp091-go" "log" + "net" "os" "sync" "time" @@ -20,9 +22,11 @@ type Client struct { notifyConfirm chan amqp.Confirmation isReady bool opts options + connected chan struct{} } type options struct { + connectTimeout time.Duration reconnectDelay time.Duration reInitDelay time.Duration resendDelay time.Duration @@ -30,7 +34,7 @@ type options struct { consumerBurstSize int } -func NewClient(addr, queueName string, opts ...Option) *Client { +func NewClient(addr, queueName string, opts ...Option) (*Client, error) { if addr == "" { log.Fatal(errNoAddr) } @@ -47,6 +51,7 @@ func NewClient(addr, queueName string, opts ...Option) *Client { } o := options{ + connectTimeout: 15 * time.Second, reconnectDelay: 5, reInitDelay: 2, resendDelay: 5, @@ -58,9 +63,13 @@ func NewClient(addr, queueName string, opts ...Option) *Client { opt(&o) } + if err := client.connectAndSignal(addr, o.connectTimeout); err != nil { + return nil, fmt.Errorf("failed to connect: %w", err) + } + go client.handleReconnect(addr) - return &client + return &client, nil } func NewPublisher(client *Client) Publisher { @@ -70,3 +79,16 @@ func NewPublisher(client *Client) Publisher { func NewConsumer(client *Client) Consumer { return &consumeHandler{client: client} } + +func (c *Client) connectAndSignal(addr string, timeout time.Duration) error { + dialer := &net.Dialer{Timeout: timeout} + conn, err := amqp.DialConfig(addr, amqp.Config{ + Dial: dialer.Dial, + }) + if err != nil { + return err + } + c.connection = conn + close(c.connected) + return nil +} diff --git a/options.go b/options.go index aaa7828..37ad97c 100644 --- a/options.go +++ b/options.go @@ -4,6 +4,12 @@ import "time" type Option func(*options) +func WithConnectTimeout(t time.Duration) Option { + return func(op *options) { + op.connectTimeout = t + } +} + func WithReconnectDelay(t time.Duration) Option { return func(op *options) { op.reconnectDelay = t