1# COPYRIGHT (C) 2020-2021 Nicotine+ Team 2# COPYRIGHT (C) 2008-2012 Quinox <quinox@users.sf.net> 3# COPYRIGHT (C) 2007-2009 Daelstorm <daelstorm@gmail.com> 4# COPYRIGHT (C) 2003-2004 Hyriand <hyriand@thegraveyard.org> 5# COPYRIGHT (C) 2001-2003 Alexander Kanavin 6# 7# This program is free software: you can redistribute it and/or modify 8# it under the terms of the GNU General Public License as published by 9# the Free Software Foundation, either version 3 of the License, or 10# (at your option) any later version. 11# 12# This program is distributed in the hope that it will be useful, 13# but WITHOUT ANY WARRANTY; without even the implied warranty of 14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15# GNU General Public License for more details. 16# 17# You should have received a copy of the GNU General Public License 18# along with this program. If not, see <http://www.gnu.org/licenses/>. 19 20""" 21This module implements Soulseek networking protocol. 22""" 23 24import selectors 25import socket 26import struct 27import sys 28import threading 29import time 30 31from pynicotine.logfacility import log 32from pynicotine.slskmessages import AcceptChildren 33from pynicotine.slskmessages import AckNotifyPrivileges 34from pynicotine.slskmessages import AddThingIHate 35from pynicotine.slskmessages import AddThingILike 36from pynicotine.slskmessages import AddToPrivileged 37from pynicotine.slskmessages import AddUser 38from pynicotine.slskmessages import AdminCommand 39from pynicotine.slskmessages import AdminMessage 40from pynicotine.slskmessages import BranchLevel 41from pynicotine.slskmessages import BranchRoot 42from pynicotine.slskmessages import CantConnectToPeer 43from pynicotine.slskmessages import CantCreateRoom 44from pynicotine.slskmessages import ChangePassword 45from pynicotine.slskmessages import CheckPrivileges 46from pynicotine.slskmessages import ChildDepth 47from pynicotine.slskmessages import ConnClose 48from pynicotine.slskmessages import ConnCloseIP 49from pynicotine.slskmessages import ConnectError 50from pynicotine.slskmessages import ConnectToPeer 51from pynicotine.slskmessages import DistribAlive 52from pynicotine.slskmessages import DistribAliveInterval 53from pynicotine.slskmessages import DistribBranchLevel 54from pynicotine.slskmessages import DistribBranchRoot 55from pynicotine.slskmessages import DistribChildDepth 56from pynicotine.slskmessages import DistribEmbeddedMessage 57from pynicotine.slskmessages import DistribMessage 58from pynicotine.slskmessages import DistribSearch 59from pynicotine.slskmessages import DownloadFile 60from pynicotine.slskmessages import EmbeddedMessage 61from pynicotine.slskmessages import ExactFileSearch 62from pynicotine.slskmessages import FileError 63from pynicotine.slskmessages import FileMessage 64from pynicotine.slskmessages import FileOffset 65from pynicotine.slskmessages import FileRequest 66from pynicotine.slskmessages import FileSearch 67from pynicotine.slskmessages import FileSearchRequest 68from pynicotine.slskmessages import FileSearchResult 69from pynicotine.slskmessages import FolderContentsRequest 70from pynicotine.slskmessages import FolderContentsResponse 71from pynicotine.slskmessages import GetPeerAddress 72from pynicotine.slskmessages import GetSharedFileList 73from pynicotine.slskmessages import GetUserStats 74from pynicotine.slskmessages import GetUserStatus 75from pynicotine.slskmessages import GivePrivileges 76from pynicotine.slskmessages import GlobalRecommendations 77from pynicotine.slskmessages import GlobalUserList 78from pynicotine.slskmessages import HaveNoParent 79from pynicotine.slskmessages import IncConn 80from pynicotine.slskmessages import InitPeerConn 81from pynicotine.slskmessages import InitServerConn 82from pynicotine.slskmessages import ItemRecommendations 83from pynicotine.slskmessages import ItemSimilarUsers 84from pynicotine.slskmessages import JoinPublicRoom 85from pynicotine.slskmessages import JoinRoom 86from pynicotine.slskmessages import LeavePublicRoom 87from pynicotine.slskmessages import LeaveRoom 88from pynicotine.slskmessages import Login 89from pynicotine.slskmessages import MessageAcked 90from pynicotine.slskmessages import MessageProgress 91from pynicotine.slskmessages import MessageUser 92from pynicotine.slskmessages import MessageUsers 93from pynicotine.slskmessages import MinParentsInCache 94from pynicotine.slskmessages import PossibleParents 95from pynicotine.slskmessages import NotifyPrivileges 96from pynicotine.slskmessages import ParentInactivityTimeout 97from pynicotine.slskmessages import ParentMinSpeed 98from pynicotine.slskmessages import ParentSpeedRatio 99from pynicotine.slskmessages import PeerInit 100from pynicotine.slskmessages import PeerInitMessage 101from pynicotine.slskmessages import PeerMessage 102from pynicotine.slskmessages import PierceFireWall 103from pynicotine.slskmessages import PlaceholdUpload 104from pynicotine.slskmessages import PlaceInLineResponse 105from pynicotine.slskmessages import PlaceInQueue 106from pynicotine.slskmessages import PlaceInQueueRequest 107from pynicotine.slskmessages import PMessageUser 108from pynicotine.slskmessages import PrivateRoomAdded 109from pynicotine.slskmessages import PrivateRoomAddOperator 110from pynicotine.slskmessages import PrivateRoomAddUser 111from pynicotine.slskmessages import PrivateRoomDismember 112from pynicotine.slskmessages import PrivateRoomDisown 113from pynicotine.slskmessages import PrivateRoomOperatorAdded 114from pynicotine.slskmessages import PrivateRoomOperatorRemoved 115from pynicotine.slskmessages import PrivateRoomOwned 116from pynicotine.slskmessages import PrivateRoomRemoved 117from pynicotine.slskmessages import PrivateRoomRemoveOperator 118from pynicotine.slskmessages import PrivateRoomRemoveUser 119from pynicotine.slskmessages import PrivateRoomSomething 120from pynicotine.slskmessages import PrivateRoomToggle 121from pynicotine.slskmessages import PrivateRoomUsers 122from pynicotine.slskmessages import PrivilegedUsers 123from pynicotine.slskmessages import PublicRoomMessage 124from pynicotine.slskmessages import QueuedDownloads 125from pynicotine.slskmessages import UploadDenied 126from pynicotine.slskmessages import QueueUpload 127from pynicotine.slskmessages import Recommendations 128from pynicotine.slskmessages import RelatedSearch 129from pynicotine.slskmessages import Relogged 130from pynicotine.slskmessages import RemoveThingIHate 131from pynicotine.slskmessages import RemoveThingILike 132from pynicotine.slskmessages import RemoveUser 133from pynicotine.slskmessages import ResetDistributed 134from pynicotine.slskmessages import RoomAdded 135from pynicotine.slskmessages import RoomList 136from pynicotine.slskmessages import RoomRemoved 137from pynicotine.slskmessages import RoomSearch 138from pynicotine.slskmessages import RoomTickerAdd 139from pynicotine.slskmessages import RoomTickerRemove 140from pynicotine.slskmessages import RoomTickerSet 141from pynicotine.slskmessages import RoomTickerState 142from pynicotine.slskmessages import SayChatroom 143from pynicotine.slskmessages import SearchInactivityTimeout 144from pynicotine.slskmessages import SearchParent 145from pynicotine.slskmessages import SendConnectToken 146from pynicotine.slskmessages import SendDownloadSpeed 147from pynicotine.slskmessages import SendUploadSpeed 148from pynicotine.slskmessages import ServerMessage 149from pynicotine.slskmessages import ServerPing 150from pynicotine.slskmessages import SetCurrentConnectionCount 151from pynicotine.slskmessages import SetDownloadLimit 152from pynicotine.slskmessages import SetStatus 153from pynicotine.slskmessages import SetUploadLimit 154from pynicotine.slskmessages import SetWaitPort 155from pynicotine.slskmessages import SharedFileList 156from pynicotine.slskmessages import SharedFoldersFiles 157from pynicotine.slskmessages import SimilarUsers 158from pynicotine.slskmessages import TransferRequest 159from pynicotine.slskmessages import TransferResponse 160from pynicotine.slskmessages import TunneledMessage 161from pynicotine.slskmessages import UnknownPeerMessage 162from pynicotine.slskmessages import UploadFailed 163from pynicotine.slskmessages import UploadFile 164from pynicotine.slskmessages import UploadQueueNotification 165from pynicotine.slskmessages import UserInfoReply 166from pynicotine.slskmessages import UserInfoRequest 167from pynicotine.slskmessages import UserInterests 168from pynicotine.slskmessages import UserJoinedRoom 169from pynicotine.slskmessages import UserLeftRoom 170from pynicotine.slskmessages import UserPrivileged 171from pynicotine.slskmessages import UserSearch 172from pynicotine.slskmessages import WishlistInterval 173from pynicotine.slskmessages import WishlistSearch 174 175 176""" Set the maximum number of open files to the hard limit reported by the OS. 177Our MAXSOCKETS value needs to be lower than the file limit, otherwise our open 178sockets in combination with other file activity can exceed the file limit, 179effectively halting the program. """ 180 181if sys.platform == "win32": 182 183 """ For Windows, FD_SETSIZE is set to 512 in the Python source. 184 This limit is hardcoded, so we'll have to live with it for now. """ 185 186 MAXSOCKETS = 512 187else: 188 import resource # pylint: disable=import-error 189 190 if sys.platform == "darwin": 191 192 """ Maximum number of files a process can open is 10240 on macOS. 193 macOS reports INFINITE as hard limit, so we need this special case. """ 194 195 MAXFILELIMIT = 10240 196 else: 197 _SOFTLIMIT, MAXFILELIMIT = resource.getrlimit(resource.RLIMIT_NOFILE) 198 199 try: 200 resource.setrlimit(resource.RLIMIT_NOFILE, (MAXFILELIMIT, MAXFILELIMIT)) 201 202 except Exception as rlimit_error: 203 log.add("Failed to set RLIMIT_NOFILE: %s", rlimit_error) 204 205 """ Set the maximum number of open sockets to a lower value than the hard limit, 206 otherwise we just waste resources. 207 The maximum is 1024, but can be lower if the file limit is too low. """ 208 209 MAXSOCKETS = min(max(int(MAXFILELIMIT * 0.75), 50), 1024) 210 211UINT_UNPACK = struct.Struct("<I").unpack 212DOUBLE_UINT_UNPACK = struct.Struct("<II").unpack 213 214UINT_PACK = struct.Struct("<I").pack 215 216 217class Connection: 218 """ 219 Holds data about a connection. conn is a socket object, 220 addr is (ip, port) pair, ibuf and obuf are input and output msgBuffer, 221 init is a PeerInit object (see slskmessages docstrings). 222 """ 223 224 __slots__ = ("conn", "addr", "ibuf", "obuf", "events", "init", "lastactive", "lastreadlength") 225 226 def __init__(self, conn=None, addr=None, events=None): 227 self.conn = conn 228 self.addr = addr 229 self.events = events 230 self.ibuf = bytearray() 231 self.obuf = bytearray() 232 self.init = None 233 self.lastactive = time.time() 234 self.lastreadlength = 100 * 1024 235 236 237class PeerConnection(Connection): 238 239 __slots__ = ("filereq", "filedown", "fileupl", "filereadbytes", "bytestoread", "piercefw", 240 "lastcallback") 241 242 def __init__(self, conn=None, addr=None, events=None, init=None): 243 Connection.__init__(self, conn, addr, events) 244 self.filereq = None 245 self.filedown = None 246 self.fileupl = None 247 self.filereadbytes = 0 248 self.bytestoread = 0 249 self.init = init 250 self.piercefw = None 251 self.lastactive = time.time() 252 self.lastcallback = time.time() 253 254 255class PeerConnectionInProgress: 256 """ As all p2p connect()s are non-blocking, this class is used to 257 hold data about a connection that is not yet established. msgObj is 258 a message to be sent after the connection has been established. 259 """ 260 __slots__ = ("conn", "msg_obj", "lastactive") 261 262 def __init__(self, conn=None, msg_obj=None): 263 self.conn = conn 264 self.msg_obj = msg_obj 265 self.lastactive = time.time() 266 267 268class SlskProtoThread(threading.Thread): 269 """ This is a networking thread that actually does all the communication. 270 It sends data to the NicotineCore via a callback function and receives 271 data via a deque object. """ 272 273 """ Server and peers send each other small binary messages, that start 274 with length and message code followed by the actual message data. 275 These are the codes.""" 276 277 servercodes = { 278 Login: 1, 279 SetWaitPort: 2, 280 GetPeerAddress: 3, 281 AddUser: 5, 282 RemoveUser: 6, 283 GetUserStatus: 7, 284 SayChatroom: 13, 285 JoinRoom: 14, 286 LeaveRoom: 15, 287 UserJoinedRoom: 16, 288 UserLeftRoom: 17, 289 ConnectToPeer: 18, 290 MessageUser: 22, 291 MessageAcked: 23, 292 FileSearch: 26, 293 SetStatus: 28, 294 ServerPing: 32, # Deprecated 295 SendConnectToken: 33, # Obsolete 296 SendDownloadSpeed: 34, # Obsolete 297 SharedFoldersFiles: 35, 298 GetUserStats: 36, 299 QueuedDownloads: 40, # Obsolete 300 Relogged: 41, 301 UserSearch: 42, 302 AddThingILike: 51, # Deprecated 303 RemoveThingILike: 52, # Deprecated 304 Recommendations: 54, # Deprecated 305 GlobalRecommendations: 56, # Deprecated 306 UserInterests: 57, # Deprecated 307 AdminCommand: 58, # Obsolete 308 PlaceInLineResponse: 60, # Obsolete 309 RoomAdded: 62, # Obsolete 310 RoomRemoved: 63, # Obsolete 311 RoomList: 64, 312 ExactFileSearch: 65, # Obsolete 313 AdminMessage: 66, 314 GlobalUserList: 67, # Obsolete 315 TunneledMessage: 68, # Obsolete 316 PrivilegedUsers: 69, 317 HaveNoParent: 71, 318 SearchParent: 73, # Deprecated 319 ParentMinSpeed: 83, 320 ParentSpeedRatio: 84, 321 ParentInactivityTimeout: 86, # Obsolete 322 SearchInactivityTimeout: 87, # Obsolete 323 MinParentsInCache: 88, # Obsolete 324 DistribAliveInterval: 90, # Obsolete 325 AddToPrivileged: 91, # Obsolete 326 CheckPrivileges: 92, 327 EmbeddedMessage: 93, 328 AcceptChildren: 100, 329 PossibleParents: 102, 330 WishlistSearch: 103, 331 WishlistInterval: 104, 332 SimilarUsers: 110, # Deprecated 333 ItemRecommendations: 111, # Deprecated 334 ItemSimilarUsers: 112, # Deprecated 335 RoomTickerState: 113, 336 RoomTickerAdd: 114, 337 RoomTickerRemove: 115, 338 RoomTickerSet: 116, 339 AddThingIHate: 117, # Deprecated 340 RemoveThingIHate: 118, # Deprecated 341 RoomSearch: 120, 342 SendUploadSpeed: 121, 343 UserPrivileged: 122, # Deprecated 344 GivePrivileges: 123, 345 NotifyPrivileges: 124, # Deprecated 346 AckNotifyPrivileges: 125, # Deprecated 347 BranchLevel: 126, 348 BranchRoot: 127, 349 ChildDepth: 129, 350 ResetDistributed: 130, 351 PrivateRoomUsers: 133, 352 PrivateRoomAddUser: 134, 353 PrivateRoomRemoveUser: 135, 354 PrivateRoomDismember: 136, 355 PrivateRoomDisown: 137, 356 PrivateRoomSomething: 138, # Obsolete 357 PrivateRoomAdded: 139, 358 PrivateRoomRemoved: 140, 359 PrivateRoomToggle: 141, 360 ChangePassword: 142, 361 PrivateRoomAddOperator: 143, 362 PrivateRoomRemoveOperator: 144, 363 PrivateRoomOperatorAdded: 145, 364 PrivateRoomOperatorRemoved: 146, 365 PrivateRoomOwned: 148, 366 MessageUsers: 149, 367 JoinPublicRoom: 150, # Deprecated 368 LeavePublicRoom: 151, # Deprecated 369 PublicRoomMessage: 152, # Deprecated 370 RelatedSearch: 153, # Obsolete 371 CantConnectToPeer: 1001, 372 CantCreateRoom: 1003 373 } 374 375 peerinitcodes = { 376 PierceFireWall: 0, 377 PeerInit: 1 378 } 379 380 peercodes = { 381 GetSharedFileList: 4, 382 SharedFileList: 5, 383 FileSearchRequest: 8, # Obsolete 384 FileSearchResult: 9, 385 UserInfoRequest: 15, 386 UserInfoReply: 16, 387 PMessageUser: 22, # Deprecated 388 FolderContentsRequest: 36, 389 FolderContentsResponse: 37, 390 TransferRequest: 40, 391 TransferResponse: 41, 392 PlaceholdUpload: 42, # Obsolete 393 QueueUpload: 43, 394 PlaceInQueue: 44, 395 UploadFailed: 46, 396 UploadDenied: 50, 397 PlaceInQueueRequest: 51, 398 UploadQueueNotification: 52, # Deprecated 399 UnknownPeerMessage: 12547 400 } 401 402 distribcodes = { 403 DistribAlive: 0, 404 DistribSearch: 3, 405 DistribBranchLevel: 4, 406 DistribBranchRoot: 5, 407 DistribChildDepth: 7, # Deprecated 408 DistribEmbeddedMessage: 93 409 } 410 411 IN_PROGRESS_STALE_AFTER = 2 412 CONNECTION_MAX_IDLE = 60 413 CONNCOUNT_CALLBACK_INTERVAL = 0.5 414 415 def __init__(self, core_callback, queue, bindip, interface, port, port_range, network_filter, eventprocessor): 416 """ core_callback is a NicotineCore callback function to be called with messages 417 list as a parameter. queue is deque object that holds network messages from 418 NicotineCore. """ 419 420 threading.Thread.__init__(self) 421 422 self.name = "NetworkThread" 423 424 if sys.platform not in ("linux", "darwin"): 425 # TODO: support custom network interface for other systems than Linux and macOS 426 interface = None 427 428 self._core_callback = core_callback 429 self._queue = queue 430 self._callback_msgs = [] 431 self._want_abort = False 432 self.server_disconnected = True 433 self.bindip = bindip 434 self.listenport = None 435 self.portrange = (port, port) if port else port_range 436 self.interface = interface 437 self._network_filter = network_filter 438 self._eventprocessor = eventprocessor 439 440 self.serverclasses = {} 441 for code_class, code_id in self.servercodes.items(): 442 self.serverclasses[code_id] = code_class 443 444 self.peerinitclasses = {} 445 for code_class, code_id in self.peerinitcodes.items(): 446 self.peerinitclasses[code_id] = code_class 447 448 self.peerclasses = {} 449 for code_class, code_id in self.peercodes.items(): 450 self.peerclasses[code_id] = code_class 451 452 self.distribclasses = {} 453 for code_class, code_id in self.distribcodes.items(): 454 self.distribclasses[code_id] = code_class 455 456 # Select Networking Input and Output sockets 457 self.selector = selectors.DefaultSelector() 458 459 self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 460 self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 461 self.listen_socket.setblocking(0) 462 463 self.server_socket = None 464 self._numsockets = 1 465 466 self._conns = {} 467 self._connsinprogress = {} 468 self._calc_upload_limit_function = self._calc_upload_limit_none 469 self._upload_limit = 0 470 self._download_limit = 0 471 self._upload_limit_split = 0 472 self._download_limit_split = 0 473 self._ulimits = {} 474 self._dlimits = {} 475 self.total_uploads = 0 476 self.total_downloads = 0 477 self.last_conncount_callback = time.time() 478 self.last_cycle_time = time.time() 479 self.current_cycle_loop_count = 0 480 self.last_cycle_loop_count = 0 481 self.loops_per_second = 0 482 483 self.bind_listen_port() 484 485 self.daemon = True 486 self.start() 487 488 """ General """ 489 490 def validate_listen_port(self): 491 492 if self.listenport is not None: 493 return True 494 495 return False 496 497 def validate_network_interface(self): 498 499 try: 500 if self.interface and self.interface not in (name for _i, name in socket.if_nameindex()): 501 return False 502 503 except AttributeError: 504 pass 505 506 return True 507 508 @staticmethod 509 def get_interface_ip_address(if_name): 510 511 try: 512 import fcntl 513 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 514 515 ip_if = fcntl.ioctl(sock.fileno(), 516 0x8915, # SIOCGIFADDR 517 struct.pack('256s', if_name.encode()[:15])) 518 519 ip_address = socket.inet_ntoa(ip_if[20:24]) 520 521 except ImportError: 522 ip_address = None 523 524 return ip_address 525 526 def bind_to_network_interface(self, sock, if_name): 527 528 try: 529 if sys.platform == "linux": 530 sock.setsockopt(socket.SOL_SOCKET, 25, if_name.encode()) 531 self.bindip = None 532 return 533 534 if sys.platform == "darwin": 535 sock.setsockopt(socket.IPPROTO_IP, 25, socket.if_nametoindex(if_name)) 536 self.bindip = None 537 return 538 539 except PermissionError: 540 pass 541 542 # System does not support changing the network interface 543 # Retrieve the IP address of the interface, and bind to it instead 544 self.bindip = self.get_interface_ip_address(if_name) 545 546 def bind_listen_port(self): 547 548 if not self.validate_network_interface(): 549 return 550 551 if self.interface and not self.bindip: 552 self.bind_to_network_interface(self.listen_socket, self.interface) 553 554 ip_address = self.bindip or '' 555 556 for listenport in range(int(self.portrange[0]), int(self.portrange[1]) + 1): 557 try: 558 self.listen_socket.bind((ip_address, listenport)) 559 self.listen_socket.listen() 560 self.listenport = listenport 561 log.add(_("Listening on port: %i"), listenport) 562 log.add_debug("Maximum number of concurrent connections (sockets): %i", MAXSOCKETS) 563 break 564 565 except socket.error: 566 continue 567 568 def server_connect(self): 569 """ We've connected to the server """ 570 self.server_disconnected = False 571 572 def server_disconnect(self): 573 """ We've disconnected from the server, clean up """ 574 575 self.server_disconnected = True 576 self.server_socket = None 577 578 for connection in self._conns.copy(): 579 self.close_connection(self._conns, connection) 580 581 for connection in self._connsinprogress.copy(): 582 self.close_connection(self._connsinprogress, connection) 583 584 self._queue.clear() 585 586 if not self._want_abort: 587 self._callback_msgs.append(SetCurrentConnectionCount(0)) 588 589 def abort(self): 590 """ Call this to abort the thread """ 591 self._want_abort = True 592 593 self.selector.close() 594 self.server_disconnect() 595 596 """ File Transfers """ 597 598 @staticmethod 599 def _is_upload(conn): 600 return conn.__class__ is PeerConnection and conn.fileupl is not None 601 602 @staticmethod 603 def _is_download(conn): 604 return conn.__class__ is PeerConnection and conn.filedown is not None 605 606 def _calc_upload_limit(self, limit_disabled=False, limit_per_transfer=False): 607 608 limit = self._upload_limit 609 loop_limit = 1024 # 1 KB/s is the minimum upload speed per transfer 610 611 if limit_disabled or limit < loop_limit: 612 self._upload_limit_split = 0 613 return 614 615 if not limit_per_transfer and self.total_uploads > 1: 616 limit = limit // self.total_uploads 617 618 self._upload_limit_split = int(limit) 619 620 def _calc_upload_limit_by_transfer(self): 621 return self._calc_upload_limit(limit_per_transfer=True) 622 623 def _calc_upload_limit_none(self): 624 return self._calc_upload_limit(limit_disabled=True) 625 626 def _calc_download_limit(self): 627 628 limit = self._download_limit 629 loop_limit = 1024 # 1 KB/s is the minimum download speed per transfer 630 631 if limit < loop_limit: 632 # Download limit disabled 633 self._download_limit_split = 0 634 return 635 636 if self.total_downloads > 1: 637 limit = limit // self.total_downloads 638 639 self._download_limit_split = int(limit) 640 641 def _calc_loops_per_second(self): 642 """ Calculate number of loops per second. This value is used to split the 643 per-second transfer speed limit evenly for each loop. """ 644 645 curtime = time.time() 646 647 if curtime - self.last_cycle_time >= 1: 648 self.loops_per_second = (self.last_cycle_loop_count + self.current_cycle_loop_count) // 2 649 650 self.last_cycle_loop_count = self.current_cycle_loop_count 651 self.last_cycle_time = curtime 652 self.current_cycle_loop_count = 0 653 else: 654 self.current_cycle_loop_count = self.current_cycle_loop_count + 1 655 656 def set_conn_speed_limit(self, connection, limit, limits): 657 658 limit = limit // (self.loops_per_second or 1) 659 660 if limit > 0: 661 limits[connection] = limit 662 663 """ Connections """ 664 665 def socket_still_active(self, conn): 666 667 try: 668 connection = self._conns[conn] 669 670 except KeyError: 671 return False 672 673 return len(connection.obuf) > 0 or len(connection.ibuf) > 0 674 675 @staticmethod 676 def pack_network_message(msg_obj): 677 678 try: 679 return msg_obj.make_network_message() 680 681 except Exception: 682 from traceback import format_exc 683 log.add(("Unable to pack message type %(msg_type)s. %(error)s"), 684 {'msg_type': msg_obj.__class__, 'error': format_exc()}) 685 686 return None 687 688 @staticmethod 689 def unpack_network_message(msg_class, msg_buffer, msg_size, conn_type, conn=None): 690 691 try: 692 if conn is not None: 693 msg = msg_class(conn) 694 else: 695 msg = msg_class() 696 697 msg.parse_network_message(msg_buffer) 698 return msg 699 700 except Exception as error: 701 log.add(("Unable to parse %(conn_type)s message type %(msg_type)s size %(size)i " 702 "contents %(msg_buffer)s: %(error)s"), 703 {'conn_type': conn_type, 'msg_type': msg_class, 'size': msg_size, 704 'msg_buffer': msg_buffer, 'error': error}) 705 706 return None 707 708 def modify_connection_events(self, conn_obj, events): 709 710 if conn_obj.events != events: 711 self.selector.modify(conn_obj.conn, events) 712 conn_obj.events = events 713 714 def close_connection(self, connection_list, connection): 715 716 if connection not in connection_list: 717 # Already removed 718 return 719 720 conn_obj = connection_list[connection] 721 722 if self._is_download(conn_obj): 723 self.total_downloads -= 1 724 self._calc_download_limit() 725 726 elif self._is_upload(conn_obj): 727 self.total_uploads -= 1 728 self._calc_upload_limit_function() 729 730 # If we're shutting down, we've already closed the selector in abort() 731 if not self._want_abort: 732 self.selector.unregister(connection) 733 734 connection.close() 735 del connection_list[connection] 736 self._numsockets -= 1 737 738 if connection is self.server_socket: 739 # Disconnected from server, clean up connections and queue 740 self.server_disconnect() 741 742 def close_connection_by_ip(self, ip_address): 743 744 for connection in self._conns.copy(): 745 conn_obj = self._conns.get(connection) 746 747 if not conn_obj or connection is self.server_socket: 748 continue 749 750 addr = conn_obj.addr 751 752 if ip_address == addr[0]: 753 log.add_conn("Blocking peer connection to IP address %(ip)s:%(port)s", { 754 "ip": addr[0], 755 "port": addr[1] 756 }) 757 self._callback_msgs.append(ConnClose(connection, addr)) 758 self.close_connection(self._conns, connection) 759 760 """ Server Connection """ 761 762 @staticmethod 763 def set_server_socket_keepalive(server_socket, idle=10, interval=4, count=10): 764 """ Ensure we are disconnected from the server in case of connectivity issues, 765 by sending TCP keepalive pings. Assuming default values are used, once we reach 766 10 seconds of idle time, we start sending keepalive pings once every 4 seconds. 767 If 10 failed pings have been sent in a row (40 seconds), the connection is presumed 768 dead. """ 769 770 if hasattr(socket, 'SO_KEEPALIVE'): 771 server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # pylint: disable=maybe-no-member 772 773 if hasattr(socket, 'TCP_KEEPINTVL'): 774 server_socket.setsockopt(socket.IPPROTO_TCP, 775 socket.TCP_KEEPINTVL, interval) # pylint: disable=maybe-no-member 776 777 if hasattr(socket, 'TCP_KEEPCNT'): 778 server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count) # pylint: disable=maybe-no-member 779 780 if hasattr(socket, 'TCP_KEEPIDLE'): 781 server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle) # pylint: disable=maybe-no-member 782 783 elif hasattr(socket, 'TCP_KEEPALIVE'): 784 # macOS fallback 785 786 server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, idle) # pylint: disable=maybe-no-member 787 788 elif hasattr(socket, 'SIO_KEEPALIVE_VALS'): 789 """ Windows fallback 790 Probe count is set to 10 on a system level, and can't be modified. 791 https://docs.microsoft.com/en-us/windows/win32/winsock/so-keepalive """ 792 793 server_socket.ioctl( 794 socket.SIO_KEEPALIVE_VALS, # pylint: disable=maybe-no-member 795 ( 796 1, 797 idle * 1000, 798 interval * 1000 799 ) 800 ) 801 802 def init_server_conn(self, msg_obj): 803 804 try: 805 self.server_socket = server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 806 server_socket.setblocking(0) 807 808 # Detect if our connection to the server is still alive 809 self.set_server_socket_keepalive(server_socket) 810 811 if self.bindip: 812 server_socket.bind((self.bindip, 0)) 813 814 elif self.interface: 815 self.bind_to_network_interface(server_socket, self.interface) 816 817 server_socket.connect_ex(msg_obj.addr) 818 819 self.selector.register(server_socket, selectors.EVENT_READ | selectors.EVENT_WRITE) 820 self._connsinprogress[server_socket] = PeerConnectionInProgress(server_socket, msg_obj) 821 self._numsockets += 1 822 823 except socket.error as err: 824 self._callback_msgs.append(ConnectError(msg_obj, err)) 825 server_socket.close() 826 self.server_disconnect() 827 828 def process_server_input(self, conn, msg_buffer): 829 """ Server has sent us something, this function retrieves messages 830 from the msg_buffer, creates message objects and returns them and the rest 831 of the msg_buffer. 832 """ 833 834 msg_buffer_mem = memoryview(msg_buffer) 835 buffer_len = len(msg_buffer_mem) 836 idx = 0 837 838 # Server messages are 8 bytes or greater in length 839 while buffer_len >= 8: 840 msgsize, msgtype = DOUBLE_UINT_UNPACK(msg_buffer_mem[idx:idx + 8]) 841 msgsize_total = msgsize + 4 842 843 if msgsize_total > buffer_len or msgsize < 0: 844 # Invalid message size or buffer is being filled 845 break 846 847 # Unpack server messages 848 if msgtype in self.serverclasses: 849 msg = self.unpack_network_message( 850 self.serverclasses[msgtype], msg_buffer_mem[idx + 8:idx + msgsize_total], msgsize - 4, "server") 851 852 if msg is not None: 853 self._callback_msgs.append(msg) 854 855 else: 856 log.add("Server message type %(type)i size %(size)i contents %(msg_buffer)s unknown", 857 {'type': msgtype, 'size': msgsize - 4, 'msg_buffer': msg_buffer[idx + 8:idx + msgsize_total]}) 858 859 idx += msgsize_total 860 buffer_len -= msgsize_total 861 862 conn.ibuf = msg_buffer[idx:] 863 864 def process_server_output(self, msg_obj): 865 866 if self.server_socket not in self._conns: 867 log.add_conn("Can't send the message over the closed connection: %(type)s %(msg_obj)s", { 868 'type': msg_obj.__class__, 869 'msg_obj': vars(msg_obj) 870 }) 871 return 872 873 msg = self.pack_network_message(msg_obj) 874 875 if msg is None: 876 return 877 878 conn_obj = self._conns[self.server_socket] 879 conn_obj.obuf.extend(UINT_PACK(len(msg) + 4)) 880 conn_obj.obuf.extend(UINT_PACK(self.servercodes[msg_obj.__class__])) 881 conn_obj.obuf.extend(msg) 882 883 self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE) 884 885 """ Peer Init """ 886 887 def process_peer_init_input(self, conn, msg_buffer): 888 889 msg_buffer_mem = memoryview(msg_buffer) 890 buffer_len = len(msg_buffer_mem) 891 idx = 0 892 893 # Peer init messages are 8 bytes or greater in length 894 while buffer_len >= 8 and conn.init is None: 895 msgsize = UINT_UNPACK(msg_buffer_mem[idx:idx + 4])[0] 896 msgsize_total = msgsize + 4 897 898 if msgsize_total > buffer_len or msgsize < 0: 899 # Invalid message size or buffer is being filled 900 break 901 902 msgtype = msg_buffer_mem[idx + 4] 903 904 # Unpack peer init messages 905 if msgtype in self.peerinitclasses: 906 msg = self.unpack_network_message( 907 self.peerinitclasses[msgtype], msg_buffer_mem[idx + 5:idx + msgsize_total], msgsize - 1, 908 "peer init", conn) 909 910 if msg is not None: 911 if self.peerinitclasses[msgtype] is PierceFireWall: 912 conn.piercefw = msg 913 914 elif self.peerinitclasses[msgtype] is PeerInit: 915 conn.init = msg 916 917 self._callback_msgs.append(msg) 918 919 else: 920 if conn.piercefw is None: 921 log.add("Peer init message type %(type)i size %(size)i contents %(msg_buffer)s unknown", 922 {'type': msgtype, 'size': msgsize - 1, 923 'msg_buffer': msg_buffer[idx + 5:idx + msgsize_total]}) 924 925 self._callback_msgs.append(ConnClose(conn.conn, conn.addr)) 926 self.close_connection(self._conns, conn) 927 928 break 929 930 idx += msgsize_total 931 buffer_len -= msgsize_total 932 933 conn.ibuf = msg_buffer[idx:] 934 935 def process_peer_init_output(self, msg_obj): 936 937 if msg_obj.conn not in self._conns: 938 log.add_conn("Can't send the message over the closed connection: %(type)s %(msg_obj)s", { 939 'type': msg_obj.__class__, 940 'msg_obj': vars(msg_obj) 941 }) 942 return 943 944 # Pack peer init messages 945 if msg_obj.__class__ is PierceFireWall: 946 conn_obj = self._conns[msg_obj.conn] 947 msg = self.pack_network_message(msg_obj) 948 949 if msg is None: 950 return 951 952 conn_obj.piercefw = msg_obj 953 954 conn_obj.obuf.extend(UINT_PACK(len(msg) + 1)) 955 conn_obj.obuf.extend(bytes([self.peerinitcodes[msg_obj.__class__]])) 956 conn_obj.obuf.extend(msg) 957 958 elif msg_obj.__class__ is PeerInit: 959 conn_obj = self._conns[msg_obj.conn] 960 msg = self.pack_network_message(msg_obj) 961 962 if msg is None: 963 return 964 965 conn_obj.init = msg_obj 966 967 if conn_obj.piercefw is not None: 968 return 969 970 conn_obj.obuf.extend(UINT_PACK(len(msg) + 1)) 971 conn_obj.obuf.extend(bytes([self.peerinitcodes[msg_obj.__class__]])) 972 conn_obj.obuf.extend(msg) 973 974 self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE) 975 976 """ Peer Connection """ 977 978 def init_peer_conn(self, msg_obj): 979 980 try: 981 conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 982 conn.setblocking(0) 983 984 if self.bindip: 985 conn.bind((self.bindip, 0)) 986 987 elif self.interface: 988 self.bind_to_network_interface(conn, self.interface) 989 990 conn.connect_ex(msg_obj.addr) 991 992 self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE) 993 self._connsinprogress[conn] = PeerConnectionInProgress(conn, msg_obj) 994 self._numsockets += 1 995 996 except socket.error as err: 997 self._callback_msgs.append(ConnectError(msg_obj, err)) 998 conn.close() 999 1000 def process_peer_input(self, conn, msg_buffer): 1001 """ We have a "P" connection (p2p exchange), peer has sent us 1002 something, this function retrieves messages 1003 from the msg_buffer, creates message objects and returns them 1004 and the rest of the msg_buffer. 1005 """ 1006 1007 msg_buffer_mem = memoryview(msg_buffer) 1008 buffer_len = len(msg_buffer_mem) 1009 idx = 0 1010 search_result_received = False 1011 1012 # Peer messages are 8 bytes or greater in length 1013 while buffer_len >= 8: 1014 msgsize, msgtype = DOUBLE_UINT_UNPACK(msg_buffer_mem[idx:idx + 8]) 1015 msgsize_total = msgsize + 4 1016 1017 try: 1018 peer_class = self.peerclasses[msgtype] 1019 1020 if peer_class in (SharedFileList, UserInfoReply): 1021 # Send progress to the main thread 1022 self._callback_msgs.append( 1023 MessageProgress(conn.init.target_user, peer_class, buffer_len, msgsize_total)) 1024 1025 except KeyError: 1026 pass 1027 1028 if msgsize_total > buffer_len or msgsize < 0: 1029 # Invalid message size or buffer is being filled 1030 break 1031 1032 # Unpack peer messages 1033 if msgtype in self.peerclasses: 1034 msg = self.unpack_network_message( 1035 self.peerclasses[msgtype], msg_buffer_mem[idx + 8:idx + msgsize_total], msgsize - 4, "peer", conn) 1036 1037 if msg.__class__ is FileSearchResult: 1038 search_result_received = True 1039 1040 if msg is not None: 1041 self._callback_msgs.append(msg) 1042 1043 else: 1044 host = port = "unknown" 1045 1046 if conn.init.conn is not None and conn.addr is not None: 1047 host = conn.addr[0] 1048 port = conn.addr[1] 1049 1050 log.add(("Peer message type %(type)s size %(size)i contents %(msg_buffer)s unknown, " 1051 "from user: %(user)s, %(host)s:%(port)s"), 1052 {'type': msgtype, 'size': msgsize - 4, 'msg_buffer': msg_buffer[idx + 8:idx + msgsize_total], 1053 'user': conn.init.target_user, 'host': host, 'port': port}) 1054 1055 idx += msgsize_total 1056 buffer_len -= msgsize_total 1057 1058 conn.ibuf = msg_buffer[idx:] 1059 1060 if search_result_received and not self.socket_still_active(conn.conn): 1061 # Forcibly close peer connection. Only used after receiving a search result, 1062 # as we need to get rid of peer connections before they pile up. 1063 1064 self._callback_msgs.append(ConnClose(conn.conn, conn.addr)) 1065 self.close_connection(self._conns, conn.conn) 1066 1067 def process_peer_output(self, msg_obj): 1068 1069 if msg_obj.conn not in self._conns: 1070 log.add_conn("Can't send the message over the closed connection: %(type)s %(msg_obj)s", { 1071 'type': msg_obj.__class__, 1072 'msg_obj': vars(msg_obj) 1073 }) 1074 return 1075 1076 # Pack peer messages 1077 msg = self.pack_network_message(msg_obj) 1078 1079 if msg is None: 1080 return 1081 1082 conn_obj = self._conns[msg_obj.conn] 1083 conn_obj.obuf.extend(UINT_PACK(len(msg) + 4)) 1084 conn_obj.obuf.extend(UINT_PACK(self.peercodes[msg_obj.__class__])) 1085 conn_obj.obuf.extend(msg) 1086 1087 self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE) 1088 1089 """ File Connection """ 1090 1091 def process_file_input(self, conn, msg_buffer): 1092 """ We have a "F" connection (filetransfer), peer has sent us 1093 something, this function retrieves messages 1094 from the msg_buffer, creates message objects and returns them 1095 and the rest of the msg_buffer. 1096 """ 1097 1098 if conn.filereq is None: 1099 msgsize = 4 1100 msg = self.unpack_network_message(FileRequest, msg_buffer[:msgsize], msgsize, "file", conn.conn) 1101 1102 if msg is not None and msg.req is not None: 1103 self._callback_msgs.append(msg) 1104 conn.filereq = msg 1105 1106 msg_buffer = msg_buffer[msgsize:] 1107 1108 elif conn.filedown is not None: 1109 leftbytes = conn.bytestoread - conn.filereadbytes 1110 addedbytes = msg_buffer[:leftbytes] 1111 1112 if leftbytes > 0: 1113 try: 1114 conn.filedown.file.write(addedbytes) 1115 1116 except IOError as strerror: 1117 self._callback_msgs.append(FileError(conn, conn.filedown.file, strerror)) 1118 self._callback_msgs.append(ConnClose(conn.conn, conn.addr)) 1119 self.close_connection(self._conns, conn.conn) 1120 1121 except ValueError: 1122 pass 1123 1124 addedbyteslen = len(addedbytes) 1125 curtime = time.time() 1126 finished = ((leftbytes - addedbyteslen) == 0) 1127 1128 if finished or (curtime - conn.lastcallback) > 1: 1129 # We save resources by not sending data back to the NicotineCore 1130 # every time a part of a file is downloaded 1131 1132 self._callback_msgs.append(DownloadFile(conn.conn, conn.filedown.file)) 1133 conn.lastcallback = curtime 1134 1135 if finished: 1136 self._callback_msgs.append(ConnClose(conn.conn, conn.addr)) 1137 self.close_connection(self._conns, conn.conn) 1138 1139 conn.filereadbytes += addedbyteslen 1140 msg_buffer = msg_buffer[leftbytes:] 1141 1142 elif conn.fileupl is not None and conn.fileupl.offset is None: 1143 msgsize = 8 1144 msg = self.unpack_network_message(FileOffset, msg_buffer[:msgsize], msgsize, "file", conn) 1145 1146 if msg is not None and msg.offset is not None: 1147 try: 1148 conn.fileupl.file.seek(msg.offset) 1149 self.modify_connection_events(conn, selectors.EVENT_READ | selectors.EVENT_WRITE) 1150 1151 except IOError as strerror: 1152 self._callback_msgs.append(FileError(conn, conn.fileupl.file, strerror)) 1153 self._callback_msgs.append(ConnClose(conn.conn, conn.addr)) 1154 self.close_connection(self._conns, conn.conn) 1155 1156 except ValueError: 1157 pass 1158 1159 conn.fileupl.offset = msg.offset 1160 self._callback_msgs.append(conn.fileupl) 1161 1162 msg_buffer = msg_buffer[msgsize:] 1163 1164 conn.ibuf = msg_buffer 1165 1166 def process_file_output(self, msg_obj): 1167 1168 if msg_obj.conn not in self._conns: 1169 log.add_conn("Can't send the message over the closed connection: %(type)s %(msg_obj)s", { 1170 'type': msg_obj.__class__, 1171 'msg_obj': vars(msg_obj) 1172 }) 1173 return 1174 1175 # Pack file messages 1176 if msg_obj.__class__ is FileRequest: 1177 msg = self.pack_network_message(msg_obj) 1178 1179 if msg is None: 1180 return 1181 1182 conn_obj = self._conns[msg_obj.conn] 1183 conn_obj.filereq = msg_obj 1184 conn_obj.obuf.extend(msg) 1185 1186 self._callback_msgs.append(msg_obj) 1187 1188 elif msg_obj.__class__ is FileOffset: 1189 msg = self.pack_network_message(msg_obj) 1190 1191 if msg is None: 1192 return 1193 1194 conn_obj = self._conns[msg_obj.conn] 1195 conn_obj.bytestoread = msg_obj.filesize - msg_obj.offset 1196 conn_obj.obuf.extend(msg) 1197 1198 self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE) 1199 1200 """ Distributed Connection """ 1201 1202 def process_distrib_input(self, conn, msg_buffer): 1203 """ We have a distributed network connection, parent has sent us 1204 something, this function retrieves messages 1205 from the msg_buffer, creates message objects and returns them 1206 and the rest of the msg_buffer. 1207 """ 1208 1209 msg_buffer_mem = memoryview(msg_buffer) 1210 buffer_len = len(msg_buffer_mem) 1211 idx = 0 1212 1213 # Distributed messages are 5 bytes or greater in length 1214 while buffer_len >= 5: 1215 msgsize = UINT_UNPACK(msg_buffer_mem[idx:idx + 4])[0] 1216 msgsize_total = msgsize + 4 1217 1218 if msgsize_total > buffer_len or msgsize < 0: 1219 # Invalid message size or buffer is being filled 1220 break 1221 1222 msgtype = msg_buffer_mem[idx + 4] 1223 1224 # Unpack distributed messages 1225 if msgtype in self.distribclasses: 1226 msg = self.unpack_network_message( 1227 self.distribclasses[msgtype], msg_buffer_mem[idx + 5:idx + msgsize_total], msgsize - 1, 1228 "distrib", conn) 1229 1230 if msg is not None: 1231 self._callback_msgs.append(msg) 1232 1233 else: 1234 log.add("Distrib message type %(type)i size %(size)i contents %(msg_buffer)s unknown", 1235 {'type': msgtype, 'size': msgsize - 1, 'msg_buffer': msg_buffer[idx + 5:idx + msgsize_total]}) 1236 self._callback_msgs.append(ConnClose(conn.conn, conn.addr)) 1237 self.close_connection(self._conns, conn) 1238 break 1239 1240 idx += msgsize_total 1241 buffer_len -= msgsize_total 1242 1243 conn.ibuf = msg_buffer[idx:] 1244 1245 def process_distrib_output(self, msg_obj): 1246 1247 if msg_obj.conn not in self._conns: 1248 log.add_conn("Can't send the message over the closed connection: %(type)s %(msg_obj)s", { 1249 'type': msg_obj.__class__, 1250 'msg_obj': vars(msg_obj) 1251 }) 1252 return 1253 1254 # Pack distributed messages 1255 msg = self.pack_network_message(msg_obj) 1256 1257 if msg is None: 1258 return 1259 1260 conn_obj = self._conns[msg_obj.conn] 1261 conn_obj.obuf.extend(UINT_PACK(len(msg) + 1)) 1262 conn_obj.obuf.extend(bytes([self.distribcodes[msg_obj.__class__]])) 1263 conn_obj.obuf.extend(msg) 1264 1265 self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE) 1266 1267 """ Connection I/O """ 1268 1269 def process_conn_input(self, connection, conn_obj): 1270 1271 if connection is self.server_socket: 1272 self.process_server_input(conn_obj, conn_obj.ibuf) 1273 1274 elif conn_obj.init is None: 1275 self.process_peer_init_input(conn_obj, conn_obj.ibuf) 1276 1277 elif conn_obj.init is not None and conn_obj.init.conn_type == 'P': 1278 self.process_peer_input(conn_obj, conn_obj.ibuf) 1279 1280 elif conn_obj.init is not None and conn_obj.init.conn_type == 'F': 1281 self.process_file_input(conn_obj, conn_obj.ibuf) 1282 1283 elif conn_obj.init is not None and conn_obj.init.conn_type == 'D': 1284 self.process_distrib_input(conn_obj, conn_obj.ibuf) 1285 1286 else: 1287 # Unknown message type 1288 log.add("Can't handle connection type %s", conn_obj.init.conn_type) 1289 1290 def process_conn_output(self): 1291 """ Processes messages sent by the main thread. queue holds the messages, 1292 conns and connsinprogress are dictionaries holding Connection and 1293 PeerConnectionInProgress messages. """ 1294 1295 msg_list = self._queue.copy() 1296 self._queue.clear() 1297 1298 for msg_obj in msg_list: 1299 if self.server_disconnected: 1300 # Disconnected from server, stop processing queue 1301 return 1302 1303 msg_class = msg_obj.__class__ 1304 1305 if issubclass(msg_class, PeerInitMessage): 1306 self.process_peer_init_output(msg_obj) 1307 1308 elif issubclass(msg_class, PeerMessage): 1309 self.process_peer_output(msg_obj) 1310 1311 elif msg_class is InitPeerConn: 1312 if self._numsockets < MAXSOCKETS: 1313 self.init_peer_conn(msg_obj) 1314 else: 1315 # Connection limit reached, re-queue 1316 self._queue.append(msg_obj) 1317 1318 elif issubclass(msg_class, DistribMessage): 1319 self.process_distrib_output(msg_obj) 1320 1321 elif issubclass(msg_class, FileMessage): 1322 self.process_file_output(msg_obj) 1323 1324 elif issubclass(msg_class, ServerMessage): 1325 self.process_server_output(msg_obj) 1326 1327 elif msg_class is ConnClose and msg_obj.conn in self._conns: 1328 conn = msg_obj.conn 1329 1330 self._callback_msgs.append(ConnClose(conn, self._conns[conn].addr)) 1331 self.close_connection(self._conns, conn) 1332 1333 elif msg_class is ConnCloseIP: 1334 self.close_connection_by_ip(msg_obj.addr) 1335 1336 elif msg_class is InitServerConn: 1337 if self._numsockets < MAXSOCKETS: 1338 self.init_server_conn(msg_obj) 1339 1340 elif msg_class is DownloadFile and msg_obj.conn in self._conns: 1341 self._conns[msg_obj.conn].filedown = msg_obj 1342 self.total_downloads += 1 1343 self._calc_download_limit() 1344 1345 elif msg_class is UploadFile and msg_obj.conn in self._conns: 1346 self._conns[msg_obj.conn].fileupl = msg_obj 1347 self.total_uploads += 1 1348 self._calc_upload_limit_function() 1349 1350 elif msg_class is SetDownloadLimit: 1351 self._download_limit = msg_obj.limit * 1024 1352 self._calc_download_limit() 1353 1354 elif msg_class is SetUploadLimit: 1355 if msg_obj.uselimit: 1356 if msg_obj.limitby: 1357 self._calc_upload_limit_function = self._calc_upload_limit 1358 else: 1359 self._calc_upload_limit_function = self._calc_upload_limit_by_transfer 1360 1361 else: 1362 self._calc_upload_limit_function = self._calc_upload_limit_none 1363 1364 self._upload_limit = msg_obj.limit * 1024 1365 self._calc_upload_limit_function() 1366 1367 def read_data(self, conn_obj): 1368 1369 connection = conn_obj.conn 1370 1371 # Check for a download limit 1372 if connection in self._dlimits: 1373 limit = self._dlimits[connection] 1374 else: 1375 limit = None 1376 1377 conn_obj.lastactive = time.time() 1378 data = connection.recv(conn_obj.lastreadlength) 1379 conn_obj.ibuf.extend(data) 1380 1381 if limit is None: 1382 # Unlimited download data 1383 if len(data) >= conn_obj.lastreadlength // 2: 1384 conn_obj.lastreadlength = conn_obj.lastreadlength * 2 1385 1386 else: 1387 # Speed Limited Download data (transfers) 1388 conn_obj.lastreadlength = limit 1389 1390 if not data: 1391 return False 1392 1393 return True 1394 1395 def write_data(self, conn_obj): 1396 1397 connection = conn_obj.conn 1398 1399 if connection in self._ulimits: 1400 limit = self._ulimits[connection] 1401 else: 1402 limit = None 1403 1404 conn_obj.lastactive = time.time() 1405 1406 if conn_obj.obuf: 1407 if limit is None: 1408 bytes_send = connection.send(conn_obj.obuf) 1409 else: 1410 bytes_send = connection.send(conn_obj.obuf[:limit]) 1411 1412 conn_obj.obuf = conn_obj.obuf[bytes_send:] 1413 else: 1414 bytes_send = 0 1415 1416 if connection is self.server_socket: 1417 return 1418 1419 if conn_obj.fileupl is not None and conn_obj.fileupl.offset is not None: 1420 conn_obj.fileupl.sentbytes += bytes_send 1421 1422 totalsentbytes = conn_obj.fileupl.offset + conn_obj.fileupl.sentbytes + len(conn_obj.obuf) 1423 1424 try: 1425 size = conn_obj.fileupl.size 1426 1427 if totalsentbytes < size: 1428 bytestoread = bytes_send * 2 - len(conn_obj.obuf) + 10 * 4024 1429 1430 if bytestoread > 0: 1431 read = conn_obj.fileupl.file.read(bytestoread) 1432 conn_obj.obuf.extend(read) 1433 1434 self.modify_connection_events(conn_obj, selectors.EVENT_READ | selectors.EVENT_WRITE) 1435 1436 except IOError as strerror: 1437 self._callback_msgs.append(FileError(conn_obj, conn_obj.fileupl.file, strerror)) 1438 self._callback_msgs.append(ConnClose(connection, conn_obj.addr)) 1439 self.close_connection(self._conns, connection) 1440 1441 except ValueError: 1442 pass 1443 1444 if bytes_send <= 0: 1445 return 1446 1447 curtime = time.time() 1448 finished = (conn_obj.fileupl.offset + conn_obj.fileupl.sentbytes == size) 1449 1450 if finished or (curtime - conn_obj.lastcallback) > 1: 1451 # We save resources by not sending data back to the NicotineCore 1452 # every time a part of a file is uploaded 1453 1454 self._callback_msgs.append(conn_obj.fileupl) 1455 conn_obj.lastcallback = curtime 1456 1457 if not conn_obj.obuf: 1458 # Nothing else to send, stop watching connection for writes 1459 self.modify_connection_events(conn_obj, selectors.EVENT_READ) 1460 1461 """ Networking Loop """ 1462 1463 def run(self): 1464 1465 # Listen socket needs to be registered for selection here instead of __init__, 1466 # otherwise connections break on certain systems (OpenBSD confirmed) 1467 self.selector.register(self.listen_socket, selectors.EVENT_READ) 1468 1469 while not self._want_abort: 1470 1471 if self.server_disconnected: 1472 # We're not connected to the server at the moment 1473 time.sleep(0.1) 1474 continue 1475 1476 curtime = time.time() 1477 1478 # Send updated connection count to NicotineCore. Avoid sending too many 1479 # updates at once, if there are a lot of connections. 1480 if (curtime - self.last_conncount_callback) > self.CONNCOUNT_CALLBACK_INTERVAL: 1481 self._callback_msgs.append(SetCurrentConnectionCount(self._numsockets)) 1482 self.last_conncount_callback = curtime 1483 1484 # Process outgoing messages 1485 if self._queue: 1486 self.process_conn_output() 1487 1488 # Check which connections are ready to send/receive data 1489 try: 1490 key_events = self.selector.select(timeout=-1) 1491 input_list = set(key.fileobj for key, event in key_events if event & selectors.EVENT_READ) 1492 output_list = set(key.fileobj for key, event in key_events if event & selectors.EVENT_WRITE) 1493 1494 except OSError as error: 1495 # Error recieved; terminate networking loop 1496 1497 log.add("Major Socket Error: Networking terminated! %s", error) 1498 self._want_abort = True 1499 1500 except ValueError as error: 1501 # Possibly opened too many sockets 1502 1503 log.add("select ValueError: %s", error) 1504 time.sleep(0.1) 1505 1506 self._callback_msgs.clear() 1507 continue 1508 1509 # Manage incoming connections to listen socket 1510 if self._numsockets < MAXSOCKETS and not self.server_disconnected and self.listen_socket in input_list: 1511 try: 1512 incconn, incaddr = self.listen_socket.accept() 1513 except Exception: 1514 time.sleep(0.01) 1515 else: 1516 if self._network_filter.is_ip_blocked(incaddr[0]): 1517 log.add_conn("Ignoring connection request from blocked IP address %(ip)s:%(port)s", { 1518 'ip': incaddr[0], 1519 'port': incaddr[1] 1520 }) 1521 incconn.close() 1522 1523 else: 1524 events = selectors.EVENT_READ 1525 incconn.setblocking(0) 1526 1527 self._conns[incconn] = PeerConnection(conn=incconn, addr=incaddr, events=events) 1528 self._numsockets += 1 1529 self._callback_msgs.append(IncConn(incconn, incaddr)) 1530 1531 # Event flags are modified to include 'write' in subsequent loops, if necessary. 1532 # Don't do it here, otherwise connections may break. 1533 self.selector.register(incconn, events) 1534 1535 # Manage outgoing connections in progress 1536 for connection_in_progress in self._connsinprogress.copy(): 1537 try: 1538 conn_obj = self._connsinprogress[connection_in_progress] 1539 1540 except KeyError: 1541 # Connection was removed, possibly disconnecting from the server 1542 continue 1543 1544 msg_obj = conn_obj.msg_obj 1545 1546 if (curtime - conn_obj.lastactive) > self.IN_PROGRESS_STALE_AFTER: 1547 # Connection failed 1548 1549 self._callback_msgs.append(ConnectError(msg_obj, "Timed out")) 1550 self.close_connection(self._connsinprogress, connection_in_progress) 1551 continue 1552 1553 try: 1554 if connection_in_progress in input_list: 1555 # Check if the socket has any data for us 1556 connection_in_progress.recv(1, socket.MSG_PEEK) 1557 1558 except socket.error as err: 1559 self._callback_msgs.append(ConnectError(msg_obj, err)) 1560 self.close_connection(self._connsinprogress, connection_in_progress) 1561 1562 else: 1563 if connection_in_progress in output_list: 1564 # Connection has been established 1565 1566 addr = msg_obj.addr 1567 events = selectors.EVENT_READ | selectors.EVENT_WRITE 1568 1569 if connection_in_progress is self.server_socket: 1570 self._conns[self.server_socket] = Connection( 1571 conn=self.server_socket, addr=addr, events=events) 1572 1573 self._callback_msgs.append(InitServerConn(self.server_socket, addr)) 1574 1575 else: 1576 if self._network_filter.is_ip_blocked(addr[0]): 1577 log.add_conn("Ignoring connection request from blocked IP address %(ip)s:%(port)s", { 1578 "ip": addr[0], 1579 "port": addr[1] 1580 }) 1581 self._callback_msgs.append(ConnectError(msg_obj, "Blocked IP address")) 1582 self.close_connection(self._connsinprogress, connection_in_progress) 1583 continue 1584 1585 self._conns[connection_in_progress] = PeerConnection( 1586 conn=connection_in_progress, addr=addr, events=events, init=msg_obj.init) 1587 1588 self._callback_msgs.append(InitPeerConn(connection_in_progress, addr)) 1589 1590 del self._connsinprogress[connection_in_progress] 1591 1592 # Process read/write for active connections 1593 for connection in self._conns.copy(): 1594 try: 1595 conn_obj = self._conns[connection] 1596 1597 except KeyError: 1598 # Connection was removed, possibly disconnecting from the server 1599 continue 1600 1601 if (connection is not self.server_socket 1602 and (curtime - conn_obj.lastactive) > self.CONNECTION_MAX_IDLE): 1603 # No recent activity, peer connection is stale 1604 1605 self._callback_msgs.append(ConnClose(connection, conn_obj.addr)) 1606 self.close_connection(self._conns, connection) 1607 continue 1608 1609 if connection in input_list: 1610 if self._is_download(conn_obj): 1611 self.set_conn_speed_limit(connection, self._download_limit_split, self._dlimits) 1612 1613 try: 1614 if not self.read_data(conn_obj): 1615 # No data received, socket was likely closed remotely 1616 self._callback_msgs.append(ConnClose(connection, conn_obj.addr)) 1617 self.close_connection(self._conns, connection) 1618 continue 1619 1620 except socket.error as err: 1621 log.add_conn(("Cannot read data from connection %(addr)s, closing connection. " 1622 "Error: %(error)s"), { 1623 "addr": conn_obj.addr, 1624 "error": err 1625 }) 1626 self._callback_msgs.append(ConnClose(connection, conn_obj.addr)) 1627 self.close_connection(self._conns, connection) 1628 continue 1629 1630 if conn_obj.ibuf: 1631 self.process_conn_input(connection, conn_obj) 1632 1633 if connection in output_list: 1634 if self._is_upload(conn_obj): 1635 self.set_conn_speed_limit(connection, self._upload_limit_split, self._ulimits) 1636 1637 try: 1638 self.write_data(conn_obj) 1639 1640 except Exception as err: 1641 log.add_conn("Cannot write data to connection %(addr)s, closing connection. Error: %(error)s", { 1642 "addr": conn_obj.addr, 1643 "error": err 1644 }) 1645 self._callback_msgs.append(ConnClose(connection, conn_obj.addr)) 1646 self.close_connection(self._conns, connection) 1647 continue 1648 1649 # Inform the main thread 1650 if self._callback_msgs: 1651 self._core_callback(list(self._callback_msgs)) 1652 self._callback_msgs.clear() 1653 1654 # Reset transfer speed limits 1655 self._ulimits = {} 1656 self._dlimits = {} 1657 1658 self._calc_loops_per_second() 1659 1660 # Don't exhaust the CPU 1661 time.sleep(1 / 60) 1662 1663 # Networking thread aborted 1664