1package Kafka::Protocol;
2
3=head1 NAME
4
5Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.
6
7=head1 VERSION
8
9This documentation refers to C<Kafka::Protocol> version 0.8010 .
10
11=cut
12
13#-- Pragmas --------------------------------------------------------------------
14
15use 5.010;
16use strict;
17use warnings;
18
19# ENVIRONMENT ------------------------------------------------------------------
20
21our $VERSION = '0.8010';
22
23use Exporter qw(
24    import
25);
26our @EXPORT_OK = qw(
27    decode_fetch_response
28    decode_metadata_response
29    decode_offset_response
30    decode_produce_response
31    encode_fetch_request
32    encode_metadata_request
33    encode_offset_request
34    encode_produce_request
35    _decode_MessageSet_template
36    _decode_MessageSet_array
37    _encode_MessageSet_array
38    _encode_string
39    _pack64
40    _unpack64
41    _verify_string
42    $APIVERSION
43    $BAD_OFFSET
44    $COMPRESSION_CODEC_MASK
45    $CONSUMERS_REPLICAID
46    $NULL_BYTES_LENGTH
47    $_int64_template
48);
49
50#-- load the modules -----------------------------------------------------------
51
52use Compress::Snappy;
53use Const::Fast;
54use IO::Compress::Gzip qw(
55    gzip
56    $GzipError
57);
58use IO::Uncompress::Gunzip qw(
59    gunzip
60    $GunzipError
61);
62use Params::Util qw(
63    _ARRAY
64    _HASH
65    _SCALAR
66    _STRING
67);
68use Scalar::Util qw(
69    dualvar
70);
71use String::CRC32;
72
73use Kafka qw(
74    $BITS64
75    $BLOCK_UNTIL_IS_COMMITTED
76    $COMPRESSION_GZIP
77    $COMPRESSION_NONE
78    $COMPRESSION_SNAPPY
79    $DEFAULT_MAX_WAIT_TIME
80    %ERROR
81    $ERROR_COMPRESSION
82    $ERROR_MISMATCH_ARGUMENT
83    $ERROR_NOT_BINARY_STRING
84    $ERROR_REQUEST_OR_RESPONSE
85    $NOT_SEND_ANY_RESPONSE
86    $RECEIVE_EARLIEST_OFFSETS
87    $RECEIVE_LATEST_OFFSET
88    $WAIT_WRITTEN_TO_LOCAL_LOG
89);
90use Kafka::Exceptions;
91use Kafka::Internals qw(
92    $APIKEY_FETCH
93    $APIKEY_METADATA
94    $APIKEY_OFFSET
95    $APIKEY_PRODUCE
96    $PRODUCER_ANY_OFFSET
97);
98
99#-- declarations ---------------------------------------------------------------
100
101=head1 SYNOPSIS
102
103    use 5.010;
104    use strict;
105    use warnings;
106
107    use Data::Compare;
108    use Kafka qw(
109        $COMPRESSION_NONE
110        $ERROR_NO_ERROR
111        $REQUEST_TIMEOUT
112        $WAIT_WRITTEN_TO_LOCAL_LOG
113    );
114    use Kafka::Internals qw(
115        $PRODUCER_ANY_OFFSET
116    );
117    use Kafka::Protocol qw(
118        decode_produce_response
119        encode_produce_request
120    );
121
    # an encoded produce request hex stream
123    my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );
124
125    # a decoded produce request
126    my $decoded = {
127        CorrelationId                       => 4,
128        ClientId                            => q{},
129        RequiredAcks                        => $WAIT_WRITTEN_TO_LOCAL_LOG,
        Timeout                             => $REQUEST_TIMEOUT * 1000, # ms (the hex stream above encodes 0x5dc = 1500)
131        topics                              => [
132            {
133                TopicName                   => 'mytopic',
134                partitions                  => [
135                    {
136                        Partition           => 0,
137                        MessageSet              => [
138                            {
139                                Offset          => $PRODUCER_ANY_OFFSET,
140                                MagicByte       => 0,
141                                Attributes      => $COMPRESSION_NONE,
142                                Key             => q{},
143                                Value           => 'Hello!',
144                            },
145                        ],
146                    },
147                ],
148            },
149        ],
150    };
151
152    my $encoded_request = encode_produce_request( $decoded );
153    say 'encoded correctly' if $encoded_request eq $encoded;
154
    # an encoded produce response hex stream
