ctx + write only chan
This commit is contained in:
parent
42646a6a08
commit
40d1d3dd29
1 changed files with 12 additions and 2 deletions
14
publisher.go
14
publisher.go
|
|
@ -1,24 +1,34 @@
|
||||||
package rabbit
|
package rabbit
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Publisher interface {
|
type Publisher interface {
|
||||||
Start(chanLen uint) <-chan []byte
|
Start(chanLen uint) chan<- []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type pubHandler struct {
|
type pubHandler struct {
|
||||||
client *Client
|
client *Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pubHandler) Start(chanLen uint) <-chan []byte {
|
func (p *pubHandler) Start(ctx context.Context, chanLen uint) chan<- []byte {
|
||||||
ch := make(chan []byte, chanLen)
|
ch := make(chan []byte, chanLen)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
for msg := range ch {
|
||||||
|
if err := p.push(msg); err != nil {
|
||||||
|
log.Printf("Error publishing message (shutdown): %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Println("Publisher stopped")
|
||||||
|
return
|
||||||
|
|
||||||
case msg := <-ch:
|
case msg := <-ch:
|
||||||
if err := p.push(msg); err != nil {
|
if err := p.push(msg); err != nil {
|
||||||
log.Printf("Error publishing message: %s", err)
|
log.Printf("Error publishing message: %s", err)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue