1# Copyright (c) Twisted Matrix Laboratories.
2# See LICENSE for details.
3
4"""
5Tests for L{twisted.words.im.ircsupport}.
6"""
7
8from twisted.test.proto_helpers import StringTransport
9from twisted.words.im.basechat import ChatUI, Conversation, GroupConversation
10from twisted.words.im.ircsupport import IRCAccount, IRCProto
11from twisted.words.im.locals import OfflineError
12from twisted.words.test.test_irc import IRCTestCase
13
14
15class StubConversation(Conversation):
16    def show(self):
17        pass
18
19    def showMessage(self, message, metadata):
20        self.message = message
21        self.metadata = metadata
22
23
24class StubGroupConversation(GroupConversation):
25    def setTopic(self, topic, nickname):
26        self.topic = topic
27        self.topicSetBy = nickname
28
29    def show(self):
30        pass
31
32    def showGroupMessage(self, sender, text, metadata=None):
33        self.metadata = metadata
34        self.text = text
35        self.metadata = metadata
36
37
38class StubChatUI(ChatUI):
39    def getConversation(self, group, Class=StubConversation, stayHidden=0):
40        return ChatUI.getGroupConversation(self, group, Class, stayHidden)
41
42    def getGroupConversation(self, group, Class=StubGroupConversation, stayHidden=0):
43        return ChatUI.getGroupConversation(self, group, Class, stayHidden)
44
45
46class IRCProtoTests(IRCTestCase):
47    """
48    Tests for L{IRCProto}.
49    """
50
51    def setUp(self):
52        self.account = IRCAccount(
53            "Some account", False, "alice", None, "example.com", 6667
54        )
55        self.proto = IRCProto(self.account, StubChatUI(), None)
56        self.transport = StringTransport()
57
58    def test_login(self):
59        """
60        When L{IRCProto} is connected to a transport, it sends I{NICK} and
61        I{USER} commands with the username from the account object.
62        """
63        self.proto.makeConnection(self.transport)
64        self.assertEqualBufferValue(
65            self.transport.value(),
66            "NICK alice\r\n" "USER alice foo bar :Twisted-IM user\r\n",
67        )
68
69    def test_authenticate(self):
70        """
71        If created with an account with a password, L{IRCProto} sends a
72        I{PASS} command before the I{NICK} and I{USER} commands.
73        """
74        self.account.password = "secret"
75        self.proto.makeConnection(self.transport)
76        self.assertEqualBufferValue(
77            self.transport.value(),
78            "PASS secret\r\n"
79            "NICK alice\r\n"
80            "USER alice foo bar :Twisted-IM user\r\n",
81        )
82
83    def test_channels(self):
84        """
85        If created with an account with a list of channels, L{IRCProto}
86        joins each of those channels after registering.
87        """
88        self.account.channels = ["#foo", "#bar"]
89        self.proto.makeConnection(self.transport)
90        self.assertEqualBufferValue(
91            self.transport.value(),
92            "NICK alice\r\n"
93            "USER alice foo bar :Twisted-IM user\r\n"
94            "JOIN #foo\r\n"
95            "JOIN #bar\r\n",
96        )
97
98    def test_isupport(self):
99        """
100        L{IRCProto} can interpret I{ISUPPORT} (I{005}) messages from the server
101        and reflect their information in its C{supported} attribute.
102        """
103        self.proto.makeConnection(self.transport)
104        self.proto.dataReceived(":irc.example.com 005 alice MODES=4 CHANLIMIT=#:20\r\n")
105        self.assertEqual(4, self.proto.supported.getFeature("MODES"))
106
107    def test_nick(self):
108        """
109        IRC NICK command changes the nickname of a user.
110        """
111        self.proto.makeConnection(self.transport)
112        self.proto.dataReceived(":alice JOIN #group1\r\n")
113        self.proto.dataReceived(":alice1 JOIN #group1\r\n")
114        self.proto.dataReceived(":alice1 NICK newnick\r\n")
115        self.proto.dataReceived(":alice3 NICK newnick3\r\n")
116        self.assertIn("newnick", self.proto._ingroups)
117        self.assertNotIn("alice1", self.proto._ingroups)
118
119    def test_part(self):
120        """
121        IRC PART command removes a user from an IRC channel.
122        """
123        self.proto.makeConnection(self.transport)
124        self.proto.dataReceived(":alice1 JOIN #group1\r\n")
125        self.assertIn("group1", self.proto._ingroups["alice1"])
126        self.assertNotIn("group2", self.proto._ingroups["alice1"])
127        self.proto.dataReceived(":alice PART #group1\r\n")
128        self.proto.dataReceived(":alice1 PART #group1\r\n")
129        self.proto.dataReceived(":alice1 PART #group2\r\n")
130        self.assertNotIn("group1", self.proto._ingroups["alice1"])
131        self.assertNotIn("group2", self.proto._ingroups["alice1"])
132
133    def test_quit(self):
134        """
135        IRC QUIT command removes a user from all IRC channels.
136        """
137        self.proto.makeConnection(self.transport)
138        self.proto.dataReceived(":alice1 JOIN #group1\r\n")
139        self.assertIn("group1", self.proto._ingroups["alice1"])
140        self.assertNotIn("group2", self.proto._ingroups["alice1"])
141        self.proto.dataReceived(":alice1 JOIN #group3\r\n")
142        self.assertIn("group3", self.proto._ingroups["alice1"])
143        self.proto.dataReceived(":alice1 QUIT\r\n")
144        self.assertTrue(len(self.proto._ingroups["alice1"]) == 0)
145        self.proto.dataReceived(":alice3 QUIT\r\n")
146
147    def test_topic(self):
148        """
149        IRC TOPIC command changes the topic of an IRC channel.
150        """
151        self.proto.makeConnection(self.transport)
152        self.proto.dataReceived(":alice1 JOIN #group1\r\n")
153        self.proto.dataReceived(":alice1 TOPIC #group1 newtopic\r\n")
154        groupConversation = self.proto.getGroupConversation("group1")
155        self.assertEqual(groupConversation.topic, "newtopic")
156        self.assertEqual(groupConversation.topicSetBy, "alice1")
157
158    def test_privmsg(self):
159        """
160        PRIVMSG sends a private message to a user or channel.
161        """
162        self.proto.makeConnection(self.transport)
163        self.proto.dataReceived(":alice1 PRIVMSG t2 test_message_1\r\n")
164        conversation = self.proto.chat.getConversation(self.proto.getPerson("alice1"))
165        self.assertEqual(conversation.message, "test_message_1")
166
167        self.proto.dataReceived(":alice1 PRIVMSG #group1 test_message_2\r\n")
168        groupConversation = self.proto.getGroupConversation("group1")
169        self.assertEqual(groupConversation.text, "test_message_2")
170
171        self.proto.setNick("alice")
172        self.proto.dataReceived(":alice PRIVMSG #foo test_message_3\r\n")
173        groupConversation = self.proto.getGroupConversation("foo")
174        self.assertFalse(hasattr(groupConversation, "text"))
175        conversation = self.proto.chat.getConversation(self.proto.getPerson("alice"))
176        self.assertFalse(hasattr(conversation, "message"))
177
178    def test_action(self):
179        """
180        CTCP ACTION to a user or channel.
181        """
182        self.proto.makeConnection(self.transport)
183        self.proto.dataReceived(":alice1 PRIVMSG alice1 :\01ACTION smiles\01\r\n")
184        conversation = self.proto.chat.getConversation(self.proto.getPerson("alice1"))
185        self.assertEqual(conversation.message, "smiles")
186
187        self.proto.dataReceived(":alice1 PRIVMSG #group1 :\01ACTION laughs\01\r\n")
188        groupConversation = self.proto.getGroupConversation("group1")
189        self.assertEqual(groupConversation.text, "laughs")
190
191        self.proto.setNick("alice")
192        self.proto.dataReceived(":alice PRIVMSG #group1 :\01ACTION cries\01\r\n")
193        groupConversation = self.proto.getGroupConversation("foo")
194        self.assertFalse(hasattr(groupConversation, "text"))
195        conversation = self.proto.chat.getConversation(self.proto.getPerson("alice"))
196        self.assertFalse(hasattr(conversation, "message"))
197
198    def test_rplNamreply(self):
199        """
200        RPL_NAMREPLY server response (353) lists all the users in a channel.
201        RPL_ENDOFNAMES server response (363) is sent at the end of RPL_NAMREPLY
202        to indicate that there are no more names.
203        """
204        self.proto.makeConnection(self.transport)
205        self.proto.dataReceived(
206            ":example.com 353 z3p = #bnl :pSwede Dan- SkOyg @MrOp +MrPlus\r\n"
207        )
208        expectedInGroups = {
209            "Dan-": ["bnl"],
210            "pSwede": ["bnl"],
211            "SkOyg": ["bnl"],
212            "MrOp": ["bnl"],
213            "MrPlus": ["bnl"],
214        }
215        expectedNamReplies = {"bnl": ["pSwede", "Dan-", "SkOyg", "MrOp", "MrPlus"]}
216        self.assertEqual(expectedInGroups, self.proto._ingroups)
217        self.assertEqual(expectedNamReplies, self.proto._namreplies)
218
219        self.proto.dataReceived(":example.com 366 alice #bnl :End of /NAMES list\r\n")
220        self.assertEqual({}, self.proto._namreplies)
221        groupConversation = self.proto.getGroupConversation("bnl")
222        self.assertEqual(expectedNamReplies["bnl"], groupConversation.members)
223
224    def test_rplTopic(self):
225        """
226        RPL_TOPIC server response (332) is sent when a channel's topic is changed
227        """
228        self.proto.makeConnection(self.transport)
229        self.proto.dataReceived(":example.com 332 alice, #foo :Some random topic\r\n")
230        self.assertEqual("Some random topic", self.proto._topics["foo"])
231
232    def test_sendMessage(self):
233        """
234        L{IRCPerson.sendMessage}
235        """
236        self.proto.makeConnection(self.transport)
237        person = self.proto.getPerson("alice")
238        self.assertRaises(OfflineError, person.sendMessage, "Some message")
239
240        person.account.client = self.proto
241        self.transport.clear()
242        person.sendMessage("Some message 2")
243        self.assertEqual(
244            self.transport.io.getvalue(), b"PRIVMSG alice :Some message 2\r\n"
245        )
246
247        self.transport.clear()
248        person.sendMessage("smiles", {"style": "emote"})
249        self.assertEqual(
250            self.transport.io.getvalue(), b"PRIVMSG alice :\x01ACTION smiles\x01\r\n"
251        )
252
253    def test_sendGroupMessage(self):
254        """
255        L{IRCGroup.sendGroupMessage}
256        """
257        self.proto.makeConnection(self.transport)
258        group = self.proto.chat.getGroup("#foo", self.proto)
259        self.assertRaises(OfflineError, group.sendGroupMessage, "Some message")
260
261        group.account.client = self.proto
262        self.transport.clear()
263        group.sendGroupMessage("Some message 2")
264        self.assertEqual(
265            self.transport.io.getvalue(), b"PRIVMSG #foo :Some message 2\r\n"
266        )
267
268        self.transport.clear()
269        group.sendGroupMessage("smiles", {"style": "emote"})
270        self.assertEqual(
271            self.transport.io.getvalue(), b"PRIVMSG #foo :\x01ACTION smiles\x01\r\n"
272        )
273