1## This Source Code Form is subject to the terms of the Mozilla Public
2## License, v. 2.0. If a copy of the MPL was not distributed with this
3## file, You can obtain one at https://mozilla.org/MPL/2.0/.
4##
5## Copyright (c) 2019-2021 VMware, Inc. or its affiliates.  All rights reserved.
6
defmodule RabbitMQ.CLI.Diagnostics.Commands.ConsumeEventStreamCommand do
  @moduledoc """
  Streams internal events from the target node.

  The calling process is registered with `:rabbit_event_consumer` on the
  target node and the events it then receives are exposed as a lazy stream,
  which is rendered as a stream of JSON documents.
  """
  @behaviour RabbitMQ.CLI.CommandBehaviour

  # Command-line switches understood by this command and their short aliases.
  def switches(), do: [duration: :integer, pattern: :string, timeout: :integer]
  def aliases(), do: [d: :duration, t: :timeout]

  # Fills in defaults for options the caller did not supply: stream forever,
  # match every event, and run quietly. Values present in `opts` win because
  # `opts` is the second argument to `Map.merge/2`.
  # NOTE(review): `quiet: true` presumably suppresses the banner so the output
  # stays machine-readable -- confirm against the CLI core.
  def merge_defaults(args, opts) do
    defaults = %{duration: :infinity, pattern: ".*", quiet: true}
    {args, Map.merge(defaults, opts)}
  end

  use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments

  # Registers the calling process as an event consumer on `node_name` and, on
  # success, returns a lazy stream of the events delivered to it.
  #
  # Options read from the map:
  #   * `:node`     - node to register on
  #   * `:timeout`  - timeout passed to the RPC call
  #   * `:duration` - forwarded to `:rabbit_event_consumer.register/4`
  #   * `:pattern`  - forwarded to `:rabbit_event_consumer.register/4`
  #
  # Returns a `Stream` when registration answers `{:ok, ref}` with the
  # reference created here; any other RPC result is returned unchanged.
  def run([], %{node: node_name, timeout: timeout, duration: duration, pattern: pattern}) do
    pid = self()
    ref = make_ref()

    subscribed =
      :rabbit_misc.rpc_call(
        node_name,
        :rabbit_event_consumer,
        :register,
        [pid, ref, duration, pattern],
        timeout
      )

    case subscribed do
      {:ok, ^ref} ->
        # The `:confinue` spelling is intentional here: it is matched against
        # the tag carried by incoming messages, so it is part of the message
        # protocol (presumably mirroring what `:rabbit_event_consumer` sends)
        # and must not be "corrected" on this side alone.
        Stream.unfold(:confinue, fn
          # The previous message was flagged as the last one: end the stream.
          :finished ->
            nil

          # Block until the next message tagged with our reference arrives.
          # There is no `after` clause, so this waits indefinitely.
          :confinue ->
            receive do
              {^ref, data, :finished} -> {data, :finished}
              {^ref, data, :confinue} -> {data, :confinue}
            end
        end)

      error ->
        error
    end
  end

  use RabbitMQ.CLI.DefaultOutput

  # Output is formatted as a JSON stream and written raw to standard output.
  def formatter(), do: RabbitMQ.CLI.Formatters.JsonStream

  def printer(), do: RabbitMQ.CLI.Printers.StdIORaw

  def help_section(), do: :observability_and_health_checks

  def description(), do: "Streams internal events from a running node. Output is jq-compatible."

  def usage, do: "consume_event_stream [--duration|-d <seconds>] [--pattern <pattern>]"

  # Describes the placeholders used in `usage/0`.
  def usage_additional() do
    [
      ["<seconds>", "duration in seconds to stream events. Defaults to infinity"],
      ["<pattern>", "regular expression to pick events"]
    ]
  end

  # Banner text; the first clause covers the default, unbounded duration.
  def banner([], %{node: node_name, duration: :infinity}) do
    "Streaming events from node #{node_name} ..."
  end

  def banner([], %{node: node_name, duration: duration}) do
    "Streaming events from node #{node_name} for #{duration} seconds ..."
  end
end
72