## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2019-2021 VMware, Inc. or its affiliates. All rights reserved.

defmodule RabbitMQ.CLI.Diagnostics.Commands.ConsumeEventStreamCommand do
  @moduledoc """
  Streams internal events from a running node.

  The command registers the calling CLI process with `:rabbit_event_consumer`
  on the target node and then lazily yields every event payload delivered to
  that process, stopping after a message tagged `:finished` has been received.
  """
  @behaviour RabbitMQ.CLI.CommandBehaviour

  # Command-line switches accepted by this command and their short aliases.
  def switches(), do: [duration: :integer, pattern: :string, timeout: :integer]
  def aliases(), do: [d: :duration, t: :timeout]

  # Fills in defaults for options the user did not pass: stream with no time
  # limit, match every event, and run quietly. User-supplied options win.
  def merge_defaults(args, opts) do
    {args, Map.merge(%{duration: :infinity, pattern: ".*", quiet: true}, opts)}
  end

  use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments

  # Registers this process as an event consumer on `node_name` and returns a
  # lazy stream of event payloads.
  #
  # Returns:
  #   * a `Stream` when the registration call answers `{:ok, ref}` with the
  #     reference generated here;
  #   * whatever else the RPC call returned, unchanged, otherwise.
  def run([], %{node: node_name, timeout: timeout, duration: duration, pattern: pattern}) do
    pid = self()
    # The reference ties incoming messages to this particular subscription.
    ref = make_ref()

    subscribed =
      :rabbit_misc.rpc_call(
        node_name,
        :rabbit_event_consumer,
        :register,
        [pid, ref, duration, pattern],
        timeout
      )

    case subscribed do
      {:ok, ^ref} ->
        # The accumulator is the tag of the previously received message: once
        # it is `:finished` the stream ends, otherwise we block for the next one.
        #
        # NOTE(review): the `:confinue` spelling is matched against incoming
        # messages, so it must stay in step with what the sender emits —
        # confirm against `rabbit_event_consumer` before renaming it.
        Stream.unfold(:confinue, fn
          :finished ->
            nil

          :confinue ->
            receive do
              {^ref, data, :finished} -> {data, :finished}
              {^ref, data, :confinue} -> {data, :confinue}
            end
        end)

      error ->
        error
    end
  end

  use RabbitMQ.CLI.DefaultOutput

  def formatter(), do: RabbitMQ.CLI.Formatters.JsonStream

  def printer(), do: RabbitMQ.CLI.Printers.StdIORaw

  def help_section(), do: :observability_and_health_checks

  def description(), do: "Streams internal events from a running node. Output is jq-compatible."

  def usage, do: "consume_event_stream [--duration|-d <seconds>] [--pattern <pattern>]"

  # Explains the placeholders that appear in `usage/0`.
  def usage_additional() do
    [
      ["<seconds>", "duration in seconds to stream events. Defaults to infinity"],
      ["<pattern>", "regular expression to pick events"]
    ]
  end

  # Banner for an unbounded stream (the default duration).
  def banner([], %{node: node_name, duration: :infinity}) do
    "Streaming events from node #{node_name} ..."
  end

  # Banner for a stream limited to `duration` seconds.
  def banner([], %{node: node_name, duration: duration}) do
    "Streaming events from node #{node_name} for #{duration} seconds ..."
  end
end