Conquering NServiceBus part 3 – A simple Saga

by Mikael Henriksson 1. April 2010 15:44

This post is part of a series and the source code can be found at http://github.com/MikaelHenrixon/ConqueringNServiceBus

  1. Conquering NServiceBus part 1 – Getting Started
  2. Conquering NServiceBus part 2 – Initial configuration
  3. Conquering NServiceBus part 3 – A simple Saga
  4. Conquering NServiceBus part 4 – Testing
  5. Conquering NServiceBus part 5 – Troubleshooting DTC
  6. Conquering NServiceBus part 6 – Upgrading StructureMap

Ok, so we have configured a way of sending and receiving messages but it’s still very basic. How can I achieve what I was originally raving about with pushing many small messages to somewhere and keeping it alive for a certain given period after which it should be either finalized and passed on to another endpoint or invalidated and deleted. Preferably with some sort of message going out about what happened but you can probably figure that out yourself but we will come to that.

Most of what we need for the saga is located in “NServiceBus.Saga”. Lets try and make this really simple the basic needs are:

  1. A message to begin the saga
  2. A message that continues the saga
  3. A message that ends the saga
  4. Message handler for all the saga messages
  5. A class to handle the data for the saga

Let us begin with the messages that, without those the rest is not important.

public class BeginSagaMessage : IMessage
{
	public int SomeProcessId {get;set;}
	public DateTime BeginDate {get;set;}	
}

public class ContinueSagaMessage : IMessage
{
	public int SomeProcessId {get;set;}
	public int Foo {get;set;}
	public string Bar {get;set;}	
}

public class EndSagaMessage : IMessage
{
	public int SomeProcessId {get;set;}
	public string Message {get;set;}
}

 

These are three very simple messages and I suppose it does not make sense at the moment for such a simple scenario but stay with me. It’s really easy to complicate things but before we do we should make it work.

So the above messages will be sent from wherever need to be and in the previous post in the series I talked about how to send messages. To me it is much easier to start there since it’s less work getting that working. Now comes the time to configure the receiving end. Now we need to create a new MessageHandler but this time we will make it a bit special.

public class MySaga : Saga<MySagaData>
                      , IAmStartedByMessages<BeginSagaMessage>
                      , IHandleMessages<ContinueSagaMessage>
                      , IHandleMessages<EndSagaMessage>
{

    public void Handle(BeginSagaMessage message)
    {
        Data.BeginDate = message.BeginDate;
        Data.SomeProcessId = message.SomeProcessId;
    }

    public void Handle(ContinueSagaMessage message)
    {
        var child = new Child
        {
            Foo = message.Foo,
            Bar = message.Bar
        };
        Data.AddChild(child);
    }

    public void Handle(EndSagaMessage message)
    {
        Data.CompletionMessage = message.Message;
        MarkAsComplete();
    }
}

The first thing that is important to notice is  that we inherit from the base class Saga<T> which provides functionality for handling the saga information (persisting it to the chosen storage etc). I will go through how to customize pretty much everything to suite your needs.

Next thing is that we specify what messages start the saga. This is important because if the saga does not exist for when this type of message is received a new Saga will be created. If a message of other types come a Saga won’t be created. The only thing that happens then is that a message is logged that the Saga could not be found.  As you can see above It looks like magic at first glance. This is a big win for convention over configuration. I remember the first time I had a look at NServiceBus it looked a lot different :)

All the processing and persistence is handled in the background and by default NServiceBus does a good job of AutoMapping “YourSagaData” class to a database of your choosing but I was not happy with that. Mostly because I was curious to how it works (and partly because I had to follow DBA guidelines).

Let’s start with the persistent objects then.

public class MySagaData : IContainSagaData
{
    public virtual int SomeProcessId { get; set; }
    public virtual DateTime BeginDate { get; set; }
    public virtual ICollection<Child> Children { get; set; }
    public virtual string CompletionMessage { get; set; }

    /// <summary>
    ///     The 3 last properties belong to the 
    ///     SagaData and is intented for internal use 
    ///     only
    /// </summary>
    public virtual Guid Id { get; set; }

    public virtual string Originator { get; set; }
    public virtual string OriginalMessageId { get; set; }

    public virtual void AddChild(Child child)
    {
        child.Parent = this;
        Children.Add(child);
    }
}

