package rabbit import ( "errors" "fmt" amqp "github.com/rabbitmq/amqp091-go" "io" "log" "os" "sync" "time" ) type Client struct { mutex *sync.Mutex queueName string logger *log.Logger connection *amqp.Connection Channel *amqp.Channel done chan bool notifyConnClose chan *amqp.Error notifyChanClose chan *amqp.Error notifyConfirm chan amqp.Confirmation isReady bool opts options connected chan struct{} } type options struct { connectTimeout time.Duration reconnectDelay time.Duration reInitDelay time.Duration resendDelay time.Duration consumerRateLimit time.Duration consumerBurstSize int logger *log.Logger } func NewClient(address Address, queueName string, opts ...Option) (*Client, error) { l := log.New(os.Stdout, "", log.LstdFlags) addr, err := address.makeAddr() if err != nil { return nil, errors.Join(errBadAddr, err) } if queueName == "" { l.Fatal(errNoQueue) } client := Client{ mutex: &sync.Mutex{}, queueName: queueName, done: make(chan bool), connected: make(chan struct{}), logger: l, } o := options{ connectTimeout: 15 * time.Second, reconnectDelay: 5 * time.Second, reInitDelay: 2 * time.Second, resendDelay: 5 * time.Second, consumerRateLimit: time.Millisecond * 500, consumerBurstSize: 10, logger: log.New(io.Discard, "", 0), } for _, opt := range opts { opt(&o) } client.logger = o.logger if err = client.connectAndSignal(addr, o.connectTimeout); err != nil { return nil, fmt.Errorf("failed to connect: %w", err) } go client.handleReconnect(addr) return &client, nil } func NewPublisher(client *Client) Publisher { return &pubHandler{client: client} } func NewConsumer(client *Client) Consumer { return &consumeHandler{client: client} } func (c *Client) connectAndSignal(addr string, timeout time.Duration) error { type result struct { conn *amqp.Connection err error } resCh := make(chan result, 1) go func() { conn, err := amqp.Dial(addr) resCh <- result{conn, err} }() select { case <-time.After(timeout): return fmt.Errorf("connection timeout after %v", timeout) case res := <-resCh: if res.err != nil { return res.err } c.changeConnection(res.conn) if err := c.init(res.conn); err != nil { res.conn.Close() return fmt.Errorf("init failed: %w", err) } close(c.connected) return nil } }