package rabbit

import (
	"fmt"
	"log"
	"net"
	"os"
	"sync"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// Client holds the state for one AMQP connection and channel bound to a
// single queue name.
//
// NewClient populates mutex, logger, queueName, done, connected, opts and
// (through connectAndSignal) connection. The remaining fields are not written
// anywhere in this part of the file and are presumably managed by the
// reconnect/init logic defined elsewhere.
type Client struct {
	mutex           *sync.Mutex
	queueName       string
	logger          *log.Logger
	connection      *amqp.Connection
	Channel         *amqp.Channel
	done            chan bool
	notifyConnClose chan *amqp.Error
	notifyChanClose chan *amqp.Error
	notifyConfirm   chan amqp.Confirmation
	isReady         bool
	opts            options
	connected       chan struct{}
}

// options collects the tunables that Option values passed to NewClient can
// override. Defaults are set in NewClient.
type options struct {
	connectTimeout    time.Duration
	reconnectDelay    time.Duration
	reInitDelay       time.Duration
	resendDelay       time.Duration
	consumerRateLimit time.Duration
	consumerBurstSize int
}

// NewClient creates a Client for queueName, dials the broker at addr and
// starts handleReconnect in a background goroutine.
//
// The given opts are applied in order on top of the defaults below, and the
// resulting options are stored on the returned Client.
//
// It returns errNoAddr when addr is empty, errNoQueue when queueName is empty,
// and a wrapped dial error when the initial connection cannot be established.
// In every error case the returned *Client is nil and no goroutine is started.
func NewClient(addr, queueName string, opts ...Option) (*Client, error) {
	// Report bad arguments through the error result instead of terminating
	// the whole process from library code.
	if addr == "" {
		return nil, errNoAddr
	}
	if queueName == "" {
		return nil, errNoQueue
	}

	// NOTE(review): reconnectDelay, reInitDelay and resendDelay are bare
	// integers, i.e. 5ns/2ns/5ns if used as-is. Presumably they are scaled by
	// time.Second at the point of use — confirm before changing the unit here.
	o := options{
		connectTimeout:    15 * time.Second,
		reconnectDelay:    5,
		reInitDelay:       2,
		resendDelay:       5,
		consumerRateLimit: time.Millisecond * 500,
		consumerBurstSize: 10,
	}
	for _, opt := range opts {
		opt(&o)
	}

	client := &Client{
		mutex:     &sync.Mutex{},
		logger:    log.New(os.Stdout, "", log.LstdFlags),
		queueName: queueName,
		done:      make(chan bool),
		connected: make(chan struct{}),
		// Keep the resolved options on the client; without this every option
		// other than connectTimeout would be silently dropped.
		opts: o,
	}

	if err := client.connectAndSignal(addr, o.connectTimeout); err != nil {
		return nil, fmt.Errorf("failed to connect: %w", err)
	}

	go client.handleReconnect(addr)
	return client, nil
}

// NewPublisher returns a Publisher that operates on client.
func NewPublisher(client *Client) Publisher {
	return &pubHandler{client: client}
}

// NewConsumer returns a Consumer that operates on client.
func NewConsumer(client *Client) Consumer {
	return &consumeHandler{client: client}
}

// connectAndSignal dials addr through a net.Dialer limited by timeout, stores
// the resulting connection on c and closes c.connected to signal that the
// connection exists.
//
// On dial failure the error is returned unchanged and c is left untouched.
//
// NOTE(review): c.connected is created once in NewClient and closed here, so
// calling this method a second time on the same Client would panic on the
// double close. Confirm that no reconnect path calls it again.
func (c *Client) connectAndSignal(addr string, timeout time.Duration) error {
	dialer := &net.Dialer{Timeout: timeout}
	conn, err := amqp.DialConfig(addr, amqp.Config{
		Dial: dialer.Dial,
	})
	if err != nil {
		return err
	}

	c.connection = conn
	close(c.connected)
	return nil
}