Skip to content

Commit

Permalink
feat: emit events in identity persister
Browse files Browse the repository at this point in the history
  • Loading branch information
aeneasr committed Sep 17, 2024
1 parent 7d6a458 commit 912a49a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 17 deletions.
10 changes: 0 additions & 10 deletions identity/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ import (
"slices"
"sort"

"go.opentelemetry.io/otel/trace"

"github.com/ory/kratos/schema"
"github.com/ory/kratos/x/events"
"github.com/ory/x/sqlcon"

"github.com/ory/x/otelx"
Expand Down Expand Up @@ -101,7 +98,6 @@ func (m *Manager) Create(ctx context.Context, i *Identity, opts ...ManagerOption
return err
}

trace.SpanFromContext(ctx).AddEvent(events.NewIdentityCreated(ctx, i.ID))
return nil
}

Expand Down Expand Up @@ -346,10 +342,6 @@ func (m *Manager) CreateIdentities(ctx context.Context, identities []*Identity,
return err
}

for _, i := range identities {
trace.SpanFromContext(ctx).AddEvent(events.NewIdentityCreated(ctx, i.ID))
}

return nil
}

Expand Down Expand Up @@ -416,7 +408,6 @@ func (m *Manager) UpdateSchemaID(ctx context.Context, id uuid.UUID, schemaID str
return err
}

trace.SpanFromContext(ctx).AddEvent(events.NewIdentityUpdated(ctx, id))
return m.r.PrivilegedIdentityPool().UpdateIdentity(ctx, original)
}

Expand Down Expand Up @@ -477,7 +468,6 @@ func (m *Manager) UpdateTraits(ctx context.Context, id uuid.UUID, traits Traits,
return err
}

trace.SpanFromContext(ctx).AddEvent(events.NewIdentityUpdated(ctx, id))
return m.r.PrivilegedIdentityPool().UpdateIdentity(ctx, updated)
}

Expand Down
35 changes: 28 additions & 7 deletions persistence/sql/identity/persister_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"sync"
"time"

"github.com/ory/kratos/x/events"

"github.com/ory/x/crdbx"

"github.com/gobuffalo/pop/v6"
Expand Down Expand Up @@ -538,7 +540,7 @@ func (p *IdentityPersister) CreateIdentity(ctx context.Context, ident *identity.
func (p *IdentityPersister) CreateIdentities(ctx context.Context, identities ...*identity.Identity) (err error) {
ctx, span := p.r.Tracer(ctx).Tracer().Start(ctx, "persistence.sql.CreateIdentities",
trace.WithAttributes(
attribute.Int("num_identities", len(identities)),
attribute.Int("identities.count", len(identities)),
attribute.Stringer("network.id", p.NetworkID(ctx))))
defer otelx.End(span, &err)

Expand Down Expand Up @@ -568,7 +570,7 @@ func (p *IdentityPersister) CreateIdentities(ctx context.Context, identities ...
}
}

return p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error {
if err := p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error {
conn := &batch.TracerConnection{
Tracer: p.r.Tracer(ctx),
Connection: tx,
Expand All @@ -590,7 +592,15 @@ func (p *IdentityPersister) CreateIdentities(ctx context.Context, identities ...
return sqlcon.HandleError(err)
}
return nil
})
}); err != nil {
return err
}

for _, ident := range identities {
span.AddEvent(events.NewIdentityCreated(ctx, ident.ID))
}

return nil
}

func (p *IdentityPersister) HydrateIdentityAssociations(ctx context.Context, i *identity.Identity, expand identity.Expandables) (err error) {
Expand Down Expand Up @@ -960,10 +970,15 @@ func (p *IdentityPersister) UpdateIdentityColumns(ctx context.Context, i *identi
attribute.Stringer("network.id", p.NetworkID(ctx))))
defer otelx.End(span, &err)

return p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error {
if err := p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error {
_, err := tx.Where("id = ? AND nid = ?", i.ID, p.NetworkID(ctx)).UpdateQuery(i, columns...)
return sqlcon.HandleError(err)
})
}); err != nil {
return err

Check warning on line 977 in persistence/sql/identity/persister_identity.go

View check run for this annotation

Codecov / codecov/patch

persistence/sql/identity/persister_identity.go#L977

Added line #L977 was not covered by tests
}

span.AddEvent(events.NewIdentityUpdated(ctx, i.ID))
return nil
}

func (p *IdentityPersister) UpdateIdentity(ctx context.Context, i *identity.Identity) (err error) {
Expand All @@ -978,7 +993,7 @@ func (p *IdentityPersister) UpdateIdentity(ctx context.Context, i *identity.Iden
}

i.NID = p.NetworkID(ctx)
return sqlcon.HandleError(p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error {
if err := sqlcon.HandleError(p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error {
// This returns "ErrNoRows" if the identity does not exist
if err := update.Generic(WithTransaction(ctx, tx), tx, p.r.Tracer(ctx).Tracer(), i); err != nil {
return err
Expand All @@ -1003,7 +1018,12 @@ func (p *IdentityPersister) UpdateIdentity(ctx context.Context, i *identity.Iden
}

return sqlcon.HandleError(p.createIdentityCredentials(ctx, tx, i))
}))
})); err != nil {
return err
}

span.AddEvent(events.NewIdentityUpdated(ctx, i.ID))
return nil
}

func (p *IdentityPersister) DeleteIdentity(ctx context.Context, id uuid.UUID) (err error) {
Expand All @@ -1028,6 +1048,7 @@ func (p *IdentityPersister) DeleteIdentity(ctx context.Context, id uuid.UUID) (e
if count == 0 {
return errors.WithStack(sqlcon.ErrNoRows)
}
span.AddEvent(events.NewIdentityDeleted(ctx, id))
return nil
}

Expand Down
11 changes: 11 additions & 0 deletions x/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
VerificationSucceeded semconv.Event = "VerificationSucceeded"
IdentityCreated semconv.Event = "IdentityCreated"
IdentityUpdated semconv.Event = "IdentityUpdated"
IdentityDeleted semconv.Event = "IdentityDeleted"
WebhookDelivered semconv.Event = "WebhookDelivered"
WebhookSucceeded semconv.Event = "WebhookSucceeded"
WebhookFailed semconv.Event = "WebhookFailed"
Expand Down Expand Up @@ -262,6 +263,16 @@ func NewIdentityCreated(ctx context.Context, identityID uuid.UUID) (string, trac
)
}

func NewIdentityDeleted(ctx context.Context, identityID uuid.UUID) (string, trace.EventOption) {
return IdentityDeleted.String(),
trace.WithAttributes(
append(
semconv.AttributesFromContext(ctx),
semconv.AttrIdentityID(identityID),
)...,
)
}

func NewIdentityUpdated(ctx context.Context, identityID uuid.UUID) (string, trace.EventOption) {
return IdentityUpdated.String(),
trace.WithAttributes(
Expand Down

0 comments on commit 912a49a

Please sign in to comment.