多的消费者通过统一不工作MassTransit相同的消息消费者、消息、工作、统一不

2023-09-04 00:33:03 作者:限量版溫柔

我有一个很大的问题,因为最近似乎是在错误 MassTransit.UnityIntegration 包,主要是由于该注册名称不被认为是事实。

举例来说,如果我注册我的课是这样的:

  VAR集装箱=​​新UnityContainer()
    .RegisterType<消费< COMMAND1>。所有,Handler1>(Handler1)
    .RegisterType<消费< COMMAND1>。所有,Handler3>(Handler3);
 

这几行后,我用的是 LoadFrom 扩展方法来获得注册的消费者在容器是这样的:

  IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom(MSMQ://本地主机/ MyQueue的);
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService(MSMQ://本地主机/ mt_subscriptions);
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(_s => _s.LoadFrom(容器));
    });
 

什么情况是,我的处理程序不会被调用相关消息打不到车的时候。

消费者在哪 在无数个 楚门的世界 里

在琢磨了一会儿,我决定去看看的执行情况和很明显,为什么出现这种情况:

这是内部的主要code中的 LoadFrom 方法:

 公共静态无效LoadFrom(这SubscriptionBusServiceConfigurator配置,IUnityContainer容器)
{
    IList的<类型> concreteTypes = FindTypes&其中; IConsumer>(容器中,x =>!x.Implements&其中; ISaga>());
    如果(concreteTypes.Count大于0)
    {
        VAR consumerConfigurator =新UnityConsumerFactoryConfigurator(配置器,容器);

        的foreach(在concreteTypes型concreteType)
            consumerConfigurator.ConfigureConsumer(concreteType);
    }

    ...

}
 

请注意,它只能找到的类型和不通过的名称正向的任何信息。这是 FindTypes< T> 实施

 静态的IList<类型> FindTypes< T>(IUnityContainer容器,Func键<类型,布尔过滤器过滤)
{
    返回container.Registrations
                        。凡(R => r.MappedToType.Implements< T>())
                        。选择(R => r.MappedToType)
                        。凡(过滤器)
                        .ToList();
}
 

在一些迂回,这一切都归结到这一行,在 UnityConsumerFactory&LT内部; T> 类,实际上造成了消费者的实例:

  VAR消费= childContainer.Resolve< T>();
 

这绝对不会与团结工作,当有多个登记,因为登记的唯一途径(然后解析)的多种实现在Unity是给他们上的 RegisterType 打电话,后来就上指定此名称解析电话。

也许我失去了一些东西在这一切完全基本的和错误是我的一部分?在源MassTransit统一的组件可以发现此处。我没有看在$ C $下的其他容器,因为我不熟悉他们,但我认为这已经在某种程度上被处理?我认为具有一个以上的消费者对同一容器内的相同的消息类型实际上是相当普遍的。

在这种特殊情况下,这将是更好地传递下去,不仅键入从注册在容器中,也用于注册的名称。

更新

那么问题就比较清楚一点,现在特拉维斯花时间来解释它。我应该更早注意到了这一点。

看来我应该直接注册的类型,他们到工厂内被正确解决,是这样的:

  VAR集装箱=​​新UnityContainer()
    .RegisterType< Handler1>()
    .RegisterType&其中; Handler3>();
 

通过这种方式,我也可以省略登记名字,因为现在容器内其生成密钥是不同的。

那么,这将很好地工作,如果这是我们的现实情况,但事实并非如此。让我来解释究竟是什么,我们正在做的:

在我们开始使用MassTransit,我们已经有了用于命令模式的界面,名为 ICommandHandler< TCommand> ,其中 TCommand 是为系统中的命令的基本模型。当我们开始考虑使用一个服务总线,很明显,从一开始走,它应该有可能以后切换到另一种服务总线实现没有太多的麻烦。考虑到这一点,我着手创建一个抽象的概念在我们的指挥界面表现得像消费者一个MT的期望。这就是我想出了:

 公共类CommandHandlerToConsumerAdapter< T> :消费< T>。所有
    其中T:类,ICommand的
{
    私人只读ICommandHandler< T> _commandHandler;

    公共CommandHandlerToConsumerAdapter(ICommandHandler< T> commandHandler)
    {
        _commandHandler = commandHandler;
    }

    公共无效消耗(T _Message)
    {
        _commandHandler.Handle(_Message);
    }
}
 

