Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed event pod crashing #1310

Merged
merged 5 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions plugin-dell/dpmessagebus/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dpmessagebus
import (
"context"
"encoding/json"
"strings"

dc "github.com/ODIM-Project/ODIM/lib-messagebus/datacommunicator"
"github.com/ODIM-Project/ODIM/lib-utilities/common"
Expand Down Expand Up @@ -48,6 +49,10 @@ func Publish(ctx context.Context, data interface{}) bool {
log.Error("Failed to unmarshal the event, got: " + err.Error())
return false
}
if message.OdataType != "" && !strings.Contains(message.OdataType, "MetricReport") {
event.Request, err = formatEventRequest(ctx, message)
}

if err := K.Distribute(event); err != nil {
log.Error("Unable Publish events to kafka: " + err.Error())
return false
Expand All @@ -57,3 +62,14 @@ func Publish(ctx context.Context, data interface{}) bool {
}
return true
}
func formatEventRequest(ctx context.Context, eventData common.MessageData) ([]byte, error) {
for _, event := range eventData.Events {
if event.OriginOfCondition == nil {
event.OriginOfCondition = &common.Link{
Oid: "",
}
}
}
data1, _ := json.Marshal(eventData)
return data1, nil
}
5 changes: 1 addition & 4 deletions plugin-dell/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ module github.com/ODIM-Project/ODIM/plugin-dell

go 1.19


require (
github.com/ODIM-Project/ODIM/lib-dmtf v0.0.0-20230329075134-e5327c1b94d8
github.com/ODIM-Project/ODIM/lib-messagebus v0.0.0-20230329075134-e5327c1b94d8
Expand All @@ -22,7 +21,7 @@ require (
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53 // indirect
github.com/CloudyKit/jet/v6 v6.2.0 // indirect
github.com/Joker/jade v1.1.3 // indirect
github.com/ODIM-Project/ODIM/lib-persistence-manager v0.0.0-20220214074518-c500db3ba816 // indirect
github.com/ODIM-Project/ODIM/lib-persistence-manager v0.0.0-20230719110936-f43048b6407a // indirect
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect
github.com/ajg/form v1.5.1 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
Expand All @@ -42,7 +41,6 @@ require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/gorilla/css v1.0.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
Expand Down Expand Up @@ -103,7 +101,6 @@ require (
moul.io/http2curl/v2 v2.3.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect

)

Expand Down
13 changes: 7 additions & 6 deletions svc-aggregation/agcommon/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func ContactPlugin(ctx context.Context, req agmodel.PluginContactRequest, server
return pmbhandle.ContactPlugin(ctx, reqURL, req.HTTPMethodType, req.Token, "", req.PostBody, req.LoginCredential)
}

// GetDeviceSubscriptionDetails is for getting device event susbcription details
// GetDeviceSubscriptionDetails is for getting device event subscription details
func GetDeviceSubscriptionDetails(ctx context.Context, serverAddress string) (string, []string, error) {
deviceIPAddress, _, _, err := LookupHost(serverAddress)
if err != nil {
Expand Down Expand Up @@ -491,12 +491,13 @@ func GetSubscribedEvtTypes(ctx context.Context, searchKey string) ([]string, err
}
var eventTypes []string
for _, sub := range subscriptions {
var subscription map[string]interface{}
if err := JSONUnMarshalFunc([]byte(sub), &subscription); err != nil {
return nil, fmt.Errorf("error while unmarshalling event subscription: %v", err.Error())
var eventDestination map[string]interface{}
if err := JSONUnMarshalFunc([]byte(sub), &eventDestination); err != nil {
return nil, fmt.Errorf("error while unmarshal event subscription: %v", err.Error())
}
for _, evtTyps := range subscription["EventTypes"].([]interface{}) {
eventTypes = append(eventTypes, evtTyps.(string))
subscription := eventDestination["EventDestination"].(map[string]interface{})
for _, evtType := range subscription["EventTypes"].([]interface{}) {
eventTypes = append(eventTypes, evtType.(string))
}
}
eventTypes = removeDuplicates(eventTypes)
Expand Down
2 changes: 1 addition & 1 deletion svc-aggregation/agmodel/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ func GetEventSubscriptions(key string) ([]string, error) {
}
subscriptions, gerr := conn.GetEvtSubscriptions(common.SubscriptionIndex, "*"+key+"*")
if gerr != nil {
return nil, fmt.Errorf("error while trying to get event subsciption details: %v", gerr.Error())
return nil, fmt.Errorf("error while trying to get event subscription details: %v", gerr.Error())
}
return subscriptions, nil
}
Expand Down
2 changes: 1 addition & 1 deletion svc-aggregation/system/pluginHealthCheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (e *ExternalInterface) SendStartUpData(ctx context.Context, startUpReq *agg
l.LogWithFields(ctx).Infof("received plugin start up event from %s(%s)", plugin.ID, plugin.PluginType)

// for plugins managing resources of non Compute type, at present
// there is no usecase to share inventory, so subscribing to
// there is no use case to share inventory, so subscribing to
// EMB topic of the plugin should be enough
if plugin.PluginType != "Compute" {
phc := agcommon.PluginHealthCheckInterface{
Expand Down
Loading