Command Side Usage

Surge provides an engine that must be configured with domain logic. It uses this domain logic under the hood to determine what events should be emitted based on a command and how to update the state of an aggregate based on those events. This page describes how to configure, create, and interact with a Surge command engine.

Module Info

sbt
val SurgeVersion = "0.0.0+1-862ac3a4-SNAPSHOT"
libraryDependencies += "com.ukg" %% "surge-engine-command-scaladsl" % SurgeVersion
Maven
<properties>
  <surge.version>0.0.0+1-862ac3a4-SNAPSHOT</surge.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.ukg</groupId>
    <artifactId>surge-engine-command-scaladsl_${scala.binary.version}</artifactId>
    <version>${surge.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  SurgeVersion: "0.0.0+1-862ac3a4-SNAPSHOT",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.ukg:surge-engine-command-scaladsl_${versions.ScalaBinary}:${versions.SurgeVersion}"
}

Domain Logic

Aggregates

An aggregate type is the base type for an instance of the Surge engine. Multiple Surge engines can be run within a service, but one instance of the engine should map to exactly one type of aggregate. Aggregates should be simple data classes used to maintain any context/state needed to process incoming commands.

Note

Please, make sure your Aggregate inherits java.io.Serializable to guarantee it gets serialized and deserialized correctly.

Scala
sourceobject BankAccount {
  implicit val format: Format[BankAccount] = Json.format
}
case class BankAccount(accountNumber: UUID, accountOwner: String, securityCode: String, balance: Double)
Java
source@JsonSerialize
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class BankAccount {
    private final UUID accountNumber;
    private final String accountOwner;
    private final String securityCode;
    private double balance;

    public BankAccount(UUID accountNumber, String accountOwner, String securityCode, double balance) {
        this.accountNumber = accountNumber;
        this.accountOwner = accountOwner;
        this.securityCode = securityCode;
        this.balance = balance;
    }

    public UUID getAccountNumber() {
        return accountNumber;
    }

    public String getAccountOwner() {
        return accountOwner;
    }

    public String getSecurityCode() {
        return securityCode;
    }

    public double getBalance() {
        return balance;
    }
}

Commands

Commands are data classes intended to handled by an instance of an aggregate to emit events or reject the command and return an error to the end user. Surge strongly types a command engine on an aggregate type, command type, and event type, so any commands you intend to send to an aggregate must inherit from the same base class/interface.

Scala
sourcesealed trait BankAccountCommand {
  def accountNumber: UUID
}
case class CreateAccount(accountNumber: UUID, accountOwner: String, securityCode: String, initialBalance: Double) extends BankAccountCommand
case class CreditAccount(accountNumber: UUID, amount: Double) extends BankAccountCommand
case class DebitAccount(accountNumber: UUID, amount: Double) extends BankAccountCommand
Java
sourcepublic interface BankAccountCommand {
    UUID getAccountNumber();
}
sourcepublic class CreateAccount implements BankAccountCommand {
    private final UUID accountNumber;
    private final String accountOwner;
    private final double balance;
    private final String securityCode;

    public CreateAccount(UUID accountNumber, String accountOwner, String securityCode, double initialBalance) {
        this.accountNumber = accountNumber;
        this.accountOwner = accountOwner;
        this.securityCode = securityCode;
        this.balance = initialBalance;
    }

    @Override
    public UUID getAccountNumber() {
        return accountNumber;
    }

    public String getAccountOwner() {
        return accountOwner;
    }

    public double getBalance() {
        return balance;
    }

    public String getSecurityCode() {
        return securityCode;
    }
}
sourcepublic class CreditAccount implements BankAccountCommand {
    private final UUID accountNumber;
    private final double amount;
    public CreditAccount(UUID accountNumber, double amount) {
        this.accountNumber = accountNumber;
        this.amount = amount;
    }
    @Override
    public UUID getAccountNumber() {
        return accountNumber;
    }

    public double getAmount() {
        return amount;
    }
}
sourcepublic class DebitAccount implements BankAccountCommand {
    private final UUID accountNumber;
    private final double amount;
    public DebitAccount(UUID accountNumber, double amount) {
        this.accountNumber = accountNumber;
        this.amount = amount;
    }

    @Override
    public UUID getAccountNumber() {
        return accountNumber;
    }

    public double getAmount() {
        return amount;
    }
}

Events

Events are the result of successfully applied commands.

Scala
sourcesealed trait BankAccountEvent {
  def accountNumber: UUID
  def toJson: JsValue
}
object BankAccountCreated {
  implicit val format: Format[BankAccountCreated] = Json.format
}
case class BankAccountCreated(accountNumber: UUID, accountOwner: String, securityCode: String, balance: Double) extends BankAccountEvent {
  override def toJson: JsValue = Json.toJson(this)
}

