package Kafka::Protocol;

=head1 NAME

Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.

=head1 VERSION

This documentation refers to C<Kafka::Protocol> version 0.8010.

=cut

#-- Pragmas --------------------------------------------------------------------

use 5.010;
use strict;
use warnings;

# ENVIRONMENT ------------------------------------------------------------------

our $VERSION = '0.8010';

use Exporter qw(
    import
);
our @EXPORT_OK = qw(
    decode_fetch_response
    decode_metadata_response
    decode_offset_response
    decode_produce_response
    encode_fetch_request
    encode_metadata_request
    encode_offset_request
    encode_produce_request
    _decode_MessageSet_template
    _decode_MessageSet_array
    _encode_MessageSet_array
    _encode_string
    _pack64
    _unpack64
    _verify_string
    $APIVERSION
    $BAD_OFFSET
    $COMPRESSION_CODEC_MASK
    $CONSUMERS_REPLICAID
    $NULL_BYTES_LENGTH
    $_int64_template
);

#-- load the modules -----------------------------------------------------------

use Compress::Snappy;
use Const::Fast;
use IO::Compress::Gzip qw(
    gzip
    $GzipError
);
use IO::Uncompress::Gunzip qw(
    gunzip
    $GunzipError
);
use Params::Util qw(
    _ARRAY
    _HASH
    _SCALAR
    _STRING
);
use Scalar::Util qw(
    dualvar
);
use String::CRC32;

use Kafka qw(
    $BITS64
    $BLOCK_UNTIL_IS_COMMITTED
    $COMPRESSION_GZIP
    $COMPRESSION_NONE
    $COMPRESSION_SNAPPY
    $DEFAULT_MAX_WAIT_TIME
    %ERROR
    $ERROR_COMPRESSION
    $ERROR_MISMATCH_ARGUMENT
    $ERROR_NOT_BINARY_STRING
    $ERROR_REQUEST_OR_RESPONSE
    $NOT_SEND_ANY_RESPONSE
    $RECEIVE_EARLIEST_OFFSETS
    $RECEIVE_LATEST_OFFSET
    $WAIT_WRITTEN_TO_LOCAL_LOG
);
use Kafka::Exceptions;
use Kafka::Internals qw(
    $APIKEY_FETCH
    $APIKEY_METADATA
    $APIKEY_OFFSET
    $APIKEY_PRODUCE
    $PRODUCER_ANY_OFFSET
);

#-- declarations ---------------------------------------------------------------

=head1 SYNOPSIS

    use 5.010;
    use strict;
    use warnings;

    use Data::Compare;
    use Kafka qw(
        $COMPRESSION_NONE
        $ERROR_NO_ERROR
        $REQUEST_TIMEOUT
        $WAIT_WRITTEN_TO_LOCAL_LOG
    );
    use Kafka::Internals qw(
        $PRODUCER_ANY_OFFSET
    );
    use Kafka::Protocol qw(
        decode_produce_response
        encode_produce_request
    );

    # an encoded produce request hex stream
    my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );

    # a decoded produce request
    my $decoded = {
        CorrelationId   => 4,
        ClientId        => q{},
        RequiredAcks    => $WAIT_WRITTEN_TO_LOCAL_LOG,
        Timeout         => $REQUEST_TIMEOUT * 100,  # ms
        topics          => [
            {
                TopicName   => 'mytopic',
                partitions  => [
                    {
                        Partition   => 0,
                        MessageSet  => [
                            {
                                Offset      => $PRODUCER_ANY_OFFSET,
                                MagicByte   => 0,
                                Attributes  => $COMPRESSION_NONE,
                                Key         => q{},
                                Value       => 'Hello!',
                            },
                        ],
                    },
                ],
            },
        ],
    };

    my $encoded_request = encode_produce_request( $decoded );
    say 'encoded correctly' if $encoded_request eq $encoded;

    # an encoded produce response hex stream
    $encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );

    # a decoded produce response
    $decoded = {
        CorrelationId   => 4,
        topics          => [
            {
                TopicName   => 'mytopic',
                partitions  => [
                    {
                        Partition   => 0,
                        ErrorCode   => $ERROR_NO_ERROR,
                        Offset      => 0,
                    },
                ],
            },
        ],
    };

    my $decoded_response = decode_produce_response( \$encoded );
    say 'decoded correctly' if Compare( $decoded_response, $decoded );

    # for more examples, see t/*_decode_encode.t

=head1 DESCRIPTION

This module is not a user module.

In order to achieve better performance,
functions of this module do not perform argument validation.

The main features of the C<Kafka::Protocol> module are:

=over 3

=item *

Supports parsing the Apache Kafka protocol.

=item *

Supports Apache Kafka Requests and Responses.
Within this package we currently support
access to the PRODUCE, FETCH, OFFSET and METADATA Requests and Responses.

=item *

Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.

=back

=cut

# A Guide To The Kafka Protocol 0.8:
# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
#
# -- Protocol Primitive Types
# int8, int16, int32, int64
#       Signed integers stored in big endian order.
# bytes, string
#       Consist of a signed integer giving a length N
#       followed by N bytes of content.
#       A length of -1 indicates null.
#       string uses an int16 for its size, and bytes uses an int32.
# Arrays
#       These will always be encoded as an int32 size containing the length N
#       followed by N repetitions of the structure,
#       which can itself be made up of other primitive types.
#
# -- N.B.
# - The response will always match the paired request.
# - One structure common to both the produce and fetch requests is the message set format.
# - MessageSets are not preceded by an int32 like other array elements in the protocol.
# - A message set is also the unit of compression in Kafka,
#   and messages may recursively contain compressed message sets.
#
# -- Protocol Fields
# ApiKey => int16                   The id of the API being invoked.
# ApiVersion => int16               A numeric version number for this api.
#                                   Currently the supported version for all APIs is 0.
# Attributes => int8                Metadata attributes about the message.
#                                   The lowest 2 bits contain the compression codec used for the message.
# ClientId => string                A user supplied identifier for the client application.
# CorrelationId => int32            A user-supplied integer.
#                                   It will be passed back in the response by the server, unmodified.
#                                   It is useful for matching request and response between the client and server.
# Crc => int32                      The CRC32 of the remainder of the message bytes.
# ErrorCode => int16                The error from this partition, if any.
#                                   Errors are given on a per-partition basis
#                                   because a given partition may be unavailable or maintained on a different host,
#                                   while others may have successfully accepted the produce request.
# FetchOffset => int64              The offset to begin this fetch from.
# HighwaterMarkOffset => int64      The offset at the end of the log for this partition.
#                                   This can be used by the client to determine how many messages behind the end of the log they are.
#                                   - 0.8 documents: Replication design
#                                   The high watermark is the offset of the last committed message.
#                                   Each log is periodically synced to disks.
#                                   Data before the flushed offset is guaranteed to be persisted on disks.
#                                   As we will see, the flush offset can be before or after the high watermark.
#                                   - 0.7 documents: Wire protocol
#                                   If the last segment file for the partition is not empty and was modified earlier than TIME,
#                                   it will return both the first offset for that segment and the high water mark.
#                                   The high water mark is not the offset of the last message,
#                                   but rather the offset that the next message sent to the partition will be written to.
# Host => string                    The broker's hostname.
# Isr => [ReplicaId]                The subset of the replicas that is "caught up" to the leader - the set of in-sync replicas (ISR).
# Key => bytes                      An optional message key.
#                                   The key can be null.
# Leader => int32                   The node id of the kafka broker currently acting as leader for this partition.
#                                   If no leader exists because we are in the middle of a leader election this id will be -1.
# MagicByte => int8                 A version id used to allow backwards compatible evolution of the message binary format.
# MaxBytes => int32                 The maximum bytes to include in the message set for this partition.
# MaxNumberOfOffsets => int32       Kafka returns up to 'MaxNumberOfOffsets' offsets.
# MaxWaitTime => int32              The maximum amount of time (ms) to block waiting
#                                   if insufficient data is available at the time the request is issued.
# MessageSetSize => int32           The size in bytes of the message set for this partition.
# MessageSize => int32              The size of the subsequent request or response message in bytes.
# MinBytes => int32                 The minimum number of bytes of messages that must be available to give a response.
#                                   If the client sets this to 0 the server will always respond immediately.
#                                   If this is set to 1, the server will respond as soon
#                                   as at least one partition has at least 1 byte of data
#                                   or the specified timeout occurs.
#                                   Setting higher values in combination with the timeout
#                                   allows reading only large chunks of data.
# NodeId => int32                   The id of the broker.
#                                   This must be set to a unique integer for each broker.
# Offset => int64                   The offset used in kafka as the log sequence number.
#                                   When the producer is sending messages it doesn't actually know the offset
#                                   and can fill in any value here it likes.
# Partition => int32                The id of the partition the fetch is for,
#                                   or the partition that data is being published to,
#                                   or the partition this response entry corresponds to.
# Port => int32                     The broker's port.
# ReplicaId => int32                Indicates the node id of the replica initiating this request.
#                                   Normal client consumers should always specify this as -1 as they have no node id.
# Replicas => [ReplicaId]           The set of alive nodes that currently act as slaves for the leader for this partition.
# RequiredAcks => int16             Indicates how many acknowledgements the servers should receive
#                                   before responding to the request.
#                                   If it is 0 the server does not send any response.
#                                   If it is 1, the server will wait until the data is written to the local log before sending a response.
#                                   If it is -1 the server will block until the message is committed by all in sync replicas before sending a response.
#                                   For any number > 1 the server will block waiting for this number of acknowledgements to occur
#                                   (but the server will never wait for more acknowledgements than there are in-sync replicas).
# Size => int32                     The size of the subsequent request or response message in bytes.
# Time => int64                     Used to ask for all messages before a certain time (ms).
#                                   There are two special values.
#                                   Specify -1 to receive the latest offset (this will only ever return one offset).
#                                   Specify -2 to receive the earliest available offsets.
# Timeout => int32                  The maximum time (ms) the server can await the receipt
#                                   of the number of acknowledgements in RequiredAcks.
#                                   The timeout is not an exact limit on the request time for a few reasons:
#                                   (1) it does not include network latency,
#                                   (2) the timer begins at the beginning of the processing of this request,
#                                   so if many requests are queued due to server overload
#                                   that wait time will not be included,
#                                   (3) we will not terminate a local write,
#                                   so if the local write time exceeds this timeout it will not be respected.
#                                   To get a hard timeout of this type the client should use the socket timeout.
# TopicName => string               The name of the topic.
# Value => bytes                    The actual message contents.
#                                   Kafka supports recursive messages, in which case this may itself contain a message set.
#                                   The message can be null.

our $_int64_template;   # Used to unpack a 64 bit value
if ( $BITS64 ) {
    $_int64_template = q{q>};
    # unpack a big-endian signed quad (64-bit) value on 64 bit systems
    *_unpack64 = sub { $_[0] };
    # pack a big-endian signed quad (64-bit) value on 64 bit systems
    *_pack64 = sub { pack( q{q>}, $_[0] ) };
} else {
    eval q{ require Kafka::Int64; }     ## no critic
        or die "Cannot load Kafka::Int64 : $@";

    $_int64_template = q{a[8]};
    # unpack a big-endian signed quad (64-bit) value on 32 bit systems
    *_unpack64 = \&Kafka::Int64::unpackq;
    # pack a big-endian signed quad (64-bit) value on 32 bit systems
    *_pack64 = \&Kafka::Int64::packq;
}

=head2 EXPORT

The following constants are available for export:

=cut

=head3 C<$APIVERSION>

According to Apache Kafka documentation: 'This is a numeric version number for this api.
Currently the supported version for all APIs is 0.'

=cut
const our $APIVERSION => 0;

# Attributes

# According to Apache Kafka documentation:
# Attributes - Metadata attributes about the message.
# The lowest 2 bits contain the compression codec used for the message.
const our $COMPRESSION_CODEC_MASK => 0b11;

=head3 C<$CONSUMERS_REPLICAID>

According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'

=cut
const our $CONSUMERS_REPLICAID => -1;

=head3 C<$NULL_BYTES_LENGTH>

According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'

=cut
const our $NULL_BYTES_LENGTH => -1;

=head3 C<$BAD_OFFSET>

According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset
and can fill in any value here it likes.'

=cut
const our $BAD_OFFSET => -1;
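
# For example, a minimal sketch of how the 64 bit helpers above are used
# internally (the values and $bin here are illustrative only):
#
#   my $packed   = _pack64( 123 );              # 8 big-endian bytes
#   my $template = qq{l>${_int64_template}};    # an int32 followed by an int64
#   my $offset   = _unpack64( ( unpack( $template, $bin ) )[1] );
#
# On 64 bit Perls _unpack64 is a no-op and q> is used directly; on 32 bit Perls
# the q> template is not available, so int64 fields are unpacked as raw a[8]
# bytes and converted by Kafka::Int64.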

my ( $_Request_header_template, $_Request_header_length ) = (
    q{l>s>s>l>s>},  #   Size
                    # 2 ApiKey
                    # 2 ApiVersion
                    # 4 CorrelationId
                    # 2 ClientId length
    10              # 'Size' is not included in the calculation of length
);
my ( $_ProduceRequest_header_template, $_ProduceRequest_header_length ) = (
    q{s>l>l>},      # 2 RequiredAcks
                    # 4 Timeout
                    # 4 topics array size
    10
);
my ( $_MessageSet_template, $_MessageSet_length ) = (
    q{a[8]l>},      # 8 Offset
                    # 4 MessageSize
    12
);
my ( $_FetchRequest_header_template, $_FetchRequest_header_length ) = (
    q{l>l>l>l>},    # 4 ReplicaId
                    # 4 MaxWaitTime
                    # 4 MinBytes
                    # 4 topics array size
    16
);
my ( $_FetchRequest_body_template, $_FetchRequest_body_length ) = (
    q{l>a[8]l>},    # 4 Partition
                    # 8 FetchOffset
                    # 4 MaxBytes
    16
);
my ( $_OffsetRequest_header_template, $_OffsetRequest_header_length ) = (
    q{l>l>},        # 4 ReplicaId
                    # 4 topics array size
    8
);
my ( $_OffsetRequest_body_template, $_OffsetRequest_body_length ) = (
    q{l>a[8]l>},    # 4 Partition
                    # 8 Time
                    # 4 MaxNumberOfOffsets
    16
);
my ( $_FetchResponse_header_template, $_FetchResponse_header_length ) = (
    q{x[l]l>l>},    #   Size (skip)
                    # 4 CorrelationId
                    # 4 topics array size
    8
);
my ( $_Message_template, $_Message_length ) = (
    qq(${_int64_template}l>l>ccl>),
                    # 8 Offset
                    #   MessageSize
                    #   Crc
                    #   MagicByte
                    #   Attributes
                    #   Key length
    8               # only the Offset length
);
my ( $_FetchResponse_topic_body_template, $_FetchResponse_topic_body_length ) = (
    qq(s>/al>l>s>${_int64_template}),
                    #   TopicName
                    #   partitions array size
                    # 4 Partition
                    # 2 ErrorCode
                    # 8 HighwaterMarkOffset
    14              # without TopicName and partitions array size
);
my $_Key_or_Value_template = q{X[l]l>/a};   # Key or Value

#-- public functions -----------------------------------------------------------

=head2 FUNCTIONS

The following functions are available for the C<Kafka::Protocol> module.

=cut

# PRODUCE Request --------------------------------------------------------------

=head3 C<encode_produce_request( $Produce_Request, $compression_codec )>

Encodes the argument and returns the encoded binary string
representing a Request buffer.

This function takes the following arguments:

=over 3

=item C<$Produce_Request>

C<$Produce_Request> is a reference to the hash representing
the structure of the PRODUCE Request (for examples see C<t/*_decode_encode.t>).

=item C<$compression_codec>

Optional.

C<$compression_codec> sets the required compression type for the messages,
if compression is desired.

Supported codecs:
L<$COMPRESSION_NONE|Kafka/$COMPRESSION_NONE>,
L<$COMPRESSION_GZIP|Kafka/$COMPRESSION_GZIP>,
L<$COMPRESSION_SNAPPY|Kafka/$COMPRESSION_SNAPPY>.

=back
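
For example, a minimal sketch reusing the C<$decoded> PRODUCE Request hash from
the L</SYNOPSIS> (the choice of GZIP here is illustrative only):

    use Kafka qw( $COMPRESSION_GZIP );
    use Kafka::Protocol qw( encode_produce_request );

    # without compression
    my $encoded_request = encode_produce_request( $decoded );

    # the same request, with the MessageSet packed into a single GZIP compressed message
    my $compressed_request = encode_produce_request( $decoded, $COMPRESSION_GZIP );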

=cut
sub encode_produce_request {
    my ( $Produce_Request, $compression_codec ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data                                    => \@data,
    };

    _encode_request_header( $request, $APIKEY_PRODUCE, $Produce_Request );
                                                # Size
                                                # ApiKey
                                                # ApiVersion
                                                # CorrelationId
                                                # ClientId

    my $topics_array = $Produce_Request->{topics};
    push( @data,
        $Produce_Request->{RequiredAcks},       # RequiredAcks
        $Produce_Request->{Timeout},            # Timeout
        scalar( @$topics_array ),               # topics array size
    );
    $request->{template}    .= $_ProduceRequest_header_template;
    $request->{len}         += $_ProduceRequest_header_length;

    foreach my $topic ( @$topics_array ) {
        $request->{template}    .= q{s>};       # string length
        $request->{len}         += 2;
        _encode_string( $request, $topic->{TopicName} );    # TopicName

        my $partitions_array = $topic->{partitions};
        push( @data, scalar( @$partitions_array ) );
        $request->{template}    .= q{l>};       # partitions array size
        $request->{len}         += 4;
        foreach my $partition ( @$partitions_array ) {
            push( @data, $partition->{Partition} );
            $request->{template}    .= q{l>};   # Partition
            $request->{len}         += 4;

            _encode_MessageSet_array( $request, $partition->{MessageSet}, $compression_codec );
        }
    }

    return pack( $request->{template}, $request->{len}, @data );
}

# PRODUCE Response -------------------------------------------------------------

my $_decode_produce_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>${_int64_template}))};
    # x[l]                      # Size (skip)
    # l>                        # CorrelationId

    # l>                        # topics array size
    # X[l]
    # l>/(                      # topics array
    #     s>/a                  # TopicName

    #     l>                    # partitions array size
    #     X[l]
    #     l>/(                  # partitions array
    #         l>                # Partition
    #         s>                # ErrorCode
    #         $_int64_template  # Offset
    #     )
    # )

=head3 C<decode_produce_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the PRODUCE Response (for examples see C<t/*_decode_encode.t>).

This function takes the following argument:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.

=back
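
For example, a minimal sketch of checking a decoded response for per-partition
errors (the returned structure is the one shown in the L</SYNOPSIS>):

    use Kafka qw( $ERROR_NO_ERROR );
    use Kafka::Protocol qw( decode_produce_response );

    # $encoded is assumed to contain a complete PRODUCE Response buffer
    my $response = decode_produce_response( \$encoded );
    foreach my $topic ( @{ $response->{topics} } ) {
        foreach my $partition ( @{ $topic->{partitions} } ) {
            warn "error in $topic->{TopicName}:$partition->{Partition}"
                if $partition->{ErrorCode} != $ERROR_NO_ERROR;
        }
    }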

=cut
sub decode_produce_response {
    my ( $bin_stream_ref ) = @_;

    my @data = unpack( $_decode_produce_response_template, $$bin_stream_ref );

    my $i = 0;
    my $Produce_Response = {};

    $Produce_Response->{CorrelationId} = $data[ $i++ ];     # CorrelationId

    my $topics_array = $Produce_Response->{topics} = [];
    my $topics_array_size = $data[ $i++ ];                  # topics array size
    while ( $topics_array_size-- ) {
        my $topic = {
            TopicName => $data[ $i++ ],                     # TopicName
        };

        my $partitions_array = $topic->{partitions} = [];
        my $partitions_array_size = $data[ $i++ ];          # partitions array size
        while ( $partitions_array_size-- ) {
            my $partition = {
                Partition   => $data[ $i++ ],               # Partition
                ErrorCode   => $data[ $i++ ],               # ErrorCode
                Offset      => _unpack64( $data[ $i++ ] ),  # Offset
            };

            push( @$partitions_array, $partition );
        }

        push( @$topics_array, $topic );
    }

    return $Produce_Response;
}

# FETCH Request ----------------------------------------------------------------

=head3 C<encode_fetch_request( $Fetch_Request )>

Encodes the argument and returns the encoded binary string
representing a Request buffer.

This function takes the following argument:

=over 3

=item C<$Fetch_Request>

C<$Fetch_Request> is a reference to the hash representing
the structure of the FETCH Request (for examples see C<t/*_decode_encode.t>).

=back

=cut
sub encode_fetch_request {
    my ( $Fetch_Request ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data                                    => \@data,
    };

    _encode_request_header( $request, $APIKEY_FETCH, $Fetch_Request );
                                                # Size
                                                # ApiKey
                                                # ApiVersion
                                                # CorrelationId
                                                # ClientId

    push( @data, $CONSUMERS_REPLICAID );        # ReplicaId
    my $topics_array = $Fetch_Request->{topics};
    push( @data,
        $Fetch_Request->{MaxWaitTime},          # MaxWaitTime
        $Fetch_Request->{MinBytes},             # MinBytes
        scalar( @$topics_array ),               # topics array size
    );
    $request->{template}    .= $_FetchRequest_header_template;
    $request->{len}         += $_FetchRequest_header_length;

    foreach my $topic ( @$topics_array ) {
        $request->{template}    .= q{s>};       # string length
        $request->{len}         += 2;
        _encode_string( $request, $topic->{TopicName} );    # TopicName

        my $partitions_array = $topic->{partitions};
        push( @data, scalar( @$partitions_array ) );
        $request->{template}    .= q{l>};       # partitions array size
        $request->{len}         += 4;
        foreach my $partition ( @$partitions_array ) {
            push( @data,
                $partition->{Partition},                # Partition
                _pack64( $partition->{FetchOffset} ),   # FetchOffset
                $partition->{MaxBytes},                 # MaxBytes
            );
            $request->{template}    .= $_FetchRequest_body_template;
            $request->{len}         += $_FetchRequest_body_length;
        }
    }

    return pack( $request->{template}, $request->{len}, @data );
}

# FETCH Response ---------------------------------------------------------------

=head3 C<decode_fetch_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the FETCH Response (for examples see C<t/*_decode_encode.t>).

This function takes the following argument:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.

=back
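
For example, a minimal sketch of extracting the message payloads from a decoded
FETCH Response (C<$encoded> is assumed to contain a complete response buffer):

    use Kafka::Protocol qw( decode_fetch_response );

    my $response = decode_fetch_response( \$encoded );
    foreach my $topic ( @{ $response->{topics} } ) {
        foreach my $partition ( @{ $topic->{partitions} } ) {
            foreach my $message ( @{ $partition->{MessageSet} } ) {
                say "$message->{Offset}: $message->{Value}";
            }
        }
    }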

=cut
sub decode_fetch_response {
    my ( $bin_stream_ref ) = @_;

# According to Apache Kafka documentation:
# As an optimization the server is allowed to return a partial message at the end of the message set.
# Clients should handle this case.
# NOTE: look inside _decode_MessageSet_template and _decode_MessageSet_array

    my @data;
    my $response = {
                                                # template      => '...',
                                                # stream_offset => ...,
        data                                    => \@data,
        bin_stream                              => $bin_stream_ref,
    };

    _decode_fetch_response_template( $response );
    @data = unpack( $response->{template}, $$bin_stream_ref );

    my $i = 0;
    my $Fetch_Response = {};

    $Fetch_Response->{CorrelationId} = $data[ $i++ ];   # CorrelationId

    my $topics_array = $Fetch_Response->{topics} = [];
    my $topics_array_size = $data[ $i++ ];              # topics array size
    while ( $topics_array_size-- ) {
        my $topic = {
            TopicName => $data[ $i++ ],                 # TopicName
        };

        my $partitions_array = $topic->{partitions} = [];
        my $partitions_array_size = $data[ $i++ ];      # partitions array size
        my ( $MessageSetSize, $MessageSet_array );
        while ( $partitions_array_size-- ) {
            my $partition = {
                Partition           => $data[ $i++ ],               # Partition
                ErrorCode           => $data[ $i++ ],               # ErrorCode
                HighwaterMarkOffset => _unpack64( $data[ $i++ ] ),  # HighwaterMarkOffset
            };

            $MessageSetSize = $data[ $i++ ];            # MessageSetSize
            $MessageSet_array = $partition->{MessageSet} = [];

            _decode_MessageSet_array( $response, $MessageSetSize, \$i, $MessageSet_array );

            push( @$partitions_array, $partition );
        }

        push( @$topics_array, $topic );
    }

    return $Fetch_Response;
}

# OFFSET Request ---------------------------------------------------------------

=head3 C<encode_offset_request( $Offset_Request )>

Encodes the argument and returns the encoded binary string
representing a Request buffer.

This function takes the following argument:

=over 3

=item C<$Offset_Request>

C<$Offset_Request> is a reference to the hash representing
the structure of the OFFSET Request (for examples see C<t/*_decode_encode.t>).

=back
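
For example, a minimal sketch of a request for the latest offset of partition 0
(the topic name and CorrelationId here are illustrative):

    use Kafka qw( $RECEIVE_LATEST_OFFSET );
    use Kafka::Protocol qw( encode_offset_request );

    my $encoded_request = encode_offset_request( {
        CorrelationId   => 0,
        ClientId        => q{},
        topics          => [
            {
                TopicName   => 'mytopic',
                partitions  => [
                    {
                        Partition           => 0,
                        Time                => $RECEIVE_LATEST_OFFSET, # -1
                        MaxNumberOfOffsets  => 1,
                    },
                ],
            },
        ],
    } );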

=cut
sub encode_offset_request {
    my ( $Offset_Request ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data                                    => \@data,
    };

    _encode_request_header( $request, $APIKEY_OFFSET, $Offset_Request );
                                                # Size
                                                # ApiKey
                                                # ApiVersion
                                                # CorrelationId
                                                # ClientId

    my $topics_array = $Offset_Request->{topics};
    push( @data,
        $CONSUMERS_REPLICAID,                   # ReplicaId
        scalar( @$topics_array ),               # topics array size
    );
    $request->{template}    .= $_OffsetRequest_header_template;
    $request->{len}         += $_OffsetRequest_header_length;

    foreach my $topic ( @$topics_array ) {
        $request->{template}    .= q{s>};       # string length
        $request->{len}         += 2;
        _encode_string( $request, $topic->{TopicName} );    # TopicName

        my $partitions_array = $topic->{partitions};
        push( @data, scalar( @$partitions_array ) );
        $request->{template}    .= q{l>};       # partitions array size
        $request->{len}         += 4;
        foreach my $partition ( @$partitions_array ) {
            push( @data,
                $partition->{Partition},            # Partition
                _pack64( $partition->{Time} ),      # Time
                $partition->{MaxNumberOfOffsets},   # MaxNumberOfOffsets
            );
            $request->{template}    .= $_OffsetRequest_body_template;
            $request->{len}         += $_OffsetRequest_body_length;
        }
    }

    return pack( $request->{template}, $request->{len}, @data );
}

# OFFSET Response --------------------------------------------------------------

my $_decode_offset_response_template = qq{x[l]l>l>X[l]l>/(s>/al>X[l]l>/(l>s>l>X[l]l>/(${_int64_template})))};
    # x[l]                          # Size (skip)
    # l>                            # CorrelationId

    # l>                            # topics array size
    # X[l]
    # l>/(                          # topics array
    #     s>/a                      # TopicName

    #     l>                        # PartitionOffsets array size
    #     X[l]
    #     l>/(                      # PartitionOffsets array
    #         l>                    # Partition
    #         s>                    # ErrorCode

    #         l>                    # Offset array size
    #         X[l]
    #         l>/(                  # Offset array
    #             $_int64_template  # Offset
    #         )
    #     )
    # )

=head3 C<decode_offset_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the OFFSET Response (for examples see C<t/*_decode_encode.t>).

This function takes the following argument:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.

=back
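
For example, a minimal sketch of reading the returned offsets
(C<$encoded> is assumed to contain a complete response buffer):

    use Kafka::Protocol qw( decode_offset_response );

    my $response = decode_offset_response( \$encoded );
    foreach my $topic ( @{ $response->{topics} } ) {
        foreach my $partition_offset ( @{ $topic->{PartitionOffsets} } ) {
            # the Offset key holds an array reference of offsets
            say "$topic->{TopicName}:$partition_offset->{Partition} @{ $partition_offset->{Offset} }";
        }
    }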

=cut
sub decode_offset_response {
    my ( $bin_stream_ref ) = @_;

    my @data = unpack( $_decode_offset_response_template, $$bin_stream_ref );

    my $i = 0;
    my $Offset_Response = {};

    $Offset_Response->{CorrelationId} = $data[ $i++ ];  # CorrelationId

    my $topics_array = $Offset_Response->{topics} = [];
    my $topics_array_size = $data[ $i++ ];              # topics array size
    while ( $topics_array_size-- ) {
        my $topic = {
            TopicName => $data[ $i++ ],                 # TopicName
        };

        my $PartitionOffsets_array = $topic->{PartitionOffsets} = [];
        my $PartitionOffsets_array_size = $data[ $i++ ];    # PartitionOffsets array size
        my ( $PartitionOffset, $Offset_array, $Offset_array_size );
        while ( $PartitionOffsets_array_size-- ) {
            $PartitionOffset = {
                Partition   => $data[ $i++ ],           # Partition
                ErrorCode   => $data[ $i++ ],           # ErrorCode
            };

            $Offset_array = $PartitionOffset->{Offset} = [];
            $Offset_array_size = $data[ $i++ ];         # Offset array size
            while ( $Offset_array_size-- ) {
                push( @$Offset_array, _unpack64( $data[ $i++ ] ) ); # Offset
            }

            push( @$PartitionOffsets_array, $PartitionOffset );
        }

        push( @$topics_array, $topic );
    }

    return $Offset_Response;
}

# METADATA Request -------------------------------------------------------------

=head3 C<encode_metadata_request( $Metadata_Request )>

Encodes the argument and returns the encoded binary string
representing a Request buffer.

This function takes the following argument:

=over 3

=item C<$Metadata_Request>

C<$Metadata_Request> is a reference to the hash representing
the structure of the METADATA Request (for examples see C<t/*_decode_encode.t>).

=back
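
For example, a minimal sketch: unlike the other requests, the C<topics> array
here holds plain topic names rather than hash references (the names are
illustrative):

    use Kafka::Protocol qw( encode_metadata_request );

    my $encoded_request = encode_metadata_request( {
        CorrelationId   => 0,
        ClientId        => q{},
        topics          => [ 'mytopic' ],
    } );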

=cut
sub encode_metadata_request {
    my ( $Metadata_Request ) = @_;

    my @data;
    my $request = {
                                                # template    => '...',
                                                # len         => ...,
        data                                    => \@data,
    };

    _encode_request_header( $request, $APIKEY_METADATA, $Metadata_Request );
                                                # Size
                                                # ApiKey
                                                # ApiVersion
                                                # CorrelationId
                                                # ClientId

    my $topics_array = $Metadata_Request->{topics};
    push( @data, scalar( @$topics_array ) );    # topics array size
    $request->{template}    .= q{l>};
    $request->{len}         += 4;

    foreach my $topic ( @$topics_array ) {
        $request->{template}    .= q{s>};       # string length
        $request->{len}         += 2;
        _encode_string( $request, $topic );     # TopicName
    }

    return pack( $request->{template}, $request->{len}, @data );
}

# METADATA Response ------------------------------------------------------------

my $_decode_metadata_response_template = q{x[l]l>l>X[l]l>/(l>s>/al>)l>X[l]l>/(s>s>/al>X[l]l>/(s>l>l>l>X[l]l>/(l>)l>X[l]l>/(l>)))};
    # x[l]                  # Size (skip)
    # l>                    # CorrelationId

    # l>                    # Broker array size
    # X[l]
    # l>/(                  # Broker array
    #     l>                # NodeId
    #     s>/a              # Host
    #     l>                # Port
    # )

    # l>                    # TopicMetadata array size
    # X[l]
    # l>/(                  # TopicMetadata array
    #     s>                # ErrorCode
    #     s>/a              # TopicName

    #     l>                # PartitionMetadata array size
    #     X[l]
    #     l>/(              # PartitionMetadata array
    #         s>            # ErrorCode
    #         l>            # Partition
    #         l>            # Leader

    #         l>            # Replicas array size
    #         X[l]
    #         l>/(          # Replicas array
    #             l>        # ReplicaId
    #         )

    #         l>            # Isr array size
    #         X[l]
    #         l>/(          # Isr array
    #             l>        # ReplicaId
    #         )
    #     )
    # )

=head3 C<decode_metadata_response( $bin_stream_ref )>

Decodes the argument and returns a reference to the hash representing
the structure of the METADATA Response (for examples see C<t/*_decode_encode.t>).

This function takes the following argument:

=over 3

=item C<$bin_stream_ref>

C<$bin_stream_ref> is a reference to the encoded Response buffer. The buffer
must be a non-empty binary string.

=back
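
For example, a minimal sketch of finding the leader broker of each partition
(C<$encoded> is assumed to contain a complete response buffer):

    use Kafka::Protocol qw( decode_metadata_response );

    my $response = decode_metadata_response( \$encoded );
    my %brokers = map { $_->{NodeId} => "$_->{Host}:$_->{Port}" } @{ $response->{Broker} };
    foreach my $topic_metadata ( @{ $response->{TopicMetadata} } ) {
        foreach my $partition_metadata ( @{ $topic_metadata->{PartitionMetadata} } ) {
            say "$topic_metadata->{TopicName}:$partition_metadata->{Partition}"
                .' is led by '.( $brokers{ $partition_metadata->{Leader} } // 'none' );
        }
    }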

=cut
sub decode_metadata_response {
    my ( $bin_stream_ref ) = @_;

    my @data = unpack( $_decode_metadata_response_template, $$bin_stream_ref );

    my $i = 0;
    my $Metadata_Response = {};

    $Metadata_Response->{CorrelationId} = $data[ $i++ ];    # CorrelationId

    my $Broker_array = $Metadata_Response->{Broker} = [];
    my $Broker_array_size = $data[ $i++ ];                  # Broker array size
    while ( $Broker_array_size-- ) {
        push( @$Broker_array, {
                NodeId  => $data[ $i++ ],                   # NodeId
                Host    => $data[ $i++ ],                   # Host
                Port    => $data[ $i++ ],                   # Port
            }
        );
    }

    my $TopicMetadata_array = $Metadata_Response->{TopicMetadata} = [];
    my $TopicMetadata_array_size = $data[ $i++ ];           # TopicMetadata array size
    while ( $TopicMetadata_array_size-- ) {
        my $TopicMetadata = {
            ErrorCode   => $data[ $i++ ],                   # ErrorCode
            TopicName   => $data[ $i++ ],                   # TopicName
        };

        my $PartitionMetadata_array = $TopicMetadata->{PartitionMetadata} = [];
        my $PartitionMetadata_array_size = $data[ $i++ ];   # PartitionMetadata array size
        while ( $PartitionMetadata_array_size-- ) {
            my $PartitionMetadata = {
                ErrorCode   => $data[ $i++ ],               # ErrorCode
                Partition   => $data[ $i++ ],               # Partition
                Leader      => $data[ $i++ ],               # Leader
            };

            my $Replicas_array = $PartitionMetadata->{Replicas} = [];
            my $Replicas_array_size = $data[ $i++ ];        # Replicas array size
            while ( $Replicas_array_size-- ) {
                push( @$Replicas_array, $data[ $i++ ] );    # ReplicaId
            }

            my $Isr_array = $PartitionMetadata->{Isr} = [];
            my $Isr_array_size = $data[ $i++ ];             # Isr array size
            while ( $Isr_array_size-- ) {
                push( @$Isr_array, $data[ $i++ ] );         # ReplicaId
            }

            push( @$PartitionMetadata_array, $PartitionMetadata );
        }

        push( @$TopicMetadata_array, $TopicMetadata );
    }

    return $Metadata_Response;
}

#-- private functions ----------------------------------------------------------

# Generates a template to encode the request header
sub _encode_request_header {
    my ( $request, $api_key, $request_ref ) = @_;

    @{ $request->{data} } = (
                                                # Size
        $api_key,                               # ApiKey
        $APIVERSION,                            # ApiVersion
        $request_ref->{CorrelationId},          # CorrelationId
    );
    $request->{template}    = $_Request_header_template;
    $request->{len}         = $_Request_header_length;
    _encode_string( $request, $request_ref->{ClientId} );  # ClientId
}

# Generates a template to decode the fetch response body
sub _decode_fetch_response_template {
    my ( $response ) = @_;

    $response->{template}       = $_FetchResponse_header_template;
    $response->{stream_offset}  = $_FetchResponse_header_length;   # bytes before the topics array size
                                                # [l] Size
                                                # [l] CorrelationId
    my $topics_array_size = unpack(
         q{x[}.$response->{stream_offset}
        .q{]l>},                                # topics array size
        ${ $response->{bin_stream} }
    );
    $response->{stream_offset} += 4;            # bytes before the TopicName length
                                                # [l] topics array size

    my ( $TopicName_length, $partitions_array_size );
    while ( $topics_array_size-- ) {
        $TopicName_length = unpack(
             q{x[}.$response->{stream_offset}
            .q{]s>},                            # TopicName length
            ${ $response->{bin_stream} }
        );
        $response->{stream_offset} +=           # bytes before the partitions array size
              2                                 # [s] TopicName length
            + $TopicName_length                 # TopicName
            ;
        $partitions_array_size = unpack(
             q{x[}.$response->{stream_offset}
            .q{]l>},                            # partitions array size
            ${ $response->{bin_stream} }
        );
        $response->{stream_offset} += 4;        # bytes before Partition
                                                # [l] partitions array size

        $response->{template}       .= $_FetchResponse_topic_body_template;
        $response->{stream_offset}  += $_FetchResponse_topic_body_length;   # (without TopicName and partitions array size)
                                                # bytes before MessageSetSize
                                                #     TopicName
                                                # [l] partitions array size
                                                # [l] Partition
                                                # [s] ErrorCode
                                                # [q] HighwaterMarkOffset

        _decode_MessageSet_template( $response );
    }
}

# Decodes a MessageSet
sub _decode_MessageSet_array {
    my ( $response, $MessageSetSize, $i_ref, $MessageSet_array_ref ) = @_;

    my $data = $response->{data};
    my $data_array_size = scalar @{ $data };

# NOTE: the server may return a partial message at the end of the set, so not all messages can be returned
    my ( $Message, $MessageSize, $Crc, $Key_length, $Value_length );
    while ( $MessageSetSize && $$i_ref < $data_array_size ) {

        $Message = {
            Offset => _unpack64( $data->[ $$i_ref++ ] ),    # Offset
        };

        $MessageSize = $data->[ $$i_ref++ ];                # MessageSize
# NOTE: The Crc is the CRC32 of the remainder of the message bytes.
# It is used to check the integrity of the message on the broker and consumer:
# MagicByte + Attributes + Key length + Key + Value length + Value
        $Crc = $data->[ $$i_ref++ ];                        # Crc
# NOTE: A message set is also the unit of compression in Kafka;
# if a message is compressed, its value is decompressed below and the inner
# message set is decoded recursively.
        $Message->{MagicByte}   = $data->[ $$i_ref++ ];     # MagicByte
        $Message->{Attributes}  = $data->[ $$i_ref++ ];     # Attributes

        $Key_length = $data->[ $$i_ref++ ];                 # Key length
        $Message->{Key} = $Key_length == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ];       # Key
        $Value_length = $data->[ $$i_ref++ ];               # Value length
        $Message->{Value} = $Value_length == $NULL_BYTES_LENGTH ? q{} : $data->[ $$i_ref++ ];   # Value

        if ( my $compression_codec = $Message->{Attributes} & $COMPRESSION_CODEC_MASK ) {
            my $decompressed;
            if ( $compression_codec == $COMPRESSION_GZIP ) {
                gunzip( \$Message->{Value} => \$decompressed )
                    or _error( $ERROR_COMPRESSION, "gunzip failed: $GunzipError" );
            } elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
                my ( $header, $x_version, $x_compatversion, undef ) = unpack( q{a[8]L>L>L>}, $Message->{Value} );   # undef - $x_length

                # Special thanks to Colin Blower
                if ( $header eq "\x82SNAPPY\x00" ) {
                    # Found a xerial header: a nonstandard snappy compression header, remove it
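                    # NOTE: a minimal sketch of the xerial framing this branch assumes:
                    #   a[8]  magic bytes "\x82SNAPPY\x00"
                    #   L>    version
                    #   L>    compatible version
                    #   L>    length of the first compressed block
                    # The 20 byte prefix is stripped below and the remainder is
                    # decompressed as a single snappy block, i.e. payloads made
                    # of several xerial blocks are not reassembled here.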
                    if ( $x_compatversion == 1 && $x_version == 1 ) {
                        $Message->{Value} = substr( $Message->{Value}, 20 );    # 20 = packed length of q{a[8]L>L>L>}
                    } else {
                        _error( $ERROR_COMPRESSION, "Snappy compression with incompatible xerial header version found (x_version = $x_version, x_compatversion = $x_compatversion)" );
                    }
                }

                $decompressed = Compress::Snappy::decompress( $Message->{Value} )
                    // _error( $ERROR_COMPRESSION, 'Unable to decompress snappy compressed data' );
            } else {
                _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
            }
            my @data;
            my $Value_length = length $decompressed;
            my $resp = {
                data            => \@data,
                bin_stream      => \$decompressed,
                stream_offset   => 0,
            };
            _decode_MessageSet_sized_template( $Value_length, $resp );
            @data = unpack( $resp->{template}, ${ $resp->{bin_stream} } );
            my $i = 0;
            my $size = length( $decompressed );
            _decode_MessageSet_array(
                $resp,
                $size,                          # message set size
                \$i,                            # i_ref
                $MessageSet_array_ref,
            );
        } else {
            push( @$MessageSet_array_ref, $Message );
        }

        $MessageSetSize -=
              12                                # [q] Offset
                                                # [l] MessageSize
            + $MessageSize                      # Message
            ;
    }
}

# Generates a template to encode a MessageSet
sub _encode_MessageSet_array {
    my ( $request, $MessageSet_array_ref, $compression_codec ) = @_;

    my ( $MessageSize, $Key, $Value, $key_length, $value_length, $message_body, $message_set );

    if ( $compression_codec ) {
        foreach my $MessageSet ( @$MessageSet_array_ref ) {
            $key_length     = length( $Key      = $MessageSet->{Key} );
            $value_length   = length( $Value    = $MessageSet->{Value} );

            $message_body = pack(
                     q{ccl>}                                        # MagicByte
                                                                    # Attributes
                                                                    # Key length
                    .( $key_length ? qq{a[$key_length]} : q{} )     # Key
                    .q{l>}                                          # Value length
                    .( $value_length ? qq{a[$value_length]} : q{} ) # Value
                ,
                0,
                $COMPRESSION_NONE,              # According to Apache Kafka documentation:
                                                # The lowest 2 bits contain the compression codec used for the message.
                                                # The other bits should be set to 0.
                $key_length     ? ( $key_length, $Key )     : ( $NULL_BYTES_LENGTH ),
                $value_length   ? ( $value_length, $Value ) : ( $NULL_BYTES_LENGTH ),
            );
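
            # NOTE: the uncompressed messages are framed exactly as they would be
            # on the wire - [Offset][MessageSize][Crc][message body] - and are
            # concatenated into $message_set, which then becomes the Value of the
            # single compressed wrapper message built below.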
            $message_set .= pack( q{x[8]l>l>},  # 8 Offset ($PRODUCER_ANY_OFFSET)
                length( $message_body ) + 4,    # [l] MessageSize ( $message_body + Crc )
                crc32( $message_body )          # [l] Crc
            ).$message_body;
        }

        $MessageSet_array_ref = [
            {
                Offset  => $PRODUCER_ANY_OFFSET,
                Key     => $Key,
            }
        ];

        # Compression
        if ( $compression_codec == $COMPRESSION_GZIP ) {
            $MessageSet_array_ref->[0]->{Value} = q{};
            gzip( \$message_set => \$MessageSet_array_ref->[0]->{Value} )
                or _error( $ERROR_COMPRESSION, "gzip failed: $GzipError" );
        } elsif ( $compression_codec == $COMPRESSION_SNAPPY ) {
            $MessageSet_array_ref->[0]->{Value} = Compress::Snappy::compress( $message_set )
                // _error( $ERROR_COMPRESSION, 'Unable to compress snappy data' );
        } else {
            _error( $ERROR_COMPRESSION, "Unknown compression codec $compression_codec" );
        }
    }

    my $data = $request->{data};
    my $MessageSetSize = 0;
    my %sizes;
    foreach my $MessageSet ( @$MessageSet_array_ref ) {
        $MessageSetSize +=
              12                                            # [q] Offset
                                                            # [l] MessageSize
            + ( $sizes{ $MessageSet } =                     # MessageSize
                  10                                        # [l] Crc
                                                            # [c] MagicByte
                                                            # [c] Attributes
                                                            # [l] Key length
                + length( $MessageSet->{Key} //= q{} )      # Key
                + 4                                         # [l] Value length
                + length( $MessageSet->{Value} //= q{} )    # Value
            )                                               # MessageSize
            ;
    }
    push( @$data, $MessageSetSize );
    $request->{template}    .= q{l>};           # MessageSetSize
    $request->{len}         += 4;

    foreach my $MessageSet ( @$MessageSet_array_ref ) {
        push( @$data,
            _pack64( $MessageSet->{Offset} ),           # Offset (may be $PRODUCER_ANY_OFFSET)
            ( $MessageSize = $sizes{ $MessageSet } ),   # MessageSize
        );
        $request->{template}    .= $_MessageSet_template;
        $request->{len}         += $_MessageSet_length;

        $key_length     = length( $Key      = $MessageSet->{Key} );
        $value_length   = length( $Value    = $MessageSet->{Value} );

        $message_body = pack(
                 q{ccl>}                                        # MagicByte
                                                                # Attributes
                                                                # Key length
                .( $key_length ? qq{a[$key_length]} : q{} )     # Key
                .q{l>}                                          # Value length
                .( $value_length ? qq{a[$value_length]} : q{} ) # Value
            ,
            0,
            $compression_codec // $COMPRESSION_NONE,    # According to Apache Kafka documentation:
                                                        # The lowest 2 bits contain the compression codec used for the message.
                                                        # The other bits should be set to 0.
            $key_length     ? ( $key_length, $Key )     : ( $NULL_BYTES_LENGTH ),
            $value_length   ? ( $value_length, $Value ) : ( $NULL_BYTES_LENGTH ),
        );

        push( @$data, crc32( $message_body ), $message_body );
                                                # Message
        $request->{template} .= q{l>a[}         # Crc
            .( $MessageSize - 4 )               # 4 Crc
            .qq{]};
                                                # Message body:
                                                #   MagicByte
                                                #   Attributes
                                                #   Key length
                                                #   Key
                                                #   Value length
                                                #   Value
        $request->{len} += $MessageSize;        # Message
    }
}

# Generates a template to decode a MessageSet
sub _decode_MessageSet_template {
    my ( $response ) = @_;

    my $MessageSetSize = unpack(
         q{x[}.$response->{stream_offset}
        .q{]l>},                                # MessageSetSize
        ${ $response->{bin_stream} }
    );
    $response->{template}       .= q{l>};       # MessageSetSize
    $response->{stream_offset}  += 4;           # bytes before Offset

    return _decode_MessageSet_sized_template( $MessageSetSize, $response );
}

sub _decode_MessageSet_sized_template {
    my ( $MessageSetSize, $response ) = @_;

    my $bin_stream_length = length ${ $response->{bin_stream} };

    my ( $local_template, $MessageSize, $Key_length, $Value_length );
    CREATE_TEMPLATE:
    while ( $MessageSetSize ) {
# Not the full MessageSet
        last CREATE_TEMPLATE if $MessageSetSize < 22;
                                                # [q] Offset
                                                # [l] MessageSize
                                                # [l] Crc
                                                # [c] MagicByte
                                                # [c] Attributes
                                                # [l] Key length
                                                # [l] Value length

        $local_template = q{};
        MESSAGE_SET:
        {
            $local_template .= $_Message_template;
            $response->{stream_offset} += $_Message_length; # (only the Offset length)
                                                # [q] Offset
                                                # [l] MessageSize
                                                # [l] Crc
                                                # [c] MagicByte
                                                # [c] Attributes
                                                # [l] Key length
                                                # bytes before MessageSize
                                                # [q] Offset
            $MessageSize = unpack(
                 q{x[}.$response->{stream_offset}
                .q{]l>},                        # MessageSize
                ${ $response->{bin_stream} }
            );

            $response->{stream_offset} += 10;   # bytes before Crc
                                                # [l] MessageSize
                                                # bytes before Key length
                                                # [l] Crc
                                                # [c] MagicByte
                                                # [c] Attributes
            $Key_length = unpack(
                 q{x[}.$response->{stream_offset}
                .q{]l>},                        # Key length
                ${ $response->{bin_stream} }
            );

            $response->{stream_offset} += 4;    # bytes before Key or Value length
                                                # [l] Key length
            $response->{stream_offset} += $Key_length   # bytes before Key
                if $Key_length != $NULL_BYTES_LENGTH;   # Key
            if ( $bin_stream_length >= $response->{stream_offset} + 4 ) {   # + [l] Value length
                $local_template .= $_Key_or_Value_template
                    if $Key_length != $NULL_BYTES_LENGTH;
            }
            else {
# Not the full MessageSet
                $local_template = q{};
                last MESSAGE_SET;
            }

            $local_template .= q{l>};           # Value length
            $Value_length = unpack(
                 q{x[}.$response->{stream_offset}
                .q{]l>},                        # Value length
                ${ $response->{bin_stream} }
            );
            $response->{stream_offset} += 4;    # bytes before Value or the next Message
                                                # [l] Value length
            $response->{stream_offset} += $Value_length    # bytes before the next Message
                if $Value_length != $NULL_BYTES_LENGTH;     # Value
            if ( $bin_stream_length >= $response->{stream_offset} ) {
                $local_template .= $_Key_or_Value_template
                    if $Value_length != $NULL_BYTES_LENGTH;
            }
            else {
# Not the full MessageSet
                $local_template = q{};
                last MESSAGE_SET;
            }
        }

        if ( $local_template ) {
            $response->{template} .= $local_template;
            $MessageSetSize -=
                  12                            # [q] Offset
                                                # [l] MessageSize
                + $MessageSize                  # Message
                ;
        }
        else {
            last CREATE_TEMPLATE;
        }
    }
}

# Generates a template to encode a string
sub _encode_string {
    my ( $request, $string ) = @_;

    if ( $string eq q{} ) {
        push( @{ $request->{data} }, 0 );
    }
    else {
        my $string_length = length $string;
        push( @{ $request->{data} }, $string_length, $string );
        $request->{template}    .= q{a*};       # string
        $request->{len}         += $string_length;
    }
}

# Handler for errors
sub _error {
    Kafka::Exception::Protocol->throw( throw_args( @_ ) );
}

1;

__END__

=head1 DIAGNOSTICS

In order to achieve better performance, functions of this module do not perform
argument validation.

=head1 SEE ALSO

The basic operation of the Kafka package modules:

L<Kafka|Kafka> - constants and messages used by the Kafka package modules.

L<Kafka::Connection|Kafka::Connection> - interface to connect to a Kafka cluster.

L<Kafka::Producer|Kafka::Producer> - interface for producing client.

L<Kafka::Consumer|Kafka::Consumer> - interface for consuming client.

L<Kafka::Message|Kafka::Message> - interface to access Kafka message
properties.

L<Kafka::Int64|Kafka::Int64> - functions to work with 64 bit elements of the
protocol on 32 bit systems.

L<Kafka::Protocol|Kafka::Protocol> - functions to process messages in the
Apache Kafka protocol.

L<Kafka::IO|Kafka::IO> - low-level interface for communication with Kafka server.

L<Kafka::Exceptions|Kafka::Exceptions> - module designated to handle Kafka exceptions.

L<Kafka::Internals|Kafka::Internals> - internal constants and functions used
by several package modules.

A wealth of detail about the Apache Kafka and the Kafka Protocol:

Main page at L<http://kafka.apache.org/>

Kafka Protocol at L<https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol>

=head1 SOURCE CODE

Kafka package is hosted on GitHub:
L<https://github.com/TrackingSoft/Kafka>

=head1 AUTHOR

Sergey Gladkov, E<lt>sgladkov@trackingsoft.comE<gt>

=head1 CONTRIBUTORS

Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2012-2013 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under
the same terms as Perl itself. See I<perlartistic> at
L<http://dev.perl.org/licenses/artistic.html>.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.

=cut