1 /* 2 * Copyright (c) 2013-2021, The PurpleI2P Project 3 * 4 * This file is part of Purple i2pd project and licensed under BSD3 5 * 6 * See full license text in LICENSE file at top of project tree 7 */ 8 9 #include <string.h> 10 #include "Crypto.h" 11 #include "Log.h" 12 #include "TunnelBase.h" 13 #include "RouterContext.h" 14 #include "Destination.h" 15 #include "Datagram.h" 16 17 namespace i2p 18 { 19 namespace datagram 20 { DatagramDestination(std::shared_ptr<i2p::client::ClientDestination> owner,bool gzip)21 DatagramDestination::DatagramDestination (std::shared_ptr<i2p::client::ClientDestination> owner, bool gzip): 22 m_Owner (owner), m_Receiver (nullptr), m_RawReceiver (nullptr), m_Gzip (gzip) 23 { 24 if (m_Gzip) 25 m_Deflator.reset (new i2p::data::GzipDeflator); 26 27 auto identityLen = m_Owner->GetIdentity ()->GetFullLen (); 28 m_From.resize (identityLen); 29 m_Owner->GetIdentity ()->ToBuffer (m_From.data (), identityLen); 30 m_Signature.resize (m_Owner->GetIdentity ()->GetSignatureLen ()); 31 } 32 ~DatagramDestination()33 DatagramDestination::~DatagramDestination () 34 { 35 m_Sessions.clear(); 36 } 37 SendDatagramTo(const uint8_t * payload,size_t len,const i2p::data::IdentHash & identity,uint16_t fromPort,uint16_t toPort)38 void DatagramDestination::SendDatagramTo(const uint8_t * payload, size_t len, const i2p::data::IdentHash & identity, uint16_t fromPort, uint16_t toPort) 39 { 40 auto session = ObtainSession(identity); 41 SendDatagram (session, payload, len, fromPort, toPort); 42 FlushSendQueue (session); 43 } 44 SendRawDatagramTo(const uint8_t * payload,size_t len,const i2p::data::IdentHash & identity,uint16_t fromPort,uint16_t toPort)45 void DatagramDestination::SendRawDatagramTo(const uint8_t * payload, size_t len, const i2p::data::IdentHash & identity, uint16_t fromPort, uint16_t toPort) 46 { 47 auto session = ObtainSession(identity); 48 SendRawDatagram (session, payload, len, fromPort, toPort); 49 FlushSendQueue (session); 50 } 51 GetSession(const i2p::data::IdentHash & ident)52 std::shared_ptr<DatagramSession> DatagramDestination::GetSession(const i2p::data::IdentHash & ident) 53 { 54 return ObtainSession(ident); 55 } 56 SendDatagram(std::shared_ptr<DatagramSession> session,const uint8_t * payload,size_t len,uint16_t fromPort,uint16_t toPort)57 void DatagramDestination::SendDatagram (std::shared_ptr<DatagramSession> session, const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort) 58 { 59 if (session) 60 { 61 if (m_Owner->GetIdentity ()->GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1) 62 { 63 uint8_t hash[32]; 64 SHA256(payload, len, hash); 65 m_Owner->Sign (hash, 32, m_Signature.data ()); 66 } 67 else 68 m_Owner->Sign (payload, len, m_Signature.data ()); 69 70 auto msg = CreateDataMessage ({{m_From.data (), m_From.size ()}, {m_Signature.data (), m_Signature.size ()}, {payload, len}}, 71 fromPort, toPort, false, !session->IsRatchets ()); // datagram 72 session->SendMsg(msg); 73 } 74 } 75 SendRawDatagram(std::shared_ptr<DatagramSession> session,const uint8_t * payload,size_t len,uint16_t fromPort,uint16_t toPort)76 void DatagramDestination::SendRawDatagram (std::shared_ptr<DatagramSession> session, const uint8_t * payload, size_t len, uint16_t fromPort, uint16_t toPort) 77 { 78 if (session) 79 session->SendMsg(CreateDataMessage ({{payload, len}}, fromPort, toPort, true, !session->IsRatchets ())); // raw 80 } 81 FlushSendQueue(std::shared_ptr<DatagramSession> session)82 void DatagramDestination::FlushSendQueue (std::shared_ptr<DatagramSession> session) 83 { 84 if (session) 85 session->FlushSendQueue (); 86 } 87 HandleDatagram(uint16_t fromPort,uint16_t toPort,uint8_t * const & buf,size_t len)88 void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort,uint8_t * const &buf, size_t len) 89 { 90 i2p::data::IdentityEx identity; 91 size_t identityLen = identity.FromBuffer (buf, len); 92 const uint8_t * signature = buf + identityLen; 93 size_t headerLen = identityLen + identity.GetSignatureLen (); 94 95 bool verified = false; 96 if (identity.GetSigningKeyType () == i2p::data::SIGNING_KEY_TYPE_DSA_SHA1) 97 { 98 uint8_t hash[32]; 99 SHA256(buf + headerLen, len - headerLen, hash); 100 verified = identity.Verify (hash, 32, signature); 101 } 102 else 103 verified = identity.Verify (buf + headerLen, len - headerLen, signature); 104 105 if (verified) 106 { 107 auto h = identity.GetIdentHash(); 108 auto session = ObtainSession(h); 109 session->Ack(); 110 auto r = FindReceiver(toPort); 111 if(r) 112 r(identity, fromPort, toPort, buf + headerLen, len -headerLen); 113 else 114 LogPrint (eLogWarning, "DatagramDestination: no receiver for port ", toPort); 115 } 116 else 117 LogPrint (eLogWarning, "Datagram signature verification failed"); 118 } 119 HandleRawDatagram(uint16_t fromPort,uint16_t toPort,const uint8_t * buf,size_t len)120 void DatagramDestination::HandleRawDatagram (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len) 121 { 122 if (m_RawReceiver) 123 m_RawReceiver (fromPort, toPort, buf, len); 124 else 125 LogPrint (eLogWarning, "DatagramDestination: no receiver for raw datagram"); 126 } 127 FindReceiver(uint16_t port)128 DatagramDestination::Receiver DatagramDestination::FindReceiver(uint16_t port) 129 { 130 std::lock_guard<std::mutex> lock(m_ReceiversMutex); 131 Receiver r = m_Receiver; 132 auto itr = m_ReceiversByPorts.find(port); 133 if (itr != m_ReceiversByPorts.end()) 134 r = itr->second; 135 return r; 136 } 137 HandleDataMessagePayload(uint16_t fromPort,uint16_t toPort,const uint8_t * buf,size_t len,bool isRaw)138 void DatagramDestination::HandleDataMessagePayload (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len, bool isRaw) 139 { 140 // unzip it 141 uint8_t uncompressed[MAX_DATAGRAM_SIZE]; 142 size_t uncompressedLen = m_Inflator.Inflate (buf, len, uncompressed, MAX_DATAGRAM_SIZE); 143 if (uncompressedLen) 144 { 145 if (isRaw) 146 HandleRawDatagram (fromPort, toPort, uncompressed, uncompressedLen); 147 else 148 HandleDatagram (fromPort, toPort, uncompressed, uncompressedLen); 149 } 150 else 151 LogPrint (eLogWarning, "Datagram: decompression failed"); 152 } 153 CreateDataMessage(const std::vector<std::pair<const uint8_t *,size_t>> & payloads,uint16_t fromPort,uint16_t toPort,bool isRaw,bool checksum)154 std::shared_ptr<I2NPMessage> DatagramDestination::CreateDataMessage ( 155 const std::vector<std::pair<const uint8_t *, size_t> >& payloads, 156 uint16_t fromPort, uint16_t toPort, bool isRaw, bool checksum) 157 { 158 size_t size; 159 auto msg = m_I2NPMsgsPool.AcquireShared (); 160 uint8_t * buf = msg->GetPayload (); 161 buf += 4; // reserve for length 162 163 if (m_Gzip && m_Deflator) 164 size = m_Deflator->Deflate (payloads, buf, msg->maxLen - msg->len); 165 else 166 size = i2p::data::GzipNoCompression (payloads, buf, msg->maxLen - msg->len); 167 168 if (size) 169 { 170 htobe32buf (msg->GetPayload (), size); // length 171 htobe16buf (buf + 4, fromPort); // source port 172 htobe16buf (buf + 6, toPort); // destination port 173 buf[9] = isRaw ? i2p::client::PROTOCOL_TYPE_RAW : i2p::client::PROTOCOL_TYPE_DATAGRAM; // raw or datagram protocol 174 msg->len += size + 4; 175 msg->FillI2NPMessageHeader (eI2NPData, 0, checksum); 176 } 177 else 178 msg = nullptr; 179 return msg; 180 } 181 CleanUp()182 void DatagramDestination::CleanUp () 183 { 184 if (m_Sessions.empty ()) return; 185 auto now = i2p::util::GetMillisecondsSinceEpoch(); 186 LogPrint(eLogDebug, "DatagramDestination: clean up sessions"); 187 std::unique_lock<std::mutex> lock(m_SessionsMutex); 188 // for each session ... 189 for (auto it = m_Sessions.begin (); it != m_Sessions.end (); ) 190 { 191 // check if expired 192 if (now - it->second->LastActivity() >= DATAGRAM_SESSION_MAX_IDLE) 193 { 194 LogPrint(eLogInfo, "DatagramDestination: expiring idle session with ", it->first.ToBase32()); 195 it->second->Stop (); 196 it = m_Sessions.erase (it); // we are expired 197 } 198 else 199 it++; 200 } 201 } 202 ObtainSession(const i2p::data::IdentHash & identity)203 std::shared_ptr<DatagramSession> DatagramDestination::ObtainSession(const i2p::data::IdentHash & identity) 204 { 205 std::shared_ptr<DatagramSession> session = nullptr; 206 std::lock_guard<std::mutex> lock(m_SessionsMutex); 207 auto itr = m_Sessions.find(identity); 208 if (itr == m_Sessions.end()) { 209 // not found, create new session 210 session = std::make_shared<DatagramSession>(m_Owner, identity); 211 session->Start (); 212 m_Sessions[identity] = session; 213 } else { 214 session = itr->second; 215 } 216 return session; 217 } 218 GetInfoForRemote(const i2p::data::IdentHash & remote)219 std::shared_ptr<DatagramSession::Info> DatagramDestination::GetInfoForRemote(const i2p::data::IdentHash & remote) 220 { 221 std::lock_guard<std::mutex> lock(m_SessionsMutex); 222 for ( auto & item : m_Sessions) 223 { 224 if(item.first == remote) return std::make_shared<DatagramSession::Info>(item.second->GetSessionInfo()); 225 } 226 return nullptr; 227 } 228 DatagramSession(std::shared_ptr<i2p::client::ClientDestination> localDestination,const i2p::data::IdentHash & remoteIdent)229 DatagramSession::DatagramSession(std::shared_ptr<i2p::client::ClientDestination> localDestination, 230 const i2p::data::IdentHash & remoteIdent) : 231 m_LocalDestination(localDestination), 232 m_RemoteIdent(remoteIdent), 233 m_RequestingLS(false) 234 { 235 } 236 Start()237 void DatagramSession::Start () 238 { 239 m_LastUse = i2p::util::GetMillisecondsSinceEpoch (); 240 } 241 Stop()242 void DatagramSession::Stop () 243 { 244 } 245 SendMsg(std::shared_ptr<I2NPMessage> msg)246 void DatagramSession::SendMsg(std::shared_ptr<I2NPMessage> msg) 247 { 248 // we used this session 249 m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); 250 if (msg || m_SendQueue.empty ()) 251 m_SendQueue.push_back(msg); 252 // flush queue right away if full 253 if (!msg || m_SendQueue.size() >= DATAGRAM_SEND_QUEUE_MAX_SIZE) 254 FlushSendQueue(); 255 } 256 GetSessionInfo() const257 DatagramSession::Info DatagramSession::GetSessionInfo() const 258 { 259 if(!m_RoutingSession) 260 return DatagramSession::Info(nullptr, nullptr, m_LastUse); 261 262 auto routingPath = m_RoutingSession->GetSharedRoutingPath(); 263 if (!routingPath) 264 return DatagramSession::Info(nullptr, nullptr, m_LastUse); 265 auto lease = routingPath->remoteLease; 266 auto tunnel = routingPath->outboundTunnel; 267 if(lease) 268 { 269 if(tunnel) 270 return DatagramSession::Info(lease->tunnelGateway, tunnel->GetEndpointIdentHash(), m_LastUse); 271 else 272 return DatagramSession::Info(lease->tunnelGateway, nullptr, m_LastUse); 273 } 274 else if(tunnel) 275 return DatagramSession::Info(nullptr, tunnel->GetEndpointIdentHash(), m_LastUse); 276 else 277 return DatagramSession::Info(nullptr, nullptr, m_LastUse); 278 } 279 Ack()280 void DatagramSession::Ack() 281 { 282 m_LastUse = i2p::util::GetMillisecondsSinceEpoch(); 283 auto path = GetSharedRoutingPath(); 284 if(path) 285 path->updateTime = i2p::util::GetSecondsSinceEpoch (); 286 if (IsRatchets ()) 287 SendMsg (nullptr); // send empty message in case if we have some data to send 288 } 289 GetSharedRoutingPath()290 std::shared_ptr<i2p::garlic::GarlicRoutingPath> DatagramSession::GetSharedRoutingPath () 291 { 292 if (!m_RemoteLeaseSet || m_RemoteLeaseSet->IsExpired ()) 293 { 294 m_RemoteLeaseSet = m_LocalDestination->FindLeaseSet(m_RemoteIdent); 295 if (!m_RemoteLeaseSet) 296 { 297 if(!m_RequestingLS) 298 { 299 m_RequestingLS = true; 300 m_LocalDestination->RequestDestination(m_RemoteIdent, std::bind(&DatagramSession::HandleLeaseSetUpdated, this, std::placeholders::_1)); 301 } 302 return nullptr; 303 } 304 } 305 306 if (!m_RoutingSession || m_RoutingSession->IsTerminated () || !m_RoutingSession->IsReadyToSend ()) 307 { 308 bool found = false; 309 for (auto& it: m_PendingRoutingSessions) 310 if (it->GetOwner () && m_RoutingSession->IsReadyToSend ()) // found established session 311 { 312 m_RoutingSession = it; 313 m_PendingRoutingSessions.clear (); 314 found = true; 315 break; 316 } 317 if (!found) 318 { 319 m_RoutingSession = m_LocalDestination->GetRoutingSession(m_RemoteLeaseSet, true); 320 if (!m_RoutingSession->GetOwner () || !m_RoutingSession->IsReadyToSend ()) 321 m_PendingRoutingSessions.push_back (m_RoutingSession); 322 } 323 } 324 325 auto path = m_RoutingSession->GetSharedRoutingPath(); 326 if (path && m_RoutingSession->IsRatchets () && 327 m_LastUse > m_RoutingSession->GetLastActivityTimestamp ()*1000 + DATAGRAM_SESSION_PATH_TIMEOUT) 328 { 329 m_RoutingSession->SetSharedRoutingPath (nullptr); 330 path = nullptr; 331 } 332 333 if (path) 334 { 335 if (path->outboundTunnel && !path->outboundTunnel->IsEstablished ()) 336 { 337 // bad outbound tunnel, switch outbound tunnel 338 path->outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(path->outboundTunnel); 339 if (!path->outboundTunnel) 340 m_RoutingSession->SetSharedRoutingPath (nullptr); 341 } 342 343 if (path->remoteLease && path->remoteLease->ExpiresWithin(DATAGRAM_SESSION_LEASE_HANDOVER_WINDOW)) 344 { 345 // bad lease, switch to next one 346 if (m_RemoteLeaseSet) 347 { 348 auto ls = m_RemoteLeaseSet->GetNonExpiredLeasesExcluding( 349 [&](const i2p::data::Lease& l) -> bool 350 { 351 return l.tunnelID == path->remoteLease->tunnelID; 352 }); 353 auto sz = ls.size(); 354 if (sz) 355 { 356 auto idx = rand() % sz; 357 path->remoteLease = ls[idx]; 358 } 359 else 360 m_RoutingSession->SetSharedRoutingPath (nullptr); 361 } 362 else 363 { 364 // no remote lease set? 365 LogPrint(eLogWarning, "DatagramSession: no cached remote lease set for ", m_RemoteIdent.ToBase32()); 366 m_RoutingSession->SetSharedRoutingPath (nullptr); 367 } 368 } 369 } 370 else 371 { 372 // no current path, make one 373 path = std::make_shared<i2p::garlic::GarlicRoutingPath>(); 374 375 if (m_RemoteLeaseSet) 376 { 377 // pick random next good lease 378 auto ls = m_RemoteLeaseSet->GetNonExpiredLeases(); 379 auto sz = ls.size(); 380 if (sz) 381 { 382 auto idx = rand() % sz; 383 path->remoteLease = ls[idx]; 384 } 385 else 386 return nullptr; 387 388 auto leaseRouter = i2p::data::netdb.FindRouter (path->remoteLease->tunnelGateway); 389 path->outboundTunnel = m_LocalDestination->GetTunnelPool()->GetNextOutboundTunnel(nullptr, 390 leaseRouter ? leaseRouter->GetCompatibleTransports (false) : (i2p::data::RouterInfo::CompatibleTransports)i2p::data::RouterInfo::eAllTransports); 391 if (!path->outboundTunnel) return nullptr; 392 } 393 else 394 { 395 // no remote lease set currently, bail 396 LogPrint(eLogWarning, "DatagramSession: no remote lease set found for ", m_RemoteIdent.ToBase32()); 397 return nullptr; 398 } 399 m_RoutingSession->SetSharedRoutingPath(path); 400 } 401 return path; 402 } 403 HandleLeaseSetUpdated(std::shared_ptr<i2p::data::LeaseSet> ls)404 void DatagramSession::HandleLeaseSetUpdated(std::shared_ptr<i2p::data::LeaseSet> ls) 405 { 406 m_RequestingLS = false; 407 if(!ls) return; 408 // only update lease set if found and newer than previous lease set 409 uint64_t oldExpire = 0; 410 if(m_RemoteLeaseSet) oldExpire = m_RemoteLeaseSet->GetExpirationTime(); 411 if(ls && ls->GetExpirationTime() > oldExpire) m_RemoteLeaseSet = ls; 412 } 413 FlushSendQueue()414 void DatagramSession::FlushSendQueue () 415 { 416 if (m_SendQueue.empty ()) return; 417 std::vector<i2p::tunnel::TunnelMessageBlock> send; 418 auto routingPath = GetSharedRoutingPath(); 419 // if we don't have a routing path we will drop all queued messages 420 if(routingPath && routingPath->outboundTunnel && routingPath->remoteLease) 421 { 422 for (const auto & msg : m_SendQueue) 423 { 424 auto m = m_RoutingSession->WrapSingleMessage(msg); 425 if (m) 426 send.push_back(i2p::tunnel::TunnelMessageBlock{i2p::tunnel::eDeliveryTypeTunnel,routingPath->remoteLease->tunnelGateway, routingPath->remoteLease->tunnelID, m}); 427 } 428 routingPath->outboundTunnel->SendTunnelDataMsg(send); 429 } 430 m_SendQueue.clear(); 431 } 432 } 433 } 434