-
Notifications
You must be signed in to change notification settings - Fork 473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
JAMES-2433 Implement EventStore for a JPA backend #2452
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, this is a cool first shot at it!
I did do a few comments to polish the code here.
Next steps would be to wire the event store into existing app and expose features relying on event sourcing onto the same applications.
EventStoreFailedException esfe = new EventStoreFailedException("Unable to add events"); | ||
esfe.initCause(e); | ||
throw esfe; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EventStoreFailedException esfe = new EventStoreFailedException("Unable to add events"); | |
esfe.initCause(e); | |
throw esfe; | |
throw new EventStoreFailedException("Unable to add events", e); |
Mutability can be avoided
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's what I intended to do, but this exception doesn't support a cause parameter... I left the scala api part alone for now.
Preconditions.checkArgument(Event.belongsToSameAggregate(events)); | ||
AggregateId aggregateId = events.head().getAggregateId(); | ||
EntityManager entityManager = entityManagerFactory.createEntityManager(); | ||
final EntityTransaction transaction = entityManager.getTransaction(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can handle this with TransactionRunner I believe
return Mono.fromRunnable(() -> TransactionRunner.runAndHandleException(entityManager -> {
events.foreach(Throwing.runnable(e -> {
JPAEvent jpaEvent = new JPAEvent(aggregateId, e.eventId(), jsonEventSerializer.serialize(e));
entityManager.persist(jpaEvent);
}));
}, e -> {
throw new EventStoreFailedException("Unable to add events", e);
});
Note that:
e
variable name is ambiguous, we shall favorevent
- Wrap the whole transformation into a mono not just the final transformation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TransactionWorker api isn't as shown above, but was easy enough to adjust. However the Throwing part I gave a few tries with several variations and types but couldn't get to work yet. If you have a working snippet that could save me some time trying to mess with the java/scala type system incompatibilities (Function1 is not a Function etc)...
btw I think it would have been nice if at least the events source api part remained in java (even if other module implementations were in scala) rather than making the interface itself use scala objects and have the scala types leak into the java code... scala was designed to try an play nice with java, but not necessarily the other way around. but perhaps that's just my own preference or lack of experience stitching them together :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now I got it to work with JavaConverters.asJava(events), a bit ugly but I don't know how to get the scala objects to play nice with Throwing. If you have a better way let me know.
List<Event> events = (List<Event>) entityManager.createNamedQuery(SELECT_AGGREGATE_QUERY) | ||
.setParameter("aggregateId", aggregateId.asAggregateKey()) | ||
.getResultStream() | ||
.map(e -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use THrowing.function ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@Override | ||
public Publisher<Void> remove(AggregateId aggregateId) { | ||
EntityManager entityManager = entityManagerFactory.createEntityManager(); | ||
final EntityTransaction transaction = entityManager.getTransaction(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use transaction runner here too and wrap the whole operation onto a mono...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return Mono.empty(); | ||
} | ||
|
||
// for use in tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use @VisibleForTesting annotation them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@Override | ||
public int hashCode() { | ||
return Objects.hashCode(aggregateId, eventId); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (this == obj) { | ||
return true; | ||
} | ||
if (obj == null || getClass() != obj.getClass()) { | ||
return false; | ||
} | ||
final JPAEventId other = (JPAEventId) obj; | ||
return Objects.equal(this.aggregateId, other.aggregateId) | ||
&& Objects.equal(this.eventId, other.eventId); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shall test equals and hash code with equals verifier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what you mean here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what you mean here.
Hello, you could use https://jqno.nl/equalsverifier/ for test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@Column(name = "EVENT_ID", nullable = false) | ||
private int eventId; | ||
|
||
@Column(name = "EVENT", nullable = false, length = 16 * 1024) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may hurt: are we sure this is enough to fit all possible serialized events?
For instance RuleSetDefined may be larger than this for people holding many filters...
Other implementation are unbounded on the size of this field, likely so should do the JPA one.
I suggest looking into using an unbounded byte[]
here if need be..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have no idea what this is used for and how big the events are, usually events and jsons (used in the other modules as well) are small-ish creatures :-)
I did start with 1mb and it errored, so just picked an arbitrary value to get through the prototype. I suppose if it needs to be a blob or whatnot we can do that. Maybe add to the contract (test) whatever size you think the implementations must support to make sure they're all compatible with the requirements?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to large @lob
9234fdd
to
3f8f587
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall ok to me!
@Inject | ||
public void setJsonEventSerializer(JsonEventSerializer jsonEventSerializer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we inject via the constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I moved both injections to a constructor, though other classes are inconsistent in constructor/setter injection point and I'm not sure if it matters... if you have a preference perhaps you should standardize the injection location throughout the code.
I don't know how/where/what is needed to finish integrating this, or if you intend on doing that separately after merging this PR? |
Anything else to be done here? (ci failure seems unrelated?) |
This is just a rough draft and not ready to merge yet, but since I have no previous experience with Scala or Reactive Streams or Guice or this event api or its usages, if anyone could help in improving and finalizing this, I think we can get this done :-)