1# COPYRIGHT (C) 2020-2021 Nicotine+ Team
2# COPYRIGHT (C) 2008-2012 Quinox <quinox@users.sf.net>
3# COPYRIGHT (C) 2007-2009 Daelstorm <daelstorm@gmail.com>
4# COPYRIGHT (C) 2003-2004 Hyriand <hyriand@thegraveyard.org>
5# COPYRIGHT (C) 2001-2003 Alexander Kanavin
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License as published by
9# the Free Software Foundation, either version 3 of the License, or
10# (at your option) any later version.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20"""
21This module implements Soulseek networking protocol.
22"""
23
24import selectors
25import socket
26import struct
27import sys
28import threading
29import time
30
31from pynicotine.logfacility import log
32from pynicotine.slskmessages import AcceptChildren
33from pynicotine.slskmessages import AckNotifyPrivileges
34from pynicotine.slskmessages import AddThingIHate
35from pynicotine.slskmessages import AddThingILike
36from pynicotine.slskmessages import AddToPrivileged
37from pynicotine.slskmessages import AddUser
38from pynicotine.slskmessages import AdminCommand
39from pynicotine.slskmessages import AdminMessage
40from pynicotine.slskmessages import BranchLevel
41from pynicotine.slskmessages import BranchRoot
42from pynicotine.slskmessages import CantConnectToPeer
43from pynicotine.slskmessages import CantCreateRoom
44from pynicotine.slskmessages import ChangePassword
45from pynicotine.slskmessages import CheckPrivileges
46from pynicotine.slskmessages import ChildDepth
47from pynicotine.slskmessages import ConnClose
48from pynicotine.slskmessages import ConnCloseIP
49from pynicotine.slskmessages import ConnectError
50from pynicotine.slskmessages import ConnectToPeer
51from pynicotine.slskmessages import DistribAlive
52from pynicotine.slskmessages import DistribAliveInterval
53from pynicotine.slskmessages import DistribBranchLevel
54from pynicotine.slskmessages import DistribBranchRoot
55from pynicotine.slskmessages import DistribChildDepth
56from pynicotine.slskmessages import DistribEmbeddedMessage
57from pynicotine.slskmessages import DistribMessage
58from pynicotine.slskmessages import DistribSearch
59from pynicotine.slskmessages import DownloadFile
60from pynicotine.slskmessages import EmbeddedMessage
61from pynicotine.slskmessages import ExactFileSearch
62from pynicotine.slskmessages import FileError
63from pynicotine.slskmessages import FileMessage
64from pynicotine.slskmessages import FileOffset
65from pynicotine.slskmessages import FileRequest
66from pynicotine.slskmessages import FileSearch
67from pynicotine.slskmessages import FileSearchRequest
68from pynicotine.slskmessages import FileSearchResult
69from pynicotine.slskmessages import FolderContentsRequest
70from pynicotine.slskmessages import FolderContentsResponse
71from pynicotine.slskmessages import GetPeerAddress
72from pynicotine.slskmessages import GetSharedFileList
73from pynicotine.slskmessages import GetUserStats
74from pynicotine.slskmessages import GetUserStatus
75from pynicotine.slskmessages import GivePrivileges
76from pynicotine.slskmessages import GlobalRecommendations
77from pynicotine.slskmessages import GlobalUserList
78from pynicotine.slskmessages import HaveNoParent
79from pynicotine.slskmessages import IncConn
80from pynicotine.slskmessages import InitPeerConn
81from pynicotine.slskmessages import InitServerConn
82from pynicotine.slskmessages import ItemRecommendations
83from pynicotine.slskmessages import ItemSimilarUsers
84from pynicotine.slskmessages import JoinPublicRoom
85from pynicotine.slskmessages import JoinRoom
86from pynicotine.slskmessages import LeavePublicRoom
87from pynicotine.slskmessages import LeaveRoom
88from pynicotine.slskmessages import Login
89from pynicotine.slskmessages import MessageAcked
90from pynicotine.slskmessages import MessageProgress
91from pynicotine.slskmessages import MessageUser
92from pynicotine.slskmessages import MessageUsers
93from pynicotine.slskmessages import MinParentsInCache
94from pynicotine.slskmessages import PossibleParents
95from pynicotine.slskmessages import NotifyPrivileges
96from pynicotine.slskmessages import ParentInactivityTimeout
97from pynicotine.slskmessages import ParentMinSpeed
98from pynicotine.slskmessages import ParentSpeedRatio
99from pynicotine.slskmessages import PeerInit
100from pynicotine.slskmessages import PeerInitMessage
101from pynicotine.slskmessages import PeerMessage
102from pynicotine.slskmessages import PierceFireWall
103from pynicotine.slskmessages import PlaceholdUpload
104from pynicotine.slskmessages import PlaceInLineResponse
105from pynicotine.slskmessages import PlaceInQueue
106from pynicotine.slskmessages import PlaceInQueueRequest
107from pynicotine.slskmessages import PMessageUser
108from pynicotine.slskmessages import PrivateRoomAdded
109from pynicotine.slskmessages import PrivateRoomAddOperator
110from pynicotine.slskmessages import PrivateRoomAddUser
111from pynicotine.slskmessages import PrivateRoomDismember
112from pynicotine.slskmessages import PrivateRoomDisown
113from pynicotine.slskmessages import PrivateRoomOperatorAdded
114from pynicotine.slskmessages import PrivateRoomOperatorRemoved
115from pynicotine.slskmessages import PrivateRoomOwned
116from pynicotine.slskmessages import PrivateRoomRemoved
117from pynicotine.slskmessages import PrivateRoomRemoveOperator
118from pynicotine.slskmessages import PrivateRoomRemoveUser
119from pynicotine.slskmessages import PrivateRoomSomething
120from pynicotine.slskmessages import PrivateRoomToggle
121from pynicotine.slskmessages import PrivateRoomUsers
122from pynicotine.slskmessages import PrivilegedUsers
123from pynicotine.slskmessages import PublicRoomMessage
124from pynicotine.slskmessages import QueuedDownloads
125from pynicotine.slskmessages import UploadDenied
126from pynicotine.slskmessages import QueueUpload
127from pynicotine.slskmessages import Recommendations
128from pynicotine.slskmessages import RelatedSearch
129from pynicotine.slskmessages import Relogged
130from pynicotine.slskmessages import RemoveThingIHate
131from pynicotine.slskmessages import RemoveThingILike
132from pynicotine.slskmessages import RemoveUser
133from pynicotine.slskmessages import ResetDistributed
134from pynicotine.slskmessages import RoomAdded
135from pynicotine.slskmessages import RoomList
136from pynicotine.slskmessages import RoomRemoved
137from pynicotine.slskmessages import RoomSearch
138from pynicotine.slskmessages import RoomTickerAdd
139from pynicotine.slskmessages import RoomTickerRemove
140from pynicotine.slskmessages import RoomTickerSet
141from pynicotine.slskmessages import RoomTickerState
142from pynicotine.slskmessages import SayChatroom
143from pynicotine.slskmessages import SearchInactivityTimeout
144from pynicotine.slskmessages import SearchParent
145from pynicotine.slskmessages import SendConnectToken
146from pynicotine.slskmessages import SendDownloadSpeed
147from pynicotine.slskmessages import SendUploadSpeed
148from pynicotine.slskmessages import ServerMessage
149from pynicotine.slskmessages import ServerPing
150from pynicotine.slskmessages import SetCurrentConnectionCount
151from pynicotine.slskmessages import SetDownloadLimit
152from pynicotine.slskmessages import SetStatus
153from pynicotine.slskmessages import SetUploadLimit
154from pynicotine.slskmessages import SetWaitPort
155from pynicotine.slskmessages import SharedFileList
156from pynicotine.slskmessages import SharedFoldersFiles
157from pynicotine.slskmessages import SimilarUsers
158from pynicotine.slskmessages import TransferRequest
159from pynicotine.slskmessages import TransferResponse
160from pynicotine.slskmessages import TunneledMessage
161from pynicotine.slskmessages import UnknownPeerMessage
162from pynicotine.slskmessages import UploadFailed
163from pynicotine.slskmessages import UploadFile
164from pynicotine.slskmessages import UploadQueueNotification
165from pynicotine.slskmessages import UserInfoReply
166from pynicotine.slskmessages import UserInfoRequest
167from pynicotine.slskmessages import UserInterests
168from pynicotine.slskmessages import UserJoinedRoom
169from pynicotine.slskmessages import UserLeftRoom
170from pynicotine.slskmessages import UserPrivileged
171from pynicotine.slskmessages import UserSearch
172from pynicotine.slskmessages import WishlistInterval
173from pynicotine.slskmessages import WishlistSearch
174
175
176""" Set the maximum number of open files to the hard limit reported by the OS.
177Our MAXSOCKETS value needs to be lower than the file limit, otherwise our open
178sockets in combination with other file activity can exceed the file limit,
179effectively halting the program. """
180
# On Windows, FD_SETSIZE is hardcoded to 512 in the Python source, so the
# socket cap is fixed there. Everywhere else it is derived from the hard limit
# for open files reported by the OS.
if sys.platform != "win32":
    import resource  # pylint: disable=import-error

    if sys.platform != "darwin":
        _SOFTLIMIT, MAXFILELIMIT = resource.getrlimit(resource.RLIMIT_NOFILE)
    else:
        # macOS reports INFINITE as hard limit, although a process can open
        # at most 10240 files, hence this special case.
        MAXFILELIMIT = 10240

    try:
        # Raise the soft limit for open files up to the hard limit
        resource.setrlimit(resource.RLIMIT_NOFILE, (MAXFILELIMIT, MAXFILELIMIT))

    except Exception as rlimit_error:
        log.add("Failed to set RLIMIT_NOFILE: %s", rlimit_error)

    # Keep the socket cap below the file limit (75 % of it), since a higher value
    # would just waste resources. Never go below 50 or above 1024 sockets.
    MAXSOCKETS = max(int(MAXFILELIMIT * 0.75), 50)
    MAXSOCKETS = min(MAXSOCKETS, 1024)

else:
    MAXSOCKETS = 512
210
# Pre-compiled little-endian struct helpers for unsigned 32-bit integers
_UINT_STRUCT = struct.Struct("<I")
_DOUBLE_UINT_STRUCT = struct.Struct("<II")

UINT_UNPACK = _UINT_STRUCT.unpack
DOUBLE_UINT_UNPACK = _DOUBLE_UINT_STRUCT.unpack

UINT_PACK = _UINT_STRUCT.pack
215
216
class Connection:
    """
    State kept for a single socket connection.

    conn is the socket object and addr its (ip, port) pair. events holds the
    event mask the socket was last registered with in the selector. ibuf and
    obuf are the input and output message buffers, and init is a PeerInit
    object (see slskmessages docstrings).
    """

    __slots__ = ("conn", "addr", "ibuf", "obuf", "events", "init", "lastactive", "lastreadlength")

    def __init__(self, conn=None, addr=None, events=None):
        self.conn, self.addr, self.events = conn, addr, events

        # Every connection gets its own, initially empty, buffers
        self.ibuf, self.obuf = bytearray(), bytearray()

        self.init = None
        self.lastactive = time.time()
        self.lastreadlength = 100 * 1024
235
236
class PeerConnection(Connection):
    """
    Connection to a peer, extended with file transfer state.

    A filedown or fileupl value other than None marks the connection as a
    download or upload, respectively.
    """

    __slots__ = ("filereq", "filedown", "fileupl", "filereadbytes", "bytestoread", "piercefw",
                 "lastcallback")

    def __init__(self, conn=None, addr=None, events=None, init=None):
        super().__init__(conn, addr, events)

        self.filereq = self.filedown = self.fileupl = None
        self.filereadbytes = self.bytestoread = 0
        self.init = init
        self.piercefw = None
        self.lastactive = time.time()
        self.lastcallback = time.time()
253
254
class PeerConnectionInProgress:
    """ Placeholder for a connection that is not established yet.

    All p2p connect()s are non-blocking, so this object keeps the socket (conn)
    together with msg_obj, the message to be sent once the connection has been
    established, and the time the attempt was created (lastactive).
    """
    __slots__ = ("conn", "msg_obj", "lastactive")

    def __init__(self, conn=None, msg_obj=None):
        self.lastactive = time.time()
        self.conn, self.msg_obj = conn, msg_obj
266
267
268class SlskProtoThread(threading.Thread):
269    """ This is a networking thread that actually does all the communication.
270    It sends data to the NicotineCore via a callback function and receives
271    data via a deque object. """
272
273    """ Server and peers send each other small binary messages, that start
274    with length and message code followed by the actual message data.
275    These are the codes."""
276
    # Server message classes mapped to their numeric message codes.
    # __init__ inverts this table (code -> class) into self.serverclasses,
    # which is used to look up the class for an incoming server message.
    servercodes = {
        Login: 1,
        SetWaitPort: 2,
        GetPeerAddress: 3,
        AddUser: 5,
        RemoveUser: 6,
        GetUserStatus: 7,
        SayChatroom: 13,
        JoinRoom: 14,
        LeaveRoom: 15,
        UserJoinedRoom: 16,
        UserLeftRoom: 17,
        ConnectToPeer: 18,
        MessageUser: 22,
        MessageAcked: 23,
        FileSearch: 26,
        SetStatus: 28,
        ServerPing: 32,               # Deprecated
        SendConnectToken: 33,         # Obsolete
        SendDownloadSpeed: 34,        # Obsolete
        SharedFoldersFiles: 35,
        GetUserStats: 36,
        QueuedDownloads: 40,          # Obsolete
        Relogged: 41,
        UserSearch: 42,
        AddThingILike: 51,            # Deprecated
        RemoveThingILike: 52,         # Deprecated
        Recommendations: 54,          # Deprecated
        GlobalRecommendations: 56,    # Deprecated
        UserInterests: 57,            # Deprecated
        AdminCommand: 58,             # Obsolete
        PlaceInLineResponse: 60,      # Obsolete
        RoomAdded: 62,                # Obsolete
        RoomRemoved: 63,              # Obsolete
        RoomList: 64,
        ExactFileSearch: 65,          # Obsolete
        AdminMessage: 66,
        GlobalUserList: 67,           # Obsolete
        TunneledMessage: 68,          # Obsolete
        PrivilegedUsers: 69,
        HaveNoParent: 71,
        SearchParent: 73,             # Deprecated
        ParentMinSpeed: 83,
        ParentSpeedRatio: 84,
        ParentInactivityTimeout: 86,  # Obsolete
        SearchInactivityTimeout: 87,  # Obsolete
        MinParentsInCache: 88,        # Obsolete
        DistribAliveInterval: 90,     # Obsolete
        AddToPrivileged: 91,          # Obsolete
        CheckPrivileges: 92,
        EmbeddedMessage: 93,
        AcceptChildren: 100,
        PossibleParents: 102,
        WishlistSearch: 103,
        WishlistInterval: 104,
        SimilarUsers: 110,            # Deprecated
        ItemRecommendations: 111,     # Deprecated
        ItemSimilarUsers: 112,        # Deprecated
        RoomTickerState: 113,
        RoomTickerAdd: 114,
        RoomTickerRemove: 115,
        RoomTickerSet: 116,
        AddThingIHate: 117,           # Deprecated
        RemoveThingIHate: 118,        # Deprecated
        RoomSearch: 120,
        SendUploadSpeed: 121,
        UserPrivileged: 122,          # Deprecated
        GivePrivileges: 123,
        NotifyPrivileges: 124,        # Deprecated
        AckNotifyPrivileges: 125,     # Deprecated
        BranchLevel: 126,
        BranchRoot: 127,
        ChildDepth: 129,
        ResetDistributed: 130,
        PrivateRoomUsers: 133,
        PrivateRoomAddUser: 134,
        PrivateRoomRemoveUser: 135,
        PrivateRoomDismember: 136,
        PrivateRoomDisown: 137,
        PrivateRoomSomething: 138,    # Obsolete
        PrivateRoomAdded: 139,
        PrivateRoomRemoved: 140,
        PrivateRoomToggle: 141,
        ChangePassword: 142,
        PrivateRoomAddOperator: 143,
        PrivateRoomRemoveOperator: 144,
        PrivateRoomOperatorAdded: 145,
        PrivateRoomOperatorRemoved: 146,
        PrivateRoomOwned: 148,
        MessageUsers: 149,
        JoinPublicRoom: 150,          # Deprecated
        LeavePublicRoom: 151,         # Deprecated
        PublicRoomMessage: 152,       # Deprecated
        RelatedSearch: 153,           # Obsolete
        CantConnectToPeer: 1001,
        CantCreateRoom: 1003
    }
374
    # Peer init message classes mapped to their numeric message codes.
    # __init__ inverts this table (code -> class) into self.peerinitclasses.
    peerinitcodes = {
        PierceFireWall: 0,
        PeerInit: 1
    }
379
    # Peer message classes mapped to their numeric message codes.
    # __init__ inverts this table (code -> class) into self.peerclasses.
    peercodes = {
        GetSharedFileList: 4,
        SharedFileList: 5,
        FileSearchRequest: 8,         # Obsolete
        FileSearchResult: 9,
        UserInfoRequest: 15,
        UserInfoReply: 16,
        PMessageUser: 22,             # Deprecated
        FolderContentsRequest: 36,
        FolderContentsResponse: 37,
        TransferRequest: 40,
        TransferResponse: 41,
        PlaceholdUpload: 42,          # Obsolete
        QueueUpload: 43,
        PlaceInQueue: 44,
        UploadFailed: 46,
        UploadDenied: 50,
        PlaceInQueueRequest: 51,
        UploadQueueNotification: 52,  # Deprecated
        UnknownPeerMessage: 12547
    }
401
    # Distributed network message classes mapped to their numeric message codes.
    # __init__ inverts this table (code -> class) into self.distribclasses.
    distribcodes = {
        DistribAlive: 0,
        DistribSearch: 3,
        DistribBranchLevel: 4,
        DistribBranchRoot: 5,
        DistribChildDepth: 7,         # Deprecated
        DistribEmbeddedMessage: 93
    }
410
    # Timing thresholds of the networking loop. The code consuming them is not
    # part of this section; the values are presumably in seconds (TODO confirm).
    IN_PROGRESS_STALE_AFTER = 2
    CONNECTION_MAX_IDLE = 60
    CONNCOUNT_CALLBACK_INTERVAL = 0.5
414
415    def __init__(self, core_callback, queue, bindip, interface, port, port_range, network_filter, eventprocessor):
416        """ core_callback is a NicotineCore callback function to be called with messages
417        list as a parameter. queue is deque object that holds network messages from
418        NicotineCore. """
419
420        threading.Thread.__init__(self)
421
422        self.name = "NetworkThread"
423
424        if sys.platform not in ("linux", "darwin"):
425            # TODO: support custom network interface for other systems than Linux and macOS
426            interface = None
427
428        self._core_callback = core_callback
429        self._queue = queue
430        self._callback_msgs = []
431        self._want_abort = False
432        self.server_disconnected = True
433        self.bindip = bindip
434        self.listenport = None
435        self.portrange = (port, port) if port else port_range
436        self.interface = interface
437        self._network_filter = network_filter
438        self._eventprocessor = eventprocessor
439
440        self.serverclasses = {}
441        for code_class, code_id in self.servercodes.items():
442            self.serverclasses[code_id] = code_class
443
444        self.peerinitclasses = {}
445        for code_class, code_id in self.peerinitcodes.items():
446            self.peerinitclasses[code_id] = code_class
447
448        self.peerclasses = {}
449        for code_class, code_id in self.peercodes.items():
450            self.peerclasses[code_id] = code_class
451
452        self.distribclasses = {}
453        for code_class, code_id in self.distribcodes.items():
454            self.distribclasses[code_id] = code_class
455
456        # Select Networking Input and Output sockets
457        self.selector = selectors.DefaultSelector()
458
459        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
460        self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
461        self.listen_socket.setblocking(0)
462
463        self.server_socket = None
464        self._numsockets = 1
465
466        self._conns = {}
467        self._connsinprogress = {}
468        self._calc_upload_limit_function = self._calc_upload_limit_none
469        self._upload_limit = 0
470        self._download_limit = 0
471        self._upload_limit_split = 0
472        self._download_limit_split = 0
473        self._ulimits = {}
474        self._dlimits = {}
475        self.total_uploads = 0
476        self.total_downloads = 0
477        self.last_conncount_callback = time.time()
478        self.last_cycle_time = time.time()
479        self.current_cycle_loop_count = 0
480        self.last_cycle_loop_count = 0
481        self.loops_per_second = 0
482
483        self.bind_listen_port()
484
485        self.daemon = True
486        self.start()
487
488    """ General """
489
490    def validate_listen_port(self):
491
492        if self.listenport is not None:
493            return True
494
495        return False
496
497    def validate_network_interface(self):
498
499        try:
500            if self.interface and self.interface not in (name for _i, name in socket.if_nameindex()):
501                return False
502
503        except AttributeError:
504            pass
505
506        return True
507
508    @staticmethod
509    def get_interface_ip_address(if_name):
510
511        try:
512            import fcntl
513            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
514
515            ip_if = fcntl.ioctl(sock.fileno(),
516                                0x8915,  # SIOCGIFADDR
517                                struct.pack('256s', if_name.encode()[:15]))
518
519            ip_address = socket.inet_ntoa(ip_if[20:24])
520
521        except ImportError:
522            ip_address = None
523
524        return ip_address
525
526    def bind_to_network_interface(self, sock, if_name):
527
528        try:
529            if sys.platform == "linux":
530                sock.setsockopt(socket.SOL_SOCKET, 25, if_name.encode())
531                self.bindip = None
532                return
533
534            if sys.platform == "darwin":
535                sock.setsockopt(socket.IPPROTO_IP, 25, socket.if_nametoindex(if_name))
536                self.bindip = None
537                return
538
539        except PermissionError:
540            pass
541
542        # System does not support changing the network interface
543        # Retrieve the IP address of the interface, and bind to it instead
544        self.bindip = self.get_interface_ip_address(if_name)
545
546    def bind_listen_port(self):
547
548        if not self.validate_network_interface():
549            return
550
551        if self.interface and not self.bindip:
552            self.bind_to_network_interface(self.listen_socket, self.interface)
553
554        ip_address = self.bindip or ''
555
556        for listenport in range(int(self.portrange[0]), int(self.portrange[1]) + 1):
557            try:
558                self.listen_socket.bind((ip_address, listenport))
559                self.listen_socket.listen()
560                self.listenport = listenport
561                log.add(_("Listening on port: %i"), listenport)
562                log.add_debug("Maximum number of concurrent connections (sockets): %i", MAXSOCKETS)
563                break
564
565            except socket.error:
566                continue
567
568    def server_connect(self):
569        """ We've connected to the server """
570        self.server_disconnected = False
571
572    def server_disconnect(self):
573        """ We've disconnected from the server, clean up """
574
575        self.server_disconnected = True
576        self.server_socket = None
577
578        for connection in self._conns.copy():
579            self.close_connection(self._conns, connection)
580
581        for connection in self._connsinprogress.copy():
582            self.close_connection(self._connsinprogress, connection)
583
584        self._queue.clear()
585
586        if not self._want_abort:
587            self._callback_msgs.append(SetCurrentConnectionCount(0))
588
589    def abort(self):
590        """ Call this to abort the thread """
591        self._want_abort = True
592
593        self.selector.close()
594        self.server_disconnect()
595
596    """ File Transfers """
597
598    @staticmethod
599    def _is_upload(conn):
600        return conn.__class__ is PeerConnection and conn.fileupl is not None
601
602    @staticmethod
603    def _is_download(conn):
604        return conn.__class__ is PeerConnection and conn.filedown is not None
605
606    def _calc_upload_limit(self, limit_disabled=False, limit_per_transfer=False):
607
608        limit = self._upload_limit
609        loop_limit = 1024  # 1 KB/s is the minimum upload speed per transfer
610
611        if limit_disabled or limit < loop_limit:
612            self._upload_limit_split = 0
613            return
614
615        if not limit_per_transfer and self.total_uploads > 1:
616            limit = limit // self.total_uploads
617
618        self._upload_limit_split = int(limit)
619
620    def _calc_upload_limit_by_transfer(self):
621        return self._calc_upload_limit(limit_per_transfer=True)
622
623    def _calc_upload_limit_none(self):
624        return self._calc_upload_limit(limit_disabled=True)
625
626    def _calc_download_limit(self):
627
628        limit = self._download_limit
629        loop_limit = 1024  # 1 KB/s is the minimum download speed per transfer
630
631        if limit < loop_limit:
632            # Download limit disabled
633            self._download_limit_split = 0
634            return
635
636        if self.total_downloads > 1:
637            limit = limit // self.total_downloads
638
639        self._download_limit_split = int(limit)
640
641    def _calc_loops_per_second(self):
642        """ Calculate number of loops per second. This value is used to split the
643        per-second transfer speed limit evenly for each loop. """
644
645        curtime = time.time()
646
647        if curtime - self.last_cycle_time >= 1:
648            self.loops_per_second = (self.last_cycle_loop_count + self.current_cycle_loop_count) // 2
649
650            self.last_cycle_loop_count = self.current_cycle_loop_count
651            self.last_cycle_time = curtime
652            self.current_cycle_loop_count = 0
653        else:
654            self.current_cycle_loop_count = self.current_cycle_loop_count + 1
655
656    def set_conn_speed_limit(self, connection, limit, limits):
657
658        limit = limit // (self.loops_per_second or 1)
659
660        if limit > 0:
661            limits[connection] = limit
662
663    """ Connections """
664
665    def socket_still_active(self, conn):
666
667        try:
668            connection = self._conns[conn]
669
670        except KeyError:
671            return False
672
673        return len(connection.obuf) > 0 or len(connection.ibuf) > 0
674
675    @staticmethod
676    def pack_network_message(msg_obj):
677
678        try:
679            return msg_obj.make_network_message()
680
681        except Exception:
682            from traceback import format_exc
683            log.add(("Unable to pack message type %(msg_type)s. %(error)s"),
684                    {'msg_type': msg_obj.__class__, 'error': format_exc()})
685
686        return None
687
688    @staticmethod
689    def unpack_network_message(msg_class, msg_buffer, msg_size, conn_type, conn=None):
690
691        try:
692            if conn is not None:
693                msg = msg_class(conn)
694            else:
695                msg = msg_class()
696
697            msg.parse_network_message(msg_buffer)
698            return msg
699
700        except Exception as error:
701            log.add(("Unable to parse %(conn_type)s message type %(msg_type)s size %(size)i "
702                    "contents %(msg_buffer)s: %(error)s"),
703                    {'conn_type': conn_type, 'msg_type': msg_class, 'size': msg_size,
704                     'msg_buffer': msg_buffer, 'error': error})
705
706        return None
707
708    def modify_connection_events(self, conn_obj, events):
709
710        if conn_obj.events != events:
711            self.selector.modify(conn_obj.conn, events)
712            conn_obj.events = events
713
714    def close_connection(self, connection_list, connection):
715
716        if connection not in connection_list:
717            # Already removed
718            return
719
720        conn_obj = connection_list[connection]
721
722        if self._is_download(conn_obj):
723            self.total_downloads -= 1
724            self._calc_download_limit()
725
726        elif self._is_upload(conn_obj):
727            self.total_uploads -= 1
728            self._calc_upload_limit_function()
729
730        # If we're shutting down, we've already closed the selector in abort()
731        if not self._want_abort:
732            self.selector.unregister(connection)
733
734        connection.close()
735        del connection_list[connection]
736        self._numsockets -= 1
737
738        if connection is self.server_socket:
739            # Disconnected from server, clean up connections and queue
740            self.server_disconnect()
741
742    def close_connection_by_ip(self, ip_address):
743
744        for connection in self._conns.copy():
745            conn_obj = self._conns.get(connection)
746
747            if not conn_obj or connection is self.server_socket:
748                continue
749
750            addr = conn_obj.addr
751
752            if ip_address == addr[0]:
753                log.add_conn("Blocking peer connection to IP address %(ip)s:%(port)s", {
754                    "ip": addr[0],
755                    "port": addr[1]
756                })
757                self._callback_msgs.append(ConnClose(connection, addr))
758                self.close_connection(self._conns, connection)
759
760    """ Server Connection """
761
762    @staticmethod
763    def set_server_socket_keepalive(server_socket, idle=10, interval=4, count=10):
764        """ Ensure we are disconnected from the server in case of connectivity issues,
765        by sending TCP keepalive pings. Assuming default values are used, once we reach
766        10 seconds of idle time, we start sending keepalive pings once every 4 seconds.
767        If 10 failed pings have been sent in a row (40 seconds), the connection is presumed
768        dead. """
769
770        if hasattr(socket, 'SO_KEEPALIVE'):
771            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)  # pylint: disable=maybe-no-member
772
773        if hasattr(socket, 'TCP_KEEPINTVL'):
774            server_socket.setsockopt(socket.IPPROTO_TCP,
775                                     socket.TCP_KEEPINTVL, interval)  # pylint: disable=maybe-no-member
776
777        if hasattr(socket, 'TCP_KEEPCNT'):
778            server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)  # pylint: disable=maybe-no-member
779
780        if hasattr(socket, 'TCP_KEEPIDLE'):
781            server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)  # pylint: disable=maybe-no-member
782
783        elif hasattr(socket, 'TCP_KEEPALIVE'):
784            # macOS fallback
785
786            server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, idle)  # pylint: disable=maybe-no-member
787
788        elif hasattr(socket, 'SIO_KEEPALIVE_VALS'):
789            """ Windows fallback
790            Probe count is set to 10 on a system level, and can't be modified.
791            https://docs.microsoft.com/en-us/windows/win32/winsock/so-keepalive """
792
793            server_socket.ioctl(
794                socket.SIO_KEEPALIVE_VALS,  # pylint: disable=maybe-no-member
795                (
796                    1,
797                    idle * 1000,
798                    interval * 1000
799                )
800            )
801
802    def init_server_conn(self, msg_obj):
803
804        try:
805            self.server_socket = server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
806            server_socket.setblocking(0)
807
808            # Detect if our connection to the server is still alive
809            self.set_server_socket_keepalive(server_socket)
810
811            if self.bindip:
812                server_socket.bind((self.bindip, 0))
813
814            elif self.interface:
815                self.bind_to_network_interface(server_socket, self.interface)
816
817            server_socket.connect_ex(msg_obj.addr)
818
819            self.selector.register(server_socket, selectors.EVENT_READ | selectors.EVENT_WRITE)
820            self._connsinprogress[server_socket] = PeerConnectionInProgress(server_socket, msg_obj)
821            self._numsockets += 1
822
823        except socket.error as err:
824            self._callback_msgs.append(ConnectError(msg_obj, err))
825            server_socket.close()
826            self.server_disconnect()
827
828    def process_server_input(self, conn, msg_buffer):
829        """ Server has sent us something, this function retrieves messages
830        from the msg_buffer, creates message objects and returns them and the rest
831        of the msg_buffer.
832        """
833
834        msg_buffer_mem = memoryview(msg_buffer)
835        buffer_len = len(msg_buffer_mem)
836        idx = 0
837
838        # Server messages are 8 bytes or greater in length
839        while buffer_len >= 8:
840            msgsize, msgtype = DOUBLE_UINT_UNPACK(msg_buffer_mem[idx:idx + 8])
841            msgsize_total = msgsize + 4
842
843            if msgsize_total > buffer_len or msgsize < 0:
844                # Invalid message size or buffer is being filled
845                break
846
847            # Unpack server messages
848            if msgtype in self.serverclasses:
849                msg = self.unpack_network_message(
850                    self.serverclasses[msgtype], msg_buffer_mem[idx + 8:idx + msgsize_total], msgsize - 4, "server")
851
852                if msg is not None:
853                    self._callback_msgs.append(msg)
854
855            else:
856                log.add("Server message type %(type)i size %(size)i contents %(msg_buffer)s unknown",
857                        {'type': msgtype, 'size': msgsize - 4, 'msg_buffer': msg_buffer[idx + 8:idx + msgsize_total]})
858
859            idx += msgsize_total
860            buffer_len -= msgsize_total
861
862        conn.ibuf = msg_buffer[idx:]
863
864    def process_server_output(self, msg_obj):
865
866        if self.server_socket not in self._conns:
867            log.add_conn("Can't send the message over the closed connection: %(type)s %(msg_obj)s", {
868                'type': msg_obj.__class__,
869                'msg_obj': vars(msg_obj)
870            })
871            return
872
873        msg = self.pack_network_message(msg_obj)
874
875        if msg is None:
876            return
877
878        conn_obj = self._conns[self.server_socket]
879        conn_obj.obuf.extend(UINT_PACK(len(msg) + 4))
880        conn_obj.obuf.extend(UINT_PACK(self.servercodes[msg_obj.__class__]))
881        conn_obj.obuf.extend(msg)
882
883        self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE)
884
885    """ Peer Init """
886
887    def process_peer_init_input(self, conn, msg_buffer):
888
889        msg_buffer_mem = memoryview(msg_buffer)
890        buffer_len = len(msg_buffer_mem)
891        idx = 0
892
893        # Peer init messages are 8 bytes or greater in length
894        while buffer_len >= 8 and conn.init is None:
895            msgsize = UINT_UNPACK(msg_buffer_mem[idx:idx + 4])[0]
896            msgsize_total = msgsize + 4
897
898            if msgsize_total > buffer_len or msgsize < 0:
899                # Invalid message size or buffer is being filled
900                break
901
902            msgtype = msg_buffer_mem[idx + 4]
903
904            # Unpack peer init messages
905            if msgtype in self.peerinitclasses:
906                msg = self.unpack_network_message(
907                    self.peerinitclasses[msgtype], msg_buffer_mem[idx + 5:idx + msgsize_total], msgsize - 1,
908                    "peer init", conn)
909
910                if msg is not None:
911                    if self.peerinitclasses[msgtype] is PierceFireWall:
912                        conn.piercefw = msg
913
914                    elif self.peerinitclasses[msgtype] is PeerInit:
915                        conn.init = msg
916
917                    self._callback_msgs.append(msg)
918
919            else:
920                if conn.piercefw is None:
921                    log.add("Peer init message type %(type)i size %(size)i contents %(msg_buffer)s unknown",
922                            {'type': msgtype, 'size': msgsize - 1,
923                             'msg_buffer': msg_buffer[idx + 5:idx + msgsize_total]})
924
925                    self._callback_msgs.append(ConnClose(conn.conn, conn.addr))
926                    self.close_connection(self._conns, conn)
927
928                break
929
930            idx += msgsize_total
931            buffer_len -= msgsize_total
932
933        conn.ibuf = msg_buffer[idx:]
934
935    def process_peer_init_output(self, msg_obj):
936
937        if msg_obj.conn not in self._conns:
938            log.add_conn("Can't send the message over the closed connection: %(type)s %(msg_obj)s", {
939                'type': msg_obj.__class__,
940                'msg_obj': vars(msg_obj)
941            })
942            return
943
944        # Pack peer init messages
945        if msg_obj.__class__ is PierceFireWall:
946            conn_obj = self._conns[msg_obj.conn]
947            msg = self.pack_network_message(msg_obj)
948
949            if msg is None:
950                return
951
952            conn_obj.piercefw = msg_obj
953
954            conn_obj.obuf.extend(UINT_PACK(len(msg) + 1))
955            conn_obj.obuf.extend(bytes([self.peerinitcodes[msg_obj.__class__]]))
956            conn_obj.obuf.extend(msg)
957
958        elif msg_obj.__class__ is PeerInit:
959            conn_obj = self._conns[msg_obj.conn]
960            msg = self.pack_network_message(msg_obj)
961
962            if msg is None:
963                return
964
965            conn_obj.init = msg_obj
966
967            if conn_obj.piercefw is not None:
968                return
969
970            conn_obj.obuf.extend(UINT_PACK(len(msg) + 1))
971            conn_obj.obuf.extend(bytes([self.peerinitcodes[msg_obj.__class__]]))
972            conn_obj.obuf.extend(msg)
973
974        self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE)
975
976    """ Peer Connection """
977
978    def init_peer_conn(self, msg_obj):
979
980        try:
981            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
982            conn.setblocking(0)
983
984            if self.bindip:
985                conn.bind((self.bindip, 0))
986
987            elif self.interface:
988                self.bind_to_network_interface(conn, self.interface)
989
990            conn.connect_ex(msg_obj.addr)
991
992            self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE)
993            self._connsinprogress[conn] = PeerConnectionInProgress(conn, msg_obj)
994            self._numsockets += 1
995
996        except socket.error as err:
997            self._callback_msgs.append(ConnectError(msg_obj, err))
998            conn.close()
999
1000    def process_peer_input(self, conn, msg_buffer):
1001        """ We have a "P" connection (p2p exchange), peer has sent us
1002        something, this function retrieves messages
1003        from the msg_buffer, creates message objects and returns them
1004        and the rest of the msg_buffer.
1005        """
1006
1007        msg_buffer_mem = memoryview(msg_buffer)
1008        buffer_len = len(msg_buffer_mem)
1009        idx = 0
1010        search_result_received = False
1011
1012        # Peer messages are 8 bytes or greater in length
1013        while buffer_len >= 8:
1014            msgsize, msgtype = DOUBLE_UINT_UNPACK(msg_buffer_mem[idx:idx + 8])
1015            msgsize_total = msgsize + 4
1016
1017            try:
1018                peer_class = self.peerclasses[msgtype]
1019
1020                if peer_class in (SharedFileList, UserInfoReply):
1021                    # Send progress to the main thread
1022                    self._callback_msgs.append(
1023                        MessageProgress(conn.init.target_user, peer_class, buffer_len, msgsize_total))
1024
1025            except KeyError:
1026                pass
1027
1028            if msgsize_total > buffer_len or msgsize < 0:
1029                # Invalid message size or buffer is being filled
1030                break
1031
1032            # Unpack peer messages
1033            if msgtype in self.peerclasses:
1034                msg = self.unpack_network_message(
1035                    self.peerclasses[msgtype], msg_buffer_mem[idx + 8:idx + msgsize_total], msgsize - 4, "peer", conn)
1036
1037                if msg.__class__ is FileSearchResult:
1038                    search_result_received = True
1039
1040                if msg is not None:
1041                    self._callback_msgs.append(msg)
1042
1043            else:
1044                host = port = "unknown"
1045
1046                if conn.init.conn is not None and conn.addr is not None:
1047                    host = conn.addr[0]
1048                    port = conn.addr[1]
1049
1050                log.add(("Peer message type %(type)s size %(size)i contents %(msg_buffer)s unknown, "
1051                         "from user: %(user)s, %(host)s:%(port)s"),
1052                        {'type': msgtype, 'size': msgsize - 4, 'msg_buffer': msg_buffer[idx + 8:idx + msgsize_total],
1053                         'user': conn.init.target_user, 'host': host, 'port': port})
1054
1055            idx += msgsize_total
1056            buffer_len -= msgsize_total
1057
1058        conn.ibuf = msg_buffer[idx:]
1059
1060        if search_result_received and not self.socket_still_active(conn.conn):
1061            # Forcibly close peer connection. Only used after receiving a search result,
1062            # as we need to get rid of peer connections before they pile up.
1063
1064            self._callback_msgs.append(ConnClose(conn.conn, conn.addr))
1065            self.close_connection(self._conns, conn.conn)
1066
1067    def process_peer_output(self, msg_obj):
1068
1069        if msg_obj.conn not in self._conns:
1070            log.add_conn("Can't send the message over the closed connection: %(type)s %(msg_obj)s", {
1071                'type': msg_obj.__class__,
1072                'msg_obj': vars(msg_obj)
1073            })
1074            return
1075
1076        # Pack peer messages
1077        msg = self.pack_network_message(msg_obj)
1078
1079        if msg is None:
1080            return
1081
1082        conn_obj = self._conns[msg_obj.conn]
1083        conn_obj.obuf.extend(UINT_PACK(len(msg) + 4))
1084        conn_obj.obuf.extend(UINT_PACK(self.peercodes[msg_obj.__class__]))
1085        conn_obj.obuf.extend(msg)
1086
1087        self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE)
1088
1089    """ File Connection """
1090
    def process_file_input(self, conn, msg_buffer):
        """ Handles incoming data on a "F" connection (filetransfer).

        Depending on the state of the connection, the data is treated as one of:
        - a FileRequest message (4 bytes), while conn.filereq is None
        - file contents of a download, while conn.filedown is set
        - a FileOffset message (8 bytes), while conn.fileupl is set but its
          offset is not yet known

        Resulting messages are queued in self._callback_msgs, and the data that
        was not consumed is stored in conn.ibuf. Nothing is returned. """

        if conn.filereq is None:
            # No file request seen on this connection yet, the first 4 bytes
            # are unpacked as one
            msgsize = 4
            msg = self.unpack_network_message(FileRequest, msg_buffer[:msgsize], msgsize, "file", conn.conn)

            if msg is not None and msg.req is not None:
                self._callback_msgs.append(msg)
                conn.filereq = msg

            # NOTE(review): the first 4 bytes are dropped even when no message
            # could be unpacked from them - confirm this is intended when fewer
            # than 4 bytes have arrived so far
            msg_buffer = msg_buffer[msgsize:]

        elif conn.filedown is not None:
            # Download in progress: take at most the number of bytes still
            # expected for this file
            leftbytes = conn.bytestoread - conn.filereadbytes
            addedbytes = msg_buffer[:leftbytes]

            if leftbytes > 0:
                try:
                    conn.filedown.file.write(addedbytes)

                except IOError as strerror:
                    # NOTE(review): execution continues below after the
                    # connection is closed here, so further callbacks may
                    # still be queued for it
                    self._callback_msgs.append(FileError(conn, conn.filedown.file, strerror))
                    self._callback_msgs.append(ConnClose(conn.conn, conn.addr))
                    self.close_connection(self._conns, conn.conn)

                except ValueError:
                    # Deliberately ignored - presumably raised when the file
                    # object was already closed; TODO confirm
                    pass

            addedbyteslen = len(addedbytes)
            curtime = time.time()

            # Finished once the bytes taken from the buffer cover everything
            # that was still expected
            finished = ((leftbytes - addedbyteslen) == 0)

            if finished or (curtime - conn.lastcallback) > 1:
                # We save resources by not sending data back to the NicotineCore
                # every time a part of a file is downloaded

                self._callback_msgs.append(DownloadFile(conn.conn, conn.filedown.file))
                conn.lastcallback = curtime

            if finished:
                self._callback_msgs.append(ConnClose(conn.conn, conn.addr))
                self.close_connection(self._conns, conn.conn)

            conn.filereadbytes += addedbyteslen
            msg_buffer = msg_buffer[leftbytes:]

        elif conn.fileupl is not None and conn.fileupl.offset is None:
            # Upload waiting to start: the next 8 bytes are unpacked as the
            # file offset message
            msgsize = 8
            msg = self.unpack_network_message(FileOffset, msg_buffer[:msgsize], msgsize, "file", conn)

            if msg is not None and msg.offset is not None:
                try:
                    # Position the file at the received offset, and start
                    # watching the connection for writes
                    conn.fileupl.file.seek(msg.offset)
                    self.modify_connection_events(conn, selectors.EVENT_READ | selectors.EVENT_WRITE)

                except IOError as strerror:
                    # NOTE(review): the offset is still recorded and the upload
                    # still queued below after the connection is closed here
                    self._callback_msgs.append(FileError(conn, conn.fileupl.file, strerror))
                    self._callback_msgs.append(ConnClose(conn.conn, conn.addr))
                    self.close_connection(self._conns, conn.conn)

                except ValueError:
                    # Deliberately ignored - presumably raised when the file
                    # object was already closed; TODO confirm
                    pass

                conn.fileupl.offset = msg.offset
                self._callback_msgs.append(conn.fileupl)

            msg_buffer = msg_buffer[msgsize:]

        # Whatever was not consumed above stays buffered on the connection
        conn.ibuf = msg_buffer
1165
1166    def process_file_output(self, msg_obj):
1167
1168        if msg_obj.conn not in self._conns:
1169            log.add_conn("Can't send the message over the closed connection: %(type)s %(msg_obj)s", {
1170                'type': msg_obj.__class__,
1171                'msg_obj': vars(msg_obj)
1172            })
1173            return
1174
1175        # Pack file messages
1176        if msg_obj.__class__ is FileRequest:
1177            msg = self.pack_network_message(msg_obj)
1178
1179            if msg is None:
1180                return
1181
1182            conn_obj = self._conns[msg_obj.conn]
1183            conn_obj.filereq = msg_obj
1184            conn_obj.obuf.extend(msg)
1185
1186            self._callback_msgs.append(msg_obj)
1187
1188        elif msg_obj.__class__ is FileOffset:
1189            msg = self.pack_network_message(msg_obj)
1190
1191            if msg is None:
1192                return
1193
1194            conn_obj = self._conns[msg_obj.conn]
1195            conn_obj.bytestoread = msg_obj.filesize - msg_obj.offset
1196            conn_obj.obuf.extend(msg)
1197
1198        self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE)
1199
1200    """ Distributed Connection """
1201
1202    def process_distrib_input(self, conn, msg_buffer):
1203        """ We have a distributed network connection, parent has sent us
1204        something, this function retrieves messages
1205        from the msg_buffer, creates message objects and returns them
1206        and the rest of the msg_buffer.
1207        """
1208
1209        msg_buffer_mem = memoryview(msg_buffer)
1210        buffer_len = len(msg_buffer_mem)
1211        idx = 0
1212
1213        # Distributed messages are 5 bytes or greater in length
1214        while buffer_len >= 5:
1215            msgsize = UINT_UNPACK(msg_buffer_mem[idx:idx + 4])[0]
1216            msgsize_total = msgsize + 4
1217
1218            if msgsize_total > buffer_len or msgsize < 0:
1219                # Invalid message size or buffer is being filled
1220                break
1221
1222            msgtype = msg_buffer_mem[idx + 4]
1223
1224            # Unpack distributed messages
1225            if msgtype in self.distribclasses:
1226                msg = self.unpack_network_message(
1227                    self.distribclasses[msgtype], msg_buffer_mem[idx + 5:idx + msgsize_total], msgsize - 1,
1228                    "distrib", conn)
1229
1230                if msg is not None:
1231                    self._callback_msgs.append(msg)
1232
1233            else:
1234                log.add("Distrib message type %(type)i size %(size)i contents %(msg_buffer)s unknown",
1235                        {'type': msgtype, 'size': msgsize - 1, 'msg_buffer': msg_buffer[idx + 5:idx + msgsize_total]})
1236                self._callback_msgs.append(ConnClose(conn.conn, conn.addr))
1237                self.close_connection(self._conns, conn)
1238                break
1239
1240            idx += msgsize_total
1241            buffer_len -= msgsize_total
1242
1243        conn.ibuf = msg_buffer[idx:]
1244
1245    def process_distrib_output(self, msg_obj):
1246
1247        if msg_obj.conn not in self._conns:
1248            log.add_conn("Can't send the message over the closed connection: %(type)s %(msg_obj)s", {
1249                'type': msg_obj.__class__,
1250                'msg_obj': vars(msg_obj)
1251            })
1252            return
1253
1254        # Pack distributed messages
1255        msg = self.pack_network_message(msg_obj)
1256
1257        if msg is None:
1258            return
1259
1260        conn_obj = self._conns[msg_obj.conn]
1261        conn_obj.obuf.extend(UINT_PACK(len(msg) + 1))
1262        conn_obj.obuf.extend(bytes([self.distribcodes[msg_obj.__class__]]))
1263        conn_obj.obuf.extend(msg)
1264
1265        self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE)
1266
1267    """ Connection I/O """
1268
1269    def process_conn_input(self, connection, conn_obj):
1270
1271        if connection is self.server_socket:
1272            self.process_server_input(conn_obj, conn_obj.ibuf)
1273
1274        elif conn_obj.init is None:
1275            self.process_peer_init_input(conn_obj, conn_obj.ibuf)
1276
1277        elif conn_obj.init is not None and conn_obj.init.conn_type == 'P':
1278            self.process_peer_input(conn_obj, conn_obj.ibuf)
1279
1280        elif conn_obj.init is not None and conn_obj.init.conn_type == 'F':
1281            self.process_file_input(conn_obj, conn_obj.ibuf)
1282
1283        elif conn_obj.init is not None and conn_obj.init.conn_type == 'D':
1284            self.process_distrib_input(conn_obj, conn_obj.ibuf)
1285
1286        else:
1287            # Unknown message type
1288            log.add("Can't handle connection type %s", conn_obj.init.conn_type)
1289
    def process_conn_output(self):
        """ Processes the messages currently held in self._queue. Network
        messages are packed into the output buffer of their connection, other
        messages open or close connections, attach file transfers to
        connections, or change the transfer speed limits.

        Processing stops early, dropping the rest of the copied messages, once
        self.server_disconnected is set. """

        # Work on a snapshot, so messages re-queued below are handled in a later call.
        # NOTE(review): anything appended to the queue between copy() and clear()
        # would be dropped - confirm whether other threads append concurrently
        msg_list = self._queue.copy()
        self._queue.clear()

        for msg_obj in msg_list:
            if self.server_disconnected:
                # Disconnected from server, stop processing queue
                return

            msg_class = msg_obj.__class__

            # The order of the checks below decides which handler a message
            # class ends up in
            if issubclass(msg_class, PeerInitMessage):
                self.process_peer_init_output(msg_obj)

            elif issubclass(msg_class, PeerMessage):
                self.process_peer_output(msg_obj)

            elif msg_class is InitPeerConn:
                if self._numsockets < MAXSOCKETS:
                    self.init_peer_conn(msg_obj)
                else:
                    # Connection limit reached, re-queue
                    self._queue.append(msg_obj)

            elif issubclass(msg_class, DistribMessage):
                self.process_distrib_output(msg_obj)

            elif issubclass(msg_class, FileMessage):
                self.process_file_output(msg_obj)

            elif issubclass(msg_class, ServerMessage):
                self.process_server_output(msg_obj)

            elif msg_class is ConnClose and msg_obj.conn in self._conns:
                conn = msg_obj.conn

                # Report the closed connection back, including its address
                self._callback_msgs.append(ConnClose(conn, self._conns[conn].addr))
                self.close_connection(self._conns, conn)

            elif msg_class is ConnCloseIP:
                self.close_connection_by_ip(msg_obj.addr)

            elif msg_class is InitServerConn:
                # Unlike InitPeerConn, the request is not re-queued when the
                # connection limit has been reached
                if self._numsockets < MAXSOCKETS:
                    self.init_server_conn(msg_obj)

            elif msg_class is DownloadFile and msg_obj.conn in self._conns:
                # Attach the download to its connection and recalculate the
                # download limit for the new number of downloads
                self._conns[msg_obj.conn].filedown = msg_obj
                self.total_downloads += 1
                self._calc_download_limit()

            elif msg_class is UploadFile and msg_obj.conn in self._conns:
                # Attach the upload to its connection and recalculate the
                # upload limit for the new number of uploads
                self._conns[msg_obj.conn].fileupl = msg_obj
                self.total_uploads += 1
                self._calc_upload_limit_function()

            elif msg_class is SetDownloadLimit:
                # The limit in the message is multiplied by 1024 before use
                self._download_limit = msg_obj.limit * 1024
                self._calc_download_limit()

            elif msg_class is SetUploadLimit:
                # Select the function used to calculate upload limits from now on
                if msg_obj.uselimit:
                    if msg_obj.limitby:
                        self._calc_upload_limit_function = self._calc_upload_limit
                    else:
                        self._calc_upload_limit_function = self._calc_upload_limit_by_transfer

                else:
                    self._calc_upload_limit_function = self._calc_upload_limit_none

                # The limit in the message is multiplied by 1024 before use
                self._upload_limit = msg_obj.limit * 1024
                self._calc_upload_limit_function()
