Skip to content

Commit

Permalink
add postgresql db connector
Browse files Browse the repository at this point in the history
  • Loading branch information
AmaliMatharaarachchi committed Mar 6, 2024
1 parent 843dcfa commit 5f5f99d
Show file tree
Hide file tree
Showing 24 changed files with 967 additions and 278 deletions.
10 changes: 2 additions & 8 deletions common-controller/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,9 @@ func main() {
go server.StartInternalServer()
if conf.CommonController.Database.Enabled {
logger.Info("Starting the Database connection")
go startDB()
database.ConnectToDB()
}
defer database.CloseDBConn()
logger.Info("Starting the Common Controller")
commoncontroller.InitCommonControllerServer(conf)

}

func startDB() {
database.ConnectToDB()
database.GetApplicationByUUID()
defer database.CloseDBConn()
}
2 changes: 1 addition & 1 deletion common-controller/internal/config/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var defaultConfig = &Config{
RetryInterval: 5,
Persistence: persistence{Type: "K8s"}},
Database: database{
Enabled: true,
Enabled: false,
Name: "DATAPLANE",
Username: "wso2carbon",
Password: "wso2carbon",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ArtifactDeployer interface {
DeployApplicationMappings(applicationMapping server.ApplicationMapping) error
UpdateApplicationMappings(applicationMapping server.ApplicationMapping) error
DeployKeyMappings(keyMapping server.ApplicationKeyMapping) error
UpdateKeyMappings(keyMapping server.ApplicationKeyMapping) error
GetApplication(applicationID string) (server.Application, error)
GetSubscription(subscriptionID string) (server.Subscription, error)
GetApplicationMappings(applicationID string) (server.ApplicationMapping, error)
Expand Down
32 changes: 26 additions & 6 deletions common-controller/internal/controlplane/controlplane_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (controlPlaneGrpcClient *Agent) initializeGrpcStreaming() *grpc.ClientConn
}
func (controlPlaneGrpcClient *Agent) handleEvents(event *subscription.Event) {
loggers.LoggerAPKOperator.Infof("Received event %s", event.Type)
if event.Type == constants.AllEvnts {
if event.Type == constants.AllEvents {
go controlPlaneGrpcClient.retrieveAllData()
} else if event.Type == constants.ApplicationCreated {
loggers.LoggerAPKOperator.Infof("Received APPLICATION_CREATED event.")
Expand Down Expand Up @@ -296,7 +296,19 @@ func (controlPlaneGrpcClient *Agent) handleEvents(event *subscription.Event) {
loggers.LoggerAPKOperator.Infof("Received ApplicationMapping %s", applicationMapping.UUID)
controlPlaneGrpcClient.artifactDeployer.UpdateApplicationMappings(applicationMapping)
}

} else if event.Type == constants.ApplicationKeyMappingUpdated {
loggers.LoggerAPKOperator.Infof("Received APPLICATION_KEY_MAPPING_UPDATED event.")
if event.ApplicationKeyMapping != nil {
applicationKeyMapping := server.ApplicationKeyMapping{ApplicationUUID: event.ApplicationKeyMapping.ApplicationUUID,
SecurityScheme: event.ApplicationKeyMapping.SecurityScheme,
ApplicationIdentifier: event.ApplicationKeyMapping.ApplicationIdentifier,
KeyType: event.ApplicationKeyMapping.KeyType,
EnvID: event.ApplicationKeyMapping.EnvID,
OrganizationID: event.ApplicationKeyMapping.Organization,
}
loggers.LoggerAPKOperator.Infof("Received ApplicationKeyMapping %s", applicationKeyMapping.ApplicationUUID)
controlPlaneGrpcClient.artifactDeployer.UpdateKeyMappings(applicationKeyMapping)
}
}
}
func (controlPlaneGrpcClient *Agent) retrieveAllData() {
Expand Down Expand Up @@ -429,20 +441,28 @@ func (controlPlaneGrpcClient *Agent) retrieveDataFromResponseChannel(response re
loggers.LoggerAPI.Infof("Received Subscription information.")
subList := newResponse.(*SubscriptionList)
resolvedSubscriptionList := marshalMultipleSubscriptions(subList)
controlPlaneGrpcClient.artifactDeployer.DeployAllSubscriptions(resolvedSubscriptionList)
if len(resolvedSubscriptionList.List) > 0 {
controlPlaneGrpcClient.artifactDeployer.DeployAllSubscriptions(resolvedSubscriptionList)
}

case *ApplicationList:
loggers.LoggerAPI.Infof("Received Application information.")
appList := newResponse.(*ApplicationList)
resolvedApplicationList := marshalMultipleApplications(appList)
resolvedApplicationKeyMappingList := marshalMultipleApplicationKeyMappings(appList)
controlPlaneGrpcClient.artifactDeployer.DeployAllApplications(resolvedApplicationList)
controlPlaneGrpcClient.artifactDeployer.DeployAllKeyMappings(resolvedApplicationKeyMappingList)
if len(resolvedApplicationList.List) > 0 {
controlPlaneGrpcClient.artifactDeployer.DeployAllApplications(resolvedApplicationList)
}
if len(resolvedApplicationKeyMappingList.List) > 0 {
controlPlaneGrpcClient.artifactDeployer.DeployAllKeyMappings(resolvedApplicationKeyMappingList)
}
case *ApplicationMappingList:
loggers.LoggerAPI.Infof("Received Application Mapping information.")
appMappingList := newResponse.(*ApplicationMappingList)
resolvedApplicationMappingList := marshalMultipleApplicationMappings(appMappingList)
controlPlaneGrpcClient.artifactDeployer.DeployAllApplicationMappings(resolvedApplicationMappingList)
if len(resolvedApplicationMappingList.List) > 0 {
controlPlaneGrpcClient.artifactDeployer.DeployAllApplicationMappings(resolvedApplicationMappingList)
}
default:
loggers.LoggerAPI.Debugf("Unknown type %T", t)
}
Expand Down
10 changes: 8 additions & 2 deletions common-controller/internal/controlplane/k8s_artifact_deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/wso2/apk/common-controller/internal/loggers"
"github.com/wso2/apk/common-controller/internal/server"
cpv1alpha2 "github.com/wso2/apk/common-go-libs/apis/cp/v1alpha2"
"github.com/wso2/apk/common-go-libs/constants"
"github.com/wso2/apk/common-go-libs/utils"
k8error "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -96,6 +97,11 @@ func (k8sArtifactDeployer K8sArtifactDeployer) UpdateApplication(application ser
return nil
}

// UpdateKeyMappings updates a key mapping
func (k8sArtifactDeployer K8sArtifactDeployer) UpdateKeyMappings(keyMapping server.ApplicationKeyMapping) error {
return nil
}

// DeploySubscription deploys a subscription
func (k8sArtifactDeployer K8sArtifactDeployer) DeploySubscription(subscription server.Subscription) error {
crSubscription := cpv1alpha2.Subscription{ObjectMeta: v1.ObjectMeta{Name: subscription.UUID, Namespace: utils.GetOperatorPodNamespace()},
Expand Down Expand Up @@ -151,7 +157,7 @@ func (k8sArtifactDeployer K8sArtifactDeployer) DeployKeyMappings(keyMapping serv
if crApplication.Spec.SecuritySchemes != nil {
securitySchemes = *crApplication.Spec.SecuritySchemes
}
if keyMapping.SecurityScheme == "OAuth2" {
if keyMapping.SecurityScheme == constants.OAuth2 {
if securitySchemes.OAuth2 == nil {
securitySchemes.OAuth2 = &cpv1alpha2.SecurityScheme{Environments: []cpv1alpha2.Environment{generateSecurityScheme(keyMapping)}}
} else {
Expand Down Expand Up @@ -257,7 +263,7 @@ func (k8sArtifactDeployer K8sArtifactDeployer) DeleteKeyMappings(keyMapping serv
}
if crApplication.Spec.SecuritySchemes != nil {
securitySchemes := *crApplication.Spec.SecuritySchemes
if keyMapping.SecurityScheme == "OAuth2" && securitySchemes.OAuth2 != nil {
if keyMapping.SecurityScheme == constants.OAuth2 && securitySchemes.OAuth2 != nil {
if securitySchemes.OAuth2.Environments != nil && len(securitySchemes.OAuth2.Environments) > 0 {
environments := make([]cpv1alpha2.Environment, 0)
for _, environment := range securitySchemes.OAuth2.Environments {
Expand Down
245 changes: 235 additions & 10 deletions common-controller/internal/database/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,244 @@

package database

import "github.com/wso2/apk/common-controller/internal/loggers"
import (
"github.com/jackc/pgx/v5"
"github.com/wso2/apk/common-controller/internal/loggers"
"github.com/wso2/apk/common-controller/internal/server"
)

// GetApplicationByUUID returns the Application details from the DB for a given application
func GetApplicationByUUID() {
loggers.LoggerAPI.Error("amaliiii ")
rows, _ := ExecDBQuery(queryTest)
// deployApplicationAttributes deploys application attributes
func deployApplicationwithAttributes(tx pgx.Tx, application server.Application) error {
PrepareQueries(tx, insertApplication, insertApplicationAttributes)
err := AddApplication(tx, application.UUID, application.Name, application.Owner, application.OrganizationID)
if err != nil {
loggers.LoggerAPI.Error("Error while adding application ", err)
return err
}
for attributeKey, attributeValue := range application.Attributes {
err = AddApplicationAttributes(tx, application.UUID, attributeKey, attributeValue)
if err != nil {
loggers.LoggerAPI.Error("Error while adding application attributes ", err)
return err
}
}
return nil
}

func updateApplicationAttributes(tx pgx.Tx, application server.Application) error {
PrepareQueries(tx, insertApplicationAttributes, deleteAllAppAttributes)
err := DeleteApplicationAttributes(tx, application.UUID)
if err != nil {
loggers.LoggerAPI.Error("Error while deleting application attributes ", err)
return err
}
for attributeKey, attributeValue := range application.Attributes {
err = AddApplicationAttributes(tx, application.UUID, attributeKey, attributeValue)
if err != nil {
loggers.LoggerAPI.Error("Error while adding application attributes ", err)
return err
}
}
return nil
}

// GetAllApplications gets all applications from the database
func GetAllApplications(tx pgx.Tx) ([]server.Application, error) {
rows, err := ExecDBQueryRows(tx, getAllApplicationAttributes)
if err != nil {
return nil, err
}
defer rows.Close()

appAttributes := make(map[string]map[string]string)
for rows.Next() {
var uuid string
var name string
var attrib string
err := rows.Scan(&uuid, &name, &attrib)
if err != nil {
return nil, err
}
if _, ok := appAttributes[uuid]; !ok {
appAttributes[uuid] = map[string]string{}
}
appAttributes[uuid][name] = attrib
}

rows, err = ExecDBQueryRows(tx, getAllApplications)
if err != nil {
return nil, err
}
defer rows.Close()
var applications []server.Application
for rows.Next() {
var app server.Application
err := rows.Scan(&app.UUID, &app.Name, &app.Owner, &app.OrganizationID)
if err != nil {
return nil, err
}
app.Attributes = appAttributes[app.UUID]
applications = append(applications, app)
}
return applications, nil
}

// GetAllSubscription gets all subscriptions from the database
func GetAllSubscription(tx pgx.Tx) ([]server.Subscription, error) {
rows, err := ExecDBQueryRows(tx, getAllSubscriptions)
if err != nil {
return nil, err
}
defer rows.Close()
var subscriptions []server.Subscription
for rows.Next() {
sub := server.Subscription{
SubscribedAPI: &server.SubscribedAPI{},
}

err := rows.Scan(&sub.UUID, &sub.SubscribedAPI.Name, &sub.SubscribedAPI.Version, &sub.SubStatus, &sub.Organization)
if err != nil {
return nil, err
}
subscriptions = append(subscriptions, sub)
}
return subscriptions, nil
}

// GetAllApplicationKeyMappings gets all application key mappings from the database
func GetAllApplicationKeyMappings(tx pgx.Tx) ([]server.ApplicationKeyMapping, error) {
rows, err := ExecDBQueryRows(tx, getAllApplicationKeyMappings)
if err != nil {
return nil, err
}
defer rows.Close()
var appKeyMappings []server.ApplicationKeyMapping
for rows.Next() {
var appKeyMapping server.ApplicationKeyMapping
err := rows.Scan(&appKeyMapping.ApplicationUUID, &appKeyMapping.SecurityScheme, &appKeyMapping.ApplicationIdentifier,
&appKeyMapping.KeyType, &appKeyMapping.EnvID, &appKeyMapping.OrganizationID)
if err != nil {
return nil, err
}
appKeyMappings = append(appKeyMappings, appKeyMapping)
}
return appKeyMappings, nil
}

// GetAllAppSubs gets all application subscription mappings from the database
func GetAllAppSubs(tx pgx.Tx) ([]server.ApplicationMapping, error) {
rows, err := ExecDBQueryRows(tx, getAllAppSubs)
if err != nil {
return nil, err
}
defer rows.Close()
var appSubs []server.ApplicationMapping
for rows.Next() {
values, err := rows.Values()
if err == nil {
loggers.LoggerDatabase.Info(values)
} else {
loggers.LoggerDatabase.Error(err)
var appSub server.ApplicationMapping
err := rows.Scan(&appSub.UUID, &appSub.ApplicationRef, &appSub.SubscriptionRef, &appSub.OrganizationID)
if err != nil {
return nil, err
}
appSubs = append(appSubs, appSub)
}
return appSubs, nil
}

// AddApplication adds an application to the database
func AddApplication(tx pgx.Tx, uuid, name, owner, org string) error {
return ExecDBQuery(tx, insertApplication, uuid, name, owner, org)
}

// UpdateApplication updates an application in the database
func UpdateApplication(tx pgx.Tx, uuid, name, owner, org string) error {
return ExecDBQuery(tx, updateApplication, uuid, name, owner, org)
}

// DeleteApplication deletes an application from the database
func DeleteApplication(tx pgx.Tx, uuid string) error {
return ExecDBQuery(tx, deleteApplication, uuid)
}

// DeleteAllApplications deletes all applications from the database
func DeleteAllApplications(tx pgx.Tx) error {
return ExecDBQuery(tx, deleteAllApplications)
}

// AddApplicationAttributes adds attributes to an application in the database
func AddApplicationAttributes(tx pgx.Tx, appUUID, name, appAttribute string) error {
return ExecDBQuery(tx, insertApplicationAttributes, appUUID, name, appAttribute)
}

// DeleteApplicationAttributes deletes attributes of an application from the database
func DeleteApplicationAttributes(tx pgx.Tx, appUUID string) error {
return ExecDBQuery(tx, deleteApplicationAttributes, appUUID)
}

// DeleteAllAppAttributes deletes all attributes of all applications from the database
func DeleteAllAppAttributes(tx pgx.Tx) error {
return ExecDBQuery(tx, deleteAllAppAttributes)
}

// AddSubscription adds a subscription to the database
func AddSubscription(tx pgx.Tx, uuid, apiName, apiVersion, subStatus, organization string) error {
return ExecDBQuery(tx, insertSubscription, uuid, apiName, apiVersion, subStatus, organization)
}

// UpdateSubscription updates a subscription in the database
func UpdateSubscription(tx pgx.Tx, uuid, apiName, apiVersion, subStatus, organization string) error {
return ExecDBQuery(tx, updateSubscription, uuid, apiName, apiVersion, subStatus, organization)
}

// DeleteSubscription deletes a subscription from the database
func DeleteSubscription(tx pgx.Tx, uuid string) error {
return ExecDBQuery(tx, deleteSubscription, uuid)
}

// DeleteAllSubscriptions deletes all subscriptions from the database
func DeleteAllSubscriptions(tx pgx.Tx) error {
return ExecDBQuery(tx, deleteAllSubscriptions)
}

// AddApplicationKeyMapping adds a key mapping to the database
func AddApplicationKeyMapping(tx pgx.Tx, applicationUUID, securityScheme, applicationIdentifier, keyType, env,
organization string) error {
return ExecDBQuery(tx, insertApplicationKeyMapping, applicationUUID, securityScheme, applicationIdentifier, keyType,
env, organization)
}

// DeleteApplicationKeyMapping deletes a key mapping from the database
func DeleteApplicationKeyMapping(tx pgx.Tx, applicationUUID, securityScheme, env string) error {
return ExecDBQuery(tx, deleteApplicationKeyMapping, applicationUUID, securityScheme, env)
}

// UpdateApplicationKeyMapping updates a key mapping in the database
func UpdateApplicationKeyMapping(tx pgx.Tx, applicationUUID, securityScheme, applicationIdentifier, keyType, env,
organization string) error {
return ExecDBQuery(tx, updateApplicationKeyMapping, applicationUUID, securityScheme, applicationIdentifier, keyType,
env, organization)
}

// DeleteAllApplicationKeyMappings deletes all key mappings from the database
func DeleteAllApplicationKeyMappings(tx pgx.Tx) error {
return ExecDBQuery(tx, deleteAllApplicationKeyMappings)
}

// AddAppSub adds an application subscription mapping to the database
func AddAppSub(tx pgx.Tx, uuid, appUUID, subUUID, organization string) error {
return ExecDBQuery(tx, insertAppSub, uuid, appUUID, subUUID, organization)
}

// UpdateAppSub updates an application subscription mapping in the database
func UpdateAppSub(tx pgx.Tx, uuid, appUUID, subUUID, organization string) error {
return ExecDBQuery(tx, updateAppSub, uuid, appUUID, subUUID, organization)
}

// DeleteAppSub deletes an application subscription mapping from the database
func DeleteAppSub(tx pgx.Tx, uuid string) error {
return ExecDBQuery(tx, deleteAppSub, uuid)
}

// DeleteAllAppSub deletes all application subscription mappings from the database
func DeleteAllAppSub(tx pgx.Tx) error {
return ExecDBQuery(tx, deleteAllAppSub)
}
Loading

0 comments on commit 5f5f99d

Please sign in to comment.