object BankAccountUpdated {
  implicit val format: Format[BankAccountUpdated] = Json.format
}
case class BankAccountUpdated(accountNumber: UUID, newBalance: Double) extends BankAccountEvent {
  override def toJson: JsValue = Json.toJson(this)
}
Java
source@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
        property = "name")
@JsonSubTypes({
        @JsonSubTypes.Type(value = BankAccountCreated.class, name = "BankAccountCreated"),
        @JsonSubTypes.Type(value = BankAccountUpdated.class, name = "BankAccountUpdated"),
})
public interface BankAccountEvent {

    UUID getAccountNumber();
}
source@JsonSerialize
@JsonTypeName("BankAccountCreated")
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class BankAccountCreated implements BankAccountEvent {

    private final UUID accountNumber;
    private final String accountOwner;
    private final String securityCode;
    private final double balance;

    public BankAccountCreated(UUID accountNumber, String accountOwner, String securityCode,
                              double balance) {
        this.accountNumber = accountNumber;
        this.accountOwner = accountOwner;
        this.securityCode = securityCode;
        this.balance = balance;
    }

    @Override
    public UUID getAccountNumber() {
        return accountNumber;
    }

    public String getAccountOwner() {
        return accountOwner;
    }

    public String getSecurityCode() {
        return securityCode;
    }

    public double getBalance() {
        return balance;
    }
}
source@JsonSerialize
@JsonTypeName("BankAccountUpdated")
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class BankAccountUpdated implements BankAccountEvent {

    public double amount;
    public UUID accountNumber;

    public BankAccountUpdated(UUID accountNumber, double amount) {
        this.amount = amount;
        this.accountNumber = accountNumber;
    }

    @Override
    public UUID getAccountNumber() {
        return accountNumber;
    }

    public double getAmount() {
        return amount;
    }
}

Surge Configuration

The configuration of a Surge engine is done by implementing a command model interface with your particular domain logic and a Surge model interface with Kafka topic configuration.

The command model interface is a way to express the way a domain should handle commands and process events for a particular instance of an aggregate. This is the main wiring of domain logic that Surge leverages. It is strongly typed on aggregate Id type, aggregate, base command, and base event type.

