1using Gee; 2using Xmpp.Xep; 3using Xmpp; 4 5namespace Xmpp.Xep.Jingle { 6 7private const string NS_URI = "urn:xmpp:jingle:1"; 8private const string ERROR_NS_URI = "urn:xmpp:jingle:errors:1"; 9 10public errordomain IqError { 11 BAD_REQUEST, 12 NOT_ACCEPTABLE, 13 NOT_IMPLEMENTED, 14 UNSUPPORTED_INFO, 15 OUT_OF_ORDER, 16 RESOURCE_CONSTRAINT, 17} 18 19void send_iq_error(IqError iq_error, XmppStream stream, Iq.Stanza iq) { 20 ErrorStanza error; 21 if (iq_error is IqError.BAD_REQUEST) { 22 error = new ErrorStanza.bad_request(iq_error.message); 23 } else if (iq_error is IqError.NOT_ACCEPTABLE) { 24 error = new ErrorStanza.not_acceptable(iq_error.message); 25 } else if (iq_error is IqError.NOT_IMPLEMENTED) { 26 error = new ErrorStanza.feature_not_implemented(iq_error.message); 27 } else if (iq_error is IqError.UNSUPPORTED_INFO) { 28 StanzaNode unsupported_info = new StanzaNode.build("unsupported-info", ERROR_NS_URI).add_self_xmlns(); 29 error = new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_FEATURE_NOT_IMPLEMENTED, iq_error.message, unsupported_info); 30 } else if (iq_error is IqError.OUT_OF_ORDER) { 31 StanzaNode out_of_order = new StanzaNode.build("out-of-order", ERROR_NS_URI).add_self_xmlns(); 32 error = new ErrorStanza.build(ErrorStanza.TYPE_MODIFY, ErrorStanza.CONDITION_UNEXPECTED_REQUEST, iq_error.message, out_of_order); 33 } else if (iq_error is IqError.RESOURCE_CONSTRAINT) { 34 error = new ErrorStanza.resource_constraint(iq_error.message); 35 } else { 36 assert_not_reached(); 37 } 38 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, error) { to=iq.from }); 39} 40 41public errordomain Error { 42 GENERAL, 43 BAD_REQUEST, 44 INVALID_PARAMETERS, 45 UNSUPPORTED_TRANSPORT, 46 UNSUPPORTED_SECURITY, 47 NO_SHARED_PROTOCOLS, 48 TRANSPORT_ERROR, 49} 50 51StanzaNode? get_single_node_anyns(StanzaNode parent, string? node_name = null) throws IqError { 52 StanzaNode? result = null; 53 foreach (StanzaNode child in parent.get_all_subnodes()) { 54 if (node_name == null || child.name == node_name) { 55 if (result != null) { 56 if (node_name != null) { 57 throw new IqError.BAD_REQUEST(@"multiple $(node_name) nodes"); 58 } else { 59 throw new IqError.BAD_REQUEST(@"expected single subnode"); 60 } 61 } 62 result = child; 63 } 64 } 65 return result; 66} 67 68class ContentNode { 69 public Role creator; 70 public string name; 71 public StanzaNode? description; 72 public StanzaNode? transport; 73 public StanzaNode? security; 74} 75 76ContentNode get_single_content_node(StanzaNode jingle) throws IqError { 77 Gee.List<StanzaNode> contents = jingle.get_subnodes("content"); 78 if (contents.size == 0) { 79 throw new IqError.BAD_REQUEST("missing content node"); 80 } 81 if (contents.size > 1) { 82 throw new IqError.NOT_IMPLEMENTED("can't process multiple content nodes"); 83 } 84 StanzaNode content = contents[0]; 85 string? creator_str = content.get_attribute("creator"); 86 // Vala can't typecheck the ternary operator here. 87 Role? creator = null; 88 if (creator_str != null) { 89 creator = Role.parse(creator_str); 90 } else { 91 // TODO(hrxi): now, is the creator attribute optional or not (XEP-0166 92 // Jingle)? 93 creator = Role.INITIATOR; 94 } 95 96 string? name = content.get_attribute("name"); 97 StanzaNode? description = get_single_node_anyns(content, "description"); 98 StanzaNode? transport = get_single_node_anyns(content, "transport"); 99 StanzaNode? security = get_single_node_anyns(content, "security"); 100 if (name == null || creator == null) { 101 throw new IqError.BAD_REQUEST("missing name or creator"); 102 } 103 104 return new ContentNode() { 105 creator=creator, 106 name=name, 107 description=description, 108 transport=transport, 109 security=security 110 }; 111} 112 113// This module can only be attached to one stream at a time. 114public class Module : XmppStreamModule, Iq.Handler { 115 public static Xmpp.ModuleIdentity<Module> IDENTITY = new Xmpp.ModuleIdentity<Module>(NS_URI, "0166_jingle"); 116 117 private HashMap<string, ContentType> content_types = new HashMap<string, ContentType>(); 118 private HashMap<string, Transport> transports = new HashMap<string, Transport>(); 119 private HashMap<string, SecurityPrecondition> security_preconditions = new HashMap<string, SecurityPrecondition>(); 120 121 private XmppStream? current_stream = null; 122 123 public override void attach(XmppStream stream) { 124 stream.add_flag(new Flag()); 125 stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); 126 stream.get_module(Iq.Module.IDENTITY).register_for_namespace(NS_URI, this); 127 current_stream = stream; 128 } 129 public override void detach(XmppStream stream) { 130 stream.get_module(ServiceDiscovery.Module.IDENTITY).remove_feature(stream, NS_URI); 131 stream.get_module(Iq.Module.IDENTITY).unregister_from_namespace(NS_URI, this); 132 } 133 134 public void register_content_type(ContentType content_type) { 135 content_types[content_type.content_type_ns_uri()] = content_type; 136 } 137 public ContentType? get_content_type(string ns_uri) { 138 if (!content_types.has_key(ns_uri)) { 139 return null; 140 } 141 return content_types[ns_uri]; 142 } 143 public void register_transport(Transport transport) { 144 transports[transport.transport_ns_uri()] = transport; 145 } 146 public Transport? get_transport(string ns_uri) { 147 if (!transports.has_key(ns_uri)) { 148 return null; 149 } 150 return transports[ns_uri]; 151 } 152 public async Transport? select_transport(XmppStream stream, TransportType type, Jid receiver_full_jid, Set<string> blacklist) { 153 Transport? result = null; 154 foreach (Transport transport in transports.values) { 155 if (transport.transport_type() != type) { 156 continue; 157 } 158 if (transport.transport_ns_uri() in blacklist) { 159 continue; 160 } 161 if (yield transport.is_transport_available(stream, receiver_full_jid)) { 162 if (result != null) { 163 if (result.transport_priority() >= transport.transport_priority()) { 164 continue; 165 } 166 } 167 result = transport; 168 } 169 } 170 return result; 171 } 172 public void register_security_precondition(SecurityPrecondition precondition) { 173 security_preconditions[precondition.security_ns_uri()] = precondition; 174 } 175 public SecurityPrecondition? get_security_precondition(string? ns_uri) { 176 if (ns_uri == null) return null; 177 if (!security_preconditions.has_key(ns_uri)) { 178 return null; 179 } 180 return security_preconditions[ns_uri]; 181 } 182 183 private async bool is_jingle_available(XmppStream stream, Jid full_jid) { 184 bool? has_jingle = yield stream.get_module(ServiceDiscovery.Module.IDENTITY).has_entity_feature(stream, full_jid, NS_URI); 185 return has_jingle != null && has_jingle; 186 } 187 188 public async bool is_available(XmppStream stream, TransportType type, Jid full_jid) { 189 return (yield is_jingle_available(stream, full_jid)) && (yield select_transport(stream, type, full_jid, Set.empty())) != null; 190 } 191 192 public async Session create_session(XmppStream stream, TransportType type, Jid receiver_full_jid, Senders senders, string content_name, StanzaNode description, string? precondition_name = null, Object? precondation_options = null) throws Error { 193 if (!yield is_jingle_available(stream, receiver_full_jid)) { 194 throw new Error.NO_SHARED_PROTOCOLS("No Jingle support"); 195 } 196 Transport? transport = yield select_transport(stream, type, receiver_full_jid, Set.empty()); 197 if (transport == null) { 198 throw new Error.NO_SHARED_PROTOCOLS("No suitable transports"); 199 } 200 SecurityPrecondition? precondition = get_security_precondition(precondition_name); 201 if (precondition_name != null && precondition == null) { 202 throw new Error.UNSUPPORTED_SECURITY("No suitable security precondiiton found"); 203 } 204 Jid? my_jid = stream.get_flag(Bind.Flag.IDENTITY).my_jid; 205 if (my_jid == null) { 206 throw new Error.GENERAL("Couldn't determine own JID"); 207 } 208 TransportParameters transport_params = transport.create_transport_parameters(stream, my_jid, receiver_full_jid); 209 SecurityParameters? security_params = precondition != null ? precondition.create_security_parameters(stream, my_jid, receiver_full_jid, precondation_options) : null; 210 Session session = new Session.initiate_sent(random_uuid(), type, transport_params, security_params, my_jid, receiver_full_jid, content_name, send_terminate_and_remove_session); 211 StanzaNode content = new StanzaNode.build("content", NS_URI) 212 .put_attribute("creator", "initiator") 213 .put_attribute("name", content_name) 214 .put_attribute("senders", senders.to_string()) 215 .put_node(description) 216 .put_node(transport_params.to_transport_stanza_node()); 217 if (security_params != null) { 218 content.put_node(security_params.to_security_stanza_node(stream, my_jid, receiver_full_jid)); 219 } 220 StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) 221 .add_self_xmlns() 222 .put_attribute("action", "session-initiate") 223 .put_attribute("initiator", my_jid.to_string()) 224 .put_attribute("sid", session.sid) 225 .put_node(content); 226 Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=receiver_full_jid }; 227 228 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { 229 // TODO(hrxi): handle errors 230 stream.get_flag(Flag.IDENTITY).add_session(session); 231 }); 232 233 return session; 234 } 235 236 public void handle_session_initiate(XmppStream stream, string sid, StanzaNode jingle, Iq.Stanza iq) throws IqError { 237 ContentNode content = get_single_content_node(jingle); 238 if (content.description == null || content.transport == null) { 239 throw new IqError.BAD_REQUEST("missing description or transport node"); 240 } 241 Jid? my_jid = stream.get_flag(Bind.Flag.IDENTITY).my_jid; 242 if (my_jid == null) { 243 throw new IqError.RESOURCE_CONSTRAINT("Couldn't determine own JID"); 244 } 245 Transport? transport = get_transport(content.transport.ns_uri); 246 TransportParameters? transport_params = null; 247 if (transport != null) { 248 transport_params = transport.parse_transport_parameters(stream, my_jid, iq.from, content.transport); 249 } else { 250 // terminate the session below 251 } 252 253 ContentType? content_type = get_content_type(content.description.ns_uri); 254 if (content_type == null) { 255 // TODO(hrxi): how do we signal an unknown content type? 256 throw new IqError.NOT_IMPLEMENTED("unknown content type"); 257 } 258 ContentParameters content_params = content_type.parse_content_parameters(content.description); 259 260 SecurityPrecondition? precondition = content.security != null ? get_security_precondition(content.security.ns_uri) : null; 261 SecurityParameters? security_params = null; 262 if (precondition != null) { 263 debug("Using precondition %s", precondition.security_ns_uri()); 264 security_params = precondition.parse_security_parameters(stream, my_jid, iq.from, content.security); 265 } else if (content.security != null) { 266 throw new IqError.NOT_IMPLEMENTED("unknown security precondition"); 267 } 268 269 TransportType type = content_type.content_type_transport_type(); 270 Session session = new Session.initiate_received(sid, type, transport_params, security_params, my_jid, iq.from, content.name, send_terminate_and_remove_session); 271 stream.get_flag(Flag.IDENTITY).add_session(session); 272 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); 273 274 if (transport == null || transport.transport_type() != type) { 275 StanzaNode reason = new StanzaNode.build("reason", NS_URI) 276 .put_node(new StanzaNode.build("unsupported-transports", NS_URI)); 277 session.terminate(reason, "unsupported transports"); 278 return; 279 } 280 281 content_params.on_session_initiate(stream, session); 282 } 283 284 private void send_terminate_and_remove_session(Jid to, string sid, StanzaNode reason) { 285 StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) 286 .add_self_xmlns() 287 .put_attribute("action", "session-terminate") 288 .put_attribute("sid", sid) 289 .put_node(reason); 290 Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=to }; 291 current_stream.get_module(Iq.Module.IDENTITY).send_iq(current_stream, iq); 292 293 // Immediately remove the session from the open sessions as per the 294 // XEP, don't wait for confirmation. 295 current_stream.get_flag(Flag.IDENTITY).remove_session(sid); 296 } 297 298 public async void on_iq_set(XmppStream stream, Iq.Stanza iq) { 299 try { 300 handle_iq_set(stream, iq); 301 } catch (IqError e) { 302 send_iq_error(e, stream, iq); 303 } 304 } 305 306 public void handle_iq_set(XmppStream stream, Iq.Stanza iq) throws IqError { 307 StanzaNode? jingle = iq.stanza.get_subnode("jingle", NS_URI); 308 string? sid = jingle != null ? jingle.get_attribute("sid") : null; 309 string? action = jingle != null ? jingle.get_attribute("action") : null; 310 if (jingle == null || sid == null || action == null) { 311 throw new IqError.BAD_REQUEST("missing jingle node, sid or action"); 312 } 313 Session? session = stream.get_flag(Flag.IDENTITY).get_session(sid); 314 if (action == "session-initiate") { 315 if (session != null) { 316 // TODO(hrxi): Info leak if other clients use predictable session IDs? 317 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_MODIFY, ErrorStanza.CONDITION_CONFLICT, "session ID already in use", null)) { to=iq.from }); 318 return; 319 } 320 handle_session_initiate(stream, sid, jingle, iq); 321 return; 322 } 323 if (session == null) { 324 StanzaNode unknown_session = new StanzaNode.build("unknown-session", ERROR_NS_URI).add_self_xmlns(); 325 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found(unknown_session)) { to=iq.from }); 326 return; 327 } 328 session.handle_iq_set(stream, action, jingle, iq); 329 } 330 331 public override string get_ns() { return NS_URI; } 332 public override string get_id() { return IDENTITY.id; } 333} 334 335public enum TransportType { 336 DATAGRAM, 337 STREAMING, 338} 339 340public enum Senders { 341 BOTH, 342 INITIATOR, 343 NONE, 344 RESPONDER; 345 346 public string to_string() { 347 switch (this) { 348 case BOTH: return "both"; 349 case INITIATOR: return "initiator"; 350 case NONE: return "none"; 351 case RESPONDER: return "responder"; 352 } 353 assert_not_reached(); 354 } 355} 356 357public delegate void SessionTerminate(Jid to, string sid, StanzaNode reason); 358 359public interface Transport : Object { 360 public abstract string transport_ns_uri(); 361 public async abstract bool is_transport_available(XmppStream stream, Jid full_jid); 362 public abstract TransportType transport_type(); 363 public abstract int transport_priority(); 364 public abstract TransportParameters create_transport_parameters(XmppStream stream, Jid local_full_jid, Jid peer_full_jid) throws Error; 365 public abstract TransportParameters parse_transport_parameters(XmppStream stream, Jid local_full_jid, Jid peer_full_jid, StanzaNode transport) throws IqError; 366} 367 368 369// Gets a null `stream` if connection setup was unsuccessful and another 370// transport method should be tried. 371public interface TransportParameters : Object { 372 public abstract string transport_ns_uri(); 373 public abstract StanzaNode to_transport_stanza_node(); 374 public abstract void on_transport_accept(StanzaNode transport) throws IqError; 375 public abstract void on_transport_info(StanzaNode transport) throws IqError; 376 public abstract void create_transport_connection(XmppStream stream, Session session); 377} 378 379public enum Role { 380 INITIATOR, 381 RESPONDER; 382 383 public string to_string() { 384 switch (this) { 385 case INITIATOR: return "initiator"; 386 case RESPONDER: return "responder"; 387 } 388 assert_not_reached(); 389 } 390 391 public static Role parse(string role) throws IqError { 392 switch (role) { 393 case "initiator": return INITIATOR; 394 case "responder": return RESPONDER; 395 } 396 throw new IqError.BAD_REQUEST(@"invalid role $(role)"); 397 } 398} 399 400public interface ContentType : Object { 401 public abstract string content_type_ns_uri(); 402 public abstract TransportType content_type_transport_type(); 403 public abstract ContentParameters parse_content_parameters(StanzaNode description) throws IqError; 404 public abstract void handle_content_session_info(XmppStream stream, Session session, StanzaNode info, Iq.Stanza iq) throws IqError; 405} 406 407public interface ContentParameters : Object { 408 public abstract void on_session_initiate(XmppStream stream, Session session); 409} 410 411public interface SecurityPrecondition : Object { 412 public abstract string security_ns_uri(); 413 public abstract SecurityParameters? create_security_parameters(XmppStream stream, Jid local_full_jid, Jid peer_full_jid, Object options) throws Jingle.Error; 414 public abstract SecurityParameters? parse_security_parameters(XmppStream stream, Jid local_full_jid, Jid peer_full_jid, StanzaNode security) throws IqError; 415} 416 417public interface SecurityParameters : Object { 418 public abstract string security_ns_uri(); 419 public abstract StanzaNode to_security_stanza_node(XmppStream stream, Jid local_full_jid, Jid peer_full_jid); 420 public abstract IOStream wrap_stream(IOStream stream); 421} 422 423public class Session { 424 // INITIATE_SENT -> CONNECTING -> [REPLACING_TRANSPORT -> CONNECTING ->]... ACTIVE -> ENDED 425 // INITIATE_RECEIVED -> CONNECTING -> [WAITING_FOR_TRANSPORT_REPLACE -> CONNECTING ->].. ACTIVE -> ENDED 426 public enum State { 427 INITIATE_SENT, 428 REPLACING_TRANSPORT, 429 INITIATE_RECEIVED, 430 WAITING_FOR_TRANSPORT_REPLACE, 431 CONNECTING, 432 ACTIVE, 433 ENDED, 434 } 435 436 public State state { get; private set; } 437 438 public Role role { get; private set; } 439 public string sid { get; private set; } 440 public TransportType type_ { get; private set; } 441 public Jid local_full_jid { get; private set; } 442 public Jid peer_full_jid { get; private set; } 443 public Role content_creator { get; private set; } 444 public string content_name { get; private set; } 445 public SecurityParameters? security { get; private set; } 446 447 private Connection connection; 448 public IOStream conn { get { return connection; } } 449 450 public bool terminate_on_connection_close { get; set; } 451 452 // INITIATE_SENT | INITIATE_RECEIVED | CONNECTING 453 Set<string> tried_transport_methods = new HashSet<string>(); 454 TransportParameters? transport = null; 455 456 SessionTerminate session_terminate_handler; 457 458 public Session.initiate_sent(string sid, TransportType type, TransportParameters transport, SecurityParameters? security, Jid local_full_jid, Jid peer_full_jid, string content_name, owned SessionTerminate session_terminate_handler) { 459 this.state = State.INITIATE_SENT; 460 this.role = Role.INITIATOR; 461 this.sid = sid; 462 this.type_ = type; 463 this.local_full_jid = local_full_jid; 464 this.peer_full_jid = peer_full_jid; 465 this.content_creator = Role.INITIATOR; 466 this.content_name = content_name; 467 this.tried_transport_methods = new HashSet<string>(); 468 this.tried_transport_methods.add(transport.transport_ns_uri()); 469 this.transport = transport; 470 this.security = security; 471 this.connection = new Connection(this); 472 this.session_terminate_handler = (owned)session_terminate_handler; 473 this.terminate_on_connection_close = true; 474 } 475 476 public Session.initiate_received(string sid, TransportType type, TransportParameters? transport, SecurityParameters? security, Jid local_full_jid, Jid peer_full_jid, string content_name, owned SessionTerminate session_terminate_handler) { 477 this.state = State.INITIATE_RECEIVED; 478 this.role = Role.RESPONDER; 479 this.sid = sid; 480 this.type_ = type; 481 this.local_full_jid = local_full_jid; 482 this.peer_full_jid = peer_full_jid; 483 this.content_creator = Role.INITIATOR; 484 this.content_name = content_name; 485 this.transport = transport; 486 this.security = security; 487 this.tried_transport_methods = new HashSet<string>(); 488 if (transport != null) { 489 this.tried_transport_methods.add(transport.transport_ns_uri()); 490 } 491 this.connection = new Connection(this); 492 this.session_terminate_handler = (owned)session_terminate_handler; 493 this.terminate_on_connection_close = true; 494 } 495 496 public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { 497 // Validate action. 498 switch (action) { 499 case "session-accept": 500 case "session-info": 501 case "session-terminate": 502 case "transport-accept": 503 case "transport-info": 504 case "transport-reject": 505 case "transport-replace": 506 break; 507 case "content-accept": 508 case "content-add": 509 case "content-modify": 510 case "content-reject": 511 case "content-remove": 512 case "description-info": 513 case "security-info": 514 throw new IqError.NOT_IMPLEMENTED(@"$(action) is not implemented"); 515 default: 516 throw new IqError.BAD_REQUEST("invalid action"); 517 } 518 ContentNode? content = null; 519 StanzaNode? transport = null; 520 // Do some pre-processing. 521 if (action != "session-info" && action != "session-terminate") { 522 content = get_single_content_node(jingle); 523 verify_content(content); 524 switch (action) { 525 case "transport-accept": 526 case "transport-reject": 527 case "transport-replace": 528 case "transport-info": 529 switch (state) { 530 case State.INITIATE_SENT: 531 case State.REPLACING_TRANSPORT: 532 case State.INITIATE_RECEIVED: 533 case State.WAITING_FOR_TRANSPORT_REPLACE: 534 case State.CONNECTING: 535 break; 536 default: 537 throw new IqError.OUT_OF_ORDER("transport-* unsupported after connection setup"); 538 } 539 // TODO(hrxi): What to do with description nodes? 540 if (content.transport == null) { 541 throw new IqError.BAD_REQUEST("missing transport node"); 542 } 543 transport = content.transport; 544 break; 545 } 546 } 547 switch (action) { 548 case "session-accept": 549 if (state != State.INITIATE_SENT) { 550 throw new IqError.OUT_OF_ORDER("got session-accept while not waiting for one"); 551 } 552 handle_session_accept(stream, content, jingle, iq); 553 break; 554 case "session-info": 555 handle_session_info(stream, jingle, iq); 556 break; 557 case "session-terminate": 558 handle_session_terminate(stream, jingle, iq); 559 break; 560 case "transport-accept": 561 handle_transport_accept(stream, transport, jingle, iq); 562 break; 563 case "transport-reject": 564 handle_transport_reject(stream, jingle, iq); 565 break; 566 case "transport-replace": 567 handle_transport_replace(stream, transport, jingle, iq); 568 break; 569 case "transport-info": 570 handle_transport_info(stream, transport, jingle, iq); 571 break; 572 } 573 } 574 void handle_session_accept(XmppStream stream, ContentNode content, StanzaNode jingle, Iq.Stanza iq) throws IqError { 575 string? responder_str = jingle.get_attribute("responder"); 576 Jid responder = iq.from; 577 if (responder_str != null) { 578 try { 579 responder = new Jid(responder_str); 580 } catch (InvalidJidError e) { 581 warning("Received invalid session accept: %s", e.message); 582 } 583 } 584 // TODO(hrxi): more sanity checking, perhaps replace who we're talking to 585 if (!responder.is_full()) { 586 throw new IqError.BAD_REQUEST("invalid responder JID"); 587 } 588 if (content.description == null || content.transport == null) { 589 throw new IqError.BAD_REQUEST("missing description or transport node"); 590 } 591 if (content.transport.ns_uri != transport.transport_ns_uri()) { 592 throw new IqError.BAD_REQUEST("session-accept with unnegotiated transport method"); 593 } 594 transport.on_transport_accept(content.transport); 595 // TODO(hrxi): handle content.description :) 596 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); 597 598 state = State.CONNECTING; 599 transport.create_transport_connection(stream, this); 600 } 601 void connection_created(XmppStream stream, IOStream? conn) { 602 if (state != State.CONNECTING) { 603 return; 604 } 605 if (conn != null) { 606 state = State.ACTIVE; 607 tried_transport_methods.clear(); 608 if (security != null) { 609 connection.set_inner(security.wrap_stream(conn)); 610 } else { 611 connection.set_inner(conn); 612 } 613 transport = null; 614 } else { 615 if (role == Role.INITIATOR) { 616 select_new_transport.begin(stream); 617 } else { 618 state = State.WAITING_FOR_TRANSPORT_REPLACE; 619 } 620 } 621 } 622 void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { 623 connection.on_terminated_by_jingle("remote terminated jingle session"); 624 state = State.ENDED; 625 stream.get_flag(Flag.IDENTITY).remove_session(sid); 626 627 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); 628 // TODO(hrxi): also handle presence type=unavailable 629 } 630 void handle_session_info(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { 631 StanzaNode? info = get_single_node_anyns(jingle); 632 if (info == null) { 633 // Jingle session ping 634 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); 635 return; 636 } 637 ContentType? content_type = stream.get_module(Module.IDENTITY).get_content_type(info.ns_uri); 638 if (content_type == null) { 639 throw new IqError.UNSUPPORTED_INFO("unknown session-info namespace"); 640 } 641 content_type.handle_content_session_info(stream, this, info, iq); 642 } 643 async void select_new_transport(XmppStream stream) { 644 Transport? new_transport = yield stream.get_module(Module.IDENTITY).select_transport(stream, type_, peer_full_jid, tried_transport_methods); 645 if (new_transport == null) { 646 StanzaNode reason = new StanzaNode.build("reason", NS_URI) 647 .put_node(new StanzaNode.build("failed-transport", NS_URI)); 648 terminate(reason, "failed transport"); 649 return; 650 } 651 tried_transport_methods.add(new_transport.transport_ns_uri()); 652 transport = new_transport.create_transport_parameters(stream, local_full_jid, peer_full_jid); 653 StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) 654 .add_self_xmlns() 655 .put_attribute("action", "transport-replace") 656 .put_attribute("sid", sid) 657 .put_node(new StanzaNode.build("content", NS_URI) 658 .put_attribute("creator", "initiator") 659 .put_attribute("name", content_name) 660 .put_node(transport.to_transport_stanza_node()) 661 ); 662 Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; 663 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); 664 state = State.REPLACING_TRANSPORT; 665 } 666 void handle_transport_accept(XmppStream stream, StanzaNode transport_node, StanzaNode jingle, Iq.Stanza iq) throws IqError { 667 if (state != State.REPLACING_TRANSPORT) { 668 throw new IqError.OUT_OF_ORDER("no outstanding transport-replace request"); 669 } 670 if (transport_node.ns_uri != transport.transport_ns_uri()) { 671 throw new IqError.BAD_REQUEST("transport-accept with unnegotiated transport method"); 672 } 673 transport.on_transport_accept(transport_node); 674 state = State.CONNECTING; 675 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); 676 transport.create_transport_connection(stream, this); 677 } 678 void handle_transport_reject(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { 679 if (state != State.REPLACING_TRANSPORT) { 680 throw new IqError.OUT_OF_ORDER("no outstanding transport-replace request"); 681 } 682 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); 683 select_new_transport.begin(stream); 684 } 685 void handle_transport_replace(XmppStream stream, StanzaNode transport_node, StanzaNode jingle, Iq.Stanza iq) throws IqError { 686 Transport? transport = stream.get_module(Module.IDENTITY).get_transport(transport_node.ns_uri); 687 TransportParameters? parameters = null; 688 if (transport != null) { 689 // Just parse the transport info for the errors. 690 parameters = transport.parse_transport_parameters(stream, local_full_jid, peer_full_jid, transport_node); 691 } 692 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); 693 if (state != State.WAITING_FOR_TRANSPORT_REPLACE || transport == null) { 694 StanzaNode jingle_response = new StanzaNode.build("jingle", NS_URI) 695 .add_self_xmlns() 696 .put_attribute("action", "transport-reject") 697 .put_attribute("sid", sid) 698 .put_node(new StanzaNode.build("content", NS_URI) 699 .put_attribute("creator", "initiator") 700 .put_attribute("name", content_name) 701 .put_node(transport_node) 702 ); 703 Iq.Stanza iq_response = new Iq.Stanza.set(jingle_response) { to=peer_full_jid }; 704 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq_response); 705 return; 706 } 707 this.transport = parameters; 708 StanzaNode jingle_response = new StanzaNode.build("jingle", NS_URI) 709 .add_self_xmlns() 710 .put_attribute("action", "transport-accept") 711 .put_attribute("sid", sid) 712 .put_node(new StanzaNode.build("content", NS_URI) 713 .put_attribute("creator", "initiator") 714 .put_attribute("name", content_name) 715 .put_node(this.transport.to_transport_stanza_node()) 716 ); 717 Iq.Stanza iq_response = new Iq.Stanza.set(jingle_response) { to=peer_full_jid }; 718 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq_response); 719 720 state = State.CONNECTING; 721 this.transport.create_transport_connection(stream, this); 722 } 723 void handle_transport_info(XmppStream stream, StanzaNode transport, StanzaNode jingle, Iq.Stanza iq) throws IqError { 724 this.transport.on_transport_info(transport); 725 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); 726 } 727 void verify_content(ContentNode content) throws IqError { 728 if (content.name != content_name || content.creator != content_creator) { 729 throw new IqError.BAD_REQUEST("unknown content"); 730 } 731 } 732 public void set_transport_connection(XmppStream stream, IOStream? conn) { 733 if (state != State.CONNECTING) { 734 return; 735 } 736 connection_created(stream, conn); 737 } 738 public void send_transport_info(XmppStream stream, StanzaNode transport) { 739 if (state != State.CONNECTING) { 740 return; 741 } 742 StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) 743 .add_self_xmlns() 744 .put_attribute("action", "transport-info") 745 .put_attribute("sid", sid) 746 .put_node(new StanzaNode.build("content", NS_URI) 747 .put_attribute("creator", "initiator") 748 .put_attribute("name", content_name) 749 .put_node(transport) 750 ); 751 Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; 752 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); 753 } 754 public void accept(XmppStream stream, StanzaNode description) { 755 if (state != State.INITIATE_RECEIVED) { 756 return; // TODO(hrxi): what to do? 757 } 758 StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) 759 .add_self_xmlns() 760 .put_attribute("action", "session-accept") 761 .put_attribute("sid", sid) 762 .put_node(new StanzaNode.build("content", NS_URI) 763 .put_attribute("creator", "initiator") 764 .put_attribute("name", content_name) 765 .put_node(description) 766 .put_node(transport.to_transport_stanza_node()) 767 ); 768 Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; 769 stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); 770 771 state = State.CONNECTING; 772 transport.create_transport_connection(stream, this); 773 } 774 775 public void reject(XmppStream stream) { 776 if (state != State.INITIATE_RECEIVED) { 777 return; // TODO(hrxi): what to do? 778 } 779 StanzaNode reason = new StanzaNode.build("reason", NS_URI) 780 .put_node(new StanzaNode.build("decline", NS_URI)); 781 terminate(reason, "declined"); 782 } 783 784 public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) { 785 StanzaNode reason = new StanzaNode.build("reason", NS_URI) 786 .put_node(new StanzaNode.build("failed-application", NS_URI)); 787 if (application_reason != null) { 788 reason.put_node(application_reason); 789 } 790 terminate(reason, "application error"); 791 } 792 793 public void on_connection_error(IOError error) { 794 // TODO(hrxi): where can we get an XmppStream from? 795 StanzaNode reason = new StanzaNode.build("reason", NS_URI) 796 .put_node(new StanzaNode.build("failed-transport", NS_URI)) 797 .put_node(new StanzaNode.build("text", NS_URI) 798 .put_node(new StanzaNode.text(error.message)) 799 ); 800 terminate(reason, @"transport error: $(error.message)"); 801 } 802 public void on_connection_close() { 803 if (terminate_on_connection_close) { 804 StanzaNode reason = new StanzaNode.build("reason", NS_URI) 805 .put_node(new StanzaNode.build("success", NS_URI)); 806 terminate(reason, "success"); 807 } 808 } 809 810 public void terminate(StanzaNode reason, string? local_reason) { 811 if (state == State.ENDED) { 812 return; 813 } 814 if (state == State.ACTIVE) { 815 if (local_reason != null) { 816 connection.on_terminated_by_jingle(@"local session-terminate: $(local_reason)"); 817 } else { 818 connection.on_terminated_by_jingle("local session-terminate"); 819 } 820 } 821 822 session_terminate_handler(peer_full_jid, sid, reason); 823 state = State.ENDED; 824 } 825} 826 827public class Connection : IOStream { 828 public class Input : InputStream { 829 private weak Connection connection; 830 public Input(Connection connection) { 831 this.connection = connection; 832 } 833 public override ssize_t read(uint8[] buffer, Cancellable? cancellable = null) throws IOError { 834 throw new IOError.NOT_SUPPORTED("can't do non-async reads on jingle connections"); 835 } 836 public override async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { 837 return yield connection.read_async(buffer, io_priority, cancellable); 838 } 839 public override bool close(Cancellable? cancellable = null) throws IOError { 840 return connection.close_read(cancellable); 841 } 842 public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { 843 return yield connection.close_read_async(io_priority, cancellable); 844 } 845 } 846 public class Output : OutputStream { 847 private weak Connection connection; 848 public Output(Connection connection) { 849 this.connection = connection; 850 } 851 public override ssize_t write(uint8[] buffer, Cancellable? cancellable = null) throws IOError { 852 throw new IOError.NOT_SUPPORTED("can't do non-async writes on jingle connections"); 853 } 854 public override async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { 855 return yield connection.write_async(buffer, io_priority, cancellable); 856 } 857 public override bool close(Cancellable? cancellable = null) throws IOError { 858 return connection.close_write(cancellable); 859 } 860 public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { 861 return yield connection.close_write_async(io_priority, cancellable); 862 } 863 } 864 865 private Input input; 866 private Output output; 867 public override InputStream input_stream { get { return input; } } 868 public override OutputStream output_stream { get { return output; } } 869 870 private weak Session session; 871 private IOStream? inner = null; 872 private string? error = null; 873 874 private bool read_closed = false; 875 private bool write_closed = false; 876 877 private class OnSetInnerCallback { 878 public SourceFunc callback; 879 public int io_priority; 880 } 881 882 Gee.List<OnSetInnerCallback> callbacks = new ArrayList<OnSetInnerCallback>(); 883 884 public Connection(Session session) { 885 this.input = new Input(this); 886 this.output = new Output(this); 887 this.session = session; 888 } 889 890 public void set_inner(IOStream inner) { 891 assert(this.inner == null); 892 this.inner = inner; 893 foreach (OnSetInnerCallback c in callbacks) { 894 Idle.add((owned) c.callback, c.io_priority); 895 } 896 callbacks = null; 897 } 898 899 public void on_terminated_by_jingle(string reason) { 900 if (error == null) { 901 close_async.begin(); 902 error = reason; 903 } 904 } 905 906 private void check_for_errors() throws IOError { 907 if (error != null) { 908 throw new IOError.CLOSED(error); 909 } 910 } 911 private async void wait_and_check_for_errors(int io_priority, Cancellable? cancellable = null) throws IOError { 912 while (true) { 913 check_for_errors(); 914 if (inner != null) { 915 return; 916 } 917 SourceFunc callback = wait_and_check_for_errors.callback; 918 ulong id = 0; 919 if (cancellable != null) { 920 id = cancellable.connect(() => callback()); 921 } 922 callbacks.add(new OnSetInnerCallback() { callback=(owned)callback, io_priority=io_priority}); 923 yield; 924 if (cancellable != null) { 925 cancellable.disconnect(id); 926 } 927 } 928 } 929 private void handle_connection_error(IOError error) { 930 Session? strong = session; 931 if (strong != null) { 932 strong.on_connection_error(error); 933 } 934 } 935 private void handle_connection_close() { 936 Session? strong = session; 937 if (strong != null) { 938 strong.on_connection_close(); 939 } 940 } 941 942 public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { 943 yield wait_and_check_for_errors(io_priority, cancellable); 944 try { 945 return yield inner.input_stream.read_async(buffer, io_priority, cancellable); 946 } catch (IOError e) { 947 handle_connection_error(e); 948 throw e; 949 } 950 } 951 public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { 952 yield wait_and_check_for_errors(io_priority, cancellable); 953 try { 954 return yield inner.output_stream.write_async(buffer, io_priority, cancellable); 955 } catch (IOError e) { 956 handle_connection_error(e); 957 throw e; 958 } 959 } 960 public bool close_read(Cancellable? cancellable = null) throws IOError { 961 check_for_errors(); 962 if (read_closed) { 963 return true; 964 } 965 close_read_async.begin(GLib.Priority.DEFAULT, cancellable); 966 return true; 967 } 968 public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { 969 debug("Closing Jingle input stream"); 970 yield wait_and_check_for_errors(io_priority, cancellable); 971 if (read_closed) { 972 return true; 973 } 974 read_closed = true; 975 IOError error = null; 976 bool result = true; 977 try { 978 result = yield inner.input_stream.close_async(io_priority, cancellable); 979 } catch (IOError e) { 980 if (error == null) { 981 error = e; 982 } 983 } 984 try { 985 result = (yield close_if_both_closed(io_priority, cancellable)) && result; 986 } catch (IOError e) { 987 if (error == null) { 988 error = e; 989 } 990 } 991 if (error != null) { 992 handle_connection_error(error); 993 throw error; 994 } 995 return result; 996 } 997 public bool close_write(Cancellable? cancellable = null) throws IOError { 998 check_for_errors(); 999 if (write_closed) { 1000 return true; 1001 } 1002 close_write_async.begin(GLib.Priority.DEFAULT, cancellable); 1003 return true; 1004 } 1005 public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { 1006 yield wait_and_check_for_errors(io_priority, cancellable); 1007 if (write_closed) { 1008 return true; 1009 } 1010 write_closed = true; 1011 IOError error = null; 1012 bool result = true; 1013 try { 1014 result = yield inner.output_stream.close_async(io_priority, cancellable); 1015 } catch (IOError e) { 1016 if (error == null) { 1017 error = e; 1018 } 1019 } 1020 try { 1021 result = (yield close_if_both_closed(io_priority, cancellable)) && result; 1022 } catch (IOError e) { 1023 if (error == null) { 1024 error = e; 1025 } 1026 } 1027 if (error != null) { 1028 handle_connection_error(error); 1029 throw error; 1030 } 1031 return result; 1032 } 1033 private async bool close_if_both_closed(int io_priority, Cancellable? cancellable = null) throws IOError { 1034 if (read_closed && write_closed) { 1035 handle_connection_close(); 1036 //return yield inner.close_async(io_priority, cancellable); 1037 } 1038 return true; 1039 } 1040} 1041 1042public class Flag : XmppStreamFlag { 1043 public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "jingle"); 1044 1045 private HashMap<string, Session> sessions = new HashMap<string, Session>(); 1046 1047 public void add_session(Session session) { 1048 sessions[session.sid] = session; 1049 } 1050 public Session? get_session(string sid) { 1051 return sessions.has_key(sid) ? sessions[sid] : null; 1052 } 1053 public void remove_session(string sid) { 1054 sessions.unset(sid); 1055 } 1056 1057 public override string get_ns() { return NS_URI; } 1058 public override string get_id() { return IDENTITY.id; } 1059} 1060 1061} 1062