这是一个非常简单的适配器类。它接收一个 ICommandHandler< T> 的实施,并使其像一个消耗< T>。所有实例。遗憾的是,MT需要消息模型是类的,因为我们没有对我们的命令,这些命令的约束,但是这是一个小的不便,我们继续添加其中T:类制约我们的接口。

然后,由于我们的处理程序的接口,在容器中已注册,这将是注册的MT接口与该适配器实现,并可以使容器注入真正实现了它的问题。举例来说,一个更现实的例子(从我们的code基直取):

  .RegisterType< ICommandHandler< ApplicationInstallationCommand>中CommandRecorder>(记录)
.RegisterType< ICommandHandler< ApplicationInstallationCommand>中InstallOperation>(遗嘱执行人)
.RegisterType<消费< ApplicationInstallationResult>。所有,CommandHandlerToConsumerAdapter< ApplicationInstallationResult>>()
.RegisterType<消费< ApplicationInstallationCommand>。所有,CommandHandlerToConsumerAdapter< ApplicationInstallationCommand>>
  (记录,新InjectionConstructor(新ResolvedParameter< ICommandHandler< ApplicationInstallationCommand>>(记录)))
.RegisterType<消费< ApplicationInstallationCommand>。所有,CommandHandlerToConsumerAdapter< ApplicationInstallationCommand>>
  (执行人,新InjectionConstructor(新ResolvedParameter< ICommandHandler< ApplicationInstallationCommand>>(遗嘱执行人)))
 

命名注册有一点令人费解,但必须的,因为我们现在有两个消费者对同样的信息。虽然并不像我们希望的那样干净,我们可以忍受的,因为这促进了我们的code巨大的去耦MassTransit特定的逻辑:适配器类是在一个单独的程序,由最后一层只引用的系统中,容器登记用途。这似乎是一个很不错的主意,但现在证实不支持容器集成类背后的查找逻辑。

请注意,我无法在这里注册的具体类,因为在中央的通用适配器类。

更新2:

在以下特拉维斯的意见,我想这个简单的code这也不能正常工作(我不明白为什么,因为它似乎完全有效)。这是一个明确的消费类工厂登记没有任何自动容器集成:

  _sbc.Consume(()=> container.resolve<消费< ApplicationInstallationCommand>。所有>(记录))
 

这解析调用正确的给我pviously注册了$ P $ CommandHandlerToConsumerAdapter< ApplicationInstallationCommand> 实例,它实现了消耗< ApplicationInstallationCommand>。所有的,这反过来应该是支持的基本接口之一。发布的 ApplicationInstallationCommand 权后,这并不算什么,因为如果处理程序是无效的或类似的东西。

这工作虽然:

  _sbc.Consume(()=>(CommandHandlerToConsumerAdapter< ApplicationInstallationCommand>)container.resolve<消费< ApplicationInstallationCommand>。所有>(记录))
 

很显然,一些深跌的API中被处理,而不是通用接口基础上自行编译类型各种各样的非通用的方式。

我的意思是......这是可行的与此,但报名code是越来越费解,没有明显的理由(因为我会考虑作为MT的一部分非标准的实施细则)。也许我只是抓住救命稻草吗?也许这一切归结为为什么MT不能接受它自己的,已经通用的接口?为什么它需要具体类型在编译时看到,这是即使是我传递给它的实例的类型为消耗℃的消息处理程序,X>。所有,同时在编译的时候?

更新3:

在与特拉维斯下面讨论,我决定彻底放下UnityIntegration组装和去独立消费者呼吁订阅。