Scala
sourceobject BankAccountCommandModel extends AggregateCommandModel[BankAccount, BankAccountCommand, BankAccountEvent] {
  override def processCommand(aggregate: Option[BankAccount], command: BankAccountCommand): Try[Seq[BankAccountEvent]] = {
    command match {
      case create: CreateAccount =>
        if (aggregate.isDefined) {
          // Aggregate already exists - no need to recreate
          Success(Seq.empty)
        } else {
          Success(Seq(BankAccountCreated(create.accountNumber, create.accountOwner, create.securityCode, create.initialBalance)))
        }
      case credit: CreditAccount =>
        aggregate
          .map { existing =>
            Success(Seq(BankAccountUpdated(existing.accountNumber, existing.balance + credit.amount)))
          }
          .getOrElse(Failure(new AccountDoesNotExistException(command.accountNumber)))
      case debit: DebitAccount =>
        aggregate
          .map { existing =>
            if (existing.balance >= debit.amount) {
              Success(Seq(BankAccountUpdated(existing.accountNumber, existing.balance - debit.amount)))
            } else {
              Failure(new InsufficientFundsException(existing.accountNumber))
            }
          }
          .getOrElse(Failure(new AccountDoesNotExistException(command.accountNumber)))
    }
  }

  override def handleEvent(aggregate: Option[BankAccount], event: BankAccountEvent): Option[BankAccount] = {
    event match {
      case create: BankAccountCreated  => Some(BankAccount(create.accountNumber, create.accountOwner, create.securityCode, create.balance))
      case updated: BankAccountUpdated => aggregate.map(_.copy(balance = updated.newBalance))
    }
  }
}
Java
sourcepublic class BankAccountCommandModel implements AggregateCommandModel<BankAccount, BankAccountCommand, BankAccountEvent> {

    @Override
    public List<BankAccountEvent> processCommand(Optional<BankAccount> aggregate, BankAccountCommand command) {

        // users can feel free to use vavr pattern matching as an alternative to instanceOf.
        if (command instanceof CreateAccount) {
            CreateAccount createAccount = (CreateAccount) command;
            if (aggregate.isPresent()) {
                return new ArrayList<>();
            } else {
                BankAccountCreated bankAccountCreated = new BankAccountCreated(createAccount.getAccountNumber(),
                        createAccount.getAccountOwner()
                        , createAccount.getSecurityCode()
                        , createAccount.getBalance());
                return Collections.singletonList(bankAccountCreated);

            }
        }
        if (command instanceof CreditAccount) {
            CreditAccount creditAccount = (CreditAccount) command;

            if (aggregate.isPresent()) {
                BankAccount bankAccount = aggregate.get();
                BankAccountUpdated bankAccountUpdated = new BankAccountUpdated(creditAccount.getAccountNumber()
                        , bankAccount.getBalance() + creditAccount.getAmount());
                return Collections.singletonList(bankAccountUpdated);

            } else {
                throw new RuntimeException("Account does not exist");
            }
        }
        if (command instanceof DebitAccount) {
            DebitAccount debitAccount = (DebitAccount) command;
            if (aggregate.isPresent()) {
                BankAccount bankAccount = aggregate.get();
                if (bankAccount.getBalance() >= debitAccount.getAmount()) {
                    BankAccountUpdated bankAccountUpdated = new BankAccountUpdated(bankAccount.getAccountNumber(),
                            bankAccount.getBalance() - debitAccount.getAmount());
                    return Collections.singletonList(bankAccountUpdated);

                } else {
                    throw new RuntimeException("Insufficient funds");
                }
            } else {
                throw new RuntimeException("Account does not exist");
            }
        }
        throw new RuntimeException("Unhandled command");
    }

    @Override
    public Optional<BankAccount> handleEvent(Optional<BankAccount> aggregate, BankAccountEvent event) {

        // users can feel free to use vavr pattern matching as an alternative to instanceOf.
        if (event instanceof BankAccountCreated) {
            BankAccountCreated bankAccountCreated = (BankAccountCreated)event;
            Optional<BankAccount> bankAccount;
            bankAccount = Optional.of(new BankAccount(event.getAccountNumber(), bankAccountCreated.getAccountOwner(),
                    bankAccountCreated.getSecurityCode(), bankAccountCreated.getBalance()));
            return bankAccount;
        }
        if (event instanceof BankAccountUpdated) {
            BankAccountUpdated bankAccountUpdated = (BankAccountUpdated)event;

            return aggregate.map((item) -> new BankAccount(item.getAccountNumber(), item.getAccountOwner()
                    , item.getSecurityCode(), bankAccountUpdated.getAmount()));
        }
        throw new RuntimeException("Unhandled event");
    }

}

The command model gets wired into Surge through a Surge model, which additionally includes the topics to publish to.

