Service Fabric 无状态服务器自定义 UDP 侦听器侦听器、自定义、状态、服务器

2023-09-07 12:58:31 作者:失去的,总是最美好的,例如回忆。

我们正在尝试定义一个 Service Fabric 无状态服务来侦听 UDP 数据.

We are trying to define a Service Fabric Stateless Service which listeners for UDP data.

我们正在与微软合作,他们说它是受支持的,我应该为 TCP 设置;下面是来自 ServiceManifest.xml 文件的片段:

We are working with Microsoft who have said that it IS supported and that I should set up for TCP; below is the snippet from the ServiceManifest.xml file :

<Resources>
    <Endpoints>
      <!-- This endpoint is used by the communication listener to obtain the port on which to 
           listen. Please note that if your service is partitioned, this port is shared with 
           replicas of different partitions that are placed in your code. -->
      <Endpoint Name="ServiceEndpoint" Protocol="tcp" Port="12345" Type="Input" />
    </Endpoints>
</Resources>

服务启动正常,但我无法让服务接收任何 UDP 数据,如果我执行 netstat -a,我在 TCP 或 UDP 端口上看不到任何监听.

The service starts fine but I cannot make the service receive any UDP data and if I do a netstat -a I cannot see anything listening on the port in either TCP or UDP.

我在网上做了很多研究,但在创建自定义 ICommunicationListener 方面没有发现太多,但我希望其他人能够验证这是否应该通过 SF 实现.

I've done plenty of research on online and I've not found much on creating custom ICommunicationListener but I'm hoping that someone else might be able to verify if this should be possible with SF.

这是 ICommunicationListener 的实现:

Here is the ICommunicationListener implementation:

public UdpCommunicationListener(string serviceEndPoint,
            ServiceInitializationParameters serviceInitializationParameters, Action<UdpReceiveResult> connector)
   {
       if (serviceInitializationParameters == null)
       {
           throw new ArgumentNullException(nameof(serviceInitializationParameters));
       }

       var endPoint = serviceInitializationParameters
            .CodePackageActivationContext
            .GetEndpoint(serviceEndPoint ?? "ServiceEndPoint");

       _connector = connector;

        _ipAddress = FabricRuntime.GetNodeContext().IPAddressOrFQDN;
        _port = endPoint.Port;

        _server = new UdpServer(_ipAddress, _port);

        _server.Open();
    }

    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        _listener = _server.Listen(_connector);

        return Task.FromResult($"udp::{_ipAddress}:{_port}");
    }

    public Task CloseAsync(CancellationToken cancellationToken)
    {
        this.Abort();

        return Task.FromResult(true);
    }

    public void Abort()
    {
        _listener.Dispose();
        _server?.Close();
    }
}

public class UdpServer
{
    private readonly UdpClient _udpClient;
    private IObservable<UdpReceiveResult> _receiveStream;

    public UdpServer(string ipAddress, int port)
    {
        Id = Guid.NewGuid();

        _udpClient = new UdpClient(ipAddress, port);
    }

    public Guid Id { get; }

    public void Open()
    {
        _receiveStream = _udpClient.ReceiveStream().Publish().RefCount();
    }

    public void Close()
    {
        //TODO: Not sure how to stop the process
    }

    public IDisposable Listen(Action<UdpReceiveResult> process)
    {
        return _receiveStream.Subscribe(async r =>
        {
                process(r);
        });
    }
}

推荐答案

我解决了 UdpServer 组件的一个缺陷,现在它可以在 Service Fabric 服务中托管.

There was an defect with the UdpServer component which I resolved and this now works hosted in a Service Fabric Service.

这行代码的问题:

_udpClient = new UdpClient(ipAddress, port);

这是监听流量的错误过载,它必须是:

This is the wrong overload for listening for traffic, it needed to be:

_udpClient = new UdpClient(port);

我试过了:

_udpClient = new UdpClient(new IPAddress(IPAddress.Parse(_ipAddress)),port)

但这不起作用;由于检索主机的通信侦听(如其自身描述)的行返回主机名而不是 IPAddress,我认为您可以通过对清单进行一些更改来改变此行为,但现在仅端口就足够了.

But this doesn't work; as the line from the Communication listen (as it describes itself) which retrieves the host returns the hostname and not the IPAddress, I think you could alter this behaviour by makes some changes to the manifest but just the port was sufficient for now.