public class Child
{
    public virtual int InternalId { get; set; }
    public virtual MySagaData Parent { get; set; }
    public virtual int Foo { get; set; }
    public virtual string Bar { get; set; }
}

Nothing really special, implement interface IContainsSagaData gives you 3 properties that you should treat as non existing. These are used by NServiceBus internally. Getting this far is not a problem. We can run the saga with Production profile which should AutoMap the classes above to a database and then the only thing we need to do is provide a connection string.

What next then if I want to handle persistence myself?

public class MySagaDataMap : ClassMap<MySagaData>
{
    public MySagaDataMap()
    {
        Id(x => x.Id);
        Map(x => x.OriginalMessageId);
        Map(x => x.Originator);

        Map(x => x.SomeProcessId);
        Map(x => x.BeginDate);
        Map(x => x.CompletionMessage);

        HasMany(x => x.Children)
            .KeyColumn("SomeProcessId")
            .Cascade.All()
            .Inverse();
    }
}

public class ChildMap : ClassMap<Child>
{
    public ChildMap()
    {
        Id(x => x.InternalId);
        Map(x => x.Foo);
        Map(x => x.Bar);

        References(x => x.Parent)
            .Column("SomeProcessId");
    }
}

That gives us the mappings we need but we need more! Let’s create a SagaPersister and implementing ISagaPersister should be straightforward.

public class MySagaPersister : ISagaPersister
{
    public MySagaPersister(ISessionFactory sessionFactory)
    {
        SessionFactory = sessionFactory;
    }

    public ISessionFactory SessionFactory { get; set; }

    public void Save(ISagaEntity saga)
    {
        var session = SessionFactory.GetCurrentSession();
        session.Save(saga);
    }

    public void Update(ISagaEntity saga)
    {
        var session = SessionFactory.GetCurrentSession();
        session.Merge(saga);
    }

    public T Get<T>(Guid sagaId) where T : ISagaEntity
    {
        return SessionFactory.GetCurrentSession().CreateCriteria(typeof (T), "b")
            .Add(Restrictions.Eq("b.SomeProcessId", sagaId))
            .SetFetchMode("b.Children", FetchMode.Eager)
            .SetCacheable(true)
            .SetCacheMode(CacheMode.Normal)
            .UniqueResult<T>();
    }

    public T Get<T>(string property, object value) where T : ISagaEntity
    {
        return SessionFactory.GetCurrentSession().CreateCriteria(typeof (T), "b")
            .Add(Restrictions.Eq("b." + property, value))
            .SetFetchMode("b.Children", FetchMode.Eager)
            .UniqueResult<T>();
    }

    public void Complete(ISagaEntity saga)
    {
        SessionFactory.GetCurrentSession().Delete(saga);
    }
}

EDIT 2010-08-23 : This IS in the scope of NServiceBus and I’ll make a couple of suggestions here. The reason for the constructor dependency is to force whatever container to inject a SessionFactory into the class. This can be done through registering the persister with NServiceBus own API like below;

public static class Extensions
{
    public static Configure SetSagaPersister(this Configure config, IContainer container)
    {
        config.Configurer.RegisterSingleton(typeof (ISagaPersister),
            new MySagaPersister(container.GetInstance<ISessionFactory>()));

        return config;
    }
}

This extension method would be use at configuration time. Just add it after adding the sagas and you are good to go. Next up finding sagas…

The first alternative is to override ConfigureHowToFindSaga in the MySaga class.

public override void ConfigureHowToFindSaga()
{
	ConfigureMapping<ContinueSagaMessage>(t => t.SomeProcessId, m => m.SomeProcessId);
	ConfigureMapping<EndSagaMessage>(t => t.SomeProcessId, m => m.SomeProcessId);
}
This is what happens above:
protected void ConfigureMapping<TMessage>(
		Expression<Func<T, object>> sagaEntityProperty, 
		Expression<Func<TMessage, object>> messageProperty) 
	where TMessage : IMessage;

You pass in the MessageType and and then specify what properties should be compared for equality. This happens in the background and you will never see this taking place. It works fine for most scenarios but I wanted to eagerly load my “children” and wanted to prematurely optimize things a bit so I decided to go for another approach  which is to create my own Finder and it’s actually not that hard.