156    $encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );
157
158    # a decoded produce response
159    $decoded = {
160        CorrelationId                           => 4,
161        topics                                  => [
162            {
163                TopicName                       => 'mytopic',
164                partitions                      => [
165                    {
166                        Partition               => 0,
167                        ErrorCode               => $ERROR_NO_ERROR,
168                        Offset                  => 0,
169                    },
170                ],
171            },
172        ],
173    };
174
175    my $decoded_response = decode_produce_response( \$encoded );
176    say 'decoded correctly' if Compare( $decoded_response, $decoded );
177
178    # more examples, see t/*_decode_encode.t
179
180=head1 DESCRIPTION
181
182This module is not a user module.
183
184In order to achieve better performance,
185functions of this module do not perform arguments validation.
186
187The main features of the C<Kafka::Protocol> module are:
188
189=over 3
190
191=item *
192
193Supports parsing the Apache Kafka protocol.
194
195=item *
196
Supports Apache Kafka Requests and Responses.
Within this package we currently support
access to PRODUCE, FETCH, OFFSET and METADATA Requests and Responses.
200
201=item *
202
203Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.
204
205=back
206
207=cut
208
209# A Guide To The Kafka Protocol 0.8:
210# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
211#
212# -- Protocol Primitive Types
213# int8, int16, int32, int64
214#     Signed integers
215#     stored in big endian order.
216# bytes, string
217#     consist of a signed integer
218#     giving a length N
219#     followed by N bytes of content.
220#     A length of -1 indicates null.
221#     string uses an int16 for its size,
222#     and bytes uses an int32.
223# Arrays
224#     These will always be encoded as an int32 size containing the length N
225#     followed by N repetitions of the structure
226#     which can itself be made up of other primitive types.
227#
228# -- N.B.
229# - The response will always match the paired request
230# - One structure common to both the produce and fetch requests is the message set format.
231# - MessageSets are not preceded by an int32 like other array elements in the protocol.
232# - A message set is also the unit of compression in Kafka,
233#     and we allow messages to recursively contain compressed message sets.
234#
235# -- Protocol Fields
236# ApiKey => int16                 That identifies the API being invoked
237# ApiVersion => int16             This is a numeric version number for this api.
238#                                Currently the supported version for all APIs is 0.
239# Attributes => int8              Metadata attributes about the message.
240#                                 The lowest 2 bits contain the compression codec used for the message.
241# ClientId => string              This is a user supplied identifier for the client application.
242# CorrelationId => int32          This is a user-supplied integer.
243#                                 It will be passed back in the response by the server, unmodified.
244#                                 It is useful for matching request and response between the client and server.
245# Crc => int32                    The CRC32 of the remainder of the message bytes.
246# ErrorCode => int16              The error from this partition, if any.
247#                                 Errors are given on a per-partition basis
248#                                     because a given partition may be unavailable or maintained on a different host,
249#                                     while others may have successfully accepted the produce request.
250# FetchOffset => int64            The offset to begin this fetch from.
251# HighwaterMarkOffset => int64    The offset at the end of the log for this partition.
252#                                 This can be used by the client to determine how many messages behind the end of the log they are.
253#                                 - 0.8 documents: Replication design
254#                                 The high watermark is the offset of the last committed message.
255#                                 Each log is periodically synced to disks.
256#                                 Data before the flushed offset is guaranteed to be persisted on disks.
257#                                 As we will see, the flush offset can be before or after high watermark.
258#                                 - 0.7 documents: Wire protocol
259#                                 If the last segment file for the partition is not empty and was modified earlier than TIME,
260#                                         it will return both the first offset for that segment and the high water mark.
261#                                 The high water mark is not the offset of the last message,
262#                                         but rather the offset that the next message sent to the partition will be written to.
263# Host => string                  The brokers hostname
# Isr => [ReplicaId]              The subset of the replicas that are "caught up" to the leader - a set of in-sync replicas (ISR)
265# Key => bytes                    An optional message key
266#                                 The key can be null.
267# Leader => int32                 The node id for the kafka broker currently acting as leader for this partition.
268#                                 If no leader exists because we are in the middle of a leader election this id will be -1.
269# MagicByte => int8               A version id used to allow backwards compatible evolution of the message binary format.
270# MaxBytes => int32               The maximum bytes to include in the message set for this partition.
# MaxNumberOfOffsets => int32     Kafka returns up to 'MaxNumberOfOffsets' offsets.
272# MaxWaitTime => int32            The maximum amount of time (ms)
273#                                     to block waiting
274#                                     if insufficient data is available at the time the request is issued.
275# MessageSetSize => int32         The size in bytes of the message set for this partition
276# MessageSize => int32            The size of the subsequent request or response message in bytes
277# MinBytes => int32               The minimum number of bytes of messages that must be available to give a response.
278#                                 If the client sets this to 0 the server will always respond immediately.
279#                                 If this is set to 1,
280#                                     the server will respond as soon
281#                                     as at least one partition
282#                                     has at least 1 byte of data
283#                                     or the specified timeout occurs.
#                                 By setting higher values
#                                     in combination with the timeout
#                                     the consumer can tune for throughput and trade a little additional latency
#                                     for reading only large chunks of data.
287# NodeId => int32                 The id of the broker.
288#                                 This must be set to a unique integer for each broker.
289# Offset => int64                 The offset used in kafka as the log sequence number.
290#                                 When the producer is sending messages it doesn't actually know the offset
291#                                     and can fill in any value here it likes.
292# Partition => int32              The id of the partition the fetch is for
293#                                     or the partition that data is being published to
294#                                     or the partition this response entry corresponds to.
295# Port => int32                   The brokers port
296# ReplicaId => int32              Indicates the node id of the replica initiating this request.
297#                                 Normal client consumers should always specify this as -1 as they have no node id.
298# Replicas => [ReplicaId]         The set of alive nodes that currently acts as slaves for the leader for this partition.
299# RequiredAcks => int16           Indicates how many acknowledgements the servers should receive
300#                                     before responding to the request.
301#                                 If it is 0 the server does not send any response.
#                                 If it is 1, the server will wait until the data is written to the local log before sending a response.
303#                                 If it is -1 the server will block until the message is committed by all in sync replicas before sending a response.
304#                                 For any number > 1 the server will block waiting for this number of acknowledgements to occur
305#                                 (but the server will never wait for more acknowledgements than there are in-sync replicas).
306# Size => int32                   The size of the subsequent request or response message in bytes
307# Time => int64                   Used to ask for all messages before a certain time (ms).
308#                                 There are two special values.
309#                                 Specify -1 to receive the latest offset (this will only ever return one offset).
310#                                 Specify -2 to receive the earliest available offsets.
311# Timeout => int32                This provides a maximum time (ms) the server can await the receipt
312#                                     of the number of acknowledgements in RequiredAcks.
313#                                 The timeout is not an exact limit on the request time for a few reasons:
314#                                 (1) it does not include network latency,
315#                                 (2) the timer begins at the beginning of the processing of this request
316#                                     so if many requests are queued due to server overload
317#                                     that wait time will not be included,
318#                                 (3) we will not terminate a local write
319#                                     so if the local write time exceeds this timeout it will not be respected.
320#                                 To get a hard timeout of this type the client should use the socket timeout.
321# TopicName => string             The name of the topic.
322# Value => bytes                  The actual message contents
323#                                 Kafka supports recursive messages in which case this may itself contain a message set.
324#                                 The message can be null.
325
# Choose, once at load time, how 64-bit protocol values are packed/unpacked.
#   $BITS64 true  : pack/unpack handle big-endian signed quads natively ("q>"),
#                   so _unpack64 returns its argument unchanged and
#                   _pack64 is a plain pack().
#   $BITS64 false : the 8 raw bytes travel as "a[8]" and conversion is
#                   delegated to Kafka::Int64::unpackq / Kafka::Int64::packq.
our $_int64_template;                           # Used to unpack a 64 bit value

unless ( $BITS64 ) {
    # Kafka::Int64 is only needed (and only loaded) without native 64-bit integers.
    eval q{ require Kafka::Int64; }                 ## no critic
        or die "Cannot load Kafka::Int64 : $@";
}

$_int64_template = $BITS64
    ? q{q>}
    : q{a[8]};

# unpack a big-endian signed quad (64-bit) value
*_unpack64 = $BITS64
    ? sub { $_[0] }
    : \&Kafka::Int64::unpackq;

# pack a big-endian signed quad (64-bit) value
*_pack64 = $BITS64
    ? sub { pack( q{q>}, $_[0] ) }
    : \&Kafka::Int64::packq;
343
344=head2 EXPORT
345
346The following constants are available for export
347
348=cut
349
350=head3 C<$APIVERSION>
351
352According to Apache Kafka documentation: 'This is a numeric version number for this api.
353Currently the supported version for all APIs is 0 .'
354
355=cut
356const our $APIVERSION                   => 0;
357
358# Attributes
359
360# According to Apache Kafka documentation:
361# Attributes - Metadata attributes about the message.
362# The lowest 2 bits contain the compression codec used for the message.
363const our $COMPRESSION_CODEC_MASK       => 0b11;
364
365=head3 C<$CONSUMERS_REPLICAID>
366
367According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'
368
369=cut
370const our $CONSUMERS_REPLICAID          => -1;
371
372=head3 C<$NULL_BYTES_LENGTH>
373
374According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'
375
376=cut
377const our $NULL_BYTES_LENGTH            => -1;
378
379=head3 C<$BAD_OFFSET>
380
381According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset
382and can fill in any value here it likes.'
383
384=cut
385const our $BAD_OFFSET                   => -1;
386
# pack/unpack templates for the fixed-layout parts of the protocol.
# Most templates are paired with a byte length; the number in parentheses
# after each field name is that field's size in bytes.

