basic app

This commit is contained in:
nquidox 2025-10-02 20:35:53 +03:00
parent 0a6e246a5c
commit 8d6c2b6687
30 changed files with 1469 additions and 0 deletions

150
internal/app/app.go Normal file
View file

@ -0,0 +1,150 @@
package app
import (
"context"
log "github.com/sirupsen/logrus"
"net"
"os"
"os/signal"
"parsing-service/config"
"parsing-service/internal/appState"
"parsing-service/internal/network"
"parsing-service/internal/parsers"
"parsing-service/internal/processor"
"parsing-service/internal/shared"
"runtime"
"syscall"
"time"
)
type App struct {
ClientAddress string
ServerAddress string
NumCPUs int
CheckPeriod time.Duration
StartTime time.Time
RetryCount int
RetryMinutes int
State *appState.State
Network *network.Network
}
func New(c *config.Config) *App {
numCPUs := c.NumCPUs
if numCPUs < 1 {
numCPUs = runtime.NumCPU()
}
st := appState.NewState(numCPUs, c.CheckPeriod, c.TasksConfig.RetryCount, c.TasksConfig.RetryMinutes)
return &App{
ClientAddress: c.Host + ":" + c.ClientPort,
ServerAddress: c.Host + ":" + c.ServerPort,
NumCPUs: numCPUs,
CheckPeriod: time.Duration(c.CheckPeriod),
StartTime: time.Now(),
RetryCount: c.TasksConfig.RetryCount,
RetryMinutes: c.TasksConfig.RetryMinutes,
State: st,
Network: network.NewHandler(),
}
}
func (app *App) Run() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Info("Application start")
log.WithFields(log.Fields{
"ClientAddress": app.ClientAddress,
"Number of CPUs": app.NumCPUs,
}).Debug("App settings")
server := newServer(app)
client := newClient(app)
period := time.NewTicker(app.CheckPeriod * time.Hour)
defer period.Stop()
sender := make(chan shared.TaskResult, app.NumCPUs*10)
//task processor
handlers := map[string]parsers.TaskHandler{
shared.OriginSurugaya: parsers.NewSurugayaParser(),
shared.OriginMandarake: parsers.NewMandarakeParser(),
}
taskProcessor := processor.New(processor.Deps{
Handlers: handlers,
Out: sender,
State: app.State,
Ctx: ctx,
Client: client,
NumCPUs: app.NumCPUs,
})
process := func() {
app.State.SetStatus(appState.StatusRequestTasks)
log.Info("Requesting data for parsing")
receivedTasks := app.Network.RequestTasks(ctx, client)
log.WithField("length", len(receivedTasks)).Debug("End receiving")
taskProcessor.StartWork(receivedTasks)
}
go func() {
process() //immediate start
for range period.C {
process()
}
}()
//done tasks sender
go func() {
ticker := time.NewTicker(time.Second * 2)
defer ticker.Stop()
var sendData []shared.TaskResult
for {
select {
case task := <-sender:
sendData = append(sendData, task)
case <-ticker.C:
l := len(sendData)
if l > 0 {
log.WithField("length", l).Debug("Sending parsed data")
app.Network.SendResult(client, sendData)
sendData = sendData[:0]
}
}
}
}()
//gRPC Server for status response
go func() {
listener, err := net.Listen("tcp", app.ServerAddress)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
log.Infof("gRPC Server listening at %v", app.ServerAddress)
if err := server.Serve(listener); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
go func() {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt, syscall.SIGTERM)
<-sigint
log.Info("Shutting down...")
period.Stop()
server.GracefulStop()
cancel()
}()
<-ctx.Done()
}

21
internal/app/client.go Normal file
View file

@ -0,0 +1,21 @@
package app
import (
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "parsing-service/proto/taskProcessor"
)
func newClient(app *App) pb.TaskProcessorClient {
var opts []grpc.DialOption
insec := grpc.WithTransportCredentials(insecure.NewCredentials())
opts = append(opts, insec)
conn, err := grpc.NewClient(app.ClientAddress, opts...)
if err != nil {
log.Fatal(err)
}
return pb.NewTaskProcessorClient(conn)
}