1366
1367    def read_data(self, conn_obj):
1368
1369        connection = conn_obj.conn
1370
1371        # Check for a download limit
1372        if connection in self._dlimits:
1373            limit = self._dlimits[connection]
1374        else:
1375            limit = None
1376
1377        conn_obj.lastactive = time.time()
1378        data = connection.recv(conn_obj.lastreadlength)
1379        conn_obj.ibuf.extend(data)
1380
1381        if limit is None:
1382            # Unlimited download data
1383            if len(data) >= conn_obj.lastreadlength // 2:
1384                conn_obj.lastreadlength = conn_obj.lastreadlength * 2
1385
1386        else:
1387            # Speed Limited Download data (transfers)
1388            conn_obj.lastreadlength = limit
1389
1390        if not data:
1391            return False
1392
1393        return True
1394
    def write_data(self, conn_obj):
        """ Sends buffered output of conn_obj over its socket, honoring the
        upload limit of the connection if there is one. For a connection with
        an upload whose offset is known, the output buffer is refilled from the
        file, and upload progress is queued in self._callback_msgs.

        Once the output buffer of a peer connection is empty, the connection
        is no longer watched for writes. """

        connection = conn_obj.conn

        # Check for an upload limit
        if connection in self._ulimits:
            limit = self._ulimits[connection]
        else:
            limit = None

        conn_obj.lastactive = time.time()

        if conn_obj.obuf:
            if limit is None:
                bytes_send = connection.send(conn_obj.obuf)
            else:
                bytes_send = connection.send(conn_obj.obuf[:limit])

            # Drop the bytes that were actually sent from the buffer
            conn_obj.obuf = conn_obj.obuf[bytes_send:]
        else:
            bytes_send = 0

        # NOTE(review): for the server connection we return before the check
        # at the end of this method, which stops watching for writes once the
        # buffer is empty - confirm this is handled elsewhere
        if connection is self.server_socket:
            return

        if conn_obj.fileupl is not None and conn_obj.fileupl.offset is not None:
            conn_obj.fileupl.sentbytes += bytes_send

            # Bytes sent so far plus bytes already read into the output buffer
            totalsentbytes = conn_obj.fileupl.offset + conn_obj.fileupl.sentbytes + len(conn_obj.obuf)

            try:
                size = conn_obj.fileupl.size

                if totalsentbytes < size:
                    # Not everything has been read from the file yet. The amount to
                    # read grows with the number of bytes that were just sent.
                    bytestoread = bytes_send * 2 - len(conn_obj.obuf) + 10 * 4024

                    if bytestoread > 0:
                        read = conn_obj.fileupl.file.read(bytestoread)
                        conn_obj.obuf.extend(read)

                        self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE)

            except IOError as strerror:
                # NOTE(review): execution continues below after the connection
                # is closed here
                self._callback_msgs.append(FileError(conn_obj, conn_obj.fileupl.file, strerror))
                self._callback_msgs.append(ConnClose(connection, conn_obj.addr))
                self.close_connection(self._conns, connection)

            except ValueError:
                # Deliberately ignored - presumably raised when the file object
                # was already closed; TODO confirm
                pass

            if bytes_send <= 0:
                # Nothing was sent this time, no progress to report
                return

            curtime = time.time()
            finished = (conn_obj.fileupl.offset + conn_obj.fileupl.sentbytes == size)

            if finished or (curtime - conn_obj.lastcallback) > 1:
                # We save resources by not sending data back to the NicotineCore
                # every time a part of a file is uploaded

                self._callback_msgs.append(conn_obj.fileupl)
                conn_obj.lastcallback = curtime

        if not conn_obj.obuf:
            # Nothing else to send, stop watching connection for writes
            self.modify_connection_events(conn_obj, selectors.EVENT_READ)