我创建了一个小的扩展类在我们MassTransit特定的汇编,方便的事情:

 公共静态类CommandHandlerEx
{
    公共静态CommandHandlerToConsumerAdapter< T> ToConsumer< T>(这ICommandHandler< T> _handler)
        其中T:类,ICommand的
    {
        返回新CommandHandlerToConsumerAdapter< T>(_处理程序);
    }
}
 

最后注册这样的处理程序:

  VAR集装箱=​​新UnityContainer()
    .RegisterType< ICommandHandler< ApplicationInstallationCommand>中CommandRecorder>(记录)
    .RegisterType< ICommandHandler< ApplicationInstallationCommand>中InstallOperation>(遗嘱执行人);

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom(MSMQ://本地主机/ MyQueue的);
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService(MSMQ://本地主机/ mt_subscriptions);
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(RegisterConsumers);
    });

私人无效RegisterConsumers(SubscriptionBusServiceConfigurator _s)
{
    _s.Consumer(()=&GT; container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Recorder").ToConsumer());
    _s.Consumer(()=&GT; container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Executor").ToConsumer());
}
 

采用全前天去尝试工作的事情出来后,我强烈建议你远离容器扩展组件,如果你想预期的行为,从容器中和/或如果你想定制类等(如我没有脱钩我们从MT专用code消息类)2主要的原因:

在扩展的逻辑遍历在容器中的注册找到消费者类。这一点,在我看来,可怕的设计。如果有什么东西想从容器的实现,它应该只需要调用解决 ResolveAll 在它的界面(或其等值非统一计算),不关心究竟是什么登记,以及他们的具体类型。这可以有code假定容器可以返回没有明确登记类型的严重后果。幸运的是,它不是与这些类的话,但我们有一个容器扩展名自动创建一个基于构建的关键装饰类型,并且他们并不需要明确在容器上注册。

消费者注册使用的 ContainerRegistration 实例 MappedToType 属性调用解决在容器上。这仅仅是在任何情况下,不只是在MassTransit的情况下完全错误的。在统一的类型是注册为一个映射(上面就像在摘录,以成分)或直接作为一个具体类型。在这两种情况下,逻辑应该使用 RegisteredType 键入从容器来解决。它的工作原理,现在的方式是,如果你碰巧有其接口登记处理,MT将完全绕过您注册逻辑,并呼吁解决的具体类型来代替,该works在统一开箱的,可能导致联合国predictable的行为,因为你认为它应该是一个单身像你注册,但它最终被一个瞬时对象(默认值),而不是,例如。

回首现在我可以看到它是更为复杂,我原本相信。有相当多的学习的过程中太,所以​​这是很好的。

更新4:

昨天,我决定做最后签之前,重构整个适配器的做法有点。我去与MassTransit的界面模式创建我的适配器也一样,因为我认为这是一个非常漂亮和干净的语法。

下面是结果:

 公共密封类CommandHandlerToConsumerAdapter&LT; T&GT;
    其中T:类,ICommand的
{
    公共密封类全部:消费&LT; T&GT;。所有
    {
        私人只读ICommandHandler&LT; T&GT; m_commandHandler;

        公众所有(ICommandHandler&LT; T&GT; _commandHandler)
        {
            m_commandHandler = _commandHandler;
        }

        公共无效消耗(T _Message)
        {
            m_commandHandler.Handle(_Message);
        }
    }
}
 

很遗憾这打破,因为在被引用的Magnum库中的实用方法,出现未处理的异常MassTransit的code,在一个名为 ToShortTypeName

扩展方法

下面是例外:

  

在System.String.Substring(的Int32在startIndex,的Int32长度)      在Magnum.Extensions.ExtensionsToType.ToShortTypeName(类型类型)      在MassTransit.Pipeline.Sinks.ConsumerMessageSink2.<>c__DisplayClass1.<Selector>b__0(IConsumeContext1上下文)在d:\ BuildAgent-02 \工作\ aa063b4295dfc097的\ src \ MassTransit \管道\水槽\ ConsumerMessageSink.cs:行51      在MassTransit.Pipeline.Sinks.InboundConvertMessageSink`1.<>c_DisplayClass2.<>c_DisplayClass4.b__1(IConsumeContext X)在d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Pipeline\Sinks\InboundConvertMessageSink.cs:line 45      在MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers(IReceiveContext上下文)在d:\ BuildAgent-02 \工作\ aa063b4295dfc097的\ src \ MassTransit \背景\ ServiceBusReceiveContext.cs:行162

解决方案

虽然我不知道统一整合,所有的容器,你必须注册你的消费者为具体类型的容器,而不是消耗&LT;&GT; 接口。我认为它只是 RegisterType&LT; Handler1,Handler1&GT;()但我不能完全肯定这一点。

如果你不喜欢 LoadFrom 扩展你的容器,你不需要反正使用它。你可以永远只是解决了消费者自己,并通过 _sbc.Consume注册它们(()=&GT; container.resolve&LT; YourConsumerType&GT;())的配置,而不​​是。该 LoadFrom 扩展只是一个说服了谁是在一个共同的方式使用容器的人。

下面code ++工程,它是使用容器我所期望的样子,不知道您的域多,人们使用它。如果你想了解的信息也必然要好一点,我建议使用RabbitMQ的,因为你可以很容易地看到那里的东西最终被撂荒的交流绑定。在这一点上,这是远远超出了一个SO问题,我想,如果你再有什么把这个邮件列表。

 使用系统;
使用MassTransit;
使用Microsoft.P​​ractices.Unity;

命名空间MT_Unity
{
    类节目
    {
        静态无效的主要(字串[] args)
        {
            使用(VAR集装箱=​​新UnityContainer()
                .RegisterType&其中; ICommandHandler&其中; MyCommand&gt;中MyCommandHandler&GT;()
                .RegisterType&其中; CommandHandlerToConsumerAdapter&其中; MyCommand&GT;&GT;())

            使用(IServiceBus consumerBus = ServiceBusFactory.New(SBC =&GT;
                    {
                        sbc.ReceiveFrom(RabbitMQ的://本地主机/客户);
                        sbc.UseRabbitMq();


                        sbc.Subscribe(S =&GT; s.Consumer(()=&GT; container.Resolve&其中; CommandHandlerToConsumerAdapter&其中; MyCommand&GT;&GT;()));
                    }))
            使用(IServiceBus publisherBus = ServiceBusFactory.New(SBC =&GT;
                    {
                        sbc.ReceiveFrom(RabbitMQ的://本地主机/发行人);
                        sbc.UseRabbitMq();
                    }))
            {
                publisherBus.Publish(新MyCommand());

                Console.ReadKey();
            }
        }
    }

    公共类CommandHandlerToConsumerAdapter&LT; T&GT; :消费&LT; T&GT;。所有其中T:类,ICommand的
    {
        私人只读ICommandHandler&LT; T&GT; _commandHandler;

        公共CommandHandlerToConsumerAdapter(ICommandHandler&LT; T&GT; commandHandler)
        {
            _commandHandler = commandHandler;
        }

        公共无效消耗(T消息)
        {
            _commandHandler.Handle(消息);
        }
    }

    公共接口的ICommand {}
    公共类MyCommand:ICommand的{}

    公共接口ICommandHandler&LT; T&GT;其中T:类,ICommand的
    {
        无效手柄(T消息);
    }

    公共类MyCommandHandler:ICommandHandler&LT; MyCommand&GT;
    {
        公共MyCommandHandler()
        {

        }
        公共无效手柄(MyCommand消息)
        {
            Console.WriteLine(处理的MyCommand);
        }
    }

}
 

I'm having a lot of problems lately because of what seems to be a bug in the MassTransit.UnityIntegration package, primarily due to the fact that registration names are not being considered.

For instance, if I register my classes like this:

var container = new UnityContainer()
    .RegisterType<Consumes<Command1>.All, Handler1>("Handler1")
    .RegisterType<Consumes<Command1>.All, Handler3>("Handler3");

A few lines later, I use the LoadFrom extension method to get the registered consumers in the container like this:

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom("msmq://localhost/MyQueue");
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService("msmq://localhost/mt_subscriptions");
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(_s => _s.LoadFrom(container));
    });

What happens is that my handlers are never called when the associated messages hit the bus.

After pondering for a while, I decided to take a look at the implementation and it became clear why this happens:

This is the main code inside the LoadFrom method:

public static void LoadFrom(this SubscriptionBusServiceConfigurator configurator, IUnityContainer container)
{
    IList<Type> concreteTypes = FindTypes<IConsumer>(container, x => !x.Implements<ISaga>());
    if (concreteTypes.Count > 0)
    {
        var consumerConfigurator = new UnityConsumerFactoryConfigurator(configurator, container);

        foreach (Type concreteType in concreteTypes)
            consumerConfigurator.ConfigureConsumer(concreteType);
    }

    ...

}

Notice that it only finds the types and does not pass any information of the names forward. This is the FindTypes<T> implementation:

static IList<Type> FindTypes<T>(IUnityContainer container, Func<Type, bool> filter)
{
    return container.Registrations
                        .Where(r => r.MappedToType.Implements<T>())
                        .Select(r => r.MappedToType)
                        .Where(filter)
                        .ToList();
}

After a few indirections, it all comes down to this single line, inside the UnityConsumerFactory<T> class, that actually creates the instance of the consumer:

var consumer = childContainer.Resolve<T>();

This absolutely will not work with Unity when there are multiple registrations, because the only way to register (and then resolve) multiple implementations in Unity is to give them a name on the RegisterType call and later on specifying this name on the Resolve call.

Perhaps I'm missing something completely basic in all this and the error is on my part? The source for the MassTransit Unity components can be found here. I did not look into the code for the other containers because I'm not familiar with them, but I assume this has been handled in some way? I think having more than one consumer for the same message type inside the same container is actually quite common.

In this particular case, it would be better to pass along not only the Type from the registration in the container, but also the Name used for the registration.

Update

Well the problem is a bit more clear now that Travis took the time to explain it. I should have noticed it earlier.

It seems I should be registering the types directly for they to be correctly resolved inside the factory, like this:

var container = new UnityContainer()
    .RegisterType<Handler1>()
    .RegisterType<Handler3>();

With this approach, I can also omit the registration name, since now their build keys inside the container are different.

Well, this would work perfectly if this was our real scenario, but it isn't. Let me explain what exactly we are doing:

Before we started using MassTransit, we already had an interface used for the command pattern, called ICommandHandler<TCommand>, where TCommand is a base model for commands in the system. When we started considering the use of a service bus, it was clear from the get go that it should be possible to switch later on to another service bus implementation without much hassle. With that in mind, I proceeded to create an abstraction over our commanding interface to behave like one of the consumers that MT expects. This is what I came up with:

public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All
    where T : class, ICommand
{
    private readonly ICommandHandler<T> _commandHandler;

    public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler)
    {
        _commandHandler = commandHandler;
    }

    public void Consume(T _message)
    {
        _commandHandler.Handle(_message);
    }
}

It's a very simple adapter class. It receives an ICommandHandler<T> implementation and makes it behave like a Consumes<T>.All instance. It was unfortunate that MT required message models to be classes, since we did not have that constraint on our commands, but that was a small inconvenience, and we proceeded to add the where T : class constraint to our interfaces.

Then, since our handler interfaces were already registered in the container, it would be a matter of registering the MT interface with this adapter implementation and letting the container inject the real implementations over it. For instance, a more realistic example (taken straight from our code base):

.RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder")
.RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor")
.RegisterType<Consumes<ApplicationInstallationResult>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationResult>>()
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>>
  ("Recorder", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Recorder")))
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>>
  ("Executor", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Executor")))

The named registrations there are a bit convoluted but required, since we now have two consumers for the same message. Although not as clean as we hoped, we could live with that since this promotes a huge decoupling of our code from MassTransit specific logic: the adapter class is in a separate assembly, referenced ONLY by the final layer in the system, for container registration purposes. That seems like a very nice idea, but is confirmed unsupported now by the lookup logic behind the container integration classes.

Notice that I'm unable to register the concrete classes here, since there is a generic adapter class in the middle.

Update 2:

After following Travis' advice, I tried this simple code which also does not work (I fail to see why, as it seems perfectly valid). It's an explicit consumer factory registration without any automatic container integration:

_sbc.Consume(() => container.resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder"))

That resolve call correctly gives me the previously registered CommandHandlerToConsumerAdapter<ApplicationInstallationCommand> instance, which implements the Consumes<ApplicationInstallationCommand>.All, which in turn should be one of THE base interfaces supported. Publishing an ApplicationInstallationCommand right after this does nothing, as if the handler was invalid or something similar.

This works though:

_sbc.Consume(() => (CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>) container.resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder"))

It is clear that something deep down in the API is handling the compile type in a non generic way of sorts, instead of basing itself on the generic interface.

I mean... it is workable with this but the registration code is getting convoluted for no apparent reason (due to what I would consider as 'non-standard implementation details' on MT's part). Maybe I'm just grasping at straws here? Perhaps this all boils down to 'why does MT not accept it's own, already generic, interface?' Why does it need the concrete type at compile time to see that it is a message handler even though the instance that I'm passing to it is typed as Consumes<X>.All, also at compile time?

Update 3:

After discussing with Travis below, I decided to drop the UnityIntegration assembly completely and go with standalone Consumer calls on the subscription.

I've created a small extension class in our MassTransit specific assembly to facilitate things:

public static class CommandHandlerEx
{
    public static CommandHandlerToConsumerAdapter<T> ToConsumer<T>(this ICommandHandler<T> _handler)
        where T : class, ICommand
    {
        return new CommandHandlerToConsumerAdapter<T>(_handler);
    }
}

And finally registered the handlers like this:

var container = new UnityContainer()
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder")
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor");

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom("msmq://localhost/MyQueue");
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService("msmq://localhost/mt_subscriptions");
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(RegisterConsumers);
    });

private void RegisterConsumers(SubscriptionBusServiceConfigurator _s)
{
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Recorder").ToConsumer());
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Executor").ToConsumer());
}

After using the whole day yesterday to try working things out, I strongly suggest that you stay away from the container extension assemblies if you want expected behavior out of the container and/or if you want to customize the classes etc (like I did to decouple our messaging classes from MT specific code) for 2 main reasons:

The logic in the extensions traverse the registrations in the container to find the consumer classes. This is, in my opinion, terrible design. If something wants an implementation from the container, it should just call Resolve or ResolveAll on it's interface (or their equivalent in non Unity terms), without caring for what exactly is registered and what their concrete types are. This can have serious consequences with code that assumes the container can return types that were not explicitly registered. Luckily it's not the case with these classes, but we do have a container extension that automatically creates decorator types based on the build key, and they don't need to be explicitly registered on the container.

The consumer registration uses the MappedToType property on the ContainerRegistration instance to call Resolve on the container. This is just completely wrong on any situation, not just in MassTransit's context. Types in Unity are either registered as a mapping (like in the excerpts above, with a From and To component) or directly as a single concrete type. In BOTH cases the logic should use the RegisteredType type to resolve from the container. The way it works now is that, if you happen to register the handlers with their interfaces, MT will completely bypass your registration logic and call resolve on the concrete type instead, which works in Unity out of the box, possibly causing unpredictable behavior because you think it should be a singleton like you registered but it ends up being a transient object (the default) instead, for instance.

Looking back at it now I can see it was much more complicated that I originally believed. There was quite a bit of learning in the process too, so that's good.

Update 4:

Yesterday I decided to refactor the whole adapter approach a bit before making the final checkin. I went with MassTransit's interface pattern to create my adapters too, because I think that is a very nice and clean syntax.

Here is the result:

public sealed class CommandHandlerToConsumerAdapter<T>
    where T : class, ICommand
{
    public sealed class All : Consumes<T>.All
    {
        private readonly ICommandHandler<T> m_commandHandler;

        public All(ICommandHandler<T> _commandHandler)
        {
            m_commandHandler = _commandHandler;
        }

        public void Consume(T _message)
        {
            m_commandHandler.Handle(_message);
        }
    }
}

Unfortunatelly this breaks MassTransit's code because of an unhandled exception on a utility method in the referenced Magnum library, on an extension method called ToShortTypeName.

Here is the exception:

at System.String.Substring(Int32 startIndex, Int32 length) at Magnum.Extensions.ExtensionsToType.ToShortTypeName(Type type) at MassTransit.Pipeline.Sinks.ConsumerMessageSink2.<>c__DisplayClass1.<Selector>b__0(IConsumeContext1 context) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Pipeline\Sinks\ConsumerMessageSink.cs:line 51 at MassTransit.Pipeline.Sinks.InboundConvertMessageSink`1.<>c_DisplayClass2.<>c_DisplayClass4.b__1(IConsumeContext x) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Pipeline\Sinks\InboundConvertMessageSink.cs:line 45 at MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers(IReceiveContext context) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Context\ServiceBusReceiveContext.cs:line 162

解决方案

While I don't know the Unity integration, with all the containers, you must register your consumers as the concrete type in the container and not the Consumes<> interfaces. I assume it's just RegisterType<Handler1, Handler1>() but I'm not totally sure on that.

If you don't like the LoadFrom extension for your container, you don't need to use it anyways. You can always just resolve the consumers yourself and register them via _sbc.Consume(() => container.resolve<YourConsumerType>()) in the configuration instead. The LoadFrom extension is just a convince for people who are using the container in a common way.

The following code works, which is using the container the way I would expect, without knowing your domain more, one to use it. If you want to understand how messages are bound a little bit better, I'd suggest using RabbitMQ because you can easily see where things end up by fallowing the exchange bindings. At this point, this is well beyond a SO question, I'd bring this to the mailing list if you have anything further.

using System;
using MassTransit;
using Microsoft.Practices.Unity;

namespace MT_Unity
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var container = new UnityContainer()
                .RegisterType<ICommandHandler<MyCommand>, MyCommandHandler>()
                .RegisterType<CommandHandlerToConsumerAdapter<MyCommand>>())

            using (IServiceBus consumerBus = ServiceBusFactory.New(sbc =>
                    {
                        sbc.ReceiveFrom("rabbitmq://localhost/consumer");
                        sbc.UseRabbitMq();


                        sbc.Subscribe(s => s.Consumer(() => container.Resolve<CommandHandlerToConsumerAdapter<MyCommand>>()));
                    }))
            using (IServiceBus publisherBus = ServiceBusFactory.New(sbc =>
                    {
                        sbc.ReceiveFrom("rabbitmq://localhost/publisher");
                        sbc.UseRabbitMq();
                    }))
            {
                publisherBus.Publish(new MyCommand());

                Console.ReadKey();
            }
        }
    }

    public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All where T : class, ICommand
    {
        private readonly ICommandHandler<T> _commandHandler;

        public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler)
        {
            _commandHandler = commandHandler;
        }

        public void Consume(T message)
        {
            _commandHandler.Handle(message);
        }
    }

    public interface ICommand { }
    public class MyCommand : ICommand { }

    public interface ICommandHandler<T> where T : class, ICommand
    {
        void Handle(T message);
    }

    public class MyCommandHandler : ICommandHandler<MyCommand>
    {
        public MyCommandHandler()
        {

        }
        public void Handle(MyCommand message)
        {
            Console.WriteLine("Handled MyCommand");
        }
    }

}