1<?php
2
3/*
4 * This file is part of the Symfony package.
5 *
6 * (c) Fabien Potencier <fabien@symfony.com>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11
12namespace Symfony\Component\VarDumper\Caster;
13
14use RdKafka\Conf;
15use RdKafka\Exception as RdKafkaException;
16use RdKafka\KafkaConsumer;
17use RdKafka\Message;
18use RdKafka\Metadata\Broker as BrokerMetadata;
19use RdKafka\Metadata\Collection as CollectionMetadata;
20use RdKafka\Metadata\Partition as PartitionMetadata;
21use RdKafka\Metadata\Topic as TopicMetadata;
22use RdKafka\Topic;
23use RdKafka\TopicConf;
24use RdKafka\TopicPartition;
25use Symfony\Component\VarDumper\Cloner\Stub;
26
27/**
28 * Casts RdKafka related classes to array representation.
29 *
30 * @author Romain Neutron <imprec@gmail.com>
31 */
32class RdKafkaCaster
33{
34    public static function castKafkaConsumer(KafkaConsumer $c, array $a, Stub $stub, bool $isNested)
35    {
36        $prefix = Caster::PREFIX_VIRTUAL;
37
38        try {
39            $assignment = $c->getAssignment();
40        } catch (RdKafkaException $e) {
41            $assignment = [];
42        }
43
44        $a += [
45            $prefix.'subscription' => $c->getSubscription(),
46            $prefix.'assignment' => $assignment,
47        ];
48
49        $a += self::extractMetadata($c);
50
51        return $a;
52    }
53
54    public static function castTopic(Topic $c, array $a, Stub $stub, bool $isNested)
55    {
56        $prefix = Caster::PREFIX_VIRTUAL;
57
58        $a += [
59            $prefix.'name' => $c->getName(),
60        ];
61
62        return $a;
63    }
64
65    public static function castTopicPartition(TopicPartition $c, array $a)
66    {
67        $prefix = Caster::PREFIX_VIRTUAL;
68
69        $a += [
70            $prefix.'offset' => $c->getOffset(),
71            $prefix.'partition' => $c->getPartition(),
72            $prefix.'topic' => $c->getTopic(),
73        ];
74
75        return $a;
76    }
77
78    public static function castMessage(Message $c, array $a, Stub $stub, bool $isNested)
79    {
80        $prefix = Caster::PREFIX_VIRTUAL;
81
82        $a += [
83            $prefix.'errstr' => $c->errstr(),
84        ];
85
86        return $a;
87    }
88
89    public static function castConf(Conf $c, array $a, Stub $stub, bool $isNested)
90    {
91        $prefix = Caster::PREFIX_VIRTUAL;
92
93        foreach ($c->dump() as $key => $value) {
94            $a[$prefix.$key] = $value;
95        }
96
97        return $a;
98    }
99
100    public static function castTopicConf(TopicConf $c, array $a, Stub $stub, bool $isNested)
101    {
102        $prefix = Caster::PREFIX_VIRTUAL;
103
104        foreach ($c->dump() as $key => $value) {
105            $a[$prefix.$key] = $value;
106        }
107
108        return $a;
109    }
110
111    public static function castRdKafka(\RdKafka $c, array $a, Stub $stub, bool $isNested)
112    {
113        $prefix = Caster::PREFIX_VIRTUAL;
114
115        $a += [
116            $prefix.'out_q_len' => $c->getOutQLen(),
117        ];
118
119        $a += self::extractMetadata($c);
120
121        return $a;
122    }
123
124    public static function castCollectionMetadata(CollectionMetadata $c, array $a, Stub $stub, bool $isNested)
125    {
126        $a += iterator_to_array($c);
127
128        return $a;
129    }
130
131    public static function castTopicMetadata(TopicMetadata $c, array $a, Stub $stub, bool $isNested)
132    {
133        $prefix = Caster::PREFIX_VIRTUAL;
134
135        $a += [
136            $prefix.'name' => $c->getTopic(),
137            $prefix.'partitions' => $c->getPartitions(),
138        ];
139
140        return $a;
141    }
142
143    public static function castPartitionMetadata(PartitionMetadata $c, array $a, Stub $stub, bool $isNested)
144    {
145        $prefix = Caster::PREFIX_VIRTUAL;
146
147        $a += [
148            $prefix.'id' => $c->getId(),
149            $prefix.'err' => $c->getErr(),
150            $prefix.'leader' => $c->getLeader(),
151        ];
152
153        return $a;
154    }
155
156    public static function castBrokerMetadata(BrokerMetadata $c, array $a, Stub $stub, bool $isNested)
157    {
158        $prefix = Caster::PREFIX_VIRTUAL;
159
160        $a += [
161            $prefix.'id' => $c->getId(),
162            $prefix.'host' => $c->getHost(),
163            $prefix.'port' => $c->getPort(),
164        ];
165
166        return $a;
167    }
168
169    private static function extractMetadata($c)
170    {
171        $prefix = Caster::PREFIX_VIRTUAL;
172
173        try {
174            $m = $c->getMetadata(true, null, 500);
175        } catch (RdKafkaException $e) {
176            return [];
177        }
178
179        return [
180            $prefix.'orig_broker_id' => $m->getOrigBrokerId(),
181            $prefix.'orig_broker_name' => $m->getOrigBrokerName(),
182            $prefix.'brokers' => $m->getBrokers(),
183            $prefix.'topics' => $m->getTopics(),
184        ];
185    }
186}
187