// Copyright 2020 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "cast/streaming/session_messager.h" #include "absl/strings/ascii.h" #include "cast/common/public/message_port.h" #include "cast/streaming/message_fields.h" #include "util/json/json_helpers.h" #include "util/json/json_serialization.h" #include "util/osp_logging.h" namespace openscreen { namespace cast { SessionMessager::SessionMessager(MessagePort* message_port, std::string source_id, ErrorCallback cb) : message_port_(message_port), error_callback_(std::move(cb)) { OSP_DCHECK(message_port_); message_port_->SetClient(this, source_id); } SessionMessager::~SessionMessager() { message_port_->ResetClient(); } void SessionMessager::SetHandler(std::string message_type, SessionMessager::MessageCallback cb) { OSP_DCHECK(std::none_of( callbacks_.begin(), callbacks_.end(), [message_type](std::pair pair) { return pair.first == message_type; })); callbacks_.emplace_back(message_type, std::move(cb)); } Error SessionMessager::SendMessage(SessionMessager::Message message) { if (message.message_namespace != kCastWebrtcNamespace) { return Error(Error::Code::kParameterInvalid, "Invalid namespace"); } if (message.body.isNull()) { return Error(Error::Code::kParameterInvalid, "No body"); } message.body[kSequenceNumber] = message.sequence_number; auto body_or_error = json::Stringify(message.body); if (body_or_error.is_error()) { return std::move(body_or_error.error()); } OSP_DVLOG << "Sending message: DESTINATION[" << message.sender_id << "], NAMESPACE[" << message.message_namespace << "], BODY:\n" << body_or_error.value(); message_port_->PostMessage(message.sender_id, message.message_namespace, body_or_error.value()); return Error::None(); } void SessionMessager::OnMessage(const std::string& source_id, const std::string& message_namespace, const std::string& message) { ErrorOr message_json = json::Parse(message); if (message_namespace != kCastWebrtcNamespace) { OSP_DLOG_WARN << "Received message with unknown namespace: " << message_namespace; return; } if (!message_json) { OSP_DLOG_WARN << "Received an invalid message: " << message; error_callback_(Error::Code::kJsonParseError); return; } OSP_DVLOG << "Received a message: " << message; int sequence_number; if (!json::ParseAndValidateInt(message_json.value()[kSequenceNumber], &sequence_number)) { OSP_DLOG_WARN << "Invalid message sequence number"; return; } std::string type; if (!json::ParseAndValidateString(message_json.value()[kMessageType], &type)) { OSP_DLOG_WARN << "Invalid message key"; return; } // Not all messages have results, but if they do the result must be OK. std::string result; if (!json::ParseAndValidateString(message_json.value()[kResult], &result)) { result.clear(); } for (const auto& pair : callbacks_) { if (pair.first == type) { // Currently all body keys are the lowercase version of the message type // key. This may need to be refactored if this is no longer the case. absl::AsciiStrToLower(&type); Json::Value body; if (result.empty() || result == kResultOk) { body = message_json.value()[type]; } else { body = message_json.value()[kErrorMessageBody]; } pair.second(Message{source_id.data(), message_namespace.data(), sequence_number, std::move(body)}); return; } } } void SessionMessager::OnError(Error error) { OSP_DLOG_WARN << "Received an error in the session messager: " << error; } } // namespace cast } // namespace openscreen