27
internal/app/server.go Normal file
View file

@ -0,0 +1,27 @@
package app
import (
"context"
"google.golang.org/grpc"
"parsing-service/internal/appState"
pb "parsing-service/proto/taskProcessor"
)
type Server struct {
pb.UnimplementedTaskProcessorServer
state *appState.State
}
func newServer(app *App) *grpc.Server {
s := grpc.NewServer()
srv := &Server{
state: app.State,
}
pb.RegisterTaskProcessorServer(s, srv)
return s
}
func (s *Server) ProcessorStatus(_ context.Context, _ *pb.ProcessorStatusRequest) (*pb.ProcessorStatusResponse, error) {
resp := s.state.StateResponse()
return resp, nil
}

View file

@ -0,0 +1,99 @@
package appState
import (
pb "parsing-service/proto/taskProcessor"
"sync/atomic"
"time"
)
type State struct {
appStart int64
lastCheck atomic.Int64
tasksReceived atomic.Int32
tasksInProgress atomic.Int32
tasksFirstTry atomic.Int32
tasksDoneAfterRetry atomic.Int32
tasksFailed atomic.Int32
workStatus atomic.Int32
numCPUs int32
checkPeriod int32
RetriesCount int
retriesMinutes int32
}
func NewState(numCPUs, checkPeriod, retriesCount, retriesMinutes int) *State {
now := time.Now().Unix()
state := &State{
appStart: now,
numCPUs: int32(numCPUs),
checkPeriod: int32(checkPeriod),
RetriesCount: retriesCount,
retriesMinutes: int32(retriesMinutes),
}
state.lastCheck.Store(now)
return state
}
func (s *State) ResetCounters() {
s.tasksReceived.Store(0)
s.tasksInProgress.Store(0)
s.tasksFirstTry.Store(0)
s.tasksDoneAfterRetry.Store(0)
s.tasksFailed.Store(0)
}
func (s *State) SetStatus(status Status) {
s.workStatus.Store(int32(status))
}
func (s *State) SetLastCheck() {
s.lastCheck.Store(time.Now().Unix())
}
func (s *State) SetTasksReceived(num int) {
s.tasksReceived.Swap(int32(num))
s.tasksInProgress.Swap(int32(num))
}
func (s *State) TaskDone() {
s.tasksInProgress.Add(-1)
}
func (s *State) FirstTry() {
s.tasksFirstTry.Add(1)
}
func (s *State) DoneAfterRetry() {
s.tasksDoneAfterRetry.Add(1)
}
func (s *State) Failed() {
s.tasksFailed.Add(1)
}
func (s *State) InProgress() bool {
return s.tasksInProgress.Load() > 0
}
func (s *State) SetIdleStatus() {
s.workStatus.Store(int32(StatusIdle))
}
func (s *State) StateResponse() *pb.ProcessorStatusResponse {
return &pb.ProcessorStatusResponse{
AppStart: s.appStart,
LastCheck: s.lastCheck.Load(),
TasksReceived: s.tasksReceived.Load(),
TasksInProgress: s.tasksInProgress.Load(),
TasksFirstTry: s.tasksFirstTry.Load(),
TasksDoneAfterRetry: s.tasksDoneAfterRetry.Load(),
TasksFailed: s.tasksFailed.Load(),
WorkStatus: Status(s.workStatus.Load()).String(),
NumCPUs: s.numCPUs,
CheckPeriod: s.checkPeriod,
RetriesCount: int32(s.RetriesCount),
RetriesMinutes: s.retriesMinutes,
}
}

View file

@ -0,0 +1,24 @@
package appState
type Status int32
const (
StatusIdle Status = iota
StatusRequestTasks
StatusWorkInProgress
StatusFailure
)
var statusNames = [...]string{
"Idle",
"Requesting tasks",
"Work in progress",
"Failure",
}
func (s Status) String() string {
if s < 0 || s >= Status(len(statusNames)) {
return "unknown"
}
return statusNames[s]
}