1460
1461    """ Networking Loop """
1462
1463    def run(self):
1464
1465        # Listen socket needs to be registered for selection here instead of __init__,
1466        # otherwise connections break on certain systems (OpenBSD confirmed)
1467        self.selector.register(self.listen_socket, selectors.EVENT_READ)
1468
1469        while not self._want_abort:
1470
1471            if self.server_disconnected:
1472                # We're not connected to the server at the moment
1473                time.sleep(0.1)
1474                continue
1475
1476            curtime = time.time()
1477
1478            # Send updated connection count to NicotineCore. Avoid sending too many
1479            # updates at once, if there are a lot of connections.
1480            if (curtime - self.last_conncount_callback) > self.CONNCOUNT_CALLBACK_INTERVAL:
1481                self._callback_msgs.append(SetCurrentConnectionCount(self._numsockets))
1482                self.last_conncount_callback = curtime
1483
1484            # Process outgoing messages
1485            if self._queue:
1486                self.process_conn_output()
1487
1488            # Check which connections are ready to send/receive data
1489            try:
1490                key_events = self.selector.select(timeout=-1)
1491                input_list = set(key.fileobj for key, event in key_events if event & selectors.EVENT_READ)
1492                output_list = set(key.fileobj for key, event in key_events if event & selectors.EVENT_WRITE)
1493
1494            except OSError as error:
                # Error received; terminate networking loop
