From 7bc32c956908d2cade608682fcc53cda8028294f Mon Sep 17 00:00:00 2001 From: Angith Date: Mon, 31 Jul 2023 15:15:29 +0530 Subject: [PATCH 1/2] 1. Implemented subtasks for delete event subscription 2. added support for queueing and prioritization --- lib-utilities/services/eventsubscription.go | 6 +- lib-utilities/services/pluginTask.go | 18 ++ svc-aggregation/rpc/aggregator.go | 2 +- svc-aggregation/rpc/common.go | 2 +- svc-aggregation/rpc/common_test.go | 2 +- .../system/addaggregationsource_test.go | 2 +- svc-aggregation/system/addcompute.go | 4 +- svc-aggregation/system/common.go | 50 ++--- .../system/deleteaggregationsource.go | 15 +- svc-events/events/deletesubscription.go | 171 ++++++++++++++---- svc-events/events/evtsubscription.go | 4 +- svc-events/evresponse/events_test.go | 2 +- svc-events/go.mod | 2 +- svc-events/rpc/common.go | 69 +++++++ svc-events/rpc/events.go | 79 ++------ 15 files changed, 281 insertions(+), 147 deletions(-) create mode 100644 svc-events/rpc/common.go diff --git a/lib-utilities/services/eventsubscription.go b/lib-utilities/services/eventsubscription.go index dfcdad996..af5ee1347 100644 --- a/lib-utilities/services/eventsubscription.go +++ b/lib-utilities/services/eventsubscription.go @@ -43,10 +43,12 @@ func SubscribeToEMB(ctx context.Context, pluginID string, queueList []string) er } // DeleteSubscription calls the event service and delete all subscription realated to that server -func DeleteSubscription(ctx context.Context, uuid string) (*eventsproto.EventSubResponse, error) { +func DeleteSubscription(ctx context.Context, uuid string, + sessionToken string) (*eventsproto.EventSubResponse, error) { var resp eventsproto.EventSubResponse req := eventsproto.EventRequest{ - UUID: uuid, + UUID: uuid, + SessionToken: sessionToken, } conn, errConn := ODIMService.Client(Events) if errConn != nil { diff --git a/lib-utilities/services/pluginTask.go b/lib-utilities/services/pluginTask.go index 600edb59a..db72937c0 100644 --- a/lib-utilities/services/pluginTask.go +++ b/lib-utilities/services/pluginTask.go @@ -47,6 +47,24 @@ func SavePluginTaskInfo(ctx context.Context, pluginIP, pluginServerName, return nil } +// SaveEventSubscriptionID saves the event subscription id in db +// it receives taskID and subscription ID. taskID is the key. +func SaveEventSubscriptionID(ctx context.Context, taskID, subscriptionID string) error { + + table := "EventSubscriptionID" + connPool, err := redis.GetDBConnection(redis.InMemory) + if err != nil { + return errors.PackError(err.ErrNo(), "error while trying to connecting"+ + " to DB: ", err.Error()) + } + + if err = connPool.Create(table, taskID, subscriptionID); err != nil { + return errors.PackError(err.ErrNo(), "error while trying to insert"+ + " plugin task: ", err.Error()) + } + return nil +} + // createPluginTask will insert plugin task info in DB func createPluginTask(ctx context.Context, key string, value interface{}) *errors.Error { diff --git a/svc-aggregation/rpc/aggregator.go b/svc-aggregation/rpc/aggregator.go index 0c6bb5177..1d7500baf 100644 --- a/svc-aggregation/rpc/aggregator.go +++ b/svc-aggregation/rpc/aggregator.go @@ -570,7 +570,7 @@ func (a *Aggregator) DeleteAggregationSource(ctx context.Context, req *aggregato var threadID int = 1 ctxt := context.WithValue(ctx, common.ThreadName, common.DeleteAggregationSource) ctxt = context.WithValue(ctxt, common.ThreadID, strconv.Itoa(threadID)) - go a.connector.DeleteAggregationSources(ctxt, taskID, targetURI, req) + go a.connector.DeleteAggregationSources(ctxt, taskID, targetURI, req, sessionUserName) threadID++ // return 202 Accepted var rpcResp = response.RPC{ diff --git a/svc-aggregation/rpc/common.go b/svc-aggregation/rpc/common.go index 0e9df61fa..05393876b 100644 --- a/svc-aggregation/rpc/common.go +++ b/svc-aggregation/rpc/common.go @@ -43,7 +43,7 @@ func GetAggregator() *Aggregator { CreateTask: services.CreateTask, CreateChildTask: services.CreateChildTask, UpdateTask: system.UpdateTaskData, - CreateSubcription: system.CreateDefaultEventSubscription, + CreateSubscription: system.CreateDefaultEventSubscription, PublishEvent: system.PublishEvent, GetPluginStatus: agcommon.GetPluginStatus, SubscribeToEMB: services.SubscribeToEMB, diff --git a/svc-aggregation/rpc/common_test.go b/svc-aggregation/rpc/common_test.go index 7eb34ed8c..52efa2f34 100644 --- a/svc-aggregation/rpc/common_test.go +++ b/svc-aggregation/rpc/common_test.go @@ -42,7 +42,7 @@ var connector = &system.ExternalInterface{ UpdateTask: mockUpdateTask, DecryptPassword: stubDevicePassword, GetPluginStatus: GetPluginStatusForTesting, - CreateSubcription: EventFunctionsForTesting, + CreateSubscription: EventFunctionsForTesting, PublishEvent: PostEventFunctionForTesting, EncryptPassword: stubDevicePassword, DeleteComputeSystem: deleteComputeforTest, diff --git a/svc-aggregation/system/addaggregationsource_test.go b/svc-aggregation/system/addaggregationsource_test.go index 4309e3c47..7dc11d4d8 100644 --- a/svc-aggregation/system/addaggregationsource_test.go +++ b/svc-aggregation/system/addaggregationsource_test.go @@ -938,7 +938,7 @@ func getMockExternalInterface() *ExternalInterface { Auth: mockIsAuthorized, CreateChildTask: mockCreateChildTask, UpdateTask: mockUpdateTask, - CreateSubcription: EventFunctionsForTesting, + CreateSubscription: EventFunctionsForTesting, PublishEvent: PostEventFunctionForTesting, GetPluginStatus: GetPluginStatusForTesting, PublishEventMB: mockPublishEventMB, diff --git a/svc-aggregation/system/addcompute.go b/svc-aggregation/system/addcompute.go index 5d2e25e08..0184436ff 100644 --- a/svc-aggregation/system/addcompute.go +++ b/svc-aggregation/system/addcompute.go @@ -136,7 +136,7 @@ func (e *ExternalInterface) addCompute(ctx context.Context, taskID, targetURI, p pluginContactRequest.OID = "/redfish/v1/Systems" pluginContactRequest.DeviceUUID = saveSystem.DeviceUUID pluginContactRequest.HTTPMethodType = http.MethodGet - pluginContactRequest.CreateSubcription = e.CreateSubcription + pluginContactRequest.CreateSubscription = e.CreateSubscription pluginContactRequest.PublishEvent = e.PublishEvent pluginContactRequest.BMCAddress = saveSystem.ManagerAddress @@ -315,7 +315,7 @@ func (e *ExternalInterface) addCompute(ctx context.Context, taskID, targetURI, p urlList := h.SystemURL urlList = append(urlList, chassisList...) urlList = append(urlList, managersList...) - pluginContactRequest.CreateSubcription(ctx, urlList) + pluginContactRequest.CreateSubscription(ctx, urlList) pluginContactRequest.PublishEvent(ctx, h.SystemURL, "SystemsCollection") diff --git a/svc-aggregation/system/common.go b/svc-aggregation/system/common.go index 248627696..53bd0c732 100644 --- a/svc-aggregation/system/common.go +++ b/svc-aggregation/system/common.go @@ -81,7 +81,7 @@ type ExternalInterface struct { CreateChildTask func(context.Context, string, string) (string, error) CreateTask func(context.Context, string) (string, error) UpdateTask func(context.Context, common.TaskData) error - CreateSubcription func(context.Context, []string) + CreateSubscription func(context.Context, []string) PublishEvent func(context.Context, []string, string) PublishEventMB func(context.Context, string, string, string) GetPluginStatus func(context.Context, agmodel.Plugin) bool @@ -90,7 +90,7 @@ type ExternalInterface struct { DecryptPassword func([]byte) ([]byte, error) DeleteComputeSystem func(int, string) *errors.Error DeleteSystem func(string) *errors.Error - DeleteEventSubscription func(context.Context, string) (*eventsproto.EventSubResponse, error) + DeleteEventSubscription func(context.Context, string, string) (*eventsproto.EventSubResponse, error) EventNotification func(context.Context, string, string, string, agmessagebus.MQBusCommunicator) error GetAllKeysFromTable func(context.Context, string) ([]string, error) GetConnectionMethod func(context.Context, string) (agmodel.ConnectionMethod, *errors.Error) @@ -114,29 +114,29 @@ type responseStatus struct { } type getResourceRequest struct { - Data []byte - Username string - Password string - SystemID string - DeviceUUID string - DeviceInfo interface{} - LoginCredentials map[string]string - ParentOID string - OID string - ContactClient func(context.Context, string, string, string, string, interface{}, map[string]string) (*http.Response, error) - OemFlag bool - Plugin agmodel.Plugin - TaskRequest string - HTTPMethodType string - Token string - StatusPoll bool - CreateSubcription func(context.Context, []string) - PublishEvent func(context.Context, []string, string) - GetPluginStatus func(context.Context, agmodel.Plugin) bool - UpdateFlag bool - TargetURI string - UpdateTask func(context.Context, common.TaskData) error - BMCAddress string + Data []byte + Username string + Password string + SystemID string + DeviceUUID string + DeviceInfo interface{} + LoginCredentials map[string]string + ParentOID string + OID string + ContactClient func(context.Context, string, string, string, string, interface{}, map[string]string) (*http.Response, error) + OemFlag bool + Plugin agmodel.Plugin + TaskRequest string + HTTPMethodType string + Token string + StatusPoll bool + CreateSubscription func(context.Context, []string) + PublishEvent func(context.Context, []string, string) + GetPluginStatus func(context.Context, agmodel.Plugin) bool + UpdateFlag bool + TargetURI string + UpdateTask func(context.Context, common.TaskData) error + BMCAddress string } type respHolder struct { diff --git a/svc-aggregation/system/deleteaggregationsource.go b/svc-aggregation/system/deleteaggregationsource.go index 84ead5aa4..b70e435db 100644 --- a/svc-aggregation/system/deleteaggregationsource.go +++ b/svc-aggregation/system/deleteaggregationsource.go @@ -34,7 +34,8 @@ import ( ) // DeleteAggregationSources is used to delete aggregation sources -func (e *ExternalInterface) DeleteAggregationSources(ctx context.Context, taskID string, targetURI string, req *aggregatorproto.AggregatorRequest) error { +func (e *ExternalInterface) DeleteAggregationSources(ctx context.Context, taskID string, targetURI string, + req *aggregatorproto.AggregatorRequest, sessionUserName string) error { var task = common.TaskData{ TaskID: taskID, TargetURI: targetURI, @@ -58,7 +59,7 @@ func (e *ExternalInterface) DeleteAggregationSources(ctx context.Context, taskID go runtime.Goexit() } l.LogWithFields(ctx).Debugf("request data for delete aggregation source: %s", string(req.RequestBody)) - data := e.DeleteAggregationSource(ctx, req) + data := e.DeleteAggregationSource(ctx, req, sessionUserName) err = e.UpdateTask(ctx, common.TaskData{ TaskID: taskID, TargetURI: targetURI, @@ -83,7 +84,8 @@ func (e *ExternalInterface) DeleteAggregationSources(ctx context.Context, taskID } // DeleteAggregationSource is the handler for removing bmc or manager -func (e *ExternalInterface) DeleteAggregationSource(ctx context.Context, req *aggregatorproto.AggregatorRequest) response.RPC { +func (e *ExternalInterface) DeleteAggregationSource(ctx context.Context, + req *aggregatorproto.AggregatorRequest, sessionUserName string) response.RPC { var resp response.RPC aggregationSource, dbErr := agmodel.GetAggregationSourceInfo(ctx, req.URL) @@ -145,7 +147,7 @@ func (e *ExternalInterface) DeleteAggregationSource(ctx context.Context, req *ag } for _, systemURI := range systemList { index := strings.LastIndexAny(systemURI, "/") - resp = e.deleteCompute(ctx, systemURI, index, target.PluginID) + resp = e.deleteCompute(ctx, systemURI, index, target.PluginID, sessionUserName) } removeAggregationSourceFromAggregates(ctx, systemList) } @@ -385,7 +387,8 @@ func (e *ExternalInterface) deletePlugin(ctx context.Context, oid string) respon return resp } -func (e *ExternalInterface) deleteCompute(ctx context.Context, key string, index int, pluginID string) response.RPC { +func (e *ExternalInterface) deleteCompute(ctx context.Context, key string, index int, pluginID string, + sessionUserName string) response.RPC { var resp response.RPC // check whether the any system operation is under progress systemOperation, dbErr := agmodel.GetSystemOperationInfo(ctx, strings.TrimSuffix(key, "/")) @@ -441,7 +444,7 @@ func (e *ExternalInterface) deleteCompute(ctx context.Context, key string, index } }() // Delete Subscription on odimra and also on device - subResponse, err := e.DeleteEventSubscription(ctx, key) + subResponse, err := e.DeleteEventSubscription(ctx, key, sessionUserName) if err != nil && subResponse == nil { errMsg := fmt.Sprintf("error while trying to delete subscriptions: %v", err) l.LogWithFields(ctx).Error(errMsg) diff --git a/svc-events/events/deletesubscription.go b/svc-events/events/deletesubscription.go index db31a1aea..c60bd753c 100644 --- a/svc-events/events/deletesubscription.go +++ b/svc-events/events/deletesubscription.go @@ -50,14 +50,20 @@ var ( ) // DeleteEventSubscriptions delete subscription data against given URL -func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req *eventsproto.EventRequest) response.RPC { +func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req *eventsproto.EventRequest, taskId string) response.RPC { var resp response.RPC originResource := req.UUID uuid, err := getUUID(originResource) + var ( + percentComplete int32 = 100 + targetURI = "/redfish/v1/EventService/Subscriptions" + ) if err != nil { msgArgs := []interface{}{"OriginResource", originResource} evcommon.GenErrorResponse(err.Error(), response.ResourceNotFound, http.StatusBadRequest, msgArgs, &resp) l.LogWithFields(ctx).Error(err.Error()) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) return resp } target, err := e.GetTarget(uuid) @@ -66,6 +72,8 @@ func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req * errorMessage := err.Error() msgArgs := []interface{}{"uuid", uuid} evcommon.GenErrorResponse(errorMessage, response.ResourceNotFound, http.StatusBadRequest, msgArgs, &resp) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) return resp } deviceIPAddress, err := GetIPFromHostNameFunc(target.ManagerAddress) @@ -73,6 +81,8 @@ func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req * msgArgs := []interface{}{"Host", target.ManagerAddress} evcommon.GenErrorResponse(err.Error(), response.ResourceNotFound, http.StatusNotFound, msgArgs, &resp) l.LogWithFields(ctx).Error(err.Error()) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) return resp } searchKey := evcommon.GetSearchKey(deviceIPAddress, evmodel.SubscriptionIndex) @@ -82,6 +92,8 @@ func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req * errorMessage := err.Error() msgArgs := []interface{}{"Host", target.ManagerAddress} evcommon.GenErrorResponse(errorMessage, response.ResourceNotFound, http.StatusNotFound, msgArgs, &resp) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) return resp } l.LogWithFields(ctx).Debug("Number of subscription present :", strconv.Itoa(len(subscriptionDetails))) @@ -92,6 +104,8 @@ func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req * msgArgs := []interface{}{""} evcommon.GenErrorResponse(errorMessage, response.InternalError, http.StatusInternalServerError, msgArgs, &resp) l.LogWithFields(ctx).Error(errorMessage) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) return resp } target.Password = decryptedPasswordByte @@ -103,6 +117,8 @@ func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req * l.LogWithFields(ctx).Error("error while deleting event subscription details : " + err.Error()) msgArgs := []interface{}{"Host", target.ManagerAddress} evcommon.GenErrorResponse(err.Error(), response.ResourceNotFound, http.StatusBadRequest, msgArgs, &resp) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) return resp } searchKey = evcommon.GetSearchKey(deviceIPAddress, evmodel.DeviceSubscriptionIndex) @@ -112,6 +128,8 @@ func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req * msgArgs := []interface{}{"Host", target.ManagerAddress} evcommon.GenErrorResponse(errorMessage, response.ResourceNotFound, http.StatusBadRequest, msgArgs, &resp) l.LogWithFields(ctx).Error(errorMessage) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) return resp } originResource = deviceSubscription.OriginResources[0] @@ -130,6 +148,8 @@ func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req * msgArgs := []interface{}{"SubscriptionID", evtSubscription.SubscriptionID} evcommon.GenErrorResponse(errorMessage, response.ResourceNotFound, http.StatusBadRequest, msgArgs, &resp) l.LogWithFields(ctx).Error(errorMessage) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) return resp } } else { @@ -142,6 +162,8 @@ func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req * msgArgs := []interface{}{"SubscriptionID", evtSubscription.SubscriptionID} evcommon.GenErrorResponse(errorMessage, response.ResourceNotFound, http.StatusBadRequest, msgArgs, &resp) l.LogWithFields(ctx).Error(errorMessage) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) return resp } } @@ -152,6 +174,8 @@ func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req * errorMessage := "Error while deleting device subscription : " + err.Error() l.LogWithFields(ctx).Error(errorMessage) } + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.OK, common.Completed, + percentComplete, http.MethodDelete)) resp.StatusCode = http.StatusNoContent resp.StatusMessage = response.ResourceRemoved @@ -173,23 +197,35 @@ func (e *ExternalInterfaces) deleteSubscription(ctx context.Context, target *com } // DeleteEventSubscriptionsDetails delete subscription data against given subscription id -func (e *ExternalInterfaces) DeleteEventSubscriptionsDetails(ctx context.Context, req *eventsproto.EventRequest) response.RPC { +func (e *ExternalInterfaces) DeleteEventSubscriptionsDetails(ctx context.Context, req *eventsproto.EventRequest, + sessionUserName string, taskId string) response.RPC { var resp response.RPC - authResp, err := e.Auth(ctx, req.SessionToken, []string{common.PrivilegeConfigureComponents}, []string{}) + var ( + percentComplete int32 = 100 + targetURI = "/redfish/v1/EventService/Subscriptions" + ) + reqCtx := common.CreateNewRequestContext(ctx) + reqCtx = common.CreateMetadata(reqCtx) + authResp, err := e.Auth(reqCtx, req.SessionToken, []string{common.PrivilegeConfigureComponents}, []string{}) if authResp.StatusCode != http.StatusOK { errMsg := fmt.Sprintf("error while trying to authenticate session: status code: %v, status message: %v", authResp.StatusCode, authResp.StatusMessage) if err != nil { errMsg = errMsg + ": " + err.Error() } l.LogWithFields(ctx).Error(errMsg) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) return authResp } subscriptionDetails, err := e.GetEvtSubscriptions(req.EventSubscriptionID) if err != nil && !strings.Contains(err.Error(), "No data found for the key") { - l.LogWithFields(ctx).Error("error while deleting eventsubscription details : " + err.Error()) + l.LogWithFields(ctx).Error("error while deleting event subscription details : " + err.Error()) errorMessage := err.Error() msgArgs := []interface{}{"SubscriptionID", req.EventSubscriptionID} evcommon.GenErrorResponse(errorMessage, response.ResourceNotFound, http.StatusBadRequest, msgArgs, &resp) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) + return resp } if len(subscriptionDetails) < 1 { @@ -197,6 +233,9 @@ func (e *ExternalInterfaces) DeleteEventSubscriptionsDetails(ctx context.Context l.LogWithFields(ctx).Error(errorMessage) var msgArgs = []interface{}{"SubscriptionID", req.EventSubscriptionID} evcommon.GenErrorResponse(errorMessage, response.ResourceNotFound, http.StatusNotFound, msgArgs, &resp) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) + return resp } for _, evtSubscription := range subscriptionDetails { @@ -207,15 +246,26 @@ func (e *ExternalInterfaces) DeleteEventSubscriptionsDetails(ctx context.Context l.LogWithFields(ctx).Error(errorMessage) var msgArgs = []interface{}{"SubscriptionID", req.EventSubscriptionID} evcommon.GenErrorResponse(errorMessage, response.ResourceNotFound, http.StatusNotFound, msgArgs, &resp) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) + return resp } // Delete and re subscribe Event Subscription - err = e.deleteAndReSubscribeToEvents(ctx, evtSubscription, req.SessionToken) + isStatusAccepted, err := e.deleteAndReSubscribeToEvents(ctx, evtSubscription, req.SessionToken, sessionUserName, taskId) if err != nil { errorMessage := err.Error() msgArgs := []interface{}{"SubscriptionID", req.EventSubscriptionID} evcommon.GenErrorResponse(errorMessage, response.ResourceNotFound, http.StatusBadRequest, msgArgs, &resp) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) + + return resp + } + + if isStatusAccepted { + services.SaveEventSubscriptionID(ctx, taskId, evtSubscription.SubscriptionID) return resp } @@ -226,6 +276,8 @@ func (e *ExternalInterfaces) DeleteEventSubscriptionsDetails(ctx context.Context errorMessage := err.Error() msgArgs := []interface{}{"SubscriptionID", req.EventSubscriptionID} evcommon.GenErrorResponse(errorMessage, response.ResourceNotFound, http.StatusBadRequest, msgArgs, &resp) + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Exception, + common.Critical, percentComplete, http.MethodDelete)) return resp } } @@ -242,11 +294,17 @@ func (e *ExternalInterfaces) DeleteEventSubscriptionsDetails(ctx context.Context commonResponse.CreateGenericResponse(resp.StatusMessage) resp.Body = commonResponse + e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.OK, + common.Completed, percentComplete, http.MethodDelete)) + return resp } // This function is to delete and re subscribe for Event Subscriptions -func (e *ExternalInterfaces) deleteAndReSubscribeToEvents(ctx context.Context, evtSubscription evmodel.SubscriptionResource, sessionToken string) error { +func (e *ExternalInterfaces) deleteAndReSubscribeToEvents(ctx context.Context, evtSubscription evmodel.SubscriptionResource, + sessionToken, sessionUserName string, taskID string) (bool, error) { + + var isStatusAccepted bool originResources := evtSubscription.EventDestination.OriginResources for _, origin := range originResources { // ignore if origin is empty @@ -255,7 +313,7 @@ func (e *ExternalInterfaces) deleteAndReSubscribeToEvents(ctx context.Context, e } subscriptionDetails, err := e.GetEvtSubscriptions(origin.Oid) if err != nil { - return err + return isStatusAccepted, err } subscriptionDetails = append([]evmodel.SubscriptionResource{{ EventDestination: &model.EventDestination{ @@ -273,7 +331,7 @@ func (e *ExternalInterfaces) deleteAndReSubscribeToEvents(ctx context.Context, e var deleteFlag bool if len(subscriptionDetails) < 1 { - return fmt.Errorf("subscription details not found for subscription id: %s", origin) + return isStatusAccepted, fmt.Errorf("subscription details not found for subscription id: %s", origin) } else if len(subscriptionDetails) == 1 { deleteFlag = true } @@ -321,12 +379,12 @@ func (e *ExternalInterfaces) deleteAndReSubscribeToEvents(ctx context.Context, e Destination: destination, } - err = e.subscribe(ctx, subscriptionPost, origin.Oid, deleteFlag, sessionToken) + isStatusAccepted, err = e.subscribe(ctx, subscriptionPost, origin.Oid, deleteFlag, sessionToken, sessionUserName, taskID) if err != nil { - return err + return isStatusAccepted, err } } - return nil + return isStatusAccepted, nil } func isCollectionOriginResourceURI(origin string) bool { @@ -358,40 +416,43 @@ func isCollectionOriginResourceURI(origin string) bool { } // Subscribe to the Event Subscription -func (e *ExternalInterfaces) subscribe(ctx context.Context, subscriptionPost model.EventDestination, origin string, deleteflag bool, sessionToken string) error { +func (e *ExternalInterfaces) subscribe(ctx context.Context, subscriptionPost model.EventDestination, origin string, + deleteFlag bool, sessionToken string, sessionUserName string, taskId string) (bool, error) { + var isStatusAccepted bool if strings.Contains(origin, "Fabrics") { - return e.resubscribeFabricsSubscription(ctx, subscriptionPost, origin, deleteflag) + return isStatusAccepted, e.resubscribeFabricsSubscription(ctx, subscriptionPost, origin, sessionUserName, taskId, deleteFlag) } if strings.Contains(origin, "/redfish/v1/AggregationService/Aggregates") { - return e.resubscribeAggregateSubscription(ctx, subscriptionPost, origin, deleteflag, sessionToken) + return isStatusAccepted, e.resubscribeAggregateSubscription(ctx, subscriptionPost, origin, deleteFlag, sessionToken, sessionUserName, taskId) } originResource := origin if isCollectionOriginResourceURI(originResource) { l.LogWithFields(ctx).Error("Collection of origin resource:" + originResource) - return nil + return isStatusAccepted, nil } target, _, err := e.getTargetDetails(originResource) if err != nil { - return err + return isStatusAccepted, err } plugin, errs := e.GetPluginData(target.PluginID) if errs != nil { - return errs + return isStatusAccepted, errs } postBody, err := json.Marshal(subscriptionPost) if err != nil { - return fmt.Errorf("error while marshalling subscription details: %s", err) + return isStatusAccepted, fmt.Errorf("error while marshalling subscription details: %s", err) } target.PostBody = postBody - _, err = e.DeleteSubscriptions(ctx, origin, "", plugin, target) - if err != nil { - return err - } - // if deleteflag is true then only one document is there + // _, err = e.DeleteSubscriptions(ctx, origin, "", plugin, target) + // if err != nil { + // return err + // } + + // if deleteFlag is true then only one document is there // so don't re subscribe again - if deleteflag { - return nil + if deleteFlag { + return isStatusAccepted, nil } var contactRequest evcommon.PluginContactRequest @@ -399,7 +460,7 @@ func (e *ExternalInterfaces) subscribe(ctx context.Context, subscriptionPost mod if strings.EqualFold(plugin.PreferredAuthType, "XAuthToken") { token := e.getPluginToken(ctx, plugin) if token == "" { - return fmt.Errorf("error: Unable to create session with plugin " + plugin.ID) + return isStatusAccepted, fmt.Errorf("error: Unable to create session with plugin " + plugin.ID) } contactRequest.Token = token } else { @@ -409,14 +470,30 @@ func (e *ExternalInterfaces) subscribe(ctx context.Context, subscriptionPost mod } } + + // Note: commenting delete subscription API call and calling patch API instead + // a subtask is created for each patch call + + subtaskID := e.CreateSubTask(ctx, sessionUserName, taskId) contactRequest.URL = "/ODIM/v1/Subscriptions" - contactRequest.HTTPMethodType = http.MethodPost + contactRequest.HTTPMethodType = http.MethodPatch contactRequest.PostBody = target - - _, loc, _, _, err := e.PluginCall(ctx, contactRequest) + createResponse, loc, _, pluginIP, err := e.PluginCall(ctx, contactRequest) if err != nil { - return err + e.UpdateTask(ctx, fillTaskData(subtaskID, contactRequest.URL, "", createResponse, common.Exception, + common.Critical, 100, http.MethodPatch)) + return isStatusAccepted, err } + if createResponse.StatusCode == http.StatusAccepted { + isStatusAccepted = true + services.SavePluginTaskInfo(ctx, pluginIP, plugin.IP, + subtaskID, loc) + return isStatusAccepted, nil + } + + e.UpdateTask(ctx, fillTaskData(subtaskID, contactRequest.URL, "", createResponse, common.Completed, + common.OK, 100, http.MethodPatch)) + // Update Location to all destination of device if already subscribed to the device var resp response.RPC deviceIPAddress, err := common.GetIPFromHostName(target.ManagerAddress) @@ -428,14 +505,14 @@ func (e *ExternalInterfaces) subscribe(ctx context.Context, subscriptionPost mod searchKey := evcommon.GetSearchKey(deviceIPAddress, evmodel.DeviceSubscriptionIndex) devSub, err := e.GetDeviceSubscriptions(searchKey) if err != nil { - return err + return isStatusAccepted, err } deviceSub := common.DeviceSubscription{ EventHostIP: devSub.EventHostIP, Location: loc, OriginResources: devSub.OriginResources, } - return e.UpdateDeviceSubscriptionLocation(deviceSub) + return isStatusAccepted, e.UpdateDeviceSubscriptionLocation(deviceSub) } @@ -509,7 +586,8 @@ func (e *ExternalInterfaces) DeleteFabricsSubscription(ctx context.Context, orig } // resubscribeFabricsSubscription updates subscription fabric subscription details by forming the super set of MessageIDs,EventTypes and ResourceTypes -func (e *ExternalInterfaces) resubscribeFabricsSubscription(ctx context.Context, subscriptionPost model.EventDestination, origin string, deleteflag bool) error { +func (e *ExternalInterfaces) resubscribeFabricsSubscription(ctx context.Context, subscriptionPost model.EventDestination, + sessionUserName string, taskId string, origin string, deleteFlag bool) error { originResources := e.getSubordinateResourcesFromCollection(origin) for _, origin := range originResources { originResource := origin @@ -538,9 +616,9 @@ func (e *ExternalInterfaces) resubscribeFabricsSubscription(ctx context.Context, } return err } - // if deleteflag is true then only one document is there + // if deleteFlag is true then only one document is there // so don't re subscribe again - if deleteflag { + if deleteFlag { return nil } var contactRequest evcommon.PluginContactRequest @@ -572,6 +650,7 @@ func (e *ExternalInterfaces) resubscribeFabricsSubscription(ctx context.Context, reqData = strings.Replace(string(postBody), key, value, -1) } + subtaskID := e.CreateSubTask(ctx, sessionUserName, taskId) // recreating the subscription contactRequest.URL = "/ODIM/v1/Subscriptions" contactRequest.HTTPMethodType = http.MethodPost @@ -580,16 +659,31 @@ func (e *ExternalInterfaces) resubscribeFabricsSubscription(ctx context.Context, return err } l.LogWithFields(ctx).Info("Resubscribe request" + reqData) - response, loc, _, _, err := e.PluginCall(ctx, contactRequest) + response, loc, _, pluginIP, err := e.PluginCall(ctx, contactRequest) if err != nil { + e.UpdateTask(ctx, fillTaskData(subtaskID, contactRequest.URL, "", response, common.Exception, + common.Critical, 100, http.MethodPost)) return err } + + if response.StatusCode == http.StatusAccepted { + services.SavePluginTaskInfo(ctx, pluginIP, plugin.IP, + subtaskID, loc) + return nil + } + if response.StatusCode == http.StatusUnauthorized && strings.EqualFold(plugin.PreferredAuthType, "XAuthToken") { _, _, _, _, err = e.retryEventOperation(ctx, contactRequest) if err != nil { + e.UpdateTask(ctx, fillTaskData(subtaskID, contactRequest.URL, "", response, common.Exception, + common.Critical, 100, http.MethodPost)) return err } } + + e.UpdateTask(ctx, fillTaskData(subtaskID, contactRequest.URL, "", response, common.ComputerSystemReset, + common.OK, 100, http.MethodPost)) + addr, err := common.GetIPFromHostName(plugin.IP) if err != nil { return fmt.Errorf(err.Error()) @@ -724,14 +818,15 @@ func getAggregateSystemList(origin string, sessionToken string) ([]model.Link, e // resubscribeAggregateSubscription method subscribe event for // aggregate system members -func (e *ExternalInterfaces) resubscribeAggregateSubscription(ctx context.Context, subscriptionPost model.EventDestination, origin string, deleteflag bool, sessionToken string) error { +func (e *ExternalInterfaces) resubscribeAggregateSubscription(ctx context.Context, subscriptionPost model.EventDestination, + origin string, deleteFlag bool, sessionToken string, sessionUserName string, taskId string) error { originResource := origin systems, err := getAggregateSystemList(originResource, sessionToken) if err != nil { return nil } for _, system := range systems { - err = e.subscribe(ctx, subscriptionPost, system.Oid, deleteflag, sessionToken) + _, err = e.subscribe(ctx, subscriptionPost, system.Oid, deleteFlag, sessionToken, sessionUserName, taskId) if err != nil { return err } diff --git a/svc-events/events/evtsubscription.go b/svc-events/events/evtsubscription.go index aef0508bf..122003ecf 100644 --- a/svc-events/events/evtsubscription.go +++ b/svc-events/events/evtsubscription.go @@ -860,8 +860,8 @@ func (e *ExternalInterfaces) createEventSubscription(ctx context.Context, taskID } // CreateSubTask creates a child task for a task calling RPC to task service and returns the subtask ID -func (e *ExternalInterfaces) CreateSubTask(ctx context.Context, reqSessionToken string, parentTask string) string { - subTaskURI, err := e.CreateChildTask(ctx, reqSessionToken, parentTask) +func (e *ExternalInterfaces) CreateSubTask(ctx context.Context, sessionUserName string, parentTask string) string { + subTaskURI, err := e.CreateChildTask(ctx, sessionUserName, parentTask) if err != nil { l.LogWithFields(ctx).Error("Error while creating the SubTask") } diff --git a/svc-events/evresponse/events_test.go b/svc-events/evresponse/events_test.go index 370eed5cd..d0eab8d2d 100644 --- a/svc-events/evresponse/events_test.go +++ b/svc-events/evresponse/events_test.go @@ -32,7 +32,7 @@ func TestResponse(t *testing.T) { "37646c88-a7d7-468c-af58-49e8a0adbbb2.1", } var hosts = []string{"10.10.10.10", "10.10.10.11", "10.10.10.12"} - var responses = []EventResponse{{StatusCode: 201}, {StatusCode: 400}, {StatusCode: 201}} + var responses = []EventResponse{{StatusCode: 202}, {StatusCode: 400}, {StatusCode: 202}} var result = &MutexLock{ Response: make(map[string]EventResponse), Lock: &sync.Mutex{}, diff --git a/svc-events/go.mod b/svc-events/go.mod index 059ddb080..7929b53b6 100644 --- a/svc-events/go.mod +++ b/svc-events/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/ODIM-Project/ODIM/lib-dmtf v0.0.0-20221116052950-e215237718e2 github.com/ODIM-Project/ODIM/lib-messagebus v0.0.0-20211220033333-4314870ed337 - github.com/ODIM-Project/ODIM/lib-persistence-manager v0.0.0-20201201072448-9772421f1b55 + github.com/ODIM-Project/ODIM/lib-persistence-manager v0.0.0-20230719110936-f43048b6407a github.com/ODIM-Project/ODIM/lib-rest-client v0.0.0-20201201072448-9772421f1b55 github.com/ODIM-Project/ODIM/lib-utilities v0.0.0-20220426104855-9b203a83173f github.com/go-redis/redis v6.15.9+incompatible diff --git a/svc-events/rpc/common.go b/svc-events/rpc/common.go new file mode 100644 index 000000000..45fbef23d --- /dev/null +++ b/svc-events/rpc/common.go @@ -0,0 +1,69 @@ +package rpc + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/ODIM-Project/ODIM/lib-utilities/common" + l "github.com/ODIM-Project/ODIM/lib-utilities/logs" + eventsproto "github.com/ODIM-Project/ODIM/lib-utilities/proto/events" + "github.com/ODIM-Project/ODIM/lib-utilities/response" +) + +func (e *Events) AuthorizeAndCreateTask(ctx context.Context, sessionToken string, + resp *eventsproto.EventSubResponse) (string, string, error) { + + var ( + err error + taskID, taskURI, sessionUserName string + ) + + // Athorize the request here + authResp, err := e.Connector.Auth(ctx, sessionToken, []string{common.PrivilegeConfigureComponents}, []string{}) + if authResp.StatusCode != http.StatusOK { + errMsg := fmt.Sprintf("error while trying to authenticate session: status code: %v, status message: %v", + authResp.StatusCode, authResp.StatusMessage) + if err != nil { + errMsg = errMsg + ": " + err.Error() + } + l.LogWithFields(ctx).Error(errMsg) + resp.Body = generateResponse(ctx, authResp.Body) + resp.StatusCode = authResp.StatusCode + return sessionUserName, taskID, fmt.Errorf(errMsg) + } + + sessionUserName, err = e.Connector.GetSessionUserName(ctx, sessionToken) + if err != nil { + errorMessage := "error while trying to get the session username: " + err.Error() + resp.Body = generateResponse(ctx, common.GeneralError(http.StatusUnauthorized, + response.NoValidSession, errorMessage, nil, nil)) + resp.StatusCode = http.StatusUnauthorized + l.LogWithFields(ctx).Error(errorMessage) + return sessionUserName, taskID, err + } + // Create the task and get the taskID + // Contact Task Service using RPC and get the taskID + taskURI, err = e.Connector.CreateTask(ctx, sessionUserName) + if err != nil { + // print err here as we are unbale to contact svc-task service + errorMessage := "error while trying to create the task: " + err.Error() + resp.StatusCode = http.StatusInternalServerError + resp.StatusMessage = response.InternalError + resp.Body, _ = json.Marshal(common.GeneralError(http.StatusInternalServerError, + response.InternalError, errorMessage, nil, nil).Body) + l.LogWithFields(ctx).Error(errorMessage) + return sessionUserName, taskID, fmt.Errorf(errorMessage) + } + + taskID = strings.TrimPrefix(taskURI, "/redfish/v1/TaskService/Tasks/") + resp.StatusCode = http.StatusAccepted + resp.Header = map[string]string{ + "Location": "/taskmon/" + taskID, + } + resp.StatusMessage = response.TaskStarted + generateTaskResponse(ctx, taskID, taskURI, resp) + return sessionUserName, taskID, nil +} diff --git a/svc-events/rpc/events.go b/svc-events/rpc/events.go index 06912ce3c..d2aaf50a3 100644 --- a/svc-events/rpc/events.go +++ b/svc-events/rpc/events.go @@ -21,7 +21,6 @@ import ( "fmt" "net/http" "os" - "strings" "github.com/ODIM-Project/ODIM/lib-rest-client/pmbhandle" "github.com/ODIM-Project/ODIM/lib-utilities/common" @@ -212,61 +211,14 @@ func (e *Events) GetEventService(ctx context.Context, req *eventsproto.EventSubR // The functionality is to create the subscription with Resource provided in origin resources. func (e *Events) CreateEventSubscription(ctx context.Context, req *eventsproto.EventSubRequest) (*eventsproto.EventSubResponse, error) { var resp eventsproto.EventSubResponse - var err error - var taskID string ctx = common.GetContextData(ctx) ctx = common.ModifyContext(ctx, common.EventService, podName) - - // Athorize the request here - authResp, err := e.Connector.Auth(ctx, req.SessionToken, []string{common.PrivilegeConfigureComponents}, []string{}) - if authResp.StatusCode != http.StatusOK { - errMsg := fmt.Sprintf("error while trying to authenticate session: status code: %v, status message: %v", - authResp.StatusCode, authResp.StatusMessage) - if err != nil { - errMsg = errMsg + ": " + err.Error() - } - l.LogWithFields(ctx).Error(errMsg) - resp.Body = generateResponse(ctx, authResp.Body) - resp.StatusCode = authResp.StatusCode - return &resp, nil - } - sessionUserName, err := e.Connector.GetSessionUserName(ctx, req.SessionToken) + sessionUserName, taskID, err := e.AuthorizeAndCreateTask(ctx, req.SessionToken, &resp) if err != nil { - errorMessage := "error while trying to get the session username: " + err.Error() - resp.Body = generateResponse(ctx, common.GeneralError(http.StatusUnauthorized, - response.NoValidSession, errorMessage, nil, nil)) - resp.StatusCode = http.StatusUnauthorized - l.LogWithFields(ctx).Error(errorMessage) return &resp, err } - // Create the task and get the taskID - // Contact Task Service using RPC and get the taskID - taskURI, err := e.Connector.CreateTask(ctx, sessionUserName) - if err != nil { - // print err here as we are unbale to contact svc-task service - errorMessage := "error while trying to create the task: " + err.Error() - resp.StatusCode = http.StatusInternalServerError - resp.StatusMessage = response.InternalError - resp.Body, _ = json.Marshal(common.GeneralError(http.StatusInternalServerError, - response.InternalError, errorMessage, nil, nil).Body) - l.LogWithFields(ctx).Error(errorMessage) - return &resp, fmt.Errorf(resp.StatusMessage) - } - strArray := strings.Split(taskURI, "/") - if strings.HasSuffix(taskURI, "/") { - taskID = strArray[len(strArray)-2] - } else { - taskID = strArray[len(strArray)-1] - } //Spawn the thread to process the action asynchronously go e.Connector.CreateEventSubscription(ctx, taskID, sessionUserName, req) - // Return 202 accepted - resp.StatusCode = http.StatusAccepted - resp.Header = map[string]string{ - "Location": "/taskmon/" + taskID, - } - resp.StatusMessage = response.TaskStarted - generateTaskResponse(ctx, taskID, taskURI, &resp) return &resp, nil } @@ -349,28 +301,23 @@ func (e *Events) GetEventSubscription(ctx context.Context, req *eventsproto.Even func (e *Events) DeleteEventSubscription(ctx context.Context, req *eventsproto.EventRequest) (*eventsproto.EventSubResponse, error) { ctx = common.GetContextData(ctx) ctx = common.ModifyContext(ctx, common.EventService, podName) - var resp eventsproto.EventSubResponse - var err error - var data response.RPC + var ( + taskID string + resp eventsproto.EventSubResponse + ) + + sessionUserName, taskID, err := e.AuthorizeAndCreateTask(ctx, req.SessionToken, &resp) + if err != nil { + return &resp, err + } + if req.UUID == "" { // Delete Event Subscription when admin requested - data = e.Connector.DeleteEventSubscriptionsDetails(ctx, req) + go e.Connector.DeleteEventSubscriptionsDetails(ctx, req, sessionUserName, taskID) } else { // Delete Event Subscription to Device when Server get Deleted - data = e.Connector.DeleteEventSubscriptions(ctx, req) + go e.Connector.DeleteEventSubscriptions(ctx, req, taskID) } - resp.Body, err = JSONMarshal(data.Body) - if err != nil { - errorMessage := "error while trying marshal the response body for delete event subsciption : " + err.Error() - resp.StatusCode = http.StatusInternalServerError - resp.StatusMessage = response.InternalError - resp.Body, _ = json.Marshal(common.GeneralError(http.StatusInternalServerError, response.InternalError, errorMessage, nil, nil).Body) - l.LogWithFields(ctx).Error(resp.StatusMessage) - return &resp, nil - } - resp.StatusCode = data.StatusCode - resp.StatusMessage = data.StatusMessage - resp.Header = data.Header return &resp, nil } From 4293ae63c9235c049e137403d599a611beeaf60f Mon Sep 17 00:00:00 2001 From: rahul-chaube Date: Wed, 30 Aug 2023 22:56:01 +0530 Subject: [PATCH 2/2] Implemented subtasks for delete event subscription --- lib-utilities/proto/events/events.proto | 1 + lib-utilities/services/pluginTask.go | 19 ++++++++ svc-aggregation/agmessagebus/publish_test.go | 1 - svc-aggregation/go.mod | 2 +- svc-aggregation/rpc/common_test.go | 2 +- .../system/deleteaggregationsource_test.go | 24 +++++++--- svc-api/rpc/fakeStruct_testfile.go | 3 ++ svc-events/events/deletesubscription.go | 35 +++++++++------ svc-events/events/deletesubscription_test.go | 44 +++++++++---------- svc-events/events/evtsubscription.go | 24 ++++++++++ svc-events/rpc/events.go | 40 ++++++++++++++--- svc-events/rpc/events_test.go | 12 ++--- svc-task/tcommon/common.go | 20 +++++++++ svc-task/thandle/thandle.go | 7 +++ 14 files changed, 175 insertions(+), 59 deletions(-) diff --git a/lib-utilities/proto/events/events.proto b/lib-utilities/proto/events/events.proto index e55104de8..131a863d5 100644 --- a/lib-utilities/proto/events/events.proto +++ b/lib-utilities/proto/events/events.proto @@ -28,6 +28,7 @@ service Events { rpc IsAggregateHaveSubscription(EventUpdateRequest) returns (SubscribeEMBResponse){} rpc DeleteAggregateSubscriptionsRPC(EventUpdateRequest) returns (SubscribeEMBResponse){} rpc UpdateSubscriptionLocationRPC(UpdateSubscriptionLocation) returns (SubscribeEMBResponse) {} + rpc DeleteSubscription(EventRequest) returns (EventSubResponse) {} } message EventSubRequest { diff --git a/lib-utilities/services/pluginTask.go b/lib-utilities/services/pluginTask.go index db72937c0..7a26e46e8 100644 --- a/lib-utilities/services/pluginTask.go +++ b/lib-utilities/services/pluginTask.go @@ -65,6 +65,25 @@ func SaveEventSubscriptionID(ctx context.Context, taskID, subscriptionID string) return nil } +// SaveEventSubscriptionID saves the event subscription id in db +// it receives taskID and subscription ID. taskID is the key. +func GetEventSubscriptionID(ctx context.Context, taskID string) (string, error) { + + table := "EventSubscriptionID" + connPool, err := redis.GetDBConnection(redis.InMemory) + if err != nil { + return "", errors.PackError(err.ErrNo(), "error while trying to connecting"+ + " to DB: ", err.Error()) + } + + value, err := connPool.Read(table, taskID) + if err != nil { + return "", errors.PackError(err.ErrNo(), "error while trying to insert"+ + " plugin task: ", err.Error()) + } + return value, nil +} + // createPluginTask will insert plugin task info in DB func createPluginTask(ctx context.Context, key string, value interface{}) *errors.Error { diff --git a/svc-aggregation/agmessagebus/publish_test.go b/svc-aggregation/agmessagebus/publish_test.go index 4b73bd749..e1cc1c7d8 100644 --- a/svc-aggregation/agmessagebus/publish_test.go +++ b/svc-aggregation/agmessagebus/publish_test.go @@ -134,7 +134,6 @@ func TestPublish(t *testing.T) { assert.NotEqual(t, nil, err, "Error should not be Nil for this scenario") } else { - fmt.Println(err) assert.Equal(t, nil, err, "Error should be Nil for this scenario") } diff --git a/svc-aggregation/go.mod b/svc-aggregation/go.mod index 3d6b178c6..7bc5afdab 100644 --- a/svc-aggregation/go.mod +++ b/svc-aggregation/go.mod @@ -18,7 +18,7 @@ require ( github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53 // indirect github.com/CloudyKit/jet/v6 v6.2.0 // indirect github.com/Joker/jade v1.1.3 // indirect - github.com/ODIM-Project/ODIM/lib-persistence-manager v0.0.0-20201201072448-9772421f1b55 // indirect + github.com/ODIM-Project/ODIM/lib-persistence-manager v0.0.0-20230719110936-f43048b6407a // indirect github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/aymerick/douceur v0.2.0 // indirect diff --git a/svc-aggregation/rpc/common_test.go b/svc-aggregation/rpc/common_test.go index 52efa2f34..00df3d589 100644 --- a/svc-aggregation/rpc/common_test.go +++ b/svc-aggregation/rpc/common_test.go @@ -119,7 +119,7 @@ func deleteSystemforTest(key string) *errors.Error { return nil } -func mockDeleteSubscription(ctx context.Context, uuid string) (*eventsproto.EventSubResponse, error) { +func mockDeleteSubscription(ctx context.Context, uuid string, sessionName string) (*eventsproto.EventSubResponse, error) { if uuid == "/redfish/v1/systems/delete-subscription-error.1" { return nil, fmt.Errorf("error while trying to delete event subcription") } else if uuid == "/redfish/v1/systems/unexpected-statuscode.1" { diff --git a/svc-aggregation/system/deleteaggregationsource_test.go b/svc-aggregation/system/deleteaggregationsource_test.go index 178209106..7c8875450 100644 --- a/svc-aggregation/system/deleteaggregationsource_test.go +++ b/svc-aggregation/system/deleteaggregationsource_test.go @@ -66,7 +66,7 @@ func deleteSystemforTest(key string) *errors.Error { return nil } -func mockDeleteSubscription(ctx context.Context, uuid string) (*eventsproto.EventSubResponse, error) { +func mockDeleteSubscription(ctx context.Context, uuid, sessionId string) (*eventsproto.EventSubResponse, error) { if uuid == "/redfish/v1/Systems/ef83e569-7336-492a-aaee-31c02d9db832.1" { return nil, fmt.Errorf("error while trying to delete event subcription") } else if uuid == "/redfish/v1/Systems/unexpected-statuscode.1" { @@ -153,7 +153,8 @@ func mockLogServicesCollectionData(id string, data map[string]interface{}) error func TestDeleteAggregationSourceWithRediscovery(t *testing.T) { d := getMockExternalInterface() type args struct { - req *aggregatorproto.AggregatorRequest + req *aggregatorproto.AggregatorRequest + sessionName string } config.SetUpMockConfig(t) defer func() { @@ -227,13 +228,14 @@ func TestDeleteAggregationSourceWithRediscovery(t *testing.T) { SessionToken: "SessionToken", URL: "/redfish/v1/AggregationService/AggregationSources/ef83e569-7336-492a-aaee-31c02d9db831", }, + sessionName: "dummy", }, want: http.StatusNotAcceptable, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := d.DeleteAggregationSource(ctx, tt.args.req) + got := d.DeleteAggregationSource(ctx, tt.args.req, tt.args.sessionName) if got.StatusCode != tt.want { t.Errorf("DeleteAggregationSource() = %v, want %v", got, tt.want) } @@ -338,7 +340,8 @@ func TestExternalInterface_DeleteAggregationSourceManager(t *testing.T) { t.Fatalf("error: %v", err) } type args struct { - req *aggregatorproto.AggregatorRequest + req *aggregatorproto.AggregatorRequest + sessionName string } tests := []struct { name string @@ -352,6 +355,7 @@ func TestExternalInterface_DeleteAggregationSourceManager(t *testing.T) { SessionToken: "SessionToken", URL: "/redfish/v1/AggregationService/AggregationSources/123456", }, + sessionName: "dummy", }, want: http.StatusNoContent, }, @@ -362,6 +366,7 @@ func TestExternalInterface_DeleteAggregationSourceManager(t *testing.T) { SessionToken: "SessionToken", URL: "/redfish/v1/AggregationService/AggregationSources/123455", }, + sessionName: "dummy", }, want: http.StatusNotAcceptable, }, @@ -372,6 +377,7 @@ func TestExternalInterface_DeleteAggregationSourceManager(t *testing.T) { SessionToken: "SessionToken", URL: "/redfish/v1/AggregationService/AggregationSources/123434", }, + sessionName: "dummy", }, want: http.StatusNotFound, }, @@ -382,13 +388,14 @@ func TestExternalInterface_DeleteAggregationSourceManager(t *testing.T) { SessionToken: "SessionToken", URL: "/redfish/v1/AggregationService/AggregationSources/123457", }, + sessionName: "dummy", }, want: http.StatusNotAcceptable, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := d.DeleteAggregationSource(ctx, tt.args.req) + got := d.DeleteAggregationSource(ctx, tt.args.req, tt.args.sessionName) if got.StatusCode != tt.want { t.Errorf("DeleteAggregationSource() = %v, want %v", got, tt.want) } @@ -478,7 +485,8 @@ func TestExternalInterface_DeleteBMC(t *testing.T) { t.Fatalf("error: %v", err) } type args struct { - req *aggregatorproto.AggregatorRequest + req *aggregatorproto.AggregatorRequest + sessionName string } tests := []struct { name string @@ -492,6 +500,7 @@ func TestExternalInterface_DeleteBMC(t *testing.T) { SessionToken: "SessionToken", URL: "/redfish/v1/AggregationService/AggregationSources/ef83e569-7336-492a-aaee-31c02d9db831", }, + sessionName: "dummy", }, want: http.StatusNoContent, }, @@ -502,13 +511,14 @@ func TestExternalInterface_DeleteBMC(t *testing.T) { SessionToken: "SessionToken", URL: "/redfish/v1/AggregationService/AggregationSources/ef83e569-7336-492a-aaee-31c02d9db832", }, + sessionName: "dummy", }, want: http.StatusInternalServerError, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := d.DeleteAggregationSource(ctx, tt.args.req) + got := d.DeleteAggregationSource(ctx, tt.args.req, tt.args.sessionName) if got.StatusCode != tt.want { t.Errorf("DeleteAggregationSource() = %v, want %v", got, tt.want) } diff --git a/svc-api/rpc/fakeStruct_testfile.go b/svc-api/rpc/fakeStruct_testfile.go index cf8a6ae65..ad997b68d 100644 --- a/svc-api/rpc/fakeStruct_testfile.go +++ b/svc-api/rpc/fakeStruct_testfile.go @@ -240,6 +240,9 @@ func (fakeStruct) SubscribeEMB(ctx context.Context, in *eventsproto.SubscribeEMB func (fakeStruct) UpdateSubscriptionLocationRPC(ctx context.Context, in *eventsproto.UpdateSubscriptionLocation, opts ...grpc.CallOption) (*eventsproto.SubscribeEMBResponse, error) { return nil, errors.New("fakeError") } +func (fakeStruct) DeleteSubscription(ctx context.Context, in *eventsproto.EventRequest, opts ...grpc.CallOption) (*eventsproto.EventSubResponse, error) { + return nil, errors.New("fakeError") +} //--------------------------------------FABRICS-------------------------------------- diff --git a/svc-events/events/deletesubscription.go b/svc-events/events/deletesubscription.go index c60bd753c..ac53bb25f 100644 --- a/svc-events/events/deletesubscription.go +++ b/svc-events/events/deletesubscription.go @@ -112,7 +112,6 @@ func (e *ExternalInterfaces) DeleteEventSubscriptions(ctx context.Context, req * // Delete Event Subscription from device also err = e.deleteSubscription(ctx, target, originResource) - if err != nil { l.LogWithFields(ctx).Error("error while deleting event subscription details : " + err.Error()) msgArgs := []interface{}{"Host", target.ManagerAddress} @@ -263,9 +262,9 @@ func (e *ExternalInterfaces) DeleteEventSubscriptionsDetails(ctx context.Context return resp } - if isStatusAccepted { services.SaveEventSubscriptionID(ctx, taskId, evtSubscription.SubscriptionID) + resp.StatusCode = http.StatusAccepted return resp } @@ -294,8 +293,8 @@ func (e *ExternalInterfaces) DeleteEventSubscriptionsDetails(ctx context.Context commonResponse.CreateGenericResponse(resp.StatusMessage) resp.Body = commonResponse - e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.OK, - common.Completed, percentComplete, http.MethodDelete)) + err = e.UpdateTask(ctx, fillTaskData(taskId, targetURI, string(req.EventSubscriptionID), resp, common.Completed, + common.OK, percentComplete, http.MethodDelete)) return resp } @@ -378,8 +377,7 @@ func (e *ExternalInterfaces) deleteAndReSubscribeToEvents(ctx context.Context, e Protocol: protocol, Destination: destination, } - - isStatusAccepted, err = e.subscribe(ctx, subscriptionPost, origin.Oid, deleteFlag, sessionToken, sessionUserName, taskID) + isStatusAccepted, err = e.subscribe(ctx, subscriptionPost, origin.Oid, deleteFlag, sessionToken, sessionUserName, taskID, evtSubscription.SubscriptionID) if err != nil { return isStatusAccepted, err } @@ -417,17 +415,26 @@ func isCollectionOriginResourceURI(origin string) bool { // Subscribe to the Event Subscription func (e *ExternalInterfaces) subscribe(ctx context.Context, subscriptionPost model.EventDestination, origin string, - deleteFlag bool, sessionToken string, sessionUserName string, taskId string) (bool, error) { + deleteFlag bool, sessionToken string, sessionUserName string, taskId, subscriptionId string) (bool, error) { var isStatusAccepted bool if strings.Contains(origin, "Fabrics") { return isStatusAccepted, e.resubscribeFabricsSubscription(ctx, subscriptionPost, origin, sessionUserName, taskId, deleteFlag) } if strings.Contains(origin, "/redfish/v1/AggregationService/Aggregates") { - return isStatusAccepted, e.resubscribeAggregateSubscription(ctx, subscriptionPost, origin, deleteFlag, sessionToken, sessionUserName, taskId) + return isStatusAccepted, e.resubscribeAggregateSubscription(ctx, subscriptionPost, origin, deleteFlag, sessionToken, sessionUserName, taskId, subscriptionId) } originResource := origin if isCollectionOriginResourceURI(originResource) { + isStatusAccepted = true l.LogWithFields(ctx).Error("Collection of origin resource:" + originResource) + subtaskID := e.CreateSubTask(ctx, sessionUserName, taskId) + resp := response.RPC{} + resp.StatusCode = 200 + resp.Body = subscriptionPost + services.SaveEventSubscriptionID(ctx, taskId, subscriptionId) + + e.UpdateTask(ctx, fillTaskData(subtaskID, originResource, "", resp, common.Completed, + common.OK, 100, http.MethodDelete)) return isStatusAccepted, nil } target, _, err := e.getTargetDetails(originResource) @@ -444,10 +451,10 @@ func (e *ExternalInterfaces) subscribe(ctx context.Context, subscriptionPost mod return isStatusAccepted, fmt.Errorf("error while marshalling subscription details: %s", err) } target.PostBody = postBody - // _, err = e.DeleteSubscriptions(ctx, origin, "", plugin, target) - // if err != nil { - // return err - // } + err = e.GetSubscriptionLocation(ctx, target) + if err != nil { + return isStatusAccepted, err + } // if deleteFlag is true then only one document is there // so don't re subscribe again @@ -819,14 +826,14 @@ func getAggregateSystemList(origin string, sessionToken string) ([]model.Link, e // resubscribeAggregateSubscription method subscribe event for // aggregate system members func (e *ExternalInterfaces) resubscribeAggregateSubscription(ctx context.Context, subscriptionPost model.EventDestination, - origin string, deleteFlag bool, sessionToken string, sessionUserName string, taskId string) error { + origin string, deleteFlag bool, sessionToken string, sessionUserName string, taskId, subscriptionId string) error { originResource := origin systems, err := getAggregateSystemList(originResource, sessionToken) if err != nil { return nil } for _, system := range systems { - _, err = e.subscribe(ctx, subscriptionPost, system.Oid, deleteFlag, sessionToken, sessionUserName, taskId) + _, err = e.subscribe(ctx, subscriptionPost, system.Oid, deleteFlag, sessionToken, sessionUserName, taskId, subscriptionId) if err != nil { return err } diff --git a/svc-events/events/deletesubscription_test.go b/svc-events/events/deletesubscription_test.go index 6d79ec3d9..c2230a2fb 100644 --- a/svc-events/events/deletesubscription_test.go +++ b/svc-events/events/deletesubscription_test.go @@ -50,18 +50,18 @@ func TestDeleteEventSubscription(t *testing.T) { SessionToken: "validToken", EventSubscriptionID: "81de0110-c35a-4859-984c-072d6c5a32d7", } - resp := pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req) + resp := pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req, "admin", "1225122") data := resp.Body.(response.Response) assert.Equal(t, http.StatusOK, int(resp.StatusCode), "Status Code should be StatusOK") assert.Equal(t, "81de0110-c35a-4859-984c-072d6c5a32d7", data.ID, "ID should be 81de0110-c35a-4859-984c-072d6c5a32d7") pc.DB.GetEvtSubscriptions = func(s string) ([]evmodel.SubscriptionResource, error) { return nil, &errors.Error{} } - resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req) + resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req, "admin", "1225122") assert.Equal(t, http.StatusBadRequest, int(resp.StatusCode), "Status Code should be StatusOK") pc = getMockMethods() pc.DB.DeleteEvtSubscription = func(s string) error { return &errors.Error{} } - resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req) + resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req, "admin", "1225122") assert.Equal(t, http.StatusBadRequest, int(resp.StatusCode), "Status Code should be StatusOK") pc = getMockMethods() // positive test case with basic auth type @@ -69,7 +69,7 @@ func TestDeleteEventSubscription(t *testing.T) { SessionToken: "validToken", EventSubscriptionID: "71de0110-c35a-4859-984c-072d6c5a32d8", } - resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req) + resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req, "admin", "1225122") assert.Equal(t, http.StatusOK, int(resp.StatusCode), "Status Code should be StatusOK") // positive test case deletion of collection subscription @@ -77,8 +77,8 @@ func TestDeleteEventSubscription(t *testing.T) { SessionToken: "validToken", EventSubscriptionID: "71de0110-c35a-4859-984c-072d6c5a3211", } - resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req) - assert.Equal(t, http.StatusOK, int(resp.StatusCode), "Status Code should be StatusOK") + resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req, "admin", "1225122") + assert.Equal(t, http.StatusAccepted, int(resp.StatusCode), "Status Code should be StatusOK") // Negative test cases // if subscription id is bot present @@ -86,14 +86,14 @@ func TestDeleteEventSubscription(t *testing.T) { SessionToken: "validToken", EventSubscriptionID: "de018110-4859-984c-c35a-0a32d772d6c5", } - resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req1) + resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req1, "admin", "1225122") assert.Equal(t, http.StatusNotFound, int(resp.StatusCode), "Status Code should be StatusNotFound") // Invalid token req2 := &eventsproto.EventRequest{ SessionToken: "InValidToken", } - resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req2) + resp = pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req2, "admin", "1225122") assert.Equal(t, http.StatusUnauthorized, int(resp.StatusCode), "Status Code should be StatusUnauthorized") } @@ -110,7 +110,7 @@ func TestDeleteEventSubscriptionOnDeletedServer(t *testing.T) { SessionToken: "validToken", UUID: "/redfish/v1/Systems/6d4a0a66-7efa-578e-83cf-44dc68d2874e.1", } - resp := pc.DeleteEventSubscriptions(evcommon.MockContext(), req) + resp := pc.DeleteEventSubscriptions(evcommon.MockContext(), req, "1225122") assert.Equal(t, http.StatusNoContent, int(resp.StatusCode), "Status Code should be StatusNoContent") // Negative test cases @@ -119,7 +119,7 @@ func TestDeleteEventSubscriptionOnDeletedServer(t *testing.T) { SessionToken: "validToken", UUID: "de018110-4859-984c-c35a-0a32d772d6c5", } - resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req1) + resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req1, "1225122") assert.Equal(t, http.StatusBadRequest, int(resp.StatusCode), "Status Code should be StatusNotFound") // if UUID is is not present in DB @@ -128,20 +128,20 @@ func TestDeleteEventSubscriptionOnDeletedServer(t *testing.T) { UUID: "/redfish/v1/Systems/de018110-4859-984c-c35a-0a32d772d6c5.1", } - resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req1) + resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req1, "1225122") assert.Equal(t, http.StatusBadRequest, int(resp.StatusCode), "Status Code should be StatusNotFound") req = &eventsproto.EventRequest{ SessionToken: "validToken", UUID: "/redfish/v1/Systems/abab09db-e7a9-4352-8df0-5e41315a2a4c.1", } - resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req) + resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req, "1225122") assert.Equal(t, http.StatusBadRequest, int(resp.StatusCode), "Status Code should be StatusNotFound") GetIPFromHostNameFunc = func(fqdn string) (string, error) { return "", fmt.Errorf("Not Found") } - resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req) + resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req, "1225122") assert.Equal(t, http.StatusNotFound, int(resp.StatusCode), "Status Code should be ResourceNotFound") GetIPFromHostNameFunc = func(fqdn string) (string, error) { @@ -150,7 +150,7 @@ func TestDeleteEventSubscriptionOnDeletedServer(t *testing.T) { pc.DB.GetEvtSubscriptions = func(s string) ([]evmodel.SubscriptionResource, error) { return nil, &errors.Error{} } - resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req) + resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req, "1225122") assert.Equal(t, http.StatusNotFound, int(resp.StatusCode), "Status Code should be ResourceNotFound") pc = getMockMethods() @@ -163,7 +163,7 @@ func TestDeleteEventSubscriptionOnDeletedServer(t *testing.T) { SessionToken: "validToken", UUID: "/redfish/v1/Systems/6d4a0a66-7efa-578e-83cf-44dc68d2874e.1", } - resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req) + resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req, "1225122") assert.Equal(t, http.StatusInternalServerError, int(resp.StatusCode), "Status Code should be StatusNoContent") DecryptWithPrivateKeyFunc = func(ciphertext []byte) ([]byte, error) { @@ -172,14 +172,14 @@ func TestDeleteEventSubscriptionOnDeletedServer(t *testing.T) { pc.DB.GetDeviceSubscriptions = func(s string) (*common.DeviceSubscription, error) { return nil, &errors.Error{} } - resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req) + resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req, "1225122") assert.Equal(t, http.StatusBadRequest, int(resp.StatusCode), "Status Code should be StatusNotFound") pc = getMockMethods() pc.DB.DeleteEvtSubscription = func(s string) error { return &errors.Error{} } - resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req) + resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req, "1225122") assert.Equal(t, http.StatusBadRequest, int(resp.StatusCode), "Status Code should be StatusNoContent") pc = getMockMethods() @@ -188,7 +188,7 @@ func TestDeleteEventSubscriptionOnDeletedServer(t *testing.T) { } req.UUID = "/redfish/v1/Systems/6d4a0a66-7efa-578e-83cf-44dc68d2874d.1" - resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req) + resp = pc.DeleteEventSubscriptions(evcommon.MockContext(), req, "1225122") assert.Equal(t, http.StatusBadRequest, int(resp.StatusCode), "Status Code should be StatusNoContent") } @@ -206,7 +206,7 @@ func TestDeleteEventSubscriptionOnFabrics(t *testing.T) { SessionToken: "validToken", EventSubscriptionID: "71de0110-c35a-4859-984c-072d6c5a32d9", } - resp := pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req) + resp := pc.DeleteEventSubscriptionsDetails(evcommon.MockContext(), req, "admin", "test") assert.Equal(t, http.StatusOK, int(resp.StatusCode), "Status Code should be StatusOK") } @@ -376,9 +376,9 @@ func TestExternalInterfaces_resubscribeFabricsSubscription(t *testing.T) { config.SetUpMockConfig(t) pc := getMockMethods() event := model.EventDestination{} - err := pc.resubscribeFabricsSubscription(evcommon.MockContext(), event, "/fabric/scascascsaa", false) + err := pc.resubscribeFabricsSubscription(evcommon.MockContext(), event, "/fabric/scascascsaa", "admin", "test", false) assert.Nil(t, err) - err = pc.resubscribeFabricsSubscription(evcommon.MockContext(), event, "/redfish/v1/Fabrics/6d4a0a66-7efa-578e-83cf-44dc68d2874e", false) + err = pc.resubscribeFabricsSubscription(evcommon.MockContext(), event, "/redfish/v1/Fabrics/6d4a0a66-7efa-578e-83cf-44dc68d2874e", "admin", "test", false) assert.Nil(t, err) } @@ -387,7 +387,7 @@ func TestExternalInterfaces_subscribe(t *testing.T) { pc := getMockMethods() event := model.EventDestination{} - err := pc.subscribe(evcommon.MockContext(), event, "/redfish/v1/Systems/6d4a0a66-7efa-578e-83cf-44dc68d2874e.1", false, "valid") + _, err := pc.subscribe(evcommon.MockContext(), event, "/redfish/v1/Systems/6d4a0a66-7efa-578e-83cf-44dc68d2874e.1", false, "valid", "admin", "test", "test") assert.Nil(t, err) } diff --git a/svc-events/events/evtsubscription.go b/svc-events/events/evtsubscription.go index 00c5e2410..1e953e00c 100644 --- a/svc-events/events/evtsubscription.go +++ b/svc-events/events/evtsubscription.go @@ -816,6 +816,30 @@ func (e *ExternalInterfaces) DeleteSubscriptions(ctx context.Context, originReso return resp, nil } +// DeleteSubscriptions will delete subscription from device +func (e *ExternalInterfaces) GetSubscriptionLocation(ctx context.Context, target *common.Target) error { + var err error + var deviceSubscription *common.DeviceSubscription + addr, err := common.GetIPFromHostName(target.ManagerAddress) + if err != nil { + return err + } + searchKey := evcommon.GetSearchKey(addr, evmodel.DeviceSubscriptionIndex) + deviceSubscription, err = e.GetDeviceSubscriptions(searchKey) + + if err != nil { + // if its first subscription then no need to check events subscribed + if strings.Contains(err.Error(), "No data found for the key") { + return nil + } + errorMessage := "Error while get subscription details: " + err.Error() + + l.LogWithFields(ctx).Error(errorMessage) + return err + } + target.Location = deviceSubscription.Location + return nil +} func (e *ExternalInterfaces) createEventSubscription(ctx context.Context, taskID string, subTaskID string, subTaskChan chan<- int32, reqSessionToken string, targetURI string, request model.EventDestination, originResource string, result *evresponse.MutexLock, wg *sync.WaitGroup, collectionFlag bool, collectionName string, aggregateResource string, isAggregateCollection bool) { diff --git a/svc-events/rpc/events.go b/svc-events/rpc/events.go index d2aaf50a3..c13bab571 100644 --- a/svc-events/rpc/events.go +++ b/svc-events/rpc/events.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "os" + "strings" "github.com/ODIM-Project/ODIM/lib-rest-client/pmbhandle" "github.com/ODIM-Project/ODIM/lib-utilities/common" @@ -302,20 +303,39 @@ func (e *Events) DeleteEventSubscription(ctx context.Context, req *eventsproto.E ctx = common.GetContextData(ctx) ctx = common.ModifyContext(ctx, common.EventService, podName) var ( - taskID string - resp eventsproto.EventSubResponse + // taskID string + resp eventsproto.EventSubResponse ) - sessionUserName, taskID, err := e.AuthorizeAndCreateTask(ctx, req.SessionToken, &resp) - if err != nil { - return &resp, err - } - if req.UUID == "" { // Delete Event Subscription when admin requested + sessionUserName, taskID, err := e.AuthorizeAndCreateTask(ctx, req.SessionToken, &resp) + if err != nil { + return &resp, err + } go e.Connector.DeleteEventSubscriptionsDetails(ctx, req, sessionUserName, taskID) } else { // Delete Event Subscription to Device when Server get Deleted + // Task Service using RPC and get the taskID + taskURI, err := e.Connector.CreateTask(ctx, req.SessionToken) + if err != nil { + resp = eventsproto.EventSubResponse{} + errMsg := "Unable to create task: " + err.Error() + // // generateResponse(common.GeneralError(http.StatusInternalServerError, response.InternalError, errMsg, nil, nil), resp) + l.LogWithFields(ctx).Error(errMsg) + return &resp, nil + } + var taskID string + strArray := strings.Split(taskURI, "/") + if strings.HasSuffix(taskURI, "/") { + taskID = strArray[len(strArray)-2] + } else { + taskID = strArray[len(strArray)-1] + } + resp.StatusCode = http.StatusAccepted + resp.Header = map[string]string{ + "Location": "/taskmon/" + taskID, + } go e.Connector.DeleteEventSubscriptions(ctx, req, taskID) } return &resp, nil @@ -409,3 +429,9 @@ func (e *Events) UpdateSubscriptionLocationRPC(ctx context.Context, in *eventspr resp.Status = isUpdated return &resp, nil } + +func (e *Events) DeleteSubscription(ctx context.Context, in *eventsproto.EventRequest) (*eventsproto.EventSubResponse, error) { + var resp eventsproto.EventSubResponse + err := e.Connector.DeleteEvtSubscription(in.EventSubscriptionID) + return &resp, err +} diff --git a/svc-events/rpc/events_test.go b/svc-events/rpc/events_test.go index 2b6f18cbb..a894f715e 100644 --- a/svc-events/rpc/events_test.go +++ b/svc-events/rpc/events_test.go @@ -244,17 +244,17 @@ func TestDeleteEventSubscription(t *testing.T) { resp, err := events.DeleteEventSubscription(evcommon.MockContext(), req) assert.Nil(t, err, "There should be no error") - assert.Equal(t, int(resp.StatusCode), http.StatusOK, "Status code should be StatusOK.") + assert.Equal(t, http.StatusAccepted, int(resp.StatusCode), "Status code should be Accepted.") req.EventSubscriptionID = "81de0110" delResp, _ := events.DeleteEventSubscription(evcommon.MockContext(), req) - assert.Equal(t, int(delResp.StatusCode), http.StatusNotFound, "Status code should be StatusNotFound.") + assert.Equal(t, int(delResp.StatusCode), http.StatusAccepted, "Status code should be StatusNotFound.") JSONMarshal = func(v interface{}) ([]byte, error) { return nil, fmt.Errorf("") } resp, err = events.DeleteEventSubscription(evcommon.MockContext(), req) assert.Nil(t, err, "There should be an error") - assert.Equal(t, int(resp.StatusCode), http.StatusInternalServerError, "Status code should be StatusInternalServerError.") + assert.Equal(t, http.StatusAccepted, int(resp.StatusCode), "Status code should be StatusInternalServerError.") JSONMarshal = func(v interface{}) ([]byte, error) { return json.Marshal(v) } } @@ -264,18 +264,18 @@ func TestDeleteEventSubscriptionwithUUID(t *testing.T) { events := getMockPluginContactInitializer() // Positive test cases req := &eventsproto.EventRequest{ - SessionToken: "validToken", + SessionToken: "admin", UUID: "/redfish/v1/Systems/6d4a0a66-7efa-578e-83cf-44dc68d2874e.1", } resp, err := events.DeleteEventSubscription(evcommon.MockContext(), req) assert.Nil(t, err, "There should be no error") - assert.Equal(t, int(resp.StatusCode), http.StatusNoContent, "Status code should be StatusNoContent.") + assert.Equal(t, http.StatusAccepted, int(resp.StatusCode), "Status code should be StatusAccepted.") req.UUID = "81de0110" delResp, _ := events.DeleteEventSubscription(evcommon.MockContext(), req) - assert.Equal(t, int(delResp.StatusCode), http.StatusBadRequest, "Status code should be StatusBadRequest.") + assert.Equal(t, http.StatusAccepted, int(delResp.StatusCode), "Status code should be StatusAccepted.") } func TestCreateDefaultSubscriptions(t *testing.T) { diff --git a/svc-task/tcommon/common.go b/svc-task/tcommon/common.go index 75f2572e4..f4c2b7bc7 100644 --- a/svc-task/tcommon/common.go +++ b/svc-task/tcommon/common.go @@ -131,6 +131,26 @@ func UpdateSubscriptionLocation(ctx context.Context, location, host string) { l.LogWithFields(ctx).Debug("Location update status ", isUpdated) } +// DeleteSubscription do the RPC call to events service to delete subscription +// once a delete event subscription is completed +func DeleteSubscription(ctx context.Context, subscriptionID string) { + conn, err := services.ODIMService.Client(services.Events) + if err != nil { + l.LogWithFields(ctx).Error("Error while Event ", err.Error()) + return + } + defer conn.Close() + event := eventproto.NewEventsClient(conn) + _, err = event.DeleteSubscription(ctx, &eventproto.EventRequest{ + EventSubscriptionID: subscriptionID, + }) + if err != nil { + l.LogWithFields(ctx).Info("Error while delete subscription ", err) + return + } + l.LogWithFields(ctx).Debug("Subscription delete completed ", subscriptionID) +} + // UpdateAccount do the RPC call to events service to update account details func UpdateRemoteAccount(ctx context.Context, location, host string) { conn, err := services.ODIMService.Client(services.Managers) diff --git a/svc-task/thandle/thandle.go b/svc-task/thandle/thandle.go index f5e7e90a0..66b27ca83 100644 --- a/svc-task/thandle/thandle.go +++ b/svc-task/thandle/thandle.go @@ -32,6 +32,7 @@ import ( l "github.com/ODIM-Project/ODIM/lib-utilities/logs" taskproto "github.com/ODIM-Project/ODIM/lib-utilities/proto/task" "github.com/ODIM-Project/ODIM/lib-utilities/response" + "github.com/ODIM-Project/ODIM/lib-utilities/services" "github.com/ODIM-Project/ODIM/svc-task/tcommon" "github.com/ODIM-Project/ODIM/svc-task/tmodel" "github.com/ODIM-Project/ODIM/svc-task/tresponse" @@ -1252,6 +1253,9 @@ func (ts *TasksRPC) updateParentTask(ctx context.Context, taskID, taskStatus, ta parentTask.TaskFinalResponse = nil parentTask.StatusCode = http.StatusCreated } + if value, err := services.GetEventSubscriptionID(ctx, parentTask.ID); err == nil { + tcommon.DeleteSubscription(ctx, value) + } ts.updateTaskToCompleted(parentTask) return nil } @@ -1299,6 +1303,9 @@ func (ts *TasksRPC) validateChildTasksAndUpdateParentTask(ctx context.Context, c parentTask.TaskFinalResponse = nil parentTask.StatusCode = http.StatusCreated } + if value, err := services.GetEventSubscriptionID(ctx, parentTask.ID); err == nil { + tcommon.DeleteSubscription(ctx, value) + } ts.updateTaskToCompleted(parentTask) }