Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented subtasks for delete event subscription #1302

Merged
merged 7 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib-utilities/proto/events/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ service Events {
rpc IsAggregateHaveSubscription(EventUpdateRequest) returns (SubscribeEMBResponse){}
rpc DeleteAggregateSubscriptionsRPC(EventUpdateRequest) returns (SubscribeEMBResponse){}
rpc UpdateSubscriptionLocationRPC(UpdateSubscriptionLocation) returns (SubscribeEMBResponse) {}
rpc DeleteSubscription(EventRequest) returns (EventSubResponse) {}
}

message EventSubRequest {
Expand Down
6 changes: 4 additions & 2 deletions lib-utilities/services/eventsubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ func SubscribeToEMB(ctx context.Context, pluginID string, queueList []string) er
}

// DeleteSubscription calls the event service and delete all subscription realated to that server
func DeleteSubscription(ctx context.Context, uuid string) (*eventsproto.EventSubResponse, error) {
func DeleteSubscription(ctx context.Context, uuid string,
sessionToken string) (*eventsproto.EventSubResponse, error) {
var resp eventsproto.EventSubResponse
req := eventsproto.EventRequest{
UUID: uuid,
UUID: uuid,
SessionToken: sessionToken,
}
conn, errConn := ODIMService.Client(Events)
if errConn != nil {
Expand Down
37 changes: 37 additions & 0 deletions lib-utilities/services/pluginTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,43 @@ func SavePluginTaskInfo(ctx context.Context, pluginIP, pluginServerName,
return nil
}

// SaveEventSubscriptionID saves the event subscription id in db
// it receives taskID and subscription ID. taskID is the key.
func SaveEventSubscriptionID(ctx context.Context, taskID, subscriptionID string) error {

table := "EventSubscriptionID"
connPool, err := redis.GetDBConnection(redis.InMemory)
if err != nil {
return errors.PackError(err.ErrNo(), "error while trying to connecting"+
" to DB: ", err.Error())
}

if err = connPool.Create(table, taskID, subscriptionID); err != nil {
return errors.PackError(err.ErrNo(), "error while trying to insert"+
" plugin task: ", err.Error())
}
return nil
}

// SaveEventSubscriptionID saves the event subscription id in db
// it receives taskID and subscription ID. taskID is the key.
func GetEventSubscriptionID(ctx context.Context, taskID string) (string, error) {

table := "EventSubscriptionID"
connPool, err := redis.GetDBConnection(redis.InMemory)
if err != nil {
return "", errors.PackError(err.ErrNo(), "error while trying to connecting"+
" to DB: ", err.Error())
}

value, err := connPool.Read(table, taskID)
if err != nil {
return "", errors.PackError(err.ErrNo(), "error while trying to insert"+
" plugin task: ", err.Error())
}
return value, nil
}

// createPluginTask will insert plugin task info in DB
func createPluginTask(ctx context.Context, key string,
value interface{}) *errors.Error {
Expand Down
1 change: 0 additions & 1 deletion svc-aggregation/agmessagebus/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func TestPublish(t *testing.T) {
assert.NotEqual(t, nil, err, "Error should not be Nil for this scenario")

} else {
fmt.Println(err)
assert.Equal(t, nil, err, "Error should be Nil for this scenario")

}
Expand Down
2 changes: 1 addition & 1 deletion svc-aggregation/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53 // indirect
github.com/CloudyKit/jet/v6 v6.2.0 // indirect
github.com/Joker/jade v1.1.3 // indirect
github.com/ODIM-Project/ODIM/lib-persistence-manager v0.0.0-20201201072448-9772421f1b55 // indirect
github.com/ODIM-Project/ODIM/lib-persistence-manager v0.0.0-20230719110936-f43048b6407a // indirect
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion svc-aggregation/rpc/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ func (a *Aggregator) DeleteAggregationSource(ctx context.Context, req *aggregato
var threadID int = 1
ctxt := context.WithValue(ctx, common.ThreadName, common.DeleteAggregationSource)
ctxt = context.WithValue(ctxt, common.ThreadID, strconv.Itoa(threadID))
go a.connector.DeleteAggregationSources(ctxt, taskID, targetURI, req)
go a.connector.DeleteAggregationSources(ctxt, taskID, targetURI, req, sessionUserName)
threadID++
// return 202 Accepted
var rpcResp = response.RPC{
Expand Down
2 changes: 1 addition & 1 deletion svc-aggregation/rpc/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func GetAggregator() *Aggregator {
CreateTask: services.CreateTask,
CreateChildTask: services.CreateChildTask,
UpdateTask: system.UpdateTaskData,
CreateSubcription: system.CreateDefaultEventSubscription,
CreateSubscription: system.CreateDefaultEventSubscription,
PublishEvent: system.PublishEvent,
GetPluginStatus: agcommon.GetPluginStatus,
SubscribeToEMB: services.SubscribeToEMB,
Expand Down
4 changes: 2 additions & 2 deletions svc-aggregation/rpc/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var connector = &system.ExternalInterface{
UpdateTask: mockUpdateTask,
DecryptPassword: stubDevicePassword,
GetPluginStatus: GetPluginStatusForTesting,
CreateSubcription: EventFunctionsForTesting,
CreateSubscription: EventFunctionsForTesting,
PublishEvent: PostEventFunctionForTesting,
EncryptPassword: stubDevicePassword,
DeleteComputeSystem: deleteComputeforTest,
Expand Down Expand Up @@ -119,7 +119,7 @@ func deleteSystemforTest(key string) *errors.Error {
return nil
}

func mockDeleteSubscription(ctx context.Context, uuid string) (*eventsproto.EventSubResponse, error) {
func mockDeleteSubscription(ctx context.Context, uuid string, sessionName string) (*eventsproto.EventSubResponse, error) {
if uuid == "/redfish/v1/systems/delete-subscription-error.1" {
return nil, fmt.Errorf("error while trying to delete event subcription")
} else if uuid == "/redfish/v1/systems/unexpected-statuscode.1" {
Expand Down
2 changes: 1 addition & 1 deletion svc-aggregation/system/addaggregationsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ func getMockExternalInterface() *ExternalInterface {
Auth: mockIsAuthorized,
CreateChildTask: mockCreateChildTask,
UpdateTask: mockUpdateTask,
CreateSubcription: EventFunctionsForTesting,
CreateSubscription: EventFunctionsForTesting,
PublishEvent: PostEventFunctionForTesting,
GetPluginStatus: GetPluginStatusForTesting,
PublishEventMB: mockPublishEventMB,
Expand Down
4 changes: 2 additions & 2 deletions svc-aggregation/system/addcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (e *ExternalInterface) addCompute(ctx context.Context, taskID, targetURI, p
pluginContactRequest.OID = "/redfish/v1/Systems"
pluginContactRequest.DeviceUUID = saveSystem.DeviceUUID
pluginContactRequest.HTTPMethodType = http.MethodGet
pluginContactRequest.CreateSubcription = e.CreateSubcription
pluginContactRequest.CreateSubscription = e.CreateSubscription
pluginContactRequest.PublishEvent = e.PublishEvent
pluginContactRequest.BMCAddress = saveSystem.ManagerAddress

Expand Down Expand Up @@ -315,7 +315,7 @@ func (e *ExternalInterface) addCompute(ctx context.Context, taskID, targetURI, p
urlList := h.SystemURL
urlList = append(urlList, chassisList...)
urlList = append(urlList, managersList...)
pluginContactRequest.CreateSubcription(ctx, urlList)
pluginContactRequest.CreateSubscription(ctx, urlList)

pluginContactRequest.PublishEvent(ctx, h.SystemURL, "SystemsCollection")

Expand Down
50 changes: 25 additions & 25 deletions svc-aggregation/system/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type ExternalInterface struct {
CreateChildTask func(context.Context, string, string) (string, error)
CreateTask func(context.Context, string) (string, error)
UpdateTask func(context.Context, common.TaskData) error
CreateSubcription func(context.Context, []string)
CreateSubscription func(context.Context, []string)
PublishEvent func(context.Context, []string, string)
PublishEventMB func(context.Context, string, string, string)
GetPluginStatus func(context.Context, agmodel.Plugin) bool
Expand All @@ -90,7 +90,7 @@ type ExternalInterface struct {
DecryptPassword func([]byte) ([]byte, error)
DeleteComputeSystem func(int, string) *errors.Error
DeleteSystem func(string) *errors.Error
DeleteEventSubscription func(context.Context, string) (*eventsproto.EventSubResponse, error)
DeleteEventSubscription func(context.Context, string, string) (*eventsproto.EventSubResponse, error)
EventNotification func(context.Context, string, string, string, agmessagebus.MQBusCommunicator) error
GetAllKeysFromTable func(context.Context, string) ([]string, error)
GetConnectionMethod func(context.Context, string) (agmodel.ConnectionMethod, *errors.Error)
Expand All @@ -114,29 +114,29 @@ type responseStatus struct {
}

type getResourceRequest struct {
Data []byte
Username string
Password string
SystemID string
DeviceUUID string
DeviceInfo interface{}
LoginCredentials map[string]string
ParentOID string
OID string
ContactClient func(context.Context, string, string, string, string, interface{}, map[string]string) (*http.Response, error)
OemFlag bool
Plugin agmodel.Plugin
TaskRequest string
HTTPMethodType string
Token string
StatusPoll bool
CreateSubcription func(context.Context, []string)
PublishEvent func(context.Context, []string, string)
GetPluginStatus func(context.Context, agmodel.Plugin) bool
UpdateFlag bool
TargetURI string
UpdateTask func(context.Context, common.TaskData) error
BMCAddress string
Data []byte
Username string
Password string
SystemID string
DeviceUUID string
DeviceInfo interface{}
LoginCredentials map[string]string
ParentOID string
OID string
ContactClient func(context.Context, string, string, string, string, interface{}, map[string]string) (*http.Response, error)
OemFlag bool
Plugin agmodel.Plugin
TaskRequest string
HTTPMethodType string
Token string
StatusPoll bool
CreateSubscription func(context.Context, []string)
PublishEvent func(context.Context, []string, string)
GetPluginStatus func(context.Context, agmodel.Plugin) bool
UpdateFlag bool
TargetURI string
UpdateTask func(context.Context, common.TaskData) error
BMCAddress string
}

type respHolder struct {
Expand Down
15 changes: 9 additions & 6 deletions svc-aggregation/system/deleteaggregationsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
)

// DeleteAggregationSources is used to delete aggregation sources
func (e *ExternalInterface) DeleteAggregationSources(ctx context.Context, taskID string, targetURI string, req *aggregatorproto.AggregatorRequest) error {
func (e *ExternalInterface) DeleteAggregationSources(ctx context.Context, taskID string, targetURI string,
req *aggregatorproto.AggregatorRequest, sessionUserName string) error {
var task = common.TaskData{
TaskID: taskID,
TargetURI: targetURI,
Expand All @@ -58,7 +59,7 @@ func (e *ExternalInterface) DeleteAggregationSources(ctx context.Context, taskID
go runtime.Goexit()
}
l.LogWithFields(ctx).Debugf("request data for delete aggregation source: %s", string(req.RequestBody))
data := e.DeleteAggregationSource(ctx, req)
data := e.DeleteAggregationSource(ctx, req, sessionUserName)
err = e.UpdateTask(ctx, common.TaskData{
TaskID: taskID,
TargetURI: targetURI,
Expand All @@ -83,7 +84,8 @@ func (e *ExternalInterface) DeleteAggregationSources(ctx context.Context, taskID
}

// DeleteAggregationSource is the handler for removing bmc or manager
func (e *ExternalInterface) DeleteAggregationSource(ctx context.Context, req *aggregatorproto.AggregatorRequest) response.RPC {
func (e *ExternalInterface) DeleteAggregationSource(ctx context.Context,
req *aggregatorproto.AggregatorRequest, sessionUserName string) response.RPC {
var resp response.RPC

aggregationSource, dbErr := agmodel.GetAggregationSourceInfo(ctx, req.URL)
Expand Down Expand Up @@ -145,7 +147,7 @@ func (e *ExternalInterface) DeleteAggregationSource(ctx context.Context, req *ag
}
for _, systemURI := range systemList {
index := strings.LastIndexAny(systemURI, "/")
resp = e.deleteCompute(ctx, systemURI, index, target.PluginID)
resp = e.deleteCompute(ctx, systemURI, index, target.PluginID, sessionUserName)
}
removeAggregationSourceFromAggregates(ctx, systemList)
}
Expand Down Expand Up @@ -385,7 +387,8 @@ func (e *ExternalInterface) deletePlugin(ctx context.Context, oid string) respon
return resp
}

func (e *ExternalInterface) deleteCompute(ctx context.Context, key string, index int, pluginID string) response.RPC {
func (e *ExternalInterface) deleteCompute(ctx context.Context, key string, index int, pluginID string,
sessionUserName string) response.RPC {
var resp response.RPC
// check whether the any system operation is under progress
systemOperation, dbErr := agmodel.GetSystemOperationInfo(ctx, strings.TrimSuffix(key, "/"))
Expand Down Expand Up @@ -441,7 +444,7 @@ func (e *ExternalInterface) deleteCompute(ctx context.Context, key string, index
}
}()
// Delete Subscription on odimra and also on device
subResponse, err := e.DeleteEventSubscription(ctx, key)
subResponse, err := e.DeleteEventSubscription(ctx, key, sessionUserName)
if err != nil && subResponse == nil {
errMsg := fmt.Sprintf("error while trying to delete subscriptions: %v", err)
l.LogWithFields(ctx).Error(errMsg)
Expand Down
24 changes: 17 additions & 7 deletions svc-aggregation/system/deleteaggregationsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func deleteSystemforTest(key string) *errors.Error {
return nil
}

func mockDeleteSubscription(ctx context.Context, uuid string) (*eventsproto.EventSubResponse, error) {
func mockDeleteSubscription(ctx context.Context, uuid, sessionId string) (*eventsproto.EventSubResponse, error) {
if uuid == "/redfish/v1/Systems/ef83e569-7336-492a-aaee-31c02d9db832.1" {
return nil, fmt.Errorf("error while trying to delete event subcription")
} else if uuid == "/redfish/v1/Systems/unexpected-statuscode.1" {
Expand Down Expand Up @@ -153,7 +153,8 @@ func mockLogServicesCollectionData(id string, data map[string]interface{}) error
func TestDeleteAggregationSourceWithRediscovery(t *testing.T) {
d := getMockExternalInterface()
type args struct {
req *aggregatorproto.AggregatorRequest
req *aggregatorproto.AggregatorRequest
sessionName string
}
config.SetUpMockConfig(t)
defer func() {
Expand Down Expand Up @@ -227,13 +228,14 @@ func TestDeleteAggregationSourceWithRediscovery(t *testing.T) {
SessionToken: "SessionToken",
URL: "/redfish/v1/AggregationService/AggregationSources/ef83e569-7336-492a-aaee-31c02d9db831",
},
sessionName: "dummy",
},
want: http.StatusNotAcceptable,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := d.DeleteAggregationSource(ctx, tt.args.req)
got := d.DeleteAggregationSource(ctx, tt.args.req, tt.args.sessionName)
if got.StatusCode != tt.want {
t.Errorf("DeleteAggregationSource() = %v, want %v", got, tt.want)
}
Expand Down Expand Up @@ -338,7 +340,8 @@ func TestExternalInterface_DeleteAggregationSourceManager(t *testing.T) {
t.Fatalf("error: %v", err)
}
type args struct {
req *aggregatorproto.AggregatorRequest
req *aggregatorproto.AggregatorRequest
sessionName string
}
tests := []struct {
name string
Expand All @@ -352,6 +355,7 @@ func TestExternalInterface_DeleteAggregationSourceManager(t *testing.T) {
SessionToken: "SessionToken",
URL: "/redfish/v1/AggregationService/AggregationSources/123456",
},
sessionName: "dummy",
},
want: http.StatusNoContent,
},
Expand All @@ -362,6 +366,7 @@ func TestExternalInterface_DeleteAggregationSourceManager(t *testing.T) {
SessionToken: "SessionToken",
URL: "/redfish/v1/AggregationService/AggregationSources/123455",
},
sessionName: "dummy",
},
want: http.StatusNotAcceptable,
},
Expand All @@ -372,6 +377,7 @@ func TestExternalInterface_DeleteAggregationSourceManager(t *testing.T) {
SessionToken: "SessionToken",
URL: "/redfish/v1/AggregationService/AggregationSources/123434",
},
sessionName: "dummy",
},
want: http.StatusNotFound,
},
Expand All @@ -382,13 +388,14 @@ func TestExternalInterface_DeleteAggregationSourceManager(t *testing.T) {
SessionToken: "SessionToken",
URL: "/redfish/v1/AggregationService/AggregationSources/123457",
},
sessionName: "dummy",
},
want: http.StatusNotAcceptable,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := d.DeleteAggregationSource(ctx, tt.args.req)
got := d.DeleteAggregationSource(ctx, tt.args.req, tt.args.sessionName)
if got.StatusCode != tt.want {
t.Errorf("DeleteAggregationSource() = %v, want %v", got, tt.want)
}
Expand Down Expand Up @@ -478,7 +485,8 @@ func TestExternalInterface_DeleteBMC(t *testing.T) {
t.Fatalf("error: %v", err)
}
type args struct {
req *aggregatorproto.AggregatorRequest
req *aggregatorproto.AggregatorRequest
sessionName string
}
tests := []struct {
name string
Expand All @@ -492,6 +500,7 @@ func TestExternalInterface_DeleteBMC(t *testing.T) {
SessionToken: "SessionToken",
URL: "/redfish/v1/AggregationService/AggregationSources/ef83e569-7336-492a-aaee-31c02d9db831",
},
sessionName: "dummy",
},
want: http.StatusNoContent,
},
Expand All @@ -502,13 +511,14 @@ func TestExternalInterface_DeleteBMC(t *testing.T) {
SessionToken: "SessionToken",
URL: "/redfish/v1/AggregationService/AggregationSources/ef83e569-7336-492a-aaee-31c02d9db832",
},
sessionName: "dummy",
},
want: http.StatusInternalServerError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := d.DeleteAggregationSource(ctx, tt.args.req)
got := d.DeleteAggregationSource(ctx, tt.args.req, tt.args.sessionName)
if got.StatusCode != tt.want {
t.Errorf("DeleteAggregationSource() = %v, want %v", got, tt.want)
}
Expand Down
Loading
Loading