1package transfer
2
3import (
4	"context"
5	"encoding/base64"
6	"errors"
7	"fmt"
8	"net"
9
10	"github.com/golang/protobuf/proto"
11
12	hadoop "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common"
13	hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
14	"github.com/colinmarc/hdfs/v2/internal/sasl"
15)
16
// Fixed SASL parameters for the datanode handshake. All four populate the
// RpcSaslProto_SaslAuth message built in wrapDatanodeConn; of these, only
// authServer and authProtocol are read back there, as the hostname and
// service of the DIGEST-MD5 handshake respectively.
const (
	authMethod    = "TOKEN"
	authMechanism = "DIGEST-MD5"
	authServer    = "0"
	authProtocol  = "hdfs"
)
23
// SaslDialer dials using the underlying DialFunc, then negotiates
// authentication with the datanode. The resulting Conn implements whatever
// data protection level is specified by the server, whether it be wire
// encryption or integrity checks.
//
// Fields:
//
//   - DialFunc establishes the raw connection. If it is nil, DialContext
//     replaces it with the DialContext method of a zero-value net.Dialer.
//   - Key is the data encryption key. When it is non-nil and carries a
//     non-empty Nonce, the handshake credentials are derived from the key
//     (key id, block pool id, nonce and encryption key) rather than from
//     Token.
//   - Token is the block token whose identifier and password are used as the
//     handshake credentials when Key is not usable.
//   - EnforceQop optionally requires a minimum quality of protection. With
//     "privacy", the handshake fails unless the server offers privacy; with
//     "integrity", it fails if the server offers neither privacy nor
//     integrity. Any other value accepts whatever the server offers.
type SaslDialer struct {
	DialFunc   func(ctx context.Context, network, addr string) (net.Conn, error)
	Key        *hdfs.DataEncryptionKeyProto
	Token      *hadoop.TokenProto
	EnforceQop string
}
34
35func (d *SaslDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
36	if d.DialFunc == nil {
37		d.DialFunc = (&net.Dialer{}).DialContext
38	}
39
40	conn, err := d.DialFunc(ctx, network, addr)
41	if err != nil {
42		return nil, err
43	}
44
45	return d.wrapDatanodeConn(conn)
46}
47
48// wrapDatanodeConn performs a shortened SASL negotiation with the datanode,
49// then returns a wrapped connection or any error encountered. In the case of
50// a protection setting of 'authentication', the bare connection is returned.
51func (d *SaslDialer) wrapDatanodeConn(conn net.Conn) (net.Conn, error) {
52	auth := &hadoop.RpcSaslProto_SaslAuth{}
53	auth.Method = proto.String(authMethod)
54	auth.Mechanism = proto.String(authMechanism)
55	auth.ServerId = proto.String(authServer)
56	auth.Protocol = proto.String(authProtocol)
57
58	ourToken := &hadoop.TokenProto{}
59	ourToken.Kind = d.Token.Kind
60	ourToken.Password = d.Token.Password[:]
61	ourToken.Service = d.Token.Service
62	ourToken.Identifier = d.Token.GetIdentifier()
63
64	// If the server defaults have EncryptDataTransfer set but the encryption
65	// key is empty, the namenode doesn't want us to encrypt the block token.
66	if d.Key != nil && len(d.Key.Nonce) > 0 {
67		// Amusingly, this is unsigned in the proto struct but is expected
68		// to be signed here.
69		keyId := int32(d.Key.GetKeyId())
70
71		ourToken.Identifier = []byte(fmt.Sprintf("%d %s %s",
72			keyId,
73			d.Key.GetBlockPoolId(),
74			base64.StdEncoding.EncodeToString(d.Key.Nonce)))
75		ourToken.Password = d.Key.EncryptionKey
76	} else {
77		ourToken.Identifier = make([]byte,
78			base64.StdEncoding.EncodedLen(len(d.Token.GetIdentifier())))
79		base64.StdEncoding.Encode(ourToken.Identifier, d.Token.GetIdentifier())
80	}
81
82	dgst := digestMD5Handshake{
83		authID:   ourToken.Identifier,
84		passwd:   base64.StdEncoding.EncodeToString(ourToken.Password),
85		hostname: auth.GetServerId(),
86		service:  auth.GetProtocol(),
87	}
88
89	// Begin the handshake with 0xDEADBEEF and an empty message.
90	msg := &hdfs.DataTransferEncryptorMessageProto{}
91	msg.Status = hdfs.DataTransferEncryptorMessageProto_SUCCESS.Enum()
92	data, err := makePrefixedMessage(msg)
93	if err != nil {
94		return nil, err
95	}
96
97	data = append([]byte{0xDE, 0xAD, 0xBE, 0xEF}, data...)
98	_, err = conn.Write(data)
99	if err != nil {
100		return nil, err
101	}
102
103	// The response includes a challenge. Compute it and send it back.
104	resp := &hdfs.DataTransferEncryptorMessageProto{}
105	err = readPrefixedMessage(conn, msg)
106	if err != nil {
107		return nil, err
108	}
109
110	challengeResponse, err := dgst.challengeStep1(msg.Payload)
111	if err != nil {
112		return nil, err
113	}
114
115	// Use the server's QOP unless one was specified in the local configuration.
116	privacy := false
117	integrity := false
118	switch dgst.token.Qop[0] {
119	case sasl.QopPrivacy:
120		privacy = true
121		integrity = true
122	case sasl.QopIntegrity:
123		if d.EnforceQop == "privacy" {
124			return nil, errors.New("negotiating data protection: invalid qop: 'integrity'")
125		}
126
127		privacy = false
128		integrity = true
129	default:
130		if d.EnforceQop == "privacy" || d.EnforceQop == "integrity" {
131			return nil, fmt.Errorf("negotiating data protection: invalid qop: %s", dgst.token.Qop)
132		}
133	}
134
135	msg = &hdfs.DataTransferEncryptorMessageProto{}
136	msg.Status = hdfs.DataTransferEncryptorMessageProto_SUCCESS.Enum()
137	msg.Payload = []byte(challengeResponse)
138
139	if privacy {
140		// Indicate to the server that we want AES.
141		opt := &hdfs.CipherOptionProto{}
142		opt.Suite = hdfs.CipherSuiteProto_AES_CTR_NOPADDING.Enum()
143		msg.CipherOption = append(msg.CipherOption, opt)
144	}
145
146	data, err = makePrefixedMessage(msg)
147	if err != nil {
148		return nil, err
149	}
150
151	_, err = conn.Write(data)
152	if err != nil {
153		return nil, err
154	}
155
156	// Read another response from the server.
157	err = readPrefixedMessage(conn, resp)
158	if err != nil {
159		return nil, err
160	}
161
162	err = dgst.challengeStep2(resp.Payload)
163	if err != nil {
164		return nil, err
165	}
166
167	// Authentication done; we can return the bare connection if we don't need
168	// to do anything else.
169	if !privacy && !integrity {
170		return conn, nil
171	}
172
173	kic, kis := generateIntegrityKeys(dgst.a1())
174
175	var wrapped digestMD5Conn
176	if privacy {
177		if dgst.cipher == "" {
178			return nil, fmt.Errorf("no available cipher among choices: %v", dgst.token.Cipher)
179		}
180
181		kcc, kcs := generatePrivacyKeys(dgst.a1(), dgst.cipher)
182		wrapped = newDigestMD5PrivacyConn(conn, kic, kis, kcc, kcs)
183	} else {
184		wrapped = newDigestMD5IntegrityConn(conn, kic, kis)
185	}
186
187	// If we're going to encrypt, we use the above wrapped connection just for
188	// finishing the handshake.
189	if len(resp.GetCipherOption()) > 0 {
190		cipher := resp.GetCipherOption()[0]
191		var outKey []byte
192
193		decoded, err := wrapped.decode(cipher.InKey)
194		if err != nil {
195			return nil, err
196		}
197
198		inKey := make([]byte, len(decoded))
199		copy(inKey, decoded)
200
201		if outKey, err = wrapped.decode(cipher.OutKey); err != nil {
202			return nil, err
203		}
204
205		return newAesConn(conn, inKey, outKey, cipher.InIv, cipher.OutIv)
206	}
207
208	return wrapped, nil
209}
210