diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..4e6f54e --- /dev/null +++ b/.dockerignore @@ -0,0 +1,7 @@ +.idea +.git +.gitignore +.forgejo +config.env +cmd/build.sh +cmd/parserService \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cc41c90 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +cmd/build.sh +cmd/parserService \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..694497b --- /dev/null +++ b/Dockerfile @@ -0,0 +1,23 @@ +FROM golang:1.25.1-alpine3.22 AS builder + +WORKDIR /build + +COPY go.* ./ + +RUN go mod download + +COPY . . + +RUN CGO_ENABLED=0 GOOS=linux go build -o app ./cmd + + +FROM alpine:latest + +RUN mkdir -p /home && \ + apk --no-cache add curl + +COPY --from=builder /build/app /home/app + +EXPOSE 9050 + +CMD ["./home/app"] diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..95e13e4 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,17 @@ +package main + +import ( + "parsing-service/config" + "parsing-service/internal/app" + "parsing-service/internal/logging" +) + +func main() { + c := config.NewConfig() + + logging.LogSetup(c.LogLevel) + + appl := app.New(c) + + appl.Run() +} diff --git a/config.env b/config.env new file mode 100644 index 0000000..4fabc68 --- /dev/null +++ b/config.env @@ -0,0 +1,9 @@ +APP_HOST=0.0.0.0 +APP_CLIENT_PORT=9050 +APP_SERVER_PORT=9060 +APP_LOG_LEVEL=error +APP_NUMCPUS=-1 +APP_CHECK_PERIOD=6 + +TASK_RETRY_COUNT=3 +TASK_RETRY_MINUTES=5 \ No newline at end of file diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..507311c --- /dev/null +++ b/config/config.go @@ -0,0 +1,57 @@ +package config + +import ( + log "github.com/sirupsen/logrus" + "os" + "strconv" +) + +type Config struct { + Host string + ClientPort string + ServerPort string + LogLevel string + NumCPUs int + CheckPeriod int + TasksConfig TasksConfig +} + +type TasksConfig struct { + RetryCount int + RetryMinutes int +} + +func NewConfig() *Config { + return &Config{ + Host: getEnv("APP_HOST", "0.0.0.0"), + ClientPort: getEnv("APP_PORT", "9050"), + ServerPort: getEnv("APP_SERVER_PORT", "9060"), + LogLevel: getEnv("APP_LOG_LEVEL", "debug"), + NumCPUs: getEnvInt("APP_NUMCPUS", -1), + CheckPeriod: getEnvInt("APP_CHECK_PERIOD", 6), + + TasksConfig: TasksConfig{ + RetryCount: getEnvInt("TASK_RETRY_COUNT", 3), + RetryMinutes: getEnvInt("TASK_RETRY_MINUTES", 5), + }, + } +} + +func getEnv(key, fallback string) string { + if value, ok := os.LookupEnv(key); ok { + return value + } + return fallback +} + +func getEnvInt(key string, fallback int) int { + if value, ok := os.LookupEnv(key); ok { + num, err := strconv.Atoi(value) + if err != nil { + log.WithField("default", -1).Warn("Config | Can't parse value as int") + return fallback + } + return num + } + return fallback +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..72b6afb --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module parsing-service + +go 1.25.1 + +require ( + github.com/sirupsen/logrus v1.9.3 + google.golang.org/grpc v1.75.1 + google.golang.org/protobuf v1.36.10 +) + +require ( + golang.org/x/net v0.44.0 // indirect + golang.org/x/sys v0.36.0 // indirect + golang.org/x/text v0.29.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2da4c56 --- /dev/null +++ b/go.sum @@ -0,0 +1,50 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= +go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 h1:i8QOKZfYg6AbGVZzUAY3LrNWCKF8O6zFisU9Wl9RER4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ= +google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= +google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/app/app.go b/internal/app/app.go new file mode 100644 index 0000000..4551b3b --- /dev/null +++ b/internal/app/app.go @@ -0,0 +1,150 @@ +package app + +import ( + "context" + log "github.com/sirupsen/logrus" + "net" + "os" + "os/signal" + "parsing-service/config" + "parsing-service/internal/appState" + "parsing-service/internal/network" + "parsing-service/internal/parsers" + "parsing-service/internal/processor" + "parsing-service/internal/shared" + "runtime" + "syscall" + "time" +) + +type App struct { + ClientAddress string + ServerAddress string + NumCPUs int + CheckPeriod time.Duration + StartTime time.Time + RetryCount int + RetryMinutes int + State *appState.State + Network *network.Network +} + +func New(c *config.Config) *App { + numCPUs := c.NumCPUs + if numCPUs < 1 { + numCPUs = runtime.NumCPU() + } + + st := appState.NewState(numCPUs, c.CheckPeriod, c.TasksConfig.RetryCount, c.TasksConfig.RetryMinutes) + + return &App{ + ClientAddress: c.Host + ":" + c.ClientPort, + ServerAddress: c.Host + ":" + c.ServerPort, + NumCPUs: numCPUs, + CheckPeriod: time.Duration(c.CheckPeriod), + StartTime: time.Now(), + RetryCount: c.TasksConfig.RetryCount, + RetryMinutes: c.TasksConfig.RetryMinutes, + State: st, + Network: network.NewHandler(), + } +} + +func (app *App) Run() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + log.Info("Application start") + log.WithFields(log.Fields{ + "ClientAddress": app.ClientAddress, + "Number of CPUs": app.NumCPUs, + }).Debug("App settings") + + server := newServer(app) + client := newClient(app) + + period := time.NewTicker(app.CheckPeriod * time.Hour) + defer period.Stop() + + sender := make(chan shared.TaskResult, app.NumCPUs*10) + + //task processor + handlers := map[string]parsers.TaskHandler{ + shared.OriginSurugaya: parsers.NewSurugayaParser(), + shared.OriginMandarake: parsers.NewMandarakeParser(), + } + + taskProcessor := processor.New(processor.Deps{ + Handlers: handlers, + Out: sender, + State: app.State, + Ctx: ctx, + Client: client, + NumCPUs: app.NumCPUs, + }) + + process := func() { + app.State.SetStatus(appState.StatusRequestTasks) + log.Info("Requesting data for parsing") + + receivedTasks := app.Network.RequestTasks(ctx, client) + log.WithField("length", len(receivedTasks)).Debug("End receiving") + + taskProcessor.StartWork(receivedTasks) + } + + go func() { + process() //immediate start + for range period.C { + process() + } + }() + + //done tasks sender + go func() { + ticker := time.NewTicker(time.Second * 2) + defer ticker.Stop() + + var sendData []shared.TaskResult + + for { + select { + case task := <-sender: + sendData = append(sendData, task) + + case <-ticker.C: + l := len(sendData) + if l > 0 { + log.WithField("length", l).Debug("Sending parsed data") + app.Network.SendResult(client, sendData) + sendData = sendData[:0] + } + } + } + }() + + //gRPC Server for status response + go func() { + listener, err := net.Listen("tcp", app.ServerAddress) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + + log.Infof("gRPC Server listening at %v", app.ServerAddress) + if err := server.Serve(listener); err != nil { + log.Fatalf("failed to serve: %v", err) + } + }() + + go func() { + sigint := make(chan os.Signal, 1) + signal.Notify(sigint, os.Interrupt, syscall.SIGTERM) + <-sigint + log.Info("Shutting down...") + + period.Stop() + server.GracefulStop() + cancel() + }() + <-ctx.Done() +} diff --git a/internal/app/client.go b/internal/app/client.go new file mode 100644 index 0000000..1058a61 --- /dev/null +++ b/internal/app/client.go @@ -0,0 +1,21 @@ +package app + +import ( + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + pb "parsing-service/proto/taskProcessor" +) + +func newClient(app *App) pb.TaskProcessorClient { + var opts []grpc.DialOption + insec := grpc.WithTransportCredentials(insecure.NewCredentials()) + opts = append(opts, insec) + + conn, err := grpc.NewClient(app.ClientAddress, opts...) + if err != nil { + log.Fatal(err) + } + + return pb.NewTaskProcessorClient(conn) +} diff --git a/internal/app/server.go b/internal/app/server.go new file mode 100644 index 0000000..a2bc864 --- /dev/null +++ b/internal/app/server.go @@ -0,0 +1,27 @@ +package app + +import ( + "context" + "google.golang.org/grpc" + "parsing-service/internal/appState" + pb "parsing-service/proto/taskProcessor" +) + +type Server struct { + pb.UnimplementedTaskProcessorServer + state *appState.State +} + +func newServer(app *App) *grpc.Server { + s := grpc.NewServer() + srv := &Server{ + state: app.State, + } + pb.RegisterTaskProcessorServer(s, srv) + return s +} + +func (s *Server) ProcessorStatus(_ context.Context, _ *pb.ProcessorStatusRequest) (*pb.ProcessorStatusResponse, error) { + resp := s.state.StateResponse() + return resp, nil +} diff --git a/internal/appState/state.go b/internal/appState/state.go new file mode 100644 index 0000000..04ad62f --- /dev/null +++ b/internal/appState/state.go @@ -0,0 +1,99 @@ +package appState + +import ( + pb "parsing-service/proto/taskProcessor" + "sync/atomic" + "time" +) + +type State struct { + appStart int64 + lastCheck atomic.Int64 + tasksReceived atomic.Int32 + tasksInProgress atomic.Int32 + tasksFirstTry atomic.Int32 + tasksDoneAfterRetry atomic.Int32 + tasksFailed atomic.Int32 + workStatus atomic.Int32 + numCPUs int32 + checkPeriod int32 + RetriesCount int + retriesMinutes int32 +} + +func NewState(numCPUs, checkPeriod, retriesCount, retriesMinutes int) *State { + now := time.Now().Unix() + + state := &State{ + appStart: now, + numCPUs: int32(numCPUs), + checkPeriod: int32(checkPeriod), + RetriesCount: retriesCount, + retriesMinutes: int32(retriesMinutes), + } + + state.lastCheck.Store(now) + return state +} + +func (s *State) ResetCounters() { + s.tasksReceived.Store(0) + s.tasksInProgress.Store(0) + s.tasksFirstTry.Store(0) + s.tasksDoneAfterRetry.Store(0) + s.tasksFailed.Store(0) +} + +func (s *State) SetStatus(status Status) { + s.workStatus.Store(int32(status)) +} + +func (s *State) SetLastCheck() { + s.lastCheck.Store(time.Now().Unix()) +} + +func (s *State) SetTasksReceived(num int) { + s.tasksReceived.Swap(int32(num)) + s.tasksInProgress.Swap(int32(num)) +} + +func (s *State) TaskDone() { + s.tasksInProgress.Add(-1) +} + +func (s *State) FirstTry() { + s.tasksFirstTry.Add(1) +} + +func (s *State) DoneAfterRetry() { + s.tasksDoneAfterRetry.Add(1) +} + +func (s *State) Failed() { + s.tasksFailed.Add(1) +} + +func (s *State) InProgress() bool { + return s.tasksInProgress.Load() > 0 +} + +func (s *State) SetIdleStatus() { + s.workStatus.Store(int32(StatusIdle)) +} + +func (s *State) StateResponse() *pb.ProcessorStatusResponse { + return &pb.ProcessorStatusResponse{ + AppStart: s.appStart, + LastCheck: s.lastCheck.Load(), + TasksReceived: s.tasksReceived.Load(), + TasksInProgress: s.tasksInProgress.Load(), + TasksFirstTry: s.tasksFirstTry.Load(), + TasksDoneAfterRetry: s.tasksDoneAfterRetry.Load(), + TasksFailed: s.tasksFailed.Load(), + WorkStatus: Status(s.workStatus.Load()).String(), + NumCPUs: s.numCPUs, + CheckPeriod: s.checkPeriod, + RetriesCount: int32(s.RetriesCount), + RetriesMinutes: s.retriesMinutes, + } +} diff --git a/internal/appState/status.go b/internal/appState/status.go new file mode 100644 index 0000000..a085600 --- /dev/null +++ b/internal/appState/status.go @@ -0,0 +1,24 @@ +package appState + +type Status int32 + +const ( + StatusIdle Status = iota + StatusRequestTasks + StatusWorkInProgress + StatusFailure +) + +var statusNames = [...]string{ + "Idle", + "Requesting tasks", + "Work in progress", + "Failure", +} + +func (s Status) String() string { + if s < 0 || s >= Status(len(statusNames)) { + return "unknown" + } + return statusNames[s] +} diff --git a/internal/logging/logging.go b/internal/logging/logging.go new file mode 100644 index 0000000..19fcc22 --- /dev/null +++ b/internal/logging/logging.go @@ -0,0 +1,35 @@ +package logging + +import ( + "fmt" + log "github.com/sirupsen/logrus" + "os" + "path" + "runtime" +) + +func LogSetup(lvl string) { + l, err := log.ParseLevel(lvl) + if err != nil { + log.SetLevel(log.DebugLevel) + } + + log.SetFormatter( + &log.TextFormatter{ + FullTimestamp: true, + CallerPrettyfier: func(f *runtime.Frame) (string, string) { + filename := path.Base(f.File) + return fmt.Sprintf("%s()", f.Function), fmt.Sprintf(" %s:%d", filename, f.Line) + }, + }, + ) + + if l == log.DebugLevel { + log.SetLevel(l) + //log.SetReportCaller(true) + } else { + log.SetLevel(l) + } + + log.SetOutput(os.Stdout) +} diff --git a/internal/network/handler.go b/internal/network/handler.go new file mode 100644 index 0000000..50d049b --- /dev/null +++ b/internal/network/handler.go @@ -0,0 +1,7 @@ +package network + +type Network struct{} + +func NewHandler() *Network { + return &Network{} +} diff --git a/internal/network/interface.go b/internal/network/interface.go new file mode 100644 index 0000000..1225549 --- /dev/null +++ b/internal/network/interface.go @@ -0,0 +1,12 @@ +package network + +import ( + "context" + "parsing-service/internal/shared" + pb "parsing-service/proto/taskProcessor" +) + +type Handler interface { + RequestTasks(ctx context.Context, client pb.TaskProcessorClient) []shared.TaskResponse + SendResult(client pb.TaskProcessorClient, tasksDone []shared.TaskResult) +} diff --git a/internal/network/recieve.go b/internal/network/recieve.go new file mode 100644 index 0000000..40b2104 --- /dev/null +++ b/internal/network/recieve.go @@ -0,0 +1,38 @@ +package network + +import ( + "context" + log "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/emptypb" + "io" + "parsing-service/internal/shared" + pb "parsing-service/proto/taskProcessor" +) + +func (n *Network) RequestTasks(ctx context.Context, client pb.TaskProcessorClient) []shared.TaskResponse { + var tasksList []shared.TaskResponse + stream, err := client.RequestTask(ctx, &emptypb.Empty{}) + if err != nil { + log.WithField("err", err).Error("Error calling Request Tasks") + return nil + } + + for { + response, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + log.WithField("err", err).Error("Error receiving response") + return nil + } + + tasksList = append(tasksList, shared.TaskResponse{ + MerchUuid: response.MerchUuid, + OriginSurugayaLink: response.OriginSurugayaLink, + OriginMandarakeLink: response.OriginMandarakeLink, + }) + log.WithField("entry added", response.MerchUuid).Debug("gRPC Receive") + } + return tasksList +} diff --git a/internal/network/send.go b/internal/network/send.go new file mode 100644 index 0000000..44b976e --- /dev/null +++ b/internal/network/send.go @@ -0,0 +1,35 @@ +package network + +import ( + "context" + log "github.com/sirupsen/logrus" + "parsing-service/internal/shared" + pb "parsing-service/proto/taskProcessor" +) + +func (n *Network) SendResult(client pb.TaskProcessorClient, tasksDone []shared.TaskResult) { + stream, err := client.SendResult(context.Background()) + if err != nil { + log.Fatalf("Error calling PostMerch: %v", err) + } + + merchResponses := make([]pb.Result, len(tasksDone)) + for i, task := range tasksDone { + merchResponses[i] = pb.Result{ + MerchUuid: task.MerchUuid, + OriginName: task.Origin, + Price: task.Price, + } + } + + for i := range merchResponses { + response := &merchResponses[i] + if err = stream.Send(response); err != nil { + log.Fatalf("Error sending request: %v", err) + } + } + + if err = stream.CloseSend(); err != nil { + log.Fatalf("Error closing stream: %v", err) + } +} diff --git a/internal/parsers/interface.go b/internal/parsers/interface.go new file mode 100644 index 0000000..b9fc4ae --- /dev/null +++ b/internal/parsers/interface.go @@ -0,0 +1,10 @@ +package parsers + +import ( + "parsing-service/internal/appState" + "parsing-service/internal/shared" +) + +type TaskHandler interface { + HandleTask(task shared.Task, sender chan shared.TaskResult, state *appState.State) error +} diff --git a/internal/parsers/mandarake.go b/internal/parsers/mandarake.go new file mode 100644 index 0000000..708a17b --- /dev/null +++ b/internal/parsers/mandarake.go @@ -0,0 +1,18 @@ +package parsers + +import ( + log "github.com/sirupsen/logrus" + "parsing-service/internal/appState" + "parsing-service/internal/shared" +) + +type MandarakeParser struct{} + +func NewMandarakeParser() *MandarakeParser { + return &MandarakeParser{} +} + +func (s *MandarakeParser) HandleTask(task shared.Task, sender chan shared.TaskResult, state *appState.State) error { + log.Debug("Handling Mandarake Task") + return nil +} diff --git a/internal/parsers/surugaya.go b/internal/parsers/surugaya.go new file mode 100644 index 0000000..373fb08 --- /dev/null +++ b/internal/parsers/surugaya.go @@ -0,0 +1,18 @@ +package parsers + +import ( + log "github.com/sirupsen/logrus" + "parsing-service/internal/appState" + "parsing-service/internal/shared" +) + +type SurugayaParser struct{} + +func NewSurugayaParser() *SurugayaParser { + return &SurugayaParser{} +} + +func (s *SurugayaParser) HandleTask(task shared.Task, sender chan shared.TaskResult, state *appState.State) error { + log.Debug("Handling Surugaya Task") + return nil +} diff --git a/internal/processor/handler.go b/internal/processor/handler.go new file mode 100644 index 0000000..782bb20 --- /dev/null +++ b/internal/processor/handler.go @@ -0,0 +1,38 @@ +package processor + +import ( + "context" + "parsing-service/internal/appState" + "parsing-service/internal/parsers" + "parsing-service/internal/shared" + pb "parsing-service/proto/taskProcessor" +) + +type Processor struct { + handlers map[string]parsers.TaskHandler + out chan shared.TaskResult + state *appState.State + ctx context.Context + client pb.TaskProcessorClient + numCPUs int +} + +type Deps struct { + Handlers map[string]parsers.TaskHandler + Out chan shared.TaskResult + State *appState.State + Ctx context.Context + Client pb.TaskProcessorClient + NumCPUs int +} + +func New(deps Deps) *Processor { + return &Processor{ + handlers: deps.Handlers, + out: deps.Out, + state: deps.State, + ctx: deps.Ctx, + client: deps.Client, + numCPUs: deps.NumCPUs, + } +} diff --git a/internal/processor/service.go b/internal/processor/service.go new file mode 100644 index 0000000..684f502 --- /dev/null +++ b/internal/processor/service.go @@ -0,0 +1,60 @@ +package processor + +import ( + log "github.com/sirupsen/logrus" + "parsing-service/internal/appState" + "parsing-service/internal/shared" + "sync" +) + +func (p *Processor) StartWork(receivedTasks []shared.TaskResponse) { + log.Info("Starting work...") + p.state.ResetCounters() + + in := make(chan shared.Task, p.numCPUs*10) + + wg := &sync.WaitGroup{} + for i := 0; i < p.numCPUs*10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + p.worker(in) + }() + } + + tasksNumber := len(receivedTasks) + if tasksNumber > 0 { + p.state.SetStatus(appState.StatusWorkInProgress) + p.state.SetTasksReceived(tasksNumber) + p.sortTasks(in, receivedTasks) + } + + close(in) + wg.Wait() + + log.Debug("All goroutines finished") + p.state.SetLastCheck() + p.state.SetStatus(appState.StatusIdle) + log.Debugf("State | %+v", p.state) +} + +func (p *Processor) sortTasks(in chan<- shared.Task, receivedTasks []shared.TaskResponse) { + for _, task := range receivedTasks { + switch { + case task.OriginSurugayaLink != "": + in <- shared.Task{ + MerchUuid: task.MerchUuid, + Origin: shared.OriginSurugaya, + Link: task.OriginSurugayaLink, + RetryCount: 3, + } + case task.OriginMandarakeLink != "": + in <- shared.Task{ + MerchUuid: task.MerchUuid, + Origin: shared.OriginMandarake, + Link: task.OriginMandarakeLink, + RetryCount: 3, + } + } + } +} diff --git a/internal/processor/worker.go b/internal/processor/worker.go new file mode 100644 index 0000000..4049042 --- /dev/null +++ b/internal/processor/worker.go @@ -0,0 +1,18 @@ +package processor + +import ( + log "github.com/sirupsen/logrus" + "parsing-service/internal/shared" +) + +const zeroPrice = 0 //for debug purposes + +func (p *Processor) worker(in <-chan shared.Task) { + for task := range in { + err := p.handlers[task.Origin].HandleTask(task, p.out, p.state) + if err != nil { + log.WithField("err", err).Error("Worker | Handle task") + continue + } + } +} diff --git a/internal/shared/channels.go b/internal/shared/channels.go new file mode 100644 index 0000000..e4805e8 --- /dev/null +++ b/internal/shared/channels.go @@ -0,0 +1,6 @@ +package shared + +type Channels struct { + Surugaya chan Task + Mandarake chan Task +} diff --git a/internal/shared/originConst.go b/internal/shared/originConst.go new file mode 100644 index 0000000..432002b --- /dev/null +++ b/internal/shared/originConst.go @@ -0,0 +1,6 @@ +package shared + +const ( + OriginSurugaya = "surugaya" + OriginMandarake = "mandarake" +) diff --git a/internal/shared/task.go b/internal/shared/task.go new file mode 100644 index 0000000..7be6f20 --- /dev/null +++ b/internal/shared/task.go @@ -0,0 +1,20 @@ +package shared + +type Task struct { + MerchUuid string + Origin string + Link string + RetryCount int +} + +type TaskResponse struct { + MerchUuid string + OriginSurugayaLink string + OriginMandarakeLink string +} + +type TaskResult struct { + MerchUuid string + Origin string + Price uint32 +} diff --git a/proto/task.proto b/proto/task.proto new file mode 100644 index 0000000..bb17f67 --- /dev/null +++ b/proto/task.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; +import "google/protobuf/empty.proto"; + +package taskProcessor; +option go_package = "./taskProcessor"; + + +message Task{ + string merch_uuid = 1; + string origin_surugaya_link = 2; + string origin_mandarake_link = 3; +} + +message Result{ + string merch_uuid = 1; + string origin_name = 2; + uint32 price = 3; +} + +message ProcessorStatusRequest{} + +message ProcessorStatusResponse { + int64 appStart = 1; + int64 lastCheck = 2; + int32 tasksReceived = 3; + int32 tasksInProgress = 4; + int32 tasksFirstTry = 5; + int32 tasksDoneAfterRetry = 6; + int32 tasksFailed = 7; + string workStatus = 8; + int32 numCPUs = 9; + int32 checkPeriod = 10; + int32 retriesCount = 11; + int32 retriesMinutes = 12; +} + +service TaskProcessor { + rpc RequestTask(google.protobuf.Empty) returns (stream Task); + rpc SendResult(stream Result) returns (google.protobuf.Empty); + rpc ProcessorStatus(ProcessorStatusRequest) returns (ProcessorStatusResponse); +} \ No newline at end of file diff --git a/proto/taskProcessor/task.pb.go b/proto/taskProcessor/task.pb.go new file mode 100644 index 0000000..7ff9f0c --- /dev/null +++ b/proto/taskProcessor/task.pb.go @@ -0,0 +1,409 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.8 +// protoc v6.32.0 +// source: task.proto + +package taskProcessor + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Task struct { + state protoimpl.MessageState `protogen:"open.v1"` + MerchUuid string `protobuf:"bytes,1,opt,name=merch_uuid,json=merchUuid,proto3" json:"merch_uuid,omitempty"` + OriginSurugayaLink string `protobuf:"bytes,2,opt,name=origin_surugaya_link,json=originSurugayaLink,proto3" json:"origin_surugaya_link,omitempty"` + OriginMandarakeLink string `protobuf:"bytes,3,opt,name=origin_mandarake_link,json=originMandarakeLink,proto3" json:"origin_mandarake_link,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Task) Reset() { + *x = Task{} + mi := &file_task_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Task) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Task) ProtoMessage() {} + +func (x *Task) ProtoReflect() protoreflect.Message { + mi := &file_task_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Task.ProtoReflect.Descriptor instead. +func (*Task) Descriptor() ([]byte, []int) { + return file_task_proto_rawDescGZIP(), []int{0} +} + +func (x *Task) GetMerchUuid() string { + if x != nil { + return x.MerchUuid + } + return "" +} + +func (x *Task) GetOriginSurugayaLink() string { + if x != nil { + return x.OriginSurugayaLink + } + return "" +} + +func (x *Task) GetOriginMandarakeLink() string { + if x != nil { + return x.OriginMandarakeLink + } + return "" +} + +type Result struct { + state protoimpl.MessageState `protogen:"open.v1"` + MerchUuid string `protobuf:"bytes,1,opt,name=merch_uuid,json=merchUuid,proto3" json:"merch_uuid,omitempty"` + OriginName string `protobuf:"bytes,2,opt,name=origin_name,json=originName,proto3" json:"origin_name,omitempty"` + Price uint32 `protobuf:"varint,3,opt,name=price,proto3" json:"price,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Result) Reset() { + *x = Result{} + mi := &file_task_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Result) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Result) ProtoMessage() {} + +func (x *Result) ProtoReflect() protoreflect.Message { + mi := &file_task_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Result.ProtoReflect.Descriptor instead. +func (*Result) Descriptor() ([]byte, []int) { + return file_task_proto_rawDescGZIP(), []int{1} +} + +func (x *Result) GetMerchUuid() string { + if x != nil { + return x.MerchUuid + } + return "" +} + +func (x *Result) GetOriginName() string { + if x != nil { + return x.OriginName + } + return "" +} + +func (x *Result) GetPrice() uint32 { + if x != nil { + return x.Price + } + return 0 +} + +type ProcessorStatusRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ProcessorStatusRequest) Reset() { + *x = ProcessorStatusRequest{} + mi := &file_task_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ProcessorStatusRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProcessorStatusRequest) ProtoMessage() {} + +func (x *ProcessorStatusRequest) ProtoReflect() protoreflect.Message { + mi := &file_task_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProcessorStatusRequest.ProtoReflect.Descriptor instead. +func (*ProcessorStatusRequest) Descriptor() ([]byte, []int) { + return file_task_proto_rawDescGZIP(), []int{2} +} + +type ProcessorStatusResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + AppStart int64 `protobuf:"varint,1,opt,name=appStart,proto3" json:"appStart,omitempty"` + LastCheck int64 `protobuf:"varint,2,opt,name=lastCheck,proto3" json:"lastCheck,omitempty"` + TasksReceived int32 `protobuf:"varint,3,opt,name=tasksReceived,proto3" json:"tasksReceived,omitempty"` + TasksInProgress int32 `protobuf:"varint,4,opt,name=tasksInProgress,proto3" json:"tasksInProgress,omitempty"` + TasksFirstTry int32 `protobuf:"varint,5,opt,name=tasksFirstTry,proto3" json:"tasksFirstTry,omitempty"` + TasksDoneAfterRetry int32 `protobuf:"varint,6,opt,name=tasksDoneAfterRetry,proto3" json:"tasksDoneAfterRetry,omitempty"` + TasksFailed int32 `protobuf:"varint,7,opt,name=tasksFailed,proto3" json:"tasksFailed,omitempty"` + WorkStatus string `protobuf:"bytes,8,opt,name=workStatus,proto3" json:"workStatus,omitempty"` + NumCPUs int32 `protobuf:"varint,9,opt,name=numCPUs,proto3" json:"numCPUs,omitempty"` + CheckPeriod int32 `protobuf:"varint,10,opt,name=checkPeriod,proto3" json:"checkPeriod,omitempty"` + RetriesCount int32 `protobuf:"varint,11,opt,name=retriesCount,proto3" json:"retriesCount,omitempty"` + RetriesMinutes int32 `protobuf:"varint,12,opt,name=retriesMinutes,proto3" json:"retriesMinutes,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ProcessorStatusResponse) Reset() { + *x = ProcessorStatusResponse{} + mi := &file_task_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ProcessorStatusResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProcessorStatusResponse) ProtoMessage() {} + +func (x *ProcessorStatusResponse) ProtoReflect() protoreflect.Message { + mi := &file_task_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProcessorStatusResponse.ProtoReflect.Descriptor instead. +func (*ProcessorStatusResponse) Descriptor() ([]byte, []int) { + return file_task_proto_rawDescGZIP(), []int{3} +} + +func (x *ProcessorStatusResponse) GetAppStart() int64 { + if x != nil { + return x.AppStart + } + return 0 +} + +func (x *ProcessorStatusResponse) GetLastCheck() int64 { + if x != nil { + return x.LastCheck + } + return 0 +} + +func (x *ProcessorStatusResponse) GetTasksReceived() int32 { + if x != nil { + return x.TasksReceived + } + return 0 +} + +func (x *ProcessorStatusResponse) GetTasksInProgress() int32 { + if x != nil { + return x.TasksInProgress + } + return 0 +} + +func (x *ProcessorStatusResponse) GetTasksFirstTry() int32 { + if x != nil { + return x.TasksFirstTry + } + return 0 +} + +func (x *ProcessorStatusResponse) GetTasksDoneAfterRetry() int32 { + if x != nil { + return x.TasksDoneAfterRetry + } + return 0 +} + +func (x *ProcessorStatusResponse) GetTasksFailed() int32 { + if x != nil { + return x.TasksFailed + } + return 0 +} + +func (x *ProcessorStatusResponse) GetWorkStatus() string { + if x != nil { + return x.WorkStatus + } + return "" +} + +func (x *ProcessorStatusResponse) GetNumCPUs() int32 { + if x != nil { + return x.NumCPUs + } + return 0 +} + +func (x *ProcessorStatusResponse) GetCheckPeriod() int32 { + if x != nil { + return x.CheckPeriod + } + return 0 +} + +func (x *ProcessorStatusResponse) GetRetriesCount() int32 { + if x != nil { + return x.RetriesCount + } + return 0 +} + +func (x *ProcessorStatusResponse) GetRetriesMinutes() int32 { + if x != nil { + return x.RetriesMinutes + } + return 0 +} + +var File_task_proto protoreflect.FileDescriptor + +const file_task_proto_rawDesc = "" + + "\n" + + "\n" + + "task.proto\x12\rtaskProcessor\x1a\x1bgoogle/protobuf/empty.proto\"\x8b\x01\n" + + "\x04Task\x12\x1d\n" + + "\n" + + "merch_uuid\x18\x01 \x01(\tR\tmerchUuid\x120\n" + + "\x14origin_surugaya_link\x18\x02 \x01(\tR\x12originSurugayaLink\x122\n" + + "\x15origin_mandarake_link\x18\x03 \x01(\tR\x13originMandarakeLink\"^\n" + + "\x06Result\x12\x1d\n" + + "\n" + + "merch_uuid\x18\x01 \x01(\tR\tmerchUuid\x12\x1f\n" + + "\vorigin_name\x18\x02 \x01(\tR\n" + + "originName\x12\x14\n" + + "\x05price\x18\x03 \x01(\rR\x05price\"\x18\n" + + "\x16ProcessorStatusRequest\"\xc5\x03\n" + + "\x17ProcessorStatusResponse\x12\x1a\n" + + "\bappStart\x18\x01 \x01(\x03R\bappStart\x12\x1c\n" + + "\tlastCheck\x18\x02 \x01(\x03R\tlastCheck\x12$\n" + + "\rtasksReceived\x18\x03 \x01(\x05R\rtasksReceived\x12(\n" + + "\x0ftasksInProgress\x18\x04 \x01(\x05R\x0ftasksInProgress\x12$\n" + + "\rtasksFirstTry\x18\x05 \x01(\x05R\rtasksFirstTry\x120\n" + + "\x13tasksDoneAfterRetry\x18\x06 \x01(\x05R\x13tasksDoneAfterRetry\x12 \n" + + "\vtasksFailed\x18\a \x01(\x05R\vtasksFailed\x12\x1e\n" + + "\n" + + "workStatus\x18\b \x01(\tR\n" + + "workStatus\x12\x18\n" + + "\anumCPUs\x18\t \x01(\x05R\anumCPUs\x12 \n" + + "\vcheckPeriod\x18\n" + + " \x01(\x05R\vcheckPeriod\x12\"\n" + + "\fretriesCount\x18\v \x01(\x05R\fretriesCount\x12&\n" + + "\x0eretriesMinutes\x18\f \x01(\x05R\x0eretriesMinutes2\xee\x01\n" + + "\rTaskProcessor\x12<\n" + + "\vRequestTask\x12\x16.google.protobuf.Empty\x1a\x13.taskProcessor.Task0\x01\x12=\n" + + "\n" + + "SendResult\x12\x15.taskProcessor.Result\x1a\x16.google.protobuf.Empty(\x01\x12`\n" + + "\x0fProcessorStatus\x12%.taskProcessor.ProcessorStatusRequest\x1a&.taskProcessor.ProcessorStatusResponseB\x11Z\x0f./taskProcessorb\x06proto3" + +var ( + file_task_proto_rawDescOnce sync.Once + file_task_proto_rawDescData []byte +) + +func file_task_proto_rawDescGZIP() []byte { + file_task_proto_rawDescOnce.Do(func() { + file_task_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_task_proto_rawDesc), len(file_task_proto_rawDesc))) + }) + return file_task_proto_rawDescData +} + +var file_task_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_task_proto_goTypes = []any{ + (*Task)(nil), // 0: taskProcessor.Task + (*Result)(nil), // 1: taskProcessor.Result + (*ProcessorStatusRequest)(nil), // 2: taskProcessor.ProcessorStatusRequest + (*ProcessorStatusResponse)(nil), // 3: taskProcessor.ProcessorStatusResponse + (*emptypb.Empty)(nil), // 4: google.protobuf.Empty +} +var file_task_proto_depIdxs = []int32{ + 4, // 0: taskProcessor.TaskProcessor.RequestTask:input_type -> google.protobuf.Empty + 1, // 1: taskProcessor.TaskProcessor.SendResult:input_type -> taskProcessor.Result + 2, // 2: taskProcessor.TaskProcessor.ProcessorStatus:input_type -> taskProcessor.ProcessorStatusRequest + 0, // 3: taskProcessor.TaskProcessor.RequestTask:output_type -> taskProcessor.Task + 4, // 4: taskProcessor.TaskProcessor.SendResult:output_type -> google.protobuf.Empty + 3, // 5: taskProcessor.TaskProcessor.ProcessorStatus:output_type -> taskProcessor.ProcessorStatusResponse + 3, // [3:6] is the sub-list for method output_type + 0, // [0:3] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_task_proto_init() } +func file_task_proto_init() { + if File_task_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_task_proto_rawDesc), len(file_task_proto_rawDesc)), + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_task_proto_goTypes, + DependencyIndexes: file_task_proto_depIdxs, + MessageInfos: file_task_proto_msgTypes, + }.Build() + File_task_proto = out.File + file_task_proto_goTypes = nil + file_task_proto_depIdxs = nil +} diff --git a/proto/taskProcessor/task_grpc.pb.go b/proto/taskProcessor/task_grpc.pb.go new file mode 100644 index 0000000..487a7d8 --- /dev/null +++ b/proto/taskProcessor/task_grpc.pb.go @@ -0,0 +1,195 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v6.32.0 +// source: task.proto + +package taskProcessor + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + TaskProcessor_RequestTask_FullMethodName = "/taskProcessor.TaskProcessor/RequestTask" + TaskProcessor_SendResult_FullMethodName = "/taskProcessor.TaskProcessor/SendResult" + TaskProcessor_ProcessorStatus_FullMethodName = "/taskProcessor.TaskProcessor/ProcessorStatus" +) + +// TaskProcessorClient is the client API for TaskProcessor service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type TaskProcessorClient interface { + RequestTask(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Task], error) + SendResult(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[Result, emptypb.Empty], error) + ProcessorStatus(ctx context.Context, in *ProcessorStatusRequest, opts ...grpc.CallOption) (*ProcessorStatusResponse, error) +} + +type taskProcessorClient struct { + cc grpc.ClientConnInterface +} + +func NewTaskProcessorClient(cc grpc.ClientConnInterface) TaskProcessorClient { + return &taskProcessorClient{cc} +} + +func (c *taskProcessorClient) RequestTask(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (grpc.ServerStreamingClient[Task], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &TaskProcessor_ServiceDesc.Streams[0], TaskProcessor_RequestTask_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[emptypb.Empty, Task]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type TaskProcessor_RequestTaskClient = grpc.ServerStreamingClient[Task] + +func (c *taskProcessorClient) SendResult(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[Result, emptypb.Empty], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &TaskProcessor_ServiceDesc.Streams[1], TaskProcessor_SendResult_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[Result, emptypb.Empty]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type TaskProcessor_SendResultClient = grpc.ClientStreamingClient[Result, emptypb.Empty] + +func (c *taskProcessorClient) ProcessorStatus(ctx context.Context, in *ProcessorStatusRequest, opts ...grpc.CallOption) (*ProcessorStatusResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(ProcessorStatusResponse) + err := c.cc.Invoke(ctx, TaskProcessor_ProcessorStatus_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// TaskProcessorServer is the server API for TaskProcessor service. +// All implementations must embed UnimplementedTaskProcessorServer +// for forward compatibility. +type TaskProcessorServer interface { + RequestTask(*emptypb.Empty, grpc.ServerStreamingServer[Task]) error + SendResult(grpc.ClientStreamingServer[Result, emptypb.Empty]) error + ProcessorStatus(context.Context, *ProcessorStatusRequest) (*ProcessorStatusResponse, error) + mustEmbedUnimplementedTaskProcessorServer() +} + +// UnimplementedTaskProcessorServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedTaskProcessorServer struct{} + +func (UnimplementedTaskProcessorServer) RequestTask(*emptypb.Empty, grpc.ServerStreamingServer[Task]) error { + return status.Errorf(codes.Unimplemented, "method RequestTask not implemented") +} +func (UnimplementedTaskProcessorServer) SendResult(grpc.ClientStreamingServer[Result, emptypb.Empty]) error { + return status.Errorf(codes.Unimplemented, "method SendResult not implemented") +} +func (UnimplementedTaskProcessorServer) ProcessorStatus(context.Context, *ProcessorStatusRequest) (*ProcessorStatusResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ProcessorStatus not implemented") +} +func (UnimplementedTaskProcessorServer) mustEmbedUnimplementedTaskProcessorServer() {} +func (UnimplementedTaskProcessorServer) testEmbeddedByValue() {} + +// UnsafeTaskProcessorServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TaskProcessorServer will +// result in compilation errors. +type UnsafeTaskProcessorServer interface { + mustEmbedUnimplementedTaskProcessorServer() +} + +func RegisterTaskProcessorServer(s grpc.ServiceRegistrar, srv TaskProcessorServer) { + // If the following call pancis, it indicates UnimplementedTaskProcessorServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&TaskProcessor_ServiceDesc, srv) +} + +func _TaskProcessor_RequestTask_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(emptypb.Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(TaskProcessorServer).RequestTask(m, &grpc.GenericServerStream[emptypb.Empty, Task]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type TaskProcessor_RequestTaskServer = grpc.ServerStreamingServer[Task] + +func _TaskProcessor_SendResult_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TaskProcessorServer).SendResult(&grpc.GenericServerStream[Result, emptypb.Empty]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type TaskProcessor_SendResultServer = grpc.ClientStreamingServer[Result, emptypb.Empty] + +func _TaskProcessor_ProcessorStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ProcessorStatusRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TaskProcessorServer).ProcessorStatus(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: TaskProcessor_ProcessorStatus_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TaskProcessorServer).ProcessorStatus(ctx, req.(*ProcessorStatusRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// TaskProcessor_ServiceDesc is the grpc.ServiceDesc for TaskProcessor service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var TaskProcessor_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "taskProcessor.TaskProcessor", + HandlerType: (*TaskProcessorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ProcessorStatus", + Handler: _TaskProcessor_ProcessorStatus_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "RequestTask", + Handler: _TaskProcessor_RequestTask_Handler, + ServerStreams: true, + }, + { + StreamName: "SendResult", + Handler: _TaskProcessor_SendResult_Handler, + ClientStreams: true, + }, + }, + Metadata: "task.proto", +}