From 38829c0179d6450a0bf4835b51218b6d0148dc57 Mon Sep 17 00:00:00 2001 From: nquidox Date: Fri, 3 Oct 2025 19:17:01 +0300 Subject: [PATCH] huge refactor --- cmd/main.go | 6 +- config.env | 12 +- config/config.go | 29 +++- go.mod | 2 +- internal/app/app.go | 78 ++++----- internal/app/client.go | 30 +++- internal/app/server.go | 6 +- internal/appState/state.go | 2 +- internal/network/interface.go | 4 +- internal/network/recieve.go | 4 +- internal/network/send.go | 4 +- internal/parsers/interface.go | 6 +- internal/parsers/mandarake.go | 7 +- internal/parsers/surugaya.go | 71 +++++++- internal/processor/handler.go | 10 +- internal/processor/service.go | 52 +++--- internal/processor/worker.go | 18 -- internal/shared/task.go | 7 +- proto/scrapper.proto | 19 ++ proto/surugayaScrapper/scrapper.pb.go | 193 +++++++++++++++++++++ proto/surugayaScrapper/scrapper_grpc.pb.go | 115 ++++++++++++ proto/task.proto | 4 +- proto/taskProcessor/task.pb.go | 6 +- 23 files changed, 544 insertions(+), 141 deletions(-) delete mode 100644 internal/processor/worker.go create mode 100644 proto/scrapper.proto create mode 100644 proto/surugayaScrapper/scrapper.pb.go create mode 100644 proto/surugayaScrapper/scrapper_grpc.pb.go diff --git a/cmd/main.go b/cmd/main.go index 95e13e4..1fd8de8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,9 +1,9 @@ package main import ( - "parsing-service/config" - "parsing-service/internal/app" - "parsing-service/internal/logging" + "task-processor/config" + "task-processor/internal/app" + "task-processor/internal/logging" ) func main() { diff --git a/config.env b/config.env index 4fabc68..cf0a214 100644 --- a/config.env +++ b/config.env @@ -1,9 +1,15 @@ -APP_HOST=0.0.0.0 -APP_CLIENT_PORT=9050 -APP_SERVER_PORT=9060 APP_LOG_LEVEL=error APP_NUMCPUS=-1 APP_CHECK_PERIOD=6 +GRPC_SERVER_HOST=0.0.0.0 +GRPC_SERVER_PORT=9060 + +GRPC_API_CLIENT_HOST=0.0.0.0 +GRPC_API_CLIENT_PORT=9050 + +GRPC_SURUGAYA_SCRAPPER_HOST=0.0.0.0 +GRPC_SURUGAYA_SCRAPPER_PORT=9070 + TASK_RETRY_COUNT=3 TASK_RETRY_MINUTES=5 \ No newline at end of file diff --git a/config/config.go b/config/config.go index 507311c..66900d2 100644 --- a/config/config.go +++ b/config/config.go @@ -7,13 +7,20 @@ import ( ) type Config struct { - Host string - ClientPort string - ServerPort string LogLevel string NumCPUs int CheckPeriod int - TasksConfig TasksConfig + TasksCfg TasksConfig + GrpcCfg GrpcConfig +} + +type GrpcConfig struct { + ServerHost string + ServerPort string + ApiClientHost string + ApiClientPort string + SurugayaScrapperHost string + SurugayaScrapperPort string } type TasksConfig struct { @@ -23,14 +30,20 @@ type TasksConfig struct { func NewConfig() *Config { return &Config{ - Host: getEnv("APP_HOST", "0.0.0.0"), - ClientPort: getEnv("APP_PORT", "9050"), - ServerPort: getEnv("APP_SERVER_PORT", "9060"), LogLevel: getEnv("APP_LOG_LEVEL", "debug"), NumCPUs: getEnvInt("APP_NUMCPUS", -1), CheckPeriod: getEnvInt("APP_CHECK_PERIOD", 6), - TasksConfig: TasksConfig{ + GrpcCfg: GrpcConfig{ + ServerHost: getEnv("GRPC_SERVER_HOST", "0.0.0.0"), + ServerPort: getEnv("GRPC_SERVER_PORT", "9060"), + ApiClientHost: getEnv("GRPC_API_CLIENT_HOST", "0.0.0.0"), + ApiClientPort: getEnv("GRPC_API_CLIENT_PORT", "9050"), + SurugayaScrapperHost: getEnv("GRPC_SURGAYA_SCRAPPER_HOST", "0.0.0.0"), + SurugayaScrapperPort: getEnv("GRPC_SURGAYA_SCRAPPER_PORT", "9070"), + }, + + TasksCfg: TasksConfig{ RetryCount: getEnvInt("TASK_RETRY_COUNT", 3), RetryMinutes: getEnvInt("TASK_RETRY_MINUTES", 5), }, diff --git a/go.mod b/go.mod index 72b6afb..85b30f9 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module parsing-service +module task-processor go 1.25.1 diff --git a/internal/app/app.go b/internal/app/app.go index 4551b3b..d9875ae 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -6,27 +6,26 @@ import ( "net" "os" "os/signal" - "parsing-service/config" - "parsing-service/internal/appState" - "parsing-service/internal/network" - "parsing-service/internal/parsers" - "parsing-service/internal/processor" - "parsing-service/internal/shared" "runtime" "syscall" + "task-processor/config" + "task-processor/internal/appState" + "task-processor/internal/network" + "task-processor/internal/parsers" + "task-processor/internal/processor" + "task-processor/internal/shared" "time" ) type App struct { - ClientAddress string - ServerAddress string - NumCPUs int - CheckPeriod time.Duration - StartTime time.Time - RetryCount int - RetryMinutes int - State *appState.State - Network *network.Network + config *config.Config + checkPeriod time.Duration + startTime time.Time + retryCount int + retryMinutes int + state *appState.State + network *network.Network + numCPUs int } func New(c *config.Config) *App { @@ -35,18 +34,17 @@ func New(c *config.Config) *App { numCPUs = runtime.NumCPU() } - st := appState.NewState(numCPUs, c.CheckPeriod, c.TasksConfig.RetryCount, c.TasksConfig.RetryMinutes) + st := appState.NewState(numCPUs, c.CheckPeriod, c.TasksCfg.RetryCount, c.TasksCfg.RetryMinutes) return &App{ - ClientAddress: c.Host + ":" + c.ClientPort, - ServerAddress: c.Host + ":" + c.ServerPort, - NumCPUs: numCPUs, - CheckPeriod: time.Duration(c.CheckPeriod), - StartTime: time.Now(), - RetryCount: c.TasksConfig.RetryCount, - RetryMinutes: c.TasksConfig.RetryMinutes, - State: st, - Network: network.NewHandler(), + config: c, + checkPeriod: time.Duration(c.CheckPeriod), + startTime: time.Now(), + retryCount: c.TasksCfg.RetryCount, + retryMinutes: c.TasksCfg.RetryMinutes, + state: st, + network: network.NewHandler(), + numCPUs: numCPUs, } } @@ -56,38 +54,40 @@ func (app *App) Run() { log.Info("Application start") log.WithFields(log.Fields{ - "ClientAddress": app.ClientAddress, - "Number of CPUs": app.NumCPUs, + "Service address": app.config.GrpcCfg.ServerHost + ":" + app.config.GrpcCfg.ServerPort, + "Number of CPUs": app.numCPUs, }).Debug("App settings") server := newServer(app) - client := newClient(app) + apiClient := newApiClient(app.config.GrpcCfg.ApiClientHost + ":" + app.config.GrpcCfg.ApiClientPort) - period := time.NewTicker(app.CheckPeriod * time.Hour) + period := time.NewTicker(app.checkPeriod * time.Hour) defer period.Stop() - sender := make(chan shared.TaskResult, app.NumCPUs*10) + sender := make(chan shared.TaskResult, app.numCPUs*10) + + // external scrapper + surugayaScrapper := newSurugayaScrapperClient(app.config.GrpcCfg.SurugayaScrapperHost + ":" + app.config.GrpcCfg.SurugayaScrapperPort) //task processor handlers := map[string]parsers.TaskHandler{ - shared.OriginSurugaya: parsers.NewSurugayaParser(), + shared.OriginSurugaya: parsers.NewSurugayaParser(ctx, surugayaScrapper), shared.OriginMandarake: parsers.NewMandarakeParser(), } taskProcessor := processor.New(processor.Deps{ Handlers: handlers, Out: sender, - State: app.State, + State: app.state, Ctx: ctx, - Client: client, - NumCPUs: app.NumCPUs, + NumCPUs: app.numCPUs, }) process := func() { - app.State.SetStatus(appState.StatusRequestTasks) + app.state.SetStatus(appState.StatusRequestTasks) log.Info("Requesting data for parsing") - receivedTasks := app.Network.RequestTasks(ctx, client) + receivedTasks := app.network.RequestTasks(ctx, apiClient) log.WithField("length", len(receivedTasks)).Debug("End receiving") taskProcessor.StartWork(receivedTasks) @@ -116,7 +116,7 @@ func (app *App) Run() { l := len(sendData) if l > 0 { log.WithField("length", l).Debug("Sending parsed data") - app.Network.SendResult(client, sendData) + app.network.SendResult(apiClient, sendData) sendData = sendData[:0] } } @@ -125,12 +125,12 @@ func (app *App) Run() { //gRPC Server for status response go func() { - listener, err := net.Listen("tcp", app.ServerAddress) + listener, err := net.Listen("tcp", app.config.GrpcCfg.ServerHost+":"+app.config.GrpcCfg.ServerPort) if err != nil { log.Fatalf("failed to listen: %v", err) } - log.Infof("gRPC Server listening at %v", app.ServerAddress) + log.Infof("gRPC Server listening at %v", app.config.GrpcCfg.ServerHost+":"+app.config.GrpcCfg.ServerPort) if err := server.Serve(listener); err != nil { log.Fatalf("failed to serve: %v", err) } diff --git a/internal/app/client.go b/internal/app/client.go index 1058a61..04c16af 100644 --- a/internal/app/client.go +++ b/internal/app/client.go @@ -4,18 +4,40 @@ import ( log "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - pb "parsing-service/proto/taskProcessor" + sc "task-processor/proto/surugayaScrapper" + tp "task-processor/proto/taskProcessor" ) -func newClient(app *App) pb.TaskProcessorClient { +func newApiClient(address string) tp.TaskProcessorClient { var opts []grpc.DialOption insec := grpc.WithTransportCredentials(insecure.NewCredentials()) opts = append(opts, insec) - conn, err := grpc.NewClient(app.ClientAddress, opts...) + conn, err := grpc.NewClient(address, opts...) if err != nil { log.Fatal(err) } - return pb.NewTaskProcessorClient(conn) + log.WithFields(log.Fields{ + "address": address, + }).Debug("gRPC | API client") + + return tp.NewTaskProcessorClient(conn) +} + +func newSurugayaScrapperClient(address string) sc.SurugayaScrapperClient { + var opts []grpc.DialOption + insec := grpc.WithTransportCredentials(insecure.NewCredentials()) + opts = append(opts, insec) + + conn, err := grpc.NewClient(address, opts...) + if err != nil { + log.Fatal(err) + } + + log.WithFields(log.Fields{ + "address": address, + }).Debug("gRPC | Surugaya scrapper client") + + return sc.NewSurugayaScrapperClient(conn) } diff --git a/internal/app/server.go b/internal/app/server.go index a2bc864..f837430 100644 --- a/internal/app/server.go +++ b/internal/app/server.go @@ -3,8 +3,8 @@ package app import ( "context" "google.golang.org/grpc" - "parsing-service/internal/appState" - pb "parsing-service/proto/taskProcessor" + "task-processor/internal/appState" + pb "task-processor/proto/taskProcessor" ) type Server struct { @@ -15,7 +15,7 @@ type Server struct { func newServer(app *App) *grpc.Server { s := grpc.NewServer() srv := &Server{ - state: app.State, + state: app.state, } pb.RegisterTaskProcessorServer(s, srv) return s diff --git a/internal/appState/state.go b/internal/appState/state.go index 04ad62f..9da344b 100644 --- a/internal/appState/state.go +++ b/internal/appState/state.go @@ -1,8 +1,8 @@ package appState import ( - pb "parsing-service/proto/taskProcessor" "sync/atomic" + pb "task-processor/proto/taskProcessor" "time" ) diff --git a/internal/network/interface.go b/internal/network/interface.go index 1225549..43e14ad 100644 --- a/internal/network/interface.go +++ b/internal/network/interface.go @@ -2,8 +2,8 @@ package network import ( "context" - "parsing-service/internal/shared" - pb "parsing-service/proto/taskProcessor" + "task-processor/internal/shared" + pb "task-processor/proto/taskProcessor" ) type Handler interface { diff --git a/internal/network/recieve.go b/internal/network/recieve.go index 40b2104..638ac30 100644 --- a/internal/network/recieve.go +++ b/internal/network/recieve.go @@ -5,8 +5,8 @@ import ( log "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/emptypb" "io" - "parsing-service/internal/shared" - pb "parsing-service/proto/taskProcessor" + "task-processor/internal/shared" + pb "task-processor/proto/taskProcessor" ) func (n *Network) RequestTasks(ctx context.Context, client pb.TaskProcessorClient) []shared.TaskResponse { diff --git a/internal/network/send.go b/internal/network/send.go index 44b976e..0c079e6 100644 --- a/internal/network/send.go +++ b/internal/network/send.go @@ -3,8 +3,8 @@ package network import ( "context" log "github.com/sirupsen/logrus" - "parsing-service/internal/shared" - pb "parsing-service/proto/taskProcessor" + "task-processor/internal/shared" + pb "task-processor/proto/taskProcessor" ) func (n *Network) SendResult(client pb.TaskProcessorClient, tasksDone []shared.TaskResult) { diff --git a/internal/parsers/interface.go b/internal/parsers/interface.go index b9fc4ae..d3030f5 100644 --- a/internal/parsers/interface.go +++ b/internal/parsers/interface.go @@ -1,10 +1,10 @@ package parsers import ( - "parsing-service/internal/appState" - "parsing-service/internal/shared" + "task-processor/internal/appState" + "task-processor/internal/shared" ) type TaskHandler interface { - HandleTask(task shared.Task, sender chan shared.TaskResult, state *appState.State) error + HandleTasks(task []shared.Task, sender chan shared.TaskResult, state *appState.State) } diff --git a/internal/parsers/mandarake.go b/internal/parsers/mandarake.go index 708a17b..cbf9bd0 100644 --- a/internal/parsers/mandarake.go +++ b/internal/parsers/mandarake.go @@ -2,8 +2,8 @@ package parsers import ( log "github.com/sirupsen/logrus" - "parsing-service/internal/appState" - "parsing-service/internal/shared" + "task-processor/internal/appState" + "task-processor/internal/shared" ) type MandarakeParser struct{} @@ -12,7 +12,6 @@ func NewMandarakeParser() *MandarakeParser { return &MandarakeParser{} } -func (s *MandarakeParser) HandleTask(task shared.Task, sender chan shared.TaskResult, state *appState.State) error { +func (s *MandarakeParser) HandleTasks(task []shared.Task, sender chan shared.TaskResult, state *appState.State) { log.Debug("Handling Mandarake Task") - return nil } diff --git a/internal/parsers/surugaya.go b/internal/parsers/surugaya.go index 373fb08..1f4a08c 100644 --- a/internal/parsers/surugaya.go +++ b/internal/parsers/surugaya.go @@ -1,18 +1,71 @@ package parsers import ( + "context" log "github.com/sirupsen/logrus" - "parsing-service/internal/appState" - "parsing-service/internal/shared" + "io" + "task-processor/internal/appState" + "task-processor/internal/shared" + sc "task-processor/proto/surugayaScrapper" ) -type SurugayaParser struct{} - -func NewSurugayaParser() *SurugayaParser { - return &SurugayaParser{} +type SurugayaParser struct { + scrapper sc.SurugayaScrapperClient + ctx context.Context } -func (s *SurugayaParser) HandleTask(task shared.Task, sender chan shared.TaskResult, state *appState.State) error { - log.Debug("Handling Surugaya Task") - return nil +func NewSurugayaParser(ctx context.Context, scrapper sc.SurugayaScrapperClient) *SurugayaParser { + log.Debug("Surugaya parser init") + return &SurugayaParser{ + scrapper: scrapper, + ctx: ctx, + } +} + +func (s *SurugayaParser) HandleTasks(task []shared.Task, sender chan shared.TaskResult, state *appState.State) { + log.WithField("count", len(task)).Debug("Handling Surugaya Tasks") + + stream, err := s.scrapper.ProcessTasks(s.ctx) + if err != nil { + log.WithField("err", err).Error("Error creating stream") + return + } + + for _, t := range task { + if err = stream.Send(&sc.Task{ + MerchUuid: t.MerchUuid, + Link: t.Link, + }); err != nil { + log.WithField("err", err).Error("Error sending task") + return + } + } + + if err = stream.CloseSend(); err != nil { + log.WithError(err).Warn("Failed to close send stream") + } + + counter := 0 + for { + result, err := stream.Recv() + + if err == io.EOF { + break + } + + if err != nil { + log.WithError(err).Error("Error receiving result") + return + } + + sender <- shared.TaskResult{ + MerchUuid: result.GetMerchUuid(), + Origin: shared.OriginSurugaya, + Price: result.GetPrice(), + } + counter++ + } + + log.WithField("count", counter).Debug("All Surugaya results received") + } diff --git a/internal/processor/handler.go b/internal/processor/handler.go index 782bb20..3cde11e 100644 --- a/internal/processor/handler.go +++ b/internal/processor/handler.go @@ -2,10 +2,9 @@ package processor import ( "context" - "parsing-service/internal/appState" - "parsing-service/internal/parsers" - "parsing-service/internal/shared" - pb "parsing-service/proto/taskProcessor" + "task-processor/internal/appState" + "task-processor/internal/parsers" + "task-processor/internal/shared" ) type Processor struct { @@ -13,7 +12,6 @@ type Processor struct { out chan shared.TaskResult state *appState.State ctx context.Context - client pb.TaskProcessorClient numCPUs int } @@ -22,7 +20,6 @@ type Deps struct { Out chan shared.TaskResult State *appState.State Ctx context.Context - Client pb.TaskProcessorClient NumCPUs int } @@ -32,7 +29,6 @@ func New(deps Deps) *Processor { out: deps.Out, state: deps.State, ctx: deps.Ctx, - client: deps.Client, numCPUs: deps.NumCPUs, } } diff --git a/internal/processor/service.go b/internal/processor/service.go index 684f502..96eb10e 100644 --- a/internal/processor/service.go +++ b/internal/processor/service.go @@ -2,34 +2,33 @@ package processor import ( log "github.com/sirupsen/logrus" - "parsing-service/internal/appState" - "parsing-service/internal/shared" "sync" + "task-processor/internal/appState" + "task-processor/internal/shared" ) func (p *Processor) StartWork(receivedTasks []shared.TaskResponse) { log.Info("Starting work...") p.state.ResetCounters() - in := make(chan shared.Task, p.numCPUs*10) + if len(receivedTasks) == 0 { + p.state.SetStatus(appState.StatusIdle) + return + } - wg := &sync.WaitGroup{} - for i := 0; i < p.numCPUs*10; i++ { + p.state.SetStatus(appState.StatusWorkInProgress) + p.state.SetTasksReceived(len(receivedTasks)) + + sorted := p.sortTasks(receivedTasks) + + var wg sync.WaitGroup + for origin, tasks := range sorted { wg.Add(1) - go func() { + go func(origin string, tasks []shared.Task) { defer wg.Done() - p.worker(in) - }() + p.handlers[origin].HandleTasks(tasks, p.out, p.state) + }(origin, tasks) } - - tasksNumber := len(receivedTasks) - if tasksNumber > 0 { - p.state.SetStatus(appState.StatusWorkInProgress) - p.state.SetTasksReceived(tasksNumber) - p.sortTasks(in, receivedTasks) - } - - close(in) wg.Wait() log.Debug("All goroutines finished") @@ -38,23 +37,24 @@ func (p *Processor) StartWork(receivedTasks []shared.TaskResponse) { log.Debugf("State | %+v", p.state) } -func (p *Processor) sortTasks(in chan<- shared.Task, receivedTasks []shared.TaskResponse) { +func (p *Processor) sortTasks(receivedTasks []shared.TaskResponse) map[string][]shared.Task { + sorted := make(map[string][]shared.Task, len(receivedTasks)) for _, task := range receivedTasks { switch { case task.OriginSurugayaLink != "": - in <- shared.Task{ - MerchUuid: task.MerchUuid, - Origin: shared.OriginSurugaya, - Link: task.OriginSurugayaLink, - RetryCount: 3, - } + sorted[shared.OriginSurugaya] = append(sorted[shared.OriginSurugaya], shared.Task{ + MerchUuid: task.MerchUuid, + Origin: shared.OriginSurugaya, + Link: task.OriginSurugayaLink, + }) case task.OriginMandarakeLink != "": - in <- shared.Task{ + sorted[shared.OriginMandarake] = append(sorted[shared.OriginMandarake], shared.Task{ MerchUuid: task.MerchUuid, Origin: shared.OriginMandarake, Link: task.OriginMandarakeLink, RetryCount: 3, - } + }) } } + return sorted } diff --git a/internal/processor/worker.go b/internal/processor/worker.go deleted file mode 100644 index 4049042..0000000 --- a/internal/processor/worker.go +++ /dev/null @@ -1,18 +0,0 @@ -package processor - -import ( - log "github.com/sirupsen/logrus" - "parsing-service/internal/shared" -) - -const zeroPrice = 0 //for debug purposes - -func (p *Processor) worker(in <-chan shared.Task) { - for task := range in { - err := p.handlers[task.Origin].HandleTask(task, p.out, p.state) - if err != nil { - log.WithField("err", err).Error("Worker | Handle task") - continue - } - } -} diff --git a/internal/shared/task.go b/internal/shared/task.go index 7be6f20..63ea3ee 100644 --- a/internal/shared/task.go +++ b/internal/shared/task.go @@ -16,5 +16,10 @@ type TaskResponse struct { type TaskResult struct { MerchUuid string Origin string - Price uint32 + Price int32 +} + +type SurugayaTask struct { + MerchUuid string + Link string } diff --git a/proto/scrapper.proto b/proto/scrapper.proto new file mode 100644 index 0000000..5533910 --- /dev/null +++ b/proto/scrapper.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package surugayaScrapper; + +option go_package = "./surugayaScrapper"; + +message Task{ + string merch_uuid = 1; + string link = 2; +} + +message Result{ + string merch_uuid = 1; + int32 price = 2; +} + +service SurugayaScrapper { + rpc ProcessTasks (stream Task) returns (stream Result); +} diff --git a/proto/surugayaScrapper/scrapper.pb.go b/proto/surugayaScrapper/scrapper.pb.go new file mode 100644 index 0000000..7f2c249 --- /dev/null +++ b/proto/surugayaScrapper/scrapper.pb.go @@ -0,0 +1,193 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.8 +// protoc v6.32.0 +// source: scrapper.proto + +package surugayaScrapper + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Task struct { + state protoimpl.MessageState `protogen:"open.v1"` + MerchUuid string `protobuf:"bytes,1,opt,name=merch_uuid,json=merchUuid,proto3" json:"merch_uuid,omitempty"` + Link string `protobuf:"bytes,2,opt,name=link,proto3" json:"link,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Task) Reset() { + *x = Task{} + mi := &file_scrapper_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Task) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Task) ProtoMessage() {} + +func (x *Task) ProtoReflect() protoreflect.Message { + mi := &file_scrapper_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Task.ProtoReflect.Descriptor instead. +func (*Task) Descriptor() ([]byte, []int) { + return file_scrapper_proto_rawDescGZIP(), []int{0} +} + +func (x *Task) GetMerchUuid() string { + if x != nil { + return x.MerchUuid + } + return "" +} + +func (x *Task) GetLink() string { + if x != nil { + return x.Link + } + return "" +} + +type Result struct { + state protoimpl.MessageState `protogen:"open.v1"` + MerchUuid string `protobuf:"bytes,1,opt,name=merch_uuid,json=merchUuid,proto3" json:"merch_uuid,omitempty"` + Price int32 `protobuf:"varint,2,opt,name=price,proto3" json:"price,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Result) Reset() { + *x = Result{} + mi := &file_scrapper_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Result) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Result) ProtoMessage() {} + +func (x *Result) ProtoReflect() protoreflect.Message { + mi := &file_scrapper_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Result.ProtoReflect.Descriptor instead. +func (*Result) Descriptor() ([]byte, []int) { + return file_scrapper_proto_rawDescGZIP(), []int{1} +} + +func (x *Result) GetMerchUuid() string { + if x != nil { + return x.MerchUuid + } + return "" +} + +func (x *Result) GetPrice() int32 { + if x != nil { + return x.Price + } + return 0 +} + +var File_scrapper_proto protoreflect.FileDescriptor + +const file_scrapper_proto_rawDesc = "" + + "\n" + + "\x0escrapper.proto\x12\x10surugayaScrapper\"9\n" + + "\x04Task\x12\x1d\n" + + "\n" + + "merch_uuid\x18\x01 \x01(\tR\tmerchUuid\x12\x12\n" + + "\x04link\x18\x02 \x01(\tR\x04link\"=\n" + + "\x06Result\x12\x1d\n" + + "\n" + + "merch_uuid\x18\x01 \x01(\tR\tmerchUuid\x12\x14\n" + + "\x05price\x18\x02 \x01(\x05R\x05price2X\n" + + "\x10SurugayaScrapper\x12D\n" + + "\fProcessTasks\x12\x16.surugayaScrapper.Task\x1a\x18.surugayaScrapper.Result(\x010\x01B\x14Z\x12./surugayaScrapperb\x06proto3" + +var ( + file_scrapper_proto_rawDescOnce sync.Once + file_scrapper_proto_rawDescData []byte +) + +func file_scrapper_proto_rawDescGZIP() []byte { + file_scrapper_proto_rawDescOnce.Do(func() { + file_scrapper_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_scrapper_proto_rawDesc), len(file_scrapper_proto_rawDesc))) + }) + return file_scrapper_proto_rawDescData +} + +var file_scrapper_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_scrapper_proto_goTypes = []any{ + (*Task)(nil), // 0: surugayaScrapper.Task + (*Result)(nil), // 1: surugayaScrapper.Result +} +var file_scrapper_proto_depIdxs = []int32{ + 0, // 0: surugayaScrapper.SurugayaScrapper.ProcessTasks:input_type -> surugayaScrapper.Task + 1, // 1: surugayaScrapper.SurugayaScrapper.ProcessTasks:output_type -> surugayaScrapper.Result + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_scrapper_proto_init() } +func file_scrapper_proto_init() { + if File_scrapper_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_scrapper_proto_rawDesc), len(file_scrapper_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_scrapper_proto_goTypes, + DependencyIndexes: file_scrapper_proto_depIdxs, + MessageInfos: file_scrapper_proto_msgTypes, + }.Build() + File_scrapper_proto = out.File + file_scrapper_proto_goTypes = nil + file_scrapper_proto_depIdxs = nil +} diff --git a/proto/surugayaScrapper/scrapper_grpc.pb.go b/proto/surugayaScrapper/scrapper_grpc.pb.go new file mode 100644 index 0000000..7daa867 --- /dev/null +++ b/proto/surugayaScrapper/scrapper_grpc.pb.go @@ -0,0 +1,115 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v6.32.0 +// source: scrapper.proto + +package surugayaScrapper + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + SurugayaScrapper_ProcessTasks_FullMethodName = "/surugayaScrapper.SurugayaScrapper/ProcessTasks" +) + +// SurugayaScrapperClient is the client API for SurugayaScrapper service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type SurugayaScrapperClient interface { + ProcessTasks(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Task, Result], error) +} + +type surugayaScrapperClient struct { + cc grpc.ClientConnInterface +} + +func NewSurugayaScrapperClient(cc grpc.ClientConnInterface) SurugayaScrapperClient { + return &surugayaScrapperClient{cc} +} + +func (c *surugayaScrapperClient) ProcessTasks(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Task, Result], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &SurugayaScrapper_ServiceDesc.Streams[0], SurugayaScrapper_ProcessTasks_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[Task, Result]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type SurugayaScrapper_ProcessTasksClient = grpc.BidiStreamingClient[Task, Result] + +// SurugayaScrapperServer is the server API for SurugayaScrapper service. +// All implementations must embed UnimplementedSurugayaScrapperServer +// for forward compatibility. +type SurugayaScrapperServer interface { + ProcessTasks(grpc.BidiStreamingServer[Task, Result]) error + mustEmbedUnimplementedSurugayaScrapperServer() +} + +// UnimplementedSurugayaScrapperServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedSurugayaScrapperServer struct{} + +func (UnimplementedSurugayaScrapperServer) ProcessTasks(grpc.BidiStreamingServer[Task, Result]) error { + return status.Errorf(codes.Unimplemented, "method ProcessTasks not implemented") +} +func (UnimplementedSurugayaScrapperServer) mustEmbedUnimplementedSurugayaScrapperServer() {} +func (UnimplementedSurugayaScrapperServer) testEmbeddedByValue() {} + +// UnsafeSurugayaScrapperServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SurugayaScrapperServer will +// result in compilation errors. +type UnsafeSurugayaScrapperServer interface { + mustEmbedUnimplementedSurugayaScrapperServer() +} + +func RegisterSurugayaScrapperServer(s grpc.ServiceRegistrar, srv SurugayaScrapperServer) { + // If the following call pancis, it indicates UnimplementedSurugayaScrapperServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&SurugayaScrapper_ServiceDesc, srv) +} + +func _SurugayaScrapper_ProcessTasks_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SurugayaScrapperServer).ProcessTasks(&grpc.GenericServerStream[Task, Result]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type SurugayaScrapper_ProcessTasksServer = grpc.BidiStreamingServer[Task, Result] + +// SurugayaScrapper_ServiceDesc is the grpc.ServiceDesc for SurugayaScrapper service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var SurugayaScrapper_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "surugayaScrapper.SurugayaScrapper", + HandlerType: (*SurugayaScrapperServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "ProcessTasks", + Handler: _SurugayaScrapper_ProcessTasks_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "scrapper.proto", +} diff --git a/proto/task.proto b/proto/task.proto index bb17f67..35cce81 100644 --- a/proto/task.proto +++ b/proto/task.proto @@ -14,7 +14,7 @@ message Task{ message Result{ string merch_uuid = 1; string origin_name = 2; - uint32 price = 3; + int32 price = 3; } message ProcessorStatusRequest{} @@ -38,4 +38,4 @@ service TaskProcessor { rpc RequestTask(google.protobuf.Empty) returns (stream Task); rpc SendResult(stream Result) returns (google.protobuf.Empty); rpc ProcessorStatus(ProcessorStatusRequest) returns (ProcessorStatusResponse); -} \ No newline at end of file +} diff --git a/proto/taskProcessor/task.pb.go b/proto/taskProcessor/task.pb.go index 7ff9f0c..886cbfa 100644 --- a/proto/taskProcessor/task.pb.go +++ b/proto/taskProcessor/task.pb.go @@ -86,7 +86,7 @@ type Result struct { state protoimpl.MessageState `protogen:"open.v1"` MerchUuid string `protobuf:"bytes,1,opt,name=merch_uuid,json=merchUuid,proto3" json:"merch_uuid,omitempty"` OriginName string `protobuf:"bytes,2,opt,name=origin_name,json=originName,proto3" json:"origin_name,omitempty"` - Price uint32 `protobuf:"varint,3,opt,name=price,proto3" json:"price,omitempty"` + Price int32 `protobuf:"varint,3,opt,name=price,proto3" json:"price,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -135,7 +135,7 @@ func (x *Result) GetOriginName() string { return "" } -func (x *Result) GetPrice() uint32 { +func (x *Result) GetPrice() int32 { if x != nil { return x.Price } @@ -326,7 +326,7 @@ const file_task_proto_rawDesc = "" + "merch_uuid\x18\x01 \x01(\tR\tmerchUuid\x12\x1f\n" + "\vorigin_name\x18\x02 \x01(\tR\n" + "originName\x12\x14\n" + - "\x05price\x18\x03 \x01(\rR\x05price\"\x18\n" + + "\x05price\x18\x03 \x01(\x05R\x05price\"\x18\n" + "\x16ProcessorStatusRequest\"\xc5\x03\n" + "\x17ProcessorStatusResponse\x12\x1a\n" + "\bappStart\x18\x01 \x01(\x03R\bappStart\x12\x1c\n" +