1496
1497                log.add("Major Socket Error: Networking terminated! %s", error)
1498                self._want_abort = True
1499
1500            except ValueError as error:
1501                # Possibly opened too many sockets
1502
1503                log.add("select ValueError: %s", error)
1504                time.sleep(0.1)
1505
1506                self._callback_msgs.clear()
1507                continue
1508
1509            # Manage incoming connections to listen socket
1510            if self._numsockets < MAXSOCKETS and not self.server_disconnected and self.listen_socket in input_list:
1511                try:
1512                    incconn, incaddr = self.listen_socket.accept()
1513                except Exception:
1514                    time.sleep(0.01)
1515                else:
1516                    if self._network_filter.is_ip_blocked(incaddr[0]):
1517                        log.add_conn("Ignoring connection request from blocked IP address %(ip)s:%(port)s", {
1518                            'ip': incaddr[0],
1519                            'port': incaddr[1]
1520                        })
1521                        incconn.close()
1522
1523                    else:
1524                        events = selectors.EVENT_READ
1525                        incconn.setblocking(0)
1526
1527                        self._conns[incconn] = PeerConnection(conn=incconn, addr=incaddr, events=events)
1528                        self._numsockets += 1
1529                        self._callback_msgs.append(IncConn(incconn, incaddr))
1530
1531                        # Event flags are modified to include 'write' in subsequent loops, if necessary.
1532                        # Don't do it here, otherwise connections may break.
1533                        self.selector.register(incconn, events)
1534
1535            # Manage outgoing connections in progress
1536            for connection_in_progress in self._connsinprogress.copy():
1537                try:
1538                    conn_obj = self._connsinprogress[connection_in_progress]
1539
1540                except KeyError:
1541                    # Connection was removed, possibly disconnecting from the server
1542                    continue
1543
1544                msg_obj = conn_obj.msg_obj
1545
1546                if (curtime - conn_obj.lastactive) > self.IN_PROGRESS_STALE_AFTER:
1547                    # Connection failed
1548
1549                    self._callback_msgs.append(ConnectError(msg_obj, "Timed out"))
1550                    self.close_connection(self._connsinprogress, connection_in_progress)
1551                    continue
1552
1553                try:
1554                    if connection_in_progress in input_list:
1555                        # Check if the socket has any data for us
1556                        connection_in_progress.recv(1, socket.MSG_PEEK)
1557
1558                except socket.error as err:
1559                    self._callback_msgs.append(ConnectError(msg_obj, err))
1560                    self.close_connection(self._connsinprogress, connection_in_progress)
1561
1562                else:
1563                    if connection_in_progress in output_list:
1564                        # Connection has been established
1565
1566                        addr = msg_obj.addr
1567                        events = selectors.EVENT_READ | selectors.EVENT_WRITE
1568
1569                        if connection_in_progress is self.server_socket:
1570                            self._conns[self.server_socket] = Connection(
1571                                conn=self.server_socket, addr=addr, events=events)
1572
1573                            self._callback_msgs.append(InitServerConn(self.server_socket, addr))
1574
1575                        else:
1576                            if self._network_filter.is_ip_blocked(addr[0]):
1577                                log.add_conn("Ignoring connection request from blocked IP address %(ip)s:%(port)s", {
1578                                    "ip": addr[0],
1579                                    "port": addr[1]
1580                                })
1581                                self._callback_msgs.append(ConnectError(msg_obj, "Blocked IP address"))
1582                                self.close_connection(self._connsinprogress, connection_in_progress)
1583                                continue
1584
1585                            self._conns[connection_in_progress] = PeerConnection(
1586                                conn=connection_in_progress, addr=addr, events=events, init=msg_obj.init)
1587
1588                            self._callback_msgs.append(InitPeerConn(connection_in_progress, addr))
1589
1590                        del self._connsinprogress[connection_in_progress]
1591
1592            # Process read/write for active connections
1593            for connection in self._conns.copy():
1594                try:
1595                    conn_obj = self._conns[connection]
1596
1597                except KeyError:
1598                    # Connection was removed, possibly disconnecting from the server
1599                    continue
1600
1601                if (connection is not self.server_socket
1602                        and (curtime - conn_obj.lastactive) > self.CONNECTION_MAX_IDLE):
1603                    # No recent activity, peer connection is stale
1604
1605                    self._callback_msgs.append(ConnClose(connection, conn_obj.addr))
1606                    self.close_connection(self._conns, connection)
1607                    continue
1608
1609                if connection in input_list:
1610                    if self._is_download(conn_obj):
1611                        self.set_conn_speed_limit(connection, self._download_limit_split, self._dlimits)
1612
1613                    try:
1614                        if not self.read_data(conn_obj):
1615                            # No data received, socket was likely closed remotely
1616                            self._callback_msgs.append(ConnClose(connection, conn_obj.addr))
1617                            self.close_connection(self._conns, connection)
1618                            continue
1619
1620                    except socket.error as err:
1621                        log.add_conn(("Cannot read data from connection %(addr)s, closing connection. "
1622                                      "Error: %(error)s"), {
1623                            "addr": conn_obj.addr,
1624                            "error": err
1625                        })
1626                        self._callback_msgs.append(ConnClose(connection, conn_obj.addr))
1627                        self.close_connection(self._conns, connection)
1628                        continue
1629
1630                if conn_obj.ibuf:
1631                    self.process_conn_input(connection, conn_obj)
1632
1633                if connection in output_list:
1634                    if self._is_upload(conn_obj):
1635                        self.set_conn_speed_limit(connection, self._upload_limit_split, self._ulimits)
1636
1637                    try:
1638                        self.write_data(conn_obj)
1639
1640                    except Exception as err:
1641                        log.add_conn("Cannot write data to connection %(addr)s, closing connection. Error: %(error)s", {
1642                            "addr": conn_obj.addr,
1643                            "error": err
1644                        })
1645                        self._callback_msgs.append(ConnClose(connection, conn_obj.addr))
1646                        self.close_connection(self._conns, connection)
1647                        continue
1648
1649            # Inform the main thread
1650            if self._callback_msgs:
1651                self._core_callback(list(self._callback_msgs))
1652                self._callback_msgs.clear()
1653
1654            # Reset transfer speed limits
1655            self._ulimits = {}
1656            self._dlimits = {}
1657
1658            self._calc_loops_per_second()
1659
1660            # Don't exhaust the CPU
1661            time.sleep(1 / 60)
1662
1663        # Networking thread aborted
1664