// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. namespace DotNetty.Codecs.Mqtt.Tests { using System.Collections.Generic; using System.Linq; using DotNetty.Buffers; using DotNetty.Codecs.Mqtt.Packets; using DotNetty.Transport.Channels; using Moq; using Xunit; public class MqttCodecTests { static readonly IByteBufferAllocator Allocator = new UnpooledByteBufferAllocator(); readonly MqttDecoder serverDecoder; readonly MqttDecoder clientDecoder; readonly Mock contextMock; public MqttCodecTests() { this.serverDecoder = new MqttDecoder(true, 256 * 1024); this.clientDecoder = new MqttDecoder(false, 256 * 1024); this.contextMock = new Mock(MockBehavior.Strict); this.contextMock.Setup(x => x.Removed).Returns(false); this.contextMock.Setup(x => x.Allocator).Returns(UnpooledByteBufferAllocator.Default); } [Theory] [InlineData("a", true, 0, null, null, "will/topic/name", new byte[] { 5, 3, 255, 6, 5 }, QualityOfService.ExactlyOnce, true)] [InlineData("11a_2", false, 1, "user1", null, "will", new byte[0], QualityOfService.AtLeastOnce, false)] [InlineData("abc/ж", false, 10, "", "pwd", null, null, null, false)] [InlineData("", true, 1000, "имя", "密碼", null, null, null, false)] public void TestConnectMessage(string clientId, bool cleanSession, int keepAlive, string userName, string password, string willTopicName, byte[] willMessage, QualityOfService? willQos, bool willRetain) { var packet = new ConnectPacket(); packet.ClientId = clientId; packet.CleanSession = cleanSession; packet.KeepAliveInSeconds = keepAlive; if (userName != null) { packet.Username = userName; if (password != null) { packet.Password = password; } } if (willTopicName != null) { packet.WillTopicName = willTopicName; packet.WillMessage = Unpooled.WrappedBuffer(willMessage); packet.WillQualityOfService = willQos ?? QualityOfService.AtMostOnce; packet.WillRetain = willRetain; } ConnectPacket recoded = this.RecodePacket(packet, true, true); this.contextMock.Verify(x => x.FireChannelRead(It.IsAny()), Times.Once); Assert.Equal(packet.ClientId, recoded.ClientId); Assert.Equal(packet.CleanSession, recoded.CleanSession); Assert.Equal(packet.KeepAliveInSeconds, recoded.KeepAliveInSeconds); Assert.Equal(packet.HasUsername, recoded.HasUsername); if (packet.HasUsername) { Assert.Equal(packet.Username, recoded.Username); } Assert.Equal(packet.HasPassword, recoded.HasPassword); if (packet.HasPassword) { Assert.Equal(packet.Password, recoded.Password); } if (packet.HasWill) { Assert.Equal(packet.WillTopicName, recoded.WillTopicName); Assert.True(ByteBufferUtil.Equals(Unpooled.WrappedBuffer(willMessage), recoded.WillMessage)); Assert.Equal(packet.WillQualityOfService, recoded.WillQualityOfService); Assert.Equal(packet.WillRetain, recoded.WillRetain); } } [Theory] [InlineData(false, ConnectReturnCode.Accepted)] [InlineData(true, ConnectReturnCode.Accepted)] [InlineData(false, ConnectReturnCode.RefusedUnacceptableProtocolVersion)] [InlineData(false, ConnectReturnCode.RefusedIdentifierRejected)] [InlineData(false, ConnectReturnCode.RefusedServerUnavailable)] [InlineData(false, ConnectReturnCode.RefusedBadUsernameOrPassword)] [InlineData(false, ConnectReturnCode.RefusedNotAuthorized)] public void TestConnAckMessage(bool sessionPresent, ConnectReturnCode returnCode) { var packet = new ConnAckPacket(); packet.SessionPresent = sessionPresent; packet.ReturnCode = returnCode; ConnAckPacket recoded = this.RecodePacket(packet, false, true); this.contextMock.Verify(x => x.FireChannelRead(It.IsAny()), Times.Once); Assert.Equal(packet.SessionPresent, recoded.SessionPresent); Assert.Equal(packet.ReturnCode, recoded.ReturnCode); } [Theory] [InlineData(1, new[] { "+", "+/+", "//", "/#", "+//+" }, new[] { QualityOfService.ExactlyOnce, QualityOfService.AtLeastOnce, QualityOfService.AtMostOnce, QualityOfService.ExactlyOnce, QualityOfService.AtMostOnce })] [InlineData(ushort.MaxValue, new[] { "a" }, new[] { QualityOfService.AtLeastOnce })] public void TestSubscribeMessage(int packetId, string[] topicFilters, QualityOfService[] requestedQosValues) { var packet = new SubscribePacket(packetId, topicFilters.Zip(requestedQosValues, (topic, qos) => new SubscriptionRequest(topic, qos)).ToArray()); SubscribePacket recoded = this.RecodePacket(packet, true, true); clientDecoder this.contextMock.Verify(x => x.FireChannelRead(It.IsAny()), Times.Once); Assert.Equal(packet.Requests, recoded.Requests, EqualityComparer.Default); Assert.Equal(packet.PacketId, recoded.PacketId); } [Theory] [InlineData(1, new[] { QualityOfService.ExactlyOnce, QualityOfService.AtLeastOnce, QualityOfService.AtMostOnce, QualityOfService.Failure, QualityOfService.AtMostOnce })] [InlineData(ushort.MaxValue, new[] { QualityOfService.AtLeastOnce })] public void TestSubAckMessage(int packetId, QualityOfService[] qosValues) { var packet = new SubAckPacket(); packet.PacketId = packetId; packet.ReturnCodes = qosValues; SubAckPacket recoded = this.RecodePacket(packet, false, true); this.contextMock.Verify(x => x.FireChannelRead(It.IsAny()), Times.Once); Assert.Equal(packet.ReturnCodes, recoded.ReturnCodes); Assert.Equal(packet.PacketId, recoded.PacketId); } [Theory] [InlineData(1, new[] { "+", "+/+", "//", "/#", "+//+" })] [InlineData(ushort.MaxValue, new[] { "a" })] public void TestUnsubscribeMessage(int packetId, string[] topicFilters) { var packet = new UnsubscribePacket(packetId, topicFilters); UnsubscribePacket recoded = this.RecodePacket(packet, true, true); this.contextMock.Verify(x => x.FireChannelRead(It.IsAny()), Times.Once); Assert.Equal(packet.TopicFilters, recoded.TopicFilters); Assert.Equal(packet.PacketId, recoded.PacketId); } [Theory] [InlineData(QualityOfService.AtMostOnce, false, false, 1, "a", null)] [InlineData(QualityOfService.ExactlyOnce, true, false, ushort.MaxValue, "/", new byte[0])] [InlineData(QualityOfService.AtLeastOnce, false, true, 129, "a/b", new byte[] { 1, 2, 3 })] [InlineData(QualityOfService.ExactlyOnce, true, true, ushort.MaxValue - 1, "topic/name/that/is/longer/than/256/characters/topic/name/that/is/longer/than/256/characters/topic/name/that/is/longer/than/256/characters/topic/name/that/is/longer/than/256/characters/topic/name/that/is/longer/than/256/characters/topic/name/that/is/longer/than/256/characters/", new byte[] { 1 })] public void TestPublishMessage(QualityOfService qos, bool dup, bool retain, int packetId, string topicName, byte[] payload) { var packet = new PublishPacket(qos, dup, retain); packet.TopicName = topicName; if (qos > QualityOfService.AtMostOnce) { packet.PacketId = packetId; } packet.Payload = payload == null ? null : Unpooled.WrappedBuffer(payload); PublishPacket recoded = this.RecodePacket(packet, false, true); this.contextMock.Verify(x => x.FireChannelRead(It.IsAny()), Times.Once); Assert.Equal(packet.TopicName, recoded.TopicName); if (packet.QualityOfService > QualityOfService.AtMostOnce) { Assert.Equal(packet.PacketId, recoded.PacketId); } Assert.True(ByteBufferUtil.Equals(payload == null ? Unpooled.Empty : Unpooled.WrappedBuffer(payload), recoded.Payload)); } [Theory] [InlineData(1)] [InlineData(127)] [InlineData(128)] [InlineData(256)] [InlineData(257)] [InlineData(ushort.MaxValue)] public void TestPacketIdOnlyResponseMessages(int packetId) { this.TestPublishResponseMessage(packetId, true); this.TestPublishResponseMessage(packetId, false); this.TestPublishResponseMessage(packetId, true); this.TestPublishResponseMessage(packetId, false); this.TestPublishResponseMessage(packetId, true); this.TestPublishResponseMessage(packetId, false); this.TestPublishResponseMessage(packetId, true); this.TestPublishResponseMessage(packetId, false); this.TestPublishResponseMessage(packetId, false); } void TestPublishResponseMessage(int packetId, bool useServer) where T : PacketWithId, new() { var packet = new T { PacketId = packetId }; T recoded = this.RecodePacket(packet, useServer, true); this.contextMock.Verify(x => x.FireChannelRead(It.IsAny()), Times.Once); this.contextMock.ResetCalls(); Assert.Equal(packet.PacketId, recoded.PacketId); } [Fact] public void TestEmptyPacketMessages() { this.TestEmptyPacketMessage(PingReqPacket.Instance, true); this.TestEmptyPacketMessage(PingRespPacket.Instance, false); this.TestEmptyPacketMessage(DisconnectPacket.Instance, true); } void TestEmptyPacketMessage(T packet, bool useServer) where T : Packet { T recoded = this.RecodePacket(packet, useServer, true); this.contextMock.Verify(x => x.FireChannelRead(It.IsAny()), Times.Once); } T RecodePacket(T packet, bool useServer, bool explodeForDecode) where T : Packet { var output = new List(); MqttEncoder.DoEncode(Allocator, packet, output); T observedPacket = null; this.contextMock.Setup(x => x.FireChannelRead(It.IsAny())) .Callback((object message) => observedPacket = Assert.IsAssignableFrom(message)) .Returns(this.contextMock.Object); foreach (IByteBuffer message in output) { MqttDecoder mqttDecoder = useServer ? this.serverDecoder : this.clientDecoder; if (explodeForDecode) { while (message.IsReadable()) { IByteBuffer finalBuffer = message.ReadBytes(1); mqttDecoder.ChannelRead(this.contextMock.Object, finalBuffer); } } else { mqttDecoder.ChannelRead(this.contextMock.Object, message); } } return observedPacket; } } }