connection timeout
This commit is contained in:
parent
46e07b7577
commit
9d837385b1
2 changed files with 30 additions and 2 deletions
26
handler.go
26
handler.go
|
|
@ -1,8 +1,10 @@
|
||||||
package rabbit
|
package rabbit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
"log"
|
"log"
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -20,9 +22,11 @@ type Client struct {
|
||||||
notifyConfirm chan amqp.Confirmation
|
notifyConfirm chan amqp.Confirmation
|
||||||
isReady bool
|
isReady bool
|
||||||
opts options
|
opts options
|
||||||
|
connected chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type options struct {
|
type options struct {
|
||||||
|
connectTimeout time.Duration
|
||||||
reconnectDelay time.Duration
|
reconnectDelay time.Duration
|
||||||
reInitDelay time.Duration
|
reInitDelay time.Duration
|
||||||
resendDelay time.Duration
|
resendDelay time.Duration
|
||||||
|
|
@ -30,7 +34,7 @@ type options struct {
|
||||||
consumerBurstSize int
|
consumerBurstSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(addr, queueName string, opts ...Option) *Client {
|
func NewClient(addr, queueName string, opts ...Option) (*Client, error) {
|
||||||
if addr == "" {
|
if addr == "" {
|
||||||
log.Fatal(errNoAddr)
|
log.Fatal(errNoAddr)
|
||||||
}
|
}
|
||||||
|
|
@ -47,6 +51,7 @@ func NewClient(addr, queueName string, opts ...Option) *Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
o := options{
|
o := options{
|
||||||
|
connectTimeout: 15 * time.Second,
|
||||||
reconnectDelay: 5,
|
reconnectDelay: 5,
|
||||||
reInitDelay: 2,
|
reInitDelay: 2,
|
||||||
resendDelay: 5,
|
resendDelay: 5,
|
||||||
|
|
@ -58,9 +63,13 @@ func NewClient(addr, queueName string, opts ...Option) *Client {
|
||||||
opt(&o)
|
opt(&o)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := client.connectAndSignal(addr, o.connectTimeout); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
go client.handleReconnect(addr)
|
go client.handleReconnect(addr)
|
||||||
|
|
||||||
return &client
|
return &client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPublisher(client *Client) Publisher {
|
func NewPublisher(client *Client) Publisher {
|
||||||
|
|
@ -70,3 +79,16 @@ func NewPublisher(client *Client) Publisher {
|
||||||
func NewConsumer(client *Client) Consumer {
|
func NewConsumer(client *Client) Consumer {
|
||||||
return &consumeHandler{client: client}
|
return &consumeHandler{client: client}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) connectAndSignal(addr string, timeout time.Duration) error {
|
||||||
|
dialer := &net.Dialer{Timeout: timeout}
|
||||||
|
conn, err := amqp.DialConfig(addr, amqp.Config{
|
||||||
|
Dial: dialer.Dial,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.connection = conn
|
||||||
|
close(c.connected)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,12 @@ import "time"
|
||||||
|
|
||||||
type Option func(*options)
|
type Option func(*options)
|
||||||
|
|
||||||
|
func WithConnectTimeout(t time.Duration) Option {
|
||||||
|
return func(op *options) {
|
||||||
|
op.connectTimeout = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithReconnectDelay(t time.Duration) Option {
|
func WithReconnectDelay(t time.Duration) Option {
|
||||||
return func(op *options) {
|
return func(op *options) {
|
||||||
op.reconnectDelay = t
|
op.reconnectDelay = t
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue