Skip to content

Commit

Permalink
Merge pull request #33 from kuzxnia/fix_conn_leak
Browse files Browse the repository at this point in the history
Fix conn leak
  • Loading branch information
kuzxnia authored Mar 18, 2024
2 parents f4ec0e9 + 2079518 commit 773d050
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 19 deletions.
13 changes: 12 additions & 1 deletion cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ func provideWorkloadCommands() []*cobra.Command {
Use: CommandConfigWorkload,
Short: "Config",
GroupID: WorkloadGroup.ID,
PersistentPreRunE: persistentPreRunE,
PersistentPostRun: persistentPostRun,
RunE: func(cmd *cobra.Command, args []string) (err error) {
flags := cmd.Flags()
configFile, _ := flags.GetString(ConfigFile)
Expand Down Expand Up @@ -340,6 +342,15 @@ func provideOrchiestrationCommands() []*cobra.Command {
helmSet, _ := flags.GetStringSlice(FlagHelmSet)
workloadConfigPath, _ := flags.GetString(FlagWorkloadConfig)

cfg, err := ParseConfigFile(workloadConfigPath, false)
if err != nil {
return err
}
configValues, err := cfg.Values()
if err != nil {
return err
}

rsm := resourcemanager.ResourceManagerConfig{
KubeconfigPath: srcKubeconfigPath,
Context: srcContext,
Expand All @@ -351,7 +362,7 @@ func provideOrchiestrationCommands() []*cobra.Command {
ResourceManagerConfig: rsm,
Name: args[0],
HelmValues: helmSet,
WorkloadConfigString: workloadConfigPath,
WorkloadConfigString: configValues,
}

return UpgradeResources(&request)
Expand Down
2 changes: 1 addition & 1 deletion cli/workload/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

// checks if process is running in local system
// here should be cli config request - not lbot one
func SetConfigWorkload(conn *grpc.ClientConn, parsedConfig *lbot.ConfigRequest) (err error) {
func SetConfigWorkload(conn grpc.ClientConnInterface, parsedConfig *lbot.ConfigRequest) (err error) {
requestConfig := BuildConfigRequest(parsedConfig)

fmt.Println("🚀 Setting new config")
Expand Down
2 changes: 1 addition & 1 deletion helm/workload/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ workload:
name:
replicas: 1
agent:
image: kuzxnia/loadbot:v1.0.5
image: kuzxnia/loadbot:v1.0.6
port: 1234
config:
8 changes: 4 additions & 4 deletions lbot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@ func (c *JobRequest) UnmarshalJSON(data []byte) (err error) {
Pace uint64 `json:"pace"`
DataSize uint64 `json:"data_size"`
BatchSize uint64 `json:"batch_size"`
Duration config.Duration `json:"duration"`
Duration config.Duration `json:"duration"`
Operations uint64 `json:"operations"`
Timeout config.Duration `json:"timeout"` // if not set, default
Timeout config.Duration `json:"timeout"` // if not set, default
Filter map[string]interface{} `json:"filter"`
}
// default values
Expand All @@ -259,9 +259,9 @@ func (c *JobRequest) UnmarshalJSON(data []byte) (err error) {
c.Pace = tmp.Pace
c.DataSize = tmp.DataSize
c.BatchSize = tmp.BatchSize
c.Duration = tmp.Duration.Duration
c.Duration = tmp.Duration.Duration
c.Operations = tmp.Operations
c.Timeout = tmp.Timeout.Duration
c.Timeout = tmp.Timeout.Duration
c.Filter = tmp.Filter

return
Expand Down
3 changes: 2 additions & 1 deletion lbot/database/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func NewMongoClient(connectionString string, cfg *config.Job, schema *config.Sch
opts = opts.
ApplyURI(connectionString).
SetReadPreference(readpref.SecondaryPreferred()).
SetMaxConnecting(100).
SetMaxPoolSize(cfg.Connections * 2).
// SetMaxConnecting(100).
SetMaxConnIdleTime(90 * time.Second).
SetTimeout(cfg.Timeout)

Expand Down
14 changes: 14 additions & 0 deletions lbot/lbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (l *Lbot) Run() error {
if err != nil {
return err
}
defer client.Disconnect()

for _, job := range l.Config.Jobs {
err = client.RunJob(*job)
Expand All @@ -56,6 +57,7 @@ func (l *Lbot) SetCommandState(command *database.Command, state database.Command
if err != nil {
return err
}
defer client.Disconnect()
command.State = state.String()
return client.SaveCommand(command)
}
Expand All @@ -65,6 +67,7 @@ func (l *Lbot) SetWorkloadState(workload *database.Workload, state database.Work
if err != nil {
return err
}
defer client.Disconnect()
workload.State = state.String()
return client.SaveWorkload(workload)
}
Expand Down Expand Up @@ -147,6 +150,7 @@ func (l *Lbot) InitAgent(id primitive.ObjectID, name string) error {
if err != nil {
return err
}
defer client.Disconnect()
ct, err := client.ClusterTime()
if err != nil {
return errors.Wrap(err, "get cluster time")
Expand All @@ -167,6 +171,7 @@ func (l *Lbot) AgentHeartBeat(id primitive.ObjectID, name string) error {
if err != nil {
return err
}
defer client.Disconnect()

agentStatus := database.AgentStatus{
Id: id,
Expand All @@ -180,9 +185,11 @@ func (l *Lbot) HandleWorkload() {
log.Println("Fetching new workloads")
// todo: change to generic abstraction
client, err := database.NewInternalMongoClient(l.Config.ConnectionString)
defer client.Disconnect()
if err != nil {
return
}
defer client.Disconnect()

// todo: change to commands
workload, err := client.GetNewWorkloads()
Expand Down Expand Up @@ -218,6 +225,7 @@ func (l *Lbot) HandleCommand() {
if err != nil {
return
}
defer client.Disconnect()

log.Println("Fetching not finished commands")
// todo: change to commands
Expand Down Expand Up @@ -265,6 +273,7 @@ func (l *Lbot) IsMasterAgent(agentId primitive.ObjectID) (bool, error) {
if err != nil {
return false, err
}
defer client.Disconnect()
return client.IsMasterAgent(agentId)
}

Expand All @@ -274,6 +283,7 @@ func (l *Lbot) UpdateRunningAgents() error {
if err != nil {
return err
}
defer client.Disconnect()

runningAgents, err := client.GetAgentWithHeartbeatWithin()
if err != nil {
Expand All @@ -297,6 +307,7 @@ func (l *Lbot) GetNextUnFinishedCommand() (*database.Command, error) {
if err != nil {
return nil, err
}
defer client.Disconnect()
return client.GetNextUnFinishedCommand()
}

Expand All @@ -305,6 +316,7 @@ func (l *Lbot) AreWorkloadsFinished(command *database.Command) (bool, error) {
if err != nil {
return false, err
}
defer client.Disconnect()
workloads, err := client.GetCommandWorkloads(command)

finished := lo.Filter(workloads, func(w *database.Workload, index int) bool {
Expand All @@ -319,6 +331,7 @@ func (l *Lbot) GenerateWorkload(command *database.Command) ([]*database.Workload
if err != nil {
return nil, err
}
defer client.Disconnect()
ct, err := client.ClusterTime()
if err != nil {
return nil, errors.Wrap(err, "get cluster time")
Expand Down Expand Up @@ -356,6 +369,7 @@ func (l *Lbot) SaveWorkloads(ws []*database.Workload) error {
if err != nil {
return err
}
defer client.Disconnect()

var worklods []interface{}
for _, w := range ws {
Expand Down
4 changes: 2 additions & 2 deletions lbot/resourcemanager/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *HelmManager) Install(request *InstallRequest) (err error) {
installer.Labels = map[string]string{"role": "workload"}

options := values.Options{
Values: append([]string{"workload.name=" + request.Name}, request.HelmValues...),
Values: append([]string{"workload.name=" + request.Name, "workload.namespace=" + request.Namespace}, request.HelmValues...),
LiteralValues: []string{"workload.config=" + request.WorkloadConfigString},
}

Expand Down Expand Up @@ -106,7 +106,7 @@ func (c *HelmManager) Upgrade(request *UpgradeRequest) (err error) {
upgrader.Labels = map[string]string{"role": "workload"}

options := values.Options{
Values: append([]string{"workload.name=" + request.Name}, request.HelmValues...),
Values: append([]string{"workload.name=" + request.Name, "workload.namespace=" + request.Namespace}, request.HelmValues...),
LiteralValues: []string{"workload.config=" + request.WorkloadConfigString},
}

Expand Down
Binary file modified lbot/resourcemanager/workload-chart.tgz
Binary file not shown.
2 changes: 1 addition & 1 deletion lbot/schema/data_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type LiveDataProvider struct {
func NewLiveDataProvider(job *config.Job, schema *config.Schema) *LiveDataProvider {
return &LiveDataProvider{
job: job,
dataGenerator: NewDataGenerator(schema, job.DataSize),
dataGenerator: NewDataGenerator(schema, int(job.DataSize)),
}
}

Expand Down
35 changes: 27 additions & 8 deletions lbot/schema/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package schema
import (
"errors"
"math/rand"
"time"
"unsafe"

"github.com/go-faker/faker/v4"
"github.com/go-faker/faker/v4/pkg/options"
Expand Down Expand Up @@ -63,8 +65,8 @@ type DataGenerator interface {
GenerateFromTemplate(interface{}) (interface{}, error)
}

func NewDataGenerator(schema *config.Schema, dataSize uint64) DataGenerator {
// todo: check size of object using, unsafe.Sizeof( )
func NewDataGenerator(schema *config.Schema, dataSize int) DataGenerator {
// todo: check size of object using, unsafe.Sizeof( )

if schema != nil {
return DataGenerator(
Expand All @@ -82,7 +84,7 @@ func NewDataGenerator(schema *config.Schema, dataSize uint64) DataGenerator {
}

type MeasurableDataGenerator struct {
dataSize uint64
dataSize int
}

func (g *MeasurableDataGenerator) Generate() (interface{}, error) {
Expand All @@ -109,14 +111,31 @@ func (g *MeasurableDataGenerator) GenerateFromTemplate(template interface{}) (in
}
}

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const (
letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)

var src = rand.NewSource(time.Now().UnixNano())

func randStringBytes(n uint64) string {
func randStringBytes(n int) string {
b := make([]byte, n)
for i := range b {
b[i] = letterBytes[rand.Intn(len(letterBytes))]
// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = src.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
b[i] = letterBytes[idx]
i--
}
cache >>= letterIdxBits
remain--
}
return string(b)

return *(*string)(unsafe.Pointer(&b))
}

type StructuralizableDataGenerator struct {
Expand Down

0 comments on commit 773d050

Please sign in to comment.