# Common request header:
#   Size, ApiKey (2), ApiVersion (2), CorrelationId (4), ClientId length (2)
my $_Request_header_template            = q{l>s>s>l>s>};
my $_Request_header_length              = 10;   # 'Size' is not included in the calculation of length

# PRODUCE request header:
#   RequiredAcks (2), Timeout (4), topics array size (4)
my $_ProduceRequest_header_template     = q{s>l>l>};
my $_ProduceRequest_header_length       = 10;

# MessageSet entry prefix:
#   Offset (8, raw bytes), MessageSize (4)
my $_MessageSet_template                = q{a[8]l>};
my $_MessageSet_length                  = 12;

# FETCH request header:
#   ReplicaId (4), MaxWaitTime (4), MinBytes (4), topics array size (4)
my $_FetchRequest_header_template       = q{l>l>l>l>};
my $_FetchRequest_header_length         = 16;

# FETCH request per-partition body:
#   Partition (4), FetchOffset (8, raw bytes), MaxBytes (4)
my $_FetchRequest_body_template         = q{l>a[8]l>};
my $_FetchRequest_body_length           = 16;

# OFFSET request header:
#   ReplicaId (4), topics array size (4)
my $_OffsetRequest_header_template      = q{l>l>};
my $_OffsetRequest_header_length        = 8;

# OFFSET request per-partition body:
#   Partition (4), Time (8, raw bytes), MaxNumberOfOffsets (4)
my $_OffsetRequest_body_template        = q{l>a[8]l>};
my $_OffsetRequest_body_length          = 16;

# FETCH response header:
#   Size (skipped), CorrelationId (4), topics array size (4)
my $_FetchResponse_header_template      = q{x[l]l>l>};
my $_FetchResponse_header_length        = 8;

# Message:
#   Offset (8), MessageSize, Crc, MagicByte, Attributes, Key length
my $_Message_template                   = $_int64_template . q{l>l>ccl>};
my $_Message_length                     = 8;    # Only Offset length

# FETCH response topic body:
#   TopicName, partitions array size,
#   Partition (4), ErrorCode (2), HighwaterMarkOffset (8)
my $_FetchResponse_topic_body_template  = q{s>/al>l>s>} . $_int64_template;
my $_FetchResponse_topic_body_length    = 14;   # without TopicName and partitions array size

# Key or Value: step back over the int32 length, then read it as the byte count
my $_Key_or_Value_template              = q{X[l]l>/a};
457
458#-- public functions -----------------------------------------------------------
459
460=head2 FUNCTIONS
461
The following functions are available for the C<Kafka::Protocol> module.
463
464=cut
465
466# PRODUCE Request --------------------------------------------------------------
467
468=head3 C<encode_produce_request( $Produce_Request, $compression_codec )>
469
Encodes the arguments and returns the encoded binary string
representing a Request buffer.

