Architecture, NServiceBus

Storing the state of a Long Running Process

In the previous two posts in this series, we’ve seen some examples of long running processes and how to model them. In this article we’ll see where to store the state of a long running process. This is an important topic when talking about long running processes because long running means stateful. We’ll discuss three patterns: storing the state in the domain entity, in the message or in a process instance. To better explain these patterns, we’ll implement subflows from the Order Fulfillment enterprise process.

Order Fulfillment

You can find the code on my GitHub account.

Store the state in the Domain Entity

This is probably the most widely used of the three approaches, even though it’s rarely the best choice. It’s overused because it’s simple: you just store the state in the domain entity.

Requirement

Let’s start with what Finance needs to do when it receives the OrderPlaced event: charge the customer. To do that, it will integrate with a 3rd party payment provider. The long running process in this case handles two messages:

  • the OrderPlaced event – in which case it will send a ChargeCreditCardRequest
  • the ChargeCreditCardResponse – in which case it will update the order status and, if the charge succeeded, publish an OrderCharged event

Implementation

Since we only have two transitions, we could store the state in the Order entity.

Entities Example

Let’s have a look at the code. We’ll use NServiceBus, but the code is readable even if you don’t know NServiceBus or .NET.

This is the OrderPlacedHandler:

public Task Handle(IOrderPlaced message, IMessageHandlerContext context)
{
	var order = new Order(message.OrderId, message.TotalValue);
	order.Status = OrderStatus.Pending;

	orders.Save(order);

	// Return the task from Send so the handler only completes once the send operation has completed.
	return context.Send(new ChargeCreditCardRequest { CorrelationId = message.OrderId, Amount = message.TotalValue });
}

We save the order with a Pending status and send a ChargeCreditCardRequest.

This is what happens when we get a response back:

public Task Handle(ChargeCreditCardResponse message, IMessageHandlerContext context)
{
	var order = orders.GetById(message.CorrelationId);

	order.Status = message.CardHasBeenCharged ? OrderStatus.Paid : OrderStatus.PaymentFailed;

	Log.Info($"Payment Status for Order with Id {order.OrderId} is {order.Status}");

	orders.Save(order);

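	if (order.Status == OrderStatus.Paid)
	{
		// Return the task from Publish so the handler only completes once the publish operation has completed.
		return context.Publish<IOrderCharged>(orderCharged => { orderCharged.OrderId = order.OrderId; });
	}

	return Task.CompletedTask;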
}

Benefits

Easy

This is easy to do. We don’t need any new framework or library.

Drawbacks

Breaks the Single Responsibility Principle

public class Order
{
	public Order(Guid orderId, decimal totalValue)
	{
		this.OrderId = orderId;
		this.TotalValue = totalValue;
	}

	public decimal TotalValue { get; }

	// Process Logic
	public OrderStatus Status { get; set; }

	public Guid OrderId { get; }

	// Business Logic
	public void Cancel(DateTime cancellationDate) { }

	public void ApplyDiscount(DiscountCode discountCode) { }
}

The Order entity can now change for two different reasons: a change in the business rules or a change in the process.

Limited flexibility

The problem with many simple processes is that they don’t stay simple for long. Here are two examples:

  • Cancelling old pending orders. Let’s say that the business comes with a new requirement. We should cancel all pending orders that are older than 30 minutes. How would you implement that? Without the right tool (like NServiceBus’ timeout) you would probably introduce a batch job. This would periodically query the database for pending orders older than 30 minutes and cancel them. The problem is that now you’ve broken the encapsulation of the Order class because you’ve put business logic in the batch job. Batch jobs are usually a code smell and should be avoided.
  • Validating before charging. Suppose we decide to first validate the charge and only then charge the customer’s credit card. A two-step process has quickly become a three-step process.
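To make the first example more concrete, here is a minimal sketch of how a timeout could replace the batch job. It uses an NServiceBus saga (sagas are covered later in this article) and assumes the NServiceBus package is referenced. The message types and the PendingOrderPolicy name are hypothetical stand-ins invented for this sketch; they are not taken from the accompanying code.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical messages, defined here only so the sketch is self-contained.
public class OrderPlaced : IEvent { public Guid OrderId { get; set; } }
public class OrderCharged : IEvent { public Guid OrderId { get; set; } }
public class CancelOrder : ICommand { public Guid OrderId { get; set; } }
public class PendingOrderExpired { }

public class PendingOrderData : ContainSagaData
{
	public Guid OrderId { get; set; }
}

public class PendingOrderPolicy : Saga<PendingOrderData>,
								  IAmStartedByMessages<OrderPlaced>,
								  IHandleMessages<OrderCharged>,
								  IHandleTimeouts<PendingOrderExpired>
{
	public Task Handle(OrderPlaced message, IMessageHandlerContext context)
	{
		Data.OrderId = message.OrderId;

		// Ask to be woken up in 30 minutes instead of polling the database.
		return RequestTimeout(context, TimeSpan.FromMinutes(30), new PendingOrderExpired());
	}

	public Task Handle(OrderCharged message, IMessageHandlerContext context)
	{
		// The order was paid in time, so this policy has nothing left to do.
		MarkAsComplete();
		return Task.CompletedTask;
	}

	public Task Timeout(PendingOrderExpired state, IMessageHandlerContext context)
	{
		// Still pending after 30 minutes: ask the owner of the Order to cancel it.
		MarkAsComplete();
		return context.Send(new CancelOrder { OrderId = Data.OrderId });
	}

	protected override void ConfigureHowToFindSaga(SagaPropertyMapper<PendingOrderData> mapper)
	{
		mapper.ConfigureMapping<OrderPlaced>(message => message.OrderId).ToSaga(sagaData => sagaData.OrderId);
		mapper.ConfigureMapping<OrderCharged>(message => message.OrderId).ToSaga(sagaData => sagaData.OrderId);
	}
}
```

In this sketch the 30 minute rule lives next to the rest of the process logic, and the cancellation itself is still performed by whoever handles CancelOrder, so no business logic leaks into a scheduled job.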

Tips

You should use this approach only when the process has a couple of steps. Even then, I would think twice. Even if it’s simple now, it might become more complex soon.

Store the state in the Message

As we mentioned earlier, we actually want to validate the charge before charging the customer. Let’s have a look at the requirement.

Requirement

Imagine that we have three credit card validators throughout our organization. One is free, one is expensive and one is a very expensive fraud detection algorithm. Since validating credit card charges costs money, the business wants to minimize this cost. To do this, we always pass the charge through the Free Credit Card Validator first. If a charge is larger than $500, we also want to use the Expensive Credit Card Validator. If it’s larger than $2,000, then we also want to use the Very Expensive Fraud Detection Algorithm. Basically, we want to pass through a series of validation steps depending on the order value.

Routing Slip - Requirement

Pattern

Luckily, the Enterprise Integration Patterns book describes a pattern that is a good fit in this situation: the Routing Slip.

How do we route a message consecutively through a series of processing steps when the sequence of steps is not known at design-time and may vary for each message?

Routing Slip
Source: http://www.enterpriseintegrationpatterns.com/patterns/messaging/RoutingTable.html

Attach a Routing Slip to each message, specifying the sequence of processing steps. Wrap each
component with a special message router that reads the Routing Slip and routes the message to
the next component in the list.
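Before looking at the NServiceBus version, here is a minimal in-memory sketch of the mechanics, assuming nothing beyond the base class library. The Slip type, the step names and the loop that stands in for the router are all hypothetical simplifications: in a real system each step runs in its own endpoint and the slip travels with the message.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the slip that travels with a message (hypothetical type).
public class Slip
{
	private readonly Queue<string> remaining;

	public Slip(IEnumerable<string> itinerary)
	{
		remaining = new Queue<string>(itinerary);
	}

	// Notes that a step leaves for later steps, e.g. validation errors.
	public Dictionary<string, string> Attachments { get; } = new Dictionary<string, string>();

	public bool HasNext => remaining.Count > 0;

	public string TakeNext() => remaining.Dequeue();

	// Drop every remaining step except the final one.
	public void SkipToLastStep()
	{
		while (remaining.Count > 1)
		{
			remaining.Dequeue();
		}
	}
}

public static class Demo
{
	public static void Main()
	{
		var steps = new Dictionary<string, Action<Slip>>
		{
			["Free"] = slip => { },
			["Expensive"] = slip =>
			{
				slip.Attachments["Expensive.ValidationError"] = "charge rejected";
				slip.SkipToLastStep();
			},
			["FraudDetection"] = slip => { },
			["Finance"] = slip => Console.WriteLine($"Errors attached: {slip.Attachments.Count}"),
		};

		var routingSlip = new Slip(new[] { "Free", "Expensive", "FraudDetection", "Finance" });

		// This loop plays the role of the router wrapped around each component.
		while (routingSlip.HasNext)
		{
			var destination = routingSlip.TakeNext();
			Console.WriteLine($"Visiting {destination}");
			steps[destination](routingSlip);
		}
	}
}
```

Running this visits Free, Expensive and Finance, in that order. FraudDetection is skipped because the Expensive step short circuits to the last step, and Finance sees one attachment.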

Implementation

Routing Slip Example

This is how we would use the Routing Slip pattern in our case. The OrderPlacedHandler would compute the routing slip and attach it to the ValidateCreditCardCharge message. The routing slip infrastructure would take care of routing this message to the correct endpoints.

Let’s dive deeper into the code.

The OrderPlacedHandler computes the routing slip for the message and then routes the message. (NServiceBus doesn’t support routing slips out of the box. You need to use the RoutingSlip NuGet package.)

public Task Handle(IOrderPlaced message, IMessageHandlerContext context)
{
	Log.Info($"Order with Id {message.OrderId} placed. Validating...");

	var order = new Order(message.OrderId, message.TotalValue);
	order.Status = OrderStatus.Validating;

	orders.Save(order);

	var validateCreditCardCharge =
		new ValidateCreditCardCharge { Amount = message.TotalValue, CorrelationId = order.OrderId };

	List<string> destinations = GetDestinationsFor(validateCreditCardCharge);
	// Return the task from Route so the handler only completes once the message has been routed.
	return context.Route(validateCreditCardCharge, order.OrderId, destinations.ToArray());
}

The GetDestinationsFor method computes the routing slip based on the order’s total amount. It adds all the required validators. At the end, it also adds the Finance Endpoint, which will act as the results aggregator.

private static List<string> GetDestinationsFor(ValidateCreditCardCharge validateCreditCardCharge)
{
	var destinations = new List<string> { "ItOps.FreeCreditCardValidator.Endpoint" };
	if (validateCreditCardCharge.Amount > 500)
	{
		destinations.Add("ItOps.ExpensiveCreditCardValidator.Endpoint");
	}

	if (validateCreditCardCharge.Amount > 2000)
	{
		destinations.Add("ItOps.VeryExpensiveFraudDetection.Endpoint");
	}

	destinations.Add("Finance.Endpoint");
	return destinations;
}

Let’s see how the ExpensiveCreditCardValidator handles this message:

public Task Handle(ValidateCreditCardCharge message, IMessageHandlerContext context)
{
	Log.Info($"Validating Credit Card Charge with Id {message.CorrelationId} is expensive!");

	if (!IsValid(message))
	{
		var routingSlip = context.Extensions.Get<RoutingSlip>();
		routingSlip.Attachments["ExpensiveCreditCardValidator.ValidationError"] = "ExpensiveCreditCardValidator found an issue with the charge";
		routingSlip.RouteToLastStep();
	}

	return Task.CompletedTask;
}

If the charge is not valid, then we add an attachment to the routing slip, in order to inform the results aggregator. We also short circuit the process and route the message to the last step. If the charge is valid, we don’t do anything. We simply let the router do its job.

As we said, the Finance.Endpoint is the results aggregator. When it handles the ValidateCreditCardCharge message, it inspects the routing slip and makes decisions based on its attachments.

public async Task Handle(ValidateCreditCardCharge message, IMessageHandlerContext context)
{
	var routingSlip = context.Extensions.Get<RoutingSlip>();
	var order = orders.GetById(message.CorrelationId);

	// Any attachment means that at least one validator rejected the charge.
	if (routingSlip.Attachments.Any())
	{
		order.Status = OrderStatus.ValidationFailed;
		LogResponse(routingSlip.Attachments, order.OrderId);
	}
	else
	{
		order.Status = OrderStatus.Pending;
		await context.Send(new ChargeCreditCardRequest { CorrelationId = order.OrderId, Amount = order.TotalValue });
	}

	orders.Save(order);
}

Benefits

No bottlenecks

One of the main advantages of a routing slip is that it doesn’t introduce any bottlenecks: each step forwards the message directly to the next one, so there is no central component that has to handle every message. Jimmy Bogard has a good presentation on how he used the Routing Slip pattern to perform search database enrichment. (The routing slip part starts at the 12:30 minute mark.)

Easy to change

Not only is there no bottleneck, but the routing logic is also in a single place. This makes it easier to understand and change. If you want to add a new validator or change the validation rules, you only need to update the OrderPlacedHandler.

Low coupling

The components in this design are loosely coupled. The validation steps don’t know about each other. This makes it easy to update the validation process without changing the existing steps.

Drawbacks

Harder to monitor

The downside of this solution is that it’s harder to debug and monitor. If you want to know the validation status for an order, you actually have to take a look at the message. The message headers will contain the itinerary (where the message has been and where it will go next) and any attachments.

Tips

Although it doesn’t have many downsides, the routing slip pattern has a big limitation: you can use it only when the process steps can be determined upfront for each message and the process is sequential. This means that steps cannot depend on intermediate results. In many cases, however, you need to know which messages were received in order to decide what to do next, or the process isn’t sequential (it contains some parallel steps). In these cases, the Process Manager pattern could help. Let’s see how.

Store the state in a Process Instance

The third approach is to store the state in a process instance. Let’s have a look at our shipping policy to see why we would need this.

Requirement

Business people had a look at different shipping providers and came up with the following shipping policy:

  • First, attempt to ship with Fan Courier.
  • If we cannot ship with Fan Courier, attempt to ship with Urgent Cargus.
  • If we did not receive a response from Fan Courier within the agreed SLA, cancel the Fan Courier shipment and attempt to ship with Urgent Cargus.
  • If we cannot ship with Urgent Cargus or did not receive a response within the agreed SLA, notify the IT department.

Pattern

Enterprise Integration Patterns are here to help! The Process Manager pattern points us in the right direction.

How do we route a message through multiple processing steps when the required steps may not be known at design-time and may not be sequential?

Process Manager
source: http://www.enterpriseintegrationpatterns.com/patterns/messaging/ProcessManager.html

Use a central processing unit, a Process Manager, to maintain the state of the sequence and determine the next processing step based on intermediate results.
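Stripped of any framework, the core of the pattern can be read as a function from the current state and an incoming result to the next state and the next commands. The sketch below applies that idea to the shipping policy above. All type and member names are hypothetical, the commands are plain strings for brevity, and ignoring late or duplicate messages is a choice made for this sketch only.

```csharp
using System;

public enum ShippingState
{
	ShippingWithFanCourier,
	ShippingWithUrgentCargus,
	Shipped,
	NeedsItAttention
}

public enum ShippingEvent
{
	FanCourierShipped,
	FanCourierRefused,
	FanCourierTimedOut,
	UrgentCargusShipped,
	UrgentCargusRefused,
	UrgentCargusTimedOut
}

public static class ShippingPolicy
{
	// Decides the next state and the commands to send, based on an intermediate result.
	public static (ShippingState Next, string[] Commands) Decide(ShippingState current, ShippingEvent received) =>
		(current, received) switch
		{
			(ShippingState.ShippingWithFanCourier, ShippingEvent.FanCourierShipped)
				=> (ShippingState.Shipped, new[] { "Publish OrderShipped" }),
			(ShippingState.ShippingWithFanCourier, ShippingEvent.FanCourierRefused)
				=> (ShippingState.ShippingWithUrgentCargus, new[] { "ShipWithUrgentCargusRequest" }),
			(ShippingState.ShippingWithFanCourier, ShippingEvent.FanCourierTimedOut)
				=> (ShippingState.ShippingWithUrgentCargus, new[] { "CancelFanCourierShipping", "ShipWithUrgentCargusRequest" }),
			(ShippingState.ShippingWithUrgentCargus, ShippingEvent.UrgentCargusShipped)
				=> (ShippingState.Shipped, new[] { "Publish OrderShipped" }),
			(ShippingState.ShippingWithUrgentCargus, ShippingEvent.UrgentCargusRefused)
				=> (ShippingState.NeedsItAttention, new[] { "Notify IT department" }),
			(ShippingState.ShippingWithUrgentCargus, ShippingEvent.UrgentCargusTimedOut)
				=> (ShippingState.NeedsItAttention, new[] { "Notify IT department" }),
			// Late or duplicate messages leave the state unchanged and send nothing.
			_ => (current, Array.Empty<string>()),
		};
}

public static class Demo
{
	public static void Main()
	{
		var (next, commands) = ShippingPolicy.Decide(
			ShippingState.ShippingWithFanCourier,
			ShippingEvent.FanCourierTimedOut);

		Console.WriteLine($"{next}: {string.Join(", ", commands)}");
	}
}
```

The demo prints "ShippingWithUrgentCargus: CancelFanCourierShipping, ShipWithUrgentCargusRequest". What a framework adds on top of such a function is the part that is tedious to build yourself: persisting the state per process instance, correlating incoming messages to it and scheduling the timeouts.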

The Process Manager can be just another type of entity (although I don’t recommend implementing it yourself). Or you could use a framework. NServiceBus and MassTransit have the Saga construct. Another option is to use a lightweight workflow engine, like Camunda.

Implementation

Process Manager Example

Let’s see the state for a process instance:

public class ShipOrderProcessData : IContainSagaData
{
	public Guid Id { get; set; }

	public string Originator { get; set; }

	public string OriginalMessageId { get; set; }

	public Guid OrderId { get; set; }

	public ShippingStatus Status { get; set; }
}

public enum ShippingStatus
{
	ShippingWithUrgentCargus,

	ShippingWithFanCourier
}

The first three properties are required by NServiceBus. They represent the Id of the long running process, the endpoint that sent the message that started the process and the Id of that message. The last two represent our custom state: the order id and a shipping status.
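As a side note, NServiceBus also ships a ContainSagaData base class that already declares those three infrastructure properties. Assuming the NServiceBus package is referenced, the same state could be sketched with only the custom properties:

```csharp
using System;
using NServiceBus;

// Id, Originator and OriginalMessageId are inherited from ContainSagaData.
public class ShipOrderProcessData : ContainSagaData
{
	public Guid OrderId { get; set; }

	public ShippingStatus Status { get; set; }
}

public enum ShippingStatus
{
	ShippingWithUrgentCargus,

	ShippingWithFanCourier
}
```

Which variant to use is mostly a matter of taste; implementing the interface directly keeps the infrastructure properties visible, while the base class keeps the class focused on the process state.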

Now let’s look at the behavior of the long running process.

public class ShipOrderProcessManager : Saga<ShipOrderProcessData>,
									  IAmStartedByMessages<ShipOrder>,
									  IHandleMessages<ShipWithFanCourierResponse>,
									  IHandleMessages<ShipWithUrgentCargusResponse>,
									  IHandleTimeouts<DidNotReceiveAResponseFromFanCourierTimeout>,
									  IHandleTimeouts<DidNotReceiveAResponseFromUrgentCargusTimeout>
{
	private static readonly ILog Log = LogManager.GetLogger(typeof(ShipOrderProcessManager));

	private readonly TimeSpan shipmentSla = TimeSpan.FromSeconds(10);

	public async Task Handle(ShipOrder message, IMessageHandlerContext context)
	{
		// Log the id from the message; Data.OrderId is only assigned below.
		Log.Info($"Received ShipOrder command for Order with Id {message.OrderId}.");
		Log.Info($"Attempting to ship Order with Id {message.OrderId} with FanCourier.");

		Data.OrderId = message.OrderId;
		Data.Status = ShippingStatus.ShippingWithFanCourier;

		await context.Send(new ShipWithFanCourierRequest { CorrelationId = Data.OrderId });

		// Wake the process up if Fan Courier does not answer within the SLA.
		await RequestTimeout(context, shipmentSla, new DidNotReceiveAResponseFromFanCourierTimeout());
	}

	public async Task Handle(ShipWithFanCourierResponse message, IMessageHandlerContext context)
	{
		if (message.PackageShipped)
		{
			// A success that arrives after we have moved on to Urgent Cargus is ignored.
			if (Data.Status == ShippingStatus.ShippingWithFanCourier)
			{
				await context.Publish<IOrderShipped>(m => { m.OrderId = Data.OrderId; });

				// Mark the process as complete. 
				// This will remove the process instance from the data store.
				MarkAsComplete();

				Log.Info($"Done shipping Order with Id {Data.OrderId}.");
			}
		}
		else
		{
			await ShipWithUrgentCargus(context);
		}
	}

	public async Task Handle(ShipWithUrgentCargusResponse message, IMessageHandlerContext context)
	{
		if (message.PackageShipped)
		{
			await context.Publish<IOrderShipped>(m => { m.OrderId = Data.OrderId; });
			MarkAsComplete();

			Log.Info($"Done shipping Order with Id {Data.OrderId}.");
		}
		else
		{
			// Both providers failed.
			throw new CannotShipOrderException(Data.OrderId);
		}
	}

	public async Task Timeout(DidNotReceiveAResponseFromFanCourierTimeout state, IMessageHandlerContext context)
	{
		// The timeout only matters if we are still waiting for Fan Courier.
		if (Data.Status == ShippingStatus.ShippingWithFanCourier)
		{
			await context.Send(new CancelFanCourierShipping { CorrelationId = Data.OrderId });
			await ShipWithUrgentCargus(context);
		}
	}

	public Task Timeout(DidNotReceiveAResponseFromUrgentCargusTimeout state, IMessageHandlerContext context)
	{
		throw new CannotShipOrderException(Data.OrderId);
	}

	protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShipOrderProcessData> mapper)
	{
		mapper.ConfigureMapping<ShipOrder>(message => message.OrderId).ToSaga(sagaData => sagaData.OrderId);
	}

	private async Task ShipWithUrgentCargus(IMessageHandlerContext context)
	{
		Log.Info($"Attempting to ship Order with Id {Data.OrderId} with UrgentCargus.");

		Data.Status = ShippingStatus.ShippingWithUrgentCargus;
		await context.Send(new ShipWithUrgentCargusRequest { CorrelationId = Data.OrderId });

		// Wake the process up if Urgent Cargus does not answer within the SLA.
		await RequestTimeout(context, shipmentSla, new DidNotReceiveAResponseFromUrgentCargusTimeout());
	}
}

Benefits

Can handle complex flows

The Process Manager pattern is very flexible. You can probably use it to implement any long running process, no matter how complex. Even if you can, that doesn’t mean you should, because it has its own drawbacks. But it’s a very good tool in the toolbox.

Easy to change

This implementation is easy to understand and change, because the logic is in a single place.

Good encapsulation

The ShipOrderProcessManager contains the entire shipping policy. Its data (the ShipOrderProcessData) is private to the process manager. Its behavior is exposed through the messages that it can handle. This keeps the data and the behavior in a single place and you don’t need to add batch jobs.

Drawbacks

“God” class

Because the Process Manager is easy to understand, we can abuse it and put too much logic in it. We must make sure that it adheres to the Single Responsibility Principle, so it doesn’t become a “God” class. A Process Manager implementation can grow and you’ll need to refactor it. Just like any other class.

Performance bottleneck

Because the Process Manager determines the next step in the sequence, it needs to handle all response messages. This can become a bottleneck. Jimmy Bogard has a good article on reducing the load on a Process Manager.

Tips

  • Use a Process Manager when the process changes based on intermediate results.
  • Use a Process Manager when the process is complex. This pattern can handle complexity really well.

Other Examples

In the accompanying code, we have used this pattern in several places. For example:

Conclusion

This article presented three strategies for storing the state of a long running process. I personally find myself using the Process Manager most often. The routing slip pattern, although useful, has limited applicability (because of the restrictions outlined above). I do sometimes store the state of the process in a domain entity, but with a twist: we use NServiceBus Sagas to implement some of our entities.

Here are some useful resources:

In the next article we’ll go over some patterns for handling failure.