1package transfer 2 3import ( 4 "context" 5 "encoding/base64" 6 "errors" 7 "fmt" 8 "net" 9 10 "github.com/golang/protobuf/proto" 11 12 hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common" 13 hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs" 14 "github.com/colinmarc/hdfs/v2/internal/sasl" 15) 16 17const ( 18 authMethod = "TOKEN" 19 authMechanism = "DIGEST-MD5" 20 authServer = "0" 21 authProtocol = "hdfs" 22) 23 24// SaslDialer dials using the underlying DialFunc, then negotiates 25// authentication with the datanode. The resulting Conn implements whatever 26// data protection level is specified by the server, whether it be wire 27// encryption or integrity checks. 28type SaslDialer struct { 29 DialFunc func(ctx context.Context, network, addr string) (net.Conn, error) 30 Key *hdfs.DataEncryptionKeyProto 31 Token *hadoop.TokenProto 32 EnforceQop string 33} 34 35func (d *SaslDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { 36 if d.DialFunc == nil { 37 d.DialFunc = (&net.Dialer{}).DialContext 38 } 39 40 conn, err := d.DialFunc(ctx, network, addr) 41 if err != nil { 42 return nil, err 43 } 44 45 return d.wrapDatanodeConn(conn) 46} 47 48// wrapDatanodeConn performs a shortened SASL negotiation with the datanode, 49// then returns a wrapped connection or any error encountered. In the case of 50// a protection setting of 'authentication', the bare connection is returned. 51func (d *SaslDialer) wrapDatanodeConn(conn net.Conn) (net.Conn, error) { 52 auth := &hadoop.RpcSaslProto_SaslAuth{} 53 auth.Method = proto.String(authMethod) 54 auth.Mechanism = proto.String(authMechanism) 55 auth.ServerId = proto.String(authServer) 56 auth.Protocol = proto.String(authProtocol) 57 58 ourToken := &hadoop.TokenProto{} 59 ourToken.Kind = d.Token.Kind 60 ourToken.Password = d.Token.Password[:] 61 ourToken.Service = d.Token.Service 62 ourToken.Identifier = d.Token.GetIdentifier() 63 64 // If the server defaults have EncryptDataTransfer set but the encryption 65 // key is empty, the namenode doesn't want us to encrypt the block token. 66 if d.Key != nil && len(d.Key.Nonce) > 0 { 67 // Amusingly, this is unsigned in the proto struct but is expected 68 // to be signed here. 69 keyId := int32(d.Key.GetKeyId()) 70 71 ourToken.Identifier = []byte(fmt.Sprintf("%d %s %s", 72 keyId, 73 d.Key.GetBlockPoolId(), 74 base64.StdEncoding.EncodeToString(d.Key.Nonce))) 75 ourToken.Password = d.Key.EncryptionKey 76 } else { 77 ourToken.Identifier = make([]byte, 78 base64.StdEncoding.EncodedLen(len(d.Token.GetIdentifier()))) 79 base64.StdEncoding.Encode(ourToken.Identifier, d.Token.GetIdentifier()) 80 } 81 82 dgst := digestMD5Handshake{ 83 authID: ourToken.Identifier, 84 passwd: base64.StdEncoding.EncodeToString(ourToken.Password), 85 hostname: auth.GetServerId(), 86 service: auth.GetProtocol(), 87 } 88 89 // Begin the handshake with 0xDEADBEEF and an empty message. 90 msg := &hdfs.DataTransferEncryptorMessageProto{} 91 msg.Status = hdfs.DataTransferEncryptorMessageProto_SUCCESS.Enum() 92 data, err := makePrefixedMessage(msg) 93 if err != nil { 94 return nil, err 95 } 96 97 data = append([]byte{0xDE, 0xAD, 0xBE, 0xEF}, data...) 98 _, err = conn.Write(data) 99 if err != nil { 100 return nil, err 101 } 102 103 // The response includes a challenge. Compute it and send it back. 104 resp := &hdfs.DataTransferEncryptorMessageProto{} 105 err = readPrefixedMessage(conn, msg) 106 if err != nil { 107 return nil, err 108 } 109 110 challengeResponse, err := dgst.challengeStep1(msg.Payload) 111 if err != nil { 112 return nil, err 113 } 114 115 // Use the server's QOP unless one was specified in the local configuration. 116 privacy := false 117 integrity := false 118 switch dgst.token.Qop[0] { 119 case sasl.QopPrivacy: 120 privacy = true 121 integrity = true 122 case sasl.QopIntegrity: 123 if d.EnforceQop == "privacy" { 124 return nil, errors.New("negotiating data protection: invalid qop: 'integrity'") 125 } 126 127 privacy = false 128 integrity = true 129 default: 130 if d.EnforceQop == "privacy" || d.EnforceQop == "integrity" { 131 return nil, fmt.Errorf("negotiating data protection: invalid qop: %s", dgst.token.Qop) 132 } 133 } 134 135 msg = &hdfs.DataTransferEncryptorMessageProto{} 136 msg.Status = hdfs.DataTransferEncryptorMessageProto_SUCCESS.Enum() 137 msg.Payload = []byte(challengeResponse) 138 139 if privacy { 140 // Indicate to the server that we want AES. 141 opt := &hdfs.CipherOptionProto{} 142 opt.Suite = hdfs.CipherSuiteProto_AES_CTR_NOPADDING.Enum() 143 msg.CipherOption = append(msg.CipherOption, opt) 144 } 145 146 data, err = makePrefixedMessage(msg) 147 if err != nil { 148 return nil, err 149 } 150 151 _, err = conn.Write(data) 152 if err != nil { 153 return nil, err 154 } 155 156 // Read another response from the server. 157 err = readPrefixedMessage(conn, resp) 158 if err != nil { 159 return nil, err 160 } 161 162 err = dgst.challengeStep2(resp.Payload) 163 if err != nil { 164 return nil, err 165 } 166 167 // Authentication done; we can return the bare connection if we don't need 168 // to do anything else. 169 if !privacy && !integrity { 170 return conn, nil 171 } 172 173 kic, kis := generateIntegrityKeys(dgst.a1()) 174 175 var wrapped digestMD5Conn 176 if privacy { 177 if dgst.cipher == "" { 178 return nil, fmt.Errorf("no available cipher among choices: %v", dgst.token.Cipher) 179 } 180 181 kcc, kcs := generatePrivacyKeys(dgst.a1(), dgst.cipher) 182 wrapped = newDigestMD5PrivacyConn(conn, kic, kis, kcc, kcs) 183 } else { 184 wrapped = newDigestMD5IntegrityConn(conn, kic, kis) 185 } 186 187 // If we're going to encrypt, we use the above wrapped connection just for 188 // finishing the handshake. 189 if len(resp.GetCipherOption()) > 0 { 190 cipher := resp.GetCipherOption()[0] 191 var outKey []byte 192 193 decoded, err := wrapped.decode(cipher.InKey) 194 if err != nil { 195 return nil, err 196 } 197 198 inKey := make([]byte, len(decoded)) 199 copy(inKey, decoded) 200 201 if outKey, err = wrapped.decode(cipher.OutKey); err != nil { 202 return nil, err 203 } 204 205 return newAesConn(conn, inKey, outKey, cipher.InIv, cipher.OutIv) 206 } 207 208 return wrapped, nil 209} 210