public class MySagaFinder :
    IFindSagas<MySagaData>.Using<ContinueSagaMessage>
    , IFindSagas<MySagaData>.Using<EndSagaMessage>
{
    public ISessionFactory SessionFactory { get; set; }
    public ISagaPersister Persister { get; set; }

    public MySagaData FindBy(ContinueSagaMessage message)
    {
        return FindBy(message.SomeProcessId);
    }
    
    public MySagaData FindBy(EndSagaMessage message)
    {
        return FindBy(message.SomeProcessId);
    }

    public MySagaData FindBy(Guid id)
    {
        return SessionFactory.GetCurrentSession().QueryOver<MySagaData>()
			.Where(x => x.SomeProcessId == id)
			.Fetch(x => x.Children)
			.Eager
			.Cacheable()
			.SingleOrDefault();
    }
}

 

It don’t get much easier than that. Just creating this class and putting it in your assembly should cause NServiceBus to use the “MySagaFinder”. Now we need to change the endpoint config a little to make use of  the latest changes (read previous post). We could end up with something like:

public class EndpointConfig : IConfigureThisEndpoint, IWantCustomInitialization
{
    private readonly IEnumerable<Assembly> _assemblies = new List<Assembly>
    {
        typeof (BeginSagaMessage).Assembly,
        typeof (EndpointConfig).Assembly,
        Assembly.Load("NServiceBus"),
        Assembly.Load("NServiceBus.Core"),
        Assembly.Load("NServiceBus.Host")
    };

    private IContainer _container;

    public void Init()
    {
        var file = new FileInfo("log4net.config");
        if (file.Exists) {
            XmlConfigurator.Configure(file);
        }

        InitializeSagaConnection();
        InitializeNServiceBus();
    }

    private void InitializeSagaConnection()
    {
        ObjectFactory.Initialize(x => x.AddRegistry<SagaRegistry>());
        _container = ObjectFactory.Container;
    }

    private void InitializeNServiceBus()
    {
        Configure.With(_assemblies)
            .StructureMapBuilder(_container)
            .Log4Net()
            .XmlSerializer()
            .MsmqTransport()
                .IsTransactional(true)
                .IsolationLevel(IsolationLevel.RepeatableRead)
            .UnicastBus()
                .LoadMessageHandlers()
            .Sagas()
            .SetSagaPersister(_container);
    }
}

SagaRegistry is just a place to initialize the MySaga NHibernate configuration and add what’s needed to the StructureMap container.

public class SagaRegistry : Registry
{
    public SagaRegistry()
    {
        Config = Fluently.Configure()
            .Database(MsSqlConfiguration.MsSql2008.ConnectionString(x => x.FromConnectionStringWithKey(Environment.MachineName))
                          .Cache(c =>
                                 c.UseQueryCache()
                                     .QueryCacheFactory<StandardQueryCacheFactory>()
                                     .ProviderClass<HashtableCacheProvider>()
                                     .UseMinimalPuts())
                          .UseReflectionOptimizer()
                          .MaxFetchDepth(2)
                          .AdoNetBatchSize(24)
                          .CurrentSessionContext<ThreadStaticSessionContext>()
            ).Mappings(m => m.FluentMappings.AddFromAssemblyOf<MySagaDataMap>()).BuildConfiguration();
        Config.SetProperty(NHibernate.Cfg.Environment.GenerateStatistics, "true");
        Config.SetProperty(NHibernate.Cfg.Environment.PropertyUseReflectionOptimizer, "true");

        ForSingletonOf<Configuration>()
            .Use(x => Config);

        Factory = Config.BuildSessionFactory();
        ForSingletonOf<ISessionFactory>()
            .Use(x => Factory)
            .WithName(Environment.MachineName);
    }

    public Configuration Config { get; set; }
    public ISessionFactory Factory { get; set; }
}

here I specify that when ISessionFactory is requested from the container the one I just created is returned and I chose to handle the containers choice of ISagaPersister here as well since that is very much tied to my NHibernate configuration.

Unfortunately this is not the end of things. Now you have all the parts in place, you have truly customized the way NServiceBus works but this won’t be done unless you create a profile and tell NServiceBus to run only that profile. If you would start NServiceBus like this it would run the default profile which is integration  with SQLite database with automatic schema generation for storage. It is time to specify what profile to run.

public class MyProfile : IProfile
{
}

Now when you start the debugger you just add the full name (including namespaces)

vs2008_debug_options

Hope this helps!

Tags:

NServiceBus

blog comments powered by Disqus

About the author

Life architect specialized in programming