View file

@ -0,0 +1,35 @@
package logging
import (
"fmt"
log "github.com/sirupsen/logrus"
"os"
"path"
"runtime"
)
func LogSetup(lvl string) {
l, err := log.ParseLevel(lvl)
if err != nil {
log.SetLevel(log.DebugLevel)
}
log.SetFormatter(
&log.TextFormatter{
FullTimestamp: true,
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
filename := path.Base(f.File)
return fmt.Sprintf("%s()", f.Function), fmt.Sprintf(" %s:%d", filename, f.Line)
},
},
)
if l == log.DebugLevel {
log.SetLevel(l)
//log.SetReportCaller(true)
} else {
log.SetLevel(l)
}
log.SetOutput(os.Stdout)
}

View file

@ -0,0 +1,7 @@
package network
type Network struct{}
func NewHandler() *Network {
return &Network{}
}

View file

@ -0,0 +1,12 @@
package network
import (
"context"
"parsing-service/internal/shared"
pb "parsing-service/proto/taskProcessor"
)
type Handler interface {
RequestTasks(ctx context.Context, client pb.TaskProcessorClient) []shared.TaskResponse
SendResult(client pb.TaskProcessorClient, tasksDone []shared.TaskResult)
}

View file

@ -0,0 +1,38 @@
package network
import (
"context"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/emptypb"
"io"
"parsing-service/internal/shared"
pb "parsing-service/proto/taskProcessor"
)
func (n *Network) RequestTasks(ctx context.Context, client pb.TaskProcessorClient) []shared.TaskResponse {
var tasksList []shared.TaskResponse
stream, err := client.RequestTask(ctx, &emptypb.Empty{})
if err != nil {
log.WithField("err", err).Error("Error calling Request Tasks")
return nil
}
for {
response, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.WithField("err", err).Error("Error receiving response")
return nil
}
tasksList = append(tasksList, shared.TaskResponse{
MerchUuid: response.MerchUuid,
OriginSurugayaLink: response.OriginSurugayaLink,
OriginMandarakeLink: response.OriginMandarakeLink,
})
log.WithField("entry added", response.MerchUuid).Debug("gRPC Receive")
}
return tasksList
}

35
internal/network/send.go Normal file
View file

@ -0,0 +1,35 @@
package network
import (
"context"
log "github.com/sirupsen/logrus"
"parsing-service/internal/shared"
pb "parsing-service/proto/taskProcessor"
)
func (n *Network) SendResult(client pb.TaskProcessorClient, tasksDone []shared.TaskResult) {
stream, err := client.SendResult(context.Background())
if err != nil {
log.Fatalf("Error calling PostMerch: %v", err)
}
merchResponses := make([]pb.Result, len(tasksDone))
for i, task := range tasksDone {
merchResponses[i] = pb.Result{
MerchUuid: task.MerchUuid,
OriginName: task.Origin,
Price: task.Price,
}
}
for i := range merchResponses {
response := &merchResponses[i]
if err = stream.Send(response); err != nil {
log.Fatalf("Error sending request: %v", err)
}
}
if err = stream.CloseSend(); err != nil {
log.Fatalf("Error closing stream: %v", err)
}
}

View file

@ -0,0 +1,10 @@
package parsers
import (
"parsing-service/internal/appState"
"parsing-service/internal/shared"
)
type TaskHandler interface {
HandleTask(task shared.Task, sender chan shared.TaskResult, state *appState.State) error
}

View file

@ -0,0 +1,18 @@
package parsers
import (
log "github.com/sirupsen/logrus"
"parsing-service/internal/appState"
"parsing-service/internal/shared"
)
type MandarakeParser struct{}
func NewMandarakeParser() *MandarakeParser {
return &MandarakeParser{}
}
func (s *MandarakeParser) HandleTask(task shared.Task, sender chan shared.TaskResult, state *appState.State) error {
log.Debug("Handling Mandarake Task")
return nil
}

View file

