/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, version 3 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

package nncp

import (
	"archive/tar"
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	xdr "github.com/davecgh/go-xdr/xdr2"
	"github.com/dustin/go-humanize"
	"github.com/klauspost/compress/zstd"
	"golang.org/x/crypto/blake2b"
)

const (
	MaxFileSize = 1 << 62

	TarBlockSize = 512
	TarExt       = ".tar"
)

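// PktEncWriteResult carries the outcome of a single PktEncWrite goroutine in
// the encryption pipeline: the raw encrypted packet header, the payload size
// it reported and any error encountered.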
type PktEncWriteResult struct {
	pktEncRaw []byte
	size      int64
	err       error
}

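// Tx encrypts pkt together with its payload from src and queues the result
// in the outgoing spool. The payload is onion-wrapped for every transit hop
// listed in node.Via (and, when areaId is given, for the area itself),
// padded up to minSize and rejected with TooBig when maxSize is non-zero and
// would be exceeded. It returns the node whose spool received the packet
// (the last hop) and the payload size.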
func (ctx *Ctx) Tx(
	node *Node,
	pkt *Pkt,
	nice uint8,
	srcSize, minSize, maxSize int64,
	src io.Reader,
	pktName string,
	areaId *AreaId,
) (*Node, int64, error) {
	var area *Area
	if areaId != nil {
		area = ctx.AreaId2Area[*areaId]
		if area.Prv == nil {
			return nil, 0, errors.New("area has no encryption keys")
		}
	}
	hops := make([]*Node, 0, 1+len(node.Via))
	hops = append(hops, node)
	lastNode := node
	for i := len(node.Via); i > 0; i-- {
		lastNode = ctx.Neigh[*node.Via[i-1]]
		hops = append(hops, lastNode)
	}
	wrappers := len(hops)
	if area != nil {
		wrappers++
	}
	var expectedSize int64
	if srcSize > 0 {
		expectedSize = srcSize + PktOverhead
		expectedSize += sizePadCalc(expectedSize, minSize, wrappers)
		expectedSize = PktEncOverhead + sizeWithTags(expectedSize)
		if maxSize != 0 && expectedSize > maxSize {
			return nil, 0, TooBig
		}
		if !ctx.IsEnoughSpace(expectedSize) {
			return nil, 0, errors.New("not enough space")
		}
	}
	tmp, err := ctx.NewTmpFileWHash()
	if err != nil {
		return nil, 0, err
	}

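	// The payload is encrypted through a chain of io.Pipe-connected
	// PktEncWrite goroutines: the innermost one encrypts the user packet to
	// the destination (or to the area key), and every later stage wraps the
	// output of the previous one. Each goroutine reports on the results
	// channel.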
	results := make(chan PktEncWriteResult)
	pipeR, pipeW := io.Pipe()
	var pipeRPrev io.Reader
	if area == nil {
		go func(src io.Reader, dst io.WriteCloser) {
			ctx.LogD("tx", LEs{
				{"Node", hops[0].Id},
				{"Nice", int(nice)},
				{"Size", expectedSize},
			}, func(les LEs) string {
				return fmt.Sprintf(
					"Tx packet to %s (source %s) nice: %s",
					ctx.NodeName(hops[0].Id),
					humanize.IBytes(uint64(expectedSize)),
					NicenessFmt(nice),
				)
			})
			pktEncRaw, size, err := PktEncWrite(
				ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst,
			)
			results <- PktEncWriteResult{pktEncRaw, size, err}
			dst.Close()
		}(src, pipeW)
	} else {
		go func(src io.Reader, dst io.WriteCloser) {
			ctx.LogD("tx", LEs{
				{"Area", area.Id},
				{"Nice", int(nice)},
				{"Size", expectedSize},
			}, func(les LEs) string {
				return fmt.Sprintf(
					"Tx area packet to %s (source %s) nice: %s",
					ctx.AreaName(areaId),
					humanize.IBytes(uint64(expectedSize)),
					NicenessFmt(nice),
				)
			})
			areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
			copy(areaNode.Id[:], area.Id[:])
			copy(areaNode.ExchPub[:], area.Pub[:])
			pktEncRaw, size, err := PktEncWrite(
				ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst,
			)
			results <- PktEncWriteResult{pktEncRaw, size, err}
			dst.Close()
		}(src, pipeW)
		pipeRPrev = pipeR
		pipeR, pipeW = io.Pipe()
		go func(src io.Reader, dst io.WriteCloser) {
			pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
			if err != nil {
				panic(err)
			}
			ctx.LogD("tx", LEs{
				{"Node", hops[0].Id},
				{"Nice", int(nice)},
				{"Size", expectedSize},
			}, func(les LEs) string {
				return fmt.Sprintf(
					"Tx packet to %s (source %s) nice: %s",
					ctx.NodeName(hops[0].Id),
					humanize.IBytes(uint64(expectedSize)),
					NicenessFmt(nice),
				)
			})
			pktEncRaw, size, err := PktEncWrite(
				ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst,
			)
			results <- PktEncWriteResult{pktEncRaw, size, err}
			dst.Close()
		}(pipeRPrev, pipeW)
	}
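	// Wrap the encrypted packet into a transit packet for every
	// intermediate hop, innermost hop first, chaining the pipes together.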
	for i := 1; i < len(hops); i++ {
		pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
		if err != nil {
			panic(err)
		}
		pipeRPrev = pipeR
		pipeR, pipeW = io.Pipe()
		go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) {
			ctx.LogD("tx", LEs{
				{"Node", node.Id},
				{"Nice", int(nice)},
			}, func(les LEs) string {
				return fmt.Sprintf(
					"Tx trns packet to %s nice: %s",
					ctx.NodeName(node.Id),
					NicenessFmt(nice),
				)
			})
			pktEncRaw, size, err := PktEncWrite(
				ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst,
			)
			results <- PktEncWriteResult{pktEncRaw, size, err}
			dst.Close()
		}(hops[i], pktTrns, pipeRPrev, pipeW)
	}
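	// Drain the outermost pipe into the temporary spool file, showing
	// progress if enabled.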
	go func() {
		_, err := CopyProgressed(
			tmp.W, pipeR, "Tx",
			LEs{{"Pkt", pktName}, {"FullSize", expectedSize}},
			ctx.ShowPrgrs,
		)
		results <- PktEncWriteResult{err: err}
	}()
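	// Collect the results of every stage. When an area wrapper is present,
	// its result is consumed first (the innermost stage reports before
	// closing its pipe, so outer stages cannot finish earlier); the last
	// non-nil header, i.e. the outermost one, is kept for HdrWrite.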
	var pktEncRaw []byte
	var pktEncMsg []byte
	var payloadSize int64
	if area != nil {
		r := <-results
		payloadSize = r.size
		pktEncMsg = r.pktEncRaw
		wrappers--
	}
	for i := 0; i <= wrappers; i++ {
		r := <-results
		if r.err != nil {
			tmp.Fd.Close()
			return nil, 0, r.err
		}
		if r.pktEncRaw != nil {
			pktEncRaw = r.pktEncRaw
			if payloadSize == 0 {
				payloadSize = r.size
			}
		}
	}
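	// Commit the temporary file into the last hop's outgoing spool and keep
	// a human-readable symlink named after that node.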
	nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
	err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
	os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
	if err != nil {
		return lastNode, 0, err
	}
	if ctx.HdrUsage {
		ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
	}
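	// For area messages, additionally create an empty "seen" marker named
	// after the hash of the inner encrypted message, so that this message
	// is recognized as already known and not processed again.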
	if area != nil {
		msgHashRaw := blake2b.Sum256(pktEncMsg)
		msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
		seenDir := filepath.Join(
			ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
		)
		seenPath := filepath.Join(seenDir, msgHash)
		les := LEs{
			{"Node", node.Id},
			{"Nice", int(nice)},
			{"Size", expectedSize},
			{"Area", areaId},
			{"AreaMsg", msgHash},
		}
		logMsg := func(les LEs) string {
			return fmt.Sprintf(
				"Tx area packet to %s (source %s) nice: %s, area %s: %s",
				ctx.NodeName(node.Id),
				humanize.IBytes(uint64(expectedSize)),
				NicenessFmt(nice),
				area.Name,
				msgHash,
			)
		}
		if err = ensureDir(seenDir); err != nil {
			ctx.LogE("tx-mkdir", les, err, logMsg)
			return lastNode, 0, err
		}
		if fd, err := os.Create(seenPath); err == nil {
			fd.Close()
			if err = DirSync(seenDir); err != nil {
				ctx.LogE("tx-dirsync", les, err, logMsg)
				return lastNode, 0, err
			}
		}
		ctx.LogI("tx-area", les, logMsg)
	}
	return lastNode, payloadSize, err
}

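// DummyCloser is an io.Closer with a no-op Close, used for readers that need
// no cleanup.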
type DummyCloser struct{}

func (dc DummyCloser) Close() error { return nil }

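// prepareTxFile opens srcPath for transmission: "-" means stdin, a regular
// file is returned as-is together with its size, and a directory is streamed
// as an on-the-fly PAX tar archive whose exact size is computed beforehand
// (archived is set to true in that case).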
func prepareTxFile(srcPath string) (
	reader io.Reader,
	closer io.Closer,
	srcSize int64,
	archived bool,
	rerr error,
) {
	if srcPath == "-" {
		reader = os.Stdin
		closer = os.Stdin
		return
	}

	srcStat, err := os.Stat(srcPath)
	if err != nil {
		rerr = err
		return
	}
	mode := srcStat.Mode()

	if mode.IsRegular() {
		// It is a regular file, just send it
		src, err := os.Open(srcPath)
		if err != nil {
			rerr = err
			return
		}
		reader = src
		closer = src
		srcSize = srcStat.Size()
		return
	}

	if !mode.IsDir() {
		rerr = errors.New("unsupported file type")
		return
	}

	// It is a directory, create a PAX archive with its contents
	archived = true
	basePath := filepath.Base(srcPath)
	rootPath, err := filepath.Abs(srcPath)
	if err != nil {
		rerr = err
		return
	}
	type einfo struct {
		path    string
		modTime time.Time
		size    int64
	}
	dirs := make([]einfo, 0, 1<<10)
	files := make([]einfo, 0, 1<<10)
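	// Walk the tree beforehand to compute the exact size of the resulting
	// archive: the tar stream is generated on the fly, but callers need
	// srcSize up front for size checks and padding.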
	rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if info.IsDir() {
			// directory header, PAX record header+contents
			srcSize += TarBlockSize + 2*TarBlockSize
			dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
		} else {
			// file header, PAX record header+contents, file content
			srcSize += TarBlockSize + 2*TarBlockSize + info.Size()
			if n := info.Size() % TarBlockSize; n != 0 {
				srcSize += TarBlockSize - n // padding
			}
			files = append(files, einfo{
				path:    path,
				modTime: info.ModTime(),
				size:    info.Size(),
			})
		}
		return nil
	})
	if rerr != nil {
		return
	}

	r, w := io.Pipe()
	reader = r
	closer = DummyCloser{}
	srcSize += 2 * TarBlockSize // termination block

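	// Stream the archive through the pipe: directory entries first, then
	// regular files, all with PAX format headers.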
	go func() error {
		tarWr := tar.NewWriter(w)
		hdr := tar.Header{
			Typeflag: tar.TypeDir,
			Mode:     0777,
			PAXRecords: map[string]string{
				"comment": "Autogenerated by " + VersionGet(),
			},
			Format: tar.FormatPAX,
		}
		for _, e := range dirs {
			hdr.Name = basePath + e.path[len(rootPath):]
			hdr.ModTime = e.modTime
			if err = tarWr.WriteHeader(&hdr); err != nil {
				return w.CloseWithError(err)
			}
		}
		hdr.Typeflag = tar.TypeReg
		hdr.Mode = 0666
		for _, e := range files {
			hdr.Name = basePath + e.path[len(rootPath):]
			hdr.ModTime = e.modTime
			hdr.Size = e.size
			if err = tarWr.WriteHeader(&hdr); err != nil {
				return w.CloseWithError(err)
			}
			fd, err := os.Open(e.path)
			if err != nil {
				fd.Close()
				return w.CloseWithError(err)
			}
			if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
				fd.Close()
				return w.CloseWithError(err)
			}
			fd.Close()
		}
		if err = tarWr.Close(); err != nil {
			return w.CloseWithError(err)
		}
		return w.Close()
	}()
	return
}

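// TxFile queues a file (a directory archive, or stdin when srcPath is "-")
// for transmission to node under the relative dstPath. When chunkSize is
// non-zero and the source does not fit into a single chunk, it is split
// into numbered ChunkedSuffixPart packets followed by a ChunkedSuffixMeta
// packet carrying their sizes and checksums.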
func (ctx *Ctx) TxFile(
	node *Node,
	nice uint8,
	srcPath, dstPath string,
	chunkSize, minSize, maxSize int64,
	areaId *AreaId,
) error {
	dstPathSpecified := false
	if dstPath == "" {
		if srcPath == "-" {
			return errors.New("Must provide destination filename")
		}
		dstPath = filepath.Base(srcPath)
	} else {
		dstPathSpecified = true
	}
	dstPath = filepath.Clean(dstPath)
	if filepath.IsAbs(dstPath) {
		return errors.New("Relative destination path required")
	}
	reader, closer, srcSize, archived, err := prepareTxFile(srcPath)
	if closer != nil {
		defer closer.Close()
	}
	if err != nil {
		return err
	}
	if archived && !dstPathSpecified {
		dstPath += TarExt
	}

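	// Single-packet case: chunking is disabled, or the whole source fits
	// into one chunk.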
	if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) {
		pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
		if err != nil {
			return err
		}
		_, finalSize, err := ctx.Tx(
			node, pkt, nice,
			srcSize, minSize, maxSize,
			bufio.NewReader(reader), dstPath, areaId,
		)
		les := LEs{
			{"Type", "file"},
			{"Node", node.Id},
			{"Nice", int(nice)},
			{"Src", srcPath},
			{"Dst", dstPath},
			{"Size", finalSize},
		}
		logMsg := func(les LEs) string {
			return fmt.Sprintf(
				"File %s (%s) sent to %s:%s",
				srcPath,
				humanize.IBytes(uint64(finalSize)),
				ctx.NodeName(node.Id),
				dstPath,
			)
		}
		if err == nil {
			ctx.LogI("tx", les, logMsg)
		} else {
			ctx.LogE("tx", les, err, logMsg)
		}
		return err
	}

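	// Chunked transfer: cut the stream into chunkSize pieces, sending each
	// as its own file packet while hashing it for the meta packet.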
	br := bufio.NewReader(reader)
	var sizeFull int64
	var chunkNum int
	checksums := [][MTHSize]byte{}
	for {
		lr := io.LimitReader(br, chunkSize)
		path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
		pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
		if err != nil {
			return err
		}
		hsh := MTHNew(0, 0)
		_, size, err := ctx.Tx(
			node, pkt, nice,
			0, minSize, maxSize,
			io.TeeReader(lr, hsh),
			path, areaId,
		)

		les := LEs{
			{"Type", "file"},
			{"Node", node.Id},
			{"Nice", int(nice)},
			{"Src", srcPath},
			{"Dst", path},
			{"Size", size},
		}
		logMsg := func(les LEs) string {
			return fmt.Sprintf(
				"File %s (%s) sent to %s:%s",
				srcPath,
				humanize.IBytes(uint64(size)),
				ctx.NodeName(node.Id),
				path,
			)
		}
		if err == nil {
			ctx.LogI("tx", les, logMsg)
		} else {
			ctx.LogE("tx", les, err, logMsg)
			return err
		}

		sizeFull += size - PktOverhead
		var checksum [MTHSize]byte
		hsh.Sum(checksum[:0])
		checksums = append(checksums, checksum)
		chunkNum++
		if size < chunkSize {
			break
		}
		if _, err = br.Peek(1); err != nil {
			break
		}
	}

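	// Send the metadata packet describing the chunks: total size, chunk
	// size and per-chunk checksums.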
	metaPkt := ChunkedMeta{
		Magic:     MagicNNCPMv2.B,
		FileSize:  uint64(sizeFull),
		ChunkSize: uint64(chunkSize),
		Checksums: checksums,
	}
	var buf bytes.Buffer
	_, err = xdr.Marshal(&buf, metaPkt)
	if err != nil {
		return err
	}
	path := dstPath + ChunkedSuffixMeta
	pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
	if err != nil {
		return err
	}
	metaPktSize := int64(buf.Len())
	_, _, err = ctx.Tx(
		node,
		pkt,
		nice,
		metaPktSize, minSize, maxSize,
		&buf, path, areaId,
	)
	les := LEs{
		{"Type", "file"},
		{"Node", node.Id},
		{"Nice", int(nice)},
		{"Src", srcPath},
		{"Dst", path},
		{"Size", metaPktSize},
	}
	logMsg := func(les LEs) string {
		return fmt.Sprintf(
			"File %s (%s) sent to %s:%s",
			srcPath,
			humanize.IBytes(uint64(metaPktSize)),
			ctx.NodeName(node.Id),
			path,
		)
	}
	if err == nil {
		ctx.LogI("tx", les, logMsg)
	} else {
		ctx.LogE("tx", les, err, logMsg)
	}
	return err
}

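// TxFreq queues a file request (freq) packet asking node to send back its
// srcPath, to be stored locally under dstPath, with the given reply
// niceness.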
func (ctx *Ctx) TxFreq(
	node *Node,
	nice, replyNice uint8,
	srcPath, dstPath string,
	minSize int64,
) error {
	dstPath = filepath.Clean(dstPath)
	if filepath.IsAbs(dstPath) {
		return errors.New("Relative destination path required")
	}
	srcPath = filepath.Clean(srcPath)
	if filepath.IsAbs(srcPath) {
		return errors.New("Relative source path required")
	}
	pkt, err := NewPkt(PktTypeFreq, replyNice, []byte(srcPath))
	if err != nil {
		return err
	}
	src := strings.NewReader(dstPath)
	size := int64(src.Len())
	_, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil)
	les := LEs{
		{"Type", "freq"},
		{"Node", node.Id},
		{"Nice", int(nice)},
		{"ReplyNice", int(replyNice)},
		{"Src", srcPath},
		{"Dst", dstPath},
	}
	logMsg := func(les LEs) string {
		return fmt.Sprintf(
			"File request from %s:%s to %s sent",
			ctx.NodeName(node.Id), srcPath,
			dstPath,
		)
	}
	if err == nil {
		ctx.LogI("tx", les, logMsg)
	} else {
		ctx.LogE("tx", les, err, logMsg)
	}
	return err
}

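// TxExec queues an exec packet for node: handle and args form the packet
// path, and in supplies the body, zstd-compressed unless noCompress is set
// (in which case an "exec fat" packet is used instead).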
func (ctx *Ctx) TxExec(
	node *Node,
	nice, replyNice uint8,
	handle string,
	args []string,
	in io.Reader,
	minSize int64, maxSize int64,
	noCompress bool,
	areaId *AreaId,
) error {
	path := make([][]byte, 0, 1+len(args))
	path = append(path, []byte(handle))
	for _, arg := range args {
		path = append(path, []byte(arg))
	}
	pktType := PktTypeExec
	if noCompress {
		pktType = PktTypeExecFat
	}
	pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
	if err != nil {
		return err
	}
	compressErr := make(chan error, 1)
	if !noCompress {
		pr, pw := io.Pipe()
		compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
		if err != nil {
			return err
		}
		go func(r io.Reader) {
			if _, err := io.Copy(compressor, r); err != nil {
				compressErr <- err
				pw.CloseWithError(err) // abort the pipe so Tx does not block forever
				return
			}
			compressErr <- compressor.Close()
			pw.Close()
		}(in)
		in = pr
	}
	_, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId)
	if !noCompress {
		e := <-compressErr
		if err == nil {
			err = e
		}
	}
	dst := strings.Join(append([]string{handle}, args...), " ")
	les := LEs{
		{"Type", "exec"},
		{"Node", node.Id},
		{"Nice", int(nice)},
		{"ReplyNice", int(replyNice)},
		{"Dst", dst},
		{"Size", size},
	}
	logMsg := func(les LEs) string {
		return fmt.Sprintf(
			"Exec sent to %s@%s (%s)",
			ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
		)
	}
	if err == nil {
		ctx.LogI("tx", les, logMsg)
	} else {
		ctx.LogE("tx", les, err, logMsg)
	}
	return err
}

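// TxTrns queues an already encrypted packet of the given size, read from
// src, directly into node's outgoing spool without any further wrapping.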
func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
	les := LEs{
		{"Type", "trns"},
		{"Node", node.Id},
		{"Nice", int(nice)},
		{"Size", size},
	}
	logMsg := func(les LEs) string {
		return fmt.Sprintf(
			"Transitional packet to %s (%s) (nice %s)",
			ctx.NodeName(node.Id),
			humanize.IBytes(uint64(size)),
			NicenessFmt(nice),
		)
	}
	ctx.LogD("tx", les, logMsg)
	if !ctx.IsEnoughSpace(size) {
		err := errors.New("not enough space")
		ctx.LogE("tx", les, err, logMsg)
		return err
	}
	tmp, err := ctx.NewTmpFileWHash()
	if err != nil {
		return err
	}
	if _, err = CopyProgressed(
		tmp.W, src, "Tx trns",
		LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
		ctx.ShowPrgrs,
	); err != nil {
		return err
	}
	nodePath := filepath.Join(ctx.Spool, node.Id.String())
	err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
	if err == nil {
		ctx.LogI("tx", les, logMsg)
	} else {
		ctx.LogE("tx", les, err, logMsg)
	}
	os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
	return err
}