This function takes arguments. The following arguments are currently recognized:
474
475=over 3
476
477=item C<$Produce_Request>
478
479C<$Produce_Request> is a reference to the hash representing
480the structure of the PRODUCE Request (examples see C<t/*_decode_encode.t>).
481
482=item C<$compression_codec>
483
484Optional.
485
486C<$compression_codec> sets the required type of C<$messages> compression,
487if the compression is desirable.
488
489Supported codecs:
490L<$COMPRESSION_NONE|Kafka/$COMPRESSION_NONE>,
491L<$COMPRESSION_GZIP|Kafka/$COMPRESSION_GZIP>,
492L<$COMPRESSION_SNAPPY|Kafka/$COMPRESSION_SNAPPY>.
493
494=back
495
496=cut
# Builds the binary PRODUCE request.
#   $Produce_Request   - hash reference describing the request
#                        (RequiredAcks, Timeout, topics => [ { TopicName, partitions => [ ... ] } ]).
#   $compression_codec - optional; handed unchanged to _encode_MessageSet_array.
# Returns the packed request string.
sub encode_produce_request {
    my ( $Produce_Request, $compression_codec ) = @_;

    # Encoding state handed to the _encode_* helpers. This function appends to
    # 'template' (pack template), 'len' (packed as the leading Size value)
    # and 'data' (the values to pack).
    my @data;
    my $request = { data => \@data };

    # Size, ApiKey, ApiVersion, CorrelationId, ClientId
    _encode_request_header( $request, $APIKEY_PRODUCE, $Produce_Request );

    # RequiredAcks, Timeout, topics array size
    my $topics = $Produce_Request->{topics};
    push @data,
        $Produce_Request->{RequiredAcks},
        $Produce_Request->{Timeout},
        scalar @$topics;
    $request->{template} .= $_ProduceRequest_header_template;
    $request->{len}      += $_ProduceRequest_header_length;

    for my $topic_entry ( @$topics ) {
        # string length, then TopicName
        $request->{template} .= q{s>};
        $request->{len}      += 2;
        _encode_string( $request, $topic_entry->{TopicName} );

        # partitions array size
        my $partitions = $topic_entry->{partitions};
        push @data, scalar @$partitions;
        $request->{template} .= q{l>};
        $request->{len}      += 4;

        for my $partition_entry ( @$partitions ) {
            # Partition
            push @data, $partition_entry->{Partition};
            $request->{template} .= q{l>};
            $request->{len}      += 4;

            # MessageSet
            _encode_MessageSet_array( $request, $partition_entry->{MessageSet}, $compression_codec );
        }
    }

    # 'len' fills the Size slot at the front of the template.
    return pack $request->{template}, $request->{len}, @data;
}
543
544# PRODUCE Response -------------------------------------------------------------
545
# unpack template for a complete PRODUCE response, assembled field by field.
# Each "l>" array size is read as a value, then "X[l]" steps back over it so
# the following "l>/(" can reuse the same four bytes as the repeat count.
my $_decode_produce_response_template = join( q{},
    q{x[l]},                    # Size (skip)
    q{l>},                      # CorrelationId

    q{l>},                      # topics array size
    q{X[l]},
    q{l>/(},                    # topics array
        q{s>/a},                    # TopicName

        q{l>},                      # partitions array size
        q{X[l]},
        q{l>/(},                    # partitions array
            q{l>},                      # Partition
            q{s>},                      # ErrorCode
            $_int64_template,           # Offset
        q{)},
    q{)},
);
563
564=head3 C<decode_produce_response( $bin_stream_ref )>
565
566Decodes the argument and returns a reference to the hash representing
567the structure of the PRODUCE Response (examples see C<t/*_decode_encode.t>).
568
This function takes an argument. The following argument is currently recognized:
570
571=over 3
572
573=item C<$bin_stream_ref>
574
575C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
576must be a non-empty binary string.
577
578=back
579
580=cut
# Decodes a binary PRODUCE response.
#   $bin_stream_ref - reference to the encoded response buffer.
# Returns a hash reference:
#   { CorrelationId, topics => [ { TopicName, partitions => [ { Partition, ErrorCode, Offset } ] } ] }
sub decode_produce_response {
    my ( $bin_stream_ref ) = @_;

    # Flat list of fields in wire order; consumed front to back below.
    my @fields = unpack( $_decode_produce_response_template, $$bin_stream_ref );

    my @topics;
    my %Produce_Response = (
        CorrelationId   => shift( @fields ),    # CorrelationId
        topics          => \@topics,
    );

    my $topics_left = shift @fields;            # topics array size
    while ( $topics_left-- ) {
        my @partitions;
        my %topic = (
            TopicName   => shift( @fields ),    # TopicName
            partitions  => \@partitions,
        );

        my $partitions_left = shift @fields;    # partitions array size
        while ( $partitions_left-- ) {
            push @partitions, {
                Partition   => shift( @fields ),                # Partition
                ErrorCode   => shift( @fields ),                # ErrorCode
                Offset      => _unpack64( shift( @fields ) ),   # Offset
            };
        }

        push @topics, \%topic;
    }

    return \%Produce_Response;
}
615
616# FETCH Request ----------------------------------------------------------------
617
618=head3 C<encode_fetch_request( $Fetch_Request )>
619
Encodes the argument and returns the encoded binary string
representing a Request buffer.

This function takes an argument. The following argument is currently recognized:
624
625=over 3
626
627=item C<$Fetch_Request>
628
629C<$Fetch_Request> is a reference to the hash representing
630the structure of the FETCH Request (examples see C<t/*_decode_encode.t>).
631
632=back
633
634=cut
# Builds the binary FETCH request.
#   $Fetch_Request - hash reference describing the request
#                    (MaxWaitTime, MinBytes,
#                     topics => [ { TopicName, partitions => [ { Partition, FetchOffset, MaxBytes } ] } ]).
# Returns the packed request string.
sub encode_fetch_request {
    my ( $Fetch_Request ) = @_;

    # Encoding state handed to the _encode_* helpers. This function appends to
    # 'template' (pack template), 'len' (packed as the leading Size value)
    # and 'data' (the values to pack).
    my @data;
    my $request = { data => \@data };

    # Size, ApiKey, ApiVersion, CorrelationId, ClientId
    _encode_request_header( $request, $APIKEY_FETCH, $Fetch_Request );

    # ReplicaId (always the consumer value), MaxWaitTime, MinBytes, topics array size
    my $topics = $Fetch_Request->{topics};
    push @data,
        $CONSUMERS_REPLICAID,
        $Fetch_Request->{MaxWaitTime},
        $Fetch_Request->{MinBytes},
        scalar @$topics;
    $request->{template} .= $_FetchRequest_header_template;
    $request->{len}      += $_FetchRequest_header_length;

    for my $topic_entry ( @$topics ) {
        # string length, then TopicName
        $request->{template} .= q{s>};
        $request->{len}      += 2;
        _encode_string( $request, $topic_entry->{TopicName} );

        # partitions array size
        my $partitions = $topic_entry->{partitions};
        push @data, scalar @$partitions;
        $request->{template} .= q{l>};
        $request->{len}      += 4;

        for my $partition_entry ( @$partitions ) {
            # Partition, FetchOffset (pre-packed to 8 bytes), MaxBytes
            push @data,
                $partition_entry->{Partition},
                _pack64( $partition_entry->{FetchOffset} ),
                $partition_entry->{MaxBytes};
            $request->{template} .= $_FetchRequest_body_template;
            $request->{len}      += $_FetchRequest_body_length;
        }
    }

    # 'len' fills the Size slot at the front of the template.
    return pack $request->{template}, $request->{len}, @data;
}
684
685# FETCH Response ---------------------------------------------------------------
686
687=head3 C<decode_fetch_response( $bin_stream_ref )>
688
689Decodes the argument and returns a reference to the hash representing
690the structure of the FETCH Response (examples see C<t/*_decode_encode.t>).
691
This function takes an argument. The following argument is currently recognized:
693
694=over 3
695
696=item C<$bin_stream_ref>
697
698C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
699must be a non-empty binary string.
700
701=back
702
703=cut
# Decodes a binary FETCH response.
#   $bin_stream_ref - reference to the encoded response buffer.
# Returns a hash reference:
#   { CorrelationId,
#     topics => [ { TopicName,
#                   partitions => [ { Partition, ErrorCode, HighwaterMarkOffset, MessageSet => [ ... ] } ] } ] }
sub decode_fetch_response {
    my ( $bin_stream_ref ) = @_;

    # According to Apache Kafka documentation:
    # As an optimization the server is allowed to return a partial message at the end of the message set.
    # Clients should handle this case.
    # NOTE: look inside _decode_MessageSet_template and _decode_MessageSet_array

    # Decoding state shared with the helpers. 'template' is read back after
    # _decode_fetch_response_template() has run; 'data' must stay a reference
    # to @data because @data is filled in place afterwards.
    my @data;
    my $response = {
        bin_stream  => $bin_stream_ref,
        data        => \@data,
    };

    _decode_fetch_response_template( $response );
    @data = unpack( $response->{template}, $$bin_stream_ref );

    my $i = 0;      # cursor into @data
    my @topics;
    my $Fetch_Response = {
        CorrelationId   => $data[ $i++ ],       # CorrelationId
        topics          => \@topics,
    };

    my $topics_left = $data[ $i++ ];            # topics array size
    while ( $topics_left-- ) {
        my @partitions;
        push @topics, {
            TopicName   => $data[ $i++ ],       # TopicName
            partitions  => \@partitions,
        };

        my $partitions_left = $data[ $i++ ];    # partitions array size
        while ( $partitions_left-- ) {
            my @messages;
            push @partitions, {
                Partition           => $data[ $i++ ],               # Partition
                ErrorCode           => $data[ $i++ ],               # ErrorCode
                HighwaterMarkOffset => _unpack64( $data[ $i++ ] ),  # HighwaterMarkOffset
                MessageSet          => \@messages,
            };

            my $MessageSetSize = $data[ $i++ ];                     # MessageSetSize

            # The cursor is passed by reference - presumably so the helper can
            # advance it past the message fields it consumes (confirm against
            # _decode_MessageSet_array).
            _decode_MessageSet_array( $response, $MessageSetSize, \$i, \@messages );
        }
    }

    return $Fetch_Response;
}
758
759# OFFSET Request ---------------------------------------------------------------
760
761=head3 C<encode_offset_request( $Offset_Request )>
762
Encodes the argument and returns the encoded binary string
representing a Request buffer.

This function takes a single argument. The following argument is currently recognized:
767
768=over 3
769
770=item C<$Offset_Request>
771
772C<$Offset_Request> is a reference to the hash representing
773the structure of the OFFSET Request (examples see C<t/*_decode_encode.t>).
774
775=back
776
777=cut
sub encode_offset_request {
    my ( $Offset_Request ) = @_;

    # Values to pack, in wire order. The helpers called below append to the
    # same array through $request->{data} and extend 'template' and 'len'.
    my $fields  = [];
    my $request = {
        data    => $fields,
    };

    # Common header: ApiKey, ApiVersion, CorrelationId, ClientId
    _encode_request_header( $request, $APIKEY_OFFSET, $Offset_Request );

    my $topics = $Offset_Request->{topics};
    push( @$fields,
        $CONSUMERS_REPLICAID,           # ReplicaId
        scalar( @$topics ),             # topics array size
    );
    $request->{template}    .= $_OffsetRequest_header_template;
    $request->{len}         += $_OffsetRequest_header_length;

    foreach my $topic ( @$topics ) {
        # TopicName: a 2-byte length followed by the string itself
        $request->{len}         += 2;
        $request->{template}    .= q{s>};
        _encode_string( $request, $topic->{TopicName} );

        my $partitions = $topic->{partitions};
        push( @$fields, scalar( @$partitions ) );   # partitions array size
        $request->{len}         += 4;
        $request->{template}    .= q{l>};

        foreach my $partition ( @$partitions ) {
            push( @$fields,
                $partition->{Partition},            # Partition
                _pack64( $partition->{Time} ),      # Time
                $partition->{MaxNumberOfOffsets},   # MaxNumberOfOffsets
            );
            $request->{len}         += $_OffsetRequest_body_length;
            $request->{template}    .= $_OffsetRequest_body_template;
        }
    }

    # The accumulated length is packed as the leading field of the request.
    return pack( $request->{template}, $request->{len}, @$fields );
}
825
826# OFFSET Response --------------------------------------------------------------
827
# unpack template for the OFFSET Response, assembled piece by piece so that
# every fragment carries its own annotation.
# Each array is written as 'l> X[l] l>/( ... )': the size is unpacked as a
# value, X[l] steps back over it, and the same size then drives the group.
my $_decode_offset_response_template = join( q{},
    q{x[l]},                    # Size (skip)
    q{l>},                      # CorrelationId

    q{l>},                      # topics array size
    q{X[l]},
    q{l>/(},                    # topics array
        q{s>/a},                    # TopicName

        q{l>},                      # PartitionOffsets array size
        q{X[l]},
        q{l>/(},                    # PartitionOffsets array
            q{l>},                      # Partition
            q{s>},                      # ErrorCode

            q{l>},                      # Offset array size
            q{X[l]},
            q{l>/(},                    # Offset array
                $_int64_template,           # Offset
            q{)},
        q{)},
    q{)},
);
850
851=head3 C<decode_offset_response( $bin_stream_ref )>
852
853Decodes the argument and returns a reference to the hash representing
854the structure of the OFFSET Response (examples see C<t/*_decode_encode.t>).
855
This function takes a single argument. The following argument is currently recognized:
857
858=over 3
859
860=item C<$bin_stream_ref>
861
862C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
863must be a non-empty binary string.
864
865=back
866
867=cut
sub decode_offset_response {
    my ( $bin_stream_ref ) = @_;

    # Flat list of fields in wire order; it is consumed from the front.
    my @fields = unpack( $_decode_offset_response_template, $$bin_stream_ref );

    my @topics;
    my %Offset_Response = (
        CorrelationId   => shift( @fields ),                # CorrelationId
        topics          => \@topics,
    );

    my $topics_left = shift( @fields );                     # topics array size
    while ( $topics_left-- ) {
        my @PartitionOffsets;
        my %topic = (
            TopicName           => shift( @fields ),        # TopicName
            PartitionOffsets    => \@PartitionOffsets,
        );

        my $PartitionOffsets_left = shift( @fields );       # PartitionOffsets array size
        while ( $PartitionOffsets_left-- ) {
            my @offsets;
            my %PartitionOffset = (
                Partition   => shift( @fields ),            # Partition
                ErrorCode   => shift( @fields ),            # ErrorCode
                Offset      => \@offsets,
            );

            my $offsets_left = shift( @fields );            # Offset array size
            while ( $offsets_left-- ) {
                push( @offsets, _unpack64( shift( @fields ) ) );    # Offset
            }

            push( @PartitionOffsets, \%PartitionOffset );
        }

        push( @topics, \%topic );
    }

    return \%Offset_Response;
}
908
909# METADATA Request -------------------------------------------------------------
910
911=head3 C<encode_metadata_request( $Metadata_Request )>
912
Encodes the argument and returns the encoded binary string
representing a Request buffer.

This function takes a single argument. The following argument is currently recognized:
917
918=over 3
919
920=item C<$Metadata_Request>
921
922C<$Metadata_Request> is a reference to the hash representing
923the structure of the METADATA Request (examples see C<t/*_decode_encode.t>).
924
925=back
926
927=cut
sub encode_metadata_request {
    my ( $Metadata_Request ) = @_;

    # Values to pack, in wire order. The helpers called below append to the
    # same array through $request->{data} and extend 'template' and 'len'.
    my $fields  = [];
    my $request = {
        data    => $fields,
    };

    # Common header: ApiKey, ApiVersion, CorrelationId, ClientId
    _encode_request_header( $request, $APIKEY_METADATA, $Metadata_Request );

    my $topics = $Metadata_Request->{topics};
    push( @$fields, scalar( @$topics ) );       # topics array size
    $request->{len}         += 4;
    $request->{template}    .= q{l>};

    # Each topic is a plain name: a 2-byte length followed by the string
    foreach my $TopicName ( @$topics ) {
        $request->{len}         += 2;
        $request->{template}    .= q{s>};
        _encode_string( $request, $TopicName );
    }

    # The accumulated length is packed as the leading field of the request.
    return pack( $request->{template}, $request->{len}, @$fields );
}
958
959# METADATA Response ------------------------------------------------------------
960
# unpack template for the METADATA Response, assembled piece by piece so that
# every fragment carries its own annotation.
# Each array is written as 'l> X[l] l>/( ... )': the size is unpacked as a
# value, X[l] steps back over it, and the same size then drives the group.
my $_decode_metadata_response_template = join( q{},
    q{x[l]},                    # Size (skip)
    q{l>},                      # CorrelationId

    q{l>},                      # Broker array size
    q{X[l]},
    q{l>/(},                    # Broker array
        q{l>},                      # NodeId
        q{s>/a},                    # Host
        q{l>},                      # Port
    q{)},

    q{l>},                      # TopicMetadata array size
    q{X[l]},
    q{l>/(},                    # TopicMetadata array
        q{s>},                      # ErrorCode
        q{s>/a},                    # TopicName

        q{l>},                      # PartitionMetadata array size
        q{X[l]},
        q{l>/(},                    # PartitionMetadata array
            q{s>},                      # ErrorCode
            q{l>},                      # Partition
            q{l>},                      # Leader

            q{l>},                      # Replicas array size
            q{X[l]},
            q{l>/(},                    # Replicas array
                q{l>},                      # ReplicaId
            q{)},

            q{l>},                      # Isr array size
            q{X[l]},
            q{l>/(},                    # Isr array
                q{l>},                      # ReplicaId
            q{)},
        q{)},
    q{)},
);

=head3 C<decode_metadata_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the METADATA Response (examples see C<t/*_decode_encode.t>).

This function takes a single argument. The following argument is currently recognized:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.

=back

=cut
sub decode_metadata_response {
    my ( $bin_stream_ref ) = @_;

    # Flat list of fields in wire order; it is consumed from the front.
    my @fields = unpack( $_decode_metadata_response_template, $$bin_stream_ref );

    my ( @brokers, @topics );
    my %Metadata_Response = (
        CorrelationId   => shift( @fields ),                # CorrelationId
        Broker          => \@brokers,
        TopicMetadata   => \@topics,
    );

    my $brokers_left = shift( @fields );                    # Broker array size
    while ( $brokers_left-- ) {
        push( @brokers, {
            NodeId  => shift( @fields ),                    # NodeId
            Host    => shift( @fields ),                    # Host
            Port    => shift( @fields ),                    # Port
        } );
    }

    my $topics_left = shift( @fields );                     # TopicMetadata array size
    while ( $topics_left-- ) {
        my @partitions;
        my %TopicMetadata = (
            ErrorCode           => shift( @fields ),        # ErrorCode
            TopicName           => shift( @fields ),        # TopicName
            PartitionMetadata   => \@partitions,
        );

        my $partitions_left = shift( @fields );             # PartitionMetadata array size
        while ( $partitions_left-- ) {
            my ( @replicas, @isr );
            my %PartitionMetadata = (
                ErrorCode   => shift( @fields ),            # ErrorCode
                Partition   => shift( @fields ),            # Partition
                Leader      => shift( @fields ),            # Leader
                Replicas    => \@replicas,
                Isr         => \@isr,
            );

            my $replicas_left = shift( @fields );           # Replicas array size
            while ( $replicas_left-- ) {
                push( @replicas, shift( @fields ) );        # ReplicaId
            }

            my $isr_left = shift( @fields );                # Isr array size
            while ( $isr_left-- ) {
                push( @isr, shift( @fields ) );             # ReplicaId
            }

            push( @partitions, \%PartitionMetadata );
        }

        push( @topics, \%TopicMetadata );
    }

    return \%Metadata_Response;
}
1075
1076#-- private functions ----------------------------------------------------------
1077
# Starts a request: resets the data list, template and length to the common
# request header and appends the ClientId string.
# 'Size' is not stored in the data list; the visible encode_* callers pass
# $request->{len} to pack themselves.
sub _encode_request_header {
    my ( $request, $api_key, $request_ref ) = @_;

    # Assign into the existing array so callers holding a reference to it
    # see the header values.
    @{ $request->{data} } = (
        $api_key,                           # ApiKey
        $APIVERSION,                        # ApiVersion
        $request_ref->{CorrelationId},      # CorrelationId
    );
    @{ $request }{ qw( template len ) } = (
        $_Request_header_template,
        $_Request_header_length,
    );
    _encode_string( $request, $request_ref->{ClientId} );   # ClientId
}
1092
# Generates the unpack template used to decode the fetch response body.
# Walks the binary stream, appending to $response->{template} and advancing
# $response->{stream_offset} as each variable-length part is measured.
sub _decode_fetch_response_template {
    my ( $response ) = @_;

    # Reads one value of the given format at the current stream offset.
    # Always called in scalar context, so only the first value is returned.
    my $peek = sub {
        my ( $format ) = @_;
        return unpack(
            q{x[}.$response->{stream_offset}.q{]}.$format,
            ${ $response->{bin_stream} }
        );
    };

    $response->{template}       = $_FetchResponse_header_template;
    $response->{stream_offset}  = $_FetchResponse_header_length;    # bytes before topics array size
                                                                    # [l] Size
                                                                    # [l] CorrelationId

    my $topics_left = $peek->( q{l>} );     # topics array size
    $response->{stream_offset} += 4;        # bytes before TopicName length

    while ( $topics_left-- ) {
        my $TopicName_length = $peek->( q{s>} );
        $response->{stream_offset} += 2 + $TopicName_length;    # [s] TopicName length + TopicName

        # NOTE(review): the partitions array size is read here but not used
        # afterwards; one topic body and one MessageSet are templated per
        # topic. Presumably responses carry a single partition per topic -
        # confirm against the callers.
        my $partitions_array_size = $peek->( q{l>} );
        $response->{stream_offset} += 4;                        # [l] partitions array size

        $response->{template}       .= $_FetchResponse_topic_body_template;
        $response->{stream_offset}  += $_FetchResponse_topic_body_length;   # bytes before MessageSetSize

        _decode_MessageSet_template( $response );
    }
}
1140
# Decodes the unpacked fields of one MessageSet into message hashes.
#   $response             - decoding state; its 'data' array holds the fields
#   $MessageSetSize       - size of the MessageSet in bytes
#   $i_ref                - reference to the index of the next unread field
#   $MessageSet_array_ref - array that receives the decoded messages
# A message whose Attributes carry a compression codec is not stored itself:
# its Value is decompressed, decoded as a nested MessageSet, and the nested
# messages are appended to the same output array.
sub _decode_MessageSet_array {
    my ( $response, $MessageSetSize, $i_ref, $MessageSet_array_ref ) = @_;

    my $fields      = $response->{data};
    my $fields_size = scalar @{ $fields };

# NOTE: not all messages can be returned
    while ( $MessageSetSize && $$i_ref < $fields_size ) {

        my %Message = (
            Offset  => _unpack64( $fields->[ $$i_ref++ ] ),     # Offset
        );
        my $MessageSize = $fields->[ $$i_ref++ ];               # MessageSize

# NOTE: The CRC is the CRC32 of the remainder of the message bytes:
# MagicByte + Attributes + Key length + Key + Value length + Value
        $$i_ref++;                                              # Crc (consumed, not checked here)

        $Message{MagicByte}     = $fields->[ $$i_ref++ ];       # MagicByte
        $Message{Attributes}    = $fields->[ $$i_ref++ ];       # Attributes

        # Key, then Value: a length field, followed by the bytes unless the
        # length is the null marker.
        foreach my $part ( qw( Key Value ) ) {
            my $part_length = $fields->[ $$i_ref++ ];
            $Message{ $part } = $part_length == $NULL_BYTES_LENGTH
                ? q{}
                : $fields->[ $$i_ref++ ];
        }

        my $compression_codec = $Message{Attributes} & $COMPRESSION_CODEC_MASK;
        if ( !$compression_codec ) {
            push( @$MessageSet_array_ref, \%Message );
        }
        else {
            my $decompressed;
            if ( $compression_codec == $COMPRESSION_GZIP ) {
                gunzip( \$Message{Value} => \$decompressed )
                    or _error( $ERROR_COMPRESSION, "gunzip failed: $GunzipError" );
            }
            elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
                my ( $header, $x_version, $x_compatversion, undef ) = unpack( q{a[8]L>L>L>}, $Message{Value} );   # undef - $x_length

                # Special thanks to Colin Blower
                if ( $header eq "\x82SNAPPY\x00" ) {
                    # Nonstandard xerial header in front of the snappy data: strip it
                    if ( $x_compatversion == 1 && $x_version == 1 ) {
                        $Message{Value} = substr( $Message{Value}, 20 );    # 20 = q{a[8]L>L>L>}
                    }
                    else {
                        _error( $ERROR_COMPRESSION, "Snappy compression with incompatible xerial header version found (x_version = $x_version, x_compatversion = $x_compatversion)" );
                    }
                }

                $decompressed = Compress::Snappy::decompress( $Message{Value} )
                    // _error( $ERROR_COMPRESSION, 'Unable to decompress snappy compressed data' );
            }
            else {
                _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
            }

            # Decode the decompressed bytes as a MessageSet of their own.
            my @nested_fields;
            my $nested = {
                data            => \@nested_fields,
                bin_stream      => \$decompressed,
                stream_offset   => 0,
            };
            _decode_MessageSet_sized_template( length( $decompressed ), $nested );
            @nested_fields = unpack( $nested->{template}, ${ $nested->{bin_stream} } );

            my $nested_index = 0;
            _decode_MessageSet_array(
                $nested,
                length( $decompressed ),    # message set size
                \$nested_index,
                $MessageSet_array_ref,
            );
        }

        $MessageSetSize -= 12               # [q] Offset + [l] MessageSize
            + $MessageSize;                 # Message
    }
}
1224
# Packs the part of a Message that the Crc covers:
# MagicByte, Attributes, Key length, Key, Value length, Value.
# An empty or undefined Key/Value is written as $NULL_BYTES_LENGTH with no bytes.
#   $attributes - value of the Attributes byte (compression codec)
#   $Key        - message key
#   $Value      - message value
# Returns the packed binary string.
sub _pack_Message_body {
    my ( $attributes, $Key, $Value ) = @_;

    my $key_length   = length( $Key );
    my $value_length = length( $Value );

    return pack(
            q{ccl>}                                         # MagicByte
                                                            # Attributes
                                                            # Key length
            .( $key_length   ? qq{a[$key_length]}   : q{} ) # Key
            .q{l>}                                          # Value length
            .( $value_length ? qq{a[$value_length]} : q{} ) # Value
        ,
        0,
        $attributes,        # According to Apache Kafka documentation:
                            # The lowest 2 bits contain the compression codec used for the message.
                            # The other bits should be set to 0.
        $key_length     ? ( $key_length,    $Key )    : ( $NULL_BYTES_LENGTH ),
        $value_length   ? ( $value_length,  $Value )  : ( $NULL_BYTES_LENGTH ),
    );
}

# Appends a MessageSet to the request being encoded.
#   $request              - encoding state: 'data' (values to pack),
#                           'template' and 'len' are extended in place
#   $MessageSet_array_ref - array of message hashes (Offset, Key, Value)
#   $compression_codec    - optional; when true, all messages are packed into
#                           one inner MessageSet, compressed, and sent as the
#                           Value of a single wrapper message
# Side effect kept from the original implementation: an undefined Key or Value
# of a message that is encoded directly is replaced by an empty string.
sub _encode_MessageSet_array {
    my ( $request, $MessageSet_array_ref, $compression_codec ) = @_;

    if ( $compression_codec ) {
        # Inner MessageSet: every message is written uncompressed.
        my ( $message_set, $last_Key );
        foreach my $MessageSet ( @$MessageSet_array_ref ) {
            $last_Key = $MessageSet->{Key};
            my $message_body = _pack_Message_body( $COMPRESSION_NONE, $last_Key, $MessageSet->{Value} );

            $message_set .= pack( q{x[8]l>l>},      # 8 Offset ($PRODUCER_ANY_OFFSET)
                length( $message_body ) + 4,        # [l] MessageSize ( $message_body + Crc )
                crc32( $message_body )              # [l] Crc
            ).$message_body;
        }

        # The wrapper message carries the Key of the last message.
        $MessageSet_array_ref = [
            {
                Offset  => $PRODUCER_ANY_OFFSET,
                Key     => $last_Key,
            }
        ];

        # Compression
        if ( $compression_codec == $COMPRESSION_GZIP ) {
            $MessageSet_array_ref->[0]->{Value} = q{};
            gzip( \$message_set => \$MessageSet_array_ref->[0]->{Value} )
                or _error( $ERROR_COMPRESSION, "gzip failed: $GzipError" );
        } elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
            $MessageSet_array_ref->[0]->{Value} = Compress::Snappy::compress( $message_set )
                // _error( $ERROR_COMPRESSION, 'Unable to compress snappy data' );
        } else {
             _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
        }
    }

    my $data = $request->{data};

    # MessageSetSize precedes the messages on the wire, so every body is
    # packed first and the sizes are derived from the packed bodies.
    my $MessageSetSize = 0;
    my @message_bodies;
    foreach my $MessageSet ( @$MessageSet_array_ref ) {
        my $message_body = _pack_Message_body(
            $compression_codec // $COMPRESSION_NONE,
            $MessageSet->{Key}      //= q{},
            $MessageSet->{Value}    //= q{},
        );
        push( @message_bodies, $message_body );

        $MessageSetSize +=
              12                            # [q] Offset
                                            # [l] MessageSize
            + 4                             # [l] Crc
            + length( $message_body )       # MagicByte .. Value
            ;
    }
    push( @$data, $MessageSetSize );
    $request->{template}    .= q{l>};                                       # MessageSetSize
    $request->{len}         += 4;

    foreach my $index ( 0 .. $#message_bodies ) {
        my $message_body = $message_bodies[ $index ];
        my $MessageSize  = 4 + length( $message_body );                     # Crc + Message body

        push( @$data,
            _pack64( $MessageSet_array_ref->[ $index ]->{Offset} ),         # Offset (It may be $PRODUCER_ANY_OFFSET)
            $MessageSize,                                                   # MessageSize
        );
        $request->{template}    .= $_MessageSet_template;
        $request->{len}         += $_MessageSet_length;

        push( @$data, crc32( $message_body ), $message_body );
        # Message: Crc followed by the packed body
        $request->{template} .= q{l>a[}                                     # Crc
            .( $MessageSize - 4 )   # 4 Crc
            .q{]};
        $request->{len} += $MessageSize;    # Message
    }

    return;
}
1342
# Generates the unpack template for one MessageSet: reads MessageSetSize at
# the current stream offset, records it in the template, and delegates the
# messages themselves to _decode_MessageSet_sized_template.
sub _decode_MessageSet_template {
    my ( $response ) = @_;

    # Skip everything already accounted for, then read one 32-bit size.
    my $size_template  = q{x[}.$response->{stream_offset}.q{]l>};
    my $MessageSetSize = unpack( $size_template, ${ $response->{bin_stream} } );

    $response->{template}       .= q{l>};   # MessageSetSize
    $response->{stream_offset}  += 4;       # bytes before Offset

    return _decode_MessageSet_sized_template( $MessageSetSize, $response );
}
1357
# Appends to $response->{template} the unpack template for every complete
# message of a MessageSet whose size in bytes is already known.
#
# Arguments:
#   $MessageSetSize - number of MessageSet bytes still to be described;
#   $response       - hash reference; {bin_stream} (reference to the binary
#                     data) is read, {stream_offset} is advanced and
#                     {template} is appended to.
#
# A trailing message that is not completely present in the stream stops the
# processing: nothing is added to {template} for it, although {stream_offset}
# keeps whatever advance was made while inspecting it.
sub _decode_MessageSet_sized_template {
    my ( $MessageSetSize, $response ) = @_;

    my $stream_ref      = $response->{bin_stream};
    my $stream_length   = length $$stream_ref;

    # Reads the big-endian int32 found at the current stream offset.
    my $int32_at_offset = sub {
        return scalar unpack(
            q{x[}.$response->{stream_offset}.q{]l>},
            $$stream_ref
        );
    };

    MESSAGE:
    while ( $MessageSetSize ) {
        # Not the full MessageSet.
        # The threshold of 22 matches the fixed-size fields from Offset
        # through Key length (8 + 4 + 4 + 1 + 1 + 4 bytes).
        last MESSAGE if $MessageSetSize < 22;

        # Template of the fixed fields:
        # Offset, MessageSize, Crc, MagicByte, Attributes, Key length.
        my $message_template = $_Message_template;

        # Step over the Offset to reach MessageSize.
        $response->{stream_offset} += $_Message_length;
        my $message_size = $int32_at_offset->();

        # Step over MessageSize (4), Crc (4), MagicByte (1) and Attributes (1)
        # to reach Key length.
        $response->{stream_offset} += 10;
        my $key_length = $int32_at_offset->();

        # Step over Key length and, when a key is present, over the Key itself.
        $response->{stream_offset} += 4;
        my $has_key = $key_length != $NULL_BYTES_LENGTH;
        $response->{stream_offset} += $key_length if $has_key;

        # Not the full MessageSet: the Value length field is cut off.
        last MESSAGE if $stream_length < $response->{stream_offset} + 4;
        $message_template .= $_Key_or_Value_template if $has_key;

        $message_template .= q{l>};         # Value length
        my $value_length = $int32_at_offset->();

        # Step over Value length and, when a value is present, over the Value.
        $response->{stream_offset} += 4;
        my $has_value = $value_length != $NULL_BYTES_LENGTH;
        $response->{stream_offset} += $value_length if $has_value;

        # Not the full MessageSet: the Value is cut off.
        last MESSAGE if $stream_length < $response->{stream_offset};
        $message_template .= $_Key_or_Value_template if $has_value;

        # The message is complete: publish its template and account for
        # Offset (8) + MessageSize (4) + the message body.
        $response->{template} .= $message_template;
        $MessageSetSize -= 12 + $message_size;
    }
}
1456
# Adds the data and the template fragment needed to encode a string.
#
# Arguments:
#   $request - hash reference; values are pushed onto {data}, and for a
#              non-empty string {template} and {len} are extended as well;
#   $string  - the string to encode.
#
# An empty string contributes only a zero length to {data}. The template entry
# for the length itself is not added here.
sub _encode_string {
    my ( $request, $string ) = @_;

    if ( $string ne q{} ) {
        my $size = length $string;
        push @{ $request->{data} }, $size, $string;
        $request->{template}    .= q{a*};   # the string itself
        $request->{len}         += $size;
    }
    else {
        push @{ $request->{data} }, 0;
    }
}
1471
# Error handler: passes its arguments through throw_args() and throws the
# result as a Kafka::Exception::Protocol exception.
sub _error {
    my @exception_args = throw_args( @_ );
    Kafka::Exception::Protocol->throw( @exception_args );
}
1476
14771;
1478
1479__END__
1480
1481=head1 DIAGNOSTICS
1482
In order to achieve better performance, the functions of this module do not
validate their arguments.
1485
1486=head1 SEE ALSO
1487
1488The basic operation of the Kafka package modules:
1489
1490L<Kafka|Kafka> - constants and messages used by the Kafka package modules.
1491
1492L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.
1493
1494L<Kafka::Producer|Kafka::Producer> - interface for producing client.
1495
1496L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.
1497
1498L<Kafka::Message|Kafka::Message> - interface to access Kafka message
1499properties.
1500
1501L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
1502protocol on 32 bit systems.
1503
L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
Apache Kafka protocol.
1506
1507L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.
1508
1509L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.
1510
1511L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
1512by several package modules.
1513
A wealth of detail about Apache Kafka and the Kafka protocol:
1515
1516Main page at L<http://kafka.apache.org/>
1517
1518Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>
1519
1520=head1 SOURCE CODE
1521
1522Kafka package is hosted on GitHub:
1523L<https://github.com/TrackingSoft/Kafka>
1524
1525=head1 AUTHOR
1526
1527Sergey Gladkov, E<lt>sgladkov@trackingsoft.comE<gt>
1528
1529=head1 CONTRIBUTORS
1530
1531Alexander Solovey
1532
1533Jeremy Jordan
1534
1535Sergiy Zuban
1536
1537Vlad Marchenko
1538
1539=head1 COPYRIGHT AND LICENSE
1540
1541Copyright (C) 2012-2013 by TrackingSoft LLC.
1542
1543This package is free software; you can redistribute it and/or modify it under
1544the same terms as Perl itself. See I<perlartistic> at
1545L<http://dev.perl.org/licenses/artistic.html>.
1546
1547This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
1548without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
1549PARTICULAR PURPOSE.
1550
1551=cut
1552