@ -0,0 +1,18 @@
package parsers
import (
log "github.com/sirupsen/logrus"
"parsing-service/internal/appState"
"parsing-service/internal/shared"
)
type SurugayaParser struct{}
func NewSurugayaParser() *SurugayaParser {
return &SurugayaParser{}
}
func (s *SurugayaParser) HandleTask(task shared.Task, sender chan shared.TaskResult, state *appState.State) error {
log.Debug("Handling Surugaya Task")
return nil
}

View file

@ -0,0 +1,38 @@
package processor
import (
"context"
"parsing-service/internal/appState"
"parsing-service/internal/parsers"
"parsing-service/internal/shared"
pb "parsing-service/proto/taskProcessor"
)
type Processor struct {
handlers map[string]parsers.TaskHandler
out chan shared.TaskResult
state *appState.State
ctx context.Context
client pb.TaskProcessorClient
numCPUs int
}
type Deps struct {
Handlers map[string]parsers.TaskHandler
Out chan shared.TaskResult
State *appState.State
Ctx context.Context
Client pb.TaskProcessorClient
NumCPUs int
}
func New(deps Deps) *Processor {
return &Processor{
handlers: deps.Handlers,
out: deps.Out,
state: deps.State,
ctx: deps.Ctx,
client: deps.Client,
numCPUs: deps.NumCPUs,
}
}

View file

@ -0,0 +1,60 @@
package processor
import (
log "github.com/sirupsen/logrus"
"parsing-service/internal/appState"
"parsing-service/internal/shared"
"sync"
)
func (p *Processor) StartWork(receivedTasks []shared.TaskResponse) {
log.Info("Starting work...")
p.state.ResetCounters()
in := make(chan shared.Task, p.numCPUs*10)
wg := &sync.WaitGroup{}
for i := 0; i < p.numCPUs*10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
p.worker(in)
}()
}
tasksNumber := len(receivedTasks)
if tasksNumber > 0 {
p.state.SetStatus(appState.StatusWorkInProgress)
p.state.SetTasksReceived(tasksNumber)
p.sortTasks(in, receivedTasks)
}
close(in)
wg.Wait()
log.Debug("All goroutines finished")
p.state.SetLastCheck()
p.state.SetStatus(appState.StatusIdle)
log.Debugf("State | %+v", p.state)
}
func (p *Processor) sortTasks(in chan<- shared.Task, receivedTasks []shared.TaskResponse) {
for _, task := range receivedTasks {
switch {
case task.OriginSurugayaLink != "":
in <- shared.Task{
MerchUuid: task.MerchUuid,
Origin: shared.OriginSurugaya,
Link: task.OriginSurugayaLink,
RetryCount: 3,
}
case task.OriginMandarakeLink != "":
in <- shared.Task{
MerchUuid: task.MerchUuid,
Origin: shared.OriginMandarake,
Link: task.OriginMandarakeLink,
RetryCount: 3,
}
}
}
}

View file

@ -0,0 +1,18 @@
package processor
import (
log "github.com/sirupsen/logrus"
"parsing-service/internal/shared"
)
const zeroPrice = 0 //for debug purposes
func (p *Processor) worker(in <-chan shared.Task) {
for task := range in {
err := p.handlers[task.Origin].HandleTask(task, p.out, p.state)
if err != nil {
log.WithField("err", err).Error("Worker | Handle task")
continue
}
}
}

View file

@ -0,0 +1,6 @@
package shared
type Channels struct {
Surugaya chan Task
Mandarake chan Task
}

View file

@ -0,0 +1,6 @@
package shared
const (
OriginSurugaya = "surugaya"
OriginMandarake = "mandarake"
)

20
internal/shared/task.go Normal file
View file

@ -0,0 +1,20 @@
package shared
type Task struct {
MerchUuid string
Origin string
Link string
RetryCount int
}
type TaskResponse struct {
MerchUuid string
OriginSurugayaLink string
OriginMandarakeLink string
}
type TaskResult struct {
MerchUuid string
Origin string
Price uint32
}