Command Side Usage
Surge provides an engine that must be configured with domain logic. It uses this domain logic under the hood to determine what events should be emitted based on a command and how to update the state of an aggregate based on those events. This page describes how to configure, create, and interact with a Surge command engine.
Module Info
- sbt
val SurgeVersion = "0.0.0+1-862ac3a4-SNAPSHOT"
libraryDependencies += "com.ukg" %% "surge-engine-command-scaladsl" % SurgeVersion
- Maven
<properties>
  <surge.version>0.0.0+1-862ac3a4-SNAPSHOT</surge.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.ukg</groupId>
    <artifactId>surge-engine-command-scaladsl_${scala.binary.version}</artifactId>
    <version>${surge.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  SurgeVersion: "0.0.0+1-862ac3a4-SNAPSHOT",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.ukg:surge-engine-command-scaladsl_${versions.ScalaBinary}:${versions.SurgeVersion}"
}
Domain Logic
Aggregates
An aggregate type is the base type for an instance of the Surge engine. Multiple Surge engines can be run within a service, but one instance of the engine should map to exactly one type of aggregate. Aggregates should be simple data classes used to maintain any context/state needed to process incoming commands.
Make sure your aggregate implements java.io.Serializable so that it is serialized and deserialized correctly. Scala case classes already do.
- Scala
object BankAccount {
  implicit val format: Format[BankAccount] = Json.format
}
case class BankAccount(accountNumber: UUID, accountOwner: String, securityCode: String, balance: Double)
- Java
@JsonSerialize
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class BankAccount implements java.io.Serializable {
    private final UUID accountNumber;
    private final String accountOwner;
    private final String securityCode;
    private double balance;

    public BankAccount(UUID accountNumber, String accountOwner, String securityCode, double balance) {
        this.accountNumber = accountNumber;
        this.accountOwner = accountOwner;
        this.securityCode = securityCode;
        this.balance = balance;
    }

    public UUID getAccountNumber() {
        return accountNumber;
    }

    public String getAccountOwner() {
        return accountOwner;
    }

    public String getSecurityCode() {
        return securityCode;
    }

    public double getBalance() {
        return balance;
    }
}
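One way to check the Serializable requirement before wiring an aggregate into an engine is a plain Java serialization round trip. The following is a minimal sketch that uses only the JDK. `AccountSnapshot` and `SerializationCheck` are hypothetical stand-ins introduced for illustration, not Surge types, and Surge may use a different serialization path internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.UUID;

// Hypothetical stand-in for an aggregate; not part of Surge.
class AccountSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;

    final UUID accountNumber;
    final String accountOwner;
    final double balance;

    AccountSnapshot(UUID accountNumber, String accountOwner, double balance) {
        this.accountNumber = accountNumber;
        this.accountOwner = accountOwner;
        this.balance = balance;
    }
}

// Hypothetical helper; not part of Surge.
class SerializationCheck {
    // Writes the value to a byte array with Java serialization and reads it back.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            // A failure here means the type (or one of its fields) is not serializable.
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

A unit test can call `SerializationCheck.roundTrip(...)` on a sample aggregate and compare the fields of the copy with the original.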
Commands
Commands are data classes intended to be handled by an instance of an aggregate, which either emits events or rejects the command and returns an error to the end user. Surge strongly types a command engine on an aggregate type, command type, and event type, so any commands you intend to send to an aggregate must inherit from the same base class/interface.
- Scala
sealed trait BankAccountCommand {
  def accountNumber: UUID
}
case class CreateAccount(accountNumber: UUID, accountOwner: String, securityCode: String, initialBalance: Double) extends BankAccountCommand
case class CreditAccount(accountNumber: UUID, amount: Double) extends BankAccountCommand
case class DebitAccount(accountNumber: UUID, amount: Double) extends BankAccountCommand
- Java
public interface BankAccountCommand {
    UUID getAccountNumber();
}

public class CreateAccount implements BankAccountCommand {
    private final UUID accountNumber;
    private final String accountOwner;
    private final double balance;
    private final String securityCode;

    public CreateAccount(UUID accountNumber, String accountOwner, String securityCode, double initialBalance) {
        this.accountNumber = accountNumber;
        this.accountOwner = accountOwner;
        this.securityCode = securityCode;
        this.balance = initialBalance;
    }

    @Override
    public UUID getAccountNumber() {
        return accountNumber;
    }

    public String getAccountOwner() {
        return accountOwner;
    }

    public double getBalance() {
        return balance;
    }

    public String getSecurityCode() {
        return securityCode;
    }
}

public class CreditAccount implements BankAccountCommand {
    private final UUID accountNumber;
    private final double amount;

    public CreditAccount(UUID accountNumber, double amount) {
        this.accountNumber = accountNumber;
        this.amount = amount;
    }

    @Override
    public UUID getAccountNumber() {
        return accountNumber;
    }

    public double getAmount() {
        return amount;
    }
}

public class DebitAccount implements BankAccountCommand {
    private final UUID accountNumber;
    private final double amount;

    public DebitAccount(UUID accountNumber, double amount) {
        this.accountNumber = accountNumber;
        this.amount = amount;
    }

    @Override
    public UUID getAccountNumber() {
        return accountNumber;
    }

    public double getAmount() {
        return amount;
    }
}
Events
Events are the result of successfully applied commands.
- Scala
sealed trait BankAccountEvent {
  def accountNumber: UUID
  def toJson: JsValue
}

object BankAccountCreated {
  implicit val format: Format[BankAccountCreated] = Json.format
}
case class BankAccountCreated(accountNumber: UUID, accountOwner: String, securityCode: String, balance: Double) extends BankAccountEvent {
  override def toJson: JsValue = Json.toJson(this)
}

object BankAccountUpdated {
  implicit val format: Format[BankAccountUpdated] = Json.format
}
case class BankAccountUpdated(accountNumber: UUID, newBalance: Double) extends BankAccountEvent {
  override def toJson: JsValue = Json.toJson(this)
}
- Java
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "name")
@JsonSubTypes({
    @JsonSubTypes.Type(value = BankAccountCreated.class, name = "BankAccountCreated"),
    @JsonSubTypes.Type(value = BankAccountUpdated.class, name = "BankAccountUpdated"),
})
public interface BankAccountEvent {
    UUID getAccountNumber();
}

@JsonSerialize
@JsonTypeName("BankAccountCreated")
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class BankAccountCreated implements BankAccountEvent {
    private final UUID accountNumber;
    private final String accountOwner;
    private final String securityCode;
    private final double balance;

    public BankAccountCreated(UUID accountNumber, String accountOwner, String securityCode, double balance) {
        this.accountNumber = accountNumber;
        this.accountOwner = accountOwner;
        this.securityCode = securityCode;
        this.balance = balance;
    }

    @Override
    public UUID getAccountNumber() {
        return accountNumber;
    }

    public String getAccountOwner() {
        return accountOwner;
    }

    public String getSecurityCode() {
        return securityCode;
    }

    public double getBalance() {
        return balance;
    }
}

@JsonSerialize
@JsonTypeName("BankAccountUpdated")
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class BankAccountUpdated implements BankAccountEvent {
    public double amount;
    public UUID accountNumber;

    public BankAccountUpdated(UUID accountNumber, double amount) {
        this.amount = amount;
        this.accountNumber = accountNumber;
    }

    @Override
    public UUID getAccountNumber() {
        return accountNumber;
    }

    public double getAmount() {
        return amount;
    }
}
Surge Configuration
A Surge engine is configured by implementing two interfaces: a command model that holds your domain logic, and a Surge model that adds the Kafka topic configuration.
The command model expresses how a domain handles commands and applies events for a particular instance of an aggregate. This is the main wiring of domain logic that Surge leverages. It is strongly typed on the aggregate, base command, and base event types; the aggregate ID type is supplied on the Surge model.
- Scala
object BankAccountCommandModel extends AggregateCommandModel[BankAccount, BankAccountCommand, BankAccountEvent] {
  override def processCommand(aggregate: Option[BankAccount], command: BankAccountCommand): Try[Seq[BankAccountEvent]] = {
    command match {
      case create: CreateAccount =>
        if (aggregate.isDefined) {
          // Aggregate already exists - no need to recreate
          Success(Seq.empty)
        } else {
          Success(Seq(BankAccountCreated(create.accountNumber, create.accountOwner, create.securityCode, create.initialBalance)))
        }
      case credit: CreditAccount =>
        aggregate
          .map { existing =>
            Success(Seq(BankAccountUpdated(existing.accountNumber, existing.balance + credit.amount)))
          }
          .getOrElse(Failure(new AccountDoesNotExistException(command.accountNumber)))
      case debit: DebitAccount =>
        aggregate
          .map { existing =>
            if (existing.balance >= debit.amount) {
              Success(Seq(BankAccountUpdated(existing.accountNumber, existing.balance - debit.amount)))
            } else {
              Failure(new InsufficientFundsException(existing.accountNumber))
            }
          }
          .getOrElse(Failure(new AccountDoesNotExistException(command.accountNumber)))
    }
  }

  override def handleEvent(aggregate: Option[BankAccount], event: BankAccountEvent): Option[BankAccount] = {
    event match {
      case create: BankAccountCreated =>
        Some(BankAccount(create.accountNumber, create.accountOwner, create.securityCode, create.balance))
      case updated: BankAccountUpdated =>
        aggregate.map(_.copy(balance = updated.newBalance))
    }
  }
}
- Java
public class BankAccountCommandModel implements AggregateCommandModel<BankAccount, BankAccountCommand, BankAccountEvent> {
    @Override
    public List<BankAccountEvent> processCommand(Optional<BankAccount> aggregate, BankAccountCommand command) {
        // users can feel free to use vavr pattern matching as an alternative to instanceOf.
        if (command instanceof CreateAccount) {
            CreateAccount createAccount = (CreateAccount) command;
            if (aggregate.isPresent()) {
                return new ArrayList<>();
            } else {
                BankAccountCreated bankAccountCreated = new BankAccountCreated(createAccount.getAccountNumber(),
                        createAccount.getAccountOwner(), createAccount.getSecurityCode(), createAccount.getBalance());
                return Collections.singletonList(bankAccountCreated);
            }
        }
        if (command instanceof CreditAccount) {
            CreditAccount creditAccount = (CreditAccount) command;
            if (aggregate.isPresent()) {
                BankAccount bankAccount = aggregate.get();
                BankAccountUpdated bankAccountUpdated = new BankAccountUpdated(creditAccount.getAccountNumber(),
                        bankAccount.getBalance() + creditAccount.getAmount());
                return Collections.singletonList(bankAccountUpdated);
            } else {
                throw new RuntimeException("Account does not exist");
            }
        }
        if (command instanceof DebitAccount) {
            DebitAccount debitAccount = (DebitAccount) command;
            if (aggregate.isPresent()) {
                BankAccount bankAccount = aggregate.get();
                if (bankAccount.getBalance() >= debitAccount.getAmount()) {
                    BankAccountUpdated bankAccountUpdated = new BankAccountUpdated(bankAccount.getAccountNumber(),
                            bankAccount.getBalance() - debitAccount.getAmount());
                    return Collections.singletonList(bankAccountUpdated);
                } else {
                    throw new RuntimeException("Insufficient funds");
                }
            } else {
                throw new RuntimeException("Account does not exist");
            }
        }
        throw new RuntimeException("Unhandled command");
    }

    @Override
    public Optional<BankAccount> handleEvent(Optional<BankAccount> aggregate, BankAccountEvent event) {
        // users can feel free to use vavr pattern matching as an alternative to instanceOf.
        if (event instanceof BankAccountCreated) {
            BankAccountCreated bankAccountCreated = (BankAccountCreated) event;
            Optional<BankAccount> bankAccount;
            bankAccount = Optional.of(new BankAccount(event.getAccountNumber(), bankAccountCreated.getAccountOwner(),
                    bankAccountCreated.getSecurityCode(), bankAccountCreated.getBalance()));
            return bankAccount;
        }
        if (event instanceof BankAccountUpdated) {
            BankAccountUpdated bankAccountUpdated = (BankAccountUpdated) event;
            return aggregate.map((item) -> new BankAccount(item.getAccountNumber(), item.getAccountOwner(),
                    item.getSecurityCode(), bankAccountUpdated.getAmount()));
        }
        throw new RuntimeException("Unhandled event");
    }
}
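To make the relationship between the two methods concrete, the sketch below runs a command through a decision function and then folds the resulting events into the state, which is conceptually what the introduction describes the engine doing with your domain logic. Everything here is a simplified, hypothetical stand-in (`Balance`, `LedgerCommand`, `Deposit`, `Withdraw`, `BalanceChanged`, `LedgerModel`); it does not use or reproduce Surge's actual internals, persistence, or Kafka publishing.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-ins; none of these types come from Surge.
final class Balance {
    final double amount;
    Balance(double amount) { this.amount = amount; }
}

interface LedgerCommand {}

final class Deposit implements LedgerCommand {
    final double amount;
    Deposit(double amount) { this.amount = amount; }
}

final class Withdraw implements LedgerCommand {
    final double amount;
    Withdraw(double amount) { this.amount = amount; }
}

final class BalanceChanged {
    final double newBalance;
    BalanceChanged(double newBalance) { this.newBalance = newBalance; }
}

final class LedgerModel {
    // Decides which events a command produces; throwing rejects the command.
    static List<BalanceChanged> processCommand(Optional<Balance> state, LedgerCommand command) {
        double current = state.map(b -> b.amount).orElse(0.0);
        if (command instanceof Deposit) {
            double amount = ((Deposit) command).amount;
            return Collections.singletonList(new BalanceChanged(current + amount));
        }
        if (command instanceof Withdraw) {
            double amount = ((Withdraw) command).amount;
            if (current < amount) {
                throw new IllegalStateException("Insufficient funds");
            }
            return Collections.singletonList(new BalanceChanged(current - amount));
        }
        throw new IllegalArgumentException("Unhandled command");
    }

    // Folds a single event into the state.
    static Optional<Balance> handleEvent(Optional<Balance> state, BalanceChanged event) {
        return Optional.of(new Balance(event.newBalance));
    }

    // One step: run the command, then apply each resulting event in order.
    static Optional<Balance> step(Optional<Balance> state, LedgerCommand command) {
        Optional<Balance> next = state;
        for (BalanceChanged event : processCommand(state, command)) {
            next = handleEvent(next, event);
        }
        return next;
    }
}
```

Because both functions are pure with respect to their inputs, a command model written this way can be unit tested without a running engine: start from an empty state, call `step` with a sequence of commands, and assert on the resulting state or on the rejection.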
The command model gets wired into Surge through a Surge model, which additionally includes the topics to publish to.
- Scala
object BankAccountSurgeModel extends SurgeCommandBusinessLogic[UUID, BankAccount, BankAccountCommand, BankAccountEvent] {
  override def commandModel: AggregateCommandModel[BankAccount, BankAccountCommand, BankAccountEvent] = BankAccountCommandModel

  override def aggregateName: String = "bank-account"

  override def stateTopic: KafkaTopic = KafkaTopic("bank-account-state")

  override def eventsTopic: KafkaTopic = KafkaTopic("bank-account-events")

  override def aggregateReadFormatting: SurgeAggregateReadFormatting[BankAccount] = { (bytes: Array[Byte]) =>
    Json.parse(bytes).asOpt[BankAccount]
  }

  override def aggregateWriteFormatting: SurgeAggregateWriteFormatting[BankAccount] = (agg: BankAccount) => {
    SerializedAggregate(Json.toJson(agg).toString().getBytes(), Map("aggregate_id" -> agg.accountNumber.toString))
  }

  override def eventWriteFormatting: SurgeEventWriteFormatting[BankAccountEvent] = (evt: BankAccountEvent) => {
    SerializedMessage(evt.accountNumber.toString, Json.toJson(evt)(Json.format[BankAccountEvent]).toString().getBytes())
  }
}
- Java
public class BankAccountSurgeModel extends SurgeCommandBusinessLogic<UUID, BankAccount, BankAccountCommand, BankAccountEvent> {
    @Override
    public AggregateCommandModel<BankAccount, BankAccountCommand, BankAccountEvent> commandModel() {
        return new BankAccountCommandModel();
    }

    @Override
    public String aggregateName() {
        return "bank-account";
    }

    @Override
    public KafkaTopic stateTopic() {
        return new KafkaTopic("bank-account-state");
    }

    @Override
    public KafkaTopic eventsTopic() {
        return new KafkaTopic("bank-account-events");
    }

    @Override
    public SurgeAggregateReadFormatting<BankAccount> aggregateReadFormatting() {
        return new SurgeAggregateReadFormattingBankAccount();
    }

    @Override
    public SurgeEventWriteFormatting<BankAccountEvent> eventWriteFormatting() {
        return new SurgeEventWriteFormattingBankEvent();
    }

    @Override
    public SurgeAggregateWriteFormatting<BankAccount> aggregateWriteFormatting() {
        return new SurgeAggregateWriteFormattingBankAccount();
    }
}

public class SurgeAggregateReadFormattingBankAccount implements SurgeAggregateReadFormatting<BankAccount> {
    @Override
    public Option<BankAccount> readState(byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return Option.apply(objectMapper.readValue(bytes, BankAccount.class));
        } catch (IOException e) {
            return Option.empty();
        }
    }
}

public class SurgeAggregateWriteFormattingBankAccount implements SurgeAggregateWriteFormatting<BankAccount> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public SerializedAggregate writeState(BankAccount bankAccount) {
        try {
            byte[] bankAccountBytes = objectMapper.writeValueAsBytes(bankAccount);
            return SerializedAggregate.create(bankAccountBytes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

public class SurgeEventWriteFormattingBankEvent implements SurgeEventWriteFormatting<BankAccountEvent> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public SerializedMessage writeEvent(BankAccountEvent evt) {
        try {
            return SerializedMessage.create(evt.getAccountNumber().toString(), objectMapper.writeValueAsBytes(evt));
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}
Creating and interacting with the engine
Once the configuration is defined you can inject it into a Surge engine:
- Scala
object BankAccountEngine {
  implicit val ec = DiagnosticContextFuturePropagation.global

  lazy val surgeEngine: SurgeCommand[UUID, BankAccount, BankAccountCommand, BankAccountEvent] = {
    val engine = SurgeCommand(BankAccountSurgeModel)
    engine.start()
    engine
  }
}
- Java
BankAccountSurgeModel bankAccountSurgeModel = new BankAccountSurgeModel();
SurgeCommand<UUID, BankAccount, BankAccountCommand, BankAccountEvent> surgeCommand = new SurgeCommandBuilder()
        .withBusinessLogic(bankAccountSurgeModel).build();
surgeCommand.start();
You can interact with the running engine via two main methods, both called on the reference returned by aggregateFor: sendCommand for sending commands and getState for fetching state:
- Scala
val accountNumber = UUID.randomUUID()
val createAccount = CreateAccount(accountNumber, "Jane Doe", "1234", 1000.0)
val createdAccount: Future[Option[BankAccount]] = BankAccountEngine.surgeEngine.aggregateFor(accountNumber).sendCommand(createAccount).map {
  case CommandSuccess(aggregateState) => aggregateState
  case CommandFailure(reason)         => throw reason
}

val creditAccount = CreditAccount(accountNumber, 100.0)
val creditedAccount: Future[Option[BankAccount]] = BankAccountEngine.surgeEngine.aggregateFor(accountNumber).sendCommand(creditAccount).map {
  case CommandSuccess(aggregateState) => aggregateState
  case CommandFailure(reason)         => throw reason
}
- Java
UUID accountNumber = UUID.randomUUID();
logger.info("Account number is: {}", accountNumber);
CreateAccount createAccount = new CreateAccount(accountNumber, "Jane Doe", "1234", 1000.0);
CompletionStage<CommandResult<BankAccount>> completionStageCreateAccount = surgeCommand
        .aggregateFor(accountNumber).sendCommand(createAccount);
completionStageCreateAccount.whenComplete((CommandResult<BankAccount> result, Throwable ex) -> {
    if (ex != null) {
        ex.printStackTrace();
    } else {
        if (result instanceof CommandSuccess) {
            CommandSuccess<BankAccount> commandSuccess = (CommandSuccess<BankAccount>) result;
            logger.info("Aggregate state is: {} ", commandSuccess.aggregateState());
        } else if (result instanceof CommandFailure) {
            CommandFailure<BankAccount> commandFailure = (CommandFailure<BankAccount>) result;
            commandFailure.reason().printStackTrace();
        }
    }
});
The sendCommand method returns a CommandSuccess if the command was processed successfully, even if no events were emitted, or a CommandFailure wrapping the exception if the command processing logic threw one.
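Since a rejected command arrives as a normally completed result rather than a failed stage, callers end up with two error channels to check. One possible way to collapse them, sketched below under stated assumptions, is to convert a failure result into a failed stage. `Outcome`, `Succeeded`, `Failed`, and `Outcomes` are hypothetical stand-ins that only mirror the success/failure shape described above; they are not the Surge result classes, and the real accessors may differ.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-ins for a success/failure result pair; not Surge classes.
interface Outcome<T> {}

final class Succeeded<T> implements Outcome<T> {
    final Optional<T> state;
    Succeeded(Optional<T> state) { this.state = state; }
}

final class Failed<T> implements Outcome<T> {
    final Throwable reason;
    Failed(Throwable reason) { this.reason = reason; }
}

final class Outcomes {
    // Turns a failure result into a failed stage so callers handle a single error channel.
    static <T> CompletionStage<Optional<T>> unwrap(CompletionStage<Outcome<T>> stage) {
        return stage.thenCompose(outcome -> {
            CompletableFuture<Optional<T>> result = new CompletableFuture<>();
            if (outcome instanceof Succeeded) {
                result.complete(((Succeeded<T>) outcome).state);
            } else if (outcome instanceof Failed) {
                result.completeExceptionally(((Failed<T>) outcome).reason);
            } else {
                result.completeExceptionally(new IllegalStateException("Unknown outcome"));
            }
            return result;
        });
    }
}
```

Whether to unwrap like this is a design choice: keeping the result type explicit forces callers to consider rejection, while unwrapping lets rejections flow through the usual `exceptionally` or `handle` callbacks.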
- Scala
val currentState: Future[Option[BankAccount]] = BankAccountEngine.surgeEngine.aggregateFor(accountNumber).getState
- Java
CompletionStage<Optional<BankAccount>> currentState = surgeCommand.aggregateFor(accountNumber).getState();
currentState.whenComplete((bankAccount, throwable) -> {
    if (throwable != null) {
        throwable.printStackTrace();
    } else {
        logger.info("Current state is: {}", bankAccount);
    }
});
The getState method returns the current state of the given aggregate if it exists, and an empty Option (Scala) or Optional (Java) otherwise.
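Because the state is optional, callers usually need a fallback for aggregates that do not exist yet. The sketch below shows one way to handle that with standard `CompletionStage` and `Optional` combinators. `StateLookups` and its `describe` method are hypothetical, and a plain `Double` stands in for the aggregate so the example runs without an engine.

```java
import java.util.Optional;
import java.util.concurrent.CompletionStage;

// Hypothetical helper; not part of Surge.
final class StateLookups {
    // Maps a possibly absent balance to a message, with a fallback when no state exists.
    static CompletionStage<String> describe(CompletionStage<Optional<Double>> balanceLookup) {
        return balanceLookup.thenApply(maybeBalance ->
            maybeBalance.map(balance -> "balance=" + balance).orElse("no such account"));
    }
}
```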