From f587fe8c1a8709c8c16508ec0ef62461e597ed11 Mon Sep 17 00:00:00 2001 From: Amichai Rothman Date: Tue, 15 Oct 2024 12:29:53 +0300 Subject: [PATCH] JAMES-2433 Implement EventStore for a JPA backend --- event-sourcing/event-store-jpa/pom.xml | 128 ++++++++++++++++++ .../eventstore/jpa/JPAEventStore.java | 118 ++++++++++++++++ .../eventstore/jpa/model/JPAEvent.java | 124 +++++++++++++++++ .../jpa/JPAEventSourcingSystemTest.java | 33 +++++ .../jpa/JPAEventStoreExtension.java | 44 ++++++ .../eventstore/jpa/JPAEventStoreTest.java | 33 +++++ .../eventstore/jpa/model/JPAEventTest.java | 35 +++++ .../src/test/resources/persistence.xml | 40 ++++++ event-sourcing/pom.xml | 1 + 9 files changed, 556 insertions(+) create mode 100644 event-sourcing/event-store-jpa/pom.xml create mode 100644 event-sourcing/event-store-jpa/src/main/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStore.java create mode 100644 event-sourcing/event-store-jpa/src/main/java/org/apache/james/eventsourcing/eventstore/jpa/model/JPAEvent.java create mode 100644 event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventSourcingSystemTest.java create mode 100644 event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStoreExtension.java create mode 100644 event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStoreTest.java create mode 100644 event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/model/JPAEventTest.java create mode 100644 event-sourcing/event-store-jpa/src/test/resources/persistence.xml diff --git a/event-sourcing/event-store-jpa/pom.xml b/event-sourcing/event-store-jpa/pom.xml new file mode 100644 index 00000000000..f7e1fab711e --- /dev/null +++ b/event-sourcing/event-store-jpa/pom.xml @@ -0,0 +1,128 @@ + + + + 4.0.0 + + + org.apache.james + event-sourcing + 3.9.0-SNAPSHOT + + + event-sourcing-event-store-jpa + + Apache James :: Event sourcing :: Event Store :: JPA + JPA implementation for James Event Store + + + + ${james.groupId} + apache-james-backends-jpa + + + ${james.groupId} + apache-james-backends-jpa + test-jar + test + + + ${james.groupId} + event-sourcing-core + test + + + ${james.groupId} + event-sourcing-core + test-jar + test + + + ${james.groupId} + event-sourcing-event-store-api + + + ${james.groupId} + event-sourcing-event-store-api + test-jar + test + + + ${james.groupId} + event-sourcing-pojo + test-jar + test + + + ${james.groupId} + testing-base + test + + + io.projectreactor + reactor-core + + + org.apache.derby + derby + test + + + org.mockito + mockito-core + test + + + + + + + org.apache.openjpa + openjpa-maven-plugin + ${apache.openjpa.version} + + org/apache/james/eventsourcing/eventstore/jpa/model/JPAEvent.class + true + true + + + log + TOOL=TRACE + + + metaDataFactory + jpa(Types=org.apache.james.eventsourcing.eventstore.jpa.model.JPAEvent) + + + ${basedir}/src/test/resources/persistence.xml + + + + enhancer + + enhance + + process-classes + + + + + + + \ No newline at end of file diff --git a/event-sourcing/event-store-jpa/src/main/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStore.java b/event-sourcing/event-store-jpa/src/main/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStore.java new file mode 100644 index 00000000000..72dbeb2f693 --- /dev/null +++ b/event-sourcing/event-store-jpa/src/main/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStore.java @@ -0,0 +1,118 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.eventsourcing.eventstore.jpa; + + +import static org.apache.james.eventsourcing.eventstore.jpa.model.JPAEvent.DELETE_AGGREGATE_QUERY; +import static org.apache.james.eventsourcing.eventstore.jpa.model.JPAEvent.SELECT_AGGREGATE_QUERY; + +import jakarta.inject.Inject; +import jakarta.persistence.EntityManagerFactory; +import jakarta.persistence.PersistenceUnit; + +import org.apache.james.backends.jpa.TransactionRunner; +import org.apache.james.eventsourcing.AggregateId; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.apache.james.eventsourcing.eventstore.EventStoreFailedException; +import org.apache.james.eventsourcing.eventstore.History; +import org.apache.james.eventsourcing.eventstore.JsonEventSerializer; +import org.apache.james.eventsourcing.eventstore.jpa.model.JPAEvent; +import org.reactivestreams.Publisher; + +import com.github.fge.lambdas.Throwing; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import reactor.core.publisher.Mono; +import scala.collection.Iterable; +import scala.collection.JavaConverters; + +@PersistenceUnit(unitName = "James") +public class JPAEventStore implements EventStore { + + /** + * The entity manager to access the database. + */ + private EntityManagerFactory entityManagerFactory; + + /** + * The JSON serializer to serialize the event data. + */ + private JsonEventSerializer jsonEventSerializer; + + /** + * Constructs a JPAEventStore. + */ + @Inject + public JPAEventStore(EntityManagerFactory entityManagerFactory, JsonEventSerializer jsonEventSerializer) { + this.jsonEventSerializer = jsonEventSerializer; + this.entityManagerFactory = entityManagerFactory; + } + + @Override + public Publisher appendAll(Iterable events) { + if (events.isEmpty()) { + return Mono.empty(); + } + Preconditions.checkArgument(Event.belongsToSameAggregate(events)); + AggregateId aggregateId = events.head().getAggregateId(); + return Mono.fromRunnable(() -> new TransactionRunner(entityManagerFactory).runAndHandleException( + entityManager -> + JavaConverters.asJava(events).forEach(Throwing.consumer(e -> { + JPAEvent jpaEvent = new JPAEvent(aggregateId, e.eventId(), jsonEventSerializer.serialize(e)); + entityManager.persist(jpaEvent); + })), + exception -> { + EventStoreFailedException esfe = new EventStoreFailedException("Unable to add events"); + esfe.initCause(exception); + throw esfe; + })); + } + + @Override + @SuppressWarnings("unchecked") + public Publisher getEventsOfAggregate(AggregateId aggregateId) { + Preconditions.checkNotNull(aggregateId); + return Mono.fromSupplier(() -> new TransactionRunner(entityManagerFactory).runAndRetrieveResult( + entityManager -> History.of( + (Event[]) entityManager.createNamedQuery(SELECT_AGGREGATE_QUERY) + .setParameter("aggregateId", aggregateId.asAggregateKey()) + .getResultStream() + .map(Throwing.function(e -> jsonEventSerializer.deserialize(((JPAEvent) e).getEvent()))) + .toArray(Event[]::new)))); + } + + @Override + public Publisher remove(AggregateId aggregateId) { + return Mono.fromSupplier(() -> new TransactionRunner(entityManagerFactory).runAndRetrieveResult( + entityManager -> { + entityManager.createNamedQuery(DELETE_AGGREGATE_QUERY) + .setParameter("aggregateId", aggregateId.asAggregateKey()) + .executeUpdate(); + return null; + })); + } + + @VisibleForTesting + protected void removeAll() { + new TransactionRunner(entityManagerFactory).runAndRetrieveResult( + entityManager -> entityManager.createQuery("DELETE FROM JPAEvent").executeUpdate()); + } +} diff --git a/event-sourcing/event-store-jpa/src/main/java/org/apache/james/eventsourcing/eventstore/jpa/model/JPAEvent.java b/event-sourcing/event-store-jpa/src/main/java/org/apache/james/eventsourcing/eventstore/jpa/model/JPAEvent.java new file mode 100644 index 00000000000..db689322203 --- /dev/null +++ b/event-sourcing/event-store-jpa/src/main/java/org/apache/james/eventsourcing/eventstore/jpa/model/JPAEvent.java @@ -0,0 +1,124 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.eventsourcing.eventstore.jpa.model; + +import static org.apache.james.eventsourcing.eventstore.jpa.model.JPAEvent.DELETE_AGGREGATE_QUERY; +import static org.apache.james.eventsourcing.eventstore.jpa.model.JPAEvent.JPAEventId; +import static org.apache.james.eventsourcing.eventstore.jpa.model.JPAEvent.SELECT_AGGREGATE_QUERY; + +import java.io.Serializable; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.IdClass; +import jakarta.persistence.Index; +import jakarta.persistence.Lob; +import jakarta.persistence.NamedQuery; +import jakarta.persistence.Table; + +import org.apache.james.eventsourcing.AggregateId; +import org.apache.james.eventsourcing.EventId; + +import com.google.common.base.Objects; + + + +/** + * JPAEvent class for the James Event Sourcing to be used for JPA persistence. + */ +@Entity(name = "JPAEvent") +@Table(name = JPAEvent.JAMES_EVENTS, indexes = { + @Index(name = "AGGREGATE_ID_INDEX", columnList = "AGGREGATE_ID") +}) +@NamedQuery(name = SELECT_AGGREGATE_QUERY, query = "SELECT e FROM JPAEvent e WHERE e.aggregateId=:aggregateId") +@NamedQuery(name = DELETE_AGGREGATE_QUERY, query = "DELETE FROM JPAEvent e WHERE e.aggregateId=:aggregateId") +@IdClass(JPAEventId.class) +public class JPAEvent { + public static final String SELECT_AGGREGATE_QUERY = "selectAggregateEvents"; + public static final String DELETE_AGGREGATE_QUERY = "deleteAggregateEvents"; + + public static final String JAMES_EVENTS = "JAMES_EVENTS"; + + public static class JPAEventId implements Serializable { + + private static final long serialVersionUID = 1L; + + private String aggregateId; + + private int eventId; + + public JPAEventId() { + } + + @Override + public int hashCode() { + return Objects.hashCode(aggregateId, eventId); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final JPAEventId other = (JPAEventId) obj; + return Objects.equal(this.aggregateId, other.aggregateId) + && Objects.equal(this.eventId, other.eventId); + } + } + + @Id + @Column(name = "AGGREGATE_ID", nullable = false, length = 100) + private String aggregateId = ""; + + @Id + @Column(name = "EVENT_ID", nullable = false) + private int eventId; + + @Lob + @Column(name = "EVENT", nullable = false, length = 1048576000) + private String event = ""; + + /** + * Default no-args constructor for JPA class enhancement. + * The constructor need to be public or protected to be used by JPA. + * See: http://docs.oracle.com/javaee/6/tutorial/doc/bnbqa.html + * Do not us this constructor, it is for JPA only. + */ + protected JPAEvent() { + } + + public JPAEvent(AggregateId aggregateId, EventId eventId, String event) { + this.aggregateId = aggregateId.asAggregateKey(); + this.eventId = eventId.serialize(); + this.event = event; + } + + public EventId getEventId() { + return EventId.fromSerialized(eventId); + } + + public String getEvent() { + return event; + } + +} diff --git a/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventSourcingSystemTest.java b/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventSourcingSystemTest.java new file mode 100644 index 00000000000..5da1bb6d2f0 --- /dev/null +++ b/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventSourcingSystemTest.java @@ -0,0 +1,33 @@ +/*************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.eventsourcing.eventstore.jpa; + +import org.apache.james.eventsourcing.EventSourcingSystemTest; +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(JPAEventStoreExtension.class) +class JPAEventSourcingSystemTest implements EventSourcingSystemTest { + + @AfterEach + public void tearDown(EventStore store) { + ((JPAEventStore)store).removeAll(); + } +} diff --git a/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStoreExtension.java b/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStoreExtension.java new file mode 100644 index 00000000000..e4234e7f0c3 --- /dev/null +++ b/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStoreExtension.java @@ -0,0 +1,44 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.eventsourcing.eventstore.jpa; + +import org.apache.james.backends.jpa.JpaTestCluster; +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.apache.james.eventsourcing.eventstore.JsonEventSerializer; +import org.apache.james.eventsourcing.eventstore.dto.TestEventDTOModules; +import org.apache.james.eventsourcing.eventstore.jpa.model.JPAEvent; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolver; + +class JPAEventStoreExtension implements ParameterResolver { + + public static final JpaTestCluster JPA_TEST_CLUSTER = JpaTestCluster.create(JPAEvent.class); + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) { + return parameterContext.getParameter().getType().equals(EventStore.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) { + JsonEventSerializer jsonEventSerializer = JsonEventSerializer.forModules(TestEventDTOModules.TEST_TYPE()).withoutNestedType(); + return new JPAEventStore(JPA_TEST_CLUSTER.getEntityManagerFactory(), jsonEventSerializer); + } +} diff --git a/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStoreTest.java b/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStoreTest.java new file mode 100644 index 00000000000..d69d46c915a --- /dev/null +++ b/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/JPAEventStoreTest.java @@ -0,0 +1,33 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.eventsourcing.eventstore.jpa; + +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.apache.james.eventsourcing.eventstore.EventStoreContract; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(JPAEventStoreExtension.class) +class JPAEventStoreTest implements EventStoreContract { + + @AfterEach + public void tearDown(EventStore store) { + ((JPAEventStore)store).removeAll(); + } +} diff --git a/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/model/JPAEventTest.java b/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/model/JPAEventTest.java new file mode 100644 index 00000000000..967edecd6dd --- /dev/null +++ b/event-sourcing/event-store-jpa/src/test/java/org/apache/james/eventsourcing/eventstore/jpa/model/JPAEventTest.java @@ -0,0 +1,35 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.eventsourcing.eventstore.jpa.model; + +import org.junit.jupiter.api.Test; + +import nl.jqno.equalsverifier.EqualsVerifier; +import nl.jqno.equalsverifier.Warning; + +class JPAEventTest { + + @Test + void shouldMatchBeanContract() { + EqualsVerifier.forClass(JPAEvent.JPAEventId.class) + .usingGetClass() + .suppress(Warning.SURROGATE_KEY) + .verify(); + } +} diff --git a/event-sourcing/event-store-jpa/src/test/resources/persistence.xml b/event-sourcing/event-store-jpa/src/test/resources/persistence.xml new file mode 100644 index 00000000000..fe4665d269d --- /dev/null +++ b/event-sourcing/event-store-jpa/src/test/resources/persistence.xml @@ -0,0 +1,40 @@ + + + + + + + org.apache.openjpa.persistence.PersistenceProviderImpl + osgi:service/javax.sql.DataSource/(osgi.jndi.service.name=jdbc/james) + org.apache.james.eventsourcing.eventstore.jpa.model.JPAEvent + true + + + + + + + + + + diff --git a/event-sourcing/pom.xml b/event-sourcing/pom.xml index f14f296631e..c4033287a87 100644 --- a/event-sourcing/pom.xml +++ b/event-sourcing/pom.xml @@ -36,6 +36,7 @@ event-sourcing-pojo event-store-api event-store-cassandra + event-store-jpa event-store-memory