Scala
sourceobject BankAccountSurgeModel extends SurgeCommandBusinessLogic[UUID, BankAccount, BankAccountCommand, BankAccountEvent] {
  override def commandModel: AggregateCommandModel[BankAccount, BankAccountCommand, BankAccountEvent] = BankAccountCommandModel

  override def aggregateName: String = "bank-account"

  override def stateTopic: KafkaTopic = KafkaTopic("bank-account-state")

  override def eventsTopic: KafkaTopic = KafkaTopic("bank-account-events")

  override def aggregateReadFormatting: SurgeAggregateReadFormatting[BankAccount] = { (bytes: Array[Byte]) =>
    Json.parse(bytes).asOpt[BankAccount]
  }

  override def aggregateWriteFormatting: SurgeAggregateWriteFormatting[BankAccount] = (agg: BankAccount) => {
    SerializedAggregate(Json.toJson(agg).toString().getBytes(), Map("aggregate_id" -> agg.accountNumber.toString))
  }

  override def eventWriteFormatting: SurgeEventWriteFormatting[BankAccountEvent] = (evt: BankAccountEvent) => {
    SerializedMessage(evt.accountNumber.toString, Json.toJson(evt)(Json.format[BankAccountEvent]).toString().getBytes())
  }
}
Java
sourcepublic class BankAccountSurgeModel extends SurgeCommandBusinessLogic<UUID, BankAccount, BankAccountCommand,
        BankAccountEvent> {
    @Override
    public AggregateCommandModel<BankAccount, BankAccountCommand, BankAccountEvent> commandModel() {
        return new BankAccountCommandModel();
    }

    @Override
    public String aggregateName() {
        return "bank-account";
    }

    @Override
    public KafkaTopic stateTopic() {
        return new KafkaTopic("bank-account-state");
    }

    @Override
    public KafkaTopic eventsTopic() {
        return new KafkaTopic("bank-account-events");
    }

    @Override
    public SurgeAggregateReadFormatting<BankAccount> aggregateReadFormatting() {
        return new SurgeAggregateReadFormattingBankAccount();
    }

    @Override
    public SurgeEventWriteFormatting<BankAccountEvent> eventWriteFormatting() {
        return new SurgeEventWriteFormattingBankEvent();
    }

    @Override
    public SurgeAggregateWriteFormatting<BankAccount> aggregateWriteFormatting() {
        return new SurgeAggregateWriteFormattingBankAccount();
    }
}
sourcepublic class SurgeAggregateReadFormattingBankAccount implements SurgeAggregateReadFormatting<BankAccount> {
    @Override
    public Option<BankAccount> readState(byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return Option.apply(objectMapper.readValue(bytes, BankAccount.class));
        } catch (IOException e) {
            return Option.empty();
        }
    }
}
sourcepublic class SurgeAggregateWriteFormattingBankAccount implements SurgeAggregateWriteFormatting<BankAccount> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public SerializedAggregate writeState(BankAccount bankAccount) {
        try {
            byte[] bankAccountBytes = objectMapper.writeValueAsBytes(bankAccount);
            return SerializedAggregate.create(bankAccountBytes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
sourcepublic class SurgeEventWriteFormattingBankEvent implements SurgeEventWriteFormatting<BankAccountEvent> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public SerializedMessage writeEvent(BankAccountEvent evt) {
        try {
            return SerializedMessage.create(evt.getAccountNumber().toString(), objectMapper.writeValueAsBytes(evt));
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}

Creating and interacting with the engine

Once the configuration is defined you can inject it into a Surge engine:

Scala
sourceobject BankAccountEngine {
  implicit val ec = DiagnosticContextFuturePropagation.global
  lazy val surgeEngine: SurgeCommand[UUID, BankAccount, BankAccountCommand, BankAccountEvent] = {
    val engine = SurgeCommand(BankAccountSurgeModel)
    engine.start()
    engine
  }
}
Java
sourceBankAccountSurgeModel bankAccountSurgeModel = new BankAccountSurgeModel();
SurgeCommand<UUID, BankAccount, BankAccountCommand, BankAccountEvent> surgeCommand = new SurgeCommandBuilder()
        .withBusinessLogic(bankAccountSurgeModel).build();
surgeCommand.start();

You can interact with the running engine via two main methods, sendCommand for sending commands and getState for fetching state:

Scala
sourceval accountNumber = UUID.randomUUID()
val createAccount = CreateAccount(accountNumber, "Jane Doe", "1234", 1000.0)

val createdAccount: Future[Option[BankAccount]] = BankAccountEngine.surgeEngine.aggregateFor(accountNumber).sendCommand(createAccount).map {
  case CommandSuccess(aggregateState) => aggregateState
  case CommandFailure(reason)         => throw reason
}
val creditAccount = CreditAccount(accountNumber, 100.0)

val creditedAccount: Future[Option[BankAccount]] = BankAccountEngine.surgeEngine.aggregateFor(accountNumber).sendCommand(creditAccount).map {
  case CommandSuccess(aggregateState) => aggregateState
  case CommandFailure(reason)         => throw reason
}
Java
sourceUUID accountNumber = UUID.randomUUID();
logger.info("Account number is: {}", accountNumber);
CreateAccount createAccount = new CreateAccount(accountNumber, "Jane Doe",
        "1234", 1000.0);

CompletionStage<CommandResult<BankAccount>> completionStageCreateAccount = surgeCommand
        .aggregateFor(accountNumber).sendCommand(createAccount);

completionStageCreateAccount.whenComplete((CommandResult<BankAccount> result, Throwable ex) -> {
    if (ex != null) {
        ex.printStackTrace();
    } else {
        if (result instanceof CommandSuccess) {
            CommandSuccess<BankAccount> commandSuccess = (CommandSuccess<BankAccount>) result;
            logger.info("Aggregate state is: {} ", commandSuccess.aggregateState());
        } else if (result instanceof CommandFailure) {
            CommandFailure<BankAccount> commandFailure = (CommandFailure<BankAccount>) result;
            commandFailure.reason().printStackTrace();
        }
    }
});

The sendCommand method will return a CommandSuccess if the command was processed successfully, even if no events are emitted or a CommandFailure wrapping any exception thrown as part of the command processing logic if one is thrown.

Scala
sourceval currentState: Future[Option[BankAccount]] = BankAccountEngine.surgeEngine.aggregateFor(accountNumber).getState
Java
sourceCompletionStage<Optional<BankAccount>> currentState = surgeCommand.aggregateFor(accountNumber).getState();
currentState.whenComplete((bankAccount, throwable) -> {
    if (throwable != null) {
        throwable.printStackTrace();
    } else {
        logger.info("Current state is: {}", bankAccount);
    }
});

The getState method will return the current state of the given aggregate if it exists.

The source code for this page can be found here.